2024-03-23 02:10:30 +00:00
|
|
|
use std::{
|
|
|
|
collections::{hash_map::Entry, HashMap},
|
|
|
|
str::FromStr,
|
|
|
|
time::Duration,
|
|
|
|
};
|
2024-03-06 15:57:56 +00:00
|
|
|
|
|
|
|
use anyhow::Result;
|
2024-03-28 22:38:21 +00:00
|
|
|
use krata::{
|
2024-03-30 03:49:13 +00:00
|
|
|
idm::protocol::{idm_event::Event, IdmPacket},
|
2024-03-28 22:38:21 +00:00
|
|
|
v1::common::{GuestExitInfo, GuestState, GuestStatus},
|
|
|
|
};
|
2024-03-14 14:03:11 +00:00
|
|
|
use log::error;
|
|
|
|
use tokio::{
|
2024-03-15 17:36:26 +00:00
|
|
|
select,
|
|
|
|
sync::{
|
|
|
|
broadcast,
|
|
|
|
mpsc::{channel, Receiver, Sender},
|
|
|
|
},
|
2024-03-14 14:03:11 +00:00
|
|
|
task::JoinHandle,
|
|
|
|
time,
|
|
|
|
};
|
2024-03-06 15:57:56 +00:00
|
|
|
use uuid::Uuid;
|
|
|
|
|
2024-03-28 22:38:21 +00:00
|
|
|
use crate::{
|
|
|
|
db::GuestStore,
|
|
|
|
idm::{DaemonIdmHandle, DaemonIdmSubscribeHandle},
|
|
|
|
};
|
2024-03-14 14:03:11 +00:00
|
|
|
|
2024-03-24 05:52:25 +00:00
|
|
|
/// Event type carried on the daemon's broadcast channel.
///
/// Alias for the `Event` type of krata's `watch_events_reply` control message.
pub type DaemonEvent = krata::v1::control::watch_events_reply::Event;
|
2024-03-06 15:57:56 +00:00
|
|
|
|
|
|
|
/// Capacity passed to `broadcast::channel` when the daemon event channel is created.
const EVENT_CHANNEL_QUEUE_LEN: usize = 1000;
|
2024-03-28 22:38:21 +00:00
|
|
|
/// Capacity passed to the mpsc `channel` that carries `(domid, IdmPacket)` pairs.
const IDM_CHANNEL_QUEUE_LEN: usize = 1000;
|
2024-03-06 15:57:56 +00:00
|
|
|
|
|
|
|
#[derive(Clone)]
|
|
|
|
pub struct DaemonEventContext {
|
|
|
|
sender: broadcast::Sender<DaemonEvent>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl DaemonEventContext {
|
|
|
|
pub fn subscribe(&self) -> broadcast::Receiver<DaemonEvent> {
|
|
|
|
self.sender.subscribe()
|
|
|
|
}
|
2024-03-14 14:03:11 +00:00
|
|
|
|
|
|
|
pub fn send(&self, event: DaemonEvent) -> Result<()> {
|
|
|
|
let _ = self.sender.send(event);
|
|
|
|
Ok(())
|
|
|
|
}
|
2024-03-06 15:57:56 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
pub struct DaemonEventGenerator {
|
2024-03-14 14:03:11 +00:00
|
|
|
guests: GuestStore,
|
|
|
|
guest_reconciler_notify: Sender<Uuid>,
|
2024-03-15 17:36:26 +00:00
|
|
|
feed: broadcast::Receiver<DaemonEvent>,
|
2024-03-28 22:38:21 +00:00
|
|
|
idm: DaemonIdmHandle,
|
|
|
|
idms: HashMap<u32, (Uuid, DaemonIdmSubscribeHandle)>,
|
|
|
|
idm_sender: Sender<(u32, IdmPacket)>,
|
|
|
|
idm_receiver: Receiver<(u32, IdmPacket)>,
|
2024-03-15 17:36:26 +00:00
|
|
|
_event_sender: broadcast::Sender<DaemonEvent>,
|
2024-03-06 15:57:56 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
impl DaemonEventGenerator {
|
2024-03-14 14:03:11 +00:00
|
|
|
pub async fn new(
|
|
|
|
guests: GuestStore,
|
|
|
|
guest_reconciler_notify: Sender<Uuid>,
|
2024-03-28 22:38:21 +00:00
|
|
|
idm: DaemonIdmHandle,
|
2024-03-14 14:03:11 +00:00
|
|
|
) -> Result<(DaemonEventContext, DaemonEventGenerator)> {
|
2024-03-06 15:57:56 +00:00
|
|
|
let (sender, _) = broadcast::channel(EVENT_CHANNEL_QUEUE_LEN);
|
2024-03-28 22:38:21 +00:00
|
|
|
let (idm_sender, idm_receiver) = channel(IDM_CHANNEL_QUEUE_LEN);
|
2024-03-06 15:57:56 +00:00
|
|
|
let generator = DaemonEventGenerator {
|
2024-03-14 14:03:11 +00:00
|
|
|
guests,
|
|
|
|
guest_reconciler_notify,
|
2024-03-15 17:36:26 +00:00
|
|
|
feed: sender.subscribe(),
|
2024-03-28 22:38:21 +00:00
|
|
|
idm,
|
|
|
|
idms: HashMap::new(),
|
|
|
|
idm_sender,
|
|
|
|
idm_receiver,
|
2024-03-15 17:36:26 +00:00
|
|
|
_event_sender: sender.clone(),
|
2024-03-06 15:57:56 +00:00
|
|
|
};
|
|
|
|
let context = DaemonEventContext { sender };
|
|
|
|
Ok((context, generator))
|
|
|
|
}
|
|
|
|
|
2024-03-15 17:36:26 +00:00
|
|
|
async fn handle_feed_event(&mut self, event: &DaemonEvent) -> Result<()> {
|
|
|
|
match event {
|
|
|
|
DaemonEvent::GuestChanged(changed) => {
|
|
|
|
let Some(ref guest) = changed.guest else {
|
|
|
|
return Ok(());
|
|
|
|
};
|
2024-03-06 15:57:56 +00:00
|
|
|
|
2024-03-15 17:36:26 +00:00
|
|
|
let Some(ref state) = guest.state else {
|
|
|
|
return Ok(());
|
|
|
|
};
|
2024-03-06 15:57:56 +00:00
|
|
|
|
2024-03-15 17:36:26 +00:00
|
|
|
let status = state.status();
|
|
|
|
let id = Uuid::from_str(&guest.id)?;
|
2024-03-28 22:38:21 +00:00
|
|
|
let domid = state.domid;
|
2024-03-15 17:36:26 +00:00
|
|
|
match status {
|
|
|
|
GuestStatus::Started => {
|
2024-03-28 22:38:21 +00:00
|
|
|
if let Entry::Vacant(e) = self.idms.entry(domid) {
|
|
|
|
let subscribe =
|
|
|
|
self.idm.subscribe(domid, self.idm_sender.clone()).await?;
|
|
|
|
e.insert((id, subscribe));
|
2024-03-23 02:10:30 +00:00
|
|
|
}
|
2024-03-15 17:36:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
GuestStatus::Destroyed => {
|
2024-03-28 22:38:21 +00:00
|
|
|
if let Some((_, handle)) = self.idms.remove(&domid) {
|
|
|
|
handle.unsubscribe().await?;
|
2024-03-15 17:36:26 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
_ => {}
|
|
|
|
}
|
2024-03-06 15:57:56 +00:00
|
|
|
}
|
2024-03-15 17:36:26 +00:00
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
2024-03-06 15:57:56 +00:00
|
|
|
|
2024-03-28 22:38:21 +00:00
|
|
|
async fn handle_idm_packet(&mut self, id: Uuid, packet: IdmPacket) -> Result<()> {
|
2024-03-30 03:49:13 +00:00
|
|
|
if let Some(Event::Exit(exit)) = packet.event.and_then(|x| x.event) {
|
2024-03-28 22:38:21 +00:00
|
|
|
self.handle_exit_code(id, exit.code).await?;
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2024-03-15 17:36:26 +00:00
|
|
|
async fn handle_exit_code(&mut self, id: Uuid, code: i32) -> Result<()> {
|
2024-03-30 09:29:03 +00:00
|
|
|
if let Some(mut guest) = self.guests.read(id).await? {
|
2024-03-15 17:36:26 +00:00
|
|
|
guest.state = Some(GuestState {
|
|
|
|
status: GuestStatus::Exited.into(),
|
2024-03-23 07:00:12 +00:00
|
|
|
network: guest.state.clone().unwrap_or_default().network,
|
2024-03-15 17:36:26 +00:00
|
|
|
exit_info: Some(GuestExitInfo { code }),
|
|
|
|
error_info: None,
|
2024-03-27 02:54:39 +00:00
|
|
|
domid: guest.state.clone().map(|x| x.domid).unwrap_or(u32::MAX),
|
2024-03-15 17:36:26 +00:00
|
|
|
});
|
2024-03-06 15:57:56 +00:00
|
|
|
|
2024-03-30 09:29:03 +00:00
|
|
|
self.guests.update(id, guest).await?;
|
2024-03-15 17:36:26 +00:00
|
|
|
self.guest_reconciler_notify.send(id).await?;
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
2024-03-06 15:57:56 +00:00
|
|
|
|
2024-03-15 17:36:26 +00:00
|
|
|
async fn evaluate(&mut self) -> Result<()> {
|
|
|
|
select! {
|
2024-03-28 22:38:21 +00:00
|
|
|
x = self.idm_receiver.recv() => match x {
|
|
|
|
Some((domid, packet)) => {
|
|
|
|
if let Some((id, _)) = self.idms.get(&domid) {
|
|
|
|
self.handle_idm_packet(*id, packet).await?;
|
|
|
|
}
|
|
|
|
Ok(())
|
2024-03-15 17:36:26 +00:00
|
|
|
},
|
|
|
|
None => {
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
},
|
|
|
|
x = self.feed.recv() => match x {
|
|
|
|
Ok(event) => {
|
|
|
|
self.handle_feed_event(&event).await
|
|
|
|
},
|
|
|
|
Err(error) => {
|
|
|
|
Err(error.into())
|
|
|
|
}
|
2024-03-14 14:03:11 +00:00
|
|
|
}
|
|
|
|
}
|
2024-03-06 15:57:56 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn launch(mut self) -> Result<JoinHandle<()>> {
|
|
|
|
Ok(tokio::task::spawn(async move {
|
|
|
|
loop {
|
|
|
|
if let Err(error) = self.evaluate().await {
|
|
|
|
error!("failed to evaluate daemon events: {}", error);
|
|
|
|
time::sleep(Duration::from_secs(5)).await;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}))
|
|
|
|
}
|
|
|
|
}
|