tokio/runtime/time/
mod.rs

// Currently, Rust warns when an `unsafe fn` contains an `unsafe {}` block.
// However, in the future, this will change to the reverse. For now, suppress
// this warning and generally stick with being explicit about unsafety.
4#![allow(unused_unsafe)]
5#![cfg_attr(not(feature = "rt"), allow(dead_code))]
6
7//! Time driver.
8
9mod entry;
10pub(crate) use entry::TimerEntry;
11use entry::{EntryList, TimerHandle, TimerShared, MAX_SAFE_MILLIS_DURATION};
12
13mod handle;
14pub(crate) use self::handle::Handle;
15
16mod source;
17pub(crate) use source::TimeSource;
18
19mod wheel;
20
21#[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
22use super::time_alt;
23
24use crate::loom::sync::atomic::{AtomicBool, Ordering};
25use crate::loom::sync::Mutex;
26use crate::runtime::driver::{self, IoHandle, IoStack};
27use crate::time::error::Error;
28use crate::time::{Clock, Duration};
29use crate::util::WakeList;
30
31use std::fmt;
32use std::{num::NonZeroU64, ptr::NonNull};
33
34/// Time implementation that drives [`Sleep`][sleep], [`Interval`][interval], and [`Timeout`][timeout].
35///
36/// A `Driver` instance tracks the state necessary for managing time and
37/// notifying the [`Sleep`][sleep] instances once their deadlines are reached.
38///
39/// It is expected that a single instance manages many individual [`Sleep`][sleep]
40/// instances. The `Driver` implementation is thread-safe and, as such, is able
41/// to handle callers from across threads.
42///
43/// After creating the `Driver` instance, the caller must repeatedly call `park`
44/// or `park_timeout`. The time driver will perform no work unless `park` or
45/// `park_timeout` is called repeatedly.
46///
47/// The driver has a resolution of one millisecond. Any unit of time that falls
48/// between milliseconds are rounded up to the next millisecond.
49///
50/// When an instance is dropped, any outstanding [`Sleep`][sleep] instance that has not
51/// elapsed will be notified with an error. At this point, calling `poll` on the
52/// [`Sleep`][sleep] instance will result in panic.
53///
54/// # Implementation
55///
56/// The time driver is based on the [paper by Varghese and Lauck][paper].
57///
58/// A hashed timing wheel is a vector of slots, where each slot handles a time
59/// slice. As time progresses, the timer walks over the slot for the current
60/// instant, and processes each entry for that slot. When the timer reaches the
61/// end of the wheel, it starts again at the beginning.
62///
63/// The implementation maintains six wheels arranged in a set of levels. As the
64/// levels go up, the slots of the associated wheel represent larger intervals
65/// of time. At each level, the wheel has 64 slots. Each slot covers a range of
66/// time equal to the wheel at the lower level. At level zero, each slot
67/// represents one millisecond of time.
68///
69/// The wheels are:
70///
71/// * Level 0: 64 x 1 millisecond slots.
72/// * Level 1: 64 x 64 millisecond slots.
73/// * Level 2: 64 x ~4 second slots.
74/// * Level 3: 64 x ~4 minute slots.
75/// * Level 4: 64 x ~4 hour slots.
76/// * Level 5: 64 x ~12 day slots.
77///
78/// When the timer processes entries at level zero, it will notify all the
79/// `Sleep` instances as their deadlines have been reached. For all higher
80/// levels, all entries will be redistributed across the wheel at the next level
81/// down. Eventually, as time progresses, entries with [`Sleep`][sleep] instances will
82/// either be canceled (dropped) or their associated entries will reach level
83/// zero and be notified.
84///
85/// [paper]: http://www.cs.columbia.edu/~nahum/w6998/papers/ton97-timing-wheels.pdf
86/// [sleep]: crate::time::Sleep
87/// [timeout]: crate::time::Timeout
88/// [interval]: crate::time::Interval
89#[derive(Debug)]
90pub(crate) struct Driver {
91    /// Parker to delegate to.
92    park: IoStack,
93}
94
95enum Inner {
96    Traditional {
97        // The state is split like this so `Handle` can access `is_shutdown` without locking the mutex
98        state: Mutex<InnerState>,
99
100        /// True if the driver is being shutdown.
101        is_shutdown: AtomicBool,
102
103        // When `true`, a call to `park_timeout` should immediately return and time
104        // should not advance. One reason for this to be `true` is if the task
105        // passed to `Runtime::block_on` called `task::yield_now()`.
106        //
107        // While it may look racy, it only has any effect when the clock is paused
108        // and pausing the clock is restricted to a single-threaded runtime.
109        #[cfg(feature = "test-util")]
110        did_wake: AtomicBool,
111    },
112
113    #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
114    Alternative {
115        /// True if the driver is being shutdown.
116        is_shutdown: AtomicBool,
117
118        // When `true`, a call to `park_timeout` should immediately return and time
119        // should not advance. One reason for this to be `true` is if the task
120        // passed to `Runtime::block_on` called `task::yield_now()`.
121        //
122        // While it may look racy, it only has any effect when the clock is paused
123        // and pausing the clock is restricted to a single-threaded runtime.
124        #[cfg(feature = "test-util")]
125        did_wake: AtomicBool,
126    },
127}
128
129/// Time state shared which must be protected by a `Mutex`
130struct InnerState {
131    /// The earliest time at which we promise to wake up without unparking.
132    next_wake: Option<NonZeroU64>,
133
134    /// Timer wheel.
135    wheel: wheel::Wheel,
136}
137
138// ===== impl Driver =====
139
140impl Driver {
141    /// Creates a new `Driver` instance that uses `park` to block the current
142    /// thread and `time_source` to get the current time and convert to ticks.
143    ///
144    /// Specifying the source of time is useful when testing.
145    pub(crate) fn new(park: IoStack, clock: &Clock) -> (Driver, Handle) {
146        let time_source = TimeSource::new(clock);
147
148        let handle = Handle {
149            time_source,
150            inner: Inner::Traditional {
151                state: Mutex::new(InnerState {
152                    next_wake: None,
153                    wheel: wheel::Wheel::new(),
154                }),
155                is_shutdown: AtomicBool::new(false),
156
157                #[cfg(feature = "test-util")]
158                did_wake: AtomicBool::new(false),
159            },
160        };
161
162        let driver = Driver { park };
163
164        (driver, handle)
165    }
166
167    #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
168    pub(crate) fn new_alt(clock: &Clock) -> Handle {
169        let time_source = TimeSource::new(clock);
170
171        Handle {
172            time_source,
173            inner: Inner::Alternative {
174                is_shutdown: AtomicBool::new(false),
175                #[cfg(feature = "test-util")]
176                did_wake: AtomicBool::new(false),
177            },
178        }
179    }
180
181    pub(crate) fn park(&mut self, handle: &driver::Handle) {
182        self.park_internal(handle, None);
183    }
184
185    pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) {
186        self.park_internal(handle, Some(duration));
187    }
188
189    pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) {
190        let handle = rt_handle.time();
191
192        if handle.is_shutdown() {
193            return;
194        }
195
196        match &handle.inner {
197            Inner::Traditional { is_shutdown, .. } => {
198                is_shutdown.store(true, Ordering::SeqCst);
199            }
200            #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
201            Inner::Alternative { is_shutdown, .. } => {
202                is_shutdown.store(true, Ordering::SeqCst);
203            }
204        }
205
206        // Advance time forward to the end of time.
207
208        handle.process_at_time(u64::MAX);
209
210        self.park.shutdown(rt_handle);
211    }
212
213    fn park_internal(&mut self, rt_handle: &driver::Handle, limit: Option<Duration>) {
214        let handle = rt_handle.time();
215        let mut lock = handle.inner.lock();
216
217        assert!(!handle.is_shutdown());
218
219        let next_wake = lock.wheel.next_expiration_time();
220        lock.next_wake =
221            next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));
222
223        drop(lock);
224
225        match next_wake {
226            Some(when) => {
227                let now = handle.time_source.now(rt_handle.clock());
228                // Note that we effectively round up to 1ms here - this avoids
229                // very short-duration microsecond-resolution sleeps that the OS
230                // might treat as zero-length.
231                let mut duration = handle
232                    .time_source
233                    .tick_to_duration(when.saturating_sub(now));
234
235                if duration > Duration::from_millis(0) {
236                    if let Some(limit) = limit {
237                        duration = std::cmp::min(limit, duration);
238                    }
239
240                    self.park_thread_timeout(rt_handle, duration);
241                } else {
242                    self.park.park_timeout(rt_handle, Duration::from_secs(0));
243                }
244            }
245            None => {
246                if let Some(duration) = limit {
247                    self.park_thread_timeout(rt_handle, duration);
248                } else {
249                    self.park.park(rt_handle);
250                }
251            }
252        }
253
254        // Process pending timers after waking up
255        handle.process(rt_handle.clock());
256    }
257
258    cfg_test_util! {
259        fn park_thread_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
260            let handle = rt_handle.time();
261            let clock = rt_handle.clock();
262
263            if clock.can_auto_advance() {
264                self.park.park_timeout(rt_handle, Duration::from_secs(0));
265
266                // If the time driver was woken, then the park completed
267                // before the "duration" elapsed (usually caused by a
268                // yield in `Runtime::block_on`). In this case, we don't
269                // advance the clock.
270                if !handle.did_wake() {
271                    // Simulate advancing time
272                    if let Err(msg) = clock.advance(duration) {
273                        panic!("{}", msg);
274                    }
275                }
276            } else {
277                self.park.park_timeout(rt_handle, duration);
278            }
279        }
280    }
281
282    cfg_not_test_util! {
283        fn park_thread_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
284            self.park.park_timeout(rt_handle, duration);
285        }
286    }
287}
288
289impl Handle {
290    pub(self) fn process(&self, clock: &Clock) {
291        let now = self.time_source().now(clock);
292
293        self.process_at_time(now);
294    }
295
296    pub(self) fn process_at_time(&self, mut now: u64) {
297        let mut waker_list = WakeList::new();
298
299        let mut lock = self.inner.lock();
300
301        if now < lock.wheel.elapsed() {
302            // Time went backwards! This normally shouldn't happen as the Rust language
303            // guarantees that an Instant is monotonic, but can happen when running
304            // Linux in a VM on a Windows host due to std incorrectly trusting the
305            // hardware clock to be monotonic.
306            //
307            // See <https://github.com/tokio-rs/tokio/issues/3619> for more information.
308            now = lock.wheel.elapsed();
309        }
310
311        while let Some(entry) = lock.wheel.poll(now) {
312            debug_assert!(unsafe { entry.is_pending() });
313
314            // SAFETY: We hold the driver lock, and just removed the entry from any linked lists.
315            if let Some(waker) = unsafe { entry.fire(Ok(())) } {
316                waker_list.push(waker);
317
318                if !waker_list.can_push() {
319                    // Wake a batch of wakers. To avoid deadlock, we must do this with the lock temporarily dropped.
320                    drop(lock);
321
322                    waker_list.wake_all();
323
324                    lock = self.inner.lock();
325                }
326            }
327        }
328
329        lock.next_wake = lock
330            .wheel
331            .poll_at()
332            .map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));
333
334        drop(lock);
335
336        waker_list.wake_all();
337    }
338
339    #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
340    pub(crate) fn process_at_time_alt(
341        &self,
342        wheel: &mut time_alt::Wheel,
343        mut now: u64,
344        wake_queue: &mut time_alt::WakeQueue,
345    ) {
346        if now < wheel.elapsed() {
347            // Time went backwards! This normally shouldn't happen as the Rust language
348            // guarantees that an Instant is monotonic, but can happen when running
349            // Linux in a VM on a Windows host due to std incorrectly trusting the
350            // hardware clock to be monotonic.
351            //
352            // See <https://github.com/tokio-rs/tokio/issues/3619> for more information.
353            now = wheel.elapsed();
354        }
355
356        wheel.take_expired(now, wake_queue);
357    }
358
359    #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
360    pub(crate) fn shutdown_alt(&self, wheel: &mut time_alt::Wheel) {
361        // self.is_shutdown.store(true, Ordering::SeqCst);
362        // Advance time forward to the end of time.
363        // This will ensure that all timers are fired.
364        let max_tick = u64::MAX;
365        let mut wake_queue = time_alt::WakeQueue::new();
366        self.process_at_time_alt(wheel, max_tick, &mut wake_queue);
367        wake_queue.wake_all();
368    }
369
370    /// Removes a registered timer from the driver.
371    ///
372    /// The timer will be moved to the cancelled state. Wakers will _not_ be
373    /// invoked. If the timer is already completed, this function is a no-op.
374    ///
375    /// This function always acquires the driver lock, even if the entry does
376    /// not appear to be registered.
377    ///
378    /// SAFETY: The timer must not be registered with some other driver, and
379    /// `add_entry` must not be called concurrently.
380    pub(self) unsafe fn clear_entry(&self, entry: NonNull<TimerShared>) {
381        unsafe {
382            let mut lock = self.inner.lock();
383
384            if entry.as_ref().might_be_registered() {
385                lock.wheel.remove(entry);
386            }
387
388            entry.as_ref().handle().fire(Ok(()));
389        }
390    }
391
392    /// Removes and re-adds an entry to the driver.
393    ///
394    /// SAFETY: The timer must be either unregistered, or registered with this
395    /// driver. No other threads are allowed to concurrently manipulate the
396    /// timer at all (the current thread should hold an exclusive reference to
397    /// the `TimerEntry`)
398    pub(self) unsafe fn reregister(
399        &self,
400        unpark: &IoHandle,
401        new_tick: u64,
402        entry: NonNull<TimerShared>,
403    ) {
404        let waker = unsafe {
405            let mut lock = self.inner.lock();
406
407            // We may have raced with a firing/deregistration, so check before
408            // deregistering.
409            if unsafe { entry.as_ref().might_be_registered() } {
410                lock.wheel.remove(entry);
411            }
412
413            // Now that we have exclusive control of this entry, mint a handle to reinsert it.
414            let entry = entry.as_ref().handle();
415
416            if self.is_shutdown() {
417                unsafe { entry.fire(Err(crate::time::error::Error::shutdown())) }
418            } else {
419                entry.set_expiration(new_tick);
420
421                // Note: We don't have to worry about racing with some other resetting
422                // thread, because add_entry and reregister require exclusive control of
423                // the timer entry.
424                match unsafe { lock.wheel.insert(entry) } {
425                    Ok(when) => {
426                        if lock
427                            .next_wake
428                            .map(|next_wake| when < next_wake.get())
429                            .unwrap_or(true)
430                        {
431                            unpark.unpark();
432                        }
433
434                        None
435                    }
436                    Err((entry, crate::time::error::InsertError::Elapsed)) => unsafe {
437                        entry.fire(Ok(()))
438                    },
439                }
440            }
441
442            // Must release lock before invoking waker to avoid the risk of deadlock.
443        };
444
445        // The timer was fired synchronously as a result of the reregistration.
446        // Wake the waker; this is needed because we might reset _after_ a poll,
447        // and otherwise the task won't be awoken to poll again.
448        if let Some(waker) = waker {
449            waker.wake();
450        }
451    }
452
453    cfg_test_util! {
454        pub(super) fn did_wake(&self) -> bool {
455            match &self.inner {
456                Inner::Traditional { did_wake, .. } => did_wake.swap(false, Ordering::SeqCst),
457                #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
458                Inner::Alternative { did_wake, .. } => did_wake.swap(false, Ordering::SeqCst),
459            }
460        }
461    }
462}
463
464// ===== impl Inner =====
465
466impl Inner {
467    /// Locks the driver's inner structure
468    pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, InnerState> {
469        match self {
470            Inner::Traditional { state, .. } => state.lock(),
471            #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
472            Inner::Alternative { .. } => unreachable!("unreachable in alternative timer"),
473        }
474    }
475
476    // Check whether the driver has been shutdown
477    pub(super) fn is_shutdown(&self) -> bool {
478        match self {
479            Inner::Traditional { is_shutdown, .. } => is_shutdown.load(Ordering::SeqCst),
480            #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
481            Inner::Alternative { is_shutdown, .. } => is_shutdown.load(Ordering::SeqCst),
482        }
483    }
484}
485
486impl fmt::Debug for Inner {
487    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
488        fmt.debug_struct("Inner").finish()
489    }
490}
491
492#[cfg(test)]
493mod tests;