diff --git a/crates/daemon/src/control.rs b/crates/daemon/src/control.rs index ca79e76..2a81c1a 100644 --- a/crates/daemon/src/control.rs +++ b/crates/daemon/src/control.rs @@ -1,5 +1,3 @@ -use std::{pin::Pin, str::FromStr}; - use async_stream::try_stream; use futures::Stream; use krata::{ @@ -23,12 +21,10 @@ use krataoci::{ packer::{service::OciPackerService, OciImagePacked, OciPackedFormat}, progress::{OciProgress, OciProgressContext}, }; +use std::{pin::Pin, str::FromStr}; use tokio::{ select, - sync::{ - broadcast, - mpsc::{channel, Sender}, - }, + sync::mpsc::{channel, Sender}, task::JoinError, }; use tokio_stream::StreamExt; @@ -94,7 +90,7 @@ enum ConsoleDataSelect { } enum PullImageSelect { - Progress(Option), + Progress(usize), Completed(Result, JoinError>), } @@ -370,7 +366,7 @@ impl ControlService for DaemonControlService { GuestOciImageFormat::Squashfs => OciPackedFormat::Squashfs, GuestOciImageFormat::Erofs => OciPackedFormat::Erofs, }; - let (sender, mut receiver) = broadcast::channel::(100); + let (sender, mut receiver) = channel::(100); let context = OciProgressContext::new(sender); let our_packer = self.packer.clone(); @@ -380,19 +376,22 @@ impl ControlService for DaemonControlService { our_packer.request(name, format, context).await }); loop { + let mut progresses = Vec::new(); let what = select! { - x = receiver.recv() => PullImageSelect::Progress(x.ok()), + x = receiver.recv_many(&mut progresses, 10) => PullImageSelect::Progress(x), x = &mut task => PullImageSelect::Completed(x), }; - match what { - PullImageSelect::Progress(Some(progress)) => { - let reply = PullImageReply { - progress: Some(convert_oci_progress(progress)), - digest: String::new(), - format: GuestOciImageFormat::Unknown.into(), - }; - yield reply; + PullImageSelect::Progress(count) => { + if count > 0 { + let progress = progresses.remove(progresses.len() - 1); + let reply = PullImageReply { + progress: Some(convert_oci_progress(progress)), + digest: String::new(), + format: GuestOciImageFormat::Unknown.into(), + }; + yield reply; + } }, PullImageSelect::Completed(result) => { @@ -413,8 +412,6 @@ impl ControlService for DaemonControlService { yield reply; break; }, - - _ => {}, } } }; diff --git a/crates/oci/examples/squashify.rs b/crates/oci/examples/squashify.rs index d4785e8..8bd2932 100644 --- a/crates/oci/examples/squashify.rs +++ b/crates/oci/examples/squashify.rs @@ -8,7 +8,7 @@ use krataoci::{ progress::{OciProgress, OciProgressContext}, registry::OciPlatform, }; -use tokio::{fs, sync::broadcast}; +use tokio::{fs, sync::mpsc::channel}; #[tokio::main] async fn main() -> Result<()> { @@ -22,14 +22,16 @@ async fn main() -> Result<()> { fs::create_dir(&cache_dir).await?; } - let (sender, mut receiver) = broadcast::channel::(1000); + let (sender, mut receiver) = channel::(100); tokio::task::spawn(async move { loop { - let Some(progress) = receiver.recv().await.ok() else { - break; + let mut progresses = Vec::new(); + let _ = receiver.recv_many(&mut progresses, 100).await; + let Some(progress) = progresses.last() else { + continue; }; println!("phase {:?}", progress.phase); - for (id, layer) in progress.layers { + for (id, layer) in &progress.layers { println!( "{} {:?} {} of {}", id, layer.phase, layer.value, layer.total diff --git a/crates/oci/src/assemble.rs b/crates/oci/src/assemble.rs index 56caafe..5f13ae3 100644 --- a/crates/oci/src/assemble.rs +++ b/crates/oci/src/assemble.rs @@ -113,48 +113,21 @@ impl OciImageAssembler { progress.extracting_layer(&layer.digest, 0, 1); }) .await; - let (whiteouts, count) = self.process_layer_whiteout(&mut vfs, layer).await?; - self.progress - .update(|progress| { - progress.extracting_layer(&layer.digest, 0, count); - }) - .await; - debug!( - "process layer digest={} whiteouts={:?}", - &layer.digest, whiteouts - ); + debug!("process layer digest={}", &layer.digest,); let mut archive = layer.archive().await?; let mut entries = archive.entries()?; - let mut completed = 0; while let Some(entry) = entries.next().await { let mut entry = entry?; let path = entry.path()?; - let mut maybe_whiteout_path_str = - path.to_str().map(|x| x.to_string()).unwrap_or_default(); - if (completed % 10) == 0 { - self.progress - .update(|progress| { - progress.extracting_layer(&layer.digest, completed, count); - }) - .await; - } - completed += 1; - if whiteouts.contains(&maybe_whiteout_path_str) { - continue; - } - maybe_whiteout_path_str.push('/'); - if whiteouts.contains(&maybe_whiteout_path_str) { - continue; - } let Some(name) = path.file_name() else { continue; }; let Some(name) = name.to_str() else { continue; }; - if name.starts_with(".wh.") { - continue; + self.process_whiteout_entry(&mut vfs, &entry, name, layer) + .await?; } else { vfs.insert_tar_entry(&entry)?; self.process_write_entry(&mut vfs, &mut entry, layer) @@ -181,45 +154,13 @@ impl OciImageAssembler { }) } - async fn process_layer_whiteout( - &self, - vfs: &mut VfsTree, - layer: &OciImageLayer, - ) -> Result<(Vec, usize)> { - let mut whiteouts = Vec::new(); - let mut archive = layer.archive().await?; - let mut entries = archive.entries()?; - let mut count = 0usize; - while let Some(entry) = entries.next().await { - let entry = entry?; - count += 1; - let path = entry.path()?; - let Some(name) = path.file_name() else { - continue; - }; - let Some(name) = name.to_str() else { - continue; - }; - - if name.starts_with(".wh.") { - let path = self - .process_whiteout_entry(vfs, &entry, name, layer) - .await?; - if let Some(path) = path { - whiteouts.push(path); - } - } - } - Ok((whiteouts, count)) - } - async fn process_whiteout_entry( &self, vfs: &mut VfsTree, entry: &Entry>>>, name: &str, layer: &OciImageLayer, - ) -> Result> { + ) -> Result<()> { let path = entry.path()?; let mut path = path.to_path_buf(); path.pop(); @@ -231,24 +172,27 @@ impl OciImageAssembler { path.push(file); } - trace!("whiteout entry layer={} path={:?}", &layer.digest, path,); + trace!( + "whiteout entry {:?} layer={} path={:?}", + entry.path()?, + &layer.digest, + path + ); - let whiteout = path - .to_str() - .ok_or(anyhow!("unable to convert path to string"))? - .to_string(); - - let removed = vfs.root.remove(&path); - if let Some(removed) = removed { - delete_disk_paths(removed).await?; + let result = vfs.root.remove(&path); + if let Some((parent, mut removed)) = result { + delete_disk_paths(&removed).await?; + if opaque { + removed.children.clear(); + parent.children.push(removed); + } } else { - trace!( + warn!( "whiteout entry layer={} path={:?} did not exist", - &layer.digest, - path + &layer.digest, path ); } - Ok(if opaque { None } else { Some(whiteout) }) + Ok(()) } async fn process_write_entry( @@ -266,6 +210,9 @@ impl OciImageAssembler { entry.path()?, entry.header().entry_type(), ); + entry.set_preserve_permissions(false); + entry.set_unpack_xattrs(false); + entry.set_preserve_mtime(false); let path = entry .unpack_in(&self.disk_dir) .await? @@ -275,7 +222,7 @@ impl OciImageAssembler { } } -async fn delete_disk_paths(node: VfsNode) -> Result<()> { +async fn delete_disk_paths(node: &VfsNode) -> Result<()> { let mut queue = vec![node]; while !queue.is_empty() { let node = queue.remove(0); @@ -285,7 +232,8 @@ async fn delete_disk_paths(node: VfsNode) -> Result<()> { } fs::remove_file(disk_path).await?; } - queue.extend_from_slice(&node.children); + let children = node.children.iter().collect::>(); + queue.extend_from_slice(&children); } Ok(()) } diff --git a/crates/oci/src/progress.rs b/crates/oci/src/progress.rs index c54cba3..43bdaea 100644 --- a/crates/oci/src/progress.rs +++ b/crates/oci/src/progress.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use indexmap::IndexMap; -use tokio::sync::{broadcast::Sender, Mutex}; +use tokio::sync::{mpsc::Sender, Mutex}; #[derive(Clone, Debug)] pub struct OciProgress { @@ -108,7 +108,7 @@ impl OciProgressContext { } pub fn update(&self, progress: &OciProgress) { - let _ = self.sender.send(progress.clone()); + let _ = self.sender.try_send(progress.clone()); } } diff --git a/crates/oci/src/vfs.rs b/crates/oci/src/vfs.rs index a67b19f..51c3e0c 100644 --- a/crates/oci/src/vfs.rs +++ b/crates/oci/src/vfs.rs @@ -7,7 +7,7 @@ use tokio::{ }; use tokio_tar::{Builder, Entry, EntryType, Header}; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Eq, PartialEq)] pub enum VfsNodeType { Directory, RegularFile, @@ -100,7 +100,7 @@ impl VfsNode { Some(node) } - pub fn remove(&mut self, path: &Path) -> Option { + pub fn remove(&mut self, path: &Path) -> Option<(&mut VfsNode, VfsNode)> { let parent = path.parent()?; let node = self.lookup_mut(parent)?; let file_name = path.file_name()?; @@ -109,7 +109,8 @@ impl VfsNode { .children .iter() .position(|child| file_name == child.name)?; - Some(node.children.remove(position)) + let removed = node.children.remove(position); + Some((node, removed)) } pub fn create_tar_header(&self) -> Result
{ @@ -194,7 +195,7 @@ impl VfsTree { } pub fn insert_tar_entry(&mut self, entry: &Entry) -> Result<()> { - let meta = VfsNode::from(entry)?; + let mut meta = VfsNode::from(entry)?; let path = entry.path()?.to_path_buf(); let parent = if let Some(parent) = path.parent() { self.root.lookup_mut(parent) @@ -206,7 +207,17 @@ impl VfsTree { return Err(anyhow!("unable to find parent of entry")); }; - parent.children.retain(|child| child.name != meta.name); + let position = parent + .children + .iter() + .position(|child| meta.name == child.name); + + if let Some(position) = position { + let old = parent.children.remove(position); + if meta.typ == VfsNodeType::Directory { + meta.children = old.children; + } + } parent.children.push(meta); Ok(()) } @@ -237,7 +248,6 @@ impl VfsTree { if path.components().count() != 0 { node.write_to_tar(&path, &mut builder).await?; } - for child in &node.children { queue.push((path.clone(), child)); }