diff --git a/crates/daemon/src/lib.rs b/crates/daemon/src/lib.rs index e81901f..4c605a0 100644 --- a/crates/daemon/src/lib.rs +++ b/crates/daemon/src/lib.rs @@ -51,7 +51,7 @@ impl Daemon { image_cache_dir.push("image"); fs::create_dir_all(&image_cache_dir).await?; - let packer = OciPackerService::new(None, &image_cache_dir, OciPlatform::current())?; + let packer = OciPackerService::new(None, &image_cache_dir, OciPlatform::current()).await?; let runtime = Runtime::new(store.clone()).await?; let guests_db_path = format!("{}/guests.db", store); diff --git a/crates/oci/examples/squashify.rs b/crates/oci/examples/squashify.rs index 8f251e6..c6361c1 100644 --- a/crates/oci/examples/squashify.rs +++ b/crates/oci/examples/squashify.rs @@ -35,7 +35,7 @@ async fn main() -> Result<()> { } } }); - let service = OciPackerService::new(seed, &cache_dir, OciPlatform::current())?; + let service = OciPackerService::new(seed, &cache_dir, OciPlatform::current()).await?; let packed = service .request(image.clone(), OciPackedFormat::Squashfs, false, context) .await?; diff --git a/crates/oci/src/assemble.rs b/crates/oci/src/assemble.rs index 2365c61..97c89e5 100644 --- a/crates/oci/src/assemble.rs +++ b/crates/oci/src/assemble.rs @@ -4,7 +4,7 @@ use crate::schema::OciSchema; use crate::vfs::{VfsNode, VfsTree}; use anyhow::{anyhow, Result}; use log::{debug, trace, warn}; -use oci_spec::image::{ImageConfiguration, ImageManifest}; +use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest}; use std::path::{Path, PathBuf}; use std::pin::Pin; @@ -17,6 +17,7 @@ use uuid::Uuid; pub struct OciImageAssembled { pub digest: String, + pub descriptor: Descriptor, pub manifest: OciSchema, pub config: OciSchema, pub vfs: Arc, @@ -165,6 +166,7 @@ impl OciImageAssembler { let assembled = OciImageAssembled { vfs: Arc::new(vfs), + descriptor: resolved.descriptor, digest: resolved.digest, manifest: resolved.manifest, config: local.config, diff --git a/crates/oci/src/fetch.rs b/crates/oci/src/fetch.rs index db502d0..fc4ce4a 100644 --- a/crates/oci/src/fetch.rs +++ b/crates/oci/src/fetch.rs @@ -20,7 +20,8 @@ use anyhow::{anyhow, Result}; use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder}; use log::debug; use oci_spec::image::{ - Descriptor, ImageConfiguration, ImageIndex, ImageManifest, MediaType, ToDockerV2S2, + Descriptor, DescriptorBuilder, ImageConfiguration, ImageIndex, ImageManifest, MediaType, + ToDockerV2S2, }; use serde::de::DeserializeOwned; use tokio::{ @@ -99,6 +100,7 @@ impl OciImageLayer { pub struct OciResolvedImage { pub name: ImageName, pub digest: String, + pub descriptor: Descriptor, pub manifest: OciSchema, } @@ -228,6 +230,7 @@ impl OciImageFetcher { ); return Ok(OciResolvedImage { name: image, + descriptor: found.clone(), digest: found.digest().clone(), manifest, }); @@ -236,11 +239,20 @@ impl OciImageFetcher { } let mut client = OciRegistryClient::new(image.registry_url()?, self.platform.clone())?; - let (manifest, digest) = client + let (manifest, descriptor, digest) = client .get_manifest_with_digest(&image.name, &image.reference) .await?; + let descriptor = descriptor.unwrap_or_else(|| { + DescriptorBuilder::default() + .media_type(MediaType::ImageManifest) + .size(manifest.raw().len() as i64) + .digest(digest.clone()) + .build() + .unwrap() + }); Ok(OciResolvedImage { name: image, + descriptor, digest, manifest, }) diff --git a/crates/oci/src/name.rs b/crates/oci/src/name.rs index 8dffbcf..ab92da1 100644 --- a/crates/oci/src/name.rs +++ b/crates/oci/src/name.rs @@ -15,7 +15,13 @@ pub struct ImageName { impl fmt::Display for ImageName { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - if let Some(port) = self.port { + if DOCKER_HUB_MIRROR == self.hostname && self.port.is_none() { + if self.name.starts_with("library/") { + write!(f, "{}:{}", &self.name[8..], self.reference) + } else { + write!(f, "{}:{}", self.name, self.reference) + } + } else if let Some(port) = self.port { write!( f, "{}:{}/{}:{}", diff --git a/crates/oci/src/packer/cache.rs b/crates/oci/src/packer/cache.rs index 5e4d9c2..cd53d81 100644 --- a/crates/oci/src/packer/cache.rs +++ b/crates/oci/src/packer/cache.rs @@ -1,69 +1,116 @@ use crate::{ + name::ImageName, packer::{OciPackedFormat, OciPackedImage}, schema::OciSchema, }; use anyhow::Result; -use log::debug; -use oci_spec::image::{ImageConfiguration, ImageManifest}; -use std::path::{Path, PathBuf}; -use tokio::fs; +use log::{debug, error}; +use oci_spec::image::{ + Descriptor, ImageConfiguration, ImageIndex, ImageIndexBuilder, ImageManifest, MediaType, + ANNOTATION_REF_NAME, +}; +use std::{ + path::{Path, PathBuf}, + sync::Arc, +}; +use tokio::{fs, sync::Mutex}; #[derive(Clone)] pub struct OciPackerCache { cache_dir: PathBuf, + index: Arc>, } +const ANNOTATION_IMAGE_NAME: &str = "io.containerd.image.name"; +const ANNOTATION_OCI_PACKER_FORMAT: &str = "dev.krata.oci.packer.format"; + impl OciPackerCache { - pub fn new(cache_dir: &Path) -> Result { - Ok(OciPackerCache { + pub async fn new(cache_dir: &Path) -> Result { + let index = ImageIndexBuilder::default() + .schema_version(2u32) + .media_type(MediaType::ImageIndex) + .manifests(Vec::new()) + .build()?; + let cache = OciPackerCache { cache_dir: cache_dir.to_path_buf(), - }) + index: Arc::new(Mutex::new(index)), + }; + + { + let mut mutex = cache.index.lock().await; + *mutex = cache.load_index().await?; + } + + Ok(cache) } pub async fn recall( &self, + name: ImageName, digest: &str, format: OciPackedFormat, ) -> Result> { + let index = self.index.lock().await; + + let mut descriptor: Option = None; + for manifest in index.manifests() { + if manifest.digest() == digest + && manifest + .annotations() + .as_ref() + .and_then(|x| x.get(ANNOTATION_OCI_PACKER_FORMAT)) + .map(|x| x.as_str()) + == Some(format.extension()) + { + descriptor = Some(manifest.clone()); + break; + } + } + + let Some(descriptor) = descriptor else { + return Ok(None); + }; + let mut fs_path = self.cache_dir.clone(); let mut config_path = self.cache_dir.clone(); let mut manifest_path = self.cache_dir.clone(); fs_path.push(format!("{}.{}", digest, format.extension())); manifest_path.push(format!("{}.manifest.json", digest)); config_path.push(format!("{}.config.json", digest)); - Ok( - if fs_path.exists() && manifest_path.exists() && config_path.exists() { - let image_metadata = fs::metadata(&fs_path).await?; - let manifest_metadata = fs::metadata(&manifest_path).await?; - let config_metadata = fs::metadata(&config_path).await?; - if image_metadata.is_file() - && manifest_metadata.is_file() - && config_metadata.is_file() - { - let manifest_bytes = fs::read(&manifest_path).await?; - let manifest: ImageManifest = serde_json::from_slice(&manifest_bytes)?; - let config_bytes = fs::read(&config_path).await?; - let config: ImageConfiguration = serde_json::from_slice(&config_bytes)?; - debug!("cache hit digest={}", digest); - Some(OciPackedImage::new( - digest.to_string(), - fs_path.clone(), - format, - OciSchema::new(config_bytes, config), - OciSchema::new(manifest_bytes, manifest), - )) - } else { - None - } + + if fs_path.exists() && manifest_path.exists() && config_path.exists() { + let image_metadata = fs::metadata(&fs_path).await?; + let manifest_metadata = fs::metadata(&manifest_path).await?; + let config_metadata = fs::metadata(&config_path).await?; + if image_metadata.is_file() && manifest_metadata.is_file() && config_metadata.is_file() + { + let manifest_bytes = fs::read(&manifest_path).await?; + let manifest: ImageManifest = serde_json::from_slice(&manifest_bytes)?; + let config_bytes = fs::read(&config_path).await?; + let config: ImageConfiguration = serde_json::from_slice(&config_bytes)?; + debug!("cache hit digest={}", digest); + Ok(Some(OciPackedImage::new( + name, + digest.to_string(), + fs_path.clone(), + format, + descriptor, + OciSchema::new(config_bytes, config), + OciSchema::new(manifest_bytes, manifest), + ))) } else { - debug!("cache miss digest={}", digest); - None - }, - ) + Ok(None) + } + } else { + debug!("cache miss digest={}", digest); + Ok(None) + } } pub async fn store(&self, packed: OciPackedImage) -> Result { + let mut index = self.index.lock().await; + let mut manifests = index.manifests().clone(); debug!("cache store digest={}", packed.digest); let mut fs_path = self.cache_dir.clone(); let mut manifest_path = self.cache_dir.clone(); @@ -74,12 +121,90 @@ impl OciPackerCache { fs::rename(&packed.path, &fs_path).await?; fs::write(&config_path, packed.config.raw()).await?; fs::write(&manifest_path, packed.manifest.raw()).await?; - Ok(OciPackedImage::new( + manifests.retain(|item| { + if item.digest() != &packed.digest { + return true; + } + + let Some(format) = item + .annotations() + .as_ref() + .and_then(|x| x.get(ANNOTATION_OCI_PACKER_FORMAT)) + .map(|x| x.as_str()) + else { + return true; + }; + + if format != packed.format.extension() { + return true; + } + + false + }); + + let mut descriptor = packed.descriptor.clone(); + let mut annotations = descriptor.annotations().clone().unwrap_or_default(); + annotations.insert( + ANNOTATION_OCI_PACKER_FORMAT.to_string(), + packed.format.extension().to_string(), + ); + let image_name = packed.name.to_string(); + annotations.insert(ANNOTATION_IMAGE_NAME.to_string(), image_name); + let image_ref = packed.name.reference.clone(); + annotations.insert(ANNOTATION_REF_NAME.to_string(), image_ref); + descriptor.set_annotations(Some(annotations)); + manifests.push(descriptor.clone()); + index.set_manifests(manifests); + self.save_index(&index).await?; + + let packed = OciPackedImage::new( + packed.name, packed.digest, fs_path.clone(), packed.format, + descriptor, packed.config, packed.manifest, - )) + ); + Ok(packed) + } + + async fn save_empty_index(&self) -> Result { + let index = ImageIndexBuilder::default() + .schema_version(2u32) + .media_type(MediaType::ImageIndex) + .manifests(Vec::new()) + .build()?; + self.save_index(&index).await?; + Ok(index) + } + + async fn load_index(&self) -> Result { + let mut index_path = self.cache_dir.clone(); + index_path.push("index.json"); + + if !index_path.exists() { + self.save_empty_index().await?; + } + + let content = fs::read_to_string(&index_path).await?; + let index = match serde_json::from_str::(&content) { + Ok(index) => index, + Err(error) => { + error!("image index was corrupted, creating a new one: {}", error); + self.save_empty_index().await? + } + }; + + Ok(index) + } + + async fn save_index(&self, index: &ImageIndex) -> Result<()> { + let mut encoded = serde_json::to_string_pretty(index)?; + encoded.push('\n'); + let mut index_path = self.cache_dir.clone(); + index_path.push("index.json"); + fs::write(&index_path, encoded).await?; + Ok(()) } } diff --git a/crates/oci/src/packer/mod.rs b/crates/oci/src/packer/mod.rs index dd510ab..61b9fca 100644 --- a/crates/oci/src/packer/mod.rs +++ b/crates/oci/src/packer/mod.rs @@ -1,9 +1,9 @@ use std::path::PathBuf; -use crate::schema::OciSchema; +use crate::{name::ImageName, schema::OciSchema}; use self::backend::OciPackerBackendType; -use oci_spec::image::{ImageConfiguration, ImageManifest}; +use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest}; pub mod backend; pub mod cache; @@ -37,25 +37,31 @@ impl OciPackedFormat { #[derive(Clone)] pub struct OciPackedImage { + pub name: ImageName, pub digest: String, pub path: PathBuf, pub format: OciPackedFormat, + pub descriptor: Descriptor, pub config: OciSchema, pub manifest: OciSchema, } impl OciPackedImage { pub fn new( + name: ImageName, digest: String, path: PathBuf, format: OciPackedFormat, + descriptor: Descriptor, config: OciSchema, manifest: OciSchema, ) -> OciPackedImage { OciPackedImage { + name, digest, path, format, + descriptor, config, manifest, } diff --git a/crates/oci/src/packer/service.rs b/crates/oci/src/packer/service.rs index 9f1840d..db35012 100644 --- a/crates/oci/src/packer/service.rs +++ b/crates/oci/src/packer/service.rs @@ -38,14 +38,14 @@ pub struct OciPackerService { } impl OciPackerService { - pub fn new( + pub async fn new( seed: Option, cache_dir: &Path, platform: OciPlatform, ) -> Result { Ok(OciPackerService { seed, - cache: OciPackerCache::new(cache_dir)?, + cache: OciPackerCache::new(cache_dir).await?, platform, tasks: Arc::new(Mutex::new(HashMap::new())), }) @@ -56,7 +56,9 @@ impl OciPackerService { digest: &str, format: OciPackedFormat, ) -> Result> { - self.cache.recall(digest, format).await + self.cache + .recall(ImageName::parse("cached:latest")?, digest, format) + .await } pub async fn request( @@ -70,7 +72,7 @@ impl OciPackerService { let progress = OciBoundProgress::new(progress_context.clone(), progress); let fetcher = OciImageFetcher::new(self.seed.clone(), self.platform.clone(), progress.clone()); - let resolved = fetcher.resolve(name).await?; + let resolved = fetcher.resolve(name.clone()).await?; let key = OciPackerTaskKey { digest: resolved.digest.clone(), format, @@ -88,6 +90,7 @@ impl OciPackerService { let task = self .clone() .launch( + name, key.clone(), format, overwrite, @@ -130,8 +133,10 @@ impl OciPackerService { } } + #[allow(clippy::too_many_arguments)] async fn launch( self, + name: ImageName, key: OciPackerTaskKey, format: OciPackedFormat, overwrite: bool, @@ -146,7 +151,15 @@ impl OciPackerService { service.ensure_task_gone(key); }); if let Err(error) = self - .task(key.clone(), format, overwrite, resolved, fetcher, progress) + .task( + name, + key.clone(), + format, + overwrite, + resolved, + fetcher, + progress, + ) .await { self.finish(&key, Err(error)).await; @@ -154,8 +167,10 @@ impl OciPackerService { }) } + #[allow(clippy::too_many_arguments)] async fn task( &self, + name: ImageName, key: OciPackerTaskKey, format: OciPackedFormat, overwrite: bool, @@ -164,7 +179,11 @@ impl OciPackerService { progress: OciBoundProgress, ) -> Result<()> { if !overwrite { - if let Some(cached) = self.cache.recall(&resolved.digest, format).await? { + if let Some(cached) = self + .cache + .recall(name.clone(), &resolved.digest, format) + .await? + { self.finish(&key, Ok(cached)).await; return Ok(()); } @@ -183,9 +202,11 @@ impl OciPackerService { .pack(progress, assembled.vfs.clone(), &target) .await?; let packed = OciPackedImage::new( + name, assembled.digest.clone(), file, format, + assembled.descriptor.clone(), assembled.config.clone(), assembled.manifest.clone(), ); diff --git a/crates/oci/src/registry.rs b/crates/oci/src/registry.rs index e356000..1874ae8 100644 --- a/crates/oci/src/registry.rs +++ b/crates/oci/src/registry.rs @@ -203,7 +203,7 @@ impl OciRegistryClient { &mut self, name: N, reference: R, - ) -> Result<(OciSchema, String)> { + ) -> Result<(OciSchema, Option, String)> { let url = self.url.join(&format!( "/v2/{}/manifests/{}", name.as_ref(), @@ -231,9 +231,10 @@ impl OciRegistryClient { let descriptor = self .pick_manifest(index) .ok_or_else(|| anyhow!("unable to pick manifest from index"))?; - return self + let (manifest, digest) = self .get_raw_manifest_with_digest(name, descriptor.digest()) - .await; + .await?; + return Ok((manifest, Some(descriptor), digest)); } let digest = response .headers() @@ -243,7 +244,7 @@ impl OciRegistryClient { .to_string(); let bytes = response.bytes().await?; let manifest = serde_json::from_slice(&bytes)?; - Ok((OciSchema::new(bytes.to_vec(), manifest), digest)) + Ok((OciSchema::new(bytes.to_vec(), manifest), None, digest)) } fn pick_manifest(&mut self, index: ImageIndex) -> Option {