From 092a243a83dff50ec747c8839f9a235e8c5f9fee Mon Sep 17 00:00:00 2001 From: Alex Zenla Date: Wed, 10 Apr 2024 09:26:00 +0000 Subject: [PATCH] feat: initial support for idm send in daemon --- crates/daemon/src/idm.rs | 102 +++++++++++++++++++++++---------- crates/guest/bin/init.rs | 2 + crates/guest/src/background.rs | 4 +- crates/runtime/src/console.rs | 18 ------ crates/runtime/src/lib.rs | 13 ----- 5 files changed, 75 insertions(+), 64 deletions(-) delete mode 100644 crates/runtime/src/console.rs diff --git a/crates/daemon/src/idm.rs b/crates/daemon/src/idm.rs index abca368..18ba0f2 100644 --- a/crates/daemon/src/idm.rs +++ b/crates/daemon/src/idm.rs @@ -7,8 +7,9 @@ use kratart::channel::ChannelService; use log::{error, warn}; use prost::Message; use tokio::{ + select, sync::{ - mpsc::{Receiver, Sender}, + mpsc::{channel, Receiver, Sender}, Mutex, }, task::JoinHandle, @@ -19,16 +20,23 @@ type ListenerMap = Arc>>>; #[derive(Clone)] pub struct DaemonIdmHandle { listeners: ListenerMap, + tx_sender: Sender<(u32, IdmPacket)>, task: Arc>, } #[derive(Clone)] pub struct DaemonIdmSubscribeHandle { domid: u32, + tx_sender: Sender<(u32, IdmPacket)>, listeners: ListenerMap, } impl DaemonIdmSubscribeHandle { + pub async fn send(&self, packet: IdmPacket) -> Result<()> { + self.tx_sender.send((self.domid, packet)).await?; + Ok(()) + } + pub async fn unsubscribe(&self) -> Result<()> { let mut guard = self.listeners.lock().await; let _ = guard.remove(&self.domid); @@ -37,6 +45,11 @@ impl DaemonIdmSubscribeHandle { } impl DaemonIdmHandle { + pub async fn send(&self, domid: u32, packet: IdmPacket) -> Result<()> { + self.tx_sender.send((domid, packet)).await?; + Ok(()) + } + pub async fn subscribe( &self, domid: u32, @@ -46,6 +59,7 @@ impl DaemonIdmHandle { guard.insert(domid, sender); Ok(DaemonIdmSubscribeHandle { domid, + tx_sender: self.tx_sender.clone(), listeners: self.listeners.clone(), }) } @@ -61,17 +75,25 @@ impl Drop for DaemonIdmHandle { pub struct DaemonIdm { listeners: ListenerMap, - receiver: Receiver<(u32, Vec)>, + tx_sender: Sender<(u32, IdmPacket)>, + tx_raw_sender: Sender<(u32, Vec)>, + tx_receiver: Receiver<(u32, IdmPacket)>, + rx_receiver: Receiver<(u32, Vec)>, task: JoinHandle<()>, } impl DaemonIdm { pub async fn new() -> Result { - let (service, _, receiver) = ChannelService::new("krata-channel".to_string(), None).await?; + let (service, tx_raw_sender, rx_receiver) = + ChannelService::new("krata-channel".to_string(), None).await?; + let (tx_sender, tx_receiver) = channel(100); let task = service.launch().await?; let listeners = Arc::new(Mutex::new(HashMap::new())); Ok(DaemonIdm { - receiver, + rx_receiver, + tx_receiver, + tx_sender, + tx_raw_sender, task, listeners, }) @@ -79,6 +101,7 @@ impl DaemonIdm { pub async fn launch(mut self) -> Result { let listeners = self.listeners.clone(); + let tx_sender = self.tx_sender.clone(); let task = tokio::task::spawn(async move { let mut buffers: HashMap = HashMap::new(); if let Err(error) = self.process(&mut buffers).await { @@ -87,42 +110,59 @@ impl DaemonIdm { }); Ok(DaemonIdmHandle { listeners, + tx_sender, task: Arc::new(task), }) } async fn process(&mut self, buffers: &mut HashMap) -> Result<()> { loop { - let Some((domid, data)) = self.receiver.recv().await else { - break; - }; - - let buffer = buffers.entry(domid).or_insert_with_key(|_| BytesMut::new()); - buffer.extend_from_slice(&data); - if buffer.len() < 2 { - continue; - } - let size = (buffer[0] as u16 | (buffer[1] as u16) << 8) as usize; - let needed = size + 2; - if buffer.len() < needed { - continue; - } - let mut packet = buffer.split_to(needed); - packet.advance(2); - match IdmPacket::decode(packet) { - Ok(packet) => { - let guard = self.listeners.lock().await; - if let Some(sender) = guard.get(&domid) { - if let Err(error) = sender.try_send((domid, packet)) { - warn!("dropped idm packet from domain {}: {}", domid, error); + select! { + x = self.rx_receiver.recv() => match x { + Some((domid, data)) => { + let buffer = buffers.entry(domid).or_insert_with_key(|_| BytesMut::new()); + buffer.extend_from_slice(&data); + if buffer.len() < 2 { + continue; } + let size = (buffer[0] as u16 | (buffer[1] as u16) << 8) as usize; + let needed = size + 2; + if buffer.len() < needed { + continue; + } + let mut packet = buffer.split_to(needed); + packet.advance(2); + match IdmPacket::decode(packet) { + Ok(packet) => { + let guard = self.listeners.lock().await; + if let Some(sender) = guard.get(&domid) { + if let Err(error) = sender.try_send((domid, packet)) { + warn!("dropped idm packet from domain {}: {}", domid, error); + } + } + } + + Err(packet) => { + warn!("received invalid packet from domain {}: {}", domid, packet); + } + } + }, + + None => { + break; + } + }, + x = self.tx_receiver.recv() => match x { + Some((domid, packet)) => { + let data = packet.encode_to_vec(); + self.tx_raw_sender.send((domid, data)).await?; + }, + + None => { + break; } } - - Err(packet) => { - warn!("received invalid packet from domain {}: {}", domid, packet); - } - } + }; } Ok(()) } diff --git a/crates/guest/bin/init.rs b/crates/guest/bin/init.rs index 11d7b6c..62678e2 100644 --- a/crates/guest/bin/init.rs +++ b/crates/guest/bin/init.rs @@ -23,6 +23,8 @@ async fn main() -> Result<()> { if let Err(error) = guest.init().await { error!("failed to initialize guest: {}", error); death(127).await?; + return Ok(()); } + death(1).await?; Ok(()) } diff --git a/crates/guest/src/background.rs b/crates/guest/src/background.rs index 8252a6f..264bd9e 100644 --- a/crates/guest/src/background.rs +++ b/crates/guest/src/background.rs @@ -8,7 +8,7 @@ use krata::idm::{ client::IdmClient, protocol::{idm_event::Event, IdmEvent, IdmExitEvent, IdmPacket}, }; -use log::error; +use log::debug; use nix::unistd::Pid; use tokio::select; @@ -38,7 +38,7 @@ impl GuestBackground { }, None => { - error!("idm packet channel closed"); + debug!("idm packet channel closed"); break; } }, diff --git a/crates/runtime/src/console.rs b/crates/runtime/src/console.rs deleted file mode 100644 index 7571226..0000000 --- a/crates/runtime/src/console.rs +++ /dev/null @@ -1,18 +0,0 @@ -use anyhow::Result; -use tokio::fs::File; - -pub struct XenConsole { - pub read_handle: File, - pub write_handle: File, -} - -impl XenConsole { - pub async fn new(tty: &str) -> Result { - let read_handle = File::options().read(true).write(false).open(tty).await?; - let write_handle = File::options().read(false).write(true).open(tty).await?; - Ok(XenConsole { - read_handle, - write_handle, - }) - } -} diff --git a/crates/runtime/src/lib.rs b/crates/runtime/src/lib.rs index f6e81d8..1c42763 100644 --- a/crates/runtime/src/lib.rs +++ b/crates/runtime/src/lib.rs @@ -15,7 +15,6 @@ use xenstore::{XsdClient, XsdInterface}; use self::{ autoloop::AutoLoop, - console::XenConsole, launch::{GuestLaunchRequest, GuestLauncher}, }; use krataoci::cache::ImageCache; @@ -23,7 +22,6 @@ use krataoci::cache::ImageCache; pub mod autoloop; pub mod cfgblk; pub mod channel; -pub mod console; pub mod launch; pub struct GuestLoopInfo { @@ -321,17 +319,6 @@ impl Runtime { Ok(uuid) } - pub async fn console(&self, uuid: Uuid) -> Result { - let info = self - .context - .resolve(uuid) - .await? - .ok_or_else(|| anyhow!("unable to resolve guest: {}", uuid))?; - let domid = info.domid; - let tty = self.context.xen.get_console_path(domid).await?; - XenConsole::new(&tty).await - } - pub async fn list(&self) -> Result> { self.context.list().await }