feature(exec): implement tty support (fixes #335) (#336)

This commit is contained in:
Alex Zenla
2024-08-14 12:45:59 -07:00
committed by GitHub
parent 87530edf70
commit bf3b73bf24
13 changed files with 350 additions and 166 deletions

View File

@ -22,6 +22,7 @@ nix = { workspace = true, features = ["ioctl", "process", "fs"] }
oci-spec = { workspace = true }
path-absolutize = { workspace = true }
platform-info = { workspace = true }
pty-process = { workspace = true, features = ["async"] }
rtnetlink = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }

View File

@ -1,12 +1,6 @@
use std::{collections::HashMap, process::Stdio};
use anyhow::{anyhow, Result};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
join,
process::Command,
};
use krata::idm::{
client::IdmClientStreamResponseHandle,
internal::{
@ -15,6 +9,14 @@ use krata::idm::{
},
internal::{response::Response as ResponseType, Request, Response},
};
use libc::c_int;
use pty_process::{Pty, Size};
use tokio::process::Child;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
join,
process::Command,
};
use crate::childwait::ChildWait;
@ -52,7 +54,7 @@ impl ZoneExecTask {
if !env.contains_key("PATH") {
env.insert(
"PATH".to_string(),
"/bin:/usr/bin:/usr/local/bin".to_string(),
"/bin:/usr/bin:/usr/local/bin:/sbin:/usr/sbin".to_string(),
);
}
@ -63,112 +65,196 @@ impl ZoneExecTask {
};
let mut wait_subscription = self.wait.subscribe().await?;
let mut child = Command::new(exe)
.args(cmd)
.envs(env)
.current_dir(dir)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.kill_on_drop(true)
.spawn()
.map_err(|error| anyhow!("failed to spawn: {}", error))?;
let pid = child.id().ok_or_else(|| anyhow!("pid is not provided"))?;
let mut stdin = child
.stdin
.take()
.ok_or_else(|| anyhow!("stdin was missing"))?;
let mut stdout = child
.stdout
.take()
.ok_or_else(|| anyhow!("stdout was missing"))?;
let mut stderr = child
.stderr
.take()
.ok_or_else(|| anyhow!("stderr was missing"))?;
let stdout_handle = self.handle.clone();
let stdout_task = tokio::task::spawn(async move {
let mut stdout_buffer = vec![0u8; 8 * 1024];
loop {
let Ok(size) = stdout.read(&mut stdout_buffer).await else {
break;
};
if size > 0 {
let response = Response {
response: Some(ResponseType::ExecStream(ExecStreamResponseUpdate {
exited: false,
exit_code: 0,
error: String::new(),
stdout: stdout_buffer[0..size].to_vec(),
stderr: vec![],
})),
let code: c_int;
if start.tty {
let pty = Pty::new().map_err(|error| anyhow!("unable to allocate pty: {}", error))?;
pty.resize(Size::new(24, 80))?;
let mut child = ChildDropGuard {
inner: pty_process::Command::new(exe)
.args(cmd)
.envs(env)
.current_dir(dir)
.spawn(
&pty.pts()
.map_err(|error| anyhow!("unable to allocate pts: {}", error))?,
)
.map_err(|error| anyhow!("failed to spawn: {}", error))?,
kill: true,
};
let pid = child
.inner
.id()
.ok_or_else(|| anyhow!("pid is not provided"))?;
let (mut read, mut write) = pty.into_split();
let pty_read_handle = self.handle.clone();
let pty_read_task = tokio::task::spawn(async move {
let mut stdout_buffer = vec![0u8; 8 * 1024];
loop {
let Ok(size) = read.read(&mut stdout_buffer).await else {
break;
};
let _ = stdout_handle.respond(response).await;
} else {
break;
if size > 0 {
let response = Response {
response: Some(ResponseType::ExecStream(ExecStreamResponseUpdate {
exited: false,
exit_code: 0,
error: String::new(),
stdout: stdout_buffer[0..size].to_vec(),
stderr: vec![],
})),
};
let _ = pty_read_handle.respond(response).await;
} else {
break;
}
}
}
});
});
let stderr_handle = self.handle.clone();
let stderr_task = tokio::task::spawn(async move {
let mut stderr_buffer = vec![0u8; 8 * 1024];
loop {
let Ok(size) = stderr.read(&mut stderr_buffer).await else {
break;
};
if size > 0 {
let response = Response {
response: Some(ResponseType::ExecStream(ExecStreamResponseUpdate {
exited: false,
exit_code: 0,
error: String::new(),
stdout: vec![],
stderr: stderr_buffer[0..size].to_vec(),
})),
let stdin_task = tokio::task::spawn(async move {
loop {
let Some(request) = receiver.recv().await else {
break;
};
let _ = stderr_handle.respond(response).await;
} else {
break;
let Some(RequestType::ExecStream(update)) = request.request else {
continue;
};
let Some(Update::Stdin(update)) = update.update else {
continue;
};
if !update.data.is_empty() && write.write_all(&update.data).await.is_err() {
break;
}
if update.closed {
break;
}
}
}
});
});
let stdin_task = tokio::task::spawn(async move {
loop {
let Some(request) = receiver.recv().await else {
break;
};
let Some(RequestType::ExecStream(update)) = request.request else {
continue;
};
let Some(Update::Stdin(update)) = update.update else {
continue;
};
if stdin.write_all(&update.data).await.is_err() {
break;
code = loop {
if let Ok(event) = wait_subscription.recv().await {
if event.pid.as_raw() as u32 == pid {
break event.status;
}
}
}
});
};
let data_task = tokio::task::spawn(async move {
let _ = join!(stdout_task, stderr_task);
child.kill = false;
let _ = join!(pty_read_task);
stdin_task.abort();
});
} else {
let mut child = Command::new(exe)
.args(cmd)
.envs(env)
.current_dir(dir)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.kill_on_drop(true)
.spawn()
.map_err(|error| anyhow!("failed to spawn: {}", error))?;
let code = loop {
if let Ok(event) = wait_subscription.recv().await {
if event.pid.as_raw() as u32 == pid {
break event.status;
let pid = child.id().ok_or_else(|| anyhow!("pid is not provided"))?;
let mut stdin = child
.stdin
.take()
.ok_or_else(|| anyhow!("stdin was missing"))?;
let mut stdout = child
.stdout
.take()
.ok_or_else(|| anyhow!("stdout was missing"))?;
let mut stderr = child
.stderr
.take()
.ok_or_else(|| anyhow!("stderr was missing"))?;
let stdout_handle = self.handle.clone();
let stdout_task = tokio::task::spawn(async move {
let mut stdout_buffer = vec![0u8; 8 * 1024];
loop {
let Ok(size) = stdout.read(&mut stdout_buffer).await else {
break;
};
if size > 0 {
let response = Response {
response: Some(ResponseType::ExecStream(ExecStreamResponseUpdate {
exited: false,
exit_code: 0,
error: String::new(),
stdout: stdout_buffer[0..size].to_vec(),
stderr: vec![],
})),
};
let _ = stdout_handle.respond(response).await;
} else {
break;
}
}
}
};
data_task.await?;
});
let stderr_handle = self.handle.clone();
let stderr_task = tokio::task::spawn(async move {
let mut stderr_buffer = vec![0u8; 8 * 1024];
loop {
let Ok(size) = stderr.read(&mut stderr_buffer).await else {
break;
};
if size > 0 {
let response = Response {
response: Some(ResponseType::ExecStream(ExecStreamResponseUpdate {
exited: false,
exit_code: 0,
error: String::new(),
stdout: vec![],
stderr: stderr_buffer[0..size].to_vec(),
})),
};
let _ = stderr_handle.respond(response).await;
} else {
break;
}
}
});
let stdin_task = tokio::task::spawn(async move {
loop {
let Some(request) = receiver.recv().await else {
break;
};
let Some(RequestType::ExecStream(update)) = request.request else {
continue;
};
let Some(Update::Stdin(update)) = update.update else {
continue;
};
if stdin.write_all(&update.data).await.is_err() {
break;
}
}
});
let data_task = tokio::task::spawn(async move {
let _ = join!(stdout_task, stderr_task);
stdin_task.abort();
});
code = loop {
if let Ok(event) = wait_subscription.recv().await {
if event.pid.as_raw() as u32 == pid {
break event.status;
}
}
};
data_task.await?;
}
let response = Response {
response: Some(ResponseType::ExecStream(ExecStreamResponseUpdate {
exited: true,
@ -183,3 +269,16 @@ impl ZoneExecTask {
Ok(())
}
}
struct ChildDropGuard {
pub inner: Child,
pub kill: bool,
}
impl Drop for ChildDropGuard {
fn drop(&mut self) {
if self.kill {
drop(self.inner.start_kill());
}
}
}

View File

@ -2,7 +2,7 @@ use std::{ops::Add, path::Path};
use anyhow::Result;
use krata::idm::internal::{MetricFormat, MetricNode};
use sysinfo::Process;
use sysinfo::{Process, ProcessesToUpdate};
pub struct MetricsCollector {}
@ -38,7 +38,7 @@ impl MetricsCollector {
}
fn collect_processes(&self, sysinfo: &mut sysinfo::System) -> Result<MetricNode> {
sysinfo.refresh_processes();
sysinfo.refresh_processes(ProcessesToUpdate::All);
let mut processes = Vec::new();
let mut sysinfo_processes = sysinfo.processes().values().collect::<Vec<_>>();
sysinfo_processes.sort_by_key(|x| x.pid());
@ -70,7 +70,11 @@ impl MetricsCollector {
metrics.push(MetricNode::raw_value("cwd", working_directory));
}
let cmdline = process.cmd().to_vec();
let cmdline = process
.cmd()
.iter()
.map(|x| x.to_string_lossy().to_string())
.collect::<Vec<_>>();
metrics.push(MetricNode::raw_value("cmdline", cmdline));
metrics.push(MetricNode::structural(
"memory",