diff --git a/Cargo.toml b/Cargo.toml index d8bdbc2..fa93461 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,6 +53,7 @@ udp-stream = "0.0.11" smoltcp = "0.11.0" etherparse = "0.14.2" async-trait = "0.1.77" +bytes = "1.5.0" [workspace.dependencies.uuid] version = "1.6.1" diff --git a/network/Cargo.toml b/network/Cargo.toml index f401bcf..5444608 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -19,6 +19,7 @@ smoltcp = { workspace = true } etherparse = { workspace = true } async-trait = { workspace = true } uuid = { workspace = true } +bytes = { workspace = true } [dependencies.advmac] path = "../libs/advmac" diff --git a/network/src/backend.rs b/network/src/backend.rs index 233b462..8d32813 100644 --- a/network/src/backend.rs +++ b/network/src/backend.rs @@ -6,6 +6,7 @@ use crate::proxynat::ProxyNatHandlerFactory; use crate::raw_socket::{AsyncRawSocket, RawSocketProtocol}; use crate::vbridge::{BridgeJoinHandle, VirtualBridge}; use anyhow::{anyhow, Result}; +use bytes::BytesMut; use etherparse::SlicedPacket; use futures::TryStreamExt; use log::{debug, info, trace, warn}; @@ -26,13 +27,13 @@ pub struct NetworkBackend { enum NetworkStackSelect<'a> { Receive(&'a [u8]), - Send(Option>), + Send(Option), Reclaim, } struct NetworkStack<'a> { mtu: usize, - tx: Receiver>, + tx: Receiver, kdev: AsyncRawSocket, udev: ChannelDevice, interface: Interface, @@ -53,7 +54,7 @@ impl NetworkStack<'_> { match what { NetworkStackSelect::Receive(packet) => { - if let Err(error) = self.bridge.bridge_tx_sender.try_send(packet.to_vec()) { + if let Err(error) = self.bridge.bridge_tx_sender.try_send(packet.into()) { trace!("failed to send guest packet to bridge: {}", error); } @@ -63,7 +64,7 @@ impl NetworkStack<'_> { debug!("router failed to process packet: {}", error); } - self.udev.rx = Some(packet.raw.to_vec()); + self.udev.rx = Some(packet.raw.into()); self.interface .poll(Instant::now(), &mut self.udev, &mut self.sockets); } @@ -120,7 +121,7 @@ impl NetworkBackend { ]; let mut kdev = AsyncRawSocket::bound_to_interface(&interface, RawSocketProtocol::Ethernet)?; let mtu = kdev.mtu_of_interface(&interface)?; - let (tx_sender, tx_receiver) = channel::>(100); + let (tx_sender, tx_receiver) = channel::(100); let mut udev = ChannelDevice::new(mtu, Medium::Ethernet, tx_sender.clone()); let mac = self.metadata.gateway.mac; let nat = NatRouter::new(mtu, proxy, mac, addresses.clone(), tx_sender.clone()); diff --git a/network/src/chandev.rs b/network/src/chandev.rs index 17bd4f1..4369ec0 100644 --- a/network/src/chandev.rs +++ b/network/src/chandev.rs @@ -1,27 +1,32 @@ +use bytes::BytesMut; // Referenced https://github.com/vi/wgslirpy/blob/master/crates/libwgslirpy/src/channelized_smoltcp_device.rs use log::{debug, warn}; use smoltcp::phy::{Checksum, Device, Medium}; use tokio::sync::mpsc::Sender; +const TEAR_OFF_BUFFER_SIZE: usize = 65536; + pub struct ChannelDevice { pub mtu: usize, pub medium: Medium, - pub tx: Sender>, - pub rx: Option>, + pub tx: Sender, + pub rx: Option, + tear_off_buffer: BytesMut, } impl ChannelDevice { - pub fn new(mtu: usize, medium: Medium, tx: Sender>) -> Self { + pub fn new(mtu: usize, medium: Medium, tx: Sender) -> Self { Self { mtu, medium, tx, rx: None, + tear_off_buffer: BytesMut::with_capacity(TEAR_OFF_BUFFER_SIZE), } } } -pub struct RxToken(pub Vec); +pub struct RxToken(pub BytesMut); impl Device for ChannelDevice { type RxToken<'a> = RxToken where Self: 'a; @@ -69,11 +74,16 @@ impl<'a> smoltcp::phy::TxToken for &'a mut ChannelDevice { where F: FnOnce(&mut [u8]) -> R, { - let mut buffer = vec![0u8; len]; - let result = f(&mut buffer[..]); - if let Err(error) = self.tx.try_send(buffer) { + self.tear_off_buffer.resize(len, 0); + let result = f(&mut self.tear_off_buffer[..]); + let chunk = self.tear_off_buffer.split(); + if let Err(error) = self.tx.try_send(chunk) { warn!("failed to transmit packet: {}", error); } + + if self.tear_off_buffer.capacity() < self.mtu { + self.tear_off_buffer = BytesMut::with_capacity(TEAR_OFF_BUFFER_SIZE); + } result } } diff --git a/network/src/nat.rs b/network/src/nat.rs index 7401f66..af0907a 100644 --- a/network/src/nat.rs +++ b/network/src/nat.rs @@ -2,6 +2,7 @@ use crate::pkt::RecvPacket; use crate::pkt::RecvPacketIp; use anyhow::Result; use async_trait::async_trait; +use bytes::BytesMut; use etherparse::Icmpv4Header; use etherparse::Icmpv4Type; use etherparse::Icmpv6Header; @@ -54,12 +55,12 @@ impl Display for NatKey { pub struct NatHandlerContext { pub mtu: usize, pub key: NatKey, - tx_sender: Sender>, + tx_sender: Sender, reclaim_sender: Sender, } impl NatHandlerContext { - pub fn try_send(&self, buffer: Vec) -> Result<()> { + pub fn try_send(&self, buffer: BytesMut) -> Result<()> { self.tx_sender.try_send(buffer)?; Ok(()) } @@ -104,7 +105,7 @@ pub struct NatRouter { local_cidrs: Vec, factory: Box, table: NatTable, - tx_sender: Sender>, + tx_sender: Sender, reclaim_sender: Sender, reclaim_receiver: Receiver, } @@ -115,7 +116,7 @@ impl NatRouter { factory: Box, local_mac: EthernetAddress, local_cidrs: Vec, - tx_sender: Sender>, + tx_sender: Sender, ) -> Self { let (reclaim_sender, reclaim_receiver) = channel(4); Self { diff --git a/network/src/proxynat/icmp.rs b/network/src/proxynat/icmp.rs index 9bb442b..f5e5765 100644 --- a/network/src/proxynat/icmp.rs +++ b/network/src/proxynat/icmp.rs @@ -5,6 +5,7 @@ use std::{ use anyhow::{anyhow, Result}; use async_trait::async_trait; +use bytes::BytesMut; use etherparse::{ IcmpEchoHeader, Icmpv4Header, Icmpv4Type, Icmpv6Header, Icmpv6Type, IpNumber, Ipv4Slice, Ipv6Slice, NetSlice, PacketBuilder, SlicedPacket, @@ -25,7 +26,7 @@ const ICMP_PING_TIMEOUT_SECS: u64 = 20; const ICMP_TIMEOUT_SECS: u64 = 30; pub struct ProxyIcmpHandler { - rx_sender: Sender>, + rx_sender: Sender, } #[async_trait] @@ -34,26 +35,26 @@ impl NatHandler for ProxyIcmpHandler { if self.rx_sender.is_closed() { Ok(true) } else { - self.rx_sender.try_send(data.to_vec())?; + self.rx_sender.try_send(data.into())?; Ok(true) } } } enum ProxyIcmpSelect { - Internal(Vec), + Internal(BytesMut), Close, } impl ProxyIcmpHandler { - pub fn new(rx_sender: Sender>) -> Self { + pub fn new(rx_sender: Sender) -> Self { ProxyIcmpHandler { rx_sender } } pub async fn spawn( &mut self, context: NatHandlerContext, - rx_receiver: Receiver>, + rx_receiver: Receiver, ) -> Result<()> { let client = IcmpClient::new(match context.key.external_ip.addr { IpAddress::Ipv4(_) => IcmpProtocol::Icmpv4, @@ -69,7 +70,7 @@ impl ProxyIcmpHandler { async fn process( client: IcmpClient, - mut rx_receiver: Receiver>, + mut rx_receiver: Receiver, context: NatHandlerContext, ) -> Result<()> { loop { @@ -222,7 +223,7 @@ impl ProxyIcmpHandler { let packet = packet.icmpv4_echo_reply(echo.id, echo.seq); let mut buffer: Vec = Vec::new(); packet.write(&mut buffer, &payload)?; - if let Err(error) = context.try_send(buffer) { + if let Err(error) = context.try_send(buffer.as_slice().into()) { debug!("failed to transmit icmp packet: {}", error); } Ok(()) @@ -265,7 +266,7 @@ impl ProxyIcmpHandler { let packet = packet.icmpv6_echo_reply(echo.id, echo.seq); let mut buffer: Vec = Vec::new(); packet.write(&mut buffer, &payload)?; - if let Err(error) = context.try_send(buffer) { + if let Err(error) = context.try_send(buffer.as_slice().into()) { debug!("failed to transmit icmp packet: {}", error); } Ok(()) diff --git a/network/src/proxynat/mod.rs b/network/src/proxynat/mod.rs index 7cadff6..40784f4 100644 --- a/network/src/proxynat/mod.rs +++ b/network/src/proxynat/mod.rs @@ -1,5 +1,6 @@ use async_trait::async_trait; +use bytes::BytesMut; use log::warn; use tokio::sync::mpsc::channel; @@ -16,6 +17,8 @@ mod icmp; mod tcp; mod udp; +const RX_CHANNEL_BOUND: usize = 300; + pub struct ProxyNatHandlerFactory {} impl Default for ProxyNatHandlerFactory { @@ -35,7 +38,7 @@ impl NatHandlerFactory for ProxyNatHandlerFactory { async fn nat(&self, context: NatHandlerContext) -> Option> { match context.key.protocol { NatKeyProtocol::Udp => { - let (rx_sender, rx_receiver) = channel::>(4); + let (rx_sender, rx_receiver) = channel::(RX_CHANNEL_BOUND); let mut handler = ProxyUdpHandler::new(rx_sender); if let Err(error) = handler.spawn(context, rx_receiver).await { @@ -47,7 +50,7 @@ impl NatHandlerFactory for ProxyNatHandlerFactory { } NatKeyProtocol::Icmp => { - let (rx_sender, rx_receiver) = channel::>(300); + let (rx_sender, rx_receiver) = channel::(RX_CHANNEL_BOUND); let mut handler = ProxyIcmpHandler::new(rx_sender); if let Err(error) = handler.spawn(context, rx_receiver).await { @@ -59,7 +62,7 @@ impl NatHandlerFactory for ProxyNatHandlerFactory { } NatKeyProtocol::Tcp => { - let (rx_sender, rx_receiver) = channel::>(300); + let (rx_sender, rx_receiver) = channel::(RX_CHANNEL_BOUND); let mut handler = ProxyTcpHandler::new(rx_sender); if let Err(error) = handler.spawn(context, rx_receiver).await { diff --git a/network/src/proxynat/tcp.rs b/network/src/proxynat/tcp.rs index f835c8a..4fbeafd 100644 --- a/network/src/proxynat/tcp.rs +++ b/network/src/proxynat/tcp.rs @@ -5,6 +5,7 @@ use std::{ use anyhow::Result; use async_trait::async_trait; +use bytes::BytesMut; use etherparse::{EtherType, Ethernet2Header}; use log::{debug, warn}; use smoltcp::{ @@ -32,7 +33,7 @@ const TCP_ACCEPT_TIMEOUT_SECS: u64 = 120; const TCP_DANGLE_TIMEOUT_SECS: u64 = 10; pub struct ProxyTcpHandler { - rx_sender: Sender>, + rx_sender: Sender, } #[async_trait] @@ -41,7 +42,7 @@ impl NatHandler for ProxyTcpHandler { if self.rx_sender.is_closed() { Ok(false) } else { - self.rx_sender.try_send(data.to_vec())?; + self.rx_sender.try_send(data.into())?; Ok(true) } } @@ -49,8 +50,8 @@ impl NatHandler for ProxyTcpHandler { #[derive(Debug)] enum ProxyTcpAcceptSelect { - Internal(Vec), - TxIpPacket(Vec), + Internal(BytesMut), + TxIpPacket(BytesMut), TimePassed, DoNothing, Close, @@ -60,8 +61,8 @@ enum ProxyTcpAcceptSelect { enum ProxyTcpDataSelect { ExternalRecv(usize), ExternalSent(usize), - InternalRecv(Vec), - TxIpPacket(Vec), + InternalRecv(BytesMut), + TxIpPacket(BytesMut), TimePassed, DoNothing, Close, @@ -69,20 +70,20 @@ enum ProxyTcpDataSelect { #[derive(Debug)] enum ProxyTcpFinishSelect { - InternalRecv(Vec), - TxIpPacket(Vec), + InternalRecv(BytesMut), + TxIpPacket(BytesMut), Close, } impl ProxyTcpHandler { - pub fn new(rx_sender: Sender>) -> Self { + pub fn new(rx_sender: Sender) -> Self { ProxyTcpHandler { rx_sender } } pub async fn spawn( &mut self, context: NatHandlerContext, - rx_receiver: Receiver>, + rx_receiver: Receiver, ) -> Result<()> { let external_addr = match context.key.external_ip.addr { IpAddress::Ipv4(addr) => { @@ -105,9 +106,9 @@ impl ProxyTcpHandler { async fn process( context: NatHandlerContext, mut external_socket: TcpStream, - mut rx_receiver: Receiver>, + mut rx_receiver: Receiver, ) -> Result<()> { - let (ip_sender, mut ip_receiver) = channel::>(300); + let (ip_sender, mut ip_receiver) = channel::(300); let mut external_buffer = vec![0u8; TCP_BUFFER_SIZE]; let mut device = ChannelDevice::new( @@ -197,7 +198,7 @@ impl ProxyTcpHandler { ProxyTcpAcceptSelect::Internal(data) => { let (_, payload) = Ethernet2Header::from_slice(&data)?; - device.rx = Some(payload.to_vec()); + device.rx = Some(payload.into()); iface.poll(Instant::now(), &mut device, &mut sockets); } @@ -213,7 +214,7 @@ impl ProxyTcpHandler { }; header.write(&mut buffer)?; buffer.extend_from_slice(&payload); - if let Err(error) = context.try_send(buffer) { + if let Err(error) = context.try_send(buffer.as_slice().into()) { debug!("failed to transmit tcp packet: {}", error); } } @@ -370,7 +371,7 @@ impl ProxyTcpHandler { ProxyTcpDataSelect::InternalRecv(data) => { let (_, payload) = Ethernet2Header::from_slice(&data)?; - device.rx = Some(payload.to_vec()); + device.rx = Some(payload.into()); iface.poll(Instant::now(), &mut device, &mut sockets); } @@ -386,7 +387,7 @@ impl ProxyTcpHandler { }; header.write(&mut buffer)?; buffer.extend_from_slice(&payload); - if let Err(error) = context.try_send(buffer) { + if let Err(error) = context.try_send(buffer.as_slice().into()) { debug!("failed to transmit tcp packet: {}", error); } } @@ -430,7 +431,7 @@ impl ProxyTcpHandler { match selection { ProxyTcpFinishSelect::InternalRecv(data) => { let (_, payload) = Ethernet2Header::from_slice(&data)?; - device.rx = Some(payload.to_vec()); + device.rx = Some(payload.into()); iface.poll(Instant::now(), &mut device, &mut sockets); } @@ -446,7 +447,7 @@ impl ProxyTcpHandler { }; header.write(&mut buffer)?; buffer.extend_from_slice(&payload); - if let Err(error) = context.try_send(buffer) { + if let Err(error) = context.try_send(buffer.as_slice().into()) { debug!("failed to transmit tcp packet: {}", error); } } diff --git a/network/src/proxynat/udp.rs b/network/src/proxynat/udp.rs index 5d8ef36..fb1ecd7 100644 --- a/network/src/proxynat/udp.rs +++ b/network/src/proxynat/udp.rs @@ -5,6 +5,7 @@ use std::{ use anyhow::{anyhow, Result}; use async_trait::async_trait; +use bytes::BytesMut; use etherparse::{PacketBuilder, SlicedPacket, UdpSlice}; use log::{debug, warn}; use smoltcp::wire::IpAddress; @@ -20,7 +21,7 @@ use crate::nat::{NatHandler, NatHandlerContext}; const UDP_TIMEOUT_SECS: u64 = 60; pub struct ProxyUdpHandler { - rx_sender: Sender>, + rx_sender: Sender, } #[async_trait] @@ -29,7 +30,7 @@ impl NatHandler for ProxyUdpHandler { if self.rx_sender.is_closed() { Ok(true) } else { - self.rx_sender.try_send(data.to_vec())?; + self.rx_sender.try_send(data.into())?; Ok(true) } } @@ -37,19 +38,19 @@ impl NatHandler for ProxyUdpHandler { enum ProxyUdpSelect { External(usize), - Internal(Vec), + Internal(BytesMut), Close, } impl ProxyUdpHandler { - pub fn new(rx_sender: Sender>) -> Self { + pub fn new(rx_sender: Sender) -> Self { ProxyUdpHandler { rx_sender } } pub async fn spawn( &mut self, context: NatHandlerContext, - rx_receiver: Receiver>, + rx_receiver: Receiver, ) -> Result<()> { let external_addr = match context.key.external_ip.addr { IpAddress::Ipv4(addr) => { @@ -72,7 +73,7 @@ impl ProxyUdpHandler { async fn process( context: NatHandlerContext, mut socket: UdpStream, - mut rx_receiver: Receiver>, + mut rx_receiver: Receiver, ) -> Result<()> { let mut external_buffer = vec![0u8; 2048]; @@ -108,7 +109,7 @@ impl ProxyUdpHandler { packet.udp(context.key.external_ip.port, context.key.client_ip.port); let mut buffer: Vec = Vec::new(); packet.write(&mut buffer, data)?; - if let Err(error) = context.try_send(buffer) { + if let Err(error) = context.try_send(buffer.as_slice().into()) { debug!("failed to transmit udp packet: {}", error); } } diff --git a/network/src/vbridge.rs b/network/src/vbridge.rs index ba52f17..0bc6d54 100644 --- a/network/src/vbridge.rs +++ b/network/src/vbridge.rs @@ -1,4 +1,5 @@ use anyhow::{anyhow, Result}; +use bytes::BytesMut; use etherparse::Ethernet2Header; use log::{debug, trace, warn}; use smoltcp::wire::EthernetAddress; @@ -26,33 +27,33 @@ const BROADCAST_RX_QUEUE_LEN: usize = 4; #[derive(Debug)] struct BridgeMember { - pub bridge_rx_sender: Sender>, + pub bridge_rx_sender: Sender, } pub struct BridgeJoinHandle { - pub bridge_tx_sender: Sender>, - pub bridge_rx_receiver: Receiver>, - pub broadcast_rx_receiver: BroadcastReceiver>, + pub bridge_tx_sender: Sender, + pub bridge_rx_receiver: Receiver, + pub broadcast_rx_receiver: BroadcastReceiver, } type VirtualBridgeMemberMap = Arc>>; #[derive(Clone)] pub struct VirtualBridge { - bridge_tx_sender: Sender>, members: VirtualBridgeMemberMap, - broadcast_rx_sender: BroadcastSender>, + bridge_tx_sender: Sender, + broadcast_rx_sender: BroadcastSender, _task: Arc>, } enum VirtualBridgeSelect { - BroadcastSent(Option>), - PacketReceived(Option>), + BroadcastSent(Option), + PacketReceived(Option), } impl VirtualBridge { pub fn new() -> Result { - let (bridge_tx_sender, bridge_tx_receiver) = channel::>(BRIDGE_TX_QUEUE_LEN); + let (bridge_tx_sender, bridge_tx_receiver) = channel::(BRIDGE_TX_QUEUE_LEN); let (broadcast_rx_sender, broadcast_rx_receiver) = broadcast_channel(BROADCAST_RX_QUEUE_LEN); @@ -83,7 +84,7 @@ impl VirtualBridge { } pub async fn join(&self, mac: EthernetAddress) -> Result { - let (bridge_rx_sender, bridge_rx_receiver) = channel::>(BRIDGE_RX_QUEUE_LEN); + let (bridge_rx_sender, bridge_rx_receiver) = channel::(BRIDGE_RX_QUEUE_LEN); let member = BridgeMember { bridge_rx_sender }; match self.members.lock().await.entry(mac.0) { @@ -107,9 +108,9 @@ impl VirtualBridge { async fn process( members: VirtualBridgeMemberMap, - mut bridge_tx_receiver: Receiver>, - broadcast_rx_sender: BroadcastSender>, - mut broadcast_rx_receiver: BroadcastReceiver>, + mut bridge_tx_receiver: Receiver, + broadcast_rx_sender: BroadcastSender, + mut broadcast_rx_receiver: BroadcastReceiver, ) -> Result<()> { loop { let selection = select! {