1use std::cell::OnceCell;
14use std::cmp::max;
15use std::collections::{HashMap, hash_map};
16use std::rc::Rc;
17use std::sync::Arc;
18use std::sync::atomic::{AtomicIsize, Ordering};
19use std::thread;
20
21use base::id::PipelineId;
22use crossbeam_channel::{Receiver, Sender, unbounded};
23use dom_struct::dom_struct;
24use js::jsapi::{GCReason, JS_GC, JS_GetGCParameter, JSGCParamKey, JSTracer};
25use malloc_size_of::malloc_size_of_is_0;
26use net_traits::IpcSend;
27use net_traits::request::{Destination, RequestBuilder, RequestMode};
28use servo_url::{ImmutableOrigin, ServoUrl};
29use style::thread_state::{self, ThreadState};
30use swapper::{Swapper, swapper};
31use uuid::Uuid;
32
33use crate::conversions::Convert;
34use crate::dom::bindings::codegen::Bindings::RequestBinding::RequestCredentials;
35use crate::dom::bindings::codegen::Bindings::WindowBinding::Window_Binding::WindowMethods;
36use crate::dom::bindings::codegen::Bindings::WorkletBinding::{WorkletMethods, WorkletOptions};
37use crate::dom::bindings::error::Error;
38use crate::dom::bindings::inheritance::Castable;
39use crate::dom::bindings::refcounted::TrustedPromise;
40use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
41use crate::dom::bindings::root::{Dom, DomRoot};
42use crate::dom::bindings::str::USVString;
43use crate::dom::bindings::trace::{CustomTraceable, JSTraceable, RootedTraceableBox};
44use crate::dom::csp::Violation;
45use crate::dom::globalscope::GlobalScope;
46use crate::dom::promise::Promise;
47#[cfg(feature = "testbinding")]
48use crate::dom::testworkletglobalscope::TestWorkletTask;
49use crate::dom::window::Window;
50use crate::dom::workletglobalscope::{
51 WorkletGlobalScope, WorkletGlobalScopeInit, WorkletGlobalScopeType, WorkletTask,
52};
53use crate::fetch::{CspViolationsProcessor, load_whole_resource};
54use crate::messaging::{CommonScriptMsg, MainThreadScriptMsg};
55use crate::realms::InRealm;
56use crate::script_runtime::{CanGc, Runtime, ScriptThreadEventCategory};
57use crate::script_thread::ScriptThread;
58use crate::task::TaskBox;
59use crate::task_source::TaskSourceName;
60
/// Size of the worklet thread pool. It is used as the initial value of every
/// `PendingTasksStruct` counter, and matches the three threads started by
/// `WorkletThreadPool::spawn`.
const WORKLET_THREAD_POOL_SIZE: u32 = 3;
/// Initial value of, and lower bound for, a worklet thread's GC threshold.
/// It is compared against the `JSGC_BYTES` reading of the thread's runtime.
const MIN_GC_THRESHOLD: u32 = 1_000_000;
64
/// The part of a `Worklet` that needs clean-up on drop: its `Drop`
/// implementation tells the thread pool (if one was ever initialized) that the
/// worklet identified by `worklet_id` has exited.
#[derive(JSTraceable, MallocSizeOf)]
struct DroppableField {
    /// Identifier of the owning worklet.
    worklet_id: WorkletId,
    /// Handle to the worklet thread pool, initialized lazily by the first
    /// `AddModule` call.
    #[ignore_malloc_size_of = "Difficult to measure memory usage of Rc<...> types"]
    thread_pool: OnceCell<Rc<WorkletThreadPool>>,
}
73
74impl Drop for DroppableField {
75 fn drop(&mut self) {
76 let worklet_id = self.worklet_id;
77 if let Some(thread_pool) = self.thread_pool.get_mut() {
78 thread_pool.exit_worklet(worklet_id);
79 }
80 }
81}
82
/// DOM object representing a worklet of one global-scope type, attached to a
/// window.
#[dom_struct]
pub(crate) struct Worklet {
    /// DOM reflector for this object.
    reflector: Reflector,
    /// The window this worklet was created for.
    window: Dom<Window>,
    /// Which kind of worklet global scope this worklet uses.
    global_type: WorkletGlobalScopeType,
    /// Worklet id and lazily created thread-pool handle; kept in a separate
    /// struct that implements `Drop`.
    droppable_field: DroppableField,
}
91
92impl Worklet {
93 fn new_inherited(window: &Window, global_type: WorkletGlobalScopeType) -> Worklet {
94 Worklet {
95 reflector: Reflector::new(),
96 window: Dom::from_ref(window),
97 global_type,
98 droppable_field: DroppableField {
99 worklet_id: WorkletId::new(),
100 thread_pool: OnceCell::new(),
101 },
102 }
103 }
104
105 pub(crate) fn new(
106 window: &Window,
107 global_type: WorkletGlobalScopeType,
108 can_gc: CanGc,
109 ) -> DomRoot<Worklet> {
110 debug!("Creating worklet {:?}.", global_type);
111 reflect_dom_object(
112 Box::new(Worklet::new_inherited(window, global_type)),
113 window,
114 can_gc,
115 )
116 }
117
118 pub(crate) fn worklet_id(&self) -> WorkletId {
119 self.droppable_field.worklet_id
120 }
121
122 #[allow(dead_code)]
123 pub(crate) fn worklet_global_scope_type(&self) -> WorkletGlobalScopeType {
124 self.global_type
125 }
126}
127
128impl WorkletMethods<crate::DomTypeHolder> for Worklet {
129 fn AddModule(
131 &self,
132 module_url: USVString,
133 options: &WorkletOptions,
134 comp: InRealm,
135 can_gc: CanGc,
136 ) -> Rc<Promise> {
137 let promise = Promise::new_in_current_realm(comp, can_gc);
139
140 let module_url_record = match self.window.Document().base_url().join(&module_url.0) {
142 Ok(url) => url,
143 Err(err) => {
144 debug!("URL {:?} parse error {:?}.", module_url.0, err);
146 promise.reject_error(Error::Syntax, can_gc);
147 return promise;
148 },
149 };
150 debug!("Adding Worklet module {}.", module_url_record);
151
152 let pending_tasks_struct = PendingTasksStruct::new();
154
155 self.droppable_field
156 .thread_pool
157 .get_or_init(|| ScriptThread::worklet_thread_pool(self.global().image_cache()))
158 .fetch_and_invoke_a_worklet_script(
159 self.window.pipeline_id(),
160 self.droppable_field.worklet_id,
161 self.global_type,
162 self.window.origin().immutable().clone(),
163 self.window.as_global_scope().api_base_url(),
164 module_url_record,
165 options.credentials,
166 pending_tasks_struct,
167 &promise,
168 );
169
170 debug!("Returning promise.");
172 promise
173 }
174}
175
176#[derive(Clone, Copy, Debug, Eq, Hash, JSTraceable, PartialEq)]
178pub(crate) struct WorkletId(#[no_trace] Uuid);
179
180malloc_size_of_is_0!(WorkletId);
181
182impl WorkletId {
183 fn new() -> WorkletId {
184 WorkletId(servo_rand::random_uuid())
185 }
186}
187
188#[derive(Clone, Debug)]
190struct PendingTasksStruct(Arc<AtomicIsize>);
191
192impl PendingTasksStruct {
193 fn new() -> PendingTasksStruct {
194 PendingTasksStruct(Arc::new(AtomicIsize::new(
195 WORKLET_THREAD_POOL_SIZE as isize,
196 )))
197 }
198
199 fn set_counter_to(&self, value: isize) -> isize {
200 self.0.swap(value, Ordering::AcqRel)
201 }
202
203 fn decrement_counter_by(&self, offset: isize) -> isize {
204 self.0.fetch_sub(offset, Ordering::AcqRel)
205 }
206}
207
/// Handle to a pool of three worklet threads (primary, hot backup and cold
/// backup). It holds one data sender per role and one control sender per
/// thread.
#[derive(Clone, JSTraceable)]
pub(crate) struct WorkletThreadPool {
    /// Data channel of whichever thread currently holds the primary role.
    #[no_trace]
    primary_sender: Sender<WorkletData>,
    /// Data channel of whichever thread currently holds the hot-backup role.
    #[no_trace]
    hot_backup_sender: Sender<WorkletData>,
    /// Data channel of whichever thread currently holds the cold-backup role.
    #[no_trace]
    cold_backup_sender: Sender<WorkletData>,
    /// Control channel of the thread spawned with index 0.
    #[no_trace]
    control_sender_0: Sender<WorkletControl>,
    /// Control channel of the thread spawned with index 1.
    #[no_trace]
    control_sender_1: Sender<WorkletControl>,
    /// Control channel of the thread spawned with index 2.
    #[no_trace]
    control_sender_2: Sender<WorkletControl>,
}
274
275impl Drop for WorkletThreadPool {
276 fn drop(&mut self) {
277 let _ = self.cold_backup_sender.send(WorkletData::Quit);
278 let _ = self.hot_backup_sender.send(WorkletData::Quit);
279 let _ = self.primary_sender.send(WorkletData::Quit);
280 }
281}
282
283impl WorkletThreadPool {
284 pub(crate) fn spawn(global_init: WorkletGlobalScopeInit) -> WorkletThreadPool {
287 let primary_role = WorkletThreadRole::new(false, false);
288 let hot_backup_role = WorkletThreadRole::new(true, false);
289 let cold_backup_role = WorkletThreadRole::new(false, true);
290 let primary_sender = primary_role.sender.clone();
291 let hot_backup_sender = hot_backup_role.sender.clone();
292 let cold_backup_sender = cold_backup_role.sender.clone();
293 let init = WorkletThreadInit {
294 primary_sender: primary_sender.clone(),
295 hot_backup_sender: hot_backup_sender.clone(),
296 cold_backup_sender: cold_backup_sender.clone(),
297 global_init,
298 };
299 WorkletThreadPool {
300 primary_sender,
301 hot_backup_sender,
302 cold_backup_sender,
303 control_sender_0: WorkletThread::spawn(primary_role, init.clone(), 0),
304 control_sender_1: WorkletThread::spawn(hot_backup_role, init.clone(), 1),
305 control_sender_2: WorkletThread::spawn(cold_backup_role, init, 2),
306 }
307 }
308
309 #[allow(clippy::too_many_arguments)]
314 fn fetch_and_invoke_a_worklet_script(
315 &self,
316 pipeline_id: PipelineId,
317 worklet_id: WorkletId,
318 global_type: WorkletGlobalScopeType,
319 origin: ImmutableOrigin,
320 base_url: ServoUrl,
321 script_url: ServoUrl,
322 credentials: RequestCredentials,
323 pending_tasks_struct: PendingTasksStruct,
324 promise: &Rc<Promise>,
325 ) {
326 for sender in &[
328 &self.control_sender_0,
329 &self.control_sender_1,
330 &self.control_sender_2,
331 ] {
332 let _ = sender.send(WorkletControl::FetchAndInvokeAWorkletScript {
333 pipeline_id,
334 worklet_id,
335 global_type,
336 origin: origin.clone(),
337 base_url: base_url.clone(),
338 script_url: script_url.clone(),
339 credentials,
340 pending_tasks_struct: pending_tasks_struct.clone(),
341 promise: TrustedPromise::new(promise.clone()),
342 });
343 }
344 self.wake_threads();
345 }
346
347 pub(crate) fn exit_worklet(&self, worklet_id: WorkletId) {
348 for sender in &[
349 &self.control_sender_0,
350 &self.control_sender_1,
351 &self.control_sender_2,
352 ] {
353 let _ = sender.send(WorkletControl::ExitWorklet(worklet_id));
354 }
355 self.wake_threads();
356 }
357
358 #[cfg(feature = "testbinding")]
360 pub(crate) fn test_worklet_lookup(&self, id: WorkletId, key: String) -> Option<String> {
361 let (sender, receiver) = unbounded();
362 let msg = WorkletData::Task(id, WorkletTask::Test(TestWorkletTask::Lookup(key, sender)));
363 let _ = self.primary_sender.send(msg);
364 receiver.recv().expect("Test worklet has died?")
365 }
366
367 fn wake_threads(&self) {
368 let _ = self.cold_backup_sender.send(WorkletData::WakeUp);
370 let _ = self.hot_backup_sender.send(WorkletData::WakeUp);
371 let _ = self.primary_sender.send(WorkletData::WakeUp);
372 }
373}
374
/// Messages sent on a role's data channel and handled in
/// `WorkletThread::run`.
enum WorkletData {
    /// Perform the given task on the worklet with the given id.
    Task(WorkletId, WorkletTask),
    /// Ask the receiving thread to swap roles with the thread that owns the
    /// enclosed sender.
    StartSwapRoles(Sender<WorkletData>),
    /// Reply to `StartSwapRoles`, carrying the swapper half used to complete
    /// the role exchange.
    FinishSwapRoles(Swapper<WorkletThreadRole>),
    /// No work of its own; makes the thread run the control and GC checks
    /// that follow every received message.
    WakeUp,
    /// Leave the thread's run loop.
    Quit,
}
383
/// Messages sent on a thread's control channel. They are processed by
/// `WorkletThread::process_control`, which only runs while the thread holds
/// the cold-backup role.
enum WorkletControl {
    /// Drop the global scope stored for this worklet.
    ExitWorklet(WorkletId),
    /// Load and evaluate a script in the worklet's global scope (creating the
    /// scope if needed), settling `promise` according to
    /// `pending_tasks_struct`.
    FetchAndInvokeAWorkletScript {
        pipeline_id: PipelineId,
        worklet_id: WorkletId,
        global_type: WorkletGlobalScopeType,
        origin: ImmutableOrigin,
        base_url: ServoUrl,
        script_url: ServoUrl,
        credentials: RequestCredentials,
        pending_tasks_struct: PendingTasksStruct,
        promise: TrustedPromise,
    },
}
399
/// The role a worklet thread currently plays, together with that role's data
/// channel. Threads exchange whole `WorkletThreadRole` values when they swap
/// roles. A role with both flags `false` is the primary.
struct WorkletThreadRole {
    /// Receiving end of this role's data channel.
    receiver: Receiver<WorkletData>,
    /// Sending end of this role's data channel.
    sender: Sender<WorkletData>,
    /// Whether this is the hot-backup role.
    is_hot_backup: bool,
    /// Whether this is the cold-backup role.
    is_cold_backup: bool,
}
412
413impl WorkletThreadRole {
414 fn new(is_hot_backup: bool, is_cold_backup: bool) -> WorkletThreadRole {
415 let (sender, receiver) = unbounded();
416 WorkletThreadRole {
417 sender,
418 receiver,
419 is_hot_backup,
420 is_cold_backup,
421 }
422 }
423}
424
/// Everything a worklet thread needs at start-up, apart from its role and
/// control channel. It is cloned once per spawned thread.
#[derive(Clone)]
struct WorkletThreadInit {
    /// Data sender for the primary role.
    primary_sender: Sender<WorkletData>,
    /// Data sender for the hot-backup role.
    hot_backup_sender: Sender<WorkletData>,
    /// Data sender for the cold-backup role.
    cold_backup_sender: Sender<WorkletData>,

    /// Data used to initialize worklet global scopes.
    global_init: WorkletGlobalScopeInit,
}
436
/// CSP violations processor passed to `load_whole_resource` when loading
/// worklet scripts.
struct WorkletCspProcessor {}

impl CspViolationsProcessor for WorkletCspProcessor {
    /// Intentionally empty: reported violations are discarded.
    fn process_csp_violations(&self, _violations: Vec<Violation>) {}
}
442
/// State owned by one worklet thread.
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct WorkletThread {
    /// The role this thread currently plays, including that role's data
    /// channel.
    role: WorkletThreadRole,

    /// Receiving end of this thread's control channel.
    control_receiver: Receiver<WorkletControl>,

    /// Data sender for the primary role.
    primary_sender: Sender<WorkletData>,
    /// Data sender for the hot-backup role.
    hot_backup_sender: Sender<WorkletData>,
    /// Data sender for the cold-backup role.
    cold_backup_sender: Sender<WorkletData>,

    /// Data used to initialize worklet global scopes.
    global_init: WorkletGlobalScopeInit,

    /// Global scopes created so far on this thread, keyed by worklet id.
    global_scopes: HashMap<WorkletId, Dom<WorkletGlobalScope>>,

    /// A control message received while this thread was not the cold backup,
    /// held until the thread takes the cold-backup role.
    control_buffer: Option<WorkletControl>,

    /// The JS runtime of this thread.
    runtime: Runtime,
    /// Set once the primary has asked the hot backup to swap roles because of
    /// memory pressure; cleared when a backup performs the GC.
    should_gc: bool,
    /// Memory usage above which a GC (or a role swap) is triggered.
    gc_threshold: u32,
}
471
// Tracing is written by hand: of all the fields, only `global_scopes` is
// traced.
#[allow(unsafe_code)]
unsafe impl JSTraceable for WorkletThread {
    /// Traces the worklet global scopes held by this thread.
    unsafe fn trace(&self, trc: *mut JSTracer) {
        debug!("Tracing worklet thread.");
        self.global_scopes.trace(trc);
    }
}
479
480impl WorkletThread {
481 #[allow(unsafe_code)]
483 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
484 fn spawn(
485 role: WorkletThreadRole,
486 init: WorkletThreadInit,
487 thread_index: u8,
488 ) -> Sender<WorkletControl> {
489 let (control_sender, control_receiver) = unbounded();
490 let _ = thread::Builder::new()
491 .name(format!("Worklet#{thread_index}"))
492 .spawn(move || {
493 debug!("Initializing worklet thread.");
497 thread_state::initialize(ThreadState::SCRIPT | ThreadState::IN_WORKER);
498 let mut thread = RootedTraceableBox::new(WorkletThread {
499 role,
500 control_receiver,
501 primary_sender: init.primary_sender,
502 hot_backup_sender: init.hot_backup_sender,
503 cold_backup_sender: init.cold_backup_sender,
504 global_init: init.global_init,
505 global_scopes: HashMap::new(),
506 control_buffer: None,
507 runtime: Runtime::new(None),
508 should_gc: false,
509 gc_threshold: MIN_GC_THRESHOLD,
510 });
511 thread.run(CanGc::note());
512 })
513 .expect("Couldn't start worklet thread");
514 control_sender
515 }
516
517 fn run(&mut self, can_gc: CanGc) {
519 loop {
520 let message = self.role.receiver.recv().unwrap();
522 match message {
523 WorkletData::Task(id, task) => {
525 self.perform_a_worklet_task(id, task);
526 },
527 WorkletData::StartSwapRoles(sender) => {
534 let (our_swapper, their_swapper) = swapper();
535 match sender.send(WorkletData::FinishSwapRoles(their_swapper)) {
536 Ok(_) => {},
537 Err(_) => {
538 return;
541 },
542 };
543 let _ = our_swapper.swap(&mut self.role);
544 },
545 WorkletData::FinishSwapRoles(swapper) => {
548 let _ = swapper.swap(&mut self.role);
549 },
550 WorkletData::WakeUp => {},
552 WorkletData::Quit => {
554 return;
555 },
556 }
557 if self.role.is_cold_backup {
561 if let Some(control) = self.control_buffer.take() {
562 self.process_control(control, can_gc);
563 }
564 while let Ok(control) = self.control_receiver.try_recv() {
565 self.process_control(control, can_gc);
566 }
567 self.gc();
568 } else if self.control_buffer.is_none() {
569 if let Ok(control) = self.control_receiver.try_recv() {
570 self.control_buffer = Some(control);
571 let msg = WorkletData::StartSwapRoles(self.role.sender.clone());
572 let _ = self.cold_backup_sender.send(msg);
573 }
574 }
575 if self.current_memory_usage() > self.gc_threshold {
579 if self.role.is_hot_backup || self.role.is_cold_backup {
580 self.should_gc = false;
581 self.gc();
582 } else if !self.should_gc {
583 self.should_gc = true;
584 let msg = WorkletData::StartSwapRoles(self.role.sender.clone());
585 let _ = self.hot_backup_sender.send(msg);
586 }
587 }
588 }
589 }
590
591 #[allow(unsafe_code)]
593 fn current_memory_usage(&self) -> u32 {
594 unsafe { JS_GetGCParameter(self.runtime.cx(), JSGCParamKey::JSGC_BYTES) }
595 }
596
597 #[allow(unsafe_code)]
599 fn gc(&mut self) {
600 debug!(
601 "BEGIN GC (usage = {}, threshold = {}).",
602 self.current_memory_usage(),
603 self.gc_threshold
604 );
605 unsafe { JS_GC(self.runtime.cx(), GCReason::API) };
606 self.gc_threshold = max(MIN_GC_THRESHOLD, self.current_memory_usage() * 2);
607 debug!(
608 "END GC (usage = {}, threshold = {}).",
609 self.current_memory_usage(),
610 self.gc_threshold
611 );
612 }
613
614 fn get_worklet_global_scope(
617 &mut self,
618 pipeline_id: PipelineId,
619 worklet_id: WorkletId,
620 global_type: WorkletGlobalScopeType,
621 base_url: ServoUrl,
622 ) -> DomRoot<WorkletGlobalScope> {
623 match self.global_scopes.entry(worklet_id) {
624 hash_map::Entry::Occupied(entry) => DomRoot::from_ref(entry.get()),
625 hash_map::Entry::Vacant(entry) => {
626 debug!("Creating new worklet global scope.");
627 let executor = WorkletExecutor::new(worklet_id, self.primary_sender.clone());
628 let result = WorkletGlobalScope::new(
629 global_type,
630 pipeline_id,
631 base_url,
632 executor,
633 &self.global_init,
634 );
635 entry.insert(Dom::from_ref(&*result));
636 result
637 },
638 }
639 }
640
641 #[allow(clippy::too_many_arguments)]
644 fn fetch_and_invoke_a_worklet_script(
645 &self,
646 global_scope: &WorkletGlobalScope,
647 pipeline_id: PipelineId,
648 origin: ImmutableOrigin,
649 script_url: ServoUrl,
650 credentials: RequestCredentials,
651 pending_tasks_struct: PendingTasksStruct,
652 promise: TrustedPromise,
653 can_gc: CanGc,
654 ) {
655 debug!("Fetching from {}.", script_url);
656 let resource_fetcher = self.global_init.resource_threads.sender();
664 let request = RequestBuilder::new(
665 None,
666 script_url,
667 global_scope.upcast::<GlobalScope>().get_referrer(),
668 )
669 .destination(Destination::Script)
670 .mode(RequestMode::CorsMode)
671 .credentials_mode(credentials.convert())
672 .origin(origin);
673
674 let script = load_whole_resource(
675 request,
676 &resource_fetcher,
677 global_scope.upcast::<GlobalScope>(),
678 &WorkletCspProcessor {},
679 can_gc,
680 )
681 .ok()
682 .and_then(|(_, bytes)| String::from_utf8(bytes).ok());
683
684 let ok = script
691 .map(|s| global_scope.evaluate_js(&s, can_gc).is_ok())
692 .unwrap_or(false);
693
694 if !ok {
695 debug!("Failed to load script.");
697 let old_counter = pending_tasks_struct.set_counter_to(-1);
698 if old_counter > 0 {
699 self.run_in_script_thread(promise.reject_task(Error::Abort));
700 }
701 } else {
702 debug!("Finished adding script.");
704 let old_counter = pending_tasks_struct.decrement_counter_by(1);
705 if old_counter == 1 {
706 debug!("Resolving promise.");
707 let msg = MainThreadScriptMsg::WorkletLoaded(pipeline_id);
708 self.global_init
709 .to_script_thread_sender
710 .send(msg)
711 .expect("Worklet thread outlived script thread.");
712 self.run_in_script_thread(promise.resolve_task(()));
713 }
714 }
715 }
716
717 fn perform_a_worklet_task(&self, worklet_id: WorkletId, task: WorkletTask) {
719 match self.global_scopes.get(&worklet_id) {
720 Some(global) => global.perform_a_worklet_task(task),
721 None => warn!("No such worklet as {:?}.", worklet_id),
722 }
723 }
724
725 fn process_control(&mut self, control: WorkletControl, can_gc: CanGc) {
727 match control {
728 WorkletControl::ExitWorklet(worklet_id) => {
729 self.global_scopes.remove(&worklet_id);
730 },
731 WorkletControl::FetchAndInvokeAWorkletScript {
732 pipeline_id,
733 worklet_id,
734 global_type,
735 origin,
736 base_url,
737 script_url,
738 credentials,
739 pending_tasks_struct,
740 promise,
741 } => {
742 let global =
743 self.get_worklet_global_scope(pipeline_id, worklet_id, global_type, base_url);
744 self.fetch_and_invoke_a_worklet_script(
745 &global,
746 pipeline_id,
747 origin,
748 script_url,
749 credentials,
750 pending_tasks_struct,
751 promise,
752 can_gc,
753 )
754 },
755 }
756 }
757
758 fn run_in_script_thread<T>(&self, task: T)
760 where
761 T: TaskBox + 'static,
762 {
763 let msg = CommonScriptMsg::Task(
766 ScriptThreadEventCategory::WorkletEvent,
767 Box::new(task),
768 None,
769 TaskSourceName::DOMManipulation,
770 );
771 let msg = MainThreadScriptMsg::Common(msg);
772 self.global_init
773 .to_script_thread_sender
774 .send(msg)
775 .expect("Worklet thread outlived script thread.");
776 }
777}
778
/// Handle given to a worklet global scope for scheduling tasks on the worklet
/// thread pool.
#[derive(Clone, JSTraceable, MallocSizeOf)]
pub(crate) struct WorkletExecutor {
    /// The worklet that scheduled tasks are addressed to.
    worklet_id: WorkletId,
    /// Data sender for the primary role; scheduled tasks are sent here.
    #[no_trace]
    primary_sender: Sender<WorkletData>,
}
786
787impl WorkletExecutor {
788 fn new(worklet_id: WorkletId, primary_sender: Sender<WorkletData>) -> WorkletExecutor {
789 WorkletExecutor {
790 worklet_id,
791 primary_sender,
792 }
793 }
794
795 pub(crate) fn schedule_a_worklet_task(&self, task: WorkletTask) {
797 let _ = self
798 .primary_sender
799 .send(WorkletData::Task(self.worklet_id, task));
800 }
801}