1use std::cell::Cell;
8use std::collections::VecDeque;
9use std::default::Default;
10
11use base::id::PipelineId;
12use crossbeam_channel::{self, Receiver, Sender};
13use rustc_hash::{FxHashMap, FxHashSet};
14use strum::VariantArray;
15
16use crate::dom::bindings::cell::DomRefCell;
17use crate::dom::worker::TrustedWorkerAddress;
18use crate::script_runtime::ScriptThreadEventCategory;
19use crate::task::TaskBox;
20use crate::task_source::TaskSourceName;
21
22pub(crate) type QueuedTask = (
23 Option<TrustedWorkerAddress>,
24 ScriptThreadEventCategory,
25 Box<dyn TaskBox>,
26 Option<PipelineId>,
27 TaskSourceName,
28);
29
30pub(crate) trait QueuedTaskConversion {
32 fn task_source_name(&self) -> Option<&TaskSourceName>;
33 fn pipeline_id(&self) -> Option<PipelineId>;
34 fn into_queued_task(self) -> Option<QueuedTask>;
35 fn from_queued_task(queued_task: QueuedTask) -> Self;
36 fn inactive_msg() -> Self;
37 fn wake_up_msg() -> Self;
38 fn is_wake_up(&self) -> bool;
39}
40
41pub(crate) struct TaskQueue<T> {
42 port: Receiver<T>,
44 wake_up_sender: Sender<T>,
46 msg_queue: DomRefCell<VecDeque<T>>,
48 taken_task_counter: Cell<u64>,
50 throttled: DomRefCell<FxHashMap<TaskSourceName, VecDeque<QueuedTask>>>,
52 inactive: DomRefCell<FxHashMap<PipelineId, VecDeque<QueuedTask>>>,
54}
55
56impl<T: QueuedTaskConversion> TaskQueue<T> {
57 pub(crate) fn new(port: Receiver<T>, wake_up_sender: Sender<T>) -> TaskQueue<T> {
58 TaskQueue {
59 port,
60 wake_up_sender,
61 msg_queue: DomRefCell::new(VecDeque::new()),
62 taken_task_counter: Default::default(),
63 throttled: Default::default(),
64 inactive: Default::default(),
65 }
66 }
67
68 fn release_tasks_for_fully_active_documents(
71 &self,
72 fully_active: &FxHashSet<PipelineId>,
73 ) -> Vec<T> {
74 self.inactive
75 .borrow_mut()
76 .iter_mut()
77 .filter(|(pipeline_id, _)| fully_active.contains(pipeline_id))
78 .flat_map(|(_, inactive_queue)| {
79 inactive_queue
80 .drain(0..)
81 .map(|queued_task| T::from_queued_task(queued_task))
82 })
83 .collect()
84 }
85
86 fn store_task_for_inactive_pipeline(&self, msg: T, pipeline_id: &PipelineId) {
89 let mut inactive = self.inactive.borrow_mut();
90 let inactive_queue = inactive.entry(*pipeline_id).or_default();
91 inactive_queue.push_back(
92 msg.into_queued_task()
93 .expect("Incoming messages should always be convertible into queued tasks"),
94 );
95 let mut msg_queue = self.msg_queue.borrow_mut();
96 if msg_queue.is_empty() {
97 msg_queue.push_back(T::inactive_msg());
102 }
103 }
104
105 fn process_incoming_tasks(&self, first_msg: T, fully_active: &FxHashSet<PipelineId>) {
108 let mut incoming = self.release_tasks_for_fully_active_documents(fully_active);
110
111 if !first_msg.is_wake_up() {
113 incoming.push(first_msg);
114 }
115
116 while let Ok(msg) = self.port.try_recv() {
118 if !msg.is_wake_up() {
119 incoming.push(msg);
120 }
121 }
122
123 let mut to_be_throttled = Vec::new();
126 let mut index = 0;
127 while index != incoming.len() {
128 index += 1; let task_source = match incoming[index - 1].task_source_name() {
131 Some(task_source) => task_source,
132 None => continue,
133 };
134
135 match task_source {
136 TaskSourceName::PerformanceTimeline => {
137 to_be_throttled.push(incoming.remove(index - 1));
138 index -= 1; },
140 _ => {
141 self.taken_task_counter
143 .set(self.taken_task_counter.get() + 1);
144 },
145 }
146 }
147
148 for msg in incoming {
149 if let Some(TaskSourceName::Rendering) = msg.task_source_name() {
152 self.msg_queue.borrow_mut().push_back(msg);
153 continue;
154 }
155 if let Some(pipeline_id) = msg.pipeline_id() {
156 if !fully_active.contains(&pipeline_id) {
157 self.store_task_for_inactive_pipeline(msg, &pipeline_id);
158 continue;
159 }
160 }
161 self.msg_queue.borrow_mut().push_back(msg);
163 }
164
165 for msg in to_be_throttled {
166 let (worker, category, boxed, pipeline_id, task_source) = match msg.into_queued_task() {
168 Some(queued_task) => queued_task,
169 None => unreachable!(
170 "A message to be throttled should always be convertible into a queued task"
171 ),
172 };
173 let mut throttled_tasks = self.throttled.borrow_mut();
174 throttled_tasks.entry(task_source).or_default().push_back((
175 worker,
176 category,
177 boxed,
178 pipeline_id,
179 task_source,
180 ));
181 }
182 }
183
184 pub(crate) fn select(&self) -> &crossbeam_channel::Receiver<T> {
187 self.taken_task_counter.set(0);
189 &self.port
192 }
193
194 pub(crate) fn recv(&self) -> Result<T, ()> {
196 self.msg_queue.borrow_mut().pop_front().ok_or(())
197 }
198
199 pub(crate) fn take_tasks_and_recv(
201 &self,
202 fully_active: &FxHashSet<PipelineId>,
203 ) -> Result<T, ()> {
204 self.take_tasks(T::wake_up_msg(), fully_active);
205 self.recv()
206 }
207
208 pub(crate) fn take_tasks(&self, first_msg: T, fully_active: &FxHashSet<PipelineId>) {
211 const PER_ITERATION_MAX: u64 = 5;
213 self.process_incoming_tasks(first_msg, fully_active);
215 let mut throttled = self.throttled.borrow_mut();
216 let mut throttled_length: usize = throttled.values().map(|queue| queue.len()).sum();
217 let mut task_source_cycler = TaskSourceName::VARIANTS.iter().cycle();
218 loop {
221 let max_reached = self.taken_task_counter.get() > PER_ITERATION_MAX;
222 let none_left = throttled_length == 0;
223 match (max_reached, none_left) {
224 (_, true) => break,
225 (true, false) => {
226 let _ = self.wake_up_sender.send(T::wake_up_msg());
230 break;
231 },
232 (false, false) => {
233 let task_source = task_source_cycler.next().unwrap();
235 let throttled_queue = match throttled.get_mut(task_source) {
236 Some(queue) => queue,
237 None => continue,
238 };
239 let queued_task = match throttled_queue.pop_front() {
240 Some(queued_task) => queued_task,
241 None => continue,
242 };
243 let msg = T::from_queued_task(queued_task);
244
245 if let Some(pipeline_id) = msg.pipeline_id() {
247 if !fully_active.contains(&pipeline_id) {
248 self.store_task_for_inactive_pipeline(msg, &pipeline_id);
249 throttled_length -= 1;
253 continue;
254 }
255 }
256
257 self.msg_queue.borrow_mut().push_back(msg);
259 self.taken_task_counter
260 .set(self.taken_task_counter.get() + 1);
261 throttled_length -= 1;
262 },
263 }
264 }
265 }
266}