diff --git a/Cargo.lock b/Cargo.lock index a3e3fbc..a61294e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -190,7 +190,7 @@ dependencies = [ "rustversion", "serde", "sync_wrapper 1.0.1", - "tower", + "tower 0.4.13", "tower-layer", "tower-service", ] @@ -431,7 +431,7 @@ version = "7.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b34115915337defe99b2aff5c2ce6771e5fbc4079f4b506301f5cf394c8452f7" dependencies = [ - "crossterm", + "crossterm 0.27.0", "strum", "strum_macros", "unicode-width", @@ -439,13 +439,14 @@ dependencies = [ [[package]] name = "compact_str" -version = "0.7.1" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f86b9c4c00838774a6d902ef931eff7470720c51d90c2e32cfe15dc304737b3f" +checksum = "6050c3a16ddab2e412160b31f2c871015704239bca62f72f6e5f0be631d3f644" dependencies = [ "castaway", "cfg-if", "itoa", + "rustversion", "ryu", "static_assertions", ] @@ -520,10 +521,23 @@ checksum = "f476fe445d41c9e991fd07515a6f463074b782242ccf4a5b7b1d1012e70824df" dependencies = [ "bitflags 2.6.0", "crossterm_winapi", - "futures-core", "libc", - "mio 0.8.11", "parking_lot", + "winapi", +] + +[[package]] +name = "crossterm" +version = "0.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "829d955a0bb380ef178a640b91779e3987da38c9aea133b20614cfed8cdea9c6" +dependencies = [ + "bitflags 2.6.0", + "crossterm_winapi", + "futures-core", + "mio", + "parking_lot", + "rustix", "signal-hook", "signal-hook-mio", "winapi", @@ -1148,7 +1162,7 @@ dependencies = [ "pin-project-lite", "socket2", "tokio", - "tower", + "tower 0.4.13", "tower-service", "tracing", ] @@ -1202,6 +1216,16 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "instability" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b23a0c8dfe501baac4adf6ebbfa6eddf8f0c07f56b058cc1288017e32397846c" +dependencies = [ + "quote", + "syn 2.0.74", +] + [[package]] name = "instant" version = "0.1.13" @@ -1290,7 +1314,7 @@ dependencies = [ "tokio-stream", "tonic", "tonic-build", - "tower", + "tower 0.5.0", "url", ] @@ -1329,7 +1353,7 @@ dependencies = [ "base64", "clap", "comfy-table", - "crossterm", + "crossterm 0.28.1", "ctrlc", "env_logger", "fancy-duration", @@ -1347,7 +1371,7 @@ dependencies = [ "tokio", "tokio-stream", "tonic", - "tower", + "tower 0.5.0", ] [[package]] @@ -1585,6 +1609,7 @@ dependencies = [ "oci-spec", "path-absolutize", "platform-info", + "pty-process", "rtnetlink", "serde", "serde_json", @@ -1691,18 +1716,6 @@ dependencies = [ "adler", ] -[[package]] -name = "mio" -version = "0.8.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" -dependencies = [ - "libc", - "log", - "wasi", - "windows-sys 0.48.0", -] - [[package]] name = "mio" version = "1.0.2" @@ -1711,6 +1724,7 @@ checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" dependencies = [ "hermit-abi", "libc", + "log", "wasi", "windows-sys 0.52.0", ] @@ -2168,6 +2182,17 @@ dependencies = [ "prost", ] +[[package]] +name = "pty-process" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8749b545e244c90bf74a5767764cc2194f1888bb42f84015486a64c82bea5cc0" +dependencies = [ + "libc", + "rustix", + "tokio", +] + [[package]] name = "quinn" version = "0.11.3" @@ -2263,18 +2288,18 @@ dependencies = [ [[package]] name = "ratatui" -version = "0.27.0" +version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d16546c5b5962abf8ce6e2881e722b4e0ae3b6f1a08a26ae3573c55853ca68d3" +checksum = "5ba6a365afbe5615999275bea2446b970b10a41102500e27ce7678d50d978303" dependencies = [ "bitflags 2.6.0", "cassowary", "compact_str", - "crossterm", + "crossterm 0.28.1", + "instability", "itertools", "lru", "paste", - "stability", "strum", "strum_macros", "unicode-segmentation", @@ -2459,6 +2484,7 @@ checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f" dependencies = [ "bitflags 2.6.0", "errno", + "itoa", "libc", "linux-raw-sys", "windows-sys 0.52.0", @@ -2650,7 +2676,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34db1a06d485c9142248b7a054f034b349b212551f3dfd19c94d45a754a217cd" dependencies = [ "libc", - "mio 0.8.11", + "mio", "signal-hook", ] @@ -2727,16 +2753,6 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" -[[package]] -name = "stability" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d904e7009df136af5297832a3ace3370cd14ff1546a232f4f185036c2736fcac" -dependencies = [ - "quote", - "syn 2.0.74", -] - [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -2832,15 +2848,14 @@ dependencies = [ [[package]] name = "sysinfo" -version = "0.30.13" +version = "0.31.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a5b4ddaee55fb2bea2bf0e5000747e5f5c0de765e5a5ff87f4cd106439f4bb3" +checksum = "d4115055da5f572fff541dd0c4e61b0262977f453cc9fe04be83aba25a89bdab" dependencies = [ - "cfg-if", "core-foundation-sys", "libc", + "memchr", "ntapi", - "once_cell", "rayon", "windows", ] @@ -2914,7 +2929,7 @@ dependencies = [ "backtrace", "bytes", "libc", - "mio 1.0.2", + "mio", "parking_lot", "pin-project-lite", "signal-hook-registry", @@ -3052,7 +3067,7 @@ dependencies = [ "tokio", "tokio-rustls", "tokio-stream", - "tower", + "tower 0.4.13", "tower-layer", "tower-service", "tracing", @@ -3091,6 +3106,16 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36b837f86b25d7c0d7988f00a54e74739be6477f2aac6201b8f429a7569991b7" +dependencies = [ + "tower-layer", + "tower-service", +] + [[package]] name = "tower-layer" version = "0.3.3" @@ -3109,7 +3134,6 @@ version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ - "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -3390,9 +3414,9 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] name = "windows" -version = "0.52.0" +version = "0.57.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" +checksum = "12342cb4d8e3b046f3d80effd474a7a02447231330ef77d71daa6fbc40681143" dependencies = [ "windows-core", "windows-targets 0.52.6", @@ -3400,9 +3424,43 @@ dependencies = [ [[package]] name = "windows-core" -version = "0.52.0" +version = "0.57.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +checksum = "d2ed2439a290666cd67ecce2b0ffaad89c2a56b976b736e6ece670297897832d" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-result", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-implement" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9107ddc059d5b6fbfbffdfa7a7fe3e22a226def0b2608f72e9d552763d3e1ad7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.74", +] + +[[package]] +name = "windows-interface" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29bee4b38ea3cde66011baa44dba677c432a78593e202392d1e9070cf2a7fca7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.74", +] + +[[package]] +name = "windows-result" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e383302e8ec8515204254685643de10811af0ed97ea37210dc26fb0032647f8" dependencies = [ "windows-targets 0.52.6", ] diff --git a/Cargo.toml b/Cargo.toml index 833ee10..bf3d709 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,7 @@ c2rust-bitfields = "0.18.0" cgroups-rs = "0.3.4" circular-buffer = "0.1.7" comfy-table = "7.1.1" -crossterm = "0.27.0" +crossterm = "0.28.1" ctrlc = "3.4.5" elf = "0.7.4" env_logger = "0.11.5" @@ -68,8 +68,9 @@ prost = "0.13.1" prost-build = "0.13.1" prost-reflect-build = "0.14.0" prost-types = "0.13.1" +pty-process = "0.4.0" rand = "0.8.5" -ratatui = "0.27.0" +ratatui = "0.28.0" redb = "2.1.1" regex = "1.10.6" rtnetlink = "0.14.1" @@ -80,13 +81,13 @@ sha256 = "1.5.0" signal-hook = "0.3.17" slice-copy = "0.3.0" smoltcp = "0.11.0" -sysinfo = "0.30.13" +sysinfo = "0.31.2" termtree = "0.5.1" thiserror = "1.0" tokio-tun = "0.11.5" toml = "0.8.19" tonic-build = "0.12.1" -tower = "0.4.13" +tower = "0.5.0" udp-stream = "0.0.12" url = "2.5.2" walkdir = "2" diff --git a/crates/ctl/src/cli/zone/exec.rs b/crates/ctl/src/cli/zone/exec.rs index e0ba4bf..addd236 100644 --- a/crates/ctl/src/cli/zone/exec.rs +++ b/crates/ctl/src/cli/zone/exec.rs @@ -21,6 +21,8 @@ pub struct ZoneExecCommand { env: Option>, #[arg(short = 'w', long, help = "Working directory")] working_directory: Option, + #[arg(short = 't', long, help = "Allocate tty")] + tty: bool, #[arg(help = "Zone to exec inside, either the name or the uuid")] zone: String, #[arg( @@ -46,8 +48,10 @@ impl ZoneExecCommand { .collect(), command: self.command, working_directory: self.working_directory.unwrap_or_default(), + tty: self.tty, }), - data: vec![], + stdin: vec![], + stdin_closed: false, }; let stream = StdioConsoleStream::stdin_stream_exec(initial).await; @@ -57,7 +61,7 @@ impl ZoneExecCommand { .await? .into_inner(); - let code = StdioConsoleStream::exec_output(response).await?; + let code = StdioConsoleStream::exec_output(response, self.tty).await?; std::process::exit(code); } } diff --git a/crates/ctl/src/cli/zone/launch.rs b/crates/ctl/src/cli/zone/launch.rs index 07eaa56..2b2316a 100644 --- a/crates/ctl/src/cli/zone/launch.rs +++ b/crates/ctl/src/cli/zone/launch.rs @@ -59,6 +59,8 @@ pub struct ZoneLaunchCommand { device: Vec, #[arg[short, long, help = "Environment variables set in the zone"]] env: Option>, + #[arg(short = 't', long, help = "Allocate tty for task")] + tty: bool, #[arg( short, long, @@ -143,6 +145,7 @@ impl ZoneLaunchCommand { .collect(), command: self.command, working_directory: self.working_directory.unwrap_or_default(), + tty: self.tty, }), annotations: vec![], devices: self diff --git a/crates/ctl/src/cli/zone/top.rs b/crates/ctl/src/cli/zone/top.rs index 04c2441..5526cec 100644 --- a/crates/ctl/src/cli/zone/top.rs +++ b/crates/ctl/src/cli/zone/top.rs @@ -112,7 +112,7 @@ impl ZoneTopApp { } fn render_frame(&mut self, frame: &mut Frame) { - frame.render_widget(self, frame.size()); + frame.render_widget(self, frame.area()); } fn handle_event(&mut self, event: Event) -> io::Result<()> { diff --git a/crates/ctl/src/console.rs b/crates/ctl/src/console.rs index b05b0f6..008c205 100644 --- a/crates/ctl/src/console.rs +++ b/crates/ctl/src/console.rs @@ -62,11 +62,15 @@ impl StdioConsoleStream { break; } }; - let data = buffer[0..size].to_vec(); + let stdin = buffer[0..size].to_vec(); if size == 1 && buffer[0] == 0x1d { break; } - yield ExecInsideZoneRequest { zone_id: String::default(), task: None, data }; + let stdin_closed = size == 0; + yield ExecInsideZoneRequest { zone_id: String::default(), task: None, stdin, stdin_closed, }; + if stdin_closed { + break; + } } } } @@ -88,7 +92,11 @@ impl StdioConsoleStream { Ok(()) } - pub async fn exec_output(mut stream: Streaming) -> Result { + pub async fn exec_output(mut stream: Streaming, raw: bool) -> Result { + if raw && stdin().is_tty() { + enable_raw_mode()?; + StdioConsoleStream::register_terminal_restore_hook()?; + } let mut stdout = stdout(); let mut stderr = stderr(); while let Some(reply) = stream.next().await { diff --git a/crates/daemon/src/control.rs b/crates/daemon/src/control.rs index c6d21be..c132ffd 100644 --- a/crates/daemon/src/control.rs +++ b/crates/daemon/src/control.rs @@ -226,6 +226,7 @@ impl ControlService for DaemonControlService { .collect(), command: task.command, working_directory: task.working_directory, + tty: task.tty, })), })), }; @@ -243,11 +244,12 @@ impl ControlService for DaemonControlService { }.into()); if let Ok(update) = update { - if !update.data.is_empty() { + if !update.stdin.is_empty() { let _ = handle.update(IdmRequest { request: Some(IdmRequestType::ExecStream(ExecStreamRequestUpdate { update: Some(Update::Stdin(ExecStreamRequestStdin { - data: update.data, + data: update.stdin, + closed: update.stdin_closed, })), }))}).await; } @@ -263,7 +265,7 @@ impl ControlService for DaemonControlService { error: update.error, exit_code: update.exit_code, stdout: update.stdout, - stderr: update.stderr + stderr: update.stderr, }; yield reply; }, diff --git a/crates/krata/proto/krata/idm/internal.proto b/crates/krata/proto/krata/idm/internal.proto index 5c96a12..6f176b2 100644 --- a/crates/krata/proto/krata/idm/internal.proto +++ b/crates/krata/proto/krata/idm/internal.proto @@ -45,10 +45,12 @@ message ExecStreamRequestStart { repeated ExecEnvVar environment = 1; repeated string command = 2; string working_directory = 3; + bool tty = 4; } message ExecStreamRequestStdin { bytes data = 1; + bool closed = 2; } message ExecStreamRequestUpdate { diff --git a/crates/krata/proto/krata/v1/common.proto b/crates/krata/proto/krata/v1/common.proto index 44c2bdc..05c3071 100644 --- a/crates/krata/proto/krata/v1/common.proto +++ b/crates/krata/proto/krata/v1/common.proto @@ -56,6 +56,7 @@ message ZoneTaskSpec { repeated ZoneTaskSpecEnvVar environment = 1; repeated string command = 2; string working_directory = 3; + bool tty = 4; } message ZoneTaskSpecEnvVar { diff --git a/crates/krata/proto/krata/v1/control.proto b/crates/krata/proto/krata/v1/control.proto index 8f37d53..3197c40 100644 --- a/crates/krata/proto/krata/v1/control.proto +++ b/crates/krata/proto/krata/v1/control.proto @@ -84,7 +84,8 @@ message ListZonesReply { message ExecInsideZoneRequest { string zone_id = 1; krata.v1.common.ZoneTaskSpec task = 2; - bytes data = 3; + bytes stdin = 3; + bool stdin_closed = 4; } message ExecInsideZoneReply { diff --git a/crates/zone/Cargo.toml b/crates/zone/Cargo.toml index 96323b3..fe0a52c 100644 --- a/crates/zone/Cargo.toml +++ b/crates/zone/Cargo.toml @@ -22,6 +22,7 @@ nix = { workspace = true, features = ["ioctl", "process", "fs"] } oci-spec = { workspace = true } path-absolutize = { workspace = true } platform-info = { workspace = true } +pty-process = { workspace = true, features = ["async"] } rtnetlink = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/crates/zone/src/exec.rs b/crates/zone/src/exec.rs index 941fc14..c1caac8 100644 --- a/crates/zone/src/exec.rs +++ b/crates/zone/src/exec.rs @@ -1,12 +1,6 @@ use std::{collections::HashMap, process::Stdio}; use anyhow::{anyhow, Result}; -use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, - join, - process::Command, -}; - use krata::idm::{ client::IdmClientStreamResponseHandle, internal::{ @@ -15,6 +9,14 @@ use krata::idm::{ }, internal::{response::Response as ResponseType, Request, Response}, }; +use libc::c_int; +use pty_process::{Pty, Size}; +use tokio::process::Child; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + join, + process::Command, +}; use crate::childwait::ChildWait; @@ -52,7 +54,7 @@ impl ZoneExecTask { if !env.contains_key("PATH") { env.insert( "PATH".to_string(), - "/bin:/usr/bin:/usr/local/bin".to_string(), + "/bin:/usr/bin:/usr/local/bin:/sbin:/usr/sbin".to_string(), ); } @@ -63,112 +65,196 @@ impl ZoneExecTask { }; let mut wait_subscription = self.wait.subscribe().await?; - let mut child = Command::new(exe) - .args(cmd) - .envs(env) - .current_dir(dir) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .kill_on_drop(true) - .spawn() - .map_err(|error| anyhow!("failed to spawn: {}", error))?; - let pid = child.id().ok_or_else(|| anyhow!("pid is not provided"))?; - let mut stdin = child - .stdin - .take() - .ok_or_else(|| anyhow!("stdin was missing"))?; - let mut stdout = child - .stdout - .take() - .ok_or_else(|| anyhow!("stdout was missing"))?; - let mut stderr = child - .stderr - .take() - .ok_or_else(|| anyhow!("stderr was missing"))?; - - let stdout_handle = self.handle.clone(); - let stdout_task = tokio::task::spawn(async move { - let mut stdout_buffer = vec![0u8; 8 * 1024]; - loop { - let Ok(size) = stdout.read(&mut stdout_buffer).await else { - break; - }; - if size > 0 { - let response = Response { - response: Some(ResponseType::ExecStream(ExecStreamResponseUpdate { - exited: false, - exit_code: 0, - error: String::new(), - stdout: stdout_buffer[0..size].to_vec(), - stderr: vec![], - })), + let code: c_int; + if start.tty { + let pty = Pty::new().map_err(|error| anyhow!("unable to allocate pty: {}", error))?; + pty.resize(Size::new(24, 80))?; + let mut child = ChildDropGuard { + inner: pty_process::Command::new(exe) + .args(cmd) + .envs(env) + .current_dir(dir) + .spawn( + &pty.pts() + .map_err(|error| anyhow!("unable to allocate pts: {}", error))?, + ) + .map_err(|error| anyhow!("failed to spawn: {}", error))?, + kill: true, + }; + let pid = child + .inner + .id() + .ok_or_else(|| anyhow!("pid is not provided"))?; + let (mut read, mut write) = pty.into_split(); + let pty_read_handle = self.handle.clone(); + let pty_read_task = tokio::task::spawn(async move { + let mut stdout_buffer = vec![0u8; 8 * 1024]; + loop { + let Ok(size) = read.read(&mut stdout_buffer).await else { + break; }; - let _ = stdout_handle.respond(response).await; - } else { - break; + if size > 0 { + let response = Response { + response: Some(ResponseType::ExecStream(ExecStreamResponseUpdate { + exited: false, + exit_code: 0, + error: String::new(), + stdout: stdout_buffer[0..size].to_vec(), + stderr: vec![], + })), + }; + let _ = pty_read_handle.respond(response).await; + } else { + break; + } } - } - }); + }); - let stderr_handle = self.handle.clone(); - let stderr_task = tokio::task::spawn(async move { - let mut stderr_buffer = vec![0u8; 8 * 1024]; - loop { - let Ok(size) = stderr.read(&mut stderr_buffer).await else { - break; - }; - if size > 0 { - let response = Response { - response: Some(ResponseType::ExecStream(ExecStreamResponseUpdate { - exited: false, - exit_code: 0, - error: String::new(), - stdout: vec![], - stderr: stderr_buffer[0..size].to_vec(), - })), + let stdin_task = tokio::task::spawn(async move { + loop { + let Some(request) = receiver.recv().await else { + break; }; - let _ = stderr_handle.respond(response).await; - } else { - break; + + let Some(RequestType::ExecStream(update)) = request.request else { + continue; + }; + + let Some(Update::Stdin(update)) = update.update else { + continue; + }; + + if !update.data.is_empty() && write.write_all(&update.data).await.is_err() { + break; + } + + if update.closed { + break; + } } - } - }); + }); - let stdin_task = tokio::task::spawn(async move { - loop { - let Some(request) = receiver.recv().await else { - break; - }; - - let Some(RequestType::ExecStream(update)) = request.request else { - continue; - }; - - let Some(Update::Stdin(update)) = update.update else { - continue; - }; - - if stdin.write_all(&update.data).await.is_err() { - break; + code = loop { + if let Ok(event) = wait_subscription.recv().await { + if event.pid.as_raw() as u32 == pid { + break event.status; + } } - } - }); + }; - let data_task = tokio::task::spawn(async move { - let _ = join!(stdout_task, stderr_task); + child.kill = false; + + let _ = join!(pty_read_task); stdin_task.abort(); - }); + } else { + let mut child = Command::new(exe) + .args(cmd) + .envs(env) + .current_dir(dir) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .kill_on_drop(true) + .spawn() + .map_err(|error| anyhow!("failed to spawn: {}", error))?; - let code = loop { - if let Ok(event) = wait_subscription.recv().await { - if event.pid.as_raw() as u32 == pid { - break event.status; + let pid = child.id().ok_or_else(|| anyhow!("pid is not provided"))?; + let mut stdin = child + .stdin + .take() + .ok_or_else(|| anyhow!("stdin was missing"))?; + let mut stdout = child + .stdout + .take() + .ok_or_else(|| anyhow!("stdout was missing"))?; + let mut stderr = child + .stderr + .take() + .ok_or_else(|| anyhow!("stderr was missing"))?; + + let stdout_handle = self.handle.clone(); + let stdout_task = tokio::task::spawn(async move { + let mut stdout_buffer = vec![0u8; 8 * 1024]; + loop { + let Ok(size) = stdout.read(&mut stdout_buffer).await else { + break; + }; + if size > 0 { + let response = Response { + response: Some(ResponseType::ExecStream(ExecStreamResponseUpdate { + exited: false, + exit_code: 0, + error: String::new(), + stdout: stdout_buffer[0..size].to_vec(), + stderr: vec![], + })), + }; + let _ = stdout_handle.respond(response).await; + } else { + break; + } } - } - }; - data_task.await?; + }); + + let stderr_handle = self.handle.clone(); + let stderr_task = tokio::task::spawn(async move { + let mut stderr_buffer = vec![0u8; 8 * 1024]; + loop { + let Ok(size) = stderr.read(&mut stderr_buffer).await else { + break; + }; + if size > 0 { + let response = Response { + response: Some(ResponseType::ExecStream(ExecStreamResponseUpdate { + exited: false, + exit_code: 0, + error: String::new(), + stdout: vec![], + stderr: stderr_buffer[0..size].to_vec(), + })), + }; + let _ = stderr_handle.respond(response).await; + } else { + break; + } + } + }); + + let stdin_task = tokio::task::spawn(async move { + loop { + let Some(request) = receiver.recv().await else { + break; + }; + + let Some(RequestType::ExecStream(update)) = request.request else { + continue; + }; + + let Some(Update::Stdin(update)) = update.update else { + continue; + }; + + if stdin.write_all(&update.data).await.is_err() { + break; + } + } + }); + + let data_task = tokio::task::spawn(async move { + let _ = join!(stdout_task, stderr_task); + stdin_task.abort(); + }); + + code = loop { + if let Ok(event) = wait_subscription.recv().await { + if event.pid.as_raw() as u32 == pid { + break event.status; + } + } + }; + data_task.await?; + } let response = Response { response: Some(ResponseType::ExecStream(ExecStreamResponseUpdate { exited: true, @@ -183,3 +269,16 @@ impl ZoneExecTask { Ok(()) } } + +struct ChildDropGuard { + pub inner: Child, + pub kill: bool, +} + +impl Drop for ChildDropGuard { + fn drop(&mut self) { + if self.kill { + drop(self.inner.start_kill()); + } + } +} diff --git a/crates/zone/src/metrics.rs b/crates/zone/src/metrics.rs index d67526c..e539be7 100644 --- a/crates/zone/src/metrics.rs +++ b/crates/zone/src/metrics.rs @@ -2,7 +2,7 @@ use std::{ops::Add, path::Path}; use anyhow::Result; use krata::idm::internal::{MetricFormat, MetricNode}; -use sysinfo::Process; +use sysinfo::{Process, ProcessesToUpdate}; pub struct MetricsCollector {} @@ -38,7 +38,7 @@ impl MetricsCollector { } fn collect_processes(&self, sysinfo: &mut sysinfo::System) -> Result { - sysinfo.refresh_processes(); + sysinfo.refresh_processes(ProcessesToUpdate::All); let mut processes = Vec::new(); let mut sysinfo_processes = sysinfo.processes().values().collect::>(); sysinfo_processes.sort_by_key(|x| x.pid()); @@ -70,7 +70,11 @@ impl MetricsCollector { metrics.push(MetricNode::raw_value("cwd", working_directory)); } - let cmdline = process.cmd().to_vec(); + let cmdline = process + .cmd() + .iter() + .map(|x| x.to_string_lossy().to_string()) + .collect::>(); metrics.push(MetricNode::raw_value("cmdline", cmdline)); metrics.push(MetricNode::structural( "memory",