mirror of
https://github.com/edera-dev/krata.git
synced 2025-08-03 05:10:55 +00:00
feat: fast guest lookup table and host identification
This commit is contained in:
parent
4e0d843de7
commit
fd0abae5a9
22
crates/ctl/src/cli/identify_host.rs
Normal file
22
crates/ctl/src/cli/identify_host.rs
Normal file
@ -0,0 +1,22 @@
|
||||
use anyhow::Result;
|
||||
use clap::Parser;
|
||||
use krata::v1::control::{control_service_client::ControlServiceClient, IdentifyHostRequest};
|
||||
|
||||
use tonic::{transport::Channel, Request};
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command(about = "Identify information about the host")]
|
||||
pub struct IdentifyHostCommand {}
|
||||
|
||||
impl IdentifyHostCommand {
|
||||
pub async fn run(self, mut client: ControlServiceClient<Channel>) -> Result<()> {
|
||||
let response = client
|
||||
.identify_host(Request::new(IdentifyHostRequest {}))
|
||||
.await?
|
||||
.into_inner();
|
||||
println!("Host UUID: {}", response.host_uuid);
|
||||
println!("Host Domain: {}", response.host_domid);
|
||||
println!("Krata Version: {}", response.krata_version);
|
||||
Ok(())
|
||||
}
|
||||
}
|
@ -83,8 +83,8 @@ impl IdmSnoopCommand {
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct IdmSnoopLine {
|
||||
pub from: u32,
|
||||
pub to: u32,
|
||||
pub from: String,
|
||||
pub to: String,
|
||||
pub packet: IdmSnoopData,
|
||||
}
|
||||
|
||||
|
@ -1,5 +1,6 @@
|
||||
pub mod attach;
|
||||
pub mod destroy;
|
||||
pub mod identify_host;
|
||||
pub mod idm_snoop;
|
||||
pub mod launch;
|
||||
pub mod list;
|
||||
@ -20,9 +21,10 @@ use krata::{
|
||||
use tonic::{transport::Channel, Request};
|
||||
|
||||
use self::{
|
||||
attach::AttachCommand, destroy::DestroyCommand, idm_snoop::IdmSnoopCommand,
|
||||
launch::LauchCommand, list::ListCommand, logs::LogsCommand, metrics::MetricsCommand,
|
||||
pull::PullCommand, resolve::ResolveCommand, top::TopCommand, watch::WatchCommand,
|
||||
attach::AttachCommand, destroy::DestroyCommand, identify_host::IdentifyHostCommand,
|
||||
idm_snoop::IdmSnoopCommand, launch::LauchCommand, list::ListCommand, logs::LogsCommand,
|
||||
metrics::MetricsCommand, pull::PullCommand, resolve::ResolveCommand, top::TopCommand,
|
||||
watch::WatchCommand,
|
||||
};
|
||||
|
||||
#[derive(Parser)]
|
||||
@ -56,6 +58,7 @@ pub enum Commands {
|
||||
Metrics(MetricsCommand),
|
||||
IdmSnoop(IdmSnoopCommand),
|
||||
Top(TopCommand),
|
||||
IdentifyHost(IdentifyHostCommand),
|
||||
}
|
||||
|
||||
impl ControlCommand {
|
||||
@ -107,6 +110,10 @@ impl ControlCommand {
|
||||
Commands::Pull(pull) => {
|
||||
pull.run(client).await?;
|
||||
}
|
||||
|
||||
Commands::IdentifyHost(identify) => {
|
||||
identify.run(client).await?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
@ -1,21 +1,9 @@
|
||||
use anyhow::Result;
|
||||
use clap::Parser;
|
||||
use env_logger::Env;
|
||||
use krata::dial::ControlDialAddress;
|
||||
use kratad::Daemon;
|
||||
use kratad::command::DaemonCommand;
|
||||
use log::LevelFilter;
|
||||
use std::{
|
||||
str::FromStr,
|
||||
sync::{atomic::AtomicBool, Arc},
|
||||
};
|
||||
|
||||
#[derive(Parser)]
|
||||
struct DaemonCommand {
|
||||
#[arg(short, long, default_value = "unix:///var/lib/krata/daemon.socket")]
|
||||
listen: String,
|
||||
#[arg(short, long, default_value = "/var/lib/krata")]
|
||||
store: String,
|
||||
}
|
||||
use std::sync::{atomic::AtomicBool, Arc};
|
||||
|
||||
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
|
||||
async fn main() -> Result<()> {
|
||||
@ -24,12 +12,8 @@ async fn main() -> Result<()> {
|
||||
.init();
|
||||
mask_sighup()?;
|
||||
|
||||
let args = DaemonCommand::parse();
|
||||
let addr = ControlDialAddress::from_str(&args.listen)?;
|
||||
|
||||
let mut daemon = Daemon::new(args.store.clone()).await?;
|
||||
daemon.listen(addr).await?;
|
||||
Ok(())
|
||||
let command = DaemonCommand::parse();
|
||||
command.run().await
|
||||
}
|
||||
|
||||
fn mask_sighup() -> Result<()> {
|
||||
|
36
crates/daemon/src/command.rs
Normal file
36
crates/daemon/src/command.rs
Normal file
@ -0,0 +1,36 @@
|
||||
use anyhow::Result;
|
||||
use clap::{CommandFactory, Parser};
|
||||
use krata::dial::ControlDialAddress;
|
||||
use std::str::FromStr;
|
||||
|
||||
use crate::Daemon;
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command(version, about = "Krata hypervisor daemon")]
|
||||
pub struct DaemonCommand {
|
||||
#[arg(
|
||||
short,
|
||||
long,
|
||||
default_value = "unix:///var/lib/krata/daemon.socket",
|
||||
help = "Listen address"
|
||||
)]
|
||||
listen: String,
|
||||
#[arg(short, long, default_value = "/var/lib/krata", help = "Storage path")]
|
||||
store: String,
|
||||
}
|
||||
|
||||
impl DaemonCommand {
|
||||
pub async fn run(self) -> Result<()> {
|
||||
let addr = ControlDialAddress::from_str(&self.listen)?;
|
||||
let mut daemon = Daemon::new(self.store.clone()).await?;
|
||||
daemon.listen(addr).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn version() -> String {
|
||||
DaemonCommand::command()
|
||||
.get_version()
|
||||
.unwrap_or("unknown")
|
||||
.to_string()
|
||||
}
|
||||
}
|
@ -1,6 +1,6 @@
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use anyhow::Result;
|
||||
use anyhow::{anyhow, Result};
|
||||
use circular_buffer::CircularBuffer;
|
||||
use kratart::channel::ChannelService;
|
||||
use log::error;
|
||||
@ -11,6 +11,9 @@ use tokio::{
|
||||
},
|
||||
task::JoinHandle,
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::glt::GuestLookupTable;
|
||||
|
||||
const CONSOLE_BUFFER_SIZE: usize = 1024 * 1024;
|
||||
type RawConsoleBuffer = CircularBuffer<CONSOLE_BUFFER_SIZE, u8>;
|
||||
@ -21,6 +24,7 @@ type BufferMap = Arc<Mutex<HashMap<u32, ConsoleBuffer>>>;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct DaemonConsoleHandle {
|
||||
glt: GuestLookupTable,
|
||||
listeners: ListenerMap,
|
||||
buffers: BufferMap,
|
||||
sender: Sender<(u32, Vec<u8>)>,
|
||||
@ -50,9 +54,12 @@ impl DaemonConsoleAttachHandle {
|
||||
impl DaemonConsoleHandle {
|
||||
pub async fn attach(
|
||||
&self,
|
||||
domid: u32,
|
||||
uuid: Uuid,
|
||||
sender: Sender<Vec<u8>>,
|
||||
) -> Result<DaemonConsoleAttachHandle> {
|
||||
let Some(domid) = self.glt.lookup_domid_by_uuid(&uuid).await else {
|
||||
return Err(anyhow!("unable to find domain {}", uuid));
|
||||
};
|
||||
let buffers = self.buffers.lock().await;
|
||||
let buffer = buffers.get(&domid).map(|x| x.to_vec()).unwrap_or_default();
|
||||
drop(buffers);
|
||||
@ -77,6 +84,7 @@ impl Drop for DaemonConsoleHandle {
|
||||
}
|
||||
|
||||
pub struct DaemonConsole {
|
||||
glt: GuestLookupTable,
|
||||
listeners: ListenerMap,
|
||||
buffers: BufferMap,
|
||||
receiver: Receiver<(u32, Option<Vec<u8>>)>,
|
||||
@ -85,13 +93,14 @@ pub struct DaemonConsole {
|
||||
}
|
||||
|
||||
impl DaemonConsole {
|
||||
pub async fn new() -> Result<DaemonConsole> {
|
||||
pub async fn new(glt: GuestLookupTable) -> Result<DaemonConsole> {
|
||||
let (service, sender, receiver) =
|
||||
ChannelService::new("krata-console".to_string(), Some(0)).await?;
|
||||
let task = service.launch().await?;
|
||||
let listeners = Arc::new(Mutex::new(HashMap::new()));
|
||||
let buffers = Arc::new(Mutex::new(HashMap::new()));
|
||||
Ok(DaemonConsole {
|
||||
glt,
|
||||
listeners,
|
||||
buffers,
|
||||
receiver,
|
||||
@ -101,6 +110,7 @@ impl DaemonConsole {
|
||||
}
|
||||
|
||||
pub async fn launch(mut self) -> Result<DaemonConsoleHandle> {
|
||||
let glt = self.glt.clone();
|
||||
let listeners = self.listeners.clone();
|
||||
let buffers = self.buffers.clone();
|
||||
let sender = self.sender.clone();
|
||||
@ -110,6 +120,7 @@ impl DaemonConsole {
|
||||
}
|
||||
});
|
||||
Ok(DaemonConsoleHandle {
|
||||
glt,
|
||||
listeners,
|
||||
buffers,
|
||||
sender,
|
||||
|
@ -10,9 +10,10 @@ use krata::{
|
||||
control::{
|
||||
control_service_server::ControlService, ConsoleDataReply, ConsoleDataRequest,
|
||||
CreateGuestReply, CreateGuestRequest, DestroyGuestReply, DestroyGuestRequest,
|
||||
ListGuestsReply, ListGuestsRequest, PullImageReply, PullImageRequest,
|
||||
ReadGuestMetricsReply, ReadGuestMetricsRequest, ResolveGuestReply, ResolveGuestRequest,
|
||||
SnoopIdmReply, SnoopIdmRequest, WatchEventsReply, WatchEventsRequest,
|
||||
IdentifyHostReply, IdentifyHostRequest, ListGuestsReply, ListGuestsRequest,
|
||||
PullImageReply, PullImageRequest, ReadGuestMetricsReply, ReadGuestMetricsRequest,
|
||||
ResolveGuestReply, ResolveGuestRequest, SnoopIdmReply, SnoopIdmRequest,
|
||||
WatchEventsReply, WatchEventsRequest,
|
||||
},
|
||||
},
|
||||
};
|
||||
@ -32,7 +33,8 @@ use tonic::{Request, Response, Status, Streaming};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
console::DaemonConsoleHandle, db::GuestStore, event::DaemonEventContext, idm::DaemonIdmHandle,
|
||||
command::DaemonCommand, console::DaemonConsoleHandle, db::GuestStore,
|
||||
event::DaemonEventContext, glt::GuestLookupTable, idm::DaemonIdmHandle,
|
||||
metrics::idm_metric_to_api, oci::convert_oci_progress,
|
||||
};
|
||||
|
||||
@ -56,6 +58,7 @@ impl From<ApiError> for Status {
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct DaemonControlService {
|
||||
glt: GuestLookupTable,
|
||||
events: DaemonEventContext,
|
||||
console: DaemonConsoleHandle,
|
||||
idm: DaemonIdmHandle,
|
||||
@ -66,6 +69,7 @@ pub struct DaemonControlService {
|
||||
|
||||
impl DaemonControlService {
|
||||
pub fn new(
|
||||
glt: GuestLookupTable,
|
||||
events: DaemonEventContext,
|
||||
console: DaemonConsoleHandle,
|
||||
idm: DaemonIdmHandle,
|
||||
@ -74,6 +78,7 @@ impl DaemonControlService {
|
||||
packer: OciPackerService,
|
||||
) -> Self {
|
||||
Self {
|
||||
glt,
|
||||
events,
|
||||
console,
|
||||
idm,
|
||||
@ -108,6 +113,18 @@ impl ControlService for DaemonControlService {
|
||||
type SnoopIdmStream =
|
||||
Pin<Box<dyn Stream<Item = Result<SnoopIdmReply, Status>> + Send + 'static>>;
|
||||
|
||||
async fn identify_host(
|
||||
&self,
|
||||
request: Request<IdentifyHostRequest>,
|
||||
) -> Result<Response<IdentifyHostReply>, Status> {
|
||||
let _ = request.into_inner();
|
||||
Ok(Response::new(IdentifyHostReply {
|
||||
host_domid: self.glt.host_domid(),
|
||||
host_uuid: self.glt.host_uuid().to_string(),
|
||||
krata_version: DaemonCommand::version(),
|
||||
}))
|
||||
}
|
||||
|
||||
async fn create_guest(
|
||||
&self,
|
||||
request: Request<CreateGuestRequest>,
|
||||
@ -130,6 +147,7 @@ impl ControlService for DaemonControlService {
|
||||
network: None,
|
||||
exit_info: None,
|
||||
error_info: None,
|
||||
host: self.glt.host_uuid().to_string(),
|
||||
domid: u32::MAX,
|
||||
}),
|
||||
spec: Some(spec),
|
||||
@ -230,36 +248,10 @@ impl ControlService for DaemonControlService {
|
||||
let uuid = Uuid::from_str(&request.guest_id).map_err(|error| ApiError {
|
||||
message: error.to_string(),
|
||||
})?;
|
||||
let guest = self
|
||||
.guests
|
||||
.read(uuid)
|
||||
.await
|
||||
.map_err(|error| ApiError {
|
||||
message: error.to_string(),
|
||||
})?
|
||||
.ok_or_else(|| ApiError {
|
||||
message: "guest did not exist in the database".to_string(),
|
||||
})?;
|
||||
|
||||
let Some(ref state) = guest.state else {
|
||||
return Err(ApiError {
|
||||
message: "guest did not have state".to_string(),
|
||||
}
|
||||
.into());
|
||||
};
|
||||
|
||||
let domid = state.domid;
|
||||
if domid == 0 {
|
||||
return Err(ApiError {
|
||||
message: "invalid domid on the guest".to_string(),
|
||||
}
|
||||
.into());
|
||||
}
|
||||
|
||||
let (sender, mut receiver) = channel(100);
|
||||
let console = self
|
||||
.console
|
||||
.attach(domid, sender)
|
||||
.attach(uuid, sender)
|
||||
.await
|
||||
.map_err(|error| ApiError {
|
||||
message: format!("failed to attach to console: {}", error),
|
||||
@ -309,33 +301,7 @@ impl ControlService for DaemonControlService {
|
||||
let uuid = Uuid::from_str(&request.guest_id).map_err(|error| ApiError {
|
||||
message: error.to_string(),
|
||||
})?;
|
||||
let guest = self
|
||||
.guests
|
||||
.read(uuid)
|
||||
.await
|
||||
.map_err(|error| ApiError {
|
||||
message: error.to_string(),
|
||||
})?
|
||||
.ok_or_else(|| ApiError {
|
||||
message: "guest did not exist in the database".to_string(),
|
||||
})?;
|
||||
|
||||
let Some(ref state) = guest.state else {
|
||||
return Err(ApiError {
|
||||
message: "guest did not have state".to_string(),
|
||||
}
|
||||
.into());
|
||||
};
|
||||
|
||||
let domid = state.domid;
|
||||
if domid == 0 {
|
||||
return Err(ApiError {
|
||||
message: "invalid domid on the guest".to_string(),
|
||||
}
|
||||
.into());
|
||||
}
|
||||
|
||||
let client = self.idm.client(domid).await.map_err(|error| ApiError {
|
||||
let client = self.idm.client(uuid).await.map_err(|error| ApiError {
|
||||
message: error.to_string(),
|
||||
})?;
|
||||
|
||||
@ -448,9 +414,16 @@ impl ControlService for DaemonControlService {
|
||||
) -> Result<Response<Self::SnoopIdmStream>, Status> {
|
||||
let _ = request.into_inner();
|
||||
let mut messages = self.idm.snoop();
|
||||
let glt = self.glt.clone();
|
||||
let output = try_stream! {
|
||||
while let Ok(event) = messages.recv().await {
|
||||
yield SnoopIdmReply { from: event.from, to: event.to, packet: Some(event.packet) };
|
||||
let Some(from_uuid) = glt.lookup_uuid_by_domid(event.from).await else {
|
||||
continue;
|
||||
};
|
||||
let Some(to_uuid) = glt.lookup_uuid_by_domid(event.to).await else {
|
||||
continue;
|
||||
};
|
||||
yield SnoopIdmReply { from: from_uuid.to_string(), to: to_uuid.to_string(), packet: Some(event.packet) };
|
||||
}
|
||||
};
|
||||
Ok(Response::new(Box::pin(output) as Self::SnoopIdmStream))
|
||||
|
@ -93,7 +93,7 @@ impl DaemonEventGenerator {
|
||||
match status {
|
||||
GuestStatus::Started => {
|
||||
if let Entry::Vacant(e) = self.idms.entry(domid) {
|
||||
let client = self.idm.client(domid).await?;
|
||||
let client = self.idm.client_by_domid(domid).await?;
|
||||
let mut receiver = client.subscribe().await?;
|
||||
let sender = self.idm_sender.clone();
|
||||
let task = tokio::task::spawn(async move {
|
||||
@ -136,6 +136,7 @@ impl DaemonEventGenerator {
|
||||
network: guest.state.clone().unwrap_or_default().network,
|
||||
exit_info: Some(GuestExitInfo { code }),
|
||||
error_info: None,
|
||||
host: guest.state.clone().map(|x| x.host).unwrap_or_default(),
|
||||
domid: guest.state.clone().map(|x| x.domid).unwrap_or(u32::MAX),
|
||||
});
|
||||
|
||||
|
69
crates/daemon/src/glt.rs
Normal file
69
crates/daemon/src/glt.rs
Normal file
@ -0,0 +1,69 @@
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use tokio::sync::RwLock;
|
||||
use uuid::Uuid;
|
||||
|
||||
struct GuestLookupTableState {
|
||||
domid_to_uuid: HashMap<u32, Uuid>,
|
||||
uuid_to_domid: HashMap<Uuid, u32>,
|
||||
}
|
||||
|
||||
impl GuestLookupTableState {
|
||||
pub fn new(host_uuid: Uuid) -> Self {
|
||||
let mut domid_to_uuid = HashMap::new();
|
||||
let mut uuid_to_domid = HashMap::new();
|
||||
domid_to_uuid.insert(0, host_uuid);
|
||||
uuid_to_domid.insert(host_uuid, 0);
|
||||
GuestLookupTableState {
|
||||
domid_to_uuid,
|
||||
uuid_to_domid,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct GuestLookupTable {
|
||||
host_domid: u32,
|
||||
host_uuid: Uuid,
|
||||
state: Arc<RwLock<GuestLookupTableState>>,
|
||||
}
|
||||
|
||||
impl GuestLookupTable {
|
||||
pub fn new(host_domid: u32, host_uuid: Uuid) -> Self {
|
||||
GuestLookupTable {
|
||||
host_domid,
|
||||
host_uuid,
|
||||
state: Arc::new(RwLock::new(GuestLookupTableState::new(host_uuid))),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn host_uuid(&self) -> Uuid {
|
||||
self.host_uuid
|
||||
}
|
||||
|
||||
pub fn host_domid(&self) -> u32 {
|
||||
self.host_domid
|
||||
}
|
||||
|
||||
pub async fn lookup_uuid_by_domid(&self, domid: u32) -> Option<Uuid> {
|
||||
let state = self.state.read().await;
|
||||
state.domid_to_uuid.get(&domid).cloned()
|
||||
}
|
||||
|
||||
pub async fn lookup_domid_by_uuid(&self, uuid: &Uuid) -> Option<u32> {
|
||||
let state = self.state.read().await;
|
||||
state.uuid_to_domid.get(uuid).cloned()
|
||||
}
|
||||
|
||||
pub async fn associate(&self, uuid: Uuid, domid: u32) {
|
||||
let mut state = self.state.write().await;
|
||||
state.uuid_to_domid.insert(uuid, domid);
|
||||
state.domid_to_uuid.insert(domid, uuid);
|
||||
}
|
||||
|
||||
pub async fn remove(&self, uuid: Uuid, domid: u32) {
|
||||
let mut state = self.state.write().await;
|
||||
state.uuid_to_domid.remove(&uuid);
|
||||
state.domid_to_uuid.remove(&domid);
|
||||
}
|
||||
}
|
@ -22,12 +22,16 @@ use tokio::{
|
||||
},
|
||||
task::JoinHandle,
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::glt::GuestLookupTable;
|
||||
|
||||
type BackendFeedMap = Arc<Mutex<HashMap<u32, Sender<IdmTransportPacket>>>>;
|
||||
type ClientMap = Arc<Mutex<HashMap<u32, IdmInternalClient>>>;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct DaemonIdmHandle {
|
||||
glt: GuestLookupTable,
|
||||
clients: ClientMap,
|
||||
feeds: BackendFeedMap,
|
||||
tx_sender: Sender<(u32, IdmTransportPacket)>,
|
||||
@ -40,7 +44,14 @@ impl DaemonIdmHandle {
|
||||
self.snoop_sender.subscribe()
|
||||
}
|
||||
|
||||
pub async fn client(&self, domid: u32) -> Result<IdmInternalClient> {
|
||||
pub async fn client(&self, uuid: Uuid) -> Result<IdmInternalClient> {
|
||||
let Some(domid) = self.glt.lookup_domid_by_uuid(&uuid).await else {
|
||||
return Err(anyhow!("unable to find domain {}", uuid));
|
||||
};
|
||||
self.client_by_domid(domid).await
|
||||
}
|
||||
|
||||
pub async fn client_by_domid(&self, domid: u32) -> Result<IdmInternalClient> {
|
||||
client_or_create(domid, &self.tx_sender, &self.clients, &self.feeds).await
|
||||
}
|
||||
}
|
||||
@ -61,6 +72,7 @@ pub struct DaemonIdmSnoopPacket {
|
||||
}
|
||||
|
||||
pub struct DaemonIdm {
|
||||
glt: GuestLookupTable,
|
||||
clients: ClientMap,
|
||||
feeds: BackendFeedMap,
|
||||
tx_sender: Sender<(u32, IdmTransportPacket)>,
|
||||
@ -72,7 +84,7 @@ pub struct DaemonIdm {
|
||||
}
|
||||
|
||||
impl DaemonIdm {
|
||||
pub async fn new() -> Result<DaemonIdm> {
|
||||
pub async fn new(glt: GuestLookupTable) -> Result<DaemonIdm> {
|
||||
let (service, tx_raw_sender, rx_receiver) =
|
||||
ChannelService::new("krata-channel".to_string(), None).await?;
|
||||
let (tx_sender, tx_receiver) = channel(100);
|
||||
@ -81,6 +93,7 @@ impl DaemonIdm {
|
||||
let clients = Arc::new(Mutex::new(HashMap::new()));
|
||||
let feeds = Arc::new(Mutex::new(HashMap::new()));
|
||||
Ok(DaemonIdm {
|
||||
glt,
|
||||
rx_receiver,
|
||||
tx_receiver,
|
||||
tx_sender,
|
||||
@ -93,6 +106,7 @@ impl DaemonIdm {
|
||||
}
|
||||
|
||||
pub async fn launch(mut self) -> Result<DaemonIdmHandle> {
|
||||
let glt = self.glt.clone();
|
||||
let clients = self.clients.clone();
|
||||
let feeds = self.feeds.clone();
|
||||
let tx_sender = self.tx_sender.clone();
|
||||
@ -105,6 +119,7 @@ impl DaemonIdm {
|
||||
}
|
||||
});
|
||||
Ok(DaemonIdmHandle {
|
||||
glt,
|
||||
clients,
|
||||
feeds,
|
||||
tx_sender,
|
||||
|
@ -5,6 +5,7 @@ use console::{DaemonConsole, DaemonConsoleHandle};
|
||||
use control::DaemonControlService;
|
||||
use db::GuestStore;
|
||||
use event::{DaemonEventContext, DaemonEventGenerator};
|
||||
use glt::GuestLookupTable;
|
||||
use idm::{DaemonIdm, DaemonIdmHandle};
|
||||
use krata::{dial::ControlDialAddress, v1::control::control_service_server::ControlServiceServer};
|
||||
use krataoci::{packer::service::OciPackerService, registry::OciPlatform};
|
||||
@ -21,10 +22,12 @@ use tokio_stream::wrappers::UnixListenerStream;
|
||||
use tonic::transport::{Identity, Server, ServerTlsConfig};
|
||||
use uuid::Uuid;
|
||||
|
||||
pub mod command;
|
||||
pub mod console;
|
||||
pub mod control;
|
||||
pub mod db;
|
||||
pub mod event;
|
||||
pub mod glt;
|
||||
pub mod idm;
|
||||
pub mod metrics;
|
||||
pub mod oci;
|
||||
@ -32,6 +35,7 @@ pub mod reconcile;
|
||||
|
||||
pub struct Daemon {
|
||||
store: String,
|
||||
glt: GuestLookupTable,
|
||||
guests: GuestStore,
|
||||
events: DaemonEventContext,
|
||||
guest_reconciler_task: JoinHandle<()>,
|
||||
@ -51,22 +55,43 @@ impl Daemon {
|
||||
image_cache_dir.push("image");
|
||||
fs::create_dir_all(&image_cache_dir).await?;
|
||||
|
||||
let mut host_uuid_path = PathBuf::from(store.clone());
|
||||
host_uuid_path.push("host.uuid");
|
||||
let host_uuid = if host_uuid_path.is_file() {
|
||||
let content = fs::read_to_string(&host_uuid_path).await?;
|
||||
Uuid::from_str(content.trim()).ok()
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let host_uuid = if let Some(host_uuid) = host_uuid {
|
||||
host_uuid
|
||||
} else {
|
||||
let generated = Uuid::new_v4();
|
||||
let mut string = generated.to_string();
|
||||
string.push('\n');
|
||||
fs::write(&host_uuid_path, string).await?;
|
||||
generated
|
||||
};
|
||||
|
||||
let packer = OciPackerService::new(None, &image_cache_dir, OciPlatform::current()).await?;
|
||||
|
||||
let runtime = Runtime::new(store.clone()).await?;
|
||||
let glt = GuestLookupTable::new(0, host_uuid);
|
||||
let guests_db_path = format!("{}/guests.db", store);
|
||||
let guests = GuestStore::open(&PathBuf::from(guests_db_path))?;
|
||||
let (guest_reconciler_notify, guest_reconciler_receiver) =
|
||||
channel::<Uuid>(GUEST_RECONCILER_QUEUE_LEN);
|
||||
let idm = DaemonIdm::new().await?;
|
||||
let idm = DaemonIdm::new(glt.clone()).await?;
|
||||
let idm = idm.launch().await?;
|
||||
let console = DaemonConsole::new().await?;
|
||||
let console = DaemonConsole::new(glt.clone()).await?;
|
||||
let console = console.launch().await?;
|
||||
let (events, generator) =
|
||||
DaemonEventGenerator::new(guests.clone(), guest_reconciler_notify.clone(), idm.clone())
|
||||
.await?;
|
||||
let runtime_for_reconciler = runtime.dupe().await?;
|
||||
let guest_reconciler = GuestReconciler::new(
|
||||
glt.clone(),
|
||||
guests.clone(),
|
||||
events.clone(),
|
||||
runtime_for_reconciler,
|
||||
@ -79,6 +104,7 @@ impl Daemon {
|
||||
|
||||
Ok(Self {
|
||||
store,
|
||||
glt,
|
||||
guests,
|
||||
events,
|
||||
guest_reconciler_task,
|
||||
@ -92,6 +118,7 @@ impl Daemon {
|
||||
|
||||
pub async fn listen(&mut self, addr: ControlDialAddress) -> Result<()> {
|
||||
let control_service = DaemonControlService::new(
|
||||
self.glt.clone(),
|
||||
self.events.clone(),
|
||||
self.console.clone(),
|
||||
self.idm.clone(),
|
||||
|
@ -30,6 +30,7 @@ use uuid::Uuid;
|
||||
use crate::{
|
||||
db::GuestStore,
|
||||
event::{DaemonEvent, DaemonEventContext},
|
||||
glt::GuestLookupTable,
|
||||
};
|
||||
|
||||
const PARALLEL_LIMIT: u32 = 5;
|
||||
@ -53,6 +54,7 @@ impl Drop for GuestReconcilerEntry {
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct GuestReconciler {
|
||||
glt: GuestLookupTable,
|
||||
guests: GuestStore,
|
||||
events: DaemonEventContext,
|
||||
runtime: Runtime,
|
||||
@ -64,6 +66,7 @@ pub struct GuestReconciler {
|
||||
|
||||
impl GuestReconciler {
|
||||
pub fn new(
|
||||
glt: GuestLookupTable,
|
||||
guests: GuestStore,
|
||||
events: DaemonEventContext,
|
||||
runtime: Runtime,
|
||||
@ -71,6 +74,7 @@ impl GuestReconciler {
|
||||
guest_reconciler_notify: Sender<Uuid>,
|
||||
) -> Result<Self> {
|
||||
Ok(Self {
|
||||
glt,
|
||||
guests,
|
||||
events,
|
||||
runtime,
|
||||
@ -123,6 +127,23 @@ impl GuestReconciler {
|
||||
trace!("reconciling runtime");
|
||||
let runtime_guests = self.runtime.list().await?;
|
||||
let stored_guests = self.guests.list().await?;
|
||||
|
||||
let non_existent_guests = runtime_guests
|
||||
.iter()
|
||||
.filter(|x| !stored_guests.iter().any(|g| *g.0 == x.uuid))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for guest in non_existent_guests {
|
||||
warn!("destroying unknown runtime guest {}", guest.uuid);
|
||||
if let Err(error) = self.runtime.destroy(guest.uuid).await {
|
||||
error!(
|
||||
"failed to destroy unknown runtime guest {}: {}",
|
||||
guest.uuid, error
|
||||
);
|
||||
}
|
||||
self.guests.remove(guest.uuid).await?;
|
||||
}
|
||||
|
||||
for (uuid, mut stored_guest) in stored_guests {
|
||||
let previous_guest = stored_guest.clone();
|
||||
let runtime_guest = runtime_guests.iter().find(|x| x.uuid == uuid);
|
||||
@ -136,6 +157,7 @@ impl GuestReconciler {
|
||||
}
|
||||
|
||||
Some(runtime) => {
|
||||
self.glt.associate(uuid, runtime.domid).await;
|
||||
let mut state = stored_guest.state.as_mut().cloned().unwrap_or_default();
|
||||
if let Some(code) = runtime.state.exit_code {
|
||||
state.status = GuestStatus::Exited.into();
|
||||
@ -283,12 +305,14 @@ impl GuestReconciler {
|
||||
debug: false,
|
||||
})
|
||||
.await?;
|
||||
self.glt.associate(uuid, info.domid).await;
|
||||
info!("started guest {}", uuid);
|
||||
guest.state = Some(GuestState {
|
||||
status: GuestStatus::Started.into(),
|
||||
network: Some(guestinfo_to_networkstate(&info)),
|
||||
exit_info: None,
|
||||
error_info: None,
|
||||
host: self.glt.host_uuid().to_string(),
|
||||
domid: info.domid,
|
||||
});
|
||||
Ok(GuestReconcilerResult::Changed { rerun: false })
|
||||
@ -308,13 +332,20 @@ impl GuestReconciler {
|
||||
trace!("failed to destroy runtime guest {}: {}", uuid, error);
|
||||
}
|
||||
|
||||
let domid = guest.state.as_ref().map(|x| x.domid);
|
||||
|
||||
if let Some(domid) = domid {
|
||||
self.glt.remove(uuid, domid).await;
|
||||
}
|
||||
|
||||
info!("destroyed guest {}", uuid);
|
||||
guest.state = Some(GuestState {
|
||||
status: GuestStatus::Destroyed.into(),
|
||||
network: None,
|
||||
exit_info: None,
|
||||
error_info: None,
|
||||
domid: guest.state.as_ref().map(|x| x.domid).unwrap_or(u32::MAX),
|
||||
host: self.glt.host_uuid().to_string(),
|
||||
domid: domid.unwrap_or(u32::MAX),
|
||||
});
|
||||
Ok(GuestReconcilerResult::Changed { rerun: false })
|
||||
}
|
||||
|
@ -62,7 +62,8 @@ message GuestState {
|
||||
GuestNetworkState network = 2;
|
||||
GuestExitInfo exit_info = 3;
|
||||
GuestErrorInfo error_info = 4;
|
||||
uint32 domid = 5;
|
||||
string host = 5;
|
||||
uint32 domid = 6;
|
||||
}
|
||||
|
||||
enum GuestStatus {
|
||||
|
@ -10,6 +10,8 @@ import "krata/idm/transport.proto";
|
||||
import "krata/v1/common.proto";
|
||||
|
||||
service ControlService {
|
||||
rpc IdentifyHost(IdentifyHostRequest) returns (IdentifyHostReply);
|
||||
|
||||
rpc CreateGuest(CreateGuestRequest) returns (CreateGuestReply);
|
||||
rpc DestroyGuest(DestroyGuestRequest) returns (DestroyGuestReply);
|
||||
rpc ResolveGuest(ResolveGuestRequest) returns (ResolveGuestReply);
|
||||
@ -24,6 +26,14 @@ service ControlService {
|
||||
rpc PullImage(PullImageRequest) returns (stream PullImageReply);
|
||||
}
|
||||
|
||||
message IdentifyHostRequest {}
|
||||
|
||||
message IdentifyHostReply {
|
||||
string host_uuid = 1;
|
||||
uint32 host_domid = 2;
|
||||
string krata_version = 3;
|
||||
}
|
||||
|
||||
message CreateGuestRequest {
|
||||
krata.v1.common.GuestSpec spec = 1;
|
||||
}
|
||||
@ -84,8 +94,8 @@ message ReadGuestMetricsReply {
|
||||
message SnoopIdmRequest {}
|
||||
|
||||
message SnoopIdmReply {
|
||||
uint32 from = 1;
|
||||
uint32 to = 2;
|
||||
string from = 1;
|
||||
string to = 2;
|
||||
krata.idm.transport.IdmTransportPacket packet = 3;
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user