From 8135307283d35f7f07396edf47c58231f75344da Mon Sep 17 00:00:00 2001 From: Alex Zenla Date: Tue, 16 Apr 2024 09:29:54 -0700 Subject: [PATCH] feat: oci concurrency improvements (#95) * feat: implement improved and detailed oci progress indication * feat: implement on-disk indexes of images * oci: utilize rw-lock for increased cache performance --- crates/ctl/src/cli/launch.rs | 5 +- crates/ctl/src/cli/pull.rs | 3 + crates/ctl/src/pull.rs | 324 ++++++++++++++++------ crates/daemon/src/control.rs | 24 +- crates/daemon/src/lib.rs | 2 +- crates/daemon/src/oci.rs | 88 ++++-- crates/krata/proto/krata/v1/control.proto | 79 ++++-- crates/oci/examples/squashify.rs | 30 +- crates/oci/src/assemble.rs | 34 ++- crates/oci/src/fetch.rs | 72 ++++- crates/oci/src/name.rs | 8 +- crates/oci/src/packer/backend.rs | 48 ++-- crates/oci/src/packer/cache.rs | 204 +++++++++++--- crates/oci/src/packer/mod.rs | 10 +- crates/oci/src/packer/service.rs | 62 ++++- crates/oci/src/progress.rs | 145 +++++++--- crates/oci/src/registry.rs | 33 +-- crates/oci/src/vfs.rs | 9 +- 18 files changed, 834 insertions(+), 346 deletions(-) diff --git a/crates/ctl/src/cli/launch.rs b/crates/ctl/src/cli/launch.rs index f91823b..e65f968 100644 --- a/crates/ctl/src/cli/launch.rs +++ b/crates/ctl/src/cli/launch.rs @@ -30,8 +30,10 @@ pub enum LaunchImageFormat { #[derive(Parser)] #[command(about = "Launch a new guest")] pub struct LauchCommand { - #[arg(short = 'S', long, default_value = "squashfs", help = "Image format")] + #[arg(long, default_value = "squashfs", help = "Image format")] image_format: LaunchImageFormat, + #[arg(long, help = "Overwrite image cache on pull")] + pull_overwrite_cache: bool, #[arg(short, long, help = "Name of the guest")] name: Option, #[arg( @@ -85,6 +87,7 @@ impl LauchCommand { LaunchImageFormat::Squashfs => OciImageFormat::Squashfs.into(), LaunchImageFormat::Erofs => OciImageFormat::Erofs.into(), }, + overwrite_cache: self.pull_overwrite_cache, }) .await?; let reply = pull_interactive_progress(response.into_inner()).await?; diff --git a/crates/ctl/src/cli/pull.rs b/crates/ctl/src/cli/pull.rs index 9398c1d..3a63fe1 100644 --- a/crates/ctl/src/cli/pull.rs +++ b/crates/ctl/src/cli/pull.rs @@ -23,6 +23,8 @@ pub struct PullCommand { image: String, #[arg(short = 's', long, default_value = "squashfs", help = "Image format")] image_format: PullImageFormat, + #[arg(short = 'o', long, help = "Overwrite image cache")] + overwrite_cache: bool, } impl PullCommand { @@ -35,6 +37,7 @@ impl PullCommand { PullImageFormat::Erofs => OciImageFormat::Erofs.into(), PullImageFormat::Tar => OciImageFormat::Tar.into(), }, + overwrite_cache: self.overwrite_cache, }) .await?; let reply = pull_interactive_progress(response.into_inner()).await?; diff --git a/crates/ctl/src/pull.rs b/crates/ctl/src/pull.rs index e91df10..098d40e 100644 --- a/crates/ctl/src/pull.rs +++ b/crates/ctl/src/pull.rs @@ -1,20 +1,205 @@ -use std::collections::HashMap; +use std::{ + collections::{hash_map::Entry, HashMap}, + time::Duration, +}; use anyhow::{anyhow, Result}; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; -use krata::v1::control::{PullImageProgressLayerPhase, PullImageProgressPhase, PullImageReply}; +use krata::v1::control::{ + image_progress_indication::Indication, ImageProgressIndication, ImageProgressLayerPhase, + ImageProgressPhase, PullImageReply, +}; use tokio_stream::StreamExt; use tonic::Streaming; +const SPINNER_STRINGS: &[&str] = &[ + "[= ]", + "[ = ]", + "[ = ]", + "[ = ]", + "[ = ]", + "[ = ]", + "[ = ]", + "[ = ]", + "[ = ]", + "[ = ]", + "[ = ]", + "[ = ]", + "[ = ]", + "[ = ]", + "[ = ]", + "[ = ]", + "[ = ]", + "[ = ]", + "[ = ]", + "[ =]", + "[====================]", +]; + +fn progress_bar_for_indication(indication: &ImageProgressIndication) -> Option { + match indication.indication.as_ref() { + Some(Indication::Hidden(_)) | None => None, + Some(Indication::Bar(indic)) => { + let bar = ProgressBar::new(indic.total); + bar.enable_steady_tick(Duration::from_millis(100)); + Some(bar) + } + Some(Indication::Spinner(_)) => { + let bar = ProgressBar::new_spinner(); + bar.enable_steady_tick(Duration::from_millis(100)); + Some(bar) + } + Some(Indication::Completed(indic)) => { + let bar = ProgressBar::new_spinner(); + bar.enable_steady_tick(Duration::from_millis(100)); + if !indic.message.is_empty() { + bar.finish_with_message(indic.message.clone()); + } else { + bar.finish() + } + Some(bar) + } + } +} + +fn configure_for_indication( + bar: &mut ProgressBar, + multi_progress: &mut MultiProgress, + indication: &ImageProgressIndication, + top_phase: Option, + layer_phase: Option, + layer_id: Option<&str>, +) { + let prefix = if let Some(phase) = top_phase { + match phase { + ImageProgressPhase::Unknown => "unknown", + ImageProgressPhase::Started => "started", + ImageProgressPhase::Resolving => "resolving", + ImageProgressPhase::Resolved => "resolved", + ImageProgressPhase::ConfigDownload => "downloading", + ImageProgressPhase::LayerDownload => "downloading", + ImageProgressPhase::Assemble => "assembling", + ImageProgressPhase::Pack => "packing", + ImageProgressPhase::Complete => "complete", + } + } else if let Some(phase) = layer_phase { + match phase { + ImageProgressLayerPhase::Unknown => "unknown", + ImageProgressLayerPhase::Waiting => "waiting", + ImageProgressLayerPhase::Downloading => "downloading", + ImageProgressLayerPhase::Downloaded => "downloaded", + ImageProgressLayerPhase::Extracting => "extracting", + ImageProgressLayerPhase::Extracted => "extracted", + } + } else { + "" + }; + let prefix = prefix.to_string(); + + let id = if let Some(layer_id) = layer_id { + let hash = if let Some((_, hash)) = layer_id.split_once(':') { + hash + } else { + "unknown" + }; + let small_hash = if hash.len() > 10 { &hash[0..10] } else { hash }; + Some(format!("{:width$}", small_hash, width = 10)) + } else { + None + }; + + let prefix = if let Some(id) = id { + format!("{} {:width$}", id, prefix, width = 11) + } else { + format!(" {:width$}", prefix, width = 11) + }; + + match indication.indication.as_ref() { + Some(Indication::Hidden(_)) | None => { + multi_progress.remove(bar); + return; + } + Some(Indication::Bar(indic)) => { + if indic.is_bytes { + bar.set_style(ProgressStyle::with_template("{prefix} [{bar:20}] {msg} {binary_bytes}/{binary_total_bytes} ({binary_bytes_per_sec}) eta: {eta}").unwrap().progress_chars("=>-")); + } else { + bar.set_style( + ProgressStyle::with_template( + "{prefix} [{bar:20} {msg} {human_pos}/{human_len} ({per_sec}/sec)", + ) + .unwrap() + .progress_chars("=>-"), + ); + } + bar.set_message(indic.message.clone()); + bar.set_position(indic.current); + bar.set_length(indic.total); + } + Some(Indication::Spinner(indic)) => { + bar.set_style( + ProgressStyle::with_template("{prefix} {spinner} {msg}") + .unwrap() + .tick_strings(SPINNER_STRINGS), + ); + bar.set_message(indic.message.clone()); + } + Some(Indication::Completed(indic)) => { + if bar.is_finished() { + return; + } + bar.disable_steady_tick(); + bar.set_message(indic.message.clone()); + if indic.total != 0 { + bar.set_position(indic.total); + bar.set_length(indic.total); + } + if bar.style().get_tick_str(0).contains('=') { + bar.set_style( + ProgressStyle::with_template("{prefix} {spinner} {msg}") + .unwrap() + .tick_strings(SPINNER_STRINGS), + ); + bar.finish_with_message(indic.message.clone()); + } else if indic.is_bytes { + bar.set_style( + ProgressStyle::with_template("{prefix} [{bar:20}] {msg} {binary_total_bytes}") + .unwrap() + .progress_chars("=>-"), + ); + } else { + bar.set_style( + ProgressStyle::with_template("{prefix} [{bar:20}] {msg}") + .unwrap() + .progress_chars("=>-"), + ); + } + bar.tick(); + bar.enable_steady_tick(Duration::from_millis(100)); + } + }; + + bar.set_prefix(prefix); + bar.tick(); +} + pub async fn pull_interactive_progress( mut stream: Streaming, ) -> Result { - let mut multi_progress: Option<(MultiProgress, HashMap)> = None; + let mut multi_progress = MultiProgress::new(); + multi_progress.set_move_cursor(false); + let mut progresses = HashMap::new(); while let Some(reply) = stream.next().await { - let reply = reply?; + let reply = match reply { + Ok(reply) => reply, + Err(error) => { + multi_progress.clear()?; + return Err(error.into()); + } + }; if reply.progress.is_none() && !reply.digest.is_empty() { + multi_progress.clear()?; return Ok(reply); } @@ -22,97 +207,62 @@ pub async fn pull_interactive_progress( continue; }; - if multi_progress.is_none() { - multi_progress = Some((MultiProgress::new(), HashMap::new())); + for layer in &oci.layers { + let Some(ref indication) = layer.indication else { + continue; + }; + + let bar = match progresses.entry(layer.id.clone()) { + Entry::Occupied(entry) => Some(entry.into_mut()), + + Entry::Vacant(entry) => { + if let Some(bar) = progress_bar_for_indication(indication) { + multi_progress.add(bar.clone()); + Some(entry.insert(bar)) + } else { + None + } + } + }; + + if let Some(bar) = bar { + configure_for_indication( + bar, + &mut multi_progress, + indication, + None, + Some(layer.phase()), + Some(&layer.id), + ); + } } - let Some((multi_progress, progresses)) = multi_progress.as_mut() else { - continue; - }; + if let Some(ref indication) = oci.indication { + let bar = match progresses.entry("root".to_string()) { + Entry::Occupied(entry) => Some(entry.into_mut()), - match oci.phase() { - PullImageProgressPhase::Resolved - | PullImageProgressPhase::ConfigAcquire - | PullImageProgressPhase::LayerAcquire => { - if progresses.is_empty() && !oci.layers.is_empty() { - for layer in &oci.layers { - let bar = ProgressBar::new(layer.total); - bar.set_style(ProgressStyle::with_template("{msg} {bar}").unwrap()); - progresses.insert(layer.id.clone(), bar.clone()); - multi_progress.add(bar); - } - } - - for layer in oci.layers { - let Some(progress) = progresses.get_mut(&layer.id) else { - continue; - }; - - let phase = match layer.phase() { - PullImageProgressLayerPhase::Waiting => "waiting", - PullImageProgressLayerPhase::Downloading => "downloading", - PullImageProgressLayerPhase::Downloaded => "downloaded", - PullImageProgressLayerPhase::Extracting => "extracting", - PullImageProgressLayerPhase::Extracted => "extracted", - _ => "unknown", - }; - - let simple = if let Some((_, hash)) = layer.id.split_once(':') { - hash + Entry::Vacant(entry) => { + if let Some(bar) = progress_bar_for_indication(indication) { + multi_progress.add(bar.clone()); + Some(entry.insert(bar)) } else { - "unknown" - }; - let simple = if simple.len() > 10 { - &simple[0..10] - } else { - simple - }; - let message = format!( - "{:width$} {:phwidth$}", - simple, - phase, - width = 10, - phwidth = 11 - ); - - if message != progress.message() { - progress.set_message(message); + None } - - progress.update(|state| { - state.set_len(layer.total); - state.set_pos(layer.value); - }); } + }; + + if let Some(bar) = bar { + configure_for_indication( + bar, + &mut multi_progress, + indication, + Some(oci.phase()), + None, + None, + ); } - - PullImageProgressPhase::Packing => { - for (key, bar) in &mut *progresses { - if key == "packing" { - continue; - } - bar.finish_and_clear(); - multi_progress.remove(bar); - } - progresses.retain(|k, _| k == "packing"); - if progresses.is_empty() { - let progress = ProgressBar::new(100); - progress.set_message("packing "); - progress.set_style(ProgressStyle::with_template("{msg} {bar}").unwrap()); - progresses.insert("packing".to_string(), progress); - } - let Some(progress) = progresses.get("packing") else { - continue; - }; - - progress.update(|state| { - state.set_len(oci.total); - state.set_pos(oci.value); - }); - } - - _ => {} } } + multi_progress.clear()?; Err(anyhow!("never received final reply for image pull")) } diff --git a/crates/daemon/src/control.rs b/crates/daemon/src/control.rs index 44ae3b9..a6bb54d 100644 --- a/crates/daemon/src/control.rs +++ b/crates/daemon/src/control.rs @@ -372,7 +372,7 @@ impl ControlService for DaemonControlService { let output = try_stream! { let mut task = tokio::task::spawn(async move { - our_packer.request(name, format, context).await + our_packer.request(name, format, request.overwrite_cache, context).await }); let abort_handle = task.abort_handle(); let _task_cancel_guard = scopeguard::guard(abort_handle, |handle| { @@ -381,26 +381,14 @@ impl ControlService for DaemonControlService { loop { let what = select! { - x = receiver.recv() => PullImageSelect::Progress(x.ok()), + x = receiver.changed() => match x { + Ok(_) => PullImageSelect::Progress(Some(receiver.borrow_and_update().clone())), + Err(_) => PullImageSelect::Progress(None), + }, x = &mut task => PullImageSelect::Completed(x), }; match what { - PullImageSelect::Progress(Some(mut progress)) => { - let mut drain = 0; - loop { - if drain >= 10 { - break; - } - - if let Ok(latest) = receiver.try_recv() { - progress = latest; - } else { - break; - } - - drain += 1; - } - + PullImageSelect::Progress(Some(progress)) => { let reply = PullImageReply { progress: Some(convert_oci_progress(progress)), digest: String::new(), diff --git a/crates/daemon/src/lib.rs b/crates/daemon/src/lib.rs index e81901f..4c605a0 100644 --- a/crates/daemon/src/lib.rs +++ b/crates/daemon/src/lib.rs @@ -51,7 +51,7 @@ impl Daemon { image_cache_dir.push("image"); fs::create_dir_all(&image_cache_dir).await?; - let packer = OciPackerService::new(None, &image_cache_dir, OciPlatform::current())?; + let packer = OciPackerService::new(None, &image_cache_dir, OciPlatform::current()).await?; let runtime = Runtime::new(store.clone()).await?; let guests_db_path = format!("{}/guests.db", store); diff --git a/crates/daemon/src/oci.rs b/crates/daemon/src/oci.rs index 45945a2..0a601ef 100644 --- a/crates/daemon/src/oci.rs +++ b/crates/daemon/src/oci.rs @@ -1,33 +1,72 @@ use krata::v1::control::{ - PullImageProgress, PullImageProgressLayer, PullImageProgressLayerPhase, PullImageProgressPhase, + image_progress_indication::Indication, ImageProgress, ImageProgressIndication, + ImageProgressIndicationBar, ImageProgressIndicationCompleted, ImageProgressIndicationHidden, + ImageProgressIndicationSpinner, ImageProgressLayer, ImageProgressLayerPhase, + ImageProgressPhase, +}; +use krataoci::progress::{ + OciProgress, OciProgressIndication, OciProgressLayer, OciProgressLayerPhase, OciProgressPhase, }; -use krataoci::progress::{OciProgress, OciProgressLayer, OciProgressLayerPhase, OciProgressPhase}; -fn convert_oci_layer_progress(layer: OciProgressLayer) -> PullImageProgressLayer { - PullImageProgressLayer { - id: layer.id, - phase: match layer.phase { - OciProgressLayerPhase::Waiting => PullImageProgressLayerPhase::Waiting, - OciProgressLayerPhase::Downloading => PullImageProgressLayerPhase::Downloading, - OciProgressLayerPhase::Downloaded => PullImageProgressLayerPhase::Downloaded, - OciProgressLayerPhase::Extracting => PullImageProgressLayerPhase::Extracting, - OciProgressLayerPhase::Extracted => PullImageProgressLayerPhase::Extracted, - } - .into(), - value: layer.value, - total: layer.total, +fn convert_oci_progress_indication(indication: OciProgressIndication) -> ImageProgressIndication { + ImageProgressIndication { + indication: Some(match indication { + OciProgressIndication::Hidden => Indication::Hidden(ImageProgressIndicationHidden {}), + OciProgressIndication::ProgressBar { + message, + current, + total, + bytes, + } => Indication::Bar(ImageProgressIndicationBar { + message: message.unwrap_or_default(), + current, + total, + is_bytes: bytes, + }), + OciProgressIndication::Spinner { message } => { + Indication::Spinner(ImageProgressIndicationSpinner { + message: message.unwrap_or_default(), + }) + } + OciProgressIndication::Completed { + message, + total, + bytes, + } => Indication::Completed(ImageProgressIndicationCompleted { + message: message.unwrap_or_default(), + total: total.unwrap_or(0), + is_bytes: bytes, + }), + }), } } -pub fn convert_oci_progress(oci: OciProgress) -> PullImageProgress { - PullImageProgress { +fn convert_oci_layer_progress(layer: OciProgressLayer) -> ImageProgressLayer { + ImageProgressLayer { + id: layer.id, + phase: match layer.phase { + OciProgressLayerPhase::Waiting => ImageProgressLayerPhase::Waiting, + OciProgressLayerPhase::Downloading => ImageProgressLayerPhase::Downloading, + OciProgressLayerPhase::Downloaded => ImageProgressLayerPhase::Downloaded, + OciProgressLayerPhase::Extracting => ImageProgressLayerPhase::Extracting, + OciProgressLayerPhase::Extracted => ImageProgressLayerPhase::Extracted, + } + .into(), + indication: Some(convert_oci_progress_indication(layer.indication)), + } +} + +pub fn convert_oci_progress(oci: OciProgress) -> ImageProgress { + ImageProgress { phase: match oci.phase { - OciProgressPhase::Resolving => PullImageProgressPhase::Resolving, - OciProgressPhase::Resolved => PullImageProgressPhase::Resolved, - OciProgressPhase::ConfigAcquire => PullImageProgressPhase::ConfigAcquire, - OciProgressPhase::LayerAcquire => PullImageProgressPhase::LayerAcquire, - OciProgressPhase::Packing => PullImageProgressPhase::Packing, - OciProgressPhase::Complete => PullImageProgressPhase::Complete, + OciProgressPhase::Started => ImageProgressPhase::Started, + OciProgressPhase::Resolving => ImageProgressPhase::Resolving, + OciProgressPhase::Resolved => ImageProgressPhase::Resolved, + OciProgressPhase::ConfigDownload => ImageProgressPhase::ConfigDownload, + OciProgressPhase::LayerDownload => ImageProgressPhase::LayerDownload, + OciProgressPhase::Assemble => ImageProgressPhase::Assemble, + OciProgressPhase::Pack => ImageProgressPhase::Pack, + OciProgressPhase::Complete => ImageProgressPhase::Complete, } .into(), layers: oci @@ -35,7 +74,6 @@ pub fn convert_oci_progress(oci: OciProgress) -> PullImageProgress { .into_values() .map(convert_oci_layer_progress) .collect::>(), - value: oci.value, - total: oci.total, + indication: Some(convert_oci_progress_indication(oci.indication)), } } diff --git a/crates/krata/proto/krata/v1/control.proto b/crates/krata/proto/krata/v1/control.proto index 692eba8..a28f660 100644 --- a/crates/krata/proto/krata/v1/control.proto +++ b/crates/krata/proto/krata/v1/control.proto @@ -89,46 +89,75 @@ message SnoopIdmReply { krata.bus.idm.IdmPacket packet = 3; } -enum PullImageProgressLayerPhase { - PULL_IMAGE_PROGRESS_LAYER_PHASE_UNKNOWN = 0; - PULL_IMAGE_PROGRESS_LAYER_PHASE_WAITING = 1; - PULL_IMAGE_PROGRESS_LAYER_PHASE_DOWNLOADING = 2; - PULL_IMAGE_PROGRESS_LAYER_PHASE_DOWNLOADED = 3; - PULL_IMAGE_PROGRESS_LAYER_PHASE_EXTRACTING = 4; - PULL_IMAGE_PROGRESS_LAYER_PHASE_EXTRACTED = 5; +message ImageProgress { + ImageProgressPhase phase = 1; + repeated ImageProgressLayer layers = 2; + ImageProgressIndication indication = 3; } -message PullImageProgressLayer { +enum ImageProgressPhase { + IMAGE_PROGRESS_PHASE_UNKNOWN = 0; + IMAGE_PROGRESS_PHASE_STARTED = 1; + IMAGE_PROGRESS_PHASE_RESOLVING = 2; + IMAGE_PROGRESS_PHASE_RESOLVED = 3; + IMAGE_PROGRESS_PHASE_CONFIG_DOWNLOAD = 4; + IMAGE_PROGRESS_PHASE_LAYER_DOWNLOAD = 5; + IMAGE_PROGRESS_PHASE_ASSEMBLE = 6; + IMAGE_PROGRESS_PHASE_PACK = 7; + IMAGE_PROGRESS_PHASE_COMPLETE = 8; +} + +message ImageProgressLayer { string id = 1; - PullImageProgressLayerPhase phase = 2; - uint64 value = 3; - uint64 total = 4; + ImageProgressLayerPhase phase = 2; + ImageProgressIndication indication = 3; } -enum PullImageProgressPhase { - PULL_IMAGE_PROGRESS_PHASE_UNKNOWN = 0; - PULL_IMAGE_PROGRESS_PHASE_RESOLVING = 1; - PULL_IMAGE_PROGRESS_PHASE_RESOLVED = 2; - PULL_IMAGE_PROGRESS_PHASE_CONFIG_ACQUIRE = 3; - PULL_IMAGE_PROGRESS_PHASE_LAYER_ACQUIRE = 4; - PULL_IMAGE_PROGRESS_PHASE_PACKING = 5; - PULL_IMAGE_PROGRESS_PHASE_COMPLETE = 6; +enum ImageProgressLayerPhase { + IMAGE_PROGRESS_LAYER_PHASE_UNKNOWN = 0; + IMAGE_PROGRESS_LAYER_PHASE_WAITING = 1; + IMAGE_PROGRESS_LAYER_PHASE_DOWNLOADING = 2; + IMAGE_PROGRESS_LAYER_PHASE_DOWNLOADED = 3; + IMAGE_PROGRESS_LAYER_PHASE_EXTRACTING = 4; + IMAGE_PROGRESS_LAYER_PHASE_EXTRACTED = 5; } -message PullImageProgress { - PullImageProgressPhase phase = 1; - repeated PullImageProgressLayer layers = 2; - uint64 value = 3; - uint64 total = 4; +message ImageProgressIndication { + oneof indication { + ImageProgressIndicationBar bar = 1; + ImageProgressIndicationSpinner spinner = 2; + ImageProgressIndicationHidden hidden = 3; + ImageProgressIndicationCompleted completed = 4; + } +} + +message ImageProgressIndicationBar { + string message = 1; + uint64 current = 2; + uint64 total = 3; + bool is_bytes = 4; +} + +message ImageProgressIndicationSpinner { + string message = 1; +} + +message ImageProgressIndicationHidden {} + +message ImageProgressIndicationCompleted { + string message = 1; + uint64 total = 2; + bool is_bytes = 3; } message PullImageRequest { string image = 1; krata.v1.common.OciImageFormat format = 2; + bool overwrite_cache = 3; } message PullImageReply { - PullImageProgress progress = 1; + ImageProgress progress = 1; string digest = 2; krata.v1.common.OciImageFormat format = 3; } diff --git a/crates/oci/examples/squashify.rs b/crates/oci/examples/squashify.rs index 777c951..c6361c1 100644 --- a/crates/oci/examples/squashify.rs +++ b/crates/oci/examples/squashify.rs @@ -25,37 +25,19 @@ async fn main() -> Result<()> { let (context, mut receiver) = OciProgressContext::create(); tokio::task::spawn(async move { loop { - let Ok(mut progress) = receiver.recv().await else { - return; - }; - - let mut drain = 0; - loop { - if drain >= 10 { - break; - } - - if let Ok(latest) = receiver.try_recv() { - progress = latest; - } else { - break; - } - - drain += 1; + if (receiver.changed().await).is_err() { + break; } - + let progress = receiver.borrow_and_update(); println!("phase {:?}", progress.phase); for (id, layer) in &progress.layers { - println!( - "{} {:?} {} of {}", - id, layer.phase, layer.value, layer.total - ) + println!("{} {:?} {:?}", id, layer.phase, layer.indication,) } } }); - let service = OciPackerService::new(seed, &cache_dir, OciPlatform::current())?; + let service = OciPackerService::new(seed, &cache_dir, OciPlatform::current()).await?; let packed = service - .request(image.clone(), OciPackedFormat::Squashfs, context) + .request(image.clone(), OciPackedFormat::Squashfs, false, context) .await?; println!( "generated squashfs of {} to {}", diff --git a/crates/oci/src/assemble.rs b/crates/oci/src/assemble.rs index 2931f3c..97c89e5 100644 --- a/crates/oci/src/assemble.rs +++ b/crates/oci/src/assemble.rs @@ -1,23 +1,23 @@ -use crate::fetch::{OciImageFetcher, OciImageLayer, OciResolvedImage}; +use crate::fetch::{OciImageFetcher, OciImageLayer, OciImageLayerReader, OciResolvedImage}; use crate::progress::OciBoundProgress; use crate::schema::OciSchema; use crate::vfs::{VfsNode, VfsTree}; use anyhow::{anyhow, Result}; use log::{debug, trace, warn}; -use oci_spec::image::{ImageConfiguration, ImageManifest}; +use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest}; use std::path::{Path, PathBuf}; use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use tokio::fs; -use tokio::io::AsyncRead; use tokio_stream::StreamExt; use tokio_tar::{Archive, Entry}; use uuid::Uuid; pub struct OciImageAssembled { pub digest: String, + pub descriptor: Descriptor, pub manifest: OciSchema, pub config: OciSchema, pub vfs: Arc, @@ -115,12 +115,14 @@ impl OciImageAssembler { ); self.progress .update(|progress| { - progress.extracting_layer(&layer.digest, 0, 1); + progress.start_extracting_layer(&layer.digest); }) .await; debug!("process layer digest={}", &layer.digest,); let mut archive = layer.archive().await?; let mut entries = archive.entries()?; + let mut count = 0u64; + let mut size = 0u64; while let Some(entry) = entries.next().await { let mut entry = entry?; let path = entry.path()?; @@ -134,14 +136,21 @@ impl OciImageAssembler { self.process_whiteout_entry(&mut vfs, &entry, name, layer) .await?; } else { - vfs.insert_tar_entry(&entry)?; - self.process_write_entry(&mut vfs, &mut entry, layer) + let reference = vfs.insert_tar_entry(&entry)?; + self.progress + .update(|progress| { + progress.extracting_layer(&layer.digest, &reference.name); + }) + .await; + size += self + .process_write_entry(&mut vfs, &mut entry, layer) .await?; + count += 1; } } self.progress .update(|progress| { - progress.extracted_layer(&layer.digest); + progress.extracted_layer(&layer.digest, count, size); }) .await; } @@ -157,6 +166,7 @@ impl OciImageAssembler { let assembled = OciImageAssembled { vfs: Arc::new(vfs), + descriptor: resolved.descriptor, digest: resolved.digest, manifest: resolved.manifest, config: local.config, @@ -169,7 +179,7 @@ impl OciImageAssembler { async fn process_whiteout_entry( &self, vfs: &mut VfsTree, - entry: &Entry>>>, + entry: &Entry>>>, name: &str, layer: &OciImageLayer, ) -> Result<()> { @@ -210,11 +220,11 @@ impl OciImageAssembler { async fn process_write_entry( &self, vfs: &mut VfsTree, - entry: &mut Entry>>>, + entry: &mut Entry>>>, layer: &OciImageLayer, - ) -> Result<()> { + ) -> Result { if !entry.header().entry_type().is_file() { - return Ok(()); + return Ok(0); } trace!( "unpack entry layer={} path={:?} type={:?}", @@ -230,7 +240,7 @@ impl OciImageAssembler { .await? .ok_or(anyhow!("unpack did not return a path"))?; vfs.set_disk_path(&entry.path()?, &path)?; - Ok(()) + Ok(entry.header().size()?) } } diff --git a/crates/oci/src/fetch.rs b/crates/oci/src/fetch.rs index 05817d7..fc4ce4a 100644 --- a/crates/oci/src/fetch.rs +++ b/crates/oci/src/fetch.rs @@ -10,6 +10,8 @@ use super::{ use std::{ fmt::Debug, + io::SeekFrom, + os::unix::fs::MetadataExt, path::{Path, PathBuf}, pin::Pin, }; @@ -18,12 +20,13 @@ use anyhow::{anyhow, Result}; use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder}; use log::debug; use oci_spec::image::{ - Descriptor, ImageConfiguration, ImageIndex, ImageManifest, MediaType, ToDockerV2S2, + Descriptor, DescriptorBuilder, ImageConfiguration, ImageIndex, ImageManifest, MediaType, + ToDockerV2S2, }; use serde::de::DeserializeOwned; use tokio::{ - fs::File, - io::{AsyncRead, AsyncReadExt, BufReader, BufWriter}, + fs::{self, File}, + io::{AsyncRead, AsyncReadExt, AsyncSeekExt, BufReader, BufWriter}, }; use tokio_stream::StreamExt; use tokio_tar::Archive; @@ -43,16 +46,43 @@ pub enum OciImageLayerCompression { #[derive(Clone, Debug)] pub struct OciImageLayer { + pub metadata: Descriptor, pub path: PathBuf, pub digest: String, pub compression: OciImageLayerCompression, } +#[async_trait::async_trait] +pub trait OciImageLayerReader: AsyncRead + Sync { + async fn position(&mut self) -> Result; +} + +#[async_trait::async_trait] +impl OciImageLayerReader for BufReader { + async fn position(&mut self) -> Result { + Ok(self.seek(SeekFrom::Current(0)).await?) + } +} + +#[async_trait::async_trait] +impl OciImageLayerReader for GzipDecoder> { + async fn position(&mut self) -> Result { + self.get_mut().position().await + } +} + +#[async_trait::async_trait] +impl OciImageLayerReader for ZstdDecoder> { + async fn position(&mut self) -> Result { + self.get_mut().position().await + } +} + impl OciImageLayer { - pub async fn decompress(&self) -> Result>> { + pub async fn decompress(&self) -> Result>> { let file = File::open(&self.path).await?; let reader = BufReader::new(file); - let reader: Pin> = match self.compression { + let reader: Pin> = match self.compression { OciImageLayerCompression::None => Box::pin(reader), OciImageLayerCompression::Gzip => Box::pin(GzipDecoder::new(reader)), OciImageLayerCompression::Zstd => Box::pin(ZstdDecoder::new(reader)), @@ -60,7 +90,7 @@ impl OciImageLayer { Ok(reader) } - pub async fn archive(&self) -> Result>>> { + pub async fn archive(&self) -> Result>>> { let decompress = self.decompress().await?; Ok(Archive::new(decompress)) } @@ -70,6 +100,7 @@ impl OciImageLayer { pub struct OciResolvedImage { pub name: ImageName, pub digest: String, + pub descriptor: Descriptor, pub manifest: OciSchema, } @@ -199,6 +230,7 @@ impl OciImageFetcher { ); return Ok(OciResolvedImage { name: image, + descriptor: found.clone(), digest: found.digest().clone(), manifest, }); @@ -207,11 +239,20 @@ impl OciImageFetcher { } let mut client = OciRegistryClient::new(image.registry_url()?, self.platform.clone())?; - let (manifest, digest) = client + let (manifest, descriptor, digest) = client .get_manifest_with_digest(&image.name, &image.reference) .await?; + let descriptor = descriptor.unwrap_or_else(|| { + DescriptorBuilder::default() + .media_type(MediaType::ImageManifest) + .size(manifest.raw().len() as i64) + .digest(digest.clone()) + .build() + .unwrap() + }); Ok(OciResolvedImage { name: image, + descriptor, digest, manifest, }) @@ -225,7 +266,7 @@ impl OciImageFetcher { let config: OciSchema; self.progress .update(|progress| { - progress.phase = OciProgressPhase::ConfigAcquire; + progress.phase = OciProgressPhase::ConfigDownload; }) .await; let mut client = OciRegistryClient::new(image.name.registry_url()?, self.platform.clone())?; @@ -245,10 +286,10 @@ impl OciImageFetcher { } self.progress .update(|progress| { - progress.phase = OciProgressPhase::LayerAcquire; + progress.phase = OciProgressPhase::LayerDownload; for layer in image.manifest.item().layers() { - progress.add_layer(layer.digest(), layer.size() as usize); + progress.add_layer(layer.digest()); } }) .await; @@ -256,7 +297,7 @@ impl OciImageFetcher { for layer in image.manifest.item().layers() { self.progress .update(|progress| { - progress.downloading_layer(layer.digest(), 0, layer.size() as usize); + progress.downloading_layer(layer.digest(), 0, layer.size() as u64); }) .await; layers.push( @@ -265,7 +306,7 @@ impl OciImageFetcher { ); self.progress .update(|progress| { - progress.downloaded_layer(layer.digest()); + progress.downloaded_layer(layer.digest(), layer.size() as u64); }) .await; } @@ -304,6 +345,12 @@ impl OciImageFetcher { } } + let metadata = fs::metadata(&layer_path).await?; + + if layer.size() as u64 != metadata.size() { + return Err(anyhow!("layer size differs from size in manifest",)); + } + let mut media_type = layer.media_type().clone(); // docker layer compatibility @@ -318,6 +365,7 @@ impl OciImageFetcher { other => return Err(anyhow!("found layer with unknown media type: {}", other)), }; Ok(OciImageLayer { + metadata: layer.clone(), path: layer_path, digest: layer.digest().clone(), compression, diff --git a/crates/oci/src/name.rs b/crates/oci/src/name.rs index 8dffbcf..ab92da1 100644 --- a/crates/oci/src/name.rs +++ b/crates/oci/src/name.rs @@ -15,7 +15,13 @@ pub struct ImageName { impl fmt::Display for ImageName { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - if let Some(port) = self.port { + if DOCKER_HUB_MIRROR == self.hostname && self.port.is_none() { + if self.name.starts_with("library/") { + write!(f, "{}:{}", &self.name[8..], self.reference) + } else { + write!(f, "{}:{}", self.name, self.reference) + } + } else if let Some(port) = self.port { write!( f, "{}:{}/{}:{}", diff --git a/crates/oci/src/packer/backend.rs b/crates/oci/src/packer/backend.rs index 2218bc8..f82982f 100644 --- a/crates/oci/src/packer/backend.rs +++ b/crates/oci/src/packer/backend.rs @@ -1,14 +1,12 @@ -use std::{path::Path, process::Stdio, sync::Arc}; +use std::{os::unix::fs::MetadataExt, path::Path, process::Stdio, sync::Arc}; use super::OciPackedFormat; -use crate::{ - progress::{OciBoundProgress, OciProgressPhase}, - vfs::VfsTree, -}; +use crate::{progress::OciBoundProgress, vfs::VfsTree}; use anyhow::{anyhow, Result}; use log::warn; use tokio::{ - fs::File, + fs::{self, File}, + io::BufWriter, pin, process::{Child, Command}, select, @@ -55,9 +53,7 @@ impl OciPackerBackend for OciPackerMkSquashfs { async fn pack(&self, progress: OciBoundProgress, vfs: Arc, file: &Path) -> Result<()> { progress .update(|progress| { - progress.phase = OciProgressPhase::Packing; - progress.total = 1; - progress.value = 0; + progress.start_packing(); }) .await; @@ -120,12 +116,9 @@ impl OciPackerBackend for OciPackerMkSquashfs { status.code().unwrap() )) } else { + let metadata = fs::metadata(&file).await?; progress - .update(|progress| { - progress.phase = OciProgressPhase::Packing; - progress.total = 1; - progress.value = 1; - }) + .update(|progress| progress.complete(metadata.size())) .await; Ok(()) } @@ -136,12 +129,10 @@ pub struct OciPackerMkfsErofs {} #[async_trait::async_trait] impl OciPackerBackend for OciPackerMkfsErofs { - async fn pack(&self, progress: OciBoundProgress, vfs: Arc, path: &Path) -> Result<()> { + async fn pack(&self, progress: OciBoundProgress, vfs: Arc, file: &Path) -> Result<()> { progress .update(|progress| { - progress.phase = OciProgressPhase::Packing; - progress.total = 1; - progress.value = 0; + progress.start_packing(); }) .await; @@ -149,7 +140,7 @@ impl OciPackerBackend for OciPackerMkfsErofs { .arg("-L") .arg("root") .arg("--tar=-") - .arg(path) + .arg(file) .stdin(Stdio::piped()) .stderr(Stdio::null()) .stdout(Stdio::null()) @@ -200,11 +191,10 @@ impl OciPackerBackend for OciPackerMkfsErofs { status.code().unwrap() )) } else { + let metadata = fs::metadata(&file).await?; progress .update(|progress| { - progress.phase = OciProgressPhase::Packing; - progress.total = 1; - progress.value = 1; + progress.complete(metadata.size()); }) .await; Ok(()) @@ -219,20 +209,18 @@ impl OciPackerBackend for OciPackerTar { async fn pack(&self, progress: OciBoundProgress, vfs: Arc, file: &Path) -> Result<()> { progress .update(|progress| { - progress.phase = OciProgressPhase::Packing; - progress.total = 1; - progress.value = 0; + progress.start_packing(); }) .await; - let file = File::create(file).await?; - vfs.write_to_tar(file).await?; + let output = File::create(file).await?; + let output = BufWriter::new(output); + vfs.write_to_tar(output).await?; + let metadata = fs::metadata(file).await?; progress .update(|progress| { - progress.phase = OciProgressPhase::Packing; - progress.total = 1; - progress.value = 1; + progress.complete(metadata.size()); }) .await; Ok(()) diff --git a/crates/oci/src/packer/cache.rs b/crates/oci/src/packer/cache.rs index 5e4d9c2..0155659 100644 --- a/crates/oci/src/packer/cache.rs +++ b/crates/oci/src/packer/cache.rs @@ -1,69 +1,121 @@ use crate::{ + name::ImageName, packer::{OciPackedFormat, OciPackedImage}, schema::OciSchema, }; use anyhow::Result; -use log::debug; -use oci_spec::image::{ImageConfiguration, ImageManifest}; -use std::path::{Path, PathBuf}; -use tokio::fs; +use log::{debug, error}; +use oci_spec::image::{ + Descriptor, ImageConfiguration, ImageIndex, ImageIndexBuilder, ImageManifest, MediaType, + ANNOTATION_REF_NAME, +}; +use std::{ + path::{Path, PathBuf}, + sync::Arc, +}; +use tokio::{fs, sync::RwLock}; #[derive(Clone)] pub struct OciPackerCache { cache_dir: PathBuf, + index: Arc>, } +const ANNOTATION_IMAGE_NAME: &str = "io.containerd.image.name"; +const ANNOTATION_OCI_PACKER_FORMAT: &str = "dev.krata.oci.packer.format"; + impl OciPackerCache { - pub fn new(cache_dir: &Path) -> Result { - Ok(OciPackerCache { + pub async fn new(cache_dir: &Path) -> Result { + let index = ImageIndexBuilder::default() + .schema_version(2u32) + .media_type(MediaType::ImageIndex) + .manifests(Vec::new()) + .build()?; + let cache = OciPackerCache { cache_dir: cache_dir.to_path_buf(), - }) + index: Arc::new(RwLock::new(index)), + }; + + { + let mut mutex = cache.index.write().await; + *mutex = cache.load_index().await?; + } + + Ok(cache) + } + + pub async fn list(&self) -> Result> { + let index = self.index.read().await; + Ok(index.manifests().clone()) } pub async fn recall( &self, + name: ImageName, digest: &str, format: OciPackedFormat, ) -> Result> { + let index = self.index.read().await; + + let mut descriptor: Option = None; + for manifest in index.manifests() { + if manifest.digest() == digest + && manifest + .annotations() + .as_ref() + .and_then(|x| x.get(ANNOTATION_OCI_PACKER_FORMAT)) + .map(|x| x.as_str()) + == Some(format.extension()) + { + descriptor = Some(manifest.clone()); + break; + } + } + + let Some(descriptor) = descriptor else { + return Ok(None); + }; + let mut fs_path = self.cache_dir.clone(); let mut config_path = self.cache_dir.clone(); let mut manifest_path = self.cache_dir.clone(); fs_path.push(format!("{}.{}", digest, format.extension())); manifest_path.push(format!("{}.manifest.json", digest)); config_path.push(format!("{}.config.json", digest)); - Ok( - if fs_path.exists() && manifest_path.exists() && config_path.exists() { - let image_metadata = fs::metadata(&fs_path).await?; - let manifest_metadata = fs::metadata(&manifest_path).await?; - let config_metadata = fs::metadata(&config_path).await?; - if image_metadata.is_file() - && manifest_metadata.is_file() - && config_metadata.is_file() - { - let manifest_bytes = fs::read(&manifest_path).await?; - let manifest: ImageManifest = serde_json::from_slice(&manifest_bytes)?; - let config_bytes = fs::read(&config_path).await?; - let config: ImageConfiguration = serde_json::from_slice(&config_bytes)?; - debug!("cache hit digest={}", digest); - Some(OciPackedImage::new( - digest.to_string(), - fs_path.clone(), - format, - OciSchema::new(config_bytes, config), - OciSchema::new(manifest_bytes, manifest), - )) - } else { - None - } + + if fs_path.exists() && manifest_path.exists() && config_path.exists() { + let image_metadata = fs::metadata(&fs_path).await?; + let manifest_metadata = fs::metadata(&manifest_path).await?; + let config_metadata = fs::metadata(&config_path).await?; + if image_metadata.is_file() && manifest_metadata.is_file() && config_metadata.is_file() + { + let manifest_bytes = fs::read(&manifest_path).await?; + let manifest: ImageManifest = serde_json::from_slice(&manifest_bytes)?; + let config_bytes = fs::read(&config_path).await?; + let config: ImageConfiguration = serde_json::from_slice(&config_bytes)?; + debug!("cache hit digest={}", digest); + Ok(Some(OciPackedImage::new( + name, + digest.to_string(), + fs_path.clone(), + format, + descriptor, + OciSchema::new(config_bytes, config), + OciSchema::new(manifest_bytes, manifest), + ))) } else { - debug!("cache miss digest={}", digest); - None - }, - ) + Ok(None) + } + } else { + debug!("cache miss digest={}", digest); + Ok(None) + } } pub async fn store(&self, packed: OciPackedImage) -> Result { + let mut index = self.index.write().await; + let mut manifests = index.manifests().clone(); debug!("cache store digest={}", packed.digest); let mut fs_path = self.cache_dir.clone(); let mut manifest_path = self.cache_dir.clone(); @@ -74,12 +126,90 @@ impl OciPackerCache { fs::rename(&packed.path, &fs_path).await?; fs::write(&config_path, packed.config.raw()).await?; fs::write(&manifest_path, packed.manifest.raw()).await?; - Ok(OciPackedImage::new( + manifests.retain(|item| { + if item.digest() != &packed.digest { + return true; + } + + let Some(format) = item + .annotations() + .as_ref() + .and_then(|x| x.get(ANNOTATION_OCI_PACKER_FORMAT)) + .map(|x| x.as_str()) + else { + return true; + }; + + if format != packed.format.extension() { + return true; + } + + false + }); + + let mut descriptor = packed.descriptor.clone(); + let mut annotations = descriptor.annotations().clone().unwrap_or_default(); + annotations.insert( + ANNOTATION_OCI_PACKER_FORMAT.to_string(), + packed.format.extension().to_string(), + ); + let image_name = packed.name.to_string(); + annotations.insert(ANNOTATION_IMAGE_NAME.to_string(), image_name); + let image_ref = packed.name.reference.clone(); + annotations.insert(ANNOTATION_REF_NAME.to_string(), image_ref); + descriptor.set_annotations(Some(annotations)); + manifests.push(descriptor.clone()); + index.set_manifests(manifests); + self.save_index(&index).await?; + + let packed = OciPackedImage::new( + packed.name, packed.digest, fs_path.clone(), packed.format, + descriptor, packed.config, packed.manifest, - )) + ); + Ok(packed) + } + + async fn save_empty_index(&self) -> Result { + let index = ImageIndexBuilder::default() + .schema_version(2u32) + .media_type(MediaType::ImageIndex) + .manifests(Vec::new()) + .build()?; + self.save_index(&index).await?; + Ok(index) + } + + async fn load_index(&self) -> Result { + let mut index_path = self.cache_dir.clone(); + index_path.push("index.json"); + + if !index_path.exists() { + self.save_empty_index().await?; + } + + let content = fs::read_to_string(&index_path).await?; + let index = match serde_json::from_str::(&content) { + Ok(index) => index, + Err(error) => { + error!("image index was corrupted, creating a new one: {}", error); + self.save_empty_index().await? + } + }; + + Ok(index) + } + + async fn save_index(&self, index: &ImageIndex) -> Result<()> { + let mut encoded = serde_json::to_string_pretty(index)?; + encoded.push('\n'); + let mut index_path = self.cache_dir.clone(); + index_path.push("index.json"); + fs::write(&index_path, encoded).await?; + Ok(()) } } diff --git a/crates/oci/src/packer/mod.rs b/crates/oci/src/packer/mod.rs index dd510ab..61b9fca 100644 --- a/crates/oci/src/packer/mod.rs +++ b/crates/oci/src/packer/mod.rs @@ -1,9 +1,9 @@ use std::path::PathBuf; -use crate::schema::OciSchema; +use crate::{name::ImageName, schema::OciSchema}; use self::backend::OciPackerBackendType; -use oci_spec::image::{ImageConfiguration, ImageManifest}; +use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest}; pub mod backend; pub mod cache; @@ -37,25 +37,31 @@ impl OciPackedFormat { #[derive(Clone)] pub struct OciPackedImage { + pub name: ImageName, pub digest: String, pub path: PathBuf, pub format: OciPackedFormat, + pub descriptor: Descriptor, pub config: OciSchema, pub manifest: OciSchema, } impl OciPackedImage { pub fn new( + name: ImageName, digest: String, path: PathBuf, format: OciPackedFormat, + descriptor: Descriptor, config: OciSchema, manifest: OciSchema, ) -> OciPackedImage { OciPackedImage { + name, digest, path, format, + descriptor, config, manifest, } diff --git a/crates/oci/src/packer/service.rs b/crates/oci/src/packer/service.rs index 837feb0..9fff54b 100644 --- a/crates/oci/src/packer/service.rs +++ b/crates/oci/src/packer/service.rs @@ -6,6 +6,7 @@ use std::{ }; use anyhow::{anyhow, Result}; +use oci_spec::image::Descriptor; use tokio::{ sync::{watch, Mutex}, task::JoinHandle, @@ -38,38 +39,45 @@ pub struct OciPackerService { } impl OciPackerService { - pub fn new( + pub async fn new( seed: Option, cache_dir: &Path, platform: OciPlatform, ) -> Result { Ok(OciPackerService { seed, - cache: OciPackerCache::new(cache_dir)?, + cache: OciPackerCache::new(cache_dir).await?, platform, tasks: Arc::new(Mutex::new(HashMap::new())), }) } + pub async fn list(&self) -> Result> { + self.cache.list().await + } + pub async fn recall( &self, digest: &str, format: OciPackedFormat, ) -> Result> { - self.cache.recall(digest, format).await + self.cache + .recall(ImageName::parse("cached:latest")?, digest, format) + .await } pub async fn request( &self, name: ImageName, format: OciPackedFormat, + overwrite: bool, progress_context: OciProgressContext, ) -> Result { let progress = OciProgress::new(); let progress = OciBoundProgress::new(progress_context.clone(), progress); let fetcher = OciImageFetcher::new(self.seed.clone(), self.platform.clone(), progress.clone()); - let resolved = fetcher.resolve(name).await?; + let resolved = fetcher.resolve(name.clone()).await?; let key = OciPackerTaskKey { digest: resolved.digest.clone(), format, @@ -86,7 +94,15 @@ impl OciPackerService { Entry::Vacant(entry) => { let task = self .clone() - .launch(key.clone(), format, resolved, fetcher, progress.clone()) + .launch( + name, + key.clone(), + format, + overwrite, + resolved, + fetcher, + progress.clone(), + ) .await; let (watch, receiver) = watch::channel(None); @@ -122,22 +138,33 @@ impl OciPackerService { } } + #[allow(clippy::too_many_arguments)] async fn launch( self, + name: ImageName, key: OciPackerTaskKey, format: OciPackedFormat, + overwrite: bool, resolved: OciResolvedImage, fetcher: OciImageFetcher, progress: OciBoundProgress, ) -> JoinHandle<()> { - info!("packer task {} started", key); + info!("started packer task {}", key); tokio::task::spawn(async move { let _task_drop_guard = scopeguard::guard((key.clone(), self.clone()), |(key, service)| { service.ensure_task_gone(key); }); if let Err(error) = self - .task(key.clone(), format, resolved, fetcher, progress) + .task( + name, + key.clone(), + format, + overwrite, + resolved, + fetcher, + progress, + ) .await { self.finish(&key, Err(error)).await; @@ -145,17 +172,26 @@ impl OciPackerService { }) } + #[allow(clippy::too_many_arguments)] async fn task( &self, + name: ImageName, key: OciPackerTaskKey, format: OciPackedFormat, + overwrite: bool, resolved: OciResolvedImage, fetcher: OciImageFetcher, progress: OciBoundProgress, ) -> Result<()> { - if let Some(cached) = self.cache.recall(&resolved.digest, format).await? { - self.finish(&key, Ok(cached)).await; - return Ok(()); + if !overwrite { + if let Some(cached) = self + .cache + .recall(name.clone(), &resolved.digest, format) + .await? + { + self.finish(&key, Ok(cached)).await; + return Ok(()); + } } let assembler = OciImageAssembler::new(fetcher, resolved, progress.clone(), None, None).await?; @@ -171,9 +207,11 @@ impl OciPackerService { .pack(progress, assembled.vfs.clone(), &target) .await?; let packed = OciPackedImage::new( + name, assembled.digest.clone(), file, format, + assembled.descriptor.clone(), assembled.config.clone(), assembled.manifest.clone(), ); @@ -190,7 +228,7 @@ impl OciPackerService { match result.as_ref() { Ok(_) => { - info!("packer task {} completed", key); + info!("completed packer task {}", key); } Err(err) => { @@ -216,7 +254,7 @@ impl OciPackerService { tokio::task::spawn(async move { let mut tasks = self.tasks.lock().await; if let Some(task) = tasks.remove(&key) { - warn!("packer task {} aborted", key); + warn!("aborted packer task {}", key); task.watch.send_replace(Some(Err(anyhow!("task aborted")))); } }); diff --git a/crates/oci/src/progress.rs b/crates/oci/src/progress.rs index e5e22e3..68b675a 100644 --- a/crates/oci/src/progress.rs +++ b/crates/oci/src/progress.rs @@ -1,18 +1,16 @@ use indexmap::IndexMap; use std::sync::Arc; use tokio::{ - sync::{broadcast, Mutex}, + sync::{watch, Mutex}, task::JoinHandle, }; -const OCI_PROGRESS_QUEUE_LEN: usize = 100; - #[derive(Clone, Debug)] pub struct OciProgress { pub phase: OciProgressPhase, + pub digest: Option, pub layers: IndexMap, - pub value: u64, - pub total: u64, + pub indication: OciProgressIndication, } impl Default for OciProgress { @@ -24,72 +22,146 @@ impl Default for OciProgress { impl OciProgress { pub fn new() -> Self { OciProgress { - phase: OciProgressPhase::Resolving, + phase: OciProgressPhase::Started, + digest: None, layers: IndexMap::new(), - value: 0, - total: 1, + indication: OciProgressIndication::Hidden, } } - pub fn add_layer(&mut self, id: &str, size: usize) { + pub fn start_resolving(&mut self) { + self.phase = OciProgressPhase::Resolving; + self.indication = OciProgressIndication::Spinner { message: None }; + } + + pub fn resolved(&mut self, digest: &str) { + self.digest = Some(digest.to_string()); + self.indication = OciProgressIndication::Hidden; + } + + pub fn add_layer(&mut self, id: &str) { self.layers.insert( id.to_string(), OciProgressLayer { id: id.to_string(), phase: OciProgressLayerPhase::Waiting, - value: 0, - total: size as u64, + indication: OciProgressIndication::Spinner { message: None }, }, ); } - pub fn downloading_layer(&mut self, id: &str, downloaded: usize, total: usize) { + pub fn downloading_layer(&mut self, id: &str, downloaded: u64, total: u64) { if let Some(entry) = self.layers.get_mut(id) { entry.phase = OciProgressLayerPhase::Downloading; - entry.value = downloaded as u64; - entry.total = total as u64; + entry.indication = OciProgressIndication::ProgressBar { + message: None, + current: downloaded, + total, + bytes: true, + }; } } - pub fn downloaded_layer(&mut self, id: &str) { + pub fn downloaded_layer(&mut self, id: &str, total: u64) { if let Some(entry) = self.layers.get_mut(id) { entry.phase = OciProgressLayerPhase::Downloaded; - entry.value = entry.total; + entry.indication = OciProgressIndication::Completed { + message: None, + total: Some(total), + bytes: true, + }; } } - pub fn extracting_layer(&mut self, id: &str, extracted: usize, total: usize) { + pub fn start_assemble(&mut self) { + self.phase = OciProgressPhase::Assemble; + self.indication = OciProgressIndication::Hidden; + } + + pub fn start_extracting_layer(&mut self, id: &str) { if let Some(entry) = self.layers.get_mut(id) { entry.phase = OciProgressLayerPhase::Extracting; - entry.value = extracted as u64; - entry.total = total as u64; + entry.indication = OciProgressIndication::Spinner { message: None }; } } - pub fn extracted_layer(&mut self, id: &str) { + pub fn extracting_layer(&mut self, id: &str, file: &str) { + if let Some(entry) = self.layers.get_mut(id) { + entry.phase = OciProgressLayerPhase::Extracting; + entry.indication = OciProgressIndication::Spinner { + message: Some(file.to_string()), + }; + } + } + + pub fn extracted_layer(&mut self, id: &str, count: u64, total_size: u64) { if let Some(entry) = self.layers.get_mut(id) { entry.phase = OciProgressLayerPhase::Extracted; - entry.value = entry.total; + entry.indication = OciProgressIndication::Completed { + message: Some(format!("{} files", count)), + total: Some(total_size), + bytes: true, + }; + } + } + + pub fn start_packing(&mut self) { + self.phase = OciProgressPhase::Pack; + for layer in self.layers.values_mut() { + layer.indication = OciProgressIndication::Hidden; + } + self.indication = OciProgressIndication::Spinner { message: None }; + } + + pub fn complete(&mut self, size: u64) { + self.phase = OciProgressPhase::Complete; + self.indication = OciProgressIndication::Completed { + message: None, + total: Some(size), + bytes: true, } } } #[derive(Clone, Debug)] pub enum OciProgressPhase { + Started, Resolving, Resolved, - ConfigAcquire, - LayerAcquire, - Packing, + ConfigDownload, + LayerDownload, + Assemble, + Pack, Complete, } +#[derive(Clone, Debug)] +pub enum OciProgressIndication { + Hidden, + + ProgressBar { + message: Option, + current: u64, + total: u64, + bytes: bool, + }, + + Spinner { + message: Option, + }, + + Completed { + message: Option, + total: Option, + bytes: bool, + }, +} + #[derive(Clone, Debug)] pub struct OciProgressLayer { pub id: String, pub phase: OciProgressLayerPhase, - pub value: u64, - pub total: u64, + pub indication: OciProgressIndication, } #[derive(Clone, Debug)] @@ -103,16 +175,16 @@ pub enum OciProgressLayerPhase { #[derive(Clone)] pub struct OciProgressContext { - sender: broadcast::Sender, + sender: watch::Sender, } impl OciProgressContext { - pub fn create() -> (OciProgressContext, broadcast::Receiver) { - let (sender, receiver) = broadcast::channel(OCI_PROGRESS_QUEUE_LEN); + pub fn create() -> (OciProgressContext, watch::Receiver) { + let (sender, receiver) = watch::channel(OciProgress::new()); (OciProgressContext::new(sender), receiver) } - pub fn new(sender: broadcast::Sender) -> OciProgressContext { + pub fn new(sender: watch::Sender) -> OciProgressContext { OciProgressContext { sender } } @@ -120,7 +192,7 @@ impl OciProgressContext { let _ = self.sender.send(progress.clone()); } - pub fn subscribe(&self) -> broadcast::Receiver { + pub fn subscribe(&self) -> watch::Receiver { self.sender.subscribe() } } @@ -156,13 +228,10 @@ impl OciBoundProgress { context.update(&progress); let mut receiver = self.context.subscribe(); tokio::task::spawn(async move { - while let Ok(progress) = receiver.recv().await { - match context.sender.send(progress) { - Ok(_) => {} - Err(_) => { - break; - } - } + while (receiver.changed().await).is_ok() { + context + .sender + .send_replace(receiver.borrow_and_update().clone()); } }) } diff --git a/crates/oci/src/registry.rs b/crates/oci/src/registry.rs index 86597f6..1874ae8 100644 --- a/crates/oci/src/registry.rs +++ b/crates/oci/src/registry.rs @@ -149,24 +149,20 @@ impl OciRegistryClient { ))?; let mut response = self.call(self.agent.get(url.as_str())).await?; let mut size: u64 = 0; - let mut last_progress_size: u64 = 0; while let Some(chunk) = response.chunk().await? { dest.write_all(&chunk).await?; size += chunk.len() as u64; - if (size - last_progress_size) > (5 * 1024 * 1024) { - last_progress_size = size; - if let Some(ref progress) = progress { - progress - .update(|progress| { - progress.downloading_layer( - descriptor.digest(), - size as usize, - descriptor.size() as usize, - ); - }) - .await; - } + if let Some(ref progress) = progress { + progress + .update(|progress| { + progress.downloading_layer( + descriptor.digest(), + size, + descriptor.size() as u64, + ); + }) + .await; } } Ok(size) @@ -207,7 +203,7 @@ impl OciRegistryClient { &mut self, name: N, reference: R, - ) -> Result<(OciSchema, String)> { + ) -> Result<(OciSchema, Option, String)> { let url = self.url.join(&format!( "/v2/{}/manifests/{}", name.as_ref(), @@ -235,9 +231,10 @@ impl OciRegistryClient { let descriptor = self .pick_manifest(index) .ok_or_else(|| anyhow!("unable to pick manifest from index"))?; - return self + let (manifest, digest) = self .get_raw_manifest_with_digest(name, descriptor.digest()) - .await; + .await?; + return Ok((manifest, Some(descriptor), digest)); } let digest = response .headers() @@ -247,7 +244,7 @@ impl OciRegistryClient { .to_string(); let bytes = response.bytes().await?; let manifest = serde_json::from_slice(&bytes)?; - Ok((OciSchema::new(bytes.to_vec(), manifest), digest)) + Ok((OciSchema::new(bytes.to_vec(), manifest), None, digest)) } fn pick_manifest(&mut self, index: ImageIndex) -> Option { diff --git a/crates/oci/src/vfs.rs b/crates/oci/src/vfs.rs index 51c3e0c..e9964ef 100644 --- a/crates/oci/src/vfs.rs +++ b/crates/oci/src/vfs.rs @@ -194,7 +194,7 @@ impl VfsTree { } } - pub fn insert_tar_entry(&mut self, entry: &Entry) -> Result<()> { + pub fn insert_tar_entry(&mut self, entry: &Entry) -> Result<&VfsNode> { let mut meta = VfsNode::from(entry)?; let path = entry.path()?.to_path_buf(); let parent = if let Some(parent) = path.parent() { @@ -218,8 +218,11 @@ impl VfsTree { meta.children = old.children; } } - parent.children.push(meta); - Ok(()) + parent.children.push(meta.clone()); + let Some(reference) = parent.children.iter().find(|child| child.name == meta.name) else { + return Err(anyhow!("unable to find inserted child in vfs")); + }; + Ok(reference) } pub fn set_disk_path(&mut self, path: &Path, disk_path: &Path) -> Result<()> {