From f96f9d8abf0856aa6d46e2dd81fb6d2c742fc330 Mon Sep 17 00:00:00 2001 From: Alex Zenla Date: Sun, 11 Feb 2024 11:53:59 +0000 Subject: [PATCH] network: performance improvements across NAT stack --- network/src/backend.rs | 15 +++-- network/src/nat.rs | 43 +++++++------ network/src/proxynat/icmp.rs | 10 ++- network/src/proxynat/mod.rs | 4 +- network/src/proxynat/tcp.rs | 114 ++++++++++++++++++++++++++++------- network/src/proxynat/udp.rs | 10 ++- 6 files changed, 140 insertions(+), 56 deletions(-) diff --git a/network/src/backend.rs b/network/src/backend.rs index b99a29f..0c0b440 100644 --- a/network/src/backend.rs +++ b/network/src/backend.rs @@ -5,7 +5,7 @@ use crate::raw_socket::{AsyncRawSocket, RawSocketProtocol}; use advmac::MacAddr6; use anyhow::{anyhow, Result}; use futures::TryStreamExt; -use log::warn; +use log::debug; use smoltcp::iface::{Config, Interface, SocketSet}; use smoltcp::phy::Medium; use smoltcp::time::Instant; @@ -57,13 +57,12 @@ impl NetworkStack<'_> { NetworkStackSelect::Receive(packet) => { if let Err(error) = self.router.process(packet).await { - warn!("router failed to process packet: {}", error); + debug!("router failed to process packet: {}", error); } self.udev.rx = Some(packet.to_vec()); - let timestamp = Instant::now(); self.interface - .poll(timestamp, &mut self.udev, &mut self.sockets); + .poll(Instant::now(), &mut self.udev, &mut self.sockets); } NetworkStackSelect::Reclaim => {} @@ -128,9 +127,13 @@ impl NetworkBackend { let mut kdev = AsyncRawSocket::bound_to_interface(&self.interface, RawSocketProtocol::Ethernet)?; let mtu = kdev.mtu_of_interface(&self.interface)?; - let (tx_sender, tx_receiver) = channel::>(4); + let (tx_sender, tx_receiver) = channel::>(100); let mut udev = ChannelDevice::new(mtu, Medium::Ethernet, tx_sender.clone()); - let mac = self.force_mac_address.unwrap_or_else(MacAddr6::random); + let mac = self.force_mac_address.unwrap_or_else(|| { + let mut mac = MacAddr6::random(); + mac.set_local(true); + mac + }); let mac = smoltcp::wire::EthernetAddress(mac.to_array()); let nat = NatRouter::new(mtu, proxy, mac, addresses.clone(), tx_sender.clone()); let mac = HardwareAddress::Ethernet(mac); diff --git a/network/src/nat.rs b/network/src/nat.rs index 4ad3fe2..fb39ca4 100644 --- a/network/src/nat.rs +++ b/network/src/nat.rs @@ -6,12 +6,12 @@ use etherparse::Icmpv4Type; use etherparse::Icmpv6Header; use etherparse::Icmpv6Type; use etherparse::IpNumber; -use etherparse::IpPayloadSlice; -use etherparse::Ipv4Slice; -use etherparse::Ipv6Slice; +use etherparse::LaxIpPayloadSlice; +use etherparse::LaxIpv4Slice; +use etherparse::LaxIpv6Slice; +use etherparse::LaxNetSlice; +use etherparse::LaxSlicedPacket; use etherparse::LinkSlice; -use etherparse::NetSlice; -use etherparse::SlicedPacket; use etherparse::TcpHeaderSlice; use etherparse::UdpHeaderSlice; use log::{debug, trace}; @@ -74,7 +74,7 @@ impl NatHandlerContext { #[async_trait] pub trait NatHandler: Send { - async fn receive(&self, packet: &[u8]) -> Result<()>; + async fn receive(&self, packet: &[u8]) -> Result; } #[async_trait] @@ -134,16 +134,19 @@ impl NatRouter { pub async fn process_reclaim(&mut self) -> Result> { Ok(if let Some(key) = self.reclaim_receiver.recv().await { - self.table.inner.remove(&key); - debug!("reclaimed nat key: {}", key); - Some(key) + if self.table.inner.remove(&key).is_some() { + debug!("reclaimed nat key: {}", key); + Some(key) + } else { + None + } } else { None }) } pub async fn process(&mut self, data: &[u8]) -> Result<()> { - let packet = SlicedPacket::from_ethernet(data)?; + let packet = LaxSlicedPacket::from_ethernet(data)?; let Some(ref link) = packet.link else { return Ok(()); }; @@ -167,8 +170,8 @@ impl NatRouter { }; match net { - NetSlice::Ipv4(ipv4) => self.process_ipv4(data, ether, ipv4).await?, - NetSlice::Ipv6(ipv6) => self.process_ipv6(data, ether, ipv6).await?, + LaxNetSlice::Ipv4(ipv4) => self.process_ipv4(data, ether, ipv4).await?, + LaxNetSlice::Ipv6(ipv6) => self.process_ipv6(data, ether, ipv6).await?, } Ok(()) @@ -178,7 +181,7 @@ impl NatRouter { &mut self, data: &[u8], ether: &Ethernet2Slice<'a>, - ipv4: &Ipv4Slice<'a>, + ipv4: &LaxIpv4Slice<'a>, ) -> Result<()> { let source_addr = IpAddress::Ipv4(ipv4.header().source_addr().into()); let dest_addr = IpAddress::Ipv4(ipv4.header().destination_addr().into()); @@ -208,7 +211,7 @@ impl NatRouter { &mut self, data: &[u8], ether: &Ethernet2Slice<'a>, - ipv6: &Ipv6Slice<'a>, + ipv6: &LaxIpv6Slice<'a>, ) -> Result<()> { let source_addr = IpAddress::Ipv6(ipv6.header().source_addr().into()); let dest_addr = IpAddress::Ipv6(ipv6.header().destination_addr().into()); @@ -240,7 +243,7 @@ impl NatRouter { ether: &Ethernet2Slice<'a>, source_addr: IpAddress, dest_addr: IpAddress, - payload: &IpPayloadSlice<'a>, + payload: &LaxIpPayloadSlice<'a>, ) -> Result<()> { let header = TcpHeaderSlice::from_slice(payload.payload)?; let source = IpEndpoint::new(source_addr, header.source_port()); @@ -262,7 +265,7 @@ impl NatRouter { ether: &Ethernet2Slice<'a>, source_addr: IpAddress, dest_addr: IpAddress, - payload: &IpPayloadSlice<'a>, + payload: &LaxIpPayloadSlice<'a>, ) -> Result<()> { let header = UdpHeaderSlice::from_slice(payload.payload)?; let source = IpEndpoint::new(source_addr, header.source_port()); @@ -284,7 +287,7 @@ impl NatRouter { ether: &Ethernet2Slice<'a>, source_addr: IpAddress, dest_addr: IpAddress, - payload: &IpPayloadSlice<'a>, + payload: &LaxIpPayloadSlice<'a>, ) -> Result<()> { let (header, _) = Icmpv4Header::from_slice(payload.payload)?; let Icmpv4Type::EchoRequest(_) = header.icmp_type else { @@ -309,7 +312,7 @@ impl NatRouter { ether: &Ethernet2Slice<'a>, source_addr: IpAddress, dest_addr: IpAddress, - payload: &IpPayloadSlice<'a>, + payload: &LaxIpPayloadSlice<'a>, ) -> Result<()> { let (header, _) = Icmpv6Header::from_slice(payload.payload)?; let Icmpv6Type::EchoRequest(_) = header.icmp_type else { @@ -354,7 +357,9 @@ impl NatRouter { }; if let Some(handler) = handler { - handler.receive(data).await?; + if !handler.receive(data).await? { + self.reclaim_sender.try_send(key)?; + } } Ok(()) } diff --git a/network/src/proxynat/icmp.rs b/network/src/proxynat/icmp.rs index d234a8e..b67a58d 100644 --- a/network/src/proxynat/icmp.rs +++ b/network/src/proxynat/icmp.rs @@ -27,9 +27,13 @@ pub struct ProxyIcmpHandler { #[async_trait] impl NatHandler for ProxyIcmpHandler { - async fn receive(&self, data: &[u8]) -> Result<()> { - self.rx_sender.try_send(data.to_vec())?; - Ok(()) + async fn receive(&self, data: &[u8]) -> Result { + if self.rx_sender.is_closed() { + Ok(true) + } else { + self.rx_sender.try_send(data.to_vec())?; + Ok(true) + } } } diff --git a/network/src/proxynat/mod.rs b/network/src/proxynat/mod.rs index d59d281..7cadff6 100644 --- a/network/src/proxynat/mod.rs +++ b/network/src/proxynat/mod.rs @@ -47,7 +47,7 @@ impl NatHandlerFactory for ProxyNatHandlerFactory { } NatKeyProtocol::Icmp => { - let (rx_sender, rx_receiver) = channel::>(4); + let (rx_sender, rx_receiver) = channel::>(300); let mut handler = ProxyIcmpHandler::new(rx_sender); if let Err(error) = handler.spawn(context, rx_receiver).await { @@ -59,7 +59,7 @@ impl NatHandlerFactory for ProxyNatHandlerFactory { } NatKeyProtocol::Tcp => { - let (rx_sender, rx_receiver) = channel::>(4); + let (rx_sender, rx_receiver) = channel::>(300); let mut handler = ProxyTcpHandler::new(rx_sender); if let Err(error) = handler.spawn(context, rx_receiver).await { diff --git a/network/src/proxynat/tcp.rs b/network/src/proxynat/tcp.rs index 4154a26..f835c8a 100644 --- a/network/src/proxynat/tcp.rs +++ b/network/src/proxynat/tcp.rs @@ -29,6 +29,7 @@ use crate::{ const TCP_BUFFER_SIZE: usize = 65535; const TCP_ACCEPT_TIMEOUT_SECS: u64 = 120; +const TCP_DANGLE_TIMEOUT_SECS: u64 = 10; pub struct ProxyTcpHandler { rx_sender: Sender>, @@ -36,9 +37,13 @@ pub struct ProxyTcpHandler { #[async_trait] impl NatHandler for ProxyTcpHandler { - async fn receive(&self, data: &[u8]) -> Result<()> { - self.rx_sender.try_send(data.to_vec())?; - Ok(()) + async fn receive(&self, data: &[u8]) -> Result { + if self.rx_sender.is_closed() { + Ok(false) + } else { + self.rx_sender.try_send(data.to_vec())?; + Ok(true) + } } } @@ -62,6 +67,13 @@ enum ProxyTcpDataSelect { Close, } +#[derive(Debug)] +enum ProxyTcpFinishSelect { + InternalRecv(Vec), + TxIpPacket(Vec), + Close, +} + impl ProxyTcpHandler { pub fn new(rx_sender: Sender>) -> Self { ProxyTcpHandler { rx_sender } @@ -95,10 +107,14 @@ impl ProxyTcpHandler { mut external_socket: TcpStream, mut rx_receiver: Receiver>, ) -> Result<()> { - let (ip_sender, mut ip_receiver) = channel::>(4); + let (ip_sender, mut ip_receiver) = channel::>(300); let mut external_buffer = vec![0u8; TCP_BUFFER_SIZE]; - let mut device = ChannelDevice::new(context.mtu, Medium::Ip, ip_sender.clone()); + let mut device = ChannelDevice::new( + context.mtu - Ethernet2Header::LEN, + Medium::Ip, + ip_sender.clone(), + ); let config = Config::new(HardwareAddress::Ip); let tcp_rx_buffer = SocketBuffer::new(vec![0; TCP_BUFFER_SIZE]); @@ -212,10 +228,10 @@ impl ProxyTcpHandler { .get_mut::(internal_socket_handle) .is_active() { - debug!("failed to accept tcp connection from client"); true } else { - true + debug!("failed to accept tcp connection from client"); + false }; let mut already_shutdown = false; @@ -268,13 +284,13 @@ impl ProxyTcpHandler { if !do_shutdown { select! { biased; - x = rx_receiver.recv() => if let Some(data) = x { - ProxyTcpDataSelect::InternalRecv(data) + x = ip_receiver.recv() => if let Some(data) = x { + ProxyTcpDataSelect::TxIpPacket(data) } else { ProxyTcpDataSelect::Close }, - x = ip_receiver.recv() => if let Some(data) = x { - ProxyTcpDataSelect::TxIpPacket(data) + x = rx_receiver.recv() => if let Some(data) = x { + ProxyTcpDataSelect::InternalRecv(data) } else { ProxyTcpDataSelect::Close }, @@ -285,13 +301,13 @@ impl ProxyTcpHandler { } else { select! { biased; - x = rx_receiver.recv() => if let Some(data) = x { - ProxyTcpDataSelect::InternalRecv(data) + x = ip_receiver.recv() => if let Some(data) = x { + ProxyTcpDataSelect::TxIpPacket(data) } else { ProxyTcpDataSelect::Close }, - x = ip_receiver.recv() => if let Some(data) = x { - ProxyTcpDataSelect::TxIpPacket(data) + x = rx_receiver.recv() => if let Some(data) = x { + ProxyTcpDataSelect::InternalRecv(data) } else { ProxyTcpDataSelect::Close }, @@ -303,13 +319,13 @@ impl ProxyTcpHandler { } else if !do_shutdown { select! { biased; - x = rx_receiver.recv() => if let Some(data) = x { - ProxyTcpDataSelect::InternalRecv(data) + x = ip_receiver.recv() => if let Some(data) = x { + ProxyTcpDataSelect::TxIpPacket(data) } else { ProxyTcpDataSelect::Close }, - x = ip_receiver.recv() => if let Some(data) = x { - ProxyTcpDataSelect::TxIpPacket(data) + x = rx_receiver.recv() => if let Some(data) = x { + ProxyTcpDataSelect::InternalRecv(data) } else { ProxyTcpDataSelect::Close }, @@ -320,13 +336,13 @@ impl ProxyTcpHandler { } else { select! { biased; - x = rx_receiver.recv() => if let Some(data) = x { - ProxyTcpDataSelect::InternalRecv(data) + x = ip_receiver.recv() => if let Some(data) = x { + ProxyTcpDataSelect::TxIpPacket(data) } else { ProxyTcpDataSelect::Close }, - x = ip_receiver.recv() => if let Some(data) = x { - ProxyTcpDataSelect::TxIpPacket(data) + x = rx_receiver.recv() => if let Some(data) = x { + ProxyTcpDataSelect::InternalRecv(data) } else { ProxyTcpDataSelect::Close }, @@ -389,6 +405,58 @@ impl ProxyTcpHandler { } } + let _ = external_socket.shutdown().await; + drop(external_socket); + + loop { + let deadline = tokio::time::sleep(Duration::from_secs(TCP_DANGLE_TIMEOUT_SECS)); + tokio::pin!(deadline); + + let selection = select! { + biased; + x = ip_receiver.recv() => if let Some(data) = x { + ProxyTcpFinishSelect::TxIpPacket(data) + } else { + ProxyTcpFinishSelect::Close + }, + x = rx_receiver.recv() => if let Some(data) = x { + ProxyTcpFinishSelect::InternalRecv(data) + } else { + ProxyTcpFinishSelect::Close + }, + _ = deadline => ProxyTcpFinishSelect::Close, + }; + + match selection { + ProxyTcpFinishSelect::InternalRecv(data) => { + let (_, payload) = Ethernet2Header::from_slice(&data)?; + device.rx = Some(payload.to_vec()); + iface.poll(Instant::now(), &mut device, &mut sockets); + } + + ProxyTcpFinishSelect::TxIpPacket(payload) => { + let mut buffer: Vec = Vec::new(); + let header = Ethernet2Header { + source: context.key.local_mac.0, + destination: context.key.client_mac.0, + ether_type: match context.key.external_ip.addr { + IpAddress::Ipv4(_) => EtherType::IPV4, + IpAddress::Ipv6(_) => EtherType::IPV6, + }, + }; + header.write(&mut buffer)?; + buffer.extend_from_slice(&payload); + if let Err(error) = context.try_send(buffer) { + debug!("failed to transmit tcp packet: {}", error); + } + } + + ProxyTcpFinishSelect::Close => { + break; + } + } + } + context.reclaim().await?; Ok(()) diff --git a/network/src/proxynat/udp.rs b/network/src/proxynat/udp.rs index bc1dd1a..5d8ef36 100644 --- a/network/src/proxynat/udp.rs +++ b/network/src/proxynat/udp.rs @@ -25,9 +25,13 @@ pub struct ProxyUdpHandler { #[async_trait] impl NatHandler for ProxyUdpHandler { - async fn receive(&self, data: &[u8]) -> Result<()> { - self.rx_sender.try_send(data.to_vec())?; - Ok(()) + async fn receive(&self, data: &[u8]) -> Result { + if self.rx_sender.is_closed() { + Ok(true) + } else { + self.rx_sender.try_send(data.to_vec())?; + Ok(true) + } } }