From 5e16f3149f3dd8bf1e4a3d342150963aaabef2f9 Mon Sep 17 00:00:00 2001 From: Alex Zenla Date: Fri, 12 Apr 2024 00:34:46 -0700 Subject: [PATCH] feat: guest metrics support (#46) * feat: initial support for idm send in daemon * feat: implement IdmClient backend support * feat: daemon idm now uses IdmClient * fix: implement channel destruction propagation * feat: implement request response idm system * feat: implement metrics support * proto: move metrics into GuestMetrics for reusability * fix: log level of guest agent was trace * feat: metrics tree with process information --- Cargo.lock | 126 ++++++++- Cargo.toml | 6 + crates/ctl/Cargo.toml | 4 + crates/ctl/src/cli/metrics.rs | 83 ++++++ crates/ctl/src/cli/mod.rs | 8 +- crates/ctl/src/format.rs | 126 +++++++-- crates/daemon/src/console.rs | 28 +- crates/daemon/src/control.rs | 80 +++++- crates/daemon/src/event.rs | 51 ++-- crates/daemon/src/idm.rs | 208 ++++++++++----- crates/daemon/src/lib.rs | 6 +- crates/daemon/src/metrics.rs | 27 ++ crates/guest/Cargo.toml | 2 +- crates/guest/bin/init.rs | 2 + crates/guest/src/background.rs | 69 ++++- crates/guest/src/init.rs | 37 --- crates/guest/src/lib.rs | 1 + crates/guest/src/metrics.rs | 121 +++++++++ crates/krata/Cargo.toml | 3 + crates/krata/proto/krata/internal/idm.proto | 54 +++- crates/krata/proto/krata/v1/common.proto | 16 ++ crates/krata/proto/krata/v1/control.proto | 10 + crates/krata/src/idm/client.rs | 281 ++++++++++++++++---- crates/krata/src/idm/protocol.rs | 88 ++++++ crates/runtime/src/channel.rs | 21 +- crates/runtime/src/console.rs | 18 -- crates/runtime/src/lib.rs | 13 - 27 files changed, 1211 insertions(+), 278 deletions(-) create mode 100644 crates/ctl/src/cli/metrics.rs create mode 100644 crates/daemon/src/metrics.rs create mode 100644 crates/guest/src/metrics.rs delete mode 100644 crates/runtime/src/console.rs diff --git a/Cargo.lock b/Cargo.lock index 65b5d85..287dffd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -421,6 +421,12 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "core-foundation-sys" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" + [[package]] name = "cpufeatures" version = "0.2.12" @@ -439,6 +445,31 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-deque" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" + [[package]] name = "crossterm" version = "0.27.0" @@ -710,6 +741,17 @@ dependencies = [ "arrayvec", ] +[[package]] +name = "fancy-duration" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3ae60718ae501dca9d27fd0e322683c86a95a1a01fac1807aa2f9b035cc0882" +dependencies = [ + "anyhow", + "lazy_static", + "regex", +] + [[package]] name = "fastrand" version = "2.0.2" @@ -1041,6 +1083,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "human_bytes" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91f255a4535024abf7640cb288260811fc14794f62b063652ed349f9a6c2348e" + [[package]] name = "humantime" version = "2.1.0" @@ -1228,6 +1276,7 @@ name = "krata" version = "0.0.8" dependencies = [ "anyhow", + "async-trait", "bytes", "libc", "log", @@ -1237,6 +1286,8 @@ dependencies = [ "prost-build", "prost-reflect", "prost-reflect-build", + "prost-types", + "scopeguard", "serde", "tokio", "tokio-stream", @@ -1268,11 +1319,15 @@ dependencies = [ "crossterm", "ctrlc", "env_logger", + "fancy-duration", + "human_bytes", "krata", "log", "prost-reflect", + "prost-types", "serde_json", "serde_yaml", + "termtree", "tokio", "tokio-stream", "tonic", @@ -1323,8 +1378,8 @@ dependencies = [ "serde", "serde_json", "sys-mount", + "sysinfo", "tokio", - "walkdir", ] [[package]] @@ -1718,6 +1773,15 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "ntapi" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4" +dependencies = [ + "winapi", +] + [[package]] name = "num-traits" version = "0.2.18" @@ -2074,6 +2138,26 @@ dependencies = [ "getrandom", ] +[[package]] +name = "rayon" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "redb" version = "2.0.0" @@ -2571,6 +2655,21 @@ dependencies = [ "tracing", ] +[[package]] +name = "sysinfo" +version = "0.30.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9a84fe4cfc513b41cb2596b624e561ec9e7e1c4b46328e496ed56a53514ef2a" +dependencies = [ + "cfg-if", + "core-foundation-sys", + "libc", + "ntapi", + "once_cell", + "rayon", + "windows", +] + [[package]] name = "tap" version = "1.0.1" @@ -2589,6 +2688,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "termtree" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" + [[package]] name = "thiserror" version = "1.0.58" @@ -3071,6 +3176,25 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" +dependencies = [ + "windows-core", + "windows-targets 0.52.4", +] + +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets 0.52.4", +] + [[package]] name = "windows-sys" version = "0.48.0" diff --git a/Cargo.toml b/Cargo.toml index e2aeb3a..59c2c9b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,8 +38,10 @@ ctrlc = "3.4.4" elf = "0.7.4" env_logger = "0.11.0" etherparse = "0.14.3" +fancy-duration = "0.9.2" flate2 = "1.0" futures = "0.3.30" +human_bytes = "0.4" ipnetwork = "0.20.0" libc = "0.2" log = "0.4.20" @@ -55,15 +57,19 @@ path-clean = "1.0.1" prost = "0.12.4" prost-build = "0.12.4" prost-reflect-build = "0.13.0" +prost-types = "0.12.4" rand = "0.8.5" redb = "2.0.0" rtnetlink = "0.14.1" +scopeguard = "1.2.0" serde_json = "1.0.113" serde_yaml = "0.9" sha256 = "1.5.0" signal-hook = "0.3.17" slice-copy = "0.3.0" smoltcp = "0.11.0" +sysinfo = "0.30.9" +termtree = "0.4.1" thiserror = "1.0" tokio-tun = "0.11.4" tonic-build = "0.11.0" diff --git a/crates/ctl/Cargo.toml b/crates/ctl/Cargo.toml index 47a324a..e93d255 100644 --- a/crates/ctl/Cargo.toml +++ b/crates/ctl/Cargo.toml @@ -16,11 +16,15 @@ comfy-table = { workspace = true } crossterm = { workspace = true } ctrlc = { workspace = true, features = ["termination"] } env_logger = { workspace = true } +fancy-duration = { workspace = true } +human_bytes = { workspace = true } krata = { path = "../krata", version = "^0.0.8" } log = { workspace = true } prost-reflect = { workspace = true, features = ["serde"] } +prost-types = { workspace = true } serde_json = { workspace = true } serde_yaml = { workspace = true } +termtree = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true } tonic = { workspace = true } diff --git a/crates/ctl/src/cli/metrics.rs b/crates/ctl/src/cli/metrics.rs new file mode 100644 index 0000000..17def04 --- /dev/null +++ b/crates/ctl/src/cli/metrics.rs @@ -0,0 +1,83 @@ +use anyhow::Result; +use clap::{Parser, ValueEnum}; +use krata::{ + events::EventStream, + v1::{ + common::GuestMetricNode, + control::{control_service_client::ControlServiceClient, ReadGuestMetricsRequest}, + }, +}; + +use tonic::transport::Channel; + +use crate::format::{kv2line, metrics_flat, metrics_tree, proto2dynamic}; + +use super::resolve_guest; + +#[derive(ValueEnum, Clone, Debug, PartialEq, Eq)] +enum MetricsFormat { + Tree, + Json, + JsonPretty, + Yaml, + KeyValue, +} + +#[derive(Parser)] +#[command(about = "Read metrics from the guest")] +pub struct MetricsCommand { + #[arg(short, long, default_value = "tree", help = "Output format")] + format: MetricsFormat, + #[arg(help = "Guest to read metrics for, either the name or the uuid")] + guest: String, +} + +impl MetricsCommand { + pub async fn run( + self, + mut client: ControlServiceClient, + _events: EventStream, + ) -> Result<()> { + let guest_id: String = resolve_guest(&mut client, &self.guest).await?; + let root = client + .read_guest_metrics(ReadGuestMetricsRequest { guest_id }) + .await? + .into_inner() + .root + .unwrap_or_default(); + match self.format { + MetricsFormat::Tree => { + self.print_metrics_tree(root)?; + } + + MetricsFormat::Json | MetricsFormat::JsonPretty | MetricsFormat::Yaml => { + let value = serde_json::to_value(proto2dynamic(root)?)?; + let encoded = if self.format == MetricsFormat::JsonPretty { + serde_json::to_string_pretty(&value)? + } else if self.format == MetricsFormat::Yaml { + serde_yaml::to_string(&value)? + } else { + serde_json::to_string(&value)? + }; + println!("{}", encoded.trim()); + } + + MetricsFormat::KeyValue => { + self.print_key_value(root)?; + } + } + + Ok(()) + } + + fn print_metrics_tree(&self, root: GuestMetricNode) -> Result<()> { + print!("{}", metrics_tree(root)); + Ok(()) + } + + fn print_key_value(&self, metrics: GuestMetricNode) -> Result<()> { + let kvs = metrics_flat(metrics); + println!("{}", kv2line(kvs)); + Ok(()) + } +} diff --git a/crates/ctl/src/cli/mod.rs b/crates/ctl/src/cli/mod.rs index 0b9f220..654f959 100644 --- a/crates/ctl/src/cli/mod.rs +++ b/crates/ctl/src/cli/mod.rs @@ -3,6 +3,7 @@ pub mod destroy; pub mod launch; pub mod list; pub mod logs; +pub mod metrics; pub mod resolve; pub mod watch; @@ -17,7 +18,7 @@ use tonic::{transport::Channel, Request}; use self::{ attach::AttachCommand, destroy::DestroyCommand, launch::LauchCommand, list::ListCommand, - logs::LogsCommand, resolve::ResolveCommand, watch::WatchCommand, + logs::LogsCommand, metrics::MetricsCommand, resolve::ResolveCommand, watch::WatchCommand, }; #[derive(Parser)] @@ -47,6 +48,7 @@ pub enum Commands { Logs(LogsCommand), Watch(WatchCommand), Resolve(ResolveCommand), + Metrics(MetricsCommand), } impl ControlCommand { @@ -82,6 +84,10 @@ impl ControlCommand { Commands::Resolve(resolve) => { resolve.run(client).await?; } + + Commands::Metrics(metrics) => { + metrics.run(client, events).await?; + } } Ok(()) } diff --git a/crates/ctl/src/format.rs b/crates/ctl/src/format.rs index 79dee5d..0fa0851 100644 --- a/crates/ctl/src/format.rs +++ b/crates/ctl/src/format.rs @@ -1,8 +1,12 @@ -use std::collections::HashMap; +use std::{collections::HashMap, time::Duration}; use anyhow::Result; -use krata::v1::common::{Guest, GuestStatus}; -use prost_reflect::{DynamicMessage, ReflectMessage, Value}; +use fancy_duration::FancyDuration; +use human_bytes::human_bytes; +use krata::v1::common::{Guest, GuestMetricFormat, GuestMetricNode, GuestStatus}; +use prost_reflect::{DynamicMessage, FieldDescriptor, ReflectMessage, Value as ReflectValue}; +use prost_types::Value; +use termtree::Tree; pub fn proto2dynamic(proto: impl ReflectMessage) -> Result { Ok(DynamicMessage::decode( @@ -15,38 +19,56 @@ pub fn proto2kv(proto: impl ReflectMessage) -> Result> { let message = proto2dynamic(proto)?; let mut map = HashMap::new(); - fn crawl(prefix: &str, map: &mut HashMap, message: &DynamicMessage) { - for (field, value) in message.fields() { - let path = if prefix.is_empty() { - field.name().to_string() - } else { - format!("{}.{}", prefix, field.name()) - }; - match value { - Value::Message(child) => { - crawl(&path, map, child); + fn crawl( + prefix: String, + field: Option<&FieldDescriptor>, + map: &mut HashMap, + value: &ReflectValue, + ) { + match value { + ReflectValue::Message(child) => { + for (field, field_value) in child.fields() { + let path = if prefix.is_empty() { + field.json_name().to_string() + } else { + format!("{}.{}", prefix, field.json_name()) + }; + crawl(path, Some(&field), map, field_value); } + } - Value::EnumNumber(number) => { - if let Some(e) = field.kind().as_enum() { + ReflectValue::EnumNumber(number) => { + if let Some(kind) = field.map(|x| x.kind()) { + if let Some(e) = kind.as_enum() { if let Some(value) = e.get_value(*number) { - map.insert(path, value.name().to_string()); + map.insert(prefix, value.name().to_string()); } } } + } - Value::String(value) => { - map.insert(path, value.clone()); - } + ReflectValue::String(value) => { + map.insert(prefix.to_string(), value.clone()); + } - _ => { - map.insert(path, value.to_string()); + ReflectValue::List(value) => { + for (x, value) in value.iter().enumerate() { + crawl(format!("{}.{}", prefix, x), field, map, value); } } + + _ => { + map.insert(prefix.to_string(), value.to_string()); + } } } - crawl("", &mut map, &message); + crawl( + "".to_string(), + None, + &mut map, + &ReflectValue::Message(message), + ); Ok(map) } @@ -85,3 +107,63 @@ pub fn guest_simple_line(guest: &Guest) -> String { let ipv6 = network.map(|x| x.guest_ipv6.as_str()).unwrap_or(""); format!("{}\t{}\t{}\t{}\t{}", guest.id, state, name, ipv4, ipv6) } + +fn metrics_value_string(value: Value) -> String { + proto2dynamic(value) + .map(|x| serde_json::to_string(&x).ok()) + .ok() + .flatten() + .unwrap_or_default() +} + +fn metrics_value_numeric(value: Value) -> f64 { + let string = metrics_value_string(value); + string.parse::().ok().unwrap_or(f64::NAN) +} + +fn metrics_value_pretty(value: Value, format: GuestMetricFormat) -> String { + match format { + GuestMetricFormat::Bytes => human_bytes(metrics_value_numeric(value)), + GuestMetricFormat::Integer => (metrics_value_numeric(value) as u64).to_string(), + GuestMetricFormat::DurationSeconds => { + FancyDuration(Duration::from_secs_f64(metrics_value_numeric(value))).to_string() + } + _ => metrics_value_string(value), + } +} + +fn metrics_flat_internal(prefix: &str, node: GuestMetricNode, map: &mut HashMap) { + if let Some(value) = node.value { + map.insert(prefix.to_string(), metrics_value_string(value)); + } + + for child in node.children { + let path = if prefix.is_empty() { + child.name.to_string() + } else { + format!("{}.{}", prefix, child.name) + }; + metrics_flat_internal(&path, child, map); + } +} + +pub fn metrics_flat(root: GuestMetricNode) -> HashMap { + let mut map = HashMap::new(); + metrics_flat_internal("", root, &mut map); + map +} + +pub fn metrics_tree(node: GuestMetricNode) -> Tree { + let mut name = node.name.to_string(); + let format = node.format(); + if let Some(value) = node.value { + let value_string = metrics_value_pretty(value, format); + name.push_str(&format!(": {}", value_string)); + } + + let mut tree = Tree::new(name); + for child in node.children { + tree.push(metrics_tree(child)); + } + tree +} diff --git a/crates/daemon/src/console.rs b/crates/daemon/src/console.rs index a6e57bf..38e4792 100644 --- a/crates/daemon/src/console.rs +++ b/crates/daemon/src/console.rs @@ -79,7 +79,7 @@ impl Drop for DaemonConsoleHandle { pub struct DaemonConsole { listeners: ListenerMap, buffers: BufferMap, - receiver: Receiver<(u32, Vec)>, + receiver: Receiver<(u32, Option>)>, sender: Sender<(u32, Vec)>, task: JoinHandle<()>, } @@ -124,16 +124,22 @@ impl DaemonConsole { }; let mut buffers = self.buffers.lock().await; - let buffer = buffers - .entry(domid) - .or_insert_with_key(|_| RawConsoleBuffer::boxed()); - buffer.extend_from_slice(&data); - drop(buffers); - let mut listeners = self.listeners.lock().await; - if let Some(senders) = listeners.get_mut(&domid) { - senders.retain(|sender| { - !matches!(sender.try_send(data.to_vec()), Err(TrySendError::Closed(_))) - }); + if let Some(data) = data { + let buffer = buffers + .entry(domid) + .or_insert_with_key(|_| RawConsoleBuffer::boxed()); + buffer.extend_from_slice(&data); + drop(buffers); + let mut listeners = self.listeners.lock().await; + if let Some(senders) = listeners.get_mut(&domid) { + senders.retain(|sender| { + !matches!(sender.try_send(data.to_vec()), Err(TrySendError::Closed(_))) + }); + } + } else { + buffers.remove(&domid); + let mut listeners = self.listeners.lock().await; + listeners.remove(&domid); } } Ok(()) diff --git a/crates/daemon/src/control.rs b/crates/daemon/src/control.rs index 3883f83..68d1b0e 100644 --- a/crates/daemon/src/control.rs +++ b/crates/daemon/src/control.rs @@ -2,13 +2,19 @@ use std::{pin::Pin, str::FromStr}; use async_stream::try_stream; use futures::Stream; -use krata::v1::{ - common::{Guest, GuestState, GuestStatus}, - control::{ - control_service_server::ControlService, ConsoleDataReply, ConsoleDataRequest, - CreateGuestReply, CreateGuestRequest, DestroyGuestReply, DestroyGuestRequest, - ListGuestsReply, ListGuestsRequest, ResolveGuestReply, ResolveGuestRequest, - WatchEventsReply, WatchEventsRequest, +use krata::{ + idm::protocol::{ + idm_request::Request as IdmRequestType, idm_response::Response as IdmResponseType, + IdmMetricsRequest, + }, + v1::{ + common::{Guest, GuestState, GuestStatus}, + control::{ + control_service_server::ControlService, ConsoleDataReply, ConsoleDataRequest, + CreateGuestReply, CreateGuestRequest, DestroyGuestReply, DestroyGuestRequest, + ListGuestsReply, ListGuestsRequest, ReadGuestMetricsReply, ReadGuestMetricsRequest, + ResolveGuestReply, ResolveGuestRequest, WatchEventsReply, WatchEventsRequest, + }, }, }; use tokio::{ @@ -19,7 +25,10 @@ use tokio_stream::StreamExt; use tonic::{Request, Response, Status, Streaming}; use uuid::Uuid; -use crate::{console::DaemonConsoleHandle, db::GuestStore, event::DaemonEventContext}; +use crate::{ + console::DaemonConsoleHandle, db::GuestStore, event::DaemonEventContext, idm::DaemonIdmHandle, + metrics::idm_metric_to_api, +}; pub struct ApiError { message: String, @@ -43,6 +52,7 @@ impl From for Status { pub struct RuntimeControlService { events: DaemonEventContext, console: DaemonConsoleHandle, + idm: DaemonIdmHandle, guests: GuestStore, guest_reconciler_notify: Sender, } @@ -51,12 +61,14 @@ impl RuntimeControlService { pub fn new( events: DaemonEventContext, console: DaemonConsoleHandle, + idm: DaemonIdmHandle, guests: GuestStore, guest_reconciler_notify: Sender, ) -> Self { Self { events, console, + idm, guests, guest_reconciler_notify, } @@ -269,6 +281,58 @@ impl ControlService for RuntimeControlService { Ok(Response::new(Box::pin(output) as Self::ConsoleDataStream)) } + async fn read_guest_metrics( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + let uuid = Uuid::from_str(&request.guest_id).map_err(|error| ApiError { + message: error.to_string(), + })?; + let guest = self + .guests + .read(uuid) + .await + .map_err(|error| ApiError { + message: error.to_string(), + })? + .ok_or_else(|| ApiError { + message: "guest did not exist in the database".to_string(), + })?; + + let Some(ref state) = guest.state else { + return Err(ApiError { + message: "guest did not have state".to_string(), + } + .into()); + }; + + let domid = state.domid; + if domid == 0 { + return Err(ApiError { + message: "invalid domid on the guest".to_string(), + } + .into()); + } + + let client = self.idm.client(domid).await.map_err(|error| ApiError { + message: error.to_string(), + })?; + + let response = client + .send(IdmRequestType::Metrics(IdmMetricsRequest {})) + .await + .map_err(|error| ApiError { + message: error.to_string(), + })?; + + let mut reply = ReadGuestMetricsReply::default(); + if let IdmResponseType::Metrics(metrics) = response { + reply.root = metrics.root.map(idm_metric_to_api); + } + Ok(Response::new(reply)) + } + async fn watch_events( &self, request: Request, diff --git a/crates/daemon/src/event.rs b/crates/daemon/src/event.rs index 903db8b..55d9872 100644 --- a/crates/daemon/src/event.rs +++ b/crates/daemon/src/event.rs @@ -6,10 +6,10 @@ use std::{ use anyhow::Result; use krata::{ - idm::protocol::{idm_event::Event, IdmPacket}, + idm::protocol::{idm_event::Event, IdmEvent}, v1::common::{GuestExitInfo, GuestState, GuestStatus}, }; -use log::error; +use log::{error, warn}; use tokio::{ select, sync::{ @@ -21,15 +21,12 @@ use tokio::{ }; use uuid::Uuid; -use crate::{ - db::GuestStore, - idm::{DaemonIdmHandle, DaemonIdmSubscribeHandle}, -}; +use crate::{db::GuestStore, idm::DaemonIdmHandle}; pub type DaemonEvent = krata::v1::control::watch_events_reply::Event; const EVENT_CHANNEL_QUEUE_LEN: usize = 1000; -const IDM_CHANNEL_QUEUE_LEN: usize = 1000; +const IDM_EVENT_CHANNEL_QUEUE_LEN: usize = 1000; #[derive(Clone)] pub struct DaemonEventContext { @@ -52,9 +49,9 @@ pub struct DaemonEventGenerator { guest_reconciler_notify: Sender, feed: broadcast::Receiver, idm: DaemonIdmHandle, - idms: HashMap, - idm_sender: Sender<(u32, IdmPacket)>, - idm_receiver: Receiver<(u32, IdmPacket)>, + idms: HashMap)>, + idm_sender: Sender<(u32, IdmEvent)>, + idm_receiver: Receiver<(u32, IdmEvent)>, _event_sender: broadcast::Sender, } @@ -65,7 +62,7 @@ impl DaemonEventGenerator { idm: DaemonIdmHandle, ) -> Result<(DaemonEventContext, DaemonEventGenerator)> { let (sender, _) = broadcast::channel(EVENT_CHANNEL_QUEUE_LEN); - let (idm_sender, idm_receiver) = channel(IDM_CHANNEL_QUEUE_LEN); + let (idm_sender, idm_receiver) = channel(IDM_EVENT_CHANNEL_QUEUE_LEN); let generator = DaemonEventGenerator { guests, guest_reconciler_notify, @@ -97,15 +94,27 @@ impl DaemonEventGenerator { match status { GuestStatus::Started => { if let Entry::Vacant(e) = self.idms.entry(domid) { - let subscribe = - self.idm.subscribe(domid, self.idm_sender.clone()).await?; - e.insert((id, subscribe)); + let client = self.idm.client(domid).await?; + let mut receiver = client.subscribe().await?; + let sender = self.idm_sender.clone(); + let task = tokio::task::spawn(async move { + loop { + let Ok(event) = receiver.recv().await else { + break; + }; + + if let Err(error) = sender.send((domid, event)).await { + warn!("unable to deliver idm event: {}", error); + } + } + }); + e.insert((id, task)); } } GuestStatus::Destroyed => { if let Some((_, handle)) = self.idms.remove(&domid) { - handle.unsubscribe().await?; + handle.abort(); } } @@ -116,11 +125,11 @@ impl DaemonEventGenerator { Ok(()) } - async fn handle_idm_packet(&mut self, id: Uuid, packet: IdmPacket) -> Result<()> { - if let Some(Event::Exit(exit)) = packet.event.and_then(|x| x.event) { - self.handle_exit_code(id, exit.code).await?; + async fn handle_idm_event(&mut self, id: Uuid, event: IdmEvent) -> Result<()> { + match event.event { + Some(Event::Exit(exit)) => self.handle_exit_code(id, exit.code).await, + None => Ok(()), } - Ok(()) } async fn handle_exit_code(&mut self, id: Uuid, code: i32) -> Result<()> { @@ -142,9 +151,9 @@ impl DaemonEventGenerator { async fn evaluate(&mut self) -> Result<()> { select! { x = self.idm_receiver.recv() => match x { - Some((domid, packet)) => { + Some((domid, event)) => { if let Some((id, _)) = self.idms.get(&domid) { - self.handle_idm_packet(*id, packet).await?; + self.handle_idm_event(*id, event).await?; } Ok(()) }, diff --git a/crates/daemon/src/idm.rs b/crates/daemon/src/idm.rs index abca368..cc43a9f 100644 --- a/crates/daemon/src/idm.rs +++ b/crates/daemon/src/idm.rs @@ -1,53 +1,40 @@ -use std::{collections::HashMap, sync::Arc}; +use std::{ + collections::{hash_map::Entry, HashMap}, + sync::Arc, +}; -use anyhow::Result; +use anyhow::{anyhow, Result}; use bytes::{Buf, BytesMut}; -use krata::idm::protocol::IdmPacket; +use krata::idm::{ + client::{IdmBackend, IdmClient}, + protocol::IdmPacket, +}; use kratart::channel::ChannelService; use log::{error, warn}; use prost::Message; use tokio::{ + select, sync::{ - mpsc::{Receiver, Sender}, + mpsc::{channel, Receiver, Sender}, Mutex, }, task::JoinHandle, }; -type ListenerMap = Arc>>>; +type BackendFeedMap = Arc>>>; +type ClientMap = Arc>>; #[derive(Clone)] pub struct DaemonIdmHandle { - listeners: ListenerMap, + clients: ClientMap, + feeds: BackendFeedMap, + tx_sender: Sender<(u32, IdmPacket)>, task: Arc>, } -#[derive(Clone)] -pub struct DaemonIdmSubscribeHandle { - domid: u32, - listeners: ListenerMap, -} - -impl DaemonIdmSubscribeHandle { - pub async fn unsubscribe(&self) -> Result<()> { - let mut guard = self.listeners.lock().await; - let _ = guard.remove(&self.domid); - Ok(()) - } -} - impl DaemonIdmHandle { - pub async fn subscribe( - &self, - domid: u32, - sender: Sender<(u32, IdmPacket)>, - ) -> Result { - let mut guard = self.listeners.lock().await; - guard.insert(domid, sender); - Ok(DaemonIdmSubscribeHandle { - domid, - listeners: self.listeners.clone(), - }) + pub async fn client(&self, domid: u32) -> Result { + client_or_create(domid, &self.tx_sender, &self.clients, &self.feeds).await } } @@ -60,25 +47,38 @@ impl Drop for DaemonIdmHandle { } pub struct DaemonIdm { - listeners: ListenerMap, - receiver: Receiver<(u32, Vec)>, + clients: ClientMap, + feeds: BackendFeedMap, + tx_sender: Sender<(u32, IdmPacket)>, + tx_raw_sender: Sender<(u32, Vec)>, + tx_receiver: Receiver<(u32, IdmPacket)>, + rx_receiver: Receiver<(u32, Option>)>, task: JoinHandle<()>, } impl DaemonIdm { pub async fn new() -> Result { - let (service, _, receiver) = ChannelService::new("krata-channel".to_string(), None).await?; + let (service, tx_raw_sender, rx_receiver) = + ChannelService::new("krata-channel".to_string(), None).await?; + let (tx_sender, tx_receiver) = channel(100); let task = service.launch().await?; - let listeners = Arc::new(Mutex::new(HashMap::new())); + let clients = Arc::new(Mutex::new(HashMap::new())); + let feeds = Arc::new(Mutex::new(HashMap::new())); Ok(DaemonIdm { - receiver, + rx_receiver, + tx_receiver, + tx_sender, + tx_raw_sender, task, - listeners, + clients, + feeds, }) } pub async fn launch(mut self) -> Result { - let listeners = self.listeners.clone(); + let clients = self.clients.clone(); + let feeds = self.feeds.clone(); + let tx_sender = self.tx_sender.clone(); let task = tokio::task::spawn(async move { let mut buffers: HashMap = HashMap::new(); if let Err(error) = self.process(&mut buffers).await { @@ -86,43 +86,74 @@ impl DaemonIdm { } }); Ok(DaemonIdmHandle { - listeners, + clients, + feeds, + tx_sender, task: Arc::new(task), }) } async fn process(&mut self, buffers: &mut HashMap) -> Result<()> { loop { - let Some((domid, data)) = self.receiver.recv().await else { - break; - }; + select! { + x = self.rx_receiver.recv() => match x { + Some((domid, data)) => { + if let Some(data) = data { + let buffer = buffers.entry(domid).or_insert_with_key(|_| BytesMut::new()); + buffer.extend_from_slice(&data); + if buffer.len() < 4 { + continue; + } + let size = (buffer[0] as u32 | (buffer[1] as u32) << 8 | (buffer[2] as u32) << 16 | (buffer[3] as u32) << 24) as usize; + let needed = size + 4; + if buffer.len() < needed { + continue; + } + let mut packet = buffer.split_to(needed); + packet.advance(4); + match IdmPacket::decode(packet) { + Ok(packet) => { + let _ = client_or_create(domid, &self.tx_sender, &self.clients, &self.feeds).await?; + let guard = self.feeds.lock().await; + if let Some(feed) = guard.get(&domid) { + let _ = feed.try_send(packet); + } + } - let buffer = buffers.entry(domid).or_insert_with_key(|_| BytesMut::new()); - buffer.extend_from_slice(&data); - if buffer.len() < 2 { - continue; - } - let size = (buffer[0] as u16 | (buffer[1] as u16) << 8) as usize; - let needed = size + 2; - if buffer.len() < needed { - continue; - } - let mut packet = buffer.split_to(needed); - packet.advance(2); - match IdmPacket::decode(packet) { - Ok(packet) => { - let guard = self.listeners.lock().await; - if let Some(sender) = guard.get(&domid) { - if let Err(error) = sender.try_send((domid, packet)) { - warn!("dropped idm packet from domain {}: {}", domid, error); + Err(packet) => { + warn!("received invalid packet from domain {}: {}", domid, packet); + } + } + } else { + let mut clients = self.clients.lock().await; + let mut feeds = self.feeds.lock().await; + clients.remove(&domid); + feeds.remove(&domid); } + }, + + None => { + break; + } + }, + x = self.tx_receiver.recv() => match x { + Some((domid, packet)) => { + let data = packet.encode_to_vec(); + let mut buffer = vec![0u8; 4]; + let length = data.len() as u32; + buffer[0] = length as u8; + buffer[1] = (length << 8) as u8; + buffer[2] = (length << 16) as u8; + buffer[3] = (length << 24) as u8; + buffer.extend_from_slice(&data); + self.tx_raw_sender.send((domid, buffer)).await?; + }, + + None => { + break; } } - - Err(packet) => { - warn!("received invalid packet from domain {}: {}", domid, packet); - } - } + }; } Ok(()) } @@ -133,3 +164,50 @@ impl Drop for DaemonIdm { self.task.abort(); } } + +async fn client_or_create( + domid: u32, + tx_sender: &Sender<(u32, IdmPacket)>, + clients: &ClientMap, + feeds: &BackendFeedMap, +) -> Result { + let mut clients = clients.lock().await; + let mut feeds = feeds.lock().await; + match clients.entry(domid) { + Entry::Occupied(entry) => Ok(entry.get().clone()), + Entry::Vacant(entry) => { + let (rx_sender, rx_receiver) = channel(100); + feeds.insert(domid, rx_sender); + let backend = IdmDaemonBackend { + domid, + rx_receiver, + tx_sender: tx_sender.clone(), + }; + let client = IdmClient::new(Box::new(backend) as Box).await?; + entry.insert(client.clone()); + Ok(client) + } + } +} + +pub struct IdmDaemonBackend { + domid: u32, + rx_receiver: Receiver, + tx_sender: Sender<(u32, IdmPacket)>, +} + +#[async_trait::async_trait] +impl IdmBackend for IdmDaemonBackend { + async fn recv(&mut self) -> Result { + if let Some(packet) = self.rx_receiver.recv().await { + Ok(packet) + } else { + Err(anyhow!("idm receive channel closed")) + } + } + + async fn send(&mut self, packet: IdmPacket) -> Result<()> { + self.tx_sender.send((self.domid, packet)).await?; + Ok(()) + } +} diff --git a/crates/daemon/src/lib.rs b/crates/daemon/src/lib.rs index ec68c92..1240121 100644 --- a/crates/daemon/src/lib.rs +++ b/crates/daemon/src/lib.rs @@ -24,6 +24,7 @@ pub mod control; pub mod db; pub mod event; pub mod idm; +pub mod metrics; pub mod reconcile; pub struct Daemon { @@ -33,7 +34,7 @@ pub struct Daemon { guest_reconciler_task: JoinHandle<()>, guest_reconciler_notify: Sender, generator_task: JoinHandle<()>, - _idm: DaemonIdmHandle, + idm: DaemonIdmHandle, console: DaemonConsoleHandle, } @@ -69,7 +70,7 @@ impl Daemon { guest_reconciler_task, guest_reconciler_notify, generator_task, - _idm: idm, + idm, console, }) } @@ -78,6 +79,7 @@ impl Daemon { let control_service = RuntimeControlService::new( self.events.clone(), self.console.clone(), + self.idm.clone(), self.guests.clone(), self.guest_reconciler_notify.clone(), ); diff --git a/crates/daemon/src/metrics.rs b/crates/daemon/src/metrics.rs new file mode 100644 index 0000000..a273dd0 --- /dev/null +++ b/crates/daemon/src/metrics.rs @@ -0,0 +1,27 @@ +use krata::{ + idm::protocol::{IdmMetricFormat, IdmMetricNode}, + v1::common::{GuestMetricFormat, GuestMetricNode}, +}; + +fn idm_metric_format_to_api(format: IdmMetricFormat) -> GuestMetricFormat { + match format { + IdmMetricFormat::Unknown => GuestMetricFormat::Unknown, + IdmMetricFormat::Bytes => GuestMetricFormat::Bytes, + IdmMetricFormat::Integer => GuestMetricFormat::Integer, + IdmMetricFormat::DurationSeconds => GuestMetricFormat::DurationSeconds, + } +} + +pub fn idm_metric_to_api(node: IdmMetricNode) -> GuestMetricNode { + let format = node.format(); + GuestMetricNode { + name: node.name, + value: node.value, + format: idm_metric_format_to_api(format).into(), + children: node + .children + .into_iter() + .map(idm_metric_to_api) + .collect::>(), + } +} diff --git a/crates/guest/Cargo.toml b/crates/guest/Cargo.toml index 228164e..e94234c 100644 --- a/crates/guest/Cargo.toml +++ b/crates/guest/Cargo.toml @@ -25,8 +25,8 @@ rtnetlink = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } sys-mount = { workspace = true } +sysinfo = { workspace = true } tokio = { workspace = true } -walkdir = { workspace = true } [lib] name = "krataguest" diff --git a/crates/guest/bin/init.rs b/crates/guest/bin/init.rs index 11d7b6c..62678e2 100644 --- a/crates/guest/bin/init.rs +++ b/crates/guest/bin/init.rs @@ -23,6 +23,8 @@ async fn main() -> Result<()> { if let Err(error) = guest.init().await { error!("failed to initialize guest: {}", error); death(127).await?; + return Ok(()); } + death(1).await?; Ok(()) } diff --git a/crates/guest/src/background.rs b/crates/guest/src/background.rs index 8252a6f..a2c2929 100644 --- a/crates/guest/src/background.rs +++ b/crates/guest/src/background.rs @@ -1,16 +1,20 @@ use crate::{ childwait::{ChildEvent, ChildWait}, death, + metrics::MetricsCollector, }; use anyhow::Result; use cgroups_rs::Cgroup; use krata::idm::{ client::IdmClient, - protocol::{idm_event::Event, IdmEvent, IdmExitEvent, IdmPacket}, + protocol::{ + idm_event::Event, idm_request::Request, idm_response::Response, IdmEvent, IdmExitEvent, + IdmMetricsResponse, IdmPingResponse, IdmRequest, + }, }; -use log::error; +use log::debug; use nix::unistd::Pid; -use tokio::select; +use tokio::{select, sync::broadcast}; pub struct GuestBackground { idm: IdmClient, @@ -30,16 +34,37 @@ impl GuestBackground { } pub async fn run(&mut self) -> Result<()> { + let mut event_subscription = self.idm.subscribe().await?; + let mut requests_subscription = self.idm.requests().await?; loop { select! { - x = self.idm.receiver.recv() => match x { - Some(_packet) => { + x = event_subscription.recv() => match x { + Ok(_event) => { }, - None => { - error!("idm packet channel closed"); + Err(broadcast::error::RecvError::Closed) => { + debug!("idm packet channel closed"); break; + }, + + _ => { + continue; + } + }, + + x = requests_subscription.recv() => match x { + Ok(request) => { + self.handle_idm_request(request).await?; + }, + + Err(broadcast::error::RecvError::Closed) => { + debug!("idm packet channel closed"); + break; + }, + + _ => { + continue; } }, @@ -54,14 +79,34 @@ impl GuestBackground { Ok(()) } + async fn handle_idm_request(&mut self, packet: IdmRequest) -> Result<()> { + let id = packet.id; + + match packet.request { + Some(Request::Ping(_)) => { + self.idm + .respond(id, Response::Ping(IdmPingResponse {})) + .await?; + } + + Some(Request::Metrics(_)) => { + let metrics = MetricsCollector::new()?; + let root = metrics.collect()?; + let response = IdmMetricsResponse { root: Some(root) }; + + self.idm.respond(id, Response::Metrics(response)).await?; + } + + None => {} + } + Ok(()) + } + async fn child_event(&mut self, event: ChildEvent) -> Result<()> { if event.pid == self.child { self.idm - .sender - .send(IdmPacket { - event: Some(IdmEvent { - event: Some(Event::Exit(IdmExitEvent { code: event.status })), - }), + .emit(IdmEvent { + event: Some(Event::Exit(IdmExitEvent { code: event.status })), }) .await?; death(event.status).await?; diff --git a/crates/guest/src/init.rs b/crates/guest/src/init.rs index 09ba711..b2fb6c9 100644 --- a/crates/guest/src/init.rs +++ b/crates/guest/src/init.rs @@ -17,14 +17,12 @@ use std::fs::{File, OpenOptions, Permissions}; use std::io; use std::net::{Ipv4Addr, Ipv6Addr}; use std::os::fd::AsRawFd; -use std::os::linux::fs::MetadataExt; use std::os::unix::ffi::OsStrExt; use std::os::unix::fs::{chroot, PermissionsExt}; use std::path::{Path, PathBuf}; use std::str::FromStr; use sys_mount::{FilesystemType, Mount, MountFlags}; use tokio::fs; -use walkdir::WalkDir; use crate::background::GuestBackground; @@ -88,7 +86,6 @@ impl GuestInit { let launch = self.parse_launch_config().await?; self.mount_new_root().await?; - self.nuke_initrd().await?; self.bind_new_root().await?; if let Some(hostname) = launch.hostname.clone() { @@ -271,40 +268,6 @@ impl GuestInit { Ok(serde_json::from_str(&content)?) } - async fn nuke_initrd(&mut self) -> Result<()> { - trace!("nuking initrd"); - let initrd_dev = fs::metadata("/").await?.st_dev(); - for item in WalkDir::new("/") - .same_file_system(true) - .follow_links(false) - .contents_first(true) - { - if item.is_err() { - continue; - } - - let item = item?; - let metadata = match item.metadata() { - Ok(value) => value, - Err(_) => continue, - }; - - if metadata.st_dev() != initrd_dev { - continue; - } - - if metadata.is_symlink() || metadata.is_file() { - let _ = fs::remove_file(item.path()).await; - trace!("deleting file {:?}", item.path()); - } else if metadata.is_dir() { - let _ = fs::remove_dir(item.path()).await; - trace!("deleting directory {:?}", item.path()); - } - } - trace!("nuked initrd"); - Ok(()) - } - async fn bind_new_root(&mut self) -> Result<()> { self.mount_move_subtree(Path::new(SYS_PATH), Path::new(NEW_ROOT_SYS_PATH)) .await?; diff --git a/crates/guest/src/lib.rs b/crates/guest/src/lib.rs index cfddbce..b88e833 100644 --- a/crates/guest/src/lib.rs +++ b/crates/guest/src/lib.rs @@ -7,6 +7,7 @@ use xenstore::{XsdClient, XsdInterface}; pub mod background; pub mod childwait; pub mod init; +pub mod metrics; pub async fn death(code: c_int) -> Result<()> { let store = XsdClient::open().await?; diff --git a/crates/guest/src/metrics.rs b/crates/guest/src/metrics.rs new file mode 100644 index 0000000..f9ec377 --- /dev/null +++ b/crates/guest/src/metrics.rs @@ -0,0 +1,121 @@ +use std::{ops::Add, path::Path}; + +use anyhow::Result; +use krata::idm::protocol::{IdmMetricFormat, IdmMetricNode}; +use sysinfo::Process; + +pub struct MetricsCollector {} + +impl MetricsCollector { + pub fn new() -> Result { + Ok(MetricsCollector {}) + } + + pub fn collect(&self) -> Result { + let mut sysinfo = sysinfo::System::new(); + Ok(IdmMetricNode::structural( + "guest", + vec![ + self.collect_system(&mut sysinfo)?, + self.collect_processes(&mut sysinfo)?, + ], + )) + } + + fn collect_system(&self, sysinfo: &mut sysinfo::System) -> Result { + sysinfo.refresh_memory(); + Ok(IdmMetricNode::structural( + "system", + vec![IdmMetricNode::structural( + "memory", + vec![ + IdmMetricNode::value("total", sysinfo.total_memory(), IdmMetricFormat::Bytes), + IdmMetricNode::value("used", sysinfo.used_memory(), IdmMetricFormat::Bytes), + IdmMetricNode::value("free", sysinfo.free_memory(), IdmMetricFormat::Bytes), + ], + )], + )) + } + + fn collect_processes(&self, sysinfo: &mut sysinfo::System) -> Result { + sysinfo.refresh_processes(); + let mut processes = Vec::new(); + let mut sysinfo_processes = sysinfo.processes().values().collect::>(); + sysinfo_processes.sort_by_key(|x| x.pid()); + for process in sysinfo_processes { + if process.thread_kind().is_some() { + continue; + } + processes.push(MetricsCollector::process_node(process)?); + } + Ok(IdmMetricNode::structural("process", processes)) + } + + fn process_node(process: &Process) -> Result { + let mut metrics = vec![]; + + if let Some(parent) = process.parent() { + metrics.push(IdmMetricNode::value( + "parent", + parent.as_u32() as u64, + IdmMetricFormat::Integer, + )); + } + + if let Some(exe) = process.exe().and_then(path_as_str) { + metrics.push(IdmMetricNode::raw_value("executable", exe)); + } + + if let Some(working_directory) = process.cwd().and_then(path_as_str) { + metrics.push(IdmMetricNode::raw_value("cwd", working_directory)); + } + + let cmdline = process.cmd().to_vec(); + metrics.push(IdmMetricNode::raw_value("cmdline", cmdline)); + metrics.push(IdmMetricNode::structural( + "memory", + vec![ + IdmMetricNode::value("resident", process.memory(), IdmMetricFormat::Bytes), + IdmMetricNode::value("virtual", process.virtual_memory(), IdmMetricFormat::Bytes), + ], + )); + + metrics.push(IdmMetricNode::value( + "lifetime", + process.run_time(), + IdmMetricFormat::DurationSeconds, + )); + metrics.push(IdmMetricNode::value( + "uid", + process.user_id().map(|x| (*x).add(0)).unwrap_or(0) as f64, + IdmMetricFormat::Integer, + )); + metrics.push(IdmMetricNode::value( + "gid", + process.group_id().map(|x| (*x).add(0)).unwrap_or(0) as f64, + IdmMetricFormat::Integer, + )); + metrics.push(IdmMetricNode::value( + "euid", + process + .effective_user_id() + .map(|x| (*x).add(0)) + .unwrap_or(0) as f64, + IdmMetricFormat::Integer, + )); + metrics.push(IdmMetricNode::value( + "egid", + process.effective_group_id().map(|x| x.add(0)).unwrap_or(0) as f64, + IdmMetricFormat::Integer, + )); + + Ok(IdmMetricNode::structural( + process.pid().to_string(), + metrics, + )) + } +} + +fn path_as_str(path: &Path) -> Option { + String::from_utf8(path.as_os_str().as_encoded_bytes().to_vec()).ok() +} diff --git a/crates/krata/Cargo.toml b/crates/krata/Cargo.toml index a845dcb..548d4fc 100644 --- a/crates/krata/Cargo.toml +++ b/crates/krata/Cargo.toml @@ -10,12 +10,15 @@ resolver = "2" [dependencies] anyhow = { workspace = true } +async-trait = { workspace = true } bytes = { workspace = true } libc = { workspace = true } log = { workspace = true } once_cell = { workspace = true } prost = { workspace = true } prost-reflect = { workspace = true } +prost-types = { workspace = true } +scopeguard = { workspace = true } serde = { workspace = true } tonic = { workspace = true } tokio = { workspace = true } diff --git a/crates/krata/proto/krata/internal/idm.proto b/crates/krata/proto/krata/internal/idm.proto index 015fe5a..af088da 100644 --- a/crates/krata/proto/krata/internal/idm.proto +++ b/crates/krata/proto/krata/internal/idm.proto @@ -6,8 +6,14 @@ option java_multiple_files = true; option java_package = "dev.krata.proto.internal.idm"; option java_outer_classname = "IdmProto"; -message IdmExitEvent { - int32 code = 1; +import "google/protobuf/struct.proto"; + +message IdmPacket { + oneof content { + IdmEvent event = 1; + IdmRequest request = 2; + IdmResponse response = 3; + } } message IdmEvent { @@ -16,6 +22,46 @@ message IdmEvent { } } -message IdmPacket { - IdmEvent event = 1; +message IdmExitEvent { + int32 code = 1; +} + +message IdmRequest { + uint64 id = 1; + oneof request { + IdmPingRequest ping = 2; + IdmMetricsRequest metrics = 3; + } +} + +message IdmPingRequest {} + +message IdmMetricsRequest {} + +message IdmResponse { + uint64 id = 1; + oneof response { + IdmPingResponse ping = 2; + IdmMetricsResponse metrics = 3; + } +} + +message IdmPingResponse {} + +message IdmMetricsResponse { + IdmMetricNode root = 1; +} + +message IdmMetricNode { + string name = 1; + google.protobuf.Value value = 2; + IdmMetricFormat format = 3; + repeated IdmMetricNode children = 4; +} + +enum IdmMetricFormat { + IDM_METRIC_FORMAT_UNKNOWN = 0; + IDM_METRIC_FORMAT_BYTES = 1; + IDM_METRIC_FORMAT_INTEGER = 2; + IDM_METRIC_FORMAT_DURATION_SECONDS = 3; } diff --git a/crates/krata/proto/krata/v1/common.proto b/crates/krata/proto/krata/v1/common.proto index 9b5ddba..2fe3488 100644 --- a/crates/krata/proto/krata/v1/common.proto +++ b/crates/krata/proto/krata/v1/common.proto @@ -6,6 +6,8 @@ option java_multiple_files = true; option java_package = "dev.krata.proto.v1.common"; option java_outer_classname = "CommonProto"; +import "google/protobuf/struct.proto"; + message Guest { string id = 1; GuestSpec spec = 2; @@ -80,3 +82,17 @@ message GuestExitInfo { message GuestErrorInfo { string message = 1; } + +message GuestMetricNode { + string name = 1; + google.protobuf.Value value = 2; + GuestMetricFormat format = 3; + repeated GuestMetricNode children = 4; +} + +enum GuestMetricFormat { + GUEST_METRIC_FORMAT_UNKNOWN = 0; + GUEST_METRIC_FORMAT_BYTES = 1; + GUEST_METRIC_FORMAT_INTEGER = 2; + GUEST_METRIC_FORMAT_DURATION_SECONDS = 3; +} diff --git a/crates/krata/proto/krata/v1/control.proto b/crates/krata/proto/krata/v1/control.proto index 0b84323..8ebd193 100644 --- a/crates/krata/proto/krata/v1/control.proto +++ b/crates/krata/proto/krata/v1/control.proto @@ -15,6 +15,8 @@ service ControlService { rpc ListGuests(ListGuestsRequest) returns (ListGuestsReply); rpc ConsoleData(stream ConsoleDataRequest) returns (stream ConsoleDataReply); rpc WatchEvents(WatchEventsRequest) returns (stream WatchEventsReply); + + rpc ReadGuestMetrics(ReadGuestMetricsRequest) returns (ReadGuestMetricsReply); } message CreateGuestRequest { @@ -65,3 +67,11 @@ message WatchEventsReply { message GuestChangedEvent { krata.v1.common.Guest guest = 1; } + +message ReadGuestMetricsRequest { + string guest_id = 1; +} + +message ReadGuestMetricsReply { + krata.v1.common.GuestMetricNode root = 1; +} diff --git a/crates/krata/src/idm/client.rs b/crates/krata/src/idm/client.rs index a3c7096..3df72e6 100644 --- a/crates/krata/src/idm/client.rs +++ b/crates/krata/src/idm/client.rs @@ -1,8 +1,19 @@ -use std::path::Path; +use std::{ + collections::HashMap, + path::Path, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::Duration, +}; -use super::protocol::IdmPacket; +use crate::idm::protocol::idm_packet::Content; + +use super::protocol::{ + idm_request::Request, idm_response::Response, IdmEvent, IdmPacket, IdmRequest, IdmResponse, +}; use anyhow::{anyhow, Result}; -use bytes::BytesMut; use log::{debug, error}; use nix::sys::termios::{cfmakeraw, tcgetattr, tcsetattr, SetArg}; use prost::Message; @@ -10,44 +21,39 @@ use tokio::{ fs::File, io::{unix::AsyncFd, AsyncReadExt, AsyncWriteExt}, select, - sync::mpsc::{channel, Receiver, Sender}, + sync::{ + broadcast, + mpsc::{channel, Receiver, Sender}, + oneshot, Mutex, + }, task::JoinHandle, + time::timeout, }; +type RequestMap = Arc>>>; + const IDM_PACKET_QUEUE_LEN: usize = 100; +const IDM_REQUEST_TIMEOUT_SECS: u64 = 10; +const IDM_PACKET_MAX_SIZE: usize = 20 * 1024 * 1024; -pub struct IdmClient { - pub receiver: Receiver, - pub sender: Sender, - task: JoinHandle<()>, +#[async_trait::async_trait] +pub trait IdmBackend: Send { + async fn recv(&mut self) -> Result; + async fn send(&mut self, packet: IdmPacket) -> Result<()>; } -impl Drop for IdmClient { - fn drop(&mut self) { - self.task.abort(); - } +pub struct IdmFileBackend { + read_fd: Arc>>, + write: Arc>, } -impl IdmClient { - pub async fn open>(path: P) -> Result { - let file = File::options() - .read(true) - .write(true) - .create(false) - .open(path) - .await?; - IdmClient::set_raw_port(&file)?; - let (rx_sender, rx_receiver) = channel(IDM_PACKET_QUEUE_LEN); - let (tx_sender, tx_receiver) = channel(IDM_PACKET_QUEUE_LEN); - let task = tokio::task::spawn(async move { - if let Err(error) = IdmClient::process(file, rx_sender, tx_receiver).await { - debug!("failed to handle idm client processing: {}", error); - } - }); - Ok(IdmClient { - receiver: rx_receiver, - sender: tx_sender, - task, +impl IdmFileBackend { + pub async fn new(read_file: File, write_file: File) -> Result { + IdmFileBackend::set_raw_port(&read_file)?; + IdmFileBackend::set_raw_port(&write_file)?; + Ok(IdmFileBackend { + read_fd: Arc::new(Mutex::new(AsyncFd::new(read_file)?)), + write: Arc::new(Mutex::new(write_file)), }) } @@ -57,31 +63,199 @@ impl IdmClient { tcsetattr(file, SetArg::TCSANOW, &termios)?; Ok(()) } +} + +#[async_trait::async_trait] +impl IdmBackend for IdmFileBackend { + async fn recv(&mut self) -> Result { + let mut fd = self.read_fd.lock().await; + let mut guard = fd.readable_mut().await?; + let size = guard.get_inner_mut().read_u32_le().await?; + if size == 0 { + return Ok(IdmPacket::default()); + } + let mut buffer = vec![0u8; size as usize]; + guard.get_inner_mut().read_exact(&mut buffer).await?; + match IdmPacket::decode(buffer.as_slice()) { + Ok(packet) => Ok(packet), + Err(error) => Err(anyhow!("received invalid idm packet: {}", error)), + } + } + + async fn send(&mut self, packet: IdmPacket) -> Result<()> { + let mut file = self.write.lock().await; + let data = packet.encode_to_vec(); + file.write_u32_le(data.len() as u32).await?; + file.write_all(&data).await?; + Ok(()) + } +} + +#[derive(Clone)] +pub struct IdmClient { + request_backend_sender: broadcast::Sender, + next_request_id: Arc>, + event_receiver_sender: broadcast::Sender, + tx_sender: Sender, + requests: RequestMap, + task: Arc>, +} + +impl Drop for IdmClient { + fn drop(&mut self) { + if Arc::strong_count(&self.task) <= 1 { + self.task.abort(); + } + } +} + +impl IdmClient { + pub async fn new(backend: Box) -> Result { + let requests = Arc::new(Mutex::new(HashMap::new())); + let (event_sender, event_receiver) = broadcast::channel(IDM_PACKET_QUEUE_LEN); + let (internal_request_backend_sender, _) = broadcast::channel(IDM_PACKET_QUEUE_LEN); + let (tx_sender, tx_receiver) = channel(IDM_PACKET_QUEUE_LEN); + let backend_event_sender = event_sender.clone(); + let request_backend_sender = internal_request_backend_sender.clone(); + let requests_for_client = requests.clone(); + let task = tokio::task::spawn(async move { + if let Err(error) = IdmClient::process( + backend, + backend_event_sender, + requests, + internal_request_backend_sender, + event_receiver, + tx_receiver, + ) + .await + { + debug!("failed to handle idm client processing: {}", error); + } + }); + Ok(IdmClient { + next_request_id: Arc::new(Mutex::new(0)), + event_receiver_sender: event_sender.clone(), + request_backend_sender, + requests: requests_for_client, + tx_sender, + task: Arc::new(task), + }) + } + + pub async fn open>(path: P) -> Result { + let read_file = File::options() + .read(true) + .write(false) + .create(false) + .open(&path) + .await?; + let write_file = File::options() + .read(false) + .write(true) + .create(false) + .open(path) + .await?; + let backend = IdmFileBackend::new(read_file, write_file).await?; + IdmClient::new(Box::new(backend) as Box).await + } + + pub async fn emit(&self, event: IdmEvent) -> Result<()> { + self.tx_sender + .send(IdmPacket { + content: Some(Content::Event(event)), + }) + .await?; + Ok(()) + } + + pub async fn requests(&self) -> Result> { + Ok(self.request_backend_sender.subscribe()) + } + + pub async fn respond(&self, id: u64, response: Response) -> Result<()> { + let packet = IdmPacket { + content: Some(Content::Response(IdmResponse { + id, + response: Some(response), + })), + }; + self.tx_sender.send(packet).await?; + Ok(()) + } + + pub async fn subscribe(&self) -> Result> { + Ok(self.event_receiver_sender.subscribe()) + } + + pub async fn send(&self, request: Request) -> Result { + let (sender, receiver) = oneshot::channel::(); + let req = { + let mut guard = self.next_request_id.lock().await; + let req = *guard; + *guard = req.wrapping_add(1); + req + }; + let mut requests = self.requests.lock().await; + requests.insert(req, sender); + drop(requests); + let success = AtomicBool::new(false); + let _guard = scopeguard::guard(self.requests.clone(), |requests| { + if success.load(Ordering::Acquire) { + return; + } + tokio::task::spawn(async move { + let mut requests = requests.lock().await; + requests.remove(&req); + }); + }); + self.tx_sender + .send(IdmPacket { + content: Some(Content::Request(IdmRequest { + id: req, + request: Some(request), + })), + }) + .await?; + + let response = timeout(Duration::from_secs(IDM_REQUEST_TIMEOUT_SECS), receiver).await??; + success.store(true, Ordering::Release); + if let Some(response) = response.response { + Ok(response) + } else { + Err(anyhow!("response did not contain any content")) + } + } async fn process( - file: File, - sender: Sender, + mut backend: Box, + event_sender: broadcast::Sender, + requests: RequestMap, + request_backend_sender: broadcast::Sender, + _event_receiver: broadcast::Receiver, mut receiver: Receiver, ) -> Result<()> { - let mut file = AsyncFd::new(file)?; loop { select! { - x = file.readable_mut() => match x { - Ok(mut guard) => { - let size = guard.get_inner_mut().read_u16_le().await?; - if size == 0 { - continue; - } - let mut buffer = BytesMut::with_capacity(size as usize); - guard.get_inner_mut().read_exact(&mut buffer).await?; - match IdmPacket::decode(buffer) { - Ok(packet) => { - sender.send(packet).await?; + x = backend.recv() => match x { + Ok(packet) => { + match packet.content { + Some(Content::Event(event)) => { + let _ = event_sender.send(event); }, - Err(error) => { - error!("received invalid idm packet: {}", error); - } + Some(Content::Request(request)) => { + let _ = request_backend_sender.send(request); + }, + + Some(Content::Response(response)) => { + let mut requests = requests.lock().await; + if let Some(sender) = requests.remove(&response.id) { + drop(requests); + let _ = sender.send(response); + } + }, + + _ => {}, } }, @@ -91,13 +265,12 @@ impl IdmClient { }, x = receiver.recv() => match x { Some(packet) => { - let data = packet.encode_to_vec(); - if data.len() > u16::MAX as usize { - error!("unable to send idm packet, packet size exceeded (tried to send {} bytes)", data.len()); + let length = packet.encoded_len(); + if length > IDM_PACKET_MAX_SIZE { + error!("unable to send idm packet, packet size exceeded (tried to send {} bytes)", length); continue; } - file.get_mut().write_u16_le(data.len() as u16).await?; - file.get_mut().write_all(&data).await?; + backend.send(packet).await?; }, None => { diff --git a/crates/krata/src/idm/protocol.rs b/crates/krata/src/idm/protocol.rs index f55e9b5..10e70a7 100644 --- a/crates/krata/src/idm/protocol.rs +++ b/crates/krata/src/idm/protocol.rs @@ -1 +1,89 @@ +use prost_types::{ListValue, Value}; + include!(concat!(env!("OUT_DIR"), "/krata.internal.idm.rs")); + +pub trait AsIdmMetricValue { + fn as_metric_value(&self) -> Value; +} + +impl IdmMetricNode { + pub fn structural>(name: N, children: Vec) -> IdmMetricNode { + IdmMetricNode { + name: name.as_ref().to_string(), + value: None, + format: IdmMetricFormat::Unknown.into(), + children, + } + } + + pub fn raw_value, V: AsIdmMetricValue>(name: N, value: V) -> IdmMetricNode { + IdmMetricNode { + name: name.as_ref().to_string(), + value: Some(value.as_metric_value()), + format: IdmMetricFormat::Unknown.into(), + children: vec![], + } + } + + pub fn value, V: AsIdmMetricValue>( + name: N, + value: V, + format: IdmMetricFormat, + ) -> IdmMetricNode { + IdmMetricNode { + name: name.as_ref().to_string(), + value: Some(value.as_metric_value()), + format: format.into(), + children: vec![], + } + } +} + +impl AsIdmMetricValue for String { + fn as_metric_value(&self) -> Value { + Value { + kind: Some(prost_types::value::Kind::StringValue(self.to_string())), + } + } +} + +impl AsIdmMetricValue for &str { + fn as_metric_value(&self) -> Value { + Value { + kind: Some(prost_types::value::Kind::StringValue(self.to_string())), + } + } +} + +impl AsIdmMetricValue for u64 { + fn as_metric_value(&self) -> Value { + numeric(*self as f64) + } +} + +impl AsIdmMetricValue for i64 { + fn as_metric_value(&self) -> Value { + numeric(*self as f64) + } +} + +impl AsIdmMetricValue for f64 { + fn as_metric_value(&self) -> Value { + numeric(*self) + } +} + +impl AsIdmMetricValue for Vec { + fn as_metric_value(&self) -> Value { + let values = self.iter().map(|x| x.as_metric_value()).collect::<_>(); + Value { + kind: Some(prost_types::value::Kind::ListValue(ListValue { values })), + } + } +} + +fn numeric(value: f64) -> Value { + Value { + kind: Some(prost_types::value::Kind::NumberValue(value)), + } +} diff --git a/crates/runtime/src/channel.rs b/crates/runtime/src/channel.rs index c1d12a5..86ec0c7 100644 --- a/crates/runtime/src/channel.rs +++ b/crates/runtime/src/channel.rs @@ -48,7 +48,7 @@ pub struct ChannelService { gnttab: GrantTab, input_receiver: Receiver<(u32, Vec)>, pub input_sender: Sender<(u32, Vec)>, - output_sender: Sender<(u32, Vec)>, + output_sender: Sender<(u32, Option>)>, } impl ChannelService { @@ -58,7 +58,7 @@ impl ChannelService { ) -> Result<( ChannelService, Sender<(u32, Vec)>, - Receiver<(u32, Vec)>, + Receiver<(u32, Option>)>, )> { let (input_sender, input_receiver) = channel(GROUPED_CHANNEL_QUEUE_LEN); let (output_sender, output_receiver) = channel(GROUPED_CHANNEL_QUEUE_LEN); @@ -203,12 +203,14 @@ pub struct ChannelBackend { pub domid: u32, pub id: u32, pub sender: Sender>, + raw_sender: Sender<(u32, Option>)>, task: JoinHandle<()>, } impl Drop for ChannelBackend { fn drop(&mut self) { self.task.abort(); + let _ = self.raw_sender.try_send((self.domid, None)); debug!( "destroyed channel backend for domain {} channel {}", self.domid, self.id @@ -226,7 +228,7 @@ impl ChannelBackend { store: XsdClient, evtchn: EventChannel, gnttab: GrantTab, - output_sender: Sender<(u32, Vec)>, + output_sender: Sender<(u32, Option>)>, use_reserved_ref: Option, ) -> Result { let processor = KrataChannelBackendProcessor { @@ -242,11 +244,14 @@ impl ChannelBackend { let (input_sender, input_receiver) = channel(SINGLE_CHANNEL_QUEUE_LEN); - let task = processor.launch(output_sender, input_receiver).await?; + let task = processor + .launch(output_sender.clone(), input_receiver) + .await?; Ok(ChannelBackend { domid, id, task, + raw_sender: output_sender, sender: input_sender, }) } @@ -304,7 +309,7 @@ impl KrataChannelBackendProcessor { async fn launch( &self, - output_sender: Sender<(u32, Vec)>, + output_sender: Sender<(u32, Option>)>, input_receiver: Receiver>, ) -> Result> { let owned = self.clone(); @@ -321,7 +326,7 @@ impl KrataChannelBackendProcessor { async fn processor( &self, - sender: Sender<(u32, Vec)>, + sender: Sender<(u32, Option>)>, mut receiver: Receiver>, ) -> Result<()> { self.init().await?; @@ -396,7 +401,7 @@ impl KrataChannelBackendProcessor { unsafe { let buffer = self.read_output_buffer(channel.local_port, &memory).await?; if !buffer.is_empty() { - sender.send((self.domid, buffer)).await?; + sender.send((self.domid, Some(buffer))).await?; } }; @@ -466,7 +471,7 @@ impl KrataChannelBackendProcessor { unsafe { let buffer = self.read_output_buffer(channel.local_port, &memory).await?; if !buffer.is_empty() { - sender.send((self.domid, buffer)).await?; + sender.send((self.domid, Some(buffer))).await?; } }; channel.unmask_sender.send(channel.local_port).await?; diff --git a/crates/runtime/src/console.rs b/crates/runtime/src/console.rs deleted file mode 100644 index 7571226..0000000 --- a/crates/runtime/src/console.rs +++ /dev/null @@ -1,18 +0,0 @@ -use anyhow::Result; -use tokio::fs::File; - -pub struct XenConsole { - pub read_handle: File, - pub write_handle: File, -} - -impl XenConsole { - pub async fn new(tty: &str) -> Result { - let read_handle = File::options().read(true).write(false).open(tty).await?; - let write_handle = File::options().read(false).write(true).open(tty).await?; - Ok(XenConsole { - read_handle, - write_handle, - }) - } -} diff --git a/crates/runtime/src/lib.rs b/crates/runtime/src/lib.rs index f6e81d8..1c42763 100644 --- a/crates/runtime/src/lib.rs +++ b/crates/runtime/src/lib.rs @@ -15,7 +15,6 @@ use xenstore::{XsdClient, XsdInterface}; use self::{ autoloop::AutoLoop, - console::XenConsole, launch::{GuestLaunchRequest, GuestLauncher}, }; use krataoci::cache::ImageCache; @@ -23,7 +22,6 @@ use krataoci::cache::ImageCache; pub mod autoloop; pub mod cfgblk; pub mod channel; -pub mod console; pub mod launch; pub struct GuestLoopInfo { @@ -321,17 +319,6 @@ impl Runtime { Ok(uuid) } - pub async fn console(&self, uuid: Uuid) -> Result { - let info = self - .context - .resolve(uuid) - .await? - .ok_or_else(|| anyhow!("unable to resolve guest: {}", uuid))?; - let domid = info.domid; - let tty = self.context.xen.get_console_path(domid).await?; - XenConsole::new(&tty).await - } - pub async fn list(&self) -> Result> { self.context.list().await }