network: utilize bytes crate

This commit is contained in:
Alex Zenla 2024-02-12 17:01:47 +00:00
parent ddeab7610d
commit 341907a536
No known key found for this signature in database
GPG Key ID: 067B238899B51269
10 changed files with 86 additions and 65 deletions

View File

@ -53,6 +53,7 @@ udp-stream = "0.0.11"
smoltcp = "0.11.0" smoltcp = "0.11.0"
etherparse = "0.14.2" etherparse = "0.14.2"
async-trait = "0.1.77" async-trait = "0.1.77"
bytes = "1.5.0"
[workspace.dependencies.uuid] [workspace.dependencies.uuid]
version = "1.6.1" version = "1.6.1"

View File

@ -19,6 +19,7 @@ smoltcp = { workspace = true }
etherparse = { workspace = true } etherparse = { workspace = true }
async-trait = { workspace = true } async-trait = { workspace = true }
uuid = { workspace = true } uuid = { workspace = true }
bytes = { workspace = true }
[dependencies.advmac] [dependencies.advmac]
path = "../libs/advmac" path = "../libs/advmac"

View File

@ -6,6 +6,7 @@ use crate::proxynat::ProxyNatHandlerFactory;
use crate::raw_socket::{AsyncRawSocket, RawSocketProtocol}; use crate::raw_socket::{AsyncRawSocket, RawSocketProtocol};
use crate::vbridge::{BridgeJoinHandle, VirtualBridge}; use crate::vbridge::{BridgeJoinHandle, VirtualBridge};
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use bytes::BytesMut;
use etherparse::SlicedPacket; use etherparse::SlicedPacket;
use futures::TryStreamExt; use futures::TryStreamExt;
use log::{debug, info, trace, warn}; use log::{debug, info, trace, warn};
@ -26,13 +27,13 @@ pub struct NetworkBackend {
enum NetworkStackSelect<'a> { enum NetworkStackSelect<'a> {
Receive(&'a [u8]), Receive(&'a [u8]),
Send(Option<Vec<u8>>), Send(Option<BytesMut>),
Reclaim, Reclaim,
} }
struct NetworkStack<'a> { struct NetworkStack<'a> {
mtu: usize, mtu: usize,
tx: Receiver<Vec<u8>>, tx: Receiver<BytesMut>,
kdev: AsyncRawSocket, kdev: AsyncRawSocket,
udev: ChannelDevice, udev: ChannelDevice,
interface: Interface, interface: Interface,
@ -53,7 +54,7 @@ impl NetworkStack<'_> {
match what { match what {
NetworkStackSelect::Receive(packet) => { NetworkStackSelect::Receive(packet) => {
if let Err(error) = self.bridge.bridge_tx_sender.try_send(packet.to_vec()) { if let Err(error) = self.bridge.bridge_tx_sender.try_send(packet.into()) {
trace!("failed to send guest packet to bridge: {}", error); trace!("failed to send guest packet to bridge: {}", error);
} }
@ -63,7 +64,7 @@ impl NetworkStack<'_> {
debug!("router failed to process packet: {}", error); debug!("router failed to process packet: {}", error);
} }
self.udev.rx = Some(packet.raw.to_vec()); self.udev.rx = Some(packet.raw.into());
self.interface self.interface
.poll(Instant::now(), &mut self.udev, &mut self.sockets); .poll(Instant::now(), &mut self.udev, &mut self.sockets);
} }
@ -120,7 +121,7 @@ impl NetworkBackend {
]; ];
let mut kdev = AsyncRawSocket::bound_to_interface(&interface, RawSocketProtocol::Ethernet)?; let mut kdev = AsyncRawSocket::bound_to_interface(&interface, RawSocketProtocol::Ethernet)?;
let mtu = kdev.mtu_of_interface(&interface)?; let mtu = kdev.mtu_of_interface(&interface)?;
let (tx_sender, tx_receiver) = channel::<Vec<u8>>(100); let (tx_sender, tx_receiver) = channel::<BytesMut>(100);
let mut udev = ChannelDevice::new(mtu, Medium::Ethernet, tx_sender.clone()); let mut udev = ChannelDevice::new(mtu, Medium::Ethernet, tx_sender.clone());
let mac = self.metadata.gateway.mac; let mac = self.metadata.gateway.mac;
let nat = NatRouter::new(mtu, proxy, mac, addresses.clone(), tx_sender.clone()); let nat = NatRouter::new(mtu, proxy, mac, addresses.clone(), tx_sender.clone());

View File

@ -1,27 +1,32 @@
use bytes::BytesMut;
// Referenced https://github.com/vi/wgslirpy/blob/master/crates/libwgslirpy/src/channelized_smoltcp_device.rs // Referenced https://github.com/vi/wgslirpy/blob/master/crates/libwgslirpy/src/channelized_smoltcp_device.rs
use log::{debug, warn}; use log::{debug, warn};
use smoltcp::phy::{Checksum, Device, Medium}; use smoltcp::phy::{Checksum, Device, Medium};
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
const TEAR_OFF_BUFFER_SIZE: usize = 65536;
pub struct ChannelDevice { pub struct ChannelDevice {
pub mtu: usize, pub mtu: usize,
pub medium: Medium, pub medium: Medium,
pub tx: Sender<Vec<u8>>, pub tx: Sender<BytesMut>,
pub rx: Option<Vec<u8>>, pub rx: Option<BytesMut>,
tear_off_buffer: BytesMut,
} }
impl ChannelDevice { impl ChannelDevice {
pub fn new(mtu: usize, medium: Medium, tx: Sender<Vec<u8>>) -> Self { pub fn new(mtu: usize, medium: Medium, tx: Sender<BytesMut>) -> Self {
Self { Self {
mtu, mtu,
medium, medium,
tx, tx,
rx: None, rx: None,
tear_off_buffer: BytesMut::with_capacity(TEAR_OFF_BUFFER_SIZE),
} }
} }
} }
pub struct RxToken(pub Vec<u8>); pub struct RxToken(pub BytesMut);
impl Device for ChannelDevice { impl Device for ChannelDevice {
type RxToken<'a> = RxToken where Self: 'a; type RxToken<'a> = RxToken where Self: 'a;
@ -69,11 +74,16 @@ impl<'a> smoltcp::phy::TxToken for &'a mut ChannelDevice {
where where
F: FnOnce(&mut [u8]) -> R, F: FnOnce(&mut [u8]) -> R,
{ {
let mut buffer = vec![0u8; len]; self.tear_off_buffer.resize(len, 0);
let result = f(&mut buffer[..]); let result = f(&mut self.tear_off_buffer[..]);
if let Err(error) = self.tx.try_send(buffer) { let chunk = self.tear_off_buffer.split();
if let Err(error) = self.tx.try_send(chunk) {
warn!("failed to transmit packet: {}", error); warn!("failed to transmit packet: {}", error);
} }
if self.tear_off_buffer.capacity() < self.mtu {
self.tear_off_buffer = BytesMut::with_capacity(TEAR_OFF_BUFFER_SIZE);
}
result result
} }
} }

View File

@ -2,6 +2,7 @@ use crate::pkt::RecvPacket;
use crate::pkt::RecvPacketIp; use crate::pkt::RecvPacketIp;
use anyhow::Result; use anyhow::Result;
use async_trait::async_trait; use async_trait::async_trait;
use bytes::BytesMut;
use etherparse::Icmpv4Header; use etherparse::Icmpv4Header;
use etherparse::Icmpv4Type; use etherparse::Icmpv4Type;
use etherparse::Icmpv6Header; use etherparse::Icmpv6Header;
@ -54,12 +55,12 @@ impl Display for NatKey {
pub struct NatHandlerContext { pub struct NatHandlerContext {
pub mtu: usize, pub mtu: usize,
pub key: NatKey, pub key: NatKey,
tx_sender: Sender<Vec<u8>>, tx_sender: Sender<BytesMut>,
reclaim_sender: Sender<NatKey>, reclaim_sender: Sender<NatKey>,
} }
impl NatHandlerContext { impl NatHandlerContext {
pub fn try_send(&self, buffer: Vec<u8>) -> Result<()> { pub fn try_send(&self, buffer: BytesMut) -> Result<()> {
self.tx_sender.try_send(buffer)?; self.tx_sender.try_send(buffer)?;
Ok(()) Ok(())
} }
@ -104,7 +105,7 @@ pub struct NatRouter {
local_cidrs: Vec<IpCidr>, local_cidrs: Vec<IpCidr>,
factory: Box<dyn NatHandlerFactory>, factory: Box<dyn NatHandlerFactory>,
table: NatTable, table: NatTable,
tx_sender: Sender<Vec<u8>>, tx_sender: Sender<BytesMut>,
reclaim_sender: Sender<NatKey>, reclaim_sender: Sender<NatKey>,
reclaim_receiver: Receiver<NatKey>, reclaim_receiver: Receiver<NatKey>,
} }
@ -115,7 +116,7 @@ impl NatRouter {
factory: Box<dyn NatHandlerFactory>, factory: Box<dyn NatHandlerFactory>,
local_mac: EthernetAddress, local_mac: EthernetAddress,
local_cidrs: Vec<IpCidr>, local_cidrs: Vec<IpCidr>,
tx_sender: Sender<Vec<u8>>, tx_sender: Sender<BytesMut>,
) -> Self { ) -> Self {
let (reclaim_sender, reclaim_receiver) = channel(4); let (reclaim_sender, reclaim_receiver) = channel(4);
Self { Self {

View File

@ -5,6 +5,7 @@ use std::{
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use async_trait::async_trait; use async_trait::async_trait;
use bytes::BytesMut;
use etherparse::{ use etherparse::{
IcmpEchoHeader, Icmpv4Header, Icmpv4Type, Icmpv6Header, Icmpv6Type, IpNumber, Ipv4Slice, IcmpEchoHeader, Icmpv4Header, Icmpv4Type, Icmpv6Header, Icmpv6Type, IpNumber, Ipv4Slice,
Ipv6Slice, NetSlice, PacketBuilder, SlicedPacket, Ipv6Slice, NetSlice, PacketBuilder, SlicedPacket,
@ -25,7 +26,7 @@ const ICMP_PING_TIMEOUT_SECS: u64 = 20;
const ICMP_TIMEOUT_SECS: u64 = 30; const ICMP_TIMEOUT_SECS: u64 = 30;
pub struct ProxyIcmpHandler { pub struct ProxyIcmpHandler {
rx_sender: Sender<Vec<u8>>, rx_sender: Sender<BytesMut>,
} }
#[async_trait] #[async_trait]
@ -34,26 +35,26 @@ impl NatHandler for ProxyIcmpHandler {
if self.rx_sender.is_closed() { if self.rx_sender.is_closed() {
Ok(true) Ok(true)
} else { } else {
self.rx_sender.try_send(data.to_vec())?; self.rx_sender.try_send(data.into())?;
Ok(true) Ok(true)
} }
} }
} }
enum ProxyIcmpSelect { enum ProxyIcmpSelect {
Internal(Vec<u8>), Internal(BytesMut),
Close, Close,
} }
impl ProxyIcmpHandler { impl ProxyIcmpHandler {
pub fn new(rx_sender: Sender<Vec<u8>>) -> Self { pub fn new(rx_sender: Sender<BytesMut>) -> Self {
ProxyIcmpHandler { rx_sender } ProxyIcmpHandler { rx_sender }
} }
pub async fn spawn( pub async fn spawn(
&mut self, &mut self,
context: NatHandlerContext, context: NatHandlerContext,
rx_receiver: Receiver<Vec<u8>>, rx_receiver: Receiver<BytesMut>,
) -> Result<()> { ) -> Result<()> {
let client = IcmpClient::new(match context.key.external_ip.addr { let client = IcmpClient::new(match context.key.external_ip.addr {
IpAddress::Ipv4(_) => IcmpProtocol::Icmpv4, IpAddress::Ipv4(_) => IcmpProtocol::Icmpv4,
@ -69,7 +70,7 @@ impl ProxyIcmpHandler {
async fn process( async fn process(
client: IcmpClient, client: IcmpClient,
mut rx_receiver: Receiver<Vec<u8>>, mut rx_receiver: Receiver<BytesMut>,
context: NatHandlerContext, context: NatHandlerContext,
) -> Result<()> { ) -> Result<()> {
loop { loop {
@ -222,7 +223,7 @@ impl ProxyIcmpHandler {
let packet = packet.icmpv4_echo_reply(echo.id, echo.seq); let packet = packet.icmpv4_echo_reply(echo.id, echo.seq);
let mut buffer: Vec<u8> = Vec::new(); let mut buffer: Vec<u8> = Vec::new();
packet.write(&mut buffer, &payload)?; packet.write(&mut buffer, &payload)?;
if let Err(error) = context.try_send(buffer) { if let Err(error) = context.try_send(buffer.as_slice().into()) {
debug!("failed to transmit icmp packet: {}", error); debug!("failed to transmit icmp packet: {}", error);
} }
Ok(()) Ok(())
@ -265,7 +266,7 @@ impl ProxyIcmpHandler {
let packet = packet.icmpv6_echo_reply(echo.id, echo.seq); let packet = packet.icmpv6_echo_reply(echo.id, echo.seq);
let mut buffer: Vec<u8> = Vec::new(); let mut buffer: Vec<u8> = Vec::new();
packet.write(&mut buffer, &payload)?; packet.write(&mut buffer, &payload)?;
if let Err(error) = context.try_send(buffer) { if let Err(error) = context.try_send(buffer.as_slice().into()) {
debug!("failed to transmit icmp packet: {}", error); debug!("failed to transmit icmp packet: {}", error);
} }
Ok(()) Ok(())

View File

@ -1,5 +1,6 @@
use async_trait::async_trait; use async_trait::async_trait;
use bytes::BytesMut;
use log::warn; use log::warn;
use tokio::sync::mpsc::channel; use tokio::sync::mpsc::channel;
@ -16,6 +17,8 @@ mod icmp;
mod tcp; mod tcp;
mod udp; mod udp;
const RX_CHANNEL_BOUND: usize = 300;
pub struct ProxyNatHandlerFactory {} pub struct ProxyNatHandlerFactory {}
impl Default for ProxyNatHandlerFactory { impl Default for ProxyNatHandlerFactory {
@ -35,7 +38,7 @@ impl NatHandlerFactory for ProxyNatHandlerFactory {
async fn nat(&self, context: NatHandlerContext) -> Option<Box<dyn NatHandler>> { async fn nat(&self, context: NatHandlerContext) -> Option<Box<dyn NatHandler>> {
match context.key.protocol { match context.key.protocol {
NatKeyProtocol::Udp => { NatKeyProtocol::Udp => {
let (rx_sender, rx_receiver) = channel::<Vec<u8>>(4); let (rx_sender, rx_receiver) = channel::<BytesMut>(RX_CHANNEL_BOUND);
let mut handler = ProxyUdpHandler::new(rx_sender); let mut handler = ProxyUdpHandler::new(rx_sender);
if let Err(error) = handler.spawn(context, rx_receiver).await { if let Err(error) = handler.spawn(context, rx_receiver).await {
@ -47,7 +50,7 @@ impl NatHandlerFactory for ProxyNatHandlerFactory {
} }
NatKeyProtocol::Icmp => { NatKeyProtocol::Icmp => {
let (rx_sender, rx_receiver) = channel::<Vec<u8>>(300); let (rx_sender, rx_receiver) = channel::<BytesMut>(RX_CHANNEL_BOUND);
let mut handler = ProxyIcmpHandler::new(rx_sender); let mut handler = ProxyIcmpHandler::new(rx_sender);
if let Err(error) = handler.spawn(context, rx_receiver).await { if let Err(error) = handler.spawn(context, rx_receiver).await {
@ -59,7 +62,7 @@ impl NatHandlerFactory for ProxyNatHandlerFactory {
} }
NatKeyProtocol::Tcp => { NatKeyProtocol::Tcp => {
let (rx_sender, rx_receiver) = channel::<Vec<u8>>(300); let (rx_sender, rx_receiver) = channel::<BytesMut>(RX_CHANNEL_BOUND);
let mut handler = ProxyTcpHandler::new(rx_sender); let mut handler = ProxyTcpHandler::new(rx_sender);
if let Err(error) = handler.spawn(context, rx_receiver).await { if let Err(error) = handler.spawn(context, rx_receiver).await {

View File

@ -5,6 +5,7 @@ use std::{
use anyhow::Result; use anyhow::Result;
use async_trait::async_trait; use async_trait::async_trait;
use bytes::BytesMut;
use etherparse::{EtherType, Ethernet2Header}; use etherparse::{EtherType, Ethernet2Header};
use log::{debug, warn}; use log::{debug, warn};
use smoltcp::{ use smoltcp::{
@ -32,7 +33,7 @@ const TCP_ACCEPT_TIMEOUT_SECS: u64 = 120;
const TCP_DANGLE_TIMEOUT_SECS: u64 = 10; const TCP_DANGLE_TIMEOUT_SECS: u64 = 10;
pub struct ProxyTcpHandler { pub struct ProxyTcpHandler {
rx_sender: Sender<Vec<u8>>, rx_sender: Sender<BytesMut>,
} }
#[async_trait] #[async_trait]
@ -41,7 +42,7 @@ impl NatHandler for ProxyTcpHandler {
if self.rx_sender.is_closed() { if self.rx_sender.is_closed() {
Ok(false) Ok(false)
} else { } else {
self.rx_sender.try_send(data.to_vec())?; self.rx_sender.try_send(data.into())?;
Ok(true) Ok(true)
} }
} }
@ -49,8 +50,8 @@ impl NatHandler for ProxyTcpHandler {
#[derive(Debug)] #[derive(Debug)]
enum ProxyTcpAcceptSelect { enum ProxyTcpAcceptSelect {
Internal(Vec<u8>), Internal(BytesMut),
TxIpPacket(Vec<u8>), TxIpPacket(BytesMut),
TimePassed, TimePassed,
DoNothing, DoNothing,
Close, Close,
@ -60,8 +61,8 @@ enum ProxyTcpAcceptSelect {
enum ProxyTcpDataSelect { enum ProxyTcpDataSelect {
ExternalRecv(usize), ExternalRecv(usize),
ExternalSent(usize), ExternalSent(usize),
InternalRecv(Vec<u8>), InternalRecv(BytesMut),
TxIpPacket(Vec<u8>), TxIpPacket(BytesMut),
TimePassed, TimePassed,
DoNothing, DoNothing,
Close, Close,
@ -69,20 +70,20 @@ enum ProxyTcpDataSelect {
#[derive(Debug)] #[derive(Debug)]
enum ProxyTcpFinishSelect { enum ProxyTcpFinishSelect {
InternalRecv(Vec<u8>), InternalRecv(BytesMut),
TxIpPacket(Vec<u8>), TxIpPacket(BytesMut),
Close, Close,
} }
impl ProxyTcpHandler { impl ProxyTcpHandler {
pub fn new(rx_sender: Sender<Vec<u8>>) -> Self { pub fn new(rx_sender: Sender<BytesMut>) -> Self {
ProxyTcpHandler { rx_sender } ProxyTcpHandler { rx_sender }
} }
pub async fn spawn( pub async fn spawn(
&mut self, &mut self,
context: NatHandlerContext, context: NatHandlerContext,
rx_receiver: Receiver<Vec<u8>>, rx_receiver: Receiver<BytesMut>,
) -> Result<()> { ) -> Result<()> {
let external_addr = match context.key.external_ip.addr { let external_addr = match context.key.external_ip.addr {
IpAddress::Ipv4(addr) => { IpAddress::Ipv4(addr) => {
@ -105,9 +106,9 @@ impl ProxyTcpHandler {
async fn process( async fn process(
context: NatHandlerContext, context: NatHandlerContext,
mut external_socket: TcpStream, mut external_socket: TcpStream,
mut rx_receiver: Receiver<Vec<u8>>, mut rx_receiver: Receiver<BytesMut>,
) -> Result<()> { ) -> Result<()> {
let (ip_sender, mut ip_receiver) = channel::<Vec<u8>>(300); let (ip_sender, mut ip_receiver) = channel::<BytesMut>(300);
let mut external_buffer = vec![0u8; TCP_BUFFER_SIZE]; let mut external_buffer = vec![0u8; TCP_BUFFER_SIZE];
let mut device = ChannelDevice::new( let mut device = ChannelDevice::new(
@ -197,7 +198,7 @@ impl ProxyTcpHandler {
ProxyTcpAcceptSelect::Internal(data) => { ProxyTcpAcceptSelect::Internal(data) => {
let (_, payload) = Ethernet2Header::from_slice(&data)?; let (_, payload) = Ethernet2Header::from_slice(&data)?;
device.rx = Some(payload.to_vec()); device.rx = Some(payload.into());
iface.poll(Instant::now(), &mut device, &mut sockets); iface.poll(Instant::now(), &mut device, &mut sockets);
} }
@ -213,7 +214,7 @@ impl ProxyTcpHandler {
}; };
header.write(&mut buffer)?; header.write(&mut buffer)?;
buffer.extend_from_slice(&payload); buffer.extend_from_slice(&payload);
if let Err(error) = context.try_send(buffer) { if let Err(error) = context.try_send(buffer.as_slice().into()) {
debug!("failed to transmit tcp packet: {}", error); debug!("failed to transmit tcp packet: {}", error);
} }
} }
@ -370,7 +371,7 @@ impl ProxyTcpHandler {
ProxyTcpDataSelect::InternalRecv(data) => { ProxyTcpDataSelect::InternalRecv(data) => {
let (_, payload) = Ethernet2Header::from_slice(&data)?; let (_, payload) = Ethernet2Header::from_slice(&data)?;
device.rx = Some(payload.to_vec()); device.rx = Some(payload.into());
iface.poll(Instant::now(), &mut device, &mut sockets); iface.poll(Instant::now(), &mut device, &mut sockets);
} }
@ -386,7 +387,7 @@ impl ProxyTcpHandler {
}; };
header.write(&mut buffer)?; header.write(&mut buffer)?;
buffer.extend_from_slice(&payload); buffer.extend_from_slice(&payload);
if let Err(error) = context.try_send(buffer) { if let Err(error) = context.try_send(buffer.as_slice().into()) {
debug!("failed to transmit tcp packet: {}", error); debug!("failed to transmit tcp packet: {}", error);
} }
} }
@ -430,7 +431,7 @@ impl ProxyTcpHandler {
match selection { match selection {
ProxyTcpFinishSelect::InternalRecv(data) => { ProxyTcpFinishSelect::InternalRecv(data) => {
let (_, payload) = Ethernet2Header::from_slice(&data)?; let (_, payload) = Ethernet2Header::from_slice(&data)?;
device.rx = Some(payload.to_vec()); device.rx = Some(payload.into());
iface.poll(Instant::now(), &mut device, &mut sockets); iface.poll(Instant::now(), &mut device, &mut sockets);
} }
@ -446,7 +447,7 @@ impl ProxyTcpHandler {
}; };
header.write(&mut buffer)?; header.write(&mut buffer)?;
buffer.extend_from_slice(&payload); buffer.extend_from_slice(&payload);
if let Err(error) = context.try_send(buffer) { if let Err(error) = context.try_send(buffer.as_slice().into()) {
debug!("failed to transmit tcp packet: {}", error); debug!("failed to transmit tcp packet: {}", error);
} }
} }

View File

@ -5,6 +5,7 @@ use std::{
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use async_trait::async_trait; use async_trait::async_trait;
use bytes::BytesMut;
use etherparse::{PacketBuilder, SlicedPacket, UdpSlice}; use etherparse::{PacketBuilder, SlicedPacket, UdpSlice};
use log::{debug, warn}; use log::{debug, warn};
use smoltcp::wire::IpAddress; use smoltcp::wire::IpAddress;
@ -20,7 +21,7 @@ use crate::nat::{NatHandler, NatHandlerContext};
const UDP_TIMEOUT_SECS: u64 = 60; const UDP_TIMEOUT_SECS: u64 = 60;
pub struct ProxyUdpHandler { pub struct ProxyUdpHandler {
rx_sender: Sender<Vec<u8>>, rx_sender: Sender<BytesMut>,
} }
#[async_trait] #[async_trait]
@ -29,7 +30,7 @@ impl NatHandler for ProxyUdpHandler {
if self.rx_sender.is_closed() { if self.rx_sender.is_closed() {
Ok(true) Ok(true)
} else { } else {
self.rx_sender.try_send(data.to_vec())?; self.rx_sender.try_send(data.into())?;
Ok(true) Ok(true)
} }
} }
@ -37,19 +38,19 @@ impl NatHandler for ProxyUdpHandler {
enum ProxyUdpSelect { enum ProxyUdpSelect {
External(usize), External(usize),
Internal(Vec<u8>), Internal(BytesMut),
Close, Close,
} }
impl ProxyUdpHandler { impl ProxyUdpHandler {
pub fn new(rx_sender: Sender<Vec<u8>>) -> Self { pub fn new(rx_sender: Sender<BytesMut>) -> Self {
ProxyUdpHandler { rx_sender } ProxyUdpHandler { rx_sender }
} }
pub async fn spawn( pub async fn spawn(
&mut self, &mut self,
context: NatHandlerContext, context: NatHandlerContext,
rx_receiver: Receiver<Vec<u8>>, rx_receiver: Receiver<BytesMut>,
) -> Result<()> { ) -> Result<()> {
let external_addr = match context.key.external_ip.addr { let external_addr = match context.key.external_ip.addr {
IpAddress::Ipv4(addr) => { IpAddress::Ipv4(addr) => {
@ -72,7 +73,7 @@ impl ProxyUdpHandler {
async fn process( async fn process(
context: NatHandlerContext, context: NatHandlerContext,
mut socket: UdpStream, mut socket: UdpStream,
mut rx_receiver: Receiver<Vec<u8>>, mut rx_receiver: Receiver<BytesMut>,
) -> Result<()> { ) -> Result<()> {
let mut external_buffer = vec![0u8; 2048]; let mut external_buffer = vec![0u8; 2048];
@ -108,7 +109,7 @@ impl ProxyUdpHandler {
packet.udp(context.key.external_ip.port, context.key.client_ip.port); packet.udp(context.key.external_ip.port, context.key.client_ip.port);
let mut buffer: Vec<u8> = Vec::new(); let mut buffer: Vec<u8> = Vec::new();
packet.write(&mut buffer, data)?; packet.write(&mut buffer, data)?;
if let Err(error) = context.try_send(buffer) { if let Err(error) = context.try_send(buffer.as_slice().into()) {
debug!("failed to transmit udp packet: {}", error); debug!("failed to transmit udp packet: {}", error);
} }
} }

View File

@ -1,4 +1,5 @@
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use bytes::BytesMut;
use etherparse::Ethernet2Header; use etherparse::Ethernet2Header;
use log::{debug, trace, warn}; use log::{debug, trace, warn};
use smoltcp::wire::EthernetAddress; use smoltcp::wire::EthernetAddress;
@ -26,33 +27,33 @@ const BROADCAST_RX_QUEUE_LEN: usize = 4;
#[derive(Debug)] #[derive(Debug)]
struct BridgeMember { struct BridgeMember {
pub bridge_rx_sender: Sender<Vec<u8>>, pub bridge_rx_sender: Sender<BytesMut>,
} }
pub struct BridgeJoinHandle { pub struct BridgeJoinHandle {
pub bridge_tx_sender: Sender<Vec<u8>>, pub bridge_tx_sender: Sender<BytesMut>,
pub bridge_rx_receiver: Receiver<Vec<u8>>, pub bridge_rx_receiver: Receiver<BytesMut>,
pub broadcast_rx_receiver: BroadcastReceiver<Vec<u8>>, pub broadcast_rx_receiver: BroadcastReceiver<BytesMut>,
} }
type VirtualBridgeMemberMap = Arc<Mutex<HashMap<[u8; 6], BridgeMember>>>; type VirtualBridgeMemberMap = Arc<Mutex<HashMap<[u8; 6], BridgeMember>>>;
#[derive(Clone)] #[derive(Clone)]
pub struct VirtualBridge { pub struct VirtualBridge {
bridge_tx_sender: Sender<Vec<u8>>,
members: VirtualBridgeMemberMap, members: VirtualBridgeMemberMap,
broadcast_rx_sender: BroadcastSender<Vec<u8>>, bridge_tx_sender: Sender<BytesMut>,
broadcast_rx_sender: BroadcastSender<BytesMut>,
_task: Arc<JoinHandle<()>>, _task: Arc<JoinHandle<()>>,
} }
enum VirtualBridgeSelect { enum VirtualBridgeSelect {
BroadcastSent(Option<Vec<u8>>), BroadcastSent(Option<BytesMut>),
PacketReceived(Option<Vec<u8>>), PacketReceived(Option<BytesMut>),
} }
impl VirtualBridge { impl VirtualBridge {
pub fn new() -> Result<VirtualBridge> { pub fn new() -> Result<VirtualBridge> {
let (bridge_tx_sender, bridge_tx_receiver) = channel::<Vec<u8>>(BRIDGE_TX_QUEUE_LEN); let (bridge_tx_sender, bridge_tx_receiver) = channel::<BytesMut>(BRIDGE_TX_QUEUE_LEN);
let (broadcast_rx_sender, broadcast_rx_receiver) = let (broadcast_rx_sender, broadcast_rx_receiver) =
broadcast_channel(BROADCAST_RX_QUEUE_LEN); broadcast_channel(BROADCAST_RX_QUEUE_LEN);
@ -83,7 +84,7 @@ impl VirtualBridge {
} }
pub async fn join(&self, mac: EthernetAddress) -> Result<BridgeJoinHandle> { pub async fn join(&self, mac: EthernetAddress) -> Result<BridgeJoinHandle> {
let (bridge_rx_sender, bridge_rx_receiver) = channel::<Vec<u8>>(BRIDGE_RX_QUEUE_LEN); let (bridge_rx_sender, bridge_rx_receiver) = channel::<BytesMut>(BRIDGE_RX_QUEUE_LEN);
let member = BridgeMember { bridge_rx_sender }; let member = BridgeMember { bridge_rx_sender };
match self.members.lock().await.entry(mac.0) { match self.members.lock().await.entry(mac.0) {
@ -107,9 +108,9 @@ impl VirtualBridge {
async fn process( async fn process(
members: VirtualBridgeMemberMap, members: VirtualBridgeMemberMap,
mut bridge_tx_receiver: Receiver<Vec<u8>>, mut bridge_tx_receiver: Receiver<BytesMut>,
broadcast_rx_sender: BroadcastSender<Vec<u8>>, broadcast_rx_sender: BroadcastSender<BytesMut>,
mut broadcast_rx_receiver: BroadcastReceiver<Vec<u8>>, mut broadcast_rx_receiver: BroadcastReceiver<BytesMut>,
) -> Result<()> { ) -> Result<()> {
loop { loop {
let selection = select! { let selection = select! {