krata: work on parallel reconciliation

This commit is contained in:
Alex Zenla
2024-04-02 00:56:18 +00:00
parent 6a2f1e6517
commit 8dd3cc7692
27 changed files with 582 additions and 428 deletions

View File

@ -49,8 +49,12 @@ impl Daemon {
DaemonEventGenerator::new(guests.clone(), guest_reconciler_notify.clone(), idm.clone())
.await?;
let runtime_for_reconciler = runtime.dupe().await?;
let guest_reconciler =
GuestReconciler::new(guests.clone(), events.clone(), runtime_for_reconciler)?;
let guest_reconciler = GuestReconciler::new(
guests.clone(),
events.clone(),
runtime_for_reconciler,
guest_reconciler_notify.clone(),
)?;
let guest_reconciler_task = guest_reconciler.launch(guest_reconciler_receiver).await?;
let generator_task = generator.launch().await?;

View File

@ -18,7 +18,7 @@ use tokio::{
select,
sync::{
mpsc::{channel, Receiver, Sender},
Mutex, Semaphore,
Mutex, RwLock,
},
task::JoinHandle,
time::sleep,
@ -30,6 +30,14 @@ use crate::{
event::{DaemonEvent, DaemonEventContext},
};
const PARALLEL_LIMIT: u32 = 5;
#[derive(Debug)]
enum GuestReconcilerResult {
Unchanged,
Changed { rerun: bool },
}
struct GuestReconcilerEntry {
task: JoinHandle<()>,
sender: Sender<()>,
@ -46,18 +54,25 @@ pub struct GuestReconciler {
guests: GuestStore,
events: DaemonEventContext,
runtime: Runtime,
limiter: Arc<Semaphore>,
tasks: Arc<Mutex<HashMap<Uuid, GuestReconcilerEntry>>>,
guest_reconciler_notify: Sender<Uuid>,
reconcile_lock: Arc<RwLock<()>>,
}
impl GuestReconciler {
pub fn new(guests: GuestStore, events: DaemonEventContext, runtime: Runtime) -> Result<Self> {
pub fn new(
guests: GuestStore,
events: DaemonEventContext,
runtime: Runtime,
guest_reconciler_notify: Sender<Uuid>,
) -> Result<Self> {
Ok(Self {
guests,
events,
runtime,
limiter: Arc::new(Semaphore::new(10)),
tasks: Arc::new(Mutex::new(HashMap::new())),
guest_reconciler_notify,
reconcile_lock: Arc::new(RwLock::with_max_readers((), PARALLEL_LIMIT)),
})
}
@ -99,6 +114,7 @@ impl GuestReconciler {
}
pub async fn reconcile_runtime(&self, initial: bool) -> Result<()> {
let _permit = self.reconcile_lock.write().await;
trace!("reconciling runtime");
let runtime_guests = self.runtime.list().await?;
let stored_guests = self.guests.list().await?;
@ -131,22 +147,20 @@ impl GuestReconciler {
if changed || initial {
self.guests.update(uuid, stored_guest).await?;
if let Err(error) = self.reconcile(uuid).await {
error!("failed to reconcile guest {}: {}", uuid, error);
}
let _ = self.guest_reconciler_notify.try_send(uuid);
}
}
Ok(())
}
pub async fn reconcile(&self, uuid: Uuid) -> Result<()> {
let _permit = self.limiter.acquire().await?;
pub async fn reconcile(&self, uuid: Uuid) -> Result<bool> {
let _runtime_reconcile_permit = self.reconcile_lock.read().await;
let Some(mut guest) = self.guests.read(uuid).await? else {
warn!(
"notified of reconcile for guest {} but it didn't exist",
uuid
);
return Ok(());
return Ok(false);
};
info!("reconciling guest {}", uuid);
@ -156,14 +170,16 @@ impl GuestReconciler {
guest: Some(guest.clone()),
}))?;
let result = match guest.state.as_ref().map(|x| x.status()).unwrap_or_default() {
let start_status = guest.state.as_ref().map(|x| x.status()).unwrap_or_default();
let result = match start_status {
GuestStatus::Starting => self.start(uuid, &mut guest).await,
GuestStatus::Destroying | GuestStatus::Exited => self.destroy(uuid, &mut guest).await,
_ => Ok(false),
GuestStatus::Exited => self.exited(&mut guest).await,
GuestStatus::Destroying => self.destroy(uuid, &mut guest).await,
_ => Ok(GuestReconcilerResult::Unchanged),
};
let changed = match result {
Ok(changed) => changed,
let result = match result {
Ok(result) => result,
Err(error) => {
guest.state = Some(guest.state.as_mut().cloned().unwrap_or_default());
guest.state.as_mut().unwrap().status = GuestStatus::Failed.into();
@ -171,16 +187,16 @@ impl GuestReconciler {
message: error.to_string(),
});
warn!("failed to start guest {}: {}", guest.id, error);
true
GuestReconcilerResult::Changed { rerun: false }
}
};
info!("reconciled guest {}", uuid);
let status = guest.state.as_ref().map(|x| x.status()).unwrap_or_default();
let destroyed = status == GuestStatus::Destroyed || status == GuestStatus::Failed;
let destroyed = status == GuestStatus::Destroyed;
if changed {
let rerun = if let GuestReconcilerResult::Changed { rerun } = result {
let event = DaemonEvent::GuestChanged(GuestChangedEvent {
guest: Some(guest.clone()),
});
@ -194,12 +210,15 @@ impl GuestReconciler {
}
self.events.send(event)?;
}
rerun
} else {
false
};
Ok(())
Ok(rerun)
}
async fn start(&self, uuid: Uuid, guest: &mut Guest) -> Result<bool> {
async fn start(&self, uuid: Uuid, guest: &mut Guest) -> Result<GuestReconcilerResult> {
let Some(ref spec) = guest.spec else {
return Err(anyhow!("guest spec not specified"));
};
@ -245,10 +264,19 @@ impl GuestReconciler {
error_info: None,
domid: info.domid,
});
Ok(true)
Ok(GuestReconcilerResult::Changed { rerun: false })
}
async fn destroy(&self, uuid: Uuid, guest: &mut Guest) -> Result<bool> {
async fn exited(&self, guest: &mut Guest) -> Result<GuestReconcilerResult> {
if let Some(ref mut state) = guest.state {
state.set_status(GuestStatus::Destroying);
Ok(GuestReconcilerResult::Changed { rerun: true })
} else {
Ok(GuestReconcilerResult::Unchanged)
}
}
async fn destroy(&self, uuid: Uuid, guest: &mut Guest) -> Result<GuestReconcilerResult> {
if let Err(error) = self.runtime.destroy(uuid).await {
trace!("failed to destroy runtime guest {}: {}", uuid, error);
}
@ -261,7 +289,7 @@ impl GuestReconciler {
error_info: None,
domid: guest.state.as_ref().map(|x| x.domid).unwrap_or(u32::MAX),
});
Ok(true)
Ok(GuestReconcilerResult::Changed { rerun: false })
}
async fn launch_task_if_needed(&self, uuid: Uuid) -> Result<()> {
@ -279,9 +307,24 @@ impl GuestReconciler {
let this = self.clone();
let (sender, mut receiver) = channel(10);
let task = tokio::task::spawn(async move {
while receiver.recv().await.is_some() {
if let Err(error) = this.reconcile(uuid).await {
error!("failed to reconcile guest {}: {}", uuid, error);
'notify_loop: loop {
if receiver.recv().await.is_none() {
break 'notify_loop;
}
'rerun_loop: loop {
let rerun = match this.reconcile(uuid).await {
Ok(rerun) => rerun,
Err(error) => {
error!("failed to reconcile guest {}: {}", uuid, error);
false
}
};
if rerun {
continue 'rerun_loop;
}
break 'rerun_loop;
}
}
});