krata: fix guest destruction

This commit is contained in:
Alex Zenla
2024-03-30 03:49:13 +00:00
parent d659b3aa55
commit da9e6cac14
12 changed files with 103 additions and 122 deletions

View File

@ -6,12 +6,16 @@ option java_multiple_files = true;
option java_package = "dev.krata.proto.internal.idm"; option java_package = "dev.krata.proto.internal.idm";
option java_outer_classname = "IdmProto"; option java_outer_classname = "IdmProto";
message IdmExitMessage { message IdmExitEvent {
int32 code = 1; int32 code = 1;
} }
message IdmEvent {
oneof event {
IdmExitEvent exit = 1;
}
}
message IdmPacket { message IdmPacket {
oneof message { IdmEvent event = 1;
IdmExitMessage exit = 1;
}
} }

View File

@ -4,7 +4,7 @@ use cli_tables::Table;
use krata::{ use krata::{
events::EventStream, events::EventStream,
v1::{ v1::{
common::{guest_image_spec::Image, Guest}, common::{guest_image_spec::Image, Guest, GuestStatus},
control::{ control::{
control_service_client::ControlServiceClient, ListGuestsRequest, ResolveGuestRequest, control_service_client::ControlServiceClient, ListGuestsRequest, ResolveGuestRequest,
}, },
@ -14,7 +14,7 @@ use krata::{
use serde_json::Value; use serde_json::Value;
use tonic::{transport::Channel, Request}; use tonic::{transport::Channel, Request};
use crate::format::{guest_state_text, kv2line, proto2dynamic, proto2kv}; use crate::format::{guest_state_text, guest_status_text, kv2line, proto2dynamic, proto2kv};
#[derive(ValueEnum, Clone, Debug, PartialEq, Eq)] #[derive(ValueEnum, Clone, Debug, PartialEq, Eq)]
enum ListFormat { enum ListFormat {
@ -24,6 +24,7 @@ enum ListFormat {
Jsonl, Jsonl,
Yaml, Yaml,
KeyValue, KeyValue,
Simple,
} }
#[derive(Parser)] #[derive(Parser)]
@ -40,7 +41,7 @@ impl ListCommand {
mut client: ControlServiceClient<Channel>, mut client: ControlServiceClient<Channel>,
_events: EventStream, _events: EventStream,
) -> Result<()> { ) -> Result<()> {
let guests = if let Some(ref guest) = self.guest { let mut guests = if let Some(ref guest) = self.guest {
let reply = client let reply = client
.resolve_guest(Request::new(ResolveGuestRequest { .resolve_guest(Request::new(ResolveGuestRequest {
name: guest.clone(), name: guest.clone(),
@ -60,11 +61,36 @@ impl ListCommand {
.guests .guests
}; };
guests.sort_by(|a, b| {
a.spec
.as_ref()
.map(|x| x.name.as_str())
.unwrap_or("")
.cmp(b.spec.as_ref().map(|x| x.name.as_str()).unwrap_or(""))
});
match self.format { match self.format {
ListFormat::CliTable => { ListFormat::CliTable => {
self.print_guest_table(guests)?; self.print_guest_table(guests)?;
} }
ListFormat::Simple => {
for guest in guests {
let state = guest_status_text(
guest
.state
.as_ref()
.map(|x| x.status())
.unwrap_or(GuestStatus::Unknown),
);
let name = guest.spec.as_ref().map(|x| x.name.as_str()).unwrap_or("");
let network = guest.state.as_ref().and_then(|x| x.network.as_ref());
let ipv4 = network.map(|x| x.guest_ipv4.as_str()).unwrap_or("");
let ipv6 = network.map(|x| x.guest_ipv6.as_str()).unwrap_or("");
println!("{}\t{}\t{}\t{}\t{}", guest.id, state, name, ipv4, ipv6);
}
}
ListFormat::Json | ListFormat::JsonPretty | ListFormat::Yaml => { ListFormat::Json | ListFormat::JsonPretty | ListFormat::Yaml => {
let mut values = Vec::new(); let mut values = Vec::new();
for guest in guests { for guest in guests {

View File

@ -6,7 +6,7 @@ use std::{
use anyhow::Result; use anyhow::Result;
use krata::{ use krata::{
idm::protocol::{idm_packet::Message, IdmPacket}, idm::protocol::{idm_event::Event, IdmPacket},
v1::common::{GuestExitInfo, GuestState, GuestStatus}, v1::common::{GuestExitInfo, GuestState, GuestStatus},
}; };
use log::error; use log::error;
@ -117,7 +117,7 @@ impl DaemonEventGenerator {
} }
async fn handle_idm_packet(&mut self, id: Uuid, packet: IdmPacket) -> Result<()> { async fn handle_idm_packet(&mut self, id: Uuid, packet: IdmPacket) -> Result<()> {
if let Some(Message::Exit(exit)) = packet.message { if let Some(Event::Exit(exit)) = packet.event.and_then(|x| x.event) {
self.handle_exit_code(id, exit.code).await?; self.handle_exit_code(id, exit.code).await?;
} }
Ok(()) Ok(())

View File

@ -5,7 +5,7 @@ use crate::{
use anyhow::Result; use anyhow::Result;
use krata::idm::{ use krata::idm::{
client::IdmClient, client::IdmClient,
protocol::{idm_packet::Message, IdmExitMessage, IdmPacket}, protocol::{idm_event::Event, IdmEvent, IdmExitEvent, IdmPacket},
}; };
use log::error; use log::error;
use nix::unistd::Pid; use nix::unistd::Pid;
@ -56,7 +56,9 @@ impl GuestBackground {
self.idm self.idm
.sender .sender
.send(IdmPacket { .send(IdmPacket {
message: Some(Message::Exit(IdmExitMessage { code: event.status })), event: Some(IdmEvent {
event: Some(Event::Exit(IdmExitEvent { code: event.status })),
}),
}) })
.await?; .await?;
death(event.status).await?; death(event.status).await?;

View File

@ -153,7 +153,7 @@ impl NetworkBackend {
pub async fn launch(self) -> Result<JoinHandle<()>> { pub async fn launch(self) -> Result<JoinHandle<()>> {
Ok(tokio::task::spawn(async move { Ok(tokio::task::spawn(async move {
info!( info!(
"lauched network backend for krata guest {}", "launched network backend for krata guest {}",
self.metadata.uuid self.metadata.uuid
); );
if let Err(error) = self.run().await { if let Err(error) = self.run().await {

View File

@ -291,6 +291,12 @@ impl AsyncRawSocketChannel {
debug!("failed to transmit: would block"); debug!("failed to transmit: would block");
continue; continue;
} }
// device no longer exists
if error.raw_os_error() == Some(6) {
break;
}
return Err(anyhow!( return Err(anyhow!(
"failed to write {} bytes to raw socket: {}", "failed to write {} bytes to raw socket: {}",
packet.len(), packet.len(),

View File

@ -79,10 +79,11 @@ impl ChannelService {
async fn process(&mut self) -> Result<()> { async fn process(&mut self) -> Result<()> {
self.scan_all_backends().await?; self.scan_all_backends().await?;
let mut watch_handle = self.store.create_watch().await?; let mut watch_handle = self
self.store .store
.bind_watch(&watch_handle, "/local/domain/0/backend/console".to_string()) .create_watch("/local/domain/0/backend/console")
.await?; .await?;
self.store.bind_watch(&watch_handle).await?;
loop { loop {
select! { select! {
x = watch_handle.receiver.recv() => match x { x = watch_handle.receiver.recv() => match x {
@ -310,10 +311,11 @@ impl KrataChannelBackendProcessor {
mut receiver: Receiver<Vec<u8>>, mut receiver: Receiver<Vec<u8>>,
) -> Result<()> { ) -> Result<()> {
self.init().await?; self.init().await?;
let mut frontend_state_change = self.store.create_watch().await?; let mut frontend_state_change = self
self.store .store
.bind_watch(&frontend_state_change, format!("{}/state", self.frontend)) .create_watch(format!("{}/state", self.frontend))
.await?; .await?;
self.store.bind_watch(&frontend_state_change).await?;
let (ring_ref, port) = loop { let (ring_ref, port) = loop {
match frontend_state_change.receiver.recv().await { match frontend_state_change.receiver.recv().await {
@ -382,10 +384,11 @@ impl KrataChannelBackendProcessor {
} }
}; };
let mut self_state_change = self.store.create_watch().await?; let mut self_state_change = self
self.store .store
.bind_watch(&self_state_change, format!("{}/state", self.backend)) .create_watch(format!("{}/state", self.backend))
.await?; .await?;
self.store.bind_watch(&self_state_change).await?;
loop { loop {
select! { select! {
x = self_state_change.receiver.recv() => match x { x = self_state_change.receiver.recv() => match x {

View File

@ -7,15 +7,11 @@ use std::{
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use ipnetwork::IpNetwork; use ipnetwork::IpNetwork;
use log::error;
use loopdev::LoopControl; use loopdev::LoopControl;
use tokio::{ use tokio::sync::Mutex;
sync::{mpsc::Sender, Mutex},
task::JoinHandle,
};
use uuid::Uuid; use uuid::Uuid;
use xenclient::XenClient; use xenclient::XenClient;
use xenstore::{XsdClient, XsdInterface, XsdWatchHandle}; use xenstore::{XsdClient, XsdInterface};
use self::{ use self::{
autoloop::AutoLoop, autoloop::AutoLoop,
@ -30,7 +26,7 @@ pub mod channel;
pub mod console; pub mod console;
pub mod launch; pub mod launch;
pub struct ContainerLoopInfo { pub struct GuestLoopInfo {
pub device: String, pub device: String,
pub file: String, pub file: String,
pub delete: Option<String>, pub delete: Option<String>,
@ -45,7 +41,7 @@ pub struct GuestInfo {
pub uuid: Uuid, pub uuid: Uuid,
pub domid: u32, pub domid: u32,
pub image: String, pub image: String,
pub loops: Vec<ContainerLoopInfo>, pub loops: Vec<GuestLoopInfo>,
pub guest_ipv4: Option<IpNetwork>, pub guest_ipv4: Option<IpNetwork>,
pub guest_ipv6: Option<IpNetwork>, pub guest_ipv6: Option<IpNetwork>,
pub guest_mac: Option<String>, pub guest_mac: Option<String>,
@ -231,7 +227,7 @@ impl RuntimeContext {
Ok(None) Ok(None)
} }
fn parse_loop_set(input: &Option<String>) -> Vec<ContainerLoopInfo> { fn parse_loop_set(input: &Option<String>) -> Vec<GuestLoopInfo> {
let Some(input) = input else { let Some(input) = input else {
return Vec::new(); return Vec::new();
}; };
@ -242,7 +238,7 @@ impl RuntimeContext {
.map(|x| (x[0].clone(), x[1].clone(), x[2].clone())) .map(|x| (x[0].clone(), x[1].clone(), x[2].clone()))
.collect::<Vec<(String, String, String)>>(); .collect::<Vec<(String, String, String)>>();
sets.iter() sets.iter()
.map(|(device, file, delete)| ContainerLoopInfo { .map(|(device, file, delete)| GuestLoopInfo {
device: device.clone(), device: device.clone(),
file: file.clone(), file: file.clone(),
delete: if delete == "none" { delete: if delete == "none" {
@ -251,7 +247,7 @@ impl RuntimeContext {
Some(delete.clone()) Some(delete.clone())
}, },
}) })
.collect::<Vec<ContainerLoopInfo>>() .collect::<Vec<GuestLoopInfo>>()
} }
} }
@ -276,29 +272,6 @@ impl Runtime {
launcher.launch(&mut context, request).await launcher.launch(&mut context, request).await
} }
pub async fn subscribe_exit_code(
&self,
uuid: Uuid,
sender: Sender<(Uuid, i32)>,
) -> Result<JoinHandle<()>> {
let mut context = self.context.lock().await;
let info = context
.resolve(uuid)
.await?
.ok_or_else(|| anyhow!("unable to resolve guest: {}", uuid))?;
let path = format!("/local/domain/{}/krata/guest/exit-code", info.domid);
let handle = context.xen.store.create_watch().await?;
context.xen.store.bind_watch(&handle, &path).await?;
let watch = ExitCodeWatch {
handle,
sender,
store: context.xen.store.clone(),
uuid,
path,
};
watch.launch().await
}
pub async fn destroy(&self, uuid: Uuid) -> Result<Uuid> { pub async fn destroy(&self, uuid: Uuid) -> Result<Uuid> {
let mut context = self.context.lock().await; let mut context = self.context.lock().await;
let info = context let info = context
@ -372,44 +345,3 @@ fn path_as_string(path: &Path) -> Result<String> {
.ok_or_else(|| anyhow!("unable to convert path to string")) .ok_or_else(|| anyhow!("unable to convert path to string"))
.map(|x| x.to_string()) .map(|x| x.to_string())
} }
struct ExitCodeWatch {
store: XsdClient,
handle: XsdWatchHandle,
uuid: Uuid,
sender: Sender<(Uuid, i32)>,
path: String,
}
impl ExitCodeWatch {
pub async fn launch(mut self) -> Result<JoinHandle<()>> {
Ok(tokio::task::spawn(async move {
if let Err(error) = self.process().await {
error!("failed to watch exit for guest {}: {}", self.uuid, error);
}
}))
}
async fn process(&mut self) -> Result<()> {
loop {
match self.handle.receiver.recv().await {
Some(_) => {
let exit_code_string = self.store.read_string(&self.path).await?;
if let Some(exit_code) = exit_code_string.and_then(|x| i32::from_str(&x).ok()) {
match self.sender.try_send((self.uuid, exit_code)) {
Ok(_) => {}
Err(error) => {
return Err(error.into());
}
}
return Ok(());
}
}
None => {
return Ok(());
}
}
}
}
}

View File

@ -20,12 +20,12 @@ use crate::boot::BootSetup;
use crate::elfloader::ElfImageLoader; use crate::elfloader::ElfImageLoader;
use crate::error::{Error, Result}; use crate::error::{Error, Result};
use boot::BootState; use boot::BootState;
use log::{trace, warn}; use log::{debug, trace, warn};
use tokio::time::timeout;
use std::fs::read; use std::fs::read;
use std::path::PathBuf; use std::path::PathBuf;
use std::str::FromStr; use std::str::FromStr;
use std::thread;
use std::time::Duration; use std::time::Duration;
use uuid::Uuid; use uuid::Uuid;
use xencall::sys::{CreateDomain, XEN_DOMCTL_CDF_HAP, XEN_DOMCTL_CDF_HVM_GUEST}; use xencall::sys::{CreateDomain, XEN_DOMCTL_CDF_HAP, XEN_DOMCTL_CDF_HVM_GUEST};
@ -759,6 +759,7 @@ impl XenClient {
for backend in &backend_paths { for backend in &backend_paths {
let state_path = format!("{}/state", backend); let state_path = format!("{}/state", backend);
let mut watch = self.store.create_watch(&state_path).await?;
let online_path = format!("{}/online", backend); let online_path = format!("{}/online", backend);
let tx = self.store.transaction().await?; let tx = self.store.transaction().await?;
let state = tx.read_string(&state_path).await?.unwrap_or(String::new()); let state = tx.read_string(&state_path).await?.unwrap_or(String::new());
@ -769,22 +770,25 @@ impl XenClient {
if !state.is_empty() && u32::from_str(&state).unwrap_or(0) != 6 { if !state.is_empty() && u32::from_str(&state).unwrap_or(0) != 6 {
tx.write_string(&state_path, "5").await?; tx.write_string(&state_path, "5").await?;
} }
self.store.bind_watch(&watch).await?;
tx.commit().await?; tx.commit().await?;
let mut count: u32 = 0; let mut count: u32 = 0;
loop { loop {
if count >= 100 { if count >= 3 {
warn!("unable to safely destroy backend: {}", backend); debug!("unable to safely destroy backend: {}", backend);
break; break;
} }
let Some(state) = self.store.read_string(&state_path).await? else { let _ = timeout(Duration::from_secs(1), watch.receiver.recv()).await;
break; let state = self
}; .store
.read_string(&state_path)
.await?
.unwrap_or_else(|| "6".to_string());
let state = i64::from_str(&state).unwrap_or(-1); let state = i64::from_str(&state).unwrap_or(-1);
if state == 6 { if state == 6 {
break; break;
} }
thread::sleep(Duration::from_millis(100));
count += 1; count += 1;
} }
} }
@ -818,7 +822,7 @@ impl XenClient {
if tty.is_some() { if tty.is_some() {
break; break;
} }
thread::sleep(Duration::from_millis(200)); tokio::time::sleep(Duration::from_millis(200)).await;
} }
let Some(tty) = tty else { let Some(tty) = tty else {
return Err(Error::TtyNotFound); return Err(Error::TtyNotFound);

View File

@ -7,8 +7,8 @@ async fn main() -> Result<()> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
let path = args().nth(1).unwrap_or("/local/domain".to_string()); let path = args().nth(1).unwrap_or("/local/domain".to_string());
let client = XsdClient::open().await?; let client = XsdClient::open().await?;
let mut handle = client.create_watch().await?; let mut handle = client.create_watch(path).await?;
client.bind_watch(&handle, path).await?; client.bind_watch(&handle).await?;
let mut count = 0; let mut count = 0;
loop { loop {
let Some(event) = handle.receiver.recv().await else { let Some(event) = handle.receiver.recv().await else {

View File

@ -65,7 +65,7 @@ pub struct XsdSocket {
next_watch_id: Arc<Mutex<u32>>, next_watch_id: Arc<Mutex<u32>>,
processor_task: Arc<JoinHandle<()>>, processor_task: Arc<JoinHandle<()>>,
rx_task: Arc<JoinHandle<()>>, rx_task: Arc<JoinHandle<()>>,
unwatch_sender: Sender<u32>, unwatch_sender: Sender<(u32, String)>,
} }
impl XsdSocket { impl XsdSocket {
@ -100,7 +100,7 @@ impl XsdSocket {
let (rx_sender, rx_receiver) = channel::<XsdMessage>(10); let (rx_sender, rx_receiver) = channel::<XsdMessage>(10);
let (tx_sender, tx_receiver) = channel::<XsdMessage>(10); let (tx_sender, tx_receiver) = channel::<XsdMessage>(10);
let (unwatch_sender, unwatch_receiver) = channel::<u32>(1000); let (unwatch_sender, unwatch_receiver) = channel::<(u32, String)>(1000);
let read: File = handle.try_clone().await?; let read: File = handle.try_clone().await?;
let mut processor = XsdSocketProcessor { let mut processor = XsdSocketProcessor {
@ -141,7 +141,7 @@ impl XsdSocket {
let req = { let req = {
let mut guard = self.next_request_id.lock().await; let mut guard = self.next_request_id.lock().await;
let req = *guard; let req = *guard;
*guard = req + 1; *guard = req.wrapping_add(1);
req req
}; };
let (sender, receiver) = oneshot_channel::<XsdMessage>(); let (sender, receiver) = oneshot_channel::<XsdMessage>();
@ -177,12 +177,12 @@ impl XsdSocket {
self.send_buf(tx, typ, &buf).await self.send_buf(tx, typ, &buf).await
} }
pub async fn add_watch(&self) -> Result<(u32, Receiver<String>, Sender<u32>)> { pub async fn add_watch(&self) -> Result<(u32, Receiver<String>, Sender<(u32, String)>)> {
let id = { let id = {
let mut guard = self.next_watch_id.lock().await; let mut guard = self.next_watch_id.lock().await;
let req = *guard; let watch = *guard;
*guard = req + 1; *guard = watch.wrapping_add(1);
req watch
}; };
let (sender, receiver) = channel(10); let (sender, receiver) = channel(10);
self.watches.lock().await.insert(id, WatchState { sender }); self.watches.lock().await.insert(id, WatchState { sender });
@ -197,7 +197,7 @@ struct XsdSocketProcessor {
next_request_id: Arc<Mutex<u32>>, next_request_id: Arc<Mutex<u32>>,
tx_receiver: Receiver<XsdMessage>, tx_receiver: Receiver<XsdMessage>,
rx_receiver: Receiver<XsdMessage>, rx_receiver: Receiver<XsdMessage>,
unwatch_receiver: Receiver<u32>, unwatch_receiver: Receiver<(u32, String)>,
} }
impl XsdSocketProcessor { impl XsdSocketProcessor {
@ -326,16 +326,18 @@ impl XsdSocketProcessor {
}, },
x = self.unwatch_receiver.recv() => match x { x = self.unwatch_receiver.recv() => match x {
Some(id) => { Some((id, path)) => {
let req = { let req = {
let mut guard = self.next_request_id.lock().await; let mut guard = self.next_request_id.lock().await;
let req = *guard; let req = *guard;
*guard = req + 1; *guard = req.wrapping_add(1);
req req
}; };
let mut payload = id.to_string().as_bytes().to_vec(); let mut payload = id.to_string().as_bytes().to_vec();
payload.push(0); payload.push(0);
payload.extend_from_slice(path.to_string().as_bytes());
payload.push(0);
let header = XsdMessageHeader { let header = XsdMessageHeader {
typ: XSD_UNWATCH, typ: XSD_UNWATCH,
req, req,

View File

@ -43,14 +43,15 @@ impl XsPermission {
} }
pub struct XsdWatchHandle { pub struct XsdWatchHandle {
pub path: String,
pub id: u32, pub id: u32,
unwatch_sender: Sender<u32>, unwatch_sender: Sender<(u32, String)>,
pub receiver: Receiver<String>, pub receiver: Receiver<String>,
} }
impl Drop for XsdWatchHandle { impl Drop for XsdWatchHandle {
fn drop(&mut self) { fn drop(&mut self) {
let _ = self.unwatch_sender.try_send(self.id); let _ = self.unwatch_sender.try_send((self.id, self.path.clone()));
} }
} }
@ -192,17 +193,18 @@ impl XsdClient {
response.parse_bool() response.parse_bool()
} }
pub async fn create_watch(&self) -> Result<XsdWatchHandle> { pub async fn create_watch<P: AsRef<str>>(&self, path: P) -> Result<XsdWatchHandle> {
let (id, receiver, unwatch_sender) = self.socket.add_watch().await?; let (id, receiver, unwatch_sender) = self.socket.add_watch().await?;
Ok(XsdWatchHandle { Ok(XsdWatchHandle {
path: path.as_ref().to_string(),
id, id,
receiver, receiver,
unwatch_sender, unwatch_sender,
}) })
} }
pub async fn bind_watch<P: AsRef<str>>(&self, handle: &XsdWatchHandle, path: P) -> Result<()> { pub async fn bind_watch(&self, handle: &XsdWatchHandle) -> Result<()> {
self.bind_watch_id(handle.id, path).await self.bind_watch_id(handle.id, &handle.path).await
} }
pub async fn bind_watch_id<P: AsRef<str>>(&self, id: u32, path: P) -> Result<()> { pub async fn bind_watch_id<P: AsRef<str>>(&self, id: u32, path: P) -> Result<()> {