async_process/reaper/
wait.rs

1//! A version of the reaper that waits on some polling primitive.
2//!
3//! This uses:
4//!
5//! - pidfd on Linux
6//! - Waitable objects on Windows
7
8use async_channel::{Receiver, Sender};
9use async_task::Runnable;
10use futures_lite::future;
11
12use std::io;
13use std::sync::atomic::{AtomicUsize, Ordering};
14use std::sync::Mutex;
15use std::task::{Context, Poll};
16
17/// The zombie process reaper.
18pub(crate) struct Reaper {
19    /// The channel for sending new runnables.
20    sender: Sender<Runnable>,
21
22    /// The channel for receiving new runnables.
23    recv: Receiver<Runnable>,
24
25    /// Number of zombie processes.
26    zombies: AtomicUsize,
27}
28
29impl Reaper {
30    /// Create a new reaper.
31    pub(crate) fn new() -> Self {
32        let (sender, recv) = async_channel::unbounded();
33        Self {
34            sender,
35            recv,
36            zombies: AtomicUsize::new(0),
37        }
38    }
39
40    /// Reap zombie processes forever.
41    pub(crate) async fn reap(&'static self) -> ! {
42        loop {
43            // Fetch the next task.
44            let task = match self.recv.recv().await {
45                Ok(task) => task,
46                Err(_) => panic!("sender should never be closed"),
47            };
48
49            // Poll the task.
50            task.run();
51        }
52    }
53
54    /// Register a child into this reaper.
55    pub(crate) fn register(&'static self, child: std::process::Child) -> io::Result<ChildGuard> {
56        Ok(ChildGuard {
57            inner: Some(WaitableChild::new(child)?),
58        })
59    }
60
61    /// Wait for a child to complete.
62    pub(crate) async fn status(
63        &'static self,
64        child: &Mutex<crate::ChildGuard>,
65    ) -> io::Result<std::process::ExitStatus> {
66        future::poll_fn(|cx| {
67            // Lock the child.
68            let mut child = child.lock().unwrap();
69
70            // Get the inner child value.
71            #[allow(clippy::infallible_destructuring_match)] // false positive: should respect cfg
72            let inner = match &mut child.inner {
73                super::ChildGuard::Wait(inner) => inner,
74                #[cfg(not(windows))]
75                _ => unreachable!(),
76            };
77
78            // Poll for the next value.
79            inner.inner.as_mut().unwrap().poll_wait(cx)
80        })
81        .await
82    }
83
84    /// Do we have any registered zombie processes?
85    pub(crate) fn has_zombies(&'static self) -> bool {
86        self.zombies.load(Ordering::SeqCst) > 0
87    }
88}
89
90/// The wrapper around the child.
91pub(crate) struct ChildGuard {
92    inner: Option<WaitableChild>,
93}
94
95impl ChildGuard {
96    /// Get a mutable reference to the inner child.
97    pub(crate) fn get_mut(&mut self) -> &mut std::process::Child {
98        self.inner.as_mut().unwrap().get_mut()
99    }
100
101    /// Begin the reaping process for this child.
102    pub(crate) fn reap(&mut self, reaper: &'static Reaper) {
103        // Create a future for polling this child.
104        let future = {
105            let mut inner = self.inner.take().unwrap();
106            async move {
107                // Increment the zombie count.
108                reaper.zombies.fetch_add(1, Ordering::Relaxed);
109
110                // Decrement the zombie count once we are done.
111                let _guard = crate::CallOnDrop(|| {
112                    reaper.zombies.fetch_sub(1, Ordering::SeqCst);
113                });
114
115                // Wait on this child forever.
116                let result = future::poll_fn(|cx| inner.poll_wait(cx)).await;
117                if let Err(_e) = result {
118                    #[cfg(feature = "tracing")]
119                    tracing::error!("error while polling zombie process: {}", _e);
120                }
121            }
122        };
123
124        // Create a function for scheduling this future.
125        let schedule = move |runnable| {
126            reaper.sender.try_send(runnable).ok();
127        };
128
129        // Spawn the task and run it forever.
130        let (runnable, task) = async_task::spawn(future, schedule);
131        task.detach();
132        runnable.schedule();
133    }
134}
135
136cfg_if::cfg_if! {
137    if #[cfg(target_os = "linux")] {
138        use async_io::Async;
139        use rustix::process;
140        use std::os::unix::io::OwnedFd;
141
142        /// Waitable version of `std::process::Child`
143        struct WaitableChild {
144            child: std::process::Child,
145            handle: Async<OwnedFd>,
146        }
147
148        impl WaitableChild {
149            fn new(child: std::process::Child) -> io::Result<Self> {
150                let pidfd = process::pidfd_open(
151                    process::Pid::from_child(&child),
152                    process::PidfdFlags::empty()
153                )?;
154
155                Ok(Self {
156                    child,
157                    handle: Async::new(pidfd)?
158                })
159            }
160
161            fn get_mut(&mut self) -> &mut std::process::Child {
162                &mut self.child
163            }
164
165            fn poll_wait(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<std::process::ExitStatus>> {
166                loop {
167                    if let Some(status) = self.child.try_wait()? {
168                        return Poll::Ready(Ok(status));
169                    }
170
171                    // Wait for us to become readable.
172                    futures_lite::ready!(self.handle.poll_readable(cx))?;
173                }
174            }
175        }
176
177        /// Tell if we are able to use this backend.
178        pub(crate) fn available() -> bool {
179            // Create a Pidfd for the current process and see if it works.
180            let result = process::pidfd_open(
181                process::getpid(),
182                process::PidfdFlags::empty()
183            );
184
185            // Tell if it was okay or not.
186            result.is_ok()
187        }
188    } else if #[cfg(windows)] {
189        use async_io::os::windows::Waitable;
190
191        /// Waitable version of `std::process::Child`.
192        struct WaitableChild {
193            inner: Waitable<std::process::Child>,
194        }
195
196        impl WaitableChild {
197            fn new(child: std::process::Child) -> io::Result<Self> {
198                Ok(Self {
199                    inner: Waitable::new(child)?
200                })
201            }
202
203            fn get_mut(&mut self) -> &mut std::process::Child {
204                // SAFETY: We never move the child out.
205                unsafe {
206                    self.inner.get_mut()
207                }
208            }
209
210            fn poll_wait(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<std::process::ExitStatus>> {
211                loop {
212                    if let Some(status) = self.get_mut().try_wait()? {
213                        return Poll::Ready(Ok(status));
214                    }
215
216                    // Wait for us to become readable.
217                    futures_lite::ready!(self.inner.poll_ready(cx))?;
218                }
219            }
220        }
221
222        /// Tell if we are able to use this backend.
223        pub(crate) fn available() -> bool {
224            true
225        }
226    }
227}