krata: begin work on idm channel

This commit is contained in:
Alex Zenla 2024-03-28 07:36:48 +00:00
parent 63f7db6cf4
commit f7267e4f5d
No known key found for this signature in database
GPG Key ID: 067B238899B51269
9 changed files with 186 additions and 66 deletions

View File

@ -6,13 +6,19 @@ fn main() -> Result<()> {
.descriptor_pool("crate::DESCRIPTOR_POOL")
.configure(
&mut config,
&["../../proto/krata/v1/control.proto"],
&["../../proto/"],
&[
"../../proto/krata/v1/control.proto",
"proto/krata/internal/idm.proto",
],
&["../../proto/", "proto/"],
)?;
tonic_build::configure().compile_with_config(
config,
&["../../proto/krata/v1/control.proto"],
&["../../proto/"],
&[
"../../proto/krata/v1/control.proto",
"proto/krata/internal/idm.proto",
],
&["../../proto/", "proto/"],
)?;
Ok(())
}

View File

@ -0,0 +1,17 @@
syntax = "proto3";
package krata.internal.idm;
option java_multiple_files = true;
option java_package = "dev.krata.proto.internal.idm";
option java_outer_classname = "IdmProto";
message IdmExitMessage {
int32 code = 1;
}
message IdmPacket {
oneof message {
IdmExitMessage exit = 1;
}
}

1
crates/krata/src/idm.rs Normal file
View File

@ -0,0 +1 @@
include!(concat!(env!("OUT_DIR"), "/krata.idm.rs"));

40
crates/kratad/src/idm.rs Normal file
View File

@ -0,0 +1,40 @@
use anyhow::Result;
use kratart::channel::ChannelService;
use log::error;
use tokio::{sync::mpsc::Receiver, task::JoinHandle};
pub struct DaemonIdm {
receiver: Receiver<(u32, Vec<u8>)>,
task: JoinHandle<()>,
}
impl DaemonIdm {
pub async fn new() -> Result<DaemonIdm> {
let (service, receiver) = ChannelService::new("krata-channel".to_string()).await?;
let task = service.launch().await?;
Ok(DaemonIdm { receiver, task })
}
pub async fn launch(mut self) -> Result<JoinHandle<()>> {
Ok(tokio::task::spawn(async move {
if let Err(error) = self.process().await {
error!("failed to process idm: {}", error);
}
}))
}
async fn process(&mut self) -> Result<()> {
loop {
let Some(_) = self.receiver.recv().await else {
break;
};
}
Ok(())
}
}
impl Drop for DaemonIdm {
fn drop(&mut self) {
self.task.abort();
}
}

View File

@ -4,6 +4,7 @@ use anyhow::Result;
use control::RuntimeControlService;
use db::GuestStore;
use event::{DaemonEventContext, DaemonEventGenerator};
use idm::DaemonIdm;
use krata::{dial::ControlDialAddress, v1::control::control_service_server::ControlServiceServer};
use kratart::Runtime;
use log::info;
@ -20,6 +21,7 @@ use uuid::Uuid;
pub mod control;
pub mod db;
pub mod event;
pub mod idm;
pub mod reconcile;
pub struct Daemon {
@ -30,6 +32,7 @@ pub struct Daemon {
guest_reconciler_task: JoinHandle<()>,
guest_reconciler_notify: Sender<Uuid>,
generator_task: JoinHandle<()>,
idm_task: JoinHandle<()>,
}
const GUEST_RECONCILER_QUEUE_LEN: usize = 1000;
@ -50,14 +53,20 @@ impl Daemon {
let runtime_for_reconciler = runtime.dupe().await?;
let guest_reconciler =
GuestReconciler::new(guests.clone(), events.clone(), runtime_for_reconciler)?;
let guest_reconciler_task = guest_reconciler.launch(guest_reconciler_receiver).await?;
let idm = DaemonIdm::new().await?;
let idm_task = idm.launch().await?;
let generator_task = generator.launch().await?;
Ok(Self {
store,
runtime,
guests,
events,
guest_reconciler_task: guest_reconciler.launch(guest_reconciler_receiver).await?,
guest_reconciler_task,
guest_reconciler_notify,
generator_task: generator.launch().await?,
generator_task,
idm_task,
})
}
@ -121,5 +130,6 @@ impl Drop for Daemon {
fn drop(&mut self) {
self.guest_reconciler_task.abort();
self.generator_task.abort();
self.idm_task.abort();
}
}

View File

@ -1,19 +1,23 @@
use anyhow::Result;
use env_logger::Env;
use kratart::chan::KrataChannelService;
use xenevtchn::EventChannel;
use xengnt::GrantTab;
use xenstore::XsdClient;
use kratart::channel::ChannelService;
#[tokio::main]
async fn main() -> Result<()> {
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
let mut krata = KrataChannelService::new(
EventChannel::open().await?,
XsdClient::open().await?,
GrantTab::open()?,
)?;
krata.watch().await?;
let (service, mut receiver) = ChannelService::new("krata-channel".to_string()).await?;
let task = service.launch().await?;
loop {
let Some((id, data)) = receiver.recv().await else {
break;
};
println!("domain {} = {:?}", id, data);
}
task.abort();
Ok(())
}

View File

@ -5,7 +5,7 @@ use std::{
};
use anyhow::{anyhow, Result};
use log::{error, info};
use log::{debug, error};
use tokio::{
select,
sync::{
@ -19,7 +19,8 @@ use xenevtchn::EventChannel;
use xengnt::{sys::GrantRef, GrantTab, MappedMemory};
use xenstore::{XsdClient, XsdInterface};
const KRATA_SINGLE_CHANNEL_QUEUE_LEN: usize = 100;
const SINGLE_CHANNEL_QUEUE_LEN: usize = 100;
const GROUPED_CHANNEL_QUEUE_LEN: usize = 1000;
#[repr(C)]
struct XenConsoleInterface {
@ -38,45 +39,87 @@ impl XenConsoleInterface {
const OUTPUT_SIZE: usize = 2048;
}
pub struct KrataChannelService {
backends: HashMap<(u32, u32), KrataChannelBackend>,
pub struct ChannelService {
typ: String,
backends: HashMap<u32, ChannelBackend>,
evtchn: EventChannel,
store: XsdClient,
gnttab: GrantTab,
input_receiver: Receiver<(u32, Vec<u8>)>,
pub input_sender: Sender<(u32, Vec<u8>)>,
output_sender: Sender<(u32, Vec<u8>)>,
}
impl KrataChannelService {
pub fn new(
evtchn: EventChannel,
store: XsdClient,
gnttab: GrantTab,
) -> Result<KrataChannelService> {
Ok(KrataChannelService {
backends: HashMap::new(),
evtchn,
store,
gnttab,
})
impl ChannelService {
pub async fn new(typ: String) -> Result<(ChannelService, Receiver<(u32, Vec<u8>)>)> {
let (input_sender, input_receiver) = channel(GROUPED_CHANNEL_QUEUE_LEN);
let (output_sender, output_receiver) = channel(GROUPED_CHANNEL_QUEUE_LEN);
Ok((
ChannelService {
typ,
backends: HashMap::new(),
evtchn: EventChannel::open().await?,
store: XsdClient::open().await?,
gnttab: GrantTab::open()?,
input_sender,
input_receiver,
output_sender,
},
output_receiver,
))
}
pub async fn watch(&mut self) -> Result<()> {
pub async fn launch(mut self) -> Result<JoinHandle<()>> {
Ok(tokio::task::spawn(async move {
if let Err(error) = self.process().await {
error!("channel processor failed: {}", error);
}
}))
}
async fn process(&mut self) -> Result<()> {
self.scan_all_backends().await?;
let mut watch_handle = self.store.create_watch().await?;
self.store
.bind_watch(&watch_handle, "/local/domain/0/backend/console".to_string())
.await?;
loop {
let Some(_) = watch_handle.receiver.recv().await else {
break;
};
select! {
x = watch_handle.receiver.recv() => match x {
Some(_) => {
self.scan_all_backends().await?;
}
self.scan_all_backends().await?;
None => {
break;
}
},
x = self.input_receiver.recv() => match x {
Some((domid, data)) => {
if let Some(backend) = self.backends.get_mut(&domid) {
let _ = backend.sender.try_send(data);
}
},
None => {
break;
}
}
}
}
Ok(())
}
pub async fn send(&mut self, domid: u32, message: Vec<u8>) -> Result<()> {
if let Some(backend) = self.backends.get(&domid) {
backend.sender.send(message).await?;
}
Ok(())
}
async fn ensure_backend_exists(&mut self, domid: u32, id: u32, path: String) -> Result<()> {
if self.backends.contains_key(&(domid, id)) {
if self.backends.contains_key(&domid) {
return Ok(());
}
let Some(frontend_path) = self.store.read_string(format!("{}/frontend", path)).await?
@ -91,11 +134,11 @@ impl KrataChannelService {
return Ok(());
};
if typ != "krata-channel" {
if typ != self.typ {
return Ok(());
}
let backend = KrataChannelBackend::new(
let backend = ChannelBackend::new(
path.clone(),
frontend_path.clone(),
domid,
@ -103,15 +146,16 @@ impl KrataChannelService {
self.store.clone(),
self.evtchn.clone(),
self.gnttab.clone(),
self.output_sender.clone(),
)
.await?;
self.backends.insert((domid, id), backend);
self.backends.insert(domid, backend);
Ok(())
}
async fn scan_all_backends(&mut self) -> Result<()> {
let domains = self.store.list("/local/domain/0/backend/console").await?;
let mut seen: Vec<(u32, u32)> = Vec::new();
let mut seen: Vec<u32> = Vec::new();
for domid_string in &domains {
let domid = domid_string.parse::<u32>()?;
let domid_path = format!("/local/domain/0/backend/console/{}", domid);
@ -122,11 +166,11 @@ impl KrataChannelService {
domid_string, id_string
);
self.ensure_backend_exists(domid, id, console_path).await?;
seen.push((domid, id));
seen.push(domid);
}
}
let mut gone: Vec<(u32, u32)> = Vec::new();
let mut gone: Vec<u32> = Vec::new();
for backend in self.backends.keys() {
if !seen.contains(backend) {
gone.push(*backend);
@ -143,25 +187,25 @@ impl KrataChannelService {
}
}
pub struct KrataChannelBackend {
pub struct ChannelBackend {
pub domid: u32,
pub id: u32,
pub receiver: Receiver<Vec<u8>>,
pub sender: Sender<Vec<u8>>,
task: JoinHandle<()>,
}
impl Drop for KrataChannelBackend {
impl Drop for ChannelBackend {
fn drop(&mut self) {
self.task.abort();
info!(
debug!(
"destroyed channel backend for domain {} channel {}",
self.domid, self.id
);
}
}
impl KrataChannelBackend {
impl ChannelBackend {
#[allow(clippy::too_many_arguments)]
pub async fn new(
backend: String,
frontend: String,
@ -170,7 +214,8 @@ impl KrataChannelBackend {
store: XsdClient,
evtchn: EventChannel,
gnttab: GrantTab,
) -> Result<KrataChannelBackend> {
output_sender: Sender<(u32, Vec<u8>)>,
) -> Result<ChannelBackend> {
let processor = KrataChannelBackendProcessor {
backend,
frontend,
@ -181,15 +226,13 @@ impl KrataChannelBackend {
gnttab,
};
let (output_sender, output_receiver) = channel(KRATA_SINGLE_CHANNEL_QUEUE_LEN);
let (input_sender, input_receiver) = channel(KRATA_SINGLE_CHANNEL_QUEUE_LEN);
let (input_sender, input_receiver) = channel(SINGLE_CHANNEL_QUEUE_LEN);
let task = processor.launch(output_sender, input_receiver).await?;
Ok(KrataChannelBackend {
Ok(ChannelBackend {
domid,
id,
task,
receiver: output_receiver,
sender: input_sender,
})
}
@ -211,7 +254,7 @@ impl KrataChannelBackendProcessor {
self.store
.write_string(format!("{}/state", self.backend), "3")
.await?;
info!(
debug!(
"created channel backend for domain {} channel {}",
self.domid, self.id
);
@ -246,7 +289,7 @@ impl KrataChannelBackendProcessor {
async fn launch(
&self,
output_sender: Sender<Vec<u8>>,
output_sender: Sender<(u32, Vec<u8>)>,
input_receiver: Receiver<Vec<u8>>,
) -> Result<JoinHandle<()>> {
let owned = self.clone();
@ -263,7 +306,7 @@ impl KrataChannelBackendProcessor {
async fn processor(
&self,
sender: Sender<Vec<u8>>,
sender: Sender<(u32, Vec<u8>)>,
mut receiver: Receiver<Vec<u8>>,
) -> Result<()> {
self.init().await?;
@ -335,7 +378,7 @@ impl KrataChannelBackendProcessor {
unsafe {
let buffer = self.read_output_buffer(channel.local_port, &memory).await?;
if !buffer.is_empty() {
sender.send(buffer).await?;
sender.send((self.domid, buffer)).await?;
}
};
@ -404,7 +447,7 @@ impl KrataChannelBackendProcessor {
unsafe {
let buffer = self.read_output_buffer(channel.local_port, &memory).await?;
if !buffer.is_empty() {
sender.send(buffer).await?;
sender.send((self.domid, buffer)).await?;
}
};
channel.unmask_sender.send(channel.local_port).await?;

View File

@ -9,7 +9,7 @@ use krata::launchcfg::{
LaunchInfo, LaunchNetwork, LaunchNetworkIpv4, LaunchNetworkIpv6, LaunchNetworkResolver,
};
use uuid::Uuid;
use xenclient::{DomainConfig, DomainDisk, DomainNetworkInterface};
use xenclient::{DomainChannel, DomainConfig, DomainDisk, DomainNetworkInterface};
use xenstore::XsdInterface;
use crate::cfgblk::ConfigBlock;
@ -180,11 +180,10 @@ impl GuestLauncher {
writable: false,
},
],
// channels: vec![DomainChannel {
// typ: "krata-channel".to_string(),
// initialized: false,
// }],
channels: vec![],
channels: vec![DomainChannel {
typ: "krata-channel".to_string(),
initialized: false,
}],
vifs: vec![DomainNetworkInterface {
mac: &guest_mac_string,
mtu: 1500,

View File

@ -26,7 +26,7 @@ use krataoci::cache::ImageCache;
pub mod autoloop;
pub mod cfgblk;
pub mod chan;
pub mod channel;
pub mod console;
pub mod launch;