1cfg_signal_internal_and_unix! {
3 mod signal;
4}
5cfg_io_uring! {
6 mod uring;
7 use uring::UringContext;
8 use crate::loom::sync::atomic::AtomicUsize;
9}
10
11use crate::io::interest::Interest;
12use crate::io::ready::Ready;
13use crate::loom::sync::Mutex;
14use crate::runtime::driver;
15use crate::runtime::io::registration_set;
16use crate::runtime::io::{IoDriverMetrics, RegistrationSet, ScheduledIo};
17
18use mio::event::Source;
19use std::fmt;
20use std::io;
21use std::sync::Arc;
22use std::time::Duration;
23
24pub(crate) struct Driver {
26 signal_ready: bool,
28
29 events: mio::Events,
31
32 poll: mio::Poll,
34}
35
36pub(crate) struct Handle {
38 registry: mio::Registry,
40
41 registrations: RegistrationSet,
43
44 synced: Mutex<registration_set::Synced>,
46
47 #[cfg(not(target_os = "wasi"))]
50 waker: mio::Waker,
51
52 pub(crate) metrics: IoDriverMetrics,
53
54 #[cfg(all(
55 tokio_unstable,
56 feature = "io-uring",
57 feature = "rt",
58 feature = "fs",
59 target_os = "linux",
60 ))]
61 pub(crate) uring_context: Mutex<UringContext>,
62
63 #[cfg(all(
64 tokio_unstable,
65 feature = "io-uring",
66 feature = "rt",
67 feature = "fs",
68 target_os = "linux",
69 ))]
70 pub(crate) uring_state: AtomicUsize,
71}
72
73#[derive(Debug)]
74pub(crate) struct ReadyEvent {
75 pub(super) tick: u8,
76 pub(crate) ready: Ready,
77 pub(super) is_shutdown: bool,
78}
79
80cfg_net_unix!(
81 impl ReadyEvent {
82 pub(crate) fn with_ready(&self, ready: Ready) -> Self {
83 Self {
84 ready,
85 tick: self.tick,
86 is_shutdown: self.is_shutdown,
87 }
88 }
89 }
90);
91
92#[derive(Debug, Eq, PartialEq, Clone, Copy)]
93pub(super) enum Direction {
94 Read,
95 Write,
96}
97
98pub(super) enum Tick {
99 Set,
100 Clear(u8),
101}
102
103const TOKEN_WAKEUP: mio::Token = mio::Token(0);
104const TOKEN_SIGNAL: mio::Token = mio::Token(1);
105
106fn _assert_kinds() {
107 fn _assert<T: Send + Sync>() {}
108
109 _assert::<Handle>();
110}
111
112impl Driver {
115 pub(crate) fn new(nevents: usize) -> io::Result<(Driver, Handle)> {
118 let poll = mio::Poll::new()?;
119 #[cfg(not(target_os = "wasi"))]
120 let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
121 let registry = poll.registry().try_clone()?;
122
123 let driver = Driver {
124 signal_ready: false,
125 events: mio::Events::with_capacity(nevents),
126 poll,
127 };
128
129 let (registrations, synced) = RegistrationSet::new();
130
131 let handle = Handle {
132 registry,
133 registrations,
134 synced: Mutex::new(synced),
135 #[cfg(not(target_os = "wasi"))]
136 waker,
137 metrics: IoDriverMetrics::default(),
138 #[cfg(all(
139 tokio_unstable,
140 feature = "io-uring",
141 feature = "rt",
142 feature = "fs",
143 target_os = "linux",
144 ))]
145 uring_context: Mutex::new(UringContext::new()),
146 #[cfg(all(
147 tokio_unstable,
148 feature = "io-uring",
149 feature = "rt",
150 feature = "fs",
151 target_os = "linux",
152 ))]
153 uring_state: AtomicUsize::new(0),
154 };
155
156 Ok((driver, handle))
157 }
158
159 pub(crate) fn park(&mut self, rt_handle: &driver::Handle) {
160 let handle = rt_handle.io();
161 self.turn(handle, None);
162 }
163
164 pub(crate) fn park_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
165 let handle = rt_handle.io();
166 self.turn(handle, Some(duration));
167 }
168
169 pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) {
170 let handle = rt_handle.io();
171 let ios = handle.registrations.shutdown(&mut handle.synced.lock());
172
173 for io in ios {
175 io.shutdown();
176 }
177 }
178
179 fn turn(&mut self, handle: &Handle, max_wait: Option<Duration>) {
180 debug_assert!(!handle.registrations.is_shutdown(&handle.synced.lock()));
181
182 handle.release_pending_registrations();
183
184 let events = &mut self.events;
185
186 match self.poll.poll(events, max_wait) {
189 Ok(()) => {}
190 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
191 #[cfg(target_os = "wasi")]
192 Err(e) if e.kind() == io::ErrorKind::InvalidInput => {
193 }
196 Err(e) => panic!("unexpected error when polling the I/O driver: {e:?}"),
197 }
198
199 let mut ready_count = 0;
201 for event in events.iter() {
202 let token = event.token();
203
204 if token == TOKEN_WAKEUP {
205 } else if token == TOKEN_SIGNAL {
207 self.signal_ready = true;
208 } else {
209 let ready = Ready::from_mio(event);
210 let ptr = super::EXPOSE_IO.from_exposed_addr(token.0);
211
212 let io: &ScheduledIo = unsafe { &*ptr };
217
218 io.set_readiness(Tick::Set, |curr| curr | ready);
219 io.wake(ready);
220
221 ready_count += 1;
222 }
223 }
224
225 #[cfg(all(
226 tokio_unstable,
227 feature = "io-uring",
228 feature = "rt",
229 feature = "fs",
230 target_os = "linux",
231 ))]
232 {
233 let mut guard = handle.get_uring().lock();
234 let ctx = &mut *guard;
235 ctx.dispatch_completions();
236 }
237
238 handle.metrics.incr_ready_count_by(ready_count);
239 }
240}
241
242impl fmt::Debug for Driver {
243 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
244 write!(f, "Driver")
245 }
246}
247
248impl Handle {
249 pub(crate) fn unpark(&self) {
259 #[cfg(not(target_os = "wasi"))]
260 self.waker.wake().expect("failed to wake I/O driver");
261 }
262
263 pub(super) fn add_source(
267 &self,
268 source: &mut impl mio::event::Source,
269 interest: Interest,
270 ) -> io::Result<Arc<ScheduledIo>> {
271 let scheduled_io = self.registrations.allocate(&mut self.synced.lock())?;
272 let token = scheduled_io.token();
273
274 if let Err(e) = self.registry.register(source, token, interest.to_mio()) {
277 unsafe {
279 self.registrations
280 .remove(&mut self.synced.lock(), &scheduled_io)
281 };
282
283 return Err(e);
284 }
285
286 self.metrics.incr_fd_count();
288
289 Ok(scheduled_io)
290 }
291
292 pub(super) fn deregister_source(
294 &self,
295 registration: &Arc<ScheduledIo>,
296 source: &mut impl Source,
297 ) -> io::Result<()> {
298 self.registry.deregister(source)?;
300
301 if self
302 .registrations
303 .deregister(&mut self.synced.lock(), registration)
304 {
305 self.unpark();
306 }
307
308 self.metrics.dec_fd_count();
309
310 Ok(())
311 }
312
313 fn release_pending_registrations(&self) {
314 if self.registrations.needs_release() {
315 self.registrations.release(&mut self.synced.lock());
316 }
317 }
318}
319
320impl fmt::Debug for Handle {
321 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
322 write!(f, "Handle")
323 }
324}
325
326impl Direction {
327 pub(super) fn mask(self) -> Ready {
328 match self {
329 Direction::Read => Ready::READABLE | Ready::READ_CLOSED,
330 Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED,
331 }
332 }
333}