fix(zone-exec): ensure that the underlying process is killed when rpc is closed (#361)

This commit is contained in:
Alex Zenla 2024-08-25 00:07:37 -07:00 committed by GitHub
parent 41aa1aa707
commit 96ccbd50bb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 49 additions and 10 deletions

1
Cargo.lock generated
View File

@ -1622,6 +1622,7 @@ dependencies = [
"sys-mount",
"sysinfo",
"tokio",
"tokio-util",
]
[[package]]

View File

@ -85,6 +85,7 @@ sysinfo = "0.31.2"
termtree = "0.5.1"
thiserror = "1.0"
tokio-tun = "0.11.5"
tokio-util = "0.7.11"
toml = "0.8.19"
tonic-build = "0.12.1"
tower = "0.5.0"

View File

@ -16,6 +16,7 @@ use kratart::Runtime;
use log::{debug, info};
use reconcile::zone::ZoneReconciler;
use std::path::Path;
use std::time::Duration;
use std::{net::SocketAddr, path::PathBuf, str::FromStr, sync::Arc};
use tokio::{
fs,
@ -208,6 +209,8 @@ impl Daemon {
server = server.tls_config(tls_config)?;
}
server = server.http2_keepalive_interval(Some(Duration::from_secs(10)));
let server = server.add_service(ControlServiceServer::new(control_service));
info!("listening on address {}", addr);
match addr {

View File

@ -495,6 +495,7 @@ impl<R: IdmRequest, E: IdmSerializable> IdmClient<R, E> {
IdmTransportPacketForm::StreamRequestClosed => {
let mut update_streams = request_update_streams.lock().await;
update_streams.remove(&packet.id);
println!("stream request closed: {}", packet.id);
}
IdmTransportPacketForm::StreamResponseUpdate => {

View File

@ -29,6 +29,7 @@ serde_json = { workspace = true }
sys-mount = { workspace = true }
sysinfo = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
[lib]
name = "kratazone"

View File

@ -1,5 +1,6 @@
use std::{collections::HashMap, process::Stdio};
use crate::childwait::ChildWait;
use anyhow::{anyhow, Result};
use krata::idm::{
client::IdmClientStreamResponseHandle,
@ -16,9 +17,9 @@ use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
join,
process::Command,
select,
};
use crate::childwait::ChildWait;
use tokio_util::sync::CancellationToken;
pub struct ZoneExecTask {
pub wait: ChildWait,
@ -111,9 +112,12 @@ impl ZoneExecTask {
}
});
let cancel = CancellationToken::new();
let stdin_cancel = cancel.clone();
let stdin_task = tokio::task::spawn(async move {
loop {
let Some(request) = receiver.recv().await else {
stdin_cancel.cancel();
break;
};
@ -136,15 +140,28 @@ impl ZoneExecTask {
});
code = loop {
if let Ok(event) = wait_subscription.recv().await {
if event.pid.as_raw() as u32 == pid {
break event.status;
select! {
result = wait_subscription.recv() => match result {
Ok(event) => {
if event.pid.as_raw() as u32 == pid {
child.kill = false;
break event.status;
}
}
_ => {
child.inner.start_kill()?;
child.kill = false;
break -1;
}
},
_ = cancel.cancelled() => {
child.inner.start_kill()?;
child.kill = false;
break -1;
}
}
};
child.kill = false;
let _ = join!(pty_read_task);
stdin_task.abort();
} else {
@ -221,9 +238,12 @@ impl ZoneExecTask {
}
});
let cancel = CancellationToken::new();
let stdin_cancel = cancel.clone();
let stdin_task = tokio::task::spawn(async move {
loop {
let Some(request) = receiver.recv().await else {
stdin_cancel.cancel();
break;
};
@ -247,9 +267,21 @@ impl ZoneExecTask {
});
code = loop {
if let Ok(event) = wait_subscription.recv().await {
if event.pid.as_raw() as u32 == pid {
break event.status;
select! {
result = wait_subscription.recv() => match result {
Ok(event) => {
if event.pid.as_raw() as u32 == pid {
break event.status;
}
}
_ => {
child.start_kill()?;
break -1;
}
},
_ = cancel.cancelled() => {
child.start_kill()?;
break -1;
}
}
};