async_process/reaper/
signal.rs1use async_lock::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard};
4use async_signal::{Signal, Signals};
5use event_listener::Event;
6use futures_lite::{future, prelude::*};
7
8use std::io;
9use std::mem;
10use std::sync::Mutex;
11
12pub(crate) type Lock = AsyncMutexGuard<'static, ()>;
13
14pub(crate) struct Reaper {
16 sigchld: Event,
18
19 zombies: Mutex<Vec<std::process::Child>>,
21
22 pipe: Pipe,
24
25 driver_guard: AsyncMutex<()>,
27}
28
29impl Reaper {
30 pub(crate) fn new() -> Self {
32 Reaper {
33 sigchld: Event::new(),
34 zombies: Mutex::new(Vec::new()),
35 pipe: Pipe::new().expect("cannot create SIGCHLD pipe"),
36 driver_guard: AsyncMutex::new(()),
37 }
38 }
39
40 pub(crate) async fn lock(&self) -> AsyncMutexGuard<'_, ()> {
42 self.driver_guard.lock().await
43 }
44
45 pub(crate) async fn reap(&'static self, _driver_guard: async_lock::MutexGuard<'_, ()>) -> ! {
47 loop {
48 self.pipe.wait().await;
50
51 self.sigchld.notify(usize::MAX);
53
54 let mut zombies = mem::take(&mut *self.zombies.lock().unwrap());
56 let mut i = 0;
57 'reap_zombies: loop {
58 for _ in 0..50 {
59 if i >= zombies.len() {
60 break 'reap_zombies;
61 }
62
63 if let Ok(None) = zombies[i].try_wait() {
64 i += 1;
65 } else {
66 #[allow(clippy::zombie_processes)] zombies.swap_remove(i);
68 }
69 }
70
71 future::yield_now().await;
75 zombies.append(&mut self.zombies.lock().unwrap());
76 }
77
78 self.zombies.lock().unwrap().append(&mut zombies);
80 }
81 }
82
83 pub(crate) fn register(&'static self, child: std::process::Child) -> io::Result<ChildGuard> {
85 self.pipe.register(&child)?;
86 Ok(ChildGuard { inner: Some(child) })
87 }
88
89 pub(crate) async fn status(
91 &'static self,
92 child: &Mutex<crate::ChildGuard>,
93 ) -> io::Result<std::process::ExitStatus> {
94 loop {
95 if let Some(status) = child.lock().unwrap().get_mut().try_wait()? {
97 return Ok(status);
98 }
99
100 event_listener::listener!(self.sigchld => listener);
102
103 if let Some(status) = child.lock().unwrap().get_mut().try_wait()? {
105 return Ok(status);
106 }
107
108 listener.await;
110 }
111 }
112
113 pub(crate) fn has_zombies(&'static self) -> bool {
115 !self
116 .zombies
117 .lock()
118 .unwrap_or_else(|x| x.into_inner())
119 .is_empty()
120 }
121}
122
/// Owner of a spawned child process.
pub(crate) struct ChildGuard {
    /// The wrapped child. It becomes `None` once `ChildGuard::reap` has moved
    /// the child into the reaper's zombie list.
    inner: Option<std::process::Child>,
}
127
128impl ChildGuard {
129 pub(crate) fn get_mut(&mut self) -> &mut std::process::Child {
131 self.inner.as_mut().unwrap()
132 }
133
134 pub(crate) fn reap(&mut self, reaper: &'static Reaper) {
136 if let Ok(None) = self.get_mut().try_wait() {
137 reaper
138 .zombies
139 .lock()
140 .unwrap()
141 .push(self.inner.take().unwrap());
142 }
143 }
144}
145
146struct Pipe {
148 signals: Signals,
150}
151
152impl Pipe {
153 fn new() -> io::Result<Pipe> {
155 Ok(Pipe {
156 signals: Signals::new(Some(Signal::Child))?,
157 })
158 }
159
160 async fn wait(&self) {
162 (&self.signals).next().await;
163 }
164
165 fn register(&self, _child: &std::process::Child) -> io::Result<()> {
167 Ok(())
168 }
169}