diff --git a/crates/ctl/src/cli/destroy.rs b/crates/ctl/src/cli/destroy.rs index 3652bf4..60db259 100644 --- a/crates/ctl/src/cli/destroy.rs +++ b/crates/ctl/src/cli/destroy.rs @@ -52,31 +52,30 @@ impl DestroyCommand { async fn wait_guest_destroyed(id: &str, events: EventStream) -> Result<()> { let mut stream = events.subscribe(); while let Ok(event) = stream.recv().await { - if let Event::GuestChanged(changed) = event { - let Some(guest) = changed.guest else { - continue; - }; + let Event::GuestChanged(changed) = event; + let Some(guest) = changed.guest else { + continue; + }; - if guest.id != id { - continue; + if guest.id != id { + continue; + } + + let Some(state) = guest.state else { + continue; + }; + + if let Some(ref error) = state.error_info { + if state.status() == GuestStatus::Failed { + error!("destroy failed: {}", error.message); + std::process::exit(1); + } else { + error!("guest error: {}", error.message); } + } - let Some(state) = guest.state else { - continue; - }; - - if let Some(ref error) = state.error_info { - if state.status() == GuestStatus::Failed { - error!("destroy failed: {}", error.message); - std::process::exit(1); - } else { - error!("guest error: {}", error.message); - } - } - - if state.status() == GuestStatus::Destroyed { - std::process::exit(0); - } + if state.status() == GuestStatus::Destroyed { + std::process::exit(0); } } Ok(()) diff --git a/crates/ctl/src/cli/launch.rs b/crates/ctl/src/cli/launch.rs index 83f2364..d202d3f 100644 --- a/crates/ctl/src/cli/launch.rs +++ b/crates/ctl/src/cli/launch.rs @@ -2,17 +2,16 @@ use std::collections::HashMap; use anyhow::Result; use clap::Parser; -use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use krata::{ events::EventStream, v1::{ common::{ - guest_image_spec::Image, GuestImageSpec, GuestOciImageSpec, GuestSpec, GuestStatus, - GuestTaskSpec, GuestTaskSpecEnvVar, + guest_image_spec::Image, GuestImageSpec, GuestOciImageFormat, GuestOciImageSpec, + GuestSpec, GuestStatus, GuestTaskSpec, GuestTaskSpecEnvVar, }, control::{ control_service_client::ControlServiceClient, watch_events_reply::Event, - CreateGuestRequest, OciProgressEventLayerPhase, OciProgressEventPhase, + CreateGuestRequest, PullImageRequest, }, }, }; @@ -20,11 +19,15 @@ use log::error; use tokio::select; use tonic::{transport::Channel, Request}; -use crate::console::StdioConsoleStream; +use crate::{console::StdioConsoleStream, pull::pull_interactive_progress}; + +use super::pull::PullImageFormat; #[derive(Parser)] #[command(about = "Launch a new guest")] pub struct LauchCommand { + #[arg(short = 'S', long, default_value = "squashfs", help = "Image format")] + image_format: PullImageFormat, #[arg(short, long, help = "Name of the guest")] name: Option, #[arg( @@ -71,11 +74,25 @@ impl LauchCommand { mut client: ControlServiceClient, events: EventStream, ) -> Result<()> { + let response = client + .pull_image(PullImageRequest { + image: self.oci.clone(), + format: match self.image_format { + PullImageFormat::Squashfs => GuestOciImageFormat::Squashfs.into(), + PullImageFormat::Erofs => GuestOciImageFormat::Erofs.into(), + }, + }) + .await?; + let reply = pull_interactive_progress(response.into_inner()).await?; + let request = CreateGuestRequest { spec: Some(GuestSpec { name: self.name.unwrap_or_default(), image: Some(GuestImageSpec { - image: Some(Image::Oci(GuestOciImageSpec { image: self.oci })), + image: Some(Image::Oci(GuestOciImageSpec { + digest: reply.digest, + format: reply.format, + })), }), vcpus: self.cpus, mem: self.mem, @@ -126,14 +143,9 @@ impl LauchCommand { async fn wait_guest_started(id: &str, events: EventStream) -> Result<()> { let mut stream = events.subscribe(); - let mut multi_progress: Option<(MultiProgress, HashMap)> = None; while let Ok(event) = stream.recv().await { match event { Event::GuestChanged(changed) => { - if let Some((multi_progress, _)) = multi_progress.as_mut() { - let _ = multi_progress.clear(); - } - let Some(guest) = changed.guest else { continue; }; @@ -164,102 +176,6 @@ async fn wait_guest_started(id: &str, events: EventStream) -> Result<()> { break; } } - - Event::OciProgress(oci) => { - if multi_progress.is_none() { - multi_progress = Some((MultiProgress::new(), HashMap::new())); - } - - let Some((multi_progress, progresses)) = multi_progress.as_mut() else { - continue; - }; - - match oci.phase() { - OciProgressEventPhase::Resolved - | OciProgressEventPhase::ConfigAcquire - | OciProgressEventPhase::LayerAcquire => { - if progresses.is_empty() && !oci.layers.is_empty() { - for layer in &oci.layers { - let bar = ProgressBar::new(layer.total); - bar.set_style( - ProgressStyle::with_template("{msg} {wide_bar}").unwrap(), - ); - progresses.insert(layer.id.clone(), bar.clone()); - multi_progress.add(bar); - } - } - - for layer in oci.layers { - let Some(progress) = progresses.get_mut(&layer.id) else { - continue; - }; - - let phase = match layer.phase() { - OciProgressEventLayerPhase::Waiting => "waiting", - OciProgressEventLayerPhase::Downloading => "downloading", - OciProgressEventLayerPhase::Downloaded => "downloaded", - OciProgressEventLayerPhase::Extracting => "extracting", - OciProgressEventLayerPhase::Extracted => "extracted", - _ => "unknown", - }; - - let simple = if let Some((_, hash)) = layer.id.split_once(':') { - hash - } else { - id - }; - let simple = if simple.len() > 10 { - &simple[0..10] - } else { - simple - }; - let message = format!("{:width$} {}", simple, phase, width = 10); - - if message != progress.message() { - progress.set_message(message); - } - - progress.update(|state| { - state.set_len(layer.total); - state.set_pos(layer.value); - }); - } - } - - OciProgressEventPhase::Packing => { - for (key, bar) in &mut *progresses { - if key == "packing" { - continue; - } - bar.finish_and_clear(); - multi_progress.remove(bar); - } - progresses.retain(|k, _| k == "packing"); - if progresses.is_empty() { - let progress = ProgressBar::new(100); - progress.set_message("packing"); - progress.set_style( - ProgressStyle::with_template("{msg} {wide_bar}").unwrap(), - ); - progresses.insert("packing".to_string(), progress); - } - let Some(progress) = progresses.get("packing") else { - continue; - }; - - progress.update(|state| { - state.set_len(oci.total); - state.set_pos(oci.value); - }); - } - - _ => {} - } - - for progress in progresses { - progress.1.tick(); - } - } } } Ok(()) diff --git a/crates/ctl/src/cli/mod.rs b/crates/ctl/src/cli/mod.rs index aeaaeaf..17cd396 100644 --- a/crates/ctl/src/cli/mod.rs +++ b/crates/ctl/src/cli/mod.rs @@ -5,6 +5,7 @@ pub mod launch; pub mod list; pub mod logs; pub mod metrics; +pub mod pull; pub mod resolve; pub mod top; pub mod watch; @@ -21,7 +22,7 @@ use tonic::{transport::Channel, Request}; use self::{ attach::AttachCommand, destroy::DestroyCommand, idm_snoop::IdmSnoopCommand, launch::LauchCommand, list::ListCommand, logs::LogsCommand, metrics::MetricsCommand, - resolve::ResolveCommand, top::TopCommand, watch::WatchCommand, + pull::PullCommand, resolve::ResolveCommand, top::TopCommand, watch::WatchCommand, }; #[derive(Parser)] @@ -48,6 +49,7 @@ pub enum Commands { Destroy(DestroyCommand), List(ListCommand), Attach(AttachCommand), + Pull(PullCommand), Logs(LogsCommand), Watch(WatchCommand), Resolve(ResolveCommand), @@ -101,6 +103,10 @@ impl ControlCommand { Commands::Top(top) => { top.run(client, events).await?; } + + Commands::Pull(pull) => { + pull.run(client).await?; + } } Ok(()) } diff --git a/crates/ctl/src/cli/pull.rs b/crates/ctl/src/cli/pull.rs new file mode 100644 index 0000000..50ab84c --- /dev/null +++ b/crates/ctl/src/cli/pull.rs @@ -0,0 +1,42 @@ +use anyhow::Result; +use clap::{Parser, ValueEnum}; +use krata::v1::{ + common::GuestOciImageFormat, + control::{control_service_client::ControlServiceClient, PullImageRequest}, +}; + +use tonic::transport::Channel; + +use crate::pull::pull_interactive_progress; + +#[derive(ValueEnum, Clone, Debug, PartialEq, Eq)] +pub enum PullImageFormat { + Squashfs, + Erofs, +} + +#[derive(Parser)] +#[command(about = "Pull an image into the cache")] +pub struct PullCommand { + #[arg(help = "Image name")] + image: String, + #[arg(short = 's', long, default_value = "squashfs", help = "Image format")] + image_format: PullImageFormat, +} + +impl PullCommand { + pub async fn run(self, mut client: ControlServiceClient) -> Result<()> { + let response = client + .pull_image(PullImageRequest { + image: self.image.clone(), + format: match self.image_format { + PullImageFormat::Squashfs => GuestOciImageFormat::Squashfs.into(), + PullImageFormat::Erofs => GuestOciImageFormat::Erofs.into(), + }, + }) + .await?; + let reply = pull_interactive_progress(response.into_inner()).await?; + println!("{}", reply.digest); + Ok(()) + } +} diff --git a/crates/ctl/src/cli/watch.rs b/crates/ctl/src/cli/watch.rs index 3a723c9..e535ca6 100644 --- a/crates/ctl/src/cli/watch.rs +++ b/crates/ctl/src/cli/watch.rs @@ -29,10 +29,9 @@ impl WatchCommand { loop { let event = stream.recv().await?; - if let Event::GuestChanged(changed) = event { - let guest = changed.guest.clone(); - self.print_event("guest.changed", changed, guest)?; - } + let Event::GuestChanged(changed) = event; + let guest = changed.guest.clone(); + self.print_event("guest.changed", changed, guest)?; } } diff --git a/crates/ctl/src/console.rs b/crates/ctl/src/console.rs index 33136a1..a57edd8 100644 --- a/crates/ctl/src/console.rs +++ b/crates/ctl/src/console.rs @@ -69,27 +69,26 @@ impl StdioConsoleStream { Ok(tokio::task::spawn(async move { let mut stream = events.subscribe(); while let Ok(event) = stream.recv().await { - if let Event::GuestChanged(changed) = event { - let Some(guest) = changed.guest else { - continue; - }; + let Event::GuestChanged(changed) = event; + let Some(guest) = changed.guest else { + continue; + }; - let Some(state) = guest.state else { - continue; - }; + let Some(state) = guest.state else { + continue; + }; - if guest.id != id { - continue; - } + if guest.id != id { + continue; + } - if let Some(exit_info) = state.exit_info { - return Some(exit_info.code); - } + if let Some(exit_info) = state.exit_info { + return Some(exit_info.code); + } - let status = state.status(); - if status == GuestStatus::Destroying || status == GuestStatus::Destroyed { - return Some(10); - } + let status = state.status(); + if status == GuestStatus::Destroying || status == GuestStatus::Destroyed { + return Some(10); } } None diff --git a/crates/ctl/src/lib.rs b/crates/ctl/src/lib.rs index c3aeefe..82ee22d 100644 --- a/crates/ctl/src/lib.rs +++ b/crates/ctl/src/lib.rs @@ -2,3 +2,4 @@ pub mod cli; pub mod console; pub mod format; pub mod metrics; +pub mod pull; diff --git a/crates/ctl/src/metrics.rs b/crates/ctl/src/metrics.rs index 1cef438..570d593 100644 --- a/crates/ctl/src/metrics.rs +++ b/crates/ctl/src/metrics.rs @@ -82,7 +82,7 @@ impl MultiMetricCollector { let collect = select! { x = events.recv() => match x { Ok(event) => { - if let Event::GuestChanged(changed) = event { + let Event::GuestChanged(changed) = event; let Some(guest) = changed.guest else { continue; }; @@ -93,7 +93,6 @@ impl MultiMetricCollector { if state.status() != GuestStatus::Destroying { guests.push(guest); } - } false }, diff --git a/crates/ctl/src/pull.rs b/crates/ctl/src/pull.rs new file mode 100644 index 0000000..14313b0 --- /dev/null +++ b/crates/ctl/src/pull.rs @@ -0,0 +1,118 @@ +use std::collections::HashMap; + +use anyhow::{anyhow, Result}; +use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; +use krata::v1::control::{PullImageProgressLayerPhase, PullImageProgressPhase, PullImageReply}; +use tokio_stream::StreamExt; +use tonic::Streaming; + +pub async fn pull_interactive_progress( + mut stream: Streaming, +) -> Result { + let mut multi_progress: Option<(MultiProgress, HashMap)> = None; + + while let Some(reply) = stream.next().await { + let reply = reply?; + + if reply.progress.is_none() && !reply.digest.is_empty() { + return Ok(reply); + } + + let Some(oci) = reply.progress else { + continue; + }; + + if multi_progress.is_none() { + multi_progress = Some((MultiProgress::new(), HashMap::new())); + } + + let Some((multi_progress, progresses)) = multi_progress.as_mut() else { + continue; + }; + + match oci.phase() { + PullImageProgressPhase::Resolved + | PullImageProgressPhase::ConfigAcquire + | PullImageProgressPhase::LayerAcquire => { + if progresses.is_empty() && !oci.layers.is_empty() { + for layer in &oci.layers { + let bar = ProgressBar::new(layer.total); + bar.set_style(ProgressStyle::with_template("{msg} {wide_bar}").unwrap()); + progresses.insert(layer.id.clone(), bar.clone()); + multi_progress.add(bar); + } + } + + for layer in oci.layers { + let Some(progress) = progresses.get_mut(&layer.id) else { + continue; + }; + + let phase = match layer.phase() { + PullImageProgressLayerPhase::Waiting => "waiting", + PullImageProgressLayerPhase::Downloading => "downloading", + PullImageProgressLayerPhase::Downloaded => "downloaded", + PullImageProgressLayerPhase::Extracting => "extracting", + PullImageProgressLayerPhase::Extracted => "extracted", + _ => "unknown", + }; + + let simple = if let Some((_, hash)) = layer.id.split_once(':') { + hash + } else { + "unknown" + }; + let simple = if simple.len() > 10 { + &simple[0..10] + } else { + simple + }; + let message = format!( + "{:width$} {:phwidth$}", + simple, + phase, + width = 10, + phwidth = 11 + ); + + if message != progress.message() { + progress.set_message(message); + } + + progress.update(|state| { + state.set_len(layer.total); + state.set_pos(layer.value); + }); + } + } + + PullImageProgressPhase::Packing => { + for (key, bar) in &mut *progresses { + if key == "packing" { + continue; + } + bar.finish_and_clear(); + multi_progress.remove(bar); + } + progresses.retain(|k, _| k == "packing"); + if progresses.is_empty() { + let progress = ProgressBar::new(100); + progress.set_message("packing "); + progress.set_style(ProgressStyle::with_template("{msg} {wide_bar}").unwrap()); + progresses.insert("packing".to_string(), progress); + } + let Some(progress) = progresses.get("packing") else { + continue; + }; + + progress.update(|state| { + state.set_len(oci.total); + state.set_pos(oci.value); + }); + } + + _ => {} + } + } + Err(anyhow!("never received final reply for image pull")) +} diff --git a/crates/daemon/src/control.rs b/crates/daemon/src/control.rs index 596d11d..ca79e76 100644 --- a/crates/daemon/src/control.rs +++ b/crates/daemon/src/control.rs @@ -8,19 +8,28 @@ use krata::{ IdmMetricsRequest, }, v1::{ - common::{Guest, GuestState, GuestStatus}, + common::{Guest, GuestOciImageFormat, GuestState, GuestStatus}, control::{ control_service_server::ControlService, ConsoleDataReply, ConsoleDataRequest, CreateGuestReply, CreateGuestRequest, DestroyGuestReply, DestroyGuestRequest, - ListGuestsReply, ListGuestsRequest, ReadGuestMetricsReply, ReadGuestMetricsRequest, - ResolveGuestReply, ResolveGuestRequest, SnoopIdmReply, SnoopIdmRequest, - WatchEventsReply, WatchEventsRequest, + ListGuestsReply, ListGuestsRequest, PullImageReply, PullImageRequest, + ReadGuestMetricsReply, ReadGuestMetricsRequest, ResolveGuestReply, ResolveGuestRequest, + SnoopIdmReply, SnoopIdmRequest, WatchEventsReply, WatchEventsRequest, }, }, }; +use krataoci::{ + name::ImageName, + packer::{service::OciPackerService, OciImagePacked, OciPackedFormat}, + progress::{OciProgress, OciProgressContext}, +}; use tokio::{ select, - sync::mpsc::{channel, Sender}, + sync::{ + broadcast, + mpsc::{channel, Sender}, + }, + task::JoinError, }; use tokio_stream::StreamExt; use tonic::{Request, Response, Status, Streaming}; @@ -28,7 +37,7 @@ use uuid::Uuid; use crate::{ console::DaemonConsoleHandle, db::GuestStore, event::DaemonEventContext, idm::DaemonIdmHandle, - metrics::idm_metric_to_api, + metrics::idm_metric_to_api, oci::convert_oci_progress, }; pub struct ApiError { @@ -50,21 +59,23 @@ impl From for Status { } #[derive(Clone)] -pub struct RuntimeControlService { +pub struct DaemonControlService { events: DaemonEventContext, console: DaemonConsoleHandle, idm: DaemonIdmHandle, guests: GuestStore, guest_reconciler_notify: Sender, + packer: OciPackerService, } -impl RuntimeControlService { +impl DaemonControlService { pub fn new( events: DaemonEventContext, console: DaemonConsoleHandle, idm: DaemonIdmHandle, guests: GuestStore, guest_reconciler_notify: Sender, + packer: OciPackerService, ) -> Self { Self { events, @@ -72,6 +83,7 @@ impl RuntimeControlService { idm, guests, guest_reconciler_notify, + packer, } } } @@ -81,11 +93,19 @@ enum ConsoleDataSelect { Write(Option>), } +enum PullImageSelect { + Progress(Option), + Completed(Result, JoinError>), +} + #[tonic::async_trait] -impl ControlService for RuntimeControlService { +impl ControlService for DaemonControlService { type ConsoleDataStream = Pin> + Send + 'static>>; + type PullImageStream = + Pin> + Send + 'static>>; + type WatchEventsStream = Pin> + Send + 'static>>; @@ -337,6 +357,70 @@ impl ControlService for RuntimeControlService { Ok(Response::new(reply)) } + async fn pull_image( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + let name = ImageName::parse(&request.image).map_err(|err| ApiError { + message: err.to_string(), + })?; + let format = match request.format() { + GuestOciImageFormat::Unknown => OciPackedFormat::Squashfs, + GuestOciImageFormat::Squashfs => OciPackedFormat::Squashfs, + GuestOciImageFormat::Erofs => OciPackedFormat::Erofs, + }; + let (sender, mut receiver) = broadcast::channel::(100); + let context = OciProgressContext::new(sender); + + let our_packer = self.packer.clone(); + + let output = try_stream! { + let mut task = tokio::task::spawn(async move { + our_packer.request(name, format, context).await + }); + loop { + let what = select! { + x = receiver.recv() => PullImageSelect::Progress(x.ok()), + x = &mut task => PullImageSelect::Completed(x), + }; + + match what { + PullImageSelect::Progress(Some(progress)) => { + let reply = PullImageReply { + progress: Some(convert_oci_progress(progress)), + digest: String::new(), + format: GuestOciImageFormat::Unknown.into(), + }; + yield reply; + }, + + PullImageSelect::Completed(result) => { + let result = result.map_err(|err| ApiError { + message: err.to_string(), + })?; + let packed = result.map_err(|err| ApiError { + message: err.to_string(), + })?; + let reply = PullImageReply { + progress: None, + digest: packed.digest, + format: match packed.format { + OciPackedFormat::Squashfs => GuestOciImageFormat::Squashfs.into(), + OciPackedFormat::Erofs => GuestOciImageFormat::Erofs.into(), + }, + }; + yield reply; + break; + }, + + _ => {}, + } + } + }; + Ok(Response::new(Box::pin(output) as Self::PullImageStream)) + } + async fn watch_events( &self, request: Request, diff --git a/crates/daemon/src/event.rs b/crates/daemon/src/event.rs index 82f9d7d..9d21ff1 100644 --- a/crates/daemon/src/event.rs +++ b/crates/daemon/src/event.rs @@ -9,7 +9,6 @@ use krata::{ idm::protocol::{idm_event::Event, IdmEvent}, v1::common::{GuestExitInfo, GuestState, GuestStatus}, }; -use krataoci::progress::OciProgress; use log::{error, warn}; use tokio::{ select, @@ -22,7 +21,7 @@ use tokio::{ }; use uuid::Uuid; -use crate::{db::GuestStore, idm::DaemonIdmHandle, oci::convert_oci_progress}; +use crate::{db::GuestStore, idm::DaemonIdmHandle}; pub type DaemonEvent = krata::v1::control::watch_events_reply::Event; @@ -53,8 +52,7 @@ pub struct DaemonEventGenerator { idms: HashMap)>, idm_sender: Sender<(u32, IdmEvent)>, idm_receiver: Receiver<(u32, IdmEvent)>, - oci_progress_receiver: broadcast::Receiver, - event_sender: broadcast::Sender, + _event_sender: broadcast::Sender, } impl DaemonEventGenerator { @@ -62,7 +60,6 @@ impl DaemonEventGenerator { guests: GuestStore, guest_reconciler_notify: Sender, idm: DaemonIdmHandle, - oci_progress_receiver: broadcast::Receiver, ) -> Result<(DaemonEventContext, DaemonEventGenerator)> { let (sender, _) = broadcast::channel(EVENT_CHANNEL_QUEUE_LEN); let (idm_sender, idm_receiver) = channel(IDM_EVENT_CHANNEL_QUEUE_LEN); @@ -74,55 +71,53 @@ impl DaemonEventGenerator { idms: HashMap::new(), idm_sender, idm_receiver, - oci_progress_receiver, - event_sender: sender.clone(), + _event_sender: sender.clone(), }; let context = DaemonEventContext { sender }; Ok((context, generator)) } async fn handle_feed_event(&mut self, event: &DaemonEvent) -> Result<()> { - if let DaemonEvent::GuestChanged(changed) = event { - let Some(ref guest) = changed.guest else { - return Ok(()); - }; + let DaemonEvent::GuestChanged(changed) = event; + let Some(ref guest) = changed.guest else { + return Ok(()); + }; - let Some(ref state) = guest.state else { - return Ok(()); - }; + let Some(ref state) = guest.state else { + return Ok(()); + }; - let status = state.status(); - let id = Uuid::from_str(&guest.id)?; - let domid = state.domid; - match status { - GuestStatus::Started => { - if let Entry::Vacant(e) = self.idms.entry(domid) { - let client = self.idm.client(domid).await?; - let mut receiver = client.subscribe().await?; - let sender = self.idm_sender.clone(); - let task = tokio::task::spawn(async move { - loop { - let Ok(event) = receiver.recv().await else { - break; - }; + let status = state.status(); + let id = Uuid::from_str(&guest.id)?; + let domid = state.domid; + match status { + GuestStatus::Started => { + if let Entry::Vacant(e) = self.idms.entry(domid) { + let client = self.idm.client(domid).await?; + let mut receiver = client.subscribe().await?; + let sender = self.idm_sender.clone(); + let task = tokio::task::spawn(async move { + loop { + let Ok(event) = receiver.recv().await else { + break; + }; - if let Err(error) = sender.send((domid, event)).await { - warn!("unable to deliver idm event: {}", error); - } + if let Err(error) = sender.send((domid, event)).await { + warn!("unable to deliver idm event: {}", error); } - }); - e.insert((id, task)); - } + } + }); + e.insert((id, task)); } - - GuestStatus::Destroyed => { - if let Some((_, handle)) = self.idms.remove(&domid) { - handle.abort(); - } - } - - _ => {} } + + GuestStatus::Destroyed => { + if let Some((_, handle)) = self.idms.remove(&domid) { + handle.abort(); + } + } + + _ => {} } Ok(()) } @@ -150,17 +145,6 @@ impl DaemonEventGenerator { Ok(()) } - async fn handle_oci_progress_event(&mut self, progress: OciProgress) -> Result<()> { - let Some(_) = Uuid::from_str(&progress.id).ok() else { - return Ok(()); - }; - - let event = convert_oci_progress(progress); - self.event_sender.send(DaemonEvent::OciProgress(event))?; - - Ok(()) - } - async fn evaluate(&mut self) -> Result<()> { select! { x = self.idm_receiver.recv() => match x { @@ -182,14 +166,6 @@ impl DaemonEventGenerator { Err(error.into()) } }, - x = self.oci_progress_receiver.recv() => match x { - Ok(event) => { - self.handle_oci_progress_event(event).await - }, - Err(error) => { - Err(error.into()) - } - } } } diff --git a/crates/daemon/src/lib.rs b/crates/daemon/src/lib.rs index a92a9ca..e81901f 100644 --- a/crates/daemon/src/lib.rs +++ b/crates/daemon/src/lib.rs @@ -2,21 +2,19 @@ use std::{net::SocketAddr, path::PathBuf, str::FromStr}; use anyhow::Result; use console::{DaemonConsole, DaemonConsoleHandle}; -use control::RuntimeControlService; +use control::DaemonControlService; use db::GuestStore; use event::{DaemonEventContext, DaemonEventGenerator}; use idm::{DaemonIdm, DaemonIdmHandle}; use krata::{dial::ControlDialAddress, v1::control::control_service_server::ControlServiceServer}; -use krataoci::progress::OciProgressContext; +use krataoci::{packer::service::OciPackerService, registry::OciPlatform}; use kratart::Runtime; use log::info; use reconcile::guest::GuestReconciler; use tokio::{ + fs, net::UnixListener, - sync::{ - broadcast, - mpsc::{channel, Sender}, - }, + sync::mpsc::{channel, Sender}, task::JoinHandle, }; use tokio_stream::wrappers::UnixListenerStream; @@ -41,17 +39,21 @@ pub struct Daemon { generator_task: JoinHandle<()>, idm: DaemonIdmHandle, console: DaemonConsoleHandle, + packer: OciPackerService, } const GUEST_RECONCILER_QUEUE_LEN: usize = 1000; -const OCI_PROGRESS_QUEUE_LEN: usize = 1000; impl Daemon { pub async fn new(store: String) -> Result { - let (oci_progress_sender, oci_progress_receiver) = - broadcast::channel(OCI_PROGRESS_QUEUE_LEN); - let runtime = - Runtime::new(OciProgressContext::new(oci_progress_sender), store.clone()).await?; + let mut image_cache_dir = PathBuf::from(store.clone()); + image_cache_dir.push("cache"); + image_cache_dir.push("image"); + fs::create_dir_all(&image_cache_dir).await?; + + let packer = OciPackerService::new(None, &image_cache_dir, OciPlatform::current())?; + + let runtime = Runtime::new(store.clone()).await?; let guests_db_path = format!("{}/guests.db", store); let guests = GuestStore::open(&PathBuf::from(guests_db_path))?; let (guest_reconciler_notify, guest_reconciler_receiver) = @@ -60,23 +62,21 @@ impl Daemon { let idm = idm.launch().await?; let console = DaemonConsole::new().await?; let console = console.launch().await?; - let (events, generator) = DaemonEventGenerator::new( - guests.clone(), - guest_reconciler_notify.clone(), - idm.clone(), - oci_progress_receiver, - ) - .await?; + let (events, generator) = + DaemonEventGenerator::new(guests.clone(), guest_reconciler_notify.clone(), idm.clone()) + .await?; let runtime_for_reconciler = runtime.dupe().await?; let guest_reconciler = GuestReconciler::new( guests.clone(), events.clone(), runtime_for_reconciler, + packer.clone(), guest_reconciler_notify.clone(), )?; let guest_reconciler_task = guest_reconciler.launch(guest_reconciler_receiver).await?; let generator_task = generator.launch().await?; + Ok(Self { store, guests, @@ -86,16 +86,18 @@ impl Daemon { generator_task, idm, console, + packer, }) } pub async fn listen(&mut self, addr: ControlDialAddress) -> Result<()> { - let control_service = RuntimeControlService::new( + let control_service = DaemonControlService::new( self.events.clone(), self.console.clone(), self.idm.clone(), self.guests.clone(), self.guest_reconciler_notify.clone(), + self.packer.clone(), ); let mut server = Server::builder(); @@ -121,7 +123,7 @@ impl Daemon { ControlDialAddress::UnixSocket { path } => { let path = PathBuf::from(path); if path.exists() { - tokio::fs::remove_file(&path).await?; + fs::remove_file(&path).await?; } let listener = UnixListener::bind(path)?; let stream = UnixListenerStream::new(listener); diff --git a/crates/daemon/src/oci.rs b/crates/daemon/src/oci.rs index 8544728..45945a2 100644 --- a/crates/daemon/src/oci.rs +++ b/crates/daemon/src/oci.rs @@ -1,17 +1,17 @@ use krata::v1::control::{ - OciProgressEvent, OciProgressEventLayer, OciProgressEventLayerPhase, OciProgressEventPhase, + PullImageProgress, PullImageProgressLayer, PullImageProgressLayerPhase, PullImageProgressPhase, }; use krataoci::progress::{OciProgress, OciProgressLayer, OciProgressLayerPhase, OciProgressPhase}; -fn convert_oci_layer_progress(layer: OciProgressLayer) -> OciProgressEventLayer { - OciProgressEventLayer { +fn convert_oci_layer_progress(layer: OciProgressLayer) -> PullImageProgressLayer { + PullImageProgressLayer { id: layer.id, phase: match layer.phase { - OciProgressLayerPhase::Waiting => OciProgressEventLayerPhase::Waiting, - OciProgressLayerPhase::Downloading => OciProgressEventLayerPhase::Downloading, - OciProgressLayerPhase::Downloaded => OciProgressEventLayerPhase::Downloaded, - OciProgressLayerPhase::Extracting => OciProgressEventLayerPhase::Extracting, - OciProgressLayerPhase::Extracted => OciProgressEventLayerPhase::Extracted, + OciProgressLayerPhase::Waiting => PullImageProgressLayerPhase::Waiting, + OciProgressLayerPhase::Downloading => PullImageProgressLayerPhase::Downloading, + OciProgressLayerPhase::Downloaded => PullImageProgressLayerPhase::Downloaded, + OciProgressLayerPhase::Extracting => PullImageProgressLayerPhase::Extracting, + OciProgressLayerPhase::Extracted => PullImageProgressLayerPhase::Extracted, } .into(), value: layer.value, @@ -19,16 +19,15 @@ fn convert_oci_layer_progress(layer: OciProgressLayer) -> OciProgressEventLayer } } -pub fn convert_oci_progress(oci: OciProgress) -> OciProgressEvent { - OciProgressEvent { - guest_id: oci.id, +pub fn convert_oci_progress(oci: OciProgress) -> PullImageProgress { + PullImageProgress { phase: match oci.phase { - OciProgressPhase::Resolving => OciProgressEventPhase::Resolving, - OciProgressPhase::Resolved => OciProgressEventPhase::Resolved, - OciProgressPhase::ConfigAcquire => OciProgressEventPhase::ConfigAcquire, - OciProgressPhase::LayerAcquire => OciProgressEventPhase::LayerAcquire, - OciProgressPhase::Packing => OciProgressEventPhase::Packing, - OciProgressPhase::Complete => OciProgressEventPhase::Complete, + OciProgressPhase::Resolving => PullImageProgressPhase::Resolving, + OciProgressPhase::Resolved => PullImageProgressPhase::Resolved, + OciProgressPhase::ConfigAcquire => PullImageProgressPhase::ConfigAcquire, + OciProgressPhase::LayerAcquire => PullImageProgressPhase::LayerAcquire, + OciProgressPhase::Packing => PullImageProgressPhase::Packing, + OciProgressPhase::Complete => PullImageProgressPhase::Complete, } .into(), layers: oci diff --git a/crates/daemon/src/reconcile/guest.rs b/crates/daemon/src/reconcile/guest.rs index 429c7ac..ef8478e 100644 --- a/crates/daemon/src/reconcile/guest.rs +++ b/crates/daemon/src/reconcile/guest.rs @@ -9,10 +9,11 @@ use krata::launchcfg::LaunchPackedFormat; use krata::v1::{ common::{ guest_image_spec::Image, Guest, GuestErrorInfo, GuestExitInfo, GuestNetworkState, - GuestState, GuestStatus, + GuestOciImageFormat, GuestState, GuestStatus, }, control::GuestChangedEvent, }; +use krataoci::packer::{service::OciPackerService, OciPackedFormat}; use kratart::{launch::GuestLaunchRequest, GuestInfo, Runtime}; use log::{error, info, trace, warn}; use tokio::{ @@ -55,6 +56,7 @@ pub struct GuestReconciler { guests: GuestStore, events: DaemonEventContext, runtime: Runtime, + packer: OciPackerService, tasks: Arc>>, guest_reconciler_notify: Sender, reconcile_lock: Arc>, @@ -65,12 +67,14 @@ impl GuestReconciler { guests: GuestStore, events: DaemonEventContext, runtime: Runtime, + packer: OciPackerService, guest_reconciler_notify: Sender, ) -> Result { Ok(Self { guests, events, runtime, + packer, tasks: Arc::new(Mutex::new(HashMap::new())), guest_reconciler_notify, reconcile_lock: Arc::new(RwLock::with_max_readers((), PARALLEL_LIMIT)), @@ -233,9 +237,27 @@ impl GuestReconciler { return Err(anyhow!("oci spec not specified")); } }; - let task = spec.task.as_ref().cloned().unwrap_or_default(); + let image = self + .packer + .recall( + &oci.digest, + match oci.format() { + GuestOciImageFormat::Unknown => OciPackedFormat::Squashfs, + GuestOciImageFormat::Squashfs => OciPackedFormat::Squashfs, + GuestOciImageFormat::Erofs => OciPackedFormat::Erofs, + }, + ) + .await?; + + let Some(image) = image else { + return Err(anyhow!( + "image {} in the requested format did not exist", + oci.digest + )); + }; + let info = self .runtime .launch(GuestLaunchRequest { @@ -244,9 +266,9 @@ impl GuestReconciler { name: if spec.name.is_empty() { None } else { - Some(&spec.name) + Some(spec.name.clone()) }, - image: &oci.image, + image, vcpus: spec.vcpus, mem: spec.mem, env: task diff --git a/crates/krata/proto/krata/v1/common.proto b/crates/krata/proto/krata/v1/common.proto index 2fe3488..78bc7ed 100644 --- a/crates/krata/proto/krata/v1/common.proto +++ b/crates/krata/proto/krata/v1/common.proto @@ -29,8 +29,15 @@ message GuestImageSpec { } } +enum GuestOciImageFormat { + GUEST_OCI_IMAGE_FORMAT_UNKNOWN = 0; + GUEST_OCI_IMAGE_FORMAT_SQUASHFS = 1; + GUEST_OCI_IMAGE_FORMAT_EROFS = 2; +} + message GuestOciImageSpec { - string image = 1; + string digest = 1; + GuestOciImageFormat format = 2; } message GuestTaskSpec { diff --git a/crates/krata/proto/krata/v1/control.proto b/crates/krata/proto/krata/v1/control.proto index ca4adf0..61cf91b 100644 --- a/crates/krata/proto/krata/v1/control.proto +++ b/crates/krata/proto/krata/v1/control.proto @@ -14,11 +14,14 @@ service ControlService { rpc DestroyGuest(DestroyGuestRequest) returns (DestroyGuestReply); rpc ResolveGuest(ResolveGuestRequest) returns (ResolveGuestReply); rpc ListGuests(ListGuestsRequest) returns (ListGuestsReply); + rpc ConsoleData(stream ConsoleDataRequest) returns (stream ConsoleDataReply); + rpc ReadGuestMetrics(ReadGuestMetricsRequest) returns (ReadGuestMetricsReply); + + rpc SnoopIdm(SnoopIdmRequest) returns (stream SnoopIdmReply); rpc WatchEvents(WatchEventsRequest) returns (stream WatchEventsReply); - rpc ReadGuestMetrics(ReadGuestMetricsRequest) returns (ReadGuestMetricsReply); - rpc SnoopIdm(SnoopIdmRequest) returns (stream SnoopIdmReply); + rpc PullImage(PullImageRequest) returns (stream PullImageReply); } message CreateGuestRequest { @@ -63,7 +66,6 @@ message WatchEventsRequest {} message WatchEventsReply { oneof event { GuestChangedEvent guest_changed = 1; - OciProgressEvent oci_progress = 2; } } @@ -71,40 +73,6 @@ message GuestChangedEvent { krata.v1.common.Guest guest = 1; } -enum OciProgressEventLayerPhase { - OCI_PROGRESS_EVENT_LAYER_PHASE_UNKNOWN = 0; - OCI_PROGRESS_EVENT_LAYER_PHASE_WAITING = 1; - OCI_PROGRESS_EVENT_LAYER_PHASE_DOWNLOADING = 2; - OCI_PROGRESS_EVENT_LAYER_PHASE_DOWNLOADED = 3; - OCI_PROGRESS_EVENT_LAYER_PHASE_EXTRACTING = 4; - OCI_PROGRESS_EVENT_LAYER_PHASE_EXTRACTED = 5; -} - -message OciProgressEventLayer { - string id = 1; - OciProgressEventLayerPhase phase = 2; - uint64 value = 3; - uint64 total = 4; -} - -enum OciProgressEventPhase { - OCI_PROGRESS_EVENT_PHASE_UNKNOWN = 0; - OCI_PROGRESS_EVENT_PHASE_RESOLVING = 1; - OCI_PROGRESS_EVENT_PHASE_RESOLVED = 2; - OCI_PROGRESS_EVENT_PHASE_CONFIG_ACQUIRE = 3; - OCI_PROGRESS_EVENT_PHASE_LAYER_ACQUIRE = 4; - OCI_PROGRESS_EVENT_PHASE_PACKING = 5; - OCI_PROGRESS_EVENT_PHASE_COMPLETE = 6; -} - -message OciProgressEvent { - string guest_id = 1; - OciProgressEventPhase phase = 2; - repeated OciProgressEventLayer layers = 3; - uint64 value = 4; - uint64 total = 5; -} - message ReadGuestMetricsRequest { string guest_id = 1; } @@ -120,3 +88,47 @@ message SnoopIdmReply { uint32 to = 2; krata.bus.idm.IdmPacket packet = 3; } + +enum PullImageProgressLayerPhase { + PULL_IMAGE_PROGRESS_LAYER_PHASE_UNKNOWN = 0; + PULL_IMAGE_PROGRESS_LAYER_PHASE_WAITING = 1; + PULL_IMAGE_PROGRESS_LAYER_PHASE_DOWNLOADING = 2; + PULL_IMAGE_PROGRESS_LAYER_PHASE_DOWNLOADED = 3; + PULL_IMAGE_PROGRESS_LAYER_PHASE_EXTRACTING = 4; + PULL_IMAGE_PROGRESS_LAYER_PHASE_EXTRACTED = 5; +} + +message PullImageProgressLayer { + string id = 1; + PullImageProgressLayerPhase phase = 2; + uint64 value = 3; + uint64 total = 4; +} + +enum PullImageProgressPhase { + PULL_IMAGE_PROGRESS_PHASE_UNKNOWN = 0; + PULL_IMAGE_PROGRESS_PHASE_RESOLVING = 1; + PULL_IMAGE_PROGRESS_PHASE_RESOLVED = 2; + PULL_IMAGE_PROGRESS_PHASE_CONFIG_ACQUIRE = 3; + PULL_IMAGE_PROGRESS_PHASE_LAYER_ACQUIRE = 4; + PULL_IMAGE_PROGRESS_PHASE_PACKING = 5; + PULL_IMAGE_PROGRESS_PHASE_COMPLETE = 6; +} + +message PullImageProgress { + PullImageProgressPhase phase = 1; + repeated PullImageProgressLayer layers = 2; + uint64 value = 3; + uint64 total = 4; +} + +message PullImageRequest { + string image = 1; + krata.v1.common.GuestOciImageFormat format = 2; +} + +message PullImageReply { + PullImageProgress progress = 1; + string digest = 2; + krata.v1.common.GuestOciImageFormat format = 3; +} diff --git a/crates/network/src/autonet.rs b/crates/network/src/autonet.rs index 6d46160..9b27d62 100644 --- a/crates/network/src/autonet.rs +++ b/crates/network/src/autonet.rs @@ -179,8 +179,6 @@ impl AutoNetworkWatcher { break; }, - Ok(_) => {}, - Err(error) => { warn!("failed to receive event: {}", error); } diff --git a/crates/oci/examples/squashify.rs b/crates/oci/examples/squashify.rs index 33a03bc..d4785e8 100644 --- a/crates/oci/examples/squashify.rs +++ b/crates/oci/examples/squashify.rs @@ -38,9 +38,9 @@ async fn main() -> Result<()> { } }); let context = OciProgressContext::new(sender); - let service = OciPackerService::new(seed, &cache_dir, OciPlatform::current(), context)?; + let service = OciPackerService::new(seed, &cache_dir, OciPlatform::current())?; let packed = service - .pack("cli", image.clone(), OciPackedFormat::Squashfs) + .request(image.clone(), OciPackedFormat::Squashfs, context) .await?; println!( "generated squashfs of {} to {}", diff --git a/crates/oci/src/packer/cache.rs b/crates/oci/src/packer/cache.rs index 83dc8cc..460520a 100644 --- a/crates/oci/src/packer/cache.rs +++ b/crates/oci/src/packer/cache.rs @@ -1,7 +1,4 @@ -use crate::{ - fetch::OciResolvedImage, - packer::{OciImagePacked, OciPackedFormat}, -}; +use crate::packer::{OciImagePacked, OciPackedFormat}; use anyhow::Result; use log::debug; @@ -23,15 +20,15 @@ impl OciPackerCache { pub async fn recall( &self, - resolved: &OciResolvedImage, + digest: &str, format: OciPackedFormat, ) -> Result> { let mut fs_path = self.cache_dir.clone(); let mut config_path = self.cache_dir.clone(); let mut manifest_path = self.cache_dir.clone(); - fs_path.push(format!("{}.{}", resolved.digest, format.extension())); - manifest_path.push(format!("{}.manifest.json", resolved.digest)); - config_path.push(format!("{}.config.json", resolved.digest)); + fs_path.push(format!("{}.{}", digest, format.extension())); + manifest_path.push(format!("{}.manifest.json", digest)); + config_path.push(format!("{}.config.json", digest)); Ok( if fs_path.exists() && manifest_path.exists() && config_path.exists() { let image_metadata = fs::metadata(&fs_path).await?; @@ -45,9 +42,9 @@ impl OciPackerCache { let manifest: ImageManifest = serde_json::from_str(&manifest_text)?; let config_text = fs::read_to_string(&config_path).await?; let config: ImageConfiguration = serde_json::from_str(&config_text)?; - debug!("cache hit digest={}", resolved.digest); + debug!("cache hit digest={}", digest); Some(OciImagePacked::new( - resolved.digest.clone(), + digest.to_string(), fs_path.clone(), format, config, @@ -57,7 +54,7 @@ impl OciPackerCache { None } } else { - debug!("cache miss digest={}", resolved.digest); + debug!("cache miss digest={}", digest); None }, ) diff --git a/crates/oci/src/packer/service.rs b/crates/oci/src/packer/service.rs index 9d3994e..96e8dd4 100644 --- a/crates/oci/src/packer/service.rs +++ b/crates/oci/src/packer/service.rs @@ -17,7 +17,6 @@ pub struct OciPackerService { seed: Option, platform: OciPlatform, cache: OciPackerCache, - progress: OciProgressContext, } impl OciPackerService { @@ -25,28 +24,34 @@ impl OciPackerService { seed: Option, cache_dir: &Path, platform: OciPlatform, - progress: OciProgressContext, ) -> Result { Ok(OciPackerService { seed, cache: OciPackerCache::new(cache_dir)?, platform, - progress, }) } - pub async fn pack( + pub async fn recall( + &self, + digest: &str, + format: OciPackedFormat, + ) -> Result> { + self.cache.recall(digest, format).await + } + + pub async fn request( &self, - id: &str, name: ImageName, format: OciPackedFormat, + progress_context: OciProgressContext, ) -> Result { - let progress = OciProgress::new(id); - let progress = OciBoundProgress::new(self.progress.clone(), progress); + let progress = OciProgress::new(); + let progress = OciBoundProgress::new(progress_context.clone(), progress); let fetcher = OciImageFetcher::new(self.seed.clone(), self.platform.clone(), progress.clone()); let resolved = fetcher.resolve(name).await?; - if let Some(cached) = self.cache.recall(&resolved, format).await? { + if let Some(cached) = self.cache.recall(&resolved.digest, format).await? { return Ok(cached); } let assembler = diff --git a/crates/oci/src/progress.rs b/crates/oci/src/progress.rs index b73c945..c54cba3 100644 --- a/crates/oci/src/progress.rs +++ b/crates/oci/src/progress.rs @@ -5,17 +5,21 @@ use tokio::sync::{broadcast::Sender, Mutex}; #[derive(Clone, Debug)] pub struct OciProgress { - pub id: String, pub phase: OciProgressPhase, pub layers: IndexMap, pub value: u64, pub total: u64, } +impl Default for OciProgress { + fn default() -> Self { + Self::new() + } +} + impl OciProgress { - pub fn new(id: &str) -> Self { + pub fn new() -> Self { OciProgress { - id: id.to_string(), phase: OciProgressPhase::Resolving, layers: IndexMap::new(), value: 0, diff --git a/crates/runtime/src/launch.rs b/crates/runtime/src/launch.rs index bd8f0d0..b0ca22d 100644 --- a/crates/runtime/src/launch.rs +++ b/crates/runtime/src/launch.rs @@ -10,8 +10,7 @@ use krata::launchcfg::{ LaunchInfo, LaunchNetwork, LaunchNetworkIpv4, LaunchNetworkIpv6, LaunchNetworkResolver, LaunchPackedFormat, LaunchRoot, }; -use krataoci::packer::service::OciPackerService; -use krataoci::packer::{OciImagePacked, OciPackedFormat}; +use krataoci::packer::OciImagePacked; use tokio::sync::Semaphore; use uuid::Uuid; use xenclient::{DomainChannel, DomainConfig, DomainDisk, DomainNetworkInterface}; @@ -19,20 +18,19 @@ use xenstore::XsdInterface; use crate::cfgblk::ConfigBlock; use crate::RuntimeContext; -use krataoci::name::ImageName; use super::{GuestInfo, GuestState}; -pub struct GuestLaunchRequest<'a> { +pub struct GuestLaunchRequest { pub format: LaunchPackedFormat, pub uuid: Option, - pub name: Option<&'a str>, - pub image: &'a str, + pub name: Option, pub vcpus: u32, pub mem: u64, pub env: HashMap, pub run: Option>, pub debug: bool, + pub image: OciImagePacked, } pub struct GuestLauncher { @@ -44,25 +42,13 @@ impl GuestLauncher { Ok(Self { launch_semaphore }) } - pub async fn launch<'r>( + pub async fn launch( &mut self, context: &RuntimeContext, - request: GuestLaunchRequest<'r>, + request: GuestLaunchRequest, ) -> Result { let uuid = request.uuid.unwrap_or_else(Uuid::new_v4); let xen_name = format!("krata-{uuid}"); - let packed = self - .compile( - &uuid.to_string(), - request.image, - &context.packer, - match request.format { - LaunchPackedFormat::Squashfs => OciPackedFormat::Squashfs, - LaunchPackedFormat::Erofs => OciPackedFormat::Erofs, - }, - ) - .await?; - let mut gateway_mac = MacAddr6::random(); gateway_mac.set_local(true); gateway_mac.set_multicast(false); @@ -85,6 +71,7 @@ impl GuestLauncher { hostname: Some( request .name + .as_ref() .map(|x| x.to_string()) .unwrap_or_else(|| format!("krata-{}", uuid)), ), @@ -111,10 +98,11 @@ impl GuestLauncher { run: request.run, }; - let cfgblk = ConfigBlock::new(&uuid, &packed)?; + let cfgblk = ConfigBlock::new(&uuid, &request.image)?; cfgblk.build(&launch_config)?; - let image_squashfs_path = packed + let image_squashfs_path = request + .image .path .to_str() .ok_or_else(|| anyhow!("failed to convert image path to string"))?; @@ -153,7 +141,6 @@ impl GuestLauncher { cfgblk_dir_path, ), ), - ("krata/image".to_string(), request.image.to_string()), ( "krata/network/guest/ipv4".to_string(), format!("{}/{}", guest_ipv4, ipv4_network_mask), @@ -180,8 +167,8 @@ impl GuestLauncher { ), ]; - if let Some(name) = request.name { - extra_keys.push(("krata/name".to_string(), name.to_string())); + if let Some(name) = request.name.as_ref() { + extra_keys.push(("krata/name".to_string(), name.clone())); } let config = DomainConfig { @@ -222,10 +209,10 @@ impl GuestLauncher { }; match context.xen.create(&config).await { Ok(created) => Ok(GuestInfo { - name: request.name.map(|x| x.to_string()), + name: request.name.as_ref().map(|x| x.to_string()), uuid, domid: created.domid, - image: request.image.to_string(), + image: request.image.digest, loops: vec![], guest_ipv4: Some(IpNetwork::new( IpAddr::V4(guest_ipv4), @@ -256,17 +243,6 @@ impl GuestLauncher { } } - async fn compile( - &self, - id: &str, - image: &str, - packer: &OciPackerService, - format: OciPackedFormat, - ) -> Result { - let image = ImageName::parse(image)?; - packer.pack(id, image, format).await - } - async fn allocate_ipv4(&self, context: &RuntimeContext) -> Result { let network = Ipv4Network::new(Ipv4Addr::new(10, 75, 80, 0), 24)?; let mut used: Vec = vec![]; diff --git a/crates/runtime/src/lib.rs b/crates/runtime/src/lib.rs index 15ee26d..3d0f3d3 100644 --- a/crates/runtime/src/lib.rs +++ b/crates/runtime/src/lib.rs @@ -17,9 +17,6 @@ use self::{ autoloop::AutoLoop, launch::{GuestLaunchRequest, GuestLauncher}, }; -use krataoci::{ - packer::service::OciPackerService, progress::OciProgressContext, registry::OciPlatform, -}; pub mod autoloop; pub mod cfgblk; @@ -53,8 +50,6 @@ pub struct GuestInfo { #[derive(Clone)] pub struct RuntimeContext { - pub oci_progress_context: OciProgressContext, - pub packer: OciPackerService, pub autoloop: AutoLoop, pub xen: XenClient, pub kernel: String, @@ -62,7 +57,7 @@ pub struct RuntimeContext { } impl RuntimeContext { - pub async fn new(oci_progress_context: OciProgressContext, store: String) -> Result { + pub async fn new(store: String) -> Result { let mut image_cache_path = PathBuf::from(&store); image_cache_path.push("cache"); fs::create_dir_all(&image_cache_path)?; @@ -70,18 +65,10 @@ impl RuntimeContext { let xen = XenClient::open(0).await?; image_cache_path.push("image"); fs::create_dir_all(&image_cache_path)?; - let packer = OciPackerService::new( - None, - &image_cache_path, - OciPlatform::current(), - oci_progress_context.clone(), - )?; let kernel = RuntimeContext::detect_guest_file(&store, "kernel")?; let initrd = RuntimeContext::detect_guest_file(&store, "initrd")?; Ok(RuntimeContext { - oci_progress_context, - packer, autoloop: AutoLoop::new(LoopControl::open()?), xen, kernel, @@ -261,24 +248,22 @@ impl RuntimeContext { #[derive(Clone)] pub struct Runtime { - oci_progress_context: OciProgressContext, store: Arc, context: RuntimeContext, launch_semaphore: Arc, } impl Runtime { - pub async fn new(oci_progress_context: OciProgressContext, store: String) -> Result { - let context = RuntimeContext::new(oci_progress_context.clone(), store.clone()).await?; + pub async fn new(store: String) -> Result { + let context = RuntimeContext::new(store.clone()).await?; Ok(Self { - oci_progress_context, store: Arc::new(store), context, launch_semaphore: Arc::new(Semaphore::new(1)), }) } - pub async fn launch<'a>(&self, request: GuestLaunchRequest<'a>) -> Result { + pub async fn launch(&self, request: GuestLaunchRequest) -> Result { let mut launcher = GuestLauncher::new(self.launch_semaphore.clone())?; launcher.launch(&self.context, request).await } @@ -335,7 +320,7 @@ impl Runtime { } pub async fn dupe(&self) -> Result { - Runtime::new(self.oci_progress_context.clone(), (*self.store).clone()).await + Runtime::new((*self.store).clone()).await } }