feat: oci progress events

This commit is contained in:
Alex Zenla
2024-04-12 10:54:51 +00:00
parent 346cf4a7fa
commit 8d26cd8fb2
24 changed files with 454 additions and 153 deletions

1
Cargo.lock generated
View File

@ -1347,6 +1347,7 @@ dependencies = [
"env_logger", "env_logger",
"futures", "futures",
"krata", "krata",
"krata-oci",
"krata-runtime", "krata-runtime",
"log", "log",
"prost", "prost",

View File

@ -52,32 +52,30 @@ impl DestroyCommand {
async fn wait_guest_destroyed(id: &str, events: EventStream) -> Result<()> { async fn wait_guest_destroyed(id: &str, events: EventStream) -> Result<()> {
let mut stream = events.subscribe(); let mut stream = events.subscribe();
while let Ok(event) = stream.recv().await { while let Ok(event) = stream.recv().await {
match event { if let Event::GuestChanged(changed) = event {
Event::GuestChanged(changed) => { let Some(guest) = changed.guest else {
let Some(guest) = changed.guest else { continue;
continue; };
};
if guest.id != id { if guest.id != id {
continue; continue;
}
let Some(state) = guest.state else {
continue;
};
if let Some(ref error) = state.error_info {
if state.status() == GuestStatus::Failed {
error!("destroy failed: {}", error.message);
std::process::exit(1);
} else {
error!("guest error: {}", error.message);
} }
}
let Some(state) = guest.state else { if state.status() == GuestStatus::Destroyed {
continue; std::process::exit(0);
};
if let Some(ref error) = state.error_info {
if state.status() == GuestStatus::Failed {
error!("destroy failed: {}", error.message);
std::process::exit(1);
} else {
error!("guest error: {}", error.message);
}
}
if state.status() == GuestStatus::Destroyed {
std::process::exit(0);
}
} }
} }
} }

View File

@ -158,6 +158,8 @@ async fn wait_guest_started(id: &str, events: EventStream) -> Result<()> {
break; break;
} }
} }
Event::OciProgress(_oci) => {}
} }
} }
Ok(()) Ok(())

View File

@ -28,11 +28,10 @@ impl WatchCommand {
let mut stream = events.subscribe(); let mut stream = events.subscribe();
loop { loop {
let event = stream.recv().await?; let event = stream.recv().await?;
match event {
Event::GuestChanged(changed) => { if let Event::GuestChanged(changed) = event {
let guest = changed.guest.clone(); let guest = changed.guest.clone();
self.print_event("guest.changed", changed, guest)?; self.print_event("guest.changed", changed, guest)?;
}
} }
} }
} }

View File

@ -69,28 +69,26 @@ impl StdioConsoleStream {
Ok(tokio::task::spawn(async move { Ok(tokio::task::spawn(async move {
let mut stream = events.subscribe(); let mut stream = events.subscribe();
while let Ok(event) = stream.recv().await { while let Ok(event) = stream.recv().await {
match event { if let Event::GuestChanged(changed) = event {
Event::GuestChanged(changed) => { let Some(guest) = changed.guest else {
let Some(guest) = changed.guest else { continue;
continue; };
};
let Some(state) = guest.state else { let Some(state) = guest.state else {
continue; continue;
}; };
if guest.id != id { if guest.id != id {
continue; continue;
} }
if let Some(exit_info) = state.exit_info { if let Some(exit_info) = state.exit_info {
return Some(exit_info.code); return Some(exit_info.code);
} }
let status = state.status(); let status = state.status();
if status == GuestStatus::Destroying || status == GuestStatus::Destroyed { if status == GuestStatus::Destroying || status == GuestStatus::Destroyed {
return Some(10); return Some(10);
}
} }
} }
} }

View File

@ -18,6 +18,7 @@ clap = { workspace = true }
env_logger = { workspace = true } env_logger = { workspace = true }
futures = { workspace = true } futures = { workspace = true }
krata = { path = "../krata", version = "^0.0.8" } krata = { path = "../krata", version = "^0.0.8" }
krata-oci = { path = "../oci", version = "^0.0.8" }
krata-runtime = { path = "../runtime", version = "^0.0.8" } krata-runtime = { path = "../runtime", version = "^0.0.8" }
log = { workspace = true } log = { workspace = true }
prost = { workspace = true } prost = { workspace = true }

View File

@ -3,7 +3,6 @@ use clap::Parser;
use env_logger::Env; use env_logger::Env;
use krata::dial::ControlDialAddress; use krata::dial::ControlDialAddress;
use kratad::Daemon; use kratad::Daemon;
use kratart::Runtime;
use log::LevelFilter; use log::LevelFilter;
use std::{ use std::{
str::FromStr, str::FromStr,
@ -27,8 +26,8 @@ async fn main() -> Result<()> {
let args = DaemonCommand::parse(); let args = DaemonCommand::parse();
let addr = ControlDialAddress::from_str(&args.listen)?; let addr = ControlDialAddress::from_str(&args.listen)?;
let runtime = Runtime::new(args.store.clone()).await?;
let mut daemon = Daemon::new(args.store.clone(), runtime).await?; let mut daemon = Daemon::new(args.store.clone()).await?;
daemon.listen(addr).await?; daemon.listen(addr).await?;
Ok(()) Ok(())
} }

View File

@ -9,6 +9,7 @@ use krata::{
idm::protocol::{idm_event::Event, IdmEvent}, idm::protocol::{idm_event::Event, IdmEvent},
v1::common::{GuestExitInfo, GuestState, GuestStatus}, v1::common::{GuestExitInfo, GuestState, GuestStatus},
}; };
use krataoci::progress::OciProgress;
use log::{error, warn}; use log::{error, warn};
use tokio::{ use tokio::{
select, select,
@ -21,7 +22,7 @@ use tokio::{
}; };
use uuid::Uuid; use uuid::Uuid;
use crate::{db::GuestStore, idm::DaemonIdmHandle}; use crate::{db::GuestStore, idm::DaemonIdmHandle, oci::convert_oci_progress};
pub type DaemonEvent = krata::v1::control::watch_events_reply::Event; pub type DaemonEvent = krata::v1::control::watch_events_reply::Event;
@ -52,7 +53,8 @@ pub struct DaemonEventGenerator {
idms: HashMap<u32, (Uuid, JoinHandle<()>)>, idms: HashMap<u32, (Uuid, JoinHandle<()>)>,
idm_sender: Sender<(u32, IdmEvent)>, idm_sender: Sender<(u32, IdmEvent)>,
idm_receiver: Receiver<(u32, IdmEvent)>, idm_receiver: Receiver<(u32, IdmEvent)>,
_event_sender: broadcast::Sender<DaemonEvent>, oci_progress_receiver: broadcast::Receiver<OciProgress>,
event_sender: broadcast::Sender<DaemonEvent>,
} }
impl DaemonEventGenerator { impl DaemonEventGenerator {
@ -60,6 +62,7 @@ impl DaemonEventGenerator {
guests: GuestStore, guests: GuestStore,
guest_reconciler_notify: Sender<Uuid>, guest_reconciler_notify: Sender<Uuid>,
idm: DaemonIdmHandle, idm: DaemonIdmHandle,
oci_progress_receiver: broadcast::Receiver<OciProgress>,
) -> Result<(DaemonEventContext, DaemonEventGenerator)> { ) -> Result<(DaemonEventContext, DaemonEventGenerator)> {
let (sender, _) = broadcast::channel(EVENT_CHANNEL_QUEUE_LEN); let (sender, _) = broadcast::channel(EVENT_CHANNEL_QUEUE_LEN);
let (idm_sender, idm_receiver) = channel(IDM_EVENT_CHANNEL_QUEUE_LEN); let (idm_sender, idm_receiver) = channel(IDM_EVENT_CHANNEL_QUEUE_LEN);
@ -71,55 +74,54 @@ impl DaemonEventGenerator {
idms: HashMap::new(), idms: HashMap::new(),
idm_sender, idm_sender,
idm_receiver, idm_receiver,
_event_sender: sender.clone(), oci_progress_receiver,
event_sender: sender.clone(),
}; };
let context = DaemonEventContext { sender }; let context = DaemonEventContext { sender };
Ok((context, generator)) Ok((context, generator))
} }
async fn handle_feed_event(&mut self, event: &DaemonEvent) -> Result<()> { async fn handle_feed_event(&mut self, event: &DaemonEvent) -> Result<()> {
match event { if let DaemonEvent::GuestChanged(changed) = event {
DaemonEvent::GuestChanged(changed) => { let Some(ref guest) = changed.guest else {
let Some(ref guest) = changed.guest else { return Ok(());
return Ok(()); };
};
let Some(ref state) = guest.state else { let Some(ref state) = guest.state else {
return Ok(()); return Ok(());
}; };
let status = state.status(); let status = state.status();
let id = Uuid::from_str(&guest.id)?; let id = Uuid::from_str(&guest.id)?;
let domid = state.domid; let domid = state.domid;
match status { match status {
GuestStatus::Started => { GuestStatus::Started => {
if let Entry::Vacant(e) = self.idms.entry(domid) { if let Entry::Vacant(e) = self.idms.entry(domid) {
let client = self.idm.client(domid).await?; let client = self.idm.client(domid).await?;
let mut receiver = client.subscribe().await?; let mut receiver = client.subscribe().await?;
let sender = self.idm_sender.clone(); let sender = self.idm_sender.clone();
let task = tokio::task::spawn(async move { let task = tokio::task::spawn(async move {
loop { loop {
let Ok(event) = receiver.recv().await else { let Ok(event) = receiver.recv().await else {
break; break;
}; };
if let Err(error) = sender.send((domid, event)).await { if let Err(error) = sender.send((domid, event)).await {
warn!("unable to deliver idm event: {}", error); warn!("unable to deliver idm event: {}", error);
}
} }
}); }
e.insert((id, task)); });
} e.insert((id, task));
} }
GuestStatus::Destroyed => {
if let Some((_, handle)) = self.idms.remove(&domid) {
handle.abort();
}
}
_ => {}
} }
GuestStatus::Destroyed => {
if let Some((_, handle)) = self.idms.remove(&domid) {
handle.abort();
}
}
_ => {}
} }
} }
Ok(()) Ok(())
@ -148,6 +150,17 @@ impl DaemonEventGenerator {
Ok(()) Ok(())
} }
async fn handle_oci_progress_event(&mut self, progress: OciProgress) -> Result<()> {
let Some(_) = Uuid::from_str(&progress.id).ok() else {
return Ok(());
};
let event = convert_oci_progress(progress);
self.event_sender.send(DaemonEvent::OciProgress(event))?;
Ok(())
}
async fn evaluate(&mut self) -> Result<()> { async fn evaluate(&mut self) -> Result<()> {
select! { select! {
x = self.idm_receiver.recv() => match x { x = self.idm_receiver.recv() => match x {
@ -168,6 +181,14 @@ impl DaemonEventGenerator {
Err(error) => { Err(error) => {
Err(error.into()) Err(error.into())
} }
},
x = self.oci_progress_receiver.recv() => match x {
Ok(event) => {
self.handle_oci_progress_event(event).await
},
Err(error) => {
Err(error.into())
}
} }
} }
} }

View File

@ -7,12 +7,16 @@ use db::GuestStore;
use event::{DaemonEventContext, DaemonEventGenerator}; use event::{DaemonEventContext, DaemonEventGenerator};
use idm::{DaemonIdm, DaemonIdmHandle}; use idm::{DaemonIdm, DaemonIdmHandle};
use krata::{dial::ControlDialAddress, v1::control::control_service_server::ControlServiceServer}; use krata::{dial::ControlDialAddress, v1::control::control_service_server::ControlServiceServer};
use krataoci::progress::OciProgressContext;
use kratart::Runtime; use kratart::Runtime;
use log::info; use log::info;
use reconcile::guest::GuestReconciler; use reconcile::guest::GuestReconciler;
use tokio::{ use tokio::{
net::UnixListener, net::UnixListener,
sync::mpsc::{channel, Sender}, sync::{
broadcast,
mpsc::{channel, Sender},
},
task::JoinHandle, task::JoinHandle,
}; };
use tokio_stream::wrappers::UnixListenerStream; use tokio_stream::wrappers::UnixListenerStream;
@ -25,6 +29,7 @@ pub mod db;
pub mod event; pub mod event;
pub mod idm; pub mod idm;
pub mod metrics; pub mod metrics;
pub mod oci;
pub mod reconcile; pub mod reconcile;
pub struct Daemon { pub struct Daemon {
@ -39,9 +44,14 @@ pub struct Daemon {
} }
const GUEST_RECONCILER_QUEUE_LEN: usize = 1000; const GUEST_RECONCILER_QUEUE_LEN: usize = 1000;
const OCI_PROGRESS_QUEUE_LEN: usize = 1000;
impl Daemon { impl Daemon {
pub async fn new(store: String, runtime: Runtime) -> Result<Self> { pub async fn new(store: String) -> Result<Self> {
let (oci_progress_sender, oci_progress_receiver) =
broadcast::channel(OCI_PROGRESS_QUEUE_LEN);
let runtime =
Runtime::new(OciProgressContext::new(oci_progress_sender), store.clone()).await?;
let guests_db_path = format!("{}/guests.db", store); let guests_db_path = format!("{}/guests.db", store);
let guests = GuestStore::open(&PathBuf::from(guests_db_path))?; let guests = GuestStore::open(&PathBuf::from(guests_db_path))?;
let (guest_reconciler_notify, guest_reconciler_receiver) = let (guest_reconciler_notify, guest_reconciler_receiver) =
@ -50,9 +60,13 @@ impl Daemon {
let idm = idm.launch().await?; let idm = idm.launch().await?;
let console = DaemonConsole::new().await?; let console = DaemonConsole::new().await?;
let console = console.launch().await?; let console = console.launch().await?;
let (events, generator) = let (events, generator) = DaemonEventGenerator::new(
DaemonEventGenerator::new(guests.clone(), guest_reconciler_notify.clone(), idm.clone()) guests.clone(),
.await?; guest_reconciler_notify.clone(),
idm.clone(),
oci_progress_receiver,
)
.await?;
let runtime_for_reconciler = runtime.dupe().await?; let runtime_for_reconciler = runtime.dupe().await?;
let guest_reconciler = GuestReconciler::new( let guest_reconciler = GuestReconciler::new(
guests.clone(), guests.clone(),

40
crates/daemon/src/oci.rs Normal file
View File

@ -0,0 +1,40 @@
use krata::v1::control::{
OciProgressEvent, OciProgressEventLayer, OciProgressEventLayerPhase, OciProgressEventPhase,
};
use krataoci::progress::{OciProgress, OciProgressLayer, OciProgressLayerPhase, OciProgressPhase};
fn convert_oci_layer_progress(layer: OciProgressLayer) -> OciProgressEventLayer {
OciProgressEventLayer {
id: layer.id,
phase: match layer.phase {
OciProgressLayerPhase::Waiting => OciProgressEventLayerPhase::Waiting,
OciProgressLayerPhase::Downloading => OciProgressEventLayerPhase::Downloading,
OciProgressLayerPhase::Downloaded => OciProgressEventLayerPhase::Downloaded,
OciProgressLayerPhase::Extracting => OciProgressEventLayerPhase::Extracting,
OciProgressLayerPhase::Extracted => OciProgressEventLayerPhase::Extracted,
}
.into(),
progress: layer.progress,
}
}
pub fn convert_oci_progress(oci: OciProgress) -> OciProgressEvent {
OciProgressEvent {
guest_id: oci.id,
phase: match oci.phase {
OciProgressPhase::Resolving => OciProgressEventPhase::Resolving,
OciProgressPhase::Resolved => OciProgressEventPhase::Resolved,
OciProgressPhase::ConfigAcquire => OciProgressEventPhase::ConfigAcquire,
OciProgressPhase::LayerAcquire => OciProgressEventPhase::LayerAcquire,
OciProgressPhase::Packing => OciProgressEventPhase::Packing,
OciProgressPhase::Complete => OciProgressEventPhase::Complete,
}
.into(),
layers: oci
.layers
.into_values()
.map(convert_oci_layer_progress)
.collect::<Vec<_>>(),
progress: oci.progress,
}
}

View File

@ -61,6 +61,7 @@ message WatchEventsRequest {}
message WatchEventsReply { message WatchEventsReply {
oneof event { oneof event {
GuestChangedEvent guest_changed = 1; GuestChangedEvent guest_changed = 1;
OciProgressEvent oci_progress = 2;
} }
} }
@ -68,6 +69,38 @@ message GuestChangedEvent {
krata.v1.common.Guest guest = 1; krata.v1.common.Guest guest = 1;
} }
enum OciProgressEventLayerPhase {
OCI_PROGRESS_EVENT_LAYER_PHASE_UNKNOWN = 0;
OCI_PROGRESS_EVENT_LAYER_PHASE_WAITING = 1;
OCI_PROGRESS_EVENT_LAYER_PHASE_DOWNLOADING = 2;
OCI_PROGRESS_EVENT_LAYER_PHASE_DOWNLOADED = 3;
OCI_PROGRESS_EVENT_LAYER_PHASE_EXTRACTING = 4;
OCI_PROGRESS_EVENT_LAYER_PHASE_EXTRACTED = 5;
}
message OciProgressEventLayer {
string id = 1;
OciProgressEventLayerPhase phase = 2;
double progress = 3;
}
enum OciProgressEventPhase {
OCI_PROGRESS_EVENT_PHASE_UNKNOWN = 0;
OCI_PROGRESS_EVENT_PHASE_RESOLVING = 1;
OCI_PROGRESS_EVENT_PHASE_RESOLVED = 2;
OCI_PROGRESS_EVENT_PHASE_CONFIG_ACQUIRE = 3;
OCI_PROGRESS_EVENT_PHASE_LAYER_ACQUIRE = 4;
OCI_PROGRESS_EVENT_PHASE_PACKING = 5;
OCI_PROGRESS_EVENT_PHASE_COMPLETE = 6;
}
message OciProgressEvent {
string guest_id = 1;
OciProgressEventPhase phase = 2;
repeated OciProgressEventLayer layers = 3;
double progress = 4;
}
message ReadGuestMetricsRequest { message ReadGuestMetricsRequest {
string guest_id = 1; string guest_id = 1;
} }

View File

@ -1 +1,2 @@
#![allow(clippy::all)]
tonic::include_proto!("krata.v1.common"); tonic::include_proto!("krata.v1.common");

View File

@ -1 +1,2 @@
#![allow(clippy::all)]
tonic::include_proto!("krata.v1.control"); tonic::include_proto!("krata.v1.control");

View File

@ -179,6 +179,8 @@ impl AutoNetworkWatcher {
break; break;
}, },
Ok(_) => {},
Err(error) => { Err(error) => {
warn!("failed to receive event: {}", error); warn!("failed to receive event: {}", error);
} }

View File

@ -2,8 +2,10 @@ use std::{env::args, path::PathBuf};
use anyhow::Result; use anyhow::Result;
use env_logger::Env; use env_logger::Env;
use krataoci::{cache::ImageCache, compiler::ImageCompiler, name::ImageName}; use krataoci::{
use tokio::fs; cache::ImageCache, compiler::ImageCompiler, name::ImageName, progress::OciProgressContext,
};
use tokio::{fs, sync::broadcast};
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {
@ -18,8 +20,18 @@ async fn main() -> Result<()> {
} }
let cache = ImageCache::new(&cache_dir)?; let cache = ImageCache::new(&cache_dir)?;
let compiler = ImageCompiler::new(&cache, seed)?;
let info = compiler.compile(&image).await?; let (sender, mut receiver) = broadcast::channel(1000);
tokio::task::spawn(async move {
loop {
let Some(_) = receiver.recv().await.ok() else {
break;
};
}
});
let context = OciProgressContext::new(sender);
let compiler = ImageCompiler::new(&cache, seed, context)?;
let info = compiler.compile(&image.to_string(), &image).await?;
println!( println!(
"generated squashfs of {} to {}", "generated squashfs of {} to {}",
image, image,

View File

@ -1,6 +1,7 @@
use crate::cache::ImageCache; use crate::cache::ImageCache;
use crate::fetch::{OciImageDownloader, OciImageLayer}; use crate::fetch::{OciImageDownloader, OciImageLayer};
use crate::name::ImageName; use crate::name::ImageName;
use crate::progress::{OciProgress, OciProgressContext, OciProgressPhase};
use crate::registry::OciRegistryPlatform; use crate::registry::OciRegistryPlatform;
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use backhand::compression::Compressor; use backhand::compression::Compressor;
@ -8,6 +9,7 @@ use backhand::{FilesystemCompressor, FilesystemWriter, NodeHeader};
use log::{debug, trace, warn}; use log::{debug, trace, warn};
use oci_spec::image::{ImageConfiguration, ImageManifest}; use oci_spec::image::{ImageConfiguration, ImageManifest};
use std::borrow::Cow; use std::borrow::Cow;
use std::collections::BTreeMap;
use std::fs::File; use std::fs::File;
use std::io::{BufWriter, ErrorKind, Read}; use std::io::{BufWriter, ErrorKind, Read};
use std::os::unix::fs::{FileTypeExt, MetadataExt, PermissionsExt}; use std::os::unix::fs::{FileTypeExt, MetadataExt, PermissionsExt};
@ -45,14 +47,23 @@ impl ImageInfo {
pub struct ImageCompiler<'a> { pub struct ImageCompiler<'a> {
cache: &'a ImageCache, cache: &'a ImageCache,
seed: Option<PathBuf>, seed: Option<PathBuf>,
progress: OciProgressContext,
} }
impl ImageCompiler<'_> { impl ImageCompiler<'_> {
pub fn new(cache: &ImageCache, seed: Option<PathBuf>) -> Result<ImageCompiler> { pub fn new(
Ok(ImageCompiler { cache, seed }) cache: &ImageCache,
seed: Option<PathBuf>,
progress: OciProgressContext,
) -> Result<ImageCompiler> {
Ok(ImageCompiler {
cache,
seed,
progress,
})
} }
pub async fn compile(&self, image: &ImageName) -> Result<ImageInfo> { pub async fn compile(&self, id: &str, image: &ImageName) -> Result<ImageInfo> {
debug!("compile image={image}"); debug!("compile image={image}");
let mut tmp_dir = std::env::temp_dir().clone(); let mut tmp_dir = std::env::temp_dir().clone();
tmp_dir.push(format!("krata-compile-{}", Uuid::new_v4())); tmp_dir.push(format!("krata-compile-{}", Uuid::new_v4()));
@ -68,7 +79,7 @@ impl ImageCompiler<'_> {
let mut squash_file = tmp_dir.clone(); let mut squash_file = tmp_dir.clone();
squash_file.push("image.squashfs"); squash_file.push("image.squashfs");
let info = self let info = self
.download_and_compile(image, &layer_dir, &image_dir, &squash_file) .download_and_compile(id, image, &layer_dir, &image_dir, &squash_file)
.await?; .await?;
fs::remove_dir_all(&tmp_dir).await?; fs::remove_dir_all(&tmp_dir).await?;
Ok(info) Ok(info)
@ -76,15 +87,24 @@ impl ImageCompiler<'_> {
async fn download_and_compile( async fn download_and_compile(
&self, &self,
id: &str,
image: &ImageName, image: &ImageName,
layer_dir: &Path, layer_dir: &Path,
image_dir: &Path, image_dir: &Path,
squash_file: &Path, squash_file: &Path,
) -> Result<ImageInfo> { ) -> Result<ImageInfo> {
let mut progress = OciProgress {
id: id.to_string(),
phase: OciProgressPhase::Resolving,
layers: BTreeMap::new(),
progress: 0.0,
};
self.progress.update(&progress);
let downloader = OciImageDownloader::new( let downloader = OciImageDownloader::new(
self.seed.clone(), self.seed.clone(),
layer_dir.to_path_buf(), layer_dir.to_path_buf(),
OciRegistryPlatform::current(), OciRegistryPlatform::current(),
self.progress.clone(),
); );
let resolved = downloader.resolve(image.clone()).await?; let resolved = downloader.resolve(image.clone()).await?;
let cache_key = format!( let cache_key = format!(
@ -93,28 +113,44 @@ impl ImageCompiler<'_> {
); );
let cache_digest = sha256::digest(cache_key); let cache_digest = sha256::digest(cache_key);
progress.phase = OciProgressPhase::Complete;
self.progress.update(&progress);
if let Some(cached) = self.cache.recall(&cache_digest).await? { if let Some(cached) = self.cache.recall(&cache_digest).await? {
return Ok(cached); return Ok(cached);
} }
let local = downloader.download(resolved).await?; progress.phase = OciProgressPhase::Resolved;
for layer in resolved.manifest.layers() {
progress.add_layer(layer.digest());
}
self.progress.update(&progress);
let local = downloader.download(resolved, &mut progress).await?;
for layer in &local.layers { for layer in &local.layers {
debug!( debug!(
"process layer digest={} compression={:?}", "process layer digest={} compression={:?}",
&layer.digest, layer.compression, &layer.digest, layer.compression,
); );
let whiteouts = self.process_layer_whiteout(layer, image_dir).await?; progress.extracting_layer(&layer.digest, 0, 0);
self.progress.update(&progress);
let (whiteouts, count) = self.process_layer_whiteout(layer, image_dir).await?;
progress.extracting_layer(&layer.digest, 0, count);
self.progress.update(&progress);
debug!( debug!(
"process layer digest={} whiteouts={:?}", "process layer digest={} whiteouts={:?}",
&layer.digest, whiteouts &layer.digest, whiteouts
); );
let mut archive = layer.archive().await?; let mut archive = layer.archive().await?;
let mut entries = archive.entries()?; let mut entries = archive.entries()?;
let mut completed = 0;
while let Some(entry) = entries.next().await { while let Some(entry) = entries.next().await {
let mut entry = entry?; let mut entry = entry?;
let path = entry.path()?; let path = entry.path()?;
let mut maybe_whiteout_path_str = let mut maybe_whiteout_path_str =
path.to_str().map(|x| x.to_string()).unwrap_or_default(); path.to_str().map(|x| x.to_string()).unwrap_or_default();
completed += 1;
progress.extracting_layer(&layer.digest, completed, count);
self.progress.update(&progress);
if whiteouts.contains(&maybe_whiteout_path_str) { if whiteouts.contains(&maybe_whiteout_path_str) {
continue; continue;
} }
@ -144,7 +180,7 @@ impl ImageCompiler<'_> {
} }
} }
self.squash(image_dir, squash_file)?; self.squash(image_dir, squash_file, &mut progress)?;
let info = ImageInfo::new( let info = ImageInfo::new(
squash_file.to_path_buf(), squash_file.to_path_buf(),
local.image.manifest, local.image.manifest,
@ -157,12 +193,14 @@ impl ImageCompiler<'_> {
&self, &self,
layer: &OciImageLayer, layer: &OciImageLayer,
image_dir: &Path, image_dir: &Path,
) -> Result<Vec<String>> { ) -> Result<(Vec<String>, usize)> {
let mut whiteouts = Vec::new(); let mut whiteouts = Vec::new();
let mut archive = layer.archive().await?; let mut archive = layer.archive().await?;
let mut entries = archive.entries()?; let mut entries = archive.entries()?;
let mut count = 0usize;
while let Some(entry) = entries.next().await { while let Some(entry) = entries.next().await {
let entry = entry?; let entry = entry?;
count += 1;
let path = entry.path()?; let path = entry.path()?;
let Some(name) = path.file_name() else { let Some(name) = path.file_name() else {
return Err(anyhow!("unable to get file name")); return Err(anyhow!("unable to get file name"));
@ -180,7 +218,7 @@ impl ImageCompiler<'_> {
} }
} }
} }
Ok(whiteouts) Ok((whiteouts, count))
} }
async fn process_whiteout_entry( async fn process_whiteout_entry(
@ -300,7 +338,15 @@ impl ImageCompiler<'_> {
Ok(()) Ok(())
} }
fn squash(&self, image_dir: &Path, squash_file: &Path) -> Result<()> { fn squash(
&self,
image_dir: &Path,
squash_file: &Path,
progress: &mut OciProgress,
) -> Result<()> {
progress.phase = OciProgressPhase::Packing;
progress.progress = 0.0;
self.progress.update(progress);
let mut writer = FilesystemWriter::default(); let mut writer = FilesystemWriter::default();
writer.set_compressor(FilesystemCompressor::new(Compressor::Gzip, None)?); writer.set_compressor(FilesystemCompressor::new(Compressor::Gzip, None)?);
let walk = WalkDir::new(image_dir).follow_links(false); let walk = WalkDir::new(image_dir).follow_links(false);
@ -358,6 +404,10 @@ impl ImageCompiler<'_> {
} }
} }
progress.phase = OciProgressPhase::Packing;
progress.progress = 50.0;
self.progress.update(progress);
let squash_file_path = squash_file let squash_file_path = squash_file
.to_str() .to_str()
.ok_or_else(|| anyhow!("failed to convert squashfs string"))?; .ok_or_else(|| anyhow!("failed to convert squashfs string"))?;
@ -367,6 +417,9 @@ impl ImageCompiler<'_> {
trace!("squash generate: {}", squash_file_path); trace!("squash generate: {}", squash_file_path);
writer.write(&mut bufwrite)?; writer.write(&mut bufwrite)?;
std::fs::remove_dir_all(image_dir)?; std::fs::remove_dir_all(image_dir)?;
progress.phase = OciProgressPhase::Complete;
progress.progress = 100.0;
self.progress.update(progress);
Ok(()) Ok(())
} }
} }

View File

@ -1,3 +1,5 @@
use crate::progress::{OciProgress, OciProgressContext, OciProgressPhase};
use super::{ use super::{
name::ImageName, name::ImageName,
registry::{OciRegistryClient, OciRegistryPlatform}, registry::{OciRegistryClient, OciRegistryPlatform},
@ -26,6 +28,7 @@ pub struct OciImageDownloader {
seed: Option<PathBuf>, seed: Option<PathBuf>,
storage: PathBuf, storage: PathBuf,
platform: OciRegistryPlatform, platform: OciRegistryPlatform,
progress: OciProgressContext,
} }
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq, Eq)]
@ -79,11 +82,13 @@ impl OciImageDownloader {
seed: Option<PathBuf>, seed: Option<PathBuf>,
storage: PathBuf, storage: PathBuf,
platform: OciRegistryPlatform, platform: OciRegistryPlatform,
progress: OciProgressContext,
) -> OciImageDownloader { ) -> OciImageDownloader {
OciImageDownloader { OciImageDownloader {
seed, seed,
storage, storage,
platform, platform,
progress,
} }
} }
@ -208,9 +213,15 @@ impl OciImageDownloader {
}) })
} }
pub async fn download(&self, image: OciResolvedImage) -> Result<OciLocalImage> { pub async fn download(
&self,
image: OciResolvedImage,
progress: &mut OciProgress,
) -> Result<OciLocalImage> {
let config: ImageConfiguration; let config: ImageConfiguration;
progress.phase = OciProgressPhase::ConfigAcquire;
self.progress.update(progress);
let mut client = OciRegistryClient::new(image.name.registry_url()?, self.platform.clone())?; let mut client = OciRegistryClient::new(image.name.registry_url()?, self.platform.clone())?;
if let Some(seeded) = self if let Some(seeded) = self
.load_seed_json_blob::<ImageConfiguration>(image.manifest.config()) .load_seed_json_blob::<ImageConfiguration>(image.manifest.config())
@ -223,9 +234,18 @@ impl OciImageDownloader {
.await?; .await?;
config = serde_json::from_slice(&config_bytes)?; config = serde_json::from_slice(&config_bytes)?;
} }
progress.phase = OciProgressPhase::LayerAcquire;
self.progress.update(progress);
let mut layers = Vec::new(); let mut layers = Vec::new();
for layer in image.manifest.layers() { for layer in image.manifest.layers() {
layers.push(self.acquire_layer(&image.name, layer, &mut client).await?); progress.downloading_layer(layer.digest(), 0, layer.size() as usize);
self.progress.update(progress);
layers.push(
self.acquire_layer(&image.name, layer, &mut client, progress)
.await?,
);
progress.downloaded_layer(layer.digest());
self.progress.update(progress);
} }
Ok(OciLocalImage { Ok(OciLocalImage {
image, image,
@ -239,6 +259,7 @@ impl OciImageDownloader {
image: &ImageName, image: &ImageName,
layer: &Descriptor, layer: &Descriptor,
client: &mut OciRegistryClient, client: &mut OciRegistryClient,
progress: &mut OciProgress,
) -> Result<OciImageLayer> { ) -> Result<OciImageLayer> {
debug!( debug!(
"acquire layer digest={} size={}", "acquire layer digest={} size={}",
@ -251,7 +272,15 @@ impl OciImageDownloader {
let seeded = self.extract_seed_blob(layer, &layer_path).await?; let seeded = self.extract_seed_blob(layer, &layer_path).await?;
if !seeded { if !seeded {
let file = File::create(&layer_path).await?; let file = File::create(&layer_path).await?;
let size = client.write_blob_to_file(&image.name, layer, file).await?; let size = client
.write_blob_to_file(
&image.name,
layer,
file,
Some(progress),
Some(&self.progress),
)
.await?;
if layer.size() as u64 != size { if layer.size() as u64 != size {
return Err(anyhow!( return Err(anyhow!(
"downloaded layer size differs from size in manifest", "downloaded layer size differs from size in manifest",

View File

@ -2,4 +2,5 @@ pub mod cache;
pub mod compiler; pub mod compiler;
pub mod fetch; pub mod fetch;
pub mod name; pub mod name;
pub mod progress;
pub mod registry; pub mod registry;

101
crates/oci/src/progress.rs Normal file
View File

@ -0,0 +1,101 @@
use std::collections::BTreeMap;
use tokio::sync::broadcast::Sender;
#[derive(Clone, Debug)]
pub struct OciProgress {
pub id: String,
pub phase: OciProgressPhase,
pub layers: BTreeMap<String, OciProgressLayer>,
pub progress: f64,
}
impl OciProgress {
pub fn add_layer(&mut self, id: &str) {
self.layers.insert(
id.to_string(),
OciProgressLayer {
id: id.to_string(),
phase: OciProgressLayerPhase::Waiting,
progress: 0.0,
},
);
}
pub fn downloading_layer(&mut self, id: &str, downloaded: usize, total: usize) {
if let Some(entry) = self.layers.get_mut(id) {
entry.phase = OciProgressLayerPhase::Downloading;
entry.progress = if total != 0 {
(downloaded as f64 / total as f64) * 100.0
} else {
100.0
};
}
}
pub fn downloaded_layer(&mut self, id: &str) {
if let Some(entry) = self.layers.get_mut(id) {
entry.phase = OciProgressLayerPhase::Downloaded;
entry.progress = 100.0;
}
}
pub fn extracting_layer(&mut self, id: &str, extracted: usize, total: usize) {
if let Some(entry) = self.layers.get_mut(id) {
entry.phase = OciProgressLayerPhase::Extracting;
entry.progress = if total != 0 {
(extracted as f64 / total as f64) * 100.0
} else {
100.0
};
}
}
pub fn extracted_layer(&mut self, id: &str) {
if let Some(entry) = self.layers.get_mut(id) {
entry.phase = OciProgressLayerPhase::Extracted;
entry.progress = 100.0;
}
}
}
#[derive(Clone, Debug)]
pub enum OciProgressPhase {
Resolving,
Resolved,
ConfigAcquire,
LayerAcquire,
Packing,
Complete,
}
#[derive(Clone, Debug)]
pub struct OciProgressLayer {
pub id: String,
pub phase: OciProgressLayerPhase,
pub progress: f64,
}
#[derive(Clone, Debug)]
pub enum OciProgressLayerPhase {
Waiting,
Downloading,
Downloaded,
Extracting,
Extracted,
}
#[derive(Clone)]
pub struct OciProgressContext {
sender: Sender<OciProgress>,
}
impl OciProgressContext {
pub fn new(sender: Sender<OciProgress>) -> OciProgressContext {
OciProgressContext { sender }
}
pub fn update(&self, progress: &OciProgress) {
let _ = self.sender.send(progress.clone());
}
}

View File

@ -7,6 +7,8 @@ use reqwest::{Client, RequestBuilder, Response, StatusCode};
use tokio::{fs::File, io::AsyncWriteExt}; use tokio::{fs::File, io::AsyncWriteExt};
use url::Url; use url::Url;
use crate::progress::{OciProgress, OciProgressContext};
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct OciRegistryPlatform { pub struct OciRegistryPlatform {
pub os: Os, pub os: Os,
@ -138,6 +140,8 @@ impl OciRegistryClient {
name: N, name: N,
descriptor: &Descriptor, descriptor: &Descriptor,
mut dest: File, mut dest: File,
mut progress_handle: Option<&mut OciProgress>,
progress_context: Option<&OciProgressContext>,
) -> Result<u64> { ) -> Result<u64> {
let url = self.url.join(&format!( let url = self.url.join(&format!(
"/v2/{}/blobs/{}", "/v2/{}/blobs/{}",
@ -146,9 +150,24 @@ impl OciRegistryClient {
))?; ))?;
let mut response = self.call(self.agent.get(url.as_str())).await?; let mut response = self.call(self.agent.get(url.as_str())).await?;
let mut size: u64 = 0; let mut size: u64 = 0;
let mut last_progress_size: u64 = 0;
while let Some(chunk) = response.chunk().await? { while let Some(chunk) = response.chunk().await? {
dest.write_all(&chunk).await?; dest.write_all(&chunk).await?;
size += chunk.len() as u64; size += chunk.len() as u64;
if (size - last_progress_size) > (5 * 1024 * 1024) {
last_progress_size = size;
if let Some(progress_handle) = progress_handle.as_mut() {
progress_handle.downloading_layer(
descriptor.digest(),
size as usize,
descriptor.size() as usize,
);
if let Some(progress_context) = progress_context.as_ref() {
progress_context.update(progress_handle);
}
}
}
} }
Ok(size) Ok(size)
} }

View File

@ -31,10 +31,6 @@ name = "kratart"
[dev-dependencies] [dev-dependencies]
env_logger = { workspace = true } env_logger = { workspace = true }
[[example]]
name = "kratart-squashify"
path = "examples/squashify.rs"
[[example]] [[example]]
name = "kratart-channel" name = "kratart-channel"
path = "examples/channel.rs" path = "examples/channel.rs"

View File

@ -1,29 +0,0 @@
use std::{env::args, path::PathBuf};
use anyhow::Result;
use env_logger::Env;
use krataoci::{cache::ImageCache, compiler::ImageCompiler, name::ImageName};
use tokio::fs;
#[tokio::main]
async fn main() -> Result<()> {
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
let image = ImageName::parse(&args().nth(1).unwrap())?;
let seed = args().nth(2).map(PathBuf::from);
let cache_dir = PathBuf::from("krata-cache");
if !cache_dir.exists() {
fs::create_dir(&cache_dir).await?;
}
let cache = ImageCache::new(&cache_dir)?;
let compiler = ImageCompiler::new(&cache, seed)?;
let info = compiler.compile(&image).await?;
println!(
"generated squashfs of {} to {}",
image,
info.image_squashfs.to_string_lossy()
);
Ok(())
}

View File

@ -9,7 +9,8 @@ use ipnetwork::{IpNetwork, Ipv4Network};
use krata::launchcfg::{ use krata::launchcfg::{
LaunchInfo, LaunchNetwork, LaunchNetworkIpv4, LaunchNetworkIpv6, LaunchNetworkResolver, LaunchInfo, LaunchNetwork, LaunchNetworkIpv4, LaunchNetworkIpv6, LaunchNetworkResolver,
}; };
use tokio::sync::Semaphore; use krataoci::progress::OciProgressContext;
use tokio::sync::{broadcast, Semaphore};
use uuid::Uuid; use uuid::Uuid;
use xenclient::{DomainChannel, DomainConfig, DomainDisk, DomainNetworkInterface}; use xenclient::{DomainChannel, DomainConfig, DomainDisk, DomainNetworkInterface};
use xenstore::XsdInterface; use xenstore::XsdInterface;
@ -51,7 +52,9 @@ impl GuestLauncher {
) -> Result<GuestInfo> { ) -> Result<GuestInfo> {
let uuid = request.uuid.unwrap_or_else(Uuid::new_v4); let uuid = request.uuid.unwrap_or_else(Uuid::new_v4);
let xen_name = format!("krata-{uuid}"); let xen_name = format!("krata-{uuid}");
let image_info = self.compile(request.image, &context.image_cache).await?; let image_info = self
.compile(&uuid.to_string(), request.image, &context.image_cache)
.await?;
let mut gateway_mac = MacAddr6::random(); let mut gateway_mac = MacAddr6::random();
gateway_mac.set_local(true); gateway_mac.set_local(true);
@ -243,10 +246,12 @@ impl GuestLauncher {
} }
} }
async fn compile(&self, image: &str, image_cache: &ImageCache) -> Result<ImageInfo> { async fn compile(&self, id: &str, image: &str, image_cache: &ImageCache) -> Result<ImageInfo> {
let image = ImageName::parse(image)?; let image = ImageName::parse(image)?;
let compiler = ImageCompiler::new(image_cache, None)?; let (sender, _) = broadcast::channel(1000);
compiler.compile(&image).await let context = OciProgressContext::new(sender);
let compiler = ImageCompiler::new(image_cache, None, context)?;
compiler.compile(id, &image).await
} }
async fn allocate_ipv4(&self, context: &RuntimeContext) -> Result<Ipv4Addr> { async fn allocate_ipv4(&self, context: &RuntimeContext) -> Result<Ipv4Addr> {

View File

@ -17,7 +17,7 @@ use self::{
autoloop::AutoLoop, autoloop::AutoLoop,
launch::{GuestLaunchRequest, GuestLauncher}, launch::{GuestLaunchRequest, GuestLauncher},
}; };
use krataoci::cache::ImageCache; use krataoci::{cache::ImageCache, progress::OciProgressContext};
pub mod autoloop; pub mod autoloop;
pub mod cfgblk; pub mod cfgblk;
@ -51,6 +51,7 @@ pub struct GuestInfo {
#[derive(Clone)] #[derive(Clone)]
pub struct RuntimeContext { pub struct RuntimeContext {
pub oci_progress_context: OciProgressContext,
pub image_cache: ImageCache, pub image_cache: ImageCache,
pub autoloop: AutoLoop, pub autoloop: AutoLoop,
pub xen: XenClient, pub xen: XenClient,
@ -59,7 +60,7 @@ pub struct RuntimeContext {
} }
impl RuntimeContext { impl RuntimeContext {
pub async fn new(store: String) -> Result<Self> { pub async fn new(oci_progress_context: OciProgressContext, store: String) -> Result<Self> {
let mut image_cache_path = PathBuf::from(&store); let mut image_cache_path = PathBuf::from(&store);
image_cache_path.push("cache"); image_cache_path.push("cache");
fs::create_dir_all(&image_cache_path)?; fs::create_dir_all(&image_cache_path)?;
@ -72,6 +73,7 @@ impl RuntimeContext {
let initrd = RuntimeContext::detect_guest_file(&store, "initrd")?; let initrd = RuntimeContext::detect_guest_file(&store, "initrd")?;
Ok(RuntimeContext { Ok(RuntimeContext {
oci_progress_context,
image_cache, image_cache,
autoloop: AutoLoop::new(LoopControl::open()?), autoloop: AutoLoop::new(LoopControl::open()?),
xen, xen,
@ -252,15 +254,17 @@ impl RuntimeContext {
#[derive(Clone)] #[derive(Clone)]
pub struct Runtime { pub struct Runtime {
oci_progress_context: OciProgressContext,
store: Arc<String>, store: Arc<String>,
context: RuntimeContext, context: RuntimeContext,
launch_semaphore: Arc<Semaphore>, launch_semaphore: Arc<Semaphore>,
} }
impl Runtime { impl Runtime {
pub async fn new(store: String) -> Result<Self> { pub async fn new(oci_progress_context: OciProgressContext, store: String) -> Result<Self> {
let context = RuntimeContext::new(store.clone()).await?; let context = RuntimeContext::new(oci_progress_context.clone(), store.clone()).await?;
Ok(Self { Ok(Self {
oci_progress_context,
store: Arc::new(store), store: Arc::new(store),
context, context,
launch_semaphore: Arc::new(Semaphore::new(1)), launch_semaphore: Arc::new(Semaphore::new(1)),
@ -324,7 +328,7 @@ impl Runtime {
} }
pub async fn dupe(&self) -> Result<Runtime> { pub async fn dupe(&self) -> Result<Runtime> {
Runtime::new((*self.store).clone()).await Runtime::new(self.oci_progress_context.clone(), (*self.store).clone()).await
} }
} }