mio/
io_source.rs

1use std::ops::{Deref, DerefMut};
2#[cfg(any(unix, target_os = "wasi"))]
3use std::os::fd::AsRawFd;
4// TODO: once <https://github.com/rust-lang/rust/issues/126198> is fixed this
5// can use `std::os::fd` and be merged with the above.
6#[cfg(target_os = "hermit")]
7use std::os::hermit::io::AsRawFd;
8#[cfg(windows)]
9use std::os::windows::io::AsRawSocket;
10#[cfg(debug_assertions)]
11use std::sync::atomic::{AtomicUsize, Ordering};
12use std::{fmt, io};
13
14use crate::sys::IoSourceState;
15use crate::{event, Interest, Registry, Token};
16
17/// Adapter for a [`RawFd`] or [`RawSocket`] providing an [`event::Source`]
18/// implementation.
19///
20/// `IoSource` enables registering any FD or socket wrapper with [`Poll`].
21///
22/// While only implementations for TCP, UDP, and UDS (Unix only) are provided,
23/// Mio supports registering any FD or socket that can be registered with the
24/// underlying OS selector. `IoSource` provides the necessary bridge.
25///
26/// [`RawFd`]: std::os::fd::RawFd
27/// [`RawSocket`]: std::os::windows::io::RawSocket
28///
29/// # Notes
30///
31/// To handle the registrations and events properly **all** I/O operations (such
32/// as `read`, `write`, etc.) must go through the [`do_io`] method to ensure the
33/// internal state is updated accordingly.
34///
35/// [`Poll`]: crate::Poll
36/// [`do_io`]: IoSource::do_io
37pub struct IoSource<T> {
38    state: IoSourceState,
39    inner: T,
40    #[cfg(debug_assertions)]
41    selector_id: SelectorId,
42}
43
44impl<T> IoSource<T> {
45    /// Create a new `IoSource`.
46    pub fn new(io: T) -> IoSource<T> {
47        IoSource {
48            state: IoSourceState::new(),
49            inner: io,
50            #[cfg(debug_assertions)]
51            selector_id: SelectorId::new(),
52        }
53    }
54
55    /// Execute an I/O operations ensuring that the socket receives more events
56    /// if it hits a [`WouldBlock`] error.
57    ///
58    /// # Notes
59    ///
60    /// This method is required to be called for **all** I/O operations to
61    /// ensure the user will receive events once the socket is ready again after
62    /// returning a [`WouldBlock`] error.
63    ///
64    /// [`WouldBlock`]: io::ErrorKind::WouldBlock
65    pub fn do_io<F, R>(&self, f: F) -> io::Result<R>
66    where
67        F: FnOnce(&T) -> io::Result<R>,
68    {
69        self.state.do_io(f, &self.inner)
70    }
71
72    /// Returns the I/O source, dropping the state.
73    ///
74    /// # Notes
75    ///
76    /// To ensure no more events are to be received for this I/O source first
77    /// [`deregister`] it.
78    ///
79    /// [`deregister`]: Registry::deregister
80    pub fn into_inner(self) -> T {
81        self.inner
82    }
83}
84
85/// Be careful when using this method. All I/O operations that may block must go
86/// through the [`do_io`] method.
87///
88/// [`do_io`]: IoSource::do_io
89impl<T> Deref for IoSource<T> {
90    type Target = T;
91
92    fn deref(&self) -> &Self::Target {
93        &self.inner
94    }
95}
96
97/// Be careful when using this method. All I/O operations that may block must go
98/// through the [`do_io`] method.
99///
100/// [`do_io`]: IoSource::do_io
101impl<T> DerefMut for IoSource<T> {
102    fn deref_mut(&mut self) -> &mut Self::Target {
103        &mut self.inner
104    }
105}
106
107#[cfg(any(
108    unix,
109    target_os = "hermit",
110    all(target_os = "wasi", not(target_env = "p1"))
111))]
112impl<T> event::Source for IoSource<T>
113where
114    T: AsRawFd,
115{
116    fn register(
117        &mut self,
118        registry: &Registry,
119        token: Token,
120        interests: Interest,
121    ) -> io::Result<()> {
122        #[cfg(debug_assertions)]
123        self.selector_id.associate(registry)?;
124        self.state
125            .register(registry, token, interests, self.inner.as_raw_fd())
126    }
127
128    fn reregister(
129        &mut self,
130        registry: &Registry,
131        token: Token,
132        interests: Interest,
133    ) -> io::Result<()> {
134        #[cfg(debug_assertions)]
135        self.selector_id.check_association(registry)?;
136        self.state
137            .reregister(registry, token, interests, self.inner.as_raw_fd())
138    }
139
140    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
141        #[cfg(debug_assertions)]
142        self.selector_id.remove_association(registry)?;
143        self.state.deregister(registry, self.inner.as_raw_fd())
144    }
145}
146
147#[cfg(windows)]
148impl<T> event::Source for IoSource<T>
149where
150    T: AsRawSocket,
151{
152    fn register(
153        &mut self,
154        registry: &Registry,
155        token: Token,
156        interests: Interest,
157    ) -> io::Result<()> {
158        #[cfg(debug_assertions)]
159        self.selector_id.associate(registry)?;
160        self.state
161            .register(registry, token, interests, self.inner.as_raw_socket())
162    }
163
164    fn reregister(
165        &mut self,
166        registry: &Registry,
167        token: Token,
168        interests: Interest,
169    ) -> io::Result<()> {
170        #[cfg(debug_assertions)]
171        self.selector_id.check_association(registry)?;
172        self.state.reregister(registry, token, interests)
173    }
174
175    fn deregister(&mut self, _registry: &Registry) -> io::Result<()> {
176        #[cfg(debug_assertions)]
177        self.selector_id.remove_association(_registry)?;
178        self.state.deregister()
179    }
180}
181
182#[cfg(all(target_os = "wasi", target_env = "p1"))]
183impl<T> event::Source for IoSource<T>
184where
185    T: AsRawFd,
186{
187    fn register(
188        &mut self,
189        registry: &Registry,
190        token: Token,
191        interests: Interest,
192    ) -> io::Result<()> {
193        #[cfg(debug_assertions)]
194        self.selector_id.associate(registry)?;
195        registry
196            .selector()
197            .register(self.inner.as_raw_fd() as _, token, interests)
198    }
199
200    fn reregister(
201        &mut self,
202        registry: &Registry,
203        token: Token,
204        interests: Interest,
205    ) -> io::Result<()> {
206        #[cfg(debug_assertions)]
207        self.selector_id.check_association(registry)?;
208        registry
209            .selector()
210            .reregister(self.inner.as_raw_fd() as _, token, interests)
211    }
212
213    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
214        #[cfg(debug_assertions)]
215        self.selector_id.remove_association(registry)?;
216        registry.selector().deregister(self.inner.as_raw_fd() as _)
217    }
218}
219
220impl<T> fmt::Debug for IoSource<T>
221where
222    T: fmt::Debug,
223{
224    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
225        self.inner.fmt(f)
226    }
227}
228
229/// Used to associate an `IoSource` with a `sys::Selector`.
230#[cfg(debug_assertions)]
231#[derive(Debug)]
232struct SelectorId {
233    id: AtomicUsize,
234}
235
236#[cfg(debug_assertions)]
237impl SelectorId {
238    /// Value of `id` if `SelectorId` is not associated with any
239    /// `sys::Selector`. Valid selector ids start at 1.
240    const UNASSOCIATED: usize = 0;
241
242    /// Create a new `SelectorId`.
243    const fn new() -> SelectorId {
244        SelectorId {
245            id: AtomicUsize::new(Self::UNASSOCIATED),
246        }
247    }
248
249    /// Associate an I/O source with `registry`, returning an error if its
250    /// already registered.
251    fn associate(&self, registry: &Registry) -> io::Result<()> {
252        let registry_id = registry.selector().id();
253        let previous_id = self.id.swap(registry_id, Ordering::AcqRel);
254
255        if previous_id == Self::UNASSOCIATED {
256            Ok(())
257        } else {
258            Err(io::Error::new(
259                io::ErrorKind::AlreadyExists,
260                "I/O source already registered with a `Registry`",
261            ))
262        }
263    }
264
265    /// Check the association of an I/O source with `registry`, returning an
266    /// error if its registered with a different `Registry` or not registered at
267    /// all.
268    fn check_association(&self, registry: &Registry) -> io::Result<()> {
269        let registry_id = registry.selector().id();
270        let id = self.id.load(Ordering::Acquire);
271
272        if id == registry_id {
273            Ok(())
274        } else if id == Self::UNASSOCIATED {
275            Err(io::Error::new(
276                io::ErrorKind::NotFound,
277                "I/O source not registered with `Registry`",
278            ))
279        } else {
280            Err(io::Error::new(
281                io::ErrorKind::AlreadyExists,
282                "I/O source already registered with a different `Registry`",
283            ))
284        }
285    }
286
287    /// Remove a previously made association from `registry`, returns an error
288    /// if it was not previously associated with `registry`.
289    fn remove_association(&self, registry: &Registry) -> io::Result<()> {
290        let registry_id = registry.selector().id();
291        let previous_id = self.id.swap(Self::UNASSOCIATED, Ordering::AcqRel);
292
293        if previous_id == registry_id {
294            Ok(())
295        } else {
296            Err(io::Error::new(
297                io::ErrorKind::NotFound,
298                "I/O source not registered with `Registry`",
299            ))
300        }
301    }
302}
303
304#[cfg(debug_assertions)]
305impl Clone for SelectorId {
306    fn clone(&self) -> SelectorId {
307        SelectorId {
308            id: AtomicUsize::new(self.id.load(Ordering::Acquire)),
309        }
310    }
311}