Skip to main content

servo_base/
threadpool.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::sync::{Arc, Mutex, OnceLock};
6use std::thread;
7use std::time::Duration;
8
9use log::debug;
10use servo_config::pref;
11
/// Bookkeeping for a thread pool: how many spawned jobs are still outstanding,
/// and whether the pool is still accepting new work.
struct ThreadPoolState {
    /// The number of active workers.
    active_workers: u32,
    /// Whether the pool can spawn additional work.
    active: bool,
}

impl ThreadPoolState {
    /// Create the state of a fresh pool: no active workers, and accepting work.
    pub fn new() -> ThreadPoolState {
        ThreadPoolState {
            active_workers: 0,
            active: true,
        }
    }

    /// Is the pool still able to spawn new work?
    pub fn is_active(&self) -> bool {
        self.active
    }

    /// How many workers are currently active?
    pub fn active_workers(&self) -> u32 {
        self.active_workers
    }

    /// Prevent additional work from being spawned.
    ///
    /// There is no way to switch back: once inactive, the state stays inactive.
    pub fn switch_to_inactive(&mut self) {
        self.active = false;
    }

    /// Add to the count of active workers.
    pub fn increment_active(&mut self) {
        self.active_workers += 1;
    }

    /// Subtract from the count of active workers.
    ///
    /// The count never goes below zero. An unbalanced call is ignored rather
    /// than panicking (overflow-checked builds) or wrapping around to
    /// `u32::MAX` (unchecked builds), either of which would leave the count
    /// unusable for whoever is waiting on it to reach zero.
    pub fn decrement_active(&mut self) {
        self.active_workers = self.active_workers.saturating_sub(1);
    }
}
53
54/// The thread pools used throughout Servo, apart from those used by Layout and WebRender which
55/// are handled separately.
56pub struct ThreadPool {
57    pool: rayon::ThreadPool,
58    state: Arc<Mutex<ThreadPoolState>>,
59}
60
61static GLOBAL_THREAD_POOL: OnceLock<Arc<ThreadPool>> = OnceLock::new();
62
63impl ThreadPool {
64    /// Get the global thread pool for the process.
65    pub fn global() -> Arc<Self> {
66        let pool = GLOBAL_THREAD_POOL.get_or_init(|| {
67            let paralellism = thread::available_parallelism()
68                .map(|parallelism| parallelism.get())
69                .unwrap_or(pref!(thread_pool_fallback_workers) as usize)
70                .min(pref!(thread_pool_workers_max) as usize);
71            let pool = rayon::ThreadPoolBuilder::new()
72                .thread_name(move |i| format!("GlobalPool#{i}"))
73                .num_threads(paralellism)
74                .build()
75                .unwrap();
76            Arc::new(Self {
77                pool,
78                state: Arc::new(Mutex::new(ThreadPoolState::new())),
79            })
80        });
81        pool.clone()
82    }
83
84    /// Spawn work on the thread-pool, if still active.
85    ///
86    /// There is no need to give feedback to the caller,
87    /// because if we do not perform work,
88    /// it is because the system as a whole is exiting.
89    pub fn spawn<OP>(&self, work: OP)
90    where
91        OP: FnOnce() + Send + 'static,
92    {
93        {
94            let mut state = self.state.lock().unwrap();
95            if state.is_active() {
96                state.increment_active();
97            } else {
98                // Don't spawn any work.
99                return;
100            }
101        }
102
103        let state = self.state.clone();
104
105        self.pool.spawn(move || {
106            {
107                let mut state = state.lock().unwrap();
108                if !state.is_active() {
109                    // Decrement number of active workers and return,
110                    // without doing any work.
111                    return state.decrement_active();
112                }
113            }
114            // Perform work.
115            work();
116            {
117                // Decrement number of active workers.
118                let mut state = state.lock().unwrap();
119                state.decrement_active();
120            }
121        });
122    }
123
124    /// Prevent further work from being spawned,
125    /// and wait until all workers are done,
126    /// or a timeout of roughly one second has been reached.
127    pub fn exit(&self) {
128        {
129            let mut state = self.state.lock().unwrap();
130            state.switch_to_inactive();
131        }
132        let mut rounds = 0;
133        loop {
134            rounds += 1;
135            {
136                let state = self.state.lock().unwrap();
137                let still_active = state.active_workers();
138
139                if still_active == 0 || rounds == 10 {
140                    if still_active > 0 {
141                        debug!(
142                            "Exiting ThreadPool with {:?} still working(should be zero)",
143                            still_active
144                        );
145                    }
146                    break;
147                }
148            }
149            thread::sleep(Duration::from_millis(100));
150        }
151    }
152}