1use std::cell::OnceCell;
14use std::cmp::max;
15use std::collections::hash_map;
16use std::rc::Rc;
17use std::sync::Arc;
18use std::sync::atomic::{AtomicIsize, Ordering};
19use std::thread;
20
21use base::id::PipelineId;
22use crossbeam_channel::{Receiver, Sender, unbounded};
23use dom_struct::dom_struct;
24use js::jsapi::{GCReason, JS_GC, JS_GetGCParameter, JSGCParamKey, JSTracer};
25use malloc_size_of::malloc_size_of_is_0;
26use net_traits::IpcSend;
27use net_traits::request::{Destination, RequestBuilder, RequestMode};
28use rustc_hash::FxHashMap;
29use servo_url::{ImmutableOrigin, ServoUrl};
30use style::thread_state::{self, ThreadState};
31use swapper::{Swapper, swapper};
32use uuid::Uuid;
33
34use crate::conversions::Convert;
35use crate::dom::bindings::codegen::Bindings::RequestBinding::RequestCredentials;
36use crate::dom::bindings::codegen::Bindings::WindowBinding::Window_Binding::WindowMethods;
37use crate::dom::bindings::codegen::Bindings::WorkletBinding::{WorkletMethods, WorkletOptions};
38use crate::dom::bindings::error::Error;
39use crate::dom::bindings::inheritance::Castable;
40use crate::dom::bindings::refcounted::TrustedPromise;
41use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
42use crate::dom::bindings::root::{Dom, DomRoot};
43use crate::dom::bindings::str::USVString;
44use crate::dom::bindings::trace::{CustomTraceable, JSTraceable, RootedTraceableBox};
45use crate::dom::csp::Violation;
46use crate::dom::globalscope::GlobalScope;
47use crate::dom::promise::Promise;
48#[cfg(feature = "testbinding")]
49use crate::dom::testworkletglobalscope::TestWorkletTask;
50use crate::dom::window::Window;
51use crate::dom::workletglobalscope::{
52 WorkletGlobalScope, WorkletGlobalScopeInit, WorkletGlobalScopeType, WorkletTask,
53};
54use crate::fetch::{CspViolationsProcessor, load_whole_resource};
55use crate::messaging::{CommonScriptMsg, MainThreadScriptMsg};
56use crate::realms::InRealm;
57use crate::script_runtime::{CanGc, Runtime, ScriptThreadEventCategory};
58use crate::script_thread::ScriptThread;
59use crate::task::TaskBox;
60use crate::task_source::TaskSourceName;
61
/// Initial value of every `PendingTasksStruct` counter. It matches the three
/// threads that `WorkletThreadPool::spawn` starts, so one successful
/// decrement per thread brings the counter to zero.
const WORKLET_THREAD_POOL_SIZE: u32 = 3;
/// Lower bound for a worklet thread's GC threshold, compared against the
/// `JSGC_BYTES` reading (presumably a byte count — confirm against the
/// SpiderMonkey documentation).
const MIN_GC_THRESHOLD: u32 = 1_000_000;
65
/// State of a `Worklet` that is needed when the worklet is dropped: its
/// `Drop` impl tells the thread pool (if any) that the worklet has exited.
#[derive(JSTraceable, MallocSizeOf)]
struct DroppableField {
    /// Identifier of the owning worklet; sent to the worklet threads on drop.
    worklet_id: WorkletId,
    /// Thread pool used by this worklet. Empty until the first `AddModule`
    /// call, which fills it from `ScriptThread::worklet_thread_pool`.
    #[ignore_malloc_size_of = "Difficult to measure memory usage of Rc<...> types"]
    thread_pool: OnceCell<Rc<WorkletThreadPool>>,
}
74
75impl Drop for DroppableField {
76 fn drop(&mut self) {
77 let worklet_id = self.worklet_id;
78 if let Some(thread_pool) = self.thread_pool.get_mut() {
79 thread_pool.exit_worklet(worklet_id);
80 }
81 }
82}
83
#[dom_struct]
/// DOM object for a worklet; `AddModule` is provided by its `WorkletMethods`
/// implementation.
pub(crate) struct Worklet {
    /// DOM reflector for this object.
    reflector: Reflector,
    /// The window this worklet was created for; supplies the base URL,
    /// origin and pipeline id used when adding modules.
    window: Dom<Window>,
    /// Kind of worklet global scope that the worklet threads create for
    /// this worklet.
    global_type: WorkletGlobalScopeType,
    /// Worklet id and lazily initialised thread pool, kept in a separate
    /// struct that implements `Drop`.
    droppable_field: DroppableField,
}
92
93impl Worklet {
94 fn new_inherited(window: &Window, global_type: WorkletGlobalScopeType) -> Worklet {
95 Worklet {
96 reflector: Reflector::new(),
97 window: Dom::from_ref(window),
98 global_type,
99 droppable_field: DroppableField {
100 worklet_id: WorkletId::new(),
101 thread_pool: OnceCell::new(),
102 },
103 }
104 }
105
106 pub(crate) fn new(
107 window: &Window,
108 global_type: WorkletGlobalScopeType,
109 can_gc: CanGc,
110 ) -> DomRoot<Worklet> {
111 debug!("Creating worklet {:?}.", global_type);
112 reflect_dom_object(
113 Box::new(Worklet::new_inherited(window, global_type)),
114 window,
115 can_gc,
116 )
117 }
118
119 pub(crate) fn worklet_id(&self) -> WorkletId {
120 self.droppable_field.worklet_id
121 }
122
123 #[allow(dead_code)]
124 pub(crate) fn worklet_global_scope_type(&self) -> WorkletGlobalScopeType {
125 self.global_type
126 }
127}
128
129impl WorkletMethods<crate::DomTypeHolder> for Worklet {
130 fn AddModule(
132 &self,
133 module_url: USVString,
134 options: &WorkletOptions,
135 comp: InRealm,
136 can_gc: CanGc,
137 ) -> Rc<Promise> {
138 let promise = Promise::new_in_current_realm(comp, can_gc);
140
141 let module_url_record = match self.window.Document().base_url().join(&module_url.0) {
143 Ok(url) => url,
144 Err(err) => {
145 debug!("URL {:?} parse error {:?}.", module_url.0, err);
147 promise.reject_error(Error::Syntax(None), can_gc);
148 return promise;
149 },
150 };
151 debug!("Adding Worklet module {}.", module_url_record);
152
153 let pending_tasks_struct = PendingTasksStruct::new();
155
156 self.droppable_field
157 .thread_pool
158 .get_or_init(|| ScriptThread::worklet_thread_pool(self.global().image_cache()))
159 .fetch_and_invoke_a_worklet_script(
160 self.window.pipeline_id(),
161 self.droppable_field.worklet_id,
162 self.global_type,
163 self.window.origin().immutable().clone(),
164 self.window.as_global_scope().api_base_url(),
165 module_url_record,
166 options.credentials,
167 pending_tasks_struct,
168 &promise,
169 );
170
171 debug!("Returning promise.");
173 promise
174 }
175}
176
/// Identifier for a worklet, wrapping a `Uuid`. Worklet threads use it as
/// the key of their map of worklet global scopes.
#[derive(Clone, Copy, Debug, Eq, Hash, JSTraceable, PartialEq)]
pub(crate) struct WorkletId(#[no_trace] Uuid);

// Presumably registers `WorkletId` as owning no heap memory for the
// `MallocSizeOf` machinery — confirm against the macro definition.
malloc_size_of_is_0!(WorkletId);
182
183impl WorkletId {
184 fn new() -> WorkletId {
185 WorkletId(servo_rand::random_uuid())
186 }
187}
188
189#[derive(Clone, Debug)]
191struct PendingTasksStruct(Arc<AtomicIsize>);
192
193impl PendingTasksStruct {
194 fn new() -> PendingTasksStruct {
195 PendingTasksStruct(Arc::new(AtomicIsize::new(
196 WORKLET_THREAD_POOL_SIZE as isize,
197 )))
198 }
199
200 fn set_counter_to(&self, value: isize) -> isize {
201 self.0.swap(value, Ordering::AcqRel)
202 }
203
204 fn decrement_counter_by(&self, offset: isize) -> isize {
205 self.0.fetch_sub(offset, Ordering::AcqRel)
206 }
207}
208
/// Handle to a pool of three worklet threads. Each thread starts out in one
/// role (primary, hot backup or cold backup) and has its own control channel.
///
/// NOTE(review): this type derives `Clone` and its `Drop` impl sends `Quit`
/// on all three data channels, so dropping any clone would presumably stop
/// the threads for every other clone — confirm that clones are never dropped
/// early.
#[derive(Clone, JSTraceable)]
pub(crate) struct WorkletThreadPool {
    /// Data channel of whichever thread currently holds the primary role.
    #[no_trace]
    primary_sender: Sender<WorkletData>,
    /// Data channel of whichever thread currently holds the hot backup role.
    #[no_trace]
    hot_backup_sender: Sender<WorkletData>,
    /// Data channel of whichever thread currently holds the cold backup role.
    #[no_trace]
    cold_backup_sender: Sender<WorkletData>,
    /// Control channel of the thread spawned with index 0.
    #[no_trace]
    control_sender_0: Sender<WorkletControl>,
    /// Control channel of the thread spawned with index 1.
    #[no_trace]
    control_sender_1: Sender<WorkletControl>,
    /// Control channel of the thread spawned with index 2.
    #[no_trace]
    control_sender_2: Sender<WorkletControl>,
}
275
276impl Drop for WorkletThreadPool {
277 fn drop(&mut self) {
278 let _ = self.cold_backup_sender.send(WorkletData::Quit);
279 let _ = self.hot_backup_sender.send(WorkletData::Quit);
280 let _ = self.primary_sender.send(WorkletData::Quit);
281 }
282}
283
284impl WorkletThreadPool {
285 pub(crate) fn spawn(global_init: WorkletGlobalScopeInit) -> WorkletThreadPool {
288 let primary_role = WorkletThreadRole::new(false, false);
289 let hot_backup_role = WorkletThreadRole::new(true, false);
290 let cold_backup_role = WorkletThreadRole::new(false, true);
291 let primary_sender = primary_role.sender.clone();
292 let hot_backup_sender = hot_backup_role.sender.clone();
293 let cold_backup_sender = cold_backup_role.sender.clone();
294 let init = WorkletThreadInit {
295 primary_sender: primary_sender.clone(),
296 hot_backup_sender: hot_backup_sender.clone(),
297 cold_backup_sender: cold_backup_sender.clone(),
298 global_init,
299 };
300 WorkletThreadPool {
301 primary_sender,
302 hot_backup_sender,
303 cold_backup_sender,
304 control_sender_0: WorkletThread::spawn(primary_role, init.clone(), 0),
305 control_sender_1: WorkletThread::spawn(hot_backup_role, init.clone(), 1),
306 control_sender_2: WorkletThread::spawn(cold_backup_role, init, 2),
307 }
308 }
309
310 #[allow(clippy::too_many_arguments)]
315 fn fetch_and_invoke_a_worklet_script(
316 &self,
317 pipeline_id: PipelineId,
318 worklet_id: WorkletId,
319 global_type: WorkletGlobalScopeType,
320 origin: ImmutableOrigin,
321 base_url: ServoUrl,
322 script_url: ServoUrl,
323 credentials: RequestCredentials,
324 pending_tasks_struct: PendingTasksStruct,
325 promise: &Rc<Promise>,
326 ) {
327 for sender in &[
329 &self.control_sender_0,
330 &self.control_sender_1,
331 &self.control_sender_2,
332 ] {
333 let _ = sender.send(WorkletControl::FetchAndInvokeAWorkletScript {
334 pipeline_id,
335 worklet_id,
336 global_type,
337 origin: origin.clone(),
338 base_url: base_url.clone(),
339 script_url: script_url.clone(),
340 credentials,
341 pending_tasks_struct: pending_tasks_struct.clone(),
342 promise: TrustedPromise::new(promise.clone()),
343 });
344 }
345 self.wake_threads();
346 }
347
348 pub(crate) fn exit_worklet(&self, worklet_id: WorkletId) {
349 for sender in &[
350 &self.control_sender_0,
351 &self.control_sender_1,
352 &self.control_sender_2,
353 ] {
354 let _ = sender.send(WorkletControl::ExitWorklet(worklet_id));
355 }
356 self.wake_threads();
357 }
358
359 #[cfg(feature = "testbinding")]
361 pub(crate) fn test_worklet_lookup(&self, id: WorkletId, key: String) -> Option<String> {
362 let (sender, receiver) = unbounded();
363 let msg = WorkletData::Task(id, WorkletTask::Test(TestWorkletTask::Lookup(key, sender)));
364 let _ = self.primary_sender.send(msg);
365 receiver.recv().expect("Test worklet has died?")
366 }
367
368 fn wake_threads(&self) {
369 let _ = self.cold_backup_sender.send(WorkletData::WakeUp);
371 let _ = self.hot_backup_sender.send(WorkletData::WakeUp);
372 let _ = self.primary_sender.send(WorkletData::WakeUp);
373 }
374}
375
/// Messages delivered on a role's data channel and handled at the top of
/// the worklet thread loop.
enum WorkletData {
    /// Run the task against the global scope of the given worklet.
    Task(WorkletId, WorkletTask),
    /// Request to swap roles: the receiver sends a `FinishSwapRoles` reply
    /// through the enclosed sender and then swaps its own role.
    StartSwapRoles(Sender<WorkletData>),
    /// Second half of a role swap; carries the other end of the swapper.
    FinishSwapRoles(Swapper<WorkletThreadRole>),
    /// No work of its own; makes the thread run one loop iteration so it
    /// checks control messages and memory usage.
    WakeUp,
    /// Makes the thread leave its loop.
    Quit,
}
384
/// Messages delivered on a thread's control channel. They are only
/// processed while the thread holds the cold backup role.
enum WorkletControl {
    /// Forget the global scope of the given worklet.
    ExitWorklet(WorkletId),
    /// Fetch a module script and evaluate it in the worklet's global scope,
    /// creating that scope first if this thread does not have one yet.
    FetchAndInvokeAWorkletScript {
        /// Pipeline of the window that owns the worklet.
        pipeline_id: PipelineId,
        /// Worklet whose global scope the script is evaluated in.
        worklet_id: WorkletId,
        /// Kind of global scope to create if none exists yet.
        global_type: WorkletGlobalScopeType,
        /// Origin set on the fetch request.
        origin: ImmutableOrigin,
        /// Base URL given to a newly created global scope.
        base_url: ServoUrl,
        /// URL of the script to fetch.
        script_url: ServoUrl,
        /// Credentials mode for the fetch request.
        credentials: RequestCredentials,
        /// Counter shared between the threads handling the same request.
        pending_tasks_struct: PendingTasksStruct,
        /// Promise to resolve or reject on the script thread.
        promise: TrustedPromise,
    },
}
400
/// A role held by exactly one worklet thread at a time. Threads exchange
/// roles through a `Swapper`, which moves the whole struct (both channel
/// ends and the flags) from one thread to another.
struct WorkletThreadRole {
    /// Receiving end of this role's data channel.
    receiver: Receiver<WorkletData>,
    /// Sending end of this role's data channel; handed to other threads in
    /// `StartSwapRoles` so they can reply.
    sender: Sender<WorkletData>,
    /// True for the hot backup role.
    is_hot_backup: bool,
    /// True for the cold backup role. The primary role has both flags false.
    is_cold_backup: bool,
}
413
414impl WorkletThreadRole {
415 fn new(is_hot_backup: bool, is_cold_backup: bool) -> WorkletThreadRole {
416 let (sender, receiver) = unbounded();
417 WorkletThreadRole {
418 sender,
419 receiver,
420 is_hot_backup,
421 is_cold_backup,
422 }
423 }
424}
425
/// Everything a worklet thread is given at start-up besides its role.
#[derive(Clone)]
struct WorkletThreadInit {
    /// Sender for the primary role's data channel.
    primary_sender: Sender<WorkletData>,
    /// Sender for the hot backup role's data channel.
    hot_backup_sender: Sender<WorkletData>,
    /// Sender for the cold backup role's data channel.
    cold_backup_sender: Sender<WorkletData>,

    /// Data used when creating worklet global scopes; also provides the
    /// resource threads and the sender back to the script thread.
    global_init: WorkletGlobalScopeInit,
}
437
/// CSP violation processor passed to `load_whole_resource` for worklet
/// script fetches.
struct WorkletCspProcessor {}

impl CspViolationsProcessor for WorkletCspProcessor {
    /// Discards the reported violations; nothing is done with them.
    fn process_csp_violations(&self, _violations: Vec<Violation>) {}
}
443
/// Per-thread state of a worklet thread.
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct WorkletThread {
    /// The role this thread currently holds; replaced when roles are swapped.
    role: WorkletThreadRole,

    /// Receiving end of this thread's control channel.
    control_receiver: Receiver<WorkletControl>,

    /// Sender for the primary role's data channel.
    primary_sender: Sender<WorkletData>,
    /// Sender for the hot backup role's data channel.
    hot_backup_sender: Sender<WorkletData>,
    /// Sender for the cold backup role's data channel.
    cold_backup_sender: Sender<WorkletData>,

    /// Data used when creating worklet global scopes.
    global_init: WorkletGlobalScopeInit,

    /// Global scopes created by this thread, keyed by worklet id.
    global_scopes: FxHashMap<WorkletId, Dom<WorkletGlobalScope>>,

    /// Holds at most one control message, taken off the control channel
    /// while this thread was not the cold backup and processed once it is.
    control_buffer: Option<WorkletControl>,

    /// The JS runtime owned by this thread.
    runtime: Runtime,
    /// Set once the primary has asked the hot backup to swap roles because
    /// memory usage exceeded the threshold; cleared when a backup collects.
    should_gc: bool,
    /// Memory usage above which this thread wants a collection; recomputed
    /// after every GC and never below `MIN_GC_THRESHOLD`.
    gc_threshold: u32,
}
472
// Hand-written tracing for the worklet thread state.
// NOTE(review): only `global_scopes` is visited; the remaining fields are
// presumably free of GC-managed references — confirm.
#[allow(unsafe_code)]
unsafe impl JSTraceable for WorkletThread {
    /// Traces every worklet global scope held by this thread.
    unsafe fn trace(&self, trc: *mut JSTracer) {
        debug!("Tracing worklet thread.");
        self.global_scopes.trace(trc);
    }
}
480
481impl WorkletThread {
482 #[allow(unsafe_code)]
484 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
485 fn spawn(
486 role: WorkletThreadRole,
487 init: WorkletThreadInit,
488 thread_index: u8,
489 ) -> Sender<WorkletControl> {
490 let (control_sender, control_receiver) = unbounded();
491 let _ = thread::Builder::new()
492 .name(format!("Worklet#{thread_index}"))
493 .spawn(move || {
494 debug!("Initializing worklet thread.");
498 thread_state::initialize(ThreadState::SCRIPT | ThreadState::IN_WORKER);
499 let mut thread = RootedTraceableBox::new(WorkletThread {
500 role,
501 control_receiver,
502 primary_sender: init.primary_sender,
503 hot_backup_sender: init.hot_backup_sender,
504 cold_backup_sender: init.cold_backup_sender,
505 global_init: init.global_init,
506 global_scopes: FxHashMap::default(),
507 control_buffer: None,
508 runtime: Runtime::new(None),
509 should_gc: false,
510 gc_threshold: MIN_GC_THRESHOLD,
511 });
512 thread.run(CanGc::note());
513 })
514 .expect("Couldn't start worklet thread");
515 control_sender
516 }
517
518 fn run(&mut self, can_gc: CanGc) {
520 loop {
521 let message = self.role.receiver.recv().unwrap();
523 match message {
524 WorkletData::Task(id, task) => {
526 self.perform_a_worklet_task(id, task);
527 },
528 WorkletData::StartSwapRoles(sender) => {
535 let (our_swapper, their_swapper) = swapper();
536 match sender.send(WorkletData::FinishSwapRoles(their_swapper)) {
537 Ok(_) => {},
538 Err(_) => {
539 return;
542 },
543 };
544 let _ = our_swapper.swap(&mut self.role);
545 },
546 WorkletData::FinishSwapRoles(swapper) => {
549 let _ = swapper.swap(&mut self.role);
550 },
551 WorkletData::WakeUp => {},
553 WorkletData::Quit => {
555 return;
556 },
557 }
558 if self.role.is_cold_backup {
562 if let Some(control) = self.control_buffer.take() {
563 self.process_control(control, can_gc);
564 }
565 while let Ok(control) = self.control_receiver.try_recv() {
566 self.process_control(control, can_gc);
567 }
568 self.gc();
569 } else if self.control_buffer.is_none() {
570 if let Ok(control) = self.control_receiver.try_recv() {
571 self.control_buffer = Some(control);
572 let msg = WorkletData::StartSwapRoles(self.role.sender.clone());
573 let _ = self.cold_backup_sender.send(msg);
574 }
575 }
576 if self.current_memory_usage() > self.gc_threshold {
580 if self.role.is_hot_backup || self.role.is_cold_backup {
581 self.should_gc = false;
582 self.gc();
583 } else if !self.should_gc {
584 self.should_gc = true;
585 let msg = WorkletData::StartSwapRoles(self.role.sender.clone());
586 let _ = self.hot_backup_sender.send(msg);
587 }
588 }
589 }
590 }
591
592 #[allow(unsafe_code)]
594 fn current_memory_usage(&self) -> u32 {
595 unsafe { JS_GetGCParameter(self.runtime.cx(), JSGCParamKey::JSGC_BYTES) }
596 }
597
598 #[allow(unsafe_code)]
600 fn gc(&mut self) {
601 debug!(
602 "BEGIN GC (usage = {}, threshold = {}).",
603 self.current_memory_usage(),
604 self.gc_threshold
605 );
606 unsafe { JS_GC(self.runtime.cx(), GCReason::API) };
607 self.gc_threshold = max(MIN_GC_THRESHOLD, self.current_memory_usage() * 2);
608 debug!(
609 "END GC (usage = {}, threshold = {}).",
610 self.current_memory_usage(),
611 self.gc_threshold
612 );
613 }
614
615 fn get_worklet_global_scope(
618 &mut self,
619 pipeline_id: PipelineId,
620 worklet_id: WorkletId,
621 global_type: WorkletGlobalScopeType,
622 base_url: ServoUrl,
623 ) -> DomRoot<WorkletGlobalScope> {
624 match self.global_scopes.entry(worklet_id) {
625 hash_map::Entry::Occupied(entry) => DomRoot::from_ref(entry.get()),
626 hash_map::Entry::Vacant(entry) => {
627 debug!("Creating new worklet global scope.");
628 let executor = WorkletExecutor::new(worklet_id, self.primary_sender.clone());
629 let result = WorkletGlobalScope::new(
630 global_type,
631 pipeline_id,
632 base_url,
633 executor,
634 &self.global_init,
635 );
636 entry.insert(Dom::from_ref(&*result));
637 result
638 },
639 }
640 }
641
642 #[allow(clippy::too_many_arguments)]
645 fn fetch_and_invoke_a_worklet_script(
646 &self,
647 global_scope: &WorkletGlobalScope,
648 pipeline_id: PipelineId,
649 origin: ImmutableOrigin,
650 script_url: ServoUrl,
651 credentials: RequestCredentials,
652 pending_tasks_struct: PendingTasksStruct,
653 promise: TrustedPromise,
654 can_gc: CanGc,
655 ) {
656 debug!("Fetching from {}.", script_url);
657 let resource_fetcher = self.global_init.resource_threads.sender();
665 let request = RequestBuilder::new(
666 None,
667 script_url,
668 global_scope.upcast::<GlobalScope>().get_referrer(),
669 )
670 .destination(Destination::Script)
671 .mode(RequestMode::CorsMode)
672 .credentials_mode(credentials.convert())
673 .origin(origin);
674
675 let script = load_whole_resource(
676 request,
677 &resource_fetcher,
678 global_scope.upcast::<GlobalScope>(),
679 &WorkletCspProcessor {},
680 can_gc,
681 )
682 .ok()
683 .and_then(|(_, bytes)| String::from_utf8(bytes).ok());
684
685 let ok = script
692 .map(|s| global_scope.evaluate_js(&s, can_gc).is_ok())
693 .unwrap_or(false);
694
695 if !ok {
696 debug!("Failed to load script.");
698 let old_counter = pending_tasks_struct.set_counter_to(-1);
699 if old_counter > 0 {
700 self.run_in_script_thread(promise.reject_task(Error::Abort));
701 }
702 } else {
703 debug!("Finished adding script.");
705 let old_counter = pending_tasks_struct.decrement_counter_by(1);
706 if old_counter == 1 {
707 debug!("Resolving promise.");
708 let msg = MainThreadScriptMsg::WorkletLoaded(pipeline_id);
709 self.global_init
710 .to_script_thread_sender
711 .send(msg)
712 .expect("Worklet thread outlived script thread.");
713 self.run_in_script_thread(promise.resolve_task(()));
714 }
715 }
716 }
717
718 fn perform_a_worklet_task(&self, worklet_id: WorkletId, task: WorkletTask) {
720 match self.global_scopes.get(&worklet_id) {
721 Some(global) => global.perform_a_worklet_task(task),
722 None => warn!("No such worklet as {:?}.", worklet_id),
723 }
724 }
725
726 fn process_control(&mut self, control: WorkletControl, can_gc: CanGc) {
728 match control {
729 WorkletControl::ExitWorklet(worklet_id) => {
730 self.global_scopes.remove(&worklet_id);
731 },
732 WorkletControl::FetchAndInvokeAWorkletScript {
733 pipeline_id,
734 worklet_id,
735 global_type,
736 origin,
737 base_url,
738 script_url,
739 credentials,
740 pending_tasks_struct,
741 promise,
742 } => {
743 let global =
744 self.get_worklet_global_scope(pipeline_id, worklet_id, global_type, base_url);
745 self.fetch_and_invoke_a_worklet_script(
746 &global,
747 pipeline_id,
748 origin,
749 script_url,
750 credentials,
751 pending_tasks_struct,
752 promise,
753 can_gc,
754 )
755 },
756 }
757 }
758
759 fn run_in_script_thread<T>(&self, task: T)
761 where
762 T: TaskBox + 'static,
763 {
764 let msg = CommonScriptMsg::Task(
767 ScriptThreadEventCategory::WorkletEvent,
768 Box::new(task),
769 None,
770 TaskSourceName::DOMManipulation,
771 );
772 let msg = MainThreadScriptMsg::Common(msg);
773 self.global_init
774 .to_script_thread_sender
775 .send(msg)
776 .expect("Worklet thread outlived script thread.");
777 }
778}
779
/// Handle for scheduling tasks for one worklet; each task is sent to the
/// primary role's data channel tagged with the worklet id.
#[derive(Clone, JSTraceable, MallocSizeOf)]
pub(crate) struct WorkletExecutor {
    /// Worklet that scheduled tasks are tagged with.
    worklet_id: WorkletId,
    /// Sender for the primary role's data channel.
    #[no_trace]
    primary_sender: Sender<WorkletData>,
}
787
788impl WorkletExecutor {
789 fn new(worklet_id: WorkletId, primary_sender: Sender<WorkletData>) -> WorkletExecutor {
790 WorkletExecutor {
791 worklet_id,
792 primary_sender,
793 }
794 }
795
796 pub(crate) fn schedule_a_worklet_task(&self, task: WorkletTask) {
798 let _ = self
799 .primary_sender
800 .send(WorkletData::Task(self.worklet_id, task));
801 }
802}