WIP: feat(zone): drop Command in favour of posix_spawn

This change introduces custom process spawning logic around
libc::posix_spawn/p, as well as a custom set of stdio wrappers using the
Tokio AsyncRead/AsyncWrite traits.

Currently this change is broken, stdio seeming to hang.
This commit is contained in:
Khionu Sybiern 2024-08-08 15:03:24 -07:00
parent 6bf3741ec9
commit 089461e545
No known key found for this signature in database
10 changed files with 573 additions and 81 deletions

1
Cargo.lock generated
View File

@ -1594,6 +1594,7 @@ dependencies = [
"nix 0.29.0",
"oci-spec",
"path-absolutize",
"pin-project-lite",
"platform-info",
"rtnetlink",
"serde",

View File

@ -21,6 +21,7 @@ log = { workspace = true }
nix = { workspace = true, features = ["ioctl", "process", "fs"] }
oci-spec = { workspace = true }
path-absolutize = { workspace = true }
pin-project-lite = { workspace = true }
platform-info = { workspace = true }
rtnetlink = { workspace = true }
serde = { workspace = true }

View File

@ -1,11 +1,11 @@
use crate::{
childwait::{ChildEvent, ChildWait},
death,
exec::ZoneExecTask,
metrics::MetricsCollector,
};
use anyhow::Result;
use log::debug;
use cgroups_rs::Cgroup;
use libc::pid_t;
use tokio::sync::broadcast::Receiver;
use tokio::{select, sync::broadcast};
use krata::idm::{
client::{IdmClientStreamResponseHandle, IdmInternalClient},
internal::{
@ -14,21 +14,24 @@ use krata::idm::{
MetricsResponse, PingResponse, Request, Response,
},
};
use log::debug;
use nix::unistd::Pid;
use tokio::sync::broadcast::Receiver;
use tokio::{select, sync::broadcast};
use crate::{
childwait::{ChildEvent, ChildWait},
death,
exec::ZoneExecTask,
metrics::MetricsCollector,
};
pub struct ZoneBackground {
idm: IdmInternalClient,
child: Pid,
child: pid_t,
_cgroup: Cgroup,
wait: ChildWait,
child_receiver: Receiver<ChildEvent>,
}
impl ZoneBackground {
pub async fn new(idm: IdmInternalClient, cgroup: Cgroup, child: Pid) -> Result<ZoneBackground> {
pub async fn new(idm: IdmInternalClient, cgroup: Cgroup, child: pid_t) -> Result<ZoneBackground> {
let (wait, child_receiver) = ChildWait::new()?;
Ok(ZoneBackground {
idm,

View File

@ -1,7 +1,3 @@
use anyhow::Result;
use libc::{c_int, waitpid, WEXITSTATUS, WIFEXITED};
use log::warn;
use nix::unistd::Pid;
use std::thread::sleep;
use std::time::Duration;
use std::{
@ -12,13 +8,18 @@ use std::{
},
thread::{self, JoinHandle},
};
use anyhow::Result;
use log::warn;
use libc::{c_int, pid_t, waitpid, WEXITSTATUS, WIFEXITED};
use tokio::sync::broadcast::{channel, Receiver, Sender};
const CHILD_WAIT_QUEUE_LEN: usize = 10;
#[derive(Clone, Copy, Debug)]
pub struct ChildEvent {
pub pid: Pid,
pub pid: pid_t,
pub status: c_int,
}
@ -75,10 +76,8 @@ impl ChildWaitTask {
continue;
}
if WIFEXITED(status) {
let event = ChildEvent {
pid: Pid::from_raw(pid),
status: WEXITSTATUS(status),
};
let status = WEXITSTATUS(status);
let event = ChildEvent { pid, status };
let _ = self.sender.send(event);
if self.signal.load(Ordering::Acquire) {

View File

@ -1,10 +1,13 @@
use std::{collections::HashMap, process::Stdio};
use std::{
collections::HashMap,
ffi::CString,
path::PathBuf,
};
use anyhow::{anyhow, Result};
use anyhow::{anyhow, Context, Result};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
join,
process::Command,
};
use krata::idm::{
@ -16,7 +19,10 @@ use krata::idm::{
internal::{response::Response as ResponseType, Request, Response},
};
use crate::childwait::ChildWait;
use crate::{
childwait::ChildWait,
spawn::child::ChildSpec,
};
pub struct ZoneExecTask {
pub wait: ChildWait,
@ -39,11 +45,14 @@ impl ZoneExecTask {
return Err(anyhow!("first request did not contain a start update"));
};
let mut cmd = start.command.clone();
let cmd = start.command.clone();
if cmd.is_empty() {
return Err(anyhow!("command line was empty"));
}
let exe = cmd.remove(0);
let exe: PathBuf = cmd[0].clone().into();
let cmd = cmd.into_iter().map(CString::new).collect::<Result<Vec<CString>, _>>()?;
let mut env = HashMap::new();
for entry in &start.environment {
env.insert(entry.key.clone(), entry.value.clone());
@ -56,37 +65,29 @@ impl ZoneExecTask {
);
}
let dir = if start.working_directory.is_empty() {
let working_dir = if start.working_directory.is_empty() {
"/".to_string()
} else {
start.working_directory.clone()
};
let mut wait_subscription = self.wait.subscribe().await?;
let mut child = Command::new(exe)
.args(cmd)
.envs(env)
.current_dir(dir)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.kill_on_drop(true)
.spawn()
.map_err(|error| anyhow!("failed to spawn: {}", error))?;
let wait_rx = self.wait.subscribe().await?;
let pid = child.id().ok_or_else(|| anyhow!("pid is not provided"))?;
let mut stdin = child
.stdin
.take()
.ok_or_else(|| anyhow!("stdin was missing"))?;
let mut stdout = child
.stdout
.take()
.ok_or_else(|| anyhow!("stdout was missing"))?;
let mut stderr = child
.stderr
.take()
.ok_or_else(|| anyhow!("stderr was missing"))?;
let spec = ChildSpec {
exe: PathBuf::from(exe),
cmd,
env,
tty: false,
cgroup: None,
working_dir,
with_new_session: false,
};
let mut child = spec.spawn(wait_rx).context("failed to spawn")?;
let mut stdin = child.stdin.take().context("stdin was missing")?;
let mut stdout = child.stdout.take().context("stdout was missing")?;
let mut stderr = child.stderr.take().context("stderr was missing")?;
let stdout_handle = self.handle.clone();
let stdout_task = tokio::task::spawn(async move {
@ -161,18 +162,12 @@ impl ZoneExecTask {
stdin_task.abort();
});
let code = loop {
if let Ok(event) = wait_subscription.recv().await {
if event.pid.as_raw() as u32 == pid {
break event.status;
}
}
};
let exit_code = child.wait().await?;
data_task.await?;
let response = Response {
response: Some(ResponseType::ExecStream(ExecStreamResponseUpdate {
exited: true,
exit_code: code,
exit_code,
error: String::new(),
stdout: vec![],
stderr: vec![],

View File

@ -1,30 +1,33 @@
use std::{
collections::HashMap,
ffi::CString,
fs::{File, OpenOptions, Permissions},
io,
net::{Ipv4Addr, Ipv6Addr},
os::fd::AsRawFd,
os::unix::{ffi::OsStrExt, fs::{chroot, PermissionsExt}},
path::{Path, PathBuf},
str::FromStr,
};
use anyhow::{anyhow, Result};
use log::{trace, warn};
use cgroups_rs::{Cgroup, CgroupPid};
use futures::stream::TryStreamExt;
use libc::{pid_t, sethostname, setsid, TIOCSCTTY};
use nix::{ioctl_write_int_bad, unistd::{dup2, execve, fork, ForkResult}};
use oci_spec::image::{Config, ImageConfiguration};
use path_absolutize::Absolutize;
use platform_info::{PlatformInfo, PlatformInfoAPI, UNameAPI};
use sys_mount::{FilesystemType, Mount, MountFlags};
use tokio::fs;
use ipnetwork::IpNetwork;
use krata::ethtool::EthtoolHandle;
use krata::idm::client::IdmInternalClient;
use krata::idm::internal::INTERNAL_IDM_CHANNEL;
use krata::launchcfg::{LaunchInfo, LaunchNetwork, LaunchPackedFormat};
use libc::{sethostname, setsid, TIOCSCTTY};
use log::{trace, warn};
use nix::ioctl_write_int_bad;
use nix::unistd::{dup2, execve, fork, ForkResult, Pid};
use oci_spec::image::{Config, ImageConfiguration};
use path_absolutize::Absolutize;
use platform_info::{PlatformInfo, PlatformInfoAPI, UNameAPI};
use std::collections::HashMap;
use std::ffi::CString;
use std::fs::{File, OpenOptions, Permissions};
use std::io;
use std::net::{Ipv4Addr, Ipv6Addr};
use std::os::fd::AsRawFd;
use std::os::unix::ffi::OsStrExt;
use std::os::unix::fs::{chroot, PermissionsExt};
use std::path::{Path, PathBuf};
use std::str::FromStr;
use sys_mount::{FilesystemType, Mount, MountFlags};
use tokio::fs;
use crate::background::ZoneBackground;
@ -606,7 +609,7 @@ impl ZoneInit {
env: Vec<CString>,
) -> Result<()> {
match unsafe { fork()? } {
ForkResult::Parent { child } => self.background(idm, cgroup, child).await,
ForkResult::Parent { child } => self.background(idm, cgroup, child.as_raw()).await,
ForkResult::Child => self.foreground(cgroup, working_dir, path, cmd, env).await,
}
}
@ -638,7 +641,7 @@ impl ZoneInit {
&mut self,
idm: IdmInternalClient,
cgroup: Cgroup,
executed: Pid,
executed: pid_t,
) -> Result<()> {
let mut background = ZoneBackground::new(idm, cgroup, executed).await?;
background.run().await?;

View File

@ -9,6 +9,7 @@ pub mod childwait;
pub mod exec;
pub mod init;
pub mod metrics;
pub mod spawn;
pub async fn death(code: c_int) -> Result<()> {
let store = XsdClient::open().await?;

View File

@ -0,0 +1,183 @@
use std::{
collections::HashMap,
ffi::CString,
io,
mem::MaybeUninit,
path::PathBuf,
ptr::addr_of_mut,
};
use anyhow::{bail, Context, Result};
use log::{debug, error};
use cgroups_rs::{Cgroup, CgroupPid};
use tokio::sync::broadcast;
use crate::childwait::ChildEvent;
use super::stdio::{StdioSet, Stderr, Stdin, Stdout};
pub struct Child {
pub stdin: Option<Stdin>,
pub stdout: Option<Stdout>,
pub stderr: Option<Stderr>,
pid: libc::pid_t,
reaper_rx: broadcast::Receiver<ChildEvent>,
}
// TODO: impl From<OciImage>
/// Command used to spawn a child process
#[derive(Debug, Clone)]
pub struct ChildSpec {
/// The executable, with or without path, path relative
/// or absolute, to run.
pub exe: PathBuf,
/// The args to pass, as POSIX specifies
pub cmd: Vec<CString>,
/// Env vars to be set
pub env: HashMap<String, String>,
/// Working directory to set just before spawning
pub working_dir: String,
/// Cgroup we'll use for the child
pub cgroup: Option<Cgroup>,
/// Whether to create the child in a new session
/// This is mainly for image entrypoint
pub with_new_session: bool,
/// Whether to use tty
pub tty: bool,
}
impl Child {
pub fn pid(&self) -> libc::pid_t { self.pid }
pub async fn wait(mut self) -> Result<libc::c_int> {
debug!("waiting on process {}", self.pid);
loop {
let Ok(e) = self.reaper_rx.recv().await
else { bail!("dead reaper - ironic"); };
if e.pid == self.pid {
return Ok(e.status);
}
}
}
}
impl ChildSpec {
pub fn spawn(self, reaper_rx: broadcast::Receiver<ChildEvent>) -> Result<Child> {
let Self {
exe,
cmd,
env,
working_dir,
cgroup,
with_new_session,
tty,
..
} = self;
let mut stdio = if tty {
StdioSet::new_pty().context("failed to spawn pty")?
} else {
StdioSet::new_pipes().context("failed to alloc pipes")?
};
let mut file_actions: libc::posix_spawn_file_actions_t = unsafe {
let mut fa = MaybeUninit::uninit();
libc::posix_spawn_file_actions_init(fa.as_mut_ptr());
fa.assume_init()
};
stdio.add_to_spawn_file_actions(&mut file_actions)?;
let spawnattr: libc::posix_spawnattr_t = unsafe {
let mut spawnattr = MaybeUninit::uninit();
libc::posix_spawnattr_init(spawnattr.as_mut_ptr());
// SAFETY: Both flags use 8 bits or less
#[allow(overflowing_literals)]
let mut flags = 0;
// If we start a new session, spawn will create a new pgroup, too
if with_new_session {
flags |= libc::POSIX_SPAWN_SETSID as i16;
} else {
flags |= libc::POSIX_SPAWN_SETPGROUP as i16;
}
match libc::posix_spawnattr_setflags(spawnattr.as_mut_ptr(), flags) {
x if x > 0 => {
error!("error on posix_spawnattr_setflags - res {x}");
return Err(io::Error::last_os_error().into());
},
_ => {}
};
spawnattr.assume_init()
};
let old_working_dir = std::env::current_dir().context("failed to retriev CWD")?;
std::env::set_current_dir(working_dir).context("failed to change CWD")?;
let mut pid: libc::pid_t = 0;
let spawn = if exe.is_relative() {
debug!("relying on libc to do executable lookup");
libc::posix_spawnp
} else {
debug!("absolute command path found");
libc::posix_spawn
};
// SAFETY: We're using the raw underlying value, then rewrapping it for Drop
let res = unsafe {
let exe = CString::new(exe.as_os_str().as_encoded_bytes())?;
let cmd = cmd.into_iter()
.map(CString::into_raw)
.chain(Some(std::ptr::null_mut()))
.collect::<Vec<*mut i8>>();
let env = env.iter()
.map(|(key, value)| CString::new(format!("{}={}", key, value)).context("null byte in env vars"))
.collect::<Result<Vec<CString>>>()?;
let env = env.into_iter()
.map(CString::into_raw)
.chain(Some(std::ptr::null_mut()))
.collect::<Vec<*mut i8>>();
// TODO: Safety comment
let res = spawn(
addr_of_mut!(pid),
exe.as_ptr(),
&file_actions,
&spawnattr,
cmd.as_slice().as_ptr(),
env.as_slice().as_ptr(),
);
let _ = cmd.into_iter().map(|a| CString::from_raw(a));
let _ = env.into_iter().map(|e| CString::from_raw(e));
res
};
std::env::set_current_dir(old_working_dir).context("failed to restore previous CWD")?;
if res != 0 {
error!("Failed to spawn process: return value of {res}");
return Err(io::Error::last_os_error().into());
}
if let Some(cg) = cgroup {
cg.add_task(CgroupPid::from(pid as u64)).context("failed to add child to cgroup")?;
}
let (stdin, stdout, stderr) = stdio.get_parent_side()?;
Ok(Child {
pid,
reaper_rx,
stdin: Some(stdin),
stdout: Some(stdout),
stderr: Some(stderr),
})
}
}

View File

@ -0,0 +1,2 @@
pub mod child;
pub mod stdio;

View File

@ -0,0 +1,304 @@
use std::{
io,
os::fd::{AsRawFd, IntoRawFd, RawFd},
pin::Pin,
task::{Context, Poll, ready},
};
use anyhow::{bail, Context as _, Result};
use pin_project_lite::pin_project;
use tokio::io::{
AsyncRead, AsyncWrite, Interest, ReadBuf,
unix::AsyncFd,
};
type SpawnFileActions = libc::posix_spawn_file_actions_t;
pub struct StdioSet {
parent: Option<StdioSubset>,
child: Option<StdioSubset>,
}
pub struct StdioSubset {
stdin: Stdio,
stdout: Stdio,
stderr: Stdio,
}
pin_project! {
pub struct Stdin {
#[pin] inner: Stdio
}
}
pin_project! {
pub struct Stdout {
#[pin] inner: Stdio
}
}
pin_project! {
pub struct Stderr {
#[pin] inner: Stdio
}
}
struct Stdio(RawFd);
impl StdioSet {
pub fn add_to_spawn_file_actions(&mut self, attr: &mut SpawnFileActions) -> Result<()> {
let Some(stdio) = self.child.take() else { bail!("already used child-side fd's") };
let res_in = unsafe {
libc::posix_spawn_file_actions_adddup2(
attr, stdio.stdin.0, libc::STDIN_FILENO
)
};
let res_out = unsafe {
libc::posix_spawn_file_actions_adddup2(
attr, stdio.stdout.0, libc::STDOUT_FILENO
)
};
let res_err = unsafe {
libc::posix_spawn_file_actions_adddup2(
attr, stdio.stderr.0, libc::STDERR_FILENO
)
};
// It is highly unlikely that they will fail from different errors, and
// even if they did, they're all fatal and need to be addressed by the
// user deploying.
match (res_in, res_out, res_err) {
(0, 0, 0) => Ok(()),
_ => Err(std::io::Error::last_os_error().into()),
}
}
pub fn get_parent_side(&mut self) -> Result<(Stdin, Stdout, Stderr)> {
let StdioSubset { stdin, stdout, stderr }
= self.parent.take().context("stdio handles already taken")?;
Ok((Stdin { inner: stdin }, Stdout { inner: stdout }, Stderr { inner: stderr }))
}
pub fn new_pty() -> Result<Self> {
use nix::{fcntl::{self, FcntlArg, OFlag}, pty};
// Open the Pseudoterminal with +rw capabilities and without
// setting it as our controlling terminal
let pty = pty::posix_openpt(OFlag::O_RDWR | OFlag::O_NOCTTY)?;
// Grant access to the side we pass to the child
// This is referred to as the "slave"
pty::grantpt(&pty)?;
// Unlock the "slave" device
pty::unlockpt(&pty)?;
// Retrieve the "slave" device
let pts = {
let name = pty::ptsname_r(&pty)?;
std::fs::OpenOptions::new()
.read(true)
.write(true)
.open(name)?
.into_raw_fd()
};
// Get the RawFd out of the OwnedFd because OwnedFd
// sets CLOEXEC on clone
let pty = pty.as_raw_fd();
// Make the "master" async-ready by setting NONBLOCK
let mut opts = OFlag::from_bits(fcntl::fcntl(pty, FcntlArg::F_GETFL)?)
.expect("got bad O_FLAG bits from kernel");
opts |= OFlag::O_NONBLOCK;
fcntl::fcntl(pty, FcntlArg::F_SETFL(opts))?;
Ok(Self {
child: Some(StdioSubset {
stdin: Stdio(pts.clone()),
stdout: Stdio(pts.clone()),
stderr: Stdio(pts),
}),
parent: Some(StdioSubset {
stdin: Stdio(pty.clone()),
stdout: Stdio(pty.clone()),
stderr: Stdio(pty),
}),
})
}
pub fn new_pipes() -> Result<Self> {
let (stdin_child, stdin_parent) = make_pipe()?;
let (stdout_parent, stdout_child) = make_pipe()?;
let (stderr_parent, stderr_child) = make_pipe()?;
Ok(Self {
parent: Some(StdioSubset {
stdin: Stdio(stdin_parent),
stdout: Stdio(stdout_parent),
stderr: Stdio(stderr_parent),
}),
child: Some(StdioSubset {
stdin: Stdio(stdin_child),
stdout: Stdio(stdout_child),
stderr: Stdio(stderr_child),
}),
})
}
}
impl AsyncRead for Stdio {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>
) -> Poll<std::io::Result<()>> {
// SAFETY: if this fails, we have a bug in our pty/pipe allocations
let fd = AsyncFd::with_interest(self.0, Interest::READABLE)
.expect("async io failure");
loop {
let mut guard = ready!(fd.poll_read_ready(cx))?;
let count = buf.remaining();
let res = guard.try_io(|i| match unsafe {
let buf_ptr = buf.initialize_unfilled().as_mut_ptr().cast();
libc::read(i.as_raw_fd(), buf_ptr, count)
} {
-1 => Err(std::io::Error::last_os_error()),
// SAFETY: write returns -1..=isize::MAX, and
// we've already ruled out -1, so this will be
// a valid usize.
n => { buf.advance(n.try_into().unwrap()); Ok(()) }
});
if let Ok(r) = res {
// Err will ever only be WouldBlock, so we allow
// the loop to try again. `r` is the inner Result
// of try_io
return Poll::Ready(r);
}
}
}
}
impl AsyncWrite for Stdio {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8]
) -> Poll<io::Result<usize>> {
// SAFETY: if this fails, we have a bug in our pty/pipe allocations
let fd = AsyncFd::with_interest(self.0, Interest::WRITABLE)
.expect("async io failure");
loop {
let mut guard = ready!(fd.poll_write_ready(cx))?;
let res = guard.try_io(|i| match unsafe {
libc::write(i.as_raw_fd(), buf.as_ptr().cast(), buf.len())
} {
-1 => Err(io::Error::last_os_error()),
// SAFETY: write returns -1..=isize::MAX, and
// we've already ruled out -1, so this will be
// a valid usize.
n => Ok(n.try_into().unwrap()),
});
if let Ok(r) = res {
// Err will ever only be WouldBlock, so we allow
// the loop to try again. `r` is the inner Result
// of try_io
return Poll::Ready(r);
}
}
}
fn poll_flush(
self: Pin<&mut Self>,
_cx: &mut Context<'_>
) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(
self: Pin<&mut Self>,
_cx: &mut Context<'_>
) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}
impl AsyncWrite for Stdin {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8]
) -> Poll<io::Result<usize>> {
self.project().inner.poll_write(cx, buf)
}
fn poll_flush(
self: Pin<&mut Self>,
_cx: &mut Context<'_>
) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(
self: Pin<&mut Self>,
_cx: &mut Context<'_>
) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}
impl AsyncRead for Stdout {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>
) -> Poll<io::Result<()>> {
self.project().inner.poll_read(cx, buf)
}
}
impl AsyncRead for Stderr {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>
) -> Poll<io::Result<()>> {
self.project().inner.poll_read(cx, buf)
}
}
fn make_pipe() -> Result<(RawFd, RawFd)> {
// Init two null file descriptors
// [read, write]
let mut raw_fds: [RawFd; 2] = [0, 0];
// Allocate the pipe and get each end of, setting as non-blocking
let res = unsafe { libc::pipe(raw_fds.as_mut_ptr().cast()) };
if res == -1 { return Err(io::Error::last_os_error().into()); }
// We split the pipe into its ends so we can be explicit
// which end is which.
let [read, write] = raw_fds;
// Wipe the flags, because CLOEXEC is on by default
let flags = libc::O_NONBLOCK;
f_setfl(read, flags)?;
f_setfl(write, flags)?;
Ok((read, write))
}
fn f_setfl(fd: RawFd, flags: libc::c_int) -> Result<()> {
let res = unsafe { libc::fcntl(fd, libc::F_SETFL, flags) };
if res == -1 { return Err(io::Error::last_os_error().into()); }
Ok(())
}