From 9dddbbe424f467a65b0f1620939858ac74664cf8 Mon Sep 17 00:00:00 2001 From: Alex Zenla Date: Thu, 8 Feb 2024 12:17:51 +0000 Subject: [PATCH] network: implement support for smoltcp and ipstack --- Cargo.toml | 1 + container/src/init.rs | 19 +++-- controller/src/ctl/mod.rs | 7 +- libs/ipstack/src/packet.rs | 3 - network/Cargo.toml | 1 + network/src/lib.rs | 65 ++++++++++++++--- network/src/raw_socket.rs | 140 ++++++++++++++++++++++++++++++++++--- shared/src/lib.rs | 8 ++- 8 files changed, 207 insertions(+), 37 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d28f024..c9144de 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ netlink-packet-route = "0.19.0" futures = "0.3.30" ipnetwork = "0.20.0" udp-stream = "0.0.11" +smoltcp = "0.11.0" [workspace.dependencies.uuid] version = "1.6.1" diff --git a/container/src/init.rs b/container/src/init.rs index c2677fe..16a4310 100644 --- a/container/src/init.rs +++ b/container/src/init.rs @@ -284,15 +284,17 @@ impl ContainerInit { async fn network_setup(&mut self, network: &LaunchNetwork) -> Result<()> { trace!( - "setting up network with link {} and ipv4 {}", + "setting up network for link {} with ipv4 address {} and gateway {}", network.link, - network.ipv4 + network.ipv4.address, + network.ipv4.gateway, ); let (connection, handle, _) = rtnetlink::new_connection()?; tokio::spawn(connection); - let ip: IpNetwork = network.ipv4.parse()?; + let ipnet: IpNetwork = network.ipv4.address.parse()?; + let gateway: Ipv4Addr = network.ipv4.gateway.parse()?; let mut links = handle .link() @@ -302,17 +304,11 @@ impl ContainerInit { if let Some(link) = links.try_next().await? { handle .address() - .add(link.header.index, ip.ip(), ip.prefix()) + .add(link.header.index, ipnet.ip(), ipnet.prefix()) .execute() .await?; - handle - .link() - .set(link.header.index) - .arp(false) - .up() - .execute() - .await?; + handle.link().set(link.header.index).up().execute().await?; handle .route() @@ -320,6 +316,7 @@ impl ContainerInit { .v4() .destination_prefix(Ipv4Addr::new(0, 0, 0, 0), 0) .output_interface(link.header.index) + .gateway(gateway) .execute() .await?; } else { diff --git a/controller/src/ctl/mod.rs b/controller/src/ctl/mod.rs index c254646..25c90d1 100644 --- a/controller/src/ctl/mod.rs +++ b/controller/src/ctl/mod.rs @@ -7,7 +7,7 @@ use crate::image::name::ImageName; use crate::image::{ImageCompiler, ImageInfo}; use advmac::MacAddr6; use anyhow::{anyhow, Result}; -use hypha::{LaunchInfo, LaunchNetwork}; +use hypha::{LaunchInfo, LaunchNetwork, LaunchNetworkIpv4}; use ipnetwork::Ipv4Network; use loopdev::LoopControl; use std::io::{Read, Write}; @@ -85,7 +85,10 @@ impl Controller { let launch_config = LaunchInfo { network: Some(LaunchNetwork { link: "eth0".to_string(), - ipv4: format!("{}/24", ipv4), + ipv4: LaunchNetworkIpv4 { + address: format!("{}/24", ipv4), + gateway: "192.168.42.1".to_string(), + }, }), env, run, diff --git a/libs/ipstack/src/packet.rs b/libs/ipstack/src/packet.rs index 509342f..d9cb7a0 100644 --- a/libs/ipstack/src/packet.rs +++ b/libs/ipstack/src/packet.rs @@ -1,7 +1,6 @@ use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use etherparse::{Ethernet2Header, IpHeader, PacketHeaders, TcpHeader, UdpHeader, WriteError}; -use tracing::debug; use crate::error::IpStackError; @@ -42,7 +41,6 @@ pub struct NetworkPacket { impl NetworkPacket { pub fn parse(buf: &[u8]) -> Result { - debug!("read: {:?}", buf); let p = PacketHeaders::from_ethernet_slice(buf).map_err(|_| IpStackError::InvalidPacket)?; let ip = p.ip.ok_or(IpStackError::InvalidPacket)?; let transport = match p.transport { @@ -139,7 +137,6 @@ impl NetworkPacket { // .write(&mut buf) // .map_err(IpStackError::PacketWriteError)?; buf.extend_from_slice(&self.payload); - debug!("write: {:?}", buf); Ok(buf) } pub fn ttl(&self) -> u8 { diff --git a/network/Cargo.toml b/network/Cargo.toml index 17b179e..6528bea 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -15,6 +15,7 @@ tokio = { workspace = true } futures = { workspace = true } libc = { workspace = true } udp-stream = { workspace = true } +smoltcp = { workspace = true } [dependencies.advmac] path = "../libs/advmac" diff --git a/network/src/lib.rs b/network/src/lib.rs index 0b64124..a5e5ead 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -1,21 +1,32 @@ +use std::os::fd::AsRawFd; +use std::str::FromStr; use std::sync::{Arc, Mutex}; +use std::thread; use std::time::Duration; +use advmac::MacAddr6; use anyhow::{anyhow, Result}; -use futures::TryStreamExt; +use futures::channel::oneshot; +use futures::{try_join, TryStreamExt}; use ipstack::stream::IpStackStream; use log::{debug, error, info, warn}; use netlink_packet_route::link::LinkAttribute; use raw_socket::{AsyncRawSocket, RawSocket}; +use smoltcp::iface::{Config, Interface, SocketSet}; +use smoltcp::time::Instant; +use smoltcp::wire::{HardwareAddress, IpCidr}; use tokio::net::TcpStream; use tokio::time::sleep; use udp_stream::UdpStream; mod raw_socket; +#[derive(Clone)] pub struct NetworkBackend { + pub network: String, pub interface: String, } + pub struct NetworkService { pub network: String, } @@ -27,8 +38,9 @@ impl NetworkService { } impl NetworkBackend { - pub fn new(iface: &str) -> Result { + pub fn new(network: &str, iface: &str) -> Result { Ok(NetworkBackend { + network: network.to_string(), interface: iface.to_string(), }) } @@ -56,14 +68,16 @@ impl NetworkBackend { } pub async fn run(&mut self) -> Result<()> { + try_join!(self.run_internet_stack(), self.run_virtual_host_stack()).map(|_| ()) + } + + async fn run_internet_stack(&self) -> Result<()> { let mut config = ipstack::IpStackConfig::default(); config.mtu(1500); - config.tcp_timeout(std::time::Duration::from_secs(600)); // 10 minutes - config.udp_timeout(std::time::Duration::from_secs(10)); // 10 seconds + config.tcp_timeout(std::time::Duration::from_secs(600)); + config.udp_timeout(std::time::Duration::from_secs(10)); - let mut socket = RawSocket::new(&self.interface)?; - socket.bind_interface()?; - let socket = AsyncRawSocket::new(socket)?; + let socket = AsyncRawSocket::bind(&self.interface)?; let mut stack = ipstack::IpStack::new(config, socket); while let Ok(stream) = stack.accept().await { @@ -72,7 +86,40 @@ impl NetworkBackend { Ok(()) } - async fn process_stream(&mut self, stream: IpStackStream) -> Result<()> { + async fn run_virtual_host_stack(&self) -> Result<()> { + let (tx, rx) = oneshot::channel(); + let me = self.clone(); + thread::spawn(move || { + let _ = tx.send(me.run_virtual_host_stack_blocking()); + }); + rx.await? + } + + fn run_virtual_host_stack_blocking(&self) -> Result<()> { + let address = IpCidr::from_str(&self.network) + .map_err(|_| anyhow!("failed to parse cidr: {}", self.network))?; + let addresses: Vec = vec![address]; + let mut socket = RawSocket::new(&self.interface)?; + let mac = MacAddr6::random(); + let mac = HardwareAddress::Ethernet(smoltcp::wire::EthernetAddress(mac.to_array())); + let config = Config::new(mac); + let mut iface = Interface::new(config, &mut socket, Instant::now()); + iface.update_ip_addrs(|addrs| { + addrs + .extend_from_slice(&addresses) + .expect("failed to set ip addresses"); + }); + + let mut sockets = SocketSet::new(vec![]); + let fd = socket.as_raw_fd(); + loop { + let timestamp = Instant::now(); + iface.poll(timestamp, &mut socket, &mut sockets); + smoltcp::phy::wait(fd, iface.poll_delay(timestamp, &sockets))?; + } + } + + async fn process_stream(&self, stream: IpStackStream) -> Result<()> { match stream { IpStackStream::Tcp(mut tcp) => { debug!("tcp: {}", tcp.peer_addr()); @@ -160,7 +207,7 @@ impl NetworkService { spawned: Arc>>, ) -> Result<()> { let interface = interface.to_string(); - let mut network = NetworkBackend::new(&interface)?; + let mut network = NetworkBackend::new(&self.network, &interface)?; info!("initializing network backend for interface {}", interface); network.init().await?; tokio::time::sleep(Duration::from_secs(1)).await; diff --git a/network/src/raw_socket.rs b/network/src/raw_socket.rs index a362369..09fabd1 100644 --- a/network/src/raw_socket.rs +++ b/network/src/raw_socket.rs @@ -1,30 +1,35 @@ +use anyhow::Result; use futures::ready; +use log::debug; +use smoltcp::phy::{Device, DeviceCapabilities, Medium}; +use smoltcp::time::Instant; +use std::cell::RefCell; use std::os::unix::io::{AsRawFd, RawFd}; use std::pin::Pin; +use std::rc::Rc; use std::task::{Context, Poll}; use std::{io, mem}; - -use anyhow::Result; use tokio::io::unix::AsyncFd; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; const SIOCGIFINDEX: libc::c_ulong = 0x8933; #[derive(Debug)] -pub struct RawSocket { +pub struct RawSocketHandle { + pub mtu: usize, protocol: libc::c_short, lower: libc::c_int, ifreq: Ifreq, } -impl AsRawFd for RawSocket { +impl AsRawFd for RawSocketHandle { fn as_raw_fd(&self) -> RawFd { self.lower } } -impl RawSocket { - pub fn new(name: &str) -> io::Result { +impl RawSocketHandle { + pub fn new(interface: &str) -> io::Result { let protocol: libc::c_short = 0x0003; let lower = unsafe { let lower = libc::socket( @@ -38,13 +43,20 @@ impl RawSocket { lower }; - Ok(RawSocket { + Ok(RawSocketHandle { + mtu: 1500, protocol, lower, - ifreq: ifreq_for(name), + ifreq: ifreq_for(interface), }) } + pub fn bind(interface: &str) -> Result { + let mut socket = RawSocketHandle::new(interface)?; + socket.bind_interface()?; + Ok(socket) + } + pub fn bind_interface(&mut self) -> io::Result<()> { let sockaddr = libc::sockaddr_ll { sll_family: libc::AF_PACKET as u16, @@ -101,7 +113,7 @@ impl RawSocket { } } -impl Drop for RawSocket { +impl Drop for RawSocketHandle { fn drop(&mut self) { unsafe { libc::close(self.lower); @@ -109,6 +121,107 @@ impl Drop for RawSocket { } } +#[derive(Debug)] +pub struct RawSocket { + lower: Rc>, + mtu: usize, +} + +impl AsRawFd for RawSocket { + fn as_raw_fd(&self) -> RawFd { + self.lower.borrow().as_raw_fd() + } +} + +impl RawSocket { + pub fn new(name: &str) -> io::Result { + let mut lower = RawSocketHandle::new(name)?; + lower.bind_interface()?; + let mtu = lower.mtu; + Ok(RawSocket { + lower: Rc::new(RefCell::new(lower)), + mtu, + }) + } +} + +impl Device for RawSocket { + type RxToken<'a> = RxToken + where + Self: 'a; + type TxToken<'a> = TxToken + where + Self: 'a; + + fn capabilities(&self) -> DeviceCapabilities { + let mut capabilities = DeviceCapabilities::default(); + capabilities.medium = Medium::Ethernet; + capabilities.max_transmission_unit = self.mtu; + capabilities + } + + fn receive(&mut self, _timestamp: Instant) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> { + let lower = self.lower.borrow_mut(); + let mut buffer = vec![0; self.mtu]; + match lower.recv(&mut buffer[..]) { + Ok(size) => { + buffer.resize(size, 0); + let rx = RxToken { buffer }; + let tx = TxToken { + lower: self.lower.clone(), + }; + Some((rx, tx)) + } + Err(err) if err.kind() == io::ErrorKind::WouldBlock => None, + Err(err) => panic!("{}", err), + } + } + + fn transmit(&mut self, _timestamp: Instant) -> Option> { + Some(TxToken { + lower: self.lower.clone(), + }) + } +} + +#[doc(hidden)] +pub struct RxToken { + buffer: Vec, +} + +impl smoltcp::phy::RxToken for RxToken { + fn consume(mut self, f: F) -> R + where + F: FnOnce(&mut [u8]) -> R, + { + f(&mut self.buffer[..]) + } +} + +#[doc(hidden)] +pub struct TxToken { + lower: Rc>, +} + +impl smoltcp::phy::TxToken for TxToken { + fn consume(self, len: usize, f: F) -> R + where + F: FnOnce(&mut [u8]) -> R, + { + let lower = self.lower.borrow_mut(); + let mut buffer = vec![0; len]; + let result = f(&mut buffer); + match lower.send(&buffer[..]) { + Ok(_) => {} + Err(err) if err.kind() == io::ErrorKind::WouldBlock => { + debug!("phy: tx failed due to WouldBlock") + } + Err(err) => panic!("{}", err), + } + result + } +} + #[repr(C)] #[derive(Debug)] struct Ifreq { @@ -143,15 +256,20 @@ fn ifreq_ioctl( } pub struct AsyncRawSocket { - inner: AsyncFd, + inner: AsyncFd, } impl AsyncRawSocket { - pub fn new(socket: RawSocket) -> Result { + pub fn new(socket: RawSocketHandle) -> Result { Ok(Self { inner: AsyncFd::new(socket)?, }) } + + pub fn bind(interface: &str) -> Result { + let socket = RawSocketHandle::bind(interface)?; + AsyncRawSocket::new(socket) + } } impl AsyncRead for AsyncRawSocket { diff --git a/shared/src/lib.rs b/shared/src/lib.rs index 1281840..8c020a2 100644 --- a/shared/src/lib.rs +++ b/shared/src/lib.rs @@ -1,9 +1,15 @@ use serde::{Deserialize, Serialize}; +#[derive(Serialize, Deserialize, Debug)] +pub struct LaunchNetworkIpv4 { + pub address: String, + pub gateway: String, +} + #[derive(Serialize, Deserialize, Debug)] pub struct LaunchNetwork { pub link: String, - pub ipv4: String, + pub ipv4: LaunchNetworkIpv4, } #[derive(Serialize, Deserialize, Debug)]