async_process/reaper/
wait.rs1use async_channel::{Receiver, Sender};
9use async_task::Runnable;
10use futures_lite::future;
11
12use std::io;
13use std::sync::atomic::{AtomicUsize, Ordering};
14use std::sync::Mutex;
15use std::task::{Context, Poll};
16
17pub(crate) struct Reaper {
19 sender: Sender<Runnable>,
21
22 recv: Receiver<Runnable>,
24
25 zombies: AtomicUsize,
27}
28
29impl Reaper {
30 pub(crate) fn new() -> Self {
32 let (sender, recv) = async_channel::unbounded();
33 Self {
34 sender,
35 recv,
36 zombies: AtomicUsize::new(0),
37 }
38 }
39
40 pub(crate) async fn reap(&'static self) -> ! {
42 loop {
43 let task = match self.recv.recv().await {
45 Ok(task) => task,
46 Err(_) => panic!("sender should never be closed"),
47 };
48
49 task.run();
51 }
52 }
53
54 pub(crate) fn register(&'static self, child: std::process::Child) -> io::Result<ChildGuard> {
56 Ok(ChildGuard {
57 inner: Some(WaitableChild::new(child)?),
58 })
59 }
60
61 pub(crate) async fn status(
63 &'static self,
64 child: &Mutex<crate::ChildGuard>,
65 ) -> io::Result<std::process::ExitStatus> {
66 future::poll_fn(|cx| {
67 let mut child = child.lock().unwrap();
69
70 #[allow(clippy::infallible_destructuring_match)] let inner = match &mut child.inner {
73 super::ChildGuard::Wait(inner) => inner,
74 #[cfg(not(windows))]
75 _ => unreachable!(),
76 };
77
78 inner.inner.as_mut().unwrap().poll_wait(cx)
80 })
81 .await
82 }
83
84 pub(crate) fn has_zombies(&'static self) -> bool {
86 self.zombies.load(Ordering::SeqCst) > 0
87 }
88}
89
90pub(crate) struct ChildGuard {
92 inner: Option<WaitableChild>,
93}
94
95impl ChildGuard {
96 pub(crate) fn get_mut(&mut self) -> &mut std::process::Child {
98 self.inner.as_mut().unwrap().get_mut()
99 }
100
101 pub(crate) fn reap(&mut self, reaper: &'static Reaper) {
103 let future = {
105 let mut inner = self.inner.take().unwrap();
106 async move {
107 reaper.zombies.fetch_add(1, Ordering::Relaxed);
109
110 let _guard = crate::CallOnDrop(|| {
112 reaper.zombies.fetch_sub(1, Ordering::SeqCst);
113 });
114
115 let result = future::poll_fn(|cx| inner.poll_wait(cx)).await;
117 if let Err(_e) = result {
118 #[cfg(feature = "tracing")]
119 tracing::error!("error while polling zombie process: {}", _e);
120 }
121 }
122 };
123
124 let schedule = move |runnable| {
126 reaper.sender.try_send(runnable).ok();
127 };
128
129 let (runnable, task) = async_task::spawn(future, schedule);
131 task.detach();
132 runnable.schedule();
133 }
134}
135
136cfg_if::cfg_if! {
137 if #[cfg(target_os = "linux")] {
138 use async_io::Async;
139 use rustix::process;
140 use std::os::unix::io::OwnedFd;
141
142 struct WaitableChild {
144 child: std::process::Child,
145 handle: Async<OwnedFd>,
146 }
147
148 impl WaitableChild {
149 fn new(child: std::process::Child) -> io::Result<Self> {
150 let pidfd = process::pidfd_open(
151 process::Pid::from_child(&child),
152 process::PidfdFlags::empty()
153 )?;
154
155 Ok(Self {
156 child,
157 handle: Async::new(pidfd)?
158 })
159 }
160
161 fn get_mut(&mut self) -> &mut std::process::Child {
162 &mut self.child
163 }
164
165 fn poll_wait(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<std::process::ExitStatus>> {
166 loop {
167 if let Some(status) = self.child.try_wait()? {
168 return Poll::Ready(Ok(status));
169 }
170
171 futures_lite::ready!(self.handle.poll_readable(cx))?;
173 }
174 }
175 }
176
177 pub(crate) fn available() -> bool {
179 let result = process::pidfd_open(
181 process::getpid(),
182 process::PidfdFlags::empty()
183 );
184
185 result.is_ok()
187 }
188 } else if #[cfg(windows)] {
189 use async_io::os::windows::Waitable;
190
191 struct WaitableChild {
193 inner: Waitable<std::process::Child>,
194 }
195
196 impl WaitableChild {
197 fn new(child: std::process::Child) -> io::Result<Self> {
198 Ok(Self {
199 inner: Waitable::new(child)?
200 })
201 }
202
203 fn get_mut(&mut self) -> &mut std::process::Child {
204 unsafe {
206 self.inner.get_mut()
207 }
208 }
209
210 fn poll_wait(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<std::process::ExitStatus>> {
211 loop {
212 if let Some(status) = self.get_mut().try_wait()? {
213 return Poll::Ready(Ok(status));
214 }
215
216 futures_lite::ready!(self.inner.poll_ready(cx))?;
218 }
219 }
220 }
221
222 pub(crate) fn available() -> bool {
224 true
225 }
226 }
227}