fix(zone-exec): ensure that the underlying process is killed when rpc is closed

This commit is contained in:
Alex Zenla 2024-08-24 23:51:48 -07:00
parent ec74bc8d2b
commit 53853a62b0
No known key found for this signature in database
GPG Key ID: 067B238899B51269
6 changed files with 49 additions and 10 deletions

1
Cargo.lock generated
View File

@ -1622,6 +1622,7 @@ dependencies = [
"sys-mount", "sys-mount",
"sysinfo", "sysinfo",
"tokio", "tokio",
"tokio-util",
] ]
[[package]] [[package]]

View File

@ -85,6 +85,7 @@ sysinfo = "0.31.2"
termtree = "0.5.1" termtree = "0.5.1"
thiserror = "1.0" thiserror = "1.0"
tokio-tun = "0.11.5" tokio-tun = "0.11.5"
tokio-util = "0.7.11"
toml = "0.8.19" toml = "0.8.19"
tonic-build = "0.12.1" tonic-build = "0.12.1"
tower = "0.5.0" tower = "0.5.0"

View File

@ -16,6 +16,7 @@ use kratart::Runtime;
use log::{debug, info}; use log::{debug, info};
use reconcile::zone::ZoneReconciler; use reconcile::zone::ZoneReconciler;
use std::path::Path; use std::path::Path;
use std::time::Duration;
use std::{net::SocketAddr, path::PathBuf, str::FromStr, sync::Arc}; use std::{net::SocketAddr, path::PathBuf, str::FromStr, sync::Arc};
use tokio::{ use tokio::{
fs, fs,
@ -208,6 +209,8 @@ impl Daemon {
server = server.tls_config(tls_config)?; server = server.tls_config(tls_config)?;
} }
server = server.http2_keepalive_interval(Some(Duration::from_secs(10)));
let server = server.add_service(ControlServiceServer::new(control_service)); let server = server.add_service(ControlServiceServer::new(control_service));
info!("listening on address {}", addr); info!("listening on address {}", addr);
match addr { match addr {

View File

@ -495,6 +495,7 @@ impl<R: IdmRequest, E: IdmSerializable> IdmClient<R, E> {
IdmTransportPacketForm::StreamRequestClosed => { IdmTransportPacketForm::StreamRequestClosed => {
let mut update_streams = request_update_streams.lock().await; let mut update_streams = request_update_streams.lock().await;
update_streams.remove(&packet.id); update_streams.remove(&packet.id);
println!("stream request closed: {}", packet.id);
} }
IdmTransportPacketForm::StreamResponseUpdate => { IdmTransportPacketForm::StreamResponseUpdate => {

View File

@ -29,6 +29,7 @@ serde_json = { workspace = true }
sys-mount = { workspace = true } sys-mount = { workspace = true }
sysinfo = { workspace = true } sysinfo = { workspace = true }
tokio = { workspace = true } tokio = { workspace = true }
tokio-util = { workspace = true }
[lib] [lib]
name = "kratazone" name = "kratazone"

View File

@ -1,5 +1,6 @@
use std::{collections::HashMap, process::Stdio}; use std::{collections::HashMap, process::Stdio};
use crate::childwait::ChildWait;
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use krata::idm::{ use krata::idm::{
client::IdmClientStreamResponseHandle, client::IdmClientStreamResponseHandle,
@ -16,9 +17,9 @@ use tokio::{
io::{AsyncReadExt, AsyncWriteExt}, io::{AsyncReadExt, AsyncWriteExt},
join, join,
process::Command, process::Command,
select,
}; };
use tokio_util::sync::CancellationToken;
use crate::childwait::ChildWait;
pub struct ZoneExecTask { pub struct ZoneExecTask {
pub wait: ChildWait, pub wait: ChildWait,
@ -111,9 +112,12 @@ impl ZoneExecTask {
} }
}); });
let cancel = CancellationToken::new();
let stdin_cancel = cancel.clone();
let stdin_task = tokio::task::spawn(async move { let stdin_task = tokio::task::spawn(async move {
loop { loop {
let Some(request) = receiver.recv().await else { let Some(request) = receiver.recv().await else {
stdin_cancel.cancel();
break; break;
}; };
@ -136,15 +140,28 @@ impl ZoneExecTask {
}); });
code = loop { code = loop {
if let Ok(event) = wait_subscription.recv().await { select! {
if event.pid.as_raw() as u32 == pid { result = wait_subscription.recv() => match result {
break event.status; Ok(event) => {
if event.pid.as_raw() as u32 == pid {
child.kill = false;
break event.status;
}
}
_ => {
child.inner.start_kill()?;
child.kill = false;
break -1;
}
},
_ = cancel.cancelled() => {
child.inner.start_kill()?;
child.kill = false;
break -1;
} }
} }
}; };
child.kill = false;
let _ = join!(pty_read_task); let _ = join!(pty_read_task);
stdin_task.abort(); stdin_task.abort();
} else { } else {
@ -221,9 +238,12 @@ impl ZoneExecTask {
} }
}); });
let cancel = CancellationToken::new();
let stdin_cancel = cancel.clone();
let stdin_task = tokio::task::spawn(async move { let stdin_task = tokio::task::spawn(async move {
loop { loop {
let Some(request) = receiver.recv().await else { let Some(request) = receiver.recv().await else {
stdin_cancel.cancel();
break; break;
}; };
@ -247,9 +267,21 @@ impl ZoneExecTask {
}); });
code = loop { code = loop {
if let Ok(event) = wait_subscription.recv().await { select! {
if event.pid.as_raw() as u32 == pid { result = wait_subscription.recv() => match result {
break event.status; Ok(event) => {
if event.pid.as_raw() as u32 == pid {
break event.status;
}
}
_ => {
child.start_kill()?;
break -1;
}
},
_ = cancel.cancelled() => {
child.start_kill()?;
break -1;
} }
} }
}; };