1use std::cell::OnceCell;
14use std::cmp::max;
15use std::collections::hash_map;
16use std::rc::Rc;
17use std::sync::Arc;
18use std::sync::atomic::{AtomicIsize, Ordering};
19use std::thread;
20
21use base::IpcSend;
22use base::id::PipelineId;
23use crossbeam_channel::{Receiver, Sender, unbounded};
24use dom_struct::dom_struct;
25use js::jsapi::{GCReason, JS_GC, JS_GetGCParameter, JSGCParamKey, JSTracer};
26use malloc_size_of::malloc_size_of_is_0;
27use net_traits::policy_container::PolicyContainer;
28use net_traits::request::{Destination, RequestBuilder, RequestMode};
29use rustc_hash::FxHashMap;
30use servo_url::{ImmutableOrigin, ServoUrl};
31use style::thread_state::{self, ThreadState};
32use swapper::{Swapper, swapper};
33use uuid::Uuid;
34
35use crate::conversions::Convert;
36use crate::dom::bindings::codegen::Bindings::RequestBinding::RequestCredentials;
37use crate::dom::bindings::codegen::Bindings::WindowBinding::Window_Binding::WindowMethods;
38use crate::dom::bindings::codegen::Bindings::WorkletBinding::{WorkletMethods, WorkletOptions};
39use crate::dom::bindings::error::Error;
40use crate::dom::bindings::inheritance::Castable;
41use crate::dom::bindings::refcounted::TrustedPromise;
42use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
43use crate::dom::bindings::root::{Dom, DomRoot};
44use crate::dom::bindings::str::USVString;
45use crate::dom::bindings::trace::{CustomTraceable, JSTraceable, RootedTraceableBox};
46use crate::dom::csp::Violation;
47use crate::dom::globalscope::GlobalScope;
48use crate::dom::promise::Promise;
49#[cfg(feature = "testbinding")]
50use crate::dom::testworkletglobalscope::TestWorkletTask;
51use crate::dom::window::Window;
52use crate::dom::workletglobalscope::{
53 WorkletGlobalScope, WorkletGlobalScopeInit, WorkletGlobalScopeType, WorkletTask,
54};
55use crate::fetch::{CspViolationsProcessor, load_whole_resource};
56use crate::messaging::{CommonScriptMsg, MainThreadScriptMsg};
57use crate::realms::InRealm;
58use crate::script_runtime::{CanGc, Runtime, ScriptThreadEventCategory};
59use crate::script_thread::ScriptThread;
60use crate::task::TaskBox;
61use crate::task_source::TaskSourceName;
62
63const WORKLET_THREAD_POOL_SIZE: u32 = 3;
65const MIN_GC_THRESHOLD: u32 = 1_000_000;
66
67#[derive(JSTraceable, MallocSizeOf)]
68struct DroppableField {
69 worklet_id: WorkletId,
70 #[ignore_malloc_size_of = "Difficult to measure memory usage of Rc<...> types"]
73 thread_pool: OnceCell<Rc<WorkletThreadPool>>,
74}
75
76impl Drop for DroppableField {
77 fn drop(&mut self) {
78 let worklet_id = self.worklet_id;
79 if let Some(thread_pool) = self.thread_pool.get_mut() {
80 thread_pool.exit_worklet(worklet_id);
81 }
82 }
83}
84
85#[dom_struct]
86pub(crate) struct Worklet {
88 reflector: Reflector,
89 window: Dom<Window>,
90 global_type: WorkletGlobalScopeType,
91 droppable_field: DroppableField,
92}
93
94impl Worklet {
95 fn new_inherited(window: &Window, global_type: WorkletGlobalScopeType) -> Worklet {
96 Worklet {
97 reflector: Reflector::new(),
98 window: Dom::from_ref(window),
99 global_type,
100 droppable_field: DroppableField {
101 worklet_id: WorkletId::new(),
102 thread_pool: OnceCell::new(),
103 },
104 }
105 }
106
107 pub(crate) fn new(
108 window: &Window,
109 global_type: WorkletGlobalScopeType,
110 can_gc: CanGc,
111 ) -> DomRoot<Worklet> {
112 debug!("Creating worklet {:?}.", global_type);
113 reflect_dom_object(
114 Box::new(Worklet::new_inherited(window, global_type)),
115 window,
116 can_gc,
117 )
118 }
119
120 pub(crate) fn worklet_id(&self) -> WorkletId {
121 self.droppable_field.worklet_id
122 }
123
124 #[allow(dead_code)]
125 pub(crate) fn worklet_global_scope_type(&self) -> WorkletGlobalScopeType {
126 self.global_type
127 }
128}
129
130impl WorkletMethods<crate::DomTypeHolder> for Worklet {
131 fn AddModule(
133 &self,
134 module_url: USVString,
135 options: &WorkletOptions,
136 comp: InRealm,
137 can_gc: CanGc,
138 ) -> Rc<Promise> {
139 let promise = Promise::new_in_current_realm(comp, can_gc);
141
142 let module_url_record = match self.window.Document().base_url().join(&module_url.0) {
144 Ok(url) => url,
145 Err(err) => {
146 debug!("URL {:?} parse error {:?}.", module_url.0, err);
148 promise.reject_error(Error::Syntax(None), can_gc);
149 return promise;
150 },
151 };
152 debug!("Adding Worklet module {}.", module_url_record);
153
154 let pending_tasks_struct = PendingTasksStruct::new();
156 let global_scope = self.window.as_global_scope();
157
158 self.droppable_field
159 .thread_pool
160 .get_or_init(|| ScriptThread::worklet_thread_pool(self.global().image_cache()))
161 .fetch_and_invoke_a_worklet_script(
162 self.window.pipeline_id(),
163 self.droppable_field.worklet_id,
164 self.global_type,
165 self.window.origin().immutable().clone(),
166 global_scope.api_base_url(),
167 module_url_record,
168 global_scope.policy_container(),
169 options.credentials,
170 pending_tasks_struct,
171 &promise,
172 );
173
174 debug!("Returning promise.");
176 promise
177 }
178}
179
180#[derive(Clone, Copy, Debug, Eq, Hash, JSTraceable, PartialEq)]
182pub(crate) struct WorkletId(#[no_trace] Uuid);
183
184malloc_size_of_is_0!(WorkletId);
185
186impl WorkletId {
187 fn new() -> WorkletId {
188 WorkletId(servo_rand::random_uuid())
189 }
190}
191
192#[derive(Clone, Debug)]
194struct PendingTasksStruct(Arc<AtomicIsize>);
195
196impl PendingTasksStruct {
197 fn new() -> PendingTasksStruct {
198 PendingTasksStruct(Arc::new(AtomicIsize::new(
199 WORKLET_THREAD_POOL_SIZE as isize,
200 )))
201 }
202
203 fn set_counter_to(&self, value: isize) -> isize {
204 self.0.swap(value, Ordering::AcqRel)
205 }
206
207 fn decrement_counter_by(&self, offset: isize) -> isize {
208 self.0.fetch_sub(offset, Ordering::AcqRel)
209 }
210}
211
212#[derive(Clone, JSTraceable)]
262pub(crate) struct WorkletThreadPool {
263 #[no_trace]
265 primary_sender: Sender<WorkletData>,
266 #[no_trace]
267 hot_backup_sender: Sender<WorkletData>,
268 #[no_trace]
269 cold_backup_sender: Sender<WorkletData>,
270 #[no_trace]
272 control_sender_0: Sender<WorkletControl>,
273 #[no_trace]
274 control_sender_1: Sender<WorkletControl>,
275 #[no_trace]
276 control_sender_2: Sender<WorkletControl>,
277}
278
279impl Drop for WorkletThreadPool {
280 fn drop(&mut self) {
281 let _ = self.cold_backup_sender.send(WorkletData::Quit);
282 let _ = self.hot_backup_sender.send(WorkletData::Quit);
283 let _ = self.primary_sender.send(WorkletData::Quit);
284 }
285}
286
287impl WorkletThreadPool {
288 pub(crate) fn spawn(global_init: WorkletGlobalScopeInit) -> WorkletThreadPool {
291 let primary_role = WorkletThreadRole::new(false, false);
292 let hot_backup_role = WorkletThreadRole::new(true, false);
293 let cold_backup_role = WorkletThreadRole::new(false, true);
294 let primary_sender = primary_role.sender.clone();
295 let hot_backup_sender = hot_backup_role.sender.clone();
296 let cold_backup_sender = cold_backup_role.sender.clone();
297 let init = WorkletThreadInit {
298 primary_sender: primary_sender.clone(),
299 hot_backup_sender: hot_backup_sender.clone(),
300 cold_backup_sender: cold_backup_sender.clone(),
301 global_init,
302 };
303 WorkletThreadPool {
304 primary_sender,
305 hot_backup_sender,
306 cold_backup_sender,
307 control_sender_0: WorkletThread::spawn(primary_role, init.clone(), 0),
308 control_sender_1: WorkletThread::spawn(hot_backup_role, init.clone(), 1),
309 control_sender_2: WorkletThread::spawn(cold_backup_role, init, 2),
310 }
311 }
312
313 #[allow(clippy::too_many_arguments)]
318 fn fetch_and_invoke_a_worklet_script(
319 &self,
320 pipeline_id: PipelineId,
321 worklet_id: WorkletId,
322 global_type: WorkletGlobalScopeType,
323 origin: ImmutableOrigin,
324 base_url: ServoUrl,
325 script_url: ServoUrl,
326 policy_container: PolicyContainer,
327 credentials: RequestCredentials,
328 pending_tasks_struct: PendingTasksStruct,
329 promise: &Rc<Promise>,
330 ) {
331 for sender in &[
333 &self.control_sender_0,
334 &self.control_sender_1,
335 &self.control_sender_2,
336 ] {
337 let _ = sender.send(WorkletControl::FetchAndInvokeAWorkletScript {
338 pipeline_id,
339 worklet_id,
340 global_type,
341 origin: origin.clone(),
342 base_url: base_url.clone(),
343 script_url: script_url.clone(),
344 policy_container: policy_container.clone(),
345 credentials,
346 pending_tasks_struct: pending_tasks_struct.clone(),
347 promise: TrustedPromise::new(promise.clone()),
348 });
349 }
350 self.wake_threads();
351 }
352
353 pub(crate) fn exit_worklet(&self, worklet_id: WorkletId) {
354 for sender in &[
355 &self.control_sender_0,
356 &self.control_sender_1,
357 &self.control_sender_2,
358 ] {
359 let _ = sender.send(WorkletControl::ExitWorklet(worklet_id));
360 }
361 self.wake_threads();
362 }
363
364 #[cfg(feature = "testbinding")]
366 pub(crate) fn test_worklet_lookup(&self, id: WorkletId, key: String) -> Option<String> {
367 let (sender, receiver) = unbounded();
368 let msg = WorkletData::Task(id, WorkletTask::Test(TestWorkletTask::Lookup(key, sender)));
369 let _ = self.primary_sender.send(msg);
370 receiver.recv().expect("Test worklet has died?")
371 }
372
373 fn wake_threads(&self) {
374 let _ = self.cold_backup_sender.send(WorkletData::WakeUp);
376 let _ = self.hot_backup_sender.send(WorkletData::WakeUp);
377 let _ = self.primary_sender.send(WorkletData::WakeUp);
378 }
379}
380
381enum WorkletData {
383 Task(WorkletId, WorkletTask),
384 StartSwapRoles(Sender<WorkletData>),
385 FinishSwapRoles(Swapper<WorkletThreadRole>),
386 WakeUp,
387 Quit,
388}
389
390enum WorkletControl {
392 ExitWorklet(WorkletId),
393 FetchAndInvokeAWorkletScript {
394 pipeline_id: PipelineId,
395 worklet_id: WorkletId,
396 global_type: WorkletGlobalScopeType,
397 origin: ImmutableOrigin,
398 base_url: ServoUrl,
399 script_url: ServoUrl,
400 policy_container: PolicyContainer,
401 credentials: RequestCredentials,
402 pending_tasks_struct: PendingTasksStruct,
403 promise: TrustedPromise,
404 },
405}
406
407struct WorkletThreadRole {
414 receiver: Receiver<WorkletData>,
415 sender: Sender<WorkletData>,
416 is_hot_backup: bool,
417 is_cold_backup: bool,
418}
419
420impl WorkletThreadRole {
421 fn new(is_hot_backup: bool, is_cold_backup: bool) -> WorkletThreadRole {
422 let (sender, receiver) = unbounded();
423 WorkletThreadRole {
424 sender,
425 receiver,
426 is_hot_backup,
427 is_cold_backup,
428 }
429 }
430}
431
432#[derive(Clone)]
434struct WorkletThreadInit {
435 primary_sender: Sender<WorkletData>,
437 hot_backup_sender: Sender<WorkletData>,
438 cold_backup_sender: Sender<WorkletData>,
439
440 global_init: WorkletGlobalScopeInit,
442}
443
444struct WorkletCspProcessor {}
445
446impl CspViolationsProcessor for WorkletCspProcessor {
447 fn process_csp_violations(&self, _violations: Vec<Violation>) {}
448}
449
450#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
452struct WorkletThread {
453 role: WorkletThreadRole,
455
456 control_receiver: Receiver<WorkletControl>,
458
459 primary_sender: Sender<WorkletData>,
461 hot_backup_sender: Sender<WorkletData>,
462 cold_backup_sender: Sender<WorkletData>,
463
464 global_init: WorkletGlobalScopeInit,
466
467 global_scopes: FxHashMap<WorkletId, Dom<WorkletGlobalScope>>,
469
470 control_buffer: Option<WorkletControl>,
472
473 runtime: Runtime,
475 should_gc: bool,
476 gc_threshold: u32,
477}
478
479#[allow(unsafe_code)]
480unsafe impl JSTraceable for WorkletThread {
481 unsafe fn trace(&self, trc: *mut JSTracer) {
482 debug!("Tracing worklet thread.");
483 self.global_scopes.trace(trc);
484 }
485}
486
487impl WorkletThread {
488 #[allow(unsafe_code)]
490 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
491 fn spawn(
492 role: WorkletThreadRole,
493 init: WorkletThreadInit,
494 thread_index: u8,
495 ) -> Sender<WorkletControl> {
496 let (control_sender, control_receiver) = unbounded();
497 let _ = thread::Builder::new()
498 .name(format!("Worklet#{thread_index}"))
499 .spawn(move || {
500 debug!("Initializing worklet thread.");
504 thread_state::initialize(ThreadState::SCRIPT | ThreadState::IN_WORKER);
505 let mut thread = RootedTraceableBox::new(WorkletThread {
506 role,
507 control_receiver,
508 primary_sender: init.primary_sender,
509 hot_backup_sender: init.hot_backup_sender,
510 cold_backup_sender: init.cold_backup_sender,
511 global_init: init.global_init,
512 global_scopes: FxHashMap::default(),
513 control_buffer: None,
514 runtime: Runtime::new(None),
515 should_gc: false,
516 gc_threshold: MIN_GC_THRESHOLD,
517 });
518 thread.run(CanGc::note());
519 })
520 .expect("Couldn't start worklet thread");
521 control_sender
522 }
523
524 fn run(&mut self, can_gc: CanGc) {
526 loop {
527 let message = self.role.receiver.recv().unwrap();
529 match message {
530 WorkletData::Task(id, task) => {
532 self.perform_a_worklet_task(id, task);
533 },
534 WorkletData::StartSwapRoles(sender) => {
541 let (our_swapper, their_swapper) = swapper();
542 match sender.send(WorkletData::FinishSwapRoles(their_swapper)) {
543 Ok(_) => {},
544 Err(_) => {
545 return;
548 },
549 };
550 let _ = our_swapper.swap(&mut self.role);
551 },
552 WorkletData::FinishSwapRoles(swapper) => {
555 let _ = swapper.swap(&mut self.role);
556 },
557 WorkletData::WakeUp => {},
559 WorkletData::Quit => {
561 return;
562 },
563 }
564 if self.role.is_cold_backup {
568 if let Some(control) = self.control_buffer.take() {
569 self.process_control(control, can_gc);
570 }
571 while let Ok(control) = self.control_receiver.try_recv() {
572 self.process_control(control, can_gc);
573 }
574 self.gc();
575 } else if self.control_buffer.is_none() {
576 if let Ok(control) = self.control_receiver.try_recv() {
577 self.control_buffer = Some(control);
578 let msg = WorkletData::StartSwapRoles(self.role.sender.clone());
579 let _ = self.cold_backup_sender.send(msg);
580 }
581 }
582 if self.current_memory_usage() > self.gc_threshold {
586 if self.role.is_hot_backup || self.role.is_cold_backup {
587 self.should_gc = false;
588 self.gc();
589 } else if !self.should_gc {
590 self.should_gc = true;
591 let msg = WorkletData::StartSwapRoles(self.role.sender.clone());
592 let _ = self.hot_backup_sender.send(msg);
593 }
594 }
595 }
596 }
597
598 #[allow(unsafe_code)]
600 fn current_memory_usage(&self) -> u32 {
601 unsafe { JS_GetGCParameter(self.runtime.cx(), JSGCParamKey::JSGC_BYTES) }
602 }
603
604 #[allow(unsafe_code)]
606 fn gc(&mut self) {
607 debug!(
608 "BEGIN GC (usage = {}, threshold = {}).",
609 self.current_memory_usage(),
610 self.gc_threshold
611 );
612 unsafe { JS_GC(self.runtime.cx(), GCReason::API) };
613 self.gc_threshold = max(MIN_GC_THRESHOLD, self.current_memory_usage() * 2);
614 debug!(
615 "END GC (usage = {}, threshold = {}).",
616 self.current_memory_usage(),
617 self.gc_threshold
618 );
619 }
620
621 fn get_worklet_global_scope(
624 &mut self,
625 pipeline_id: PipelineId,
626 worklet_id: WorkletId,
627 global_type: WorkletGlobalScopeType,
628 base_url: ServoUrl,
629 ) -> DomRoot<WorkletGlobalScope> {
630 match self.global_scopes.entry(worklet_id) {
631 hash_map::Entry::Occupied(entry) => DomRoot::from_ref(entry.get()),
632 hash_map::Entry::Vacant(entry) => {
633 debug!("Creating new worklet global scope.");
634 let executor = WorkletExecutor::new(worklet_id, self.primary_sender.clone());
635 let result = WorkletGlobalScope::new(
636 global_type,
637 pipeline_id,
638 base_url,
639 executor,
640 &self.global_init,
641 );
642 entry.insert(Dom::from_ref(&*result));
643 result
644 },
645 }
646 }
647
648 #[allow(clippy::too_many_arguments)]
651 fn fetch_and_invoke_a_worklet_script(
652 &self,
653 global_scope: &WorkletGlobalScope,
654 pipeline_id: PipelineId,
655 origin: ImmutableOrigin,
656 script_url: ServoUrl,
657 policy_container: PolicyContainer,
658 credentials: RequestCredentials,
659 pending_tasks_struct: PendingTasksStruct,
660 promise: TrustedPromise,
661 can_gc: CanGc,
662 ) {
663 debug!("Fetching from {}.", script_url);
664 let global = global_scope.upcast::<GlobalScope>();
672 let resource_fetcher = self.global_init.resource_threads.sender();
673 let request = RequestBuilder::new(None, script_url, global.get_referrer())
674 .destination(Destination::Script)
675 .mode(RequestMode::CorsMode)
676 .credentials_mode(credentials.convert())
677 .policy_container(policy_container)
678 .origin(origin);
679
680 let script = load_whole_resource(
681 request,
682 &resource_fetcher,
683 global,
684 &WorkletCspProcessor {},
685 can_gc,
686 )
687 .ok()
688 .and_then(|(_, bytes)| String::from_utf8(bytes).ok());
689
690 let ok = script
697 .map(|s| global_scope.evaluate_js(&s, can_gc).is_ok())
698 .unwrap_or(false);
699
700 if !ok {
701 debug!("Failed to load script.");
703 let old_counter = pending_tasks_struct.set_counter_to(-1);
704 if old_counter > 0 {
705 self.run_in_script_thread(promise.reject_task(Error::Abort));
706 }
707 } else {
708 debug!("Finished adding script.");
710 let old_counter = pending_tasks_struct.decrement_counter_by(1);
711 if old_counter == 1 {
712 debug!("Resolving promise.");
713 let msg = MainThreadScriptMsg::WorkletLoaded(pipeline_id);
714 self.global_init
715 .to_script_thread_sender
716 .send(msg)
717 .expect("Worklet thread outlived script thread.");
718 self.run_in_script_thread(promise.resolve_task(()));
719 }
720 }
721 }
722
723 fn perform_a_worklet_task(&self, worklet_id: WorkletId, task: WorkletTask) {
725 match self.global_scopes.get(&worklet_id) {
726 Some(global) => global.perform_a_worklet_task(task),
727 None => warn!("No such worklet as {:?}.", worklet_id),
728 }
729 }
730
731 fn process_control(&mut self, control: WorkletControl, can_gc: CanGc) {
733 match control {
734 WorkletControl::ExitWorklet(worklet_id) => {
735 self.global_scopes.remove(&worklet_id);
736 },
737 WorkletControl::FetchAndInvokeAWorkletScript {
738 pipeline_id,
739 worklet_id,
740 global_type,
741 origin,
742 base_url,
743 script_url,
744 policy_container,
745 credentials,
746 pending_tasks_struct,
747 promise,
748 } => {
749 let global =
750 self.get_worklet_global_scope(pipeline_id, worklet_id, global_type, base_url);
751 self.fetch_and_invoke_a_worklet_script(
752 &global,
753 pipeline_id,
754 origin,
755 script_url,
756 policy_container,
757 credentials,
758 pending_tasks_struct,
759 promise,
760 can_gc,
761 )
762 },
763 }
764 }
765
766 fn run_in_script_thread<T>(&self, task: T)
768 where
769 T: TaskBox + 'static,
770 {
771 let msg = CommonScriptMsg::Task(
774 ScriptThreadEventCategory::WorkletEvent,
775 Box::new(task),
776 None,
777 TaskSourceName::DOMManipulation,
778 );
779 let msg = MainThreadScriptMsg::Common(msg);
780 self.global_init
781 .to_script_thread_sender
782 .send(msg)
783 .expect("Worklet thread outlived script thread.");
784 }
785}
786
787#[derive(Clone, JSTraceable, MallocSizeOf)]
789pub(crate) struct WorkletExecutor {
790 worklet_id: WorkletId,
791 #[no_trace]
792 primary_sender: Sender<WorkletData>,
793}
794
795impl WorkletExecutor {
796 fn new(worklet_id: WorkletId, primary_sender: Sender<WorkletData>) -> WorkletExecutor {
797 WorkletExecutor {
798 worklet_id,
799 primary_sender,
800 }
801 }
802
803 pub(crate) fn schedule_a_worklet_task(&self, task: WorkletTask) {
805 let _ = self
806 .primary_sender
807 .send(WorkletData::Task(self.worklet_id, task));
808 }
809}