diff --git a/Cargo.lock b/Cargo.lock index 0008a5b..287dffd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -741,6 +741,17 @@ dependencies = [ "arrayvec", ] +[[package]] +name = "fancy-duration" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3ae60718ae501dca9d27fd0e322683c86a95a1a01fac1807aa2f9b035cc0882" +dependencies = [ + "anyhow", + "lazy_static", + "regex", +] + [[package]] name = "fastrand" version = "2.0.2" @@ -1072,6 +1083,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "human_bytes" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91f255a4535024abf7640cb288260811fc14794f62b063652ed349f9a6c2348e" + [[package]] name = "humantime" version = "2.1.0" @@ -1269,6 +1286,8 @@ dependencies = [ "prost-build", "prost-reflect", "prost-reflect-build", + "prost-types", + "scopeguard", "serde", "tokio", "tokio-stream", @@ -1300,11 +1319,15 @@ dependencies = [ "crossterm", "ctrlc", "env_logger", + "fancy-duration", + "human_bytes", "krata", "log", "prost-reflect", + "prost-types", "serde_json", "serde_yaml", + "termtree", "tokio", "tokio-stream", "tonic", @@ -1357,7 +1380,6 @@ dependencies = [ "sys-mount", "sysinfo", "tokio", - "walkdir", ] [[package]] @@ -2666,6 +2688,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "termtree" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" + [[package]] name = "thiserror" version = "1.0.58" diff --git a/Cargo.toml b/Cargo.toml index 549744e..59c2c9b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,8 +38,10 @@ ctrlc = "3.4.4" elf = "0.7.4" env_logger = "0.11.0" etherparse = "0.14.3" +fancy-duration = "0.9.2" flate2 = "1.0" futures = "0.3.30" +human_bytes = "0.4" ipnetwork = "0.20.0" libc = "0.2" log = "0.4.20" @@ -55,9 +57,11 @@ path-clean = "1.0.1" prost = "0.12.4" prost-build = "0.12.4" prost-reflect-build = "0.13.0" +prost-types = "0.12.4" rand = "0.8.5" redb = "2.0.0" rtnetlink = "0.14.1" +scopeguard = "1.2.0" serde_json = "1.0.113" serde_yaml = "0.9" sha256 = "1.5.0" @@ -65,6 +69,7 @@ signal-hook = "0.3.17" slice-copy = "0.3.0" smoltcp = "0.11.0" sysinfo = "0.30.9" +termtree = "0.4.1" thiserror = "1.0" tokio-tun = "0.11.4" tonic-build = "0.11.0" diff --git a/crates/ctl/Cargo.toml b/crates/ctl/Cargo.toml index 47a324a..e93d255 100644 --- a/crates/ctl/Cargo.toml +++ b/crates/ctl/Cargo.toml @@ -16,11 +16,15 @@ comfy-table = { workspace = true } crossterm = { workspace = true } ctrlc = { workspace = true, features = ["termination"] } env_logger = { workspace = true } +fancy-duration = { workspace = true } +human_bytes = { workspace = true } krata = { path = "../krata", version = "^0.0.8" } log = { workspace = true } prost-reflect = { workspace = true, features = ["serde"] } +prost-types = { workspace = true } serde_json = { workspace = true } serde_yaml = { workspace = true } +termtree = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true } tonic = { workspace = true } diff --git a/crates/ctl/src/cli/metrics.rs b/crates/ctl/src/cli/metrics.rs index 42c9d7f..17def04 100644 --- a/crates/ctl/src/cli/metrics.rs +++ b/crates/ctl/src/cli/metrics.rs @@ -1,22 +1,22 @@ use anyhow::Result; use clap::{Parser, ValueEnum}; -use comfy_table::{presets::UTF8_FULL_CONDENSED, Table}; use krata::{ events::EventStream, - v1::control::{ - control_service_client::ControlServiceClient, GuestMetrics, ReadGuestMetricsRequest, + v1::{ + common::GuestMetricNode, + control::{control_service_client::ControlServiceClient, ReadGuestMetricsRequest}, }, }; use tonic::transport::Channel; -use crate::format::{kv2line, proto2dynamic, proto2kv}; +use crate::format::{kv2line, metrics_flat, metrics_tree, proto2dynamic}; use super::resolve_guest; #[derive(ValueEnum, Clone, Debug, PartialEq, Eq)] enum MetricsFormat { - Table, + Tree, Json, JsonPretty, Yaml, @@ -26,7 +26,7 @@ enum MetricsFormat { #[derive(Parser)] #[command(about = "Read metrics from the guest")] pub struct MetricsCommand { - #[arg(short, long, default_value = "table", help = "Output format")] + #[arg(short, long, default_value = "tree", help = "Output format")] format: MetricsFormat, #[arg(help = "Guest to read metrics for, either the name or the uuid")] guest: String, @@ -39,19 +39,19 @@ impl MetricsCommand { _events: EventStream, ) -> Result<()> { let guest_id: String = resolve_guest(&mut client, &self.guest).await?; - let metrics = client + let root = client .read_guest_metrics(ReadGuestMetricsRequest { guest_id }) .await? .into_inner() - .metrics + .root .unwrap_or_default(); match self.format { - MetricsFormat::Table => { - self.print_metrics_table(metrics)?; + MetricsFormat::Tree => { + self.print_metrics_tree(root)?; } MetricsFormat::Json | MetricsFormat::JsonPretty | MetricsFormat::Yaml => { - let value = serde_json::to_value(proto2dynamic(metrics)?)?; + let value = serde_json::to_value(proto2dynamic(root)?)?; let encoded = if self.format == MetricsFormat::JsonPretty { serde_json::to_string_pretty(&value)? } else if self.format == MetricsFormat::Yaml { @@ -63,29 +63,21 @@ impl MetricsCommand { } MetricsFormat::KeyValue => { - self.print_key_value(metrics)?; + self.print_key_value(root)?; } } Ok(()) } - fn print_metrics_table(&self, metrics: GuestMetrics) -> Result<()> { - let mut table = Table::new(); - table.load_preset(UTF8_FULL_CONDENSED); - table.set_content_arrangement(comfy_table::ContentArrangement::Dynamic); - table.set_header(vec!["metric", "value"]); - let kvs = proto2kv(metrics)?; - for (key, value) in kvs { - table.add_row(vec![key, value]); - } - println!("{}", table); + fn print_metrics_tree(&self, root: GuestMetricNode) -> Result<()> { + print!("{}", metrics_tree(root)); Ok(()) } - fn print_key_value(&self, metrics: GuestMetrics) -> Result<()> { - let kvs = proto2kv(metrics)?; - println!("{}", kv2line(kvs),); + fn print_key_value(&self, metrics: GuestMetricNode) -> Result<()> { + let kvs = metrics_flat(metrics); + println!("{}", kv2line(kvs)); Ok(()) } } diff --git a/crates/ctl/src/format.rs b/crates/ctl/src/format.rs index 79dee5d..0fa0851 100644 --- a/crates/ctl/src/format.rs +++ b/crates/ctl/src/format.rs @@ -1,8 +1,12 @@ -use std::collections::HashMap; +use std::{collections::HashMap, time::Duration}; use anyhow::Result; -use krata::v1::common::{Guest, GuestStatus}; -use prost_reflect::{DynamicMessage, ReflectMessage, Value}; +use fancy_duration::FancyDuration; +use human_bytes::human_bytes; +use krata::v1::common::{Guest, GuestMetricFormat, GuestMetricNode, GuestStatus}; +use prost_reflect::{DynamicMessage, FieldDescriptor, ReflectMessage, Value as ReflectValue}; +use prost_types::Value; +use termtree::Tree; pub fn proto2dynamic(proto: impl ReflectMessage) -> Result { Ok(DynamicMessage::decode( @@ -15,38 +19,56 @@ pub fn proto2kv(proto: impl ReflectMessage) -> Result> { let message = proto2dynamic(proto)?; let mut map = HashMap::new(); - fn crawl(prefix: &str, map: &mut HashMap, message: &DynamicMessage) { - for (field, value) in message.fields() { - let path = if prefix.is_empty() { - field.name().to_string() - } else { - format!("{}.{}", prefix, field.name()) - }; - match value { - Value::Message(child) => { - crawl(&path, map, child); + fn crawl( + prefix: String, + field: Option<&FieldDescriptor>, + map: &mut HashMap, + value: &ReflectValue, + ) { + match value { + ReflectValue::Message(child) => { + for (field, field_value) in child.fields() { + let path = if prefix.is_empty() { + field.json_name().to_string() + } else { + format!("{}.{}", prefix, field.json_name()) + }; + crawl(path, Some(&field), map, field_value); } + } - Value::EnumNumber(number) => { - if let Some(e) = field.kind().as_enum() { + ReflectValue::EnumNumber(number) => { + if let Some(kind) = field.map(|x| x.kind()) { + if let Some(e) = kind.as_enum() { if let Some(value) = e.get_value(*number) { - map.insert(path, value.name().to_string()); + map.insert(prefix, value.name().to_string()); } } } + } - Value::String(value) => { - map.insert(path, value.clone()); - } + ReflectValue::String(value) => { + map.insert(prefix.to_string(), value.clone()); + } - _ => { - map.insert(path, value.to_string()); + ReflectValue::List(value) => { + for (x, value) in value.iter().enumerate() { + crawl(format!("{}.{}", prefix, x), field, map, value); } } + + _ => { + map.insert(prefix.to_string(), value.to_string()); + } } } - crawl("", &mut map, &message); + crawl( + "".to_string(), + None, + &mut map, + &ReflectValue::Message(message), + ); Ok(map) } @@ -85,3 +107,63 @@ pub fn guest_simple_line(guest: &Guest) -> String { let ipv6 = network.map(|x| x.guest_ipv6.as_str()).unwrap_or(""); format!("{}\t{}\t{}\t{}\t{}", guest.id, state, name, ipv4, ipv6) } + +fn metrics_value_string(value: Value) -> String { + proto2dynamic(value) + .map(|x| serde_json::to_string(&x).ok()) + .ok() + .flatten() + .unwrap_or_default() +} + +fn metrics_value_numeric(value: Value) -> f64 { + let string = metrics_value_string(value); + string.parse::().ok().unwrap_or(f64::NAN) +} + +fn metrics_value_pretty(value: Value, format: GuestMetricFormat) -> String { + match format { + GuestMetricFormat::Bytes => human_bytes(metrics_value_numeric(value)), + GuestMetricFormat::Integer => (metrics_value_numeric(value) as u64).to_string(), + GuestMetricFormat::DurationSeconds => { + FancyDuration(Duration::from_secs_f64(metrics_value_numeric(value))).to_string() + } + _ => metrics_value_string(value), + } +} + +fn metrics_flat_internal(prefix: &str, node: GuestMetricNode, map: &mut HashMap) { + if let Some(value) = node.value { + map.insert(prefix.to_string(), metrics_value_string(value)); + } + + for child in node.children { + let path = if prefix.is_empty() { + child.name.to_string() + } else { + format!("{}.{}", prefix, child.name) + }; + metrics_flat_internal(&path, child, map); + } +} + +pub fn metrics_flat(root: GuestMetricNode) -> HashMap { + let mut map = HashMap::new(); + metrics_flat_internal("", root, &mut map); + map +} + +pub fn metrics_tree(node: GuestMetricNode) -> Tree { + let mut name = node.name.to_string(); + let format = node.format(); + if let Some(value) = node.value { + let value_string = metrics_value_pretty(value, format); + name.push_str(&format!(": {}", value_string)); + } + + let mut tree = Tree::new(name); + for child in node.children { + tree.push(metrics_tree(child)); + } + tree +} diff --git a/crates/daemon/src/control.rs b/crates/daemon/src/control.rs index 2eb1f42..68d1b0e 100644 --- a/crates/daemon/src/control.rs +++ b/crates/daemon/src/control.rs @@ -12,9 +12,8 @@ use krata::{ control::{ control_service_server::ControlService, ConsoleDataReply, ConsoleDataRequest, CreateGuestReply, CreateGuestRequest, DestroyGuestReply, DestroyGuestRequest, - GuestMetrics, ListGuestsReply, ListGuestsRequest, ReadGuestMetricsReply, - ReadGuestMetricsRequest, ResolveGuestReply, ResolveGuestRequest, WatchEventsReply, - WatchEventsRequest, + ListGuestsReply, ListGuestsRequest, ReadGuestMetricsReply, ReadGuestMetricsRequest, + ResolveGuestReply, ResolveGuestRequest, WatchEventsReply, WatchEventsRequest, }, }, }; @@ -28,6 +27,7 @@ use uuid::Uuid; use crate::{ console::DaemonConsoleHandle, db::GuestStore, event::DaemonEventContext, idm::DaemonIdmHandle, + metrics::idm_metric_to_api, }; pub struct ApiError { @@ -328,10 +328,7 @@ impl ControlService for RuntimeControlService { let mut reply = ReadGuestMetricsReply::default(); if let IdmResponseType::Metrics(metrics) = response { - reply.metrics = Some(GuestMetrics { - total_memory_bytes: metrics.total_memory_bytes, - used_memory_bytes: metrics.used_memory_bytes, - }); + reply.root = metrics.root.map(idm_metric_to_api); } Ok(Response::new(reply)) } diff --git a/crates/daemon/src/idm.rs b/crates/daemon/src/idm.rs index 00e757c..cc43a9f 100644 --- a/crates/daemon/src/idm.rs +++ b/crates/daemon/src/idm.rs @@ -101,16 +101,16 @@ impl DaemonIdm { if let Some(data) = data { let buffer = buffers.entry(domid).or_insert_with_key(|_| BytesMut::new()); buffer.extend_from_slice(&data); - if buffer.len() < 2 { + if buffer.len() < 4 { continue; } - let size = (buffer[0] as u16 | (buffer[1] as u16) << 8) as usize; - let needed = size + 2; + let size = (buffer[0] as u32 | (buffer[1] as u32) << 8 | (buffer[2] as u32) << 16 | (buffer[3] as u32) << 24) as usize; + let needed = size + 4; if buffer.len() < needed { continue; } let mut packet = buffer.split_to(needed); - packet.advance(2); + packet.advance(4); match IdmPacket::decode(packet) { Ok(packet) => { let _ = client_or_create(domid, &self.tx_sender, &self.clients, &self.feeds).await?; @@ -139,10 +139,12 @@ impl DaemonIdm { x = self.tx_receiver.recv() => match x { Some((domid, packet)) => { let data = packet.encode_to_vec(); - let mut buffer = vec![0u8; 2]; - let length = data.len(); + let mut buffer = vec![0u8; 4]; + let length = data.len() as u32; buffer[0] = length as u8; buffer[1] = (length << 8) as u8; + buffer[2] = (length << 16) as u8; + buffer[3] = (length << 24) as u8; buffer.extend_from_slice(&data); self.tx_raw_sender.send((domid, buffer)).await?; }, diff --git a/crates/daemon/src/lib.rs b/crates/daemon/src/lib.rs index 3a898ca..1240121 100644 --- a/crates/daemon/src/lib.rs +++ b/crates/daemon/src/lib.rs @@ -24,6 +24,7 @@ pub mod control; pub mod db; pub mod event; pub mod idm; +pub mod metrics; pub mod reconcile; pub struct Daemon { diff --git a/crates/daemon/src/metrics.rs b/crates/daemon/src/metrics.rs new file mode 100644 index 0000000..a273dd0 --- /dev/null +++ b/crates/daemon/src/metrics.rs @@ -0,0 +1,27 @@ +use krata::{ + idm::protocol::{IdmMetricFormat, IdmMetricNode}, + v1::common::{GuestMetricFormat, GuestMetricNode}, +}; + +fn idm_metric_format_to_api(format: IdmMetricFormat) -> GuestMetricFormat { + match format { + IdmMetricFormat::Unknown => GuestMetricFormat::Unknown, + IdmMetricFormat::Bytes => GuestMetricFormat::Bytes, + IdmMetricFormat::Integer => GuestMetricFormat::Integer, + IdmMetricFormat::DurationSeconds => GuestMetricFormat::DurationSeconds, + } +} + +pub fn idm_metric_to_api(node: IdmMetricNode) -> GuestMetricNode { + let format = node.format(); + GuestMetricNode { + name: node.name, + value: node.value, + format: idm_metric_format_to_api(format).into(), + children: node + .children + .into_iter() + .map(idm_metric_to_api) + .collect::>(), + } +} diff --git a/crates/guest/Cargo.toml b/crates/guest/Cargo.toml index fdde2a8..e94234c 100644 --- a/crates/guest/Cargo.toml +++ b/crates/guest/Cargo.toml @@ -27,7 +27,6 @@ serde_json = { workspace = true } sys-mount = { workspace = true } sysinfo = { workspace = true } tokio = { workspace = true } -walkdir = { workspace = true } [lib] name = "krataguest" diff --git a/crates/guest/src/background.rs b/crates/guest/src/background.rs index f6bef75..a2c2929 100644 --- a/crates/guest/src/background.rs +++ b/crates/guest/src/background.rs @@ -1,6 +1,7 @@ use crate::{ childwait::{ChildEvent, ChildWait}, death, + metrics::MetricsCollector, }; use anyhow::Result; use cgroups_rs::Cgroup; @@ -13,7 +14,6 @@ use krata::idm::{ }; use log::debug; use nix::unistd::Pid; -use sysinfo::System; use tokio::{select, sync::broadcast}; pub struct GuestBackground { @@ -90,12 +90,9 @@ impl GuestBackground { } Some(Request::Metrics(_)) => { - let mut sys = System::new(); - sys.refresh_memory(); - let response = IdmMetricsResponse { - total_memory_bytes: sys.total_memory(), - used_memory_bytes: sys.used_memory(), - }; + let metrics = MetricsCollector::new()?; + let root = metrics.collect()?; + let response = IdmMetricsResponse { root: Some(root) }; self.idm.respond(id, Response::Metrics(response)).await?; } diff --git a/crates/guest/src/init.rs b/crates/guest/src/init.rs index 09ba711..b2fb6c9 100644 --- a/crates/guest/src/init.rs +++ b/crates/guest/src/init.rs @@ -17,14 +17,12 @@ use std::fs::{File, OpenOptions, Permissions}; use std::io; use std::net::{Ipv4Addr, Ipv6Addr}; use std::os::fd::AsRawFd; -use std::os::linux::fs::MetadataExt; use std::os::unix::ffi::OsStrExt; use std::os::unix::fs::{chroot, PermissionsExt}; use std::path::{Path, PathBuf}; use std::str::FromStr; use sys_mount::{FilesystemType, Mount, MountFlags}; use tokio::fs; -use walkdir::WalkDir; use crate::background::GuestBackground; @@ -88,7 +86,6 @@ impl GuestInit { let launch = self.parse_launch_config().await?; self.mount_new_root().await?; - self.nuke_initrd().await?; self.bind_new_root().await?; if let Some(hostname) = launch.hostname.clone() { @@ -271,40 +268,6 @@ impl GuestInit { Ok(serde_json::from_str(&content)?) } - async fn nuke_initrd(&mut self) -> Result<()> { - trace!("nuking initrd"); - let initrd_dev = fs::metadata("/").await?.st_dev(); - for item in WalkDir::new("/") - .same_file_system(true) - .follow_links(false) - .contents_first(true) - { - if item.is_err() { - continue; - } - - let item = item?; - let metadata = match item.metadata() { - Ok(value) => value, - Err(_) => continue, - }; - - if metadata.st_dev() != initrd_dev { - continue; - } - - if metadata.is_symlink() || metadata.is_file() { - let _ = fs::remove_file(item.path()).await; - trace!("deleting file {:?}", item.path()); - } else if metadata.is_dir() { - let _ = fs::remove_dir(item.path()).await; - trace!("deleting directory {:?}", item.path()); - } - } - trace!("nuked initrd"); - Ok(()) - } - async fn bind_new_root(&mut self) -> Result<()> { self.mount_move_subtree(Path::new(SYS_PATH), Path::new(NEW_ROOT_SYS_PATH)) .await?; diff --git a/crates/guest/src/lib.rs b/crates/guest/src/lib.rs index cfddbce..b88e833 100644 --- a/crates/guest/src/lib.rs +++ b/crates/guest/src/lib.rs @@ -7,6 +7,7 @@ use xenstore::{XsdClient, XsdInterface}; pub mod background; pub mod childwait; pub mod init; +pub mod metrics; pub async fn death(code: c_int) -> Result<()> { let store = XsdClient::open().await?; diff --git a/crates/guest/src/metrics.rs b/crates/guest/src/metrics.rs new file mode 100644 index 0000000..f9ec377 --- /dev/null +++ b/crates/guest/src/metrics.rs @@ -0,0 +1,121 @@ +use std::{ops::Add, path::Path}; + +use anyhow::Result; +use krata::idm::protocol::{IdmMetricFormat, IdmMetricNode}; +use sysinfo::Process; + +pub struct MetricsCollector {} + +impl MetricsCollector { + pub fn new() -> Result { + Ok(MetricsCollector {}) + } + + pub fn collect(&self) -> Result { + let mut sysinfo = sysinfo::System::new(); + Ok(IdmMetricNode::structural( + "guest", + vec![ + self.collect_system(&mut sysinfo)?, + self.collect_processes(&mut sysinfo)?, + ], + )) + } + + fn collect_system(&self, sysinfo: &mut sysinfo::System) -> Result { + sysinfo.refresh_memory(); + Ok(IdmMetricNode::structural( + "system", + vec![IdmMetricNode::structural( + "memory", + vec![ + IdmMetricNode::value("total", sysinfo.total_memory(), IdmMetricFormat::Bytes), + IdmMetricNode::value("used", sysinfo.used_memory(), IdmMetricFormat::Bytes), + IdmMetricNode::value("free", sysinfo.free_memory(), IdmMetricFormat::Bytes), + ], + )], + )) + } + + fn collect_processes(&self, sysinfo: &mut sysinfo::System) -> Result { + sysinfo.refresh_processes(); + let mut processes = Vec::new(); + let mut sysinfo_processes = sysinfo.processes().values().collect::>(); + sysinfo_processes.sort_by_key(|x| x.pid()); + for process in sysinfo_processes { + if process.thread_kind().is_some() { + continue; + } + processes.push(MetricsCollector::process_node(process)?); + } + Ok(IdmMetricNode::structural("process", processes)) + } + + fn process_node(process: &Process) -> Result { + let mut metrics = vec![]; + + if let Some(parent) = process.parent() { + metrics.push(IdmMetricNode::value( + "parent", + parent.as_u32() as u64, + IdmMetricFormat::Integer, + )); + } + + if let Some(exe) = process.exe().and_then(path_as_str) { + metrics.push(IdmMetricNode::raw_value("executable", exe)); + } + + if let Some(working_directory) = process.cwd().and_then(path_as_str) { + metrics.push(IdmMetricNode::raw_value("cwd", working_directory)); + } + + let cmdline = process.cmd().to_vec(); + metrics.push(IdmMetricNode::raw_value("cmdline", cmdline)); + metrics.push(IdmMetricNode::structural( + "memory", + vec![ + IdmMetricNode::value("resident", process.memory(), IdmMetricFormat::Bytes), + IdmMetricNode::value("virtual", process.virtual_memory(), IdmMetricFormat::Bytes), + ], + )); + + metrics.push(IdmMetricNode::value( + "lifetime", + process.run_time(), + IdmMetricFormat::DurationSeconds, + )); + metrics.push(IdmMetricNode::value( + "uid", + process.user_id().map(|x| (*x).add(0)).unwrap_or(0) as f64, + IdmMetricFormat::Integer, + )); + metrics.push(IdmMetricNode::value( + "gid", + process.group_id().map(|x| (*x).add(0)).unwrap_or(0) as f64, + IdmMetricFormat::Integer, + )); + metrics.push(IdmMetricNode::value( + "euid", + process + .effective_user_id() + .map(|x| (*x).add(0)) + .unwrap_or(0) as f64, + IdmMetricFormat::Integer, + )); + metrics.push(IdmMetricNode::value( + "egid", + process.effective_group_id().map(|x| x.add(0)).unwrap_or(0) as f64, + IdmMetricFormat::Integer, + )); + + Ok(IdmMetricNode::structural( + process.pid().to_string(), + metrics, + )) + } +} + +fn path_as_str(path: &Path) -> Option { + String::from_utf8(path.as_os_str().as_encoded_bytes().to_vec()).ok() +} diff --git a/crates/krata/Cargo.toml b/crates/krata/Cargo.toml index e0f46bc..548d4fc 100644 --- a/crates/krata/Cargo.toml +++ b/crates/krata/Cargo.toml @@ -17,6 +17,8 @@ log = { workspace = true } once_cell = { workspace = true } prost = { workspace = true } prost-reflect = { workspace = true } +prost-types = { workspace = true } +scopeguard = { workspace = true } serde = { workspace = true } tonic = { workspace = true } tokio = { workspace = true } diff --git a/crates/krata/proto/krata/internal/idm.proto b/crates/krata/proto/krata/internal/idm.proto index 921c13c..af088da 100644 --- a/crates/krata/proto/krata/internal/idm.proto +++ b/crates/krata/proto/krata/internal/idm.proto @@ -6,6 +6,8 @@ option java_multiple_files = true; option java_package = "dev.krata.proto.internal.idm"; option java_outer_classname = "IdmProto"; +import "google/protobuf/struct.proto"; + message IdmPacket { oneof content { IdmEvent event = 1; @@ -47,6 +49,19 @@ message IdmResponse { message IdmPingResponse {} message IdmMetricsResponse { - uint64 total_memory_bytes = 1; - uint64 used_memory_bytes = 2; + IdmMetricNode root = 1; +} + +message IdmMetricNode { + string name = 1; + google.protobuf.Value value = 2; + IdmMetricFormat format = 3; + repeated IdmMetricNode children = 4; +} + +enum IdmMetricFormat { + IDM_METRIC_FORMAT_UNKNOWN = 0; + IDM_METRIC_FORMAT_BYTES = 1; + IDM_METRIC_FORMAT_INTEGER = 2; + IDM_METRIC_FORMAT_DURATION_SECONDS = 3; } diff --git a/crates/krata/proto/krata/v1/common.proto b/crates/krata/proto/krata/v1/common.proto index 9b5ddba..2fe3488 100644 --- a/crates/krata/proto/krata/v1/common.proto +++ b/crates/krata/proto/krata/v1/common.proto @@ -6,6 +6,8 @@ option java_multiple_files = true; option java_package = "dev.krata.proto.v1.common"; option java_outer_classname = "CommonProto"; +import "google/protobuf/struct.proto"; + message Guest { string id = 1; GuestSpec spec = 2; @@ -80,3 +82,17 @@ message GuestExitInfo { message GuestErrorInfo { string message = 1; } + +message GuestMetricNode { + string name = 1; + google.protobuf.Value value = 2; + GuestMetricFormat format = 3; + repeated GuestMetricNode children = 4; +} + +enum GuestMetricFormat { + GUEST_METRIC_FORMAT_UNKNOWN = 0; + GUEST_METRIC_FORMAT_BYTES = 1; + GUEST_METRIC_FORMAT_INTEGER = 2; + GUEST_METRIC_FORMAT_DURATION_SECONDS = 3; +} diff --git a/crates/krata/proto/krata/v1/control.proto b/crates/krata/proto/krata/v1/control.proto index 0c91af3..8ebd193 100644 --- a/crates/krata/proto/krata/v1/control.proto +++ b/crates/krata/proto/krata/v1/control.proto @@ -73,10 +73,5 @@ message ReadGuestMetricsRequest { } message ReadGuestMetricsReply { - GuestMetrics metrics = 1; -} - -message GuestMetrics { - uint64 total_memory_bytes = 1; - uint64 used_memory_bytes = 2; + krata.v1.common.GuestMetricNode root = 1; } diff --git a/crates/krata/src/idm/client.rs b/crates/krata/src/idm/client.rs index 5b38171..3df72e6 100644 --- a/crates/krata/src/idm/client.rs +++ b/crates/krata/src/idm/client.rs @@ -1,4 +1,12 @@ -use std::{collections::HashMap, path::Path, sync::Arc}; +use std::{ + collections::HashMap, + path::Path, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::Duration, +}; use crate::idm::protocol::idm_packet::Content; @@ -19,11 +27,14 @@ use tokio::{ oneshot, Mutex, }, task::JoinHandle, + time::timeout, }; type RequestMap = Arc>>>; const IDM_PACKET_QUEUE_LEN: usize = 100; +const IDM_REQUEST_TIMEOUT_SECS: u64 = 10; +const IDM_PACKET_MAX_SIZE: usize = 20 * 1024 * 1024; #[async_trait::async_trait] pub trait IdmBackend: Send { @@ -59,7 +70,7 @@ impl IdmBackend for IdmFileBackend { async fn recv(&mut self) -> Result { let mut fd = self.read_fd.lock().await; let mut guard = fd.readable_mut().await?; - let size = guard.get_inner_mut().read_u16_le().await?; + let size = guard.get_inner_mut().read_u32_le().await?; if size == 0 { return Ok(IdmPacket::default()); } @@ -74,7 +85,7 @@ impl IdmBackend for IdmFileBackend { async fn send(&mut self, packet: IdmPacket) -> Result<()> { let mut file = self.write.lock().await; let data = packet.encode_to_vec(); - file.write_u16_le(data.len() as u16).await?; + file.write_u32_le(data.len() as u32).await?; file.write_all(&data).await?; Ok(()) } @@ -177,16 +188,26 @@ impl IdmClient { } pub async fn send(&self, request: Request) -> Result { - let (sender, receiver) = oneshot::channel(); - let mut requests = self.requests.lock().await; + let (sender, receiver) = oneshot::channel::(); let req = { let mut guard = self.next_request_id.lock().await; let req = *guard; *guard = req.wrapping_add(1); req }; + let mut requests = self.requests.lock().await; requests.insert(req, sender); drop(requests); + let success = AtomicBool::new(false); + let _guard = scopeguard::guard(self.requests.clone(), |requests| { + if success.load(Ordering::Acquire) { + return; + } + tokio::task::spawn(async move { + let mut requests = requests.lock().await; + requests.remove(&req); + }); + }); self.tx_sender .send(IdmPacket { content: Some(Content::Request(IdmRequest { @@ -196,7 +217,9 @@ impl IdmClient { }) .await?; - if let Some(response) = receiver.await?.response { + let response = timeout(Duration::from_secs(IDM_REQUEST_TIMEOUT_SECS), receiver).await??; + success.store(true, Ordering::Release); + if let Some(response) = response.response { Ok(response) } else { Err(anyhow!("response did not contain any content")) @@ -243,7 +266,7 @@ impl IdmClient { x = receiver.recv() => match x { Some(packet) => { let length = packet.encoded_len(); - if length > u16::MAX as usize { + if length > IDM_PACKET_MAX_SIZE { error!("unable to send idm packet, packet size exceeded (tried to send {} bytes)", length); continue; } diff --git a/crates/krata/src/idm/protocol.rs b/crates/krata/src/idm/protocol.rs index f55e9b5..10e70a7 100644 --- a/crates/krata/src/idm/protocol.rs +++ b/crates/krata/src/idm/protocol.rs @@ -1 +1,89 @@ +use prost_types::{ListValue, Value}; + include!(concat!(env!("OUT_DIR"), "/krata.internal.idm.rs")); + +pub trait AsIdmMetricValue { + fn as_metric_value(&self) -> Value; +} + +impl IdmMetricNode { + pub fn structural>(name: N, children: Vec) -> IdmMetricNode { + IdmMetricNode { + name: name.as_ref().to_string(), + value: None, + format: IdmMetricFormat::Unknown.into(), + children, + } + } + + pub fn raw_value, V: AsIdmMetricValue>(name: N, value: V) -> IdmMetricNode { + IdmMetricNode { + name: name.as_ref().to_string(), + value: Some(value.as_metric_value()), + format: IdmMetricFormat::Unknown.into(), + children: vec![], + } + } + + pub fn value, V: AsIdmMetricValue>( + name: N, + value: V, + format: IdmMetricFormat, + ) -> IdmMetricNode { + IdmMetricNode { + name: name.as_ref().to_string(), + value: Some(value.as_metric_value()), + format: format.into(), + children: vec![], + } + } +} + +impl AsIdmMetricValue for String { + fn as_metric_value(&self) -> Value { + Value { + kind: Some(prost_types::value::Kind::StringValue(self.to_string())), + } + } +} + +impl AsIdmMetricValue for &str { + fn as_metric_value(&self) -> Value { + Value { + kind: Some(prost_types::value::Kind::StringValue(self.to_string())), + } + } +} + +impl AsIdmMetricValue for u64 { + fn as_metric_value(&self) -> Value { + numeric(*self as f64) + } +} + +impl AsIdmMetricValue for i64 { + fn as_metric_value(&self) -> Value { + numeric(*self as f64) + } +} + +impl AsIdmMetricValue for f64 { + fn as_metric_value(&self) -> Value { + numeric(*self) + } +} + +impl AsIdmMetricValue for Vec { + fn as_metric_value(&self) -> Value { + let values = self.iter().map(|x| x.as_metric_value()).collect::<_>(); + Value { + kind: Some(prost_types::value::Kind::ListValue(ListValue { values })), + } + } +} + +fn numeric(value: f64) -> Value { + Value { + kind: Some(prost_types::value::Kind::NumberValue(value)), + } +}