mirror of
				https://github.com/edera-dev/krata.git
				synced 2025-11-03 23:29:39 +00:00 
			
		
		
		
	network: performance improvements across NAT stack
This commit is contained in:
		@ -5,7 +5,7 @@ use crate::raw_socket::{AsyncRawSocket, RawSocketProtocol};
 | 
			
		||||
use advmac::MacAddr6;
 | 
			
		||||
use anyhow::{anyhow, Result};
 | 
			
		||||
use futures::TryStreamExt;
 | 
			
		||||
use log::warn;
 | 
			
		||||
use log::debug;
 | 
			
		||||
use smoltcp::iface::{Config, Interface, SocketSet};
 | 
			
		||||
use smoltcp::phy::Medium;
 | 
			
		||||
use smoltcp::time::Instant;
 | 
			
		||||
@ -57,13 +57,12 @@ impl NetworkStack<'_> {
 | 
			
		||||
 | 
			
		||||
            NetworkStackSelect::Receive(packet) => {
 | 
			
		||||
                if let Err(error) = self.router.process(packet).await {
 | 
			
		||||
                    warn!("router failed to process packet: {}", error);
 | 
			
		||||
                    debug!("router failed to process packet: {}", error);
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                self.udev.rx = Some(packet.to_vec());
 | 
			
		||||
                let timestamp = Instant::now();
 | 
			
		||||
                self.interface
 | 
			
		||||
                    .poll(timestamp, &mut self.udev, &mut self.sockets);
 | 
			
		||||
                    .poll(Instant::now(), &mut self.udev, &mut self.sockets);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            NetworkStackSelect::Reclaim => {}
 | 
			
		||||
@ -128,9 +127,13 @@ impl NetworkBackend {
 | 
			
		||||
        let mut kdev =
 | 
			
		||||
            AsyncRawSocket::bound_to_interface(&self.interface, RawSocketProtocol::Ethernet)?;
 | 
			
		||||
        let mtu = kdev.mtu_of_interface(&self.interface)?;
 | 
			
		||||
        let (tx_sender, tx_receiver) = channel::<Vec<u8>>(4);
 | 
			
		||||
        let (tx_sender, tx_receiver) = channel::<Vec<u8>>(100);
 | 
			
		||||
        let mut udev = ChannelDevice::new(mtu, Medium::Ethernet, tx_sender.clone());
 | 
			
		||||
        let mac = self.force_mac_address.unwrap_or_else(MacAddr6::random);
 | 
			
		||||
        let mac = self.force_mac_address.unwrap_or_else(|| {
 | 
			
		||||
            let mut mac = MacAddr6::random();
 | 
			
		||||
            mac.set_local(true);
 | 
			
		||||
            mac
 | 
			
		||||
        });
 | 
			
		||||
        let mac = smoltcp::wire::EthernetAddress(mac.to_array());
 | 
			
		||||
        let nat = NatRouter::new(mtu, proxy, mac, addresses.clone(), tx_sender.clone());
 | 
			
		||||
        let mac = HardwareAddress::Ethernet(mac);
 | 
			
		||||
 | 
			
		||||
@ -6,12 +6,12 @@ use etherparse::Icmpv4Type;
 | 
			
		||||
use etherparse::Icmpv6Header;
 | 
			
		||||
use etherparse::Icmpv6Type;
 | 
			
		||||
use etherparse::IpNumber;
 | 
			
		||||
use etherparse::IpPayloadSlice;
 | 
			
		||||
use etherparse::Ipv4Slice;
 | 
			
		||||
use etherparse::Ipv6Slice;
 | 
			
		||||
use etherparse::LaxIpPayloadSlice;
 | 
			
		||||
use etherparse::LaxIpv4Slice;
 | 
			
		||||
use etherparse::LaxIpv6Slice;
 | 
			
		||||
use etherparse::LaxNetSlice;
 | 
			
		||||
use etherparse::LaxSlicedPacket;
 | 
			
		||||
use etherparse::LinkSlice;
 | 
			
		||||
use etherparse::NetSlice;
 | 
			
		||||
use etherparse::SlicedPacket;
 | 
			
		||||
use etherparse::TcpHeaderSlice;
 | 
			
		||||
use etherparse::UdpHeaderSlice;
 | 
			
		||||
use log::{debug, trace};
 | 
			
		||||
@ -74,7 +74,7 @@ impl NatHandlerContext {
 | 
			
		||||
 | 
			
		||||
#[async_trait]
 | 
			
		||||
pub trait NatHandler: Send {
 | 
			
		||||
    async fn receive(&self, packet: &[u8]) -> Result<()>;
 | 
			
		||||
    async fn receive(&self, packet: &[u8]) -> Result<bool>;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[async_trait]
 | 
			
		||||
@ -134,16 +134,19 @@ impl NatRouter {
 | 
			
		||||
 | 
			
		||||
    pub async fn process_reclaim(&mut self) -> Result<Option<NatKey>> {
 | 
			
		||||
        Ok(if let Some(key) = self.reclaim_receiver.recv().await {
 | 
			
		||||
            self.table.inner.remove(&key);
 | 
			
		||||
            debug!("reclaimed nat key: {}", key);
 | 
			
		||||
            Some(key)
 | 
			
		||||
            if self.table.inner.remove(&key).is_some() {
 | 
			
		||||
                debug!("reclaimed nat key: {}", key);
 | 
			
		||||
                Some(key)
 | 
			
		||||
            } else {
 | 
			
		||||
                None
 | 
			
		||||
            }
 | 
			
		||||
        } else {
 | 
			
		||||
            None
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn process(&mut self, data: &[u8]) -> Result<()> {
 | 
			
		||||
        let packet = SlicedPacket::from_ethernet(data)?;
 | 
			
		||||
        let packet = LaxSlicedPacket::from_ethernet(data)?;
 | 
			
		||||
        let Some(ref link) = packet.link else {
 | 
			
		||||
            return Ok(());
 | 
			
		||||
        };
 | 
			
		||||
@ -167,8 +170,8 @@ impl NatRouter {
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        match net {
 | 
			
		||||
            NetSlice::Ipv4(ipv4) => self.process_ipv4(data, ether, ipv4).await?,
 | 
			
		||||
            NetSlice::Ipv6(ipv6) => self.process_ipv6(data, ether, ipv6).await?,
 | 
			
		||||
            LaxNetSlice::Ipv4(ipv4) => self.process_ipv4(data, ether, ipv4).await?,
 | 
			
		||||
            LaxNetSlice::Ipv6(ipv6) => self.process_ipv6(data, ether, ipv6).await?,
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
@ -178,7 +181,7 @@ impl NatRouter {
 | 
			
		||||
        &mut self,
 | 
			
		||||
        data: &[u8],
 | 
			
		||||
        ether: &Ethernet2Slice<'a>,
 | 
			
		||||
        ipv4: &Ipv4Slice<'a>,
 | 
			
		||||
        ipv4: &LaxIpv4Slice<'a>,
 | 
			
		||||
    ) -> Result<()> {
 | 
			
		||||
        let source_addr = IpAddress::Ipv4(ipv4.header().source_addr().into());
 | 
			
		||||
        let dest_addr = IpAddress::Ipv4(ipv4.header().destination_addr().into());
 | 
			
		||||
@ -208,7 +211,7 @@ impl NatRouter {
 | 
			
		||||
        &mut self,
 | 
			
		||||
        data: &[u8],
 | 
			
		||||
        ether: &Ethernet2Slice<'a>,
 | 
			
		||||
        ipv6: &Ipv6Slice<'a>,
 | 
			
		||||
        ipv6: &LaxIpv6Slice<'a>,
 | 
			
		||||
    ) -> Result<()> {
 | 
			
		||||
        let source_addr = IpAddress::Ipv6(ipv6.header().source_addr().into());
 | 
			
		||||
        let dest_addr = IpAddress::Ipv6(ipv6.header().destination_addr().into());
 | 
			
		||||
@ -240,7 +243,7 @@ impl NatRouter {
 | 
			
		||||
        ether: &Ethernet2Slice<'a>,
 | 
			
		||||
        source_addr: IpAddress,
 | 
			
		||||
        dest_addr: IpAddress,
 | 
			
		||||
        payload: &IpPayloadSlice<'a>,
 | 
			
		||||
        payload: &LaxIpPayloadSlice<'a>,
 | 
			
		||||
    ) -> Result<()> {
 | 
			
		||||
        let header = TcpHeaderSlice::from_slice(payload.payload)?;
 | 
			
		||||
        let source = IpEndpoint::new(source_addr, header.source_port());
 | 
			
		||||
@ -262,7 +265,7 @@ impl NatRouter {
 | 
			
		||||
        ether: &Ethernet2Slice<'a>,
 | 
			
		||||
        source_addr: IpAddress,
 | 
			
		||||
        dest_addr: IpAddress,
 | 
			
		||||
        payload: &IpPayloadSlice<'a>,
 | 
			
		||||
        payload: &LaxIpPayloadSlice<'a>,
 | 
			
		||||
    ) -> Result<()> {
 | 
			
		||||
        let header = UdpHeaderSlice::from_slice(payload.payload)?;
 | 
			
		||||
        let source = IpEndpoint::new(source_addr, header.source_port());
 | 
			
		||||
@ -284,7 +287,7 @@ impl NatRouter {
 | 
			
		||||
        ether: &Ethernet2Slice<'a>,
 | 
			
		||||
        source_addr: IpAddress,
 | 
			
		||||
        dest_addr: IpAddress,
 | 
			
		||||
        payload: &IpPayloadSlice<'a>,
 | 
			
		||||
        payload: &LaxIpPayloadSlice<'a>,
 | 
			
		||||
    ) -> Result<()> {
 | 
			
		||||
        let (header, _) = Icmpv4Header::from_slice(payload.payload)?;
 | 
			
		||||
        let Icmpv4Type::EchoRequest(_) = header.icmp_type else {
 | 
			
		||||
@ -309,7 +312,7 @@ impl NatRouter {
 | 
			
		||||
        ether: &Ethernet2Slice<'a>,
 | 
			
		||||
        source_addr: IpAddress,
 | 
			
		||||
        dest_addr: IpAddress,
 | 
			
		||||
        payload: &IpPayloadSlice<'a>,
 | 
			
		||||
        payload: &LaxIpPayloadSlice<'a>,
 | 
			
		||||
    ) -> Result<()> {
 | 
			
		||||
        let (header, _) = Icmpv6Header::from_slice(payload.payload)?;
 | 
			
		||||
        let Icmpv6Type::EchoRequest(_) = header.icmp_type else {
 | 
			
		||||
@ -354,7 +357,9 @@ impl NatRouter {
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        if let Some(handler) = handler {
 | 
			
		||||
            handler.receive(data).await?;
 | 
			
		||||
            if !handler.receive(data).await? {
 | 
			
		||||
                self.reclaim_sender.try_send(key)?;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -27,9 +27,13 @@ pub struct ProxyIcmpHandler {
 | 
			
		||||
 | 
			
		||||
#[async_trait]
 | 
			
		||||
impl NatHandler for ProxyIcmpHandler {
 | 
			
		||||
    async fn receive(&self, data: &[u8]) -> Result<()> {
 | 
			
		||||
        self.rx_sender.try_send(data.to_vec())?;
 | 
			
		||||
        Ok(())
 | 
			
		||||
    async fn receive(&self, data: &[u8]) -> Result<bool> {
 | 
			
		||||
        if self.rx_sender.is_closed() {
 | 
			
		||||
            Ok(true)
 | 
			
		||||
        } else {
 | 
			
		||||
            self.rx_sender.try_send(data.to_vec())?;
 | 
			
		||||
            Ok(true)
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -47,7 +47,7 @@ impl NatHandlerFactory for ProxyNatHandlerFactory {
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            NatKeyProtocol::Icmp => {
 | 
			
		||||
                let (rx_sender, rx_receiver) = channel::<Vec<u8>>(4);
 | 
			
		||||
                let (rx_sender, rx_receiver) = channel::<Vec<u8>>(300);
 | 
			
		||||
                let mut handler = ProxyIcmpHandler::new(rx_sender);
 | 
			
		||||
 | 
			
		||||
                if let Err(error) = handler.spawn(context, rx_receiver).await {
 | 
			
		||||
@ -59,7 +59,7 @@ impl NatHandlerFactory for ProxyNatHandlerFactory {
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            NatKeyProtocol::Tcp => {
 | 
			
		||||
                let (rx_sender, rx_receiver) = channel::<Vec<u8>>(4);
 | 
			
		||||
                let (rx_sender, rx_receiver) = channel::<Vec<u8>>(300);
 | 
			
		||||
                let mut handler = ProxyTcpHandler::new(rx_sender);
 | 
			
		||||
 | 
			
		||||
                if let Err(error) = handler.spawn(context, rx_receiver).await {
 | 
			
		||||
 | 
			
		||||
@ -29,6 +29,7 @@ use crate::{
 | 
			
		||||
 | 
			
		||||
const TCP_BUFFER_SIZE: usize = 65535;
 | 
			
		||||
const TCP_ACCEPT_TIMEOUT_SECS: u64 = 120;
 | 
			
		||||
const TCP_DANGLE_TIMEOUT_SECS: u64 = 10;
 | 
			
		||||
 | 
			
		||||
pub struct ProxyTcpHandler {
 | 
			
		||||
    rx_sender: Sender<Vec<u8>>,
 | 
			
		||||
@ -36,9 +37,13 @@ pub struct ProxyTcpHandler {
 | 
			
		||||
 | 
			
		||||
#[async_trait]
 | 
			
		||||
impl NatHandler for ProxyTcpHandler {
 | 
			
		||||
    async fn receive(&self, data: &[u8]) -> Result<()> {
 | 
			
		||||
        self.rx_sender.try_send(data.to_vec())?;
 | 
			
		||||
        Ok(())
 | 
			
		||||
    async fn receive(&self, data: &[u8]) -> Result<bool> {
 | 
			
		||||
        if self.rx_sender.is_closed() {
 | 
			
		||||
            Ok(false)
 | 
			
		||||
        } else {
 | 
			
		||||
            self.rx_sender.try_send(data.to_vec())?;
 | 
			
		||||
            Ok(true)
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -62,6 +67,13 @@ enum ProxyTcpDataSelect {
 | 
			
		||||
    Close,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Debug)]
 | 
			
		||||
enum ProxyTcpFinishSelect {
 | 
			
		||||
    InternalRecv(Vec<u8>),
 | 
			
		||||
    TxIpPacket(Vec<u8>),
 | 
			
		||||
    Close,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl ProxyTcpHandler {
 | 
			
		||||
    pub fn new(rx_sender: Sender<Vec<u8>>) -> Self {
 | 
			
		||||
        ProxyTcpHandler { rx_sender }
 | 
			
		||||
@ -95,10 +107,14 @@ impl ProxyTcpHandler {
 | 
			
		||||
        mut external_socket: TcpStream,
 | 
			
		||||
        mut rx_receiver: Receiver<Vec<u8>>,
 | 
			
		||||
    ) -> Result<()> {
 | 
			
		||||
        let (ip_sender, mut ip_receiver) = channel::<Vec<u8>>(4);
 | 
			
		||||
        let (ip_sender, mut ip_receiver) = channel::<Vec<u8>>(300);
 | 
			
		||||
        let mut external_buffer = vec![0u8; TCP_BUFFER_SIZE];
 | 
			
		||||
 | 
			
		||||
        let mut device = ChannelDevice::new(context.mtu, Medium::Ip, ip_sender.clone());
 | 
			
		||||
        let mut device = ChannelDevice::new(
 | 
			
		||||
            context.mtu - Ethernet2Header::LEN,
 | 
			
		||||
            Medium::Ip,
 | 
			
		||||
            ip_sender.clone(),
 | 
			
		||||
        );
 | 
			
		||||
        let config = Config::new(HardwareAddress::Ip);
 | 
			
		||||
 | 
			
		||||
        let tcp_rx_buffer = SocketBuffer::new(vec![0; TCP_BUFFER_SIZE]);
 | 
			
		||||
@ -212,10 +228,10 @@ impl ProxyTcpHandler {
 | 
			
		||||
            .get_mut::<tcp::Socket>(internal_socket_handle)
 | 
			
		||||
            .is_active()
 | 
			
		||||
        {
 | 
			
		||||
            debug!("failed to accept tcp connection from client");
 | 
			
		||||
            true
 | 
			
		||||
        } else {
 | 
			
		||||
            true
 | 
			
		||||
            debug!("failed to accept tcp connection from client");
 | 
			
		||||
            false
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        let mut already_shutdown = false;
 | 
			
		||||
@ -268,13 +284,13 @@ impl ProxyTcpHandler {
 | 
			
		||||
                if !do_shutdown {
 | 
			
		||||
                    select! {
 | 
			
		||||
                        biased;
 | 
			
		||||
                        x = rx_receiver.recv() => if let Some(data) = x {
 | 
			
		||||
                            ProxyTcpDataSelect::InternalRecv(data)
 | 
			
		||||
                        x = ip_receiver.recv() => if let Some(data) = x {
 | 
			
		||||
                            ProxyTcpDataSelect::TxIpPacket(data)
 | 
			
		||||
                        } else {
 | 
			
		||||
                            ProxyTcpDataSelect::Close
 | 
			
		||||
                        },
 | 
			
		||||
                        x = ip_receiver.recv() => if let Some(data) = x {
 | 
			
		||||
                            ProxyTcpDataSelect::TxIpPacket(data)
 | 
			
		||||
                        x = rx_receiver.recv() => if let Some(data) = x {
 | 
			
		||||
                            ProxyTcpDataSelect::InternalRecv(data)
 | 
			
		||||
                        } else {
 | 
			
		||||
                            ProxyTcpDataSelect::Close
 | 
			
		||||
                        },
 | 
			
		||||
@ -285,13 +301,13 @@ impl ProxyTcpHandler {
 | 
			
		||||
                } else {
 | 
			
		||||
                    select! {
 | 
			
		||||
                        biased;
 | 
			
		||||
                        x = rx_receiver.recv() => if let Some(data) = x {
 | 
			
		||||
                            ProxyTcpDataSelect::InternalRecv(data)
 | 
			
		||||
                        x = ip_receiver.recv() => if let Some(data) = x {
 | 
			
		||||
                            ProxyTcpDataSelect::TxIpPacket(data)
 | 
			
		||||
                        } else {
 | 
			
		||||
                            ProxyTcpDataSelect::Close
 | 
			
		||||
                        },
 | 
			
		||||
                        x = ip_receiver.recv() => if let Some(data) = x {
 | 
			
		||||
                            ProxyTcpDataSelect::TxIpPacket(data)
 | 
			
		||||
                        x = rx_receiver.recv() => if let Some(data) = x {
 | 
			
		||||
                            ProxyTcpDataSelect::InternalRecv(data)
 | 
			
		||||
                        } else {
 | 
			
		||||
                            ProxyTcpDataSelect::Close
 | 
			
		||||
                        },
 | 
			
		||||
@ -303,13 +319,13 @@ impl ProxyTcpHandler {
 | 
			
		||||
            } else if !do_shutdown {
 | 
			
		||||
                select! {
 | 
			
		||||
                    biased;
 | 
			
		||||
                    x = rx_receiver.recv() => if let Some(data) = x {
 | 
			
		||||
                        ProxyTcpDataSelect::InternalRecv(data)
 | 
			
		||||
                    x = ip_receiver.recv() => if let Some(data) = x {
 | 
			
		||||
                        ProxyTcpDataSelect::TxIpPacket(data)
 | 
			
		||||
                    } else {
 | 
			
		||||
                        ProxyTcpDataSelect::Close
 | 
			
		||||
                    },
 | 
			
		||||
                    x = ip_receiver.recv() => if let Some(data) = x {
 | 
			
		||||
                        ProxyTcpDataSelect::TxIpPacket(data)
 | 
			
		||||
                    x = rx_receiver.recv() => if let Some(data) = x {
 | 
			
		||||
                        ProxyTcpDataSelect::InternalRecv(data)
 | 
			
		||||
                    } else {
 | 
			
		||||
                        ProxyTcpDataSelect::Close
 | 
			
		||||
                    },
 | 
			
		||||
@ -320,13 +336,13 @@ impl ProxyTcpHandler {
 | 
			
		||||
            } else {
 | 
			
		||||
                select! {
 | 
			
		||||
                    biased;
 | 
			
		||||
                    x = rx_receiver.recv() => if let Some(data) = x {
 | 
			
		||||
                        ProxyTcpDataSelect::InternalRecv(data)
 | 
			
		||||
                    x = ip_receiver.recv() => if let Some(data) = x {
 | 
			
		||||
                        ProxyTcpDataSelect::TxIpPacket(data)
 | 
			
		||||
                    } else {
 | 
			
		||||
                        ProxyTcpDataSelect::Close
 | 
			
		||||
                    },
 | 
			
		||||
                    x = ip_receiver.recv() => if let Some(data) = x {
 | 
			
		||||
                        ProxyTcpDataSelect::TxIpPacket(data)
 | 
			
		||||
                    x = rx_receiver.recv() => if let Some(data) = x {
 | 
			
		||||
                        ProxyTcpDataSelect::InternalRecv(data)
 | 
			
		||||
                    } else {
 | 
			
		||||
                        ProxyTcpDataSelect::Close
 | 
			
		||||
                    },
 | 
			
		||||
@ -389,6 +405,58 @@ impl ProxyTcpHandler {
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        let _ = external_socket.shutdown().await;
 | 
			
		||||
        drop(external_socket);
 | 
			
		||||
 | 
			
		||||
        loop {
 | 
			
		||||
            let deadline = tokio::time::sleep(Duration::from_secs(TCP_DANGLE_TIMEOUT_SECS));
 | 
			
		||||
            tokio::pin!(deadline);
 | 
			
		||||
 | 
			
		||||
            let selection = select! {
 | 
			
		||||
                biased;
 | 
			
		||||
                x = ip_receiver.recv() => if let Some(data) = x {
 | 
			
		||||
                    ProxyTcpFinishSelect::TxIpPacket(data)
 | 
			
		||||
                } else {
 | 
			
		||||
                    ProxyTcpFinishSelect::Close
 | 
			
		||||
                },
 | 
			
		||||
                x = rx_receiver.recv() => if let Some(data) = x {
 | 
			
		||||
                    ProxyTcpFinishSelect::InternalRecv(data)
 | 
			
		||||
                } else {
 | 
			
		||||
                    ProxyTcpFinishSelect::Close
 | 
			
		||||
                },
 | 
			
		||||
                _ = deadline => ProxyTcpFinishSelect::Close,
 | 
			
		||||
            };
 | 
			
		||||
 | 
			
		||||
            match selection {
 | 
			
		||||
                ProxyTcpFinishSelect::InternalRecv(data) => {
 | 
			
		||||
                    let (_, payload) = Ethernet2Header::from_slice(&data)?;
 | 
			
		||||
                    device.rx = Some(payload.to_vec());
 | 
			
		||||
                    iface.poll(Instant::now(), &mut device, &mut sockets);
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                ProxyTcpFinishSelect::TxIpPacket(payload) => {
 | 
			
		||||
                    let mut buffer: Vec<u8> = Vec::new();
 | 
			
		||||
                    let header = Ethernet2Header {
 | 
			
		||||
                        source: context.key.local_mac.0,
 | 
			
		||||
                        destination: context.key.client_mac.0,
 | 
			
		||||
                        ether_type: match context.key.external_ip.addr {
 | 
			
		||||
                            IpAddress::Ipv4(_) => EtherType::IPV4,
 | 
			
		||||
                            IpAddress::Ipv6(_) => EtherType::IPV6,
 | 
			
		||||
                        },
 | 
			
		||||
                    };
 | 
			
		||||
                    header.write(&mut buffer)?;
 | 
			
		||||
                    buffer.extend_from_slice(&payload);
 | 
			
		||||
                    if let Err(error) = context.try_send(buffer) {
 | 
			
		||||
                        debug!("failed to transmit tcp packet: {}", error);
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                ProxyTcpFinishSelect::Close => {
 | 
			
		||||
                    break;
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        context.reclaim().await?;
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
 | 
			
		||||
@ -25,9 +25,13 @@ pub struct ProxyUdpHandler {
 | 
			
		||||
 | 
			
		||||
#[async_trait]
 | 
			
		||||
impl NatHandler for ProxyUdpHandler {
 | 
			
		||||
    async fn receive(&self, data: &[u8]) -> Result<()> {
 | 
			
		||||
        self.rx_sender.try_send(data.to_vec())?;
 | 
			
		||||
        Ok(())
 | 
			
		||||
    async fn receive(&self, data: &[u8]) -> Result<bool> {
 | 
			
		||||
        if self.rx_sender.is_closed() {
 | 
			
		||||
            Ok(true)
 | 
			
		||||
        } else {
 | 
			
		||||
            self.rx_sender.try_send(data.to_vec())?;
 | 
			
		||||
            Ok(true)
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user