mirror of
https://github.com/edera-dev/krata.git
synced 2025-08-03 13:11:31 +00:00
krata: begin work on guest message channel
This commit is contained in:
@ -5,6 +5,7 @@ edition = "2021"
|
||||
resolver = "2"
|
||||
|
||||
[dependencies]
|
||||
libc = { workspace = true }
|
||||
log = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
nix = { workspace = true, features = ["ioctl"] }
|
||||
|
@ -2,11 +2,11 @@ use std::io;
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum Error {
|
||||
#[error("kernel error")]
|
||||
#[error("kernel error: {0}")]
|
||||
Kernel(#[from] nix::errno::Errno),
|
||||
#[error("io issue encountered")]
|
||||
#[error("io issue encountered: {0}")]
|
||||
Io(#[from] io::Error),
|
||||
#[error("failed to send event channel wake")]
|
||||
#[error("failed to send event channel wake: {0}")]
|
||||
WakeSend(tokio::sync::broadcast::error::SendError<u32>),
|
||||
}
|
||||
|
||||
|
@ -7,10 +7,12 @@ use crate::sys::{BindInterdomain, BindUnboundPort, BindVirq, Notify, UnbindPort}
|
||||
use log::error;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::HashMap;
|
||||
use std::mem::size_of;
|
||||
use std::os::fd::AsRawFd;
|
||||
use std::os::raw::c_void;
|
||||
use std::sync::Arc;
|
||||
use tokio::fs::{File, OpenOptions};
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::select;
|
||||
use tokio::sync::broadcast::{
|
||||
channel as broadcast_channel, Receiver as BroadcastReceiver, Sender as BroadastSender,
|
||||
@ -19,6 +21,7 @@ use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
const UNBIND_CHANNEL_QUEUE_LEN: usize = 30;
|
||||
const UNMASK_CHANNEL_QUEUE_LEN: usize = 30;
|
||||
const BROADCAST_CHANNEL_QUEUE_LEN: usize = 30;
|
||||
|
||||
@ -28,10 +31,24 @@ type WakeMap = Arc<Mutex<HashMap<u32, BroadastSender<u32>>>>;
|
||||
pub struct EventChannel {
|
||||
handle: Arc<Mutex<File>>,
|
||||
wakes: WakeMap,
|
||||
unbind_sender: Sender<u32>,
|
||||
unmask_sender: Sender<u32>,
|
||||
task: Arc<JoinHandle<()>>,
|
||||
}
|
||||
|
||||
pub struct BoundEventChannel {
|
||||
pub local_port: u32,
|
||||
pub receiver: BroadcastReceiver<u32>,
|
||||
unbind_sender: Sender<u32>,
|
||||
pub unmask_sender: Sender<u32>,
|
||||
}
|
||||
|
||||
impl Drop for BoundEventChannel {
|
||||
fn drop(&mut self) {
|
||||
let _ = self.unbind_sender.try_send(self.local_port);
|
||||
}
|
||||
}
|
||||
|
||||
impl EventChannel {
|
||||
pub async fn open() -> Result<EventChannel> {
|
||||
let file = OpenOptions::new()
|
||||
@ -41,12 +58,15 @@ impl EventChannel {
|
||||
.await?;
|
||||
|
||||
let wakes = Arc::new(Mutex::new(HashMap::new()));
|
||||
let (unbind_sender, unbind_receiver) = channel(UNBIND_CHANNEL_QUEUE_LEN);
|
||||
let (unmask_sender, unmask_receiver) = channel(UNMASK_CHANNEL_QUEUE_LEN);
|
||||
let task = {
|
||||
let file = file.try_clone().await?;
|
||||
let wakes = wakes.clone();
|
||||
tokio::task::spawn(async move {
|
||||
if let Err(error) = EventChannel::process(file, wakes, unmask_receiver).await {
|
||||
if let Err(error) =
|
||||
EventChannel::process(file, wakes, unmask_receiver, unbind_receiver).await
|
||||
{
|
||||
error!("event channel processor failed: {}", error);
|
||||
}
|
||||
})
|
||||
@ -54,6 +74,7 @@ impl EventChannel {
|
||||
Ok(EventChannel {
|
||||
handle: Arc::new(Mutex::new(file)),
|
||||
wakes,
|
||||
unbind_sender,
|
||||
unmask_sender,
|
||||
task: Arc::new(task),
|
||||
})
|
||||
@ -109,6 +130,18 @@ impl EventChannel {
|
||||
unsafe { Ok(sys::reset(handle.as_raw_fd())? as u32) }
|
||||
}
|
||||
|
||||
pub async fn bind(&self, domid: u32, port: u32) -> Result<BoundEventChannel> {
|
||||
let local_port = self.bind_interdomain(domid, port).await?;
|
||||
let (receiver, unmask_sender) = self.subscribe(local_port).await?;
|
||||
let bound = BoundEventChannel {
|
||||
local_port,
|
||||
receiver,
|
||||
unbind_sender: self.unbind_sender.clone(),
|
||||
unmask_sender,
|
||||
};
|
||||
Ok(bound)
|
||||
}
|
||||
|
||||
pub async fn subscribe(&self, port: u32) -> Result<(BroadcastReceiver<u32>, Sender<u32>)> {
|
||||
let mut wakes = self.wakes.lock().await;
|
||||
let receiver = match wakes.entry(port) {
|
||||
@ -127,6 +160,7 @@ impl EventChannel {
|
||||
mut file: File,
|
||||
wakers: WakeMap,
|
||||
mut unmask_receiver: Receiver<u32>,
|
||||
mut unbind_receiver: Receiver<u32>,
|
||||
) -> Result<()> {
|
||||
loop {
|
||||
select! {
|
||||
@ -147,7 +181,28 @@ impl EventChannel {
|
||||
result = unmask_receiver.recv() => {
|
||||
match result {
|
||||
Some(port) => {
|
||||
file.write_u32_le(port).await?;
|
||||
unsafe {
|
||||
let mut port = port;
|
||||
let result = libc::write(file.as_raw_fd(), &mut port as *mut u32 as *mut c_void, size_of::<u32>());
|
||||
if result != size_of::<u32>() as isize {
|
||||
return Err(Error::Io(std::io::Error::from_raw_os_error(result as i32)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
None => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
result = unbind_receiver.recv() => {
|
||||
match result {
|
||||
Some(port) => {
|
||||
unsafe {
|
||||
let mut request = UnbindPort { port };
|
||||
sys::unbind(file.as_raw_fd(), &mut request)?;
|
||||
}
|
||||
}
|
||||
|
||||
None => {
|
||||
|
Reference in New Issue
Block a user