diff --git a/crates/ctl/src/cli/idm_snoop.rs b/crates/ctl/src/cli/idm_snoop.rs new file mode 100644 index 0000000..3a779a5 --- /dev/null +++ b/crates/ctl/src/cli/idm_snoop.rs @@ -0,0 +1,74 @@ +use anyhow::Result; +use clap::{Parser, ValueEnum}; +use krata::{ + events::EventStream, + v1::control::{control_service_client::ControlServiceClient, SnoopIdmReply, SnoopIdmRequest}, +}; + +use tokio_stream::StreamExt; +use tonic::transport::Channel; + +use crate::format::{kv2line, proto2dynamic, proto2kv}; + +#[derive(ValueEnum, Clone, Debug, PartialEq, Eq)] +enum IdmSnoopFormat { + Simple, + Jsonl, + KeyValue, +} + +#[derive(Parser)] +#[command(about = "Snoop on the IDM bus")] +pub struct IdmSnoopCommand { + #[arg(short, long, default_value = "simple", help = "Output format")] + format: IdmSnoopFormat, +} + +impl IdmSnoopCommand { + pub async fn run( + self, + mut client: ControlServiceClient, + _events: EventStream, + ) -> Result<()> { + let mut stream = client.snoop_idm(SnoopIdmRequest {}).await?.into_inner(); + + while let Some(reply) = stream.next().await { + let reply = reply?; + match self.format { + IdmSnoopFormat::Simple => { + self.print_simple(reply)?; + } + + IdmSnoopFormat::Jsonl => { + let value = serde_json::to_value(proto2dynamic(reply)?)?; + let encoded = serde_json::to_string(&value)?; + println!("{}", encoded.trim()); + } + + IdmSnoopFormat::KeyValue => { + self.print_key_value(reply)?; + } + } + } + + Ok(()) + } + + fn print_simple(&self, reply: SnoopIdmReply) -> Result<()> { + let from = reply.from; + let to = reply.to; + let Some(packet) = reply.packet else { + return Ok(()); + }; + let value = serde_json::to_value(proto2dynamic(packet)?)?; + let encoded = serde_json::to_string(&value)?; + println!("({} -> {}) {}", from, to, encoded); + Ok(()) + } + + fn print_key_value(&self, reply: SnoopIdmReply) -> Result<()> { + let kvs = proto2kv(reply)?; + println!("{}", kv2line(kvs)); + Ok(()) + } +} diff --git a/crates/ctl/src/cli/mod.rs b/crates/ctl/src/cli/mod.rs index 654f959..1926567 100644 --- a/crates/ctl/src/cli/mod.rs +++ b/crates/ctl/src/cli/mod.rs @@ -1,5 +1,6 @@ pub mod attach; pub mod destroy; +pub mod idm_snoop; pub mod launch; pub mod list; pub mod logs; @@ -17,8 +18,9 @@ use krata::{ use tonic::{transport::Channel, Request}; use self::{ - attach::AttachCommand, destroy::DestroyCommand, launch::LauchCommand, list::ListCommand, - logs::LogsCommand, metrics::MetricsCommand, resolve::ResolveCommand, watch::WatchCommand, + attach::AttachCommand, destroy::DestroyCommand, idm_snoop::IdmSnoopCommand, + launch::LauchCommand, list::ListCommand, logs::LogsCommand, metrics::MetricsCommand, + resolve::ResolveCommand, watch::WatchCommand, }; #[derive(Parser)] @@ -49,6 +51,7 @@ pub enum Commands { Watch(WatchCommand), Resolve(ResolveCommand), Metrics(MetricsCommand), + IdmSnoop(IdmSnoopCommand), } impl ControlCommand { @@ -88,6 +91,10 @@ impl ControlCommand { Commands::Metrics(metrics) => { metrics.run(client, events).await?; } + + Commands::IdmSnoop(snoop) => { + snoop.run(client, events).await?; + } } Ok(()) } diff --git a/crates/daemon/src/control.rs b/crates/daemon/src/control.rs index 68d1b0e..596d11d 100644 --- a/crates/daemon/src/control.rs +++ b/crates/daemon/src/control.rs @@ -13,7 +13,8 @@ use krata::{ control_service_server::ControlService, ConsoleDataReply, ConsoleDataRequest, CreateGuestReply, CreateGuestRequest, DestroyGuestReply, DestroyGuestRequest, ListGuestsReply, ListGuestsRequest, ReadGuestMetricsReply, ReadGuestMetricsRequest, - ResolveGuestReply, ResolveGuestRequest, WatchEventsReply, WatchEventsRequest, + ResolveGuestReply, ResolveGuestRequest, SnoopIdmReply, SnoopIdmRequest, + WatchEventsReply, WatchEventsRequest, }, }, }; @@ -88,6 +89,9 @@ impl ControlService for RuntimeControlService { type WatchEventsStream = Pin> + Send + 'static>>; + type SnoopIdmStream = + Pin> + Send + 'static>>; + async fn create_guest( &self, request: Request, @@ -346,4 +350,18 @@ impl ControlService for RuntimeControlService { }; Ok(Response::new(Box::pin(output) as Self::WatchEventsStream)) } + + async fn snoop_idm( + &self, + request: Request, + ) -> Result, Status> { + let _ = request.into_inner(); + let mut messages = self.idm.snoop(); + let output = try_stream! { + while let Ok(event) = messages.recv().await { + yield SnoopIdmReply { from: event.from, to: event.to, packet: Some(event.packet) }; + } + }; + Ok(Response::new(Box::pin(output) as Self::SnoopIdmStream)) + } } diff --git a/crates/daemon/src/idm.rs b/crates/daemon/src/idm.rs index cc43a9f..7525def 100644 --- a/crates/daemon/src/idm.rs +++ b/crates/daemon/src/idm.rs @@ -15,6 +15,7 @@ use prost::Message; use tokio::{ select, sync::{ + broadcast, mpsc::{channel, Receiver, Sender}, Mutex, }, @@ -30,9 +31,14 @@ pub struct DaemonIdmHandle { feeds: BackendFeedMap, tx_sender: Sender<(u32, IdmPacket)>, task: Arc>, + snoop_sender: broadcast::Sender, } impl DaemonIdmHandle { + pub fn snoop(&self) -> broadcast::Receiver { + self.snoop_sender.subscribe() + } + pub async fn client(&self, domid: u32) -> Result { client_or_create(domid, &self.tx_sender, &self.clients, &self.feeds).await } @@ -46,6 +52,13 @@ impl Drop for DaemonIdmHandle { } } +#[derive(Clone)] +pub struct DaemonIdmSnoopPacket { + pub from: u32, + pub to: u32, + pub packet: IdmPacket, +} + pub struct DaemonIdm { clients: ClientMap, feeds: BackendFeedMap, @@ -53,6 +66,7 @@ pub struct DaemonIdm { tx_raw_sender: Sender<(u32, Vec)>, tx_receiver: Receiver<(u32, IdmPacket)>, rx_receiver: Receiver<(u32, Option>)>, + snoop_sender: broadcast::Sender, task: JoinHandle<()>, } @@ -61,6 +75,7 @@ impl DaemonIdm { let (service, tx_raw_sender, rx_receiver) = ChannelService::new("krata-channel".to_string(), None).await?; let (tx_sender, tx_receiver) = channel(100); + let (snoop_sender, _) = broadcast::channel(100); let task = service.launch().await?; let clients = Arc::new(Mutex::new(HashMap::new())); let feeds = Arc::new(Mutex::new(HashMap::new())); @@ -69,6 +84,7 @@ impl DaemonIdm { tx_receiver, tx_sender, tx_raw_sender, + snoop_sender, task, clients, feeds, @@ -79,9 +95,11 @@ impl DaemonIdm { let clients = self.clients.clone(); let feeds = self.feeds.clone(); let tx_sender = self.tx_sender.clone(); + let snoop_sender = self.snoop_sender.clone(); let task = tokio::task::spawn(async move { let mut buffers: HashMap = HashMap::new(); - if let Err(error) = self.process(&mut buffers).await { + + while let Err(error) = self.process(&mut buffers).await { error!("failed to process idm: {}", error); } }); @@ -89,6 +107,7 @@ impl DaemonIdm { clients, feeds, tx_sender, + snoop_sender, task: Arc::new(task), }) } @@ -116,8 +135,9 @@ impl DaemonIdm { let _ = client_or_create(domid, &self.tx_sender, &self.clients, &self.feeds).await?; let guard = self.feeds.lock().await; if let Some(feed) = guard.get(&domid) { - let _ = feed.try_send(packet); + let _ = feed.try_send(packet.clone()); } + let _ = self.snoop_sender.send(DaemonIdmSnoopPacket { from: domid, to: 0, packet }); } Err(packet) => { @@ -147,6 +167,7 @@ impl DaemonIdm { buffer[3] = (length << 24) as u8; buffer.extend_from_slice(&data); self.tx_raw_sender.send((domid, buffer)).await?; + let _ = self.snoop_sender.send(DaemonIdmSnoopPacket { from: 0, to: domid, packet }); }, None => { diff --git a/crates/krata/build.rs b/crates/krata/build.rs index 93a3900..ea44224 100644 --- a/crates/krata/build.rs +++ b/crates/krata/build.rs @@ -6,18 +6,12 @@ fn main() -> Result<()> { .descriptor_pool("crate::DESCRIPTOR_POOL") .configure( &mut config, - &[ - "proto/krata/v1/control.proto", - "proto/krata/internal/idm.proto", - ], + &["proto/krata/v1/control.proto", "proto/krata/bus/idm.proto"], &["proto/"], )?; tonic_build::configure().compile_with_config( config, - &[ - "proto/krata/v1/control.proto", - "proto/krata/internal/idm.proto", - ], + &["proto/krata/v1/control.proto", "proto/krata/bus/idm.proto"], &["proto/"], )?; Ok(()) diff --git a/crates/krata/proto/krata/internal/idm.proto b/crates/krata/proto/krata/bus/idm.proto similarity index 93% rename from crates/krata/proto/krata/internal/idm.proto rename to crates/krata/proto/krata/bus/idm.proto index af088da..b8d2f00 100644 --- a/crates/krata/proto/krata/internal/idm.proto +++ b/crates/krata/proto/krata/bus/idm.proto @@ -1,9 +1,9 @@ syntax = "proto3"; -package krata.internal.idm; +package krata.bus.idm; option java_multiple_files = true; -option java_package = "dev.krata.proto.internal.idm"; +option java_package = "dev.krata.proto.bus.idm"; option java_outer_classname = "IdmProto"; import "google/protobuf/struct.proto"; diff --git a/crates/krata/proto/krata/v1/control.proto b/crates/krata/proto/krata/v1/control.proto index 1dfaf0f..ca4adf0 100644 --- a/crates/krata/proto/krata/v1/control.proto +++ b/crates/krata/proto/krata/v1/control.proto @@ -6,6 +6,7 @@ option java_multiple_files = true; option java_package = "dev.krata.proto.v1.control"; option java_outer_classname = "ControlProto"; +import "krata/bus/idm.proto"; import "krata/v1/common.proto"; service ControlService { @@ -17,6 +18,7 @@ service ControlService { rpc WatchEvents(WatchEventsRequest) returns (stream WatchEventsReply); rpc ReadGuestMetrics(ReadGuestMetricsRequest) returns (ReadGuestMetricsReply); + rpc SnoopIdm(SnoopIdmRequest) returns (stream SnoopIdmReply); } message CreateGuestRequest { @@ -110,3 +112,11 @@ message ReadGuestMetricsRequest { message ReadGuestMetricsReply { krata.v1.common.GuestMetricNode root = 1; } + +message SnoopIdmRequest {} + +message SnoopIdmReply { + uint32 from = 1; + uint32 to = 2; + krata.bus.idm.IdmPacket packet = 3; +} diff --git a/crates/krata/src/idm/protocol.rs b/crates/krata/src/bus/idm.rs similarity index 97% rename from crates/krata/src/idm/protocol.rs rename to crates/krata/src/bus/idm.rs index 10e70a7..9b02ab9 100644 --- a/crates/krata/src/idm/protocol.rs +++ b/crates/krata/src/bus/idm.rs @@ -1,6 +1,6 @@ use prost_types::{ListValue, Value}; -include!(concat!(env!("OUT_DIR"), "/krata.internal.idm.rs")); +include!(concat!(env!("OUT_DIR"), "/krata.bus.idm.rs")); pub trait AsIdmMetricValue { fn as_metric_value(&self) -> Value; diff --git a/crates/krata/src/bus/mod.rs b/crates/krata/src/bus/mod.rs new file mode 100644 index 0000000..b378e38 --- /dev/null +++ b/crates/krata/src/bus/mod.rs @@ -0,0 +1 @@ +pub mod idm; diff --git a/crates/krata/src/idm/client.rs b/crates/krata/src/idm/client.rs index 3df72e6..c9ac714 100644 --- a/crates/krata/src/idm/client.rs +++ b/crates/krata/src/idm/client.rs @@ -8,10 +8,9 @@ use std::{ time::Duration, }; -use crate::idm::protocol::idm_packet::Content; - use super::protocol::{ - idm_request::Request, idm_response::Response, IdmEvent, IdmPacket, IdmRequest, IdmResponse, + idm_packet::Content, idm_request::Request, idm_response::Response, IdmEvent, IdmPacket, + IdmRequest, IdmResponse, }; use anyhow::{anyhow, Result}; use log::{debug, error}; diff --git a/crates/krata/src/idm/mod.rs b/crates/krata/src/idm/mod.rs index 80399ba..2524a26 100644 --- a/crates/krata/src/idm/mod.rs +++ b/crates/krata/src/idm/mod.rs @@ -1,3 +1,3 @@ #[cfg(unix)] pub mod client; -pub mod protocol; +pub use crate::bus::idm as protocol; diff --git a/crates/krata/src/lib.rs b/crates/krata/src/lib.rs index fc97f33..7b5bb77 100644 --- a/crates/krata/src/lib.rs +++ b/crates/krata/src/lib.rs @@ -1,6 +1,7 @@ use once_cell::sync::Lazy; use prost_reflect::DescriptorPool; +pub mod bus; pub mod v1; pub mod client;