feature(zone-exec): implement terminal resize support (#363)

This commit is contained in:
Alex Zenla
2024-08-26 00:43:07 -04:00
committed by GitHub
parent f1e3d59b6a
commit 0d2b7a3ae3
7 changed files with 165 additions and 28 deletions

View File

@ -3,11 +3,13 @@ use std::collections::HashMap;
use anyhow::Result; use anyhow::Result;
use clap::Parser; use clap::Parser;
use crossterm::tty::IsTty;
use krata::v1::{ use krata::v1::{
common::{ZoneTaskSpec, ZoneTaskSpecEnvVar}, common::{TerminalSize, ZoneTaskSpec, ZoneTaskSpecEnvVar},
control::{control_service_client::ControlServiceClient, ExecInsideZoneRequest}, control::{control_service_client::ControlServiceClient, ExecInsideZoneRequest},
}; };
use tokio::io::stdin;
use tonic::{transport::Channel, Request}; use tonic::{transport::Channel, Request};
use crate::console::StdioConsoleStream; use crate::console::StdioConsoleStream;
@ -36,6 +38,7 @@ pub struct ZoneExecCommand {
impl ZoneExecCommand { impl ZoneExecCommand {
pub async fn run(self, mut client: ControlServiceClient<Channel>) -> Result<()> { pub async fn run(self, mut client: ControlServiceClient<Channel>) -> Result<()> {
let zone_id: String = resolve_zone(&mut client, &self.zone).await?; let zone_id: String = resolve_zone(&mut client, &self.zone).await?;
let should_map_tty = self.tty && stdin().is_tty();
let initial = ExecInsideZoneRequest { let initial = ExecInsideZoneRequest {
zone_id, zone_id,
task: Some(ZoneTaskSpec { task: Some(ZoneTaskSpec {
@ -52,16 +55,25 @@ impl ZoneExecCommand {
}), }),
stdin: vec![], stdin: vec![],
stdin_closed: false, stdin_closed: false,
terminal_size: if should_map_tty {
let size = crossterm::terminal::size().ok();
size.map(|(columns, rows)| TerminalSize {
rows: rows as u32,
columns: columns as u32,
})
} else {
None
},
}; };
let stream = StdioConsoleStream::stdin_stream_exec(initial).await; let stream = StdioConsoleStream::input_stream_exec(initial, should_map_tty).await;
let response = client let response = client
.exec_inside_zone(Request::new(stream)) .exec_inside_zone(Request::new(stream))
.await? .await?
.into_inner(); .into_inner();
let code = StdioConsoleStream::exec_output(response, self.tty).await?; let code = StdioConsoleStream::exec_output(response, should_map_tty).await?;
std::process::exit(code); std::process::exit(code);
} }
} }

View File

@ -7,6 +7,7 @@ use crossterm::{
use krata::v1::common::ZoneState; use krata::v1::common::ZoneState;
use krata::{ use krata::{
events::EventStream, events::EventStream,
v1::common::TerminalSize,
v1::control::{ v1::control::{
watch_events_reply::Event, ExecInsideZoneReply, ExecInsideZoneRequest, ZoneConsoleReply, watch_events_reply::Event, ExecInsideZoneReply, ExecInsideZoneRequest, ZoneConsoleReply,
ZoneConsoleRequest, ZoneConsoleRequest,
@ -15,6 +16,7 @@ use krata::{
use log::debug; use log::debug;
use tokio::{ use tokio::{
io::{stderr, stdin, stdout, AsyncReadExt, AsyncWriteExt}, io::{stderr, stdin, stdout, AsyncReadExt, AsyncWriteExt},
select,
task::JoinHandle, task::JoinHandle,
}; };
use tokio_stream::{Stream, StreamExt}; use tokio_stream::{Stream, StreamExt};
@ -22,6 +24,11 @@ use tonic::Streaming;
pub struct StdioConsoleStream; pub struct StdioConsoleStream;
enum ExecStdinSelect {
DataRead(std::io::Result<usize>),
TerminalResize,
}
impl StdioConsoleStream { impl StdioConsoleStream {
pub async fn stdin_stream( pub async fn stdin_stream(
zone: String, zone: String,
@ -49,30 +56,106 @@ impl StdioConsoleStream {
} }
} }
pub async fn stdin_stream_exec( #[cfg(unix)]
pub async fn input_stream_exec(
initial: ExecInsideZoneRequest, initial: ExecInsideZoneRequest,
tty: bool,
) -> impl Stream<Item = ExecInsideZoneRequest> { ) -> impl Stream<Item = ExecInsideZoneRequest> {
let mut stdin = stdin(); let mut stdin = stdin();
stream! { stream! {
yield initial; yield initial;
let mut buffer = vec![0u8; 60]; let mut buffer = vec![0u8; 60];
let mut terminal_size_change = if tty {
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::window_change()).ok()
} else {
None
};
let mut stdin_closed = false;
loop { loop {
let size = match stdin.read(&mut buffer).await { let selected = if let Some(ref mut terminal_size_change) = terminal_size_change {
Ok(size) => size, if stdin_closed {
Err(error) => { select! {
debug!("failed to read stdin: {}", error); _ = terminal_size_change.recv() => ExecStdinSelect::TerminalResize,
break; }
} else {
select! {
result = stdin.read(&mut buffer) => ExecStdinSelect::DataRead(result),
_ = terminal_size_change.recv() => ExecStdinSelect::TerminalResize,
}
}
} else {
select! {
result = stdin.read(&mut buffer) => ExecStdinSelect::DataRead(result),
} }
}; };
let stdin = buffer[0..size].to_vec();
if size == 1 && buffer[0] == 0x1d { match selected {
break; ExecStdinSelect::DataRead(result) => {
match result {
Ok(size) => {
let stdin = buffer[0..size].to_vec();
if size == 1 && buffer[0] == 0x1d {
break;
}
stdin_closed = size == 0;
yield ExecInsideZoneRequest { zone_id: String::default(), task: None, terminal_size: None, stdin, stdin_closed, };
},
Err(error) => {
debug!("failed to read stdin: {}", error);
break;
}
}
},
ExecStdinSelect::TerminalResize => {
if let Ok((columns, rows)) = crossterm::terminal::size() {
yield ExecInsideZoneRequest { zone_id: String::default(), task: None, terminal_size: Some(TerminalSize {
rows: rows as u32,
columns: columns as u32,
}), stdin: vec![], stdin_closed: false, };
}
}
} }
let stdin_closed = size == 0; }
yield ExecInsideZoneRequest { zone_id: String::default(), task: None, stdin, stdin_closed, }; }
if stdin_closed { }
break;
#[cfg(not(unix))]
pub async fn input_stream_exec(
initial: ExecInsideZoneRequest,
_tty: bool,
) -> impl Stream<Item = ExecInsideZoneRequest> {
let mut stdin = stdin();
stream! {
yield initial;
let mut buffer = vec![0u8; 60];
let mut stdin_closed = false;
loop {
let selected = select! {
result = stdin.read(&mut buffer) => ExecStdinSelect::DataRead(result),
};
match selected {
ExecStdinSelect::DataRead(result) => {
match result {
Ok(size) => {
let stdin = buffer[0..size].to_vec();
if size == 1 && buffer[0] == 0x1d {
break;
}
stdin_closed = size == 0;
yield ExecInsideZoneRequest { zone_id: String::default(), task: None, terminal_size: None, stdin, stdin_closed, };
},
Err(error) => {
debug!("failed to read stdin: {}", error);
break;
}
}
},
_ => {
continue;
}
} }
} }
} }
@ -96,7 +179,7 @@ impl StdioConsoleStream {
} }
pub async fn exec_output(mut stream: Streaming<ExecInsideZoneReply>, raw: bool) -> Result<i32> { pub async fn exec_output(mut stream: Streaming<ExecInsideZoneReply>, raw: bool) -> Result<i32> {
if raw && stdin().is_tty() { if raw {
enable_raw_mode()?; enable_raw_mode()?;
StdioConsoleStream::register_terminal_restore_hook()?; StdioConsoleStream::register_terminal_restore_hook()?;
} }

View File

@ -13,7 +13,8 @@ use krata::{
idm::internal::{ idm::internal::{
exec_stream_request_update::Update, request::Request as IdmRequestType, exec_stream_request_update::Update, request::Request as IdmRequestType,
response::Response as IdmResponseType, ExecEnvVar, ExecStreamRequestStart, response::Response as IdmResponseType, ExecEnvVar, ExecStreamRequestStart,
ExecStreamRequestStdin, ExecStreamRequestUpdate, Request as IdmRequest, ExecStreamRequestStdin, ExecStreamRequestTerminalSize, ExecStreamRequestUpdate,
Request as IdmRequest,
}, },
v1::control::{ExecInsideZoneReply, ExecInsideZoneRequest}, v1::control::{ExecInsideZoneReply, ExecInsideZoneRequest},
}; };
@ -61,6 +62,12 @@ impl ExecInsideZoneRpc {
command: task.command, command: task.command,
working_directory: task.working_directory, working_directory: task.working_directory,
tty: task.tty, tty: task.tty,
terminal_size: request.terminal_size.map(|size| {
ExecStreamRequestTerminalSize {
rows: size.rows,
columns: size.columns,
}
}),
})), })),
})), })),
}; };
@ -87,6 +94,16 @@ impl ExecInsideZoneRpc {
})), })),
}))}).await; }))}).await;
} }
if let Some(ref terminal_size) = update.terminal_size {
let _ = handle.update(IdmRequest {
request: Some(IdmRequestType::ExecStream(ExecStreamRequestUpdate {
update: Some(Update::TerminalResize(ExecStreamRequestTerminalSize {
rows: terminal_size.rows,
columns: terminal_size.columns,
})),
}))}).await;
}
} }
}, },
x = handle.receiver.recv() => match x { x = handle.receiver.recv() => match x {

View File

@ -46,6 +46,7 @@ message ExecStreamRequestStart {
repeated string command = 2; repeated string command = 2;
string working_directory = 3; string working_directory = 3;
bool tty = 4; bool tty = 4;
ExecStreamRequestTerminalSize terminal_size = 5;
} }
message ExecStreamRequestStdin { message ExecStreamRequestStdin {
@ -53,10 +54,16 @@ message ExecStreamRequestStdin {
bool closed = 2; bool closed = 2;
} }
message ExecStreamRequestTerminalSize {
uint32 rows = 1;
uint32 columns = 2;
}
message ExecStreamRequestUpdate { message ExecStreamRequestUpdate {
oneof update { oneof update {
ExecStreamRequestStart start = 1; ExecStreamRequestStart start = 1;
ExecStreamRequestStdin stdin = 2; ExecStreamRequestStdin stdin = 2;
ExecStreamRequestTerminalSize terminal_resize = 3;
} }
} }

View File

@ -134,3 +134,8 @@ enum ZoneMetricFormat {
ZONE_METRIC_FORMAT_INTEGER = 2; ZONE_METRIC_FORMAT_INTEGER = 2;
ZONE_METRIC_FORMAT_DURATION_SECONDS = 3; ZONE_METRIC_FORMAT_DURATION_SECONDS = 3;
} }
message TerminalSize {
uint32 rows = 1;
uint32 columns = 2;
}

View File

@ -91,6 +91,7 @@ message ExecInsideZoneRequest {
krata.v1.common.ZoneTaskSpec task = 2; krata.v1.common.ZoneTaskSpec task = 2;
bytes stdin = 3; bytes stdin = 3;
bool stdin_closed = 4; bool stdin_closed = 4;
krata.v1.common.TerminalSize terminal_size = 5;
} }
message ExecInsideZoneReply { message ExecInsideZoneReply {

View File

@ -70,7 +70,11 @@ impl ZoneExecTask {
let code: c_int; let code: c_int;
if start.tty { if start.tty {
let pty = Pty::new().map_err(|error| anyhow!("unable to allocate pty: {}", error))?; let pty = Pty::new().map_err(|error| anyhow!("unable to allocate pty: {}", error))?;
pty.resize(Size::new(24, 80))?; let size = start
.terminal_size
.map(|x| Size::new(x.rows as u16, x.columns as u16))
.unwrap_or_else(|| Size::new(24, 80));
pty.resize(size)?;
let pts = pty let pts = pty
.pts() .pts()
.map_err(|error| anyhow!("unable to allocate pts: {}", error))?; .map_err(|error| anyhow!("unable to allocate pts: {}", error))?;
@ -130,16 +134,24 @@ impl ZoneExecTask {
continue; continue;
}; };
let Some(Update::Stdin(update)) = update.update else { match update.update {
continue; Some(Update::Stdin(update)) => {
}; if !update.data.is_empty()
&& write.write_all(&update.data).await.is_err()
{
break;
}
if !update.data.is_empty() && write.write_all(&update.data).await.is_err() { if update.closed {
break; break;
} }
}
if update.closed { Some(Update::TerminalResize(size)) => {
break; let _ = write.resize(Size::new(size.rows as u16, size.columns as u16));
}
_ => {
continue;
}
} }
} }
}); });