krata: implement parallel guest reconciliation

This commit is contained in:
Alex Zenla 2024-03-31 10:10:06 +00:00
parent 377b837db9
commit 6bf1d3f88c
No known key found for this signature in database
GPG Key ID: 067B238899B51269

View File

@ -1,4 +1,8 @@
use std::{collections::HashMap, time::Duration};
use std::{
collections::{hash_map::Entry, HashMap},
sync::Arc,
time::Duration,
};
use anyhow::{anyhow, Result};
use krata::v1::{
@ -10,7 +14,15 @@ use krata::v1::{
};
use kratart::{launch::GuestLaunchRequest, GuestInfo, Runtime};
use log::{error, info, trace, warn};
use tokio::{select, sync::mpsc::Receiver, task::JoinHandle, time::sleep};
use tokio::{
select,
sync::{
mpsc::{channel, Receiver, Sender},
Mutex, Semaphore,
},
task::JoinHandle,
time::sleep,
};
use uuid::Uuid;
use crate::{
@ -18,10 +30,24 @@ use crate::{
event::{DaemonEvent, DaemonEventContext},
};
struct GuestReconcilerEntry {
task: JoinHandle<()>,
sender: Sender<()>,
}
impl Drop for GuestReconcilerEntry {
fn drop(&mut self) {
self.task.abort();
}
}
#[derive(Clone)]
pub struct GuestReconciler {
guests: GuestStore,
events: DaemonEventContext,
runtime: Runtime,
limiter: Arc<Semaphore>,
tasks: Arc<Mutex<HashMap<Uuid, GuestReconcilerEntry>>>,
}
impl GuestReconciler {
@ -30,6 +56,8 @@ impl GuestReconciler {
guests,
events,
runtime,
limiter: Arc::new(Semaphore::new(10)),
tasks: Arc::new(Mutex::new(HashMap::new())),
})
}
@ -47,8 +75,15 @@ impl GuestReconciler {
},
Some(uuid) => {
if let Err(error) = self.reconcile(uuid).await {
error!("failed to reconcile guest {}: {}", uuid, error);
if let Err(error) = self.launch_task_if_needed(uuid).await {
error!("failed to start guest reconciler task {}: {}", uuid, error);
}
let map = self.tasks.lock().await;
if let Some(entry) = map.get(&uuid) {
if let Err(error) = entry.sender.send(()).await {
error!("failed to notify guest reconciler task {}: {}", uuid, error);
}
}
}
},
@ -105,6 +140,7 @@ impl GuestReconciler {
}
pub async fn reconcile(&self, uuid: Uuid) -> Result<()> {
let _permit = self.limiter.acquire().await?;
let Some(mut guest) = self.guests.read(uuid).await? else {
warn!(
"notified of reconcile for guest {} but it didn't exist",
@ -151,6 +187,8 @@ impl GuestReconciler {
if destroyed {
self.guests.remove(uuid).await?;
let mut map = self.tasks.lock().await;
map.remove(&uuid);
} else {
self.guests.update(uuid, guest.clone()).await?;
}
@ -225,6 +263,30 @@ impl GuestReconciler {
});
Ok(true)
}
async fn launch_task_if_needed(&self, uuid: Uuid) -> Result<()> {
let mut map = self.tasks.lock().await;
match map.entry(uuid) {
Entry::Occupied(_) => {}
Entry::Vacant(entry) => {
entry.insert(self.launch_task(uuid).await?);
}
}
Ok(())
}
async fn launch_task(&self, uuid: Uuid) -> Result<GuestReconcilerEntry> {
let this = self.clone();
let (sender, mut receiver) = channel(10);
let task = tokio::task::spawn(async move {
while receiver.recv().await.is_some() {
if let Err(error) = this.reconcile(uuid).await {
error!("failed to reconcile guest {}: {}", uuid, error);
}
}
});
Ok(GuestReconcilerEntry { task, sender })
}
}
fn empty_vec_optional<T>(value: Vec<T>) -> Option<Vec<T>> {