mirror of
https://github.com/edera-dev/krata.git
synced 2025-08-03 13:11:31 +00:00
krata: reimplement console to utilize channels, and provide logs support
This commit is contained in:
@ -13,6 +13,7 @@ anyhow = { workspace = true }
|
||||
async-stream = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
bytes = { workspace = true }
|
||||
circular-buffer = { workspace = true }
|
||||
clap = { workspace = true }
|
||||
env_logger = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
|
147
crates/daemon/src/console.rs
Normal file
147
crates/daemon/src/console.rs
Normal file
@ -0,0 +1,147 @@
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use anyhow::Result;
|
||||
use circular_buffer::CircularBuffer;
|
||||
use kratart::channel::ChannelService;
|
||||
use log::error;
|
||||
use tokio::{
|
||||
sync::{
|
||||
mpsc::{error::TrySendError, Receiver, Sender},
|
||||
Mutex,
|
||||
},
|
||||
task::JoinHandle,
|
||||
};
|
||||
|
||||
const CONSOLE_BUFFER_SIZE: usize = 1024 * 1024;
|
||||
type RawConsoleBuffer = CircularBuffer<CONSOLE_BUFFER_SIZE, u8>;
|
||||
type ConsoleBuffer = Box<RawConsoleBuffer>;
|
||||
|
||||
type ListenerMap = Arc<Mutex<HashMap<u32, Vec<Sender<Vec<u8>>>>>>;
|
||||
type BufferMap = Arc<Mutex<HashMap<u32, ConsoleBuffer>>>;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct DaemonConsoleHandle {
|
||||
listeners: ListenerMap,
|
||||
buffers: BufferMap,
|
||||
sender: Sender<(u32, Vec<u8>)>,
|
||||
task: Arc<JoinHandle<()>>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct DaemonConsoleAttachHandle {
|
||||
pub initial: Vec<u8>,
|
||||
listeners: ListenerMap,
|
||||
sender: Sender<(u32, Vec<u8>)>,
|
||||
domid: u32,
|
||||
}
|
||||
|
||||
impl DaemonConsoleAttachHandle {
|
||||
pub async fn unsubscribe(&self) -> Result<()> {
|
||||
let mut guard = self.listeners.lock().await;
|
||||
let _ = guard.remove(&self.domid);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn send(&self, data: Vec<u8>) -> Result<()> {
|
||||
Ok(self.sender.send((self.domid, data)).await?)
|
||||
}
|
||||
}
|
||||
|
||||
impl DaemonConsoleHandle {
|
||||
pub async fn attach(
|
||||
&self,
|
||||
domid: u32,
|
||||
sender: Sender<Vec<u8>>,
|
||||
) -> Result<DaemonConsoleAttachHandle> {
|
||||
let buffers = self.buffers.lock().await;
|
||||
let buffer = buffers.get(&domid).map(|x| x.to_vec()).unwrap_or_default();
|
||||
drop(buffers);
|
||||
let mut listeners = self.listeners.lock().await;
|
||||
let senders = listeners.entry(domid).or_default();
|
||||
senders.push(sender);
|
||||
Ok(DaemonConsoleAttachHandle {
|
||||
initial: buffer,
|
||||
sender: self.sender.clone(),
|
||||
listeners: self.listeners.clone(),
|
||||
domid,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for DaemonConsoleHandle {
|
||||
fn drop(&mut self) {
|
||||
if Arc::strong_count(&self.task) <= 1 {
|
||||
self.task.abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DaemonConsole {
|
||||
listeners: ListenerMap,
|
||||
buffers: BufferMap,
|
||||
receiver: Receiver<(u32, Vec<u8>)>,
|
||||
sender: Sender<(u32, Vec<u8>)>,
|
||||
task: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl DaemonConsole {
|
||||
pub async fn new() -> Result<DaemonConsole> {
|
||||
let (service, sender, receiver) =
|
||||
ChannelService::new("krata-console".to_string(), Some(0)).await?;
|
||||
let task = service.launch().await?;
|
||||
let listeners = Arc::new(Mutex::new(HashMap::new()));
|
||||
let buffers = Arc::new(Mutex::new(HashMap::new()));
|
||||
Ok(DaemonConsole {
|
||||
listeners,
|
||||
buffers,
|
||||
receiver,
|
||||
sender,
|
||||
task,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn launch(mut self) -> Result<DaemonConsoleHandle> {
|
||||
let listeners = self.listeners.clone();
|
||||
let buffers = self.buffers.clone();
|
||||
let sender = self.sender.clone();
|
||||
let task = tokio::task::spawn(async move {
|
||||
if let Err(error) = self.process().await {
|
||||
error!("failed to process console: {}", error);
|
||||
}
|
||||
});
|
||||
Ok(DaemonConsoleHandle {
|
||||
listeners,
|
||||
buffers,
|
||||
sender,
|
||||
task: Arc::new(task),
|
||||
})
|
||||
}
|
||||
|
||||
async fn process(&mut self) -> Result<()> {
|
||||
loop {
|
||||
let Some((domid, data)) = self.receiver.recv().await else {
|
||||
break;
|
||||
};
|
||||
|
||||
let mut buffers = self.buffers.lock().await;
|
||||
let buffer = buffers
|
||||
.entry(domid)
|
||||
.or_insert_with_key(|_| RawConsoleBuffer::boxed());
|
||||
buffer.extend_from_slice(&data);
|
||||
drop(buffers);
|
||||
let mut listeners = self.listeners.lock().await;
|
||||
if let Some(senders) = listeners.get_mut(&domid) {
|
||||
senders.retain(|sender| {
|
||||
!matches!(sender.try_send(data.to_vec()), Err(TrySendError::Closed(_)))
|
||||
});
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for DaemonConsole {
|
||||
fn drop(&mut self) {
|
||||
self.task.abort();
|
||||
}
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
use std::{io, pin::Pin, str::FromStr};
|
||||
use std::{pin::Pin, str::FromStr};
|
||||
|
||||
use async_stream::try_stream;
|
||||
use futures::Stream;
|
||||
@ -11,17 +11,15 @@ use krata::v1::{
|
||||
WatchEventsReply, WatchEventsRequest,
|
||||
},
|
||||
};
|
||||
use kratart::Runtime;
|
||||
use tokio::{
|
||||
io::{AsyncReadExt, AsyncWriteExt},
|
||||
select,
|
||||
sync::mpsc::Sender,
|
||||
sync::mpsc::{channel, Sender},
|
||||
};
|
||||
use tokio_stream::StreamExt;
|
||||
use tonic::{Request, Response, Status, Streaming};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{db::GuestStore, event::DaemonEventContext};
|
||||
use crate::{console::DaemonConsoleHandle, db::GuestStore, event::DaemonEventContext};
|
||||
|
||||
pub struct ApiError {
|
||||
message: String,
|
||||
@ -44,7 +42,7 @@ impl From<ApiError> for Status {
|
||||
#[derive(Clone)]
|
||||
pub struct RuntimeControlService {
|
||||
events: DaemonEventContext,
|
||||
runtime: Runtime,
|
||||
console: DaemonConsoleHandle,
|
||||
guests: GuestStore,
|
||||
guest_reconciler_notify: Sender<Uuid>,
|
||||
}
|
||||
@ -52,13 +50,13 @@ pub struct RuntimeControlService {
|
||||
impl RuntimeControlService {
|
||||
pub fn new(
|
||||
events: DaemonEventContext,
|
||||
runtime: Runtime,
|
||||
console: DaemonConsoleHandle,
|
||||
guests: GuestStore,
|
||||
guest_reconciler_notify: Sender<Uuid>,
|
||||
) -> Self {
|
||||
Self {
|
||||
events,
|
||||
runtime,
|
||||
console,
|
||||
guests,
|
||||
guest_reconciler_notify,
|
||||
}
|
||||
@ -66,7 +64,7 @@ impl RuntimeControlService {
|
||||
}
|
||||
|
||||
enum ConsoleDataSelect {
|
||||
Read(io::Result<usize>),
|
||||
Read(Option<Vec<u8>>),
|
||||
Write(Option<Result<ConsoleDataRequest, tonic::Status>>),
|
||||
}
|
||||
|
||||
@ -200,27 +198,64 @@ impl ControlService for RuntimeControlService {
|
||||
let uuid = Uuid::from_str(&request.guest_id).map_err(|error| ApiError {
|
||||
message: error.to_string(),
|
||||
})?;
|
||||
let mut console = self.runtime.console(uuid).await.map_err(ApiError::from)?;
|
||||
let guest = self
|
||||
.guests
|
||||
.read(uuid)
|
||||
.await
|
||||
.map_err(|error| ApiError {
|
||||
message: error.to_string(),
|
||||
})?
|
||||
.ok_or_else(|| ApiError {
|
||||
message: "guest did not exist in the database".to_string(),
|
||||
})?;
|
||||
|
||||
let Some(ref state) = guest.state else {
|
||||
return Err(ApiError {
|
||||
message: "guest did not have state".to_string(),
|
||||
}
|
||||
.into());
|
||||
};
|
||||
|
||||
let domid = state.domid;
|
||||
if domid == 0 {
|
||||
return Err(ApiError {
|
||||
message: "invalid domid on the guest".to_string(),
|
||||
}
|
||||
.into());
|
||||
}
|
||||
|
||||
let (sender, mut receiver) = channel(100);
|
||||
let console = self
|
||||
.console
|
||||
.attach(domid, sender)
|
||||
.await
|
||||
.map_err(|error| ApiError {
|
||||
message: format!("failed to attach to console: {}", error),
|
||||
})?;
|
||||
|
||||
let output = try_stream! {
|
||||
let mut buffer: Vec<u8> = vec![0u8; 256];
|
||||
yield ConsoleDataReply { data: console.initial.clone(), };
|
||||
loop {
|
||||
let what = select! {
|
||||
x = console.read_handle.read(&mut buffer) => ConsoleDataSelect::Read(x),
|
||||
x = receiver.recv() => ConsoleDataSelect::Read(x),
|
||||
x = input.next() => ConsoleDataSelect::Write(x),
|
||||
};
|
||||
|
||||
match what {
|
||||
ConsoleDataSelect::Read(result) => {
|
||||
let size = result?;
|
||||
let data = buffer[0..size].to_vec();
|
||||
ConsoleDataSelect::Read(Some(data)) => {
|
||||
yield ConsoleDataReply { data, };
|
||||
},
|
||||
|
||||
ConsoleDataSelect::Read(None) => {
|
||||
break;
|
||||
}
|
||||
|
||||
ConsoleDataSelect::Write(Some(request)) => {
|
||||
let request = request?;
|
||||
if !request.data.is_empty() {
|
||||
console.write_handle.write_all(&request.data).await?;
|
||||
console.send(request.data).await.map_err(|error| ApiError {
|
||||
message: error.to_string(),
|
||||
})?;
|
||||
}
|
||||
},
|
||||
|
||||
|
@ -67,7 +67,7 @@ pub struct DaemonIdm {
|
||||
|
||||
impl DaemonIdm {
|
||||
pub async fn new() -> Result<DaemonIdm> {
|
||||
let (service, receiver) = ChannelService::new("krata-channel".to_string()).await?;
|
||||
let (service, _, receiver) = ChannelService::new("krata-channel".to_string(), None).await?;
|
||||
let task = service.launch().await?;
|
||||
let listeners = Arc::new(Mutex::new(HashMap::new()));
|
||||
Ok(DaemonIdm {
|
||||
|
@ -1,6 +1,7 @@
|
||||
use std::{net::SocketAddr, path::PathBuf, str::FromStr};
|
||||
|
||||
use anyhow::Result;
|
||||
use console::{DaemonConsole, DaemonConsoleHandle};
|
||||
use control::RuntimeControlService;
|
||||
use db::GuestStore;
|
||||
use event::{DaemonEventContext, DaemonEventGenerator};
|
||||
@ -18,6 +19,7 @@ use tokio_stream::wrappers::UnixListenerStream;
|
||||
use tonic::transport::{Identity, Server, ServerTlsConfig};
|
||||
use uuid::Uuid;
|
||||
|
||||
pub mod console;
|
||||
pub mod control;
|
||||
pub mod db;
|
||||
pub mod event;
|
||||
@ -26,13 +28,13 @@ pub mod reconcile;
|
||||
|
||||
pub struct Daemon {
|
||||
store: String,
|
||||
runtime: Runtime,
|
||||
guests: GuestStore,
|
||||
events: DaemonEventContext,
|
||||
guest_reconciler_task: JoinHandle<()>,
|
||||
guest_reconciler_notify: Sender<Uuid>,
|
||||
generator_task: JoinHandle<()>,
|
||||
_idm: DaemonIdmHandle,
|
||||
console: DaemonConsoleHandle,
|
||||
}
|
||||
|
||||
const GUEST_RECONCILER_QUEUE_LEN: usize = 1000;
|
||||
@ -45,6 +47,8 @@ impl Daemon {
|
||||
channel::<Uuid>(GUEST_RECONCILER_QUEUE_LEN);
|
||||
let idm = DaemonIdm::new().await?;
|
||||
let idm = idm.launch().await?;
|
||||
let console = DaemonConsole::new().await?;
|
||||
let console = console.launch().await?;
|
||||
let (events, generator) =
|
||||
DaemonEventGenerator::new(guests.clone(), guest_reconciler_notify.clone(), idm.clone())
|
||||
.await?;
|
||||
@ -60,20 +64,20 @@ impl Daemon {
|
||||
let generator_task = generator.launch().await?;
|
||||
Ok(Self {
|
||||
store,
|
||||
runtime,
|
||||
guests,
|
||||
events,
|
||||
guest_reconciler_task,
|
||||
guest_reconciler_notify,
|
||||
generator_task,
|
||||
_idm: idm,
|
||||
console,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn listen(&mut self, addr: ControlDialAddress) -> Result<()> {
|
||||
let control_service = RuntimeControlService::new(
|
||||
self.events.clone(),
|
||||
self.runtime.clone(),
|
||||
self.console.clone(),
|
||||
self.guests.clone(),
|
||||
self.guest_reconciler_notify.clone(),
|
||||
);
|
||||
|
Reference in New Issue
Block a user