rework and cleanup

This commit is contained in:
Alex Zenla 2024-07-25 05:36:48 -07:00
parent 2c868ce0ca
commit c9783dc244
No known key found for this signature in database
GPG Key ID: 067B238899B51269
7 changed files with 3758 additions and 108 deletions

3667
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@ -34,24 +34,51 @@ impl IpReservationStore {
}
pub async fn list(&self) -> Result<HashMap<Uuid, IpReservation>> {
let mut reservations: HashMap<Uuid, IpReservation> = HashMap::new();
let read = self.db.database.begin_read()?;
let table = read.open_table(IP_RESERVATION_TABLE)?;
for result in table.iter()? {
let (key, value) = result?;
let uuid = Uuid::from_u128_le(key.value());
let reservation = match serde_json::from_slice(value.value()) {
Ok(reservation) => reservation,
Err(error) => {
error!(
"found invalid ip reservation in database for uuid {}: {}",
uuid, error
);
continue;
}
};
reservations.insert(uuid, reservation);
enum ListEntry {
Valid(Uuid, IpReservation),
Invalid(Uuid),
}
let mut reservations: HashMap<Uuid, IpReservation> = HashMap::new();
let corruptions = {
let read = self.db.database.begin_read()?;
let table = read.open_table(IP_RESERVATION_TABLE)?;
table
.iter()?
.flat_map(|result| {
result.map(|(key, value)| {
let uuid = Uuid::from_u128_le(key.value());
match serde_json::from_slice::<IpReservation>(value.value()) {
Ok(reservation) => ListEntry::Valid(uuid, reservation),
Err(error) => {
error!(
"found invalid ip reservation in database for uuid {}: {}",
uuid, error
);
ListEntry::Invalid(uuid)
}
}
})
})
.filter_map(|entry| match entry {
ListEntry::Valid(uuid, reservation) => {
reservations.insert(uuid, reservation);
None
}
ListEntry::Invalid(uuid) => Some(uuid),
})
.collect::<Vec<Uuid>>()
};
if !corruptions.is_empty() {
let write = self.db.database.begin_write()?;
let mut table = write.open_table(IP_RESERVATION_TABLE)?;
for corruption in corruptions {
table.remove(corruption.to_u128_le())?;
}
}
Ok(reservations)
}

View File

@ -139,34 +139,49 @@ impl DaemonIdm {
data: Option<Vec<u8>>,
buffers: &mut HashMap<u32, BytesMut>,
) -> Result<()> {
// check if data is present, if it is not, that signals a closed channel.
if let Some(data) = data {
let buffer = buffers.entry(domid).or_insert_with_key(|_| BytesMut::new());
buffer.extend_from_slice(&data);
loop {
// check if the buffer is less than the header size, if so, wait for more data
if buffer.len() < 6 {
break;
}
// check for the magic bytes 0xff, 0xff at the start of the message, if that doesn't
// exist, clear the buffer. this ensures that partial messages won't be processed.
if buffer[0] != 0xff || buffer[1] != 0xff {
buffer.clear();
break;
return Ok(());
}
let size = (buffer[2] as u32 | (buffer[3] as u32) << 8 | (buffer[4] as u32) << 16 | (buffer[5] as u32) << 24) as usize;
// read the size from the buffer as a little endian u32
let size = (buffer[2] as u32
| (buffer[3] as u32) << 8
| (buffer[4] as u32) << 16
| (buffer[5] as u32) << 24) as usize;
let needed = size + 6;
if buffer.len() < needed {
break;
return Ok(());
}
let mut packet = buffer.split_to(needed);
// advance the buffer by the header, leaving only the raw data.
packet.advance(6);
match IdmTransportPacket::decode(packet) {
Ok(packet) => {
let _ = client_or_create(domid, &self.tx_sender, &self.clients, &self.feeds).await?;
let _ =
client_or_create(domid, &self.tx_sender, &self.clients, &self.feeds)
.await?;
let guard = self.feeds.lock().await;
if let Some(feed) = guard.get(&domid) {
let _ = feed.try_send(packet.clone());
}
let _ = self.snoop_sender.send(DaemonIdmSnoopPacket { from: domid, to: 0, packet });
let _ = self.snoop_sender.send(DaemonIdmSnoopPacket {
from: domid,
to: 0,
packet,
});
}
Err(packet) => {
@ -187,8 +202,10 @@ impl DaemonIdm {
let data = packet.encode_to_vec();
let mut buffer = vec![0u8; 6];
let length = data.len() as u32;
// magic bytes
buffer[0] = 0xff;
buffer[1] = 0xff;
// little endian u32 for message size
buffer[2] = length as u8;
buffer[3] = (length << 8) as u8;
buffer[4] = (length << 16) as u8;

View File

@ -84,40 +84,22 @@ impl IpAssignment {
gateway_ipv6: Option<Ipv6Addr>,
gateway_mac: Option<MacAddr6>,
) -> Result<IpReservation> {
let mut found_ipv4: Option<Ipv4Addr> = None;
for ip in ipv4_network.iter() {
if ip.is_loopback() || ip.is_multicast() || ip.is_broadcast() {
continue;
}
let found_ipv4: Option<Ipv4Addr> = ipv4_network
.iter()
.filter(|ip| {
ip.is_private() && !(ip.is_loopback() || ip.is_multicast() || ip.is_broadcast())
})
.filter(|ip| {
let last = ip.octets()[3];
// filter for IPs ending in .1 to .250 because .250+ can have special meaning
last > 0 && last < 250
})
.find(|ip| !state.ipv4.contains_key(ip));
if !ip.is_private() {
continue;
}
let last = ip.octets()[3];
if last == 0 || last > 250 {
continue;
}
if state.ipv4.contains_key(&ip) {
continue;
}
found_ipv4 = Some(ip);
break;
}
let mut found_ipv6: Option<Ipv6Addr> = None;
for ip in ipv6_network.iter() {
if ip.is_loopback() || ip.is_multicast() {
continue;
}
if state.ipv6.contains_key(&ip) {
continue;
}
found_ipv6 = Some(ip);
break;
}
let found_ipv6: Option<Ipv6Addr> = ipv6_network
.iter()
.filter(|ip| !ip.is_loopback() && !ip.is_multicast())
.find(|ip| !state.ipv6.contains_key(ip));
let Some(ipv4) = found_ipv4 else {
return Err(anyhow!(

View File

@ -24,7 +24,6 @@ use tokio::{
sync::mpsc::{channel, Sender},
task::JoinHandle,
};
use tokio::runtime::Runtime;
use tokio_stream::wrappers::UnixListenerStream;
use tonic::transport::{Identity, Server, ServerTlsConfig};
use uuid::Uuid;
@ -108,44 +107,30 @@ impl Daemon {
debug!("initializing caches and hydrating zone state");
let seed = config.oci.seed.clone().map(PathBuf::from);
let packer = OciPackerService::new(seed, &image_cache_dir, OciPlatform::current()).await?;
let glt = ZoneLookupTable::new(0, host_uuid);
let zones_db_path = format!("{}/zones.db", store);
let zones = ZoneStore::open(&PathBuf::from(zones_db_path))?;
let (zone_reconciler_notify, zone_reconciler_receiver) =
channel::<Uuid>(ZONE_RECONCILER_QUEUE_LEN);
debug!("initializing core runtime");
let runtime = Runtime::new(host_uuid).await?;
debug!("starting IDM service");
let idm = DaemonIdm::new(glt.clone()).await?;
let idm = idm.launch().await?;
debug!("initializing console interfaces");
let console = DaemonConsole::new(glt.clone()).await?;
let runtime = Runtime::new().await?;
let zlt = ZoneLookupTable::new(0, host_uuid);
let db_path = format!("{}/zones.db", store);
let db_path = format!("{}/krata.db", store);
let database = KrataDatabase::open(Path::new(&db_path))?;
let zones = ZoneStore::open(database.clone())?;
let (zone_reconciler_notify, zone_reconciler_receiver) =
channel::<Uuid>(ZONE_RECONCILER_QUEUE_LEN);
debug!("starting IDM service");
let idm = DaemonIdm::new(zlt.clone()).await?;
let idm = idm.launch().await?;
debug!("initializing console interfaces");
let console = DaemonConsole::new(zlt.clone()).await?;
let console = console.launch().await?;
debug!("initializing zone reconciler");
let (events, generator) =
DaemonEventGenerator::new(zones.clone(), zone_reconciler_notify.clone(), idm.clone())
.await?;
let runtime_for_reconciler = runtime.dupe().await?;
let ipv4_network = Ipv4Network::new(Ipv4Addr::new(10, 75, 80, 0), 24)?;
let ipv6_network = Ipv6Network::from_str("fdd4:1476:6c7e::/48")?;
let ip_reservation_store = IpReservationStore::open(database)?;
let ip_assignment =
IpAssignment::new(host_uuid, ipv4_network, ipv6_network, ip_reservation_store).await?;
debug!("initializing zone reconciler");
let zone_reconciler = ZoneReconciler::new(
devices.clone(),
zlt.clone(),

View File

@ -1,17 +1,10 @@
use std::{fs, path::PathBuf, str::FromStr, sync::Arc};
use anyhow::{anyhow, Result};
<<<<<<< HEAD
use ip::IpVendor;
use ipnetwork::{IpNetwork, Ipv4Network, Ipv6Network};
use krataloopdev::LoopControl;
use log::{debug, error};
=======
>>>>>>> 267dc66 (feature(krata): rework api and make ip assignment persistent to database)
use tokio::sync::Semaphore;
use uuid::Uuid;
use krataloopdev::LoopControl;
use xenclient::XenClient;
use xenstore::{XsdClient, XsdInterface};
@ -62,32 +55,11 @@ pub struct RuntimeContext {
}
impl RuntimeContext {
<<<<<<< HEAD
pub async fn new(host_uuid: Uuid) -> Result<Self> {
debug!("initializing XenClient");
let xen = XenClient::new(0, RuntimePlatform::new()).await?;
debug!("initializing ip allocation vendor");
let ipv4_network = Ipv4Network::new(Ipv4Addr::new(10, 75, 80, 0), 24)?;
let ipv6_network = Ipv6Network::from_str("fdd4:1476:6c7e::/48")?;
let ipvendor =
IpVendor::new(xen.store.clone(), host_uuid, ipv4_network, ipv6_network).await?;
debug!("initializing loop devices");
let autoloop = AutoLoop::new(LoopControl::open()?);
debug!("krata runtime initialized!");
=======
pub async fn new() -> Result<Self> {
let xen = XenClient::new(0, RuntimePlatform::new()).await?;
>>>>>>> 267dc66 (feature(krata): rework api and make ip assignment persistent to database)
Ok(RuntimeContext {
autoloop,
autoloop: AutoLoop::new(LoopControl::open()?),
xen,
<<<<<<< HEAD
ipvendor,
=======
>>>>>>> 267dc66 (feature(krata): rework api and make ip assignment persistent to database)
})
}

View File

@ -7,7 +7,7 @@ use crate::sys::{BindInterdomain, BindUnboundPort, BindVirq, Notify, UnbindPort}
use crate::raw::EVENT_CHANNEL_DEVICE;
use byteorder::{LittleEndian, ReadBytesExt};
use log::warn;
use log::error;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::mem::size_of;
@ -16,7 +16,6 @@ use std::os::raw::c_void;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::fs::{File, OpenOptions};
use tokio::io::AsyncReadExt;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::{Mutex, RwLock};
@ -185,8 +184,9 @@ impl EventChannelProcessor {
if self.flag.load(Ordering::Acquire) {
break;
}
};
}
error!("failed to process event channel wakes: {}", error);
}
});
Ok(())
}