1use std::ops::{Deref, DerefMut};
2#[cfg(any(unix, target_os = "wasi"))]
3use std::os::fd::AsRawFd;
4#[cfg(target_os = "hermit")]
7use std::os::hermit::io::AsRawFd;
8#[cfg(windows)]
9use std::os::windows::io::AsRawSocket;
10#[cfg(debug_assertions)]
11use std::sync::atomic::{AtomicUsize, Ordering};
12use std::{fmt, io};
13
14use crate::sys::IoSourceState;
15use crate::{event, Interest, Registry, Token};
16
/// Wrapper that pairs an I/O value of type `T` with the platform-specific
/// [`IoSourceState`] needed to use it as an event source.
///
/// The wrapped value stays reachable through `Deref`/`DerefMut` and can be
/// taken back out with `into_inner`. In debug builds the wrapper additionally
/// carries a `SelectorId` that the `event::Source` implementations consult to
/// sanity-check register/reregister/deregister calls.
pub struct IoSource<T> {
    /// Platform-specific state from `crate::sys`; `do_io` hands closures to it.
    state: IoSourceState,
    /// The wrapped I/O value.
    inner: T,
    /// Debug-build-only record of the registry this source is associated with.
    #[cfg(debug_assertions)]
    selector_id: SelectorId,
}
43
44impl<T> IoSource<T> {
45 pub fn new(io: T) -> IoSource<T> {
47 IoSource {
48 state: IoSourceState::new(),
49 inner: io,
50 #[cfg(debug_assertions)]
51 selector_id: SelectorId::new(),
52 }
53 }
54
55 pub fn do_io<F, R>(&self, f: F) -> io::Result<R>
66 where
67 F: FnOnce(&T) -> io::Result<R>,
68 {
69 self.state.do_io(f, &self.inner)
70 }
71
72 pub fn into_inner(self) -> T {
81 self.inner
82 }
83}
84
85impl<T> Deref for IoSource<T> {
90 type Target = T;
91
92 fn deref(&self) -> &Self::Target {
93 &self.inner
94 }
95}
96
97impl<T> DerefMut for IoSource<T> {
102 fn deref_mut(&mut self) -> &mut Self::Target {
103 &mut self.inner
104 }
105}
106
107#[cfg(any(
108 unix,
109 target_os = "hermit",
110 all(target_os = "wasi", not(target_env = "p1"))
111))]
112impl<T> event::Source for IoSource<T>
113where
114 T: AsRawFd,
115{
116 fn register(
117 &mut self,
118 registry: &Registry,
119 token: Token,
120 interests: Interest,
121 ) -> io::Result<()> {
122 #[cfg(debug_assertions)]
123 self.selector_id.associate(registry)?;
124 self.state
125 .register(registry, token, interests, self.inner.as_raw_fd())
126 }
127
128 fn reregister(
129 &mut self,
130 registry: &Registry,
131 token: Token,
132 interests: Interest,
133 ) -> io::Result<()> {
134 #[cfg(debug_assertions)]
135 self.selector_id.check_association(registry)?;
136 self.state
137 .reregister(registry, token, interests, self.inner.as_raw_fd())
138 }
139
140 fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
141 #[cfg(debug_assertions)]
142 self.selector_id.remove_association(registry)?;
143 self.state.deregister(registry, self.inner.as_raw_fd())
144 }
145}
146
/// `event::Source` implementation for Windows, where the wrapped value
/// exposes a raw socket.
///
/// Each method first updates or verifies the debug-only selector association
/// (debug builds only) and then forwards to `IoSourceState`. Only `register`
/// passes the raw socket along; `reregister` and `deregister` do not.
#[cfg(windows)]
impl<T: AsRawSocket> event::Source for IoSource<T> {
    fn register(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.associate(registry)?;

        // The socket is fetched only after the association check has passed.
        let raw_socket = self.inner.as_raw_socket();
        self.state.register(registry, token, interests, raw_socket)
    }

    fn reregister(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.check_association(registry)?;

        let state = &mut self.state;
        state.reregister(registry, token, interests)
    }

    // `_registry` is only read in debug builds, hence the underscore prefix.
    fn deregister(&mut self, _registry: &Registry) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.remove_association(_registry)?;

        let state = &mut self.state;
        state.deregister()
    }
}
181
/// `event::Source` implementation for WASI preview 1.
///
/// Unlike the other platform implementations in this file, these methods do
/// not go through `IoSourceState`: after the debug-only association check
/// they call the registry's selector directly with the inner value's raw fd
/// (cast to whatever integer type the selector expects).
#[cfg(all(target_os = "wasi", target_env = "p1"))]
impl<T: AsRawFd> event::Source for IoSource<T> {
    fn register(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.associate(registry)?;

        let selector = registry.selector();
        selector.register(self.inner.as_raw_fd() as _, token, interests)
    }

    fn reregister(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.check_association(registry)?;

        let selector = registry.selector();
        selector.reregister(self.inner.as_raw_fd() as _, token, interests)
    }

    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.remove_association(registry)?;

        let selector = registry.selector();
        selector.deregister(self.inner.as_raw_fd() as _)
    }
}
219
220impl<T> fmt::Debug for IoSource<T>
221where
222 T: fmt::Debug,
223{
224 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
225 self.inner.fmt(f)
226 }
227}
228
/// Debug-build-only record of which registry an I/O source is associated
/// with, identified by the id of that registry's selector.
#[cfg(debug_assertions)]
#[derive(Debug)]
struct SelectorId {
    /// Selector id of the associated registry, or `SelectorId::UNASSOCIATED`
    /// (0) while no registry is associated.
    id: AtomicUsize,
}
235
236#[cfg(debug_assertions)]
237impl SelectorId {
238 const UNASSOCIATED: usize = 0;
241
242 const fn new() -> SelectorId {
244 SelectorId {
245 id: AtomicUsize::new(Self::UNASSOCIATED),
246 }
247 }
248
249 fn associate(&self, registry: &Registry) -> io::Result<()> {
252 let registry_id = registry.selector().id();
253 let previous_id = self.id.swap(registry_id, Ordering::AcqRel);
254
255 if previous_id == Self::UNASSOCIATED {
256 Ok(())
257 } else {
258 Err(io::Error::new(
259 io::ErrorKind::AlreadyExists,
260 "I/O source already registered with a `Registry`",
261 ))
262 }
263 }
264
265 fn check_association(&self, registry: &Registry) -> io::Result<()> {
269 let registry_id = registry.selector().id();
270 let id = self.id.load(Ordering::Acquire);
271
272 if id == registry_id {
273 Ok(())
274 } else if id == Self::UNASSOCIATED {
275 Err(io::Error::new(
276 io::ErrorKind::NotFound,
277 "I/O source not registered with `Registry`",
278 ))
279 } else {
280 Err(io::Error::new(
281 io::ErrorKind::AlreadyExists,
282 "I/O source already registered with a different `Registry`",
283 ))
284 }
285 }
286
287 fn remove_association(&self, registry: &Registry) -> io::Result<()> {
290 let registry_id = registry.selector().id();
291 let previous_id = self.id.swap(Self::UNASSOCIATED, Ordering::AcqRel);
292
293 if previous_id == registry_id {
294 Ok(())
295 } else {
296 Err(io::Error::new(
297 io::ErrorKind::NotFound,
298 "I/O source not registered with `Registry`",
299 ))
300 }
301 }
302}
303
304#[cfg(debug_assertions)]
305impl Clone for SelectorId {
306 fn clone(&self) -> SelectorId {
307 SelectorId {
308 id: AtomicUsize::new(self.id.load(Ordering::Acquire)),
309 }
310 }
311}