tokio/runtime/scheduler/multi_thread/
idle.rs1use crate::loom::sync::atomic::AtomicUsize;
4use crate::runtime::scheduler::multi_thread::Shared;
5
6use std::fmt;
7use std::sync::atomic::Ordering::{self, SeqCst};
8
9pub(super) struct Idle {
10    state: AtomicUsize,
15
16    num_workers: usize,
18}
19
20pub(super) struct Synced {
22    sleepers: Vec<usize>,
24}
25
/// Bit offset of the unparked-worker counter inside the packed state word.
const UNPARK_SHIFT: usize = 16;

/// Selects the low `UNPARK_SHIFT` bits, which hold the searching-worker counter.
const SEARCH_MASK: usize = (1 << UNPARK_SHIFT) - 1;

/// Selects every bit above the searching counter; those bits hold the
/// unparked-worker counter.
const UNPARK_MASK: usize = !SEARCH_MASK;
29
/// Decoded view of the packed counter word stored in `Idle::state`.
///
/// Cheap to copy: it is a plain `usize` snapshot, not a handle to the atomic.
#[derive(Clone, Copy)]
struct State(usize);
32
33impl Idle {
34    pub(super) fn new(num_workers: usize) -> (Idle, Synced) {
35        let init = State::new(num_workers);
36
37        let idle = Idle {
38            state: AtomicUsize::new(init.into()),
39            num_workers,
40        };
41
42        let synced = Synced {
43            sleepers: Vec::with_capacity(num_workers),
44        };
45
46        (idle, synced)
47    }
48
49    pub(super) fn worker_to_notify(&self, shared: &Shared) -> Option<usize> {
52        if !self.notify_should_wakeup() {
62            return None;
63        }
64
65        let mut lock = shared.synced.lock();
67
68        if !self.notify_should_wakeup() {
70            return None;
71        }
72
73        State::unpark_one(&self.state, 1);
76
77        let ret = lock.idle.sleepers.pop();
79        debug_assert!(ret.is_some());
80
81        ret
82    }
83
84    pub(super) fn transition_worker_to_parked(
87        &self,
88        shared: &Shared,
89        worker: usize,
90        is_searching: bool,
91    ) -> bool {
92        let mut lock = shared.synced.lock();
94
95        let ret = State::dec_num_unparked(&self.state, is_searching);
97
98        lock.idle.sleepers.push(worker);
100
101        ret
102    }
103
104    pub(super) fn transition_worker_to_searching(&self) -> bool {
105        let state = State::load(&self.state, SeqCst);
106        if 2 * state.num_searching() >= self.num_workers {
107            return false;
108        }
109
110        State::inc_num_searching(&self.state, SeqCst);
114        true
115    }
116
117    pub(super) fn transition_worker_from_searching(&self) -> bool {
122        State::dec_num_searching(&self.state)
123    }
124
125    pub(super) fn unpark_worker_by_id(&self, shared: &Shared, worker_id: usize) -> bool {
130        let mut lock = shared.synced.lock();
131        let sleepers = &mut lock.idle.sleepers;
132
133        for index in 0..sleepers.len() {
134            if sleepers[index] == worker_id {
135                sleepers.swap_remove(index);
136
137                State::unpark_one(&self.state, 0);
139
140                return true;
141            }
142        }
143
144        false
145    }
146
147    pub(super) fn is_parked(&self, shared: &Shared, worker_id: usize) -> bool {
149        let lock = shared.synced.lock();
150        lock.idle.sleepers.contains(&worker_id)
151    }
152
153    fn notify_should_wakeup(&self) -> bool {
154        let state = State(self.state.fetch_add(0, SeqCst));
155        state.num_searching() == 0 && state.num_unparked() < self.num_workers
156    }
157}
158
159impl State {
160    fn new(num_workers: usize) -> State {
161        let ret = State(num_workers << UNPARK_SHIFT);
163        debug_assert_eq!(num_workers, ret.num_unparked());
164        debug_assert_eq!(0, ret.num_searching());
165        ret
166    }
167
168    fn load(cell: &AtomicUsize, ordering: Ordering) -> State {
169        State(cell.load(ordering))
170    }
171
172    fn unpark_one(cell: &AtomicUsize, num_searching: usize) {
173        cell.fetch_add(num_searching | (1 << UNPARK_SHIFT), SeqCst);
174    }
175
176    fn inc_num_searching(cell: &AtomicUsize, ordering: Ordering) {
177        cell.fetch_add(1, ordering);
178    }
179
180    fn dec_num_searching(cell: &AtomicUsize) -> bool {
182        let state = State(cell.fetch_sub(1, SeqCst));
183        state.num_searching() == 1
184    }
185
186    fn dec_num_unparked(cell: &AtomicUsize, is_searching: bool) -> bool {
190        let mut dec = 1 << UNPARK_SHIFT;
191
192        if is_searching {
193            dec += 1;
194        }
195
196        let prev = State(cell.fetch_sub(dec, SeqCst));
197        is_searching && prev.num_searching() == 1
198    }
199
200    fn num_searching(self) -> usize {
202        self.0 & SEARCH_MASK
203    }
204
205    fn num_unparked(self) -> usize {
207        (self.0 & UNPARK_MASK) >> UNPARK_SHIFT
208    }
209}
210
211impl From<usize> for State {
212    fn from(src: usize) -> State {
213        State(src)
214    }
215}
216
217impl From<State> for usize {
218    fn from(src: State) -> usize {
219        src.0
220    }
221}
222
223impl fmt::Debug for State {
224    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
225        fmt.debug_struct("worker::State")
226            .field("num_unparked", &self.num_unparked())
227            .field("num_searching", &self.num_searching())
228            .finish()
229    }
230}
231
/// Checks the bit layout constants and the initial packed value.
#[test]
fn test_state() {
    // The two masks must not overlap...
    assert_eq!(0, UNPARK_MASK & SEARCH_MASK);
    // ...and together must cover every bit of the word.
    assert_eq!(0, !(UNPARK_MASK | SEARCH_MASK));

    // A fresh state reports every worker as unparked and none as searching.
    let fresh = State::new(10);
    assert_eq!(10, fresh.num_unparked());
    assert_eq!(0, fresh.num_searching());
}