mirror of
https://github.com/edera-dev/krata.git
synced 2025-08-06 06:31:31 +00:00
feat: implement improved and detailed oci progress indication
This commit is contained in:
@ -25,37 +25,19 @@ async fn main() -> Result<()> {
|
||||
let (context, mut receiver) = OciProgressContext::create();
|
||||
tokio::task::spawn(async move {
|
||||
loop {
|
||||
let Ok(mut progress) = receiver.recv().await else {
|
||||
return;
|
||||
};
|
||||
|
||||
let mut drain = 0;
|
||||
loop {
|
||||
if drain >= 10 {
|
||||
break;
|
||||
}
|
||||
|
||||
if let Ok(latest) = receiver.try_recv() {
|
||||
progress = latest;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
|
||||
drain += 1;
|
||||
if (receiver.changed().await).is_err() {
|
||||
break;
|
||||
}
|
||||
|
||||
let progress = receiver.borrow_and_update();
|
||||
println!("phase {:?}", progress.phase);
|
||||
for (id, layer) in &progress.layers {
|
||||
println!(
|
||||
"{} {:?} {} of {}",
|
||||
id, layer.phase, layer.value, layer.total
|
||||
)
|
||||
println!("{} {:?} {:?}", id, layer.phase, layer.indication,)
|
||||
}
|
||||
}
|
||||
});
|
||||
let service = OciPackerService::new(seed, &cache_dir, OciPlatform::current())?;
|
||||
let packed = service
|
||||
.request(image.clone(), OciPackedFormat::Squashfs, context)
|
||||
.request(image.clone(), OciPackedFormat::Squashfs, false, context)
|
||||
.await?;
|
||||
println!(
|
||||
"generated squashfs of {} to {}",
|
||||
|
@ -1,4 +1,4 @@
|
||||
use crate::fetch::{OciImageFetcher, OciImageLayer, OciResolvedImage};
|
||||
use crate::fetch::{OciImageFetcher, OciImageLayer, OciImageLayerReader, OciResolvedImage};
|
||||
use crate::progress::OciBoundProgress;
|
||||
use crate::schema::OciSchema;
|
||||
use crate::vfs::{VfsNode, VfsTree};
|
||||
@ -11,7 +11,6 @@ use std::pin::Pin;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use tokio::fs;
|
||||
use tokio::io::AsyncRead;
|
||||
use tokio_stream::StreamExt;
|
||||
use tokio_tar::{Archive, Entry};
|
||||
use uuid::Uuid;
|
||||
@ -115,12 +114,14 @@ impl OciImageAssembler {
|
||||
);
|
||||
self.progress
|
||||
.update(|progress| {
|
||||
progress.extracting_layer(&layer.digest, 0, 1);
|
||||
progress.start_extracting_layer(&layer.digest);
|
||||
})
|
||||
.await;
|
||||
debug!("process layer digest={}", &layer.digest,);
|
||||
let mut archive = layer.archive().await?;
|
||||
let mut entries = archive.entries()?;
|
||||
let mut count = 0u64;
|
||||
let mut size = 0u64;
|
||||
while let Some(entry) = entries.next().await {
|
||||
let mut entry = entry?;
|
||||
let path = entry.path()?;
|
||||
@ -134,14 +135,21 @@ impl OciImageAssembler {
|
||||
self.process_whiteout_entry(&mut vfs, &entry, name, layer)
|
||||
.await?;
|
||||
} else {
|
||||
vfs.insert_tar_entry(&entry)?;
|
||||
self.process_write_entry(&mut vfs, &mut entry, layer)
|
||||
let reference = vfs.insert_tar_entry(&entry)?;
|
||||
self.progress
|
||||
.update(|progress| {
|
||||
progress.extracting_layer(&layer.digest, &reference.name);
|
||||
})
|
||||
.await;
|
||||
size += self
|
||||
.process_write_entry(&mut vfs, &mut entry, layer)
|
||||
.await?;
|
||||
count += 1;
|
||||
}
|
||||
}
|
||||
self.progress
|
||||
.update(|progress| {
|
||||
progress.extracted_layer(&layer.digest);
|
||||
progress.extracted_layer(&layer.digest, count, size);
|
||||
})
|
||||
.await;
|
||||
}
|
||||
@ -169,7 +177,7 @@ impl OciImageAssembler {
|
||||
async fn process_whiteout_entry(
|
||||
&self,
|
||||
vfs: &mut VfsTree,
|
||||
entry: &Entry<Archive<Pin<Box<dyn AsyncRead + Send>>>>,
|
||||
entry: &Entry<Archive<Pin<Box<dyn OciImageLayerReader + Send>>>>,
|
||||
name: &str,
|
||||
layer: &OciImageLayer,
|
||||
) -> Result<()> {
|
||||
@ -210,11 +218,11 @@ impl OciImageAssembler {
|
||||
async fn process_write_entry(
|
||||
&self,
|
||||
vfs: &mut VfsTree,
|
||||
entry: &mut Entry<Archive<Pin<Box<dyn AsyncRead + Send>>>>,
|
||||
entry: &mut Entry<Archive<Pin<Box<dyn OciImageLayerReader + Send>>>>,
|
||||
layer: &OciImageLayer,
|
||||
) -> Result<()> {
|
||||
) -> Result<u64> {
|
||||
if !entry.header().entry_type().is_file() {
|
||||
return Ok(());
|
||||
return Ok(0);
|
||||
}
|
||||
trace!(
|
||||
"unpack entry layer={} path={:?} type={:?}",
|
||||
@ -230,7 +238,7 @@ impl OciImageAssembler {
|
||||
.await?
|
||||
.ok_or(anyhow!("unpack did not return a path"))?;
|
||||
vfs.set_disk_path(&entry.path()?, &path)?;
|
||||
Ok(())
|
||||
Ok(entry.header().size()?)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -10,6 +10,8 @@ use super::{
|
||||
|
||||
use std::{
|
||||
fmt::Debug,
|
||||
io::SeekFrom,
|
||||
os::unix::fs::MetadataExt,
|
||||
path::{Path, PathBuf},
|
||||
pin::Pin,
|
||||
};
|
||||
@ -22,8 +24,8 @@ use oci_spec::image::{
|
||||
};
|
||||
use serde::de::DeserializeOwned;
|
||||
use tokio::{
|
||||
fs::File,
|
||||
io::{AsyncRead, AsyncReadExt, BufReader, BufWriter},
|
||||
fs::{self, File},
|
||||
io::{AsyncRead, AsyncReadExt, AsyncSeekExt, BufReader, BufWriter},
|
||||
};
|
||||
use tokio_stream::StreamExt;
|
||||
use tokio_tar::Archive;
|
||||
@ -43,16 +45,43 @@ pub enum OciImageLayerCompression {
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct OciImageLayer {
|
||||
pub metadata: Descriptor,
|
||||
pub path: PathBuf,
|
||||
pub digest: String,
|
||||
pub compression: OciImageLayerCompression,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait OciImageLayerReader: AsyncRead + Sync {
|
||||
async fn position(&mut self) -> Result<u64>;
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl OciImageLayerReader for BufReader<File> {
|
||||
async fn position(&mut self) -> Result<u64> {
|
||||
Ok(self.seek(SeekFrom::Current(0)).await?)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl OciImageLayerReader for GzipDecoder<BufReader<File>> {
|
||||
async fn position(&mut self) -> Result<u64> {
|
||||
self.get_mut().position().await
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl OciImageLayerReader for ZstdDecoder<BufReader<File>> {
|
||||
async fn position(&mut self) -> Result<u64> {
|
||||
self.get_mut().position().await
|
||||
}
|
||||
}
|
||||
|
||||
impl OciImageLayer {
|
||||
pub async fn decompress(&self) -> Result<Pin<Box<dyn AsyncRead + Send>>> {
|
||||
pub async fn decompress(&self) -> Result<Pin<Box<dyn OciImageLayerReader + Send>>> {
|
||||
let file = File::open(&self.path).await?;
|
||||
let reader = BufReader::new(file);
|
||||
let reader: Pin<Box<dyn AsyncRead + Send>> = match self.compression {
|
||||
let reader: Pin<Box<dyn OciImageLayerReader + Send>> = match self.compression {
|
||||
OciImageLayerCompression::None => Box::pin(reader),
|
||||
OciImageLayerCompression::Gzip => Box::pin(GzipDecoder::new(reader)),
|
||||
OciImageLayerCompression::Zstd => Box::pin(ZstdDecoder::new(reader)),
|
||||
@ -60,7 +89,7 @@ impl OciImageLayer {
|
||||
Ok(reader)
|
||||
}
|
||||
|
||||
pub async fn archive(&self) -> Result<Archive<Pin<Box<dyn AsyncRead + Send>>>> {
|
||||
pub async fn archive(&self) -> Result<Archive<Pin<Box<dyn OciImageLayerReader + Send>>>> {
|
||||
let decompress = self.decompress().await?;
|
||||
Ok(Archive::new(decompress))
|
||||
}
|
||||
@ -225,7 +254,7 @@ impl OciImageFetcher {
|
||||
let config: OciSchema<ImageConfiguration>;
|
||||
self.progress
|
||||
.update(|progress| {
|
||||
progress.phase = OciProgressPhase::ConfigAcquire;
|
||||
progress.phase = OciProgressPhase::ConfigDownload;
|
||||
})
|
||||
.await;
|
||||
let mut client = OciRegistryClient::new(image.name.registry_url()?, self.platform.clone())?;
|
||||
@ -245,10 +274,10 @@ impl OciImageFetcher {
|
||||
}
|
||||
self.progress
|
||||
.update(|progress| {
|
||||
progress.phase = OciProgressPhase::LayerAcquire;
|
||||
progress.phase = OciProgressPhase::LayerDownload;
|
||||
|
||||
for layer in image.manifest.item().layers() {
|
||||
progress.add_layer(layer.digest(), layer.size() as usize);
|
||||
progress.add_layer(layer.digest());
|
||||
}
|
||||
})
|
||||
.await;
|
||||
@ -256,7 +285,7 @@ impl OciImageFetcher {
|
||||
for layer in image.manifest.item().layers() {
|
||||
self.progress
|
||||
.update(|progress| {
|
||||
progress.downloading_layer(layer.digest(), 0, layer.size() as usize);
|
||||
progress.downloading_layer(layer.digest(), 0, layer.size() as u64);
|
||||
})
|
||||
.await;
|
||||
layers.push(
|
||||
@ -265,7 +294,7 @@ impl OciImageFetcher {
|
||||
);
|
||||
self.progress
|
||||
.update(|progress| {
|
||||
progress.downloaded_layer(layer.digest());
|
||||
progress.downloaded_layer(layer.digest(), layer.size() as u64);
|
||||
})
|
||||
.await;
|
||||
}
|
||||
@ -304,6 +333,12 @@ impl OciImageFetcher {
|
||||
}
|
||||
}
|
||||
|
||||
let metadata = fs::metadata(&layer_path).await?;
|
||||
|
||||
if layer.size() as u64 != metadata.size() {
|
||||
return Err(anyhow!("layer size differs from size in manifest",));
|
||||
}
|
||||
|
||||
let mut media_type = layer.media_type().clone();
|
||||
|
||||
// docker layer compatibility
|
||||
@ -318,6 +353,7 @@ impl OciImageFetcher {
|
||||
other => return Err(anyhow!("found layer with unknown media type: {}", other)),
|
||||
};
|
||||
Ok(OciImageLayer {
|
||||
metadata: layer.clone(),
|
||||
path: layer_path,
|
||||
digest: layer.digest().clone(),
|
||||
compression,
|
||||
|
@ -1,14 +1,12 @@
|
||||
use std::{path::Path, process::Stdio, sync::Arc};
|
||||
use std::{os::unix::fs::MetadataExt, path::Path, process::Stdio, sync::Arc};
|
||||
|
||||
use super::OciPackedFormat;
|
||||
use crate::{
|
||||
progress::{OciBoundProgress, OciProgressPhase},
|
||||
vfs::VfsTree,
|
||||
};
|
||||
use crate::{progress::OciBoundProgress, vfs::VfsTree};
|
||||
use anyhow::{anyhow, Result};
|
||||
use log::warn;
|
||||
use tokio::{
|
||||
fs::File,
|
||||
fs::{self, File},
|
||||
io::BufWriter,
|
||||
pin,
|
||||
process::{Child, Command},
|
||||
select,
|
||||
@ -55,9 +53,7 @@ impl OciPackerBackend for OciPackerMkSquashfs {
|
||||
async fn pack(&self, progress: OciBoundProgress, vfs: Arc<VfsTree>, file: &Path) -> Result<()> {
|
||||
progress
|
||||
.update(|progress| {
|
||||
progress.phase = OciProgressPhase::Packing;
|
||||
progress.total = 1;
|
||||
progress.value = 0;
|
||||
progress.start_packing();
|
||||
})
|
||||
.await;
|
||||
|
||||
@ -120,12 +116,9 @@ impl OciPackerBackend for OciPackerMkSquashfs {
|
||||
status.code().unwrap()
|
||||
))
|
||||
} else {
|
||||
let metadata = fs::metadata(&file).await?;
|
||||
progress
|
||||
.update(|progress| {
|
||||
progress.phase = OciProgressPhase::Packing;
|
||||
progress.total = 1;
|
||||
progress.value = 1;
|
||||
})
|
||||
.update(|progress| progress.complete(metadata.size()))
|
||||
.await;
|
||||
Ok(())
|
||||
}
|
||||
@ -136,12 +129,10 @@ pub struct OciPackerMkfsErofs {}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl OciPackerBackend for OciPackerMkfsErofs {
|
||||
async fn pack(&self, progress: OciBoundProgress, vfs: Arc<VfsTree>, path: &Path) -> Result<()> {
|
||||
async fn pack(&self, progress: OciBoundProgress, vfs: Arc<VfsTree>, file: &Path) -> Result<()> {
|
||||
progress
|
||||
.update(|progress| {
|
||||
progress.phase = OciProgressPhase::Packing;
|
||||
progress.total = 1;
|
||||
progress.value = 0;
|
||||
progress.start_packing();
|
||||
})
|
||||
.await;
|
||||
|
||||
@ -149,7 +140,7 @@ impl OciPackerBackend for OciPackerMkfsErofs {
|
||||
.arg("-L")
|
||||
.arg("root")
|
||||
.arg("--tar=-")
|
||||
.arg(path)
|
||||
.arg(file)
|
||||
.stdin(Stdio::piped())
|
||||
.stderr(Stdio::null())
|
||||
.stdout(Stdio::null())
|
||||
@ -200,11 +191,10 @@ impl OciPackerBackend for OciPackerMkfsErofs {
|
||||
status.code().unwrap()
|
||||
))
|
||||
} else {
|
||||
let metadata = fs::metadata(&file).await?;
|
||||
progress
|
||||
.update(|progress| {
|
||||
progress.phase = OciProgressPhase::Packing;
|
||||
progress.total = 1;
|
||||
progress.value = 1;
|
||||
progress.complete(metadata.size());
|
||||
})
|
||||
.await;
|
||||
Ok(())
|
||||
@ -219,20 +209,18 @@ impl OciPackerBackend for OciPackerTar {
|
||||
async fn pack(&self, progress: OciBoundProgress, vfs: Arc<VfsTree>, file: &Path) -> Result<()> {
|
||||
progress
|
||||
.update(|progress| {
|
||||
progress.phase = OciProgressPhase::Packing;
|
||||
progress.total = 1;
|
||||
progress.value = 0;
|
||||
progress.start_packing();
|
||||
})
|
||||
.await;
|
||||
|
||||
let file = File::create(file).await?;
|
||||
vfs.write_to_tar(file).await?;
|
||||
let output = File::create(file).await?;
|
||||
let output = BufWriter::new(output);
|
||||
vfs.write_to_tar(output).await?;
|
||||
|
||||
let metadata = fs::metadata(file).await?;
|
||||
progress
|
||||
.update(|progress| {
|
||||
progress.phase = OciProgressPhase::Packing;
|
||||
progress.total = 1;
|
||||
progress.value = 1;
|
||||
progress.complete(metadata.size());
|
||||
})
|
||||
.await;
|
||||
Ok(())
|
||||
|
@ -63,6 +63,7 @@ impl OciPackerService {
|
||||
&self,
|
||||
name: ImageName,
|
||||
format: OciPackedFormat,
|
||||
overwrite: bool,
|
||||
progress_context: OciProgressContext,
|
||||
) -> Result<OciPackedImage> {
|
||||
let progress = OciProgress::new();
|
||||
@ -86,7 +87,14 @@ impl OciPackerService {
|
||||
Entry::Vacant(entry) => {
|
||||
let task = self
|
||||
.clone()
|
||||
.launch(key.clone(), format, resolved, fetcher, progress.clone())
|
||||
.launch(
|
||||
key.clone(),
|
||||
format,
|
||||
overwrite,
|
||||
resolved,
|
||||
fetcher,
|
||||
progress.clone(),
|
||||
)
|
||||
.await;
|
||||
let (watch, receiver) = watch::channel(None);
|
||||
|
||||
@ -126,6 +134,7 @@ impl OciPackerService {
|
||||
self,
|
||||
key: OciPackerTaskKey,
|
||||
format: OciPackedFormat,
|
||||
overwrite: bool,
|
||||
resolved: OciResolvedImage,
|
||||
fetcher: OciImageFetcher,
|
||||
progress: OciBoundProgress,
|
||||
@ -137,7 +146,7 @@ impl OciPackerService {
|
||||
service.ensure_task_gone(key);
|
||||
});
|
||||
if let Err(error) = self
|
||||
.task(key.clone(), format, resolved, fetcher, progress)
|
||||
.task(key.clone(), format, overwrite, resolved, fetcher, progress)
|
||||
.await
|
||||
{
|
||||
self.finish(&key, Err(error)).await;
|
||||
@ -149,13 +158,16 @@ impl OciPackerService {
|
||||
&self,
|
||||
key: OciPackerTaskKey,
|
||||
format: OciPackedFormat,
|
||||
overwrite: bool,
|
||||
resolved: OciResolvedImage,
|
||||
fetcher: OciImageFetcher,
|
||||
progress: OciBoundProgress,
|
||||
) -> Result<()> {
|
||||
if let Some(cached) = self.cache.recall(&resolved.digest, format).await? {
|
||||
self.finish(&key, Ok(cached)).await;
|
||||
return Ok(());
|
||||
if !overwrite {
|
||||
if let Some(cached) = self.cache.recall(&resolved.digest, format).await? {
|
||||
self.finish(&key, Ok(cached)).await;
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
let assembler =
|
||||
OciImageAssembler::new(fetcher, resolved, progress.clone(), None, None).await?;
|
||||
|
@ -1,18 +1,16 @@
|
||||
use indexmap::IndexMap;
|
||||
use std::sync::Arc;
|
||||
use tokio::{
|
||||
sync::{broadcast, Mutex},
|
||||
sync::{watch, Mutex},
|
||||
task::JoinHandle,
|
||||
};
|
||||
|
||||
const OCI_PROGRESS_QUEUE_LEN: usize = 100;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct OciProgress {
|
||||
pub phase: OciProgressPhase,
|
||||
pub digest: Option<String>,
|
||||
pub layers: IndexMap<String, OciProgressLayer>,
|
||||
pub value: u64,
|
||||
pub total: u64,
|
||||
pub indication: OciProgressIndication,
|
||||
}
|
||||
|
||||
impl Default for OciProgress {
|
||||
@ -24,72 +22,146 @@ impl Default for OciProgress {
|
||||
impl OciProgress {
|
||||
pub fn new() -> Self {
|
||||
OciProgress {
|
||||
phase: OciProgressPhase::Resolving,
|
||||
phase: OciProgressPhase::Started,
|
||||
digest: None,
|
||||
layers: IndexMap::new(),
|
||||
value: 0,
|
||||
total: 1,
|
||||
indication: OciProgressIndication::Hidden,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_layer(&mut self, id: &str, size: usize) {
|
||||
pub fn start_resolving(&mut self) {
|
||||
self.phase = OciProgressPhase::Resolving;
|
||||
self.indication = OciProgressIndication::Spinner { message: None };
|
||||
}
|
||||
|
||||
pub fn resolved(&mut self, digest: &str) {
|
||||
self.digest = Some(digest.to_string());
|
||||
self.indication = OciProgressIndication::Hidden;
|
||||
}
|
||||
|
||||
pub fn add_layer(&mut self, id: &str) {
|
||||
self.layers.insert(
|
||||
id.to_string(),
|
||||
OciProgressLayer {
|
||||
id: id.to_string(),
|
||||
phase: OciProgressLayerPhase::Waiting,
|
||||
value: 0,
|
||||
total: size as u64,
|
||||
indication: OciProgressIndication::Spinner { message: None },
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
pub fn downloading_layer(&mut self, id: &str, downloaded: usize, total: usize) {
|
||||
pub fn downloading_layer(&mut self, id: &str, downloaded: u64, total: u64) {
|
||||
if let Some(entry) = self.layers.get_mut(id) {
|
||||
entry.phase = OciProgressLayerPhase::Downloading;
|
||||
entry.value = downloaded as u64;
|
||||
entry.total = total as u64;
|
||||
entry.indication = OciProgressIndication::ProgressBar {
|
||||
message: None,
|
||||
current: downloaded,
|
||||
total,
|
||||
bytes: true,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
pub fn downloaded_layer(&mut self, id: &str) {
|
||||
pub fn downloaded_layer(&mut self, id: &str, total: u64) {
|
||||
if let Some(entry) = self.layers.get_mut(id) {
|
||||
entry.phase = OciProgressLayerPhase::Downloaded;
|
||||
entry.value = entry.total;
|
||||
entry.indication = OciProgressIndication::Completed {
|
||||
message: None,
|
||||
total: Some(total),
|
||||
bytes: true,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
pub fn extracting_layer(&mut self, id: &str, extracted: usize, total: usize) {
|
||||
pub fn start_assemble(&mut self) {
|
||||
self.phase = OciProgressPhase::Assemble;
|
||||
self.indication = OciProgressIndication::Hidden;
|
||||
}
|
||||
|
||||
pub fn start_extracting_layer(&mut self, id: &str) {
|
||||
if let Some(entry) = self.layers.get_mut(id) {
|
||||
entry.phase = OciProgressLayerPhase::Extracting;
|
||||
entry.value = extracted as u64;
|
||||
entry.total = total as u64;
|
||||
entry.indication = OciProgressIndication::Spinner { message: None };
|
||||
}
|
||||
}
|
||||
|
||||
pub fn extracted_layer(&mut self, id: &str) {
|
||||
pub fn extracting_layer(&mut self, id: &str, file: &str) {
|
||||
if let Some(entry) = self.layers.get_mut(id) {
|
||||
entry.phase = OciProgressLayerPhase::Extracting;
|
||||
entry.indication = OciProgressIndication::Spinner {
|
||||
message: Some(file.to_string()),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
pub fn extracted_layer(&mut self, id: &str, count: u64, total_size: u64) {
|
||||
if let Some(entry) = self.layers.get_mut(id) {
|
||||
entry.phase = OciProgressLayerPhase::Extracted;
|
||||
entry.value = entry.total;
|
||||
entry.indication = OciProgressIndication::Completed {
|
||||
message: Some(format!("{} files", count)),
|
||||
total: Some(total_size),
|
||||
bytes: true,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start_packing(&mut self) {
|
||||
self.phase = OciProgressPhase::Pack;
|
||||
for layer in self.layers.values_mut() {
|
||||
layer.indication = OciProgressIndication::Hidden;
|
||||
}
|
||||
self.indication = OciProgressIndication::Spinner { message: None };
|
||||
}
|
||||
|
||||
pub fn complete(&mut self, size: u64) {
|
||||
self.phase = OciProgressPhase::Complete;
|
||||
self.indication = OciProgressIndication::Completed {
|
||||
message: None,
|
||||
total: Some(size),
|
||||
bytes: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum OciProgressPhase {
|
||||
Started,
|
||||
Resolving,
|
||||
Resolved,
|
||||
ConfigAcquire,
|
||||
LayerAcquire,
|
||||
Packing,
|
||||
ConfigDownload,
|
||||
LayerDownload,
|
||||
Assemble,
|
||||
Pack,
|
||||
Complete,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum OciProgressIndication {
|
||||
Hidden,
|
||||
|
||||
ProgressBar {
|
||||
message: Option<String>,
|
||||
current: u64,
|
||||
total: u64,
|
||||
bytes: bool,
|
||||
},
|
||||
|
||||
Spinner {
|
||||
message: Option<String>,
|
||||
},
|
||||
|
||||
Completed {
|
||||
message: Option<String>,
|
||||
total: Option<u64>,
|
||||
bytes: bool,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct OciProgressLayer {
|
||||
pub id: String,
|
||||
pub phase: OciProgressLayerPhase,
|
||||
pub value: u64,
|
||||
pub total: u64,
|
||||
pub indication: OciProgressIndication,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
@ -103,16 +175,16 @@ pub enum OciProgressLayerPhase {
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct OciProgressContext {
|
||||
sender: broadcast::Sender<OciProgress>,
|
||||
sender: watch::Sender<OciProgress>,
|
||||
}
|
||||
|
||||
impl OciProgressContext {
|
||||
pub fn create() -> (OciProgressContext, broadcast::Receiver<OciProgress>) {
|
||||
let (sender, receiver) = broadcast::channel(OCI_PROGRESS_QUEUE_LEN);
|
||||
pub fn create() -> (OciProgressContext, watch::Receiver<OciProgress>) {
|
||||
let (sender, receiver) = watch::channel(OciProgress::new());
|
||||
(OciProgressContext::new(sender), receiver)
|
||||
}
|
||||
|
||||
pub fn new(sender: broadcast::Sender<OciProgress>) -> OciProgressContext {
|
||||
pub fn new(sender: watch::Sender<OciProgress>) -> OciProgressContext {
|
||||
OciProgressContext { sender }
|
||||
}
|
||||
|
||||
@ -120,7 +192,7 @@ impl OciProgressContext {
|
||||
let _ = self.sender.send(progress.clone());
|
||||
}
|
||||
|
||||
pub fn subscribe(&self) -> broadcast::Receiver<OciProgress> {
|
||||
pub fn subscribe(&self) -> watch::Receiver<OciProgress> {
|
||||
self.sender.subscribe()
|
||||
}
|
||||
}
|
||||
@ -156,13 +228,10 @@ impl OciBoundProgress {
|
||||
context.update(&progress);
|
||||
let mut receiver = self.context.subscribe();
|
||||
tokio::task::spawn(async move {
|
||||
while let Ok(progress) = receiver.recv().await {
|
||||
match context.sender.send(progress) {
|
||||
Ok(_) => {}
|
||||
Err(_) => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
while (receiver.changed().await).is_ok() {
|
||||
context
|
||||
.sender
|
||||
.send_replace(receiver.borrow_and_update().clone());
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -149,24 +149,20 @@ impl OciRegistryClient {
|
||||
))?;
|
||||
let mut response = self.call(self.agent.get(url.as_str())).await?;
|
||||
let mut size: u64 = 0;
|
||||
let mut last_progress_size: u64 = 0;
|
||||
while let Some(chunk) = response.chunk().await? {
|
||||
dest.write_all(&chunk).await?;
|
||||
size += chunk.len() as u64;
|
||||
|
||||
if (size - last_progress_size) > (5 * 1024 * 1024) {
|
||||
last_progress_size = size;
|
||||
if let Some(ref progress) = progress {
|
||||
progress
|
||||
.update(|progress| {
|
||||
progress.downloading_layer(
|
||||
descriptor.digest(),
|
||||
size as usize,
|
||||
descriptor.size() as usize,
|
||||
);
|
||||
})
|
||||
.await;
|
||||
}
|
||||
if let Some(ref progress) = progress {
|
||||
progress
|
||||
.update(|progress| {
|
||||
progress.downloading_layer(
|
||||
descriptor.digest(),
|
||||
size,
|
||||
descriptor.size() as u64,
|
||||
);
|
||||
})
|
||||
.await;
|
||||
}
|
||||
}
|
||||
Ok(size)
|
||||
|
@ -194,7 +194,7 @@ impl VfsTree {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert_tar_entry<X: AsyncRead + Unpin>(&mut self, entry: &Entry<X>) -> Result<()> {
|
||||
pub fn insert_tar_entry<X: AsyncRead + Unpin>(&mut self, entry: &Entry<X>) -> Result<&VfsNode> {
|
||||
let mut meta = VfsNode::from(entry)?;
|
||||
let path = entry.path()?.to_path_buf();
|
||||
let parent = if let Some(parent) = path.parent() {
|
||||
@ -218,8 +218,11 @@ impl VfsTree {
|
||||
meta.children = old.children;
|
||||
}
|
||||
}
|
||||
parent.children.push(meta);
|
||||
Ok(())
|
||||
parent.children.push(meta.clone());
|
||||
let Some(reference) = parent.children.iter().find(|child| child.name == meta.name) else {
|
||||
return Err(anyhow!("unable to find inserted child in vfs"));
|
||||
};
|
||||
Ok(reference)
|
||||
}
|
||||
|
||||
pub fn set_disk_path(&mut self, path: &Path, disk_path: &Path) -> Result<()> {
|
||||
|
Reference in New Issue
Block a user