// script/task_queue.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5//! Machinery for [task-queue](https://html.spec.whatwg.org/multipage/#task-queue).
6
7use std::cell::Cell;
8use std::collections::VecDeque;
9use std::default::Default;
10
11use base::id::PipelineId;
12use crossbeam_channel::{self, Receiver, Sender};
13use rustc_hash::{FxHashMap, FxHashSet};
14use strum::VariantArray;
15
16use crate::dom::bindings::cell::DomRefCell;
17use crate::dom::worker::TrustedWorkerAddress;
18use crate::script_runtime::ScriptThreadEventCategory;
19use crate::task::TaskBox;
20use crate::task_source::TaskSourceName;
21
/// A task queued for later delivery, decomposed into the parts the
/// [`TaskQueue`] needs in order to route it: the worker address the task is
/// associated with (if any), the script-thread event category, the boxed task
/// itself, the pipeline it targets (if any), and the task source it came from.
pub(crate) type QueuedTask = (
    Option<TrustedWorkerAddress>,
    ScriptThreadEventCategory,
    Box<dyn TaskBox>,
    Option<PipelineId>,
    TaskSourceName,
);
29
30/// Defining the operations used to convert from a msg T to a QueuedTask.
/// Defining the operations used to convert from a msg T to a QueuedTask.
pub(crate) trait QueuedTaskConversion {
    /// The task source this message belongs to, or `None` for messages that
    /// do not wrap a task (those are neither throttled nor counted as "business").
    fn task_source_name(&self) -> Option<&TaskSourceName>;
    /// The pipeline this message targets, if any; used to hold back tasks
    /// for documents that are not fully-active.
    fn pipeline_id(&self) -> Option<PipelineId>;
    /// Unwrap the message into a `QueuedTask` for storage,
    /// or `None` if it is not convertible.
    fn into_queued_task(self) -> Option<QueuedTask>;
    /// Re-wrap a previously stored `QueuedTask` as a message.
    fn from_queued_task(queued_task: QueuedTask) -> Self;
    /// A placeholder message enqueued when a task was held back for an
    /// inactive pipeline, so the event-loop iteration is not a spurious wake-up.
    fn inactive_msg() -> Self;
    /// A message whose only purpose is to make a `select` on the port return;
    /// it is filtered out of the incoming tasks (see `is_wake_up`).
    fn wake_up_msg() -> Self;
    /// Whether this message is a `wake_up_msg`.
    fn is_wake_up(&self) -> bool;
}
40
/// A task queue that drains a crossbeam port into a local message queue,
/// throttling low-priority task sources and holding back tasks for
/// documents that are not fully-active.
pub(crate) struct TaskQueue<T> {
    /// The original port on which the task-sources send tasks as messages.
    port: Receiver<T>,
    /// A sender to ensure the port doesn't block on select while there are throttled tasks.
    wake_up_sender: Sender<T>,
    /// A queue from which the event-loop can drain tasks.
    msg_queue: DomRefCell<VecDeque<T>>,
    /// A "business" (busyness) counter: how many tasks have been taken in the
    /// current iteration of the event-loop. Reset to zero by `select` at the
    /// start of each iteration.
    taken_task_counter: Cell<u64>,
    /// Tasks that will be throttled for as long as we are "busy",
    /// grouped per task source.
    throttled: DomRefCell<FxHashMap<TaskSourceName, VecDeque<QueuedTask>>>,
    /// Tasks for not fully-active documents, keyed by pipeline and released
    /// once that pipeline's document becomes fully-active again.
    inactive: DomRefCell<FxHashMap<PipelineId, VecDeque<QueuedTask>>>,
}
55
impl<T: QueuedTaskConversion> TaskQueue<T> {
    /// Create a new `TaskQueue` draining `port`.
    ///
    /// `wake_up_sender` should be a sender whose messages arrive on `port`,
    /// since `take_tasks` uses it to make the next `select` on the port
    /// return while throttled tasks are still pending.
    pub(crate) fn new(port: Receiver<T>, wake_up_sender: Sender<T>) -> TaskQueue<T> {
        TaskQueue {
            port,
            wake_up_sender,
            msg_queue: DomRefCell::new(VecDeque::new()),
            taken_task_counter: Default::default(),
            throttled: Default::default(),
            inactive: Default::default(),
        }
    }

    /// Release previously held-back tasks for documents that are now fully-active.
    /// <https://html.spec.whatwg.org/multipage/#event-loop-processing-model:fully-active>
    ///
    /// Returns the released tasks re-wrapped as messages, preserving each
    /// pipeline's internal ordering. Note that drained per-pipeline queues are
    /// left (empty) in the `inactive` map rather than removed.
    fn release_tasks_for_fully_active_documents(
        &self,
        fully_active: &FxHashSet<PipelineId>,
    ) -> Vec<T> {
        self.inactive
            .borrow_mut()
            .iter_mut()
            .filter(|(pipeline_id, _)| fully_active.contains(pipeline_id))
            .flat_map(|(_, inactive_queue)| {
                inactive_queue
                    .drain(0..)
                    .map(|queued_task| T::from_queued_task(queued_task))
            })
            .collect()
    }

    /// Hold back tasks for currently not fully-active documents.
    /// <https://html.spec.whatwg.org/multipage/#event-loop-processing-model:fully-active>
    ///
    /// Panics if `msg` is not convertible into a `QueuedTask` — callers only
    /// pass messages that wrap tasks.
    fn store_task_for_inactive_pipeline(&self, msg: T, pipeline_id: &PipelineId) {
        let mut inactive = self.inactive.borrow_mut();
        let inactive_queue = inactive.entry(*pipeline_id).or_default();
        inactive_queue.push_back(
            msg.into_queued_task()
                .expect("Incoming messages should always be convertible into queued tasks"),
        );
        let mut msg_queue = self.msg_queue.borrow_mut();
        if msg_queue.is_empty() {
            // Ensure there is at least one message.
            // Otherwise if the just stored inactive message
            // was the first and last of this iteration,
            // it will result in a spurious wake-up of the event-loop.
            msg_queue.push_back(T::inactive_msg());
        }
    }

    /// Process incoming tasks, immediately sending priority ones downstream,
    /// and categorizing potential throttles.
    ///
    /// `first_msg` is the message that made the caller's `select` return;
    /// any further pending messages are drained from the port without blocking.
    fn process_incoming_tasks(&self, first_msg: T, fully_active: &FxHashSet<PipelineId>) {
        // 1. Make any previously stored task from now fully-active document available.
        let mut incoming = self.release_tasks_for_fully_active_documents(fully_active);

        // 2. Process the first message(artifact of the fact that select always returns a message).
        if !first_msg.is_wake_up() {
            incoming.push(first_msg);
        }

        // 3. Process any other incoming message.
        while let Ok(msg) = self.port.try_recv() {
            if !msg.is_wake_up() {
                incoming.push(msg);
            }
        }

        // 4. Filter tasks from non-priority task-sources.
        // TODO: This can use `extract_if` once that is stabilized.
        let mut to_be_throttled = Vec::new();
        let mut index = 0;
        while index != incoming.len() {
            index += 1; // By default we go to the next index of the vector.

            // Messages without a task source (e.g. control messages) are
            // neither throttled nor counted as "business".
            let task_source = match incoming[index - 1].task_source_name() {
                Some(task_source) => task_source,
                None => continue,
            };

            match task_source {
                TaskSourceName::PerformanceTimeline => {
                    to_be_throttled.push(incoming.remove(index - 1));
                    index -= 1; // We've removed an element, so the next has the same index.
                },
                _ => {
                    // A task that will not be throttled, start counting "business"
                    // Note: counted here even if the task is subsequently stored
                    // for an inactive pipeline below instead of being run.
                    self.taken_task_counter
                        .set(self.taken_task_counter.get() + 1);
                },
            }
        }

        for msg in incoming {
            // Always run "update the rendering" tasks,
            // TODO: fix "fully active" concept for iframes.
            if let Some(TaskSourceName::Rendering) = msg.task_source_name() {
                self.msg_queue.borrow_mut().push_back(msg);
                continue;
            }
            // Hold back tasks targeting a pipeline that is not fully-active;
            // tasks without a pipeline are never held back.
            if let Some(pipeline_id) = msg.pipeline_id() {
                if !fully_active.contains(&pipeline_id) {
                    self.store_task_for_inactive_pipeline(msg, &pipeline_id);
                    continue;
                }
            }
            // Immediately send non-throttled tasks for processing.
            self.msg_queue.borrow_mut().push_back(msg);
        }

        for msg in to_be_throttled {
            // Categorize tasks per task queue.
            let (worker, category, boxed, pipeline_id, task_source) = match msg.into_queued_task() {
                Some(queued_task) => queued_task,
                None => unreachable!(
                    "A message to be throttled should always be convertible into a queued task"
                ),
            };
            let mut throttled_tasks = self.throttled.borrow_mut();
            throttled_tasks.entry(task_source).or_default().push_back((
                worker,
                category,
                boxed,
                pipeline_id,
                task_source,
            ));
        }
    }

    /// Reset the queue for a new iteration of the event-loop,
    /// returning the port about whose readiness we want to be notified.
    pub(crate) fn select(&self) -> &crossbeam_channel::Receiver<T> {
        // This is a new iteration of the event-loop, so we reset the "business" counter.
        self.taken_task_counter.set(0);
        // We want to be notified when the script-port is ready to receive.
        // Hence that's the one we need to include in the select.
        &self.port
    }

    /// Take a message from the front of the queue, without waiting if empty.
    ///
    /// Returns `Err(())` when the local message queue is empty; this does NOT
    /// poll the underlying port (see `take_tasks` for that).
    pub(crate) fn recv(&self) -> Result<T, ()> {
        self.msg_queue.borrow_mut().pop_front().ok_or(())
    }

    /// Take all tasks again and then run `recv()`.
    ///
    /// Uses a wake-up message as the "first message", which `process_incoming_tasks`
    /// filters out, so only genuinely pending tasks are taken.
    pub(crate) fn take_tasks_and_recv(
        &self,
        fully_active: &FxHashSet<PipelineId>,
    ) -> Result<T, ()> {
        self.take_tasks(T::wake_up_msg(), fully_active);
        self.recv()
    }

    /// Drain the queue for the current iteration of the event-loop.
    /// Holding-back throttles above a given high-water mark.
    pub(crate) fn take_tasks(&self, first_msg: T, fully_active: &FxHashSet<PipelineId>) {
        // High-watermark: once reached, throttled tasks will be held-back.
        const PER_ITERATION_MAX: u64 = 5;
        // Always first check for new tasks, but don't reset 'taken_task_counter'.
        self.process_incoming_tasks(first_msg, fully_active);
        let mut throttled = self.throttled.borrow_mut();
        // Total number of throttled tasks still stored, across all task sources.
        let mut throttled_length: usize = throttled.values().map(|queue| queue.len()).sum();
        let mut task_source_cycler = TaskSourceName::VARIANTS.iter().cycle();
        // "being busy", is defined as having more than x tasks for this loop's iteration.
        // As long as we're not busy, and there are throttled tasks left:
        loop {
            let max_reached = self.taken_task_counter.get() > PER_ITERATION_MAX;
            let none_left = throttled_length == 0;
            match (max_reached, none_left) {
                (_, true) => break,
                (true, false) => {
                    // We have reached the high-watermark for this iteration of the event-loop,
                    // yet also have throttled messages left in the queue.
                    // Ensure the select wakes up in the next iteration of the event-loop
                    let _ = self.wake_up_sender.send(T::wake_up_msg());
                    break;
                },
                (false, false) => {
                    // Cycle through non-priority task sources, taking one throttled task from each.
                    // `continue` on a missing or empty queue is safe:
                    // `throttled_length > 0` guarantees some variant's queue is
                    // non-empty, so the cycle always makes progress.
                    let task_source = task_source_cycler.next().unwrap();
                    let throttled_queue = match throttled.get_mut(task_source) {
                        Some(queue) => queue,
                        None => continue,
                    };
                    let queued_task = match throttled_queue.pop_front() {
                        Some(queued_task) => queued_task,
                        None => continue,
                    };
                    let msg = T::from_queued_task(queued_task);

                    // Hold back tasks for currently inactive documents.
                    if let Some(pipeline_id) = msg.pipeline_id() {
                        if !fully_active.contains(&pipeline_id) {
                            self.store_task_for_inactive_pipeline(msg, &pipeline_id);
                            // Reduce the length of throttles,
                            // but don't add the task to "msg_queue",
                            // and neither increment "taken_task_counter".
                            throttled_length -= 1;
                            continue;
                        }
                    }

                    // Make the task available for the event-loop to handle as a message.
                    self.msg_queue.borrow_mut().push_back(msg);
                    self.taken_task_counter
                        .set(self.taken_task_counter.get() + 1);
                    throttled_length -= 1;
                },
            }
        }
    }
}