From d659b3aa552fedcd9ed41c9ccaa760cd4cffcee1 Mon Sep 17 00:00:00 2001 From: Alex Zenla Date: Fri, 29 Mar 2024 03:09:41 +0000 Subject: [PATCH] network: performance tuning using tear off buffers --- crates/kratanet/src/backend.rs | 3 +- crates/kratanet/src/hbridge.rs | 106 ++++++++++++++++-------------- crates/kratanet/src/raw_socket.rs | 12 +++- crates/kratanet/src/vbridge.rs | 14 ++-- 4 files changed, 71 insertions(+), 64 deletions(-) diff --git a/crates/kratanet/src/backend.rs b/crates/kratanet/src/backend.rs index 43e784c..d8965c7 100644 --- a/crates/kratanet/src/backend.rs +++ b/crates/kratanet/src/backend.rs @@ -44,10 +44,11 @@ struct NetworkStack<'a> { impl NetworkStack<'_> { async fn poll(&mut self) -> Result { let what = select! { + biased; x = self.kdev.receiver.recv() => NetworkStackSelect::Receive(x), + x = self.tx.recv() => NetworkStackSelect::Send(x), x = self.bridge.from_bridge_receiver.recv() => NetworkStackSelect::Send(x), x = self.bridge.from_broadcast_receiver.recv() => NetworkStackSelect::Send(x.ok()), - x = self.tx.recv() => NetworkStackSelect::Send(x), }; match what { diff --git a/crates/kratanet/src/hbridge.rs b/crates/kratanet/src/hbridge.rs index 084ff5a..ab9cc7c 100644 --- a/crates/kratanet/src/hbridge.rs +++ b/crates/kratanet/src/hbridge.rs @@ -1,4 +1,7 @@ -use std::net::{IpAddr, Ipv4Addr}; +use std::{ + io::ErrorKind, + net::{IpAddr, Ipv4Addr}, +}; use advmac::MacAddr6; use anyhow::{anyhow, Result}; @@ -6,19 +9,19 @@ use bytes::BytesMut; use futures::TryStreamExt; use log::error; use smoltcp::wire::EthernetAddress; -use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, - select, - sync::mpsc::channel, - task::JoinHandle, -}; +use tokio::{select, task::JoinHandle}; use tokio_tun::Tun; use crate::vbridge::{BridgeJoinHandle, VirtualBridge}; -const RX_BUFFER_QUEUE_LEN: usize = 100; const HOST_IPV4_ADDR: Ipv4Addr = Ipv4Addr::new(10, 75, 0, 1); +#[derive(Debug)] +enum HostBridgeProcessSelect { + Send(Option), + Receive(std::io::Result), +} + pub struct HostBridge { task: JoinHandle<()>, } @@ -82,57 +85,58 @@ impl HostBridge { } async fn process(mtu: usize, tun: Tun, mut bridge_handle: BridgeJoinHandle) -> Result<()> { - let (rx_sender, mut rx_receiver) = channel::(RX_BUFFER_QUEUE_LEN); - let (mut read, mut write) = tokio::io::split(tun); - tokio::task::spawn(async move { - let mut buffer = vec![0u8; mtu]; - loop { - let size = match read.read(&mut buffer).await { - Ok(size) => size, - Err(error) => { - error!("failed to read tap device: {}", error); - break; - } - }; - match rx_sender.send(buffer[0..size].into()).await { + let tear_off_size = 100 * mtu; + let mut buffer: BytesMut = BytesMut::with_capacity(tear_off_size); + loop { + if buffer.capacity() < mtu { + buffer = BytesMut::with_capacity(tear_off_size); + } + + buffer.resize(mtu, 0); + let selection = select! { + biased; + x = tun.recv(&mut buffer) => HostBridgeProcessSelect::Receive(x), + x = bridge_handle.from_bridge_receiver.recv() => HostBridgeProcessSelect::Send(x), + x = bridge_handle.from_broadcast_receiver.recv() => HostBridgeProcessSelect::Send(x.ok()), + }; + + match selection { + HostBridgeProcessSelect::Send(Some(bytes)) => match tun.try_send(&bytes) { Ok(_) => {} Err(error) => { + if error.kind() == ErrorKind::WouldBlock { + continue; + } + return Err(error.into()); + } + }, + + HostBridgeProcessSelect::Send(None) => { + break; + } + + HostBridgeProcessSelect::Receive(result) => match result { + Ok(len) => { + if len == 0 { + continue; + } + let packet = buffer.split_to(len); + let _ = bridge_handle.to_bridge_sender.try_send(packet); + } + + Err(error) => { + if error.kind() == ErrorKind::WouldBlock { + continue; + } + error!( - "failed to send data from tap device to processor: {}", + "failed to receive data from tap device to bridge: {}", error ); break; } - } + }, } - }); - loop { - select! { - x = bridge_handle.from_bridge_receiver.recv() => match x { - Some(bytes) => { - write.write_all(&bytes).await?; - }, - None => { - break; - } - }, - x = bridge_handle.from_broadcast_receiver.recv() => match x { - Ok(bytes) => { - write.write_all(&bytes).await?; - }, - Err(error) => { - return Err(error.into()); - } - }, - x = rx_receiver.recv() => match x { - Some(bytes) => { - bridge_handle.to_bridge_sender.send(bytes).await?; - }, - None => { - break; - } - } - }; } Ok(()) } diff --git a/crates/kratanet/src/raw_socket.rs b/crates/kratanet/src/raw_socket.rs index 11bc854..0ddfaac 100644 --- a/crates/kratanet/src/raw_socket.rs +++ b/crates/kratanet/src/raw_socket.rs @@ -239,8 +239,13 @@ impl AsyncRawSocketChannel { let socket = unsafe { std::net::UdpSocket::from_raw_fd(socket.into_raw_fd()) }; let socket = UdpSocket::from_std(socket)?; - let mut buffer = vec![0; mtu]; + let tear_off_size = 100 * mtu; + let mut buffer: BytesMut = BytesMut::with_capacity(tear_off_size); loop { + if buffer.capacity() < mtu { + buffer = BytesMut::with_capacity(tear_off_size); + } + let selection = select! { x = transmit_receiver.recv() => AsyncRawSocketChannelSelect::TransmitPacket(x), x = socket.readable() => AsyncRawSocketChannelSelect::Readable(x?), @@ -248,13 +253,14 @@ impl AsyncRawSocketChannel { match selection { AsyncRawSocketChannelSelect::Readable(_) => { + buffer.resize(mtu, 0); match socket.try_recv(&mut buffer) { Ok(len) => { if len == 0 { continue; } - let buffer = (&buffer[0..len]).into(); - if let Err(error) = receive_sender.try_send(buffer) { + let packet = buffer.split_to(len); + if let Err(error) = receive_sender.try_send(packet) { debug!( "failed to process received packet from raw socket: {}", error diff --git a/crates/kratanet/src/vbridge.rs b/crates/kratanet/src/vbridge.rs index 5ce8e9b..3ce71b9 100644 --- a/crates/kratanet/src/vbridge.rs +++ b/crates/kratanet/src/vbridge.rs @@ -133,8 +133,8 @@ impl VirtualBridge { loop { let selection = select! { biased; - _ = from_broadcast_receiver.recv() => VirtualBridgeSelect::BroadcastSent, x = to_bridge_receiver.recv() => VirtualBridgeSelect::PacketReceived(x), + _ = from_broadcast_receiver.recv() => VirtualBridgeSelect::BroadcastSent, x = member_leave_reciever.recv() => VirtualBridgeSelect::MemberLeave(x), }; @@ -158,10 +158,8 @@ impl VirtualBridge { let (mut tcp, payload) = TcpHeader::from_slice(payload)?; tcp.checksum = tcp.calc_checksum_ipv4(&ipv4, payload)?; let tcp_header_offset = Ethernet2Header::LEN + ipv4.header_len(); - let tcp_header_bytes = tcp.to_bytes(); - for (i, b) in tcp_header_bytes.iter().enumerate() { - packet[tcp_header_offset + i] = *b; - } + let mut header = &mut packet[tcp_header_offset..]; + tcp.write(&mut header)?; } } else if header.ether_type == EtherType::IPV6 { let (ipv6, payload) = Ipv6Header::from_slice(payload)?; @@ -169,10 +167,8 @@ impl VirtualBridge { let (mut tcp, payload) = TcpHeader::from_slice(payload)?; tcp.checksum = tcp.calc_checksum_ipv6(&ipv6, payload)?; let tcp_header_offset = Ethernet2Header::LEN + ipv6.header_len(); - let tcp_header_bytes = tcp.to_bytes(); - for (i, b) in tcp_header_bytes.iter().enumerate() { - packet[tcp_header_offset + i] = *b; - } + let mut header = &mut packet[tcp_header_offset..]; + tcp.write(&mut header)?; } }