diff --git a/controller/bin/control.rs b/controller/bin/control.rs index 3d0b6a2..09d4da8 100644 --- a/controller/bin/control.rs +++ b/controller/bin/control.rs @@ -1,7 +1,10 @@ use anyhow::{anyhow, Result}; use clap::{Parser, Subcommand}; use env_logger::Env; -use krata::control::{DestroyGuestRequest, LaunchGuestRequest, ListGuestsRequest}; +use krata::control::{ + watch_events_reply::Event, DestroyGuestRequest, LaunchGuestRequest, ListGuestsRequest, + WatchEventsRequest, +}; use kratactl::{client::ControlClientProvider, console::StdioConsoleStream}; use tonic::Request; @@ -18,7 +21,6 @@ struct ControllerArgs { #[derive(Subcommand, Debug)] enum Commands { List {}, - Launch { #[arg(short, long, default_value_t = 1)] cpus: u32, @@ -41,6 +43,7 @@ enum Commands { #[arg()] guest: String, }, + Watch {}, } #[tokio::main] @@ -116,6 +119,35 @@ async fn main() -> Result<()> { println!("{}", table.to_string()); } } + + Commands::Watch {} => { + let response = client + .watch_events(Request::new(WatchEventsRequest {})) + .await?; + let mut stream = response.into_inner(); + while let Some(reply) = stream.message().await? { + let Some(event) = reply.event else { + continue; + }; + + match event { + Event::GuestLaunched(launched) => { + println!("event=guest.launched guest={}", launched.guest_id); + } + + Event::GuestDestroyed(destroyed) => { + println!("event=guest.destroyed guest={}", destroyed.guest_id); + } + + Event::GuestExited(exited) => { + println!( + "event=guest.exited guest={} code={}", + exited.guest_id, exited.code + ); + } + } + } + } } Ok(()) } diff --git a/controller/src/console.rs b/controller/src/console.rs index ded6a2b..9387b26 100644 --- a/controller/src/console.rs +++ b/controller/src/console.rs @@ -21,7 +21,7 @@ impl StdioConsoleStream { pub async fn stdin_stream(guest: String) -> impl Stream { let mut stdin = stdin(); stream! { - yield ConsoleDataRequest { guest, data: vec![] }; + yield ConsoleDataRequest { guest_id: guest, data: vec![] }; let mut buffer = vec![0u8; 60]; loop { @@ -36,7 +36,7 @@ impl StdioConsoleStream { if size == 1 && buffer[0] == 0x1d { break; } - yield ConsoleDataRequest { guest: String::default(), data }; + yield ConsoleDataRequest { guest_id: String::default(), data }; } } } diff --git a/daemon/src/control.rs b/daemon/src/control.rs index efeaf28..3678d8c 100644 --- a/daemon/src/control.rs +++ b/daemon/src/control.rs @@ -5,7 +5,7 @@ use futures::Stream; use krata::control::{ control_service_server::ControlService, ConsoleDataReply, ConsoleDataRequest, DestroyGuestReply, DestroyGuestRequest, GuestInfo, LaunchGuestReply, LaunchGuestRequest, - ListGuestsReply, ListGuestsRequest, + ListGuestsReply, ListGuestsRequest, WatchEventsReply, WatchEventsRequest, }; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, @@ -14,7 +14,10 @@ use tokio::{ use tokio_stream::StreamExt; use tonic::{Request, Response, Status, Streaming}; -use crate::runtime::{launch::GuestLaunchRequest, Runtime}; +use crate::{ + event::DaemonEventContext, + runtime::{launch::GuestLaunchRequest, Runtime}, +}; pub struct ApiError { message: String, @@ -36,12 +39,13 @@ impl From for Status { #[derive(Clone)] pub struct RuntimeControlService { + events: DaemonEventContext, runtime: Runtime, } impl RuntimeControlService { - pub fn new(runtime: Runtime) -> Self { - Self { runtime } + pub fn new(events: DaemonEventContext, runtime: Runtime) -> Self { + Self { events, runtime } } } @@ -55,6 +59,9 @@ impl ControlService for RuntimeControlService { type ConsoleDataStream = Pin> + Send + 'static>>; + type WatchEventsStream = + Pin> + Send + 'static>>; + async fn launch_guest( &self, request: Request, @@ -115,7 +122,7 @@ impl ControlService for RuntimeControlService { let request = request?; let mut console = self .runtime - .console(&request.guest) + .console(&request.guest_id) .await .map_err(ApiError::from)?; @@ -150,6 +157,20 @@ impl ControlService for RuntimeControlService { Ok(Response::new(Box::pin(output) as Self::ConsoleDataStream)) } + + async fn watch_events( + &self, + request: Request, + ) -> Result, Status> { + let _ = request.into_inner(); + let mut events = self.events.subscribe(); + let output = try_stream! { + while let Ok(event) = events.recv().await { + yield WatchEventsReply { event: Some(event), }; + } + }; + Ok(Response::new(Box::pin(output) as Self::WatchEventsStream)) + } } impl From for GuestInfo { diff --git a/daemon/src/event.rs b/daemon/src/event.rs new file mode 100644 index 0000000..25f8604 --- /dev/null +++ b/daemon/src/event.rs @@ -0,0 +1,112 @@ +use std::{collections::HashMap, time::Duration}; + +use anyhow::Result; +use krata::control::{GuestDestroyedEvent, GuestExitedEvent, GuestLaunchedEvent}; +use log::error; +use tokio::{sync::broadcast, task::JoinHandle, time}; +use uuid::Uuid; + +use crate::runtime::{GuestInfo, Runtime}; + +pub type DaemonEvent = krata::control::watch_events_reply::Event; + +const EVENT_CHANNEL_QUEUE_LEN: usize = 1000; + +#[derive(Clone)] +pub struct DaemonEventContext { + sender: broadcast::Sender, +} + +impl DaemonEventContext { + pub fn subscribe(&self) -> broadcast::Receiver { + self.sender.subscribe() + } +} + +pub struct DaemonEventGenerator { + runtime: Runtime, + last: HashMap, + sender: broadcast::Sender, +} + +impl DaemonEventGenerator { + pub async fn new(runtime: Runtime) -> Result<(DaemonEventContext, DaemonEventGenerator)> { + let (sender, _) = broadcast::channel(EVENT_CHANNEL_QUEUE_LEN); + let generator = DaemonEventGenerator { + runtime, + last: HashMap::new(), + sender: sender.clone(), + }; + let context = DaemonEventContext { sender }; + Ok((context, generator)) + } + + async fn evaluate(&mut self) -> Result<()> { + let guests = self.runtime.list().await?; + let guests = { + let mut map = HashMap::new(); + for guest in guests { + map.insert(guest.uuid, guest); + } + map + }; + + let mut events: Vec = Vec::new(); + + for uuid in guests.keys() { + if !self.last.contains_key(uuid) { + events.push(DaemonEvent::GuestLaunched(GuestLaunchedEvent { + guest_id: uuid.to_string(), + })); + } + } + + for uuid in self.last.keys() { + if !guests.contains_key(uuid) { + events.push(DaemonEvent::GuestDestroyed(GuestDestroyedEvent { + guest_id: uuid.to_string(), + })); + } + } + + for (uuid, guest) in &guests { + let Some(last) = self.last.get(uuid) else { + continue; + }; + + if last.state.exit_code.is_some() { + continue; + } + + let Some(code) = guest.state.exit_code else { + continue; + }; + + events.push(DaemonEvent::GuestExited(GuestExitedEvent { + guest_id: uuid.to_string(), + code, + })); + } + + self.last = guests; + + for event in events { + let _ = self.sender.send(event); + } + + Ok(()) + } + + pub async fn launch(mut self) -> Result> { + Ok(tokio::task::spawn(async move { + loop { + if let Err(error) = self.evaluate().await { + error!("failed to evaluate daemon events: {}", error); + time::sleep(Duration::from_secs(5)).await; + } else { + time::sleep(Duration::from_millis(500)).await; + } + } + })) + } +} diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index a4c0a16..5cf8a9b 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -2,28 +2,39 @@ use std::{net::SocketAddr, path::PathBuf, str::FromStr}; use anyhow::Result; use control::RuntimeControlService; +use event::{DaemonEventContext, DaemonEventGenerator}; use krata::{control::control_service_server::ControlServiceServer, dial::ControlDialAddress}; use log::info; use runtime::Runtime; -use tokio::net::UnixListener; +use tokio::{net::UnixListener, task::JoinHandle}; use tokio_stream::wrappers::UnixListenerStream; use tonic::transport::{Identity, Server, ServerTlsConfig}; pub mod control; +pub mod event; pub mod runtime; pub struct Daemon { store: String, runtime: Runtime, + events: DaemonEventContext, + task: JoinHandle<()>, } impl Daemon { pub async fn new(store: String, runtime: Runtime) -> Result { - Ok(Self { store, runtime }) + let runtime_for_events = runtime.dupe().await?; + let (events, generator) = DaemonEventGenerator::new(runtime_for_events).await?; + Ok(Self { + store, + runtime, + events, + task: generator.launch().await?, + }) } pub async fn listen(&mut self, addr: ControlDialAddress) -> Result<()> { - let control_service = RuntimeControlService::new(self.runtime.clone()); + let control_service = RuntimeControlService::new(self.events.clone(), self.runtime.clone()); let mut server = Server::builder(); @@ -72,3 +83,9 @@ impl Daemon { Ok(()) } } + +impl Drop for Daemon { + fn drop(&mut self) { + self.task.abort(); + } +} diff --git a/daemon/src/runtime/launch/mod.rs b/daemon/src/runtime/launch/mod.rs index 8d811c6..2e493a6 100644 --- a/daemon/src/runtime/launch/mod.rs +++ b/daemon/src/runtime/launch/mod.rs @@ -15,7 +15,7 @@ use crate::runtime::cfgblk::ConfigBlock; use crate::runtime::image::{cache::ImageCache, name::ImageName, ImageCompiler, ImageInfo}; use crate::runtime::RuntimeContext; -use super::GuestInfo; +use super::{GuestInfo, GuestState}; pub struct GuestLaunchRequest<'a> { pub image: &'a str, @@ -192,6 +192,7 @@ impl GuestLauncher { IpAddr::V6(guest_ipv6), ipv6_network_mask as u8, )?), + state: GuestState { exit_code: None }, }), Err(error) => { let _ = context.autoloop.unloop(&image_squashfs_loop.path); diff --git a/daemon/src/runtime/mod.rs b/daemon/src/runtime/mod.rs index 6d520a6..d10b876 100644 --- a/daemon/src/runtime/mod.rs +++ b/daemon/src/runtime/mod.rs @@ -27,6 +27,10 @@ pub struct ContainerLoopInfo { pub delete: Option, } +pub struct GuestState { + pub exit_code: Option, +} + pub struct GuestInfo { pub uuid: Uuid, pub domid: u32, @@ -34,6 +38,7 @@ pub struct GuestInfo { pub loops: Vec, pub ipv4: Option, pub ipv6: Option, + pub state: GuestState, } pub struct RuntimeContext { @@ -115,6 +120,19 @@ impl RuntimeContext { None }; + let exit_code = self + .xen + .store + .read_string(&format!("{}/krata/guest/exit-code", &dom_path)) + .await?; + + let exit_code: Option = match exit_code { + Some(code) => code.parse().ok(), + None => None, + }; + + let state = GuestState { exit_code }; + let loops = RuntimeContext::parse_loop_set(&loops); guests.push(GuestInfo { uuid, @@ -123,6 +141,7 @@ impl RuntimeContext { loops, ipv4, ipv6, + state, }); } Ok(guests) @@ -165,13 +184,15 @@ impl RuntimeContext { #[derive(Clone)] pub struct Runtime { + store: Arc, context: Arc>, } impl Runtime { pub async fn new(store: String) -> Result { - let context = RuntimeContext::new(store).await?; + let context = RuntimeContext::new(store.clone()).await?; Ok(Self { + store: Arc::new(store), context: Arc::new(Mutex::new(context)), }) } @@ -244,4 +265,8 @@ impl Runtime { let mut context = self.context.lock().await; context.list().await } + + pub async fn dupe(&self) -> Result { + Runtime::new((*self.store).clone()).await + } } diff --git a/shared/proto/krata/control.proto b/shared/proto/krata/control.proto index c3801bb..aeefab3 100644 --- a/shared/proto/krata/control.proto +++ b/shared/proto/krata/control.proto @@ -38,7 +38,7 @@ message DestroyGuestRequest { message DestroyGuestReply {} message ConsoleDataRequest { - string guest = 1; + string guest_id = 1; bytes data = 2; } @@ -46,11 +46,34 @@ message ConsoleDataReply { bytes data = 1; } +message WatchEventsRequest {} + +message GuestLaunchedEvent { + string guest_id = 1; +} + +message GuestDestroyedEvent { + string guest_id = 1; +} + +message GuestExitedEvent { + string guest_id = 1; + int32 code = 2; +} + +message WatchEventsReply { + oneof event { + GuestLaunchedEvent guest_launched = 1; + GuestDestroyedEvent guest_destroyed = 2; + GuestExitedEvent guest_exited = 3; + } +} + service ControlService { rpc LaunchGuest(LaunchGuestRequest) returns (LaunchGuestReply); rpc DestroyGuest(DestroyGuestRequest) returns (DestroyGuestReply); - rpc ListGuests(ListGuestsRequest) returns (ListGuestsReply); - rpc ConsoleData(stream ConsoleDataRequest) returns (stream ConsoleDataReply); + + rpc WatchEvents(WatchEventsRequest) returns (stream WatchEventsReply); }