fix(idm): reimplement packet processing algorithm (#330)

* chore(xen): rewrite event channel code

* fix(idm): repair idm bugs on the file backend
This commit is contained in:
Alex Zenla
2024-08-13 16:18:27 -07:00
committed by GitHub
parent ffc9dcc0ea
commit 1cf03a460e
16 changed files with 273 additions and 171 deletions

View File

@ -1,82 +1,78 @@
pub mod error;
pub mod raw;
pub mod sys;
use crate::error::{Error, Result};
use crate::sys::{BindInterdomain, BindUnboundPort, BindVirq, Notify, UnbindPort};
use log::error;
use crate::raw::EVENT_CHANNEL_DEVICE;
use byteorder::{LittleEndian, ReadBytesExt};
use log::warn;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::mem::size_of;
use std::os::fd::AsRawFd;
use std::os::raw::c_void;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::fs::{File, OpenOptions};
use tokio::io::AsyncReadExt;
use tokio::select;
use tokio::sync::broadcast::{
channel as broadcast_channel, Receiver as BroadcastReceiver, Sender as BroadastSender,
};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio::sync::{Mutex, RwLock};
const UNBIND_CHANNEL_QUEUE_LEN: usize = 30;
const UNMASK_CHANNEL_QUEUE_LEN: usize = 30;
const BROADCAST_CHANNEL_QUEUE_LEN: usize = 30;
const CHANNEL_QUEUE_LEN: usize = 30;
type WakeMap = Arc<Mutex<HashMap<u32, BroadastSender<u32>>>>;
type WakeMap = Arc<RwLock<HashMap<u32, Sender<u32>>>>;
#[derive(Clone)]
pub struct EventChannel {
pub struct EventChannelService {
handle: Arc<Mutex<File>>,
wakes: WakeMap,
unbind_sender: Sender<u32>,
unmask_sender: Sender<u32>,
task: Arc<JoinHandle<()>>,
process_flag: Arc<AtomicBool>,
}
pub struct BoundEventChannel {
pub local_port: u32,
pub receiver: BroadcastReceiver<u32>,
unbind_sender: Sender<u32>,
pub unmask_sender: Sender<u32>,
pub receiver: Receiver<u32>,
pub service: EventChannelService,
}
impl BoundEventChannel {
pub async fn unmask(&self) -> Result<()> {
self.service.unmask(self.local_port).await
}
}
impl Drop for BoundEventChannel {
fn drop(&mut self) {
let _ = self.unbind_sender.try_send(self.local_port);
let service = self.service.clone();
let port = self.local_port;
tokio::task::spawn(async move {
let _ = service.unbind(port).await;
});
}
}
impl EventChannel {
pub async fn open() -> Result<EventChannel> {
let file = OpenOptions::new()
impl EventChannelService {
pub async fn open() -> Result<EventChannelService> {
let handle = OpenOptions::new()
.read(true)
.write(true)
.open("/dev/xen/evtchn")
.open(EVENT_CHANNEL_DEVICE)
.await?;
let wakes = Arc::new(Mutex::new(HashMap::new()));
let (unbind_sender, unbind_receiver) = channel(UNBIND_CHANNEL_QUEUE_LEN);
let (unmask_sender, unmask_receiver) = channel(UNMASK_CHANNEL_QUEUE_LEN);
let task = {
let file = file.try_clone().await?;
let wakes = wakes.clone();
tokio::task::spawn(async move {
if let Err(error) =
EventChannel::process(file, wakes, unmask_receiver, unbind_receiver).await
{
error!("event channel processor failed: {}", error);
}
})
let wakes = Arc::new(RwLock::new(HashMap::new()));
let flag = Arc::new(AtomicBool::new(false));
let processor = EventChannelProcessor {
flag: flag.clone(),
handle: handle.try_clone().await?.into_std().await,
wakes: wakes.clone(),
};
Ok(EventChannel {
handle: Arc::new(Mutex::new(file)),
processor.launch()?;
Ok(EventChannelService {
handle: Arc::new(Mutex::new(handle)),
wakes,
unbind_sender,
unmask_sender,
task: Arc::new(task),
process_flag: flag,
})
}
@ -109,11 +105,29 @@ impl EventChannel {
}
}
pub async fn unmask(&self, port: u32) -> Result<()> {
let handle = self.handle.lock().await;
let mut port = port;
let result = unsafe {
libc::write(
handle.as_raw_fd(),
&mut port as *mut u32 as *mut c_void,
size_of::<u32>(),
)
};
if result != size_of::<u32>() as isize {
return Err(Error::Io(std::io::Error::from_raw_os_error(result as i32)));
}
Ok(())
}
pub async fn unbind(&self, port: u32) -> Result<u32> {
let handle = self.handle.lock().await;
unsafe {
let mut request = UnbindPort { port };
Ok(sys::unbind(handle.as_raw_fd(), &mut request)? as u32)
let result = sys::unbind(handle.as_raw_fd(), &mut request)? as u32;
self.wakes.write().await.remove(&port);
Ok(result)
}
}
@ -132,95 +146,65 @@ impl EventChannel {
pub async fn bind(&self, domid: u32, port: u32) -> Result<BoundEventChannel> {
let local_port = self.bind_interdomain(domid, port).await?;
let (receiver, unmask_sender) = self.subscribe(local_port).await?;
let receiver = self.subscribe(local_port).await?;
let bound = BoundEventChannel {
local_port,
receiver,
unbind_sender: self.unbind_sender.clone(),
unmask_sender,
service: self.clone(),
};
Ok(bound)
}
pub async fn subscribe(&self, port: u32) -> Result<(BroadcastReceiver<u32>, Sender<u32>)> {
let mut wakes = self.wakes.lock().await;
pub async fn subscribe(&self, port: u32) -> Result<Receiver<u32>> {
let mut wakes = self.wakes.write().await;
let receiver = match wakes.entry(port) {
Entry::Occupied(entry) => entry.get().subscribe(),
Entry::Occupied(_) => {
return Err(Error::PortInUse);
}
Entry::Vacant(entry) => {
let (sender, receiver) = broadcast_channel::<u32>(BROADCAST_CHANNEL_QUEUE_LEN);
let (sender, receiver) = channel::<u32>(CHANNEL_QUEUE_LEN);
entry.insert(sender);
receiver
}
};
Ok((receiver, self.unmask_sender.clone()))
Ok(receiver)
}
}
async fn process(
mut file: File,
wakers: WakeMap,
mut unmask_receiver: Receiver<u32>,
mut unbind_receiver: Receiver<u32>,
) -> Result<()> {
loop {
select! {
result = file.read_u32_le() => {
match result {
Ok(port) => {
if let Some(sender) = wakers.lock().await.get(&port) {
if let Err(error) = sender.send(port) {
return Err(Error::WakeSend(error));
}
}
}
pub struct EventChannelProcessor {
flag: Arc<AtomicBool>,
handle: std::fs::File,
wakes: WakeMap,
}
Err(error) => return Err(Error::Io(error))
}
impl EventChannelProcessor {
pub fn launch(mut self) -> Result<()> {
std::thread::spawn(move || {
while let Err(error) = self.process() {
if self.flag.load(Ordering::Acquire) {
break;
}
result = unmask_receiver.recv() => {
match result {
Some(port) => {
unsafe {
let mut port = port;
let result = libc::write(file.as_raw_fd(), &mut port as *mut u32 as *mut c_void, size_of::<u32>());
if result != size_of::<u32>() as isize {
return Err(Error::Io(std::io::Error::from_raw_os_error(result as i32)));
}
}
}
None => {
break;
}
}
}
result = unbind_receiver.recv() => {
match result {
Some(port) => {
unsafe {
let mut request = UnbindPort { port };
sys::unbind(file.as_raw_fd(), &mut request)?;
}
}
None => {
break;
}
}
}
};
}
warn!("failed to process event channel notifications: {}", error);
}
});
Ok(())
}
}
impl Drop for EventChannel {
fn drop(&mut self) {
if Arc::strong_count(&self.task) <= 1 {
self.task.abort();
pub fn process(&mut self) -> Result<()> {
loop {
let port = self.handle.read_u32::<LittleEndian>()?;
if let Some(wake) = self.wakes.blocking_read().get(&port) {
let _ = wake.try_send(port);
}
}
}
}
impl Drop for EventChannelService {
fn drop(&mut self) {
if Arc::strong_count(&self.handle) <= 1 {
self.process_flag.store(true, Ordering::Release);
}
}
}