feat: rework oci to preserve permissions via a vfs

This commit is contained in:
Alex Zenla 2024-04-15 12:41:17 +00:00
parent 5169a97d43
commit 32da4dec5a
No known key found for this signature in database
GPG Key ID: 067B238899B51269
5 changed files with 67 additions and 110 deletions

View File

@ -1,5 +1,3 @@
use std::{pin::Pin, str::FromStr};
use async_stream::try_stream;
use futures::Stream;
use krata::{
@ -23,12 +21,10 @@ use krataoci::{
packer::{service::OciPackerService, OciImagePacked, OciPackedFormat},
progress::{OciProgress, OciProgressContext},
};
use std::{pin::Pin, str::FromStr};
use tokio::{
select,
sync::{
broadcast,
mpsc::{channel, Sender},
},
sync::mpsc::{channel, Sender},
task::JoinError,
};
use tokio_stream::StreamExt;
@ -94,7 +90,7 @@ enum ConsoleDataSelect {
}
enum PullImageSelect {
Progress(Option<OciProgress>),
Progress(usize),
Completed(Result<Result<OciImagePacked, anyhow::Error>, JoinError>),
}
@ -370,7 +366,7 @@ impl ControlService for DaemonControlService {
GuestOciImageFormat::Squashfs => OciPackedFormat::Squashfs,
GuestOciImageFormat::Erofs => OciPackedFormat::Erofs,
};
let (sender, mut receiver) = broadcast::channel::<OciProgress>(100);
let (sender, mut receiver) = channel::<OciProgress>(100);
let context = OciProgressContext::new(sender);
let our_packer = self.packer.clone();
@ -380,19 +376,22 @@ impl ControlService for DaemonControlService {
our_packer.request(name, format, context).await
});
loop {
let mut progresses = Vec::new();
let what = select! {
x = receiver.recv() => PullImageSelect::Progress(x.ok()),
x = receiver.recv_many(&mut progresses, 10) => PullImageSelect::Progress(x),
x = &mut task => PullImageSelect::Completed(x),
};
match what {
PullImageSelect::Progress(Some(progress)) => {
let reply = PullImageReply {
progress: Some(convert_oci_progress(progress)),
digest: String::new(),
format: GuestOciImageFormat::Unknown.into(),
};
yield reply;
PullImageSelect::Progress(count) => {
if count > 0 {
let progress = progresses.remove(progresses.len() - 1);
let reply = PullImageReply {
progress: Some(convert_oci_progress(progress)),
digest: String::new(),
format: GuestOciImageFormat::Unknown.into(),
};
yield reply;
}
},
PullImageSelect::Completed(result) => {
@ -413,8 +412,6 @@ impl ControlService for DaemonControlService {
yield reply;
break;
},
_ => {},
}
}
};

View File

@ -8,7 +8,7 @@ use krataoci::{
progress::{OciProgress, OciProgressContext},
registry::OciPlatform,
};
use tokio::{fs, sync::broadcast};
use tokio::{fs, sync::mpsc::channel};
#[tokio::main]
async fn main() -> Result<()> {
@ -22,14 +22,16 @@ async fn main() -> Result<()> {
fs::create_dir(&cache_dir).await?;
}
let (sender, mut receiver) = broadcast::channel::<OciProgress>(1000);
let (sender, mut receiver) = channel::<OciProgress>(100);
tokio::task::spawn(async move {
loop {
let Some(progress) = receiver.recv().await.ok() else {
break;
let mut progresses = Vec::new();
let _ = receiver.recv_many(&mut progresses, 100).await;
let Some(progress) = progresses.last() else {
continue;
};
println!("phase {:?}", progress.phase);
for (id, layer) in progress.layers {
for (id, layer) in &progress.layers {
println!(
"{} {:?} {} of {}",
id, layer.phase, layer.value, layer.total

View File

@ -113,48 +113,21 @@ impl OciImageAssembler {
progress.extracting_layer(&layer.digest, 0, 1);
})
.await;
let (whiteouts, count) = self.process_layer_whiteout(&mut vfs, layer).await?;
self.progress
.update(|progress| {
progress.extracting_layer(&layer.digest, 0, count);
})
.await;
debug!(
"process layer digest={} whiteouts={:?}",
&layer.digest, whiteouts
);
debug!("process layer digest={}", &layer.digest,);
let mut archive = layer.archive().await?;
let mut entries = archive.entries()?;
let mut completed = 0;
while let Some(entry) = entries.next().await {
let mut entry = entry?;
let path = entry.path()?;
let mut maybe_whiteout_path_str =
path.to_str().map(|x| x.to_string()).unwrap_or_default();
if (completed % 10) == 0 {
self.progress
.update(|progress| {
progress.extracting_layer(&layer.digest, completed, count);
})
.await;
}
completed += 1;
if whiteouts.contains(&maybe_whiteout_path_str) {
continue;
}
maybe_whiteout_path_str.push('/');
if whiteouts.contains(&maybe_whiteout_path_str) {
continue;
}
let Some(name) = path.file_name() else {
continue;
};
let Some(name) = name.to_str() else {
continue;
};
if name.starts_with(".wh.") {
continue;
self.process_whiteout_entry(&mut vfs, &entry, name, layer)
.await?;
} else {
vfs.insert_tar_entry(&entry)?;
self.process_write_entry(&mut vfs, &mut entry, layer)
@ -181,45 +154,13 @@ impl OciImageAssembler {
})
}
async fn process_layer_whiteout(
&self,
vfs: &mut VfsTree,
layer: &OciImageLayer,
) -> Result<(Vec<String>, usize)> {
let mut whiteouts = Vec::new();
let mut archive = layer.archive().await?;
let mut entries = archive.entries()?;
let mut count = 0usize;
while let Some(entry) = entries.next().await {
let entry = entry?;
count += 1;
let path = entry.path()?;
let Some(name) = path.file_name() else {
continue;
};
let Some(name) = name.to_str() else {
continue;
};
if name.starts_with(".wh.") {
let path = self
.process_whiteout_entry(vfs, &entry, name, layer)
.await?;
if let Some(path) = path {
whiteouts.push(path);
}
}
}
Ok((whiteouts, count))
}
async fn process_whiteout_entry(
&self,
vfs: &mut VfsTree,
entry: &Entry<Archive<Pin<Box<dyn AsyncRead + Send>>>>,
name: &str,
layer: &OciImageLayer,
) -> Result<Option<String>> {
) -> Result<()> {
let path = entry.path()?;
let mut path = path.to_path_buf();
path.pop();
@ -231,24 +172,27 @@ impl OciImageAssembler {
path.push(file);
}
trace!("whiteout entry layer={} path={:?}", &layer.digest, path,);
trace!(
"whiteout entry {:?} layer={} path={:?}",
entry.path()?,
&layer.digest,
path
);
let whiteout = path
.to_str()
.ok_or(anyhow!("unable to convert path to string"))?
.to_string();
let removed = vfs.root.remove(&path);
if let Some(removed) = removed {
delete_disk_paths(removed).await?;
let result = vfs.root.remove(&path);
if let Some((parent, mut removed)) = result {
delete_disk_paths(&removed).await?;
if opaque {
removed.children.clear();
parent.children.push(removed);
}
} else {
trace!(
warn!(
"whiteout entry layer={} path={:?} did not exist",
&layer.digest,
path
&layer.digest, path
);
}
Ok(if opaque { None } else { Some(whiteout) })
Ok(())
}
async fn process_write_entry(
@ -266,6 +210,9 @@ impl OciImageAssembler {
entry.path()?,
entry.header().entry_type(),
);
entry.set_preserve_permissions(false);
entry.set_unpack_xattrs(false);
entry.set_preserve_mtime(false);
let path = entry
.unpack_in(&self.disk_dir)
.await?
@ -275,7 +222,7 @@ impl OciImageAssembler {
}
}
async fn delete_disk_paths(node: VfsNode) -> Result<()> {
async fn delete_disk_paths(node: &VfsNode) -> Result<()> {
let mut queue = vec![node];
while !queue.is_empty() {
let node = queue.remove(0);
@ -285,7 +232,8 @@ async fn delete_disk_paths(node: VfsNode) -> Result<()> {
}
fs::remove_file(disk_path).await?;
}
queue.extend_from_slice(&node.children);
let children = node.children.iter().collect::<Vec<_>>();
queue.extend_from_slice(&children);
}
Ok(())
}

View File

@ -1,7 +1,7 @@
use std::sync::Arc;
use indexmap::IndexMap;
use tokio::sync::{broadcast::Sender, Mutex};
use tokio::sync::{mpsc::Sender, Mutex};
#[derive(Clone, Debug)]
pub struct OciProgress {
@ -108,7 +108,7 @@ impl OciProgressContext {
}
pub fn update(&self, progress: &OciProgress) {
let _ = self.sender.send(progress.clone());
let _ = self.sender.try_send(progress.clone());
}
}

View File

@ -7,7 +7,7 @@ use tokio::{
};
use tokio_tar::{Builder, Entry, EntryType, Header};
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum VfsNodeType {
Directory,
RegularFile,
@ -100,7 +100,7 @@ impl VfsNode {
Some(node)
}
pub fn remove(&mut self, path: &Path) -> Option<VfsNode> {
pub fn remove(&mut self, path: &Path) -> Option<(&mut VfsNode, VfsNode)> {
let parent = path.parent()?;
let node = self.lookup_mut(parent)?;
let file_name = path.file_name()?;
@ -109,7 +109,8 @@ impl VfsNode {
.children
.iter()
.position(|child| file_name == child.name)?;
Some(node.children.remove(position))
let removed = node.children.remove(position);
Some((node, removed))
}
pub fn create_tar_header(&self) -> Result<Header> {
@ -194,7 +195,7 @@ impl VfsTree {
}
pub fn insert_tar_entry<X: AsyncRead + Unpin>(&mut self, entry: &Entry<X>) -> Result<()> {
let meta = VfsNode::from(entry)?;
let mut meta = VfsNode::from(entry)?;
let path = entry.path()?.to_path_buf();
let parent = if let Some(parent) = path.parent() {
self.root.lookup_mut(parent)
@ -206,7 +207,17 @@ impl VfsTree {
return Err(anyhow!("unable to find parent of entry"));
};
parent.children.retain(|child| child.name != meta.name);
let position = parent
.children
.iter()
.position(|child| meta.name == child.name);
if let Some(position) = position {
let old = parent.children.remove(position);
if meta.typ == VfsNodeType::Directory {
meta.children = old.children;
}
}
parent.children.push(meta);
Ok(())
}
@ -237,7 +248,6 @@ impl VfsTree {
if path.components().count() != 0 {
node.write_to_tar(&path, &mut builder).await?;
}
for child in &node.children {
queue.push((path.clone(), child));
}