mirror of
https://github.com/edera-dev/krata.git
synced 2025-08-03 13:11:31 +00:00
feat: basic kratactl top command (#72)
* feat: basic kratactl top command * fix: use magic bytes 0xff 0xff in idm to improve reliability
This commit is contained in:
@ -13,7 +13,7 @@ anyhow = { workspace = true }
|
||||
async-stream = { workspace = true }
|
||||
clap = { workspace = true }
|
||||
comfy-table = { workspace = true }
|
||||
crossterm = { workspace = true }
|
||||
crossterm = { workspace = true, features = ["event-stream"] }
|
||||
ctrlc = { workspace = true, features = ["termination"] }
|
||||
env_logger = { workspace = true }
|
||||
fancy-duration = { workspace = true }
|
||||
@ -23,6 +23,7 @@ krata = { path = "../krata", version = "^0.0.8" }
|
||||
log = { workspace = true }
|
||||
prost-reflect = { workspace = true, features = ["serde"] }
|
||||
prost-types = { workspace = true }
|
||||
ratatui = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
serde_yaml = { workspace = true }
|
||||
termtree = { workspace = true }
|
||||
|
@ -6,6 +6,7 @@ pub mod list;
|
||||
pub mod logs;
|
||||
pub mod metrics;
|
||||
pub mod resolve;
|
||||
pub mod top;
|
||||
pub mod watch;
|
||||
|
||||
use anyhow::{anyhow, Result};
|
||||
@ -20,7 +21,7 @@ use tonic::{transport::Channel, Request};
|
||||
use self::{
|
||||
attach::AttachCommand, destroy::DestroyCommand, idm_snoop::IdmSnoopCommand,
|
||||
launch::LauchCommand, list::ListCommand, logs::LogsCommand, metrics::MetricsCommand,
|
||||
resolve::ResolveCommand, watch::WatchCommand,
|
||||
resolve::ResolveCommand, top::TopCommand, watch::WatchCommand,
|
||||
};
|
||||
|
||||
#[derive(Parser)]
|
||||
@ -52,6 +53,7 @@ pub enum Commands {
|
||||
Resolve(ResolveCommand),
|
||||
Metrics(MetricsCommand),
|
||||
IdmSnoop(IdmSnoopCommand),
|
||||
Top(TopCommand),
|
||||
}
|
||||
|
||||
impl ControlCommand {
|
||||
@ -95,6 +97,10 @@ impl ControlCommand {
|
||||
Commands::IdmSnoop(snoop) => {
|
||||
snoop.run(client, events).await?;
|
||||
}
|
||||
|
||||
Commands::Top(top) => {
|
||||
top.run(client, events).await?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
215
crates/ctl/src/cli/top.rs
Normal file
215
crates/ctl/src/cli/top.rs
Normal file
@ -0,0 +1,215 @@
|
||||
use anyhow::Result;
|
||||
use clap::Parser;
|
||||
use krata::{events::EventStream, v1::control::control_service_client::ControlServiceClient};
|
||||
use std::{
|
||||
io::{self, stdout, Stdout},
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::select;
|
||||
use tokio_stream::StreamExt;
|
||||
use tonic::transport::Channel;
|
||||
|
||||
use crossterm::{
|
||||
event::{Event, KeyCode, KeyEvent, KeyEventKind},
|
||||
execute,
|
||||
terminal::*,
|
||||
};
|
||||
use ratatui::{
|
||||
prelude::*,
|
||||
symbols::border,
|
||||
widgets::{
|
||||
block::{Position, Title},
|
||||
Block, Borders, Row, Table, TableState,
|
||||
},
|
||||
};
|
||||
|
||||
use crate::{
|
||||
format::guest_status_text,
|
||||
metrics::{
|
||||
lookup_metric_value, MultiMetricCollector, MultiMetricCollectorHandle, MultiMetricState,
|
||||
},
|
||||
};
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command(about = "Dashboard for running guests")]
|
||||
pub struct TopCommand {}
|
||||
|
||||
pub type Tui = Terminal<CrosstermBackend<Stdout>>;
|
||||
|
||||
impl TopCommand {
|
||||
pub async fn run(
|
||||
self,
|
||||
client: ControlServiceClient<Channel>,
|
||||
events: EventStream,
|
||||
) -> Result<()> {
|
||||
let collector = MultiMetricCollector::new(client, events, Duration::from_millis(200))?;
|
||||
let collector = collector.launch().await?;
|
||||
let mut tui = TopCommand::init()?;
|
||||
let mut app = TopApp {
|
||||
metrics: MultiMetricState { guests: vec![] },
|
||||
exit: false,
|
||||
table: TableState::new(),
|
||||
};
|
||||
app.run(collector, &mut tui).await?;
|
||||
TopCommand::restore()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn init() -> io::Result<Tui> {
|
||||
execute!(stdout(), EnterAlternateScreen)?;
|
||||
enable_raw_mode()?;
|
||||
Terminal::new(CrosstermBackend::new(stdout()))
|
||||
}
|
||||
|
||||
pub fn restore() -> io::Result<()> {
|
||||
execute!(stdout(), LeaveAlternateScreen)?;
|
||||
disable_raw_mode()?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TopApp {
|
||||
table: TableState,
|
||||
metrics: MultiMetricState,
|
||||
exit: bool,
|
||||
}
|
||||
|
||||
impl TopApp {
|
||||
pub async fn run(
|
||||
&mut self,
|
||||
mut collector: MultiMetricCollectorHandle,
|
||||
terminal: &mut Tui,
|
||||
) -> Result<()> {
|
||||
let mut events = crossterm::event::EventStream::new();
|
||||
|
||||
while !self.exit {
|
||||
terminal.draw(|frame| self.render_frame(frame))?;
|
||||
|
||||
select! {
|
||||
x = collector.receiver.recv() => match x {
|
||||
Some(state) => {
|
||||
self.metrics = state;
|
||||
},
|
||||
|
||||
None => {
|
||||
break;
|
||||
}
|
||||
},
|
||||
|
||||
x = events.next() => match x {
|
||||
Some(event) => {
|
||||
let event = event?;
|
||||
self.handle_event(event)?;
|
||||
},
|
||||
|
||||
None => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn render_frame(&mut self, frame: &mut Frame) {
|
||||
frame.render_widget(self, frame.size());
|
||||
}
|
||||
|
||||
fn handle_event(&mut self, event: Event) -> io::Result<()> {
|
||||
match event {
|
||||
Event::Key(key_event) if key_event.kind == KeyEventKind::Press => {
|
||||
self.handle_key_event(key_event)
|
||||
}
|
||||
_ => {}
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn exit(&mut self) {
|
||||
self.exit = true;
|
||||
}
|
||||
|
||||
fn handle_key_event(&mut self, key_event: KeyEvent) {
|
||||
if let KeyCode::Char('q') = key_event.code {
|
||||
self.exit()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Widget for &mut TopApp {
|
||||
fn render(self, area: Rect, buf: &mut Buffer) {
|
||||
let title = Title::from(" krata hypervisor ".bold());
|
||||
let instructions = Title::from(vec![" Quit ".into(), "<Q> ".blue().bold()]);
|
||||
let block = Block::default()
|
||||
.title(title.alignment(Alignment::Center))
|
||||
.title(
|
||||
instructions
|
||||
.alignment(Alignment::Center)
|
||||
.position(Position::Bottom),
|
||||
)
|
||||
.borders(Borders::ALL)
|
||||
.border_set(border::THICK);
|
||||
|
||||
let mut rows = vec![];
|
||||
|
||||
for ms in &self.metrics.guests {
|
||||
let Some(ref spec) = ms.guest.spec else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let Some(ref state) = ms.guest.state else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let memory_total = ms
|
||||
.root
|
||||
.as_ref()
|
||||
.and_then(|root| lookup_metric_value(root, "system/memory/total"));
|
||||
let memory_used = ms
|
||||
.root
|
||||
.as_ref()
|
||||
.and_then(|root| lookup_metric_value(root, "system/memory/used"));
|
||||
let memory_free = ms
|
||||
.root
|
||||
.as_ref()
|
||||
.and_then(|root| lookup_metric_value(root, "system/memory/free"));
|
||||
|
||||
let row = Row::new(vec![
|
||||
spec.name.clone(),
|
||||
ms.guest.id.clone(),
|
||||
guest_status_text(state.status()),
|
||||
memory_total.unwrap_or_default(),
|
||||
memory_used.unwrap_or_default(),
|
||||
memory_free.unwrap_or_default(),
|
||||
]);
|
||||
rows.push(row);
|
||||
}
|
||||
|
||||
let widths = [
|
||||
Constraint::Min(8),
|
||||
Constraint::Min(8),
|
||||
Constraint::Min(8),
|
||||
Constraint::Min(8),
|
||||
Constraint::Min(8),
|
||||
Constraint::Min(8),
|
||||
];
|
||||
|
||||
let table = Table::new(rows, widths)
|
||||
.header(
|
||||
Row::new(vec![
|
||||
"name",
|
||||
"id",
|
||||
"status",
|
||||
"total memory",
|
||||
"used memory",
|
||||
"free memory",
|
||||
])
|
||||
.style(Style::new().bold())
|
||||
.bottom_margin(1),
|
||||
)
|
||||
.column_spacing(1)
|
||||
.block(block);
|
||||
|
||||
StatefulWidget::render(table, area, buf, &mut self.table);
|
||||
}
|
||||
}
|
@ -121,7 +121,7 @@ fn metrics_value_numeric(value: Value) -> f64 {
|
||||
string.parse::<f64>().ok().unwrap_or(f64::NAN)
|
||||
}
|
||||
|
||||
fn metrics_value_pretty(value: Value, format: GuestMetricFormat) -> String {
|
||||
pub fn metrics_value_pretty(value: Value, format: GuestMetricFormat) -> String {
|
||||
match format {
|
||||
GuestMetricFormat::Bytes => human_bytes(metrics_value_numeric(value)),
|
||||
GuestMetricFormat::Integer => (metrics_value_numeric(value) as u64).to_string(),
|
||||
|
@ -1,3 +1,4 @@
|
||||
pub mod cli;
|
||||
pub mod console;
|
||||
pub mod format;
|
||||
pub mod metrics;
|
||||
|
159
crates/ctl/src/metrics.rs
Normal file
159
crates/ctl/src/metrics.rs
Normal file
@ -0,0 +1,159 @@
|
||||
use anyhow::Result;
|
||||
use krata::{
|
||||
events::EventStream,
|
||||
v1::{
|
||||
common::{Guest, GuestMetricNode, GuestStatus},
|
||||
control::{
|
||||
control_service_client::ControlServiceClient, watch_events_reply::Event,
|
||||
ListGuestsRequest, ReadGuestMetricsRequest,
|
||||
},
|
||||
},
|
||||
};
|
||||
use log::error;
|
||||
use std::time::Duration;
|
||||
use tokio::{
|
||||
select,
|
||||
sync::mpsc::{channel, Receiver, Sender},
|
||||
task::JoinHandle,
|
||||
time::{sleep, timeout},
|
||||
};
|
||||
use tonic::transport::Channel;
|
||||
|
||||
use crate::format::metrics_value_pretty;
|
||||
|
||||
pub struct MetricState {
|
||||
pub guest: Guest,
|
||||
pub root: Option<GuestMetricNode>,
|
||||
}
|
||||
|
||||
pub struct MultiMetricState {
|
||||
pub guests: Vec<MetricState>,
|
||||
}
|
||||
|
||||
pub struct MultiMetricCollector {
|
||||
client: ControlServiceClient<Channel>,
|
||||
events: EventStream,
|
||||
period: Duration,
|
||||
}
|
||||
|
||||
pub struct MultiMetricCollectorHandle {
|
||||
pub receiver: Receiver<MultiMetricState>,
|
||||
task: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl Drop for MultiMetricCollectorHandle {
|
||||
fn drop(&mut self) {
|
||||
self.task.abort();
|
||||
}
|
||||
}
|
||||
|
||||
impl MultiMetricCollector {
|
||||
pub fn new(
|
||||
client: ControlServiceClient<Channel>,
|
||||
events: EventStream,
|
||||
period: Duration,
|
||||
) -> Result<MultiMetricCollector> {
|
||||
Ok(MultiMetricCollector {
|
||||
client,
|
||||
events,
|
||||
period,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn launch(mut self) -> Result<MultiMetricCollectorHandle> {
|
||||
let (sender, receiver) = channel::<MultiMetricState>(100);
|
||||
let task = tokio::task::spawn(async move {
|
||||
if let Err(error) = self.process(sender).await {
|
||||
error!("failed to process multi metric collector: {}", error);
|
||||
}
|
||||
});
|
||||
Ok(MultiMetricCollectorHandle { receiver, task })
|
||||
}
|
||||
|
||||
pub async fn process(&mut self, sender: Sender<MultiMetricState>) -> Result<()> {
|
||||
let mut events = self.events.subscribe();
|
||||
let mut guests: Vec<Guest> = self
|
||||
.client
|
||||
.list_guests(ListGuestsRequest {})
|
||||
.await?
|
||||
.into_inner()
|
||||
.guests;
|
||||
loop {
|
||||
let collect = select! {
|
||||
x = events.recv() => match x {
|
||||
Ok(event) => {
|
||||
if let Event::GuestChanged(changed) = event {
|
||||
let Some(guest) = changed.guest else {
|
||||
continue;
|
||||
};
|
||||
let Some(ref state) = guest.state else {
|
||||
continue;
|
||||
};
|
||||
guests.retain(|x| x.id != guest.id);
|
||||
if state.status() != GuestStatus::Destroying {
|
||||
guests.push(guest);
|
||||
}
|
||||
}
|
||||
false
|
||||
},
|
||||
|
||||
Err(error) => {
|
||||
return Err(error.into());
|
||||
}
|
||||
},
|
||||
|
||||
_ = sleep(self.period) => {
|
||||
true
|
||||
}
|
||||
};
|
||||
|
||||
if !collect {
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut metrics = Vec::new();
|
||||
for guest in &guests {
|
||||
let Some(ref state) = guest.state else {
|
||||
continue;
|
||||
};
|
||||
|
||||
if state.status() != GuestStatus::Started {
|
||||
continue;
|
||||
}
|
||||
|
||||
let root = timeout(
|
||||
Duration::from_secs(5),
|
||||
self.client.read_guest_metrics(ReadGuestMetricsRequest {
|
||||
guest_id: guest.id.clone(),
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.ok()
|
||||
.and_then(|x| x.ok())
|
||||
.map(|x| x.into_inner())
|
||||
.and_then(|x| x.root);
|
||||
metrics.push(MetricState {
|
||||
guest: guest.clone(),
|
||||
root,
|
||||
});
|
||||
}
|
||||
sender.send(MultiMetricState { guests: metrics }).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn lookup<'a>(node: &'a GuestMetricNode, path: &str) -> Option<&'a GuestMetricNode> {
|
||||
let Some((what, b)) = path.split_once('/') else {
|
||||
return node.children.iter().find(|x| x.name == path);
|
||||
};
|
||||
let next = node.children.iter().find(|x| x.name == what)?;
|
||||
return lookup(next, b);
|
||||
}
|
||||
|
||||
pub fn lookup_metric_value(node: &GuestMetricNode, path: &str) -> Option<String> {
|
||||
lookup(node, path).and_then(|x| {
|
||||
x.value
|
||||
.as_ref()
|
||||
.map(|v| metrics_value_pretty(v.clone(), x.format()))
|
||||
})
|
||||
}
|
@ -120,16 +120,22 @@ impl DaemonIdm {
|
||||
if let Some(data) = data {
|
||||
let buffer = buffers.entry(domid).or_insert_with_key(|_| BytesMut::new());
|
||||
buffer.extend_from_slice(&data);
|
||||
if buffer.len() < 4 {
|
||||
if buffer.len() < 6 {
|
||||
continue;
|
||||
}
|
||||
let size = (buffer[0] as u32 | (buffer[1] as u32) << 8 | (buffer[2] as u32) << 16 | (buffer[3] as u32) << 24) as usize;
|
||||
let needed = size + 4;
|
||||
|
||||
if buffer[0] != 0xff || buffer[1] != 0xff {
|
||||
buffer.clear();
|
||||
continue;
|
||||
}
|
||||
|
||||
let size = (buffer[2] as u32 | (buffer[3] as u32) << 8 | (buffer[4] as u32) << 16 | (buffer[5] as u32) << 24) as usize;
|
||||
let needed = size + 6;
|
||||
if buffer.len() < needed {
|
||||
continue;
|
||||
}
|
||||
let mut packet = buffer.split_to(needed);
|
||||
packet.advance(4);
|
||||
packet.advance(6);
|
||||
match IdmPacket::decode(packet) {
|
||||
Ok(packet) => {
|
||||
let _ = client_or_create(domid, &self.tx_sender, &self.clients, &self.feeds).await?;
|
||||
@ -159,12 +165,14 @@ impl DaemonIdm {
|
||||
x = self.tx_receiver.recv() => match x {
|
||||
Some((domid, packet)) => {
|
||||
let data = packet.encode_to_vec();
|
||||
let mut buffer = vec![0u8; 4];
|
||||
let mut buffer = vec![0u8; 6];
|
||||
let length = data.len() as u32;
|
||||
buffer[0] = length as u8;
|
||||
buffer[1] = (length << 8) as u8;
|
||||
buffer[2] = (length << 16) as u8;
|
||||
buffer[3] = (length << 24) as u8;
|
||||
buffer[0] = 0xff;
|
||||
buffer[1] = 0xff;
|
||||
buffer[2] = length as u8;
|
||||
buffer[3] = (length << 8) as u8;
|
||||
buffer[4] = (length << 16) as u8;
|
||||
buffer[5] = (length << 24) as u8;
|
||||
buffer.extend_from_slice(&data);
|
||||
self.tx_raw_sender.send((domid, buffer)).await?;
|
||||
let _ = self.snoop_sender.send(DaemonIdmSnoopPacket { from: 0, to: domid, packet });
|
||||
|
@ -69,6 +69,14 @@ impl IdmBackend for IdmFileBackend {
|
||||
async fn recv(&mut self) -> Result<IdmPacket> {
|
||||
let mut fd = self.read_fd.lock().await;
|
||||
let mut guard = fd.readable_mut().await?;
|
||||
let b1 = guard.get_inner_mut().read_u8().await?;
|
||||
if b1 != 0xff {
|
||||
return Ok(IdmPacket::default());
|
||||
}
|
||||
let b2 = guard.get_inner_mut().read_u8().await?;
|
||||
if b2 != 0xff {
|
||||
return Ok(IdmPacket::default());
|
||||
}
|
||||
let size = guard.get_inner_mut().read_u32_le().await?;
|
||||
if size == 0 {
|
||||
return Ok(IdmPacket::default());
|
||||
@ -84,6 +92,7 @@ impl IdmBackend for IdmFileBackend {
|
||||
async fn send(&mut self, packet: IdmPacket) -> Result<()> {
|
||||
let mut file = self.write.lock().await;
|
||||
let data = packet.encode_to_vec();
|
||||
file.write_all(&[0xff, 0xff]).await?;
|
||||
file.write_u32_le(data.len() as u32).await?;
|
||||
file.write_all(&data).await?;
|
||||
Ok(())
|
||||
|
@ -448,6 +448,10 @@ impl KrataChannelBackendProcessor {
|
||||
error!("channel for domid {} has an invalid input space of {}", self.domid, space);
|
||||
}
|
||||
let free = XenConsoleInterface::INPUT_SIZE.wrapping_sub(space);
|
||||
if free == 0 {
|
||||
sleep(Duration::from_micros(100)).await;
|
||||
continue;
|
||||
}
|
||||
let want = data.len().min(free);
|
||||
let buffer = &data[index..want];
|
||||
for b in buffer {
|
||||
|
Reference in New Issue
Block a user