feat(xenstore): multi-watch and maybe-commit support (#429)

This commit is contained in:
Alex Zenla 2024-12-14 17:57:15 -05:00 committed by GitHub
parent a04a812f28
commit d7affe6c8c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 62 additions and 7 deletions

2
.gitignore vendored
View File

@ -1 +1,3 @@
/target /target
/.idea
/.vscode

View File

@ -63,7 +63,7 @@ impl BootDomain {
} }
let local_page_size: u32 = (1i64 << XEN_PAGE_SHIFT) as u32; let local_page_size: u32 = (1i64 << XEN_PAGE_SHIFT) as u32;
let pages = (size + local_page_size as u64 - 1) / local_page_size as u64; let pages = size.div_ceil(local_page_size as u64);
let start = self.virt_alloc_end; let start = self.virt_alloc_end;
let mut segment = DomainSegment { let mut segment = DomainSegment {

View File

@ -10,11 +10,12 @@ async fn list_recursive(client: &XsdClient, path: &str) -> Result<()> {
let children = client.list(path).await?; let children = client.list(path).await?;
for child in children { for child in children {
let full = format!("{}/{}", if path == "/" { "" } else { path }, child); let full = format!("{}/{}", if path == "/" { "" } else { path }, child);
let value = client let value = client.read(full.as_str()).await?.expect("expected value");
.read_string(full.as_str()) let stringified = match String::from_utf8(value) {
.await? Ok(string) => format!("\"{}\"", string),
.expect("expected value"); Err(error) => format!("{:?}", error.into_bytes()),
println!("{} = {:?}", full, value,); };
println!("{} = {}", full, stringified);
pending.push(full); pending.push(full);
} }
} }

View File

@ -48,6 +48,13 @@ impl Error {
_ => false, _ => false,
} }
} }
pub fn is_again_response(&self) -> bool {
match self {
Error::ResponseError(message) => message == "EAGAIN",
_ => false,
}
}
} }
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;

View File

@ -55,6 +55,27 @@ impl Drop for XsdWatchHandle {
} }
} }
pub struct XsdMultiWatchHandle {
pub paths: Vec<String>,
pub id: u32,
unwatch_sender: Sender<(u32, String)>,
pub receiver: Receiver<String>,
}
impl XsdMultiWatchHandle {
pub fn add_path(&mut self, path: impl AsRef<str>) {
self.paths.push(path.as_ref().to_string());
}
}
impl Drop for XsdMultiWatchHandle {
fn drop(&mut self) {
for path in &self.paths {
let _ = self.unwatch_sender.try_send((self.id, path.clone()));
}
}
}
#[allow(async_fn_in_trait)] #[allow(async_fn_in_trait)]
pub trait XsdInterface { pub trait XsdInterface {
async fn list<P: AsRef<str>>(&self, path: P) -> Result<Vec<String>>; async fn list<P: AsRef<str>>(&self, path: P) -> Result<Vec<String>>;
@ -141,7 +162,7 @@ impl XsdClient {
} }
return Err(error); return Err(error);
} }
result.unwrap().parse_bool() result?.parse_bool()
} }
async fn set_perms<P: AsRef<str>>( async fn set_perms<P: AsRef<str>>(
@ -197,6 +218,16 @@ impl XsdClient {
response.parse_bool() response.parse_bool()
} }
pub async fn create_multi_watch(&self) -> Result<XsdMultiWatchHandle> {
let (id, receiver, unwatch_sender) = self.socket.add_watch().await?;
Ok(XsdMultiWatchHandle {
paths: vec![],
id,
receiver,
unwatch_sender,
})
}
pub async fn create_watch<P: AsRef<str>>(&self, path: P) -> Result<XsdWatchHandle> { pub async fn create_watch<P: AsRef<str>>(&self, path: P) -> Result<XsdWatchHandle> {
let (id, receiver, unwatch_sender) = self.socket.add_watch().await?; let (id, receiver, unwatch_sender) = self.socket.add_watch().await?;
Ok(XsdWatchHandle { Ok(XsdWatchHandle {
@ -319,6 +350,20 @@ impl XsdTransaction {
.parse_bool() .parse_bool()
} }
pub async fn maybe_commit(&self) -> Result<bool> {
match self.end(false).await {
Ok(result) => Ok(result),
Err(error) => {
if error.is_again_response() {
Ok(false)
} else {
Err(error)
}
}
}
}
pub async fn commit(&self) -> Result<bool> { pub async fn commit(&self) -> Result<bool> {
self.end(false).await self.end(false).await
} }