diff --git a/crates/krata/build.rs b/crates/krata/build.rs index f7f99d4..051325e 100644 --- a/crates/krata/build.rs +++ b/crates/krata/build.rs @@ -6,13 +6,19 @@ fn main() -> Result<()> { .descriptor_pool("crate::DESCRIPTOR_POOL") .configure( &mut config, - &["../../proto/krata/v1/control.proto"], - &["../../proto/"], + &[ + "../../proto/krata/v1/control.proto", + "proto/krata/internal/idm.proto", + ], + &["../../proto/", "proto/"], )?; tonic_build::configure().compile_with_config( config, - &["../../proto/krata/v1/control.proto"], - &["../../proto/"], + &[ + "../../proto/krata/v1/control.proto", + "proto/krata/internal/idm.proto", + ], + &["../../proto/", "proto/"], )?; Ok(()) } diff --git a/crates/krata/proto/krata/internal/idm.proto b/crates/krata/proto/krata/internal/idm.proto new file mode 100644 index 0000000..f824799 --- /dev/null +++ b/crates/krata/proto/krata/internal/idm.proto @@ -0,0 +1,17 @@ +syntax = "proto3"; + +package krata.internal.idm; + +option java_multiple_files = true; +option java_package = "dev.krata.proto.internal.idm"; +option java_outer_classname = "IdmProto"; + +message IdmExitMessage { + int32 code = 1; +} + +message IdmPacket { + oneof message { + IdmExitMessage exit = 1; + } +} diff --git a/crates/krata/src/idm.rs b/crates/krata/src/idm.rs new file mode 100644 index 0000000..cb0a716 --- /dev/null +++ b/crates/krata/src/idm.rs @@ -0,0 +1 @@ +include!(concat!(env!("OUT_DIR"), "/krata.idm.rs")); diff --git a/crates/kratad/src/idm.rs b/crates/kratad/src/idm.rs new file mode 100644 index 0000000..f0a1cfb --- /dev/null +++ b/crates/kratad/src/idm.rs @@ -0,0 +1,40 @@ +use anyhow::Result; +use kratart::channel::ChannelService; +use log::error; +use tokio::{sync::mpsc::Receiver, task::JoinHandle}; + +pub struct DaemonIdm { + receiver: Receiver<(u32, Vec)>, + task: JoinHandle<()>, +} + +impl DaemonIdm { + pub async fn new() -> Result { + let (service, receiver) = ChannelService::new("krata-channel".to_string()).await?; + let task = service.launch().await?; + Ok(DaemonIdm { receiver, task }) + } + + pub async fn launch(mut self) -> Result> { + Ok(tokio::task::spawn(async move { + if let Err(error) = self.process().await { + error!("failed to process idm: {}", error); + } + })) + } + + async fn process(&mut self) -> Result<()> { + loop { + let Some(_) = self.receiver.recv().await else { + break; + }; + } + Ok(()) + } +} + +impl Drop for DaemonIdm { + fn drop(&mut self) { + self.task.abort(); + } +} diff --git a/crates/kratad/src/lib.rs b/crates/kratad/src/lib.rs index 56ff2b5..c8f726d 100644 --- a/crates/kratad/src/lib.rs +++ b/crates/kratad/src/lib.rs @@ -4,6 +4,7 @@ use anyhow::Result; use control::RuntimeControlService; use db::GuestStore; use event::{DaemonEventContext, DaemonEventGenerator}; +use idm::DaemonIdm; use krata::{dial::ControlDialAddress, v1::control::control_service_server::ControlServiceServer}; use kratart::Runtime; use log::info; @@ -20,6 +21,7 @@ use uuid::Uuid; pub mod control; pub mod db; pub mod event; +pub mod idm; pub mod reconcile; pub struct Daemon { @@ -30,6 +32,7 @@ pub struct Daemon { guest_reconciler_task: JoinHandle<()>, guest_reconciler_notify: Sender, generator_task: JoinHandle<()>, + idm_task: JoinHandle<()>, } const GUEST_RECONCILER_QUEUE_LEN: usize = 1000; @@ -50,14 +53,20 @@ impl Daemon { let runtime_for_reconciler = runtime.dupe().await?; let guest_reconciler = GuestReconciler::new(guests.clone(), events.clone(), runtime_for_reconciler)?; + + let guest_reconciler_task = guest_reconciler.launch(guest_reconciler_receiver).await?; + let idm = DaemonIdm::new().await?; + let idm_task = idm.launch().await?; + let generator_task = generator.launch().await?; Ok(Self { store, runtime, guests, events, - guest_reconciler_task: guest_reconciler.launch(guest_reconciler_receiver).await?, + guest_reconciler_task, guest_reconciler_notify, - generator_task: generator.launch().await?, + generator_task, + idm_task, }) } @@ -121,5 +130,6 @@ impl Drop for Daemon { fn drop(&mut self) { self.guest_reconciler_task.abort(); self.generator_task.abort(); + self.idm_task.abort(); } } diff --git a/crates/kratart/examples/channel.rs b/crates/kratart/examples/channel.rs index fa920e9..d99c1ea 100644 --- a/crates/kratart/examples/channel.rs +++ b/crates/kratart/examples/channel.rs @@ -1,19 +1,23 @@ use anyhow::Result; use env_logger::Env; -use kratart::chan::KrataChannelService; -use xenevtchn::EventChannel; -use xengnt::GrantTab; -use xenstore::XsdClient; +use kratart::channel::ChannelService; #[tokio::main] async fn main() -> Result<()> { env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); - let mut krata = KrataChannelService::new( - EventChannel::open().await?, - XsdClient::open().await?, - GrantTab::open()?, - )?; - krata.watch().await?; + let (service, mut receiver) = ChannelService::new("krata-channel".to_string()).await?; + let task = service.launch().await?; + + loop { + let Some((id, data)) = receiver.recv().await else { + break; + }; + + println!("domain {} = {:?}", id, data); + } + + task.abort(); + Ok(()) } diff --git a/crates/kratart/src/chan.rs b/crates/kratart/src/channel.rs similarity index 80% rename from crates/kratart/src/chan.rs rename to crates/kratart/src/channel.rs index 3584052..6df439e 100644 --- a/crates/kratart/src/chan.rs +++ b/crates/kratart/src/channel.rs @@ -5,7 +5,7 @@ use std::{ }; use anyhow::{anyhow, Result}; -use log::{error, info}; +use log::{debug, error}; use tokio::{ select, sync::{ @@ -19,7 +19,8 @@ use xenevtchn::EventChannel; use xengnt::{sys::GrantRef, GrantTab, MappedMemory}; use xenstore::{XsdClient, XsdInterface}; -const KRATA_SINGLE_CHANNEL_QUEUE_LEN: usize = 100; +const SINGLE_CHANNEL_QUEUE_LEN: usize = 100; +const GROUPED_CHANNEL_QUEUE_LEN: usize = 1000; #[repr(C)] struct XenConsoleInterface { @@ -38,45 +39,87 @@ impl XenConsoleInterface { const OUTPUT_SIZE: usize = 2048; } -pub struct KrataChannelService { - backends: HashMap<(u32, u32), KrataChannelBackend>, +pub struct ChannelService { + typ: String, + backends: HashMap, evtchn: EventChannel, store: XsdClient, gnttab: GrantTab, + input_receiver: Receiver<(u32, Vec)>, + pub input_sender: Sender<(u32, Vec)>, + output_sender: Sender<(u32, Vec)>, } -impl KrataChannelService { - pub fn new( - evtchn: EventChannel, - store: XsdClient, - gnttab: GrantTab, - ) -> Result { - Ok(KrataChannelService { - backends: HashMap::new(), - evtchn, - store, - gnttab, - }) +impl ChannelService { + pub async fn new(typ: String) -> Result<(ChannelService, Receiver<(u32, Vec)>)> { + let (input_sender, input_receiver) = channel(GROUPED_CHANNEL_QUEUE_LEN); + let (output_sender, output_receiver) = channel(GROUPED_CHANNEL_QUEUE_LEN); + Ok(( + ChannelService { + typ, + backends: HashMap::new(), + evtchn: EventChannel::open().await?, + store: XsdClient::open().await?, + gnttab: GrantTab::open()?, + input_sender, + input_receiver, + output_sender, + }, + output_receiver, + )) } - pub async fn watch(&mut self) -> Result<()> { + pub async fn launch(mut self) -> Result> { + Ok(tokio::task::spawn(async move { + if let Err(error) = self.process().await { + error!("channel processor failed: {}", error); + } + })) + } + + async fn process(&mut self) -> Result<()> { self.scan_all_backends().await?; let mut watch_handle = self.store.create_watch().await?; self.store .bind_watch(&watch_handle, "/local/domain/0/backend/console".to_string()) .await?; loop { - let Some(_) = watch_handle.receiver.recv().await else { - break; - }; + select! { + x = watch_handle.receiver.recv() => match x { + Some(_) => { + self.scan_all_backends().await?; + } - self.scan_all_backends().await?; + None => { + break; + } + }, + + x = self.input_receiver.recv() => match x { + Some((domid, data)) => { + if let Some(backend) = self.backends.get_mut(&domid) { + let _ = backend.sender.try_send(data); + } + }, + + None => { + break; + } + } + } + } + Ok(()) + } + + pub async fn send(&mut self, domid: u32, message: Vec) -> Result<()> { + if let Some(backend) = self.backends.get(&domid) { + backend.sender.send(message).await?; } Ok(()) } async fn ensure_backend_exists(&mut self, domid: u32, id: u32, path: String) -> Result<()> { - if self.backends.contains_key(&(domid, id)) { + if self.backends.contains_key(&domid) { return Ok(()); } let Some(frontend_path) = self.store.read_string(format!("{}/frontend", path)).await? @@ -91,11 +134,11 @@ impl KrataChannelService { return Ok(()); }; - if typ != "krata-channel" { + if typ != self.typ { return Ok(()); } - let backend = KrataChannelBackend::new( + let backend = ChannelBackend::new( path.clone(), frontend_path.clone(), domid, @@ -103,15 +146,16 @@ impl KrataChannelService { self.store.clone(), self.evtchn.clone(), self.gnttab.clone(), + self.output_sender.clone(), ) .await?; - self.backends.insert((domid, id), backend); + self.backends.insert(domid, backend); Ok(()) } async fn scan_all_backends(&mut self) -> Result<()> { let domains = self.store.list("/local/domain/0/backend/console").await?; - let mut seen: Vec<(u32, u32)> = Vec::new(); + let mut seen: Vec = Vec::new(); for domid_string in &domains { let domid = domid_string.parse::()?; let domid_path = format!("/local/domain/0/backend/console/{}", domid); @@ -122,11 +166,11 @@ impl KrataChannelService { domid_string, id_string ); self.ensure_backend_exists(domid, id, console_path).await?; - seen.push((domid, id)); + seen.push(domid); } } - let mut gone: Vec<(u32, u32)> = Vec::new(); + let mut gone: Vec = Vec::new(); for backend in self.backends.keys() { if !seen.contains(backend) { gone.push(*backend); @@ -143,25 +187,25 @@ impl KrataChannelService { } } -pub struct KrataChannelBackend { +pub struct ChannelBackend { pub domid: u32, pub id: u32, - pub receiver: Receiver>, pub sender: Sender>, task: JoinHandle<()>, } -impl Drop for KrataChannelBackend { +impl Drop for ChannelBackend { fn drop(&mut self) { self.task.abort(); - info!( + debug!( "destroyed channel backend for domain {} channel {}", self.domid, self.id ); } } -impl KrataChannelBackend { +impl ChannelBackend { + #[allow(clippy::too_many_arguments)] pub async fn new( backend: String, frontend: String, @@ -170,7 +214,8 @@ impl KrataChannelBackend { store: XsdClient, evtchn: EventChannel, gnttab: GrantTab, - ) -> Result { + output_sender: Sender<(u32, Vec)>, + ) -> Result { let processor = KrataChannelBackendProcessor { backend, frontend, @@ -181,15 +226,13 @@ impl KrataChannelBackend { gnttab, }; - let (output_sender, output_receiver) = channel(KRATA_SINGLE_CHANNEL_QUEUE_LEN); - let (input_sender, input_receiver) = channel(KRATA_SINGLE_CHANNEL_QUEUE_LEN); + let (input_sender, input_receiver) = channel(SINGLE_CHANNEL_QUEUE_LEN); let task = processor.launch(output_sender, input_receiver).await?; - Ok(KrataChannelBackend { + Ok(ChannelBackend { domid, id, task, - receiver: output_receiver, sender: input_sender, }) } @@ -211,7 +254,7 @@ impl KrataChannelBackendProcessor { self.store .write_string(format!("{}/state", self.backend), "3") .await?; - info!( + debug!( "created channel backend for domain {} channel {}", self.domid, self.id ); @@ -246,7 +289,7 @@ impl KrataChannelBackendProcessor { async fn launch( &self, - output_sender: Sender>, + output_sender: Sender<(u32, Vec)>, input_receiver: Receiver>, ) -> Result> { let owned = self.clone(); @@ -263,7 +306,7 @@ impl KrataChannelBackendProcessor { async fn processor( &self, - sender: Sender>, + sender: Sender<(u32, Vec)>, mut receiver: Receiver>, ) -> Result<()> { self.init().await?; @@ -335,7 +378,7 @@ impl KrataChannelBackendProcessor { unsafe { let buffer = self.read_output_buffer(channel.local_port, &memory).await?; if !buffer.is_empty() { - sender.send(buffer).await?; + sender.send((self.domid, buffer)).await?; } }; @@ -404,7 +447,7 @@ impl KrataChannelBackendProcessor { unsafe { let buffer = self.read_output_buffer(channel.local_port, &memory).await?; if !buffer.is_empty() { - sender.send(buffer).await?; + sender.send((self.domid, buffer)).await?; } }; channel.unmask_sender.send(channel.local_port).await?; diff --git a/crates/kratart/src/launch.rs b/crates/kratart/src/launch.rs index ef406e0..10db81c 100644 --- a/crates/kratart/src/launch.rs +++ b/crates/kratart/src/launch.rs @@ -9,7 +9,7 @@ use krata::launchcfg::{ LaunchInfo, LaunchNetwork, LaunchNetworkIpv4, LaunchNetworkIpv6, LaunchNetworkResolver, }; use uuid::Uuid; -use xenclient::{DomainConfig, DomainDisk, DomainNetworkInterface}; +use xenclient::{DomainChannel, DomainConfig, DomainDisk, DomainNetworkInterface}; use xenstore::XsdInterface; use crate::cfgblk::ConfigBlock; @@ -180,11 +180,10 @@ impl GuestLauncher { writable: false, }, ], - // channels: vec![DomainChannel { - // typ: "krata-channel".to_string(), - // initialized: false, - // }], - channels: vec![], + channels: vec![DomainChannel { + typ: "krata-channel".to_string(), + initialized: false, + }], vifs: vec![DomainNetworkInterface { mac: &guest_mac_string, mtu: 1500, diff --git a/crates/kratart/src/lib.rs b/crates/kratart/src/lib.rs index f1632a9..3e9851e 100644 --- a/crates/kratart/src/lib.rs +++ b/crates/kratart/src/lib.rs @@ -26,7 +26,7 @@ use krataoci::cache::ImageCache; pub mod autoloop; pub mod cfgblk; -pub mod chan; +pub mod channel; pub mod console; pub mod launch;