fix(idm): process all idm messages in the same frame and use childwait exit notification for exec (fixes #290) (#302)

This commit is contained in:
Alex Zenla
2024-08-05 17:29:09 -07:00
committed by GitHub
parent 62569f6c59
commit 224fdbe227
6 changed files with 79 additions and 69 deletions

View File

@ -1,6 +1,12 @@
use std::{collections::HashMap, process::Stdio};
use anyhow::{anyhow, Result};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
join,
process::Command,
};
use krata::idm::{
client::IdmClientStreamResponseHandle,
internal::{
@ -9,13 +15,11 @@ use krata::idm::{
},
internal::{response::Response as ResponseType, Request, Response},
};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
join,
process::Command,
};
use crate::childwait::ChildWait;
pub struct ZoneExecTask {
pub wait: ChildWait,
pub handle: IdmClientStreamResponseHandle<Request>,
}
@ -58,6 +62,7 @@ impl ZoneExecTask {
start.working_directory.clone()
};
let mut wait_subscription = self.wait.subscribe().await?;
let mut child = Command::new(exe)
.args(cmd)
.envs(env)
@ -69,6 +74,7 @@ impl ZoneExecTask {
.spawn()
.map_err(|error| anyhow!("failed to spawn: {}", error))?;
let pid = child.id().ok_or_else(|| anyhow!("pid is not provided"))?;
let mut stdin = child
.stdin
.take()
@ -150,12 +156,19 @@ impl ZoneExecTask {
}
});
let exit = child.wait().await?;
let code = exit.code().unwrap_or(-1);
let _ = join!(stdout_task, stderr_task);
stdin_task.abort();
let data_task = tokio::task::spawn(async move {
let _ = join!(stdout_task, stderr_task);
stdin_task.abort();
});
let code = loop {
if let Ok(event) = wait_subscription.recv().await {
if event.pid.as_raw() as u32 == pid {
break event.status;
}
}
};
data_task.await?;
let response = Response {
response: Some(ResponseType::ExecStream(ExecStreamResponseUpdate {
exited: true,