diff --git a/Cargo.lock b/Cargo.lock index 539e96d..bf12917 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -340,6 +340,12 @@ dependencies = [ "thiserror", ] +[[package]] +name = "circular-buffer" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da987586004ae7c43b7df5e3f7693775068522e1086f8d9b2d74c778a0f43313" + [[package]] name = "clang-sys" version = "1.7.0" @@ -1293,6 +1299,7 @@ dependencies = [ "async-stream", "async-trait", "bytes", + "circular-buffer", "clap", "env_logger", "futures", diff --git a/Cargo.toml b/Cargo.toml index d2cd2dc..fd2fd0a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ backhand = "0.15.0" byteorder = "1" bytes = "1.5.0" cgroups-rs = "0.3.4" +circular-buffer = "0.1.7" comfy-table = "7.1.0" crossterm = "0.27.0" ctrlc = "3.4.4" diff --git a/crates/ctl/src/cli/logs.rs b/crates/ctl/src/cli/logs.rs new file mode 100644 index 0000000..17b9e9f --- /dev/null +++ b/crates/ctl/src/cli/logs.rs @@ -0,0 +1,57 @@ +use anyhow::Result; +use async_stream::stream; +use clap::Parser; +use krata::{ + events::EventStream, + v1::control::{control_service_client::ControlServiceClient, ConsoleDataRequest}, +}; + +use tokio::select; +use tokio_stream::{pending, StreamExt}; +use tonic::transport::Channel; + +use crate::console::StdioConsoleStream; + +use super::resolve_guest; + +#[derive(Parser)] +pub struct LogsCommand { + #[arg()] + guest: String, + #[arg(short, long)] + follow: bool, +} + +impl LogsCommand { + pub async fn run( + self, + mut client: ControlServiceClient, + events: EventStream, + ) -> Result<()> { + let guest_id: String = resolve_guest(&mut client, &self.guest).await?; + let guest_id_stream = guest_id.clone(); + let follow = self.follow; + let input = stream! { + yield ConsoleDataRequest { guest_id: guest_id_stream, data: Vec::new() }; + if follow { + let mut pending = pending::(); + while let Some(x) = pending.next().await { + yield x; + } + } + }; + let output = client.console_data(input).await?.into_inner(); + let stdout_handle = + tokio::task::spawn(async move { StdioConsoleStream::stdout(output).await }); + let exit_hook_task = StdioConsoleStream::guest_exit_hook(guest_id.clone(), events).await?; + let code = select! { + x = stdout_handle => { + x??; + None + }, + x = exit_hook_task => x? + }; + StdioConsoleStream::restore_terminal_mode(); + std::process::exit(code.unwrap_or(0)); + } +} diff --git a/crates/ctl/src/cli/mod.rs b/crates/ctl/src/cli/mod.rs index 7c9fdde..26fea9e 100644 --- a/crates/ctl/src/cli/mod.rs +++ b/crates/ctl/src/cli/mod.rs @@ -2,6 +2,7 @@ pub mod attach; pub mod destroy; pub mod launch; pub mod list; +pub mod logs; pub mod resolve; pub mod watch; @@ -16,7 +17,7 @@ use tonic::{transport::Channel, Request}; use self::{ attach::AttachCommand, destroy::DestroyCommand, launch::LauchCommand, list::ListCommand, - resolve::ResolveCommand, watch::WatchCommand, + logs::LogsCommand, resolve::ResolveCommand, watch::WatchCommand, }; #[derive(Parser)] @@ -35,6 +36,7 @@ pub enum Commands { Destroy(DestroyCommand), List(ListCommand), Attach(AttachCommand), + Logs(LogsCommand), Watch(WatchCommand), Resolve(ResolveCommand), } @@ -57,6 +59,10 @@ impl ControlCommand { attach.run(client, events).await?; } + Commands::Logs(logs) => { + logs.run(client, events).await?; + } + Commands::List(list) => { list.run(client, events).await?; } diff --git a/crates/daemon/Cargo.toml b/crates/daemon/Cargo.toml index bc1770f..2d3a3d6 100644 --- a/crates/daemon/Cargo.toml +++ b/crates/daemon/Cargo.toml @@ -13,6 +13,7 @@ anyhow = { workspace = true } async-stream = { workspace = true } async-trait = { workspace = true } bytes = { workspace = true } +circular-buffer = { workspace = true } clap = { workspace = true } env_logger = { workspace = true } futures = { workspace = true } diff --git a/crates/daemon/src/console.rs b/crates/daemon/src/console.rs new file mode 100644 index 0000000..a6e57bf --- /dev/null +++ b/crates/daemon/src/console.rs @@ -0,0 +1,147 @@ +use std::{collections::HashMap, sync::Arc}; + +use anyhow::Result; +use circular_buffer::CircularBuffer; +use kratart::channel::ChannelService; +use log::error; +use tokio::{ + sync::{ + mpsc::{error::TrySendError, Receiver, Sender}, + Mutex, + }, + task::JoinHandle, +}; + +const CONSOLE_BUFFER_SIZE: usize = 1024 * 1024; +type RawConsoleBuffer = CircularBuffer; +type ConsoleBuffer = Box; + +type ListenerMap = Arc>>>>>; +type BufferMap = Arc>>; + +#[derive(Clone)] +pub struct DaemonConsoleHandle { + listeners: ListenerMap, + buffers: BufferMap, + sender: Sender<(u32, Vec)>, + task: Arc>, +} + +#[derive(Clone)] +pub struct DaemonConsoleAttachHandle { + pub initial: Vec, + listeners: ListenerMap, + sender: Sender<(u32, Vec)>, + domid: u32, +} + +impl DaemonConsoleAttachHandle { + pub async fn unsubscribe(&self) -> Result<()> { + let mut guard = self.listeners.lock().await; + let _ = guard.remove(&self.domid); + Ok(()) + } + + pub async fn send(&self, data: Vec) -> Result<()> { + Ok(self.sender.send((self.domid, data)).await?) + } +} + +impl DaemonConsoleHandle { + pub async fn attach( + &self, + domid: u32, + sender: Sender>, + ) -> Result { + let buffers = self.buffers.lock().await; + let buffer = buffers.get(&domid).map(|x| x.to_vec()).unwrap_or_default(); + drop(buffers); + let mut listeners = self.listeners.lock().await; + let senders = listeners.entry(domid).or_default(); + senders.push(sender); + Ok(DaemonConsoleAttachHandle { + initial: buffer, + sender: self.sender.clone(), + listeners: self.listeners.clone(), + domid, + }) + } +} + +impl Drop for DaemonConsoleHandle { + fn drop(&mut self) { + if Arc::strong_count(&self.task) <= 1 { + self.task.abort(); + } + } +} + +pub struct DaemonConsole { + listeners: ListenerMap, + buffers: BufferMap, + receiver: Receiver<(u32, Vec)>, + sender: Sender<(u32, Vec)>, + task: JoinHandle<()>, +} + +impl DaemonConsole { + pub async fn new() -> Result { + let (service, sender, receiver) = + ChannelService::new("krata-console".to_string(), Some(0)).await?; + let task = service.launch().await?; + let listeners = Arc::new(Mutex::new(HashMap::new())); + let buffers = Arc::new(Mutex::new(HashMap::new())); + Ok(DaemonConsole { + listeners, + buffers, + receiver, + sender, + task, + }) + } + + pub async fn launch(mut self) -> Result { + let listeners = self.listeners.clone(); + let buffers = self.buffers.clone(); + let sender = self.sender.clone(); + let task = tokio::task::spawn(async move { + if let Err(error) = self.process().await { + error!("failed to process console: {}", error); + } + }); + Ok(DaemonConsoleHandle { + listeners, + buffers, + sender, + task: Arc::new(task), + }) + } + + async fn process(&mut self) -> Result<()> { + loop { + let Some((domid, data)) = self.receiver.recv().await else { + break; + }; + + let mut buffers = self.buffers.lock().await; + let buffer = buffers + .entry(domid) + .or_insert_with_key(|_| RawConsoleBuffer::boxed()); + buffer.extend_from_slice(&data); + drop(buffers); + let mut listeners = self.listeners.lock().await; + if let Some(senders) = listeners.get_mut(&domid) { + senders.retain(|sender| { + !matches!(sender.try_send(data.to_vec()), Err(TrySendError::Closed(_))) + }); + } + } + Ok(()) + } +} + +impl Drop for DaemonConsole { + fn drop(&mut self) { + self.task.abort(); + } +} diff --git a/crates/daemon/src/control.rs b/crates/daemon/src/control.rs index 1df740d..3883f83 100644 --- a/crates/daemon/src/control.rs +++ b/crates/daemon/src/control.rs @@ -1,4 +1,4 @@ -use std::{io, pin::Pin, str::FromStr}; +use std::{pin::Pin, str::FromStr}; use async_stream::try_stream; use futures::Stream; @@ -11,17 +11,15 @@ use krata::v1::{ WatchEventsReply, WatchEventsRequest, }, }; -use kratart::Runtime; use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, select, - sync::mpsc::Sender, + sync::mpsc::{channel, Sender}, }; use tokio_stream::StreamExt; use tonic::{Request, Response, Status, Streaming}; use uuid::Uuid; -use crate::{db::GuestStore, event::DaemonEventContext}; +use crate::{console::DaemonConsoleHandle, db::GuestStore, event::DaemonEventContext}; pub struct ApiError { message: String, @@ -44,7 +42,7 @@ impl From for Status { #[derive(Clone)] pub struct RuntimeControlService { events: DaemonEventContext, - runtime: Runtime, + console: DaemonConsoleHandle, guests: GuestStore, guest_reconciler_notify: Sender, } @@ -52,13 +50,13 @@ pub struct RuntimeControlService { impl RuntimeControlService { pub fn new( events: DaemonEventContext, - runtime: Runtime, + console: DaemonConsoleHandle, guests: GuestStore, guest_reconciler_notify: Sender, ) -> Self { Self { events, - runtime, + console, guests, guest_reconciler_notify, } @@ -66,7 +64,7 @@ impl RuntimeControlService { } enum ConsoleDataSelect { - Read(io::Result), + Read(Option>), Write(Option>), } @@ -200,27 +198,64 @@ impl ControlService for RuntimeControlService { let uuid = Uuid::from_str(&request.guest_id).map_err(|error| ApiError { message: error.to_string(), })?; - let mut console = self.runtime.console(uuid).await.map_err(ApiError::from)?; + let guest = self + .guests + .read(uuid) + .await + .map_err(|error| ApiError { + message: error.to_string(), + })? + .ok_or_else(|| ApiError { + message: "guest did not exist in the database".to_string(), + })?; + + let Some(ref state) = guest.state else { + return Err(ApiError { + message: "guest did not have state".to_string(), + } + .into()); + }; + + let domid = state.domid; + if domid == 0 { + return Err(ApiError { + message: "invalid domid on the guest".to_string(), + } + .into()); + } + + let (sender, mut receiver) = channel(100); + let console = self + .console + .attach(domid, sender) + .await + .map_err(|error| ApiError { + message: format!("failed to attach to console: {}", error), + })?; let output = try_stream! { - let mut buffer: Vec = vec![0u8; 256]; + yield ConsoleDataReply { data: console.initial.clone(), }; loop { let what = select! { - x = console.read_handle.read(&mut buffer) => ConsoleDataSelect::Read(x), + x = receiver.recv() => ConsoleDataSelect::Read(x), x = input.next() => ConsoleDataSelect::Write(x), }; match what { - ConsoleDataSelect::Read(result) => { - let size = result?; - let data = buffer[0..size].to_vec(); + ConsoleDataSelect::Read(Some(data)) => { yield ConsoleDataReply { data, }; }, + ConsoleDataSelect::Read(None) => { + break; + } + ConsoleDataSelect::Write(Some(request)) => { let request = request?; if !request.data.is_empty() { - console.write_handle.write_all(&request.data).await?; + console.send(request.data).await.map_err(|error| ApiError { + message: error.to_string(), + })?; } }, diff --git a/crates/daemon/src/idm.rs b/crates/daemon/src/idm.rs index 45c5a96..abca368 100644 --- a/crates/daemon/src/idm.rs +++ b/crates/daemon/src/idm.rs @@ -67,7 +67,7 @@ pub struct DaemonIdm { impl DaemonIdm { pub async fn new() -> Result { - let (service, receiver) = ChannelService::new("krata-channel".to_string()).await?; + let (service, _, receiver) = ChannelService::new("krata-channel".to_string(), None).await?; let task = service.launch().await?; let listeners = Arc::new(Mutex::new(HashMap::new())); Ok(DaemonIdm { diff --git a/crates/daemon/src/lib.rs b/crates/daemon/src/lib.rs index 167980e..ec68c92 100644 --- a/crates/daemon/src/lib.rs +++ b/crates/daemon/src/lib.rs @@ -1,6 +1,7 @@ use std::{net::SocketAddr, path::PathBuf, str::FromStr}; use anyhow::Result; +use console::{DaemonConsole, DaemonConsoleHandle}; use control::RuntimeControlService; use db::GuestStore; use event::{DaemonEventContext, DaemonEventGenerator}; @@ -18,6 +19,7 @@ use tokio_stream::wrappers::UnixListenerStream; use tonic::transport::{Identity, Server, ServerTlsConfig}; use uuid::Uuid; +pub mod console; pub mod control; pub mod db; pub mod event; @@ -26,13 +28,13 @@ pub mod reconcile; pub struct Daemon { store: String, - runtime: Runtime, guests: GuestStore, events: DaemonEventContext, guest_reconciler_task: JoinHandle<()>, guest_reconciler_notify: Sender, generator_task: JoinHandle<()>, _idm: DaemonIdmHandle, + console: DaemonConsoleHandle, } const GUEST_RECONCILER_QUEUE_LEN: usize = 1000; @@ -45,6 +47,8 @@ impl Daemon { channel::(GUEST_RECONCILER_QUEUE_LEN); let idm = DaemonIdm::new().await?; let idm = idm.launch().await?; + let console = DaemonConsole::new().await?; + let console = console.launch().await?; let (events, generator) = DaemonEventGenerator::new(guests.clone(), guest_reconciler_notify.clone(), idm.clone()) .await?; @@ -60,20 +64,20 @@ impl Daemon { let generator_task = generator.launch().await?; Ok(Self { store, - runtime, guests, events, guest_reconciler_task, guest_reconciler_notify, generator_task, _idm: idm, + console, }) } pub async fn listen(&mut self, addr: ControlDialAddress) -> Result<()> { let control_service = RuntimeControlService::new( self.events.clone(), - self.runtime.clone(), + self.console.clone(), self.guests.clone(), self.guest_reconciler_notify.clone(), ); diff --git a/crates/guest/src/init.rs b/crates/guest/src/init.rs index 6929b3b..09ba711 100644 --- a/crates/guest/src/init.rs +++ b/crates/guest/src/init.rs @@ -436,7 +436,7 @@ impl GuestInit { }; if launch.run.is_some() { - cmd = launch.run.as_ref().unwrap().clone(); + cmd.clone_from(launch.run.as_ref().unwrap()); } if let Some(entrypoint) = config.entrypoint() { diff --git a/crates/krata/src/idm/client.rs b/crates/krata/src/idm/client.rs index 919469f..a3c7096 100644 --- a/crates/krata/src/idm/client.rs +++ b/crates/krata/src/idm/client.rs @@ -3,7 +3,7 @@ use std::path::Path; use super::protocol::IdmPacket; use anyhow::{anyhow, Result}; use bytes::BytesMut; -use log::error; +use log::{debug, error}; use nix::sys::termios::{cfmakeraw, tcgetattr, tcsetattr, SetArg}; use prost::Message; use tokio::{ @@ -41,7 +41,7 @@ impl IdmClient { let (tx_sender, tx_receiver) = channel(IDM_PACKET_QUEUE_LEN); let task = tokio::task::spawn(async move { if let Err(error) = IdmClient::process(file, rx_sender, tx_receiver).await { - error!("failed to handle idm client processing: {}", error); + debug!("failed to handle idm client processing: {}", error); } }); Ok(IdmClient { diff --git a/crates/runtime/examples/channel.rs b/crates/runtime/examples/channel.rs index d99c1ea..9e81e1f 100644 --- a/crates/runtime/examples/channel.rs +++ b/crates/runtime/examples/channel.rs @@ -6,7 +6,7 @@ use kratart::channel::ChannelService; async fn main() -> Result<()> { env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); - let (service, mut receiver) = ChannelService::new("krata-channel".to_string()).await?; + let (service, _, mut receiver) = ChannelService::new("krata-channel".to_string(), None).await?; let task = service.launch().await?; loop { diff --git a/crates/runtime/src/channel.rs b/crates/runtime/src/channel.rs index b702f09..2af8c2d 100644 --- a/crates/runtime/src/channel.rs +++ b/crates/runtime/src/channel.rs @@ -41,6 +41,7 @@ impl XenConsoleInterface { pub struct ChannelService { typ: String, + use_reserved_ref: Option, backends: HashMap, evtchn: EventChannel, store: XsdClient, @@ -51,20 +52,29 @@ pub struct ChannelService { } impl ChannelService { - pub async fn new(typ: String) -> Result<(ChannelService, Receiver<(u32, Vec)>)> { + pub async fn new( + typ: String, + use_reserved_ref: Option, + ) -> Result<( + ChannelService, + Sender<(u32, Vec)>, + Receiver<(u32, Vec)>, + )> { let (input_sender, input_receiver) = channel(GROUPED_CHANNEL_QUEUE_LEN); let (output_sender, output_receiver) = channel(GROUPED_CHANNEL_QUEUE_LEN); Ok(( ChannelService { typ, + use_reserved_ref, backends: HashMap::new(), evtchn: EventChannel::open().await?, store: XsdClient::open().await?, gnttab: GrantTab::open()?, - input_sender, + input_sender: input_sender.clone(), input_receiver, output_sender, }, + input_sender, output_receiver, )) } @@ -148,6 +158,7 @@ impl ChannelService { self.evtchn.clone(), self.gnttab.clone(), self.output_sender.clone(), + self.use_reserved_ref, ) .await?; self.backends.insert(domid, backend); @@ -216,6 +227,7 @@ impl ChannelBackend { evtchn: EventChannel, gnttab: GrantTab, output_sender: Sender<(u32, Vec)>, + use_reserved_ref: Option, ) -> Result { let processor = KrataChannelBackendProcessor { backend, @@ -225,6 +237,7 @@ impl ChannelBackend { store, evtchn, gnttab, + use_reserved_ref, }; let (input_sender, input_receiver) = channel(SINGLE_CHANNEL_QUEUE_LEN); @@ -241,6 +254,7 @@ impl ChannelBackend { #[derive(Clone)] pub struct KrataChannelBackendProcessor { + use_reserved_ref: Option, backend: String, frontend: String, id: u32, @@ -347,7 +361,7 @@ impl KrataChannelBackendProcessor { return Err(anyhow!("frontend did not give ring-ref and port")); } - let Ok(ring_ref) = ring_ref.unwrap().parse::() else { + let Ok(mut ring_ref) = ring_ref.unwrap().parse::() else { return Err(anyhow!("frontend gave invalid ring-ref")); }; @@ -355,6 +369,8 @@ impl KrataChannelBackendProcessor { return Err(anyhow!("frontend gave invalid port")); }; + ring_ref = self.use_reserved_ref.unwrap_or(ring_ref); + break (ring_ref, port); } } diff --git a/crates/runtime/src/launch.rs b/crates/runtime/src/launch.rs index cfac3bb..dad8433 100644 --- a/crates/runtime/src/launch.rs +++ b/crates/runtime/src/launch.rs @@ -179,6 +179,7 @@ impl GuestLauncher { kernel_path: &context.kernel, initrd_path: &context.initrd, cmdline: &cmdline, + use_console_backend: Some("krata-console"), disks: vec![ DomainDisk { vdev: "xvda", diff --git a/crates/xen/xenclient/examples/boot.rs b/crates/xen/xenclient/examples/boot.rs index 6318278..b8357dc 100644 --- a/crates/xen/xenclient/examples/boot.rs +++ b/crates/xen/xenclient/examples/boot.rs @@ -22,6 +22,7 @@ async fn main() -> Result<()> { kernel_path: kernel_image_path.as_str(), initrd_path: initrd_path.as_str(), cmdline: "debug elevator=noop", + use_console_backend: None, disks: vec![], channels: vec![], vifs: vec![], diff --git a/crates/xen/xenclient/src/lib.rs b/crates/xen/xenclient/src/lib.rs index ead9607..dd7870b 100644 --- a/crates/xen/xenclient/src/lib.rs +++ b/crates/xen/xenclient/src/lib.rs @@ -89,6 +89,7 @@ pub struct DomainConfig<'a> { pub initrd_path: &'a str, pub cmdline: &'a str, pub disks: Vec>, + pub use_console_backend: Option<&'a str>, pub channels: Vec, pub vifs: Vec>, pub filesystems: Vec>, @@ -349,7 +350,10 @@ impl XenClient { } self.console_device_add( &DomainChannel { - typ: "xenconsoled".to_string(), + typ: config + .use_console_backend + .unwrap_or("xenconsoled") + .to_string(), initialized: true, }, &p2m, diff --git a/crates/xen/xenstore/src/sys.rs b/crates/xen/xenstore/src/sys.rs index e1a1a8b..f25e1df 100644 --- a/crates/xen/xenstore/src/sys.rs +++ b/crates/xen/xenstore/src/sys.rs @@ -1,7 +1,5 @@ /// Handwritten protocol definitions for XenStore. /// Used xen/include/public/io/xs_wire.h as a reference. -use libc; - use crate::error::Result; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use std::io::Cursor;