diff --git a/crates/kratad/src/lib.rs b/crates/kratad/src/lib.rs index 3bbb2a8..719bcb7 100644 --- a/crates/kratad/src/lib.rs +++ b/crates/kratad/src/lib.rs @@ -7,7 +7,7 @@ use event::{DaemonEventContext, DaemonEventGenerator}; use krata::{control::control_service_server::ControlServiceServer, dial::ControlDialAddress}; use kratart::Runtime; use log::info; -use reconcile::GuestReconciler; +use reconcile::guest::GuestReconciler; use tokio::{ net::UnixListener, sync::mpsc::{channel, Sender}, diff --git a/crates/kratad/src/reconcile/guest.rs b/crates/kratad/src/reconcile/guest.rs new file mode 100644 index 0000000..1daa934 --- /dev/null +++ b/crates/kratad/src/reconcile/guest.rs @@ -0,0 +1,212 @@ +use anyhow::{anyhow, Result}; +use krata::{ + common::{ + guest_image_spec::Image, Guest, GuestErrorInfo, GuestNetworkState, GuestState, GuestStatus, + }, + control::GuestChangedEvent, +}; +use kratart::{launch::GuestLaunchRequest, Runtime}; +use log::{error, info, warn}; +use tokio::{sync::mpsc::Receiver, task::JoinHandle}; +use uuid::Uuid; + +use crate::{ + db::GuestStore, + event::{DaemonEvent, DaemonEventContext}, +}; + +pub struct GuestReconciler { + guests: GuestStore, + events: DaemonEventContext, + runtime: Runtime, +} + +impl GuestReconciler { + pub fn new(guests: GuestStore, events: DaemonEventContext, runtime: Runtime) -> Result { + Ok(Self { + guests, + events, + runtime, + }) + } + + pub async fn launch(self, mut notify: Receiver) -> Result> { + Ok(tokio::task::spawn(async move { + if let Err(error) = self.reconcile_runtime().await { + error!("runtime reconciler failed: {}", error); + } + + loop { + let Some(uuid) = notify.recv().await else { + break; + }; + if let Err(error) = self.reconcile(uuid).await { + error!("guest reconciler failed: {}", error); + } + } + })) + } + + pub async fn reconcile_runtime(&self) -> Result<()> { + let runtime_guests = self.runtime.list().await?; + let stored_guests = self.guests.list().await?; + for (uuid, mut stored_guest_entry) in stored_guests { + let Some(ref mut stored_guest) = stored_guest_entry.guest else { + warn!("removing unpopulated guest entry for guest {}", uuid); + self.guests.remove(uuid).await?; + continue; + }; + let runtime_guest = runtime_guests.iter().find(|x| x.uuid == uuid); + match runtime_guest { + None => { + let mut state = stored_guest.state.as_mut().cloned().unwrap_or_default(); + if state.status() == GuestStatus::Started { + state.status = GuestStatus::Start.into(); + } + stored_guest.state = Some(state); + stored_guest.network = None; + self.guests.update(uuid, stored_guest_entry).await?; + if let Err(error) = self.reconcile(uuid).await { + error!("failed to reconcile guest {}: {}", uuid, error); + } + } + + Some(_) => { + let mut state = stored_guest.state.as_mut().cloned().unwrap_or_default(); + state.status = GuestStatus::Started.into(); + stored_guest.state = Some(state); + stored_guest.network = None; + self.guests.update(uuid, stored_guest_entry).await?; + if let Err(error) = self.reconcile(uuid).await { + error!("failed to reconcile guest {}: {}", uuid, error); + } + } + } + } + Ok(()) + } + + pub async fn reconcile(&self, uuid: Uuid) -> Result<()> { + let Some(mut entry) = self.guests.read(uuid).await? else { + warn!( + "notified of reconcile for guest {} but it didn't exist", + uuid + ); + return Ok(()); + }; + + info!("reconciling guest {}", uuid); + + let Some(ref mut guest) = entry.guest else { + return Ok(()); + }; + + self.events + .send(DaemonEvent::GuestChanged(GuestChangedEvent { + guest: Some(guest.clone()), + }))?; + + let result = match guest.state.as_ref().map(|x| x.status()).unwrap_or_default() { + GuestStatus::Start => self.start(uuid, guest).await, + GuestStatus::Destroy | GuestStatus::Exited => self.destroy(uuid, guest).await, + _ => Ok(false), + }; + + let changed = match result { + Ok(changed) => changed, + Err(error) => { + guest.state = Some(guest.state.as_mut().cloned().unwrap_or_default()); + guest.state.as_mut().unwrap().error_info = Some(GuestErrorInfo { + message: error.to_string(), + }); + true + } + }; + + info!("reconciled guest {}", uuid); + + let destroyed = + guest.state.as_ref().map(|x| x.status()).unwrap_or_default() == GuestStatus::Destroyed; + + if changed { + let event = DaemonEvent::GuestChanged(GuestChangedEvent { + guest: Some(guest.clone()), + }); + + if destroyed { + self.guests.remove(uuid).await?; + } else { + self.guests.update(uuid, entry.clone()).await?; + } + + self.events.send(event)?; + } + + Ok(()) + } + + async fn start(&self, uuid: Uuid, guest: &mut Guest) -> Result { + let Some(ref spec) = guest.spec else { + return Err(anyhow!("guest spec not specified")); + }; + + let Some(ref image) = spec.image else { + return Err(anyhow!("image spec not provided")); + }; + let oci = match image.image { + Some(Image::Oci(ref oci)) => oci, + None => { + return Err(anyhow!("oci spec not specified")); + } + }; + + let info = self + .runtime + .launch(GuestLaunchRequest { + uuid: Some(uuid), + name: if spec.name.is_empty() { + None + } else { + Some(&spec.name) + }, + image: &oci.image, + vcpus: spec.vcpus, + mem: spec.mem, + env: empty_vec_optional(spec.env.clone()), + run: empty_vec_optional(spec.run.clone()), + debug: false, + }) + .await?; + info!("started guest {}", uuid); + guest.network = Some(GuestNetworkState { + ipv4: info.ipv4.map(|x| x.ip().to_string()).unwrap_or_default(), + ipv6: info.ipv6.map(|x| x.ip().to_string()).unwrap_or_default(), + }); + guest.state = Some(GuestState { + status: GuestStatus::Started.into(), + exit_info: None, + error_info: None, + }); + Ok(true) + } + + async fn destroy(&self, uuid: Uuid, guest: &mut Guest) -> Result { + self.runtime.destroy(uuid).await?; + info!("destroyed guest {}", uuid); + guest.network = None; + guest.state = Some(GuestState { + status: GuestStatus::Destroyed.into(), + exit_info: None, + error_info: None, + }); + Ok(true) + } +} + +fn empty_vec_optional(value: Vec) -> Option> { + if value.is_empty() { + None + } else { + Some(value) + } +} diff --git a/crates/kratad/src/reconcile/mod.rs b/crates/kratad/src/reconcile/mod.rs index 1daa934..051b1c0 100644 --- a/crates/kratad/src/reconcile/mod.rs +++ b/crates/kratad/src/reconcile/mod.rs @@ -1,212 +1 @@ -use anyhow::{anyhow, Result}; -use krata::{ - common::{ - guest_image_spec::Image, Guest, GuestErrorInfo, GuestNetworkState, GuestState, GuestStatus, - }, - control::GuestChangedEvent, -}; -use kratart::{launch::GuestLaunchRequest, Runtime}; -use log::{error, info, warn}; -use tokio::{sync::mpsc::Receiver, task::JoinHandle}; -use uuid::Uuid; - -use crate::{ - db::GuestStore, - event::{DaemonEvent, DaemonEventContext}, -}; - -pub struct GuestReconciler { - guests: GuestStore, - events: DaemonEventContext, - runtime: Runtime, -} - -impl GuestReconciler { - pub fn new(guests: GuestStore, events: DaemonEventContext, runtime: Runtime) -> Result { - Ok(Self { - guests, - events, - runtime, - }) - } - - pub async fn launch(self, mut notify: Receiver) -> Result> { - Ok(tokio::task::spawn(async move { - if let Err(error) = self.reconcile_runtime().await { - error!("runtime reconciler failed: {}", error); - } - - loop { - let Some(uuid) = notify.recv().await else { - break; - }; - if let Err(error) = self.reconcile(uuid).await { - error!("guest reconciler failed: {}", error); - } - } - })) - } - - pub async fn reconcile_runtime(&self) -> Result<()> { - let runtime_guests = self.runtime.list().await?; - let stored_guests = self.guests.list().await?; - for (uuid, mut stored_guest_entry) in stored_guests { - let Some(ref mut stored_guest) = stored_guest_entry.guest else { - warn!("removing unpopulated guest entry for guest {}", uuid); - self.guests.remove(uuid).await?; - continue; - }; - let runtime_guest = runtime_guests.iter().find(|x| x.uuid == uuid); - match runtime_guest { - None => { - let mut state = stored_guest.state.as_mut().cloned().unwrap_or_default(); - if state.status() == GuestStatus::Started { - state.status = GuestStatus::Start.into(); - } - stored_guest.state = Some(state); - stored_guest.network = None; - self.guests.update(uuid, stored_guest_entry).await?; - if let Err(error) = self.reconcile(uuid).await { - error!("failed to reconcile guest {}: {}", uuid, error); - } - } - - Some(_) => { - let mut state = stored_guest.state.as_mut().cloned().unwrap_or_default(); - state.status = GuestStatus::Started.into(); - stored_guest.state = Some(state); - stored_guest.network = None; - self.guests.update(uuid, stored_guest_entry).await?; - if let Err(error) = self.reconcile(uuid).await { - error!("failed to reconcile guest {}: {}", uuid, error); - } - } - } - } - Ok(()) - } - - pub async fn reconcile(&self, uuid: Uuid) -> Result<()> { - let Some(mut entry) = self.guests.read(uuid).await? else { - warn!( - "notified of reconcile for guest {} but it didn't exist", - uuid - ); - return Ok(()); - }; - - info!("reconciling guest {}", uuid); - - let Some(ref mut guest) = entry.guest else { - return Ok(()); - }; - - self.events - .send(DaemonEvent::GuestChanged(GuestChangedEvent { - guest: Some(guest.clone()), - }))?; - - let result = match guest.state.as_ref().map(|x| x.status()).unwrap_or_default() { - GuestStatus::Start => self.start(uuid, guest).await, - GuestStatus::Destroy | GuestStatus::Exited => self.destroy(uuid, guest).await, - _ => Ok(false), - }; - - let changed = match result { - Ok(changed) => changed, - Err(error) => { - guest.state = Some(guest.state.as_mut().cloned().unwrap_or_default()); - guest.state.as_mut().unwrap().error_info = Some(GuestErrorInfo { - message: error.to_string(), - }); - true - } - }; - - info!("reconciled guest {}", uuid); - - let destroyed = - guest.state.as_ref().map(|x| x.status()).unwrap_or_default() == GuestStatus::Destroyed; - - if changed { - let event = DaemonEvent::GuestChanged(GuestChangedEvent { - guest: Some(guest.clone()), - }); - - if destroyed { - self.guests.remove(uuid).await?; - } else { - self.guests.update(uuid, entry.clone()).await?; - } - - self.events.send(event)?; - } - - Ok(()) - } - - async fn start(&self, uuid: Uuid, guest: &mut Guest) -> Result { - let Some(ref spec) = guest.spec else { - return Err(anyhow!("guest spec not specified")); - }; - - let Some(ref image) = spec.image else { - return Err(anyhow!("image spec not provided")); - }; - let oci = match image.image { - Some(Image::Oci(ref oci)) => oci, - None => { - return Err(anyhow!("oci spec not specified")); - } - }; - - let info = self - .runtime - .launch(GuestLaunchRequest { - uuid: Some(uuid), - name: if spec.name.is_empty() { - None - } else { - Some(&spec.name) - }, - image: &oci.image, - vcpus: spec.vcpus, - mem: spec.mem, - env: empty_vec_optional(spec.env.clone()), - run: empty_vec_optional(spec.run.clone()), - debug: false, - }) - .await?; - info!("started guest {}", uuid); - guest.network = Some(GuestNetworkState { - ipv4: info.ipv4.map(|x| x.ip().to_string()).unwrap_or_default(), - ipv6: info.ipv6.map(|x| x.ip().to_string()).unwrap_or_default(), - }); - guest.state = Some(GuestState { - status: GuestStatus::Started.into(), - exit_info: None, - error_info: None, - }); - Ok(true) - } - - async fn destroy(&self, uuid: Uuid, guest: &mut Guest) -> Result { - self.runtime.destroy(uuid).await?; - info!("destroyed guest {}", uuid); - guest.network = None; - guest.state = Some(GuestState { - status: GuestStatus::Destroyed.into(), - exit_info: None, - error_info: None, - }); - Ok(true) - } -} - -fn empty_vec_optional(value: Vec) -> Option> { - if value.is_empty() { - None - } else { - Some(value) - } -} +pub mod guest; diff --git a/crates/krataguest/Cargo.toml b/crates/krataguest/Cargo.toml index 5015981..6604d9d 100644 --- a/crates/krataguest/Cargo.toml +++ b/crates/krataguest/Cargo.toml @@ -10,6 +10,7 @@ env_logger = { workspace = true } futures = { workspace = true } ipnetwork = { workspace = true } krata = { path = "../krata" } +libc = { workspace = true } log = { workspace = true } nix = { workspace = true, features = ["process"] } oci-spec = { workspace = true } diff --git a/crates/krataguest/src/background.rs b/crates/krataguest/src/background.rs index 3f5e261..fee48b5 100644 --- a/crates/krataguest/src/background.rs +++ b/crates/krataguest/src/background.rs @@ -6,14 +6,14 @@ use nix::{libc::c_int, unistd::Pid}; use tokio::{select, time::sleep}; use xenstore::{XsdClient, XsdInterface}; -pub struct ContainerBackground { +pub struct GuestBackground { child: Pid, wait: ChildWait, } -impl ContainerBackground { - pub async fn new(child: Pid) -> Result { - Ok(ContainerBackground { +impl GuestBackground { + pub async fn new(child: Pid) -> Result { + Ok(GuestBackground { child, wait: ChildWait::new()?, }) diff --git a/crates/krataguest/src/childwait.rs b/crates/krataguest/src/childwait.rs index 1e5b7c5..331e8c0 100644 --- a/crates/krataguest/src/childwait.rs +++ b/crates/krataguest/src/childwait.rs @@ -8,11 +8,9 @@ use std::{ }; use anyhow::Result; +use libc::{c_int, waitpid, WEXITSTATUS, WIFEXITED}; use log::warn; -use nix::{ - libc::{c_int, wait}, - unistd::Pid, -}; +use nix::unistd::Pid; use tokio::sync::mpsc::{channel, Receiver, Sender}; const CHILD_WAIT_QUEUE_LEN: usize = 10; @@ -63,15 +61,18 @@ impl ChildWaitTask { fn process(&mut self) -> Result<()> { loop { let mut status: c_int = 0; - let pid = unsafe { wait(addr_of_mut!(status)) }; - let event = ChildEvent { - pid: Pid::from_raw(pid), - status, - }; - let _ = self.sender.try_send(event); + let pid = unsafe { waitpid(-1, addr_of_mut!(status), 0) }; - if self.signal.load(Ordering::Acquire) { - return Ok(()); + if WIFEXITED(status) { + let event = ChildEvent { + pid: Pid::from_raw(pid), + status: WEXITSTATUS(status), + }; + let _ = self.sender.try_send(event); + + if self.signal.load(Ordering::Acquire) { + return Ok(()); + } } } } diff --git a/crates/krataguest/src/init.rs b/crates/krataguest/src/init.rs index e197157..3f13f4a 100644 --- a/crates/krataguest/src/init.rs +++ b/crates/krataguest/src/init.rs @@ -22,7 +22,7 @@ use std::{fs, io}; use sys_mount::{FilesystemType, Mount, MountFlags}; use walkdir::WalkDir; -use crate::background::ContainerBackground; +use crate::background::GuestBackground; const IMAGE_BLOCK_DEVICE_PATH: &str = "/dev/xvda"; const CONFIG_BLOCK_DEVICE_PATH: &str = "/dev/xvdb"; @@ -516,7 +516,7 @@ impl GuestInit { } async fn background(&mut self, executed: Pid) -> Result<()> { - let mut background = ContainerBackground::new(executed).await?; + let mut background = GuestBackground::new(executed).await?; background.run().await?; Ok(()) }