use std::{io, pin::Pin}; use async_stream::try_stream; use futures::Stream; use krata::control::{ control_service_server::ControlService, ConsoleDataReply, ConsoleDataRequest, DestroyGuestReply, DestroyGuestRequest, GuestInfo, LaunchGuestReply, LaunchGuestRequest, ListGuestsReply, ListGuestsRequest, WatchEventsReply, WatchEventsRequest, }; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, select, }; use tokio_stream::StreamExt; use tonic::{Request, Response, Status, Streaming}; use crate::event::DaemonEventContext; use kratart::{launch::GuestLaunchRequest, Runtime}; pub struct ApiError { message: String, } impl From for ApiError { fn from(value: anyhow::Error) -> Self { ApiError { message: value.to_string(), } } } impl From for Status { fn from(value: ApiError) -> Self { Status::unknown(value.message) } } #[derive(Clone)] pub struct RuntimeControlService { events: DaemonEventContext, runtime: Runtime, } impl RuntimeControlService { pub fn new(events: DaemonEventContext, runtime: Runtime) -> Self { Self { events, runtime } } } enum ConsoleDataSelect { Read(io::Result), Write(Option>), } #[tonic::async_trait] impl ControlService for RuntimeControlService { type ConsoleDataStream = Pin> + Send + 'static>>; type WatchEventsStream = Pin> + Send + 'static>>; async fn launch_guest( &self, request: Request, ) -> Result, Status> { let request = request.into_inner(); let guest: GuestInfo = convert_guest_info( self.runtime .launch(GuestLaunchRequest { image: &request.image, vcpus: request.vcpus, mem: request.mem, env: empty_vec_optional(request.env), run: empty_vec_optional(request.run), debug: false, }) .await .map_err(ApiError::from)?, ); Ok(Response::new(LaunchGuestReply { guest: Some(guest) })) } async fn destroy_guest( &self, request: Request, ) -> Result, Status> { let request = request.into_inner(); self.runtime .destroy(&request.guest_id) .await .map_err(ApiError::from)?; Ok(Response::new(DestroyGuestReply {})) } async fn list_guests( &self, request: Request, ) -> Result, Status> { let _ = request.into_inner(); let guests = self.runtime.list().await.map_err(ApiError::from)?; let guests = guests .into_iter() .map(convert_guest_info) .collect::>(); Ok(Response::new(ListGuestsReply { guests })) } async fn console_data( &self, request: Request>, ) -> Result, Status> { let mut input = request.into_inner(); let Some(request) = input.next().await else { return Err(ApiError { message: "expected to have at least one request".to_string(), } .into()); }; let request = request?; let mut console = self .runtime .console(&request.guest_id) .await .map_err(ApiError::from)?; let output = try_stream! { let mut buffer: Vec = vec![0u8; 256]; loop { let what = select! { x = console.read_handle.read(&mut buffer) => ConsoleDataSelect::Read(x), x = input.next() => ConsoleDataSelect::Write(x), }; match what { ConsoleDataSelect::Read(result) => { let size = result?; let data = buffer[0..size].to_vec(); yield ConsoleDataReply { data, }; }, ConsoleDataSelect::Write(Some(request)) => { let request = request?; if !request.data.is_empty() { console.write_handle.write_all(&request.data).await?; } }, ConsoleDataSelect::Write(None) => { break; } } } }; Ok(Response::new(Box::pin(output) as Self::ConsoleDataStream)) } async fn watch_events( &self, request: Request, ) -> Result, Status> { let _ = request.into_inner(); let mut events = self.events.subscribe(); let output = try_stream! { while let Ok(event) = events.recv().await { yield WatchEventsReply { event: Some(event), }; } }; Ok(Response::new(Box::pin(output) as Self::WatchEventsStream)) } } fn empty_vec_optional(value: Vec) -> Option> { if value.is_empty() { None } else { Some(value) } } fn convert_guest_info(value: kratart::GuestInfo) -> GuestInfo { GuestInfo { id: value.uuid.to_string(), image: value.image, ipv4: value.ipv4.map(|x| x.ip().to_string()).unwrap_or_default(), ipv6: value.ipv6.map(|x| x.ip().to_string()).unwrap_or_default(), } }