From 1627cbcdd7f46b56dd74b11888f4494897ed28cc Mon Sep 17 00:00:00 2001 From: Alex Zenla Date: Sun, 14 Apr 2024 04:54:21 -0700 Subject: [PATCH] feat: idm snooping (#71) Implement IDM snooping, a new feature that lets you snoop on messages between guests and the host. The feature exposes the IDM packets send and receives to the API, allowing kratactl to now listen for messages and feed them to a user for debugging purposes. --- crates/ctl/src/cli/idm_snoop.rs | 74 +++++++++++++++++++ crates/ctl/src/cli/mod.rs | 11 ++- crates/daemon/src/control.rs | 20 ++++- crates/daemon/src/idm.rs | 25 ++++++- crates/krata/build.rs | 10 +-- .../proto/krata/{internal => bus}/idm.proto | 4 +- crates/krata/proto/krata/v1/control.proto | 10 +++ .../krata/src/{idm/protocol.rs => bus/idm.rs} | 2 +- crates/krata/src/bus/mod.rs | 1 + crates/krata/src/idm/client.rs | 5 +- crates/krata/src/idm/mod.rs | 2 +- crates/krata/src/lib.rs | 1 + 12 files changed, 145 insertions(+), 20 deletions(-) create mode 100644 crates/ctl/src/cli/idm_snoop.rs rename crates/krata/proto/krata/{internal => bus}/idm.proto (93%) rename crates/krata/src/{idm/protocol.rs => bus/idm.rs} (97%) create mode 100644 crates/krata/src/bus/mod.rs diff --git a/crates/ctl/src/cli/idm_snoop.rs b/crates/ctl/src/cli/idm_snoop.rs new file mode 100644 index 0000000..3a779a5 --- /dev/null +++ b/crates/ctl/src/cli/idm_snoop.rs @@ -0,0 +1,74 @@ +use anyhow::Result; +use clap::{Parser, ValueEnum}; +use krata::{ + events::EventStream, + v1::control::{control_service_client::ControlServiceClient, SnoopIdmReply, SnoopIdmRequest}, +}; + +use tokio_stream::StreamExt; +use tonic::transport::Channel; + +use crate::format::{kv2line, proto2dynamic, proto2kv}; + +#[derive(ValueEnum, Clone, Debug, PartialEq, Eq)] +enum IdmSnoopFormat { + Simple, + Jsonl, + KeyValue, +} + +#[derive(Parser)] +#[command(about = "Snoop on the IDM bus")] +pub struct IdmSnoopCommand { + #[arg(short, long, default_value = "simple", help = "Output format")] + format: IdmSnoopFormat, +} + +impl IdmSnoopCommand { + pub async fn run( + self, + mut client: ControlServiceClient, + _events: EventStream, + ) -> Result<()> { + let mut stream = client.snoop_idm(SnoopIdmRequest {}).await?.into_inner(); + + while let Some(reply) = stream.next().await { + let reply = reply?; + match self.format { + IdmSnoopFormat::Simple => { + self.print_simple(reply)?; + } + + IdmSnoopFormat::Jsonl => { + let value = serde_json::to_value(proto2dynamic(reply)?)?; + let encoded = serde_json::to_string(&value)?; + println!("{}", encoded.trim()); + } + + IdmSnoopFormat::KeyValue => { + self.print_key_value(reply)?; + } + } + } + + Ok(()) + } + + fn print_simple(&self, reply: SnoopIdmReply) -> Result<()> { + let from = reply.from; + let to = reply.to; + let Some(packet) = reply.packet else { + return Ok(()); + }; + let value = serde_json::to_value(proto2dynamic(packet)?)?; + let encoded = serde_json::to_string(&value)?; + println!("({} -> {}) {}", from, to, encoded); + Ok(()) + } + + fn print_key_value(&self, reply: SnoopIdmReply) -> Result<()> { + let kvs = proto2kv(reply)?; + println!("{}", kv2line(kvs)); + Ok(()) + } +} diff --git a/crates/ctl/src/cli/mod.rs b/crates/ctl/src/cli/mod.rs index 654f959..1926567 100644 --- a/crates/ctl/src/cli/mod.rs +++ b/crates/ctl/src/cli/mod.rs @@ -1,5 +1,6 @@ pub mod attach; pub mod destroy; +pub mod idm_snoop; pub mod launch; pub mod list; pub mod logs; @@ -17,8 +18,9 @@ use krata::{ use tonic::{transport::Channel, Request}; use self::{ - attach::AttachCommand, destroy::DestroyCommand, launch::LauchCommand, list::ListCommand, - logs::LogsCommand, metrics::MetricsCommand, resolve::ResolveCommand, watch::WatchCommand, + attach::AttachCommand, destroy::DestroyCommand, idm_snoop::IdmSnoopCommand, + launch::LauchCommand, list::ListCommand, logs::LogsCommand, metrics::MetricsCommand, + resolve::ResolveCommand, watch::WatchCommand, }; #[derive(Parser)] @@ -49,6 +51,7 @@ pub enum Commands { Watch(WatchCommand), Resolve(ResolveCommand), Metrics(MetricsCommand), + IdmSnoop(IdmSnoopCommand), } impl ControlCommand { @@ -88,6 +91,10 @@ impl ControlCommand { Commands::Metrics(metrics) => { metrics.run(client, events).await?; } + + Commands::IdmSnoop(snoop) => { + snoop.run(client, events).await?; + } } Ok(()) } diff --git a/crates/daemon/src/control.rs b/crates/daemon/src/control.rs index 68d1b0e..596d11d 100644 --- a/crates/daemon/src/control.rs +++ b/crates/daemon/src/control.rs @@ -13,7 +13,8 @@ use krata::{ control_service_server::ControlService, ConsoleDataReply, ConsoleDataRequest, CreateGuestReply, CreateGuestRequest, DestroyGuestReply, DestroyGuestRequest, ListGuestsReply, ListGuestsRequest, ReadGuestMetricsReply, ReadGuestMetricsRequest, - ResolveGuestReply, ResolveGuestRequest, WatchEventsReply, WatchEventsRequest, + ResolveGuestReply, ResolveGuestRequest, SnoopIdmReply, SnoopIdmRequest, + WatchEventsReply, WatchEventsRequest, }, }, }; @@ -88,6 +89,9 @@ impl ControlService for RuntimeControlService { type WatchEventsStream = Pin> + Send + 'static>>; + type SnoopIdmStream = + Pin> + Send + 'static>>; + async fn create_guest( &self, request: Request, @@ -346,4 +350,18 @@ impl ControlService for RuntimeControlService { }; Ok(Response::new(Box::pin(output) as Self::WatchEventsStream)) } + + async fn snoop_idm( + &self, + request: Request, + ) -> Result, Status> { + let _ = request.into_inner(); + let mut messages = self.idm.snoop(); + let output = try_stream! { + while let Ok(event) = messages.recv().await { + yield SnoopIdmReply { from: event.from, to: event.to, packet: Some(event.packet) }; + } + }; + Ok(Response::new(Box::pin(output) as Self::SnoopIdmStream)) + } } diff --git a/crates/daemon/src/idm.rs b/crates/daemon/src/idm.rs index cc43a9f..7525def 100644 --- a/crates/daemon/src/idm.rs +++ b/crates/daemon/src/idm.rs @@ -15,6 +15,7 @@ use prost::Message; use tokio::{ select, sync::{ + broadcast, mpsc::{channel, Receiver, Sender}, Mutex, }, @@ -30,9 +31,14 @@ pub struct DaemonIdmHandle { feeds: BackendFeedMap, tx_sender: Sender<(u32, IdmPacket)>, task: Arc>, + snoop_sender: broadcast::Sender, } impl DaemonIdmHandle { + pub fn snoop(&self) -> broadcast::Receiver { + self.snoop_sender.subscribe() + } + pub async fn client(&self, domid: u32) -> Result { client_or_create(domid, &self.tx_sender, &self.clients, &self.feeds).await } @@ -46,6 +52,13 @@ impl Drop for DaemonIdmHandle { } } +#[derive(Clone)] +pub struct DaemonIdmSnoopPacket { + pub from: u32, + pub to: u32, + pub packet: IdmPacket, +} + pub struct DaemonIdm { clients: ClientMap, feeds: BackendFeedMap, @@ -53,6 +66,7 @@ pub struct DaemonIdm { tx_raw_sender: Sender<(u32, Vec)>, tx_receiver: Receiver<(u32, IdmPacket)>, rx_receiver: Receiver<(u32, Option>)>, + snoop_sender: broadcast::Sender, task: JoinHandle<()>, } @@ -61,6 +75,7 @@ impl DaemonIdm { let (service, tx_raw_sender, rx_receiver) = ChannelService::new("krata-channel".to_string(), None).await?; let (tx_sender, tx_receiver) = channel(100); + let (snoop_sender, _) = broadcast::channel(100); let task = service.launch().await?; let clients = Arc::new(Mutex::new(HashMap::new())); let feeds = Arc::new(Mutex::new(HashMap::new())); @@ -69,6 +84,7 @@ impl DaemonIdm { tx_receiver, tx_sender, tx_raw_sender, + snoop_sender, task, clients, feeds, @@ -79,9 +95,11 @@ impl DaemonIdm { let clients = self.clients.clone(); let feeds = self.feeds.clone(); let tx_sender = self.tx_sender.clone(); + let snoop_sender = self.snoop_sender.clone(); let task = tokio::task::spawn(async move { let mut buffers: HashMap = HashMap::new(); - if let Err(error) = self.process(&mut buffers).await { + + while let Err(error) = self.process(&mut buffers).await { error!("failed to process idm: {}", error); } }); @@ -89,6 +107,7 @@ impl DaemonIdm { clients, feeds, tx_sender, + snoop_sender, task: Arc::new(task), }) } @@ -116,8 +135,9 @@ impl DaemonIdm { let _ = client_or_create(domid, &self.tx_sender, &self.clients, &self.feeds).await?; let guard = self.feeds.lock().await; if let Some(feed) = guard.get(&domid) { - let _ = feed.try_send(packet); + let _ = feed.try_send(packet.clone()); } + let _ = self.snoop_sender.send(DaemonIdmSnoopPacket { from: domid, to: 0, packet }); } Err(packet) => { @@ -147,6 +167,7 @@ impl DaemonIdm { buffer[3] = (length << 24) as u8; buffer.extend_from_slice(&data); self.tx_raw_sender.send((domid, buffer)).await?; + let _ = self.snoop_sender.send(DaemonIdmSnoopPacket { from: 0, to: domid, packet }); }, None => { diff --git a/crates/krata/build.rs b/crates/krata/build.rs index 93a3900..ea44224 100644 --- a/crates/krata/build.rs +++ b/crates/krata/build.rs @@ -6,18 +6,12 @@ fn main() -> Result<()> { .descriptor_pool("crate::DESCRIPTOR_POOL") .configure( &mut config, - &[ - "proto/krata/v1/control.proto", - "proto/krata/internal/idm.proto", - ], + &["proto/krata/v1/control.proto", "proto/krata/bus/idm.proto"], &["proto/"], )?; tonic_build::configure().compile_with_config( config, - &[ - "proto/krata/v1/control.proto", - "proto/krata/internal/idm.proto", - ], + &["proto/krata/v1/control.proto", "proto/krata/bus/idm.proto"], &["proto/"], )?; Ok(()) diff --git a/crates/krata/proto/krata/internal/idm.proto b/crates/krata/proto/krata/bus/idm.proto similarity index 93% rename from crates/krata/proto/krata/internal/idm.proto rename to crates/krata/proto/krata/bus/idm.proto index af088da..b8d2f00 100644 --- a/crates/krata/proto/krata/internal/idm.proto +++ b/crates/krata/proto/krata/bus/idm.proto @@ -1,9 +1,9 @@ syntax = "proto3"; -package krata.internal.idm; +package krata.bus.idm; option java_multiple_files = true; -option java_package = "dev.krata.proto.internal.idm"; +option java_package = "dev.krata.proto.bus.idm"; option java_outer_classname = "IdmProto"; import "google/protobuf/struct.proto"; diff --git a/crates/krata/proto/krata/v1/control.proto b/crates/krata/proto/krata/v1/control.proto index 1dfaf0f..ca4adf0 100644 --- a/crates/krata/proto/krata/v1/control.proto +++ b/crates/krata/proto/krata/v1/control.proto @@ -6,6 +6,7 @@ option java_multiple_files = true; option java_package = "dev.krata.proto.v1.control"; option java_outer_classname = "ControlProto"; +import "krata/bus/idm.proto"; import "krata/v1/common.proto"; service ControlService { @@ -17,6 +18,7 @@ service ControlService { rpc WatchEvents(WatchEventsRequest) returns (stream WatchEventsReply); rpc ReadGuestMetrics(ReadGuestMetricsRequest) returns (ReadGuestMetricsReply); + rpc SnoopIdm(SnoopIdmRequest) returns (stream SnoopIdmReply); } message CreateGuestRequest { @@ -110,3 +112,11 @@ message ReadGuestMetricsRequest { message ReadGuestMetricsReply { krata.v1.common.GuestMetricNode root = 1; } + +message SnoopIdmRequest {} + +message SnoopIdmReply { + uint32 from = 1; + uint32 to = 2; + krata.bus.idm.IdmPacket packet = 3; +} diff --git a/crates/krata/src/idm/protocol.rs b/crates/krata/src/bus/idm.rs similarity index 97% rename from crates/krata/src/idm/protocol.rs rename to crates/krata/src/bus/idm.rs index 10e70a7..9b02ab9 100644 --- a/crates/krata/src/idm/protocol.rs +++ b/crates/krata/src/bus/idm.rs @@ -1,6 +1,6 @@ use prost_types::{ListValue, Value}; -include!(concat!(env!("OUT_DIR"), "/krata.internal.idm.rs")); +include!(concat!(env!("OUT_DIR"), "/krata.bus.idm.rs")); pub trait AsIdmMetricValue { fn as_metric_value(&self) -> Value; diff --git a/crates/krata/src/bus/mod.rs b/crates/krata/src/bus/mod.rs new file mode 100644 index 0000000..b378e38 --- /dev/null +++ b/crates/krata/src/bus/mod.rs @@ -0,0 +1 @@ +pub mod idm; diff --git a/crates/krata/src/idm/client.rs b/crates/krata/src/idm/client.rs index 3df72e6..c9ac714 100644 --- a/crates/krata/src/idm/client.rs +++ b/crates/krata/src/idm/client.rs @@ -8,10 +8,9 @@ use std::{ time::Duration, }; -use crate::idm::protocol::idm_packet::Content; - use super::protocol::{ - idm_request::Request, idm_response::Response, IdmEvent, IdmPacket, IdmRequest, IdmResponse, + idm_packet::Content, idm_request::Request, idm_response::Response, IdmEvent, IdmPacket, + IdmRequest, IdmResponse, }; use anyhow::{anyhow, Result}; use log::{debug, error}; diff --git a/crates/krata/src/idm/mod.rs b/crates/krata/src/idm/mod.rs index 80399ba..2524a26 100644 --- a/crates/krata/src/idm/mod.rs +++ b/crates/krata/src/idm/mod.rs @@ -1,3 +1,3 @@ #[cfg(unix)] pub mod client; -pub mod protocol; +pub use crate::bus::idm as protocol; diff --git a/crates/krata/src/lib.rs b/crates/krata/src/lib.rs index fc97f33..7b5bb77 100644 --- a/crates/krata/src/lib.rs +++ b/crates/krata/src/lib.rs @@ -1,6 +1,7 @@ use once_cell::sync::Lazy; use prost_reflect::DescriptorPool; +pub mod bus; pub mod v1; pub mod client;