tokio/runtime/scheduler/multi_thread/
park.rs

1//! Parks the runtime.
2//!
3//! A combination of the various resource driver park handles.
4
5use crate::loom::sync::atomic::AtomicUsize;
6use crate::loom::sync::{Arc, Condvar, Mutex};
7use crate::runtime::driver::{self, Driver};
8use crate::util::TryLock;
9
10use std::sync::atomic::Ordering::SeqCst;
11use std::time::{Duration, Instant};
12
13#[cfg(loom)]
14use crate::runtime::park::CURRENT_THREAD_PARK_COUNT;
15
16pub(crate) struct Parker {
17    inner: Arc<Inner>,
18}
19
20pub(crate) struct Unparker {
21    inner: Arc<Inner>,
22}
23
24/// Represents how a worker thread was parked
25#[derive(Copy, Clone, Eq, PartialEq)]
26pub(crate) enum HadDriver {
27    Yes,
28    No,
29}
30
31struct Inner {
32    /// Avoids entering the park if possible
33    state: AtomicUsize,
34
35    /// Used to coordinate access to the driver / `condvar`
36    mutex: Mutex<()>,
37
38    /// `Condvar` to block on if the driver is unavailable.
39    condvar: Condvar,
40
41    /// Resource (I/O, time, ...) driver
42    shared: Arc<Shared>,
43}
44
45const EMPTY: usize = 0;
46const PARKED_CONDVAR: usize = 1;
47const PARKED_DRIVER: usize = 2;
48const NOTIFIED: usize = 3;
49
50/// Shared across multiple Parker handles
51struct Shared {
52    /// Shared driver. Only one thread at a time can use this
53    driver: TryLock<Driver>,
54}
55
56impl Parker {
57    pub(crate) fn new(driver: Driver) -> Parker {
58        Parker {
59            inner: Arc::new(Inner {
60                state: AtomicUsize::new(EMPTY),
61                mutex: Mutex::new(()),
62                condvar: Condvar::new(),
63                shared: Arc::new(Shared {
64                    driver: TryLock::new(driver),
65                }),
66            }),
67        }
68    }
69
70    pub(crate) fn unpark(&self) -> Unparker {
71        Unparker {
72            inner: self.inner.clone(),
73        }
74    }
75
76    pub(crate) fn park(&mut self, handle: &driver::Handle) -> HadDriver {
77        self.inner.park(handle)
78    }
79
80    /// Parks the current thread for up to `duration`.
81    ///
82    /// This function tries to acquire the driver lock. If it succeeds, it
83    /// parks using the driver. Otherwise, it fails back to using a condvar,
84    /// unless the duration is zero, in which case it returns immediately.
85    pub(crate) fn park_timeout(
86        &mut self,
87        handle: &driver::Handle,
88        duration: Duration,
89    ) -> HadDriver {
90        if let Some(mut driver) = self.inner.shared.driver.try_lock() {
91            self.inner.park_driver(&mut driver, handle, Some(duration))
92        } else if !duration.is_zero() {
93            self.inner.park_condvar(Some(duration));
94            HadDriver::No
95        } else {
96            // https://github.com/tokio-rs/tokio/issues/6536
97            // Hacky, but it's just for loom tests. The counter gets incremented during
98            // `park_timeout`, but we still have to increment the counter if we can't acquire the
99            // lock.
100            #[cfg(loom)]
101            CURRENT_THREAD_PARK_COUNT.with(|count| count.fetch_add(1, SeqCst));
102            HadDriver::No
103        }
104    }
105
106    pub(crate) fn shutdown(&mut self, handle: &driver::Handle) {
107        self.inner.shutdown(handle);
108    }
109}
110
111impl Clone for Parker {
112    fn clone(&self) -> Parker {
113        Parker {
114            inner: Arc::new(Inner {
115                state: AtomicUsize::new(EMPTY),
116                mutex: Mutex::new(()),
117                condvar: Condvar::new(),
118                shared: self.inner.shared.clone(),
119            }),
120        }
121    }
122}
123
124impl Unparker {
125    pub(crate) fn unpark(&self, driver: &driver::Handle) {
126        self.inner.unpark(driver);
127    }
128}
129
130impl Inner {
131    /// Parks the current thread for at most `dur`.
132    fn park(&self, handle: &driver::Handle) -> HadDriver {
133        // If we were previously notified then we consume this notification and
134        // return quickly.
135        if self
136            .state
137            .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
138            .is_ok()
139        {
140            return HadDriver::No;
141        }
142
143        if let Some(mut driver) = self.shared.driver.try_lock() {
144            self.park_driver(&mut driver, handle, None)
145        } else {
146            self.park_condvar(None);
147            HadDriver::No
148        }
149    }
150
151    /// Parks the current thread using a condvar for up to `duration`.
152    ///
153    /// If `duration` is `None`, parks indefinitely until notified.
154    ///
155    /// # Panics
156    ///
157    /// Panics if `duration` is `Some` and the duration is zero.
158    fn park_condvar(&self, duration: Option<Duration>) {
159        // Otherwise we need to coordinate going to sleep
160        let mut m = self.mutex.lock();
161
162        match self
163            .state
164            .compare_exchange(EMPTY, PARKED_CONDVAR, SeqCst, SeqCst)
165        {
166            Ok(_) => {}
167            Err(NOTIFIED) => {
168                // We must read here, even though we know it will be `NOTIFIED`.
169                // This is because `unpark` may have been called again since we read
170                // `NOTIFIED` in the `compare_exchange` above. We must perform an
171                // acquire operation that synchronizes with that `unpark` to observe
172                // any writes it made before the call to unpark. To do that we must
173                // read from the write it made to `state`.
174                let old = self.state.swap(EMPTY, SeqCst);
175                debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
176
177                return;
178            }
179            Err(actual) => panic!("inconsistent park state; actual = {actual}"),
180        }
181
182        let timeout_at = duration.map(|d| {
183            Instant::now()
184                .checked_add(d)
185                // best effort to avoid overflow and still provide a usable timeout
186                .unwrap_or(Instant::now() + Duration::from_secs(1))
187        });
188
189        loop {
190            let is_timeout;
191            (m, is_timeout) = match timeout_at {
192                Some(timeout_at) => {
193                    let dur = timeout_at.saturating_duration_since(Instant::now());
194                    if !dur.is_zero() {
195                        // Ideally, we would use `condvar.wait_timeout_until` here, but it is not available
196                        // in `loom`. So we manually compute the timeout.
197                        let (m, res) = self.condvar.wait_timeout(m, dur).unwrap();
198                        (m, res.timed_out())
199                    } else {
200                        (m, true)
201                    }
202                }
203                None => (self.condvar.wait(m).unwrap(), false),
204            };
205
206            if is_timeout {
207                match self.state.swap(EMPTY, SeqCst) {
208                    PARKED_CONDVAR => return, // timed out, and no notification received
209                    NOTIFIED => return,       // notification and timeout happened concurrently
210                    actual @ (PARKED_DRIVER | EMPTY) => {
211                        panic!("inconsistent park_timeout state, actual = {actual}")
212                    }
213                    invalid => panic!("invalid park_timeout state, actual = {invalid}"),
214                }
215            } else if self
216                .state
217                .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
218                .is_ok()
219            {
220                // got a notification
221                return;
222            }
223
224            // spurious wakeup, go back to sleep
225        }
226    }
227
228    fn park_driver(
229        &self,
230        driver: &mut Driver,
231        handle: &driver::Handle,
232        duration: Option<Duration>,
233    ) -> HadDriver {
234        if duration.as_ref().is_some_and(Duration::is_zero) {
235            // zero duration doesn't actually park the thread, it just
236            // polls the I/O events, timers, etc.
237            driver.park_timeout(handle, Duration::ZERO);
238            return HadDriver::Yes;
239        }
240
241        match self
242            .state
243            .compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst)
244        {
245            Ok(_) => {}
246            Err(NOTIFIED) => {
247                // We must read here, even though we know it will be `NOTIFIED`.
248                // This is because `unpark` may have been called again since we read
249                // `NOTIFIED` in the `compare_exchange` above. We must perform an
250                // acquire operation that synchronizes with that `unpark` to observe
251                // any writes it made before the call to unpark. To do that we must
252                // read from the write it made to `state`.
253                let old = self.state.swap(EMPTY, SeqCst);
254                debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
255
256                return HadDriver::No;
257            }
258            Err(actual) => panic!("inconsistent park state; actual = {actual}"),
259        }
260
261        if let Some(duration) = duration {
262            debug_assert_ne!(duration, Duration::ZERO);
263            driver.park_timeout(handle, duration);
264        } else {
265            driver.park(handle);
266        }
267
268        match self.state.swap(EMPTY, SeqCst) {
269            NOTIFIED => {}      // got a notification, hurray!
270            PARKED_DRIVER => {} // no notification, alas
271            n => panic!("inconsistent park_timeout state: {n}"),
272        }
273
274        HadDriver::Yes
275    }
276
277    fn unpark(&self, driver: &driver::Handle) {
278        // To ensure the unparked thread will observe any writes we made before
279        // this call, we must perform a release operation that `park` can
280        // synchronize with. To do that we must write `NOTIFIED` even if `state`
281        // is already `NOTIFIED`. That is why this must be a swap rather than a
282        // compare-and-swap that returns if it reads `NOTIFIED` on failure.
283        match self.state.swap(NOTIFIED, SeqCst) {
284            EMPTY => {}    // no one was waiting
285            NOTIFIED => {} // already unparked
286            PARKED_CONDVAR => self.unpark_condvar(),
287            PARKED_DRIVER => driver.unpark(),
288            actual => panic!("inconsistent state in unpark; actual = {actual}"),
289        }
290    }
291
292    fn unpark_condvar(&self) {
293        // There is a period between when the parked thread sets `state` to
294        // `PARKED` (or last checked `state` in the case of a spurious wake
295        // up) and when it actually waits on `cvar`. If we were to notify
296        // during this period it would be ignored and then when the parked
297        // thread went to sleep it would never wake up. Fortunately, it has
298        // `lock` locked at this stage so we can acquire `lock` to wait until
299        // it is ready to receive the notification.
300        //
301        // Releasing `lock` before the call to `notify_one` means that when the
302        // parked thread wakes it doesn't get woken only to have to wait for us
303        // to release `lock`.
304        drop(self.mutex.lock());
305
306        self.condvar.notify_one();
307    }
308
309    fn shutdown(&self, handle: &driver::Handle) {
310        if let Some(mut driver) = self.shared.driver.try_lock() {
311            driver.shutdown(handle);
312        }
313
314        self.condvar.notify_all();
315    }
316}