diff --git a/Cargo.lock b/Cargo.lock index 50157a9..35c444e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1622,6 +1622,7 @@ dependencies = [ "sys-mount", "sysinfo", "tokio", + "tokio-util", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index aab22f1..70f45dc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,6 +85,7 @@ sysinfo = "0.31.2" termtree = "0.5.1" thiserror = "1.0" tokio-tun = "0.11.5" +tokio-util = "0.7.11" toml = "0.8.19" tonic-build = "0.12.1" tower = "0.5.0" diff --git a/crates/daemon/src/lib.rs b/crates/daemon/src/lib.rs index 0205efc..455fc76 100644 --- a/crates/daemon/src/lib.rs +++ b/crates/daemon/src/lib.rs @@ -16,6 +16,7 @@ use kratart::Runtime; use log::{debug, info}; use reconcile::zone::ZoneReconciler; use std::path::Path; +use std::time::Duration; use std::{net::SocketAddr, path::PathBuf, str::FromStr, sync::Arc}; use tokio::{ fs, @@ -208,6 +209,8 @@ impl Daemon { server = server.tls_config(tls_config)?; } + server = server.http2_keepalive_interval(Some(Duration::from_secs(10))); + let server = server.add_service(ControlServiceServer::new(control_service)); info!("listening on address {}", addr); match addr { diff --git a/crates/krata/src/idm/client.rs b/crates/krata/src/idm/client.rs index 0fc7606..8e8c79f 100644 --- a/crates/krata/src/idm/client.rs +++ b/crates/krata/src/idm/client.rs @@ -495,6 +495,7 @@ impl IdmClient { IdmTransportPacketForm::StreamRequestClosed => { let mut update_streams = request_update_streams.lock().await; update_streams.remove(&packet.id); + println!("stream request closed: {}", packet.id); } IdmTransportPacketForm::StreamResponseUpdate => { diff --git a/crates/zone/Cargo.toml b/crates/zone/Cargo.toml index d7653d6..6f47ed7 100644 --- a/crates/zone/Cargo.toml +++ b/crates/zone/Cargo.toml @@ -29,6 +29,7 @@ serde_json = { workspace = true } sys-mount = { workspace = true } sysinfo = { workspace = true } tokio = { workspace = true } +tokio-util = { workspace = true } [lib] name = "kratazone" diff --git a/crates/zone/src/exec.rs b/crates/zone/src/exec.rs index c1caac8..7025806 100644 --- a/crates/zone/src/exec.rs +++ b/crates/zone/src/exec.rs @@ -1,5 +1,6 @@ use std::{collections::HashMap, process::Stdio}; +use crate::childwait::ChildWait; use anyhow::{anyhow, Result}; use krata::idm::{ client::IdmClientStreamResponseHandle, @@ -16,9 +17,9 @@ use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, join, process::Command, + select, }; - -use crate::childwait::ChildWait; +use tokio_util::sync::CancellationToken; pub struct ZoneExecTask { pub wait: ChildWait, @@ -111,9 +112,12 @@ impl ZoneExecTask { } }); + let cancel = CancellationToken::new(); + let stdin_cancel = cancel.clone(); let stdin_task = tokio::task::spawn(async move { loop { let Some(request) = receiver.recv().await else { + stdin_cancel.cancel(); break; }; @@ -136,15 +140,28 @@ impl ZoneExecTask { }); code = loop { - if let Ok(event) = wait_subscription.recv().await { - if event.pid.as_raw() as u32 == pid { - break event.status; + select! { + result = wait_subscription.recv() => match result { + Ok(event) => { + if event.pid.as_raw() as u32 == pid { + child.kill = false; + break event.status; + } + } + _ => { + child.inner.start_kill()?; + child.kill = false; + break -1; + } + }, + _ = cancel.cancelled() => { + child.inner.start_kill()?; + child.kill = false; + break -1; } } }; - child.kill = false; - let _ = join!(pty_read_task); stdin_task.abort(); } else { @@ -221,9 +238,12 @@ impl ZoneExecTask { } }); + let cancel = CancellationToken::new(); + let stdin_cancel = cancel.clone(); let stdin_task = tokio::task::spawn(async move { loop { let Some(request) = receiver.recv().await else { + stdin_cancel.cancel(); break; }; @@ -247,9 +267,21 @@ impl ZoneExecTask { }); code = loop { - if let Ok(event) = wait_subscription.recv().await { - if event.pid.as_raw() as u32 == pid { - break event.status; + select! { + result = wait_subscription.recv() => match result { + Ok(event) => { + if event.pid.as_raw() as u32 == pid { + break event.status; + } + } + _ => { + child.start_kill()?; + break -1; + } + }, + _ = cancel.cancelled() => { + child.start_kill()?; + break -1; } } };