1use std::borrow::ToOwned;
8use std::collections::HashMap;
9use std::fs::File;
10use std::io::prelude::*;
11use std::io::{self, BufReader, BufWriter};
12use std::path::{Path, PathBuf};
13use std::sync::{Arc, Mutex, RwLock, Weak};
14use std::thread;
15use std::time::Duration;
16
17use base::generic_channel::GenericSender;
18use base::id::CookieStoreId;
19use cookie::Cookie;
20use crossbeam_channel::Sender;
21use devtools_traits::DevtoolsControlMsg;
22use embedder_traits::EmbedderProxy;
23use hyper_serde::Serde;
24use ipc_channel::ipc::{self, IpcReceiver, IpcReceiverSet, IpcSender};
25use log::{debug, trace, warn};
26use net_traits::blob_url_store::parse_blob_url;
27use net_traits::filemanager_thread::FileTokenCheck;
28use net_traits::indexeddb_thread::IndexedDBThreadMsg;
29use net_traits::pub_domains::public_suffix_list_size_of;
30use net_traits::request::{Destination, RequestBuilder, RequestId};
31use net_traits::response::{Response, ResponseInit};
32use net_traits::storage_thread::StorageThreadMsg;
33use net_traits::{
34 AsyncRuntime, CookieAsyncResponse, CookieData, CookieSource, CoreResourceMsg,
35 CoreResourceThread, CustomResponseMediator, DiscardFetch, FetchChannels, FetchTaskTarget,
36 ResourceFetchTiming, ResourceThreads, ResourceTimingType, WebSocketDomAction,
37 WebSocketNetworkEvent,
38};
39use profile_traits::mem::{
40 ProcessReports, ProfilerChan as MemProfilerChan, Report, ReportKind, ReportsChan,
41 perform_memory_report,
42};
43use profile_traits::path;
44use profile_traits::time::ProfilerChan;
45use rustls::RootCertStore;
46use serde::{Deserialize, Serialize};
47use servo_arc::Arc as ServoArc;
48use servo_url::{ImmutableOrigin, ServoUrl};
49
50use crate::async_runtime::{init_async_runtime, spawn_task};
51use crate::connector::{
52 CACertificates, CertificateErrorOverrideManager, create_http_client, create_tls_config,
53};
54use crate::cookie::ServoCookie;
55use crate::cookie_storage::CookieStorage;
56use crate::fetch::cors_cache::CorsCache;
57use crate::fetch::fetch_params::FetchParams;
58use crate::fetch::methods::{CancellationListener, FetchContext, fetch};
59use crate::filemanager_thread::FileManager;
60use crate::hsts::{self, HstsList};
61use crate::http_cache::HttpCache;
62use crate::http_loader::{HttpState, http_redirect_fetch};
63use crate::indexeddb::idb_thread::IndexedDBThreadFactory;
64use crate::protocols::ProtocolRegistry;
65use crate::request_interceptor::RequestInterceptor;
66use crate::storage_thread::StorageThreadFactory;
67use crate::websocket_loader;
68
69fn load_root_cert_store_from_file(file_path: String) -> io::Result<RootCertStore> {
71 let mut root_cert_store = RootCertStore::empty();
72
73 let mut pem = BufReader::new(File::open(file_path)?);
74 let certs: Result<Vec<_>, _> = rustls_pemfile::certs(&mut pem).collect();
75 root_cert_store.add_parsable_certificates(certs?);
76 Ok(root_cert_store)
77}
78
79#[allow(clippy::too_many_arguments)]
81pub fn new_resource_threads(
82 devtools_sender: Option<Sender<DevtoolsControlMsg>>,
83 time_profiler_chan: ProfilerChan,
84 mem_profiler_chan: MemProfilerChan,
85 embedder_proxy: EmbedderProxy,
86 config_dir: Option<PathBuf>,
87 certificate_path: Option<String>,
88 ignore_certificate_errors: bool,
89 protocols: Arc<ProtocolRegistry>,
90) -> (ResourceThreads, ResourceThreads, Box<dyn AsyncRuntime>) {
91 let async_runtime = init_async_runtime();
93
94 let ca_certificates = match certificate_path {
95 Some(path) => match load_root_cert_store_from_file(path) {
96 Ok(root_cert_store) => CACertificates::Override(root_cert_store),
97 Err(error) => {
98 warn!("Could not load CA file. Falling back to defaults. {error:?}");
99 CACertificates::Default
100 },
101 },
102 None => CACertificates::Default,
103 };
104
105 let (public_core, private_core) = new_core_resource_thread(
106 devtools_sender,
107 time_profiler_chan,
108 mem_profiler_chan.clone(),
109 embedder_proxy,
110 config_dir.clone(),
111 ca_certificates,
112 ignore_certificate_errors,
113 protocols,
114 );
115 let idb: IpcSender<IndexedDBThreadMsg> = IndexedDBThreadFactory::new(config_dir.clone());
116 let storage: GenericSender<StorageThreadMsg> =
117 StorageThreadFactory::new(config_dir, mem_profiler_chan);
118 (
119 ResourceThreads::new(public_core, storage.clone(), idb.clone()),
120 ResourceThreads::new(private_core, storage, idb),
121 async_runtime,
122 )
123}
124
125#[allow(clippy::too_many_arguments)]
127pub fn new_core_resource_thread(
128 devtools_sender: Option<Sender<DevtoolsControlMsg>>,
129 time_profiler_chan: ProfilerChan,
130 mem_profiler_chan: MemProfilerChan,
131 embedder_proxy: EmbedderProxy,
132 config_dir: Option<PathBuf>,
133 ca_certificates: CACertificates,
134 ignore_certificate_errors: bool,
135 protocols: Arc<ProtocolRegistry>,
136) -> (CoreResourceThread, CoreResourceThread) {
137 let (public_setup_chan, public_setup_port) = ipc::channel().unwrap();
138 let (private_setup_chan, private_setup_port) = ipc::channel().unwrap();
139 let (report_chan, report_port) = ipc::channel().unwrap();
140
141 thread::Builder::new()
142 .name("ResourceManager".to_owned())
143 .spawn(move || {
144 let resource_manager = CoreResourceManager::new(
145 devtools_sender,
146 time_profiler_chan,
147 embedder_proxy.clone(),
148 ca_certificates.clone(),
149 ignore_certificate_errors,
150 );
151
152 let mut channel_manager = ResourceChannelManager {
153 resource_manager,
154 config_dir,
155 ca_certificates,
156 ignore_certificate_errors,
157 cancellation_listeners: Default::default(),
158 cookie_listeners: Default::default(),
159 };
160
161 mem_profiler_chan.run_with_memory_reporting(
162 || {
163 channel_manager.start(
164 public_setup_port,
165 private_setup_port,
166 report_port,
167 protocols,
168 embedder_proxy,
169 )
170 },
171 String::from("network-cache-reporter"),
172 report_chan,
173 |report_chan| report_chan,
174 );
175 })
176 .expect("Thread spawning failed");
177 (public_setup_chan, private_setup_chan)
178}
179
/// Owns the state of the resource thread's message loop: it receives
/// `CoreResourceMsg`s and dispatches them to the wrapped
/// [`CoreResourceManager`] or to the HTTP state of the matching group.
struct ResourceChannelManager {
    /// Performs fetches, websocket connections and cookie writes.
    resource_manager: CoreResourceManager,
    /// Directory the auth cache, cookie jar and HSTS list are loaded from at
    /// start-up and written back to on `Exit`; `None` disables both.
    config_dir: Option<PathBuf>,
    /// CA certificates handed to the HTTP states created in `start`.
    ca_certificates: CACertificates,
    /// Forwarded to the TLS configuration of the HTTP states created in `start`.
    ignore_certificate_errors: bool,
    /// Weak handles to the cancellation listeners of requests, keyed by
    /// request id; entries whose listener has been dropped are pruned lazily.
    cancellation_listeners: HashMap<RequestId, Weak<CancellationListener>>,
    /// Senders used to deliver asynchronous cookie responses, keyed by the
    /// cookie store that registered them.
    cookie_listeners: HashMap<CookieStoreId, IpcSender<CookieAsyncResponse>>,
}
188
189fn create_http_states(
190 config_dir: Option<&Path>,
191 ca_certificates: CACertificates,
192 ignore_certificate_errors: bool,
193 embedder_proxy: EmbedderProxy,
194) -> (Arc<HttpState>, Arc<HttpState>) {
195 let mut hsts_list = HstsList::default();
196 let mut auth_cache = AuthCache::default();
197 let http_cache = HttpCache::default();
198 let mut cookie_jar = CookieStorage::new(150);
199 if let Some(config_dir) = config_dir {
200 read_json_from_file(&mut auth_cache, config_dir, "auth_cache.json");
201 read_json_from_file(&mut hsts_list, config_dir, "hsts_list.json");
202 read_json_from_file(&mut cookie_jar, config_dir, "cookie_jar.json");
203 }
204
205 let override_manager = CertificateErrorOverrideManager::new();
206 let http_state = HttpState {
207 hsts_list: RwLock::new(hsts_list),
208 cookie_jar: RwLock::new(cookie_jar),
209 auth_cache: RwLock::new(auth_cache),
210 history_states: RwLock::new(HashMap::new()),
211 http_cache: RwLock::new(http_cache),
212 http_cache_state: Mutex::new(HashMap::new()),
213 client: create_http_client(create_tls_config(
214 ca_certificates.clone(),
215 ignore_certificate_errors,
216 override_manager.clone(),
217 )),
218 override_manager,
219 embedder_proxy: Mutex::new(embedder_proxy.clone()),
220 };
221
222 let override_manager = CertificateErrorOverrideManager::new();
223 let private_http_state = HttpState {
224 hsts_list: RwLock::new(HstsList::default()),
225 cookie_jar: RwLock::new(CookieStorage::new(150)),
226 auth_cache: RwLock::new(AuthCache::default()),
227 history_states: RwLock::new(HashMap::new()),
228 http_cache: RwLock::new(HttpCache::default()),
229 http_cache_state: Mutex::new(HashMap::new()),
230 client: create_http_client(create_tls_config(
231 ca_certificates,
232 ignore_certificate_errors,
233 override_manager.clone(),
234 )),
235 override_manager,
236 embedder_proxy: Mutex::new(embedder_proxy),
237 };
238
239 (Arc::new(http_state), Arc::new(private_http_state))
240}
241
242impl ResourceChannelManager {
243 #[allow(unsafe_code)]
244 fn start(
245 &mut self,
246 public_receiver: IpcReceiver<CoreResourceMsg>,
247 private_receiver: IpcReceiver<CoreResourceMsg>,
248 memory_reporter: IpcReceiver<ReportsChan>,
249 protocols: Arc<ProtocolRegistry>,
250 embedder_proxy: EmbedderProxy,
251 ) {
252 let (public_http_state, private_http_state) = create_http_states(
253 self.config_dir.as_deref(),
254 self.ca_certificates.clone(),
255 self.ignore_certificate_errors,
256 embedder_proxy,
257 );
258
259 let mut rx_set = IpcReceiverSet::new().unwrap();
260 let private_id = rx_set.add(private_receiver).unwrap();
261 let public_id = rx_set.add(public_receiver).unwrap();
262 let reporter_id = rx_set.add(memory_reporter).unwrap();
263
264 loop {
265 for receiver in rx_set.select().unwrap().into_iter() {
266 if let ipc::IpcSelectionResult::ChannelClosed(..) = receiver {
268 continue;
269 }
270 let (id, data) = receiver.unwrap();
271 if id == reporter_id {
273 if let Ok(msg) = data.to() {
274 self.process_report(msg, &public_http_state, &private_http_state);
275 continue;
276 }
277 } else {
278 let group = if id == private_id {
279 &private_http_state
280 } else {
281 assert_eq!(id, public_id);
282 &public_http_state
283 };
284 if let Ok(msg) = data.to() {
285 if !self.process_msg(msg, group, Arc::clone(&protocols)) {
286 return;
287 }
288 }
289 }
290 }
291 }
292 }
293
294 fn process_report(
295 &mut self,
296 msg: ReportsChan,
297 public_http_state: &Arc<HttpState>,
298 private_http_state: &Arc<HttpState>,
299 ) {
300 perform_memory_report(|ops| {
301 let mut reports = public_http_state.memory_reports("public", ops);
302 reports.extend(private_http_state.memory_reports("private", ops));
303 reports.extend(vec![
304 Report {
305 path: path!["hsts-preload-list"],
306 kind: ReportKind::ExplicitJemallocHeapSize,
307 size: hsts::hsts_preload_size_of(ops),
308 },
309 Report {
310 path: path!["public-suffix-list"],
311 kind: ReportKind::ExplicitJemallocHeapSize,
312 size: public_suffix_list_size_of(ops),
313 },
314 ]);
315 msg.send(ProcessReports::new(reports));
316 })
317 }
318
319 fn cancellation_listener(&self, request_id: RequestId) -> Option<Arc<CancellationListener>> {
320 self.cancellation_listeners
321 .get(&request_id)
322 .and_then(Weak::upgrade)
323 }
324
325 fn get_or_create_cancellation_listener(
326 &mut self,
327 request_id: RequestId,
328 ) -> Arc<CancellationListener> {
329 if let Some(listener) = self.cancellation_listener(request_id) {
330 return listener;
331 }
332
333 self.cancellation_listeners
335 .retain(|_, listener| listener.strong_count() > 0);
336
337 let cancellation_listener = Arc::new(Default::default());
338 self.cancellation_listeners
339 .insert(request_id, Arc::downgrade(&cancellation_listener));
340 cancellation_listener
341 }
342
343 fn send_cookie_response(&self, store_id: CookieStoreId, data: CookieData) {
344 let Some(sender) = self.cookie_listeners.get(&store_id) else {
345 warn!(
346 "Async cookie request made for store id that is non-existent {:?}",
347 store_id
348 );
349 return;
350 };
351 let res = sender.send(CookieAsyncResponse { data });
352 if res.is_err() {
353 warn!("Unable to send cookie response to script thread");
354 }
355 }
356
357 fn process_msg(
359 &mut self,
360 msg: CoreResourceMsg,
361 http_state: &Arc<HttpState>,
362 protocols: Arc<ProtocolRegistry>,
363 ) -> bool {
364 match msg {
365 CoreResourceMsg::Fetch(request_builder, channels) => match channels {
366 FetchChannels::ResponseMsg(sender) => {
367 let cancellation_listener =
368 self.get_or_create_cancellation_listener(request_builder.id);
369 self.resource_manager.fetch(
370 request_builder,
371 None,
372 sender,
373 http_state,
374 cancellation_listener,
375 protocols,
376 );
377 },
378 FetchChannels::WebSocket {
379 event_sender,
380 action_receiver,
381 } => self.resource_manager.websocket_connect(
382 request_builder,
383 event_sender,
384 action_receiver,
385 http_state,
386 ),
387 FetchChannels::Prefetch => self.resource_manager.fetch(
388 request_builder,
389 None,
390 DiscardFetch,
391 http_state,
392 Arc::new(Default::default()),
393 protocols,
394 ),
395 },
396 CoreResourceMsg::Cancel(request_ids) => {
397 for cancellation_listener in request_ids
398 .into_iter()
399 .filter_map(|request_id| self.cancellation_listener(request_id))
400 {
401 cancellation_listener.cancel();
402 }
403 },
404 CoreResourceMsg::DeleteCookies(request) => {
405 http_state
406 .cookie_jar
407 .write()
408 .unwrap()
409 .clear_storage(&request);
410 return true;
411 },
412 CoreResourceMsg::DeleteCookie(request, name) => {
413 http_state
414 .cookie_jar
415 .write()
416 .unwrap()
417 .delete_cookie_with_name(&request, name);
418 return true;
419 },
420 CoreResourceMsg::DeleteCookieAsync(cookie_store_id, url, name) => {
421 http_state
422 .cookie_jar
423 .write()
424 .unwrap()
425 .delete_cookie_with_name(&url, name);
426 self.send_cookie_response(cookie_store_id, CookieData::Delete(Ok(())));
427 },
428 CoreResourceMsg::FetchRedirect(request_builder, res_init, sender) => {
429 let cancellation_listener =
430 self.get_or_create_cancellation_listener(request_builder.id);
431 self.resource_manager.fetch(
432 request_builder,
433 Some(res_init),
434 sender,
435 http_state,
436 cancellation_listener,
437 protocols,
438 )
439 },
440 CoreResourceMsg::SetCookieForUrl(request, cookie, source) => self
441 .resource_manager
442 .set_cookie_for_url(&request, cookie.into_inner().to_owned(), source, http_state),
443 CoreResourceMsg::SetCookiesForUrl(request, cookies, source) => {
444 for cookie in cookies {
445 self.resource_manager.set_cookie_for_url(
446 &request,
447 cookie.into_inner(),
448 source,
449 http_state,
450 );
451 }
452 },
453 CoreResourceMsg::SetCookieForUrlAsync(cookie_store_id, url, cookie, source) => {
454 self.resource_manager.set_cookie_for_url(
455 &url,
456 cookie.into_inner().to_owned(),
457 source,
458 http_state,
459 );
460 self.send_cookie_response(cookie_store_id, CookieData::Set(Ok(())));
461 },
462 CoreResourceMsg::GetCookiesForUrl(url, consumer, source) => {
463 let mut cookie_jar = http_state.cookie_jar.write().unwrap();
464 cookie_jar.remove_expired_cookies_for_url(&url);
465 consumer
466 .send(cookie_jar.cookies_for_url(&url, source))
467 .unwrap();
468 },
469 CoreResourceMsg::GetCookieDataForUrlAsync(cookie_store_id, url, name) => {
470 let mut cookie_jar = http_state.cookie_jar.write().unwrap();
471 cookie_jar.remove_expired_cookies_for_url(&url);
472 let cookie = cookie_jar
473 .query_cookies(&url, name)
474 .into_iter()
475 .map(Serde)
476 .next();
477 self.send_cookie_response(cookie_store_id, CookieData::Get(cookie));
478 },
479 CoreResourceMsg::GetAllCookieDataForUrlAsync(cookie_store_id, url, name) => {
480 let mut cookie_jar = http_state.cookie_jar.write().unwrap();
481 cookie_jar.remove_expired_cookies_for_url(&url);
482 let cookies = cookie_jar
483 .query_cookies(&url, name)
484 .into_iter()
485 .map(Serde)
486 .collect();
487 self.send_cookie_response(cookie_store_id, CookieData::GetAll(cookies));
488 },
489 CoreResourceMsg::NewCookieListener(cookie_store_id, sender, _url) => {
490 self.cookie_listeners.insert(cookie_store_id, sender);
492 },
493 CoreResourceMsg::RemoveCookieListener(cookie_store_id) => {
494 self.cookie_listeners.remove(&cookie_store_id);
495 },
496 CoreResourceMsg::NetworkMediator(mediator_chan, origin) => {
497 self.resource_manager
498 .sw_managers
499 .insert(origin, mediator_chan);
500 },
501 CoreResourceMsg::GetCookiesDataForUrl(url, consumer, source) => {
502 let mut cookie_jar = http_state.cookie_jar.write().unwrap();
503 cookie_jar.remove_expired_cookies_for_url(&url);
504 let cookies = cookie_jar
505 .cookies_data_for_url(&url, source)
506 .map(Serde)
507 .collect();
508 consumer.send(cookies).unwrap();
509 },
510 CoreResourceMsg::GetHistoryState(history_state_id, consumer) => {
511 let history_states = http_state.history_states.read().unwrap();
512 consumer
513 .send(history_states.get(&history_state_id).cloned())
514 .unwrap();
515 },
516 CoreResourceMsg::SetHistoryState(history_state_id, structured_data) => {
517 let mut history_states = http_state.history_states.write().unwrap();
518 history_states.insert(history_state_id, structured_data);
519 },
520 CoreResourceMsg::RemoveHistoryStates(states_to_remove) => {
521 let mut history_states = http_state.history_states.write().unwrap();
522 for history_state in states_to_remove {
523 history_states.remove(&history_state);
524 }
525 },
526 CoreResourceMsg::ClearCache => {
527 http_state.http_cache.write().unwrap().clear();
528 },
529 CoreResourceMsg::ToFileManager(msg) => self.resource_manager.filemanager.handle(msg),
530 CoreResourceMsg::Exit(sender) => {
531 if let Some(ref config_dir) = self.config_dir {
532 match http_state.auth_cache.read() {
533 Ok(auth_cache) => {
534 write_json_to_file(&*auth_cache, config_dir, "auth_cache.json")
535 },
536 Err(_) => warn!("Error writing auth cache to disk"),
537 }
538 match http_state.cookie_jar.read() {
539 Ok(jar) => write_json_to_file(&*jar, config_dir, "cookie_jar.json"),
540 Err(_) => warn!("Error writing cookie jar to disk"),
541 }
542 match http_state.hsts_list.read() {
543 Ok(hsts) => write_json_to_file(&*hsts, config_dir, "hsts_list.json"),
544 Err(_) => warn!("Error writing hsts list to disk"),
545 }
546 }
547 self.resource_manager.exit();
548 let _ = sender.send(());
549 return false;
550 },
551 }
552 true
553 }
554}
555
556pub fn read_json_from_file<T>(data: &mut T, config_dir: &Path, filename: &str)
557where
558 T: for<'de> Deserialize<'de>,
559{
560 let path = config_dir.join(filename);
561 let display = path.display();
562
563 let mut file = match File::open(&path) {
564 Err(why) => {
565 warn!("couldn't open {}: {}", display, why);
566 return;
567 },
568 Ok(file) => file,
569 };
570
571 let mut string_buffer: String = String::new();
572 match file.read_to_string(&mut string_buffer) {
573 Err(why) => panic!("couldn't read from {}: {}", display, why),
574 Ok(_) => trace!("successfully read from {}", display),
575 }
576
577 match serde_json::from_str(&string_buffer) {
578 Ok(decoded_buffer) => *data = decoded_buffer,
579 Err(why) => warn!("Could not decode buffer{}", why),
580 }
581}
582
583pub fn write_json_to_file<T>(data: &T, config_dir: &Path, filename: &str)
584where
585 T: Serialize,
586{
587 let path = config_dir.join(filename);
588 let display = path.display();
589
590 let mut file = match File::create(&path) {
591 Err(why) => panic!("couldn't create {}: {}", display, why),
592 Ok(file) => file,
593 };
594 let mut writer = BufWriter::new(&mut file);
595 serde_json::to_writer_pretty(&mut writer, data).expect("Could not serialize to file");
596 trace!("successfully wrote to {display}");
597}
598
/// A single set of credentials stored in an [`AuthCache`].
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct AuthCacheEntry {
    /// User name of the cached credentials.
    pub user_name: String,
    /// Password of the cached credentials, kept as plain text.
    pub password: String,
}
604
605impl Default for AuthCache {
606 fn default() -> Self {
607 Self {
608 version: 1,
609 entries: HashMap::new(),
610 }
611 }
612}
613
/// Cached credentials; serialized to and from `auth_cache.json`.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct AuthCache {
    /// Format version; the `Default` implementation sets it to 1.
    pub version: u32,
    /// Cached credentials by string key.
    // NOTE(review): the key is presumably an origin or URL — confirm against
    // the code that inserts entries.
    pub entries: HashMap<String, AuthCacheEntry>,
}
619
/// Performs the actual work behind resource messages: fetches, websocket
/// connections and cookie writes.
pub struct CoreResourceManager {
    /// Devtools channel; cloned into the context of every fetch.
    devtools_sender: Option<Sender<DevtoolsControlMsg>>,
    /// Mediator channels registered through `NetworkMediator` messages, keyed
    /// by origin.
    sw_managers: HashMap<ImmutableOrigin, IpcSender<CustomResponseMediator>>,
    /// Handles `ToFileManager` messages and issues tokens for blob URL fetches.
    filemanager: FileManager,
    /// Cloned into the context of every fetch.
    request_interceptor: RequestInterceptor,
    /// Worker pool; the file manager holds a weak handle to it and it is shut
    /// down in `exit`.
    thread_pool: Arc<CoreResourceThreadPool>,
    /// CA certificates passed on when opening websocket connections.
    ca_certificates: CACertificates,
    /// Passed on when opening websocket connections.
    ignore_certificate_errors: bool,
}
629
/// Bookkeeping shared between a thread pool and the tasks queued on it.
struct ThreadPoolState {
    /// Number of tasks that were accepted and have not finished yet.
    active_workers: u32,
    /// `false` once the pool has been switched to inactive.
    active: bool,
}

impl ThreadPoolState {
    /// Creates the state of an active pool without any workers.
    pub fn new() -> Self {
        Self {
            active: true,
            active_workers: 0,
        }
    }

    /// Whether the pool is still active.
    pub fn is_active(&self) -> bool {
        self.active
    }

    /// Number of workers currently counted as active.
    pub fn active_workers(&self) -> u32 {
        self.active_workers
    }

    /// Marks the pool as inactive; there is no way back to active.
    pub fn switch_to_inactive(&mut self) {
        self.active = false;
    }

    /// Counts one more active worker.
    pub fn increment_active(&mut self) {
        self.active_workers += 1;
    }

    /// Counts one active worker less.
    pub fn decrement_active(&mut self) {
        self.active_workers -= 1;
    }
}
671
/// A rayon thread pool that stops running newly queued work once it has been
/// asked to exit.
pub struct CoreResourceThreadPool {
    /// The underlying rayon pool the work is spawned on.
    pool: rayon::ThreadPool,
    /// Activity flag and worker count, shared with every spawned task.
    state: Arc<Mutex<ThreadPoolState>>,
}
677
678impl CoreResourceThreadPool {
679 pub fn new(num_threads: usize, pool_name: String) -> CoreResourceThreadPool {
680 debug!("Creating new CoreResourceThreadPool with {num_threads} threads!");
681 let pool = rayon::ThreadPoolBuilder::new()
682 .thread_name(move |i| format!("{pool_name}#{i}"))
683 .num_threads(num_threads)
684 .build()
685 .unwrap();
686 let state = Arc::new(Mutex::new(ThreadPoolState::new()));
687 CoreResourceThreadPool { pool, state }
688 }
689
690 pub fn spawn<OP>(&self, work: OP)
696 where
697 OP: FnOnce() + Send + 'static,
698 {
699 {
700 let mut state = self.state.lock().unwrap();
701 if state.is_active() {
702 state.increment_active();
703 } else {
704 return;
706 }
707 }
708
709 let state = self.state.clone();
710
711 self.pool.spawn(move || {
712 {
713 let mut state = state.lock().unwrap();
714 if !state.is_active() {
715 return state.decrement_active();
718 }
719 }
720 work();
722 {
723 let mut state = state.lock().unwrap();
725 state.decrement_active();
726 }
727 });
728 }
729
730 pub fn exit(&self) {
734 {
735 let mut state = self.state.lock().unwrap();
736 state.switch_to_inactive();
737 }
738 let mut rounds = 0;
739 loop {
740 rounds += 1;
741 {
742 let state = self.state.lock().unwrap();
743 let still_active = state.active_workers();
744
745 if still_active == 0 || rounds == 10 {
746 if still_active > 0 {
747 debug!(
748 "Exiting CoreResourceThreadPool with {:?} still working(should be zero)",
749 still_active
750 );
751 }
752 break;
753 }
754 }
755 thread::sleep(Duration::from_millis(100));
756 }
757 }
758}
759
760impl CoreResourceManager {
761 pub fn new(
762 devtools_sender: Option<Sender<DevtoolsControlMsg>>,
763 _profiler_chan: ProfilerChan,
764 embedder_proxy: EmbedderProxy,
765 ca_certificates: CACertificates,
766 ignore_certificate_errors: bool,
767 ) -> CoreResourceManager {
768 let num_threads = thread::available_parallelism()
769 .map(|i| i.get())
770 .unwrap_or(servo_config::pref!(threadpools_fallback_worker_num) as usize)
771 .min(servo_config::pref!(threadpools_resource_workers_max).max(1) as usize);
772 let pool = CoreResourceThreadPool::new(num_threads, "CoreResourceThreadPool".to_string());
773 let pool_handle = Arc::new(pool);
774 CoreResourceManager {
775 devtools_sender,
776 sw_managers: Default::default(),
777 filemanager: FileManager::new(embedder_proxy.clone(), Arc::downgrade(&pool_handle)),
778 request_interceptor: RequestInterceptor::new(embedder_proxy),
779 thread_pool: pool_handle,
780 ca_certificates,
781 ignore_certificate_errors,
782 }
783 }
784
785 pub fn exit(&mut self) {
787 self.thread_pool.exit();
791
792 debug!("Exited CoreResourceManager");
793 }
794
795 fn set_cookie_for_url(
796 &mut self,
797 request: &ServoUrl,
798 cookie: Cookie<'static>,
799 source: CookieSource,
800 http_state: &Arc<HttpState>,
801 ) {
802 if let Some(cookie) = ServoCookie::new_wrapped(cookie, request, source) {
803 let mut cookie_jar = http_state.cookie_jar.write().unwrap();
804 cookie_jar.push(cookie, request, source)
805 }
806 }
807
808 fn fetch<Target: 'static + FetchTaskTarget + Send>(
809 &self,
810 request_builder: RequestBuilder,
811 res_init_: Option<ResponseInit>,
812 mut sender: Target,
813 http_state: &Arc<HttpState>,
814 cancellation_listener: Arc<CancellationListener>,
815 protocols: Arc<ProtocolRegistry>,
816 ) {
817 let http_state = http_state.clone();
818 let dc = self.devtools_sender.clone();
819 let filemanager = self.filemanager.clone();
820 let request_interceptor = self.request_interceptor.clone();
821
822 let timing_type = match request_builder.destination {
823 Destination::Document => ResourceTimingType::Navigation,
824 _ => ResourceTimingType::Resource,
825 };
826
827 let request = request_builder.build();
828 let url = request.current_url();
829
830 let (file_token, blob_url_file_id) = match url.scheme() {
841 "blob" => {
842 if let Ok((id, _)) = parse_blob_url(&url) {
843 (self.filemanager.get_token_for_file(&id), Some(id))
844 } else {
845 (FileTokenCheck::ShouldFail, None)
846 }
847 },
848 _ => (FileTokenCheck::NotRequired, None),
849 };
850
851 spawn_task(async move {
852 let context = FetchContext {
857 state: http_state,
858 user_agent: servo_config::pref!(user_agent),
859 devtools_chan: dc.map(|dc| Arc::new(Mutex::new(dc))),
860 filemanager: Arc::new(Mutex::new(filemanager)),
861 file_token,
862 request_interceptor: Arc::new(Mutex::new(request_interceptor)),
863 cancellation_listener,
864 timing: ServoArc::new(Mutex::new(ResourceFetchTiming::new(request.timing_type()))),
865 protocols,
866 };
867
868 match res_init_ {
869 Some(res_init) => {
870 let response = Response::from_init(res_init, timing_type);
871
872 let mut fetch_params = FetchParams::new(request);
873 http_redirect_fetch(
874 &mut fetch_params,
875 &mut CorsCache::default(),
876 response,
877 true,
878 &mut sender,
879 &mut None,
880 &context,
881 )
882 .await;
883 },
884 None => {
885 fetch(request, &mut sender, &context).await;
886 },
887 };
888
889 if let Some(id) = blob_url_file_id.as_ref() {
891 context
892 .filemanager
893 .lock()
894 .unwrap()
895 .invalidate_token(&context.file_token, id);
896 }
897 });
898 }
899
900 fn websocket_connect(
901 &self,
902 request: RequestBuilder,
903 event_sender: IpcSender<WebSocketNetworkEvent>,
904 action_receiver: IpcReceiver<WebSocketDomAction>,
905 http_state: &Arc<HttpState>,
906 ) {
907 websocket_loader::init(
908 request,
909 event_sender,
910 action_receiver,
911 http_state.clone(),
912 self.ca_certificates.clone(),
913 self.ignore_certificate_errors,
914 );
915 }
916}