network: performance tuning using tear off buffers

This commit is contained in:
Alex Zenla 2024-03-29 03:09:41 +00:00
parent 7d7da1f9ee
commit d659b3aa55
No known key found for this signature in database
GPG Key ID: 067B238899B51269
4 changed files with 71 additions and 64 deletions

View File

@ -44,10 +44,11 @@ struct NetworkStack<'a> {
impl NetworkStack<'_> { impl NetworkStack<'_> {
async fn poll(&mut self) -> Result<bool> { async fn poll(&mut self) -> Result<bool> {
let what = select! { let what = select! {
biased;
x = self.kdev.receiver.recv() => NetworkStackSelect::Receive(x), x = self.kdev.receiver.recv() => NetworkStackSelect::Receive(x),
x = self.tx.recv() => NetworkStackSelect::Send(x),
x = self.bridge.from_bridge_receiver.recv() => NetworkStackSelect::Send(x), x = self.bridge.from_bridge_receiver.recv() => NetworkStackSelect::Send(x),
x = self.bridge.from_broadcast_receiver.recv() => NetworkStackSelect::Send(x.ok()), x = self.bridge.from_broadcast_receiver.recv() => NetworkStackSelect::Send(x.ok()),
x = self.tx.recv() => NetworkStackSelect::Send(x),
}; };
match what { match what {

View File

@ -1,4 +1,7 @@
use std::net::{IpAddr, Ipv4Addr}; use std::{
io::ErrorKind,
net::{IpAddr, Ipv4Addr},
};
use advmac::MacAddr6; use advmac::MacAddr6;
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
@ -6,19 +9,19 @@ use bytes::BytesMut;
use futures::TryStreamExt; use futures::TryStreamExt;
use log::error; use log::error;
use smoltcp::wire::EthernetAddress; use smoltcp::wire::EthernetAddress;
use tokio::{ use tokio::{select, task::JoinHandle};
io::{AsyncReadExt, AsyncWriteExt},
select,
sync::mpsc::channel,
task::JoinHandle,
};
use tokio_tun::Tun; use tokio_tun::Tun;
use crate::vbridge::{BridgeJoinHandle, VirtualBridge}; use crate::vbridge::{BridgeJoinHandle, VirtualBridge};
const RX_BUFFER_QUEUE_LEN: usize = 100;
const HOST_IPV4_ADDR: Ipv4Addr = Ipv4Addr::new(10, 75, 0, 1); const HOST_IPV4_ADDR: Ipv4Addr = Ipv4Addr::new(10, 75, 0, 1);
#[derive(Debug)]
enum HostBridgeProcessSelect {
Send(Option<BytesMut>),
Receive(std::io::Result<usize>),
}
pub struct HostBridge { pub struct HostBridge {
task: JoinHandle<()>, task: JoinHandle<()>,
} }
@ -82,57 +85,58 @@ impl HostBridge {
} }
async fn process(mtu: usize, tun: Tun, mut bridge_handle: BridgeJoinHandle) -> Result<()> { async fn process(mtu: usize, tun: Tun, mut bridge_handle: BridgeJoinHandle) -> Result<()> {
let (rx_sender, mut rx_receiver) = channel::<BytesMut>(RX_BUFFER_QUEUE_LEN); let tear_off_size = 100 * mtu;
let (mut read, mut write) = tokio::io::split(tun); let mut buffer: BytesMut = BytesMut::with_capacity(tear_off_size);
tokio::task::spawn(async move { loop {
let mut buffer = vec![0u8; mtu]; if buffer.capacity() < mtu {
loop { buffer = BytesMut::with_capacity(tear_off_size);
let size = match read.read(&mut buffer).await { }
Ok(size) => size,
Err(error) => { buffer.resize(mtu, 0);
error!("failed to read tap device: {}", error); let selection = select! {
break; biased;
} x = tun.recv(&mut buffer) => HostBridgeProcessSelect::Receive(x),
}; x = bridge_handle.from_bridge_receiver.recv() => HostBridgeProcessSelect::Send(x),
match rx_sender.send(buffer[0..size].into()).await { x = bridge_handle.from_broadcast_receiver.recv() => HostBridgeProcessSelect::Send(x.ok()),
};
match selection {
HostBridgeProcessSelect::Send(Some(bytes)) => match tun.try_send(&bytes) {
Ok(_) => {} Ok(_) => {}
Err(error) => { Err(error) => {
if error.kind() == ErrorKind::WouldBlock {
continue;
}
return Err(error.into());
}
},
HostBridgeProcessSelect::Send(None) => {
break;
}
HostBridgeProcessSelect::Receive(result) => match result {
Ok(len) => {
if len == 0 {
continue;
}
let packet = buffer.split_to(len);
let _ = bridge_handle.to_bridge_sender.try_send(packet);
}
Err(error) => {
if error.kind() == ErrorKind::WouldBlock {
continue;
}
error!( error!(
"failed to send data from tap device to processor: {}", "failed to receive data from tap device to bridge: {}",
error error
); );
break; break;
} }
} },
} }
});
loop {
select! {
x = bridge_handle.from_bridge_receiver.recv() => match x {
Some(bytes) => {
write.write_all(&bytes).await?;
},
None => {
break;
}
},
x = bridge_handle.from_broadcast_receiver.recv() => match x {
Ok(bytes) => {
write.write_all(&bytes).await?;
},
Err(error) => {
return Err(error.into());
}
},
x = rx_receiver.recv() => match x {
Some(bytes) => {
bridge_handle.to_bridge_sender.send(bytes).await?;
},
None => {
break;
}
}
};
} }
Ok(()) Ok(())
} }

View File

@ -239,8 +239,13 @@ impl AsyncRawSocketChannel {
let socket = unsafe { std::net::UdpSocket::from_raw_fd(socket.into_raw_fd()) }; let socket = unsafe { std::net::UdpSocket::from_raw_fd(socket.into_raw_fd()) };
let socket = UdpSocket::from_std(socket)?; let socket = UdpSocket::from_std(socket)?;
let mut buffer = vec![0; mtu]; let tear_off_size = 100 * mtu;
let mut buffer: BytesMut = BytesMut::with_capacity(tear_off_size);
loop { loop {
if buffer.capacity() < mtu {
buffer = BytesMut::with_capacity(tear_off_size);
}
let selection = select! { let selection = select! {
x = transmit_receiver.recv() => AsyncRawSocketChannelSelect::TransmitPacket(x), x = transmit_receiver.recv() => AsyncRawSocketChannelSelect::TransmitPacket(x),
x = socket.readable() => AsyncRawSocketChannelSelect::Readable(x?), x = socket.readable() => AsyncRawSocketChannelSelect::Readable(x?),
@ -248,13 +253,14 @@ impl AsyncRawSocketChannel {
match selection { match selection {
AsyncRawSocketChannelSelect::Readable(_) => { AsyncRawSocketChannelSelect::Readable(_) => {
buffer.resize(mtu, 0);
match socket.try_recv(&mut buffer) { match socket.try_recv(&mut buffer) {
Ok(len) => { Ok(len) => {
if len == 0 { if len == 0 {
continue; continue;
} }
let buffer = (&buffer[0..len]).into(); let packet = buffer.split_to(len);
if let Err(error) = receive_sender.try_send(buffer) { if let Err(error) = receive_sender.try_send(packet) {
debug!( debug!(
"failed to process received packet from raw socket: {}", "failed to process received packet from raw socket: {}",
error error

View File

@ -133,8 +133,8 @@ impl VirtualBridge {
loop { loop {
let selection = select! { let selection = select! {
biased; biased;
_ = from_broadcast_receiver.recv() => VirtualBridgeSelect::BroadcastSent,
x = to_bridge_receiver.recv() => VirtualBridgeSelect::PacketReceived(x), x = to_bridge_receiver.recv() => VirtualBridgeSelect::PacketReceived(x),
_ = from_broadcast_receiver.recv() => VirtualBridgeSelect::BroadcastSent,
x = member_leave_reciever.recv() => VirtualBridgeSelect::MemberLeave(x), x = member_leave_reciever.recv() => VirtualBridgeSelect::MemberLeave(x),
}; };
@ -158,10 +158,8 @@ impl VirtualBridge {
let (mut tcp, payload) = TcpHeader::from_slice(payload)?; let (mut tcp, payload) = TcpHeader::from_slice(payload)?;
tcp.checksum = tcp.calc_checksum_ipv4(&ipv4, payload)?; tcp.checksum = tcp.calc_checksum_ipv4(&ipv4, payload)?;
let tcp_header_offset = Ethernet2Header::LEN + ipv4.header_len(); let tcp_header_offset = Ethernet2Header::LEN + ipv4.header_len();
let tcp_header_bytes = tcp.to_bytes(); let mut header = &mut packet[tcp_header_offset..];
for (i, b) in tcp_header_bytes.iter().enumerate() { tcp.write(&mut header)?;
packet[tcp_header_offset + i] = *b;
}
} }
} else if header.ether_type == EtherType::IPV6 { } else if header.ether_type == EtherType::IPV6 {
let (ipv6, payload) = Ipv6Header::from_slice(payload)?; let (ipv6, payload) = Ipv6Header::from_slice(payload)?;
@ -169,10 +167,8 @@ impl VirtualBridge {
let (mut tcp, payload) = TcpHeader::from_slice(payload)?; let (mut tcp, payload) = TcpHeader::from_slice(payload)?;
tcp.checksum = tcp.calc_checksum_ipv6(&ipv6, payload)?; tcp.checksum = tcp.calc_checksum_ipv6(&ipv6, payload)?;
let tcp_header_offset = Ethernet2Header::LEN + ipv6.header_len(); let tcp_header_offset = Ethernet2Header::LEN + ipv6.header_len();
let tcp_header_bytes = tcp.to_bytes(); let mut header = &mut packet[tcp_header_offset..];
for (i, b) in tcp_header_bytes.iter().enumerate() { tcp.write(&mut header)?;
packet[tcp_header_offset + i] = *b;
}
} }
} }