fix: implement channel destruction propagation

This commit is contained in:
Alex Zenla
2024-04-10 11:14:11 +00:00
parent 0bf39685cf
commit b803af4368
3 changed files with 60 additions and 41 deletions

View File

@ -79,7 +79,7 @@ impl Drop for DaemonConsoleHandle {
pub struct DaemonConsole { pub struct DaemonConsole {
listeners: ListenerMap, listeners: ListenerMap,
buffers: BufferMap, buffers: BufferMap,
receiver: Receiver<(u32, Vec<u8>)>, receiver: Receiver<(u32, Option<Vec<u8>>)>,
sender: Sender<(u32, Vec<u8>)>, sender: Sender<(u32, Vec<u8>)>,
task: JoinHandle<()>, task: JoinHandle<()>,
} }
@ -124,16 +124,22 @@ impl DaemonConsole {
}; };
let mut buffers = self.buffers.lock().await; let mut buffers = self.buffers.lock().await;
let buffer = buffers if let Some(data) = data {
.entry(domid) let buffer = buffers
.or_insert_with_key(|_| RawConsoleBuffer::boxed()); .entry(domid)
buffer.extend_from_slice(&data); .or_insert_with_key(|_| RawConsoleBuffer::boxed());
drop(buffers); buffer.extend_from_slice(&data);
let mut listeners = self.listeners.lock().await; drop(buffers);
if let Some(senders) = listeners.get_mut(&domid) { let mut listeners = self.listeners.lock().await;
senders.retain(|sender| { if let Some(senders) = listeners.get_mut(&domid) {
!matches!(sender.try_send(data.to_vec()), Err(TrySendError::Closed(_))) senders.retain(|sender| {
}); !matches!(sender.try_send(data.to_vec()), Err(TrySendError::Closed(_)))
});
}
} else {
buffers.remove(&domid);
let mut listeners = self.listeners.lock().await;
listeners.remove(&domid);
} }
} }
Ok(()) Ok(())

View File

@ -52,7 +52,7 @@ pub struct DaemonIdm {
tx_sender: Sender<(u32, IdmPacket)>, tx_sender: Sender<(u32, IdmPacket)>,
tx_raw_sender: Sender<(u32, Vec<u8>)>, tx_raw_sender: Sender<(u32, Vec<u8>)>,
tx_receiver: Receiver<(u32, IdmPacket)>, tx_receiver: Receiver<(u32, IdmPacket)>,
rx_receiver: Receiver<(u32, Vec<u8>)>, rx_receiver: Receiver<(u32, Option<Vec<u8>>)>,
task: JoinHandle<()>, task: JoinHandle<()>,
} }
@ -98,29 +98,37 @@ impl DaemonIdm {
select! { select! {
x = self.rx_receiver.recv() => match x { x = self.rx_receiver.recv() => match x {
Some((domid, data)) => { Some((domid, data)) => {
let buffer = buffers.entry(domid).or_insert_with_key(|_| BytesMut::new()); if let Some(data) = data {
buffer.extend_from_slice(&data); let buffer = buffers.entry(domid).or_insert_with_key(|_| BytesMut::new());
if buffer.len() < 2 { buffer.extend_from_slice(&data);
continue; if buffer.len() < 2 {
} continue;
let size = (buffer[0] as u16 | (buffer[1] as u16) << 8) as usize; }
let needed = size + 2; let size = (buffer[0] as u16 | (buffer[1] as u16) << 8) as usize;
if buffer.len() < needed { let needed = size + 2;
continue; if buffer.len() < needed {
} continue;
let mut packet = buffer.split_to(needed); }
packet.advance(2); let mut packet = buffer.split_to(needed);
match IdmPacket::decode(packet) { packet.advance(2);
Ok(packet) => { match IdmPacket::decode(packet) {
let guard = self.feeds.lock().await; Ok(packet) => {
if let Some(feed) = guard.get(&domid) { let _ = client_or_create(domid, &self.tx_sender, &self.clients, &self.feeds).await?;
let _ = feed.try_send(packet); let guard = self.feeds.lock().await;
if let Some(feed) = guard.get(&domid) {
let _ = feed.try_send(packet);
}
}
Err(packet) => {
warn!("received invalid packet from domain {}: {}", domid, packet);
} }
} }
} else {
Err(packet) => { let mut clients = self.clients.lock().await;
warn!("received invalid packet from domain {}: {}", domid, packet); let mut feeds = self.feeds.lock().await;
} clients.remove(&domid);
feeds.remove(&domid);
} }
}, },

View File

@ -48,7 +48,7 @@ pub struct ChannelService {
gnttab: GrantTab, gnttab: GrantTab,
input_receiver: Receiver<(u32, Vec<u8>)>, input_receiver: Receiver<(u32, Vec<u8>)>,
pub input_sender: Sender<(u32, Vec<u8>)>, pub input_sender: Sender<(u32, Vec<u8>)>,
output_sender: Sender<(u32, Vec<u8>)>, output_sender: Sender<(u32, Option<Vec<u8>>)>,
} }
impl ChannelService { impl ChannelService {
@ -58,7 +58,7 @@ impl ChannelService {
) -> Result<( ) -> Result<(
ChannelService, ChannelService,
Sender<(u32, Vec<u8>)>, Sender<(u32, Vec<u8>)>,
Receiver<(u32, Vec<u8>)>, Receiver<(u32, Option<Vec<u8>>)>,
)> { )> {
let (input_sender, input_receiver) = channel(GROUPED_CHANNEL_QUEUE_LEN); let (input_sender, input_receiver) = channel(GROUPED_CHANNEL_QUEUE_LEN);
let (output_sender, output_receiver) = channel(GROUPED_CHANNEL_QUEUE_LEN); let (output_sender, output_receiver) = channel(GROUPED_CHANNEL_QUEUE_LEN);
@ -203,12 +203,14 @@ pub struct ChannelBackend {
pub domid: u32, pub domid: u32,
pub id: u32, pub id: u32,
pub sender: Sender<Vec<u8>>, pub sender: Sender<Vec<u8>>,
raw_sender: Sender<(u32, Option<Vec<u8>>)>,
task: JoinHandle<()>, task: JoinHandle<()>,
} }
impl Drop for ChannelBackend { impl Drop for ChannelBackend {
fn drop(&mut self) { fn drop(&mut self) {
self.task.abort(); self.task.abort();
let _ = self.raw_sender.try_send((self.domid, None));
debug!( debug!(
"destroyed channel backend for domain {} channel {}", "destroyed channel backend for domain {} channel {}",
self.domid, self.id self.domid, self.id
@ -226,7 +228,7 @@ impl ChannelBackend {
store: XsdClient, store: XsdClient,
evtchn: EventChannel, evtchn: EventChannel,
gnttab: GrantTab, gnttab: GrantTab,
output_sender: Sender<(u32, Vec<u8>)>, output_sender: Sender<(u32, Option<Vec<u8>>)>,
use_reserved_ref: Option<u64>, use_reserved_ref: Option<u64>,
) -> Result<ChannelBackend> { ) -> Result<ChannelBackend> {
let processor = KrataChannelBackendProcessor { let processor = KrataChannelBackendProcessor {
@ -242,11 +244,14 @@ impl ChannelBackend {
let (input_sender, input_receiver) = channel(SINGLE_CHANNEL_QUEUE_LEN); let (input_sender, input_receiver) = channel(SINGLE_CHANNEL_QUEUE_LEN);
let task = processor.launch(output_sender, input_receiver).await?; let task = processor
.launch(output_sender.clone(), input_receiver)
.await?;
Ok(ChannelBackend { Ok(ChannelBackend {
domid, domid,
id, id,
task, task,
raw_sender: output_sender,
sender: input_sender, sender: input_sender,
}) })
} }
@ -304,7 +309,7 @@ impl KrataChannelBackendProcessor {
async fn launch( async fn launch(
&self, &self,
output_sender: Sender<(u32, Vec<u8>)>, output_sender: Sender<(u32, Option<Vec<u8>>)>,
input_receiver: Receiver<Vec<u8>>, input_receiver: Receiver<Vec<u8>>,
) -> Result<JoinHandle<()>> { ) -> Result<JoinHandle<()>> {
let owned = self.clone(); let owned = self.clone();
@ -321,7 +326,7 @@ impl KrataChannelBackendProcessor {
async fn processor( async fn processor(
&self, &self,
sender: Sender<(u32, Vec<u8>)>, sender: Sender<(u32, Option<Vec<u8>>)>,
mut receiver: Receiver<Vec<u8>>, mut receiver: Receiver<Vec<u8>>,
) -> Result<()> { ) -> Result<()> {
self.init().await?; self.init().await?;
@ -396,7 +401,7 @@ impl KrataChannelBackendProcessor {
unsafe { unsafe {
let buffer = self.read_output_buffer(channel.local_port, &memory).await?; let buffer = self.read_output_buffer(channel.local_port, &memory).await?;
if !buffer.is_empty() { if !buffer.is_empty() {
sender.send((self.domid, buffer)).await?; sender.send((self.domid, Some(buffer))).await?;
} }
}; };
@ -466,7 +471,7 @@ impl KrataChannelBackendProcessor {
unsafe { unsafe {
let buffer = self.read_output_buffer(channel.local_port, &memory).await?; let buffer = self.read_output_buffer(channel.local_port, &memory).await?;
if !buffer.is_empty() { if !buffer.is_empty() {
sender.send((self.domid, buffer)).await?; sender.send((self.domid, Some(buffer))).await?;
} }
}; };
channel.unmask_sender.send(channel.local_port).await?; channel.unmask_sender.send(channel.local_port).await?;