From 6d07112e3dccd7c0845aa5205f14ada3ab4682cc Mon Sep 17 00:00:00 2001 From: Alex Zenla Date: Fri, 12 Apr 2024 11:09:26 -0700 Subject: [PATCH] feat: implement oci image progress (#64) * feat: oci progress events * feat: oci progress bars on launch --- Cargo.lock | 55 +++++++++++ Cargo.toml | 1 + crates/ctl/Cargo.toml | 1 + crates/ctl/src/cli/destroy.rs | 44 +++++---- crates/ctl/src/cli/launch.rs | 86 ++++++++++++++++- crates/ctl/src/cli/watch.rs | 9 +- crates/ctl/src/console.rs | 34 ++++--- crates/daemon/Cargo.toml | 1 + crates/daemon/bin/daemon.rs | 5 +- crates/daemon/src/event.rs | 99 ++++++++++++-------- crates/daemon/src/lib.rs | 24 ++++- crates/daemon/src/oci.rs | 42 +++++++++ crates/krata/proto/krata/v1/control.proto | 35 +++++++ crates/krata/src/v1/common.rs | 1 + crates/krata/src/v1/control.rs | 1 + crates/network/src/autonet.rs | 2 + crates/oci/examples/squashify.rs | 20 +++- crates/oci/src/compiler.rs | 107 ++++++++++++++++++---- crates/oci/src/fetch.rs | 35 ++++++- crates/oci/src/lib.rs | 1 + crates/oci/src/progress.rs | 98 ++++++++++++++++++++ crates/oci/src/registry.rs | 19 ++++ crates/runtime/Cargo.toml | 4 - crates/runtime/examples/squashify.rs | 29 ------ crates/runtime/src/launch.rs | 22 ++++- crates/runtime/src/lib.rs | 14 ++- 26 files changed, 630 insertions(+), 159 deletions(-) create mode 100644 crates/daemon/src/oci.rs create mode 100644 crates/oci/src/progress.rs delete mode 100644 crates/runtime/examples/squashify.rs diff --git a/Cargo.lock b/Cargo.lock index 93499ff..5069f7a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -421,6 +421,19 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "console" +version = "0.15.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e1f83fc076bd6dd27517eacdf25fef6c4dfe5f1d7448bafaaf3a26f13b5e4eb" +dependencies = [ + "encode_unicode", + "lazy_static", + "libc", + "unicode-width", + "windows-sys 0.52.0", +] + [[package]] name = "core-foundation-sys" version = "0.8.6" @@ -693,6 +706,12 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4445909572dbd556c457c849c4ca58623d84b27c8fff1e74b0b4227d8b90d17b" +[[package]] +name = "encode_unicode" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" + [[package]] name = "env_filter" version = "0.1.0" @@ -1223,6 +1242,28 @@ dependencies = [ "hashbrown 0.14.3", ] +[[package]] +name = "indicatif" +version = "0.17.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "763a5a8f45087d6bcea4222e7b72c291a054edf80e4ef6efd2a4979878c7bea3" +dependencies = [ + "console", + "instant", + "number_prefix", + "portable-atomic", + "unicode-width", +] + +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", +] + [[package]] name = "ipnet" version = "2.9.0" @@ -1321,6 +1362,7 @@ dependencies = [ "env_logger", "fancy-duration", "human_bytes", + "indicatif", "krata", "log", "prost-reflect", @@ -1347,6 +1389,7 @@ dependencies = [ "env_logger", "futures", "krata", + "krata-oci", "krata-runtime", "log", "prost", @@ -1801,6 +1844,12 @@ dependencies = [ "libc", ] +[[package]] +name = "number_prefix" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" + [[package]] name = "object" version = "0.32.2" @@ -1945,6 +1994,12 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" +[[package]] +name = "portable-atomic" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7170ef9988bc169ba16dd36a7fa041e5c4cbeb6a35b76d4c03daded371eae7c0" + [[package]] name = "ppv-lite86" version = "0.2.17" diff --git a/Cargo.toml b/Cargo.toml index abb2bc4..15b23dc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,7 @@ fancy-duration = "0.9.2" flate2 = "1.0" futures = "0.3.30" human_bytes = "0.4" +indicatif = "0.17.8" ipnetwork = "0.20.0" libc = "0.2" log = "0.4.20" diff --git a/crates/ctl/Cargo.toml b/crates/ctl/Cargo.toml index e93d255..fd7533d 100644 --- a/crates/ctl/Cargo.toml +++ b/crates/ctl/Cargo.toml @@ -18,6 +18,7 @@ ctrlc = { workspace = true, features = ["termination"] } env_logger = { workspace = true } fancy-duration = { workspace = true } human_bytes = { workspace = true } +indicatif = { workspace = true } krata = { path = "../krata", version = "^0.0.8" } log = { workspace = true } prost-reflect = { workspace = true, features = ["serde"] } diff --git a/crates/ctl/src/cli/destroy.rs b/crates/ctl/src/cli/destroy.rs index 869b228..3652bf4 100644 --- a/crates/ctl/src/cli/destroy.rs +++ b/crates/ctl/src/cli/destroy.rs @@ -52,32 +52,30 @@ impl DestroyCommand { async fn wait_guest_destroyed(id: &str, events: EventStream) -> Result<()> { let mut stream = events.subscribe(); while let Ok(event) = stream.recv().await { - match event { - Event::GuestChanged(changed) => { - let Some(guest) = changed.guest else { - continue; - }; + if let Event::GuestChanged(changed) = event { + let Some(guest) = changed.guest else { + continue; + }; - if guest.id != id { - continue; + if guest.id != id { + continue; + } + + let Some(state) = guest.state else { + continue; + }; + + if let Some(ref error) = state.error_info { + if state.status() == GuestStatus::Failed { + error!("destroy failed: {}", error.message); + std::process::exit(1); + } else { + error!("guest error: {}", error.message); } + } - let Some(state) = guest.state else { - continue; - }; - - if let Some(ref error) = state.error_info { - if state.status() == GuestStatus::Failed { - error!("destroy failed: {}", error.message); - std::process::exit(1); - } else { - error!("guest error: {}", error.message); - } - } - - if state.status() == GuestStatus::Destroyed { - std::process::exit(0); - } + if state.status() == GuestStatus::Destroyed { + std::process::exit(0); } } } diff --git a/crates/ctl/src/cli/launch.rs b/crates/ctl/src/cli/launch.rs index 672b00d..360a7eb 100644 --- a/crates/ctl/src/cli/launch.rs +++ b/crates/ctl/src/cli/launch.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use anyhow::Result; use clap::Parser; +use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use krata::{ events::EventStream, v1::{ @@ -11,7 +12,7 @@ use krata::{ }, control::{ control_service_client::ControlServiceClient, watch_events_reply::Event, - CreateGuestRequest, + CreateGuestRequest, OciProgressEventLayerPhase, OciProgressEventPhase, }, }, }; @@ -125,9 +126,14 @@ impl LauchCommand { async fn wait_guest_started(id: &str, events: EventStream) -> Result<()> { let mut stream = events.subscribe(); + let mut multi_progress: Option<(MultiProgress, HashMap)> = None; while let Ok(event) = stream.recv().await { match event { Event::GuestChanged(changed) => { + if let Some((multi_progress, _)) = multi_progress.as_mut() { + let _ = multi_progress.clear(); + } + let Some(guest) = changed.guest else { continue; }; @@ -158,6 +164,84 @@ async fn wait_guest_started(id: &str, events: EventStream) -> Result<()> { break; } } + + Event::OciProgress(oci) => { + if multi_progress.is_none() { + multi_progress = Some((MultiProgress::new(), HashMap::new())); + } + + let Some((multi_progress, progresses)) = multi_progress.as_mut() else { + continue; + }; + + match oci.phase() { + OciProgressEventPhase::Resolved + | OciProgressEventPhase::ConfigAcquire + | OciProgressEventPhase::LayerAcquire => { + if progresses.is_empty() && !oci.layers.is_empty() { + for layer in &oci.layers { + let bar = ProgressBar::new(layer.total); + bar.set_style( + ProgressStyle::with_template("{msg} {wide_bar} {pos}/{len}") + .unwrap(), + ); + progresses.insert(layer.id.clone(), bar.clone()); + multi_progress.add(bar); + } + } + + for layer in oci.layers { + let Some(progress) = progresses.get_mut(&layer.id) else { + continue; + }; + + let phase = match layer.phase() { + OciProgressEventLayerPhase::Waiting => "waiting", + OciProgressEventLayerPhase::Downloading => "downloading", + OciProgressEventLayerPhase::Downloaded => "downloaded", + OciProgressEventLayerPhase::Extracting => "extracting", + OciProgressEventLayerPhase::Extracted => "extracted", + _ => "unknown", + }; + + progress.set_message(format!("{} {}", layer.id, phase)); + progress.set_length(layer.total); + progress.set_position(layer.value); + } + } + + OciProgressEventPhase::Packing => { + for (key, progress) in &mut *progresses { + if key == "packing" { + continue; + } + progress.finish_and_clear(); + multi_progress.remove(progress); + } + progresses.retain(|k, _| k == "packing"); + if progresses.is_empty() { + let progress = ProgressBar::new(100); + progress.set_style( + ProgressStyle::with_template("{msg} {wide_bar} {pos}/{len}") + .unwrap(), + ); + progresses.insert("packing".to_string(), progress); + } + let Some(progress) = progresses.get("packing") else { + continue; + }; + progress.set_message("packing image"); + progress.set_length(oci.total); + progress.set_position(oci.value); + } + + _ => {} + } + + for progress in progresses { + progress.1.tick(); + } + } } } Ok(()) diff --git a/crates/ctl/src/cli/watch.rs b/crates/ctl/src/cli/watch.rs index 81c736b..3a723c9 100644 --- a/crates/ctl/src/cli/watch.rs +++ b/crates/ctl/src/cli/watch.rs @@ -28,11 +28,10 @@ impl WatchCommand { let mut stream = events.subscribe(); loop { let event = stream.recv().await?; - match event { - Event::GuestChanged(changed) => { - let guest = changed.guest.clone(); - self.print_event("guest.changed", changed, guest)?; - } + + if let Event::GuestChanged(changed) = event { + let guest = changed.guest.clone(); + self.print_event("guest.changed", changed, guest)?; } } } diff --git a/crates/ctl/src/console.rs b/crates/ctl/src/console.rs index ff46212..33136a1 100644 --- a/crates/ctl/src/console.rs +++ b/crates/ctl/src/console.rs @@ -69,28 +69,26 @@ impl StdioConsoleStream { Ok(tokio::task::spawn(async move { let mut stream = events.subscribe(); while let Ok(event) = stream.recv().await { - match event { - Event::GuestChanged(changed) => { - let Some(guest) = changed.guest else { - continue; - }; + if let Event::GuestChanged(changed) = event { + let Some(guest) = changed.guest else { + continue; + }; - let Some(state) = guest.state else { - continue; - }; + let Some(state) = guest.state else { + continue; + }; - if guest.id != id { - continue; - } + if guest.id != id { + continue; + } - if let Some(exit_info) = state.exit_info { - return Some(exit_info.code); - } + if let Some(exit_info) = state.exit_info { + return Some(exit_info.code); + } - let status = state.status(); - if status == GuestStatus::Destroying || status == GuestStatus::Destroyed { - return Some(10); - } + let status = state.status(); + if status == GuestStatus::Destroying || status == GuestStatus::Destroyed { + return Some(10); } } } diff --git a/crates/daemon/Cargo.toml b/crates/daemon/Cargo.toml index 97974f6..5ad4c92 100644 --- a/crates/daemon/Cargo.toml +++ b/crates/daemon/Cargo.toml @@ -18,6 +18,7 @@ clap = { workspace = true } env_logger = { workspace = true } futures = { workspace = true } krata = { path = "../krata", version = "^0.0.8" } +krata-oci = { path = "../oci", version = "^0.0.8" } krata-runtime = { path = "../runtime", version = "^0.0.8" } log = { workspace = true } prost = { workspace = true } diff --git a/crates/daemon/bin/daemon.rs b/crates/daemon/bin/daemon.rs index 0d6956f..9a1d984 100644 --- a/crates/daemon/bin/daemon.rs +++ b/crates/daemon/bin/daemon.rs @@ -3,7 +3,6 @@ use clap::Parser; use env_logger::Env; use krata::dial::ControlDialAddress; use kratad::Daemon; -use kratart::Runtime; use log::LevelFilter; use std::{ str::FromStr, @@ -27,8 +26,8 @@ async fn main() -> Result<()> { let args = DaemonCommand::parse(); let addr = ControlDialAddress::from_str(&args.listen)?; - let runtime = Runtime::new(args.store.clone()).await?; - let mut daemon = Daemon::new(args.store.clone(), runtime).await?; + + let mut daemon = Daemon::new(args.store.clone()).await?; daemon.listen(addr).await?; Ok(()) } diff --git a/crates/daemon/src/event.rs b/crates/daemon/src/event.rs index 55d9872..82f9d7d 100644 --- a/crates/daemon/src/event.rs +++ b/crates/daemon/src/event.rs @@ -9,6 +9,7 @@ use krata::{ idm::protocol::{idm_event::Event, IdmEvent}, v1::common::{GuestExitInfo, GuestState, GuestStatus}, }; +use krataoci::progress::OciProgress; use log::{error, warn}; use tokio::{ select, @@ -21,7 +22,7 @@ use tokio::{ }; use uuid::Uuid; -use crate::{db::GuestStore, idm::DaemonIdmHandle}; +use crate::{db::GuestStore, idm::DaemonIdmHandle, oci::convert_oci_progress}; pub type DaemonEvent = krata::v1::control::watch_events_reply::Event; @@ -52,7 +53,8 @@ pub struct DaemonEventGenerator { idms: HashMap)>, idm_sender: Sender<(u32, IdmEvent)>, idm_receiver: Receiver<(u32, IdmEvent)>, - _event_sender: broadcast::Sender, + oci_progress_receiver: broadcast::Receiver, + event_sender: broadcast::Sender, } impl DaemonEventGenerator { @@ -60,6 +62,7 @@ impl DaemonEventGenerator { guests: GuestStore, guest_reconciler_notify: Sender, idm: DaemonIdmHandle, + oci_progress_receiver: broadcast::Receiver, ) -> Result<(DaemonEventContext, DaemonEventGenerator)> { let (sender, _) = broadcast::channel(EVENT_CHANNEL_QUEUE_LEN); let (idm_sender, idm_receiver) = channel(IDM_EVENT_CHANNEL_QUEUE_LEN); @@ -71,55 +74,54 @@ impl DaemonEventGenerator { idms: HashMap::new(), idm_sender, idm_receiver, - _event_sender: sender.clone(), + oci_progress_receiver, + event_sender: sender.clone(), }; let context = DaemonEventContext { sender }; Ok((context, generator)) } async fn handle_feed_event(&mut self, event: &DaemonEvent) -> Result<()> { - match event { - DaemonEvent::GuestChanged(changed) => { - let Some(ref guest) = changed.guest else { - return Ok(()); - }; + if let DaemonEvent::GuestChanged(changed) = event { + let Some(ref guest) = changed.guest else { + return Ok(()); + }; - let Some(ref state) = guest.state else { - return Ok(()); - }; + let Some(ref state) = guest.state else { + return Ok(()); + }; - let status = state.status(); - let id = Uuid::from_str(&guest.id)?; - let domid = state.domid; - match status { - GuestStatus::Started => { - if let Entry::Vacant(e) = self.idms.entry(domid) { - let client = self.idm.client(domid).await?; - let mut receiver = client.subscribe().await?; - let sender = self.idm_sender.clone(); - let task = tokio::task::spawn(async move { - loop { - let Ok(event) = receiver.recv().await else { - break; - }; + let status = state.status(); + let id = Uuid::from_str(&guest.id)?; + let domid = state.domid; + match status { + GuestStatus::Started => { + if let Entry::Vacant(e) = self.idms.entry(domid) { + let client = self.idm.client(domid).await?; + let mut receiver = client.subscribe().await?; + let sender = self.idm_sender.clone(); + let task = tokio::task::spawn(async move { + loop { + let Ok(event) = receiver.recv().await else { + break; + }; - if let Err(error) = sender.send((domid, event)).await { - warn!("unable to deliver idm event: {}", error); - } + if let Err(error) = sender.send((domid, event)).await { + warn!("unable to deliver idm event: {}", error); } - }); - e.insert((id, task)); - } + } + }); + e.insert((id, task)); } - - GuestStatus::Destroyed => { - if let Some((_, handle)) = self.idms.remove(&domid) { - handle.abort(); - } - } - - _ => {} } + + GuestStatus::Destroyed => { + if let Some((_, handle)) = self.idms.remove(&domid) { + handle.abort(); + } + } + + _ => {} } } Ok(()) @@ -148,6 +150,17 @@ impl DaemonEventGenerator { Ok(()) } + async fn handle_oci_progress_event(&mut self, progress: OciProgress) -> Result<()> { + let Some(_) = Uuid::from_str(&progress.id).ok() else { + return Ok(()); + }; + + let event = convert_oci_progress(progress); + self.event_sender.send(DaemonEvent::OciProgress(event))?; + + Ok(()) + } + async fn evaluate(&mut self) -> Result<()> { select! { x = self.idm_receiver.recv() => match x { @@ -168,6 +181,14 @@ impl DaemonEventGenerator { Err(error) => { Err(error.into()) } + }, + x = self.oci_progress_receiver.recv() => match x { + Ok(event) => { + self.handle_oci_progress_event(event).await + }, + Err(error) => { + Err(error.into()) + } } } } diff --git a/crates/daemon/src/lib.rs b/crates/daemon/src/lib.rs index 1240121..a92a9ca 100644 --- a/crates/daemon/src/lib.rs +++ b/crates/daemon/src/lib.rs @@ -7,12 +7,16 @@ use db::GuestStore; use event::{DaemonEventContext, DaemonEventGenerator}; use idm::{DaemonIdm, DaemonIdmHandle}; use krata::{dial::ControlDialAddress, v1::control::control_service_server::ControlServiceServer}; +use krataoci::progress::OciProgressContext; use kratart::Runtime; use log::info; use reconcile::guest::GuestReconciler; use tokio::{ net::UnixListener, - sync::mpsc::{channel, Sender}, + sync::{ + broadcast, + mpsc::{channel, Sender}, + }, task::JoinHandle, }; use tokio_stream::wrappers::UnixListenerStream; @@ -25,6 +29,7 @@ pub mod db; pub mod event; pub mod idm; pub mod metrics; +pub mod oci; pub mod reconcile; pub struct Daemon { @@ -39,9 +44,14 @@ pub struct Daemon { } const GUEST_RECONCILER_QUEUE_LEN: usize = 1000; +const OCI_PROGRESS_QUEUE_LEN: usize = 1000; impl Daemon { - pub async fn new(store: String, runtime: Runtime) -> Result { + pub async fn new(store: String) -> Result { + let (oci_progress_sender, oci_progress_receiver) = + broadcast::channel(OCI_PROGRESS_QUEUE_LEN); + let runtime = + Runtime::new(OciProgressContext::new(oci_progress_sender), store.clone()).await?; let guests_db_path = format!("{}/guests.db", store); let guests = GuestStore::open(&PathBuf::from(guests_db_path))?; let (guest_reconciler_notify, guest_reconciler_receiver) = @@ -50,9 +60,13 @@ impl Daemon { let idm = idm.launch().await?; let console = DaemonConsole::new().await?; let console = console.launch().await?; - let (events, generator) = - DaemonEventGenerator::new(guests.clone(), guest_reconciler_notify.clone(), idm.clone()) - .await?; + let (events, generator) = DaemonEventGenerator::new( + guests.clone(), + guest_reconciler_notify.clone(), + idm.clone(), + oci_progress_receiver, + ) + .await?; let runtime_for_reconciler = runtime.dupe().await?; let guest_reconciler = GuestReconciler::new( guests.clone(), diff --git a/crates/daemon/src/oci.rs b/crates/daemon/src/oci.rs new file mode 100644 index 0000000..8544728 --- /dev/null +++ b/crates/daemon/src/oci.rs @@ -0,0 +1,42 @@ +use krata::v1::control::{ + OciProgressEvent, OciProgressEventLayer, OciProgressEventLayerPhase, OciProgressEventPhase, +}; +use krataoci::progress::{OciProgress, OciProgressLayer, OciProgressLayerPhase, OciProgressPhase}; + +fn convert_oci_layer_progress(layer: OciProgressLayer) -> OciProgressEventLayer { + OciProgressEventLayer { + id: layer.id, + phase: match layer.phase { + OciProgressLayerPhase::Waiting => OciProgressEventLayerPhase::Waiting, + OciProgressLayerPhase::Downloading => OciProgressEventLayerPhase::Downloading, + OciProgressLayerPhase::Downloaded => OciProgressEventLayerPhase::Downloaded, + OciProgressLayerPhase::Extracting => OciProgressEventLayerPhase::Extracting, + OciProgressLayerPhase::Extracted => OciProgressEventLayerPhase::Extracted, + } + .into(), + value: layer.value, + total: layer.total, + } +} + +pub fn convert_oci_progress(oci: OciProgress) -> OciProgressEvent { + OciProgressEvent { + guest_id: oci.id, + phase: match oci.phase { + OciProgressPhase::Resolving => OciProgressEventPhase::Resolving, + OciProgressPhase::Resolved => OciProgressEventPhase::Resolved, + OciProgressPhase::ConfigAcquire => OciProgressEventPhase::ConfigAcquire, + OciProgressPhase::LayerAcquire => OciProgressEventPhase::LayerAcquire, + OciProgressPhase::Packing => OciProgressEventPhase::Packing, + OciProgressPhase::Complete => OciProgressEventPhase::Complete, + } + .into(), + layers: oci + .layers + .into_values() + .map(convert_oci_layer_progress) + .collect::>(), + value: oci.value, + total: oci.total, + } +} diff --git a/crates/krata/proto/krata/v1/control.proto b/crates/krata/proto/krata/v1/control.proto index 8ebd193..1dfaf0f 100644 --- a/crates/krata/proto/krata/v1/control.proto +++ b/crates/krata/proto/krata/v1/control.proto @@ -61,6 +61,7 @@ message WatchEventsRequest {} message WatchEventsReply { oneof event { GuestChangedEvent guest_changed = 1; + OciProgressEvent oci_progress = 2; } } @@ -68,6 +69,40 @@ message GuestChangedEvent { krata.v1.common.Guest guest = 1; } +enum OciProgressEventLayerPhase { + OCI_PROGRESS_EVENT_LAYER_PHASE_UNKNOWN = 0; + OCI_PROGRESS_EVENT_LAYER_PHASE_WAITING = 1; + OCI_PROGRESS_EVENT_LAYER_PHASE_DOWNLOADING = 2; + OCI_PROGRESS_EVENT_LAYER_PHASE_DOWNLOADED = 3; + OCI_PROGRESS_EVENT_LAYER_PHASE_EXTRACTING = 4; + OCI_PROGRESS_EVENT_LAYER_PHASE_EXTRACTED = 5; +} + +message OciProgressEventLayer { + string id = 1; + OciProgressEventLayerPhase phase = 2; + uint64 value = 3; + uint64 total = 4; +} + +enum OciProgressEventPhase { + OCI_PROGRESS_EVENT_PHASE_UNKNOWN = 0; + OCI_PROGRESS_EVENT_PHASE_RESOLVING = 1; + OCI_PROGRESS_EVENT_PHASE_RESOLVED = 2; + OCI_PROGRESS_EVENT_PHASE_CONFIG_ACQUIRE = 3; + OCI_PROGRESS_EVENT_PHASE_LAYER_ACQUIRE = 4; + OCI_PROGRESS_EVENT_PHASE_PACKING = 5; + OCI_PROGRESS_EVENT_PHASE_COMPLETE = 6; +} + +message OciProgressEvent { + string guest_id = 1; + OciProgressEventPhase phase = 2; + repeated OciProgressEventLayer layers = 3; + uint64 value = 4; + uint64 total = 5; +} + message ReadGuestMetricsRequest { string guest_id = 1; } diff --git a/crates/krata/src/v1/common.rs b/crates/krata/src/v1/common.rs index 3a9944f..e9484ad 100644 --- a/crates/krata/src/v1/common.rs +++ b/crates/krata/src/v1/common.rs @@ -1 +1,2 @@ +#![allow(clippy::all)] tonic::include_proto!("krata.v1.common"); diff --git a/crates/krata/src/v1/control.rs b/crates/krata/src/v1/control.rs index bfe405a..56b67f7 100644 --- a/crates/krata/src/v1/control.rs +++ b/crates/krata/src/v1/control.rs @@ -1 +1,2 @@ +#![allow(clippy::all)] tonic::include_proto!("krata.v1.control"); diff --git a/crates/network/src/autonet.rs b/crates/network/src/autonet.rs index 9b27d62..6d46160 100644 --- a/crates/network/src/autonet.rs +++ b/crates/network/src/autonet.rs @@ -179,6 +179,8 @@ impl AutoNetworkWatcher { break; }, + Ok(_) => {}, + Err(error) => { warn!("failed to receive event: {}", error); } diff --git a/crates/oci/examples/squashify.rs b/crates/oci/examples/squashify.rs index d735874..8ad0f5e 100644 --- a/crates/oci/examples/squashify.rs +++ b/crates/oci/examples/squashify.rs @@ -2,8 +2,10 @@ use std::{env::args, path::PathBuf}; use anyhow::Result; use env_logger::Env; -use krataoci::{cache::ImageCache, compiler::ImageCompiler, name::ImageName}; -use tokio::fs; +use krataoci::{ + cache::ImageCache, compiler::ImageCompiler, name::ImageName, progress::OciProgressContext, +}; +use tokio::{fs, sync::broadcast}; #[tokio::main] async fn main() -> Result<()> { @@ -18,8 +20,18 @@ async fn main() -> Result<()> { } let cache = ImageCache::new(&cache_dir)?; - let compiler = ImageCompiler::new(&cache, seed)?; - let info = compiler.compile(&image).await?; + + let (sender, mut receiver) = broadcast::channel(1000); + tokio::task::spawn(async move { + loop { + let Some(_) = receiver.recv().await.ok() else { + break; + }; + } + }); + let context = OciProgressContext::new(sender); + let compiler = ImageCompiler::new(&cache, seed, context)?; + let info = compiler.compile(&image.to_string(), &image).await?; println!( "generated squashfs of {} to {}", image, diff --git a/crates/oci/src/compiler.rs b/crates/oci/src/compiler.rs index 58aa4ee..5d3a406 100644 --- a/crates/oci/src/compiler.rs +++ b/crates/oci/src/compiler.rs @@ -1,6 +1,7 @@ use crate::cache::ImageCache; use crate::fetch::{OciImageDownloader, OciImageLayer}; use crate::name::ImageName; +use crate::progress::{OciProgress, OciProgressContext, OciProgressPhase}; use crate::registry::OciRegistryPlatform; use anyhow::{anyhow, Result}; use backhand::compression::Compressor; @@ -8,6 +9,7 @@ use backhand::{FilesystemCompressor, FilesystemWriter, NodeHeader}; use log::{debug, trace, warn}; use oci_spec::image::{ImageConfiguration, ImageManifest}; use std::borrow::Cow; +use std::collections::BTreeMap; use std::fs::File; use std::io::{BufWriter, ErrorKind, Read}; use std::os::unix::fs::{FileTypeExt, MetadataExt, PermissionsExt}; @@ -45,14 +47,23 @@ impl ImageInfo { pub struct ImageCompiler<'a> { cache: &'a ImageCache, seed: Option, + progress: OciProgressContext, } impl ImageCompiler<'_> { - pub fn new(cache: &ImageCache, seed: Option) -> Result { - Ok(ImageCompiler { cache, seed }) + pub fn new( + cache: &ImageCache, + seed: Option, + progress: OciProgressContext, + ) -> Result { + Ok(ImageCompiler { + cache, + seed, + progress, + }) } - pub async fn compile(&self, image: &ImageName) -> Result { + pub async fn compile(&self, id: &str, image: &ImageName) -> Result { debug!("compile image={image}"); let mut tmp_dir = std::env::temp_dir().clone(); tmp_dir.push(format!("krata-compile-{}", Uuid::new_v4())); @@ -68,7 +79,7 @@ impl ImageCompiler<'_> { let mut squash_file = tmp_dir.clone(); squash_file.push("image.squashfs"); let info = self - .download_and_compile(image, &layer_dir, &image_dir, &squash_file) + .download_and_compile(id, image, &layer_dir, &image_dir, &squash_file) .await?; fs::remove_dir_all(&tmp_dir).await?; Ok(info) @@ -76,15 +87,25 @@ impl ImageCompiler<'_> { async fn download_and_compile( &self, + id: &str, image: &ImageName, layer_dir: &Path, image_dir: &Path, squash_file: &Path, ) -> Result { + let mut progress = OciProgress { + id: id.to_string(), + phase: OciProgressPhase::Resolving, + layers: BTreeMap::new(), + value: 0, + total: 0, + }; + self.progress.update(&progress); let downloader = OciImageDownloader::new( self.seed.clone(), layer_dir.to_path_buf(), OciRegistryPlatform::current(), + self.progress.clone(), ); let resolved = downloader.resolve(image.clone()).await?; let cache_key = format!( @@ -93,28 +114,44 @@ impl ImageCompiler<'_> { ); let cache_digest = sha256::digest(cache_key); + progress.phase = OciProgressPhase::Complete; + self.progress.update(&progress); if let Some(cached) = self.cache.recall(&cache_digest).await? { return Ok(cached); } - let local = downloader.download(resolved).await?; + progress.phase = OciProgressPhase::Resolved; + for layer in resolved.manifest.layers() { + progress.add_layer(layer.digest()); + } + self.progress.update(&progress); + + let local = downloader.download(resolved, &mut progress).await?; for layer in &local.layers { debug!( "process layer digest={} compression={:?}", &layer.digest, layer.compression, ); - let whiteouts = self.process_layer_whiteout(layer, image_dir).await?; + progress.extracting_layer(&layer.digest, 0, 0); + self.progress.update(&progress); + let (whiteouts, count) = self.process_layer_whiteout(layer, image_dir).await?; + progress.extracting_layer(&layer.digest, 0, count); + self.progress.update(&progress); debug!( "process layer digest={} whiteouts={:?}", &layer.digest, whiteouts ); let mut archive = layer.archive().await?; let mut entries = archive.entries()?; + let mut completed = 0; while let Some(entry) = entries.next().await { let mut entry = entry?; let path = entry.path()?; let mut maybe_whiteout_path_str = path.to_str().map(|x| x.to_string()).unwrap_or_default(); + progress.extracting_layer(&layer.digest, completed, count); + completed += 1; + self.progress.update(&progress); if whiteouts.contains(&maybe_whiteout_path_str) { continue; } @@ -123,10 +160,10 @@ impl ImageCompiler<'_> { continue; } let Some(name) = path.file_name() else { - return Err(anyhow!("unable to get file name")); + continue; }; let Some(name) = name.to_str() else { - return Err(anyhow!("unable to get file name as string")); + continue; }; if name.starts_with(".wh.") { @@ -136,6 +173,8 @@ impl ImageCompiler<'_> { .await?; } } + progress.extracted_layer(&layer.digest); + self.progress.update(&progress); } for layer in &local.layers { @@ -144,31 +183,51 @@ impl ImageCompiler<'_> { } } - self.squash(image_dir, squash_file)?; + let image_dir_squash = image_dir.to_path_buf(); + let squash_file_squash = squash_file.to_path_buf(); + let progress_squash = progress.clone(); + let progress_context = self.progress.clone(); + progress = tokio::task::spawn_blocking(move || { + ImageCompiler::squash( + &image_dir_squash, + &squash_file_squash, + progress_squash, + progress_context, + ) + }) + .await??; + let info = ImageInfo::new( squash_file.to_path_buf(), local.image.manifest, local.config, )?; - self.cache.store(&cache_digest, &info).await + let info = self.cache.store(&cache_digest, &info).await?; + progress.phase = OciProgressPhase::Complete; + progress.value = 0; + progress.total = 0; + self.progress.update(&progress); + Ok(info) } async fn process_layer_whiteout( &self, layer: &OciImageLayer, image_dir: &Path, - ) -> Result> { + ) -> Result<(Vec, usize)> { let mut whiteouts = Vec::new(); let mut archive = layer.archive().await?; let mut entries = archive.entries()?; + let mut count = 0usize; while let Some(entry) = entries.next().await { let entry = entry?; + count += 1; let path = entry.path()?; let Some(name) = path.file_name() else { - return Err(anyhow!("unable to get file name")); + continue; }; let Some(name) = name.to_str() else { - return Err(anyhow!("unable to get file name as string")); + continue; }; if name.starts_with(".wh.") { @@ -180,7 +239,7 @@ impl ImageCompiler<'_> { } } } - Ok(whiteouts) + Ok((whiteouts, count)) } async fn process_whiteout_entry( @@ -300,7 +359,16 @@ impl ImageCompiler<'_> { Ok(()) } - fn squash(&self, image_dir: &Path, squash_file: &Path) -> Result<()> { + fn squash( + image_dir: &Path, + squash_file: &Path, + mut progress: OciProgress, + progress_context: OciProgressContext, + ) -> Result { + progress.phase = OciProgressPhase::Packing; + progress.total = 2; + progress.value = 0; + progress_context.update(&progress); let mut writer = FilesystemWriter::default(); writer.set_compressor(FilesystemCompressor::new(Compressor::Gzip, None)?); let walk = WalkDir::new(image_dir).follow_links(false); @@ -358,6 +426,10 @@ impl ImageCompiler<'_> { } } + progress.phase = OciProgressPhase::Packing; + progress.value = 1; + progress_context.update(&progress); + let squash_file_path = squash_file .to_str() .ok_or_else(|| anyhow!("failed to convert squashfs string"))?; @@ -367,7 +439,10 @@ impl ImageCompiler<'_> { trace!("squash generate: {}", squash_file_path); writer.write(&mut bufwrite)?; std::fs::remove_dir_all(image_dir)?; - Ok(()) + progress.phase = OciProgressPhase::Packing; + progress.value = 2; + progress_context.update(&progress); + Ok(progress) } } diff --git a/crates/oci/src/fetch.rs b/crates/oci/src/fetch.rs index 5c5413d..deccfe0 100644 --- a/crates/oci/src/fetch.rs +++ b/crates/oci/src/fetch.rs @@ -1,3 +1,5 @@ +use crate::progress::{OciProgress, OciProgressContext, OciProgressPhase}; + use super::{ name::ImageName, registry::{OciRegistryClient, OciRegistryPlatform}, @@ -26,6 +28,7 @@ pub struct OciImageDownloader { seed: Option, storage: PathBuf, platform: OciRegistryPlatform, + progress: OciProgressContext, } #[derive(Clone, Debug, PartialEq, Eq)] @@ -79,11 +82,13 @@ impl OciImageDownloader { seed: Option, storage: PathBuf, platform: OciRegistryPlatform, + progress: OciProgressContext, ) -> OciImageDownloader { OciImageDownloader { seed, storage, platform, + progress, } } @@ -208,9 +213,15 @@ impl OciImageDownloader { }) } - pub async fn download(&self, image: OciResolvedImage) -> Result { + pub async fn download( + &self, + image: OciResolvedImage, + progress: &mut OciProgress, + ) -> Result { let config: ImageConfiguration; + progress.phase = OciProgressPhase::ConfigAcquire; + self.progress.update(progress); let mut client = OciRegistryClient::new(image.name.registry_url()?, self.platform.clone())?; if let Some(seeded) = self .load_seed_json_blob::(image.manifest.config()) @@ -223,9 +234,18 @@ impl OciImageDownloader { .await?; config = serde_json::from_slice(&config_bytes)?; } + progress.phase = OciProgressPhase::LayerAcquire; + self.progress.update(progress); let mut layers = Vec::new(); for layer in image.manifest.layers() { - layers.push(self.acquire_layer(&image.name, layer, &mut client).await?); + progress.downloading_layer(layer.digest(), 0, layer.size() as usize); + self.progress.update(progress); + layers.push( + self.acquire_layer(&image.name, layer, &mut client, progress) + .await?, + ); + progress.downloaded_layer(layer.digest()); + self.progress.update(progress); } Ok(OciLocalImage { image, @@ -239,6 +259,7 @@ impl OciImageDownloader { image: &ImageName, layer: &Descriptor, client: &mut OciRegistryClient, + progress: &mut OciProgress, ) -> Result { debug!( "acquire layer digest={} size={}", @@ -251,7 +272,15 @@ impl OciImageDownloader { let seeded = self.extract_seed_blob(layer, &layer_path).await?; if !seeded { let file = File::create(&layer_path).await?; - let size = client.write_blob_to_file(&image.name, layer, file).await?; + let size = client + .write_blob_to_file( + &image.name, + layer, + file, + Some(progress), + Some(&self.progress), + ) + .await?; if layer.size() as u64 != size { return Err(anyhow!( "downloaded layer size differs from size in manifest", diff --git a/crates/oci/src/lib.rs b/crates/oci/src/lib.rs index e7272b6..ac3471a 100644 --- a/crates/oci/src/lib.rs +++ b/crates/oci/src/lib.rs @@ -2,4 +2,5 @@ pub mod cache; pub mod compiler; pub mod fetch; pub mod name; +pub mod progress; pub mod registry; diff --git a/crates/oci/src/progress.rs b/crates/oci/src/progress.rs new file mode 100644 index 0000000..ef5a554 --- /dev/null +++ b/crates/oci/src/progress.rs @@ -0,0 +1,98 @@ +use std::collections::BTreeMap; + +use tokio::sync::broadcast::Sender; + +#[derive(Clone, Debug)] +pub struct OciProgress { + pub id: String, + pub phase: OciProgressPhase, + pub layers: BTreeMap, + pub value: u64, + pub total: u64, +} + +impl OciProgress { + pub fn add_layer(&mut self, id: &str) { + self.layers.insert( + id.to_string(), + OciProgressLayer { + id: id.to_string(), + phase: OciProgressLayerPhase::Waiting, + value: 0, + total: 0, + }, + ); + } + + pub fn downloading_layer(&mut self, id: &str, downloaded: usize, total: usize) { + if let Some(entry) = self.layers.get_mut(id) { + entry.phase = OciProgressLayerPhase::Downloading; + entry.value = downloaded as u64; + entry.total = total as u64; + } + } + + pub fn downloaded_layer(&mut self, id: &str) { + if let Some(entry) = self.layers.get_mut(id) { + entry.phase = OciProgressLayerPhase::Downloaded; + entry.value = entry.total; + } + } + + pub fn extracting_layer(&mut self, id: &str, extracted: usize, total: usize) { + if let Some(entry) = self.layers.get_mut(id) { + entry.phase = OciProgressLayerPhase::Extracting; + entry.value = extracted as u64; + entry.total = total as u64; + } + } + + pub fn extracted_layer(&mut self, id: &str) { + if let Some(entry) = self.layers.get_mut(id) { + entry.phase = OciProgressLayerPhase::Extracted; + entry.value = entry.total; + } + } +} + +#[derive(Clone, Debug)] +pub enum OciProgressPhase { + Resolving, + Resolved, + ConfigAcquire, + LayerAcquire, + Packing, + Complete, +} + +#[derive(Clone, Debug)] +pub struct OciProgressLayer { + pub id: String, + pub phase: OciProgressLayerPhase, + pub value: u64, + pub total: u64, +} + +#[derive(Clone, Debug)] +pub enum OciProgressLayerPhase { + Waiting, + Downloading, + Downloaded, + Extracting, + Extracted, +} + +#[derive(Clone)] +pub struct OciProgressContext { + sender: Sender, +} + +impl OciProgressContext { + pub fn new(sender: Sender) -> OciProgressContext { + OciProgressContext { sender } + } + + pub fn update(&self, progress: &OciProgress) { + let _ = self.sender.send(progress.clone()); + } +} diff --git a/crates/oci/src/registry.rs b/crates/oci/src/registry.rs index e40a5cf..d3ecc34 100644 --- a/crates/oci/src/registry.rs +++ b/crates/oci/src/registry.rs @@ -7,6 +7,8 @@ use reqwest::{Client, RequestBuilder, Response, StatusCode}; use tokio::{fs::File, io::AsyncWriteExt}; use url::Url; +use crate::progress::{OciProgress, OciProgressContext}; + #[derive(Clone, Debug)] pub struct OciRegistryPlatform { pub os: Os, @@ -138,6 +140,8 @@ impl OciRegistryClient { name: N, descriptor: &Descriptor, mut dest: File, + mut progress_handle: Option<&mut OciProgress>, + progress_context: Option<&OciProgressContext>, ) -> Result { let url = self.url.join(&format!( "/v2/{}/blobs/{}", @@ -146,9 +150,24 @@ impl OciRegistryClient { ))?; let mut response = self.call(self.agent.get(url.as_str())).await?; let mut size: u64 = 0; + let mut last_progress_size: u64 = 0; while let Some(chunk) = response.chunk().await? { dest.write_all(&chunk).await?; size += chunk.len() as u64; + + if (size - last_progress_size) > (5 * 1024 * 1024) { + last_progress_size = size; + if let Some(progress_handle) = progress_handle.as_mut() { + progress_handle.downloading_layer( + descriptor.digest(), + size as usize, + descriptor.size() as usize, + ); + if let Some(progress_context) = progress_context.as_ref() { + progress_context.update(progress_handle); + } + } + } } Ok(size) } diff --git a/crates/runtime/Cargo.toml b/crates/runtime/Cargo.toml index efafc98..deb4096 100644 --- a/crates/runtime/Cargo.toml +++ b/crates/runtime/Cargo.toml @@ -31,10 +31,6 @@ name = "kratart" [dev-dependencies] env_logger = { workspace = true } -[[example]] -name = "kratart-squashify" -path = "examples/squashify.rs" - [[example]] name = "kratart-channel" path = "examples/channel.rs" diff --git a/crates/runtime/examples/squashify.rs b/crates/runtime/examples/squashify.rs deleted file mode 100644 index d735874..0000000 --- a/crates/runtime/examples/squashify.rs +++ /dev/null @@ -1,29 +0,0 @@ -use std::{env::args, path::PathBuf}; - -use anyhow::Result; -use env_logger::Env; -use krataoci::{cache::ImageCache, compiler::ImageCompiler, name::ImageName}; -use tokio::fs; - -#[tokio::main] -async fn main() -> Result<()> { - env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); - - let image = ImageName::parse(&args().nth(1).unwrap())?; - let seed = args().nth(2).map(PathBuf::from); - - let cache_dir = PathBuf::from("krata-cache"); - if !cache_dir.exists() { - fs::create_dir(&cache_dir).await?; - } - - let cache = ImageCache::new(&cache_dir)?; - let compiler = ImageCompiler::new(&cache, seed)?; - let info = compiler.compile(&image).await?; - println!( - "generated squashfs of {} to {}", - image, - info.image_squashfs.to_string_lossy() - ); - Ok(()) -} diff --git a/crates/runtime/src/launch.rs b/crates/runtime/src/launch.rs index dad8433..131931d 100644 --- a/crates/runtime/src/launch.rs +++ b/crates/runtime/src/launch.rs @@ -9,6 +9,7 @@ use ipnetwork::{IpNetwork, Ipv4Network}; use krata::launchcfg::{ LaunchInfo, LaunchNetwork, LaunchNetworkIpv4, LaunchNetworkIpv6, LaunchNetworkResolver, }; +use krataoci::progress::OciProgressContext; use tokio::sync::Semaphore; use uuid::Uuid; use xenclient::{DomainChannel, DomainConfig, DomainDisk, DomainNetworkInterface}; @@ -51,7 +52,14 @@ impl GuestLauncher { ) -> Result { let uuid = request.uuid.unwrap_or_else(Uuid::new_v4); let xen_name = format!("krata-{uuid}"); - let image_info = self.compile(request.image, &context.image_cache).await?; + let image_info = self + .compile( + &uuid.to_string(), + request.image, + &context.image_cache, + &context.oci_progress_context, + ) + .await?; let mut gateway_mac = MacAddr6::random(); gateway_mac.set_local(true); @@ -243,10 +251,16 @@ impl GuestLauncher { } } - async fn compile(&self, image: &str, image_cache: &ImageCache) -> Result { + async fn compile( + &self, + id: &str, + image: &str, + image_cache: &ImageCache, + progress: &OciProgressContext, + ) -> Result { let image = ImageName::parse(image)?; - let compiler = ImageCompiler::new(image_cache, None)?; - compiler.compile(&image).await + let compiler = ImageCompiler::new(image_cache, None, progress.clone())?; + compiler.compile(id, &image).await } async fn allocate_ipv4(&self, context: &RuntimeContext) -> Result { diff --git a/crates/runtime/src/lib.rs b/crates/runtime/src/lib.rs index 1c42763..e90402c 100644 --- a/crates/runtime/src/lib.rs +++ b/crates/runtime/src/lib.rs @@ -17,7 +17,7 @@ use self::{ autoloop::AutoLoop, launch::{GuestLaunchRequest, GuestLauncher}, }; -use krataoci::cache::ImageCache; +use krataoci::{cache::ImageCache, progress::OciProgressContext}; pub mod autoloop; pub mod cfgblk; @@ -51,6 +51,7 @@ pub struct GuestInfo { #[derive(Clone)] pub struct RuntimeContext { + pub oci_progress_context: OciProgressContext, pub image_cache: ImageCache, pub autoloop: AutoLoop, pub xen: XenClient, @@ -59,7 +60,7 @@ pub struct RuntimeContext { } impl RuntimeContext { - pub async fn new(store: String) -> Result { + pub async fn new(oci_progress_context: OciProgressContext, store: String) -> Result { let mut image_cache_path = PathBuf::from(&store); image_cache_path.push("cache"); fs::create_dir_all(&image_cache_path)?; @@ -72,6 +73,7 @@ impl RuntimeContext { let initrd = RuntimeContext::detect_guest_file(&store, "initrd")?; Ok(RuntimeContext { + oci_progress_context, image_cache, autoloop: AutoLoop::new(LoopControl::open()?), xen, @@ -252,15 +254,17 @@ impl RuntimeContext { #[derive(Clone)] pub struct Runtime { + oci_progress_context: OciProgressContext, store: Arc, context: RuntimeContext, launch_semaphore: Arc, } impl Runtime { - pub async fn new(store: String) -> Result { - let context = RuntimeContext::new(store.clone()).await?; + pub async fn new(oci_progress_context: OciProgressContext, store: String) -> Result { + let context = RuntimeContext::new(oci_progress_context.clone(), store.clone()).await?; Ok(Self { + oci_progress_context, store: Arc::new(store), context, launch_semaphore: Arc::new(Semaphore::new(1)), @@ -324,7 +328,7 @@ impl Runtime { } pub async fn dupe(&self) -> Result { - Runtime::new((*self.store).clone()).await + Runtime::new(self.oci_progress_context.clone(), (*self.store).clone()).await } }