1use std::ops::{Deref, DerefMut};
2#[cfg(any(unix, target_os = "wasi"))]
3use std::os::fd::AsRawFd;
4#[cfg(target_os = "hermit")]
7use std::os::hermit::io::AsRawFd;
8#[cfg(windows)]
9use std::os::windows::io::AsRawSocket;
10#[cfg(debug_assertions)]
11use std::sync::atomic::{AtomicUsize, Ordering};
12use std::{fmt, io};
13
14use crate::sys::IoSourceState;
15use crate::{event, Interest, Registry, Token};
16
/// Wrapper that pairs an I/O object with the bookkeeping used when it is
/// registered with a [`Registry`].
///
/// The wrapped value is reachable through `Deref`/`DerefMut` and can be taken
/// back out with `into_inner`.
pub struct IoSource<T> {
    /// Platform-specific state provided by `crate::sys`.
    state: IoSourceState,
    /// The wrapped I/O object.
    inner: T,
    /// Tracks which selector this source is associated with. Only present
    /// when debug assertions are enabled.
    #[cfg(debug_assertions)]
    selector_id: SelectorId,
}
43
44impl<T> IoSource<T> {
45    pub fn new(io: T) -> IoSource<T> {
47        IoSource {
48            state: IoSourceState::new(),
49            inner: io,
50            #[cfg(debug_assertions)]
51            selector_id: SelectorId::new(),
52        }
53    }
54
55    pub fn do_io<F, R>(&self, f: F) -> io::Result<R>
66    where
67        F: FnOnce(&T) -> io::Result<R>,
68    {
69        self.state.do_io(f, &self.inner)
70    }
71
72    pub fn into_inner(self) -> T {
81        self.inner
82    }
83}
84
85impl<T> Deref for IoSource<T> {
90    type Target = T;
91
92    fn deref(&self) -> &Self::Target {
93        &self.inner
94    }
95}
96
97impl<T> DerefMut for IoSource<T> {
102    fn deref_mut(&mut self) -> &mut Self::Target {
103        &mut self.inner
104    }
105}
106
107#[cfg(any(unix, target_os = "hermit"))]
108impl<T> event::Source for IoSource<T>
109where
110    T: AsRawFd,
111{
112    fn register(
113        &mut self,
114        registry: &Registry,
115        token: Token,
116        interests: Interest,
117    ) -> io::Result<()> {
118        #[cfg(debug_assertions)]
119        self.selector_id.associate(registry)?;
120        self.state
121            .register(registry, token, interests, self.inner.as_raw_fd())
122    }
123
124    fn reregister(
125        &mut self,
126        registry: &Registry,
127        token: Token,
128        interests: Interest,
129    ) -> io::Result<()> {
130        #[cfg(debug_assertions)]
131        self.selector_id.check_association(registry)?;
132        self.state
133            .reregister(registry, token, interests, self.inner.as_raw_fd())
134    }
135
136    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
137        #[cfg(debug_assertions)]
138        self.selector_id.remove_association(registry)?;
139        self.state.deregister(registry, self.inner.as_raw_fd())
140    }
141}
142
/// `event::Source` implementation for Windows, where the wrapped value
/// exposes a raw socket.
///
/// With debug assertions enabled, every operation first validates or updates
/// the selector association and bails out early on a mismatch.
#[cfg(windows)]
impl<T> event::Source for IoSource<T>
where
    T: AsRawSocket,
{
    /// Registers the wrapped socket with `registry`.
    fn register(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.associate(registry)?;
        // Fetched only after the debug check so a rejected call never touches
        // the wrapped value.
        let socket = self.inner.as_raw_socket();
        self.state.register(registry, token, interests, socket)
    }

    /// Updates the registration with `registry`; the raw socket is not passed
    /// along for this operation.
    fn reregister(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.check_association(registry)?;
        let outcome = self.state.reregister(registry, token, interests);
        outcome
    }

    /// Deregisters the source. `_registry` is only consulted by the
    /// debug-assertion association check.
    fn deregister(&mut self, _registry: &Registry) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.remove_association(_registry)?;
        let outcome = self.state.deregister();
        outcome
    }
}
177
/// `event::Source` implementation for WASI.
///
/// Unlike the other platform implementations in this file, the calls go
/// straight to the registry's selector rather than through `self.state`.
/// With debug assertions enabled, every operation first validates or updates
/// the selector association and bails out early on a mismatch.
#[cfg(target_os = "wasi")]
impl<T> event::Source for IoSource<T>
where
    T: AsRawFd,
{
    /// Registers the wrapped file descriptor with the selector of `registry`.
    fn register(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.associate(registry)?;
        let selector = registry.selector();
        let fd = self.inner.as_raw_fd();
        // The cast target is inferred from the selector's parameter type.
        selector.register(fd as _, token, interests)
    }

    /// Updates the registration of the wrapped file descriptor.
    fn reregister(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.check_association(registry)?;
        let selector = registry.selector();
        let fd = self.inner.as_raw_fd();
        selector.reregister(fd as _, token, interests)
    }

    /// Removes the wrapped file descriptor from the selector of `registry`.
    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.remove_association(registry)?;
        let selector = registry.selector();
        let fd = self.inner.as_raw_fd();
        selector.deregister(fd as _)
    }
}
215
216impl<T> fmt::Debug for IoSource<T>
217where
218    T: fmt::Debug,
219{
220    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
221        self.inner.fmt(f)
222    }
223}
224
#[cfg(debug_assertions)]
/// Records, by id, which selector an I/O source is currently associated with.
///
/// Only compiled when debug assertions are enabled.
#[derive(Debug)]
struct SelectorId {
    /// Id of the associated selector, or `SelectorId::UNASSOCIATED` when the
    /// source is not associated with one.
    id: AtomicUsize,
}
231
232#[cfg(debug_assertions)]
233impl SelectorId {
234    const UNASSOCIATED: usize = 0;
237
238    const fn new() -> SelectorId {
240        SelectorId {
241            id: AtomicUsize::new(Self::UNASSOCIATED),
242        }
243    }
244
245    fn associate(&self, registry: &Registry) -> io::Result<()> {
248        let registry_id = registry.selector().id();
249        let previous_id = self.id.swap(registry_id, Ordering::AcqRel);
250
251        if previous_id == Self::UNASSOCIATED {
252            Ok(())
253        } else {
254            Err(io::Error::new(
255                io::ErrorKind::AlreadyExists,
256                "I/O source already registered with a `Registry`",
257            ))
258        }
259    }
260
261    fn check_association(&self, registry: &Registry) -> io::Result<()> {
265        let registry_id = registry.selector().id();
266        let id = self.id.load(Ordering::Acquire);
267
268        if id == registry_id {
269            Ok(())
270        } else if id == Self::UNASSOCIATED {
271            Err(io::Error::new(
272                io::ErrorKind::NotFound,
273                "I/O source not registered with `Registry`",
274            ))
275        } else {
276            Err(io::Error::new(
277                io::ErrorKind::AlreadyExists,
278                "I/O source already registered with a different `Registry`",
279            ))
280        }
281    }
282
283    fn remove_association(&self, registry: &Registry) -> io::Result<()> {
286        let registry_id = registry.selector().id();
287        let previous_id = self.id.swap(Self::UNASSOCIATED, Ordering::AcqRel);
288
289        if previous_id == registry_id {
290            Ok(())
291        } else {
292            Err(io::Error::new(
293                io::ErrorKind::NotFound,
294                "I/O source not registered with `Registry`",
295            ))
296        }
297    }
298}
299
300#[cfg(debug_assertions)]
301impl Clone for SelectorId {
302    fn clone(&self) -> SelectorId {
303        SelectorId {
304            id: AtomicUsize::new(self.id.load(Ordering::Acquire)),
305        }
306    }
307}