feat: implement improved and detailed oci progress indication

This commit is contained in:
Alex Zenla
2024-04-16 12:34:02 +00:00
parent e450ebd2a2
commit ee024580af
14 changed files with 595 additions and 290 deletions

View File

@ -30,8 +30,10 @@ pub enum LaunchImageFormat {
#[derive(Parser)] #[derive(Parser)]
#[command(about = "Launch a new guest")] #[command(about = "Launch a new guest")]
pub struct LauchCommand { pub struct LauchCommand {
#[arg(short = 'S', long, default_value = "squashfs", help = "Image format")] #[arg(long, default_value = "squashfs", help = "Image format")]
image_format: LaunchImageFormat, image_format: LaunchImageFormat,
#[arg(long, help = "Overwrite image cache on pull")]
pull_overwrite_cache: bool,
#[arg(short, long, help = "Name of the guest")] #[arg(short, long, help = "Name of the guest")]
name: Option<String>, name: Option<String>,
#[arg( #[arg(
@ -85,6 +87,7 @@ impl LauchCommand {
LaunchImageFormat::Squashfs => OciImageFormat::Squashfs.into(), LaunchImageFormat::Squashfs => OciImageFormat::Squashfs.into(),
LaunchImageFormat::Erofs => OciImageFormat::Erofs.into(), LaunchImageFormat::Erofs => OciImageFormat::Erofs.into(),
}, },
overwrite_cache: self.pull_overwrite_cache,
}) })
.await?; .await?;
let reply = pull_interactive_progress(response.into_inner()).await?; let reply = pull_interactive_progress(response.into_inner()).await?;

View File

@ -23,6 +23,8 @@ pub struct PullCommand {
image: String, image: String,
#[arg(short = 's', long, default_value = "squashfs", help = "Image format")] #[arg(short = 's', long, default_value = "squashfs", help = "Image format")]
image_format: PullImageFormat, image_format: PullImageFormat,
#[arg(short = 'o', long, help = "Overwrite image cache")]
overwrite_cache: bool,
} }
impl PullCommand { impl PullCommand {
@ -35,6 +37,7 @@ impl PullCommand {
PullImageFormat::Erofs => OciImageFormat::Erofs.into(), PullImageFormat::Erofs => OciImageFormat::Erofs.into(),
PullImageFormat::Tar => OciImageFormat::Tar.into(), PullImageFormat::Tar => OciImageFormat::Tar.into(),
}, },
overwrite_cache: self.overwrite_cache,
}) })
.await?; .await?;
let reply = pull_interactive_progress(response.into_inner()).await?; let reply = pull_interactive_progress(response.into_inner()).await?;

View File

@ -1,20 +1,205 @@
use std::collections::HashMap; use std::{
collections::{hash_map::Entry, HashMap},
time::Duration,
};
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use krata::v1::control::{PullImageProgressLayerPhase, PullImageProgressPhase, PullImageReply}; use krata::v1::control::{
image_progress_indication::Indication, ImageProgressIndication, ImageProgressLayerPhase,
ImageProgressPhase, PullImageReply,
};
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use tonic::Streaming; use tonic::Streaming;
const SPINNER_STRINGS: &[&str] = &[
"[= ]",
"[ = ]",
"[ = ]",
"[ = ]",
"[ = ]",
"[ = ]",
"[ = ]",
"[ = ]",
"[ = ]",
"[ = ]",
"[ = ]",
"[ = ]",
"[ = ]",
"[ = ]",
"[ = ]",
"[ = ]",
"[ = ]",
"[ = ]",
"[ = ]",
"[ =]",
"[====================]",
];
fn progress_bar_for_indication(indication: &ImageProgressIndication) -> Option<ProgressBar> {
match indication.indication.as_ref() {
Some(Indication::Hidden(_)) | None => None,
Some(Indication::Bar(indic)) => {
let bar = ProgressBar::new(indic.total);
bar.enable_steady_tick(Duration::from_millis(100));
Some(bar)
}
Some(Indication::Spinner(_)) => {
let bar = ProgressBar::new_spinner();
bar.enable_steady_tick(Duration::from_millis(100));
Some(bar)
}
Some(Indication::Completed(indic)) => {
let bar = ProgressBar::new_spinner();
bar.enable_steady_tick(Duration::from_millis(100));
if !indic.message.is_empty() {
bar.finish_with_message(indic.message.clone());
} else {
bar.finish()
}
Some(bar)
}
}
}
fn configure_for_indication(
bar: &mut ProgressBar,
multi_progress: &mut MultiProgress,
indication: &ImageProgressIndication,
top_phase: Option<ImageProgressPhase>,
layer_phase: Option<ImageProgressLayerPhase>,
layer_id: Option<&str>,
) {
let prefix = if let Some(phase) = top_phase {
match phase {
ImageProgressPhase::Unknown => "unknown",
ImageProgressPhase::Started => "started",
ImageProgressPhase::Resolving => "resolving",
ImageProgressPhase::Resolved => "resolved",
ImageProgressPhase::ConfigDownload => "downloading",
ImageProgressPhase::LayerDownload => "downloading",
ImageProgressPhase::Assemble => "assembling",
ImageProgressPhase::Pack => "packing",
ImageProgressPhase::Complete => "complete",
}
} else if let Some(phase) = layer_phase {
match phase {
ImageProgressLayerPhase::Unknown => "unknown",
ImageProgressLayerPhase::Waiting => "waiting",
ImageProgressLayerPhase::Downloading => "downloading",
ImageProgressLayerPhase::Downloaded => "downloaded",
ImageProgressLayerPhase::Extracting => "extracting",
ImageProgressLayerPhase::Extracted => "extracted",
}
} else {
""
};
let prefix = prefix.to_string();
let id = if let Some(layer_id) = layer_id {
let hash = if let Some((_, hash)) = layer_id.split_once(':') {
hash
} else {
"unknown"
};
let small_hash = if hash.len() > 10 { &hash[0..10] } else { hash };
Some(format!("{:width$}", small_hash, width = 10))
} else {
None
};
let prefix = if let Some(id) = id {
format!("{} {:width$}", id, prefix, width = 11)
} else {
format!(" {:width$}", prefix, width = 11)
};
match indication.indication.as_ref() {
Some(Indication::Hidden(_)) | None => {
multi_progress.remove(bar);
return;
}
Some(Indication::Bar(indic)) => {
if indic.is_bytes {
bar.set_style(ProgressStyle::with_template("{prefix} [{bar:20}] {msg} {binary_bytes}/{binary_total_bytes} ({binary_bytes_per_sec}) eta: {eta}").unwrap().progress_chars("=>-"));
} else {
bar.set_style(
ProgressStyle::with_template(
"{prefix} [{bar:20} {msg} {human_pos}/{human_len} ({per_sec}/sec)",
)
.unwrap()
.progress_chars("=>-"),
);
}
bar.set_message(indic.message.clone());
bar.set_position(indic.current);
bar.set_length(indic.total);
}
Some(Indication::Spinner(indic)) => {
bar.set_style(
ProgressStyle::with_template("{prefix} {spinner} {msg}")
.unwrap()
.tick_strings(SPINNER_STRINGS),
);
bar.set_message(indic.message.clone());
}
Some(Indication::Completed(indic)) => {
if bar.is_finished() {
return;
}
bar.disable_steady_tick();
bar.set_message(indic.message.clone());
if indic.total != 0 {
bar.set_position(indic.total);
bar.set_length(indic.total);
}
if bar.style().get_tick_str(0).contains('=') {
bar.set_style(
ProgressStyle::with_template("{prefix} {spinner} {msg}")
.unwrap()
.tick_strings(SPINNER_STRINGS),
);
bar.finish_with_message(indic.message.clone());
} else if indic.is_bytes {
bar.set_style(
ProgressStyle::with_template("{prefix} [{bar:20}] {msg} {binary_total_bytes}")
.unwrap()
.progress_chars("=>-"),
);
} else {
bar.set_style(
ProgressStyle::with_template("{prefix} [{bar:20}] {msg}")
.unwrap()
.progress_chars("=>-"),
);
}
bar.tick();
bar.enable_steady_tick(Duration::from_millis(100));
}
};
bar.set_prefix(prefix);
bar.tick();
}
pub async fn pull_interactive_progress( pub async fn pull_interactive_progress(
mut stream: Streaming<PullImageReply>, mut stream: Streaming<PullImageReply>,
) -> Result<PullImageReply> { ) -> Result<PullImageReply> {
let mut multi_progress: Option<(MultiProgress, HashMap<String, ProgressBar>)> = None; let mut multi_progress = MultiProgress::new();
multi_progress.set_move_cursor(false);
let mut progresses = HashMap::new();
while let Some(reply) = stream.next().await { while let Some(reply) = stream.next().await {
let reply = reply?; let reply = match reply {
Ok(reply) => reply,
Err(error) => {
multi_progress.clear()?;
return Err(error.into());
}
};
if reply.progress.is_none() && !reply.digest.is_empty() { if reply.progress.is_none() && !reply.digest.is_empty() {
multi_progress.clear()?;
return Ok(reply); return Ok(reply);
} }
@ -22,97 +207,62 @@ pub async fn pull_interactive_progress(
continue; continue;
}; };
if multi_progress.is_none() { for layer in &oci.layers {
multi_progress = Some((MultiProgress::new(), HashMap::new())); let Some(ref indication) = layer.indication else {
continue;
};
let bar = match progresses.entry(layer.id.clone()) {
Entry::Occupied(entry) => Some(entry.into_mut()),
Entry::Vacant(entry) => {
if let Some(bar) = progress_bar_for_indication(indication) {
multi_progress.add(bar.clone());
Some(entry.insert(bar))
} else {
None
}
}
};
if let Some(bar) = bar {
configure_for_indication(
bar,
&mut multi_progress,
indication,
None,
Some(layer.phase()),
Some(&layer.id),
);
}
} }
let Some((multi_progress, progresses)) = multi_progress.as_mut() else { if let Some(ref indication) = oci.indication {
continue; let bar = match progresses.entry("root".to_string()) {
}; Entry::Occupied(entry) => Some(entry.into_mut()),
match oci.phase() { Entry::Vacant(entry) => {
PullImageProgressPhase::Resolved if let Some(bar) = progress_bar_for_indication(indication) {
| PullImageProgressPhase::ConfigAcquire multi_progress.add(bar.clone());
| PullImageProgressPhase::LayerAcquire => { Some(entry.insert(bar))
if progresses.is_empty() && !oci.layers.is_empty() {
for layer in &oci.layers {
let bar = ProgressBar::new(layer.total);
bar.set_style(ProgressStyle::with_template("{msg} {bar}").unwrap());
progresses.insert(layer.id.clone(), bar.clone());
multi_progress.add(bar);
}
}
for layer in oci.layers {
let Some(progress) = progresses.get_mut(&layer.id) else {
continue;
};
let phase = match layer.phase() {
PullImageProgressLayerPhase::Waiting => "waiting",
PullImageProgressLayerPhase::Downloading => "downloading",
PullImageProgressLayerPhase::Downloaded => "downloaded",
PullImageProgressLayerPhase::Extracting => "extracting",
PullImageProgressLayerPhase::Extracted => "extracted",
_ => "unknown",
};
let simple = if let Some((_, hash)) = layer.id.split_once(':') {
hash
} else { } else {
"unknown" None
};
let simple = if simple.len() > 10 {
&simple[0..10]
} else {
simple
};
let message = format!(
"{:width$} {:phwidth$}",
simple,
phase,
width = 10,
phwidth = 11
);
if message != progress.message() {
progress.set_message(message);
} }
progress.update(|state| {
state.set_len(layer.total);
state.set_pos(layer.value);
});
} }
};
if let Some(bar) = bar {
configure_for_indication(
bar,
&mut multi_progress,
indication,
Some(oci.phase()),
None,
None,
);
} }
PullImageProgressPhase::Packing => {
for (key, bar) in &mut *progresses {
if key == "packing" {
continue;
}
bar.finish_and_clear();
multi_progress.remove(bar);
}
progresses.retain(|k, _| k == "packing");
if progresses.is_empty() {
let progress = ProgressBar::new(100);
progress.set_message("packing ");
progress.set_style(ProgressStyle::with_template("{msg} {bar}").unwrap());
progresses.insert("packing".to_string(), progress);
}
let Some(progress) = progresses.get("packing") else {
continue;
};
progress.update(|state| {
state.set_len(oci.total);
state.set_pos(oci.value);
});
}
_ => {}
} }
} }
multi_progress.clear()?;
Err(anyhow!("never received final reply for image pull")) Err(anyhow!("never received final reply for image pull"))
} }

View File

@ -372,7 +372,7 @@ impl ControlService for DaemonControlService {
let output = try_stream! { let output = try_stream! {
let mut task = tokio::task::spawn(async move { let mut task = tokio::task::spawn(async move {
our_packer.request(name, format, context).await our_packer.request(name, format, request.overwrite_cache, context).await
}); });
let abort_handle = task.abort_handle(); let abort_handle = task.abort_handle();
let _task_cancel_guard = scopeguard::guard(abort_handle, |handle| { let _task_cancel_guard = scopeguard::guard(abort_handle, |handle| {
@ -381,26 +381,14 @@ impl ControlService for DaemonControlService {
loop { loop {
let what = select! { let what = select! {
x = receiver.recv() => PullImageSelect::Progress(x.ok()), x = receiver.changed() => match x {
Ok(_) => PullImageSelect::Progress(Some(receiver.borrow_and_update().clone())),
Err(_) => PullImageSelect::Progress(None),
},
x = &mut task => PullImageSelect::Completed(x), x = &mut task => PullImageSelect::Completed(x),
}; };
match what { match what {
PullImageSelect::Progress(Some(mut progress)) => { PullImageSelect::Progress(Some(progress)) => {
let mut drain = 0;
loop {
if drain >= 10 {
break;
}
if let Ok(latest) = receiver.try_recv() {
progress = latest;
} else {
break;
}
drain += 1;
}
let reply = PullImageReply { let reply = PullImageReply {
progress: Some(convert_oci_progress(progress)), progress: Some(convert_oci_progress(progress)),
digest: String::new(), digest: String::new(),

View File

@ -1,33 +1,72 @@
use krata::v1::control::{ use krata::v1::control::{
PullImageProgress, PullImageProgressLayer, PullImageProgressLayerPhase, PullImageProgressPhase, image_progress_indication::Indication, ImageProgress, ImageProgressIndication,
ImageProgressIndicationBar, ImageProgressIndicationCompleted, ImageProgressIndicationHidden,
ImageProgressIndicationSpinner, ImageProgressLayer, ImageProgressLayerPhase,
ImageProgressPhase,
};
use krataoci::progress::{
OciProgress, OciProgressIndication, OciProgressLayer, OciProgressLayerPhase, OciProgressPhase,
}; };
use krataoci::progress::{OciProgress, OciProgressLayer, OciProgressLayerPhase, OciProgressPhase};
fn convert_oci_layer_progress(layer: OciProgressLayer) -> PullImageProgressLayer { fn convert_oci_progress_indication(indication: OciProgressIndication) -> ImageProgressIndication {
PullImageProgressLayer { ImageProgressIndication {
id: layer.id, indication: Some(match indication {
phase: match layer.phase { OciProgressIndication::Hidden => Indication::Hidden(ImageProgressIndicationHidden {}),
OciProgressLayerPhase::Waiting => PullImageProgressLayerPhase::Waiting, OciProgressIndication::ProgressBar {
OciProgressLayerPhase::Downloading => PullImageProgressLayerPhase::Downloading, message,
OciProgressLayerPhase::Downloaded => PullImageProgressLayerPhase::Downloaded, current,
OciProgressLayerPhase::Extracting => PullImageProgressLayerPhase::Extracting, total,
OciProgressLayerPhase::Extracted => PullImageProgressLayerPhase::Extracted, bytes,
} } => Indication::Bar(ImageProgressIndicationBar {
.into(), message: message.unwrap_or_default(),
value: layer.value, current,
total: layer.total, total,
is_bytes: bytes,
}),
OciProgressIndication::Spinner { message } => {
Indication::Spinner(ImageProgressIndicationSpinner {
message: message.unwrap_or_default(),
})
}
OciProgressIndication::Completed {
message,
total,
bytes,
} => Indication::Completed(ImageProgressIndicationCompleted {
message: message.unwrap_or_default(),
total: total.unwrap_or(0),
is_bytes: bytes,
}),
}),
} }
} }
pub fn convert_oci_progress(oci: OciProgress) -> PullImageProgress { fn convert_oci_layer_progress(layer: OciProgressLayer) -> ImageProgressLayer {
PullImageProgress { ImageProgressLayer {
id: layer.id,
phase: match layer.phase {
OciProgressLayerPhase::Waiting => ImageProgressLayerPhase::Waiting,
OciProgressLayerPhase::Downloading => ImageProgressLayerPhase::Downloading,
OciProgressLayerPhase::Downloaded => ImageProgressLayerPhase::Downloaded,
OciProgressLayerPhase::Extracting => ImageProgressLayerPhase::Extracting,
OciProgressLayerPhase::Extracted => ImageProgressLayerPhase::Extracted,
}
.into(),
indication: Some(convert_oci_progress_indication(layer.indication)),
}
}
pub fn convert_oci_progress(oci: OciProgress) -> ImageProgress {
ImageProgress {
phase: match oci.phase { phase: match oci.phase {
OciProgressPhase::Resolving => PullImageProgressPhase::Resolving, OciProgressPhase::Started => ImageProgressPhase::Started,
OciProgressPhase::Resolved => PullImageProgressPhase::Resolved, OciProgressPhase::Resolving => ImageProgressPhase::Resolving,
OciProgressPhase::ConfigAcquire => PullImageProgressPhase::ConfigAcquire, OciProgressPhase::Resolved => ImageProgressPhase::Resolved,
OciProgressPhase::LayerAcquire => PullImageProgressPhase::LayerAcquire, OciProgressPhase::ConfigDownload => ImageProgressPhase::ConfigDownload,
OciProgressPhase::Packing => PullImageProgressPhase::Packing, OciProgressPhase::LayerDownload => ImageProgressPhase::LayerDownload,
OciProgressPhase::Complete => PullImageProgressPhase::Complete, OciProgressPhase::Assemble => ImageProgressPhase::Assemble,
OciProgressPhase::Pack => ImageProgressPhase::Pack,
OciProgressPhase::Complete => ImageProgressPhase::Complete,
} }
.into(), .into(),
layers: oci layers: oci
@ -35,7 +74,6 @@ pub fn convert_oci_progress(oci: OciProgress) -> PullImageProgress {
.into_values() .into_values()
.map(convert_oci_layer_progress) .map(convert_oci_layer_progress)
.collect::<Vec<_>>(), .collect::<Vec<_>>(),
value: oci.value, indication: Some(convert_oci_progress_indication(oci.indication)),
total: oci.total,
} }
} }

View File

@ -89,46 +89,75 @@ message SnoopIdmReply {
krata.bus.idm.IdmPacket packet = 3; krata.bus.idm.IdmPacket packet = 3;
} }
enum PullImageProgressLayerPhase { message ImageProgress {
PULL_IMAGE_PROGRESS_LAYER_PHASE_UNKNOWN = 0; ImageProgressPhase phase = 1;
PULL_IMAGE_PROGRESS_LAYER_PHASE_WAITING = 1; repeated ImageProgressLayer layers = 2;
PULL_IMAGE_PROGRESS_LAYER_PHASE_DOWNLOADING = 2; ImageProgressIndication indication = 3;
PULL_IMAGE_PROGRESS_LAYER_PHASE_DOWNLOADED = 3;
PULL_IMAGE_PROGRESS_LAYER_PHASE_EXTRACTING = 4;
PULL_IMAGE_PROGRESS_LAYER_PHASE_EXTRACTED = 5;
} }
message PullImageProgressLayer { enum ImageProgressPhase {
IMAGE_PROGRESS_PHASE_UNKNOWN = 0;
IMAGE_PROGRESS_PHASE_STARTED = 1;
IMAGE_PROGRESS_PHASE_RESOLVING = 2;
IMAGE_PROGRESS_PHASE_RESOLVED = 3;
IMAGE_PROGRESS_PHASE_CONFIG_DOWNLOAD = 4;
IMAGE_PROGRESS_PHASE_LAYER_DOWNLOAD = 5;
IMAGE_PROGRESS_PHASE_ASSEMBLE = 6;
IMAGE_PROGRESS_PHASE_PACK = 7;
IMAGE_PROGRESS_PHASE_COMPLETE = 8;
}
message ImageProgressLayer {
string id = 1; string id = 1;
PullImageProgressLayerPhase phase = 2; ImageProgressLayerPhase phase = 2;
uint64 value = 3; ImageProgressIndication indication = 3;
uint64 total = 4;
} }
enum PullImageProgressPhase { enum ImageProgressLayerPhase {
PULL_IMAGE_PROGRESS_PHASE_UNKNOWN = 0; IMAGE_PROGRESS_LAYER_PHASE_UNKNOWN = 0;
PULL_IMAGE_PROGRESS_PHASE_RESOLVING = 1; IMAGE_PROGRESS_LAYER_PHASE_WAITING = 1;
PULL_IMAGE_PROGRESS_PHASE_RESOLVED = 2; IMAGE_PROGRESS_LAYER_PHASE_DOWNLOADING = 2;
PULL_IMAGE_PROGRESS_PHASE_CONFIG_ACQUIRE = 3; IMAGE_PROGRESS_LAYER_PHASE_DOWNLOADED = 3;
PULL_IMAGE_PROGRESS_PHASE_LAYER_ACQUIRE = 4; IMAGE_PROGRESS_LAYER_PHASE_EXTRACTING = 4;
PULL_IMAGE_PROGRESS_PHASE_PACKING = 5; IMAGE_PROGRESS_LAYER_PHASE_EXTRACTED = 5;
PULL_IMAGE_PROGRESS_PHASE_COMPLETE = 6;
} }
message PullImageProgress { message ImageProgressIndication {
PullImageProgressPhase phase = 1; oneof indication {
repeated PullImageProgressLayer layers = 2; ImageProgressIndicationBar bar = 1;
uint64 value = 3; ImageProgressIndicationSpinner spinner = 2;
uint64 total = 4; ImageProgressIndicationHidden hidden = 3;
ImageProgressIndicationCompleted completed = 4;
}
}
message ImageProgressIndicationBar {
string message = 1;
uint64 current = 2;
uint64 total = 3;
bool is_bytes = 4;
}
message ImageProgressIndicationSpinner {
string message = 1;
}
message ImageProgressIndicationHidden {}
message ImageProgressIndicationCompleted {
string message = 1;
uint64 total = 2;
bool is_bytes = 3;
} }
message PullImageRequest { message PullImageRequest {
string image = 1; string image = 1;
krata.v1.common.OciImageFormat format = 2; krata.v1.common.OciImageFormat format = 2;
bool overwrite_cache = 3;
} }
message PullImageReply { message PullImageReply {
PullImageProgress progress = 1; ImageProgress progress = 1;
string digest = 2; string digest = 2;
krata.v1.common.OciImageFormat format = 3; krata.v1.common.OciImageFormat format = 3;
} }

View File

@ -25,37 +25,19 @@ async fn main() -> Result<()> {
let (context, mut receiver) = OciProgressContext::create(); let (context, mut receiver) = OciProgressContext::create();
tokio::task::spawn(async move { tokio::task::spawn(async move {
loop { loop {
let Ok(mut progress) = receiver.recv().await else { if (receiver.changed().await).is_err() {
return; break;
};
let mut drain = 0;
loop {
if drain >= 10 {
break;
}
if let Ok(latest) = receiver.try_recv() {
progress = latest;
} else {
break;
}
drain += 1;
} }
let progress = receiver.borrow_and_update();
println!("phase {:?}", progress.phase); println!("phase {:?}", progress.phase);
for (id, layer) in &progress.layers { for (id, layer) in &progress.layers {
println!( println!("{} {:?} {:?}", id, layer.phase, layer.indication,)
"{} {:?} {} of {}",
id, layer.phase, layer.value, layer.total
)
} }
} }
}); });
let service = OciPackerService::new(seed, &cache_dir, OciPlatform::current())?; let service = OciPackerService::new(seed, &cache_dir, OciPlatform::current())?;
let packed = service let packed = service
.request(image.clone(), OciPackedFormat::Squashfs, context) .request(image.clone(), OciPackedFormat::Squashfs, false, context)
.await?; .await?;
println!( println!(
"generated squashfs of {} to {}", "generated squashfs of {} to {}",

View File

@ -1,4 +1,4 @@
use crate::fetch::{OciImageFetcher, OciImageLayer, OciResolvedImage}; use crate::fetch::{OciImageFetcher, OciImageLayer, OciImageLayerReader, OciResolvedImage};
use crate::progress::OciBoundProgress; use crate::progress::OciBoundProgress;
use crate::schema::OciSchema; use crate::schema::OciSchema;
use crate::vfs::{VfsNode, VfsTree}; use crate::vfs::{VfsNode, VfsTree};
@ -11,7 +11,6 @@ use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use tokio::fs; use tokio::fs;
use tokio::io::AsyncRead;
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use tokio_tar::{Archive, Entry}; use tokio_tar::{Archive, Entry};
use uuid::Uuid; use uuid::Uuid;
@ -115,12 +114,14 @@ impl OciImageAssembler {
); );
self.progress self.progress
.update(|progress| { .update(|progress| {
progress.extracting_layer(&layer.digest, 0, 1); progress.start_extracting_layer(&layer.digest);
}) })
.await; .await;
debug!("process layer digest={}", &layer.digest,); debug!("process layer digest={}", &layer.digest,);
let mut archive = layer.archive().await?; let mut archive = layer.archive().await?;
let mut entries = archive.entries()?; let mut entries = archive.entries()?;
let mut count = 0u64;
let mut size = 0u64;
while let Some(entry) = entries.next().await { while let Some(entry) = entries.next().await {
let mut entry = entry?; let mut entry = entry?;
let path = entry.path()?; let path = entry.path()?;
@ -134,14 +135,21 @@ impl OciImageAssembler {
self.process_whiteout_entry(&mut vfs, &entry, name, layer) self.process_whiteout_entry(&mut vfs, &entry, name, layer)
.await?; .await?;
} else { } else {
vfs.insert_tar_entry(&entry)?; let reference = vfs.insert_tar_entry(&entry)?;
self.process_write_entry(&mut vfs, &mut entry, layer) self.progress
.update(|progress| {
progress.extracting_layer(&layer.digest, &reference.name);
})
.await;
size += self
.process_write_entry(&mut vfs, &mut entry, layer)
.await?; .await?;
count += 1;
} }
} }
self.progress self.progress
.update(|progress| { .update(|progress| {
progress.extracted_layer(&layer.digest); progress.extracted_layer(&layer.digest, count, size);
}) })
.await; .await;
} }
@ -169,7 +177,7 @@ impl OciImageAssembler {
async fn process_whiteout_entry( async fn process_whiteout_entry(
&self, &self,
vfs: &mut VfsTree, vfs: &mut VfsTree,
entry: &Entry<Archive<Pin<Box<dyn AsyncRead + Send>>>>, entry: &Entry<Archive<Pin<Box<dyn OciImageLayerReader + Send>>>>,
name: &str, name: &str,
layer: &OciImageLayer, layer: &OciImageLayer,
) -> Result<()> { ) -> Result<()> {
@ -210,11 +218,11 @@ impl OciImageAssembler {
async fn process_write_entry( async fn process_write_entry(
&self, &self,
vfs: &mut VfsTree, vfs: &mut VfsTree,
entry: &mut Entry<Archive<Pin<Box<dyn AsyncRead + Send>>>>, entry: &mut Entry<Archive<Pin<Box<dyn OciImageLayerReader + Send>>>>,
layer: &OciImageLayer, layer: &OciImageLayer,
) -> Result<()> { ) -> Result<u64> {
if !entry.header().entry_type().is_file() { if !entry.header().entry_type().is_file() {
return Ok(()); return Ok(0);
} }
trace!( trace!(
"unpack entry layer={} path={:?} type={:?}", "unpack entry layer={} path={:?} type={:?}",
@ -230,7 +238,7 @@ impl OciImageAssembler {
.await? .await?
.ok_or(anyhow!("unpack did not return a path"))?; .ok_or(anyhow!("unpack did not return a path"))?;
vfs.set_disk_path(&entry.path()?, &path)?; vfs.set_disk_path(&entry.path()?, &path)?;
Ok(()) Ok(entry.header().size()?)
} }
} }

View File

@ -10,6 +10,8 @@ use super::{
use std::{ use std::{
fmt::Debug, fmt::Debug,
io::SeekFrom,
os::unix::fs::MetadataExt,
path::{Path, PathBuf}, path::{Path, PathBuf},
pin::Pin, pin::Pin,
}; };
@ -22,8 +24,8 @@ use oci_spec::image::{
}; };
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use tokio::{ use tokio::{
fs::File, fs::{self, File},
io::{AsyncRead, AsyncReadExt, BufReader, BufWriter}, io::{AsyncRead, AsyncReadExt, AsyncSeekExt, BufReader, BufWriter},
}; };
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use tokio_tar::Archive; use tokio_tar::Archive;
@ -43,16 +45,43 @@ pub enum OciImageLayerCompression {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct OciImageLayer { pub struct OciImageLayer {
pub metadata: Descriptor,
pub path: PathBuf, pub path: PathBuf,
pub digest: String, pub digest: String,
pub compression: OciImageLayerCompression, pub compression: OciImageLayerCompression,
} }
#[async_trait::async_trait]
pub trait OciImageLayerReader: AsyncRead + Sync {
async fn position(&mut self) -> Result<u64>;
}
#[async_trait::async_trait]
impl OciImageLayerReader for BufReader<File> {
async fn position(&mut self) -> Result<u64> {
Ok(self.seek(SeekFrom::Current(0)).await?)
}
}
#[async_trait::async_trait]
impl OciImageLayerReader for GzipDecoder<BufReader<File>> {
async fn position(&mut self) -> Result<u64> {
self.get_mut().position().await
}
}
#[async_trait::async_trait]
impl OciImageLayerReader for ZstdDecoder<BufReader<File>> {
async fn position(&mut self) -> Result<u64> {
self.get_mut().position().await
}
}
impl OciImageLayer { impl OciImageLayer {
pub async fn decompress(&self) -> Result<Pin<Box<dyn AsyncRead + Send>>> { pub async fn decompress(&self) -> Result<Pin<Box<dyn OciImageLayerReader + Send>>> {
let file = File::open(&self.path).await?; let file = File::open(&self.path).await?;
let reader = BufReader::new(file); let reader = BufReader::new(file);
let reader: Pin<Box<dyn AsyncRead + Send>> = match self.compression { let reader: Pin<Box<dyn OciImageLayerReader + Send>> = match self.compression {
OciImageLayerCompression::None => Box::pin(reader), OciImageLayerCompression::None => Box::pin(reader),
OciImageLayerCompression::Gzip => Box::pin(GzipDecoder::new(reader)), OciImageLayerCompression::Gzip => Box::pin(GzipDecoder::new(reader)),
OciImageLayerCompression::Zstd => Box::pin(ZstdDecoder::new(reader)), OciImageLayerCompression::Zstd => Box::pin(ZstdDecoder::new(reader)),
@ -60,7 +89,7 @@ impl OciImageLayer {
Ok(reader) Ok(reader)
} }
pub async fn archive(&self) -> Result<Archive<Pin<Box<dyn AsyncRead + Send>>>> { pub async fn archive(&self) -> Result<Archive<Pin<Box<dyn OciImageLayerReader + Send>>>> {
let decompress = self.decompress().await?; let decompress = self.decompress().await?;
Ok(Archive::new(decompress)) Ok(Archive::new(decompress))
} }
@ -225,7 +254,7 @@ impl OciImageFetcher {
let config: OciSchema<ImageConfiguration>; let config: OciSchema<ImageConfiguration>;
self.progress self.progress
.update(|progress| { .update(|progress| {
progress.phase = OciProgressPhase::ConfigAcquire; progress.phase = OciProgressPhase::ConfigDownload;
}) })
.await; .await;
let mut client = OciRegistryClient::new(image.name.registry_url()?, self.platform.clone())?; let mut client = OciRegistryClient::new(image.name.registry_url()?, self.platform.clone())?;
@ -245,10 +274,10 @@ impl OciImageFetcher {
} }
self.progress self.progress
.update(|progress| { .update(|progress| {
progress.phase = OciProgressPhase::LayerAcquire; progress.phase = OciProgressPhase::LayerDownload;
for layer in image.manifest.item().layers() { for layer in image.manifest.item().layers() {
progress.add_layer(layer.digest(), layer.size() as usize); progress.add_layer(layer.digest());
} }
}) })
.await; .await;
@ -256,7 +285,7 @@ impl OciImageFetcher {
for layer in image.manifest.item().layers() { for layer in image.manifest.item().layers() {
self.progress self.progress
.update(|progress| { .update(|progress| {
progress.downloading_layer(layer.digest(), 0, layer.size() as usize); progress.downloading_layer(layer.digest(), 0, layer.size() as u64);
}) })
.await; .await;
layers.push( layers.push(
@ -265,7 +294,7 @@ impl OciImageFetcher {
); );
self.progress self.progress
.update(|progress| { .update(|progress| {
progress.downloaded_layer(layer.digest()); progress.downloaded_layer(layer.digest(), layer.size() as u64);
}) })
.await; .await;
} }
@ -304,6 +333,12 @@ impl OciImageFetcher {
} }
} }
let metadata = fs::metadata(&layer_path).await?;
if layer.size() as u64 != metadata.size() {
return Err(anyhow!("layer size differs from size in manifest",));
}
let mut media_type = layer.media_type().clone(); let mut media_type = layer.media_type().clone();
// docker layer compatibility // docker layer compatibility
@ -318,6 +353,7 @@ impl OciImageFetcher {
other => return Err(anyhow!("found layer with unknown media type: {}", other)), other => return Err(anyhow!("found layer with unknown media type: {}", other)),
}; };
Ok(OciImageLayer { Ok(OciImageLayer {
metadata: layer.clone(),
path: layer_path, path: layer_path,
digest: layer.digest().clone(), digest: layer.digest().clone(),
compression, compression,

View File

@ -1,14 +1,12 @@
use std::{path::Path, process::Stdio, sync::Arc}; use std::{os::unix::fs::MetadataExt, path::Path, process::Stdio, sync::Arc};
use super::OciPackedFormat; use super::OciPackedFormat;
use crate::{ use crate::{progress::OciBoundProgress, vfs::VfsTree};
progress::{OciBoundProgress, OciProgressPhase},
vfs::VfsTree,
};
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use log::warn; use log::warn;
use tokio::{ use tokio::{
fs::File, fs::{self, File},
io::BufWriter,
pin, pin,
process::{Child, Command}, process::{Child, Command},
select, select,
@ -55,9 +53,7 @@ impl OciPackerBackend for OciPackerMkSquashfs {
async fn pack(&self, progress: OciBoundProgress, vfs: Arc<VfsTree>, file: &Path) -> Result<()> { async fn pack(&self, progress: OciBoundProgress, vfs: Arc<VfsTree>, file: &Path) -> Result<()> {
progress progress
.update(|progress| { .update(|progress| {
progress.phase = OciProgressPhase::Packing; progress.start_packing();
progress.total = 1;
progress.value = 0;
}) })
.await; .await;
@ -120,12 +116,9 @@ impl OciPackerBackend for OciPackerMkSquashfs {
status.code().unwrap() status.code().unwrap()
)) ))
} else { } else {
let metadata = fs::metadata(&file).await?;
progress progress
.update(|progress| { .update(|progress| progress.complete(metadata.size()))
progress.phase = OciProgressPhase::Packing;
progress.total = 1;
progress.value = 1;
})
.await; .await;
Ok(()) Ok(())
} }
@ -136,12 +129,10 @@ pub struct OciPackerMkfsErofs {}
#[async_trait::async_trait] #[async_trait::async_trait]
impl OciPackerBackend for OciPackerMkfsErofs { impl OciPackerBackend for OciPackerMkfsErofs {
async fn pack(&self, progress: OciBoundProgress, vfs: Arc<VfsTree>, path: &Path) -> Result<()> { async fn pack(&self, progress: OciBoundProgress, vfs: Arc<VfsTree>, file: &Path) -> Result<()> {
progress progress
.update(|progress| { .update(|progress| {
progress.phase = OciProgressPhase::Packing; progress.start_packing();
progress.total = 1;
progress.value = 0;
}) })
.await; .await;
@ -149,7 +140,7 @@ impl OciPackerBackend for OciPackerMkfsErofs {
.arg("-L") .arg("-L")
.arg("root") .arg("root")
.arg("--tar=-") .arg("--tar=-")
.arg(path) .arg(file)
.stdin(Stdio::piped()) .stdin(Stdio::piped())
.stderr(Stdio::null()) .stderr(Stdio::null())
.stdout(Stdio::null()) .stdout(Stdio::null())
@ -200,11 +191,10 @@ impl OciPackerBackend for OciPackerMkfsErofs {
status.code().unwrap() status.code().unwrap()
)) ))
} else { } else {
let metadata = fs::metadata(&file).await?;
progress progress
.update(|progress| { .update(|progress| {
progress.phase = OciProgressPhase::Packing; progress.complete(metadata.size());
progress.total = 1;
progress.value = 1;
}) })
.await; .await;
Ok(()) Ok(())
@ -219,20 +209,18 @@ impl OciPackerBackend for OciPackerTar {
async fn pack(&self, progress: OciBoundProgress, vfs: Arc<VfsTree>, file: &Path) -> Result<()> { async fn pack(&self, progress: OciBoundProgress, vfs: Arc<VfsTree>, file: &Path) -> Result<()> {
progress progress
.update(|progress| { .update(|progress| {
progress.phase = OciProgressPhase::Packing; progress.start_packing();
progress.total = 1;
progress.value = 0;
}) })
.await; .await;
let file = File::create(file).await?; let output = File::create(file).await?;
vfs.write_to_tar(file).await?; let output = BufWriter::new(output);
vfs.write_to_tar(output).await?;
let metadata = fs::metadata(file).await?;
progress progress
.update(|progress| { .update(|progress| {
progress.phase = OciProgressPhase::Packing; progress.complete(metadata.size());
progress.total = 1;
progress.value = 1;
}) })
.await; .await;
Ok(()) Ok(())

View File

@ -63,6 +63,7 @@ impl OciPackerService {
&self, &self,
name: ImageName, name: ImageName,
format: OciPackedFormat, format: OciPackedFormat,
overwrite: bool,
progress_context: OciProgressContext, progress_context: OciProgressContext,
) -> Result<OciPackedImage> { ) -> Result<OciPackedImage> {
let progress = OciProgress::new(); let progress = OciProgress::new();
@ -86,7 +87,14 @@ impl OciPackerService {
Entry::Vacant(entry) => { Entry::Vacant(entry) => {
let task = self let task = self
.clone() .clone()
.launch(key.clone(), format, resolved, fetcher, progress.clone()) .launch(
key.clone(),
format,
overwrite,
resolved,
fetcher,
progress.clone(),
)
.await; .await;
let (watch, receiver) = watch::channel(None); let (watch, receiver) = watch::channel(None);
@ -126,6 +134,7 @@ impl OciPackerService {
self, self,
key: OciPackerTaskKey, key: OciPackerTaskKey,
format: OciPackedFormat, format: OciPackedFormat,
overwrite: bool,
resolved: OciResolvedImage, resolved: OciResolvedImage,
fetcher: OciImageFetcher, fetcher: OciImageFetcher,
progress: OciBoundProgress, progress: OciBoundProgress,
@ -137,7 +146,7 @@ impl OciPackerService {
service.ensure_task_gone(key); service.ensure_task_gone(key);
}); });
if let Err(error) = self if let Err(error) = self
.task(key.clone(), format, resolved, fetcher, progress) .task(key.clone(), format, overwrite, resolved, fetcher, progress)
.await .await
{ {
self.finish(&key, Err(error)).await; self.finish(&key, Err(error)).await;
@ -149,13 +158,16 @@ impl OciPackerService {
&self, &self,
key: OciPackerTaskKey, key: OciPackerTaskKey,
format: OciPackedFormat, format: OciPackedFormat,
overwrite: bool,
resolved: OciResolvedImage, resolved: OciResolvedImage,
fetcher: OciImageFetcher, fetcher: OciImageFetcher,
progress: OciBoundProgress, progress: OciBoundProgress,
) -> Result<()> { ) -> Result<()> {
if let Some(cached) = self.cache.recall(&resolved.digest, format).await? { if !overwrite {
self.finish(&key, Ok(cached)).await; if let Some(cached) = self.cache.recall(&resolved.digest, format).await? {
return Ok(()); self.finish(&key, Ok(cached)).await;
return Ok(());
}
} }
let assembler = let assembler =
OciImageAssembler::new(fetcher, resolved, progress.clone(), None, None).await?; OciImageAssembler::new(fetcher, resolved, progress.clone(), None, None).await?;

View File

@ -1,18 +1,16 @@
use indexmap::IndexMap; use indexmap::IndexMap;
use std::sync::Arc; use std::sync::Arc;
use tokio::{ use tokio::{
sync::{broadcast, Mutex}, sync::{watch, Mutex},
task::JoinHandle, task::JoinHandle,
}; };
const OCI_PROGRESS_QUEUE_LEN: usize = 100;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct OciProgress { pub struct OciProgress {
pub phase: OciProgressPhase, pub phase: OciProgressPhase,
pub digest: Option<String>,
pub layers: IndexMap<String, OciProgressLayer>, pub layers: IndexMap<String, OciProgressLayer>,
pub value: u64, pub indication: OciProgressIndication,
pub total: u64,
} }
impl Default for OciProgress { impl Default for OciProgress {
@ -24,72 +22,146 @@ impl Default for OciProgress {
impl OciProgress { impl OciProgress {
pub fn new() -> Self { pub fn new() -> Self {
OciProgress { OciProgress {
phase: OciProgressPhase::Resolving, phase: OciProgressPhase::Started,
digest: None,
layers: IndexMap::new(), layers: IndexMap::new(),
value: 0, indication: OciProgressIndication::Hidden,
total: 1,
} }
} }
pub fn add_layer(&mut self, id: &str, size: usize) { pub fn start_resolving(&mut self) {
self.phase = OciProgressPhase::Resolving;
self.indication = OciProgressIndication::Spinner { message: None };
}
pub fn resolved(&mut self, digest: &str) {
self.digest = Some(digest.to_string());
self.indication = OciProgressIndication::Hidden;
}
pub fn add_layer(&mut self, id: &str) {
self.layers.insert( self.layers.insert(
id.to_string(), id.to_string(),
OciProgressLayer { OciProgressLayer {
id: id.to_string(), id: id.to_string(),
phase: OciProgressLayerPhase::Waiting, phase: OciProgressLayerPhase::Waiting,
value: 0, indication: OciProgressIndication::Spinner { message: None },
total: size as u64,
}, },
); );
} }
pub fn downloading_layer(&mut self, id: &str, downloaded: usize, total: usize) { pub fn downloading_layer(&mut self, id: &str, downloaded: u64, total: u64) {
if let Some(entry) = self.layers.get_mut(id) { if let Some(entry) = self.layers.get_mut(id) {
entry.phase = OciProgressLayerPhase::Downloading; entry.phase = OciProgressLayerPhase::Downloading;
entry.value = downloaded as u64; entry.indication = OciProgressIndication::ProgressBar {
entry.total = total as u64; message: None,
current: downloaded,
total,
bytes: true,
};
} }
} }
pub fn downloaded_layer(&mut self, id: &str) { pub fn downloaded_layer(&mut self, id: &str, total: u64) {
if let Some(entry) = self.layers.get_mut(id) { if let Some(entry) = self.layers.get_mut(id) {
entry.phase = OciProgressLayerPhase::Downloaded; entry.phase = OciProgressLayerPhase::Downloaded;
entry.value = entry.total; entry.indication = OciProgressIndication::Completed {
message: None,
total: Some(total),
bytes: true,
};
} }
} }
pub fn extracting_layer(&mut self, id: &str, extracted: usize, total: usize) { pub fn start_assemble(&mut self) {
self.phase = OciProgressPhase::Assemble;
self.indication = OciProgressIndication::Hidden;
}
pub fn start_extracting_layer(&mut self, id: &str) {
if let Some(entry) = self.layers.get_mut(id) { if let Some(entry) = self.layers.get_mut(id) {
entry.phase = OciProgressLayerPhase::Extracting; entry.phase = OciProgressLayerPhase::Extracting;
entry.value = extracted as u64; entry.indication = OciProgressIndication::Spinner { message: None };
entry.total = total as u64;
} }
} }
pub fn extracted_layer(&mut self, id: &str) { pub fn extracting_layer(&mut self, id: &str, file: &str) {
if let Some(entry) = self.layers.get_mut(id) {
entry.phase = OciProgressLayerPhase::Extracting;
entry.indication = OciProgressIndication::Spinner {
message: Some(file.to_string()),
};
}
}
pub fn extracted_layer(&mut self, id: &str, count: u64, total_size: u64) {
if let Some(entry) = self.layers.get_mut(id) { if let Some(entry) = self.layers.get_mut(id) {
entry.phase = OciProgressLayerPhase::Extracted; entry.phase = OciProgressLayerPhase::Extracted;
entry.value = entry.total; entry.indication = OciProgressIndication::Completed {
message: Some(format!("{} files", count)),
total: Some(total_size),
bytes: true,
};
}
}
pub fn start_packing(&mut self) {
self.phase = OciProgressPhase::Pack;
for layer in self.layers.values_mut() {
layer.indication = OciProgressIndication::Hidden;
}
self.indication = OciProgressIndication::Spinner { message: None };
}
pub fn complete(&mut self, size: u64) {
self.phase = OciProgressPhase::Complete;
self.indication = OciProgressIndication::Completed {
message: None,
total: Some(size),
bytes: true,
} }
} }
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum OciProgressPhase { pub enum OciProgressPhase {
Started,
Resolving, Resolving,
Resolved, Resolved,
ConfigAcquire, ConfigDownload,
LayerAcquire, LayerDownload,
Packing, Assemble,
Pack,
Complete, Complete,
} }
#[derive(Clone, Debug)]
pub enum OciProgressIndication {
Hidden,
ProgressBar {
message: Option<String>,
current: u64,
total: u64,
bytes: bool,
},
Spinner {
message: Option<String>,
},
Completed {
message: Option<String>,
total: Option<u64>,
bytes: bool,
},
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct OciProgressLayer { pub struct OciProgressLayer {
pub id: String, pub id: String,
pub phase: OciProgressLayerPhase, pub phase: OciProgressLayerPhase,
pub value: u64, pub indication: OciProgressIndication,
pub total: u64,
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -103,16 +175,16 @@ pub enum OciProgressLayerPhase {
#[derive(Clone)] #[derive(Clone)]
pub struct OciProgressContext { pub struct OciProgressContext {
sender: broadcast::Sender<OciProgress>, sender: watch::Sender<OciProgress>,
} }
impl OciProgressContext { impl OciProgressContext {
pub fn create() -> (OciProgressContext, broadcast::Receiver<OciProgress>) { pub fn create() -> (OciProgressContext, watch::Receiver<OciProgress>) {
let (sender, receiver) = broadcast::channel(OCI_PROGRESS_QUEUE_LEN); let (sender, receiver) = watch::channel(OciProgress::new());
(OciProgressContext::new(sender), receiver) (OciProgressContext::new(sender), receiver)
} }
pub fn new(sender: broadcast::Sender<OciProgress>) -> OciProgressContext { pub fn new(sender: watch::Sender<OciProgress>) -> OciProgressContext {
OciProgressContext { sender } OciProgressContext { sender }
} }
@ -120,7 +192,7 @@ impl OciProgressContext {
let _ = self.sender.send(progress.clone()); let _ = self.sender.send(progress.clone());
} }
pub fn subscribe(&self) -> broadcast::Receiver<OciProgress> { pub fn subscribe(&self) -> watch::Receiver<OciProgress> {
self.sender.subscribe() self.sender.subscribe()
} }
} }
@ -156,13 +228,10 @@ impl OciBoundProgress {
context.update(&progress); context.update(&progress);
let mut receiver = self.context.subscribe(); let mut receiver = self.context.subscribe();
tokio::task::spawn(async move { tokio::task::spawn(async move {
while let Ok(progress) = receiver.recv().await { while (receiver.changed().await).is_ok() {
match context.sender.send(progress) { context
Ok(_) => {} .sender
Err(_) => { .send_replace(receiver.borrow_and_update().clone());
break;
}
}
} }
}) })
} }

View File

@ -149,24 +149,20 @@ impl OciRegistryClient {
))?; ))?;
let mut response = self.call(self.agent.get(url.as_str())).await?; let mut response = self.call(self.agent.get(url.as_str())).await?;
let mut size: u64 = 0; let mut size: u64 = 0;
let mut last_progress_size: u64 = 0;
while let Some(chunk) = response.chunk().await? { while let Some(chunk) = response.chunk().await? {
dest.write_all(&chunk).await?; dest.write_all(&chunk).await?;
size += chunk.len() as u64; size += chunk.len() as u64;
if (size - last_progress_size) > (5 * 1024 * 1024) { if let Some(ref progress) = progress {
last_progress_size = size; progress
if let Some(ref progress) = progress { .update(|progress| {
progress progress.downloading_layer(
.update(|progress| { descriptor.digest(),
progress.downloading_layer( size,
descriptor.digest(), descriptor.size() as u64,
size as usize, );
descriptor.size() as usize, })
); .await;
})
.await;
}
} }
} }
Ok(size) Ok(size)

View File

@ -194,7 +194,7 @@ impl VfsTree {
} }
} }
pub fn insert_tar_entry<X: AsyncRead + Unpin>(&mut self, entry: &Entry<X>) -> Result<()> { pub fn insert_tar_entry<X: AsyncRead + Unpin>(&mut self, entry: &Entry<X>) -> Result<&VfsNode> {
let mut meta = VfsNode::from(entry)?; let mut meta = VfsNode::from(entry)?;
let path = entry.path()?.to_path_buf(); let path = entry.path()?.to_path_buf();
let parent = if let Some(parent) = path.parent() { let parent = if let Some(parent) = path.parent() {
@ -218,8 +218,11 @@ impl VfsTree {
meta.children = old.children; meta.children = old.children;
} }
} }
parent.children.push(meta); parent.children.push(meta.clone());
Ok(()) let Some(reference) = parent.children.iter().find(|child| child.name == meta.name) else {
return Err(anyhow!("unable to find inserted child in vfs"));
};
Ok(reference)
} }
pub fn set_disk_path(&mut self, path: &Path, disk_path: &Path) -> Result<()> { pub fn set_disk_path(&mut self, path: &Path, disk_path: &Path) -> Result<()> {