1use std::sync::{Arc, Mutex};
6use std::thread;
7use std::time::Duration;
8
9use log::debug;
10
/// Bookkeeping record for a pool: a shutdown flag plus a counter of jobs in flight.
///
/// The type performs no synchronisation itself; callers that share it across
/// threads are expected to wrap it (this file uses `Arc<Mutex<_>>`).
struct ThreadPoolState {
    /// Number of jobs that were accepted and have not yet been accounted for
    /// by a matching `decrement_active` call.
    active_workers: u32,
    /// `true` while new work is accepted. None of the methods below ever set
    /// it back to `true` once it has been cleared.
    active: bool,
}

impl ThreadPoolState {
    /// Creates the state of a fresh pool: accepting work, nothing in flight.
    pub fn new() -> Self {
        Self {
            active: true,
            active_workers: 0,
        }
    }

    /// Returns the current number of in-flight jobs.
    pub fn active_workers(&self) -> u32 {
        self.active_workers
    }

    /// Returns `true` as long as `switch_to_inactive` has not been called.
    pub fn is_active(&self) -> bool {
        self.active
    }

    /// Records that one more job is in flight.
    pub fn increment_active(&mut self) {
        self.active_workers += 1;
    }

    /// Records that one in-flight job is done (or was skipped).
    ///
    /// Must be paired with an earlier `increment_active`: decrementing a zero
    /// counter is an arithmetic underflow, which panics when overflow checks
    /// are enabled.
    pub fn decrement_active(&mut self) {
        self.active_workers -= 1;
    }

    /// Marks the pool as shutting down; `is_active` returns `false` afterwards.
    pub fn switch_to_inactive(&mut self) {
        self.active = false;
    }
}
52
53pub struct ThreadPool {
55 pool: rayon::ThreadPool,
56 state: Arc<Mutex<ThreadPoolState>>,
57}
58
59impl ThreadPool {
60 pub fn new(num_threads: usize, pool_name: String) -> Self {
61 debug!("Creating new ThreadPool with {num_threads} threads!");
62 let pool = rayon::ThreadPoolBuilder::new()
63 .thread_name(move |i| format!("{pool_name}#{i}"))
64 .num_threads(num_threads)
65 .build()
66 .unwrap();
67 let state = Arc::new(Mutex::new(ThreadPoolState::new()));
68 Self { pool, state }
69 }
70
71 pub fn spawn<OP>(&self, work: OP)
77 where
78 OP: FnOnce() + Send + 'static,
79 {
80 {
81 let mut state = self.state.lock().unwrap();
82 if state.is_active() {
83 state.increment_active();
84 } else {
85 return;
87 }
88 }
89
90 let state = self.state.clone();
91
92 self.pool.spawn(move || {
93 {
94 let mut state = state.lock().unwrap();
95 if !state.is_active() {
96 return state.decrement_active();
99 }
100 }
101 work();
103 {
104 let mut state = state.lock().unwrap();
106 state.decrement_active();
107 }
108 });
109 }
110
111 pub fn exit(&self) {
115 {
116 let mut state = self.state.lock().unwrap();
117 state.switch_to_inactive();
118 }
119 let mut rounds = 0;
120 loop {
121 rounds += 1;
122 {
123 let state = self.state.lock().unwrap();
124 let still_active = state.active_workers();
125
126 if still_active == 0 || rounds == 10 {
127 if still_active > 0 {
128 debug!(
129 "Exiting ThreadPool with {:?} still working(should be zero)",
130 still_active
131 );
132 }
133 break;
134 }
135 }
136 thread::sleep(Duration::from_millis(100));
137 }
138 }
139}