mirror of
https://github.com/edera-dev/krata.git
synced 2025-08-06 14:41:32 +00:00
feat: concurrent image pulls
This commit is contained in:
@ -23,6 +23,7 @@ krata-runtime = { path = "../runtime", version = "^0.0.9" }
|
||||
log = { workspace = true }
|
||||
prost = { workspace = true }
|
||||
redb = { workspace = true }
|
||||
scopeguard = { workspace = true }
|
||||
signal-hook = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
tokio-stream = { workspace = true }
|
||||
|
@ -18,7 +18,7 @@ use krata::{
|
||||
};
|
||||
use krataoci::{
|
||||
name::ImageName,
|
||||
packer::{service::OciPackerService, OciImagePacked, OciPackedFormat},
|
||||
packer::{service::OciPackerService, OciPackedFormat, OciPackedImage},
|
||||
progress::{OciProgress, OciProgressContext},
|
||||
};
|
||||
use std::{pin::Pin, str::FromStr};
|
||||
@ -90,8 +90,8 @@ enum ConsoleDataSelect {
|
||||
}
|
||||
|
||||
enum PullImageSelect {
|
||||
Progress(usize),
|
||||
Completed(Result<Result<OciImagePacked, anyhow::Error>, JoinError>),
|
||||
Progress(Option<OciProgress>),
|
||||
Completed(Result<Result<OciPackedImage, anyhow::Error>, JoinError>),
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
@ -367,32 +367,46 @@ impl ControlService for DaemonControlService {
|
||||
OciImageFormat::Erofs => OciPackedFormat::Erofs,
|
||||
OciImageFormat::Tar => OciPackedFormat::Tar,
|
||||
};
|
||||
let (sender, mut receiver) = channel::<OciProgress>(100);
|
||||
let context = OciProgressContext::new(sender);
|
||||
|
||||
let (context, mut receiver) = OciProgressContext::create();
|
||||
let our_packer = self.packer.clone();
|
||||
|
||||
let output = try_stream! {
|
||||
let mut task = tokio::task::spawn(async move {
|
||||
our_packer.request(name, format, context).await
|
||||
});
|
||||
let abort_handle = task.abort_handle();
|
||||
let _task_cancel_guard = scopeguard::guard(abort_handle, |handle| {
|
||||
handle.abort();
|
||||
});
|
||||
|
||||
loop {
|
||||
let mut progresses = Vec::new();
|
||||
let what = select! {
|
||||
x = receiver.recv_many(&mut progresses, 10) => PullImageSelect::Progress(x),
|
||||
x = receiver.recv() => PullImageSelect::Progress(x.ok()),
|
||||
x = &mut task => PullImageSelect::Completed(x),
|
||||
};
|
||||
match what {
|
||||
PullImageSelect::Progress(count) => {
|
||||
if count > 0 {
|
||||
let progress = progresses.remove(progresses.len() - 1);
|
||||
let reply = PullImageReply {
|
||||
progress: Some(convert_oci_progress(progress)),
|
||||
digest: String::new(),
|
||||
format: OciImageFormat::Unknown.into(),
|
||||
};
|
||||
yield reply;
|
||||
PullImageSelect::Progress(Some(mut progress)) => {
|
||||
let mut drain = 0;
|
||||
loop {
|
||||
if drain >= 10 {
|
||||
break;
|
||||
}
|
||||
|
||||
if let Ok(latest) = receiver.try_recv() {
|
||||
progress = latest;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
|
||||
drain += 1;
|
||||
}
|
||||
|
||||
let reply = PullImageReply {
|
||||
progress: Some(convert_oci_progress(progress)),
|
||||
digest: String::new(),
|
||||
format: OciImageFormat::Unknown.into(),
|
||||
};
|
||||
yield reply;
|
||||
},
|
||||
|
||||
PullImageSelect::Completed(result) => {
|
||||
@ -414,6 +428,10 @@ impl ControlService for DaemonControlService {
|
||||
yield reply;
|
||||
break;
|
||||
},
|
||||
|
||||
_ => {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
Reference in New Issue
Block a user