krata: rework into daemon / controller structure

This commit is contained in:
Alex Zenla 2024-03-05 11:35:25 +00:00
parent 17889d1c64
commit 8653fd6249
No known key found for this signature in database
GPG Key ID: 067B238899B51269
45 changed files with 1597 additions and 493 deletions

View File

@ -7,7 +7,8 @@ members = [
"libs/advmac", "libs/advmac",
"libs/loopdev", "libs/loopdev",
"shared", "shared",
"container", "daemon",
"guest",
"network", "network",
"controller", "controller",
] ]
@ -56,6 +57,8 @@ async-trait = "0.1.77"
bytes = "1.5.0" bytes = "1.5.0"
path-absolutize = "3.1.1" path-absolutize = "3.1.1"
tokio-tun = "0.11.2" tokio-tun = "0.11.2"
tokio-listener = "0.3.1"
trait-variant = "0.1.1"
[workspace.dependencies.uuid] [workspace.dependencies.uuid]
version = "1.6.1" version = "1.6.1"
@ -71,7 +74,11 @@ features = ["derive"]
[workspace.dependencies.tokio] [workspace.dependencies.tokio]
version = "1.35.1" version = "1.35.1"
features = ["macros", "rt", "rt-multi-thread"] features = ["macros", "rt", "rt-multi-thread", "io-util"]
[workspace.dependencies.tokio-stream]
version = "0.1"
features = ["io-util"]
[workspace.dependencies.reqwest] [workspace.dependencies.reqwest]
version = "0.11.24" version = "0.11.24"

View File

@ -44,9 +44,10 @@ krata is composed of three major executables:
| Executable | Runs On | User Interaction | Dev Runner | Code Path | | Executable | Runs On | User Interaction | Dev Runner | Code Path |
| ---------- | ------- | ---------------- | --------------------------- | ----------- | | ---------- | ------- | ---------------- | --------------------------- | ----------- |
| kratad | host | backend daemon | ./scripts/kratad-debug.sh | daemon |
| kratanet | host | backend daemon | ./scripts/kratanet-debug.sh | network | | kratanet | host | backend daemon | ./scripts/kratanet-debug.sh | network |
| kratactl | host | CLI tool | ./scripts/kratactl-debug.sh | controller | | kratactl | host | CLI tool | ./scripts/kratactl-debug.sh | controller |
| kratactr | guest | none, guest init | N/A | container | | krataguest | guest | none, guest init | N/A | guest |
You will find the code to each executable available in the bin/ and src/ directories inside You will find the code to each executable available in the bin/ and src/ directories inside
it's corresponding code path from the above table. it's corresponding code path from the above table.
@ -96,20 +97,21 @@ $ ./kernel/build.sh -j4
7. Copy the guest kernel image at `kernel/target/kernel` to `/var/lib/krata/default/kernel` to have it automatically detected by kratactl. 7. Copy the guest kernel image at `kernel/target/kernel` to `/var/lib/krata/default/kernel` to have it automatically detected by kratactl.
8. Launch `./scripts/kratanet-debug.sh` and keep it running in the foreground. 8. Launch `./scripts/kratanet-debug.sh` and keep it running in the foreground.
9. Run kratactl to launch a container: 9. Launch `./scripts/kratad-debug.sh` and keep it running in the foreground.
10. Run kratactl to launch a guest:
```sh ```sh
$ ./scripts/kratactl-debug.sh launch --attach mirror.gcr.io/library/alpine:latest /bin/busybox sh $ ./scripts/kratactl-debug.sh launch --attach alpine:latest
``` ```
To detach from the container console, use `Ctrl + ]` on your keyboard. To detach from the guest console, use `Ctrl + ]` on your keyboard.
To list the running containers, run: To list the running guests, run:
```sh ```sh
$ ./scripts/kratactl-debug.sh list $ ./scripts/kratactl-debug.sh list
``` ```
To destroy a running container, copy it's UUID from either the launch command or the container list and run: To destroy a running guest, copy it's UUID from either the launch command or the guest list and run:
```sh ```sh
$ ./scripts/kratactl-debug.sh destroy CONTAINER_UUID $ ./scripts/kratactl-debug.sh destroy GUEST_UUID
``` ```

View File

@ -1,5 +1,5 @@
[package] [package]
name = "kratactrl" name = "kratactl"
version.workspace = true version.workspace = true
edition = "2021" edition = "2021"
resolver = "2" resolver = "2"
@ -30,6 +30,7 @@ ipnetwork = { workspace = true }
tokio = { workspace = true } tokio = { workspace = true }
futures = { workspace = true } futures = { workspace = true }
bytes = { workspace = true } bytes = { workspace = true }
tokio-stream = { workspace = true }
[dependencies.krata] [dependencies.krata]
path = "../shared" path = "../shared"

View File

@ -1,20 +1,20 @@
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use env_logger::Env; use env_logger::Env;
use kratactrl::{ use krata::control::{
ctl::{ ConsoleStreamRequest, DestroyRequest, LaunchRequest, ListRequest, Request, Response,
console::ControllerConsole, destroy::ControllerDestroy, launch::ControllerLaunch,
ControllerContext,
},
launch::GuestLaunchRequest,
}; };
use std::path::PathBuf; use kratactl::{
client::{KrataClient, KrataClientTransport},
console::XenConsole,
};
use tokio::net::UnixStream;
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
#[command(version, about)] #[command(version, about)]
struct ControllerArgs { struct ControllerArgs {
#[arg(short, long, default_value = "auto")] #[arg(long, default_value = "/var/lib/krata/daemon.socket")]
store: String, connection: String,
#[command(subcommand)] #[command(subcommand)]
command: Commands, command: Commands,
@ -25,10 +25,6 @@ enum Commands {
List {}, List {},
Launch { Launch {
#[arg(short, long, default_value = "auto")]
kernel: String,
#[arg(short = 'r', long, default_value = "auto")]
initrd: String,
#[arg(short, long, default_value_t = 1)] #[arg(short, long, default_value_t = 1)]
cpus: u32, cpus: u32,
#[arg(short, long, default_value_t = 512)] #[arg(short, long, default_value_t = 512)]
@ -37,8 +33,6 @@ enum Commands {
env: Option<Vec<String>>, env: Option<Vec<String>>,
#[arg(short, long)] #[arg(short, long)]
attach: bool, attach: bool,
#[arg(long)]
debug: bool,
#[arg()] #[arg()]
image: String, image: String,
#[arg(allow_hyphen_values = true, trailing_var_arg = true)] #[arg(allow_hyphen_values = true, trailing_var_arg = true)]
@ -46,11 +40,11 @@ enum Commands {
}, },
Destroy { Destroy {
#[arg()] #[arg()]
container: String, guest: String,
}, },
Console { Console {
#[arg()] #[arg()]
container: String, guest: String,
}, },
} }
@ -59,77 +53,81 @@ async fn main() -> Result<()> {
env_logger::Builder::from_env(Env::default().default_filter_or("warn")).init(); env_logger::Builder::from_env(Env::default().default_filter_or("warn")).init();
let args = ControllerArgs::parse(); let args = ControllerArgs::parse();
let store_path = if args.store == "auto" { let stream = UnixStream::connect(&args.connection).await?;
default_store_path().ok_or_else(|| anyhow!("unable to determine default store path")) let transport = KrataClientTransport::new(stream).await?;
} else { let client = KrataClient::new(transport).await?;
Ok(PathBuf::from(args.store))
}?;
let store_path = store_path
.to_str()
.map(|x| x.to_string())
.ok_or_else(|| anyhow!("unable to convert store path to string"))?;
let mut context = ControllerContext::new(store_path.clone()).await?;
match args.command { match args.command {
Commands::Launch { Commands::Launch {
kernel,
initrd,
image, image,
cpus, cpus,
mem, mem,
attach, attach,
env, env,
run, run,
debug,
} => { } => {
let kernel = map_kernel_path(&store_path, kernel); let request = LaunchRequest {
let initrd = map_initrd_path(&store_path, initrd); image,
let mut launch = ControllerLaunch::new(&mut context);
let request = GuestLaunchRequest {
kernel_path: &kernel,
initrd_path: &initrd,
image: &image,
vcpus: cpus, vcpus: cpus,
mem, mem,
env, env,
run: if run.is_empty() { None } else { Some(run) }, run: if run.is_empty() { None } else { Some(run) },
debug,
}; };
let info = launch.perform(request).await?; let Response::Launch(response) = client.send(Request::Launch(request)).await? else {
println!("launched guest: {}", info.uuid); return Err(anyhow!("invalid response type"));
};
println!("launched guest: {}", response.guest.id);
if attach { if attach {
let mut console = ControllerConsole::new(&mut context); let request = ConsoleStreamRequest {
console.perform(&info.uuid.to_string()).await?; guest: response.guest.id.clone(),
};
let Response::ConsoleStream(response) =
client.send(Request::ConsoleStream(request)).await?
else {
return Err(anyhow!("invalid response type"));
};
let stream = client.acquire(response.stream).await?;
let console = XenConsole::new(stream).await?;
console.attach().await?;
} }
} }
Commands::Destroy { container } => { Commands::Destroy { guest } => {
let mut destroy = ControllerDestroy::new(&mut context); let request = DestroyRequest { guest };
destroy.perform(&container).await?; let Response::Destroy(response) = client.send(Request::Destroy(request)).await? else {
return Err(anyhow!("invalid response type"));
};
println!("destroyed guest: {}", response.guest);
} }
Commands::Console { container } => { Commands::Console { guest } => {
let mut console = ControllerConsole::new(&mut context); let request = ConsoleStreamRequest { guest };
console.perform(&container).await?; let Response::ConsoleStream(response) =
client.send(Request::ConsoleStream(request)).await?
else {
return Err(anyhow!("invalid response type"));
};
let stream = client.acquire(response.stream).await?;
let console = XenConsole::new(stream).await?;
console.attach().await?;
} }
Commands::List { .. } => { Commands::List { .. } => {
let containers = context.list().await?; let request = ListRequest {};
let Response::List(response) = client.send(Request::List(request)).await? else {
return Err(anyhow!("invalid response type"));
};
let mut table = cli_tables::Table::new(); let mut table = cli_tables::Table::new();
let header = vec!["uuid", "ipv4", "ipv6", "image"]; let header = vec!["uuid", "ipv4", "ipv6", "image"];
table.push_row(&header)?; table.push_row(&header)?;
for container in containers { for guest in response.guests {
let row = vec![ table.push_row_string(&vec![
container.uuid.to_string(), guest.id,
container.ipv4, guest.ipv4.unwrap_or("none".to_string()),
container.ipv6, guest.ipv6.unwrap_or("none".to_string()),
container.image, guest.image,
]; ])?;
table.push_row_string(&row)?;
} }
if table.num_records() == 1 { if table.num_records() == 1 {
println!("no guests have been launched"); println!("no guests have been launched");
} else { } else {
@ -139,28 +137,3 @@ async fn main() -> Result<()> {
} }
Ok(()) Ok(())
} }
fn map_kernel_path(store: &str, value: String) -> String {
if value == "auto" {
return format!("{}/default/kernel", store);
}
value
}
fn map_initrd_path(store: &str, value: String) -> String {
if value == "auto" {
return format!("{}/default/initrd", store);
}
value
}
fn default_store_path() -> Option<PathBuf> {
let user_dirs = directories::UserDirs::new()?;
let mut path = user_dirs.home_dir().to_path_buf();
if path == PathBuf::from("/root") {
path.push("/var/lib/krata")
} else {
path.push(".krata");
}
Some(path)
}

195
controller/src/client.rs Normal file
View File

@ -0,0 +1,195 @@
use std::{collections::HashMap, sync::Arc};
use anyhow::{anyhow, Result};
use krata::{
control::{Message, Request, RequestBox, Response},
stream::{ConnectionStreams, StreamContext},
};
use log::{trace, warn};
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
net::{unix, UnixStream},
select,
sync::{
mpsc::{channel, Receiver, Sender},
oneshot, Mutex,
},
task::JoinHandle,
};
use tokio_stream::{wrappers::LinesStream, StreamExt};
const QUEUE_MAX_LEN: usize = 100;
pub struct KrataClientTransport {
sender: Sender<Message>,
receiver: Receiver<Message>,
task: JoinHandle<()>,
}
impl Drop for KrataClientTransport {
fn drop(&mut self) {
self.task.abort();
}
}
impl KrataClientTransport {
pub async fn new(stream: UnixStream) -> Result<Self> {
let (read, write) = stream.into_split();
let (tx_sender, tx_receiver) = channel::<Message>(QUEUE_MAX_LEN);
let (rx_sender, rx_receiver) = channel::<Message>(QUEUE_MAX_LEN);
let task = tokio::task::spawn(async move {
if let Err(error) =
KrataClientTransport::process_unix_stream(read, write, rx_sender, tx_receiver).await
{
warn!("failed to process krata transport messages: {}", error);
}
});
Ok(Self {
sender: tx_sender,
receiver: rx_receiver,
task,
})
}
async fn process_unix_stream(
read: unix::OwnedReadHalf,
mut write: unix::OwnedWriteHalf,
rx_sender: Sender<Message>,
mut tx_receiver: Receiver<Message>,
) -> Result<()> {
let mut read = LinesStream::new(BufReader::new(read).lines());
loop {
select! {
x = tx_receiver.recv() => match x {
Some(message) => {
let mut line = serde_json::to_string(&message)?;
trace!("sending line '{}'", line);
line.push('\n');
write.write_all(line.as_bytes()).await?;
},
None => {
break;
}
},
x = read.next() => match x {
Some(Ok(line)) => {
let message = serde_json::from_str::<Message>(&line)?;
rx_sender.send(message).await?;
},
Some(Err(error)) => {
return Err(error.into());
},
None => {
break;
}
}
};
}
Ok(())
}
}
type RequestsMap = Arc<Mutex<HashMap<u64, oneshot::Sender<Response>>>>;
#[derive(Clone)]
pub struct KrataClient {
tx_sender: Sender<Message>,
next: Arc<Mutex<u64>>,
streams: ConnectionStreams,
requests: RequestsMap,
task: Arc<JoinHandle<()>>,
}
impl KrataClient {
pub async fn new(transport: KrataClientTransport) -> Result<Self> {
let tx_sender = transport.sender.clone();
let streams = ConnectionStreams::new(tx_sender.clone());
let requests = Arc::new(Mutex::new(HashMap::new()));
let task = {
let requests = requests.clone();
let streams = streams.clone();
tokio::task::spawn(async move {
if let Err(error) = KrataClient::process(transport, streams, requests).await {
warn!("failed to process krata client messages: {}", error);
}
})
};
Ok(Self {
tx_sender,
next: Arc::new(Mutex::new(0)),
requests,
streams,
task: Arc::new(task),
})
}
pub async fn send(&self, request: Request) -> Result<Response> {
let id = {
let mut next = self.next.lock().await;
let id = *next;
*next = id + 1;
id
};
let (sender, receiver) = oneshot::channel();
self.requests.lock().await.insert(id, sender);
self.tx_sender
.send(Message::Request(RequestBox { id, request }))
.await?;
let response = receiver.await?;
if let Response::Error(error) = response {
Err(anyhow!("krata error: {}", error.message))
} else {
Ok(response)
}
}
pub async fn acquire(&self, stream: u64) -> Result<StreamContext> {
self.streams.acquire(stream).await
}
async fn process(
mut transport: KrataClientTransport,
streams: ConnectionStreams,
requests: RequestsMap,
) -> Result<()> {
loop {
let Some(message) = transport.receiver.recv().await else {
break;
};
match message {
Message::Request(_) => {
return Err(anyhow!("received request from service"));
}
Message::Response(resp) => {
let Some(sender) = requests.lock().await.remove(&resp.id) else {
continue;
};
let _ = sender.send(resp.response);
}
Message::StreamUpdated(updated) => {
streams.incoming(updated).await?;
}
}
}
Ok(())
}
}
impl Drop for KrataClient {
fn drop(&mut self) {
if Arc::strong_count(&self.task) <= 1 {
self.task.abort();
}
}
}

View File

@ -4,71 +4,73 @@ use std::{
}; };
use anyhow::Result; use anyhow::Result;
use futures::future::join_all; use krata::{
control::{ConsoleStreamUpdate, StreamUpdate},
stream::StreamContext,
};
use log::debug; use log::debug;
use std::process::exit; use std::process::exit;
use termion::raw::IntoRawMode; use termion::raw::IntoRawMode;
use tokio::{ use tokio::{
fs::File, fs::File,
io::{AsyncReadExt, AsyncWriteExt}, io::{AsyncReadExt, AsyncWriteExt},
select,
}; };
pub struct XenConsole { pub struct XenConsole {
xen_read_handle: File, stream: StreamContext,
xen_write_handle: File,
} }
impl XenConsole { impl XenConsole {
pub async fn new(tty: &str) -> Result<XenConsole> { pub async fn new(stream: StreamContext) -> Result<XenConsole> {
let xen_read_handle = File::options().read(true).write(false).open(tty).await?; Ok(XenConsole { stream })
let xen_write_handle = File::options().read(false).write(true).open(tty).await?;
Ok(XenConsole {
xen_read_handle,
xen_write_handle,
})
} }
pub async fn attach(self) -> Result<()> { pub async fn attach(self) -> Result<()> {
let stdin = stdin(); let stdin = unsafe { File::from_raw_fd(stdin().as_raw_fd()) };
let terminal = stdout().into_raw_mode()?; let terminal = stdout().into_raw_mode()?;
let stdout = unsafe { File::from_raw_fd(terminal.as_raw_fd()) }; let stdout = unsafe { File::from_raw_fd(terminal.as_raw_fd()) };
let reader_task = tokio::task::spawn(async move {
if let Err(error) = XenConsole::copy_stdout(stdout, self.xen_read_handle).await {
debug!("failed to copy console output: {}", error);
}
});
let writer_task = tokio::task::spawn(async move {
if let Err(error) = XenConsole::intercept_stdin(
unsafe { File::from_raw_fd(stdin.as_raw_fd()) },
self.xen_write_handle,
)
.await
{
debug!("failed to intercept stdin: {}", error);
}
});
join_all(vec![reader_task, writer_task]).await; if let Err(error) = XenConsole::process(stdin, stdout, self.stream).await {
debug!("failed to process console stream: {}", error);
}
Ok(()) Ok(())
} }
async fn copy_stdout(mut stdout: File, mut console: File) -> Result<()> { async fn process(mut stdin: File, mut stdout: File, mut stream: StreamContext) -> Result<()> {
let mut buffer = vec![0u8; 256];
loop {
let size = console.read(&mut buffer).await?;
stdout.write_all(&buffer[0..size]).await?;
stdout.flush().await?;
}
}
async fn intercept_stdin(mut stdin: File, mut console: File) -> Result<()> {
let mut buffer = vec![0u8; 60]; let mut buffer = vec![0u8; 60];
loop { loop {
let size = stdin.read(&mut buffer).await?; select! {
x = stream.receiver.recv() => match x {
Some(StreamUpdate::ConsoleStream(update)) => {
stdout.write_all(&update.data).await?;
stdout.flush().await?;
},
None => {
break;
}
},
x = stdin.read(&mut buffer) => match x {
Ok(size) => {
if size == 1 && buffer[0] == 0x1d { if size == 1 && buffer[0] == 0x1d {
exit(0); exit(0);
} }
console.write_all(&buffer[0..size]).await?;
let data = buffer[0..size].to_vec();
stream.send(StreamUpdate::ConsoleStream(ConsoleStreamUpdate {
data,
})).await?;
},
Err(error) => {
return Err(error.into());
} }
} }
};
}
Ok(())
}
} }

View File

@ -1,51 +0,0 @@
use std::{process::exit, time::Duration};
use anyhow::{anyhow, Result};
use log::warn;
use tokio::time::sleep;
use xenstore::client::XsdInterface;
use super::destroy::ControllerDestroy;
use crate::console::XenConsole;
use super::ControllerContext;
pub struct ControllerConsole<'a> {
context: &'a mut ControllerContext,
}
impl ControllerConsole<'_> {
pub fn new(context: &mut ControllerContext) -> ControllerConsole<'_> {
ControllerConsole { context }
}
pub async fn perform(&mut self, id: &str) -> Result<()> {
let info = self
.context
.resolve(id)
.await?
.ok_or_else(|| anyhow!("unable to resolve guest: {}", id))?;
let domid = info.domid;
let tty = self.context.xen.get_console_path(domid).await?;
let console = XenConsole::new(&tty).await?;
let dom_path = self.context.xen.store.get_domain_path(domid).await?;
tokio::task::spawn(async move {
if let Err(error) = console.attach().await {
warn!("failed to attach to console: {}", error);
}
});
let exit_code_path = format!("{}/krata/guest/exit-code", dom_path);
loop {
let Some(code) = self.context.xen.store.read_string(&exit_code_path).await? else {
sleep(Duration::from_secs(1)).await;
continue;
};
let mut destroy = ControllerDestroy::new(self.context);
destroy.perform(&domid.to_string()).await?;
exit(code.parse::<i32>()?);
}
}
}

View File

@ -1,64 +0,0 @@
use std::{fs, path::PathBuf};
use anyhow::{anyhow, Result};
use uuid::Uuid;
use xenstore::client::{XsdClient, XsdInterface};
use super::ControllerContext;
pub struct ControllerDestroy<'a> {
context: &'a mut ControllerContext,
}
impl ControllerDestroy<'_> {
pub fn new(context: &mut ControllerContext) -> ControllerDestroy<'_> {
ControllerDestroy { context }
}
pub async fn perform(&mut self, id: &str) -> Result<Uuid> {
let info = self
.context
.resolve(id)
.await?
.ok_or_else(|| anyhow!("unable to resolve guest: {}", id))?;
let domid = info.domid;
let mut store = XsdClient::open().await?;
let dom_path = store.get_domain_path(domid).await?;
let uuid = match store
.read_string(format!("{}/krata/uuid", dom_path).as_str())
.await?
{
None => {
return Err(anyhow!(
"domain {} was not found or not created by krata",
domid
))
}
Some(value) => value,
};
if uuid.is_empty() {
return Err(anyhow!("unable to find krata uuid based on the domain",));
}
let uuid = Uuid::parse_str(&uuid)?;
let loops = store
.read_string(format!("{}/krata/loops", dom_path).as_str())
.await?;
let loops = ControllerContext::parse_loop_set(&loops);
self.context.xen.destroy(domid).await?;
for info in &loops {
self.context.autoloop.unloop(&info.device)?;
match &info.delete {
None => {}
Some(delete) => {
let delete_path = PathBuf::from(delete);
if delete_path.is_file() || delete_path.is_symlink() {
fs::remove_file(&delete_path)?;
} else if delete_path.is_dir() {
fs::remove_dir_all(&delete_path)?;
}
}
}
}
Ok(uuid)
}
}

View File

@ -1,21 +0,0 @@
use super::{ControllerContext, GuestInfo};
use crate::launch::{GuestLaunchRequest, GuestLauncher};
use anyhow::Result;
pub struct ControllerLaunch<'a> {
context: &'a mut ControllerContext,
}
impl ControllerLaunch<'_> {
pub fn new(context: &mut ControllerContext) -> ControllerLaunch<'_> {
ControllerLaunch { context }
}
pub async fn perform<'c, 'r>(
&'c mut self,
request: GuestLaunchRequest<'r>,
) -> Result<GuestInfo> {
let mut launcher = GuestLauncher::new()?;
launcher.launch(self.context, request).await
}
}

View File

@ -1,141 +0,0 @@
pub mod cfgblk;
use crate::autoloop::AutoLoop;
use crate::image::cache::ImageCache;
use anyhow::{anyhow, Result};
use loopdev::LoopControl;
use std::fs;
use std::path::PathBuf;
use std::str::FromStr;
use uuid::Uuid;
use xenclient::XenClient;
use xenstore::client::XsdInterface;
pub mod console;
pub mod destroy;
pub mod launch;
pub struct ControllerContext {
pub image_cache: ImageCache,
pub autoloop: AutoLoop,
pub xen: XenClient,
}
pub struct ContainerLoopInfo {
pub device: String,
pub file: String,
pub delete: Option<String>,
}
pub struct GuestInfo {
pub uuid: Uuid,
pub domid: u32,
pub image: String,
pub loops: Vec<ContainerLoopInfo>,
pub ipv4: String,
pub ipv6: String,
}
impl ControllerContext {
pub async fn new(store_path: String) -> Result<ControllerContext> {
let mut image_cache_path = PathBuf::from(store_path);
image_cache_path.push("cache");
fs::create_dir_all(&image_cache_path)?;
let xen = XenClient::open().await?;
image_cache_path.push("image");
fs::create_dir_all(&image_cache_path)?;
let image_cache = ImageCache::new(&image_cache_path)?;
Ok(ControllerContext {
image_cache,
autoloop: AutoLoop::new(LoopControl::open()?),
xen,
})
}
pub async fn list(&mut self) -> Result<Vec<GuestInfo>> {
let mut containers: Vec<GuestInfo> = Vec::new();
for domid_candidate in self.xen.store.list("/local/domain").await? {
let dom_path = format!("/local/domain/{}", domid_candidate);
let uuid_string = match self
.xen
.store
.read_string(&format!("{}/krata/uuid", &dom_path))
.await?
{
None => continue,
Some(value) => value,
};
let domid =
u32::from_str(&domid_candidate).map_err(|_| anyhow!("failed to parse domid"))?;
let uuid = Uuid::from_str(&uuid_string)?;
let image = self
.xen
.store
.read_string(&format!("{}/krata/image", &dom_path))
.await?
.unwrap_or("unknown".to_string());
let loops = self
.xen
.store
.read_string(&format!("{}/krata/loops", &dom_path))
.await?;
let ipv4 = self
.xen
.store
.read_string(&format!("{}/krata/network/guest/ipv4", &dom_path))
.await?
.unwrap_or("unknown".to_string());
let ipv6: String = self
.xen
.store
.read_string(&format!("{}/krata/network/guest/ipv6", &dom_path))
.await?
.unwrap_or("unknown".to_string());
let loops = ControllerContext::parse_loop_set(&loops);
containers.push(GuestInfo {
uuid,
domid,
image,
loops,
ipv4,
ipv6,
});
}
Ok(containers)
}
pub async fn resolve(&mut self, id: &str) -> Result<Option<GuestInfo>> {
for container in self.list().await? {
let uuid_string = container.uuid.to_string();
let domid_string = container.domid.to_string();
if uuid_string == id || domid_string == id || id == format!("krata-{}", uuid_string) {
return Ok(Some(container));
}
}
Ok(None)
}
fn parse_loop_set(input: &Option<String>) -> Vec<ContainerLoopInfo> {
let Some(input) = input else {
return Vec::new();
};
let sets = input
.split(',')
.map(|x| x.to_string())
.map(|x| x.split(':').map(|v| v.to_string()).collect::<Vec<String>>())
.map(|x| (x[0].clone(), x[1].clone(), x[2].clone()))
.collect::<Vec<(String, String, String)>>();
sets.iter()
.map(|(device, file, delete)| ContainerLoopInfo {
device: device.clone(),
file: file.clone(),
delete: if delete == "none" {
None
} else {
Some(delete.clone())
},
})
.collect::<Vec<ContainerLoopInfo>>()
}
}

View File

@ -1,5 +1,2 @@
pub mod autoloop; pub mod client;
pub mod console; pub mod console;
pub mod ctl;
pub mod image;
pub mod launch;

68
daemon/Cargo.toml Normal file
View File

@ -0,0 +1,68 @@
[package]
name = "kratad"
version.workspace = true
edition = "2021"
resolver = "2"
[dependencies]
anyhow = { workspace = true }
log = { workspace = true }
env_logger = { workspace = true }
zstd = { workspace = true }
flate2 = { workspace = true }
tar = { workspace = true }
directories = { workspace = true }
walkdir = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
sha256 = { workspace = true }
url = { workspace = true }
ureq = { workspace = true }
reqwest = { workspace = true }
path-clean = { workspace = true }
termion = { workspace = true }
cli-tables = { workspace = true }
clap = { workspace = true }
oci-spec = { workspace = true }
backhand = { workspace = true }
uuid = { workspace = true }
ipnetwork = { workspace = true }
tokio = { workspace = true }
futures = { workspace = true }
bytes = { workspace = true }
tokio-stream = { workspace = true }
async-trait = { workspace = true }
[dependencies.tokio-listener]
workspace = true
features = ["clap"]
[dependencies.krata]
path = "../shared"
[dependencies.nix]
workspace = true
features = ["process"]
[dependencies.advmac]
path = "../libs/advmac"
[dependencies.loopdev]
path = "../libs/loopdev"
[dependencies.xenclient]
path = "../libs/xen/xenclient"
[dependencies.xenstore]
path = "../libs/xen/xenstore"
[lib]
path = "src/lib.rs"
[[bin]]
name = "kratad"
path = "bin/daemon.rs"
[[example]]
name = "kratad-dial"
path = "examples/dial.rs"

27
daemon/bin/daemon.rs Normal file
View File

@ -0,0 +1,27 @@
use anyhow::{anyhow, Result};
use clap::Parser;
use env_logger::Env;
use kratad::{runtime::Runtime, Daemon};
use tokio_listener::ListenerAddressLFlag;
#[derive(Parser)]
struct Args {
#[clap(flatten)]
listener: ListenerAddressLFlag,
#[arg(short, long, default_value = "/var/lib/krata")]
store: String,
}
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() -> Result<()> {
env_logger::Builder::from_env(Env::default().default_filter_or("warn")).init();
let args = Args::parse();
let Some(listener) = args.listener.bind().await else {
return Err(anyhow!("no listener specified"));
};
let runtime = Runtime::new(args.store.clone()).await?;
let mut daemon = Daemon::new(runtime).await?;
daemon.listen(listener?).await?;
Ok(())
}

28
daemon/examples/dial.rs Normal file
View File

@ -0,0 +1,28 @@
use anyhow::Result;
use krata::control::{ListRequest, Message, Request, RequestBox};
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
net::TcpStream,
};
use tokio_stream::{wrappers::LinesStream, StreamExt};
#[tokio::main]
async fn main() -> Result<()> {
let mut stream = TcpStream::connect("127.0.0.1:4050").await?;
let (read, mut write) = stream.split();
let mut read = LinesStream::new(BufReader::new(read).lines());
let send = Message::Request(RequestBox {
id: 1,
request: Request::List(ListRequest {}),
});
let mut line = serde_json::to_string(&send)?;
line.push('\n');
write.write_all(line.as_bytes()).await?;
println!("sent: {:?}", send);
while let Some(line) = read.try_next().await? {
let message: Message = serde_json::from_str(&line)?;
println!("received: {:?}", message);
}
Ok(())
}

View File

@ -0,0 +1,91 @@
use anyhow::{anyhow, Result};
use krata::control::{ConsoleStreamResponse, ConsoleStreamUpdate, Request, Response, StreamUpdate};
use log::warn;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
select,
};
use crate::{
listen::DaemonRequestHandler,
runtime::{console::XenConsole, Runtime},
};
use krata::stream::{ConnectionStreams, StreamContext};
pub struct ConsoleStreamRequestHandler {}
impl Default for ConsoleStreamRequestHandler {
fn default() -> Self {
Self::new()
}
}
impl ConsoleStreamRequestHandler {
pub fn new() -> Self {
Self {}
}
async fn link_console_stream(mut stream: StreamContext, mut console: XenConsole) -> Result<()> {
loop {
let mut buffer = vec![0u8; 256];
select! {
x = console.read_handle.read(&mut buffer) => match x {
Ok(size) => {
let data = buffer[0..size].to_vec();
let update = StreamUpdate::ConsoleStream(ConsoleStreamUpdate {
data,
});
stream.send(update).await?;
},
Err(error) => {
return Err(error.into());
}
},
x = stream.receiver.recv() => match x {
Some(StreamUpdate::ConsoleStream(update)) => {
console.write_handle.write_all(&update.data).await?;
}
None => {
break;
}
}
};
}
Ok(())
}
}
#[async_trait::async_trait]
impl DaemonRequestHandler for ConsoleStreamRequestHandler {
fn accepts(&self, request: &Request) -> bool {
matches!(request, Request::ConsoleStream(_))
}
async fn handle(
&self,
streams: ConnectionStreams,
runtime: Runtime,
request: Request,
) -> Result<Response> {
let console_stream = match request {
Request::ConsoleStream(stream) => stream,
_ => return Err(anyhow!("unknown request")),
};
let console = runtime.console(&console_stream.guest).await?;
let stream = streams.open().await?;
let id = stream.id;
tokio::task::spawn(async move {
if let Err(error) =
ConsoleStreamRequestHandler::link_console_stream(stream, console).await
{
warn!("failed to process console stream: {}", error);
}
});
Ok(Response::ConsoleStream(ConsoleStreamResponse {
stream: id,
}))
}
}

View File

@ -0,0 +1,44 @@
use anyhow::{anyhow, Result};
use krata::{
control::{DestroyResponse, Request, Response},
stream::ConnectionStreams,
};
use crate::{listen::DaemonRequestHandler, runtime::Runtime};
pub struct DestroyRequestHandler {}
impl Default for DestroyRequestHandler {
fn default() -> Self {
Self::new()
}
}
impl DestroyRequestHandler {
pub fn new() -> Self {
Self {}
}
}
#[async_trait::async_trait]
impl DaemonRequestHandler for DestroyRequestHandler {
fn accepts(&self, request: &Request) -> bool {
matches!(request, Request::Destroy(_))
}
async fn handle(
&self,
_: ConnectionStreams,
runtime: Runtime,
request: Request,
) -> Result<Response> {
let destroy = match request {
Request::Destroy(destroy) => destroy,
_ => return Err(anyhow!("unknown request")),
};
let guest = runtime.destroy(&destroy.guest).await?;
Ok(Response::Destroy(DestroyResponse {
guest: guest.to_string(),
}))
}
}

View File

@ -0,0 +1,55 @@
use anyhow::{anyhow, Result};
use krata::{
control::{GuestInfo, LaunchResponse, Request, Response},
stream::ConnectionStreams,
};
use crate::{
listen::DaemonRequestHandler,
runtime::{launch::GuestLaunchRequest, Runtime},
};
pub struct LaunchRequestHandler {}
impl Default for LaunchRequestHandler {
fn default() -> Self {
Self::new()
}
}
impl LaunchRequestHandler {
pub fn new() -> Self {
Self {}
}
}
#[async_trait::async_trait]
impl DaemonRequestHandler for LaunchRequestHandler {
fn accepts(&self, request: &Request) -> bool {
matches!(request, Request::Launch(_))
}
async fn handle(
&self,
_: ConnectionStreams,
runtime: Runtime,
request: Request,
) -> Result<Response> {
let launch = match request {
Request::Launch(launch) => launch,
_ => return Err(anyhow!("unknown request")),
};
let guest: GuestInfo = runtime
.launch(GuestLaunchRequest {
image: &launch.image,
vcpus: launch.vcpus,
mem: launch.mem,
env: launch.env,
run: launch.run,
debug: false,
})
.await?
.into();
Ok(Response::Launch(LaunchResponse { guest }))
}
}

View File

@ -0,0 +1,37 @@
use anyhow::Result;
use krata::{
control::{GuestInfo, ListResponse, Request, Response},
stream::ConnectionStreams,
};
use crate::{listen::DaemonRequestHandler, runtime::Runtime};
pub struct ListRequestHandler {}
impl Default for ListRequestHandler {
fn default() -> Self {
Self::new()
}
}
impl ListRequestHandler {
pub fn new() -> Self {
Self {}
}
}
#[async_trait::async_trait]
impl DaemonRequestHandler for ListRequestHandler {
fn accepts(&self, request: &Request) -> bool {
matches!(request, Request::List(_))
}
async fn handle(&self, _: ConnectionStreams, runtime: Runtime, _: Request) -> Result<Response> {
let guests = runtime.list().await?;
let guests = guests
.into_iter()
.map(GuestInfo::from)
.collect::<Vec<GuestInfo>>();
Ok(Response::List(ListResponse { guests }))
}
}

View File

@ -0,0 +1,15 @@
pub mod console;
pub mod destroy;
pub mod launch;
pub mod list;
impl From<crate::runtime::GuestInfo> for krata::control::GuestInfo {
fn from(value: crate::runtime::GuestInfo) -> Self {
krata::control::GuestInfo {
id: value.uuid.to_string(),
image: value.image.clone(),
ipv4: value.ipv4.map(|x| x.ip().to_string()),
ipv6: value.ipv6.map(|x| x.ip().to_string()),
}
}
}

37
daemon/src/lib.rs Normal file
View File

@ -0,0 +1,37 @@
use anyhow::Result;
use handlers::{
console::ConsoleStreamRequestHandler, destroy::DestroyRequestHandler,
launch::LaunchRequestHandler, list::ListRequestHandler,
};
use listen::{DaemonListener, DaemonRequestHandlers};
use runtime::Runtime;
use tokio_listener::Listener;
pub mod handlers;
pub mod listen;
pub mod runtime;
pub struct Daemon {
runtime: Runtime,
}
impl Daemon {
pub async fn new(runtime: Runtime) -> Result<Self> {
Ok(Self { runtime })
}
pub async fn listen(&mut self, listener: Listener) -> Result<()> {
let handlers = DaemonRequestHandlers::new(
self.runtime.clone(),
vec![
Box::new(LaunchRequestHandler::new()),
Box::new(DestroyRequestHandler::new()),
Box::new(ConsoleStreamRequestHandler::new()),
Box::new(ListRequestHandler::new()),
],
);
let mut listener = DaemonListener::new(listener, handlers);
listener.handle().await?;
Ok(())
}
}

228
daemon/src/listen.rs Normal file
View File

@ -0,0 +1,228 @@
use std::collections::HashMap;
use std::sync::Arc;
use anyhow::{anyhow, Result};
use krata::control::{ErrorResponse, Message, Request, RequestBox, Response, ResponseBox};
use log::trace;
use log::warn;
use tokio::sync::Mutex;
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
select,
sync::mpsc::{channel, Receiver, Sender},
};
use tokio_listener::{Connection, Listener, SomeSocketAddrClonable};
use tokio_stream::{wrappers::LinesStream, StreamExt};
use crate::runtime::Runtime;
use krata::stream::ConnectionStreams;
const QUEUE_MAX_LEN: usize = 100;
#[async_trait::async_trait]
pub trait DaemonRequestHandler: Send + Sync {
fn accepts(&self, request: &Request) -> bool;
async fn handle(
&self,
streams: ConnectionStreams,
runtime: Runtime,
request: Request,
) -> Result<Response>;
}
#[derive(Clone)]
pub struct DaemonRequestHandlers {
runtime: Runtime,
handlers: Arc<Vec<Box<dyn DaemonRequestHandler>>>,
}
impl DaemonRequestHandlers {
pub fn new(runtime: Runtime, handlers: Vec<Box<dyn DaemonRequestHandler>>) -> Self {
DaemonRequestHandlers {
runtime,
handlers: Arc::new(handlers),
}
}
async fn dispatch(&self, streams: ConnectionStreams, request: Request) -> Result<Response> {
for handler in self.handlers.iter() {
if handler.accepts(&request) {
return handler.handle(streams, self.runtime.clone(), request).await;
}
}
Err(anyhow!("daemon cannot handle that request"))
}
}
pub struct DaemonListener {
listener: Listener,
handlers: DaemonRequestHandlers,
connections: Arc<Mutex<HashMap<u64, DaemonConnection>>>,
next: Arc<Mutex<u64>>,
}
impl DaemonListener {
pub fn new(listener: Listener, handlers: DaemonRequestHandlers) -> DaemonListener {
DaemonListener {
listener,
handlers,
connections: Arc::new(Mutex::new(HashMap::new())),
next: Arc::new(Mutex::new(0)),
}
}
pub async fn handle(&mut self) -> Result<()> {
loop {
let (connection, addr) = self.listener.accept().await?;
let connection =
DaemonConnection::new(connection, addr.clonable(), self.handlers.clone()).await?;
let id = {
let mut next = self.next.lock().await;
let id = *next;
*next = id + 1;
id
};
trace!("new connection from {}", connection.addr);
let tx_channel = connection.tx_sender.clone();
let addr = connection.addr.clone();
self.connections.lock().await.insert(id, connection);
let connections_for_close = self.connections.clone();
tokio::task::spawn(async move {
tx_channel.closed().await;
trace!("connection from {} closed", addr);
connections_for_close.lock().await.remove(&id);
});
}
}
}
#[derive(Clone)]
pub struct DaemonConnection {
tx_sender: Sender<Message>,
addr: SomeSocketAddrClonable,
handlers: DaemonRequestHandlers,
streams: ConnectionStreams,
}
impl DaemonConnection {
pub async fn new(
connection: Connection,
addr: SomeSocketAddrClonable,
handlers: DaemonRequestHandlers,
) -> Result<Self> {
let (tx_sender, tx_receiver) = channel::<Message>(QUEUE_MAX_LEN);
let streams_tx_sender = tx_sender.clone();
let instance = DaemonConnection {
tx_sender,
addr,
handlers,
streams: ConnectionStreams::new(streams_tx_sender),
};
{
let mut instance = instance.clone();
tokio::task::spawn(async move {
if let Err(error) = instance.process(tx_receiver, connection).await {
warn!(
"failed to process daemon connection for {}: {}",
instance.addr, error
);
}
});
}
Ok(instance)
}
async fn process(
&mut self,
mut tx_receiver: Receiver<Message>,
connection: Connection,
) -> Result<()> {
let (read, mut write) = tokio::io::split(connection);
let mut read = LinesStream::new(BufReader::new(read).lines());
loop {
select! {
x = read.next() => match x {
Some(Ok(line)) => {
let message: Message = serde_json::from_str(&line)?;
trace!("received message '{}' from {}", serde_json::to_string(&message)?, self.addr);
let mut context = self.clone();
tokio::task::spawn(async move {
if let Err(error) = context.handle_message(&message).await {
let line = serde_json::to_string(&message).unwrap_or("<invalid>".to_string());
warn!("failed to handle message '{}' from {}: {}", line, context.addr, error);
}
});
},
Some(Err(error)) => {
return Err(error.into());
},
None => {
break;
}
},
x = tx_receiver.recv() => match x {
Some(message) => {
if let Message::StreamUpdated(ref update) = message {
self.streams.outgoing(update).await?;
}
let mut line = serde_json::to_string(&message)?;
trace!("sending message '{}' to {}", line, self.addr);
line.push('\n');
write.write_all(line.as_bytes()).await?;
},
None => {
break;
}
}
};
}
Ok(())
}
async fn handle_message(&mut self, message: &Message) -> Result<()> {
match message {
Message::Request(req) => {
self.handle_request(req.clone()).await?;
}
Message::Response(_) => {
return Err(anyhow!(
"received a response message from client {}, but this is the daemon",
self.addr
));
}
Message::StreamUpdated(updated) => {
self.streams.incoming(updated.clone()).await?;
}
}
Ok(())
}
async fn handle_request(&mut self, req: RequestBox) -> Result<()> {
let id = req.id;
let response = self
.handlers
.dispatch(self.streams.clone(), req.request)
.await
.map_err(|error| {
Response::Error(ErrorResponse {
message: error.to_string(),
})
});
let response = if let Err(response) = response {
response
} else {
response.unwrap()
};
let resp = ResponseBox { id, response };
self.tx_sender.send(Message::Response(resp)).await?;
Ok(())
}
}

View File

@ -1,7 +1,7 @@
use crate::image::ImageInfo; use crate::runtime::image::ImageInfo;
use anyhow::Result; use anyhow::Result;
use backhand::{FilesystemWriter, NodeHeader}; use backhand::{FilesystemWriter, NodeHeader};
use krata::LaunchInfo; use krata::launchcfg::LaunchInfo;
use log::trace; use log::trace;
use std::fs; use std::fs;
use std::fs::File; use std::fs::File;

View File

@ -0,0 +1,18 @@
use anyhow::Result;
use tokio::fs::File;
pub struct XenConsole {
pub read_handle: File,
pub write_handle: File,
}
impl XenConsole {
pub async fn new(tty: &str) -> Result<XenConsole> {
let read_handle = File::options().read(true).write(false).open(tty).await?;
let write_handle = File::options().read(false).write(true).open(tty).await?;
Ok(XenConsole {
read_handle,
write_handle,
})
}
}

View File

@ -1,4 +1,5 @@
use crate::image::{ImageInfo, Result}; use super::ImageInfo;
use anyhow::Result;
use log::debug; use log::debug;
use oci_spec::image::{ImageConfiguration, ImageManifest}; use oci_spec::image::{ImageConfiguration, ImageManifest};
use std::fs; use std::fs;

View File

@ -2,9 +2,9 @@ pub mod cache;
pub mod fetch; pub mod fetch;
pub mod name; pub mod name;
use crate::image::cache::ImageCache; use crate::runtime::image::cache::ImageCache;
use crate::image::fetch::RegistryClient; use crate::runtime::image::fetch::RegistryClient;
use crate::image::name::ImageName; use crate::runtime::image::name::ImageName;
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use backhand::compression::Compressor; use backhand::compression::Compressor;
use backhand::{FilesystemCompressor, FilesystemWriter, NodeHeader}; use backhand::{FilesystemCompressor, FilesystemWriter, NodeHeader};

View File

@ -1,25 +1,23 @@
use std::net::IpAddr;
use std::{fs, net::Ipv4Addr, str::FromStr}; use std::{fs, net::Ipv4Addr, str::FromStr};
use advmac::MacAddr6; use advmac::MacAddr6;
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use ipnetwork::Ipv4Network; use ipnetwork::{IpNetwork, Ipv4Network};
use krata::{ use krata::launchcfg::{
LaunchInfo, LaunchNetwork, LaunchNetworkIpv4, LaunchNetworkIpv6, LaunchNetworkResolver, LaunchInfo, LaunchNetwork, LaunchNetworkIpv4, LaunchNetworkIpv6, LaunchNetworkResolver,
}; };
use uuid::Uuid; use uuid::Uuid;
use xenclient::{DomainConfig, DomainDisk, DomainNetworkInterface}; use xenclient::{DomainConfig, DomainDisk, DomainNetworkInterface};
use xenstore::client::XsdInterface; use xenstore::client::XsdInterface;
use crate::{ use crate::runtime::cfgblk::ConfigBlock;
ctl::GuestInfo, use crate::runtime::image::{cache::ImageCache, name::ImageName, ImageCompiler, ImageInfo};
image::{cache::ImageCache, name::ImageName, ImageCompiler, ImageInfo}, use crate::runtime::RuntimeContext;
};
use crate::ctl::{cfgblk::ConfigBlock, ControllerContext}; use super::GuestInfo;
pub struct GuestLaunchRequest<'a> { pub struct GuestLaunchRequest<'a> {
pub kernel_path: &'a str,
pub initrd_path: &'a str,
pub image: &'a str, pub image: &'a str,
pub vcpus: u32, pub vcpus: u32,
pub mem: u64, pub mem: u64,
@ -35,9 +33,9 @@ impl GuestLauncher {
Ok(Self {}) Ok(Self {})
} }
pub async fn launch<'c, 'r>( pub async fn launch<'r>(
&mut self, &mut self,
context: &'c mut ControllerContext, context: &mut RuntimeContext,
request: GuestLaunchRequest<'r>, request: GuestLaunchRequest<'r>,
) -> Result<GuestInfo> { ) -> Result<GuestInfo> {
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
@ -115,8 +113,8 @@ impl GuestLauncher {
name: &name, name: &name,
max_vcpus: request.vcpus, max_vcpus: request.vcpus,
mem_mb: request.mem, mem_mb: request.mem,
kernel_path: request.kernel_path, kernel_path: &context.kernel,
initrd_path: request.initrd_path, initrd_path: &context.initrd,
cmdline: &cmdline, cmdline: &cmdline,
disks: vec![ disks: vec![
DomainDisk { DomainDisk {
@ -186,8 +184,14 @@ impl GuestLauncher {
domid, domid,
image: request.image.to_string(), image: request.image.to_string(),
loops: vec![], loops: vec![],
ipv4: format!("{}/{}", guest_ipv4, ipv4_network_mask), ipv4: Some(IpNetwork::new(
ipv6: format!("{}/{}", guest_ipv6, ipv6_network_mask), IpAddr::V4(guest_ipv4),
ipv4_network_mask as u8,
)?),
ipv6: Some(IpNetwork::new(
IpAddr::V6(guest_ipv6),
ipv6_network_mask as u8,
)?),
}), }),
Err(error) => { Err(error) => {
let _ = context.autoloop.unloop(&image_squashfs_loop.path); let _ = context.autoloop.unloop(&image_squashfs_loop.path);
@ -204,7 +208,7 @@ impl GuestLauncher {
compiler.compile(&image).await compiler.compile(&image).await
} }
async fn allocate_ipv4(&mut self, context: &mut ControllerContext) -> Result<Ipv4Addr> { async fn allocate_ipv4(&mut self, context: &mut RuntimeContext) -> Result<Ipv4Addr> {
let network = Ipv4Network::new(Ipv4Addr::new(10, 75, 80, 0), 24)?; let network = Ipv4Network::new(Ipv4Addr::new(10, 75, 80, 0), 24)?;
let mut used: Vec<Ipv4Addr> = vec![]; let mut used: Vec<Ipv4Addr> = vec![];
for domid_candidate in context.xen.store.list("/local/domain").await? { for domid_candidate in context.xen.store.list("/local/domain").await? {

247
daemon/src/runtime/mod.rs Normal file
View File

@ -0,0 +1,247 @@
use std::{fs, path::PathBuf, str::FromStr, sync::Arc};
use anyhow::{anyhow, Result};
use ipnetwork::IpNetwork;
use loopdev::LoopControl;
use tokio::sync::Mutex;
use uuid::Uuid;
use xenclient::XenClient;
use xenstore::client::{XsdClient, XsdInterface};
use self::{
autoloop::AutoLoop,
console::XenConsole,
image::cache::ImageCache,
launch::{GuestLaunchRequest, GuestLauncher},
};
pub mod autoloop;
pub mod cfgblk;
pub mod console;
pub mod image;
pub mod launch;
pub struct ContainerLoopInfo {
pub device: String,
pub file: String,
pub delete: Option<String>,
}
pub struct GuestInfo {
pub uuid: Uuid,
pub domid: u32,
pub image: String,
pub loops: Vec<ContainerLoopInfo>,
pub ipv4: Option<IpNetwork>,
pub ipv6: Option<IpNetwork>,
}
pub struct RuntimeContext {
pub image_cache: ImageCache,
pub autoloop: AutoLoop,
pub xen: XenClient,
pub kernel: String,
pub initrd: String,
}
impl RuntimeContext {
pub async fn new(store: String) -> Result<Self> {
let mut image_cache_path = PathBuf::from(&store);
image_cache_path.push("cache");
fs::create_dir_all(&image_cache_path)?;
let xen = XenClient::open().await?;
image_cache_path.push("image");
fs::create_dir_all(&image_cache_path)?;
let image_cache = ImageCache::new(&image_cache_path)?;
let kernel = format!("{}/default/kernel", store);
let initrd = format!("{}/default/initrd", store);
Ok(RuntimeContext {
image_cache,
autoloop: AutoLoop::new(LoopControl::open()?),
xen,
kernel,
initrd,
})
}
pub async fn list(&mut self) -> Result<Vec<GuestInfo>> {
let mut guests: Vec<GuestInfo> = Vec::new();
for domid_candidate in self.xen.store.list("/local/domain").await? {
let dom_path = format!("/local/domain/{}", domid_candidate);
let uuid_string = match self
.xen
.store
.read_string(&format!("{}/krata/uuid", &dom_path))
.await?
{
None => continue,
Some(value) => value,
};
let domid =
u32::from_str(&domid_candidate).map_err(|_| anyhow!("failed to parse domid"))?;
let uuid = Uuid::from_str(&uuid_string)?;
let image = self
.xen
.store
.read_string(&format!("{}/krata/image", &dom_path))
.await?
.unwrap_or("unknown".to_string());
let loops = self
.xen
.store
.read_string(&format!("{}/krata/loops", &dom_path))
.await?;
let ipv4 = self
.xen
.store
.read_string(&format!("{}/krata/network/guest/ipv4", &dom_path))
.await?;
let ipv6 = self
.xen
.store
.read_string(&format!("{}/krata/network/guest/ipv6", &dom_path))
.await?;
let ipv4 = if let Some(ipv4) = ipv4 {
IpNetwork::from_str(&ipv4).ok()
} else {
None
};
let ipv6 = if let Some(ipv6) = ipv6 {
IpNetwork::from_str(&ipv6).ok()
} else {
None
};
let loops = RuntimeContext::parse_loop_set(&loops);
guests.push(GuestInfo {
uuid,
domid,
image,
loops,
ipv4,
ipv6,
});
}
Ok(guests)
}
pub async fn resolve(&mut self, id: &str) -> Result<Option<GuestInfo>> {
for guest in self.list().await? {
let uuid_string = guest.uuid.to_string();
let domid_string = guest.domid.to_string();
if uuid_string == id || domid_string == id || id == format!("krata-{}", uuid_string) {
return Ok(Some(guest));
}
}
Ok(None)
}
fn parse_loop_set(input: &Option<String>) -> Vec<ContainerLoopInfo> {
let Some(input) = input else {
return Vec::new();
};
let sets = input
.split(',')
.map(|x| x.to_string())
.map(|x| x.split(':').map(|v| v.to_string()).collect::<Vec<String>>())
.map(|x| (x[0].clone(), x[1].clone(), x[2].clone()))
.collect::<Vec<(String, String, String)>>();
sets.iter()
.map(|(device, file, delete)| ContainerLoopInfo {
device: device.clone(),
file: file.clone(),
delete: if delete == "none" {
None
} else {
Some(delete.clone())
},
})
.collect::<Vec<ContainerLoopInfo>>()
}
}
#[derive(Clone)]
pub struct Runtime {
context: Arc<Mutex<RuntimeContext>>,
}
impl Runtime {
pub async fn new(store: String) -> Result<Self> {
let context = RuntimeContext::new(store).await?;
Ok(Self {
context: Arc::new(Mutex::new(context)),
})
}
pub async fn launch<'a>(&self, request: GuestLaunchRequest<'a>) -> Result<GuestInfo> {
let mut context = self.context.lock().await;
let mut launcher = GuestLauncher::new()?;
launcher.launch(&mut context, request).await
}
pub async fn destroy(&self, id: &str) -> Result<Uuid> {
let mut context = self.context.lock().await;
let info = context
.resolve(id)
.await?
.ok_or_else(|| anyhow!("unable to resolve guest: {}", id))?;
let domid = info.domid;
let mut store = XsdClient::open().await?;
let dom_path = store.get_domain_path(domid).await?;
let uuid = match store
.read_string(format!("{}/krata/uuid", dom_path).as_str())
.await?
{
None => {
return Err(anyhow!(
"domain {} was not found or not created by krata",
domid
))
}
Some(value) => value,
};
if uuid.is_empty() {
return Err(anyhow!("unable to find krata uuid based on the domain",));
}
let uuid = Uuid::parse_str(&uuid)?;
let loops = store
.read_string(format!("{}/krata/loops", dom_path).as_str())
.await?;
let loops = RuntimeContext::parse_loop_set(&loops);
context.xen.destroy(domid).await?;
for info in &loops {
context.autoloop.unloop(&info.device)?;
match &info.delete {
None => {}
Some(delete) => {
let delete_path = PathBuf::from(delete);
if delete_path.is_file() || delete_path.is_symlink() {
fs::remove_file(&delete_path)?;
} else if delete_path.is_dir() {
fs::remove_dir_all(&delete_path)?;
}
}
}
}
Ok(uuid)
}
pub async fn console(&self, id: &str) -> Result<XenConsole> {
let mut context = self.context.lock().await;
let info = context
.resolve(id)
.await?
.ok_or_else(|| anyhow!("unable to resolve guest: {}", id))?;
let domid = info.domid;
let tty = context.xen.get_console_path(domid).await?;
XenConsole::new(&tty).await
}
pub async fn list(&self) -> Result<Vec<GuestInfo>> {
let mut context = self.context.lock().await;
context.list().await
}
}

View File

@ -1,5 +1,5 @@
[package] [package]
name = "kratactr" name = "krataguest"
version.workspace = true version.workspace = true
edition = "2021" edition = "2021"
resolver = "2" resolver = "2"
@ -18,6 +18,7 @@ tokio = { workspace = true }
futures = { workspace = true } futures = { workspace = true }
ipnetwork = { workspace = true } ipnetwork = { workspace = true }
path-absolutize = { workspace = true } path-absolutize = { workspace = true }
tokio-stream = { workspace = true }
[dependencies.nix] [dependencies.nix]
workspace = true workspace = true
@ -33,5 +34,5 @@ path = "../libs/xen/xenstore"
path = "src/lib.rs" path = "src/lib.rs"
[[bin]] [[bin]]
name = "kratactr" name = "krataguest"
path = "bin/init.rs" path = "bin/init.rs"

View File

@ -1,6 +1,6 @@
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use env_logger::Env; use env_logger::Env;
use kratactr::init::ContainerInit; use krataguest::init::GuestInit;
use std::env; use std::env;
#[tokio::main] #[tokio::main]
@ -18,7 +18,7 @@ async fn main() -> Result<()> {
)); ));
} }
} }
let mut container = ContainerInit::new(); let mut guest = GuestInit::new();
container.init().await?; guest.init().await?;
Ok(()) Ok(())
} }

View File

@ -2,7 +2,7 @@ use anyhow::{anyhow, Result};
use futures::stream::TryStreamExt; use futures::stream::TryStreamExt;
use ipnetwork::IpNetwork; use ipnetwork::IpNetwork;
use krata::ethtool::EthtoolHandle; use krata::ethtool::EthtoolHandle;
use krata::{LaunchInfo, LaunchNetwork}; use krata::launchcfg::{LaunchInfo, LaunchNetwork};
use log::{trace, warn}; use log::{trace, warn};
use nix::libc::{dup2, ioctl}; use nix::libc::{dup2, ioctl};
use nix::unistd::{execve, fork, ForkResult, Pid}; use nix::unistd::{execve, fork, ForkResult, Pid};
@ -47,17 +47,17 @@ const NEW_ROOT_DEV_PATH: &str = "/newroot/dev";
const IMAGE_CONFIG_JSON_PATH: &str = "/config/image/config.json"; const IMAGE_CONFIG_JSON_PATH: &str = "/config/image/config.json";
const LAUNCH_CONFIG_JSON_PATH: &str = "/config/launch.json"; const LAUNCH_CONFIG_JSON_PATH: &str = "/config/launch.json";
pub struct ContainerInit {} pub struct GuestInit {}
impl Default for ContainerInit { impl Default for GuestInit {
fn default() -> Self { fn default() -> Self {
Self::new() Self::new()
} }
} }
impl ContainerInit { impl GuestInit {
pub fn new() -> ContainerInit { pub fn new() -> GuestInit {
ContainerInit {} GuestInit {}
} }
pub async fn init(&mut self) -> Result<()> { pub async fn init(&mut self) -> Result<()> {
@ -407,8 +407,8 @@ impl ContainerInit {
env.extend_from_slice(extra_env.as_slice()); env.extend_from_slice(extra_env.as_slice());
} }
let env = ContainerInit::env_map(env); let env = GuestInit::env_map(env);
let path = ContainerInit::resolve_executable(&env, path.into())?; let path = GuestInit::resolve_executable(&env, path.into())?;
let Some(file_name) = path.file_name() else { let Some(file_name) = path.file_name() else {
return Err(anyhow!("cannot get file name of command path")); return Err(anyhow!("cannot get file name of command path"));
}; };
@ -416,13 +416,13 @@ impl ContainerInit {
return Err(anyhow!("cannot get file name of command path as str")); return Err(anyhow!("cannot get file name of command path as str"));
}; };
cmd.insert(0, file_name.to_string()); cmd.insert(0, file_name.to_string());
let env = ContainerInit::env_list(env); let env = GuestInit::env_list(env);
trace!("running container command: {}", cmd.join(" ")); trace!("running container command: {}", cmd.join(" "));
let path = CString::new(path.as_os_str().as_bytes())?; let path = CString::new(path.as_os_str().as_bytes())?;
let cmd = ContainerInit::strings_as_cstrings(cmd)?; let cmd = GuestInit::strings_as_cstrings(cmd)?;
let env = ContainerInit::strings_as_cstrings(env)?; let env = GuestInit::strings_as_cstrings(env)?;
let mut working_dir = config let mut working_dir = config
.working_dir() .working_dir()
.as_ref() .as_ref()
@ -501,7 +501,7 @@ impl ContainerInit {
cmd: Vec<CString>, cmd: Vec<CString>,
env: Vec<CString>, env: Vec<CString>,
) -> Result<()> { ) -> Result<()> {
ContainerInit::set_controlling_terminal()?; GuestInit::set_controlling_terminal()?;
execve(&path, &cmd, &env)?; execve(&path, &cmd, &env)?;
Ok(()) Ok(())
} }

View File

@ -6,9 +6,9 @@ TARGET="x86_64-unknown-linux-gnu"
export RUSTFLAGS="-Ctarget-feature=+crt-static" export RUSTFLAGS="-Ctarget-feature=+crt-static"
cd "$(dirname "${0}")/.." cd "$(dirname "${0}")/.."
krata_DIR="${PWD}" krata_DIR="${PWD}"
cargo build -q --bin kratactr --release --target "${TARGET}" cargo build -q --bin krataguest --release --target "${TARGET}"
INITRD_DIR="$(mktemp -d /tmp/krata-initrd.XXXXXXXXXXXXX)" INITRD_DIR="$(mktemp -d /tmp/krata-initrd.XXXXXXXXXXXXX)"
cp "target/${TARGET}/release/kratactr" "${INITRD_DIR}/init" cp "target/${TARGET}/release/krataguest" "${INITRD_DIR}/init"
chmod +x "${INITRD_DIR}/init" chmod +x "${INITRD_DIR}/init"
cd "${INITRD_DIR}" cd "${INITRD_DIR}"
mkdir -p "${krata_DIR}/target/initrd" mkdir -p "${krata_DIR}/target/initrd"

View File

@ -0,0 +1,12 @@
[Unit]
Description=Krata Controller Daemon
[Service]
Restart=on-failure
Type=simple
WorkingDirectory=/var/lib/krata
ExecStart=/usr/bin/kratad
User=root
[Install]
WantedBy=multi-user.target

View File

@ -0,0 +1,12 @@
[Unit]
Description=Krata Networking Daemon
[Service]
Restart=on-failure
Type=simple
WorkingDirectory=/var/lib/krata
ExecStart=/usr/bin/kratanet
User=root
[Install]
WantedBy=multi-user.target

8
scripts/kratad-debug.sh Executable file
View File

@ -0,0 +1,8 @@
#!/bin/sh
set -e
REAL_SCRIPT="$(realpath "${0}")"
# shellcheck source-path=krata-debug-common.sh
. "$(dirname "${REAL_SCRIPT}")/krata-debug-common.sh"
KRATA_BUILD_INITRD=1 build_and_run kratad "${@}"

View File

@ -8,6 +8,8 @@ resolver = "2"
anyhow = { workspace = true } anyhow = { workspace = true }
serde = { workspace = true } serde = { workspace = true }
libc = { workspace = true } libc = { workspace = true }
log = { workspace = true }
tokio = { workspace = true }
[dependencies.nix] [dependencies.nix]
workspace = true workspace = true

115
shared/src/control.rs Normal file
View File

@ -0,0 +1,115 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GuestInfo {
pub id: String,
pub image: String,
pub ipv4: Option<String>,
pub ipv6: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LaunchRequest {
pub image: String,
pub vcpus: u32,
pub mem: u64,
pub env: Option<Vec<String>>,
pub run: Option<Vec<String>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LaunchResponse {
pub guest: GuestInfo,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListRequest {}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListResponse {
pub guests: Vec<GuestInfo>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DestroyRequest {
pub guest: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DestroyResponse {
pub guest: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsoleStreamRequest {
pub guest: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsoleStreamResponse {
pub stream: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsoleStreamUpdate {
pub data: Vec<u8>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorResponse {
pub message: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Request {
Launch(LaunchRequest),
Destroy(DestroyRequest),
List(ListRequest),
ConsoleStream(ConsoleStreamRequest),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Response {
Error(ErrorResponse),
Launch(LaunchResponse),
Destroy(DestroyResponse),
List(ListResponse),
ConsoleStream(ConsoleStreamResponse),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RequestBox {
pub id: u64,
pub request: Request,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResponseBox {
pub id: u64,
pub response: Response,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum StreamStatus {
Open,
Closed,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum StreamUpdate {
ConsoleStream(ConsoleStreamUpdate),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamUpdated {
pub id: u64,
pub update: Option<StreamUpdate>,
pub status: StreamStatus,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Message {
Request(RequestBox),
Response(ResponseBox),
StreamUpdated(StreamUpdated),
}

33
shared/src/launchcfg.rs Normal file
View File

@ -0,0 +1,33 @@
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug)]
pub struct LaunchNetworkIpv4 {
pub address: String,
pub gateway: String,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct LaunchNetworkIpv6 {
pub address: String,
pub gateway: String,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct LaunchNetworkResolver {
pub nameservers: Vec<String>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct LaunchNetwork {
pub link: String,
pub ipv4: LaunchNetworkIpv4,
pub ipv6: LaunchNetworkIpv6,
pub resolver: LaunchNetworkResolver,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct LaunchInfo {
pub network: Option<LaunchNetwork>,
pub env: Option<Vec<String>>,
pub run: Option<Vec<String>>,
}

View File

@ -1,35 +1,4 @@
pub mod control;
pub mod ethtool; pub mod ethtool;
pub mod launchcfg;
use serde::{Deserialize, Serialize}; pub mod stream;
#[derive(Serialize, Deserialize, Debug)]
pub struct LaunchNetworkIpv4 {
pub address: String,
pub gateway: String,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct LaunchNetworkIpv6 {
pub address: String,
pub gateway: String,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct LaunchNetworkResolver {
pub nameservers: Vec<String>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct LaunchNetwork {
pub link: String,
pub ipv4: LaunchNetworkIpv4,
pub ipv6: LaunchNetworkIpv6,
pub resolver: LaunchNetworkResolver,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct LaunchInfo {
pub network: Option<LaunchNetwork>,
pub env: Option<Vec<String>>,
pub run: Option<Vec<String>>,
}

152
shared/src/stream.rs Normal file
View File

@ -0,0 +1,152 @@
use crate::control::{Message, StreamStatus, StreamUpdate, StreamUpdated};
use anyhow::{anyhow, Result};
use log::warn;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{
mpsc::{channel, Receiver, Sender},
Mutex,
};
pub struct StreamContext {
pub id: u64,
pub receiver: Receiver<StreamUpdate>,
sender: Sender<Message>,
}
impl StreamContext {
pub async fn send(&self, update: StreamUpdate) -> Result<()> {
self.sender
.send(Message::StreamUpdated(StreamUpdated {
id: self.id,
update: Some(update),
status: StreamStatus::Open,
}))
.await?;
Ok(())
}
}
impl Drop for StreamContext {
fn drop(&mut self) {
if self.sender.is_closed() {
return;
}
let result = self.sender.try_send(Message::StreamUpdated(StreamUpdated {
id: self.id,
update: None,
status: StreamStatus::Closed,
}));
if let Err(error) = result {
warn!(
"failed to send close message for stream {}: {}",
self.id, error
);
}
}
}
struct StreamStorage {
rx_sender: Sender<StreamUpdate>,
rx_receiver: Option<Receiver<StreamUpdate>>,
}
#[derive(Clone)]
pub struct ConnectionStreams {
next: Arc<Mutex<u64>>,
streams: Arc<Mutex<HashMap<u64, StreamStorage>>>,
tx_sender: Sender<Message>,
}
const QUEUE_MAX_LEN: usize = 100;
impl ConnectionStreams {
pub fn new(tx_sender: Sender<Message>) -> Self {
Self {
next: Arc::new(Mutex::new(0)),
streams: Arc::new(Mutex::new(HashMap::new())),
tx_sender,
}
}
pub async fn open(&self) -> Result<StreamContext> {
let id = {
let mut next = self.next.lock().await;
let id = *next;
*next = id + 1;
id
};
let (rx_sender, rx_receiver) = channel(QUEUE_MAX_LEN);
let store = StreamStorage {
rx_sender,
rx_receiver: None,
};
self.streams.lock().await.insert(id, store);
let open = Message::StreamUpdated(StreamUpdated {
id,
update: None,
status: StreamStatus::Open,
});
self.tx_sender.send(open).await?;
Ok(StreamContext {
id,
sender: self.tx_sender.clone(),
receiver: rx_receiver,
})
}
pub async fn incoming(&self, updated: StreamUpdated) -> Result<()> {
let mut streams = self.streams.lock().await;
if updated.update.is_none() && updated.status == StreamStatus::Open {
let (rx_sender, rx_receiver) = channel(QUEUE_MAX_LEN);
let store = StreamStorage {
rx_sender,
rx_receiver: Some(rx_receiver),
};
streams.insert(updated.id, store);
}
let Some(storage) = streams.get(&updated.id) else {
return Ok(());
};
if let Some(update) = updated.update {
storage.rx_sender.send(update).await?;
}
if updated.status == StreamStatus::Closed {
streams.remove(&updated.id);
}
Ok(())
}
pub async fn outgoing(&self, updated: &StreamUpdated) -> Result<()> {
if updated.status == StreamStatus::Closed {
let mut streams = self.streams.lock().await;
streams.remove(&updated.id);
}
Ok(())
}
pub async fn acquire(&self, id: u64) -> Result<StreamContext> {
let mut streams = self.streams.lock().await;
let Some(storage) = streams.get_mut(&id) else {
return Err(anyhow!("stream {} has not been opened", id));
};
let Some(receiver) = storage.rx_receiver.take() else {
return Err(anyhow!("stream has already been acquired"));
};
Ok(StreamContext {
id,
receiver,
sender: self.tx_sender.clone(),
})
}
}