feat: oci tar format, bit-perfect disk storage for config and manifest, concurrent image pulls (#88)

* oci: retain bit-perfect copies of manifest and config on disk

* feat: oci tar format support

* feat: concurrent image pulls
This commit is contained in:
Alex Zenla
2024-04-16 01:53:44 -07:00
committed by GitHub
parent 79f7742caa
commit e450ebd2a2
21 changed files with 493 additions and 144 deletions

View File

@ -7,12 +7,18 @@ use crate::{
};
use anyhow::{anyhow, Result};
use log::warn;
use tokio::{pin, process::Command, select};
use tokio::{
fs::File,
pin,
process::{Child, Command},
select,
};
#[derive(Debug, Clone, Copy)]
pub enum OciPackerBackendType {
MkSquashfs,
MkfsErofs,
Tar,
}
impl OciPackerBackendType {
@ -20,6 +26,7 @@ impl OciPackerBackendType {
match self {
OciPackerBackendType::MkSquashfs => OciPackedFormat::Squashfs,
OciPackerBackendType::MkfsErofs => OciPackedFormat::Erofs,
OciPackerBackendType::Tar => OciPackedFormat::Tar,
}
}
@ -31,6 +38,7 @@ impl OciPackerBackendType {
OciPackerBackendType::MkfsErofs => {
Box::new(OciPackerMkfsErofs {}) as Box<dyn OciPackerBackend>
}
OciPackerBackendType::Tar => Box::new(OciPackerTar {}) as Box<dyn OciPackerBackend>,
}
}
}
@ -53,7 +61,7 @@ impl OciPackerBackend for OciPackerMkSquashfs {
})
.await;
let mut child = Command::new("mksquashfs")
let child = Command::new("mksquashfs")
.arg("-")
.arg(file)
.arg("-comp")
@ -63,7 +71,9 @@ impl OciPackerBackend for OciPackerMkSquashfs {
.stderr(Stdio::null())
.stdout(Stdio::null())
.spawn()?;
let mut child = ChildProcessKillGuard(child);
let stdin = child
.0
.stdin
.take()
.ok_or(anyhow!("unable to acquire stdin stream"))?;
@ -74,7 +84,7 @@ impl OciPackerBackend for OciPackerMkSquashfs {
}
Ok(())
}));
let wait = child.wait();
let wait = child.0.wait();
pin!(wait);
let status_result = loop {
if let Some(inner) = writer.as_mut() {
@ -135,7 +145,7 @@ impl OciPackerBackend for OciPackerMkfsErofs {
})
.await;
let mut child = Command::new("mkfs.erofs")
let child = Command::new("mkfs.erofs")
.arg("-L")
.arg("root")
.arg("--tar=-")
@ -144,14 +154,16 @@ impl OciPackerBackend for OciPackerMkfsErofs {
.stderr(Stdio::null())
.stdout(Stdio::null())
.spawn()?;
let mut child = ChildProcessKillGuard(child);
let stdin = child
.0
.stdin
.take()
.ok_or(anyhow!("unable to acquire stdin stream"))?;
let mut writer = Some(tokio::task::spawn(
async move { vfs.write_to_tar(stdin).await },
));
let wait = child.wait();
let wait = child.0.wait();
pin!(wait);
let status_result = loop {
if let Some(inner) = writer.as_mut() {
@ -199,3 +211,38 @@ impl OciPackerBackend for OciPackerMkfsErofs {
}
}
}
pub struct OciPackerTar {}
#[async_trait::async_trait]
impl OciPackerBackend for OciPackerTar {
async fn pack(&self, progress: OciBoundProgress, vfs: Arc<VfsTree>, file: &Path) -> Result<()> {
progress
.update(|progress| {
progress.phase = OciProgressPhase::Packing;
progress.total = 1;
progress.value = 0;
})
.await;
let file = File::create(file).await?;
vfs.write_to_tar(file).await?;
progress
.update(|progress| {
progress.phase = OciProgressPhase::Packing;
progress.total = 1;
progress.value = 1;
})
.await;
Ok(())
}
}
struct ChildProcessKillGuard(Child);
impl Drop for ChildProcessKillGuard {
fn drop(&mut self) {
let _ = self.0.start_kill();
}
}

View File

@ -1,4 +1,7 @@
use crate::packer::{OciImagePacked, OciPackedFormat};
use crate::{
packer::{OciPackedFormat, OciPackedImage},
schema::OciSchema,
};
use anyhow::Result;
use log::debug;
@ -22,7 +25,7 @@ impl OciPackerCache {
&self,
digest: &str,
format: OciPackedFormat,
) -> Result<Option<OciImagePacked>> {
) -> Result<Option<OciPackedImage>> {
let mut fs_path = self.cache_dir.clone();
let mut config_path = self.cache_dir.clone();
let mut manifest_path = self.cache_dir.clone();
@ -38,17 +41,17 @@ impl OciPackerCache {
&& manifest_metadata.is_file()
&& config_metadata.is_file()
{
let manifest_text = fs::read_to_string(&manifest_path).await?;
let manifest: ImageManifest = serde_json::from_str(&manifest_text)?;
let config_text = fs::read_to_string(&config_path).await?;
let config: ImageConfiguration = serde_json::from_str(&config_text)?;
let manifest_bytes = fs::read(&manifest_path).await?;
let manifest: ImageManifest = serde_json::from_slice(&manifest_bytes)?;
let config_bytes = fs::read(&config_path).await?;
let config: ImageConfiguration = serde_json::from_slice(&config_bytes)?;
debug!("cache hit digest={}", digest);
Some(OciImagePacked::new(
Some(OciPackedImage::new(
digest.to_string(),
fs_path.clone(),
format,
config,
manifest,
OciSchema::new(config_bytes, config),
OciSchema::new(manifest_bytes, manifest),
))
} else {
None
@ -60,7 +63,7 @@ impl OciPackerCache {
)
}
pub async fn store(&self, packed: OciImagePacked) -> Result<OciImagePacked> {
pub async fn store(&self, packed: OciPackedImage) -> Result<OciPackedImage> {
debug!("cache store digest={}", packed.digest);
let mut fs_path = self.cache_dir.clone();
let mut manifest_path = self.cache_dir.clone();
@ -68,12 +71,10 @@ impl OciPackerCache {
fs_path.push(format!("{}.{}", packed.digest, packed.format.extension()));
manifest_path.push(format!("{}.manifest.json", packed.digest));
config_path.push(format!("{}.config.json", packed.digest));
fs::copy(&packed.path, &fs_path).await?;
let manifest_text = serde_json::to_string_pretty(&packed.manifest)?;
fs::write(&manifest_path, manifest_text).await?;
let config_text = serde_json::to_string_pretty(&packed.config)?;
fs::write(&config_path, config_text).await?;
Ok(OciImagePacked::new(
fs::rename(&packed.path, &fs_path).await?;
fs::write(&config_path, packed.config.raw()).await?;
fs::write(&manifest_path, packed.manifest.raw()).await?;
Ok(OciPackedImage::new(
packed.digest,
fs_path.clone(),
packed.format,

View File

@ -1,5 +1,7 @@
use std::path::PathBuf;
use crate::schema::OciSchema;
use self::backend::OciPackerBackendType;
use oci_spec::image::{ImageConfiguration, ImageManifest};
@ -7,11 +9,12 @@ pub mod backend;
pub mod cache;
pub mod service;
#[derive(Debug, Default, Clone, Copy)]
#[derive(Debug, Default, Clone, Copy, Eq, PartialEq, Hash)]
pub enum OciPackedFormat {
#[default]
Squashfs,
Erofs,
Tar,
}
impl OciPackedFormat {
@ -19,6 +22,7 @@ impl OciPackedFormat {
match self {
OciPackedFormat::Squashfs => "squashfs",
OciPackedFormat::Erofs => "erofs",
OciPackedFormat::Tar => "tar",
}
}
@ -26,28 +30,29 @@ impl OciPackedFormat {
match self {
OciPackedFormat::Squashfs => OciPackerBackendType::MkSquashfs,
OciPackedFormat::Erofs => OciPackerBackendType::MkfsErofs,
OciPackedFormat::Tar => OciPackerBackendType::Tar,
}
}
}
#[derive(Clone)]
pub struct OciImagePacked {
pub struct OciPackedImage {
pub digest: String,
pub path: PathBuf,
pub format: OciPackedFormat,
pub config: ImageConfiguration,
pub manifest: ImageManifest,
pub config: OciSchema<ImageConfiguration>,
pub manifest: OciSchema<ImageManifest>,
}
impl OciImagePacked {
impl OciPackedImage {
pub fn new(
digest: String,
path: PathBuf,
format: OciPackedFormat,
config: ImageConfiguration,
manifest: ImageManifest,
) -> OciImagePacked {
OciImagePacked {
config: OciSchema<ImageConfiguration>,
manifest: OciSchema<ImageManifest>,
) -> OciPackedImage {
OciPackedImage {
digest,
path,
format,

View File

@ -1,22 +1,40 @@
use std::path::{Path, PathBuf};
use std::{
collections::{hash_map::Entry, HashMap},
fmt::Display,
path::{Path, PathBuf},
sync::Arc,
};
use anyhow::{anyhow, Result};
use tokio::{
sync::{watch, Mutex},
task::JoinHandle,
};
use crate::{
assemble::OciImageAssembler,
fetch::OciImageFetcher,
fetch::{OciImageFetcher, OciResolvedImage},
name::ImageName,
progress::{OciBoundProgress, OciProgress, OciProgressContext},
registry::OciPlatform,
};
use super::{cache::OciPackerCache, OciImagePacked, OciPackedFormat};
use log::{error, info, warn};
use super::{cache::OciPackerCache, OciPackedFormat, OciPackedImage};
pub struct OciPackerTask {
progress: OciBoundProgress,
watch: watch::Sender<Option<Result<OciPackedImage>>>,
task: JoinHandle<()>,
}
#[derive(Clone)]
pub struct OciPackerService {
seed: Option<PathBuf>,
platform: OciPlatform,
cache: OciPackerCache,
tasks: Arc<Mutex<HashMap<OciPackerTaskKey, OciPackerTask>>>,
}
impl OciPackerService {
@ -29,6 +47,7 @@ impl OciPackerService {
seed,
cache: OciPackerCache::new(cache_dir)?,
platform,
tasks: Arc::new(Mutex::new(HashMap::new())),
})
}
@ -36,7 +55,7 @@ impl OciPackerService {
&self,
digest: &str,
format: OciPackedFormat,
) -> Result<Option<OciImagePacked>> {
) -> Result<Option<OciPackedImage>> {
self.cache.recall(digest, format).await
}
@ -45,14 +64,98 @@ impl OciPackerService {
name: ImageName,
format: OciPackedFormat,
progress_context: OciProgressContext,
) -> Result<OciImagePacked> {
) -> Result<OciPackedImage> {
let progress = OciProgress::new();
let progress = OciBoundProgress::new(progress_context.clone(), progress);
let fetcher =
OciImageFetcher::new(self.seed.clone(), self.platform.clone(), progress.clone());
let resolved = fetcher.resolve(name).await?;
let key = OciPackerTaskKey {
digest: resolved.digest.clone(),
format,
};
let (progress_copy_task, mut receiver) = match self.tasks.lock().await.entry(key.clone()) {
Entry::Occupied(entry) => {
let entry = entry.get();
(
Some(entry.progress.also_update(progress_context).await),
entry.watch.subscribe(),
)
}
Entry::Vacant(entry) => {
let task = self
.clone()
.launch(key.clone(), format, resolved, fetcher, progress.clone())
.await;
let (watch, receiver) = watch::channel(None);
let task = OciPackerTask {
progress: progress.clone(),
task,
watch,
};
entry.insert(task);
(None, receiver)
}
};
let _progress_task_guard = scopeguard::guard(progress_copy_task, |task| {
if let Some(task) = task {
task.abort();
}
});
let _task_cancel_guard = scopeguard::guard(self.clone(), |service| {
service.maybe_cancel_task(key);
});
loop {
receiver.changed().await?;
let current = receiver.borrow_and_update();
if current.is_some() {
return current
.as_ref()
.map(|x| x.as_ref().map_err(|err| anyhow!("{}", err)).cloned())
.unwrap();
}
}
}
async fn launch(
self,
key: OciPackerTaskKey,
format: OciPackedFormat,
resolved: OciResolvedImage,
fetcher: OciImageFetcher,
progress: OciBoundProgress,
) -> JoinHandle<()> {
info!("packer task {} started", key);
tokio::task::spawn(async move {
let _task_drop_guard =
scopeguard::guard((key.clone(), self.clone()), |(key, service)| {
service.ensure_task_gone(key);
});
if let Err(error) = self
.task(key.clone(), format, resolved, fetcher, progress)
.await
{
self.finish(&key, Err(error)).await;
}
})
}
async fn task(
&self,
key: OciPackerTaskKey,
format: OciPackedFormat,
resolved: OciResolvedImage,
fetcher: OciImageFetcher,
progress: OciBoundProgress,
) -> Result<()> {
if let Some(cached) = self.cache.recall(&resolved.digest, format).await? {
return Ok(cached);
self.finish(&key, Ok(cached)).await;
return Ok(());
}
let assembler =
OciImageAssembler::new(fetcher, resolved, progress.clone(), None, None).await?;
@ -67,8 +170,7 @@ impl OciPackerService {
packer
.pack(progress, assembled.vfs.clone(), &target)
.await?;
let packed = OciImagePacked::new(
let packed = OciPackedImage::new(
assembled.digest.clone(),
file,
format,
@ -76,6 +178,59 @@ impl OciPackerService {
assembled.manifest.clone(),
);
let packed = self.cache.store(packed).await?;
Ok(packed)
self.finish(&key, Ok(packed)).await;
Ok(())
}
async fn finish(&self, key: &OciPackerTaskKey, result: Result<OciPackedImage>) {
let Some(task) = self.tasks.lock().await.remove(key) else {
error!("packer task {} was not found when task completed", key);
return;
};
match result.as_ref() {
Ok(_) => {
info!("packer task {} completed", key);
}
Err(err) => {
warn!("packer task {} failed: {}", key, err);
}
}
task.watch.send_replace(Some(result));
}
fn maybe_cancel_task(self, key: OciPackerTaskKey) {
tokio::task::spawn(async move {
let tasks = self.tasks.lock().await;
if let Some(task) = tasks.get(&key) {
if task.watch.is_closed() {
task.task.abort();
}
}
});
}
fn ensure_task_gone(self, key: OciPackerTaskKey) {
tokio::task::spawn(async move {
let mut tasks = self.tasks.lock().await;
if let Some(task) = tasks.remove(&key) {
warn!("packer task {} aborted", key);
task.watch.send_replace(Some(Err(anyhow!("task aborted"))));
}
});
}
}
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
struct OciPackerTaskKey {
digest: String,
format: OciPackedFormat,
}
impl Display for OciPackerTaskKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!("{}:{}", self.digest, self.format.extension()))
}
}