diff --git a/crates/kratactl/src/cli/console.rs b/crates/kratactl/src/cli/console.rs index ae47bac..fc8fb67 100644 --- a/crates/kratactl/src/cli/console.rs +++ b/crates/kratactl/src/cli/console.rs @@ -2,6 +2,7 @@ use anyhow::Result; use clap::Parser; use krata::control::control_service_client::ControlServiceClient; +use tokio::select; use tonic::transport::Channel; use crate::{console::StdioConsoleStream, events::EventStream}; @@ -20,10 +21,17 @@ impl ConsoleCommand { ) -> Result<()> { let input = StdioConsoleStream::stdin_stream(self.guest.clone()).await; let output = client.console_data(input).await?.into_inner(); + let stdout_handle = + tokio::task::spawn(async move { StdioConsoleStream::stdout(output).await }); let exit_hook_task = StdioConsoleStream::guest_exit_hook(self.guest.clone(), events).await?; - StdioConsoleStream::stdout(output).await?; - exit_hook_task.abort(); - Ok(()) + let code = select! { + x = stdout_handle => { + x??; + None + }, + x = exit_hook_task => x? + }; + std::process::exit(code.unwrap_or(0)); } } diff --git a/crates/kratactl/src/cli/launch.rs b/crates/kratactl/src/cli/launch.rs index 0de41ef..c545b94 100644 --- a/crates/kratactl/src/cli/launch.rs +++ b/crates/kratactl/src/cli/launch.rs @@ -7,6 +7,7 @@ use krata::{ }, }; use log::error; +use tokio::select; use tonic::{transport::Channel, Request}; use crate::{console::StdioConsoleStream, events::EventStream}; @@ -52,17 +53,25 @@ impl LauchCommand { .await? .into_inner(); let id = response.guest_id; - if self.attach { + let code = if self.attach { wait_guest_started(&id, events.clone()).await?; let input = StdioConsoleStream::stdin_stream(id.clone()).await; let output = client.console_data(input).await?.into_inner(); + let stdout_handle = + tokio::task::spawn(async move { StdioConsoleStream::stdout(output).await }); let exit_hook_task = StdioConsoleStream::guest_exit_hook(id.clone(), events).await?; - StdioConsoleStream::stdout(output).await?; - exit_hook_task.abort(); + select! { + x = stdout_handle => { + x??; + None + }, + x = exit_hook_task => x? + } } else { println!("created guest: {}", id); - } - Ok(()) + None + }; + std::process::exit(code.unwrap_or(0)); } } diff --git a/crates/kratactl/src/console.rs b/crates/kratactl/src/console.rs index c0ba8f2..9a3ab94 100644 --- a/crates/kratactl/src/console.rs +++ b/crates/kratactl/src/console.rs @@ -9,7 +9,7 @@ use krata::{ common::GuestStatus, control::{watch_events_reply::Event, ConsoleDataReply, ConsoleDataRequest}, }; -use log::{debug, warn}; +use log::debug; use termion::raw::IntoRawMode; use tokio::{ fs::File, @@ -61,7 +61,10 @@ impl StdioConsoleStream { Ok(()) } - pub async fn guest_exit_hook(id: String, events: EventStream) -> Result> { + pub async fn guest_exit_hook( + id: String, + events: EventStream, + ) -> Result>> { Ok(tokio::task::spawn(async move { let mut stream = events.subscribe(); while let Ok(event) = stream.recv().await { @@ -80,16 +83,17 @@ impl StdioConsoleStream { } if let Some(exit_info) = state.exit_info { - std::process::exit(exit_info.code); + return Some(exit_info.code); } - if state.status() == GuestStatus::Destroy { - warn!("attached guest was destroyed"); - std::process::exit(1); + let status = state.status(); + if status == GuestStatus::Destroy || status == GuestStatus::Destroyed { + return Some(10); } } } } + None })) } } diff --git a/crates/kratad/src/event.rs b/crates/kratad/src/event.rs index 7fd9913..1d60691 100644 --- a/crates/kratad/src/event.rs +++ b/crates/kratad/src/event.rs @@ -1,22 +1,27 @@ -use std::{collections::HashMap, time::Duration}; +use std::{collections::HashMap, str::FromStr, time::Duration}; use anyhow::Result; use krata::common::{GuestExitInfo, GuestState, GuestStatus}; use log::error; use tokio::{ - sync::{broadcast, mpsc::Sender}, + select, + sync::{ + broadcast, + mpsc::{channel, Receiver, Sender}, + }, task::JoinHandle, time, }; use uuid::Uuid; -use kratart::{GuestInfo, Runtime}; +use kratart::Runtime; use crate::db::GuestStore; pub type DaemonEvent = krata::control::watch_events_reply::Event; const EVENT_CHANNEL_QUEUE_LEN: usize = 1000; +const EXIT_CODE_CHANNEL_QUEUE_LEN: usize = 1000; #[derive(Clone)] pub struct DaemonEventContext { @@ -38,8 +43,11 @@ pub struct DaemonEventGenerator { runtime: Runtime, guests: GuestStore, guest_reconciler_notify: Sender, - last: HashMap, - _sender: broadcast::Sender, + feed: broadcast::Receiver, + exit_code_sender: Sender<(Uuid, i32)>, + exit_code_receiver: Receiver<(Uuid, i32)>, + exit_code_handles: HashMap>, + _event_sender: broadcast::Sender, } impl DaemonEventGenerator { @@ -49,74 +57,101 @@ impl DaemonEventGenerator { runtime: Runtime, ) -> Result<(DaemonEventContext, DaemonEventGenerator)> { let (sender, _) = broadcast::channel(EVENT_CHANNEL_QUEUE_LEN); + let (exit_code_sender, exit_code_receiver) = channel(EXIT_CODE_CHANNEL_QUEUE_LEN); let generator = DaemonEventGenerator { runtime, guests, guest_reconciler_notify, - last: HashMap::new(), - _sender: sender.clone(), + feed: sender.subscribe(), + exit_code_receiver, + exit_code_sender, + exit_code_handles: HashMap::new(), + _event_sender: sender.clone(), }; let context = DaemonEventContext { sender }; Ok((context, generator)) } - async fn evaluate(&mut self) -> Result<()> { - let guests = self.runtime.list().await?; - let guests = { - let mut map = HashMap::new(); - for guest in guests { - map.insert(guest.uuid, guest); - } - map - }; - - let mut exits: Vec<(Uuid, i32)> = Vec::new(); - - for (uuid, guest) in &guests { - let Some(last) = self.last.get(uuid) else { - continue; - }; - - if last.state.exit_code.is_some() { - continue; - } - - let Some(code) = guest.state.exit_code else { - continue; - }; - - exits.push((*uuid, code)); - } - - for (uuid, code) in exits { - if let Some(mut entry) = self.guests.read(uuid).await? { - let Some(ref mut guest) = entry.guest else { - continue; + async fn handle_feed_event(&mut self, event: &DaemonEvent) -> Result<()> { + match event { + DaemonEvent::GuestChanged(changed) => { + let Some(ref guest) = changed.guest else { + return Ok(()); }; - guest.state = Some(GuestState { - status: GuestStatus::Exited.into(), - exit_info: Some(GuestExitInfo { code }), - error_info: None, - }); + let Some(ref state) = guest.state else { + return Ok(()); + }; - self.guests.update(uuid, entry).await?; - self.guest_reconciler_notify.send(uuid).await?; + let status = state.status(); + let id = Uuid::from_str(&guest.id)?; + match status { + GuestStatus::Started => { + let handle = self + .runtime + .subscribe_exit_code(id, self.exit_code_sender.clone()) + .await?; + self.exit_code_handles.insert(id, handle); + } + + GuestStatus::Destroyed => { + if let Some(handle) = self.exit_code_handles.remove(&id) { + handle.abort(); + } + } + + _ => {} + } } } - - self.last = guests; Ok(()) } + async fn handle_exit_code(&mut self, id: Uuid, code: i32) -> Result<()> { + if let Some(mut entry) = self.guests.read(id).await? { + let Some(ref mut guest) = entry.guest else { + return Ok(()); + }; + + guest.state = Some(GuestState { + status: GuestStatus::Exited.into(), + exit_info: Some(GuestExitInfo { code }), + error_info: None, + }); + + self.guests.update(id, entry).await?; + self.guest_reconciler_notify.send(id).await?; + } + Ok(()) + } + + async fn evaluate(&mut self) -> Result<()> { + select! { + x = self.exit_code_receiver.recv() => match x { + Some((uuid, code)) => { + self.handle_exit_code(uuid, code).await + }, + None => { + Ok(()) + } + }, + x = self.feed.recv() => match x { + Ok(event) => { + self.handle_feed_event(&event).await + }, + Err(error) => { + Err(error.into()) + } + } + } + } + pub async fn launch(mut self) -> Result> { Ok(tokio::task::spawn(async move { loop { if let Err(error) = self.evaluate().await { error!("failed to evaluate daemon events: {}", error); time::sleep(Duration::from_secs(5)).await; - } else { - time::sleep(Duration::from_millis(500)).await; } } })) diff --git a/crates/kratart/src/lib.rs b/crates/kratart/src/lib.rs index a29fcbb..e7115b4 100644 --- a/crates/kratart/src/lib.rs +++ b/crates/kratart/src/lib.rs @@ -7,11 +7,15 @@ use std::{ use anyhow::{anyhow, Result}; use ipnetwork::IpNetwork; +use log::error; use loopdev::LoopControl; -use tokio::sync::Mutex; +use tokio::{ + sync::{mpsc::Sender, Mutex}, + task::JoinHandle, +}; use uuid::Uuid; use xenclient::XenClient; -use xenstore::{XsdClient, XsdInterface}; +use xenstore::{XsdClient, XsdInterface, XsdWatchHandle}; use self::{ autoloop::AutoLoop, @@ -232,6 +236,28 @@ impl Runtime { launcher.launch(&mut context, request).await } + pub async fn subscribe_exit_code( + &self, + uuid: Uuid, + sender: Sender<(Uuid, i32)>, + ) -> Result> { + let mut context = self.context.lock().await; + let info = context + .resolve(uuid) + .await? + .ok_or_else(|| anyhow!("unable to resolve guest: {}", uuid))?; + let path = format!("/local/domain/{}/krata/guest/exit-code", info.domid); + let handle = context.xen.store.watch(&path).await?; + let watch = ExitCodeWatch { + handle, + sender, + store: context.xen.store.clone(), + uuid, + path, + }; + watch.launch().await + } + pub async fn destroy(&self, uuid: Uuid) -> Result { let mut context = self.context.lock().await; let info = context @@ -305,3 +331,44 @@ fn path_as_string(path: &Path) -> Result { .ok_or_else(|| anyhow!("unable to convert path to string")) .map(|x| x.to_string()) } + +struct ExitCodeWatch { + store: XsdClient, + handle: XsdWatchHandle, + uuid: Uuid, + sender: Sender<(Uuid, i32)>, + path: String, +} + +impl ExitCodeWatch { + pub async fn launch(mut self) -> Result> { + Ok(tokio::task::spawn(async move { + if let Err(error) = self.process().await { + error!("failed to watch exit for guest {}: {}", self.uuid, error); + } + })) + } + + async fn process(&mut self) -> Result<()> { + loop { + match self.handle.receiver.recv().await { + Some(_) => { + let exit_code_string = self.store.read_string(&self.path).await?; + if let Some(exit_code) = exit_code_string.and_then(|x| i32::from_str(&x).ok()) { + match self.sender.try_send((self.uuid, exit_code)) { + Ok(_) => {} + Err(error) => { + return Err(error.into()); + } + } + return Ok(()); + } + } + + None => { + return Ok(()); + } + } + } + } +}