From 8653fd62490c93ea21d32f50f86adb211c2fda41 Mon Sep 17 00:00:00 2001 From: Alex Zenla Date: Tue, 5 Mar 2024 11:35:25 +0000 Subject: [PATCH] krata: rework into daemon / controller structure --- Cargo.toml | 11 +- README.md | 16 +- controller/Cargo.toml | 3 +- controller/bin/control.rs | 143 ++++------ controller/src/client.rs | 195 ++++++++++++++ controller/src/console.rs | 86 +++--- controller/src/ctl/console.rs | 51 ---- controller/src/ctl/destroy.rs | 64 ----- controller/src/ctl/launch.rs | 21 -- controller/src/ctl/mod.rs | 141 ---------- controller/src/lib.rs | 5 +- daemon/Cargo.toml | 68 +++++ daemon/bin/daemon.rs | 27 ++ daemon/examples/dial.rs | 28 ++ daemon/src/handlers/console.rs | 91 +++++++ daemon/src/handlers/destroy.rs | 44 ++++ daemon/src/handlers/launch.rs | 55 ++++ daemon/src/handlers/list.rs | 37 +++ daemon/src/handlers/mod.rs | 15 ++ daemon/src/lib.rs | 37 +++ daemon/src/listen.rs | 228 ++++++++++++++++ .../src => daemon/src/runtime}/autoloop.rs | 0 .../src/ctl => daemon/src/runtime}/cfgblk.rs | 4 +- daemon/src/runtime/console.rs | 18 ++ .../src => daemon/src/runtime}/image/cache.rs | 3 +- .../src => daemon/src/runtime}/image/fetch.rs | 0 .../src => daemon/src/runtime}/image/mod.rs | 6 +- .../src => daemon/src/runtime}/image/name.rs | 0 .../src => daemon/src/runtime}/launch/mod.rs | 36 +-- daemon/src/runtime/mod.rs | 247 ++++++++++++++++++ {container => guest}/Cargo.toml | 5 +- {container => guest}/bin/init.rs | 6 +- {container => guest}/src/background.rs | 0 {container => guest}/src/childwait.rs | 0 {container => guest}/src/init.rs | 24 +- {container => guest}/src/lib.rs | 0 initrd/build.sh | 4 +- resources/systemd/kratad.service | 12 + resources/systemd/kratanet.service | 12 + scripts/kratad-debug.sh | 8 + shared/Cargo.toml | 2 + shared/src/control.rs | 115 ++++++++ shared/src/launchcfg.rs | 33 +++ shared/src/lib.rs | 37 +-- shared/src/stream.rs | 152 +++++++++++ 45 files changed, 1597 insertions(+), 493 deletions(-) create mode 100644 controller/src/client.rs delete mode 100644 controller/src/ctl/console.rs delete mode 100644 controller/src/ctl/destroy.rs delete mode 100644 controller/src/ctl/launch.rs delete mode 100644 controller/src/ctl/mod.rs create mode 100644 daemon/Cargo.toml create mode 100644 daemon/bin/daemon.rs create mode 100644 daemon/examples/dial.rs create mode 100644 daemon/src/handlers/console.rs create mode 100644 daemon/src/handlers/destroy.rs create mode 100644 daemon/src/handlers/launch.rs create mode 100644 daemon/src/handlers/list.rs create mode 100644 daemon/src/handlers/mod.rs create mode 100644 daemon/src/lib.rs create mode 100644 daemon/src/listen.rs rename {controller/src => daemon/src/runtime}/autoloop.rs (100%) rename {controller/src/ctl => daemon/src/runtime}/cfgblk.rs (96%) create mode 100644 daemon/src/runtime/console.rs rename {controller/src => daemon/src/runtime}/image/cache.rs (98%) rename {controller/src => daemon/src/runtime}/image/fetch.rs (100%) rename {controller/src => daemon/src/runtime}/image/mod.rs (98%) rename {controller/src => daemon/src/runtime}/image/name.rs (100%) rename {controller/src => daemon/src/runtime}/launch/mod.rs (90%) create mode 100644 daemon/src/runtime/mod.rs rename {container => guest}/Cargo.toml (90%) rename {container => guest}/bin/init.rs (86%) rename {container => guest}/src/background.rs (100%) rename {container => guest}/src/childwait.rs (100%) rename {container => guest}/src/init.rs (96%) rename {container => guest}/src/lib.rs (100%) create mode 100644 resources/systemd/kratad.service create mode 100644 resources/systemd/kratanet.service create mode 100755 scripts/kratad-debug.sh create mode 100644 shared/src/control.rs create mode 100644 shared/src/launchcfg.rs create mode 100644 shared/src/stream.rs diff --git a/Cargo.toml b/Cargo.toml index a1cba8f..3e76a32 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,8 @@ members = [ "libs/advmac", "libs/loopdev", "shared", - "container", + "daemon", + "guest", "network", "controller", ] @@ -56,6 +57,8 @@ async-trait = "0.1.77" bytes = "1.5.0" path-absolutize = "3.1.1" tokio-tun = "0.11.2" +tokio-listener = "0.3.1" +trait-variant = "0.1.1" [workspace.dependencies.uuid] version = "1.6.1" @@ -71,7 +74,11 @@ features = ["derive"] [workspace.dependencies.tokio] version = "1.35.1" -features = ["macros", "rt", "rt-multi-thread"] +features = ["macros", "rt", "rt-multi-thread", "io-util"] + +[workspace.dependencies.tokio-stream] +version = "0.1" +features = ["io-util"] [workspace.dependencies.reqwest] version = "0.11.24" diff --git a/README.md b/README.md index 467852c..bd28297 100644 --- a/README.md +++ b/README.md @@ -44,9 +44,10 @@ krata is composed of three major executables: | Executable | Runs On | User Interaction | Dev Runner | Code Path | | ---------- | ------- | ---------------- | --------------------------- | ----------- | +| kratad | host | backend daemon | ./scripts/kratad-debug.sh | daemon | | kratanet | host | backend daemon | ./scripts/kratanet-debug.sh | network | | kratactl | host | CLI tool | ./scripts/kratactl-debug.sh | controller | -| kratactr | guest | none, guest init | N/A | container | +| krataguest | guest | none, guest init | N/A | guest | You will find the code to each executable available in the bin/ and src/ directories inside it's corresponding code path from the above table. @@ -96,20 +97,21 @@ $ ./kernel/build.sh -j4 7. Copy the guest kernel image at `kernel/target/kernel` to `/var/lib/krata/default/kernel` to have it automatically detected by kratactl. 8. Launch `./scripts/kratanet-debug.sh` and keep it running in the foreground. -9. Run kratactl to launch a container: +9. Launch `./scripts/kratad-debug.sh` and keep it running in the foreground. +10. Run kratactl to launch a guest: ```sh -$ ./scripts/kratactl-debug.sh launch --attach mirror.gcr.io/library/alpine:latest /bin/busybox sh +$ ./scripts/kratactl-debug.sh launch --attach alpine:latest ``` -To detach from the container console, use `Ctrl + ]` on your keyboard. +To detach from the guest console, use `Ctrl + ]` on your keyboard. -To list the running containers, run: +To list the running guests, run: ```sh $ ./scripts/kratactl-debug.sh list ``` -To destroy a running container, copy it's UUID from either the launch command or the container list and run: +To destroy a running guest, copy it's UUID from either the launch command or the guest list and run: ```sh -$ ./scripts/kratactl-debug.sh destroy CONTAINER_UUID +$ ./scripts/kratactl-debug.sh destroy GUEST_UUID ``` diff --git a/controller/Cargo.toml b/controller/Cargo.toml index 22fce9e..216c8c4 100644 --- a/controller/Cargo.toml +++ b/controller/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "kratactrl" +name = "kratactl" version.workspace = true edition = "2021" resolver = "2" @@ -30,6 +30,7 @@ ipnetwork = { workspace = true } tokio = { workspace = true } futures = { workspace = true } bytes = { workspace = true } +tokio-stream = { workspace = true } [dependencies.krata] path = "../shared" diff --git a/controller/bin/control.rs b/controller/bin/control.rs index 5eab2a9..31be9d4 100644 --- a/controller/bin/control.rs +++ b/controller/bin/control.rs @@ -1,20 +1,20 @@ use anyhow::{anyhow, Result}; use clap::{Parser, Subcommand}; use env_logger::Env; -use kratactrl::{ - ctl::{ - console::ControllerConsole, destroy::ControllerDestroy, launch::ControllerLaunch, - ControllerContext, - }, - launch::GuestLaunchRequest, +use krata::control::{ + ConsoleStreamRequest, DestroyRequest, LaunchRequest, ListRequest, Request, Response, }; -use std::path::PathBuf; +use kratactl::{ + client::{KrataClient, KrataClientTransport}, + console::XenConsole, +}; +use tokio::net::UnixStream; #[derive(Parser, Debug)] #[command(version, about)] struct ControllerArgs { - #[arg(short, long, default_value = "auto")] - store: String, + #[arg(long, default_value = "/var/lib/krata/daemon.socket")] + connection: String, #[command(subcommand)] command: Commands, @@ -25,10 +25,6 @@ enum Commands { List {}, Launch { - #[arg(short, long, default_value = "auto")] - kernel: String, - #[arg(short = 'r', long, default_value = "auto")] - initrd: String, #[arg(short, long, default_value_t = 1)] cpus: u32, #[arg(short, long, default_value_t = 512)] @@ -37,8 +33,6 @@ enum Commands { env: Option>, #[arg(short, long)] attach: bool, - #[arg(long)] - debug: bool, #[arg()] image: String, #[arg(allow_hyphen_values = true, trailing_var_arg = true)] @@ -46,11 +40,11 @@ enum Commands { }, Destroy { #[arg()] - container: String, + guest: String, }, Console { #[arg()] - container: String, + guest: String, }, } @@ -59,77 +53,81 @@ async fn main() -> Result<()> { env_logger::Builder::from_env(Env::default().default_filter_or("warn")).init(); let args = ControllerArgs::parse(); - let store_path = if args.store == "auto" { - default_store_path().ok_or_else(|| anyhow!("unable to determine default store path")) - } else { - Ok(PathBuf::from(args.store)) - }?; - - let store_path = store_path - .to_str() - .map(|x| x.to_string()) - .ok_or_else(|| anyhow!("unable to convert store path to string"))?; - - let mut context = ControllerContext::new(store_path.clone()).await?; + let stream = UnixStream::connect(&args.connection).await?; + let transport = KrataClientTransport::new(stream).await?; + let client = KrataClient::new(transport).await?; match args.command { Commands::Launch { - kernel, - initrd, image, cpus, mem, attach, env, run, - debug, } => { - let kernel = map_kernel_path(&store_path, kernel); - let initrd = map_initrd_path(&store_path, initrd); - let mut launch = ControllerLaunch::new(&mut context); - let request = GuestLaunchRequest { - kernel_path: &kernel, - initrd_path: &initrd, - image: &image, + let request = LaunchRequest { + image, vcpus: cpus, mem, env, run: if run.is_empty() { None } else { Some(run) }, - debug, }; - let info = launch.perform(request).await?; - println!("launched guest: {}", info.uuid); + let Response::Launch(response) = client.send(Request::Launch(request)).await? else { + return Err(anyhow!("invalid response type")); + }; + println!("launched guest: {}", response.guest.id); if attach { - let mut console = ControllerConsole::new(&mut context); - console.perform(&info.uuid.to_string()).await?; + let request = ConsoleStreamRequest { + guest: response.guest.id.clone(), + }; + let Response::ConsoleStream(response) = + client.send(Request::ConsoleStream(request)).await? + else { + return Err(anyhow!("invalid response type")); + }; + let stream = client.acquire(response.stream).await?; + let console = XenConsole::new(stream).await?; + console.attach().await?; } } - Commands::Destroy { container } => { - let mut destroy = ControllerDestroy::new(&mut context); - destroy.perform(&container).await?; + Commands::Destroy { guest } => { + let request = DestroyRequest { guest }; + let Response::Destroy(response) = client.send(Request::Destroy(request)).await? else { + return Err(anyhow!("invalid response type")); + }; + println!("destroyed guest: {}", response.guest); } - Commands::Console { container } => { - let mut console = ControllerConsole::new(&mut context); - console.perform(&container).await?; + Commands::Console { guest } => { + let request = ConsoleStreamRequest { guest }; + let Response::ConsoleStream(response) = + client.send(Request::ConsoleStream(request)).await? + else { + return Err(anyhow!("invalid response type")); + }; + let stream = client.acquire(response.stream).await?; + let console = XenConsole::new(stream).await?; + console.attach().await?; } Commands::List { .. } => { - let containers = context.list().await?; + let request = ListRequest {}; + let Response::List(response) = client.send(Request::List(request)).await? else { + return Err(anyhow!("invalid response type")); + }; let mut table = cli_tables::Table::new(); let header = vec!["uuid", "ipv4", "ipv6", "image"]; table.push_row(&header)?; - for container in containers { - let row = vec![ - container.uuid.to_string(), - container.ipv4, - container.ipv6, - container.image, - ]; - table.push_row_string(&row)?; + for guest in response.guests { + table.push_row_string(&vec![ + guest.id, + guest.ipv4.unwrap_or("none".to_string()), + guest.ipv6.unwrap_or("none".to_string()), + guest.image, + ])?; } - if table.num_records() == 1 { println!("no guests have been launched"); } else { @@ -139,28 +137,3 @@ async fn main() -> Result<()> { } Ok(()) } - -fn map_kernel_path(store: &str, value: String) -> String { - if value == "auto" { - return format!("{}/default/kernel", store); - } - value -} - -fn map_initrd_path(store: &str, value: String) -> String { - if value == "auto" { - return format!("{}/default/initrd", store); - } - value -} - -fn default_store_path() -> Option { - let user_dirs = directories::UserDirs::new()?; - let mut path = user_dirs.home_dir().to_path_buf(); - if path == PathBuf::from("/root") { - path.push("/var/lib/krata") - } else { - path.push(".krata"); - } - Some(path) -} diff --git a/controller/src/client.rs b/controller/src/client.rs new file mode 100644 index 0000000..021a203 --- /dev/null +++ b/controller/src/client.rs @@ -0,0 +1,195 @@ +use std::{collections::HashMap, sync::Arc}; + +use anyhow::{anyhow, Result}; +use krata::{ + control::{Message, Request, RequestBox, Response}, + stream::{ConnectionStreams, StreamContext}, +}; +use log::{trace, warn}; +use tokio::{ + io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, + net::{unix, UnixStream}, + select, + sync::{ + mpsc::{channel, Receiver, Sender}, + oneshot, Mutex, + }, + task::JoinHandle, +}; +use tokio_stream::{wrappers::LinesStream, StreamExt}; + +const QUEUE_MAX_LEN: usize = 100; + +pub struct KrataClientTransport { + sender: Sender, + receiver: Receiver, + task: JoinHandle<()>, +} + +impl Drop for KrataClientTransport { + fn drop(&mut self) { + self.task.abort(); + } +} + +impl KrataClientTransport { + pub async fn new(stream: UnixStream) -> Result { + let (read, write) = stream.into_split(); + let (tx_sender, tx_receiver) = channel::(QUEUE_MAX_LEN); + let (rx_sender, rx_receiver) = channel::(QUEUE_MAX_LEN); + + let task = tokio::task::spawn(async move { + if let Err(error) = + KrataClientTransport::process_unix_stream(read, write, rx_sender, tx_receiver).await + { + warn!("failed to process krata transport messages: {}", error); + } + }); + + Ok(Self { + sender: tx_sender, + receiver: rx_receiver, + task, + }) + } + + async fn process_unix_stream( + read: unix::OwnedReadHalf, + mut write: unix::OwnedWriteHalf, + rx_sender: Sender, + mut tx_receiver: Receiver, + ) -> Result<()> { + let mut read = LinesStream::new(BufReader::new(read).lines()); + loop { + select! { + x = tx_receiver.recv() => match x { + Some(message) => { + let mut line = serde_json::to_string(&message)?; + trace!("sending line '{}'", line); + line.push('\n'); + write.write_all(line.as_bytes()).await?; + }, + + None => { + break; + } + }, + + x = read.next() => match x { + Some(Ok(line)) => { + let message = serde_json::from_str::(&line)?; + rx_sender.send(message).await?; + }, + + Some(Err(error)) => { + return Err(error.into()); + }, + + None => { + break; + } + } + }; + } + Ok(()) + } +} + +type RequestsMap = Arc>>>; + +#[derive(Clone)] +pub struct KrataClient { + tx_sender: Sender, + next: Arc>, + streams: ConnectionStreams, + requests: RequestsMap, + task: Arc>, +} + +impl KrataClient { + pub async fn new(transport: KrataClientTransport) -> Result { + let tx_sender = transport.sender.clone(); + let streams = ConnectionStreams::new(tx_sender.clone()); + let requests = Arc::new(Mutex::new(HashMap::new())); + let task = { + let requests = requests.clone(); + let streams = streams.clone(); + tokio::task::spawn(async move { + if let Err(error) = KrataClient::process(transport, streams, requests).await { + warn!("failed to process krata client messages: {}", error); + } + }) + }; + + Ok(Self { + tx_sender, + next: Arc::new(Mutex::new(0)), + requests, + streams, + task: Arc::new(task), + }) + } + + pub async fn send(&self, request: Request) -> Result { + let id = { + let mut next = self.next.lock().await; + let id = *next; + *next = id + 1; + id + }; + let (sender, receiver) = oneshot::channel(); + self.requests.lock().await.insert(id, sender); + self.tx_sender + .send(Message::Request(RequestBox { id, request })) + .await?; + let response = receiver.await?; + if let Response::Error(error) = response { + Err(anyhow!("krata error: {}", error.message)) + } else { + Ok(response) + } + } + + pub async fn acquire(&self, stream: u64) -> Result { + self.streams.acquire(stream).await + } + + async fn process( + mut transport: KrataClientTransport, + streams: ConnectionStreams, + requests: RequestsMap, + ) -> Result<()> { + loop { + let Some(message) = transport.receiver.recv().await else { + break; + }; + + match message { + Message::Request(_) => { + return Err(anyhow!("received request from service")); + } + + Message::Response(resp) => { + let Some(sender) = requests.lock().await.remove(&resp.id) else { + continue; + }; + + let _ = sender.send(resp.response); + } + + Message::StreamUpdated(updated) => { + streams.incoming(updated).await?; + } + } + } + Ok(()) + } +} + +impl Drop for KrataClient { + fn drop(&mut self) { + if Arc::strong_count(&self.task) <= 1 { + self.task.abort(); + } + } +} diff --git a/controller/src/console.rs b/controller/src/console.rs index f487c3e..45b1d36 100644 --- a/controller/src/console.rs +++ b/controller/src/console.rs @@ -4,71 +4,73 @@ use std::{ }; use anyhow::Result; -use futures::future::join_all; +use krata::{ + control::{ConsoleStreamUpdate, StreamUpdate}, + stream::StreamContext, +}; use log::debug; use std::process::exit; use termion::raw::IntoRawMode; use tokio::{ fs::File, io::{AsyncReadExt, AsyncWriteExt}, + select, }; pub struct XenConsole { - xen_read_handle: File, - xen_write_handle: File, + stream: StreamContext, } impl XenConsole { - pub async fn new(tty: &str) -> Result { - let xen_read_handle = File::options().read(true).write(false).open(tty).await?; - let xen_write_handle = File::options().read(false).write(true).open(tty).await?; - Ok(XenConsole { - xen_read_handle, - xen_write_handle, - }) + pub async fn new(stream: StreamContext) -> Result { + Ok(XenConsole { stream }) } pub async fn attach(self) -> Result<()> { - let stdin = stdin(); + let stdin = unsafe { File::from_raw_fd(stdin().as_raw_fd()) }; let terminal = stdout().into_raw_mode()?; let stdout = unsafe { File::from_raw_fd(terminal.as_raw_fd()) }; - let reader_task = tokio::task::spawn(async move { - if let Err(error) = XenConsole::copy_stdout(stdout, self.xen_read_handle).await { - debug!("failed to copy console output: {}", error); - } - }); - let writer_task = tokio::task::spawn(async move { - if let Err(error) = XenConsole::intercept_stdin( - unsafe { File::from_raw_fd(stdin.as_raw_fd()) }, - self.xen_write_handle, - ) - .await - { - debug!("failed to intercept stdin: {}", error); - } - }); - join_all(vec![reader_task, writer_task]).await; + if let Err(error) = XenConsole::process(stdin, stdout, self.stream).await { + debug!("failed to process console stream: {}", error); + } + Ok(()) } - async fn copy_stdout(mut stdout: File, mut console: File) -> Result<()> { - let mut buffer = vec![0u8; 256]; - loop { - let size = console.read(&mut buffer).await?; - stdout.write_all(&buffer[0..size]).await?; - stdout.flush().await?; - } - } - - async fn intercept_stdin(mut stdin: File, mut console: File) -> Result<()> { + async fn process(mut stdin: File, mut stdout: File, mut stream: StreamContext) -> Result<()> { let mut buffer = vec![0u8; 60]; loop { - let size = stdin.read(&mut buffer).await?; - if size == 1 && buffer[0] == 0x1d { - exit(0); - } - console.write_all(&buffer[0..size]).await?; + select! { + x = stream.receiver.recv() => match x { + Some(StreamUpdate::ConsoleStream(update)) => { + stdout.write_all(&update.data).await?; + stdout.flush().await?; + }, + + None => { + break; + } + }, + + x = stdin.read(&mut buffer) => match x { + Ok(size) => { + if size == 1 && buffer[0] == 0x1d { + exit(0); + } + + let data = buffer[0..size].to_vec(); + stream.send(StreamUpdate::ConsoleStream(ConsoleStreamUpdate { + data, + })).await?; + }, + + Err(error) => { + return Err(error.into()); + } + } + }; } + Ok(()) } } diff --git a/controller/src/ctl/console.rs b/controller/src/ctl/console.rs deleted file mode 100644 index 4711d8d..0000000 --- a/controller/src/ctl/console.rs +++ /dev/null @@ -1,51 +0,0 @@ -use std::{process::exit, time::Duration}; - -use anyhow::{anyhow, Result}; -use log::warn; -use tokio::time::sleep; -use xenstore::client::XsdInterface; - -use super::destroy::ControllerDestroy; -use crate::console::XenConsole; - -use super::ControllerContext; - -pub struct ControllerConsole<'a> { - context: &'a mut ControllerContext, -} - -impl ControllerConsole<'_> { - pub fn new(context: &mut ControllerContext) -> ControllerConsole<'_> { - ControllerConsole { context } - } - - pub async fn perform(&mut self, id: &str) -> Result<()> { - let info = self - .context - .resolve(id) - .await? - .ok_or_else(|| anyhow!("unable to resolve guest: {}", id))?; - let domid = info.domid; - let tty = self.context.xen.get_console_path(domid).await?; - let console = XenConsole::new(&tty).await?; - - let dom_path = self.context.xen.store.get_domain_path(domid).await?; - - tokio::task::spawn(async move { - if let Err(error) = console.attach().await { - warn!("failed to attach to console: {}", error); - } - }); - - let exit_code_path = format!("{}/krata/guest/exit-code", dom_path); - loop { - let Some(code) = self.context.xen.store.read_string(&exit_code_path).await? else { - sleep(Duration::from_secs(1)).await; - continue; - }; - let mut destroy = ControllerDestroy::new(self.context); - destroy.perform(&domid.to_string()).await?; - exit(code.parse::()?); - } - } -} diff --git a/controller/src/ctl/destroy.rs b/controller/src/ctl/destroy.rs deleted file mode 100644 index 42cfbf3..0000000 --- a/controller/src/ctl/destroy.rs +++ /dev/null @@ -1,64 +0,0 @@ -use std::{fs, path::PathBuf}; - -use anyhow::{anyhow, Result}; -use uuid::Uuid; -use xenstore::client::{XsdClient, XsdInterface}; - -use super::ControllerContext; - -pub struct ControllerDestroy<'a> { - context: &'a mut ControllerContext, -} - -impl ControllerDestroy<'_> { - pub fn new(context: &mut ControllerContext) -> ControllerDestroy<'_> { - ControllerDestroy { context } - } - - pub async fn perform(&mut self, id: &str) -> Result { - let info = self - .context - .resolve(id) - .await? - .ok_or_else(|| anyhow!("unable to resolve guest: {}", id))?; - let domid = info.domid; - let mut store = XsdClient::open().await?; - let dom_path = store.get_domain_path(domid).await?; - let uuid = match store - .read_string(format!("{}/krata/uuid", dom_path).as_str()) - .await? - { - None => { - return Err(anyhow!( - "domain {} was not found or not created by krata", - domid - )) - } - Some(value) => value, - }; - if uuid.is_empty() { - return Err(anyhow!("unable to find krata uuid based on the domain",)); - } - let uuid = Uuid::parse_str(&uuid)?; - let loops = store - .read_string(format!("{}/krata/loops", dom_path).as_str()) - .await?; - let loops = ControllerContext::parse_loop_set(&loops); - self.context.xen.destroy(domid).await?; - for info in &loops { - self.context.autoloop.unloop(&info.device)?; - match &info.delete { - None => {} - Some(delete) => { - let delete_path = PathBuf::from(delete); - if delete_path.is_file() || delete_path.is_symlink() { - fs::remove_file(&delete_path)?; - } else if delete_path.is_dir() { - fs::remove_dir_all(&delete_path)?; - } - } - } - } - Ok(uuid) - } -} diff --git a/controller/src/ctl/launch.rs b/controller/src/ctl/launch.rs deleted file mode 100644 index 6ae9aec..0000000 --- a/controller/src/ctl/launch.rs +++ /dev/null @@ -1,21 +0,0 @@ -use super::{ControllerContext, GuestInfo}; -use crate::launch::{GuestLaunchRequest, GuestLauncher}; -use anyhow::Result; - -pub struct ControllerLaunch<'a> { - context: &'a mut ControllerContext, -} - -impl ControllerLaunch<'_> { - pub fn new(context: &mut ControllerContext) -> ControllerLaunch<'_> { - ControllerLaunch { context } - } - - pub async fn perform<'c, 'r>( - &'c mut self, - request: GuestLaunchRequest<'r>, - ) -> Result { - let mut launcher = GuestLauncher::new()?; - launcher.launch(self.context, request).await - } -} diff --git a/controller/src/ctl/mod.rs b/controller/src/ctl/mod.rs deleted file mode 100644 index c720870..0000000 --- a/controller/src/ctl/mod.rs +++ /dev/null @@ -1,141 +0,0 @@ -pub mod cfgblk; - -use crate::autoloop::AutoLoop; -use crate::image::cache::ImageCache; -use anyhow::{anyhow, Result}; -use loopdev::LoopControl; -use std::fs; -use std::path::PathBuf; -use std::str::FromStr; -use uuid::Uuid; -use xenclient::XenClient; -use xenstore::client::XsdInterface; - -pub mod console; -pub mod destroy; -pub mod launch; - -pub struct ControllerContext { - pub image_cache: ImageCache, - pub autoloop: AutoLoop, - pub xen: XenClient, -} - -pub struct ContainerLoopInfo { - pub device: String, - pub file: String, - pub delete: Option, -} - -pub struct GuestInfo { - pub uuid: Uuid, - pub domid: u32, - pub image: String, - pub loops: Vec, - pub ipv4: String, - pub ipv6: String, -} - -impl ControllerContext { - pub async fn new(store_path: String) -> Result { - let mut image_cache_path = PathBuf::from(store_path); - image_cache_path.push("cache"); - fs::create_dir_all(&image_cache_path)?; - - let xen = XenClient::open().await?; - image_cache_path.push("image"); - fs::create_dir_all(&image_cache_path)?; - let image_cache = ImageCache::new(&image_cache_path)?; - Ok(ControllerContext { - image_cache, - autoloop: AutoLoop::new(LoopControl::open()?), - xen, - }) - } - - pub async fn list(&mut self) -> Result> { - let mut containers: Vec = Vec::new(); - for domid_candidate in self.xen.store.list("/local/domain").await? { - let dom_path = format!("/local/domain/{}", domid_candidate); - let uuid_string = match self - .xen - .store - .read_string(&format!("{}/krata/uuid", &dom_path)) - .await? - { - None => continue, - Some(value) => value, - }; - let domid = - u32::from_str(&domid_candidate).map_err(|_| anyhow!("failed to parse domid"))?; - let uuid = Uuid::from_str(&uuid_string)?; - let image = self - .xen - .store - .read_string(&format!("{}/krata/image", &dom_path)) - .await? - .unwrap_or("unknown".to_string()); - let loops = self - .xen - .store - .read_string(&format!("{}/krata/loops", &dom_path)) - .await?; - let ipv4 = self - .xen - .store - .read_string(&format!("{}/krata/network/guest/ipv4", &dom_path)) - .await? - .unwrap_or("unknown".to_string()); - let ipv6: String = self - .xen - .store - .read_string(&format!("{}/krata/network/guest/ipv6", &dom_path)) - .await? - .unwrap_or("unknown".to_string()); - let loops = ControllerContext::parse_loop_set(&loops); - containers.push(GuestInfo { - uuid, - domid, - image, - loops, - ipv4, - ipv6, - }); - } - Ok(containers) - } - - pub async fn resolve(&mut self, id: &str) -> Result> { - for container in self.list().await? { - let uuid_string = container.uuid.to_string(); - let domid_string = container.domid.to_string(); - if uuid_string == id || domid_string == id || id == format!("krata-{}", uuid_string) { - return Ok(Some(container)); - } - } - Ok(None) - } - - fn parse_loop_set(input: &Option) -> Vec { - let Some(input) = input else { - return Vec::new(); - }; - let sets = input - .split(',') - .map(|x| x.to_string()) - .map(|x| x.split(':').map(|v| v.to_string()).collect::>()) - .map(|x| (x[0].clone(), x[1].clone(), x[2].clone())) - .collect::>(); - sets.iter() - .map(|(device, file, delete)| ContainerLoopInfo { - device: device.clone(), - file: file.clone(), - delete: if delete == "none" { - None - } else { - Some(delete.clone()) - }, - }) - .collect::>() - } -} diff --git a/controller/src/lib.rs b/controller/src/lib.rs index ceefb8f..5393e6a 100644 --- a/controller/src/lib.rs +++ b/controller/src/lib.rs @@ -1,5 +1,2 @@ -pub mod autoloop; +pub mod client; pub mod console; -pub mod ctl; -pub mod image; -pub mod launch; diff --git a/daemon/Cargo.toml b/daemon/Cargo.toml new file mode 100644 index 0000000..46b1bfd --- /dev/null +++ b/daemon/Cargo.toml @@ -0,0 +1,68 @@ +[package] +name = "kratad" +version.workspace = true +edition = "2021" +resolver = "2" + +[dependencies] +anyhow = { workspace = true } +log = { workspace = true } +env_logger = { workspace = true } +zstd = { workspace = true } +flate2 = { workspace = true } +tar = { workspace = true } +directories = { workspace = true } +walkdir = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +sha256 = { workspace = true } +url = { workspace = true } +ureq = { workspace = true } +reqwest = { workspace = true } +path-clean = { workspace = true } +termion = { workspace = true } +cli-tables = { workspace = true } +clap = { workspace = true } +oci-spec = { workspace = true } +backhand = { workspace = true } +uuid = { workspace = true } +ipnetwork = { workspace = true } +tokio = { workspace = true } +futures = { workspace = true } +bytes = { workspace = true } +tokio-stream = { workspace = true } +async-trait = { workspace = true } + +[dependencies.tokio-listener] +workspace = true +features = ["clap"] + +[dependencies.krata] +path = "../shared" + +[dependencies.nix] +workspace = true +features = ["process"] + +[dependencies.advmac] +path = "../libs/advmac" + +[dependencies.loopdev] +path = "../libs/loopdev" + +[dependencies.xenclient] +path = "../libs/xen/xenclient" + +[dependencies.xenstore] +path = "../libs/xen/xenstore" + +[lib] +path = "src/lib.rs" + +[[bin]] +name = "kratad" +path = "bin/daemon.rs" + +[[example]] +name = "kratad-dial" +path = "examples/dial.rs" diff --git a/daemon/bin/daemon.rs b/daemon/bin/daemon.rs new file mode 100644 index 0000000..75b45b3 --- /dev/null +++ b/daemon/bin/daemon.rs @@ -0,0 +1,27 @@ +use anyhow::{anyhow, Result}; +use clap::Parser; +use env_logger::Env; +use kratad::{runtime::Runtime, Daemon}; +use tokio_listener::ListenerAddressLFlag; + +#[derive(Parser)] +struct Args { + #[clap(flatten)] + listener: ListenerAddressLFlag, + #[arg(short, long, default_value = "/var/lib/krata")] + store: String, +} + +#[tokio::main(flavor = "multi_thread", worker_threads = 10)] +async fn main() -> Result<()> { + env_logger::Builder::from_env(Env::default().default_filter_or("warn")).init(); + + let args = Args::parse(); + let Some(listener) = args.listener.bind().await else { + return Err(anyhow!("no listener specified")); + }; + let runtime = Runtime::new(args.store.clone()).await?; + let mut daemon = Daemon::new(runtime).await?; + daemon.listen(listener?).await?; + Ok(()) +} diff --git a/daemon/examples/dial.rs b/daemon/examples/dial.rs new file mode 100644 index 0000000..193823d --- /dev/null +++ b/daemon/examples/dial.rs @@ -0,0 +1,28 @@ +use anyhow::Result; +use krata::control::{ListRequest, Message, Request, RequestBox}; +use tokio::{ + io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, + net::TcpStream, +}; +use tokio_stream::{wrappers::LinesStream, StreamExt}; + +#[tokio::main] +async fn main() -> Result<()> { + let mut stream = TcpStream::connect("127.0.0.1:4050").await?; + let (read, mut write) = stream.split(); + let mut read = LinesStream::new(BufReader::new(read).lines()); + + let send = Message::Request(RequestBox { + id: 1, + request: Request::List(ListRequest {}), + }); + let mut line = serde_json::to_string(&send)?; + line.push('\n'); + write.write_all(line.as_bytes()).await?; + println!("sent: {:?}", send); + while let Some(line) = read.try_next().await? { + let message: Message = serde_json::from_str(&line)?; + println!("received: {:?}", message); + } + Ok(()) +} diff --git a/daemon/src/handlers/console.rs b/daemon/src/handlers/console.rs new file mode 100644 index 0000000..404d7f2 --- /dev/null +++ b/daemon/src/handlers/console.rs @@ -0,0 +1,91 @@ +use anyhow::{anyhow, Result}; +use krata::control::{ConsoleStreamResponse, ConsoleStreamUpdate, Request, Response, StreamUpdate}; +use log::warn; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + select, +}; + +use crate::{ + listen::DaemonRequestHandler, + runtime::{console::XenConsole, Runtime}, +}; +use krata::stream::{ConnectionStreams, StreamContext}; +pub struct ConsoleStreamRequestHandler {} + +impl Default for ConsoleStreamRequestHandler { + fn default() -> Self { + Self::new() + } +} + +impl ConsoleStreamRequestHandler { + pub fn new() -> Self { + Self {} + } + + async fn link_console_stream(mut stream: StreamContext, mut console: XenConsole) -> Result<()> { + loop { + let mut buffer = vec![0u8; 256]; + select! { + x = console.read_handle.read(&mut buffer) => match x { + Ok(size) => { + let data = buffer[0..size].to_vec(); + let update = StreamUpdate::ConsoleStream(ConsoleStreamUpdate { + data, + }); + stream.send(update).await?; + }, + + Err(error) => { + return Err(error.into()); + } + }, + + x = stream.receiver.recv() => match x { + Some(StreamUpdate::ConsoleStream(update)) => { + console.write_handle.write_all(&update.data).await?; + } + + None => { + break; + } + } + }; + } + Ok(()) + } +} + +#[async_trait::async_trait] +impl DaemonRequestHandler for ConsoleStreamRequestHandler { + fn accepts(&self, request: &Request) -> bool { + matches!(request, Request::ConsoleStream(_)) + } + + async fn handle( + &self, + streams: ConnectionStreams, + runtime: Runtime, + request: Request, + ) -> Result { + let console_stream = match request { + Request::ConsoleStream(stream) => stream, + _ => return Err(anyhow!("unknown request")), + }; + let console = runtime.console(&console_stream.guest).await?; + let stream = streams.open().await?; + let id = stream.id; + tokio::task::spawn(async move { + if let Err(error) = + ConsoleStreamRequestHandler::link_console_stream(stream, console).await + { + warn!("failed to process console stream: {}", error); + } + }); + + Ok(Response::ConsoleStream(ConsoleStreamResponse { + stream: id, + })) + } +} diff --git a/daemon/src/handlers/destroy.rs b/daemon/src/handlers/destroy.rs new file mode 100644 index 0000000..7af1e13 --- /dev/null +++ b/daemon/src/handlers/destroy.rs @@ -0,0 +1,44 @@ +use anyhow::{anyhow, Result}; +use krata::{ + control::{DestroyResponse, Request, Response}, + stream::ConnectionStreams, +}; + +use crate::{listen::DaemonRequestHandler, runtime::Runtime}; + +pub struct DestroyRequestHandler {} + +impl Default for DestroyRequestHandler { + fn default() -> Self { + Self::new() + } +} + +impl DestroyRequestHandler { + pub fn new() -> Self { + Self {} + } +} + +#[async_trait::async_trait] +impl DaemonRequestHandler for DestroyRequestHandler { + fn accepts(&self, request: &Request) -> bool { + matches!(request, Request::Destroy(_)) + } + + async fn handle( + &self, + _: ConnectionStreams, + runtime: Runtime, + request: Request, + ) -> Result { + let destroy = match request { + Request::Destroy(destroy) => destroy, + _ => return Err(anyhow!("unknown request")), + }; + let guest = runtime.destroy(&destroy.guest).await?; + Ok(Response::Destroy(DestroyResponse { + guest: guest.to_string(), + })) + } +} diff --git a/daemon/src/handlers/launch.rs b/daemon/src/handlers/launch.rs new file mode 100644 index 0000000..2fa575d --- /dev/null +++ b/daemon/src/handlers/launch.rs @@ -0,0 +1,55 @@ +use anyhow::{anyhow, Result}; +use krata::{ + control::{GuestInfo, LaunchResponse, Request, Response}, + stream::ConnectionStreams, +}; + +use crate::{ + listen::DaemonRequestHandler, + runtime::{launch::GuestLaunchRequest, Runtime}, +}; + +pub struct LaunchRequestHandler {} + +impl Default for LaunchRequestHandler { + fn default() -> Self { + Self::new() + } +} + +impl LaunchRequestHandler { + pub fn new() -> Self { + Self {} + } +} + +#[async_trait::async_trait] +impl DaemonRequestHandler for LaunchRequestHandler { + fn accepts(&self, request: &Request) -> bool { + matches!(request, Request::Launch(_)) + } + + async fn handle( + &self, + _: ConnectionStreams, + runtime: Runtime, + request: Request, + ) -> Result { + let launch = match request { + Request::Launch(launch) => launch, + _ => return Err(anyhow!("unknown request")), + }; + let guest: GuestInfo = runtime + .launch(GuestLaunchRequest { + image: &launch.image, + vcpus: launch.vcpus, + mem: launch.mem, + env: launch.env, + run: launch.run, + debug: false, + }) + .await? + .into(); + Ok(Response::Launch(LaunchResponse { guest })) + } +} diff --git a/daemon/src/handlers/list.rs b/daemon/src/handlers/list.rs new file mode 100644 index 0000000..2e48b5d --- /dev/null +++ b/daemon/src/handlers/list.rs @@ -0,0 +1,37 @@ +use anyhow::Result; +use krata::{ + control::{GuestInfo, ListResponse, Request, Response}, + stream::ConnectionStreams, +}; + +use crate::{listen::DaemonRequestHandler, runtime::Runtime}; + +pub struct ListRequestHandler {} + +impl Default for ListRequestHandler { + fn default() -> Self { + Self::new() + } +} + +impl ListRequestHandler { + pub fn new() -> Self { + Self {} + } +} + +#[async_trait::async_trait] +impl DaemonRequestHandler for ListRequestHandler { + fn accepts(&self, request: &Request) -> bool { + matches!(request, Request::List(_)) + } + + async fn handle(&self, _: ConnectionStreams, runtime: Runtime, _: Request) -> Result { + let guests = runtime.list().await?; + let guests = guests + .into_iter() + .map(GuestInfo::from) + .collect::>(); + Ok(Response::List(ListResponse { guests })) + } +} diff --git a/daemon/src/handlers/mod.rs b/daemon/src/handlers/mod.rs new file mode 100644 index 0000000..46406fd --- /dev/null +++ b/daemon/src/handlers/mod.rs @@ -0,0 +1,15 @@ +pub mod console; +pub mod destroy; +pub mod launch; +pub mod list; + +impl From for krata::control::GuestInfo { + fn from(value: crate::runtime::GuestInfo) -> Self { + krata::control::GuestInfo { + id: value.uuid.to_string(), + image: value.image.clone(), + ipv4: value.ipv4.map(|x| x.ip().to_string()), + ipv6: value.ipv6.map(|x| x.ip().to_string()), + } + } +} diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs new file mode 100644 index 0000000..f3a0e4c --- /dev/null +++ b/daemon/src/lib.rs @@ -0,0 +1,37 @@ +use anyhow::Result; +use handlers::{ + console::ConsoleStreamRequestHandler, destroy::DestroyRequestHandler, + launch::LaunchRequestHandler, list::ListRequestHandler, +}; +use listen::{DaemonListener, DaemonRequestHandlers}; +use runtime::Runtime; +use tokio_listener::Listener; + +pub mod handlers; +pub mod listen; +pub mod runtime; + +pub struct Daemon { + runtime: Runtime, +} + +impl Daemon { + pub async fn new(runtime: Runtime) -> Result { + Ok(Self { runtime }) + } + + pub async fn listen(&mut self, listener: Listener) -> Result<()> { + let handlers = DaemonRequestHandlers::new( + self.runtime.clone(), + vec![ + Box::new(LaunchRequestHandler::new()), + Box::new(DestroyRequestHandler::new()), + Box::new(ConsoleStreamRequestHandler::new()), + Box::new(ListRequestHandler::new()), + ], + ); + let mut listener = DaemonListener::new(listener, handlers); + listener.handle().await?; + Ok(()) + } +} diff --git a/daemon/src/listen.rs b/daemon/src/listen.rs new file mode 100644 index 0000000..9f0f0b5 --- /dev/null +++ b/daemon/src/listen.rs @@ -0,0 +1,228 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use anyhow::{anyhow, Result}; +use krata::control::{ErrorResponse, Message, Request, RequestBox, Response, ResponseBox}; +use log::trace; +use log::warn; +use tokio::sync::Mutex; +use tokio::{ + io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, + select, + sync::mpsc::{channel, Receiver, Sender}, +}; +use tokio_listener::{Connection, Listener, SomeSocketAddrClonable}; +use tokio_stream::{wrappers::LinesStream, StreamExt}; + +use crate::runtime::Runtime; +use krata::stream::ConnectionStreams; + +const QUEUE_MAX_LEN: usize = 100; + +#[async_trait::async_trait] +pub trait DaemonRequestHandler: Send + Sync { + fn accepts(&self, request: &Request) -> bool; + async fn handle( + &self, + streams: ConnectionStreams, + runtime: Runtime, + request: Request, + ) -> Result; +} + +#[derive(Clone)] +pub struct DaemonRequestHandlers { + runtime: Runtime, + handlers: Arc>>, +} + +impl DaemonRequestHandlers { + pub fn new(runtime: Runtime, handlers: Vec>) -> Self { + DaemonRequestHandlers { + runtime, + handlers: Arc::new(handlers), + } + } + + async fn dispatch(&self, streams: ConnectionStreams, request: Request) -> Result { + for handler in self.handlers.iter() { + if handler.accepts(&request) { + return handler.handle(streams, self.runtime.clone(), request).await; + } + } + Err(anyhow!("daemon cannot handle that request")) + } +} + +pub struct DaemonListener { + listener: Listener, + handlers: DaemonRequestHandlers, + connections: Arc>>, + next: Arc>, +} + +impl DaemonListener { + pub fn new(listener: Listener, handlers: DaemonRequestHandlers) -> DaemonListener { + DaemonListener { + listener, + handlers, + connections: Arc::new(Mutex::new(HashMap::new())), + next: Arc::new(Mutex::new(0)), + } + } + + pub async fn handle(&mut self) -> Result<()> { + loop { + let (connection, addr) = self.listener.accept().await?; + let connection = + DaemonConnection::new(connection, addr.clonable(), self.handlers.clone()).await?; + let id = { + let mut next = self.next.lock().await; + let id = *next; + *next = id + 1; + id + }; + trace!("new connection from {}", connection.addr); + let tx_channel = connection.tx_sender.clone(); + let addr = connection.addr.clone(); + self.connections.lock().await.insert(id, connection); + let connections_for_close = self.connections.clone(); + tokio::task::spawn(async move { + tx_channel.closed().await; + trace!("connection from {} closed", addr); + connections_for_close.lock().await.remove(&id); + }); + } + } +} + +#[derive(Clone)] +pub struct DaemonConnection { + tx_sender: Sender, + addr: SomeSocketAddrClonable, + handlers: DaemonRequestHandlers, + streams: ConnectionStreams, +} + +impl DaemonConnection { + pub async fn new( + connection: Connection, + addr: SomeSocketAddrClonable, + handlers: DaemonRequestHandlers, + ) -> Result { + let (tx_sender, tx_receiver) = channel::(QUEUE_MAX_LEN); + let streams_tx_sender = tx_sender.clone(); + let instance = DaemonConnection { + tx_sender, + addr, + handlers, + streams: ConnectionStreams::new(streams_tx_sender), + }; + + { + let mut instance = instance.clone(); + tokio::task::spawn(async move { + if let Err(error) = instance.process(tx_receiver, connection).await { + warn!( + "failed to process daemon connection for {}: {}", + instance.addr, error + ); + } + }); + } + + Ok(instance) + } + + async fn process( + &mut self, + mut tx_receiver: Receiver, + connection: Connection, + ) -> Result<()> { + let (read, mut write) = tokio::io::split(connection); + let mut read = LinesStream::new(BufReader::new(read).lines()); + + loop { + select! { + x = read.next() => match x { + Some(Ok(line)) => { + let message: Message = serde_json::from_str(&line)?; + trace!("received message '{}' from {}", serde_json::to_string(&message)?, self.addr); + let mut context = self.clone(); + tokio::task::spawn(async move { + if let Err(error) = context.handle_message(&message).await { + let line = serde_json::to_string(&message).unwrap_or("".to_string()); + warn!("failed to handle message '{}' from {}: {}", line, context.addr, error); + } + }); + }, + + Some(Err(error)) => { + return Err(error.into()); + }, + + None => { + break; + } + }, + + x = tx_receiver.recv() => match x { + Some(message) => { + if let Message::StreamUpdated(ref update) = message { + self.streams.outgoing(update).await?; + } + let mut line = serde_json::to_string(&message)?; + trace!("sending message '{}' to {}", line, self.addr); + line.push('\n'); + write.write_all(line.as_bytes()).await?; + }, + None => { + break; + } + } + }; + } + Ok(()) + } + + async fn handle_message(&mut self, message: &Message) -> Result<()> { + match message { + Message::Request(req) => { + self.handle_request(req.clone()).await?; + } + + Message::Response(_) => { + return Err(anyhow!( + "received a response message from client {}, but this is the daemon", + self.addr + )); + } + + Message::StreamUpdated(updated) => { + self.streams.incoming(updated.clone()).await?; + } + } + Ok(()) + } + + async fn handle_request(&mut self, req: RequestBox) -> Result<()> { + let id = req.id; + let response = self + .handlers + .dispatch(self.streams.clone(), req.request) + .await + .map_err(|error| { + Response::Error(ErrorResponse { + message: error.to_string(), + }) + }); + let response = if let Err(response) = response { + response + } else { + response.unwrap() + }; + let resp = ResponseBox { id, response }; + self.tx_sender.send(Message::Response(resp)).await?; + Ok(()) + } +} diff --git a/controller/src/autoloop.rs b/daemon/src/runtime/autoloop.rs similarity index 100% rename from controller/src/autoloop.rs rename to daemon/src/runtime/autoloop.rs diff --git a/controller/src/ctl/cfgblk.rs b/daemon/src/runtime/cfgblk.rs similarity index 96% rename from controller/src/ctl/cfgblk.rs rename to daemon/src/runtime/cfgblk.rs index 9ffb7f3..2a66fdb 100644 --- a/controller/src/ctl/cfgblk.rs +++ b/daemon/src/runtime/cfgblk.rs @@ -1,7 +1,7 @@ -use crate::image::ImageInfo; +use crate::runtime::image::ImageInfo; use anyhow::Result; use backhand::{FilesystemWriter, NodeHeader}; -use krata::LaunchInfo; +use krata::launchcfg::LaunchInfo; use log::trace; use std::fs; use std::fs::File; diff --git a/daemon/src/runtime/console.rs b/daemon/src/runtime/console.rs new file mode 100644 index 0000000..7571226 --- /dev/null +++ b/daemon/src/runtime/console.rs @@ -0,0 +1,18 @@ +use anyhow::Result; +use tokio::fs::File; + +pub struct XenConsole { + pub read_handle: File, + pub write_handle: File, +} + +impl XenConsole { + pub async fn new(tty: &str) -> Result { + let read_handle = File::options().read(true).write(false).open(tty).await?; + let write_handle = File::options().read(false).write(true).open(tty).await?; + Ok(XenConsole { + read_handle, + write_handle, + }) + } +} diff --git a/controller/src/image/cache.rs b/daemon/src/runtime/image/cache.rs similarity index 98% rename from controller/src/image/cache.rs rename to daemon/src/runtime/image/cache.rs index 73c0ee6..ca59540 100644 --- a/controller/src/image/cache.rs +++ b/daemon/src/runtime/image/cache.rs @@ -1,4 +1,5 @@ -use crate::image::{ImageInfo, Result}; +use super::ImageInfo; +use anyhow::Result; use log::debug; use oci_spec::image::{ImageConfiguration, ImageManifest}; use std::fs; diff --git a/controller/src/image/fetch.rs b/daemon/src/runtime/image/fetch.rs similarity index 100% rename from controller/src/image/fetch.rs rename to daemon/src/runtime/image/fetch.rs diff --git a/controller/src/image/mod.rs b/daemon/src/runtime/image/mod.rs similarity index 98% rename from controller/src/image/mod.rs rename to daemon/src/runtime/image/mod.rs index 51c6464..202e2a5 100644 --- a/controller/src/image/mod.rs +++ b/daemon/src/runtime/image/mod.rs @@ -2,9 +2,9 @@ pub mod cache; pub mod fetch; pub mod name; -use crate::image::cache::ImageCache; -use crate::image::fetch::RegistryClient; -use crate::image::name::ImageName; +use crate::runtime::image::cache::ImageCache; +use crate::runtime::image::fetch::RegistryClient; +use crate::runtime::image::name::ImageName; use anyhow::{anyhow, Result}; use backhand::compression::Compressor; use backhand::{FilesystemCompressor, FilesystemWriter, NodeHeader}; diff --git a/controller/src/image/name.rs b/daemon/src/runtime/image/name.rs similarity index 100% rename from controller/src/image/name.rs rename to daemon/src/runtime/image/name.rs diff --git a/controller/src/launch/mod.rs b/daemon/src/runtime/launch/mod.rs similarity index 90% rename from controller/src/launch/mod.rs rename to daemon/src/runtime/launch/mod.rs index c01ffd8..8d811c6 100644 --- a/controller/src/launch/mod.rs +++ b/daemon/src/runtime/launch/mod.rs @@ -1,25 +1,23 @@ +use std::net::IpAddr; use std::{fs, net::Ipv4Addr, str::FromStr}; use advmac::MacAddr6; use anyhow::{anyhow, Result}; -use ipnetwork::Ipv4Network; -use krata::{ +use ipnetwork::{IpNetwork, Ipv4Network}; +use krata::launchcfg::{ LaunchInfo, LaunchNetwork, LaunchNetworkIpv4, LaunchNetworkIpv6, LaunchNetworkResolver, }; use uuid::Uuid; use xenclient::{DomainConfig, DomainDisk, DomainNetworkInterface}; use xenstore::client::XsdInterface; -use crate::{ - ctl::GuestInfo, - image::{cache::ImageCache, name::ImageName, ImageCompiler, ImageInfo}, -}; +use crate::runtime::cfgblk::ConfigBlock; +use crate::runtime::image::{cache::ImageCache, name::ImageName, ImageCompiler, ImageInfo}; +use crate::runtime::RuntimeContext; -use crate::ctl::{cfgblk::ConfigBlock, ControllerContext}; +use super::GuestInfo; pub struct GuestLaunchRequest<'a> { - pub kernel_path: &'a str, - pub initrd_path: &'a str, pub image: &'a str, pub vcpus: u32, pub mem: u64, @@ -35,9 +33,9 @@ impl GuestLauncher { Ok(Self {}) } - pub async fn launch<'c, 'r>( + pub async fn launch<'r>( &mut self, - context: &'c mut ControllerContext, + context: &mut RuntimeContext, request: GuestLaunchRequest<'r>, ) -> Result { let uuid = Uuid::new_v4(); @@ -115,8 +113,8 @@ impl GuestLauncher { name: &name, max_vcpus: request.vcpus, mem_mb: request.mem, - kernel_path: request.kernel_path, - initrd_path: request.initrd_path, + kernel_path: &context.kernel, + initrd_path: &context.initrd, cmdline: &cmdline, disks: vec![ DomainDisk { @@ -186,8 +184,14 @@ impl GuestLauncher { domid, image: request.image.to_string(), loops: vec![], - ipv4: format!("{}/{}", guest_ipv4, ipv4_network_mask), - ipv6: format!("{}/{}", guest_ipv6, ipv6_network_mask), + ipv4: Some(IpNetwork::new( + IpAddr::V4(guest_ipv4), + ipv4_network_mask as u8, + )?), + ipv6: Some(IpNetwork::new( + IpAddr::V6(guest_ipv6), + ipv6_network_mask as u8, + )?), }), Err(error) => { let _ = context.autoloop.unloop(&image_squashfs_loop.path); @@ -204,7 +208,7 @@ impl GuestLauncher { compiler.compile(&image).await } - async fn allocate_ipv4(&mut self, context: &mut ControllerContext) -> Result { + async fn allocate_ipv4(&mut self, context: &mut RuntimeContext) -> Result { let network = Ipv4Network::new(Ipv4Addr::new(10, 75, 80, 0), 24)?; let mut used: Vec = vec![]; for domid_candidate in context.xen.store.list("/local/domain").await? { diff --git a/daemon/src/runtime/mod.rs b/daemon/src/runtime/mod.rs new file mode 100644 index 0000000..6d520a6 --- /dev/null +++ b/daemon/src/runtime/mod.rs @@ -0,0 +1,247 @@ +use std::{fs, path::PathBuf, str::FromStr, sync::Arc}; + +use anyhow::{anyhow, Result}; +use ipnetwork::IpNetwork; +use loopdev::LoopControl; +use tokio::sync::Mutex; +use uuid::Uuid; +use xenclient::XenClient; +use xenstore::client::{XsdClient, XsdInterface}; + +use self::{ + autoloop::AutoLoop, + console::XenConsole, + image::cache::ImageCache, + launch::{GuestLaunchRequest, GuestLauncher}, +}; + +pub mod autoloop; +pub mod cfgblk; +pub mod console; +pub mod image; +pub mod launch; + +pub struct ContainerLoopInfo { + pub device: String, + pub file: String, + pub delete: Option, +} + +pub struct GuestInfo { + pub uuid: Uuid, + pub domid: u32, + pub image: String, + pub loops: Vec, + pub ipv4: Option, + pub ipv6: Option, +} + +pub struct RuntimeContext { + pub image_cache: ImageCache, + pub autoloop: AutoLoop, + pub xen: XenClient, + pub kernel: String, + pub initrd: String, +} + +impl RuntimeContext { + pub async fn new(store: String) -> Result { + let mut image_cache_path = PathBuf::from(&store); + image_cache_path.push("cache"); + fs::create_dir_all(&image_cache_path)?; + + let xen = XenClient::open().await?; + image_cache_path.push("image"); + fs::create_dir_all(&image_cache_path)?; + let image_cache = ImageCache::new(&image_cache_path)?; + let kernel = format!("{}/default/kernel", store); + let initrd = format!("{}/default/initrd", store); + Ok(RuntimeContext { + image_cache, + autoloop: AutoLoop::new(LoopControl::open()?), + xen, + kernel, + initrd, + }) + } + + pub async fn list(&mut self) -> Result> { + let mut guests: Vec = Vec::new(); + for domid_candidate in self.xen.store.list("/local/domain").await? { + let dom_path = format!("/local/domain/{}", domid_candidate); + let uuid_string = match self + .xen + .store + .read_string(&format!("{}/krata/uuid", &dom_path)) + .await? + { + None => continue, + Some(value) => value, + }; + let domid = + u32::from_str(&domid_candidate).map_err(|_| anyhow!("failed to parse domid"))?; + let uuid = Uuid::from_str(&uuid_string)?; + let image = self + .xen + .store + .read_string(&format!("{}/krata/image", &dom_path)) + .await? + .unwrap_or("unknown".to_string()); + let loops = self + .xen + .store + .read_string(&format!("{}/krata/loops", &dom_path)) + .await?; + let ipv4 = self + .xen + .store + .read_string(&format!("{}/krata/network/guest/ipv4", &dom_path)) + .await?; + let ipv6 = self + .xen + .store + .read_string(&format!("{}/krata/network/guest/ipv6", &dom_path)) + .await?; + + let ipv4 = if let Some(ipv4) = ipv4 { + IpNetwork::from_str(&ipv4).ok() + } else { + None + }; + + let ipv6 = if let Some(ipv6) = ipv6 { + IpNetwork::from_str(&ipv6).ok() + } else { + None + }; + + let loops = RuntimeContext::parse_loop_set(&loops); + guests.push(GuestInfo { + uuid, + domid, + image, + loops, + ipv4, + ipv6, + }); + } + Ok(guests) + } + + pub async fn resolve(&mut self, id: &str) -> Result> { + for guest in self.list().await? { + let uuid_string = guest.uuid.to_string(); + let domid_string = guest.domid.to_string(); + if uuid_string == id || domid_string == id || id == format!("krata-{}", uuid_string) { + return Ok(Some(guest)); + } + } + Ok(None) + } + + fn parse_loop_set(input: &Option) -> Vec { + let Some(input) = input else { + return Vec::new(); + }; + let sets = input + .split(',') + .map(|x| x.to_string()) + .map(|x| x.split(':').map(|v| v.to_string()).collect::>()) + .map(|x| (x[0].clone(), x[1].clone(), x[2].clone())) + .collect::>(); + sets.iter() + .map(|(device, file, delete)| ContainerLoopInfo { + device: device.clone(), + file: file.clone(), + delete: if delete == "none" { + None + } else { + Some(delete.clone()) + }, + }) + .collect::>() + } +} + +#[derive(Clone)] +pub struct Runtime { + context: Arc>, +} + +impl Runtime { + pub async fn new(store: String) -> Result { + let context = RuntimeContext::new(store).await?; + Ok(Self { + context: Arc::new(Mutex::new(context)), + }) + } + + pub async fn launch<'a>(&self, request: GuestLaunchRequest<'a>) -> Result { + let mut context = self.context.lock().await; + let mut launcher = GuestLauncher::new()?; + launcher.launch(&mut context, request).await + } + + pub async fn destroy(&self, id: &str) -> Result { + let mut context = self.context.lock().await; + let info = context + .resolve(id) + .await? + .ok_or_else(|| anyhow!("unable to resolve guest: {}", id))?; + let domid = info.domid; + let mut store = XsdClient::open().await?; + let dom_path = store.get_domain_path(domid).await?; + let uuid = match store + .read_string(format!("{}/krata/uuid", dom_path).as_str()) + .await? + { + None => { + return Err(anyhow!( + "domain {} was not found or not created by krata", + domid + )) + } + Some(value) => value, + }; + if uuid.is_empty() { + return Err(anyhow!("unable to find krata uuid based on the domain",)); + } + let uuid = Uuid::parse_str(&uuid)?; + let loops = store + .read_string(format!("{}/krata/loops", dom_path).as_str()) + .await?; + let loops = RuntimeContext::parse_loop_set(&loops); + context.xen.destroy(domid).await?; + for info in &loops { + context.autoloop.unloop(&info.device)?; + match &info.delete { + None => {} + Some(delete) => { + let delete_path = PathBuf::from(delete); + if delete_path.is_file() || delete_path.is_symlink() { + fs::remove_file(&delete_path)?; + } else if delete_path.is_dir() { + fs::remove_dir_all(&delete_path)?; + } + } + } + } + Ok(uuid) + } + + pub async fn console(&self, id: &str) -> Result { + let mut context = self.context.lock().await; + let info = context + .resolve(id) + .await? + .ok_or_else(|| anyhow!("unable to resolve guest: {}", id))?; + let domid = info.domid; + let tty = context.xen.get_console_path(domid).await?; + XenConsole::new(&tty).await + } + + pub async fn list(&self) -> Result> { + let mut context = self.context.lock().await; + context.list().await + } +} diff --git a/container/Cargo.toml b/guest/Cargo.toml similarity index 90% rename from container/Cargo.toml rename to guest/Cargo.toml index b57c1ac..b1444a6 100644 --- a/container/Cargo.toml +++ b/guest/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "kratactr" +name = "krataguest" version.workspace = true edition = "2021" resolver = "2" @@ -18,6 +18,7 @@ tokio = { workspace = true } futures = { workspace = true } ipnetwork = { workspace = true } path-absolutize = { workspace = true } +tokio-stream = { workspace = true } [dependencies.nix] workspace = true @@ -33,5 +34,5 @@ path = "../libs/xen/xenstore" path = "src/lib.rs" [[bin]] -name = "kratactr" +name = "krataguest" path = "bin/init.rs" diff --git a/container/bin/init.rs b/guest/bin/init.rs similarity index 86% rename from container/bin/init.rs rename to guest/bin/init.rs index 8bd19fb..5d5d569 100644 --- a/container/bin/init.rs +++ b/guest/bin/init.rs @@ -1,6 +1,6 @@ use anyhow::{anyhow, Result}; use env_logger::Env; -use kratactr::init::ContainerInit; +use krataguest::init::GuestInit; use std::env; #[tokio::main] @@ -18,7 +18,7 @@ async fn main() -> Result<()> { )); } } - let mut container = ContainerInit::new(); - container.init().await?; + let mut guest = GuestInit::new(); + guest.init().await?; Ok(()) } diff --git a/container/src/background.rs b/guest/src/background.rs similarity index 100% rename from container/src/background.rs rename to guest/src/background.rs diff --git a/container/src/childwait.rs b/guest/src/childwait.rs similarity index 100% rename from container/src/childwait.rs rename to guest/src/childwait.rs diff --git a/container/src/init.rs b/guest/src/init.rs similarity index 96% rename from container/src/init.rs rename to guest/src/init.rs index 3d23945..e197157 100644 --- a/container/src/init.rs +++ b/guest/src/init.rs @@ -2,7 +2,7 @@ use anyhow::{anyhow, Result}; use futures::stream::TryStreamExt; use ipnetwork::IpNetwork; use krata::ethtool::EthtoolHandle; -use krata::{LaunchInfo, LaunchNetwork}; +use krata::launchcfg::{LaunchInfo, LaunchNetwork}; use log::{trace, warn}; use nix::libc::{dup2, ioctl}; use nix::unistd::{execve, fork, ForkResult, Pid}; @@ -47,17 +47,17 @@ const NEW_ROOT_DEV_PATH: &str = "/newroot/dev"; const IMAGE_CONFIG_JSON_PATH: &str = "/config/image/config.json"; const LAUNCH_CONFIG_JSON_PATH: &str = "/config/launch.json"; -pub struct ContainerInit {} +pub struct GuestInit {} -impl Default for ContainerInit { +impl Default for GuestInit { fn default() -> Self { Self::new() } } -impl ContainerInit { - pub fn new() -> ContainerInit { - ContainerInit {} +impl GuestInit { + pub fn new() -> GuestInit { + GuestInit {} } pub async fn init(&mut self) -> Result<()> { @@ -407,8 +407,8 @@ impl ContainerInit { env.extend_from_slice(extra_env.as_slice()); } - let env = ContainerInit::env_map(env); - let path = ContainerInit::resolve_executable(&env, path.into())?; + let env = GuestInit::env_map(env); + let path = GuestInit::resolve_executable(&env, path.into())?; let Some(file_name) = path.file_name() else { return Err(anyhow!("cannot get file name of command path")); }; @@ -416,13 +416,13 @@ impl ContainerInit { return Err(anyhow!("cannot get file name of command path as str")); }; cmd.insert(0, file_name.to_string()); - let env = ContainerInit::env_list(env); + let env = GuestInit::env_list(env); trace!("running container command: {}", cmd.join(" ")); let path = CString::new(path.as_os_str().as_bytes())?; - let cmd = ContainerInit::strings_as_cstrings(cmd)?; - let env = ContainerInit::strings_as_cstrings(env)?; + let cmd = GuestInit::strings_as_cstrings(cmd)?; + let env = GuestInit::strings_as_cstrings(env)?; let mut working_dir = config .working_dir() .as_ref() @@ -501,7 +501,7 @@ impl ContainerInit { cmd: Vec, env: Vec, ) -> Result<()> { - ContainerInit::set_controlling_terminal()?; + GuestInit::set_controlling_terminal()?; execve(&path, &cmd, &env)?; Ok(()) } diff --git a/container/src/lib.rs b/guest/src/lib.rs similarity index 100% rename from container/src/lib.rs rename to guest/src/lib.rs diff --git a/initrd/build.sh b/initrd/build.sh index 86444cb..481b3e5 100755 --- a/initrd/build.sh +++ b/initrd/build.sh @@ -6,9 +6,9 @@ TARGET="x86_64-unknown-linux-gnu" export RUSTFLAGS="-Ctarget-feature=+crt-static" cd "$(dirname "${0}")/.." krata_DIR="${PWD}" -cargo build -q --bin kratactr --release --target "${TARGET}" +cargo build -q --bin krataguest --release --target "${TARGET}" INITRD_DIR="$(mktemp -d /tmp/krata-initrd.XXXXXXXXXXXXX)" -cp "target/${TARGET}/release/kratactr" "${INITRD_DIR}/init" +cp "target/${TARGET}/release/krataguest" "${INITRD_DIR}/init" chmod +x "${INITRD_DIR}/init" cd "${INITRD_DIR}" mkdir -p "${krata_DIR}/target/initrd" diff --git a/resources/systemd/kratad.service b/resources/systemd/kratad.service new file mode 100644 index 0000000..8171871 --- /dev/null +++ b/resources/systemd/kratad.service @@ -0,0 +1,12 @@ +[Unit] +Description=Krata Controller Daemon + +[Service] +Restart=on-failure +Type=simple +WorkingDirectory=/var/lib/krata +ExecStart=/usr/bin/kratad +User=root + +[Install] +WantedBy=multi-user.target diff --git a/resources/systemd/kratanet.service b/resources/systemd/kratanet.service new file mode 100644 index 0000000..5c3dca4 --- /dev/null +++ b/resources/systemd/kratanet.service @@ -0,0 +1,12 @@ +[Unit] +Description=Krata Networking Daemon + +[Service] +Restart=on-failure +Type=simple +WorkingDirectory=/var/lib/krata +ExecStart=/usr/bin/kratanet +User=root + +[Install] +WantedBy=multi-user.target diff --git a/scripts/kratad-debug.sh b/scripts/kratad-debug.sh new file mode 100755 index 0000000..13ede39 --- /dev/null +++ b/scripts/kratad-debug.sh @@ -0,0 +1,8 @@ +#!/bin/sh +set -e + +REAL_SCRIPT="$(realpath "${0}")" +# shellcheck source-path=krata-debug-common.sh +. "$(dirname "${REAL_SCRIPT}")/krata-debug-common.sh" + +KRATA_BUILD_INITRD=1 build_and_run kratad "${@}" diff --git a/shared/Cargo.toml b/shared/Cargo.toml index 0115c14..45ad9e1 100644 --- a/shared/Cargo.toml +++ b/shared/Cargo.toml @@ -8,6 +8,8 @@ resolver = "2" anyhow = { workspace = true } serde = { workspace = true } libc = { workspace = true } +log = { workspace = true } +tokio = { workspace = true } [dependencies.nix] workspace = true diff --git a/shared/src/control.rs b/shared/src/control.rs new file mode 100644 index 0000000..2f19243 --- /dev/null +++ b/shared/src/control.rs @@ -0,0 +1,115 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GuestInfo { + pub id: String, + pub image: String, + pub ipv4: Option, + pub ipv6: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LaunchRequest { + pub image: String, + pub vcpus: u32, + pub mem: u64, + pub env: Option>, + pub run: Option>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LaunchResponse { + pub guest: GuestInfo, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ListRequest {} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ListResponse { + pub guests: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DestroyRequest { + pub guest: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DestroyResponse { + pub guest: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ConsoleStreamRequest { + pub guest: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ConsoleStreamResponse { + pub stream: u64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ConsoleStreamUpdate { + pub data: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ErrorResponse { + pub message: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum Request { + Launch(LaunchRequest), + Destroy(DestroyRequest), + List(ListRequest), + ConsoleStream(ConsoleStreamRequest), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum Response { + Error(ErrorResponse), + Launch(LaunchResponse), + Destroy(DestroyResponse), + List(ListResponse), + ConsoleStream(ConsoleStreamResponse), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RequestBox { + pub id: u64, + pub request: Request, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ResponseBox { + pub id: u64, + pub response: Response, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub enum StreamStatus { + Open, + Closed, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum StreamUpdate { + ConsoleStream(ConsoleStreamUpdate), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StreamUpdated { + pub id: u64, + pub update: Option, + pub status: StreamStatus, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum Message { + Request(RequestBox), + Response(ResponseBox), + StreamUpdated(StreamUpdated), +} diff --git a/shared/src/launchcfg.rs b/shared/src/launchcfg.rs new file mode 100644 index 0000000..8dae750 --- /dev/null +++ b/shared/src/launchcfg.rs @@ -0,0 +1,33 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Debug)] +pub struct LaunchNetworkIpv4 { + pub address: String, + pub gateway: String, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct LaunchNetworkIpv6 { + pub address: String, + pub gateway: String, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct LaunchNetworkResolver { + pub nameservers: Vec, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct LaunchNetwork { + pub link: String, + pub ipv4: LaunchNetworkIpv4, + pub ipv6: LaunchNetworkIpv6, + pub resolver: LaunchNetworkResolver, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct LaunchInfo { + pub network: Option, + pub env: Option>, + pub run: Option>, +} diff --git a/shared/src/lib.rs b/shared/src/lib.rs index 7bfa964..fe2ee99 100644 --- a/shared/src/lib.rs +++ b/shared/src/lib.rs @@ -1,35 +1,4 @@ +pub mod control; pub mod ethtool; - -use serde::{Deserialize, Serialize}; - -#[derive(Serialize, Deserialize, Debug)] -pub struct LaunchNetworkIpv4 { - pub address: String, - pub gateway: String, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct LaunchNetworkIpv6 { - pub address: String, - pub gateway: String, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct LaunchNetworkResolver { - pub nameservers: Vec, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct LaunchNetwork { - pub link: String, - pub ipv4: LaunchNetworkIpv4, - pub ipv6: LaunchNetworkIpv6, - pub resolver: LaunchNetworkResolver, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct LaunchInfo { - pub network: Option, - pub env: Option>, - pub run: Option>, -} +pub mod launchcfg; +pub mod stream; diff --git a/shared/src/stream.rs b/shared/src/stream.rs new file mode 100644 index 0000000..60388d0 --- /dev/null +++ b/shared/src/stream.rs @@ -0,0 +1,152 @@ +use crate::control::{Message, StreamStatus, StreamUpdate, StreamUpdated}; +use anyhow::{anyhow, Result}; +use log::warn; +use std::{collections::HashMap, sync::Arc}; +use tokio::sync::{ + mpsc::{channel, Receiver, Sender}, + Mutex, +}; + +pub struct StreamContext { + pub id: u64, + pub receiver: Receiver, + sender: Sender, +} + +impl StreamContext { + pub async fn send(&self, update: StreamUpdate) -> Result<()> { + self.sender + .send(Message::StreamUpdated(StreamUpdated { + id: self.id, + update: Some(update), + status: StreamStatus::Open, + })) + .await?; + Ok(()) + } +} + +impl Drop for StreamContext { + fn drop(&mut self) { + if self.sender.is_closed() { + return; + } + let result = self.sender.try_send(Message::StreamUpdated(StreamUpdated { + id: self.id, + update: None, + status: StreamStatus::Closed, + })); + + if let Err(error) = result { + warn!( + "failed to send close message for stream {}: {}", + self.id, error + ); + } + } +} + +struct StreamStorage { + rx_sender: Sender, + rx_receiver: Option>, +} + +#[derive(Clone)] +pub struct ConnectionStreams { + next: Arc>, + streams: Arc>>, + tx_sender: Sender, +} + +const QUEUE_MAX_LEN: usize = 100; + +impl ConnectionStreams { + pub fn new(tx_sender: Sender) -> Self { + Self { + next: Arc::new(Mutex::new(0)), + streams: Arc::new(Mutex::new(HashMap::new())), + tx_sender, + } + } + + pub async fn open(&self) -> Result { + let id = { + let mut next = self.next.lock().await; + let id = *next; + *next = id + 1; + id + }; + + let (rx_sender, rx_receiver) = channel(QUEUE_MAX_LEN); + let store = StreamStorage { + rx_sender, + rx_receiver: None, + }; + + self.streams.lock().await.insert(id, store); + + let open = Message::StreamUpdated(StreamUpdated { + id, + update: None, + status: StreamStatus::Open, + }); + self.tx_sender.send(open).await?; + + Ok(StreamContext { + id, + sender: self.tx_sender.clone(), + receiver: rx_receiver, + }) + } + + pub async fn incoming(&self, updated: StreamUpdated) -> Result<()> { + let mut streams = self.streams.lock().await; + if updated.update.is_none() && updated.status == StreamStatus::Open { + let (rx_sender, rx_receiver) = channel(QUEUE_MAX_LEN); + let store = StreamStorage { + rx_sender, + rx_receiver: Some(rx_receiver), + }; + streams.insert(updated.id, store); + } + + let Some(storage) = streams.get(&updated.id) else { + return Ok(()); + }; + + if let Some(update) = updated.update { + storage.rx_sender.send(update).await?; + } + + if updated.status == StreamStatus::Closed { + streams.remove(&updated.id); + } + + Ok(()) + } + + pub async fn outgoing(&self, updated: &StreamUpdated) -> Result<()> { + if updated.status == StreamStatus::Closed { + let mut streams = self.streams.lock().await; + streams.remove(&updated.id); + } + Ok(()) + } + + pub async fn acquire(&self, id: u64) -> Result { + let mut streams = self.streams.lock().await; + let Some(storage) = streams.get_mut(&id) else { + return Err(anyhow!("stream {} has not been opened", id)); + }; + + let Some(receiver) = storage.rx_receiver.take() else { + return Err(anyhow!("stream has already been acquired")); + }; + + Ok(StreamContext { + id, + receiver, + sender: self.tx_sender.clone(), + }) + } +}