tokio/runtime/scheduler/multi_thread/park.rs
1//! Parks the runtime.
2//!
3//! A combination of the various resource driver park handles.
4
5use crate::loom::sync::atomic::AtomicUsize;
6use crate::loom::sync::{Arc, Condvar, Mutex};
7use crate::runtime::driver::{self, Driver};
8use crate::util::TryLock;
9
10use std::sync::atomic::Ordering::SeqCst;
11use std::time::{Duration, Instant};
12
13#[cfg(loom)]
14use crate::runtime::park::CURRENT_THREAD_PARK_COUNT;
15
16pub(crate) struct Parker {
17 inner: Arc<Inner>,
18}
19
20pub(crate) struct Unparker {
21 inner: Arc<Inner>,
22}
23
/// Reports whether a park call ran the resource driver.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub(crate) enum HadDriver {
    /// The driver lock was acquired and the driver was run.
    Yes,
    /// The thread waited on the condvar, or returned without running the driver.
    No,
}
30
31struct Inner {
32 /// Avoids entering the park if possible
33 state: AtomicUsize,
34
35 /// Used to coordinate access to the driver / `condvar`
36 mutex: Mutex<()>,
37
38 /// `Condvar` to block on if the driver is unavailable.
39 condvar: Condvar,
40
41 /// Resource (I/O, time, ...) driver
42 shared: Arc<Shared>,
43}
44
/// Not parked and no notification pending; the initial state.
const EMPTY: usize = 0;
/// The thread is parked (or about to park) on the condvar.
const PARKED_CONDVAR: usize = 1;
/// The thread is parked (or about to park) on the resource driver.
const PARKED_DRIVER: usize = 2;
/// An unpark was requested and has not been consumed yet.
const NOTIFIED: usize = 3;
49
50/// Shared across multiple Parker handles
51struct Shared {
52 /// Shared driver. Only one thread at a time can use this
53 driver: TryLock<Driver>,
54}
55
56impl Parker {
57 pub(crate) fn new(driver: Driver) -> Parker {
58 Parker {
59 inner: Arc::new(Inner {
60 state: AtomicUsize::new(EMPTY),
61 mutex: Mutex::new(()),
62 condvar: Condvar::new(),
63 shared: Arc::new(Shared {
64 driver: TryLock::new(driver),
65 }),
66 }),
67 }
68 }
69
70 pub(crate) fn unpark(&self) -> Unparker {
71 Unparker {
72 inner: self.inner.clone(),
73 }
74 }
75
76 pub(crate) fn park(&mut self, handle: &driver::Handle) -> HadDriver {
77 self.inner.park(handle)
78 }
79
80 /// Parks the current thread for up to `duration`.
81 ///
82 /// This function tries to acquire the driver lock. If it succeeds, it
83 /// parks using the driver. Otherwise, it fails back to using a condvar,
84 /// unless the duration is zero, in which case it returns immediately.
85 pub(crate) fn park_timeout(
86 &mut self,
87 handle: &driver::Handle,
88 duration: Duration,
89 ) -> HadDriver {
90 if let Some(mut driver) = self.inner.shared.driver.try_lock() {
91 self.inner.park_driver(&mut driver, handle, Some(duration))
92 } else if !duration.is_zero() {
93 self.inner.park_condvar(Some(duration));
94 HadDriver::No
95 } else {
96 // https://github.com/tokio-rs/tokio/issues/6536
97 // Hacky, but it's just for loom tests. The counter gets incremented during
98 // `park_timeout`, but we still have to increment the counter if we can't acquire the
99 // lock.
100 #[cfg(loom)]
101 CURRENT_THREAD_PARK_COUNT.with(|count| count.fetch_add(1, SeqCst));
102 HadDriver::No
103 }
104 }
105
106 pub(crate) fn shutdown(&mut self, handle: &driver::Handle) {
107 self.inner.shutdown(handle);
108 }
109}
110
111impl Clone for Parker {
112 fn clone(&self) -> Parker {
113 Parker {
114 inner: Arc::new(Inner {
115 state: AtomicUsize::new(EMPTY),
116 mutex: Mutex::new(()),
117 condvar: Condvar::new(),
118 shared: self.inner.shared.clone(),
119 }),
120 }
121 }
122}
123
124impl Unparker {
125 pub(crate) fn unpark(&self, driver: &driver::Handle) {
126 self.inner.unpark(driver);
127 }
128}
129
130impl Inner {
131 /// Parks the current thread for at most `dur`.
132 fn park(&self, handle: &driver::Handle) -> HadDriver {
133 // If we were previously notified then we consume this notification and
134 // return quickly.
135 if self
136 .state
137 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
138 .is_ok()
139 {
140 return HadDriver::No;
141 }
142
143 if let Some(mut driver) = self.shared.driver.try_lock() {
144 self.park_driver(&mut driver, handle, None)
145 } else {
146 self.park_condvar(None);
147 HadDriver::No
148 }
149 }
150
151 /// Parks the current thread using a condvar for up to `duration`.
152 ///
153 /// If `duration` is `None`, parks indefinitely until notified.
154 ///
155 /// # Panics
156 ///
157 /// Panics if `duration` is `Some` and the duration is zero.
158 fn park_condvar(&self, duration: Option<Duration>) {
159 // Otherwise we need to coordinate going to sleep
160 let mut m = self.mutex.lock();
161
162 match self
163 .state
164 .compare_exchange(EMPTY, PARKED_CONDVAR, SeqCst, SeqCst)
165 {
166 Ok(_) => {}
167 Err(NOTIFIED) => {
168 // We must read here, even though we know it will be `NOTIFIED`.
169 // This is because `unpark` may have been called again since we read
170 // `NOTIFIED` in the `compare_exchange` above. We must perform an
171 // acquire operation that synchronizes with that `unpark` to observe
172 // any writes it made before the call to unpark. To do that we must
173 // read from the write it made to `state`.
174 let old = self.state.swap(EMPTY, SeqCst);
175 debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
176
177 return;
178 }
179 Err(actual) => panic!("inconsistent park state; actual = {actual}"),
180 }
181
182 let timeout_at = duration.map(|d| {
183 Instant::now()
184 .checked_add(d)
185 // best effort to avoid overflow and still provide a usable timeout
186 .unwrap_or(Instant::now() + Duration::from_secs(1))
187 });
188
189 loop {
190 let is_timeout;
191 (m, is_timeout) = match timeout_at {
192 Some(timeout_at) => {
193 let dur = timeout_at.saturating_duration_since(Instant::now());
194 if !dur.is_zero() {
195 // Ideally, we would use `condvar.wait_timeout_until` here, but it is not available
196 // in `loom`. So we manually compute the timeout.
197 let (m, res) = self.condvar.wait_timeout(m, dur).unwrap();
198 (m, res.timed_out())
199 } else {
200 (m, true)
201 }
202 }
203 None => (self.condvar.wait(m).unwrap(), false),
204 };
205
206 if is_timeout {
207 match self.state.swap(EMPTY, SeqCst) {
208 PARKED_CONDVAR => return, // timed out, and no notification received
209 NOTIFIED => return, // notification and timeout happened concurrently
210 actual @ (PARKED_DRIVER | EMPTY) => {
211 panic!("inconsistent park_timeout state, actual = {actual}")
212 }
213 invalid => panic!("invalid park_timeout state, actual = {invalid}"),
214 }
215 } else if self
216 .state
217 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
218 .is_ok()
219 {
220 // got a notification
221 return;
222 }
223
224 // spurious wakeup, go back to sleep
225 }
226 }
227
228 fn park_driver(
229 &self,
230 driver: &mut Driver,
231 handle: &driver::Handle,
232 duration: Option<Duration>,
233 ) -> HadDriver {
234 if duration.as_ref().is_some_and(Duration::is_zero) {
235 // zero duration doesn't actually park the thread, it just
236 // polls the I/O events, timers, etc.
237 driver.park_timeout(handle, Duration::ZERO);
238 return HadDriver::Yes;
239 }
240
241 match self
242 .state
243 .compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst)
244 {
245 Ok(_) => {}
246 Err(NOTIFIED) => {
247 // We must read here, even though we know it will be `NOTIFIED`.
248 // This is because `unpark` may have been called again since we read
249 // `NOTIFIED` in the `compare_exchange` above. We must perform an
250 // acquire operation that synchronizes with that `unpark` to observe
251 // any writes it made before the call to unpark. To do that we must
252 // read from the write it made to `state`.
253 let old = self.state.swap(EMPTY, SeqCst);
254 debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
255
256 return HadDriver::No;
257 }
258 Err(actual) => panic!("inconsistent park state; actual = {actual}"),
259 }
260
261 if let Some(duration) = duration {
262 debug_assert_ne!(duration, Duration::ZERO);
263 driver.park_timeout(handle, duration);
264 } else {
265 driver.park(handle);
266 }
267
268 match self.state.swap(EMPTY, SeqCst) {
269 NOTIFIED => {} // got a notification, hurray!
270 PARKED_DRIVER => {} // no notification, alas
271 n => panic!("inconsistent park_timeout state: {n}"),
272 }
273
274 HadDriver::Yes
275 }
276
277 fn unpark(&self, driver: &driver::Handle) {
278 // To ensure the unparked thread will observe any writes we made before
279 // this call, we must perform a release operation that `park` can
280 // synchronize with. To do that we must write `NOTIFIED` even if `state`
281 // is already `NOTIFIED`. That is why this must be a swap rather than a
282 // compare-and-swap that returns if it reads `NOTIFIED` on failure.
283 match self.state.swap(NOTIFIED, SeqCst) {
284 EMPTY => {} // no one was waiting
285 NOTIFIED => {} // already unparked
286 PARKED_CONDVAR => self.unpark_condvar(),
287 PARKED_DRIVER => driver.unpark(),
288 actual => panic!("inconsistent state in unpark; actual = {actual}"),
289 }
290 }
291
292 fn unpark_condvar(&self) {
293 // There is a period between when the parked thread sets `state` to
294 // `PARKED` (or last checked `state` in the case of a spurious wake
295 // up) and when it actually waits on `cvar`. If we were to notify
296 // during this period it would be ignored and then when the parked
297 // thread went to sleep it would never wake up. Fortunately, it has
298 // `lock` locked at this stage so we can acquire `lock` to wait until
299 // it is ready to receive the notification.
300 //
301 // Releasing `lock` before the call to `notify_one` means that when the
302 // parked thread wakes it doesn't get woken only to have to wait for us
303 // to release `lock`.
304 drop(self.mutex.lock());
305
306 self.condvar.notify_one();
307 }
308
309 fn shutdown(&self, handle: &driver::Handle) {
310 if let Some(mut driver) = self.shared.driver.try_lock() {
311 driver.shutdown(handle);
312 }
313
314 self.condvar.notify_all();
315 }
316}