From 0d2b7a3ae32d6fe235e74779fd4b4802719c9114 Mon Sep 17 00:00:00 2001 From: Alex Zenla Date: Mon, 26 Aug 2024 00:43:07 -0400 Subject: [PATCH] feature(zone-exec): implement terminal resize support (#363) --- crates/ctl/src/cli/zone/exec.rs | 18 ++- crates/ctl/src/console.rs | 111 +++++++++++++++--- crates/daemon/src/control/exec_inside_zone.rs | 19 ++- crates/krata/proto/krata/idm/internal.proto | 7 ++ crates/krata/proto/krata/v1/common.proto | 5 + crates/krata/proto/krata/v1/control.proto | 1 + crates/zone/src/exec.rs | 32 +++-- 7 files changed, 165 insertions(+), 28 deletions(-) diff --git a/crates/ctl/src/cli/zone/exec.rs b/crates/ctl/src/cli/zone/exec.rs index addd236..ef2e247 100644 --- a/crates/ctl/src/cli/zone/exec.rs +++ b/crates/ctl/src/cli/zone/exec.rs @@ -3,11 +3,13 @@ use std::collections::HashMap; use anyhow::Result; use clap::Parser; +use crossterm::tty::IsTty; use krata::v1::{ - common::{ZoneTaskSpec, ZoneTaskSpecEnvVar}, + common::{TerminalSize, ZoneTaskSpec, ZoneTaskSpecEnvVar}, control::{control_service_client::ControlServiceClient, ExecInsideZoneRequest}, }; +use tokio::io::stdin; use tonic::{transport::Channel, Request}; use crate::console::StdioConsoleStream; @@ -36,6 +38,7 @@ pub struct ZoneExecCommand { impl ZoneExecCommand { pub async fn run(self, mut client: ControlServiceClient) -> Result<()> { let zone_id: String = resolve_zone(&mut client, &self.zone).await?; + let should_map_tty = self.tty && stdin().is_tty(); let initial = ExecInsideZoneRequest { zone_id, task: Some(ZoneTaskSpec { @@ -52,16 +55,25 @@ impl ZoneExecCommand { }), stdin: vec![], stdin_closed: false, + terminal_size: if should_map_tty { + let size = crossterm::terminal::size().ok(); + size.map(|(columns, rows)| TerminalSize { + rows: rows as u32, + columns: columns as u32, + }) + } else { + None + }, }; - let stream = StdioConsoleStream::stdin_stream_exec(initial).await; + let stream = StdioConsoleStream::input_stream_exec(initial, should_map_tty).await; let response = client .exec_inside_zone(Request::new(stream)) .await? .into_inner(); - let code = StdioConsoleStream::exec_output(response, self.tty).await?; + let code = StdioConsoleStream::exec_output(response, should_map_tty).await?; std::process::exit(code); } } diff --git a/crates/ctl/src/console.rs b/crates/ctl/src/console.rs index 27aef87..1f8e9c5 100644 --- a/crates/ctl/src/console.rs +++ b/crates/ctl/src/console.rs @@ -7,6 +7,7 @@ use crossterm::{ use krata::v1::common::ZoneState; use krata::{ events::EventStream, + v1::common::TerminalSize, v1::control::{ watch_events_reply::Event, ExecInsideZoneReply, ExecInsideZoneRequest, ZoneConsoleReply, ZoneConsoleRequest, @@ -15,6 +16,7 @@ use krata::{ use log::debug; use tokio::{ io::{stderr, stdin, stdout, AsyncReadExt, AsyncWriteExt}, + select, task::JoinHandle, }; use tokio_stream::{Stream, StreamExt}; @@ -22,6 +24,11 @@ use tonic::Streaming; pub struct StdioConsoleStream; +enum ExecStdinSelect { + DataRead(std::io::Result), + TerminalResize, +} + impl StdioConsoleStream { pub async fn stdin_stream( zone: String, @@ -49,30 +56,106 @@ impl StdioConsoleStream { } } - pub async fn stdin_stream_exec( + #[cfg(unix)] + pub async fn input_stream_exec( initial: ExecInsideZoneRequest, + tty: bool, ) -> impl Stream { let mut stdin = stdin(); stream! { yield initial; let mut buffer = vec![0u8; 60]; + let mut terminal_size_change = if tty { + tokio::signal::unix::signal(tokio::signal::unix::SignalKind::window_change()).ok() + } else { + None + }; + let mut stdin_closed = false; loop { - let size = match stdin.read(&mut buffer).await { - Ok(size) => size, - Err(error) => { - debug!("failed to read stdin: {}", error); - break; + let selected = if let Some(ref mut terminal_size_change) = terminal_size_change { + if stdin_closed { + select! { + _ = terminal_size_change.recv() => ExecStdinSelect::TerminalResize, + } + } else { + select! { + result = stdin.read(&mut buffer) => ExecStdinSelect::DataRead(result), + _ = terminal_size_change.recv() => ExecStdinSelect::TerminalResize, + } + } + } else { + select! { + result = stdin.read(&mut buffer) => ExecStdinSelect::DataRead(result), } }; - let stdin = buffer[0..size].to_vec(); - if size == 1 && buffer[0] == 0x1d { - break; + + match selected { + ExecStdinSelect::DataRead(result) => { + match result { + Ok(size) => { + let stdin = buffer[0..size].to_vec(); + if size == 1 && buffer[0] == 0x1d { + break; + } + stdin_closed = size == 0; + yield ExecInsideZoneRequest { zone_id: String::default(), task: None, terminal_size: None, stdin, stdin_closed, }; + }, + Err(error) => { + debug!("failed to read stdin: {}", error); + break; + } + } + }, + ExecStdinSelect::TerminalResize => { + if let Ok((columns, rows)) = crossterm::terminal::size() { + yield ExecInsideZoneRequest { zone_id: String::default(), task: None, terminal_size: Some(TerminalSize { + rows: rows as u32, + columns: columns as u32, + }), stdin: vec![], stdin_closed: false, }; + } + } } - let stdin_closed = size == 0; - yield ExecInsideZoneRequest { zone_id: String::default(), task: None, stdin, stdin_closed, }; - if stdin_closed { - break; + } + } + } + + #[cfg(not(unix))] + pub async fn input_stream_exec( + initial: ExecInsideZoneRequest, + _tty: bool, + ) -> impl Stream { + let mut stdin = stdin(); + stream! { + yield initial; + + let mut buffer = vec![0u8; 60]; + let mut stdin_closed = false; + loop { + let selected = select! { + result = stdin.read(&mut buffer) => ExecStdinSelect::DataRead(result), + }; + + match selected { + ExecStdinSelect::DataRead(result) => { + match result { + Ok(size) => { + let stdin = buffer[0..size].to_vec(); + if size == 1 && buffer[0] == 0x1d { + break; + } + stdin_closed = size == 0; + yield ExecInsideZoneRequest { zone_id: String::default(), task: None, terminal_size: None, stdin, stdin_closed, }; + }, + Err(error) => { + debug!("failed to read stdin: {}", error); + break; + } + } + }, + _ => { + continue; + } } } } @@ -96,7 +179,7 @@ impl StdioConsoleStream { } pub async fn exec_output(mut stream: Streaming, raw: bool) -> Result { - if raw && stdin().is_tty() { + if raw { enable_raw_mode()?; StdioConsoleStream::register_terminal_restore_hook()?; } diff --git a/crates/daemon/src/control/exec_inside_zone.rs b/crates/daemon/src/control/exec_inside_zone.rs index c08fc8c..003a2d7 100644 --- a/crates/daemon/src/control/exec_inside_zone.rs +++ b/crates/daemon/src/control/exec_inside_zone.rs @@ -13,7 +13,8 @@ use krata::{ idm::internal::{ exec_stream_request_update::Update, request::Request as IdmRequestType, response::Response as IdmResponseType, ExecEnvVar, ExecStreamRequestStart, - ExecStreamRequestStdin, ExecStreamRequestUpdate, Request as IdmRequest, + ExecStreamRequestStdin, ExecStreamRequestTerminalSize, ExecStreamRequestUpdate, + Request as IdmRequest, }, v1::control::{ExecInsideZoneReply, ExecInsideZoneRequest}, }; @@ -61,6 +62,12 @@ impl ExecInsideZoneRpc { command: task.command, working_directory: task.working_directory, tty: task.tty, + terminal_size: request.terminal_size.map(|size| { + ExecStreamRequestTerminalSize { + rows: size.rows, + columns: size.columns, + } + }), })), })), }; @@ -87,6 +94,16 @@ impl ExecInsideZoneRpc { })), }))}).await; } + + if let Some(ref terminal_size) = update.terminal_size { + let _ = handle.update(IdmRequest { + request: Some(IdmRequestType::ExecStream(ExecStreamRequestUpdate { + update: Some(Update::TerminalResize(ExecStreamRequestTerminalSize { + rows: terminal_size.rows, + columns: terminal_size.columns, + })), + }))}).await; + } } }, x = handle.receiver.recv() => match x { diff --git a/crates/krata/proto/krata/idm/internal.proto b/crates/krata/proto/krata/idm/internal.proto index 6f176b2..48349ae 100644 --- a/crates/krata/proto/krata/idm/internal.proto +++ b/crates/krata/proto/krata/idm/internal.proto @@ -46,6 +46,7 @@ message ExecStreamRequestStart { repeated string command = 2; string working_directory = 3; bool tty = 4; + ExecStreamRequestTerminalSize terminal_size = 5; } message ExecStreamRequestStdin { @@ -53,10 +54,16 @@ message ExecStreamRequestStdin { bool closed = 2; } +message ExecStreamRequestTerminalSize { + uint32 rows = 1; + uint32 columns = 2; +} + message ExecStreamRequestUpdate { oneof update { ExecStreamRequestStart start = 1; ExecStreamRequestStdin stdin = 2; + ExecStreamRequestTerminalSize terminal_resize = 3; } } diff --git a/crates/krata/proto/krata/v1/common.proto b/crates/krata/proto/krata/v1/common.proto index 267c234..cc14d6a 100644 --- a/crates/krata/proto/krata/v1/common.proto +++ b/crates/krata/proto/krata/v1/common.proto @@ -134,3 +134,8 @@ enum ZoneMetricFormat { ZONE_METRIC_FORMAT_INTEGER = 2; ZONE_METRIC_FORMAT_DURATION_SECONDS = 3; } + +message TerminalSize { + uint32 rows = 1; + uint32 columns = 2; +} diff --git a/crates/krata/proto/krata/v1/control.proto b/crates/krata/proto/krata/v1/control.proto index 372e135..eeca1d6 100644 --- a/crates/krata/proto/krata/v1/control.proto +++ b/crates/krata/proto/krata/v1/control.proto @@ -91,6 +91,7 @@ message ExecInsideZoneRequest { krata.v1.common.ZoneTaskSpec task = 2; bytes stdin = 3; bool stdin_closed = 4; + krata.v1.common.TerminalSize terminal_size = 5; } message ExecInsideZoneReply { diff --git a/crates/zone/src/exec.rs b/crates/zone/src/exec.rs index e462e0f..065ac9c 100644 --- a/crates/zone/src/exec.rs +++ b/crates/zone/src/exec.rs @@ -70,7 +70,11 @@ impl ZoneExecTask { let code: c_int; if start.tty { let pty = Pty::new().map_err(|error| anyhow!("unable to allocate pty: {}", error))?; - pty.resize(Size::new(24, 80))?; + let size = start + .terminal_size + .map(|x| Size::new(x.rows as u16, x.columns as u16)) + .unwrap_or_else(|| Size::new(24, 80)); + pty.resize(size)?; let pts = pty .pts() .map_err(|error| anyhow!("unable to allocate pts: {}", error))?; @@ -130,16 +134,24 @@ impl ZoneExecTask { continue; }; - let Some(Update::Stdin(update)) = update.update else { - continue; - }; + match update.update { + Some(Update::Stdin(update)) => { + if !update.data.is_empty() + && write.write_all(&update.data).await.is_err() + { + break; + } - if !update.data.is_empty() && write.write_all(&update.data).await.is_err() { - break; - } - - if update.closed { - break; + if update.closed { + break; + } + } + Some(Update::TerminalResize(size)) => { + let _ = write.resize(Size::new(size.rows as u16, size.columns as u16)); + } + _ => { + continue; + } } } });