feat: implement on-disk indexes of images

This commit is contained in:
Alex Zenla
2024-04-16 13:55:40 +00:00
parent ee024580af
commit 2842f21ce0
9 changed files with 228 additions and 55 deletions

View File

@ -51,7 +51,7 @@ impl Daemon {
image_cache_dir.push("image"); image_cache_dir.push("image");
fs::create_dir_all(&image_cache_dir).await?; fs::create_dir_all(&image_cache_dir).await?;
let packer = OciPackerService::new(None, &image_cache_dir, OciPlatform::current())?; let packer = OciPackerService::new(None, &image_cache_dir, OciPlatform::current()).await?;
let runtime = Runtime::new(store.clone()).await?; let runtime = Runtime::new(store.clone()).await?;
let guests_db_path = format!("{}/guests.db", store); let guests_db_path = format!("{}/guests.db", store);

View File

@ -35,7 +35,7 @@ async fn main() -> Result<()> {
} }
} }
}); });
let service = OciPackerService::new(seed, &cache_dir, OciPlatform::current())?; let service = OciPackerService::new(seed, &cache_dir, OciPlatform::current()).await?;
let packed = service let packed = service
.request(image.clone(), OciPackedFormat::Squashfs, false, context) .request(image.clone(), OciPackedFormat::Squashfs, false, context)
.await?; .await?;

View File

@ -4,7 +4,7 @@ use crate::schema::OciSchema;
use crate::vfs::{VfsNode, VfsTree}; use crate::vfs::{VfsNode, VfsTree};
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use log::{debug, trace, warn}; use log::{debug, trace, warn};
use oci_spec::image::{ImageConfiguration, ImageManifest}; use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::pin::Pin; use std::pin::Pin;
@ -17,6 +17,7 @@ use uuid::Uuid;
pub struct OciImageAssembled { pub struct OciImageAssembled {
pub digest: String, pub digest: String,
pub descriptor: Descriptor,
pub manifest: OciSchema<ImageManifest>, pub manifest: OciSchema<ImageManifest>,
pub config: OciSchema<ImageConfiguration>, pub config: OciSchema<ImageConfiguration>,
pub vfs: Arc<VfsTree>, pub vfs: Arc<VfsTree>,
@ -165,6 +166,7 @@ impl OciImageAssembler {
let assembled = OciImageAssembled { let assembled = OciImageAssembled {
vfs: Arc::new(vfs), vfs: Arc::new(vfs),
descriptor: resolved.descriptor,
digest: resolved.digest, digest: resolved.digest,
manifest: resolved.manifest, manifest: resolved.manifest,
config: local.config, config: local.config,

View File

@ -20,7 +20,8 @@ use anyhow::{anyhow, Result};
use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder}; use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
use log::debug; use log::debug;
use oci_spec::image::{ use oci_spec::image::{
Descriptor, ImageConfiguration, ImageIndex, ImageManifest, MediaType, ToDockerV2S2, Descriptor, DescriptorBuilder, ImageConfiguration, ImageIndex, ImageManifest, MediaType,
ToDockerV2S2,
}; };
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use tokio::{ use tokio::{
@ -99,6 +100,7 @@ impl OciImageLayer {
pub struct OciResolvedImage { pub struct OciResolvedImage {
pub name: ImageName, pub name: ImageName,
pub digest: String, pub digest: String,
pub descriptor: Descriptor,
pub manifest: OciSchema<ImageManifest>, pub manifest: OciSchema<ImageManifest>,
} }
@ -228,6 +230,7 @@ impl OciImageFetcher {
); );
return Ok(OciResolvedImage { return Ok(OciResolvedImage {
name: image, name: image,
descriptor: found.clone(),
digest: found.digest().clone(), digest: found.digest().clone(),
manifest, manifest,
}); });
@ -236,11 +239,20 @@ impl OciImageFetcher {
} }
let mut client = OciRegistryClient::new(image.registry_url()?, self.platform.clone())?; let mut client = OciRegistryClient::new(image.registry_url()?, self.platform.clone())?;
let (manifest, digest) = client let (manifest, descriptor, digest) = client
.get_manifest_with_digest(&image.name, &image.reference) .get_manifest_with_digest(&image.name, &image.reference)
.await?; .await?;
let descriptor = descriptor.unwrap_or_else(|| {
DescriptorBuilder::default()
.media_type(MediaType::ImageManifest)
.size(manifest.raw().len() as i64)
.digest(digest.clone())
.build()
.unwrap()
});
Ok(OciResolvedImage { Ok(OciResolvedImage {
name: image, name: image,
descriptor,
digest, digest,
manifest, manifest,
}) })

View File

@ -15,7 +15,13 @@ pub struct ImageName {
impl fmt::Display for ImageName { impl fmt::Display for ImageName {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if let Some(port) = self.port { if DOCKER_HUB_MIRROR == self.hostname && self.port.is_none() {
if self.name.starts_with("library/") {
write!(f, "{}:{}", &self.name[8..], self.reference)
} else {
write!(f, "{}:{}", self.name, self.reference)
}
} else if let Some(port) = self.port {
write!( write!(
f, f,
"{}:{}/{}:{}", "{}:{}/{}:{}",

View File

@ -1,69 +1,116 @@
use crate::{ use crate::{
name::ImageName,
packer::{OciPackedFormat, OciPackedImage}, packer::{OciPackedFormat, OciPackedImage},
schema::OciSchema, schema::OciSchema,
}; };
use anyhow::Result; use anyhow::Result;
use log::debug; use log::{debug, error};
use oci_spec::image::{ImageConfiguration, ImageManifest}; use oci_spec::image::{
use std::path::{Path, PathBuf}; Descriptor, ImageConfiguration, ImageIndex, ImageIndexBuilder, ImageManifest, MediaType,
use tokio::fs; ANNOTATION_REF_NAME,
};
use std::{
path::{Path, PathBuf},
sync::Arc,
};
use tokio::{fs, sync::Mutex};
#[derive(Clone)] #[derive(Clone)]
pub struct OciPackerCache { pub struct OciPackerCache {
cache_dir: PathBuf, cache_dir: PathBuf,
index: Arc<Mutex<ImageIndex>>,
} }
const ANNOTATION_IMAGE_NAME: &str = "io.containerd.image.name";
const ANNOTATION_OCI_PACKER_FORMAT: &str = "dev.krata.oci.packer.format";
impl OciPackerCache { impl OciPackerCache {
pub fn new(cache_dir: &Path) -> Result<OciPackerCache> { pub async fn new(cache_dir: &Path) -> Result<OciPackerCache> {
Ok(OciPackerCache { let index = ImageIndexBuilder::default()
.schema_version(2u32)
.media_type(MediaType::ImageIndex)
.manifests(Vec::new())
.build()?;
let cache = OciPackerCache {
cache_dir: cache_dir.to_path_buf(), cache_dir: cache_dir.to_path_buf(),
}) index: Arc::new(Mutex::new(index)),
};
{
let mut mutex = cache.index.lock().await;
*mutex = cache.load_index().await?;
}
Ok(cache)
} }
pub async fn recall( pub async fn recall(
&self, &self,
name: ImageName,
digest: &str, digest: &str,
format: OciPackedFormat, format: OciPackedFormat,
) -> Result<Option<OciPackedImage>> { ) -> Result<Option<OciPackedImage>> {
let index = self.index.lock().await;
let mut descriptor: Option<Descriptor> = None;
for manifest in index.manifests() {
if manifest.digest() == digest
&& manifest
.annotations()
.as_ref()
.and_then(|x| x.get(ANNOTATION_OCI_PACKER_FORMAT))
.map(|x| x.as_str())
== Some(format.extension())
{
descriptor = Some(manifest.clone());
break;
}
}
let Some(descriptor) = descriptor else {
return Ok(None);
};
let mut fs_path = self.cache_dir.clone(); let mut fs_path = self.cache_dir.clone();
let mut config_path = self.cache_dir.clone(); let mut config_path = self.cache_dir.clone();
let mut manifest_path = self.cache_dir.clone(); let mut manifest_path = self.cache_dir.clone();
fs_path.push(format!("{}.{}", digest, format.extension())); fs_path.push(format!("{}.{}", digest, format.extension()));
manifest_path.push(format!("{}.manifest.json", digest)); manifest_path.push(format!("{}.manifest.json", digest));
config_path.push(format!("{}.config.json", digest)); config_path.push(format!("{}.config.json", digest));
Ok(
if fs_path.exists() && manifest_path.exists() && config_path.exists() { if fs_path.exists() && manifest_path.exists() && config_path.exists() {
let image_metadata = fs::metadata(&fs_path).await?; let image_metadata = fs::metadata(&fs_path).await?;
let manifest_metadata = fs::metadata(&manifest_path).await?; let manifest_metadata = fs::metadata(&manifest_path).await?;
let config_metadata = fs::metadata(&config_path).await?; let config_metadata = fs::metadata(&config_path).await?;
if image_metadata.is_file() if image_metadata.is_file() && manifest_metadata.is_file() && config_metadata.is_file()
&& manifest_metadata.is_file() {
&& config_metadata.is_file() let manifest_bytes = fs::read(&manifest_path).await?;
{ let manifest: ImageManifest = serde_json::from_slice(&manifest_bytes)?;
let manifest_bytes = fs::read(&manifest_path).await?; let config_bytes = fs::read(&config_path).await?;
let manifest: ImageManifest = serde_json::from_slice(&manifest_bytes)?; let config: ImageConfiguration = serde_json::from_slice(&config_bytes)?;
let config_bytes = fs::read(&config_path).await?; debug!("cache hit digest={}", digest);
let config: ImageConfiguration = serde_json::from_slice(&config_bytes)?; Ok(Some(OciPackedImage::new(
debug!("cache hit digest={}", digest); name,
Some(OciPackedImage::new( digest.to_string(),
digest.to_string(), fs_path.clone(),
fs_path.clone(), format,
format, descriptor,
OciSchema::new(config_bytes, config), OciSchema::new(config_bytes, config),
OciSchema::new(manifest_bytes, manifest), OciSchema::new(manifest_bytes, manifest),
)) )))
} else {
None
}
} else { } else {
debug!("cache miss digest={}", digest); Ok(None)
None }
}, } else {
) debug!("cache miss digest={}", digest);
Ok(None)
}
} }
pub async fn store(&self, packed: OciPackedImage) -> Result<OciPackedImage> { pub async fn store(&self, packed: OciPackedImage) -> Result<OciPackedImage> {
let mut index = self.index.lock().await;
let mut manifests = index.manifests().clone();
debug!("cache store digest={}", packed.digest); debug!("cache store digest={}", packed.digest);
let mut fs_path = self.cache_dir.clone(); let mut fs_path = self.cache_dir.clone();
let mut manifest_path = self.cache_dir.clone(); let mut manifest_path = self.cache_dir.clone();
@ -74,12 +121,90 @@ impl OciPackerCache {
fs::rename(&packed.path, &fs_path).await?; fs::rename(&packed.path, &fs_path).await?;
fs::write(&config_path, packed.config.raw()).await?; fs::write(&config_path, packed.config.raw()).await?;
fs::write(&manifest_path, packed.manifest.raw()).await?; fs::write(&manifest_path, packed.manifest.raw()).await?;
Ok(OciPackedImage::new( manifests.retain(|item| {
if item.digest() != &packed.digest {
return true;
}
let Some(format) = item
.annotations()
.as_ref()
.and_then(|x| x.get(ANNOTATION_OCI_PACKER_FORMAT))
.map(|x| x.as_str())
else {
return true;
};
if format != packed.format.extension() {
return true;
}
false
});
let mut descriptor = packed.descriptor.clone();
let mut annotations = descriptor.annotations().clone().unwrap_or_default();
annotations.insert(
ANNOTATION_OCI_PACKER_FORMAT.to_string(),
packed.format.extension().to_string(),
);
let image_name = packed.name.to_string();
annotations.insert(ANNOTATION_IMAGE_NAME.to_string(), image_name);
let image_ref = packed.name.reference.clone();
annotations.insert(ANNOTATION_REF_NAME.to_string(), image_ref);
descriptor.set_annotations(Some(annotations));
manifests.push(descriptor.clone());
index.set_manifests(manifests);
self.save_index(&index).await?;
let packed = OciPackedImage::new(
packed.name,
packed.digest, packed.digest,
fs_path.clone(), fs_path.clone(),
packed.format, packed.format,
descriptor,
packed.config, packed.config,
packed.manifest, packed.manifest,
)) );
Ok(packed)
}
async fn save_empty_index(&self) -> Result<ImageIndex> {
let index = ImageIndexBuilder::default()
.schema_version(2u32)
.media_type(MediaType::ImageIndex)
.manifests(Vec::new())
.build()?;
self.save_index(&index).await?;
Ok(index)
}
async fn load_index(&self) -> Result<ImageIndex> {
let mut index_path = self.cache_dir.clone();
index_path.push("index.json");
if !index_path.exists() {
self.save_empty_index().await?;
}
let content = fs::read_to_string(&index_path).await?;
let index = match serde_json::from_str::<ImageIndex>(&content) {
Ok(index) => index,
Err(error) => {
error!("image index was corrupted, creating a new one: {}", error);
self.save_empty_index().await?
}
};
Ok(index)
}
async fn save_index(&self, index: &ImageIndex) -> Result<()> {
let mut encoded = serde_json::to_string_pretty(index)?;
encoded.push('\n');
let mut index_path = self.cache_dir.clone();
index_path.push("index.json");
fs::write(&index_path, encoded).await?;
Ok(())
} }
} }

View File

@ -1,9 +1,9 @@
use std::path::PathBuf; use std::path::PathBuf;
use crate::schema::OciSchema; use crate::{name::ImageName, schema::OciSchema};
use self::backend::OciPackerBackendType; use self::backend::OciPackerBackendType;
use oci_spec::image::{ImageConfiguration, ImageManifest}; use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest};
pub mod backend; pub mod backend;
pub mod cache; pub mod cache;
@ -37,25 +37,31 @@ impl OciPackedFormat {
#[derive(Clone)] #[derive(Clone)]
pub struct OciPackedImage { pub struct OciPackedImage {
pub name: ImageName,
pub digest: String, pub digest: String,
pub path: PathBuf, pub path: PathBuf,
pub format: OciPackedFormat, pub format: OciPackedFormat,
pub descriptor: Descriptor,
pub config: OciSchema<ImageConfiguration>, pub config: OciSchema<ImageConfiguration>,
pub manifest: OciSchema<ImageManifest>, pub manifest: OciSchema<ImageManifest>,
} }
impl OciPackedImage { impl OciPackedImage {
pub fn new( pub fn new(
name: ImageName,
digest: String, digest: String,
path: PathBuf, path: PathBuf,
format: OciPackedFormat, format: OciPackedFormat,
descriptor: Descriptor,
config: OciSchema<ImageConfiguration>, config: OciSchema<ImageConfiguration>,
manifest: OciSchema<ImageManifest>, manifest: OciSchema<ImageManifest>,
) -> OciPackedImage { ) -> OciPackedImage {
OciPackedImage { OciPackedImage {
name,
digest, digest,
path, path,
format, format,
descriptor,
config, config,
manifest, manifest,
} }

View File

@ -38,14 +38,14 @@ pub struct OciPackerService {
} }
impl OciPackerService { impl OciPackerService {
pub fn new( pub async fn new(
seed: Option<PathBuf>, seed: Option<PathBuf>,
cache_dir: &Path, cache_dir: &Path,
platform: OciPlatform, platform: OciPlatform,
) -> Result<OciPackerService> { ) -> Result<OciPackerService> {
Ok(OciPackerService { Ok(OciPackerService {
seed, seed,
cache: OciPackerCache::new(cache_dir)?, cache: OciPackerCache::new(cache_dir).await?,
platform, platform,
tasks: Arc::new(Mutex::new(HashMap::new())), tasks: Arc::new(Mutex::new(HashMap::new())),
}) })
@ -56,7 +56,9 @@ impl OciPackerService {
digest: &str, digest: &str,
format: OciPackedFormat, format: OciPackedFormat,
) -> Result<Option<OciPackedImage>> { ) -> Result<Option<OciPackedImage>> {
self.cache.recall(digest, format).await self.cache
.recall(ImageName::parse("cached:latest")?, digest, format)
.await
} }
pub async fn request( pub async fn request(
@ -70,7 +72,7 @@ impl OciPackerService {
let progress = OciBoundProgress::new(progress_context.clone(), progress); let progress = OciBoundProgress::new(progress_context.clone(), progress);
let fetcher = let fetcher =
OciImageFetcher::new(self.seed.clone(), self.platform.clone(), progress.clone()); OciImageFetcher::new(self.seed.clone(), self.platform.clone(), progress.clone());
let resolved = fetcher.resolve(name).await?; let resolved = fetcher.resolve(name.clone()).await?;
let key = OciPackerTaskKey { let key = OciPackerTaskKey {
digest: resolved.digest.clone(), digest: resolved.digest.clone(),
format, format,
@ -88,6 +90,7 @@ impl OciPackerService {
let task = self let task = self
.clone() .clone()
.launch( .launch(
name,
key.clone(), key.clone(),
format, format,
overwrite, overwrite,
@ -130,8 +133,10 @@ impl OciPackerService {
} }
} }
#[allow(clippy::too_many_arguments)]
async fn launch( async fn launch(
self, self,
name: ImageName,
key: OciPackerTaskKey, key: OciPackerTaskKey,
format: OciPackedFormat, format: OciPackedFormat,
overwrite: bool, overwrite: bool,
@ -146,7 +151,15 @@ impl OciPackerService {
service.ensure_task_gone(key); service.ensure_task_gone(key);
}); });
if let Err(error) = self if let Err(error) = self
.task(key.clone(), format, overwrite, resolved, fetcher, progress) .task(
name,
key.clone(),
format,
overwrite,
resolved,
fetcher,
progress,
)
.await .await
{ {
self.finish(&key, Err(error)).await; self.finish(&key, Err(error)).await;
@ -154,8 +167,10 @@ impl OciPackerService {
}) })
} }
#[allow(clippy::too_many_arguments)]
async fn task( async fn task(
&self, &self,
name: ImageName,
key: OciPackerTaskKey, key: OciPackerTaskKey,
format: OciPackedFormat, format: OciPackedFormat,
overwrite: bool, overwrite: bool,
@ -164,7 +179,11 @@ impl OciPackerService {
progress: OciBoundProgress, progress: OciBoundProgress,
) -> Result<()> { ) -> Result<()> {
if !overwrite { if !overwrite {
if let Some(cached) = self.cache.recall(&resolved.digest, format).await? { if let Some(cached) = self
.cache
.recall(name.clone(), &resolved.digest, format)
.await?
{
self.finish(&key, Ok(cached)).await; self.finish(&key, Ok(cached)).await;
return Ok(()); return Ok(());
} }
@ -183,9 +202,11 @@ impl OciPackerService {
.pack(progress, assembled.vfs.clone(), &target) .pack(progress, assembled.vfs.clone(), &target)
.await?; .await?;
let packed = OciPackedImage::new( let packed = OciPackedImage::new(
name,
assembled.digest.clone(), assembled.digest.clone(),
file, file,
format, format,
assembled.descriptor.clone(),
assembled.config.clone(), assembled.config.clone(),
assembled.manifest.clone(), assembled.manifest.clone(),
); );

View File

@ -203,7 +203,7 @@ impl OciRegistryClient {
&mut self, &mut self,
name: N, name: N,
reference: R, reference: R,
) -> Result<(OciSchema<ImageManifest>, String)> { ) -> Result<(OciSchema<ImageManifest>, Option<Descriptor>, String)> {
let url = self.url.join(&format!( let url = self.url.join(&format!(
"/v2/{}/manifests/{}", "/v2/{}/manifests/{}",
name.as_ref(), name.as_ref(),
@ -231,9 +231,10 @@ impl OciRegistryClient {
let descriptor = self let descriptor = self
.pick_manifest(index) .pick_manifest(index)
.ok_or_else(|| anyhow!("unable to pick manifest from index"))?; .ok_or_else(|| anyhow!("unable to pick manifest from index"))?;
return self let (manifest, digest) = self
.get_raw_manifest_with_digest(name, descriptor.digest()) .get_raw_manifest_with_digest(name, descriptor.digest())
.await; .await?;
return Ok((manifest, Some(descriptor), digest));
} }
let digest = response let digest = response
.headers() .headers()
@ -243,7 +244,7 @@ impl OciRegistryClient {
.to_string(); .to_string();
let bytes = response.bytes().await?; let bytes = response.bytes().await?;
let manifest = serde_json::from_slice(&bytes)?; let manifest = serde_json::from_slice(&bytes)?;
Ok((OciSchema::new(bytes.to_vec(), manifest), digest)) Ok((OciSchema::new(bytes.to_vec(), manifest), None, digest))
} }
fn pick_manifest(&mut self, index: ImageIndex) -> Option<Descriptor> { fn pick_manifest(&mut self, index: ImageIndex) -> Option<Descriptor> {