crossbeam_channel/
waker.rs1use std::ptr;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::Mutex;
6use std::thread::{self, ThreadId};
7use std::vec::Vec;
8
9use crate::context::Context;
10use crate::select::{Operation, Selected};
11
12pub(crate) struct Entry {
14    pub(crate) oper: Operation,
16
17    pub(crate) packet: *mut (),
19
20    pub(crate) cx: Context,
22}
23
24pub(crate) struct Waker {
29    selectors: Vec<Entry>,
31
32    observers: Vec<Entry>,
34}
35
36impl Waker {
37    #[inline]
39    pub(crate) fn new() -> Self {
40        Waker {
41            selectors: Vec::new(),
42            observers: Vec::new(),
43        }
44    }
45
46    #[inline]
48    pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
49        self.register_with_packet(oper, ptr::null_mut(), cx);
50    }
51
52    #[inline]
54    pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context) {
55        self.selectors.push(Entry {
56            oper,
57            packet,
58            cx: cx.clone(),
59        });
60    }
61
62    #[inline]
64    pub(crate) fn unregister(&mut self, oper: Operation) -> Option<Entry> {
65        if let Some((i, _)) = self
66            .selectors
67            .iter()
68            .enumerate()
69            .find(|&(_, entry)| entry.oper == oper)
70        {
71            let entry = self.selectors.remove(i);
72            Some(entry)
73        } else {
74            None
75        }
76    }
77
78    #[inline]
80    pub(crate) fn try_select(&mut self) -> Option<Entry> {
81        if self.selectors.is_empty() {
82            None
83        } else {
84            let thread_id = current_thread_id();
85
86            self.selectors
87                .iter()
88                .position(|selector| {
89                    selector.cx.thread_id() != thread_id
91                        && selector .cx
93                            .try_select(Selected::Operation(selector.oper))
94                            .is_ok()
95                        && {
96                            selector.cx.store_packet(selector.packet);
98                            selector.cx.unpark();
100                            true
101                        }
102                })
103                .map(|pos| self.selectors.remove(pos))
106        }
107    }
108
109    #[inline]
111    pub(crate) fn can_select(&self) -> bool {
112        if self.selectors.is_empty() {
113            false
114        } else {
115            let thread_id = current_thread_id();
116
117            self.selectors.iter().any(|entry| {
118                entry.cx.thread_id() != thread_id && entry.cx.selected() == Selected::Waiting
119            })
120        }
121    }
122
123    #[inline]
125    pub(crate) fn watch(&mut self, oper: Operation, cx: &Context) {
126        self.observers.push(Entry {
127            oper,
128            packet: ptr::null_mut(),
129            cx: cx.clone(),
130        });
131    }
132
133    #[inline]
135    pub(crate) fn unwatch(&mut self, oper: Operation) {
136        self.observers.retain(|e| e.oper != oper);
137    }
138
139    #[inline]
141    pub(crate) fn notify(&mut self) {
142        for entry in self.observers.drain(..) {
143            if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
144                entry.cx.unpark();
145            }
146        }
147    }
148
149    #[inline]
151    pub(crate) fn disconnect(&mut self) {
152        for entry in self.selectors.iter() {
153            if entry.cx.try_select(Selected::Disconnected).is_ok() {
154                entry.cx.unpark();
160            }
161        }
162
163        self.notify();
164    }
165}
166
167impl Drop for Waker {
168    #[inline]
169    fn drop(&mut self) {
170        debug_assert_eq!(self.selectors.len(), 0);
171        debug_assert_eq!(self.observers.len(), 0);
172    }
173}
174
175pub(crate) struct SyncWaker {
179    inner: Mutex<Waker>,
181
182    is_empty: AtomicBool,
184}
185
186impl SyncWaker {
187    #[inline]
189    pub(crate) fn new() -> Self {
190        SyncWaker {
191            inner: Mutex::new(Waker::new()),
192            is_empty: AtomicBool::new(true),
193        }
194    }
195
196    #[inline]
198    pub(crate) fn register(&self, oper: Operation, cx: &Context) {
199        let mut inner = self.inner.lock().unwrap();
200        inner.register(oper, cx);
201        self.is_empty.store(
202            inner.selectors.is_empty() && inner.observers.is_empty(),
203            Ordering::SeqCst,
204        );
205    }
206
207    #[inline]
209    pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> {
210        let mut inner = self.inner.lock().unwrap();
211        let entry = inner.unregister(oper);
212        self.is_empty.store(
213            inner.selectors.is_empty() && inner.observers.is_empty(),
214            Ordering::SeqCst,
215        );
216        entry
217    }
218
219    #[inline]
221    pub(crate) fn notify(&self) {
222        if !self.is_empty.load(Ordering::SeqCst) {
223            let mut inner = self.inner.lock().unwrap();
224            if !self.is_empty.load(Ordering::SeqCst) {
225                inner.try_select();
226                inner.notify();
227                self.is_empty.store(
228                    inner.selectors.is_empty() && inner.observers.is_empty(),
229                    Ordering::SeqCst,
230                );
231            }
232        }
233    }
234
235    #[inline]
237    pub(crate) fn watch(&self, oper: Operation, cx: &Context) {
238        let mut inner = self.inner.lock().unwrap();
239        inner.watch(oper, cx);
240        self.is_empty.store(
241            inner.selectors.is_empty() && inner.observers.is_empty(),
242            Ordering::SeqCst,
243        );
244    }
245
246    #[inline]
248    pub(crate) fn unwatch(&self, oper: Operation) {
249        let mut inner = self.inner.lock().unwrap();
250        inner.unwatch(oper);
251        self.is_empty.store(
252            inner.selectors.is_empty() && inner.observers.is_empty(),
253            Ordering::SeqCst,
254        );
255    }
256
257    #[inline]
259    pub(crate) fn disconnect(&self) {
260        let mut inner = self.inner.lock().unwrap();
261        inner.disconnect();
262        self.is_empty.store(
263            inner.selectors.is_empty() && inner.observers.is_empty(),
264            Ordering::SeqCst,
265        );
266    }
267}
268
269impl Drop for SyncWaker {
270    #[inline]
271    fn drop(&mut self) {
272        debug_assert!(self.is_empty.load(Ordering::SeqCst));
273    }
274}
275
/// Returns the id of the calling thread.
///
/// The id is cached in a thread-local so that repeated calls do not go through
/// `thread::current()` each time.
#[inline]
fn current_thread_id() -> ThreadId {
    std::thread_local! {
        /// Id of the current thread, computed on first access.
        static THREAD_ID: ThreadId = thread::current().id();
    }

    // `try_with` returns an error when the thread-local can no longer be accessed
    // (for example while the thread's locals are being destroyed); in that case
    // the id is fetched directly instead of panicking.
    match THREAD_ID.try_with(|id| *id) {
        Ok(id) => id,
        Err(_) => thread::current().id(),
    }
}