script/
task_queue.rs

/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */

//! Machinery for [task-queue](https://html.spec.whatwg.org/multipage/#task-queue).

use std::cell::Cell;
use std::collections::VecDeque;
use std::default::Default;

use base::id::PipelineId;
use crossbeam_channel::{self, Receiver, Sender};
use rustc_hash::{FxHashMap, FxHashSet};
use strum::VariantArray;

use crate::dom::bindings::cell::DomRefCell;
use crate::dom::worker::TrustedWorkerAddress;
use crate::script_runtime::ScriptThreadEventCategory;
use crate::task::TaskBox;
use crate::task_source::TaskSourceName;

/// A task taken off the incoming port and stored (either throttled, or held
/// back for a not fully-active document), together with the metadata needed
/// to convert it back into a message via `QueuedTaskConversion`.
#[derive(MallocSizeOf)]
pub(crate) struct QueuedTask {
    /// The worker this task is associated with, if any.
    pub(crate) worker: Option<TrustedWorkerAddress>,
    /// The script-thread event category attributed to this task.
    pub(crate) event_category: ScriptThreadEventCategory,
    /// The boxed task itself, run later by the event-loop.
    #[ignore_malloc_size_of = "TaskBox is difficult"]
    pub(crate) task: Box<dyn TaskBox>,
    /// The pipeline (document) this task belongs to, if any; used to decide
    /// whether the task must be held back for a not fully-active document.
    pub(crate) pipeline_id: Option<PipelineId>,
    /// The task source this task came from; used as the key when the task
    /// is stored in the per-source throttling queues.
    pub(crate) task_source: TaskSourceName,
}
/// Defining the operations used to convert from a msg T to a QueuedTask.
pub(crate) trait QueuedTaskConversion {
    /// The task source this message came from, if it carries one.
    fn task_source_name(&self) -> Option<&TaskSourceName>;
    /// The pipeline this message is associated with, if any.
    fn pipeline_id(&self) -> Option<PipelineId>;
    /// Convert this message into a `QueuedTask`, consuming it.
    /// Returns `None` when the message cannot be represented as a task.
    fn into_queued_task(self) -> Option<QueuedTask>;
    /// Rebuild a message from a previously stored `QueuedTask`.
    fn from_queued_task(queued_task: QueuedTask) -> Self;
    /// A placeholder message, enqueued so that holding back an inactive
    /// task does not result in a spurious wake-up of the event-loop.
    fn inactive_msg() -> Self;
    /// A message whose only purpose is to make `select` return on the next
    /// event-loop iteration (sent when throttled tasks were held back).
    fn wake_up_msg() -> Self;
    /// Whether this message is a wake-up message; such messages are
    /// filtered out and never queued as tasks.
    fn is_wake_up(&self) -> bool;
}
/// A task-queue layered over the script-thread's message port, adding two
/// policies on top of plain delivery: throttling of low-priority task-sources
/// once the current event-loop iteration is "busy", and holding back tasks
/// whose document is not fully-active.
#[derive(MallocSizeOf)]
pub(crate) struct TaskQueue<T> {
    /// The original port on which the task-sources send tasks as messages.
    port: Receiver<T>,
    /// A sender to ensure the port doesn't block on select while there are throttled tasks.
    wake_up_sender: Sender<T>,
    /// A queue from which the event-loop can drain tasks.
    msg_queue: DomRefCell<VecDeque<T>>,
    /// A "business" counter, reset for each iteration of the event-loop.
    /// Counts tasks taken this iteration; once past the high-water mark,
    /// remaining throttled tasks are deferred to the next iteration.
    taken_task_counter: Cell<u64>,
    /// Tasks that will be throttled for as long as we are "busy",
    /// keyed by the task-source they came from.
    throttled: DomRefCell<FxHashMap<TaskSourceName, VecDeque<QueuedTask>>>,
    /// Tasks held back because their document is not fully-active,
    /// keyed by pipeline.
    inactive: DomRefCell<FxHashMap<PipelineId, VecDeque<QueuedTask>>>,
}
59impl<T: QueuedTaskConversion> TaskQueue<T> {
60    pub(crate) fn new(port: Receiver<T>, wake_up_sender: Sender<T>) -> TaskQueue<T> {
61        TaskQueue {
62            port,
63            wake_up_sender,
64            msg_queue: DomRefCell::new(VecDeque::new()),
65            taken_task_counter: Default::default(),
66            throttled: Default::default(),
67            inactive: Default::default(),
68        }
69    }
70
71    /// Release previously held-back tasks for documents that are now fully-active.
72    /// <https://html.spec.whatwg.org/multipage/#event-loop-processing-model:fully-active>
73    fn release_tasks_for_fully_active_documents(
74        &self,
75        fully_active: &FxHashSet<PipelineId>,
76    ) -> Vec<T> {
77        self.inactive
78            .borrow_mut()
79            .iter_mut()
80            .filter(|(pipeline_id, _)| fully_active.contains(pipeline_id))
81            .flat_map(|(_, inactive_queue)| {
82                inactive_queue
83                    .drain(0..)
84                    .map(|queued_task| T::from_queued_task(queued_task))
85            })
86            .collect()
87    }
88
89    /// Hold back tasks for currently not fully-active documents.
90    /// <https://html.spec.whatwg.org/multipage/#event-loop-processing-model:fully-active>
91    fn store_task_for_inactive_pipeline(&self, msg: T, pipeline_id: &PipelineId) {
92        let mut inactive = self.inactive.borrow_mut();
93        let inactive_queue = inactive.entry(*pipeline_id).or_default();
94        inactive_queue.push_back(
95            msg.into_queued_task()
96                .expect("Incoming messages should always be convertible into queued tasks"),
97        );
98        let mut msg_queue = self.msg_queue.borrow_mut();
99        if msg_queue.is_empty() {
100            // Ensure there is at least one message.
101            // Otherwise if the just stored inactive message
102            // was the first and last of this iteration,
103            // it will result in a spurious wake-up of the event-loop.
104            msg_queue.push_back(T::inactive_msg());
105        }
106    }
107
108    /// Process incoming tasks, immediately sending priority ones downstream,
109    /// and categorizing potential throttles.
110    fn process_incoming_tasks(&self, first_msg: T, fully_active: &FxHashSet<PipelineId>) {
111        // 1. Make any previously stored task from now fully-active document available.
112        let mut incoming = self.release_tasks_for_fully_active_documents(fully_active);
113
114        // 2. Process the first message(artifact of the fact that select always returns a message).
115        if !first_msg.is_wake_up() {
116            incoming.push(first_msg);
117        }
118
119        // 3. Process any other incoming message.
120        while let Ok(msg) = self.port.try_recv() {
121            if !msg.is_wake_up() {
122                incoming.push(msg);
123            }
124        }
125
126        // 4. Filter tasks from non-priority task-sources.
127        // TODO: This can use `extract_if` once that is stabilized.
128        let mut to_be_throttled = Vec::new();
129        let mut index = 0;
130        while index != incoming.len() {
131            index += 1; // By default we go to the next index of the vector.
132
133            let task_source = match incoming[index - 1].task_source_name() {
134                Some(task_source) => task_source,
135                None => continue,
136            };
137
138            match task_source {
139                TaskSourceName::PerformanceTimeline => {
140                    to_be_throttled.push(incoming.remove(index - 1));
141                    index -= 1; // We've removed an element, so the next has the same index.
142                },
143                _ => {
144                    // A task that will not be throttled, start counting "business"
145                    self.taken_task_counter
146                        .set(self.taken_task_counter.get() + 1);
147                },
148            }
149        }
150
151        for msg in incoming {
152            // Always run "update the rendering" tasks,
153            // TODO: fix "fully active" concept for iframes.
154            if let Some(TaskSourceName::Rendering) = msg.task_source_name() {
155                self.msg_queue.borrow_mut().push_back(msg);
156                continue;
157            }
158            if let Some(pipeline_id) = msg.pipeline_id() {
159                if !fully_active.contains(&pipeline_id) {
160                    self.store_task_for_inactive_pipeline(msg, &pipeline_id);
161                    continue;
162                }
163            }
164            // Immediately send non-throttled tasks for processing.
165            self.msg_queue.borrow_mut().push_back(msg);
166        }
167
168        for msg in to_be_throttled {
169            // Categorize tasks per task queue.
170            let Some(queued_task) = msg.into_queued_task() else {
171                unreachable!(
172                    "A message to be throttled should always be convertible into a queued task"
173                );
174            };
175            let mut throttled_tasks = self.throttled.borrow_mut();
176            throttled_tasks
177                .entry(queued_task.task_source)
178                .or_default()
179                .push_back(queued_task);
180        }
181    }
182
183    /// Reset the queue for a new iteration of the event-loop,
184    /// returning the port about whose readiness we want to be notified.
185    pub(crate) fn select(&self) -> &crossbeam_channel::Receiver<T> {
186        // This is a new iteration of the event-loop, so we reset the "business" counter.
187        self.taken_task_counter.set(0);
188        // We want to be notified when the script-port is ready to receive.
189        // Hence that's the one we need to include in the select.
190        &self.port
191    }
192
193    /// Take a message from the front of the queue, without waiting if empty.
194    pub(crate) fn recv(&self) -> Result<T, ()> {
195        self.msg_queue.borrow_mut().pop_front().ok_or(())
196    }
197
198    /// Take all tasks again and then run `recv()`.
199    pub(crate) fn take_tasks_and_recv(
200        &self,
201        fully_active: &FxHashSet<PipelineId>,
202    ) -> Result<T, ()> {
203        self.take_tasks(T::wake_up_msg(), fully_active);
204        self.recv()
205    }
206
207    /// Drain the queue for the current iteration of the event-loop.
208    /// Holding-back throttles above a given high-water mark.
209    pub(crate) fn take_tasks(&self, first_msg: T, fully_active: &FxHashSet<PipelineId>) {
210        // High-watermark: once reached, throttled tasks will be held-back.
211        const PER_ITERATION_MAX: u64 = 5;
212        // Always first check for new tasks, but don't reset 'taken_task_counter'.
213        self.process_incoming_tasks(first_msg, fully_active);
214        let mut throttled = self.throttled.borrow_mut();
215        let mut throttled_length: usize = throttled.values().map(|queue| queue.len()).sum();
216        let mut task_source_cycler = TaskSourceName::VARIANTS.iter().cycle();
217        // "being busy", is defined as having more than x tasks for this loop's iteration.
218        // As long as we're not busy, and there are throttled tasks left:
219        loop {
220            let max_reached = self.taken_task_counter.get() > PER_ITERATION_MAX;
221            let none_left = throttled_length == 0;
222            match (max_reached, none_left) {
223                (_, true) => break,
224                (true, false) => {
225                    // We have reached the high-watermark for this iteration of the event-loop,
226                    // yet also have throttled messages left in the queue.
227                    // Ensure the select wakes up in the next iteration of the event-loop
228                    let _ = self.wake_up_sender.send(T::wake_up_msg());
229                    break;
230                },
231                (false, false) => {
232                    // Cycle through non-priority task sources, taking one throttled task from each.
233                    let task_source = task_source_cycler.next().unwrap();
234                    let throttled_queue = match throttled.get_mut(task_source) {
235                        Some(queue) => queue,
236                        None => continue,
237                    };
238                    let queued_task = match throttled_queue.pop_front() {
239                        Some(queued_task) => queued_task,
240                        None => continue,
241                    };
242                    let msg = T::from_queued_task(queued_task);
243
244                    // Hold back tasks for currently inactive documents.
245                    if let Some(pipeline_id) = msg.pipeline_id() {
246                        if !fully_active.contains(&pipeline_id) {
247                            self.store_task_for_inactive_pipeline(msg, &pipeline_id);
248                            // Reduce the length of throttles,
249                            // but don't add the task to "msg_queue",
250                            // and neither increment "taken_task_counter".
251                            throttled_length -= 1;
252                            continue;
253                        }
254                    }
255
256                    // Make the task available for the event-loop to handle as a message.
257                    self.msg_queue.borrow_mut().push_back(msg);
258                    self.taken_task_counter
259                        .set(self.taken_task_counter.get() + 1);
260                    throttled_length -= 1;
261                },
262            }
263        }
264    }
265}