timers/
lib.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */

//! A generic timer scheduler module that can be integrated into a crossbeam based event
//! loop or used to launch a background timer thread.

#![deny(unsafe_code)]

use std::cmp::{self, Ord};
use std::collections::BinaryHeap;
use std::time::{Duration, Instant};

use base::id::PipelineId;
use crossbeam_channel::{after, never, Receiver};
use malloc_size_of_derive::MallocSizeOf;
use serde::{Deserialize, Serialize};

/// Describes the source that requested the [`TimerEvent`].
#[derive(Clone, Copy, Debug, Deserialize, MallocSizeOf, Serialize)]
pub enum TimerSource {
    /// The event was requested from a window (`ScriptThread`).
    FromWindow(PipelineId),
    /// The event was requested from a worker (`DedicatedGlobalWorkerScope`).
    FromWorker,
}

/// The id to be used for a [`TimerEvent`] is defined by the corresponding [`TimerEventRequest`].
#[derive(Clone, Copy, Debug, Deserialize, Eq, MallocSizeOf, PartialEq, Serialize)]
pub struct TimerEventId(pub u32);

/// A notification that a timer has fired. [`TimerSource`] must be `FromWindow` when
/// dispatched to `ScriptThread` and must be `FromWorker` when dispatched to a
/// `DedicatedGlobalWorkerScope`
#[derive(Debug, Deserialize, Serialize)]
pub struct TimerEvent(pub TimerSource, pub TimerEventId);

/// A callback to pass to the [`TimerScheduler`] to be called when the timer is
/// dispatched.
pub type BoxedTimerCallback = Box<dyn Fn(TimerEvent) + Send + 'static>;

/// Requests a TimerEvent-Message be sent after the given duration.
#[derive(MallocSizeOf)]
pub struct TimerEventRequest {
    #[ignore_malloc_size_of = "Size of a boxed function"]
    pub callback: BoxedTimerCallback,
    pub source: TimerSource,
    pub id: TimerEventId,
    pub duration: Duration,
}

impl TimerEventRequest {
    fn dispatch(self) {
        (self.callback)(TimerEvent(self.source, self.id))
    }
}

#[derive(MallocSizeOf)]
struct ScheduledEvent {
    request: TimerEventRequest,
    for_time: Instant,
}

impl Ord for ScheduledEvent {
    fn cmp(&self, other: &ScheduledEvent) -> cmp::Ordering {
        self.for_time.cmp(&other.for_time).reverse()
    }
}

impl PartialOrd for ScheduledEvent {
    fn partial_cmp(&self, other: &ScheduledEvent) -> Option<cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Eq for ScheduledEvent {}
impl PartialEq for ScheduledEvent {
    fn eq(&self, other: &ScheduledEvent) -> bool {
        std::ptr::eq(self, other)
    }
}

/// A queue of [`TimerEventRequest`]s that are stored in order of next-to-fire.
#[derive(Default, MallocSizeOf)]
pub struct TimerScheduler {
    /// A priority queue of future events, sorted by due time.
    queue: BinaryHeap<ScheduledEvent>,
}

impl TimerScheduler {
    /// Schedule a new timer for on this [`TimerScheduler`].
    pub fn schedule_timer(&mut self, request: TimerEventRequest) {
        let for_time = Instant::now() + request.duration;
        self.queue.push(ScheduledEvent { request, for_time });
    }

    /// Get a [`Receiver<Instant>`] that receives a message after waiting for the next timer
    /// to fire. If there are no timers, the channel will *never* send a message.
    pub fn wait_channel(&self) -> Receiver<Instant> {
        self.queue
            .peek()
            .map(|event| {
                let now = Instant::now();
                if event.for_time < now {
                    after(Duration::ZERO)
                } else {
                    after(event.for_time - now)
                }
            })
            .unwrap_or_else(never)
    }

    /// Dispatch any timer events from this [`TimerScheduler`]'s `queue` when `now` is
    /// past the due time of the event.
    pub fn dispatch_completed_timers(&mut self) {
        let now = Instant::now();
        loop {
            match self.queue.peek() {
                // Dispatch the event if its due time is past.
                Some(event) if event.for_time <= now => {},
                // Otherwise, we're done dispatching events.
                _ => break,
            }
            // Remove the event from the priority queue (Note this only executes when the
            // first event has been dispatched
            self.queue
                .pop()
                .expect("Expected request")
                .request
                .dispatch();
        }
    }
}