base/
threadpool.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::sync::{Arc, Mutex};
6use std::thread;
7use std::time::Duration;
8
9use log::debug;
10
/// The state of the thread-pool used by CoreResource.
///
/// Records how many spawned jobs are still accounted for and whether the
/// pool is still willing to accept new work.
struct ThreadPoolState {
    /// The number of active workers.
    active_workers: u32,
    /// Whether the pool can spawn additional work.
    active: bool,
}

impl ThreadPoolState {
    /// Create the initial state: zero active workers, spawning allowed.
    pub fn new() -> Self {
        Self {
            active_workers: 0,
            active: true,
        }
    }

    /// Is the pool still able to spawn new work?
    pub fn is_active(&self) -> bool {
        self.active
    }

    /// How many workers are currently active?
    pub fn active_workers(&self) -> u32 {
        self.active_workers
    }

    /// Prevent additional work from being spawned.
    pub fn switch_to_inactive(&mut self) {
        self.active = false;
    }

    /// Add to the count of active workers.
    pub fn increment_active(&mut self) {
        self.active_workers += 1;
    }

    /// Subtract from the count of active workers.
    ///
    /// The count saturates at zero: an unmatched decrement leaves it at zero
    /// rather than panicking (overflow checks enabled) or wrapping around to
    /// `u32::MAX` (overflow checks disabled).
    pub fn decrement_active(&mut self) {
        self.active_workers = self.active_workers.saturating_sub(1);
    }
}
52
53/// Threadpool used by Fetch and file operations.
54pub struct ThreadPool {
55    pool: rayon::ThreadPool,
56    state: Arc<Mutex<ThreadPoolState>>,
57}
58
59impl ThreadPool {
60    pub fn new(num_threads: usize, pool_name: String) -> Self {
61        debug!("Creating new ThreadPool with {num_threads} threads!");
62        let pool = rayon::ThreadPoolBuilder::new()
63            .thread_name(move |i| format!("{pool_name}#{i}"))
64            .num_threads(num_threads)
65            .build()
66            .unwrap();
67        let state = Arc::new(Mutex::new(ThreadPoolState::new()));
68        Self { pool, state }
69    }
70
71    /// Spawn work on the thread-pool, if still active.
72    ///
73    /// There is no need to give feedback to the caller,
74    /// because if we do not perform work,
75    /// it is because the system as a whole is exiting.
76    pub fn spawn<OP>(&self, work: OP)
77    where
78        OP: FnOnce() + Send + 'static,
79    {
80        {
81            let mut state = self.state.lock().unwrap();
82            if state.is_active() {
83                state.increment_active();
84            } else {
85                // Don't spawn any work.
86                return;
87            }
88        }
89
90        let state = self.state.clone();
91
92        self.pool.spawn(move || {
93            {
94                let mut state = state.lock().unwrap();
95                if !state.is_active() {
96                    // Decrement number of active workers and return,
97                    // without doing any work.
98                    return state.decrement_active();
99                }
100            }
101            // Perform work.
102            work();
103            {
104                // Decrement number of active workers.
105                let mut state = state.lock().unwrap();
106                state.decrement_active();
107            }
108        });
109    }
110
111    /// Prevent further work from being spawned,
112    /// and wait until all workers are done,
113    /// or a timeout of roughly one second has been reached.
114    pub fn exit(&self) {
115        {
116            let mut state = self.state.lock().unwrap();
117            state.switch_to_inactive();
118        }
119        let mut rounds = 0;
120        loop {
121            rounds += 1;
122            {
123                let state = self.state.lock().unwrap();
124                let still_active = state.active_workers();
125
126                if still_active == 0 || rounds == 10 {
127                    if still_active > 0 {
128                        debug!(
129                            "Exiting ThreadPool with {:?} still working(should be zero)",
130                            still_active
131                        );
132                    }
133                    break;
134                }
135            }
136            thread::sleep(Duration::from_millis(100));
137        }
138    }
139}