diff --git a/Cargo.lock b/Cargo.lock index 4104ae3..0008a5b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -421,6 +421,12 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "core-foundation-sys" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" + [[package]] name = "cpufeatures" version = "0.2.12" @@ -439,6 +445,31 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-deque" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" + [[package]] name = "crossterm" version = "0.27.0" @@ -1324,6 +1355,7 @@ dependencies = [ "serde", "serde_json", "sys-mount", + "sysinfo", "tokio", "walkdir", ] @@ -1719,6 +1751,15 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "ntapi" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4" +dependencies = [ + "winapi", +] + [[package]] name = "num-traits" version = "0.2.18" @@ -2075,6 +2116,26 @@ dependencies = [ "getrandom", ] +[[package]] +name = "rayon" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" +dependencies = [ + "either", + "rayon-core", +] + 
+[[package]] +name = "rayon-core" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "redb" version = "2.0.0" @@ -2572,6 +2633,21 @@ dependencies = [ "tracing", ] +[[package]] +name = "sysinfo" +version = "0.30.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9a84fe4cfc513b41cb2596b624e561ec9e7e1c4b46328e496ed56a53514ef2a" +dependencies = [ + "cfg-if", + "core-foundation-sys", + "libc", + "ntapi", + "once_cell", + "rayon", + "windows", +] + [[package]] name = "tap" version = "1.0.1" @@ -3072,6 +3148,25 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" +dependencies = [ + "windows-core", + "windows-targets 0.52.4", +] + +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets 0.52.4", +] + [[package]] name = "windows-sys" version = "0.48.0" diff --git a/Cargo.toml b/Cargo.toml index e2aeb3a..549744e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,6 +64,7 @@ sha256 = "1.5.0" signal-hook = "0.3.17" slice-copy = "0.3.0" smoltcp = "0.11.0" +sysinfo = "0.30.9" thiserror = "1.0" tokio-tun = "0.11.4" tonic-build = "0.11.0" diff --git a/crates/ctl/src/cli/metrics.rs b/crates/ctl/src/cli/metrics.rs new file mode 100644 index 0000000..cbb2223 --- /dev/null +++ b/crates/ctl/src/cli/metrics.rs @@ -0,0 +1,90 @@ +use anyhow::Result; +use clap::{Parser, ValueEnum}; +use 
comfy_table::{presets::UTF8_FULL_CONDENSED, Table}; +use krata::{ + events::EventStream, + v1::control::{ + control_service_client::ControlServiceClient, ReadGuestMetricsReply, + ReadGuestMetricsRequest, + }, +}; + +use tonic::transport::Channel; + +use crate::format::{kv2line, proto2dynamic, proto2kv}; + +use super::resolve_guest; + +#[derive(ValueEnum, Clone, Debug, PartialEq, Eq)] +enum MetricsFormat { + Table, + Json, + JsonPretty, + Yaml, + KeyValue, +} + +#[derive(Parser)] +#[command(about = "Read metrics from the guest")] +pub struct MetricsCommand { + #[arg(short, long, default_value = "table", help = "Output format")] + format: MetricsFormat, + #[arg(help = "Guest to read metrics for, either the name or the uuid")] + guest: String, +} + +impl MetricsCommand { + pub async fn run( + self, + mut client: ControlServiceClient, + _events: EventStream, + ) -> Result<()> { + let guest_id: String = resolve_guest(&mut client, &self.guest).await?; + let reply = client + .read_guest_metrics(ReadGuestMetricsRequest { guest_id }) + .await? + .into_inner(); + match self.format { + MetricsFormat::Table => { + self.print_metrics_table(reply)?; + } + + MetricsFormat::Json | MetricsFormat::JsonPretty | MetricsFormat::Yaml => { + let value = serde_json::to_value(proto2dynamic(reply)?)?; + let encoded = if self.format == MetricsFormat::JsonPretty { + serde_json::to_string_pretty(&value)? + } else if self.format == MetricsFormat::Yaml { + serde_yaml::to_string(&value)? + } else { + serde_json::to_string(&value)? 
+ }; + println!("{}", encoded.trim()); + } + + MetricsFormat::KeyValue => { + self.print_key_value(reply)?; + } + } + + Ok(()) + } + + fn print_metrics_table(&self, reply: ReadGuestMetricsReply) -> Result<()> { + let mut table = Table::new(); + table.load_preset(UTF8_FULL_CONDENSED); + table.set_content_arrangement(comfy_table::ContentArrangement::Dynamic); + table.set_header(vec!["metric", "value"]); + let kvs = proto2kv(reply)?; + for (key, value) in kvs { + table.add_row(vec![key, value]); + } + println!("{}", table); + Ok(()) + } + + fn print_key_value(&self, metrics: ReadGuestMetricsReply) -> Result<()> { + let kvs = proto2kv(metrics)?; + println!("{}", kv2line(kvs),); + Ok(()) + } +} diff --git a/crates/ctl/src/cli/mod.rs b/crates/ctl/src/cli/mod.rs index 0b9f220..654f959 100644 --- a/crates/ctl/src/cli/mod.rs +++ b/crates/ctl/src/cli/mod.rs @@ -3,6 +3,7 @@ pub mod destroy; pub mod launch; pub mod list; pub mod logs; +pub mod metrics; pub mod resolve; pub mod watch; @@ -17,7 +18,7 @@ use tonic::{transport::Channel, Request}; use self::{ attach::AttachCommand, destroy::DestroyCommand, launch::LauchCommand, list::ListCommand, - logs::LogsCommand, resolve::ResolveCommand, watch::WatchCommand, + logs::LogsCommand, metrics::MetricsCommand, resolve::ResolveCommand, watch::WatchCommand, }; #[derive(Parser)] @@ -47,6 +48,7 @@ pub enum Commands { Logs(LogsCommand), Watch(WatchCommand), Resolve(ResolveCommand), + Metrics(MetricsCommand), } impl ControlCommand { @@ -82,6 +84,10 @@ impl ControlCommand { Commands::Resolve(resolve) => { resolve.run(client).await?; } + + Commands::Metrics(metrics) => { + metrics.run(client, events).await?; + } } Ok(()) } diff --git a/crates/daemon/src/control.rs b/crates/daemon/src/control.rs index 3883f83..fe32131 100644 --- a/crates/daemon/src/control.rs +++ b/crates/daemon/src/control.rs @@ -2,13 +2,19 @@ use std::{pin::Pin, str::FromStr}; use async_stream::try_stream; use futures::Stream; -use krata::v1::{ - common::{Guest, GuestState, 
GuestStatus}, - control::{ - control_service_server::ControlService, ConsoleDataReply, ConsoleDataRequest, - CreateGuestReply, CreateGuestRequest, DestroyGuestReply, DestroyGuestRequest, - ListGuestsReply, ListGuestsRequest, ResolveGuestReply, ResolveGuestRequest, - WatchEventsReply, WatchEventsRequest, +use krata::{ + idm::protocol::{ + idm_request::Request as IdmRequestType, idm_response::Response as IdmResponseType, + IdmMetricsRequest, + }, + v1::{ + common::{Guest, GuestState, GuestStatus}, + control::{ + control_service_server::ControlService, ConsoleDataReply, ConsoleDataRequest, + CreateGuestReply, CreateGuestRequest, DestroyGuestReply, DestroyGuestRequest, + ListGuestsReply, ListGuestsRequest, ReadGuestMetricsReply, ReadGuestMetricsRequest, + ResolveGuestReply, ResolveGuestRequest, WatchEventsReply, WatchEventsRequest, + }, }, }; use tokio::{ @@ -19,7 +25,9 @@ use tokio_stream::StreamExt; use tonic::{Request, Response, Status, Streaming}; use uuid::Uuid; -use crate::{console::DaemonConsoleHandle, db::GuestStore, event::DaemonEventContext}; +use crate::{ + console::DaemonConsoleHandle, db::GuestStore, event::DaemonEventContext, idm::DaemonIdmHandle, +}; pub struct ApiError { message: String, @@ -43,6 +51,7 @@ impl From for Status { pub struct RuntimeControlService { events: DaemonEventContext, console: DaemonConsoleHandle, + idm: DaemonIdmHandle, guests: GuestStore, guest_reconciler_notify: Sender, } @@ -51,12 +60,14 @@ impl RuntimeControlService { pub fn new( events: DaemonEventContext, console: DaemonConsoleHandle, + idm: DaemonIdmHandle, guests: GuestStore, guest_reconciler_notify: Sender, ) -> Self { Self { events, console, + idm, guests, guest_reconciler_notify, } @@ -269,6 +280,59 @@ impl ControlService for RuntimeControlService { Ok(Response::new(Box::pin(output) as Self::ConsoleDataStream)) } + async fn read_guest_metrics( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + let uuid = 
Uuid::from_str(&request.guest_id).map_err(|error| ApiError { + message: error.to_string(), + })?; + let guest = self + .guests + .read(uuid) + .await + .map_err(|error| ApiError { + message: error.to_string(), + })? + .ok_or_else(|| ApiError { + message: "guest did not exist in the database".to_string(), + })?; + + let Some(ref state) = guest.state else { + return Err(ApiError { + message: "guest did not have state".to_string(), + } + .into()); + }; + + let domid = state.domid; + if domid == 0 { + return Err(ApiError { + message: "invalid domid on the guest".to_string(), + } + .into()); + } + + let client = self.idm.client(domid).await.map_err(|error| ApiError { + message: error.to_string(), + })?; + + let response = client + .send(IdmRequestType::Metrics(IdmMetricsRequest {})) + .await + .map_err(|error| ApiError { + message: error.to_string(), + })?; + + let mut reply = ReadGuestMetricsReply::default(); + if let IdmResponseType::Metrics(metrics) = response { + reply.total_memory_bytes = metrics.total_memory_bytes; + reply.used_memory_bytes = metrics.used_memory_bytes; + } + Ok(Response::new(reply)) + } + async fn watch_events( &self, request: Request, diff --git a/crates/daemon/src/idm.rs b/crates/daemon/src/idm.rs index 2a155cf..00e757c 100644 --- a/crates/daemon/src/idm.rs +++ b/crates/daemon/src/idm.rs @@ -139,7 +139,12 @@ impl DaemonIdm { x = self.tx_receiver.recv() => match x { Some((domid, packet)) => { let data = packet.encode_to_vec(); - self.tx_raw_sender.send((domid, data)).await?; + let mut buffer = vec![0u8; 2]; + let length = data.len(); + buffer[0] = length as u8; + buffer[1] = (length >> 8) as u8; + buffer.extend_from_slice(&data); + self.tx_raw_sender.send((domid, buffer)).await?; }, None => { diff --git a/crates/daemon/src/lib.rs b/crates/daemon/src/lib.rs index ec68c92..3a898ca 100644 --- a/crates/daemon/src/lib.rs +++ b/crates/daemon/src/lib.rs @@ -33,7 +33,7 @@ pub struct Daemon { guest_reconciler_task: JoinHandle<()>, guest_reconciler_notify: 
Sender, generator_task: JoinHandle<()>, - _idm: DaemonIdmHandle, + idm: DaemonIdmHandle, console: DaemonConsoleHandle, } @@ -69,7 +69,7 @@ impl Daemon { guest_reconciler_task, guest_reconciler_notify, generator_task, - _idm: idm, + idm, console, }) } @@ -78,6 +78,7 @@ impl Daemon { let control_service = RuntimeControlService::new( self.events.clone(), self.console.clone(), + self.idm.clone(), self.guests.clone(), self.guest_reconciler_notify.clone(), ); diff --git a/crates/guest/Cargo.toml b/crates/guest/Cargo.toml index 228164e..fdde2a8 100644 --- a/crates/guest/Cargo.toml +++ b/crates/guest/Cargo.toml @@ -25,6 +25,7 @@ rtnetlink = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } sys-mount = { workspace = true } +sysinfo = { workspace = true } tokio = { workspace = true } walkdir = { workspace = true } diff --git a/crates/guest/bin/init.rs b/crates/guest/bin/init.rs index 62678e2..9f1b453 100644 --- a/crates/guest/bin/init.rs +++ b/crates/guest/bin/init.rs @@ -7,7 +7,7 @@ use std::env; #[tokio::main] async fn main() -> Result<()> { env::set_var("RUST_BACKTRACE", "1"); - env_logger::Builder::from_env(Env::default().default_filter_or("warn")).init(); + env_logger::Builder::from_env(Env::default().default_filter_or("trace")).init(); if env::var("KRATA_UNSAFE_ALWAYS_ALLOW_INIT").unwrap_or("0".to_string()) != "1" { let pid = std::process::id(); if pid > 3 { diff --git a/crates/guest/src/background.rs b/crates/guest/src/background.rs index a8fb732..f6bef75 100644 --- a/crates/guest/src/background.rs +++ b/crates/guest/src/background.rs @@ -8,11 +8,12 @@ use krata::idm::{ client::IdmClient, protocol::{ idm_event::Event, idm_request::Request, idm_response::Response, IdmEvent, IdmExitEvent, - IdmPingResponse, IdmRequest, + IdmMetricsResponse, IdmPingResponse, IdmRequest, }, }; use log::debug; use nix::unistd::Pid; +use sysinfo::System; use tokio::{select, sync::broadcast}; pub struct GuestBackground { @@ -80,10 +81,26 @@ impl 
GuestBackground { async fn handle_idm_request(&mut self, packet: IdmRequest) -> Result<()> { let id = packet.id; - if let Some(Request::Ping(_)) = packet.request { - self.idm - .respond(id, Response::Ping(IdmPingResponse {})) - .await?; + + match packet.request { + Some(Request::Ping(_)) => { + self.idm + .respond(id, Response::Ping(IdmPingResponse {})) + .await?; + } + + Some(Request::Metrics(_)) => { + let mut sys = System::new(); + sys.refresh_memory(); + let response = IdmMetricsResponse { + total_memory_bytes: sys.total_memory(), + used_memory_bytes: sys.used_memory(), + }; + + self.idm.respond(id, Response::Metrics(response)).await?; + } + + None => {} } Ok(()) } diff --git a/crates/krata/proto/krata/internal/idm.proto b/crates/krata/proto/krata/internal/idm.proto index 0ec96de..921c13c 100644 --- a/crates/krata/proto/krata/internal/idm.proto +++ b/crates/krata/proto/krata/internal/idm.proto @@ -28,16 +28,25 @@ message IdmRequest { uint64 id = 1; oneof request { IdmPingRequest ping = 2; + IdmMetricsRequest metrics = 3; } } message IdmPingRequest {} +message IdmMetricsRequest {} + message IdmResponse { uint64 id = 1; oneof response { IdmPingResponse ping = 2; + IdmMetricsResponse metrics = 3; } } message IdmPingResponse {} + +message IdmMetricsResponse { + uint64 total_memory_bytes = 1; + uint64 used_memory_bytes = 2; +} diff --git a/crates/krata/proto/krata/v1/control.proto b/crates/krata/proto/krata/v1/control.proto index 0b84323..6634cd1 100644 --- a/crates/krata/proto/krata/v1/control.proto +++ b/crates/krata/proto/krata/v1/control.proto @@ -15,6 +15,8 @@ service ControlService { rpc ListGuests(ListGuestsRequest) returns (ListGuestsReply); rpc ConsoleData(stream ConsoleDataRequest) returns (stream ConsoleDataReply); rpc WatchEvents(WatchEventsRequest) returns (stream WatchEventsReply); + + rpc ReadGuestMetrics(ReadGuestMetricsRequest) returns (ReadGuestMetricsReply); } message CreateGuestRequest { @@ -65,3 +67,12 @@ message WatchEventsReply { message 
GuestChangedEvent { krata.v1.common.Guest guest = 1; } + +message ReadGuestMetricsRequest { + string guest_id = 1; +} + +message ReadGuestMetricsReply { + uint64 total_memory_bytes = 1; + uint64 used_memory_bytes = 2; +} diff --git a/crates/krata/src/idm/client.rs b/crates/krata/src/idm/client.rs index 2bb5482..5b38171 100644 --- a/crates/krata/src/idm/client.rs +++ b/crates/krata/src/idm/client.rs @@ -6,7 +6,6 @@ use super::protocol::{ idm_request::Request, idm_response::Response, IdmEvent, IdmPacket, IdmRequest, IdmResponse, }; use anyhow::{anyhow, Result}; -use bytes::BytesMut; use log::{debug, error}; use nix::sys::termios::{cfmakeraw, tcgetattr, tcsetattr, SetArg}; use prost::Message; @@ -33,14 +32,17 @@ pub trait IdmBackend: Send { } pub struct IdmFileBackend { - fd: Arc>>, + read_fd: Arc>>, + write: Arc>, } impl IdmFileBackend { - pub async fn new(file: File) -> Result { - IdmFileBackend::set_raw_port(&file)?; + pub async fn new(read_file: File, write_file: File) -> Result { + IdmFileBackend::set_raw_port(&read_file)?; + IdmFileBackend::set_raw_port(&write_file)?; Ok(IdmFileBackend { - fd: Arc::new(Mutex::new(AsyncFd::new(file)?)), + read_fd: Arc::new(Mutex::new(AsyncFd::new(read_file)?)), + write: Arc::new(Mutex::new(write_file)), }) } @@ -55,26 +57,25 @@ impl IdmFileBackend { #[async_trait::async_trait] impl IdmBackend for IdmFileBackend { async fn recv(&mut self) -> Result { - let mut fd = self.fd.lock().await; + let mut fd = self.read_fd.lock().await; let mut guard = fd.readable_mut().await?; let size = guard.get_inner_mut().read_u16_le().await?; if size == 0 { return Ok(IdmPacket::default()); } - let mut buffer = BytesMut::with_capacity(size as usize); + let mut buffer = vec![0u8; size as usize]; guard.get_inner_mut().read_exact(&mut buffer).await?; - match IdmPacket::decode(buffer) { + match IdmPacket::decode(buffer.as_slice()) { Ok(packet) => Ok(packet), - Err(error) => Err(anyhow!("received invalid idm packet: {}", error)), } } async fn send(&mut 
self, packet: IdmPacket) -> Result<()> { - let mut fd = self.fd.lock().await; + let mut file = self.write.lock().await; let data = packet.encode_to_vec(); - fd.get_mut().write_u16_le(data.len() as u16).await?; - fd.get_mut().write_all(&data).await?; + file.write_u16_le(data.len() as u16).await?; + file.write_all(&data).await?; Ok(()) } } @@ -105,6 +106,7 @@ impl IdmClient { let (tx_sender, tx_receiver) = channel(IDM_PACKET_QUEUE_LEN); let backend_event_sender = event_sender.clone(); let request_backend_sender = internal_request_backend_sender.clone(); + let requests_for_client = requests.clone(); let task = tokio::task::spawn(async move { if let Err(error) = IdmClient::process( backend, @@ -123,20 +125,26 @@ impl IdmClient { next_request_id: Arc::new(Mutex::new(0)), event_receiver_sender: event_sender.clone(), request_backend_sender, - requests: Arc::new(Mutex::new(HashMap::new())), + requests: requests_for_client, tx_sender, task: Arc::new(task), }) } pub async fn open>(path: P) -> Result { - let file = File::options() + let read_file = File::options() .read(true) + .write(false) + .create(false) + .open(&path) + .await?; + let write_file = File::options() + .read(false) .write(true) .create(false) .open(path) .await?; - let backend = IdmFileBackend::new(file).await?; + let backend = IdmFileBackend::new(read_file, write_file).await?; IdmClient::new(Box::new(backend) as Box).await }