timers/
lib.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5//! A generic timer scheduler module that can be integrated into a crossbeam based event
6//! loop or used to launch a background timer thread.
7
8#![deny(unsafe_code)]
9
10use std::cmp::{self, Ord};
11use std::collections::BinaryHeap;
12use std::time::{Duration, Instant};
13
14use crossbeam_channel::{Receiver, after, never};
15use malloc_size_of_derive::MallocSizeOf;
16
/// The boxed closure type that a [`TimerScheduler`] invokes when a timer is
/// dispatched.
///
/// The closure takes no arguments and returns nothing. It must be [`Send`] and
/// must not borrow non-`'static` data.
pub type BoxedTimerCallback = Box<dyn Fn() + Send + 'static>;
20
21/// Requests a TimerEvent-Message be sent after the given duration.
22#[derive(MallocSizeOf)]
23pub struct TimerEventRequest {
24    #[ignore_malloc_size_of = "Size of a boxed function"]
25    pub callback: BoxedTimerCallback,
26    pub duration: Duration,
27}
28
29impl TimerEventRequest {
30    fn dispatch(self) {
31        (self.callback)()
32    }
33}
34
35#[derive(MallocSizeOf)]
36struct ScheduledEvent {
37    id: TimerId,
38    request: TimerEventRequest,
39    for_time: Instant,
40}
41
42impl Ord for ScheduledEvent {
43    fn cmp(&self, other: &ScheduledEvent) -> cmp::Ordering {
44        self.for_time.cmp(&other.for_time).reverse()
45    }
46}
47
48impl PartialOrd for ScheduledEvent {
49    fn partial_cmp(&self, other: &ScheduledEvent) -> Option<cmp::Ordering> {
50        Some(self.cmp(other))
51    }
52}
53
54impl Eq for ScheduledEvent {}
55impl PartialEq for ScheduledEvent {
56    fn eq(&self, other: &ScheduledEvent) -> bool {
57        std::ptr::eq(self, other)
58    }
59}
60
61#[derive(Clone, Copy, MallocSizeOf, PartialEq)]
62pub struct TimerId(usize);
63
64/// A queue of [`TimerEventRequest`]s that are stored in order of next-to-fire.
65#[derive(Default, MallocSizeOf)]
66pub struct TimerScheduler {
67    /// A priority queue of future events, sorted by due time.
68    queue: BinaryHeap<ScheduledEvent>,
69
70    /// The current timer id, used to generate new ones.
71    current_id: usize,
72}
73
74impl TimerScheduler {
75    /// Schedule a new timer for on this [`TimerScheduler`].
76    pub fn schedule_timer(&mut self, request: TimerEventRequest) -> TimerId {
77        let for_time = Instant::now() + request.duration;
78
79        let id = TimerId(self.current_id);
80        self.current_id += 1;
81
82        self.queue.push(ScheduledEvent {
83            id,
84            request,
85            for_time,
86        });
87        id
88    }
89
90    /// Cancel a timer with the given [`TimerId`]. If a timer with that id is not
91    /// currently waiting to fire, do nothing.
92    pub fn cancel_timer(&mut self, id: TimerId) {
93        self.queue.retain(|event| event.id != id);
94    }
95
96    /// Get a [`Receiver<Instant>`] that receives a message after waiting for the next timer
97    /// to fire. If there are no timers, the channel will *never* send a message.
98    pub fn wait_channel(&self) -> Receiver<Instant> {
99        self.queue
100            .peek()
101            .map(|event| {
102                let now = Instant::now();
103                if event.for_time < now {
104                    after(Duration::ZERO)
105                } else {
106                    after(event.for_time - now)
107                }
108            })
109            .unwrap_or_else(never)
110    }
111
112    /// Dispatch any timer events from this [`TimerScheduler`]'s `queue` when `now` is
113    /// past the due time of the event.
114    pub fn dispatch_completed_timers(&mut self) {
115        let now = Instant::now();
116        loop {
117            match self.queue.peek() {
118                // Dispatch the event if its due time is past.
119                Some(event) if event.for_time <= now => {},
120                // Otherwise, we're done dispatching events.
121                _ => break,
122            }
123            // Remove the event from the priority queue (Note this only executes when the
124            // first event has been dispatched
125            self.queue
126                .pop()
127                .expect("Expected request")
128                .request
129                .dispatch();
130        }
131    }
132}