diff --git a/Cargo.lock b/Cargo.lock
index c963b86..0de9e57 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1527,6 +1527,7 @@ dependencies = [
 name = "krata-xenevtchn"
 version = "0.0.15"
 dependencies = [
+ "byteorder",
  "libc",
  "log",
  "nix 0.29.0",
diff --git a/crates/runtime/src/channel.rs b/crates/runtime/src/channel.rs
index f4a815d..30e9418 100644
--- a/crates/runtime/src/channel.rs
+++ b/crates/runtime/src/channel.rs
@@ -8,14 +8,11 @@ use anyhow::{anyhow, Result};
 use log::{debug, error};
 use tokio::{
     select,
-    sync::{
-        broadcast,
-        mpsc::{channel, Receiver, Sender},
-    },
+    sync::mpsc::{channel, Receiver, Sender},
     task::JoinHandle,
     time::sleep,
 };
-use xenevtchn::EventChannel;
+use xenevtchn::EventChannelService;
 use xengnt::{sys::GrantRef, GrantTab, MappedMemory};
 use xenstore::{XsdClient, XsdInterface};
 
@@ -43,7 +40,7 @@ pub struct ChannelService {
     typ: String,
     use_reserved_ref: Option,
     backends: HashMap,
-    evtchn: EventChannel,
+    evtchn: EventChannelService,
     store: XsdClient,
     gnttab: GrantTab,
     input_receiver: Receiver<(u32, Vec<u8>)>,
@@ -64,7 +61,7 @@ impl ChannelService {
         let (output_sender, output_receiver) = channel(GROUPED_CHANNEL_QUEUE_LEN);
 
         debug!("opening Xen event channel");
-        let evtchn = EventChannel::open().await?;
+        let evtchn = EventChannelService::open().await?;
         debug!("opening XenStore");
         let store = XsdClient::open().await?;
         debug!("opening GrantTab");
@@ -234,7 +231,7 @@ impl ChannelBackend {
         domid: u32,
         id: u32,
         store: XsdClient,
-        evtchn: EventChannel,
+        evtchn: EventChannelService,
         gnttab: GrantTab,
         output_sender: Sender<(u32, Option<Vec<u8>>)>,
         use_reserved_ref: Option,
@@ -273,7 +270,7 @@ pub struct KrataChannelBackendProcessor {
     id: u32,
     domid: u32,
     store: XsdClient,
-    evtchn: EventChannel,
+    evtchn: EventChannelService,
     gnttab: GrantTab,
 }
@@ -492,25 +489,18 @@ impl KrataChannelBackendProcessor {
                 },
 
                 x = channel.receiver.recv() => match x {
-                    Ok(_) => {
+                    Some(_) => {
                         unsafe {
                             let buffer = self.read_output_buffer(channel.local_port, &memory).await?;
                             if !buffer.is_empty() {
                                 sender.send((self.domid, Some(buffer))).await?;
                             }
                         };
-                        channel.unmask_sender.send(channel.local_port).await?;
+                        channel.unmask().await?;
                     },
-                    Err(error) => {
-                        match error {
-                            broadcast::error::RecvError::Closed => {
-                                break;
-                            },
-                            error => {
-                                return Err(anyhow!("failed to receive event notification: {}", error));
-                            }
-                        }
+                    None => {
+                        break;
                     }
                 }
             };
diff --git a/crates/runtime/src/ip.rs b/crates/runtime/src/ip.rs
index ce69a15..4c7771b 100644
--- a/crates/runtime/src/ip.rs
+++ b/crates/runtime/src/ip.rs
@@ -7,7 +7,7 @@ use std::{
 
 use anyhow::{anyhow, Result};
 use ipnetwork::{Ipv4Network, Ipv6Network};
-use log::{debug, error, trace};
+use log::{debug, error};
 use tokio::sync::RwLock;
 use uuid::Uuid;
 use xenstore::{XsdClient, XsdInterface};
diff --git a/crates/runtime/src/power.rs b/crates/runtime/src/power.rs
index 6dafbde..aacf4bf 100644
--- a/crates/runtime/src/power.rs
+++ b/crates/runtime/src/power.rs
@@ -167,7 +167,10 @@ impl PowerManagementContext {
             .set_cpufreq_gov(CpuId::All, policy)
             .await
             .unwrap_or_else(|error| {
-                info!("non-fatal error while setting scheduler policy: {:?}", error);
+                info!(
+                    "non-fatal error while setting scheduler policy: {:?}",
+                    error
+                );
             });
         Ok(())
     }
diff --git a/crates/xen/xenevtchn/Cargo.toml b/crates/xen/xenevtchn/Cargo.toml
index 6a4d9d9..417f1a1 100644
--- a/crates/xen/xenevtchn/Cargo.toml
+++ b/crates/xen/xenevtchn/Cargo.toml
@@ -9,6 +9,7 @@ edition = "2021"
 resolver = "2"
 
 [dependencies]
+byteorder = { workspace = true }
 libc = { workspace = true }
 log = { workspace = true }
 thiserror = { workspace = true }
diff --git a/crates/xen/xenevtchn/examples/simple.rs b/crates/xen/xenevtchn/examples/simple.rs
index 3704448..b7c8861 100644
--- a/crates/xen/xenevtchn/examples/simple.rs
+++ b/crates/xen/xenevtchn/examples/simple.rs
@@ -1,9 +1,9 @@
 use xenevtchn::error::Result;
-use xenevtchn::EventChannel;
+use xenevtchn::EventChannelService;
 
 #[tokio::main]
 async fn main() -> Result<()> {
-    let channel = EventChannel::open().await?;
+    let channel = EventChannelService::open().await?;
     println!("channel opened");
     let port = channel.bind_unbound_port(0).await?;
     println!("port: {}", port);
diff --git a/crates/xen/xenevtchn/src/error.rs b/crates/xen/xenevtchn/src/error.rs
index 5ece1ad..ec9e729 100644
--- a/crates/xen/xenevtchn/src/error.rs
+++ b/crates/xen/xenevtchn/src/error.rs
@@ -8,6 +8,10 @@ pub enum Error {
     Io(#[from] io::Error),
     #[error("failed to send event channel wake: {0}")]
     WakeSend(tokio::sync::broadcast::error::SendError<u32>),
+    #[error("failed to acquire lock")]
+    LockAcquireFailed,
+    #[error("event port already in use")]
+    PortInUse,
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
diff --git a/crates/xen/xenevtchn/src/lib.rs b/crates/xen/xenevtchn/src/lib.rs
index 0fb78c6..1960343 100644
--- a/crates/xen/xenevtchn/src/lib.rs
+++ b/crates/xen/xenevtchn/src/lib.rs
@@ -1,82 +1,78 @@
 pub mod error;
+pub mod raw;
 pub mod sys;
 
 use crate::error::{Error, Result};
 use crate::sys::{BindInterdomain, BindUnboundPort, BindVirq, Notify, UnbindPort};
-use log::error;
+use crate::raw::EVENT_CHANNEL_DEVICE;
+use byteorder::{LittleEndian, ReadBytesExt};
+use log::warn;
 use std::collections::hash_map::Entry;
 use std::collections::HashMap;
 use std::mem::size_of;
 use std::os::fd::AsRawFd;
 use std::os::raw::c_void;
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use tokio::fs::{File, OpenOptions};
 use tokio::io::AsyncReadExt;
-use tokio::select;
-use tokio::sync::broadcast::{
-    channel as broadcast_channel, Receiver as BroadcastReceiver, Sender as BroadastSender,
-};
 use tokio::sync::mpsc::{channel, Receiver, Sender};
-use tokio::sync::Mutex;
-use tokio::task::JoinHandle;
+use tokio::sync::{Mutex, RwLock};
 
-const UNBIND_CHANNEL_QUEUE_LEN: usize = 30;
-const UNMASK_CHANNEL_QUEUE_LEN: usize = 30;
-const BROADCAST_CHANNEL_QUEUE_LEN: usize = 30;
+const CHANNEL_QUEUE_LEN: usize = 30;
 
-type WakeMap = Arc<Mutex<HashMap<u32, BroadastSender<u32>>>>;
+type WakeMap = Arc<RwLock<HashMap<u32, Sender<u32>>>>;
 
 #[derive(Clone)]
-pub struct EventChannel {
+pub struct EventChannelService {
     handle: Arc<Mutex<File>>,
     wakes: WakeMap,
-    unbind_sender: Sender<u32>,
-    unmask_sender: Sender<u32>,
-    task: Arc<JoinHandle<()>>,
+    process_flag: Arc<AtomicBool>,
 }
 
 pub struct BoundEventChannel {
     pub local_port: u32,
-    pub receiver: BroadcastReceiver<u32>,
-    unbind_sender: Sender<u32>,
-    pub unmask_sender: Sender<u32>,
+    pub receiver: Receiver<u32>,
+    pub service: EventChannelService,
+}
+
+impl BoundEventChannel {
+    pub async fn unmask(&self) -> Result<()> {
+        self.service.unmask(self.local_port).await
+    }
 }
 
 impl Drop for BoundEventChannel {
     fn drop(&mut self) {
-        let _ = self.unbind_sender.try_send(self.local_port);
+        let service = self.service.clone();
+        let port = self.local_port;
+        tokio::task::spawn(async move {
+            let _ = service.unbind(port).await;
+        });
     }
 }
 
-impl EventChannel {
-    pub async fn open() -> Result<EventChannel> {
-        let file = OpenOptions::new()
+impl EventChannelService {
+    pub async fn open() -> Result<EventChannelService> {
+        let handle = OpenOptions::new()
             .read(true)
             .write(true)
-            .open("/dev/xen/evtchn")
+            .open(EVENT_CHANNEL_DEVICE)
             .await?;
-
-        let wakes = Arc::new(Mutex::new(HashMap::new()));
-        let (unbind_sender, unbind_receiver) = channel(UNBIND_CHANNEL_QUEUE_LEN);
-        let (unmask_sender, unmask_receiver) = channel(UNMASK_CHANNEL_QUEUE_LEN);
-        let task = {
-            let file = file.try_clone().await?;
-            let wakes = wakes.clone();
-            tokio::task::spawn(async move {
-                if let Err(error) =
-                    EventChannel::process(file, wakes, unmask_receiver, unbind_receiver).await
-                {
-                    error!("event channel processor failed: {}", error);
-                }
-            })
+        let wakes = Arc::new(RwLock::new(HashMap::new()));
+        let flag = Arc::new(AtomicBool::new(false));
+        let processor = EventChannelProcessor {
+            flag: flag.clone(),
+            handle: handle.try_clone().await?.into_std().await,
+            wakes: wakes.clone(),
         };
-        Ok(EventChannel {
-            handle: Arc::new(Mutex::new(file)),
+        processor.launch()?;
+
+        Ok(EventChannelService {
+            handle: Arc::new(Mutex::new(handle)),
             wakes,
-            unbind_sender,
-            unmask_sender,
-            task: Arc::new(task),
+            process_flag: flag,
         })
     }
@@ -109,11 +105,29 @@ impl EventChannel {
         }
     }
 
+    pub async fn unmask(&self, port: u32) -> Result<()> {
+        let handle = self.handle.lock().await;
+        let mut port = port;
+        let result = unsafe {
+            libc::write(
+                handle.as_raw_fd(),
+                &mut port as *mut u32 as *mut c_void,
+                size_of::<u32>(),
+            )
+        };
+        if result != size_of::<u32>() as isize {
+            return Err(Error::Io(std::io::Error::from_raw_os_error(result as i32)));
+        }
+        Ok(())
+    }
+
     pub async fn unbind(&self, port: u32) -> Result<u32> {
         let handle = self.handle.lock().await;
         unsafe {
             let mut request = UnbindPort { port };
-            Ok(sys::unbind(handle.as_raw_fd(), &mut request)? as u32)
+            let result = sys::unbind(handle.as_raw_fd(), &mut request)? as u32;
+            self.wakes.write().await.remove(&port);
+            Ok(result)
         }
     }
 
@@ -132,95 +146,65 @@ impl EventChannel {
 
     pub async fn bind(&self, domid: u32, port: u32) -> Result<BoundEventChannel> {
         let local_port = self.bind_interdomain(domid, port).await?;
-        let (receiver, unmask_sender) = self.subscribe(local_port).await?;
+        let receiver = self.subscribe(local_port).await?;
         let bound = BoundEventChannel {
             local_port,
             receiver,
-            unbind_sender: self.unbind_sender.clone(),
-            unmask_sender,
+            service: self.clone(),
         };
         Ok(bound)
     }
 
-    pub async fn subscribe(&self, port: u32) -> Result<(BroadcastReceiver<u32>, Sender<u32>)> {
-        let mut wakes = self.wakes.lock().await;
+    pub async fn subscribe(&self, port: u32) -> Result<Receiver<u32>> {
+        let mut wakes = self.wakes.write().await;
         let receiver = match wakes.entry(port) {
-            Entry::Occupied(entry) => entry.get().subscribe(),
+            Entry::Occupied(_) => {
+                return Err(Error::PortInUse);
+            }
             Entry::Vacant(entry) => {
-                let (sender, receiver) = broadcast_channel::<u32>(BROADCAST_CHANNEL_QUEUE_LEN);
+                let (sender, receiver) = channel::<u32>(CHANNEL_QUEUE_LEN);
                 entry.insert(sender);
                 receiver
             }
         };
-        Ok((receiver, self.unmask_sender.clone()))
+        Ok(receiver)
     }
+}
 
-    async fn process(
-        mut file: File,
-        wakers: WakeMap,
-        mut unmask_receiver: Receiver<u32>,
-        mut unbind_receiver: Receiver<u32>,
-    ) -> Result<()> {
-        loop {
-            select! {
-                result = file.read_u32_le() => {
-                    match result {
-                        Ok(port) => {
-                            if let Some(sender) = wakers.lock().await.get(&port) {
-                                if let Err(error) = sender.send(port) {
-                                    return Err(Error::WakeSend(error));
-                                }
-                            }
-                        }
+pub struct EventChannelProcessor {
+    flag: Arc<AtomicBool>,
+    handle: std::fs::File,
+    wakes: WakeMap,
+}
 
-                        Err(error) => return Err(Error::Io(error))
-                    }
+impl EventChannelProcessor {
+    pub fn launch(mut self) -> Result<()> {
+        std::thread::spawn(move || {
+            while let Err(error) = self.process() {
+                if self.flag.load(Ordering::Acquire) {
+                    break;
                 }
-
-                result = unmask_receiver.recv() => {
-                    match result {
-                        Some(port) => {
-                            unsafe {
-                                let mut port = port;
-                                let result = libc::write(file.as_raw_fd(), &mut port as *mut u32 as *mut c_void, size_of::<u32>());
-                                if result != size_of::<u32>() as isize {
-                                    return Err(Error::Io(std::io::Error::from_raw_os_error(result as i32)));
-                                }
-                            }
-                        }
-
-                        None => {
-                            break;
-                        }
-                    }
-                }
-
-                result = unbind_receiver.recv() => {
-                    match result {
-                        Some(port) => {
-                            unsafe {
-                                let mut request = UnbindPort { port };
-                                sys::unbind(file.as_raw_fd(), &mut request)?;
-                            }
-                        }
-
-                        None => {
-                            break;
-                        }
-                    }
-                }
-            };
-        }
-
+                warn!("failed to process event channel notifications: {}", error);
+            }
+        });
         Ok(())
     }
-}
 
-impl Drop for EventChannel {
-    fn drop(&mut self) {
-        if Arc::strong_count(&self.task) <= 1 {
-            self.task.abort();
+    pub fn process(&mut self) -> Result<()> {
+        loop {
+            let port = self.handle.read_u32::<LittleEndian>()?;
+            if let Some(wake) = self.wakes.blocking_read().get(&port) {
+                let _ = wake.try_send(port);
+            }
+        }
+    }
+}
+
+impl Drop for EventChannelService {
+    fn drop(&mut self) {
+        if Arc::strong_count(&self.handle) <= 1 {
+            self.process_flag.store(true, Ordering::Release);
         }
     }
 }
diff --git a/crates/xen/xenevtchn/src/raw.rs b/crates/xen/xenevtchn/src/raw.rs
new file mode 100644
index 0000000..763b712
--- /dev/null
+++ b/crates/xen/xenevtchn/src/raw.rs
@@ -0,0 +1,84 @@
+use std::fs::{File, OpenOptions};
+use std::os::fd::AsRawFd;
+use std::sync::{Arc, Mutex};
+
+use byteorder::{LittleEndian, ReadBytesExt};
+
+use crate::error::{Error, Result};
+use crate::sys;
+
+pub const EVENT_CHANNEL_DEVICE: &str = "/dev/xen/evtchn";
+
+#[derive(Clone)]
+pub struct RawEventChannelService {
+    handle: Arc<Mutex<File>>,
+}
+
+impl RawEventChannelService {
+    pub fn open() -> Result<RawEventChannelService> {
+        let handle = OpenOptions::new()
+            .read(true)
+            .write(true)
+            .open(EVENT_CHANNEL_DEVICE)?;
+        let handle = Arc::new(Mutex::new(handle));
+        Ok(RawEventChannelService { handle })
+    }
+
+    pub fn from_handle(handle: File) -> Result<RawEventChannelService> {
+        Ok(RawEventChannelService {
+            handle: Arc::new(Mutex::new(handle)),
+        })
+    }
+
+    pub fn bind_virq(&self, virq: u32) -> Result<u32> {
+        let handle = self.handle.lock().map_err(|_| Error::LockAcquireFailed)?;
+        let mut request = sys::BindVirq { virq };
+        Ok(unsafe { sys::bind_virq(handle.as_raw_fd(), &mut request)? as u32 })
+    }
+
+    pub fn bind_interdomain(&self, domid: u32, port: u32) -> Result<u32> {
+        let handle = self.handle.lock().map_err(|_| Error::LockAcquireFailed)?;
+        let mut request = sys::BindInterdomain {
+            remote_domain: domid,
+            remote_port: port,
+        };
+        Ok(unsafe { sys::bind_interdomain(handle.as_raw_fd(), &mut request)? as u32 })
+    }
+
+    pub fn bind_unbound_port(&self, domid: u32) -> Result<u32> {
+        let handle = self.handle.lock().map_err(|_| Error::LockAcquireFailed)?;
+        let mut request = sys::BindUnboundPort {
+            remote_domain: domid,
+        };
+        Ok(unsafe { sys::bind_unbound_port(handle.as_raw_fd(), &mut request)? as u32 })
+    }
+
+    pub fn unbind(&self, port: u32) -> Result<u32> {
+        let handle = self.handle.lock().map_err(|_| Error::LockAcquireFailed)?;
+        let mut request = sys::UnbindPort { port };
+        Ok(unsafe { sys::unbind(handle.as_raw_fd(), &mut request)? as u32 })
+    }
+
+    pub fn notify(&self, port: u32) -> Result<u32> {
+        let handle = self.handle.lock().map_err(|_| Error::LockAcquireFailed)?;
+        let mut request = sys::Notify { port };
+        Ok(unsafe { sys::notify(handle.as_raw_fd(), &mut request)? as u32 })
+    }
+
+    pub fn reset(&self) -> Result<u32> {
+        let handle = self.handle.lock().map_err(|_| Error::LockAcquireFailed)?;
+        Ok(unsafe { sys::reset(handle.as_raw_fd())? as u32 })
+    }
+
+    pub fn pending(&self) -> Result<u32> {
+        let mut handle = self.handle.lock().map_err(|_| Error::LockAcquireFailed)?;
+        Ok(handle.read_u32::<LittleEndian>()?)
+    }
+
+    pub fn into_handle(self) -> Result<File> {
+        Arc::into_inner(self.handle)
+            .ok_or(Error::LockAcquireFailed)?
+            .into_inner()
+            .map_err(|_| Error::LockAcquireFailed)
+    }
+}
diff --git a/crates/xen/xenstore/src/bus.rs b/crates/xen/xenstore/src/bus.rs
index 1d994dc..af481a1 100644
--- a/crates/xen/xenstore/src/bus.rs
+++ b/crates/xen/xenstore/src/bus.rs
@@ -203,8 +203,7 @@ impl XsdSocketProcessor {
         let mut header_buffer: Vec<u8> = vec![0u8; XsdMessageHeader::SIZE];
         let mut buffer: Vec<u8> = vec![0u8; XEN_BUS_MAX_PACKET_SIZE - XsdMessageHeader::SIZE];
         loop {
-            let message =
-                XsdSocketProcessor::read_message(&mut header_buffer, &mut buffer, read)?;
+            let message = XsdSocketProcessor::read_message(&mut header_buffer, &mut buffer, read)?;
             rx_sender.blocking_send(message)?;
         }
     }