diff --git a/crates/krata/proto/krata/internal/idm.proto b/crates/krata/proto/krata/internal/idm.proto index f824799..015fe5a 100644 --- a/crates/krata/proto/krata/internal/idm.proto +++ b/crates/krata/proto/krata/internal/idm.proto @@ -6,12 +6,16 @@ option java_multiple_files = true; option java_package = "dev.krata.proto.internal.idm"; option java_outer_classname = "IdmProto"; -message IdmExitMessage { +message IdmExitEvent { int32 code = 1; } -message IdmPacket { - oneof message { - IdmExitMessage exit = 1; +message IdmEvent { + oneof event { + IdmExitEvent exit = 1; } } + +message IdmPacket { + IdmEvent event = 1; +} diff --git a/crates/kratactl/src/cli/list.rs b/crates/kratactl/src/cli/list.rs index 4a0e83b..06d9fef 100644 --- a/crates/kratactl/src/cli/list.rs +++ b/crates/kratactl/src/cli/list.rs @@ -4,7 +4,7 @@ use cli_tables::Table; use krata::{ events::EventStream, v1::{ - common::{guest_image_spec::Image, Guest}, + common::{guest_image_spec::Image, Guest, GuestStatus}, control::{ control_service_client::ControlServiceClient, ListGuestsRequest, ResolveGuestRequest, }, @@ -14,7 +14,7 @@ use krata::{ use serde_json::Value; use tonic::{transport::Channel, Request}; -use crate::format::{guest_state_text, kv2line, proto2dynamic, proto2kv}; +use crate::format::{guest_state_text, guest_status_text, kv2line, proto2dynamic, proto2kv}; #[derive(ValueEnum, Clone, Debug, PartialEq, Eq)] enum ListFormat { @@ -24,6 +24,7 @@ enum ListFormat { Jsonl, Yaml, KeyValue, + Simple, } #[derive(Parser)] @@ -40,7 +41,7 @@ impl ListCommand { mut client: ControlServiceClient, _events: EventStream, ) -> Result<()> { - let guests = if let Some(ref guest) = self.guest { + let mut guests = if let Some(ref guest) = self.guest { let reply = client .resolve_guest(Request::new(ResolveGuestRequest { name: guest.clone(), @@ -60,11 +61,36 @@ impl ListCommand { .guests }; + guests.sort_by(|a, b| { + a.spec + .as_ref() + .map(|x| x.name.as_str()) + .unwrap_or("") + .cmp(b.spec.as_ref().map(|x| x.name.as_str()).unwrap_or("")) + }); + match self.format { ListFormat::CliTable => { self.print_guest_table(guests)?; } + ListFormat::Simple => { + for guest in guests { + let state = guest_status_text( + guest + .state + .as_ref() + .map(|x| x.status()) + .unwrap_or(GuestStatus::Unknown), + ); + let name = guest.spec.as_ref().map(|x| x.name.as_str()).unwrap_or(""); + let network = guest.state.as_ref().and_then(|x| x.network.as_ref()); + let ipv4 = network.map(|x| x.guest_ipv4.as_str()).unwrap_or(""); + let ipv6 = network.map(|x| x.guest_ipv6.as_str()).unwrap_or(""); + println!("{}\t{}\t{}\t{}\t{}", guest.id, state, name, ipv4, ipv6); + } + } + ListFormat::Json | ListFormat::JsonPretty | ListFormat::Yaml => { let mut values = Vec::new(); for guest in guests { diff --git a/crates/kratad/src/event.rs b/crates/kratad/src/event.rs index cb5c8d8..cdf275c 100644 --- a/crates/kratad/src/event.rs +++ b/crates/kratad/src/event.rs @@ -6,7 +6,7 @@ use std::{ use anyhow::Result; use krata::{ - idm::protocol::{idm_packet::Message, IdmPacket}, + idm::protocol::{idm_event::Event, IdmPacket}, v1::common::{GuestExitInfo, GuestState, GuestStatus}, }; use log::error; @@ -117,7 +117,7 @@ impl DaemonEventGenerator { } async fn handle_idm_packet(&mut self, id: Uuid, packet: IdmPacket) -> Result<()> { - if let Some(Message::Exit(exit)) = packet.message { + if let Some(Event::Exit(exit)) = packet.event.and_then(|x| x.event) { self.handle_exit_code(id, exit.code).await?; } Ok(()) diff --git a/crates/krataguest/src/background.rs b/crates/krataguest/src/background.rs index d114a2e..d0cc092 100644 --- a/crates/krataguest/src/background.rs +++ b/crates/krataguest/src/background.rs @@ -5,7 +5,7 @@ use crate::{ use anyhow::Result; use krata::idm::{ client::IdmClient, - protocol::{idm_packet::Message, IdmExitMessage, IdmPacket}, + protocol::{idm_event::Event, IdmEvent, IdmExitEvent, IdmPacket}, }; use log::error; use nix::unistd::Pid; @@ -56,7 +56,9 @@ impl GuestBackground { self.idm .sender .send(IdmPacket { - message: Some(Message::Exit(IdmExitMessage { code: event.status })), + event: Some(IdmEvent { + event: Some(Event::Exit(IdmExitEvent { code: event.status })), + }), }) .await?; death(event.status).await?; diff --git a/crates/kratanet/src/backend.rs b/crates/kratanet/src/backend.rs index d8965c7..2953099 100644 --- a/crates/kratanet/src/backend.rs +++ b/crates/kratanet/src/backend.rs @@ -153,7 +153,7 @@ impl NetworkBackend { pub async fn launch(self) -> Result> { Ok(tokio::task::spawn(async move { info!( - "lauched network backend for krata guest {}", + "launched network backend for krata guest {}", self.metadata.uuid ); if let Err(error) = self.run().await { diff --git a/crates/kratanet/src/raw_socket.rs b/crates/kratanet/src/raw_socket.rs index 0ddfaac..e1389a6 100644 --- a/crates/kratanet/src/raw_socket.rs +++ b/crates/kratanet/src/raw_socket.rs @@ -291,6 +291,12 @@ impl AsyncRawSocketChannel { debug!("failed to transmit: would block"); continue; } + + // device no longer exists + if error.raw_os_error() == Some(6) { + break; + } + return Err(anyhow!( "failed to write {} bytes to raw socket: {}", packet.len(), diff --git a/crates/kratart/src/channel.rs b/crates/kratart/src/channel.rs index 07348f3..b702f09 100644 --- a/crates/kratart/src/channel.rs +++ b/crates/kratart/src/channel.rs @@ -79,10 +79,11 @@ impl ChannelService { async fn process(&mut self) -> Result<()> { self.scan_all_backends().await?; - let mut watch_handle = self.store.create_watch().await?; - self.store - .bind_watch(&watch_handle, "/local/domain/0/backend/console".to_string()) + let mut watch_handle = self + .store + .create_watch("/local/domain/0/backend/console") .await?; + self.store.bind_watch(&watch_handle).await?; loop { select! { x = watch_handle.receiver.recv() => match x { @@ -310,10 +311,11 @@ impl KrataChannelBackendProcessor { mut receiver: Receiver>, ) -> Result<()> { self.init().await?; - let mut frontend_state_change = self.store.create_watch().await?; - self.store - .bind_watch(&frontend_state_change, format!("{}/state", self.frontend)) + let mut frontend_state_change = self + .store + .create_watch(format!("{}/state", self.frontend)) .await?; + self.store.bind_watch(&frontend_state_change).await?; let (ring_ref, port) = loop { match frontend_state_change.receiver.recv().await { @@ -382,10 +384,11 @@ impl KrataChannelBackendProcessor { } }; - let mut self_state_change = self.store.create_watch().await?; - self.store - .bind_watch(&self_state_change, format!("{}/state", self.backend)) + let mut self_state_change = self + .store + .create_watch(format!("{}/state", self.backend)) .await?; + self.store.bind_watch(&self_state_change).await?; loop { select! { x = self_state_change.receiver.recv() => match x { diff --git a/crates/kratart/src/lib.rs b/crates/kratart/src/lib.rs index 3e9851e..44bd2a9 100644 --- a/crates/kratart/src/lib.rs +++ b/crates/kratart/src/lib.rs @@ -7,15 +7,11 @@ use std::{ use anyhow::{anyhow, Result}; use ipnetwork::IpNetwork; -use log::error; use loopdev::LoopControl; -use tokio::{ - sync::{mpsc::Sender, Mutex}, - task::JoinHandle, -}; +use tokio::sync::Mutex; use uuid::Uuid; use xenclient::XenClient; -use xenstore::{XsdClient, XsdInterface, XsdWatchHandle}; +use xenstore::{XsdClient, XsdInterface}; use self::{ autoloop::AutoLoop, @@ -30,7 +26,7 @@ pub mod channel; pub mod console; pub mod launch; -pub struct ContainerLoopInfo { +pub struct GuestLoopInfo { pub device: String, pub file: String, pub delete: Option, @@ -45,7 +41,7 @@ pub struct GuestInfo { pub uuid: Uuid, pub domid: u32, pub image: String, - pub loops: Vec, + pub loops: Vec, pub guest_ipv4: Option, pub guest_ipv6: Option, pub guest_mac: Option, @@ -231,7 +227,7 @@ impl RuntimeContext { Ok(None) } - fn parse_loop_set(input: &Option) -> Vec { + fn parse_loop_set(input: &Option) -> Vec { let Some(input) = input else { return Vec::new(); }; @@ -242,7 +238,7 @@ impl RuntimeContext { .map(|x| (x[0].clone(), x[1].clone(), x[2].clone())) .collect::>(); sets.iter() - .map(|(device, file, delete)| ContainerLoopInfo { + .map(|(device, file, delete)| GuestLoopInfo { device: device.clone(), file: file.clone(), delete: if delete == "none" { @@ -251,7 +247,7 @@ impl RuntimeContext { Some(delete.clone()) }, }) - .collect::>() + .collect::>() } } @@ -276,29 +272,6 @@ impl Runtime { launcher.launch(&mut context, request).await } - pub async fn subscribe_exit_code( - &self, - uuid: Uuid, - sender: Sender<(Uuid, i32)>, - ) -> Result> { - let mut context = self.context.lock().await; - let info = context - .resolve(uuid) - .await? - .ok_or_else(|| anyhow!("unable to resolve guest: {}", uuid))?; - let path = format!("/local/domain/{}/krata/guest/exit-code", info.domid); - let handle = context.xen.store.create_watch().await?; - context.xen.store.bind_watch(&handle, &path).await?; - let watch = ExitCodeWatch { - handle, - sender, - store: context.xen.store.clone(), - uuid, - path, - }; - watch.launch().await - } - pub async fn destroy(&self, uuid: Uuid) -> Result { let mut context = self.context.lock().await; let info = context @@ -372,44 +345,3 @@ fn path_as_string(path: &Path) -> Result { .ok_or_else(|| anyhow!("unable to convert path to string")) .map(|x| x.to_string()) } - -struct ExitCodeWatch { - store: XsdClient, - handle: XsdWatchHandle, - uuid: Uuid, - sender: Sender<(Uuid, i32)>, - path: String, -} - -impl ExitCodeWatch { - pub async fn launch(mut self) -> Result> { - Ok(tokio::task::spawn(async move { - if let Err(error) = self.process().await { - error!("failed to watch exit for guest {}: {}", self.uuid, error); - } - })) - } - - async fn process(&mut self) -> Result<()> { - loop { - match self.handle.receiver.recv().await { - Some(_) => { - let exit_code_string = self.store.read_string(&self.path).await?; - if let Some(exit_code) = exit_code_string.and_then(|x| i32::from_str(&x).ok()) { - match self.sender.try_send((self.uuid, exit_code)) { - Ok(_) => {} - Err(error) => { - return Err(error.into()); - } - } - return Ok(()); - } - } - - None => { - return Ok(()); - } - } - } - } -} diff --git a/crates/xen/xenclient/src/lib.rs b/crates/xen/xenclient/src/lib.rs index c1b8535..54fb6a6 100644 --- a/crates/xen/xenclient/src/lib.rs +++ b/crates/xen/xenclient/src/lib.rs @@ -20,12 +20,12 @@ use crate::boot::BootSetup; use crate::elfloader::ElfImageLoader; use crate::error::{Error, Result}; use boot::BootState; -use log::{trace, warn}; +use log::{debug, trace, warn}; +use tokio::time::timeout; use std::fs::read; use std::path::PathBuf; use std::str::FromStr; -use std::thread; use std::time::Duration; use uuid::Uuid; use xencall::sys::{CreateDomain, XEN_DOMCTL_CDF_HAP, XEN_DOMCTL_CDF_HVM_GUEST}; @@ -759,6 +759,7 @@ impl XenClient { for backend in &backend_paths { let state_path = format!("{}/state", backend); + let mut watch = self.store.create_watch(&state_path).await?; let online_path = format!("{}/online", backend); let tx = self.store.transaction().await?; let state = tx.read_string(&state_path).await?.unwrap_or(String::new()); @@ -769,22 +770,25 @@ impl XenClient { if !state.is_empty() && u32::from_str(&state).unwrap_or(0) != 6 { tx.write_string(&state_path, "5").await?; } + self.store.bind_watch(&watch).await?; tx.commit().await?; let mut count: u32 = 0; loop { - if count >= 100 { - warn!("unable to safely destroy backend: {}", backend); + if count >= 3 { + debug!("unable to safely destroy backend: {}", backend); break; } - let Some(state) = self.store.read_string(&state_path).await? else { - break; - }; + let _ = timeout(Duration::from_secs(1), watch.receiver.recv()).await; + let state = self + .store + .read_string(&state_path) + .await? + .unwrap_or_else(|| "6".to_string()); let state = i64::from_str(&state).unwrap_or(-1); if state == 6 { break; } - thread::sleep(Duration::from_millis(100)); count += 1; } } @@ -818,7 +822,7 @@ impl XenClient { if tty.is_some() { break; } - thread::sleep(Duration::from_millis(200)); + tokio::time::sleep(Duration::from_millis(200)).await; } let Some(tty) = tty else { return Err(Error::TtyNotFound); diff --git a/crates/xen/xenstore/examples/watch.rs b/crates/xen/xenstore/examples/watch.rs index 989265f..021b049 100644 --- a/crates/xen/xenstore/examples/watch.rs +++ b/crates/xen/xenstore/examples/watch.rs @@ -7,8 +7,8 @@ async fn main() -> Result<()> { env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); let path = args().nth(1).unwrap_or("/local/domain".to_string()); let client = XsdClient::open().await?; - let mut handle = client.create_watch().await?; - client.bind_watch(&handle, path).await?; + let mut handle = client.create_watch(path).await?; + client.bind_watch(&handle).await?; let mut count = 0; loop { let Some(event) = handle.receiver.recv().await else { diff --git a/crates/xen/xenstore/src/bus.rs b/crates/xen/xenstore/src/bus.rs index 1cc9fa0..fc209c6 100644 --- a/crates/xen/xenstore/src/bus.rs +++ b/crates/xen/xenstore/src/bus.rs @@ -65,7 +65,7 @@ pub struct XsdSocket { next_watch_id: Arc>, processor_task: Arc>, rx_task: Arc>, - unwatch_sender: Sender, + unwatch_sender: Sender<(u32, String)>, } impl XsdSocket { @@ -100,7 +100,7 @@ impl XsdSocket { let (rx_sender, rx_receiver) = channel::(10); let (tx_sender, tx_receiver) = channel::(10); - let (unwatch_sender, unwatch_receiver) = channel::(1000); + let (unwatch_sender, unwatch_receiver) = channel::<(u32, String)>(1000); let read: File = handle.try_clone().await?; let mut processor = XsdSocketProcessor { @@ -141,7 +141,7 @@ impl XsdSocket { let req = { let mut guard = self.next_request_id.lock().await; let req = *guard; - *guard = req + 1; + *guard = req.wrapping_add(1); req }; let (sender, receiver) = oneshot_channel::(); @@ -177,12 +177,12 @@ impl XsdSocket { self.send_buf(tx, typ, &buf).await } - pub async fn add_watch(&self) -> Result<(u32, Receiver, Sender)> { + pub async fn add_watch(&self) -> Result<(u32, Receiver, Sender<(u32, String)>)> { let id = { let mut guard = self.next_watch_id.lock().await; - let req = *guard; - *guard = req + 1; - req + let watch = *guard; + *guard = watch.wrapping_add(1); + watch }; let (sender, receiver) = channel(10); self.watches.lock().await.insert(id, WatchState { sender }); @@ -197,7 +197,7 @@ struct XsdSocketProcessor { next_request_id: Arc>, tx_receiver: Receiver, rx_receiver: Receiver, - unwatch_receiver: Receiver, + unwatch_receiver: Receiver<(u32, String)>, } impl XsdSocketProcessor { @@ -326,16 +326,18 @@ impl XsdSocketProcessor { }, x = self.unwatch_receiver.recv() => match x { - Some(id) => { + Some((id, path)) => { let req = { let mut guard = self.next_request_id.lock().await; let req = *guard; - *guard = req + 1; + *guard = req.wrapping_add(1); req }; let mut payload = id.to_string().as_bytes().to_vec(); payload.push(0); + payload.extend_from_slice(path.to_string().as_bytes()); + payload.push(0); let header = XsdMessageHeader { typ: XSD_UNWATCH, req, diff --git a/crates/xen/xenstore/src/lib.rs b/crates/xen/xenstore/src/lib.rs index badb1c0..c515673 100644 --- a/crates/xen/xenstore/src/lib.rs +++ b/crates/xen/xenstore/src/lib.rs @@ -43,14 +43,15 @@ impl XsPermission { } pub struct XsdWatchHandle { + pub path: String, pub id: u32, - unwatch_sender: Sender, + unwatch_sender: Sender<(u32, String)>, pub receiver: Receiver, } impl Drop for XsdWatchHandle { fn drop(&mut self) { - let _ = self.unwatch_sender.try_send(self.id); + let _ = self.unwatch_sender.try_send((self.id, self.path.clone())); } } @@ -192,17 +193,18 @@ impl XsdClient { response.parse_bool() } - pub async fn create_watch(&self) -> Result { + pub async fn create_watch>(&self, path: P) -> Result { let (id, receiver, unwatch_sender) = self.socket.add_watch().await?; Ok(XsdWatchHandle { + path: path.as_ref().to_string(), id, receiver, unwatch_sender, }) } - pub async fn bind_watch>(&self, handle: &XsdWatchHandle, path: P) -> Result<()> { - self.bind_watch_id(handle.id, path).await + pub async fn bind_watch(&self, handle: &XsdWatchHandle) -> Result<()> { + self.bind_watch_id(handle.id, &handle.path).await } pub async fn bind_watch_id>(&self, id: u32, path: P) -> Result<()> {