mirror of
				https://github.com/edera-dev/krata.git
				synced 2025-11-03 23:29:39 +00:00 
			
		
		
		
	network: gigabit performance tuning
This commit is contained in:
		@ -4,6 +4,7 @@ use crate::nat::Nat;
 | 
			
		||||
use crate::proxynat::ProxyNatHandlerFactory;
 | 
			
		||||
use crate::raw_socket::{AsyncRawSocketChannel, RawSocketHandle, RawSocketProtocol};
 | 
			
		||||
use crate::vbridge::{BridgeJoinHandle, VirtualBridge};
 | 
			
		||||
use crate::FORCE_MTU;
 | 
			
		||||
use anyhow::{anyhow, Result};
 | 
			
		||||
use bytes::BytesMut;
 | 
			
		||||
use futures::TryStreamExt;
 | 
			
		||||
@ -16,7 +17,7 @@ use tokio::select;
 | 
			
		||||
use tokio::sync::mpsc::{channel, Receiver};
 | 
			
		||||
use tokio::task::JoinHandle;
 | 
			
		||||
 | 
			
		||||
const TX_CHANNEL_BUFFER_LEN: usize = 1000;
 | 
			
		||||
const TX_CHANNEL_BUFFER_LEN: usize = 3000;
 | 
			
		||||
 | 
			
		||||
#[derive(Clone)]
 | 
			
		||||
pub struct NetworkBackend {
 | 
			
		||||
@ -119,9 +120,8 @@ impl NetworkBackend {
 | 
			
		||||
            self.metadata.gateway.ipv4.into(),
 | 
			
		||||
            self.metadata.gateway.ipv6.into(),
 | 
			
		||||
        ];
 | 
			
		||||
        let mut kdev =
 | 
			
		||||
            RawSocketHandle::bound_to_interface(&interface, RawSocketProtocol::Ethernet)?;
 | 
			
		||||
        let mtu = kdev.mtu_of_interface(&interface)?;
 | 
			
		||||
        let kdev = RawSocketHandle::bound_to_interface(&interface, RawSocketProtocol::Ethernet)?;
 | 
			
		||||
        let mtu = FORCE_MTU;
 | 
			
		||||
        let (tx_sender, tx_receiver) = channel::<BytesMut>(TX_CHANNEL_BUFFER_LEN);
 | 
			
		||||
        let mut udev = ChannelDevice::new(mtu, Medium::Ethernet, tx_sender.clone());
 | 
			
		||||
        let mac = self.metadata.gateway.mac;
 | 
			
		||||
 | 
			
		||||
@ -16,16 +16,19 @@ use tokio_tun::Tun;
 | 
			
		||||
 | 
			
		||||
use crate::vbridge::{BridgeJoinHandle, VirtualBridge};
 | 
			
		||||
 | 
			
		||||
const RX_BUFFER_QUEUE_LEN: usize = 100;
 | 
			
		||||
const HOST_IPV4_ADDR: Ipv4Addr = Ipv4Addr::new(10, 75, 0, 1);
 | 
			
		||||
 | 
			
		||||
pub struct HostBridge {
 | 
			
		||||
    task: JoinHandle<()>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl HostBridge {
 | 
			
		||||
    pub async fn new(interface: String, bridge: &VirtualBridge) -> Result<HostBridge> {
 | 
			
		||||
    pub async fn new(mtu: usize, interface: String, bridge: &VirtualBridge) -> Result<HostBridge> {
 | 
			
		||||
        let tun = Tun::builder()
 | 
			
		||||
            .name(&interface)
 | 
			
		||||
            .tap(true)
 | 
			
		||||
            .mtu(1500)
 | 
			
		||||
            .mtu(mtu as i32)
 | 
			
		||||
            .packet_info(false)
 | 
			
		||||
            .try_build()?;
 | 
			
		||||
 | 
			
		||||
@ -48,11 +51,7 @@ impl HostBridge {
 | 
			
		||||
 | 
			
		||||
        handle
 | 
			
		||||
            .address()
 | 
			
		||||
            .add(
 | 
			
		||||
                link.header.index,
 | 
			
		||||
                IpAddr::V4(Ipv4Addr::new(10, 75, 0, 1)),
 | 
			
		||||
                16,
 | 
			
		||||
            )
 | 
			
		||||
            .add(link.header.index, IpAddr::V4(HOST_IPV4_ADDR), 16)
 | 
			
		||||
            .execute()
 | 
			
		||||
            .await?;
 | 
			
		||||
 | 
			
		||||
@ -74,7 +73,7 @@ impl HostBridge {
 | 
			
		||||
        let bridge_handle = bridge.join(mac).await?;
 | 
			
		||||
 | 
			
		||||
        let task = tokio::task::spawn(async move {
 | 
			
		||||
            if let Err(error) = HostBridge::process(tun, bridge_handle).await {
 | 
			
		||||
            if let Err(error) = HostBridge::process(mtu, tun, bridge_handle).await {
 | 
			
		||||
                error!("failed to process host bridge: {}", error);
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
@ -82,11 +81,11 @@ impl HostBridge {
 | 
			
		||||
        Ok(HostBridge { task })
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn process(tun: Tun, mut bridge_handle: BridgeJoinHandle) -> Result<()> {
 | 
			
		||||
        let (rx_sender, mut rx_receiver) = channel::<BytesMut>(100);
 | 
			
		||||
    async fn process(mtu: usize, tun: Tun, mut bridge_handle: BridgeJoinHandle) -> Result<()> {
 | 
			
		||||
        let (rx_sender, mut rx_receiver) = channel::<BytesMut>(RX_BUFFER_QUEUE_LEN);
 | 
			
		||||
        let (mut read, mut write) = tokio::io::split(tun);
 | 
			
		||||
        tokio::task::spawn(async move {
 | 
			
		||||
            let mut buffer = vec![0u8; 1500];
 | 
			
		||||
            let mut buffer = vec![0u8; mtu];
 | 
			
		||||
            loop {
 | 
			
		||||
                let size = match read.read(&mut buffer).await {
 | 
			
		||||
                    Ok(size) => size,
 | 
			
		||||
 | 
			
		||||
@ -1,4 +1,4 @@
 | 
			
		||||
use std::{collections::HashMap, thread, time::Duration};
 | 
			
		||||
use std::{collections::HashMap, time::Duration};
 | 
			
		||||
 | 
			
		||||
use anyhow::Result;
 | 
			
		||||
use autonet::{AutoNetworkChangeset, AutoNetworkCollector, NetworkMetadata};
 | 
			
		||||
@ -22,6 +22,8 @@ pub mod proxynat;
 | 
			
		||||
pub mod raw_socket;
 | 
			
		||||
pub mod vbridge;
 | 
			
		||||
 | 
			
		||||
pub const FORCE_MTU: usize = 20000;
 | 
			
		||||
 | 
			
		||||
pub struct NetworkService {
 | 
			
		||||
    pub backends: HashMap<Uuid, JoinHandle<()>>,
 | 
			
		||||
    pub bridge: VirtualBridge,
 | 
			
		||||
@ -31,7 +33,7 @@ pub struct NetworkService {
 | 
			
		||||
impl NetworkService {
 | 
			
		||||
    pub async fn new() -> Result<NetworkService> {
 | 
			
		||||
        let bridge = VirtualBridge::new()?;
 | 
			
		||||
        let hbridge = HostBridge::new("krata0".to_string(), &bridge).await?;
 | 
			
		||||
        let hbridge = HostBridge::new(FORCE_MTU, "krata0".to_string(), &bridge).await?;
 | 
			
		||||
        Ok(NetworkService {
 | 
			
		||||
            backends: HashMap::new(),
 | 
			
		||||
            bridge,
 | 
			
		||||
@ -45,12 +47,13 @@ impl NetworkService {
 | 
			
		||||
        let mut collector = AutoNetworkCollector::new().await?;
 | 
			
		||||
        loop {
 | 
			
		||||
            let changeset = collector.read_changes().await?;
 | 
			
		||||
            self.process_network_changeset(&mut collector, changeset)?;
 | 
			
		||||
            self.process_network_changeset(&mut collector, changeset)
 | 
			
		||||
                .await?;
 | 
			
		||||
            sleep(Duration::from_secs(2)).await;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn process_network_changeset(
 | 
			
		||||
    async fn process_network_changeset(
 | 
			
		||||
        &mut self,
 | 
			
		||||
        collector: &mut AutoNetworkCollector,
 | 
			
		||||
        changeset: AutoNetworkChangeset,
 | 
			
		||||
@ -70,28 +73,25 @@ impl NetworkService {
 | 
			
		||||
            })
 | 
			
		||||
            .collect::<Vec<_>>();
 | 
			
		||||
 | 
			
		||||
        thread::sleep(Duration::from_secs(1));
 | 
			
		||||
        let (launched, failed) = futures::executor::block_on(async move {
 | 
			
		||||
            let mut failed: Vec<Uuid> = Vec::new();
 | 
			
		||||
            let mut launched: Vec<(Uuid, JoinHandle<()>)> = Vec::new();
 | 
			
		||||
            let results = join_all(futures).await;
 | 
			
		||||
            for result in results {
 | 
			
		||||
                match result {
 | 
			
		||||
                    Ok(launch) => {
 | 
			
		||||
                        launched.push(launch);
 | 
			
		||||
                    }
 | 
			
		||||
        sleep(Duration::from_secs(1)).await;
 | 
			
		||||
        let mut failed: Vec<Uuid> = Vec::new();
 | 
			
		||||
        let mut launched: Vec<(Uuid, JoinHandle<()>)> = Vec::new();
 | 
			
		||||
        let results = join_all(futures).await;
 | 
			
		||||
        for result in results {
 | 
			
		||||
            match result {
 | 
			
		||||
                Ok(launch) => {
 | 
			
		||||
                    launched.push(launch);
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                    Err((metadata, error)) => {
 | 
			
		||||
                        warn!(
 | 
			
		||||
                            "failed to launch network backend for krata guest {}: {}",
 | 
			
		||||
                            metadata.uuid, error
 | 
			
		||||
                        );
 | 
			
		||||
                        failed.push(metadata.uuid);
 | 
			
		||||
                    }
 | 
			
		||||
                };
 | 
			
		||||
            }
 | 
			
		||||
            (launched, failed)
 | 
			
		||||
        });
 | 
			
		||||
                Err((metadata, error)) => {
 | 
			
		||||
                    warn!(
 | 
			
		||||
                        "failed to launch network backend for krata guest {}: {}",
 | 
			
		||||
                        metadata.uuid, error
 | 
			
		||||
                    );
 | 
			
		||||
                    failed.push(metadata.uuid);
 | 
			
		||||
                }
 | 
			
		||||
            };
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        for (uuid, handle) in launched {
 | 
			
		||||
            self.backends.insert(uuid, handle);
 | 
			
		||||
 | 
			
		||||
@ -33,8 +33,8 @@ use super::key::NatKey;
 | 
			
		||||
use super::key::NatKeyProtocol;
 | 
			
		||||
use super::table::NatTable;
 | 
			
		||||
 | 
			
		||||
const RECLAIM_CHANNEL_QUEUE_LEN: usize = 10;
 | 
			
		||||
const RECEIVE_CHANNEL_QUEUE_LEN: usize = 1000;
 | 
			
		||||
const RECEIVE_CHANNEL_QUEUE_LEN: usize = 3000;
 | 
			
		||||
const RECLAIM_CHANNEL_QUEUE_LEN: usize = 30;
 | 
			
		||||
 | 
			
		||||
pub struct NatProcessor {
 | 
			
		||||
    mtu: usize,
 | 
			
		||||
 | 
			
		||||
@ -29,11 +29,10 @@ use crate::{
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
const TCP_BUFFER_SIZE: usize = 65535;
 | 
			
		||||
const TCP_IP_BUFFER_QUEUE_LEN: usize = 3000;
 | 
			
		||||
const TCP_ACCEPT_TIMEOUT_SECS: u64 = 120;
 | 
			
		||||
const TCP_DANGLE_TIMEOUT_SECS: u64 = 10;
 | 
			
		||||
 | 
			
		||||
const TCP_IP_BUFFER_LEN: usize = 1000;
 | 
			
		||||
 | 
			
		||||
pub struct ProxyTcpHandler {
 | 
			
		||||
    rx_sender: Sender<BytesMut>,
 | 
			
		||||
}
 | 
			
		||||
@ -110,7 +109,7 @@ impl ProxyTcpHandler {
 | 
			
		||||
        mut external_socket: TcpStream,
 | 
			
		||||
        mut rx_receiver: Receiver<BytesMut>,
 | 
			
		||||
    ) -> Result<()> {
 | 
			
		||||
        let (ip_sender, mut ip_receiver) = channel::<BytesMut>(TCP_IP_BUFFER_LEN);
 | 
			
		||||
        let (ip_sender, mut ip_receiver) = channel::<BytesMut>(TCP_IP_BUFFER_QUEUE_LEN);
 | 
			
		||||
        let mut external_buffer = vec![0u8; TCP_BUFFER_SIZE];
 | 
			
		||||
 | 
			
		||||
        let mut device = ChannelDevice::new(
 | 
			
		||||
 | 
			
		||||
@ -11,8 +11,8 @@ use tokio::select;
 | 
			
		||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
 | 
			
		||||
use tokio::task::JoinHandle;
 | 
			
		||||
 | 
			
		||||
const RAW_SOCKET_TRANSMIT_QUEUE_LEN: usize = 1000;
 | 
			
		||||
const RAW_SOCKET_RECEIVE_QUEUE_LEN: usize = 1000;
 | 
			
		||||
const RAW_SOCKET_TRANSMIT_QUEUE_LEN: usize = 3000;
 | 
			
		||||
const RAW_SOCKET_RECEIVE_QUEUE_LEN: usize = 3000;
 | 
			
		||||
 | 
			
		||||
#[derive(Debug)]
 | 
			
		||||
pub enum RawSocketProtocol {
 | 
			
		||||
 | 
			
		||||
@ -19,10 +19,10 @@ use tokio::{
 | 
			
		||||
    task::JoinHandle,
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
const TO_BRIDGE_QUEUE_LEN: usize = 1000;
 | 
			
		||||
const FROM_BRIDGE_QUEUE_LEN: usize = 1000;
 | 
			
		||||
const BROADCAST_QUEUE_LEN: usize = 1000;
 | 
			
		||||
const MEMBER_LEAVE_QUEUE_LEN: usize = 10;
 | 
			
		||||
const TO_BRIDGE_QUEUE_LEN: usize = 3000;
 | 
			
		||||
const FROM_BRIDGE_QUEUE_LEN: usize = 3000;
 | 
			
		||||
const BROADCAST_QUEUE_LEN: usize = 3000;
 | 
			
		||||
const MEMBER_LEAVE_QUEUE_LEN: usize = 30;
 | 
			
		||||
 | 
			
		||||
#[derive(Debug)]
 | 
			
		||||
struct BridgeMember {
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user