diff --git a/crates/daemon/src/control.rs b/crates/daemon/src/control.rs deleted file mode 100644 index aacf5a2..0000000 --- a/crates/daemon/src/control.rs +++ /dev/null @@ -1,753 +0,0 @@ -use crate::db::zone::ZoneStore; -use crate::ip::assignment::IpAssignment; -use crate::{ - command::DaemonCommand, console::DaemonConsoleHandle, devices::DaemonDeviceManager, - event::DaemonEventContext, idm::DaemonIdmHandle, metrics::idm_metric_to_api, - oci::convert_oci_progress, zlt::ZoneLookupTable, -}; -use async_stream::try_stream; -use futures::Stream; -use krata::v1::common::ZoneResourceStatus; -use krata::v1::control::{ - GetZoneReply, GetZoneRequest, SetHostPowerManagementPolicyReply, - SetHostPowerManagementPolicyRequest, -}; -use krata::{ - idm::internal::{ - exec_stream_request_update::Update, request::Request as IdmRequestType, - response::Response as IdmResponseType, ExecEnvVar, ExecStreamRequestStart, - ExecStreamRequestStdin, ExecStreamRequestUpdate, MetricsRequest, Request as IdmRequest, - }, - v1::{ - common::{OciImageFormat, Zone, ZoneState, ZoneStatus}, - control::{ - control_service_server::ControlService, CreateZoneReply, CreateZoneRequest, - DestroyZoneReply, DestroyZoneRequest, DeviceInfo, ExecInsideZoneReply, - ExecInsideZoneRequest, GetHostCpuTopologyReply, GetHostCpuTopologyRequest, - HostCpuTopologyInfo, HostStatusReply, HostStatusRequest, ListDevicesReply, - ListDevicesRequest, ListZonesReply, ListZonesRequest, PullImageReply, PullImageRequest, - ReadHypervisorConsoleReply, ReadHypervisorConsoleRequest, ReadZoneMetricsReply, - ReadZoneMetricsRequest, ResolveZoneIdReply, ResolveZoneIdRequest, SnoopIdmReply, - SnoopIdmRequest, UpdateZoneResourcesReply, UpdateZoneResourcesRequest, - WatchEventsReply, WatchEventsRequest, ZoneConsoleReply, ZoneConsoleRequest, - }, - }, -}; -use krataoci::{ - name::ImageName, - packer::{service::OciPackerService, OciPackedFormat, OciPackedImage}, - progress::{OciProgress, OciProgressContext}, -}; -use kratart::Runtime; -use std::{pin::Pin, str::FromStr}; -use tokio::{ - select, - sync::mpsc::{channel, Sender}, - task::JoinError, -}; -use tokio_stream::StreamExt; -use tonic::{Request, Response, Status, Streaming}; -use uuid::Uuid; - -pub struct ApiError { - message: String, -} - -impl From for ApiError { - fn from(value: anyhow::Error) -> Self { - ApiError { - message: value.to_string(), - } - } -} - -impl From for Status { - fn from(value: ApiError) -> Self { - Status::unknown(value.message) - } -} - -#[derive(Clone)] -pub struct DaemonControlService { - zlt: ZoneLookupTable, - devices: DaemonDeviceManager, - events: DaemonEventContext, - console: DaemonConsoleHandle, - idm: DaemonIdmHandle, - zones: ZoneStore, - ip: IpAssignment, - zone_reconciler_notify: Sender, - packer: OciPackerService, - runtime: Runtime, -} - -impl DaemonControlService { - #[allow(clippy::too_many_arguments)] - pub fn new( - zlt: ZoneLookupTable, - devices: DaemonDeviceManager, - events: DaemonEventContext, - console: DaemonConsoleHandle, - idm: DaemonIdmHandle, - zones: ZoneStore, - ip: IpAssignment, - zone_reconciler_notify: Sender, - packer: OciPackerService, - runtime: Runtime, - ) -> Self { - Self { - zlt, - devices, - events, - console, - idm, - zones, - ip, - zone_reconciler_notify, - packer, - runtime, - } - } -} - -enum ConsoleDataSelect { - Read(Option>), - Write(Option>), -} - -enum PullImageSelect { - Progress(Option), - Completed(Result, JoinError>), -} - -#[tonic::async_trait] -impl ControlService for DaemonControlService { - type ExecInsideZoneStream = - Pin> + Send + 'static>>; - - type AttachZoneConsoleStream = - Pin> + Send + 'static>>; - - type PullImageStream = - Pin> + Send + 'static>>; - - type WatchEventsStream = - Pin> + Send + 'static>>; - - type SnoopIdmStream = - Pin> + Send + 'static>>; - - async fn host_status( - &self, - request: Request, - ) -> Result, Status> { - let _ = request.into_inner(); - let host_reservation = - self.ip - .retrieve(self.zlt.host_uuid()) - .await - .map_err(|x| ApiError { - message: x.to_string(), - })?; - Ok(Response::new(HostStatusReply { - host_domid: self.zlt.host_domid(), - host_uuid: self.zlt.host_uuid().to_string(), - krata_version: DaemonCommand::version(), - host_ipv4: host_reservation - .as_ref() - .map(|x| format!("{}/{}", x.ipv4, x.ipv4_prefix)) - .unwrap_or_default(), - host_ipv6: host_reservation - .as_ref() - .map(|x| format!("{}/{}", x.ipv6, x.ipv6_prefix)) - .unwrap_or_default(), - host_mac: host_reservation - .as_ref() - .map(|x| x.mac.to_string().to_lowercase().replace('-', ":")) - .unwrap_or_default(), - })) - } - - async fn create_zone( - &self, - request: Request, - ) -> Result, Status> { - let request = request.into_inner(); - let Some(spec) = request.spec else { - return Err(ApiError { - message: "zone spec not provided".to_string(), - } - .into()); - }; - let uuid = Uuid::new_v4(); - self.zones - .update( - uuid, - Zone { - id: uuid.to_string(), - status: Some(ZoneStatus { - state: ZoneState::Creating.into(), - network_status: None, - exit_status: None, - error_status: None, - resource_status: None, - host: self.zlt.host_uuid().to_string(), - domid: u32::MAX, - }), - spec: Some(spec), - }, - ) - .await - .map_err(ApiError::from)?; - self.zone_reconciler_notify - .send(uuid) - .await - .map_err(|x| ApiError { - message: x.to_string(), - })?; - Ok(Response::new(CreateZoneReply { - zone_id: uuid.to_string(), - })) - } - - async fn exec_inside_zone( - &self, - request: Request>, - ) -> Result, Status> { - let mut input = request.into_inner(); - let Some(request) = input.next().await else { - return Err(ApiError { - message: "expected to have at least one request".to_string(), - } - .into()); - }; - let request = request?; - - let Some(task) = request.task else { - return Err(ApiError { - message: "task is missing".to_string(), - } - .into()); - }; - - let uuid = Uuid::from_str(&request.zone_id).map_err(|error| ApiError { - message: error.to_string(), - })?; - let idm = self.idm.client(uuid).await.map_err(|error| ApiError { - message: error.to_string(), - })?; - - let idm_request = IdmRequest { - request: Some(IdmRequestType::ExecStream(ExecStreamRequestUpdate { - update: Some(Update::Start(ExecStreamRequestStart { - environment: task - .environment - .into_iter() - .map(|x| ExecEnvVar { - key: x.key, - value: x.value, - }) - .collect(), - command: task.command, - working_directory: task.working_directory, - tty: task.tty, - })), - })), - }; - - let output = try_stream! { - let mut handle = idm.send_stream(idm_request).await.map_err(|x| ApiError { - message: x.to_string(), - })?; - - loop { - select! { - x = input.next() => if let Some(update) = x { - let update: Result = update.map_err(|error| ApiError { - message: error.to_string() - }.into()); - - if let Ok(update) = update { - if !update.stdin.is_empty() { - let _ = handle.update(IdmRequest { - request: Some(IdmRequestType::ExecStream(ExecStreamRequestUpdate { - update: Some(Update::Stdin(ExecStreamRequestStdin { - data: update.stdin, - closed: update.stdin_closed, - })), - }))}).await; - } - } - }, - x = handle.receiver.recv() => match x { - Some(response) => { - let Some(IdmResponseType::ExecStream(update)) = response.response else { - break; - }; - let reply = ExecInsideZoneReply { - exited: update.exited, - error: update.error, - exit_code: update.exit_code, - stdout: update.stdout, - stderr: update.stderr, - }; - yield reply; - }, - None => { - break; - } - } - } - } - }; - - Ok(Response::new(Box::pin(output) as Self::ExecInsideZoneStream)) - } - - async fn destroy_zone( - &self, - request: Request, - ) -> Result, Status> { - let request = request.into_inner(); - let uuid = Uuid::from_str(&request.zone_id).map_err(|error| ApiError { - message: error.to_string(), - })?; - let Some(mut zone) = self.zones.read(uuid).await.map_err(ApiError::from)? else { - return Err(ApiError { - message: "zone not found".to_string(), - } - .into()); - }; - - zone.status = Some(zone.status.as_mut().cloned().unwrap_or_default()); - - if zone.status.as_ref().unwrap().state() == ZoneState::Destroyed { - return Err(ApiError { - message: "zone already destroyed".to_string(), - } - .into()); - } - - zone.status.as_mut().unwrap().state = ZoneState::Destroying.into(); - self.zones - .update(uuid, zone) - .await - .map_err(ApiError::from)?; - self.zone_reconciler_notify - .send(uuid) - .await - .map_err(|x| ApiError { - message: x.to_string(), - })?; - Ok(Response::new(DestroyZoneReply {})) - } - - async fn list_zones( - &self, - request: Request, - ) -> Result, Status> { - let _ = request.into_inner(); - let zones = self.zones.list().await.map_err(ApiError::from)?; - let zones = zones.into_values().collect::>(); - Ok(Response::new(ListZonesReply { zones })) - } - - async fn resolve_zone_id( - &self, - request: Request, - ) -> Result, Status> { - let request = request.into_inner(); - let zones = self.zones.list().await.map_err(ApiError::from)?; - let zones = zones - .into_values() - .filter(|x| { - let comparison_spec = x.spec.as_ref().cloned().unwrap_or_default(); - (!request.name.is_empty() && comparison_spec.name == request.name) - || x.id == request.name - }) - .collect::>(); - Ok(Response::new(ResolveZoneIdReply { - zone_id: zones.first().cloned().map(|x| x.id).unwrap_or_default(), - })) - } - - async fn attach_zone_console( - &self, - request: Request>, - ) -> Result, Status> { - let mut input = request.into_inner(); - let Some(request) = input.next().await else { - return Err(ApiError { - message: "expected to have at least one request".to_string(), - } - .into()); - }; - let request = request?; - let uuid = Uuid::from_str(&request.zone_id).map_err(|error| ApiError { - message: error.to_string(), - })?; - let (sender, mut receiver) = channel(100); - let console = self - .console - .attach(uuid, sender) - .await - .map_err(|error| ApiError { - message: format!("failed to attach to console: {}", error), - })?; - - let output = try_stream! { - yield ZoneConsoleReply { data: console.initial.clone(), }; - loop { - let what = select! { - x = receiver.recv() => ConsoleDataSelect::Read(x), - x = input.next() => ConsoleDataSelect::Write(x), - }; - - match what { - ConsoleDataSelect::Read(Some(data)) => { - yield ZoneConsoleReply { data, }; - }, - - ConsoleDataSelect::Read(None) => { - break; - } - - ConsoleDataSelect::Write(Some(request)) => { - let request = request?; - if !request.data.is_empty() { - console.send(request.data).await.map_err(|error| ApiError { - message: error.to_string(), - })?; - } - }, - - ConsoleDataSelect::Write(None) => { - break; - } - } - } - }; - - Ok(Response::new( - Box::pin(output) as Self::AttachZoneConsoleStream - )) - } - - async fn read_zone_metrics( - &self, - request: Request, - ) -> Result, Status> { - let request = request.into_inner(); - let uuid = Uuid::from_str(&request.zone_id).map_err(|error| ApiError { - message: error.to_string(), - })?; - let client = self.idm.client(uuid).await.map_err(|error| ApiError { - message: error.to_string(), - })?; - - let response = client - .send(IdmRequest { - request: Some(IdmRequestType::Metrics(MetricsRequest {})), - }) - .await - .map_err(|error| ApiError { - message: error.to_string(), - })?; - - let mut reply = ReadZoneMetricsReply::default(); - if let Some(IdmResponseType::Metrics(metrics)) = response.response { - reply.root = metrics.root.map(idm_metric_to_api); - } - Ok(Response::new(reply)) - } - - async fn pull_image( - &self, - request: Request, - ) -> Result, Status> { - let request = request.into_inner(); - let name = ImageName::parse(&request.image).map_err(|err| ApiError { - message: err.to_string(), - })?; - let format = match request.format() { - OciImageFormat::Unknown => OciPackedFormat::Squashfs, - OciImageFormat::Squashfs => OciPackedFormat::Squashfs, - OciImageFormat::Erofs => OciPackedFormat::Erofs, - OciImageFormat::Tar => OciPackedFormat::Tar, - }; - let (context, mut receiver) = OciProgressContext::create(); - let our_packer = self.packer.clone(); - - let output = try_stream! { - let mut task = tokio::task::spawn(async move { - our_packer.request(name, format, request.overwrite_cache, request.update, context).await - }); - let abort_handle = task.abort_handle(); - let _task_cancel_guard = scopeguard::guard(abort_handle, |handle| { - handle.abort(); - }); - - loop { - let what = select! { - x = receiver.changed() => match x { - Ok(_) => PullImageSelect::Progress(Some(receiver.borrow_and_update().clone())), - Err(_) => PullImageSelect::Progress(None), - }, - x = &mut task => PullImageSelect::Completed(x), - }; - match what { - PullImageSelect::Progress(Some(progress)) => { - let reply = PullImageReply { - progress: Some(convert_oci_progress(progress)), - digest: String::new(), - format: OciImageFormat::Unknown.into(), - }; - yield reply; - }, - - PullImageSelect::Completed(result) => { - let result = result.map_err(|err| ApiError { - message: err.to_string(), - })?; - let packed = result.map_err(|err| ApiError { - message: err.to_string(), - })?; - let reply = PullImageReply { - progress: None, - digest: packed.digest, - format: match packed.format { - OciPackedFormat::Squashfs => OciImageFormat::Squashfs.into(), - OciPackedFormat::Erofs => OciImageFormat::Erofs.into(), - OciPackedFormat::Tar => OciImageFormat::Tar.into(), - }, - }; - yield reply; - break; - }, - - _ => { - continue; - } - } - } - }; - Ok(Response::new(Box::pin(output) as Self::PullImageStream)) - } - - async fn watch_events( - &self, - request: Request, - ) -> Result, Status> { - let _ = request.into_inner(); - let mut events = self.events.subscribe(); - let output = try_stream! { - while let Ok(event) = events.recv().await { - yield WatchEventsReply { event: Some(event), }; - } - }; - Ok(Response::new(Box::pin(output) as Self::WatchEventsStream)) - } - - async fn snoop_idm( - &self, - request: Request, - ) -> Result, Status> { - let _ = request.into_inner(); - let mut messages = self.idm.snoop(); - let zlt = self.zlt.clone(); - let output = try_stream! { - while let Ok(event) = messages.recv().await { - let Some(from_uuid) = zlt.lookup_uuid_by_domid(event.from).await else { - continue; - }; - let Some(to_uuid) = zlt.lookup_uuid_by_domid(event.to).await else { - continue; - }; - yield SnoopIdmReply { from: from_uuid.to_string(), to: to_uuid.to_string(), packet: Some(event.packet) }; - } - }; - Ok(Response::new(Box::pin(output) as Self::SnoopIdmStream)) - } - - async fn list_devices( - &self, - request: Request, - ) -> Result, Status> { - let _ = request.into_inner(); - let mut devices = Vec::new(); - let state = self.devices.copy().await.map_err(|error| ApiError { - message: error.to_string(), - })?; - for (name, state) in state { - devices.push(DeviceInfo { - name, - claimed: state.owner.is_some(), - owner: state.owner.map(|x| x.to_string()).unwrap_or_default(), - }); - } - Ok(Response::new(ListDevicesReply { devices })) - } - - async fn get_host_cpu_topology( - &self, - request: Request, - ) -> Result, Status> { - let _ = request.into_inner(); - let power = self - .runtime - .power_management_context() - .await - .map_err(ApiError::from)?; - let cputopo = power.cpu_topology().await.map_err(ApiError::from)?; - let mut cpus = vec![]; - - for cpu in cputopo { - cpus.push(HostCpuTopologyInfo { - core: cpu.core, - socket: cpu.socket, - node: cpu.node, - thread: cpu.thread, - class: cpu.class as i32, - }) - } - - Ok(Response::new(GetHostCpuTopologyReply { cpus })) - } - - async fn set_host_power_management_policy( - &self, - request: Request, - ) -> Result, Status> { - let policy = request.into_inner(); - let power = self - .runtime - .power_management_context() - .await - .map_err(ApiError::from)?; - let scheduler = &policy.scheduler; - - power - .set_smt_policy(policy.smt_awareness) - .await - .map_err(ApiError::from)?; - power - .set_scheduler_policy(scheduler) - .await - .map_err(ApiError::from)?; - - Ok(Response::new(SetHostPowerManagementPolicyReply {})) - } - - async fn get_zone( - &self, - request: Request, - ) -> Result, Status> { - let request = request.into_inner(); - let zones = self.zones.list().await.map_err(ApiError::from)?; - let zone = zones.get(&Uuid::from_str(&request.zone_id).map_err(|error| ApiError { - message: error.to_string(), - })?); - Ok(Response::new(GetZoneReply { - zone: zone.cloned(), - })) - } - - async fn update_zone_resources( - &self, - request: Request, - ) -> Result, Status> { - let request = request.into_inner(); - let uuid = Uuid::from_str(&request.zone_id).map_err(|error| ApiError { - message: error.to_string(), - })?; - let Some(mut zone) = self.zones.read(uuid).await.map_err(ApiError::from)? else { - return Err(ApiError { - message: "zone not found".to_string(), - } - .into()); - }; - - let Some(ref mut status) = zone.status else { - return Err(ApiError { - message: "zone state not available".to_string(), - } - .into()); - }; - - if status.state() != ZoneState::Created { - return Err(ApiError { - message: "zone is in an invalid state".to_string(), - } - .into()); - } - - if status.domid == 0 || status.domid == u32::MAX { - return Err(ApiError { - message: "zone domid is invalid".to_string(), - } - .into()); - } - - let mut resources = request.resources.unwrap_or_default(); - if resources.target_memory > resources.max_memory { - resources.max_memory = resources.target_memory; - } - - if resources.target_cpus < 1 { - resources.target_cpus = 1; - } - - let initial_resources = zone - .spec - .clone() - .unwrap_or_default() - .initial_resources - .unwrap_or_default(); - if resources.target_cpus > initial_resources.max_cpus { - resources.target_cpus = initial_resources.max_cpus; - } - resources.max_cpus = initial_resources.max_cpus; - - self.runtime - .set_memory_resources( - status.domid, - resources.target_memory * 1024 * 1024, - resources.max_memory * 1024 * 1024, - ) - .await - .map_err(|error| ApiError { - message: format!("failed to set memory resources: {}", error), - })?; - self.runtime - .set_cpu_resources(status.domid, resources.target_cpus) - .await - .map_err(|error| ApiError { - message: format!("failed to set cpu resources: {}", error), - })?; - status.resource_status = Some(ZoneResourceStatus { - active_resources: Some(resources), - }); - - self.zones - .update(uuid, zone) - .await - .map_err(ApiError::from)?; - Ok(Response::new(UpdateZoneResourcesReply {})) - } - - async fn read_hypervisor_console( - &self, - _request: Request, - ) -> Result, Status> { - let data = self - .runtime - .read_hypervisor_console(false) - .await - .map_err(|error| ApiError { - message: error.to_string(), - })?; - Ok(Response::new(ReadHypervisorConsoleReply { - data: data.to_string(), - })) - } -} diff --git a/crates/daemon/src/control/attach_zone_console.rs b/crates/daemon/src/control/attach_zone_console.rs new file mode 100644 index 0000000..cdc94ef --- /dev/null +++ b/crates/daemon/src/control/attach_zone_console.rs @@ -0,0 +1,82 @@ +use std::pin::Pin; +use std::str::FromStr; + +use anyhow::{anyhow, Result}; +use async_stream::try_stream; +use tokio::select; +use tokio::sync::mpsc::channel; +use tokio_stream::{Stream, StreamExt}; +use tonic::{Status, Streaming}; +use uuid::Uuid; + +use krata::v1::control::{ZoneConsoleReply, ZoneConsoleRequest}; + +use crate::console::DaemonConsoleHandle; +use crate::control::ApiError; + +enum ConsoleDataSelect { + Read(Option>), + Write(Option>), +} + +pub struct AttachZoneConsoleRpc { + console: DaemonConsoleHandle, +} + +impl AttachZoneConsoleRpc { + pub fn new(console: DaemonConsoleHandle) -> Self { + Self { console } + } + + pub async fn process( + self, + mut input: Streaming, + ) -> Result> + Send + 'static>>> + { + let Some(request) = input.next().await else { + return Err(anyhow!("expected to have at least one request")); + }; + let request = request?; + let uuid = Uuid::from_str(&request.zone_id)?; + let (sender, mut receiver) = channel(100); + let console = self + .console + .attach(uuid, sender) + .await + .map_err(|error| anyhow!("failed to attach to console: {}", error))?; + + let output = try_stream! { + yield ZoneConsoleReply { data: console.initial.clone(), }; + loop { + let what = select! { + x = receiver.recv() => ConsoleDataSelect::Read(x), + x = input.next() => ConsoleDataSelect::Write(x), + }; + + match what { + ConsoleDataSelect::Read(Some(data)) => { + yield ZoneConsoleReply { data, }; + }, + + ConsoleDataSelect::Read(None) => { + break; + } + + ConsoleDataSelect::Write(Some(request)) => { + let request = request?; + if !request.data.is_empty() { + console.send(request.data).await.map_err(|error| ApiError { + message: error.to_string(), + })?; + } + }, + + ConsoleDataSelect::Write(None) => { + break; + } + } + } + }; + Ok(Box::pin(output)) + } +} diff --git a/crates/daemon/src/control/create_zone.rs b/crates/daemon/src/control/create_zone.rs new file mode 100644 index 0000000..00375ef --- /dev/null +++ b/crates/daemon/src/control/create_zone.rs @@ -0,0 +1,56 @@ +use crate::db::zone::ZoneStore; +use crate::zlt::ZoneLookupTable; +use anyhow::{anyhow, Result}; +use krata::v1::common::{Zone, ZoneState, ZoneStatus}; +use krata::v1::control::{CreateZoneReply, CreateZoneRequest}; +use tokio::sync::mpsc::Sender; +use uuid::Uuid; + +pub struct CreateZoneRpc { + zones: ZoneStore, + zlt: ZoneLookupTable, + zone_reconciler_notify: Sender, +} + +impl CreateZoneRpc { + pub fn new( + zones: ZoneStore, + zlt: ZoneLookupTable, + zone_reconciler_notify: Sender, + ) -> Self { + Self { + zones, + zlt, + zone_reconciler_notify, + } + } + + pub async fn process(self, request: CreateZoneRequest) -> Result { + let Some(spec) = request.spec else { + return Err(anyhow!("zone spec not provided")); + }; + let uuid = Uuid::new_v4(); + self.zones + .update( + uuid, + Zone { + id: uuid.to_string(), + status: Some(ZoneStatus { + state: ZoneState::Creating.into(), + network_status: None, + exit_status: None, + error_status: None, + resource_status: None, + host: self.zlt.host_uuid().to_string(), + domid: u32::MAX, + }), + spec: Some(spec), + }, + ) + .await?; + self.zone_reconciler_notify.send(uuid).await?; + Ok(CreateZoneReply { + zone_id: uuid.to_string(), + }) + } +} diff --git a/crates/daemon/src/control/destroy_zone.rs b/crates/daemon/src/control/destroy_zone.rs new file mode 100644 index 0000000..1f9a14a --- /dev/null +++ b/crates/daemon/src/control/destroy_zone.rs @@ -0,0 +1,42 @@ +use std::str::FromStr; + +use anyhow::{anyhow, Result}; +use tokio::sync::mpsc::Sender; +use uuid::Uuid; + +use krata::v1::common::ZoneState; +use krata::v1::control::{DestroyZoneReply, DestroyZoneRequest}; + +use crate::db::zone::ZoneStore; + +pub struct DestroyZoneRpc { + zones: ZoneStore, + zone_reconciler_notify: Sender, +} + +impl DestroyZoneRpc { + pub fn new(zones: ZoneStore, zone_reconciler_notify: Sender) -> Self { + Self { + zones, + zone_reconciler_notify, + } + } + + pub async fn process(self, request: DestroyZoneRequest) -> Result { + let uuid = Uuid::from_str(&request.zone_id)?; + let Some(mut zone) = self.zones.read(uuid).await? else { + return Err(anyhow!("zone not found")); + }; + + zone.status = Some(zone.status.as_mut().cloned().unwrap_or_default()); + + if zone.status.as_ref().unwrap().state() == ZoneState::Destroyed { + return Err(anyhow!("zone already destroyed")); + } + + zone.status.as_mut().unwrap().state = ZoneState::Destroying.into(); + self.zones.update(uuid, zone).await?; + self.zone_reconciler_notify.send(uuid).await?; + Ok(DestroyZoneReply {}) + } +} diff --git a/crates/daemon/src/control/exec_inside_zone.rs b/crates/daemon/src/control/exec_inside_zone.rs new file mode 100644 index 0000000..c08fc8c --- /dev/null +++ b/crates/daemon/src/control/exec_inside_zone.rs @@ -0,0 +1,116 @@ +use std::pin::Pin; +use std::str::FromStr; + +use anyhow::{anyhow, Result}; +use async_stream::try_stream; +use tokio::select; +use tokio_stream::{Stream, StreamExt}; +use tonic::{Status, Streaming}; +use uuid::Uuid; + +use krata::idm::internal::Request; +use krata::{ + idm::internal::{ + exec_stream_request_update::Update, request::Request as IdmRequestType, + response::Response as IdmResponseType, ExecEnvVar, ExecStreamRequestStart, + ExecStreamRequestStdin, ExecStreamRequestUpdate, Request as IdmRequest, + }, + v1::control::{ExecInsideZoneReply, ExecInsideZoneRequest}, +}; + +use crate::control::ApiError; +use crate::idm::DaemonIdmHandle; + +pub struct ExecInsideZoneRpc { + idm: DaemonIdmHandle, +} + +impl ExecInsideZoneRpc { + pub fn new(idm: DaemonIdmHandle) -> Self { + Self { idm } + } + + pub async fn process( + self, + mut input: Streaming, + ) -> Result> + Send + 'static>>> + { + let Some(request) = input.next().await else { + return Err(anyhow!("expected to have at least one request")); + }; + let request = request?; + + let Some(task) = request.task else { + return Err(anyhow!("task is missing")); + }; + + let uuid = Uuid::from_str(&request.zone_id)?; + let idm = self.idm.client(uuid).await?; + + let idm_request = Request { + request: Some(IdmRequestType::ExecStream(ExecStreamRequestUpdate { + update: Some(Update::Start(ExecStreamRequestStart { + environment: task + .environment + .into_iter() + .map(|x| ExecEnvVar { + key: x.key, + value: x.value, + }) + .collect(), + command: task.command, + working_directory: task.working_directory, + tty: task.tty, + })), + })), + }; + + let output = try_stream! { + let mut handle = idm.send_stream(idm_request).await.map_err(|x| ApiError { + message: x.to_string(), + })?; + + loop { + select! { + x = input.next() => if let Some(update) = x { + let update: Result = update.map_err(|error| ApiError { + message: error.to_string() + }.into()); + + if let Ok(update) = update { + if !update.stdin.is_empty() { + let _ = handle.update(IdmRequest { + request: Some(IdmRequestType::ExecStream(ExecStreamRequestUpdate { + update: Some(Update::Stdin(ExecStreamRequestStdin { + data: update.stdin, + closed: update.stdin_closed, + })), + }))}).await; + } + } + }, + x = handle.receiver.recv() => match x { + Some(response) => { + let Some(IdmResponseType::ExecStream(update)) = response.response else { + break; + }; + let reply = ExecInsideZoneReply { + exited: update.exited, + error: update.error, + exit_code: update.exit_code, + stdout: update.stdout, + stderr: update.stderr, + }; + yield reply; + }, + None => { + break; + } + } + } + } + }; + + Ok(Box::pin(output)) + } +} diff --git a/crates/daemon/src/control/get_host_cpu_topology.rs b/crates/daemon/src/control/get_host_cpu_topology.rs new file mode 100644 index 0000000..1ec5139 --- /dev/null +++ b/crates/daemon/src/control/get_host_cpu_topology.rs @@ -0,0 +1,33 @@ +use anyhow::Result; +use krata::v1::control::{GetHostCpuTopologyReply, GetHostCpuTopologyRequest, HostCpuTopologyInfo}; +use kratart::Runtime; + +pub struct GetHostCpuTopologyRpc { + runtime: Runtime, +} + +impl GetHostCpuTopologyRpc { + pub fn new(runtime: Runtime) -> Self { + Self { runtime } + } + + pub async fn process( + self, + _request: GetHostCpuTopologyRequest, + ) -> Result { + let power = self.runtime.power_management_context().await?; + let cpu_topology = power.cpu_topology().await?; + let mut cpus = vec![]; + + for cpu in cpu_topology { + cpus.push(HostCpuTopologyInfo { + core: cpu.core, + socket: cpu.socket, + node: cpu.node, + thread: cpu.thread, + class: cpu.class as i32, + }) + } + Ok(GetHostCpuTopologyReply { cpus }) + } +} diff --git a/crates/daemon/src/control/get_zone.rs b/crates/daemon/src/control/get_zone.rs new file mode 100644 index 0000000..7fb8ad5 --- /dev/null +++ b/crates/daemon/src/control/get_zone.rs @@ -0,0 +1,24 @@ +use std::str::FromStr; + +use anyhow::Result; +use uuid::Uuid; + +use krata::v1::control::{GetZoneReply, GetZoneRequest}; + +use crate::db::zone::ZoneStore; + +pub struct GetZoneRpc { + zones: ZoneStore, +} + +impl GetZoneRpc { + pub fn new(zones: ZoneStore) -> Self { + Self { zones } + } + + pub async fn process(self, request: GetZoneRequest) -> Result { + let mut zones = self.zones.list().await?; + let zone = zones.remove(&Uuid::from_str(&request.zone_id)?); + Ok(GetZoneReply { zone }) + } +} diff --git a/crates/daemon/src/control/host_status.rs b/crates/daemon/src/control/host_status.rs new file mode 100644 index 0000000..34864b5 --- /dev/null +++ b/crates/daemon/src/control/host_status.rs @@ -0,0 +1,37 @@ +use crate::command::DaemonCommand; +use crate::ip::assignment::IpAssignment; +use crate::zlt::ZoneLookupTable; +use anyhow::Result; +use krata::v1::control::{HostStatusReply, HostStatusRequest}; + +pub struct HostStatusRpc { + ip: IpAssignment, + zlt: ZoneLookupTable, +} + +impl HostStatusRpc { + pub fn new(ip: IpAssignment, zlt: ZoneLookupTable) -> Self { + Self { ip, zlt } + } + + pub async fn process(self, _request: HostStatusRequest) -> Result { + let host_reservation = self.ip.retrieve(self.zlt.host_uuid()).await?; + Ok(HostStatusReply { + host_domid: self.zlt.host_domid(), + host_uuid: self.zlt.host_uuid().to_string(), + krata_version: DaemonCommand::version(), + host_ipv4: host_reservation + .as_ref() + .map(|x| format!("{}/{}", x.ipv4, x.ipv4_prefix)) + .unwrap_or_default(), + host_ipv6: host_reservation + .as_ref() + .map(|x| format!("{}/{}", x.ipv6, x.ipv6_prefix)) + .unwrap_or_default(), + host_mac: host_reservation + .as_ref() + .map(|x| x.mac.to_string().to_lowercase().replace('-', ":")) + .unwrap_or_default(), + }) + } +} diff --git a/crates/daemon/src/control/list_devices.rs b/crates/daemon/src/control/list_devices.rs new file mode 100644 index 0000000..ff05f39 --- /dev/null +++ b/crates/daemon/src/control/list_devices.rs @@ -0,0 +1,28 @@ +use anyhow::Result; + +use krata::v1::control::{DeviceInfo, ListDevicesReply, ListDevicesRequest}; + +use crate::devices::DaemonDeviceManager; + +pub struct ListDevicesRpc { + devices: DaemonDeviceManager, +} + +impl ListDevicesRpc { + pub fn new(devices: DaemonDeviceManager) -> Self { + Self { devices } + } + + pub async fn process(self, _request: ListDevicesRequest) -> Result { + let mut devices = Vec::new(); + let state = self.devices.copy().await?; + for (name, state) in state { + devices.push(DeviceInfo { + name, + claimed: state.owner.is_some(), + owner: state.owner.map(|x| x.to_string()).unwrap_or_default(), + }); + } + Ok(ListDevicesReply { devices }) + } +} diff --git a/crates/daemon/src/control/list_zones.rs b/crates/daemon/src/control/list_zones.rs new file mode 100644 index 0000000..cc9cb78 --- /dev/null +++ b/crates/daemon/src/control/list_zones.rs @@ -0,0 +1,21 @@ +use anyhow::Result; +use krata::v1::common::Zone; +use krata::v1::control::{ListZonesReply, ListZonesRequest}; + +use crate::db::zone::ZoneStore; + +pub struct ListZonesRpc { + zones: ZoneStore, +} + +impl ListZonesRpc { + pub fn new(zones: ZoneStore) -> Self { + Self { zones } + } + + pub async fn process(self, _request: ListZonesRequest) -> Result { + let zones = self.zones.list().await?; + let zones = zones.into_values().collect::>(); + Ok(ListZonesReply { zones }) + } +} diff --git a/crates/daemon/src/control/mod.rs b/crates/daemon/src/control/mod.rs new file mode 100644 index 0000000..69897e4 --- /dev/null +++ b/crates/daemon/src/control/mod.rs @@ -0,0 +1,351 @@ +use std::pin::Pin; + +use anyhow::Error; +use futures::Stream; +use tokio::sync::mpsc::Sender; +use tonic::{Request, Response, Status, Streaming}; +use uuid::Uuid; + +use krata::v1::control::{ + control_service_server::ControlService, CreateZoneReply, CreateZoneRequest, DestroyZoneReply, + DestroyZoneRequest, ExecInsideZoneReply, ExecInsideZoneRequest, GetHostCpuTopologyReply, + GetHostCpuTopologyRequest, HostStatusReply, HostStatusRequest, ListDevicesReply, + ListDevicesRequest, ListZonesReply, ListZonesRequest, PullImageReply, PullImageRequest, + ReadHypervisorConsoleReply, ReadHypervisorConsoleRequest, ReadZoneMetricsReply, + ReadZoneMetricsRequest, ResolveZoneIdReply, ResolveZoneIdRequest, SnoopIdmReply, + SnoopIdmRequest, UpdateZoneResourcesReply, UpdateZoneResourcesRequest, WatchEventsReply, + WatchEventsRequest, ZoneConsoleReply, ZoneConsoleRequest, +}; +use krata::v1::control::{ + GetZoneReply, GetZoneRequest, SetHostPowerManagementPolicyReply, + SetHostPowerManagementPolicyRequest, +}; +use krataoci::packer::service::OciPackerService; +use kratart::Runtime; + +use crate::control::attach_zone_console::AttachZoneConsoleRpc; +use crate::control::create_zone::CreateZoneRpc; +use crate::control::destroy_zone::DestroyZoneRpc; +use crate::control::exec_inside_zone::ExecInsideZoneRpc; +use crate::control::get_host_cpu_topology::GetHostCpuTopologyRpc; +use crate::control::get_zone::GetZoneRpc; +use crate::control::host_status::HostStatusRpc; +use crate::control::list_devices::ListDevicesRpc; +use crate::control::list_zones::ListZonesRpc; +use crate::control::pull_image::PullImageRpc; +use crate::control::read_hypervisor_console::ReadHypervisorConsoleRpc; +use crate::control::read_zone_metrics::ReadZoneMetricsRpc; +use crate::control::resolve_zone_id::ResolveZoneIdRpc; +use crate::control::set_host_power_management_policy::SetHostPowerManagementPolicyRpc; +use crate::control::snoop_idm::SnoopIdmRpc; +use crate::control::update_zone_resources::UpdateZoneResourcesRpc; +use crate::control::watch_events::WatchEventsRpc; +use crate::db::zone::ZoneStore; +use crate::ip::assignment::IpAssignment; +use crate::{ + console::DaemonConsoleHandle, devices::DaemonDeviceManager, event::DaemonEventContext, + idm::DaemonIdmHandle, zlt::ZoneLookupTable, +}; + +pub mod attach_zone_console; +pub mod create_zone; +pub mod destroy_zone; +pub mod exec_inside_zone; +pub mod get_host_cpu_topology; +pub mod get_zone; +pub mod host_status; +pub mod list_devices; +pub mod list_zones; +pub mod pull_image; +pub mod read_hypervisor_console; +pub mod read_zone_metrics; +pub mod resolve_zone_id; +pub mod set_host_power_management_policy; +pub mod snoop_idm; +pub mod update_zone_resources; +pub mod watch_events; + +pub struct ApiError { + message: String, +} + +impl From for ApiError { + fn from(value: Error) -> Self { + ApiError { + message: value.to_string(), + } + } +} + +impl From for Status { + fn from(value: ApiError) -> Self { + Status::unknown(value.message) + } +} + +#[derive(Clone)] +pub struct DaemonControlService { + zlt: ZoneLookupTable, + devices: DaemonDeviceManager, + events: DaemonEventContext, + console: DaemonConsoleHandle, + idm: DaemonIdmHandle, + zones: ZoneStore, + ip: IpAssignment, + zone_reconciler_notify: Sender, + packer: OciPackerService, + runtime: Runtime, +} + +impl DaemonControlService { + #[allow(clippy::too_many_arguments)] + pub fn new( + zlt: ZoneLookupTable, + devices: DaemonDeviceManager, + events: DaemonEventContext, + console: DaemonConsoleHandle, + idm: DaemonIdmHandle, + zones: ZoneStore, + ip: IpAssignment, + zone_reconciler_notify: Sender, + packer: OciPackerService, + runtime: Runtime, + ) -> Self { + Self { + zlt, + devices, + events, + console, + idm, + zones, + ip, + zone_reconciler_notify, + packer, + runtime, + } + } +} + +#[tonic::async_trait] +impl ControlService for DaemonControlService { + async fn host_status( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + adapt( + HostStatusRpc::new(self.ip.clone(), self.zlt.clone()) + .process(request) + .await, + ) + } + + type SnoopIdmStream = + Pin> + Send + 'static>>; + + async fn snoop_idm( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + adapt( + SnoopIdmRpc::new(self.idm.clone(), self.zlt.clone()) + .process(request) + .await, + ) + } + + async fn get_host_cpu_topology( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + adapt( + GetHostCpuTopologyRpc::new(self.runtime.clone()) + .process(request) + .await, + ) + } + + async fn set_host_power_management_policy( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + adapt( + SetHostPowerManagementPolicyRpc::new(self.runtime.clone()) + .process(request) + .await, + ) + } + + async fn list_devices( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + adapt( + ListDevicesRpc::new(self.devices.clone()) + .process(request) + .await, + ) + } + + type PullImageStream = + Pin> + Send + 'static>>; + + async fn pull_image( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + adapt( + PullImageRpc::new(self.packer.clone()) + .process(request) + .await, + ) + } + + async fn create_zone( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + adapt( + CreateZoneRpc::new( + self.zones.clone(), + self.zlt.clone(), + self.zone_reconciler_notify.clone(), + ) + .process(request) + .await, + ) + } + + async fn destroy_zone( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + adapt( + DestroyZoneRpc::new(self.zones.clone(), self.zone_reconciler_notify.clone()) + .process(request) + .await, + ) + } + + async fn resolve_zone_id( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + adapt( + ResolveZoneIdRpc::new(self.zones.clone()) + .process(request) + .await, + ) + } + + async fn get_zone( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + adapt(GetZoneRpc::new(self.zones.clone()).process(request).await) + } + + async fn update_zone_resources( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + adapt( + UpdateZoneResourcesRpc::new(self.runtime.clone(), self.zones.clone()) + .process(request) + .await, + ) + } + + async fn list_zones( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + adapt(ListZonesRpc::new(self.zones.clone()).process(request).await) + } + + type AttachZoneConsoleStream = + Pin> + Send + 'static>>; + + async fn attach_zone_console( + &self, + request: Request>, + ) -> Result, Status> { + let input = request.into_inner(); + adapt( + AttachZoneConsoleRpc::new(self.console.clone()) + .process(input) + .await, + ) + } + + type ExecInsideZoneStream = + Pin> + Send + 'static>>; + + async fn exec_inside_zone( + &self, + request: Request>, + ) -> Result, Status> { + let input = request.into_inner(); + adapt( + ExecInsideZoneRpc::new(self.idm.clone()) + .process(input) + .await, + ) + } + + async fn read_zone_metrics( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + adapt( + ReadZoneMetricsRpc::new(self.idm.clone()) + .process(request) + .await, + ) + } + + type WatchEventsStream = + Pin> + Send + 'static>>; + + async fn watch_events( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + adapt( + WatchEventsRpc::new(self.events.clone()) + .process(request) + .await, + ) + } + + async fn read_hypervisor_console( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + adapt( + ReadHypervisorConsoleRpc::new(self.runtime.clone()) + .process(request) + .await, + ) + } +} + +fn adapt(result: anyhow::Result) -> Result, Status> { + result + .map(Response::new) + .map_err(|error| Status::unknown(error.to_string())) +} diff --git a/crates/daemon/src/control/pull_image.rs b/crates/daemon/src/control/pull_image.rs new file mode 100644 index 0000000..b54cb6c --- /dev/null +++ b/crates/daemon/src/control/pull_image.rs @@ -0,0 +1,100 @@ +use crate::control::ApiError; +use crate::oci::convert_oci_progress; +use anyhow::Result; +use async_stream::try_stream; +use krata::v1::common::OciImageFormat; +use krata::v1::control::{PullImageReply, PullImageRequest}; +use krataoci::name::ImageName; +use krataoci::packer::service::OciPackerService; +use krataoci::packer::{OciPackedFormat, OciPackedImage}; +use krataoci::progress::{OciProgress, OciProgressContext}; +use std::pin::Pin; +use tokio::select; +use tokio::task::JoinError; +use tokio_stream::Stream; +use tonic::Status; + +enum PullImageSelect { + Progress(Option), + Completed(Result, JoinError>), +} + +pub struct PullImageRpc { + packer: OciPackerService, +} + +impl PullImageRpc { + pub fn new(packer: OciPackerService) -> Self { + Self { packer } + } + + pub async fn process( + self, + request: PullImageRequest, + ) -> Result> + Send + 'static>>> { + let name = ImageName::parse(&request.image)?; + let format = match request.format() { + OciImageFormat::Unknown => OciPackedFormat::Squashfs, + OciImageFormat::Squashfs => OciPackedFormat::Squashfs, + OciImageFormat::Erofs => OciPackedFormat::Erofs, + OciImageFormat::Tar => OciPackedFormat::Tar, + }; + let (context, mut receiver) = OciProgressContext::create(); + let our_packer = self.packer; + + let output = try_stream! { + let mut task = tokio::task::spawn(async move { + our_packer.request(name, format, request.overwrite_cache, request.update, context).await + }); + let abort_handle = task.abort_handle(); + let _task_cancel_guard = scopeguard::guard(abort_handle, |handle| { + handle.abort(); + }); + + loop { + let what = select! { + x = receiver.changed() => match x { + Ok(_) => PullImageSelect::Progress(Some(receiver.borrow_and_update().clone())), + Err(_) => PullImageSelect::Progress(None), + }, + x = &mut task => PullImageSelect::Completed(x), + }; + match what { + PullImageSelect::Progress(Some(progress)) => { + let reply = PullImageReply { + progress: Some(convert_oci_progress(progress)), + digest: String::new(), + format: OciImageFormat::Unknown.into(), + }; + yield reply; + }, + + PullImageSelect::Completed(result) => { + let result = result.map_err(|err| ApiError { + message: err.to_string(), + })?; + let packed = result.map_err(|err| ApiError { + message: err.to_string(), + })?; + let reply = PullImageReply { + progress: None, + digest: packed.digest, + format: match packed.format { + OciPackedFormat::Squashfs => OciImageFormat::Squashfs.into(), + OciPackedFormat::Erofs => OciImageFormat::Erofs.into(), + OciPackedFormat::Tar => OciImageFormat::Tar.into(), + }, + }; + yield reply; + break; + }, + + _ => { + continue; + } + } + } + }; + Ok(Box::pin(output)) + } +} diff --git a/crates/daemon/src/control/read_hypervisor_console.rs b/crates/daemon/src/control/read_hypervisor_console.rs new file mode 100644 index 0000000..ff10261 --- /dev/null +++ b/crates/daemon/src/control/read_hypervisor_console.rs @@ -0,0 +1,23 @@ +use anyhow::Result; +use krata::v1::control::{ReadHypervisorConsoleReply, ReadHypervisorConsoleRequest}; +use kratart::Runtime; + +pub struct ReadHypervisorConsoleRpc { + runtime: Runtime, +} + +impl ReadHypervisorConsoleRpc { + pub fn new(runtime: Runtime) -> Self { + Self { runtime } + } + + pub async fn process( + self, + _: ReadHypervisorConsoleRequest, + ) -> Result { + let data = self.runtime.read_hypervisor_console(false).await?; + Ok(ReadHypervisorConsoleReply { + data: data.to_string(), + }) + } +} diff --git a/crates/daemon/src/control/read_zone_metrics.rs b/crates/daemon/src/control/read_zone_metrics.rs new file mode 100644 index 0000000..686bdc3 --- /dev/null +++ b/crates/daemon/src/control/read_zone_metrics.rs @@ -0,0 +1,40 @@ +use std::str::FromStr; + +use anyhow::Result; +use uuid::Uuid; + +use krata::idm::internal::MetricsRequest; +use krata::idm::internal::{ + request::Request as IdmRequestType, response::Response as IdmResponseType, + Request as IdmRequest, +}; +use krata::v1::control::{ReadZoneMetricsReply, ReadZoneMetricsRequest}; + +use crate::idm::DaemonIdmHandle; +use crate::metrics::idm_metric_to_api; + +pub struct ReadZoneMetricsRpc { + idm: DaemonIdmHandle, +} + +impl ReadZoneMetricsRpc { + pub fn new(idm: DaemonIdmHandle) -> Self { + Self { idm } + } + + pub async fn process(self, request: ReadZoneMetricsRequest) -> Result { + let uuid = Uuid::from_str(&request.zone_id)?; + let client = self.idm.client(uuid).await?; + let response = client + .send(IdmRequest { + request: Some(IdmRequestType::Metrics(MetricsRequest {})), + }) + .await?; + + let mut reply = ReadZoneMetricsReply::default(); + if let Some(IdmResponseType::Metrics(metrics)) = response.response { + reply.root = metrics.root.map(idm_metric_to_api); + } + Ok(reply) + } +} diff --git a/crates/daemon/src/control/resolve_zone_id.rs b/crates/daemon/src/control/resolve_zone_id.rs new file mode 100644 index 0000000..e2540ea --- /dev/null +++ b/crates/daemon/src/control/resolve_zone_id.rs @@ -0,0 +1,30 @@ +use anyhow::Result; +use krata::v1::common::Zone; +use krata::v1::control::{ResolveZoneIdReply, ResolveZoneIdRequest}; + +use crate::db::zone::ZoneStore; + +pub struct ResolveZoneIdRpc { + zones: ZoneStore, +} + +impl ResolveZoneIdRpc { + pub fn new(zones: ZoneStore) -> Self { + Self { zones } + } + + pub async fn process(self, request: ResolveZoneIdRequest) -> Result { + let zones = self.zones.list().await?; + let zones = zones + .into_values() + .filter(|x| { + let comparison_spec = x.spec.as_ref().cloned().unwrap_or_default(); + (!request.name.is_empty() && comparison_spec.name == request.name) + || x.id == request.name + }) + .collect::>(); + Ok(ResolveZoneIdReply { + zone_id: zones.first().cloned().map(|x| x.id).unwrap_or_default(), + }) + } +} diff --git a/crates/daemon/src/control/set_host_power_management_policy.rs b/crates/daemon/src/control/set_host_power_management_policy.rs new file mode 100644 index 0000000..aeff109 --- /dev/null +++ b/crates/daemon/src/control/set_host_power_management_policy.rs @@ -0,0 +1,25 @@ +use anyhow::Result; +use krata::v1::control::{SetHostPowerManagementPolicyReply, SetHostPowerManagementPolicyRequest}; +use kratart::Runtime; + +pub struct SetHostPowerManagementPolicyRpc { + runtime: Runtime, +} + +impl SetHostPowerManagementPolicyRpc { + pub fn new(runtime: Runtime) -> Self { + Self { runtime } + } + + pub async fn process( + self, + request: SetHostPowerManagementPolicyRequest, + ) -> Result { + let power = self.runtime.power_management_context().await?; + let scheduler = &request.scheduler; + + power.set_smt_policy(request.smt_awareness).await?; + power.set_scheduler_policy(scheduler).await?; + Ok(SetHostPowerManagementPolicyReply {}) + } +} diff --git a/crates/daemon/src/control/snoop_idm.rs b/crates/daemon/src/control/snoop_idm.rs new file mode 100644 index 0000000..c48d54d --- /dev/null +++ b/crates/daemon/src/control/snoop_idm.rs @@ -0,0 +1,39 @@ +use crate::idm::DaemonIdmHandle; +use crate::zlt::ZoneLookupTable; +use anyhow::Result; +use async_stream::try_stream; +use krata::v1::control::{SnoopIdmReply, SnoopIdmRequest}; +use std::pin::Pin; +use tokio_stream::Stream; +use tonic::Status; + +pub struct SnoopIdmRpc { + idm: DaemonIdmHandle, + zlt: ZoneLookupTable, +} + +impl SnoopIdmRpc { + pub fn new(idm: DaemonIdmHandle, zlt: ZoneLookupTable) -> Self { + Self { idm, zlt } + } + + pub async fn process( + self, + _request: SnoopIdmRequest, + ) -> Result> + Send + 'static>>> { + let mut messages = self.idm.snoop(); + let zlt = self.zlt.clone(); + let output = try_stream! { + while let Ok(event) = messages.recv().await { + let Some(from_uuid) = zlt.lookup_uuid_by_domid(event.from).await else { + continue; + }; + let Some(to_uuid) = zlt.lookup_uuid_by_domid(event.to).await else { + continue; + }; + yield SnoopIdmReply { from: from_uuid.to_string(), to: to_uuid.to_string(), packet: Some(event.packet) }; + } + }; + Ok(Box::pin(output)) + } +} diff --git a/crates/daemon/src/control/update_zone_resources.rs b/crates/daemon/src/control/update_zone_resources.rs new file mode 100644 index 0000000..74385a9 --- /dev/null +++ b/crates/daemon/src/control/update_zone_resources.rs @@ -0,0 +1,82 @@ +use std::str::FromStr; + +use anyhow::{anyhow, Result}; +use uuid::Uuid; + +use krata::v1::common::{ZoneResourceStatus, ZoneState}; +use krata::v1::control::{UpdateZoneResourcesReply, UpdateZoneResourcesRequest}; +use kratart::Runtime; + +use crate::db::zone::ZoneStore; + +pub struct UpdateZoneResourcesRpc { + runtime: Runtime, + zones: ZoneStore, +} + +impl UpdateZoneResourcesRpc { + pub fn new(runtime: Runtime, zones: ZoneStore) -> Self { + Self { runtime, zones } + } + + pub async fn process( + self, + request: UpdateZoneResourcesRequest, + ) -> Result { + let uuid = Uuid::from_str(&request.zone_id)?; + let Some(mut zone) = self.zones.read(uuid).await? else { + return Err(anyhow!("zone not found")); + }; + + let Some(ref mut status) = zone.status else { + return Err(anyhow!("zone state not available")); + }; + + if status.state() != ZoneState::Created { + return Err(anyhow!("zone is in an invalid state")); + } + + if status.domid == 0 || status.domid == u32::MAX { + return Err(anyhow!("zone domid is invalid")); + } + + let mut resources = request.resources.unwrap_or_default(); + if resources.target_memory > resources.max_memory { + resources.max_memory = resources.target_memory; + } + + if resources.target_cpus < 1 { + resources.target_cpus = 1; + } + + let initial_resources = zone + .spec + .clone() + .unwrap_or_default() + .initial_resources + .unwrap_or_default(); + if resources.target_cpus > initial_resources.max_cpus { + resources.target_cpus = initial_resources.max_cpus; + } + resources.max_cpus = initial_resources.max_cpus; + + self.runtime + .set_memory_resources( + status.domid, + resources.target_memory * 1024 * 1024, + resources.max_memory * 1024 * 1024, + ) + .await + .map_err(|error| anyhow!("failed to set memory resources: {}", error))?; + self.runtime + .set_cpu_resources(status.domid, resources.target_cpus) + .await + .map_err(|error| anyhow!("failed to set cpu resources: {}", error))?; + status.resource_status = Some(ZoneResourceStatus { + active_resources: Some(resources), + }); + + self.zones.update(uuid, zone).await?; + Ok(UpdateZoneResourcesReply {}) + } +} diff --git a/crates/daemon/src/control/watch_events.rs b/crates/daemon/src/control/watch_events.rs new file mode 100644 index 0000000..c1438de --- /dev/null +++ b/crates/daemon/src/control/watch_events.rs @@ -0,0 +1,31 @@ +use crate::event::DaemonEventContext; +use anyhow::Result; +use async_stream::try_stream; +use krata::v1::control::{WatchEventsReply, WatchEventsRequest}; +use std::pin::Pin; +use tokio_stream::Stream; +use tonic::Status; + +pub struct WatchEventsRpc { + events: DaemonEventContext, +} + +impl WatchEventsRpc { + pub fn new(events: DaemonEventContext) -> Self { + Self { events } + } + + pub async fn process( + self, + _request: WatchEventsRequest, + ) -> Result> + Send + 'static>>> + { + let mut events = self.events.subscribe(); + let output = try_stream! { + while let Ok(event) = events.recv().await { + yield WatchEventsReply { event: Some(event), }; + } + }; + Ok(Box::pin(output)) + } +} diff --git a/crates/daemon/src/lib.rs b/crates/daemon/src/lib.rs index 1171c1d..0205efc 100644 --- a/crates/daemon/src/lib.rs +++ b/crates/daemon/src/lib.rs @@ -41,7 +41,6 @@ pub mod metrics; pub mod oci; pub mod reconcile; pub mod zlt; - pub struct Daemon { store: String, _config: Arc,