use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use base::id::{PipelineNamespace, ServiceWorkerId, ServiceWorkerRegistrationId};
use crossbeam_channel::{select, unbounded, Receiver, RecvError, Sender};
use ipc_channel::ipc::{self, IpcSender};
use ipc_channel::router::ROUTER;
use net_traits::{CoreResourceMsg, CustomResponseMediator};
use script_traits::{
DOMMessage, Job, JobError, JobResult, JobResultValue, JobType, SWManagerMsg, SWManagerSenders,
ScopeThings, ServiceWorkerManagerFactory, ServiceWorkerMsg,
};
use servo_config::pref;
use servo_url::{ImmutableOrigin, ServoUrl};
use crate::dom::abstractworker::WorkerScriptMsg;
use crate::dom::serviceworkerglobalscope::{
ServiceWorkerControlMsg, ServiceWorkerGlobalScope, ServiceWorkerScriptMsg,
};
use crate::dom::serviceworkerregistration::longest_prefix_match;
use crate::script_runtime::ThreadSafeJSContext;
/// A message taken from one of the two channels the manager selects over.
enum Message {
    /// A custom-response request that arrived on the resource channel.
    FromResource(CustomResponseMediator),
    /// A message that arrived on the manager's own port. The payload is boxed
    /// when it is wrapped into this enum.
    FromConstellation(Box<ServiceWorkerMsg>),
}
/// The manager-side handle to a single service worker.
#[derive(Clone)]
struct ServiceWorker {
    /// Identifier of the worker, reported back to clients in job results.
    pub id: ServiceWorkerId,
    /// URL of the worker script; update jobs compare their script URL to it.
    pub script_url: ServoUrl,
    /// Channel over which script messages are delivered to the worker.
    pub sender: Sender<ServiceWorkerScriptMsg>,
}
impl ServiceWorker {
    /// Builds a worker handle from its script URL, message channel and id.
    fn new(
        script_url: ServoUrl,
        sender: Sender<ServiceWorkerScriptMsg>,
        id: ServiceWorkerId,
    ) -> ServiceWorker {
        Self {
            id,
            script_url,
            sender,
        }
    }

    /// Re-wraps a `DOMMessage` as a common worker script message and hands it
    /// to the worker.
    fn forward_dom_message(&self, msg: DOMMessage) {
        let wrapped = WorkerScriptMsg::DOMMessage {
            origin: msg.origin,
            data: msg.data,
        };
        self.send_message(ServiceWorkerScriptMsg::CommonWorker(wrapped));
    }

    /// Sends `msg` to the worker.
    fn send_message(&self, msg: ServiceWorkerScriptMsg) {
        // Best effort: a send failure is deliberately ignored.
        let _ = self.sender.send(msg);
    }
}
/// Selects which worker slot of a registration an update overwrites.
// Not every variant is constructed by the code visible here, hence the
// `dead_code` allowance.
#[allow(dead_code)]
enum RegistrationUpdateTarget {
    /// Overwrite the installing worker.
    Installing,
    /// Overwrite the waiting worker.
    Waiting,
    /// Overwrite the active worker.
    Active,
}
impl Drop for ServiceWorkerRegistration {
    /// Shuts down the worker thread associated with this registration, if any.
    ///
    /// A registration only holds thread handles after `note_worker_thread`
    /// has been called. One created with `new()` and dropped before that
    /// point has nothing to tear down, so every step is skipped when its
    /// handle is absent rather than panicking. This also matters because a
    /// panic raised from `drop` while another panic is unwinding aborts the
    /// process.
    fn drop(&mut self) {
        // Ask the worker's event loop to exit.
        if let Some(control_sender) = self.control_sender.take() {
            if control_sender.send(ServiceWorkerControlMsg::Exit).is_err() {
                warn!("Failed to send exit message to service worker scope.");
            }
        }

        // Raise the shared closing flag.
        if let Some(closing) = self.closing.take() {
            closing.store(true, Ordering::SeqCst);
        }

        // Request an interrupt on the worker's JS context.
        if let Some(context) = self.context.take() {
            context.request_interrupt_callback();
        }

        // Wait for the worker thread to finish.
        if let Some(join_handle) = self.join_handle.take() {
            if join_handle.join().is_err() {
                warn!("Failed to join on service worker thread.");
            }
        }
    }
}
/// A service worker registration together with the handles needed to shut
/// down its worker thread.
///
/// The four `Option` handles start out as `None` and are filled in together
/// by `note_worker_thread`.
struct ServiceWorkerRegistration {
    /// Identifier of the registration, reported back to clients.
    id: ServiceWorkerRegistrationId,
    /// Worker in the active slot, if any.
    active_worker: Option<ServiceWorker>,
    /// Worker in the waiting slot, if any.
    waiting_worker: Option<ServiceWorker>,
    /// Worker in the installing slot, if any.
    installing_worker: Option<ServiceWorker>,
    /// Channel used to send control messages (such as `Exit`) to the worker.
    control_sender: Option<Sender<ServiceWorkerControlMsg>>,
    /// Handle used to join the worker thread on drop.
    join_handle: Option<JoinHandle<()>>,
    /// JS context handle used to request an interrupt on drop.
    context: Option<ThreadSafeJSContext>,
    /// Flag shared with the worker; set to `true` on drop.
    closing: Option<Arc<AtomicBool>>,
}
impl ServiceWorkerRegistration {
    /// Creates a registration with a fresh id, no workers and no worker
    /// thread handles.
    pub fn new() -> ServiceWorkerRegistration {
        ServiceWorkerRegistration {
            id: ServiceWorkerRegistrationId::new(),
            active_worker: None,
            waiting_worker: None,
            installing_worker: None,
            join_handle: None,
            control_sender: None,
            context: None,
            closing: None,
        }
    }

    /// Records the handles of the worker thread started for this registration.
    ///
    /// # Panics
    ///
    /// Panics if any of the handles has already been recorded.
    fn note_worker_thread(
        &mut self,
        join_handle: JoinHandle<()>,
        control_sender: Sender<ServiceWorkerControlMsg>,
        context: ThreadSafeJSContext,
        closing: Arc<AtomicBool>,
    ) {
        assert!(self.join_handle.is_none());
        self.join_handle = Some(join_handle);

        assert!(self.control_sender.is_none());
        self.control_sender = Some(control_sender);

        assert!(self.context.is_none());
        self.context = Some(context);

        assert!(self.closing.is_none());
        self.closing = Some(closing);
    }

    /// Returns a clone of the newest worker of this registration, if any.
    ///
    /// The installing worker is the most recent one, followed by the waiting
    /// worker and then the active worker, so they are consulted in that
    /// order (the "Get Newest Worker" algorithm of the Service Workers
    /// specification).
    fn get_newest_worker(&self) -> Option<ServiceWorker> {
        self.installing_worker
            .as_ref()
            .or(self.waiting_worker.as_ref())
            .or(self.active_worker.as_ref())
            .cloned()
    }

    /// Stores `worker` in the slot selected by `target`, replacing any worker
    /// already there.
    fn update_registration_state(
        &mut self,
        target: RegistrationUpdateTarget,
        worker: ServiceWorker,
    ) {
        match target {
            RegistrationUpdateTarget::Active => {
                self.active_worker = Some(worker);
            },
            RegistrationUpdateTarget::Waiting => {
                self.waiting_worker = Some(worker);
            },
            RegistrationUpdateTarget::Installing => {
                self.installing_worker = Some(worker);
            },
        }
    }
}
/// Owns the service worker registrations and dispatches incoming messages
/// to them.
pub struct ServiceWorkerManager {
    /// Registrations, keyed by the scope URL they were registered under.
    registrations: HashMap<ServoUrl, ServiceWorkerRegistration>,
    /// Sender towards the constellation; stored but not used by the code
    /// visible here.
    _constellation_sender: IpcSender<SWManagerMsg>,
    /// Sender to the manager's own port; used to re-schedule jobs and handed
    /// to spawned workers.
    own_sender: IpcSender<ServiceWorkerMsg>,
    /// Receiving end for `ServiceWorkerMsg` messages.
    own_port: Receiver<ServiceWorkerMsg>,
    /// Receiving end for custom-response requests.
    resource_receiver: Receiver<CustomResponseMediator>,
}
impl ServiceWorkerManager {
fn new(
own_sender: IpcSender<ServiceWorkerMsg>,
from_constellation_receiver: Receiver<ServiceWorkerMsg>,
resource_port: Receiver<CustomResponseMediator>,
constellation_sender: IpcSender<SWManagerMsg>,
) -> ServiceWorkerManager {
PipelineNamespace::auto_install();
ServiceWorkerManager {
registrations: HashMap::new(),
own_sender,
own_port: from_constellation_receiver,
resource_receiver: resource_port,
_constellation_sender: constellation_sender,
}
}
pub fn get_matching_scope(&self, load_url: &ServoUrl) -> Option<ServoUrl> {
for scope in self.registrations.keys() {
if longest_prefix_match(scope, load_url) {
return Some(scope.clone());
}
}
None
}
fn handle_message(&mut self) {
while let Ok(message) = self.receive_message() {
let should_continue = match message {
Message::FromConstellation(msg) => self.handle_message_from_constellation(*msg),
Message::FromResource(msg) => self.handle_message_from_resource(msg),
};
if !should_continue {
for registration in self.registrations.drain() {
drop(registration);
}
break;
}
}
}
fn handle_message_from_resource(&mut self, mediator: CustomResponseMediator) -> bool {
if serviceworker_enabled() {
if let Some(scope) = self.get_matching_scope(&mediator.load_url) {
if let Some(registration) = self.registrations.get(&scope) {
if let Some(ref worker) = registration.active_worker {
worker.send_message(ServiceWorkerScriptMsg::Response(mediator));
return true;
}
}
}
}
let _ = mediator.response_chan.send(None);
true
}
fn receive_message(&mut self) -> Result<Message, RecvError> {
select! {
recv(self.own_port) -> msg => msg.map(|m| Message::FromConstellation(Box::new(m))),
recv(self.resource_receiver) -> msg => msg.map(Message::FromResource),
}
}
fn handle_message_from_constellation(&mut self, msg: ServiceWorkerMsg) -> bool {
match msg {
ServiceWorkerMsg::Timeout(_scope) => {
},
ServiceWorkerMsg::ForwardDOMMessage(msg, scope_url) => {
if let Some(registration) = self.registrations.get_mut(&scope_url) {
if let Some(ref worker) = registration.active_worker {
worker.forward_dom_message(msg);
}
}
},
ServiceWorkerMsg::ScheduleJob(job) => match job.job_type {
JobType::Register => {
self.handle_register_job(job);
},
JobType::Update => {
self.handle_update_job(job);
},
JobType::Unregister => {
},
},
ServiceWorkerMsg::Exit => return false,
}
true
}
fn handle_register_job(&mut self, mut job: Job) {
if !job.script_url.is_origin_trustworthy() {
let _ = job
.client
.send(JobResult::RejectPromise(JobError::SecurityError));
return;
}
if job.script_url.origin() != job.referrer.origin() ||
job.scope_url.origin() != job.referrer.origin()
{
let _ = job
.client
.send(JobResult::RejectPromise(JobError::SecurityError));
return;
}
if let Some(registration) = self.registrations.get(&job.scope_url) {
let newest_worker = registration.get_newest_worker();
if newest_worker.is_some() {
let client = job.client.clone();
let _ = client.send(JobResult::ResolvePromise(
job,
JobResultValue::Registration {
id: registration.id,
installing_worker: registration
.installing_worker
.as_ref()
.map(|worker| worker.id),
waiting_worker: registration
.waiting_worker
.as_ref()
.map(|worker| worker.id),
active_worker: registration.active_worker.as_ref().map(|worker| worker.id),
},
));
}
} else {
let new_registration = ServiceWorkerRegistration::new();
self.registrations
.insert(job.scope_url.clone(), new_registration);
job.job_type = JobType::Update;
let _ = self.own_sender.send(ServiceWorkerMsg::ScheduleJob(job));
}
}
fn handle_update_job(&mut self, job: Job) {
if let Some(registration) = self.registrations.get_mut(&job.scope_url) {
let newest_worker = registration.get_newest_worker();
if let Some(worker) = newest_worker {
if worker.script_url != job.script_url {
let _ = job
.client
.send(JobResult::RejectPromise(JobError::TypeError));
return;
}
}
let scope_things = job
.scope_things
.clone()
.expect("Update job should have scope things.");
let (new_worker, join_handle, control_sender, context, closing) =
update_serviceworker(self.own_sender.clone(), job.scope_url.clone(), scope_things);
registration.note_worker_thread(join_handle, control_sender, context, closing);
registration
.update_registration_state(RegistrationUpdateTarget::Installing, new_worker);
let client = job.client.clone();
let _ = client.send(JobResult::ResolvePromise(
job,
JobResultValue::Registration {
id: registration.id,
installing_worker: registration
.installing_worker
.as_ref()
.map(|worker| worker.id),
waiting_worker: registration.waiting_worker.as_ref().map(|worker| worker.id),
active_worker: registration.active_worker.as_ref().map(|worker| worker.id),
},
));
} else {
let _ = job
.client
.send(JobResult::RejectPromise(JobError::TypeError));
}
}
}
/// Starts a service worker scope for `scope_url` and returns everything the
/// manager needs to talk to it and to shut it down.
///
/// The returned tuple holds, in order: the worker handle, the join handle of
/// the worker thread, the control channel sender, the worker's JS context
/// handle, and the shared closing flag.
///
/// # Panics
///
/// Panics if the devtools IPC channel cannot be created, or if the worker
/// never sends back its context.
fn update_serviceworker(
    own_sender: IpcSender<ServiceWorkerMsg>,
    scope_url: ServoUrl,
    scope_things: ScopeThings,
) -> (
    ServiceWorker,
    JoinHandle<()>,
    Sender<ServiceWorkerControlMsg>,
    ThreadSafeJSContext,
    Arc<AtomicBool>,
) {
    let (script_sender, script_receiver) = unbounded();
    // The sending half is kept alive until this function returns.
    let (_devtools_sender, devtools_receiver) = ipc::channel().unwrap();
    let worker_id = ServiceWorkerId::new();
    let (control_sender, control_receiver) = unbounded();
    let (context_sender, context_receiver) = unbounded();
    let closing = Arc::new(AtomicBool::new(false));

    let join_handle = ServiceWorkerGlobalScope::run_serviceworker_scope(
        scope_things.clone(),
        script_sender.clone(),
        script_receiver,
        devtools_receiver,
        own_sender,
        scope_url.clone(),
        control_receiver,
        context_sender,
        Arc::clone(&closing),
    );

    // Block until the worker reports its context.
    let context = context_receiver
        .recv()
        .expect("Couldn't receive a context for worker.");

    let worker = ServiceWorker::new(scope_things.script_url, script_sender, worker_id);

    (worker, join_handle, control_sender, context, closing)
}
impl ServiceWorkerManagerFactory for ServiceWorkerManager {
    /// Spawns the manager thread for `origin`.
    ///
    /// A network mediator channel is registered with the resource thread for
    /// `origin`, and both IPC receivers are routed to crossbeam receivers
    /// before the manager starts its event loop. A failure to spawn the
    /// thread is logged as a warning rather than returned.
    ///
    /// # Panics
    ///
    /// Panics if the resource IPC channel cannot be created.
    fn create(sw_senders: SWManagerSenders, origin: ImmutableOrigin) {
        let (resource_chan, resource_port) = ipc::channel().unwrap();

        let SWManagerSenders {
            resource_sender,
            own_sender,
            receiver,
            swmanager_sender,
        } = sw_senders;

        let constellation_port = ROUTER.route_ipc_receiver_to_new_crossbeam_receiver(receiver);
        let resource_port = ROUTER.route_ipc_receiver_to_new_crossbeam_receiver(resource_port);

        let _ = resource_sender.send(CoreResourceMsg::NetworkMediator(resource_chan, origin));

        let spawn_result = thread::Builder::new()
            .name("SvcWorkerManager".to_owned())
            .spawn(move || {
                let mut manager = ServiceWorkerManager::new(
                    own_sender,
                    constellation_port,
                    resource_port,
                    swmanager_sender,
                );
                manager.handle_message();
            });

        if spawn_result.is_err() {
            warn!("ServiceWorkerManager thread spawning failed");
        }
    }
}
/// Returns the current value of the `dom.serviceworker.enabled` preference.
pub fn serviceworker_enabled() -> bool {
    pref!(dom.serviceworker.enabled)
}