From a99541460dc69af574f638855df2742b43ddb737 Mon Sep 17 00:00:00 2001 From: Alex Zenla Date: Tue, 16 Apr 2024 08:20:09 +0000 Subject: [PATCH] feat: concurrent image pulls --- Cargo.lock | 1 + crates/daemon/Cargo.toml | 1 + crates/daemon/src/control.rs | 52 +++++++--- crates/oci/examples/squashify.rs | 29 ++++-- crates/oci/src/assemble.rs | 47 ++++++--- crates/oci/src/fetch.rs | 4 +- crates/oci/src/packer/backend.rs | 27 ++++- crates/oci/src/packer/cache.rs | 10 +- crates/oci/src/packer/mod.rs | 10 +- crates/oci/src/packer/service.rs | 172 +++++++++++++++++++++++++++++-- crates/oci/src/progress.rs | 41 ++++++-- crates/runtime/src/cfgblk.rs | 6 +- crates/runtime/src/launch.rs | 4 +- 13 files changed, 331 insertions(+), 73 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0459f90..c17176f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1452,6 +1452,7 @@ dependencies = [ "log", "prost", "redb", + "scopeguard", "signal-hook", "tokio", "tokio-stream", diff --git a/crates/daemon/Cargo.toml b/crates/daemon/Cargo.toml index 12d7295..2f1c053 100644 --- a/crates/daemon/Cargo.toml +++ b/crates/daemon/Cargo.toml @@ -23,6 +23,7 @@ krata-runtime = { path = "../runtime", version = "^0.0.9" } log = { workspace = true } prost = { workspace = true } redb = { workspace = true } +scopeguard = { workspace = true } signal-hook = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true } diff --git a/crates/daemon/src/control.rs b/crates/daemon/src/control.rs index 778f232..44ae3b9 100644 --- a/crates/daemon/src/control.rs +++ b/crates/daemon/src/control.rs @@ -18,7 +18,7 @@ use krata::{ }; use krataoci::{ name::ImageName, - packer::{service::OciPackerService, OciImagePacked, OciPackedFormat}, + packer::{service::OciPackerService, OciPackedFormat, OciPackedImage}, progress::{OciProgress, OciProgressContext}, }; use std::{pin::Pin, str::FromStr}; @@ -90,8 +90,8 @@ enum ConsoleDataSelect { } enum PullImageSelect { - Progress(usize), - Completed(Result, JoinError>), + Progress(Option), + Completed(Result, JoinError>), } #[tonic::async_trait] @@ -367,32 +367,46 @@ impl ControlService for DaemonControlService { OciImageFormat::Erofs => OciPackedFormat::Erofs, OciImageFormat::Tar => OciPackedFormat::Tar, }; - let (sender, mut receiver) = channel::(100); - let context = OciProgressContext::new(sender); - + let (context, mut receiver) = OciProgressContext::create(); let our_packer = self.packer.clone(); let output = try_stream! { let mut task = tokio::task::spawn(async move { our_packer.request(name, format, context).await }); + let abort_handle = task.abort_handle(); + let _task_cancel_guard = scopeguard::guard(abort_handle, |handle| { + handle.abort(); + }); + loop { - let mut progresses = Vec::new(); let what = select! { - x = receiver.recv_many(&mut progresses, 10) => PullImageSelect::Progress(x), + x = receiver.recv() => PullImageSelect::Progress(x.ok()), x = &mut task => PullImageSelect::Completed(x), }; match what { - PullImageSelect::Progress(count) => { - if count > 0 { - let progress = progresses.remove(progresses.len() - 1); - let reply = PullImageReply { - progress: Some(convert_oci_progress(progress)), - digest: String::new(), - format: OciImageFormat::Unknown.into(), - }; - yield reply; + PullImageSelect::Progress(Some(mut progress)) => { + let mut drain = 0; + loop { + if drain >= 10 { + break; + } + + if let Ok(latest) = receiver.try_recv() { + progress = latest; + } else { + break; + } + + drain += 1; } + + let reply = PullImageReply { + progress: Some(convert_oci_progress(progress)), + digest: String::new(), + format: OciImageFormat::Unknown.into(), + }; + yield reply; }, PullImageSelect::Completed(result) => { @@ -414,6 +428,10 @@ impl ControlService for DaemonControlService { yield reply; break; }, + + _ => { + continue; + } } } }; diff --git a/crates/oci/examples/squashify.rs b/crates/oci/examples/squashify.rs index 8bd2932..777c951 100644 --- a/crates/oci/examples/squashify.rs +++ b/crates/oci/examples/squashify.rs @@ -5,10 +5,10 @@ use env_logger::Env; use krataoci::{ name::ImageName, packer::{service::OciPackerService, OciPackedFormat}, - progress::{OciProgress, OciProgressContext}, + progress::OciProgressContext, registry::OciPlatform, }; -use tokio::{fs, sync::mpsc::channel}; +use tokio::fs; #[tokio::main] async fn main() -> Result<()> { @@ -22,14 +22,28 @@ async fn main() -> Result<()> { fs::create_dir(&cache_dir).await?; } - let (sender, mut receiver) = channel::(100); + let (context, mut receiver) = OciProgressContext::create(); tokio::task::spawn(async move { loop { - let mut progresses = Vec::new(); - let _ = receiver.recv_many(&mut progresses, 100).await; - let Some(progress) = progresses.last() else { - continue; + let Ok(mut progress) = receiver.recv().await else { + return; }; + + let mut drain = 0; + loop { + if drain >= 10 { + break; + } + + if let Ok(latest) = receiver.try_recv() { + progress = latest; + } else { + break; + } + + drain += 1; + } + println!("phase {:?}", progress.phase); for (id, layer) in &progress.layers { println!( @@ -39,7 +53,6 @@ async fn main() -> Result<()> { } } }); - let context = OciProgressContext::new(sender); let service = OciPackerService::new(seed, &cache_dir, OciPlatform::current())?; let packed = service .request(image.clone(), OciPackedFormat::Squashfs, context) diff --git a/crates/oci/src/assemble.rs b/crates/oci/src/assemble.rs index 2db1d99..2931f3c 100644 --- a/crates/oci/src/assemble.rs +++ b/crates/oci/src/assemble.rs @@ -5,8 +5,10 @@ use crate::vfs::{VfsNode, VfsTree}; use anyhow::{anyhow, Result}; use log::{debug, trace, warn}; use oci_spec::image::{ImageConfiguration, ImageManifest}; + use std::path::{Path, PathBuf}; use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use tokio::fs; use tokio::io::AsyncRead; @@ -34,11 +36,12 @@ impl Drop for OciImageAssembled { pub struct OciImageAssembler { downloader: OciImageFetcher, - resolved: OciResolvedImage, + resolved: Option, progress: OciBoundProgress, work_dir: PathBuf, disk_dir: PathBuf, tmp_dir: Option, + success: AtomicBool, } impl OciImageAssembler { @@ -82,11 +85,12 @@ impl OciImageAssembler { Ok(OciImageAssembler { downloader, - resolved, + resolved: Some(resolved), progress, work_dir, disk_dir: target_dir, tmp_dir, + success: AtomicBool::new(false), }) } @@ -98,11 +102,11 @@ impl OciImageAssembler { self.assemble_with(&layer_dir).await } - async fn assemble_with(self, layer_dir: &Path) -> Result { - let local = self - .downloader - .download(self.resolved.clone(), layer_dir) - .await?; + async fn assemble_with(mut self, layer_dir: &Path) -> Result { + let Some(ref resolved) = self.resolved else { + return Err(anyhow!("resolved image was not available when expected")); + }; + let local = self.downloader.download(resolved, layer_dir).await?; let mut vfs = VfsTree::new(); for layer in &local.layers { debug!( @@ -146,13 +150,20 @@ impl OciImageAssembler { fs::remove_file(&layer.path).await?; } } - Ok(OciImageAssembled { + + let Some(resolved) = self.resolved.take() else { + return Err(anyhow!("resolved image was not available when expected")); + }; + + let assembled = OciImageAssembled { vfs: Arc::new(vfs), - digest: self.resolved.digest, - manifest: self.resolved.manifest, + digest: resolved.digest, + manifest: resolved.manifest, config: local.config, - tmp_dir: self.tmp_dir, - }) + tmp_dir: self.tmp_dir.clone(), + }; + self.success.store(true, Ordering::Release); + Ok(assembled) } async fn process_whiteout_entry( @@ -223,6 +234,18 @@ impl OciImageAssembler { } } +impl Drop for OciImageAssembler { + fn drop(&mut self) { + if !self.success.load(Ordering::Acquire) { + if let Some(tmp_dir) = self.tmp_dir.clone() { + tokio::task::spawn(async move { + let _ = fs::remove_dir_all(tmp_dir).await; + }); + } + } + } +} + async fn delete_disk_paths(node: &VfsNode) -> Result<()> { let mut queue = vec![node]; while !queue.is_empty() { diff --git a/crates/oci/src/fetch.rs b/crates/oci/src/fetch.rs index 3e89f10..05817d7 100644 --- a/crates/oci/src/fetch.rs +++ b/crates/oci/src/fetch.rs @@ -219,7 +219,7 @@ impl OciImageFetcher { pub async fn download( &self, - image: OciResolvedImage, + image: &OciResolvedImage, layer_dir: &Path, ) -> Result { let config: OciSchema; @@ -270,7 +270,7 @@ impl OciImageFetcher { .await; } Ok(OciLocalImage { - image, + image: image.clone(), config, layers, }) diff --git a/crates/oci/src/packer/backend.rs b/crates/oci/src/packer/backend.rs index 6103640..2218bc8 100644 --- a/crates/oci/src/packer/backend.rs +++ b/crates/oci/src/packer/backend.rs @@ -7,7 +7,12 @@ use crate::{ }; use anyhow::{anyhow, Result}; use log::warn; -use tokio::{fs::File, pin, process::Command, select}; +use tokio::{ + fs::File, + pin, + process::{Child, Command}, + select, +}; #[derive(Debug, Clone, Copy)] pub enum OciPackerBackendType { @@ -56,7 +61,7 @@ impl OciPackerBackend for OciPackerMkSquashfs { }) .await; - let mut child = Command::new("mksquashfs") + let child = Command::new("mksquashfs") .arg("-") .arg(file) .arg("-comp") @@ -66,7 +71,9 @@ impl OciPackerBackend for OciPackerMkSquashfs { .stderr(Stdio::null()) .stdout(Stdio::null()) .spawn()?; + let mut child = ChildProcessKillGuard(child); let stdin = child + .0 .stdin .take() .ok_or(anyhow!("unable to acquire stdin stream"))?; @@ -77,7 +84,7 @@ impl OciPackerBackend for OciPackerMkSquashfs { } Ok(()) })); - let wait = child.wait(); + let wait = child.0.wait(); pin!(wait); let status_result = loop { if let Some(inner) = writer.as_mut() { @@ -138,7 +145,7 @@ impl OciPackerBackend for OciPackerMkfsErofs { }) .await; - let mut child = Command::new("mkfs.erofs") + let child = Command::new("mkfs.erofs") .arg("-L") .arg("root") .arg("--tar=-") @@ -147,14 +154,16 @@ impl OciPackerBackend for OciPackerMkfsErofs { .stderr(Stdio::null()) .stdout(Stdio::null()) .spawn()?; + let mut child = ChildProcessKillGuard(child); let stdin = child + .0 .stdin .take() .ok_or(anyhow!("unable to acquire stdin stream"))?; let mut writer = Some(tokio::task::spawn( async move { vfs.write_to_tar(stdin).await }, )); - let wait = child.wait(); + let wait = child.0.wait(); pin!(wait); let status_result = loop { if let Some(inner) = writer.as_mut() { @@ -229,3 +238,11 @@ impl OciPackerBackend for OciPackerTar { Ok(()) } } + +struct ChildProcessKillGuard(Child); + +impl Drop for ChildProcessKillGuard { + fn drop(&mut self) { + let _ = self.0.start_kill(); + } +} diff --git a/crates/oci/src/packer/cache.rs b/crates/oci/src/packer/cache.rs index f872e64..5e4d9c2 100644 --- a/crates/oci/src/packer/cache.rs +++ b/crates/oci/src/packer/cache.rs @@ -1,5 +1,5 @@ use crate::{ - packer::{OciImagePacked, OciPackedFormat}, + packer::{OciPackedFormat, OciPackedImage}, schema::OciSchema, }; @@ -25,7 +25,7 @@ impl OciPackerCache { &self, digest: &str, format: OciPackedFormat, - ) -> Result> { + ) -> Result> { let mut fs_path = self.cache_dir.clone(); let mut config_path = self.cache_dir.clone(); let mut manifest_path = self.cache_dir.clone(); @@ -46,7 +46,7 @@ impl OciPackerCache { let config_bytes = fs::read(&config_path).await?; let config: ImageConfiguration = serde_json::from_slice(&config_bytes)?; debug!("cache hit digest={}", digest); - Some(OciImagePacked::new( + Some(OciPackedImage::new( digest.to_string(), fs_path.clone(), format, @@ -63,7 +63,7 @@ impl OciPackerCache { ) } - pub async fn store(&self, packed: OciImagePacked) -> Result { + pub async fn store(&self, packed: OciPackedImage) -> Result { debug!("cache store digest={}", packed.digest); let mut fs_path = self.cache_dir.clone(); let mut manifest_path = self.cache_dir.clone(); @@ -74,7 +74,7 @@ impl OciPackerCache { fs::rename(&packed.path, &fs_path).await?; fs::write(&config_path, packed.config.raw()).await?; fs::write(&manifest_path, packed.manifest.raw()).await?; - Ok(OciImagePacked::new( + Ok(OciPackedImage::new( packed.digest, fs_path.clone(), packed.format, diff --git a/crates/oci/src/packer/mod.rs b/crates/oci/src/packer/mod.rs index fe9b83f..dd510ab 100644 --- a/crates/oci/src/packer/mod.rs +++ b/crates/oci/src/packer/mod.rs @@ -9,7 +9,7 @@ pub mod backend; pub mod cache; pub mod service; -#[derive(Debug, Default, Clone, Copy)] +#[derive(Debug, Default, Clone, Copy, Eq, PartialEq, Hash)] pub enum OciPackedFormat { #[default] Squashfs, @@ -36,7 +36,7 @@ impl OciPackedFormat { } #[derive(Clone)] -pub struct OciImagePacked { +pub struct OciPackedImage { pub digest: String, pub path: PathBuf, pub format: OciPackedFormat, @@ -44,15 +44,15 @@ pub struct OciImagePacked { pub manifest: OciSchema, } -impl OciImagePacked { +impl OciPackedImage { pub fn new( digest: String, path: PathBuf, format: OciPackedFormat, config: OciSchema, manifest: OciSchema, - ) -> OciImagePacked { - OciImagePacked { + ) -> OciPackedImage { + OciPackedImage { digest, path, format, diff --git a/crates/oci/src/packer/service.rs b/crates/oci/src/packer/service.rs index 1d47ee0..837feb0 100644 --- a/crates/oci/src/packer/service.rs +++ b/crates/oci/src/packer/service.rs @@ -1,22 +1,40 @@ -use std::path::{Path, PathBuf}; +use std::{ + collections::{hash_map::Entry, HashMap}, + fmt::Display, + path::{Path, PathBuf}, + sync::Arc, +}; use anyhow::{anyhow, Result}; +use tokio::{ + sync::{watch, Mutex}, + task::JoinHandle, +}; use crate::{ assemble::OciImageAssembler, - fetch::OciImageFetcher, + fetch::{OciImageFetcher, OciResolvedImage}, name::ImageName, progress::{OciBoundProgress, OciProgress, OciProgressContext}, registry::OciPlatform, }; -use super::{cache::OciPackerCache, OciImagePacked, OciPackedFormat}; +use log::{error, info, warn}; + +use super::{cache::OciPackerCache, OciPackedFormat, OciPackedImage}; + +pub struct OciPackerTask { + progress: OciBoundProgress, + watch: watch::Sender>>, + task: JoinHandle<()>, +} #[derive(Clone)] pub struct OciPackerService { seed: Option, platform: OciPlatform, cache: OciPackerCache, + tasks: Arc>>, } impl OciPackerService { @@ -29,6 +47,7 @@ impl OciPackerService { seed, cache: OciPackerCache::new(cache_dir)?, platform, + tasks: Arc::new(Mutex::new(HashMap::new())), }) } @@ -36,7 +55,7 @@ impl OciPackerService { &self, digest: &str, format: OciPackedFormat, - ) -> Result> { + ) -> Result> { self.cache.recall(digest, format).await } @@ -45,14 +64,98 @@ impl OciPackerService { name: ImageName, format: OciPackedFormat, progress_context: OciProgressContext, - ) -> Result { + ) -> Result { let progress = OciProgress::new(); let progress = OciBoundProgress::new(progress_context.clone(), progress); let fetcher = OciImageFetcher::new(self.seed.clone(), self.platform.clone(), progress.clone()); let resolved = fetcher.resolve(name).await?; + let key = OciPackerTaskKey { + digest: resolved.digest.clone(), + format, + }; + let (progress_copy_task, mut receiver) = match self.tasks.lock().await.entry(key.clone()) { + Entry::Occupied(entry) => { + let entry = entry.get(); + ( + Some(entry.progress.also_update(progress_context).await), + entry.watch.subscribe(), + ) + } + + Entry::Vacant(entry) => { + let task = self + .clone() + .launch(key.clone(), format, resolved, fetcher, progress.clone()) + .await; + let (watch, receiver) = watch::channel(None); + + let task = OciPackerTask { + progress: progress.clone(), + task, + watch, + }; + entry.insert(task); + (None, receiver) + } + }; + + let _progress_task_guard = scopeguard::guard(progress_copy_task, |task| { + if let Some(task) = task { + task.abort(); + } + }); + + let _task_cancel_guard = scopeguard::guard(self.clone(), |service| { + service.maybe_cancel_task(key); + }); + + loop { + receiver.changed().await?; + let current = receiver.borrow_and_update(); + if current.is_some() { + return current + .as_ref() + .map(|x| x.as_ref().map_err(|err| anyhow!("{}", err)).cloned()) + .unwrap(); + } + } + } + + async fn launch( + self, + key: OciPackerTaskKey, + format: OciPackedFormat, + resolved: OciResolvedImage, + fetcher: OciImageFetcher, + progress: OciBoundProgress, + ) -> JoinHandle<()> { + info!("packer task {} started", key); + tokio::task::spawn(async move { + let _task_drop_guard = + scopeguard::guard((key.clone(), self.clone()), |(key, service)| { + service.ensure_task_gone(key); + }); + if let Err(error) = self + .task(key.clone(), format, resolved, fetcher, progress) + .await + { + self.finish(&key, Err(error)).await; + } + }) + } + + async fn task( + &self, + key: OciPackerTaskKey, + format: OciPackedFormat, + resolved: OciResolvedImage, + fetcher: OciImageFetcher, + progress: OciBoundProgress, + ) -> Result<()> { if let Some(cached) = self.cache.recall(&resolved.digest, format).await? { - return Ok(cached); + self.finish(&key, Ok(cached)).await; + return Ok(()); } let assembler = OciImageAssembler::new(fetcher, resolved, progress.clone(), None, None).await?; @@ -67,7 +170,7 @@ impl OciPackerService { packer .pack(progress, assembled.vfs.clone(), &target) .await?; - let packed = OciImagePacked::new( + let packed = OciPackedImage::new( assembled.digest.clone(), file, format, @@ -75,6 +178,59 @@ impl OciPackerService { assembled.manifest.clone(), ); let packed = self.cache.store(packed).await?; - Ok(packed) + self.finish(&key, Ok(packed)).await; + Ok(()) + } + + async fn finish(&self, key: &OciPackerTaskKey, result: Result) { + let Some(task) = self.tasks.lock().await.remove(key) else { + error!("packer task {} was not found when task completed", key); + return; + }; + + match result.as_ref() { + Ok(_) => { + info!("packer task {} completed", key); + } + + Err(err) => { + warn!("packer task {} failed: {}", key, err); + } + } + + task.watch.send_replace(Some(result)); + } + + fn maybe_cancel_task(self, key: OciPackerTaskKey) { + tokio::task::spawn(async move { + let tasks = self.tasks.lock().await; + if let Some(task) = tasks.get(&key) { + if task.watch.is_closed() { + task.task.abort(); + } + } + }); + } + + fn ensure_task_gone(self, key: OciPackerTaskKey) { + tokio::task::spawn(async move { + let mut tasks = self.tasks.lock().await; + if let Some(task) = tasks.remove(&key) { + warn!("packer task {} aborted", key); + task.watch.send_replace(Some(Err(anyhow!("task aborted")))); + } + }); + } +} + +#[derive(Debug, Clone, Eq, PartialEq, Hash)] +struct OciPackerTaskKey { + digest: String, + format: OciPackedFormat, +} + +impl Display for OciPackerTaskKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!("{}:{}", self.digest, self.format.extension())) } } diff --git a/crates/oci/src/progress.rs b/crates/oci/src/progress.rs index 43bdaea..e5e22e3 100644 --- a/crates/oci/src/progress.rs +++ b/crates/oci/src/progress.rs @@ -1,7 +1,11 @@ -use std::sync::Arc; - use indexmap::IndexMap; -use tokio::sync::{mpsc::Sender, Mutex}; +use std::sync::Arc; +use tokio::{ + sync::{broadcast, Mutex}, + task::JoinHandle, +}; + +const OCI_PROGRESS_QUEUE_LEN: usize = 100; #[derive(Clone, Debug)] pub struct OciProgress { @@ -99,16 +103,25 @@ pub enum OciProgressLayerPhase { #[derive(Clone)] pub struct OciProgressContext { - sender: Sender, + sender: broadcast::Sender, } impl OciProgressContext { - pub fn new(sender: Sender) -> OciProgressContext { + pub fn create() -> (OciProgressContext, broadcast::Receiver) { + let (sender, receiver) = broadcast::channel(OCI_PROGRESS_QUEUE_LEN); + (OciProgressContext::new(sender), receiver) + } + + pub fn new(sender: broadcast::Sender) -> OciProgressContext { OciProgressContext { sender } } pub fn update(&self, progress: &OciProgress) { - let _ = self.sender.try_send(progress.clone()); + let _ = self.sender.send(progress.clone()); + } + + pub fn subscribe(&self) -> broadcast::Receiver { + self.sender.subscribe() } } @@ -137,4 +150,20 @@ impl OciBoundProgress { function(&mut progress); self.context.update(&progress); } + + pub async fn also_update(&self, context: OciProgressContext) -> JoinHandle<()> { + let progress = self.instance.lock().await.clone(); + context.update(&progress); + let mut receiver = self.context.subscribe(); + tokio::task::spawn(async move { + while let Ok(progress) = receiver.recv().await { + match context.sender.send(progress) { + Ok(_) => {} + Err(_) => { + break; + } + } + } + }) + } } diff --git a/crates/runtime/src/cfgblk.rs b/crates/runtime/src/cfgblk.rs index fbfe563..00f3807 100644 --- a/crates/runtime/src/cfgblk.rs +++ b/crates/runtime/src/cfgblk.rs @@ -1,7 +1,7 @@ use anyhow::Result; use backhand::{FilesystemWriter, NodeHeader}; use krata::launchcfg::LaunchInfo; -use krataoci::packer::OciImagePacked; +use krataoci::packer::OciPackedImage; use log::trace; use std::fs; use std::fs::File; @@ -9,13 +9,13 @@ use std::path::PathBuf; use uuid::Uuid; pub struct ConfigBlock<'a> { - pub image: &'a OciImagePacked, + pub image: &'a OciPackedImage, pub file: PathBuf, pub dir: PathBuf, } impl ConfigBlock<'_> { - pub fn new<'a>(uuid: &Uuid, image: &'a OciImagePacked) -> Result> { + pub fn new<'a>(uuid: &Uuid, image: &'a OciPackedImage) -> Result> { let mut dir = std::env::temp_dir().clone(); dir.push(format!("krata-cfg-{}", uuid)); fs::create_dir_all(&dir)?; diff --git a/crates/runtime/src/launch.rs b/crates/runtime/src/launch.rs index b0ca22d..e76f892 100644 --- a/crates/runtime/src/launch.rs +++ b/crates/runtime/src/launch.rs @@ -10,7 +10,7 @@ use krata::launchcfg::{ LaunchInfo, LaunchNetwork, LaunchNetworkIpv4, LaunchNetworkIpv6, LaunchNetworkResolver, LaunchPackedFormat, LaunchRoot, }; -use krataoci::packer::OciImagePacked; +use krataoci::packer::OciPackedImage; use tokio::sync::Semaphore; use uuid::Uuid; use xenclient::{DomainChannel, DomainConfig, DomainDisk, DomainNetworkInterface}; @@ -30,7 +30,7 @@ pub struct GuestLaunchRequest { pub env: HashMap, pub run: Option>, pub debug: bool, - pub image: OciImagePacked, + pub image: OciPackedImage, } pub struct GuestLauncher {