async_process/reaper/
signal.rs

1//! A version of the reaper that waits for a signal to check for process progress.
2
3use async_lock::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard};
4use async_signal::{Signal, Signals};
5use event_listener::Event;
6use futures_lite::{future, prelude::*};
7
8use std::io;
9use std::mem;
10use std::sync::Mutex;
11
/// A `'static` guard over an async mutex that protects no data (`()`).
///
/// NOTE(review): presumably the guard obtained from [`Reaper::lock`] when the
/// reaper itself lives for `'static` — confirm against the callers.
pub(crate) type Lock = AsyncMutexGuard<'static, ()>;
13
/// The zombie process reaper.
///
/// Holds the SIGCHLD subscription, the list of children that were handed off
/// while still running, and the lock that marks who is driving the reaper.
pub(crate) struct Reaper {
    /// An event delivered every time the SIGCHLD signal occurs.
    ///
    /// `reap` notifies every listener after each signal; `status` listens on
    /// it between `try_wait` attempts.
    sigchld: Event,

    /// The list of zombie processes.
    ///
    /// Children are pushed here by `ChildGuard::reap` when they have not
    /// exited yet, and removed by `Reaper::reap` once `try_wait` reports an
    /// exit status or an error.
    zombies: Mutex<Vec<std::process::Child>>,

    /// The pipe that delivers signal notifications.
    pipe: Pipe,

    /// Locking this mutex indicates that we are polling the SIGCHLD event.
    driver_guard: AsyncMutex<()>,
}
28
29impl Reaper {
30    /// Create a new reaper.
31    pub(crate) fn new() -> Self {
32        Reaper {
33            sigchld: Event::new(),
34            zombies: Mutex::new(Vec::new()),
35            pipe: Pipe::new().expect("cannot create SIGCHLD pipe"),
36            driver_guard: AsyncMutex::new(()),
37        }
38    }
39
40    /// Lock the driver thread.
41    pub(crate) async fn lock(&self) -> AsyncMutexGuard<'_, ()> {
42        self.driver_guard.lock().await
43    }
44
45    /// Reap zombie processes forever.
46    pub(crate) async fn reap(&'static self, _driver_guard: async_lock::MutexGuard<'_, ()>) -> ! {
47        loop {
48            // Wait for the next SIGCHLD signal.
49            self.pipe.wait().await;
50
51            // Notify all listeners waiting on the SIGCHLD event.
52            self.sigchld.notify(usize::MAX);
53
54            // Reap zombie processes, but make sure we don't hold onto the lock for too long!
55            let mut zombies = mem::take(&mut *self.zombies.lock().unwrap());
56            let mut i = 0;
57            'reap_zombies: loop {
58                for _ in 0..50 {
59                    if i >= zombies.len() {
60                        break 'reap_zombies;
61                    }
62
63                    if let Ok(None) = zombies[i].try_wait() {
64                        i += 1;
65                    } else {
66                        #[allow(clippy::zombie_processes)] // removed only when process done or errored
67                        zombies.swap_remove(i);
68                    }
69                }
70
71                // Be a good citizen; yield if there are a lot of processes.
72                //
73                // After we yield, check if there are more zombie processes.
74                future::yield_now().await;
75                zombies.append(&mut self.zombies.lock().unwrap());
76            }
77
78            // Put zombie processes back.
79            self.zombies.lock().unwrap().append(&mut zombies);
80        }
81    }
82
83    /// Register a process with this reaper.
84    pub(crate) fn register(&'static self, child: std::process::Child) -> io::Result<ChildGuard> {
85        self.pipe.register(&child)?;
86        Ok(ChildGuard { inner: Some(child) })
87    }
88
89    /// Wait for an event to occur for a child process.
90    pub(crate) async fn status(
91        &'static self,
92        child: &Mutex<crate::ChildGuard>,
93    ) -> io::Result<std::process::ExitStatus> {
94        loop {
95            // Wait on the child process.
96            if let Some(status) = child.lock().unwrap().get_mut().try_wait()? {
97                return Ok(status);
98            }
99
100            // Start listening.
101            event_listener::listener!(self.sigchld => listener);
102
103            // Try again.
104            if let Some(status) = child.lock().unwrap().get_mut().try_wait()? {
105                return Ok(status);
106            }
107
108            // Wait on the listener.
109            listener.await;
110        }
111    }
112
113    /// Do we have any registered zombie processes?
114    pub(crate) fn has_zombies(&'static self) -> bool {
115        !self
116            .zombies
117            .lock()
118            .unwrap_or_else(|x| x.into_inner())
119            .is_empty()
120    }
121}
122
/// The wrapper around the child.
pub(crate) struct ChildGuard {
    /// The wrapped child process.
    ///
    /// `Some` when created by `Reaper::register`; becomes `None` once
    /// `ChildGuard::reap` moves a still-running child into the reaper's
    /// zombie list.
    inner: Option<std::process::Child>,
}
127
128impl ChildGuard {
129    /// Get a mutable reference to the inner child.
130    pub(crate) fn get_mut(&mut self) -> &mut std::process::Child {
131        self.inner.as_mut().unwrap()
132    }
133
134    /// Begin the reaping process for this child.
135    pub(crate) fn reap(&mut self, reaper: &'static Reaper) {
136        if let Ok(None) = self.get_mut().try_wait() {
137            reaper
138                .zombies
139                .lock()
140                .unwrap()
141                .push(self.inner.take().unwrap());
142        }
143    }
144}
145
/// Waits for the next SIGCHLD signal.
///
/// A thin wrapper around a `Signals` stream subscribed to `Signal::Child`.
struct Pipe {
    /// The iterator over SIGCHLD signals.
    signals: Signals,
}
151
152impl Pipe {
153    /// Creates a new pipe.
154    fn new() -> io::Result<Pipe> {
155        Ok(Pipe {
156            signals: Signals::new(Some(Signal::Child))?,
157        })
158    }
159
160    /// Waits for the next SIGCHLD signal.
161    async fn wait(&self) {
162        (&self.signals).next().await;
163    }
164
165    /// Register a process object into this pipe.
166    fn register(&self, _child: &std::process::Child) -> io::Result<()> {
167        Ok(())
168    }
169}