krata: event-based network backend startup and api enhancements

This commit is contained in:
Alex Zenla
2024-03-27 02:54:39 +00:00
parent 63c0feb053
commit 66465793cd
29 changed files with 346 additions and 229 deletions

View File

@ -7,11 +7,15 @@ resolver = "2"
[dependencies]
anyhow = { workspace = true }
libc = { workspace = true }
log = { workspace = true }
once_cell = { workspace = true }
prost = { workspace = true }
prost-reflect = { workspace = true }
serde = { workspace = true }
tonic = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
tower = { workspace = true }
url = { workspace = true }
[build-dependencies]

View File

@ -0,0 +1,61 @@
use crate::{dial::ControlDialAddress, v1::control::control_service_client::ControlServiceClient};
#[cfg(not(unix))]
use anyhow::anyhow;
use anyhow::Result;
#[cfg(unix)]
use tokio::net::UnixStream;
#[cfg(unix)]
use tonic::transport::Uri;
use tonic::transport::{Channel, ClientTlsConfig, Endpoint};
#[cfg(unix)]
use tower::service_fn;
pub struct ControlClientProvider {}
impl ControlClientProvider {
pub async fn dial(addr: ControlDialAddress) -> Result<ControlServiceClient<Channel>> {
let channel = match addr {
ControlDialAddress::UnixSocket { path } => {
#[cfg(not(unix))]
return Err(anyhow!(
"unix sockets are not supported on this platform (path {})",
path
));
#[cfg(unix)]
ControlClientProvider::dial_unix_socket(path).await?
}
ControlDialAddress::Tcp { host, port } => {
Endpoint::try_from(format!("http://{}:{}", host, port))?
.connect()
.await?
}
ControlDialAddress::Tls {
host,
port,
insecure: _,
} => {
let tls_config = ClientTlsConfig::new().domain_name(&host);
let address = format!("https://{}:{}", host, port);
Channel::from_shared(address)?
.tls_config(tls_config)?
.connect()
.await?
}
};
Ok(ControlServiceClient::new(channel))
}
#[cfg(unix)]
async fn dial_unix_socket(path: String) -> Result<Channel> {
// This URL is not actually used but is required to be specified.
Ok(Endpoint::try_from(format!("unix://localhost/{}", path))?
.connect_with_connector(service_fn(|uri: Uri| {
let path = uri.path().to_string();
UnixStream::connect(path)
}))
.await?)
}
}

View File

@ -0,0 +1,57 @@
use std::sync::Arc;
use crate::v1::control::{watch_events_reply::Event, WatchEventsReply};
use anyhow::Result;
use log::trace;
use tokio::{sync::broadcast, task::JoinHandle};
use tokio_stream::StreamExt;
use tonic::Streaming;
#[derive(Clone)]
pub struct EventStream {
sender: Arc<broadcast::Sender<Event>>,
task: Arc<JoinHandle<()>>,
}
impl EventStream {
pub async fn open(mut events: Streaming<WatchEventsReply>) -> Result<Self> {
let (sender, _) = broadcast::channel(1000);
let emit = sender.clone();
let task = tokio::task::spawn(async move {
loop {
let Some(result) = events.next().await else {
break;
};
let reply = match result {
Ok(reply) => reply,
Err(error) => {
trace!("event stream processing failed: {}", error);
break;
}
};
let Some(event) = reply.event else {
continue;
};
let _ = emit.send(event);
}
});
Ok(Self {
sender: Arc::new(sender),
task: Arc::new(task),
})
}
pub fn subscribe(&self) -> broadcast::Receiver<Event> {
self.sender.subscribe()
}
}
impl Drop for EventStream {
fn drop(&mut self) {
if Arc::strong_count(&self.task) <= 1 {
self.task.abort();
}
}
}

View File

@ -1,10 +1,13 @@
use once_cell::sync::Lazy;
use prost_reflect::DescriptorPool;
pub mod dial;
pub mod launchcfg;
pub mod v1;
pub mod client;
pub mod dial;
pub mod events;
pub mod launchcfg;
#[cfg(target_os = "linux")]
pub mod ethtool;