diff --git a/crates/kratactl/bin/control.rs b/crates/kratactl/bin/control.rs index 2bf1e80..3aad58f 100644 --- a/crates/kratactl/bin/control.rs +++ b/crates/kratactl/bin/control.rs @@ -11,9 +11,8 @@ use krata::{ WatchEventsRequest, }, }; -use kratactl::{client::ControlClientProvider, console::StdioConsoleStream}; +use kratactl::{client::ControlClientProvider, console::StdioConsoleStream, events::EventStream}; use log::error; -use tokio_stream::StreamExt; use tonic::Request; #[derive(Parser, Debug)] @@ -62,10 +61,13 @@ async fn main() -> Result<()> { let args = ControllerArgs::parse(); let mut client = ControlClientProvider::dial(args.connection.parse()?).await?; - let mut events = client - .watch_events(WatchEventsRequest {}) - .await? - .into_inner(); + let events = EventStream::open( + client + .watch_events(WatchEventsRequest {}) + .await? + .into_inner(), + ) + .await?; match args.command { Commands::Launch { @@ -95,41 +97,7 @@ async fn main() -> Result<()> { .into_inner(); let id = response.guest_id; if attach { - while let Some(event) = events.next().await { - let reply = event?; - match reply.event { - Some(Event::GuestChanged(changed)) => { - let Some(guest) = changed.guest else { - continue; - }; - - if guest.id != id { - continue; - } - - let Some(state) = guest.state else { - continue; - }; - - if let Some(ref error) = state.error_info { - error!("guest error: {}", error.message); - } - - if state.status() == GuestStatus::Destroyed { - error!("guest destroyed"); - std::process::exit(1); - } - - if state.status() == GuestStatus::Started { - break; - } - } - - None => { - continue; - } - } - } + wait_guest_started(&id, events.clone()).await?; let input = StdioConsoleStream::stdin_stream(id.clone()).await; let output = client.console_data(input).await?.into_inner(); let exit_hook_task = @@ -208,19 +176,17 @@ async fn main() -> Result<()> { } Commands::Watch {} => { - let response = client - .watch_events(Request::new(WatchEventsRequest {})) - .await?; - let mut stream = response.into_inner(); - while let Some(reply) = stream.message().await? { - let Some(event) = reply.event else { - continue; - }; - + let mut stream = events.subscribe(); + loop { + let event = stream.recv().await?; match event { Event::GuestChanged(changed) => { if let Some(guest) = changed.guest { - println!("event=guest.changed guest={}", guest.id); + println!( + "event=guest.changed guest={} status={}", + guest.id, + guest_status_text(guest.state.unwrap_or_default().status()) + ); } } } @@ -232,7 +198,7 @@ async fn main() -> Result<()> { fn guest_status_text(status: GuestStatus) -> String { match status { - GuestStatus::Unknown => "unknown", + GuestStatus::Destroy => "destroying", GuestStatus::Destroyed => "destroyed", GuestStatus::Start => "starting", GuestStatus::Exited => "exited", @@ -254,3 +220,38 @@ fn guest_state_text(state: GuestState) -> String { } text } + +async fn wait_guest_started(id: &str, events: EventStream) -> Result<()> { + let mut stream = events.subscribe(); + while let Ok(event) = stream.recv().await { + match event { + Event::GuestChanged(changed) => { + let Some(guest) = changed.guest else { + continue; + }; + + if guest.id != id { + continue; + } + + let Some(state) = guest.state else { + continue; + }; + + if let Some(ref error) = state.error_info { + error!("guest error: {}", error.message); + } + + if state.status() == GuestStatus::Destroyed { + error!("guest destroyed"); + std::process::exit(1); + } + + if state.status() == GuestStatus::Started { + break; + } + } + } + } + Ok(()) +} diff --git a/crates/kratactl/src/console.rs b/crates/kratactl/src/console.rs index 38aeec8..c0ba8f2 100644 --- a/crates/kratactl/src/console.rs +++ b/crates/kratactl/src/console.rs @@ -7,9 +7,9 @@ use anyhow::Result; use async_stream::stream; use krata::{ common::GuestStatus, - control::{watch_events_reply::Event, ConsoleDataReply, ConsoleDataRequest, WatchEventsReply}, + control::{watch_events_reply::Event, ConsoleDataReply, ConsoleDataRequest}, }; -use log::{debug, error, warn}; +use log::{debug, warn}; use termion::raw::IntoRawMode; use tokio::{ fs::File, @@ -19,6 +19,8 @@ use tokio::{ use tokio_stream::{Stream, StreamExt}; use tonic::Streaming; +use crate::events::EventStream; + pub struct StdioConsoleStream; impl StdioConsoleStream { @@ -59,46 +61,31 @@ impl StdioConsoleStream { Ok(()) } - pub async fn guest_exit_hook( - id: String, - mut events: Streaming, - ) -> Result> { + pub async fn guest_exit_hook(id: String, events: EventStream) -> Result> { Ok(tokio::task::spawn(async move { - while let Some(result) = events.next().await { - match result { - Err(error) => { - error!("failed to handle events for exit hook: {}", error); - break; - } - - Ok(reply) => { - let Some(event) = reply.event else { + let mut stream = events.subscribe(); + while let Ok(event) = stream.recv().await { + match event { + Event::GuestChanged(changed) => { + let Some(guest) = changed.guest else { continue; }; - match event { - Event::GuestChanged(changed) => { - let Some(guest) = changed.guest else { - continue; - }; + let Some(state) = guest.state else { + continue; + }; - let Some(state) = guest.state else { - continue; - }; + if guest.id != id { + continue; + } - if guest.id != id { - continue; - } + if let Some(exit_info) = state.exit_info { + std::process::exit(exit_info.code); + } - if let Some(exit_info) = state.exit_info { - std::process::exit(exit_info.code); - } - - if state.status() == GuestStatus::Destroyed { - warn!("attached guest was destroyed"); - std::process::exit(1); - } - } + if state.status() == GuestStatus::Destroy { + warn!("attached guest was destroyed"); + std::process::exit(1); } } } diff --git a/crates/kratactl/src/events.rs b/crates/kratactl/src/events.rs new file mode 100644 index 0000000..ef8e291 --- /dev/null +++ b/crates/kratactl/src/events.rs @@ -0,0 +1,57 @@ +use std::sync::Arc; + +use anyhow::Result; +use krata::control::{watch_events_reply::Event, WatchEventsReply}; +use log::trace; +use tokio::{sync::broadcast, task::JoinHandle}; +use tokio_stream::StreamExt; +use tonic::Streaming; + +#[derive(Clone)] +pub struct EventStream { + sender: Arc>, + task: Arc>, +} + +impl EventStream { + pub async fn open(mut events: Streaming) -> Result { + let (sender, _) = broadcast::channel(1000); + let emit = sender.clone(); + let task = tokio::task::spawn(async move { + loop { + let Some(result) = events.next().await else { + break; + }; + + let reply = match result { + Ok(reply) => reply, + Err(error) => { + trace!("event stream processing failed: {}", error); + break; + } + }; + + let Some(event) = reply.event else { + continue; + }; + let _ = emit.send(event); + } + }); + Ok(Self { + sender: Arc::new(sender), + task: Arc::new(task), + }) + } + + pub fn subscribe(&self) -> broadcast::Receiver { + self.sender.subscribe() + } +} + +impl Drop for EventStream { + fn drop(&mut self) { + if Arc::strong_count(&self.task) <= 1 { + self.task.abort(); + } + } +} diff --git a/crates/kratactl/src/lib.rs b/crates/kratactl/src/lib.rs index 5393e6a..d52da44 100644 --- a/crates/kratactl/src/lib.rs +++ b/crates/kratactl/src/lib.rs @@ -1,2 +1,3 @@ pub mod client; pub mod console; +pub mod events; diff --git a/crates/kratad/src/control.rs b/crates/kratad/src/control.rs index c71f23d..5b619cc 100644 --- a/crates/kratad/src/control.rs +++ b/crates/kratad/src/control.rs @@ -191,11 +191,10 @@ impl ControlService for RuntimeControlService { .into()); }; let request = request?; - let mut console = self - .runtime - .console(&request.guest_id) - .await - .map_err(ApiError::from)?; + let uuid = Uuid::from_str(&request.guest_id).map_err(|error| ApiError { + message: error.to_string(), + })?; + let mut console = self.runtime.console(uuid).await.map_err(ApiError::from)?; let output = try_stream! { let mut buffer: Vec = vec![0u8; 256]; diff --git a/crates/kratad/src/event.rs b/crates/kratad/src/event.rs index 36828a8..7fd9913 100644 --- a/crates/kratad/src/event.rs +++ b/crates/kratad/src/event.rs @@ -1,10 +1,7 @@ use std::{collections::HashMap, time::Duration}; use anyhow::Result; -use krata::{ - common::{GuestExitInfo, GuestState, GuestStatus}, - control::watch_events_reply::Event, -}; +use krata::common::{GuestExitInfo, GuestState, GuestStatus}; use log::error; use tokio::{ sync::{broadcast, mpsc::Sender}, @@ -42,7 +39,7 @@ pub struct DaemonEventGenerator { guests: GuestStore, guest_reconciler_notify: Sender, last: HashMap, - _sender: broadcast::Sender, + _sender: broadcast::Sender, } impl DaemonEventGenerator { diff --git a/crates/kratad/src/reconcile/mod.rs b/crates/kratad/src/reconcile/mod.rs index f218e1d..1daa934 100644 --- a/crates/kratad/src/reconcile/mod.rs +++ b/crates/kratad/src/reconcile/mod.rs @@ -107,12 +107,8 @@ impl GuestReconciler { }))?; let result = match guest.state.as_ref().map(|x| x.status()).unwrap_or_default() { - GuestStatus::Start => self.start(uuid, guest).await.map(|_| true), - - GuestStatus::Destroy | GuestStatus::Exited => { - self.destroy(uuid, guest).await.map(|_| true) - } - + GuestStatus::Start => self.start(uuid, guest).await, + GuestStatus::Destroy | GuestStatus::Exited => self.destroy(uuid, guest).await, _ => Ok(false), }; @@ -149,7 +145,7 @@ impl GuestReconciler { Ok(()) } - async fn start(&self, uuid: Uuid, guest: &mut Guest) -> Result<()> { + async fn start(&self, uuid: Uuid, guest: &mut Guest) -> Result { let Some(ref spec) = guest.spec else { return Err(anyhow!("guest spec not specified")); }; @@ -191,11 +187,11 @@ impl GuestReconciler { exit_info: None, error_info: None, }); - Ok(()) + Ok(true) } - async fn destroy(&self, uuid: Uuid, guest: &mut Guest) -> Result<()> { - self.runtime.destroy(&uuid.to_string()).await?; + async fn destroy(&self, uuid: Uuid, guest: &mut Guest) -> Result { + self.runtime.destroy(uuid).await?; info!("destroyed guest {}", uuid); guest.network = None; guest.state = Some(GuestState { @@ -203,7 +199,7 @@ impl GuestReconciler { exit_info: None, error_info: None, }); - Ok(()) + Ok(true) } } diff --git a/crates/kratart/src/launch/mod.rs b/crates/kratart/src/launch/mod.rs index 019bb08..781c0e0 100644 --- a/crates/kratart/src/launch/mod.rs +++ b/crates/kratart/src/launch/mod.rs @@ -45,7 +45,7 @@ impl GuestLauncher { request: GuestLaunchRequest<'r>, ) -> Result { let uuid = request.uuid.unwrap_or_else(Uuid::new_v4); - let name = format!("krata-{uuid}"); + let xen_name = format!("krata-{uuid}"); let image_info = self.compile(request.image, &context.image_cache).await?; let mut gateway_mac = MacAddr6::random(); @@ -161,7 +161,7 @@ impl GuestLauncher { let config = DomainConfig { backend_domid: 0, - name: &name, + name: &xen_name, max_vcpus: request.vcpus, mem_mb: request.mem, kernel_path: &context.kernel, diff --git a/crates/kratart/src/lib.rs b/crates/kratart/src/lib.rs index 0bd97b8..a29fcbb 100644 --- a/crates/kratart/src/lib.rs +++ b/crates/kratart/src/lib.rs @@ -178,18 +178,9 @@ impl RuntimeContext { Ok(guests) } - pub async fn resolve(&mut self, id: &str) -> Result> { + pub async fn resolve(&mut self, uuid: Uuid) -> Result> { for guest in self.list().await? { - let uuid_string = guest.uuid.to_string(); - let domid_string = guest.domid.to_string(); - - if let Some(ref name) = guest.name { - if name == id { - return Ok(Some(guest)); - } - } - - if uuid_string == id || domid_string == id || id == format!("krata-{}", uuid_string) { + if guest.uuid == uuid { return Ok(Some(guest)); } } @@ -241,12 +232,12 @@ impl Runtime { launcher.launch(&mut context, request).await } - pub async fn destroy(&self, id: &str) -> Result { + pub async fn destroy(&self, uuid: Uuid) -> Result { let mut context = self.context.lock().await; let info = context - .resolve(id) + .resolve(uuid) .await? - .ok_or_else(|| anyhow!("unable to resolve guest: {}", id))?; + .ok_or_else(|| anyhow!("unable to resolve guest: {}", uuid))?; let domid = info.domid; let mut store = XsdClient::open().await?; let dom_path = store.get_domain_path(domid).await?; @@ -288,12 +279,12 @@ impl Runtime { Ok(uuid) } - pub async fn console(&self, id: &str) -> Result { + pub async fn console(&self, uuid: Uuid) -> Result { let mut context = self.context.lock().await; let info = context - .resolve(id) + .resolve(uuid) .await? - .ok_or_else(|| anyhow!("unable to resolve guest: {}", id))?; + .ok_or_else(|| anyhow!("unable to resolve guest: {}", uuid))?; let domid = info.domid; let tty = context.xen.get_console_path(domid).await?; XenConsole::new(&tty).await