diff --git a/crates/daemon/src/reconcile/guest.rs b/crates/daemon/src/reconcile/guest.rs index 8c98563..4ca58f5 100644 --- a/crates/daemon/src/reconcile/guest.rs +++ b/crates/daemon/src/reconcile/guest.rs @@ -1,4 +1,8 @@ -use std::{collections::HashMap, time::Duration}; +use std::{ + collections::{hash_map::Entry, HashMap}, + sync::Arc, + time::Duration, +}; use anyhow::{anyhow, Result}; use krata::v1::{ @@ -10,7 +14,15 @@ use krata::v1::{ }; use kratart::{launch::GuestLaunchRequest, GuestInfo, Runtime}; use log::{error, info, trace, warn}; -use tokio::{select, sync::mpsc::Receiver, task::JoinHandle, time::sleep}; +use tokio::{ + select, + sync::{ + mpsc::{channel, Receiver, Sender}, + Mutex, Semaphore, + }, + task::JoinHandle, + time::sleep, +}; use uuid::Uuid; use crate::{ @@ -18,10 +30,24 @@ use crate::{ event::{DaemonEvent, DaemonEventContext}, }; +struct GuestReconcilerEntry { + task: JoinHandle<()>, + sender: Sender<()>, +} + +impl Drop for GuestReconcilerEntry { + fn drop(&mut self) { + self.task.abort(); + } +} + +#[derive(Clone)] pub struct GuestReconciler { guests: GuestStore, events: DaemonEventContext, runtime: Runtime, + limiter: Arc, + tasks: Arc>>, } impl GuestReconciler { @@ -30,6 +56,8 @@ impl GuestReconciler { guests, events, runtime, + limiter: Arc::new(Semaphore::new(10)), + tasks: Arc::new(Mutex::new(HashMap::new())), }) } @@ -47,8 +75,15 @@ impl GuestReconciler { }, Some(uuid) => { - if let Err(error) = self.reconcile(uuid).await { - error!("failed to reconcile guest {}: {}", uuid, error); + if let Err(error) = self.launch_task_if_needed(uuid).await { + error!("failed to start guest reconciler task {}: {}", uuid, error); + } + + let map = self.tasks.lock().await; + if let Some(entry) = map.get(&uuid) { + if let Err(error) = entry.sender.send(()).await { + error!("failed to notify guest reconciler task {}: {}", uuid, error); + } } } }, @@ -105,6 +140,7 @@ impl GuestReconciler { } pub async fn reconcile(&self, uuid: Uuid) -> Result<()> { + let _permit = self.limiter.acquire().await?; let Some(mut guest) = self.guests.read(uuid).await? else { warn!( "notified of reconcile for guest {} but it didn't exist", @@ -151,6 +187,8 @@ impl GuestReconciler { if destroyed { self.guests.remove(uuid).await?; + let mut map = self.tasks.lock().await; + map.remove(&uuid); } else { self.guests.update(uuid, guest.clone()).await?; } @@ -225,6 +263,30 @@ impl GuestReconciler { }); Ok(true) } + + async fn launch_task_if_needed(&self, uuid: Uuid) -> Result<()> { + let mut map = self.tasks.lock().await; + match map.entry(uuid) { + Entry::Occupied(_) => {} + Entry::Vacant(entry) => { + entry.insert(self.launch_task(uuid).await?); + } + } + Ok(()) + } + + async fn launch_task(&self, uuid: Uuid) -> Result { + let this = self.clone(); + let (sender, mut receiver) = channel(10); + let task = tokio::task::spawn(async move { + while receiver.recv().await.is_some() { + if let Err(error) = this.reconcile(uuid).await { + error!("failed to reconcile guest {}: {}", uuid, error); + } + } + }); + Ok(GuestReconcilerEntry { task, sender }) + } } fn empty_vec_optional(value: Vec) -> Option> {