diff --git a/controller/bin/control.rs b/controller/bin/control.rs index 2224ed8..475c450 100644 --- a/controller/bin/control.rs +++ b/controller/bin/control.rs @@ -69,7 +69,7 @@ async fn main() -> Result<()> { .map(|x| x.to_string()) .ok_or_else(|| anyhow!("unable to convert store path to string"))?; - let mut context = ControllerContext::new(store_path.clone())?; + let mut context = ControllerContext::new(store_path.clone()).await?; match args.command { Commands::Launch { @@ -96,7 +96,7 @@ async fn main() -> Result<()> { run: if run.is_empty() { None } else { Some(run) }, debug, }; - let (uuid, _domid) = launch.perform(request)?; + let (uuid, _domid) = launch.perform(request).await?; println!("launched container: {}", uuid); if attach { let mut console = ControllerConsole::new(&mut context); @@ -106,7 +106,7 @@ async fn main() -> Result<()> { Commands::Destroy { container } => { let mut destroy = ControllerDestroy::new(&mut context); - destroy.perform(&container)?; + destroy.perform(&container).await?; } Commands::Console { container } => { @@ -115,7 +115,7 @@ async fn main() -> Result<()> { } Commands::List { .. } => { - let containers = context.list()?; + let containers = context.list().await?; let mut table = cli_tables::Table::new(); let header = vec!["uuid", "ipv4", "ipv6", "image"]; table.push_row(&header)?; diff --git a/controller/src/ctl/console.rs b/controller/src/ctl/console.rs index f24626d..3a84d62 100644 --- a/controller/src/ctl/console.rs +++ b/controller/src/ctl/console.rs @@ -16,10 +16,11 @@ impl ControllerConsole<'_> { pub async fn perform(&mut self, id: &str) -> Result<()> { let info = self .context - .resolve(id)? + .resolve(id) + .await? .ok_or_else(|| anyhow!("unable to resolve container: {}", id))?; let domid = info.domid; - let tty = self.context.xen.get_console_path(domid)?; + let tty = self.context.xen.get_console_path(domid).await?; let console = XenConsole::new(&tty).await?; console.attach().await?; Ok(()) diff --git a/controller/src/ctl/destroy.rs b/controller/src/ctl/destroy.rs index eb0ecac..acacb86 100644 --- a/controller/src/ctl/destroy.rs +++ b/controller/src/ctl/destroy.rs @@ -15,15 +15,19 @@ impl ControllerDestroy<'_> { ControllerDestroy { context } } - pub fn perform(&mut self, id: &str) -> Result { + pub async fn perform(&mut self, id: &str) -> Result { let info = self .context - .resolve(id)? + .resolve(id) + .await? .ok_or_else(|| anyhow!("unable to resolve container: {}", id))?; let domid = info.domid; - let mut store = XsdClient::open()?; - let dom_path = store.get_domain_path(domid)?; - let uuid = match store.read_string_optional(format!("{}/krata/uuid", dom_path).as_str())? { + let mut store = XsdClient::open().await?; + let dom_path = store.get_domain_path(domid).await?; + let uuid = match store + .read_string(format!("{}/krata/uuid", dom_path).as_str()) + .await? + { None => { return Err(anyhow!( "domain {} was not found or not created by krata", @@ -36,9 +40,11 @@ impl ControllerDestroy<'_> { return Err(anyhow!("unable to find krata uuid based on the domain",)); } let uuid = Uuid::parse_str(&uuid)?; - let loops = store.read_string(format!("{}/krata/loops", dom_path).as_str())?; + let loops = store + .read_string(format!("{}/krata/loops", dom_path).as_str()) + .await?; let loops = ControllerContext::parse_loop_set(&loops); - self.context.xen.destroy(domid)?; + self.context.xen.destroy(domid).await?; for info in &loops { self.context.autoloop.unloop(&info.device)?; match &info.delete { diff --git a/controller/src/ctl/launch.rs b/controller/src/ctl/launch.rs index 0580810..d200a9f 100644 --- a/controller/src/ctl/launch.rs +++ b/controller/src/ctl/launch.rs @@ -35,7 +35,7 @@ impl ControllerLaunch<'_> { ControllerLaunch { context } } - pub fn perform(&mut self, request: ControllerLaunchRequest) -> Result<(Uuid, u32)> { + pub async fn perform(&mut self, request: ControllerLaunchRequest<'_>) -> Result<(Uuid, u32)> { let uuid = Uuid::new_v4(); let name = format!("krata-{uuid}"); let image_info = self.compile(request.image)?; @@ -47,7 +47,7 @@ impl ControllerLaunch<'_> { container_mac.set_local(true); container_mac.set_multicast(false); - let guest_ipv4 = self.allocate_ipv4()?; + let guest_ipv4 = self.allocate_ipv4().await?; let guest_ipv6 = container_mac.to_link_local_ipv6(); let gateway_ipv4 = "192.168.42.1"; let gateway_ipv6 = "fe80::1"; @@ -178,7 +178,7 @@ impl ControllerLaunch<'_> { ), ], }; - match self.context.xen.create(&config) { + match self.context.xen.create(&config).await { Ok(domid) => Ok((uuid, domid)), Err(error) => { let _ = self.context.autoloop.unloop(&image_squashfs_loop.path); @@ -189,17 +189,17 @@ impl ControllerLaunch<'_> { } } - fn allocate_ipv4(&mut self) -> Result { + async fn allocate_ipv4(&mut self) -> Result { let network = Ipv4Network::new(Ipv4Addr::new(192, 168, 42, 0), 24)?; let mut used: Vec = vec![ Ipv4Addr::new(192, 168, 42, 0), Ipv4Addr::new(192, 168, 42, 1), Ipv4Addr::new(192, 168, 42, 255), ]; - for domid_candidate in self.context.xen.store.list_any("/local/domain")? { + for domid_candidate in self.context.xen.store.list("/local/domain").await? { let dom_path = format!("/local/domain/{}", domid_candidate); let ip_path = format!("{}/krata/network/guest/ipv4", dom_path); - let existing_ip = self.context.xen.store.read_string_optional(&ip_path)?; + let existing_ip = self.context.xen.store.read_string(&ip_path).await?; if let Some(existing_ip) = existing_ip { let ipv4_network = Ipv4Network::from_str(&existing_ip)?; used.push(ipv4_network.ip()); diff --git a/controller/src/ctl/mod.rs b/controller/src/ctl/mod.rs index 037c7f2..ddd12c8 100644 --- a/controller/src/ctl/mod.rs +++ b/controller/src/ctl/mod.rs @@ -37,12 +37,12 @@ pub struct ContainerInfo { } impl ControllerContext { - pub fn new(store_path: String) -> Result { + pub async fn new(store_path: String) -> Result { let mut image_cache_path = PathBuf::from(store_path); image_cache_path.push("cache"); fs::create_dir_all(&image_cache_path)?; - let xen = XenClient::open()?; + let xen = XenClient::open().await?; image_cache_path.push("image"); fs::create_dir_all(&image_cache_path)?; let image_cache = ImageCache::new(&image_cache_path)?; @@ -53,14 +53,15 @@ impl ControllerContext { }) } - pub fn list(&mut self) -> Result> { + pub async fn list(&mut self) -> Result> { let mut containers: Vec = Vec::new(); - for domid_candidate in self.xen.store.list_any("/local/domain")? { + for domid_candidate in self.xen.store.list("/local/domain").await? { let dom_path = format!("/local/domain/{}", domid_candidate); let uuid_string = match self .xen .store - .read_string_optional(&format!("{}/krata/uuid", &dom_path))? + .read_string(&format!("{}/krata/uuid", &dom_path)) + .await? { None => continue, Some(value) => value, @@ -71,22 +72,25 @@ impl ControllerContext { let image = self .xen .store - .read_string_optional(&format!("{}/krata/image", &dom_path))? + .read_string(&format!("{}/krata/image", &dom_path)) + .await? .unwrap_or("unknown".to_string()); let loops = self .xen .store - .read_string_optional(&format!("{}/krata/loops", &dom_path))? - .unwrap_or("".to_string()); + .read_string(&format!("{}/krata/loops", &dom_path)) + .await?; let ipv4 = self .xen .store - .read_string_optional(&format!("{}/krata/network/guest/ipv4", &dom_path))? + .read_string(&format!("{}/krata/network/guest/ipv4", &dom_path)) + .await? .unwrap_or("unknown".to_string()); let ipv6: String = self .xen .store - .read_string_optional(&format!("{}/krata/network/guest/ipv6", &dom_path))? + .read_string(&format!("{}/krata/network/guest/ipv6", &dom_path)) + .await? .unwrap_or("unknown".to_string()); let loops = ControllerContext::parse_loop_set(&loops); containers.push(ContainerInfo { @@ -101,8 +105,8 @@ impl ControllerContext { Ok(containers) } - pub fn resolve(&mut self, id: &str) -> Result> { - for container in self.list()? { + pub async fn resolve(&mut self, id: &str) -> Result> { + for container in self.list().await? { let uuid_string = container.uuid.to_string(); let domid_string = container.domid.to_string(); if uuid_string == id || domid_string == id || id == format!("krata-{}", uuid_string) { @@ -112,7 +116,10 @@ impl ControllerContext { Ok(None) } - fn parse_loop_set(input: &str) -> Vec { + fn parse_loop_set(input: &Option) -> Vec { + let Some(input) = input else { + return Vec::new(); + }; let sets = input .split(',') .map(|x| x.to_string()) diff --git a/libs/xen/xenclient/Cargo.toml b/libs/xen/xenclient/Cargo.toml index d1454c9..3dfd74e 100644 --- a/libs/xen/xenclient/Cargo.toml +++ b/libs/xen/xenclient/Cargo.toml @@ -14,6 +14,7 @@ memchr = { workspace = true } slice-copy = { workspace = true } log = { workspace = true } uuid = { workspace = true } +tokio = { workspace = true } [dependencies.xencall] path = "../xencall" diff --git a/libs/xen/xenclient/examples/boot.rs b/libs/xen/xenclient/examples/boot.rs index fb0da39..e21d75d 100644 --- a/libs/xen/xenclient/examples/boot.rs +++ b/libs/xen/xenclient/examples/boot.rs @@ -2,7 +2,8 @@ use std::{env, process}; use xenclient::error::Result; use xenclient::{DomainConfig, XenClient}; -fn main() -> Result<()> { +#[tokio::main] +async fn main() -> Result<()> { env_logger::init(); let args: Vec = env::args().collect(); @@ -12,7 +13,7 @@ fn main() -> Result<()> { } let kernel_image_path = args.get(1).expect("argument not specified"); let initrd_path = args.get(2).expect("argument not specified"); - let mut client = XenClient::open()?; + let mut client = XenClient::open().await?; let config = DomainConfig { backend_domid: 0, name: "xenclient-test", @@ -28,7 +29,7 @@ fn main() -> Result<()> { extra_keys: vec![], event_channels: vec![], }; - let domid = client.create(&config)?; + let domid = client.create(&config).await?; println!("created domain {}", domid); Ok(()) } diff --git a/libs/xen/xenclient/src/lib.rs b/libs/xen/xenclient/src/lib.rs index a1e56f7..3860699 100644 --- a/libs/xen/xenclient/src/lib.rs +++ b/libs/xen/xenclient/src/lib.rs @@ -82,38 +82,43 @@ pub struct DomainConfig<'a> { } impl XenClient { - pub fn open() -> Result { - let store = XsdClient::open()?; + pub async fn open() -> Result { + let store = XsdClient::open().await?; let call = XenCall::open()?; Ok(XenClient { store, call }) } - pub fn create(&mut self, config: &DomainConfig) -> Result { + pub async fn create(&mut self, config: &DomainConfig<'_>) -> Result { let domain = CreateDomain { max_vcpus: config.max_vcpus, ..Default::default() }; let domid = self.call.create_domain(domain)?; - match self.init(domid, &domain, config) { + match self.init(domid, &domain, config).await { Ok(_) => Ok(domid), Err(err) => { // ignore since destroying a domain is best // effort when an error occurs - let _ = self.destroy(domid); + let _ = self.destroy(domid).await; Err(err) } } } - fn init(&mut self, domid: u32, domain: &CreateDomain, config: &DomainConfig) -> Result<()> { + async fn init( + &mut self, + domid: u32, + domain: &CreateDomain, + config: &DomainConfig<'_>, + ) -> Result<()> { trace!( "XenClient init domid={} domain={:?} config={:?}", domid, domain, config ); - let backend_dom_path = self.store.get_domain_path(0)?; - let dom_path = self.store.get_domain_path(domid)?; + let backend_dom_path = self.store.get_domain_path(0).await?; + let dom_path = self.store.get_domain_path(domid).await?; let uuid_string = Uuid::from_bytes(domain.handle).to_string(); let vm_path = format!("/vm/{}", uuid_string); @@ -139,57 +144,76 @@ impl XenClient { }]; { - let mut tx = self.store.transaction()?; + let mut tx = self.store.transaction().await?; - tx.rm(dom_path.as_str())?; - tx.mknod(dom_path.as_str(), ro_perm)?; + tx.rm(dom_path.as_str()).await?; + tx.mknod(dom_path.as_str(), ro_perm).await?; - tx.rm(vm_path.as_str())?; - tx.mknod(vm_path.as_str(), ro_perm)?; + tx.rm(vm_path.as_str()).await?; + tx.mknod(vm_path.as_str(), ro_perm).await?; - tx.mknod(vm_path.as_str(), no_perm)?; - tx.mknod(format!("{}/device", vm_path).as_str(), no_perm)?; + tx.mknod(vm_path.as_str(), no_perm).await?; + tx.mknod(format!("{}/device", vm_path).as_str(), no_perm) + .await?; - tx.write_string(format!("{}/vm", dom_path).as_str(), &vm_path)?; + tx.write_string(format!("{}/vm", dom_path).as_str(), &vm_path) + .await?; - tx.mknod(format!("{}/cpu", dom_path).as_str(), ro_perm)?; - tx.mknod(format!("{}/memory", dom_path).as_str(), ro_perm)?; + tx.mknod(format!("{}/cpu", dom_path).as_str(), ro_perm) + .await?; + tx.mknod(format!("{}/memory", dom_path).as_str(), ro_perm) + .await?; - tx.mknod(format!("{}/control", dom_path).as_str(), ro_perm)?; + tx.mknod(format!("{}/control", dom_path).as_str(), ro_perm) + .await?; - tx.mknod(format!("{}/control/shutdown", dom_path).as_str(), rw_perm)?; + tx.mknod(format!("{}/control/shutdown", dom_path).as_str(), rw_perm) + .await?; tx.mknod( format!("{}/control/feature-poweroff", dom_path).as_str(), rw_perm, - )?; + ) + .await?; tx.mknod( format!("{}/control/feature-reboot", dom_path).as_str(), rw_perm, - )?; + ) + .await?; tx.mknod( format!("{}/control/feature-suspend", dom_path).as_str(), rw_perm, - )?; - tx.mknod(format!("{}/control/sysrq", dom_path).as_str(), rw_perm)?; + ) + .await?; + tx.mknod(format!("{}/control/sysrq", dom_path).as_str(), rw_perm) + .await?; - tx.mknod(format!("{}/data", dom_path).as_str(), rw_perm)?; - tx.mknod(format!("{}/drivers", dom_path).as_str(), rw_perm)?; - tx.mknod(format!("{}/feature", dom_path).as_str(), rw_perm)?; - tx.mknod(format!("{}/attr", dom_path).as_str(), rw_perm)?; - tx.mknod(format!("{}/error", dom_path).as_str(), rw_perm)?; + tx.mknod(format!("{}/data", dom_path).as_str(), rw_perm) + .await?; + tx.mknod(format!("{}/drivers", dom_path).as_str(), rw_perm) + .await?; + tx.mknod(format!("{}/feature", dom_path).as_str(), rw_perm) + .await?; + tx.mknod(format!("{}/attr", dom_path).as_str(), rw_perm) + .await?; + tx.mknod(format!("{}/error", dom_path).as_str(), rw_perm) + .await?; tx.write_string( format!("{}/uuid", vm_path).as_str(), &Uuid::from_bytes(domain.handle).to_string(), - )?; - tx.write_string(format!("{}/name", dom_path).as_str(), config.name)?; - tx.write_string(format!("{}/name", vm_path).as_str(), config.name)?; + ) + .await?; + tx.write_string(format!("{}/name", dom_path).as_str(), config.name) + .await?; + tx.write_string(format!("{}/name", vm_path).as_str(), config.name) + .await?; for (key, value) in &config.extra_keys { - tx.write_string(format!("{}/{}", dom_path, key).as_str(), value)?; + tx.write_string(format!("{}/{}", dom_path, key).as_str(), value) + .await?; } - tx.commit()?; + tx.commit().await?; } self.call.set_max_vcpus(domid, config.max_vcpus)?; @@ -220,52 +244,63 @@ impl XenClient { } { - let mut tx = self.store.transaction()?; - tx.write_string(format!("{}/image/os_type", vm_path).as_str(), "linux")?; + let mut tx = self.store.transaction().await?; + tx.write_string(format!("{}/image/os_type", vm_path).as_str(), "linux") + .await?; tx.write_string( format!("{}/image/kernel", vm_path).as_str(), config.kernel_path, - )?; + ) + .await?; tx.write_string( format!("{}/image/ramdisk", vm_path).as_str(), config.initrd_path, - )?; + ) + .await?; tx.write_string( format!("{}/image/cmdline", vm_path).as_str(), config.cmdline, - )?; + ) + .await?; tx.write_string( format!("{}/memory/static-max", dom_path).as_str(), &(config.mem_mb * 1024).to_string(), - )?; + ) + .await?; tx.write_string( format!("{}/memory/target", dom_path).as_str(), &(config.mem_mb * 1024).to_string(), - )?; - tx.write_string(format!("{}/memory/videoram", dom_path).as_str(), "0")?; - tx.write_string(format!("{}/domid", dom_path).as_str(), &domid.to_string())?; + ) + .await?; + tx.write_string(format!("{}/memory/videoram", dom_path).as_str(), "0") + .await?; + tx.write_string(format!("{}/domid", dom_path).as_str(), &domid.to_string()) + .await?; tx.write_string( format!("{}/store/port", dom_path).as_str(), &xenstore_evtchn.to_string(), - )?; + ) + .await?; tx.write_string( format!("{}/store/ring-ref", dom_path).as_str(), &xenstore_mfn.to_string(), - )?; + ) + .await?; for i in 0..config.max_vcpus { let path = format!("{}/cpu/{}", dom_path, i); - tx.mkdir(&path)?; - tx.set_perms(&path, ro_perm)?; + tx.mkdir(&path).await?; + tx.set_perms(&path, ro_perm).await?; let path = format!("{}/cpu/{}/availability", dom_path, i); - tx.write_string(&path, "online")?; - tx.set_perms(&path, ro_perm)?; + tx.write_string(&path, "online").await?; + tx.set_perms(&path, ro_perm).await?; } - tx.commit()?; + tx.commit().await?; } if !self .store - .introduce_domain(domid, xenstore_mfn, xenstore_evtchn)? + .introduce_domain(domid, xenstore_mfn, xenstore_evtchn) + .await? { return Err(Error::IntroduceDomainFailed); } @@ -277,7 +312,8 @@ impl XenClient { 0, Some(console_evtchn), Some(console_mfn), - )?; + ) + .await?; for (index, _) in config.consoles.iter().enumerate() { self.console_device_add( @@ -288,7 +324,8 @@ impl XenClient { index + 1, None, None, - )?; + ) + .await?; } for (index, disk) in config.disks.iter().enumerate() { @@ -299,7 +336,8 @@ impl XenClient { domid, index, disk, - )?; + ) + .await?; } for (index, filesystem) in config.filesystems.iter().enumerate() { @@ -310,7 +348,8 @@ impl XenClient { domid, index, filesystem, - )?; + ) + .await?; } for (index, vif) in config.vifs.iter().enumerate() { @@ -321,7 +360,8 @@ impl XenClient { domid, index, vif, - )?; + ) + .await?; } for channel in &config.event_channels { @@ -330,23 +370,25 @@ impl XenClient { .evtchn_alloc_unbound(domid, config.backend_domid)?; let channel_path = format!("{}/evtchn/{}", dom_path, channel.name); self.store - .write_string(&format!("{}/name", channel_path), channel.name)?; + .write_string(&format!("{}/name", channel_path), channel.name) + .await?; self.store - .write_string(&format!("{}/channel", channel_path), &id.to_string())?; + .write_string(&format!("{}/channel", channel_path), &id.to_string()) + .await?; } self.call.unpause_domain(domid)?; Ok(()) } - fn disk_device_add( + async fn disk_device_add( &mut self, dom_path: &str, backend_dom_path: &str, backend_domid: u32, domid: u32, index: usize, - disk: &DomainDisk, + disk: &DomainDisk<'_>, ) -> Result<()> { let id = (202 << 8) | (index << 4) as u64; let backend_items: Vec<(&str, String)> = vec![ @@ -386,12 +428,13 @@ impl XenClient { domid, frontend_items, backend_items, - )?; + ) + .await?; Ok(()) } #[allow(clippy::too_many_arguments, clippy::unnecessary_unwrap)] - fn console_device_add( + async fn console_device_add( &mut self, dom_path: &str, backend_dom_path: &str, @@ -444,18 +487,19 @@ impl XenClient { domid, frontend_entries, backend_entries, - )?; + ) + .await?; Ok(()) } - fn fs_9p_device_add( + async fn fs_9p_device_add( &mut self, dom_path: &str, backend_dom_path: &str, backend_domid: u32, domid: u32, index: usize, - filesystem: &DomainFilesystem, + filesystem: &DomainFilesystem<'_>, ) -> Result<()> { let id = 90 + index as u64; let backend_items: Vec<(&str, String)> = vec![ @@ -481,18 +525,19 @@ impl XenClient { domid, frontend_items, backend_items, - )?; + ) + .await?; Ok(()) } - fn vif_device_add( + async fn vif_device_add( &mut self, dom_path: &str, backend_dom_path: &str, backend_domid: u32, domid: u32, index: usize, - vif: &DomainNetworkInterface, + vif: &DomainNetworkInterface<'_>, ) -> Result<()> { let id = 20 + index as u64; let mut backend_items: Vec<(&str, String)> = vec![ @@ -538,12 +583,13 @@ impl XenClient { domid, frontend_items, backend_items, - )?; + ) + .await?; Ok(()) } #[allow(clippy::too_many_arguments)] - fn device_add( + async fn device_add( &mut self, typ: &str, id: u64, @@ -589,36 +635,36 @@ impl XenClient { }, ]; - let mut tx = self.store.transaction()?; - tx.mknod(&frontend_path, frontend_perms)?; + let mut tx = self.store.transaction().await?; + tx.mknod(&frontend_path, frontend_perms).await?; for (p, value) in &frontend_items { let path = format!("{}/{}", frontend_path, *p); - tx.write_string(&path, value)?; + tx.write_string(&path, value).await?; if !console_zero { - tx.set_perms(&path, frontend_perms)?; + tx.set_perms(&path, frontend_perms).await?; } } - tx.mknod(&backend_path, backend_perms)?; + tx.mknod(&backend_path, backend_perms).await?; for (p, value) in &backend_items { let path = format!("{}/{}", backend_path, *p); - tx.write_string(&path, value)?; + tx.write_string(&path, value).await?; } - tx.commit()?; + tx.commit().await?; Ok(()) } - pub fn destroy(&mut self, domid: u32) -> Result<()> { - if let Err(err) = self.destroy_store(domid) { + pub async fn destroy(&mut self, domid: u32) -> Result<()> { + if let Err(err) = self.destroy_store(domid).await { warn!("failed to destroy store for domain {}: {}", domid, err); } self.call.destroy_domain(domid)?; Ok(()) } - fn destroy_store(&mut self, domid: u32) -> Result<()> { - let dom_path = self.store.get_domain_path(domid)?; - let vm_path = self.store.read_string(&format!("{}/vm", dom_path))?; - if vm_path.is_empty() { + async fn destroy_store(&mut self, domid: u32) -> Result<()> { + let dom_path = self.store.get_domain_path(domid).await?; + let vm_path = self.store.read_string(&format!("{}/vm", dom_path)).await?; + if vm_path.is_none() { return Err(Error::DomainNonExistent); } @@ -626,20 +672,27 @@ impl XenClient { let console_frontend_path = format!("{}/console", dom_path); let console_backend_path = self .store - .read_string_optional(format!("{}/backend", console_frontend_path).as_str())?; + .read_string(format!("{}/backend", console_frontend_path).as_str()) + .await?; for device_category in self .store - .list_any(format!("{}/device", dom_path).as_str())? + .list(format!("{}/device", dom_path).as_str()) + .await? { for device_id in self .store - .list_any(format!("{}/device/{}", dom_path, device_category).as_str())? + .list(format!("{}/device/{}", dom_path, device_category).as_str()) + .await? { let device_path = format!("{}/device/{}/{}", dom_path, device_category, device_id); - let backend_path = self + let Some(backend_path) = self .store - .read_string(format!("{}/backend", device_path).as_str())?; + .read_string(format!("{}/backend", device_path).as_str()) + .await? + else { + continue; + }; backend_paths.push(backend_path); } } @@ -647,16 +700,16 @@ impl XenClient { for backend in &backend_paths { let state_path = format!("{}/state", backend); let online_path = format!("{}/online", backend); - let mut tx = self.store.transaction()?; - let state = tx.read_string(&state_path)?; + let mut tx = self.store.transaction().await?; + let state = tx.read_string(&state_path).await?.unwrap_or(String::new()); if state.is_empty() { break; } - tx.write_string(&online_path, "0")?; + tx.write_string(&online_path, "0").await?; if !state.is_empty() && u32::from_str(&state).unwrap_or(0) != 6 { - tx.write_string(&state_path, "5")?; + tx.write_string(&state_path, "5").await?; } - tx.commit()?; + tx.commit().await?; let mut count: u32 = 0; loop { @@ -664,7 +717,9 @@ impl XenClient { warn!("unable to safely destroy backend: {}", backend); break; } - let state = self.store.read_string(&state_path)?; + let Some(state) = self.store.read_string(&state_path).await? else { + break; + }; let state = i64::from_str(&state).unwrap_or(-1); if state == 6 { break; @@ -674,7 +729,7 @@ impl XenClient { } } - let mut tx = self.store.transaction()?; + let mut tx = self.store.transaction().await?; let mut backend_removals: Vec = Vec::new(); backend_removals.extend_from_slice(backend_paths.as_slice()); if let Some(backend) = console_backend_path { @@ -683,20 +738,23 @@ impl XenClient { for path in &backend_removals { let path = PathBuf::from(path); let parent = path.parent().ok_or(Error::PathParentNotFound)?; - tx.rm(parent.to_str().ok_or(Error::PathStringConversion)?)?; + tx.rm(parent.to_str().ok_or(Error::PathStringConversion)?) + .await?; } - tx.rm(&vm_path)?; - tx.rm(&dom_path)?; - tx.commit()?; + if let Some(vm_path) = vm_path { + tx.rm(&vm_path).await?; + } + tx.rm(&dom_path).await?; + tx.commit().await?; Ok(()) } - pub fn get_console_path(&mut self, domid: u32) -> Result { - let dom_path = self.store.get_domain_path(domid)?; + pub async fn get_console_path(&mut self, domid: u32) -> Result { + let dom_path = self.store.get_domain_path(domid).await?; let console_tty_path = format!("{}/console/tty", dom_path); let mut tty: Option = None; for _ in 0..5 { - tty = self.store.read_string_optional(&console_tty_path)?; + tty = self.store.read_string(&console_tty_path).await?; if tty.is_some() { break; } diff --git a/libs/xen/xenstore/Cargo.toml b/libs/xen/xenstore/Cargo.toml index 0bdd804..f31ad92 100644 --- a/libs/xen/xenstore/Cargo.toml +++ b/libs/xen/xenstore/Cargo.toml @@ -11,6 +11,11 @@ path = "src/lib.rs" thiserror = { workspace = true } libc = { workspace = true } log = { workspace = true } +tokio = { workspace = true } +async-trait = { workspace = true } + +[dev-dependencies] +futures = { workspace = true } [dependencies.bytemuck] workspace = true diff --git a/libs/xen/xenstore/examples/list.rs b/libs/xen/xenstore/examples/list.rs index 43ea294..c0ed6e0 100644 --- a/libs/xen/xenstore/examples/list.rs +++ b/libs/xen/xenstore/examples/list.rs @@ -1,9 +1,10 @@ +use futures::executor::block_on; use xenstore::client::{XsdClient, XsdInterface}; use xenstore::error::Result; use xenstore::sys::XSD_ERROR_EINVAL; fn list_recursive(client: &mut XsdClient, level: usize, path: &str) -> Result<()> { - let children = match client.list(path) { + let children = match block_on(client.list(path)) { Ok(children) => children, Err(error) => { return if error.to_string() == XSD_ERROR_EINVAL.error { @@ -16,20 +17,16 @@ fn list_recursive(client: &mut XsdClient, level: usize, path: &str) -> Result<() for child in children { let full = format!("{}/{}", if path == "/" { "" } else { path }, child); - let value = client.read(full.as_str())?; - println!( - "{}{} = {:?}", - " ".repeat(level), - child, - String::from_utf8(value)? - ); + let value = block_on(client.read_string(full.as_str()))?.expect("expected value"); + println!("{}{} = {:?}", " ".repeat(level), child, value,); list_recursive(client, level + 1, full.as_str())?; } Ok(()) } -fn main() -> Result<()> { - let mut client = XsdClient::open()?; +#[tokio::main] +async fn main() -> Result<()> { + let mut client = XsdClient::open().await?; list_recursive(&mut client, 0, "/")?; Ok(()) } diff --git a/libs/xen/xenstore/src/bus.rs b/libs/xen/xenstore/src/bus.rs index 6a71b01..fd1a5ad 100644 --- a/libs/xen/xenstore/src/bus.rs +++ b/libs/xen/xenstore/src/bus.rs @@ -5,7 +5,8 @@ use std::fs::{self, metadata, File}; use std::io::{Read, Write}; use std::mem::size_of; use std::os::unix::fs::FileTypeExt; -use std::os::unix::net::UnixStream; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::UnixStream; const XEN_BUS_PATHS: &[&str] = &["/dev/xen/xenbus", "/var/run/xenstored/socket"]; @@ -19,18 +20,21 @@ fn find_bus_path() -> Option { None } +#[async_trait::async_trait] trait XsdTransport { - fn xsd_write_all(&mut self, buf: &[u8]) -> Result<()>; - fn xsd_read_exact(&mut self, buf: &mut [u8]) -> Result<()>; + async fn xsd_write_all(&mut self, buf: &[u8]) -> Result<()>; + async fn xsd_read_exact(&mut self, buf: &mut [u8]) -> Result<()>; } +#[async_trait::async_trait] impl XsdTransport for UnixStream { - fn xsd_write_all(&mut self, buf: &[u8]) -> Result<()> { - Ok(self.write_all(buf)?) + async fn xsd_write_all(&mut self, buf: &[u8]) -> Result<()> { + Ok(self.write_all(buf).await?) } - fn xsd_read_exact(&mut self, buf: &mut [u8]) -> Result<()> { - Ok(self.read_exact(buf)?) + async fn xsd_read_exact(&mut self, buf: &mut [u8]) -> Result<()> { + self.read_exact(buf).await?; + Ok(()) } } @@ -45,12 +49,13 @@ impl XsdFileTransport { } } +#[async_trait::async_trait] impl XsdTransport for XsdFileTransport { - fn xsd_read_exact(&mut self, buf: &mut [u8]) -> Result<()> { + async fn xsd_read_exact(&mut self, buf: &mut [u8]) -> Result<()> { Ok(self.handle.read_exact(buf)?) } - fn xsd_write_all(&mut self, buf: &[u8]) -> Result<()> { + async fn xsd_write_all(&mut self, buf: &[u8]) -> Result<()> { self.handle.write_all(buf)?; self.handle.flush()?; Ok(()) @@ -93,7 +98,7 @@ impl XsdResponse { } impl XsdSocket { - pub fn dial() -> Result { + pub async fn open() -> Result { let path = match find_bus_path() { Some(path) => path, None => return Err(Error::BusNotFound), @@ -102,7 +107,7 @@ impl XsdSocket { let metadata = fs::metadata(&path)?; let file_type = metadata.file_type(); if file_type.is_socket() { - let stream = UnixStream::connect(&path)?; + let stream = UnixStream::connect(&path).await?; return Ok(XsdSocket { handle: Box::new(stream), }); @@ -113,7 +118,7 @@ impl XsdSocket { }) } - pub fn send(&mut self, tx: u32, typ: u32, buf: &[u8]) -> Result { + pub async fn send(&mut self, tx: u32, typ: u32, buf: &[u8]) -> Result { let header = XsdMessageHeader { typ, req: 0, @@ -124,12 +129,14 @@ impl XsdSocket { let mut composed: Vec = Vec::new(); composed.extend_from_slice(header_bytes); composed.extend_from_slice(buf); - self.handle.xsd_write_all(&composed)?; + self.handle.xsd_write_all(&composed).await?; let mut result_buf = vec![0u8; size_of::()]; - self.handle.xsd_read_exact(result_buf.as_mut_slice())?; + self.handle + .xsd_read_exact(result_buf.as_mut_slice()) + .await?; let result_header = bytemuck::from_bytes::(&result_buf); let mut payload = vec![0u8; result_header.len as usize]; - self.handle.xsd_read_exact(payload.as_mut_slice())?; + self.handle.xsd_read_exact(payload.as_mut_slice()).await?; if result_header.typ == XSD_ERROR { let error = CString::from_vec_with_nul(payload)?; return Err(Error::ResponseError(error.into_string()?)); @@ -138,18 +145,23 @@ impl XsdSocket { Ok(response) } - pub fn send_single(&mut self, tx: u32, typ: u32, string: &str) -> Result { + pub async fn send_single(&mut self, tx: u32, typ: u32, string: &str) -> Result { let text = CString::new(string)?; let buf = text.as_bytes_with_nul(); - self.send(tx, typ, buf) + self.send(tx, typ, buf).await } - pub fn send_multiple(&mut self, tx: u32, typ: u32, array: &[&str]) -> Result { + pub async fn send_multiple( + &mut self, + tx: u32, + typ: u32, + array: &[&str], + ) -> Result { let mut buf: Vec = Vec::new(); for item in array { buf.extend_from_slice(item.as_bytes()); buf.push(0); } - self.send(tx, typ, buf.as_slice()) + self.send(tx, typ, buf.as_slice()).await } } diff --git a/libs/xen/xenstore/src/client.rs b/libs/xen/xenstore/src/client.rs index cbb90a4..58bb721 100644 --- a/libs/xen/xenstore/src/client.rs +++ b/libs/xen/xenstore/src/client.rs @@ -35,85 +35,71 @@ impl XsPermission { } } +#[allow(async_fn_in_trait)] pub trait XsdInterface { - fn list(&mut self, path: &str) -> Result>; - fn read(&mut self, path: &str) -> Result>; - fn read_string(&mut self, path: &str) -> Result; - fn write(&mut self, path: &str, data: Vec) -> Result; - fn write_string(&mut self, path: &str, data: &str) -> Result; - fn mkdir(&mut self, path: &str) -> Result; - fn rm(&mut self, path: &str) -> Result; - fn set_perms(&mut self, path: &str, perms: &[XsPermission]) -> Result; + async fn list(&mut self, path: &str) -> Result>; + async fn read(&mut self, path: &str) -> Result>>; + async fn read_string(&mut self, path: &str) -> Result>; + async fn write(&mut self, path: &str, data: Vec) -> Result; + async fn write_string(&mut self, path: &str, data: &str) -> Result; + async fn mkdir(&mut self, path: &str) -> Result; + async fn rm(&mut self, path: &str) -> Result; + async fn set_perms(&mut self, path: &str, perms: &[XsPermission]) -> Result; - fn mknod(&mut self, path: &str, perms: &[XsPermission]) -> Result { - let result1 = self.write_string(path, "")?; - let result2 = self.set_perms(path, perms)?; + async fn mknod(&mut self, path: &str, perms: &[XsPermission]) -> Result { + let result1 = self.write_string(path, "").await?; + let result2 = self.set_perms(path, perms).await?; Ok(result1 && result2) } - - fn read_string_optional(&mut self, path: &str) -> Result> { - Ok(match self.read_string(path) { - Ok(value) => Some(value), - Err(error) => { - if error.is_noent_response() { - None - } else { - return Err(error); - } - } - }) - } - - fn list_any(&mut self, path: &str) -> Result> { - Ok(match self.list(path) { - Ok(value) => value, - Err(error) => { - if error.is_noent_response() { - Vec::new() - } else { - return Err(error); - } - } - }) - } } impl XsdClient { - pub fn open() -> Result { - let socket = XsdSocket::dial()?; + pub async fn open() -> Result { + let socket = XsdSocket::open().await?; Ok(XsdClient { socket }) } - fn list(&mut self, tx: u32, path: &str) -> Result> { + async fn list(&mut self, tx: u32, path: &str) -> Result> { trace!("list tx={tx} path={path}"); - let response = self.socket.send_single(tx, XSD_DIRECTORY, path)?; + let response = self.socket.send_single(tx, XSD_DIRECTORY, path).await?; response.parse_string_vec() } - fn read(&mut self, tx: u32, path: &str) -> Result> { + async fn read(&mut self, tx: u32, path: &str) -> Result>> { trace!("read tx={tx} path={path}"); - let response = self.socket.send_single(tx, XSD_READ, path)?; - Ok(response.payload) + match self.socket.send_single(tx, XSD_READ, path).await { + Ok(response) => Ok(Some(response.payload)), + Err(error) => { + if error.is_noent_response() { + Ok(None) + } else { + Err(error) + } + } + } } - fn write(&mut self, tx: u32, path: &str, data: Vec) -> Result { + async fn write(&mut self, tx: u32, path: &str, data: Vec) -> Result { trace!("write tx={tx} path={path} data={:?}", data); let mut buffer = Vec::new(); let path = CString::new(path)?; buffer.extend_from_slice(path.as_bytes_with_nul()); buffer.extend_from_slice(data.as_slice()); - let response = self.socket.send(tx, XSD_WRITE, buffer.as_slice())?; + let response = self.socket.send(tx, XSD_WRITE, buffer.as_slice()).await?; response.parse_bool() } - fn mkdir(&mut self, tx: u32, path: &str) -> Result { + async fn mkdir(&mut self, tx: u32, path: &str) -> Result { trace!("mkdir tx={tx} path={path}"); - self.socket.send_single(tx, XSD_MKDIR, path)?.parse_bool() + self.socket + .send_single(tx, XSD_MKDIR, path) + .await? + .parse_bool() } - fn rm(&mut self, tx: u32, path: &str) -> Result { + async fn rm(&mut self, tx: u32, path: &str) -> Result { trace!("rm tx={tx} path={path}"); - let result = self.socket.send_single(tx, XSD_RM, path); + let result = self.socket.send_single(tx, XSD_RM, path).await; if let Err(error) = result { if error.is_noent_response() { return Ok(true); @@ -123,7 +109,7 @@ impl XsdClient { result.unwrap().parse_bool() } - fn set_perms(&mut self, tx: u32, path: &str, perms: &[XsPermission]) -> Result { + async fn set_perms(&mut self, tx: u32, path: &str, perms: &[XsPermission]) -> Result { trace!("set_perms tx={tx} path={path} perms={:?}", perms); let mut items: Vec = Vec::new(); items.push(path.to_string()); @@ -131,36 +117,46 @@ impl XsdClient { items.push(perm.encode()?); } let items_str: Vec<&str> = items.iter().map(|x| x.as_str()).collect(); - let response = self.socket.send_multiple(tx, XSD_SET_PERMS, &items_str)?; + let response = self + .socket + .send_multiple(tx, XSD_SET_PERMS, &items_str) + .await?; response.parse_bool() } - pub fn transaction(&mut self) -> Result { + pub async fn transaction(&mut self) -> Result { trace!("transaction start"); - let response = self.socket.send_single(0, XSD_TRANSACTION_START, "")?; + let response = self + .socket + .send_single(0, XSD_TRANSACTION_START, "") + .await?; let str = response.parse_string()?; let tx = str.parse::()?; Ok(XsdTransaction { client: self, tx }) } - pub fn get_domain_path(&mut self, domid: u32) -> Result { - let response = - self.socket - .send_single(0, XSD_GET_DOMAIN_PATH, domid.to_string().as_str())?; + pub async fn get_domain_path(&mut self, domid: u32) -> Result { + let response = self + .socket + .send_single(0, XSD_GET_DOMAIN_PATH, domid.to_string().as_str()) + .await?; response.parse_string() } - pub fn introduce_domain(&mut self, domid: u32, mfn: u64, evtchn: u32) -> Result { + pub async fn introduce_domain(&mut self, domid: u32, mfn: u64, evtchn: u32) -> Result { trace!("introduce domain domid={domid} mfn={mfn} evtchn={evtchn}"); - let response = self.socket.send_multiple( - 0, - XSD_INTRODUCE, - &[ - domid.to_string().as_str(), - mfn.to_string().as_str(), - evtchn.to_string().as_str(), - ], - )?; + let response = self + .socket + .send_multiple( + 0, + XSD_INTRODUCE, + &[ + domid.to_string().as_str(), + mfn.to_string().as_str(), + evtchn.to_string().as_str(), + ], + ) + .await?; response.parse_bool() } } @@ -171,89 +167,104 @@ pub struct XsdTransaction<'a> { } impl XsdInterface for XsdClient { - fn list(&mut self, path: &str) -> Result> { - self.list(0, path) + async fn list(&mut self, path: &str) -> Result> { + self.list(0, path).await } - fn read(&mut self, path: &str) -> Result> { - self.read(0, path) + async fn read(&mut self, path: &str) -> Result>> { + self.read(0, path).await } - fn read_string(&mut self, path: &str) -> Result { - Ok(String::from_utf8(self.read(0, path)?)?) + async fn read_string(&mut self, path: &str) -> Result> { + match self.read(0, path).await { + Ok(value) => match value { + Some(value) => Ok(Some(String::from_utf8(value)?)), + None => Ok(None), + }, + Err(error) => Err(error), + } } - fn write(&mut self, path: &str, data: Vec) -> Result { - self.write(0, path, data) + async fn write(&mut self, path: &str, data: Vec) -> Result { + self.write(0, path, data).await } - fn write_string(&mut self, path: &str, data: &str) -> Result { - self.write(0, path, data.as_bytes().to_vec()) + async fn write_string(&mut self, path: &str, data: &str) -> Result { + self.write(0, path, data.as_bytes().to_vec()).await } - fn mkdir(&mut self, path: &str) -> Result { - self.mkdir(0, path) + async fn mkdir(&mut self, path: &str) -> Result { + self.mkdir(0, path).await } - fn rm(&mut self, path: &str) -> Result { - self.rm(0, path) + async fn rm(&mut self, path: &str) -> Result { + self.rm(0, path).await } - fn set_perms(&mut self, path: &str, perms: &[XsPermission]) -> Result { - self.set_perms(0, path, perms) + async fn set_perms(&mut self, path: &str, perms: &[XsPermission]) -> Result { + self.set_perms(0, path, perms).await } } impl XsdInterface for XsdTransaction<'_> { - fn list(&mut self, path: &str) -> Result> { - self.client.list(self.tx, path) + async fn list(&mut self, path: &str) -> Result> { + self.client.list(self.tx, path).await } - fn read(&mut self, path: &str) -> Result> { - self.client.read(self.tx, path) + async fn read(&mut self, path: &str) -> Result>> { + self.client.read(self.tx, path).await } - fn read_string(&mut self, path: &str) -> Result { - Ok(String::from_utf8(self.client.read(self.tx, path)?)?) + async fn read_string(&mut self, path: &str) -> Result> { + match self.client.read(self.tx, path).await { + Ok(value) => match value { + Some(value) => Ok(Some(String::from_utf8(value)?)), + None => Ok(None), + }, + Err(error) => Err(error), + } } - fn write(&mut self, path: &str, data: Vec) -> Result { - self.client.write(self.tx, path, data) + async fn write(&mut self, path: &str, data: Vec) -> Result { + self.client.write(self.tx, path, data).await } - fn write_string(&mut self, path: &str, data: &str) -> Result { - self.client.write(self.tx, path, data.as_bytes().to_vec()) + async fn write_string(&mut self, path: &str, data: &str) -> Result { + self.client + .write(self.tx, path, data.as_bytes().to_vec()) + .await } - fn mkdir(&mut self, path: &str) -> Result { - self.client.mkdir(self.tx, path) + async fn mkdir(&mut self, path: &str) -> Result { + self.client.mkdir(self.tx, path).await } - fn rm(&mut self, path: &str) -> Result { - self.client.rm(self.tx, path) + async fn rm(&mut self, path: &str) -> Result { + self.client.rm(self.tx, path).await } - fn set_perms(&mut self, path: &str, perms: &[XsPermission]) -> Result { - self.client.set_perms(self.tx, path, perms) + async fn set_perms(&mut self, path: &str, perms: &[XsPermission]) -> Result { + self.client.set_perms(self.tx, path, perms).await } } impl XsdTransaction<'_> { - pub fn end(&mut self, abort: bool) -> Result { + pub async fn end(&mut self, abort: bool) -> Result { let abort_str = if abort { "F" } else { "T" }; trace!("transaction end abort={}", abort); self.client .socket - .send_single(self.tx, XSD_TRANSACTION_END, abort_str)? + .send_single(self.tx, XSD_TRANSACTION_END, abort_str) + .await? .parse_bool() } - pub fn commit(&mut self) -> Result { - self.end(false) + pub async fn commit(&mut self) -> Result { + self.end(false).await } - pub fn abort(&mut self) -> Result { - self.end(true) + pub async fn abort(&mut self) -> Result { + self.end(true).await } } diff --git a/network/examples/autonet.rs b/network/examples/autonet.rs index ae59a18..758a86f 100644 --- a/network/examples/autonet.rs +++ b/network/examples/autonet.rs @@ -1,13 +1,15 @@ -use std::{thread::sleep, time::Duration}; +use std::time::Duration; use anyhow::Result; use kratanet::autonet::AutoNetworkCollector; +use tokio::time::sleep; -fn main() -> Result<()> { - let mut collector = AutoNetworkCollector::new()?; +#[tokio::main] +async fn main() -> Result<()> { + let mut collector = AutoNetworkCollector::new().await?; loop { - let changeset = collector.read_changes()?; + let changeset = collector.read_changes().await?; println!("{:?}", changeset); - sleep(Duration::from_secs(2)); + sleep(Duration::from_secs(2)).await; } } diff --git a/network/src/autonet.rs b/network/src/autonet.rs index ad393b3..bd5e7b5 100644 --- a/network/src/autonet.rs +++ b/network/src/autonet.rs @@ -37,23 +37,23 @@ pub struct AutoNetworkChangeset { } impl AutoNetworkCollector { - pub fn new() -> Result { + pub async fn new() -> Result { Ok(AutoNetworkCollector { - client: XsdClient::open()?, + client: XsdClient::open().await?, known: HashMap::new(), }) } - pub fn read(&mut self) -> Result> { + pub async fn read(&mut self) -> Result> { let mut networks = Vec::new(); - let mut tx = self.client.transaction()?; - for domid_string in tx.list_any("/local/domain")? { + let mut tx = self.client.transaction().await?; + for domid_string in tx.list("/local/domain").await? { let Ok(domid) = domid_string.parse::() else { continue; }; let dom_path = format!("/local/domain/{}", domid_string); - let Some(uuid_string) = tx.read_string_optional(&format!("{}/krata/uuid", dom_path))? + let Some(uuid_string) = tx.read_string(&format!("{}/krata/uuid", dom_path)).await? else { continue; }; @@ -63,13 +63,13 @@ impl AutoNetworkCollector { }; let Ok(guest) = - AutoNetworkCollector::read_network_side(uuid, &mut tx, &dom_path, "guest") + AutoNetworkCollector::read_network_side(uuid, &mut tx, &dom_path, "guest").await else { continue; }; let Ok(gateway) = - AutoNetworkCollector::read_network_side(uuid, &mut tx, &dom_path, "gateway") + AutoNetworkCollector::read_network_side(uuid, &mut tx, &dom_path, "gateway").await else { continue; }; @@ -81,18 +81,18 @@ impl AutoNetworkCollector { gateway, }); } - tx.commit()?; + tx.commit().await?; Ok(networks) } - fn read_network_side( + async fn read_network_side( uuid: Uuid, tx: &mut XsdTransaction<'_>, dom_path: &str, side: &str, ) -> Result { let side_path = format!("{}/krata/network/{}", dom_path, side); - let Some(ipv4) = tx.read_string_optional(&format!("{}/ipv4", side_path))? else { + let Some(ipv4) = tx.read_string(&format!("{}/ipv4", side_path)).await? else { return Err(anyhow!( "krata domain {} is missing {} ipv4 network entry", uuid, @@ -100,7 +100,7 @@ impl AutoNetworkCollector { )); }; - let Some(ipv6) = tx.read_string_optional(&format!("{}/ipv6", side_path))? else { + let Some(ipv6) = tx.read_string(&format!("{}/ipv6", side_path)).await? else { return Err(anyhow!( "krata domain {} is missing {} ipv6 network entry", uuid, @@ -108,7 +108,7 @@ impl AutoNetworkCollector { )); }; - let Some(mac) = tx.read_string_optional(&format!("{}/mac", side_path))? else { + let Some(mac) = tx.read_string(&format!("{}/mac", side_path)).await? else { return Err(anyhow!( "krata domain {} is missing {} mac address entry", uuid, @@ -146,12 +146,12 @@ impl AutoNetworkCollector { Ok(NetworkSide { ipv4, ipv6, mac }) } - pub fn read_changes(&mut self) -> Result { + pub async fn read_changes(&mut self) -> Result { let mut seen: Vec = Vec::new(); let mut added: Vec = Vec::new(); let mut removed: Vec = Vec::new(); - for network in self.read()? { + for network in self.read().await? { seen.push(network.uuid); if self.known.contains_key(&network.uuid) { continue; diff --git a/network/src/lib.rs b/network/src/lib.rs index b71e355..52183ba 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -36,9 +36,9 @@ impl NetworkService { impl NetworkService { pub async fn watch(&mut self) -> Result<()> { - let mut collector = AutoNetworkCollector::new()?; + let mut collector = AutoNetworkCollector::new().await?; loop { - let changeset = collector.read_changes()?; + let changeset = collector.read_changes().await?; self.process_network_changeset(&mut collector, changeset)?; sleep(Duration::from_secs(2)).await; }