utilize async processing for console and child exit events

This commit is contained in:
Alex Zenla 2024-02-23 03:25:06 +00:00
parent c582f15c54
commit 3af9ffec34
No known key found for this signature in database
GPG Key ID: 067B238899B51269
16 changed files with 364 additions and 167 deletions

View File

@ -25,6 +25,9 @@ features = ["process"]
[dependencies.krata]
path = "../shared"
[dependencies.xenevtchn]
path = "../libs/xen/xenevtchn"
[lib]
path = "src/lib.rs"

View File

@ -0,0 +1,49 @@
use std::time::Duration;
use crate::childwait::{ChildEvent, ChildWait};
use anyhow::Result;
use nix::{libc::c_int, unistd::Pid};
use tokio::{select, time::sleep};
pub struct ContainerBackground {
child: Pid,
wait: ChildWait,
}
impl ContainerBackground {
pub async fn new(child: Pid) -> Result<ContainerBackground> {
Ok(ContainerBackground {
child,
wait: ChildWait::new()?,
})
}
pub async fn run(&mut self) -> Result<()> {
loop {
select! {
event = self.wait.recv() => match event {
Some(event) => self.child_event(event).await?,
None => {
break;
}
}
};
}
Ok(())
}
async fn child_event(&mut self, event: ChildEvent) -> Result<()> {
if event.pid == self.child {
self.death(event.status).await?;
}
Ok(())
}
async fn death(&mut self, code: c_int) -> Result<()> {
println!("[krata] container process exited: status = {}", code);
println!("[krata] looping forever");
loop {
sleep(Duration::from_secs(1)).await;
}
}
}

View File

@ -0,0 +1,84 @@
use std::{
ptr::addr_of_mut,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::{self, JoinHandle},
};
use anyhow::Result;
use log::warn;
use nix::{
libc::{c_int, wait},
unistd::Pid,
};
use tokio::sync::mpsc::{channel, Receiver, Sender};
const CHILD_WAIT_QUEUE_LEN: usize = 10;
#[derive(Clone, Copy, Debug)]
pub struct ChildEvent {
pub pid: Pid,
pub status: c_int,
}
pub struct ChildWait {
receiver: Receiver<ChildEvent>,
signal: Arc<AtomicBool>,
_task: JoinHandle<()>,
}
impl ChildWait {
pub fn new() -> Result<ChildWait> {
let (sender, receiver) = channel(CHILD_WAIT_QUEUE_LEN);
let signal = Arc::new(AtomicBool::new(false));
let mut processor = ChildWaitTask {
sender,
signal: signal.clone(),
};
let task = thread::spawn(move || {
if let Err(error) = processor.process() {
warn!("failed to process child updates: {}", error);
}
});
Ok(ChildWait {
receiver,
signal,
_task: task,
})
}
pub async fn recv(&mut self) -> Option<ChildEvent> {
self.receiver.recv().await
}
}
struct ChildWaitTask {
sender: Sender<ChildEvent>,
signal: Arc<AtomicBool>,
}
impl ChildWaitTask {
fn process(&mut self) -> Result<()> {
loop {
let mut status: c_int = 0;
let pid = unsafe { wait(addr_of_mut!(status)) };
let event = ChildEvent {
pid: Pid::from_raw(pid),
status,
};
let _ = self.sender.try_send(event);
if self.signal.load(Ordering::Acquire) {
return Ok(());
}
}
}
}
impl Drop for ChildWait {
fn drop(&mut self) {
self.signal.store(true, Ordering::Release);
}
}

View File

@ -3,7 +3,7 @@ use futures::stream::TryStreamExt;
use ipnetwork::IpNetwork;
use krata::{LaunchInfo, LaunchNetwork};
use log::{trace, warn};
use nix::libc::{c_int, dup2, ioctl, wait};
use nix::libc::{dup2, ioctl};
use nix::unistd::{execve, fork, ForkResult, Pid};
use oci_spec::image::{Config, ImageConfiguration};
use std::ffi::{CStr, CString};
@ -13,14 +13,13 @@ use std::os::fd::AsRawFd;
use std::os::linux::fs::MetadataExt;
use std::os::unix::fs::{chroot, PermissionsExt};
use std::path::{Path, PathBuf};
use std::ptr::addr_of_mut;
use std::str::FromStr;
use std::thread::sleep;
use std::time::Duration;
use std::{fs, io};
use sys_mount::{FilesystemType, Mount, MountFlags};
use walkdir::WalkDir;
use crate::background::ContainerBackground;
const IMAGE_BLOCK_DEVICE_PATH: &str = "/dev/xvda";
const CONFIG_BLOCK_DEVICE_PATH: &str = "/dev/xvdb";
@ -84,7 +83,7 @@ impl ContainerInit {
}
if let Some(cfg) = config.config() {
self.run(cfg, &launch)?;
self.run(cfg, &launch).await?;
} else {
return Err(anyhow!(
"unable to determine what to execute, image config doesn't tell us"
@ -367,7 +366,7 @@ impl ContainerInit {
Ok(())
}
fn run(&mut self, config: &Config, launch: &LaunchInfo) -> Result<()> {
async fn run(&mut self, config: &Config, launch: &LaunchInfo) -> Result<()> {
let mut cmd = match config.cmd() {
None => vec![],
Some(value) => value.clone(),
@ -408,7 +407,7 @@ impl ContainerInit {
}
std::env::set_current_dir(&working_dir)?;
self.fork_and_exec(&path_cstr, cmd_cstr, env_cstr)?;
self.fork_and_exec(&path_cstr, cmd_cstr, env_cstr).await?;
Ok(())
}
@ -420,9 +419,14 @@ impl ContainerInit {
Ok(results)
}
fn fork_and_exec(&mut self, path: &CStr, cmd: Vec<CString>, env: Vec<CString>) -> Result<()> {
async fn fork_and_exec(
&mut self,
path: &CStr,
cmd: Vec<CString>,
env: Vec<CString>,
) -> Result<()> {
match unsafe { fork()? } {
ForkResult::Parent { child } => self.background(child),
ForkResult::Parent { child } => self.background(child).await,
ForkResult::Child => {
unsafe { nix::libc::setsid() };
let result = unsafe { ioctl(io::stdin().as_raw_fd(), nix::libc::TIOCSCTTY, 0) };
@ -435,21 +439,9 @@ impl ContainerInit {
}
}
fn background(&mut self, executed: Pid) -> Result<()> {
loop {
let mut status: c_int = 0;
let pid = unsafe { wait(addr_of_mut!(status)) };
if executed.as_raw() == pid {
return self.death(status);
}
}
}
fn death(&mut self, code: c_int) -> Result<()> {
println!("[krata] container process exited: status = {}", code);
println!("[krata] looping forever");
loop {
sleep(Duration::from_secs(1));
}
async fn background(&mut self, executed: Pid) -> Result<()> {
let mut background = ContainerBackground::new(executed).await?;
background.run().await?;
Ok(())
}
}

View File

@ -1 +1,3 @@
pub mod background;
pub mod childwait;
pub mod init;

View File

@ -26,6 +26,8 @@ oci-spec = { workspace = true }
backhand = { workspace = true }
uuid = { workspace = true }
ipnetwork = { workspace = true }
tokio = { workspace = true }
futures = { workspace = true }
[dependencies.krata]
path = "../shared"

View File

@ -53,7 +53,8 @@ enum Commands {
},
}
fn main() -> Result<()> {
#[tokio::main]
async fn main() -> Result<()> {
env_logger::Builder::from_env(Env::default().default_filter_or("warn")).init();
let args = ControllerArgs::parse();
@ -99,7 +100,7 @@ fn main() -> Result<()> {
println!("launched container: {}", uuid);
if attach {
let mut console = ControllerConsole::new(&mut context);
console.perform(&uuid.to_string())?;
console.perform(&uuid.to_string()).await?;
}
}
@ -110,7 +111,7 @@ fn main() -> Result<()> {
Commands::Console { container } => {
let mut console = ControllerConsole::new(&mut context);
console.perform(&container)?;
console.perform(&container).await?;
}
Commands::List { .. } => {

74
controller/src/console.rs Normal file
View File

@ -0,0 +1,74 @@
use std::{
io::{stdin, stdout},
os::fd::{AsRawFd, FromRawFd},
};
use anyhow::Result;
use futures::future::join_all;
use log::warn;
use std::process::exit;
use termion::raw::IntoRawMode;
use tokio::{
fs::File,
io::{AsyncReadExt, AsyncWriteExt},
};
pub struct XenConsole {
xen_read_handle: File,
xen_write_handle: File,
}
impl XenConsole {
pub async fn new(tty: &str) -> Result<XenConsole> {
let xen_read_handle = File::options().read(true).write(false).open(tty).await?;
let xen_write_handle = File::options().read(false).write(true).open(tty).await?;
Ok(XenConsole {
xen_read_handle,
xen_write_handle,
})
}
pub async fn attach(self) -> Result<()> {
let stdin = stdin();
let terminal = stdout().into_raw_mode()?;
let stdout = unsafe { File::from_raw_fd(terminal.as_raw_fd()) };
let reader_task = tokio::task::spawn(async move {
if let Err(error) = XenConsole::copy_stdout(stdout, self.xen_read_handle).await {
warn!("failed to copy console output: {}", error);
}
});
let writer_task = tokio::task::spawn(async move {
if let Err(error) = XenConsole::intercept_stdin(
unsafe { File::from_raw_fd(stdin.as_raw_fd()) },
self.xen_write_handle,
)
.await
{
warn!("failed to intercept stdin: {}", error);
}
});
join_all(vec![reader_task, writer_task]).await;
Ok(())
}
async fn copy_stdout(mut stdout: File, mut console: File) -> Result<()> {
let mut buffer = vec![0u8; 256];
loop {
let size = console.read(&mut buffer).await?;
stdout.write_all(&buffer[0..size]).await?;
stdout.flush().await?;
}
}
async fn intercept_stdin(mut stdin: File, mut console: File) -> Result<()> {
let mut buffer = vec![0u8; 60];
loop {
let size = stdin.read(&mut buffer).await?;
if size == 1 && buffer[0] == 0x1d {
exit(0);
}
console.write_all(&buffer[0..size]).await?;
}
}
}

View File

@ -1,11 +1,6 @@
use std::{
io::{self, Read, Write},
process::exit,
thread,
};
use anyhow::{anyhow, Result};
use termion::raw::IntoRawMode;
use crate::console::XenConsole;
use super::ControllerContext;
@ -18,49 +13,15 @@ impl ControllerConsole<'_> {
ControllerConsole { context }
}
pub fn perform(&mut self, id: &str) -> Result<()> {
pub async fn perform(&mut self, id: &str) -> Result<()> {
let info = self
.context
.resolve(id)?
.ok_or_else(|| anyhow!("unable to resolve container: {}", id))?;
let domid = info.domid;
let (mut read, mut write) = self.context.xen.open_console(domid)?;
let mut stdin = io::stdin();
let is_tty = termion::is_tty(&stdin);
let mut stdout_for_exit = io::stdout().into_raw_mode()?;
thread::spawn(move || {
let mut buffer = vec![0u8; 60];
loop {
let size = stdin.read(&mut buffer).expect("failed to read stdin");
if is_tty && size == 1 && buffer[0] == 0x1d {
stdout_for_exit
.suspend_raw_mode()
.expect("failed to disable raw mode");
stdout_for_exit.flush().expect("failed to flush stdout");
exit(0);
}
write
.write_all(&buffer[0..size])
.expect("failed to write to domain console");
write.flush().expect("failed to flush domain console");
}
});
let mut buffer = vec![0u8; 256];
if is_tty {
let mut stdout = io::stdout().into_raw_mode()?;
loop {
let size = read.read(&mut buffer)?;
stdout.write_all(&buffer[0..size])?;
stdout.flush()?;
}
} else {
let mut stdout = io::stdout();
loop {
let size = read.read(&mut buffer)?;
stdout.write_all(&buffer[0..size])?;
stdout.flush()?;
}
}
let tty = self.context.xen.get_console_path(domid)?;
let console = XenConsole::new(&tty).await?;
console.attach().await?;
Ok(())
}
}

View File

@ -4,10 +4,11 @@ use advmac::MacAddr6;
use anyhow::{anyhow, Result};
use ipnetwork::Ipv4Network;
use krata::{
LaunchInfo, LaunchNetwork, LaunchNetworkIpv4, LaunchNetworkIpv6, LaunchNetworkResolver,
LaunchChannels, LaunchInfo, LaunchNetwork, LaunchNetworkIpv4, LaunchNetworkIpv6,
LaunchNetworkResolver,
};
use uuid::Uuid;
use xenclient::{DomainConfig, DomainDisk, DomainNetworkInterface};
use xenclient::{DomainConfig, DomainDisk, DomainEventChannel, DomainNetworkInterface};
use xenstore::client::XsdInterface;
use crate::image::{name::ImageName, ImageCompiler, ImageInfo};
@ -75,6 +76,9 @@ impl ControllerLaunch<'_> {
}),
env: request.env,
run: request.run,
channels: LaunchChannels {
exit: "krata-exit".to_string(),
},
};
let cfgblk = ConfigBlock::new(&uuid, &image_info)?;
@ -133,6 +137,7 @@ impl ControllerLaunch<'_> {
script: None,
}],
filesystems: vec![],
event_channels: vec![DomainEventChannel { name: "krata-exit" }],
extra_keys: vec![
("krata/uuid".to_string(), uuid.to_string()),
(

View File

@ -1,3 +1,4 @@
pub mod autoloop;
pub mod console;
pub mod ctl;
pub mod image;

View File

@ -26,6 +26,7 @@ fn main() -> Result<()> {
vifs: vec![],
filesystems: vec![],
extra_keys: vec![],
event_channels: vec![],
};
let domid = client.create(&config)?;
println!("created domain {}", domid);

View File

@ -11,7 +11,7 @@ use crate::error::{Error, Result};
use crate::x86::X86BootSetup;
use log::{trace, warn};
use std::fs::{read, File, OpenOptions};
use std::fs::read;
use std::path::PathBuf;
use std::str::FromStr;
use std::thread;
@ -59,6 +59,11 @@ pub struct DomainNetworkInterface<'a> {
#[derive(Debug)]
pub struct DomainConsole {}
#[derive(Debug)]
pub struct DomainEventChannel<'a> {
pub name: &'a str,
}
#[derive(Debug)]
pub struct DomainConfig<'a> {
pub backend_domid: u32,
@ -72,6 +77,7 @@ pub struct DomainConfig<'a> {
pub consoles: Vec<DomainConsole>,
pub vifs: Vec<DomainNetworkInterface<'a>>,
pub filesystems: Vec<DomainFilesystem<'a>>,
pub event_channels: Vec<DomainEventChannel<'a>>,
pub extra_keys: Vec<(String, String)>,
}
@ -99,90 +105,6 @@ impl XenClient {
}
}
pub fn destroy(&mut self, domid: u32) -> Result<()> {
if let Err(err) = self.destroy_store(domid) {
warn!("failed to destroy store for domain {}: {}", domid, err);
}
self.call.destroy_domain(domid)?;
Ok(())
}
fn destroy_store(&mut self, domid: u32) -> Result<()> {
let dom_path = self.store.get_domain_path(domid)?;
let vm_path = self.store.read_string(&format!("{}/vm", dom_path))?;
if vm_path.is_empty() {
return Err(Error::DomainNonExistent);
}
let mut backend_paths: Vec<String> = Vec::new();
let console_frontend_path = format!("{}/console", dom_path);
let console_backend_path = self
.store
.read_string_optional(format!("{}/backend", console_frontend_path).as_str())?;
for device_category in self
.store
.list_any(format!("{}/device", dom_path).as_str())?
{
for device_id in self
.store
.list_any(format!("{}/device/{}", dom_path, device_category).as_str())?
{
let device_path = format!("{}/device/{}/{}", dom_path, device_category, device_id);
let backend_path = self
.store
.read_string(format!("{}/backend", device_path).as_str())?;
backend_paths.push(backend_path);
}
}
for backend in &backend_paths {
let state_path = format!("{}/state", backend);
let online_path = format!("{}/online", backend);
let mut tx = self.store.transaction()?;
let state = tx.read_string(&state_path)?;
if state.is_empty() {
break;
}
tx.write_string(&online_path, "0")?;
if !state.is_empty() && u32::from_str(&state).unwrap_or(0) != 6 {
tx.write_string(&state_path, "5")?;
}
tx.commit()?;
let mut count: u32 = 0;
loop {
if count >= 100 {
warn!("unable to safely destroy backend: {}", backend);
break;
}
let state = self.store.read_string(&state_path)?;
let state = i64::from_str(&state).unwrap_or(-1);
if state == 6 {
break;
}
thread::sleep(Duration::from_millis(100));
count += 1;
}
}
let mut tx = self.store.transaction()?;
let mut backend_removals: Vec<String> = Vec::new();
backend_removals.extend_from_slice(backend_paths.as_slice());
if let Some(backend) = console_backend_path {
backend_removals.push(backend);
}
for path in &backend_removals {
let path = PathBuf::from(path);
let parent = path.parent().ok_or(Error::PathParentNotFound)?;
tx.rm(parent.to_str().ok_or(Error::PathStringConversion)?)?;
}
tx.rm(&vm_path)?;
tx.rm(&dom_path)?;
tx.commit()?;
Ok(())
}
fn init(&mut self, domid: u32, domain: &CreateDomain, config: &DomainConfig) -> Result<()> {
trace!(
"XenClient init domid={} domain={:?} config={:?}",
@ -401,6 +323,18 @@ impl XenClient {
vif,
)?;
}
for channel in &config.event_channels {
let id = self
.call
.evtchn_alloc_unbound(domid, config.backend_domid)?;
let channel_path = format!("{}/evtchn/{}", dom_path, channel.name);
self.store
.write_string(&format!("{}/name", channel_path), channel.name)?;
self.store
.write_string(&format!("{}/channel", channel_path), &id.to_string())?;
}
self.call.unpause_domain(domid)?;
Ok(())
}
@ -673,7 +607,91 @@ impl XenClient {
Ok(())
}
pub fn open_console(&mut self, domid: u32) -> Result<(File, File)> {
pub fn destroy(&mut self, domid: u32) -> Result<()> {
if let Err(err) = self.destroy_store(domid) {
warn!("failed to destroy store for domain {}: {}", domid, err);
}
self.call.destroy_domain(domid)?;
Ok(())
}
fn destroy_store(&mut self, domid: u32) -> Result<()> {
let dom_path = self.store.get_domain_path(domid)?;
let vm_path = self.store.read_string(&format!("{}/vm", dom_path))?;
if vm_path.is_empty() {
return Err(Error::DomainNonExistent);
}
let mut backend_paths: Vec<String> = Vec::new();
let console_frontend_path = format!("{}/console", dom_path);
let console_backend_path = self
.store
.read_string_optional(format!("{}/backend", console_frontend_path).as_str())?;
for device_category in self
.store
.list_any(format!("{}/device", dom_path).as_str())?
{
for device_id in self
.store
.list_any(format!("{}/device/{}", dom_path, device_category).as_str())?
{
let device_path = format!("{}/device/{}/{}", dom_path, device_category, device_id);
let backend_path = self
.store
.read_string(format!("{}/backend", device_path).as_str())?;
backend_paths.push(backend_path);
}
}
for backend in &backend_paths {
let state_path = format!("{}/state", backend);
let online_path = format!("{}/online", backend);
let mut tx = self.store.transaction()?;
let state = tx.read_string(&state_path)?;
if state.is_empty() {
break;
}
tx.write_string(&online_path, "0")?;
if !state.is_empty() && u32::from_str(&state).unwrap_or(0) != 6 {
tx.write_string(&state_path, "5")?;
}
tx.commit()?;
let mut count: u32 = 0;
loop {
if count >= 100 {
warn!("unable to safely destroy backend: {}", backend);
break;
}
let state = self.store.read_string(&state_path)?;
let state = i64::from_str(&state).unwrap_or(-1);
if state == 6 {
break;
}
thread::sleep(Duration::from_millis(100));
count += 1;
}
}
let mut tx = self.store.transaction()?;
let mut backend_removals: Vec<String> = Vec::new();
backend_removals.extend_from_slice(backend_paths.as_slice());
if let Some(backend) = console_backend_path {
backend_removals.push(backend);
}
for path in &backend_removals {
let path = PathBuf::from(path);
let parent = path.parent().ok_or(Error::PathParentNotFound)?;
tx.rm(parent.to_str().ok_or(Error::PathStringConversion)?)?;
}
tx.rm(&vm_path)?;
tx.rm(&dom_path)?;
tx.commit()?;
Ok(())
}
pub fn get_console_path(&mut self, domid: u32) -> Result<String> {
let dom_path = self.store.get_domain_path(domid)?;
let console_tty_path = format!("{}/console/tty", dom_path);
let mut tty: Option<String> = None;
@ -687,8 +705,6 @@ impl XenClient {
let Some(tty) = tty else {
return Err(Error::TtyNotFound);
};
let read = OpenOptions::new().read(true).write(false).open(&tty)?;
let write = OpenOptions::new().read(false).write(true).open(&tty)?;
Ok((read, write))
Ok(tty)
}
}

View File

@ -10,5 +10,5 @@ REAL_SCRIPT="$(realpath "${0}")"
cd "$(dirname "${REAL_SCRIPT}")/.."
./initrd/build.sh -q
sudo cp "target/initrd/initrd" "/var/lib/krata/default/initrd"
cargo build -q --target x86_64-unknown-linux-gnu --bin kratactl
cargo build --target x86_64-unknown-linux-gnu --bin kratactl
exec sudo RUST_LOG="${RUST_LOG}" target/x86_64-unknown-linux-gnu/debug/kratactl "${@}"

View File

@ -8,5 +8,5 @@ fi
REAL_SCRIPT="$(realpath "${0}")"
cd "$(dirname "${REAL_SCRIPT}")/.."
cargo build -q --target x86_64-unknown-linux-gnu --bin kratanet
cargo build --target x86_64-unknown-linux-gnu --bin kratanet
exec sudo RUST_LOG="${RUST_LOG}" target/x86_64-unknown-linux-gnu/debug/kratanet "${@}"

View File

@ -25,9 +25,15 @@ pub struct LaunchNetwork {
pub resolver: LaunchNetworkResolver,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct LaunchChannels {
pub exit: String,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct LaunchInfo {
pub network: Option<LaunchNetwork>,
pub env: Option<Vec<String>>,
pub run: Option<Vec<String>>,
pub channels: LaunchChannels,
}