1#![deny(unsafe_code)]
9
10use std::cmp::{self, Ord};
11use std::collections::BinaryHeap;
12use std::time::{Duration, Instant};
13
14use crossbeam_channel::{Receiver, after, never};
15use malloc_size_of_derive::MallocSizeOf;
16
17pub type BoxedTimerCallback = Box<dyn Fn() + Send + 'static>;
20
21#[derive(MallocSizeOf)]
23pub struct TimerEventRequest {
24 #[ignore_malloc_size_of = "Size of a boxed function"]
25 pub callback: BoxedTimerCallback,
26 pub duration: Duration,
27}
28
29impl TimerEventRequest {
30 fn dispatch(self) {
31 (self.callback)()
32 }
33}
34
35#[derive(MallocSizeOf)]
36struct ScheduledEvent {
37 id: TimerId,
38 request: TimerEventRequest,
39 for_time: Instant,
40}
41
42impl Ord for ScheduledEvent {
43 fn cmp(&self, other: &ScheduledEvent) -> cmp::Ordering {
44 self.for_time.cmp(&other.for_time).reverse()
45 }
46}
47
48impl PartialOrd for ScheduledEvent {
49 fn partial_cmp(&self, other: &ScheduledEvent) -> Option<cmp::Ordering> {
50 Some(self.cmp(other))
51 }
52}
53
54impl Eq for ScheduledEvent {}
55impl PartialEq for ScheduledEvent {
56 fn eq(&self, other: &ScheduledEvent) -> bool {
57 std::ptr::eq(self, other)
58 }
59}
60
61#[derive(Clone, Copy, MallocSizeOf, PartialEq)]
62pub struct TimerId(usize);
63
64#[derive(Default, MallocSizeOf)]
66pub struct TimerScheduler {
67 queue: BinaryHeap<ScheduledEvent>,
69
70 current_id: usize,
72}
73
74impl TimerScheduler {
75 pub fn schedule_timer(&mut self, request: TimerEventRequest) -> TimerId {
77 let for_time = Instant::now() + request.duration;
78
79 let id = TimerId(self.current_id);
80 self.current_id += 1;
81
82 self.queue.push(ScheduledEvent {
83 id,
84 request,
85 for_time,
86 });
87 id
88 }
89
90 pub fn cancel_timer(&mut self, id: TimerId) {
93 self.queue.retain(|event| event.id != id);
94 }
95
96 pub fn wait_channel(&self) -> Receiver<Instant> {
99 self.queue
100 .peek()
101 .map(|event| {
102 let now = Instant::now();
103 if event.for_time < now {
104 after(Duration::ZERO)
105 } else {
106 after(event.for_time - now)
107 }
108 })
109 .unwrap_or_else(never)
110 }
111
112 pub fn dispatch_completed_timers(&mut self) {
115 let now = Instant::now();
116 loop {
117 match self.queue.peek() {
118 Some(event) if event.for_time <= now => {},
120 _ => break,
122 }
123 self.queue
126 .pop()
127 .expect("Expected request")
128 .request
129 .dispatch();
130 }
131 }
132}