From 182401371b86d6344c7c2603297365176b7b253c Mon Sep 17 00:00:00 2001 From: Alex Zenla Date: Thu, 7 Mar 2024 04:14:25 -0800 Subject: [PATCH] xenstore: watch support (#4) --- Cargo.toml | 2 +- daemon/src/runtime/mod.rs | 8 +- guest/src/background.rs | 2 +- libs/xen/xenclient/src/error.rs | 8 +- libs/xen/xenclient/src/lib.rs | 10 +- libs/xen/xenstore/Cargo.toml | 11 +- libs/xen/xenstore/examples/list.rs | 43 ++-- libs/xen/xenstore/examples/watch.rs | 23 ++ libs/xen/xenstore/src/bus.rs | 387 ++++++++++++++++++++++------ libs/xen/xenstore/src/client.rs | 160 +++++++----- libs/xen/xenstore/src/error.rs | 27 +- libs/xen/xenstore/src/sys.rs | 36 ++- network/src/autonet.rs | 8 +- 13 files changed, 526 insertions(+), 199 deletions(-) create mode 100644 libs/xen/xenstore/examples/watch.rs diff --git a/Cargo.toml b/Cargo.toml index 6896203..a8fd4a2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ thiserror = "1.0" log = "0.4.20" libc = "0.2" nix = "0.28.0" -bytemuck = "1.14.0" +byteorder = "1" slice-copy = "0.3.0" memchr = "2" xz2 = "0.1" diff --git a/daemon/src/runtime/mod.rs b/daemon/src/runtime/mod.rs index 80ff13c..c93d942 100644 --- a/daemon/src/runtime/mod.rs +++ b/daemon/src/runtime/mod.rs @@ -77,12 +77,16 @@ impl RuntimeContext { } fn detect_guest_file(store: &str, name: &str) -> Result { - let path = PathBuf::from(format!("{}/{}", store, name)); + let mut path = PathBuf::from(format!("{}/guest/{}", store, name)); if path.is_file() { return path_as_string(&path); } - Ok(format!("/usr/share/krata/guest/{}", name)) + path = PathBuf::from(format!("/usr/share/krata/guest/{}", name)); + if path.is_file() { + return path_as_string(&path); + } + Err(anyhow!("unable to find required guest file: {}", name)) } pub async fn list(&mut self) -> Result> { diff --git a/guest/src/background.rs b/guest/src/background.rs index 4ed4d11..c42e2f1 100644 --- a/guest/src/background.rs +++ b/guest/src/background.rs @@ -41,7 +41,7 @@ impl ContainerBackground { } async fn death(&mut self, code: c_int) -> Result<()> { - let mut store = XsdClient::open().await?; + let store = XsdClient::open().await?; store .write_string("krata/guest/exit-code", &code.to_string()) .await?; diff --git a/libs/xen/xenclient/src/error.rs b/libs/xen/xenclient/src/error.rs index e186b0b..fa5215e 100644 --- a/libs/xen/xenclient/src/error.rs +++ b/libs/xen/xenclient/src/error.rs @@ -2,11 +2,11 @@ use std::io; #[derive(thiserror::Error, Debug)] pub enum Error { - #[error("io issue encountered")] + #[error("io issue encountered: {0}")] Io(#[from] io::Error), - #[error("xenstore issue encountered")] + #[error("xenstore issue encountered: {0}")] XenStore(#[from] xenstore::error::Error), - #[error("xencall issue encountered")] + #[error("xencall issue encountered: {0}")] XenCall(#[from] xencall::error::Error), #[error("domain does not have a tty")] TtyNotFound, @@ -18,7 +18,7 @@ pub enum Error { PathParentNotFound, #[error("domain does not exist")] DomainNonExistent, - #[error("elf parse failed")] + #[error("elf parse failed: {0}")] ElfParseFailed(#[from] elf::ParseError), #[error("mmap failed")] MmapFailed, diff --git a/libs/xen/xenclient/src/lib.rs b/libs/xen/xenclient/src/lib.rs index dcd5031..ee80012 100644 --- a/libs/xen/xenclient/src/lib.rs +++ b/libs/xen/xenclient/src/lib.rs @@ -145,7 +145,7 @@ impl XenClient { }]; { - let mut tx = self.store.transaction().await?; + let tx = self.store.transaction().await?; tx.rm(dom_path.as_str()).await?; tx.mknod(dom_path.as_str(), ro_perm).await?; @@ -250,7 +250,7 @@ impl XenClient { } { - let mut tx = self.store.transaction().await?; + let tx = self.store.transaction().await?; tx.write_string(format!("{}/image/os_type", vm_path).as_str(), "linux") .await?; tx.write_string( @@ -641,7 +641,7 @@ impl XenClient { }, ]; - let mut tx = self.store.transaction().await?; + let tx = self.store.transaction().await?; tx.mknod(&frontend_path, frontend_perms).await?; for (p, value) in &frontend_items { let path = format!("{}/{}", frontend_path, *p); @@ -706,7 +706,7 @@ impl XenClient { for backend in &backend_paths { let state_path = format!("{}/state", backend); let online_path = format!("{}/online", backend); - let mut tx = self.store.transaction().await?; + let tx = self.store.transaction().await?; let state = tx.read_string(&state_path).await?.unwrap_or(String::new()); if state.is_empty() { break; @@ -735,7 +735,7 @@ impl XenClient { } } - let mut tx = self.store.transaction().await?; + let tx = self.store.transaction().await?; let mut backend_removals: Vec = Vec::new(); backend_removals.extend_from_slice(backend_paths.as_slice()); if let Some(backend) = console_backend_path { diff --git a/libs/xen/xenstore/Cargo.toml b/libs/xen/xenstore/Cargo.toml index f31ad92..0f706ba 100644 --- a/libs/xen/xenstore/Cargo.toml +++ b/libs/xen/xenstore/Cargo.toml @@ -13,14 +13,15 @@ libc = { workspace = true } log = { workspace = true } tokio = { workspace = true } async-trait = { workspace = true } +byteorder = { workspace = true } [dev-dependencies] -futures = { workspace = true } - -[dependencies.bytemuck] -workspace = true -features = ["derive"] +env_logger = { workspace = true } [[example]] name = "xenstore-ls" path = "examples/list.rs" + +[[example]] +name = "xenstore-watch" +path = "examples/watch.rs" diff --git a/libs/xen/xenstore/examples/list.rs b/libs/xen/xenstore/examples/list.rs index c0ed6e0..09df815 100644 --- a/libs/xen/xenstore/examples/list.rs +++ b/libs/xen/xenstore/examples/list.rs @@ -1,32 +1,35 @@ -use futures::executor::block_on; +use std::env::args; + use xenstore::client::{XsdClient, XsdInterface}; use xenstore::error::Result; -use xenstore::sys::XSD_ERROR_EINVAL; -fn list_recursive(client: &mut XsdClient, level: usize, path: &str) -> Result<()> { - let children = match block_on(client.list(path)) { - Ok(children) => children, - Err(error) => { - return if error.to_string() == XSD_ERROR_EINVAL.error { - Ok(()) - } else { - Err(error) - } +async fn list_recursive(client: &XsdClient, path: &str) -> Result<()> { + let mut pending = vec![path.to_string()]; + + while let Some(ref path) = pending.pop() { + let children = client.list(path).await?; + for child in children { + let full = format!("{}/{}", if path == "/" { "" } else { path }, child); + let value = client + .read_string(full.as_str()) + .await? + .expect("expected value"); + println!("{} = {:?}", full, value,); + pending.push(full); } - }; - - for child in children { - let full = format!("{}/{}", if path == "/" { "" } else { path }, child); - let value = block_on(client.read_string(full.as_str()))?.expect("expected value"); - println!("{}{} = {:?}", " ".repeat(level), child, value,); - list_recursive(client, level + 1, full.as_str())?; } Ok(()) } #[tokio::main] async fn main() -> Result<()> { - let mut client = XsdClient::open().await?; - list_recursive(&mut client, 0, "/")?; + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("warn")).init(); + let client = XsdClient::open().await?; + loop { + list_recursive(&client, "/").await?; + if args().nth(1).unwrap_or("none".to_string()) != "stress" { + break; + } + } Ok(()) } diff --git a/libs/xen/xenstore/examples/watch.rs b/libs/xen/xenstore/examples/watch.rs new file mode 100644 index 0000000..cf1f3a2 --- /dev/null +++ b/libs/xen/xenstore/examples/watch.rs @@ -0,0 +1,23 @@ +use std::env::args; +use xenstore::client::XsdClient; +use xenstore::error::Result; + +#[tokio::main] +async fn main() -> Result<()> { + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("warn")).init(); + let path = args().nth(1).unwrap_or("/local/domain".to_string()); + let client = XsdClient::open().await?; + let mut handle = client.watch(&path).await?; + let mut count = 0; + loop { + let Some(event) = handle.receiver.recv().await else { + break; + }; + println!("{}", event); + count += 1; + if count >= 3 { + break; + } + } + Ok(()) +} diff --git a/libs/xen/xenstore/src/bus.rs b/libs/xen/xenstore/src/bus.rs index 2f9ea51..2f1a19d 100644 --- a/libs/xen/xenstore/src/bus.rs +++ b/libs/xen/xenstore/src/bus.rs @@ -1,54 +1,326 @@ -use crate::error::{Error, Result}; -use crate::sys::{XsdMessageHeader, XSD_ERROR}; -use std::ffi::CString; -use std::fs::{metadata, File}; -use std::io::{Read, Write}; -use std::mem::size_of; +use std::{collections::HashMap, ffi::CString, io::ErrorKind, sync::Arc, time::Duration}; + +use libc::O_NONBLOCK; +use log::warn; +use tokio::{ + fs::{metadata, File}, + io::{unix::AsyncFd, AsyncReadExt, AsyncWriteExt}, + select, + sync::{ + mpsc::{channel, Receiver, Sender}, + oneshot::{self, channel as oneshot_channel}, + Mutex, + }, + task::JoinHandle, + time::timeout, +}; + +use crate::{ + error::{Error, Result}, + sys::{XsdMessageHeader, XSD_ERROR, XSD_UNWATCH, XSD_WATCH_EVENT}, +}; const XEN_BUS_PATHS: &[&str] = &["/dev/xen/xenbus"]; +const XEN_BUS_MAX_PAYLOAD_SIZE: usize = 4096; +const XEN_BUS_MAX_PACKET_SIZE: usize = XsdMessageHeader::SIZE + XEN_BUS_MAX_PAYLOAD_SIZE; -fn find_bus_path() -> Option { +async fn find_bus_path() -> Option<&'static str> { for path in XEN_BUS_PATHS { - match metadata(path) { - Ok(_) => return Some(String::from(*path)), + match metadata(path).await { + Ok(_) => return Some(path), Err(_) => continue, } } None } -pub struct XsdFileTransport { - handle: File, +struct WatchState { + sender: Sender, } -impl XsdFileTransport { - fn new(path: &str) -> Result { - let handle = File::options().read(true).write(true).open(path)?; - Ok(XsdFileTransport { handle }) +struct ReplyState { + sender: oneshot::Sender, +} + +type ReplyMap = Arc>>; +type WatchMap = Arc>>; + +#[derive(Clone)] +pub struct XsdSocket { + tx_sender: Sender, + replies: ReplyMap, + watches: WatchMap, + next_request_id: Arc>, + next_watch_id: Arc>, + processor_task: Arc>, + rx_task: Arc>, + unwatch_sender: Sender, +} + +impl XsdSocket { + pub async fn open() -> Result { + let path = match find_bus_path().await { + Some(path) => path, + None => return Err(Error::BusNotFound), + }; + + let file = File::options() + .read(true) + .write(true) + .custom_flags(O_NONBLOCK) + .open(path) + .await?; + XsdSocket::from_handle(file).await } - async fn xsd_read_exact(&mut self, buf: &mut [u8]) -> Result<()> { - Ok(self.handle.read_exact(buf)?) + pub async fn from_handle(handle: File) -> Result { + let replies: ReplyMap = Arc::new(Mutex::new(HashMap::new())); + let watches: WatchMap = Arc::new(Mutex::new(HashMap::new())); + + let next_request_id = Arc::new(Mutex::new(0u32)); + + let (rx_sender, rx_receiver) = channel::(10); + let (tx_sender, tx_receiver) = channel::(10); + let (unwatch_sender, unwatch_receiver) = channel::(1000); + let read: File = handle.try_clone().await?; + + let mut processor = XsdSocketProcessor { + handle, + replies: replies.clone(), + watches: watches.clone(), + next_request_id: next_request_id.clone(), + tx_receiver, + rx_receiver, + unwatch_receiver, + }; + + let processor_task = tokio::task::spawn(async move { + if let Err(error) = processor.process().await { + warn!("failed to process xen store messages: {}", error); + } + }); + + let rx_task = tokio::task::spawn(async move { + if let Err(error) = XsdSocketProcessor::process_rx(read, rx_sender).await { + warn!("failed to process xen store responses: {}", error); + } + }); + + Ok(XsdSocket { + tx_sender, + replies, + watches, + next_request_id, + next_watch_id: Arc::new(Mutex::new(0u32)), + processor_task: Arc::new(processor_task), + rx_task: Arc::new(rx_task), + unwatch_sender, + }) } - async fn xsd_write_all(&mut self, buf: &[u8]) -> Result<()> { - self.handle.write_all(buf)?; - self.handle.flush()?; + pub async fn send_buf(&self, tx: u32, typ: u32, payload: &[u8]) -> Result { + let req = { + let mut guard = self.next_request_id.lock().await; + let req = *guard; + *guard = req + 1; + req + }; + let (sender, receiver) = oneshot_channel::(); + self.replies.lock().await.insert(req, ReplyState { sender }); + + let header = XsdMessageHeader { + typ, + req, + tx, + len: payload.len() as u32, + }; + let message = XsdMessage { + header, + payload: payload.to_vec(), + }; + if let Err(error) = self.tx_sender.try_send(message) { + return Err(error.into()); + } + let reply = receiver.await?; + if reply.header.typ == XSD_ERROR { + let error = CString::from_vec_with_nul(reply.payload)?; + return Err(Error::ResponseError(error.into_string()?)); + } + Ok(reply) + } + + pub async fn send(&self, tx: u32, typ: u32, payload: &[&str]) -> Result { + let mut buf: Vec = Vec::new(); + for item in payload { + buf.extend_from_slice(item.as_bytes()); + buf.push(0); + } + self.send_buf(tx, typ, &buf).await + } + + pub async fn add_watch(&self) -> Result<(u32, Receiver, Sender)> { + let id = { + let mut guard = self.next_watch_id.lock().await; + let req = *guard; + *guard = req + 1; + req + }; + let (sender, receiver) = channel(10); + self.watches.lock().await.insert(id, WatchState { sender }); + Ok((id, receiver, self.unwatch_sender.clone())) + } +} + +struct XsdSocketProcessor { + handle: File, + replies: ReplyMap, + watches: WatchMap, + next_request_id: Arc>, + tx_receiver: Receiver, + rx_receiver: Receiver, + unwatch_receiver: Receiver, +} + +impl XsdSocketProcessor { + async fn process_rx(read: File, rx_sender: Sender) -> Result<()> { + let mut buffer: Vec = vec![0u8; XEN_BUS_MAX_PACKET_SIZE]; + let mut fd = AsyncFd::new(read)?; + loop { + select! { + x = fd.readable_mut() => match x { + Ok(mut guard) => { + let future = XsdSocketProcessor::read_message(&mut buffer, guard.get_inner_mut()); + if let Ok(message) = timeout(Duration::from_secs(1), future).await { + rx_sender.send(message?).await?; + } + }, + + Err(error) => { + return Err(error.into()); + } + }, + + _ = rx_sender.closed() => { + break; + } + }; + } + Ok(()) + } + + async fn read_message(buffer: &mut [u8], read: &mut File) -> Result { + let size = loop { + match read.read(buffer).await { + Ok(size) => break size, + Err(error) => { + if error.kind() == ErrorKind::WouldBlock { + tokio::task::yield_now().await; + continue; + } + return Err(error.into()); + } + }; + }; + + if size < XsdMessageHeader::SIZE { + return Err(Error::InvalidBusData); + } + + let header = XsdMessageHeader::decode(&buffer[0..XsdMessageHeader::SIZE])?; + if size < XsdMessageHeader::SIZE + header.len as usize { + return Err(Error::InvalidBusData); + } + let payload = + &mut buffer[XsdMessageHeader::SIZE..XsdMessageHeader::SIZE + header.len as usize]; + Ok(XsdMessage { + header, + payload: payload.to_vec(), + }) + } + + async fn process(&mut self) -> Result<()> { + loop { + select! { + x = self.tx_receiver.recv() => match x { + Some(message) => { + let mut composed: Vec = Vec::new(); + message.header.encode_to(&mut composed)?; + composed.extend_from_slice(&message.payload); + self.handle.write_all(&composed).await?; + } + + None => { + break; + } + }, + + x = self.rx_receiver.recv() => match x { + Some(message) => { + if message.header.typ == XSD_WATCH_EVENT && message.header.req == 0 && message.header.tx == 0 { + let strings = message.parse_string_vec()?; + let Some(path) = strings.first() else { + return Ok(()); + }; + let Some(token) = strings.get(1) else { + return Ok(()); + }; + + let Ok(id) = token.parse::() else { + return Ok(()); + }; + + if let Some(state) = self.watches.lock().await.get(&id) { + let _ = state.sender.try_send(path.clone()); + } + } else if let Some(state) = self.replies.lock().await.remove(&message.header.req) { + let _ = state.sender.send(message); + } + } + + None => { + break; + } + }, + + x = self.unwatch_receiver.recv() => match x { + Some(id) => { + let req = { + let mut guard = self.next_request_id.lock().await; + let req = *guard; + *guard = req + 1; + req + }; + + let mut payload = id.to_string().as_bytes().to_vec(); + payload.push(0); + let header = XsdMessageHeader { + typ: XSD_UNWATCH, + req, + tx: 0, + len: payload.len() as u32, + }; + let mut data = header.encode()?; + data.extend_from_slice(&payload); + self.handle.write_all(&data).await?; + }, + + None => { + break; + } + } + }; + } Ok(()) } } -pub struct XsdSocket { - handle: XsdFileTransport, -} - #[derive(Debug)] -pub struct XsdResponse { +pub struct XsdMessage { pub header: XsdMessageHeader, pub payload: Vec, } -impl XsdResponse { +impl XsdMessage { pub fn parse_string(&self) -> Result { Ok(CString::from_vec_with_nul(self.payload.clone())?.into_string()?) } @@ -73,65 +345,14 @@ impl XsdResponse { } } -impl XsdSocket { - pub async fn open() -> Result { - let path = match find_bus_path() { - Some(path) => path, - None => return Err(Error::BusNotFound), - }; - let transport = XsdFileTransport::new(&path)?; - Ok(XsdSocket { handle: transport }) - } - - pub async fn send(&mut self, tx: u32, typ: u32, buf: &[u8]) -> Result { - let header = XsdMessageHeader { - typ, - req: 0, - tx, - len: buf.len() as u32, - }; - let header_bytes = bytemuck::bytes_of(&header); - let mut composed: Vec = Vec::new(); - composed.extend_from_slice(header_bytes); - composed.extend_from_slice(buf); - self.handle.xsd_write_all(&composed).await?; - let mut result_buf = vec![0u8; size_of::()]; - match self.handle.xsd_read_exact(result_buf.as_mut_slice()).await { - Ok(_) => {} - Err(error) => { - if result_buf.first().unwrap() == &0 { - return Err(error); - } - } +impl Drop for XsdSocket { + fn drop(&mut self) { + if Arc::strong_count(&self.rx_task) <= 1 { + self.rx_task.abort(); } - let result_header = bytemuck::from_bytes::(&result_buf); - let mut payload = vec![0u8; result_header.len as usize]; - self.handle.xsd_read_exact(payload.as_mut_slice()).await?; - if result_header.typ == XSD_ERROR { - let error = CString::from_vec_with_nul(payload)?; - return Err(Error::ResponseError(error.into_string()?)); - } - let response = XsdResponse { header, payload }; - Ok(response) - } - pub async fn send_single(&mut self, tx: u32, typ: u32, string: &str) -> Result { - let text = CString::new(string)?; - let buf = text.as_bytes_with_nul(); - self.send(tx, typ, buf).await - } - - pub async fn send_multiple( - &mut self, - tx: u32, - typ: u32, - array: &[&str], - ) -> Result { - let mut buf: Vec = Vec::new(); - for item in array { - buf.extend_from_slice(item.as_bytes()); - buf.push(0); + if Arc::strong_count(&self.processor_task) <= 1 { + self.processor_task.abort(); } - self.send(tx, typ, buf.as_slice()).await } } diff --git a/libs/xen/xenstore/src/client.rs b/libs/xen/xenstore/src/client.rs index 58bb721..1964834 100644 --- a/libs/xen/xenstore/src/client.rs +++ b/libs/xen/xenstore/src/client.rs @@ -2,26 +2,29 @@ use crate::bus::XsdSocket; use crate::error::{Error, Result}; use crate::sys::{ XSD_DIRECTORY, XSD_GET_DOMAIN_PATH, XSD_INTRODUCE, XSD_MKDIR, XSD_READ, XSD_RM, XSD_SET_PERMS, - XSD_TRANSACTION_END, XSD_TRANSACTION_START, XSD_WRITE, + XSD_TRANSACTION_END, XSD_TRANSACTION_START, XSD_WATCH, XSD_WRITE, }; use log::trace; use std::ffi::CString; +use tokio::sync::mpsc::Receiver; +use tokio::sync::mpsc::Sender; pub const XS_PERM_NONE: u32 = 0x00; pub const XS_PERM_READ: u32 = 0x01; pub const XS_PERM_WRITE: u32 = 0x02; pub const XS_PERM_READ_WRITE: u32 = XS_PERM_READ | XS_PERM_WRITE; -pub struct XsdClient { - pub socket: XsdSocket, -} - #[derive(Debug, Copy, Clone)] pub struct XsPermission { pub id: u32, pub perms: u32, } +#[derive(Clone)] +pub struct XsdClient { + pub socket: XsdSocket, +} + impl XsPermission { pub fn encode(&self) -> Result { let c = match self.perms { @@ -35,18 +38,30 @@ impl XsPermission { } } +pub struct XsdWatchHandle { + id: u32, + unwatch_sender: Sender, + pub receiver: Receiver, +} + +impl Drop for XsdWatchHandle { + fn drop(&mut self) { + let _ = self.unwatch_sender.try_send(self.id); + } +} + #[allow(async_fn_in_trait)] pub trait XsdInterface { - async fn list(&mut self, path: &str) -> Result>; - async fn read(&mut self, path: &str) -> Result>>; - async fn read_string(&mut self, path: &str) -> Result>; - async fn write(&mut self, path: &str, data: Vec) -> Result; - async fn write_string(&mut self, path: &str, data: &str) -> Result; - async fn mkdir(&mut self, path: &str) -> Result; - async fn rm(&mut self, path: &str) -> Result; - async fn set_perms(&mut self, path: &str, perms: &[XsPermission]) -> Result; + async fn list(&self, path: &str) -> Result>; + async fn read(&self, path: &str) -> Result>>; + async fn read_string(&self, path: &str) -> Result>; + async fn write(&self, path: &str, data: Vec) -> Result; + async fn write_string(&self, path: &str, data: &str) -> Result; + async fn mkdir(&self, path: &str) -> Result; + async fn rm(&self, path: &str) -> Result; + async fn set_perms(&self, path: &str, perms: &[XsPermission]) -> Result; - async fn mknod(&mut self, path: &str, perms: &[XsPermission]) -> Result { + async fn mknod(&self, path: &str, perms: &[XsPermission]) -> Result { let result1 = self.write_string(path, "").await?; let result2 = self.set_perms(path, perms).await?; Ok(result1 && result2) @@ -59,15 +74,23 @@ impl XsdClient { Ok(XsdClient { socket }) } - async fn list(&mut self, tx: u32, path: &str) -> Result> { + async fn list(&self, tx: u32, path: &str) -> Result> { trace!("list tx={tx} path={path}"); - let response = self.socket.send_single(tx, XSD_DIRECTORY, path).await?; + let response = match self.socket.send(tx, XSD_DIRECTORY, &[path]).await { + Ok(response) => response, + Err(error) => { + if error.is_noent_response() { + return Ok(vec![]); + } + return Err(error); + } + }; response.parse_string_vec() } - async fn read(&mut self, tx: u32, path: &str) -> Result>> { + async fn read(&self, tx: u32, path: &str) -> Result>> { trace!("read tx={tx} path={path}"); - match self.socket.send_single(tx, XSD_READ, path).await { + match self.socket.send(tx, XSD_READ, &[path]).await { Ok(response) => Ok(Some(response.payload)), Err(error) => { if error.is_noent_response() { @@ -79,27 +102,27 @@ impl XsdClient { } } - async fn write(&mut self, tx: u32, path: &str, data: Vec) -> Result { + async fn write(&self, tx: u32, path: &str, data: Vec) -> Result { trace!("write tx={tx} path={path} data={:?}", data); let mut buffer = Vec::new(); let path = CString::new(path)?; buffer.extend_from_slice(path.as_bytes_with_nul()); buffer.extend_from_slice(data.as_slice()); - let response = self.socket.send(tx, XSD_WRITE, buffer.as_slice()).await?; + let response = self + .socket + .send_buf(tx, XSD_WRITE, buffer.as_slice()) + .await?; response.parse_bool() } - async fn mkdir(&mut self, tx: u32, path: &str) -> Result { + async fn mkdir(&self, tx: u32, path: &str) -> Result { trace!("mkdir tx={tx} path={path}"); - self.socket - .send_single(tx, XSD_MKDIR, path) - .await? - .parse_bool() + self.socket.send(tx, XSD_MKDIR, &[path]).await?.parse_bool() } - async fn rm(&mut self, tx: u32, path: &str) -> Result { + async fn rm(&self, tx: u32, path: &str) -> Result { trace!("rm tx={tx} path={path}"); - let result = self.socket.send_single(tx, XSD_RM, path).await; + let result = self.socket.send(tx, XSD_RM, &[path]).await; if let Err(error) = result { if error.is_noent_response() { return Ok(true); @@ -109,7 +132,7 @@ impl XsdClient { result.unwrap().parse_bool() } - async fn set_perms(&mut self, tx: u32, path: &str, perms: &[XsPermission]) -> Result { + async fn set_perms(&self, tx: u32, path: &str, perms: &[XsPermission]) -> Result { trace!("set_perms tx={tx} path={path} perms={:?}", perms); let mut items: Vec = Vec::new(); items.push(path.to_string()); @@ -117,28 +140,25 @@ impl XsdClient { items.push(perm.encode()?); } let items_str: Vec<&str> = items.iter().map(|x| x.as_str()).collect(); - let response = self - .socket - .send_multiple(tx, XSD_SET_PERMS, &items_str) - .await?; + let response = self.socket.send(tx, XSD_SET_PERMS, &items_str).await?; response.parse_bool() } - pub async fn transaction(&mut self) -> Result { + pub async fn transaction(&self) -> Result { trace!("transaction start"); - let response = self - .socket - .send_single(0, XSD_TRANSACTION_START, "") - .await?; + let response = self.socket.send(0, XSD_TRANSACTION_START, &[""]).await?; let str = response.parse_string()?; let tx = str.parse::()?; - Ok(XsdTransaction { client: self, tx }) + Ok(XsdTransaction { + client: self.clone(), + tx, + }) } pub async fn get_domain_path(&mut self, domid: u32) -> Result { let response = self .socket - .send_single(0, XSD_GET_DOMAIN_PATH, domid.to_string().as_str()) + .send(0, XSD_GET_DOMAIN_PATH, &[&domid.to_string()]) .await?; response.parse_string() } @@ -147,7 +167,7 @@ impl XsdClient { trace!("introduce domain domid={domid} mfn={mfn} evtchn={evtchn}"); let response = self .socket - .send_multiple( + .send( 0, XSD_INTRODUCE, &[ @@ -159,23 +179,35 @@ impl XsdClient { .await?; response.parse_bool() } + + pub async fn watch(&self, path: &str) -> Result { + let (id, receiver, unwatch_sender) = self.socket.add_watch().await?; + let id_string = id.to_string(); + let _ = self.socket.send(0, XSD_WATCH, &[path, &id_string]).await?; + Ok(XsdWatchHandle { + id, + receiver, + unwatch_sender, + }) + } } -pub struct XsdTransaction<'a> { - client: &'a mut XsdClient, +#[derive(Clone)] +pub struct XsdTransaction { + client: XsdClient, tx: u32, } impl XsdInterface for XsdClient { - async fn list(&mut self, path: &str) -> Result> { + async fn list(&self, path: &str) -> Result> { self.list(0, path).await } - async fn read(&mut self, path: &str) -> Result>> { + async fn read(&self, path: &str) -> Result>> { self.read(0, path).await } - async fn read_string(&mut self, path: &str) -> Result> { + async fn read_string(&self, path: &str) -> Result> { match self.read(0, path).await { Ok(value) => match value { Some(value) => Ok(Some(String::from_utf8(value)?)), @@ -185,37 +217,37 @@ impl XsdInterface for XsdClient { } } - async fn write(&mut self, path: &str, data: Vec) -> Result { + async fn write(&self, path: &str, data: Vec) -> Result { self.write(0, path, data).await } - async fn write_string(&mut self, path: &str, data: &str) -> Result { + async fn write_string(&self, path: &str, data: &str) -> Result { self.write(0, path, data.as_bytes().to_vec()).await } - async fn mkdir(&mut self, path: &str) -> Result { + async fn mkdir(&self, path: &str) -> Result { self.mkdir(0, path).await } - async fn rm(&mut self, path: &str) -> Result { + async fn rm(&self, path: &str) -> Result { self.rm(0, path).await } - async fn set_perms(&mut self, path: &str, perms: &[XsPermission]) -> Result { + async fn set_perms(&self, path: &str, perms: &[XsPermission]) -> Result { self.set_perms(0, path, perms).await } } -impl XsdInterface for XsdTransaction<'_> { - async fn list(&mut self, path: &str) -> Result> { +impl XsdInterface for XsdTransaction { + async fn list(&self, path: &str) -> Result> { self.client.list(self.tx, path).await } - async fn read(&mut self, path: &str) -> Result>> { + async fn read(&self, path: &str) -> Result>> { self.client.read(self.tx, path).await } - async fn read_string(&mut self, path: &str) -> Result> { + async fn read_string(&self, path: &str) -> Result> { match self.client.read(self.tx, path).await { Ok(value) => match value { Some(value) => Ok(Some(String::from_utf8(value)?)), @@ -225,46 +257,46 @@ impl XsdInterface for XsdTransaction<'_> { } } - async fn write(&mut self, path: &str, data: Vec) -> Result { + async fn write(&self, path: &str, data: Vec) -> Result { self.client.write(self.tx, path, data).await } - async fn write_string(&mut self, path: &str, data: &str) -> Result { + async fn write_string(&self, path: &str, data: &str) -> Result { self.client .write(self.tx, path, data.as_bytes().to_vec()) .await } - async fn mkdir(&mut self, path: &str) -> Result { + async fn mkdir(&self, path: &str) -> Result { self.client.mkdir(self.tx, path).await } - async fn rm(&mut self, path: &str) -> Result { + async fn rm(&self, path: &str) -> Result { self.client.rm(self.tx, path).await } - async fn set_perms(&mut self, path: &str, perms: &[XsPermission]) -> Result { + async fn set_perms(&self, path: &str, perms: &[XsPermission]) -> Result { self.client.set_perms(self.tx, path, perms).await } } -impl XsdTransaction<'_> { - pub async fn end(&mut self, abort: bool) -> Result { +impl XsdTransaction { + pub async fn end(&self, abort: bool) -> Result { let abort_str = if abort { "F" } else { "T" }; trace!("transaction end abort={}", abort); self.client .socket - .send_single(self.tx, XSD_TRANSACTION_END, abort_str) + .send(self.tx, XSD_TRANSACTION_END, &[abort_str]) .await? .parse_bool() } - pub async fn commit(&mut self) -> Result { + pub async fn commit(&self) -> Result { self.end(false).await } - pub async fn abort(&mut self) -> Result { + pub async fn abort(&self) -> Result { self.end(true).await } } diff --git a/libs/xen/xenstore/src/error.rs b/libs/xen/xenstore/src/error.rs index 14ae279..7bdb85a 100644 --- a/libs/xen/xenstore/src/error.rs +++ b/libs/xen/xenstore/src/error.rs @@ -4,21 +4,28 @@ use std::num::ParseIntError; use std::str::Utf8Error; use std::string::FromUtf8Error; +use tokio::sync::mpsc::error::{SendError, TrySendError}; +use tokio::sync::oneshot::error::RecvError; + +use crate::bus::XsdMessage; + #[derive(thiserror::Error, Debug)] pub enum Error { - #[error("io issue encountered")] + #[error("io issue encountered: {0}")] Io(#[from] io::Error), - #[error("utf8 string decode failed")] + #[error("invalid data received on bus")] + InvalidBusData, + #[error("utf8 string decode failed: {0}")] Utf8DecodeString(#[from] FromUtf8Error), - #[error("utf8 str decode failed")] + #[error("utf8 str decode failed: {0}")] Utf8DecodeStr(#[from] Utf8Error), - #[error("unable to decode cstring as utf8")] + #[error("unable to decode cstring as utf8: {0}")] Utf8DecodeCstring(#[from] IntoStringError), - #[error("nul byte found in string")] + #[error("nul byte found in string: {0}")] NulByteFoundString(#[from] NulError), - #[error("unable to find nul byte in vec")] + #[error("unable to find nul byte in vec: {0}")] VecNulByteNotFound(#[from] FromVecWithNulError), - #[error("unable to parse integer")] + #[error("unable to parse integer: {0}")] ParseInt(#[from] ParseIntError), #[error("bus was not found on any available path")] BusNotFound, @@ -26,6 +33,12 @@ pub enum Error { ResponseError(String), #[error("invalid permissions provided")] InvalidPermissions, + #[error("failed to receive reply: {0}")] + ReceiverError(#[from] RecvError), + #[error("failed to send request: {0}")] + SendError(#[from] SendError), + #[error("failed to send request: {0}")] + TrySendError(#[from] TrySendError), } impl Error { diff --git a/libs/xen/xenstore/src/sys.rs b/libs/xen/xenstore/src/sys.rs index 27e398a..e1a1a8b 100644 --- a/libs/xen/xenstore/src/sys.rs +++ b/libs/xen/xenstore/src/sys.rs @@ -1,10 +1,12 @@ /// Handwritten protocol definitions for XenStore. /// Used xen/include/public/io/xs_wire.h as a reference. -use bytemuck::{Pod, Zeroable}; use libc; -#[derive(Copy, Clone, Pod, Zeroable, Debug)] -#[repr(C)] +use crate::error::Result; +use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; +use std::io::Cursor; + +#[derive(Copy, Clone, Debug)] pub struct XsdMessageHeader { pub typ: u32, pub req: u32, @@ -12,6 +14,34 @@ pub struct XsdMessageHeader { pub len: u32, } +impl XsdMessageHeader { + pub const SIZE: usize = 16; + + pub fn decode(bytes: &[u8]) -> Result { + let mut cursor = Cursor::new(bytes); + Ok(XsdMessageHeader { + typ: cursor.read_u32::()?, + req: cursor.read_u32::()?, + tx: cursor.read_u32::()?, + len: cursor.read_u32::()?, + }) + } + + pub fn encode_to(&self, buffer: &mut Vec) -> Result<()> { + buffer.write_u32::(self.typ)?; + buffer.write_u32::(self.req)?; + buffer.write_u32::(self.tx)?; + buffer.write_u32::(self.len)?; + Ok(()) + } + + pub fn encode(&self) -> Result> { + let mut buffer = Vec::with_capacity(XsdMessageHeader::SIZE); + self.encode_to(&mut buffer)?; + Ok(buffer) + } +} + pub const XSD_CONTROL: u32 = 0; pub const XSD_DIRECTORY: u32 = 1; pub const XSD_READ: u32 = 2; diff --git a/network/src/autonet.rs b/network/src/autonet.rs index bd5e7b5..04826bf 100644 --- a/network/src/autonet.rs +++ b/network/src/autonet.rs @@ -46,7 +46,7 @@ impl AutoNetworkCollector { pub async fn read(&mut self) -> Result> { let mut networks = Vec::new(); - let mut tx = self.client.transaction().await?; + let tx = self.client.transaction().await?; for domid_string in tx.list("/local/domain").await? { let Ok(domid) = domid_string.parse::() else { continue; @@ -63,13 +63,13 @@ impl AutoNetworkCollector { }; let Ok(guest) = - AutoNetworkCollector::read_network_side(uuid, &mut tx, &dom_path, "guest").await + AutoNetworkCollector::read_network_side(uuid, &tx, &dom_path, "guest").await else { continue; }; let Ok(gateway) = - AutoNetworkCollector::read_network_side(uuid, &mut tx, &dom_path, "gateway").await + AutoNetworkCollector::read_network_side(uuid, &tx, &dom_path, "gateway").await else { continue; }; @@ -87,7 +87,7 @@ impl AutoNetworkCollector { async fn read_network_side( uuid: Uuid, - tx: &mut XsdTransaction<'_>, + tx: &XsdTransaction, dom_path: &str, side: &str, ) -> Result {