From 8616ed7d9b17c58fae0583ccd8c7cc817b51dd5a Mon Sep 17 00:00:00 2001 From: Alex Zenla Date: Sat, 20 Jul 2024 23:22:59 -0700 Subject: [PATCH] chore(code): additional code cleanup --- crates/daemon/src/idm.rs | 125 ++++++++++++++++++++++----------------- 1 file changed, 72 insertions(+), 53 deletions(-) diff --git a/crates/daemon/src/idm.rs b/crates/daemon/src/idm.rs index 767b627..333432c 100644 --- a/crates/daemon/src/idm.rs +++ b/crates/daemon/src/idm.rs @@ -133,52 +133,82 @@ impl DaemonIdm { }) } + async fn process_rx_packet( + &mut self, + domid: u32, + data: Option>, + buffers: &mut HashMap, + ) -> Result<()> { + if let Some(data) = data { + let buffer = buffers.entry(domid).or_insert_with_key(|_| BytesMut::new()); + buffer.extend_from_slice(&data); + loop { + if buffer.len() < 6 { + break; + } + + if buffer[0] != 0xff || buffer[1] != 0xff { + buffer.clear(); + break; + } + + let size = (buffer[2] as u32 | (buffer[3] as u32) << 8 | (buffer[4] as u32) << 16 | (buffer[5] as u32) << 24) as usize; + let needed = size + 6; + if buffer.len() < needed { + break; + } + let mut packet = buffer.split_to(needed); + packet.advance(6); + match IdmTransportPacket::decode(packet) { + Ok(packet) => { + let _ = client_or_create(domid, &self.tx_sender, &self.clients, &self.feeds).await?; + let guard = self.feeds.lock().await; + if let Some(feed) = guard.get(&domid) { + let _ = feed.try_send(packet.clone()); + } + let _ = self.snoop_sender.send(DaemonIdmSnoopPacket { from: domid, to: 0, packet }); + } + + Err(packet) => { + warn!("received invalid packet from domain {}: {}", domid, packet); + } + } + } + } else { + let mut clients = self.clients.lock().await; + let mut feeds = self.feeds.lock().await; + clients.remove(&domid); + feeds.remove(&domid); + } + Ok(()) + } + + async fn tx_packet(&mut self, domid: u32, packet: IdmTransportPacket) -> Result<()> { + let data = packet.encode_to_vec(); + let mut buffer = vec![0u8; 6]; + let length = data.len() as u32; + buffer[0] = 0xff; + buffer[1] = 0xff; + buffer[2] = length as u8; + buffer[3] = (length << 8) as u8; + buffer[4] = (length << 16) as u8; + buffer[5] = (length << 24) as u8; + buffer.extend_from_slice(&data); + self.tx_raw_sender.send((domid, buffer)).await?; + let _ = self.snoop_sender.send(DaemonIdmSnoopPacket { + from: 0, + to: domid, + packet, + }); + Ok(()) + } + async fn process(&mut self, buffers: &mut HashMap) -> Result<()> { loop { select! { x = self.rx_receiver.recv() => match x { Some((domid, data)) => { - if let Some(data) = data { - let buffer = buffers.entry(domid).or_insert_with_key(|_| BytesMut::new()); - buffer.extend_from_slice(&data); - loop { - if buffer.len() < 6 { - break; - } - - if buffer[0] != 0xff || buffer[1] != 0xff { - buffer.clear(); - break; - } - - let size = (buffer[2] as u32 | (buffer[3] as u32) << 8 | (buffer[4] as u32) << 16 | (buffer[5] as u32) << 24) as usize; - let needed = size + 6; - if buffer.len() < needed { - break; - } - let mut packet = buffer.split_to(needed); - packet.advance(6); - match IdmTransportPacket::decode(packet) { - Ok(packet) => { - let _ = client_or_create(domid, &self.tx_sender, &self.clients, &self.feeds).await?; - let guard = self.feeds.lock().await; - if let Some(feed) = guard.get(&domid) { - let _ = feed.try_send(packet.clone()); - } - let _ = self.snoop_sender.send(DaemonIdmSnoopPacket { from: domid, to: 0, packet }); - } - - Err(packet) => { - warn!("received invalid packet from domain {}: {}", domid, packet); - } - } - } - } else { - let mut clients = self.clients.lock().await; - let mut feeds = self.feeds.lock().await; - clients.remove(&domid); - feeds.remove(&domid); - } + self.process_rx_packet(domid, data, buffers).await?; }, None => { @@ -187,18 +217,7 @@ impl DaemonIdm { }, x = self.tx_receiver.recv() => match x { Some((domid, packet)) => { - let data = packet.encode_to_vec(); - let mut buffer = vec![0u8; 6]; - let length = data.len() as u32; - buffer[0] = 0xff; - buffer[1] = 0xff; - buffer[2] = length as u8; - buffer[3] = (length << 8) as u8; - buffer[4] = (length << 16) as u8; - buffer[5] = (length << 24) as u8; - buffer.extend_from_slice(&data); - self.tx_raw_sender.send((domid, buffer)).await?; - let _ = self.snoop_sender.send(DaemonIdmSnoopPacket { from: 0, to: domid, packet }); + self.tx_packet(domid, packet).await?; }, None => {