mirror of
https://github.com/edera-dev/krata.git
synced 2025-08-03 13:11:31 +00:00
fix(zone): waitpid should be limited when no child processes exist (fixes #304)
This commit is contained in:
@ -16,6 +16,7 @@ use krata::idm::{
|
|||||||
};
|
};
|
||||||
use log::debug;
|
use log::debug;
|
||||||
use nix::unistd::Pid;
|
use nix::unistd::Pid;
|
||||||
|
use tokio::sync::broadcast::Receiver;
|
||||||
use tokio::{select, sync::broadcast};
|
use tokio::{select, sync::broadcast};
|
||||||
|
|
||||||
pub struct ZoneBackground {
|
pub struct ZoneBackground {
|
||||||
@ -23,15 +24,18 @@ pub struct ZoneBackground {
|
|||||||
child: Pid,
|
child: Pid,
|
||||||
_cgroup: Cgroup,
|
_cgroup: Cgroup,
|
||||||
wait: ChildWait,
|
wait: ChildWait,
|
||||||
|
child_receiver: Receiver<ChildEvent>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ZoneBackground {
|
impl ZoneBackground {
|
||||||
pub async fn new(idm: IdmInternalClient, cgroup: Cgroup, child: Pid) -> Result<ZoneBackground> {
|
pub async fn new(idm: IdmInternalClient, cgroup: Cgroup, child: Pid) -> Result<ZoneBackground> {
|
||||||
|
let (wait, child_receiver) = ChildWait::new()?;
|
||||||
Ok(ZoneBackground {
|
Ok(ZoneBackground {
|
||||||
idm,
|
idm,
|
||||||
child,
|
child,
|
||||||
_cgroup: cgroup,
|
_cgroup: cgroup,
|
||||||
wait: ChildWait::new()?,
|
wait,
|
||||||
|
child_receiver,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -39,7 +43,6 @@ impl ZoneBackground {
|
|||||||
let mut event_subscription = self.idm.subscribe().await?;
|
let mut event_subscription = self.idm.subscribe().await?;
|
||||||
let mut requests_subscription = self.idm.requests().await?;
|
let mut requests_subscription = self.idm.requests().await?;
|
||||||
let mut request_streams_subscription = self.idm.request_streams().await?;
|
let mut request_streams_subscription = self.idm.request_streams().await?;
|
||||||
let mut wait_subscription = self.wait.subscribe().await?;
|
|
||||||
loop {
|
loop {
|
||||||
select! {
|
select! {
|
||||||
x = event_subscription.recv() => match x {
|
x = event_subscription.recv() => match x {
|
||||||
@ -86,7 +89,7 @@ impl ZoneBackground {
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
event = wait_subscription.recv() => match event {
|
event = self.child_receiver.recv() => match event {
|
||||||
Ok(event) => self.child_event(event).await?,
|
Ok(event) => self.child_event(event).await?,
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
break;
|
break;
|
||||||
|
@ -1,3 +1,9 @@
|
|||||||
|
use anyhow::Result;
|
||||||
|
use libc::{c_int, waitpid, WEXITSTATUS, WIFEXITED};
|
||||||
|
use log::warn;
|
||||||
|
use nix::unistd::Pid;
|
||||||
|
use std::thread::sleep;
|
||||||
|
use std::time::Duration;
|
||||||
use std::{
|
use std::{
|
||||||
ptr::addr_of_mut,
|
ptr::addr_of_mut,
|
||||||
sync::{
|
sync::{
|
||||||
@ -6,11 +12,6 @@ use std::{
|
|||||||
},
|
},
|
||||||
thread::{self, JoinHandle},
|
thread::{self, JoinHandle},
|
||||||
};
|
};
|
||||||
|
|
||||||
use anyhow::Result;
|
|
||||||
use libc::{c_int, waitpid, WEXITSTATUS, WIFEXITED};
|
|
||||||
use log::warn;
|
|
||||||
use nix::unistd::Pid;
|
|
||||||
use tokio::sync::broadcast::{channel, Receiver, Sender};
|
use tokio::sync::broadcast::{channel, Receiver, Sender};
|
||||||
|
|
||||||
const CHILD_WAIT_QUEUE_LEN: usize = 10;
|
const CHILD_WAIT_QUEUE_LEN: usize = 10;
|
||||||
@ -29,8 +30,8 @@ pub struct ChildWait {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl ChildWait {
|
impl ChildWait {
|
||||||
pub fn new() -> Result<ChildWait> {
|
pub fn new() -> Result<(ChildWait, Receiver<ChildEvent>)> {
|
||||||
let (sender, _) = channel(CHILD_WAIT_QUEUE_LEN);
|
let (sender, receiver) = channel(CHILD_WAIT_QUEUE_LEN);
|
||||||
let signal = Arc::new(AtomicBool::new(false));
|
let signal = Arc::new(AtomicBool::new(false));
|
||||||
let mut processor = ChildWaitTask {
|
let mut processor = ChildWaitTask {
|
||||||
sender: sender.clone(),
|
sender: sender.clone(),
|
||||||
@ -41,11 +42,14 @@ impl ChildWait {
|
|||||||
warn!("failed to process child updates: {}", error);
|
warn!("failed to process child updates: {}", error);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
Ok(ChildWait {
|
Ok((
|
||||||
|
ChildWait {
|
||||||
sender,
|
sender,
|
||||||
signal,
|
signal,
|
||||||
_task: Arc::new(task),
|
_task: Arc::new(task),
|
||||||
})
|
},
|
||||||
|
receiver,
|
||||||
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn subscribe(&self) -> Result<Receiver<ChildEvent>> {
|
pub async fn subscribe(&self) -> Result<Receiver<ChildEvent>> {
|
||||||
@ -63,7 +67,13 @@ impl ChildWaitTask {
|
|||||||
loop {
|
loop {
|
||||||
let mut status: c_int = 0;
|
let mut status: c_int = 0;
|
||||||
let pid = unsafe { waitpid(-1, addr_of_mut!(status), 0) };
|
let pid = unsafe { waitpid(-1, addr_of_mut!(status), 0) };
|
||||||
|
// pid being -1 indicates an error occurred, wait 100 microseconds to avoid
|
||||||
|
// overloading the channel. Right now we don't consider any other errors
|
||||||
|
// but that is fine for now, as waitpid shouldn't ever stop anyway.
|
||||||
|
if pid == -1 {
|
||||||
|
sleep(Duration::from_micros(100));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
if WIFEXITED(status) {
|
if WIFEXITED(status) {
|
||||||
let event = ChildEvent {
|
let event = ChildEvent {
|
||||||
pid: Pid::from_raw(pid),
|
pid: Pid::from_raw(pid),
|
||||||
|
Reference in New Issue
Block a user