net/
resource_thread.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5//! A thread that takes a URL and streams back the binary data.
6
7use std::borrow::ToOwned;
8use std::collections::HashMap;
9use std::fs::File;
10use std::io::prelude::*;
11use std::io::{self, BufReader, BufWriter};
12use std::path::{Path, PathBuf};
13use std::sync::{Arc, Mutex, RwLock, Weak};
14use std::thread;
15use std::time::Duration;
16
17use base::generic_channel::GenericSender;
18use base::id::CookieStoreId;
19use cookie::Cookie;
20use crossbeam_channel::Sender;
21use devtools_traits::DevtoolsControlMsg;
22use embedder_traits::EmbedderProxy;
23use hyper_serde::Serde;
24use ipc_channel::ipc::{self, IpcReceiver, IpcReceiverSet, IpcSender};
25use log::{debug, trace, warn};
26use net_traits::blob_url_store::parse_blob_url;
27use net_traits::filemanager_thread::FileTokenCheck;
28use net_traits::indexeddb_thread::IndexedDBThreadMsg;
29use net_traits::pub_domains::public_suffix_list_size_of;
30use net_traits::request::{Destination, RequestBuilder, RequestId};
31use net_traits::response::{Response, ResponseInit};
32use net_traits::storage_thread::StorageThreadMsg;
33use net_traits::{
34    AsyncRuntime, CookieAsyncResponse, CookieData, CookieSource, CoreResourceMsg,
35    CoreResourceThread, CustomResponseMediator, DiscardFetch, FetchChannels, FetchTaskTarget,
36    ResourceFetchTiming, ResourceThreads, ResourceTimingType, WebSocketDomAction,
37    WebSocketNetworkEvent,
38};
39use profile_traits::mem::{
40    ProcessReports, ProfilerChan as MemProfilerChan, Report, ReportKind, ReportsChan,
41    perform_memory_report,
42};
43use profile_traits::path;
44use profile_traits::time::ProfilerChan;
45use rustls::RootCertStore;
46use serde::{Deserialize, Serialize};
47use servo_arc::Arc as ServoArc;
48use servo_url::{ImmutableOrigin, ServoUrl};
49
50use crate::async_runtime::{init_async_runtime, spawn_task};
51use crate::connector::{
52    CACertificates, CertificateErrorOverrideManager, create_http_client, create_tls_config,
53};
54use crate::cookie::ServoCookie;
55use crate::cookie_storage::CookieStorage;
56use crate::fetch::cors_cache::CorsCache;
57use crate::fetch::fetch_params::FetchParams;
58use crate::fetch::methods::{CancellationListener, FetchContext, fetch};
59use crate::filemanager_thread::FileManager;
60use crate::hsts::{self, HstsList};
61use crate::http_cache::HttpCache;
62use crate::http_loader::{HttpState, http_redirect_fetch};
63use crate::indexeddb::idb_thread::IndexedDBThreadFactory;
64use crate::protocols::ProtocolRegistry;
65use crate::request_interceptor::RequestInterceptor;
66use crate::storage_thread::StorageThreadFactory;
67use crate::websocket_loader;
68
69/// Load a file with CA certificate and produce a RootCertStore with the results.
70fn load_root_cert_store_from_file(file_path: String) -> io::Result<RootCertStore> {
71    let mut root_cert_store = RootCertStore::empty();
72
73    let mut pem = BufReader::new(File::open(file_path)?);
74    let certs: Result<Vec<_>, _> = rustls_pemfile::certs(&mut pem).collect();
75    root_cert_store.add_parsable_certificates(certs?);
76    Ok(root_cert_store)
77}
78
79/// Returns a tuple of (public, private) senders to the new threads.
80#[allow(clippy::too_many_arguments)]
81pub fn new_resource_threads(
82    devtools_sender: Option<Sender<DevtoolsControlMsg>>,
83    time_profiler_chan: ProfilerChan,
84    mem_profiler_chan: MemProfilerChan,
85    embedder_proxy: EmbedderProxy,
86    config_dir: Option<PathBuf>,
87    certificate_path: Option<String>,
88    ignore_certificate_errors: bool,
89    protocols: Arc<ProtocolRegistry>,
90) -> (ResourceThreads, ResourceThreads, Box<dyn AsyncRuntime>) {
91    // Initialize the async runtime, and get a handle to it for use in clean shutdown.
92    let async_runtime = init_async_runtime();
93
94    let ca_certificates = match certificate_path {
95        Some(path) => match load_root_cert_store_from_file(path) {
96            Ok(root_cert_store) => CACertificates::Override(root_cert_store),
97            Err(error) => {
98                warn!("Could not load CA file. Falling back to defaults. {error:?}");
99                CACertificates::Default
100            },
101        },
102        None => CACertificates::Default,
103    };
104
105    let (public_core, private_core) = new_core_resource_thread(
106        devtools_sender,
107        time_profiler_chan,
108        mem_profiler_chan.clone(),
109        embedder_proxy,
110        config_dir.clone(),
111        ca_certificates,
112        ignore_certificate_errors,
113        protocols,
114    );
115    let idb: IpcSender<IndexedDBThreadMsg> = IndexedDBThreadFactory::new(config_dir.clone());
116    let storage: GenericSender<StorageThreadMsg> =
117        StorageThreadFactory::new(config_dir, mem_profiler_chan);
118    (
119        ResourceThreads::new(public_core, storage.clone(), idb.clone()),
120        ResourceThreads::new(private_core, storage, idb),
121        async_runtime,
122    )
123}
124
125/// Create a CoreResourceThread
126#[allow(clippy::too_many_arguments)]
127pub fn new_core_resource_thread(
128    devtools_sender: Option<Sender<DevtoolsControlMsg>>,
129    time_profiler_chan: ProfilerChan,
130    mem_profiler_chan: MemProfilerChan,
131    embedder_proxy: EmbedderProxy,
132    config_dir: Option<PathBuf>,
133    ca_certificates: CACertificates,
134    ignore_certificate_errors: bool,
135    protocols: Arc<ProtocolRegistry>,
136) -> (CoreResourceThread, CoreResourceThread) {
137    let (public_setup_chan, public_setup_port) = ipc::channel().unwrap();
138    let (private_setup_chan, private_setup_port) = ipc::channel().unwrap();
139    let (report_chan, report_port) = ipc::channel().unwrap();
140
141    thread::Builder::new()
142        .name("ResourceManager".to_owned())
143        .spawn(move || {
144            let resource_manager = CoreResourceManager::new(
145                devtools_sender,
146                time_profiler_chan,
147                embedder_proxy.clone(),
148                ca_certificates.clone(),
149                ignore_certificate_errors,
150            );
151
152            let mut channel_manager = ResourceChannelManager {
153                resource_manager,
154                config_dir,
155                ca_certificates,
156                ignore_certificate_errors,
157                cancellation_listeners: Default::default(),
158                cookie_listeners: Default::default(),
159            };
160
161            mem_profiler_chan.run_with_memory_reporting(
162                || {
163                    channel_manager.start(
164                        public_setup_port,
165                        private_setup_port,
166                        report_port,
167                        protocols,
168                        embedder_proxy,
169                    )
170                },
171                String::from("network-cache-reporter"),
172                report_chan,
173                |report_chan| report_chan,
174            );
175        })
176        .expect("Thread spawning failed");
177    (public_setup_chan, private_setup_chan)
178}
179
/// Owns the [`CoreResourceManager`] and the per-thread bookkeeping needed to service
/// [`CoreResourceMsg`]s arriving on the public and private channels.
struct ResourceChannelManager {
    /// Performs fetches, websocket connections, cookie writes and file-manager requests.
    resource_manager: CoreResourceManager,
    /// Directory the auth cache, HSTS list and cookie jar are read from at start-up and
    /// written back to on exit. `None` disables both.
    config_dir: Option<PathBuf>,
    /// CA certificate configuration handed to `create_http_states`.
    ca_certificates: CACertificates,
    /// Forwarded to `create_http_states` when the HTTP clients are built.
    ignore_certificate_errors: bool,
    /// Weak handles to the cancellation listeners of requests, keyed by request id.
    /// Dead entries are pruned whenever a new listener is created.
    cancellation_listeners: HashMap<RequestId, Weak<CancellationListener>>,
    /// Senders used to deliver asynchronous cookie responses, keyed by cookie store id.
    cookie_listeners: HashMap<CookieStoreId, IpcSender<CookieAsyncResponse>>,
}
188
189fn create_http_states(
190    config_dir: Option<&Path>,
191    ca_certificates: CACertificates,
192    ignore_certificate_errors: bool,
193    embedder_proxy: EmbedderProxy,
194) -> (Arc<HttpState>, Arc<HttpState>) {
195    let mut hsts_list = HstsList::default();
196    let mut auth_cache = AuthCache::default();
197    let http_cache = HttpCache::default();
198    let mut cookie_jar = CookieStorage::new(150);
199    if let Some(config_dir) = config_dir {
200        read_json_from_file(&mut auth_cache, config_dir, "auth_cache.json");
201        read_json_from_file(&mut hsts_list, config_dir, "hsts_list.json");
202        read_json_from_file(&mut cookie_jar, config_dir, "cookie_jar.json");
203    }
204
205    let override_manager = CertificateErrorOverrideManager::new();
206    let http_state = HttpState {
207        hsts_list: RwLock::new(hsts_list),
208        cookie_jar: RwLock::new(cookie_jar),
209        auth_cache: RwLock::new(auth_cache),
210        history_states: RwLock::new(HashMap::new()),
211        http_cache: RwLock::new(http_cache),
212        http_cache_state: Mutex::new(HashMap::new()),
213        client: create_http_client(create_tls_config(
214            ca_certificates.clone(),
215            ignore_certificate_errors,
216            override_manager.clone(),
217        )),
218        override_manager,
219        embedder_proxy: Mutex::new(embedder_proxy.clone()),
220    };
221
222    let override_manager = CertificateErrorOverrideManager::new();
223    let private_http_state = HttpState {
224        hsts_list: RwLock::new(HstsList::default()),
225        cookie_jar: RwLock::new(CookieStorage::new(150)),
226        auth_cache: RwLock::new(AuthCache::default()),
227        history_states: RwLock::new(HashMap::new()),
228        http_cache: RwLock::new(HttpCache::default()),
229        http_cache_state: Mutex::new(HashMap::new()),
230        client: create_http_client(create_tls_config(
231            ca_certificates,
232            ignore_certificate_errors,
233            override_manager.clone(),
234        )),
235        override_manager,
236        embedder_proxy: Mutex::new(embedder_proxy),
237    };
238
239    (Arc::new(http_state), Arc::new(private_http_state))
240}
241
242impl ResourceChannelManager {
243    #[allow(unsafe_code)]
244    fn start(
245        &mut self,
246        public_receiver: IpcReceiver<CoreResourceMsg>,
247        private_receiver: IpcReceiver<CoreResourceMsg>,
248        memory_reporter: IpcReceiver<ReportsChan>,
249        protocols: Arc<ProtocolRegistry>,
250        embedder_proxy: EmbedderProxy,
251    ) {
252        let (public_http_state, private_http_state) = create_http_states(
253            self.config_dir.as_deref(),
254            self.ca_certificates.clone(),
255            self.ignore_certificate_errors,
256            embedder_proxy,
257        );
258
259        let mut rx_set = IpcReceiverSet::new().unwrap();
260        let private_id = rx_set.add(private_receiver).unwrap();
261        let public_id = rx_set.add(public_receiver).unwrap();
262        let reporter_id = rx_set.add(memory_reporter).unwrap();
263
264        loop {
265            for receiver in rx_set.select().unwrap().into_iter() {
266                // Handles case where profiler thread shuts down before resource thread.
267                if let ipc::IpcSelectionResult::ChannelClosed(..) = receiver {
268                    continue;
269                }
270                let (id, data) = receiver.unwrap();
271                // If message is memory report, get the size_of of public and private http caches
272                if id == reporter_id {
273                    if let Ok(msg) = data.to() {
274                        self.process_report(msg, &public_http_state, &private_http_state);
275                        continue;
276                    }
277                } else {
278                    let group = if id == private_id {
279                        &private_http_state
280                    } else {
281                        assert_eq!(id, public_id);
282                        &public_http_state
283                    };
284                    if let Ok(msg) = data.to() {
285                        if !self.process_msg(msg, group, Arc::clone(&protocols)) {
286                            return;
287                        }
288                    }
289                }
290            }
291        }
292    }
293
294    fn process_report(
295        &mut self,
296        msg: ReportsChan,
297        public_http_state: &Arc<HttpState>,
298        private_http_state: &Arc<HttpState>,
299    ) {
300        perform_memory_report(|ops| {
301            let mut reports = public_http_state.memory_reports("public", ops);
302            reports.extend(private_http_state.memory_reports("private", ops));
303            reports.extend(vec![
304                Report {
305                    path: path!["hsts-preload-list"],
306                    kind: ReportKind::ExplicitJemallocHeapSize,
307                    size: hsts::hsts_preload_size_of(ops),
308                },
309                Report {
310                    path: path!["public-suffix-list"],
311                    kind: ReportKind::ExplicitJemallocHeapSize,
312                    size: public_suffix_list_size_of(ops),
313                },
314            ]);
315            msg.send(ProcessReports::new(reports));
316        })
317    }
318
319    fn cancellation_listener(&self, request_id: RequestId) -> Option<Arc<CancellationListener>> {
320        self.cancellation_listeners
321            .get(&request_id)
322            .and_then(Weak::upgrade)
323    }
324
325    fn get_or_create_cancellation_listener(
326        &mut self,
327        request_id: RequestId,
328    ) -> Arc<CancellationListener> {
329        if let Some(listener) = self.cancellation_listener(request_id) {
330            return listener;
331        }
332
333        // Clear away any cancellation listeners that are no longer valid.
334        self.cancellation_listeners
335            .retain(|_, listener| listener.strong_count() > 0);
336
337        let cancellation_listener = Arc::new(Default::default());
338        self.cancellation_listeners
339            .insert(request_id, Arc::downgrade(&cancellation_listener));
340        cancellation_listener
341    }
342
343    fn send_cookie_response(&self, store_id: CookieStoreId, data: CookieData) {
344        let Some(sender) = self.cookie_listeners.get(&store_id) else {
345            warn!(
346                "Async cookie request made for store id that is non-existent {:?}",
347                store_id
348            );
349            return;
350        };
351        let res = sender.send(CookieAsyncResponse { data });
352        if res.is_err() {
353            warn!("Unable to send cookie response to script thread");
354        }
355    }
356
357    /// Returns false if the thread should exit.
358    fn process_msg(
359        &mut self,
360        msg: CoreResourceMsg,
361        http_state: &Arc<HttpState>,
362        protocols: Arc<ProtocolRegistry>,
363    ) -> bool {
364        match msg {
365            CoreResourceMsg::Fetch(request_builder, channels) => match channels {
366                FetchChannels::ResponseMsg(sender) => {
367                    let cancellation_listener =
368                        self.get_or_create_cancellation_listener(request_builder.id);
369                    self.resource_manager.fetch(
370                        request_builder,
371                        None,
372                        sender,
373                        http_state,
374                        cancellation_listener,
375                        protocols,
376                    );
377                },
378                FetchChannels::WebSocket {
379                    event_sender,
380                    action_receiver,
381                } => self.resource_manager.websocket_connect(
382                    request_builder,
383                    event_sender,
384                    action_receiver,
385                    http_state,
386                ),
387                FetchChannels::Prefetch => self.resource_manager.fetch(
388                    request_builder,
389                    None,
390                    DiscardFetch,
391                    http_state,
392                    Arc::new(Default::default()),
393                    protocols,
394                ),
395            },
396            CoreResourceMsg::Cancel(request_ids) => {
397                for cancellation_listener in request_ids
398                    .into_iter()
399                    .filter_map(|request_id| self.cancellation_listener(request_id))
400                {
401                    cancellation_listener.cancel();
402                }
403            },
404            CoreResourceMsg::DeleteCookies(request) => {
405                http_state
406                    .cookie_jar
407                    .write()
408                    .unwrap()
409                    .clear_storage(&request);
410                return true;
411            },
412            CoreResourceMsg::DeleteCookie(request, name) => {
413                http_state
414                    .cookie_jar
415                    .write()
416                    .unwrap()
417                    .delete_cookie_with_name(&request, name);
418                return true;
419            },
420            CoreResourceMsg::DeleteCookieAsync(cookie_store_id, url, name) => {
421                http_state
422                    .cookie_jar
423                    .write()
424                    .unwrap()
425                    .delete_cookie_with_name(&url, name);
426                self.send_cookie_response(cookie_store_id, CookieData::Delete(Ok(())));
427            },
428            CoreResourceMsg::FetchRedirect(request_builder, res_init, sender) => {
429                let cancellation_listener =
430                    self.get_or_create_cancellation_listener(request_builder.id);
431                self.resource_manager.fetch(
432                    request_builder,
433                    Some(res_init),
434                    sender,
435                    http_state,
436                    cancellation_listener,
437                    protocols,
438                )
439            },
440            CoreResourceMsg::SetCookieForUrl(request, cookie, source) => self
441                .resource_manager
442                .set_cookie_for_url(&request, cookie.into_inner().to_owned(), source, http_state),
443            CoreResourceMsg::SetCookiesForUrl(request, cookies, source) => {
444                for cookie in cookies {
445                    self.resource_manager.set_cookie_for_url(
446                        &request,
447                        cookie.into_inner(),
448                        source,
449                        http_state,
450                    );
451                }
452            },
453            CoreResourceMsg::SetCookieForUrlAsync(cookie_store_id, url, cookie, source) => {
454                self.resource_manager.set_cookie_for_url(
455                    &url,
456                    cookie.into_inner().to_owned(),
457                    source,
458                    http_state,
459                );
460                self.send_cookie_response(cookie_store_id, CookieData::Set(Ok(())));
461            },
462            CoreResourceMsg::GetCookiesForUrl(url, consumer, source) => {
463                let mut cookie_jar = http_state.cookie_jar.write().unwrap();
464                cookie_jar.remove_expired_cookies_for_url(&url);
465                consumer
466                    .send(cookie_jar.cookies_for_url(&url, source))
467                    .unwrap();
468            },
469            CoreResourceMsg::GetCookieDataForUrlAsync(cookie_store_id, url, name) => {
470                let mut cookie_jar = http_state.cookie_jar.write().unwrap();
471                cookie_jar.remove_expired_cookies_for_url(&url);
472                let cookie = cookie_jar
473                    .query_cookies(&url, name)
474                    .into_iter()
475                    .map(Serde)
476                    .next();
477                self.send_cookie_response(cookie_store_id, CookieData::Get(cookie));
478            },
479            CoreResourceMsg::GetAllCookieDataForUrlAsync(cookie_store_id, url, name) => {
480                let mut cookie_jar = http_state.cookie_jar.write().unwrap();
481                cookie_jar.remove_expired_cookies_for_url(&url);
482                let cookies = cookie_jar
483                    .query_cookies(&url, name)
484                    .into_iter()
485                    .map(Serde)
486                    .collect();
487                self.send_cookie_response(cookie_store_id, CookieData::GetAll(cookies));
488            },
489            CoreResourceMsg::NewCookieListener(cookie_store_id, sender, _url) => {
490                // TODO: Use the URL for setting up the actual monitoring
491                self.cookie_listeners.insert(cookie_store_id, sender);
492            },
493            CoreResourceMsg::RemoveCookieListener(cookie_store_id) => {
494                self.cookie_listeners.remove(&cookie_store_id);
495            },
496            CoreResourceMsg::NetworkMediator(mediator_chan, origin) => {
497                self.resource_manager
498                    .sw_managers
499                    .insert(origin, mediator_chan);
500            },
501            CoreResourceMsg::GetCookiesDataForUrl(url, consumer, source) => {
502                let mut cookie_jar = http_state.cookie_jar.write().unwrap();
503                cookie_jar.remove_expired_cookies_for_url(&url);
504                let cookies = cookie_jar
505                    .cookies_data_for_url(&url, source)
506                    .map(Serde)
507                    .collect();
508                consumer.send(cookies).unwrap();
509            },
510            CoreResourceMsg::GetHistoryState(history_state_id, consumer) => {
511                let history_states = http_state.history_states.read().unwrap();
512                consumer
513                    .send(history_states.get(&history_state_id).cloned())
514                    .unwrap();
515            },
516            CoreResourceMsg::SetHistoryState(history_state_id, structured_data) => {
517                let mut history_states = http_state.history_states.write().unwrap();
518                history_states.insert(history_state_id, structured_data);
519            },
520            CoreResourceMsg::RemoveHistoryStates(states_to_remove) => {
521                let mut history_states = http_state.history_states.write().unwrap();
522                for history_state in states_to_remove {
523                    history_states.remove(&history_state);
524                }
525            },
526            CoreResourceMsg::ClearCache => {
527                http_state.http_cache.write().unwrap().clear();
528            },
529            CoreResourceMsg::ToFileManager(msg) => self.resource_manager.filemanager.handle(msg),
530            CoreResourceMsg::Exit(sender) => {
531                if let Some(ref config_dir) = self.config_dir {
532                    match http_state.auth_cache.read() {
533                        Ok(auth_cache) => {
534                            write_json_to_file(&*auth_cache, config_dir, "auth_cache.json")
535                        },
536                        Err(_) => warn!("Error writing auth cache to disk"),
537                    }
538                    match http_state.cookie_jar.read() {
539                        Ok(jar) => write_json_to_file(&*jar, config_dir, "cookie_jar.json"),
540                        Err(_) => warn!("Error writing cookie jar to disk"),
541                    }
542                    match http_state.hsts_list.read() {
543                        Ok(hsts) => write_json_to_file(&*hsts, config_dir, "hsts_list.json"),
544                        Err(_) => warn!("Error writing hsts list to disk"),
545                    }
546                }
547                self.resource_manager.exit();
548                let _ = sender.send(());
549                return false;
550            },
551        }
552        true
553    }
554}
555
556pub fn read_json_from_file<T>(data: &mut T, config_dir: &Path, filename: &str)
557where
558    T: for<'de> Deserialize<'de>,
559{
560    let path = config_dir.join(filename);
561    let display = path.display();
562
563    let mut file = match File::open(&path) {
564        Err(why) => {
565            warn!("couldn't open {}: {}", display, why);
566            return;
567        },
568        Ok(file) => file,
569    };
570
571    let mut string_buffer: String = String::new();
572    match file.read_to_string(&mut string_buffer) {
573        Err(why) => panic!("couldn't read from {}: {}", display, why),
574        Ok(_) => trace!("successfully read from {}", display),
575    }
576
577    match serde_json::from_str(&string_buffer) {
578        Ok(decoded_buffer) => *data = decoded_buffer,
579        Err(why) => warn!("Could not decode buffer{}", why),
580    }
581}
582
583pub fn write_json_to_file<T>(data: &T, config_dir: &Path, filename: &str)
584where
585    T: Serialize,
586{
587    let path = config_dir.join(filename);
588    let display = path.display();
589
590    let mut file = match File::create(&path) {
591        Err(why) => panic!("couldn't create {}: {}", display, why),
592        Ok(file) => file,
593    };
594    let mut writer = BufWriter::new(&mut file);
595    serde_json::to_writer_pretty(&mut writer, data).expect("Could not serialize to file");
596    trace!("successfully wrote to {display}");
597}
598
/// A single set of credentials stored as a value in [`AuthCache`].
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct AuthCacheEntry {
    /// User name part of the credentials.
    pub user_name: String,
    /// Password part of the credentials, held as a plain string.
    pub password: String,
}
604
605impl Default for AuthCache {
606    fn default() -> Self {
607        Self {
608            version: 1,
609            entries: HashMap::new(),
610        }
611    }
612}
613
/// Cache of authentication credentials; it is loaded from and saved to
/// `auth_cache.json` when a config directory is in use.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct AuthCache {
    /// Version number of the cache; the default value is 1.
    pub version: u32,
    /// Stored credentials, keyed by a string.
    // NOTE(review): the key format is chosen by the code that populates the cache,
    // which is not visible here — confirm before relying on it.
    pub entries: HashMap<String, AuthCacheEntry>,
}
619
/// State shared by all requests handled on the core resource thread.
pub struct CoreResourceManager {
    /// Optional channel to devtools; cloned for every fetch.
    devtools_sender: Option<Sender<DevtoolsControlMsg>>,
    /// Mediator channels registered through `CoreResourceMsg::NetworkMediator`, keyed by origin.
    sw_managers: HashMap<ImmutableOrigin, IpcSender<CustomResponseMediator>>,
    /// Handles `CoreResourceMsg::ToFileManager` messages; cloned for every fetch.
    filemanager: FileManager,
    /// Built from the embedder proxy; cloned for every fetch.
    request_interceptor: RequestInterceptor,
    /// Worker pool owned by this manager. The file manager holds a weak reference to it.
    thread_pool: Arc<CoreResourceThreadPool>,
    /// CA certificate configuration supplied at construction.
    ca_certificates: CACertificates,
    /// Certificate-error setting supplied at construction.
    ignore_certificate_errors: bool,
}
629
/// The state of the thread-pool used by CoreResource.
struct ThreadPoolState {
    /// The number of active workers.
    active_workers: u32,
    /// Whether the pool can spawn additional work.
    active: bool,
}

impl ThreadPoolState {
    /// Create the state of a fresh pool: no active workers, spawning allowed.
    pub fn new() -> ThreadPoolState {
        ThreadPoolState {
            active_workers: 0,
            active: true,
        }
    }

    /// Is the pool still able to spawn new work?
    pub fn is_active(&self) -> bool {
        self.active
    }

    /// How many workers are currently active?
    pub fn active_workers(&self) -> u32 {
        self.active_workers
    }

    /// Prevent additional work from being spawned.
    pub fn switch_to_inactive(&mut self) {
        self.active = false;
    }

    /// Add to the count of active workers.
    pub fn increment_active(&mut self) {
        self.active_workers += 1;
    }

    /// Subtract from the count of active workers.
    ///
    /// The count never goes below zero. An unbalanced decrement would otherwise wrap to
    /// `u32::MAX` in builds without overflow checks and make the pool look permanently
    /// busy to anything waiting for the workers to finish.
    pub fn decrement_active(&mut self) {
        self.active_workers = self.active_workers.saturating_sub(1);
    }
}
671
/// Threadpool used by Fetch and file operations.
pub struct CoreResourceThreadPool {
    /// The rayon pool that runs the spawned work.
    pool: rayon::ThreadPool,
    /// Worker count and active flag, shared with the closures running on the pool.
    state: Arc<Mutex<ThreadPoolState>>,
}
677
678impl CoreResourceThreadPool {
679    pub fn new(num_threads: usize, pool_name: String) -> CoreResourceThreadPool {
680        debug!("Creating new CoreResourceThreadPool with {num_threads} threads!");
681        let pool = rayon::ThreadPoolBuilder::new()
682            .thread_name(move |i| format!("{pool_name}#{i}"))
683            .num_threads(num_threads)
684            .build()
685            .unwrap();
686        let state = Arc::new(Mutex::new(ThreadPoolState::new()));
687        CoreResourceThreadPool { pool, state }
688    }
689
690    /// Spawn work on the thread-pool, if still active.
691    ///
692    /// There is no need to give feedback to the caller,
693    /// because if we do not perform work,
694    /// it is because the system as a whole is exiting.
695    pub fn spawn<OP>(&self, work: OP)
696    where
697        OP: FnOnce() + Send + 'static,
698    {
699        {
700            let mut state = self.state.lock().unwrap();
701            if state.is_active() {
702                state.increment_active();
703            } else {
704                // Don't spawn any work.
705                return;
706            }
707        }
708
709        let state = self.state.clone();
710
711        self.pool.spawn(move || {
712            {
713                let mut state = state.lock().unwrap();
714                if !state.is_active() {
715                    // Decrement number of active workers and return,
716                    // without doing any work.
717                    return state.decrement_active();
718                }
719            }
720            // Perform work.
721            work();
722            {
723                // Decrement number of active workers.
724                let mut state = state.lock().unwrap();
725                state.decrement_active();
726            }
727        });
728    }
729
730    /// Prevent further work from being spawned,
731    /// and wait until all workers are done,
732    /// or a timeout of roughly one second has been reached.
733    pub fn exit(&self) {
734        {
735            let mut state = self.state.lock().unwrap();
736            state.switch_to_inactive();
737        }
738        let mut rounds = 0;
739        loop {
740            rounds += 1;
741            {
742                let state = self.state.lock().unwrap();
743                let still_active = state.active_workers();
744
745                if still_active == 0 || rounds == 10 {
746                    if still_active > 0 {
747                        debug!(
748                            "Exiting CoreResourceThreadPool with {:?} still working(should be zero)",
749                            still_active
750                        );
751                    }
752                    break;
753                }
754            }
755            thread::sleep(Duration::from_millis(100));
756        }
757    }
758}
759
760impl CoreResourceManager {
761    pub fn new(
762        devtools_sender: Option<Sender<DevtoolsControlMsg>>,
763        _profiler_chan: ProfilerChan,
764        embedder_proxy: EmbedderProxy,
765        ca_certificates: CACertificates,
766        ignore_certificate_errors: bool,
767    ) -> CoreResourceManager {
768        let num_threads = thread::available_parallelism()
769            .map(|i| i.get())
770            .unwrap_or(servo_config::pref!(threadpools_fallback_worker_num) as usize)
771            .min(servo_config::pref!(threadpools_resource_workers_max).max(1) as usize);
772        let pool = CoreResourceThreadPool::new(num_threads, "CoreResourceThreadPool".to_string());
773        let pool_handle = Arc::new(pool);
774        CoreResourceManager {
775            devtools_sender,
776            sw_managers: Default::default(),
777            filemanager: FileManager::new(embedder_proxy.clone(), Arc::downgrade(&pool_handle)),
778            request_interceptor: RequestInterceptor::new(embedder_proxy),
779            thread_pool: pool_handle,
780            ca_certificates,
781            ignore_certificate_errors,
782        }
783    }
784
    /// Exit the core resource manager.
    ///
    /// Delegates to the thread pool's `exit`, so this call blocks until the pool's
    /// workers are done or the pool's own timeout expires.
    pub fn exit(&mut self) {
        // Prevents further work from being spawned on the pool,
        // blocks until all workers in the pool are done,
        // or a short timeout has been reached.
        self.thread_pool.exit();

        debug!("Exited CoreResourceManager");
    }
794
795    fn set_cookie_for_url(
796        &mut self,
797        request: &ServoUrl,
798        cookie: Cookie<'static>,
799        source: CookieSource,
800        http_state: &Arc<HttpState>,
801    ) {
802        if let Some(cookie) = ServoCookie::new_wrapped(cookie, request, source) {
803            let mut cookie_jar = http_state.cookie_jar.write().unwrap();
804            cookie_jar.push(cookie, request, source)
805        }
806    }
807
808    fn fetch<Target: 'static + FetchTaskTarget + Send>(
809        &self,
810        request_builder: RequestBuilder,
811        res_init_: Option<ResponseInit>,
812        mut sender: Target,
813        http_state: &Arc<HttpState>,
814        cancellation_listener: Arc<CancellationListener>,
815        protocols: Arc<ProtocolRegistry>,
816    ) {
817        let http_state = http_state.clone();
818        let dc = self.devtools_sender.clone();
819        let filemanager = self.filemanager.clone();
820        let request_interceptor = self.request_interceptor.clone();
821
822        let timing_type = match request_builder.destination {
823            Destination::Document => ResourceTimingType::Navigation,
824            _ => ResourceTimingType::Resource,
825        };
826
827        let request = request_builder.build();
828        let url = request.current_url();
829
830        // In the case of a valid blob URL, acquiring a token granting access to a file,
831        // regardless if the URL is revoked after token acquisition.
832        //
833        // TODO: to make more tests pass, acquire this token earlier,
834        // probably in a separate message flow.
835        //
836        // In such a setup, the token would not be acquired here,
837        // but could instead be contained in the actual CoreResourceMsg::Fetch message.
838        //
839        // See https://github.com/servo/servo/issues/25226
840        let (file_token, blob_url_file_id) = match url.scheme() {
841            "blob" => {
842                if let Ok((id, _)) = parse_blob_url(&url) {
843                    (self.filemanager.get_token_for_file(&id), Some(id))
844                } else {
845                    (FileTokenCheck::ShouldFail, None)
846                }
847            },
848            _ => (FileTokenCheck::NotRequired, None),
849        };
850
851        spawn_task(async move {
852            // XXXManishearth: Check origin against pipeline id (also ensure that the mode is allowed)
853            // todo load context / mimesniff in fetch
854            // todo referrer policy?
855            // todo service worker stuff
856            let context = FetchContext {
857                state: http_state,
858                user_agent: servo_config::pref!(user_agent),
859                devtools_chan: dc.map(|dc| Arc::new(Mutex::new(dc))),
860                filemanager: Arc::new(Mutex::new(filemanager)),
861                file_token,
862                request_interceptor: Arc::new(Mutex::new(request_interceptor)),
863                cancellation_listener,
864                timing: ServoArc::new(Mutex::new(ResourceFetchTiming::new(request.timing_type()))),
865                protocols,
866            };
867
868            match res_init_ {
869                Some(res_init) => {
870                    let response = Response::from_init(res_init, timing_type);
871
872                    let mut fetch_params = FetchParams::new(request);
873                    http_redirect_fetch(
874                        &mut fetch_params,
875                        &mut CorsCache::default(),
876                        response,
877                        true,
878                        &mut sender,
879                        &mut None,
880                        &context,
881                    )
882                    .await;
883                },
884                None => {
885                    fetch(request, &mut sender, &context).await;
886                },
887            };
888
889            // Remove token after fetch.
890            if let Some(id) = blob_url_file_id.as_ref() {
891                context
892                    .filemanager
893                    .lock()
894                    .unwrap()
895                    .invalidate_token(&context.file_token, id);
896            }
897        });
898    }
899
900    fn websocket_connect(
901        &self,
902        request: RequestBuilder,
903        event_sender: IpcSender<WebSocketNetworkEvent>,
904        action_receiver: IpcReceiver<WebSocketDomAction>,
905        http_state: &Arc<HttpState>,
906    ) {
907        websocket_loader::init(
908            request,
909            event_sender,
910            action_receiver,
911            http_state.clone(),
912            self.ca_certificates.clone(),
913            self.ignore_certificate_errors,
914        );
915    }
916}