mirror of
https://github.com/edera-dev/krata.git
synced 2025-08-05 22:21:31 +00:00
feat: implement request response idm system
This commit is contained in:
@ -6,7 +6,10 @@ use anyhow::Result;
|
|||||||
use cgroups_rs::Cgroup;
|
use cgroups_rs::Cgroup;
|
||||||
use krata::idm::{
|
use krata::idm::{
|
||||||
client::IdmClient,
|
client::IdmClient,
|
||||||
protocol::{idm_event::Event, IdmEvent, IdmExitEvent},
|
protocol::{
|
||||||
|
idm_event::Event, idm_request::Request, idm_response::Response, IdmEvent, IdmExitEvent,
|
||||||
|
IdmPingResponse, IdmRequest,
|
||||||
|
},
|
||||||
};
|
};
|
||||||
use log::debug;
|
use log::debug;
|
||||||
use nix::unistd::Pid;
|
use nix::unistd::Pid;
|
||||||
@ -31,6 +34,7 @@ impl GuestBackground {
|
|||||||
|
|
||||||
pub async fn run(&mut self) -> Result<()> {
|
pub async fn run(&mut self) -> Result<()> {
|
||||||
let mut event_subscription = self.idm.subscribe().await?;
|
let mut event_subscription = self.idm.subscribe().await?;
|
||||||
|
let mut requests_subscription = self.idm.requests().await?;
|
||||||
loop {
|
loop {
|
||||||
select! {
|
select! {
|
||||||
x = event_subscription.recv() => match x {
|
x = event_subscription.recv() => match x {
|
||||||
@ -48,6 +52,21 @@ impl GuestBackground {
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
|
x = requests_subscription.recv() => match x {
|
||||||
|
Ok(request) => {
|
||||||
|
self.handle_idm_request(request).await?;
|
||||||
|
},
|
||||||
|
|
||||||
|
Err(broadcast::error::RecvError::Closed) => {
|
||||||
|
debug!("idm packet channel closed");
|
||||||
|
break;
|
||||||
|
},
|
||||||
|
|
||||||
|
_ => {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
event = self.wait.recv() => match event {
|
event = self.wait.recv() => match event {
|
||||||
Some(event) => self.child_event(event).await?,
|
Some(event) => self.child_event(event).await?,
|
||||||
None => {
|
None => {
|
||||||
@ -59,6 +78,16 @@ impl GuestBackground {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn handle_idm_request(&mut self, packet: IdmRequest) -> Result<()> {
|
||||||
|
let id = packet.id;
|
||||||
|
if let Some(Request::Ping(_)) = packet.request {
|
||||||
|
self.idm
|
||||||
|
.respond(id, Response::Ping(IdmPingResponse {}))
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
async fn child_event(&mut self, event: ChildEvent) -> Result<()> {
|
async fn child_event(&mut self, event: ChildEvent) -> Result<()> {
|
||||||
if event.pid == self.child {
|
if event.pid == self.child {
|
||||||
self.idm
|
self.idm
|
||||||
|
@ -1,8 +1,10 @@
|
|||||||
use std::{path::Path, sync::Arc};
|
use std::{collections::HashMap, path::Path, sync::Arc};
|
||||||
|
|
||||||
use crate::idm::protocol::idm_packet::Content;
|
use crate::idm::protocol::idm_packet::Content;
|
||||||
|
|
||||||
use super::protocol::{IdmEvent, IdmPacket};
|
use super::protocol::{
|
||||||
|
idm_request::Request, idm_response::Response, IdmEvent, IdmPacket, IdmRequest, IdmResponse,
|
||||||
|
};
|
||||||
use anyhow::{anyhow, Result};
|
use anyhow::{anyhow, Result};
|
||||||
use bytes::BytesMut;
|
use bytes::BytesMut;
|
||||||
use log::{debug, error};
|
use log::{debug, error};
|
||||||
@ -15,11 +17,13 @@ use tokio::{
|
|||||||
sync::{
|
sync::{
|
||||||
broadcast,
|
broadcast,
|
||||||
mpsc::{channel, Receiver, Sender},
|
mpsc::{channel, Receiver, Sender},
|
||||||
Mutex,
|
oneshot, Mutex,
|
||||||
},
|
},
|
||||||
task::JoinHandle,
|
task::JoinHandle,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
type RequestMap = Arc<Mutex<HashMap<u64, oneshot::Sender<IdmResponse>>>>;
|
||||||
|
|
||||||
const IDM_PACKET_QUEUE_LEN: usize = 100;
|
const IDM_PACKET_QUEUE_LEN: usize = 100;
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
@ -77,8 +81,11 @@ impl IdmBackend for IdmFileBackend {
|
|||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct IdmClient {
|
pub struct IdmClient {
|
||||||
|
request_backend_sender: broadcast::Sender<IdmRequest>,
|
||||||
|
next_request_id: Arc<Mutex<u64>>,
|
||||||
event_receiver_sender: broadcast::Sender<IdmEvent>,
|
event_receiver_sender: broadcast::Sender<IdmEvent>,
|
||||||
tx_sender: Sender<IdmPacket>,
|
tx_sender: Sender<IdmPacket>,
|
||||||
|
requests: RequestMap,
|
||||||
task: Arc<JoinHandle<()>>,
|
task: Arc<JoinHandle<()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -92,18 +99,31 @@ impl Drop for IdmClient {
|
|||||||
|
|
||||||
impl IdmClient {
|
impl IdmClient {
|
||||||
pub async fn new(backend: Box<dyn IdmBackend>) -> Result<IdmClient> {
|
pub async fn new(backend: Box<dyn IdmBackend>) -> Result<IdmClient> {
|
||||||
|
let requests = Arc::new(Mutex::new(HashMap::new()));
|
||||||
let (event_sender, event_receiver) = broadcast::channel(IDM_PACKET_QUEUE_LEN);
|
let (event_sender, event_receiver) = broadcast::channel(IDM_PACKET_QUEUE_LEN);
|
||||||
|
let (internal_request_backend_sender, _) = broadcast::channel(IDM_PACKET_QUEUE_LEN);
|
||||||
let (tx_sender, tx_receiver) = channel(IDM_PACKET_QUEUE_LEN);
|
let (tx_sender, tx_receiver) = channel(IDM_PACKET_QUEUE_LEN);
|
||||||
let backend_event_sender = event_sender.clone();
|
let backend_event_sender = event_sender.clone();
|
||||||
|
let request_backend_sender = internal_request_backend_sender.clone();
|
||||||
let task = tokio::task::spawn(async move {
|
let task = tokio::task::spawn(async move {
|
||||||
if let Err(error) =
|
if let Err(error) = IdmClient::process(
|
||||||
IdmClient::process(backend, backend_event_sender, event_receiver, tx_receiver).await
|
backend,
|
||||||
|
backend_event_sender,
|
||||||
|
requests,
|
||||||
|
internal_request_backend_sender,
|
||||||
|
event_receiver,
|
||||||
|
tx_receiver,
|
||||||
|
)
|
||||||
|
.await
|
||||||
{
|
{
|
||||||
debug!("failed to handle idm client processing: {}", error);
|
debug!("failed to handle idm client processing: {}", error);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
Ok(IdmClient {
|
Ok(IdmClient {
|
||||||
|
next_request_id: Arc::new(Mutex::new(0)),
|
||||||
event_receiver_sender: event_sender.clone(),
|
event_receiver_sender: event_sender.clone(),
|
||||||
|
request_backend_sender,
|
||||||
|
requests: Arc::new(Mutex::new(HashMap::new())),
|
||||||
tx_sender,
|
tx_sender,
|
||||||
task: Arc::new(task),
|
task: Arc::new(task),
|
||||||
})
|
})
|
||||||
@ -129,13 +149,57 @@ impl IdmClient {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn requests(&self) -> Result<broadcast::Receiver<IdmRequest>> {
|
||||||
|
Ok(self.request_backend_sender.subscribe())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn respond(&self, id: u64, response: Response) -> Result<()> {
|
||||||
|
let packet = IdmPacket {
|
||||||
|
content: Some(Content::Response(IdmResponse {
|
||||||
|
id,
|
||||||
|
response: Some(response),
|
||||||
|
})),
|
||||||
|
};
|
||||||
|
self.tx_sender.send(packet).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn subscribe(&self) -> Result<broadcast::Receiver<IdmEvent>> {
|
pub async fn subscribe(&self) -> Result<broadcast::Receiver<IdmEvent>> {
|
||||||
Ok(self.event_receiver_sender.subscribe())
|
Ok(self.event_receiver_sender.subscribe())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn send(&self, request: Request) -> Result<Response> {
|
||||||
|
let (sender, receiver) = oneshot::channel();
|
||||||
|
let mut requests = self.requests.lock().await;
|
||||||
|
let req = {
|
||||||
|
let mut guard = self.next_request_id.lock().await;
|
||||||
|
let req = *guard;
|
||||||
|
*guard = req.wrapping_add(1);
|
||||||
|
req
|
||||||
|
};
|
||||||
|
requests.insert(req, sender);
|
||||||
|
drop(requests);
|
||||||
|
self.tx_sender
|
||||||
|
.send(IdmPacket {
|
||||||
|
content: Some(Content::Request(IdmRequest {
|
||||||
|
id: req,
|
||||||
|
request: Some(request),
|
||||||
|
})),
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
if let Some(response) = receiver.await?.response {
|
||||||
|
Ok(response)
|
||||||
|
} else {
|
||||||
|
Err(anyhow!("response did not contain any content"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn process(
|
async fn process(
|
||||||
mut backend: Box<dyn IdmBackend>,
|
mut backend: Box<dyn IdmBackend>,
|
||||||
event_sender: broadcast::Sender<IdmEvent>,
|
event_sender: broadcast::Sender<IdmEvent>,
|
||||||
|
requests: RequestMap,
|
||||||
|
request_backend_sender: broadcast::Sender<IdmRequest>,
|
||||||
_event_receiver: broadcast::Receiver<IdmEvent>,
|
_event_receiver: broadcast::Receiver<IdmEvent>,
|
||||||
mut receiver: Receiver<IdmPacket>,
|
mut receiver: Receiver<IdmPacket>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
@ -148,6 +212,18 @@ impl IdmClient {
|
|||||||
let _ = event_sender.send(event);
|
let _ = event_sender.send(event);
|
||||||
},
|
},
|
||||||
|
|
||||||
|
Some(Content::Request(request)) => {
|
||||||
|
let _ = request_backend_sender.send(request);
|
||||||
|
},
|
||||||
|
|
||||||
|
Some(Content::Response(response)) => {
|
||||||
|
let mut requests = requests.lock().await;
|
||||||
|
if let Some(sender) = requests.remove(&response.id) {
|
||||||
|
drop(requests);
|
||||||
|
let _ = sender.send(response);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
_ => {},
|
_ => {},
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
Reference in New Issue
Block a user