diff --git a/crates/ctl/src/cli/destroy.rs b/crates/ctl/src/cli/destroy.rs index 3652bf4..60db259 100644 --- a/crates/ctl/src/cli/destroy.rs +++ b/crates/ctl/src/cli/destroy.rs @@ -52,31 +52,30 @@ impl DestroyCommand { async fn wait_guest_destroyed(id: &str, events: EventStream) -> Result<()> { let mut stream = events.subscribe(); while let Ok(event) = stream.recv().await { - if let Event::GuestChanged(changed) = event { - let Some(guest) = changed.guest else { - continue; - }; + let Event::GuestChanged(changed) = event; + let Some(guest) = changed.guest else { + continue; + }; - if guest.id != id { - continue; + if guest.id != id { + continue; + } + + let Some(state) = guest.state else { + continue; + }; + + if let Some(ref error) = state.error_info { + if state.status() == GuestStatus::Failed { + error!("destroy failed: {}", error.message); + std::process::exit(1); + } else { + error!("guest error: {}", error.message); } + } - let Some(state) = guest.state else { - continue; - }; - - if let Some(ref error) = state.error_info { - if state.status() == GuestStatus::Failed { - error!("destroy failed: {}", error.message); - std::process::exit(1); - } else { - error!("guest error: {}", error.message); - } - } - - if state.status() == GuestStatus::Destroyed { - std::process::exit(0); - } + if state.status() == GuestStatus::Destroyed { + std::process::exit(0); } } Ok(()) diff --git a/crates/ctl/src/cli/launch.rs b/crates/ctl/src/cli/launch.rs index 83f2364..d202d3f 100644 --- a/crates/ctl/src/cli/launch.rs +++ b/crates/ctl/src/cli/launch.rs @@ -2,17 +2,16 @@ use std::collections::HashMap; use anyhow::Result; use clap::Parser; -use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use krata::{ events::EventStream, v1::{ common::{ - guest_image_spec::Image, GuestImageSpec, GuestOciImageSpec, GuestSpec, GuestStatus, - GuestTaskSpec, GuestTaskSpecEnvVar, + guest_image_spec::Image, GuestImageSpec, GuestOciImageFormat, GuestOciImageSpec, + GuestSpec, GuestStatus, GuestTaskSpec, GuestTaskSpecEnvVar, }, control::{ control_service_client::ControlServiceClient, watch_events_reply::Event, - CreateGuestRequest, OciProgressEventLayerPhase, OciProgressEventPhase, + CreateGuestRequest, PullImageRequest, }, }, }; @@ -20,11 +19,15 @@ use log::error; use tokio::select; use tonic::{transport::Channel, Request}; -use crate::console::StdioConsoleStream; +use crate::{console::StdioConsoleStream, pull::pull_interactive_progress}; + +use super::pull::PullImageFormat; #[derive(Parser)] #[command(about = "Launch a new guest")] pub struct LauchCommand { + #[arg(short = 'S', long, default_value = "squashfs", help = "Image format")] + image_format: PullImageFormat, #[arg(short, long, help = "Name of the guest")] name: Option, #[arg( @@ -71,11 +74,25 @@ impl LauchCommand { mut client: ControlServiceClient, events: EventStream, ) -> Result<()> { + let response = client + .pull_image(PullImageRequest { + image: self.oci.clone(), + format: match self.image_format { + PullImageFormat::Squashfs => GuestOciImageFormat::Squashfs.into(), + PullImageFormat::Erofs => GuestOciImageFormat::Erofs.into(), + }, + }) + .await?; + let reply = pull_interactive_progress(response.into_inner()).await?; + let request = CreateGuestRequest { spec: Some(GuestSpec { name: self.name.unwrap_or_default(), image: Some(GuestImageSpec { - image: Some(Image::Oci(GuestOciImageSpec { image: self.oci })), + image: Some(Image::Oci(GuestOciImageSpec { + digest: reply.digest, + format: reply.format, + })), }), vcpus: self.cpus, mem: self.mem, @@ -126,14 +143,9 @@ impl LauchCommand { async fn wait_guest_started(id: &str, events: EventStream) -> Result<()> { let mut stream = events.subscribe(); - let mut multi_progress: Option<(MultiProgress, HashMap)> = None; while let Ok(event) = stream.recv().await { match event { Event::GuestChanged(changed) => { - if let Some((multi_progress, _)) = multi_progress.as_mut() { - let _ = multi_progress.clear(); - } - let Some(guest) = changed.guest else { continue; }; @@ -164,102 +176,6 @@ async fn wait_guest_started(id: &str, events: EventStream) -> Result<()> { break; } } - - Event::OciProgress(oci) => { - if multi_progress.is_none() { - multi_progress = Some((MultiProgress::new(), HashMap::new())); - } - - let Some((multi_progress, progresses)) = multi_progress.as_mut() else { - continue; - }; - - match oci.phase() { - OciProgressEventPhase::Resolved - | OciProgressEventPhase::ConfigAcquire - | OciProgressEventPhase::LayerAcquire => { - if progresses.is_empty() && !oci.layers.is_empty() { - for layer in &oci.layers { - let bar = ProgressBar::new(layer.total); - bar.set_style( - ProgressStyle::with_template("{msg} {wide_bar}").unwrap(), - ); - progresses.insert(layer.id.clone(), bar.clone()); - multi_progress.add(bar); - } - } - - for layer in oci.layers { - let Some(progress) = progresses.get_mut(&layer.id) else { - continue; - }; - - let phase = match layer.phase() { - OciProgressEventLayerPhase::Waiting => "waiting", - OciProgressEventLayerPhase::Downloading => "downloading", - OciProgressEventLayerPhase::Downloaded => "downloaded", - OciProgressEventLayerPhase::Extracting => "extracting", - OciProgressEventLayerPhase::Extracted => "extracted", - _ => "unknown", - }; - - let simple = if let Some((_, hash)) = layer.id.split_once(':') { - hash - } else { - id - }; - let simple = if simple.len() > 10 { - &simple[0..10] - } else { - simple - }; - let message = format!("{:width$} {}", simple, phase, width = 10); - - if message != progress.message() { - progress.set_message(message); - } - - progress.update(|state| { - state.set_len(layer.total); - state.set_pos(layer.value); - }); - } - } - - OciProgressEventPhase::Packing => { - for (key, bar) in &mut *progresses { - if key == "packing" { - continue; - } - bar.finish_and_clear(); - multi_progress.remove(bar); - } - progresses.retain(|k, _| k == "packing"); - if progresses.is_empty() { - let progress = ProgressBar::new(100); - progress.set_message("packing"); - progress.set_style( - ProgressStyle::with_template("{msg} {wide_bar}").unwrap(), - ); - progresses.insert("packing".to_string(), progress); - } - let Some(progress) = progresses.get("packing") else { - continue; - }; - - progress.update(|state| { - state.set_len(oci.total); - state.set_pos(oci.value); - }); - } - - _ => {} - } - - for progress in progresses { - progress.1.tick(); - } - } } } Ok(()) diff --git a/crates/ctl/src/cli/mod.rs b/crates/ctl/src/cli/mod.rs index aeaaeaf..17cd396 100644 --- a/crates/ctl/src/cli/mod.rs +++ b/crates/ctl/src/cli/mod.rs @@ -5,6 +5,7 @@ pub mod launch; pub mod list; pub mod logs; pub mod metrics; +pub mod pull; pub mod resolve; pub mod top; pub mod watch; @@ -21,7 +22,7 @@ use tonic::{transport::Channel, Request}; use self::{ attach::AttachCommand, destroy::DestroyCommand, idm_snoop::IdmSnoopCommand, launch::LauchCommand, list::ListCommand, logs::LogsCommand, metrics::MetricsCommand, - resolve::ResolveCommand, top::TopCommand, watch::WatchCommand, + pull::PullCommand, resolve::ResolveCommand, top::TopCommand, watch::WatchCommand, }; #[derive(Parser)] @@ -48,6 +49,7 @@ pub enum Commands { Destroy(DestroyCommand), List(ListCommand), Attach(AttachCommand), + Pull(PullCommand), Logs(LogsCommand), Watch(WatchCommand), Resolve(ResolveCommand), @@ -101,6 +103,10 @@ impl ControlCommand { Commands::Top(top) => { top.run(client, events).await?; } + + Commands::Pull(pull) => { + pull.run(client).await?; + } } Ok(()) } diff --git a/crates/ctl/src/cli/pull.rs b/crates/ctl/src/cli/pull.rs new file mode 100644 index 0000000..50ab84c --- /dev/null +++ b/crates/ctl/src/cli/pull.rs @@ -0,0 +1,42 @@ +use anyhow::Result; +use clap::{Parser, ValueEnum}; +use krata::v1::{ + common::GuestOciImageFormat, + control::{control_service_client::ControlServiceClient, PullImageRequest}, +}; + +use tonic::transport::Channel; + +use crate::pull::pull_interactive_progress; + +#[derive(ValueEnum, Clone, Debug, PartialEq, Eq)] +pub enum PullImageFormat { + Squashfs, + Erofs, +} + +#[derive(Parser)] +#[command(about = "Pull an image into the cache")] +pub struct PullCommand { + #[arg(help = "Image name")] + image: String, + #[arg(short = 's', long, default_value = "squashfs", help = "Image format")] + image_format: PullImageFormat, +} + +impl PullCommand { + pub async fn run(self, mut client: ControlServiceClient) -> Result<()> { + let response = client + .pull_image(PullImageRequest { + image: self.image.clone(), + format: match self.image_format { + PullImageFormat::Squashfs => GuestOciImageFormat::Squashfs.into(), + PullImageFormat::Erofs => GuestOciImageFormat::Erofs.into(), + }, + }) + .await?; + let reply = pull_interactive_progress(response.into_inner()).await?; + println!("{}", reply.digest); + Ok(()) + } +} diff --git a/crates/ctl/src/cli/watch.rs b/crates/ctl/src/cli/watch.rs index 3a723c9..e535ca6 100644 --- a/crates/ctl/src/cli/watch.rs +++ b/crates/ctl/src/cli/watch.rs @@ -29,10 +29,9 @@ impl WatchCommand { loop { let event = stream.recv().await?; - if let Event::GuestChanged(changed) = event { - let guest = changed.guest.clone(); - self.print_event("guest.changed", changed, guest)?; - } + let Event::GuestChanged(changed) = event; + let guest = changed.guest.clone(); + self.print_event("guest.changed", changed, guest)?; } } diff --git a/crates/ctl/src/console.rs b/crates/ctl/src/console.rs index 33136a1..a57edd8 100644 --- a/crates/ctl/src/console.rs +++ b/crates/ctl/src/console.rs @@ -69,27 +69,26 @@ impl StdioConsoleStream { Ok(tokio::task::spawn(async move { let mut stream = events.subscribe(); while let Ok(event) = stream.recv().await { - if let Event::GuestChanged(changed) = event { - let Some(guest) = changed.guest else { - continue; - }; + let Event::GuestChanged(changed) = event; + let Some(guest) = changed.guest else { + continue; + }; - let Some(state) = guest.state else { - continue; - }; + let Some(state) = guest.state else { + continue; + }; - if guest.id != id { - continue; - } + if guest.id != id { + continue; + } - if let Some(exit_info) = state.exit_info { - return Some(exit_info.code); - } + if let Some(exit_info) = state.exit_info { + return Some(exit_info.code); + } - let status = state.status(); - if status == GuestStatus::Destroying || status == GuestStatus::Destroyed { - return Some(10); - } + let status = state.status(); + if status == GuestStatus::Destroying || status == GuestStatus::Destroyed { + return Some(10); } } None diff --git a/crates/ctl/src/lib.rs b/crates/ctl/src/lib.rs index c3aeefe..82ee22d 100644 --- a/crates/ctl/src/lib.rs +++ b/crates/ctl/src/lib.rs @@ -2,3 +2,4 @@ pub mod cli; pub mod console; pub mod format; pub mod metrics; +pub mod pull; diff --git a/crates/ctl/src/metrics.rs b/crates/ctl/src/metrics.rs index 1cef438..570d593 100644 --- a/crates/ctl/src/metrics.rs +++ b/crates/ctl/src/metrics.rs @@ -82,7 +82,7 @@ impl MultiMetricCollector { let collect = select! { x = events.recv() => match x { Ok(event) => { - if let Event::GuestChanged(changed) = event { + let Event::GuestChanged(changed) = event; let Some(guest) = changed.guest else { continue; }; @@ -93,7 +93,6 @@ impl MultiMetricCollector { if state.status() != GuestStatus::Destroying { guests.push(guest); } - } false }, diff --git a/crates/ctl/src/pull.rs b/crates/ctl/src/pull.rs new file mode 100644 index 0000000..e91df10 --- /dev/null +++ b/crates/ctl/src/pull.rs @@ -0,0 +1,118 @@ +use std::collections::HashMap; + +use anyhow::{anyhow, Result}; +use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; +use krata::v1::control::{PullImageProgressLayerPhase, PullImageProgressPhase, PullImageReply}; +use tokio_stream::StreamExt; +use tonic::Streaming; + +pub async fn pull_interactive_progress( + mut stream: Streaming, +) -> Result { + let mut multi_progress: Option<(MultiProgress, HashMap)> = None; + + while let Some(reply) = stream.next().await { + let reply = reply?; + + if reply.progress.is_none() && !reply.digest.is_empty() { + return Ok(reply); + } + + let Some(oci) = reply.progress else { + continue; + }; + + if multi_progress.is_none() { + multi_progress = Some((MultiProgress::new(), HashMap::new())); + } + + let Some((multi_progress, progresses)) = multi_progress.as_mut() else { + continue; + }; + + match oci.phase() { + PullImageProgressPhase::Resolved + | PullImageProgressPhase::ConfigAcquire + | PullImageProgressPhase::LayerAcquire => { + if progresses.is_empty() && !oci.layers.is_empty() { + for layer in &oci.layers { + let bar = ProgressBar::new(layer.total); + bar.set_style(ProgressStyle::with_template("{msg} {bar}").unwrap()); + progresses.insert(layer.id.clone(), bar.clone()); + multi_progress.add(bar); + } + } + + for layer in oci.layers { + let Some(progress) = progresses.get_mut(&layer.id) else { + continue; + }; + + let phase = match layer.phase() { + PullImageProgressLayerPhase::Waiting => "waiting", + PullImageProgressLayerPhase::Downloading => "downloading", + PullImageProgressLayerPhase::Downloaded => "downloaded", + PullImageProgressLayerPhase::Extracting => "extracting", + PullImageProgressLayerPhase::Extracted => "extracted", + _ => "unknown", + }; + + let simple = if let Some((_, hash)) = layer.id.split_once(':') { + hash + } else { + "unknown" + }; + let simple = if simple.len() > 10 { + &simple[0..10] + } else { + simple + }; + let message = format!( + "{:width$} {:phwidth$}", + simple, + phase, + width = 10, + phwidth = 11 + ); + + if message != progress.message() { + progress.set_message(message); + } + + progress.update(|state| { + state.set_len(layer.total); + state.set_pos(layer.value); + }); + } + } + + PullImageProgressPhase::Packing => { + for (key, bar) in &mut *progresses { + if key == "packing" { + continue; + } + bar.finish_and_clear(); + multi_progress.remove(bar); + } + progresses.retain(|k, _| k == "packing"); + if progresses.is_empty() { + let progress = ProgressBar::new(100); + progress.set_message("packing "); + progress.set_style(ProgressStyle::with_template("{msg} {bar}").unwrap()); + progresses.insert("packing".to_string(), progress); + } + let Some(progress) = progresses.get("packing") else { + continue; + }; + + progress.update(|state| { + state.set_len(oci.total); + state.set_pos(oci.value); + }); + } + + _ => {} + } + } + Err(anyhow!("never received final reply for image pull")) +} diff --git a/crates/daemon/src/control.rs b/crates/daemon/src/control.rs index 596d11d..2a81c1a 100644 --- a/crates/daemon/src/control.rs +++ b/crates/daemon/src/control.rs @@ -1,5 +1,3 @@ -use std::{pin::Pin, str::FromStr}; - use async_stream::try_stream; use futures::Stream; use krata::{ @@ -8,19 +6,26 @@ use krata::{ IdmMetricsRequest, }, v1::{ - common::{Guest, GuestState, GuestStatus}, + common::{Guest, GuestOciImageFormat, GuestState, GuestStatus}, control::{ control_service_server::ControlService, ConsoleDataReply, ConsoleDataRequest, CreateGuestReply, CreateGuestRequest, DestroyGuestReply, DestroyGuestRequest, - ListGuestsReply, ListGuestsRequest, ReadGuestMetricsReply, ReadGuestMetricsRequest, - ResolveGuestReply, ResolveGuestRequest, SnoopIdmReply, SnoopIdmRequest, - WatchEventsReply, WatchEventsRequest, + ListGuestsReply, ListGuestsRequest, PullImageReply, PullImageRequest, + ReadGuestMetricsReply, ReadGuestMetricsRequest, ResolveGuestReply, ResolveGuestRequest, + SnoopIdmReply, SnoopIdmRequest, WatchEventsReply, WatchEventsRequest, }, }, }; +use krataoci::{ + name::ImageName, + packer::{service::OciPackerService, OciImagePacked, OciPackedFormat}, + progress::{OciProgress, OciProgressContext}, +}; +use std::{pin::Pin, str::FromStr}; use tokio::{ select, sync::mpsc::{channel, Sender}, + task::JoinError, }; use tokio_stream::StreamExt; use tonic::{Request, Response, Status, Streaming}; @@ -28,7 +33,7 @@ use uuid::Uuid; use crate::{ console::DaemonConsoleHandle, db::GuestStore, event::DaemonEventContext, idm::DaemonIdmHandle, - metrics::idm_metric_to_api, + metrics::idm_metric_to_api, oci::convert_oci_progress, }; pub struct ApiError { @@ -50,21 +55,23 @@ impl From for Status { } #[derive(Clone)] -pub struct RuntimeControlService { +pub struct DaemonControlService { events: DaemonEventContext, console: DaemonConsoleHandle, idm: DaemonIdmHandle, guests: GuestStore, guest_reconciler_notify: Sender, + packer: OciPackerService, } -impl RuntimeControlService { +impl DaemonControlService { pub fn new( events: DaemonEventContext, console: DaemonConsoleHandle, idm: DaemonIdmHandle, guests: GuestStore, guest_reconciler_notify: Sender, + packer: OciPackerService, ) -> Self { Self { events, @@ -72,6 +79,7 @@ impl RuntimeControlService { idm, guests, guest_reconciler_notify, + packer, } } } @@ -81,11 +89,19 @@ enum ConsoleDataSelect { Write(Option>), } +enum PullImageSelect { + Progress(usize), + Completed(Result, JoinError>), +} + #[tonic::async_trait] -impl ControlService for RuntimeControlService { +impl ControlService for DaemonControlService { type ConsoleDataStream = Pin> + Send + 'static>>; + type PullImageStream = + Pin> + Send + 'static>>; + type WatchEventsStream = Pin> + Send + 'static>>; @@ -337,6 +353,71 @@ impl ControlService for RuntimeControlService { Ok(Response::new(reply)) } + async fn pull_image( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + let name = ImageName::parse(&request.image).map_err(|err| ApiError { + message: err.to_string(), + })?; + let format = match request.format() { + GuestOciImageFormat::Unknown => OciPackedFormat::Squashfs, + GuestOciImageFormat::Squashfs => OciPackedFormat::Squashfs, + GuestOciImageFormat::Erofs => OciPackedFormat::Erofs, + }; + let (sender, mut receiver) = channel::(100); + let context = OciProgressContext::new(sender); + + let our_packer = self.packer.clone(); + + let output = try_stream! { + let mut task = tokio::task::spawn(async move { + our_packer.request(name, format, context).await + }); + loop { + let mut progresses = Vec::new(); + let what = select! { + x = receiver.recv_many(&mut progresses, 10) => PullImageSelect::Progress(x), + x = &mut task => PullImageSelect::Completed(x), + }; + match what { + PullImageSelect::Progress(count) => { + if count > 0 { + let progress = progresses.remove(progresses.len() - 1); + let reply = PullImageReply { + progress: Some(convert_oci_progress(progress)), + digest: String::new(), + format: GuestOciImageFormat::Unknown.into(), + }; + yield reply; + } + }, + + PullImageSelect::Completed(result) => { + let result = result.map_err(|err| ApiError { + message: err.to_string(), + })?; + let packed = result.map_err(|err| ApiError { + message: err.to_string(), + })?; + let reply = PullImageReply { + progress: None, + digest: packed.digest, + format: match packed.format { + OciPackedFormat::Squashfs => GuestOciImageFormat::Squashfs.into(), + OciPackedFormat::Erofs => GuestOciImageFormat::Erofs.into(), + }, + }; + yield reply; + break; + }, + } + } + }; + Ok(Response::new(Box::pin(output) as Self::PullImageStream)) + } + async fn watch_events( &self, request: Request, diff --git a/crates/daemon/src/event.rs b/crates/daemon/src/event.rs index 82f9d7d..9d21ff1 100644 --- a/crates/daemon/src/event.rs +++ b/crates/daemon/src/event.rs @@ -9,7 +9,6 @@ use krata::{ idm::protocol::{idm_event::Event, IdmEvent}, v1::common::{GuestExitInfo, GuestState, GuestStatus}, }; -use krataoci::progress::OciProgress; use log::{error, warn}; use tokio::{ select, @@ -22,7 +21,7 @@ use tokio::{ }; use uuid::Uuid; -use crate::{db::GuestStore, idm::DaemonIdmHandle, oci::convert_oci_progress}; +use crate::{db::GuestStore, idm::DaemonIdmHandle}; pub type DaemonEvent = krata::v1::control::watch_events_reply::Event; @@ -53,8 +52,7 @@ pub struct DaemonEventGenerator { idms: HashMap)>, idm_sender: Sender<(u32, IdmEvent)>, idm_receiver: Receiver<(u32, IdmEvent)>, - oci_progress_receiver: broadcast::Receiver, - event_sender: broadcast::Sender, + _event_sender: broadcast::Sender, } impl DaemonEventGenerator { @@ -62,7 +60,6 @@ impl DaemonEventGenerator { guests: GuestStore, guest_reconciler_notify: Sender, idm: DaemonIdmHandle, - oci_progress_receiver: broadcast::Receiver, ) -> Result<(DaemonEventContext, DaemonEventGenerator)> { let (sender, _) = broadcast::channel(EVENT_CHANNEL_QUEUE_LEN); let (idm_sender, idm_receiver) = channel(IDM_EVENT_CHANNEL_QUEUE_LEN); @@ -74,55 +71,53 @@ impl DaemonEventGenerator { idms: HashMap::new(), idm_sender, idm_receiver, - oci_progress_receiver, - event_sender: sender.clone(), + _event_sender: sender.clone(), }; let context = DaemonEventContext { sender }; Ok((context, generator)) } async fn handle_feed_event(&mut self, event: &DaemonEvent) -> Result<()> { - if let DaemonEvent::GuestChanged(changed) = event { - let Some(ref guest) = changed.guest else { - return Ok(()); - }; + let DaemonEvent::GuestChanged(changed) = event; + let Some(ref guest) = changed.guest else { + return Ok(()); + }; - let Some(ref state) = guest.state else { - return Ok(()); - }; + let Some(ref state) = guest.state else { + return Ok(()); + }; - let status = state.status(); - let id = Uuid::from_str(&guest.id)?; - let domid = state.domid; - match status { - GuestStatus::Started => { - if let Entry::Vacant(e) = self.idms.entry(domid) { - let client = self.idm.client(domid).await?; - let mut receiver = client.subscribe().await?; - let sender = self.idm_sender.clone(); - let task = tokio::task::spawn(async move { - loop { - let Ok(event) = receiver.recv().await else { - break; - }; + let status = state.status(); + let id = Uuid::from_str(&guest.id)?; + let domid = state.domid; + match status { + GuestStatus::Started => { + if let Entry::Vacant(e) = self.idms.entry(domid) { + let client = self.idm.client(domid).await?; + let mut receiver = client.subscribe().await?; + let sender = self.idm_sender.clone(); + let task = tokio::task::spawn(async move { + loop { + let Ok(event) = receiver.recv().await else { + break; + }; - if let Err(error) = sender.send((domid, event)).await { - warn!("unable to deliver idm event: {}", error); - } + if let Err(error) = sender.send((domid, event)).await { + warn!("unable to deliver idm event: {}", error); } - }); - e.insert((id, task)); - } + } + }); + e.insert((id, task)); } - - GuestStatus::Destroyed => { - if let Some((_, handle)) = self.idms.remove(&domid) { - handle.abort(); - } - } - - _ => {} } + + GuestStatus::Destroyed => { + if let Some((_, handle)) = self.idms.remove(&domid) { + handle.abort(); + } + } + + _ => {} } Ok(()) } @@ -150,17 +145,6 @@ impl DaemonEventGenerator { Ok(()) } - async fn handle_oci_progress_event(&mut self, progress: OciProgress) -> Result<()> { - let Some(_) = Uuid::from_str(&progress.id).ok() else { - return Ok(()); - }; - - let event = convert_oci_progress(progress); - self.event_sender.send(DaemonEvent::OciProgress(event))?; - - Ok(()) - } - async fn evaluate(&mut self) -> Result<()> { select! { x = self.idm_receiver.recv() => match x { @@ -182,14 +166,6 @@ impl DaemonEventGenerator { Err(error.into()) } }, - x = self.oci_progress_receiver.recv() => match x { - Ok(event) => { - self.handle_oci_progress_event(event).await - }, - Err(error) => { - Err(error.into()) - } - } } } diff --git a/crates/daemon/src/lib.rs b/crates/daemon/src/lib.rs index a92a9ca..e81901f 100644 --- a/crates/daemon/src/lib.rs +++ b/crates/daemon/src/lib.rs @@ -2,21 +2,19 @@ use std::{net::SocketAddr, path::PathBuf, str::FromStr}; use anyhow::Result; use console::{DaemonConsole, DaemonConsoleHandle}; -use control::RuntimeControlService; +use control::DaemonControlService; use db::GuestStore; use event::{DaemonEventContext, DaemonEventGenerator}; use idm::{DaemonIdm, DaemonIdmHandle}; use krata::{dial::ControlDialAddress, v1::control::control_service_server::ControlServiceServer}; -use krataoci::progress::OciProgressContext; +use krataoci::{packer::service::OciPackerService, registry::OciPlatform}; use kratart::Runtime; use log::info; use reconcile::guest::GuestReconciler; use tokio::{ + fs, net::UnixListener, - sync::{ - broadcast, - mpsc::{channel, Sender}, - }, + sync::mpsc::{channel, Sender}, task::JoinHandle, }; use tokio_stream::wrappers::UnixListenerStream; @@ -41,17 +39,21 @@ pub struct Daemon { generator_task: JoinHandle<()>, idm: DaemonIdmHandle, console: DaemonConsoleHandle, + packer: OciPackerService, } const GUEST_RECONCILER_QUEUE_LEN: usize = 1000; -const OCI_PROGRESS_QUEUE_LEN: usize = 1000; impl Daemon { pub async fn new(store: String) -> Result { - let (oci_progress_sender, oci_progress_receiver) = - broadcast::channel(OCI_PROGRESS_QUEUE_LEN); - let runtime = - Runtime::new(OciProgressContext::new(oci_progress_sender), store.clone()).await?; + let mut image_cache_dir = PathBuf::from(store.clone()); + image_cache_dir.push("cache"); + image_cache_dir.push("image"); + fs::create_dir_all(&image_cache_dir).await?; + + let packer = OciPackerService::new(None, &image_cache_dir, OciPlatform::current())?; + + let runtime = Runtime::new(store.clone()).await?; let guests_db_path = format!("{}/guests.db", store); let guests = GuestStore::open(&PathBuf::from(guests_db_path))?; let (guest_reconciler_notify, guest_reconciler_receiver) = @@ -60,23 +62,21 @@ impl Daemon { let idm = idm.launch().await?; let console = DaemonConsole::new().await?; let console = console.launch().await?; - let (events, generator) = DaemonEventGenerator::new( - guests.clone(), - guest_reconciler_notify.clone(), - idm.clone(), - oci_progress_receiver, - ) - .await?; + let (events, generator) = + DaemonEventGenerator::new(guests.clone(), guest_reconciler_notify.clone(), idm.clone()) + .await?; let runtime_for_reconciler = runtime.dupe().await?; let guest_reconciler = GuestReconciler::new( guests.clone(), events.clone(), runtime_for_reconciler, + packer.clone(), guest_reconciler_notify.clone(), )?; let guest_reconciler_task = guest_reconciler.launch(guest_reconciler_receiver).await?; let generator_task = generator.launch().await?; + Ok(Self { store, guests, @@ -86,16 +86,18 @@ impl Daemon { generator_task, idm, console, + packer, }) } pub async fn listen(&mut self, addr: ControlDialAddress) -> Result<()> { - let control_service = RuntimeControlService::new( + let control_service = DaemonControlService::new( self.events.clone(), self.console.clone(), self.idm.clone(), self.guests.clone(), self.guest_reconciler_notify.clone(), + self.packer.clone(), ); let mut server = Server::builder(); @@ -121,7 +123,7 @@ impl Daemon { ControlDialAddress::UnixSocket { path } => { let path = PathBuf::from(path); if path.exists() { - tokio::fs::remove_file(&path).await?; + fs::remove_file(&path).await?; } let listener = UnixListener::bind(path)?; let stream = UnixListenerStream::new(listener); diff --git a/crates/daemon/src/oci.rs b/crates/daemon/src/oci.rs index 8544728..45945a2 100644 --- a/crates/daemon/src/oci.rs +++ b/crates/daemon/src/oci.rs @@ -1,17 +1,17 @@ use krata::v1::control::{ - OciProgressEvent, OciProgressEventLayer, OciProgressEventLayerPhase, OciProgressEventPhase, + PullImageProgress, PullImageProgressLayer, PullImageProgressLayerPhase, PullImageProgressPhase, }; use krataoci::progress::{OciProgress, OciProgressLayer, OciProgressLayerPhase, OciProgressPhase}; -fn convert_oci_layer_progress(layer: OciProgressLayer) -> OciProgressEventLayer { - OciProgressEventLayer { +fn convert_oci_layer_progress(layer: OciProgressLayer) -> PullImageProgressLayer { + PullImageProgressLayer { id: layer.id, phase: match layer.phase { - OciProgressLayerPhase::Waiting => OciProgressEventLayerPhase::Waiting, - OciProgressLayerPhase::Downloading => OciProgressEventLayerPhase::Downloading, - OciProgressLayerPhase::Downloaded => OciProgressEventLayerPhase::Downloaded, - OciProgressLayerPhase::Extracting => OciProgressEventLayerPhase::Extracting, - OciProgressLayerPhase::Extracted => OciProgressEventLayerPhase::Extracted, + OciProgressLayerPhase::Waiting => PullImageProgressLayerPhase::Waiting, + OciProgressLayerPhase::Downloading => PullImageProgressLayerPhase::Downloading, + OciProgressLayerPhase::Downloaded => PullImageProgressLayerPhase::Downloaded, + OciProgressLayerPhase::Extracting => PullImageProgressLayerPhase::Extracting, + OciProgressLayerPhase::Extracted => PullImageProgressLayerPhase::Extracted, } .into(), value: layer.value, @@ -19,16 +19,15 @@ fn convert_oci_layer_progress(layer: OciProgressLayer) -> OciProgressEventLayer } } -pub fn convert_oci_progress(oci: OciProgress) -> OciProgressEvent { - OciProgressEvent { - guest_id: oci.id, +pub fn convert_oci_progress(oci: OciProgress) -> PullImageProgress { + PullImageProgress { phase: match oci.phase { - OciProgressPhase::Resolving => OciProgressEventPhase::Resolving, - OciProgressPhase::Resolved => OciProgressEventPhase::Resolved, - OciProgressPhase::ConfigAcquire => OciProgressEventPhase::ConfigAcquire, - OciProgressPhase::LayerAcquire => OciProgressEventPhase::LayerAcquire, - OciProgressPhase::Packing => OciProgressEventPhase::Packing, - OciProgressPhase::Complete => OciProgressEventPhase::Complete, + OciProgressPhase::Resolving => PullImageProgressPhase::Resolving, + OciProgressPhase::Resolved => PullImageProgressPhase::Resolved, + OciProgressPhase::ConfigAcquire => PullImageProgressPhase::ConfigAcquire, + OciProgressPhase::LayerAcquire => PullImageProgressPhase::LayerAcquire, + OciProgressPhase::Packing => PullImageProgressPhase::Packing, + OciProgressPhase::Complete => PullImageProgressPhase::Complete, } .into(), layers: oci diff --git a/crates/daemon/src/reconcile/guest.rs b/crates/daemon/src/reconcile/guest.rs index 429c7ac..ef8478e 100644 --- a/crates/daemon/src/reconcile/guest.rs +++ b/crates/daemon/src/reconcile/guest.rs @@ -9,10 +9,11 @@ use krata::launchcfg::LaunchPackedFormat; use krata::v1::{ common::{ guest_image_spec::Image, Guest, GuestErrorInfo, GuestExitInfo, GuestNetworkState, - GuestState, GuestStatus, + GuestOciImageFormat, GuestState, GuestStatus, }, control::GuestChangedEvent, }; +use krataoci::packer::{service::OciPackerService, OciPackedFormat}; use kratart::{launch::GuestLaunchRequest, GuestInfo, Runtime}; use log::{error, info, trace, warn}; use tokio::{ @@ -55,6 +56,7 @@ pub struct GuestReconciler { guests: GuestStore, events: DaemonEventContext, runtime: Runtime, + packer: OciPackerService, tasks: Arc>>, guest_reconciler_notify: Sender, reconcile_lock: Arc>, @@ -65,12 +67,14 @@ impl GuestReconciler { guests: GuestStore, events: DaemonEventContext, runtime: Runtime, + packer: OciPackerService, guest_reconciler_notify: Sender, ) -> Result { Ok(Self { guests, events, runtime, + packer, tasks: Arc::new(Mutex::new(HashMap::new())), guest_reconciler_notify, reconcile_lock: Arc::new(RwLock::with_max_readers((), PARALLEL_LIMIT)), @@ -233,9 +237,27 @@ impl GuestReconciler { return Err(anyhow!("oci spec not specified")); } }; - let task = spec.task.as_ref().cloned().unwrap_or_default(); + let image = self + .packer + .recall( + &oci.digest, + match oci.format() { + GuestOciImageFormat::Unknown => OciPackedFormat::Squashfs, + GuestOciImageFormat::Squashfs => OciPackedFormat::Squashfs, + GuestOciImageFormat::Erofs => OciPackedFormat::Erofs, + }, + ) + .await?; + + let Some(image) = image else { + return Err(anyhow!( + "image {} in the requested format did not exist", + oci.digest + )); + }; + let info = self .runtime .launch(GuestLaunchRequest { @@ -244,9 +266,9 @@ impl GuestReconciler { name: if spec.name.is_empty() { None } else { - Some(&spec.name) + Some(spec.name.clone()) }, - image: &oci.image, + image, vcpus: spec.vcpus, mem: spec.mem, env: task diff --git a/crates/krata/proto/krata/v1/common.proto b/crates/krata/proto/krata/v1/common.proto index 2fe3488..78bc7ed 100644 --- a/crates/krata/proto/krata/v1/common.proto +++ b/crates/krata/proto/krata/v1/common.proto @@ -29,8 +29,15 @@ message GuestImageSpec { } } +enum GuestOciImageFormat { + GUEST_OCI_IMAGE_FORMAT_UNKNOWN = 0; + GUEST_OCI_IMAGE_FORMAT_SQUASHFS = 1; + GUEST_OCI_IMAGE_FORMAT_EROFS = 2; +} + message GuestOciImageSpec { - string image = 1; + string digest = 1; + GuestOciImageFormat format = 2; } message GuestTaskSpec { diff --git a/crates/krata/proto/krata/v1/control.proto b/crates/krata/proto/krata/v1/control.proto index ca4adf0..61cf91b 100644 --- a/crates/krata/proto/krata/v1/control.proto +++ b/crates/krata/proto/krata/v1/control.proto @@ -14,11 +14,14 @@ service ControlService { rpc DestroyGuest(DestroyGuestRequest) returns (DestroyGuestReply); rpc ResolveGuest(ResolveGuestRequest) returns (ResolveGuestReply); rpc ListGuests(ListGuestsRequest) returns (ListGuestsReply); + rpc ConsoleData(stream ConsoleDataRequest) returns (stream ConsoleDataReply); + rpc ReadGuestMetrics(ReadGuestMetricsRequest) returns (ReadGuestMetricsReply); + + rpc SnoopIdm(SnoopIdmRequest) returns (stream SnoopIdmReply); rpc WatchEvents(WatchEventsRequest) returns (stream WatchEventsReply); - rpc ReadGuestMetrics(ReadGuestMetricsRequest) returns (ReadGuestMetricsReply); - rpc SnoopIdm(SnoopIdmRequest) returns (stream SnoopIdmReply); + rpc PullImage(PullImageRequest) returns (stream PullImageReply); } message CreateGuestRequest { @@ -63,7 +66,6 @@ message WatchEventsRequest {} message WatchEventsReply { oneof event { GuestChangedEvent guest_changed = 1; - OciProgressEvent oci_progress = 2; } } @@ -71,40 +73,6 @@ message GuestChangedEvent { krata.v1.common.Guest guest = 1; } -enum OciProgressEventLayerPhase { - OCI_PROGRESS_EVENT_LAYER_PHASE_UNKNOWN = 0; - OCI_PROGRESS_EVENT_LAYER_PHASE_WAITING = 1; - OCI_PROGRESS_EVENT_LAYER_PHASE_DOWNLOADING = 2; - OCI_PROGRESS_EVENT_LAYER_PHASE_DOWNLOADED = 3; - OCI_PROGRESS_EVENT_LAYER_PHASE_EXTRACTING = 4; - OCI_PROGRESS_EVENT_LAYER_PHASE_EXTRACTED = 5; -} - -message OciProgressEventLayer { - string id = 1; - OciProgressEventLayerPhase phase = 2; - uint64 value = 3; - uint64 total = 4; -} - -enum OciProgressEventPhase { - OCI_PROGRESS_EVENT_PHASE_UNKNOWN = 0; - OCI_PROGRESS_EVENT_PHASE_RESOLVING = 1; - OCI_PROGRESS_EVENT_PHASE_RESOLVED = 2; - OCI_PROGRESS_EVENT_PHASE_CONFIG_ACQUIRE = 3; - OCI_PROGRESS_EVENT_PHASE_LAYER_ACQUIRE = 4; - OCI_PROGRESS_EVENT_PHASE_PACKING = 5; - OCI_PROGRESS_EVENT_PHASE_COMPLETE = 6; -} - -message OciProgressEvent { - string guest_id = 1; - OciProgressEventPhase phase = 2; - repeated OciProgressEventLayer layers = 3; - uint64 value = 4; - uint64 total = 5; -} - message ReadGuestMetricsRequest { string guest_id = 1; } @@ -120,3 +88,47 @@ message SnoopIdmReply { uint32 to = 2; krata.bus.idm.IdmPacket packet = 3; } + +enum PullImageProgressLayerPhase { + PULL_IMAGE_PROGRESS_LAYER_PHASE_UNKNOWN = 0; + PULL_IMAGE_PROGRESS_LAYER_PHASE_WAITING = 1; + PULL_IMAGE_PROGRESS_LAYER_PHASE_DOWNLOADING = 2; + PULL_IMAGE_PROGRESS_LAYER_PHASE_DOWNLOADED = 3; + PULL_IMAGE_PROGRESS_LAYER_PHASE_EXTRACTING = 4; + PULL_IMAGE_PROGRESS_LAYER_PHASE_EXTRACTED = 5; +} + +message PullImageProgressLayer { + string id = 1; + PullImageProgressLayerPhase phase = 2; + uint64 value = 3; + uint64 total = 4; +} + +enum PullImageProgressPhase { + PULL_IMAGE_PROGRESS_PHASE_UNKNOWN = 0; + PULL_IMAGE_PROGRESS_PHASE_RESOLVING = 1; + PULL_IMAGE_PROGRESS_PHASE_RESOLVED = 2; + PULL_IMAGE_PROGRESS_PHASE_CONFIG_ACQUIRE = 3; + PULL_IMAGE_PROGRESS_PHASE_LAYER_ACQUIRE = 4; + PULL_IMAGE_PROGRESS_PHASE_PACKING = 5; + PULL_IMAGE_PROGRESS_PHASE_COMPLETE = 6; +} + +message PullImageProgress { + PullImageProgressPhase phase = 1; + repeated PullImageProgressLayer layers = 2; + uint64 value = 3; + uint64 total = 4; +} + +message PullImageRequest { + string image = 1; + krata.v1.common.GuestOciImageFormat format = 2; +} + +message PullImageReply { + PullImageProgress progress = 1; + string digest = 2; + krata.v1.common.GuestOciImageFormat format = 3; +} diff --git a/crates/network/src/autonet.rs b/crates/network/src/autonet.rs index 6d46160..9b27d62 100644 --- a/crates/network/src/autonet.rs +++ b/crates/network/src/autonet.rs @@ -179,8 +179,6 @@ impl AutoNetworkWatcher { break; }, - Ok(_) => {}, - Err(error) => { warn!("failed to receive event: {}", error); } diff --git a/crates/oci/examples/squashify.rs b/crates/oci/examples/squashify.rs index fce9f58..8bd2932 100644 --- a/crates/oci/examples/squashify.rs +++ b/crates/oci/examples/squashify.rs @@ -3,13 +3,12 @@ use std::{env::args, path::PathBuf}; use anyhow::Result; use env_logger::Env; use krataoci::{ - cache::ImageCache, - compiler::OciImageCompiler, name::ImageName, - packer::OciPackerFormat, + packer::{service::OciPackerService, OciPackedFormat}, progress::{OciProgress, OciProgressContext}, + registry::OciPlatform, }; -use tokio::{fs, sync::broadcast}; +use tokio::{fs, sync::mpsc::channel}; #[tokio::main] async fn main() -> Result<()> { @@ -23,16 +22,16 @@ async fn main() -> Result<()> { fs::create_dir(&cache_dir).await?; } - let cache = ImageCache::new(&cache_dir)?; - - let (sender, mut receiver) = broadcast::channel::(1000); + let (sender, mut receiver) = channel::(100); tokio::task::spawn(async move { loop { - let Some(progress) = receiver.recv().await.ok() else { - break; + let mut progresses = Vec::new(); + let _ = receiver.recv_many(&mut progresses, 100).await; + let Some(progress) = progresses.last() else { + continue; }; println!("phase {:?}", progress.phase); - for (id, layer) in progress.layers { + for (id, layer) in &progress.layers { println!( "{} {:?} {} of {}", id, layer.phase, layer.value, layer.total @@ -41,14 +40,14 @@ async fn main() -> Result<()> { } }); let context = OciProgressContext::new(sender); - let compiler = OciImageCompiler::new(&cache, seed, context)?; - let info = compiler - .compile(&image.to_string(), &image, OciPackerFormat::Squashfs) + let service = OciPackerService::new(seed, &cache_dir, OciPlatform::current())?; + let packed = service + .request(image.clone(), OciPackedFormat::Squashfs, context) .await?; println!( "generated squashfs of {} to {}", image, - info.image.to_string_lossy() + packed.path.to_string_lossy() ); Ok(()) } diff --git a/crates/oci/src/assemble.rs b/crates/oci/src/assemble.rs new file mode 100644 index 0000000..5f13ae3 --- /dev/null +++ b/crates/oci/src/assemble.rs @@ -0,0 +1,239 @@ +use crate::fetch::{OciImageFetcher, OciImageLayer, OciResolvedImage}; +use crate::progress::OciBoundProgress; +use crate::vfs::{VfsNode, VfsTree}; +use anyhow::{anyhow, Result}; +use log::{debug, trace, warn}; +use oci_spec::image::{ImageConfiguration, ImageManifest}; +use std::path::{Path, PathBuf}; +use std::pin::Pin; +use std::sync::Arc; +use tokio::fs; +use tokio::io::AsyncRead; +use tokio_stream::StreamExt; +use tokio_tar::{Archive, Entry}; +use uuid::Uuid; + +pub struct OciImageAssembled { + pub digest: String, + pub manifest: ImageManifest, + pub config: ImageConfiguration, + pub vfs: Arc, + pub tmp_dir: Option, +} + +impl Drop for OciImageAssembled { + fn drop(&mut self) { + if let Some(tmp) = self.tmp_dir.clone() { + tokio::task::spawn(async move { + let _ = fs::remove_dir_all(&tmp).await; + }); + } + } +} + +pub struct OciImageAssembler { + downloader: OciImageFetcher, + resolved: OciResolvedImage, + progress: OciBoundProgress, + work_dir: PathBuf, + disk_dir: PathBuf, + tmp_dir: Option, +} + +impl OciImageAssembler { + pub async fn new( + downloader: OciImageFetcher, + resolved: OciResolvedImage, + progress: OciBoundProgress, + work_dir: Option, + disk_dir: Option, + ) -> Result { + let tmp_dir = if work_dir.is_none() || disk_dir.is_none() { + let mut tmp_dir = std::env::temp_dir().clone(); + tmp_dir.push(format!("oci-assemble-{}", Uuid::new_v4())); + Some(tmp_dir) + } else { + None + }; + + let work_dir = if let Some(work_dir) = work_dir { + work_dir + } else { + let mut tmp_dir = tmp_dir + .clone() + .ok_or(anyhow!("tmp_dir was not created when expected"))?; + tmp_dir.push("work"); + tmp_dir + }; + + let target_dir = if let Some(target_dir) = disk_dir { + target_dir + } else { + let mut tmp_dir = tmp_dir + .clone() + .ok_or(anyhow!("tmp_dir was not created when expected"))?; + tmp_dir.push("image"); + tmp_dir + }; + + fs::create_dir_all(&work_dir).await?; + fs::create_dir_all(&target_dir).await?; + + Ok(OciImageAssembler { + downloader, + resolved, + progress, + work_dir, + disk_dir: target_dir, + tmp_dir, + }) + } + + pub async fn assemble(self) -> Result { + debug!("assemble"); + let mut layer_dir = self.work_dir.clone(); + layer_dir.push("layer"); + fs::create_dir_all(&layer_dir).await?; + self.assemble_with(&layer_dir).await + } + + async fn assemble_with(self, layer_dir: &Path) -> Result { + let local = self + .downloader + .download(self.resolved.clone(), layer_dir) + .await?; + let mut vfs = VfsTree::new(); + for layer in &local.layers { + debug!( + "process layer digest={} compression={:?}", + &layer.digest, layer.compression, + ); + self.progress + .update(|progress| { + progress.extracting_layer(&layer.digest, 0, 1); + }) + .await; + debug!("process layer digest={}", &layer.digest,); + let mut archive = layer.archive().await?; + let mut entries = archive.entries()?; + while let Some(entry) = entries.next().await { + let mut entry = entry?; + let path = entry.path()?; + let Some(name) = path.file_name() else { + continue; + }; + let Some(name) = name.to_str() else { + continue; + }; + if name.starts_with(".wh.") { + self.process_whiteout_entry(&mut vfs, &entry, name, layer) + .await?; + } else { + vfs.insert_tar_entry(&entry)?; + self.process_write_entry(&mut vfs, &mut entry, layer) + .await?; + } + } + self.progress + .update(|progress| { + progress.extracted_layer(&layer.digest); + }) + .await; + } + for layer in &local.layers { + if layer.path.exists() { + fs::remove_file(&layer.path).await?; + } + } + Ok(OciImageAssembled { + vfs: Arc::new(vfs), + digest: self.resolved.digest, + manifest: self.resolved.manifest, + config: local.config, + tmp_dir: self.tmp_dir, + }) + } + + async fn process_whiteout_entry( + &self, + vfs: &mut VfsTree, + entry: &Entry>>>, + name: &str, + layer: &OciImageLayer, + ) -> Result<()> { + let path = entry.path()?; + let mut path = path.to_path_buf(); + path.pop(); + + let opaque = name == ".wh..wh..opq"; + + if !opaque { + let file = &name[4..]; + path.push(file); + } + + trace!( + "whiteout entry {:?} layer={} path={:?}", + entry.path()?, + &layer.digest, + path + ); + + let result = vfs.root.remove(&path); + if let Some((parent, mut removed)) = result { + delete_disk_paths(&removed).await?; + if opaque { + removed.children.clear(); + parent.children.push(removed); + } + } else { + warn!( + "whiteout entry layer={} path={:?} did not exist", + &layer.digest, path + ); + } + Ok(()) + } + + async fn process_write_entry( + &self, + vfs: &mut VfsTree, + entry: &mut Entry>>>, + layer: &OciImageLayer, + ) -> Result<()> { + if !entry.header().entry_type().is_file() { + return Ok(()); + } + trace!( + "unpack entry layer={} path={:?} type={:?}", + &layer.digest, + entry.path()?, + entry.header().entry_type(), + ); + entry.set_preserve_permissions(false); + entry.set_unpack_xattrs(false); + entry.set_preserve_mtime(false); + let path = entry + .unpack_in(&self.disk_dir) + .await? + .ok_or(anyhow!("unpack did not return a path"))?; + vfs.set_disk_path(&entry.path()?, &path)?; + Ok(()) + } +} + +async fn delete_disk_paths(node: &VfsNode) -> Result<()> { + let mut queue = vec![node]; + while !queue.is_empty() { + let node = queue.remove(0); + if let Some(ref disk_path) = node.disk_path { + if !disk_path.exists() { + warn!("disk path {:?} does not exist", disk_path); + } + fs::remove_file(disk_path).await?; + } + let children = node.children.iter().collect::>(); + queue.extend_from_slice(&children); + } + Ok(()) +} diff --git a/crates/oci/src/compiler.rs b/crates/oci/src/compiler.rs deleted file mode 100644 index a0b7fee..0000000 --- a/crates/oci/src/compiler.rs +++ /dev/null @@ -1,388 +0,0 @@ -use crate::cache::ImageCache; -use crate::fetch::{OciImageDownloader, OciImageLayer}; -use crate::name::ImageName; -use crate::packer::OciPackerFormat; -use crate::progress::{OciProgress, OciProgressContext, OciProgressPhase}; -use crate::registry::OciRegistryPlatform; -use anyhow::{anyhow, Result}; -use indexmap::IndexMap; -use log::{debug, trace}; -use oci_spec::image::{ImageConfiguration, ImageManifest}; -use std::borrow::Cow; -use std::path::{Path, PathBuf}; -use std::pin::Pin; -use tokio::fs; -use tokio::io::AsyncRead; -use tokio_stream::StreamExt; -use tokio_tar::{Archive, Entry}; -use uuid::Uuid; - -pub const IMAGE_PACKER_VERSION: u64 = 2; - -pub struct ImageInfo { - pub image: PathBuf, - pub manifest: ImageManifest, - pub config: ImageConfiguration, -} - -impl ImageInfo { - pub fn new( - image: PathBuf, - manifest: ImageManifest, - config: ImageConfiguration, - ) -> Result { - Ok(ImageInfo { - image, - manifest, - config, - }) - } -} - -pub struct OciImageCompiler<'a> { - cache: &'a ImageCache, - seed: Option, - progress: OciProgressContext, -} - -impl OciImageCompiler<'_> { - pub fn new( - cache: &ImageCache, - seed: Option, - progress: OciProgressContext, - ) -> Result { - Ok(OciImageCompiler { - cache, - seed, - progress, - }) - } - - pub async fn compile( - &self, - id: &str, - image: &ImageName, - format: OciPackerFormat, - ) -> Result { - debug!("compile image={image} format={:?}", format); - let mut tmp_dir = std::env::temp_dir().clone(); - tmp_dir.push(format!("krata-compile-{}", Uuid::new_v4())); - - let mut image_dir = tmp_dir.clone(); - image_dir.push("image"); - fs::create_dir_all(&image_dir).await?; - - let mut layer_dir = tmp_dir.clone(); - layer_dir.push("layer"); - fs::create_dir_all(&layer_dir).await?; - - let mut packed_file = tmp_dir.clone(); - packed_file.push("image.packed"); - - let _guard = scopeguard::guard(tmp_dir.to_path_buf(), |delete| { - tokio::task::spawn(async move { - let _ = fs::remove_dir_all(delete).await; - }); - }); - let info = self - .download_and_compile(id, image, &layer_dir, &image_dir, &packed_file, format) - .await?; - Ok(info) - } - - async fn download_and_compile( - &self, - id: &str, - image: &ImageName, - layer_dir: &Path, - image_dir: &Path, - packed_file: &Path, - format: OciPackerFormat, - ) -> Result { - let mut progress = OciProgress { - id: id.to_string(), - phase: OciProgressPhase::Resolving, - layers: IndexMap::new(), - value: 0, - total: 0, - }; - self.progress.update(&progress); - let downloader = OciImageDownloader::new( - self.seed.clone(), - layer_dir.to_path_buf(), - OciRegistryPlatform::current(), - self.progress.clone(), - ); - let resolved = downloader.resolve(image.clone()).await?; - let cache_key = format!( - "manifest={}:version={}:format={}\n", - resolved.digest, - IMAGE_PACKER_VERSION, - format.id(), - ); - let cache_digest = sha256::digest(cache_key); - - if let Some(cached) = self.cache.recall(&cache_digest, format).await? { - return Ok(cached); - } - - progress.phase = OciProgressPhase::Resolved; - for layer in resolved.manifest.layers() { - progress.add_layer(layer.digest()); - } - self.progress.update(&progress); - - let local = downloader.download(resolved, &mut progress).await?; - for layer in &local.layers { - debug!( - "process layer digest={} compression={:?}", - &layer.digest, layer.compression, - ); - progress.extracting_layer(&layer.digest, 0, 1); - self.progress.update(&progress); - let (whiteouts, count) = self.process_layer_whiteout(layer, image_dir).await?; - progress.extracting_layer(&layer.digest, 0, count); - self.progress.update(&progress); - debug!( - "process layer digest={} whiteouts={:?}", - &layer.digest, whiteouts - ); - let mut archive = layer.archive().await?; - let mut entries = archive.entries()?; - let mut completed = 0; - while let Some(entry) = entries.next().await { - let mut entry = entry?; - let path = entry.path()?; - let mut maybe_whiteout_path_str = - path.to_str().map(|x| x.to_string()).unwrap_or_default(); - if (completed % 10) == 0 { - progress.extracting_layer(&layer.digest, completed, count); - } - completed += 1; - self.progress.update(&progress); - if whiteouts.contains(&maybe_whiteout_path_str) { - continue; - } - maybe_whiteout_path_str.push('/'); - if whiteouts.contains(&maybe_whiteout_path_str) { - continue; - } - let Some(name) = path.file_name() else { - continue; - }; - let Some(name) = name.to_str() else { - continue; - }; - - if name.starts_with(".wh.") { - continue; - } else { - self.process_write_entry(&mut entry, layer, image_dir) - .await?; - } - } - progress.extracted_layer(&layer.digest); - self.progress.update(&progress); - } - - for layer in &local.layers { - if layer.path.exists() { - fs::remove_file(&layer.path).await?; - } - } - - let image_dir_pack = image_dir.to_path_buf(); - let packed_file_pack = packed_file.to_path_buf(); - let progress_pack = progress.clone(); - let progress_context = self.progress.clone(); - let format_pack = format; - progress = tokio::task::spawn_blocking(move || { - OciImageCompiler::pack( - format_pack, - &image_dir_pack, - &packed_file_pack, - progress_pack, - progress_context, - ) - }) - .await??; - - let info = ImageInfo::new( - packed_file.to_path_buf(), - local.image.manifest, - local.config, - )?; - let info = self.cache.store(&cache_digest, &info, format).await?; - progress.phase = OciProgressPhase::Complete; - progress.value = 0; - progress.total = 0; - self.progress.update(&progress); - Ok(info) - } - - async fn process_layer_whiteout( - &self, - layer: &OciImageLayer, - image_dir: &Path, - ) -> Result<(Vec, usize)> { - let mut whiteouts = Vec::new(); - let mut archive = layer.archive().await?; - let mut entries = archive.entries()?; - let mut count = 0usize; - while let Some(entry) = entries.next().await { - let entry = entry?; - count += 1; - let path = entry.path()?; - let Some(name) = path.file_name() else { - continue; - }; - let Some(name) = name.to_str() else { - continue; - }; - - if name.starts_with(".wh.") { - let path = self - .process_whiteout_entry(&entry, name, layer, image_dir) - .await?; - if let Some(path) = path { - whiteouts.push(path); - } - } - } - Ok((whiteouts, count)) - } - - async fn process_whiteout_entry( - &self, - entry: &Entry>>>, - name: &str, - layer: &OciImageLayer, - image_dir: &Path, - ) -> Result> { - let path = entry.path()?; - let mut dst = self.check_safe_entry(path.clone(), image_dir)?; - dst.pop(); - let mut path = path.to_path_buf(); - path.pop(); - - let opaque = name == ".wh..wh..opq"; - - if !opaque { - let file = &name[4..]; - dst.push(file); - path.push(file); - self.check_safe_path(&dst, image_dir)?; - } - - trace!("whiteout entry layer={} path={:?}", &layer.digest, path,); - - let whiteout = path - .to_str() - .ok_or(anyhow!("unable to convert path to string"))? - .to_string(); - - if opaque { - if dst.is_dir() { - let mut reader = fs::read_dir(dst).await?; - while let Some(entry) = reader.next_entry().await? { - let path = entry.path(); - if path.is_symlink() || path.is_file() { - fs::remove_file(&path).await?; - } else if path.is_dir() { - fs::remove_dir_all(&path).await?; - } else { - return Err(anyhow!("opaque whiteout entry did not exist")); - } - } - } else { - debug!( - "whiteout opaque entry missing locally layer={} path={:?} local={:?}", - &layer.digest, - entry.path()?, - dst, - ); - } - } else if dst.is_file() || dst.is_symlink() { - fs::remove_file(&dst).await?; - } else if dst.is_dir() { - fs::remove_dir_all(&dst).await?; - } else { - debug!( - "whiteout entry missing locally layer={} path={:?} local={:?}", - &layer.digest, - entry.path()?, - dst, - ); - } - Ok(if opaque { None } else { Some(whiteout) }) - } - - async fn process_write_entry( - &self, - entry: &mut Entry>>>, - layer: &OciImageLayer, - image_dir: &Path, - ) -> Result<()> { - let uid = entry.header().uid()?; - let gid = entry.header().gid()?; - trace!( - "unpack entry layer={} path={:?} type={:?} uid={} gid={}", - &layer.digest, - entry.path()?, - entry.header().entry_type(), - uid, - gid, - ); - entry.set_preserve_mtime(true); - entry.set_preserve_permissions(true); - entry.set_unpack_xattrs(true); - if let Some(path) = entry.unpack_in(image_dir).await? { - if !path.is_symlink() { - std::os::unix::fs::chown(path, Some(uid as u32), Some(gid as u32))?; - } - } - Ok(()) - } - - fn check_safe_entry(&self, path: Cow, image_dir: &Path) -> Result { - let mut dst = image_dir.to_path_buf(); - dst.push(path); - if let Some(name) = dst.file_name() { - if let Some(name) = name.to_str() { - if name.starts_with(".wh.") { - let copy = dst.clone(); - dst.pop(); - self.check_safe_path(&dst, image_dir)?; - return Ok(copy); - } - } - } - self.check_safe_path(&dst, image_dir)?; - Ok(dst) - } - - fn check_safe_path(&self, dst: &Path, image_dir: &Path) -> Result<()> { - let resolved = path_clean::clean(dst); - if !resolved.starts_with(image_dir) { - return Err(anyhow!("layer attempts to work outside image dir")); - } - Ok(()) - } - - fn pack( - format: OciPackerFormat, - image_dir: &Path, - packed_file: &Path, - mut progress: OciProgress, - progress_context: OciProgressContext, - ) -> Result { - let backend = format.detect_best_backend(); - let backend = backend.create(); - backend.pack(&mut progress, &progress_context, image_dir, packed_file)?; - std::fs::remove_dir_all(image_dir)?; - progress.phase = OciProgressPhase::Packing; - progress.value = progress.total; - progress_context.update(&progress); - Ok(progress) - } -} diff --git a/crates/oci/src/fetch.rs b/crates/oci/src/fetch.rs index deccfe0..9260afe 100644 --- a/crates/oci/src/fetch.rs +++ b/crates/oci/src/fetch.rs @@ -1,8 +1,8 @@ -use crate::progress::{OciProgress, OciProgressContext, OciProgressPhase}; +use crate::progress::{OciBoundProgress, OciProgressPhase}; use super::{ name::ImageName, - registry::{OciRegistryClient, OciRegistryPlatform}, + registry::{OciPlatform, OciRegistryClient}, }; use std::{ @@ -24,11 +24,10 @@ use tokio::{ use tokio_stream::StreamExt; use tokio_tar::Archive; -pub struct OciImageDownloader { +pub struct OciImageFetcher { seed: Option, - storage: PathBuf, - platform: OciRegistryPlatform, - progress: OciProgressContext, + platform: OciPlatform, + progress: OciBoundProgress, } #[derive(Clone, Debug, PartialEq, Eq)] @@ -77,16 +76,14 @@ pub struct OciLocalImage { pub layers: Vec, } -impl OciImageDownloader { +impl OciImageFetcher { pub fn new( seed: Option, - storage: PathBuf, - platform: OciRegistryPlatform, - progress: OciProgressContext, - ) -> OciImageDownloader { - OciImageDownloader { + platform: OciPlatform, + progress: OciBoundProgress, + ) -> OciImageFetcher { + OciImageFetcher { seed, - storage, platform, progress, } @@ -216,12 +213,14 @@ impl OciImageDownloader { pub async fn download( &self, image: OciResolvedImage, - progress: &mut OciProgress, + layer_dir: &Path, ) -> Result { let config: ImageConfiguration; - - progress.phase = OciProgressPhase::ConfigAcquire; - self.progress.update(progress); + self.progress + .update(|progress| { + progress.phase = OciProgressPhase::ConfigAcquire; + }) + .await; let mut client = OciRegistryClient::new(image.name.registry_url()?, self.platform.clone())?; if let Some(seeded) = self .load_seed_json_blob::(image.manifest.config()) @@ -234,18 +233,31 @@ impl OciImageDownloader { .await?; config = serde_json::from_slice(&config_bytes)?; } - progress.phase = OciProgressPhase::LayerAcquire; - self.progress.update(progress); + self.progress + .update(|progress| { + progress.phase = OciProgressPhase::LayerAcquire; + + for layer in image.manifest.layers() { + progress.add_layer(layer.digest(), layer.size() as usize); + } + }) + .await; let mut layers = Vec::new(); for layer in image.manifest.layers() { - progress.downloading_layer(layer.digest(), 0, layer.size() as usize); - self.progress.update(progress); + self.progress + .update(|progress| { + progress.downloading_layer(layer.digest(), 0, layer.size() as usize); + }) + .await; layers.push( - self.acquire_layer(&image.name, layer, &mut client, progress) + self.acquire_layer(&image.name, layer, layer_dir, &mut client) .await?, ); - progress.downloaded_layer(layer.digest()); - self.progress.update(progress); + self.progress + .update(|progress| { + progress.downloaded_layer(layer.digest()); + }) + .await; } Ok(OciLocalImage { image, @@ -258,28 +270,22 @@ impl OciImageDownloader { &self, image: &ImageName, layer: &Descriptor, + layer_dir: &Path, client: &mut OciRegistryClient, - progress: &mut OciProgress, ) -> Result { debug!( "acquire layer digest={} size={}", layer.digest(), layer.size() ); - let mut layer_path = self.storage.clone(); + let mut layer_path = layer_dir.to_path_buf(); layer_path.push(format!("{}.layer", layer.digest())); let seeded = self.extract_seed_blob(layer, &layer_path).await?; if !seeded { let file = File::create(&layer_path).await?; let size = client - .write_blob_to_file( - &image.name, - layer, - file, - Some(progress), - Some(&self.progress), - ) + .write_blob_to_file(&image.name, layer, file, Some(self.progress.clone())) .await?; if layer.size() as u64 != size { return Err(anyhow!( diff --git a/crates/oci/src/lib.rs b/crates/oci/src/lib.rs index 1b117a7..4372fd3 100644 --- a/crates/oci/src/lib.rs +++ b/crates/oci/src/lib.rs @@ -1,7 +1,7 @@ -pub mod cache; -pub mod compiler; +pub mod assemble; pub mod fetch; pub mod name; pub mod packer; pub mod progress; pub mod registry; +pub mod vfs; diff --git a/crates/oci/src/packer.rs b/crates/oci/src/packer.rs deleted file mode 100644 index 2022ec0..0000000 --- a/crates/oci/src/packer.rs +++ /dev/null @@ -1,307 +0,0 @@ -use std::{ - fs::File, - io::{BufWriter, ErrorKind, Read}, - os::unix::fs::{FileTypeExt, MetadataExt, PermissionsExt}, - path::{Path, PathBuf}, - process::{Command, Stdio}, -}; - -use anyhow::{anyhow, Result}; -use backhand::{compression::Compressor, FilesystemCompressor, FilesystemWriter, NodeHeader}; -use log::{trace, warn}; -use walkdir::WalkDir; - -use crate::progress::{OciProgress, OciProgressContext, OciProgressPhase}; - -#[derive(Debug, Default, Clone, Copy)] -pub enum OciPackerFormat { - #[default] - Squashfs, - Erofs, -} - -#[derive(Debug, Clone, Copy)] -pub enum OciPackerBackendType { - Backhand, - MkSquashfs, - MkfsErofs, -} - -impl OciPackerFormat { - pub fn id(&self) -> u8 { - match self { - OciPackerFormat::Squashfs => 0, - OciPackerFormat::Erofs => 1, - } - } - - pub fn extension(&self) -> &str { - match self { - OciPackerFormat::Squashfs => "erofs", - OciPackerFormat::Erofs => "erofs", - } - } - - pub fn detect_best_backend(&self) -> OciPackerBackendType { - match self { - OciPackerFormat::Squashfs => { - let status = Command::new("mksquashfs") - .arg("-version") - .stdin(Stdio::null()) - .stderr(Stdio::null()) - .stdout(Stdio::null()) - .status() - .ok(); - - let Some(code) = status.and_then(|x| x.code()) else { - return OciPackerBackendType::Backhand; - }; - - if code == 0 { - OciPackerBackendType::MkSquashfs - } else { - OciPackerBackendType::Backhand - } - } - OciPackerFormat::Erofs => OciPackerBackendType::MkfsErofs, - } - } -} - -impl OciPackerBackendType { - pub fn format(&self) -> OciPackerFormat { - match self { - OciPackerBackendType::Backhand => OciPackerFormat::Squashfs, - OciPackerBackendType::MkSquashfs => OciPackerFormat::Squashfs, - OciPackerBackendType::MkfsErofs => OciPackerFormat::Erofs, - } - } - - pub fn create(&self) -> Box { - match self { - OciPackerBackendType::Backhand => { - Box::new(OciPackerBackhand {}) as Box - } - OciPackerBackendType::MkSquashfs => { - Box::new(OciPackerMkSquashfs {}) as Box - } - OciPackerBackendType::MkfsErofs => { - Box::new(OciPackerMkfsErofs {}) as Box - } - } - } -} - -pub trait OciPackerBackend { - fn pack( - &self, - progress: &mut OciProgress, - progress_context: &OciProgressContext, - directory: &Path, - file: &Path, - ) -> Result<()>; -} - -pub struct OciPackerBackhand {} - -impl OciPackerBackend for OciPackerBackhand { - fn pack( - &self, - progress: &mut OciProgress, - progress_context: &OciProgressContext, - directory: &Path, - file: &Path, - ) -> Result<()> { - progress.phase = OciProgressPhase::Packing; - progress.total = 1; - progress.value = 0; - progress_context.update(progress); - let mut writer = FilesystemWriter::default(); - writer.set_compressor(FilesystemCompressor::new(Compressor::Gzip, None)?); - let walk = WalkDir::new(directory).follow_links(false); - for entry in walk { - let entry = entry?; - let rel = entry - .path() - .strip_prefix(directory)? - .to_str() - .ok_or_else(|| anyhow!("failed to strip prefix of tmpdir"))?; - let rel = format!("/{}", rel); - trace!("squash write {}", rel); - let typ = entry.file_type(); - let metadata = std::fs::symlink_metadata(entry.path())?; - let uid = metadata.uid(); - let gid = metadata.gid(); - let mode = metadata.permissions().mode(); - let mtime = metadata.mtime(); - - if rel == "/" { - writer.set_root_uid(uid); - writer.set_root_gid(gid); - writer.set_root_mode(mode as u16); - continue; - } - - let header = NodeHeader { - permissions: mode as u16, - uid, - gid, - mtime: mtime as u32, - }; - if typ.is_symlink() { - let symlink = std::fs::read_link(entry.path())?; - let symlink = symlink - .to_str() - .ok_or_else(|| anyhow!("failed to read symlink"))?; - writer.push_symlink(symlink, rel, header)?; - } else if typ.is_dir() { - writer.push_dir(rel, header)?; - } else if typ.is_file() { - writer.push_file(ConsumingFileReader::new(entry.path()), rel, header)?; - } else if typ.is_block_device() { - let device = metadata.dev(); - writer.push_block_device(device as u32, rel, header)?; - } else if typ.is_char_device() { - let device = metadata.dev(); - writer.push_char_device(device as u32, rel, header)?; - } else if typ.is_fifo() { - writer.push_fifo(rel, header)?; - } else if typ.is_socket() { - writer.push_socket(rel, header)?; - } else { - return Err(anyhow!("invalid file type")); - } - } - - progress.phase = OciProgressPhase::Packing; - progress.value = 1; - progress_context.update(progress); - - let squash_file_path = file - .to_str() - .ok_or_else(|| anyhow!("failed to convert squashfs string"))?; - - let file = File::create(file)?; - let mut bufwrite = BufWriter::new(file); - trace!("squash generate: {}", squash_file_path); - writer.write(&mut bufwrite)?; - Ok(()) - } -} - -struct ConsumingFileReader { - path: PathBuf, - file: Option, -} - -impl ConsumingFileReader { - fn new(path: &Path) -> ConsumingFileReader { - ConsumingFileReader { - path: path.to_path_buf(), - file: None, - } - } -} - -impl Read for ConsumingFileReader { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - if self.file.is_none() { - self.file = Some(File::open(&self.path)?); - } - let Some(ref mut file) = self.file else { - return Err(std::io::Error::new( - ErrorKind::NotFound, - "file was not opened", - )); - }; - file.read(buf) - } -} - -impl Drop for ConsumingFileReader { - fn drop(&mut self) { - let file = self.file.take(); - drop(file); - if let Err(error) = std::fs::remove_file(&self.path) { - warn!("failed to delete consuming file {:?}: {}", self.path, error); - } - } -} - -pub struct OciPackerMkSquashfs {} - -impl OciPackerBackend for OciPackerMkSquashfs { - fn pack( - &self, - progress: &mut OciProgress, - progress_context: &OciProgressContext, - directory: &Path, - file: &Path, - ) -> Result<()> { - progress.phase = OciProgressPhase::Packing; - progress.total = 1; - progress.value = 0; - progress_context.update(progress); - let mut child = Command::new("mksquashfs") - .arg(directory) - .arg(file) - .arg("-comp") - .arg("gzip") - .stdin(Stdio::null()) - .stderr(Stdio::null()) - .stdout(Stdio::null()) - .spawn()?; - let status = child.wait()?; - if !status.success() { - Err(anyhow!( - "mksquashfs failed with exit code: {}", - status.code().unwrap() - )) - } else { - progress.phase = OciProgressPhase::Packing; - progress.total = 1; - progress.value = 1; - progress_context.update(progress); - Ok(()) - } - } -} - -pub struct OciPackerMkfsErofs {} - -impl OciPackerBackend for OciPackerMkfsErofs { - fn pack( - &self, - progress: &mut OciProgress, - progress_context: &OciProgressContext, - directory: &Path, - file: &Path, - ) -> Result<()> { - progress.phase = OciProgressPhase::Packing; - progress.total = 1; - progress.value = 0; - progress_context.update(progress); - let mut child = Command::new("mkfs.erofs") - .arg("-L") - .arg("root") - .arg(file) - .arg(directory) - .stdin(Stdio::null()) - .stderr(Stdio::null()) - .stdout(Stdio::null()) - .spawn()?; - let status = child.wait()?; - if !status.success() { - Err(anyhow!( - "mkfs.erofs failed with exit code: {}", - status.code().unwrap() - )) - } else { - progress.phase = OciProgressPhase::Packing; - progress.total = 1; - progress.value = 1; - progress_context.update(progress); - Ok(()) - } - } -} diff --git a/crates/oci/src/packer/backend.rs b/crates/oci/src/packer/backend.rs new file mode 100644 index 0000000..45b0302 --- /dev/null +++ b/crates/oci/src/packer/backend.rs @@ -0,0 +1,201 @@ +use std::{path::Path, process::Stdio, sync::Arc}; + +use super::OciPackedFormat; +use crate::{ + progress::{OciBoundProgress, OciProgressPhase}, + vfs::VfsTree, +}; +use anyhow::{anyhow, Result}; +use log::warn; +use tokio::{pin, process::Command, select}; + +#[derive(Debug, Clone, Copy)] +pub enum OciPackerBackendType { + MkSquashfs, + MkfsErofs, +} + +impl OciPackerBackendType { + pub fn format(&self) -> OciPackedFormat { + match self { + OciPackerBackendType::MkSquashfs => OciPackedFormat::Squashfs, + OciPackerBackendType::MkfsErofs => OciPackedFormat::Erofs, + } + } + + pub fn create(&self) -> Box { + match self { + OciPackerBackendType::MkSquashfs => { + Box::new(OciPackerMkSquashfs {}) as Box + } + OciPackerBackendType::MkfsErofs => { + Box::new(OciPackerMkfsErofs {}) as Box + } + } + } +} + +#[async_trait::async_trait] +pub trait OciPackerBackend: Send + Sync { + async fn pack(&self, progress: OciBoundProgress, vfs: Arc, file: &Path) -> Result<()>; +} + +pub struct OciPackerMkSquashfs {} + +#[async_trait::async_trait] +impl OciPackerBackend for OciPackerMkSquashfs { + async fn pack(&self, progress: OciBoundProgress, vfs: Arc, file: &Path) -> Result<()> { + progress + .update(|progress| { + progress.phase = OciProgressPhase::Packing; + progress.total = 1; + progress.value = 0; + }) + .await; + + let mut child = Command::new("mksquashfs") + .arg("-") + .arg(file) + .arg("-comp") + .arg("gzip") + .arg("-tar") + .stdin(Stdio::piped()) + .stderr(Stdio::null()) + .stdout(Stdio::null()) + .spawn()?; + let stdin = child + .stdin + .take() + .ok_or(anyhow!("unable to acquire stdin stream"))?; + let mut writer = Some(tokio::task::spawn(async move { + if let Err(error) = vfs.write_to_tar(stdin).await { + warn!("failed to write tar: {}", error); + return Err(error); + } + Ok(()) + })); + let wait = child.wait(); + pin!(wait); + let status_result = loop { + if let Some(inner) = writer.as_mut() { + select! { + x = inner => { + writer = None; + match x { + Ok(_) => {}, + Err(error) => { + return Err(error.into()); + } + } + }, + status = &mut wait => { + break status; + } + }; + } else { + select! { + status = &mut wait => { + break status; + } + }; + } + }; + if let Some(writer) = writer { + writer.await??; + } + let status = status_result?; + if !status.success() { + Err(anyhow!( + "mksquashfs failed with exit code: {}", + status.code().unwrap() + )) + } else { + progress + .update(|progress| { + progress.phase = OciProgressPhase::Packing; + progress.total = 1; + progress.value = 1; + }) + .await; + Ok(()) + } + } +} + +pub struct OciPackerMkfsErofs {} + +#[async_trait::async_trait] +impl OciPackerBackend for OciPackerMkfsErofs { + async fn pack(&self, progress: OciBoundProgress, vfs: Arc, path: &Path) -> Result<()> { + progress + .update(|progress| { + progress.phase = OciProgressPhase::Packing; + progress.total = 1; + progress.value = 0; + }) + .await; + + let mut child = Command::new("mkfs.erofs") + .arg("-L") + .arg("root") + .arg("--tar=-") + .arg(path) + .stdin(Stdio::piped()) + .stderr(Stdio::null()) + .stdout(Stdio::null()) + .spawn()?; + let stdin = child + .stdin + .take() + .ok_or(anyhow!("unable to acquire stdin stream"))?; + let mut writer = Some(tokio::task::spawn( + async move { vfs.write_to_tar(stdin).await }, + )); + let wait = child.wait(); + pin!(wait); + let status_result = loop { + if let Some(inner) = writer.as_mut() { + select! { + x = inner => { + match x { + Ok(_) => { + writer = None; + }, + Err(error) => { + return Err(error.into()); + } + } + }, + status = &mut wait => { + break status; + } + }; + } else { + select! { + status = &mut wait => { + break status; + } + }; + } + }; + if let Some(writer) = writer { + writer.await??; + } + let status = status_result?; + if !status.success() { + Err(anyhow!( + "mkfs.erofs failed with exit code: {}", + status.code().unwrap() + )) + } else { + progress + .update(|progress| { + progress.phase = OciProgressPhase::Packing; + progress.total = 1; + progress.value = 1; + }) + .await; + Ok(()) + } + } +} diff --git a/crates/oci/src/cache.rs b/crates/oci/src/packer/cache.rs similarity index 60% rename from crates/oci/src/cache.rs rename to crates/oci/src/packer/cache.rs index f79f335..460520a 100644 --- a/crates/oci/src/cache.rs +++ b/crates/oci/src/packer/cache.rs @@ -1,6 +1,5 @@ -use crate::packer::OciPackerFormat; +use crate::packer::{OciImagePacked, OciPackedFormat}; -use super::compiler::ImageInfo; use anyhow::Result; use log::debug; use oci_spec::image::{ImageConfiguration, ImageManifest}; @@ -8,18 +7,22 @@ use std::path::{Path, PathBuf}; use tokio::fs; #[derive(Clone)] -pub struct ImageCache { +pub struct OciPackerCache { cache_dir: PathBuf, } -impl ImageCache { - pub fn new(cache_dir: &Path) -> Result { - Ok(ImageCache { +impl OciPackerCache { + pub fn new(cache_dir: &Path) -> Result { + Ok(OciPackerCache { cache_dir: cache_dir.to_path_buf(), }) } - pub async fn recall(&self, digest: &str, format: OciPackerFormat) -> Result> { + pub async fn recall( + &self, + digest: &str, + format: OciPackedFormat, + ) -> Result> { let mut fs_path = self.cache_dir.clone(); let mut config_path = self.cache_dir.clone(); let mut manifest_path = self.cache_dir.clone(); @@ -40,7 +43,13 @@ impl ImageCache { let config_text = fs::read_to_string(&config_path).await?; let config: ImageConfiguration = serde_json::from_str(&config_text)?; debug!("cache hit digest={}", digest); - Some(ImageInfo::new(fs_path.clone(), manifest, config)?) + Some(OciImagePacked::new( + digest.to_string(), + fs_path.clone(), + format, + config, + manifest, + )) } else { None } @@ -51,24 +60,25 @@ impl ImageCache { ) } - pub async fn store( - &self, - digest: &str, - info: &ImageInfo, - format: OciPackerFormat, - ) -> Result { - debug!("cache store digest={}", digest); + pub async fn store(&self, packed: OciImagePacked) -> Result { + debug!("cache store digest={}", packed.digest); let mut fs_path = self.cache_dir.clone(); let mut manifest_path = self.cache_dir.clone(); let mut config_path = self.cache_dir.clone(); - fs_path.push(format!("{}.{}", digest, format.extension())); - manifest_path.push(format!("{}.manifest.json", digest)); - config_path.push(format!("{}.config.json", digest)); - fs::copy(&info.image, &fs_path).await?; - let manifest_text = serde_json::to_string_pretty(&info.manifest)?; + fs_path.push(format!("{}.{}", packed.digest, packed.format.extension())); + manifest_path.push(format!("{}.manifest.json", packed.digest)); + config_path.push(format!("{}.config.json", packed.digest)); + fs::copy(&packed.path, &fs_path).await?; + let manifest_text = serde_json::to_string_pretty(&packed.manifest)?; fs::write(&manifest_path, manifest_text).await?; - let config_text = serde_json::to_string_pretty(&info.config)?; + let config_text = serde_json::to_string_pretty(&packed.config)?; fs::write(&config_path, config_text).await?; - ImageInfo::new(fs_path.clone(), info.manifest.clone(), info.config.clone()) + Ok(OciImagePacked::new( + packed.digest, + fs_path.clone(), + packed.format, + packed.config, + packed.manifest, + )) } } diff --git a/crates/oci/src/packer/mod.rs b/crates/oci/src/packer/mod.rs new file mode 100644 index 0000000..da1c4b2 --- /dev/null +++ b/crates/oci/src/packer/mod.rs @@ -0,0 +1,58 @@ +use std::path::PathBuf; + +use self::backend::OciPackerBackendType; +use oci_spec::image::{ImageConfiguration, ImageManifest}; + +pub mod backend; +pub mod cache; +pub mod service; + +#[derive(Debug, Default, Clone, Copy)] +pub enum OciPackedFormat { + #[default] + Squashfs, + Erofs, +} + +impl OciPackedFormat { + pub fn extension(&self) -> &str { + match self { + OciPackedFormat::Squashfs => "squashfs", + OciPackedFormat::Erofs => "erofs", + } + } + + pub fn backend(&self) -> OciPackerBackendType { + match self { + OciPackedFormat::Squashfs => OciPackerBackendType::MkSquashfs, + OciPackedFormat::Erofs => OciPackerBackendType::MkfsErofs, + } + } +} + +#[derive(Clone)] +pub struct OciImagePacked { + pub digest: String, + pub path: PathBuf, + pub format: OciPackedFormat, + pub config: ImageConfiguration, + pub manifest: ImageManifest, +} + +impl OciImagePacked { + pub fn new( + digest: String, + path: PathBuf, + format: OciPackedFormat, + config: ImageConfiguration, + manifest: ImageManifest, + ) -> OciImagePacked { + OciImagePacked { + digest, + path, + format, + config, + manifest, + } + } +} diff --git a/crates/oci/src/packer/service.rs b/crates/oci/src/packer/service.rs new file mode 100644 index 0000000..54cf444 --- /dev/null +++ b/crates/oci/src/packer/service.rs @@ -0,0 +1,81 @@ +use std::path::{Path, PathBuf}; + +use anyhow::{anyhow, Result}; + +use crate::{ + assemble::OciImageAssembler, + fetch::OciImageFetcher, + name::ImageName, + progress::{OciBoundProgress, OciProgress, OciProgressContext}, + registry::OciPlatform, +}; + +use super::{cache::OciPackerCache, OciImagePacked, OciPackedFormat}; + +#[derive(Clone)] +pub struct OciPackerService { + seed: Option, + platform: OciPlatform, + cache: OciPackerCache, +} + +impl OciPackerService { + pub fn new( + seed: Option, + cache_dir: &Path, + platform: OciPlatform, + ) -> Result { + Ok(OciPackerService { + seed, + cache: OciPackerCache::new(cache_dir)?, + platform, + }) + } + + pub async fn recall( + &self, + digest: &str, + format: OciPackedFormat, + ) -> Result> { + self.cache.recall(digest, format).await + } + + pub async fn request( + &self, + name: ImageName, + format: OciPackedFormat, + progress_context: OciProgressContext, + ) -> Result { + let progress = OciProgress::new(); + let progress = OciBoundProgress::new(progress_context.clone(), progress); + let fetcher = + OciImageFetcher::new(self.seed.clone(), self.platform.clone(), progress.clone()); + let resolved = fetcher.resolve(name).await?; + if let Some(cached) = self.cache.recall(&resolved.digest, format).await? { + return Ok(cached); + } + let assembler = + OciImageAssembler::new(fetcher, resolved, progress.clone(), None, None).await?; + let assembled = assembler.assemble().await?; + let mut file = assembled + .tmp_dir + .clone() + .ok_or(anyhow!("tmp_dir was missing when packing image"))?; + file.push("image.pack"); + let target = file.clone(); + let packer = format.backend().create(); + packer + .pack(progress, assembled.vfs.clone(), &target) + .await?; + + let packed = OciImagePacked::new( + assembled.digest.clone(), + file, + format, + assembled.config.clone(), + assembled.manifest.clone(), + ); + let packed = self.cache.store(packed).await?; + Ok(packed) + } +} diff --git a/crates/oci/src/progress.rs b/crates/oci/src/progress.rs index 905a12f..43bdaea 100644 --- a/crates/oci/src/progress.rs +++ b/crates/oci/src/progress.rs @@ -1,24 +1,40 @@ +use std::sync::Arc; + use indexmap::IndexMap; -use tokio::sync::broadcast::Sender; +use tokio::sync::{mpsc::Sender, Mutex}; #[derive(Clone, Debug)] pub struct OciProgress { - pub id: String, pub phase: OciProgressPhase, pub layers: IndexMap, pub value: u64, pub total: u64, } +impl Default for OciProgress { + fn default() -> Self { + Self::new() + } +} + impl OciProgress { - pub fn add_layer(&mut self, id: &str) { + pub fn new() -> Self { + OciProgress { + phase: OciProgressPhase::Resolving, + layers: IndexMap::new(), + value: 0, + total: 1, + } + } + + pub fn add_layer(&mut self, id: &str, size: usize) { self.layers.insert( id.to_string(), OciProgressLayer { id: id.to_string(), phase: OciProgressLayerPhase::Waiting, value: 0, - total: 0, + total: size as u64, }, ); } @@ -92,6 +108,33 @@ impl OciProgressContext { } pub fn update(&self, progress: &OciProgress) { - let _ = self.sender.send(progress.clone()); + let _ = self.sender.try_send(progress.clone()); + } +} + +#[derive(Clone)] +pub struct OciBoundProgress { + context: OciProgressContext, + instance: Arc>, +} + +impl OciBoundProgress { + pub fn new(context: OciProgressContext, progress: OciProgress) -> OciBoundProgress { + OciBoundProgress { + context, + instance: Arc::new(Mutex::new(progress)), + } + } + + pub async fn update(&self, function: impl FnOnce(&mut OciProgress)) { + let mut progress = self.instance.lock().await; + function(&mut progress); + self.context.update(&progress); + } + + pub fn update_blocking(&self, function: impl FnOnce(&mut OciProgress)) { + let mut progress = self.instance.blocking_lock(); + function(&mut progress); + self.context.update(&progress); } } diff --git a/crates/oci/src/registry.rs b/crates/oci/src/registry.rs index d3ecc34..1b93e51 100644 --- a/crates/oci/src/registry.rs +++ b/crates/oci/src/registry.rs @@ -7,28 +7,28 @@ use reqwest::{Client, RequestBuilder, Response, StatusCode}; use tokio::{fs::File, io::AsyncWriteExt}; use url::Url; -use crate::progress::{OciProgress, OciProgressContext}; +use crate::progress::OciBoundProgress; #[derive(Clone, Debug)] -pub struct OciRegistryPlatform { +pub struct OciPlatform { pub os: Os, pub arch: Arch, } -impl OciRegistryPlatform { +impl OciPlatform { #[cfg(target_arch = "x86_64")] const CURRENT_ARCH: Arch = Arch::Amd64; #[cfg(target_arch = "aarch64")] const CURRENT_ARCH: Arch = Arch::ARM64; - pub fn new(os: Os, arch: Arch) -> OciRegistryPlatform { - OciRegistryPlatform { os, arch } + pub fn new(os: Os, arch: Arch) -> OciPlatform { + OciPlatform { os, arch } } - pub fn current() -> OciRegistryPlatform { - OciRegistryPlatform { + pub fn current() -> OciPlatform { + OciPlatform { os: Os::Linux, - arch: OciRegistryPlatform::CURRENT_ARCH, + arch: OciPlatform::CURRENT_ARCH, } } } @@ -36,12 +36,12 @@ impl OciRegistryPlatform { pub struct OciRegistryClient { agent: Client, url: Url, - platform: OciRegistryPlatform, + platform: OciPlatform, token: Option, } impl OciRegistryClient { - pub fn new(url: Url, platform: OciRegistryPlatform) -> Result { + pub fn new(url: Url, platform: OciPlatform) -> Result { Ok(OciRegistryClient { agent: Client::new(), url, @@ -140,8 +140,7 @@ impl OciRegistryClient { name: N, descriptor: &Descriptor, mut dest: File, - mut progress_handle: Option<&mut OciProgress>, - progress_context: Option<&OciProgressContext>, + progress: Option, ) -> Result { let url = self.url.join(&format!( "/v2/{}/blobs/{}", @@ -157,15 +156,16 @@ impl OciRegistryClient { if (size - last_progress_size) > (5 * 1024 * 1024) { last_progress_size = size; - if let Some(progress_handle) = progress_handle.as_mut() { - progress_handle.downloading_layer( - descriptor.digest(), - size as usize, - descriptor.size() as usize, - ); - if let Some(progress_context) = progress_context.as_ref() { - progress_context.update(progress_handle); - } + if let Some(ref progress) = progress { + progress + .update(|progress| { + progress.downloading_layer( + descriptor.digest(), + size as usize, + descriptor.size() as usize, + ); + }) + .await; } } } diff --git a/crates/oci/src/vfs.rs b/crates/oci/src/vfs.rs new file mode 100644 index 0000000..51c3e0c --- /dev/null +++ b/crates/oci/src/vfs.rs @@ -0,0 +1,261 @@ +use std::path::{Path, PathBuf}; + +use anyhow::{anyhow, Result}; +use tokio::{ + fs::File, + io::{AsyncRead, AsyncWrite, AsyncWriteExt}, +}; +use tokio_tar::{Builder, Entry, EntryType, Header}; + +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum VfsNodeType { + Directory, + RegularFile, + Symlink, + Hardlink, + Fifo, + CharDevice, + BlockDevice, +} + +#[derive(Clone, Debug)] +pub struct VfsNode { + pub name: String, + pub size: u64, + pub children: Vec, + pub typ: VfsNodeType, + pub uid: u64, + pub gid: u64, + pub link_name: Option, + pub mode: u32, + pub mtime: u64, + pub dev_major: Option, + pub dev_minor: Option, + pub disk_path: Option, +} + +impl VfsNode { + pub fn from(entry: &Entry) -> Result { + let header = entry.header(); + let name = entry + .path()? + .file_name() + .ok_or(anyhow!("unable to get file name for entry"))? + .to_string_lossy() + .to_string(); + let typ = header.entry_type(); + let vtype = if typ.is_symlink() { + VfsNodeType::Symlink + } else if typ.is_hard_link() { + VfsNodeType::Hardlink + } else if typ.is_dir() { + VfsNodeType::Directory + } else if typ.is_fifo() { + VfsNodeType::Fifo + } else if typ.is_block_special() { + VfsNodeType::BlockDevice + } else if typ.is_character_special() { + VfsNodeType::CharDevice + } else if typ.is_file() { + VfsNodeType::RegularFile + } else { + return Err(anyhow!("unable to determine vfs type for entry")); + }; + + Ok(VfsNode { + name, + size: header.size()?, + children: vec![], + typ: vtype, + uid: header.uid()?, + gid: header.gid()?, + link_name: header.link_name()?.map(|x| x.to_string_lossy().to_string()), + mode: header.mode()?, + mtime: header.mtime()?, + dev_major: header.device_major()?, + dev_minor: header.device_minor()?, + disk_path: None, + }) + } + + pub fn lookup(&self, path: &Path) -> Option<&VfsNode> { + let mut node = self; + for part in path { + node = node + .children + .iter() + .find(|child| child.name == part.to_string_lossy())?; + } + Some(node) + } + + pub fn lookup_mut(&mut self, path: &Path) -> Option<&mut VfsNode> { + let mut node = self; + for part in path { + node = node + .children + .iter_mut() + .find(|child| child.name == part.to_string_lossy())?; + } + Some(node) + } + + pub fn remove(&mut self, path: &Path) -> Option<(&mut VfsNode, VfsNode)> { + let parent = path.parent()?; + let node = self.lookup_mut(parent)?; + let file_name = path.file_name()?; + let file_name = file_name.to_string_lossy(); + let position = node + .children + .iter() + .position(|child| file_name == child.name)?; + let removed = node.children.remove(position); + Some((node, removed)) + } + + pub fn create_tar_header(&self) -> Result
{ + let mut header = Header::new_ustar(); + header.set_entry_type(match self.typ { + VfsNodeType::Directory => EntryType::Directory, + VfsNodeType::CharDevice => EntryType::Char, + VfsNodeType::BlockDevice => EntryType::Block, + VfsNodeType::Fifo => EntryType::Fifo, + VfsNodeType::Hardlink => EntryType::Link, + VfsNodeType::Symlink => EntryType::Symlink, + VfsNodeType::RegularFile => EntryType::Regular, + }); + header.set_uid(self.uid); + header.set_gid(self.gid); + + if let Some(device_major) = self.dev_major { + header.set_device_major(device_major)?; + } + + if let Some(device_minor) = self.dev_minor { + header.set_device_minor(device_minor)?; + } + header.set_mtime(self.mtime); + header.set_mode(self.mode); + + if let Some(link_name) = self.link_name.as_ref() { + header.set_link_name(&PathBuf::from(link_name))?; + } + header.set_size(self.size); + Ok(header) + } + + pub async fn write_to_tar( + &self, + path: &Path, + builder: &mut Builder, + ) -> Result<()> { + let mut header = self.create_tar_header()?; + header.set_path(path)?; + header.set_cksum(); + if let Some(disk_path) = self.disk_path.as_ref() { + builder + .append(&header, File::open(disk_path).await?) + .await?; + } else { + builder.append(&header, &[] as &[u8]).await?; + } + Ok(()) + } +} + +#[derive(Clone, Debug)] +pub struct VfsTree { + pub root: VfsNode, +} + +impl Default for VfsTree { + fn default() -> Self { + Self::new() + } +} + +impl VfsTree { + pub fn new() -> VfsTree { + VfsTree { + root: VfsNode { + name: "".to_string(), + size: 0, + children: vec![], + typ: VfsNodeType::Directory, + uid: 0, + gid: 0, + link_name: None, + mode: 0, + mtime: 0, + dev_major: None, + dev_minor: None, + disk_path: None, + }, + } + } + + pub fn insert_tar_entry(&mut self, entry: &Entry) -> Result<()> { + let mut meta = VfsNode::from(entry)?; + let path = entry.path()?.to_path_buf(); + let parent = if let Some(parent) = path.parent() { + self.root.lookup_mut(parent) + } else { + Some(&mut self.root) + }; + + let Some(parent) = parent else { + return Err(anyhow!("unable to find parent of entry")); + }; + + let position = parent + .children + .iter() + .position(|child| meta.name == child.name); + + if let Some(position) = position { + let old = parent.children.remove(position); + if meta.typ == VfsNodeType::Directory { + meta.children = old.children; + } + } + parent.children.push(meta); + Ok(()) + } + + pub fn set_disk_path(&mut self, path: &Path, disk_path: &Path) -> Result<()> { + let Some(node) = self.root.lookup_mut(path) else { + return Err(anyhow!( + "unable to find node {:?} to set disk path to", + path + )); + }; + node.disk_path = Some(disk_path.to_path_buf()); + Ok(()) + } + + pub async fn write_to_tar( + &self, + write: W, + ) -> Result<()> { + let mut builder = Builder::new(write); + let mut queue = vec![(PathBuf::from(""), &self.root)]; + + while !queue.is_empty() { + let (mut path, node) = queue.remove(0); + if !node.name.is_empty() { + path.push(&node.name); + } + if path.components().count() != 0 { + node.write_to_tar(&path, &mut builder).await?; + } + for child in &node.children { + queue.push((path.clone(), child)); + } + } + + let mut write = builder.into_inner().await?; + write.flush().await?; + drop(write); + Ok(()) + } +} diff --git a/crates/runtime/src/cfgblk.rs b/crates/runtime/src/cfgblk.rs index 044a16d..0ae491f 100644 --- a/crates/runtime/src/cfgblk.rs +++ b/crates/runtime/src/cfgblk.rs @@ -1,7 +1,7 @@ use anyhow::Result; use backhand::{FilesystemWriter, NodeHeader}; use krata::launchcfg::LaunchInfo; -use krataoci::compiler::ImageInfo; +use krataoci::packer::OciImagePacked; use log::trace; use std::fs; use std::fs::File; @@ -9,28 +9,24 @@ use std::path::PathBuf; use uuid::Uuid; pub struct ConfigBlock<'a> { - pub image_info: &'a ImageInfo, + pub image: &'a OciImagePacked, pub file: PathBuf, pub dir: PathBuf, } impl ConfigBlock<'_> { - pub fn new<'a>(uuid: &Uuid, image_info: &'a ImageInfo) -> Result> { + pub fn new<'a>(uuid: &Uuid, image: &'a OciImagePacked) -> Result> { let mut dir = std::env::temp_dir().clone(); dir.push(format!("krata-cfg-{}", uuid)); fs::create_dir_all(&dir)?; let mut file = dir.clone(); file.push("config.squashfs"); - Ok(ConfigBlock { - image_info, - file, - dir, - }) + Ok(ConfigBlock { image, file, dir }) } pub fn build(&self, launch_config: &LaunchInfo) -> Result<()> { trace!("build launch_config={:?}", launch_config); - let manifest = self.image_info.config.to_string()?; + let manifest = self.image.config.to_string()?; let launch = serde_json::to_string(launch_config)?; let mut writer = FilesystemWriter::default(); writer.push_dir( diff --git a/crates/runtime/src/launch.rs b/crates/runtime/src/launch.rs index b07a6c7..b0ca22d 100644 --- a/crates/runtime/src/launch.rs +++ b/crates/runtime/src/launch.rs @@ -10,8 +10,7 @@ use krata::launchcfg::{ LaunchInfo, LaunchNetwork, LaunchNetworkIpv4, LaunchNetworkIpv6, LaunchNetworkResolver, LaunchPackedFormat, LaunchRoot, }; -use krataoci::packer::OciPackerFormat; -use krataoci::progress::OciProgressContext; +use krataoci::packer::OciImagePacked; use tokio::sync::Semaphore; use uuid::Uuid; use xenclient::{DomainChannel, DomainConfig, DomainDisk, DomainNetworkInterface}; @@ -19,24 +18,19 @@ use xenstore::XsdInterface; use crate::cfgblk::ConfigBlock; use crate::RuntimeContext; -use krataoci::{ - cache::ImageCache, - compiler::{ImageInfo, OciImageCompiler}, - name::ImageName, -}; use super::{GuestInfo, GuestState}; -pub struct GuestLaunchRequest<'a> { +pub struct GuestLaunchRequest { pub format: LaunchPackedFormat, pub uuid: Option, - pub name: Option<&'a str>, - pub image: &'a str, + pub name: Option, pub vcpus: u32, pub mem: u64, pub env: HashMap, pub run: Option>, pub debug: bool, + pub image: OciImagePacked, } pub struct GuestLauncher { @@ -48,26 +42,13 @@ impl GuestLauncher { Ok(Self { launch_semaphore }) } - pub async fn launch<'r>( + pub async fn launch( &mut self, context: &RuntimeContext, - request: GuestLaunchRequest<'r>, + request: GuestLaunchRequest, ) -> Result { let uuid = request.uuid.unwrap_or_else(Uuid::new_v4); let xen_name = format!("krata-{uuid}"); - let image_info = self - .compile( - &uuid.to_string(), - request.image, - &context.image_cache, - &context.oci_progress_context, - match request.format { - LaunchPackedFormat::Squashfs => OciPackerFormat::Squashfs, - LaunchPackedFormat::Erofs => OciPackerFormat::Erofs, - }, - ) - .await?; - let mut gateway_mac = MacAddr6::random(); gateway_mac.set_local(true); gateway_mac.set_multicast(false); @@ -90,6 +71,7 @@ impl GuestLauncher { hostname: Some( request .name + .as_ref() .map(|x| x.to_string()) .unwrap_or_else(|| format!("krata-{}", uuid)), ), @@ -116,11 +98,12 @@ impl GuestLauncher { run: request.run, }; - let cfgblk = ConfigBlock::new(&uuid, &image_info)?; + let cfgblk = ConfigBlock::new(&uuid, &request.image)?; cfgblk.build(&launch_config)?; - let image_squashfs_path = image_info + let image_squashfs_path = request .image + .path .to_str() .ok_or_else(|| anyhow!("failed to convert image path to string"))?; @@ -158,7 +141,6 @@ impl GuestLauncher { cfgblk_dir_path, ), ), - ("krata/image".to_string(), request.image.to_string()), ( "krata/network/guest/ipv4".to_string(), format!("{}/{}", guest_ipv4, ipv4_network_mask), @@ -185,8 +167,8 @@ impl GuestLauncher { ), ]; - if let Some(name) = request.name { - extra_keys.push(("krata/name".to_string(), name.to_string())); + if let Some(name) = request.name.as_ref() { + extra_keys.push(("krata/name".to_string(), name.clone())); } let config = DomainConfig { @@ -227,10 +209,10 @@ impl GuestLauncher { }; match context.xen.create(&config).await { Ok(created) => Ok(GuestInfo { - name: request.name.map(|x| x.to_string()), + name: request.name.as_ref().map(|x| x.to_string()), uuid, domid: created.domid, - image: request.image.to_string(), + image: request.image.digest, loops: vec![], guest_ipv4: Some(IpNetwork::new( IpAddr::V4(guest_ipv4), @@ -261,19 +243,6 @@ impl GuestLauncher { } } - async fn compile( - &self, - id: &str, - image: &str, - image_cache: &ImageCache, - progress: &OciProgressContext, - format: OciPackerFormat, - ) -> Result { - let image = ImageName::parse(image)?; - let compiler = OciImageCompiler::new(image_cache, None, progress.clone())?; - compiler.compile(id, &image, format).await - } - async fn allocate_ipv4(&self, context: &RuntimeContext) -> Result { let network = Ipv4Network::new(Ipv4Addr::new(10, 75, 80, 0), 24)?; let mut used: Vec = vec![]; diff --git a/crates/runtime/src/lib.rs b/crates/runtime/src/lib.rs index e90402c..3d0f3d3 100644 --- a/crates/runtime/src/lib.rs +++ b/crates/runtime/src/lib.rs @@ -17,7 +17,6 @@ use self::{ autoloop::AutoLoop, launch::{GuestLaunchRequest, GuestLauncher}, }; -use krataoci::{cache::ImageCache, progress::OciProgressContext}; pub mod autoloop; pub mod cfgblk; @@ -51,8 +50,6 @@ pub struct GuestInfo { #[derive(Clone)] pub struct RuntimeContext { - pub oci_progress_context: OciProgressContext, - pub image_cache: ImageCache, pub autoloop: AutoLoop, pub xen: XenClient, pub kernel: String, @@ -60,7 +57,7 @@ pub struct RuntimeContext { } impl RuntimeContext { - pub async fn new(oci_progress_context: OciProgressContext, store: String) -> Result { + pub async fn new(store: String) -> Result { let mut image_cache_path = PathBuf::from(&store); image_cache_path.push("cache"); fs::create_dir_all(&image_cache_path)?; @@ -68,13 +65,10 @@ impl RuntimeContext { let xen = XenClient::open(0).await?; image_cache_path.push("image"); fs::create_dir_all(&image_cache_path)?; - let image_cache = ImageCache::new(&image_cache_path)?; let kernel = RuntimeContext::detect_guest_file(&store, "kernel")?; let initrd = RuntimeContext::detect_guest_file(&store, "initrd")?; Ok(RuntimeContext { - oci_progress_context, - image_cache, autoloop: AutoLoop::new(LoopControl::open()?), xen, kernel, @@ -254,24 +248,22 @@ impl RuntimeContext { #[derive(Clone)] pub struct Runtime { - oci_progress_context: OciProgressContext, store: Arc, context: RuntimeContext, launch_semaphore: Arc, } impl Runtime { - pub async fn new(oci_progress_context: OciProgressContext, store: String) -> Result { - let context = RuntimeContext::new(oci_progress_context.clone(), store.clone()).await?; + pub async fn new(store: String) -> Result { + let context = RuntimeContext::new(store.clone()).await?; Ok(Self { - oci_progress_context, store: Arc::new(store), context, launch_semaphore: Arc::new(Semaphore::new(1)), }) } - pub async fn launch<'a>(&self, request: GuestLaunchRequest<'a>) -> Result { + pub async fn launch(&self, request: GuestLaunchRequest) -> Result { let mut launcher = GuestLauncher::new(self.launch_semaphore.clone())?; launcher.launch(&self.context, request).await } @@ -328,7 +320,7 @@ impl Runtime { } pub async fn dupe(&self) -> Result { - Runtime::new(self.oci_progress_context.clone(), (*self.store).clone()).await + Runtime::new((*self.store).clone()).await } }