1use std::cell::OnceCell;
14use std::cmp::max;
15use std::collections::hash_map;
16use std::rc::Rc;
17use std::sync::Arc;
18use std::sync::atomic::{AtomicIsize, Ordering};
19use std::thread;
20
21use base::IpcSend;
22use base::id::{PipelineId, WebViewId};
23use crossbeam_channel::{Receiver, Sender, unbounded};
24use dom_struct::dom_struct;
25use js::jsapi::{GCReason, JS_GC, JS_GetGCParameter, JSGCParamKey, JSTracer};
26use malloc_size_of::malloc_size_of_is_0;
27use net_traits::policy_container::PolicyContainer;
28use net_traits::request::{Destination, RequestBuilder, RequestMode};
29use rustc_hash::FxHashMap;
30use servo_url::{ImmutableOrigin, ServoUrl};
31use style::thread_state::{self, ThreadState};
32use swapper::{Swapper, swapper};
33use uuid::Uuid;
34
35use crate::conversions::Convert;
36use crate::dom::bindings::codegen::Bindings::RequestBinding::RequestCredentials;
37use crate::dom::bindings::codegen::Bindings::WindowBinding::Window_Binding::WindowMethods;
38use crate::dom::bindings::codegen::Bindings::WorkletBinding::{WorkletMethods, WorkletOptions};
39use crate::dom::bindings::error::Error;
40use crate::dom::bindings::inheritance::Castable;
41use crate::dom::bindings::refcounted::TrustedPromise;
42use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
43use crate::dom::bindings::root::{Dom, DomRoot};
44use crate::dom::bindings::str::USVString;
45use crate::dom::bindings::trace::{CustomTraceable, JSTraceable, RootedTraceableBox};
46use crate::dom::csp::Violation;
47use crate::dom::globalscope::GlobalScope;
48use crate::dom::promise::Promise;
49#[cfg(feature = "testbinding")]
50use crate::dom::testworkletglobalscope::TestWorkletTask;
51use crate::dom::window::Window;
52use crate::dom::workletglobalscope::{
53 WorkletGlobalScope, WorkletGlobalScopeInit, WorkletGlobalScopeType, WorkletTask,
54};
55use crate::fetch::{CspViolationsProcessor, load_whole_resource};
56use crate::messaging::{CommonScriptMsg, MainThreadScriptMsg};
57use crate::realms::InRealm;
58use crate::script_runtime::{CanGc, Runtime, ScriptThreadEventCategory};
59use crate::script_thread::ScriptThread;
60use crate::task::TaskBox;
61use crate::task_source::TaskSourceName;
62
/// Number of threads in a worklet thread pool. It is also the initial value of
/// the pending-task counter created for each `AddModule` call.
const WORKLET_THREAD_POOL_SIZE: u32 = 3;
/// Lower bound for a worklet thread's GC threshold, compared against the
/// runtime's `JSGC_BYTES` value. Each thread also starts with this threshold.
const MIN_GC_THRESHOLD: u32 = 1_000_000;
66
/// The state of a `Worklet` that needs cleanup when the worklet is dropped.
/// It lives in its own struct, which carries the `Drop` implementation.
#[derive(JSTraceable, MallocSizeOf)]
struct DroppableField {
    /// Identifier of the owning worklet; passed to the thread pool on drop.
    worklet_id: WorkletId,
    /// Handle to the worklet thread pool, initialised by the first `AddModule` call.
    #[ignore_malloc_size_of = "Difficult to measure memory usage of Rc<...> types"]
    thread_pool: OnceCell<Rc<WorkletThreadPool>>,
}
75
76impl Drop for DroppableField {
77 fn drop(&mut self) {
78 let worklet_id = self.worklet_id;
79 if let Some(thread_pool) = self.thread_pool.get_mut() {
80 thread_pool.exit_worklet(worklet_id);
81 }
82 }
83}
84
/// DOM object backing the `Worklet` interface.
#[dom_struct]
pub(crate) struct Worklet {
    reflector: Reflector,
    /// The window this worklet was created for.
    window: Dom<Window>,
    /// The kind of worklet global scope used for this worklet's modules.
    global_type: WorkletGlobalScopeType,
    /// Worklet id and thread pool handle; has its own `Drop` implementation.
    droppable_field: DroppableField,
}
93
94impl Worklet {
95 fn new_inherited(window: &Window, global_type: WorkletGlobalScopeType) -> Worklet {
96 Worklet {
97 reflector: Reflector::new(),
98 window: Dom::from_ref(window),
99 global_type,
100 droppable_field: DroppableField {
101 worklet_id: WorkletId::new(),
102 thread_pool: OnceCell::new(),
103 },
104 }
105 }
106
107 pub(crate) fn new(
108 window: &Window,
109 global_type: WorkletGlobalScopeType,
110 can_gc: CanGc,
111 ) -> DomRoot<Worklet> {
112 debug!("Creating worklet {:?}.", global_type);
113 reflect_dom_object(
114 Box::new(Worklet::new_inherited(window, global_type)),
115 window,
116 can_gc,
117 )
118 }
119
120 pub(crate) fn worklet_id(&self) -> WorkletId {
121 self.droppable_field.worklet_id
122 }
123
124 #[allow(dead_code)]
125 pub(crate) fn worklet_global_scope_type(&self) -> WorkletGlobalScopeType {
126 self.global_type
127 }
128}
129
130impl WorkletMethods<crate::DomTypeHolder> for Worklet {
131 fn AddModule(
133 &self,
134 module_url: USVString,
135 options: &WorkletOptions,
136 comp: InRealm,
137 can_gc: CanGc,
138 ) -> Rc<Promise> {
139 let promise = Promise::new_in_current_realm(comp, can_gc);
141
142 let module_url_record = match self.window.Document().base_url().join(&module_url.0) {
144 Ok(url) => url,
145 Err(err) => {
146 debug!("URL {:?} parse error {:?}.", module_url.0, err);
148 promise.reject_error(Error::Syntax(None), can_gc);
149 return promise;
150 },
151 };
152 debug!("Adding Worklet module {}.", module_url_record);
153
154 let pending_tasks_struct = PendingTasksStruct::new();
156 let global_scope = self.window.as_global_scope();
157
158 self.droppable_field
159 .thread_pool
160 .get_or_init(|| ScriptThread::worklet_thread_pool(self.global().image_cache()))
161 .fetch_and_invoke_a_worklet_script(
162 self.window.webview_id(),
163 self.window.pipeline_id(),
164 self.droppable_field.worklet_id,
165 self.global_type,
166 self.window.origin().immutable().clone(),
167 global_scope.api_base_url(),
168 module_url_record,
169 global_scope.policy_container(),
170 options.credentials,
171 pending_tasks_struct,
172 &promise,
173 );
174
175 debug!("Returning promise.");
177 promise
178 }
179}
180
/// Identifier for a worklet, wrapping a `Uuid`. Used as the key for the
/// per-thread map of worklet global scopes.
#[derive(Clone, Copy, Debug, Eq, Hash, JSTraceable, PartialEq)]
pub(crate) struct WorkletId(#[no_trace] Uuid);

// Going by the macro's name, a `WorkletId` is reported as owning no heap memory.
malloc_size_of_is_0!(WorkletId);
186
187impl WorkletId {
188 fn new() -> WorkletId {
189 WorkletId(Uuid::new_v4())
190 }
191}
192
193#[derive(Clone, Debug)]
195struct PendingTasksStruct(Arc<AtomicIsize>);
196
197impl PendingTasksStruct {
198 fn new() -> PendingTasksStruct {
199 PendingTasksStruct(Arc::new(AtomicIsize::new(
200 WORKLET_THREAD_POOL_SIZE as isize,
201 )))
202 }
203
204 fn set_counter_to(&self, value: isize) -> isize {
205 self.0.swap(value, Ordering::AcqRel)
206 }
207
208 fn decrement_counter_by(&self, offset: isize) -> isize {
209 self.0.fetch_sub(offset, Ordering::AcqRel)
210 }
211}
212
/// Handle to a pool of three worklet threads.
///
/// The three `*_sender` fields address threads by role (roles can be swapped
/// between threads at runtime), while the `control_sender_N` fields address a
/// fixed thread by its spawn index.
#[derive(Clone, JSTraceable)]
pub(crate) struct WorkletThreadPool {
    /// Data channel read by whichever thread currently has the primary role.
    #[no_trace]
    primary_sender: Sender<WorkletData>,
    /// Data channel read by whichever thread currently has the hot backup role.
    #[no_trace]
    hot_backup_sender: Sender<WorkletData>,
    /// Data channel read by whichever thread currently has the cold backup role.
    #[no_trace]
    cold_backup_sender: Sender<WorkletData>,
    /// Control channel of the thread spawned with index 0.
    #[no_trace]
    control_sender_0: Sender<WorkletControl>,
    /// Control channel of the thread spawned with index 1.
    #[no_trace]
    control_sender_1: Sender<WorkletControl>,
    /// Control channel of the thread spawned with index 2.
    #[no_trace]
    control_sender_2: Sender<WorkletControl>,
}
279
280impl Drop for WorkletThreadPool {
281 fn drop(&mut self) {
282 let _ = self.cold_backup_sender.send(WorkletData::Quit);
283 let _ = self.hot_backup_sender.send(WorkletData::Quit);
284 let _ = self.primary_sender.send(WorkletData::Quit);
285 }
286}
287
288impl WorkletThreadPool {
289 pub(crate) fn spawn(global_init: WorkletGlobalScopeInit) -> WorkletThreadPool {
292 let primary_role = WorkletThreadRole::new(false, false);
293 let hot_backup_role = WorkletThreadRole::new(true, false);
294 let cold_backup_role = WorkletThreadRole::new(false, true);
295 let primary_sender = primary_role.sender.clone();
296 let hot_backup_sender = hot_backup_role.sender.clone();
297 let cold_backup_sender = cold_backup_role.sender.clone();
298 let init = WorkletThreadInit {
299 primary_sender: primary_sender.clone(),
300 hot_backup_sender: hot_backup_sender.clone(),
301 cold_backup_sender: cold_backup_sender.clone(),
302 global_init,
303 };
304 WorkletThreadPool {
305 primary_sender,
306 hot_backup_sender,
307 cold_backup_sender,
308 control_sender_0: WorkletThread::spawn(primary_role, init.clone(), 0),
309 control_sender_1: WorkletThread::spawn(hot_backup_role, init.clone(), 1),
310 control_sender_2: WorkletThread::spawn(cold_backup_role, init, 2),
311 }
312 }
313
314 #[allow(clippy::too_many_arguments)]
319 fn fetch_and_invoke_a_worklet_script(
320 &self,
321 webview_id: WebViewId,
322 pipeline_id: PipelineId,
323 worklet_id: WorkletId,
324 global_type: WorkletGlobalScopeType,
325 origin: ImmutableOrigin,
326 base_url: ServoUrl,
327 script_url: ServoUrl,
328 policy_container: PolicyContainer,
329 credentials: RequestCredentials,
330 pending_tasks_struct: PendingTasksStruct,
331 promise: &Rc<Promise>,
332 ) {
333 for sender in &[
335 &self.control_sender_0,
336 &self.control_sender_1,
337 &self.control_sender_2,
338 ] {
339 let _ = sender.send(WorkletControl::FetchAndInvokeAWorkletScript {
340 webview_id,
341 pipeline_id,
342 worklet_id,
343 global_type,
344 origin: origin.clone(),
345 base_url: base_url.clone(),
346 script_url: script_url.clone(),
347 policy_container: policy_container.clone(),
348 credentials,
349 pending_tasks_struct: pending_tasks_struct.clone(),
350 promise: TrustedPromise::new(promise.clone()),
351 });
352 }
353 self.wake_threads();
354 }
355
356 pub(crate) fn exit_worklet(&self, worklet_id: WorkletId) {
357 for sender in &[
358 &self.control_sender_0,
359 &self.control_sender_1,
360 &self.control_sender_2,
361 ] {
362 let _ = sender.send(WorkletControl::ExitWorklet(worklet_id));
363 }
364 self.wake_threads();
365 }
366
367 #[cfg(feature = "testbinding")]
369 pub(crate) fn test_worklet_lookup(&self, id: WorkletId, key: String) -> Option<String> {
370 let (sender, receiver) = unbounded();
371 let msg = WorkletData::Task(id, WorkletTask::Test(TestWorkletTask::Lookup(key, sender)));
372 let _ = self.primary_sender.send(msg);
373 receiver.recv().expect("Test worklet has died?")
374 }
375
376 fn wake_threads(&self) {
377 let _ = self.cold_backup_sender.send(WorkletData::WakeUp);
379 let _ = self.hot_backup_sender.send(WorkletData::WakeUp);
380 let _ = self.primary_sender.send(WorkletData::WakeUp);
381 }
382}
383
/// Messages delivered on a role's data channel.
enum WorkletData {
    /// Run a task in the global scope of the given worklet.
    Task(WorkletId, WorkletTask),
    /// Request to swap roles with the sender; the receiver answers on the
    /// enclosed channel with `FinishSwapRoles`.
    StartSwapRoles(Sender<WorkletData>),
    /// Second half of a role swap, carrying the swapper to exchange roles with.
    FinishSwapRoles(Swapper<WorkletThreadRole>),
    /// Carries no work; it only makes the receiving thread run one loop iteration.
    WakeUp,
    /// Makes the receiving thread leave its run loop.
    Quit,
}
392
/// Messages delivered on a thread's control channel. They are processed only
/// while the thread has the cold backup role.
enum WorkletControl {
    /// Remove the global scope belonging to the given worklet.
    ExitWorklet(WorkletId),
    /// Fetch a script and evaluate it in the worklet's global scope, creating
    /// that scope first if it does not exist yet.
    FetchAndInvokeAWorkletScript {
        webview_id: WebViewId,
        pipeline_id: PipelineId,
        worklet_id: WorkletId,
        global_type: WorkletGlobalScopeType,
        origin: ImmutableOrigin,
        /// Base URL passed to a newly created worklet global scope.
        base_url: ServoUrl,
        /// URL of the script to fetch.
        script_url: ServoUrl,
        policy_container: PolicyContainer,
        credentials: RequestCredentials,
        /// Counter shared by all threads handling the same `AddModule` call.
        pending_tasks_struct: PendingTasksStruct,
        /// Promise to resolve or reject on the script thread.
        promise: TrustedPromise,
    },
}
410
/// A role a worklet thread can hold, together with that role's data channel.
/// Threads exchange whole `WorkletThreadRole` values when they swap roles.
/// A role with both flags `false` is the primary.
struct WorkletThreadRole {
    /// Receiving end of this role's data channel.
    receiver: Receiver<WorkletData>,
    /// Sending end of this role's data channel.
    sender: Sender<WorkletData>,
    is_hot_backup: bool,
    is_cold_backup: bool,
}
423
424impl WorkletThreadRole {
425 fn new(is_hot_backup: bool, is_cold_backup: bool) -> WorkletThreadRole {
426 let (sender, receiver) = unbounded();
427 WorkletThreadRole {
428 sender,
429 receiver,
430 is_hot_backup,
431 is_cold_backup,
432 }
433 }
434}
435
/// Everything a worklet thread needs at start-up apart from its own role.
#[derive(Clone)]
struct WorkletThreadInit {
    /// Senders for the data channels of the three roles.
    primary_sender: Sender<WorkletData>,
    hot_backup_sender: Sender<WorkletData>,
    cold_backup_sender: Sender<WorkletData>,

    /// Data used when creating worklet global scopes on the thread.
    global_init: WorkletGlobalScopeInit,
}
447
/// CSP violations processor used when fetching worklet scripts.
struct WorkletCspProcessor {}

impl CspViolationsProcessor for WorkletCspProcessor {
    /// Discards the reported violations; nothing is done with them.
    fn process_csp_violations(&self, _violations: Vec<Violation>) {}
}
453
/// State owned by a single worklet thread.
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct WorkletThread {
    /// The role this thread currently holds; may be swapped with another thread.
    role: WorkletThreadRole,

    /// Control messages addressed to this thread, whatever its current role.
    control_receiver: Receiver<WorkletControl>,

    /// Senders for the data channels of the three roles.
    primary_sender: Sender<WorkletData>,
    hot_backup_sender: Sender<WorkletData>,
    cold_backup_sender: Sender<WorkletData>,

    /// Data used when creating worklet global scopes.
    global_init: WorkletGlobalScopeInit,

    /// Global scopes created on this thread, keyed by worklet.
    global_scopes: FxHashMap<WorkletId, Dom<WorkletGlobalScope>>,

    /// A control message taken off `control_receiver` while this thread was not
    /// the cold backup; it is processed once the thread becomes the cold backup.
    control_buffer: Option<WorkletControl>,

    /// The JS runtime of this thread.
    runtime: Runtime,
    /// Set while this thread, as primary, has asked the hot backup to swap
    /// roles so that it can run a GC.
    should_gc: bool,
    /// Memory usage above which the thread tries to run a GC.
    gc_threshold: u32,
}
482
// Only `global_scopes` is traced; no other field is visited here.
#[allow(unsafe_code)]
unsafe impl JSTraceable for WorkletThread {
    unsafe fn trace(&self, trc: *mut JSTracer) {
        debug!("Tracing worklet thread.");
        self.global_scopes.trace(trc);
    }
}
490
491impl WorkletThread {
492 #[allow(unsafe_code)]
494 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
495 fn spawn(
496 role: WorkletThreadRole,
497 init: WorkletThreadInit,
498 thread_index: u8,
499 ) -> Sender<WorkletControl> {
500 let (control_sender, control_receiver) = unbounded();
501 let _ = thread::Builder::new()
502 .name(format!("Worklet#{thread_index}"))
503 .spawn(move || {
504 debug!("Initializing worklet thread.");
508 thread_state::initialize(ThreadState::SCRIPT | ThreadState::IN_WORKER);
509 let mut thread = RootedTraceableBox::new(WorkletThread {
510 role,
511 control_receiver,
512 primary_sender: init.primary_sender,
513 hot_backup_sender: init.hot_backup_sender,
514 cold_backup_sender: init.cold_backup_sender,
515 global_init: init.global_init,
516 global_scopes: FxHashMap::default(),
517 control_buffer: None,
518 runtime: Runtime::new(None),
519 should_gc: false,
520 gc_threshold: MIN_GC_THRESHOLD,
521 });
522 thread.run(CanGc::note());
523 })
524 .expect("Couldn't start worklet thread");
525 control_sender
526 }
527
/// The thread's message loop.
///
/// Each iteration blocks for one message on the current role's data channel,
/// handles it, then deals with control messages and GC according to the role
/// the thread holds at that point. Returns on `Quit`, or when the reply to a
/// `StartSwapRoles` request cannot be sent.
fn run(&mut self, can_gc: CanGc) {
    loop {
        // Blocks until a message arrives; panics if the channel is disconnected.
        let message = self.role.receiver.recv().unwrap();
        match message {
            WorkletData::Task(id, task) => {
                self.perform_a_worklet_task(id, task);
            },
            WorkletData::StartSwapRoles(sender) => {
                // Another thread wants our role: hand it one end of a swapper
                // and exchange roles through the other end.
                let (our_swapper, their_swapper) = swapper();
                match sender.send(WorkletData::FinishSwapRoles(their_swapper)) {
                    Ok(_) => {},
                    Err(_) => {
                        // The requester can no longer be reached; stop this thread.
                        return;
                    },
                };
                let _ = our_swapper.swap(&mut self.role);
            },
            WorkletData::FinishSwapRoles(swapper) => {
                // Completes a swap this thread asked for earlier.
                let _ = swapper.swap(&mut self.role);
            },
            // Nothing to do; the message only exists to reach the code below.
            WorkletData::WakeUp => {},
            WorkletData::Quit => {
                return;
            },
        }
        if self.role.is_cold_backup {
            // Control messages are only processed as the cold backup: first the
            // buffered one (if any), then everything queued on the channel.
            if let Some(control) = self.control_buffer.take() {
                self.process_control(control, can_gc);
            }
            while let Ok(control) = self.control_receiver.try_recv() {
                self.process_control(control, can_gc);
            }
            self.gc();
        } else if self.control_buffer.is_none() {
            // Not the cold backup: buffer one pending control message and ask
            // the cold backup to swap roles so it can be processed later.
            if let Ok(control) = self.control_receiver.try_recv() {
                self.control_buffer = Some(control);
                let msg = WorkletData::StartSwapRoles(self.role.sender.clone());
                let _ = self.cold_backup_sender.send(msg);
            }
        }
        if self.current_memory_usage() > self.gc_threshold {
            if self.role.is_hot_backup || self.role.is_cold_backup {
                // Backup roles run the GC directly.
                self.should_gc = false;
                self.gc();
            } else if !self.should_gc {
                // The primary does not GC itself; it asks the hot backup to
                // swap roles, once per request (guarded by `should_gc`).
                self.should_gc = true;
                let msg = WorkletData::StartSwapRoles(self.role.sender.clone());
                let _ = self.hot_backup_sender.send(msg);
            }
        }
    }
}
601
602 #[allow(unsafe_code)]
604 fn current_memory_usage(&self) -> u32 {
605 unsafe { JS_GetGCParameter(self.runtime.cx(), JSGCParamKey::JSGC_BYTES) }
606 }
607
608 #[allow(unsafe_code)]
610 fn gc(&mut self) {
611 debug!(
612 "BEGIN GC (usage = {}, threshold = {}).",
613 self.current_memory_usage(),
614 self.gc_threshold
615 );
616 unsafe { JS_GC(self.runtime.cx(), GCReason::API) };
617 self.gc_threshold = max(MIN_GC_THRESHOLD, self.current_memory_usage() * 2);
618 debug!(
619 "END GC (usage = {}, threshold = {}).",
620 self.current_memory_usage(),
621 self.gc_threshold
622 );
623 }
624
625 fn get_worklet_global_scope(
628 &mut self,
629 webview_id: WebViewId,
630 pipeline_id: PipelineId,
631 worklet_id: WorkletId,
632 global_type: WorkletGlobalScopeType,
633 base_url: ServoUrl,
634 ) -> DomRoot<WorkletGlobalScope> {
635 match self.global_scopes.entry(worklet_id) {
636 hash_map::Entry::Occupied(entry) => DomRoot::from_ref(entry.get()),
637 hash_map::Entry::Vacant(entry) => {
638 debug!("Creating new worklet global scope.");
639 let executor = WorkletExecutor::new(worklet_id, self.primary_sender.clone());
640 let result = WorkletGlobalScope::new(
641 global_type,
642 webview_id,
643 pipeline_id,
644 base_url,
645 executor,
646 &self.global_init,
647 );
648 entry.insert(Dom::from_ref(&*result));
649 result
650 },
651 }
652 }
653
654 #[allow(clippy::too_many_arguments)]
657 fn fetch_and_invoke_a_worklet_script(
658 &self,
659 global_scope: &WorkletGlobalScope,
660 pipeline_id: PipelineId,
661 origin: ImmutableOrigin,
662 script_url: ServoUrl,
663 policy_container: PolicyContainer,
664 credentials: RequestCredentials,
665 pending_tasks_struct: PendingTasksStruct,
666 promise: TrustedPromise,
667 can_gc: CanGc,
668 ) {
669 debug!("Fetching from {}.", script_url);
670 let global = global_scope.upcast::<GlobalScope>();
678 let resource_fetcher = self.global_init.resource_threads.sender();
679 let request = RequestBuilder::new(None, script_url, global.get_referrer())
680 .destination(Destination::Script)
681 .mode(RequestMode::CorsMode)
682 .credentials_mode(credentials.convert())
683 .policy_container(policy_container)
684 .origin(origin);
685
686 let script = load_whole_resource(
687 request,
688 &resource_fetcher,
689 global,
690 &WorkletCspProcessor {},
691 can_gc,
692 )
693 .ok()
694 .and_then(|(_, bytes)| String::from_utf8(bytes).ok());
695
696 let ok = script.is_some_and(|s| global_scope.evaluate_js(&s, can_gc).is_ok());
703
704 if !ok {
705 debug!("Failed to load script.");
707 let old_counter = pending_tasks_struct.set_counter_to(-1);
708 if old_counter > 0 {
709 self.run_in_script_thread(promise.reject_task(Error::Abort));
710 }
711 } else {
712 debug!("Finished adding script.");
714 let old_counter = pending_tasks_struct.decrement_counter_by(1);
715 if old_counter == 1 {
716 debug!("Resolving promise.");
717 let msg = MainThreadScriptMsg::WorkletLoaded(pipeline_id);
718 self.global_init
719 .to_script_thread_sender
720 .send(msg)
721 .expect("Worklet thread outlived script thread.");
722 self.run_in_script_thread(promise.resolve_task(()));
723 }
724 }
725 }
726
727 fn perform_a_worklet_task(&self, worklet_id: WorkletId, task: WorkletTask) {
729 match self.global_scopes.get(&worklet_id) {
730 Some(global) => global.perform_a_worklet_task(task),
731 None => warn!("No such worklet as {:?}.", worklet_id),
732 }
733 }
734
735 fn process_control(&mut self, control: WorkletControl, can_gc: CanGc) {
737 match control {
738 WorkletControl::ExitWorklet(worklet_id) => {
739 self.global_scopes.remove(&worklet_id);
740 },
741 WorkletControl::FetchAndInvokeAWorkletScript {
742 webview_id,
743 pipeline_id,
744 worklet_id,
745 global_type,
746 origin,
747 base_url,
748 script_url,
749 policy_container,
750 credentials,
751 pending_tasks_struct,
752 promise,
753 } => {
754 let global = self.get_worklet_global_scope(
755 webview_id,
756 pipeline_id,
757 worklet_id,
758 global_type,
759 base_url,
760 );
761 self.fetch_and_invoke_a_worklet_script(
762 &global,
763 pipeline_id,
764 origin,
765 script_url,
766 policy_container,
767 credentials,
768 pending_tasks_struct,
769 promise,
770 can_gc,
771 )
772 },
773 }
774 }
775
776 fn run_in_script_thread<T>(&self, task: T)
778 where
779 T: TaskBox + 'static,
780 {
781 let msg = CommonScriptMsg::Task(
784 ScriptThreadEventCategory::WorkletEvent,
785 Box::new(task),
786 None,
787 TaskSourceName::DOMManipulation,
788 );
789 let msg = MainThreadScriptMsg::Common(msg);
790 self.global_init
791 .to_script_thread_sender
792 .send(msg)
793 .expect("Worklet thread outlived script thread.");
794 }
795}
796
/// Handle for scheduling tasks for one worklet on the worklet thread that
/// holds the primary role.
#[derive(Clone, JSTraceable, MallocSizeOf)]
pub(crate) struct WorkletExecutor {
    /// The worklet whose global scope the scheduled tasks run in.
    worklet_id: WorkletId,
    /// Data channel of the primary role.
    #[no_trace]
    primary_sender: Sender<WorkletData>,
}
804
805impl WorkletExecutor {
806 fn new(worklet_id: WorkletId, primary_sender: Sender<WorkletData>) -> WorkletExecutor {
807 WorkletExecutor {
808 worklet_id,
809 primary_sender,
810 }
811 }
812
813 pub(crate) fn schedule_a_worklet_task(&self, task: WorkletTask) {
815 let _ = self
816 .primary_sender
817 .send(WorkletData::Task(self.worklet_id, task));
818 }
819}