mirror of
https://github.com/edera-dev/krata.git
synced 2025-08-04 05:31:32 +00:00
feat: oci concurrency improvements (#95)
* feat: implement improved and detailed oci progress indication * feat: implement on-disk indexes of images * oci: utilize rw-lock for increased cache performance
This commit is contained in:
@ -1,14 +1,12 @@
|
||||
use std::{path::Path, process::Stdio, sync::Arc};
|
||||
use std::{os::unix::fs::MetadataExt, path::Path, process::Stdio, sync::Arc};
|
||||
|
||||
use super::OciPackedFormat;
|
||||
use crate::{
|
||||
progress::{OciBoundProgress, OciProgressPhase},
|
||||
vfs::VfsTree,
|
||||
};
|
||||
use crate::{progress::OciBoundProgress, vfs::VfsTree};
|
||||
use anyhow::{anyhow, Result};
|
||||
use log::warn;
|
||||
use tokio::{
|
||||
fs::File,
|
||||
fs::{self, File},
|
||||
io::BufWriter,
|
||||
pin,
|
||||
process::{Child, Command},
|
||||
select,
|
||||
@ -55,9 +53,7 @@ impl OciPackerBackend for OciPackerMkSquashfs {
|
||||
async fn pack(&self, progress: OciBoundProgress, vfs: Arc<VfsTree>, file: &Path) -> Result<()> {
|
||||
progress
|
||||
.update(|progress| {
|
||||
progress.phase = OciProgressPhase::Packing;
|
||||
progress.total = 1;
|
||||
progress.value = 0;
|
||||
progress.start_packing();
|
||||
})
|
||||
.await;
|
||||
|
||||
@ -120,12 +116,9 @@ impl OciPackerBackend for OciPackerMkSquashfs {
|
||||
status.code().unwrap()
|
||||
))
|
||||
} else {
|
||||
let metadata = fs::metadata(&file).await?;
|
||||
progress
|
||||
.update(|progress| {
|
||||
progress.phase = OciProgressPhase::Packing;
|
||||
progress.total = 1;
|
||||
progress.value = 1;
|
||||
})
|
||||
.update(|progress| progress.complete(metadata.size()))
|
||||
.await;
|
||||
Ok(())
|
||||
}
|
||||
@ -136,12 +129,10 @@ pub struct OciPackerMkfsErofs {}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl OciPackerBackend for OciPackerMkfsErofs {
|
||||
async fn pack(&self, progress: OciBoundProgress, vfs: Arc<VfsTree>, path: &Path) -> Result<()> {
|
||||
async fn pack(&self, progress: OciBoundProgress, vfs: Arc<VfsTree>, file: &Path) -> Result<()> {
|
||||
progress
|
||||
.update(|progress| {
|
||||
progress.phase = OciProgressPhase::Packing;
|
||||
progress.total = 1;
|
||||
progress.value = 0;
|
||||
progress.start_packing();
|
||||
})
|
||||
.await;
|
||||
|
||||
@ -149,7 +140,7 @@ impl OciPackerBackend for OciPackerMkfsErofs {
|
||||
.arg("-L")
|
||||
.arg("root")
|
||||
.arg("--tar=-")
|
||||
.arg(path)
|
||||
.arg(file)
|
||||
.stdin(Stdio::piped())
|
||||
.stderr(Stdio::null())
|
||||
.stdout(Stdio::null())
|
||||
@ -200,11 +191,10 @@ impl OciPackerBackend for OciPackerMkfsErofs {
|
||||
status.code().unwrap()
|
||||
))
|
||||
} else {
|
||||
let metadata = fs::metadata(&file).await?;
|
||||
progress
|
||||
.update(|progress| {
|
||||
progress.phase = OciProgressPhase::Packing;
|
||||
progress.total = 1;
|
||||
progress.value = 1;
|
||||
progress.complete(metadata.size());
|
||||
})
|
||||
.await;
|
||||
Ok(())
|
||||
@ -219,20 +209,18 @@ impl OciPackerBackend for OciPackerTar {
|
||||
async fn pack(&self, progress: OciBoundProgress, vfs: Arc<VfsTree>, file: &Path) -> Result<()> {
|
||||
progress
|
||||
.update(|progress| {
|
||||
progress.phase = OciProgressPhase::Packing;
|
||||
progress.total = 1;
|
||||
progress.value = 0;
|
||||
progress.start_packing();
|
||||
})
|
||||
.await;
|
||||
|
||||
let file = File::create(file).await?;
|
||||
vfs.write_to_tar(file).await?;
|
||||
let output = File::create(file).await?;
|
||||
let output = BufWriter::new(output);
|
||||
vfs.write_to_tar(output).await?;
|
||||
|
||||
let metadata = fs::metadata(file).await?;
|
||||
progress
|
||||
.update(|progress| {
|
||||
progress.phase = OciProgressPhase::Packing;
|
||||
progress.total = 1;
|
||||
progress.value = 1;
|
||||
progress.complete(metadata.size());
|
||||
})
|
||||
.await;
|
||||
Ok(())
|
||||
|
@ -1,69 +1,121 @@
|
||||
use crate::{
|
||||
name::ImageName,
|
||||
packer::{OciPackedFormat, OciPackedImage},
|
||||
schema::OciSchema,
|
||||
};
|
||||
|
||||
use anyhow::Result;
|
||||
use log::debug;
|
||||
use oci_spec::image::{ImageConfiguration, ImageManifest};
|
||||
use std::path::{Path, PathBuf};
|
||||
use tokio::fs;
|
||||
use log::{debug, error};
|
||||
use oci_spec::image::{
|
||||
Descriptor, ImageConfiguration, ImageIndex, ImageIndexBuilder, ImageManifest, MediaType,
|
||||
ANNOTATION_REF_NAME,
|
||||
};
|
||||
use std::{
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
};
|
||||
use tokio::{fs, sync::RwLock};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct OciPackerCache {
|
||||
cache_dir: PathBuf,
|
||||
index: Arc<RwLock<ImageIndex>>,
|
||||
}
|
||||
|
||||
const ANNOTATION_IMAGE_NAME: &str = "io.containerd.image.name";
|
||||
const ANNOTATION_OCI_PACKER_FORMAT: &str = "dev.krata.oci.packer.format";
|
||||
|
||||
impl OciPackerCache {
|
||||
pub fn new(cache_dir: &Path) -> Result<OciPackerCache> {
|
||||
Ok(OciPackerCache {
|
||||
pub async fn new(cache_dir: &Path) -> Result<OciPackerCache> {
|
||||
let index = ImageIndexBuilder::default()
|
||||
.schema_version(2u32)
|
||||
.media_type(MediaType::ImageIndex)
|
||||
.manifests(Vec::new())
|
||||
.build()?;
|
||||
let cache = OciPackerCache {
|
||||
cache_dir: cache_dir.to_path_buf(),
|
||||
})
|
||||
index: Arc::new(RwLock::new(index)),
|
||||
};
|
||||
|
||||
{
|
||||
let mut mutex = cache.index.write().await;
|
||||
*mutex = cache.load_index().await?;
|
||||
}
|
||||
|
||||
Ok(cache)
|
||||
}
|
||||
|
||||
pub async fn list(&self) -> Result<Vec<Descriptor>> {
|
||||
let index = self.index.read().await;
|
||||
Ok(index.manifests().clone())
|
||||
}
|
||||
|
||||
pub async fn recall(
|
||||
&self,
|
||||
name: ImageName,
|
||||
digest: &str,
|
||||
format: OciPackedFormat,
|
||||
) -> Result<Option<OciPackedImage>> {
|
||||
let index = self.index.read().await;
|
||||
|
||||
let mut descriptor: Option<Descriptor> = None;
|
||||
for manifest in index.manifests() {
|
||||
if manifest.digest() == digest
|
||||
&& manifest
|
||||
.annotations()
|
||||
.as_ref()
|
||||
.and_then(|x| x.get(ANNOTATION_OCI_PACKER_FORMAT))
|
||||
.map(|x| x.as_str())
|
||||
== Some(format.extension())
|
||||
{
|
||||
descriptor = Some(manifest.clone());
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let Some(descriptor) = descriptor else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let mut fs_path = self.cache_dir.clone();
|
||||
let mut config_path = self.cache_dir.clone();
|
||||
let mut manifest_path = self.cache_dir.clone();
|
||||
fs_path.push(format!("{}.{}", digest, format.extension()));
|
||||
manifest_path.push(format!("{}.manifest.json", digest));
|
||||
config_path.push(format!("{}.config.json", digest));
|
||||
Ok(
|
||||
if fs_path.exists() && manifest_path.exists() && config_path.exists() {
|
||||
let image_metadata = fs::metadata(&fs_path).await?;
|
||||
let manifest_metadata = fs::metadata(&manifest_path).await?;
|
||||
let config_metadata = fs::metadata(&config_path).await?;
|
||||
if image_metadata.is_file()
|
||||
&& manifest_metadata.is_file()
|
||||
&& config_metadata.is_file()
|
||||
{
|
||||
let manifest_bytes = fs::read(&manifest_path).await?;
|
||||
let manifest: ImageManifest = serde_json::from_slice(&manifest_bytes)?;
|
||||
let config_bytes = fs::read(&config_path).await?;
|
||||
let config: ImageConfiguration = serde_json::from_slice(&config_bytes)?;
|
||||
debug!("cache hit digest={}", digest);
|
||||
Some(OciPackedImage::new(
|
||||
digest.to_string(),
|
||||
fs_path.clone(),
|
||||
format,
|
||||
OciSchema::new(config_bytes, config),
|
||||
OciSchema::new(manifest_bytes, manifest),
|
||||
))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
|
||||
if fs_path.exists() && manifest_path.exists() && config_path.exists() {
|
||||
let image_metadata = fs::metadata(&fs_path).await?;
|
||||
let manifest_metadata = fs::metadata(&manifest_path).await?;
|
||||
let config_metadata = fs::metadata(&config_path).await?;
|
||||
if image_metadata.is_file() && manifest_metadata.is_file() && config_metadata.is_file()
|
||||
{
|
||||
let manifest_bytes = fs::read(&manifest_path).await?;
|
||||
let manifest: ImageManifest = serde_json::from_slice(&manifest_bytes)?;
|
||||
let config_bytes = fs::read(&config_path).await?;
|
||||
let config: ImageConfiguration = serde_json::from_slice(&config_bytes)?;
|
||||
debug!("cache hit digest={}", digest);
|
||||
Ok(Some(OciPackedImage::new(
|
||||
name,
|
||||
digest.to_string(),
|
||||
fs_path.clone(),
|
||||
format,
|
||||
descriptor,
|
||||
OciSchema::new(config_bytes, config),
|
||||
OciSchema::new(manifest_bytes, manifest),
|
||||
)))
|
||||
} else {
|
||||
debug!("cache miss digest={}", digest);
|
||||
None
|
||||
},
|
||||
)
|
||||
Ok(None)
|
||||
}
|
||||
} else {
|
||||
debug!("cache miss digest={}", digest);
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn store(&self, packed: OciPackedImage) -> Result<OciPackedImage> {
|
||||
let mut index = self.index.write().await;
|
||||
let mut manifests = index.manifests().clone();
|
||||
debug!("cache store digest={}", packed.digest);
|
||||
let mut fs_path = self.cache_dir.clone();
|
||||
let mut manifest_path = self.cache_dir.clone();
|
||||
@ -74,12 +126,90 @@ impl OciPackerCache {
|
||||
fs::rename(&packed.path, &fs_path).await?;
|
||||
fs::write(&config_path, packed.config.raw()).await?;
|
||||
fs::write(&manifest_path, packed.manifest.raw()).await?;
|
||||
Ok(OciPackedImage::new(
|
||||
manifests.retain(|item| {
|
||||
if item.digest() != &packed.digest {
|
||||
return true;
|
||||
}
|
||||
|
||||
let Some(format) = item
|
||||
.annotations()
|
||||
.as_ref()
|
||||
.and_then(|x| x.get(ANNOTATION_OCI_PACKER_FORMAT))
|
||||
.map(|x| x.as_str())
|
||||
else {
|
||||
return true;
|
||||
};
|
||||
|
||||
if format != packed.format.extension() {
|
||||
return true;
|
||||
}
|
||||
|
||||
false
|
||||
});
|
||||
|
||||
let mut descriptor = packed.descriptor.clone();
|
||||
let mut annotations = descriptor.annotations().clone().unwrap_or_default();
|
||||
annotations.insert(
|
||||
ANNOTATION_OCI_PACKER_FORMAT.to_string(),
|
||||
packed.format.extension().to_string(),
|
||||
);
|
||||
let image_name = packed.name.to_string();
|
||||
annotations.insert(ANNOTATION_IMAGE_NAME.to_string(), image_name);
|
||||
let image_ref = packed.name.reference.clone();
|
||||
annotations.insert(ANNOTATION_REF_NAME.to_string(), image_ref);
|
||||
descriptor.set_annotations(Some(annotations));
|
||||
manifests.push(descriptor.clone());
|
||||
index.set_manifests(manifests);
|
||||
self.save_index(&index).await?;
|
||||
|
||||
let packed = OciPackedImage::new(
|
||||
packed.name,
|
||||
packed.digest,
|
||||
fs_path.clone(),
|
||||
packed.format,
|
||||
descriptor,
|
||||
packed.config,
|
||||
packed.manifest,
|
||||
))
|
||||
);
|
||||
Ok(packed)
|
||||
}
|
||||
|
||||
async fn save_empty_index(&self) -> Result<ImageIndex> {
|
||||
let index = ImageIndexBuilder::default()
|
||||
.schema_version(2u32)
|
||||
.media_type(MediaType::ImageIndex)
|
||||
.manifests(Vec::new())
|
||||
.build()?;
|
||||
self.save_index(&index).await?;
|
||||
Ok(index)
|
||||
}
|
||||
|
||||
async fn load_index(&self) -> Result<ImageIndex> {
|
||||
let mut index_path = self.cache_dir.clone();
|
||||
index_path.push("index.json");
|
||||
|
||||
if !index_path.exists() {
|
||||
self.save_empty_index().await?;
|
||||
}
|
||||
|
||||
let content = fs::read_to_string(&index_path).await?;
|
||||
let index = match serde_json::from_str::<ImageIndex>(&content) {
|
||||
Ok(index) => index,
|
||||
Err(error) => {
|
||||
error!("image index was corrupted, creating a new one: {}", error);
|
||||
self.save_empty_index().await?
|
||||
}
|
||||
};
|
||||
|
||||
Ok(index)
|
||||
}
|
||||
|
||||
async fn save_index(&self, index: &ImageIndex) -> Result<()> {
|
||||
let mut encoded = serde_json::to_string_pretty(index)?;
|
||||
encoded.push('\n');
|
||||
let mut index_path = self.cache_dir.clone();
|
||||
index_path.push("index.json");
|
||||
fs::write(&index_path, encoded).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
@ -1,9 +1,9 @@
|
||||
use std::path::PathBuf;
|
||||
|
||||
use crate::schema::OciSchema;
|
||||
use crate::{name::ImageName, schema::OciSchema};
|
||||
|
||||
use self::backend::OciPackerBackendType;
|
||||
use oci_spec::image::{ImageConfiguration, ImageManifest};
|
||||
use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest};
|
||||
|
||||
pub mod backend;
|
||||
pub mod cache;
|
||||
@ -37,25 +37,31 @@ impl OciPackedFormat {
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct OciPackedImage {
|
||||
pub name: ImageName,
|
||||
pub digest: String,
|
||||
pub path: PathBuf,
|
||||
pub format: OciPackedFormat,
|
||||
pub descriptor: Descriptor,
|
||||
pub config: OciSchema<ImageConfiguration>,
|
||||
pub manifest: OciSchema<ImageManifest>,
|
||||
}
|
||||
|
||||
impl OciPackedImage {
|
||||
pub fn new(
|
||||
name: ImageName,
|
||||
digest: String,
|
||||
path: PathBuf,
|
||||
format: OciPackedFormat,
|
||||
descriptor: Descriptor,
|
||||
config: OciSchema<ImageConfiguration>,
|
||||
manifest: OciSchema<ImageManifest>,
|
||||
) -> OciPackedImage {
|
||||
OciPackedImage {
|
||||
name,
|
||||
digest,
|
||||
path,
|
||||
format,
|
||||
descriptor,
|
||||
config,
|
||||
manifest,
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ use std::{
|
||||
};
|
||||
|
||||
use anyhow::{anyhow, Result};
|
||||
use oci_spec::image::Descriptor;
|
||||
use tokio::{
|
||||
sync::{watch, Mutex},
|
||||
task::JoinHandle,
|
||||
@ -38,38 +39,45 @@ pub struct OciPackerService {
|
||||
}
|
||||
|
||||
impl OciPackerService {
|
||||
pub fn new(
|
||||
pub async fn new(
|
||||
seed: Option<PathBuf>,
|
||||
cache_dir: &Path,
|
||||
platform: OciPlatform,
|
||||
) -> Result<OciPackerService> {
|
||||
Ok(OciPackerService {
|
||||
seed,
|
||||
cache: OciPackerCache::new(cache_dir)?,
|
||||
cache: OciPackerCache::new(cache_dir).await?,
|
||||
platform,
|
||||
tasks: Arc::new(Mutex::new(HashMap::new())),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn list(&self) -> Result<Vec<Descriptor>> {
|
||||
self.cache.list().await
|
||||
}
|
||||
|
||||
pub async fn recall(
|
||||
&self,
|
||||
digest: &str,
|
||||
format: OciPackedFormat,
|
||||
) -> Result<Option<OciPackedImage>> {
|
||||
self.cache.recall(digest, format).await
|
||||
self.cache
|
||||
.recall(ImageName::parse("cached:latest")?, digest, format)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn request(
|
||||
&self,
|
||||
name: ImageName,
|
||||
format: OciPackedFormat,
|
||||
overwrite: bool,
|
||||
progress_context: OciProgressContext,
|
||||
) -> Result<OciPackedImage> {
|
||||
let progress = OciProgress::new();
|
||||
let progress = OciBoundProgress::new(progress_context.clone(), progress);
|
||||
let fetcher =
|
||||
OciImageFetcher::new(self.seed.clone(), self.platform.clone(), progress.clone());
|
||||
let resolved = fetcher.resolve(name).await?;
|
||||
let resolved = fetcher.resolve(name.clone()).await?;
|
||||
let key = OciPackerTaskKey {
|
||||
digest: resolved.digest.clone(),
|
||||
format,
|
||||
@ -86,7 +94,15 @@ impl OciPackerService {
|
||||
Entry::Vacant(entry) => {
|
||||
let task = self
|
||||
.clone()
|
||||
.launch(key.clone(), format, resolved, fetcher, progress.clone())
|
||||
.launch(
|
||||
name,
|
||||
key.clone(),
|
||||
format,
|
||||
overwrite,
|
||||
resolved,
|
||||
fetcher,
|
||||
progress.clone(),
|
||||
)
|
||||
.await;
|
||||
let (watch, receiver) = watch::channel(None);
|
||||
|
||||
@ -122,22 +138,33 @@ impl OciPackerService {
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn launch(
|
||||
self,
|
||||
name: ImageName,
|
||||
key: OciPackerTaskKey,
|
||||
format: OciPackedFormat,
|
||||
overwrite: bool,
|
||||
resolved: OciResolvedImage,
|
||||
fetcher: OciImageFetcher,
|
||||
progress: OciBoundProgress,
|
||||
) -> JoinHandle<()> {
|
||||
info!("packer task {} started", key);
|
||||
info!("started packer task {}", key);
|
||||
tokio::task::spawn(async move {
|
||||
let _task_drop_guard =
|
||||
scopeguard::guard((key.clone(), self.clone()), |(key, service)| {
|
||||
service.ensure_task_gone(key);
|
||||
});
|
||||
if let Err(error) = self
|
||||
.task(key.clone(), format, resolved, fetcher, progress)
|
||||
.task(
|
||||
name,
|
||||
key.clone(),
|
||||
format,
|
||||
overwrite,
|
||||
resolved,
|
||||
fetcher,
|
||||
progress,
|
||||
)
|
||||
.await
|
||||
{
|
||||
self.finish(&key, Err(error)).await;
|
||||
@ -145,17 +172,26 @@ impl OciPackerService {
|
||||
})
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn task(
|
||||
&self,
|
||||
name: ImageName,
|
||||
key: OciPackerTaskKey,
|
||||
format: OciPackedFormat,
|
||||
overwrite: bool,
|
||||
resolved: OciResolvedImage,
|
||||
fetcher: OciImageFetcher,
|
||||
progress: OciBoundProgress,
|
||||
) -> Result<()> {
|
||||
if let Some(cached) = self.cache.recall(&resolved.digest, format).await? {
|
||||
self.finish(&key, Ok(cached)).await;
|
||||
return Ok(());
|
||||
if !overwrite {
|
||||
if let Some(cached) = self
|
||||
.cache
|
||||
.recall(name.clone(), &resolved.digest, format)
|
||||
.await?
|
||||
{
|
||||
self.finish(&key, Ok(cached)).await;
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
let assembler =
|
||||
OciImageAssembler::new(fetcher, resolved, progress.clone(), None, None).await?;
|
||||
@ -171,9 +207,11 @@ impl OciPackerService {
|
||||
.pack(progress, assembled.vfs.clone(), &target)
|
||||
.await?;
|
||||
let packed = OciPackedImage::new(
|
||||
name,
|
||||
assembled.digest.clone(),
|
||||
file,
|
||||
format,
|
||||
assembled.descriptor.clone(),
|
||||
assembled.config.clone(),
|
||||
assembled.manifest.clone(),
|
||||
);
|
||||
@ -190,7 +228,7 @@ impl OciPackerService {
|
||||
|
||||
match result.as_ref() {
|
||||
Ok(_) => {
|
||||
info!("packer task {} completed", key);
|
||||
info!("completed packer task {}", key);
|
||||
}
|
||||
|
||||
Err(err) => {
|
||||
@ -216,7 +254,7 @@ impl OciPackerService {
|
||||
tokio::task::spawn(async move {
|
||||
let mut tasks = self.tasks.lock().await;
|
||||
if let Some(task) = tasks.remove(&key) {
|
||||
warn!("packer task {} aborted", key);
|
||||
warn!("aborted packer task {}", key);
|
||||
task.watch.send_replace(Some(Err(anyhow!("task aborted"))));
|
||||
}
|
||||
});
|
||||
|
Reference in New Issue
Block a user