1use std::cell::Cell;
8use std::collections::VecDeque;
9use std::default::Default;
10
11use base::id::PipelineId;
12use crossbeam_channel::{self, Receiver, Sender};
13use rustc_hash::{FxHashMap, FxHashSet};
14use strum::VariantArray;
15
16use crate::dom::bindings::cell::DomRefCell;
17use crate::dom::worker::TrustedWorkerAddress;
18use crate::script_runtime::ScriptThreadEventCategory;
19use crate::task::TaskBox;
20use crate::task_source::TaskSourceName;
21
/// A task bundled with the metadata the queue needs to park it and hand it out again later.
///
/// `TaskQueue` stores values of this type while a task is throttled or held back, and turns
/// them back into messages through `QueuedTaskConversion::from_queued_task`.
#[derive(MallocSizeOf)]
pub(crate) struct QueuedTask {
    /// NOTE(review): presumably the worker this task is destined for, `None` when it does not
    /// target a worker — not readable from this file, confirm against the conversion impls.
    pub(crate) worker: Option<TrustedWorkerAddress>,
    /// Event category attached to the task; this file only carries it along.
    pub(crate) event_category: ScriptThreadEventCategory,
    /// The boxed task itself. Excluded from memory measurement (see the attribute's reason).
    #[ignore_malloc_size_of = "TaskBox is difficult"]
    pub(crate) task: Box<dyn TaskBox>,
    /// Pipeline associated with the task, if any.
    pub(crate) pipeline_id: Option<PipelineId>,
    /// Task source the task was queued on; used as the key of the per-source throttled queues.
    pub(crate) task_source: TaskSourceName,
}
31
/// Conversions and queries a message type must provide so that `TaskQueue` can
/// classify, park and re-emit it.
pub(crate) trait QueuedTaskConversion {
    /// Task source of the message, or `None` when it has none. Messages without a task
    /// source are never throttled and are not counted by the queue.
    fn task_source_name(&self) -> Option<&TaskSourceName>;
    /// Pipeline the message belongs to, or `None`. Messages without a pipeline are never
    /// held back as inactive.
    fn pipeline_id(&self) -> Option<PipelineId>;
    /// Converts the message into a `QueuedTask`; `None` when the message cannot be
    /// represented as one. The queue treats `None` as a bug for any message it needs to
    /// park (it panics).
    fn into_queued_task(self) -> Option<QueuedTask>;
    /// Rebuilds a message from a previously parked `QueuedTask`.
    fn from_queued_task(queued_task: QueuedTask) -> Self;
    /// Placeholder message the queue enqueues when it holds a task back while its
    /// message queue is empty.
    fn inactive_msg() -> Self;
    /// Message the queue sends to itself to request another iteration.
    fn wake_up_msg() -> Self;
    /// Whether this message is a wake-up message; such messages are dropped on intake.
    fn is_wake_up(&self) -> bool;
}
42
/// A queue of messages of type `T` that throttles some task sources and holds back
/// tasks whose pipeline is not currently in the caller-supplied "fully active" set.
#[derive(MallocSizeOf)]
pub(crate) struct TaskQueue<T> {
    /// Receiver the queue drains incoming messages from; also what `select` returns.
    port: Receiver<T>,
    /// Sender used to post a wake-up message when throttled tasks are left over.
    /// NOTE(review): presumably the sending half of the channel behind `port` — confirm
    /// at the construction site.
    wake_up_sender: Sender<T>,
    /// Messages ready to be returned by `recv`, in FIFO order.
    msg_queue: DomRefCell<VecDeque<T>>,
    /// Number of tasks taken since the last `select`; compared against the
    /// per-iteration maximum in `take_tasks`.
    taken_task_counter: Cell<u64>,
    /// Throttled tasks waiting to be released, one FIFO queue per task source.
    throttled: DomRefCell<FxHashMap<TaskSourceName, VecDeque<QueuedTask>>>,
    /// Tasks held back because their pipeline was not fully active, one FIFO queue
    /// per pipeline.
    inactive: DomRefCell<FxHashMap<PipelineId, VecDeque<QueuedTask>>>,
}
58
59impl<T: QueuedTaskConversion> TaskQueue<T> {
60 pub(crate) fn new(port: Receiver<T>, wake_up_sender: Sender<T>) -> TaskQueue<T> {
61 TaskQueue {
62 port,
63 wake_up_sender,
64 msg_queue: DomRefCell::new(VecDeque::new()),
65 taken_task_counter: Default::default(),
66 throttled: Default::default(),
67 inactive: Default::default(),
68 }
69 }
70
71 fn release_tasks_for_fully_active_documents(
74 &self,
75 fully_active: &FxHashSet<PipelineId>,
76 ) -> Vec<T> {
77 self.inactive
78 .borrow_mut()
79 .iter_mut()
80 .filter(|(pipeline_id, _)| fully_active.contains(pipeline_id))
81 .flat_map(|(_, inactive_queue)| {
82 inactive_queue
83 .drain(0..)
84 .map(|queued_task| T::from_queued_task(queued_task))
85 })
86 .collect()
87 }
88
89 fn store_task_for_inactive_pipeline(&self, msg: T, pipeline_id: &PipelineId) {
92 let mut inactive = self.inactive.borrow_mut();
93 let inactive_queue = inactive.entry(*pipeline_id).or_default();
94 inactive_queue.push_back(
95 msg.into_queued_task()
96 .expect("Incoming messages should always be convertible into queued tasks"),
97 );
98 let mut msg_queue = self.msg_queue.borrow_mut();
99 if msg_queue.is_empty() {
100 msg_queue.push_back(T::inactive_msg());
105 }
106 }
107
108 fn process_incoming_tasks(&self, first_msg: T, fully_active: &FxHashSet<PipelineId>) {
111 let mut incoming = self.release_tasks_for_fully_active_documents(fully_active);
113
114 if !first_msg.is_wake_up() {
116 incoming.push(first_msg);
117 }
118
119 while let Ok(msg) = self.port.try_recv() {
121 if !msg.is_wake_up() {
122 incoming.push(msg);
123 }
124 }
125
126 let mut to_be_throttled = Vec::new();
129 let mut index = 0;
130 while index != incoming.len() {
131 index += 1; let task_source = match incoming[index - 1].task_source_name() {
134 Some(task_source) => task_source,
135 None => continue,
136 };
137
138 match task_source {
139 TaskSourceName::PerformanceTimeline => {
140 to_be_throttled.push(incoming.remove(index - 1));
141 index -= 1; },
143 _ => {
144 self.taken_task_counter
146 .set(self.taken_task_counter.get() + 1);
147 },
148 }
149 }
150
151 for msg in incoming {
152 if let Some(TaskSourceName::Rendering) = msg.task_source_name() {
155 self.msg_queue.borrow_mut().push_back(msg);
156 continue;
157 }
158 if let Some(pipeline_id) = msg.pipeline_id() {
159 if !fully_active.contains(&pipeline_id) {
160 self.store_task_for_inactive_pipeline(msg, &pipeline_id);
161 continue;
162 }
163 }
164 self.msg_queue.borrow_mut().push_back(msg);
166 }
167
168 for msg in to_be_throttled {
169 let Some(queued_task) = msg.into_queued_task() else {
171 unreachable!(
172 "A message to be throttled should always be convertible into a queued task"
173 );
174 };
175 let mut throttled_tasks = self.throttled.borrow_mut();
176 throttled_tasks
177 .entry(queued_task.task_source)
178 .or_default()
179 .push_back(queued_task);
180 }
181 }
182
183 pub(crate) fn select(&self) -> &crossbeam_channel::Receiver<T> {
186 self.taken_task_counter.set(0);
188 &self.port
191 }
192
193 pub(crate) fn recv(&self) -> Result<T, ()> {
195 self.msg_queue.borrow_mut().pop_front().ok_or(())
196 }
197
198 pub(crate) fn take_tasks_and_recv(
200 &self,
201 fully_active: &FxHashSet<PipelineId>,
202 ) -> Result<T, ()> {
203 self.take_tasks(T::wake_up_msg(), fully_active);
204 self.recv()
205 }
206
207 pub(crate) fn take_tasks(&self, first_msg: T, fully_active: &FxHashSet<PipelineId>) {
210 const PER_ITERATION_MAX: u64 = 5;
212 self.process_incoming_tasks(first_msg, fully_active);
214 let mut throttled = self.throttled.borrow_mut();
215 let mut throttled_length: usize = throttled.values().map(|queue| queue.len()).sum();
216 let mut task_source_cycler = TaskSourceName::VARIANTS.iter().cycle();
217 loop {
220 let max_reached = self.taken_task_counter.get() > PER_ITERATION_MAX;
221 let none_left = throttled_length == 0;
222 match (max_reached, none_left) {
223 (_, true) => break,
224 (true, false) => {
225 let _ = self.wake_up_sender.send(T::wake_up_msg());
229 break;
230 },
231 (false, false) => {
232 let task_source = task_source_cycler.next().unwrap();
234 let throttled_queue = match throttled.get_mut(task_source) {
235 Some(queue) => queue,
236 None => continue,
237 };
238 let queued_task = match throttled_queue.pop_front() {
239 Some(queued_task) => queued_task,
240 None => continue,
241 };
242 let msg = T::from_queued_task(queued_task);
243
244 if let Some(pipeline_id) = msg.pipeline_id() {
246 if !fully_active.contains(&pipeline_id) {
247 self.store_task_for_inactive_pipeline(msg, &pipeline_id);
248 throttled_length -= 1;
252 continue;
253 }
254 }
255
256 self.msg_queue.borrow_mut().push_back(msg);
258 self.taken_task_counter
259 .set(self.taken_task_counter.get() + 1);
260 throttled_length -= 1;
261 },
262 }
263 }
264 }
265}