feature(zone-exec): implement terminal resize support

This commit is contained in:
Alex Zenla 2024-08-25 21:19:07 -07:00
parent f1e3d59b6a
commit 876517a213
No known key found for this signature in database
GPG Key ID: 067B238899B51269
7 changed files with 165 additions and 28 deletions

View File

@ -3,11 +3,13 @@ use std::collections::HashMap;
use anyhow::Result;
use clap::Parser;
use crossterm::tty::IsTty;
use krata::v1::{
common::{ZoneTaskSpec, ZoneTaskSpecEnvVar},
common::{TerminalSize, ZoneTaskSpec, ZoneTaskSpecEnvVar},
control::{control_service_client::ControlServiceClient, ExecInsideZoneRequest},
};
use tokio::io::stdin;
use tonic::{transport::Channel, Request};
use crate::console::StdioConsoleStream;
@ -36,6 +38,7 @@ pub struct ZoneExecCommand {
impl ZoneExecCommand {
pub async fn run(self, mut client: ControlServiceClient<Channel>) -> Result<()> {
let zone_id: String = resolve_zone(&mut client, &self.zone).await?;
let should_map_tty = self.tty && stdin().is_tty();
let initial = ExecInsideZoneRequest {
zone_id,
task: Some(ZoneTaskSpec {
@ -52,16 +55,25 @@ impl ZoneExecCommand {
}),
stdin: vec![],
stdin_closed: false,
terminal_size: if should_map_tty {
let size = crossterm::terminal::size().ok();
size.map(|(columns, rows)| TerminalSize {
rows: rows as u32,
columns: columns as u32,
})
} else {
None
},
};
let stream = StdioConsoleStream::stdin_stream_exec(initial).await;
let stream = StdioConsoleStream::input_stream_exec(initial, should_map_tty).await;
let response = client
.exec_inside_zone(Request::new(stream))
.await?
.into_inner();
let code = StdioConsoleStream::exec_output(response, self.tty).await?;
let code = StdioConsoleStream::exec_output(response, should_map_tty).await?;
std::process::exit(code);
}
}

View File

@ -7,6 +7,7 @@ use crossterm::{
use krata::v1::common::ZoneState;
use krata::{
events::EventStream,
v1::common::TerminalSize,
v1::control::{
watch_events_reply::Event, ExecInsideZoneReply, ExecInsideZoneRequest, ZoneConsoleReply,
ZoneConsoleRequest,
@ -15,6 +16,7 @@ use krata::{
use log::debug;
use tokio::{
io::{stderr, stdin, stdout, AsyncReadExt, AsyncWriteExt},
select,
task::JoinHandle,
};
use tokio_stream::{Stream, StreamExt};
@ -22,6 +24,11 @@ use tonic::Streaming;
pub struct StdioConsoleStream;
enum ExecStdinSelect {
DataRead(std::io::Result<usize>),
TerminalResize,
}
impl StdioConsoleStream {
pub async fn stdin_stream(
zone: String,
@ -49,30 +56,106 @@ impl StdioConsoleStream {
}
}
pub async fn stdin_stream_exec(
#[cfg(unix)]
pub async fn input_stream_exec(
initial: ExecInsideZoneRequest,
tty: bool,
) -> impl Stream<Item = ExecInsideZoneRequest> {
let mut stdin = stdin();
stream! {
yield initial;
let mut buffer = vec![0u8; 60];
let mut terminal_size_change = if tty {
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::window_change()).ok()
} else {
None
};
let mut stdin_closed = false;
loop {
let size = match stdin.read(&mut buffer).await {
Ok(size) => size,
Err(error) => {
debug!("failed to read stdin: {}", error);
break;
let selected = if let Some(ref mut terminal_size_change) = terminal_size_change {
if stdin_closed {
select! {
_ = terminal_size_change.recv() => ExecStdinSelect::TerminalResize,
}
} else {
select! {
result = stdin.read(&mut buffer) => ExecStdinSelect::DataRead(result),
_ = terminal_size_change.recv() => ExecStdinSelect::TerminalResize,
}
}
} else {
select! {
result = stdin.read(&mut buffer) => ExecStdinSelect::DataRead(result),
}
};
let stdin = buffer[0..size].to_vec();
if size == 1 && buffer[0] == 0x1d {
break;
match selected {
ExecStdinSelect::DataRead(result) => {
match result {
Ok(size) => {
let stdin = buffer[0..size].to_vec();
if size == 1 && buffer[0] == 0x1d {
break;
}
stdin_closed = size == 0;
yield ExecInsideZoneRequest { zone_id: String::default(), task: None, terminal_size: None, stdin, stdin_closed, };
},
Err(error) => {
debug!("failed to read stdin: {}", error);
break;
}
}
},
ExecStdinSelect::TerminalResize => {
if let Ok((columns, rows)) = crossterm::terminal::size() {
yield ExecInsideZoneRequest { zone_id: String::default(), task: None, terminal_size: Some(TerminalSize {
rows: rows as u32,
columns: columns as u32,
}), stdin: vec![], stdin_closed: false, };
}
}
}
let stdin_closed = size == 0;
yield ExecInsideZoneRequest { zone_id: String::default(), task: None, stdin, stdin_closed, };
if stdin_closed {
break;
}
}
}
#[cfg(not(unix))]
pub async fn input_stream_exec(
initial: ExecInsideZoneRequest,
_tty: bool,
) -> impl Stream<Item = ExecInsideZoneRequest> {
let mut stdin = stdin();
stream! {
yield initial;
let mut buffer = vec![0u8; 60];
let mut stdin_closed = false;
loop {
let selected = select! {
result = stdin.read(&mut buffer) => ExecStdinSelect::DataRead(result),
};
match selected {
ExecStdinSelect::DataRead(result) => {
match result {
Ok(size) => {
let stdin = buffer[0..size].to_vec();
if size == 1 && buffer[0] == 0x1d {
break;
}
stdin_closed = size == 0;
yield ExecInsideZoneRequest { zone_id: String::default(), task: None, terminal_size: None, stdin, stdin_closed, };
},
Err(error) => {
debug!("failed to read stdin: {}", error);
break;
}
}
},
_ => {
continue;
}
}
}
}
@ -96,7 +179,7 @@ impl StdioConsoleStream {
}
pub async fn exec_output(mut stream: Streaming<ExecInsideZoneReply>, raw: bool) -> Result<i32> {
if raw && stdin().is_tty() {
if raw {
enable_raw_mode()?;
StdioConsoleStream::register_terminal_restore_hook()?;
}

View File

@ -13,7 +13,8 @@ use krata::{
idm::internal::{
exec_stream_request_update::Update, request::Request as IdmRequestType,
response::Response as IdmResponseType, ExecEnvVar, ExecStreamRequestStart,
ExecStreamRequestStdin, ExecStreamRequestUpdate, Request as IdmRequest,
ExecStreamRequestStdin, ExecStreamRequestTerminalSize, ExecStreamRequestUpdate,
Request as IdmRequest,
},
v1::control::{ExecInsideZoneReply, ExecInsideZoneRequest},
};
@ -61,6 +62,12 @@ impl ExecInsideZoneRpc {
command: task.command,
working_directory: task.working_directory,
tty: task.tty,
terminal_size: request.terminal_size.map(|size| {
ExecStreamRequestTerminalSize {
rows: size.rows,
columns: size.columns,
}
}),
})),
})),
};
@ -87,6 +94,16 @@ impl ExecInsideZoneRpc {
})),
}))}).await;
}
if let Some(ref terminal_size) = update.terminal_size {
let _ = handle.update(IdmRequest {
request: Some(IdmRequestType::ExecStream(ExecStreamRequestUpdate {
update: Some(Update::TerminalResize(ExecStreamRequestTerminalSize {
rows: terminal_size.rows,
columns: terminal_size.columns,
})),
}))}).await;
}
}
},
x = handle.receiver.recv() => match x {

View File

@ -46,6 +46,7 @@ message ExecStreamRequestStart {
repeated string command = 2;
string working_directory = 3;
bool tty = 4;
ExecStreamRequestTerminalSize terminal_size = 5;
}
message ExecStreamRequestStdin {
@ -53,10 +54,16 @@ message ExecStreamRequestStdin {
bool closed = 2;
}
message ExecStreamRequestTerminalSize {
uint32 rows = 1;
uint32 columns = 2;
}
message ExecStreamRequestUpdate {
oneof update {
ExecStreamRequestStart start = 1;
ExecStreamRequestStdin stdin = 2;
ExecStreamRequestTerminalSize terminal_resize = 3;
}
}

View File

@ -134,3 +134,8 @@ enum ZoneMetricFormat {
ZONE_METRIC_FORMAT_INTEGER = 2;
ZONE_METRIC_FORMAT_DURATION_SECONDS = 3;
}
message TerminalSize {
uint32 rows = 1;
uint32 columns = 2;
}

View File

@ -91,6 +91,7 @@ message ExecInsideZoneRequest {
krata.v1.common.ZoneTaskSpec task = 2;
bytes stdin = 3;
bool stdin_closed = 4;
krata.v1.common.TerminalSize terminal_size = 5;
}
message ExecInsideZoneReply {

View File

@ -70,7 +70,11 @@ impl ZoneExecTask {
let code: c_int;
if start.tty {
let pty = Pty::new().map_err(|error| anyhow!("unable to allocate pty: {}", error))?;
pty.resize(Size::new(24, 80))?;
let size = start
.terminal_size
.map(|x| Size::new(x.rows as u16, x.columns as u16))
.unwrap_or_else(|| Size::new(24, 80));
pty.resize(size)?;
let pts = pty
.pts()
.map_err(|error| anyhow!("unable to allocate pts: {}", error))?;
@ -130,16 +134,24 @@ impl ZoneExecTask {
continue;
};
let Some(Update::Stdin(update)) = update.update else {
continue;
};
match update.update {
Some(Update::Stdin(update)) => {
if !update.data.is_empty()
&& write.write_all(&update.data).await.is_err()
{
break;
}
if !update.data.is_empty() && write.write_all(&update.data).await.is_err() {
break;
}
if update.closed {
break;
if update.closed {
break;
}
}
Some(Update::TerminalResize(size)) => {
let _ = write.resize(Size::new(size.rows as u16, size.columns as u16));
}
_ => {
continue;
}
}
}
});