feature(krata): prepare for workload rework (#276)

* chore(code): simple code cleanup

* chore(code): additional code cleanup

* feature(krata): rework api and make ip assignment persistent to database

* rework and cleanup

* fix daemon config references
This commit is contained in:
Alex Zenla
2024-08-13 23:17:47 -07:00
committed by GitHub
parent 2a107a370f
commit 01a94ad23e
41 changed files with 1227 additions and 873 deletions

566
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -3,7 +3,7 @@ use clap::{Parser, ValueEnum};
use comfy_table::presets::UTF8_FULL_CONDENSED;
use comfy_table::{Cell, Table};
use krata::v1::control::{
control_service_client::ControlServiceClient, HostCpuTopologyClass, HostCpuTopologyRequest,
control_service_client::ControlServiceClient, GetHostCpuTopologyRequest, HostCpuTopologyClass,
};
use tonic::{transport::Channel, Request};
@ -31,7 +31,7 @@ pub struct HostCpuTopologyCommand {
impl HostCpuTopologyCommand {
pub async fn run(self, mut client: ControlServiceClient<Channel>) -> Result<()> {
let response = client
.get_host_cpu_topology(Request::new(HostCpuTopologyRequest {}))
.get_host_cpu_topology(Request::new(GetHostCpuTopologyRequest {}))
.await?
.into_inner();

View File

@ -1,17 +1,17 @@
use anyhow::Result;
use clap::Parser;
use krata::v1::control::{control_service_client::ControlServiceClient, IdentifyHostRequest};
use krata::v1::control::{control_service_client::ControlServiceClient, HostStatusRequest};
use tonic::{transport::Channel, Request};
#[derive(Parser)]
#[command(about = "Identify information about the host")]
pub struct HostIdentifyCommand {}
#[command(about = "Get information about the host")]
pub struct HostStatusCommand {}
impl HostIdentifyCommand {
impl HostStatusCommand {
pub async fn run(self, mut client: ControlServiceClient<Channel>) -> Result<()> {
let response = client
.identify_host(Request::new(IdentifyHostRequest {}))
.host_status(Request::new(HostStatusRequest {}))
.await?
.into_inner();
println!("Host UUID: {}", response.host_uuid);

View File

@ -6,7 +6,7 @@ use krata::events::EventStream;
use krata::v1::control::control_service_client::ControlServiceClient;
use crate::cli::host::cpu_topology::HostCpuTopologyCommand;
use crate::cli::host::identify::HostIdentifyCommand;
use crate::cli::host::identify::HostStatusCommand;
use crate::cli::host::idm_snoop::HostIdmSnoopCommand;
pub mod cpu_topology;
@ -33,7 +33,7 @@ impl HostCommand {
#[derive(Subcommand)]
pub enum HostCommands {
CpuTopology(HostCpuTopologyCommand),
Identify(HostIdentifyCommand),
Status(HostStatusCommand),
IdmSnoop(HostIdmSnoopCommand),
}
@ -46,7 +46,7 @@ impl HostCommands {
match self {
HostCommands::CpuTopology(cpu_topology) => cpu_topology.run(client).await,
HostCommands::Identify(identify) => identify.run(client).await,
HostCommands::Status(status) => status.run(client).await,
HostCommands::IdmSnoop(snoop) => snoop.run(client, events).await,
}

View File

@ -12,7 +12,7 @@ use clap::Parser;
use krata::{
client::ControlClientProvider,
events::EventStream,
v1::control::{control_service_client::ControlServiceClient, ResolveZoneRequest},
v1::control::{control_service_client::ControlServiceClient, ResolveZoneIdRequest},
};
use tonic::{transport::Channel, Request};
@ -51,7 +51,7 @@ impl ControlCommand {
ControlCommands::Device(device) => device.run(client, events).await,
ControlCommands::Host(snoop) => snoop.run(client, events).await,
ControlCommands::Host(host) => host.run(client, events).await,
}
}
}
@ -61,14 +61,14 @@ pub async fn resolve_zone(
name: &str,
) -> Result<String> {
let reply = client
.resolve_zone(Request::new(ResolveZoneRequest {
.resolve_zone_id(Request::new(ResolveZoneIdRequest {
name: name.to_string(),
}))
.await?
.into_inner();
if let Some(zone) = reply.zone {
Ok(zone.id)
if !reply.zone_id.is_empty() {
Ok(reply.zone_id)
} else {
Err(anyhow!("unable to resolve zone '{}'", name))
}

View File

@ -2,20 +2,16 @@ use anyhow::Result;
use clap::Parser;
use krata::{
events::EventStream,
v1::{
common::ZoneStatus,
control::{
control_service_client::ControlServiceClient, watch_events_reply::Event,
DestroyZoneRequest,
},
v1::control::{
control_service_client::ControlServiceClient, watch_events_reply::Event, DestroyZoneRequest,
},
};
use crate::cli::resolve_zone;
use krata::v1::common::ZoneState;
use log::error;
use tonic::{transport::Channel, Request};
use crate::cli::resolve_zone;
#[derive(Parser)]
#[command(about = "Destroy a zone")]
pub struct ZoneDestroyCommand {
@ -61,12 +57,12 @@ async fn wait_zone_destroyed(id: &str, events: EventStream) -> Result<()> {
continue;
}
let Some(state) = zone.state else {
let Some(status) = zone.status else {
continue;
};
if let Some(ref error) = state.error_info {
if state.status() == ZoneStatus::Failed {
if let Some(ref error) = status.error_status {
if status.state() == ZoneState::Failed {
error!("destroy failed: {}", error.message);
std::process::exit(1);
} else {
@ -74,7 +70,7 @@ async fn wait_zone_destroyed(id: &str, events: EventStream) -> Result<()> {
}
}
if state.status() == ZoneStatus::Destroyed {
if status.state() == ZoneState::Destroyed {
std::process::exit(0);
}
}

View File

@ -5,7 +5,7 @@ use anyhow::Result;
use clap::Parser;
use krata::v1::{
common::{ZoneTaskSpec, ZoneTaskSpecEnvVar},
control::{control_service_client::ControlServiceClient, ExecZoneRequest},
control::{control_service_client::ControlServiceClient, ExecInsideZoneRequest},
};
use tonic::{transport::Channel, Request};
@ -34,7 +34,7 @@ pub struct ZoneExecCommand {
impl ZoneExecCommand {
pub async fn run(self, mut client: ControlServiceClient<Channel>) -> Result<()> {
let zone_id: String = resolve_zone(&mut client, &self.zone).await?;
let initial = ExecZoneRequest {
let initial = ExecInsideZoneRequest {
zone_id,
task: Some(ZoneTaskSpec {
environment: env_map(&self.env.unwrap_or_default())
@ -52,7 +52,10 @@ impl ZoneExecCommand {
let stream = StdioConsoleStream::stdin_stream_exec(initial).await;
let response = client.exec_zone(Request::new(stream)).await?.into_inner();
let response = client
.exec_inside_zone(Request::new(stream))
.await?
.into_inner();
let code = StdioConsoleStream::exec_output(response).await?;
std::process::exit(code);

View File

@ -7,7 +7,7 @@ use krata::{
v1::{
common::{
zone_image_spec::Image, OciImageFormat, ZoneImageSpec, ZoneOciImageSpec, ZoneSpec,
ZoneSpecDevice, ZoneStatus, ZoneTaskSpec, ZoneTaskSpecEnvVar,
ZoneSpecDevice, ZoneState, ZoneTaskSpec, ZoneTaskSpecEnvVar,
},
control::{
control_service_client::ControlServiceClient, watch_events_reply::Event,
@ -120,7 +120,7 @@ impl ZoneLaunchCommand {
image: Some(image),
kernel,
initrd,
vcpus: self.cpus,
cpus: self.cpus,
mem: self.mem,
task: Some(ZoneTaskSpec {
environment: env_map(&self.env.unwrap_or_default())
@ -209,12 +209,12 @@ async fn wait_zone_started(id: &str, events: EventStream) -> Result<()> {
continue;
}
let Some(state) = zone.state else {
let Some(status) = zone.status else {
continue;
};
if let Some(ref error) = state.error_info {
if state.status() == ZoneStatus::Failed {
if let Some(ref error) = status.error_status {
if status.state() == ZoneState::Failed {
error!("launch failed: {}", error.message);
std::process::exit(1);
} else {
@ -222,12 +222,12 @@ async fn wait_zone_started(id: &str, events: EventStream) -> Result<()> {
}
}
if state.status() == ZoneStatus::Destroyed {
if status.state() == ZoneState::Destroyed {
error!("zone destroyed");
std::process::exit(1);
}
if state.status() == ZoneStatus::Started {
if status.state() == ZoneState::Created {
break;
}
}

View File

@ -4,18 +4,19 @@ use comfy_table::{presets::UTF8_FULL_CONDENSED, Cell, Color, Table};
use krata::{
events::EventStream,
v1::{
common::{Zone, ZoneStatus},
common::Zone,
control::{
control_service_client::ControlServiceClient, ListZonesRequest, ResolveZoneRequest,
control_service_client::ControlServiceClient, ListZonesRequest, ResolveZoneIdRequest,
},
},
};
use crate::format::{kv2line, proto2dynamic, proto2kv, zone_simple_line, zone_state_text};
use krata::v1::common::ZoneState;
use krata::v1::control::GetZoneRequest;
use serde_json::Value;
use tonic::{transport::Channel, Request};
use crate::format::{kv2line, proto2dynamic, proto2kv, zone_simple_line, zone_status_text};
#[derive(ValueEnum, Clone, Debug, PartialEq, Eq)]
enum ZoneListFormat {
Table,
@ -44,11 +45,21 @@ impl ZoneListCommand {
) -> Result<()> {
let mut zones = if let Some(ref zone) = self.zone {
let reply = client
.resolve_zone(Request::new(ResolveZoneRequest { name: zone.clone() }))
.resolve_zone_id(Request::new(ResolveZoneIdRequest { name: zone.clone() }))
.await?
.into_inner();
if let Some(zone) = reply.zone {
vec![zone]
if !reply.zone_id.is_empty() {
let reply = client
.get_zone(Request::new(GetZoneRequest {
zone_id: reply.zone_id,
}))
.await?
.into_inner();
if let Some(zone) = reply.zone {
vec![zone]
} else {
return Err(anyhow!("unable to resolve zone '{}'", zone));
}
} else {
return Err(anyhow!("unable to resolve zone '{}'", zone));
}
@ -115,30 +126,30 @@ impl ZoneListCommand {
let mut table = Table::new();
table.load_preset(UTF8_FULL_CONDENSED);
table.set_content_arrangement(comfy_table::ContentArrangement::Dynamic);
table.set_header(vec!["name", "uuid", "status", "ipv4", "ipv6"]);
table.set_header(vec!["name", "uuid", "state", "ipv4", "ipv6"]);
for zone in zones {
let ipv4 = zone
.state
.status
.as_ref()
.and_then(|x| x.network.as_ref())
.and_then(|x| x.network_status.as_ref())
.map(|x| x.zone_ipv4.as_str())
.unwrap_or("n/a");
let ipv6 = zone
.state
.status
.as_ref()
.and_then(|x| x.network.as_ref())
.and_then(|x| x.network_status.as_ref())
.map(|x| x.zone_ipv6.as_str())
.unwrap_or("n/a");
let Some(spec) = zone.spec else {
continue;
};
let status = zone.state.as_ref().cloned().unwrap_or_default().status();
let status_text = zone_status_text(status);
let state = zone.status.as_ref().cloned().unwrap_or_default().state();
let status_text = zone_state_text(state);
let status_color = match status {
ZoneStatus::Destroyed | ZoneStatus::Failed => Color::Red,
ZoneStatus::Destroying | ZoneStatus::Exited | ZoneStatus::Starting => Color::Yellow,
ZoneStatus::Started => Color::Green,
let status_color = match state {
ZoneState::Destroyed | ZoneState::Failed => Color::Red,
ZoneState::Destroying | ZoneState::Exited | ZoneState::Creating => Color::Yellow,
ZoneState::Created => Color::Green,
_ => Color::Reset,
};

View File

@ -1,6 +1,6 @@
use anyhow::Result;
use clap::Parser;
use krata::v1::control::{control_service_client::ControlServiceClient, ResolveZoneRequest};
use krata::v1::control::{control_service_client::ControlServiceClient, ResolveZoneIdRequest};
use tonic::{transport::Channel, Request};
@ -14,13 +14,13 @@ pub struct ZoneResolveCommand {
impl ZoneResolveCommand {
pub async fn run(self, mut client: ControlServiceClient<Channel>) -> Result<()> {
let reply = client
.resolve_zone(Request::new(ResolveZoneRequest {
.resolve_zone_id(Request::new(ResolveZoneIdRequest {
name: self.zone.clone(),
}))
.await?
.into_inner();
if let Some(zone) = reply.zone {
println!("{}", zone.id);
if !reply.zone_id.is_empty() {
println!("{}", reply.zone_id);
} else {
std::process::exit(1);
}

View File

@ -24,7 +24,7 @@ use ratatui::{
};
use crate::{
format::zone_status_text,
format::zone_state_text,
metrics::{
lookup_metric_value, MultiMetricCollector, MultiMetricCollectorHandle, MultiMetricState,
},
@ -106,7 +106,7 @@ impl ZoneTopApp {
break;
}
}
};
}
}
Ok(())
}
@ -157,7 +157,7 @@ impl Widget for &mut ZoneTopApp {
continue;
};
let Some(ref state) = ms.zone.state else {
let Some(ref status) = ms.zone.status else {
continue;
};
@ -177,7 +177,7 @@ impl Widget for &mut ZoneTopApp {
let row = Row::new(vec![
spec.name.clone(),
ms.zone.id.clone(),
zone_status_text(state.status()),
zone_state_text(status.state()),
memory_total.unwrap_or_default(),
memory_used.unwrap_or_default(),
memory_free.unwrap_or_default(),

View File

@ -4,14 +4,12 @@ use crossterm::{
terminal::{disable_raw_mode, enable_raw_mode, is_raw_mode_enabled},
tty::IsTty,
};
use krata::v1::common::ZoneState;
use krata::{
events::EventStream,
v1::{
common::ZoneStatus,
control::{
watch_events_reply::Event, ExecZoneReply, ExecZoneRequest, ZoneConsoleReply,
ZoneConsoleRequest,
},
v1::control::{
watch_events_reply::Event, ExecInsideZoneReply, ExecInsideZoneRequest, ZoneConsoleReply,
ZoneConsoleRequest,
},
};
use log::debug;
@ -49,8 +47,8 @@ impl StdioConsoleStream {
}
pub async fn stdin_stream_exec(
initial: ExecZoneRequest,
) -> impl Stream<Item = ExecZoneRequest> {
initial: ExecInsideZoneRequest,
) -> impl Stream<Item = ExecInsideZoneRequest> {
let mut stdin = stdin();
stream! {
yield initial;
@ -68,7 +66,7 @@ impl StdioConsoleStream {
if size == 1 && buffer[0] == 0x1d {
break;
}
yield ExecZoneRequest { zone_id: String::default(), task: None, data };
yield ExecInsideZoneRequest { zone_id: String::default(), task: None, data };
}
}
}
@ -90,7 +88,7 @@ impl StdioConsoleStream {
Ok(())
}
pub async fn exec_output(mut stream: Streaming<ExecZoneReply>) -> Result<i32> {
pub async fn exec_output(mut stream: Streaming<ExecInsideZoneReply>) -> Result<i32> {
let mut stdout = stdout();
let mut stderr = stderr();
while let Some(reply) = stream.next().await {
@ -128,7 +126,7 @@ impl StdioConsoleStream {
continue;
};
let Some(state) = zone.state else {
let Some(status) = zone.status else {
continue;
};
@ -136,12 +134,12 @@ impl StdioConsoleStream {
continue;
}
if let Some(exit_info) = state.exit_info {
return Some(exit_info.code);
if let Some(exit_status) = status.exit_status {
return Some(exit_status.code);
}
let status = state.status();
if status == ZoneStatus::Destroying || status == ZoneStatus::Destroyed {
let state = status.state();
if state == ZoneState::Destroying || state == ZoneState::Destroyed {
return Some(10);
}
}

View File

@ -3,11 +3,12 @@ use std::{collections::HashMap, time::Duration};
use anyhow::Result;
use fancy_duration::FancyDuration;
use human_bytes::human_bytes;
use krata::v1::common::{Zone, ZoneMetricFormat, ZoneMetricNode, ZoneStatus};
use prost_reflect::{DynamicMessage, ReflectMessage};
use prost_types::Value;
use termtree::Tree;
use krata::v1::common::{Zone, ZoneMetricFormat, ZoneMetricNode, ZoneState};
pub fn proto2dynamic(proto: impl ReflectMessage) -> Result<DynamicMessage> {
Ok(DynamicMessage::decode(
proto.descriptor(),
@ -75,30 +76,30 @@ pub fn kv2line(map: HashMap<String, String>) -> String {
.join(" ")
}
pub fn zone_status_text(status: ZoneStatus) -> String {
pub fn zone_state_text(status: ZoneState) -> String {
match status {
ZoneStatus::Starting => "starting",
ZoneStatus::Started => "started",
ZoneStatus::Destroying => "destroying",
ZoneStatus::Destroyed => "destroyed",
ZoneStatus::Exited => "exited",
ZoneStatus::Failed => "failed",
ZoneState::Creating => "creating",
ZoneState::Created => "created",
ZoneState::Destroying => "destroying",
ZoneState::Destroyed => "destroyed",
ZoneState::Exited => "exited",
ZoneState::Failed => "failed",
_ => "unknown",
}
.to_string()
}
pub fn zone_simple_line(zone: &Zone) -> String {
let state = zone_status_text(
zone.state
let state = zone_state_text(
zone.status
.as_ref()
.map(|x| x.status())
.unwrap_or(ZoneStatus::Unknown),
.map(|x| x.state())
.unwrap_or(ZoneState::Unknown),
);
let name = zone.spec.as_ref().map(|x| x.name.as_str()).unwrap_or("");
let network = zone.state.as_ref().and_then(|x| x.network.as_ref());
let ipv4 = network.map(|x| x.zone_ipv4.as_str()).unwrap_or("");
let ipv6 = network.map(|x| x.zone_ipv6.as_str()).unwrap_or("");
let network_status = zone.status.as_ref().and_then(|x| x.network_status.as_ref());
let ipv4 = network_status.map(|x| x.zone_ipv4.as_str()).unwrap_or("");
let ipv6 = network_status.map(|x| x.zone_ipv6.as_str()).unwrap_or("");
format!("{}\t{}\t{}\t{}\t{}", zone.id, state, name, ipv4, ipv6)
}

View File

@ -1,8 +1,10 @@
use crate::format::metrics_value_pretty;
use anyhow::Result;
use krata::v1::common::ZoneState;
use krata::{
events::EventStream,
v1::{
common::{Zone, ZoneMetricNode, ZoneStatus},
common::{Zone, ZoneMetricNode},
control::{
control_service_client::ControlServiceClient, watch_events_reply::Event,
ListZonesRequest, ReadZoneMetricsRequest,
@ -19,8 +21,6 @@ use tokio::{
};
use tonic::transport::Channel;
use crate::format::metrics_value_pretty;
pub struct MetricState {
pub zone: Zone,
pub root: Option<ZoneMetricNode>,
@ -86,11 +86,11 @@ impl MultiMetricCollector {
let Some(zone) = changed.zone else {
continue;
};
let Some(ref state) = zone.state else {
let Some(ref status) = zone.status else {
continue;
};
zones.retain(|x| x.id != zone.id);
if state.status() != ZoneStatus::Destroying {
if status.state() != ZoneState::Destroying {
zones.push(zone);
}
false
@ -112,11 +112,11 @@ impl MultiMetricCollector {
let mut metrics = Vec::new();
for zone in &zones {
let Some(ref state) = zone.state else {
let Some(ref status) = zone.status else {
continue;
};
if state.status() != ZoneStatus::Started {
if status.state() != ZoneState::Created {
continue;
}

View File

@ -9,6 +9,7 @@ edition = "2021"
resolver = "2"
[dependencies]
krata-advmac = { workspace = true }
anyhow = { workspace = true }
async-stream = { workspace = true }
async-trait = { workspace = true }
@ -17,6 +18,7 @@ circular-buffer = { workspace = true }
clap = { workspace = true }
env_logger = { workspace = true }
futures = { workspace = true }
ipnetwork = { workspace = true }
krata = { path = "../krata", version = "^0.0.15" }
krata-oci = { path = "../oci", version = "^0.0.15" }
krata-runtime = { path = "../runtime", version = "^0.0.15" }
@ -25,6 +27,7 @@ prost = { workspace = true }
redb = { workspace = true }
scopeguard = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
signal-hook = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }

View File

@ -10,6 +10,8 @@ pub struct DaemonConfig {
pub oci: OciConfig,
#[serde(default)]
pub pci: DaemonPciConfig,
#[serde(default = "default_network")]
pub network: DaemonNetworkConfig,
}
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
@ -49,6 +51,65 @@ pub enum DaemonPciDeviceRdmReservePolicy {
Relaxed,
}
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub struct DaemonNetworkConfig {
#[serde(default = "default_network_nameservers")]
pub nameservers: Vec<String>,
#[serde(default = "default_network_ipv4")]
pub ipv4: DaemonIpv4NetworkConfig,
#[serde(default = "default_network_ipv6")]
pub ipv6: DaemonIpv6NetworkConfig,
}
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub struct DaemonIpv4NetworkConfig {
#[serde(default = "default_network_ipv4_subnet")]
pub subnet: String,
}
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub struct DaemonIpv6NetworkConfig {
#[serde(default = "default_network_ipv6_subnet")]
pub subnet: String,
}
fn default_network() -> DaemonNetworkConfig {
DaemonNetworkConfig {
nameservers: default_network_nameservers(),
ipv4: default_network_ipv4(),
ipv6: default_network_ipv6(),
}
}
fn default_network_nameservers() -> Vec<String> {
vec![
"1.1.1.1".to_string(),
"1.0.0.1".to_string(),
"2606:4700:4700::1111".to_string(),
"2606:4700:4700::1001".to_string(),
]
}
fn default_network_ipv4() -> DaemonIpv4NetworkConfig {
DaemonIpv4NetworkConfig {
subnet: default_network_ipv4_subnet(),
}
}
fn default_network_ipv4_subnet() -> String {
"10.75.80.0/24".to_string()
}
fn default_network_ipv6() -> DaemonIpv6NetworkConfig {
DaemonIpv6NetworkConfig {
subnet: default_network_ipv6_subnet(),
}
}
fn default_network_ipv6_subnet() -> String {
"fdd4:1476:6c7e::/48".to_string()
}
impl DaemonConfig {
pub async fn load(path: &Path) -> Result<DaemonConfig> {
if path.exists() {

View File

@ -1,5 +1,15 @@
use crate::db::zone::ZoneStore;
use crate::{
command::DaemonCommand, console::DaemonConsoleHandle, devices::DaemonDeviceManager,
event::DaemonEventContext, idm::DaemonIdmHandle, metrics::idm_metric_to_api,
oci::convert_oci_progress, zlt::ZoneLookupTable,
};
use async_stream::try_stream;
use futures::Stream;
use krata::v1::control::{
GetZoneReply, GetZoneRequest, SetHostPowerManagementPolicyReply,
SetHostPowerManagementPolicyRequest,
};
use krata::{
idm::internal::{
exec_stream_request_update::Update, request::Request as IdmRequestType,
@ -10,11 +20,11 @@ use krata::{
common::{OciImageFormat, Zone, ZoneState, ZoneStatus},
control::{
control_service_server::ControlService, CreateZoneReply, CreateZoneRequest,
DestroyZoneReply, DestroyZoneRequest, DeviceInfo, ExecZoneReply, ExecZoneRequest,
HostCpuTopologyInfo, HostCpuTopologyReply, HostCpuTopologyRequest,
HostPowerManagementPolicy, IdentifyHostReply, IdentifyHostRequest, ListDevicesReply,
DestroyZoneReply, DestroyZoneRequest, DeviceInfo, ExecInsideZoneReply,
ExecInsideZoneRequest, GetHostCpuTopologyReply, GetHostCpuTopologyRequest,
HostCpuTopologyInfo, HostStatusReply, HostStatusRequest, ListDevicesReply,
ListDevicesRequest, ListZonesReply, ListZonesRequest, PullImageReply, PullImageRequest,
ReadZoneMetricsReply, ReadZoneMetricsRequest, ResolveZoneReply, ResolveZoneRequest,
ReadZoneMetricsReply, ReadZoneMetricsRequest, ResolveZoneIdReply, ResolveZoneIdRequest,
SnoopIdmReply, SnoopIdmRequest, WatchEventsReply, WatchEventsRequest, ZoneConsoleReply,
ZoneConsoleRequest,
},
@ -36,12 +46,6 @@ use tokio_stream::StreamExt;
use tonic::{Request, Response, Status, Streaming};
use uuid::Uuid;
use crate::{
command::DaemonCommand, console::DaemonConsoleHandle, db::ZoneStore,
devices::DaemonDeviceManager, event::DaemonEventContext, idm::DaemonIdmHandle,
metrics::idm_metric_to_api, oci::convert_oci_progress, zlt::ZoneLookupTable,
};
pub struct ApiError {
message: String,
}
@ -112,8 +116,8 @@ enum PullImageSelect {
#[tonic::async_trait]
impl ControlService for DaemonControlService {
type ExecZoneStream =
Pin<Box<dyn Stream<Item = Result<ExecZoneReply, Status>> + Send + 'static>>;
type ExecInsideZoneStream =
Pin<Box<dyn Stream<Item = Result<ExecInsideZoneReply, Status>> + Send + 'static>>;
type AttachZoneConsoleStream =
Pin<Box<dyn Stream<Item = Result<ZoneConsoleReply, Status>> + Send + 'static>>;
@ -127,12 +131,12 @@ impl ControlService for DaemonControlService {
type SnoopIdmStream =
Pin<Box<dyn Stream<Item = Result<SnoopIdmReply, Status>> + Send + 'static>>;
async fn identify_host(
async fn host_status(
&self,
request: Request<IdentifyHostRequest>,
) -> Result<Response<IdentifyHostReply>, Status> {
request: Request<HostStatusRequest>,
) -> Result<Response<HostStatusReply>, Status> {
let _ = request.into_inner();
Ok(Response::new(IdentifyHostReply {
Ok(Response::new(HostStatusReply {
host_domid: self.glt.host_domid(),
host_uuid: self.glt.host_uuid().to_string(),
krata_version: DaemonCommand::version(),
@ -156,11 +160,11 @@ impl ControlService for DaemonControlService {
uuid,
Zone {
id: uuid.to_string(),
state: Some(ZoneState {
status: ZoneStatus::Starting.into(),
network: None,
exit_info: None,
error_info: None,
status: Some(ZoneStatus {
state: ZoneState::Creating.into(),
network_status: None,
exit_status: None,
error_status: None,
host: self.glt.host_uuid().to_string(),
domid: u32::MAX,
}),
@ -180,10 +184,10 @@ impl ControlService for DaemonControlService {
}))
}
async fn exec_zone(
async fn exec_inside_zone(
&self,
request: Request<Streaming<ExecZoneRequest>>,
) -> Result<Response<Self::ExecZoneStream>, Status> {
request: Request<Streaming<ExecInsideZoneRequest>>,
) -> Result<Response<Self::ExecInsideZoneStream>, Status> {
let mut input = request.into_inner();
let Some(request) = input.next().await else {
return Err(ApiError {
@ -232,7 +236,7 @@ impl ControlService for DaemonControlService {
loop {
select! {
x = input.next() => if let Some(update) = x {
let update: Result<ExecZoneRequest, Status> = update.map_err(|error| ApiError {
let update: Result<ExecInsideZoneRequest, Status> = update.map_err(|error| ApiError {
message: error.to_string()
}.into());
@ -252,7 +256,7 @@ impl ControlService for DaemonControlService {
let Some(IdmResponseType::ExecStream(update)) = response.response else {
break;
};
let reply = ExecZoneReply {
let reply = ExecInsideZoneReply {
exited: update.exited,
error: update.error,
exit_code: update.exit_code,
@ -265,11 +269,11 @@ impl ControlService for DaemonControlService {
break;
}
}
};
}
}
};
Ok(Response::new(Box::pin(output) as Self::ExecZoneStream))
Ok(Response::new(Box::pin(output) as Self::ExecInsideZoneStream))
}
async fn destroy_zone(
@ -287,16 +291,16 @@ impl ControlService for DaemonControlService {
.into());
};
zone.state = Some(zone.state.as_mut().cloned().unwrap_or_default());
zone.status = Some(zone.status.as_mut().cloned().unwrap_or_default());
if zone.state.as_ref().unwrap().status() == ZoneStatus::Destroyed {
if zone.status.as_ref().unwrap().state() == ZoneState::Destroyed {
return Err(ApiError {
message: "zone already destroyed".to_string(),
}
.into());
}
zone.state.as_mut().unwrap().status = ZoneStatus::Destroying.into();
zone.status.as_mut().unwrap().state = ZoneState::Destroying.into();
self.zones
.update(uuid, zone)
.await
@ -320,10 +324,10 @@ impl ControlService for DaemonControlService {
Ok(Response::new(ListZonesReply { zones }))
}
async fn resolve_zone(
async fn resolve_zone_id(
&self,
request: Request<ResolveZoneRequest>,
) -> Result<Response<ResolveZoneReply>, Status> {
request: Request<ResolveZoneIdRequest>,
) -> Result<Response<ResolveZoneIdReply>, Status> {
let request = request.into_inner();
let zones = self.zones.list().await.map_err(ApiError::from)?;
let zones = zones
@ -334,8 +338,8 @@ impl ControlService for DaemonControlService {
|| x.id == request.name
})
.collect::<Vec<Zone>>();
Ok(Response::new(ResolveZoneReply {
zone: zones.first().cloned(),
Ok(Response::new(ResolveZoneIdReply {
zone_id: zones.first().cloned().map(|x| x.id).unwrap_or_default(),
}))
}
@ -558,8 +562,8 @@ impl ControlService for DaemonControlService {
async fn get_host_cpu_topology(
&self,
request: Request<HostCpuTopologyRequest>,
) -> Result<Response<HostCpuTopologyReply>, Status> {
request: Request<GetHostCpuTopologyRequest>,
) -> Result<Response<GetHostCpuTopologyReply>, Status> {
let _ = request.into_inner();
let power = self
.runtime
@ -579,13 +583,13 @@ impl ControlService for DaemonControlService {
})
}
Ok(Response::new(HostCpuTopologyReply { cpus }))
Ok(Response::new(GetHostCpuTopologyReply { cpus }))
}
async fn set_host_power_management_policy(
&self,
request: Request<HostPowerManagementPolicy>,
) -> Result<Response<HostPowerManagementPolicy>, Status> {
request: Request<SetHostPowerManagementPolicyRequest>,
) -> Result<Response<SetHostPowerManagementPolicyReply>, Status> {
let policy = request.into_inner();
let power = self
.runtime
@ -603,9 +607,20 @@ impl ControlService for DaemonControlService {
.await
.map_err(ApiError::from)?;
Ok(Response::new(HostPowerManagementPolicy {
scheduler: scheduler.to_string(),
smt_awareness: policy.smt_awareness,
Ok(Response::new(SetHostPowerManagementPolicyReply {}))
}
async fn get_zone(
&self,
request: Request<GetZoneRequest>,
) -> Result<Response<GetZoneReply>, Status> {
let request = request.into_inner();
let zones = self.zones.list().await.map_err(ApiError::from)?;
let zone = zones.get(&Uuid::from_str(&request.zone_id).map_err(|error| ApiError {
message: error.to_string(),
})?);
Ok(Response::new(GetZoneReply {
zone: zone.cloned(),
}))
}
}

118
crates/daemon/src/db/ip.rs Normal file
View File

@ -0,0 +1,118 @@
use crate::db::KrataDatabase;
use advmac::MacAddr6;
use anyhow::Result;
use log::error;
use redb::{ReadableTable, TableDefinition};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::{Ipv4Addr, Ipv6Addr};
use uuid::Uuid;
const IP_RESERVATION_TABLE: TableDefinition<u128, &[u8]> = TableDefinition::new("ip-reservation");
#[derive(Clone)]
pub struct IpReservationStore {
db: KrataDatabase,
}
impl IpReservationStore {
pub fn open(db: KrataDatabase) -> Result<Self> {
let write = db.database.begin_write()?;
let _ = write.open_table(IP_RESERVATION_TABLE);
write.commit()?;
Ok(IpReservationStore { db })
}
pub async fn read(&self, id: Uuid) -> Result<Option<IpReservation>> {
let read = self.db.database.begin_read()?;
let table = read.open_table(IP_RESERVATION_TABLE)?;
let Some(entry) = table.get(id.to_u128_le())? else {
return Ok(None);
};
let bytes = entry.value();
Ok(Some(serde_json::from_slice(bytes)?))
}
pub async fn list(&self) -> Result<HashMap<Uuid, IpReservation>> {
enum ListEntry {
Valid(Uuid, IpReservation),
Invalid(Uuid),
}
let mut reservations: HashMap<Uuid, IpReservation> = HashMap::new();
let corruptions = {
let read = self.db.database.begin_read()?;
let table = read.open_table(IP_RESERVATION_TABLE)?;
table
.iter()?
.flat_map(|result| {
result.map(|(key, value)| {
let uuid = Uuid::from_u128_le(key.value());
match serde_json::from_slice::<IpReservation>(value.value()) {
Ok(reservation) => ListEntry::Valid(uuid, reservation),
Err(error) => {
error!(
"found invalid ip reservation in database for uuid {}: {}",
uuid, error
);
ListEntry::Invalid(uuid)
}
}
})
})
.filter_map(|entry| match entry {
ListEntry::Valid(uuid, reservation) => {
reservations.insert(uuid, reservation);
None
}
ListEntry::Invalid(uuid) => Some(uuid),
})
.collect::<Vec<Uuid>>()
};
if !corruptions.is_empty() {
let write = self.db.database.begin_write()?;
let mut table = write.open_table(IP_RESERVATION_TABLE)?;
for corruption in corruptions {
table.remove(corruption.to_u128_le())?;
}
}
Ok(reservations)
}
pub async fn update(&self, id: Uuid, entry: IpReservation) -> Result<()> {
let write = self.db.database.begin_write()?;
{
let mut table = write.open_table(IP_RESERVATION_TABLE)?;
let bytes = serde_json::to_vec(&entry)?;
table.insert(id.to_u128_le(), bytes.as_slice())?;
}
write.commit()?;
Ok(())
}
pub async fn remove(&self, id: Uuid) -> Result<()> {
let write = self.db.database.begin_write()?;
{
let mut table = write.open_table(IP_RESERVATION_TABLE)?;
table.remove(id.to_u128_le())?;
}
write.commit()?;
Ok(())
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct IpReservation {
pub uuid: String,
pub ipv4: Ipv4Addr,
pub ipv6: Ipv6Addr,
pub mac: MacAddr6,
pub ipv4_prefix: u8,
pub ipv6_prefix: u8,
pub gateway_ipv4: Ipv4Addr,
pub gateway_ipv6: Ipv6Addr,
pub gateway_mac: MacAddr6,
}

View File

@ -0,0 +1,21 @@
use anyhow::Result;
use redb::Database;
use std::path::Path;
use std::sync::Arc;
pub mod ip;
pub mod zone;
#[derive(Clone)]
pub struct KrataDatabase {
pub database: Arc<Database>,
}
impl KrataDatabase {
pub fn open(path: &Path) -> Result<Self> {
let database = Database::create(path)?;
Ok(KrataDatabase {
database: Arc::new(database),
})
}
}

View File

@ -1,33 +1,31 @@
use std::{collections::HashMap, path::Path, sync::Arc};
use std::collections::HashMap;
use crate::db::KrataDatabase;
use anyhow::Result;
use krata::v1::common::Zone;
use log::error;
use prost::Message;
use redb::{Database, ReadableTable, TableDefinition};
use redb::{ReadableTable, TableDefinition};
use uuid::Uuid;
const ZONES: TableDefinition<u128, &[u8]> = TableDefinition::new("zones");
const ZONE_TABLE: TableDefinition<u128, &[u8]> = TableDefinition::new("zone");
#[derive(Clone)]
pub struct ZoneStore {
database: Arc<Database>,
db: KrataDatabase,
}
impl ZoneStore {
pub fn open(path: &Path) -> Result<Self> {
let database = Database::create(path)?;
let write = database.begin_write()?;
let _ = write.open_table(ZONES);
pub fn open(db: KrataDatabase) -> Result<Self> {
let write = db.database.begin_write()?;
let _ = write.open_table(ZONE_TABLE);
write.commit()?;
Ok(ZoneStore {
database: Arc::new(database),
})
Ok(ZoneStore { db })
}
pub async fn read(&self, id: Uuid) -> Result<Option<Zone>> {
let read = self.database.begin_read()?;
let table = read.open_table(ZONES)?;
let read = self.db.database.begin_read()?;
let table = read.open_table(ZONE_TABLE)?;
let Some(entry) = table.get(id.to_u128_le())? else {
return Ok(None);
};
@ -37,8 +35,8 @@ impl ZoneStore {
pub async fn list(&self) -> Result<HashMap<Uuid, Zone>> {
let mut zones: HashMap<Uuid, Zone> = HashMap::new();
let read = self.database.begin_read()?;
let table = read.open_table(ZONES)?;
let read = self.db.database.begin_read()?;
let table = read.open_table(ZONE_TABLE)?;
for result in table.iter()? {
let (key, value) = result?;
let uuid = Uuid::from_u128_le(key.value());
@ -58,9 +56,9 @@ impl ZoneStore {
}
pub async fn update(&self, id: Uuid, entry: Zone) -> Result<()> {
let write = self.database.begin_write()?;
let write = self.db.database.begin_write()?;
{
let mut table = write.open_table(ZONES)?;
let mut table = write.open_table(ZONE_TABLE)?;
let bytes = entry.encode_to_vec();
table.insert(id.to_u128_le(), bytes.as_slice())?;
}
@ -69,9 +67,9 @@ impl ZoneStore {
}
pub async fn remove(&self, id: Uuid) -> Result<()> {
let write = self.database.begin_write()?;
let write = self.db.database.begin_write()?;
{
let mut table = write.open_table(ZONES)?;
let mut table = write.open_table(ZONE_TABLE)?;
table.remove(id.to_u128_le())?;
}
write.commit()?;

View File

@ -4,9 +4,10 @@ use std::{
time::Duration,
};
use crate::{db::ZoneStore, idm::DaemonIdmHandle};
use crate::db::zone::ZoneStore;
use crate::idm::DaemonIdmHandle;
use anyhow::Result;
use krata::v1::common::ZoneExitInfo;
use krata::v1::common::ZoneExitStatus;
use krata::{
idm::{internal::event::Event as EventType, internal::Event},
v1::common::{ZoneState, ZoneStatus},
@ -83,15 +84,15 @@ impl DaemonEventGenerator {
return Ok(());
};
let Some(ref state) = zone.state else {
let Some(ref status) = zone.status else {
return Ok(());
};
let status = state.status();
let state = status.state();
let id = Uuid::from_str(&zone.id)?;
let domid = state.domid;
match status {
ZoneStatus::Started => {
let domid = status.domid;
match state {
ZoneState::Created => {
if let Entry::Vacant(e) = self.idms.entry(domid) {
let client = self.idm.client_by_domid(domid).await?;
let mut receiver = client.subscribe().await?;
@ -111,7 +112,7 @@ impl DaemonEventGenerator {
}
}
ZoneStatus::Destroyed => {
ZoneState::Destroyed => {
if let Some((_, handle)) = self.idms.remove(&domid) {
handle.abort();
}
@ -131,13 +132,13 @@ impl DaemonEventGenerator {
async fn handle_exit_code(&mut self, id: Uuid, code: i32) -> Result<()> {
if let Some(mut zone) = self.zones.read(id).await? {
zone.state = Some(ZoneState {
status: ZoneStatus::Exited.into(),
network: zone.state.clone().unwrap_or_default().network,
exit_info: Some(ZoneExitInfo { code }),
error_info: None,
host: zone.state.clone().map(|x| x.host).unwrap_or_default(),
domid: zone.state.clone().map(|x| x.domid).unwrap_or(u32::MAX),
zone.status = Some(ZoneStatus {
state: ZoneState::Exited.into(),
network_status: zone.status.clone().unwrap_or_default().network_status,
exit_status: Some(ZoneExitStatus { code }),
error_status: None,
host: zone.status.clone().map(|x| x.host).unwrap_or_default(),
domid: zone.status.clone().map(|x| x.domid).unwrap_or(u32::MAX),
});
self.zones.update(id, zone).await?;

View File

@ -85,13 +85,13 @@ pub struct DaemonIdm {
impl DaemonIdm {
pub async fn new(glt: ZoneLookupTable) -> Result<DaemonIdm> {
debug!("allocating channel for IDM");
debug!("allocating channel service for idm");
let (service, tx_raw_sender, rx_receiver) =
ChannelService::new("krata-channel".to_string(), None).await?;
let (tx_sender, tx_receiver) = channel(100);
let (snoop_sender, _) = broadcast::channel(100);
debug!("starting channel service");
debug!("starting idm channel service");
let task = service.launch().await?;
let clients = Arc::new(Mutex::new(HashMap::new()));
@ -133,52 +133,99 @@ impl DaemonIdm {
})
}
async fn process_rx_packet(
&mut self,
domid: u32,
data: Option<Vec<u8>>,
buffers: &mut HashMap<u32, BytesMut>,
) -> Result<()> {
// check if data is present, if it is not, that signals a closed channel.
if let Some(data) = data {
let buffer = buffers.entry(domid).or_insert_with_key(|_| BytesMut::new());
buffer.extend_from_slice(&data);
loop {
// check if the buffer is less than the header size, if so, wait for more data
if buffer.len() < 6 {
break;
}
// check for the magic bytes 0xff, 0xff at the start of the message, if that doesn't
// exist, clear the buffer. this ensures that partial messages won't be processed.
if buffer[0] != 0xff || buffer[1] != 0xff {
buffer.clear();
return Ok(());
}
// read the size from the buffer as a little endian u32
let size = (buffer[2] as u32
| (buffer[3] as u32) << 8
| (buffer[4] as u32) << 16
| (buffer[5] as u32) << 24) as usize;
let needed = size + 6;
if buffer.len() < needed {
return Ok(());
}
let mut packet = buffer.split_to(needed);
// advance the buffer by the header, leaving only the raw data.
packet.advance(6);
match IdmTransportPacket::decode(packet) {
Ok(packet) => {
let _ =
client_or_create(domid, &self.tx_sender, &self.clients, &self.feeds)
.await?;
let guard = self.feeds.lock().await;
if let Some(feed) = guard.get(&domid) {
let _ = feed.try_send(packet.clone());
}
let _ = self.snoop_sender.send(DaemonIdmSnoopPacket {
from: domid,
to: 0,
packet,
});
}
Err(packet) => {
warn!("received invalid packet from domain {}: {}", domid, packet);
}
}
}
} else {
let mut clients = self.clients.lock().await;
let mut feeds = self.feeds.lock().await;
clients.remove(&domid);
feeds.remove(&domid);
}
Ok(())
}
async fn tx_packet(&mut self, domid: u32, packet: IdmTransportPacket) -> Result<()> {
let data = packet.encode_to_vec();
let mut buffer = vec![0u8; 6];
let length = data.len() as u32;
// magic bytes
buffer[0] = 0xff;
buffer[1] = 0xff;
// little endian u32 for message size
buffer[2] = length as u8;
buffer[3] = (length << 8) as u8;
buffer[4] = (length << 16) as u8;
buffer[5] = (length << 24) as u8;
buffer.extend_from_slice(&data);
self.tx_raw_sender.send((domid, buffer)).await?;
let _ = self.snoop_sender.send(DaemonIdmSnoopPacket {
from: 0,
to: domid,
packet,
});
Ok(())
}
async fn process(&mut self, buffers: &mut HashMap<u32, BytesMut>) -> Result<()> {
loop {
select! {
x = self.rx_receiver.recv() => match x {
Some((domid, data)) => {
if let Some(data) = data {
let buffer = buffers.entry(domid).or_insert_with_key(|_| BytesMut::new());
buffer.extend_from_slice(&data);
loop {
if buffer.len() < 6 {
break;
}
if buffer[0] != 0xff || buffer[1] != 0xff {
buffer.clear();
break;
}
let size = (buffer[2] as u32 | (buffer[3] as u32) << 8 | (buffer[4] as u32) << 16 | (buffer[5] as u32) << 24) as usize;
let needed = size + 6;
if buffer.len() < needed {
break;
}
let mut packet = buffer.split_to(needed);
packet.advance(6);
match IdmTransportPacket::decode(packet) {
Ok(packet) => {
let _ = client_or_create(domid, &self.tx_sender, &self.clients, &self.feeds).await?;
let guard = self.feeds.lock().await;
if let Some(feed) = guard.get(&domid) {
let _ = feed.try_send(packet.clone());
}
let _ = self.snoop_sender.send(DaemonIdmSnoopPacket { from: domid, to: 0, packet });
}
Err(packet) => {
warn!("received invalid packet from domain {}: {}", domid, packet);
}
}
}
} else {
let mut clients = self.clients.lock().await;
let mut feeds = self.feeds.lock().await;
clients.remove(&domid);
feeds.remove(&domid);
}
self.process_rx_packet(domid, data, buffers).await?;
},
None => {
@ -187,25 +234,14 @@ impl DaemonIdm {
},
x = self.tx_receiver.recv() => match x {
Some((domid, packet)) => {
let data = packet.encode_to_vec();
let mut buffer = vec![0u8; 6];
let length = data.len() as u32;
buffer[0] = 0xff;
buffer[1] = 0xff;
buffer[2] = length as u8;
buffer[3] = (length << 8) as u8;
buffer[4] = (length << 16) as u8;
buffer[5] = (length << 24) as u8;
buffer.extend_from_slice(&data);
self.tx_raw_sender.send((domid, buffer)).await?;
let _ = self.snoop_sender.send(DaemonIdmSnoopPacket { from: 0, to: domid, packet });
self.tx_packet(domid, packet).await?;
},
None => {
break;
}
}
};
}
}
Ok(())
}

View File

@ -0,0 +1,179 @@
use advmac::MacAddr6;
use anyhow::{anyhow, Result};
use ipnetwork::{Ipv4Network, Ipv6Network};
use std::{
collections::HashMap,
net::{Ipv4Addr, Ipv6Addr},
sync::Arc,
};
use tokio::sync::RwLock;
use uuid::Uuid;
use crate::db::ip::{IpReservation, IpReservationStore};
#[derive(Default, Clone)]
pub struct IpAssignmentState {
pub ipv4: HashMap<Ipv4Addr, IpReservation>,
pub ipv6: HashMap<Ipv6Addr, IpReservation>,
}
#[derive(Clone)]
pub struct IpAssignment {
ipv4_network: Ipv4Network,
ipv6_network: Ipv6Network,
gateway_ipv4: Ipv4Addr,
gateway_ipv6: Ipv6Addr,
gateway_mac: MacAddr6,
store: IpReservationStore,
state: Arc<RwLock<IpAssignmentState>>,
}
impl IpAssignment {
pub async fn new(
host_uuid: Uuid,
ipv4_network: Ipv4Network,
ipv6_network: Ipv6Network,
store: IpReservationStore,
) -> Result<Self> {
let mut state = IpAssignment::fetch_current_state(&store).await?;
let reservation = if let Some(reservation) = store.read(host_uuid).await? {
reservation
} else {
IpAssignment::allocate(
&mut state,
&store,
host_uuid,
ipv4_network,
ipv6_network,
None,
None,
None,
)
.await?
};
let assignment = IpAssignment {
ipv4_network,
ipv6_network,
gateway_ipv4: reservation.ipv4,
gateway_ipv6: reservation.ipv6,
gateway_mac: reservation.gateway_mac,
store,
state: Arc::new(RwLock::new(state)),
};
Ok(assignment)
}
async fn fetch_current_state(store: &IpReservationStore) -> Result<IpAssignmentState> {
let reservations = store.list().await?;
let mut state = IpAssignmentState::default();
for reservation in reservations.values() {
state.ipv4.insert(reservation.ipv4, reservation.clone());
state.ipv6.insert(reservation.ipv6, reservation.clone());
}
Ok(state)
}
#[allow(clippy::too_many_arguments)]
async fn allocate(
state: &mut IpAssignmentState,
store: &IpReservationStore,
uuid: Uuid,
ipv4_network: Ipv4Network,
ipv6_network: Ipv6Network,
gateway_ipv4: Option<Ipv4Addr>,
gateway_ipv6: Option<Ipv6Addr>,
gateway_mac: Option<MacAddr6>,
) -> Result<IpReservation> {
let found_ipv4: Option<Ipv4Addr> = ipv4_network
.iter()
.filter(|ip| {
ip.is_private() && !(ip.is_loopback() || ip.is_multicast() || ip.is_broadcast())
})
.filter(|ip| {
let last = ip.octets()[3];
// filter for IPs ending in .1 to .250 because .250+ can have special meaning
last > 0 && last < 250
})
.find(|ip| !state.ipv4.contains_key(ip));
let found_ipv6: Option<Ipv6Addr> = ipv6_network
.iter()
.filter(|ip| !ip.is_loopback() && !ip.is_multicast())
.find(|ip| !state.ipv6.contains_key(ip));
let Some(ipv4) = found_ipv4 else {
return Err(anyhow!(
"unable to allocate ipv4 address, assigned network is exhausted"
));
};
let Some(ipv6) = found_ipv6 else {
return Err(anyhow!(
"unable to allocate ipv6 address, assigned network is exhausted"
));
};
let mut mac = MacAddr6::random();
mac.set_local(false);
mac.set_multicast(false);
let reservation = IpReservation {
uuid: uuid.to_string(),
ipv4,
ipv6,
mac,
ipv4_prefix: ipv4_network.prefix(),
ipv6_prefix: ipv6_network.prefix(),
gateway_ipv4: gateway_ipv4.unwrap_or(ipv4),
gateway_ipv6: gateway_ipv6.unwrap_or(ipv6),
gateway_mac: gateway_mac.unwrap_or(mac),
};
state.ipv4.insert(ipv4, reservation.clone());
state.ipv6.insert(ipv6, reservation.clone());
store.update(uuid, reservation.clone()).await?;
Ok(reservation)
}
pub async fn assign(&self, uuid: Uuid) -> Result<IpReservation> {
let mut state = self.state.write().await;
let reservation = IpAssignment::allocate(
&mut state,
&self.store,
uuid,
self.ipv4_network,
self.ipv6_network,
Some(self.gateway_ipv4),
Some(self.gateway_ipv6),
Some(self.gateway_mac),
)
.await?;
Ok(reservation)
}
pub async fn recall(&self, uuid: Uuid) -> Result<()> {
let mut state = self.state.write().await;
self.store.remove(uuid).await?;
state
.ipv4
.retain(|_, reservation| reservation.uuid != uuid.to_string());
state
.ipv6
.retain(|_, reservation| reservation.uuid != uuid.to_string());
Ok(())
}
pub async fn retrieve(&self, uuid: Uuid) -> Result<Option<IpReservation>> {
self.store.read(uuid).await
}
pub async fn reload(&self) -> Result<()> {
let mut state = self.state.write().await;
let intermediate = IpAssignment::fetch_current_state(&self.store).await?;
*state = intermediate;
Ok(())
}
pub async fn read(&self) -> Result<IpAssignmentState> {
Ok(self.state.read().await.clone())
}
}

View File

@ -0,0 +1 @@
pub mod assignment;

View File

@ -1,18 +1,22 @@
use std::{net::SocketAddr, path::PathBuf, str::FromStr, sync::Arc};
use crate::db::ip::IpReservationStore;
use crate::db::zone::ZoneStore;
use crate::db::KrataDatabase;
use crate::ip::assignment::IpAssignment;
use anyhow::{anyhow, Result};
use config::DaemonConfig;
use console::{DaemonConsole, DaemonConsoleHandle};
use control::DaemonControlService;
use db::ZoneStore;
use devices::DaemonDeviceManager;
use event::{DaemonEventContext, DaemonEventGenerator};
use idm::{DaemonIdm, DaemonIdmHandle};
use ipnetwork::{Ipv4Network, Ipv6Network};
use krata::{dial::ControlDialAddress, v1::control::control_service_server::ControlServiceServer};
use krataoci::{packer::service::OciPackerService, registry::OciPlatform};
use kratart::Runtime;
use log::{debug, info};
use reconcile::zone::ZoneReconciler;
use std::path::Path;
use std::{net::SocketAddr, path::PathBuf, str::FromStr, sync::Arc};
use tokio::{
fs,
net::UnixListener,
@ -32,6 +36,7 @@ pub mod db;
pub mod devices;
pub mod event;
pub mod idm;
pub mod ip;
pub mod metrics;
pub mod oci;
pub mod reconcile;
@ -101,31 +106,33 @@ impl Daemon {
debug!("initializing caches and hydrating zone state");
let seed = config.oci.seed.clone().map(PathBuf::from);
let packer = OciPackerService::new(seed, &image_cache_dir, OciPlatform::current()).await?;
let glt = ZoneLookupTable::new(0, host_uuid);
let zones_db_path = format!("{}/zones.db", store);
let zones = ZoneStore::open(&PathBuf::from(zones_db_path))?;
debug!("initializing core runtime");
let runtime = Runtime::new().await?;
let zlt = ZoneLookupTable::new(0, host_uuid);
let db_path = format!("{}/krata.db", store);
let database = KrataDatabase::open(Path::new(&db_path))?;
let zones = ZoneStore::open(database.clone())?;
let (zone_reconciler_notify, zone_reconciler_receiver) =
channel::<Uuid>(ZONE_RECONCILER_QUEUE_LEN);
debug!("initializing core runtime");
let runtime = Runtime::new(host_uuid).await?;
debug!("starting IDM service");
let idm = DaemonIdm::new(glt.clone()).await?;
let idm = DaemonIdm::new(zlt.clone()).await?;
let idm = idm.launch().await?;
debug!("initializing console interfaces");
let console = DaemonConsole::new(glt.clone()).await?;
let console = DaemonConsole::new(zlt.clone()).await?;
let console = console.launch().await?;
debug!("initializing zone reconciler");
let (events, generator) =
DaemonEventGenerator::new(zones.clone(), zone_reconciler_notify.clone(), idm.clone())
.await?;
let runtime_for_reconciler = runtime.dupe().await?;
let ipv4_network = Ipv4Network::from_str(&config.network.ipv4.subnet)?;
let ipv6_network = Ipv6Network::from_str(&config.network.ipv6.subnet)?;
let ip_reservation_store = IpReservationStore::open(database)?;
let ip_assignment =
IpAssignment::new(host_uuid, ipv4_network, ipv6_network, ip_reservation_store).await?;
debug!("initializing zone reconciler");
let zone_reconciler = ZoneReconciler::new(
devices.clone(),
glt.clone(),
zlt.clone(),
zones.clone(),
events.clone(),
runtime_for_reconciler,
@ -134,6 +141,8 @@ impl Daemon {
kernel_path,
initrd_path,
addons_path,
ip_assignment,
config.clone(),
)?;
let zone_reconciler_task = zone_reconciler.launch(zone_reconciler_receiver).await?;
@ -152,7 +161,7 @@ impl Daemon {
Ok(Self {
store,
_config: config,
glt,
glt: zlt,
devices,
zones,
events,
@ -167,7 +176,7 @@ impl Daemon {
}
pub async fn listen(&mut self, addr: ControlDialAddress) -> Result<()> {
debug!("starting API service");
debug!("starting control service");
let control_service = DaemonControlService::new(
self.glt.clone(),
self.devices.clone(),

View File

@ -1,41 +1,41 @@
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use anyhow::{anyhow, Result};
use futures::StreamExt;
use krata::launchcfg::LaunchPackedFormat;
use krata::v1::common::ZoneOciImageSpec;
use krata::v1::common::{OciImageFormat, Zone, ZoneState, ZoneStatus};
use krataoci::packer::{service::OciPackerService, OciPackedFormat};
use kratart::launch::{PciBdf, PciDevice, PciRdmReservePolicy};
use kratart::launch::{PciBdf, PciDevice, PciRdmReservePolicy, ZoneLaunchNetwork};
use kratart::{launch::ZoneLaunchRequest, Runtime};
use log::info;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use crate::config::DaemonPciDeviceRdmReservePolicy;
use crate::config::{DaemonConfig, DaemonPciDeviceRdmReservePolicy};
use crate::devices::DaemonDeviceManager;
use crate::{
reconcile::zone::{zoneinfo_to_networkstate, ZoneReconcilerResult},
zlt::ZoneLookupTable,
};
use crate::ip::assignment::IpAssignment;
use crate::reconcile::zone::ip_reservation_to_network_status;
use crate::{reconcile::zone::ZoneReconcilerResult, zlt::ZoneLookupTable};
use krata::v1::common::zone_image_spec::Image;
use tokio::fs::{self, File};
use tokio::io::AsyncReadExt;
use tokio_tar::Archive;
use uuid::Uuid;
pub struct ZoneStarter<'a> {
pub struct ZoneCreator<'a> {
pub devices: &'a DaemonDeviceManager,
pub kernel_path: &'a Path,
pub initrd_path: &'a Path,
pub addons_path: &'a Path,
pub packer: &'a OciPackerService,
pub glt: &'a ZoneLookupTable,
pub ip_assignment: &'a IpAssignment,
pub zlt: &'a ZoneLookupTable,
pub runtime: &'a Runtime,
pub config: &'a DaemonConfig,
}
impl ZoneStarter<'_> {
impl ZoneCreator<'_> {
pub async fn oci_spec_tar_read_file(
&self,
file: &Path,
@ -75,7 +75,7 @@ impl ZoneStarter<'_> {
))
}
pub async fn start(&self, uuid: Uuid, zone: &mut Zone) -> Result<ZoneReconcilerResult> {
pub async fn create(&self, uuid: Uuid, zone: &mut Zone) -> Result<ZoneReconcilerResult> {
let Some(ref spec) = zone.spec else {
return Err(anyhow!("zone spec not specified"));
};
@ -174,6 +174,8 @@ impl ZoneStarter<'_> {
}
}
let reservation = self.ip_assignment.assign(uuid).await?;
let info = self
.runtime
.launch(ZoneLaunchRequest {
@ -187,7 +189,7 @@ impl ZoneStarter<'_> {
image,
kernel,
initrd,
vcpus: spec.vcpus,
vcpus: spec.cpus,
mem: spec.mem,
pcis,
env: task
@ -198,16 +200,26 @@ impl ZoneStarter<'_> {
run: empty_vec_optional(task.command.clone()),
debug: false,
addons_image: Some(self.addons_path.to_path_buf()),
network: ZoneLaunchNetwork {
ipv4: reservation.ipv4.to_string(),
ipv4_prefix: reservation.ipv4_prefix,
ipv6: reservation.ipv6.to_string(),
ipv6_prefix: reservation.ipv6_prefix,
gateway_ipv4: reservation.gateway_ipv4.to_string(),
gateway_ipv6: reservation.gateway_ipv6.to_string(),
zone_mac: reservation.mac,
nameservers: self.config.network.nameservers.clone(),
},
})
.await?;
self.glt.associate(uuid, info.domid).await;
info!("started zone {}", uuid);
zone.state = Some(ZoneState {
status: ZoneStatus::Started.into(),
network: Some(zoneinfo_to_networkstate(&info)),
exit_info: None,
error_info: None,
host: self.glt.host_uuid().to_string(),
self.zlt.associate(uuid, info.domid).await;
info!("created zone {}", uuid);
zone.status = Some(ZoneStatus {
state: ZoneState::Created.into(),
network_status: Some(ip_reservation_to_network_status(&reservation)),
exit_status: None,
error_status: None,
host: self.zlt.host_uuid().to_string(),
domid: info.domid,
});
success.store(true, Ordering::Release);

View File

@ -5,13 +5,23 @@ use std::{
time::Duration,
};
use self::create::ZoneCreator;
use crate::config::DaemonConfig;
use crate::db::ip::IpReservation;
use crate::ip::assignment::IpAssignment;
use crate::{
db::zone::ZoneStore,
devices::DaemonDeviceManager,
event::{DaemonEvent, DaemonEventContext},
zlt::ZoneLookupTable,
};
use anyhow::Result;
use krata::v1::{
common::{Zone, ZoneErrorInfo, ZoneExitInfo, ZoneNetworkState, ZoneState, ZoneStatus},
common::{Zone, ZoneErrorStatus, ZoneExitStatus, ZoneNetworkStatus, ZoneState, ZoneStatus},
control::ZoneChangedEvent,
};
use krataoci::packer::service::OciPackerService;
use kratart::{Runtime, ZoneInfo};
use kratart::Runtime;
use log::{error, info, trace, warn};
use tokio::{
select,
@ -24,16 +34,7 @@ use tokio::{
};
use uuid::Uuid;
use crate::{
db::ZoneStore,
devices::DaemonDeviceManager,
event::{DaemonEvent, DaemonEventContext},
zlt::ZoneLookupTable,
};
use self::start::ZoneStarter;
mod start;
mod create;
const PARALLEL_LIMIT: u32 = 5;
@ -68,6 +69,8 @@ pub struct ZoneReconciler {
tasks: Arc<Mutex<HashMap<Uuid, ZoneReconcilerEntry>>>,
zone_reconciler_notify: Sender<Uuid>,
zone_reconcile_lock: Arc<RwLock<()>>,
ip_assignment: IpAssignment,
config: Arc<DaemonConfig>,
}
impl ZoneReconciler {
@ -83,6 +86,8 @@ impl ZoneReconciler {
kernel_path: PathBuf,
initrd_path: PathBuf,
modules_path: PathBuf,
ip_assignment: IpAssignment,
config: Arc<DaemonConfig>,
) -> Result<Self> {
Ok(Self {
devices,
@ -97,6 +102,8 @@ impl ZoneReconciler {
tasks: Arc::new(Mutex::new(HashMap::new())),
zone_reconciler_notify,
zone_reconcile_lock: Arc::new(RwLock::with_max_readers((), PARALLEL_LIMIT)),
ip_assignment,
config,
})
}
@ -132,7 +139,7 @@ impl ZoneReconciler {
error!("runtime reconciler failed: {}", error);
}
}
};
}
}
}))
}
@ -166,21 +173,21 @@ impl ZoneReconciler {
let runtime_zone = runtime_zones.iter().find(|x| x.uuid == uuid);
match runtime_zone {
None => {
let mut state = stored_zone.state.as_mut().cloned().unwrap_or_default();
if state.status() == ZoneStatus::Started {
state.status = ZoneStatus::Starting.into();
let mut status = stored_zone.status.as_mut().cloned().unwrap_or_default();
if status.state() == ZoneState::Created {
status.state = ZoneState::Creating.into();
}
stored_zone.state = Some(state);
stored_zone.status = Some(status);
}
Some(runtime) => {
self.zlt.associate(uuid, runtime.domid).await;
let mut state = stored_zone.state.as_mut().cloned().unwrap_or_default();
let mut status = stored_zone.status.as_mut().cloned().unwrap_or_default();
if let Some(code) = runtime.state.exit_code {
state.status = ZoneStatus::Exited.into();
state.exit_info = Some(ZoneExitInfo { code });
status.state = ZoneState::Exited.into();
status.exit_status = Some(ZoneExitStatus { code });
} else {
state.status = ZoneStatus::Started.into();
status.state = ZoneState::Created.into();
}
for device in &stored_zone
@ -193,8 +200,11 @@ impl ZoneReconciler {
device_claims.insert(device.name.clone(), uuid);
}
state.network = Some(zoneinfo_to_networkstate(runtime));
stored_zone.state = Some(state);
if let Some(reservation) = self.ip_assignment.retrieve(uuid).await? {
status.network_status =
Some(ip_reservation_to_network_status(&reservation));
}
stored_zone.status = Some(status);
}
}
@ -228,20 +238,20 @@ impl ZoneReconciler {
zone: Some(zone.clone()),
}))?;
let start_status = zone.state.as_ref().map(|x| x.status()).unwrap_or_default();
let result = match start_status {
ZoneStatus::Starting => self.start(uuid, &mut zone).await,
ZoneStatus::Exited => self.exited(&mut zone).await,
ZoneStatus::Destroying => self.destroy(uuid, &mut zone).await,
let start_state = zone.status.as_ref().map(|x| x.state()).unwrap_or_default();
let result = match start_state {
ZoneState::Creating => self.create(uuid, &mut zone).await,
ZoneState::Exited => self.exited(&mut zone).await,
ZoneState::Destroying => self.destroy(uuid, &mut zone).await,
_ => Ok(ZoneReconcilerResult::Unchanged),
};
let result = match result {
Ok(result) => result,
Err(error) => {
zone.state = Some(zone.state.as_mut().cloned().unwrap_or_default());
zone.state.as_mut().unwrap().status = ZoneStatus::Failed.into();
zone.state.as_mut().unwrap().error_info = Some(ZoneErrorInfo {
zone.status = Some(zone.status.as_mut().cloned().unwrap_or_default());
zone.status.as_mut().unwrap().state = ZoneState::Failed.into();
zone.status.as_mut().unwrap().error_status = Some(ZoneErrorStatus {
message: error.to_string(),
});
warn!("failed to start zone {}: {}", zone.id, error);
@ -251,8 +261,8 @@ impl ZoneReconciler {
info!("reconciled zone {}", uuid);
let status = zone.state.as_ref().map(|x| x.status()).unwrap_or_default();
let destroyed = status == ZoneStatus::Destroyed;
let state = zone.status.as_ref().map(|x| x.state()).unwrap_or_default();
let destroyed = state == ZoneState::Destroyed;
let rerun = if let ZoneReconcilerResult::Changed { rerun } = result {
let event = DaemonEvent::ZoneChanged(ZoneChangedEvent {
@ -276,22 +286,24 @@ impl ZoneReconciler {
Ok(rerun)
}
async fn start(&self, uuid: Uuid, zone: &mut Zone) -> Result<ZoneReconcilerResult> {
let starter = ZoneStarter {
async fn create(&self, uuid: Uuid, zone: &mut Zone) -> Result<ZoneReconcilerResult> {
let starter = ZoneCreator {
devices: &self.devices,
kernel_path: &self.kernel_path,
initrd_path: &self.initrd_path,
addons_path: &self.addons_path,
packer: &self.packer,
glt: &self.zlt,
ip_assignment: &self.ip_assignment,
zlt: &self.zlt,
runtime: &self.runtime,
config: &self.config,
};
starter.start(uuid, zone).await
starter.create(uuid, zone).await
}
async fn exited(&self, zone: &mut Zone) -> Result<ZoneReconcilerResult> {
if let Some(ref mut state) = zone.state {
state.set_status(ZoneStatus::Destroying);
if let Some(ref mut status) = zone.status {
status.set_state(ZoneState::Destroying);
Ok(ZoneReconcilerResult::Changed { rerun: true })
} else {
Ok(ZoneReconcilerResult::Unchanged)
@ -303,18 +315,19 @@ impl ZoneReconciler {
trace!("failed to destroy runtime zone {}: {}", uuid, error);
}
let domid = zone.state.as_ref().map(|x| x.domid);
let domid = zone.status.as_ref().map(|x| x.domid);
if let Some(domid) = domid {
self.zlt.remove(uuid, domid).await;
}
info!("destroyed zone {}", uuid);
zone.state = Some(ZoneState {
status: ZoneStatus::Destroyed.into(),
network: None,
exit_info: None,
error_info: None,
self.ip_assignment.recall(uuid).await?;
zone.status = Some(ZoneStatus {
state: ZoneState::Destroyed.into(),
network_status: None,
exit_status: None,
error_status: None,
host: self.zlt.host_uuid().to_string(),
domid: domid.unwrap_or(u32::MAX),
});
@ -362,13 +375,13 @@ impl ZoneReconciler {
}
}
pub fn zoneinfo_to_networkstate(info: &ZoneInfo) -> ZoneNetworkState {
ZoneNetworkState {
zone_ipv4: info.zone_ipv4.map(|x| x.to_string()).unwrap_or_default(),
zone_ipv6: info.zone_ipv6.map(|x| x.to_string()).unwrap_or_default(),
zone_mac: info.zone_mac.as_ref().cloned().unwrap_or_default(),
gateway_ipv4: info.gateway_ipv4.map(|x| x.to_string()).unwrap_or_default(),
gateway_ipv6: info.gateway_ipv6.map(|x| x.to_string()).unwrap_or_default(),
gateway_mac: info.gateway_mac.as_ref().cloned().unwrap_or_default(),
pub fn ip_reservation_to_network_status(ip: &IpReservation) -> ZoneNetworkStatus {
ZoneNetworkStatus {
zone_ipv4: format!("{}/{}", ip.ipv4, ip.ipv4_prefix),
zone_ipv6: format!("{}/{}", ip.ipv6, ip.ipv6_prefix),
zone_mac: ip.mac.to_string().replace('-', ":"),
gateway_ipv4: format!("{}/{}", ip.gateway_ipv4, ip.ipv4_prefix),
gateway_ipv6: format!("{}/{}", ip.gateway_ipv6, ip.ipv6_prefix),
gateway_mac: ip.gateway_mac.to_string().replace('-', ":"),
}
}

View File

@ -11,7 +11,7 @@ import "google/protobuf/struct.proto";
message Zone {
string id = 1;
ZoneSpec spec = 2;
ZoneState state = 3;
ZoneStatus status = 3;
}
message ZoneSpec {
@ -21,7 +21,7 @@ message ZoneSpec {
ZoneImageSpec kernel = 3;
// If not specified, defaults to the daemon default initrd.
ZoneImageSpec initrd = 4;
uint32 vcpus = 5;
uint32 cpus = 5;
uint64 mem = 6;
ZoneTaskSpec task = 7;
repeated ZoneSpecAnnotation annotations = 8;
@ -67,26 +67,26 @@ message ZoneSpecDevice {
string name = 1;
}
message ZoneState {
ZoneStatus status = 1;
ZoneNetworkState network = 2;
ZoneExitInfo exit_info = 3;
ZoneErrorInfo error_info = 4;
message ZoneStatus {
ZoneState state = 1;
ZoneNetworkStatus network_status = 2;
ZoneExitStatus exit_status = 3;
ZoneErrorStatus error_status = 4;
string host = 5;
uint32 domid = 6;
}
enum ZoneStatus {
ZONE_STATUS_UNKNOWN = 0;
ZONE_STATUS_STARTING = 1;
ZONE_STATUS_STARTED = 2;
ZONE_STATUS_EXITED = 3;
ZONE_STATUS_DESTROYING = 4;
ZONE_STATUS_DESTROYED = 5;
ZONE_STATUS_FAILED = 6;
enum ZoneState {
ZONE_STATE_UNKNOWN = 0;
ZONE_STATE_CREATING = 1;
ZONE_STATE_CREATED = 2;
ZONE_STATE_EXITED = 3;
ZONE_STATE_DESTROYING = 4;
ZONE_STATE_DESTROYED = 5;
ZONE_STATE_FAILED = 6;
}
message ZoneNetworkState {
message ZoneNetworkStatus {
string zone_ipv4 = 1;
string zone_ipv6 = 2;
string zone_mac = 3;
@ -95,11 +95,11 @@ message ZoneNetworkState {
string gateway_mac = 6;
}
message ZoneExitInfo {
message ZoneExitStatus {
int32 code = 1;
}
message ZoneErrorInfo {
message ZoneErrorStatus {
string message = 1;
}

View File

@ -10,31 +10,34 @@ import "krata/idm/transport.proto";
import "krata/v1/common.proto";
service ControlService {
rpc IdentifyHost(IdentifyHostRequest) returns (IdentifyHostReply);
rpc CreateZone(CreateZoneRequest) returns (CreateZoneReply);
rpc DestroyZone(DestroyZoneRequest) returns (DestroyZoneReply);
rpc ResolveZone(ResolveZoneRequest) returns (ResolveZoneReply);
rpc ListZones(ListZonesRequest) returns (ListZonesReply);
rpc ListDevices(ListDevicesRequest) returns (ListDevicesReply);
rpc ExecZone(stream ExecZoneRequest) returns (stream ExecZoneReply);
rpc AttachZoneConsole(stream ZoneConsoleRequest) returns (stream ZoneConsoleReply);
rpc ReadZoneMetrics(ReadZoneMetricsRequest) returns (ReadZoneMetricsReply);
rpc HostStatus(HostStatusRequest) returns (HostStatusReply);
rpc SnoopIdm(SnoopIdmRequest) returns (stream SnoopIdmReply);
rpc WatchEvents(WatchEventsRequest) returns (stream WatchEventsReply);
rpc GetHostCpuTopology(GetHostCpuTopologyRequest) returns (GetHostCpuTopologyReply);
rpc SetHostPowerManagementPolicy(SetHostPowerManagementPolicyRequest) returns (SetHostPowerManagementPolicyReply);
rpc ListDevices(ListDevicesRequest) returns (ListDevicesReply);
rpc PullImage(PullImageRequest) returns (stream PullImageReply);
rpc GetHostCpuTopology(HostCpuTopologyRequest) returns (HostCpuTopologyReply);
rpc SetHostPowerManagementPolicy(HostPowerManagementPolicy) returns (HostPowerManagementPolicy);
rpc CreateZone(CreateZoneRequest) returns (CreateZoneReply);
rpc DestroyZone(DestroyZoneRequest) returns (DestroyZoneReply);
rpc ResolveZoneId(ResolveZoneIdRequest) returns (ResolveZoneIdReply);
rpc GetZone(GetZoneRequest) returns (GetZoneReply);
rpc ListZones(ListZonesRequest) returns (ListZonesReply);
rpc AttachZoneConsole(stream ZoneConsoleRequest) returns (stream ZoneConsoleReply);
rpc ExecInsideZone(stream ExecInsideZoneRequest) returns (stream ExecInsideZoneReply);
rpc ReadZoneMetrics(ReadZoneMetricsRequest) returns (ReadZoneMetricsReply);
rpc WatchEvents(WatchEventsRequest) returns (stream WatchEventsReply);
}
message IdentifyHostRequest {}
message HostStatusRequest {}
message IdentifyHostReply {
message HostStatusReply {
string host_uuid = 1;
uint32 host_domid = 2;
string krata_version = 3;
@ -45,36 +48,44 @@ message CreateZoneRequest {
}
message CreateZoneReply {
string Zone_id = 1;
string zone_id = 1;
}
message DestroyZoneRequest {
string Zone_id = 1;
string zone_id = 1;
}
message DestroyZoneReply {}
message ResolveZoneRequest {
message ResolveZoneIdRequest {
string name = 1;
}
message ResolveZoneReply {
krata.v1.common.Zone Zone = 1;
message ResolveZoneIdReply {
string zone_id = 1;
}
message GetZoneRequest {
string zone_id = 1;
}
message GetZoneReply {
krata.v1.common.Zone zone = 1;
}
message ListZonesRequest {}
message ListZonesReply {
repeated krata.v1.common.Zone Zones = 1;
repeated krata.v1.common.Zone zones = 1;
}
message ExecZoneRequest {
string Zone_id = 1;
message ExecInsideZoneRequest {
string zone_id = 1;
krata.v1.common.ZoneTaskSpec task = 2;
bytes data = 3;
}
message ExecZoneReply {
message ExecInsideZoneReply {
bool exited = 1;
string error = 2;
int32 exit_code = 3;
@ -83,7 +94,7 @@ message ExecZoneReply {
}
message ZoneConsoleRequest {
string Zone_id = 1;
string zone_id = 1;
bytes data = 2;
}
@ -95,16 +106,16 @@ message WatchEventsRequest {}
message WatchEventsReply {
oneof event {
ZoneChangedEvent Zone_changed = 1;
ZoneChangedEvent zone_changed = 1;
}
}
message ZoneChangedEvent {
krata.v1.common.Zone Zone = 1;
krata.v1.common.Zone zone = 1;
}
message ReadZoneMetricsRequest {
string Zone_id = 1;
string zone_id = 1;
}
message ReadZoneMetricsReply {
@ -219,15 +230,15 @@ message HostCpuTopologyInfo {
HostCpuTopologyClass class = 5;
}
message HostCpuTopologyRequest {}
message GetHostCpuTopologyRequest {}
message HostCpuTopologyReply {
message GetHostCpuTopologyReply {
repeated HostCpuTopologyInfo cpus = 1;
}
message HostPowerManagementPolicyRequest {}
message HostPowerManagementPolicy {
message SetHostPowerManagementPolicyRequest {
string scheduler = 1;
bool smt_awareness = 2;
}
message SetHostPowerManagementPolicyReply {}

View File

@ -34,7 +34,7 @@ use super::{
type OneshotRequestMap<R> = Arc<Mutex<HashMap<u64, oneshot::Sender<<R as IdmRequest>::Response>>>>;
type StreamRequestMap<R> = Arc<Mutex<HashMap<u64, Sender<<R as IdmRequest>::Response>>>>;
type StreamRequestUpdateMap<R> = Arc<Mutex<HashMap<u64, mpsc::Sender<R>>>>;
type StreamRequestUpdateMap<R> = Arc<Mutex<HashMap<u64, Sender<R>>>>;
pub type IdmInternalClient = IdmClient<internal::Request, internal::Event>;
const IDM_PACKET_QUEUE_LEN: usize = 100;

View File

@ -76,44 +76,44 @@ impl AutoNetworkWatcher {
let mut networks: Vec<NetworkMetadata> = Vec::new();
for (uuid, zone) in &all_zones {
let Some(ref state) = zone.state else {
let Some(ref status) = zone.status else {
continue;
};
if state.domid == u32::MAX {
if status.domid == u32::MAX {
continue;
}
let Some(ref network) = state.network else {
let Some(ref network_status) = status.network_status else {
continue;
};
let Ok(zone_ipv4_cidr) = Ipv4Cidr::from_str(&network.zone_ipv4) else {
let Ok(zone_ipv4_cidr) = Ipv4Cidr::from_str(&network_status.zone_ipv4) else {
continue;
};
let Ok(zone_ipv6_cidr) = Ipv6Cidr::from_str(&network.zone_ipv6) else {
let Ok(zone_ipv6_cidr) = Ipv6Cidr::from_str(&network_status.zone_ipv6) else {
continue;
};
let Ok(zone_mac) = EthernetAddress::from_str(&network.zone_mac) else {
let Ok(zone_mac) = EthernetAddress::from_str(&network_status.zone_mac) else {
continue;
};
let Ok(gateway_ipv4_cidr) = Ipv4Cidr::from_str(&network.gateway_ipv4) else {
let Ok(gateway_ipv4_cidr) = Ipv4Cidr::from_str(&network_status.gateway_ipv4) else {
continue;
};
let Ok(gateway_ipv6_cidr) = Ipv6Cidr::from_str(&network.gateway_ipv6) else {
let Ok(gateway_ipv6_cidr) = Ipv6Cidr::from_str(&network_status.gateway_ipv6) else {
continue;
};
let Ok(gateway_mac) = EthernetAddress::from_str(&network.gateway_mac) else {
let Ok(gateway_mac) = EthernetAddress::from_str(&network_status.gateway_mac) else {
continue;
};
networks.push(NetworkMetadata {
domid: state.domid,
domid: status.domid,
uuid: *uuid,
zone: NetworkSide {
ipv4: zone_ipv4_cidr,
@ -187,7 +187,7 @@ impl AutoNetworkWatcher {
_ = sleep(Duration::from_secs(10)) => {
break;
}
};
}
}
Ok(())
}

View File

@ -25,7 +25,7 @@ async fn main() -> Result<()> {
let (context, mut receiver) = OciProgressContext::create();
tokio::task::spawn(async move {
loop {
if (receiver.changed().await).is_err() {
if receiver.changed().await.is_err() {
break;
}
let progress = receiver.borrow_and_update();

View File

@ -97,13 +97,13 @@ impl OciPackerBackend for OciPackerMkSquashfs {
status = &mut wait => {
break status;
}
};
}
} else {
select! {
status = &mut wait => {
break status;
}
};
}
}
};
if let Some(writer) = writer {
@ -172,13 +172,13 @@ impl OciPackerBackend for OciPackerMkfsErofs {
status = &mut wait => {
break status;
}
};
}
} else {
select! {
status = &mut wait => {
break status;
}
};
}
}
};
if let Some(writer) = writer {

View File

@ -228,7 +228,7 @@ impl OciBoundProgress {
context.update(&progress);
let mut receiver = self.context.subscribe();
tokio::task::spawn(async move {
while (receiver.changed().await).is_ok() {
while receiver.changed().await.is_ok() {
context
.sender
.send_replace(receiver.borrow_and_update().clone());

View File

@ -60,11 +60,11 @@ impl ChannelService {
let (input_sender, input_receiver) = channel(GROUPED_CHANNEL_QUEUE_LEN);
let (output_sender, output_receiver) = channel(GROUPED_CHANNEL_QUEUE_LEN);
debug!("opening Xen event channel");
debug!("opening xenevtchn");
let evtchn = EventChannelService::open().await?;
debug!("opening XenStore");
debug!("opening xenstore");
let store = XsdClient::open().await?;
debug!("opening GrantTab");
debug!("opening xengnt");
let gnttab = GrantTab::open()?;
Ok((
@ -503,7 +503,7 @@ impl KrataChannelBackendProcessor {
break;
}
}
};
}
}
Ok(())
}

View File

@ -1,19 +1,21 @@
use std::collections::HashMap;
use std::fs;
use std::net::IpAddr;
use std::path::PathBuf;
use std::sync::Arc;
use advmac::MacAddr6;
use anyhow::{anyhow, Result};
use ipnetwork::IpNetwork;
use tokio::sync::Semaphore;
use uuid::Uuid;
use krata::launchcfg::{
LaunchInfo, LaunchNetwork, LaunchNetworkIpv4, LaunchNetworkIpv6, LaunchNetworkResolver,
LaunchPackedFormat, LaunchRoot,
};
use krataoci::packer::OciPackedImage;
use tokio::sync::Semaphore;
use uuid::Uuid;
pub use xenclient::{
pci::PciBdf, DomainPciDevice as PciDevice, DomainPciRdmReservePolicy as PciRdmReservePolicy,
};
use xenclient::{DomainChannel, DomainConfig, DomainDisk, DomainNetworkInterface};
use xenplatform::domain::BaseDomainConfig;
@ -22,10 +24,6 @@ use crate::RuntimeContext;
use super::{ZoneInfo, ZoneState};
pub use xenclient::{
pci::PciBdf, DomainPciDevice as PciDevice, DomainPciRdmReservePolicy as PciRdmReservePolicy,
};
pub struct ZoneLaunchRequest {
pub format: LaunchPackedFormat,
pub kernel: Vec<u8>,
@ -40,6 +38,18 @@ pub struct ZoneLaunchRequest {
pub debug: bool,
pub image: OciPackedImage,
pub addons_image: Option<PathBuf>,
pub network: ZoneLaunchNetwork,
}
pub struct ZoneLaunchNetwork {
pub ipv4: String,
pub ipv4_prefix: u8,
pub ipv6: String,
pub ipv6_prefix: u8,
pub gateway_ipv4: String,
pub gateway_ipv6: String,
pub zone_mac: MacAddr6,
pub nameservers: Vec<String>,
}
pub struct ZoneLauncher {
@ -58,15 +68,7 @@ impl ZoneLauncher {
) -> Result<ZoneInfo> {
let uuid = request.uuid.unwrap_or_else(Uuid::new_v4);
let xen_name = format!("krata-{uuid}");
let mut gateway_mac = MacAddr6::random();
gateway_mac.set_local(true);
gateway_mac.set_multicast(false);
let mut zone_mac = MacAddr6::random();
zone_mac.set_local(true);
zone_mac.set_multicast(false);
let _launch_permit = self.launch_semaphore.acquire().await?;
let mut ip = context.ipvendor.assign(uuid).await?;
let launch_config = LaunchInfo {
root: LaunchRoot {
format: request.format.clone(),
@ -81,20 +83,15 @@ impl ZoneLauncher {
network: Some(LaunchNetwork {
link: "eth0".to_string(),
ipv4: LaunchNetworkIpv4 {
address: format!("{}/{}", ip.ipv4, ip.ipv4_prefix),
gateway: ip.gateway_ipv4.to_string(),
address: format!("{}/{}", request.network.ipv4, request.network.ipv4_prefix),
gateway: request.network.gateway_ipv4,
},
ipv6: LaunchNetworkIpv6 {
address: format!("{}/{}", ip.ipv6, ip.ipv6_prefix),
gateway: ip.gateway_ipv6.to_string(),
address: format!("{}/{}", request.network.ipv6, request.network.ipv6_prefix),
gateway: request.network.gateway_ipv6.to_string(),
},
resolver: LaunchNetworkResolver {
nameservers: vec![
"1.1.1.1".to_string(),
"1.0.0.1".to_string(),
"2606:4700:4700::1111".to_string(),
"2606:4700:4700::1001".to_string(),
],
nameservers: request.network.nameservers,
},
}),
env: request.env,
@ -145,8 +142,7 @@ impl ZoneLauncher {
}
let cmdline = cmdline_options.join(" ");
let zone_mac_string = zone_mac.to_string().replace('-', ":");
let gateway_mac_string = gateway_mac.to_string().replace('-', ":");
let zone_mac_string = request.network.zone_mac.to_string().replace('-', ":");
let mut disks = vec![
DomainDisk {
@ -190,30 +186,6 @@ impl ZoneLauncher {
let mut extra_keys = vec![
("krata/uuid".to_string(), uuid.to_string()),
("krata/loops".to_string(), loops.join(",")),
(
"krata/network/zone/ipv4".to_string(),
format!("{}/{}", ip.ipv4, ip.ipv4_prefix),
),
(
"krata/network/zone/ipv6".to_string(),
format!("{}/{}", ip.ipv6, ip.ipv6_prefix),
),
(
"krata/network/zone/mac".to_string(),
zone_mac_string.clone(),
),
(
"krata/network/gateway/ipv4".to_string(),
format!("{}/{}", ip.gateway_ipv4, ip.ipv4_prefix),
),
(
"krata/network/gateway/ipv6".to_string(),
format!("{}/{}", ip.gateway_ipv6, ip.ipv6_prefix),
),
(
"krata/network/gateway/mac".to_string(),
gateway_mac_string.clone(),
),
];
if let Some(name) = request.name.as_ref() {
@ -251,29 +223,14 @@ impl ZoneLauncher {
extra_rw_paths: vec!["krata/zone".to_string()],
};
match context.xen.create(&config).await {
Ok(created) => {
ip.commit().await?;
Ok(ZoneInfo {
name: request.name.as_ref().map(|x| x.to_string()),
uuid,
domid: created.domid,
image: request.image.digest,
loops: vec![],
zone_ipv4: Some(IpNetwork::new(IpAddr::V4(ip.ipv4), ip.ipv4_prefix)?),
zone_ipv6: Some(IpNetwork::new(IpAddr::V6(ip.ipv6), ip.ipv6_prefix)?),
zone_mac: Some(zone_mac_string.clone()),
gateway_ipv4: Some(IpNetwork::new(
IpAddr::V4(ip.gateway_ipv4),
ip.ipv4_prefix,
)?),
gateway_ipv6: Some(IpNetwork::new(
IpAddr::V6(ip.gateway_ipv6),
ip.ipv6_prefix,
)?),
gateway_mac: Some(gateway_mac_string.clone()),
state: ZoneState { exit_code: None },
})
}
Ok(created) => Ok(ZoneInfo {
name: request.name.as_ref().map(|x| x.to_string()),
uuid,
domid: created.domid,
image: request.image.digest,
loops: vec![],
state: ZoneState { exit_code: None },
}),
Err(error) => {
let _ = context.autoloop.unloop(&image_squashfs_loop.path).await;
let _ = context.autoloop.unloop(&cfgblk_squashfs_loop.path).await;

View File

@ -1,12 +1,10 @@
use std::{fs, net::Ipv4Addr, path::PathBuf, str::FromStr, sync::Arc};
use std::{fs, path::PathBuf, str::FromStr, sync::Arc};
use anyhow::{anyhow, Result};
use ip::IpVendor;
use ipnetwork::{IpNetwork, Ipv4Network, Ipv6Network};
use krataloopdev::LoopControl;
use log::{debug, error};
use tokio::sync::Semaphore;
use uuid::Uuid;
use xenclient::XenClient;
use xenstore::{XsdClient, XsdInterface};
@ -19,7 +17,6 @@ use self::{
pub mod autoloop;
pub mod cfgblk;
pub mod channel;
pub mod ip;
pub mod launch;
pub mod power;
@ -48,12 +45,6 @@ pub struct ZoneInfo {
pub domid: u32,
pub image: String,
pub loops: Vec<ZoneLoopInfo>,
pub zone_ipv4: Option<IpNetwork>,
pub zone_ipv6: Option<IpNetwork>,
pub zone_mac: Option<String>,
pub gateway_ipv4: Option<IpNetwork>,
pub gateway_ipv6: Option<IpNetwork>,
pub gateway_mac: Option<String>,
pub state: ZoneState,
}
@ -61,28 +52,14 @@ pub struct ZoneInfo {
pub struct RuntimeContext {
pub autoloop: AutoLoop,
pub xen: XenClient<RuntimePlatform>,
pub ipvendor: IpVendor,
}
impl RuntimeContext {
pub async fn new(host_uuid: Uuid) -> Result<Self> {
debug!("initializing XenClient");
pub async fn new() -> Result<Self> {
let xen = XenClient::new(0, RuntimePlatform::new()).await?;
debug!("initializing ip allocation vendor");
let ipv4_network = Ipv4Network::new(Ipv4Addr::new(10, 75, 80, 0), 24)?;
let ipv6_network = Ipv6Network::from_str("fdd4:1476:6c7e::/48")?;
let ipvendor =
IpVendor::new(xen.store.clone(), host_uuid, ipv4_network, ipv6_network).await?;
debug!("initializing loop devices");
let autoloop = AutoLoop::new(LoopControl::open()?);
debug!("krata runtime initialized!");
Ok(RuntimeContext {
autoloop,
autoloop: AutoLoop::new(LoopControl::open()?),
xen,
ipvendor,
})
}
@ -123,61 +100,6 @@ impl RuntimeContext {
.store
.read_string(&format!("{}/krata/loops", &dom_path))
.await?;
let zone_ipv4 = self
.xen
.store
.read_string(&format!("{}/krata/network/zone/ipv4", &dom_path))
.await?;
let zone_ipv6 = self
.xen
.store
.read_string(&format!("{}/krata/network/zone/ipv6", &dom_path))
.await?;
let zone_mac = self
.xen
.store
.read_string(&format!("{}/krata/network/zone/mac", &dom_path))
.await?;
let gateway_ipv4 = self
.xen
.store
.read_string(&format!("{}/krata/network/gateway/ipv4", &dom_path))
.await?;
let gateway_ipv6 = self
.xen
.store
.read_string(&format!("{}/krata/network/gateway/ipv6", &dom_path))
.await?;
let gateway_mac = self
.xen
.store
.read_string(&format!("{}/krata/network/gateway/mac", &dom_path))
.await?;
let zone_ipv4 = if let Some(zone_ipv4) = zone_ipv4 {
IpNetwork::from_str(&zone_ipv4).ok()
} else {
None
};
let zone_ipv6 = if let Some(zone_ipv6) = zone_ipv6 {
IpNetwork::from_str(&zone_ipv6).ok()
} else {
None
};
let gateway_ipv4 = if let Some(gateway_ipv4) = gateway_ipv4 {
IpNetwork::from_str(&gateway_ipv4).ok()
} else {
None
};
let gateway_ipv6 = if let Some(gateway_ipv6) = gateway_ipv6 {
IpNetwork::from_str(&gateway_ipv6).ok()
} else {
None
};
let exit_code = self
.xen
.store
@ -198,12 +120,6 @@ impl RuntimeContext {
domid,
image,
loops,
zone_ipv4,
zone_ipv6,
zone_mac,
gateway_ipv4,
gateway_ipv6,
gateway_mac,
state,
});
}
@ -245,16 +161,14 @@ impl RuntimeContext {
#[derive(Clone)]
pub struct Runtime {
host_uuid: Uuid,
context: RuntimeContext,
launch_semaphore: Arc<Semaphore>,
}
impl Runtime {
pub async fn new(host_uuid: Uuid) -> Result<Self> {
let context = RuntimeContext::new(host_uuid).await?;
pub async fn new() -> Result<Self> {
let context = RuntimeContext::new().await?;
Ok(Self {
host_uuid,
context,
launch_semaphore: Arc::new(Semaphore::new(10)),
})
@ -290,11 +204,6 @@ impl Runtime {
return Err(anyhow!("unable to find krata uuid based on the domain",));
}
let uuid = Uuid::parse_str(&uuid)?;
let ip = self
.context
.ipvendor
.read_domain_assignment(uuid, domid)
.await?;
let loops = store
.read_string(format!("{}/krata/loops", dom_path).as_str())
.await?;
@ -314,16 +223,6 @@ impl Runtime {
}
}
}
if let Some(ip) = ip {
if let Err(error) = self.context.ipvendor.recall(&ip).await {
error!(
"failed to recall ip assignment for zone {}: {}",
uuid, error
);
}
}
Ok(uuid)
}
@ -332,11 +231,11 @@ impl Runtime {
}
pub async fn dupe(&self) -> Result<Runtime> {
Runtime::new(self.host_uuid).await
Runtime::new().await
}
pub async fn power_management_context(&self) -> Result<PowerManagementContext> {
let context = RuntimeContext::new(self.host_uuid).await?;
let context = RuntimeContext::new().await?;
Ok(PowerManagementContext { context })
}
}

View File

@ -130,8 +130,7 @@ impl<P: BootSetupPlatform> XenClient<P> {
match self.init(created.domid, config, &created).await {
Ok(_) => Ok(created),
Err(err) => {
// ignore since destroying a domain is best
// effort when an error occurs
// ignore since destroying a domain is best-effort when an error occurs
let _ = self.domain_manager.destroy(created.domid).await;
Err(err)
}

View File

@ -7,7 +7,7 @@ use crate::sys::{BindInterdomain, BindUnboundPort, BindVirq, Notify, UnbindPort}
use crate::raw::EVENT_CHANNEL_DEVICE;
use byteorder::{LittleEndian, ReadBytesExt};
use log::warn;
use log::error;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::mem::size_of;
@ -16,7 +16,6 @@ use std::os::raw::c_void;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::fs::{File, OpenOptions};
use tokio::io::AsyncReadExt;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::{Mutex, RwLock};
@ -185,9 +184,10 @@ impl EventChannelProcessor {
if self.flag.load(Ordering::Acquire) {
break;
}
warn!("failed to process event channel notifications: {}", error);
error!("failed to process event channel wakes: {}", error);
}
});
Ok(())
}

View File

@ -298,7 +298,7 @@ impl XsdSocketProcessor {
break;
}
}
};
}
}
Ok(())
}

View File

@ -95,7 +95,7 @@ impl ZoneBackground {
break;
}
}
};
}
}
Ok(())
}