From 9bbf8420f221ca73a9f29425aa3cf62a20904b5c Mon Sep 17 00:00:00 2001 From: Alex Zenla Date: Thu, 14 Mar 2024 14:03:11 +0000 Subject: [PATCH] krata: implement guest reconciliation --- Cargo.toml | 2 + crates/krata/build.rs | 9 +- crates/krata/proto/krata/common.proto | 61 +++++++ crates/krata/proto/krata/control.proto | 59 ++----- crates/krata/src/common.rs | 1 + crates/krata/src/lib.rs | 1 + crates/kratactl/bin/control.rs | 138 +++++++++++----- crates/kratactl/src/console.rs | 35 ++-- crates/kratad/Cargo.toml | 5 + crates/kratad/bin/daemon.rs | 8 - crates/kratad/build.rs | 8 + crates/kratad/proto/kratad/db.proto | 10 ++ crates/kratad/src/control.rs | 172 ++++++++++++-------- crates/kratad/src/db/mod.rs | 82 ++++++++++ crates/kratad/src/db/proto.rs | 1 + crates/kratad/src/event.rs | 100 ++++++------ crates/kratad/src/lib.rs | 113 +++++-------- crates/kratad/src/reconcile/mod.rs | 216 +++++++++++++++++++++++++ crates/kratad/src/tab.rs | 20 --- crates/kratart/src/launch/mod.rs | 3 +- hack/code/autofix.sh | 2 +- 21 files changed, 717 insertions(+), 329 deletions(-) create mode 100644 crates/krata/proto/krata/common.proto create mode 100644 crates/krata/src/common.rs create mode 100644 crates/kratad/build.rs create mode 100644 crates/kratad/proto/kratad/db.proto create mode 100644 crates/kratad/src/db/mod.rs create mode 100644 crates/kratad/src/db/proto.rs create mode 100644 crates/kratad/src/reconcile/mod.rs delete mode 100644 crates/kratad/src/tab.rs diff --git a/Cargo.toml b/Cargo.toml index 271503f..73b1620 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,7 +44,9 @@ oci-spec = "0.6.4" path-absolutize = "3.1.1" path-clean = "1.0.1" prost = "0.12.3" +prost-build = "0.12.3" rand = "0.8.5" +redb = "1.5.0" rtnetlink = "0.14.1" serde_json = "1.0.113" sha256 = "1.5.0" diff --git a/crates/krata/build.rs b/crates/krata/build.rs index 6a18bf2..682ec54 100644 --- a/crates/krata/build.rs +++ b/crates/krata/build.rs @@ -1,5 +1,6 @@ -fn main() { - tonic_build::configure() - .compile(&["proto/krata/control.proto"], &["proto"]) - .unwrap(); +use std::io::Result; + +fn main() -> Result<()> { + tonic_build::configure().compile(&["proto/krata/control.proto"], &["proto/"])?; + Ok(()) } diff --git a/crates/krata/proto/krata/common.proto b/crates/krata/proto/krata/common.proto new file mode 100644 index 0000000..bba0601 --- /dev/null +++ b/crates/krata/proto/krata/common.proto @@ -0,0 +1,61 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "dev.krata.proto.common"; +option java_outer_classname = "CommonProto"; + +package krata.common; + +message GuestOciImageSpec { + string image = 1; +} + +message GuestImageSpec { + oneof image { + GuestOciImageSpec oci = 1; + } +} + +message GuestSpec { + string name = 1; + GuestImageSpec image = 2; + uint32 vcpus = 3; + uint64 mem = 4; + repeated string env = 5; + repeated string run = 6; +} + +message GuestNetworkState { + string ipv4 = 1; + string ipv6 = 2; +} + +message GuestExitInfo { + int32 code = 1; +} + +message GuestErrorInfo { + string message = 1; +} + +enum GuestStatus { + GUEST_STATUS_UNKNOWN = 0; + GUEST_STATUS_START = 1; + GUEST_STATUS_STARTED = 2; + GUEST_STATUS_EXITED = 3; + GUEST_STATUS_DESTROY = 4; + GUEST_STATUS_DESTROYED = 5; +} + +message GuestState { + GuestStatus status = 1; + GuestExitInfo exit_info = 2; + GuestErrorInfo error_info = 3; +} + +message Guest { + string id = 1; + GuestState state = 2; + GuestSpec spec = 3; + GuestNetworkState network = 4; +} diff --git a/crates/krata/proto/krata/control.proto b/crates/krata/proto/krata/control.proto index d05604f..799e861 100644 --- a/crates/krata/proto/krata/control.proto +++ b/crates/krata/proto/krata/control.proto @@ -6,45 +6,20 @@ option java_outer_classname = "ControlProto"; package krata.control; -message GuestOciImageSpec { - string image = 1; +import "krata/common.proto"; + +message CreateGuestRequest { + krata.common.GuestSpec spec = 1; } -message GuestImageSpec { - oneof image { - GuestOciImageSpec oci = 1; - } -} - -message GuestNetworkInfo { - string ipv4 = 1; - string ipv6 = 2; -} - -message GuestInfo { - string id = 1; - string name = 2; - GuestImageSpec image = 3; - GuestNetworkInfo network = 4; -} - -message LaunchGuestRequest { - string name = 1; - GuestImageSpec image = 2; - uint32 vcpus = 3; - uint64 mem = 4; - repeated string env = 5; - repeated string run = 6; -} - -message LaunchGuestReply { - GuestInfo guest = 1; +message CreateGuestReply { + string guest_id = 1; } message ListGuestsRequest {} message ListGuestsReply { - repeated GuestInfo guests = 1; + repeated krata.common.Guest guests = 1; } message DestroyGuestRequest { @@ -64,32 +39,20 @@ message ConsoleDataReply { message WatchEventsRequest {} -message GuestLaunchedEvent { - string guest_id = 1; -} - -message GuestDestroyedEvent { - string guest_id = 1; -} - -message GuestExitedEvent { - string guest_id = 1; - int32 code = 2; +message GuestChangedEvent { + krata.common.Guest guest = 1; } message WatchEventsReply { oneof event { - GuestLaunchedEvent guest_launched = 1; - GuestDestroyedEvent guest_destroyed = 2; - GuestExitedEvent guest_exited = 3; + GuestChangedEvent guest_changed = 1; } } service ControlService { - rpc LaunchGuest(LaunchGuestRequest) returns (LaunchGuestReply); + rpc CreateGuest(CreateGuestRequest) returns (CreateGuestReply); rpc DestroyGuest(DestroyGuestRequest) returns (DestroyGuestReply); rpc ListGuests(ListGuestsRequest) returns (ListGuestsReply); rpc ConsoleData(stream ConsoleDataRequest) returns (stream ConsoleDataReply); - rpc WatchEvents(WatchEventsRequest) returns (stream WatchEventsReply); } diff --git a/crates/krata/src/common.rs b/crates/krata/src/common.rs new file mode 100644 index 0000000..403d037 --- /dev/null +++ b/crates/krata/src/common.rs @@ -0,0 +1 @@ +tonic::include_proto!("krata.common"); diff --git a/crates/krata/src/lib.rs b/crates/krata/src/lib.rs index 5aceb24..ed74a4c 100644 --- a/crates/krata/src/lib.rs +++ b/crates/krata/src/lib.rs @@ -1,3 +1,4 @@ +pub mod common; pub mod control; pub mod dial; pub mod ethtool; diff --git a/crates/kratactl/bin/control.rs b/crates/kratactl/bin/control.rs index a3b1b75..2bf1e80 100644 --- a/crates/kratactl/bin/control.rs +++ b/crates/kratactl/bin/control.rs @@ -1,11 +1,19 @@ -use anyhow::{anyhow, Result}; +use anyhow::Result; use clap::{Parser, Subcommand}; use env_logger::Env; -use krata::control::{ - guest_image_spec::Image, watch_events_reply::Event, DestroyGuestRequest, GuestImageSpec, - GuestOciImageSpec, LaunchGuestRequest, ListGuestsRequest, WatchEventsRequest, +use krata::{ + common::{ + guest_image_spec::Image, GuestImageSpec, GuestOciImageSpec, GuestSpec, GuestState, + GuestStatus, + }, + control::{ + watch_events_reply::Event, CreateGuestRequest, DestroyGuestRequest, ListGuestsRequest, + WatchEventsRequest, + }, }; use kratactl::{client::ControlClientProvider, console::StdioConsoleStream}; +use log::error; +use tokio_stream::StreamExt; use tonic::Request; #[derive(Parser, Debug)] @@ -54,7 +62,7 @@ async fn main() -> Result<()> { let args = ControllerArgs::parse(); let mut client = ControlClientProvider::dial(args.connection.parse()?).await?; - let events = client + let mut events = client .watch_events(WatchEventsRequest {}) .await? .into_inner(); @@ -69,33 +77,67 @@ async fn main() -> Result<()> { env, run, } => { - let request = LaunchGuestRequest { - name: name.unwrap_or_default(), - image: Some(GuestImageSpec { - image: Some(Image::Oci(GuestOciImageSpec { image: oci })), + let request = CreateGuestRequest { + spec: Some(GuestSpec { + name: name.unwrap_or_default(), + image: Some(GuestImageSpec { + image: Some(Image::Oci(GuestOciImageSpec { image: oci })), + }), + vcpus: cpus, + mem, + env: env.unwrap_or_default(), + run, }), - vcpus: cpus, - mem, - env: env.unwrap_or_default(), - run, }; let response = client - .launch_guest(Request::new(request)) + .create_guest(Request::new(request)) .await? .into_inner(); - let Some(guest) = response.guest else { - return Err(anyhow!( - "control service did not return a guest in the response" - )); - }; - println!("launched guest: {}", guest.id); + let id = response.guest_id; if attach { - let input = StdioConsoleStream::stdin_stream(guest.id.clone()).await; + while let Some(event) = events.next().await { + let reply = event?; + match reply.event { + Some(Event::GuestChanged(changed)) => { + let Some(guest) = changed.guest else { + continue; + }; + + if guest.id != id { + continue; + } + + let Some(state) = guest.state else { + continue; + }; + + if let Some(ref error) = state.error_info { + error!("guest error: {}", error.message); + } + + if state.status() == GuestStatus::Destroyed { + error!("guest destroyed"); + std::process::exit(1); + } + + if state.status() == GuestStatus::Started { + break; + } + } + + None => { + continue; + } + } + } + let input = StdioConsoleStream::stdin_stream(id.clone()).await; let output = client.console_data(input).await?.into_inner(); let exit_hook_task = - StdioConsoleStream::guest_exit_hook(guest.id.clone(), events).await?; + StdioConsoleStream::guest_exit_hook(id.clone(), events).await?; StdioConsoleStream::stdout(output).await?; exit_hook_task.abort(); + } else { + println!("created guest: {}", id); } } @@ -123,7 +165,7 @@ async fn main() -> Result<()> { .await? .into_inner(); let mut table = cli_tables::Table::new(); - let header = vec!["name", "uuid", "ipv4", "ipv6", "image"]; + let header = vec!["name", "uuid", "state", "ipv4", "ipv6", "image"]; table.push_row(&header)?; for guest in response.guests { let ipv4 = guest @@ -136,7 +178,10 @@ async fn main() -> Result<()> { .as_ref() .map(|x| x.ipv6.as_str()) .unwrap_or("unknown"); - let image = guest + let Some(spec) = guest.spec else { + continue; + }; + let image = spec .image .map(|x| { x.image @@ -147,8 +192,9 @@ async fn main() -> Result<()> { }) .unwrap_or("unknown".to_string()); table.push_row_string(&vec![ - guest.name, + spec.name, guest.id, + format!("{}", guest_state_text(guest.state.unwrap_or_default())), ipv4.to_string(), ipv6.to_string(), image, @@ -172,19 +218,10 @@ async fn main() -> Result<()> { }; match event { - Event::GuestLaunched(launched) => { - println!("event=guest.launched guest={}", launched.guest_id); - } - - Event::GuestDestroyed(destroyed) => { - println!("event=guest.destroyed guest={}", destroyed.guest_id); - } - - Event::GuestExited(exited) => { - println!( - "event=guest.exited guest={} code={}", - exited.guest_id, exited.code - ); + Event::GuestChanged(changed) => { + if let Some(guest) = changed.guest { + println!("event=guest.changed guest={}", guest.id); + } } } } @@ -192,3 +229,28 @@ async fn main() -> Result<()> { } Ok(()) } + +fn guest_status_text(status: GuestStatus) -> String { + match status { + GuestStatus::Unknown => "unknown", + GuestStatus::Destroyed => "destroyed", + GuestStatus::Start => "starting", + GuestStatus::Exited => "exited", + GuestStatus::Started => "started", + _ => "unknown", + } + .to_string() +} + +fn guest_state_text(state: GuestState) -> String { + let mut text = guest_status_text(state.status()); + + if let Some(exit) = state.exit_info { + text.push_str(&format!(" (exit code: {})", exit.code)); + } + + if let Some(error) = state.error_info { + text.push_str(&format!(" (error: {})", error.message)); + } + text +} diff --git a/crates/kratactl/src/console.rs b/crates/kratactl/src/console.rs index 6afc265..38aeec8 100644 --- a/crates/kratactl/src/console.rs +++ b/crates/kratactl/src/console.rs @@ -5,8 +5,9 @@ use std::{ use anyhow::Result; use async_stream::stream; -use krata::control::{ - watch_events_reply::Event, ConsoleDataReply, ConsoleDataRequest, WatchEventsReply, +use krata::{ + common::GuestStatus, + control::{watch_events_reply::Event, ConsoleDataReply, ConsoleDataRequest, WatchEventsReply}, }; use log::{debug, error, warn}; use termion::raw::IntoRawMode; @@ -76,22 +77,28 @@ impl StdioConsoleStream { }; match event { - Event::GuestExited(exit) => { - if exit.guest_id == id { - std::process::exit(exit.code); - } - } + Event::GuestChanged(changed) => { + let Some(guest) = changed.guest else { + continue; + }; - Event::GuestDestroyed(destroy) => { - if destroy.guest_id == id { - warn!("attached guest destroyed"); + let Some(state) = guest.state else { + continue; + }; + + if guest.id != id { + continue; + } + + if let Some(exit_info) = state.exit_info { + std::process::exit(exit_info.code); + } + + if state.status() == GuestStatus::Destroyed { + warn!("attached guest was destroyed"); std::process::exit(1); } } - - _ => { - continue; - } } } } diff --git a/crates/kratad/Cargo.toml b/crates/kratad/Cargo.toml index d6de10a..fa778ee 100644 --- a/crates/kratad/Cargo.toml +++ b/crates/kratad/Cargo.toml @@ -14,6 +14,8 @@ futures = { workspace = true } krata = { path = "../krata" } kratart = { path = "../kratart" } log = { workspace = true } +prost = { workspace = true } +redb = { workspace = true } serde = { workspace = true } serde_yaml = { workspace = true } signal-hook = { workspace = true } @@ -28,3 +30,6 @@ name = "kratad" [[bin]] name = "kratad" path = "bin/daemon.rs" + +[build-dependencies] +prost-build = { workspace = true } diff --git a/crates/kratad/bin/daemon.rs b/crates/kratad/bin/daemon.rs index 18aa163..5948cc1 100644 --- a/crates/kratad/bin/daemon.rs +++ b/crates/kratad/bin/daemon.rs @@ -4,7 +4,6 @@ use env_logger::Env; use krata::dial::ControlDialAddress; use kratad::Daemon; use kratart::Runtime; -use log::error; use std::{ str::FromStr, sync::{atomic::AtomicBool, Arc}, @@ -16,8 +15,6 @@ struct Args { listen: String, #[arg(short, long, default_value = "/var/lib/krata")] store: String, - #[arg(long, default_value = "false")] - no_load_guest_tab: bool, } #[tokio::main(flavor = "multi_thread", worker_threads = 10)] @@ -29,11 +26,6 @@ async fn main() -> Result<()> { let addr = ControlDialAddress::from_str(&args.listen)?; let runtime = Runtime::new(args.store.clone()).await?; let mut daemon = Daemon::new(args.store.clone(), runtime).await?; - if !args.no_load_guest_tab { - if let Err(error) = daemon.load_guest_tab().await { - error!("failed to load guest tab: {}", error); - } - } daemon.listen(addr).await?; Ok(()) } diff --git a/crates/kratad/build.rs b/crates/kratad/build.rs new file mode 100644 index 0000000..e21abcb --- /dev/null +++ b/crates/kratad/build.rs @@ -0,0 +1,8 @@ +use std::io::Result; + +fn main() -> Result<()> { + prost_build::Config::new() + .extern_path(".krata.common", "::krata::common") + .compile_protos(&["proto/kratad/db.proto"], &["proto/", "../krata/proto"])?; + Ok(()) +} diff --git a/crates/kratad/proto/kratad/db.proto b/crates/kratad/proto/kratad/db.proto new file mode 100644 index 0000000..b47f6c7 --- /dev/null +++ b/crates/kratad/proto/kratad/db.proto @@ -0,0 +1,10 @@ +syntax = "proto3"; + +package kratad.db; + +import "krata/common.proto"; + +message GuestEntry { + string id = 1; + krata.common.Guest guest = 2; +} diff --git a/crates/kratad/src/control.rs b/crates/kratad/src/control.rs index 2e0ac3d..c71f23d 100644 --- a/crates/kratad/src/control.rs +++ b/crates/kratad/src/control.rs @@ -1,22 +1,29 @@ -use std::{io, pin::Pin}; +use std::{io, pin::Pin, str::FromStr}; use async_stream::try_stream; use futures::Stream; -use krata::control::{ - control_service_server::ControlService, guest_image_spec::Image, ConsoleDataReply, - ConsoleDataRequest, DestroyGuestReply, DestroyGuestRequest, GuestImageSpec, GuestInfo, - GuestNetworkInfo, GuestOciImageSpec, LaunchGuestReply, LaunchGuestRequest, ListGuestsReply, - ListGuestsRequest, WatchEventsReply, WatchEventsRequest, +use krata::{ + common::{Guest, GuestState, GuestStatus}, + control::{ + control_service_server::ControlService, ConsoleDataReply, ConsoleDataRequest, + CreateGuestReply, CreateGuestRequest, DestroyGuestReply, DestroyGuestRequest, + ListGuestsReply, ListGuestsRequest, WatchEventsReply, WatchEventsRequest, + }, }; +use kratart::Runtime; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, select, + sync::mpsc::Sender, }; use tokio_stream::StreamExt; use tonic::{Request, Response, Status, Streaming}; +use uuid::Uuid; -use crate::event::DaemonEventContext; -use kratart::{launch::GuestLaunchRequest, Runtime}; +use crate::{ + db::{proto::GuestEntry, GuestStore}, + event::DaemonEventContext, +}; pub struct ApiError { message: String, @@ -40,11 +47,23 @@ impl From for Status { pub struct RuntimeControlService { events: DaemonEventContext, runtime: Runtime, + guests: GuestStore, + guest_reconciler_notify: Sender, } impl RuntimeControlService { - pub fn new(events: DaemonEventContext, runtime: Runtime) -> Self { - Self { events, runtime } + pub fn new( + events: DaemonEventContext, + runtime: Runtime, + guests: GuestStore, + guest_reconciler_notify: Sender, + ) -> Self { + Self { + events, + runtime, + guests, + guest_reconciler_notify, + } } } @@ -61,45 +80,46 @@ impl ControlService for RuntimeControlService { type WatchEventsStream = Pin> + Send + 'static>>; - async fn launch_guest( + async fn create_guest( &self, - request: Request, - ) -> Result, Status> { + request: Request, + ) -> Result, Status> { let request = request.into_inner(); - let Some(image) = request.image else { + let Some(spec) = request.spec else { return Err(ApiError { - message: "image spec not provider".to_string(), + message: "guest spec not provided".to_string(), } .into()); }; - let oci = match image.image { - Some(Image::Oci(oci)) => oci, - None => { - return Err(ApiError { - message: "image spec not provided".to_string(), - } - .into()) - } - }; - let guest: GuestInfo = convert_guest_info( - self.runtime - .launch(GuestLaunchRequest { - name: if request.name.is_empty() { - None - } else { - Some(&request.name) - }, - image: &oci.image, - vcpus: request.vcpus, - mem: request.mem, - env: empty_vec_optional(request.env), - run: empty_vec_optional(request.run), - debug: false, - }) - .await - .map_err(ApiError::from)?, - ); - Ok(Response::new(LaunchGuestReply { guest: Some(guest) })) + let uuid = Uuid::new_v4(); + self.guests + .update( + uuid, + GuestEntry { + id: uuid.to_string(), + guest: Some(Guest { + id: uuid.to_string(), + state: Some(GuestState { + status: GuestStatus::Start.into(), + exit_info: None, + error_info: None, + }), + spec: Some(spec), + network: None, + }), + }, + ) + .await + .map_err(ApiError::from)?; + self.guest_reconciler_notify + .send(uuid) + .await + .map_err(|x| ApiError { + message: x.to_string(), + })?; + Ok(Response::new(CreateGuestReply { + guest_id: uuid.to_string(), + })) } async fn destroy_guest( @@ -107,10 +127,42 @@ impl ControlService for RuntimeControlService { request: Request, ) -> Result, Status> { let request = request.into_inner(); - self.runtime - .destroy(&request.guest_id) + let uuid = Uuid::from_str(&request.guest_id).map_err(|error| ApiError { + message: error.to_string(), + })?; + let Some(mut entry) = self.guests.read(uuid).await.map_err(ApiError::from)? else { + return Err(ApiError { + message: "guest not found".to_string(), + } + .into()); + }; + let Some(ref mut guest) = entry.guest else { + return Err(ApiError { + message: "guest not found".to_string(), + } + .into()); + }; + + guest.state = Some(guest.state.as_mut().cloned().unwrap_or_default()); + + if guest.state.as_ref().unwrap().status() == GuestStatus::Destroyed { + return Err(ApiError { + message: "guest already destroyed".to_string(), + } + .into()); + } + + guest.state.as_mut().unwrap().status = GuestStatus::Destroy.into(); + self.guests + .update(uuid, entry) .await .map_err(ApiError::from)?; + self.guest_reconciler_notify + .send(uuid) + .await + .map_err(|x| ApiError { + message: x.to_string(), + })?; Ok(Response::new(DestroyGuestReply {})) } @@ -119,11 +171,11 @@ impl ControlService for RuntimeControlService { request: Request, ) -> Result, Status> { let _ = request.into_inner(); - let guests = self.runtime.list().await.map_err(ApiError::from)?; + let guests = self.guests.list().await.map_err(ApiError::from)?; let guests = guests - .into_iter() - .map(convert_guest_info) - .collect::>(); + .into_values() + .filter_map(|entry| entry.guest) + .collect::>(); Ok(Response::new(ListGuestsReply { guests })) } @@ -191,25 +243,3 @@ impl ControlService for RuntimeControlService { Ok(Response::new(Box::pin(output) as Self::WatchEventsStream)) } } - -fn empty_vec_optional(value: Vec) -> Option> { - if value.is_empty() { - None - } else { - Some(value) - } -} - -fn convert_guest_info(value: kratart::GuestInfo) -> GuestInfo { - GuestInfo { - name: value.name.unwrap_or_default(), - id: value.uuid.to_string(), - image: Some(GuestImageSpec { - image: Some(Image::Oci(GuestOciImageSpec { image: value.image })), - }), - network: Some(GuestNetworkInfo { - ipv4: value.ipv4.map(|x| x.ip().to_string()).unwrap_or_default(), - ipv6: value.ipv6.map(|x| x.ip().to_string()).unwrap_or_default(), - }), - } -} diff --git a/crates/kratad/src/db/mod.rs b/crates/kratad/src/db/mod.rs new file mode 100644 index 0000000..2de9d8d --- /dev/null +++ b/crates/kratad/src/db/mod.rs @@ -0,0 +1,82 @@ +pub mod proto; + +use std::{collections::HashMap, path::Path, sync::Arc}; + +use self::proto::GuestEntry; +use anyhow::Result; +use log::error; +use prost::Message; +use redb::{Database, ReadableTable, TableDefinition}; +use uuid::Uuid; + +const GUESTS: TableDefinition = TableDefinition::new("guests"); + +#[derive(Clone)] +pub struct GuestStore { + database: Arc, +} + +impl GuestStore { + pub fn open(path: &Path) -> Result { + let database = Database::create(path)?; + let write = database.begin_write()?; + let _ = write.open_table(GUESTS); + write.commit()?; + Ok(GuestStore { + database: Arc::new(database), + }) + } + + pub async fn read(&self, id: Uuid) -> Result> { + let read = self.database.begin_read()?; + let table = read.open_table(GUESTS)?; + let Some(entry) = table.get(id.to_u128_le())? else { + return Ok(None); + }; + let bytes = entry.value(); + Ok(Some(GuestEntry::decode(bytes)?)) + } + + pub async fn list(&self) -> Result> { + let mut guests: HashMap = HashMap::new(); + let read = self.database.begin_read()?; + let table = read.open_table(GUESTS)?; + for result in table.iter()? { + let (key, value) = result?; + let uuid = Uuid::from_u128_le(key.value()); + let state = match GuestEntry::decode(value.value()) { + Ok(state) => state, + Err(error) => { + error!( + "found invalid guest state in database for uuid {}: {}", + uuid, error + ); + continue; + } + }; + guests.insert(uuid, state); + } + Ok(guests) + } + + pub async fn update(&self, id: Uuid, entry: GuestEntry) -> Result<()> { + let write = self.database.begin_write()?; + { + let mut table = write.open_table(GUESTS)?; + let bytes = entry.encode_to_vec(); + table.insert(id.to_u128_le(), bytes.as_slice())?; + } + write.commit()?; + Ok(()) + } + + pub async fn remove(&self, id: Uuid) -> Result<()> { + let write = self.database.begin_write()?; + { + let mut table = write.open_table(GUESTS)?; + table.remove(id.to_u128_le())?; + } + write.commit()?; + Ok(()) + } +} diff --git a/crates/kratad/src/db/proto.rs b/crates/kratad/src/db/proto.rs new file mode 100644 index 0000000..ddc1af5 --- /dev/null +++ b/crates/kratad/src/db/proto.rs @@ -0,0 +1 @@ +include!(concat!(env!("OUT_DIR"), "/kratad.db.rs")); diff --git a/crates/kratad/src/event.rs b/crates/kratad/src/event.rs index ceb1bc0..36828a8 100644 --- a/crates/kratad/src/event.rs +++ b/crates/kratad/src/event.rs @@ -1,13 +1,22 @@ use std::{collections::HashMap, time::Duration}; use anyhow::Result; -use krata::control::{GuestDestroyedEvent, GuestExitedEvent, GuestLaunchedEvent}; -use log::{error, info, warn}; -use tokio::{sync::broadcast, task::JoinHandle, time}; +use krata::{ + common::{GuestExitInfo, GuestState, GuestStatus}, + control::watch_events_reply::Event, +}; +use log::error; +use tokio::{ + sync::{broadcast, mpsc::Sender}, + task::JoinHandle, + time, +}; use uuid::Uuid; use kratart::{GuestInfo, Runtime}; +use crate::db::GuestStore; + pub type DaemonEvent = krata::control::watch_events_reply::Event; const EVENT_CHANNEL_QUEUE_LEN: usize = 1000; @@ -21,21 +30,34 @@ impl DaemonEventContext { pub fn subscribe(&self) -> broadcast::Receiver { self.sender.subscribe() } + + pub fn send(&self, event: DaemonEvent) -> Result<()> { + let _ = self.sender.send(event); + Ok(()) + } } pub struct DaemonEventGenerator { runtime: Runtime, + guests: GuestStore, + guest_reconciler_notify: Sender, last: HashMap, - sender: broadcast::Sender, + _sender: broadcast::Sender, } impl DaemonEventGenerator { - pub async fn new(runtime: Runtime) -> Result<(DaemonEventContext, DaemonEventGenerator)> { + pub async fn new( + guests: GuestStore, + guest_reconciler_notify: Sender, + runtime: Runtime, + ) -> Result<(DaemonEventContext, DaemonEventGenerator)> { let (sender, _) = broadcast::channel(EVENT_CHANNEL_QUEUE_LEN); let generator = DaemonEventGenerator { runtime, + guests, + guest_reconciler_notify, last: HashMap::new(), - sender: sender.clone(), + _sender: sender.clone(), }; let context = DaemonEventContext { sender }; Ok((context, generator)) @@ -51,24 +73,7 @@ impl DaemonEventGenerator { map }; - let mut events: Vec = Vec::new(); - let mut exits: Vec = Vec::new(); - - for uuid in guests.keys() { - if !self.last.contains_key(uuid) { - events.push(DaemonEvent::GuestLaunched(GuestLaunchedEvent { - guest_id: uuid.to_string(), - })); - } - } - - for uuid in self.last.keys() { - if !guests.contains_key(uuid) { - events.push(DaemonEvent::GuestDestroyed(GuestDestroyedEvent { - guest_id: uuid.to_string(), - })); - } - } + let mut exits: Vec<(Uuid, i32)> = Vec::new(); for (uuid, guest) in &guests { let Some(last) = self.last.get(uuid) else { @@ -83,23 +88,27 @@ impl DaemonEventGenerator { continue; }; - let exit = GuestExitedEvent { - guest_id: uuid.to_string(), - code, - }; + exits.push((*uuid, code)); + } - exits.push(exit.clone()); - events.push(DaemonEvent::GuestExited(exit)); + for (uuid, code) in exits { + if let Some(mut entry) = self.guests.read(uuid).await? { + let Some(ref mut guest) = entry.guest else { + continue; + }; + + guest.state = Some(GuestState { + status: GuestStatus::Exited.into(), + exit_info: Some(GuestExitInfo { code }), + error_info: None, + }); + + self.guests.update(uuid, entry).await?; + self.guest_reconciler_notify.send(uuid).await?; + } } self.last = guests; - - for event in events { - let _ = self.sender.send(event); - } - - self.process_exit_auto_destroy(exits).await?; - Ok(()) } @@ -115,21 +124,4 @@ impl DaemonEventGenerator { } })) } - - async fn process_exit_auto_destroy(&mut self, exits: Vec) -> Result<()> { - for exit in exits { - if let Err(error) = self.runtime.destroy(&exit.guest_id).await { - warn!( - "failed to auto-destroy exited guest {}: {}", - exit.guest_id, error - ); - } else { - info!( - "auto-destroyed guest {}: exited with status {}", - exit.guest_id, exit.code - ); - } - } - Ok(()) - } } diff --git a/crates/kratad/src/lib.rs b/crates/kratad/src/lib.rs index c133d35..3bbb2a8 100644 --- a/crates/kratad/src/lib.rs +++ b/crates/kratad/src/lib.rs @@ -2,100 +2,72 @@ use std::{net::SocketAddr, path::PathBuf, str::FromStr}; use anyhow::Result; use control::RuntimeControlService; +use db::GuestStore; use event::{DaemonEventContext, DaemonEventGenerator}; use krata::{control::control_service_server::ControlServiceServer, dial::ControlDialAddress}; -use kratart::{launch::GuestLaunchRequest, Runtime}; -use log::{info, warn}; -use tab::Tab; -use tokio::{fs, net::UnixListener, task::JoinHandle}; +use kratart::Runtime; +use log::info; +use reconcile::GuestReconciler; +use tokio::{ + net::UnixListener, + sync::mpsc::{channel, Sender}, + task::JoinHandle, +}; use tokio_stream::wrappers::UnixListenerStream; use tonic::transport::{Identity, Server, ServerTlsConfig}; +use uuid::Uuid; pub mod control; +pub mod db; pub mod event; -pub mod tab; +pub mod reconcile; pub struct Daemon { store: String, runtime: Runtime, + guests: GuestStore, events: DaemonEventContext, - task: JoinHandle<()>, + guest_reconciler_task: JoinHandle<()>, + guest_reconciler_notify: Sender, + generator_task: JoinHandle<()>, } +const GUEST_RECONCILER_QUEUE_LEN: usize = 1000; + impl Daemon { pub async fn new(store: String, runtime: Runtime) -> Result { + let guests_db_path = format!("{}/guests.db", store); + let guests = GuestStore::open(&PathBuf::from(guests_db_path))?; let runtime_for_events = runtime.dupe().await?; - let (events, generator) = DaemonEventGenerator::new(runtime_for_events).await?; + let (guest_reconciler_notify, guest_reconciler_receiver) = + channel::(GUEST_RECONCILER_QUEUE_LEN); + let (events, generator) = DaemonEventGenerator::new( + guests.clone(), + guest_reconciler_notify.clone(), + runtime_for_events, + ) + .await?; + let runtime_for_reconciler = runtime.dupe().await?; + let guest_reconciler = + GuestReconciler::new(guests.clone(), events.clone(), runtime_for_reconciler)?; Ok(Self { store, runtime, + guests, events, - task: generator.launch().await?, + guest_reconciler_task: guest_reconciler.launch(guest_reconciler_receiver).await?, + guest_reconciler_notify, + generator_task: generator.launch().await?, }) } - pub async fn load_guest_tab(&mut self) -> Result<()> { - let tab_path = PathBuf::from(format!("{}/guests.yml", self.store)); - - if !tab_path.exists() { - return Ok(()); - } - - info!("loading guest tab"); - - let tab_content = fs::read_to_string(tab_path).await?; - let tab: Tab = serde_yaml::from_str(&tab_content)?; - let running = self.runtime.list().await?; - for (name, guest) in tab.guests { - let existing = running - .iter() - .filter(|x| x.name.is_some()) - .find(|run| *run.name.as_ref().unwrap() == name); - - if let Some(existing) = existing { - info!("guest {} is already running: {}", name, existing.uuid); - continue; - } - - let request = GuestLaunchRequest { - name: Some(&name), - image: &guest.image, - vcpus: guest.cpus, - mem: guest.mem, - env: if guest.env.is_empty() { - None - } else { - Some( - guest - .env - .iter() - .map(|(key, value)| format!("{}={}", key, value)) - .collect::>(), - ) - }, - run: if guest.run.is_empty() { - None - } else { - Some(guest.run) - }, - debug: false, - }; - match self.runtime.launch(request).await { - Err(error) => { - warn!("failed to launch guest {}: {}", name, error); - } - - Ok(info) => { - info!("launched guest {}: {}", name, info.uuid); - } - } - } - info!("loaded guest tab"); - Ok(()) - } - pub async fn listen(&mut self, addr: ControlDialAddress) -> Result<()> { - let control_service = RuntimeControlService::new(self.events.clone(), self.runtime.clone()); + let control_service = RuntimeControlService::new( + self.events.clone(), + self.runtime.clone(), + self.guests.clone(), + self.guest_reconciler_notify.clone(), + ); let mut server = Server::builder(); @@ -147,6 +119,7 @@ impl Daemon { impl Drop for Daemon { fn drop(&mut self) { - self.task.abort(); + self.guest_reconciler_task.abort(); + self.generator_task.abort(); } } diff --git a/crates/kratad/src/reconcile/mod.rs b/crates/kratad/src/reconcile/mod.rs new file mode 100644 index 0000000..f218e1d --- /dev/null +++ b/crates/kratad/src/reconcile/mod.rs @@ -0,0 +1,216 @@ +use anyhow::{anyhow, Result}; +use krata::{ + common::{ + guest_image_spec::Image, Guest, GuestErrorInfo, GuestNetworkState, GuestState, GuestStatus, + }, + control::GuestChangedEvent, +}; +use kratart::{launch::GuestLaunchRequest, Runtime}; +use log::{error, info, warn}; +use tokio::{sync::mpsc::Receiver, task::JoinHandle}; +use uuid::Uuid; + +use crate::{ + db::GuestStore, + event::{DaemonEvent, DaemonEventContext}, +}; + +pub struct GuestReconciler { + guests: GuestStore, + events: DaemonEventContext, + runtime: Runtime, +} + +impl GuestReconciler { + pub fn new(guests: GuestStore, events: DaemonEventContext, runtime: Runtime) -> Result { + Ok(Self { + guests, + events, + runtime, + }) + } + + pub async fn launch(self, mut notify: Receiver) -> Result> { + Ok(tokio::task::spawn(async move { + if let Err(error) = self.reconcile_runtime().await { + error!("runtime reconciler failed: {}", error); + } + + loop { + let Some(uuid) = notify.recv().await else { + break; + }; + if let Err(error) = self.reconcile(uuid).await { + error!("guest reconciler failed: {}", error); + } + } + })) + } + + pub async fn reconcile_runtime(&self) -> Result<()> { + let runtime_guests = self.runtime.list().await?; + let stored_guests = self.guests.list().await?; + for (uuid, mut stored_guest_entry) in stored_guests { + let Some(ref mut stored_guest) = stored_guest_entry.guest else { + warn!("removing unpopulated guest entry for guest {}", uuid); + self.guests.remove(uuid).await?; + continue; + }; + let runtime_guest = runtime_guests.iter().find(|x| x.uuid == uuid); + match runtime_guest { + None => { + let mut state = stored_guest.state.as_mut().cloned().unwrap_or_default(); + if state.status() == GuestStatus::Started { + state.status = GuestStatus::Start.into(); + } + stored_guest.state = Some(state); + stored_guest.network = None; + self.guests.update(uuid, stored_guest_entry).await?; + if let Err(error) = self.reconcile(uuid).await { + error!("failed to reconcile guest {}: {}", uuid, error); + } + } + + Some(_) => { + let mut state = stored_guest.state.as_mut().cloned().unwrap_or_default(); + state.status = GuestStatus::Started.into(); + stored_guest.state = Some(state); + stored_guest.network = None; + self.guests.update(uuid, stored_guest_entry).await?; + if let Err(error) = self.reconcile(uuid).await { + error!("failed to reconcile guest {}: {}", uuid, error); + } + } + } + } + Ok(()) + } + + pub async fn reconcile(&self, uuid: Uuid) -> Result<()> { + let Some(mut entry) = self.guests.read(uuid).await? else { + warn!( + "notified of reconcile for guest {} but it didn't exist", + uuid + ); + return Ok(()); + }; + + info!("reconciling guest {}", uuid); + + let Some(ref mut guest) = entry.guest else { + return Ok(()); + }; + + self.events + .send(DaemonEvent::GuestChanged(GuestChangedEvent { + guest: Some(guest.clone()), + }))?; + + let result = match guest.state.as_ref().map(|x| x.status()).unwrap_or_default() { + GuestStatus::Start => self.start(uuid, guest).await.map(|_| true), + + GuestStatus::Destroy | GuestStatus::Exited => { + self.destroy(uuid, guest).await.map(|_| true) + } + + _ => Ok(false), + }; + + let changed = match result { + Ok(changed) => changed, + Err(error) => { + guest.state = Some(guest.state.as_mut().cloned().unwrap_or_default()); + guest.state.as_mut().unwrap().error_info = Some(GuestErrorInfo { + message: error.to_string(), + }); + true + } + }; + + info!("reconciled guest {}", uuid); + + let destroyed = + guest.state.as_ref().map(|x| x.status()).unwrap_or_default() == GuestStatus::Destroyed; + + if changed { + let event = DaemonEvent::GuestChanged(GuestChangedEvent { + guest: Some(guest.clone()), + }); + + if destroyed { + self.guests.remove(uuid).await?; + } else { + self.guests.update(uuid, entry.clone()).await?; + } + + self.events.send(event)?; + } + + Ok(()) + } + + async fn start(&self, uuid: Uuid, guest: &mut Guest) -> Result<()> { + let Some(ref spec) = guest.spec else { + return Err(anyhow!("guest spec not specified")); + }; + + let Some(ref image) = spec.image else { + return Err(anyhow!("image spec not provided")); + }; + let oci = match image.image { + Some(Image::Oci(ref oci)) => oci, + None => { + return Err(anyhow!("oci spec not specified")); + } + }; + + let info = self + .runtime + .launch(GuestLaunchRequest { + uuid: Some(uuid), + name: if spec.name.is_empty() { + None + } else { + Some(&spec.name) + }, + image: &oci.image, + vcpus: spec.vcpus, + mem: spec.mem, + env: empty_vec_optional(spec.env.clone()), + run: empty_vec_optional(spec.run.clone()), + debug: false, + }) + .await?; + info!("started guest {}", uuid); + guest.network = Some(GuestNetworkState { + ipv4: info.ipv4.map(|x| x.ip().to_string()).unwrap_or_default(), + ipv6: info.ipv6.map(|x| x.ip().to_string()).unwrap_or_default(), + }); + guest.state = Some(GuestState { + status: GuestStatus::Started.into(), + exit_info: None, + error_info: None, + }); + Ok(()) + } + + async fn destroy(&self, uuid: Uuid, guest: &mut Guest) -> Result<()> { + self.runtime.destroy(&uuid.to_string()).await?; + info!("destroyed guest {}", uuid); + guest.network = None; + guest.state = Some(GuestState { + status: GuestStatus::Destroyed.into(), + exit_info: None, + error_info: None, + }); + Ok(()) + } +} + +fn empty_vec_optional(value: Vec) -> Option> { + if value.is_empty() { + None + } else { + Some(value) + } +} diff --git a/crates/kratad/src/tab.rs b/crates/kratad/src/tab.rs deleted file mode 100644 index 907b819..0000000 --- a/crates/kratad/src/tab.rs +++ /dev/null @@ -1,20 +0,0 @@ -use std::collections::HashMap; - -use serde::{Deserialize, Serialize}; - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Tab { - #[serde(default)] - pub guests: HashMap, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct TabGuest { - pub image: String, - pub mem: u64, - pub cpus: u32, - #[serde(default)] - pub env: HashMap, - #[serde(default)] - pub run: Vec, -} diff --git a/crates/kratart/src/launch/mod.rs b/crates/kratart/src/launch/mod.rs index 240f8c2..019bb08 100644 --- a/crates/kratart/src/launch/mod.rs +++ b/crates/kratart/src/launch/mod.rs @@ -22,6 +22,7 @@ use crate::RuntimeContext; use super::{GuestInfo, GuestState}; pub struct GuestLaunchRequest<'a> { + pub uuid: Option, pub name: Option<&'a str>, pub image: &'a str, pub vcpus: u32, @@ -43,7 +44,7 @@ impl GuestLauncher { context: &mut RuntimeContext, request: GuestLaunchRequest<'r>, ) -> Result { - let uuid = Uuid::new_v4(); + let uuid = request.uuid.unwrap_or_else(Uuid::new_v4); let name = format!("krata-{uuid}"); let image_info = self.compile(request.image, &context.image_cache).await?; diff --git a/hack/code/autofix.sh b/hack/code/autofix.sh index f736388..c3c15fd 100755 --- a/hack/code/autofix.sh +++ b/hack/code/autofix.sh @@ -5,4 +5,4 @@ REAL_SCRIPT="$(realpath "${0}")" cd "$(dirname "${REAL_SCRIPT}")/../.." ./hack/build/cargo.sh clippy --fix --allow-dirty --allow-staged -cargo fmt --all +./hack/build/cargo.sh fmt --all