krata: implement event stream retries

This commit is contained in:
Alex Zenla
2024-03-31 01:11:50 +00:00
parent 6d6bdade87
commit 15d5ed5a45
8 changed files with 103 additions and 72 deletions

View File

@ -1,11 +1,14 @@
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use crate::v1::control::{watch_events_reply::Event, WatchEventsReply};
use crate::v1::control::{
control_service_client::ControlServiceClient, watch_events_reply::Event, WatchEventsReply,
WatchEventsRequest,
};
use anyhow::Result;
use log::trace;
use tokio::{sync::broadcast, task::JoinHandle};
use log::{error, trace, warn};
use tokio::{sync::broadcast, task::JoinHandle, time::sleep};
use tokio_stream::StreamExt;
use tonic::Streaming;
use tonic::{transport::Channel, Streaming};
#[derive(Clone)]
pub struct EventStream {
@ -14,27 +17,12 @@ pub struct EventStream {
}
impl EventStream {
pub async fn open(mut events: Streaming<WatchEventsReply>) -> Result<Self> {
pub async fn open(client: ControlServiceClient<Channel>) -> Result<Self> {
let (sender, _) = broadcast::channel(1000);
let emit = sender.clone();
let task = tokio::task::spawn(async move {
loop {
let Some(result) = events.next().await else {
break;
};
let reply = match result {
Ok(reply) => reply,
Err(error) => {
trace!("event stream processing failed: {}", error);
break;
}
};
let Some(event) = reply.event else {
continue;
};
let _ = emit.send(event);
if let Err(error) = EventStream::process(client, emit).await {
error!("failed to process event stream: {}", error);
}
});
Ok(Self {
@ -43,6 +31,48 @@ impl EventStream {
})
}
async fn process(
mut client: ControlServiceClient<Channel>,
emit: broadcast::Sender<Event>,
) -> Result<()> {
let mut events: Option<Streaming<WatchEventsReply>> = None;
loop {
let mut stream = match events {
Some(stream) => stream,
None => {
let result = client.watch_events(WatchEventsRequest {}).await;
if let Err(error) = result {
warn!("failed to watch events: {}", error);
sleep(Duration::from_secs(1)).await;
continue;
}
result.unwrap().into_inner()
}
};
let Some(result) = stream.next().await else {
events = None;
continue;
};
let reply = match result {
Ok(reply) => reply,
Err(error) => {
trace!("event stream processing failed: {}", error);
events = None;
continue;
}
};
let Some(event) = reply.event else {
events = Some(stream);
continue;
};
let _ = emit.send(event);
events = Some(stream);
}
}
pub fn subscribe(&self) -> broadcast::Receiver<Event> {
self.sender.subscribe()
}