From da75a51c6e8c1cc95745d989279d94d31507063f Mon Sep 17 00:00:00 2001 From: Alex Zenla Date: Tue, 5 Mar 2024 19:36:44 -0800 Subject: [PATCH] controller: internal client support for TCP and TLS connections --- Cargo.toml | 1 + controller/Cargo.toml | 1 + controller/bin/control.rs | 2 +- controller/src/client.rs | 134 +++++++++++++++++++++----------------- 4 files changed, 78 insertions(+), 60 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e813f4a..b6503ae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,6 +58,7 @@ path-absolutize = "3.1.1" tokio-tun = "0.11.2" tokio-listener = "0.3.1" trait-variant = "0.1.1" +tokio-native-tls = "0.3.1" signal-hook = "0.3.17" [workspace.dependencies.uuid] diff --git a/controller/Cargo.toml b/controller/Cargo.toml index 65221bf..a672fc4 100644 --- a/controller/Cargo.toml +++ b/controller/Cargo.toml @@ -15,6 +15,7 @@ cli-tables = { workspace = true } clap = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true } +tokio-native-tls = { workspace = true } [dependencies.krata] path = "../shared" diff --git a/controller/bin/control.rs b/controller/bin/control.rs index 31be9d4..f617200 100644 --- a/controller/bin/control.rs +++ b/controller/bin/control.rs @@ -54,7 +54,7 @@ async fn main() -> Result<()> { let args = ControllerArgs::parse(); let stream = UnixStream::connect(&args.connection).await?; - let transport = KrataClientTransport::new(stream).await?; + let transport = KrataClientTransport::from_unix(stream).await?; let client = KrataClient::new(transport).await?; match args.command { diff --git a/controller/src/client.rs b/controller/src/client.rs index 021a203..ef031f3 100644 --- a/controller/src/client.rs +++ b/controller/src/client.rs @@ -8,7 +8,7 @@ use krata::{ use log::{trace, warn}; use tokio::{ io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, - net::{unix, UnixStream}, + net::{TcpStream, UnixStream}, select, sync::{ mpsc::{channel, Receiver, Sender}, @@ -16,6 +16,7 @@ use tokio::{ }, task::JoinHandle, }; +use tokio_native_tls::TlsStream; use tokio_stream::{wrappers::LinesStream, StreamExt}; const QUEUE_MAX_LEN: usize = 100; @@ -32,67 +33,82 @@ impl Drop for KrataClientTransport { } } -impl KrataClientTransport { - pub async fn new(stream: UnixStream) -> Result { - let (read, write) = stream.into_split(); - let (tx_sender, tx_receiver) = channel::(QUEUE_MAX_LEN); - let (rx_sender, rx_receiver) = channel::(QUEUE_MAX_LEN); +macro_rules! transport_new { + ($name:ident, $stream:ty, $processor:ident) => { + pub async fn $name(stream: $stream) -> Result { + let (tx_sender, tx_receiver) = channel::(QUEUE_MAX_LEN); + let (rx_sender, rx_receiver) = channel::(QUEUE_MAX_LEN); - let task = tokio::task::spawn(async move { - if let Err(error) = - KrataClientTransport::process_unix_stream(read, write, rx_sender, tx_receiver).await - { - warn!("failed to process krata transport messages: {}", error); - } - }); - - Ok(Self { - sender: tx_sender, - receiver: rx_receiver, - task, - }) - } - - async fn process_unix_stream( - read: unix::OwnedReadHalf, - mut write: unix::OwnedWriteHalf, - rx_sender: Sender, - mut tx_receiver: Receiver, - ) -> Result<()> { - let mut read = LinesStream::new(BufReader::new(read).lines()); - loop { - select! { - x = tx_receiver.recv() => match x { - Some(message) => { - let mut line = serde_json::to_string(&message)?; - trace!("sending line '{}'", line); - line.push('\n'); - write.write_all(line.as_bytes()).await?; - }, - - None => { - break; - } - }, - - x = read.next() => match x { - Some(Ok(line)) => { - let message = serde_json::from_str::(&line)?; - rx_sender.send(message).await?; - }, - - Some(Err(error)) => { - return Err(error.into()); - }, - - None => { - break; - } + let task = tokio::task::spawn(async move { + if let Err(error) = + KrataClientTransport::$processor(stream, rx_sender, tx_receiver).await + { + warn!("failed to process krata transport messages: {}", error); } - }; + }); + + Ok(Self { + sender: tx_sender, + receiver: rx_receiver, + task, + }) } - Ok(()) - } + }; +} + +macro_rules! transport_processor { + ($name:ident, $stream:ty) => { + async fn $name( + stream: $stream, + rx_sender: Sender, + mut tx_receiver: Receiver, + ) -> Result<()> { + let (read, mut write) = tokio::io::split(stream); + let mut read = LinesStream::new(BufReader::new(read).lines()); + loop { + select! { + x = tx_receiver.recv() => match x { + Some(message) => { + let mut line = serde_json::to_string(&message)?; + trace!("sending line '{}'", line); + line.push('\n'); + write.write_all(line.as_bytes()).await?; + }, + + None => { + break; + } + }, + + x = read.next() => match x { + Some(Ok(line)) => { + let message = serde_json::from_str::(&line)?; + rx_sender.send(message).await?; + }, + + Some(Err(error)) => { + return Err(error.into()); + }, + + None => { + break; + } + } + }; + } + Ok(()) + } + }; +} + +impl KrataClientTransport { + transport_new!(from_unix, UnixStream, process_unix_stream); + transport_new!(from_tcp, TcpStream, process_tcp_stream); + transport_new!(from_tls_tcp, TlsStream, process_tls_tcp_stream); + + transport_processor!(process_unix_stream, UnixStream); + transport_processor!(process_tcp_stream, TcpStream); + transport_processor!(process_tls_tcp_stream, TlsStream); } type RequestsMap = Arc>>>;