feat: implement oci image progress (#64)

* feat: oci progress events

* feat: oci progress bars on launch
This commit is contained in:
Alex Zenla 2024-04-12 11:09:26 -07:00 committed by GitHub
parent 6cef03bffa
commit 6d07112e3d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 630 additions and 159 deletions

55
Cargo.lock generated
View File

@ -421,6 +421,19 @@ dependencies = [
"unicode-width",
]
[[package]]
name = "console"
version = "0.15.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e1f83fc076bd6dd27517eacdf25fef6c4dfe5f1d7448bafaaf3a26f13b5e4eb"
dependencies = [
"encode_unicode",
"lazy_static",
"libc",
"unicode-width",
"windows-sys 0.52.0",
]
[[package]]
name = "core-foundation-sys"
version = "0.8.6"
@ -693,6 +706,12 @@ version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4445909572dbd556c457c849c4ca58623d84b27c8fff1e74b0b4227d8b90d17b"
[[package]]
name = "encode_unicode"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f"
[[package]]
name = "env_filter"
version = "0.1.0"
@ -1223,6 +1242,28 @@ dependencies = [
"hashbrown 0.14.3",
]
[[package]]
name = "indicatif"
version = "0.17.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "763a5a8f45087d6bcea4222e7b72c291a054edf80e4ef6efd2a4979878c7bea3"
dependencies = [
"console",
"instant",
"number_prefix",
"portable-atomic",
"unicode-width",
]
[[package]]
name = "instant"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c"
dependencies = [
"cfg-if",
]
[[package]]
name = "ipnet"
version = "2.9.0"
@ -1321,6 +1362,7 @@ dependencies = [
"env_logger",
"fancy-duration",
"human_bytes",
"indicatif",
"krata",
"log",
"prost-reflect",
@ -1347,6 +1389,7 @@ dependencies = [
"env_logger",
"futures",
"krata",
"krata-oci",
"krata-runtime",
"log",
"prost",
@ -1801,6 +1844,12 @@ dependencies = [
"libc",
]
[[package]]
name = "number_prefix"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3"
[[package]]
name = "object"
version = "0.32.2"
@ -1945,6 +1994,12 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec"
[[package]]
name = "portable-atomic"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7170ef9988bc169ba16dd36a7fa041e5c4cbeb6a35b76d4c03daded371eae7c0"
[[package]]
name = "ppv-lite86"
version = "0.2.17"

View File

@ -42,6 +42,7 @@ fancy-duration = "0.9.2"
flate2 = "1.0"
futures = "0.3.30"
human_bytes = "0.4"
indicatif = "0.17.8"
ipnetwork = "0.20.0"
libc = "0.2"
log = "0.4.20"

View File

@ -18,6 +18,7 @@ ctrlc = { workspace = true, features = ["termination"] }
env_logger = { workspace = true }
fancy-duration = { workspace = true }
human_bytes = { workspace = true }
indicatif = { workspace = true }
krata = { path = "../krata", version = "^0.0.8" }
log = { workspace = true }
prost-reflect = { workspace = true, features = ["serde"] }

View File

@ -52,32 +52,30 @@ impl DestroyCommand {
async fn wait_guest_destroyed(id: &str, events: EventStream) -> Result<()> {
let mut stream = events.subscribe();
while let Ok(event) = stream.recv().await {
match event {
Event::GuestChanged(changed) => {
let Some(guest) = changed.guest else {
continue;
};
if let Event::GuestChanged(changed) = event {
let Some(guest) = changed.guest else {
continue;
};
if guest.id != id {
continue;
if guest.id != id {
continue;
}
let Some(state) = guest.state else {
continue;
};
if let Some(ref error) = state.error_info {
if state.status() == GuestStatus::Failed {
error!("destroy failed: {}", error.message);
std::process::exit(1);
} else {
error!("guest error: {}", error.message);
}
}
let Some(state) = guest.state else {
continue;
};
if let Some(ref error) = state.error_info {
if state.status() == GuestStatus::Failed {
error!("destroy failed: {}", error.message);
std::process::exit(1);
} else {
error!("guest error: {}", error.message);
}
}
if state.status() == GuestStatus::Destroyed {
std::process::exit(0);
}
if state.status() == GuestStatus::Destroyed {
std::process::exit(0);
}
}
}

View File

@ -2,6 +2,7 @@ use std::collections::HashMap;
use anyhow::Result;
use clap::Parser;
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use krata::{
events::EventStream,
v1::{
@ -11,7 +12,7 @@ use krata::{
},
control::{
control_service_client::ControlServiceClient, watch_events_reply::Event,
CreateGuestRequest,
CreateGuestRequest, OciProgressEventLayerPhase, OciProgressEventPhase,
},
},
};
@ -125,9 +126,14 @@ impl LauchCommand {
async fn wait_guest_started(id: &str, events: EventStream) -> Result<()> {
let mut stream = events.subscribe();
let mut multi_progress: Option<(MultiProgress, HashMap<String, ProgressBar>)> = None;
while let Ok(event) = stream.recv().await {
match event {
Event::GuestChanged(changed) => {
if let Some((multi_progress, _)) = multi_progress.as_mut() {
let _ = multi_progress.clear();
}
let Some(guest) = changed.guest else {
continue;
};
@ -158,6 +164,84 @@ async fn wait_guest_started(id: &str, events: EventStream) -> Result<()> {
break;
}
}
Event::OciProgress(oci) => {
if multi_progress.is_none() {
multi_progress = Some((MultiProgress::new(), HashMap::new()));
}
let Some((multi_progress, progresses)) = multi_progress.as_mut() else {
continue;
};
match oci.phase() {
OciProgressEventPhase::Resolved
| OciProgressEventPhase::ConfigAcquire
| OciProgressEventPhase::LayerAcquire => {
if progresses.is_empty() && !oci.layers.is_empty() {
for layer in &oci.layers {
let bar = ProgressBar::new(layer.total);
bar.set_style(
ProgressStyle::with_template("{msg} {wide_bar} {pos}/{len}")
.unwrap(),
);
progresses.insert(layer.id.clone(), bar.clone());
multi_progress.add(bar);
}
}
for layer in oci.layers {
let Some(progress) = progresses.get_mut(&layer.id) else {
continue;
};
let phase = match layer.phase() {
OciProgressEventLayerPhase::Waiting => "waiting",
OciProgressEventLayerPhase::Downloading => "downloading",
OciProgressEventLayerPhase::Downloaded => "downloaded",
OciProgressEventLayerPhase::Extracting => "extracting",
OciProgressEventLayerPhase::Extracted => "extracted",
_ => "unknown",
};
progress.set_message(format!("{} {}", layer.id, phase));
progress.set_length(layer.total);
progress.set_position(layer.value);
}
}
OciProgressEventPhase::Packing => {
for (key, progress) in &mut *progresses {
if key == "packing" {
continue;
}
progress.finish_and_clear();
multi_progress.remove(progress);
}
progresses.retain(|k, _| k == "packing");
if progresses.is_empty() {
let progress = ProgressBar::new(100);
progress.set_style(
ProgressStyle::with_template("{msg} {wide_bar} {pos}/{len}")
.unwrap(),
);
progresses.insert("packing".to_string(), progress);
}
let Some(progress) = progresses.get("packing") else {
continue;
};
progress.set_message("packing image");
progress.set_length(oci.total);
progress.set_position(oci.value);
}
_ => {}
}
for progress in progresses {
progress.1.tick();
}
}
}
}
Ok(())

View File

@ -28,11 +28,10 @@ impl WatchCommand {
let mut stream = events.subscribe();
loop {
let event = stream.recv().await?;
match event {
Event::GuestChanged(changed) => {
let guest = changed.guest.clone();
self.print_event("guest.changed", changed, guest)?;
}
if let Event::GuestChanged(changed) = event {
let guest = changed.guest.clone();
self.print_event("guest.changed", changed, guest)?;
}
}
}

View File

@ -69,28 +69,26 @@ impl StdioConsoleStream {
Ok(tokio::task::spawn(async move {
let mut stream = events.subscribe();
while let Ok(event) = stream.recv().await {
match event {
Event::GuestChanged(changed) => {
let Some(guest) = changed.guest else {
continue;
};
if let Event::GuestChanged(changed) = event {
let Some(guest) = changed.guest else {
continue;
};
let Some(state) = guest.state else {
continue;
};
let Some(state) = guest.state else {
continue;
};
if guest.id != id {
continue;
}
if guest.id != id {
continue;
}
if let Some(exit_info) = state.exit_info {
return Some(exit_info.code);
}
if let Some(exit_info) = state.exit_info {
return Some(exit_info.code);
}
let status = state.status();
if status == GuestStatus::Destroying || status == GuestStatus::Destroyed {
return Some(10);
}
let status = state.status();
if status == GuestStatus::Destroying || status == GuestStatus::Destroyed {
return Some(10);
}
}
}

View File

@ -18,6 +18,7 @@ clap = { workspace = true }
env_logger = { workspace = true }
futures = { workspace = true }
krata = { path = "../krata", version = "^0.0.8" }
krata-oci = { path = "../oci", version = "^0.0.8" }
krata-runtime = { path = "../runtime", version = "^0.0.8" }
log = { workspace = true }
prost = { workspace = true }

View File

@ -3,7 +3,6 @@ use clap::Parser;
use env_logger::Env;
use krata::dial::ControlDialAddress;
use kratad::Daemon;
use kratart::Runtime;
use log::LevelFilter;
use std::{
str::FromStr,
@ -27,8 +26,8 @@ async fn main() -> Result<()> {
let args = DaemonCommand::parse();
let addr = ControlDialAddress::from_str(&args.listen)?;
let runtime = Runtime::new(args.store.clone()).await?;
let mut daemon = Daemon::new(args.store.clone(), runtime).await?;
let mut daemon = Daemon::new(args.store.clone()).await?;
daemon.listen(addr).await?;
Ok(())
}

View File

@ -9,6 +9,7 @@ use krata::{
idm::protocol::{idm_event::Event, IdmEvent},
v1::common::{GuestExitInfo, GuestState, GuestStatus},
};
use krataoci::progress::OciProgress;
use log::{error, warn};
use tokio::{
select,
@ -21,7 +22,7 @@ use tokio::{
};
use uuid::Uuid;
use crate::{db::GuestStore, idm::DaemonIdmHandle};
use crate::{db::GuestStore, idm::DaemonIdmHandle, oci::convert_oci_progress};
pub type DaemonEvent = krata::v1::control::watch_events_reply::Event;
@ -52,7 +53,8 @@ pub struct DaemonEventGenerator {
idms: HashMap<u32, (Uuid, JoinHandle<()>)>,
idm_sender: Sender<(u32, IdmEvent)>,
idm_receiver: Receiver<(u32, IdmEvent)>,
_event_sender: broadcast::Sender<DaemonEvent>,
oci_progress_receiver: broadcast::Receiver<OciProgress>,
event_sender: broadcast::Sender<DaemonEvent>,
}
impl DaemonEventGenerator {
@ -60,6 +62,7 @@ impl DaemonEventGenerator {
guests: GuestStore,
guest_reconciler_notify: Sender<Uuid>,
idm: DaemonIdmHandle,
oci_progress_receiver: broadcast::Receiver<OciProgress>,
) -> Result<(DaemonEventContext, DaemonEventGenerator)> {
let (sender, _) = broadcast::channel(EVENT_CHANNEL_QUEUE_LEN);
let (idm_sender, idm_receiver) = channel(IDM_EVENT_CHANNEL_QUEUE_LEN);
@ -71,55 +74,54 @@ impl DaemonEventGenerator {
idms: HashMap::new(),
idm_sender,
idm_receiver,
_event_sender: sender.clone(),
oci_progress_receiver,
event_sender: sender.clone(),
};
let context = DaemonEventContext { sender };
Ok((context, generator))
}
async fn handle_feed_event(&mut self, event: &DaemonEvent) -> Result<()> {
match event {
DaemonEvent::GuestChanged(changed) => {
let Some(ref guest) = changed.guest else {
return Ok(());
};
if let DaemonEvent::GuestChanged(changed) = event {
let Some(ref guest) = changed.guest else {
return Ok(());
};
let Some(ref state) = guest.state else {
return Ok(());
};
let Some(ref state) = guest.state else {
return Ok(());
};
let status = state.status();
let id = Uuid::from_str(&guest.id)?;
let domid = state.domid;
match status {
GuestStatus::Started => {
if let Entry::Vacant(e) = self.idms.entry(domid) {
let client = self.idm.client(domid).await?;
let mut receiver = client.subscribe().await?;
let sender = self.idm_sender.clone();
let task = tokio::task::spawn(async move {
loop {
let Ok(event) = receiver.recv().await else {
break;
};
let status = state.status();
let id = Uuid::from_str(&guest.id)?;
let domid = state.domid;
match status {
GuestStatus::Started => {
if let Entry::Vacant(e) = self.idms.entry(domid) {
let client = self.idm.client(domid).await?;
let mut receiver = client.subscribe().await?;
let sender = self.idm_sender.clone();
let task = tokio::task::spawn(async move {
loop {
let Ok(event) = receiver.recv().await else {
break;
};
if let Err(error) = sender.send((domid, event)).await {
warn!("unable to deliver idm event: {}", error);
}
if let Err(error) = sender.send((domid, event)).await {
warn!("unable to deliver idm event: {}", error);
}
});
e.insert((id, task));
}
}
});
e.insert((id, task));
}
GuestStatus::Destroyed => {
if let Some((_, handle)) = self.idms.remove(&domid) {
handle.abort();
}
}
_ => {}
}
GuestStatus::Destroyed => {
if let Some((_, handle)) = self.idms.remove(&domid) {
handle.abort();
}
}
_ => {}
}
}
Ok(())
@ -148,6 +150,17 @@ impl DaemonEventGenerator {
Ok(())
}
async fn handle_oci_progress_event(&mut self, progress: OciProgress) -> Result<()> {
let Some(_) = Uuid::from_str(&progress.id).ok() else {
return Ok(());
};
let event = convert_oci_progress(progress);
self.event_sender.send(DaemonEvent::OciProgress(event))?;
Ok(())
}
async fn evaluate(&mut self) -> Result<()> {
select! {
x = self.idm_receiver.recv() => match x {
@ -168,6 +181,14 @@ impl DaemonEventGenerator {
Err(error) => {
Err(error.into())
}
},
x = self.oci_progress_receiver.recv() => match x {
Ok(event) => {
self.handle_oci_progress_event(event).await
},
Err(error) => {
Err(error.into())
}
}
}
}

View File

@ -7,12 +7,16 @@ use db::GuestStore;
use event::{DaemonEventContext, DaemonEventGenerator};
use idm::{DaemonIdm, DaemonIdmHandle};
use krata::{dial::ControlDialAddress, v1::control::control_service_server::ControlServiceServer};
use krataoci::progress::OciProgressContext;
use kratart::Runtime;
use log::info;
use reconcile::guest::GuestReconciler;
use tokio::{
net::UnixListener,
sync::mpsc::{channel, Sender},
sync::{
broadcast,
mpsc::{channel, Sender},
},
task::JoinHandle,
};
use tokio_stream::wrappers::UnixListenerStream;
@ -25,6 +29,7 @@ pub mod db;
pub mod event;
pub mod idm;
pub mod metrics;
pub mod oci;
pub mod reconcile;
pub struct Daemon {
@ -39,9 +44,14 @@ pub struct Daemon {
}
const GUEST_RECONCILER_QUEUE_LEN: usize = 1000;
const OCI_PROGRESS_QUEUE_LEN: usize = 1000;
impl Daemon {
pub async fn new(store: String, runtime: Runtime) -> Result<Self> {
pub async fn new(store: String) -> Result<Self> {
let (oci_progress_sender, oci_progress_receiver) =
broadcast::channel(OCI_PROGRESS_QUEUE_LEN);
let runtime =
Runtime::new(OciProgressContext::new(oci_progress_sender), store.clone()).await?;
let guests_db_path = format!("{}/guests.db", store);
let guests = GuestStore::open(&PathBuf::from(guests_db_path))?;
let (guest_reconciler_notify, guest_reconciler_receiver) =
@ -50,9 +60,13 @@ impl Daemon {
let idm = idm.launch().await?;
let console = DaemonConsole::new().await?;
let console = console.launch().await?;
let (events, generator) =
DaemonEventGenerator::new(guests.clone(), guest_reconciler_notify.clone(), idm.clone())
.await?;
let (events, generator) = DaemonEventGenerator::new(
guests.clone(),
guest_reconciler_notify.clone(),
idm.clone(),
oci_progress_receiver,
)
.await?;
let runtime_for_reconciler = runtime.dupe().await?;
let guest_reconciler = GuestReconciler::new(
guests.clone(),

42
crates/daemon/src/oci.rs Normal file
View File

@ -0,0 +1,42 @@
use krata::v1::control::{
OciProgressEvent, OciProgressEventLayer, OciProgressEventLayerPhase, OciProgressEventPhase,
};
use krataoci::progress::{OciProgress, OciProgressLayer, OciProgressLayerPhase, OciProgressPhase};
fn convert_oci_layer_progress(layer: OciProgressLayer) -> OciProgressEventLayer {
OciProgressEventLayer {
id: layer.id,
phase: match layer.phase {
OciProgressLayerPhase::Waiting => OciProgressEventLayerPhase::Waiting,
OciProgressLayerPhase::Downloading => OciProgressEventLayerPhase::Downloading,
OciProgressLayerPhase::Downloaded => OciProgressEventLayerPhase::Downloaded,
OciProgressLayerPhase::Extracting => OciProgressEventLayerPhase::Extracting,
OciProgressLayerPhase::Extracted => OciProgressEventLayerPhase::Extracted,
}
.into(),
value: layer.value,
total: layer.total,
}
}
pub fn convert_oci_progress(oci: OciProgress) -> OciProgressEvent {
OciProgressEvent {
guest_id: oci.id,
phase: match oci.phase {
OciProgressPhase::Resolving => OciProgressEventPhase::Resolving,
OciProgressPhase::Resolved => OciProgressEventPhase::Resolved,
OciProgressPhase::ConfigAcquire => OciProgressEventPhase::ConfigAcquire,
OciProgressPhase::LayerAcquire => OciProgressEventPhase::LayerAcquire,
OciProgressPhase::Packing => OciProgressEventPhase::Packing,
OciProgressPhase::Complete => OciProgressEventPhase::Complete,
}
.into(),
layers: oci
.layers
.into_values()
.map(convert_oci_layer_progress)
.collect::<Vec<_>>(),
value: oci.value,
total: oci.total,
}
}

View File

@ -61,6 +61,7 @@ message WatchEventsRequest {}
message WatchEventsReply {
oneof event {
GuestChangedEvent guest_changed = 1;
OciProgressEvent oci_progress = 2;
}
}
@ -68,6 +69,40 @@ message GuestChangedEvent {
krata.v1.common.Guest guest = 1;
}
enum OciProgressEventLayerPhase {
OCI_PROGRESS_EVENT_LAYER_PHASE_UNKNOWN = 0;
OCI_PROGRESS_EVENT_LAYER_PHASE_WAITING = 1;
OCI_PROGRESS_EVENT_LAYER_PHASE_DOWNLOADING = 2;
OCI_PROGRESS_EVENT_LAYER_PHASE_DOWNLOADED = 3;
OCI_PROGRESS_EVENT_LAYER_PHASE_EXTRACTING = 4;
OCI_PROGRESS_EVENT_LAYER_PHASE_EXTRACTED = 5;
}
message OciProgressEventLayer {
string id = 1;
OciProgressEventLayerPhase phase = 2;
uint64 value = 3;
uint64 total = 4;
}
enum OciProgressEventPhase {
OCI_PROGRESS_EVENT_PHASE_UNKNOWN = 0;
OCI_PROGRESS_EVENT_PHASE_RESOLVING = 1;
OCI_PROGRESS_EVENT_PHASE_RESOLVED = 2;
OCI_PROGRESS_EVENT_PHASE_CONFIG_ACQUIRE = 3;
OCI_PROGRESS_EVENT_PHASE_LAYER_ACQUIRE = 4;
OCI_PROGRESS_EVENT_PHASE_PACKING = 5;
OCI_PROGRESS_EVENT_PHASE_COMPLETE = 6;
}
message OciProgressEvent {
string guest_id = 1;
OciProgressEventPhase phase = 2;
repeated OciProgressEventLayer layers = 3;
uint64 value = 4;
uint64 total = 5;
}
message ReadGuestMetricsRequest {
string guest_id = 1;
}

View File

@ -1 +1,2 @@
#![allow(clippy::all)]
tonic::include_proto!("krata.v1.common");

View File

@ -1 +1,2 @@
#![allow(clippy::all)]
tonic::include_proto!("krata.v1.control");

View File

@ -179,6 +179,8 @@ impl AutoNetworkWatcher {
break;
},
Ok(_) => {},
Err(error) => {
warn!("failed to receive event: {}", error);
}

View File

@ -2,8 +2,10 @@ use std::{env::args, path::PathBuf};
use anyhow::Result;
use env_logger::Env;
use krataoci::{cache::ImageCache, compiler::ImageCompiler, name::ImageName};
use tokio::fs;
use krataoci::{
cache::ImageCache, compiler::ImageCompiler, name::ImageName, progress::OciProgressContext,
};
use tokio::{fs, sync::broadcast};
#[tokio::main]
async fn main() -> Result<()> {
@ -18,8 +20,18 @@ async fn main() -> Result<()> {
}
let cache = ImageCache::new(&cache_dir)?;
let compiler = ImageCompiler::new(&cache, seed)?;
let info = compiler.compile(&image).await?;
let (sender, mut receiver) = broadcast::channel(1000);
tokio::task::spawn(async move {
loop {
let Some(_) = receiver.recv().await.ok() else {
break;
};
}
});
let context = OciProgressContext::new(sender);
let compiler = ImageCompiler::new(&cache, seed, context)?;
let info = compiler.compile(&image.to_string(), &image).await?;
println!(
"generated squashfs of {} to {}",
image,

View File

@ -1,6 +1,7 @@
use crate::cache::ImageCache;
use crate::fetch::{OciImageDownloader, OciImageLayer};
use crate::name::ImageName;
use crate::progress::{OciProgress, OciProgressContext, OciProgressPhase};
use crate::registry::OciRegistryPlatform;
use anyhow::{anyhow, Result};
use backhand::compression::Compressor;
@ -8,6 +9,7 @@ use backhand::{FilesystemCompressor, FilesystemWriter, NodeHeader};
use log::{debug, trace, warn};
use oci_spec::image::{ImageConfiguration, ImageManifest};
use std::borrow::Cow;
use std::collections::BTreeMap;
use std::fs::File;
use std::io::{BufWriter, ErrorKind, Read};
use std::os::unix::fs::{FileTypeExt, MetadataExt, PermissionsExt};
@ -45,14 +47,23 @@ impl ImageInfo {
pub struct ImageCompiler<'a> {
cache: &'a ImageCache,
seed: Option<PathBuf>,
progress: OciProgressContext,
}
impl ImageCompiler<'_> {
pub fn new(cache: &ImageCache, seed: Option<PathBuf>) -> Result<ImageCompiler> {
Ok(ImageCompiler { cache, seed })
pub fn new(
cache: &ImageCache,
seed: Option<PathBuf>,
progress: OciProgressContext,
) -> Result<ImageCompiler> {
Ok(ImageCompiler {
cache,
seed,
progress,
})
}
pub async fn compile(&self, image: &ImageName) -> Result<ImageInfo> {
pub async fn compile(&self, id: &str, image: &ImageName) -> Result<ImageInfo> {
debug!("compile image={image}");
let mut tmp_dir = std::env::temp_dir().clone();
tmp_dir.push(format!("krata-compile-{}", Uuid::new_v4()));
@ -68,7 +79,7 @@ impl ImageCompiler<'_> {
let mut squash_file = tmp_dir.clone();
squash_file.push("image.squashfs");
let info = self
.download_and_compile(image, &layer_dir, &image_dir, &squash_file)
.download_and_compile(id, image, &layer_dir, &image_dir, &squash_file)
.await?;
fs::remove_dir_all(&tmp_dir).await?;
Ok(info)
@ -76,15 +87,25 @@ impl ImageCompiler<'_> {
async fn download_and_compile(
&self,
id: &str,
image: &ImageName,
layer_dir: &Path,
image_dir: &Path,
squash_file: &Path,
) -> Result<ImageInfo> {
let mut progress = OciProgress {
id: id.to_string(),
phase: OciProgressPhase::Resolving,
layers: BTreeMap::new(),
value: 0,
total: 0,
};
self.progress.update(&progress);
let downloader = OciImageDownloader::new(
self.seed.clone(),
layer_dir.to_path_buf(),
OciRegistryPlatform::current(),
self.progress.clone(),
);
let resolved = downloader.resolve(image.clone()).await?;
let cache_key = format!(
@ -93,28 +114,44 @@ impl ImageCompiler<'_> {
);
let cache_digest = sha256::digest(cache_key);
progress.phase = OciProgressPhase::Complete;
self.progress.update(&progress);
if let Some(cached) = self.cache.recall(&cache_digest).await? {
return Ok(cached);
}
let local = downloader.download(resolved).await?;
progress.phase = OciProgressPhase::Resolved;
for layer in resolved.manifest.layers() {
progress.add_layer(layer.digest());
}
self.progress.update(&progress);
let local = downloader.download(resolved, &mut progress).await?;
for layer in &local.layers {
debug!(
"process layer digest={} compression={:?}",
&layer.digest, layer.compression,
);
let whiteouts = self.process_layer_whiteout(layer, image_dir).await?;
progress.extracting_layer(&layer.digest, 0, 0);
self.progress.update(&progress);
let (whiteouts, count) = self.process_layer_whiteout(layer, image_dir).await?;
progress.extracting_layer(&layer.digest, 0, count);
self.progress.update(&progress);
debug!(
"process layer digest={} whiteouts={:?}",
&layer.digest, whiteouts
);
let mut archive = layer.archive().await?;
let mut entries = archive.entries()?;
let mut completed = 0;
while let Some(entry) = entries.next().await {
let mut entry = entry?;
let path = entry.path()?;
let mut maybe_whiteout_path_str =
path.to_str().map(|x| x.to_string()).unwrap_or_default();
progress.extracting_layer(&layer.digest, completed, count);
completed += 1;
self.progress.update(&progress);
if whiteouts.contains(&maybe_whiteout_path_str) {
continue;
}
@ -123,10 +160,10 @@ impl ImageCompiler<'_> {
continue;
}
let Some(name) = path.file_name() else {
return Err(anyhow!("unable to get file name"));
continue;
};
let Some(name) = name.to_str() else {
return Err(anyhow!("unable to get file name as string"));
continue;
};
if name.starts_with(".wh.") {
@ -136,6 +173,8 @@ impl ImageCompiler<'_> {
.await?;
}
}
progress.extracted_layer(&layer.digest);
self.progress.update(&progress);
}
for layer in &local.layers {
@ -144,31 +183,51 @@ impl ImageCompiler<'_> {
}
}
self.squash(image_dir, squash_file)?;
let image_dir_squash = image_dir.to_path_buf();
let squash_file_squash = squash_file.to_path_buf();
let progress_squash = progress.clone();
let progress_context = self.progress.clone();
progress = tokio::task::spawn_blocking(move || {
ImageCompiler::squash(
&image_dir_squash,
&squash_file_squash,
progress_squash,
progress_context,
)
})
.await??;
let info = ImageInfo::new(
squash_file.to_path_buf(),
local.image.manifest,
local.config,
)?;
self.cache.store(&cache_digest, &info).await
let info = self.cache.store(&cache_digest, &info).await?;
progress.phase = OciProgressPhase::Complete;
progress.value = 0;
progress.total = 0;
self.progress.update(&progress);
Ok(info)
}
async fn process_layer_whiteout(
&self,
layer: &OciImageLayer,
image_dir: &Path,
) -> Result<Vec<String>> {
) -> Result<(Vec<String>, usize)> {
let mut whiteouts = Vec::new();
let mut archive = layer.archive().await?;
let mut entries = archive.entries()?;
let mut count = 0usize;
while let Some(entry) = entries.next().await {
let entry = entry?;
count += 1;
let path = entry.path()?;
let Some(name) = path.file_name() else {
return Err(anyhow!("unable to get file name"));
continue;
};
let Some(name) = name.to_str() else {
return Err(anyhow!("unable to get file name as string"));
continue;
};
if name.starts_with(".wh.") {
@ -180,7 +239,7 @@ impl ImageCompiler<'_> {
}
}
}
Ok(whiteouts)
Ok((whiteouts, count))
}
async fn process_whiteout_entry(
@ -300,7 +359,16 @@ impl ImageCompiler<'_> {
Ok(())
}
fn squash(&self, image_dir: &Path, squash_file: &Path) -> Result<()> {
fn squash(
image_dir: &Path,
squash_file: &Path,
mut progress: OciProgress,
progress_context: OciProgressContext,
) -> Result<OciProgress> {
progress.phase = OciProgressPhase::Packing;
progress.total = 2;
progress.value = 0;
progress_context.update(&progress);
let mut writer = FilesystemWriter::default();
writer.set_compressor(FilesystemCompressor::new(Compressor::Gzip, None)?);
let walk = WalkDir::new(image_dir).follow_links(false);
@ -358,6 +426,10 @@ impl ImageCompiler<'_> {
}
}
progress.phase = OciProgressPhase::Packing;
progress.value = 1;
progress_context.update(&progress);
let squash_file_path = squash_file
.to_str()
.ok_or_else(|| anyhow!("failed to convert squashfs string"))?;
@ -367,7 +439,10 @@ impl ImageCompiler<'_> {
trace!("squash generate: {}", squash_file_path);
writer.write(&mut bufwrite)?;
std::fs::remove_dir_all(image_dir)?;
Ok(())
progress.phase = OciProgressPhase::Packing;
progress.value = 2;
progress_context.update(&progress);
Ok(progress)
}
}

View File

@ -1,3 +1,5 @@
use crate::progress::{OciProgress, OciProgressContext, OciProgressPhase};
use super::{
name::ImageName,
registry::{OciRegistryClient, OciRegistryPlatform},
@ -26,6 +28,7 @@ pub struct OciImageDownloader {
seed: Option<PathBuf>,
storage: PathBuf,
platform: OciRegistryPlatform,
progress: OciProgressContext,
}
#[derive(Clone, Debug, PartialEq, Eq)]
@ -79,11 +82,13 @@ impl OciImageDownloader {
seed: Option<PathBuf>,
storage: PathBuf,
platform: OciRegistryPlatform,
progress: OciProgressContext,
) -> OciImageDownloader {
OciImageDownloader {
seed,
storage,
platform,
progress,
}
}
@ -208,9 +213,15 @@ impl OciImageDownloader {
})
}
pub async fn download(&self, image: OciResolvedImage) -> Result<OciLocalImage> {
pub async fn download(
&self,
image: OciResolvedImage,
progress: &mut OciProgress,
) -> Result<OciLocalImage> {
let config: ImageConfiguration;
progress.phase = OciProgressPhase::ConfigAcquire;
self.progress.update(progress);
let mut client = OciRegistryClient::new(image.name.registry_url()?, self.platform.clone())?;
if let Some(seeded) = self
.load_seed_json_blob::<ImageConfiguration>(image.manifest.config())
@ -223,9 +234,18 @@ impl OciImageDownloader {
.await?;
config = serde_json::from_slice(&config_bytes)?;
}
progress.phase = OciProgressPhase::LayerAcquire;
self.progress.update(progress);
let mut layers = Vec::new();
for layer in image.manifest.layers() {
layers.push(self.acquire_layer(&image.name, layer, &mut client).await?);
progress.downloading_layer(layer.digest(), 0, layer.size() as usize);
self.progress.update(progress);
layers.push(
self.acquire_layer(&image.name, layer, &mut client, progress)
.await?,
);
progress.downloaded_layer(layer.digest());
self.progress.update(progress);
}
Ok(OciLocalImage {
image,
@ -239,6 +259,7 @@ impl OciImageDownloader {
image: &ImageName,
layer: &Descriptor,
client: &mut OciRegistryClient,
progress: &mut OciProgress,
) -> Result<OciImageLayer> {
debug!(
"acquire layer digest={} size={}",
@ -251,7 +272,15 @@ impl OciImageDownloader {
let seeded = self.extract_seed_blob(layer, &layer_path).await?;
if !seeded {
let file = File::create(&layer_path).await?;
let size = client.write_blob_to_file(&image.name, layer, file).await?;
let size = client
.write_blob_to_file(
&image.name,
layer,
file,
Some(progress),
Some(&self.progress),
)
.await?;
if layer.size() as u64 != size {
return Err(anyhow!(
"downloaded layer size differs from size in manifest",

View File

@ -2,4 +2,5 @@ pub mod cache;
pub mod compiler;
pub mod fetch;
pub mod name;
pub mod progress;
pub mod registry;

View File

@ -0,0 +1,98 @@
use std::collections::BTreeMap;
use tokio::sync::broadcast::Sender;
#[derive(Clone, Debug)]
pub struct OciProgress {
pub id: String,
pub phase: OciProgressPhase,
pub layers: BTreeMap<String, OciProgressLayer>,
pub value: u64,
pub total: u64,
}
impl OciProgress {
pub fn add_layer(&mut self, id: &str) {
self.layers.insert(
id.to_string(),
OciProgressLayer {
id: id.to_string(),
phase: OciProgressLayerPhase::Waiting,
value: 0,
total: 0,
},
);
}
pub fn downloading_layer(&mut self, id: &str, downloaded: usize, total: usize) {
if let Some(entry) = self.layers.get_mut(id) {
entry.phase = OciProgressLayerPhase::Downloading;
entry.value = downloaded as u64;
entry.total = total as u64;
}
}
pub fn downloaded_layer(&mut self, id: &str) {
if let Some(entry) = self.layers.get_mut(id) {
entry.phase = OciProgressLayerPhase::Downloaded;
entry.value = entry.total;
}
}
pub fn extracting_layer(&mut self, id: &str, extracted: usize, total: usize) {
if let Some(entry) = self.layers.get_mut(id) {
entry.phase = OciProgressLayerPhase::Extracting;
entry.value = extracted as u64;
entry.total = total as u64;
}
}
pub fn extracted_layer(&mut self, id: &str) {
if let Some(entry) = self.layers.get_mut(id) {
entry.phase = OciProgressLayerPhase::Extracted;
entry.value = entry.total;
}
}
}
#[derive(Clone, Debug)]
pub enum OciProgressPhase {
Resolving,
Resolved,
ConfigAcquire,
LayerAcquire,
Packing,
Complete,
}
#[derive(Clone, Debug)]
pub struct OciProgressLayer {
pub id: String,
pub phase: OciProgressLayerPhase,
pub value: u64,
pub total: u64,
}
#[derive(Clone, Debug)]
pub enum OciProgressLayerPhase {
Waiting,
Downloading,
Downloaded,
Extracting,
Extracted,
}
#[derive(Clone)]
pub struct OciProgressContext {
sender: Sender<OciProgress>,
}
impl OciProgressContext {
pub fn new(sender: Sender<OciProgress>) -> OciProgressContext {
OciProgressContext { sender }
}
pub fn update(&self, progress: &OciProgress) {
let _ = self.sender.send(progress.clone());
}
}

View File

@ -7,6 +7,8 @@ use reqwest::{Client, RequestBuilder, Response, StatusCode};
use tokio::{fs::File, io::AsyncWriteExt};
use url::Url;
use crate::progress::{OciProgress, OciProgressContext};
#[derive(Clone, Debug)]
pub struct OciRegistryPlatform {
pub os: Os,
@ -138,6 +140,8 @@ impl OciRegistryClient {
name: N,
descriptor: &Descriptor,
mut dest: File,
mut progress_handle: Option<&mut OciProgress>,
progress_context: Option<&OciProgressContext>,
) -> Result<u64> {
let url = self.url.join(&format!(
"/v2/{}/blobs/{}",
@ -146,9 +150,24 @@ impl OciRegistryClient {
))?;
let mut response = self.call(self.agent.get(url.as_str())).await?;
let mut size: u64 = 0;
let mut last_progress_size: u64 = 0;
while let Some(chunk) = response.chunk().await? {
dest.write_all(&chunk).await?;
size += chunk.len() as u64;
if (size - last_progress_size) > (5 * 1024 * 1024) {
last_progress_size = size;
if let Some(progress_handle) = progress_handle.as_mut() {
progress_handle.downloading_layer(
descriptor.digest(),
size as usize,
descriptor.size() as usize,
);
if let Some(progress_context) = progress_context.as_ref() {
progress_context.update(progress_handle);
}
}
}
}
Ok(size)
}

View File

@ -31,10 +31,6 @@ name = "kratart"
[dev-dependencies]
env_logger = { workspace = true }
[[example]]
name = "kratart-squashify"
path = "examples/squashify.rs"
[[example]]
name = "kratart-channel"
path = "examples/channel.rs"

View File

@ -1,29 +0,0 @@
use std::{env::args, path::PathBuf};
use anyhow::Result;
use env_logger::Env;
use krataoci::{cache::ImageCache, compiler::ImageCompiler, name::ImageName};
use tokio::fs;
#[tokio::main]
async fn main() -> Result<()> {
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
let image = ImageName::parse(&args().nth(1).unwrap())?;
let seed = args().nth(2).map(PathBuf::from);
let cache_dir = PathBuf::from("krata-cache");
if !cache_dir.exists() {
fs::create_dir(&cache_dir).await?;
}
let cache = ImageCache::new(&cache_dir)?;
let compiler = ImageCompiler::new(&cache, seed)?;
let info = compiler.compile(&image).await?;
println!(
"generated squashfs of {} to {}",
image,
info.image_squashfs.to_string_lossy()
);
Ok(())
}

View File

@ -9,6 +9,7 @@ use ipnetwork::{IpNetwork, Ipv4Network};
use krata::launchcfg::{
LaunchInfo, LaunchNetwork, LaunchNetworkIpv4, LaunchNetworkIpv6, LaunchNetworkResolver,
};
use krataoci::progress::OciProgressContext;
use tokio::sync::Semaphore;
use uuid::Uuid;
use xenclient::{DomainChannel, DomainConfig, DomainDisk, DomainNetworkInterface};
@ -51,7 +52,14 @@ impl GuestLauncher {
) -> Result<GuestInfo> {
let uuid = request.uuid.unwrap_or_else(Uuid::new_v4);
let xen_name = format!("krata-{uuid}");
let image_info = self.compile(request.image, &context.image_cache).await?;
let image_info = self
.compile(
&uuid.to_string(),
request.image,
&context.image_cache,
&context.oci_progress_context,
)
.await?;
let mut gateway_mac = MacAddr6::random();
gateway_mac.set_local(true);
@ -243,10 +251,16 @@ impl GuestLauncher {
}
}
async fn compile(&self, image: &str, image_cache: &ImageCache) -> Result<ImageInfo> {
async fn compile(
&self,
id: &str,
image: &str,
image_cache: &ImageCache,
progress: &OciProgressContext,
) -> Result<ImageInfo> {
let image = ImageName::parse(image)?;
let compiler = ImageCompiler::new(image_cache, None)?;
compiler.compile(&image).await
let compiler = ImageCompiler::new(image_cache, None, progress.clone())?;
compiler.compile(id, &image).await
}
async fn allocate_ipv4(&self, context: &RuntimeContext) -> Result<Ipv4Addr> {

View File

@ -17,7 +17,7 @@ use self::{
autoloop::AutoLoop,
launch::{GuestLaunchRequest, GuestLauncher},
};
use krataoci::cache::ImageCache;
use krataoci::{cache::ImageCache, progress::OciProgressContext};
pub mod autoloop;
pub mod cfgblk;
@ -51,6 +51,7 @@ pub struct GuestInfo {
#[derive(Clone)]
pub struct RuntimeContext {
pub oci_progress_context: OciProgressContext,
pub image_cache: ImageCache,
pub autoloop: AutoLoop,
pub xen: XenClient,
@ -59,7 +60,7 @@ pub struct RuntimeContext {
}
impl RuntimeContext {
pub async fn new(store: String) -> Result<Self> {
pub async fn new(oci_progress_context: OciProgressContext, store: String) -> Result<Self> {
let mut image_cache_path = PathBuf::from(&store);
image_cache_path.push("cache");
fs::create_dir_all(&image_cache_path)?;
@ -72,6 +73,7 @@ impl RuntimeContext {
let initrd = RuntimeContext::detect_guest_file(&store, "initrd")?;
Ok(RuntimeContext {
oci_progress_context,
image_cache,
autoloop: AutoLoop::new(LoopControl::open()?),
xen,
@ -252,15 +254,17 @@ impl RuntimeContext {
#[derive(Clone)]
pub struct Runtime {
oci_progress_context: OciProgressContext,
store: Arc<String>,
context: RuntimeContext,
launch_semaphore: Arc<Semaphore>,
}
impl Runtime {
pub async fn new(store: String) -> Result<Self> {
let context = RuntimeContext::new(store.clone()).await?;
pub async fn new(oci_progress_context: OciProgressContext, store: String) -> Result<Self> {
let context = RuntimeContext::new(oci_progress_context.clone(), store.clone()).await?;
Ok(Self {
oci_progress_context,
store: Arc::new(store),
context,
launch_semaphore: Arc::new(Semaphore::new(1)),
@ -324,7 +328,7 @@ impl Runtime {
}
pub async fn dupe(&self) -> Result<Runtime> {
Runtime::new((*self.store).clone()).await
Runtime::new(self.oci_progress_context.clone(), (*self.store).clone()).await
}
}