From 089461e5454857937c3211009322e0c6245720f8 Mon Sep 17 00:00:00 2001 From: Khionu Sybiern Date: Thu, 8 Aug 2024 15:03:24 -0700 Subject: [PATCH] WIP: feat(zone): drop Command in favour of posix_spawn This change introduces custom process spawning logic around libc::posix_spawn/p, as well as a custom set of stdio wrappers using the Tokio AsyncRead/AsyncWrite traits. Currently this change is broken, stdio seeming to hang. --- Cargo.lock | 1 + crates/zone/Cargo.toml | 1 + crates/zone/src/background.rs | 27 +-- crates/zone/src/childwait.rs | 17 +- crates/zone/src/exec.rs | 73 ++++---- crates/zone/src/init.rs | 45 ++--- crates/zone/src/lib.rs | 1 + crates/zone/src/spawn/child.rs | 183 ++++++++++++++++++++ crates/zone/src/spawn/mod.rs | 2 + crates/zone/src/spawn/stdio.rs | 304 +++++++++++++++++++++++++++++++++ 10 files changed, 573 insertions(+), 81 deletions(-) create mode 100644 crates/zone/src/spawn/child.rs create mode 100644 crates/zone/src/spawn/mod.rs create mode 100644 crates/zone/src/spawn/stdio.rs diff --git a/Cargo.lock b/Cargo.lock index 9011436..c2ad6a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1594,6 +1594,7 @@ dependencies = [ "nix 0.29.0", "oci-spec", "path-absolutize", + "pin-project-lite", "platform-info", "rtnetlink", "serde", diff --git a/crates/zone/Cargo.toml b/crates/zone/Cargo.toml index 074754b..06bac7d 100644 --- a/crates/zone/Cargo.toml +++ b/crates/zone/Cargo.toml @@ -21,6 +21,7 @@ log = { workspace = true } nix = { workspace = true, features = ["ioctl", "process", "fs"] } oci-spec = { workspace = true } path-absolutize = { workspace = true } +pin-project-lite = { workspace = true } platform-info = { workspace = true } rtnetlink = { workspace = true } serde = { workspace = true } diff --git a/crates/zone/src/background.rs b/crates/zone/src/background.rs index ea103bd..a7ce881 100644 --- a/crates/zone/src/background.rs +++ b/crates/zone/src/background.rs @@ -1,11 +1,11 @@ -use crate::{ - childwait::{ChildEvent, ChildWait}, - death, - exec::ZoneExecTask, - metrics::MetricsCollector, -}; use anyhow::Result; +use log::debug; + use cgroups_rs::Cgroup; +use libc::pid_t; +use tokio::sync::broadcast::Receiver; +use tokio::{select, sync::broadcast}; + use krata::idm::{ client::{IdmClientStreamResponseHandle, IdmInternalClient}, internal::{ @@ -14,21 +14,24 @@ use krata::idm::{ MetricsResponse, PingResponse, Request, Response, }, }; -use log::debug; -use nix::unistd::Pid; -use tokio::sync::broadcast::Receiver; -use tokio::{select, sync::broadcast}; + +use crate::{ + childwait::{ChildEvent, ChildWait}, + death, + exec::ZoneExecTask, + metrics::MetricsCollector, +}; pub struct ZoneBackground { idm: IdmInternalClient, - child: Pid, + child: pid_t, _cgroup: Cgroup, wait: ChildWait, child_receiver: Receiver, } impl ZoneBackground { - pub async fn new(idm: IdmInternalClient, cgroup: Cgroup, child: Pid) -> Result { + pub async fn new(idm: IdmInternalClient, cgroup: Cgroup, child: pid_t) -> Result { let (wait, child_receiver) = ChildWait::new()?; Ok(ZoneBackground { idm, diff --git a/crates/zone/src/childwait.rs b/crates/zone/src/childwait.rs index 6efe82f..5c2a71e 100644 --- a/crates/zone/src/childwait.rs +++ b/crates/zone/src/childwait.rs @@ -1,7 +1,3 @@ -use anyhow::Result; -use libc::{c_int, waitpid, WEXITSTATUS, WIFEXITED}; -use log::warn; -use nix::unistd::Pid; use std::thread::sleep; use std::time::Duration; use std::{ @@ -12,13 +8,18 @@ use std::{ }, thread::{self, JoinHandle}, }; + +use anyhow::Result; +use log::warn; + +use libc::{c_int, pid_t, waitpid, WEXITSTATUS, WIFEXITED}; use tokio::sync::broadcast::{channel, Receiver, Sender}; const CHILD_WAIT_QUEUE_LEN: usize = 10; #[derive(Clone, Copy, Debug)] pub struct ChildEvent { - pub pid: Pid, + pub pid: pid_t, pub status: c_int, } @@ -75,10 +76,8 @@ impl ChildWaitTask { continue; } if WIFEXITED(status) { - let event = ChildEvent { - pid: Pid::from_raw(pid), - status: WEXITSTATUS(status), - }; + let status = WEXITSTATUS(status); + let event = ChildEvent { pid, status }; let _ = self.sender.send(event); if self.signal.load(Ordering::Acquire) { diff --git a/crates/zone/src/exec.rs b/crates/zone/src/exec.rs index 941fc14..13da6ec 100644 --- a/crates/zone/src/exec.rs +++ b/crates/zone/src/exec.rs @@ -1,10 +1,13 @@ -use std::{collections::HashMap, process::Stdio}; +use std::{ + collections::HashMap, + ffi::CString, + path::PathBuf, +}; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Context, Result}; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, join, - process::Command, }; use krata::idm::{ @@ -16,7 +19,10 @@ use krata::idm::{ internal::{response::Response as ResponseType, Request, Response}, }; -use crate::childwait::ChildWait; +use crate::{ + childwait::ChildWait, + spawn::child::ChildSpec, +}; pub struct ZoneExecTask { pub wait: ChildWait, @@ -39,11 +45,14 @@ impl ZoneExecTask { return Err(anyhow!("first request did not contain a start update")); }; - let mut cmd = start.command.clone(); + let cmd = start.command.clone(); if cmd.is_empty() { return Err(anyhow!("command line was empty")); } - let exe = cmd.remove(0); + + let exe: PathBuf = cmd[0].clone().into(); + let cmd = cmd.into_iter().map(CString::new).collect::, _>>()?; + let mut env = HashMap::new(); for entry in &start.environment { env.insert(entry.key.clone(), entry.value.clone()); @@ -56,37 +65,29 @@ impl ZoneExecTask { ); } - let dir = if start.working_directory.is_empty() { + let working_dir = if start.working_directory.is_empty() { "/".to_string() } else { start.working_directory.clone() }; - let mut wait_subscription = self.wait.subscribe().await?; - let mut child = Command::new(exe) - .args(cmd) - .envs(env) - .current_dir(dir) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .kill_on_drop(true) - .spawn() - .map_err(|error| anyhow!("failed to spawn: {}", error))?; + let wait_rx = self.wait.subscribe().await?; - let pid = child.id().ok_or_else(|| anyhow!("pid is not provided"))?; - let mut stdin = child - .stdin - .take() - .ok_or_else(|| anyhow!("stdin was missing"))?; - let mut stdout = child - .stdout - .take() - .ok_or_else(|| anyhow!("stdout was missing"))?; - let mut stderr = child - .stderr - .take() - .ok_or_else(|| anyhow!("stderr was missing"))?; + let spec = ChildSpec { + exe: PathBuf::from(exe), + cmd, + env, + tty: false, + cgroup: None, + working_dir, + with_new_session: false, + }; + + let mut child = spec.spawn(wait_rx).context("failed to spawn")?; + + let mut stdin = child.stdin.take().context("stdin was missing")?; + let mut stdout = child.stdout.take().context("stdout was missing")?; + let mut stderr = child.stderr.take().context("stderr was missing")?; let stdout_handle = self.handle.clone(); let stdout_task = tokio::task::spawn(async move { @@ -161,18 +162,12 @@ impl ZoneExecTask { stdin_task.abort(); }); - let code = loop { - if let Ok(event) = wait_subscription.recv().await { - if event.pid.as_raw() as u32 == pid { - break event.status; - } - } - }; + let exit_code = child.wait().await?; data_task.await?; let response = Response { response: Some(ResponseType::ExecStream(ExecStreamResponseUpdate { exited: true, - exit_code: code, + exit_code, error: String::new(), stdout: vec![], stderr: vec![], diff --git a/crates/zone/src/init.rs b/crates/zone/src/init.rs index 897ebb5..bac7152 100644 --- a/crates/zone/src/init.rs +++ b/crates/zone/src/init.rs @@ -1,30 +1,33 @@ +use std::{ + collections::HashMap, + ffi::CString, + fs::{File, OpenOptions, Permissions}, + io, + net::{Ipv4Addr, Ipv6Addr}, + os::fd::AsRawFd, + os::unix::{ffi::OsStrExt, fs::{chroot, PermissionsExt}}, + path::{Path, PathBuf}, + str::FromStr, +}; + use anyhow::{anyhow, Result}; +use log::{trace, warn}; + use cgroups_rs::{Cgroup, CgroupPid}; use futures::stream::TryStreamExt; +use libc::{pid_t, sethostname, setsid, TIOCSCTTY}; +use nix::{ioctl_write_int_bad, unistd::{dup2, execve, fork, ForkResult}}; +use oci_spec::image::{Config, ImageConfiguration}; +use path_absolutize::Absolutize; +use platform_info::{PlatformInfo, PlatformInfoAPI, UNameAPI}; +use sys_mount::{FilesystemType, Mount, MountFlags}; +use tokio::fs; + use ipnetwork::IpNetwork; use krata::ethtool::EthtoolHandle; use krata::idm::client::IdmInternalClient; use krata::idm::internal::INTERNAL_IDM_CHANNEL; use krata::launchcfg::{LaunchInfo, LaunchNetwork, LaunchPackedFormat}; -use libc::{sethostname, setsid, TIOCSCTTY}; -use log::{trace, warn}; -use nix::ioctl_write_int_bad; -use nix::unistd::{dup2, execve, fork, ForkResult, Pid}; -use oci_spec::image::{Config, ImageConfiguration}; -use path_absolutize::Absolutize; -use platform_info::{PlatformInfo, PlatformInfoAPI, UNameAPI}; -use std::collections::HashMap; -use std::ffi::CString; -use std::fs::{File, OpenOptions, Permissions}; -use std::io; -use std::net::{Ipv4Addr, Ipv6Addr}; -use std::os::fd::AsRawFd; -use std::os::unix::ffi::OsStrExt; -use std::os::unix::fs::{chroot, PermissionsExt}; -use std::path::{Path, PathBuf}; -use std::str::FromStr; -use sys_mount::{FilesystemType, Mount, MountFlags}; -use tokio::fs; use crate::background::ZoneBackground; @@ -606,7 +609,7 @@ impl ZoneInit { env: Vec, ) -> Result<()> { match unsafe { fork()? } { - ForkResult::Parent { child } => self.background(idm, cgroup, child).await, + ForkResult::Parent { child } => self.background(idm, cgroup, child.as_raw()).await, ForkResult::Child => self.foreground(cgroup, working_dir, path, cmd, env).await, } } @@ -638,7 +641,7 @@ impl ZoneInit { &mut self, idm: IdmInternalClient, cgroup: Cgroup, - executed: Pid, + executed: pid_t, ) -> Result<()> { let mut background = ZoneBackground::new(idm, cgroup, executed).await?; background.run().await?; diff --git a/crates/zone/src/lib.rs b/crates/zone/src/lib.rs index 035dc0c..0dfce3a 100644 --- a/crates/zone/src/lib.rs +++ b/crates/zone/src/lib.rs @@ -9,6 +9,7 @@ pub mod childwait; pub mod exec; pub mod init; pub mod metrics; +pub mod spawn; pub async fn death(code: c_int) -> Result<()> { let store = XsdClient::open().await?; diff --git a/crates/zone/src/spawn/child.rs b/crates/zone/src/spawn/child.rs new file mode 100644 index 0000000..91e0771 --- /dev/null +++ b/crates/zone/src/spawn/child.rs @@ -0,0 +1,183 @@ +use std::{ + collections::HashMap, + ffi::CString, + io, + mem::MaybeUninit, + path::PathBuf, + ptr::addr_of_mut, +}; + +use anyhow::{bail, Context, Result}; +use log::{debug, error}; + +use cgroups_rs::{Cgroup, CgroupPid}; +use tokio::sync::broadcast; + +use crate::childwait::ChildEvent; + +use super::stdio::{StdioSet, Stderr, Stdin, Stdout}; + +pub struct Child { + pub stdin: Option, + pub stdout: Option, + pub stderr: Option, + pid: libc::pid_t, + reaper_rx: broadcast::Receiver, +} + +// TODO: impl From +/// Command used to spawn a child process +#[derive(Debug, Clone)] +pub struct ChildSpec { + /// The executable, with or without path, path relative + /// or absolute, to run. + pub exe: PathBuf, + /// The args to pass, as POSIX specifies + pub cmd: Vec, + /// Env vars to be set + pub env: HashMap, + /// Working directory to set just before spawning + pub working_dir: String, + /// Cgroup we'll use for the child + pub cgroup: Option, + /// Whether to create the child in a new session + /// This is mainly for image entrypoint + pub with_new_session: bool, + /// Whether to use tty + pub tty: bool, +} + +impl Child { + pub fn pid(&self) -> libc::pid_t { self.pid } + + pub async fn wait(mut self) -> Result { + debug!("waiting on process {}", self.pid); + loop { + let Ok(e) = self.reaper_rx.recv().await + else { bail!("dead reaper - ironic"); }; + + if e.pid == self.pid { + return Ok(e.status); + } + } + } +} + +impl ChildSpec { + pub fn spawn(self, reaper_rx: broadcast::Receiver) -> Result { + let Self { + exe, + cmd, + env, + working_dir, + cgroup, + with_new_session, + tty, + .. + } = self; + + let mut stdio = if tty { + StdioSet::new_pty().context("failed to spawn pty")? + } else { + StdioSet::new_pipes().context("failed to alloc pipes")? + }; + + let mut file_actions: libc::posix_spawn_file_actions_t = unsafe { + let mut fa = MaybeUninit::uninit(); + libc::posix_spawn_file_actions_init(fa.as_mut_ptr()); + fa.assume_init() + }; + stdio.add_to_spawn_file_actions(&mut file_actions)?; + + let spawnattr: libc::posix_spawnattr_t = unsafe { + let mut spawnattr = MaybeUninit::uninit(); + libc::posix_spawnattr_init(spawnattr.as_mut_ptr()); + // SAFETY: Both flags use 8 bits or less + #[allow(overflowing_literals)] + let mut flags = 0; + // If we start a new session, spawn will create a new pgroup, too + if with_new_session { + flags |= libc::POSIX_SPAWN_SETSID as i16; + } else { + flags |= libc::POSIX_SPAWN_SETPGROUP as i16; + } + + match libc::posix_spawnattr_setflags(spawnattr.as_mut_ptr(), flags) { + x if x > 0 => { + error!("error on posix_spawnattr_setflags - res {x}"); + return Err(io::Error::last_os_error().into()); + }, + _ => {} + }; + + spawnattr.assume_init() + }; + + let old_working_dir = std::env::current_dir().context("failed to retriev CWD")?; + std::env::set_current_dir(working_dir).context("failed to change CWD")?; + + let mut pid: libc::pid_t = 0; + + let spawn = if exe.is_relative() { + debug!("relying on libc to do executable lookup"); + libc::posix_spawnp + } else { + debug!("absolute command path found"); + libc::posix_spawn + }; + + // SAFETY: We're using the raw underlying value, then rewrapping it for Drop + let res = unsafe { + let exe = CString::new(exe.as_os_str().as_encoded_bytes())?; + let cmd = cmd.into_iter() + .map(CString::into_raw) + .chain(Some(std::ptr::null_mut())) + .collect::>(); + + let env = env.iter() + .map(|(key, value)| CString::new(format!("{}={}", key, value)).context("null byte in env vars")) + .collect::>>()?; + let env = env.into_iter() + .map(CString::into_raw) + .chain(Some(std::ptr::null_mut())) + .collect::>(); + + // TODO: Safety comment + let res = spawn( + addr_of_mut!(pid), + exe.as_ptr(), + &file_actions, + &spawnattr, + cmd.as_slice().as_ptr(), + env.as_slice().as_ptr(), + ); + + let _ = cmd.into_iter().map(|a| CString::from_raw(a)); + let _ = env.into_iter().map(|e| CString::from_raw(e)); + + res + }; + + std::env::set_current_dir(old_working_dir).context("failed to restore previous CWD")?; + + if res != 0 { + error!("Failed to spawn process: return value of {res}"); + return Err(io::Error::last_os_error().into()); + } + + if let Some(cg) = cgroup { + cg.add_task(CgroupPid::from(pid as u64)).context("failed to add child to cgroup")?; + } + + let (stdin, stdout, stderr) = stdio.get_parent_side()?; + + Ok(Child { + pid, + reaper_rx, + stdin: Some(stdin), + stdout: Some(stdout), + stderr: Some(stderr), + }) + } +} + diff --git a/crates/zone/src/spawn/mod.rs b/crates/zone/src/spawn/mod.rs new file mode 100644 index 0000000..2e5c12a --- /dev/null +++ b/crates/zone/src/spawn/mod.rs @@ -0,0 +1,2 @@ +pub mod child; +pub mod stdio; \ No newline at end of file diff --git a/crates/zone/src/spawn/stdio.rs b/crates/zone/src/spawn/stdio.rs new file mode 100644 index 0000000..0bdaa23 --- /dev/null +++ b/crates/zone/src/spawn/stdio.rs @@ -0,0 +1,304 @@ +use std::{ + io, + os::fd::{AsRawFd, IntoRawFd, RawFd}, + pin::Pin, + task::{Context, Poll, ready}, +}; + +use anyhow::{bail, Context as _, Result}; + +use pin_project_lite::pin_project; +use tokio::io::{ + AsyncRead, AsyncWrite, Interest, ReadBuf, + unix::AsyncFd, +}; + +type SpawnFileActions = libc::posix_spawn_file_actions_t; + +pub struct StdioSet { + parent: Option, + child: Option, +} + +pub struct StdioSubset { + stdin: Stdio, + stdout: Stdio, + stderr: Stdio, +} + +pin_project! { + pub struct Stdin { + #[pin] inner: Stdio + } +} + +pin_project! { + pub struct Stdout { + #[pin] inner: Stdio + } +} + +pin_project! { + pub struct Stderr { + #[pin] inner: Stdio + } +} + +struct Stdio(RawFd); + +impl StdioSet { + pub fn add_to_spawn_file_actions(&mut self, attr: &mut SpawnFileActions) -> Result<()> { + let Some(stdio) = self.child.take() else { bail!("already used child-side fd's") }; + let res_in = unsafe { + libc::posix_spawn_file_actions_adddup2( + attr, stdio.stdin.0, libc::STDIN_FILENO + ) + }; + + let res_out = unsafe { + libc::posix_spawn_file_actions_adddup2( + attr, stdio.stdout.0, libc::STDOUT_FILENO + ) + }; + + let res_err = unsafe { + libc::posix_spawn_file_actions_adddup2( + attr, stdio.stderr.0, libc::STDERR_FILENO + ) + }; + + // It is highly unlikely that they will fail from different errors, and + // even if they did, they're all fatal and need to be addressed by the + // user deploying. + match (res_in, res_out, res_err) { + (0, 0, 0) => Ok(()), + _ => Err(std::io::Error::last_os_error().into()), + } + } + + pub fn get_parent_side(&mut self) -> Result<(Stdin, Stdout, Stderr)> { + let StdioSubset { stdin, stdout, stderr } + = self.parent.take().context("stdio handles already taken")?; + + Ok((Stdin { inner: stdin }, Stdout { inner: stdout }, Stderr { inner: stderr })) + } + + pub fn new_pty() -> Result { + use nix::{fcntl::{self, FcntlArg, OFlag}, pty}; + + // Open the Pseudoterminal with +rw capabilities and without + // setting it as our controlling terminal + let pty = pty::posix_openpt(OFlag::O_RDWR | OFlag::O_NOCTTY)?; + // Grant access to the side we pass to the child + // This is referred to as the "slave" + pty::grantpt(&pty)?; + // Unlock the "slave" device + pty::unlockpt(&pty)?; + + // Retrieve the "slave" device + let pts = { + let name = pty::ptsname_r(&pty)?; + std::fs::OpenOptions::new() + .read(true) + .write(true) + .open(name)? + .into_raw_fd() + }; + + // Get the RawFd out of the OwnedFd because OwnedFd + // sets CLOEXEC on clone + let pty = pty.as_raw_fd(); + + // Make the "master" async-ready by setting NONBLOCK + let mut opts = OFlag::from_bits(fcntl::fcntl(pty, FcntlArg::F_GETFL)?) + .expect("got bad O_FLAG bits from kernel"); + opts |= OFlag::O_NONBLOCK; + fcntl::fcntl(pty, FcntlArg::F_SETFL(opts))?; + + Ok(Self { + child: Some(StdioSubset { + stdin: Stdio(pts.clone()), + stdout: Stdio(pts.clone()), + stderr: Stdio(pts), + }), + parent: Some(StdioSubset { + stdin: Stdio(pty.clone()), + stdout: Stdio(pty.clone()), + stderr: Stdio(pty), + }), + }) + } + + pub fn new_pipes() -> Result { + let (stdin_child, stdin_parent) = make_pipe()?; + let (stdout_parent, stdout_child) = make_pipe()?; + let (stderr_parent, stderr_child) = make_pipe()?; + + Ok(Self { + parent: Some(StdioSubset { + stdin: Stdio(stdin_parent), + stdout: Stdio(stdout_parent), + stderr: Stdio(stderr_parent), + }), + child: Some(StdioSubset { + stdin: Stdio(stdin_child), + stdout: Stdio(stdout_child), + stderr: Stdio(stderr_child), + }), + }) + } +} + +impl AsyncRead for Stdio { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_> + ) -> Poll> { + // SAFETY: if this fails, we have a bug in our pty/pipe allocations + let fd = AsyncFd::with_interest(self.0, Interest::READABLE) + .expect("async io failure"); + + loop { + let mut guard = ready!(fd.poll_read_ready(cx))?; + let count = buf.remaining(); + + let res = guard.try_io(|i| match unsafe { + let buf_ptr = buf.initialize_unfilled().as_mut_ptr().cast(); + libc::read(i.as_raw_fd(), buf_ptr, count) + } { + -1 => Err(std::io::Error::last_os_error()), + // SAFETY: write returns -1..=isize::MAX, and + // we've already ruled out -1, so this will be + // a valid usize. + n => { buf.advance(n.try_into().unwrap()); Ok(()) } + }); + + if let Ok(r) = res { + // Err will ever only be WouldBlock, so we allow + // the loop to try again. `r` is the inner Result + // of try_io + return Poll::Ready(r); + } + } + } +} + +impl AsyncWrite for Stdio { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8] + ) -> Poll> { + // SAFETY: if this fails, we have a bug in our pty/pipe allocations + let fd = AsyncFd::with_interest(self.0, Interest::WRITABLE) + .expect("async io failure"); + + loop { + let mut guard = ready!(fd.poll_write_ready(cx))?; + + let res = guard.try_io(|i| match unsafe { + libc::write(i.as_raw_fd(), buf.as_ptr().cast(), buf.len()) + } { + -1 => Err(io::Error::last_os_error()), + // SAFETY: write returns -1..=isize::MAX, and + // we've already ruled out -1, so this will be + // a valid usize. + n => Ok(n.try_into().unwrap()), + }); + + if let Ok(r) = res { + // Err will ever only be WouldBlock, so we allow + // the loop to try again. `r` is the inner Result + // of try_io + return Poll::Ready(r); + } + } + } + + fn poll_flush( + self: Pin<&mut Self>, + _cx: &mut Context<'_> + ) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + _cx: &mut Context<'_> + ) -> Poll> { + Poll::Ready(Ok(())) + } +} + +impl AsyncWrite for Stdin { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8] + ) -> Poll> { + self.project().inner.poll_write(cx, buf) + } + + fn poll_flush( + self: Pin<&mut Self>, + _cx: &mut Context<'_> + ) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + _cx: &mut Context<'_> + ) -> Poll> { + Poll::Ready(Ok(())) + } +} + +impl AsyncRead for Stdout { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_> + ) -> Poll> { + self.project().inner.poll_read(cx, buf) + } +} + +impl AsyncRead for Stderr { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_> + ) -> Poll> { + self.project().inner.poll_read(cx, buf) + } +} + +fn make_pipe() -> Result<(RawFd, RawFd)> { + // Init two null file descriptors + // [read, write] + let mut raw_fds: [RawFd; 2] = [0, 0]; + + // Allocate the pipe and get each end of, setting as non-blocking + let res = unsafe { libc::pipe(raw_fds.as_mut_ptr().cast()) }; + if res == -1 { return Err(io::Error::last_os_error().into()); } + + // We split the pipe into its ends so we can be explicit + // which end is which. + let [read, write] = raw_fds; + + // Wipe the flags, because CLOEXEC is on by default + let flags = libc::O_NONBLOCK; + f_setfl(read, flags)?; + f_setfl(write, flags)?; + + Ok((read, write)) +} + +fn f_setfl(fd: RawFd, flags: libc::c_int) -> Result<()> { + let res = unsafe { libc::fcntl(fd, libc::F_SETFL, flags) }; + if res == -1 { return Err(io::Error::last_os_error().into()); } + + Ok(()) +}