feat: idm v2 (#102)

* feat: rebuild idm to separate transport from content

* feat: fast guest lookup table and host identification
This commit is contained in:
Alex Zenla
2024-04-21 21:00:32 -07:00
committed by GitHub
parent 1a90372037
commit 38e892e249
33 changed files with 763 additions and 391 deletions

2
Cargo.lock generated
View File

@ -1412,6 +1412,7 @@ version = "0.0.9"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-stream", "async-stream",
"base64 0.22.0",
"clap", "clap",
"comfy-table", "comfy-table",
"crossterm", "crossterm",
@ -1425,6 +1426,7 @@ dependencies = [
"prost-reflect", "prost-reflect",
"prost-types", "prost-types",
"ratatui", "ratatui",
"serde",
"serde_json", "serde_json",
"serde_yaml", "serde_yaml",
"termtree", "termtree",

View File

@ -28,6 +28,7 @@ async-compression = "0.4.8"
async-stream = "0.3.5" async-stream = "0.3.5"
async-trait = "0.1.80" async-trait = "0.1.80"
backhand = "0.15.0" backhand = "0.15.0"
base64 = "0.22.0"
byteorder = "1" byteorder = "1"
bytes = "1.5.0" bytes = "1.5.0"
cgroups-rs = "0.3.4" cgroups-rs = "0.3.4"

View File

@ -11,6 +11,7 @@ resolver = "2"
[dependencies] [dependencies]
anyhow = { workspace = true } anyhow = { workspace = true }
async-stream = { workspace = true } async-stream = { workspace = true }
base64 = { workspace = true }
clap = { workspace = true } clap = { workspace = true }
comfy-table = { workspace = true } comfy-table = { workspace = true }
crossterm = { workspace = true, features = ["event-stream"] } crossterm = { workspace = true, features = ["event-stream"] }
@ -24,6 +25,7 @@ log = { workspace = true }
prost-reflect = { workspace = true, features = ["serde"] } prost-reflect = { workspace = true, features = ["serde"] }
prost-types = { workspace = true } prost-types = { workspace = true }
ratatui = { workspace = true } ratatui = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true } serde_json = { workspace = true }
serde_yaml = { workspace = true } serde_yaml = { workspace = true }
termtree = { workspace = true } termtree = { workspace = true }

View File

@ -0,0 +1,22 @@
use anyhow::Result;
use clap::Parser;
use krata::v1::control::{control_service_client::ControlServiceClient, IdentifyHostRequest};
use tonic::{transport::Channel, Request};
#[derive(Parser)]
#[command(about = "Identify information about the host")]
pub struct IdentifyHostCommand {}
impl IdentifyHostCommand {
pub async fn run(self, mut client: ControlServiceClient<Channel>) -> Result<()> {
let response = client
.identify_host(Request::new(IdentifyHostRequest {}))
.await?
.into_inner();
println!("Host UUID: {}", response.host_uuid);
println!("Host Domain: {}", response.host_domid);
println!("Krata Version: {}", response.krata_version);
Ok(())
}
}

View File

@ -1,14 +1,18 @@
use anyhow::Result; use anyhow::Result;
use base64::Engine;
use clap::{Parser, ValueEnum}; use clap::{Parser, ValueEnum};
use krata::{ use krata::{
events::EventStream, events::EventStream,
idm::{internal, serialize::IdmSerializable, transport::IdmTransportPacketForm},
v1::control::{control_service_client::ControlServiceClient, SnoopIdmReply, SnoopIdmRequest}, v1::control::{control_service_client::ControlServiceClient, SnoopIdmReply, SnoopIdmRequest},
}; };
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use tonic::transport::Channel; use tonic::transport::Channel;
use crate::format::{kv2line, proto2dynamic, proto2kv}; use crate::format::{kv2line, proto2dynamic, value2kv};
#[derive(ValueEnum, Clone, Debug, PartialEq, Eq)] #[derive(ValueEnum, Clone, Debug, PartialEq, Eq)]
enum IdmSnoopFormat { enum IdmSnoopFormat {
@ -34,19 +38,22 @@ impl IdmSnoopCommand {
while let Some(reply) = stream.next().await { while let Some(reply) = stream.next().await {
let reply = reply?; let reply = reply?;
let Some(line) = convert_idm_snoop(reply) else {
continue;
};
match self.format { match self.format {
IdmSnoopFormat::Simple => { IdmSnoopFormat::Simple => {
self.print_simple(reply)?; self.print_simple(line)?;
} }
IdmSnoopFormat::Jsonl => { IdmSnoopFormat::Jsonl => {
let value = serde_json::to_value(proto2dynamic(reply)?)?; let encoded = serde_json::to_string(&line)?;
let encoded = serde_json::to_string(&value)?;
println!("{}", encoded.trim()); println!("{}", encoded.trim());
} }
IdmSnoopFormat::KeyValue => { IdmSnoopFormat::KeyValue => {
self.print_key_value(reply)?; self.print_key_value(line)?;
} }
} }
} }
@ -54,21 +61,86 @@ impl IdmSnoopCommand {
Ok(()) Ok(())
} }
fn print_simple(&self, reply: SnoopIdmReply) -> Result<()> { fn print_simple(&self, line: IdmSnoopLine) -> Result<()> {
let from = reply.from; let encoded = if !line.packet.decoded.is_null() {
let to = reply.to; serde_json::to_string(&line.packet.decoded)?
let Some(packet) = reply.packet else { } else {
return Ok(()); base64::prelude::BASE64_STANDARD.encode(&line.packet.data)
}; };
let value = serde_json::to_value(proto2dynamic(packet)?)?; println!(
let encoded = serde_json::to_string(&value)?; "({} -> {}) {} {} {}",
println!("({} -> {}) {}", from, to, encoded); line.from, line.to, line.packet.id, line.packet.form, encoded
);
Ok(()) Ok(())
} }
fn print_key_value(&self, reply: SnoopIdmReply) -> Result<()> { fn print_key_value(&self, line: IdmSnoopLine) -> Result<()> {
let kvs = proto2kv(reply)?; let kvs = value2kv(serde_json::to_value(line)?)?;
println!("{}", kv2line(kvs)); println!("{}", kv2line(kvs));
Ok(()) Ok(())
} }
} }
#[derive(Serialize, Deserialize)]
pub struct IdmSnoopLine {
pub from: String,
pub to: String,
pub packet: IdmSnoopData,
}
#[derive(Serialize, Deserialize)]
pub struct IdmSnoopData {
pub id: u64,
pub channel: u64,
pub form: String,
pub data: String,
pub decoded: Value,
}
pub fn convert_idm_snoop(reply: SnoopIdmReply) -> Option<IdmSnoopLine> {
let packet = &(reply.packet?);
let decoded = if packet.channel == 0 {
match packet.form() {
IdmTransportPacketForm::Event => internal::Event::decode(&packet.data)
.ok()
.and_then(|event| proto2dynamic(event).ok()),
IdmTransportPacketForm::Request => internal::Request::decode(&packet.data)
.ok()
.and_then(|event| proto2dynamic(event).ok()),
IdmTransportPacketForm::Response => internal::Response::decode(&packet.data)
.ok()
.and_then(|event| proto2dynamic(event).ok()),
_ => None,
}
} else {
None
};
let decoded = decoded
.and_then(|message| serde_json::to_value(message).ok())
.unwrap_or(Value::Null);
let data = IdmSnoopData {
id: packet.id,
channel: packet.channel,
form: match packet.form() {
IdmTransportPacketForm::Raw => "raw".to_string(),
IdmTransportPacketForm::Event => "event".to_string(),
IdmTransportPacketForm::Request => "request".to_string(),
IdmTransportPacketForm::Response => "response".to_string(),
_ => format!("unknown-{}", packet.form),
},
data: base64::prelude::BASE64_STANDARD.encode(&packet.data),
decoded,
};
Some(IdmSnoopLine {
from: reply.from,
to: reply.to,
packet: data,
})
}

View File

@ -1,5 +1,6 @@
pub mod attach; pub mod attach;
pub mod destroy; pub mod destroy;
pub mod identify_host;
pub mod idm_snoop; pub mod idm_snoop;
pub mod launch; pub mod launch;
pub mod list; pub mod list;
@ -20,9 +21,10 @@ use krata::{
use tonic::{transport::Channel, Request}; use tonic::{transport::Channel, Request};
use self::{ use self::{
attach::AttachCommand, destroy::DestroyCommand, idm_snoop::IdmSnoopCommand, attach::AttachCommand, destroy::DestroyCommand, identify_host::IdentifyHostCommand,
launch::LauchCommand, list::ListCommand, logs::LogsCommand, metrics::MetricsCommand, idm_snoop::IdmSnoopCommand, launch::LauchCommand, list::ListCommand, logs::LogsCommand,
pull::PullCommand, resolve::ResolveCommand, top::TopCommand, watch::WatchCommand, metrics::MetricsCommand, pull::PullCommand, resolve::ResolveCommand, top::TopCommand,
watch::WatchCommand,
}; };
#[derive(Parser)] #[derive(Parser)]
@ -56,6 +58,7 @@ pub enum Commands {
Metrics(MetricsCommand), Metrics(MetricsCommand),
IdmSnoop(IdmSnoopCommand), IdmSnoop(IdmSnoopCommand),
Top(TopCommand), Top(TopCommand),
IdentifyHost(IdentifyHostCommand),
} }
impl ControlCommand { impl ControlCommand {
@ -107,6 +110,10 @@ impl ControlCommand {
Commands::Pull(pull) => { Commands::Pull(pull) => {
pull.run(client).await?; pull.run(client).await?;
} }
Commands::IdentifyHost(identify) => {
identify.run(client).await?;
}
} }
Ok(()) Ok(())
} }

View File

@ -4,7 +4,7 @@ use anyhow::Result;
use fancy_duration::FancyDuration; use fancy_duration::FancyDuration;
use human_bytes::human_bytes; use human_bytes::human_bytes;
use krata::v1::common::{Guest, GuestMetricFormat, GuestMetricNode, GuestStatus}; use krata::v1::common::{Guest, GuestMetricFormat, GuestMetricNode, GuestStatus};
use prost_reflect::{DynamicMessage, FieldDescriptor, ReflectMessage, Value as ReflectValue}; use prost_reflect::{DynamicMessage, ReflectMessage};
use prost_types::Value; use prost_types::Value;
use termtree::Tree; use termtree::Tree;
@ -15,64 +15,59 @@ pub fn proto2dynamic(proto: impl ReflectMessage) -> Result<DynamicMessage> {
)?) )?)
} }
pub fn proto2kv(proto: impl ReflectMessage) -> Result<HashMap<String, String>> { pub fn value2kv(value: serde_json::Value) -> Result<HashMap<String, String>> {
let message = proto2dynamic(proto)?;
let mut map = HashMap::new(); let mut map = HashMap::new();
fn crawl(prefix: String, map: &mut HashMap<String, String>, value: serde_json::Value) {
fn dot(prefix: &str, next: String) -> String {
if prefix.is_empty() {
next.to_string()
} else {
format!("{}.{}", prefix, next)
}
}
fn crawl(
prefix: String,
field: Option<&FieldDescriptor>,
map: &mut HashMap<String, String>,
value: &ReflectValue,
) {
match value { match value {
ReflectValue::Message(child) => { serde_json::Value::Null => {
for (field, field_value) in child.fields() { map.insert(prefix, "null".to_string());
let path = if prefix.is_empty() { }
field.json_name().to_string()
} else { serde_json::Value::String(value) => {
format!("{}.{}", prefix, field.json_name()) map.insert(prefix, value);
}; }
crawl(path, Some(&field), map, field_value);
serde_json::Value::Bool(value) => {
map.insert(prefix, value.to_string());
}
serde_json::Value::Number(value) => {
map.insert(prefix, value.to_string());
}
serde_json::Value::Array(value) => {
for (i, item) in value.into_iter().enumerate() {
let next = dot(&prefix, i.to_string());
crawl(next, map, item);
} }
} }
ReflectValue::EnumNumber(number) => { serde_json::Value::Object(value) => {
if let Some(kind) = field.map(|x| x.kind()) { for (key, item) in value {
if let Some(e) = kind.as_enum() { let next = dot(&prefix, key);
if let Some(value) = e.get_value(*number) { crawl(next, map, item);
map.insert(prefix, value.name().to_string());
}
}
} }
} }
ReflectValue::String(value) => {
map.insert(prefix.to_string(), value.clone());
}
ReflectValue::List(value) => {
for (x, value) in value.iter().enumerate() {
crawl(format!("{}.{}", prefix, x), field, map, value);
}
}
_ => {
map.insert(prefix.to_string(), value.to_string());
}
} }
} }
crawl("".to_string(), &mut map, value);
crawl(
"".to_string(),
None,
&mut map,
&ReflectValue::Message(message),
);
Ok(map) Ok(map)
} }
pub fn proto2kv(proto: impl ReflectMessage) -> Result<HashMap<String, String>> {
let message = proto2dynamic(proto)?;
let value = serde_json::to_value(message)?;
value2kv(value)
}
pub fn kv2line(map: HashMap<String, String>) -> String { pub fn kv2line(map: HashMap<String, String>) -> String {
map.iter() map.iter()
.map(|(k, v)| format!("{}=\"{}\"", k, v.replace('"', "\\\""))) .map(|(k, v)| format!("{}=\"{}\"", k, v.replace('"', "\\\"")))

View File

@ -1,21 +1,9 @@
use anyhow::Result; use anyhow::Result;
use clap::Parser; use clap::Parser;
use env_logger::Env; use env_logger::Env;
use krata::dial::ControlDialAddress; use kratad::command::DaemonCommand;
use kratad::Daemon;
use log::LevelFilter; use log::LevelFilter;
use std::{ use std::sync::{atomic::AtomicBool, Arc};
str::FromStr,
sync::{atomic::AtomicBool, Arc},
};
#[derive(Parser)]
struct DaemonCommand {
#[arg(short, long, default_value = "unix:///var/lib/krata/daemon.socket")]
listen: String,
#[arg(short, long, default_value = "/var/lib/krata")]
store: String,
}
#[tokio::main(flavor = "multi_thread", worker_threads = 10)] #[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() -> Result<()> { async fn main() -> Result<()> {
@ -24,12 +12,8 @@ async fn main() -> Result<()> {
.init(); .init();
mask_sighup()?; mask_sighup()?;
let args = DaemonCommand::parse(); let command = DaemonCommand::parse();
let addr = ControlDialAddress::from_str(&args.listen)?; command.run().await
let mut daemon = Daemon::new(args.store.clone()).await?;
daemon.listen(addr).await?;
Ok(())
} }
fn mask_sighup() -> Result<()> { fn mask_sighup() -> Result<()> {

View File

@ -0,0 +1,36 @@
use anyhow::Result;
use clap::{CommandFactory, Parser};
use krata::dial::ControlDialAddress;
use std::str::FromStr;
use crate::Daemon;
#[derive(Parser)]
#[command(version, about = "Krata hypervisor daemon")]
pub struct DaemonCommand {
#[arg(
short,
long,
default_value = "unix:///var/lib/krata/daemon.socket",
help = "Listen address"
)]
listen: String,
#[arg(short, long, default_value = "/var/lib/krata", help = "Storage path")]
store: String,
}
impl DaemonCommand {
pub async fn run(self) -> Result<()> {
let addr = ControlDialAddress::from_str(&self.listen)?;
let mut daemon = Daemon::new(self.store.clone()).await?;
daemon.listen(addr).await?;
Ok(())
}
pub fn version() -> String {
DaemonCommand::command()
.get_version()
.unwrap_or("unknown")
.to_string()
}
}

View File

@ -1,6 +1,6 @@
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
use anyhow::Result; use anyhow::{anyhow, Result};
use circular_buffer::CircularBuffer; use circular_buffer::CircularBuffer;
use kratart::channel::ChannelService; use kratart::channel::ChannelService;
use log::error; use log::error;
@ -11,6 +11,9 @@ use tokio::{
}, },
task::JoinHandle, task::JoinHandle,
}; };
use uuid::Uuid;
use crate::glt::GuestLookupTable;
const CONSOLE_BUFFER_SIZE: usize = 1024 * 1024; const CONSOLE_BUFFER_SIZE: usize = 1024 * 1024;
type RawConsoleBuffer = CircularBuffer<CONSOLE_BUFFER_SIZE, u8>; type RawConsoleBuffer = CircularBuffer<CONSOLE_BUFFER_SIZE, u8>;
@ -21,6 +24,7 @@ type BufferMap = Arc<Mutex<HashMap<u32, ConsoleBuffer>>>;
#[derive(Clone)] #[derive(Clone)]
pub struct DaemonConsoleHandle { pub struct DaemonConsoleHandle {
glt: GuestLookupTable,
listeners: ListenerMap, listeners: ListenerMap,
buffers: BufferMap, buffers: BufferMap,
sender: Sender<(u32, Vec<u8>)>, sender: Sender<(u32, Vec<u8>)>,
@ -50,9 +54,12 @@ impl DaemonConsoleAttachHandle {
impl DaemonConsoleHandle { impl DaemonConsoleHandle {
pub async fn attach( pub async fn attach(
&self, &self,
domid: u32, uuid: Uuid,
sender: Sender<Vec<u8>>, sender: Sender<Vec<u8>>,
) -> Result<DaemonConsoleAttachHandle> { ) -> Result<DaemonConsoleAttachHandle> {
let Some(domid) = self.glt.lookup_domid_by_uuid(&uuid).await else {
return Err(anyhow!("unable to find domain {}", uuid));
};
let buffers = self.buffers.lock().await; let buffers = self.buffers.lock().await;
let buffer = buffers.get(&domid).map(|x| x.to_vec()).unwrap_or_default(); let buffer = buffers.get(&domid).map(|x| x.to_vec()).unwrap_or_default();
drop(buffers); drop(buffers);
@ -77,6 +84,7 @@ impl Drop for DaemonConsoleHandle {
} }
pub struct DaemonConsole { pub struct DaemonConsole {
glt: GuestLookupTable,
listeners: ListenerMap, listeners: ListenerMap,
buffers: BufferMap, buffers: BufferMap,
receiver: Receiver<(u32, Option<Vec<u8>>)>, receiver: Receiver<(u32, Option<Vec<u8>>)>,
@ -85,13 +93,14 @@ pub struct DaemonConsole {
} }
impl DaemonConsole { impl DaemonConsole {
pub async fn new() -> Result<DaemonConsole> { pub async fn new(glt: GuestLookupTable) -> Result<DaemonConsole> {
let (service, sender, receiver) = let (service, sender, receiver) =
ChannelService::new("krata-console".to_string(), Some(0)).await?; ChannelService::new("krata-console".to_string(), Some(0)).await?;
let task = service.launch().await?; let task = service.launch().await?;
let listeners = Arc::new(Mutex::new(HashMap::new())); let listeners = Arc::new(Mutex::new(HashMap::new()));
let buffers = Arc::new(Mutex::new(HashMap::new())); let buffers = Arc::new(Mutex::new(HashMap::new()));
Ok(DaemonConsole { Ok(DaemonConsole {
glt,
listeners, listeners,
buffers, buffers,
receiver, receiver,
@ -101,6 +110,7 @@ impl DaemonConsole {
} }
pub async fn launch(mut self) -> Result<DaemonConsoleHandle> { pub async fn launch(mut self) -> Result<DaemonConsoleHandle> {
let glt = self.glt.clone();
let listeners = self.listeners.clone(); let listeners = self.listeners.clone();
let buffers = self.buffers.clone(); let buffers = self.buffers.clone();
let sender = self.sender.clone(); let sender = self.sender.clone();
@ -110,6 +120,7 @@ impl DaemonConsole {
} }
}); });
Ok(DaemonConsoleHandle { Ok(DaemonConsoleHandle {
glt,
listeners, listeners,
buffers, buffers,
sender, sender,

View File

@ -1,18 +1,19 @@
use async_stream::try_stream; use async_stream::try_stream;
use futures::Stream; use futures::Stream;
use krata::{ use krata::{
idm::protocol::{ idm::internal::{
idm_request::Request as IdmRequestType, idm_response::Response as IdmResponseType, request::Request as IdmRequestType, response::Response as IdmResponseType, MetricsRequest,
IdmMetricsRequest, Request as IdmRequest,
}, },
v1::{ v1::{
common::{Guest, GuestState, GuestStatus, OciImageFormat}, common::{Guest, GuestState, GuestStatus, OciImageFormat},
control::{ control::{
control_service_server::ControlService, ConsoleDataReply, ConsoleDataRequest, control_service_server::ControlService, ConsoleDataReply, ConsoleDataRequest,
CreateGuestReply, CreateGuestRequest, DestroyGuestReply, DestroyGuestRequest, CreateGuestReply, CreateGuestRequest, DestroyGuestReply, DestroyGuestRequest,
ListGuestsReply, ListGuestsRequest, PullImageReply, PullImageRequest, IdentifyHostReply, IdentifyHostRequest, ListGuestsReply, ListGuestsRequest,
ReadGuestMetricsReply, ReadGuestMetricsRequest, ResolveGuestReply, ResolveGuestRequest, PullImageReply, PullImageRequest, ReadGuestMetricsReply, ReadGuestMetricsRequest,
SnoopIdmReply, SnoopIdmRequest, WatchEventsReply, WatchEventsRequest, ResolveGuestReply, ResolveGuestRequest, SnoopIdmReply, SnoopIdmRequest,
WatchEventsReply, WatchEventsRequest,
}, },
}, },
}; };
@ -32,7 +33,8 @@ use tonic::{Request, Response, Status, Streaming};
use uuid::Uuid; use uuid::Uuid;
use crate::{ use crate::{
console::DaemonConsoleHandle, db::GuestStore, event::DaemonEventContext, idm::DaemonIdmHandle, command::DaemonCommand, console::DaemonConsoleHandle, db::GuestStore,
event::DaemonEventContext, glt::GuestLookupTable, idm::DaemonIdmHandle,
metrics::idm_metric_to_api, oci::convert_oci_progress, metrics::idm_metric_to_api, oci::convert_oci_progress,
}; };
@ -56,6 +58,7 @@ impl From<ApiError> for Status {
#[derive(Clone)] #[derive(Clone)]
pub struct DaemonControlService { pub struct DaemonControlService {
glt: GuestLookupTable,
events: DaemonEventContext, events: DaemonEventContext,
console: DaemonConsoleHandle, console: DaemonConsoleHandle,
idm: DaemonIdmHandle, idm: DaemonIdmHandle,
@ -66,6 +69,7 @@ pub struct DaemonControlService {
impl DaemonControlService { impl DaemonControlService {
pub fn new( pub fn new(
glt: GuestLookupTable,
events: DaemonEventContext, events: DaemonEventContext,
console: DaemonConsoleHandle, console: DaemonConsoleHandle,
idm: DaemonIdmHandle, idm: DaemonIdmHandle,
@ -74,6 +78,7 @@ impl DaemonControlService {
packer: OciPackerService, packer: OciPackerService,
) -> Self { ) -> Self {
Self { Self {
glt,
events, events,
console, console,
idm, idm,
@ -108,6 +113,18 @@ impl ControlService for DaemonControlService {
type SnoopIdmStream = type SnoopIdmStream =
Pin<Box<dyn Stream<Item = Result<SnoopIdmReply, Status>> + Send + 'static>>; Pin<Box<dyn Stream<Item = Result<SnoopIdmReply, Status>> + Send + 'static>>;
async fn identify_host(
&self,
request: Request<IdentifyHostRequest>,
) -> Result<Response<IdentifyHostReply>, Status> {
let _ = request.into_inner();
Ok(Response::new(IdentifyHostReply {
host_domid: self.glt.host_domid(),
host_uuid: self.glt.host_uuid().to_string(),
krata_version: DaemonCommand::version(),
}))
}
async fn create_guest( async fn create_guest(
&self, &self,
request: Request<CreateGuestRequest>, request: Request<CreateGuestRequest>,
@ -130,6 +147,7 @@ impl ControlService for DaemonControlService {
network: None, network: None,
exit_info: None, exit_info: None,
error_info: None, error_info: None,
host: self.glt.host_uuid().to_string(),
domid: u32::MAX, domid: u32::MAX,
}), }),
spec: Some(spec), spec: Some(spec),
@ -230,36 +248,10 @@ impl ControlService for DaemonControlService {
let uuid = Uuid::from_str(&request.guest_id).map_err(|error| ApiError { let uuid = Uuid::from_str(&request.guest_id).map_err(|error| ApiError {
message: error.to_string(), message: error.to_string(),
})?; })?;
let guest = self
.guests
.read(uuid)
.await
.map_err(|error| ApiError {
message: error.to_string(),
})?
.ok_or_else(|| ApiError {
message: "guest did not exist in the database".to_string(),
})?;
let Some(ref state) = guest.state else {
return Err(ApiError {
message: "guest did not have state".to_string(),
}
.into());
};
let domid = state.domid;
if domid == 0 {
return Err(ApiError {
message: "invalid domid on the guest".to_string(),
}
.into());
}
let (sender, mut receiver) = channel(100); let (sender, mut receiver) = channel(100);
let console = self let console = self
.console .console
.attach(domid, sender) .attach(uuid, sender)
.await .await
.map_err(|error| ApiError { .map_err(|error| ApiError {
message: format!("failed to attach to console: {}", error), message: format!("failed to attach to console: {}", error),
@ -309,45 +301,21 @@ impl ControlService for DaemonControlService {
let uuid = Uuid::from_str(&request.guest_id).map_err(|error| ApiError { let uuid = Uuid::from_str(&request.guest_id).map_err(|error| ApiError {
message: error.to_string(), message: error.to_string(),
})?; })?;
let guest = self let client = self.idm.client(uuid).await.map_err(|error| ApiError {
.guests
.read(uuid)
.await
.map_err(|error| ApiError {
message: error.to_string(),
})?
.ok_or_else(|| ApiError {
message: "guest did not exist in the database".to_string(),
})?;
let Some(ref state) = guest.state else {
return Err(ApiError {
message: "guest did not have state".to_string(),
}
.into());
};
let domid = state.domid;
if domid == 0 {
return Err(ApiError {
message: "invalid domid on the guest".to_string(),
}
.into());
}
let client = self.idm.client(domid).await.map_err(|error| ApiError {
message: error.to_string(), message: error.to_string(),
})?; })?;
let response = client let response = client
.send(IdmRequestType::Metrics(IdmMetricsRequest {})) .send(IdmRequest {
request: Some(IdmRequestType::Metrics(MetricsRequest {})),
})
.await .await
.map_err(|error| ApiError { .map_err(|error| ApiError {
message: error.to_string(), message: error.to_string(),
})?; })?;
let mut reply = ReadGuestMetricsReply::default(); let mut reply = ReadGuestMetricsReply::default();
if let IdmResponseType::Metrics(metrics) = response { if let Some(IdmResponseType::Metrics(metrics)) = response.response {
reply.root = metrics.root.map(idm_metric_to_api); reply.root = metrics.root.map(idm_metric_to_api);
} }
Ok(Response::new(reply)) Ok(Response::new(reply))
@ -446,9 +414,16 @@ impl ControlService for DaemonControlService {
) -> Result<Response<Self::SnoopIdmStream>, Status> { ) -> Result<Response<Self::SnoopIdmStream>, Status> {
let _ = request.into_inner(); let _ = request.into_inner();
let mut messages = self.idm.snoop(); let mut messages = self.idm.snoop();
let glt = self.glt.clone();
let output = try_stream! { let output = try_stream! {
while let Ok(event) = messages.recv().await { while let Ok(event) = messages.recv().await {
yield SnoopIdmReply { from: event.from, to: event.to, packet: Some(event.packet) }; let Some(from_uuid) = glt.lookup_uuid_by_domid(event.from).await else {
continue;
};
let Some(to_uuid) = glt.lookup_uuid_by_domid(event.to).await else {
continue;
};
yield SnoopIdmReply { from: from_uuid.to_string(), to: to_uuid.to_string(), packet: Some(event.packet) };
} }
}; };
Ok(Response::new(Box::pin(output) as Self::SnoopIdmStream)) Ok(Response::new(Box::pin(output) as Self::SnoopIdmStream))

View File

@ -6,7 +6,7 @@ use std::{
use anyhow::Result; use anyhow::Result;
use krata::{ use krata::{
idm::protocol::{idm_event::Event, IdmEvent}, idm::{internal::event::Event as EventType, internal::Event},
v1::common::{GuestExitInfo, GuestState, GuestStatus}, v1::common::{GuestExitInfo, GuestState, GuestStatus},
}; };
use log::{error, warn}; use log::{error, warn};
@ -50,8 +50,8 @@ pub struct DaemonEventGenerator {
feed: broadcast::Receiver<DaemonEvent>, feed: broadcast::Receiver<DaemonEvent>,
idm: DaemonIdmHandle, idm: DaemonIdmHandle,
idms: HashMap<u32, (Uuid, JoinHandle<()>)>, idms: HashMap<u32, (Uuid, JoinHandle<()>)>,
idm_sender: Sender<(u32, IdmEvent)>, idm_sender: Sender<(u32, Event)>,
idm_receiver: Receiver<(u32, IdmEvent)>, idm_receiver: Receiver<(u32, Event)>,
_event_sender: broadcast::Sender<DaemonEvent>, _event_sender: broadcast::Sender<DaemonEvent>,
} }
@ -93,7 +93,7 @@ impl DaemonEventGenerator {
match status { match status {
GuestStatus::Started => { GuestStatus::Started => {
if let Entry::Vacant(e) = self.idms.entry(domid) { if let Entry::Vacant(e) = self.idms.entry(domid) {
let client = self.idm.client(domid).await?; let client = self.idm.client_by_domid(domid).await?;
let mut receiver = client.subscribe().await?; let mut receiver = client.subscribe().await?;
let sender = self.idm_sender.clone(); let sender = self.idm_sender.clone();
let task = tokio::task::spawn(async move { let task = tokio::task::spawn(async move {
@ -122,9 +122,9 @@ impl DaemonEventGenerator {
Ok(()) Ok(())
} }
async fn handle_idm_event(&mut self, id: Uuid, event: IdmEvent) -> Result<()> { async fn handle_idm_event(&mut self, id: Uuid, event: Event) -> Result<()> {
match event.event { match event.event {
Some(Event::Exit(exit)) => self.handle_exit_code(id, exit.code).await, Some(EventType::Exit(exit)) => self.handle_exit_code(id, exit.code).await,
None => Ok(()), None => Ok(()),
} }
} }
@ -136,6 +136,7 @@ impl DaemonEventGenerator {
network: guest.state.clone().unwrap_or_default().network, network: guest.state.clone().unwrap_or_default().network,
exit_info: Some(GuestExitInfo { code }), exit_info: Some(GuestExitInfo { code }),
error_info: None, error_info: None,
host: guest.state.clone().map(|x| x.host).unwrap_or_default(),
domid: guest.state.clone().map(|x| x.domid).unwrap_or(u32::MAX), domid: guest.state.clone().map(|x| x.domid).unwrap_or(u32::MAX),
}); });

69
crates/daemon/src/glt.rs Normal file
View File

@ -0,0 +1,69 @@
use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;
use uuid::Uuid;
struct GuestLookupTableState {
domid_to_uuid: HashMap<u32, Uuid>,
uuid_to_domid: HashMap<Uuid, u32>,
}
impl GuestLookupTableState {
pub fn new(host_uuid: Uuid) -> Self {
let mut domid_to_uuid = HashMap::new();
let mut uuid_to_domid = HashMap::new();
domid_to_uuid.insert(0, host_uuid);
uuid_to_domid.insert(host_uuid, 0);
GuestLookupTableState {
domid_to_uuid,
uuid_to_domid,
}
}
}
#[derive(Clone)]
pub struct GuestLookupTable {
host_domid: u32,
host_uuid: Uuid,
state: Arc<RwLock<GuestLookupTableState>>,
}
impl GuestLookupTable {
pub fn new(host_domid: u32, host_uuid: Uuid) -> Self {
GuestLookupTable {
host_domid,
host_uuid,
state: Arc::new(RwLock::new(GuestLookupTableState::new(host_uuid))),
}
}
pub fn host_uuid(&self) -> Uuid {
self.host_uuid
}
pub fn host_domid(&self) -> u32 {
self.host_domid
}
pub async fn lookup_uuid_by_domid(&self, domid: u32) -> Option<Uuid> {
let state = self.state.read().await;
state.domid_to_uuid.get(&domid).cloned()
}
pub async fn lookup_domid_by_uuid(&self, uuid: &Uuid) -> Option<u32> {
let state = self.state.read().await;
state.uuid_to_domid.get(uuid).cloned()
}
pub async fn associate(&self, uuid: Uuid, domid: u32) {
let mut state = self.state.write().await;
state.uuid_to_domid.insert(uuid, domid);
state.domid_to_uuid.insert(domid, uuid);
}
pub async fn remove(&self, uuid: Uuid, domid: u32) {
let mut state = self.state.write().await;
state.uuid_to_domid.remove(&uuid);
state.domid_to_uuid.remove(&domid);
}
}

View File

@ -6,8 +6,9 @@ use std::{
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use bytes::{Buf, BytesMut}; use bytes::{Buf, BytesMut};
use krata::idm::{ use krata::idm::{
client::{IdmBackend, IdmClient}, client::{IdmBackend, IdmInternalClient},
protocol::IdmPacket, internal::INTERNAL_IDM_CHANNEL,
transport::IdmTransportPacket,
}; };
use kratart::channel::ChannelService; use kratart::channel::ChannelService;
use log::{error, warn}; use log::{error, warn};
@ -21,15 +22,19 @@ use tokio::{
}, },
task::JoinHandle, task::JoinHandle,
}; };
use uuid::Uuid;
type BackendFeedMap = Arc<Mutex<HashMap<u32, Sender<IdmPacket>>>>; use crate::glt::GuestLookupTable;
type ClientMap = Arc<Mutex<HashMap<u32, IdmClient>>>;
type BackendFeedMap = Arc<Mutex<HashMap<u32, Sender<IdmTransportPacket>>>>;
type ClientMap = Arc<Mutex<HashMap<u32, IdmInternalClient>>>;
#[derive(Clone)] #[derive(Clone)]
pub struct DaemonIdmHandle { pub struct DaemonIdmHandle {
glt: GuestLookupTable,
clients: ClientMap, clients: ClientMap,
feeds: BackendFeedMap, feeds: BackendFeedMap,
tx_sender: Sender<(u32, IdmPacket)>, tx_sender: Sender<(u32, IdmTransportPacket)>,
task: Arc<JoinHandle<()>>, task: Arc<JoinHandle<()>>,
snoop_sender: broadcast::Sender<DaemonIdmSnoopPacket>, snoop_sender: broadcast::Sender<DaemonIdmSnoopPacket>,
} }
@ -39,7 +44,14 @@ impl DaemonIdmHandle {
self.snoop_sender.subscribe() self.snoop_sender.subscribe()
} }
pub async fn client(&self, domid: u32) -> Result<IdmClient> { pub async fn client(&self, uuid: Uuid) -> Result<IdmInternalClient> {
let Some(domid) = self.glt.lookup_domid_by_uuid(&uuid).await else {
return Err(anyhow!("unable to find domain {}", uuid));
};
self.client_by_domid(domid).await
}
pub async fn client_by_domid(&self, domid: u32) -> Result<IdmInternalClient> {
client_or_create(domid, &self.tx_sender, &self.clients, &self.feeds).await client_or_create(domid, &self.tx_sender, &self.clients, &self.feeds).await
} }
} }
@ -56,22 +68,23 @@ impl Drop for DaemonIdmHandle {
pub struct DaemonIdmSnoopPacket { pub struct DaemonIdmSnoopPacket {
pub from: u32, pub from: u32,
pub to: u32, pub to: u32,
pub packet: IdmPacket, pub packet: IdmTransportPacket,
} }
pub struct DaemonIdm { pub struct DaemonIdm {
glt: GuestLookupTable,
clients: ClientMap, clients: ClientMap,
feeds: BackendFeedMap, feeds: BackendFeedMap,
tx_sender: Sender<(u32, IdmPacket)>, tx_sender: Sender<(u32, IdmTransportPacket)>,
tx_raw_sender: Sender<(u32, Vec<u8>)>, tx_raw_sender: Sender<(u32, Vec<u8>)>,
tx_receiver: Receiver<(u32, IdmPacket)>, tx_receiver: Receiver<(u32, IdmTransportPacket)>,
rx_receiver: Receiver<(u32, Option<Vec<u8>>)>, rx_receiver: Receiver<(u32, Option<Vec<u8>>)>,
snoop_sender: broadcast::Sender<DaemonIdmSnoopPacket>, snoop_sender: broadcast::Sender<DaemonIdmSnoopPacket>,
task: JoinHandle<()>, task: JoinHandle<()>,
} }
impl DaemonIdm { impl DaemonIdm {
pub async fn new() -> Result<DaemonIdm> { pub async fn new(glt: GuestLookupTable) -> Result<DaemonIdm> {
let (service, tx_raw_sender, rx_receiver) = let (service, tx_raw_sender, rx_receiver) =
ChannelService::new("krata-channel".to_string(), None).await?; ChannelService::new("krata-channel".to_string(), None).await?;
let (tx_sender, tx_receiver) = channel(100); let (tx_sender, tx_receiver) = channel(100);
@ -80,6 +93,7 @@ impl DaemonIdm {
let clients = Arc::new(Mutex::new(HashMap::new())); let clients = Arc::new(Mutex::new(HashMap::new()));
let feeds = Arc::new(Mutex::new(HashMap::new())); let feeds = Arc::new(Mutex::new(HashMap::new()));
Ok(DaemonIdm { Ok(DaemonIdm {
glt,
rx_receiver, rx_receiver,
tx_receiver, tx_receiver,
tx_sender, tx_sender,
@ -92,6 +106,7 @@ impl DaemonIdm {
} }
pub async fn launch(mut self) -> Result<DaemonIdmHandle> { pub async fn launch(mut self) -> Result<DaemonIdmHandle> {
let glt = self.glt.clone();
let clients = self.clients.clone(); let clients = self.clients.clone();
let feeds = self.feeds.clone(); let feeds = self.feeds.clone();
let tx_sender = self.tx_sender.clone(); let tx_sender = self.tx_sender.clone();
@ -104,6 +119,7 @@ impl DaemonIdm {
} }
}); });
Ok(DaemonIdmHandle { Ok(DaemonIdmHandle {
glt,
clients, clients,
feeds, feeds,
tx_sender, tx_sender,
@ -136,7 +152,7 @@ impl DaemonIdm {
} }
let mut packet = buffer.split_to(needed); let mut packet = buffer.split_to(needed);
packet.advance(6); packet.advance(6);
match IdmPacket::decode(packet) { match IdmTransportPacket::decode(packet) {
Ok(packet) => { Ok(packet) => {
let _ = client_or_create(domid, &self.tx_sender, &self.clients, &self.feeds).await?; let _ = client_or_create(domid, &self.tx_sender, &self.clients, &self.feeds).await?;
let guard = self.feeds.lock().await; let guard = self.feeds.lock().await;
@ -196,10 +212,10 @@ impl Drop for DaemonIdm {
async fn client_or_create( async fn client_or_create(
domid: u32, domid: u32,
tx_sender: &Sender<(u32, IdmPacket)>, tx_sender: &Sender<(u32, IdmTransportPacket)>,
clients: &ClientMap, clients: &ClientMap,
feeds: &BackendFeedMap, feeds: &BackendFeedMap,
) -> Result<IdmClient> { ) -> Result<IdmInternalClient> {
let mut clients = clients.lock().await; let mut clients = clients.lock().await;
let mut feeds = feeds.lock().await; let mut feeds = feeds.lock().await;
match clients.entry(domid) { match clients.entry(domid) {
@ -212,7 +228,11 @@ async fn client_or_create(
rx_receiver, rx_receiver,
tx_sender: tx_sender.clone(), tx_sender: tx_sender.clone(),
}; };
let client = IdmClient::new(Box::new(backend) as Box<dyn IdmBackend>).await?; let client = IdmInternalClient::new(
INTERNAL_IDM_CHANNEL,
Box::new(backend) as Box<dyn IdmBackend>,
)
.await?;
entry.insert(client.clone()); entry.insert(client.clone());
Ok(client) Ok(client)
} }
@ -221,13 +241,13 @@ async fn client_or_create(
pub struct IdmDaemonBackend { pub struct IdmDaemonBackend {
domid: u32, domid: u32,
rx_receiver: Receiver<IdmPacket>, rx_receiver: Receiver<IdmTransportPacket>,
tx_sender: Sender<(u32, IdmPacket)>, tx_sender: Sender<(u32, IdmTransportPacket)>,
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl IdmBackend for IdmDaemonBackend { impl IdmBackend for IdmDaemonBackend {
async fn recv(&mut self) -> Result<IdmPacket> { async fn recv(&mut self) -> Result<IdmTransportPacket> {
if let Some(packet) = self.rx_receiver.recv().await { if let Some(packet) = self.rx_receiver.recv().await {
Ok(packet) Ok(packet)
} else { } else {
@ -235,7 +255,7 @@ impl IdmBackend for IdmDaemonBackend {
} }
} }
async fn send(&mut self, packet: IdmPacket) -> Result<()> { async fn send(&mut self, packet: IdmTransportPacket) -> Result<()> {
self.tx_sender.send((self.domid, packet)).await?; self.tx_sender.send((self.domid, packet)).await?;
Ok(()) Ok(())
} }

View File

@ -5,6 +5,7 @@ use console::{DaemonConsole, DaemonConsoleHandle};
use control::DaemonControlService; use control::DaemonControlService;
use db::GuestStore; use db::GuestStore;
use event::{DaemonEventContext, DaemonEventGenerator}; use event::{DaemonEventContext, DaemonEventGenerator};
use glt::GuestLookupTable;
use idm::{DaemonIdm, DaemonIdmHandle}; use idm::{DaemonIdm, DaemonIdmHandle};
use krata::{dial::ControlDialAddress, v1::control::control_service_server::ControlServiceServer}; use krata::{dial::ControlDialAddress, v1::control::control_service_server::ControlServiceServer};
use krataoci::{packer::service::OciPackerService, registry::OciPlatform}; use krataoci::{packer::service::OciPackerService, registry::OciPlatform};
@ -21,10 +22,12 @@ use tokio_stream::wrappers::UnixListenerStream;
use tonic::transport::{Identity, Server, ServerTlsConfig}; use tonic::transport::{Identity, Server, ServerTlsConfig};
use uuid::Uuid; use uuid::Uuid;
pub mod command;
pub mod console; pub mod console;
pub mod control; pub mod control;
pub mod db; pub mod db;
pub mod event; pub mod event;
pub mod glt;
pub mod idm; pub mod idm;
pub mod metrics; pub mod metrics;
pub mod oci; pub mod oci;
@ -32,6 +35,7 @@ pub mod reconcile;
pub struct Daemon { pub struct Daemon {
store: String, store: String,
glt: GuestLookupTable,
guests: GuestStore, guests: GuestStore,
events: DaemonEventContext, events: DaemonEventContext,
guest_reconciler_task: JoinHandle<()>, guest_reconciler_task: JoinHandle<()>,
@ -51,22 +55,43 @@ impl Daemon {
image_cache_dir.push("image"); image_cache_dir.push("image");
fs::create_dir_all(&image_cache_dir).await?; fs::create_dir_all(&image_cache_dir).await?;
let mut host_uuid_path = PathBuf::from(store.clone());
host_uuid_path.push("host.uuid");
let host_uuid = if host_uuid_path.is_file() {
let content = fs::read_to_string(&host_uuid_path).await?;
Uuid::from_str(content.trim()).ok()
} else {
None
};
let host_uuid = if let Some(host_uuid) = host_uuid {
host_uuid
} else {
let generated = Uuid::new_v4();
let mut string = generated.to_string();
string.push('\n');
fs::write(&host_uuid_path, string).await?;
generated
};
let packer = OciPackerService::new(None, &image_cache_dir, OciPlatform::current()).await?; let packer = OciPackerService::new(None, &image_cache_dir, OciPlatform::current()).await?;
let runtime = Runtime::new(store.clone()).await?; let runtime = Runtime::new(store.clone()).await?;
let glt = GuestLookupTable::new(0, host_uuid);
let guests_db_path = format!("{}/guests.db", store); let guests_db_path = format!("{}/guests.db", store);
let guests = GuestStore::open(&PathBuf::from(guests_db_path))?; let guests = GuestStore::open(&PathBuf::from(guests_db_path))?;
let (guest_reconciler_notify, guest_reconciler_receiver) = let (guest_reconciler_notify, guest_reconciler_receiver) =
channel::<Uuid>(GUEST_RECONCILER_QUEUE_LEN); channel::<Uuid>(GUEST_RECONCILER_QUEUE_LEN);
let idm = DaemonIdm::new().await?; let idm = DaemonIdm::new(glt.clone()).await?;
let idm = idm.launch().await?; let idm = idm.launch().await?;
let console = DaemonConsole::new().await?; let console = DaemonConsole::new(glt.clone()).await?;
let console = console.launch().await?; let console = console.launch().await?;
let (events, generator) = let (events, generator) =
DaemonEventGenerator::new(guests.clone(), guest_reconciler_notify.clone(), idm.clone()) DaemonEventGenerator::new(guests.clone(), guest_reconciler_notify.clone(), idm.clone())
.await?; .await?;
let runtime_for_reconciler = runtime.dupe().await?; let runtime_for_reconciler = runtime.dupe().await?;
let guest_reconciler = GuestReconciler::new( let guest_reconciler = GuestReconciler::new(
glt.clone(),
guests.clone(), guests.clone(),
events.clone(), events.clone(),
runtime_for_reconciler, runtime_for_reconciler,
@ -79,6 +104,7 @@ impl Daemon {
Ok(Self { Ok(Self {
store, store,
glt,
guests, guests,
events, events,
guest_reconciler_task, guest_reconciler_task,
@ -92,6 +118,7 @@ impl Daemon {
pub async fn listen(&mut self, addr: ControlDialAddress) -> Result<()> { pub async fn listen(&mut self, addr: ControlDialAddress) -> Result<()> {
let control_service = DaemonControlService::new( let control_service = DaemonControlService::new(
self.glt.clone(),
self.events.clone(), self.events.clone(),
self.console.clone(), self.console.clone(),
self.idm.clone(), self.idm.clone(),

View File

@ -1,18 +1,18 @@
use krata::{ use krata::{
idm::protocol::{IdmMetricFormat, IdmMetricNode}, idm::internal::{MetricFormat, MetricNode},
v1::common::{GuestMetricFormat, GuestMetricNode}, v1::common::{GuestMetricFormat, GuestMetricNode},
}; };
fn idm_metric_format_to_api(format: IdmMetricFormat) -> GuestMetricFormat { fn idm_metric_format_to_api(format: MetricFormat) -> GuestMetricFormat {
match format { match format {
IdmMetricFormat::Unknown => GuestMetricFormat::Unknown, MetricFormat::Unknown => GuestMetricFormat::Unknown,
IdmMetricFormat::Bytes => GuestMetricFormat::Bytes, MetricFormat::Bytes => GuestMetricFormat::Bytes,
IdmMetricFormat::Integer => GuestMetricFormat::Integer, MetricFormat::Integer => GuestMetricFormat::Integer,
IdmMetricFormat::DurationSeconds => GuestMetricFormat::DurationSeconds, MetricFormat::DurationSeconds => GuestMetricFormat::DurationSeconds,
} }
} }
pub fn idm_metric_to_api(node: IdmMetricNode) -> GuestMetricNode { pub fn idm_metric_to_api(node: MetricNode) -> GuestMetricNode {
let format = node.format(); let format = node.format();
GuestMetricNode { GuestMetricNode {
name: node.name, name: node.name,

View File

@ -30,6 +30,7 @@ use uuid::Uuid;
use crate::{ use crate::{
db::GuestStore, db::GuestStore,
event::{DaemonEvent, DaemonEventContext}, event::{DaemonEvent, DaemonEventContext},
glt::GuestLookupTable,
}; };
const PARALLEL_LIMIT: u32 = 5; const PARALLEL_LIMIT: u32 = 5;
@ -53,6 +54,7 @@ impl Drop for GuestReconcilerEntry {
#[derive(Clone)] #[derive(Clone)]
pub struct GuestReconciler { pub struct GuestReconciler {
glt: GuestLookupTable,
guests: GuestStore, guests: GuestStore,
events: DaemonEventContext, events: DaemonEventContext,
runtime: Runtime, runtime: Runtime,
@ -64,6 +66,7 @@ pub struct GuestReconciler {
impl GuestReconciler { impl GuestReconciler {
pub fn new( pub fn new(
glt: GuestLookupTable,
guests: GuestStore, guests: GuestStore,
events: DaemonEventContext, events: DaemonEventContext,
runtime: Runtime, runtime: Runtime,
@ -71,6 +74,7 @@ impl GuestReconciler {
guest_reconciler_notify: Sender<Uuid>, guest_reconciler_notify: Sender<Uuid>,
) -> Result<Self> { ) -> Result<Self> {
Ok(Self { Ok(Self {
glt,
guests, guests,
events, events,
runtime, runtime,
@ -123,6 +127,23 @@ impl GuestReconciler {
trace!("reconciling runtime"); trace!("reconciling runtime");
let runtime_guests = self.runtime.list().await?; let runtime_guests = self.runtime.list().await?;
let stored_guests = self.guests.list().await?; let stored_guests = self.guests.list().await?;
let non_existent_guests = runtime_guests
.iter()
.filter(|x| !stored_guests.iter().any(|g| *g.0 == x.uuid))
.collect::<Vec<_>>();
for guest in non_existent_guests {
warn!("destroying unknown runtime guest {}", guest.uuid);
if let Err(error) = self.runtime.destroy(guest.uuid).await {
error!(
"failed to destroy unknown runtime guest {}: {}",
guest.uuid, error
);
}
self.guests.remove(guest.uuid).await?;
}
for (uuid, mut stored_guest) in stored_guests { for (uuid, mut stored_guest) in stored_guests {
let previous_guest = stored_guest.clone(); let previous_guest = stored_guest.clone();
let runtime_guest = runtime_guests.iter().find(|x| x.uuid == uuid); let runtime_guest = runtime_guests.iter().find(|x| x.uuid == uuid);
@ -136,6 +157,7 @@ impl GuestReconciler {
} }
Some(runtime) => { Some(runtime) => {
self.glt.associate(uuid, runtime.domid).await;
let mut state = stored_guest.state.as_mut().cloned().unwrap_or_default(); let mut state = stored_guest.state.as_mut().cloned().unwrap_or_default();
if let Some(code) = runtime.state.exit_code { if let Some(code) = runtime.state.exit_code {
state.status = GuestStatus::Exited.into(); state.status = GuestStatus::Exited.into();
@ -283,12 +305,14 @@ impl GuestReconciler {
debug: false, debug: false,
}) })
.await?; .await?;
self.glt.associate(uuid, info.domid).await;
info!("started guest {}", uuid); info!("started guest {}", uuid);
guest.state = Some(GuestState { guest.state = Some(GuestState {
status: GuestStatus::Started.into(), status: GuestStatus::Started.into(),
network: Some(guestinfo_to_networkstate(&info)), network: Some(guestinfo_to_networkstate(&info)),
exit_info: None, exit_info: None,
error_info: None, error_info: None,
host: self.glt.host_uuid().to_string(),
domid: info.domid, domid: info.domid,
}); });
Ok(GuestReconcilerResult::Changed { rerun: false }) Ok(GuestReconcilerResult::Changed { rerun: false })
@ -308,13 +332,20 @@ impl GuestReconciler {
trace!("failed to destroy runtime guest {}: {}", uuid, error); trace!("failed to destroy runtime guest {}: {}", uuid, error);
} }
let domid = guest.state.as_ref().map(|x| x.domid);
if let Some(domid) = domid {
self.glt.remove(uuid, domid).await;
}
info!("destroyed guest {}", uuid); info!("destroyed guest {}", uuid);
guest.state = Some(GuestState { guest.state = Some(GuestState {
status: GuestStatus::Destroyed.into(), status: GuestStatus::Destroyed.into(),
network: None, network: None,
exit_info: None, exit_info: None,
error_info: None, error_info: None,
domid: guest.state.as_ref().map(|x| x.domid).unwrap_or(u32::MAX), host: self.glt.host_uuid().to_string(),
domid: domid.unwrap_or(u32::MAX),
}); });
Ok(GuestReconcilerResult::Changed { rerun: false }) Ok(GuestReconcilerResult::Changed { rerun: false })
} }

View File

@ -6,10 +6,11 @@ use crate::{
use anyhow::Result; use anyhow::Result;
use cgroups_rs::Cgroup; use cgroups_rs::Cgroup;
use krata::idm::{ use krata::idm::{
client::IdmClient, client::IdmInternalClient,
protocol::{ internal::{
idm_event::Event, idm_request::Request, idm_response::Response, IdmEvent, IdmExitEvent, event::Event as EventType, request::Request as RequestType,
IdmMetricsResponse, IdmPingResponse, IdmRequest, response::Response as ResponseType, Event, ExitEvent, MetricsResponse, PingResponse,
Request, Response,
}, },
}; };
use log::debug; use log::debug;
@ -17,14 +18,18 @@ use nix::unistd::Pid;
use tokio::{select, sync::broadcast}; use tokio::{select, sync::broadcast};
pub struct GuestBackground { pub struct GuestBackground {
idm: IdmClient, idm: IdmInternalClient,
child: Pid, child: Pid,
_cgroup: Cgroup, _cgroup: Cgroup,
wait: ChildWait, wait: ChildWait,
} }
impl GuestBackground { impl GuestBackground {
pub async fn new(idm: IdmClient, cgroup: Cgroup, child: Pid) -> Result<GuestBackground> { pub async fn new(
idm: IdmInternalClient,
cgroup: Cgroup,
child: Pid,
) -> Result<GuestBackground> {
Ok(GuestBackground { Ok(GuestBackground {
idm, idm,
child, child,
@ -54,8 +59,8 @@ impl GuestBackground {
}, },
x = requests_subscription.recv() => match x { x = requests_subscription.recv() => match x {
Ok(request) => { Ok((id, request)) => {
self.handle_idm_request(request).await?; self.handle_idm_request(id, request).await?;
}, },
Err(broadcast::error::RecvError::Closed) => { Err(broadcast::error::RecvError::Closed) => {
@ -79,22 +84,27 @@ impl GuestBackground {
Ok(()) Ok(())
} }
async fn handle_idm_request(&mut self, packet: IdmRequest) -> Result<()> { async fn handle_idm_request(&mut self, id: u64, packet: Request) -> Result<()> {
let id = packet.id;
match packet.request { match packet.request {
Some(Request::Ping(_)) => { Some(RequestType::Ping(_)) => {
self.idm self.idm
.respond(id, Response::Ping(IdmPingResponse {})) .respond(
id,
Response {
response: Some(ResponseType::Ping(PingResponse {})),
},
)
.await?; .await?;
} }
Some(Request::Metrics(_)) => { Some(RequestType::Metrics(_)) => {
let metrics = MetricsCollector::new()?; let metrics = MetricsCollector::new()?;
let root = metrics.collect()?; let root = metrics.collect()?;
let response = IdmMetricsResponse { root: Some(root) }; let response = Response {
response: Some(ResponseType::Metrics(MetricsResponse { root: Some(root) })),
};
self.idm.respond(id, Response::Metrics(response)).await?; self.idm.respond(id, response).await?;
} }
None => {} None => {}
@ -105,8 +115,8 @@ impl GuestBackground {
async fn child_event(&mut self, event: ChildEvent) -> Result<()> { async fn child_event(&mut self, event: ChildEvent) -> Result<()> {
if event.pid == self.child { if event.pid == self.child {
self.idm self.idm
.emit(IdmEvent { .emit(Event {
event: Some(Event::Exit(IdmExitEvent { code: event.status })), event: Some(EventType::Exit(ExitEvent { code: event.status })),
}) })
.await?; .await?;
death(event.status).await?; death(event.status).await?;

View File

@ -3,7 +3,8 @@ use cgroups_rs::{Cgroup, CgroupPid};
use futures::stream::TryStreamExt; use futures::stream::TryStreamExt;
use ipnetwork::IpNetwork; use ipnetwork::IpNetwork;
use krata::ethtool::EthtoolHandle; use krata::ethtool::EthtoolHandle;
use krata::idm::client::IdmClient; use krata::idm::client::IdmInternalClient;
use krata::idm::internal::INTERNAL_IDM_CHANNEL;
use krata::launchcfg::{LaunchInfo, LaunchNetwork, LaunchPackedFormat}; use krata::launchcfg::{LaunchInfo, LaunchNetwork, LaunchPackedFormat};
use libc::{sethostname, setsid, TIOCSCTTY}; use libc::{sethostname, setsid, TIOCSCTTY};
use log::{trace, warn}; use log::{trace, warn};
@ -77,7 +78,7 @@ impl GuestInit {
Err(error) => warn!("failed to open console: {}", error), Err(error) => warn!("failed to open console: {}", error),
}; };
let idm = IdmClient::open("/dev/hvc1") let idm = IdmInternalClient::open(INTERNAL_IDM_CHANNEL, "/dev/hvc1")
.await .await
.map_err(|x| anyhow!("failed to open idm client: {}", x))?; .map_err(|x| anyhow!("failed to open idm client: {}", x))?;
self.mount_config_image().await?; self.mount_config_image().await?;
@ -438,7 +439,12 @@ impl GuestInit {
Ok(()) Ok(())
} }
async fn run(&mut self, config: &Config, launch: &LaunchInfo, idm: IdmClient) -> Result<()> { async fn run(
&mut self,
config: &Config,
launch: &LaunchInfo,
idm: IdmInternalClient,
) -> Result<()> {
let mut cmd = match config.cmd() { let mut cmd = match config.cmd() {
None => vec![], None => vec![],
Some(value) => value.clone(), Some(value) => value.clone(),
@ -560,7 +566,7 @@ impl GuestInit {
async fn fork_and_exec( async fn fork_and_exec(
&mut self, &mut self,
idm: IdmClient, idm: IdmInternalClient,
cgroup: Cgroup, cgroup: Cgroup,
working_dir: String, working_dir: String,
path: CString, path: CString,
@ -596,7 +602,12 @@ impl GuestInit {
Ok(()) Ok(())
} }
async fn background(&mut self, idm: IdmClient, cgroup: Cgroup, executed: Pid) -> Result<()> { async fn background(
&mut self,
idm: IdmInternalClient,
cgroup: Cgroup,
executed: Pid,
) -> Result<()> {
let mut background = GuestBackground::new(idm, cgroup, executed).await?; let mut background = GuestBackground::new(idm, cgroup, executed).await?;
background.run().await?; background.run().await?;
Ok(()) Ok(())

View File

@ -1,7 +1,7 @@
use std::{ops::Add, path::Path}; use std::{ops::Add, path::Path};
use anyhow::Result; use anyhow::Result;
use krata::idm::protocol::{IdmMetricFormat, IdmMetricNode}; use krata::idm::internal::{MetricFormat, MetricNode};
use sysinfo::Process; use sysinfo::Process;
pub struct MetricsCollector {} pub struct MetricsCollector {}
@ -11,9 +11,9 @@ impl MetricsCollector {
Ok(MetricsCollector {}) Ok(MetricsCollector {})
} }
pub fn collect(&self) -> Result<IdmMetricNode> { pub fn collect(&self) -> Result<MetricNode> {
let mut sysinfo = sysinfo::System::new(); let mut sysinfo = sysinfo::System::new();
Ok(IdmMetricNode::structural( Ok(MetricNode::structural(
"guest", "guest",
vec![ vec![
self.collect_system(&mut sysinfo)?, self.collect_system(&mut sysinfo)?,
@ -22,22 +22,22 @@ impl MetricsCollector {
)) ))
} }
fn collect_system(&self, sysinfo: &mut sysinfo::System) -> Result<IdmMetricNode> { fn collect_system(&self, sysinfo: &mut sysinfo::System) -> Result<MetricNode> {
sysinfo.refresh_memory(); sysinfo.refresh_memory();
Ok(IdmMetricNode::structural( Ok(MetricNode::structural(
"system", "system",
vec![IdmMetricNode::structural( vec![MetricNode::structural(
"memory", "memory",
vec![ vec![
IdmMetricNode::value("total", sysinfo.total_memory(), IdmMetricFormat::Bytes), MetricNode::value("total", sysinfo.total_memory(), MetricFormat::Bytes),
IdmMetricNode::value("used", sysinfo.used_memory(), IdmMetricFormat::Bytes), MetricNode::value("used", sysinfo.used_memory(), MetricFormat::Bytes),
IdmMetricNode::value("free", sysinfo.free_memory(), IdmMetricFormat::Bytes), MetricNode::value("free", sysinfo.free_memory(), MetricFormat::Bytes),
], ],
)], )],
)) ))
} }
fn collect_processes(&self, sysinfo: &mut sysinfo::System) -> Result<IdmMetricNode> { fn collect_processes(&self, sysinfo: &mut sysinfo::System) -> Result<MetricNode> {
sysinfo.refresh_processes(); sysinfo.refresh_processes();
let mut processes = Vec::new(); let mut processes = Vec::new();
let mut sysinfo_processes = sysinfo.processes().values().collect::<Vec<_>>(); let mut sysinfo_processes = sysinfo.processes().values().collect::<Vec<_>>();
@ -48,71 +48,68 @@ impl MetricsCollector {
} }
processes.push(MetricsCollector::process_node(process)?); processes.push(MetricsCollector::process_node(process)?);
} }
Ok(IdmMetricNode::structural("process", processes)) Ok(MetricNode::structural("process", processes))
} }
fn process_node(process: &Process) -> Result<IdmMetricNode> { fn process_node(process: &Process) -> Result<MetricNode> {
let mut metrics = vec![]; let mut metrics = vec![];
if let Some(parent) = process.parent() { if let Some(parent) = process.parent() {
metrics.push(IdmMetricNode::value( metrics.push(MetricNode::value(
"parent", "parent",
parent.as_u32() as u64, parent.as_u32() as u64,
IdmMetricFormat::Integer, MetricFormat::Integer,
)); ));
} }
if let Some(exe) = process.exe().and_then(path_as_str) { if let Some(exe) = process.exe().and_then(path_as_str) {
metrics.push(IdmMetricNode::raw_value("executable", exe)); metrics.push(MetricNode::raw_value("executable", exe));
} }
if let Some(working_directory) = process.cwd().and_then(path_as_str) { if let Some(working_directory) = process.cwd().and_then(path_as_str) {
metrics.push(IdmMetricNode::raw_value("cwd", working_directory)); metrics.push(MetricNode::raw_value("cwd", working_directory));
} }
let cmdline = process.cmd().to_vec(); let cmdline = process.cmd().to_vec();
metrics.push(IdmMetricNode::raw_value("cmdline", cmdline)); metrics.push(MetricNode::raw_value("cmdline", cmdline));
metrics.push(IdmMetricNode::structural( metrics.push(MetricNode::structural(
"memory", "memory",
vec![ vec![
IdmMetricNode::value("resident", process.memory(), IdmMetricFormat::Bytes), MetricNode::value("resident", process.memory(), MetricFormat::Bytes),
IdmMetricNode::value("virtual", process.virtual_memory(), IdmMetricFormat::Bytes), MetricNode::value("virtual", process.virtual_memory(), MetricFormat::Bytes),
], ],
)); ));
metrics.push(IdmMetricNode::value( metrics.push(MetricNode::value(
"lifetime", "lifetime",
process.run_time(), process.run_time(),
IdmMetricFormat::DurationSeconds, MetricFormat::DurationSeconds,
)); ));
metrics.push(IdmMetricNode::value( metrics.push(MetricNode::value(
"uid", "uid",
process.user_id().map(|x| (*x).add(0)).unwrap_or(0) as f64, process.user_id().map(|x| (*x).add(0)).unwrap_or(0) as f64,
IdmMetricFormat::Integer, MetricFormat::Integer,
)); ));
metrics.push(IdmMetricNode::value( metrics.push(MetricNode::value(
"gid", "gid",
process.group_id().map(|x| (*x).add(0)).unwrap_or(0) as f64, process.group_id().map(|x| (*x).add(0)).unwrap_or(0) as f64,
IdmMetricFormat::Integer, MetricFormat::Integer,
)); ));
metrics.push(IdmMetricNode::value( metrics.push(MetricNode::value(
"euid", "euid",
process process
.effective_user_id() .effective_user_id()
.map(|x| (*x).add(0)) .map(|x| (*x).add(0))
.unwrap_or(0) as f64, .unwrap_or(0) as f64,
IdmMetricFormat::Integer, MetricFormat::Integer,
)); ));
metrics.push(IdmMetricNode::value( metrics.push(MetricNode::value(
"egid", "egid",
process.effective_group_id().map(|x| x.add(0)).unwrap_or(0) as f64, process.effective_group_id().map(|x| x.add(0)).unwrap_or(0) as f64,
IdmMetricFormat::Integer, MetricFormat::Integer,
)); ));
Ok(IdmMetricNode::structural( Ok(MetricNode::structural(process.pid().to_string(), metrics))
process.pid().to_string(),
metrics,
))
} }
} }

View File

@ -6,12 +6,20 @@ fn main() -> Result<()> {
.descriptor_pool("crate::DESCRIPTOR_POOL") .descriptor_pool("crate::DESCRIPTOR_POOL")
.configure( .configure(
&mut config, &mut config,
&["proto/krata/v1/control.proto", "proto/krata/bus/idm.proto"], &[
"proto/krata/v1/control.proto",
"proto/krata/idm/transport.proto",
"proto/krata/idm/internal.proto",
],
&["proto/"], &["proto/"],
)?; )?;
tonic_build::configure().compile_with_config( tonic_build::configure().compile_with_config(
config, config,
&["proto/krata/v1/control.proto", "proto/krata/bus/idm.proto"], &[
"proto/krata/v1/control.proto",
"proto/krata/idm/transport.proto",
"proto/krata/idm/internal.proto",
],
&["proto/"], &["proto/"],
)?; )?;
Ok(()) Ok(())

View File

@ -1,67 +0,0 @@
syntax = "proto3";
package krata.bus.idm;
option java_multiple_files = true;
option java_package = "dev.krata.proto.bus.idm";
option java_outer_classname = "IdmProto";
import "google/protobuf/struct.proto";
message IdmPacket {
oneof content {
IdmEvent event = 1;
IdmRequest request = 2;
IdmResponse response = 3;
}
}
message IdmEvent {
oneof event {
IdmExitEvent exit = 1;
}
}
message IdmExitEvent {
int32 code = 1;
}
message IdmRequest {
uint64 id = 1;
oneof request {
IdmPingRequest ping = 2;
IdmMetricsRequest metrics = 3;
}
}
message IdmPingRequest {}
message IdmMetricsRequest {}
message IdmResponse {
uint64 id = 1;
oneof response {
IdmPingResponse ping = 2;
IdmMetricsResponse metrics = 3;
}
}
message IdmPingResponse {}
message IdmMetricsResponse {
IdmMetricNode root = 1;
}
message IdmMetricNode {
string name = 1;
google.protobuf.Value value = 2;
IdmMetricFormat format = 3;
repeated IdmMetricNode children = 4;
}
enum IdmMetricFormat {
IDM_METRIC_FORMAT_UNKNOWN = 0;
IDM_METRIC_FORMAT_BYTES = 1;
IDM_METRIC_FORMAT_INTEGER = 2;
IDM_METRIC_FORMAT_DURATION_SECONDS = 3;
}

View File

@ -0,0 +1,57 @@
syntax = "proto3";
package krata.idm.internal;
option java_multiple_files = true;
option java_package = "dev.krata.proto.idm.internal";
option java_outer_classname = "IdmInternalProto";
import "google/protobuf/struct.proto";
message ExitEvent {
int32 code = 1;
}
message PingRequest {}
message PingResponse {}
message MetricsRequest {}
message MetricsResponse {
MetricNode root = 1;
}
message MetricNode {
string name = 1;
google.protobuf.Value value = 2;
MetricFormat format = 3;
repeated MetricNode children = 4;
}
enum MetricFormat {
METRIC_FORMAT_UNKNOWN = 0;
METRIC_FORMAT_BYTES = 1;
METRIC_FORMAT_INTEGER = 2;
METRIC_FORMAT_DURATION_SECONDS = 3;
}
message Event {
oneof event {
ExitEvent exit = 1;
}
}
message Request {
oneof request {
PingRequest ping = 1;
MetricsRequest metrics = 2;
}
}
message Response {
oneof response {
PingResponse ping = 1;
MetricsResponse metrics = 2;
}
}

View File

@ -0,0 +1,22 @@
syntax = "proto3";
package krata.idm.transport;
option java_multiple_files = true;
option java_package = "dev.krata.proto.idm.transport";
option java_outer_classname = "IdmTransportProto";
message IdmTransportPacket {
uint64 id = 1;
uint64 channel = 2;
IdmTransportPacketForm form = 3;
bytes data = 4;
}
enum IdmTransportPacketForm {
IDM_TRANSPORT_PACKET_FORM_UNKNOWN = 0;
IDM_TRANSPORT_PACKET_FORM_RAW = 1;
IDM_TRANSPORT_PACKET_FORM_EVENT = 2;
IDM_TRANSPORT_PACKET_FORM_REQUEST = 3;
IDM_TRANSPORT_PACKET_FORM_RESPONSE = 4;
}

View File

@ -62,7 +62,8 @@ message GuestState {
GuestNetworkState network = 2; GuestNetworkState network = 2;
GuestExitInfo exit_info = 3; GuestExitInfo exit_info = 3;
GuestErrorInfo error_info = 4; GuestErrorInfo error_info = 4;
uint32 domid = 5; string host = 5;
uint32 domid = 6;
} }
enum GuestStatus { enum GuestStatus {

View File

@ -6,10 +6,12 @@ option java_multiple_files = true;
option java_package = "dev.krata.proto.v1.control"; option java_package = "dev.krata.proto.v1.control";
option java_outer_classname = "ControlProto"; option java_outer_classname = "ControlProto";
import "krata/bus/idm.proto"; import "krata/idm/transport.proto";
import "krata/v1/common.proto"; import "krata/v1/common.proto";
service ControlService { service ControlService {
rpc IdentifyHost(IdentifyHostRequest) returns (IdentifyHostReply);
rpc CreateGuest(CreateGuestRequest) returns (CreateGuestReply); rpc CreateGuest(CreateGuestRequest) returns (CreateGuestReply);
rpc DestroyGuest(DestroyGuestRequest) returns (DestroyGuestReply); rpc DestroyGuest(DestroyGuestRequest) returns (DestroyGuestReply);
rpc ResolveGuest(ResolveGuestRequest) returns (ResolveGuestReply); rpc ResolveGuest(ResolveGuestRequest) returns (ResolveGuestReply);
@ -24,6 +26,14 @@ service ControlService {
rpc PullImage(PullImageRequest) returns (stream PullImageReply); rpc PullImage(PullImageRequest) returns (stream PullImageReply);
} }
message IdentifyHostRequest {}
message IdentifyHostReply {
string host_uuid = 1;
uint32 host_domid = 2;
string krata_version = 3;
}
message CreateGuestRequest { message CreateGuestRequest {
krata.v1.common.GuestSpec spec = 1; krata.v1.common.GuestSpec spec = 1;
} }
@ -84,9 +94,9 @@ message ReadGuestMetricsReply {
message SnoopIdmRequest {} message SnoopIdmRequest {}
message SnoopIdmReply { message SnoopIdmReply {
uint32 from = 1; string from = 1;
uint32 to = 2; string to = 2;
krata.bus.idm.IdmPacket packet = 3; krata.idm.transport.IdmTransportPacket packet = 3;
} }
message ImageProgress { message ImageProgress {

View File

@ -1 +0,0 @@
pub mod idm;

View File

@ -8,10 +8,6 @@ use std::{
time::Duration, time::Duration,
}; };
use super::protocol::{
idm_packet::Content, idm_request::Request, idm_response::Response, IdmEvent, IdmPacket,
IdmRequest, IdmResponse,
};
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use log::{debug, error}; use log::{debug, error};
use nix::sys::termios::{cfmakeraw, tcgetattr, tcsetattr, SetArg}; use nix::sys::termios::{cfmakeraw, tcgetattr, tcsetattr, SetArg};
@ -22,14 +18,21 @@ use tokio::{
select, select,
sync::{ sync::{
broadcast, broadcast,
mpsc::{channel, Receiver, Sender}, mpsc::{self, Receiver, Sender},
oneshot, Mutex, oneshot, Mutex,
}, },
task::JoinHandle, task::JoinHandle,
time::timeout, time::timeout,
}; };
type RequestMap = Arc<Mutex<HashMap<u64, oneshot::Sender<IdmResponse>>>>; use super::{
internal,
serialize::{IdmRequest, IdmSerializable},
transport::{IdmTransportPacket, IdmTransportPacketForm},
};
type RequestMap<R> = Arc<Mutex<HashMap<u64, oneshot::Sender<<R as IdmRequest>::Response>>>>;
pub type IdmInternalClient = IdmClient<internal::Request, internal::Event>;
const IDM_PACKET_QUEUE_LEN: usize = 100; const IDM_PACKET_QUEUE_LEN: usize = 100;
const IDM_REQUEST_TIMEOUT_SECS: u64 = 10; const IDM_REQUEST_TIMEOUT_SECS: u64 = 10;
@ -37,8 +40,8 @@ const IDM_PACKET_MAX_SIZE: usize = 20 * 1024 * 1024;
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait IdmBackend: Send { pub trait IdmBackend: Send {
async fn recv(&mut self) -> Result<IdmPacket>; async fn recv(&mut self) -> Result<IdmTransportPacket>;
async fn send(&mut self, packet: IdmPacket) -> Result<()>; async fn send(&mut self, packet: IdmTransportPacket) -> Result<()>;
} }
pub struct IdmFileBackend { pub struct IdmFileBackend {
@ -66,30 +69,30 @@ impl IdmFileBackend {
#[async_trait::async_trait] #[async_trait::async_trait]
impl IdmBackend for IdmFileBackend { impl IdmBackend for IdmFileBackend {
async fn recv(&mut self) -> Result<IdmPacket> { async fn recv(&mut self) -> Result<IdmTransportPacket> {
let mut fd = self.read_fd.lock().await; let mut fd = self.read_fd.lock().await;
let mut guard = fd.readable_mut().await?; let mut guard = fd.readable_mut().await?;
let b1 = guard.get_inner_mut().read_u8().await?; let b1 = guard.get_inner_mut().read_u8().await?;
if b1 != 0xff { if b1 != 0xff {
return Ok(IdmPacket::default()); return Ok(IdmTransportPacket::default());
} }
let b2 = guard.get_inner_mut().read_u8().await?; let b2 = guard.get_inner_mut().read_u8().await?;
if b2 != 0xff { if b2 != 0xff {
return Ok(IdmPacket::default()); return Ok(IdmTransportPacket::default());
} }
let size = guard.get_inner_mut().read_u32_le().await?; let size = guard.get_inner_mut().read_u32_le().await?;
if size == 0 { if size == 0 {
return Ok(IdmPacket::default()); return Ok(IdmTransportPacket::default());
} }
let mut buffer = vec![0u8; size as usize]; let mut buffer = vec![0u8; size as usize];
guard.get_inner_mut().read_exact(&mut buffer).await?; guard.get_inner_mut().read_exact(&mut buffer).await?;
match IdmPacket::decode(buffer.as_slice()) { match IdmTransportPacket::decode(buffer.as_slice()) {
Ok(packet) => Ok(packet), Ok(packet) => Ok(packet),
Err(error) => Err(anyhow!("received invalid idm packet: {}", error)), Err(error) => Err(anyhow!("received invalid idm packet: {}", error)),
} }
} }
async fn send(&mut self, packet: IdmPacket) -> Result<()> { async fn send(&mut self, packet: IdmTransportPacket) -> Result<()> {
let mut file = self.write.lock().await; let mut file = self.write.lock().await;
let data = packet.encode_to_vec(); let data = packet.encode_to_vec();
file.write_all(&[0xff, 0xff]).await?; file.write_all(&[0xff, 0xff]).await?;
@ -100,16 +103,17 @@ impl IdmBackend for IdmFileBackend {
} }
#[derive(Clone)] #[derive(Clone)]
pub struct IdmClient { pub struct IdmClient<R: IdmRequest, E: IdmSerializable> {
request_backend_sender: broadcast::Sender<IdmRequest>, channel: u64,
request_backend_sender: broadcast::Sender<(u64, R)>,
next_request_id: Arc<Mutex<u64>>, next_request_id: Arc<Mutex<u64>>,
event_receiver_sender: broadcast::Sender<IdmEvent>, event_receiver_sender: broadcast::Sender<E>,
tx_sender: Sender<IdmPacket>, tx_sender: Sender<IdmTransportPacket>,
requests: RequestMap, requests: RequestMap<R>,
task: Arc<JoinHandle<()>>, task: Arc<JoinHandle<()>>,
} }
impl Drop for IdmClient { impl<R: IdmRequest, E: IdmSerializable> Drop for IdmClient<R, E> {
fn drop(&mut self) { fn drop(&mut self) {
if Arc::strong_count(&self.task) <= 1 { if Arc::strong_count(&self.task) <= 1 {
self.task.abort(); self.task.abort();
@ -117,12 +121,12 @@ impl Drop for IdmClient {
} }
} }
impl IdmClient { impl<R: IdmRequest, E: IdmSerializable> IdmClient<R, E> {
pub async fn new(backend: Box<dyn IdmBackend>) -> Result<IdmClient> { pub async fn new(channel: u64, backend: Box<dyn IdmBackend>) -> Result<Self> {
let requests = Arc::new(Mutex::new(HashMap::new())); let requests = Arc::new(Mutex::new(HashMap::new()));
let (event_sender, event_receiver) = broadcast::channel(IDM_PACKET_QUEUE_LEN); let (event_sender, event_receiver) = broadcast::channel(IDM_PACKET_QUEUE_LEN);
let (internal_request_backend_sender, _) = broadcast::channel(IDM_PACKET_QUEUE_LEN); let (internal_request_backend_sender, _) = broadcast::channel(IDM_PACKET_QUEUE_LEN);
let (tx_sender, tx_receiver) = channel(IDM_PACKET_QUEUE_LEN); let (tx_sender, tx_receiver) = mpsc::channel(IDM_PACKET_QUEUE_LEN);
let backend_event_sender = event_sender.clone(); let backend_event_sender = event_sender.clone();
let request_backend_sender = internal_request_backend_sender.clone(); let request_backend_sender = internal_request_backend_sender.clone();
let requests_for_client = requests.clone(); let requests_for_client = requests.clone();
@ -141,6 +145,7 @@ impl IdmClient {
} }
}); });
Ok(IdmClient { Ok(IdmClient {
channel,
next_request_id: Arc::new(Mutex::new(0)), next_request_id: Arc::new(Mutex::new(0)),
event_receiver_sender: event_sender.clone(), event_receiver_sender: event_sender.clone(),
request_backend_sender, request_backend_sender,
@ -150,7 +155,7 @@ impl IdmClient {
}) })
} }
pub async fn open<P: AsRef<Path>>(path: P) -> Result<IdmClient> { pub async fn open<P: AsRef<Path>>(channel: u64, path: P) -> Result<Self> {
let read_file = File::options() let read_file = File::options()
.read(true) .read(true)
.write(false) .write(false)
@ -164,39 +169,48 @@ impl IdmClient {
.open(path) .open(path)
.await?; .await?;
let backend = IdmFileBackend::new(read_file, write_file).await?; let backend = IdmFileBackend::new(read_file, write_file).await?;
IdmClient::new(Box::new(backend) as Box<dyn IdmBackend>).await IdmClient::new(channel, Box::new(backend) as Box<dyn IdmBackend>).await
} }
pub async fn emit(&self, event: IdmEvent) -> Result<()> { pub async fn emit<T: IdmSerializable>(&self, event: T) -> Result<()> {
let id = {
let mut guard = self.next_request_id.lock().await;
let req = *guard;
*guard = req.wrapping_add(1);
req
};
self.tx_sender self.tx_sender
.send(IdmPacket { .send(IdmTransportPacket {
content: Some(Content::Event(event)), id,
form: IdmTransportPacketForm::Event.into(),
channel: self.channel,
data: event.encode()?,
}) })
.await?; .await?;
Ok(()) Ok(())
} }
pub async fn requests(&self) -> Result<broadcast::Receiver<IdmRequest>> { pub async fn requests(&self) -> Result<broadcast::Receiver<(u64, R)>> {
Ok(self.request_backend_sender.subscribe()) Ok(self.request_backend_sender.subscribe())
} }
pub async fn respond(&self, id: u64, response: Response) -> Result<()> { pub async fn respond<T: IdmSerializable>(&self, id: u64, response: T) -> Result<()> {
let packet = IdmPacket { let packet = IdmTransportPacket {
content: Some(Content::Response(IdmResponse { id,
id, form: IdmTransportPacketForm::Response.into(),
response: Some(response), channel: self.channel,
})), data: response.encode()?,
}; };
self.tx_sender.send(packet).await?; self.tx_sender.send(packet).await?;
Ok(()) Ok(())
} }
pub async fn subscribe(&self) -> Result<broadcast::Receiver<IdmEvent>> { pub async fn subscribe(&self) -> Result<broadcast::Receiver<E>> {
Ok(self.event_receiver_sender.subscribe()) Ok(self.event_receiver_sender.subscribe())
} }
pub async fn send(&self, request: Request) -> Result<Response> { pub async fn send(&self, request: R) -> Result<R::Response> {
let (sender, receiver) = oneshot::channel::<IdmResponse>(); let (sender, receiver) = oneshot::channel::<R::Response>();
let req = { let req = {
let mut guard = self.next_request_id.lock().await; let mut guard = self.next_request_id.lock().await;
let req = *guard; let req = *guard;
@ -217,49 +231,52 @@ impl IdmClient {
}); });
}); });
self.tx_sender self.tx_sender
.send(IdmPacket { .send(IdmTransportPacket {
content: Some(Content::Request(IdmRequest { id: req,
id: req, channel: self.channel,
request: Some(request), form: IdmTransportPacketForm::Request.into(),
})), data: request.encode()?,
}) })
.await?; .await?;
let response = timeout(Duration::from_secs(IDM_REQUEST_TIMEOUT_SECS), receiver).await??; let response = timeout(Duration::from_secs(IDM_REQUEST_TIMEOUT_SECS), receiver).await??;
success.store(true, Ordering::Release); success.store(true, Ordering::Release);
if let Some(response) = response.response { Ok(response)
Ok(response)
} else {
Err(anyhow!("response did not contain any content"))
}
} }
async fn process( async fn process(
mut backend: Box<dyn IdmBackend>, mut backend: Box<dyn IdmBackend>,
event_sender: broadcast::Sender<IdmEvent>, event_sender: broadcast::Sender<E>,
requests: RequestMap, requests: RequestMap<R>,
request_backend_sender: broadcast::Sender<IdmRequest>, request_backend_sender: broadcast::Sender<(u64, R)>,
_event_receiver: broadcast::Receiver<IdmEvent>, _event_receiver: broadcast::Receiver<E>,
mut receiver: Receiver<IdmPacket>, mut receiver: Receiver<IdmTransportPacket>,
) -> Result<()> { ) -> Result<()> {
loop { loop {
select! { select! {
x = backend.recv() => match x { x = backend.recv() => match x {
Ok(packet) => { Ok(packet) => {
match packet.content { match packet.form() {
Some(Content::Event(event)) => { IdmTransportPacketForm::Event => {
let _ = event_sender.send(event); if let Ok(event) = E::decode(&packet.data) {
let _ = event_sender.send(event);
}
}, },
Some(Content::Request(request)) => { IdmTransportPacketForm::Request => {
let _ = request_backend_sender.send(request); if let Ok(request) = R::decode(&packet.data) {
let _ = request_backend_sender.send((packet.id, request));
}
}, },
Some(Content::Response(response)) => { IdmTransportPacketForm::Response => {
let mut requests = requests.lock().await; let mut requests = requests.lock().await;
if let Some(sender) = requests.remove(&response.id) { if let Some(sender) = requests.remove(&packet.id) {
drop(requests); drop(requests);
let _ = sender.send(response);
if let Ok(response) = R::Response::decode(&packet.data) {
let _ = sender.send(response);
}
} }
}, },

View File

@ -1,26 +1,66 @@
use anyhow::Result;
use prost::Message;
use prost_types::{ListValue, Value}; use prost_types::{ListValue, Value};
include!(concat!(env!("OUT_DIR"), "/krata.bus.idm.rs")); use super::serialize::{IdmRequest, IdmSerializable};
include!(concat!(env!("OUT_DIR"), "/krata.idm.internal.rs"));
pub const INTERNAL_IDM_CHANNEL: u64 = 0;
impl IdmSerializable for Event {
fn encode(&self) -> Result<Vec<u8>> {
Ok(self.encode_to_vec())
}
fn decode(bytes: &[u8]) -> Result<Self> {
Ok(<Self as prost::Message>::decode(bytes)?)
}
}
impl IdmSerializable for Request {
fn encode(&self) -> Result<Vec<u8>> {
Ok(self.encode_to_vec())
}
fn decode(bytes: &[u8]) -> Result<Self> {
Ok(<Self as prost::Message>::decode(bytes)?)
}
}
impl IdmRequest for Request {
type Response = Response;
}
impl IdmSerializable for Response {
fn encode(&self) -> Result<Vec<u8>> {
Ok(self.encode_to_vec())
}
fn decode(bytes: &[u8]) -> Result<Self> {
Ok(<Self as prost::Message>::decode(bytes)?)
}
}
pub trait AsIdmMetricValue { pub trait AsIdmMetricValue {
fn as_metric_value(&self) -> Value; fn as_metric_value(&self) -> Value;
} }
impl IdmMetricNode { impl MetricNode {
pub fn structural<N: AsRef<str>>(name: N, children: Vec<IdmMetricNode>) -> IdmMetricNode { pub fn structural<N: AsRef<str>>(name: N, children: Vec<MetricNode>) -> MetricNode {
IdmMetricNode { MetricNode {
name: name.as_ref().to_string(), name: name.as_ref().to_string(),
value: None, value: None,
format: IdmMetricFormat::Unknown.into(), format: MetricFormat::Unknown.into(),
children, children,
} }
} }
pub fn raw_value<N: AsRef<str>, V: AsIdmMetricValue>(name: N, value: V) -> IdmMetricNode { pub fn raw_value<N: AsRef<str>, V: AsIdmMetricValue>(name: N, value: V) -> MetricNode {
IdmMetricNode { MetricNode {
name: name.as_ref().to_string(), name: name.as_ref().to_string(),
value: Some(value.as_metric_value()), value: Some(value.as_metric_value()),
format: IdmMetricFormat::Unknown.into(), format: MetricFormat::Unknown.into(),
children: vec![], children: vec![],
} }
} }
@ -28,9 +68,9 @@ impl IdmMetricNode {
pub fn value<N: AsRef<str>, V: AsIdmMetricValue>( pub fn value<N: AsRef<str>, V: AsIdmMetricValue>(
name: N, name: N,
value: V, value: V,
format: IdmMetricFormat, format: MetricFormat,
) -> IdmMetricNode { ) -> MetricNode {
IdmMetricNode { MetricNode {
name: name.as_ref().to_string(), name: name.as_ref().to_string(),
value: Some(value.as_metric_value()), value: Some(value.as_metric_value()),
format: format.into(), format: format.into(),

View File

@ -1,3 +1,5 @@
#[cfg(unix)] #[cfg(unix)]
pub mod client; pub mod client;
pub use crate::bus::idm as protocol; pub mod internal;
pub mod serialize;
pub mod transport;

View File

@ -0,0 +1,10 @@
use anyhow::Result;
pub trait IdmSerializable: Sized + Clone + Send + Sync + 'static {
fn decode(bytes: &[u8]) -> Result<Self>;
fn encode(&self) -> Result<Vec<u8>>;
}
pub trait IdmRequest: IdmSerializable {
type Response: IdmSerializable;
}

View File

@ -0,0 +1 @@
include!(concat!(env!("OUT_DIR"), "/krata.idm.transport.rs"));

View File

@ -1,7 +1,6 @@
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use prost_reflect::DescriptorPool; use prost_reflect::DescriptorPool;
pub mod bus;
pub mod v1; pub mod v1;
pub mod client; pub mod client;