use std::cell::OnceCell;
use std::cmp::max;
use std::collections::{hash_map, HashMap};
use std::rc::Rc;
use std::sync::atomic::{AtomicIsize, Ordering};
use std::sync::Arc;
use std::thread;
use base::id::PipelineId;
use crossbeam_channel::{unbounded, Receiver, Sender};
use dom_struct::dom_struct;
use js::jsapi::{GCReason, JSGCParamKey, JSTracer, JS_GetGCParameter, JS_GC};
use malloc_size_of::malloc_size_of_is_0;
use net_traits::request::{Destination, RequestBuilder, RequestMode};
use net_traits::IpcSend;
use servo_url::{ImmutableOrigin, ServoUrl};
use style::thread_state::{self, ThreadState};
use swapper::{swapper, Swapper};
use uuid::Uuid;
use crate::conversions::Convert;
use crate::dom::bindings::codegen::Bindings::RequestBinding::RequestCredentials;
use crate::dom::bindings::codegen::Bindings::WindowBinding::Window_Binding::WindowMethods;
use crate::dom::bindings::codegen::Bindings::WorkletBinding::{WorkletMethods, WorkletOptions};
use crate::dom::bindings::error::Error;
use crate::dom::bindings::inheritance::Castable;
use crate::dom::bindings::refcounted::TrustedPromise;
use crate::dom::bindings::reflector::{reflect_dom_object, Reflector};
use crate::dom::bindings::root::{Dom, DomRoot, RootCollection, ThreadLocalStackRoots};
use crate::dom::bindings::str::USVString;
use crate::dom::bindings::trace::{CustomTraceable, JSTraceable, RootedTraceableBox};
use crate::dom::globalscope::GlobalScope;
use crate::dom::promise::Promise;
use crate::dom::testworkletglobalscope::TestWorkletTask;
use crate::dom::window::Window;
use crate::dom::workletglobalscope::{
WorkletGlobalScope, WorkletGlobalScopeInit, WorkletGlobalScopeType, WorkletTask,
};
use crate::fetch::load_whole_resource;
use crate::messaging::MainThreadScriptMsg;
use crate::realms::InRealm;
use crate::script_runtime::{CanGc, CommonScriptMsg, Runtime, ScriptThreadEventCategory};
use crate::script_thread::ScriptThread;
use crate::task::TaskBox;
use crate::task_source::TaskSourceName;
const WORKLET_THREAD_POOL_SIZE: u32 = 3;
const MIN_GC_THRESHOLD: u32 = 1_000_000;
#[derive(JSTraceable, MallocSizeOf)]
struct DroppableField {
worklet_id: WorkletId,
#[ignore_malloc_size_of = "Difficult to measure memory usage of Rc<...> types"]
thread_pool: OnceCell<Rc<WorkletThreadPool>>,
}
impl Drop for DroppableField {
fn drop(&mut self) {
let worklet_id = self.worklet_id;
if let Some(thread_pool) = self.thread_pool.get_mut() {
thread_pool.exit_worklet(worklet_id);
}
}
}
#[dom_struct]
pub struct Worklet {
reflector: Reflector,
window: Dom<Window>,
global_type: WorkletGlobalScopeType,
droppable_field: DroppableField,
}
impl Worklet {
fn new_inherited(window: &Window, global_type: WorkletGlobalScopeType) -> Worklet {
Worklet {
reflector: Reflector::new(),
window: Dom::from_ref(window),
global_type,
droppable_field: DroppableField {
worklet_id: WorkletId::new(),
thread_pool: OnceCell::new(),
},
}
}
pub fn new(window: &Window, global_type: WorkletGlobalScopeType) -> DomRoot<Worklet> {
debug!("Creating worklet {:?}.", global_type);
reflect_dom_object(
Box::new(Worklet::new_inherited(window, global_type)),
window,
CanGc::note(),
)
}
pub fn worklet_id(&self) -> WorkletId {
self.droppable_field.worklet_id
}
#[allow(dead_code)]
pub fn worklet_global_scope_type(&self) -> WorkletGlobalScopeType {
self.global_type
}
}
impl WorkletMethods<crate::DomTypeHolder> for Worklet {
fn AddModule(
&self,
module_url: USVString,
options: &WorkletOptions,
comp: InRealm,
can_gc: CanGc,
) -> Rc<Promise> {
let promise = Promise::new_in_current_realm(comp, can_gc);
let module_url_record = match self.window.Document().base_url().join(&module_url.0) {
Ok(url) => url,
Err(err) => {
debug!("URL {:?} parse error {:?}.", module_url.0, err);
promise.reject_error(Error::Syntax);
return promise;
},
};
debug!("Adding Worklet module {}.", module_url_record);
let pending_tasks_struct = PendingTasksStruct::new();
let global = self.window.upcast::<GlobalScope>();
self.droppable_field
.thread_pool
.get_or_init(ScriptThread::worklet_thread_pool)
.fetch_and_invoke_a_worklet_script(
global.pipeline_id(),
self.droppable_field.worklet_id,
self.global_type,
self.window.origin().immutable().clone(),
global.api_base_url(),
module_url_record,
options.credentials,
pending_tasks_struct,
&promise,
);
debug!("Returning promise.");
promise
}
}
#[derive(Clone, Copy, Debug, Eq, Hash, JSTraceable, PartialEq)]
pub struct WorkletId(#[no_trace] Uuid);
malloc_size_of_is_0!(WorkletId);
impl WorkletId {
fn new() -> WorkletId {
WorkletId(servo_rand::random_uuid())
}
}
#[derive(Clone, Debug)]
struct PendingTasksStruct(Arc<AtomicIsize>);
impl PendingTasksStruct {
fn new() -> PendingTasksStruct {
PendingTasksStruct(Arc::new(AtomicIsize::new(
WORKLET_THREAD_POOL_SIZE as isize,
)))
}
fn set_counter_to(&self, value: isize) -> isize {
self.0.swap(value, Ordering::AcqRel)
}
fn decrement_counter_by(&self, offset: isize) -> isize {
self.0.fetch_sub(offset, Ordering::AcqRel)
}
}
#[derive(Clone, JSTraceable)]
pub struct WorkletThreadPool {
#[no_trace]
primary_sender: Sender<WorkletData>,
#[no_trace]
hot_backup_sender: Sender<WorkletData>,
#[no_trace]
cold_backup_sender: Sender<WorkletData>,
#[no_trace]
control_sender_0: Sender<WorkletControl>,
#[no_trace]
control_sender_1: Sender<WorkletControl>,
#[no_trace]
control_sender_2: Sender<WorkletControl>,
}
impl Drop for WorkletThreadPool {
fn drop(&mut self) {
let _ = self.cold_backup_sender.send(WorkletData::Quit);
let _ = self.hot_backup_sender.send(WorkletData::Quit);
let _ = self.primary_sender.send(WorkletData::Quit);
}
}
impl WorkletThreadPool {
pub(crate) fn spawn(global_init: WorkletGlobalScopeInit) -> WorkletThreadPool {
let primary_role = WorkletThreadRole::new(false, false);
let hot_backup_role = WorkletThreadRole::new(true, false);
let cold_backup_role = WorkletThreadRole::new(false, true);
let primary_sender = primary_role.sender.clone();
let hot_backup_sender = hot_backup_role.sender.clone();
let cold_backup_sender = cold_backup_role.sender.clone();
let init = WorkletThreadInit {
primary_sender: primary_sender.clone(),
hot_backup_sender: hot_backup_sender.clone(),
cold_backup_sender: cold_backup_sender.clone(),
global_init,
};
WorkletThreadPool {
primary_sender,
hot_backup_sender,
cold_backup_sender,
control_sender_0: WorkletThread::spawn(primary_role, init.clone(), 0),
control_sender_1: WorkletThread::spawn(hot_backup_role, init.clone(), 1),
control_sender_2: WorkletThread::spawn(cold_backup_role, init, 2),
}
}
#[allow(clippy::too_many_arguments)]
fn fetch_and_invoke_a_worklet_script(
&self,
pipeline_id: PipelineId,
worklet_id: WorkletId,
global_type: WorkletGlobalScopeType,
origin: ImmutableOrigin,
base_url: ServoUrl,
script_url: ServoUrl,
credentials: RequestCredentials,
pending_tasks_struct: PendingTasksStruct,
promise: &Rc<Promise>,
) {
for sender in &[
&self.control_sender_0,
&self.control_sender_1,
&self.control_sender_2,
] {
let _ = sender.send(WorkletControl::FetchAndInvokeAWorkletScript {
pipeline_id,
worklet_id,
global_type,
origin: origin.clone(),
base_url: base_url.clone(),
script_url: script_url.clone(),
credentials,
pending_tasks_struct: pending_tasks_struct.clone(),
promise: TrustedPromise::new(promise.clone()),
});
}
self.wake_threads();
}
pub(crate) fn exit_worklet(&self, worklet_id: WorkletId) {
for sender in &[
&self.control_sender_0,
&self.control_sender_1,
&self.control_sender_2,
] {
let _ = sender.send(WorkletControl::ExitWorklet(worklet_id));
}
self.wake_threads();
}
pub fn test_worklet_lookup(&self, id: WorkletId, key: String) -> Option<String> {
let (sender, receiver) = unbounded();
let msg = WorkletData::Task(id, WorkletTask::Test(TestWorkletTask::Lookup(key, sender)));
let _ = self.primary_sender.send(msg);
receiver.recv().expect("Test worklet has died?")
}
fn wake_threads(&self) {
let _ = self.cold_backup_sender.send(WorkletData::WakeUp);
let _ = self.hot_backup_sender.send(WorkletData::WakeUp);
let _ = self.primary_sender.send(WorkletData::WakeUp);
}
}
enum WorkletData {
Task(WorkletId, WorkletTask),
StartSwapRoles(Sender<WorkletData>),
FinishSwapRoles(Swapper<WorkletThreadRole>),
WakeUp,
Quit,
}
enum WorkletControl {
ExitWorklet(WorkletId),
FetchAndInvokeAWorkletScript {
pipeline_id: PipelineId,
worklet_id: WorkletId,
global_type: WorkletGlobalScopeType,
origin: ImmutableOrigin,
base_url: ServoUrl,
script_url: ServoUrl,
credentials: RequestCredentials,
pending_tasks_struct: PendingTasksStruct,
promise: TrustedPromise,
},
}
struct WorkletThreadRole {
receiver: Receiver<WorkletData>,
sender: Sender<WorkletData>,
is_hot_backup: bool,
is_cold_backup: bool,
}
impl WorkletThreadRole {
fn new(is_hot_backup: bool, is_cold_backup: bool) -> WorkletThreadRole {
let (sender, receiver) = unbounded();
WorkletThreadRole {
sender,
receiver,
is_hot_backup,
is_cold_backup,
}
}
}
#[derive(Clone)]
struct WorkletThreadInit {
primary_sender: Sender<WorkletData>,
hot_backup_sender: Sender<WorkletData>,
cold_backup_sender: Sender<WorkletData>,
global_init: WorkletGlobalScopeInit,
}
#[crown::unrooted_must_root_lint::must_root]
struct WorkletThread {
role: WorkletThreadRole,
control_receiver: Receiver<WorkletControl>,
primary_sender: Sender<WorkletData>,
hot_backup_sender: Sender<WorkletData>,
cold_backup_sender: Sender<WorkletData>,
global_init: WorkletGlobalScopeInit,
global_scopes: HashMap<WorkletId, Dom<WorkletGlobalScope>>,
control_buffer: Option<WorkletControl>,
runtime: Runtime,
should_gc: bool,
gc_threshold: u32,
}
#[allow(unsafe_code)]
unsafe impl JSTraceable for WorkletThread {
unsafe fn trace(&self, trc: *mut JSTracer) {
debug!("Tracing worklet thread.");
self.global_scopes.trace(trc);
}
}
impl WorkletThread {
#[allow(unsafe_code)]
#[allow(crown::unrooted_must_root)]
fn spawn(
role: WorkletThreadRole,
init: WorkletThreadInit,
thread_index: u8,
) -> Sender<WorkletControl> {
let (control_sender, control_receiver) = unbounded();
let _ = thread::Builder::new()
.name(format!("Worklet#{thread_index}"))
.spawn(move || {
debug!("Initializing worklet thread.");
thread_state::initialize(ThreadState::SCRIPT | ThreadState::IN_WORKER);
let roots = RootCollection::new();
let _stack_roots = ThreadLocalStackRoots::new(&roots);
let mut thread = RootedTraceableBox::new(WorkletThread {
role,
control_receiver,
primary_sender: init.primary_sender,
hot_backup_sender: init.hot_backup_sender,
cold_backup_sender: init.cold_backup_sender,
global_init: init.global_init,
global_scopes: HashMap::new(),
control_buffer: None,
runtime: Runtime::new(None),
should_gc: false,
gc_threshold: MIN_GC_THRESHOLD,
});
thread.run();
})
.expect("Couldn't start worklet thread");
control_sender
}
fn run(&mut self) {
loop {
let message = self.role.receiver.recv().unwrap();
match message {
WorkletData::Task(id, task) => {
self.perform_a_worklet_task(id, task);
},
WorkletData::StartSwapRoles(sender) => {
let (our_swapper, their_swapper) = swapper();
match sender.send(WorkletData::FinishSwapRoles(their_swapper)) {
Ok(_) => {},
Err(_) => {
return;
},
};
let _ = our_swapper.swap(&mut self.role);
},
WorkletData::FinishSwapRoles(swapper) => {
let _ = swapper.swap(&mut self.role);
},
WorkletData::WakeUp => {},
WorkletData::Quit => {
return;
},
}
if self.role.is_cold_backup {
if let Some(control) = self.control_buffer.take() {
self.process_control(control, CanGc::note());
}
while let Ok(control) = self.control_receiver.try_recv() {
self.process_control(control, CanGc::note());
}
self.gc();
} else if self.control_buffer.is_none() {
if let Ok(control) = self.control_receiver.try_recv() {
self.control_buffer = Some(control);
let msg = WorkletData::StartSwapRoles(self.role.sender.clone());
let _ = self.cold_backup_sender.send(msg);
}
}
if self.current_memory_usage() > self.gc_threshold {
if self.role.is_hot_backup || self.role.is_cold_backup {
self.should_gc = false;
self.gc();
} else if !self.should_gc {
self.should_gc = true;
let msg = WorkletData::StartSwapRoles(self.role.sender.clone());
let _ = self.hot_backup_sender.send(msg);
}
}
}
}
#[allow(unsafe_code)]
fn current_memory_usage(&self) -> u32 {
unsafe { JS_GetGCParameter(self.runtime.cx(), JSGCParamKey::JSGC_BYTES) }
}
#[allow(unsafe_code)]
fn gc(&mut self) {
debug!(
"BEGIN GC (usage = {}, threshold = {}).",
self.current_memory_usage(),
self.gc_threshold
);
unsafe { JS_GC(self.runtime.cx(), GCReason::API) };
self.gc_threshold = max(MIN_GC_THRESHOLD, self.current_memory_usage() * 2);
debug!(
"END GC (usage = {}, threshold = {}).",
self.current_memory_usage(),
self.gc_threshold
);
}
fn get_worklet_global_scope(
&mut self,
pipeline_id: PipelineId,
worklet_id: WorkletId,
global_type: WorkletGlobalScopeType,
base_url: ServoUrl,
) -> DomRoot<WorkletGlobalScope> {
match self.global_scopes.entry(worklet_id) {
hash_map::Entry::Occupied(entry) => DomRoot::from_ref(entry.get()),
hash_map::Entry::Vacant(entry) => {
debug!("Creating new worklet global scope.");
let executor = WorkletExecutor::new(worklet_id, self.primary_sender.clone());
let result = WorkletGlobalScope::new(
global_type,
&self.runtime,
pipeline_id,
base_url,
executor,
&self.global_init,
);
entry.insert(Dom::from_ref(&*result));
result
},
}
}
#[allow(clippy::too_many_arguments)]
fn fetch_and_invoke_a_worklet_script(
&self,
global_scope: &WorkletGlobalScope,
pipeline_id: PipelineId,
origin: ImmutableOrigin,
script_url: ServoUrl,
credentials: RequestCredentials,
pending_tasks_struct: PendingTasksStruct,
promise: TrustedPromise,
can_gc: CanGc,
) {
debug!("Fetching from {}.", script_url);
let resource_fetcher = self.global_init.resource_threads.sender();
let request = RequestBuilder::new(
script_url,
global_scope.upcast::<GlobalScope>().get_referrer(),
)
.destination(Destination::Script)
.mode(RequestMode::CorsMode)
.credentials_mode(credentials.convert())
.origin(origin);
let script = load_whole_resource(
request,
&resource_fetcher,
global_scope.upcast::<GlobalScope>(),
can_gc,
)
.ok()
.and_then(|(_, bytes)| String::from_utf8(bytes).ok());
let ok = script
.map(|script| global_scope.evaluate_js(&script, can_gc))
.unwrap_or(false);
if !ok {
debug!("Failed to load script.");
let old_counter = pending_tasks_struct.set_counter_to(-1);
if old_counter > 0 {
self.run_in_script_thread(promise.reject_task(Error::Abort));
}
} else {
debug!("Finished adding script.");
let old_counter = pending_tasks_struct.decrement_counter_by(1);
if old_counter == 1 {
debug!("Resolving promise.");
let msg = MainThreadScriptMsg::WorkletLoaded(pipeline_id);
self.global_init
.to_script_thread_sender
.send(msg)
.expect("Worklet thread outlived script thread.");
self.run_in_script_thread(promise.resolve_task(()));
}
}
}
fn perform_a_worklet_task(&self, worklet_id: WorkletId, task: WorkletTask) {
match self.global_scopes.get(&worklet_id) {
Some(global) => global.perform_a_worklet_task(task),
None => warn!("No such worklet as {:?}.", worklet_id),
}
}
fn process_control(&mut self, control: WorkletControl, can_gc: CanGc) {
match control {
WorkletControl::ExitWorklet(worklet_id) => {
self.global_scopes.remove(&worklet_id);
},
WorkletControl::FetchAndInvokeAWorkletScript {
pipeline_id,
worklet_id,
global_type,
origin,
base_url,
script_url,
credentials,
pending_tasks_struct,
promise,
} => {
let global =
self.get_worklet_global_scope(pipeline_id, worklet_id, global_type, base_url);
self.fetch_and_invoke_a_worklet_script(
&global,
pipeline_id,
origin,
script_url,
credentials,
pending_tasks_struct,
promise,
can_gc,
)
},
}
}
fn run_in_script_thread<T>(&self, task: T)
where
T: TaskBox + 'static,
{
let msg = CommonScriptMsg::Task(
ScriptThreadEventCategory::WorkletEvent,
Box::new(task),
None,
TaskSourceName::DOMManipulation,
);
let msg = MainThreadScriptMsg::Common(msg);
self.global_init
.to_script_thread_sender
.send(msg)
.expect("Worklet thread outlived script thread.");
}
}
#[derive(Clone, JSTraceable, MallocSizeOf)]
pub struct WorkletExecutor {
worklet_id: WorkletId,
#[ignore_malloc_size_of = "channels are hard"]
#[no_trace]
primary_sender: Sender<WorkletData>,
}
impl WorkletExecutor {
fn new(worklet_id: WorkletId, primary_sender: Sender<WorkletData>) -> WorkletExecutor {
WorkletExecutor {
worklet_id,
primary_sender,
}
}
pub fn schedule_a_worklet_task(&self, task: WorkletTask) {
let _ = self
.primary_sender
.send(WorkletData::Task(self.worklet_id, task));
}
}