mirror of
https://github.com/edera-dev/krata.git
synced 2025-08-05 06:01:32 +00:00
oci: utilize rw-lock for increased cache performance
This commit is contained in:
@ -14,12 +14,12 @@ use std::{
|
|||||||
path::{Path, PathBuf},
|
path::{Path, PathBuf},
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
};
|
};
|
||||||
use tokio::{fs, sync::Mutex};
|
use tokio::{fs, sync::RwLock};
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct OciPackerCache {
|
pub struct OciPackerCache {
|
||||||
cache_dir: PathBuf,
|
cache_dir: PathBuf,
|
||||||
index: Arc<Mutex<ImageIndex>>,
|
index: Arc<RwLock<ImageIndex>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
const ANNOTATION_IMAGE_NAME: &str = "io.containerd.image.name";
|
const ANNOTATION_IMAGE_NAME: &str = "io.containerd.image.name";
|
||||||
@ -34,24 +34,29 @@ impl OciPackerCache {
|
|||||||
.build()?;
|
.build()?;
|
||||||
let cache = OciPackerCache {
|
let cache = OciPackerCache {
|
||||||
cache_dir: cache_dir.to_path_buf(),
|
cache_dir: cache_dir.to_path_buf(),
|
||||||
index: Arc::new(Mutex::new(index)),
|
index: Arc::new(RwLock::new(index)),
|
||||||
};
|
};
|
||||||
|
|
||||||
{
|
{
|
||||||
let mut mutex = cache.index.lock().await;
|
let mut mutex = cache.index.write().await;
|
||||||
*mutex = cache.load_index().await?;
|
*mutex = cache.load_index().await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(cache)
|
Ok(cache)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn list(&self) -> Result<Vec<Descriptor>> {
|
||||||
|
let index = self.index.read().await;
|
||||||
|
Ok(index.manifests().clone())
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn recall(
|
pub async fn recall(
|
||||||
&self,
|
&self,
|
||||||
name: ImageName,
|
name: ImageName,
|
||||||
digest: &str,
|
digest: &str,
|
||||||
format: OciPackedFormat,
|
format: OciPackedFormat,
|
||||||
) -> Result<Option<OciPackedImage>> {
|
) -> Result<Option<OciPackedImage>> {
|
||||||
let index = self.index.lock().await;
|
let index = self.index.read().await;
|
||||||
|
|
||||||
let mut descriptor: Option<Descriptor> = None;
|
let mut descriptor: Option<Descriptor> = None;
|
||||||
for manifest in index.manifests() {
|
for manifest in index.manifests() {
|
||||||
@ -109,7 +114,7 @@ impl OciPackerCache {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn store(&self, packed: OciPackedImage) -> Result<OciPackedImage> {
|
pub async fn store(&self, packed: OciPackedImage) -> Result<OciPackedImage> {
|
||||||
let mut index = self.index.lock().await;
|
let mut index = self.index.write().await;
|
||||||
let mut manifests = index.manifests().clone();
|
let mut manifests = index.manifests().clone();
|
||||||
debug!("cache store digest={}", packed.digest);
|
debug!("cache store digest={}", packed.digest);
|
||||||
let mut fs_path = self.cache_dir.clone();
|
let mut fs_path = self.cache_dir.clone();
|
||||||
|
@ -6,6 +6,7 @@ use std::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
use anyhow::{anyhow, Result};
|
use anyhow::{anyhow, Result};
|
||||||
|
use oci_spec::image::Descriptor;
|
||||||
use tokio::{
|
use tokio::{
|
||||||
sync::{watch, Mutex},
|
sync::{watch, Mutex},
|
||||||
task::JoinHandle,
|
task::JoinHandle,
|
||||||
@ -51,6 +52,10 @@ impl OciPackerService {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn list(&self) -> Result<Vec<Descriptor>> {
|
||||||
|
self.cache.list().await
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn recall(
|
pub async fn recall(
|
||||||
&self,
|
&self,
|
||||||
digest: &str,
|
digest: &str,
|
||||||
@ -144,7 +149,7 @@ impl OciPackerService {
|
|||||||
fetcher: OciImageFetcher,
|
fetcher: OciImageFetcher,
|
||||||
progress: OciBoundProgress,
|
progress: OciBoundProgress,
|
||||||
) -> JoinHandle<()> {
|
) -> JoinHandle<()> {
|
||||||
info!("packer task {} started", key);
|
info!("started packer task {}", key);
|
||||||
tokio::task::spawn(async move {
|
tokio::task::spawn(async move {
|
||||||
let _task_drop_guard =
|
let _task_drop_guard =
|
||||||
scopeguard::guard((key.clone(), self.clone()), |(key, service)| {
|
scopeguard::guard((key.clone(), self.clone()), |(key, service)| {
|
||||||
@ -223,7 +228,7 @@ impl OciPackerService {
|
|||||||
|
|
||||||
match result.as_ref() {
|
match result.as_ref() {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
info!("packer task {} completed", key);
|
info!("completed packer task {}", key);
|
||||||
}
|
}
|
||||||
|
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
@ -249,7 +254,7 @@ impl OciPackerService {
|
|||||||
tokio::task::spawn(async move {
|
tokio::task::spawn(async move {
|
||||||
let mut tasks = self.tasks.lock().await;
|
let mut tasks = self.tasks.lock().await;
|
||||||
if let Some(task) = tasks.remove(&key) {
|
if let Some(task) = tasks.remove(&key) {
|
||||||
warn!("packer task {} aborted", key);
|
warn!("aborted packer task {}", key);
|
||||||
task.watch.send_replace(Some(Err(anyhow!("task aborted"))));
|
task.watch.send_replace(Some(Err(anyhow!("task aborted"))));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
Reference in New Issue
Block a user