chore(xen): rewrite event channel code

This commit is contained in:
Alex Zenla 2024-08-13 14:59:43 -07:00
parent ffc9dcc0ea
commit 621ae536f6
No known key found for this signature in database
GPG Key ID: 067B238899B51269
10 changed files with 203 additions and 137 deletions

1
Cargo.lock generated
View File

@ -1527,6 +1527,7 @@ dependencies = [
name = "krata-xenevtchn" name = "krata-xenevtchn"
version = "0.0.15" version = "0.0.15"
dependencies = [ dependencies = [
"byteorder",
"libc", "libc",
"log", "log",
"nix 0.29.0", "nix 0.29.0",

View File

@ -8,14 +8,11 @@ use anyhow::{anyhow, Result};
use log::{debug, error}; use log::{debug, error};
use tokio::{ use tokio::{
select, select,
sync::{ sync::mpsc::{channel, Receiver, Sender},
broadcast,
mpsc::{channel, Receiver, Sender},
},
task::JoinHandle, task::JoinHandle,
time::sleep, time::sleep,
}; };
use xenevtchn::EventChannel; use xenevtchn::EventChannelService;
use xengnt::{sys::GrantRef, GrantTab, MappedMemory}; use xengnt::{sys::GrantRef, GrantTab, MappedMemory};
use xenstore::{XsdClient, XsdInterface}; use xenstore::{XsdClient, XsdInterface};
@ -43,7 +40,7 @@ pub struct ChannelService {
typ: String, typ: String,
use_reserved_ref: Option<u64>, use_reserved_ref: Option<u64>,
backends: HashMap<u32, ChannelBackend>, backends: HashMap<u32, ChannelBackend>,
evtchn: EventChannel, evtchn: EventChannelService,
store: XsdClient, store: XsdClient,
gnttab: GrantTab, gnttab: GrantTab,
input_receiver: Receiver<(u32, Vec<u8>)>, input_receiver: Receiver<(u32, Vec<u8>)>,
@ -64,7 +61,7 @@ impl ChannelService {
let (output_sender, output_receiver) = channel(GROUPED_CHANNEL_QUEUE_LEN); let (output_sender, output_receiver) = channel(GROUPED_CHANNEL_QUEUE_LEN);
debug!("opening Xen event channel"); debug!("opening Xen event channel");
let evtchn = EventChannel::open().await?; let evtchn = EventChannelService::open().await?;
debug!("opening XenStore"); debug!("opening XenStore");
let store = XsdClient::open().await?; let store = XsdClient::open().await?;
debug!("opening GrantTab"); debug!("opening GrantTab");
@ -234,7 +231,7 @@ impl ChannelBackend {
domid: u32, domid: u32,
id: u32, id: u32,
store: XsdClient, store: XsdClient,
evtchn: EventChannel, evtchn: EventChannelService,
gnttab: GrantTab, gnttab: GrantTab,
output_sender: Sender<(u32, Option<Vec<u8>>)>, output_sender: Sender<(u32, Option<Vec<u8>>)>,
use_reserved_ref: Option<u64>, use_reserved_ref: Option<u64>,
@ -273,7 +270,7 @@ pub struct KrataChannelBackendProcessor {
id: u32, id: u32,
domid: u32, domid: u32,
store: XsdClient, store: XsdClient,
evtchn: EventChannel, evtchn: EventChannelService,
gnttab: GrantTab, gnttab: GrantTab,
} }
@ -492,25 +489,18 @@ impl KrataChannelBackendProcessor {
}, },
x = channel.receiver.recv() => match x { x = channel.receiver.recv() => match x {
Ok(_) => { Some(_) => {
unsafe { unsafe {
let buffer = self.read_output_buffer(channel.local_port, &memory).await?; let buffer = self.read_output_buffer(channel.local_port, &memory).await?;
if !buffer.is_empty() { if !buffer.is_empty() {
sender.send((self.domid, Some(buffer))).await?; sender.send((self.domid, Some(buffer))).await?;
} }
}; };
channel.unmask_sender.send(channel.local_port).await?; channel.unmask().await?;
}, },
Err(error) => { None => {
match error { break;
broadcast::error::RecvError::Closed => {
break;
},
error => {
return Err(anyhow!("failed to receive event notification: {}", error));
}
}
} }
} }
}; };

View File

@ -7,7 +7,7 @@ use std::{
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use ipnetwork::{Ipv4Network, Ipv6Network}; use ipnetwork::{Ipv4Network, Ipv6Network};
use log::{debug, error, trace}; use log::{debug, error};
use tokio::sync::RwLock; use tokio::sync::RwLock;
use uuid::Uuid; use uuid::Uuid;
use xenstore::{XsdClient, XsdInterface}; use xenstore::{XsdClient, XsdInterface};

View File

@ -167,7 +167,10 @@ impl PowerManagementContext {
.set_cpufreq_gov(CpuId::All, policy) .set_cpufreq_gov(CpuId::All, policy)
.await .await
.unwrap_or_else(|error| { .unwrap_or_else(|error| {
info!("non-fatal error while setting scheduler policy: {:?}", error); info!(
"non-fatal error while setting scheduler policy: {:?}",
error
);
}); });
Ok(()) Ok(())
} }

View File

@ -9,6 +9,7 @@ edition = "2021"
resolver = "2" resolver = "2"
[dependencies] [dependencies]
byteorder = { workspace = true }
libc = { workspace = true } libc = { workspace = true }
log = { workspace = true } log = { workspace = true }
thiserror = { workspace = true } thiserror = { workspace = true }

View File

@ -1,9 +1,9 @@
use xenevtchn::error::Result; use xenevtchn::error::Result;
use xenevtchn::EventChannel; use xenevtchn::EventChannelService;
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {
let channel = EventChannel::open().await?; let channel = EventChannelService::open().await?;
println!("channel opened"); println!("channel opened");
let port = channel.bind_unbound_port(0).await?; let port = channel.bind_unbound_port(0).await?;
println!("port: {}", port); println!("port: {}", port);

View File

@ -8,6 +8,10 @@ pub enum Error {
Io(#[from] io::Error), Io(#[from] io::Error),
#[error("failed to send event channel wake: {0}")] #[error("failed to send event channel wake: {0}")]
WakeSend(tokio::sync::broadcast::error::SendError<u32>), WakeSend(tokio::sync::broadcast::error::SendError<u32>),
#[error("failed to acquire lock")]
LockAcquireFailed,
#[error("event port already in use")]
PortInUse,
} }
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;

View File

@ -1,82 +1,78 @@
pub mod error; pub mod error;
pub mod raw;
pub mod sys; pub mod sys;
use crate::error::{Error, Result}; use crate::error::{Error, Result};
use crate::sys::{BindInterdomain, BindUnboundPort, BindVirq, Notify, UnbindPort}; use crate::sys::{BindInterdomain, BindUnboundPort, BindVirq, Notify, UnbindPort};
use log::error; use crate::raw::EVENT_CHANNEL_DEVICE;
use byteorder::{LittleEndian, ReadBytesExt};
use log::warn;
use std::collections::hash_map::Entry; use std::collections::hash_map::Entry;
use std::collections::HashMap; use std::collections::HashMap;
use std::mem::size_of; use std::mem::size_of;
use std::os::fd::AsRawFd; use std::os::fd::AsRawFd;
use std::os::raw::c_void; use std::os::raw::c_void;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use tokio::fs::{File, OpenOptions}; use tokio::fs::{File, OpenOptions};
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
use tokio::select;
use tokio::sync::broadcast::{
channel as broadcast_channel, Receiver as BroadcastReceiver, Sender as BroadastSender,
};
use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::Mutex; use tokio::sync::{Mutex, RwLock};
use tokio::task::JoinHandle;
const UNBIND_CHANNEL_QUEUE_LEN: usize = 30; const CHANNEL_QUEUE_LEN: usize = 30;
const UNMASK_CHANNEL_QUEUE_LEN: usize = 30;
const BROADCAST_CHANNEL_QUEUE_LEN: usize = 30;
type WakeMap = Arc<Mutex<HashMap<u32, BroadastSender<u32>>>>; type WakeMap = Arc<RwLock<HashMap<u32, Sender<u32>>>>;
#[derive(Clone)] #[derive(Clone)]
pub struct EventChannel { pub struct EventChannelService {
handle: Arc<Mutex<File>>, handle: Arc<Mutex<File>>,
wakes: WakeMap, wakes: WakeMap,
unbind_sender: Sender<u32>, process_flag: Arc<AtomicBool>,
unmask_sender: Sender<u32>,
task: Arc<JoinHandle<()>>,
} }
pub struct BoundEventChannel { pub struct BoundEventChannel {
pub local_port: u32, pub local_port: u32,
pub receiver: BroadcastReceiver<u32>, pub receiver: Receiver<u32>,
unbind_sender: Sender<u32>, pub service: EventChannelService,
pub unmask_sender: Sender<u32>, }
impl BoundEventChannel {
pub async fn unmask(&self) -> Result<()> {
self.service.unmask(self.local_port).await
}
} }
impl Drop for BoundEventChannel { impl Drop for BoundEventChannel {
fn drop(&mut self) { fn drop(&mut self) {
let _ = self.unbind_sender.try_send(self.local_port); let service = self.service.clone();
let port = self.local_port;
tokio::task::spawn(async move {
let _ = service.unbind(port).await;
});
} }
} }
impl EventChannel { impl EventChannelService {
pub async fn open() -> Result<EventChannel> { pub async fn open() -> Result<EventChannelService> {
let file = OpenOptions::new() let handle = OpenOptions::new()
.read(true) .read(true)
.write(true) .write(true)
.open("/dev/xen/evtchn") .open(EVENT_CHANNEL_DEVICE)
.await?; .await?;
let wakes = Arc::new(RwLock::new(HashMap::new()));
let wakes = Arc::new(Mutex::new(HashMap::new())); let flag = Arc::new(AtomicBool::new(false));
let (unbind_sender, unbind_receiver) = channel(UNBIND_CHANNEL_QUEUE_LEN); let processor = EventChannelProcessor {
let (unmask_sender, unmask_receiver) = channel(UNMASK_CHANNEL_QUEUE_LEN); flag: flag.clone(),
let task = { handle: handle.try_clone().await?.into_std().await,
let file = file.try_clone().await?; wakes: wakes.clone(),
let wakes = wakes.clone();
tokio::task::spawn(async move {
if let Err(error) =
EventChannel::process(file, wakes, unmask_receiver, unbind_receiver).await
{
error!("event channel processor failed: {}", error);
}
})
}; };
Ok(EventChannel { processor.launch()?;
handle: Arc::new(Mutex::new(file)),
Ok(EventChannelService {
handle: Arc::new(Mutex::new(handle)),
wakes, wakes,
unbind_sender, process_flag: flag,
unmask_sender,
task: Arc::new(task),
}) })
} }
@ -109,11 +105,29 @@ impl EventChannel {
} }
} }
pub async fn unmask(&self, port: u32) -> Result<()> {
let handle = self.handle.lock().await;
let mut port = port;
let result = unsafe {
libc::write(
handle.as_raw_fd(),
&mut port as *mut u32 as *mut c_void,
size_of::<u32>(),
)
};
if result != size_of::<u32>() as isize {
return Err(Error::Io(std::io::Error::from_raw_os_error(result as i32)));
}
Ok(())
}
pub async fn unbind(&self, port: u32) -> Result<u32> { pub async fn unbind(&self, port: u32) -> Result<u32> {
let handle = self.handle.lock().await; let handle = self.handle.lock().await;
unsafe { unsafe {
let mut request = UnbindPort { port }; let mut request = UnbindPort { port };
Ok(sys::unbind(handle.as_raw_fd(), &mut request)? as u32) let result = sys::unbind(handle.as_raw_fd(), &mut request)? as u32;
self.wakes.write().await.remove(&port);
Ok(result)
} }
} }
@ -132,95 +146,65 @@ impl EventChannel {
pub async fn bind(&self, domid: u32, port: u32) -> Result<BoundEventChannel> { pub async fn bind(&self, domid: u32, port: u32) -> Result<BoundEventChannel> {
let local_port = self.bind_interdomain(domid, port).await?; let local_port = self.bind_interdomain(domid, port).await?;
let (receiver, unmask_sender) = self.subscribe(local_port).await?; let receiver = self.subscribe(local_port).await?;
let bound = BoundEventChannel { let bound = BoundEventChannel {
local_port, local_port,
receiver, receiver,
unbind_sender: self.unbind_sender.clone(), service: self.clone(),
unmask_sender,
}; };
Ok(bound) Ok(bound)
} }
pub async fn subscribe(&self, port: u32) -> Result<(BroadcastReceiver<u32>, Sender<u32>)> { pub async fn subscribe(&self, port: u32) -> Result<Receiver<u32>> {
let mut wakes = self.wakes.lock().await; let mut wakes = self.wakes.write().await;
let receiver = match wakes.entry(port) { let receiver = match wakes.entry(port) {
Entry::Occupied(entry) => entry.get().subscribe(), Entry::Occupied(_) => {
return Err(Error::PortInUse);
}
Entry::Vacant(entry) => { Entry::Vacant(entry) => {
let (sender, receiver) = broadcast_channel::<u32>(BROADCAST_CHANNEL_QUEUE_LEN); let (sender, receiver) = channel::<u32>(CHANNEL_QUEUE_LEN);
entry.insert(sender); entry.insert(sender);
receiver receiver
} }
}; };
Ok((receiver, self.unmask_sender.clone())) Ok(receiver)
} }
}
async fn process( pub struct EventChannelProcessor {
mut file: File, flag: Arc<AtomicBool>,
wakers: WakeMap, handle: std::fs::File,
mut unmask_receiver: Receiver<u32>, wakes: WakeMap,
mut unbind_receiver: Receiver<u32>, }
) -> Result<()> {
loop {
select! {
result = file.read_u32_le() => {
match result {
Ok(port) => {
if let Some(sender) = wakers.lock().await.get(&port) {
if let Err(error) = sender.send(port) {
return Err(Error::WakeSend(error));
}
}
}
Err(error) => return Err(Error::Io(error)) impl EventChannelProcessor {
} pub fn launch(mut self) -> Result<()> {
std::thread::spawn(move || {
while let Err(error) = self.process() {
if self.flag.load(Ordering::Acquire) {
break;
} }
warn!("failed to process event channel notifications: {}", error);
result = unmask_receiver.recv() => { }
match result { });
Some(port) => {
unsafe {
let mut port = port;
let result = libc::write(file.as_raw_fd(), &mut port as *mut u32 as *mut c_void, size_of::<u32>());
if result != size_of::<u32>() as isize {
return Err(Error::Io(std::io::Error::from_raw_os_error(result as i32)));
}
}
}
None => {
break;
}
}
}
result = unbind_receiver.recv() => {
match result {
Some(port) => {
unsafe {
let mut request = UnbindPort { port };
sys::unbind(file.as_raw_fd(), &mut request)?;
}
}
None => {
break;
}
}
}
};
}
Ok(()) Ok(())
} }
}
impl Drop for EventChannel { pub fn process(&mut self) -> Result<()> {
fn drop(&mut self) { loop {
if Arc::strong_count(&self.task) <= 1 { let port = self.handle.read_u32::<LittleEndian>()?;
self.task.abort(); if let Some(wake) = self.wakes.blocking_read().get(&port) {
let _ = wake.try_send(port);
}
}
}
}
impl Drop for EventChannelService {
fn drop(&mut self) {
if Arc::strong_count(&self.handle) <= 1 {
self.process_flag.store(true, Ordering::Release);
} }
} }
} }

View File

@ -0,0 +1,84 @@
use std::fs::{File, OpenOptions};
use std::os::fd::AsRawFd;
use std::sync::{Arc, Mutex};
use byteorder::{LittleEndian, ReadBytesExt};
use crate::error::{Error, Result};
use crate::sys;
pub const EVENT_CHANNEL_DEVICE: &str = "/dev/xen/evtchn";
#[derive(Clone)]
pub struct RawEventChannelService {
handle: Arc<Mutex<File>>,
}
impl RawEventChannelService {
pub fn open() -> Result<RawEventChannelService> {
let handle = OpenOptions::new()
.read(true)
.write(true)
.open(EVENT_CHANNEL_DEVICE)?;
let handle = Arc::new(Mutex::new(handle));
Ok(RawEventChannelService { handle })
}
pub fn from_handle(handle: File) -> Result<RawEventChannelService> {
Ok(RawEventChannelService {
handle: Arc::new(Mutex::new(handle)),
})
}
pub fn bind_virq(&self, virq: u32) -> Result<u32> {
let handle = self.handle.lock().map_err(|_| Error::LockAcquireFailed)?;
let mut request = sys::BindVirq { virq };
Ok(unsafe { sys::bind_virq(handle.as_raw_fd(), &mut request)? as u32 })
}
pub fn bind_interdomain(&self, domid: u32, port: u32) -> Result<u32> {
let handle = self.handle.lock().map_err(|_| Error::LockAcquireFailed)?;
let mut request = sys::BindInterdomain {
remote_domain: domid,
remote_port: port,
};
Ok(unsafe { sys::bind_interdomain(handle.as_raw_fd(), &mut request)? as u32 })
}
pub fn bind_unbound_port(&self, domid: u32) -> Result<u32> {
let handle = self.handle.lock().map_err(|_| Error::LockAcquireFailed)?;
let mut request = sys::BindUnboundPort {
remote_domain: domid,
};
Ok(unsafe { sys::bind_unbound_port(handle.as_raw_fd(), &mut request)? as u32 })
}
pub fn unbind(&self, port: u32) -> Result<u32> {
let handle = self.handle.lock().map_err(|_| Error::LockAcquireFailed)?;
let mut request = sys::UnbindPort { port };
Ok(unsafe { sys::unbind(handle.as_raw_fd(), &mut request)? as u32 })
}
pub fn notify(&self, port: u32) -> Result<u32> {
let handle = self.handle.lock().map_err(|_| Error::LockAcquireFailed)?;
let mut request = sys::Notify { port };
Ok(unsafe { sys::notify(handle.as_raw_fd(), &mut request)? as u32 })
}
pub fn reset(&self) -> Result<u32> {
let handle = self.handle.lock().map_err(|_| Error::LockAcquireFailed)?;
Ok(unsafe { sys::reset(handle.as_raw_fd())? as u32 })
}
pub fn pending(&self) -> Result<u32> {
let mut handle = self.handle.lock().map_err(|_| Error::LockAcquireFailed)?;
Ok(handle.read_u32::<LittleEndian>()?)
}
pub fn into_handle(self) -> Result<File> {
Arc::into_inner(self.handle)
.ok_or(Error::LockAcquireFailed)?
.into_inner()
.map_err(|_| Error::LockAcquireFailed)
}
}

View File

@ -203,8 +203,7 @@ impl XsdSocketProcessor {
let mut header_buffer: Vec<u8> = vec![0u8; XsdMessageHeader::SIZE]; let mut header_buffer: Vec<u8> = vec![0u8; XsdMessageHeader::SIZE];
let mut buffer: Vec<u8> = vec![0u8; XEN_BUS_MAX_PACKET_SIZE - XsdMessageHeader::SIZE]; let mut buffer: Vec<u8> = vec![0u8; XEN_BUS_MAX_PACKET_SIZE - XsdMessageHeader::SIZE];
loop { loop {
let message = let message = XsdSocketProcessor::read_message(&mut header_buffer, &mut buffer, read)?;
XsdSocketProcessor::read_message(&mut header_buffer, &mut buffer, read)?;
rx_sender.blocking_send(message)?; rx_sender.blocking_send(message)?;
} }
} }