From ee024580afae691495b4b230365fca6593ee6788 Mon Sep 17 00:00:00 2001 From: Alex Zenla Date: Tue, 16 Apr 2024 12:34:02 +0000 Subject: [PATCH] feat: implement improved and detailed oci progress indication --- crates/ctl/src/cli/launch.rs | 5 +- crates/ctl/src/cli/pull.rs | 3 + crates/ctl/src/pull.rs | 324 ++++++++++++++++------ crates/daemon/src/control.rs | 24 +- crates/daemon/src/oci.rs | 88 ++++-- crates/krata/proto/krata/v1/control.proto | 79 ++++-- crates/oci/examples/squashify.rs | 28 +- crates/oci/src/assemble.rs | 30 +- crates/oci/src/fetch.rs | 56 +++- crates/oci/src/packer/backend.rs | 48 ++-- crates/oci/src/packer/service.rs | 22 +- crates/oci/src/progress.rs | 145 +++++++--- crates/oci/src/registry.rs | 24 +- crates/oci/src/vfs.rs | 9 +- 14 files changed, 595 insertions(+), 290 deletions(-) diff --git a/crates/ctl/src/cli/launch.rs b/crates/ctl/src/cli/launch.rs index f91823b..e65f968 100644 --- a/crates/ctl/src/cli/launch.rs +++ b/crates/ctl/src/cli/launch.rs @@ -30,8 +30,10 @@ pub enum LaunchImageFormat { #[derive(Parser)] #[command(about = "Launch a new guest")] pub struct LauchCommand { - #[arg(short = 'S', long, default_value = "squashfs", help = "Image format")] + #[arg(long, default_value = "squashfs", help = "Image format")] image_format: LaunchImageFormat, + #[arg(long, help = "Overwrite image cache on pull")] + pull_overwrite_cache: bool, #[arg(short, long, help = "Name of the guest")] name: Option, #[arg( @@ -85,6 +87,7 @@ impl LauchCommand { LaunchImageFormat::Squashfs => OciImageFormat::Squashfs.into(), LaunchImageFormat::Erofs => OciImageFormat::Erofs.into(), }, + overwrite_cache: self.pull_overwrite_cache, }) .await?; let reply = pull_interactive_progress(response.into_inner()).await?; diff --git a/crates/ctl/src/cli/pull.rs b/crates/ctl/src/cli/pull.rs index 9398c1d..3a63fe1 100644 --- a/crates/ctl/src/cli/pull.rs +++ b/crates/ctl/src/cli/pull.rs @@ -23,6 +23,8 @@ pub struct PullCommand { image: String, #[arg(short = 's', long, default_value = "squashfs", help = "Image format")] image_format: PullImageFormat, + #[arg(short = 'o', long, help = "Overwrite image cache")] + overwrite_cache: bool, } impl PullCommand { @@ -35,6 +37,7 @@ impl PullCommand { PullImageFormat::Erofs => OciImageFormat::Erofs.into(), PullImageFormat::Tar => OciImageFormat::Tar.into(), }, + overwrite_cache: self.overwrite_cache, }) .await?; let reply = pull_interactive_progress(response.into_inner()).await?; diff --git a/crates/ctl/src/pull.rs b/crates/ctl/src/pull.rs index e91df10..098d40e 100644 --- a/crates/ctl/src/pull.rs +++ b/crates/ctl/src/pull.rs @@ -1,20 +1,205 @@ -use std::collections::HashMap; +use std::{ + collections::{hash_map::Entry, HashMap}, + time::Duration, +}; use anyhow::{anyhow, Result}; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; -use krata::v1::control::{PullImageProgressLayerPhase, PullImageProgressPhase, PullImageReply}; +use krata::v1::control::{ + image_progress_indication::Indication, ImageProgressIndication, ImageProgressLayerPhase, + ImageProgressPhase, PullImageReply, +}; use tokio_stream::StreamExt; use tonic::Streaming; +const SPINNER_STRINGS: &[&str] = &[ + "[= ]", + "[ = ]", + "[ = ]", + "[ = ]", + "[ = ]", + "[ = ]", + "[ = ]", + "[ = ]", + "[ = ]", + "[ = ]", + "[ = ]", + "[ = ]", + "[ = ]", + "[ = ]", + "[ = ]", + "[ = ]", + "[ = ]", + "[ = ]", + "[ = ]", + "[ =]", + "[====================]", +]; + +fn progress_bar_for_indication(indication: &ImageProgressIndication) -> Option { + match indication.indication.as_ref() { + Some(Indication::Hidden(_)) | None => None, + Some(Indication::Bar(indic)) => { + let bar = ProgressBar::new(indic.total); + bar.enable_steady_tick(Duration::from_millis(100)); + Some(bar) + } + Some(Indication::Spinner(_)) => { + let bar = ProgressBar::new_spinner(); + bar.enable_steady_tick(Duration::from_millis(100)); + Some(bar) + } + Some(Indication::Completed(indic)) => { + let bar = ProgressBar::new_spinner(); + bar.enable_steady_tick(Duration::from_millis(100)); + if !indic.message.is_empty() { + bar.finish_with_message(indic.message.clone()); + } else { + bar.finish() + } + Some(bar) + } + } +} + +fn configure_for_indication( + bar: &mut ProgressBar, + multi_progress: &mut MultiProgress, + indication: &ImageProgressIndication, + top_phase: Option, + layer_phase: Option, + layer_id: Option<&str>, +) { + let prefix = if let Some(phase) = top_phase { + match phase { + ImageProgressPhase::Unknown => "unknown", + ImageProgressPhase::Started => "started", + ImageProgressPhase::Resolving => "resolving", + ImageProgressPhase::Resolved => "resolved", + ImageProgressPhase::ConfigDownload => "downloading", + ImageProgressPhase::LayerDownload => "downloading", + ImageProgressPhase::Assemble => "assembling", + ImageProgressPhase::Pack => "packing", + ImageProgressPhase::Complete => "complete", + } + } else if let Some(phase) = layer_phase { + match phase { + ImageProgressLayerPhase::Unknown => "unknown", + ImageProgressLayerPhase::Waiting => "waiting", + ImageProgressLayerPhase::Downloading => "downloading", + ImageProgressLayerPhase::Downloaded => "downloaded", + ImageProgressLayerPhase::Extracting => "extracting", + ImageProgressLayerPhase::Extracted => "extracted", + } + } else { + "" + }; + let prefix = prefix.to_string(); + + let id = if let Some(layer_id) = layer_id { + let hash = if let Some((_, hash)) = layer_id.split_once(':') { + hash + } else { + "unknown" + }; + let small_hash = if hash.len() > 10 { &hash[0..10] } else { hash }; + Some(format!("{:width$}", small_hash, width = 10)) + } else { + None + }; + + let prefix = if let Some(id) = id { + format!("{} {:width$}", id, prefix, width = 11) + } else { + format!(" {:width$}", prefix, width = 11) + }; + + match indication.indication.as_ref() { + Some(Indication::Hidden(_)) | None => { + multi_progress.remove(bar); + return; + } + Some(Indication::Bar(indic)) => { + if indic.is_bytes { + bar.set_style(ProgressStyle::with_template("{prefix} [{bar:20}] {msg} {binary_bytes}/{binary_total_bytes} ({binary_bytes_per_sec}) eta: {eta}").unwrap().progress_chars("=>-")); + } else { + bar.set_style( + ProgressStyle::with_template( + "{prefix} [{bar:20} {msg} {human_pos}/{human_len} ({per_sec}/sec)", + ) + .unwrap() + .progress_chars("=>-"), + ); + } + bar.set_message(indic.message.clone()); + bar.set_position(indic.current); + bar.set_length(indic.total); + } + Some(Indication::Spinner(indic)) => { + bar.set_style( + ProgressStyle::with_template("{prefix} {spinner} {msg}") + .unwrap() + .tick_strings(SPINNER_STRINGS), + ); + bar.set_message(indic.message.clone()); + } + Some(Indication::Completed(indic)) => { + if bar.is_finished() { + return; + } + bar.disable_steady_tick(); + bar.set_message(indic.message.clone()); + if indic.total != 0 { + bar.set_position(indic.total); + bar.set_length(indic.total); + } + if bar.style().get_tick_str(0).contains('=') { + bar.set_style( + ProgressStyle::with_template("{prefix} {spinner} {msg}") + .unwrap() + .tick_strings(SPINNER_STRINGS), + ); + bar.finish_with_message(indic.message.clone()); + } else if indic.is_bytes { + bar.set_style( + ProgressStyle::with_template("{prefix} [{bar:20}] {msg} {binary_total_bytes}") + .unwrap() + .progress_chars("=>-"), + ); + } else { + bar.set_style( + ProgressStyle::with_template("{prefix} [{bar:20}] {msg}") + .unwrap() + .progress_chars("=>-"), + ); + } + bar.tick(); + bar.enable_steady_tick(Duration::from_millis(100)); + } + }; + + bar.set_prefix(prefix); + bar.tick(); +} + pub async fn pull_interactive_progress( mut stream: Streaming, ) -> Result { - let mut multi_progress: Option<(MultiProgress, HashMap)> = None; + let mut multi_progress = MultiProgress::new(); + multi_progress.set_move_cursor(false); + let mut progresses = HashMap::new(); while let Some(reply) = stream.next().await { - let reply = reply?; + let reply = match reply { + Ok(reply) => reply, + Err(error) => { + multi_progress.clear()?; + return Err(error.into()); + } + }; if reply.progress.is_none() && !reply.digest.is_empty() { + multi_progress.clear()?; return Ok(reply); } @@ -22,97 +207,62 @@ pub async fn pull_interactive_progress( continue; }; - if multi_progress.is_none() { - multi_progress = Some((MultiProgress::new(), HashMap::new())); + for layer in &oci.layers { + let Some(ref indication) = layer.indication else { + continue; + }; + + let bar = match progresses.entry(layer.id.clone()) { + Entry::Occupied(entry) => Some(entry.into_mut()), + + Entry::Vacant(entry) => { + if let Some(bar) = progress_bar_for_indication(indication) { + multi_progress.add(bar.clone()); + Some(entry.insert(bar)) + } else { + None + } + } + }; + + if let Some(bar) = bar { + configure_for_indication( + bar, + &mut multi_progress, + indication, + None, + Some(layer.phase()), + Some(&layer.id), + ); + } } - let Some((multi_progress, progresses)) = multi_progress.as_mut() else { - continue; - }; + if let Some(ref indication) = oci.indication { + let bar = match progresses.entry("root".to_string()) { + Entry::Occupied(entry) => Some(entry.into_mut()), - match oci.phase() { - PullImageProgressPhase::Resolved - | PullImageProgressPhase::ConfigAcquire - | PullImageProgressPhase::LayerAcquire => { - if progresses.is_empty() && !oci.layers.is_empty() { - for layer in &oci.layers { - let bar = ProgressBar::new(layer.total); - bar.set_style(ProgressStyle::with_template("{msg} {bar}").unwrap()); - progresses.insert(layer.id.clone(), bar.clone()); - multi_progress.add(bar); - } - } - - for layer in oci.layers { - let Some(progress) = progresses.get_mut(&layer.id) else { - continue; - }; - - let phase = match layer.phase() { - PullImageProgressLayerPhase::Waiting => "waiting", - PullImageProgressLayerPhase::Downloading => "downloading", - PullImageProgressLayerPhase::Downloaded => "downloaded", - PullImageProgressLayerPhase::Extracting => "extracting", - PullImageProgressLayerPhase::Extracted => "extracted", - _ => "unknown", - }; - - let simple = if let Some((_, hash)) = layer.id.split_once(':') { - hash + Entry::Vacant(entry) => { + if let Some(bar) = progress_bar_for_indication(indication) { + multi_progress.add(bar.clone()); + Some(entry.insert(bar)) } else { - "unknown" - }; - let simple = if simple.len() > 10 { - &simple[0..10] - } else { - simple - }; - let message = format!( - "{:width$} {:phwidth$}", - simple, - phase, - width = 10, - phwidth = 11 - ); - - if message != progress.message() { - progress.set_message(message); + None } - - progress.update(|state| { - state.set_len(layer.total); - state.set_pos(layer.value); - }); } + }; + + if let Some(bar) = bar { + configure_for_indication( + bar, + &mut multi_progress, + indication, + Some(oci.phase()), + None, + None, + ); } - - PullImageProgressPhase::Packing => { - for (key, bar) in &mut *progresses { - if key == "packing" { - continue; - } - bar.finish_and_clear(); - multi_progress.remove(bar); - } - progresses.retain(|k, _| k == "packing"); - if progresses.is_empty() { - let progress = ProgressBar::new(100); - progress.set_message("packing "); - progress.set_style(ProgressStyle::with_template("{msg} {bar}").unwrap()); - progresses.insert("packing".to_string(), progress); - } - let Some(progress) = progresses.get("packing") else { - continue; - }; - - progress.update(|state| { - state.set_len(oci.total); - state.set_pos(oci.value); - }); - } - - _ => {} } } + multi_progress.clear()?; Err(anyhow!("never received final reply for image pull")) } diff --git a/crates/daemon/src/control.rs b/crates/daemon/src/control.rs index 44ae3b9..a6bb54d 100644 --- a/crates/daemon/src/control.rs +++ b/crates/daemon/src/control.rs @@ -372,7 +372,7 @@ impl ControlService for DaemonControlService { let output = try_stream! { let mut task = tokio::task::spawn(async move { - our_packer.request(name, format, context).await + our_packer.request(name, format, request.overwrite_cache, context).await }); let abort_handle = task.abort_handle(); let _task_cancel_guard = scopeguard::guard(abort_handle, |handle| { @@ -381,26 +381,14 @@ impl ControlService for DaemonControlService { loop { let what = select! { - x = receiver.recv() => PullImageSelect::Progress(x.ok()), + x = receiver.changed() => match x { + Ok(_) => PullImageSelect::Progress(Some(receiver.borrow_and_update().clone())), + Err(_) => PullImageSelect::Progress(None), + }, x = &mut task => PullImageSelect::Completed(x), }; match what { - PullImageSelect::Progress(Some(mut progress)) => { - let mut drain = 0; - loop { - if drain >= 10 { - break; - } - - if let Ok(latest) = receiver.try_recv() { - progress = latest; - } else { - break; - } - - drain += 1; - } - + PullImageSelect::Progress(Some(progress)) => { let reply = PullImageReply { progress: Some(convert_oci_progress(progress)), digest: String::new(), diff --git a/crates/daemon/src/oci.rs b/crates/daemon/src/oci.rs index 45945a2..0a601ef 100644 --- a/crates/daemon/src/oci.rs +++ b/crates/daemon/src/oci.rs @@ -1,33 +1,72 @@ use krata::v1::control::{ - PullImageProgress, PullImageProgressLayer, PullImageProgressLayerPhase, PullImageProgressPhase, + image_progress_indication::Indication, ImageProgress, ImageProgressIndication, + ImageProgressIndicationBar, ImageProgressIndicationCompleted, ImageProgressIndicationHidden, + ImageProgressIndicationSpinner, ImageProgressLayer, ImageProgressLayerPhase, + ImageProgressPhase, +}; +use krataoci::progress::{ + OciProgress, OciProgressIndication, OciProgressLayer, OciProgressLayerPhase, OciProgressPhase, }; -use krataoci::progress::{OciProgress, OciProgressLayer, OciProgressLayerPhase, OciProgressPhase}; -fn convert_oci_layer_progress(layer: OciProgressLayer) -> PullImageProgressLayer { - PullImageProgressLayer { - id: layer.id, - phase: match layer.phase { - OciProgressLayerPhase::Waiting => PullImageProgressLayerPhase::Waiting, - OciProgressLayerPhase::Downloading => PullImageProgressLayerPhase::Downloading, - OciProgressLayerPhase::Downloaded => PullImageProgressLayerPhase::Downloaded, - OciProgressLayerPhase::Extracting => PullImageProgressLayerPhase::Extracting, - OciProgressLayerPhase::Extracted => PullImageProgressLayerPhase::Extracted, - } - .into(), - value: layer.value, - total: layer.total, +fn convert_oci_progress_indication(indication: OciProgressIndication) -> ImageProgressIndication { + ImageProgressIndication { + indication: Some(match indication { + OciProgressIndication::Hidden => Indication::Hidden(ImageProgressIndicationHidden {}), + OciProgressIndication::ProgressBar { + message, + current, + total, + bytes, + } => Indication::Bar(ImageProgressIndicationBar { + message: message.unwrap_or_default(), + current, + total, + is_bytes: bytes, + }), + OciProgressIndication::Spinner { message } => { + Indication::Spinner(ImageProgressIndicationSpinner { + message: message.unwrap_or_default(), + }) + } + OciProgressIndication::Completed { + message, + total, + bytes, + } => Indication::Completed(ImageProgressIndicationCompleted { + message: message.unwrap_or_default(), + total: total.unwrap_or(0), + is_bytes: bytes, + }), + }), } } -pub fn convert_oci_progress(oci: OciProgress) -> PullImageProgress { - PullImageProgress { +fn convert_oci_layer_progress(layer: OciProgressLayer) -> ImageProgressLayer { + ImageProgressLayer { + id: layer.id, + phase: match layer.phase { + OciProgressLayerPhase::Waiting => ImageProgressLayerPhase::Waiting, + OciProgressLayerPhase::Downloading => ImageProgressLayerPhase::Downloading, + OciProgressLayerPhase::Downloaded => ImageProgressLayerPhase::Downloaded, + OciProgressLayerPhase::Extracting => ImageProgressLayerPhase::Extracting, + OciProgressLayerPhase::Extracted => ImageProgressLayerPhase::Extracted, + } + .into(), + indication: Some(convert_oci_progress_indication(layer.indication)), + } +} + +pub fn convert_oci_progress(oci: OciProgress) -> ImageProgress { + ImageProgress { phase: match oci.phase { - OciProgressPhase::Resolving => PullImageProgressPhase::Resolving, - OciProgressPhase::Resolved => PullImageProgressPhase::Resolved, - OciProgressPhase::ConfigAcquire => PullImageProgressPhase::ConfigAcquire, - OciProgressPhase::LayerAcquire => PullImageProgressPhase::LayerAcquire, - OciProgressPhase::Packing => PullImageProgressPhase::Packing, - OciProgressPhase::Complete => PullImageProgressPhase::Complete, + OciProgressPhase::Started => ImageProgressPhase::Started, + OciProgressPhase::Resolving => ImageProgressPhase::Resolving, + OciProgressPhase::Resolved => ImageProgressPhase::Resolved, + OciProgressPhase::ConfigDownload => ImageProgressPhase::ConfigDownload, + OciProgressPhase::LayerDownload => ImageProgressPhase::LayerDownload, + OciProgressPhase::Assemble => ImageProgressPhase::Assemble, + OciProgressPhase::Pack => ImageProgressPhase::Pack, + OciProgressPhase::Complete => ImageProgressPhase::Complete, } .into(), layers: oci @@ -35,7 +74,6 @@ pub fn convert_oci_progress(oci: OciProgress) -> PullImageProgress { .into_values() .map(convert_oci_layer_progress) .collect::>(), - value: oci.value, - total: oci.total, + indication: Some(convert_oci_progress_indication(oci.indication)), } } diff --git a/crates/krata/proto/krata/v1/control.proto b/crates/krata/proto/krata/v1/control.proto index 692eba8..a28f660 100644 --- a/crates/krata/proto/krata/v1/control.proto +++ b/crates/krata/proto/krata/v1/control.proto @@ -89,46 +89,75 @@ message SnoopIdmReply { krata.bus.idm.IdmPacket packet = 3; } -enum PullImageProgressLayerPhase { - PULL_IMAGE_PROGRESS_LAYER_PHASE_UNKNOWN = 0; - PULL_IMAGE_PROGRESS_LAYER_PHASE_WAITING = 1; - PULL_IMAGE_PROGRESS_LAYER_PHASE_DOWNLOADING = 2; - PULL_IMAGE_PROGRESS_LAYER_PHASE_DOWNLOADED = 3; - PULL_IMAGE_PROGRESS_LAYER_PHASE_EXTRACTING = 4; - PULL_IMAGE_PROGRESS_LAYER_PHASE_EXTRACTED = 5; +message ImageProgress { + ImageProgressPhase phase = 1; + repeated ImageProgressLayer layers = 2; + ImageProgressIndication indication = 3; } -message PullImageProgressLayer { +enum ImageProgressPhase { + IMAGE_PROGRESS_PHASE_UNKNOWN = 0; + IMAGE_PROGRESS_PHASE_STARTED = 1; + IMAGE_PROGRESS_PHASE_RESOLVING = 2; + IMAGE_PROGRESS_PHASE_RESOLVED = 3; + IMAGE_PROGRESS_PHASE_CONFIG_DOWNLOAD = 4; + IMAGE_PROGRESS_PHASE_LAYER_DOWNLOAD = 5; + IMAGE_PROGRESS_PHASE_ASSEMBLE = 6; + IMAGE_PROGRESS_PHASE_PACK = 7; + IMAGE_PROGRESS_PHASE_COMPLETE = 8; +} + +message ImageProgressLayer { string id = 1; - PullImageProgressLayerPhase phase = 2; - uint64 value = 3; - uint64 total = 4; + ImageProgressLayerPhase phase = 2; + ImageProgressIndication indication = 3; } -enum PullImageProgressPhase { - PULL_IMAGE_PROGRESS_PHASE_UNKNOWN = 0; - PULL_IMAGE_PROGRESS_PHASE_RESOLVING = 1; - PULL_IMAGE_PROGRESS_PHASE_RESOLVED = 2; - PULL_IMAGE_PROGRESS_PHASE_CONFIG_ACQUIRE = 3; - PULL_IMAGE_PROGRESS_PHASE_LAYER_ACQUIRE = 4; - PULL_IMAGE_PROGRESS_PHASE_PACKING = 5; - PULL_IMAGE_PROGRESS_PHASE_COMPLETE = 6; +enum ImageProgressLayerPhase { + IMAGE_PROGRESS_LAYER_PHASE_UNKNOWN = 0; + IMAGE_PROGRESS_LAYER_PHASE_WAITING = 1; + IMAGE_PROGRESS_LAYER_PHASE_DOWNLOADING = 2; + IMAGE_PROGRESS_LAYER_PHASE_DOWNLOADED = 3; + IMAGE_PROGRESS_LAYER_PHASE_EXTRACTING = 4; + IMAGE_PROGRESS_LAYER_PHASE_EXTRACTED = 5; } -message PullImageProgress { - PullImageProgressPhase phase = 1; - repeated PullImageProgressLayer layers = 2; - uint64 value = 3; - uint64 total = 4; +message ImageProgressIndication { + oneof indication { + ImageProgressIndicationBar bar = 1; + ImageProgressIndicationSpinner spinner = 2; + ImageProgressIndicationHidden hidden = 3; + ImageProgressIndicationCompleted completed = 4; + } +} + +message ImageProgressIndicationBar { + string message = 1; + uint64 current = 2; + uint64 total = 3; + bool is_bytes = 4; +} + +message ImageProgressIndicationSpinner { + string message = 1; +} + +message ImageProgressIndicationHidden {} + +message ImageProgressIndicationCompleted { + string message = 1; + uint64 total = 2; + bool is_bytes = 3; } message PullImageRequest { string image = 1; krata.v1.common.OciImageFormat format = 2; + bool overwrite_cache = 3; } message PullImageReply { - PullImageProgress progress = 1; + ImageProgress progress = 1; string digest = 2; krata.v1.common.OciImageFormat format = 3; } diff --git a/crates/oci/examples/squashify.rs b/crates/oci/examples/squashify.rs index 777c951..8f251e6 100644 --- a/crates/oci/examples/squashify.rs +++ b/crates/oci/examples/squashify.rs @@ -25,37 +25,19 @@ async fn main() -> Result<()> { let (context, mut receiver) = OciProgressContext::create(); tokio::task::spawn(async move { loop { - let Ok(mut progress) = receiver.recv().await else { - return; - }; - - let mut drain = 0; - loop { - if drain >= 10 { - break; - } - - if let Ok(latest) = receiver.try_recv() { - progress = latest; - } else { - break; - } - - drain += 1; + if (receiver.changed().await).is_err() { + break; } - + let progress = receiver.borrow_and_update(); println!("phase {:?}", progress.phase); for (id, layer) in &progress.layers { - println!( - "{} {:?} {} of {}", - id, layer.phase, layer.value, layer.total - ) + println!("{} {:?} {:?}", id, layer.phase, layer.indication,) } } }); let service = OciPackerService::new(seed, &cache_dir, OciPlatform::current())?; let packed = service - .request(image.clone(), OciPackedFormat::Squashfs, context) + .request(image.clone(), OciPackedFormat::Squashfs, false, context) .await?; println!( "generated squashfs of {} to {}", diff --git a/crates/oci/src/assemble.rs b/crates/oci/src/assemble.rs index 2931f3c..2365c61 100644 --- a/crates/oci/src/assemble.rs +++ b/crates/oci/src/assemble.rs @@ -1,4 +1,4 @@ -use crate::fetch::{OciImageFetcher, OciImageLayer, OciResolvedImage}; +use crate::fetch::{OciImageFetcher, OciImageLayer, OciImageLayerReader, OciResolvedImage}; use crate::progress::OciBoundProgress; use crate::schema::OciSchema; use crate::vfs::{VfsNode, VfsTree}; @@ -11,7 +11,6 @@ use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use tokio::fs; -use tokio::io::AsyncRead; use tokio_stream::StreamExt; use tokio_tar::{Archive, Entry}; use uuid::Uuid; @@ -115,12 +114,14 @@ impl OciImageAssembler { ); self.progress .update(|progress| { - progress.extracting_layer(&layer.digest, 0, 1); + progress.start_extracting_layer(&layer.digest); }) .await; debug!("process layer digest={}", &layer.digest,); let mut archive = layer.archive().await?; let mut entries = archive.entries()?; + let mut count = 0u64; + let mut size = 0u64; while let Some(entry) = entries.next().await { let mut entry = entry?; let path = entry.path()?; @@ -134,14 +135,21 @@ impl OciImageAssembler { self.process_whiteout_entry(&mut vfs, &entry, name, layer) .await?; } else { - vfs.insert_tar_entry(&entry)?; - self.process_write_entry(&mut vfs, &mut entry, layer) + let reference = vfs.insert_tar_entry(&entry)?; + self.progress + .update(|progress| { + progress.extracting_layer(&layer.digest, &reference.name); + }) + .await; + size += self + .process_write_entry(&mut vfs, &mut entry, layer) .await?; + count += 1; } } self.progress .update(|progress| { - progress.extracted_layer(&layer.digest); + progress.extracted_layer(&layer.digest, count, size); }) .await; } @@ -169,7 +177,7 @@ impl OciImageAssembler { async fn process_whiteout_entry( &self, vfs: &mut VfsTree, - entry: &Entry>>>, + entry: &Entry>>>, name: &str, layer: &OciImageLayer, ) -> Result<()> { @@ -210,11 +218,11 @@ impl OciImageAssembler { async fn process_write_entry( &self, vfs: &mut VfsTree, - entry: &mut Entry>>>, + entry: &mut Entry>>>, layer: &OciImageLayer, - ) -> Result<()> { + ) -> Result { if !entry.header().entry_type().is_file() { - return Ok(()); + return Ok(0); } trace!( "unpack entry layer={} path={:?} type={:?}", @@ -230,7 +238,7 @@ impl OciImageAssembler { .await? .ok_or(anyhow!("unpack did not return a path"))?; vfs.set_disk_path(&entry.path()?, &path)?; - Ok(()) + Ok(entry.header().size()?) } } diff --git a/crates/oci/src/fetch.rs b/crates/oci/src/fetch.rs index 05817d7..db502d0 100644 --- a/crates/oci/src/fetch.rs +++ b/crates/oci/src/fetch.rs @@ -10,6 +10,8 @@ use super::{ use std::{ fmt::Debug, + io::SeekFrom, + os::unix::fs::MetadataExt, path::{Path, PathBuf}, pin::Pin, }; @@ -22,8 +24,8 @@ use oci_spec::image::{ }; use serde::de::DeserializeOwned; use tokio::{ - fs::File, - io::{AsyncRead, AsyncReadExt, BufReader, BufWriter}, + fs::{self, File}, + io::{AsyncRead, AsyncReadExt, AsyncSeekExt, BufReader, BufWriter}, }; use tokio_stream::StreamExt; use tokio_tar::Archive; @@ -43,16 +45,43 @@ pub enum OciImageLayerCompression { #[derive(Clone, Debug)] pub struct OciImageLayer { + pub metadata: Descriptor, pub path: PathBuf, pub digest: String, pub compression: OciImageLayerCompression, } +#[async_trait::async_trait] +pub trait OciImageLayerReader: AsyncRead + Sync { + async fn position(&mut self) -> Result; +} + +#[async_trait::async_trait] +impl OciImageLayerReader for BufReader { + async fn position(&mut self) -> Result { + Ok(self.seek(SeekFrom::Current(0)).await?) + } +} + +#[async_trait::async_trait] +impl OciImageLayerReader for GzipDecoder> { + async fn position(&mut self) -> Result { + self.get_mut().position().await + } +} + +#[async_trait::async_trait] +impl OciImageLayerReader for ZstdDecoder> { + async fn position(&mut self) -> Result { + self.get_mut().position().await + } +} + impl OciImageLayer { - pub async fn decompress(&self) -> Result>> { + pub async fn decompress(&self) -> Result>> { let file = File::open(&self.path).await?; let reader = BufReader::new(file); - let reader: Pin> = match self.compression { + let reader: Pin> = match self.compression { OciImageLayerCompression::None => Box::pin(reader), OciImageLayerCompression::Gzip => Box::pin(GzipDecoder::new(reader)), OciImageLayerCompression::Zstd => Box::pin(ZstdDecoder::new(reader)), @@ -60,7 +89,7 @@ impl OciImageLayer { Ok(reader) } - pub async fn archive(&self) -> Result>>> { + pub async fn archive(&self) -> Result>>> { let decompress = self.decompress().await?; Ok(Archive::new(decompress)) } @@ -225,7 +254,7 @@ impl OciImageFetcher { let config: OciSchema; self.progress .update(|progress| { - progress.phase = OciProgressPhase::ConfigAcquire; + progress.phase = OciProgressPhase::ConfigDownload; }) .await; let mut client = OciRegistryClient::new(image.name.registry_url()?, self.platform.clone())?; @@ -245,10 +274,10 @@ impl OciImageFetcher { } self.progress .update(|progress| { - progress.phase = OciProgressPhase::LayerAcquire; + progress.phase = OciProgressPhase::LayerDownload; for layer in image.manifest.item().layers() { - progress.add_layer(layer.digest(), layer.size() as usize); + progress.add_layer(layer.digest()); } }) .await; @@ -256,7 +285,7 @@ impl OciImageFetcher { for layer in image.manifest.item().layers() { self.progress .update(|progress| { - progress.downloading_layer(layer.digest(), 0, layer.size() as usize); + progress.downloading_layer(layer.digest(), 0, layer.size() as u64); }) .await; layers.push( @@ -265,7 +294,7 @@ impl OciImageFetcher { ); self.progress .update(|progress| { - progress.downloaded_layer(layer.digest()); + progress.downloaded_layer(layer.digest(), layer.size() as u64); }) .await; } @@ -304,6 +333,12 @@ impl OciImageFetcher { } } + let metadata = fs::metadata(&layer_path).await?; + + if layer.size() as u64 != metadata.size() { + return Err(anyhow!("layer size differs from size in manifest",)); + } + let mut media_type = layer.media_type().clone(); // docker layer compatibility @@ -318,6 +353,7 @@ impl OciImageFetcher { other => return Err(anyhow!("found layer with unknown media type: {}", other)), }; Ok(OciImageLayer { + metadata: layer.clone(), path: layer_path, digest: layer.digest().clone(), compression, diff --git a/crates/oci/src/packer/backend.rs b/crates/oci/src/packer/backend.rs index 2218bc8..f82982f 100644 --- a/crates/oci/src/packer/backend.rs +++ b/crates/oci/src/packer/backend.rs @@ -1,14 +1,12 @@ -use std::{path::Path, process::Stdio, sync::Arc}; +use std::{os::unix::fs::MetadataExt, path::Path, process::Stdio, sync::Arc}; use super::OciPackedFormat; -use crate::{ - progress::{OciBoundProgress, OciProgressPhase}, - vfs::VfsTree, -}; +use crate::{progress::OciBoundProgress, vfs::VfsTree}; use anyhow::{anyhow, Result}; use log::warn; use tokio::{ - fs::File, + fs::{self, File}, + io::BufWriter, pin, process::{Child, Command}, select, @@ -55,9 +53,7 @@ impl OciPackerBackend for OciPackerMkSquashfs { async fn pack(&self, progress: OciBoundProgress, vfs: Arc, file: &Path) -> Result<()> { progress .update(|progress| { - progress.phase = OciProgressPhase::Packing; - progress.total = 1; - progress.value = 0; + progress.start_packing(); }) .await; @@ -120,12 +116,9 @@ impl OciPackerBackend for OciPackerMkSquashfs { status.code().unwrap() )) } else { + let metadata = fs::metadata(&file).await?; progress - .update(|progress| { - progress.phase = OciProgressPhase::Packing; - progress.total = 1; - progress.value = 1; - }) + .update(|progress| progress.complete(metadata.size())) .await; Ok(()) } @@ -136,12 +129,10 @@ pub struct OciPackerMkfsErofs {} #[async_trait::async_trait] impl OciPackerBackend for OciPackerMkfsErofs { - async fn pack(&self, progress: OciBoundProgress, vfs: Arc, path: &Path) -> Result<()> { + async fn pack(&self, progress: OciBoundProgress, vfs: Arc, file: &Path) -> Result<()> { progress .update(|progress| { - progress.phase = OciProgressPhase::Packing; - progress.total = 1; - progress.value = 0; + progress.start_packing(); }) .await; @@ -149,7 +140,7 @@ impl OciPackerBackend for OciPackerMkfsErofs { .arg("-L") .arg("root") .arg("--tar=-") - .arg(path) + .arg(file) .stdin(Stdio::piped()) .stderr(Stdio::null()) .stdout(Stdio::null()) @@ -200,11 +191,10 @@ impl OciPackerBackend for OciPackerMkfsErofs { status.code().unwrap() )) } else { + let metadata = fs::metadata(&file).await?; progress .update(|progress| { - progress.phase = OciProgressPhase::Packing; - progress.total = 1; - progress.value = 1; + progress.complete(metadata.size()); }) .await; Ok(()) @@ -219,20 +209,18 @@ impl OciPackerBackend for OciPackerTar { async fn pack(&self, progress: OciBoundProgress, vfs: Arc, file: &Path) -> Result<()> { progress .update(|progress| { - progress.phase = OciProgressPhase::Packing; - progress.total = 1; - progress.value = 0; + progress.start_packing(); }) .await; - let file = File::create(file).await?; - vfs.write_to_tar(file).await?; + let output = File::create(file).await?; + let output = BufWriter::new(output); + vfs.write_to_tar(output).await?; + let metadata = fs::metadata(file).await?; progress .update(|progress| { - progress.phase = OciProgressPhase::Packing; - progress.total = 1; - progress.value = 1; + progress.complete(metadata.size()); }) .await; Ok(()) diff --git a/crates/oci/src/packer/service.rs b/crates/oci/src/packer/service.rs index 837feb0..9f1840d 100644 --- a/crates/oci/src/packer/service.rs +++ b/crates/oci/src/packer/service.rs @@ -63,6 +63,7 @@ impl OciPackerService { &self, name: ImageName, format: OciPackedFormat, + overwrite: bool, progress_context: OciProgressContext, ) -> Result { let progress = OciProgress::new(); @@ -86,7 +87,14 @@ impl OciPackerService { Entry::Vacant(entry) => { let task = self .clone() - .launch(key.clone(), format, resolved, fetcher, progress.clone()) + .launch( + key.clone(), + format, + overwrite, + resolved, + fetcher, + progress.clone(), + ) .await; let (watch, receiver) = watch::channel(None); @@ -126,6 +134,7 @@ impl OciPackerService { self, key: OciPackerTaskKey, format: OciPackedFormat, + overwrite: bool, resolved: OciResolvedImage, fetcher: OciImageFetcher, progress: OciBoundProgress, @@ -137,7 +146,7 @@ impl OciPackerService { service.ensure_task_gone(key); }); if let Err(error) = self - .task(key.clone(), format, resolved, fetcher, progress) + .task(key.clone(), format, overwrite, resolved, fetcher, progress) .await { self.finish(&key, Err(error)).await; @@ -149,13 +158,16 @@ impl OciPackerService { &self, key: OciPackerTaskKey, format: OciPackedFormat, + overwrite: bool, resolved: OciResolvedImage, fetcher: OciImageFetcher, progress: OciBoundProgress, ) -> Result<()> { - if let Some(cached) = self.cache.recall(&resolved.digest, format).await? { - self.finish(&key, Ok(cached)).await; - return Ok(()); + if !overwrite { + if let Some(cached) = self.cache.recall(&resolved.digest, format).await? { + self.finish(&key, Ok(cached)).await; + return Ok(()); + } } let assembler = OciImageAssembler::new(fetcher, resolved, progress.clone(), None, None).await?; diff --git a/crates/oci/src/progress.rs b/crates/oci/src/progress.rs index e5e22e3..68b675a 100644 --- a/crates/oci/src/progress.rs +++ b/crates/oci/src/progress.rs @@ -1,18 +1,16 @@ use indexmap::IndexMap; use std::sync::Arc; use tokio::{ - sync::{broadcast, Mutex}, + sync::{watch, Mutex}, task::JoinHandle, }; -const OCI_PROGRESS_QUEUE_LEN: usize = 100; - #[derive(Clone, Debug)] pub struct OciProgress { pub phase: OciProgressPhase, + pub digest: Option, pub layers: IndexMap, - pub value: u64, - pub total: u64, + pub indication: OciProgressIndication, } impl Default for OciProgress { @@ -24,72 +22,146 @@ impl Default for OciProgress { impl OciProgress { pub fn new() -> Self { OciProgress { - phase: OciProgressPhase::Resolving, + phase: OciProgressPhase::Started, + digest: None, layers: IndexMap::new(), - value: 0, - total: 1, + indication: OciProgressIndication::Hidden, } } - pub fn add_layer(&mut self, id: &str, size: usize) { + pub fn start_resolving(&mut self) { + self.phase = OciProgressPhase::Resolving; + self.indication = OciProgressIndication::Spinner { message: None }; + } + + pub fn resolved(&mut self, digest: &str) { + self.digest = Some(digest.to_string()); + self.indication = OciProgressIndication::Hidden; + } + + pub fn add_layer(&mut self, id: &str) { self.layers.insert( id.to_string(), OciProgressLayer { id: id.to_string(), phase: OciProgressLayerPhase::Waiting, - value: 0, - total: size as u64, + indication: OciProgressIndication::Spinner { message: None }, }, ); } - pub fn downloading_layer(&mut self, id: &str, downloaded: usize, total: usize) { + pub fn downloading_layer(&mut self, id: &str, downloaded: u64, total: u64) { if let Some(entry) = self.layers.get_mut(id) { entry.phase = OciProgressLayerPhase::Downloading; - entry.value = downloaded as u64; - entry.total = total as u64; + entry.indication = OciProgressIndication::ProgressBar { + message: None, + current: downloaded, + total, + bytes: true, + }; } } - pub fn downloaded_layer(&mut self, id: &str) { + pub fn downloaded_layer(&mut self, id: &str, total: u64) { if let Some(entry) = self.layers.get_mut(id) { entry.phase = OciProgressLayerPhase::Downloaded; - entry.value = entry.total; + entry.indication = OciProgressIndication::Completed { + message: None, + total: Some(total), + bytes: true, + }; } } - pub fn extracting_layer(&mut self, id: &str, extracted: usize, total: usize) { + pub fn start_assemble(&mut self) { + self.phase = OciProgressPhase::Assemble; + self.indication = OciProgressIndication::Hidden; + } + + pub fn start_extracting_layer(&mut self, id: &str) { if let Some(entry) = self.layers.get_mut(id) { entry.phase = OciProgressLayerPhase::Extracting; - entry.value = extracted as u64; - entry.total = total as u64; + entry.indication = OciProgressIndication::Spinner { message: None }; } } - pub fn extracted_layer(&mut self, id: &str) { + pub fn extracting_layer(&mut self, id: &str, file: &str) { + if let Some(entry) = self.layers.get_mut(id) { + entry.phase = OciProgressLayerPhase::Extracting; + entry.indication = OciProgressIndication::Spinner { + message: Some(file.to_string()), + }; + } + } + + pub fn extracted_layer(&mut self, id: &str, count: u64, total_size: u64) { if let Some(entry) = self.layers.get_mut(id) { entry.phase = OciProgressLayerPhase::Extracted; - entry.value = entry.total; + entry.indication = OciProgressIndication::Completed { + message: Some(format!("{} files", count)), + total: Some(total_size), + bytes: true, + }; + } + } + + pub fn start_packing(&mut self) { + self.phase = OciProgressPhase::Pack; + for layer in self.layers.values_mut() { + layer.indication = OciProgressIndication::Hidden; + } + self.indication = OciProgressIndication::Spinner { message: None }; + } + + pub fn complete(&mut self, size: u64) { + self.phase = OciProgressPhase::Complete; + self.indication = OciProgressIndication::Completed { + message: None, + total: Some(size), + bytes: true, } } } #[derive(Clone, Debug)] pub enum OciProgressPhase { + Started, Resolving, Resolved, - ConfigAcquire, - LayerAcquire, - Packing, + ConfigDownload, + LayerDownload, + Assemble, + Pack, Complete, } +#[derive(Clone, Debug)] +pub enum OciProgressIndication { + Hidden, + + ProgressBar { + message: Option, + current: u64, + total: u64, + bytes: bool, + }, + + Spinner { + message: Option, + }, + + Completed { + message: Option, + total: Option, + bytes: bool, + }, +} + #[derive(Clone, Debug)] pub struct OciProgressLayer { pub id: String, pub phase: OciProgressLayerPhase, - pub value: u64, - pub total: u64, + pub indication: OciProgressIndication, } #[derive(Clone, Debug)] @@ -103,16 +175,16 @@ pub enum OciProgressLayerPhase { #[derive(Clone)] pub struct OciProgressContext { - sender: broadcast::Sender, + sender: watch::Sender, } impl OciProgressContext { - pub fn create() -> (OciProgressContext, broadcast::Receiver) { - let (sender, receiver) = broadcast::channel(OCI_PROGRESS_QUEUE_LEN); + pub fn create() -> (OciProgressContext, watch::Receiver) { + let (sender, receiver) = watch::channel(OciProgress::new()); (OciProgressContext::new(sender), receiver) } - pub fn new(sender: broadcast::Sender) -> OciProgressContext { + pub fn new(sender: watch::Sender) -> OciProgressContext { OciProgressContext { sender } } @@ -120,7 +192,7 @@ impl OciProgressContext { let _ = self.sender.send(progress.clone()); } - pub fn subscribe(&self) -> broadcast::Receiver { + pub fn subscribe(&self) -> watch::Receiver { self.sender.subscribe() } } @@ -156,13 +228,10 @@ impl OciBoundProgress { context.update(&progress); let mut receiver = self.context.subscribe(); tokio::task::spawn(async move { - while let Ok(progress) = receiver.recv().await { - match context.sender.send(progress) { - Ok(_) => {} - Err(_) => { - break; - } - } + while (receiver.changed().await).is_ok() { + context + .sender + .send_replace(receiver.borrow_and_update().clone()); } }) } diff --git a/crates/oci/src/registry.rs b/crates/oci/src/registry.rs index 86597f6..e356000 100644 --- a/crates/oci/src/registry.rs +++ b/crates/oci/src/registry.rs @@ -149,24 +149,20 @@ impl OciRegistryClient { ))?; let mut response = self.call(self.agent.get(url.as_str())).await?; let mut size: u64 = 0; - let mut last_progress_size: u64 = 0; while let Some(chunk) = response.chunk().await? { dest.write_all(&chunk).await?; size += chunk.len() as u64; - if (size - last_progress_size) > (5 * 1024 * 1024) { - last_progress_size = size; - if let Some(ref progress) = progress { - progress - .update(|progress| { - progress.downloading_layer( - descriptor.digest(), - size as usize, - descriptor.size() as usize, - ); - }) - .await; - } + if let Some(ref progress) = progress { + progress + .update(|progress| { + progress.downloading_layer( + descriptor.digest(), + size, + descriptor.size() as u64, + ); + }) + .await; } } Ok(size) diff --git a/crates/oci/src/vfs.rs b/crates/oci/src/vfs.rs index 51c3e0c..e9964ef 100644 --- a/crates/oci/src/vfs.rs +++ b/crates/oci/src/vfs.rs @@ -194,7 +194,7 @@ impl VfsTree { } } - pub fn insert_tar_entry(&mut self, entry: &Entry) -> Result<()> { + pub fn insert_tar_entry(&mut self, entry: &Entry) -> Result<&VfsNode> { let mut meta = VfsNode::from(entry)?; let path = entry.path()?.to_path_buf(); let parent = if let Some(parent) = path.parent() { @@ -218,8 +218,11 @@ impl VfsTree { meta.children = old.children; } } - parent.children.push(meta); - Ok(()) + parent.children.push(meta.clone()); + let Some(reference) = parent.children.iter().find(|child| child.name == meta.name) else { + return Err(anyhow!("unable to find inserted child in vfs")); + }; + Ok(reference) } pub fn set_disk_path(&mut self, path: &Path, disk_path: &Path) -> Result<()> {