From b0bd931f0ef481dafc790b22525913af6f74d105 Mon Sep 17 00:00:00 2001 From: Alex Zenla Date: Fri, 8 Mar 2024 13:08:59 +0000 Subject: [PATCH] xenstore: use AsRef for path and restructure --- crates/krataguest/src/background.rs | 2 +- crates/kratanet/src/autonet.rs | 2 +- crates/kratart/src/launch/mod.rs | 2 +- crates/kratart/src/lib.rs | 2 +- crates/xen/xenclient/src/lib.rs | 2 +- crates/xen/xenstore/examples/list.rs | 2 +- crates/xen/xenstore/examples/watch.rs | 2 +- crates/xen/xenstore/src/client.rs | 302 ------------------------ crates/xen/xenstore/src/lib.rs | 315 +++++++++++++++++++++++++- 9 files changed, 321 insertions(+), 310 deletions(-) delete mode 100644 crates/xen/xenstore/src/client.rs diff --git a/crates/krataguest/src/background.rs b/crates/krataguest/src/background.rs index c42e2f1..3f5e261 100644 --- a/crates/krataguest/src/background.rs +++ b/crates/krataguest/src/background.rs @@ -4,7 +4,7 @@ use crate::childwait::{ChildEvent, ChildWait}; use anyhow::Result; use nix::{libc::c_int, unistd::Pid}; use tokio::{select, time::sleep}; -use xenstore::client::{XsdClient, XsdInterface}; +use xenstore::{XsdClient, XsdInterface}; pub struct ContainerBackground { child: Pid, diff --git a/crates/kratanet/src/autonet.rs b/crates/kratanet/src/autonet.rs index 04826bf..f9adfd4 100644 --- a/crates/kratanet/src/autonet.rs +++ b/crates/kratanet/src/autonet.rs @@ -2,7 +2,7 @@ use anyhow::{anyhow, Result}; use smoltcp::wire::{EthernetAddress, Ipv4Cidr, Ipv6Cidr}; use std::{collections::HashMap, str::FromStr}; use uuid::Uuid; -use xenstore::client::{XsdClient, XsdInterface, XsdTransaction}; +use xenstore::{XsdClient, XsdInterface, XsdTransaction}; pub struct AutoNetworkCollector { client: XsdClient, diff --git a/crates/kratart/src/launch/mod.rs b/crates/kratart/src/launch/mod.rs index f1077ad..940ba4a 100644 --- a/crates/kratart/src/launch/mod.rs +++ b/crates/kratart/src/launch/mod.rs @@ -9,7 +9,7 @@ use krata::launchcfg::{ }; use uuid::Uuid; use xenclient::{DomainConfig, DomainDisk, DomainNetworkInterface}; -use xenstore::client::XsdInterface; +use xenstore::XsdInterface; use crate::cfgblk::ConfigBlock; use crate::image::{cache::ImageCache, name::ImageName, ImageCompiler, ImageInfo}; diff --git a/crates/kratart/src/lib.rs b/crates/kratart/src/lib.rs index c93d942..7518140 100644 --- a/crates/kratart/src/lib.rs +++ b/crates/kratart/src/lib.rs @@ -11,7 +11,7 @@ use loopdev::LoopControl; use tokio::sync::Mutex; use uuid::Uuid; use xenclient::XenClient; -use xenstore::client::{XsdClient, XsdInterface}; +use xenstore::{XsdClient, XsdInterface}; use self::{ autoloop::AutoLoop, diff --git a/crates/xen/xenclient/src/lib.rs b/crates/xen/xenclient/src/lib.rs index ee80012..6a720e3 100644 --- a/crates/xen/xenclient/src/lib.rs +++ b/crates/xen/xenclient/src/lib.rs @@ -19,7 +19,7 @@ use std::time::Duration; use uuid::Uuid; use xencall::sys::CreateDomain; use xencall::XenCall; -use xenstore::client::{ +use xenstore::{ XsPermission, XsdClient, XsdInterface, XS_PERM_NONE, XS_PERM_READ, XS_PERM_READ_WRITE, }; diff --git a/crates/xen/xenstore/examples/list.rs b/crates/xen/xenstore/examples/list.rs index 09df815..b18826c 100644 --- a/crates/xen/xenstore/examples/list.rs +++ b/crates/xen/xenstore/examples/list.rs @@ -1,7 +1,7 @@ use std::env::args; -use xenstore::client::{XsdClient, XsdInterface}; use xenstore::error::Result; +use xenstore::{XsdClient, XsdInterface}; async fn list_recursive(client: &XsdClient, path: &str) -> Result<()> { let mut pending = vec![path.to_string()]; diff --git a/crates/xen/xenstore/examples/watch.rs b/crates/xen/xenstore/examples/watch.rs index cf1f3a2..08e2d15 100644 --- a/crates/xen/xenstore/examples/watch.rs +++ b/crates/xen/xenstore/examples/watch.rs @@ -1,6 +1,6 @@ use std::env::args; -use xenstore::client::XsdClient; use xenstore::error::Result; +use xenstore::XsdClient; #[tokio::main] async fn main() -> Result<()> { diff --git a/crates/xen/xenstore/src/client.rs b/crates/xen/xenstore/src/client.rs deleted file mode 100644 index 1964834..0000000 --- a/crates/xen/xenstore/src/client.rs +++ /dev/null @@ -1,302 +0,0 @@ -use crate::bus::XsdSocket; -use crate::error::{Error, Result}; -use crate::sys::{ - XSD_DIRECTORY, XSD_GET_DOMAIN_PATH, XSD_INTRODUCE, XSD_MKDIR, XSD_READ, XSD_RM, XSD_SET_PERMS, - XSD_TRANSACTION_END, XSD_TRANSACTION_START, XSD_WATCH, XSD_WRITE, -}; -use log::trace; -use std::ffi::CString; -use tokio::sync::mpsc::Receiver; -use tokio::sync::mpsc::Sender; - -pub const XS_PERM_NONE: u32 = 0x00; -pub const XS_PERM_READ: u32 = 0x01; -pub const XS_PERM_WRITE: u32 = 0x02; -pub const XS_PERM_READ_WRITE: u32 = XS_PERM_READ | XS_PERM_WRITE; - -#[derive(Debug, Copy, Clone)] -pub struct XsPermission { - pub id: u32, - pub perms: u32, -} - -#[derive(Clone)] -pub struct XsdClient { - pub socket: XsdSocket, -} - -impl XsPermission { - pub fn encode(&self) -> Result { - let c = match self.perms { - XS_PERM_READ_WRITE => 'b', - XS_PERM_WRITE => 'w', - XS_PERM_READ => 'r', - XS_PERM_NONE => 'n', - _ => return Err(Error::InvalidPermissions), - }; - Ok(format!("{}{}", c, self.id)) - } -} - -pub struct XsdWatchHandle { - id: u32, - unwatch_sender: Sender, - pub receiver: Receiver, -} - -impl Drop for XsdWatchHandle { - fn drop(&mut self) { - let _ = self.unwatch_sender.try_send(self.id); - } -} - -#[allow(async_fn_in_trait)] -pub trait XsdInterface { - async fn list(&self, path: &str) -> Result>; - async fn read(&self, path: &str) -> Result>>; - async fn read_string(&self, path: &str) -> Result>; - async fn write(&self, path: &str, data: Vec) -> Result; - async fn write_string(&self, path: &str, data: &str) -> Result; - async fn mkdir(&self, path: &str) -> Result; - async fn rm(&self, path: &str) -> Result; - async fn set_perms(&self, path: &str, perms: &[XsPermission]) -> Result; - - async fn mknod(&self, path: &str, perms: &[XsPermission]) -> Result { - let result1 = self.write_string(path, "").await?; - let result2 = self.set_perms(path, perms).await?; - Ok(result1 && result2) - } -} - -impl XsdClient { - pub async fn open() -> Result { - let socket = XsdSocket::open().await?; - Ok(XsdClient { socket }) - } - - async fn list(&self, tx: u32, path: &str) -> Result> { - trace!("list tx={tx} path={path}"); - let response = match self.socket.send(tx, XSD_DIRECTORY, &[path]).await { - Ok(response) => response, - Err(error) => { - if error.is_noent_response() { - return Ok(vec![]); - } - return Err(error); - } - }; - response.parse_string_vec() - } - - async fn read(&self, tx: u32, path: &str) -> Result>> { - trace!("read tx={tx} path={path}"); - match self.socket.send(tx, XSD_READ, &[path]).await { - Ok(response) => Ok(Some(response.payload)), - Err(error) => { - if error.is_noent_response() { - Ok(None) - } else { - Err(error) - } - } - } - } - - async fn write(&self, tx: u32, path: &str, data: Vec) -> Result { - trace!("write tx={tx} path={path} data={:?}", data); - let mut buffer = Vec::new(); - let path = CString::new(path)?; - buffer.extend_from_slice(path.as_bytes_with_nul()); - buffer.extend_from_slice(data.as_slice()); - let response = self - .socket - .send_buf(tx, XSD_WRITE, buffer.as_slice()) - .await?; - response.parse_bool() - } - - async fn mkdir(&self, tx: u32, path: &str) -> Result { - trace!("mkdir tx={tx} path={path}"); - self.socket.send(tx, XSD_MKDIR, &[path]).await?.parse_bool() - } - - async fn rm(&self, tx: u32, path: &str) -> Result { - trace!("rm tx={tx} path={path}"); - let result = self.socket.send(tx, XSD_RM, &[path]).await; - if let Err(error) = result { - if error.is_noent_response() { - return Ok(true); - } - return Err(error); - } - result.unwrap().parse_bool() - } - - async fn set_perms(&self, tx: u32, path: &str, perms: &[XsPermission]) -> Result { - trace!("set_perms tx={tx} path={path} perms={:?}", perms); - let mut items: Vec = Vec::new(); - items.push(path.to_string()); - for perm in perms { - items.push(perm.encode()?); - } - let items_str: Vec<&str> = items.iter().map(|x| x.as_str()).collect(); - let response = self.socket.send(tx, XSD_SET_PERMS, &items_str).await?; - response.parse_bool() - } - - pub async fn transaction(&self) -> Result { - trace!("transaction start"); - let response = self.socket.send(0, XSD_TRANSACTION_START, &[""]).await?; - let str = response.parse_string()?; - let tx = str.parse::()?; - Ok(XsdTransaction { - client: self.clone(), - tx, - }) - } - - pub async fn get_domain_path(&mut self, domid: u32) -> Result { - let response = self - .socket - .send(0, XSD_GET_DOMAIN_PATH, &[&domid.to_string()]) - .await?; - response.parse_string() - } - - pub async fn introduce_domain(&mut self, domid: u32, mfn: u64, evtchn: u32) -> Result { - trace!("introduce domain domid={domid} mfn={mfn} evtchn={evtchn}"); - let response = self - .socket - .send( - 0, - XSD_INTRODUCE, - &[ - domid.to_string().as_str(), - mfn.to_string().as_str(), - evtchn.to_string().as_str(), - ], - ) - .await?; - response.parse_bool() - } - - pub async fn watch(&self, path: &str) -> Result { - let (id, receiver, unwatch_sender) = self.socket.add_watch().await?; - let id_string = id.to_string(); - let _ = self.socket.send(0, XSD_WATCH, &[path, &id_string]).await?; - Ok(XsdWatchHandle { - id, - receiver, - unwatch_sender, - }) - } -} - -#[derive(Clone)] -pub struct XsdTransaction { - client: XsdClient, - tx: u32, -} - -impl XsdInterface for XsdClient { - async fn list(&self, path: &str) -> Result> { - self.list(0, path).await - } - - async fn read(&self, path: &str) -> Result>> { - self.read(0, path).await - } - - async fn read_string(&self, path: &str) -> Result> { - match self.read(0, path).await { - Ok(value) => match value { - Some(value) => Ok(Some(String::from_utf8(value)?)), - None => Ok(None), - }, - Err(error) => Err(error), - } - } - - async fn write(&self, path: &str, data: Vec) -> Result { - self.write(0, path, data).await - } - - async fn write_string(&self, path: &str, data: &str) -> Result { - self.write(0, path, data.as_bytes().to_vec()).await - } - - async fn mkdir(&self, path: &str) -> Result { - self.mkdir(0, path).await - } - - async fn rm(&self, path: &str) -> Result { - self.rm(0, path).await - } - - async fn set_perms(&self, path: &str, perms: &[XsPermission]) -> Result { - self.set_perms(0, path, perms).await - } -} - -impl XsdInterface for XsdTransaction { - async fn list(&self, path: &str) -> Result> { - self.client.list(self.tx, path).await - } - - async fn read(&self, path: &str) -> Result>> { - self.client.read(self.tx, path).await - } - - async fn read_string(&self, path: &str) -> Result> { - match self.client.read(self.tx, path).await { - Ok(value) => match value { - Some(value) => Ok(Some(String::from_utf8(value)?)), - None => Ok(None), - }, - Err(error) => Err(error), - } - } - - async fn write(&self, path: &str, data: Vec) -> Result { - self.client.write(self.tx, path, data).await - } - - async fn write_string(&self, path: &str, data: &str) -> Result { - self.client - .write(self.tx, path, data.as_bytes().to_vec()) - .await - } - - async fn mkdir(&self, path: &str) -> Result { - self.client.mkdir(self.tx, path).await - } - - async fn rm(&self, path: &str) -> Result { - self.client.rm(self.tx, path).await - } - - async fn set_perms(&self, path: &str, perms: &[XsPermission]) -> Result { - self.client.set_perms(self.tx, path, perms).await - } -} - -impl XsdTransaction { - pub async fn end(&self, abort: bool) -> Result { - let abort_str = if abort { "F" } else { "T" }; - - trace!("transaction end abort={}", abort); - self.client - .socket - .send(self.tx, XSD_TRANSACTION_END, &[abort_str]) - .await? - .parse_bool() - } - - pub async fn commit(&self) -> Result { - self.end(false).await - } - - pub async fn abort(&self) -> Result { - self.end(true).await - } -} diff --git a/crates/xen/xenstore/src/lib.rs b/crates/xen/xenstore/src/lib.rs index 6daf7f0..5c962cf 100644 --- a/crates/xen/xenstore/src/lib.rs +++ b/crates/xen/xenstore/src/lib.rs @@ -1,4 +1,317 @@ pub mod bus; -pub mod client; pub mod error; pub mod sys; + +use crate::bus::XsdSocket; +use crate::error::{Error, Result}; +use crate::sys::{ + XSD_DIRECTORY, XSD_GET_DOMAIN_PATH, XSD_INTRODUCE, XSD_MKDIR, XSD_READ, XSD_RM, XSD_SET_PERMS, + XSD_TRANSACTION_END, XSD_TRANSACTION_START, XSD_WATCH, XSD_WRITE, +}; +use log::trace; +use std::ffi::CString; +use tokio::sync::mpsc::Receiver; +use tokio::sync::mpsc::Sender; + +pub const XS_PERM_NONE: u32 = 0x00; +pub const XS_PERM_READ: u32 = 0x01; +pub const XS_PERM_WRITE: u32 = 0x02; +pub const XS_PERM_READ_WRITE: u32 = XS_PERM_READ | XS_PERM_WRITE; + +#[derive(Debug, Copy, Clone)] +pub struct XsPermission { + pub id: u32, + pub perms: u32, +} + +#[derive(Clone)] +pub struct XsdClient { + pub socket: XsdSocket, +} + +impl XsPermission { + pub fn encode(&self) -> Result { + let c = match self.perms { + XS_PERM_READ_WRITE => 'b', + XS_PERM_WRITE => 'w', + XS_PERM_READ => 'r', + XS_PERM_NONE => 'n', + _ => return Err(Error::InvalidPermissions), + }; + Ok(format!("{}{}", c, self.id)) + } +} + +pub struct XsdWatchHandle { + id: u32, + unwatch_sender: Sender, + pub receiver: Receiver, +} + +impl Drop for XsdWatchHandle { + fn drop(&mut self) { + let _ = self.unwatch_sender.try_send(self.id); + } +} + +#[allow(async_fn_in_trait)] +pub trait XsdInterface { + async fn list>(&self, path: P) -> Result>; + async fn read>(&self, path: P) -> Result>>; + async fn read_string>(&self, path: P) -> Result>; + async fn write>(&self, path: P, data: Vec) -> Result; + async fn write_string>(&self, path: P, data: &str) -> Result; + async fn mkdir>(&self, path: P) -> Result; + async fn rm>(&self, path: P) -> Result; + async fn set_perms>(&self, path: P, perms: &[XsPermission]) -> Result; + + async fn mknod>(&self, path: P, perms: &[XsPermission]) -> Result { + let result1 = self.write_string(path.as_ref(), "").await?; + let result2 = self.set_perms(path.as_ref(), perms).await?; + Ok(result1 && result2) + } +} + +impl XsdClient { + pub async fn open() -> Result { + let socket = XsdSocket::open().await?; + Ok(XsdClient { socket }) + } + + async fn list>(&self, tx: u32, path: P) -> Result> { + trace!("list tx={tx} path={}", path.as_ref()); + let response = match self.socket.send(tx, XSD_DIRECTORY, &[path.as_ref()]).await { + Ok(response) => response, + Err(error) => { + if error.is_noent_response() { + return Ok(vec![]); + } + return Err(error); + } + }; + response.parse_string_vec() + } + + async fn read>(&self, tx: u32, path: P) -> Result>> { + trace!("read tx={tx} path={}", path.as_ref()); + match self.socket.send(tx, XSD_READ, &[path.as_ref()]).await { + Ok(response) => Ok(Some(response.payload)), + Err(error) => { + if error.is_noent_response() { + Ok(None) + } else { + Err(error) + } + } + } + } + + async fn write>(&self, tx: u32, path: P, data: Vec) -> Result { + trace!("write tx={tx} path={} data={:?}", path.as_ref(), data); + let mut buffer = Vec::new(); + let path = CString::new(path.as_ref())?; + buffer.extend_from_slice(path.as_bytes_with_nul()); + buffer.extend_from_slice(data.as_slice()); + let response = self + .socket + .send_buf(tx, XSD_WRITE, buffer.as_slice()) + .await?; + response.parse_bool() + } + + async fn mkdir>(&self, tx: u32, path: P) -> Result { + trace!("mkdir tx={tx} path={}", path.as_ref()); + self.socket + .send(tx, XSD_MKDIR, &[path.as_ref()]) + .await? + .parse_bool() + } + + async fn rm>(&self, tx: u32, path: P) -> Result { + trace!("rm tx={tx} path={}", path.as_ref()); + let result = self.socket.send(tx, XSD_RM, &[path.as_ref()]).await; + if let Err(error) = result { + if error.is_noent_response() { + return Ok(true); + } + return Err(error); + } + result.unwrap().parse_bool() + } + + async fn set_perms>( + &self, + tx: u32, + path: P, + perms: &[XsPermission], + ) -> Result { + trace!("set_perms tx={tx} path={} perms={:?}", path.as_ref(), perms); + let mut items: Vec = Vec::new(); + items.push(path.as_ref().to_string()); + for perm in perms { + items.push(perm.encode()?); + } + let items_str: Vec<&str> = items.iter().map(|x| x.as_str()).collect(); + let response = self.socket.send(tx, XSD_SET_PERMS, &items_str).await?; + response.parse_bool() + } + + pub async fn transaction(&self) -> Result { + trace!("transaction start"); + let response = self.socket.send(0, XSD_TRANSACTION_START, &[""]).await?; + let str = response.parse_string()?; + let tx = str.parse::()?; + Ok(XsdTransaction { + client: self.clone(), + tx, + }) + } + + pub async fn get_domain_path(&mut self, domid: u32) -> Result { + let response = self + .socket + .send(0, XSD_GET_DOMAIN_PATH, &[&domid.to_string()]) + .await?; + response.parse_string() + } + + pub async fn introduce_domain(&mut self, domid: u32, mfn: u64, evtchn: u32) -> Result { + trace!("introduce domain domid={domid} mfn={mfn} evtchn={evtchn}"); + let response = self + .socket + .send( + 0, + XSD_INTRODUCE, + &[ + domid.to_string().as_str(), + mfn.to_string().as_str(), + evtchn.to_string().as_str(), + ], + ) + .await?; + response.parse_bool() + } + + pub async fn watch>(&self, path: P) -> Result { + let (id, receiver, unwatch_sender) = self.socket.add_watch().await?; + let id_string = id.to_string(); + let _ = self + .socket + .send(0, XSD_WATCH, &[path.as_ref(), &id_string]) + .await?; + Ok(XsdWatchHandle { + id, + receiver, + unwatch_sender, + }) + } +} + +#[derive(Clone)] +pub struct XsdTransaction { + client: XsdClient, + tx: u32, +} + +impl XsdInterface for XsdClient { + async fn list>(&self, path: P) -> Result> { + self.list(0, path).await + } + + async fn read>(&self, path: P) -> Result>> { + self.read(0, path).await + } + + async fn read_string>(&self, path: P) -> Result> { + match self.read(0, path).await { + Ok(value) => match value { + Some(value) => Ok(Some(String::from_utf8(value)?)), + None => Ok(None), + }, + Err(error) => Err(error), + } + } + + async fn write>(&self, path: P, data: Vec) -> Result { + self.write(0, path, data).await + } + + async fn write_string>(&self, path: P, data: &str) -> Result { + self.write(0, path, data.as_bytes().to_vec()).await + } + + async fn mkdir>(&self, path: P) -> Result { + self.mkdir(0, path).await + } + + async fn rm>(&self, path: P) -> Result { + self.rm(0, path).await + } + + async fn set_perms>(&self, path: P, perms: &[XsPermission]) -> Result { + self.set_perms(0, path, perms).await + } +} + +impl XsdInterface for XsdTransaction { + async fn list>(&self, path: P) -> Result> { + self.client.list(self.tx, path).await + } + + async fn read>(&self, path: P) -> Result>> { + self.client.read(self.tx, path).await + } + + async fn read_string>(&self, path: P) -> Result> { + match self.client.read(self.tx, path).await { + Ok(value) => match value { + Some(value) => Ok(Some(String::from_utf8(value)?)), + None => Ok(None), + }, + Err(error) => Err(error), + } + } + + async fn write>(&self, path: P, data: Vec) -> Result { + self.client.write(self.tx, path, data).await + } + + async fn write_string>(&self, path: P, data: &str) -> Result { + self.client + .write(self.tx, path, data.as_bytes().to_vec()) + .await + } + + async fn mkdir>(&self, path: P) -> Result { + self.client.mkdir(self.tx, path).await + } + + async fn rm>(&self, path: P) -> Result { + self.client.rm(self.tx, path).await + } + + async fn set_perms>(&self, path: P, perms: &[XsPermission]) -> Result { + self.client.set_perms(self.tx, path, perms).await + } +} + +impl XsdTransaction { + pub async fn end(&self, abort: bool) -> Result { + let abort_str = if abort { "F" } else { "T" }; + + trace!("transaction end abort={}", abort); + self.client + .socket + .send(self.tx, XSD_TRANSACTION_END, &[abort_str]) + .await? + .parse_bool() + } + + pub async fn commit(&self) -> Result { + self.end(false).await + } + + pub async fn abort(&self) -> Result { + self.end(true).await + } +}