mirror of
https://github.com/edera-dev/krata.git
synced 2025-08-03 21:21:32 +00:00
feat: guest metrics support (#46)
* feat: initial support for idm send in daemon
* feat: implement IdmClient backend support
* feat: daemon idm now uses IdmClient
* fix: implement channel destruction propagation
* feat: implement request response idm system
* feat: implement metrics support
* proto: move metrics into GuestMetrics for reusability
* fix: log level of guest agent was trace
* feat: metrics tree with process information
This commit is contained in:
@ -10,12 +10,15 @@ resolver = "2"
|
||||
|
||||
[dependencies]
|
||||
anyhow = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
bytes = { workspace = true }
|
||||
libc = { workspace = true }
|
||||
log = { workspace = true }
|
||||
once_cell = { workspace = true }
|
||||
prost = { workspace = true }
|
||||
prost-reflect = { workspace = true }
|
||||
prost-types = { workspace = true }
|
||||
scopeguard = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
tonic = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
|
@ -6,8 +6,14 @@ option java_multiple_files = true;
|
||||
option java_package = "dev.krata.proto.internal.idm";
|
||||
option java_outer_classname = "IdmProto";
|
||||
|
||||
message IdmExitEvent {
|
||||
int32 code = 1;
|
||||
import "google/protobuf/struct.proto";
|
||||
|
||||
// Top-level IDM envelope. A packet carries at most one of an event,
// a request, or a response.
message IdmPacket {
  oneof content {
    IdmEvent event = 1;
    IdmRequest request = 2;
    IdmResponse response = 3;
  }
}
|
||||
|
||||
message IdmEvent {
|
||||
@ -16,6 +22,46 @@ message IdmEvent {
|
||||
}
|
||||
}
|
||||
|
||||
message IdmPacket {
|
||||
IdmEvent event = 1;
|
||||
// Event payload carrying an integer exit code.
message IdmExitEvent {
  int32 code = 1;
}
|
||||
|
||||
// A request sent over IDM. `id` identifies the request so that the
// IdmResponse carrying the same id can be matched to it.
message IdmRequest {
  uint64 id = 1;
  oneof request {
    IdmPingRequest ping = 2;
    IdmMetricsRequest metrics = 3;
  }
}
|
||||
|
||||
// Ping request; carries no fields.
message IdmPingRequest {}
|
||||
|
||||
// Metrics request; carries no fields.
message IdmMetricsRequest {}
|
||||
|
||||
// A response sent over IDM. `id` is the id of the IdmRequest being answered.
message IdmResponse {
  uint64 id = 1;
  oneof response {
    IdmPingResponse ping = 2;
    IdmMetricsResponse metrics = 3;
  }
}
|
||||
|
||||
// Ping response; carries no fields.
message IdmPingResponse {}
|
||||
|
||||
// Metrics response: the root node of a metric tree.
message IdmMetricsResponse {
  IdmMetricNode root = 1;
}
|
||||
|
||||
// One node of a metric tree. A node has a name, an optional dynamically
// typed value with a format tag, and any number of child nodes.
message IdmMetricNode {
  string name = 1;
  google.protobuf.Value value = 2;
  IdmMetricFormat format = 3;
  repeated IdmMetricNode children = 4;
}
|
||||
|
||||
// Format tag attached to the value of an IdmMetricNode.
enum IdmMetricFormat {
  IDM_METRIC_FORMAT_UNKNOWN = 0;
  IDM_METRIC_FORMAT_BYTES = 1;
  IDM_METRIC_FORMAT_INTEGER = 2;
  IDM_METRIC_FORMAT_DURATION_SECONDS = 3;
}
|
||||
|
@ -6,6 +6,8 @@ option java_multiple_files = true;
|
||||
option java_package = "dev.krata.proto.v1.common";
|
||||
option java_outer_classname = "CommonProto";
|
||||
|
||||
import "google/protobuf/struct.proto";
|
||||
|
||||
message Guest {
|
||||
string id = 1;
|
||||
GuestSpec spec = 2;
|
||||
@ -80,3 +82,17 @@ message GuestExitInfo {
|
||||
// Error details for a guest, as a text message.
message GuestErrorInfo {
  string message = 1;
}
|
||||
|
||||
// One node of a guest metric tree. A node has a name, an optional
// dynamically typed value with a format tag, and any number of child nodes.
message GuestMetricNode {
  string name = 1;
  google.protobuf.Value value = 2;
  GuestMetricFormat format = 3;
  repeated GuestMetricNode children = 4;
}
|
||||
|
||||
// Format tag attached to the value of a GuestMetricNode.
enum GuestMetricFormat {
  GUEST_METRIC_FORMAT_UNKNOWN = 0;
  GUEST_METRIC_FORMAT_BYTES = 1;
  GUEST_METRIC_FORMAT_INTEGER = 2;
  GUEST_METRIC_FORMAT_DURATION_SECONDS = 3;
}
|
||||
|
@ -15,6 +15,8 @@ service ControlService {
|
||||
rpc ListGuests(ListGuestsRequest) returns (ListGuestsReply);
|
||||
rpc ConsoleData(stream ConsoleDataRequest) returns (stream ConsoleDataReply);
|
||||
rpc WatchEvents(WatchEventsRequest) returns (stream WatchEventsReply);
|
||||
|
||||
rpc ReadGuestMetrics(ReadGuestMetricsRequest) returns (ReadGuestMetricsReply);
|
||||
}
|
||||
|
||||
message CreateGuestRequest {
|
||||
@ -65,3 +67,11 @@ message WatchEventsReply {
|
||||
// Event carrying a guest record.
message GuestChangedEvent {
  krata.v1.common.Guest guest = 1;
}
|
||||
|
||||
// Request for ControlService.ReadGuestMetrics, selecting the guest by id.
message ReadGuestMetricsRequest {
  string guest_id = 1;
}
|
||||
|
||||
// Reply for ControlService.ReadGuestMetrics: the root of the metric tree.
message ReadGuestMetricsReply {
  krata.v1.common.GuestMetricNode root = 1;
}
|
||||
|
@ -1,8 +1,19 @@
|
||||
use std::path::Path;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
path::Path,
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use super::protocol::IdmPacket;
|
||||
use crate::idm::protocol::idm_packet::Content;
|
||||
|
||||
use super::protocol::{
|
||||
idm_request::Request, idm_response::Response, IdmEvent, IdmPacket, IdmRequest, IdmResponse,
|
||||
};
|
||||
use anyhow::{anyhow, Result};
|
||||
use bytes::BytesMut;
|
||||
use log::{debug, error};
|
||||
use nix::sys::termios::{cfmakeraw, tcgetattr, tcsetattr, SetArg};
|
||||
use prost::Message;
|
||||
@ -10,44 +21,39 @@ use tokio::{
|
||||
fs::File,
|
||||
io::{unix::AsyncFd, AsyncReadExt, AsyncWriteExt},
|
||||
select,
|
||||
sync::mpsc::{channel, Receiver, Sender},
|
||||
sync::{
|
||||
broadcast,
|
||||
mpsc::{channel, Receiver, Sender},
|
||||
oneshot, Mutex,
|
||||
},
|
||||
task::JoinHandle,
|
||||
time::timeout,
|
||||
};
|
||||
|
||||
/// Shared table of in-flight requests: request id mapped to the oneshot
/// sender that delivers the matching [`IdmResponse`] to the waiting caller.
type RequestMap = Arc<Mutex<HashMap<u64, oneshot::Sender<IdmResponse>>>>;
|
||||
|
||||
/// Capacity used for the packet, event and request channels of the client.
const IDM_PACKET_QUEUE_LEN: usize = 100;
|
||||
/// Seconds a request waits for its response before timing out.
const IDM_REQUEST_TIMEOUT_SECS: u64 = 10;
|
||||
/// Upper bound on the encoded size of an IDM packet, in bytes (20 MiB).
const IDM_PACKET_MAX_SIZE: usize = 20 * 1024 * 1024;
|
||||
|
||||
pub struct IdmClient {
|
||||
pub receiver: Receiver<IdmPacket>,
|
||||
pub sender: Sender<IdmPacket>,
|
||||
task: JoinHandle<()>,
|
||||
#[async_trait::async_trait]
|
||||
pub trait IdmBackend: Send {
|
||||
async fn recv(&mut self) -> Result<IdmPacket>;
|
||||
async fn send(&mut self, packet: IdmPacket) -> Result<()>;
|
||||
}
|
||||
|
||||
impl Drop for IdmClient {
|
||||
fn drop(&mut self) {
|
||||
self.task.abort();
|
||||
}
|
||||
pub struct IdmFileBackend {
|
||||
read_fd: Arc<Mutex<AsyncFd<File>>>,
|
||||
write: Arc<Mutex<File>>,
|
||||
}
|
||||
|
||||
impl IdmClient {
|
||||
pub async fn open<P: AsRef<Path>>(path: P) -> Result<IdmClient> {
|
||||
let file = File::options()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(false)
|
||||
.open(path)
|
||||
.await?;
|
||||
IdmClient::set_raw_port(&file)?;
|
||||
let (rx_sender, rx_receiver) = channel(IDM_PACKET_QUEUE_LEN);
|
||||
let (tx_sender, tx_receiver) = channel(IDM_PACKET_QUEUE_LEN);
|
||||
let task = tokio::task::spawn(async move {
|
||||
if let Err(error) = IdmClient::process(file, rx_sender, tx_receiver).await {
|
||||
debug!("failed to handle idm client processing: {}", error);
|
||||
}
|
||||
});
|
||||
Ok(IdmClient {
|
||||
receiver: rx_receiver,
|
||||
sender: tx_sender,
|
||||
task,
|
||||
impl IdmFileBackend {
|
||||
pub async fn new(read_file: File, write_file: File) -> Result<IdmFileBackend> {
|
||||
IdmFileBackend::set_raw_port(&read_file)?;
|
||||
IdmFileBackend::set_raw_port(&write_file)?;
|
||||
Ok(IdmFileBackend {
|
||||
read_fd: Arc::new(Mutex::new(AsyncFd::new(read_file)?)),
|
||||
write: Arc::new(Mutex::new(write_file)),
|
||||
})
|
||||
}
|
||||
|
||||
@ -57,31 +63,199 @@ impl IdmClient {
|
||||
tcsetattr(file, SetArg::TCSANOW, &termios)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl IdmBackend for IdmFileBackend {
|
||||
async fn recv(&mut self) -> Result<IdmPacket> {
|
||||
let mut fd = self.read_fd.lock().await;
|
||||
let mut guard = fd.readable_mut().await?;
|
||||
let size = guard.get_inner_mut().read_u32_le().await?;
|
||||
if size == 0 {
|
||||
return Ok(IdmPacket::default());
|
||||
}
|
||||
let mut buffer = vec![0u8; size as usize];
|
||||
guard.get_inner_mut().read_exact(&mut buffer).await?;
|
||||
match IdmPacket::decode(buffer.as_slice()) {
|
||||
Ok(packet) => Ok(packet),
|
||||
Err(error) => Err(anyhow!("received invalid idm packet: {}", error)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn send(&mut self, packet: IdmPacket) -> Result<()> {
|
||||
let mut file = self.write.lock().await;
|
||||
let data = packet.encode_to_vec();
|
||||
file.write_u32_le(data.len() as u32).await?;
|
||||
file.write_all(&data).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct IdmClient {
|
||||
request_backend_sender: broadcast::Sender<IdmRequest>,
|
||||
next_request_id: Arc<Mutex<u64>>,
|
||||
event_receiver_sender: broadcast::Sender<IdmEvent>,
|
||||
tx_sender: Sender<IdmPacket>,
|
||||
requests: RequestMap,
|
||||
task: Arc<JoinHandle<()>>,
|
||||
}
|
||||
|
||||
impl Drop for IdmClient {
|
||||
fn drop(&mut self) {
|
||||
if Arc::strong_count(&self.task) <= 1 {
|
||||
self.task.abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl IdmClient {
|
||||
pub async fn new(backend: Box<dyn IdmBackend>) -> Result<IdmClient> {
|
||||
let requests = Arc::new(Mutex::new(HashMap::new()));
|
||||
let (event_sender, event_receiver) = broadcast::channel(IDM_PACKET_QUEUE_LEN);
|
||||
let (internal_request_backend_sender, _) = broadcast::channel(IDM_PACKET_QUEUE_LEN);
|
||||
let (tx_sender, tx_receiver) = channel(IDM_PACKET_QUEUE_LEN);
|
||||
let backend_event_sender = event_sender.clone();
|
||||
let request_backend_sender = internal_request_backend_sender.clone();
|
||||
let requests_for_client = requests.clone();
|
||||
let task = tokio::task::spawn(async move {
|
||||
if let Err(error) = IdmClient::process(
|
||||
backend,
|
||||
backend_event_sender,
|
||||
requests,
|
||||
internal_request_backend_sender,
|
||||
event_receiver,
|
||||
tx_receiver,
|
||||
)
|
||||
.await
|
||||
{
|
||||
debug!("failed to handle idm client processing: {}", error);
|
||||
}
|
||||
});
|
||||
Ok(IdmClient {
|
||||
next_request_id: Arc::new(Mutex::new(0)),
|
||||
event_receiver_sender: event_sender.clone(),
|
||||
request_backend_sender,
|
||||
requests: requests_for_client,
|
||||
tx_sender,
|
||||
task: Arc::new(task),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn open<P: AsRef<Path>>(path: P) -> Result<IdmClient> {
|
||||
let read_file = File::options()
|
||||
.read(true)
|
||||
.write(false)
|
||||
.create(false)
|
||||
.open(&path)
|
||||
.await?;
|
||||
let write_file = File::options()
|
||||
.read(false)
|
||||
.write(true)
|
||||
.create(false)
|
||||
.open(path)
|
||||
.await?;
|
||||
let backend = IdmFileBackend::new(read_file, write_file).await?;
|
||||
IdmClient::new(Box::new(backend) as Box<dyn IdmBackend>).await
|
||||
}
|
||||
|
||||
pub async fn emit(&self, event: IdmEvent) -> Result<()> {
|
||||
self.tx_sender
|
||||
.send(IdmPacket {
|
||||
content: Some(Content::Event(event)),
|
||||
})
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns a new receiver for requests sent by the peer.
pub async fn requests(&self) -> Result<broadcast::Receiver<IdmRequest>> {
    let receiver = self.request_backend_sender.subscribe();
    Ok(receiver)
}
|
||||
|
||||
pub async fn respond(&self, id: u64, response: Response) -> Result<()> {
|
||||
let packet = IdmPacket {
|
||||
content: Some(Content::Response(IdmResponse {
|
||||
id,
|
||||
response: Some(response),
|
||||
})),
|
||||
};
|
||||
self.tx_sender.send(packet).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns a new receiver for events sent by the peer.
pub async fn subscribe(&self) -> Result<broadcast::Receiver<IdmEvent>> {
    let receiver = self.event_receiver_sender.subscribe();
    Ok(receiver)
}
|
||||
|
||||
pub async fn send(&self, request: Request) -> Result<Response> {
|
||||
let (sender, receiver) = oneshot::channel::<IdmResponse>();
|
||||
let req = {
|
||||
let mut guard = self.next_request_id.lock().await;
|
||||
let req = *guard;
|
||||
*guard = req.wrapping_add(1);
|
||||
req
|
||||
};
|
||||
let mut requests = self.requests.lock().await;
|
||||
requests.insert(req, sender);
|
||||
drop(requests);
|
||||
let success = AtomicBool::new(false);
|
||||
let _guard = scopeguard::guard(self.requests.clone(), |requests| {
|
||||
if success.load(Ordering::Acquire) {
|
||||
return;
|
||||
}
|
||||
tokio::task::spawn(async move {
|
||||
let mut requests = requests.lock().await;
|
||||
requests.remove(&req);
|
||||
});
|
||||
});
|
||||
self.tx_sender
|
||||
.send(IdmPacket {
|
||||
content: Some(Content::Request(IdmRequest {
|
||||
id: req,
|
||||
request: Some(request),
|
||||
})),
|
||||
})
|
||||
.await?;
|
||||
|
||||
let response = timeout(Duration::from_secs(IDM_REQUEST_TIMEOUT_SECS), receiver).await??;
|
||||
success.store(true, Ordering::Release);
|
||||
if let Some(response) = response.response {
|
||||
Ok(response)
|
||||
} else {
|
||||
Err(anyhow!("response did not contain any content"))
|
||||
}
|
||||
}
|
||||
|
||||
async fn process(
|
||||
file: File,
|
||||
sender: Sender<IdmPacket>,
|
||||
mut backend: Box<dyn IdmBackend>,
|
||||
event_sender: broadcast::Sender<IdmEvent>,
|
||||
requests: RequestMap,
|
||||
request_backend_sender: broadcast::Sender<IdmRequest>,
|
||||
_event_receiver: broadcast::Receiver<IdmEvent>,
|
||||
mut receiver: Receiver<IdmPacket>,
|
||||
) -> Result<()> {
|
||||
let mut file = AsyncFd::new(file)?;
|
||||
loop {
|
||||
select! {
|
||||
x = file.readable_mut() => match x {
|
||||
Ok(mut guard) => {
|
||||
let size = guard.get_inner_mut().read_u16_le().await?;
|
||||
if size == 0 {
|
||||
continue;
|
||||
}
|
||||
let mut buffer = BytesMut::with_capacity(size as usize);
|
||||
guard.get_inner_mut().read_exact(&mut buffer).await?;
|
||||
match IdmPacket::decode(buffer) {
|
||||
Ok(packet) => {
|
||||
sender.send(packet).await?;
|
||||
x = backend.recv() => match x {
|
||||
Ok(packet) => {
|
||||
match packet.content {
|
||||
Some(Content::Event(event)) => {
|
||||
let _ = event_sender.send(event);
|
||||
},
|
||||
|
||||
Err(error) => {
|
||||
error!("received invalid idm packet: {}", error);
|
||||
}
|
||||
Some(Content::Request(request)) => {
|
||||
let _ = request_backend_sender.send(request);
|
||||
},
|
||||
|
||||
Some(Content::Response(response)) => {
|
||||
let mut requests = requests.lock().await;
|
||||
if let Some(sender) = requests.remove(&response.id) {
|
||||
drop(requests);
|
||||
let _ = sender.send(response);
|
||||
}
|
||||
},
|
||||
|
||||
_ => {},
|
||||
}
|
||||
},
|
||||
|
||||
@ -91,13 +265,12 @@ impl IdmClient {
|
||||
},
|
||||
x = receiver.recv() => match x {
|
||||
Some(packet) => {
|
||||
let data = packet.encode_to_vec();
|
||||
if data.len() > u16::MAX as usize {
|
||||
error!("unable to send idm packet, packet size exceeded (tried to send {} bytes)", data.len());
|
||||
let length = packet.encoded_len();
|
||||
if length > IDM_PACKET_MAX_SIZE {
|
||||
error!("unable to send idm packet, packet size exceeded (tried to send {} bytes)", length);
|
||||
continue;
|
||||
}
|
||||
file.get_mut().write_u16_le(data.len() as u16).await?;
|
||||
file.get_mut().write_all(&data).await?;
|
||||
backend.send(packet).await?;
|
||||
},
|
||||
|
||||
None => {
|
||||
|
@ -1 +1,89 @@
|
||||
use prost_types::{ListValue, Value};
|
||||
|
||||
include!(concat!(env!("OUT_DIR"), "/krata.internal.idm.rs"));
|
||||
|
||||
pub trait AsIdmMetricValue {
|
||||
fn as_metric_value(&self) -> Value;
|
||||
}
|
||||
|
||||
impl IdmMetricNode {
|
||||
pub fn structural<N: AsRef<str>>(name: N, children: Vec<IdmMetricNode>) -> IdmMetricNode {
|
||||
IdmMetricNode {
|
||||
name: name.as_ref().to_string(),
|
||||
value: None,
|
||||
format: IdmMetricFormat::Unknown.into(),
|
||||
children,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn raw_value<N: AsRef<str>, V: AsIdmMetricValue>(name: N, value: V) -> IdmMetricNode {
|
||||
IdmMetricNode {
|
||||
name: name.as_ref().to_string(),
|
||||
value: Some(value.as_metric_value()),
|
||||
format: IdmMetricFormat::Unknown.into(),
|
||||
children: vec![],
|
||||
}
|
||||
}
|
||||
|
||||
pub fn value<N: AsRef<str>, V: AsIdmMetricValue>(
|
||||
name: N,
|
||||
value: V,
|
||||
format: IdmMetricFormat,
|
||||
) -> IdmMetricNode {
|
||||
IdmMetricNode {
|
||||
name: name.as_ref().to_string(),
|
||||
value: Some(value.as_metric_value()),
|
||||
format: format.into(),
|
||||
children: vec![],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AsIdmMetricValue for String {
|
||||
fn as_metric_value(&self) -> Value {
|
||||
Value {
|
||||
kind: Some(prost_types::value::Kind::StringValue(self.to_string())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AsIdmMetricValue for &str {
|
||||
fn as_metric_value(&self) -> Value {
|
||||
Value {
|
||||
kind: Some(prost_types::value::Kind::StringValue(self.to_string())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AsIdmMetricValue for u64 {
|
||||
fn as_metric_value(&self) -> Value {
|
||||
numeric(*self as f64)
|
||||
}
|
||||
}
|
||||
|
||||
impl AsIdmMetricValue for i64 {
|
||||
fn as_metric_value(&self) -> Value {
|
||||
numeric(*self as f64)
|
||||
}
|
||||
}
|
||||
|
||||
impl AsIdmMetricValue for f64 {
|
||||
fn as_metric_value(&self) -> Value {
|
||||
numeric(*self)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsIdmMetricValue> AsIdmMetricValue for Vec<T> {
|
||||
fn as_metric_value(&self) -> Value {
|
||||
let values = self.iter().map(|x| x.as_metric_value()).collect::<_>();
|
||||
Value {
|
||||
kind: Some(prost_types::value::Kind::ListValue(ListValue { values })),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn numeric(value: f64) -> Value {
|
||||
Value {
|
||||
kind: Some(prost_types::value::Kind::NumberValue(value)),
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user