tokio/runtime/time/wheel/
mod.rs

1use crate::runtime::time::{TimerHandle, TimerShared};
2use crate::time::error::InsertError;
3
4mod level;
5pub(crate) use self::level::Expiration;
6use self::level::Level;
7
8use std::{array, ptr::NonNull};
9
10use super::entry::STATE_DEREGISTERED;
11use super::EntryList;
12
13/// Timing wheel implementation.
14///
15/// This type provides the hashed timing wheel implementation that backs
16/// [`Driver`].
17///
18/// See [`Driver`] documentation for some implementation notes.
19///
20/// [`Driver`]: crate::runtime::time::Driver
21#[derive(Debug)]
22pub(crate) struct Wheel {
23    /// The number of milliseconds elapsed since the wheel started.
24    elapsed: u64,
25
26    /// Timer wheel.
27    ///
28    /// Levels:
29    ///
30    /// * 1 ms slots / 64 ms range
31    /// * 64 ms slots / ~ 4 sec range
32    /// * ~ 4 sec slots / ~ 4 min range
33    /// * ~ 4 min slots / ~ 4 hr range
34    /// * ~ 4 hr slots / ~ 12 day range
35    /// * ~ 12 day slots / ~ 2 yr range
36    levels: Box<[Level; NUM_LEVELS]>,
37
38    /// Entries queued for firing
39    pending: EntryList,
40}
41
42/// Number of levels. Each level has 64 slots. By using 6 levels with 64 slots
43/// each, the timer is able to track time up to 2 years into the future with a
44/// precision of 1 millisecond.
45const NUM_LEVELS: usize = 6;
46
47/// The maximum duration of a `Sleep`.
48pub(super) const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1;
49
50impl Wheel {
51    /// Creates a new timing wheel.
52    pub(crate) fn new() -> Wheel {
53        Wheel {
54            elapsed: 0,
55            levels: Box::new(array::from_fn(Level::new)),
56            pending: EntryList::new(),
57        }
58    }
59
60    /// Returns the number of milliseconds that have elapsed since the timing
61    /// wheel's creation.
62    pub(crate) fn elapsed(&self) -> u64 {
63        self.elapsed
64    }
65
66    /// Inserts an entry into the timing wheel.
67    ///
68    /// # Arguments
69    ///
70    /// * `item`: The item to insert into the wheel.
71    ///
72    /// # Return
73    ///
74    /// Returns `Ok` when the item is successfully inserted, `Err` otherwise.
75    ///
76    /// `Err(Elapsed)` indicates that `when` represents an instant that has
77    /// already passed. In this case, the caller should fire the timeout
78    /// immediately.
79    ///
80    /// `Err(Invalid)` indicates an invalid `when` argument as been supplied.
81    ///
82    /// # Safety
83    ///
84    /// This function registers item into an intrusive linked list. The caller
85    /// must ensure that `item` is pinned and will not be dropped without first
86    /// being deregistered.
87    pub(crate) unsafe fn insert(
88        &mut self,
89        item: TimerHandle,
90    ) -> Result<u64, (TimerHandle, InsertError)> {
91        let when = unsafe { item.sync_when() };
92
93        if when <= self.elapsed {
94            return Err((item, InsertError::Elapsed));
95        }
96
97        // Get the level at which the entry should be stored
98        let level = self.level_for(when);
99
100        unsafe {
101            self.levels[level].add_entry(item);
102        }
103
104        debug_assert!({
105            self.levels[level]
106                .next_expiration(self.elapsed)
107                .map(|e| e.deadline >= self.elapsed)
108                .unwrap_or(true)
109        });
110
111        Ok(when)
112    }
113
114    /// Removes `item` from the timing wheel.
115    pub(crate) unsafe fn remove(&mut self, item: NonNull<TimerShared>) {
116        unsafe {
117            let when = item.as_ref().registered_when();
118            if when == STATE_DEREGISTERED {
119                self.pending.remove(item);
120            } else {
121                debug_assert!(
122                    self.elapsed <= when,
123                    "elapsed={}; when={}",
124                    self.elapsed,
125                    when
126                );
127
128                let level = self.level_for(when);
129                self.levels[level].remove_entry(item);
130            }
131        }
132    }
133
134    /// Instant at which to poll.
135    pub(crate) fn poll_at(&self) -> Option<u64> {
136        self.next_expiration().map(|expiration| expiration.deadline)
137    }
138
139    /// Advances the timer up to the instant represented by `now`.
140    pub(crate) fn poll(&mut self, now: u64) -> Option<TimerHandle> {
141        loop {
142            if let Some(handle) = self.pending.pop_back() {
143                return Some(handle);
144            }
145
146            match self.next_expiration() {
147                Some(ref expiration) if expiration.deadline <= now => {
148                    self.process_expiration(expiration);
149
150                    self.set_elapsed(expiration.deadline);
151                }
152                _ => {
153                    // in this case the poll did not indicate an expiration
154                    // _and_ we were not able to find a next expiration in
155                    // the current list of timers.  advance to the poll's
156                    // current time and do nothing else.
157                    self.set_elapsed(now);
158                    break;
159                }
160            }
161        }
162
163        self.pending.pop_back()
164    }
165
166    /// Returns the instant at which the next timeout expires.
167    fn next_expiration(&self) -> Option<Expiration> {
168        if !self.pending.is_empty() {
169            // Expire immediately as we have things pending firing
170            return Some(Expiration {
171                level: 0,
172                slot: 0,
173                deadline: self.elapsed,
174            });
175        }
176
177        // Check all levels
178        for (level_num, level) in self.levels.iter().enumerate() {
179            if let Some(expiration) = level.next_expiration(self.elapsed) {
180                // There cannot be any expirations at a higher level that happen
181                // before this one.
182                debug_assert!(self.no_expirations_before(level_num + 1, expiration.deadline));
183
184                return Some(expiration);
185            }
186        }
187
188        None
189    }
190
191    /// Returns the tick at which this timer wheel next needs to perform some
192    /// processing, or None if there are no timers registered.
193    pub(super) fn next_expiration_time(&self) -> Option<u64> {
194        self.next_expiration().map(|ex| ex.deadline)
195    }
196
197    /// Used for debug assertions
198    fn no_expirations_before(&self, start_level: usize, before: u64) -> bool {
199        let mut res = true;
200
201        for level in &self.levels[start_level..] {
202            if let Some(e2) = level.next_expiration(self.elapsed) {
203                if e2.deadline < before {
204                    res = false;
205                }
206            }
207        }
208
209        res
210    }
211
212    /// iteratively find entries that are between the wheel's current
213    /// time and the expiration time.  for each in that population either
214    /// queue it for notification (in the case of the last level) or tier
215    /// it down to the next level (in all other cases).
216    pub(crate) fn process_expiration(&mut self, expiration: &Expiration) {
217        // Note that we need to take _all_ of the entries off the list before
218        // processing any of them. This is important because it's possible that
219        // those entries might need to be reinserted into the same slot.
220        //
221        // This happens only on the highest level, when an entry is inserted
222        // more than MAX_DURATION into the future. When this happens, we wrap
223        // around, and process some entries a multiple of MAX_DURATION before
224        // they actually need to be dropped down a level. We then reinsert them
225        // back into the same position; we must make sure we don't then process
226        // those entries again or we'll end up in an infinite loop.
227        let mut entries = self.take_entries(expiration);
228
229        while let Some(item) = entries.pop_back() {
230            if expiration.level == 0 {
231                debug_assert_eq!(unsafe { item.registered_when() }, expiration.deadline);
232            }
233
234            // Try to expire the entry; this is cheap (doesn't synchronize) if
235            // the timer is not expired, and updates registered_when.
236            match unsafe { item.mark_pending(expiration.deadline) } {
237                Ok(()) => {
238                    // Item was expired
239                    self.pending.push_front(item);
240                }
241                Err(expiration_tick) => {
242                    let level = level_for(expiration.deadline, expiration_tick);
243                    unsafe {
244                        self.levels[level].add_entry(item);
245                    }
246                }
247            }
248        }
249    }
250
251    fn set_elapsed(&mut self, when: u64) {
252        assert!(
253            self.elapsed <= when,
254            "elapsed={:?}; when={:?}",
255            self.elapsed,
256            when
257        );
258
259        if when > self.elapsed {
260            self.elapsed = when;
261        }
262    }
263
264    /// Obtains the list of entries that need processing for the given expiration.
265    fn take_entries(&mut self, expiration: &Expiration) -> EntryList {
266        self.levels[expiration.level].take_slot(expiration.slot)
267    }
268
269    fn level_for(&self, when: u64) -> usize {
270        level_for(self.elapsed, when)
271    }
272}
273
274fn level_for(elapsed: u64, when: u64) -> usize {
275    const SLOT_MASK: u64 = (1 << 6) - 1;
276
277    // Mask in the trailing bits ignored by the level calculation in order to cap
278    // the possible leading zeros
279    let mut masked = elapsed ^ when | SLOT_MASK;
280
281    if masked >= MAX_DURATION {
282        // Fudge the timer into the top level
283        masked = MAX_DURATION - 1;
284    }
285
286    let leading_zeros = masked.leading_zeros() as usize;
287    let significant = 63 - leading_zeros;
288
289    significant / NUM_LEVELS
290}
291
292#[cfg(all(test, not(loom)))]
293mod test {
294    use super::*;
295
296    #[test]
297    fn test_level_for() {
298        for pos in 0..64 {
299            assert_eq!(0, level_for(0, pos), "level_for({pos}) -- binary = {pos:b}");
300        }
301
302        for level in 1..5 {
303            for pos in level..64 {
304                let a = pos * 64_usize.pow(level as u32);
305                assert_eq!(
306                    level,
307                    level_for(0, a as u64),
308                    "level_for({a}) -- binary = {a:b}"
309                );
310
311                if pos > level {
312                    let a = a - 1;
313                    assert_eq!(
314                        level,
315                        level_for(0, a as u64),
316                        "level_for({a}) -- binary = {a:b}"
317                    );
318                }
319
320                if pos < 64 {
321                    let a = a + 1;
322                    assert_eq!(
323                        level,
324                        level_for(0, a as u64),
325                        "level_for({a}) -- binary = {a:b}"
326                    );
327                }
328            }
329        }
330    }
331}