From 4955ed3a1cfbc4c443e95f5f058f92ce5405fe8a Mon Sep 17 00:00:00 2001 From: Alex Zenla Date: Wed, 14 Feb 2024 20:50:11 +0000 Subject: [PATCH] xenevtchn: implement async processing model --- libs/xen/xenevtchn/Cargo.toml | 1 + libs/xen/xenevtchn/examples/simple.rs | 11 +- libs/xen/xenevtchn/src/error.rs | 2 + libs/xen/xenevtchn/src/lib.rs | 141 ++++++++++++++++++++++---- 4 files changed, 132 insertions(+), 23 deletions(-) diff --git a/libs/xen/xenevtchn/Cargo.toml b/libs/xen/xenevtchn/Cargo.toml index 1e05e6c..fc583a0 100644 --- a/libs/xen/xenevtchn/Cargo.toml +++ b/libs/xen/xenevtchn/Cargo.toml @@ -6,6 +6,7 @@ resolver = "2" [dependencies] thiserror = { workspace = true } +tokio = { workspace = true } log = { workspace = true } [dependencies.nix] diff --git a/libs/xen/xenevtchn/examples/simple.rs b/libs/xen/xenevtchn/examples/simple.rs index 85d6b43..3704448 100644 --- a/libs/xen/xenevtchn/examples/simple.rs +++ b/libs/xen/xenevtchn/examples/simple.rs @@ -1,11 +1,12 @@ use xenevtchn::error::Result; use xenevtchn::EventChannel; -fn main() -> Result<()> { - let mut channel = EventChannel::open()?; - println!("Channel opened."); - let port = channel.bind_unbound_port(1)?; +#[tokio::main] +async fn main() -> Result<()> { + let channel = EventChannel::open().await?; + println!("channel opened"); + let port = channel.bind_unbound_port(0).await?; println!("port: {}", port); - channel.unbind(port)?; + channel.unbind(port).await?; Ok(()) } diff --git a/libs/xen/xenevtchn/src/error.rs b/libs/xen/xenevtchn/src/error.rs index 6c5f7b5..3db25c7 100644 --- a/libs/xen/xenevtchn/src/error.rs +++ b/libs/xen/xenevtchn/src/error.rs @@ -6,6 +6,8 @@ pub enum Error { Kernel(#[from] nix::errno::Errno), #[error("io issue encountered")] Io(#[from] io::Error), + #[error("failed to send event channel wake")] + WakeSend(tokio::sync::broadcast::error::SendError), } pub type Result = std::result::Result; diff --git a/libs/xen/xenevtchn/src/lib.rs b/libs/xen/xenevtchn/src/lib.rs index 165362c..cc670a7 100644 --- a/libs/xen/xenevtchn/src/lib.rs +++ b/libs/xen/xenevtchn/src/lib.rs @@ -1,66 +1,171 @@ pub mod error; pub mod sys; -use crate::error::Result; +use crate::error::{Error, Result}; use crate::sys::{BindInterdomain, BindUnboundPort, BindVirq, Notify, UnbindPort}; -use std::fs::{File, OpenOptions}; +use log::error; +use std::collections::hash_map::Entry; +use std::collections::HashMap; use std::os::fd::AsRawFd; +use std::sync::Arc; +use tokio::fs::{File, OpenOptions}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::select; +use tokio::sync::broadcast::{ + channel as broadcast_channel, Receiver as BroadcastReceiver, Sender as BroadastSender, +}; +use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::sync::Mutex; +use tokio::task::JoinHandle; +const UNMASK_CHANNEL_QUEUE_LEN: usize = 30; +const BROADCAST_CHANNEL_QUEUE_LEN: usize = 30; + +type WakeMap = Arc>>>; + +#[derive(Clone)] pub struct EventChannel { - pub handle: File, + handle: Arc>, + wakes: WakeMap, + unmask_sender: Sender, + task: Arc>, } impl EventChannel { - pub fn open() -> Result { + pub async fn open() -> Result { let file = OpenOptions::new() .read(true) .write(true) - .open("/dev/xen/evtchn")?; - Ok(EventChannel { handle: file }) + .open("/dev/xen/evtchn") + .await?; + + let wakes = Arc::new(Mutex::new(HashMap::new())); + let (unmask_sender, unmask_receiver) = channel(UNMASK_CHANNEL_QUEUE_LEN); + let task = { + let file = file.try_clone().await?; + let wakes = wakes.clone(); + tokio::task::spawn(async move { + if let Err(error) = EventChannel::process(file, wakes, unmask_receiver).await { + error!("event channel processor failed: {}", error); + } + }) + }; + Ok(EventChannel { + handle: Arc::new(Mutex::new(file)), + wakes, + unmask_sender, + task: Arc::new(task), + }) } - pub fn bind_virq(&mut self, virq: u32) -> Result { + pub async fn bind_virq(&self, virq: u32) -> Result { + let handle = self.handle.lock().await; unsafe { let mut request = BindVirq { virq }; - Ok(sys::bind_virq(self.handle.as_raw_fd(), &mut request)? as u32) + Ok(sys::bind_virq(handle.as_raw_fd(), &mut request)? as u32) } } - pub fn bind_interdomain(&mut self, domid: u32, port: u32) -> Result { + pub async fn bind_interdomain(&self, domid: u32, port: u32) -> Result { + let handle = self.handle.lock().await; unsafe { let mut request = BindInterdomain { remote_domain: domid, remote_port: port, }; - Ok(sys::bind_interdomain(self.handle.as_raw_fd(), &mut request)? as u32) + Ok(sys::bind_interdomain(handle.as_raw_fd(), &mut request)? as u32) } } - pub fn bind_unbound_port(&mut self, domid: u32) -> Result { + pub async fn bind_unbound_port(&self, domid: u32) -> Result { + let handle = self.handle.lock().await; unsafe { let mut request = BindUnboundPort { remote_domain: domid, }; - Ok(sys::bind_unbound_port(self.handle.as_raw_fd(), &mut request)? as u32) + Ok(sys::bind_unbound_port(handle.as_raw_fd(), &mut request)? as u32) } } - pub fn unbind(&mut self, port: u32) -> Result { + pub async fn unbind(&self, port: u32) -> Result { + let handle = self.handle.lock().await; unsafe { let mut request = UnbindPort { port }; - Ok(sys::unbind(self.handle.as_raw_fd(), &mut request)? as u32) + Ok(sys::unbind(handle.as_raw_fd(), &mut request)? as u32) } } - pub fn notify(&mut self, port: u32) -> Result { + pub async fn notify(&self, port: u32) -> Result { + let handle = self.handle.lock().await; unsafe { let mut request = Notify { port }; - Ok(sys::notify(self.handle.as_raw_fd(), &mut request)? as u32) + Ok(sys::notify(handle.as_raw_fd(), &mut request)? as u32) } } - pub fn reset(&mut self) -> Result { - unsafe { Ok(sys::reset(self.handle.as_raw_fd())? as u32) } + pub async fn reset(&self) -> Result { + let handle = self.handle.lock().await; + unsafe { Ok(sys::reset(handle.as_raw_fd())? as u32) } + } + + pub async fn subscribe(&self, port: u32) -> Result<(BroadcastReceiver, Sender)> { + let mut wakes = self.wakes.lock().await; + let receiver = match wakes.entry(port) { + Entry::Occupied(entry) => entry.get().subscribe(), + + Entry::Vacant(entry) => { + let (sender, receiver) = broadcast_channel::(BROADCAST_CHANNEL_QUEUE_LEN); + entry.insert(sender); + receiver + } + }; + Ok((receiver, self.unmask_sender.clone())) + } + + async fn process( + mut file: File, + wakers: WakeMap, + mut unmask_receiver: Receiver, + ) -> Result<()> { + loop { + select! { + result = file.read_u32_le() => { + match result { + Ok(port) => { + if let Some(sender) = wakers.lock().await.get(&port) { + if let Err(error) = sender.send(port) { + return Err(Error::WakeSend(error)); + } + } + } + + Err(error) => return Err(Error::Io(error)) + } + } + + result = unmask_receiver.recv() => { + match result { + Some(port) => { + file.write_u32_le(port).await?; + } + + None => { + break; + } + } + } + }; + } + + Ok(()) + } +} + +impl Drop for EventChannel { + fn drop(&mut self) { + if Arc::strong_count(&self.task) <= 1 { + self.task.abort(); + } } }