mirror of
https://github.com/edera-dev/krata.git
synced 2025-08-03 13:11:31 +00:00
feat: implement metrics support
This commit is contained in:
95
Cargo.lock
generated
95
Cargo.lock
generated
@ -421,6 +421,12 @@ dependencies = [
|
||||
"unicode-width",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "core-foundation-sys"
|
||||
version = "0.8.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f"
|
||||
|
||||
[[package]]
|
||||
name = "cpufeatures"
|
||||
version = "0.2.12"
|
||||
@ -439,6 +445,31 @@ dependencies = [
|
||||
"cfg-if",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-deque"
|
||||
version = "0.8.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d"
|
||||
dependencies = [
|
||||
"crossbeam-epoch",
|
||||
"crossbeam-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-epoch"
|
||||
version = "0.9.18"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
|
||||
dependencies = [
|
||||
"crossbeam-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-utils"
|
||||
version = "0.8.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345"
|
||||
|
||||
[[package]]
|
||||
name = "crossterm"
|
||||
version = "0.27.0"
|
||||
@ -1324,6 +1355,7 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sys-mount",
|
||||
"sysinfo",
|
||||
"tokio",
|
||||
"walkdir",
|
||||
]
|
||||
@ -1719,6 +1751,15 @@ dependencies = [
|
||||
"minimal-lexical",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ntapi"
|
||||
version = "0.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4"
|
||||
dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-traits"
|
||||
version = "0.2.18"
|
||||
@ -2075,6 +2116,26 @@ dependencies = [
|
||||
"getrandom",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rayon"
|
||||
version = "1.10.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa"
|
||||
dependencies = [
|
||||
"either",
|
||||
"rayon-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rayon-core"
|
||||
version = "1.12.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2"
|
||||
dependencies = [
|
||||
"crossbeam-deque",
|
||||
"crossbeam-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "redb"
|
||||
version = "2.0.0"
|
||||
@ -2572,6 +2633,21 @@ dependencies = [
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sysinfo"
|
||||
version = "0.30.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e9a84fe4cfc513b41cb2596b624e561ec9e7e1c4b46328e496ed56a53514ef2a"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"core-foundation-sys",
|
||||
"libc",
|
||||
"ntapi",
|
||||
"once_cell",
|
||||
"rayon",
|
||||
"windows",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tap"
|
||||
version = "1.0.1"
|
||||
@ -3072,6 +3148,25 @@ version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
|
||||
|
||||
[[package]]
|
||||
name = "windows"
|
||||
version = "0.52.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be"
|
||||
dependencies = [
|
||||
"windows-core",
|
||||
"windows-targets 0.52.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-core"
|
||||
version = "0.52.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9"
|
||||
dependencies = [
|
||||
"windows-targets 0.52.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-sys"
|
||||
version = "0.48.0"
|
||||
|
@ -64,6 +64,7 @@ sha256 = "1.5.0"
|
||||
signal-hook = "0.3.17"
|
||||
slice-copy = "0.3.0"
|
||||
smoltcp = "0.11.0"
|
||||
sysinfo = "0.30.9"
|
||||
thiserror = "1.0"
|
||||
tokio-tun = "0.11.4"
|
||||
tonic-build = "0.11.0"
|
||||
|
90
crates/ctl/src/cli/metrics.rs
Normal file
90
crates/ctl/src/cli/metrics.rs
Normal file
@ -0,0 +1,90 @@
|
||||
use anyhow::Result;
|
||||
use clap::{Parser, ValueEnum};
|
||||
use comfy_table::{presets::UTF8_FULL_CONDENSED, Table};
|
||||
use krata::{
|
||||
events::EventStream,
|
||||
v1::control::{
|
||||
control_service_client::ControlServiceClient, ReadGuestMetricsReply,
|
||||
ReadGuestMetricsRequest,
|
||||
},
|
||||
};
|
||||
|
||||
use tonic::transport::Channel;
|
||||
|
||||
use crate::format::{kv2line, proto2dynamic, proto2kv};
|
||||
|
||||
use super::resolve_guest;
|
||||
|
||||
#[derive(ValueEnum, Clone, Debug, PartialEq, Eq)]
|
||||
enum MetricsFormat {
|
||||
Table,
|
||||
Json,
|
||||
JsonPretty,
|
||||
Yaml,
|
||||
KeyValue,
|
||||
}
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command(about = "Read metrics from the guest")]
|
||||
pub struct MetricsCommand {
|
||||
#[arg(short, long, default_value = "table", help = "Output format")]
|
||||
format: MetricsFormat,
|
||||
#[arg(help = "Guest to read metrics for, either the name or the uuid")]
|
||||
guest: String,
|
||||
}
|
||||
|
||||
impl MetricsCommand {
|
||||
pub async fn run(
|
||||
self,
|
||||
mut client: ControlServiceClient<Channel>,
|
||||
_events: EventStream,
|
||||
) -> Result<()> {
|
||||
let guest_id: String = resolve_guest(&mut client, &self.guest).await?;
|
||||
let reply = client
|
||||
.read_guest_metrics(ReadGuestMetricsRequest { guest_id })
|
||||
.await?
|
||||
.into_inner();
|
||||
match self.format {
|
||||
MetricsFormat::Table => {
|
||||
self.print_metrics_table(reply)?;
|
||||
}
|
||||
|
||||
MetricsFormat::Json | MetricsFormat::JsonPretty | MetricsFormat::Yaml => {
|
||||
let value = serde_json::to_value(proto2dynamic(reply)?)?;
|
||||
let encoded = if self.format == MetricsFormat::JsonPretty {
|
||||
serde_json::to_string_pretty(&value)?
|
||||
} else if self.format == MetricsFormat::Yaml {
|
||||
serde_yaml::to_string(&value)?
|
||||
} else {
|
||||
serde_json::to_string(&value)?
|
||||
};
|
||||
println!("{}", encoded.trim());
|
||||
}
|
||||
|
||||
MetricsFormat::KeyValue => {
|
||||
self.print_key_value(reply)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn print_metrics_table(&self, reply: ReadGuestMetricsReply) -> Result<()> {
|
||||
let mut table = Table::new();
|
||||
table.load_preset(UTF8_FULL_CONDENSED);
|
||||
table.set_content_arrangement(comfy_table::ContentArrangement::Dynamic);
|
||||
table.set_header(vec!["metric", "value"]);
|
||||
let kvs = proto2kv(reply)?;
|
||||
for (key, value) in kvs {
|
||||
table.add_row(vec![key, value]);
|
||||
}
|
||||
println!("{}", table);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn print_key_value(&self, metrics: ReadGuestMetricsReply) -> Result<()> {
|
||||
let kvs = proto2kv(metrics)?;
|
||||
println!("{}", kv2line(kvs),);
|
||||
Ok(())
|
||||
}
|
||||
}
|
@ -3,6 +3,7 @@ pub mod destroy;
|
||||
pub mod launch;
|
||||
pub mod list;
|
||||
pub mod logs;
|
||||
pub mod metrics;
|
||||
pub mod resolve;
|
||||
pub mod watch;
|
||||
|
||||
@ -17,7 +18,7 @@ use tonic::{transport::Channel, Request};
|
||||
|
||||
use self::{
|
||||
attach::AttachCommand, destroy::DestroyCommand, launch::LauchCommand, list::ListCommand,
|
||||
logs::LogsCommand, resolve::ResolveCommand, watch::WatchCommand,
|
||||
logs::LogsCommand, metrics::MetricsCommand, resolve::ResolveCommand, watch::WatchCommand,
|
||||
};
|
||||
|
||||
#[derive(Parser)]
|
||||
@ -47,6 +48,7 @@ pub enum Commands {
|
||||
Logs(LogsCommand),
|
||||
Watch(WatchCommand),
|
||||
Resolve(ResolveCommand),
|
||||
Metrics(MetricsCommand),
|
||||
}
|
||||
|
||||
impl ControlCommand {
|
||||
@ -82,6 +84,10 @@ impl ControlCommand {
|
||||
Commands::Resolve(resolve) => {
|
||||
resolve.run(client).await?;
|
||||
}
|
||||
|
||||
Commands::Metrics(metrics) => {
|
||||
metrics.run(client, events).await?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
@ -2,13 +2,19 @@ use std::{pin::Pin, str::FromStr};
|
||||
|
||||
use async_stream::try_stream;
|
||||
use futures::Stream;
|
||||
use krata::v1::{
|
||||
common::{Guest, GuestState, GuestStatus},
|
||||
control::{
|
||||
control_service_server::ControlService, ConsoleDataReply, ConsoleDataRequest,
|
||||
CreateGuestReply, CreateGuestRequest, DestroyGuestReply, DestroyGuestRequest,
|
||||
ListGuestsReply, ListGuestsRequest, ResolveGuestReply, ResolveGuestRequest,
|
||||
WatchEventsReply, WatchEventsRequest,
|
||||
use krata::{
|
||||
idm::protocol::{
|
||||
idm_request::Request as IdmRequestType, idm_response::Response as IdmResponseType,
|
||||
IdmMetricsRequest,
|
||||
},
|
||||
v1::{
|
||||
common::{Guest, GuestState, GuestStatus},
|
||||
control::{
|
||||
control_service_server::ControlService, ConsoleDataReply, ConsoleDataRequest,
|
||||
CreateGuestReply, CreateGuestRequest, DestroyGuestReply, DestroyGuestRequest,
|
||||
ListGuestsReply, ListGuestsRequest, ReadGuestMetricsReply, ReadGuestMetricsRequest,
|
||||
ResolveGuestReply, ResolveGuestRequest, WatchEventsReply, WatchEventsRequest,
|
||||
},
|
||||
},
|
||||
};
|
||||
use tokio::{
|
||||
@ -19,7 +25,9 @@ use tokio_stream::StreamExt;
|
||||
use tonic::{Request, Response, Status, Streaming};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{console::DaemonConsoleHandle, db::GuestStore, event::DaemonEventContext};
|
||||
use crate::{
|
||||
console::DaemonConsoleHandle, db::GuestStore, event::DaemonEventContext, idm::DaemonIdmHandle,
|
||||
};
|
||||
|
||||
pub struct ApiError {
|
||||
message: String,
|
||||
@ -43,6 +51,7 @@ impl From<ApiError> for Status {
|
||||
pub struct RuntimeControlService {
|
||||
events: DaemonEventContext,
|
||||
console: DaemonConsoleHandle,
|
||||
idm: DaemonIdmHandle,
|
||||
guests: GuestStore,
|
||||
guest_reconciler_notify: Sender<Uuid>,
|
||||
}
|
||||
@ -51,12 +60,14 @@ impl RuntimeControlService {
|
||||
pub fn new(
|
||||
events: DaemonEventContext,
|
||||
console: DaemonConsoleHandle,
|
||||
idm: DaemonIdmHandle,
|
||||
guests: GuestStore,
|
||||
guest_reconciler_notify: Sender<Uuid>,
|
||||
) -> Self {
|
||||
Self {
|
||||
events,
|
||||
console,
|
||||
idm,
|
||||
guests,
|
||||
guest_reconciler_notify,
|
||||
}
|
||||
@ -269,6 +280,59 @@ impl ControlService for RuntimeControlService {
|
||||
Ok(Response::new(Box::pin(output) as Self::ConsoleDataStream))
|
||||
}
|
||||
|
||||
async fn read_guest_metrics(
|
||||
&self,
|
||||
request: Request<ReadGuestMetricsRequest>,
|
||||
) -> Result<Response<ReadGuestMetricsReply>, Status> {
|
||||
let request = request.into_inner();
|
||||
let uuid = Uuid::from_str(&request.guest_id).map_err(|error| ApiError {
|
||||
message: error.to_string(),
|
||||
})?;
|
||||
let guest = self
|
||||
.guests
|
||||
.read(uuid)
|
||||
.await
|
||||
.map_err(|error| ApiError {
|
||||
message: error.to_string(),
|
||||
})?
|
||||
.ok_or_else(|| ApiError {
|
||||
message: "guest did not exist in the database".to_string(),
|
||||
})?;
|
||||
|
||||
let Some(ref state) = guest.state else {
|
||||
return Err(ApiError {
|
||||
message: "guest did not have state".to_string(),
|
||||
}
|
||||
.into());
|
||||
};
|
||||
|
||||
let domid = state.domid;
|
||||
if domid == 0 {
|
||||
return Err(ApiError {
|
||||
message: "invalid domid on the guest".to_string(),
|
||||
}
|
||||
.into());
|
||||
}
|
||||
|
||||
let client = self.idm.client(domid).await.map_err(|error| ApiError {
|
||||
message: error.to_string(),
|
||||
})?;
|
||||
|
||||
let response = client
|
||||
.send(IdmRequestType::Metrics(IdmMetricsRequest {}))
|
||||
.await
|
||||
.map_err(|error| ApiError {
|
||||
message: error.to_string(),
|
||||
})?;
|
||||
|
||||
let mut reply = ReadGuestMetricsReply::default();
|
||||
if let IdmResponseType::Metrics(metrics) = response {
|
||||
reply.total_memory_bytes = metrics.total_memory_bytes;
|
||||
reply.used_memory_bytes = metrics.used_memory_bytes;
|
||||
}
|
||||
Ok(Response::new(reply))
|
||||
}
|
||||
|
||||
async fn watch_events(
|
||||
&self,
|
||||
request: Request<WatchEventsRequest>,
|
||||
|
@ -139,7 +139,12 @@ impl DaemonIdm {
|
||||
x = self.tx_receiver.recv() => match x {
|
||||
Some((domid, packet)) => {
|
||||
let data = packet.encode_to_vec();
|
||||
self.tx_raw_sender.send((domid, data)).await?;
|
||||
let mut buffer = vec![0u8; 2];
|
||||
let length = data.len();
|
||||
buffer[0] = length as u8;
|
||||
buffer[1] = (length << 8) as u8;
|
||||
buffer.extend_from_slice(&data);
|
||||
self.tx_raw_sender.send((domid, buffer)).await?;
|
||||
},
|
||||
|
||||
None => {
|
||||
|
@ -33,7 +33,7 @@ pub struct Daemon {
|
||||
guest_reconciler_task: JoinHandle<()>,
|
||||
guest_reconciler_notify: Sender<Uuid>,
|
||||
generator_task: JoinHandle<()>,
|
||||
_idm: DaemonIdmHandle,
|
||||
idm: DaemonIdmHandle,
|
||||
console: DaemonConsoleHandle,
|
||||
}
|
||||
|
||||
@ -69,7 +69,7 @@ impl Daemon {
|
||||
guest_reconciler_task,
|
||||
guest_reconciler_notify,
|
||||
generator_task,
|
||||
_idm: idm,
|
||||
idm,
|
||||
console,
|
||||
})
|
||||
}
|
||||
@ -78,6 +78,7 @@ impl Daemon {
|
||||
let control_service = RuntimeControlService::new(
|
||||
self.events.clone(),
|
||||
self.console.clone(),
|
||||
self.idm.clone(),
|
||||
self.guests.clone(),
|
||||
self.guest_reconciler_notify.clone(),
|
||||
);
|
||||
|
@ -25,6 +25,7 @@ rtnetlink = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
sys-mount = { workspace = true }
|
||||
sysinfo = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
walkdir = { workspace = true }
|
||||
|
||||
|
@ -7,7 +7,7 @@ use std::env;
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
env::set_var("RUST_BACKTRACE", "1");
|
||||
env_logger::Builder::from_env(Env::default().default_filter_or("warn")).init();
|
||||
env_logger::Builder::from_env(Env::default().default_filter_or("trace")).init();
|
||||
if env::var("KRATA_UNSAFE_ALWAYS_ALLOW_INIT").unwrap_or("0".to_string()) != "1" {
|
||||
let pid = std::process::id();
|
||||
if pid > 3 {
|
||||
|
@ -8,11 +8,12 @@ use krata::idm::{
|
||||
client::IdmClient,
|
||||
protocol::{
|
||||
idm_event::Event, idm_request::Request, idm_response::Response, IdmEvent, IdmExitEvent,
|
||||
IdmPingResponse, IdmRequest,
|
||||
IdmMetricsResponse, IdmPingResponse, IdmRequest,
|
||||
},
|
||||
};
|
||||
use log::debug;
|
||||
use nix::unistd::Pid;
|
||||
use sysinfo::System;
|
||||
use tokio::{select, sync::broadcast};
|
||||
|
||||
pub struct GuestBackground {
|
||||
@ -80,10 +81,26 @@ impl GuestBackground {
|
||||
|
||||
async fn handle_idm_request(&mut self, packet: IdmRequest) -> Result<()> {
|
||||
let id = packet.id;
|
||||
if let Some(Request::Ping(_)) = packet.request {
|
||||
self.idm
|
||||
.respond(id, Response::Ping(IdmPingResponse {}))
|
||||
.await?;
|
||||
|
||||
match packet.request {
|
||||
Some(Request::Ping(_)) => {
|
||||
self.idm
|
||||
.respond(id, Response::Ping(IdmPingResponse {}))
|
||||
.await?;
|
||||
}
|
||||
|
||||
Some(Request::Metrics(_)) => {
|
||||
let mut sys = System::new();
|
||||
sys.refresh_memory();
|
||||
let response = IdmMetricsResponse {
|
||||
total_memory_bytes: sys.total_memory(),
|
||||
used_memory_bytes: sys.used_memory(),
|
||||
};
|
||||
|
||||
self.idm.respond(id, Response::Metrics(response)).await?;
|
||||
}
|
||||
|
||||
None => {}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
@ -28,16 +28,25 @@ message IdmRequest {
|
||||
uint64 id = 1;
|
||||
oneof request {
|
||||
IdmPingRequest ping = 2;
|
||||
IdmMetricsRequest metrics = 3;
|
||||
}
|
||||
}
|
||||
|
||||
message IdmPingRequest {}
|
||||
|
||||
message IdmMetricsRequest {}
|
||||
|
||||
message IdmResponse {
|
||||
uint64 id = 1;
|
||||
oneof response {
|
||||
IdmPingResponse ping = 2;
|
||||
IdmMetricsResponse metrics = 3;
|
||||
}
|
||||
}
|
||||
|
||||
message IdmPingResponse {}
|
||||
|
||||
message IdmMetricsResponse {
|
||||
uint64 total_memory_bytes = 1;
|
||||
uint64 used_memory_bytes = 2;
|
||||
}
|
||||
|
@ -15,6 +15,8 @@ service ControlService {
|
||||
rpc ListGuests(ListGuestsRequest) returns (ListGuestsReply);
|
||||
rpc ConsoleData(stream ConsoleDataRequest) returns (stream ConsoleDataReply);
|
||||
rpc WatchEvents(WatchEventsRequest) returns (stream WatchEventsReply);
|
||||
|
||||
rpc ReadGuestMetrics(ReadGuestMetricsRequest) returns (ReadGuestMetricsReply);
|
||||
}
|
||||
|
||||
message CreateGuestRequest {
|
||||
@ -65,3 +67,12 @@ message WatchEventsReply {
|
||||
message GuestChangedEvent {
|
||||
krata.v1.common.Guest guest = 1;
|
||||
}
|
||||
|
||||
message ReadGuestMetricsRequest {
|
||||
string guest_id = 1;
|
||||
}
|
||||
|
||||
message ReadGuestMetricsReply {
|
||||
uint64 total_memory_bytes = 1;
|
||||
uint64 used_memory_bytes = 2;
|
||||
}
|
||||
|
@ -6,7 +6,6 @@ use super::protocol::{
|
||||
idm_request::Request, idm_response::Response, IdmEvent, IdmPacket, IdmRequest, IdmResponse,
|
||||
};
|
||||
use anyhow::{anyhow, Result};
|
||||
use bytes::BytesMut;
|
||||
use log::{debug, error};
|
||||
use nix::sys::termios::{cfmakeraw, tcgetattr, tcsetattr, SetArg};
|
||||
use prost::Message;
|
||||
@ -33,14 +32,17 @@ pub trait IdmBackend: Send {
|
||||
}
|
||||
|
||||
pub struct IdmFileBackend {
|
||||
fd: Arc<Mutex<AsyncFd<File>>>,
|
||||
read_fd: Arc<Mutex<AsyncFd<File>>>,
|
||||
write: Arc<Mutex<File>>,
|
||||
}
|
||||
|
||||
impl IdmFileBackend {
|
||||
pub async fn new(file: File) -> Result<IdmFileBackend> {
|
||||
IdmFileBackend::set_raw_port(&file)?;
|
||||
pub async fn new(read_file: File, write_file: File) -> Result<IdmFileBackend> {
|
||||
IdmFileBackend::set_raw_port(&read_file)?;
|
||||
IdmFileBackend::set_raw_port(&write_file)?;
|
||||
Ok(IdmFileBackend {
|
||||
fd: Arc::new(Mutex::new(AsyncFd::new(file)?)),
|
||||
read_fd: Arc::new(Mutex::new(AsyncFd::new(read_file)?)),
|
||||
write: Arc::new(Mutex::new(write_file)),
|
||||
})
|
||||
}
|
||||
|
||||
@ -55,26 +57,25 @@ impl IdmFileBackend {
|
||||
#[async_trait::async_trait]
|
||||
impl IdmBackend for IdmFileBackend {
|
||||
async fn recv(&mut self) -> Result<IdmPacket> {
|
||||
let mut fd = self.fd.lock().await;
|
||||
let mut fd = self.read_fd.lock().await;
|
||||
let mut guard = fd.readable_mut().await?;
|
||||
let size = guard.get_inner_mut().read_u16_le().await?;
|
||||
if size == 0 {
|
||||
return Ok(IdmPacket::default());
|
||||
}
|
||||
let mut buffer = BytesMut::with_capacity(size as usize);
|
||||
let mut buffer = vec![0u8; size as usize];
|
||||
guard.get_inner_mut().read_exact(&mut buffer).await?;
|
||||
match IdmPacket::decode(buffer) {
|
||||
match IdmPacket::decode(buffer.as_slice()) {
|
||||
Ok(packet) => Ok(packet),
|
||||
|
||||
Err(error) => Err(anyhow!("received invalid idm packet: {}", error)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn send(&mut self, packet: IdmPacket) -> Result<()> {
|
||||
let mut fd = self.fd.lock().await;
|
||||
let mut file = self.write.lock().await;
|
||||
let data = packet.encode_to_vec();
|
||||
fd.get_mut().write_u16_le(data.len() as u16).await?;
|
||||
fd.get_mut().write_all(&data).await?;
|
||||
file.write_u16_le(data.len() as u16).await?;
|
||||
file.write_all(&data).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@ -105,6 +106,7 @@ impl IdmClient {
|
||||
let (tx_sender, tx_receiver) = channel(IDM_PACKET_QUEUE_LEN);
|
||||
let backend_event_sender = event_sender.clone();
|
||||
let request_backend_sender = internal_request_backend_sender.clone();
|
||||
let requests_for_client = requests.clone();
|
||||
let task = tokio::task::spawn(async move {
|
||||
if let Err(error) = IdmClient::process(
|
||||
backend,
|
||||
@ -123,20 +125,26 @@ impl IdmClient {
|
||||
next_request_id: Arc::new(Mutex::new(0)),
|
||||
event_receiver_sender: event_sender.clone(),
|
||||
request_backend_sender,
|
||||
requests: Arc::new(Mutex::new(HashMap::new())),
|
||||
requests: requests_for_client,
|
||||
tx_sender,
|
||||
task: Arc::new(task),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn open<P: AsRef<Path>>(path: P) -> Result<IdmClient> {
|
||||
let file = File::options()
|
||||
let read_file = File::options()
|
||||
.read(true)
|
||||
.write(false)
|
||||
.create(false)
|
||||
.open(&path)
|
||||
.await?;
|
||||
let write_file = File::options()
|
||||
.read(false)
|
||||
.write(true)
|
||||
.create(false)
|
||||
.open(path)
|
||||
.await?;
|
||||
let backend = IdmFileBackend::new(file).await?;
|
||||
let backend = IdmFileBackend::new(read_file, write_file).await?;
|
||||
IdmClient::new(Box::new(backend) as Box<dyn IdmBackend>).await
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user