tokio/runtime/scheduler/multi_thread/
park.rs

//! Parks the runtime.
//!
//! A combination of the various resource driver park handles.
4
5use crate::loom::sync::atomic::AtomicUsize;
6use crate::loom::sync::{Arc, Condvar, Mutex};
7use crate::runtime::driver::{self, Driver};
8use crate::util::TryLock;
9
10use std::sync::atomic::Ordering::SeqCst;
11use std::time::{Duration, Instant};
12
13#[cfg(loom)]
14use crate::runtime::park::CURRENT_THREAD_PARK_COUNT;
15
16pub(crate) struct Parker {
17    inner: Arc<Inner>,
18}
19
20pub(crate) struct Unparker {
21    inner: Arc<Inner>,
22}
23
24struct Inner {
25    /// Avoids entering the park if possible
26    state: AtomicUsize,
27
28    /// Used to coordinate access to the driver / `condvar`
29    mutex: Mutex<()>,
30
31    /// `Condvar` to block on if the driver is unavailable.
32    condvar: Condvar,
33
34    /// Resource (I/O, time, ...) driver
35    shared: Arc<Shared>,
36}
37
/// Not parked and no notification pending; the initial state.
const EMPTY: usize = 0;
/// The owning thread is (about to be) blocked on the condvar.
const PARKED_CONDVAR: usize = 1;
/// The owning thread is (about to be) blocked in the resource driver.
const PARKED_DRIVER: usize = 2;
/// `unpark` was called; the next park consumes this and returns.
const NOTIFIED: usize = 3;
42
43/// Shared across multiple Parker handles
44struct Shared {
45    /// Shared driver. Only one thread at a time can use this
46    driver: TryLock<Driver>,
47}
48
49impl Parker {
50    pub(crate) fn new(driver: Driver) -> Parker {
51        Parker {
52            inner: Arc::new(Inner {
53                state: AtomicUsize::new(EMPTY),
54                mutex: Mutex::new(()),
55                condvar: Condvar::new(),
56                shared: Arc::new(Shared {
57                    driver: TryLock::new(driver),
58                }),
59            }),
60        }
61    }
62
63    pub(crate) fn unpark(&self) -> Unparker {
64        Unparker {
65            inner: self.inner.clone(),
66        }
67    }
68
69    pub(crate) fn park(&mut self, handle: &driver::Handle) {
70        self.inner.park(handle);
71    }
72
73    /// Parks the current thread for up to `duration`.
74    ///
75    /// This function tries to acquire the driver lock. If it succeeds, it
76    /// parks using the driver. Otherwise, it fails back to using a condvar,
77    /// unless the duration is zero, in which case it returns immediately.
78    pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) {
79        if let Some(mut driver) = self.inner.shared.driver.try_lock() {
80            self.inner.park_driver(&mut driver, handle, Some(duration));
81        } else if !duration.is_zero() {
82            self.inner.park_condvar(Some(duration));
83        } else {
84            // https://github.com/tokio-rs/tokio/issues/6536
85            // Hacky, but it's just for loom tests. The counter gets incremented during
86            // `park_timeout`, but we still have to increment the counter if we can't acquire the
87            // lock.
88            #[cfg(loom)]
89            CURRENT_THREAD_PARK_COUNT.with(|count| count.fetch_add(1, SeqCst));
90        }
91    }
92
93    pub(crate) fn shutdown(&mut self, handle: &driver::Handle) {
94        self.inner.shutdown(handle);
95    }
96}
97
98impl Clone for Parker {
99    fn clone(&self) -> Parker {
100        Parker {
101            inner: Arc::new(Inner {
102                state: AtomicUsize::new(EMPTY),
103                mutex: Mutex::new(()),
104                condvar: Condvar::new(),
105                shared: self.inner.shared.clone(),
106            }),
107        }
108    }
109}
110
111impl Unparker {
112    pub(crate) fn unpark(&self, driver: &driver::Handle) {
113        self.inner.unpark(driver);
114    }
115}
116
117impl Inner {
118    /// Parks the current thread for at most `dur`.
119    fn park(&self, handle: &driver::Handle) {
120        // If we were previously notified then we consume this notification and
121        // return quickly.
122        if self
123            .state
124            .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
125            .is_ok()
126        {
127            return;
128        }
129
130        if let Some(mut driver) = self.shared.driver.try_lock() {
131            self.park_driver(&mut driver, handle, None);
132        } else {
133            self.park_condvar(None);
134        }
135    }
136
137    /// Parks the current thread using a condvar for up to `duration`.
138    ///
139    /// If `duration` is `None`, parks indefinitely until notified.
140    ///
141    /// # Panics
142    ///
143    /// Panics if `duration` is `Some` and the duration is zero.
144    fn park_condvar(&self, duration: Option<Duration>) {
145        // Otherwise we need to coordinate going to sleep
146        let mut m = self.mutex.lock();
147
148        match self
149            .state
150            .compare_exchange(EMPTY, PARKED_CONDVAR, SeqCst, SeqCst)
151        {
152            Ok(_) => {}
153            Err(NOTIFIED) => {
154                // We must read here, even though we know it will be `NOTIFIED`.
155                // This is because `unpark` may have been called again since we read
156                // `NOTIFIED` in the `compare_exchange` above. We must perform an
157                // acquire operation that synchronizes with that `unpark` to observe
158                // any writes it made before the call to unpark. To do that we must
159                // read from the write it made to `state`.
160                let old = self.state.swap(EMPTY, SeqCst);
161                debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
162
163                return;
164            }
165            Err(actual) => panic!("inconsistent park state; actual = {actual}"),
166        }
167
168        let timeout_at = duration.map(|d| {
169            Instant::now()
170                .checked_add(d)
171                // best effort to avoid overflow and still provide a usable timeout
172                .unwrap_or(Instant::now() + Duration::from_secs(1))
173        });
174
175        loop {
176            let is_timeout;
177            (m, is_timeout) = match timeout_at {
178                Some(timeout_at) => {
179                    let dur = timeout_at.saturating_duration_since(Instant::now());
180                    if !dur.is_zero() {
181                        // Ideally, we would use `condvar.wait_timeout_until` here, but it is not available
182                        // in `loom`. So we manually compute the timeout.
183                        let (m, res) = self.condvar.wait_timeout(m, dur).unwrap();
184                        (m, res.timed_out())
185                    } else {
186                        (m, true)
187                    }
188                }
189                None => (self.condvar.wait(m).unwrap(), false),
190            };
191
192            if is_timeout {
193                match self.state.swap(EMPTY, SeqCst) {
194                    PARKED_CONDVAR => return, // timed out, and no notification received
195                    NOTIFIED => return,       // notification and timeout happened concurrently
196                    actual @ (PARKED_DRIVER | EMPTY) => {
197                        panic!("inconsistent park_timeout state, actual = {actual}")
198                    }
199                    invalid => panic!("invalid park_timeout state, actual = {invalid}"),
200                }
201            } else if self
202                .state
203                .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
204                .is_ok()
205            {
206                // got a notification
207                return;
208            }
209
210            // spurious wakeup, go back to sleep
211        }
212    }
213
214    fn park_driver(
215        &self,
216        driver: &mut Driver,
217        handle: &driver::Handle,
218        duration: Option<Duration>,
219    ) {
220        if duration.as_ref().is_some_and(Duration::is_zero) {
221            // zero duration doesn't actually park the thread, it just
222            // polls the I/O events, timers, etc.
223            driver.park_timeout(handle, Duration::ZERO);
224            return;
225        }
226
227        match self
228            .state
229            .compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst)
230        {
231            Ok(_) => {}
232            Err(NOTIFIED) => {
233                // We must read here, even though we know it will be `NOTIFIED`.
234                // This is because `unpark` may have been called again since we read
235                // `NOTIFIED` in the `compare_exchange` above. We must perform an
236                // acquire operation that synchronizes with that `unpark` to observe
237                // any writes it made before the call to unpark. To do that we must
238                // read from the write it made to `state`.
239                let old = self.state.swap(EMPTY, SeqCst);
240                debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
241
242                return;
243            }
244            Err(actual) => panic!("inconsistent park state; actual = {actual}"),
245        }
246
247        if let Some(duration) = duration {
248            debug_assert_ne!(duration, Duration::ZERO);
249            driver.park_timeout(handle, duration);
250        } else {
251            driver.park(handle);
252        }
253
254        match self.state.swap(EMPTY, SeqCst) {
255            NOTIFIED => {}      // got a notification, hurray!
256            PARKED_DRIVER => {} // no notification, alas
257            n => panic!("inconsistent park_timeout state: {n}"),
258        }
259    }
260
261    fn unpark(&self, driver: &driver::Handle) {
262        // To ensure the unparked thread will observe any writes we made before
263        // this call, we must perform a release operation that `park` can
264        // synchronize with. To do that we must write `NOTIFIED` even if `state`
265        // is already `NOTIFIED`. That is why this must be a swap rather than a
266        // compare-and-swap that returns if it reads `NOTIFIED` on failure.
267        match self.state.swap(NOTIFIED, SeqCst) {
268            EMPTY => {}    // no one was waiting
269            NOTIFIED => {} // already unparked
270            PARKED_CONDVAR => self.unpark_condvar(),
271            PARKED_DRIVER => driver.unpark(),
272            actual => panic!("inconsistent state in unpark; actual = {actual}"),
273        }
274    }
275
276    fn unpark_condvar(&self) {
277        // There is a period between when the parked thread sets `state` to
278        // `PARKED` (or last checked `state` in the case of a spurious wake
279        // up) and when it actually waits on `cvar`. If we were to notify
280        // during this period it would be ignored and then when the parked
281        // thread went to sleep it would never wake up. Fortunately, it has
282        // `lock` locked at this stage so we can acquire `lock` to wait until
283        // it is ready to receive the notification.
284        //
285        // Releasing `lock` before the call to `notify_one` means that when the
286        // parked thread wakes it doesn't get woken only to have to wait for us
287        // to release `lock`.
288        drop(self.mutex.lock());
289
290        self.condvar.notify_one();
291    }
292
293    fn shutdown(&self, handle: &driver::Handle) {
294        if let Some(mut driver) = self.shared.driver.try_lock() {
295            driver.shutdown(handle);
296        }
297
298        self.condvar.notify_all();
299    }
300}