krata: implement event notifications

This commit is contained in:
Alex Zenla 2024-03-06 15:57:56 +00:00
parent e300fd924f
commit 7c8d38a0ca
No known key found for this signature in database
GPG Key ID: 067B238899B51269
8 changed files with 248 additions and 17 deletions

View File

@ -1,7 +1,10 @@
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use env_logger::Env; use env_logger::Env;
use krata::control::{DestroyGuestRequest, LaunchGuestRequest, ListGuestsRequest}; use krata::control::{
watch_events_reply::Event, DestroyGuestRequest, LaunchGuestRequest, ListGuestsRequest,
WatchEventsRequest,
};
use kratactl::{client::ControlClientProvider, console::StdioConsoleStream}; use kratactl::{client::ControlClientProvider, console::StdioConsoleStream};
use tonic::Request; use tonic::Request;
@ -18,7 +21,6 @@ struct ControllerArgs {
#[derive(Subcommand, Debug)] #[derive(Subcommand, Debug)]
enum Commands { enum Commands {
List {}, List {},
Launch { Launch {
#[arg(short, long, default_value_t = 1)] #[arg(short, long, default_value_t = 1)]
cpus: u32, cpus: u32,
@ -41,6 +43,7 @@ enum Commands {
#[arg()] #[arg()]
guest: String, guest: String,
}, },
Watch {},
} }
#[tokio::main] #[tokio::main]
@ -116,6 +119,35 @@ async fn main() -> Result<()> {
println!("{}", table.to_string()); println!("{}", table.to_string());
} }
} }
Commands::Watch {} => {
let response = client
.watch_events(Request::new(WatchEventsRequest {}))
.await?;
let mut stream = response.into_inner();
while let Some(reply) = stream.message().await? {
let Some(event) = reply.event else {
continue;
};
match event {
Event::GuestLaunched(launched) => {
println!("event=guest.launched guest={}", launched.guest_id);
}
Event::GuestDestroyed(destroyed) => {
println!("event=guest.destroyed guest={}", destroyed.guest_id);
}
Event::GuestExited(exited) => {
println!(
"event=guest.exited guest={} code={}",
exited.guest_id, exited.code
);
}
}
}
}
} }
Ok(()) Ok(())
} }

View File

@ -21,7 +21,7 @@ impl StdioConsoleStream {
pub async fn stdin_stream(guest: String) -> impl Stream<Item = ConsoleDataRequest> { pub async fn stdin_stream(guest: String) -> impl Stream<Item = ConsoleDataRequest> {
let mut stdin = stdin(); let mut stdin = stdin();
stream! { stream! {
yield ConsoleDataRequest { guest, data: vec![] }; yield ConsoleDataRequest { guest_id: guest, data: vec![] };
let mut buffer = vec![0u8; 60]; let mut buffer = vec![0u8; 60];
loop { loop {
@ -36,7 +36,7 @@ impl StdioConsoleStream {
if size == 1 && buffer[0] == 0x1d { if size == 1 && buffer[0] == 0x1d {
break; break;
} }
yield ConsoleDataRequest { guest: String::default(), data }; yield ConsoleDataRequest { guest_id: String::default(), data };
} }
} }
} }

View File

@ -5,7 +5,7 @@ use futures::Stream;
use krata::control::{ use krata::control::{
control_service_server::ControlService, ConsoleDataReply, ConsoleDataRequest, control_service_server::ControlService, ConsoleDataReply, ConsoleDataRequest,
DestroyGuestReply, DestroyGuestRequest, GuestInfo, LaunchGuestReply, LaunchGuestRequest, DestroyGuestReply, DestroyGuestRequest, GuestInfo, LaunchGuestReply, LaunchGuestRequest,
ListGuestsReply, ListGuestsRequest, ListGuestsReply, ListGuestsRequest, WatchEventsReply, WatchEventsRequest,
}; };
use tokio::{ use tokio::{
io::{AsyncReadExt, AsyncWriteExt}, io::{AsyncReadExt, AsyncWriteExt},
@ -14,7 +14,10 @@ use tokio::{
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use tonic::{Request, Response, Status, Streaming}; use tonic::{Request, Response, Status, Streaming};
use crate::runtime::{launch::GuestLaunchRequest, Runtime}; use crate::{
event::DaemonEventContext,
runtime::{launch::GuestLaunchRequest, Runtime},
};
pub struct ApiError { pub struct ApiError {
message: String, message: String,
@ -36,12 +39,13 @@ impl From<ApiError> for Status {
#[derive(Clone)] #[derive(Clone)]
pub struct RuntimeControlService { pub struct RuntimeControlService {
events: DaemonEventContext,
runtime: Runtime, runtime: Runtime,
} }
impl RuntimeControlService { impl RuntimeControlService {
pub fn new(runtime: Runtime) -> Self { pub fn new(events: DaemonEventContext, runtime: Runtime) -> Self {
Self { runtime } Self { events, runtime }
} }
} }
@ -55,6 +59,9 @@ impl ControlService for RuntimeControlService {
type ConsoleDataStream = type ConsoleDataStream =
Pin<Box<dyn Stream<Item = Result<ConsoleDataReply, Status>> + Send + 'static>>; Pin<Box<dyn Stream<Item = Result<ConsoleDataReply, Status>> + Send + 'static>>;
type WatchEventsStream =
Pin<Box<dyn Stream<Item = Result<WatchEventsReply, Status>> + Send + 'static>>;
async fn launch_guest( async fn launch_guest(
&self, &self,
request: Request<LaunchGuestRequest>, request: Request<LaunchGuestRequest>,
@ -115,7 +122,7 @@ impl ControlService for RuntimeControlService {
let request = request?; let request = request?;
let mut console = self let mut console = self
.runtime .runtime
.console(&request.guest) .console(&request.guest_id)
.await .await
.map_err(ApiError::from)?; .map_err(ApiError::from)?;
@ -150,6 +157,20 @@ impl ControlService for RuntimeControlService {
Ok(Response::new(Box::pin(output) as Self::ConsoleDataStream)) Ok(Response::new(Box::pin(output) as Self::ConsoleDataStream))
} }
async fn watch_events(
&self,
request: Request<WatchEventsRequest>,
) -> Result<Response<Self::WatchEventsStream>, Status> {
let _ = request.into_inner();
let mut events = self.events.subscribe();
let output = try_stream! {
while let Ok(event) = events.recv().await {
yield WatchEventsReply { event: Some(event), };
}
};
Ok(Response::new(Box::pin(output) as Self::WatchEventsStream))
}
} }
impl From<crate::runtime::GuestInfo> for GuestInfo { impl From<crate::runtime::GuestInfo> for GuestInfo {

112
daemon/src/event.rs Normal file
View File

@ -0,0 +1,112 @@
use std::{collections::HashMap, time::Duration};
use anyhow::Result;
use krata::control::{GuestDestroyedEvent, GuestExitedEvent, GuestLaunchedEvent};
use log::error;
use tokio::{sync::broadcast, task::JoinHandle, time};
use uuid::Uuid;
use crate::runtime::{GuestInfo, Runtime};
pub type DaemonEvent = krata::control::watch_events_reply::Event;
const EVENT_CHANNEL_QUEUE_LEN: usize = 1000;
#[derive(Clone)]
pub struct DaemonEventContext {
sender: broadcast::Sender<DaemonEvent>,
}
impl DaemonEventContext {
pub fn subscribe(&self) -> broadcast::Receiver<DaemonEvent> {
self.sender.subscribe()
}
}
pub struct DaemonEventGenerator {
runtime: Runtime,
last: HashMap<Uuid, GuestInfo>,
sender: broadcast::Sender<DaemonEvent>,
}
impl DaemonEventGenerator {
pub async fn new(runtime: Runtime) -> Result<(DaemonEventContext, DaemonEventGenerator)> {
let (sender, _) = broadcast::channel(EVENT_CHANNEL_QUEUE_LEN);
let generator = DaemonEventGenerator {
runtime,
last: HashMap::new(),
sender: sender.clone(),
};
let context = DaemonEventContext { sender };
Ok((context, generator))
}
async fn evaluate(&mut self) -> Result<()> {
let guests = self.runtime.list().await?;
let guests = {
let mut map = HashMap::new();
for guest in guests {
map.insert(guest.uuid, guest);
}
map
};
let mut events: Vec<DaemonEvent> = Vec::new();
for uuid in guests.keys() {
if !self.last.contains_key(uuid) {
events.push(DaemonEvent::GuestLaunched(GuestLaunchedEvent {
guest_id: uuid.to_string(),
}));
}
}
for uuid in self.last.keys() {
if !guests.contains_key(uuid) {
events.push(DaemonEvent::GuestDestroyed(GuestDestroyedEvent {
guest_id: uuid.to_string(),
}));
}
}
for (uuid, guest) in &guests {
let Some(last) = self.last.get(uuid) else {
continue;
};
if last.state.exit_code.is_some() {
continue;
}
let Some(code) = guest.state.exit_code else {
continue;
};
events.push(DaemonEvent::GuestExited(GuestExitedEvent {
guest_id: uuid.to_string(),
code,
}));
}
self.last = guests;
for event in events {
let _ = self.sender.send(event);
}
Ok(())
}
pub async fn launch(mut self) -> Result<JoinHandle<()>> {
Ok(tokio::task::spawn(async move {
loop {
if let Err(error) = self.evaluate().await {
error!("failed to evaluate daemon events: {}", error);
time::sleep(Duration::from_secs(5)).await;
} else {
time::sleep(Duration::from_millis(500)).await;
}
}
}))
}
}

View File

@ -2,28 +2,39 @@ use std::{net::SocketAddr, path::PathBuf, str::FromStr};
use anyhow::Result; use anyhow::Result;
use control::RuntimeControlService; use control::RuntimeControlService;
use event::{DaemonEventContext, DaemonEventGenerator};
use krata::{control::control_service_server::ControlServiceServer, dial::ControlDialAddress}; use krata::{control::control_service_server::ControlServiceServer, dial::ControlDialAddress};
use log::info; use log::info;
use runtime::Runtime; use runtime::Runtime;
use tokio::net::UnixListener; use tokio::{net::UnixListener, task::JoinHandle};
use tokio_stream::wrappers::UnixListenerStream; use tokio_stream::wrappers::UnixListenerStream;
use tonic::transport::{Identity, Server, ServerTlsConfig}; use tonic::transport::{Identity, Server, ServerTlsConfig};
pub mod control; pub mod control;
pub mod event;
pub mod runtime; pub mod runtime;
pub struct Daemon { pub struct Daemon {
store: String, store: String,
runtime: Runtime, runtime: Runtime,
events: DaemonEventContext,
task: JoinHandle<()>,
} }
impl Daemon { impl Daemon {
pub async fn new(store: String, runtime: Runtime) -> Result<Self> { pub async fn new(store: String, runtime: Runtime) -> Result<Self> {
Ok(Self { store, runtime }) let runtime_for_events = runtime.dupe().await?;
let (events, generator) = DaemonEventGenerator::new(runtime_for_events).await?;
Ok(Self {
store,
runtime,
events,
task: generator.launch().await?,
})
} }
pub async fn listen(&mut self, addr: ControlDialAddress) -> Result<()> { pub async fn listen(&mut self, addr: ControlDialAddress) -> Result<()> {
let control_service = RuntimeControlService::new(self.runtime.clone()); let control_service = RuntimeControlService::new(self.events.clone(), self.runtime.clone());
let mut server = Server::builder(); let mut server = Server::builder();
@ -72,3 +83,9 @@ impl Daemon {
Ok(()) Ok(())
} }
} }
impl Drop for Daemon {
fn drop(&mut self) {
self.task.abort();
}
}

View File

@ -15,7 +15,7 @@ use crate::runtime::cfgblk::ConfigBlock;
use crate::runtime::image::{cache::ImageCache, name::ImageName, ImageCompiler, ImageInfo}; use crate::runtime::image::{cache::ImageCache, name::ImageName, ImageCompiler, ImageInfo};
use crate::runtime::RuntimeContext; use crate::runtime::RuntimeContext;
use super::GuestInfo; use super::{GuestInfo, GuestState};
pub struct GuestLaunchRequest<'a> { pub struct GuestLaunchRequest<'a> {
pub image: &'a str, pub image: &'a str,
@ -192,6 +192,7 @@ impl GuestLauncher {
IpAddr::V6(guest_ipv6), IpAddr::V6(guest_ipv6),
ipv6_network_mask as u8, ipv6_network_mask as u8,
)?), )?),
state: GuestState { exit_code: None },
}), }),
Err(error) => { Err(error) => {
let _ = context.autoloop.unloop(&image_squashfs_loop.path); let _ = context.autoloop.unloop(&image_squashfs_loop.path);

View File

@ -27,6 +27,10 @@ pub struct ContainerLoopInfo {
pub delete: Option<String>, pub delete: Option<String>,
} }
pub struct GuestState {
pub exit_code: Option<i32>,
}
pub struct GuestInfo { pub struct GuestInfo {
pub uuid: Uuid, pub uuid: Uuid,
pub domid: u32, pub domid: u32,
@ -34,6 +38,7 @@ pub struct GuestInfo {
pub loops: Vec<ContainerLoopInfo>, pub loops: Vec<ContainerLoopInfo>,
pub ipv4: Option<IpNetwork>, pub ipv4: Option<IpNetwork>,
pub ipv6: Option<IpNetwork>, pub ipv6: Option<IpNetwork>,
pub state: GuestState,
} }
pub struct RuntimeContext { pub struct RuntimeContext {
@ -115,6 +120,19 @@ impl RuntimeContext {
None None
}; };
let exit_code = self
.xen
.store
.read_string(&format!("{}/krata/guest/exit-code", &dom_path))
.await?;
let exit_code: Option<i32> = match exit_code {
Some(code) => code.parse().ok(),
None => None,
};
let state = GuestState { exit_code };
let loops = RuntimeContext::parse_loop_set(&loops); let loops = RuntimeContext::parse_loop_set(&loops);
guests.push(GuestInfo { guests.push(GuestInfo {
uuid, uuid,
@ -123,6 +141,7 @@ impl RuntimeContext {
loops, loops,
ipv4, ipv4,
ipv6, ipv6,
state,
}); });
} }
Ok(guests) Ok(guests)
@ -165,13 +184,15 @@ impl RuntimeContext {
#[derive(Clone)] #[derive(Clone)]
pub struct Runtime { pub struct Runtime {
store: Arc<String>,
context: Arc<Mutex<RuntimeContext>>, context: Arc<Mutex<RuntimeContext>>,
} }
impl Runtime { impl Runtime {
pub async fn new(store: String) -> Result<Self> { pub async fn new(store: String) -> Result<Self> {
let context = RuntimeContext::new(store).await?; let context = RuntimeContext::new(store.clone()).await?;
Ok(Self { Ok(Self {
store: Arc::new(store),
context: Arc::new(Mutex::new(context)), context: Arc::new(Mutex::new(context)),
}) })
} }
@ -244,4 +265,8 @@ impl Runtime {
let mut context = self.context.lock().await; let mut context = self.context.lock().await;
context.list().await context.list().await
} }
pub async fn dupe(&self) -> Result<Runtime> {
Runtime::new((*self.store).clone()).await
}
} }

View File

@ -38,7 +38,7 @@ message DestroyGuestRequest {
message DestroyGuestReply {} message DestroyGuestReply {}
message ConsoleDataRequest { message ConsoleDataRequest {
string guest = 1; string guest_id = 1;
bytes data = 2; bytes data = 2;
} }
@ -46,11 +46,34 @@ message ConsoleDataReply {
bytes data = 1; bytes data = 1;
} }
message WatchEventsRequest {}
message GuestLaunchedEvent {
string guest_id = 1;
}
message GuestDestroyedEvent {
string guest_id = 1;
}
message GuestExitedEvent {
string guest_id = 1;
int32 code = 2;
}
message WatchEventsReply {
oneof event {
GuestLaunchedEvent guest_launched = 1;
GuestDestroyedEvent guest_destroyed = 2;
GuestExitedEvent guest_exited = 3;
}
}
service ControlService { service ControlService {
rpc LaunchGuest(LaunchGuestRequest) returns (LaunchGuestReply); rpc LaunchGuest(LaunchGuestRequest) returns (LaunchGuestReply);
rpc DestroyGuest(DestroyGuestRequest) returns (DestroyGuestReply); rpc DestroyGuest(DestroyGuestRequest) returns (DestroyGuestReply);
rpc ListGuests(ListGuestsRequest) returns (ListGuestsReply); rpc ListGuests(ListGuestsRequest) returns (ListGuestsReply);
rpc ConsoleData(stream ConsoleDataRequest) returns (stream ConsoleDataReply); rpc ConsoleData(stream ConsoleDataRequest) returns (stream ConsoleDataReply);
rpc WatchEvents(WatchEventsRequest) returns (stream WatchEventsReply);
} }