controller: implement support for dialing urls for transport

This commit is contained in:
Alex Zenla
2024-03-05 20:47:53 -08:00
parent e5f5f9493c
commit 31cf3044a4
5 changed files with 47 additions and 6 deletions

View File

@ -4,6 +4,7 @@ use anyhow::{anyhow, Result};
use krata::{
control::{Message, Request, RequestBox, Response},
stream::{ConnectionStreams, StreamContext},
KRATA_DEFAULT_TCP_PORT, KRATA_DEFAULT_TLS_PORT,
};
use log::{trace, warn};
use tokio::{
@ -16,8 +17,9 @@ use tokio::{
},
task::JoinHandle,
};
use tokio_native_tls::TlsStream;
use tokio_native_tls::{native_tls::TlsConnector, TlsStream};
use tokio_stream::{wrappers::LinesStream, StreamExt};
use url::{Host, Url};
const QUEUE_MAX_LEN: usize = 100;
@ -106,6 +108,42 @@ impl KrataClientTransport {
transport_new!(from_tcp, TcpStream, process_tcp_stream);
transport_new!(from_tls_tcp, TlsStream<TcpStream>, process_tls_tcp_stream);
pub async fn dial(url: Url) -> Result<KrataClientTransport> {
match url.scheme() {
"unix" => {
let stream = UnixStream::connect(url.path()).await?;
Ok(KrataClientTransport::from_unix(stream).await?)
}
"tcp" => {
let address = format!(
"{}:{}",
url.host().unwrap_or(Host::Domain("localhost")),
url.port().unwrap_or(KRATA_DEFAULT_TCP_PORT)
);
let stream = TcpStream::connect(address).await?;
Ok(KrataClientTransport::from_tcp(stream).await?)
}
"tls" | "tls-insecure" => {
let insecure = url.scheme() == "tls-insecure";
let host = format!("{}", url.host().unwrap_or(Host::Domain("localhost")));
let address = format!("{}:{}", host, url.port().unwrap_or(KRATA_DEFAULT_TLS_PORT));
let stream = TcpStream::connect(address).await?;
let mut connector = TlsConnector::builder();
if insecure {
connector.danger_accept_invalid_certs(true);
}
let connector = connector.build()?;
let connector = tokio_native_tls::TlsConnector::from(connector);
let stream = connector.connect(&host, stream).await?;
Ok(KrataClientTransport::from_tls_tcp(stream).await?)
}
_ => Err(anyhow!("unsupported url scheme: {}", url.scheme())),
}
}
transport_processor!(process_unix_stream, UnixStream);
transport_processor!(process_tcp_stream, TcpStream);
transport_processor!(process_tls_tcp_stream, TlsStream<TcpStream>);