2024-03-23 02:10:30 +00:00
|
|
|
use std::{
|
|
|
|
collections::{hash_map::Entry, HashMap},
|
|
|
|
str::FromStr,
|
|
|
|
time::Duration,
|
|
|
|
};
|
2024-03-06 15:57:56 +00:00
|
|
|
|
|
|
|
use anyhow::Result;
|
2024-03-24 05:52:25 +00:00
|
|
|
use krata::v1::common::{GuestExitInfo, GuestState, GuestStatus};
|
2024-03-14 14:03:11 +00:00
|
|
|
use log::error;
|
|
|
|
use tokio::{
|
2024-03-15 17:36:26 +00:00
|
|
|
select,
|
|
|
|
sync::{
|
|
|
|
broadcast,
|
|
|
|
mpsc::{channel, Receiver, Sender},
|
|
|
|
},
|
2024-03-14 14:03:11 +00:00
|
|
|
task::JoinHandle,
|
|
|
|
time,
|
|
|
|
};
|
2024-03-06 15:57:56 +00:00
|
|
|
use uuid::Uuid;
|
|
|
|
|
2024-03-15 17:36:26 +00:00
|
|
|
use kratart::Runtime;
|
2024-03-06 15:57:56 +00:00
|
|
|
|
2024-03-14 14:03:11 +00:00
|
|
|
use crate::db::GuestStore;
|
|
|
|
|
2024-03-24 05:52:25 +00:00
|
|
|
/// Event type carried on the daemon's broadcast channel.
///
/// This is an alias for `krata::v1::control::watch_events_reply::Event`.
pub type DaemonEvent = krata::v1::control::watch_events_reply::Event;
|
2024-03-06 15:57:56 +00:00
|
|
|
|
|
|
|
/// Capacity passed to `broadcast::channel` when the `DaemonEvent` channel is created.
const EVENT_CHANNEL_QUEUE_LEN: usize = 1000;
|
2024-03-15 17:36:26 +00:00
|
|
|
/// Capacity of the mpsc channel that carries `(guest id, exit code)` pairs to the generator.
const EXIT_CODE_CHANNEL_QUEUE_LEN: usize = 1000;
|
2024-03-06 15:57:56 +00:00
|
|
|
|
|
|
|
#[derive(Clone)]
|
|
|
|
pub struct DaemonEventContext {
|
|
|
|
sender: broadcast::Sender<DaemonEvent>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl DaemonEventContext {
|
|
|
|
pub fn subscribe(&self) -> broadcast::Receiver<DaemonEvent> {
|
|
|
|
self.sender.subscribe()
|
|
|
|
}
|
2024-03-14 14:03:11 +00:00
|
|
|
|
|
|
|
pub fn send(&self, event: DaemonEvent) -> Result<()> {
|
|
|
|
let _ = self.sender.send(event);
|
|
|
|
Ok(())
|
|
|
|
}
|
2024-03-06 15:57:56 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
pub struct DaemonEventGenerator {
|
|
|
|
runtime: Runtime,
|
2024-03-14 14:03:11 +00:00
|
|
|
guests: GuestStore,
|
|
|
|
guest_reconciler_notify: Sender<Uuid>,
|
2024-03-15 17:36:26 +00:00
|
|
|
feed: broadcast::Receiver<DaemonEvent>,
|
|
|
|
exit_code_sender: Sender<(Uuid, i32)>,
|
|
|
|
exit_code_receiver: Receiver<(Uuid, i32)>,
|
|
|
|
exit_code_handles: HashMap<Uuid, JoinHandle<()>>,
|
|
|
|
_event_sender: broadcast::Sender<DaemonEvent>,
|
2024-03-06 15:57:56 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
impl DaemonEventGenerator {
|
2024-03-14 14:03:11 +00:00
|
|
|
pub async fn new(
|
|
|
|
guests: GuestStore,
|
|
|
|
guest_reconciler_notify: Sender<Uuid>,
|
|
|
|
runtime: Runtime,
|
|
|
|
) -> Result<(DaemonEventContext, DaemonEventGenerator)> {
|
2024-03-06 15:57:56 +00:00
|
|
|
let (sender, _) = broadcast::channel(EVENT_CHANNEL_QUEUE_LEN);
|
2024-03-15 17:36:26 +00:00
|
|
|
let (exit_code_sender, exit_code_receiver) = channel(EXIT_CODE_CHANNEL_QUEUE_LEN);
|
2024-03-06 15:57:56 +00:00
|
|
|
let generator = DaemonEventGenerator {
|
|
|
|
runtime,
|
2024-03-14 14:03:11 +00:00
|
|
|
guests,
|
|
|
|
guest_reconciler_notify,
|
2024-03-15 17:36:26 +00:00
|
|
|
feed: sender.subscribe(),
|
|
|
|
exit_code_receiver,
|
|
|
|
exit_code_sender,
|
|
|
|
exit_code_handles: HashMap::new(),
|
|
|
|
_event_sender: sender.clone(),
|
2024-03-06 15:57:56 +00:00
|
|
|
};
|
|
|
|
let context = DaemonEventContext { sender };
|
|
|
|
Ok((context, generator))
|
|
|
|
}
|
|
|
|
|
2024-03-15 17:36:26 +00:00
|
|
|
async fn handle_feed_event(&mut self, event: &DaemonEvent) -> Result<()> {
|
|
|
|
match event {
|
|
|
|
DaemonEvent::GuestChanged(changed) => {
|
|
|
|
let Some(ref guest) = changed.guest else {
|
|
|
|
return Ok(());
|
|
|
|
};
|
2024-03-06 15:57:56 +00:00
|
|
|
|
2024-03-15 17:36:26 +00:00
|
|
|
let Some(ref state) = guest.state else {
|
|
|
|
return Ok(());
|
|
|
|
};
|
2024-03-06 15:57:56 +00:00
|
|
|
|
2024-03-15 17:36:26 +00:00
|
|
|
let status = state.status();
|
|
|
|
let id = Uuid::from_str(&guest.id)?;
|
|
|
|
match status {
|
|
|
|
GuestStatus::Started => {
|
2024-03-23 02:10:30 +00:00
|
|
|
if let Entry::Vacant(e) = self.exit_code_handles.entry(id) {
|
|
|
|
let handle = self
|
|
|
|
.runtime
|
|
|
|
.subscribe_exit_code(id, self.exit_code_sender.clone())
|
|
|
|
.await?;
|
|
|
|
e.insert(handle);
|
|
|
|
}
|
2024-03-15 17:36:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
GuestStatus::Destroyed => {
|
|
|
|
if let Some(handle) = self.exit_code_handles.remove(&id) {
|
|
|
|
handle.abort();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
_ => {}
|
|
|
|
}
|
2024-03-06 15:57:56 +00:00
|
|
|
}
|
2024-03-15 17:36:26 +00:00
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
2024-03-06 15:57:56 +00:00
|
|
|
|
2024-03-15 17:36:26 +00:00
|
|
|
async fn handle_exit_code(&mut self, id: Uuid, code: i32) -> Result<()> {
|
|
|
|
if let Some(mut entry) = self.guests.read(id).await? {
|
|
|
|
let Some(ref mut guest) = entry.guest else {
|
|
|
|
return Ok(());
|
2024-03-06 15:57:56 +00:00
|
|
|
};
|
|
|
|
|
2024-03-15 17:36:26 +00:00
|
|
|
guest.state = Some(GuestState {
|
|
|
|
status: GuestStatus::Exited.into(),
|
2024-03-23 07:00:12 +00:00
|
|
|
network: guest.state.clone().unwrap_or_default().network,
|
2024-03-15 17:36:26 +00:00
|
|
|
exit_info: Some(GuestExitInfo { code }),
|
|
|
|
error_info: None,
|
2024-03-27 02:54:39 +00:00
|
|
|
domid: guest.state.clone().map(|x| x.domid).unwrap_or(u32::MAX),
|
2024-03-15 17:36:26 +00:00
|
|
|
});
|
2024-03-06 15:57:56 +00:00
|
|
|
|
2024-03-15 17:36:26 +00:00
|
|
|
self.guests.update(id, entry).await?;
|
|
|
|
self.guest_reconciler_notify.send(id).await?;
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
2024-03-06 15:57:56 +00:00
|
|
|
|
2024-03-15 17:36:26 +00:00
|
|
|
async fn evaluate(&mut self) -> Result<()> {
|
|
|
|
select! {
|
|
|
|
x = self.exit_code_receiver.recv() => match x {
|
|
|
|
Some((uuid, code)) => {
|
|
|
|
self.handle_exit_code(uuid, code).await
|
|
|
|
},
|
|
|
|
None => {
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
},
|
|
|
|
x = self.feed.recv() => match x {
|
|
|
|
Ok(event) => {
|
|
|
|
self.handle_feed_event(&event).await
|
|
|
|
},
|
|
|
|
Err(error) => {
|
|
|
|
Err(error.into())
|
|
|
|
}
|
2024-03-14 14:03:11 +00:00
|
|
|
}
|
|
|
|
}
|
2024-03-06 15:57:56 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn launch(mut self) -> Result<JoinHandle<()>> {
|
|
|
|
Ok(tokio::task::spawn(async move {
|
|
|
|
loop {
|
|
|
|
if let Err(error) = self.evaluate().await {
|
|
|
|
error!("failed to evaluate daemon events: {}", error);
|
|
|
|
time::sleep(Duration::from_secs(5)).await;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}))
|
|
|
|
}
|
|
|
|
}
|