net/
resource_thread.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5//! A thread that takes a URL and streams back the binary data.
6
7use std::borrow::ToOwned;
8use std::collections::HashMap;
9use std::fs::File;
10use std::io::{self, BufReader};
11use std::path::{Path, PathBuf};
12use std::sync::{Arc, Weak};
13use std::thread;
14
15use base::id::CookieStoreId;
16use base::threadpool::ThreadPool;
17use cookie::Cookie;
18use crossbeam_channel::Sender;
19use devtools_traits::DevtoolsControlMsg;
20use embedder_traits::EmbedderProxy;
21use hyper_serde::Serde;
22use ipc_channel::ipc::{self, IpcReceiver, IpcReceiverSet, IpcSender};
23use log::{debug, trace, warn};
24use net_traits::blob_url_store::parse_blob_url;
25use net_traits::filemanager_thread::FileTokenCheck;
26use net_traits::pub_domains::public_suffix_list_size_of;
27use net_traits::request::{Destination, RequestBuilder, RequestId};
28use net_traits::response::{Response, ResponseInit};
29use net_traits::{
30    AsyncRuntime, CookieAsyncResponse, CookieData, CookieSource, CoreResourceMsg,
31    CoreResourceThread, CustomResponseMediator, DiscardFetch, FetchChannels, FetchTaskTarget,
32    ResourceFetchTiming, ResourceThreads, ResourceTimingType, WebSocketDomAction,
33    WebSocketNetworkEvent,
34};
35use parking_lot::{Mutex, RwLock};
36use profile_traits::mem::{
37    ProcessReports, ProfilerChan as MemProfilerChan, Report, ReportKind, ReportsChan,
38    perform_memory_report,
39};
40use profile_traits::path;
41use profile_traits::time::ProfilerChan;
42use rustc_hash::FxHashMap;
43use rustls::RootCertStore;
44use serde::{Deserialize, Serialize};
45use servo_arc::Arc as ServoArc;
46use servo_url::{ImmutableOrigin, ServoUrl};
47
48use crate::async_runtime::{init_async_runtime, spawn_task};
49use crate::connector::{
50    CACertificates, CertificateErrorOverrideManager, create_http_client, create_tls_config,
51};
52use crate::cookie::ServoCookie;
53use crate::cookie_storage::CookieStorage;
54use crate::fetch::cors_cache::CorsCache;
55use crate::fetch::fetch_params::FetchParams;
56use crate::fetch::methods::{CancellationListener, FetchContext, WebSocketChannel, fetch};
57use crate::filemanager_thread::FileManager;
58use crate::hsts::{self, HstsList};
59use crate::http_cache::HttpCache;
60use crate::http_loader::{HttpState, http_redirect_fetch};
61use crate::protocols::ProtocolRegistry;
62use crate::request_interceptor::RequestInterceptor;
63use crate::websocket_loader::create_handshake_request;
64
65/// Load a file with CA certificate and produce a RootCertStore with the results.
66fn load_root_cert_store_from_file(file_path: String) -> io::Result<RootCertStore> {
67    let mut root_cert_store = RootCertStore::empty();
68
69    let mut pem = BufReader::new(File::open(file_path)?);
70    let certs: Result<Vec<_>, _> = rustls_pemfile::certs(&mut pem).collect();
71    root_cert_store.add_parsable_certificates(certs?);
72    Ok(root_cert_store)
73}
74
75/// Returns a tuple of (public, private) senders to the new threads.
76#[allow(clippy::too_many_arguments)]
77pub fn new_resource_threads(
78    devtools_sender: Option<Sender<DevtoolsControlMsg>>,
79    time_profiler_chan: ProfilerChan,
80    mem_profiler_chan: MemProfilerChan,
81    embedder_proxy: EmbedderProxy,
82    config_dir: Option<PathBuf>,
83    certificate_path: Option<String>,
84    ignore_certificate_errors: bool,
85    protocols: Arc<ProtocolRegistry>,
86) -> (ResourceThreads, ResourceThreads, Box<dyn AsyncRuntime>) {
87    // Initialize the async runtime, and get a handle to it for use in clean shutdown.
88    let async_runtime = init_async_runtime();
89
90    let ca_certificates = match certificate_path {
91        Some(path) => match load_root_cert_store_from_file(path) {
92            Ok(root_cert_store) => CACertificates::Override(root_cert_store),
93            Err(error) => {
94                warn!("Could not load CA file. Falling back to defaults. {error:?}");
95                CACertificates::Default
96            },
97        },
98        None => CACertificates::Default,
99    };
100
101    let (public_core, private_core) = new_core_resource_thread(
102        devtools_sender,
103        time_profiler_chan,
104        mem_profiler_chan.clone(),
105        embedder_proxy,
106        config_dir.clone(),
107        ca_certificates,
108        ignore_certificate_errors,
109        protocols,
110    );
111    (
112        ResourceThreads::new(public_core),
113        ResourceThreads::new(private_core),
114        async_runtime,
115    )
116}
117
118/// Create a CoreResourceThread
119#[allow(clippy::too_many_arguments)]
120pub fn new_core_resource_thread(
121    devtools_sender: Option<Sender<DevtoolsControlMsg>>,
122    time_profiler_chan: ProfilerChan,
123    mem_profiler_chan: MemProfilerChan,
124    embedder_proxy: EmbedderProxy,
125    config_dir: Option<PathBuf>,
126    ca_certificates: CACertificates,
127    ignore_certificate_errors: bool,
128    protocols: Arc<ProtocolRegistry>,
129) -> (CoreResourceThread, CoreResourceThread) {
130    let (public_setup_chan, public_setup_port) = ipc::channel().unwrap();
131    let (private_setup_chan, private_setup_port) = ipc::channel().unwrap();
132    let (report_chan, report_port) = ipc::channel().unwrap();
133
134    thread::Builder::new()
135        .name("ResourceManager".to_owned())
136        .spawn(move || {
137            let resource_manager = CoreResourceManager::new(
138                devtools_sender,
139                time_profiler_chan,
140                embedder_proxy.clone(),
141                ca_certificates.clone(),
142                ignore_certificate_errors,
143            );
144
145            let mut channel_manager = ResourceChannelManager {
146                resource_manager,
147                config_dir,
148                ca_certificates,
149                ignore_certificate_errors,
150                cancellation_listeners: Default::default(),
151                cookie_listeners: Default::default(),
152            };
153
154            mem_profiler_chan.run_with_memory_reporting(
155                || {
156                    channel_manager.start(
157                        public_setup_port,
158                        private_setup_port,
159                        report_port,
160                        protocols,
161                        embedder_proxy,
162                    )
163                },
164                String::from("network-cache-reporter"),
165                report_chan,
166                |report_chan| report_chan,
167            );
168        })
169        .expect("Thread spawning failed");
170    (public_setup_chan, private_setup_chan)
171}
172
173struct ResourceChannelManager {
174    resource_manager: CoreResourceManager,
175    config_dir: Option<PathBuf>,
176    ca_certificates: CACertificates,
177    ignore_certificate_errors: bool,
178    cancellation_listeners: FxHashMap<RequestId, Weak<CancellationListener>>,
179    cookie_listeners: FxHashMap<CookieStoreId, IpcSender<CookieAsyncResponse>>,
180}
181
182fn create_http_states(
183    config_dir: Option<&Path>,
184    ca_certificates: CACertificates,
185    ignore_certificate_errors: bool,
186    embedder_proxy: EmbedderProxy,
187) -> (Arc<HttpState>, Arc<HttpState>) {
188    let mut hsts_list = HstsList::default();
189    let mut auth_cache = AuthCache::default();
190    let http_cache = HttpCache::default();
191    let mut cookie_jar = CookieStorage::new(150);
192    if let Some(config_dir) = config_dir {
193        base::read_json_from_file(&mut auth_cache, config_dir, "auth_cache.json");
194        base::read_json_from_file(&mut hsts_list, config_dir, "hsts_list.json");
195        base::read_json_from_file(&mut cookie_jar, config_dir, "cookie_jar.json");
196    }
197
198    let override_manager = CertificateErrorOverrideManager::new();
199    let http_state = HttpState {
200        hsts_list: RwLock::new(hsts_list),
201        cookie_jar: RwLock::new(cookie_jar),
202        auth_cache: RwLock::new(auth_cache),
203        history_states: RwLock::new(FxHashMap::default()),
204        http_cache: RwLock::new(http_cache),
205        http_cache_state: Mutex::new(HashMap::new()),
206        client: create_http_client(create_tls_config(
207            ca_certificates.clone(),
208            ignore_certificate_errors,
209            override_manager.clone(),
210        )),
211        override_manager,
212        embedder_proxy: Mutex::new(embedder_proxy.clone()),
213    };
214
215    let override_manager = CertificateErrorOverrideManager::new();
216    let private_http_state = HttpState {
217        hsts_list: RwLock::new(HstsList::default()),
218        cookie_jar: RwLock::new(CookieStorage::new(150)),
219        auth_cache: RwLock::new(AuthCache::default()),
220        history_states: RwLock::new(FxHashMap::default()),
221        http_cache: RwLock::new(HttpCache::default()),
222        http_cache_state: Mutex::new(HashMap::new()),
223        client: create_http_client(create_tls_config(
224            ca_certificates,
225            ignore_certificate_errors,
226            override_manager.clone(),
227        )),
228        override_manager,
229        embedder_proxy: Mutex::new(embedder_proxy),
230    };
231
232    (Arc::new(http_state), Arc::new(private_http_state))
233}
234
235impl ResourceChannelManager {
236    fn start(
237        &mut self,
238        public_receiver: IpcReceiver<CoreResourceMsg>,
239        private_receiver: IpcReceiver<CoreResourceMsg>,
240        memory_reporter: IpcReceiver<ReportsChan>,
241        protocols: Arc<ProtocolRegistry>,
242        embedder_proxy: EmbedderProxy,
243    ) {
244        let (public_http_state, private_http_state) = create_http_states(
245            self.config_dir.as_deref(),
246            self.ca_certificates.clone(),
247            self.ignore_certificate_errors,
248            embedder_proxy,
249        );
250
251        let mut rx_set = IpcReceiverSet::new().unwrap();
252        let private_id = rx_set.add(private_receiver).unwrap();
253        let public_id = rx_set.add(public_receiver).unwrap();
254        let reporter_id = rx_set.add(memory_reporter).unwrap();
255
256        loop {
257            for receiver in rx_set.select().unwrap().into_iter() {
258                // Handles case where profiler thread shuts down before resource thread.
259                if let ipc::IpcSelectionResult::ChannelClosed(..) = receiver {
260                    continue;
261                }
262                let (id, data) = receiver.unwrap();
263                // If message is memory report, get the size_of of public and private http caches
264                if id == reporter_id {
265                    if let Ok(msg) = data.to() {
266                        self.process_report(msg, &public_http_state, &private_http_state);
267                        continue;
268                    }
269                } else {
270                    let group = if id == private_id {
271                        &private_http_state
272                    } else {
273                        assert_eq!(id, public_id);
274                        &public_http_state
275                    };
276                    if let Ok(msg) = data.to() {
277                        if !self.process_msg(msg, group, Arc::clone(&protocols)) {
278                            return;
279                        }
280                    }
281                }
282            }
283        }
284    }
285
286    fn process_report(
287        &mut self,
288        msg: ReportsChan,
289        public_http_state: &Arc<HttpState>,
290        private_http_state: &Arc<HttpState>,
291    ) {
292        perform_memory_report(|ops| {
293            let mut reports = public_http_state.memory_reports("public", ops);
294            reports.extend(private_http_state.memory_reports("private", ops));
295            reports.extend(vec![
296                Report {
297                    path: path!["hsts-preload-list"],
298                    kind: ReportKind::ExplicitJemallocHeapSize,
299                    size: hsts::hsts_preload_size_of(ops),
300                },
301                Report {
302                    path: path!["public-suffix-list"],
303                    kind: ReportKind::ExplicitJemallocHeapSize,
304                    size: public_suffix_list_size_of(ops),
305                },
306            ]);
307            msg.send(ProcessReports::new(reports));
308        })
309    }
310
311    fn cancellation_listener(&self, request_id: RequestId) -> Option<Arc<CancellationListener>> {
312        self.cancellation_listeners
313            .get(&request_id)
314            .and_then(Weak::upgrade)
315    }
316
317    fn get_or_create_cancellation_listener(
318        &mut self,
319        request_id: RequestId,
320    ) -> Arc<CancellationListener> {
321        if let Some(listener) = self.cancellation_listener(request_id) {
322            return listener;
323        }
324
325        // Clear away any cancellation listeners that are no longer valid.
326        self.cancellation_listeners
327            .retain(|_, listener| listener.strong_count() > 0);
328
329        let cancellation_listener = Arc::new(Default::default());
330        self.cancellation_listeners
331            .insert(request_id, Arc::downgrade(&cancellation_listener));
332        cancellation_listener
333    }
334
335    fn send_cookie_response(&self, store_id: CookieStoreId, data: CookieData) {
336        let Some(sender) = self.cookie_listeners.get(&store_id) else {
337            warn!(
338                "Async cookie request made for store id that is non-existent {:?}",
339                store_id
340            );
341            return;
342        };
343        let res = sender.send(CookieAsyncResponse { data });
344        if res.is_err() {
345            warn!("Unable to send cookie response to script thread");
346        }
347    }
348
349    /// Returns false if the thread should exit.
350    fn process_msg(
351        &mut self,
352        msg: CoreResourceMsg,
353        http_state: &Arc<HttpState>,
354        protocols: Arc<ProtocolRegistry>,
355    ) -> bool {
356        match msg {
357            CoreResourceMsg::Fetch(request_builder, channels) => match channels {
358                FetchChannels::ResponseMsg(sender) => {
359                    let cancellation_listener =
360                        self.get_or_create_cancellation_listener(request_builder.id);
361                    self.resource_manager.fetch(
362                        request_builder,
363                        None,
364                        sender,
365                        http_state,
366                        cancellation_listener,
367                        protocols,
368                    );
369                },
370                FetchChannels::WebSocket {
371                    event_sender,
372                    action_receiver,
373                } => {
374                    let cancellation_listener =
375                        self.get_or_create_cancellation_listener(request_builder.id);
376
377                    self.resource_manager.websocket_connect(
378                        request_builder,
379                        event_sender,
380                        action_receiver,
381                        http_state,
382                        cancellation_listener,
383                        protocols,
384                    )
385                },
386                FetchChannels::Prefetch => self.resource_manager.fetch(
387                    request_builder,
388                    None,
389                    DiscardFetch,
390                    http_state,
391                    Arc::new(Default::default()),
392                    protocols,
393                ),
394            },
395            CoreResourceMsg::Cancel(request_ids) => {
396                for cancellation_listener in request_ids
397                    .into_iter()
398                    .filter_map(|request_id| self.cancellation_listener(request_id))
399                {
400                    cancellation_listener.cancel();
401                }
402            },
403            CoreResourceMsg::DeleteCookies(request, sender) => {
404                http_state
405                    .cookie_jar
406                    .write()
407                    .clear_storage(request.as_ref());
408                if let Some(sender) = sender {
409                    let _ = sender.send(());
410                }
411                return true;
412            },
413            CoreResourceMsg::DeleteCookie(request, name) => {
414                http_state
415                    .cookie_jar
416                    .write()
417                    .delete_cookie_with_name(&request, name);
418                return true;
419            },
420            CoreResourceMsg::DeleteCookieAsync(cookie_store_id, url, name) => {
421                http_state
422                    .cookie_jar
423                    .write()
424                    .delete_cookie_with_name(&url, name);
425                self.send_cookie_response(cookie_store_id, CookieData::Delete(Ok(())));
426            },
427            CoreResourceMsg::FetchRedirect(request_builder, res_init, sender) => {
428                let cancellation_listener =
429                    self.get_or_create_cancellation_listener(request_builder.id);
430                self.resource_manager.fetch(
431                    request_builder,
432                    Some(res_init),
433                    sender,
434                    http_state,
435                    cancellation_listener,
436                    protocols,
437                )
438            },
439            CoreResourceMsg::SetCookieForUrl(request, cookie, source) => self
440                .resource_manager
441                .set_cookie_for_url(&request, cookie.into_inner().to_owned(), source, http_state),
442            CoreResourceMsg::SetCookiesForUrl(request, cookies, source) => {
443                for cookie in cookies {
444                    self.resource_manager.set_cookie_for_url(
445                        &request,
446                        cookie.into_inner(),
447                        source,
448                        http_state,
449                    );
450                }
451            },
452            CoreResourceMsg::SetCookieForUrlAsync(cookie_store_id, url, cookie, source) => {
453                self.resource_manager.set_cookie_for_url(
454                    &url,
455                    cookie.into_inner().to_owned(),
456                    source,
457                    http_state,
458                );
459                self.send_cookie_response(cookie_store_id, CookieData::Set(Ok(())));
460            },
461            CoreResourceMsg::GetCookiesForUrl(url, consumer, source) => {
462                let mut cookie_jar = http_state.cookie_jar.write();
463                cookie_jar.remove_expired_cookies_for_url(&url);
464                consumer
465                    .send(cookie_jar.cookies_for_url(&url, source))
466                    .unwrap();
467            },
468            CoreResourceMsg::GetCookieDataForUrlAsync(cookie_store_id, url, name) => {
469                let mut cookie_jar = http_state.cookie_jar.write();
470                cookie_jar.remove_expired_cookies_for_url(&url);
471                let cookie = cookie_jar
472                    .query_cookies(&url, name)
473                    .into_iter()
474                    .map(Serde)
475                    .next();
476                self.send_cookie_response(cookie_store_id, CookieData::Get(cookie));
477            },
478            CoreResourceMsg::GetAllCookieDataForUrlAsync(cookie_store_id, url, name) => {
479                let mut cookie_jar = http_state.cookie_jar.write();
480                cookie_jar.remove_expired_cookies_for_url(&url);
481                let cookies = cookie_jar
482                    .query_cookies(&url, name)
483                    .into_iter()
484                    .map(Serde)
485                    .collect();
486                self.send_cookie_response(cookie_store_id, CookieData::GetAll(cookies));
487            },
488            CoreResourceMsg::NewCookieListener(cookie_store_id, sender, _url) => {
489                // TODO: Use the URL for setting up the actual monitoring
490                self.cookie_listeners.insert(cookie_store_id, sender);
491            },
492            CoreResourceMsg::RemoveCookieListener(cookie_store_id) => {
493                self.cookie_listeners.remove(&cookie_store_id);
494            },
495            CoreResourceMsg::NetworkMediator(mediator_chan, origin) => {
496                self.resource_manager
497                    .sw_managers
498                    .insert(origin, mediator_chan);
499            },
500            CoreResourceMsg::GetCookiesDataForUrl(url, consumer, source) => {
501                let mut cookie_jar = http_state.cookie_jar.write();
502                cookie_jar.remove_expired_cookies_for_url(&url);
503                let cookies = cookie_jar
504                    .cookies_data_for_url(&url, source)
505                    .map(Serde)
506                    .collect();
507                consumer.send(cookies).unwrap();
508            },
509            CoreResourceMsg::GetHistoryState(history_state_id, consumer) => {
510                let history_states = http_state.history_states.read();
511                consumer
512                    .send(history_states.get(&history_state_id).cloned())
513                    .unwrap();
514            },
515            CoreResourceMsg::SetHistoryState(history_state_id, structured_data) => {
516                let mut history_states = http_state.history_states.write();
517                history_states.insert(history_state_id, structured_data);
518            },
519            CoreResourceMsg::RemoveHistoryStates(states_to_remove) => {
520                let mut history_states = http_state.history_states.write();
521                for history_state in states_to_remove {
522                    history_states.remove(&history_state);
523                }
524            },
525            CoreResourceMsg::ClearCache => {
526                http_state.http_cache.write().clear();
527            },
528            CoreResourceMsg::ToFileManager(msg) => self.resource_manager.filemanager.handle(msg),
529            CoreResourceMsg::Exit(sender) => {
530                if let Some(ref config_dir) = self.config_dir {
531                    let auth_cache = http_state.auth_cache.read();
532                    base::write_json_to_file(&*auth_cache, config_dir, "auth_cache.json");
533                    let jar = http_state.cookie_jar.read();
534                    base::write_json_to_file(&*jar, config_dir, "cookie_jar.json");
535                    let hsts = http_state.hsts_list.read();
536                    base::write_json_to_file(&*hsts, config_dir, "hsts_list.json");
537                }
538                self.resource_manager.exit();
539                let _ = sender.send(());
540                return false;
541            },
542        }
543        true
544    }
545}
546
547#[derive(Clone, Debug, Deserialize, Serialize)]
548pub struct AuthCacheEntry {
549    pub user_name: String,
550    pub password: String,
551}
552
553impl Default for AuthCache {
554    fn default() -> Self {
555        Self {
556            version: 1,
557            entries: HashMap::new(),
558        }
559    }
560}
561
562#[derive(Clone, Debug, Deserialize, Serialize)]
563pub struct AuthCache {
564    pub version: u32,
565    pub entries: HashMap<String, AuthCacheEntry>,
566}
567
568pub struct CoreResourceManager {
569    devtools_sender: Option<Sender<DevtoolsControlMsg>>,
570    sw_managers: HashMap<ImmutableOrigin, IpcSender<CustomResponseMediator>>,
571    filemanager: FileManager,
572    request_interceptor: RequestInterceptor,
573    thread_pool: Arc<ThreadPool>,
574    ca_certificates: CACertificates,
575    ignore_certificate_errors: bool,
576}
577
578impl CoreResourceManager {
579    pub fn new(
580        devtools_sender: Option<Sender<DevtoolsControlMsg>>,
581        _profiler_chan: ProfilerChan,
582        embedder_proxy: EmbedderProxy,
583        ca_certificates: CACertificates,
584        ignore_certificate_errors: bool,
585    ) -> CoreResourceManager {
586        let num_threads = thread::available_parallelism()
587            .map(|i| i.get())
588            .unwrap_or(servo_config::pref!(threadpools_fallback_worker_num) as usize)
589            .min(servo_config::pref!(threadpools_resource_workers_max).max(1) as usize);
590        let pool = ThreadPool::new(num_threads, "CoreResourceThreadPool".to_string());
591        let pool_handle = Arc::new(pool);
592        CoreResourceManager {
593            devtools_sender,
594            sw_managers: Default::default(),
595            filemanager: FileManager::new(embedder_proxy.clone(), Arc::downgrade(&pool_handle)),
596            request_interceptor: RequestInterceptor::new(embedder_proxy),
597            thread_pool: pool_handle,
598            ca_certificates,
599            ignore_certificate_errors,
600        }
601    }
602
603    /// Exit the core resource manager.
604    pub fn exit(&mut self) {
605        // Prevents further work from being spawned on the pool,
606        // blocks until all workers in the pool are done,
607        // or a short timeout has been reached.
608        self.thread_pool.exit();
609
610        debug!("Exited CoreResourceManager");
611    }
612
613    fn set_cookie_for_url(
614        &mut self,
615        request: &ServoUrl,
616        cookie: Cookie<'static>,
617        source: CookieSource,
618        http_state: &Arc<HttpState>,
619    ) {
620        if let Some(cookie) = ServoCookie::new_wrapped(cookie, request, source) {
621            let mut cookie_jar = http_state.cookie_jar.write();
622            cookie_jar.push(cookie, request, source)
623        }
624    }
625
626    fn fetch<Target: 'static + FetchTaskTarget + Send>(
627        &self,
628        request_builder: RequestBuilder,
629        res_init_: Option<ResponseInit>,
630        mut sender: Target,
631        http_state: &Arc<HttpState>,
632        cancellation_listener: Arc<CancellationListener>,
633        protocols: Arc<ProtocolRegistry>,
634    ) {
635        let http_state = http_state.clone();
636        let dc = self.devtools_sender.clone();
637        let filemanager = self.filemanager.clone();
638        let request_interceptor = self.request_interceptor.clone();
639
640        let timing_type = match request_builder.destination {
641            Destination::Document => ResourceTimingType::Navigation,
642            _ => ResourceTimingType::Resource,
643        };
644
645        let request = request_builder.build();
646        let url = request.current_url();
647
648        // In the case of a valid blob URL, acquiring a token granting access to a file,
649        // regardless if the URL is revoked after token acquisition.
650        //
651        // TODO: to make more tests pass, acquire this token earlier,
652        // probably in a separate message flow.
653        //
654        // In such a setup, the token would not be acquired here,
655        // but could instead be contained in the actual CoreResourceMsg::Fetch message.
656        //
657        // See https://github.com/servo/servo/issues/25226
658        let (file_token, blob_url_file_id) = match url.scheme() {
659            "blob" => {
660                if let Ok((id, _)) = parse_blob_url(&url) {
661                    (self.filemanager.get_token_for_file(&id), Some(id))
662                } else {
663                    (FileTokenCheck::ShouldFail, None)
664                }
665            },
666            _ => (FileTokenCheck::NotRequired, None),
667        };
668
669        let ca_certificates = self.ca_certificates.clone();
670        let ignore_certificate_errors = self.ignore_certificate_errors;
671
672        spawn_task(async move {
673            // XXXManishearth: Check origin against pipeline id (also ensure that the mode is allowed)
674            // todo load context / mimesniff in fetch
675            // todo referrer policy?
676            // todo service worker stuff
677            let context = FetchContext {
678                state: http_state,
679                user_agent: servo_config::pref!(user_agent),
680                devtools_chan: dc.map(|dc| Arc::new(Mutex::new(dc))),
681                filemanager: Arc::new(Mutex::new(filemanager)),
682                file_token,
683                request_interceptor: Arc::new(Mutex::new(request_interceptor)),
684                cancellation_listener,
685                timing: ServoArc::new(Mutex::new(ResourceFetchTiming::new(request.timing_type()))),
686                protocols,
687                websocket_chan: None,
688                ca_certificates,
689                ignore_certificate_errors,
690            };
691
692            match res_init_ {
693                Some(res_init) => {
694                    let response = Response::from_init(res_init, timing_type);
695
696                    let mut fetch_params = FetchParams::new(request);
697                    http_redirect_fetch(
698                        &mut fetch_params,
699                        &mut CorsCache::default(),
700                        response,
701                        true,
702                        &mut sender,
703                        &mut None,
704                        &context,
705                    )
706                    .await;
707                },
708                None => {
709                    fetch(request, &mut sender, &context).await;
710                },
711            };
712
713            // Remove token after fetch.
714            if let Some(id) = blob_url_file_id.as_ref() {
715                context
716                    .filemanager
717                    .lock()
718                    .invalidate_token(&context.file_token, id);
719            }
720        });
721    }
722
723    /// <https://websockets.spec.whatwg.org/#concept-websocket-establish>
724    fn websocket_connect(
725        &self,
726        mut request: RequestBuilder,
727        event_sender: IpcSender<WebSocketNetworkEvent>,
728        action_receiver: IpcReceiver<WebSocketDomAction>,
729        http_state: &Arc<HttpState>,
730        cancellation_listener: Arc<CancellationListener>,
731        protocols: Arc<ProtocolRegistry>,
732    ) {
733        let http_state = http_state.clone();
734        let dc = self.devtools_sender.clone();
735        let filemanager = self.filemanager.clone();
736        let request_interceptor = self.request_interceptor.clone();
737
738        let ca_certificates = self.ca_certificates.clone();
739        let ignore_certificate_errors = self.ignore_certificate_errors;
740
741        spawn_task(async move {
742            let mut event_sender = event_sender;
743
744            // Let requestURL be a copy of url, with its scheme set to "http", if url’s scheme is
745            // "ws"; otherwise to "https"
746            let scheme = match request.url.scheme() {
747                "ws" => "http",
748                _ => "https",
749            };
750            request
751                .url
752                .as_mut_url()
753                .set_scheme(scheme)
754                .unwrap_or_else(|_| panic!("Can't set scheme to {scheme}"));
755
756            match create_handshake_request(request, http_state.clone()) {
757                Ok(request) => {
758                    let context = FetchContext {
759                        state: http_state,
760                        user_agent: servo_config::pref!(user_agent),
761                        devtools_chan: dc.map(|dc| Arc::new(Mutex::new(dc))),
762                        filemanager: Arc::new(Mutex::new(filemanager)),
763                        file_token: FileTokenCheck::NotRequired,
764                        request_interceptor: Arc::new(Mutex::new(request_interceptor)),
765                        cancellation_listener,
766                        timing: ServoArc::new(Mutex::new(ResourceFetchTiming::new(
767                            request.timing_type(),
768                        ))),
769                        protocols: protocols.clone(),
770                        websocket_chan: Some(Arc::new(Mutex::new(WebSocketChannel::new(
771                            event_sender.clone(),
772                            Some(action_receiver),
773                        )))),
774                        ca_certificates,
775                        ignore_certificate_errors,
776                    };
777                    fetch(request, &mut event_sender, &context).await;
778                },
779                Err(e) => {
780                    trace!("unable to create websocket handshake request {:?}", e);
781                    let _ = event_sender.send(WebSocketNetworkEvent::Fail);
782                },
783            }
784        });
785    }
786}