diff --git a/crates/oci/examples/squashify.rs b/crates/oci/examples/squashify.rs index fce9f58..33a03bc 100644 --- a/crates/oci/examples/squashify.rs +++ b/crates/oci/examples/squashify.rs @@ -3,11 +3,10 @@ use std::{env::args, path::PathBuf}; use anyhow::Result; use env_logger::Env; use krataoci::{ - cache::ImageCache, - compiler::OciImageCompiler, name::ImageName, - packer::OciPackerFormat, + packer::{service::OciPackerService, OciPackedFormat}, progress::{OciProgress, OciProgressContext}, + registry::OciPlatform, }; use tokio::{fs, sync::broadcast}; @@ -23,8 +22,6 @@ async fn main() -> Result<()> { fs::create_dir(&cache_dir).await?; } - let cache = ImageCache::new(&cache_dir)?; - let (sender, mut receiver) = broadcast::channel::(1000); tokio::task::spawn(async move { loop { @@ -41,14 +38,14 @@ async fn main() -> Result<()> { } }); let context = OciProgressContext::new(sender); - let compiler = OciImageCompiler::new(&cache, seed, context)?; - let info = compiler - .compile(&image.to_string(), &image, OciPackerFormat::Squashfs) + let service = OciPackerService::new(seed, &cache_dir, OciPlatform::current(), context)?; + let packed = service + .pack("cli", image.clone(), OciPackedFormat::Squashfs) .await?; println!( "generated squashfs of {} to {}", image, - info.image.to_string_lossy() + packed.path.to_string_lossy() ); Ok(()) } diff --git a/crates/oci/src/compiler.rs b/crates/oci/src/assemble/mod.rs similarity index 50% rename from crates/oci/src/compiler.rs rename to crates/oci/src/assemble/mod.rs index a0b7fee..4db8342 100644 --- a/crates/oci/src/compiler.rs +++ b/crates/oci/src/assemble/mod.rs @@ -1,11 +1,6 @@ -use crate::cache::ImageCache; -use crate::fetch::{OciImageDownloader, OciImageLayer}; -use crate::name::ImageName; -use crate::packer::OciPackerFormat; -use crate::progress::{OciProgress, OciProgressContext, OciProgressPhase}; -use crate::registry::OciRegistryPlatform; +use crate::fetch::{OciImageFetcher, OciImageLayer, OciResolvedImage}; +use crate::progress::OciBoundProgress; use anyhow::{anyhow, Result}; -use indexmap::IndexMap; use log::{debug, trace}; use oci_spec::image::{ImageConfiguration, ImageManifest}; use std::borrow::Cow; @@ -17,132 +12,111 @@ use tokio_stream::StreamExt; use tokio_tar::{Archive, Entry}; use uuid::Uuid; -pub const IMAGE_PACKER_VERSION: u64 = 2; - -pub struct ImageInfo { - pub image: PathBuf, +pub struct OciImageAssembled { + pub digest: String, + pub path: PathBuf, pub manifest: ImageManifest, pub config: ImageConfiguration, + pub tmp_dir: Option, } -impl ImageInfo { - pub fn new( - image: PathBuf, - manifest: ImageManifest, - config: ImageConfiguration, - ) -> Result { - Ok(ImageInfo { - image, - manifest, - config, - }) +impl Drop for OciImageAssembled { + fn drop(&mut self) { + if let Some(tmp) = self.tmp_dir.clone() { + tokio::task::spawn(async move { + let _ = fs::remove_dir_all(&tmp).await; + }); + } } } -pub struct OciImageCompiler<'a> { - cache: &'a ImageCache, - seed: Option, - progress: OciProgressContext, +pub struct OciImageAssembler { + downloader: OciImageFetcher, + resolved: OciResolvedImage, + progress: OciBoundProgress, + work_dir: PathBuf, + target_dir: PathBuf, + tmp_dir: Option, } -impl OciImageCompiler<'_> { - pub fn new( - cache: &ImageCache, - seed: Option, - progress: OciProgressContext, - ) -> Result { - Ok(OciImageCompiler { - cache, - seed, +impl OciImageAssembler { + pub async fn new( + downloader: OciImageFetcher, + resolved: OciResolvedImage, + progress: OciBoundProgress, + work_dir: Option, + target_dir: Option, + ) -> Result { + let tmp_dir = if work_dir.is_none() || target_dir.is_none() { + let mut tmp_dir = std::env::temp_dir().clone(); + tmp_dir.push(format!("oci-assemble-{}", Uuid::new_v4())); + Some(tmp_dir) + } else { + None + }; + + let work_dir = if let Some(work_dir) = work_dir { + work_dir + } else { + let mut tmp_dir = tmp_dir + .clone() + .ok_or(anyhow!("tmp_dir was not created when expected"))?; + tmp_dir.push("work"); + tmp_dir + }; + + let target_dir = if let Some(target_dir) = target_dir { + target_dir + } else { + let mut tmp_dir = tmp_dir + .clone() + .ok_or(anyhow!("tmp_dir was not created when expected"))?; + tmp_dir.push("image"); + tmp_dir + }; + + fs::create_dir_all(&work_dir).await?; + fs::create_dir_all(&target_dir).await?; + + Ok(OciImageAssembler { + downloader, + resolved, progress, + work_dir, + target_dir, + tmp_dir, }) } - pub async fn compile( - &self, - id: &str, - image: &ImageName, - format: OciPackerFormat, - ) -> Result { - debug!("compile image={image} format={:?}", format); - let mut tmp_dir = std::env::temp_dir().clone(); - tmp_dir.push(format!("krata-compile-{}", Uuid::new_v4())); - - let mut image_dir = tmp_dir.clone(); - image_dir.push("image"); - fs::create_dir_all(&image_dir).await?; - - let mut layer_dir = tmp_dir.clone(); + pub async fn assemble(self) -> Result { + debug!("assemble"); + let mut layer_dir = self.work_dir.clone(); layer_dir.push("layer"); fs::create_dir_all(&layer_dir).await?; - - let mut packed_file = tmp_dir.clone(); - packed_file.push("image.packed"); - - let _guard = scopeguard::guard(tmp_dir.to_path_buf(), |delete| { - tokio::task::spawn(async move { - let _ = fs::remove_dir_all(delete).await; - }); - }); - let info = self - .download_and_compile(id, image, &layer_dir, &image_dir, &packed_file, format) - .await?; - Ok(info) + self.assemble_with(&layer_dir).await } - async fn download_and_compile( - &self, - id: &str, - image: &ImageName, - layer_dir: &Path, - image_dir: &Path, - packed_file: &Path, - format: OciPackerFormat, - ) -> Result { - let mut progress = OciProgress { - id: id.to_string(), - phase: OciProgressPhase::Resolving, - layers: IndexMap::new(), - value: 0, - total: 0, - }; - self.progress.update(&progress); - let downloader = OciImageDownloader::new( - self.seed.clone(), - layer_dir.to_path_buf(), - OciRegistryPlatform::current(), - self.progress.clone(), - ); - let resolved = downloader.resolve(image.clone()).await?; - let cache_key = format!( - "manifest={}:version={}:format={}\n", - resolved.digest, - IMAGE_PACKER_VERSION, - format.id(), - ); - let cache_digest = sha256::digest(cache_key); - - if let Some(cached) = self.cache.recall(&cache_digest, format).await? { - return Ok(cached); - } - - progress.phase = OciProgressPhase::Resolved; - for layer in resolved.manifest.layers() { - progress.add_layer(layer.digest()); - } - self.progress.update(&progress); - - let local = downloader.download(resolved, &mut progress).await?; + async fn assemble_with(self, layer_dir: &Path) -> Result { + let local = self + .downloader + .download(self.resolved.clone(), layer_dir) + .await?; for layer in &local.layers { debug!( "process layer digest={} compression={:?}", &layer.digest, layer.compression, ); - progress.extracting_layer(&layer.digest, 0, 1); - self.progress.update(&progress); - let (whiteouts, count) = self.process_layer_whiteout(layer, image_dir).await?; - progress.extracting_layer(&layer.digest, 0, count); - self.progress.update(&progress); + self.progress + .update(|progress| { + progress.extracting_layer(&layer.digest, 0, 1); + }) + .await; + let (whiteouts, count) = self.process_layer_whiteout(layer).await?; + self.progress + .update(|progress| { + progress.extracting_layer(&layer.digest, 0, count); + }) + .await; debug!( "process layer digest={} whiteouts={:?}", &layer.digest, whiteouts @@ -156,10 +130,13 @@ impl OciImageCompiler<'_> { let mut maybe_whiteout_path_str = path.to_str().map(|x| x.to_string()).unwrap_or_default(); if (completed % 10) == 0 { - progress.extracting_layer(&layer.digest, completed, count); + self.progress + .update(|progress| { + progress.extracting_layer(&layer.digest, completed, count); + }) + .await; } completed += 1; - self.progress.update(&progress); if whiteouts.contains(&maybe_whiteout_path_str) { continue; } @@ -177,12 +154,14 @@ impl OciImageCompiler<'_> { if name.starts_with(".wh.") { continue; } else { - self.process_write_entry(&mut entry, layer, image_dir) - .await?; + self.process_write_entry(&mut entry, layer).await?; } } - progress.extracted_layer(&layer.digest); - self.progress.update(&progress); + self.progress + .update(|progress| { + progress.extracted_layer(&layer.digest); + }) + .await; } for layer in &local.layers { @@ -190,41 +169,16 @@ impl OciImageCompiler<'_> { fs::remove_file(&layer.path).await?; } } - - let image_dir_pack = image_dir.to_path_buf(); - let packed_file_pack = packed_file.to_path_buf(); - let progress_pack = progress.clone(); - let progress_context = self.progress.clone(); - let format_pack = format; - progress = tokio::task::spawn_blocking(move || { - OciImageCompiler::pack( - format_pack, - &image_dir_pack, - &packed_file_pack, - progress_pack, - progress_context, - ) + Ok(OciImageAssembled { + digest: self.resolved.digest, + path: self.target_dir, + manifest: self.resolved.manifest, + config: local.config, + tmp_dir: self.tmp_dir, }) - .await??; - - let info = ImageInfo::new( - packed_file.to_path_buf(), - local.image.manifest, - local.config, - )?; - let info = self.cache.store(&cache_digest, &info, format).await?; - progress.phase = OciProgressPhase::Complete; - progress.value = 0; - progress.total = 0; - self.progress.update(&progress); - Ok(info) } - async fn process_layer_whiteout( - &self, - layer: &OciImageLayer, - image_dir: &Path, - ) -> Result<(Vec, usize)> { + async fn process_layer_whiteout(&self, layer: &OciImageLayer) -> Result<(Vec, usize)> { let mut whiteouts = Vec::new(); let mut archive = layer.archive().await?; let mut entries = archive.entries()?; @@ -241,9 +195,7 @@ impl OciImageCompiler<'_> { }; if name.starts_with(".wh.") { - let path = self - .process_whiteout_entry(&entry, name, layer, image_dir) - .await?; + let path = self.process_whiteout_entry(&entry, name, layer).await?; if let Some(path) = path { whiteouts.push(path); } @@ -257,10 +209,9 @@ impl OciImageCompiler<'_> { entry: &Entry>>>, name: &str, layer: &OciImageLayer, - image_dir: &Path, ) -> Result> { let path = entry.path()?; - let mut dst = self.check_safe_entry(path.clone(), image_dir)?; + let mut dst = self.check_safe_entry(path.clone())?; dst.pop(); let mut path = path.to_path_buf(); path.pop(); @@ -271,7 +222,7 @@ impl OciImageCompiler<'_> { let file = &name[4..]; dst.push(file); path.push(file); - self.check_safe_path(&dst, image_dir)?; + self.check_safe_path(&dst)?; } trace!("whiteout entry layer={} path={:?}", &layer.digest, path,); @@ -321,7 +272,6 @@ impl OciImageCompiler<'_> { &self, entry: &mut Entry>>>, layer: &OciImageLayer, - image_dir: &Path, ) -> Result<()> { let uid = entry.header().uid()?; let gid = entry.header().gid()?; @@ -336,7 +286,7 @@ impl OciImageCompiler<'_> { entry.set_preserve_mtime(true); entry.set_preserve_permissions(true); entry.set_unpack_xattrs(true); - if let Some(path) = entry.unpack_in(image_dir).await? { + if let Some(path) = entry.unpack_in(&self.target_dir).await? { if !path.is_symlink() { std::os::unix::fs::chown(path, Some(uid as u32), Some(gid as u32))?; } @@ -344,45 +294,28 @@ impl OciImageCompiler<'_> { Ok(()) } - fn check_safe_entry(&self, path: Cow, image_dir: &Path) -> Result { - let mut dst = image_dir.to_path_buf(); + fn check_safe_entry(&self, path: Cow) -> Result { + let mut dst = self.target_dir.to_path_buf(); dst.push(path); if let Some(name) = dst.file_name() { if let Some(name) = name.to_str() { if name.starts_with(".wh.") { let copy = dst.clone(); dst.pop(); - self.check_safe_path(&dst, image_dir)?; + self.check_safe_path(&dst)?; return Ok(copy); } } } - self.check_safe_path(&dst, image_dir)?; + self.check_safe_path(&dst)?; Ok(dst) } - fn check_safe_path(&self, dst: &Path, image_dir: &Path) -> Result<()> { + fn check_safe_path(&self, dst: &Path) -> Result<()> { let resolved = path_clean::clean(dst); - if !resolved.starts_with(image_dir) { + if !resolved.starts_with(&self.target_dir) { return Err(anyhow!("layer attempts to work outside image dir")); } Ok(()) } - - fn pack( - format: OciPackerFormat, - image_dir: &Path, - packed_file: &Path, - mut progress: OciProgress, - progress_context: OciProgressContext, - ) -> Result { - let backend = format.detect_best_backend(); - let backend = backend.create(); - backend.pack(&mut progress, &progress_context, image_dir, packed_file)?; - std::fs::remove_dir_all(image_dir)?; - progress.phase = OciProgressPhase::Packing; - progress.value = progress.total; - progress_context.update(&progress); - Ok(progress) - } } diff --git a/crates/oci/src/cache.rs b/crates/oci/src/cache.rs deleted file mode 100644 index f79f335..0000000 --- a/crates/oci/src/cache.rs +++ /dev/null @@ -1,74 +0,0 @@ -use crate::packer::OciPackerFormat; - -use super::compiler::ImageInfo; -use anyhow::Result; -use log::debug; -use oci_spec::image::{ImageConfiguration, ImageManifest}; -use std::path::{Path, PathBuf}; -use tokio::fs; - -#[derive(Clone)] -pub struct ImageCache { - cache_dir: PathBuf, -} - -impl ImageCache { - pub fn new(cache_dir: &Path) -> Result { - Ok(ImageCache { - cache_dir: cache_dir.to_path_buf(), - }) - } - - pub async fn recall(&self, digest: &str, format: OciPackerFormat) -> Result> { - let mut fs_path = self.cache_dir.clone(); - let mut config_path = self.cache_dir.clone(); - let mut manifest_path = self.cache_dir.clone(); - fs_path.push(format!("{}.{}", digest, format.extension())); - manifest_path.push(format!("{}.manifest.json", digest)); - config_path.push(format!("{}.config.json", digest)); - Ok( - if fs_path.exists() && manifest_path.exists() && config_path.exists() { - let image_metadata = fs::metadata(&fs_path).await?; - let manifest_metadata = fs::metadata(&manifest_path).await?; - let config_metadata = fs::metadata(&config_path).await?; - if image_metadata.is_file() - && manifest_metadata.is_file() - && config_metadata.is_file() - { - let manifest_text = fs::read_to_string(&manifest_path).await?; - let manifest: ImageManifest = serde_json::from_str(&manifest_text)?; - let config_text = fs::read_to_string(&config_path).await?; - let config: ImageConfiguration = serde_json::from_str(&config_text)?; - debug!("cache hit digest={}", digest); - Some(ImageInfo::new(fs_path.clone(), manifest, config)?) - } else { - None - } - } else { - debug!("cache miss digest={}", digest); - None - }, - ) - } - - pub async fn store( - &self, - digest: &str, - info: &ImageInfo, - format: OciPackerFormat, - ) -> Result { - debug!("cache store digest={}", digest); - let mut fs_path = self.cache_dir.clone(); - let mut manifest_path = self.cache_dir.clone(); - let mut config_path = self.cache_dir.clone(); - fs_path.push(format!("{}.{}", digest, format.extension())); - manifest_path.push(format!("{}.manifest.json", digest)); - config_path.push(format!("{}.config.json", digest)); - fs::copy(&info.image, &fs_path).await?; - let manifest_text = serde_json::to_string_pretty(&info.manifest)?; - fs::write(&manifest_path, manifest_text).await?; - let config_text = serde_json::to_string_pretty(&info.config)?; - fs::write(&config_path, config_text).await?; - ImageInfo::new(fs_path.clone(), info.manifest.clone(), info.config.clone()) - } -} diff --git a/crates/oci/src/fetch.rs b/crates/oci/src/fetch.rs index deccfe0..9260afe 100644 --- a/crates/oci/src/fetch.rs +++ b/crates/oci/src/fetch.rs @@ -1,8 +1,8 @@ -use crate::progress::{OciProgress, OciProgressContext, OciProgressPhase}; +use crate::progress::{OciBoundProgress, OciProgressPhase}; use super::{ name::ImageName, - registry::{OciRegistryClient, OciRegistryPlatform}, + registry::{OciPlatform, OciRegistryClient}, }; use std::{ @@ -24,11 +24,10 @@ use tokio::{ use tokio_stream::StreamExt; use tokio_tar::Archive; -pub struct OciImageDownloader { +pub struct OciImageFetcher { seed: Option, - storage: PathBuf, - platform: OciRegistryPlatform, - progress: OciProgressContext, + platform: OciPlatform, + progress: OciBoundProgress, } #[derive(Clone, Debug, PartialEq, Eq)] @@ -77,16 +76,14 @@ pub struct OciLocalImage { pub layers: Vec, } -impl OciImageDownloader { +impl OciImageFetcher { pub fn new( seed: Option, - storage: PathBuf, - platform: OciRegistryPlatform, - progress: OciProgressContext, - ) -> OciImageDownloader { - OciImageDownloader { + platform: OciPlatform, + progress: OciBoundProgress, + ) -> OciImageFetcher { + OciImageFetcher { seed, - storage, platform, progress, } @@ -216,12 +213,14 @@ impl OciImageDownloader { pub async fn download( &self, image: OciResolvedImage, - progress: &mut OciProgress, + layer_dir: &Path, ) -> Result { let config: ImageConfiguration; - - progress.phase = OciProgressPhase::ConfigAcquire; - self.progress.update(progress); + self.progress + .update(|progress| { + progress.phase = OciProgressPhase::ConfigAcquire; + }) + .await; let mut client = OciRegistryClient::new(image.name.registry_url()?, self.platform.clone())?; if let Some(seeded) = self .load_seed_json_blob::(image.manifest.config()) @@ -234,18 +233,31 @@ impl OciImageDownloader { .await?; config = serde_json::from_slice(&config_bytes)?; } - progress.phase = OciProgressPhase::LayerAcquire; - self.progress.update(progress); + self.progress + .update(|progress| { + progress.phase = OciProgressPhase::LayerAcquire; + + for layer in image.manifest.layers() { + progress.add_layer(layer.digest(), layer.size() as usize); + } + }) + .await; let mut layers = Vec::new(); for layer in image.manifest.layers() { - progress.downloading_layer(layer.digest(), 0, layer.size() as usize); - self.progress.update(progress); + self.progress + .update(|progress| { + progress.downloading_layer(layer.digest(), 0, layer.size() as usize); + }) + .await; layers.push( - self.acquire_layer(&image.name, layer, &mut client, progress) + self.acquire_layer(&image.name, layer, layer_dir, &mut client) .await?, ); - progress.downloaded_layer(layer.digest()); - self.progress.update(progress); + self.progress + .update(|progress| { + progress.downloaded_layer(layer.digest()); + }) + .await; } Ok(OciLocalImage { image, @@ -258,28 +270,22 @@ impl OciImageDownloader { &self, image: &ImageName, layer: &Descriptor, + layer_dir: &Path, client: &mut OciRegistryClient, - progress: &mut OciProgress, ) -> Result { debug!( "acquire layer digest={} size={}", layer.digest(), layer.size() ); - let mut layer_path = self.storage.clone(); + let mut layer_path = layer_dir.to_path_buf(); layer_path.push(format!("{}.layer", layer.digest())); let seeded = self.extract_seed_blob(layer, &layer_path).await?; if !seeded { let file = File::create(&layer_path).await?; let size = client - .write_blob_to_file( - &image.name, - layer, - file, - Some(progress), - Some(&self.progress), - ) + .write_blob_to_file(&image.name, layer, file, Some(self.progress.clone())) .await?; if layer.size() as u64 != size { return Err(anyhow!( diff --git a/crates/oci/src/lib.rs b/crates/oci/src/lib.rs index 1b117a7..b0e650d 100644 --- a/crates/oci/src/lib.rs +++ b/crates/oci/src/lib.rs @@ -1,5 +1,4 @@ -pub mod cache; -pub mod compiler; +pub mod assemble; pub mod fetch; pub mod name; pub mod packer; diff --git a/crates/oci/src/packer.rs b/crates/oci/src/packer/backend.rs similarity index 67% rename from crates/oci/src/packer.rs rename to crates/oci/src/packer/backend.rs index 2022ec0..2ec5804 100644 --- a/crates/oci/src/packer.rs +++ b/crates/oci/src/packer/backend.rs @@ -6,116 +6,23 @@ use std::{ process::{Command, Stdio}, }; +use crate::progress::{OciBoundProgress, OciProgressPhase}; use anyhow::{anyhow, Result}; use backhand::{compression::Compressor, FilesystemCompressor, FilesystemWriter, NodeHeader}; use log::{trace, warn}; use walkdir::WalkDir; -use crate::progress::{OciProgress, OciProgressContext, OciProgressPhase}; - -#[derive(Debug, Default, Clone, Copy)] -pub enum OciPackerFormat { - #[default] - Squashfs, - Erofs, -} - -#[derive(Debug, Clone, Copy)] -pub enum OciPackerBackendType { - Backhand, - MkSquashfs, - MkfsErofs, -} - -impl OciPackerFormat { - pub fn id(&self) -> u8 { - match self { - OciPackerFormat::Squashfs => 0, - OciPackerFormat::Erofs => 1, - } - } - - pub fn extension(&self) -> &str { - match self { - OciPackerFormat::Squashfs => "erofs", - OciPackerFormat::Erofs => "erofs", - } - } - - pub fn detect_best_backend(&self) -> OciPackerBackendType { - match self { - OciPackerFormat::Squashfs => { - let status = Command::new("mksquashfs") - .arg("-version") - .stdin(Stdio::null()) - .stderr(Stdio::null()) - .stdout(Stdio::null()) - .status() - .ok(); - - let Some(code) = status.and_then(|x| x.code()) else { - return OciPackerBackendType::Backhand; - }; - - if code == 0 { - OciPackerBackendType::MkSquashfs - } else { - OciPackerBackendType::Backhand - } - } - OciPackerFormat::Erofs => OciPackerBackendType::MkfsErofs, - } - } -} - -impl OciPackerBackendType { - pub fn format(&self) -> OciPackerFormat { - match self { - OciPackerBackendType::Backhand => OciPackerFormat::Squashfs, - OciPackerBackendType::MkSquashfs => OciPackerFormat::Squashfs, - OciPackerBackendType::MkfsErofs => OciPackerFormat::Erofs, - } - } - - pub fn create(&self) -> Box { - match self { - OciPackerBackendType::Backhand => { - Box::new(OciPackerBackhand {}) as Box - } - OciPackerBackendType::MkSquashfs => { - Box::new(OciPackerMkSquashfs {}) as Box - } - OciPackerBackendType::MkfsErofs => { - Box::new(OciPackerMkfsErofs {}) as Box - } - } - } -} - -pub trait OciPackerBackend { - fn pack( - &self, - progress: &mut OciProgress, - progress_context: &OciProgressContext, - directory: &Path, - file: &Path, - ) -> Result<()>; -} +use super::OciPackedFormat; pub struct OciPackerBackhand {} impl OciPackerBackend for OciPackerBackhand { - fn pack( - &self, - progress: &mut OciProgress, - progress_context: &OciProgressContext, - directory: &Path, - file: &Path, - ) -> Result<()> { - progress.phase = OciProgressPhase::Packing; - progress.total = 1; - progress.value = 0; - progress_context.update(progress); + fn pack(&self, progress: OciBoundProgress, directory: &Path, file: &Path) -> Result<()> { + progress.update_blocking(|progress| { + progress.phase = OciProgressPhase::Packing; + progress.total = 1; + progress.value = 0; + }); let mut writer = FilesystemWriter::default(); writer.set_compressor(FilesystemCompressor::new(Compressor::Gzip, None)?); let walk = WalkDir::new(directory).follow_links(false); @@ -172,11 +79,6 @@ impl OciPackerBackend for OciPackerBackhand { return Err(anyhow!("invalid file type")); } } - - progress.phase = OciProgressPhase::Packing; - progress.value = 1; - progress_context.update(progress); - let squash_file_path = file .to_str() .ok_or_else(|| anyhow!("failed to convert squashfs string"))?; @@ -185,6 +87,11 @@ impl OciPackerBackend for OciPackerBackhand { let mut bufwrite = BufWriter::new(file); trace!("squash generate: {}", squash_file_path); writer.write(&mut bufwrite)?; + progress.update_blocking(|progress| { + progress.phase = OciProgressPhase::Packing; + progress.total = 1; + progress.value = 1; + }); Ok(()) } } @@ -228,20 +135,50 @@ impl Drop for ConsumingFileReader { } } +#[derive(Debug, Clone, Copy)] +pub enum OciPackerBackendType { + Backhand, + MkSquashfs, + MkfsErofs, +} + +impl OciPackerBackendType { + pub fn format(&self) -> OciPackedFormat { + match self { + OciPackerBackendType::Backhand => OciPackedFormat::Squashfs, + OciPackerBackendType::MkSquashfs => OciPackedFormat::Squashfs, + OciPackerBackendType::MkfsErofs => OciPackedFormat::Erofs, + } + } + + pub fn create(&self) -> Box { + match self { + OciPackerBackendType::Backhand => { + Box::new(OciPackerBackhand {}) as Box + } + OciPackerBackendType::MkSquashfs => { + Box::new(OciPackerMkSquashfs {}) as Box + } + OciPackerBackendType::MkfsErofs => { + Box::new(OciPackerMkfsErofs {}) as Box + } + } + } +} + +pub trait OciPackerBackend { + fn pack(&self, progress: OciBoundProgress, directory: &Path, file: &Path) -> Result<()>; +} + pub struct OciPackerMkSquashfs {} impl OciPackerBackend for OciPackerMkSquashfs { - fn pack( - &self, - progress: &mut OciProgress, - progress_context: &OciProgressContext, - directory: &Path, - file: &Path, - ) -> Result<()> { - progress.phase = OciProgressPhase::Packing; - progress.total = 1; - progress.value = 0; - progress_context.update(progress); + fn pack(&self, progress: OciBoundProgress, directory: &Path, file: &Path) -> Result<()> { + progress.update_blocking(|progress| { + progress.phase = OciProgressPhase::Packing; + progress.total = 1; + progress.value = 0; + }); let mut child = Command::new("mksquashfs") .arg(directory) .arg(file) @@ -258,10 +195,11 @@ impl OciPackerBackend for OciPackerMkSquashfs { status.code().unwrap() )) } else { - progress.phase = OciProgressPhase::Packing; - progress.total = 1; - progress.value = 1; - progress_context.update(progress); + progress.update_blocking(|progress| { + progress.phase = OciProgressPhase::Packing; + progress.total = 1; + progress.value = 1; + }); Ok(()) } } @@ -270,17 +208,12 @@ impl OciPackerBackend for OciPackerMkSquashfs { pub struct OciPackerMkfsErofs {} impl OciPackerBackend for OciPackerMkfsErofs { - fn pack( - &self, - progress: &mut OciProgress, - progress_context: &OciProgressContext, - directory: &Path, - file: &Path, - ) -> Result<()> { - progress.phase = OciProgressPhase::Packing; - progress.total = 1; - progress.value = 0; - progress_context.update(progress); + fn pack(&self, progress: OciBoundProgress, directory: &Path, file: &Path) -> Result<()> { + progress.update_blocking(|progress| { + progress.phase = OciProgressPhase::Packing; + progress.total = 1; + progress.value = 0; + }); let mut child = Command::new("mkfs.erofs") .arg("-L") .arg("root") @@ -297,10 +230,11 @@ impl OciPackerBackend for OciPackerMkfsErofs { status.code().unwrap() )) } else { - progress.phase = OciProgressPhase::Packing; - progress.total = 1; - progress.value = 1; - progress_context.update(progress); + progress.update_blocking(|progress| { + progress.phase = OciProgressPhase::Packing; + progress.total = 1; + progress.value = 1; + }); Ok(()) } } diff --git a/crates/oci/src/packer/cache.rs b/crates/oci/src/packer/cache.rs new file mode 100644 index 0000000..83dc8cc --- /dev/null +++ b/crates/oci/src/packer/cache.rs @@ -0,0 +1,87 @@ +use crate::{ + fetch::OciResolvedImage, + packer::{OciImagePacked, OciPackedFormat}, +}; + +use anyhow::Result; +use log::debug; +use oci_spec::image::{ImageConfiguration, ImageManifest}; +use std::path::{Path, PathBuf}; +use tokio::fs; + +#[derive(Clone)] +pub struct OciPackerCache { + cache_dir: PathBuf, +} + +impl OciPackerCache { + pub fn new(cache_dir: &Path) -> Result { + Ok(OciPackerCache { + cache_dir: cache_dir.to_path_buf(), + }) + } + + pub async fn recall( + &self, + resolved: &OciResolvedImage, + format: OciPackedFormat, + ) -> Result> { + let mut fs_path = self.cache_dir.clone(); + let mut config_path = self.cache_dir.clone(); + let mut manifest_path = self.cache_dir.clone(); + fs_path.push(format!("{}.{}", resolved.digest, format.extension())); + manifest_path.push(format!("{}.manifest.json", resolved.digest)); + config_path.push(format!("{}.config.json", resolved.digest)); + Ok( + if fs_path.exists() && manifest_path.exists() && config_path.exists() { + let image_metadata = fs::metadata(&fs_path).await?; + let manifest_metadata = fs::metadata(&manifest_path).await?; + let config_metadata = fs::metadata(&config_path).await?; + if image_metadata.is_file() + && manifest_metadata.is_file() + && config_metadata.is_file() + { + let manifest_text = fs::read_to_string(&manifest_path).await?; + let manifest: ImageManifest = serde_json::from_str(&manifest_text)?; + let config_text = fs::read_to_string(&config_path).await?; + let config: ImageConfiguration = serde_json::from_str(&config_text)?; + debug!("cache hit digest={}", resolved.digest); + Some(OciImagePacked::new( + resolved.digest.clone(), + fs_path.clone(), + format, + config, + manifest, + )) + } else { + None + } + } else { + debug!("cache miss digest={}", resolved.digest); + None + }, + ) + } + + pub async fn store(&self, packed: OciImagePacked) -> Result { + debug!("cache store digest={}", packed.digest); + let mut fs_path = self.cache_dir.clone(); + let mut manifest_path = self.cache_dir.clone(); + let mut config_path = self.cache_dir.clone(); + fs_path.push(format!("{}.{}", packed.digest, packed.format.extension())); + manifest_path.push(format!("{}.manifest.json", packed.digest)); + config_path.push(format!("{}.config.json", packed.digest)); + fs::copy(&packed.path, &fs_path).await?; + let manifest_text = serde_json::to_string_pretty(&packed.manifest)?; + fs::write(&manifest_path, manifest_text).await?; + let config_text = serde_json::to_string_pretty(&packed.config)?; + fs::write(&config_path, config_text).await?; + Ok(OciImagePacked::new( + packed.digest, + fs_path.clone(), + packed.format, + packed.config, + packed.manifest, + )) + } +} diff --git a/crates/oci/src/packer/mod.rs b/crates/oci/src/packer/mod.rs new file mode 100644 index 0000000..2955c47 --- /dev/null +++ b/crates/oci/src/packer/mod.rs @@ -0,0 +1,78 @@ +use self::backend::OciPackerBackendType; +use oci_spec::image::{ImageConfiguration, ImageManifest}; +use std::{ + path::PathBuf, + process::{Command, Stdio}, +}; + +pub mod backend; +pub mod cache; +pub mod service; + +#[derive(Debug, Default, Clone, Copy)] +pub enum OciPackedFormat { + #[default] + Squashfs, + Erofs, +} + +impl OciPackedFormat { + pub fn extension(&self) -> &str { + match self { + OciPackedFormat::Squashfs => "squashfs", + OciPackedFormat::Erofs => "erofs", + } + } + + pub fn detect_best_backend(&self) -> OciPackerBackendType { + match self { + OciPackedFormat::Squashfs => { + let status = Command::new("mksquashfs") + .arg("-version") + .stdin(Stdio::null()) + .stderr(Stdio::null()) + .stdout(Stdio::null()) + .status() + .ok(); + + let Some(code) = status.and_then(|x| x.code()) else { + return OciPackerBackendType::Backhand; + }; + + if code == 0 { + OciPackerBackendType::MkSquashfs + } else { + OciPackerBackendType::Backhand + } + } + OciPackedFormat::Erofs => OciPackerBackendType::MkfsErofs, + } + } +} + +#[derive(Clone)] +pub struct OciImagePacked { + pub digest: String, + pub path: PathBuf, + pub format: OciPackedFormat, + pub config: ImageConfiguration, + pub manifest: ImageManifest, +} + +impl OciImagePacked { + pub fn new( + digest: String, + path: PathBuf, + format: OciPackedFormat, + config: ImageConfiguration, + manifest: ImageManifest, + ) -> OciImagePacked { + OciImagePacked { + digest, + path, + format, + config, + manifest, + } + } +} diff --git a/crates/oci/src/packer/service.rs b/crates/oci/src/packer/service.rs new file mode 100644 index 0000000..9d3994e --- /dev/null +++ b/crates/oci/src/packer/service.rs @@ -0,0 +1,77 @@ +use std::path::{Path, PathBuf}; + +use anyhow::{anyhow, Result}; + +use crate::{ + assemble::OciImageAssembler, + fetch::OciImageFetcher, + name::ImageName, + progress::{OciBoundProgress, OciProgress, OciProgressContext}, + registry::OciPlatform, +}; + +use super::{cache::OciPackerCache, OciImagePacked, OciPackedFormat}; + +#[derive(Clone)] +pub struct OciPackerService { + seed: Option, + platform: OciPlatform, + cache: OciPackerCache, + progress: OciProgressContext, +} + +impl OciPackerService { + pub fn new( + seed: Option, + cache_dir: &Path, + platform: OciPlatform, + progress: OciProgressContext, + ) -> Result { + Ok(OciPackerService { + seed, + cache: OciPackerCache::new(cache_dir)?, + platform, + progress, + }) + } + + pub async fn pack( + &self, + id: &str, + name: ImageName, + format: OciPackedFormat, + ) -> Result { + let progress = OciProgress::new(id); + let progress = OciBoundProgress::new(self.progress.clone(), progress); + let fetcher = + OciImageFetcher::new(self.seed.clone(), self.platform.clone(), progress.clone()); + let resolved = fetcher.resolve(name).await?; + if let Some(cached) = self.cache.recall(&resolved, format).await? { + return Ok(cached); + } + let assembler = + OciImageAssembler::new(fetcher, resolved, progress.clone(), None, None).await?; + let assembled = assembler.assemble().await?; + let mut file = assembled + .tmp_dir + .clone() + .ok_or(anyhow!("tmp_dir was missing when packing image"))?; + file.push("image.pack"); + let target = file.clone(); + let directory = assembled.path.clone(); + tokio::task::spawn_blocking(move || { + let packer = format.detect_best_backend().create(); + packer.pack(progress, &directory, &target) + }) + .await??; + let packed = OciImagePacked::new( + assembled.digest.clone(), + file, + format, + assembled.config.clone(), + assembled.manifest.clone(), + ); + let packed = self.cache.store(packed).await?; + Ok(packed) + } +} diff --git a/crates/oci/src/progress.rs b/crates/oci/src/progress.rs index 905a12f..b73c945 100644 --- a/crates/oci/src/progress.rs +++ b/crates/oci/src/progress.rs @@ -1,5 +1,7 @@ +use std::sync::Arc; + use indexmap::IndexMap; -use tokio::sync::broadcast::Sender; +use tokio::sync::{broadcast::Sender, Mutex}; #[derive(Clone, Debug)] pub struct OciProgress { @@ -11,14 +13,24 @@ pub struct OciProgress { } impl OciProgress { - pub fn add_layer(&mut self, id: &str) { + pub fn new(id: &str) -> Self { + OciProgress { + id: id.to_string(), + phase: OciProgressPhase::Resolving, + layers: IndexMap::new(), + value: 0, + total: 1, + } + } + + pub fn add_layer(&mut self, id: &str, size: usize) { self.layers.insert( id.to_string(), OciProgressLayer { id: id.to_string(), phase: OciProgressLayerPhase::Waiting, value: 0, - total: 0, + total: size as u64, }, ); } @@ -95,3 +107,30 @@ impl OciProgressContext { let _ = self.sender.send(progress.clone()); } } + +#[derive(Clone)] +pub struct OciBoundProgress { + context: OciProgressContext, + instance: Arc>, +} + +impl OciBoundProgress { + pub fn new(context: OciProgressContext, progress: OciProgress) -> OciBoundProgress { + OciBoundProgress { + context, + instance: Arc::new(Mutex::new(progress)), + } + } + + pub async fn update(&self, function: impl FnOnce(&mut OciProgress)) { + let mut progress = self.instance.lock().await; + function(&mut progress); + self.context.update(&progress); + } + + pub fn update_blocking(&self, function: impl FnOnce(&mut OciProgress)) { + let mut progress = self.instance.blocking_lock(); + function(&mut progress); + self.context.update(&progress); + } +} diff --git a/crates/oci/src/registry.rs b/crates/oci/src/registry.rs index d3ecc34..1b93e51 100644 --- a/crates/oci/src/registry.rs +++ b/crates/oci/src/registry.rs @@ -7,28 +7,28 @@ use reqwest::{Client, RequestBuilder, Response, StatusCode}; use tokio::{fs::File, io::AsyncWriteExt}; use url::Url; -use crate::progress::{OciProgress, OciProgressContext}; +use crate::progress::OciBoundProgress; #[derive(Clone, Debug)] -pub struct OciRegistryPlatform { +pub struct OciPlatform { pub os: Os, pub arch: Arch, } -impl OciRegistryPlatform { +impl OciPlatform { #[cfg(target_arch = "x86_64")] const CURRENT_ARCH: Arch = Arch::Amd64; #[cfg(target_arch = "aarch64")] const CURRENT_ARCH: Arch = Arch::ARM64; - pub fn new(os: Os, arch: Arch) -> OciRegistryPlatform { - OciRegistryPlatform { os, arch } + pub fn new(os: Os, arch: Arch) -> OciPlatform { + OciPlatform { os, arch } } - pub fn current() -> OciRegistryPlatform { - OciRegistryPlatform { + pub fn current() -> OciPlatform { + OciPlatform { os: Os::Linux, - arch: OciRegistryPlatform::CURRENT_ARCH, + arch: OciPlatform::CURRENT_ARCH, } } } @@ -36,12 +36,12 @@ impl OciRegistryPlatform { pub struct OciRegistryClient { agent: Client, url: Url, - platform: OciRegistryPlatform, + platform: OciPlatform, token: Option, } impl OciRegistryClient { - pub fn new(url: Url, platform: OciRegistryPlatform) -> Result { + pub fn new(url: Url, platform: OciPlatform) -> Result { Ok(OciRegistryClient { agent: Client::new(), url, @@ -140,8 +140,7 @@ impl OciRegistryClient { name: N, descriptor: &Descriptor, mut dest: File, - mut progress_handle: Option<&mut OciProgress>, - progress_context: Option<&OciProgressContext>, + progress: Option, ) -> Result { let url = self.url.join(&format!( "/v2/{}/blobs/{}", @@ -157,15 +156,16 @@ impl OciRegistryClient { if (size - last_progress_size) > (5 * 1024 * 1024) { last_progress_size = size; - if let Some(progress_handle) = progress_handle.as_mut() { - progress_handle.downloading_layer( - descriptor.digest(), - size as usize, - descriptor.size() as usize, - ); - if let Some(progress_context) = progress_context.as_ref() { - progress_context.update(progress_handle); - } + if let Some(ref progress) = progress { + progress + .update(|progress| { + progress.downloading_layer( + descriptor.digest(), + size as usize, + descriptor.size() as usize, + ); + }) + .await; } } } diff --git a/crates/runtime/src/cfgblk.rs b/crates/runtime/src/cfgblk.rs index 044a16d..0ae491f 100644 --- a/crates/runtime/src/cfgblk.rs +++ b/crates/runtime/src/cfgblk.rs @@ -1,7 +1,7 @@ use anyhow::Result; use backhand::{FilesystemWriter, NodeHeader}; use krata::launchcfg::LaunchInfo; -use krataoci::compiler::ImageInfo; +use krataoci::packer::OciImagePacked; use log::trace; use std::fs; use std::fs::File; @@ -9,28 +9,24 @@ use std::path::PathBuf; use uuid::Uuid; pub struct ConfigBlock<'a> { - pub image_info: &'a ImageInfo, + pub image: &'a OciImagePacked, pub file: PathBuf, pub dir: PathBuf, } impl ConfigBlock<'_> { - pub fn new<'a>(uuid: &Uuid, image_info: &'a ImageInfo) -> Result> { + pub fn new<'a>(uuid: &Uuid, image: &'a OciImagePacked) -> Result> { let mut dir = std::env::temp_dir().clone(); dir.push(format!("krata-cfg-{}", uuid)); fs::create_dir_all(&dir)?; let mut file = dir.clone(); file.push("config.squashfs"); - Ok(ConfigBlock { - image_info, - file, - dir, - }) + Ok(ConfigBlock { image, file, dir }) } pub fn build(&self, launch_config: &LaunchInfo) -> Result<()> { trace!("build launch_config={:?}", launch_config); - let manifest = self.image_info.config.to_string()?; + let manifest = self.image.config.to_string()?; let launch = serde_json::to_string(launch_config)?; let mut writer = FilesystemWriter::default(); writer.push_dir( diff --git a/crates/runtime/src/launch.rs b/crates/runtime/src/launch.rs index b07a6c7..bd8f0d0 100644 --- a/crates/runtime/src/launch.rs +++ b/crates/runtime/src/launch.rs @@ -10,8 +10,8 @@ use krata::launchcfg::{ LaunchInfo, LaunchNetwork, LaunchNetworkIpv4, LaunchNetworkIpv6, LaunchNetworkResolver, LaunchPackedFormat, LaunchRoot, }; -use krataoci::packer::OciPackerFormat; -use krataoci::progress::OciProgressContext; +use krataoci::packer::service::OciPackerService; +use krataoci::packer::{OciImagePacked, OciPackedFormat}; use tokio::sync::Semaphore; use uuid::Uuid; use xenclient::{DomainChannel, DomainConfig, DomainDisk, DomainNetworkInterface}; @@ -19,11 +19,7 @@ use xenstore::XsdInterface; use crate::cfgblk::ConfigBlock; use crate::RuntimeContext; -use krataoci::{ - cache::ImageCache, - compiler::{ImageInfo, OciImageCompiler}, - name::ImageName, -}; +use krataoci::name::ImageName; use super::{GuestInfo, GuestState}; @@ -55,15 +51,14 @@ impl GuestLauncher { ) -> Result { let uuid = request.uuid.unwrap_or_else(Uuid::new_v4); let xen_name = format!("krata-{uuid}"); - let image_info = self + let packed = self .compile( &uuid.to_string(), request.image, - &context.image_cache, - &context.oci_progress_context, + &context.packer, match request.format { - LaunchPackedFormat::Squashfs => OciPackerFormat::Squashfs, - LaunchPackedFormat::Erofs => OciPackerFormat::Erofs, + LaunchPackedFormat::Squashfs => OciPackedFormat::Squashfs, + LaunchPackedFormat::Erofs => OciPackedFormat::Erofs, }, ) .await?; @@ -116,11 +111,11 @@ impl GuestLauncher { run: request.run, }; - let cfgblk = ConfigBlock::new(&uuid, &image_info)?; + let cfgblk = ConfigBlock::new(&uuid, &packed)?; cfgblk.build(&launch_config)?; - let image_squashfs_path = image_info - .image + let image_squashfs_path = packed + .path .to_str() .ok_or_else(|| anyhow!("failed to convert image path to string"))?; @@ -265,13 +260,11 @@ impl GuestLauncher { &self, id: &str, image: &str, - image_cache: &ImageCache, - progress: &OciProgressContext, - format: OciPackerFormat, - ) -> Result { + packer: &OciPackerService, + format: OciPackedFormat, + ) -> Result { let image = ImageName::parse(image)?; - let compiler = OciImageCompiler::new(image_cache, None, progress.clone())?; - compiler.compile(id, &image, format).await + packer.pack(id, image, format).await } async fn allocate_ipv4(&self, context: &RuntimeContext) -> Result { diff --git a/crates/runtime/src/lib.rs b/crates/runtime/src/lib.rs index e90402c..15ee26d 100644 --- a/crates/runtime/src/lib.rs +++ b/crates/runtime/src/lib.rs @@ -17,7 +17,9 @@ use self::{ autoloop::AutoLoop, launch::{GuestLaunchRequest, GuestLauncher}, }; -use krataoci::{cache::ImageCache, progress::OciProgressContext}; +use krataoci::{ + packer::service::OciPackerService, progress::OciProgressContext, registry::OciPlatform, +}; pub mod autoloop; pub mod cfgblk; @@ -52,7 +54,7 @@ pub struct GuestInfo { #[derive(Clone)] pub struct RuntimeContext { pub oci_progress_context: OciProgressContext, - pub image_cache: ImageCache, + pub packer: OciPackerService, pub autoloop: AutoLoop, pub xen: XenClient, pub kernel: String, @@ -68,13 +70,18 @@ impl RuntimeContext { let xen = XenClient::open(0).await?; image_cache_path.push("image"); fs::create_dir_all(&image_cache_path)?; - let image_cache = ImageCache::new(&image_cache_path)?; + let packer = OciPackerService::new( + None, + &image_cache_path, + OciPlatform::current(), + oci_progress_context.clone(), + )?; let kernel = RuntimeContext::detect_guest_file(&store, "kernel")?; let initrd = RuntimeContext::detect_guest_file(&store, "initrd")?; Ok(RuntimeContext { oci_progress_context, - image_cache, + packer, autoloop: AutoLoop::new(LoopControl::open()?), xen, kernel,