mirror of
https://github.com/edera-dev/krata.git
synced 2025-08-02 21:00:55 +00:00
chore(code): additional code cleanup
This commit is contained in:
parent
2a4802a75d
commit
8616ed7d9b
@ -133,52 +133,82 @@ impl DaemonIdm {
|
||||
})
|
||||
}
|
||||
|
||||
async fn process_rx_packet(
|
||||
&mut self,
|
||||
domid: u32,
|
||||
data: Option<Vec<u8>>,
|
||||
buffers: &mut HashMap<u32, BytesMut>,
|
||||
) -> Result<()> {
|
||||
if let Some(data) = data {
|
||||
let buffer = buffers.entry(domid).or_insert_with_key(|_| BytesMut::new());
|
||||
buffer.extend_from_slice(&data);
|
||||
loop {
|
||||
if buffer.len() < 6 {
|
||||
break;
|
||||
}
|
||||
|
||||
if buffer[0] != 0xff || buffer[1] != 0xff {
|
||||
buffer.clear();
|
||||
break;
|
||||
}
|
||||
|
||||
let size = (buffer[2] as u32 | (buffer[3] as u32) << 8 | (buffer[4] as u32) << 16 | (buffer[5] as u32) << 24) as usize;
|
||||
let needed = size + 6;
|
||||
if buffer.len() < needed {
|
||||
break;
|
||||
}
|
||||
let mut packet = buffer.split_to(needed);
|
||||
packet.advance(6);
|
||||
match IdmTransportPacket::decode(packet) {
|
||||
Ok(packet) => {
|
||||
let _ = client_or_create(domid, &self.tx_sender, &self.clients, &self.feeds).await?;
|
||||
let guard = self.feeds.lock().await;
|
||||
if let Some(feed) = guard.get(&domid) {
|
||||
let _ = feed.try_send(packet.clone());
|
||||
}
|
||||
let _ = self.snoop_sender.send(DaemonIdmSnoopPacket { from: domid, to: 0, packet });
|
||||
}
|
||||
|
||||
Err(packet) => {
|
||||
warn!("received invalid packet from domain {}: {}", domid, packet);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
let mut clients = self.clients.lock().await;
|
||||
let mut feeds = self.feeds.lock().await;
|
||||
clients.remove(&domid);
|
||||
feeds.remove(&domid);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn tx_packet(&mut self, domid: u32, packet: IdmTransportPacket) -> Result<()> {
|
||||
let data = packet.encode_to_vec();
|
||||
let mut buffer = vec![0u8; 6];
|
||||
let length = data.len() as u32;
|
||||
buffer[0] = 0xff;
|
||||
buffer[1] = 0xff;
|
||||
buffer[2] = length as u8;
|
||||
buffer[3] = (length << 8) as u8;
|
||||
buffer[4] = (length << 16) as u8;
|
||||
buffer[5] = (length << 24) as u8;
|
||||
buffer.extend_from_slice(&data);
|
||||
self.tx_raw_sender.send((domid, buffer)).await?;
|
||||
let _ = self.snoop_sender.send(DaemonIdmSnoopPacket {
|
||||
from: 0,
|
||||
to: domid,
|
||||
packet,
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn process(&mut self, buffers: &mut HashMap<u32, BytesMut>) -> Result<()> {
|
||||
loop {
|
||||
select! {
|
||||
x = self.rx_receiver.recv() => match x {
|
||||
Some((domid, data)) => {
|
||||
if let Some(data) = data {
|
||||
let buffer = buffers.entry(domid).or_insert_with_key(|_| BytesMut::new());
|
||||
buffer.extend_from_slice(&data);
|
||||
loop {
|
||||
if buffer.len() < 6 {
|
||||
break;
|
||||
}
|
||||
|
||||
if buffer[0] != 0xff || buffer[1] != 0xff {
|
||||
buffer.clear();
|
||||
break;
|
||||
}
|
||||
|
||||
let size = (buffer[2] as u32 | (buffer[3] as u32) << 8 | (buffer[4] as u32) << 16 | (buffer[5] as u32) << 24) as usize;
|
||||
let needed = size + 6;
|
||||
if buffer.len() < needed {
|
||||
break;
|
||||
}
|
||||
let mut packet = buffer.split_to(needed);
|
||||
packet.advance(6);
|
||||
match IdmTransportPacket::decode(packet) {
|
||||
Ok(packet) => {
|
||||
let _ = client_or_create(domid, &self.tx_sender, &self.clients, &self.feeds).await?;
|
||||
let guard = self.feeds.lock().await;
|
||||
if let Some(feed) = guard.get(&domid) {
|
||||
let _ = feed.try_send(packet.clone());
|
||||
}
|
||||
let _ = self.snoop_sender.send(DaemonIdmSnoopPacket { from: domid, to: 0, packet });
|
||||
}
|
||||
|
||||
Err(packet) => {
|
||||
warn!("received invalid packet from domain {}: {}", domid, packet);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
let mut clients = self.clients.lock().await;
|
||||
let mut feeds = self.feeds.lock().await;
|
||||
clients.remove(&domid);
|
||||
feeds.remove(&domid);
|
||||
}
|
||||
self.process_rx_packet(domid, data, buffers).await?;
|
||||
},
|
||||
|
||||
None => {
|
||||
@ -187,18 +217,7 @@ impl DaemonIdm {
|
||||
},
|
||||
x = self.tx_receiver.recv() => match x {
|
||||
Some((domid, packet)) => {
|
||||
let data = packet.encode_to_vec();
|
||||
let mut buffer = vec![0u8; 6];
|
||||
let length = data.len() as u32;
|
||||
buffer[0] = 0xff;
|
||||
buffer[1] = 0xff;
|
||||
buffer[2] = length as u8;
|
||||
buffer[3] = (length << 8) as u8;
|
||||
buffer[4] = (length << 16) as u8;
|
||||
buffer[5] = (length << 24) as u8;
|
||||
buffer.extend_from_slice(&data);
|
||||
self.tx_raw_sender.send((domid, buffer)).await?;
|
||||
let _ = self.snoop_sender.send(DaemonIdmSnoopPacket { from: 0, to: domid, packet });
|
||||
self.tx_packet(domid, packet).await?;
|
||||
},
|
||||
|
||||
None => {
|
||||
|
Loading…
Reference in New Issue
Block a user