network: implement bridge leave support

This commit is contained in:
Alex Zenla
2024-02-13 18:18:34 +00:00
parent 21707daa98
commit 3524683ab4

View File

@ -22,6 +22,7 @@ use tokio::{
const TO_BRIDGE_QUEUE_LEN: usize = 50; const TO_BRIDGE_QUEUE_LEN: usize = 50;
const FROM_BRIDGE_QUEUE_LEN: usize = 50; const FROM_BRIDGE_QUEUE_LEN: usize = 50;
const BROADCAST_QUEUE_LEN: usize = 50; const BROADCAST_QUEUE_LEN: usize = 50;
const MEMBER_LEAVE_QUEUE_LEN: usize = 10;
#[derive(Debug)] #[derive(Debug)]
struct BridgeMember { struct BridgeMember {
@ -29,29 +30,46 @@ struct BridgeMember {
} }
pub struct BridgeJoinHandle { pub struct BridgeJoinHandle {
mac: EthernetAddress,
pub to_bridge_sender: Sender<BytesMut>, pub to_bridge_sender: Sender<BytesMut>,
pub from_bridge_receiver: Receiver<BytesMut>, pub from_bridge_receiver: Receiver<BytesMut>,
pub from_broadcast_receiver: BroadcastReceiver<BytesMut>, pub from_broadcast_receiver: BroadcastReceiver<BytesMut>,
member_leave_sender: Sender<EthernetAddress>,
}
impl Drop for BridgeJoinHandle {
fn drop(&mut self) {
if let Err(error) = self.member_leave_sender.try_send(self.mac) {
warn!(
"virtual bridge member {} failed to leave: {}",
self.mac, error
);
}
}
} }
type VirtualBridgeMemberMap = Arc<Mutex<HashMap<EthernetAddress, BridgeMember>>>; type VirtualBridgeMemberMap = Arc<Mutex<HashMap<EthernetAddress, BridgeMember>>>;
#[derive(Clone)] #[derive(Clone)]
pub struct VirtualBridge { pub struct VirtualBridge {
members: VirtualBridgeMemberMap,
to_bridge_sender: Sender<BytesMut>, to_bridge_sender: Sender<BytesMut>,
from_broadcast_sender: BroadcastSender<BytesMut>, from_broadcast_sender: BroadcastSender<BytesMut>,
member_leave_sender: Sender<EthernetAddress>,
members: VirtualBridgeMemberMap,
_task: Arc<JoinHandle<()>>, _task: Arc<JoinHandle<()>>,
} }
enum VirtualBridgeSelect { enum VirtualBridgeSelect {
BroadcastSent(Option<BytesMut>), BroadcastSent(Option<BytesMut>),
PacketReceived(Option<BytesMut>), PacketReceived(Option<BytesMut>),
MemberLeave(Option<EthernetAddress>),
} }
impl VirtualBridge { impl VirtualBridge {
pub fn new() -> Result<VirtualBridge> { pub fn new() -> Result<VirtualBridge> {
let (to_bridge_sender, to_bridge_receiver) = channel::<BytesMut>(TO_BRIDGE_QUEUE_LEN); let (to_bridge_sender, to_bridge_receiver) = channel::<BytesMut>(TO_BRIDGE_QUEUE_LEN);
let (member_leave_sender, member_leave_reciever) =
channel::<EthernetAddress>(MEMBER_LEAVE_QUEUE_LEN);
let (from_broadcast_sender, from_broadcast_receiver) = let (from_broadcast_sender, from_broadcast_receiver) =
broadcast_channel(BROADCAST_QUEUE_LEN); broadcast_channel(BROADCAST_QUEUE_LEN);
@ -62,6 +80,7 @@ impl VirtualBridge {
tokio::task::spawn(async move { tokio::task::spawn(async move {
if let Err(error) = VirtualBridge::process( if let Err(error) = VirtualBridge::process(
members, members,
member_leave_reciever,
to_bridge_receiver, to_bridge_receiver,
broadcast_rx_sender, broadcast_rx_sender,
from_broadcast_receiver, from_broadcast_receiver,
@ -75,8 +94,9 @@ impl VirtualBridge {
Ok(VirtualBridge { Ok(VirtualBridge {
to_bridge_sender, to_bridge_sender,
members,
from_broadcast_sender, from_broadcast_sender,
member_leave_sender,
members,
_task: Arc::new(handle), _task: Arc::new(handle),
}) })
} }
@ -95,6 +115,8 @@ impl VirtualBridge {
}; };
debug!("virtual bridge member {} has joined", mac); debug!("virtual bridge member {} has joined", mac);
Ok(BridgeJoinHandle { Ok(BridgeJoinHandle {
mac,
member_leave_sender: self.member_leave_sender.clone(),
from_bridge_receiver, from_bridge_receiver,
from_broadcast_receiver: self.from_broadcast_sender.subscribe(), from_broadcast_receiver: self.from_broadcast_sender.subscribe(),
to_bridge_sender: self.to_bridge_sender.clone(), to_bridge_sender: self.to_bridge_sender.clone(),
@ -103,6 +125,7 @@ impl VirtualBridge {
async fn process( async fn process(
members: VirtualBridgeMemberMap, members: VirtualBridgeMemberMap,
mut member_leave_reciever: Receiver<EthernetAddress>,
mut to_bridge_receiver: Receiver<BytesMut>, mut to_bridge_receiver: Receiver<BytesMut>,
broadcast_rx_sender: BroadcastSender<BytesMut>, broadcast_rx_sender: BroadcastSender<BytesMut>,
mut from_broadcast_receiver: BroadcastReceiver<BytesMut>, mut from_broadcast_receiver: BroadcastReceiver<BytesMut>,
@ -112,6 +135,7 @@ impl VirtualBridge {
biased; biased;
x = from_broadcast_receiver.recv() => VirtualBridgeSelect::BroadcastSent(x.ok()), x = from_broadcast_receiver.recv() => VirtualBridgeSelect::BroadcastSent(x.ok()),
x = to_bridge_receiver.recv() => VirtualBridgeSelect::PacketReceived(x), x = to_bridge_receiver.recv() => VirtualBridgeSelect::PacketReceived(x),
x = member_leave_reciever.recv() => VirtualBridgeSelect::MemberLeave(x),
}; };
match selection { match selection {
@ -144,10 +168,6 @@ impl VirtualBridge {
let destination = EthernetAddress(header.destination); let destination = EthernetAddress(header.destination);
if destination.is_multicast() { if destination.is_multicast() {
trace!(
"broadcasting bridge packet from {}",
EthernetAddress(header.source)
);
broadcast_rx_sender.send(packet)?; broadcast_rx_sender.send(packet)?;
continue; continue;
} }
@ -166,7 +186,14 @@ impl VirtualBridge {
} }
} }
VirtualBridgeSelect::MemberLeave(Some(mac)) => {
if members.lock().await.remove(&mac).is_some() {
debug!("virtual bridge member {} has left", mac);
}
}
VirtualBridgeSelect::PacketReceived(None) => break, VirtualBridgeSelect::PacketReceived(None) => break,
VirtualBridgeSelect::MemberLeave(None) => {}
VirtualBridgeSelect::BroadcastSent(_) => {} VirtualBridgeSelect::BroadcastSent(_) => {}
} }
} }