From 2c9152d4339e81e7f5940440ba78007a4c2140c7 Mon Sep 17 00:00:00 2001 From: Alex Zenla Date: Mon, 22 Apr 2024 23:02:14 +0000 Subject: [PATCH] feat: exec tty support --- Cargo.lock | 13 + Cargo.toml | 4 + crates/ctl/src/cli/exec.rs | 12 +- crates/ctl/src/console.rs | 14 +- crates/daemon/src/control.rs | 18 +- crates/guest/Cargo.toml | 1 + crates/guest/src/exec.rs | 322 ++++++++++++++------ crates/krata/proto/krata/idm/internal.proto | 2 + crates/krata/proto/krata/v1/control.proto | 4 +- hack/debug/common.sh | 2 +- 10 files changed, 286 insertions(+), 106 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f0b262b..fe72a6b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1479,6 +1479,7 @@ dependencies = [ "nix 0.28.0", "oci-spec", "path-absolutize", + "pty-process", "rtnetlink", "serde", "serde_json", @@ -2221,6 +2222,17 @@ dependencies = [ "prost", ] +[[package]] +name = "pty-process" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8749b545e244c90bf74a5767764cc2194f1888bb42f84015486a64c82bea5cc0" +dependencies = [ + "libc", + "rustix", + "tokio", +] + [[package]] name = "quote" version = "1.0.35" @@ -2456,6 +2468,7 @@ checksum = "65e04861e65f21776e67888bfbea442b3642beaa0138fdb1dd7a84a52dffdb89" dependencies = [ "bitflags 2.5.0", "errno", + "itoa", "libc", "linux-raw-sys", "windows-sys 0.52.0", diff --git a/Cargo.toml b/Cargo.toml index 48e7346..10275ed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -91,6 +91,10 @@ features = ["derive"] version = "0.13.1" features = ["derive"] +[workspace.dependencies.pty-process] +version = "0.4.0" +features = ["async"] + [workspace.dependencies.reqwest] version = "0.12.4" default-features = false diff --git a/crates/ctl/src/cli/exec.rs b/crates/ctl/src/cli/exec.rs index 8b56223..0a22e6f 100644 --- a/crates/ctl/src/cli/exec.rs +++ b/crates/ctl/src/cli/exec.rs @@ -21,6 +21,8 @@ pub struct ExecCommand { env: Option>, #[arg(short = 'w', long, help = "Working directory")] working_directory: Option, + #[arg(short = 't', long, help = "Allocate tty")] + tty: bool, #[arg(help = "Guest to exec inside, either the name or the uuid")] guest: String, #[arg( @@ -47,14 +49,16 @@ impl ExecCommand { command: self.command, working_directory: self.working_directory.unwrap_or_default(), }), - data: vec![], + tty: self.tty, + stdin: vec![], + stdin_closed: false, }; let stream = StdioConsoleStream::stdin_stream_exec(initial).await; - let response = client.exec_guest(Request::new(stream)).await?.into_inner(); - - let code = StdioConsoleStream::exec_output(response).await?; + let result = StdioConsoleStream::exec_output(self.tty, response).await; + StdioConsoleStream::restore_terminal_mode(); + let code = result?; std::process::exit(code); } } diff --git a/crates/ctl/src/console.rs b/crates/ctl/src/console.rs index 57efb67..269c778 100644 --- a/crates/ctl/src/console.rs +++ b/crates/ctl/src/console.rs @@ -68,7 +68,13 @@ impl StdioConsoleStream { if size == 1 && buffer[0] == 0x1d { break; } - yield ExecGuestRequest { guest_id: String::default(), task: None, data }; + + let closed = size == 0; + + yield ExecGuestRequest { guest_id: String::default(), task: None, tty: false, stdin: data, stdin_closed: closed }; + if closed { + break; + } } } } @@ -90,7 +96,11 @@ impl StdioConsoleStream { Ok(()) } - pub async fn exec_output(mut stream: Streaming) -> Result { + pub async fn exec_output(tty: bool, mut stream: Streaming) -> Result { + if tty && stdin().is_tty() { + enable_raw_mode()?; + StdioConsoleStream::register_terminal_restore_hook()?; + } let mut stdout = stdout(); let mut stderr = stderr(); while let Some(reply) = stream.next().await { diff --git a/crates/daemon/src/control.rs b/crates/daemon/src/control.rs index 486adfd..9beacc7 100644 --- a/crates/daemon/src/control.rs +++ b/crates/daemon/src/control.rs @@ -210,15 +210,28 @@ impl ControlService for DaemonControlService { .collect(), command: task.command, working_directory: task.working_directory, + tty: request.tty, })), })), }; + let (request_stdin, request_stdin_closed) = (request.stdin.clone(), request.stdin_closed); + let output = try_stream! { let mut handle = idm.send_stream(idm_request).await.map_err(|x| ApiError { message: x.to_string(), })?; + if !request_stdin.is_empty() { + let _ = handle.update(IdmRequest { + request: Some(IdmRequestType::ExecStream(ExecStreamRequestUpdate { + update: Some(Update::Stdin(ExecStreamRequestStdin { + data: request_stdin, + closed: request_stdin_closed, + })), + }))}).await; + } + loop { select! { x = input.next() => if let Some(update) = x { @@ -227,11 +240,12 @@ impl ControlService for DaemonControlService { }.into()); if let Ok(update) = update { - if !update.data.is_empty() { + if !update.stdin.is_empty() { let _ = handle.update(IdmRequest { request: Some(IdmRequestType::ExecStream(ExecStreamRequestUpdate { update: Some(Update::Stdin(ExecStreamRequestStdin { - data: update.data, + data: update.stdin, + closed: update.stdin_closed, })), }))}).await; } diff --git a/crates/guest/Cargo.toml b/crates/guest/Cargo.toml index 688338d..d6d180e 100644 --- a/crates/guest/Cargo.toml +++ b/crates/guest/Cargo.toml @@ -21,6 +21,7 @@ log = { workspace = true } nix = { workspace = true, features = ["ioctl", "process", "fs"] } oci-spec = { workspace = true } path-absolutize = { workspace = true } +pty-process = { workspace = true } rtnetlink = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/crates/guest/src/exec.rs b/crates/guest/src/exec.rs index 82fb360..7b3971e 100644 --- a/crates/guest/src/exec.rs +++ b/crates/guest/src/exec.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, process::Stdio}; +use std::{collections::HashMap, process::Stdio, time::Duration}; use anyhow::{anyhow, Result}; use krata::idm::{ @@ -9,10 +9,12 @@ use krata::idm::{ }, internal::{response::Response as ResponseType, Request, Response}, }; +use pty_process::{Pty, Size}; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, join, - process::Command, + process::{Child, Command}, + time::sleep, }; pub struct GuestExecTask { @@ -58,115 +60,243 @@ impl GuestExecTask { start.working_directory.clone() }; - let mut child = Command::new(exe) - .args(cmd) - .envs(env) - .current_dir(dir) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .kill_on_drop(true) - .spawn() - .map_err(|error| anyhow!("failed to spawn: {}", error))?; + if start.tty { + let pty = Pty::new().map_err(|error| anyhow!("unable to allocate pty: {}", error))?; + pty.resize(Size::new(24, 80))?; + let mut child = ChildDropGuard { + inner: pty_process::Command::new(exe) + .args(cmd) + .envs(env) + .current_dir(dir) + .spawn( + &pty.pts() + .map_err(|error| anyhow!("unable to allocate pts: {}", error))?, + ) + .map_err(|error| anyhow!("failed to spawn: {}", error))?, + kill: true, + }; + let (mut read, mut write) = pty.into_split(); - let mut stdin = child - .stdin - .take() - .ok_or_else(|| anyhow!("stdin was missing"))?; - let mut stdout = child - .stdout - .take() - .ok_or_else(|| anyhow!("stdout was missing"))?; - let mut stderr = child - .stderr - .take() - .ok_or_else(|| anyhow!("stderr was missing"))?; - - let stdout_handle = self.handle.clone(); - let stdout_task = tokio::task::spawn(async move { - let mut stdout_buffer = vec![0u8; 8 * 1024]; - loop { - let Ok(size) = stdout.read(&mut stdout_buffer).await else { - break; - }; - if size > 0 { - let response = Response { - response: Some(ResponseType::ExecStream(ExecStreamResponseUpdate { - exited: false, - exit_code: 0, - error: String::new(), - stdout: stdout_buffer[0..size].to_vec(), - stderr: vec![], - })), + let pty_read_handle = self.handle.clone(); + let pty_read_task = tokio::task::spawn(async move { + let mut stdout_buffer = vec![0u8; 8 * 1024]; + loop { + let Ok(size) = read.read(&mut stdout_buffer).await else { + break; }; - let _ = stdout_handle.respond(response).await; - } else { - break; + if size > 0 { + let response = Response { + response: Some(ResponseType::ExecStream(ExecStreamResponseUpdate { + exited: false, + exit_code: 0, + error: String::new(), + stdout: stdout_buffer[0..size].to_vec(), + stderr: vec![], + })), + }; + let _ = pty_read_handle.respond(response).await; + } else { + break; + } } - } - }); + }); - let stderr_handle = self.handle.clone(); - let stderr_task = tokio::task::spawn(async move { - let mut stderr_buffer = vec![0u8; 8 * 1024]; - loop { - let Ok(size) = stderr.read(&mut stderr_buffer).await else { - break; - }; - if size > 0 { - let response = Response { - response: Some(ResponseType::ExecStream(ExecStreamResponseUpdate { - exited: false, - exit_code: 0, - error: String::new(), - stdout: vec![], - stderr: stderr_buffer[0..size].to_vec(), - })), + let stdin_task = tokio::task::spawn(async move { + loop { + let Some(request) = receiver.recv().await else { + break; }; - let _ = stderr_handle.respond(response).await; - } else { - break; + + let Some(RequestType::ExecStream(update)) = request.request else { + continue; + }; + + let Some(Update::Stdin(update)) = update.update else { + continue; + }; + + if !update.data.is_empty() && write.write_all(&update.data).await.is_err() { + break; + } + + if update.closed { + break; + } + } + }); + + let mut result = child.inner.wait().await; + if result.is_err() { + sleep(Duration::from_millis(10)).await; + if let Ok(Some(status)) = child.inner.try_wait() { + result = Ok(status); } } - }); + let code = result.as_ref().ok().and_then(|x| x.code()).unwrap_or(-1); + let error = result + .as_ref() + .map_err(|x| x.to_string()) + .err() + .unwrap_or_default(); - let stdin_task = tokio::task::spawn(async move { - loop { - let Some(request) = receiver.recv().await else { - break; - }; + let _ = pty_read_task.await; + stdin_task.abort(); + let _ = stdin_task.await; - let Some(RequestType::ExecStream(update)) = request.request else { - continue; - }; + let response = Response { + response: Some(ResponseType::ExecStream(ExecStreamResponseUpdate { + exited: true, + exit_code: code, + error, + stdout: vec![], + stderr: vec![], + })), + }; + self.handle.respond(response).await?; + child.kill = false; + } else { + let mut child = ChildDropGuard { + inner: Command::new(exe) + .args(cmd) + .envs(env) + .current_dir(dir) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .map_err(|error| anyhow!("failed to spawn: {}", error))?, + kill: true, + }; + let mut stdin = child + .inner + .stdin + .take() + .ok_or_else(|| anyhow!("stdin was missing"))?; + let mut stdout = child + .inner + .stdout + .take() + .ok_or_else(|| anyhow!("stdout was missing"))?; + let mut stderr = child + .inner + .stderr + .take() + .ok_or_else(|| anyhow!("stderr was missing"))?; - let Some(Update::Stdin(update)) = update.update else { - continue; - }; + let stdout_handle = self.handle.clone(); + let stdout_task = tokio::task::spawn(async move { + let mut stdout_buffer = vec![0u8; 8 * 1024]; + loop { + let Ok(size) = stdout.read(&mut stdout_buffer).await else { + break; + }; + if size > 0 { + let response = Response { + response: Some(ResponseType::ExecStream(ExecStreamResponseUpdate { + exited: false, + exit_code: 0, + error: String::new(), + stdout: stdout_buffer[0..size].to_vec(), + stderr: vec![], + })), + }; + let _ = stdout_handle.respond(response).await; + } else { + break; + } + } + }); - if stdin.write_all(&update.data).await.is_err() { - break; + let stderr_handle = self.handle.clone(); + let stderr_task = tokio::task::spawn(async move { + let mut stderr_buffer = vec![0u8; 8 * 1024]; + loop { + let Ok(size) = stderr.read(&mut stderr_buffer).await else { + break; + }; + if size > 0 { + let response = Response { + response: Some(ResponseType::ExecStream(ExecStreamResponseUpdate { + exited: false, + exit_code: 0, + error: String::new(), + stdout: vec![], + stderr: stderr_buffer[0..size].to_vec(), + })), + }; + let _ = stderr_handle.respond(response).await; + } else { + break; + } + } + }); + + let stdin_task = tokio::task::spawn(async move { + loop { + let Some(request) = receiver.recv().await else { + break; + }; + + let Some(RequestType::ExecStream(update)) = request.request else { + continue; + }; + + let Some(Update::Stdin(update)) = update.update else { + continue; + }; + + if !update.data.is_empty() && stdin.write_all(&update.data).await.is_err() { + break; + } + + if update.closed { + break; + } + } + }); + + let mut result = child.inner.wait().await; + if result.is_err() { + sleep(Duration::from_millis(10)).await; + if let Ok(Some(status)) = child.inner.try_wait() { + result = Ok(status); } } - }); + let code = result.as_ref().ok().and_then(|x| x.code()).unwrap_or(-1); + let error = result + .as_ref() + .map_err(|x| x.to_string()) + .err() + .unwrap_or_default(); - let exit = child.wait().await?; - let code = exit.code().unwrap_or(-1); - - let _ = join!(stdout_task, stderr_task); - stdin_task.abort(); - - let response = Response { - response: Some(ResponseType::ExecStream(ExecStreamResponseUpdate { - exited: true, - exit_code: code, - error: String::new(), - stdout: vec![], - stderr: vec![], - })), - }; - self.handle.respond(response).await?; + let _ = join!(stdout_task, stderr_task); + stdin_task.abort(); + let response = Response { + response: Some(ResponseType::ExecStream(ExecStreamResponseUpdate { + exited: true, + exit_code: code, + error, + stdout: vec![], + stderr: vec![], + })), + }; + self.handle.respond(response).await?; + child.kill = false; + } Ok(()) } } + +struct ChildDropGuard { + pub inner: Child, + pub kill: bool, +} + +impl Drop for ChildDropGuard { + fn drop(&mut self) { + if self.kill { + drop(self.inner.start_kill()); + } + } +} diff --git a/crates/krata/proto/krata/idm/internal.proto b/crates/krata/proto/krata/idm/internal.proto index 5c96a12..6f176b2 100644 --- a/crates/krata/proto/krata/idm/internal.proto +++ b/crates/krata/proto/krata/idm/internal.proto @@ -45,10 +45,12 @@ message ExecStreamRequestStart { repeated ExecEnvVar environment = 1; repeated string command = 2; string working_directory = 3; + bool tty = 4; } message ExecStreamRequestStdin { bytes data = 1; + bool closed = 2; } message ExecStreamRequestUpdate { diff --git a/crates/krata/proto/krata/v1/control.proto b/crates/krata/proto/krata/v1/control.proto index ca49f5b..c3c22e1 100644 --- a/crates/krata/proto/krata/v1/control.proto +++ b/crates/krata/proto/krata/v1/control.proto @@ -67,7 +67,9 @@ message ListGuestsReply { message ExecGuestRequest { string guest_id = 1; krata.v1.common.GuestTaskSpec task = 2; - bytes data = 3; + bytes stdin = 3; + bool stdin_closed = 4; + bool tty = 5; } message ExecGuestReply { diff --git a/hack/debug/common.sh b/hack/debug/common.sh index 5156af8..f9c3683 100644 --- a/hack/debug/common.sh +++ b/hack/debug/common.sh @@ -28,5 +28,5 @@ build_and_run() { fi RUST_TARGET="$(./hack/build/target.sh)" ./hack/build/cargo.sh build ${CARGO_BUILD_FLAGS} --bin "${EXE_TARGET}" - exec sudo sh -c "RUST_LOG='${RUST_LOG}' 'target/${RUST_TARGET}/debug/${EXE_TARGET}' $*" + exec sudo RUST_LOG="${RUST_LOG}" "target/${RUST_TARGET}/debug/${EXE_TARGET}" "${@}" }