tokio/runtime/scheduler/multi_thread/park.rs
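//! Parks the runtime.
//!
//! Combines the park handles of the various resource drivers (I/O, time, ...)
//! behind a single interface: a thread parks on the shared driver when it can
//! acquire the driver lock, and otherwise blocks on a condition variable.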
4
5use crate::loom::sync::atomic::AtomicUsize;
6use crate::loom::sync::{Arc, Condvar, Mutex};
7use crate::runtime::driver::{self, Driver};
8use crate::util::TryLock;
9
10use std::sync::atomic::Ordering::SeqCst;
11use std::time::{Duration, Instant};
12
13#[cfg(loom)]
14use crate::runtime::park::CURRENT_THREAD_PARK_COUNT;
15
16pub(crate) struct Parker {
17 inner: Arc<Inner>,
18}
19
20pub(crate) struct Unparker {
21 inner: Arc<Inner>,
22}
23
24struct Inner {
25 /// Avoids entering the park if possible
26 state: AtomicUsize,
27
28 /// Used to coordinate access to the driver / `condvar`
29 mutex: Mutex<()>,
30
31 /// `Condvar` to block on if the driver is unavailable.
32 condvar: Condvar,
33
34 /// Resource (I/O, time, ...) driver
35 shared: Arc<Shared>,
36}
37
/// Initial state: the thread is not parked and no notification is pending.
const EMPTY: usize = 0;
/// The thread is parked (or about to park) on the condvar.
const PARKED_CONDVAR: usize = 1;
/// The thread is parked (or about to park) on the resource driver.
const PARKED_DRIVER: usize = 2;
/// `unpark` was called; the next park attempt consumes this and returns.
const NOTIFIED: usize = 3;
42
43/// Shared across multiple Parker handles
44struct Shared {
45 /// Shared driver. Only one thread at a time can use this
46 driver: TryLock<Driver>,
47}
48
49impl Parker {
50 pub(crate) fn new(driver: Driver) -> Parker {
51 Parker {
52 inner: Arc::new(Inner {
53 state: AtomicUsize::new(EMPTY),
54 mutex: Mutex::new(()),
55 condvar: Condvar::new(),
56 shared: Arc::new(Shared {
57 driver: TryLock::new(driver),
58 }),
59 }),
60 }
61 }
62
63 pub(crate) fn unpark(&self) -> Unparker {
64 Unparker {
65 inner: self.inner.clone(),
66 }
67 }
68
69 pub(crate) fn park(&mut self, handle: &driver::Handle) {
70 self.inner.park(handle);
71 }
72
73 /// Parks the current thread for up to `duration`.
74 ///
75 /// This function tries to acquire the driver lock. If it succeeds, it
76 /// parks using the driver. Otherwise, it fails back to using a condvar,
77 /// unless the duration is zero, in which case it returns immediately.
78 pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) {
79 if let Some(mut driver) = self.inner.shared.driver.try_lock() {
80 self.inner.park_driver(&mut driver, handle, Some(duration));
81 } else if !duration.is_zero() {
82 self.inner.park_condvar(Some(duration));
83 } else {
84 // https://github.com/tokio-rs/tokio/issues/6536
85 // Hacky, but it's just for loom tests. The counter gets incremented during
86 // `park_timeout`, but we still have to increment the counter if we can't acquire the
87 // lock.
88 #[cfg(loom)]
89 CURRENT_THREAD_PARK_COUNT.with(|count| count.fetch_add(1, SeqCst));
90 }
91 }
92
93 pub(crate) fn shutdown(&mut self, handle: &driver::Handle) {
94 self.inner.shutdown(handle);
95 }
96}
97
98impl Clone for Parker {
99 fn clone(&self) -> Parker {
100 Parker {
101 inner: Arc::new(Inner {
102 state: AtomicUsize::new(EMPTY),
103 mutex: Mutex::new(()),
104 condvar: Condvar::new(),
105 shared: self.inner.shared.clone(),
106 }),
107 }
108 }
109}
110
111impl Unparker {
112 pub(crate) fn unpark(&self, driver: &driver::Handle) {
113 self.inner.unpark(driver);
114 }
115}
116
117impl Inner {
118 /// Parks the current thread for at most `dur`.
119 fn park(&self, handle: &driver::Handle) {
120 // If we were previously notified then we consume this notification and
121 // return quickly.
122 if self
123 .state
124 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
125 .is_ok()
126 {
127 return;
128 }
129
130 if let Some(mut driver) = self.shared.driver.try_lock() {
131 self.park_driver(&mut driver, handle, None);
132 } else {
133 self.park_condvar(None);
134 }
135 }
136
137 /// Parks the current thread using a condvar for up to `duration`.
138 ///
139 /// If `duration` is `None`, parks indefinitely until notified.
140 ///
141 /// # Panics
142 ///
143 /// Panics if `duration` is `Some` and the duration is zero.
144 fn park_condvar(&self, duration: Option<Duration>) {
145 // Otherwise we need to coordinate going to sleep
146 let mut m = self.mutex.lock();
147
148 match self
149 .state
150 .compare_exchange(EMPTY, PARKED_CONDVAR, SeqCst, SeqCst)
151 {
152 Ok(_) => {}
153 Err(NOTIFIED) => {
154 // We must read here, even though we know it will be `NOTIFIED`.
155 // This is because `unpark` may have been called again since we read
156 // `NOTIFIED` in the `compare_exchange` above. We must perform an
157 // acquire operation that synchronizes with that `unpark` to observe
158 // any writes it made before the call to unpark. To do that we must
159 // read from the write it made to `state`.
160 let old = self.state.swap(EMPTY, SeqCst);
161 debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
162
163 return;
164 }
165 Err(actual) => panic!("inconsistent park state; actual = {actual}"),
166 }
167
168 let timeout_at = duration.map(|d| {
169 Instant::now()
170 .checked_add(d)
171 // best effort to avoid overflow and still provide a usable timeout
172 .unwrap_or(Instant::now() + Duration::from_secs(1))
173 });
174
175 loop {
176 let is_timeout;
177 (m, is_timeout) = match timeout_at {
178 Some(timeout_at) => {
179 let dur = timeout_at.saturating_duration_since(Instant::now());
180 if !dur.is_zero() {
181 // Ideally, we would use `condvar.wait_timeout_until` here, but it is not available
182 // in `loom`. So we manually compute the timeout.
183 let (m, res) = self.condvar.wait_timeout(m, dur).unwrap();
184 (m, res.timed_out())
185 } else {
186 (m, true)
187 }
188 }
189 None => (self.condvar.wait(m).unwrap(), false),
190 };
191
192 if is_timeout {
193 match self.state.swap(EMPTY, SeqCst) {
194 PARKED_CONDVAR => return, // timed out, and no notification received
195 NOTIFIED => return, // notification and timeout happened concurrently
196 actual @ (PARKED_DRIVER | EMPTY) => {
197 panic!("inconsistent park_timeout state, actual = {actual}")
198 }
199 invalid => panic!("invalid park_timeout state, actual = {invalid}"),
200 }
201 } else if self
202 .state
203 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
204 .is_ok()
205 {
206 // got a notification
207 return;
208 }
209
210 // spurious wakeup, go back to sleep
211 }
212 }
213
214 fn park_driver(
215 &self,
216 driver: &mut Driver,
217 handle: &driver::Handle,
218 duration: Option<Duration>,
219 ) {
220 if duration.as_ref().is_some_and(Duration::is_zero) {
221 // zero duration doesn't actually park the thread, it just
222 // polls the I/O events, timers, etc.
223 driver.park_timeout(handle, Duration::ZERO);
224 return;
225 }
226
227 match self
228 .state
229 .compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst)
230 {
231 Ok(_) => {}
232 Err(NOTIFIED) => {
233 // We must read here, even though we know it will be `NOTIFIED`.
234 // This is because `unpark` may have been called again since we read
235 // `NOTIFIED` in the `compare_exchange` above. We must perform an
236 // acquire operation that synchronizes with that `unpark` to observe
237 // any writes it made before the call to unpark. To do that we must
238 // read from the write it made to `state`.
239 let old = self.state.swap(EMPTY, SeqCst);
240 debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
241
242 return;
243 }
244 Err(actual) => panic!("inconsistent park state; actual = {actual}"),
245 }
246
247 if let Some(duration) = duration {
248 debug_assert_ne!(duration, Duration::ZERO);
249 driver.park_timeout(handle, duration);
250 } else {
251 driver.park(handle);
252 }
253
254 match self.state.swap(EMPTY, SeqCst) {
255 NOTIFIED => {} // got a notification, hurray!
256 PARKED_DRIVER => {} // no notification, alas
257 n => panic!("inconsistent park_timeout state: {n}"),
258 }
259 }
260
261 fn unpark(&self, driver: &driver::Handle) {
262 // To ensure the unparked thread will observe any writes we made before
263 // this call, we must perform a release operation that `park` can
264 // synchronize with. To do that we must write `NOTIFIED` even if `state`
265 // is already `NOTIFIED`. That is why this must be a swap rather than a
266 // compare-and-swap that returns if it reads `NOTIFIED` on failure.
267 match self.state.swap(NOTIFIED, SeqCst) {
268 EMPTY => {} // no one was waiting
269 NOTIFIED => {} // already unparked
270 PARKED_CONDVAR => self.unpark_condvar(),
271 PARKED_DRIVER => driver.unpark(),
272 actual => panic!("inconsistent state in unpark; actual = {actual}"),
273 }
274 }
275
276 fn unpark_condvar(&self) {
277 // There is a period between when the parked thread sets `state` to
278 // `PARKED` (or last checked `state` in the case of a spurious wake
279 // up) and when it actually waits on `cvar`. If we were to notify
280 // during this period it would be ignored and then when the parked
281 // thread went to sleep it would never wake up. Fortunately, it has
282 // `lock` locked at this stage so we can acquire `lock` to wait until
283 // it is ready to receive the notification.
284 //
285 // Releasing `lock` before the call to `notify_one` means that when the
286 // parked thread wakes it doesn't get woken only to have to wait for us
287 // to release `lock`.
288 drop(self.mutex.lock());
289
290 self.condvar.notify_one();
291 }
292
293 fn shutdown(&self, handle: &driver::Handle) {
294 if let Some(mut driver) = self.shared.driver.try_lock() {
295 driver.shutdown(handle);
296 }
297
298 self.condvar.notify_all();
299 }
300}