From 4e0d843de756360dc24de43c6ea353e0a7b93c0d Mon Sep 17 00:00:00 2001 From: Alex Zenla Date: Sun, 21 Apr 2024 06:29:32 +0000 Subject: [PATCH] feat: rebuild idm to separate transport from content --- Cargo.lock | 2 + Cargo.toml | 1 + crates/ctl/Cargo.toml | 2 + crates/ctl/src/cli/idm_snoop.rs | 102 +++++++++++-- crates/ctl/src/format.rs | 87 ++++++----- crates/daemon/src/control.rs | 12 +- crates/daemon/src/event.rs | 10 +- crates/daemon/src/idm.rs | 39 ++--- crates/daemon/src/metrics.rs | 14 +- crates/guest/src/background.rs | 46 +++--- crates/guest/src/init.rs | 21 ++- crates/guest/src/metrics.rs | 65 ++++---- crates/krata/build.rs | 12 +- crates/krata/proto/krata/bus/idm.proto | 67 --------- crates/krata/proto/krata/idm/internal.proto | 57 +++++++ crates/krata/proto/krata/idm/transport.proto | 22 +++ crates/krata/proto/krata/v1/control.proto | 4 +- crates/krata/src/bus/mod.rs | 1 - crates/krata/src/idm/client.rs | 139 ++++++++++-------- .../krata/src/{bus/idm.rs => idm/internal.rs} | 62 ++++++-- crates/krata/src/idm/mod.rs | 4 +- crates/krata/src/idm/serialize.rs | 10 ++ crates/krata/src/idm/transport.rs | 1 + crates/krata/src/lib.rs | 1 - 24 files changed, 483 insertions(+), 298 deletions(-) delete mode 100644 crates/krata/proto/krata/bus/idm.proto create mode 100644 crates/krata/proto/krata/idm/internal.proto create mode 100644 crates/krata/proto/krata/idm/transport.proto delete mode 100644 crates/krata/src/bus/mod.rs rename crates/krata/src/{bus/idm.rs => idm/internal.rs} (58%) create mode 100644 crates/krata/src/idm/serialize.rs create mode 100644 crates/krata/src/idm/transport.rs diff --git a/Cargo.lock b/Cargo.lock index cd93e87..1abffc6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1412,6 +1412,7 @@ version = "0.0.9" dependencies = [ "anyhow", "async-stream", + "base64 0.22.0", "clap", "comfy-table", "crossterm", @@ -1425,6 +1426,7 @@ dependencies = [ "prost-reflect", "prost-types", "ratatui", + "serde", "serde_json", "serde_yaml", "termtree", diff --git a/Cargo.toml b/Cargo.toml index 856efa9..2df632a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ async-compression = "0.4.8" async-stream = "0.3.5" async-trait = "0.1.80" backhand = "0.15.0" +base64 = "0.22.0" byteorder = "1" bytes = "1.5.0" cgroups-rs = "0.3.4" diff --git a/crates/ctl/Cargo.toml b/crates/ctl/Cargo.toml index 968ce0e..06de33c 100644 --- a/crates/ctl/Cargo.toml +++ b/crates/ctl/Cargo.toml @@ -11,6 +11,7 @@ resolver = "2" [dependencies] anyhow = { workspace = true } async-stream = { workspace = true } +base64 = { workspace = true } clap = { workspace = true } comfy-table = { workspace = true } crossterm = { workspace = true, features = ["event-stream"] } @@ -24,6 +25,7 @@ log = { workspace = true } prost-reflect = { workspace = true, features = ["serde"] } prost-types = { workspace = true } ratatui = { workspace = true } +serde = { workspace = true } serde_json = { workspace = true } serde_yaml = { workspace = true } termtree = { workspace = true } diff --git a/crates/ctl/src/cli/idm_snoop.rs b/crates/ctl/src/cli/idm_snoop.rs index 3a779a5..3627b06 100644 --- a/crates/ctl/src/cli/idm_snoop.rs +++ b/crates/ctl/src/cli/idm_snoop.rs @@ -1,14 +1,18 @@ use anyhow::Result; +use base64::Engine; use clap::{Parser, ValueEnum}; use krata::{ events::EventStream, + idm::{internal, serialize::IdmSerializable, transport::IdmTransportPacketForm}, v1::control::{control_service_client::ControlServiceClient, SnoopIdmReply, SnoopIdmRequest}, }; +use serde::{Deserialize, Serialize}; +use serde_json::Value; use tokio_stream::StreamExt; use tonic::transport::Channel; -use crate::format::{kv2line, proto2dynamic, proto2kv}; +use crate::format::{kv2line, proto2dynamic, value2kv}; #[derive(ValueEnum, Clone, Debug, PartialEq, Eq)] enum IdmSnoopFormat { @@ -34,19 +38,22 @@ impl IdmSnoopCommand { while let Some(reply) = stream.next().await { let reply = reply?; + let Some(line) = convert_idm_snoop(reply) else { + continue; + }; + match self.format { IdmSnoopFormat::Simple => { - self.print_simple(reply)?; + self.print_simple(line)?; } IdmSnoopFormat::Jsonl => { - let value = serde_json::to_value(proto2dynamic(reply)?)?; - let encoded = serde_json::to_string(&value)?; + let encoded = serde_json::to_string(&line)?; println!("{}", encoded.trim()); } IdmSnoopFormat::KeyValue => { - self.print_key_value(reply)?; + self.print_key_value(line)?; } } } @@ -54,21 +61,86 @@ impl IdmSnoopCommand { Ok(()) } - fn print_simple(&self, reply: SnoopIdmReply) -> Result<()> { - let from = reply.from; - let to = reply.to; - let Some(packet) = reply.packet else { - return Ok(()); + fn print_simple(&self, line: IdmSnoopLine) -> Result<()> { + let encoded = if !line.packet.decoded.is_null() { + serde_json::to_string(&line.packet.decoded)? + } else { + base64::prelude::BASE64_STANDARD.encode(&line.packet.data) }; - let value = serde_json::to_value(proto2dynamic(packet)?)?; - let encoded = serde_json::to_string(&value)?; - println!("({} -> {}) {}", from, to, encoded); + println!( + "({} -> {}) {} {} {}", + line.from, line.to, line.packet.id, line.packet.form, encoded + ); Ok(()) } - fn print_key_value(&self, reply: SnoopIdmReply) -> Result<()> { - let kvs = proto2kv(reply)?; + fn print_key_value(&self, line: IdmSnoopLine) -> Result<()> { + let kvs = value2kv(serde_json::to_value(line)?)?; println!("{}", kv2line(kvs)); Ok(()) } } + +#[derive(Serialize, Deserialize)] +pub struct IdmSnoopLine { + pub from: u32, + pub to: u32, + pub packet: IdmSnoopData, +} + +#[derive(Serialize, Deserialize)] +pub struct IdmSnoopData { + pub id: u64, + pub channel: u64, + pub form: String, + pub data: String, + pub decoded: Value, +} + +pub fn convert_idm_snoop(reply: SnoopIdmReply) -> Option { + let packet = &(reply.packet?); + + let decoded = if packet.channel == 0 { + match packet.form() { + IdmTransportPacketForm::Event => internal::Event::decode(&packet.data) + .ok() + .and_then(|event| proto2dynamic(event).ok()), + + IdmTransportPacketForm::Request => internal::Request::decode(&packet.data) + .ok() + .and_then(|event| proto2dynamic(event).ok()), + + IdmTransportPacketForm::Response => internal::Response::decode(&packet.data) + .ok() + .and_then(|event| proto2dynamic(event).ok()), + + _ => None, + } + } else { + None + }; + + let decoded = decoded + .and_then(|message| serde_json::to_value(message).ok()) + .unwrap_or(Value::Null); + + let data = IdmSnoopData { + id: packet.id, + channel: packet.channel, + form: match packet.form() { + IdmTransportPacketForm::Raw => "raw".to_string(), + IdmTransportPacketForm::Event => "event".to_string(), + IdmTransportPacketForm::Request => "request".to_string(), + IdmTransportPacketForm::Response => "response".to_string(), + _ => format!("unknown-{}", packet.form), + }, + data: base64::prelude::BASE64_STANDARD.encode(&packet.data), + decoded, + }; + + Some(IdmSnoopLine { + from: reply.from, + to: reply.to, + packet: data, + }) +} diff --git a/crates/ctl/src/format.rs b/crates/ctl/src/format.rs index 9f7a2f4..5e9f232 100644 --- a/crates/ctl/src/format.rs +++ b/crates/ctl/src/format.rs @@ -4,7 +4,7 @@ use anyhow::Result; use fancy_duration::FancyDuration; use human_bytes::human_bytes; use krata::v1::common::{Guest, GuestMetricFormat, GuestMetricNode, GuestStatus}; -use prost_reflect::{DynamicMessage, FieldDescriptor, ReflectMessage, Value as ReflectValue}; +use prost_reflect::{DynamicMessage, ReflectMessage}; use prost_types::Value; use termtree::Tree; @@ -15,64 +15,59 @@ pub fn proto2dynamic(proto: impl ReflectMessage) -> Result { )?) } -pub fn proto2kv(proto: impl ReflectMessage) -> Result> { - let message = proto2dynamic(proto)?; +pub fn value2kv(value: serde_json::Value) -> Result> { let mut map = HashMap::new(); + fn crawl(prefix: String, map: &mut HashMap, value: serde_json::Value) { + fn dot(prefix: &str, next: String) -> String { + if prefix.is_empty() { + next.to_string() + } else { + format!("{}.{}", prefix, next) + } + } - fn crawl( - prefix: String, - field: Option<&FieldDescriptor>, - map: &mut HashMap, - value: &ReflectValue, - ) { match value { - ReflectValue::Message(child) => { - for (field, field_value) in child.fields() { - let path = if prefix.is_empty() { - field.json_name().to_string() - } else { - format!("{}.{}", prefix, field.json_name()) - }; - crawl(path, Some(&field), map, field_value); + serde_json::Value::Null => { + map.insert(prefix, "null".to_string()); + } + + serde_json::Value::String(value) => { + map.insert(prefix, value); + } + + serde_json::Value::Bool(value) => { + map.insert(prefix, value.to_string()); + } + + serde_json::Value::Number(value) => { + map.insert(prefix, value.to_string()); + } + + serde_json::Value::Array(value) => { + for (i, item) in value.into_iter().enumerate() { + let next = dot(&prefix, i.to_string()); + crawl(next, map, item); } } - ReflectValue::EnumNumber(number) => { - if let Some(kind) = field.map(|x| x.kind()) { - if let Some(e) = kind.as_enum() { - if let Some(value) = e.get_value(*number) { - map.insert(prefix, value.name().to_string()); - } - } + serde_json::Value::Object(value) => { + for (key, item) in value { + let next = dot(&prefix, key); + crawl(next, map, item); } } - - ReflectValue::String(value) => { - map.insert(prefix.to_string(), value.clone()); - } - - ReflectValue::List(value) => { - for (x, value) in value.iter().enumerate() { - crawl(format!("{}.{}", prefix, x), field, map, value); - } - } - - _ => { - map.insert(prefix.to_string(), value.to_string()); - } } } - - crawl( - "".to_string(), - None, - &mut map, - &ReflectValue::Message(message), - ); - + crawl("".to_string(), &mut map, value); Ok(map) } +pub fn proto2kv(proto: impl ReflectMessage) -> Result> { + let message = proto2dynamic(proto)?; + let value = serde_json::to_value(message)?; + value2kv(value) +} + pub fn kv2line(map: HashMap) -> String { map.iter() .map(|(k, v)| format!("{}=\"{}\"", k, v.replace('"', "\\\""))) diff --git a/crates/daemon/src/control.rs b/crates/daemon/src/control.rs index a6bb54d..4dd9d2e 100644 --- a/crates/daemon/src/control.rs +++ b/crates/daemon/src/control.rs @@ -1,9 +1,9 @@ use async_stream::try_stream; use futures::Stream; use krata::{ - idm::protocol::{ - idm_request::Request as IdmRequestType, idm_response::Response as IdmResponseType, - IdmMetricsRequest, + idm::internal::{ + request::Request as IdmRequestType, response::Response as IdmResponseType, MetricsRequest, + Request as IdmRequest, }, v1::{ common::{Guest, GuestState, GuestStatus, OciImageFormat}, @@ -340,14 +340,16 @@ impl ControlService for DaemonControlService { })?; let response = client - .send(IdmRequestType::Metrics(IdmMetricsRequest {})) + .send(IdmRequest { + request: Some(IdmRequestType::Metrics(MetricsRequest {})), + }) .await .map_err(|error| ApiError { message: error.to_string(), })?; let mut reply = ReadGuestMetricsReply::default(); - if let IdmResponseType::Metrics(metrics) = response { + if let Some(IdmResponseType::Metrics(metrics)) = response.response { reply.root = metrics.root.map(idm_metric_to_api); } Ok(Response::new(reply)) diff --git a/crates/daemon/src/event.rs b/crates/daemon/src/event.rs index 9d21ff1..49556bc 100644 --- a/crates/daemon/src/event.rs +++ b/crates/daemon/src/event.rs @@ -6,7 +6,7 @@ use std::{ use anyhow::Result; use krata::{ - idm::protocol::{idm_event::Event, IdmEvent}, + idm::{internal::event::Event as EventType, internal::Event}, v1::common::{GuestExitInfo, GuestState, GuestStatus}, }; use log::{error, warn}; @@ -50,8 +50,8 @@ pub struct DaemonEventGenerator { feed: broadcast::Receiver, idm: DaemonIdmHandle, idms: HashMap)>, - idm_sender: Sender<(u32, IdmEvent)>, - idm_receiver: Receiver<(u32, IdmEvent)>, + idm_sender: Sender<(u32, Event)>, + idm_receiver: Receiver<(u32, Event)>, _event_sender: broadcast::Sender, } @@ -122,9 +122,9 @@ impl DaemonEventGenerator { Ok(()) } - async fn handle_idm_event(&mut self, id: Uuid, event: IdmEvent) -> Result<()> { + async fn handle_idm_event(&mut self, id: Uuid, event: Event) -> Result<()> { match event.event { - Some(Event::Exit(exit)) => self.handle_exit_code(id, exit.code).await, + Some(EventType::Exit(exit)) => self.handle_exit_code(id, exit.code).await, None => Ok(()), } } diff --git a/crates/daemon/src/idm.rs b/crates/daemon/src/idm.rs index ee788b9..aecbc43 100644 --- a/crates/daemon/src/idm.rs +++ b/crates/daemon/src/idm.rs @@ -6,8 +6,9 @@ use std::{ use anyhow::{anyhow, Result}; use bytes::{Buf, BytesMut}; use krata::idm::{ - client::{IdmBackend, IdmClient}, - protocol::IdmPacket, + client::{IdmBackend, IdmInternalClient}, + internal::INTERNAL_IDM_CHANNEL, + transport::IdmTransportPacket, }; use kratart::channel::ChannelService; use log::{error, warn}; @@ -22,14 +23,14 @@ use tokio::{ task::JoinHandle, }; -type BackendFeedMap = Arc>>>; -type ClientMap = Arc>>; +type BackendFeedMap = Arc>>>; +type ClientMap = Arc>>; #[derive(Clone)] pub struct DaemonIdmHandle { clients: ClientMap, feeds: BackendFeedMap, - tx_sender: Sender<(u32, IdmPacket)>, + tx_sender: Sender<(u32, IdmTransportPacket)>, task: Arc>, snoop_sender: broadcast::Sender, } @@ -39,7 +40,7 @@ impl DaemonIdmHandle { self.snoop_sender.subscribe() } - pub async fn client(&self, domid: u32) -> Result { + pub async fn client(&self, domid: u32) -> Result { client_or_create(domid, &self.tx_sender, &self.clients, &self.feeds).await } } @@ -56,15 +57,15 @@ impl Drop for DaemonIdmHandle { pub struct DaemonIdmSnoopPacket { pub from: u32, pub to: u32, - pub packet: IdmPacket, + pub packet: IdmTransportPacket, } pub struct DaemonIdm { clients: ClientMap, feeds: BackendFeedMap, - tx_sender: Sender<(u32, IdmPacket)>, + tx_sender: Sender<(u32, IdmTransportPacket)>, tx_raw_sender: Sender<(u32, Vec)>, - tx_receiver: Receiver<(u32, IdmPacket)>, + tx_receiver: Receiver<(u32, IdmTransportPacket)>, rx_receiver: Receiver<(u32, Option>)>, snoop_sender: broadcast::Sender, task: JoinHandle<()>, @@ -136,7 +137,7 @@ impl DaemonIdm { } let mut packet = buffer.split_to(needed); packet.advance(6); - match IdmPacket::decode(packet) { + match IdmTransportPacket::decode(packet) { Ok(packet) => { let _ = client_or_create(domid, &self.tx_sender, &self.clients, &self.feeds).await?; let guard = self.feeds.lock().await; @@ -196,10 +197,10 @@ impl Drop for DaemonIdm { async fn client_or_create( domid: u32, - tx_sender: &Sender<(u32, IdmPacket)>, + tx_sender: &Sender<(u32, IdmTransportPacket)>, clients: &ClientMap, feeds: &BackendFeedMap, -) -> Result { +) -> Result { let mut clients = clients.lock().await; let mut feeds = feeds.lock().await; match clients.entry(domid) { @@ -212,7 +213,11 @@ async fn client_or_create( rx_receiver, tx_sender: tx_sender.clone(), }; - let client = IdmClient::new(Box::new(backend) as Box).await?; + let client = IdmInternalClient::new( + INTERNAL_IDM_CHANNEL, + Box::new(backend) as Box, + ) + .await?; entry.insert(client.clone()); Ok(client) } @@ -221,13 +226,13 @@ async fn client_or_create( pub struct IdmDaemonBackend { domid: u32, - rx_receiver: Receiver, - tx_sender: Sender<(u32, IdmPacket)>, + rx_receiver: Receiver, + tx_sender: Sender<(u32, IdmTransportPacket)>, } #[async_trait::async_trait] impl IdmBackend for IdmDaemonBackend { - async fn recv(&mut self) -> Result { + async fn recv(&mut self) -> Result { if let Some(packet) = self.rx_receiver.recv().await { Ok(packet) } else { @@ -235,7 +240,7 @@ impl IdmBackend for IdmDaemonBackend { } } - async fn send(&mut self, packet: IdmPacket) -> Result<()> { + async fn send(&mut self, packet: IdmTransportPacket) -> Result<()> { self.tx_sender.send((self.domid, packet)).await?; Ok(()) } diff --git a/crates/daemon/src/metrics.rs b/crates/daemon/src/metrics.rs index a273dd0..abeae15 100644 --- a/crates/daemon/src/metrics.rs +++ b/crates/daemon/src/metrics.rs @@ -1,18 +1,18 @@ use krata::{ - idm::protocol::{IdmMetricFormat, IdmMetricNode}, + idm::internal::{MetricFormat, MetricNode}, v1::common::{GuestMetricFormat, GuestMetricNode}, }; -fn idm_metric_format_to_api(format: IdmMetricFormat) -> GuestMetricFormat { +fn idm_metric_format_to_api(format: MetricFormat) -> GuestMetricFormat { match format { - IdmMetricFormat::Unknown => GuestMetricFormat::Unknown, - IdmMetricFormat::Bytes => GuestMetricFormat::Bytes, - IdmMetricFormat::Integer => GuestMetricFormat::Integer, - IdmMetricFormat::DurationSeconds => GuestMetricFormat::DurationSeconds, + MetricFormat::Unknown => GuestMetricFormat::Unknown, + MetricFormat::Bytes => GuestMetricFormat::Bytes, + MetricFormat::Integer => GuestMetricFormat::Integer, + MetricFormat::DurationSeconds => GuestMetricFormat::DurationSeconds, } } -pub fn idm_metric_to_api(node: IdmMetricNode) -> GuestMetricNode { +pub fn idm_metric_to_api(node: MetricNode) -> GuestMetricNode { let format = node.format(); GuestMetricNode { name: node.name, diff --git a/crates/guest/src/background.rs b/crates/guest/src/background.rs index a2c2929..d981343 100644 --- a/crates/guest/src/background.rs +++ b/crates/guest/src/background.rs @@ -6,10 +6,11 @@ use crate::{ use anyhow::Result; use cgroups_rs::Cgroup; use krata::idm::{ - client::IdmClient, - protocol::{ - idm_event::Event, idm_request::Request, idm_response::Response, IdmEvent, IdmExitEvent, - IdmMetricsResponse, IdmPingResponse, IdmRequest, + client::IdmInternalClient, + internal::{ + event::Event as EventType, request::Request as RequestType, + response::Response as ResponseType, Event, ExitEvent, MetricsResponse, PingResponse, + Request, Response, }, }; use log::debug; @@ -17,14 +18,18 @@ use nix::unistd::Pid; use tokio::{select, sync::broadcast}; pub struct GuestBackground { - idm: IdmClient, + idm: IdmInternalClient, child: Pid, _cgroup: Cgroup, wait: ChildWait, } impl GuestBackground { - pub async fn new(idm: IdmClient, cgroup: Cgroup, child: Pid) -> Result { + pub async fn new( + idm: IdmInternalClient, + cgroup: Cgroup, + child: Pid, + ) -> Result { Ok(GuestBackground { idm, child, @@ -54,8 +59,8 @@ impl GuestBackground { }, x = requests_subscription.recv() => match x { - Ok(request) => { - self.handle_idm_request(request).await?; + Ok((id, request)) => { + self.handle_idm_request(id, request).await?; }, Err(broadcast::error::RecvError::Closed) => { @@ -79,22 +84,27 @@ impl GuestBackground { Ok(()) } - async fn handle_idm_request(&mut self, packet: IdmRequest) -> Result<()> { - let id = packet.id; - + async fn handle_idm_request(&mut self, id: u64, packet: Request) -> Result<()> { match packet.request { - Some(Request::Ping(_)) => { + Some(RequestType::Ping(_)) => { self.idm - .respond(id, Response::Ping(IdmPingResponse {})) + .respond( + id, + Response { + response: Some(ResponseType::Ping(PingResponse {})), + }, + ) .await?; } - Some(Request::Metrics(_)) => { + Some(RequestType::Metrics(_)) => { let metrics = MetricsCollector::new()?; let root = metrics.collect()?; - let response = IdmMetricsResponse { root: Some(root) }; + let response = Response { + response: Some(ResponseType::Metrics(MetricsResponse { root: Some(root) })), + }; - self.idm.respond(id, Response::Metrics(response)).await?; + self.idm.respond(id, response).await?; } None => {} @@ -105,8 +115,8 @@ impl GuestBackground { async fn child_event(&mut self, event: ChildEvent) -> Result<()> { if event.pid == self.child { self.idm - .emit(IdmEvent { - event: Some(Event::Exit(IdmExitEvent { code: event.status })), + .emit(Event { + event: Some(EventType::Exit(ExitEvent { code: event.status })), }) .await?; death(event.status).await?; diff --git a/crates/guest/src/init.rs b/crates/guest/src/init.rs index f657015..1f1189f 100644 --- a/crates/guest/src/init.rs +++ b/crates/guest/src/init.rs @@ -3,7 +3,8 @@ use cgroups_rs::{Cgroup, CgroupPid}; use futures::stream::TryStreamExt; use ipnetwork::IpNetwork; use krata::ethtool::EthtoolHandle; -use krata::idm::client::IdmClient; +use krata::idm::client::IdmInternalClient; +use krata::idm::internal::INTERNAL_IDM_CHANNEL; use krata::launchcfg::{LaunchInfo, LaunchNetwork, LaunchPackedFormat}; use libc::{sethostname, setsid, TIOCSCTTY}; use log::{trace, warn}; @@ -77,7 +78,7 @@ impl GuestInit { Err(error) => warn!("failed to open console: {}", error), }; - let idm = IdmClient::open("/dev/hvc1") + let idm = IdmInternalClient::open(INTERNAL_IDM_CHANNEL, "/dev/hvc1") .await .map_err(|x| anyhow!("failed to open idm client: {}", x))?; self.mount_config_image().await?; @@ -438,7 +439,12 @@ impl GuestInit { Ok(()) } - async fn run(&mut self, config: &Config, launch: &LaunchInfo, idm: IdmClient) -> Result<()> { + async fn run( + &mut self, + config: &Config, + launch: &LaunchInfo, + idm: IdmInternalClient, + ) -> Result<()> { let mut cmd = match config.cmd() { None => vec![], Some(value) => value.clone(), @@ -560,7 +566,7 @@ impl GuestInit { async fn fork_and_exec( &mut self, - idm: IdmClient, + idm: IdmInternalClient, cgroup: Cgroup, working_dir: String, path: CString, @@ -596,7 +602,12 @@ impl GuestInit { Ok(()) } - async fn background(&mut self, idm: IdmClient, cgroup: Cgroup, executed: Pid) -> Result<()> { + async fn background( + &mut self, + idm: IdmInternalClient, + cgroup: Cgroup, + executed: Pid, + ) -> Result<()> { let mut background = GuestBackground::new(idm, cgroup, executed).await?; background.run().await?; Ok(()) diff --git a/crates/guest/src/metrics.rs b/crates/guest/src/metrics.rs index f9ec377..5f97a2d 100644 --- a/crates/guest/src/metrics.rs +++ b/crates/guest/src/metrics.rs @@ -1,7 +1,7 @@ use std::{ops::Add, path::Path}; use anyhow::Result; -use krata::idm::protocol::{IdmMetricFormat, IdmMetricNode}; +use krata::idm::internal::{MetricFormat, MetricNode}; use sysinfo::Process; pub struct MetricsCollector {} @@ -11,9 +11,9 @@ impl MetricsCollector { Ok(MetricsCollector {}) } - pub fn collect(&self) -> Result { + pub fn collect(&self) -> Result { let mut sysinfo = sysinfo::System::new(); - Ok(IdmMetricNode::structural( + Ok(MetricNode::structural( "guest", vec![ self.collect_system(&mut sysinfo)?, @@ -22,22 +22,22 @@ impl MetricsCollector { )) } - fn collect_system(&self, sysinfo: &mut sysinfo::System) -> Result { + fn collect_system(&self, sysinfo: &mut sysinfo::System) -> Result { sysinfo.refresh_memory(); - Ok(IdmMetricNode::structural( + Ok(MetricNode::structural( "system", - vec![IdmMetricNode::structural( + vec![MetricNode::structural( "memory", vec![ - IdmMetricNode::value("total", sysinfo.total_memory(), IdmMetricFormat::Bytes), - IdmMetricNode::value("used", sysinfo.used_memory(), IdmMetricFormat::Bytes), - IdmMetricNode::value("free", sysinfo.free_memory(), IdmMetricFormat::Bytes), + MetricNode::value("total", sysinfo.total_memory(), MetricFormat::Bytes), + MetricNode::value("used", sysinfo.used_memory(), MetricFormat::Bytes), + MetricNode::value("free", sysinfo.free_memory(), MetricFormat::Bytes), ], )], )) } - fn collect_processes(&self, sysinfo: &mut sysinfo::System) -> Result { + fn collect_processes(&self, sysinfo: &mut sysinfo::System) -> Result { sysinfo.refresh_processes(); let mut processes = Vec::new(); let mut sysinfo_processes = sysinfo.processes().values().collect::>(); @@ -48,71 +48,68 @@ impl MetricsCollector { } processes.push(MetricsCollector::process_node(process)?); } - Ok(IdmMetricNode::structural("process", processes)) + Ok(MetricNode::structural("process", processes)) } - fn process_node(process: &Process) -> Result { + fn process_node(process: &Process) -> Result { let mut metrics = vec![]; if let Some(parent) = process.parent() { - metrics.push(IdmMetricNode::value( + metrics.push(MetricNode::value( "parent", parent.as_u32() as u64, - IdmMetricFormat::Integer, + MetricFormat::Integer, )); } if let Some(exe) = process.exe().and_then(path_as_str) { - metrics.push(IdmMetricNode::raw_value("executable", exe)); + metrics.push(MetricNode::raw_value("executable", exe)); } if let Some(working_directory) = process.cwd().and_then(path_as_str) { - metrics.push(IdmMetricNode::raw_value("cwd", working_directory)); + metrics.push(MetricNode::raw_value("cwd", working_directory)); } let cmdline = process.cmd().to_vec(); - metrics.push(IdmMetricNode::raw_value("cmdline", cmdline)); - metrics.push(IdmMetricNode::structural( + metrics.push(MetricNode::raw_value("cmdline", cmdline)); + metrics.push(MetricNode::structural( "memory", vec![ - IdmMetricNode::value("resident", process.memory(), IdmMetricFormat::Bytes), - IdmMetricNode::value("virtual", process.virtual_memory(), IdmMetricFormat::Bytes), + MetricNode::value("resident", process.memory(), MetricFormat::Bytes), + MetricNode::value("virtual", process.virtual_memory(), MetricFormat::Bytes), ], )); - metrics.push(IdmMetricNode::value( + metrics.push(MetricNode::value( "lifetime", process.run_time(), - IdmMetricFormat::DurationSeconds, + MetricFormat::DurationSeconds, )); - metrics.push(IdmMetricNode::value( + metrics.push(MetricNode::value( "uid", process.user_id().map(|x| (*x).add(0)).unwrap_or(0) as f64, - IdmMetricFormat::Integer, + MetricFormat::Integer, )); - metrics.push(IdmMetricNode::value( + metrics.push(MetricNode::value( "gid", process.group_id().map(|x| (*x).add(0)).unwrap_or(0) as f64, - IdmMetricFormat::Integer, + MetricFormat::Integer, )); - metrics.push(IdmMetricNode::value( + metrics.push(MetricNode::value( "euid", process .effective_user_id() .map(|x| (*x).add(0)) .unwrap_or(0) as f64, - IdmMetricFormat::Integer, + MetricFormat::Integer, )); - metrics.push(IdmMetricNode::value( + metrics.push(MetricNode::value( "egid", process.effective_group_id().map(|x| x.add(0)).unwrap_or(0) as f64, - IdmMetricFormat::Integer, + MetricFormat::Integer, )); - Ok(IdmMetricNode::structural( - process.pid().to_string(), - metrics, - )) + Ok(MetricNode::structural(process.pid().to_string(), metrics)) } } diff --git a/crates/krata/build.rs b/crates/krata/build.rs index ea44224..adffbbc 100644 --- a/crates/krata/build.rs +++ b/crates/krata/build.rs @@ -6,12 +6,20 @@ fn main() -> Result<()> { .descriptor_pool("crate::DESCRIPTOR_POOL") .configure( &mut config, - &["proto/krata/v1/control.proto", "proto/krata/bus/idm.proto"], + &[ + "proto/krata/v1/control.proto", + "proto/krata/idm/transport.proto", + "proto/krata/idm/internal.proto", + ], &["proto/"], )?; tonic_build::configure().compile_with_config( config, - &["proto/krata/v1/control.proto", "proto/krata/bus/idm.proto"], + &[ + "proto/krata/v1/control.proto", + "proto/krata/idm/transport.proto", + "proto/krata/idm/internal.proto", + ], &["proto/"], )?; Ok(()) diff --git a/crates/krata/proto/krata/bus/idm.proto b/crates/krata/proto/krata/bus/idm.proto deleted file mode 100644 index b8d2f00..0000000 --- a/crates/krata/proto/krata/bus/idm.proto +++ /dev/null @@ -1,67 +0,0 @@ -syntax = "proto3"; - -package krata.bus.idm; - -option java_multiple_files = true; -option java_package = "dev.krata.proto.bus.idm"; -option java_outer_classname = "IdmProto"; - -import "google/protobuf/struct.proto"; - -message IdmPacket { - oneof content { - IdmEvent event = 1; - IdmRequest request = 2; - IdmResponse response = 3; - } -} - -message IdmEvent { - oneof event { - IdmExitEvent exit = 1; - } -} - -message IdmExitEvent { - int32 code = 1; -} - -message IdmRequest { - uint64 id = 1; - oneof request { - IdmPingRequest ping = 2; - IdmMetricsRequest metrics = 3; - } -} - -message IdmPingRequest {} - -message IdmMetricsRequest {} - -message IdmResponse { - uint64 id = 1; - oneof response { - IdmPingResponse ping = 2; - IdmMetricsResponse metrics = 3; - } -} - -message IdmPingResponse {} - -message IdmMetricsResponse { - IdmMetricNode root = 1; -} - -message IdmMetricNode { - string name = 1; - google.protobuf.Value value = 2; - IdmMetricFormat format = 3; - repeated IdmMetricNode children = 4; -} - -enum IdmMetricFormat { - IDM_METRIC_FORMAT_UNKNOWN = 0; - IDM_METRIC_FORMAT_BYTES = 1; - IDM_METRIC_FORMAT_INTEGER = 2; - IDM_METRIC_FORMAT_DURATION_SECONDS = 3; -} diff --git a/crates/krata/proto/krata/idm/internal.proto b/crates/krata/proto/krata/idm/internal.proto new file mode 100644 index 0000000..634643f --- /dev/null +++ b/crates/krata/proto/krata/idm/internal.proto @@ -0,0 +1,57 @@ +syntax = "proto3"; + +package krata.idm.internal; + +option java_multiple_files = true; +option java_package = "dev.krata.proto.idm.internal"; +option java_outer_classname = "IdmInternalProto"; + +import "google/protobuf/struct.proto"; + +message ExitEvent { + int32 code = 1; +} + +message PingRequest {} + +message PingResponse {} + +message MetricsRequest {} + +message MetricsResponse { + MetricNode root = 1; +} + +message MetricNode { + string name = 1; + google.protobuf.Value value = 2; + MetricFormat format = 3; + repeated MetricNode children = 4; +} + +enum MetricFormat { + METRIC_FORMAT_UNKNOWN = 0; + METRIC_FORMAT_BYTES = 1; + METRIC_FORMAT_INTEGER = 2; + METRIC_FORMAT_DURATION_SECONDS = 3; +} + +message Event { + oneof event { + ExitEvent exit = 1; + } +} + +message Request { + oneof request { + PingRequest ping = 1; + MetricsRequest metrics = 2; + } +} + +message Response { + oneof response { + PingResponse ping = 1; + MetricsResponse metrics = 2; + } +} diff --git a/crates/krata/proto/krata/idm/transport.proto b/crates/krata/proto/krata/idm/transport.proto new file mode 100644 index 0000000..37d9283 --- /dev/null +++ b/crates/krata/proto/krata/idm/transport.proto @@ -0,0 +1,22 @@ +syntax = "proto3"; + +package krata.idm.transport; + +option java_multiple_files = true; +option java_package = "dev.krata.proto.idm.transport"; +option java_outer_classname = "IdmTransportProto"; + +message IdmTransportPacket { + uint64 id = 1; + uint64 channel = 2; + IdmTransportPacketForm form = 3; + bytes data = 4; +} + +enum IdmTransportPacketForm { + IDM_TRANSPORT_PACKET_FORM_UNKNOWN = 0; + IDM_TRANSPORT_PACKET_FORM_RAW = 1; + IDM_TRANSPORT_PACKET_FORM_EVENT = 2; + IDM_TRANSPORT_PACKET_FORM_REQUEST = 3; + IDM_TRANSPORT_PACKET_FORM_RESPONSE = 4; +} diff --git a/crates/krata/proto/krata/v1/control.proto b/crates/krata/proto/krata/v1/control.proto index a28f660..0c0b674 100644 --- a/crates/krata/proto/krata/v1/control.proto +++ b/crates/krata/proto/krata/v1/control.proto @@ -6,7 +6,7 @@ option java_multiple_files = true; option java_package = "dev.krata.proto.v1.control"; option java_outer_classname = "ControlProto"; -import "krata/bus/idm.proto"; +import "krata/idm/transport.proto"; import "krata/v1/common.proto"; service ControlService { @@ -86,7 +86,7 @@ message SnoopIdmRequest {} message SnoopIdmReply { uint32 from = 1; uint32 to = 2; - krata.bus.idm.IdmPacket packet = 3; + krata.idm.transport.IdmTransportPacket packet = 3; } message ImageProgress { diff --git a/crates/krata/src/bus/mod.rs b/crates/krata/src/bus/mod.rs deleted file mode 100644 index b378e38..0000000 --- a/crates/krata/src/bus/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod idm; diff --git a/crates/krata/src/idm/client.rs b/crates/krata/src/idm/client.rs index 3c0181b..f6a740a 100644 --- a/crates/krata/src/idm/client.rs +++ b/crates/krata/src/idm/client.rs @@ -8,10 +8,6 @@ use std::{ time::Duration, }; -use super::protocol::{ - idm_packet::Content, idm_request::Request, idm_response::Response, IdmEvent, IdmPacket, - IdmRequest, IdmResponse, -}; use anyhow::{anyhow, Result}; use log::{debug, error}; use nix::sys::termios::{cfmakeraw, tcgetattr, tcsetattr, SetArg}; @@ -22,14 +18,21 @@ use tokio::{ select, sync::{ broadcast, - mpsc::{channel, Receiver, Sender}, + mpsc::{self, Receiver, Sender}, oneshot, Mutex, }, task::JoinHandle, time::timeout, }; -type RequestMap = Arc>>>; +use super::{ + internal, + serialize::{IdmRequest, IdmSerializable}, + transport::{IdmTransportPacket, IdmTransportPacketForm}, +}; + +type RequestMap = Arc::Response>>>>; +pub type IdmInternalClient = IdmClient; const IDM_PACKET_QUEUE_LEN: usize = 100; const IDM_REQUEST_TIMEOUT_SECS: u64 = 10; @@ -37,8 +40,8 @@ const IDM_PACKET_MAX_SIZE: usize = 20 * 1024 * 1024; #[async_trait::async_trait] pub trait IdmBackend: Send { - async fn recv(&mut self) -> Result; - async fn send(&mut self, packet: IdmPacket) -> Result<()>; + async fn recv(&mut self) -> Result; + async fn send(&mut self, packet: IdmTransportPacket) -> Result<()>; } pub struct IdmFileBackend { @@ -66,30 +69,30 @@ impl IdmFileBackend { #[async_trait::async_trait] impl IdmBackend for IdmFileBackend { - async fn recv(&mut self) -> Result { + async fn recv(&mut self) -> Result { let mut fd = self.read_fd.lock().await; let mut guard = fd.readable_mut().await?; let b1 = guard.get_inner_mut().read_u8().await?; if b1 != 0xff { - return Ok(IdmPacket::default()); + return Ok(IdmTransportPacket::default()); } let b2 = guard.get_inner_mut().read_u8().await?; if b2 != 0xff { - return Ok(IdmPacket::default()); + return Ok(IdmTransportPacket::default()); } let size = guard.get_inner_mut().read_u32_le().await?; if size == 0 { - return Ok(IdmPacket::default()); + return Ok(IdmTransportPacket::default()); } let mut buffer = vec![0u8; size as usize]; guard.get_inner_mut().read_exact(&mut buffer).await?; - match IdmPacket::decode(buffer.as_slice()) { + match IdmTransportPacket::decode(buffer.as_slice()) { Ok(packet) => Ok(packet), Err(error) => Err(anyhow!("received invalid idm packet: {}", error)), } } - async fn send(&mut self, packet: IdmPacket) -> Result<()> { + async fn send(&mut self, packet: IdmTransportPacket) -> Result<()> { let mut file = self.write.lock().await; let data = packet.encode_to_vec(); file.write_all(&[0xff, 0xff]).await?; @@ -100,16 +103,17 @@ impl IdmBackend for IdmFileBackend { } #[derive(Clone)] -pub struct IdmClient { - request_backend_sender: broadcast::Sender, +pub struct IdmClient { + channel: u64, + request_backend_sender: broadcast::Sender<(u64, R)>, next_request_id: Arc>, - event_receiver_sender: broadcast::Sender, - tx_sender: Sender, - requests: RequestMap, + event_receiver_sender: broadcast::Sender, + tx_sender: Sender, + requests: RequestMap, task: Arc>, } -impl Drop for IdmClient { +impl Drop for IdmClient { fn drop(&mut self) { if Arc::strong_count(&self.task) <= 1 { self.task.abort(); @@ -117,12 +121,12 @@ impl Drop for IdmClient { } } -impl IdmClient { - pub async fn new(backend: Box) -> Result { +impl IdmClient { + pub async fn new(channel: u64, backend: Box) -> Result { let requests = Arc::new(Mutex::new(HashMap::new())); let (event_sender, event_receiver) = broadcast::channel(IDM_PACKET_QUEUE_LEN); let (internal_request_backend_sender, _) = broadcast::channel(IDM_PACKET_QUEUE_LEN); - let (tx_sender, tx_receiver) = channel(IDM_PACKET_QUEUE_LEN); + let (tx_sender, tx_receiver) = mpsc::channel(IDM_PACKET_QUEUE_LEN); let backend_event_sender = event_sender.clone(); let request_backend_sender = internal_request_backend_sender.clone(); let requests_for_client = requests.clone(); @@ -141,6 +145,7 @@ impl IdmClient { } }); Ok(IdmClient { + channel, next_request_id: Arc::new(Mutex::new(0)), event_receiver_sender: event_sender.clone(), request_backend_sender, @@ -150,7 +155,7 @@ impl IdmClient { }) } - pub async fn open>(path: P) -> Result { + pub async fn open>(channel: u64, path: P) -> Result { let read_file = File::options() .read(true) .write(false) @@ -164,39 +169,48 @@ impl IdmClient { .open(path) .await?; let backend = IdmFileBackend::new(read_file, write_file).await?; - IdmClient::new(Box::new(backend) as Box).await + IdmClient::new(channel, Box::new(backend) as Box).await } - pub async fn emit(&self, event: IdmEvent) -> Result<()> { + pub async fn emit(&self, event: T) -> Result<()> { + let id = { + let mut guard = self.next_request_id.lock().await; + let req = *guard; + *guard = req.wrapping_add(1); + req + }; self.tx_sender - .send(IdmPacket { - content: Some(Content::Event(event)), + .send(IdmTransportPacket { + id, + form: IdmTransportPacketForm::Event.into(), + channel: self.channel, + data: event.encode()?, }) .await?; Ok(()) } - pub async fn requests(&self) -> Result> { + pub async fn requests(&self) -> Result> { Ok(self.request_backend_sender.subscribe()) } - pub async fn respond(&self, id: u64, response: Response) -> Result<()> { - let packet = IdmPacket { - content: Some(Content::Response(IdmResponse { - id, - response: Some(response), - })), + pub async fn respond(&self, id: u64, response: T) -> Result<()> { + let packet = IdmTransportPacket { + id, + form: IdmTransportPacketForm::Response.into(), + channel: self.channel, + data: response.encode()?, }; self.tx_sender.send(packet).await?; Ok(()) } - pub async fn subscribe(&self) -> Result> { + pub async fn subscribe(&self) -> Result> { Ok(self.event_receiver_sender.subscribe()) } - pub async fn send(&self, request: Request) -> Result { - let (sender, receiver) = oneshot::channel::(); + pub async fn send(&self, request: R) -> Result { + let (sender, receiver) = oneshot::channel::(); let req = { let mut guard = self.next_request_id.lock().await; let req = *guard; @@ -217,49 +231,52 @@ impl IdmClient { }); }); self.tx_sender - .send(IdmPacket { - content: Some(Content::Request(IdmRequest { - id: req, - request: Some(request), - })), + .send(IdmTransportPacket { + id: req, + channel: self.channel, + form: IdmTransportPacketForm::Request.into(), + data: request.encode()?, }) .await?; let response = timeout(Duration::from_secs(IDM_REQUEST_TIMEOUT_SECS), receiver).await??; success.store(true, Ordering::Release); - if let Some(response) = response.response { - Ok(response) - } else { - Err(anyhow!("response did not contain any content")) - } + Ok(response) } async fn process( mut backend: Box, - event_sender: broadcast::Sender, - requests: RequestMap, - request_backend_sender: broadcast::Sender, - _event_receiver: broadcast::Receiver, - mut receiver: Receiver, + event_sender: broadcast::Sender, + requests: RequestMap, + request_backend_sender: broadcast::Sender<(u64, R)>, + _event_receiver: broadcast::Receiver, + mut receiver: Receiver, ) -> Result<()> { loop { select! { x = backend.recv() => match x { Ok(packet) => { - match packet.content { - Some(Content::Event(event)) => { - let _ = event_sender.send(event); + match packet.form() { + IdmTransportPacketForm::Event => { + if let Ok(event) = E::decode(&packet.data) { + let _ = event_sender.send(event); + } }, - Some(Content::Request(request)) => { - let _ = request_backend_sender.send(request); + IdmTransportPacketForm::Request => { + if let Ok(request) = R::decode(&packet.data) { + let _ = request_backend_sender.send((packet.id, request)); + } }, - Some(Content::Response(response)) => { + IdmTransportPacketForm::Response => { let mut requests = requests.lock().await; - if let Some(sender) = requests.remove(&response.id) { + if let Some(sender) = requests.remove(&packet.id) { drop(requests); - let _ = sender.send(response); + + if let Ok(response) = R::Response::decode(&packet.data) { + let _ = sender.send(response); + } } }, diff --git a/crates/krata/src/bus/idm.rs b/crates/krata/src/idm/internal.rs similarity index 58% rename from crates/krata/src/bus/idm.rs rename to crates/krata/src/idm/internal.rs index 9b02ab9..a6ef6ee 100644 --- a/crates/krata/src/bus/idm.rs +++ b/crates/krata/src/idm/internal.rs @@ -1,26 +1,66 @@ +use anyhow::Result; +use prost::Message; use prost_types::{ListValue, Value}; -include!(concat!(env!("OUT_DIR"), "/krata.bus.idm.rs")); +use super::serialize::{IdmRequest, IdmSerializable}; + +include!(concat!(env!("OUT_DIR"), "/krata.idm.internal.rs")); + +pub const INTERNAL_IDM_CHANNEL: u64 = 0; + +impl IdmSerializable for Event { + fn encode(&self) -> Result> { + Ok(self.encode_to_vec()) + } + + fn decode(bytes: &[u8]) -> Result { + Ok(::decode(bytes)?) + } +} + +impl IdmSerializable for Request { + fn encode(&self) -> Result> { + Ok(self.encode_to_vec()) + } + + fn decode(bytes: &[u8]) -> Result { + Ok(::decode(bytes)?) + } +} + +impl IdmRequest for Request { + type Response = Response; +} + +impl IdmSerializable for Response { + fn encode(&self) -> Result> { + Ok(self.encode_to_vec()) + } + + fn decode(bytes: &[u8]) -> Result { + Ok(::decode(bytes)?) + } +} pub trait AsIdmMetricValue { fn as_metric_value(&self) -> Value; } -impl IdmMetricNode { - pub fn structural>(name: N, children: Vec) -> IdmMetricNode { - IdmMetricNode { +impl MetricNode { + pub fn structural>(name: N, children: Vec) -> MetricNode { + MetricNode { name: name.as_ref().to_string(), value: None, - format: IdmMetricFormat::Unknown.into(), + format: MetricFormat::Unknown.into(), children, } } - pub fn raw_value, V: AsIdmMetricValue>(name: N, value: V) -> IdmMetricNode { - IdmMetricNode { + pub fn raw_value, V: AsIdmMetricValue>(name: N, value: V) -> MetricNode { + MetricNode { name: name.as_ref().to_string(), value: Some(value.as_metric_value()), - format: IdmMetricFormat::Unknown.into(), + format: MetricFormat::Unknown.into(), children: vec![], } } @@ -28,9 +68,9 @@ impl IdmMetricNode { pub fn value, V: AsIdmMetricValue>( name: N, value: V, - format: IdmMetricFormat, - ) -> IdmMetricNode { - IdmMetricNode { + format: MetricFormat, + ) -> MetricNode { + MetricNode { name: name.as_ref().to_string(), value: Some(value.as_metric_value()), format: format.into(), diff --git a/crates/krata/src/idm/mod.rs b/crates/krata/src/idm/mod.rs index 2524a26..b02969e 100644 --- a/crates/krata/src/idm/mod.rs +++ b/crates/krata/src/idm/mod.rs @@ -1,3 +1,5 @@ #[cfg(unix)] pub mod client; -pub use crate::bus::idm as protocol; +pub mod internal; +pub mod serialize; +pub mod transport; diff --git a/crates/krata/src/idm/serialize.rs b/crates/krata/src/idm/serialize.rs new file mode 100644 index 0000000..519527c --- /dev/null +++ b/crates/krata/src/idm/serialize.rs @@ -0,0 +1,10 @@ +use anyhow::Result; + +pub trait IdmSerializable: Sized + Clone + Send + Sync + 'static { + fn decode(bytes: &[u8]) -> Result; + fn encode(&self) -> Result>; +} + +pub trait IdmRequest: IdmSerializable { + type Response: IdmSerializable; +} diff --git a/crates/krata/src/idm/transport.rs b/crates/krata/src/idm/transport.rs new file mode 100644 index 0000000..65b8c2d --- /dev/null +++ b/crates/krata/src/idm/transport.rs @@ -0,0 +1 @@ +include!(concat!(env!("OUT_DIR"), "/krata.idm.transport.rs")); diff --git a/crates/krata/src/lib.rs b/crates/krata/src/lib.rs index 7b5bb77..fc97f33 100644 --- a/crates/krata/src/lib.rs +++ b/crates/krata/src/lib.rs @@ -1,7 +1,6 @@ use once_cell::sync::Lazy; use prost_reflect::DescriptorPool; -pub mod bus; pub mod v1; pub mod client;