tokio/runtime/time/mod.rs
// Currently, Rust warns when an unsafe fn contains an unsafe {} block. However,
// in the future, this will change to the reverse. For now, suppress this
// warning and generally stick with being explicit about unsafety.
#![allow(unused_unsafe)]
#![cfg_attr(not(feature = "rt"), allow(dead_code))]

//! Time driver.

mod entry;
pub(crate) use entry::TimerEntry;
use entry::{EntryList, TimerHandle, TimerShared, MAX_SAFE_MILLIS_DURATION};

mod handle;
pub(crate) use self::handle::Handle;

mod source;
pub(crate) use source::TimeSource;

mod wheel;

#[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
use super::time_alt;

use crate::loom::sync::atomic::{AtomicBool, Ordering};
use crate::loom::sync::Mutex;
use crate::runtime::driver::{self, IoHandle, IoStack};
use crate::time::error::Error;
use crate::time::{Clock, Duration};
use crate::util::WakeList;

use std::fmt;
use std::{num::NonZeroU64, ptr::NonNull};

/// Time implementation that drives [`Sleep`][sleep], [`Interval`][interval], and [`Timeout`][timeout].
///
/// A `Driver` instance tracks the state necessary for managing time and
/// notifying the [`Sleep`][sleep] instances once their deadlines are reached.
///
/// It is expected that a single instance manages many individual [`Sleep`][sleep]
/// instances. The `Driver` implementation is thread-safe and, as such, is able
/// to handle callers from across threads.
///
/// After creating the `Driver` instance, the caller must repeatedly call `park`
/// or `park_timeout`. The time driver will perform no work unless `park` or
/// `park_timeout` is called repeatedly.
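///
/// A minimal sketch of that driving loop is shown below; the variable names are
/// illustrative only, and the real runtime threads a `driver::Handle` through
/// this call as part of its park/unpark machinery:
///
/// ```ignore
/// loop {
///     // Blocks until the next timer deadline or until the driver is unparked.
///     driver.park(&rt_handle);
///     // Timers that expired while parked have now been fired.
/// }
/// ```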
///
/// The driver has a resolution of one millisecond. Any unit of time that falls
/// between milliseconds is rounded up to the next millisecond.
///
/// When an instance is dropped, any outstanding [`Sleep`][sleep] instance that has not
/// elapsed will be notified with an error. At this point, calling `poll` on the
/// [`Sleep`][sleep] instance will result in a panic.
///
/// # Implementation
///
/// The time driver is based on the [paper by Varghese and Lauck][paper].
///
/// A hashed timing wheel is a vector of slots, where each slot handles a time
/// slice. As time progresses, the timer walks over the slot for the current
/// instant, and processes each entry for that slot. When the timer reaches the
/// end of the wheel, it starts again at the beginning.
///
/// The implementation maintains six wheels arranged in a set of levels. As the
/// levels go up, the slots of the associated wheel represent larger intervals
/// of time. At each level, the wheel has 64 slots. Each slot covers a range of
/// time equal to the entire span of the wheel one level below. At level zero,
/// each slot represents one millisecond of time.
///
/// The wheels are:
///
/// * Level 0: 64 x 1 millisecond slots.
/// * Level 1: 64 x 64 millisecond slots.
/// * Level 2: 64 x ~4 second slots.
/// * Level 3: 64 x ~4 minute slots.
/// * Level 4: 64 x ~4 hour slots.
/// * Level 5: 64 x ~12 day slots.
///
/// When the timer processes entries at level zero, it will notify all the
/// `Sleep` instances as their deadlines have been reached. For all higher
/// levels, all entries will be redistributed across the wheel at the next level
/// down. Eventually, as time progresses, entries with [`Sleep`][sleep] instances will
/// either be canceled (dropped) or their associated entries will reach level
/// zero and be notified.
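///
/// As a rough sketch of the mapping (the exact logic lives in the `wheel`
/// submodule), the level for a deadline is determined by the most significant
/// bit in which the deadline tick differs from the ticks already elapsed, six
/// bits per level, and the slot is the corresponding 6-bit field of the
/// deadline:
///
/// ```text
/// masked = (elapsed ^ deadline) | 0b11_1111   // force at least level 0
/// level  = (63 - leading_zeros(masked)) / 6
/// slot   = (deadline >> (level * 6)) & 0b11_1111
/// ```
///
/// For example, with 0 ticks elapsed, a deadline at tick 90 starts in level 1;
/// once the wheel has advanced to tick 64, the entry is redistributed into a
/// level 0 slot and fires when tick 90 is processed.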
///
/// [paper]: http://www.cs.columbia.edu/~nahum/w6998/papers/ton97-timing-wheels.pdf
/// [sleep]: crate::time::Sleep
/// [timeout]: crate::time::Timeout
/// [interval]: crate::time::Interval
#[derive(Debug)]
pub(crate) struct Driver {
    /// Parker to delegate to.
    park: IoStack,
}

enum Inner {
    Traditional {
        // The state is split like this so `Handle` can access `is_shutdown` without
        // locking the mutex.
        state: Mutex<InnerState>,

        /// True if the driver is being shut down.
        is_shutdown: AtomicBool,

        // When `true`, a call to `park_timeout` should immediately return and time
        // should not advance. One reason for this to be `true` is if the task
        // passed to `Runtime::block_on` called `task::yield_now()`.
        //
        // While it may look racy, it only has any effect when the clock is paused
        // and pausing the clock is restricted to a single-threaded runtime.
        #[cfg(feature = "test-util")]
        did_wake: AtomicBool,
    },

    #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
    Alternative {
        /// True if the driver is being shut down.
        is_shutdown: AtomicBool,

        // When `true`, a call to `park_timeout` should immediately return and time
        // should not advance. One reason for this to be `true` is if the task
        // passed to `Runtime::block_on` called `task::yield_now()`.
        //
        // While it may look racy, it only has any effect when the clock is paused
        // and pausing the clock is restricted to a single-threaded runtime.
        #[cfg(feature = "test-util")]
        did_wake: AtomicBool,
    },
}

/// Shared time state, which must be protected by a `Mutex`.
struct InnerState {
    /// The earliest time at which we promise to wake up without unparking.
    next_wake: Option<NonZeroU64>,

    /// Timer wheel.
    wheel: wheel::Wheel,
}

// ===== impl Driver =====

impl Driver {
    /// Creates a new `Driver` instance that uses `park` to block the current
    /// thread and `time_source` to get the current time and convert to ticks.
    ///
    /// Specifying the source of time is useful when testing.
    pub(crate) fn new(park: IoStack, clock: &Clock) -> (Driver, Handle) {
        let time_source = TimeSource::new(clock);

        let handle = Handle {
            time_source,
            inner: Inner::Traditional {
                state: Mutex::new(InnerState {
                    next_wake: None,
                    wheel: wheel::Wheel::new(),
                }),
                is_shutdown: AtomicBool::new(false),

                #[cfg(feature = "test-util")]
                did_wake: AtomicBool::new(false),
            },
        };

        let driver = Driver { park };

        (driver, handle)
    }

    #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
    pub(crate) fn new_alt(clock: &Clock) -> Handle {
        let time_source = TimeSource::new(clock);

        Handle {
            time_source,
            inner: Inner::Alternative {
                is_shutdown: AtomicBool::new(false),
                #[cfg(feature = "test-util")]
                did_wake: AtomicBool::new(false),
            },
        }
    }

    pub(crate) fn park(&mut self, handle: &driver::Handle) {
        self.park_internal(handle, None);
    }

    pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) {
        self.park_internal(handle, Some(duration));
    }

    pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) {
        let handle = rt_handle.time();

        if handle.is_shutdown() {
            return;
        }

        match &handle.inner {
            Inner::Traditional { is_shutdown, .. } => {
                is_shutdown.store(true, Ordering::SeqCst);
            }
            #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
            Inner::Alternative { is_shutdown, .. } => {
                is_shutdown.store(true, Ordering::SeqCst);
            }
        }

        // Advance time forward to the end of time.
        handle.process_at_time(u64::MAX);

        self.park.shutdown(rt_handle);
    }

    fn park_internal(&mut self, rt_handle: &driver::Handle, limit: Option<Duration>) {
        let handle = rt_handle.time();
        let mut lock = handle.inner.lock();

        assert!(!handle.is_shutdown());

        let next_wake = lock.wheel.next_expiration_time();
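        // `next_wake` is stored as a `NonZeroU64`, so an expiration at tick 0 is
        // recorded as tick 1 instead. This only makes the driver slightly more
        // eager to be unparked for new, earlier timers; it never delays a wake-up.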
        lock.next_wake =
            next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));

        drop(lock);

        match next_wake {
            Some(when) => {
                let now = handle.time_source.now(rt_handle.clock());
                // Note that we effectively round up to 1ms here - this avoids
                // very short-duration microsecond-resolution sleeps that the OS
                // might treat as zero-length.
                let mut duration = handle
                    .time_source
                    .tick_to_duration(when.saturating_sub(now));

                if duration > Duration::from_millis(0) {
                    if let Some(limit) = limit {
                        duration = std::cmp::min(limit, duration);
                    }

                    self.park_thread_timeout(rt_handle, duration);
                } else {
                    self.park.park_timeout(rt_handle, Duration::from_secs(0));
                }
            }
            None => {
                if let Some(duration) = limit {
                    self.park_thread_timeout(rt_handle, duration);
                } else {
                    self.park.park(rt_handle);
                }
            }
        }

        // Process pending timers after waking up.
        handle.process(rt_handle.clock());
    }

    cfg_test_util! {
        fn park_thread_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
            let handle = rt_handle.time();
            let clock = rt_handle.clock();

            if clock.can_auto_advance() {
                self.park.park_timeout(rt_handle, Duration::from_secs(0));

                // If the time driver was woken, then the park completed
                // before the "duration" elapsed (usually caused by a
                // yield in `Runtime::block_on`). In this case, we don't
                // advance the clock.
                if !handle.did_wake() {
                    // Simulate advancing time.
                    if let Err(msg) = clock.advance(duration) {
                        panic!("{}", msg);
                    }
                }
            } else {
                self.park.park_timeout(rt_handle, duration);
            }
        }
    }

    cfg_not_test_util! {
        fn park_thread_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
            self.park.park_timeout(rt_handle, duration);
        }
    }
}

impl Handle {
    pub(self) fn process(&self, clock: &Clock) {
        let now = self.time_source().now(clock);

        self.process_at_time(now);
    }

    pub(self) fn process_at_time(&self, mut now: u64) {
        let mut waker_list = WakeList::new();

        let mut lock = self.inner.lock();

        if now < lock.wheel.elapsed() {
            // Time went backwards! This normally shouldn't happen, as the Rust
            // standard library guarantees that an `Instant` is monotonic, but it can
            // happen when running Linux in a VM on a Windows host due to std
            // incorrectly trusting the hardware clock to be monotonic.
            //
            // See <https://github.com/tokio-rs/tokio/issues/3619> for more information.
            now = lock.wheel.elapsed();
        }
        while let Some(entry) = lock.wheel.poll(now) {
            debug_assert!(unsafe { entry.is_pending() });

            // SAFETY: We hold the driver lock, and just removed the entry from any linked lists.
            if let Some(waker) = unsafe { entry.fire(Ok(())) } {
                waker_list.push(waker);

                if !waker_list.can_push() {
                    // Wake a batch of wakers. To avoid deadlock, we must do this with
                    // the lock temporarily dropped.
                    drop(lock);

                    waker_list.wake_all();

                    lock = self.inner.lock();
                }
            }
        }

        lock.next_wake = lock
            .wheel
            .poll_at()
            .map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));

        drop(lock);

        waker_list.wake_all();
    }

    #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
    pub(crate) fn process_at_time_alt(
        &self,
        wheel: &mut time_alt::Wheel,
        mut now: u64,
        wake_queue: &mut time_alt::WakeQueue,
    ) {
        if now < wheel.elapsed() {
            // Time went backwards! This normally shouldn't happen, as the Rust
            // standard library guarantees that an `Instant` is monotonic, but it can
            // happen when running Linux in a VM on a Windows host due to std
            // incorrectly trusting the hardware clock to be monotonic.
            //
            // See <https://github.com/tokio-rs/tokio/issues/3619> for more information.
            now = wheel.elapsed();
        }

        wheel.take_expired(now, wake_queue);
    }

    #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
    pub(crate) fn shutdown_alt(&self, wheel: &mut time_alt::Wheel) {
        // Advance time forward to the end of time.
        // This will ensure that all timers are fired.
        let max_tick = u64::MAX;
        let mut wake_queue = time_alt::WakeQueue::new();
        self.process_at_time_alt(wheel, max_tick, &mut wake_queue);
        wake_queue.wake_all();
    }

    /// Removes a registered timer from the driver.
    ///
    /// The timer will be moved to the cancelled state. Wakers will _not_ be
    /// invoked. If the timer is already completed, this function is a no-op.
    ///
    /// This function always acquires the driver lock, even if the entry does
    /// not appear to be registered.
    ///
    /// SAFETY: The timer must not be registered with some other driver, and
    /// `add_entry` must not be called concurrently.
    pub(self) unsafe fn clear_entry(&self, entry: NonNull<TimerShared>) {
        unsafe {
            let mut lock = self.inner.lock();

            if entry.as_ref().might_be_registered() {
                lock.wheel.remove(entry);
            }

            entry.as_ref().handle().fire(Ok(()));
        }
    }

    /// Removes and re-adds an entry to the driver.
    ///
    /// SAFETY: The timer must be either unregistered, or registered with this
    /// driver. No other threads are allowed to concurrently manipulate the
    /// timer at all (the current thread should hold an exclusive reference to
    /// the `TimerEntry`).
    pub(self) unsafe fn reregister(
        &self,
        unpark: &IoHandle,
        new_tick: u64,
        entry: NonNull<TimerShared>,
    ) {
        let waker = unsafe {
            let mut lock = self.inner.lock();

            // We may have raced with a firing/deregistration, so check before
            // deregistering.
            if unsafe { entry.as_ref().might_be_registered() } {
                lock.wheel.remove(entry);
            }

            // Now that we have exclusive control of this entry, mint a handle to
            // reinsert it.
            let entry = entry.as_ref().handle();

            if self.is_shutdown() {
                unsafe { entry.fire(Err(crate::time::error::Error::shutdown())) }
            } else {
                entry.set_expiration(new_tick);

                // Note: We don't have to worry about racing with some other resetting
                // thread, because add_entry and reregister require exclusive control of
                // the timer entry.
                match unsafe { lock.wheel.insert(entry) } {
                    Ok(when) => {
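                        // If the new deadline is earlier than the wake-up the driver
                        // has already promised (or no wake-up is scheduled at all),
                        // unpark it so it can recompute its park timeout.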
                        if lock
                            .next_wake
                            .map(|next_wake| when < next_wake.get())
                            .unwrap_or(true)
                        {
                            unpark.unpark();
                        }

                        None
                    }
                    Err((entry, crate::time::error::InsertError::Elapsed)) => unsafe {
                        entry.fire(Ok(()))
                    },
                }
            }

            // Must release lock before invoking waker to avoid the risk of deadlock.
        };

        // The timer was fired synchronously as a result of the reregistration.
        // Wake the waker; this is needed because we might reset _after_ a poll,
        // and otherwise the task won't be awoken to poll again.
        if let Some(waker) = waker {
            waker.wake();
        }
    }

    cfg_test_util! {
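        /// Takes the `did_wake` flag: returns its current value and resets it to
        /// `false`.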
        pub(super) fn did_wake(&self) -> bool {
            match &self.inner {
                Inner::Traditional { did_wake, .. } => did_wake.swap(false, Ordering::SeqCst),
                #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
                Inner::Alternative { did_wake, .. } => did_wake.swap(false, Ordering::SeqCst),
            }
        }
    }
}

// ===== impl Inner =====

impl Inner {
    /// Locks the driver's inner structure.
    pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, InnerState> {
        match self {
            Inner::Traditional { state, .. } => state.lock(),
            #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
            Inner::Alternative { .. } => unreachable!("unreachable in alternative timer"),
        }
    }

    // Checks whether the driver has been shut down.
    pub(super) fn is_shutdown(&self) -> bool {
        match self {
            Inner::Traditional { is_shutdown, .. } => is_shutdown.load(Ordering::SeqCst),
            #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
            Inner::Alternative { is_shutdown, .. } => is_shutdown.load(Ordering::SeqCst),
        }
    }
}

impl fmt::Debug for Inner {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Inner").finish()
    }
}

#[cfg(test)]
mod tests;