mirror of
https://github.com/edera-dev/krata.git
synced 2025-08-04 13:41:31 +00:00
krata: implement idm support for guest <-> host messages
This commit is contained in:
@ -5,7 +5,10 @@ use std::{
|
||||
};
|
||||
|
||||
use anyhow::Result;
|
||||
use krata::v1::common::{GuestExitInfo, GuestState, GuestStatus};
|
||||
use krata::{
|
||||
idm::protocol::{idm_packet::Message, IdmPacket},
|
||||
v1::common::{GuestExitInfo, GuestState, GuestStatus},
|
||||
};
|
||||
use log::error;
|
||||
use tokio::{
|
||||
select,
|
||||
@ -18,14 +21,15 @@ use tokio::{
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
use kratart::Runtime;
|
||||
|
||||
use crate::db::GuestStore;
|
||||
use crate::{
|
||||
db::GuestStore,
|
||||
idm::{DaemonIdmHandle, DaemonIdmSubscribeHandle},
|
||||
};
|
||||
|
||||
pub type DaemonEvent = krata::v1::control::watch_events_reply::Event;
|
||||
|
||||
const EVENT_CHANNEL_QUEUE_LEN: usize = 1000;
|
||||
const EXIT_CODE_CHANNEL_QUEUE_LEN: usize = 1000;
|
||||
const IDM_CHANNEL_QUEUE_LEN: usize = 1000;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct DaemonEventContext {
|
||||
@ -44,13 +48,13 @@ impl DaemonEventContext {
|
||||
}
|
||||
|
||||
pub struct DaemonEventGenerator {
|
||||
runtime: Runtime,
|
||||
guests: GuestStore,
|
||||
guest_reconciler_notify: Sender<Uuid>,
|
||||
feed: broadcast::Receiver<DaemonEvent>,
|
||||
exit_code_sender: Sender<(Uuid, i32)>,
|
||||
exit_code_receiver: Receiver<(Uuid, i32)>,
|
||||
exit_code_handles: HashMap<Uuid, JoinHandle<()>>,
|
||||
idm: DaemonIdmHandle,
|
||||
idms: HashMap<u32, (Uuid, DaemonIdmSubscribeHandle)>,
|
||||
idm_sender: Sender<(u32, IdmPacket)>,
|
||||
idm_receiver: Receiver<(u32, IdmPacket)>,
|
||||
_event_sender: broadcast::Sender<DaemonEvent>,
|
||||
}
|
||||
|
||||
@ -58,18 +62,18 @@ impl DaemonEventGenerator {
|
||||
pub async fn new(
|
||||
guests: GuestStore,
|
||||
guest_reconciler_notify: Sender<Uuid>,
|
||||
runtime: Runtime,
|
||||
idm: DaemonIdmHandle,
|
||||
) -> Result<(DaemonEventContext, DaemonEventGenerator)> {
|
||||
let (sender, _) = broadcast::channel(EVENT_CHANNEL_QUEUE_LEN);
|
||||
let (exit_code_sender, exit_code_receiver) = channel(EXIT_CODE_CHANNEL_QUEUE_LEN);
|
||||
let (idm_sender, idm_receiver) = channel(IDM_CHANNEL_QUEUE_LEN);
|
||||
let generator = DaemonEventGenerator {
|
||||
runtime,
|
||||
guests,
|
||||
guest_reconciler_notify,
|
||||
feed: sender.subscribe(),
|
||||
exit_code_receiver,
|
||||
exit_code_sender,
|
||||
exit_code_handles: HashMap::new(),
|
||||
idm,
|
||||
idms: HashMap::new(),
|
||||
idm_sender,
|
||||
idm_receiver,
|
||||
_event_sender: sender.clone(),
|
||||
};
|
||||
let context = DaemonEventContext { sender };
|
||||
@ -89,20 +93,19 @@ impl DaemonEventGenerator {
|
||||
|
||||
let status = state.status();
|
||||
let id = Uuid::from_str(&guest.id)?;
|
||||
let domid = state.domid;
|
||||
match status {
|
||||
GuestStatus::Started => {
|
||||
if let Entry::Vacant(e) = self.exit_code_handles.entry(id) {
|
||||
let handle = self
|
||||
.runtime
|
||||
.subscribe_exit_code(id, self.exit_code_sender.clone())
|
||||
.await?;
|
||||
e.insert(handle);
|
||||
if let Entry::Vacant(e) = self.idms.entry(domid) {
|
||||
let subscribe =
|
||||
self.idm.subscribe(domid, self.idm_sender.clone()).await?;
|
||||
e.insert((id, subscribe));
|
||||
}
|
||||
}
|
||||
|
||||
GuestStatus::Destroyed => {
|
||||
if let Some(handle) = self.exit_code_handles.remove(&id) {
|
||||
handle.abort();
|
||||
if let Some((_, handle)) = self.idms.remove(&domid) {
|
||||
handle.unsubscribe().await?;
|
||||
}
|
||||
}
|
||||
|
||||
@ -113,6 +116,13 @@ impl DaemonEventGenerator {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_idm_packet(&mut self, id: Uuid, packet: IdmPacket) -> Result<()> {
|
||||
if let Some(Message::Exit(exit)) = packet.message {
|
||||
self.handle_exit_code(id, exit.code).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_exit_code(&mut self, id: Uuid, code: i32) -> Result<()> {
|
||||
if let Some(mut entry) = self.guests.read(id).await? {
|
||||
let Some(ref mut guest) = entry.guest else {
|
||||
@ -135,9 +145,12 @@ impl DaemonEventGenerator {
|
||||
|
||||
async fn evaluate(&mut self) -> Result<()> {
|
||||
select! {
|
||||
x = self.exit_code_receiver.recv() => match x {
|
||||
Some((uuid, code)) => {
|
||||
self.handle_exit_code(uuid, code).await
|
||||
x = self.idm_receiver.recv() => match x {
|
||||
Some((domid, packet)) => {
|
||||
if let Some((id, _)) = self.idms.get(&domid) {
|
||||
self.handle_idm_packet(*id, packet).await?;
|
||||
}
|
||||
Ok(())
|
||||
},
|
||||
None => {
|
||||
Ok(())
|
||||
|
Reference in New Issue
Block a user