mirror of
https://github.com/edera-dev/krata.git
synced 2025-08-03 13:11:31 +00:00
krata: restructure packages for cleanliness
This commit is contained in:
34
crates/daemon/Cargo.toml
Normal file
34
crates/daemon/Cargo.toml
Normal file
@ -0,0 +1,34 @@
|
||||
[package]
|
||||
name = "krata-daemon"
|
||||
version.workspace = true
|
||||
edition = "2021"
|
||||
resolver = "2"
|
||||
|
||||
[dependencies]
|
||||
anyhow = { workspace = true }
|
||||
async-stream = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
bytes = { workspace = true }
|
||||
clap = { workspace = true }
|
||||
env_logger = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
krata = { path = "../krata" }
|
||||
krata-runtime = { path = "../runtime" }
|
||||
log = { workspace = true }
|
||||
prost = { workspace = true }
|
||||
redb = { workspace = true }
|
||||
signal-hook = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
tokio-stream = { workspace = true }
|
||||
tonic = { workspace = true, features = ["tls"] }
|
||||
uuid = { workspace = true }
|
||||
|
||||
[lib]
|
||||
name = "kratad"
|
||||
|
||||
[[bin]]
|
||||
name = "kratad"
|
||||
path = "bin/daemon.rs"
|
||||
|
||||
[build-dependencies]
|
||||
prost-build = { workspace = true }
|
40
crates/daemon/bin/daemon.rs
Normal file
40
crates/daemon/bin/daemon.rs
Normal file
@ -0,0 +1,40 @@
|
||||
use anyhow::Result;
|
||||
use clap::Parser;
|
||||
use env_logger::Env;
|
||||
use krata::dial::ControlDialAddress;
|
||||
use kratad::Daemon;
|
||||
use kratart::Runtime;
|
||||
use log::LevelFilter;
|
||||
use std::{
|
||||
str::FromStr,
|
||||
sync::{atomic::AtomicBool, Arc},
|
||||
};
|
||||
|
||||
#[derive(Parser)]
|
||||
struct DaemonCommand {
|
||||
#[arg(short, long, default_value = "unix:///var/lib/krata/daemon.socket")]
|
||||
listen: String,
|
||||
#[arg(short, long, default_value = "/var/lib/krata")]
|
||||
store: String,
|
||||
}
|
||||
|
||||
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
|
||||
async fn main() -> Result<()> {
|
||||
env_logger::Builder::from_env(Env::default().default_filter_or("info"))
|
||||
.filter(Some("backhand::filesystem::writer"), LevelFilter::Warn)
|
||||
.init();
|
||||
mask_sighup()?;
|
||||
|
||||
let args = DaemonCommand::parse();
|
||||
let addr = ControlDialAddress::from_str(&args.listen)?;
|
||||
let runtime = Runtime::new(args.store.clone()).await?;
|
||||
let mut daemon = Daemon::new(args.store.clone(), runtime).await?;
|
||||
daemon.listen(addr).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn mask_sighup() -> Result<()> {
|
||||
let flag = Arc::new(AtomicBool::new(false));
|
||||
signal_hook::flag::register(signal_hook::consts::SIGHUP, flag)?;
|
||||
Ok(())
|
||||
}
|
8
crates/daemon/build.rs
Normal file
8
crates/daemon/build.rs
Normal file
@ -0,0 +1,8 @@
|
||||
use std::io::Result;
|
||||
|
||||
fn main() -> Result<()> {
|
||||
prost_build::Config::new()
|
||||
.extern_path(".krata.v1.common", "::krata::v1::common")
|
||||
.compile_protos(&["proto/kratad/db.proto"], &["proto/", "../../proto"])?;
|
||||
Ok(())
|
||||
}
|
10
crates/daemon/proto/kratad/db.proto
Normal file
10
crates/daemon/proto/kratad/db.proto
Normal file
@ -0,0 +1,10 @@
|
||||
syntax = "proto3";
|
||||
|
||||
package kratad.db;
|
||||
|
||||
import "krata/v1/common.proto";
|
||||
|
||||
message GuestEntry {
|
||||
string id = 1;
|
||||
krata.v1.common.Guest guest = 2;
|
||||
}
|
266
crates/daemon/src/control.rs
Normal file
266
crates/daemon/src/control.rs
Normal file
@ -0,0 +1,266 @@
|
||||
use std::{io, pin::Pin, str::FromStr};
|
||||
|
||||
use async_stream::try_stream;
|
||||
use futures::Stream;
|
||||
use krata::v1::{
|
||||
common::{Guest, GuestState, GuestStatus},
|
||||
control::{
|
||||
control_service_server::ControlService, ConsoleDataReply, ConsoleDataRequest,
|
||||
CreateGuestReply, CreateGuestRequest, DestroyGuestReply, DestroyGuestRequest,
|
||||
ListGuestsReply, ListGuestsRequest, ResolveGuestReply, ResolveGuestRequest,
|
||||
WatchEventsReply, WatchEventsRequest,
|
||||
},
|
||||
};
|
||||
use kratart::Runtime;
|
||||
use tokio::{
|
||||
io::{AsyncReadExt, AsyncWriteExt},
|
||||
select,
|
||||
sync::mpsc::Sender,
|
||||
};
|
||||
use tokio_stream::StreamExt;
|
||||
use tonic::{Request, Response, Status, Streaming};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
db::{proto::GuestEntry, GuestStore},
|
||||
event::DaemonEventContext,
|
||||
};
|
||||
|
||||
pub struct ApiError {
|
||||
message: String,
|
||||
}
|
||||
|
||||
impl From<anyhow::Error> for ApiError {
|
||||
fn from(value: anyhow::Error) -> Self {
|
||||
ApiError {
|
||||
message: value.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ApiError> for Status {
|
||||
fn from(value: ApiError) -> Self {
|
||||
Status::unknown(value.message)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct RuntimeControlService {
|
||||
events: DaemonEventContext,
|
||||
runtime: Runtime,
|
||||
guests: GuestStore,
|
||||
guest_reconciler_notify: Sender<Uuid>,
|
||||
}
|
||||
|
||||
impl RuntimeControlService {
|
||||
pub fn new(
|
||||
events: DaemonEventContext,
|
||||
runtime: Runtime,
|
||||
guests: GuestStore,
|
||||
guest_reconciler_notify: Sender<Uuid>,
|
||||
) -> Self {
|
||||
Self {
|
||||
events,
|
||||
runtime,
|
||||
guests,
|
||||
guest_reconciler_notify,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum ConsoleDataSelect {
|
||||
Read(io::Result<usize>),
|
||||
Write(Option<Result<ConsoleDataRequest, tonic::Status>>),
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl ControlService for RuntimeControlService {
|
||||
type ConsoleDataStream =
|
||||
Pin<Box<dyn Stream<Item = Result<ConsoleDataReply, Status>> + Send + 'static>>;
|
||||
|
||||
type WatchEventsStream =
|
||||
Pin<Box<dyn Stream<Item = Result<WatchEventsReply, Status>> + Send + 'static>>;
|
||||
|
||||
async fn create_guest(
|
||||
&self,
|
||||
request: Request<CreateGuestRequest>,
|
||||
) -> Result<Response<CreateGuestReply>, Status> {
|
||||
let request = request.into_inner();
|
||||
let Some(spec) = request.spec else {
|
||||
return Err(ApiError {
|
||||
message: "guest spec not provided".to_string(),
|
||||
}
|
||||
.into());
|
||||
};
|
||||
let uuid = Uuid::new_v4();
|
||||
self.guests
|
||||
.update(
|
||||
uuid,
|
||||
GuestEntry {
|
||||
id: uuid.to_string(),
|
||||
guest: Some(Guest {
|
||||
id: uuid.to_string(),
|
||||
state: Some(GuestState {
|
||||
status: GuestStatus::Starting.into(),
|
||||
network: None,
|
||||
exit_info: None,
|
||||
error_info: None,
|
||||
domid: u32::MAX,
|
||||
}),
|
||||
spec: Some(spec),
|
||||
}),
|
||||
},
|
||||
)
|
||||
.await
|
||||
.map_err(ApiError::from)?;
|
||||
self.guest_reconciler_notify
|
||||
.send(uuid)
|
||||
.await
|
||||
.map_err(|x| ApiError {
|
||||
message: x.to_string(),
|
||||
})?;
|
||||
Ok(Response::new(CreateGuestReply {
|
||||
guest_id: uuid.to_string(),
|
||||
}))
|
||||
}
|
||||
|
||||
async fn destroy_guest(
|
||||
&self,
|
||||
request: Request<DestroyGuestRequest>,
|
||||
) -> Result<Response<DestroyGuestReply>, Status> {
|
||||
let request = request.into_inner();
|
||||
let uuid = Uuid::from_str(&request.guest_id).map_err(|error| ApiError {
|
||||
message: error.to_string(),
|
||||
})?;
|
||||
let Some(mut entry) = self.guests.read(uuid).await.map_err(ApiError::from)? else {
|
||||
return Err(ApiError {
|
||||
message: "guest not found".to_string(),
|
||||
}
|
||||
.into());
|
||||
};
|
||||
let Some(ref mut guest) = entry.guest else {
|
||||
return Err(ApiError {
|
||||
message: "guest not found".to_string(),
|
||||
}
|
||||
.into());
|
||||
};
|
||||
|
||||
guest.state = Some(guest.state.as_mut().cloned().unwrap_or_default());
|
||||
|
||||
if guest.state.as_ref().unwrap().status() == GuestStatus::Destroyed {
|
||||
return Err(ApiError {
|
||||
message: "guest already destroyed".to_string(),
|
||||
}
|
||||
.into());
|
||||
}
|
||||
|
||||
guest.state.as_mut().unwrap().status = GuestStatus::Destroying.into();
|
||||
self.guests
|
||||
.update(uuid, entry)
|
||||
.await
|
||||
.map_err(ApiError::from)?;
|
||||
self.guest_reconciler_notify
|
||||
.send(uuid)
|
||||
.await
|
||||
.map_err(|x| ApiError {
|
||||
message: x.to_string(),
|
||||
})?;
|
||||
Ok(Response::new(DestroyGuestReply {}))
|
||||
}
|
||||
|
||||
async fn list_guests(
|
||||
&self,
|
||||
request: Request<ListGuestsRequest>,
|
||||
) -> Result<Response<ListGuestsReply>, Status> {
|
||||
let _ = request.into_inner();
|
||||
let guests = self.guests.list().await.map_err(ApiError::from)?;
|
||||
let guests = guests
|
||||
.into_values()
|
||||
.filter_map(|entry| entry.guest)
|
||||
.collect::<Vec<Guest>>();
|
||||
Ok(Response::new(ListGuestsReply { guests }))
|
||||
}
|
||||
|
||||
async fn resolve_guest(
|
||||
&self,
|
||||
request: Request<ResolveGuestRequest>,
|
||||
) -> Result<Response<ResolveGuestReply>, Status> {
|
||||
let request = request.into_inner();
|
||||
let guests = self.guests.list().await.map_err(ApiError::from)?;
|
||||
let guests = guests
|
||||
.into_values()
|
||||
.filter_map(|entry| entry.guest)
|
||||
.filter(|x| {
|
||||
let comparison_spec = x.spec.as_ref().cloned().unwrap_or_default();
|
||||
(!request.name.is_empty() && comparison_spec.name == request.name)
|
||||
|| x.id == request.name
|
||||
})
|
||||
.collect::<Vec<Guest>>();
|
||||
Ok(Response::new(ResolveGuestReply {
|
||||
guest: guests.first().cloned(),
|
||||
}))
|
||||
}
|
||||
|
||||
async fn console_data(
|
||||
&self,
|
||||
request: Request<Streaming<ConsoleDataRequest>>,
|
||||
) -> Result<Response<Self::ConsoleDataStream>, Status> {
|
||||
let mut input = request.into_inner();
|
||||
let Some(request) = input.next().await else {
|
||||
return Err(ApiError {
|
||||
message: "expected to have at least one request".to_string(),
|
||||
}
|
||||
.into());
|
||||
};
|
||||
let request = request?;
|
||||
let uuid = Uuid::from_str(&request.guest_id).map_err(|error| ApiError {
|
||||
message: error.to_string(),
|
||||
})?;
|
||||
let mut console = self.runtime.console(uuid).await.map_err(ApiError::from)?;
|
||||
|
||||
let output = try_stream! {
|
||||
let mut buffer: Vec<u8> = vec![0u8; 256];
|
||||
loop {
|
||||
let what = select! {
|
||||
x = console.read_handle.read(&mut buffer) => ConsoleDataSelect::Read(x),
|
||||
x = input.next() => ConsoleDataSelect::Write(x),
|
||||
};
|
||||
|
||||
match what {
|
||||
ConsoleDataSelect::Read(result) => {
|
||||
let size = result?;
|
||||
let data = buffer[0..size].to_vec();
|
||||
yield ConsoleDataReply { data, };
|
||||
},
|
||||
|
||||
ConsoleDataSelect::Write(Some(request)) => {
|
||||
let request = request?;
|
||||
if !request.data.is_empty() {
|
||||
console.write_handle.write_all(&request.data).await?;
|
||||
}
|
||||
},
|
||||
|
||||
ConsoleDataSelect::Write(None) => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Ok(Response::new(Box::pin(output) as Self::ConsoleDataStream))
|
||||
}
|
||||
|
||||
async fn watch_events(
|
||||
&self,
|
||||
request: Request<WatchEventsRequest>,
|
||||
) -> Result<Response<Self::WatchEventsStream>, Status> {
|
||||
let _ = request.into_inner();
|
||||
let mut events = self.events.subscribe();
|
||||
let output = try_stream! {
|
||||
while let Ok(event) = events.recv().await {
|
||||
yield WatchEventsReply { event: Some(event), };
|
||||
}
|
||||
};
|
||||
Ok(Response::new(Box::pin(output) as Self::WatchEventsStream))
|
||||
}
|
||||
}
|
82
crates/daemon/src/db/mod.rs
Normal file
82
crates/daemon/src/db/mod.rs
Normal file
@ -0,0 +1,82 @@
|
||||
pub mod proto;
|
||||
|
||||
use std::{collections::HashMap, path::Path, sync::Arc};
|
||||
|
||||
use self::proto::GuestEntry;
|
||||
use anyhow::Result;
|
||||
use log::error;
|
||||
use prost::Message;
|
||||
use redb::{Database, ReadableTable, TableDefinition};
|
||||
use uuid::Uuid;
|
||||
|
||||
const GUESTS: TableDefinition<u128, &[u8]> = TableDefinition::new("guests");
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct GuestStore {
|
||||
database: Arc<Database>,
|
||||
}
|
||||
|
||||
impl GuestStore {
|
||||
pub fn open(path: &Path) -> Result<Self> {
|
||||
let database = Database::create(path)?;
|
||||
let write = database.begin_write()?;
|
||||
let _ = write.open_table(GUESTS);
|
||||
write.commit()?;
|
||||
Ok(GuestStore {
|
||||
database: Arc::new(database),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn read(&self, id: Uuid) -> Result<Option<GuestEntry>> {
|
||||
let read = self.database.begin_read()?;
|
||||
let table = read.open_table(GUESTS)?;
|
||||
let Some(entry) = table.get(id.to_u128_le())? else {
|
||||
return Ok(None);
|
||||
};
|
||||
let bytes = entry.value();
|
||||
Ok(Some(GuestEntry::decode(bytes)?))
|
||||
}
|
||||
|
||||
pub async fn list(&self) -> Result<HashMap<Uuid, GuestEntry>> {
|
||||
let mut guests: HashMap<Uuid, GuestEntry> = HashMap::new();
|
||||
let read = self.database.begin_read()?;
|
||||
let table = read.open_table(GUESTS)?;
|
||||
for result in table.iter()? {
|
||||
let (key, value) = result?;
|
||||
let uuid = Uuid::from_u128_le(key.value());
|
||||
let state = match GuestEntry::decode(value.value()) {
|
||||
Ok(state) => state,
|
||||
Err(error) => {
|
||||
error!(
|
||||
"found invalid guest state in database for uuid {}: {}",
|
||||
uuid, error
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
guests.insert(uuid, state);
|
||||
}
|
||||
Ok(guests)
|
||||
}
|
||||
|
||||
pub async fn update(&self, id: Uuid, entry: GuestEntry) -> Result<()> {
|
||||
let write = self.database.begin_write()?;
|
||||
{
|
||||
let mut table = write.open_table(GUESTS)?;
|
||||
let bytes = entry.encode_to_vec();
|
||||
table.insert(id.to_u128_le(), bytes.as_slice())?;
|
||||
}
|
||||
write.commit()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn remove(&self, id: Uuid) -> Result<()> {
|
||||
let write = self.database.begin_write()?;
|
||||
{
|
||||
let mut table = write.open_table(GUESTS)?;
|
||||
table.remove(id.to_u128_le())?;
|
||||
}
|
||||
write.commit()?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
1
crates/daemon/src/db/proto.rs
Normal file
1
crates/daemon/src/db/proto.rs
Normal file
@ -0,0 +1 @@
|
||||
include!(concat!(env!("OUT_DIR"), "/kratad.db.rs"));
|
180
crates/daemon/src/event.rs
Normal file
180
crates/daemon/src/event.rs
Normal file
@ -0,0 +1,180 @@
|
||||
use std::{
|
||||
collections::{hash_map::Entry, HashMap},
|
||||
str::FromStr,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use anyhow::Result;
|
||||
use krata::{
|
||||
idm::protocol::{idm_event::Event, IdmPacket},
|
||||
v1::common::{GuestExitInfo, GuestState, GuestStatus},
|
||||
};
|
||||
use log::error;
|
||||
use tokio::{
|
||||
select,
|
||||
sync::{
|
||||
broadcast,
|
||||
mpsc::{channel, Receiver, Sender},
|
||||
},
|
||||
task::JoinHandle,
|
||||
time,
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
db::GuestStore,
|
||||
idm::{DaemonIdmHandle, DaemonIdmSubscribeHandle},
|
||||
};
|
||||
|
||||
pub type DaemonEvent = krata::v1::control::watch_events_reply::Event;
|
||||
|
||||
const EVENT_CHANNEL_QUEUE_LEN: usize = 1000;
|
||||
const IDM_CHANNEL_QUEUE_LEN: usize = 1000;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct DaemonEventContext {
|
||||
sender: broadcast::Sender<DaemonEvent>,
|
||||
}
|
||||
|
||||
impl DaemonEventContext {
|
||||
pub fn subscribe(&self) -> broadcast::Receiver<DaemonEvent> {
|
||||
self.sender.subscribe()
|
||||
}
|
||||
|
||||
pub fn send(&self, event: DaemonEvent) -> Result<()> {
|
||||
let _ = self.sender.send(event);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DaemonEventGenerator {
|
||||
guests: GuestStore,
|
||||
guest_reconciler_notify: Sender<Uuid>,
|
||||
feed: broadcast::Receiver<DaemonEvent>,
|
||||
idm: DaemonIdmHandle,
|
||||
idms: HashMap<u32, (Uuid, DaemonIdmSubscribeHandle)>,
|
||||
idm_sender: Sender<(u32, IdmPacket)>,
|
||||
idm_receiver: Receiver<(u32, IdmPacket)>,
|
||||
_event_sender: broadcast::Sender<DaemonEvent>,
|
||||
}
|
||||
|
||||
impl DaemonEventGenerator {
|
||||
pub async fn new(
|
||||
guests: GuestStore,
|
||||
guest_reconciler_notify: Sender<Uuid>,
|
||||
idm: DaemonIdmHandle,
|
||||
) -> Result<(DaemonEventContext, DaemonEventGenerator)> {
|
||||
let (sender, _) = broadcast::channel(EVENT_CHANNEL_QUEUE_LEN);
|
||||
let (idm_sender, idm_receiver) = channel(IDM_CHANNEL_QUEUE_LEN);
|
||||
let generator = DaemonEventGenerator {
|
||||
guests,
|
||||
guest_reconciler_notify,
|
||||
feed: sender.subscribe(),
|
||||
idm,
|
||||
idms: HashMap::new(),
|
||||
idm_sender,
|
||||
idm_receiver,
|
||||
_event_sender: sender.clone(),
|
||||
};
|
||||
let context = DaemonEventContext { sender };
|
||||
Ok((context, generator))
|
||||
}
|
||||
|
||||
async fn handle_feed_event(&mut self, event: &DaemonEvent) -> Result<()> {
|
||||
match event {
|
||||
DaemonEvent::GuestChanged(changed) => {
|
||||
let Some(ref guest) = changed.guest else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let Some(ref state) = guest.state else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let status = state.status();
|
||||
let id = Uuid::from_str(&guest.id)?;
|
||||
let domid = state.domid;
|
||||
match status {
|
||||
GuestStatus::Started => {
|
||||
if let Entry::Vacant(e) = self.idms.entry(domid) {
|
||||
let subscribe =
|
||||
self.idm.subscribe(domid, self.idm_sender.clone()).await?;
|
||||
e.insert((id, subscribe));
|
||||
}
|
||||
}
|
||||
|
||||
GuestStatus::Destroyed => {
|
||||
if let Some((_, handle)) = self.idms.remove(&domid) {
|
||||
handle.unsubscribe().await?;
|
||||
}
|
||||
}
|
||||
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_idm_packet(&mut self, id: Uuid, packet: IdmPacket) -> Result<()> {
|
||||
if let Some(Event::Exit(exit)) = packet.event.and_then(|x| x.event) {
|
||||
self.handle_exit_code(id, exit.code).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_exit_code(&mut self, id: Uuid, code: i32) -> Result<()> {
|
||||
if let Some(mut entry) = self.guests.read(id).await? {
|
||||
let Some(ref mut guest) = entry.guest else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
guest.state = Some(GuestState {
|
||||
status: GuestStatus::Exited.into(),
|
||||
network: guest.state.clone().unwrap_or_default().network,
|
||||
exit_info: Some(GuestExitInfo { code }),
|
||||
error_info: None,
|
||||
domid: guest.state.clone().map(|x| x.domid).unwrap_or(u32::MAX),
|
||||
});
|
||||
|
||||
self.guests.update(id, entry).await?;
|
||||
self.guest_reconciler_notify.send(id).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn evaluate(&mut self) -> Result<()> {
|
||||
select! {
|
||||
x = self.idm_receiver.recv() => match x {
|
||||
Some((domid, packet)) => {
|
||||
if let Some((id, _)) = self.idms.get(&domid) {
|
||||
self.handle_idm_packet(*id, packet).await?;
|
||||
}
|
||||
Ok(())
|
||||
},
|
||||
None => {
|
||||
Ok(())
|
||||
}
|
||||
},
|
||||
x = self.feed.recv() => match x {
|
||||
Ok(event) => {
|
||||
self.handle_feed_event(&event).await
|
||||
},
|
||||
Err(error) => {
|
||||
Err(error.into())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn launch(mut self) -> Result<JoinHandle<()>> {
|
||||
Ok(tokio::task::spawn(async move {
|
||||
loop {
|
||||
if let Err(error) = self.evaluate().await {
|
||||
error!("failed to evaluate daemon events: {}", error);
|
||||
time::sleep(Duration::from_secs(5)).await;
|
||||
}
|
||||
}
|
||||
}))
|
||||
}
|
||||
}
|
135
crates/daemon/src/idm.rs
Normal file
135
crates/daemon/src/idm.rs
Normal file
@ -0,0 +1,135 @@
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use anyhow::Result;
|
||||
use bytes::{Buf, BytesMut};
|
||||
use krata::idm::protocol::IdmPacket;
|
||||
use kratart::channel::ChannelService;
|
||||
use log::{error, warn};
|
||||
use prost::Message;
|
||||
use tokio::{
|
||||
sync::{
|
||||
mpsc::{Receiver, Sender},
|
||||
Mutex,
|
||||
},
|
||||
task::JoinHandle,
|
||||
};
|
||||
|
||||
type ListenerMap = Arc<Mutex<HashMap<u32, Sender<(u32, IdmPacket)>>>>;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct DaemonIdmHandle {
|
||||
listeners: ListenerMap,
|
||||
task: Arc<JoinHandle<()>>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct DaemonIdmSubscribeHandle {
|
||||
domid: u32,
|
||||
listeners: ListenerMap,
|
||||
}
|
||||
|
||||
impl DaemonIdmSubscribeHandle {
|
||||
pub async fn unsubscribe(&self) -> Result<()> {
|
||||
let mut guard = self.listeners.lock().await;
|
||||
let _ = guard.remove(&self.domid);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl DaemonIdmHandle {
|
||||
pub async fn subscribe(
|
||||
&self,
|
||||
domid: u32,
|
||||
sender: Sender<(u32, IdmPacket)>,
|
||||
) -> Result<DaemonIdmSubscribeHandle> {
|
||||
let mut guard = self.listeners.lock().await;
|
||||
guard.insert(domid, sender);
|
||||
Ok(DaemonIdmSubscribeHandle {
|
||||
domid,
|
||||
listeners: self.listeners.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for DaemonIdmHandle {
|
||||
fn drop(&mut self) {
|
||||
if Arc::strong_count(&self.task) <= 1 {
|
||||
self.task.abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DaemonIdm {
|
||||
listeners: ListenerMap,
|
||||
receiver: Receiver<(u32, Vec<u8>)>,
|
||||
task: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl DaemonIdm {
|
||||
pub async fn new() -> Result<DaemonIdm> {
|
||||
let (service, receiver) = ChannelService::new("krata-channel".to_string()).await?;
|
||||
let task = service.launch().await?;
|
||||
let listeners = Arc::new(Mutex::new(HashMap::new()));
|
||||
Ok(DaemonIdm {
|
||||
receiver,
|
||||
task,
|
||||
listeners,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn launch(mut self) -> Result<DaemonIdmHandle> {
|
||||
let listeners = self.listeners.clone();
|
||||
let task = tokio::task::spawn(async move {
|
||||
let mut buffers: HashMap<u32, BytesMut> = HashMap::new();
|
||||
if let Err(error) = self.process(&mut buffers).await {
|
||||
error!("failed to process idm: {}", error);
|
||||
}
|
||||
});
|
||||
Ok(DaemonIdmHandle {
|
||||
listeners,
|
||||
task: Arc::new(task),
|
||||
})
|
||||
}
|
||||
|
||||
async fn process(&mut self, buffers: &mut HashMap<u32, BytesMut>) -> Result<()> {
|
||||
loop {
|
||||
let Some((domid, data)) = self.receiver.recv().await else {
|
||||
break;
|
||||
};
|
||||
|
||||
let buffer = buffers.entry(domid).or_insert_with_key(|_| BytesMut::new());
|
||||
buffer.extend_from_slice(&data);
|
||||
if buffer.len() < 2 {
|
||||
continue;
|
||||
}
|
||||
let size = (buffer[0] as u16 | (buffer[1] as u16) << 8) as usize;
|
||||
let needed = size + 2;
|
||||
if buffer.len() < needed {
|
||||
continue;
|
||||
}
|
||||
let mut packet = buffer.split_to(needed);
|
||||
packet.advance(2);
|
||||
match IdmPacket::decode(packet) {
|
||||
Ok(packet) => {
|
||||
let guard = self.listeners.lock().await;
|
||||
if let Some(sender) = guard.get(&domid) {
|
||||
if let Err(error) = sender.try_send((domid, packet)) {
|
||||
warn!("dropped idm packet from domain {}: {}", domid, error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Err(packet) => {
|
||||
warn!("received invalid packet from domain {}: {}", domid, packet);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for DaemonIdm {
|
||||
fn drop(&mut self) {
|
||||
self.task.abort();
|
||||
}
|
||||
}
|
130
crates/daemon/src/lib.rs
Normal file
130
crates/daemon/src/lib.rs
Normal file
@ -0,0 +1,130 @@
|
||||
use std::{net::SocketAddr, path::PathBuf, str::FromStr};
|
||||
|
||||
use anyhow::Result;
|
||||
use control::RuntimeControlService;
|
||||
use db::GuestStore;
|
||||
use event::{DaemonEventContext, DaemonEventGenerator};
|
||||
use idm::{DaemonIdm, DaemonIdmHandle};
|
||||
use krata::{dial::ControlDialAddress, v1::control::control_service_server::ControlServiceServer};
|
||||
use kratart::Runtime;
|
||||
use log::info;
|
||||
use reconcile::guest::GuestReconciler;
|
||||
use tokio::{
|
||||
net::UnixListener,
|
||||
sync::mpsc::{channel, Sender},
|
||||
task::JoinHandle,
|
||||
};
|
||||
use tokio_stream::wrappers::UnixListenerStream;
|
||||
use tonic::transport::{Identity, Server, ServerTlsConfig};
|
||||
use uuid::Uuid;
|
||||
|
||||
pub mod control;
|
||||
pub mod db;
|
||||
pub mod event;
|
||||
pub mod idm;
|
||||
pub mod reconcile;
|
||||
|
||||
pub struct Daemon {
|
||||
store: String,
|
||||
runtime: Runtime,
|
||||
guests: GuestStore,
|
||||
events: DaemonEventContext,
|
||||
guest_reconciler_task: JoinHandle<()>,
|
||||
guest_reconciler_notify: Sender<Uuid>,
|
||||
generator_task: JoinHandle<()>,
|
||||
_idm: DaemonIdmHandle,
|
||||
}
|
||||
|
||||
const GUEST_RECONCILER_QUEUE_LEN: usize = 1000;
|
||||
|
||||
impl Daemon {
|
||||
pub async fn new(store: String, runtime: Runtime) -> Result<Self> {
|
||||
let guests_db_path = format!("{}/guests.db", store);
|
||||
let guests = GuestStore::open(&PathBuf::from(guests_db_path))?;
|
||||
let (guest_reconciler_notify, guest_reconciler_receiver) =
|
||||
channel::<Uuid>(GUEST_RECONCILER_QUEUE_LEN);
|
||||
let idm = DaemonIdm::new().await?;
|
||||
let idm = idm.launch().await?;
|
||||
let (events, generator) =
|
||||
DaemonEventGenerator::new(guests.clone(), guest_reconciler_notify.clone(), idm.clone())
|
||||
.await?;
|
||||
let runtime_for_reconciler = runtime.dupe().await?;
|
||||
let guest_reconciler =
|
||||
GuestReconciler::new(guests.clone(), events.clone(), runtime_for_reconciler)?;
|
||||
|
||||
let guest_reconciler_task = guest_reconciler.launch(guest_reconciler_receiver).await?;
|
||||
let generator_task = generator.launch().await?;
|
||||
Ok(Self {
|
||||
store,
|
||||
runtime,
|
||||
guests,
|
||||
events,
|
||||
guest_reconciler_task,
|
||||
guest_reconciler_notify,
|
||||
generator_task,
|
||||
_idm: idm,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn listen(&mut self, addr: ControlDialAddress) -> Result<()> {
|
||||
let control_service = RuntimeControlService::new(
|
||||
self.events.clone(),
|
||||
self.runtime.clone(),
|
||||
self.guests.clone(),
|
||||
self.guest_reconciler_notify.clone(),
|
||||
);
|
||||
|
||||
let mut server = Server::builder();
|
||||
|
||||
if let ControlDialAddress::Tls {
|
||||
host: _,
|
||||
port: _,
|
||||
insecure,
|
||||
} = &addr
|
||||
{
|
||||
let mut tls_config = ServerTlsConfig::new();
|
||||
if !insecure {
|
||||
let certificate_path = format!("{}/tls/daemon.pem", self.store);
|
||||
let key_path = format!("{}/tls/daemon.key", self.store);
|
||||
tls_config = tls_config.identity(Identity::from_pem(certificate_path, key_path));
|
||||
}
|
||||
server = server.tls_config(tls_config)?;
|
||||
}
|
||||
|
||||
let server = server.add_service(ControlServiceServer::new(control_service));
|
||||
info!("listening on address {}", addr);
|
||||
match addr {
|
||||
ControlDialAddress::UnixSocket { path } => {
|
||||
let path = PathBuf::from(path);
|
||||
if path.exists() {
|
||||
tokio::fs::remove_file(&path).await?;
|
||||
}
|
||||
let listener = UnixListener::bind(path)?;
|
||||
let stream = UnixListenerStream::new(listener);
|
||||
server.serve_with_incoming(stream).await?;
|
||||
}
|
||||
|
||||
ControlDialAddress::Tcp { host, port } => {
|
||||
let address = format!("{}:{}", host, port);
|
||||
server.serve(SocketAddr::from_str(&address)?).await?;
|
||||
}
|
||||
|
||||
ControlDialAddress::Tls {
|
||||
host,
|
||||
port,
|
||||
insecure: _,
|
||||
} => {
|
||||
let address = format!("{}:{}", host, port);
|
||||
server.serve(SocketAddr::from_str(&address)?).await?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Daemon {
|
||||
fn drop(&mut self) {
|
||||
self.guest_reconciler_task.abort();
|
||||
self.generator_task.abort();
|
||||
}
|
||||
}
|
255
crates/daemon/src/reconcile/guest.rs
Normal file
255
crates/daemon/src/reconcile/guest.rs
Normal file
@ -0,0 +1,255 @@
|
||||
use std::{collections::HashMap, time::Duration};
|
||||
|
||||
use anyhow::{anyhow, Result};
|
||||
use krata::v1::{
|
||||
common::{
|
||||
guest_image_spec::Image, Guest, GuestErrorInfo, GuestExitInfo, GuestNetworkState,
|
||||
GuestState, GuestStatus,
|
||||
},
|
||||
control::GuestChangedEvent,
|
||||
};
|
||||
use kratart::{launch::GuestLaunchRequest, GuestInfo, Runtime};
|
||||
use log::{error, info, trace, warn};
|
||||
use tokio::{select, sync::mpsc::Receiver, task::JoinHandle, time::sleep};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
db::GuestStore,
|
||||
event::{DaemonEvent, DaemonEventContext},
|
||||
};
|
||||
|
||||
pub struct GuestReconciler {
|
||||
guests: GuestStore,
|
||||
events: DaemonEventContext,
|
||||
runtime: Runtime,
|
||||
}
|
||||
|
||||
impl GuestReconciler {
|
||||
pub fn new(guests: GuestStore, events: DaemonEventContext, runtime: Runtime) -> Result<Self> {
|
||||
Ok(Self {
|
||||
guests,
|
||||
events,
|
||||
runtime,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn launch(self, mut notify: Receiver<Uuid>) -> Result<JoinHandle<()>> {
|
||||
Ok(tokio::task::spawn(async move {
|
||||
if let Err(error) = self.reconcile_runtime(true).await {
|
||||
error!("runtime reconciler failed: {}", error);
|
||||
}
|
||||
|
||||
loop {
|
||||
select! {
|
||||
x = notify.recv() => match x {
|
||||
None => {
|
||||
break;
|
||||
},
|
||||
|
||||
Some(uuid) => {
|
||||
if let Err(error) = self.reconcile(uuid).await {
|
||||
error!("failed to reconcile guest {}: {}", uuid, error);
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
_ = sleep(Duration::from_secs(5)) => {
|
||||
if let Err(error) = self.reconcile_runtime(false).await {
|
||||
error!("runtime reconciler failed: {}", error);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
pub async fn reconcile_runtime(&self, initial: bool) -> Result<()> {
|
||||
trace!("reconciling runtime");
|
||||
let runtime_guests = self.runtime.list().await?;
|
||||
let stored_guests = self.guests.list().await?;
|
||||
for (uuid, mut stored_guest_entry) in stored_guests {
|
||||
let Some(ref mut stored_guest) = stored_guest_entry.guest else {
|
||||
warn!("removing unpopulated guest entry for guest {}", uuid);
|
||||
self.guests.remove(uuid).await?;
|
||||
continue;
|
||||
};
|
||||
let previous_guest = stored_guest.clone();
|
||||
let runtime_guest = runtime_guests.iter().find(|x| x.uuid == uuid);
|
||||
match runtime_guest {
|
||||
None => {
|
||||
let mut state = stored_guest.state.as_mut().cloned().unwrap_or_default();
|
||||
if state.status() == GuestStatus::Started {
|
||||
state.status = GuestStatus::Starting.into();
|
||||
}
|
||||
stored_guest.state = Some(state);
|
||||
}
|
||||
|
||||
Some(runtime) => {
|
||||
let mut state = stored_guest.state.as_mut().cloned().unwrap_or_default();
|
||||
if let Some(code) = runtime.state.exit_code {
|
||||
state.status = GuestStatus::Exited.into();
|
||||
state.exit_info = Some(GuestExitInfo { code });
|
||||
} else {
|
||||
state.status = GuestStatus::Started.into();
|
||||
}
|
||||
state.network = Some(guestinfo_to_networkstate(runtime));
|
||||
stored_guest.state = Some(state);
|
||||
}
|
||||
}
|
||||
|
||||
let changed = *stored_guest != previous_guest;
|
||||
|
||||
if changed || initial {
|
||||
self.guests.update(uuid, stored_guest_entry).await?;
|
||||
if let Err(error) = self.reconcile(uuid).await {
|
||||
error!("failed to reconcile guest {}: {}", uuid, error);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn reconcile(&self, uuid: Uuid) -> Result<()> {
|
||||
let Some(mut entry) = self.guests.read(uuid).await? else {
|
||||
warn!(
|
||||
"notified of reconcile for guest {} but it didn't exist",
|
||||
uuid
|
||||
);
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
info!("reconciling guest {}", uuid);
|
||||
|
||||
let Some(ref mut guest) = entry.guest else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
self.events
|
||||
.send(DaemonEvent::GuestChanged(GuestChangedEvent {
|
||||
guest: Some(guest.clone()),
|
||||
}))?;
|
||||
|
||||
let result = match guest.state.as_ref().map(|x| x.status()).unwrap_or_default() {
|
||||
GuestStatus::Starting => self.start(uuid, guest).await,
|
||||
GuestStatus::Destroying | GuestStatus::Exited => self.destroy(uuid, guest).await,
|
||||
_ => Ok(false),
|
||||
};
|
||||
|
||||
let changed = match result {
|
||||
Ok(changed) => changed,
|
||||
Err(error) => {
|
||||
guest.state = Some(guest.state.as_mut().cloned().unwrap_or_default());
|
||||
guest.state.as_mut().unwrap().status = GuestStatus::Failed.into();
|
||||
guest.state.as_mut().unwrap().error_info = Some(GuestErrorInfo {
|
||||
message: error.to_string(),
|
||||
});
|
||||
true
|
||||
}
|
||||
};
|
||||
|
||||
info!("reconciled guest {}", uuid);
|
||||
|
||||
let status = guest.state.as_ref().map(|x| x.status()).unwrap_or_default();
|
||||
let destroyed = status == GuestStatus::Destroyed || status == GuestStatus::Failed;
|
||||
|
||||
if changed {
|
||||
let event = DaemonEvent::GuestChanged(GuestChangedEvent {
|
||||
guest: Some(guest.clone()),
|
||||
});
|
||||
|
||||
if destroyed {
|
||||
self.guests.remove(uuid).await?;
|
||||
} else {
|
||||
self.guests.update(uuid, entry.clone()).await?;
|
||||
}
|
||||
|
||||
self.events.send(event)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn start(&self, uuid: Uuid, guest: &mut Guest) -> Result<bool> {
|
||||
let Some(ref spec) = guest.spec else {
|
||||
return Err(anyhow!("guest spec not specified"));
|
||||
};
|
||||
|
||||
let Some(ref image) = spec.image else {
|
||||
return Err(anyhow!("image spec not provided"));
|
||||
};
|
||||
let oci = match image.image {
|
||||
Some(Image::Oci(ref oci)) => oci,
|
||||
None => {
|
||||
return Err(anyhow!("oci spec not specified"));
|
||||
}
|
||||
};
|
||||
|
||||
let task = spec.task.as_ref().cloned().unwrap_or_default();
|
||||
|
||||
let info = self
|
||||
.runtime
|
||||
.launch(GuestLaunchRequest {
|
||||
uuid: Some(uuid),
|
||||
name: if spec.name.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(&spec.name)
|
||||
},
|
||||
image: &oci.image,
|
||||
vcpus: spec.vcpus,
|
||||
mem: spec.mem,
|
||||
env: task
|
||||
.environment
|
||||
.iter()
|
||||
.map(|x| (x.key.clone(), x.value.clone()))
|
||||
.collect::<HashMap<_, _>>(),
|
||||
run: empty_vec_optional(task.command.clone()),
|
||||
debug: false,
|
||||
})
|
||||
.await?;
|
||||
info!("started guest {}", uuid);
|
||||
guest.state = Some(GuestState {
|
||||
status: GuestStatus::Started.into(),
|
||||
network: Some(guestinfo_to_networkstate(&info)),
|
||||
exit_info: None,
|
||||
error_info: None,
|
||||
domid: info.domid,
|
||||
});
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
async fn destroy(&self, uuid: Uuid, guest: &mut Guest) -> Result<bool> {
|
||||
if let Err(error) = self.runtime.destroy(uuid).await {
|
||||
trace!("failed to destroy runtime guest {}: {}", uuid, error);
|
||||
}
|
||||
|
||||
info!("destroyed guest {}", uuid);
|
||||
guest.state = Some(GuestState {
|
||||
status: GuestStatus::Destroyed.into(),
|
||||
network: None,
|
||||
exit_info: None,
|
||||
error_info: None,
|
||||
domid: guest.state.as_ref().map(|x| x.domid).unwrap_or(u32::MAX),
|
||||
});
|
||||
Ok(true)
|
||||
}
|
||||
}
|
||||
|
||||
fn empty_vec_optional<T>(value: Vec<T>) -> Option<Vec<T>> {
|
||||
if value.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(value)
|
||||
}
|
||||
}
|
||||
|
||||
fn guestinfo_to_networkstate(info: &GuestInfo) -> GuestNetworkState {
|
||||
GuestNetworkState {
|
||||
guest_ipv4: info.guest_ipv4.map(|x| x.to_string()).unwrap_or_default(),
|
||||
guest_ipv6: info.guest_ipv6.map(|x| x.to_string()).unwrap_or_default(),
|
||||
guest_mac: info.guest_mac.as_ref().cloned().unwrap_or_default(),
|
||||
gateway_ipv4: info.gateway_ipv4.map(|x| x.to_string()).unwrap_or_default(),
|
||||
gateway_ipv6: info.gateway_ipv6.map(|x| x.to_string()).unwrap_or_default(),
|
||||
gateway_mac: info.gateway_mac.as_ref().cloned().unwrap_or_default(),
|
||||
}
|
||||
}
|
1
crates/daemon/src/reconcile/mod.rs
Normal file
1
crates/daemon/src/reconcile/mod.rs
Normal file
@ -0,0 +1 @@
|
||||
pub mod guest;
|
Reference in New Issue
Block a user