async-ify xenstore and xenclient

This commit is contained in:
Alex Zenla
2024-02-23 04:37:53 +00:00
parent cf0b62c9f5
commit 79acf4e814
15 changed files with 395 additions and 294 deletions

View File

@ -1,13 +1,15 @@
use std::{thread::sleep, time::Duration};
use std::time::Duration;
use anyhow::Result;
use kratanet::autonet::AutoNetworkCollector;
use tokio::time::sleep;
fn main() -> Result<()> {
let mut collector = AutoNetworkCollector::new()?;
#[tokio::main]
async fn main() -> Result<()> {
let mut collector = AutoNetworkCollector::new().await?;
loop {
let changeset = collector.read_changes()?;
let changeset = collector.read_changes().await?;
println!("{:?}", changeset);
sleep(Duration::from_secs(2));
sleep(Duration::from_secs(2)).await;
}
}

View File

@ -37,23 +37,23 @@ pub struct AutoNetworkChangeset {
}
impl AutoNetworkCollector {
pub fn new() -> Result<AutoNetworkCollector> {
pub async fn new() -> Result<AutoNetworkCollector> {
Ok(AutoNetworkCollector {
client: XsdClient::open()?,
client: XsdClient::open().await?,
known: HashMap::new(),
})
}
pub fn read(&mut self) -> Result<Vec<NetworkMetadata>> {
pub async fn read(&mut self) -> Result<Vec<NetworkMetadata>> {
let mut networks = Vec::new();
let mut tx = self.client.transaction()?;
for domid_string in tx.list_any("/local/domain")? {
let mut tx = self.client.transaction().await?;
for domid_string in tx.list("/local/domain").await? {
let Ok(domid) = domid_string.parse::<u32>() else {
continue;
};
let dom_path = format!("/local/domain/{}", domid_string);
let Some(uuid_string) = tx.read_string_optional(&format!("{}/krata/uuid", dom_path))?
let Some(uuid_string) = tx.read_string(&format!("{}/krata/uuid", dom_path)).await?
else {
continue;
};
@ -63,13 +63,13 @@ impl AutoNetworkCollector {
};
let Ok(guest) =
AutoNetworkCollector::read_network_side(uuid, &mut tx, &dom_path, "guest")
AutoNetworkCollector::read_network_side(uuid, &mut tx, &dom_path, "guest").await
else {
continue;
};
let Ok(gateway) =
AutoNetworkCollector::read_network_side(uuid, &mut tx, &dom_path, "gateway")
AutoNetworkCollector::read_network_side(uuid, &mut tx, &dom_path, "gateway").await
else {
continue;
};
@ -81,18 +81,18 @@ impl AutoNetworkCollector {
gateway,
});
}
tx.commit()?;
tx.commit().await?;
Ok(networks)
}
fn read_network_side(
async fn read_network_side(
uuid: Uuid,
tx: &mut XsdTransaction<'_>,
dom_path: &str,
side: &str,
) -> Result<NetworkSide> {
let side_path = format!("{}/krata/network/{}", dom_path, side);
let Some(ipv4) = tx.read_string_optional(&format!("{}/ipv4", side_path))? else {
let Some(ipv4) = tx.read_string(&format!("{}/ipv4", side_path)).await? else {
return Err(anyhow!(
"krata domain {} is missing {} ipv4 network entry",
uuid,
@ -100,7 +100,7 @@ impl AutoNetworkCollector {
));
};
let Some(ipv6) = tx.read_string_optional(&format!("{}/ipv6", side_path))? else {
let Some(ipv6) = tx.read_string(&format!("{}/ipv6", side_path)).await? else {
return Err(anyhow!(
"krata domain {} is missing {} ipv6 network entry",
uuid,
@ -108,7 +108,7 @@ impl AutoNetworkCollector {
));
};
let Some(mac) = tx.read_string_optional(&format!("{}/mac", side_path))? else {
let Some(mac) = tx.read_string(&format!("{}/mac", side_path)).await? else {
return Err(anyhow!(
"krata domain {} is missing {} mac address entry",
uuid,
@ -146,12 +146,12 @@ impl AutoNetworkCollector {
Ok(NetworkSide { ipv4, ipv6, mac })
}
pub fn read_changes(&mut self) -> Result<AutoNetworkChangeset> {
pub async fn read_changes(&mut self) -> Result<AutoNetworkChangeset> {
let mut seen: Vec<Uuid> = Vec::new();
let mut added: Vec<NetworkMetadata> = Vec::new();
let mut removed: Vec<NetworkMetadata> = Vec::new();
for network in self.read()? {
for network in self.read().await? {
seen.push(network.uuid);
if self.known.contains_key(&network.uuid) {
continue;

View File

@ -36,9 +36,9 @@ impl NetworkService {
impl NetworkService {
pub async fn watch(&mut self) -> Result<()> {
let mut collector = AutoNetworkCollector::new()?;
let mut collector = AutoNetworkCollector::new().await?;
loop {
let changeset = collector.read_changes()?;
let changeset = collector.read_changes().await?;
self.process_network_changeset(&mut collector, changeset)?;
sleep(Duration::from_secs(2)).await;
}