From cb89ac02d18b2b6c1ffdefa7744161b86671efb9 Mon Sep 17 00:00:00 2001 From: Alex Zenla Date: Fri, 12 Apr 2024 12:38:31 +0000 Subject: [PATCH] feat: oci progress bars on launch --- Cargo.lock | 54 ++++++++++++++ Cargo.toml | 1 + crates/ctl/Cargo.toml | 1 + crates/ctl/src/cli/launch.rs | 86 ++++++++++++++++++++++- crates/daemon/src/oci.rs | 6 +- crates/krata/proto/krata/v1/control.proto | 6 +- crates/oci/src/compiler.rs | 60 +++++++++++----- crates/oci/src/progress.rs | 27 ++++--- crates/runtime/src/launch.rs | 21 ++++-- 9 files changed, 216 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e5284f9..5069f7a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -421,6 +421,19 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "console" +version = "0.15.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e1f83fc076bd6dd27517eacdf25fef6c4dfe5f1d7448bafaaf3a26f13b5e4eb" +dependencies = [ + "encode_unicode", + "lazy_static", + "libc", + "unicode-width", + "windows-sys 0.52.0", +] + [[package]] name = "core-foundation-sys" version = "0.8.6" @@ -693,6 +706,12 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4445909572dbd556c457c849c4ca58623d84b27c8fff1e74b0b4227d8b90d17b" +[[package]] +name = "encode_unicode" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" + [[package]] name = "env_filter" version = "0.1.0" @@ -1223,6 +1242,28 @@ dependencies = [ "hashbrown 0.14.3", ] +[[package]] +name = "indicatif" +version = "0.17.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "763a5a8f45087d6bcea4222e7b72c291a054edf80e4ef6efd2a4979878c7bea3" +dependencies = [ + "console", + "instant", + "number_prefix", + "portable-atomic", + "unicode-width", +] + +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", +] + [[package]] name = "ipnet" version = "2.9.0" @@ -1321,6 +1362,7 @@ dependencies = [ "env_logger", "fancy-duration", "human_bytes", + "indicatif", "krata", "log", "prost-reflect", @@ -1802,6 +1844,12 @@ dependencies = [ "libc", ] +[[package]] +name = "number_prefix" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" + [[package]] name = "object" version = "0.32.2" @@ -1946,6 +1994,12 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" +[[package]] +name = "portable-atomic" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7170ef9988bc169ba16dd36a7fa041e5c4cbeb6a35b76d4c03daded371eae7c0" + [[package]] name = "ppv-lite86" version = "0.2.17" diff --git a/Cargo.toml b/Cargo.toml index abb2bc4..15b23dc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,7 @@ fancy-duration = "0.9.2" flate2 = "1.0" futures = "0.3.30" human_bytes = "0.4" +indicatif = "0.17.8" ipnetwork = "0.20.0" libc = "0.2" log = "0.4.20" diff --git a/crates/ctl/Cargo.toml b/crates/ctl/Cargo.toml index e93d255..fd7533d 100644 --- a/crates/ctl/Cargo.toml +++ b/crates/ctl/Cargo.toml @@ -18,6 +18,7 @@ ctrlc = { workspace = true, features = ["termination"] } env_logger = { workspace = true } fancy-duration = { workspace = true } human_bytes = { workspace = true } +indicatif = { workspace = true } krata = { path = "../krata", version = "^0.0.8" } log = { workspace = true } prost-reflect = { workspace = true, features = ["serde"] } diff --git a/crates/ctl/src/cli/launch.rs b/crates/ctl/src/cli/launch.rs index 776cd50..360a7eb 100644 --- a/crates/ctl/src/cli/launch.rs +++ b/crates/ctl/src/cli/launch.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use anyhow::Result; use clap::Parser; +use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use krata::{ events::EventStream, v1::{ @@ -11,7 +12,7 @@ use krata::{ }, control::{ control_service_client::ControlServiceClient, watch_events_reply::Event, - CreateGuestRequest, + CreateGuestRequest, OciProgressEventLayerPhase, OciProgressEventPhase, }, }, }; @@ -125,9 +126,14 @@ impl LauchCommand { async fn wait_guest_started(id: &str, events: EventStream) -> Result<()> { let mut stream = events.subscribe(); + let mut multi_progress: Option<(MultiProgress, HashMap)> = None; while let Ok(event) = stream.recv().await { match event { Event::GuestChanged(changed) => { + if let Some((multi_progress, _)) = multi_progress.as_mut() { + let _ = multi_progress.clear(); + } + let Some(guest) = changed.guest else { continue; }; @@ -159,7 +165,83 @@ async fn wait_guest_started(id: &str, events: EventStream) -> Result<()> { } } - Event::OciProgress(_oci) => {} + Event::OciProgress(oci) => { + if multi_progress.is_none() { + multi_progress = Some((MultiProgress::new(), HashMap::new())); + } + + let Some((multi_progress, progresses)) = multi_progress.as_mut() else { + continue; + }; + + match oci.phase() { + OciProgressEventPhase::Resolved + | OciProgressEventPhase::ConfigAcquire + | OciProgressEventPhase::LayerAcquire => { + if progresses.is_empty() && !oci.layers.is_empty() { + for layer in &oci.layers { + let bar = ProgressBar::new(layer.total); + bar.set_style( + ProgressStyle::with_template("{msg} {wide_bar} {pos}/{len}") + .unwrap(), + ); + progresses.insert(layer.id.clone(), bar.clone()); + multi_progress.add(bar); + } + } + + for layer in oci.layers { + let Some(progress) = progresses.get_mut(&layer.id) else { + continue; + }; + + let phase = match layer.phase() { + OciProgressEventLayerPhase::Waiting => "waiting", + OciProgressEventLayerPhase::Downloading => "downloading", + OciProgressEventLayerPhase::Downloaded => "downloaded", + OciProgressEventLayerPhase::Extracting => "extracting", + OciProgressEventLayerPhase::Extracted => "extracted", + _ => "unknown", + }; + + progress.set_message(format!("{} {}", layer.id, phase)); + progress.set_length(layer.total); + progress.set_position(layer.value); + } + } + + OciProgressEventPhase::Packing => { + for (key, progress) in &mut *progresses { + if key == "packing" { + continue; + } + progress.finish_and_clear(); + multi_progress.remove(progress); + } + progresses.retain(|k, _| k == "packing"); + if progresses.is_empty() { + let progress = ProgressBar::new(100); + progress.set_style( + ProgressStyle::with_template("{msg} {wide_bar} {pos}/{len}") + .unwrap(), + ); + progresses.insert("packing".to_string(), progress); + } + let Some(progress) = progresses.get("packing") else { + continue; + }; + progress.set_message("packing image"); + progress.set_length(oci.total); + progress.set_position(oci.value); + } + + _ => {} + } + + for progress in progresses { + progress.1.tick(); + } + } } } Ok(()) diff --git a/crates/daemon/src/oci.rs b/crates/daemon/src/oci.rs index d3941c9..8544728 100644 --- a/crates/daemon/src/oci.rs +++ b/crates/daemon/src/oci.rs @@ -14,7 +14,8 @@ fn convert_oci_layer_progress(layer: OciProgressLayer) -> OciProgressEventLayer OciProgressLayerPhase::Extracted => OciProgressEventLayerPhase::Extracted, } .into(), - progress: layer.progress, + value: layer.value, + total: layer.total, } } @@ -35,6 +36,7 @@ pub fn convert_oci_progress(oci: OciProgress) -> OciProgressEvent { .into_values() .map(convert_oci_layer_progress) .collect::>(), - progress: oci.progress, + value: oci.value, + total: oci.total, } } diff --git a/crates/krata/proto/krata/v1/control.proto b/crates/krata/proto/krata/v1/control.proto index ef51cab..1dfaf0f 100644 --- a/crates/krata/proto/krata/v1/control.proto +++ b/crates/krata/proto/krata/v1/control.proto @@ -81,7 +81,8 @@ enum OciProgressEventLayerPhase { message OciProgressEventLayer { string id = 1; OciProgressEventLayerPhase phase = 2; - double progress = 3; + uint64 value = 3; + uint64 total = 4; } enum OciProgressEventPhase { @@ -98,7 +99,8 @@ message OciProgressEvent { string guest_id = 1; OciProgressEventPhase phase = 2; repeated OciProgressEventLayer layers = 3; - double progress = 4; + uint64 value = 4; + uint64 total = 5; } message ReadGuestMetricsRequest { diff --git a/crates/oci/src/compiler.rs b/crates/oci/src/compiler.rs index 7dc4c4a..5d3a406 100644 --- a/crates/oci/src/compiler.rs +++ b/crates/oci/src/compiler.rs @@ -97,7 +97,8 @@ impl ImageCompiler<'_> { id: id.to_string(), phase: OciProgressPhase::Resolving, layers: BTreeMap::new(), - progress: 0.0, + value: 0, + total: 0, }; self.progress.update(&progress); let downloader = OciImageDownloader::new( @@ -148,8 +149,8 @@ impl ImageCompiler<'_> { let path = entry.path()?; let mut maybe_whiteout_path_str = path.to_str().map(|x| x.to_string()).unwrap_or_default(); - completed += 1; progress.extracting_layer(&layer.digest, completed, count); + completed += 1; self.progress.update(&progress); if whiteouts.contains(&maybe_whiteout_path_str) { continue; @@ -159,10 +160,10 @@ impl ImageCompiler<'_> { continue; } let Some(name) = path.file_name() else { - return Err(anyhow!("unable to get file name")); + continue; }; let Some(name) = name.to_str() else { - return Err(anyhow!("unable to get file name as string")); + continue; }; if name.starts_with(".wh.") { @@ -172,6 +173,8 @@ impl ImageCompiler<'_> { .await?; } } + progress.extracted_layer(&layer.digest); + self.progress.update(&progress); } for layer in &local.layers { @@ -180,13 +183,31 @@ impl ImageCompiler<'_> { } } - self.squash(image_dir, squash_file, &mut progress)?; + let image_dir_squash = image_dir.to_path_buf(); + let squash_file_squash = squash_file.to_path_buf(); + let progress_squash = progress.clone(); + let progress_context = self.progress.clone(); + progress = tokio::task::spawn_blocking(move || { + ImageCompiler::squash( + &image_dir_squash, + &squash_file_squash, + progress_squash, + progress_context, + ) + }) + .await??; + let info = ImageInfo::new( squash_file.to_path_buf(), local.image.manifest, local.config, )?; - self.cache.store(&cache_digest, &info).await + let info = self.cache.store(&cache_digest, &info).await?; + progress.phase = OciProgressPhase::Complete; + progress.value = 0; + progress.total = 0; + self.progress.update(&progress); + Ok(info) } async fn process_layer_whiteout( @@ -203,10 +224,10 @@ impl ImageCompiler<'_> { count += 1; let path = entry.path()?; let Some(name) = path.file_name() else { - return Err(anyhow!("unable to get file name")); + continue; }; let Some(name) = name.to_str() else { - return Err(anyhow!("unable to get file name as string")); + continue; }; if name.starts_with(".wh.") { @@ -339,14 +360,15 @@ impl ImageCompiler<'_> { } fn squash( - &self, image_dir: &Path, squash_file: &Path, - progress: &mut OciProgress, - ) -> Result<()> { + mut progress: OciProgress, + progress_context: OciProgressContext, + ) -> Result { progress.phase = OciProgressPhase::Packing; - progress.progress = 0.0; - self.progress.update(progress); + progress.total = 2; + progress.value = 0; + progress_context.update(&progress); let mut writer = FilesystemWriter::default(); writer.set_compressor(FilesystemCompressor::new(Compressor::Gzip, None)?); let walk = WalkDir::new(image_dir).follow_links(false); @@ -405,8 +427,8 @@ impl ImageCompiler<'_> { } progress.phase = OciProgressPhase::Packing; - progress.progress = 50.0; - self.progress.update(progress); + progress.value = 1; + progress_context.update(&progress); let squash_file_path = squash_file .to_str() @@ -417,10 +439,10 @@ impl ImageCompiler<'_> { trace!("squash generate: {}", squash_file_path); writer.write(&mut bufwrite)?; std::fs::remove_dir_all(image_dir)?; - progress.phase = OciProgressPhase::Complete; - progress.progress = 100.0; - self.progress.update(progress); - Ok(()) + progress.phase = OciProgressPhase::Packing; + progress.value = 2; + progress_context.update(&progress); + Ok(progress) } } diff --git a/crates/oci/src/progress.rs b/crates/oci/src/progress.rs index db48152..ef5a554 100644 --- a/crates/oci/src/progress.rs +++ b/crates/oci/src/progress.rs @@ -7,7 +7,8 @@ pub struct OciProgress { pub id: String, pub phase: OciProgressPhase, pub layers: BTreeMap, - pub progress: f64, + pub value: u64, + pub total: u64, } impl OciProgress { @@ -17,7 +18,8 @@ impl OciProgress { OciProgressLayer { id: id.to_string(), phase: OciProgressLayerPhase::Waiting, - progress: 0.0, + value: 0, + total: 0, }, ); } @@ -25,36 +27,30 @@ impl OciProgress { pub fn downloading_layer(&mut self, id: &str, downloaded: usize, total: usize) { if let Some(entry) = self.layers.get_mut(id) { entry.phase = OciProgressLayerPhase::Downloading; - entry.progress = if total != 0 { - (downloaded as f64 / total as f64) * 100.0 - } else { - 100.0 - }; + entry.value = downloaded as u64; + entry.total = total as u64; } } pub fn downloaded_layer(&mut self, id: &str) { if let Some(entry) = self.layers.get_mut(id) { entry.phase = OciProgressLayerPhase::Downloaded; - entry.progress = 100.0; + entry.value = entry.total; } } pub fn extracting_layer(&mut self, id: &str, extracted: usize, total: usize) { if let Some(entry) = self.layers.get_mut(id) { entry.phase = OciProgressLayerPhase::Extracting; - entry.progress = if total != 0 { - (extracted as f64 / total as f64) * 100.0 - } else { - 100.0 - }; + entry.value = extracted as u64; + entry.total = total as u64; } } pub fn extracted_layer(&mut self, id: &str) { if let Some(entry) = self.layers.get_mut(id) { entry.phase = OciProgressLayerPhase::Extracted; - entry.progress = 100.0; + entry.value = entry.total; } } } @@ -73,7 +69,8 @@ pub enum OciProgressPhase { pub struct OciProgressLayer { pub id: String, pub phase: OciProgressLayerPhase, - pub progress: f64, + pub value: u64, + pub total: u64, } #[derive(Clone, Debug)] diff --git a/crates/runtime/src/launch.rs b/crates/runtime/src/launch.rs index 5b1f124..131931d 100644 --- a/crates/runtime/src/launch.rs +++ b/crates/runtime/src/launch.rs @@ -10,7 +10,7 @@ use krata::launchcfg::{ LaunchInfo, LaunchNetwork, LaunchNetworkIpv4, LaunchNetworkIpv6, LaunchNetworkResolver, }; use krataoci::progress::OciProgressContext; -use tokio::sync::{broadcast, Semaphore}; +use tokio::sync::Semaphore; use uuid::Uuid; use xenclient::{DomainChannel, DomainConfig, DomainDisk, DomainNetworkInterface}; use xenstore::XsdInterface; @@ -53,7 +53,12 @@ impl GuestLauncher { let uuid = request.uuid.unwrap_or_else(Uuid::new_v4); let xen_name = format!("krata-{uuid}"); let image_info = self - .compile(&uuid.to_string(), request.image, &context.image_cache) + .compile( + &uuid.to_string(), + request.image, + &context.image_cache, + &context.oci_progress_context, + ) .await?; let mut gateway_mac = MacAddr6::random(); @@ -246,11 +251,15 @@ impl GuestLauncher { } } - async fn compile(&self, id: &str, image: &str, image_cache: &ImageCache) -> Result { + async fn compile( + &self, + id: &str, + image: &str, + image_cache: &ImageCache, + progress: &OciProgressContext, + ) -> Result { let image = ImageName::parse(image)?; - let (sender, _) = broadcast::channel(1000); - let context = OciProgressContext::new(sender); - let compiler = ImageCompiler::new(image_cache, None, context)?; + let compiler = ImageCompiler::new(image_cache, None, progress.clone())?; compiler.compile(id, &image).await }