1use std::cell::Cell;
8use std::collections::{HashMap, HashSet, VecDeque};
9use std::default::Default;
10
11use base::id::PipelineId;
12use crossbeam_channel::{self, Receiver, Sender};
13use strum::VariantArray;
14
15use crate::dom::bindings::cell::DomRefCell;
16use crate::dom::worker::TrustedWorkerAddress;
17use crate::script_runtime::ScriptThreadEventCategory;
18use crate::task::TaskBox;
19use crate::task_source::TaskSourceName;
20
21pub(crate) type QueuedTask = (
22 Option<TrustedWorkerAddress>,
23 ScriptThreadEventCategory,
24 Box<dyn TaskBox>,
25 Option<PipelineId>,
26 TaskSourceName,
27);
28
29pub(crate) trait QueuedTaskConversion {
31 fn task_source_name(&self) -> Option<&TaskSourceName>;
32 fn pipeline_id(&self) -> Option<PipelineId>;
33 fn into_queued_task(self) -> Option<QueuedTask>;
34 fn from_queued_task(queued_task: QueuedTask) -> Self;
35 fn inactive_msg() -> Self;
36 fn wake_up_msg() -> Self;
37 fn is_wake_up(&self) -> bool;
38}
39
40pub(crate) struct TaskQueue<T> {
41 port: Receiver<T>,
43 wake_up_sender: Sender<T>,
45 msg_queue: DomRefCell<VecDeque<T>>,
47 taken_task_counter: Cell<u64>,
49 throttled: DomRefCell<HashMap<TaskSourceName, VecDeque<QueuedTask>>>,
51 inactive: DomRefCell<HashMap<PipelineId, VecDeque<QueuedTask>>>,
53}
54
55impl<T: QueuedTaskConversion> TaskQueue<T> {
56 pub(crate) fn new(port: Receiver<T>, wake_up_sender: Sender<T>) -> TaskQueue<T> {
57 TaskQueue {
58 port,
59 wake_up_sender,
60 msg_queue: DomRefCell::new(VecDeque::new()),
61 taken_task_counter: Default::default(),
62 throttled: Default::default(),
63 inactive: Default::default(),
64 }
65 }
66
67 fn release_tasks_for_fully_active_documents(
70 &self,
71 fully_active: &HashSet<PipelineId>,
72 ) -> Vec<T> {
73 self.inactive
74 .borrow_mut()
75 .iter_mut()
76 .filter(|(pipeline_id, _)| fully_active.contains(pipeline_id))
77 .flat_map(|(_, inactive_queue)| {
78 inactive_queue
79 .drain(0..)
80 .map(|queued_task| T::from_queued_task(queued_task))
81 })
82 .collect()
83 }
84
85 fn store_task_for_inactive_pipeline(&self, msg: T, pipeline_id: &PipelineId) {
88 let mut inactive = self.inactive.borrow_mut();
89 let inactive_queue = inactive.entry(*pipeline_id).or_default();
90 inactive_queue.push_back(
91 msg.into_queued_task()
92 .expect("Incoming messages should always be convertible into queued tasks"),
93 );
94 let mut msg_queue = self.msg_queue.borrow_mut();
95 if msg_queue.is_empty() {
96 msg_queue.push_back(T::inactive_msg());
101 }
102 }
103
104 fn process_incoming_tasks(&self, first_msg: T, fully_active: &HashSet<PipelineId>) {
107 let mut incoming = self.release_tasks_for_fully_active_documents(fully_active);
109
110 if !first_msg.is_wake_up() {
112 incoming.push(first_msg);
113 }
114
115 while let Ok(msg) = self.port.try_recv() {
117 if !msg.is_wake_up() {
118 incoming.push(msg);
119 }
120 }
121
122 let mut to_be_throttled = Vec::new();
125 let mut index = 0;
126 while index != incoming.len() {
127 index += 1; let task_source = match incoming[index - 1].task_source_name() {
130 Some(task_source) => task_source,
131 None => continue,
132 };
133
134 match task_source {
135 TaskSourceName::PerformanceTimeline => {
136 to_be_throttled.push(incoming.remove(index - 1));
137 index -= 1; },
139 _ => {
140 self.taken_task_counter
142 .set(self.taken_task_counter.get() + 1);
143 },
144 }
145 }
146
147 for msg in incoming {
148 if let Some(TaskSourceName::Rendering) = msg.task_source_name() {
151 self.msg_queue.borrow_mut().push_back(msg);
152 continue;
153 }
154 if let Some(pipeline_id) = msg.pipeline_id() {
155 if !fully_active.contains(&pipeline_id) {
156 self.store_task_for_inactive_pipeline(msg, &pipeline_id);
157 continue;
158 }
159 }
160 self.msg_queue.borrow_mut().push_back(msg);
162 }
163
164 for msg in to_be_throttled {
165 let (worker, category, boxed, pipeline_id, task_source) = match msg.into_queued_task() {
167 Some(queued_task) => queued_task,
168 None => unreachable!(
169 "A message to be throttled should always be convertible into a queued task"
170 ),
171 };
172 let mut throttled_tasks = self.throttled.borrow_mut();
173 throttled_tasks.entry(task_source).or_default().push_back((
174 worker,
175 category,
176 boxed,
177 pipeline_id,
178 task_source,
179 ));
180 }
181 }
182
183 pub(crate) fn select(&self) -> &crossbeam_channel::Receiver<T> {
186 self.taken_task_counter.set(0);
188 &self.port
191 }
192
193 pub(crate) fn recv(&self) -> Result<T, ()> {
195 self.msg_queue.borrow_mut().pop_front().ok_or(())
196 }
197
198 pub(crate) fn take_tasks_and_recv(&self, fully_active: &HashSet<PipelineId>) -> Result<T, ()> {
200 self.take_tasks(T::wake_up_msg(), fully_active);
201 self.recv()
202 }
203
204 pub(crate) fn take_tasks(&self, first_msg: T, fully_active: &HashSet<PipelineId>) {
207 const PER_ITERATION_MAX: u64 = 5;
209 self.process_incoming_tasks(first_msg, fully_active);
211 let mut throttled = self.throttled.borrow_mut();
212 let mut throttled_length: usize = throttled.values().map(|queue| queue.len()).sum();
213 let mut task_source_cycler = TaskSourceName::VARIANTS.iter().cycle();
214 loop {
217 let max_reached = self.taken_task_counter.get() > PER_ITERATION_MAX;
218 let none_left = throttled_length == 0;
219 match (max_reached, none_left) {
220 (_, true) => break,
221 (true, false) => {
222 let _ = self.wake_up_sender.send(T::wake_up_msg());
226 break;
227 },
228 (false, false) => {
229 let task_source = task_source_cycler.next().unwrap();
231 let throttled_queue = match throttled.get_mut(task_source) {
232 Some(queue) => queue,
233 None => continue,
234 };
235 let queued_task = match throttled_queue.pop_front() {
236 Some(queued_task) => queued_task,
237 None => continue,
238 };
239 let msg = T::from_queued_task(queued_task);
240
241 if let Some(pipeline_id) = msg.pipeline_id() {
243 if !fully_active.contains(&pipeline_id) {
244 self.store_task_for_inactive_pipeline(msg, &pipeline_id);
245 throttled_length -= 1;
249 continue;
250 }
251 }
252
253 self.msg_queue.borrow_mut().push_back(msg);
255 self.taken_task_counter
256 .set(self.taken_task_counter.get() + 1);
257 throttled_length -= 1;
258 },
259 }
260 }
261 }
262}