diff --git a/crates/kratart/Cargo.toml b/crates/kratart/Cargo.toml index 59750a3..d2f6e8b 100644 --- a/crates/kratart/Cargo.toml +++ b/crates/kratart/Cargo.toml @@ -17,6 +17,8 @@ serde_json = { workspace = true } tokio = { workspace = true } uuid = { workspace = true } xenclient = { path = "../xen/xenclient" } +xenevtchn = { path = "../xen/xenevtchn" } +xengnt = { path = "../xen/xengnt" } xenstore = { path = "../xen/xenstore" } [lib] @@ -28,3 +30,7 @@ env_logger = { workspace = true } [[example]] name = "kratart-squashify" path = "examples/squashify.rs" + +[[example]] +name = "kratart-channel" +path = "examples/channel.rs" diff --git a/crates/kratart/examples/channel.rs b/crates/kratart/examples/channel.rs new file mode 100644 index 0000000..adc8538 --- /dev/null +++ b/crates/kratart/examples/channel.rs @@ -0,0 +1,15 @@ +use anyhow::Result; +use env_logger::Env; +use kratart::chan::KrataChannelService; +use xenevtchn::EventChannel; +use xenstore::XsdClient; + +#[tokio::main] +async fn main() -> Result<()> { + env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); + + let mut krata = + KrataChannelService::new(EventChannel::open().await?, XsdClient::open().await?)?; + krata.init().await?; + Ok(()) +} diff --git a/crates/kratart/src/chan.rs b/crates/kratart/src/chan.rs new file mode 100644 index 0000000..420939a --- /dev/null +++ b/crates/kratart/src/chan.rs @@ -0,0 +1,158 @@ +use std::collections::HashMap; + +use anyhow::Result; +use xenevtchn::EventChannel; +use xengnt::{sys::GrantRef, GrantTab}; +use xenstore::{XsdClient, XsdInterface}; + +#[repr(C)] +struct XenConsoleInterface { + input: [u8; 1024], + output: [u8; 2048], + in_cons: u32, + in_prod: u32, + out_cons: u32, + out_prod: u32, +} + +pub struct KrataChannelService { + backends: HashMap<(u32, u32), KrataChannelBackend>, + evtchn: EventChannel, + store: XsdClient, +} + +impl KrataChannelService { + pub fn new(evtchn: EventChannel, store: XsdClient) -> Result { + Ok(KrataChannelService { + backends: HashMap::new(), + evtchn, + store, + }) + } + + pub async fn init(&mut self) -> Result<()> { + let domains = self.store.list("/local/domain/0/backend/console").await?; + for domid_string in domains { + let domid = domid_string.parse::()?; + let domid_path = format!("/local/domain/0/backend/console/{}", domid); + for id_string in self.store.list(&domid_path).await? { + let id = id_string.parse::()?; + let console_path = format!( + "/local/domain/0/backend/console/{}/{}", + domid_string, id_string + ); + let Some(frontend_path) = self + .store + .read_string(format!("{}/frontend", console_path)) + .await? + else { + continue; + }; + let Some(typ) = self + .store + .read_string(format!("{}/type", frontend_path)) + .await? + else { + continue; + }; + + if typ != "krata-channel" { + continue; + } + + let Some(ring_ref_string) = self + .store + .read_string(format!("{}/ring-ref", frontend_path)) + .await? + else { + continue; + }; + + let Some(port_string) = self + .store + .read_string(format!("{}/port", frontend_path)) + .await? + else { + continue; + }; + + let ring_ref = ring_ref_string.parse::()?; + let port = port_string.parse::()?; + let backend = KrataChannelBackend { + backend: console_path.clone(), + domid, + ring_ref, + port, + store: self.store.clone(), + evtchn: self.evtchn.clone(), + grant: GrantTab::open()?, + }; + + backend.init().await?; + self.backends.insert((domid, id), backend); + } + } + Ok(()) + } +} + +#[derive(Clone)] +pub struct KrataChannelBackend { + backend: String, + domid: u32, + ring_ref: u64, + port: u32, + store: XsdClient, + evtchn: EventChannel, + grant: GrantTab, +} + +impl KrataChannelBackend { + pub async fn init(&self) -> Result<()> { + self.store.write_string(&self.backend, "4").await?; + Ok(()) + } + + pub async fn read(&self) -> Result<()> { + let memory = self.grant.map_grant_refs( + vec![GrantRef { + domid: self.domid, + reference: self.ring_ref as u32, + }], + true, + true, + )?; + let interface = memory.ptr() as *mut XenConsoleInterface; + let mut channel = self.evtchn.bind(self.domid, self.port).await?; + unsafe { self.read_buffer(channel.local_port, interface).await? }; + loop { + channel.receiver.recv().await?; + unsafe { self.read_buffer(channel.local_port, interface).await? }; + channel.unmask_sender.send(channel.local_port).await?; + } + } + + async unsafe fn read_buffer( + &self, + local_port: u32, + interface: *mut XenConsoleInterface, + ) -> Result<()> { + let mut cons = (*interface).out_cons; + let prod = (*interface).out_prod; + let size = prod - cons; + if size == 0 || size > 2048 { + return Ok(()); + } + let mut data: Vec = Vec::new(); + loop { + if cons == prod { + break; + } + data.push((*interface).output[cons as usize]); + cons += 1; + } + (*interface).out_cons = cons; + self.evtchn.notify(local_port).await?; + Ok(()) + } +} diff --git a/crates/kratart/src/launch/mod.rs b/crates/kratart/src/launch/mod.rs index db97d2b..7945e4f 100644 --- a/crates/kratart/src/launch/mod.rs +++ b/crates/kratart/src/launch/mod.rs @@ -9,7 +9,7 @@ use krata::launchcfg::{ LaunchInfo, LaunchNetwork, LaunchNetworkIpv4, LaunchNetworkIpv6, LaunchNetworkResolver, }; use uuid::Uuid; -use xenclient::{DomainConfig, DomainDisk, DomainNetworkInterface}; +use xenclient::{DomainChannel, DomainConfig, DomainDisk, DomainNetworkInterface}; use xenstore::XsdInterface; use crate::cfgblk::ConfigBlock; @@ -180,7 +180,10 @@ impl GuestLauncher { writable: false, }, ], - consoles: vec![], + channels: vec![DomainChannel { + typ: "krata-channel".to_string(), + initialized: true, + }], vifs: vec![DomainNetworkInterface { mac: &guest_mac_string, mtu: 1500, @@ -193,10 +196,10 @@ impl GuestLauncher { extra_rw_paths: vec!["krata/guest".to_string()], }; match context.xen.create(&config).await { - Ok(domid) => Ok(GuestInfo { + Ok(created) => Ok(GuestInfo { name: request.name.map(|x| x.to_string()), uuid, - domid, + domid: created.domid, image: request.image.to_string(), loops: vec![], guest_ipv4: Some(IpNetwork::new( diff --git a/crates/kratart/src/lib.rs b/crates/kratart/src/lib.rs index 06085e5..f1632a9 100644 --- a/crates/kratart/src/lib.rs +++ b/crates/kratart/src/lib.rs @@ -26,6 +26,7 @@ use krataoci::cache::ImageCache; pub mod autoloop; pub mod cfgblk; +pub mod chan; pub mod console; pub mod launch; diff --git a/crates/xen/xenclient/examples/boot.rs b/crates/xen/xenclient/examples/boot.rs index 3ade6d1..c2564c5 100644 --- a/crates/xen/xenclient/examples/boot.rs +++ b/crates/xen/xenclient/examples/boot.rs @@ -23,14 +23,14 @@ async fn main() -> Result<()> { initrd_path: initrd_path.as_str(), cmdline: "debug elevator=noop", disks: vec![], - consoles: vec![], + channels: vec![], vifs: vec![], filesystems: vec![], extra_keys: vec![], extra_rw_paths: vec![], event_channels: vec![], }; - let domid = client.create(&config).await?; - println!("created domain {}", domid); + let created = client.create(&config).await?; + println!("created domain {}", created.domid); Ok(()) } diff --git a/crates/xen/xenclient/src/boot.rs b/crates/xen/xenclient/src/boot.rs index 114fb30..ff559eb 100644 --- a/crates/xen/xenclient/src/boot.rs +++ b/crates/xen/xenclient/src/boot.rs @@ -58,7 +58,6 @@ pub struct BootState { pub kernel_segment: DomainSegment, pub start_info_segment: DomainSegment, pub xenstore_segment: DomainSegment, - pub console_segment: DomainSegment, pub boot_stack_segment: DomainSegment, pub p2m_segment: Option, pub page_table_segment: Option, @@ -66,7 +65,7 @@ pub struct BootState { pub shared_info_frame: u64, pub initrd_segment: DomainSegment, pub store_evtchn: u32, - pub console_evtchn: u32, + pub consoles: Vec<(u32, DomainSegment)>, } impl BootSetup<'_> { @@ -114,6 +113,7 @@ impl BootSetup<'_> { initrd: &[u8], max_vcpus: u32, mem_mb: u64, + console_count: usize, ) -> Result { debug!("initialize max_vcpus={:?} mem_mb={:?}", max_vcpus, mem_mb); @@ -145,7 +145,12 @@ impl BootSetup<'_> { } let start_info_segment = self.alloc_page(arch)?; let xenstore_segment = self.alloc_page(arch)?; - let console_segment = self.alloc_page(arch)?; + let mut consoles: Vec<(u32, DomainSegment)> = Vec::new(); + for _ in 0..console_count { + let evtchn = self.call.evtchn_alloc_unbound(self.domid, 0)?; + let page = self.alloc_page(arch)?; + consoles.push((evtchn, page)); + } let page_table_segment = arch.alloc_page_tables(self, &image_info)?; let boot_stack_segment = self.alloc_page(arch)?; @@ -166,7 +171,6 @@ impl BootSetup<'_> { let initrd_segment = initrd_segment.unwrap(); let store_evtchn = self.call.evtchn_alloc_unbound(self.domid, 0)?; - let console_evtchn = self.call.evtchn_alloc_unbound(self.domid, 0)?; let kernel_segment = kernel_segment.ok_or(Error::MemorySetupFailed("kernel_segment missing"))?; @@ -175,14 +179,13 @@ impl BootSetup<'_> { kernel_segment, start_info_segment, xenstore_segment, - console_segment, + consoles, boot_stack_segment, p2m_segment, page_table_segment, image_info, initrd_segment, store_evtchn, - console_evtchn, shared_info_frame: 0, }; debug!("initialize state={:?}", state); @@ -210,7 +213,8 @@ impl BootSetup<'_> { } fn gnttab_seed(&mut self, state: &mut BootState) -> Result<()> { - let console_gfn = self.phys.p2m[state.console_segment.pfn as usize]; + let console_gfn = + self.phys.p2m[state.consoles.first().map(|x| x.1.pfn).unwrap_or(0) as usize]; let xenstore_gfn = self.phys.p2m[state.xenstore_segment.pfn as usize]; let addr = self .call diff --git a/crates/xen/xenclient/src/lib.rs b/crates/xen/xenclient/src/lib.rs index 29e5f5c..72ac4a4 100644 --- a/crates/xen/xenclient/src/lib.rs +++ b/crates/xen/xenclient/src/lib.rs @@ -19,6 +19,7 @@ use crate::arm64::Arm64BootSetup; use crate::boot::BootSetup; use crate::elfloader::ElfImageLoader; use crate::error::{Error, Result}; +use boot::BootState; use log::{trace, warn}; use std::fs::read; @@ -67,7 +68,10 @@ pub struct DomainNetworkInterface<'a> { } #[derive(Debug)] -pub struct DomainConsole {} +pub struct DomainChannel { + pub typ: String, + pub initialized: bool, +} #[derive(Debug)] pub struct DomainEventChannel<'a> { @@ -84,7 +88,7 @@ pub struct DomainConfig<'a> { pub initrd_path: &'a str, pub cmdline: &'a str, pub disks: Vec>, - pub consoles: Vec, + pub channels: Vec, pub vifs: Vec>, pub filesystems: Vec>, pub event_channels: Vec>, @@ -92,6 +96,18 @@ pub struct DomainConfig<'a> { pub extra_rw_paths: Vec, } +#[derive(Debug)] +pub struct CreatedChannel { + pub ring_ref: u64, + pub evtchn: u32, +} + +#[derive(Debug)] +pub struct CreatedDomain { + pub domid: u32, + pub channels: Vec, +} + impl XenClient { pub async fn open(current_domid: u32) -> Result { let store = XsdClient::open().await?; @@ -99,7 +115,7 @@ impl XenClient { Ok(XenClient { store, call }) } - pub async fn create(&mut self, config: &DomainConfig<'_>) -> Result { + pub async fn create(&mut self, config: &DomainConfig<'_>) -> Result { let mut domain = CreateDomain { max_vcpus: config.max_vcpus, ..Default::default() @@ -111,7 +127,7 @@ impl XenClient { let domid = self.call.create_domain(domain)?; match self.init(domid, &domain, config).await { - Ok(_) => Ok(domid), + Ok(created) => Ok(created), Err(err) => { // ignore since destroying a domain is best // effort when an error occurs @@ -126,7 +142,7 @@ impl XenClient { domid: u32, domain: &CreateDomain, config: &DomainConfig<'_>, - ) -> Result<()> { + ) -> Result { trace!( "XenClient init domid={} domain={:?} config={:?}", domid, @@ -241,11 +257,11 @@ impl XenClient { self.call.set_max_mem(domid, config.mem_mb * 1024)?; let image_loader = ElfImageLoader::load_file_kernel(config.kernel_path)?; - let console_evtchn: u32; let xenstore_evtchn: u32; - let console_mfn: u64; let xenstore_mfn: u64; + let p2m: Vec; + let mut state: BootState; { let mut boot = BootSetup::new(&self.call, domid); #[cfg(target_arch = "x86_64")] @@ -253,18 +269,18 @@ impl XenClient { #[cfg(target_arch = "aarch64")] let mut arch = Arm64BootSetup::new(); let initrd = read(config.initrd_path)?; - let mut state = boot.initialize( + state = boot.initialize( &mut arch, &image_loader, initrd.as_slice(), config.max_vcpus, config.mem_mb, + 1 + config.channels.len(), )?; boot.boot(&mut arch, &mut state, config.cmdline)?; - console_evtchn = state.console_evtchn; xenstore_evtchn = state.store_evtchn; - console_mfn = boot.phys.p2m[state.console_segment.pfn as usize]; xenstore_mfn = boot.phys.p2m[state.xenstore_segment.pfn as usize]; + p2m = boot.phys.p2m; } { @@ -329,27 +345,38 @@ impl XenClient { return Err(Error::IntroduceDomainFailed); } self.console_device_add( + &DomainChannel { + typ: "xenconsoled".to_string(), + initialized: true, + }, + &p2m, + &state, &dom_path, &backend_dom_path, config.backend_domid, domid, 0, - Some(console_evtchn), - Some(console_mfn), ) .await?; - for (index, _) in config.consoles.iter().enumerate() { - self.console_device_add( - &dom_path, - &backend_dom_path, - config.backend_domid, - domid, - index + 1, - None, - None, - ) - .await?; + let mut channels: Vec = Vec::new(); + for (index, channel) in config.channels.iter().enumerate() { + let (Some(ring_ref), Some(evtchn)) = self + .console_device_add( + channel, + &p2m, + &state, + &dom_path, + &backend_dom_path, + config.backend_domid, + domid, + index + 1, + ) + .await? + else { + continue; + }; + channels.push(CreatedChannel { ring_ref, evtchn }); } for (index, disk) in config.disks.iter().enumerate() { @@ -402,7 +429,7 @@ impl XenClient { } self.call.unpause_domain(domid)?; - Ok(()) + Ok(CreatedDomain { domid, channels }) } async fn disk_device_add( @@ -460,18 +487,22 @@ impl XenClient { #[allow(clippy::too_many_arguments, clippy::unnecessary_unwrap)] async fn console_device_add( &mut self, + channel: &DomainChannel, + p2m: &[u64], + state: &BootState, dom_path: &str, backend_dom_path: &str, backend_domid: u32, domid: u32, index: usize, - port: Option, - ring: Option, - ) -> Result<()> { + ) -> Result<(Option, Option)> { + let console = state.consoles.get(index); + let port = console.map(|x| x.0); + let ring = console.map(|x| p2m[x.1.pfn as usize]); + let mut backend_entries = vec![ ("frontend-id", domid.to_string()), ("online", "1".to_string()), - ("state", "1".to_string()), ("protocol", "vt100".to_string()), ]; @@ -482,15 +513,14 @@ impl XenClient { ("tty", "".to_string()), ]; - if index == 0 { - frontend_entries.push(("type", "xenconsoled".to_string())); - } else { - frontend_entries.push(("type", "ioemu".to_string())); - backend_entries.push(("connection", "pty".to_string())); - backend_entries.push(("output", "pty".to_string())); - } + frontend_entries.push(("type", channel.typ.clone())); + backend_entries.push(("type", channel.typ.clone())); if port.is_some() && ring.is_some() { + if channel.typ != "xenconsoled" { + frontend_entries.push(("state", "1".to_string())); + } + frontend_entries.extend_from_slice(&[ ("port", port.unwrap().to_string()), ("ring-ref", ring.unwrap().to_string()), @@ -502,6 +532,12 @@ impl XenClient { ]); } + if channel.initialized { + backend_entries.push(("state", "4".to_string())); + } else { + backend_entries.push(("state", "1".to_string())); + } + self.device_add( "console", index as u64, @@ -513,7 +549,7 @@ impl XenClient { backend_entries, ) .await?; - Ok(()) + Ok((ring, port)) } async fn fs_9p_device_add( diff --git a/crates/xen/xenclient/src/x86.rs b/crates/xen/xenclient/src/x86.rs index 93fe98b..950fc2b 100644 --- a/crates/xen/xenclient/src/x86.rs +++ b/crates/xen/xenclient/src/x86.rs @@ -442,8 +442,9 @@ impl ArchBootSetup for X86BootSetup { (*info).flags = 0; (*info).store_evtchn = state.store_evtchn; (*info).store_mfn = setup.phys.p2m[state.xenstore_segment.pfn as usize]; - (*info).console.mfn = setup.phys.p2m[state.console_segment.pfn as usize]; - (*info).console.evtchn = state.console_evtchn; + let console = state.consoles.first().unwrap(); + (*info).console.mfn = setup.phys.p2m[console.1.pfn as usize]; + (*info).console.evtchn = console.0; (*info).mod_start = state.initrd_segment.vstart; (*info).mod_len = state.initrd_segment.size; for (i, c) in cmdline.chars().enumerate() { diff --git a/crates/xen/xenevtchn/Cargo.toml b/crates/xen/xenevtchn/Cargo.toml index 72fa98a..a7f2fc9 100644 --- a/crates/xen/xenevtchn/Cargo.toml +++ b/crates/xen/xenevtchn/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" resolver = "2" [dependencies] +libc = { workspace = true } log = { workspace = true } thiserror = { workspace = true } nix = { workspace = true, features = ["ioctl"] } diff --git a/crates/xen/xenevtchn/src/error.rs b/crates/xen/xenevtchn/src/error.rs index 3db25c7..5ece1ad 100644 --- a/crates/xen/xenevtchn/src/error.rs +++ b/crates/xen/xenevtchn/src/error.rs @@ -2,11 +2,11 @@ use std::io; #[derive(thiserror::Error, Debug)] pub enum Error { - #[error("kernel error")] + #[error("kernel error: {0}")] Kernel(#[from] nix::errno::Errno), - #[error("io issue encountered")] + #[error("io issue encountered: {0}")] Io(#[from] io::Error), - #[error("failed to send event channel wake")] + #[error("failed to send event channel wake: {0}")] WakeSend(tokio::sync::broadcast::error::SendError), } diff --git a/crates/xen/xenevtchn/src/lib.rs b/crates/xen/xenevtchn/src/lib.rs index cc670a7..0fb78c6 100644 --- a/crates/xen/xenevtchn/src/lib.rs +++ b/crates/xen/xenevtchn/src/lib.rs @@ -7,10 +7,12 @@ use crate::sys::{BindInterdomain, BindUnboundPort, BindVirq, Notify, UnbindPort} use log::error; use std::collections::hash_map::Entry; use std::collections::HashMap; +use std::mem::size_of; use std::os::fd::AsRawFd; +use std::os::raw::c_void; use std::sync::Arc; use tokio::fs::{File, OpenOptions}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::AsyncReadExt; use tokio::select; use tokio::sync::broadcast::{ channel as broadcast_channel, Receiver as BroadcastReceiver, Sender as BroadastSender, @@ -19,6 +21,7 @@ use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::Mutex; use tokio::task::JoinHandle; +const UNBIND_CHANNEL_QUEUE_LEN: usize = 30; const UNMASK_CHANNEL_QUEUE_LEN: usize = 30; const BROADCAST_CHANNEL_QUEUE_LEN: usize = 30; @@ -28,10 +31,24 @@ type WakeMap = Arc>>>; pub struct EventChannel { handle: Arc>, wakes: WakeMap, + unbind_sender: Sender, unmask_sender: Sender, task: Arc>, } +pub struct BoundEventChannel { + pub local_port: u32, + pub receiver: BroadcastReceiver, + unbind_sender: Sender, + pub unmask_sender: Sender, +} + +impl Drop for BoundEventChannel { + fn drop(&mut self) { + let _ = self.unbind_sender.try_send(self.local_port); + } +} + impl EventChannel { pub async fn open() -> Result { let file = OpenOptions::new() @@ -41,12 +58,15 @@ impl EventChannel { .await?; let wakes = Arc::new(Mutex::new(HashMap::new())); + let (unbind_sender, unbind_receiver) = channel(UNBIND_CHANNEL_QUEUE_LEN); let (unmask_sender, unmask_receiver) = channel(UNMASK_CHANNEL_QUEUE_LEN); let task = { let file = file.try_clone().await?; let wakes = wakes.clone(); tokio::task::spawn(async move { - if let Err(error) = EventChannel::process(file, wakes, unmask_receiver).await { + if let Err(error) = + EventChannel::process(file, wakes, unmask_receiver, unbind_receiver).await + { error!("event channel processor failed: {}", error); } }) @@ -54,6 +74,7 @@ impl EventChannel { Ok(EventChannel { handle: Arc::new(Mutex::new(file)), wakes, + unbind_sender, unmask_sender, task: Arc::new(task), }) @@ -109,6 +130,18 @@ impl EventChannel { unsafe { Ok(sys::reset(handle.as_raw_fd())? as u32) } } + pub async fn bind(&self, domid: u32, port: u32) -> Result { + let local_port = self.bind_interdomain(domid, port).await?; + let (receiver, unmask_sender) = self.subscribe(local_port).await?; + let bound = BoundEventChannel { + local_port, + receiver, + unbind_sender: self.unbind_sender.clone(), + unmask_sender, + }; + Ok(bound) + } + pub async fn subscribe(&self, port: u32) -> Result<(BroadcastReceiver, Sender)> { let mut wakes = self.wakes.lock().await; let receiver = match wakes.entry(port) { @@ -127,6 +160,7 @@ impl EventChannel { mut file: File, wakers: WakeMap, mut unmask_receiver: Receiver, + mut unbind_receiver: Receiver, ) -> Result<()> { loop { select! { @@ -147,7 +181,28 @@ impl EventChannel { result = unmask_receiver.recv() => { match result { Some(port) => { - file.write_u32_le(port).await?; + unsafe { + let mut port = port; + let result = libc::write(file.as_raw_fd(), &mut port as *mut u32 as *mut c_void, size_of::()); + if result != size_of::() as isize { + return Err(Error::Io(std::io::Error::from_raw_os_error(result as i32))); + } + } + } + + None => { + break; + } + } + } + + result = unbind_receiver.recv() => { + match result { + Some(port) => { + unsafe { + let mut request = UnbindPort { port }; + sys::unbind(file.as_raw_fd(), &mut request)?; + } } None => { diff --git a/crates/xen/xengnt/src/lib.rs b/crates/xen/xengnt/src/lib.rs index 7998349..5ae66b5 100644 --- a/crates/xen/xengnt/src/lib.rs +++ b/crates/xen/xengnt/src/lib.rs @@ -14,6 +14,7 @@ use sys::{ use libc::{mmap, munmap, MAP_FAILED, MAP_SHARED, PROT_READ, PROT_WRITE}; +#[derive(Clone)] pub struct GrantDevice { handle: Arc, } @@ -142,6 +143,7 @@ impl GrantAlloc { } } +#[derive(Clone)] pub struct GrantTab { device: GrantDevice, }