mirror of
https://github.com/edera-dev/krata.git
synced 2025-08-03 05:10:55 +00:00
krata: improvements to event handling during reconciliation
This commit is contained in:
@ -11,9 +11,8 @@ use krata::{
|
||||
WatchEventsRequest,
|
||||
},
|
||||
};
|
||||
use kratactl::{client::ControlClientProvider, console::StdioConsoleStream};
|
||||
use kratactl::{client::ControlClientProvider, console::StdioConsoleStream, events::EventStream};
|
||||
use log::error;
|
||||
use tokio_stream::StreamExt;
|
||||
use tonic::Request;
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
@ -62,10 +61,13 @@ async fn main() -> Result<()> {
|
||||
|
||||
let args = ControllerArgs::parse();
|
||||
let mut client = ControlClientProvider::dial(args.connection.parse()?).await?;
|
||||
let mut events = client
|
||||
.watch_events(WatchEventsRequest {})
|
||||
.await?
|
||||
.into_inner();
|
||||
let events = EventStream::open(
|
||||
client
|
||||
.watch_events(WatchEventsRequest {})
|
||||
.await?
|
||||
.into_inner(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
match args.command {
|
||||
Commands::Launch {
|
||||
@ -95,41 +97,7 @@ async fn main() -> Result<()> {
|
||||
.into_inner();
|
||||
let id = response.guest_id;
|
||||
if attach {
|
||||
while let Some(event) = events.next().await {
|
||||
let reply = event?;
|
||||
match reply.event {
|
||||
Some(Event::GuestChanged(changed)) => {
|
||||
let Some(guest) = changed.guest else {
|
||||
continue;
|
||||
};
|
||||
|
||||
if guest.id != id {
|
||||
continue;
|
||||
}
|
||||
|
||||
let Some(state) = guest.state else {
|
||||
continue;
|
||||
};
|
||||
|
||||
if let Some(ref error) = state.error_info {
|
||||
error!("guest error: {}", error.message);
|
||||
}
|
||||
|
||||
if state.status() == GuestStatus::Destroyed {
|
||||
error!("guest destroyed");
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
||||
if state.status() == GuestStatus::Started {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
None => {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
wait_guest_started(&id, events.clone()).await?;
|
||||
let input = StdioConsoleStream::stdin_stream(id.clone()).await;
|
||||
let output = client.console_data(input).await?.into_inner();
|
||||
let exit_hook_task =
|
||||
@ -208,19 +176,17 @@ async fn main() -> Result<()> {
|
||||
}
|
||||
|
||||
Commands::Watch {} => {
|
||||
let response = client
|
||||
.watch_events(Request::new(WatchEventsRequest {}))
|
||||
.await?;
|
||||
let mut stream = response.into_inner();
|
||||
while let Some(reply) = stream.message().await? {
|
||||
let Some(event) = reply.event else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let mut stream = events.subscribe();
|
||||
loop {
|
||||
let event = stream.recv().await?;
|
||||
match event {
|
||||
Event::GuestChanged(changed) => {
|
||||
if let Some(guest) = changed.guest {
|
||||
println!("event=guest.changed guest={}", guest.id);
|
||||
println!(
|
||||
"event=guest.changed guest={} status={}",
|
||||
guest.id,
|
||||
guest_status_text(guest.state.unwrap_or_default().status())
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -232,7 +198,7 @@ async fn main() -> Result<()> {
|
||||
|
||||
fn guest_status_text(status: GuestStatus) -> String {
|
||||
match status {
|
||||
GuestStatus::Unknown => "unknown",
|
||||
GuestStatus::Destroy => "destroying",
|
||||
GuestStatus::Destroyed => "destroyed",
|
||||
GuestStatus::Start => "starting",
|
||||
GuestStatus::Exited => "exited",
|
||||
@ -254,3 +220,38 @@ fn guest_state_text(state: GuestState) -> String {
|
||||
}
|
||||
text
|
||||
}
|
||||
|
||||
async fn wait_guest_started(id: &str, events: EventStream) -> Result<()> {
|
||||
let mut stream = events.subscribe();
|
||||
while let Ok(event) = stream.recv().await {
|
||||
match event {
|
||||
Event::GuestChanged(changed) => {
|
||||
let Some(guest) = changed.guest else {
|
||||
continue;
|
||||
};
|
||||
|
||||
if guest.id != id {
|
||||
continue;
|
||||
}
|
||||
|
||||
let Some(state) = guest.state else {
|
||||
continue;
|
||||
};
|
||||
|
||||
if let Some(ref error) = state.error_info {
|
||||
error!("guest error: {}", error.message);
|
||||
}
|
||||
|
||||
if state.status() == GuestStatus::Destroyed {
|
||||
error!("guest destroyed");
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
||||
if state.status() == GuestStatus::Started {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
@ -7,9 +7,9 @@ use anyhow::Result;
|
||||
use async_stream::stream;
|
||||
use krata::{
|
||||
common::GuestStatus,
|
||||
control::{watch_events_reply::Event, ConsoleDataReply, ConsoleDataRequest, WatchEventsReply},
|
||||
control::{watch_events_reply::Event, ConsoleDataReply, ConsoleDataRequest},
|
||||
};
|
||||
use log::{debug, error, warn};
|
||||
use log::{debug, warn};
|
||||
use termion::raw::IntoRawMode;
|
||||
use tokio::{
|
||||
fs::File,
|
||||
@ -19,6 +19,8 @@ use tokio::{
|
||||
use tokio_stream::{Stream, StreamExt};
|
||||
use tonic::Streaming;
|
||||
|
||||
use crate::events::EventStream;
|
||||
|
||||
pub struct StdioConsoleStream;
|
||||
|
||||
impl StdioConsoleStream {
|
||||
@ -59,46 +61,31 @@ impl StdioConsoleStream {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn guest_exit_hook(
|
||||
id: String,
|
||||
mut events: Streaming<WatchEventsReply>,
|
||||
) -> Result<JoinHandle<()>> {
|
||||
pub async fn guest_exit_hook(id: String, events: EventStream) -> Result<JoinHandle<()>> {
|
||||
Ok(tokio::task::spawn(async move {
|
||||
while let Some(result) = events.next().await {
|
||||
match result {
|
||||
Err(error) => {
|
||||
error!("failed to handle events for exit hook: {}", error);
|
||||
break;
|
||||
}
|
||||
|
||||
Ok(reply) => {
|
||||
let Some(event) = reply.event else {
|
||||
let mut stream = events.subscribe();
|
||||
while let Ok(event) = stream.recv().await {
|
||||
match event {
|
||||
Event::GuestChanged(changed) => {
|
||||
let Some(guest) = changed.guest else {
|
||||
continue;
|
||||
};
|
||||
|
||||
match event {
|
||||
Event::GuestChanged(changed) => {
|
||||
let Some(guest) = changed.guest else {
|
||||
continue;
|
||||
};
|
||||
let Some(state) = guest.state else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let Some(state) = guest.state else {
|
||||
continue;
|
||||
};
|
||||
if guest.id != id {
|
||||
continue;
|
||||
}
|
||||
|
||||
if guest.id != id {
|
||||
continue;
|
||||
}
|
||||
if let Some(exit_info) = state.exit_info {
|
||||
std::process::exit(exit_info.code);
|
||||
}
|
||||
|
||||
if let Some(exit_info) = state.exit_info {
|
||||
std::process::exit(exit_info.code);
|
||||
}
|
||||
|
||||
if state.status() == GuestStatus::Destroyed {
|
||||
warn!("attached guest was destroyed");
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
if state.status() == GuestStatus::Destroy {
|
||||
warn!("attached guest was destroyed");
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
57
crates/kratactl/src/events.rs
Normal file
57
crates/kratactl/src/events.rs
Normal file
@ -0,0 +1,57 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Result;
|
||||
use krata::control::{watch_events_reply::Event, WatchEventsReply};
|
||||
use log::trace;
|
||||
use tokio::{sync::broadcast, task::JoinHandle};
|
||||
use tokio_stream::StreamExt;
|
||||
use tonic::Streaming;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct EventStream {
|
||||
sender: Arc<broadcast::Sender<Event>>,
|
||||
task: Arc<JoinHandle<()>>,
|
||||
}
|
||||
|
||||
impl EventStream {
|
||||
pub async fn open(mut events: Streaming<WatchEventsReply>) -> Result<Self> {
|
||||
let (sender, _) = broadcast::channel(1000);
|
||||
let emit = sender.clone();
|
||||
let task = tokio::task::spawn(async move {
|
||||
loop {
|
||||
let Some(result) = events.next().await else {
|
||||
break;
|
||||
};
|
||||
|
||||
let reply = match result {
|
||||
Ok(reply) => reply,
|
||||
Err(error) => {
|
||||
trace!("event stream processing failed: {}", error);
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
let Some(event) = reply.event else {
|
||||
continue;
|
||||
};
|
||||
let _ = emit.send(event);
|
||||
}
|
||||
});
|
||||
Ok(Self {
|
||||
sender: Arc::new(sender),
|
||||
task: Arc::new(task),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn subscribe(&self) -> broadcast::Receiver<Event> {
|
||||
self.sender.subscribe()
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for EventStream {
|
||||
fn drop(&mut self) {
|
||||
if Arc::strong_count(&self.task) <= 1 {
|
||||
self.task.abort();
|
||||
}
|
||||
}
|
||||
}
|
@ -1,2 +1,3 @@
|
||||
pub mod client;
|
||||
pub mod console;
|
||||
pub mod events;
|
||||
|
Reference in New Issue
Block a user