network: implement proper backend destruction

This commit is contained in:
Alex Zenla 2024-02-13 17:01:59 +00:00
parent fdd70dee9b
commit 2988d0c5cf
No known key found for this signature in database
GPG Key ID: 067B238899B51269
2 changed files with 59 additions and 25 deletions

View File

@ -16,6 +16,7 @@ use smoltcp::time::Instant;
use smoltcp::wire::{HardwareAddress, IpCidr}; use smoltcp::wire::{HardwareAddress, IpCidr};
use tokio::select; use tokio::select;
use tokio::sync::mpsc::{channel, Receiver}; use tokio::sync::mpsc::{channel, Receiver};
use tokio::task::JoinHandle;
const TX_CHANNEL_BUFFER_LEN: usize = 300; const TX_CHANNEL_BUFFER_LEN: usize = 300;
@ -43,7 +44,7 @@ struct NetworkStack<'a> {
} }
impl NetworkStack<'_> { impl NetworkStack<'_> {
async fn poll(&mut self) -> Result<()> { async fn poll(&mut self) -> Result<bool> {
let what = select! { let what = select! {
x = self.kdev.receiver.recv() => NetworkStackSelect::Receive(x), x = self.kdev.receiver.recv() => NetworkStackSelect::Receive(x),
x = self.bridge.from_bridge_receiver.recv() => NetworkStackSelect::Send(x), x = self.bridge.from_bridge_receiver.recv() => NetworkStackSelect::Send(x),
@ -76,13 +77,14 @@ impl NetworkStack<'_> {
} }
} }
NetworkStackSelect::Receive(None) => {} NetworkStackSelect::Receive(None) | NetworkStackSelect::Send(None) => {
NetworkStackSelect::Send(None) => {} return Ok(false);
}
NetworkStackSelect::Reclaim => {} NetworkStackSelect::Reclaim => {}
} }
Ok(()) Ok(true)
} }
} }
@ -112,8 +114,11 @@ impl NetworkBackend {
pub async fn run(&self) -> Result<()> { pub async fn run(&self) -> Result<()> {
let mut stack = self.create_network_stack().await?; let mut stack = self.create_network_stack().await?;
loop { loop {
stack.poll().await?; if !stack.poll().await? {
break;
}
} }
Ok(())
} }
async fn create_network_stack(&self) -> Result<NetworkStack> { async fn create_network_stack(&self) -> Result<NetworkStack> {
@ -152,8 +157,8 @@ impl NetworkBackend {
}) })
} }
pub async fn launch(self) -> Result<()> { pub async fn launch(self) -> Result<JoinHandle<()>> {
tokio::task::spawn(async move { Ok(tokio::task::spawn(async move {
info!( info!(
"lauched network backend for hypha guest {}", "lauched network backend for hypha guest {}",
self.metadata.uuid self.metadata.uuid
@ -164,7 +169,15 @@ impl NetworkBackend {
self.metadata.uuid, error self.metadata.uuid, error
); );
} }
}); }))
Ok(()) }
}
impl Drop for NetworkBackend {
fn drop(&mut self) {
info!(
"destroyed network backend for hypha guest {}",
self.metadata.uuid
);
} }
} }

View File

@ -1,10 +1,10 @@
use std::time::Duration; use std::{collections::HashMap, time::Duration};
use anyhow::Result; use anyhow::Result;
use autonet::{AutoNetworkChangeset, AutoNetworkCollector, NetworkMetadata}; use autonet::{AutoNetworkChangeset, AutoNetworkCollector, NetworkMetadata};
use futures::{future::join_all, TryFutureExt}; use futures::{future::join_all, TryFutureExt};
use log::warn; use log::warn;
use tokio::time::sleep; use tokio::{task::JoinHandle, time::sleep};
use uuid::Uuid; use uuid::Uuid;
use vbridge::VirtualBridge; use vbridge::VirtualBridge;
@ -21,12 +21,14 @@ pub mod raw_socket;
pub mod vbridge; pub mod vbridge;
pub struct NetworkService { pub struct NetworkService {
pub backends: HashMap<Uuid, JoinHandle<()>>,
pub bridge: VirtualBridge, pub bridge: VirtualBridge,
} }
impl NetworkService { impl NetworkService {
pub fn new() -> Result<NetworkService> { pub fn new() -> Result<NetworkService> {
Ok(NetworkService { Ok(NetworkService {
backends: HashMap::new(),
bridge: VirtualBridge::new()?, bridge: VirtualBridge::new()?,
}) })
} }
@ -47,30 +49,47 @@ impl NetworkService {
collector: &mut AutoNetworkCollector, collector: &mut AutoNetworkCollector,
changeset: AutoNetworkChangeset, changeset: AutoNetworkChangeset,
) -> Result<()> { ) -> Result<()> {
for removal in &changeset.removed {
if let Some(handle) = self.backends.remove(&removal.uuid) {
handle.abort();
}
}
let futures = changeset let futures = changeset
.added .added
.iter() .iter()
.map(|metadata| { .map(|metadata| {
self.add_network_backend(metadata.clone()) self.add_network_backend(metadata)
.map_err(|x| (metadata.clone(), x)) .map_err(|x| (metadata.clone(), x))
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let failed = futures::executor::block_on(async move { let (launched, failed) = futures::executor::block_on(async move {
let mut failed: Vec<Uuid> = Vec::new(); let mut failed: Vec<Uuid> = Vec::new();
let mut launched: Vec<(Uuid, JoinHandle<()>)> = Vec::new();
let results = join_all(futures).await; let results = join_all(futures).await;
for result in results { for result in results {
if let Err((metadata, error)) = result { match result {
warn!( Ok(launch) => {
"failed to launch network backend for hypha guest {}: {}", launched.push(launch);
metadata.uuid, error }
);
failed.push(metadata.uuid); Err((metadata, error)) => {
} warn!(
"failed to launch network backend for hypha guest {}: {}",
metadata.uuid, error
);
failed.push(metadata.uuid);
}
};
} }
failed (launched, failed)
}); });
for (uuid, handle) in launched {
self.backends.insert(uuid, handle);
}
for uuid in failed { for uuid in failed {
collector.mark_unknown(uuid)?; collector.mark_unknown(uuid)?;
} }
@ -78,10 +97,12 @@ impl NetworkService {
Ok(()) Ok(())
} }
async fn add_network_backend(&self, metadata: NetworkMetadata) -> Result<()> { async fn add_network_backend(
let mut network = NetworkBackend::new(metadata, self.bridge.clone())?; &self,
metadata: &NetworkMetadata,
) -> Result<(Uuid, JoinHandle<()>)> {
let mut network = NetworkBackend::new(metadata.clone(), self.bridge.clone())?;
network.init().await?; network.init().await?;
network.launch().await?; Ok((metadata.uuid, network.launch().await?))
Ok(())
} }
} }