diff --git a/network/src/vbridge.rs b/network/src/vbridge.rs index a5fab8e..f6230fb 100644 --- a/network/src/vbridge.rs +++ b/network/src/vbridge.rs @@ -22,6 +22,7 @@ use tokio::{ const TO_BRIDGE_QUEUE_LEN: usize = 50; const FROM_BRIDGE_QUEUE_LEN: usize = 50; const BROADCAST_QUEUE_LEN: usize = 50; +const MEMBER_LEAVE_QUEUE_LEN: usize = 10; #[derive(Debug)] struct BridgeMember { @@ -29,29 +30,46 @@ struct BridgeMember { } pub struct BridgeJoinHandle { + mac: EthernetAddress, pub to_bridge_sender: Sender, pub from_bridge_receiver: Receiver, pub from_broadcast_receiver: BroadcastReceiver, + member_leave_sender: Sender, +} + +impl Drop for BridgeJoinHandle { + fn drop(&mut self) { + if let Err(error) = self.member_leave_sender.try_send(self.mac) { + warn!( + "virtual bridge member {} failed to leave: {}", + self.mac, error + ); + } + } } type VirtualBridgeMemberMap = Arc>>; #[derive(Clone)] pub struct VirtualBridge { - members: VirtualBridgeMemberMap, to_bridge_sender: Sender, from_broadcast_sender: BroadcastSender, + member_leave_sender: Sender, + members: VirtualBridgeMemberMap, _task: Arc>, } enum VirtualBridgeSelect { BroadcastSent(Option), PacketReceived(Option), + MemberLeave(Option), } impl VirtualBridge { pub fn new() -> Result { let (to_bridge_sender, to_bridge_receiver) = channel::(TO_BRIDGE_QUEUE_LEN); + let (member_leave_sender, member_leave_reciever) = + channel::(MEMBER_LEAVE_QUEUE_LEN); let (from_broadcast_sender, from_broadcast_receiver) = broadcast_channel(BROADCAST_QUEUE_LEN); @@ -62,6 +80,7 @@ impl VirtualBridge { tokio::task::spawn(async move { if let Err(error) = VirtualBridge::process( members, + member_leave_reciever, to_bridge_receiver, broadcast_rx_sender, from_broadcast_receiver, @@ -75,8 +94,9 @@ impl VirtualBridge { Ok(VirtualBridge { to_bridge_sender, - members, from_broadcast_sender, + member_leave_sender, + members, _task: Arc::new(handle), }) } @@ -95,6 +115,8 @@ impl VirtualBridge { }; debug!("virtual bridge member {} has joined", mac); Ok(BridgeJoinHandle { + mac, + member_leave_sender: self.member_leave_sender.clone(), from_bridge_receiver, from_broadcast_receiver: self.from_broadcast_sender.subscribe(), to_bridge_sender: self.to_bridge_sender.clone(), @@ -103,6 +125,7 @@ impl VirtualBridge { async fn process( members: VirtualBridgeMemberMap, + mut member_leave_reciever: Receiver, mut to_bridge_receiver: Receiver, broadcast_rx_sender: BroadcastSender, mut from_broadcast_receiver: BroadcastReceiver, @@ -112,6 +135,7 @@ impl VirtualBridge { biased; x = from_broadcast_receiver.recv() => VirtualBridgeSelect::BroadcastSent(x.ok()), x = to_bridge_receiver.recv() => VirtualBridgeSelect::PacketReceived(x), + x = member_leave_reciever.recv() => VirtualBridgeSelect::MemberLeave(x), }; match selection { @@ -144,10 +168,6 @@ impl VirtualBridge { let destination = EthernetAddress(header.destination); if destination.is_multicast() { - trace!( - "broadcasting bridge packet from {}", - EthernetAddress(header.source) - ); broadcast_rx_sender.send(packet)?; continue; } @@ -166,7 +186,14 @@ impl VirtualBridge { } } + VirtualBridgeSelect::MemberLeave(Some(mac)) => { + if members.lock().await.remove(&mac).is_some() { + debug!("virtual bridge member {} has left", mac); + } + } + VirtualBridgeSelect::PacketReceived(None) => break, + VirtualBridgeSelect::MemberLeave(None) => {} VirtualBridgeSelect::BroadcastSent(_) => {} } }