From 38e892e24980b5010858aaf5f85a882a57a8f667 Mon Sep 17 00:00:00 2001 From: Alex Zenla Date: Sun, 21 Apr 2024 21:00:32 -0700 Subject: [PATCH] feat: idm v2 (#102) * feat: rebuild idm to separate transport from content * feat: fast guest lookup table and host identification --- Cargo.lock | 2 + Cargo.toml | 1 + crates/ctl/Cargo.toml | 2 + crates/ctl/src/cli/identify_host.rs | 22 +++ crates/ctl/src/cli/idm_snoop.rs | 102 +++++++++++-- crates/ctl/src/cli/mod.rs | 13 +- crates/ctl/src/format.rs | 87 ++++++----- crates/daemon/bin/daemon.rs | 24 +-- crates/daemon/src/command.rs | 36 +++++ crates/daemon/src/console.rs | 17 ++- crates/daemon/src/control.rs | 103 +++++-------- crates/daemon/src/event.rs | 13 +- crates/daemon/src/glt.rs | 69 +++++++++ crates/daemon/src/idm.rs | 56 ++++--- crates/daemon/src/lib.rs | 31 +++- crates/daemon/src/metrics.rs | 14 +- crates/daemon/src/reconcile/guest.rs | 33 ++++- crates/guest/src/background.rs | 46 +++--- crates/guest/src/init.rs | 21 ++- crates/guest/src/metrics.rs | 65 ++++---- crates/krata/build.rs | 12 +- crates/krata/proto/krata/bus/idm.proto | 67 --------- crates/krata/proto/krata/idm/internal.proto | 57 +++++++ crates/krata/proto/krata/idm/transport.proto | 22 +++ crates/krata/proto/krata/v1/common.proto | 3 +- crates/krata/proto/krata/v1/control.proto | 18 ++- crates/krata/src/bus/mod.rs | 1 - crates/krata/src/idm/client.rs | 139 ++++++++++-------- .../krata/src/{bus/idm.rs => idm/internal.rs} | 62 ++++++-- crates/krata/src/idm/mod.rs | 4 +- crates/krata/src/idm/serialize.rs | 10 ++ crates/krata/src/idm/transport.rs | 1 + crates/krata/src/lib.rs | 1 - 33 files changed, 763 insertions(+), 391 deletions(-) create mode 100644 crates/ctl/src/cli/identify_host.rs create mode 100644 crates/daemon/src/command.rs create mode 100644 crates/daemon/src/glt.rs delete mode 100644 crates/krata/proto/krata/bus/idm.proto create mode 100644 crates/krata/proto/krata/idm/internal.proto create mode 100644 crates/krata/proto/krata/idm/transport.proto delete mode 100644 crates/krata/src/bus/mod.rs rename crates/krata/src/{bus/idm.rs => idm/internal.rs} (58%) create mode 100644 crates/krata/src/idm/serialize.rs create mode 100644 crates/krata/src/idm/transport.rs diff --git a/Cargo.lock b/Cargo.lock index 11e0dbb..b38a884 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1412,6 +1412,7 @@ version = "0.0.9" dependencies = [ "anyhow", "async-stream", + "base64 0.22.0", "clap", "comfy-table", "crossterm", @@ -1425,6 +1426,7 @@ dependencies = [ "prost-reflect", "prost-types", "ratatui", + "serde", "serde_json", "serde_yaml", "termtree", diff --git a/Cargo.toml b/Cargo.toml index 330875c..843b24e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ async-compression = "0.4.8" async-stream = "0.3.5" async-trait = "0.1.80" backhand = "0.15.0" +base64 = "0.22.0" byteorder = "1" bytes = "1.5.0" cgroups-rs = "0.3.4" diff --git a/crates/ctl/Cargo.toml b/crates/ctl/Cargo.toml index 968ce0e..06de33c 100644 --- a/crates/ctl/Cargo.toml +++ b/crates/ctl/Cargo.toml @@ -11,6 +11,7 @@ resolver = "2" [dependencies] anyhow = { workspace = true } async-stream = { workspace = true } +base64 = { workspace = true } clap = { workspace = true } comfy-table = { workspace = true } crossterm = { workspace = true, features = ["event-stream"] } @@ -24,6 +25,7 @@ log = { workspace = true } prost-reflect = { workspace = true, features = ["serde"] } prost-types = { workspace = true } ratatui = { workspace = true } +serde = { workspace = true } serde_json = { workspace = true } serde_yaml = { workspace = true } termtree = { workspace = true } diff --git a/crates/ctl/src/cli/identify_host.rs b/crates/ctl/src/cli/identify_host.rs new file mode 100644 index 0000000..de96fc4 --- /dev/null +++ b/crates/ctl/src/cli/identify_host.rs @@ -0,0 +1,22 @@ +use anyhow::Result; +use clap::Parser; +use krata::v1::control::{control_service_client::ControlServiceClient, IdentifyHostRequest}; + +use tonic::{transport::Channel, Request}; + +#[derive(Parser)] +#[command(about = "Identify information about the host")] +pub struct IdentifyHostCommand {} + +impl IdentifyHostCommand { + pub async fn run(self, mut client: ControlServiceClient) -> Result<()> { + let response = client + .identify_host(Request::new(IdentifyHostRequest {})) + .await? + .into_inner(); + println!("Host UUID: {}", response.host_uuid); + println!("Host Domain: {}", response.host_domid); + println!("Krata Version: {}", response.krata_version); + Ok(()) + } +} diff --git a/crates/ctl/src/cli/idm_snoop.rs b/crates/ctl/src/cli/idm_snoop.rs index 3a779a5..228e350 100644 --- a/crates/ctl/src/cli/idm_snoop.rs +++ b/crates/ctl/src/cli/idm_snoop.rs @@ -1,14 +1,18 @@ use anyhow::Result; +use base64::Engine; use clap::{Parser, ValueEnum}; use krata::{ events::EventStream, + idm::{internal, serialize::IdmSerializable, transport::IdmTransportPacketForm}, v1::control::{control_service_client::ControlServiceClient, SnoopIdmReply, SnoopIdmRequest}, }; +use serde::{Deserialize, Serialize}; +use serde_json::Value; use tokio_stream::StreamExt; use tonic::transport::Channel; -use crate::format::{kv2line, proto2dynamic, proto2kv}; +use crate::format::{kv2line, proto2dynamic, value2kv}; #[derive(ValueEnum, Clone, Debug, PartialEq, Eq)] enum IdmSnoopFormat { @@ -34,19 +38,22 @@ impl IdmSnoopCommand { while let Some(reply) = stream.next().await { let reply = reply?; + let Some(line) = convert_idm_snoop(reply) else { + continue; + }; + match self.format { IdmSnoopFormat::Simple => { - self.print_simple(reply)?; + self.print_simple(line)?; } IdmSnoopFormat::Jsonl => { - let value = serde_json::to_value(proto2dynamic(reply)?)?; - let encoded = serde_json::to_string(&value)?; + let encoded = serde_json::to_string(&line)?; println!("{}", encoded.trim()); } IdmSnoopFormat::KeyValue => { - self.print_key_value(reply)?; + self.print_key_value(line)?; } } } @@ -54,21 +61,86 @@ impl IdmSnoopCommand { Ok(()) } - fn print_simple(&self, reply: SnoopIdmReply) -> Result<()> { - let from = reply.from; - let to = reply.to; - let Some(packet) = reply.packet else { - return Ok(()); + fn print_simple(&self, line: IdmSnoopLine) -> Result<()> { + let encoded = if !line.packet.decoded.is_null() { + serde_json::to_string(&line.packet.decoded)? + } else { + base64::prelude::BASE64_STANDARD.encode(&line.packet.data) }; - let value = serde_json::to_value(proto2dynamic(packet)?)?; - let encoded = serde_json::to_string(&value)?; - println!("({} -> {}) {}", from, to, encoded); + println!( + "({} -> {}) {} {} {}", + line.from, line.to, line.packet.id, line.packet.form, encoded + ); Ok(()) } - fn print_key_value(&self, reply: SnoopIdmReply) -> Result<()> { - let kvs = proto2kv(reply)?; + fn print_key_value(&self, line: IdmSnoopLine) -> Result<()> { + let kvs = value2kv(serde_json::to_value(line)?)?; println!("{}", kv2line(kvs)); Ok(()) } } + +#[derive(Serialize, Deserialize)] +pub struct IdmSnoopLine { + pub from: String, + pub to: String, + pub packet: IdmSnoopData, +} + +#[derive(Serialize, Deserialize)] +pub struct IdmSnoopData { + pub id: u64, + pub channel: u64, + pub form: String, + pub data: String, + pub decoded: Value, +} + +pub fn convert_idm_snoop(reply: SnoopIdmReply) -> Option { + let packet = &(reply.packet?); + + let decoded = if packet.channel == 0 { + match packet.form() { + IdmTransportPacketForm::Event => internal::Event::decode(&packet.data) + .ok() + .and_then(|event| proto2dynamic(event).ok()), + + IdmTransportPacketForm::Request => internal::Request::decode(&packet.data) + .ok() + .and_then(|event| proto2dynamic(event).ok()), + + IdmTransportPacketForm::Response => internal::Response::decode(&packet.data) + .ok() + .and_then(|event| proto2dynamic(event).ok()), + + _ => None, + } + } else { + None + }; + + let decoded = decoded + .and_then(|message| serde_json::to_value(message).ok()) + .unwrap_or(Value::Null); + + let data = IdmSnoopData { + id: packet.id, + channel: packet.channel, + form: match packet.form() { + IdmTransportPacketForm::Raw => "raw".to_string(), + IdmTransportPacketForm::Event => "event".to_string(), + IdmTransportPacketForm::Request => "request".to_string(), + IdmTransportPacketForm::Response => "response".to_string(), + _ => format!("unknown-{}", packet.form), + }, + data: base64::prelude::BASE64_STANDARD.encode(&packet.data), + decoded, + }; + + Some(IdmSnoopLine { + from: reply.from, + to: reply.to, + packet: data, + }) +} diff --git a/crates/ctl/src/cli/mod.rs b/crates/ctl/src/cli/mod.rs index 17cd396..4653dd3 100644 --- a/crates/ctl/src/cli/mod.rs +++ b/crates/ctl/src/cli/mod.rs @@ -1,5 +1,6 @@ pub mod attach; pub mod destroy; +pub mod identify_host; pub mod idm_snoop; pub mod launch; pub mod list; @@ -20,9 +21,10 @@ use krata::{ use tonic::{transport::Channel, Request}; use self::{ - attach::AttachCommand, destroy::DestroyCommand, idm_snoop::IdmSnoopCommand, - launch::LauchCommand, list::ListCommand, logs::LogsCommand, metrics::MetricsCommand, - pull::PullCommand, resolve::ResolveCommand, top::TopCommand, watch::WatchCommand, + attach::AttachCommand, destroy::DestroyCommand, identify_host::IdentifyHostCommand, + idm_snoop::IdmSnoopCommand, launch::LauchCommand, list::ListCommand, logs::LogsCommand, + metrics::MetricsCommand, pull::PullCommand, resolve::ResolveCommand, top::TopCommand, + watch::WatchCommand, }; #[derive(Parser)] @@ -56,6 +58,7 @@ pub enum Commands { Metrics(MetricsCommand), IdmSnoop(IdmSnoopCommand), Top(TopCommand), + IdentifyHost(IdentifyHostCommand), } impl ControlCommand { @@ -107,6 +110,10 @@ impl ControlCommand { Commands::Pull(pull) => { pull.run(client).await?; } + + Commands::IdentifyHost(identify) => { + identify.run(client).await?; + } } Ok(()) } diff --git a/crates/ctl/src/format.rs b/crates/ctl/src/format.rs index 9f7a2f4..5e9f232 100644 --- a/crates/ctl/src/format.rs +++ b/crates/ctl/src/format.rs @@ -4,7 +4,7 @@ use anyhow::Result; use fancy_duration::FancyDuration; use human_bytes::human_bytes; use krata::v1::common::{Guest, GuestMetricFormat, GuestMetricNode, GuestStatus}; -use prost_reflect::{DynamicMessage, FieldDescriptor, ReflectMessage, Value as ReflectValue}; +use prost_reflect::{DynamicMessage, ReflectMessage}; use prost_types::Value; use termtree::Tree; @@ -15,64 +15,59 @@ pub fn proto2dynamic(proto: impl ReflectMessage) -> Result { )?) } -pub fn proto2kv(proto: impl ReflectMessage) -> Result> { - let message = proto2dynamic(proto)?; +pub fn value2kv(value: serde_json::Value) -> Result> { let mut map = HashMap::new(); + fn crawl(prefix: String, map: &mut HashMap, value: serde_json::Value) { + fn dot(prefix: &str, next: String) -> String { + if prefix.is_empty() { + next.to_string() + } else { + format!("{}.{}", prefix, next) + } + } - fn crawl( - prefix: String, - field: Option<&FieldDescriptor>, - map: &mut HashMap, - value: &ReflectValue, - ) { match value { - ReflectValue::Message(child) => { - for (field, field_value) in child.fields() { - let path = if prefix.is_empty() { - field.json_name().to_string() - } else { - format!("{}.{}", prefix, field.json_name()) - }; - crawl(path, Some(&field), map, field_value); + serde_json::Value::Null => { + map.insert(prefix, "null".to_string()); + } + + serde_json::Value::String(value) => { + map.insert(prefix, value); + } + + serde_json::Value::Bool(value) => { + map.insert(prefix, value.to_string()); + } + + serde_json::Value::Number(value) => { + map.insert(prefix, value.to_string()); + } + + serde_json::Value::Array(value) => { + for (i, item) in value.into_iter().enumerate() { + let next = dot(&prefix, i.to_string()); + crawl(next, map, item); } } - ReflectValue::EnumNumber(number) => { - if let Some(kind) = field.map(|x| x.kind()) { - if let Some(e) = kind.as_enum() { - if let Some(value) = e.get_value(*number) { - map.insert(prefix, value.name().to_string()); - } - } + serde_json::Value::Object(value) => { + for (key, item) in value { + let next = dot(&prefix, key); + crawl(next, map, item); } } - - ReflectValue::String(value) => { - map.insert(prefix.to_string(), value.clone()); - } - - ReflectValue::List(value) => { - for (x, value) in value.iter().enumerate() { - crawl(format!("{}.{}", prefix, x), field, map, value); - } - } - - _ => { - map.insert(prefix.to_string(), value.to_string()); - } } } - - crawl( - "".to_string(), - None, - &mut map, - &ReflectValue::Message(message), - ); - + crawl("".to_string(), &mut map, value); Ok(map) } +pub fn proto2kv(proto: impl ReflectMessage) -> Result> { + let message = proto2dynamic(proto)?; + let value = serde_json::to_value(message)?; + value2kv(value) +} + pub fn kv2line(map: HashMap) -> String { map.iter() .map(|(k, v)| format!("{}=\"{}\"", k, v.replace('"', "\\\""))) diff --git a/crates/daemon/bin/daemon.rs b/crates/daemon/bin/daemon.rs index 9a1d984..16fcba2 100644 --- a/crates/daemon/bin/daemon.rs +++ b/crates/daemon/bin/daemon.rs @@ -1,21 +1,9 @@ use anyhow::Result; use clap::Parser; use env_logger::Env; -use krata::dial::ControlDialAddress; -use kratad::Daemon; +use kratad::command::DaemonCommand; use log::LevelFilter; -use std::{ - str::FromStr, - sync::{atomic::AtomicBool, Arc}, -}; - -#[derive(Parser)] -struct DaemonCommand { - #[arg(short, long, default_value = "unix:///var/lib/krata/daemon.socket")] - listen: String, - #[arg(short, long, default_value = "/var/lib/krata")] - store: String, -} +use std::sync::{atomic::AtomicBool, Arc}; #[tokio::main(flavor = "multi_thread", worker_threads = 10)] async fn main() -> Result<()> { @@ -24,12 +12,8 @@ async fn main() -> Result<()> { .init(); mask_sighup()?; - let args = DaemonCommand::parse(); - let addr = ControlDialAddress::from_str(&args.listen)?; - - let mut daemon = Daemon::new(args.store.clone()).await?; - daemon.listen(addr).await?; - Ok(()) + let command = DaemonCommand::parse(); + command.run().await } fn mask_sighup() -> Result<()> { diff --git a/crates/daemon/src/command.rs b/crates/daemon/src/command.rs new file mode 100644 index 0000000..052a8f6 --- /dev/null +++ b/crates/daemon/src/command.rs @@ -0,0 +1,36 @@ +use anyhow::Result; +use clap::{CommandFactory, Parser}; +use krata::dial::ControlDialAddress; +use std::str::FromStr; + +use crate::Daemon; + +#[derive(Parser)] +#[command(version, about = "Krata hypervisor daemon")] +pub struct DaemonCommand { + #[arg( + short, + long, + default_value = "unix:///var/lib/krata/daemon.socket", + help = "Listen address" + )] + listen: String, + #[arg(short, long, default_value = "/var/lib/krata", help = "Storage path")] + store: String, +} + +impl DaemonCommand { + pub async fn run(self) -> Result<()> { + let addr = ControlDialAddress::from_str(&self.listen)?; + let mut daemon = Daemon::new(self.store.clone()).await?; + daemon.listen(addr).await?; + Ok(()) + } + + pub fn version() -> String { + DaemonCommand::command() + .get_version() + .unwrap_or("unknown") + .to_string() + } +} diff --git a/crates/daemon/src/console.rs b/crates/daemon/src/console.rs index 38e4792..8c35b71 100644 --- a/crates/daemon/src/console.rs +++ b/crates/daemon/src/console.rs @@ -1,6 +1,6 @@ use std::{collections::HashMap, sync::Arc}; -use anyhow::Result; +use anyhow::{anyhow, Result}; use circular_buffer::CircularBuffer; use kratart::channel::ChannelService; use log::error; @@ -11,6 +11,9 @@ use tokio::{ }, task::JoinHandle, }; +use uuid::Uuid; + +use crate::glt::GuestLookupTable; const CONSOLE_BUFFER_SIZE: usize = 1024 * 1024; type RawConsoleBuffer = CircularBuffer; @@ -21,6 +24,7 @@ type BufferMap = Arc>>; #[derive(Clone)] pub struct DaemonConsoleHandle { + glt: GuestLookupTable, listeners: ListenerMap, buffers: BufferMap, sender: Sender<(u32, Vec)>, @@ -50,9 +54,12 @@ impl DaemonConsoleAttachHandle { impl DaemonConsoleHandle { pub async fn attach( &self, - domid: u32, + uuid: Uuid, sender: Sender>, ) -> Result { + let Some(domid) = self.glt.lookup_domid_by_uuid(&uuid).await else { + return Err(anyhow!("unable to find domain {}", uuid)); + }; let buffers = self.buffers.lock().await; let buffer = buffers.get(&domid).map(|x| x.to_vec()).unwrap_or_default(); drop(buffers); @@ -77,6 +84,7 @@ impl Drop for DaemonConsoleHandle { } pub struct DaemonConsole { + glt: GuestLookupTable, listeners: ListenerMap, buffers: BufferMap, receiver: Receiver<(u32, Option>)>, @@ -85,13 +93,14 @@ pub struct DaemonConsole { } impl DaemonConsole { - pub async fn new() -> Result { + pub async fn new(glt: GuestLookupTable) -> Result { let (service, sender, receiver) = ChannelService::new("krata-console".to_string(), Some(0)).await?; let task = service.launch().await?; let listeners = Arc::new(Mutex::new(HashMap::new())); let buffers = Arc::new(Mutex::new(HashMap::new())); Ok(DaemonConsole { + glt, listeners, buffers, receiver, @@ -101,6 +110,7 @@ impl DaemonConsole { } pub async fn launch(mut self) -> Result { + let glt = self.glt.clone(); let listeners = self.listeners.clone(); let buffers = self.buffers.clone(); let sender = self.sender.clone(); @@ -110,6 +120,7 @@ impl DaemonConsole { } }); Ok(DaemonConsoleHandle { + glt, listeners, buffers, sender, diff --git a/crates/daemon/src/control.rs b/crates/daemon/src/control.rs index a6bb54d..863aee7 100644 --- a/crates/daemon/src/control.rs +++ b/crates/daemon/src/control.rs @@ -1,18 +1,19 @@ use async_stream::try_stream; use futures::Stream; use krata::{ - idm::protocol::{ - idm_request::Request as IdmRequestType, idm_response::Response as IdmResponseType, - IdmMetricsRequest, + idm::internal::{ + request::Request as IdmRequestType, response::Response as IdmResponseType, MetricsRequest, + Request as IdmRequest, }, v1::{ common::{Guest, GuestState, GuestStatus, OciImageFormat}, control::{ control_service_server::ControlService, ConsoleDataReply, ConsoleDataRequest, CreateGuestReply, CreateGuestRequest, DestroyGuestReply, DestroyGuestRequest, - ListGuestsReply, ListGuestsRequest, PullImageReply, PullImageRequest, - ReadGuestMetricsReply, ReadGuestMetricsRequest, ResolveGuestReply, ResolveGuestRequest, - SnoopIdmReply, SnoopIdmRequest, WatchEventsReply, WatchEventsRequest, + IdentifyHostReply, IdentifyHostRequest, ListGuestsReply, ListGuestsRequest, + PullImageReply, PullImageRequest, ReadGuestMetricsReply, ReadGuestMetricsRequest, + ResolveGuestReply, ResolveGuestRequest, SnoopIdmReply, SnoopIdmRequest, + WatchEventsReply, WatchEventsRequest, }, }, }; @@ -32,7 +33,8 @@ use tonic::{Request, Response, Status, Streaming}; use uuid::Uuid; use crate::{ - console::DaemonConsoleHandle, db::GuestStore, event::DaemonEventContext, idm::DaemonIdmHandle, + command::DaemonCommand, console::DaemonConsoleHandle, db::GuestStore, + event::DaemonEventContext, glt::GuestLookupTable, idm::DaemonIdmHandle, metrics::idm_metric_to_api, oci::convert_oci_progress, }; @@ -56,6 +58,7 @@ impl From for Status { #[derive(Clone)] pub struct DaemonControlService { + glt: GuestLookupTable, events: DaemonEventContext, console: DaemonConsoleHandle, idm: DaemonIdmHandle, @@ -66,6 +69,7 @@ pub struct DaemonControlService { impl DaemonControlService { pub fn new( + glt: GuestLookupTable, events: DaemonEventContext, console: DaemonConsoleHandle, idm: DaemonIdmHandle, @@ -74,6 +78,7 @@ impl DaemonControlService { packer: OciPackerService, ) -> Self { Self { + glt, events, console, idm, @@ -108,6 +113,18 @@ impl ControlService for DaemonControlService { type SnoopIdmStream = Pin> + Send + 'static>>; + async fn identify_host( + &self, + request: Request, + ) -> Result, Status> { + let _ = request.into_inner(); + Ok(Response::new(IdentifyHostReply { + host_domid: self.glt.host_domid(), + host_uuid: self.glt.host_uuid().to_string(), + krata_version: DaemonCommand::version(), + })) + } + async fn create_guest( &self, request: Request, @@ -130,6 +147,7 @@ impl ControlService for DaemonControlService { network: None, exit_info: None, error_info: None, + host: self.glt.host_uuid().to_string(), domid: u32::MAX, }), spec: Some(spec), @@ -230,36 +248,10 @@ impl ControlService for DaemonControlService { let uuid = Uuid::from_str(&request.guest_id).map_err(|error| ApiError { message: error.to_string(), })?; - let guest = self - .guests - .read(uuid) - .await - .map_err(|error| ApiError { - message: error.to_string(), - })? - .ok_or_else(|| ApiError { - message: "guest did not exist in the database".to_string(), - })?; - - let Some(ref state) = guest.state else { - return Err(ApiError { - message: "guest did not have state".to_string(), - } - .into()); - }; - - let domid = state.domid; - if domid == 0 { - return Err(ApiError { - message: "invalid domid on the guest".to_string(), - } - .into()); - } - let (sender, mut receiver) = channel(100); let console = self .console - .attach(domid, sender) + .attach(uuid, sender) .await .map_err(|error| ApiError { message: format!("failed to attach to console: {}", error), @@ -309,45 +301,21 @@ impl ControlService for DaemonControlService { let uuid = Uuid::from_str(&request.guest_id).map_err(|error| ApiError { message: error.to_string(), })?; - let guest = self - .guests - .read(uuid) - .await - .map_err(|error| ApiError { - message: error.to_string(), - })? - .ok_or_else(|| ApiError { - message: "guest did not exist in the database".to_string(), - })?; - - let Some(ref state) = guest.state else { - return Err(ApiError { - message: "guest did not have state".to_string(), - } - .into()); - }; - - let domid = state.domid; - if domid == 0 { - return Err(ApiError { - message: "invalid domid on the guest".to_string(), - } - .into()); - } - - let client = self.idm.client(domid).await.map_err(|error| ApiError { + let client = self.idm.client(uuid).await.map_err(|error| ApiError { message: error.to_string(), })?; let response = client - .send(IdmRequestType::Metrics(IdmMetricsRequest {})) + .send(IdmRequest { + request: Some(IdmRequestType::Metrics(MetricsRequest {})), + }) .await .map_err(|error| ApiError { message: error.to_string(), })?; let mut reply = ReadGuestMetricsReply::default(); - if let IdmResponseType::Metrics(metrics) = response { + if let Some(IdmResponseType::Metrics(metrics)) = response.response { reply.root = metrics.root.map(idm_metric_to_api); } Ok(Response::new(reply)) @@ -446,9 +414,16 @@ impl ControlService for DaemonControlService { ) -> Result, Status> { let _ = request.into_inner(); let mut messages = self.idm.snoop(); + let glt = self.glt.clone(); let output = try_stream! { while let Ok(event) = messages.recv().await { - yield SnoopIdmReply { from: event.from, to: event.to, packet: Some(event.packet) }; + let Some(from_uuid) = glt.lookup_uuid_by_domid(event.from).await else { + continue; + }; + let Some(to_uuid) = glt.lookup_uuid_by_domid(event.to).await else { + continue; + }; + yield SnoopIdmReply { from: from_uuid.to_string(), to: to_uuid.to_string(), packet: Some(event.packet) }; } }; Ok(Response::new(Box::pin(output) as Self::SnoopIdmStream)) diff --git a/crates/daemon/src/event.rs b/crates/daemon/src/event.rs index 9d21ff1..f637314 100644 --- a/crates/daemon/src/event.rs +++ b/crates/daemon/src/event.rs @@ -6,7 +6,7 @@ use std::{ use anyhow::Result; use krata::{ - idm::protocol::{idm_event::Event, IdmEvent}, + idm::{internal::event::Event as EventType, internal::Event}, v1::common::{GuestExitInfo, GuestState, GuestStatus}, }; use log::{error, warn}; @@ -50,8 +50,8 @@ pub struct DaemonEventGenerator { feed: broadcast::Receiver, idm: DaemonIdmHandle, idms: HashMap)>, - idm_sender: Sender<(u32, IdmEvent)>, - idm_receiver: Receiver<(u32, IdmEvent)>, + idm_sender: Sender<(u32, Event)>, + idm_receiver: Receiver<(u32, Event)>, _event_sender: broadcast::Sender, } @@ -93,7 +93,7 @@ impl DaemonEventGenerator { match status { GuestStatus::Started => { if let Entry::Vacant(e) = self.idms.entry(domid) { - let client = self.idm.client(domid).await?; + let client = self.idm.client_by_domid(domid).await?; let mut receiver = client.subscribe().await?; let sender = self.idm_sender.clone(); let task = tokio::task::spawn(async move { @@ -122,9 +122,9 @@ impl DaemonEventGenerator { Ok(()) } - async fn handle_idm_event(&mut self, id: Uuid, event: IdmEvent) -> Result<()> { + async fn handle_idm_event(&mut self, id: Uuid, event: Event) -> Result<()> { match event.event { - Some(Event::Exit(exit)) => self.handle_exit_code(id, exit.code).await, + Some(EventType::Exit(exit)) => self.handle_exit_code(id, exit.code).await, None => Ok(()), } } @@ -136,6 +136,7 @@ impl DaemonEventGenerator { network: guest.state.clone().unwrap_or_default().network, exit_info: Some(GuestExitInfo { code }), error_info: None, + host: guest.state.clone().map(|x| x.host).unwrap_or_default(), domid: guest.state.clone().map(|x| x.domid).unwrap_or(u32::MAX), }); diff --git a/crates/daemon/src/glt.rs b/crates/daemon/src/glt.rs new file mode 100644 index 0000000..e91a0cf --- /dev/null +++ b/crates/daemon/src/glt.rs @@ -0,0 +1,69 @@ +use std::{collections::HashMap, sync::Arc}; + +use tokio::sync::RwLock; +use uuid::Uuid; + +struct GuestLookupTableState { + domid_to_uuid: HashMap, + uuid_to_domid: HashMap, +} + +impl GuestLookupTableState { + pub fn new(host_uuid: Uuid) -> Self { + let mut domid_to_uuid = HashMap::new(); + let mut uuid_to_domid = HashMap::new(); + domid_to_uuid.insert(0, host_uuid); + uuid_to_domid.insert(host_uuid, 0); + GuestLookupTableState { + domid_to_uuid, + uuid_to_domid, + } + } +} + +#[derive(Clone)] +pub struct GuestLookupTable { + host_domid: u32, + host_uuid: Uuid, + state: Arc>, +} + +impl GuestLookupTable { + pub fn new(host_domid: u32, host_uuid: Uuid) -> Self { + GuestLookupTable { + host_domid, + host_uuid, + state: Arc::new(RwLock::new(GuestLookupTableState::new(host_uuid))), + } + } + + pub fn host_uuid(&self) -> Uuid { + self.host_uuid + } + + pub fn host_domid(&self) -> u32 { + self.host_domid + } + + pub async fn lookup_uuid_by_domid(&self, domid: u32) -> Option { + let state = self.state.read().await; + state.domid_to_uuid.get(&domid).cloned() + } + + pub async fn lookup_domid_by_uuid(&self, uuid: &Uuid) -> Option { + let state = self.state.read().await; + state.uuid_to_domid.get(uuid).cloned() + } + + pub async fn associate(&self, uuid: Uuid, domid: u32) { + let mut state = self.state.write().await; + state.uuid_to_domid.insert(uuid, domid); + state.domid_to_uuid.insert(domid, uuid); + } + + pub async fn remove(&self, uuid: Uuid, domid: u32) { + let mut state = self.state.write().await; + state.uuid_to_domid.remove(&uuid); + state.domid_to_uuid.remove(&domid); + } +} diff --git a/crates/daemon/src/idm.rs b/crates/daemon/src/idm.rs index ee788b9..1aff9a2 100644 --- a/crates/daemon/src/idm.rs +++ b/crates/daemon/src/idm.rs @@ -6,8 +6,9 @@ use std::{ use anyhow::{anyhow, Result}; use bytes::{Buf, BytesMut}; use krata::idm::{ - client::{IdmBackend, IdmClient}, - protocol::IdmPacket, + client::{IdmBackend, IdmInternalClient}, + internal::INTERNAL_IDM_CHANNEL, + transport::IdmTransportPacket, }; use kratart::channel::ChannelService; use log::{error, warn}; @@ -21,15 +22,19 @@ use tokio::{ }, task::JoinHandle, }; +use uuid::Uuid; -type BackendFeedMap = Arc>>>; -type ClientMap = Arc>>; +use crate::glt::GuestLookupTable; + +type BackendFeedMap = Arc>>>; +type ClientMap = Arc>>; #[derive(Clone)] pub struct DaemonIdmHandle { + glt: GuestLookupTable, clients: ClientMap, feeds: BackendFeedMap, - tx_sender: Sender<(u32, IdmPacket)>, + tx_sender: Sender<(u32, IdmTransportPacket)>, task: Arc>, snoop_sender: broadcast::Sender, } @@ -39,7 +44,14 @@ impl DaemonIdmHandle { self.snoop_sender.subscribe() } - pub async fn client(&self, domid: u32) -> Result { + pub async fn client(&self, uuid: Uuid) -> Result { + let Some(domid) = self.glt.lookup_domid_by_uuid(&uuid).await else { + return Err(anyhow!("unable to find domain {}", uuid)); + }; + self.client_by_domid(domid).await + } + + pub async fn client_by_domid(&self, domid: u32) -> Result { client_or_create(domid, &self.tx_sender, &self.clients, &self.feeds).await } } @@ -56,22 +68,23 @@ impl Drop for DaemonIdmHandle { pub struct DaemonIdmSnoopPacket { pub from: u32, pub to: u32, - pub packet: IdmPacket, + pub packet: IdmTransportPacket, } pub struct DaemonIdm { + glt: GuestLookupTable, clients: ClientMap, feeds: BackendFeedMap, - tx_sender: Sender<(u32, IdmPacket)>, + tx_sender: Sender<(u32, IdmTransportPacket)>, tx_raw_sender: Sender<(u32, Vec)>, - tx_receiver: Receiver<(u32, IdmPacket)>, + tx_receiver: Receiver<(u32, IdmTransportPacket)>, rx_receiver: Receiver<(u32, Option>)>, snoop_sender: broadcast::Sender, task: JoinHandle<()>, } impl DaemonIdm { - pub async fn new() -> Result { + pub async fn new(glt: GuestLookupTable) -> Result { let (service, tx_raw_sender, rx_receiver) = ChannelService::new("krata-channel".to_string(), None).await?; let (tx_sender, tx_receiver) = channel(100); @@ -80,6 +93,7 @@ impl DaemonIdm { let clients = Arc::new(Mutex::new(HashMap::new())); let feeds = Arc::new(Mutex::new(HashMap::new())); Ok(DaemonIdm { + glt, rx_receiver, tx_receiver, tx_sender, @@ -92,6 +106,7 @@ impl DaemonIdm { } pub async fn launch(mut self) -> Result { + let glt = self.glt.clone(); let clients = self.clients.clone(); let feeds = self.feeds.clone(); let tx_sender = self.tx_sender.clone(); @@ -104,6 +119,7 @@ impl DaemonIdm { } }); Ok(DaemonIdmHandle { + glt, clients, feeds, tx_sender, @@ -136,7 +152,7 @@ impl DaemonIdm { } let mut packet = buffer.split_to(needed); packet.advance(6); - match IdmPacket::decode(packet) { + match IdmTransportPacket::decode(packet) { Ok(packet) => { let _ = client_or_create(domid, &self.tx_sender, &self.clients, &self.feeds).await?; let guard = self.feeds.lock().await; @@ -196,10 +212,10 @@ impl Drop for DaemonIdm { async fn client_or_create( domid: u32, - tx_sender: &Sender<(u32, IdmPacket)>, + tx_sender: &Sender<(u32, IdmTransportPacket)>, clients: &ClientMap, feeds: &BackendFeedMap, -) -> Result { +) -> Result { let mut clients = clients.lock().await; let mut feeds = feeds.lock().await; match clients.entry(domid) { @@ -212,7 +228,11 @@ async fn client_or_create( rx_receiver, tx_sender: tx_sender.clone(), }; - let client = IdmClient::new(Box::new(backend) as Box).await?; + let client = IdmInternalClient::new( + INTERNAL_IDM_CHANNEL, + Box::new(backend) as Box, + ) + .await?; entry.insert(client.clone()); Ok(client) } @@ -221,13 +241,13 @@ async fn client_or_create( pub struct IdmDaemonBackend { domid: u32, - rx_receiver: Receiver, - tx_sender: Sender<(u32, IdmPacket)>, + rx_receiver: Receiver, + tx_sender: Sender<(u32, IdmTransportPacket)>, } #[async_trait::async_trait] impl IdmBackend for IdmDaemonBackend { - async fn recv(&mut self) -> Result { + async fn recv(&mut self) -> Result { if let Some(packet) = self.rx_receiver.recv().await { Ok(packet) } else { @@ -235,7 +255,7 @@ impl IdmBackend for IdmDaemonBackend { } } - async fn send(&mut self, packet: IdmPacket) -> Result<()> { + async fn send(&mut self, packet: IdmTransportPacket) -> Result<()> { self.tx_sender.send((self.domid, packet)).await?; Ok(()) } diff --git a/crates/daemon/src/lib.rs b/crates/daemon/src/lib.rs index 4c605a0..7e1023f 100644 --- a/crates/daemon/src/lib.rs +++ b/crates/daemon/src/lib.rs @@ -5,6 +5,7 @@ use console::{DaemonConsole, DaemonConsoleHandle}; use control::DaemonControlService; use db::GuestStore; use event::{DaemonEventContext, DaemonEventGenerator}; +use glt::GuestLookupTable; use idm::{DaemonIdm, DaemonIdmHandle}; use krata::{dial::ControlDialAddress, v1::control::control_service_server::ControlServiceServer}; use krataoci::{packer::service::OciPackerService, registry::OciPlatform}; @@ -21,10 +22,12 @@ use tokio_stream::wrappers::UnixListenerStream; use tonic::transport::{Identity, Server, ServerTlsConfig}; use uuid::Uuid; +pub mod command; pub mod console; pub mod control; pub mod db; pub mod event; +pub mod glt; pub mod idm; pub mod metrics; pub mod oci; @@ -32,6 +35,7 @@ pub mod reconcile; pub struct Daemon { store: String, + glt: GuestLookupTable, guests: GuestStore, events: DaemonEventContext, guest_reconciler_task: JoinHandle<()>, @@ -51,22 +55,43 @@ impl Daemon { image_cache_dir.push("image"); fs::create_dir_all(&image_cache_dir).await?; + let mut host_uuid_path = PathBuf::from(store.clone()); + host_uuid_path.push("host.uuid"); + let host_uuid = if host_uuid_path.is_file() { + let content = fs::read_to_string(&host_uuid_path).await?; + Uuid::from_str(content.trim()).ok() + } else { + None + }; + + let host_uuid = if let Some(host_uuid) = host_uuid { + host_uuid + } else { + let generated = Uuid::new_v4(); + let mut string = generated.to_string(); + string.push('\n'); + fs::write(&host_uuid_path, string).await?; + generated + }; + let packer = OciPackerService::new(None, &image_cache_dir, OciPlatform::current()).await?; let runtime = Runtime::new(store.clone()).await?; + let glt = GuestLookupTable::new(0, host_uuid); let guests_db_path = format!("{}/guests.db", store); let guests = GuestStore::open(&PathBuf::from(guests_db_path))?; let (guest_reconciler_notify, guest_reconciler_receiver) = channel::(GUEST_RECONCILER_QUEUE_LEN); - let idm = DaemonIdm::new().await?; + let idm = DaemonIdm::new(glt.clone()).await?; let idm = idm.launch().await?; - let console = DaemonConsole::new().await?; + let console = DaemonConsole::new(glt.clone()).await?; let console = console.launch().await?; let (events, generator) = DaemonEventGenerator::new(guests.clone(), guest_reconciler_notify.clone(), idm.clone()) .await?; let runtime_for_reconciler = runtime.dupe().await?; let guest_reconciler = GuestReconciler::new( + glt.clone(), guests.clone(), events.clone(), runtime_for_reconciler, @@ -79,6 +104,7 @@ impl Daemon { Ok(Self { store, + glt, guests, events, guest_reconciler_task, @@ -92,6 +118,7 @@ impl Daemon { pub async fn listen(&mut self, addr: ControlDialAddress) -> Result<()> { let control_service = DaemonControlService::new( + self.glt.clone(), self.events.clone(), self.console.clone(), self.idm.clone(), diff --git a/crates/daemon/src/metrics.rs b/crates/daemon/src/metrics.rs index a273dd0..abeae15 100644 --- a/crates/daemon/src/metrics.rs +++ b/crates/daemon/src/metrics.rs @@ -1,18 +1,18 @@ use krata::{ - idm::protocol::{IdmMetricFormat, IdmMetricNode}, + idm::internal::{MetricFormat, MetricNode}, v1::common::{GuestMetricFormat, GuestMetricNode}, }; -fn idm_metric_format_to_api(format: IdmMetricFormat) -> GuestMetricFormat { +fn idm_metric_format_to_api(format: MetricFormat) -> GuestMetricFormat { match format { - IdmMetricFormat::Unknown => GuestMetricFormat::Unknown, - IdmMetricFormat::Bytes => GuestMetricFormat::Bytes, - IdmMetricFormat::Integer => GuestMetricFormat::Integer, - IdmMetricFormat::DurationSeconds => GuestMetricFormat::DurationSeconds, + MetricFormat::Unknown => GuestMetricFormat::Unknown, + MetricFormat::Bytes => GuestMetricFormat::Bytes, + MetricFormat::Integer => GuestMetricFormat::Integer, + MetricFormat::DurationSeconds => GuestMetricFormat::DurationSeconds, } } -pub fn idm_metric_to_api(node: IdmMetricNode) -> GuestMetricNode { +pub fn idm_metric_to_api(node: MetricNode) -> GuestMetricNode { let format = node.format(); GuestMetricNode { name: node.name, diff --git a/crates/daemon/src/reconcile/guest.rs b/crates/daemon/src/reconcile/guest.rs index 5e1876c..c947b62 100644 --- a/crates/daemon/src/reconcile/guest.rs +++ b/crates/daemon/src/reconcile/guest.rs @@ -30,6 +30,7 @@ use uuid::Uuid; use crate::{ db::GuestStore, event::{DaemonEvent, DaemonEventContext}, + glt::GuestLookupTable, }; const PARALLEL_LIMIT: u32 = 5; @@ -53,6 +54,7 @@ impl Drop for GuestReconcilerEntry { #[derive(Clone)] pub struct GuestReconciler { + glt: GuestLookupTable, guests: GuestStore, events: DaemonEventContext, runtime: Runtime, @@ -64,6 +66,7 @@ pub struct GuestReconciler { impl GuestReconciler { pub fn new( + glt: GuestLookupTable, guests: GuestStore, events: DaemonEventContext, runtime: Runtime, @@ -71,6 +74,7 @@ impl GuestReconciler { guest_reconciler_notify: Sender, ) -> Result { Ok(Self { + glt, guests, events, runtime, @@ -123,6 +127,23 @@ impl GuestReconciler { trace!("reconciling runtime"); let runtime_guests = self.runtime.list().await?; let stored_guests = self.guests.list().await?; + + let non_existent_guests = runtime_guests + .iter() + .filter(|x| !stored_guests.iter().any(|g| *g.0 == x.uuid)) + .collect::>(); + + for guest in non_existent_guests { + warn!("destroying unknown runtime guest {}", guest.uuid); + if let Err(error) = self.runtime.destroy(guest.uuid).await { + error!( + "failed to destroy unknown runtime guest {}: {}", + guest.uuid, error + ); + } + self.guests.remove(guest.uuid).await?; + } + for (uuid, mut stored_guest) in stored_guests { let previous_guest = stored_guest.clone(); let runtime_guest = runtime_guests.iter().find(|x| x.uuid == uuid); @@ -136,6 +157,7 @@ impl GuestReconciler { } Some(runtime) => { + self.glt.associate(uuid, runtime.domid).await; let mut state = stored_guest.state.as_mut().cloned().unwrap_or_default(); if let Some(code) = runtime.state.exit_code { state.status = GuestStatus::Exited.into(); @@ -283,12 +305,14 @@ impl GuestReconciler { debug: false, }) .await?; + self.glt.associate(uuid, info.domid).await; info!("started guest {}", uuid); guest.state = Some(GuestState { status: GuestStatus::Started.into(), network: Some(guestinfo_to_networkstate(&info)), exit_info: None, error_info: None, + host: self.glt.host_uuid().to_string(), domid: info.domid, }); Ok(GuestReconcilerResult::Changed { rerun: false }) @@ -308,13 +332,20 @@ impl GuestReconciler { trace!("failed to destroy runtime guest {}: {}", uuid, error); } + let domid = guest.state.as_ref().map(|x| x.domid); + + if let Some(domid) = domid { + self.glt.remove(uuid, domid).await; + } + info!("destroyed guest {}", uuid); guest.state = Some(GuestState { status: GuestStatus::Destroyed.into(), network: None, exit_info: None, error_info: None, - domid: guest.state.as_ref().map(|x| x.domid).unwrap_or(u32::MAX), + host: self.glt.host_uuid().to_string(), + domid: domid.unwrap_or(u32::MAX), }); Ok(GuestReconcilerResult::Changed { rerun: false }) } diff --git a/crates/guest/src/background.rs b/crates/guest/src/background.rs index a2c2929..d981343 100644 --- a/crates/guest/src/background.rs +++ b/crates/guest/src/background.rs @@ -6,10 +6,11 @@ use crate::{ use anyhow::Result; use cgroups_rs::Cgroup; use krata::idm::{ - client::IdmClient, - protocol::{ - idm_event::Event, idm_request::Request, idm_response::Response, IdmEvent, IdmExitEvent, - IdmMetricsResponse, IdmPingResponse, IdmRequest, + client::IdmInternalClient, + internal::{ + event::Event as EventType, request::Request as RequestType, + response::Response as ResponseType, Event, ExitEvent, MetricsResponse, PingResponse, + Request, Response, }, }; use log::debug; @@ -17,14 +18,18 @@ use nix::unistd::Pid; use tokio::{select, sync::broadcast}; pub struct GuestBackground { - idm: IdmClient, + idm: IdmInternalClient, child: Pid, _cgroup: Cgroup, wait: ChildWait, } impl GuestBackground { - pub async fn new(idm: IdmClient, cgroup: Cgroup, child: Pid) -> Result { + pub async fn new( + idm: IdmInternalClient, + cgroup: Cgroup, + child: Pid, + ) -> Result { Ok(GuestBackground { idm, child, @@ -54,8 +59,8 @@ impl GuestBackground { }, x = requests_subscription.recv() => match x { - Ok(request) => { - self.handle_idm_request(request).await?; + Ok((id, request)) => { + self.handle_idm_request(id, request).await?; }, Err(broadcast::error::RecvError::Closed) => { @@ -79,22 +84,27 @@ impl GuestBackground { Ok(()) } - async fn handle_idm_request(&mut self, packet: IdmRequest) -> Result<()> { - let id = packet.id; - + async fn handle_idm_request(&mut self, id: u64, packet: Request) -> Result<()> { match packet.request { - Some(Request::Ping(_)) => { + Some(RequestType::Ping(_)) => { self.idm - .respond(id, Response::Ping(IdmPingResponse {})) + .respond( + id, + Response { + response: Some(ResponseType::Ping(PingResponse {})), + }, + ) .await?; } - Some(Request::Metrics(_)) => { + Some(RequestType::Metrics(_)) => { let metrics = MetricsCollector::new()?; let root = metrics.collect()?; - let response = IdmMetricsResponse { root: Some(root) }; + let response = Response { + response: Some(ResponseType::Metrics(MetricsResponse { root: Some(root) })), + }; - self.idm.respond(id, Response::Metrics(response)).await?; + self.idm.respond(id, response).await?; } None => {} @@ -105,8 +115,8 @@ impl GuestBackground { async fn child_event(&mut self, event: ChildEvent) -> Result<()> { if event.pid == self.child { self.idm - .emit(IdmEvent { - event: Some(Event::Exit(IdmExitEvent { code: event.status })), + .emit(Event { + event: Some(EventType::Exit(ExitEvent { code: event.status })), }) .await?; death(event.status).await?; diff --git a/crates/guest/src/init.rs b/crates/guest/src/init.rs index f657015..1f1189f 100644 --- a/crates/guest/src/init.rs +++ b/crates/guest/src/init.rs @@ -3,7 +3,8 @@ use cgroups_rs::{Cgroup, CgroupPid}; use futures::stream::TryStreamExt; use ipnetwork::IpNetwork; use krata::ethtool::EthtoolHandle; -use krata::idm::client::IdmClient; +use krata::idm::client::IdmInternalClient; +use krata::idm::internal::INTERNAL_IDM_CHANNEL; use krata::launchcfg::{LaunchInfo, LaunchNetwork, LaunchPackedFormat}; use libc::{sethostname, setsid, TIOCSCTTY}; use log::{trace, warn}; @@ -77,7 +78,7 @@ impl GuestInit { Err(error) => warn!("failed to open console: {}", error), }; - let idm = IdmClient::open("/dev/hvc1") + let idm = IdmInternalClient::open(INTERNAL_IDM_CHANNEL, "/dev/hvc1") .await .map_err(|x| anyhow!("failed to open idm client: {}", x))?; self.mount_config_image().await?; @@ -438,7 +439,12 @@ impl GuestInit { Ok(()) } - async fn run(&mut self, config: &Config, launch: &LaunchInfo, idm: IdmClient) -> Result<()> { + async fn run( + &mut self, + config: &Config, + launch: &LaunchInfo, + idm: IdmInternalClient, + ) -> Result<()> { let mut cmd = match config.cmd() { None => vec![], Some(value) => value.clone(), @@ -560,7 +566,7 @@ impl GuestInit { async fn fork_and_exec( &mut self, - idm: IdmClient, + idm: IdmInternalClient, cgroup: Cgroup, working_dir: String, path: CString, @@ -596,7 +602,12 @@ impl GuestInit { Ok(()) } - async fn background(&mut self, idm: IdmClient, cgroup: Cgroup, executed: Pid) -> Result<()> { + async fn background( + &mut self, + idm: IdmInternalClient, + cgroup: Cgroup, + executed: Pid, + ) -> Result<()> { let mut background = GuestBackground::new(idm, cgroup, executed).await?; background.run().await?; Ok(()) diff --git a/crates/guest/src/metrics.rs b/crates/guest/src/metrics.rs index f9ec377..5f97a2d 100644 --- a/crates/guest/src/metrics.rs +++ b/crates/guest/src/metrics.rs @@ -1,7 +1,7 @@ use std::{ops::Add, path::Path}; use anyhow::Result; -use krata::idm::protocol::{IdmMetricFormat, IdmMetricNode}; +use krata::idm::internal::{MetricFormat, MetricNode}; use sysinfo::Process; pub struct MetricsCollector {} @@ -11,9 +11,9 @@ impl MetricsCollector { Ok(MetricsCollector {}) } - pub fn collect(&self) -> Result { + pub fn collect(&self) -> Result { let mut sysinfo = sysinfo::System::new(); - Ok(IdmMetricNode::structural( + Ok(MetricNode::structural( "guest", vec![ self.collect_system(&mut sysinfo)?, @@ -22,22 +22,22 @@ impl MetricsCollector { )) } - fn collect_system(&self, sysinfo: &mut sysinfo::System) -> Result { + fn collect_system(&self, sysinfo: &mut sysinfo::System) -> Result { sysinfo.refresh_memory(); - Ok(IdmMetricNode::structural( + Ok(MetricNode::structural( "system", - vec![IdmMetricNode::structural( + vec![MetricNode::structural( "memory", vec![ - IdmMetricNode::value("total", sysinfo.total_memory(), IdmMetricFormat::Bytes), - IdmMetricNode::value("used", sysinfo.used_memory(), IdmMetricFormat::Bytes), - IdmMetricNode::value("free", sysinfo.free_memory(), IdmMetricFormat::Bytes), + MetricNode::value("total", sysinfo.total_memory(), MetricFormat::Bytes), + MetricNode::value("used", sysinfo.used_memory(), MetricFormat::Bytes), + MetricNode::value("free", sysinfo.free_memory(), MetricFormat::Bytes), ], )], )) } - fn collect_processes(&self, sysinfo: &mut sysinfo::System) -> Result { + fn collect_processes(&self, sysinfo: &mut sysinfo::System) -> Result { sysinfo.refresh_processes(); let mut processes = Vec::new(); let mut sysinfo_processes = sysinfo.processes().values().collect::>(); @@ -48,71 +48,68 @@ impl MetricsCollector { } processes.push(MetricsCollector::process_node(process)?); } - Ok(IdmMetricNode::structural("process", processes)) + Ok(MetricNode::structural("process", processes)) } - fn process_node(process: &Process) -> Result { + fn process_node(process: &Process) -> Result { let mut metrics = vec![]; if let Some(parent) = process.parent() { - metrics.push(IdmMetricNode::value( + metrics.push(MetricNode::value( "parent", parent.as_u32() as u64, - IdmMetricFormat::Integer, + MetricFormat::Integer, )); } if let Some(exe) = process.exe().and_then(path_as_str) { - metrics.push(IdmMetricNode::raw_value("executable", exe)); + metrics.push(MetricNode::raw_value("executable", exe)); } if let Some(working_directory) = process.cwd().and_then(path_as_str) { - metrics.push(IdmMetricNode::raw_value("cwd", working_directory)); + metrics.push(MetricNode::raw_value("cwd", working_directory)); } let cmdline = process.cmd().to_vec(); - metrics.push(IdmMetricNode::raw_value("cmdline", cmdline)); - metrics.push(IdmMetricNode::structural( + metrics.push(MetricNode::raw_value("cmdline", cmdline)); + metrics.push(MetricNode::structural( "memory", vec![ - IdmMetricNode::value("resident", process.memory(), IdmMetricFormat::Bytes), - IdmMetricNode::value("virtual", process.virtual_memory(), IdmMetricFormat::Bytes), + MetricNode::value("resident", process.memory(), MetricFormat::Bytes), + MetricNode::value("virtual", process.virtual_memory(), MetricFormat::Bytes), ], )); - metrics.push(IdmMetricNode::value( + metrics.push(MetricNode::value( "lifetime", process.run_time(), - IdmMetricFormat::DurationSeconds, + MetricFormat::DurationSeconds, )); - metrics.push(IdmMetricNode::value( + metrics.push(MetricNode::value( "uid", process.user_id().map(|x| (*x).add(0)).unwrap_or(0) as f64, - IdmMetricFormat::Integer, + MetricFormat::Integer, )); - metrics.push(IdmMetricNode::value( + metrics.push(MetricNode::value( "gid", process.group_id().map(|x| (*x).add(0)).unwrap_or(0) as f64, - IdmMetricFormat::Integer, + MetricFormat::Integer, )); - metrics.push(IdmMetricNode::value( + metrics.push(MetricNode::value( "euid", process .effective_user_id() .map(|x| (*x).add(0)) .unwrap_or(0) as f64, - IdmMetricFormat::Integer, + MetricFormat::Integer, )); - metrics.push(IdmMetricNode::value( + metrics.push(MetricNode::value( "egid", process.effective_group_id().map(|x| x.add(0)).unwrap_or(0) as f64, - IdmMetricFormat::Integer, + MetricFormat::Integer, )); - Ok(IdmMetricNode::structural( - process.pid().to_string(), - metrics, - )) + Ok(MetricNode::structural(process.pid().to_string(), metrics)) } } diff --git a/crates/krata/build.rs b/crates/krata/build.rs index ea44224..adffbbc 100644 --- a/crates/krata/build.rs +++ b/crates/krata/build.rs @@ -6,12 +6,20 @@ fn main() -> Result<()> { .descriptor_pool("crate::DESCRIPTOR_POOL") .configure( &mut config, - &["proto/krata/v1/control.proto", "proto/krata/bus/idm.proto"], + &[ + "proto/krata/v1/control.proto", + "proto/krata/idm/transport.proto", + "proto/krata/idm/internal.proto", + ], &["proto/"], )?; tonic_build::configure().compile_with_config( config, - &["proto/krata/v1/control.proto", "proto/krata/bus/idm.proto"], + &[ + "proto/krata/v1/control.proto", + "proto/krata/idm/transport.proto", + "proto/krata/idm/internal.proto", + ], &["proto/"], )?; Ok(()) diff --git a/crates/krata/proto/krata/bus/idm.proto b/crates/krata/proto/krata/bus/idm.proto deleted file mode 100644 index b8d2f00..0000000 --- a/crates/krata/proto/krata/bus/idm.proto +++ /dev/null @@ -1,67 +0,0 @@ -syntax = "proto3"; - -package krata.bus.idm; - -option java_multiple_files = true; -option java_package = "dev.krata.proto.bus.idm"; -option java_outer_classname = "IdmProto"; - -import "google/protobuf/struct.proto"; - -message IdmPacket { - oneof content { - IdmEvent event = 1; - IdmRequest request = 2; - IdmResponse response = 3; - } -} - -message IdmEvent { - oneof event { - IdmExitEvent exit = 1; - } -} - -message IdmExitEvent { - int32 code = 1; -} - -message IdmRequest { - uint64 id = 1; - oneof request { - IdmPingRequest ping = 2; - IdmMetricsRequest metrics = 3; - } -} - -message IdmPingRequest {} - -message IdmMetricsRequest {} - -message IdmResponse { - uint64 id = 1; - oneof response { - IdmPingResponse ping = 2; - IdmMetricsResponse metrics = 3; - } -} - -message IdmPingResponse {} - -message IdmMetricsResponse { - IdmMetricNode root = 1; -} - -message IdmMetricNode { - string name = 1; - google.protobuf.Value value = 2; - IdmMetricFormat format = 3; - repeated IdmMetricNode children = 4; -} - -enum IdmMetricFormat { - IDM_METRIC_FORMAT_UNKNOWN = 0; - IDM_METRIC_FORMAT_BYTES = 1; - IDM_METRIC_FORMAT_INTEGER = 2; - IDM_METRIC_FORMAT_DURATION_SECONDS = 3; -} diff --git a/crates/krata/proto/krata/idm/internal.proto b/crates/krata/proto/krata/idm/internal.proto new file mode 100644 index 0000000..634643f --- /dev/null +++ b/crates/krata/proto/krata/idm/internal.proto @@ -0,0 +1,57 @@ +syntax = "proto3"; + +package krata.idm.internal; + +option java_multiple_files = true; +option java_package = "dev.krata.proto.idm.internal"; +option java_outer_classname = "IdmInternalProto"; + +import "google/protobuf/struct.proto"; + +message ExitEvent { + int32 code = 1; +} + +message PingRequest {} + +message PingResponse {} + +message MetricsRequest {} + +message MetricsResponse { + MetricNode root = 1; +} + +message MetricNode { + string name = 1; + google.protobuf.Value value = 2; + MetricFormat format = 3; + repeated MetricNode children = 4; +} + +enum MetricFormat { + METRIC_FORMAT_UNKNOWN = 0; + METRIC_FORMAT_BYTES = 1; + METRIC_FORMAT_INTEGER = 2; + METRIC_FORMAT_DURATION_SECONDS = 3; +} + +message Event { + oneof event { + ExitEvent exit = 1; + } +} + +message Request { + oneof request { + PingRequest ping = 1; + MetricsRequest metrics = 2; + } +} + +message Response { + oneof response { + PingResponse ping = 1; + MetricsResponse metrics = 2; + } +} diff --git a/crates/krata/proto/krata/idm/transport.proto b/crates/krata/proto/krata/idm/transport.proto new file mode 100644 index 0000000..37d9283 --- /dev/null +++ b/crates/krata/proto/krata/idm/transport.proto @@ -0,0 +1,22 @@ +syntax = "proto3"; + +package krata.idm.transport; + +option java_multiple_files = true; +option java_package = "dev.krata.proto.idm.transport"; +option java_outer_classname = "IdmTransportProto"; + +message IdmTransportPacket { + uint64 id = 1; + uint64 channel = 2; + IdmTransportPacketForm form = 3; + bytes data = 4; +} + +enum IdmTransportPacketForm { + IDM_TRANSPORT_PACKET_FORM_UNKNOWN = 0; + IDM_TRANSPORT_PACKET_FORM_RAW = 1; + IDM_TRANSPORT_PACKET_FORM_EVENT = 2; + IDM_TRANSPORT_PACKET_FORM_REQUEST = 3; + IDM_TRANSPORT_PACKET_FORM_RESPONSE = 4; +} diff --git a/crates/krata/proto/krata/v1/common.proto b/crates/krata/proto/krata/v1/common.proto index 1c20875..84bd7f9 100644 --- a/crates/krata/proto/krata/v1/common.proto +++ b/crates/krata/proto/krata/v1/common.proto @@ -62,7 +62,8 @@ message GuestState { GuestNetworkState network = 2; GuestExitInfo exit_info = 3; GuestErrorInfo error_info = 4; - uint32 domid = 5; + string host = 5; + uint32 domid = 6; } enum GuestStatus { diff --git a/crates/krata/proto/krata/v1/control.proto b/crates/krata/proto/krata/v1/control.proto index a28f660..03a0881 100644 --- a/crates/krata/proto/krata/v1/control.proto +++ b/crates/krata/proto/krata/v1/control.proto @@ -6,10 +6,12 @@ option java_multiple_files = true; option java_package = "dev.krata.proto.v1.control"; option java_outer_classname = "ControlProto"; -import "krata/bus/idm.proto"; +import "krata/idm/transport.proto"; import "krata/v1/common.proto"; service ControlService { + rpc IdentifyHost(IdentifyHostRequest) returns (IdentifyHostReply); + rpc CreateGuest(CreateGuestRequest) returns (CreateGuestReply); rpc DestroyGuest(DestroyGuestRequest) returns (DestroyGuestReply); rpc ResolveGuest(ResolveGuestRequest) returns (ResolveGuestReply); @@ -24,6 +26,14 @@ service ControlService { rpc PullImage(PullImageRequest) returns (stream PullImageReply); } +message IdentifyHostRequest {} + +message IdentifyHostReply { + string host_uuid = 1; + uint32 host_domid = 2; + string krata_version = 3; +} + message CreateGuestRequest { krata.v1.common.GuestSpec spec = 1; } @@ -84,9 +94,9 @@ message ReadGuestMetricsReply { message SnoopIdmRequest {} message SnoopIdmReply { - uint32 from = 1; - uint32 to = 2; - krata.bus.idm.IdmPacket packet = 3; + string from = 1; + string to = 2; + krata.idm.transport.IdmTransportPacket packet = 3; } message ImageProgress { diff --git a/crates/krata/src/bus/mod.rs b/crates/krata/src/bus/mod.rs deleted file mode 100644 index b378e38..0000000 --- a/crates/krata/src/bus/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod idm; diff --git a/crates/krata/src/idm/client.rs b/crates/krata/src/idm/client.rs index 3c0181b..f6a740a 100644 --- a/crates/krata/src/idm/client.rs +++ b/crates/krata/src/idm/client.rs @@ -8,10 +8,6 @@ use std::{ time::Duration, }; -use super::protocol::{ - idm_packet::Content, idm_request::Request, idm_response::Response, IdmEvent, IdmPacket, - IdmRequest, IdmResponse, -}; use anyhow::{anyhow, Result}; use log::{debug, error}; use nix::sys::termios::{cfmakeraw, tcgetattr, tcsetattr, SetArg}; @@ -22,14 +18,21 @@ use tokio::{ select, sync::{ broadcast, - mpsc::{channel, Receiver, Sender}, + mpsc::{self, Receiver, Sender}, oneshot, Mutex, }, task::JoinHandle, time::timeout, }; -type RequestMap = Arc>>>; +use super::{ + internal, + serialize::{IdmRequest, IdmSerializable}, + transport::{IdmTransportPacket, IdmTransportPacketForm}, +}; + +type RequestMap = Arc::Response>>>>; +pub type IdmInternalClient = IdmClient; const IDM_PACKET_QUEUE_LEN: usize = 100; const IDM_REQUEST_TIMEOUT_SECS: u64 = 10; @@ -37,8 +40,8 @@ const IDM_PACKET_MAX_SIZE: usize = 20 * 1024 * 1024; #[async_trait::async_trait] pub trait IdmBackend: Send { - async fn recv(&mut self) -> Result; - async fn send(&mut self, packet: IdmPacket) -> Result<()>; + async fn recv(&mut self) -> Result; + async fn send(&mut self, packet: IdmTransportPacket) -> Result<()>; } pub struct IdmFileBackend { @@ -66,30 +69,30 @@ impl IdmFileBackend { #[async_trait::async_trait] impl IdmBackend for IdmFileBackend { - async fn recv(&mut self) -> Result { + async fn recv(&mut self) -> Result { let mut fd = self.read_fd.lock().await; let mut guard = fd.readable_mut().await?; let b1 = guard.get_inner_mut().read_u8().await?; if b1 != 0xff { - return Ok(IdmPacket::default()); + return Ok(IdmTransportPacket::default()); } let b2 = guard.get_inner_mut().read_u8().await?; if b2 != 0xff { - return Ok(IdmPacket::default()); + return Ok(IdmTransportPacket::default()); } let size = guard.get_inner_mut().read_u32_le().await?; if size == 0 { - return Ok(IdmPacket::default()); + return Ok(IdmTransportPacket::default()); } let mut buffer = vec![0u8; size as usize]; guard.get_inner_mut().read_exact(&mut buffer).await?; - match IdmPacket::decode(buffer.as_slice()) { + match IdmTransportPacket::decode(buffer.as_slice()) { Ok(packet) => Ok(packet), Err(error) => Err(anyhow!("received invalid idm packet: {}", error)), } } - async fn send(&mut self, packet: IdmPacket) -> Result<()> { + async fn send(&mut self, packet: IdmTransportPacket) -> Result<()> { let mut file = self.write.lock().await; let data = packet.encode_to_vec(); file.write_all(&[0xff, 0xff]).await?; @@ -100,16 +103,17 @@ impl IdmBackend for IdmFileBackend { } #[derive(Clone)] -pub struct IdmClient { - request_backend_sender: broadcast::Sender, +pub struct IdmClient { + channel: u64, + request_backend_sender: broadcast::Sender<(u64, R)>, next_request_id: Arc>, - event_receiver_sender: broadcast::Sender, - tx_sender: Sender, - requests: RequestMap, + event_receiver_sender: broadcast::Sender, + tx_sender: Sender, + requests: RequestMap, task: Arc>, } -impl Drop for IdmClient { +impl Drop for IdmClient { fn drop(&mut self) { if Arc::strong_count(&self.task) <= 1 { self.task.abort(); @@ -117,12 +121,12 @@ impl Drop for IdmClient { } } -impl IdmClient { - pub async fn new(backend: Box) -> Result { +impl IdmClient { + pub async fn new(channel: u64, backend: Box) -> Result { let requests = Arc::new(Mutex::new(HashMap::new())); let (event_sender, event_receiver) = broadcast::channel(IDM_PACKET_QUEUE_LEN); let (internal_request_backend_sender, _) = broadcast::channel(IDM_PACKET_QUEUE_LEN); - let (tx_sender, tx_receiver) = channel(IDM_PACKET_QUEUE_LEN); + let (tx_sender, tx_receiver) = mpsc::channel(IDM_PACKET_QUEUE_LEN); let backend_event_sender = event_sender.clone(); let request_backend_sender = internal_request_backend_sender.clone(); let requests_for_client = requests.clone(); @@ -141,6 +145,7 @@ impl IdmClient { } }); Ok(IdmClient { + channel, next_request_id: Arc::new(Mutex::new(0)), event_receiver_sender: event_sender.clone(), request_backend_sender, @@ -150,7 +155,7 @@ impl IdmClient { }) } - pub async fn open>(path: P) -> Result { + pub async fn open>(channel: u64, path: P) -> Result { let read_file = File::options() .read(true) .write(false) @@ -164,39 +169,48 @@ impl IdmClient { .open(path) .await?; let backend = IdmFileBackend::new(read_file, write_file).await?; - IdmClient::new(Box::new(backend) as Box).await + IdmClient::new(channel, Box::new(backend) as Box).await } - pub async fn emit(&self, event: IdmEvent) -> Result<()> { + pub async fn emit(&self, event: T) -> Result<()> { + let id = { + let mut guard = self.next_request_id.lock().await; + let req = *guard; + *guard = req.wrapping_add(1); + req + }; self.tx_sender - .send(IdmPacket { - content: Some(Content::Event(event)), + .send(IdmTransportPacket { + id, + form: IdmTransportPacketForm::Event.into(), + channel: self.channel, + data: event.encode()?, }) .await?; Ok(()) } - pub async fn requests(&self) -> Result> { + pub async fn requests(&self) -> Result> { Ok(self.request_backend_sender.subscribe()) } - pub async fn respond(&self, id: u64, response: Response) -> Result<()> { - let packet = IdmPacket { - content: Some(Content::Response(IdmResponse { - id, - response: Some(response), - })), + pub async fn respond(&self, id: u64, response: T) -> Result<()> { + let packet = IdmTransportPacket { + id, + form: IdmTransportPacketForm::Response.into(), + channel: self.channel, + data: response.encode()?, }; self.tx_sender.send(packet).await?; Ok(()) } - pub async fn subscribe(&self) -> Result> { + pub async fn subscribe(&self) -> Result> { Ok(self.event_receiver_sender.subscribe()) } - pub async fn send(&self, request: Request) -> Result { - let (sender, receiver) = oneshot::channel::(); + pub async fn send(&self, request: R) -> Result { + let (sender, receiver) = oneshot::channel::(); let req = { let mut guard = self.next_request_id.lock().await; let req = *guard; @@ -217,49 +231,52 @@ impl IdmClient { }); }); self.tx_sender - .send(IdmPacket { - content: Some(Content::Request(IdmRequest { - id: req, - request: Some(request), - })), + .send(IdmTransportPacket { + id: req, + channel: self.channel, + form: IdmTransportPacketForm::Request.into(), + data: request.encode()?, }) .await?; let response = timeout(Duration::from_secs(IDM_REQUEST_TIMEOUT_SECS), receiver).await??; success.store(true, Ordering::Release); - if let Some(response) = response.response { - Ok(response) - } else { - Err(anyhow!("response did not contain any content")) - } + Ok(response) } async fn process( mut backend: Box, - event_sender: broadcast::Sender, - requests: RequestMap, - request_backend_sender: broadcast::Sender, - _event_receiver: broadcast::Receiver, - mut receiver: Receiver, + event_sender: broadcast::Sender, + requests: RequestMap, + request_backend_sender: broadcast::Sender<(u64, R)>, + _event_receiver: broadcast::Receiver, + mut receiver: Receiver, ) -> Result<()> { loop { select! { x = backend.recv() => match x { Ok(packet) => { - match packet.content { - Some(Content::Event(event)) => { - let _ = event_sender.send(event); + match packet.form() { + IdmTransportPacketForm::Event => { + if let Ok(event) = E::decode(&packet.data) { + let _ = event_sender.send(event); + } }, - Some(Content::Request(request)) => { - let _ = request_backend_sender.send(request); + IdmTransportPacketForm::Request => { + if let Ok(request) = R::decode(&packet.data) { + let _ = request_backend_sender.send((packet.id, request)); + } }, - Some(Content::Response(response)) => { + IdmTransportPacketForm::Response => { let mut requests = requests.lock().await; - if let Some(sender) = requests.remove(&response.id) { + if let Some(sender) = requests.remove(&packet.id) { drop(requests); - let _ = sender.send(response); + + if let Ok(response) = R::Response::decode(&packet.data) { + let _ = sender.send(response); + } } }, diff --git a/crates/krata/src/bus/idm.rs b/crates/krata/src/idm/internal.rs similarity index 58% rename from crates/krata/src/bus/idm.rs rename to crates/krata/src/idm/internal.rs index 9b02ab9..a6ef6ee 100644 --- a/crates/krata/src/bus/idm.rs +++ b/crates/krata/src/idm/internal.rs @@ -1,26 +1,66 @@ +use anyhow::Result; +use prost::Message; use prost_types::{ListValue, Value}; -include!(concat!(env!("OUT_DIR"), "/krata.bus.idm.rs")); +use super::serialize::{IdmRequest, IdmSerializable}; + +include!(concat!(env!("OUT_DIR"), "/krata.idm.internal.rs")); + +pub const INTERNAL_IDM_CHANNEL: u64 = 0; + +impl IdmSerializable for Event { + fn encode(&self) -> Result> { + Ok(self.encode_to_vec()) + } + + fn decode(bytes: &[u8]) -> Result { + Ok(::decode(bytes)?) + } +} + +impl IdmSerializable for Request { + fn encode(&self) -> Result> { + Ok(self.encode_to_vec()) + } + + fn decode(bytes: &[u8]) -> Result { + Ok(::decode(bytes)?) + } +} + +impl IdmRequest for Request { + type Response = Response; +} + +impl IdmSerializable for Response { + fn encode(&self) -> Result> { + Ok(self.encode_to_vec()) + } + + fn decode(bytes: &[u8]) -> Result { + Ok(::decode(bytes)?) + } +} pub trait AsIdmMetricValue { fn as_metric_value(&self) -> Value; } -impl IdmMetricNode { - pub fn structural>(name: N, children: Vec) -> IdmMetricNode { - IdmMetricNode { +impl MetricNode { + pub fn structural>(name: N, children: Vec) -> MetricNode { + MetricNode { name: name.as_ref().to_string(), value: None, - format: IdmMetricFormat::Unknown.into(), + format: MetricFormat::Unknown.into(), children, } } - pub fn raw_value, V: AsIdmMetricValue>(name: N, value: V) -> IdmMetricNode { - IdmMetricNode { + pub fn raw_value, V: AsIdmMetricValue>(name: N, value: V) -> MetricNode { + MetricNode { name: name.as_ref().to_string(), value: Some(value.as_metric_value()), - format: IdmMetricFormat::Unknown.into(), + format: MetricFormat::Unknown.into(), children: vec![], } } @@ -28,9 +68,9 @@ impl IdmMetricNode { pub fn value, V: AsIdmMetricValue>( name: N, value: V, - format: IdmMetricFormat, - ) -> IdmMetricNode { - IdmMetricNode { + format: MetricFormat, + ) -> MetricNode { + MetricNode { name: name.as_ref().to_string(), value: Some(value.as_metric_value()), format: format.into(), diff --git a/crates/krata/src/idm/mod.rs b/crates/krata/src/idm/mod.rs index 2524a26..b02969e 100644 --- a/crates/krata/src/idm/mod.rs +++ b/crates/krata/src/idm/mod.rs @@ -1,3 +1,5 @@ #[cfg(unix)] pub mod client; -pub use crate::bus::idm as protocol; +pub mod internal; +pub mod serialize; +pub mod transport; diff --git a/crates/krata/src/idm/serialize.rs b/crates/krata/src/idm/serialize.rs new file mode 100644 index 0000000..519527c --- /dev/null +++ b/crates/krata/src/idm/serialize.rs @@ -0,0 +1,10 @@ +use anyhow::Result; + +pub trait IdmSerializable: Sized + Clone + Send + Sync + 'static { + fn decode(bytes: &[u8]) -> Result; + fn encode(&self) -> Result>; +} + +pub trait IdmRequest: IdmSerializable { + type Response: IdmSerializable; +} diff --git a/crates/krata/src/idm/transport.rs b/crates/krata/src/idm/transport.rs new file mode 100644 index 0000000..65b8c2d --- /dev/null +++ b/crates/krata/src/idm/transport.rs @@ -0,0 +1 @@ +include!(concat!(env!("OUT_DIR"), "/krata.idm.transport.rs")); diff --git a/crates/krata/src/lib.rs b/crates/krata/src/lib.rs index 7b5bb77..fc97f33 100644 --- a/crates/krata/src/lib.rs +++ b/crates/krata/src/lib.rs @@ -1,7 +1,6 @@ use once_cell::sync::Lazy; use prost_reflect::DescriptorPool; -pub mod bus; pub mod v1; pub mod client;