diff --git a/crates/xen/xenstore/src/bus.rs b/crates/xen/xenstore/src/bus.rs index fc209c6..5a890f5 100644 --- a/crates/xen/xenstore/src/bus.rs +++ b/crates/xen/xenstore/src/bus.rs @@ -1,7 +1,7 @@ use std::{ collections::HashMap, ffi::CString, - io::ErrorKind, + io::Read, os::{ fd::{AsRawFd, FromRawFd, IntoRawFd}, unix::fs::FileTypeExt, @@ -9,11 +9,10 @@ use std::{ sync::Arc, }; -use libc::{fcntl, F_GETFL, F_SETFL, O_NONBLOCK}; -use log::warn; +use log::{debug, warn}; use tokio::{ fs::{metadata, File}, - io::{AsyncReadExt, AsyncWriteExt}, + io::AsyncWriteExt, net::UnixStream, select, sync::{ @@ -64,8 +63,8 @@ pub struct XsdSocket { next_request_id: Arc>, next_watch_id: Arc>, processor_task: Arc>, - rx_task: Arc>, unwatch_sender: Sender<(u32, String)>, + _rx_task: Arc>, } impl XsdSocket { @@ -78,15 +77,10 @@ impl XsdSocket { let file = if socket { let stream = UnixStream::connect(path).await?; let stream = stream.into_std()?; - stream.set_nonblocking(true)?; + stream.set_nonblocking(false)?; unsafe { File::from_raw_fd(stream.into_raw_fd()) } } else { - File::options() - .read(true) - .write(true) - .custom_flags(O_NONBLOCK) - .open(path) - .await? + File::options().read(true).write(true).open(path).await? }; XsdSocket::from_handle(file).await @@ -101,7 +95,7 @@ impl XsdSocket { let (rx_sender, rx_receiver) = channel::(10); let (tx_sender, tx_receiver) = channel::(10); let (unwatch_sender, unwatch_receiver) = channel::<(u32, String)>(1000); - let read: File = handle.try_clone().await?; + let read: std::fs::File = unsafe { std::fs::File::from_raw_fd(handle.as_raw_fd()) }; let mut processor = XsdSocketProcessor { handle, @@ -119,11 +113,13 @@ impl XsdSocket { } }); - let rx_task = tokio::task::spawn(async move { - if let Err(error) = XsdSocketProcessor::process_rx(read, rx_sender).await { - warn!("failed to process xen store responses: {}", error); - } - }); + let rx_task = std::thread::Builder::new() + .name("xenstore-reader".to_string()) + .spawn(move || { + if let Err(error) = XsdSocketProcessor::process_rx(read, rx_sender) { + debug!("failed to process xen store bus: {}", error); + } + })?; Ok(XsdSocket { tx_sender, @@ -132,8 +128,8 @@ impl XsdSocket { next_request_id, next_watch_id: Arc::new(Mutex::new(0u32)), processor_task: Arc::new(processor_task), - rx_task: Arc::new(rx_task), unwatch_sender, + _rx_task: Arc::new(rx_task), }) } @@ -201,80 +197,28 @@ struct XsdSocketProcessor { } impl XsdSocketProcessor { - async fn process_rx(mut read: File, rx_sender: Sender) -> Result<()> { + fn process_rx(mut read: std::fs::File, rx_sender: Sender) -> Result<()> { let mut header_buffer: Vec = vec![0u8; XsdMessageHeader::SIZE]; let mut buffer: Vec = vec![0u8; XEN_BUS_MAX_PACKET_SIZE - XsdMessageHeader::SIZE]; loop { - select! { - message = XsdSocketProcessor::read_message(&mut header_buffer, &mut buffer, &mut read) => { - let message = message?; - rx_sender.send(message).await?; - }, - - _ = rx_sender.closed() => { - break; - } - }; + let message = + XsdSocketProcessor::read_message(&mut header_buffer, &mut buffer, &mut read)?; + rx_sender.blocking_send(message)?; } - Ok(()) } - fn set_nonblocking(fd: i32, nonblock: bool) -> Result<()> { - let mut flags = unsafe { fcntl(fd, F_GETFL) }; - if flags == -1 { - return Err(Error::Io(std::io::Error::new( - ErrorKind::Unsupported, - "failed to get fd flags", - ))); - } - if nonblock { - flags |= O_NONBLOCK; - } else { - flags &= !O_NONBLOCK; - } - let result = unsafe { fcntl(fd, F_SETFL, flags) }; - if result == -1 { - return Err(Error::Io(std::io::Error::new( - ErrorKind::Unsupported, - "failed to set fd flags", - ))); - } - Ok(()) - } - - async fn read_message( + fn read_message( header_buffer: &mut [u8], buffer: &mut [u8], - read: &mut File, + read: &mut std::fs::File, ) -> Result { - XsdSocketProcessor::set_nonblocking(read.as_raw_fd(), true)?; - let header_size = loop { - match read.read_exact(header_buffer).await { - Ok(size) => break size, - Err(error) => { - if error.kind() == ErrorKind::WouldBlock { - tokio::task::yield_now().await; - continue; - } - return Err(error.into()); - } - }; - }; - - if header_size < XsdMessageHeader::SIZE { - return Err(Error::InvalidBusData); - } - + read.read_exact(header_buffer)?; let header = XsdMessageHeader::decode(header_buffer)?; if header.len as usize > buffer.len() { return Err(Error::InvalidBusData); } let payload_buffer = &mut buffer[0..header.len as usize]; - XsdSocketProcessor::set_nonblocking(read.as_raw_fd(), false)?; - let payload_size = read.read_exact(payload_buffer).await?; - if payload_size != header.len as usize { - return Err(Error::InvalidBusData); - } + read.read_exact(payload_buffer)?; Ok(XsdMessage { header, payload: payload_buffer.to_vec(), @@ -392,10 +336,6 @@ impl XsdMessage { impl Drop for XsdSocket { fn drop(&mut self) { - if Arc::strong_count(&self.rx_task) <= 1 { - self.rx_task.abort(); - } - if Arc::strong_count(&self.processor_task) <= 1 { self.processor_task.abort(); }