tokio/runtime/io/
driver.rs

1// Signal handling
2cfg_signal_internal_and_unix! {
3    mod signal;
4}
5cfg_io_uring! {
6    mod uring;
7    use uring::UringContext;
8    use crate::loom::sync::atomic::AtomicUsize;
9}
10
11use crate::io::interest::Interest;
12use crate::io::ready::Ready;
13use crate::loom::sync::Mutex;
14use crate::runtime::driver;
15use crate::runtime::io::registration_set;
16use crate::runtime::io::{IoDriverMetrics, RegistrationSet, ScheduledIo};
17
18use mio::event::Source;
19use std::fmt;
20use std::io;
21use std::sync::Arc;
22use std::time::Duration;
23
/// I/O driver, backed by Mio.
///
/// Owns the `mio::Poll` instance and the event buffer it fills. Each call to
/// `turn` polls the OS and dispatches whatever events came back.
pub(crate) struct Driver {
    /// True when an event with the signal token is received.
    ///
    /// Set by `turn` when it sees `TOKEN_SIGNAL`; nothing in this file resets
    /// it, so the consumer is expected to clear it elsewhere.
    signal_ready: bool,

    /// Reuse the `mio::Events` value across calls to poll.
    ///
    /// Its capacity is fixed by the `nevents` argument of `Driver::new`.
    events: mio::Events,

    /// The system event queue.
    poll: mio::Poll,
}
35
/// A reference to an I/O driver.
///
/// Unlike `Driver`, this type is asserted to be `Send + Sync` (see
/// `_assert_kinds`), and its methods take `&self`.
pub(crate) struct Handle {
    /// Registers I/O resources.
    ///
    /// This is a clone of the registry belonging to the driver's `mio::Poll`.
    registry: mio::Registry,

    /// Tracks all registrations
    registrations: RegistrationSet,

    /// State that should be synchronized
    ///
    /// Locked and handed to `registrations` for every operation that mutates
    /// the set (allocate, remove, deregister, release, shutdown).
    synced: Mutex<registration_set::Synced>,

    /// Used to wake up the reactor from a call to `turn`.
    /// Not supported on `Wasi` due to lack of threading support.
    #[cfg(not(target_os = "wasi"))]
    waker: mio::Waker,

    /// Counters for the I/O driver: updated when a source is added or
    /// deregistered and after each `turn` with the number of ready events.
    pub(crate) metrics: IoDriverMetrics,

    /// io_uring context, guarded by a mutex. Only compiled in when the
    /// unstable io-uring configuration below is active.
    #[cfg(all(
        tokio_unstable,
        feature = "io-uring",
        feature = "rt",
        feature = "fs",
        target_os = "linux",
    ))]
    pub(crate) uring_context: Mutex<UringContext>,

    /// Atomic state word for the io_uring support, initialized to `0` in
    /// `Driver::new`.
    // NOTE(review): the meaning of its values is defined outside this file
    // (presumably in the `uring` module) — confirm before relying on it.
    #[cfg(all(
        tokio_unstable,
        feature = "io-uring",
        feature = "rt",
        feature = "fs",
        target_os = "linux",
    ))]
    pub(crate) uring_state: AtomicUsize,
}
72
/// A readiness value bundled with bookkeeping about when it was observed.
// NOTE(review): values of this type are produced outside this file; the field
// notes below are limited to what the names and types show.
#[derive(Debug)]
pub(crate) struct ReadyEvent {
    // NOTE(review): presumably the driver tick at which `ready` was observed —
    // confirm against `ScheduledIo`.
    pub(super) tick: u8,
    /// The readiness set carried by this event.
    pub(crate) ready: Ready,
    // NOTE(review): presumably true once the I/O driver has shut down —
    // confirm against the code that constructs this type.
    pub(super) is_shutdown: bool,
}
79
80cfg_net_unix!(
81    impl ReadyEvent {
82        pub(crate) fn with_ready(&self, ready: Ready) -> Self {
83            Self {
84                ready,
85                tick: self.tick,
86                is_shutdown: self.is_shutdown,
87            }
88        }
89    }
90);
91
/// Direction of an I/O operation.
///
/// `Direction::mask` maps each variant to the readiness bits that matter for
/// it.
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub(super) enum Direction {
    /// The read half: readable or read-closed readiness.
    Read,
    /// The write half: writable or write-closed readiness.
    Write,
}
97
/// Tick argument passed to `ScheduledIo::set_readiness`.
///
/// The driver passes `Tick::Set` when it records new readiness in `turn`.
pub(super) enum Tick {
    /// Used by the driver when it publishes readiness from a poll.
    Set,
    // NOTE(review): not used in this file; presumably carries the tick that a
    // readiness value was read at, so a stale clear can be detected — confirm
    // against `ScheduledIo::set_readiness`.
    Clear(u8),
}
102
/// Token the driver's `mio::Waker` is registered with; events carrying it only
/// serve to unblock the poll and are otherwise ignored by `turn`.
const TOKEN_WAKEUP: mio::Token = mio::Token(0);
/// Token reserved for signal events; `turn` sets `Driver::signal_ready` when
/// it sees it.
const TOKEN_SIGNAL: mio::Token = mio::Token(1);
105
106fn _assert_kinds() {
107    fn _assert<T: Send + Sync>() {}
108
109    _assert::<Handle>();
110}
111
112// ===== impl Driver =====
113
114impl Driver {
115    /// Creates a new event loop, returning any error that happened during the
116    /// creation.
117    pub(crate) fn new(nevents: usize) -> io::Result<(Driver, Handle)> {
118        let poll = mio::Poll::new()?;
119        #[cfg(not(target_os = "wasi"))]
120        let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
121        let registry = poll.registry().try_clone()?;
122
123        let driver = Driver {
124            signal_ready: false,
125            events: mio::Events::with_capacity(nevents),
126            poll,
127        };
128
129        let (registrations, synced) = RegistrationSet::new();
130
131        let handle = Handle {
132            registry,
133            registrations,
134            synced: Mutex::new(synced),
135            #[cfg(not(target_os = "wasi"))]
136            waker,
137            metrics: IoDriverMetrics::default(),
138            #[cfg(all(
139                tokio_unstable,
140                feature = "io-uring",
141                feature = "rt",
142                feature = "fs",
143                target_os = "linux",
144            ))]
145            uring_context: Mutex::new(UringContext::new()),
146            #[cfg(all(
147                tokio_unstable,
148                feature = "io-uring",
149                feature = "rt",
150                feature = "fs",
151                target_os = "linux",
152            ))]
153            uring_state: AtomicUsize::new(0),
154        };
155
156        Ok((driver, handle))
157    }
158
159    pub(crate) fn park(&mut self, rt_handle: &driver::Handle) {
160        let handle = rt_handle.io();
161        self.turn(handle, None);
162    }
163
164    pub(crate) fn park_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
165        let handle = rt_handle.io();
166        self.turn(handle, Some(duration));
167    }
168
169    pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) {
170        let handle = rt_handle.io();
171        let ios = handle.registrations.shutdown(&mut handle.synced.lock());
172
173        // `shutdown()` must be called without holding the lock.
174        for io in ios {
175            io.shutdown();
176        }
177    }
178
179    fn turn(&mut self, handle: &Handle, max_wait: Option<Duration>) {
180        debug_assert!(!handle.registrations.is_shutdown(&handle.synced.lock()));
181
182        handle.release_pending_registrations();
183
184        let events = &mut self.events;
185
186        // Block waiting for an event to happen, peeling out how many events
187        // happened.
188        match self.poll.poll(events, max_wait) {
189            Ok(()) => {}
190            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
191            #[cfg(target_os = "wasi")]
192            Err(e) if e.kind() == io::ErrorKind::InvalidInput => {
193                // In case of wasm32_wasi this error happens, when trying to poll without subscriptions
194                // just return from the park, as there would be nothing, which wakes us up.
195            }
196            Err(e) => panic!("unexpected error when polling the I/O driver: {e:?}"),
197        }
198
199        // Process all the events that came in, dispatching appropriately
200        let mut ready_count = 0;
201        for event in events.iter() {
202            let token = event.token();
203
204            if token == TOKEN_WAKEUP {
205                // Nothing to do, the event is used to unblock the I/O driver
206            } else if token == TOKEN_SIGNAL {
207                self.signal_ready = true;
208            } else {
209                let ready = Ready::from_mio(event);
210                let ptr = super::EXPOSE_IO.from_exposed_addr(token.0);
211
212                // Safety: we ensure that the pointers used as tokens are not freed
213                // until they are both deregistered from mio **and** we know the I/O
214                // driver is not concurrently polling. The I/O driver holds ownership of
215                // an `Arc<ScheduledIo>` so we can safely cast this to a ref.
216                let io: &ScheduledIo = unsafe { &*ptr };
217
218                io.set_readiness(Tick::Set, |curr| curr | ready);
219                io.wake(ready);
220
221                ready_count += 1;
222            }
223        }
224
225        #[cfg(all(
226            tokio_unstable,
227            feature = "io-uring",
228            feature = "rt",
229            feature = "fs",
230            target_os = "linux",
231        ))]
232        {
233            let mut guard = handle.get_uring().lock();
234            let ctx = &mut *guard;
235            ctx.dispatch_completions();
236        }
237
238        handle.metrics.incr_ready_count_by(ready_count);
239    }
240}
241
242impl fmt::Debug for Driver {
243    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
244        write!(f, "Driver")
245    }
246}
247
248impl Handle {
249    /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise
250    /// makes the next call to `turn` return immediately.
251    ///
252    /// This method is intended to be used in situations where a notification
253    /// needs to otherwise be sent to the main reactor. If the reactor is
254    /// currently blocked inside of `turn` then it will wake up and soon return
255    /// after this method has been called. If the reactor is not currently
256    /// blocked in `turn`, then the next call to `turn` will not block and
257    /// return immediately.
258    pub(crate) fn unpark(&self) {
259        #[cfg(not(target_os = "wasi"))]
260        self.waker.wake().expect("failed to wake I/O driver");
261    }
262
263    /// Registers an I/O resource with the reactor for a given `mio::Ready` state.
264    ///
265    /// The registration token is returned.
266    pub(super) fn add_source(
267        &self,
268        source: &mut impl mio::event::Source,
269        interest: Interest,
270    ) -> io::Result<Arc<ScheduledIo>> {
271        let scheduled_io = self.registrations.allocate(&mut self.synced.lock())?;
272        let token = scheduled_io.token();
273
274        // we should remove the `scheduled_io` from the `registrations` set if registering
275        // the `source` with the OS fails. Otherwise it will leak the `scheduled_io`.
276        if let Err(e) = self.registry.register(source, token, interest.to_mio()) {
277            // safety: `scheduled_io` is part of the `registrations` set.
278            unsafe {
279                self.registrations
280                    .remove(&mut self.synced.lock(), &scheduled_io)
281            };
282
283            return Err(e);
284        }
285
286        // TODO: move this logic to `RegistrationSet` and use a `CountedLinkedList`
287        self.metrics.incr_fd_count();
288
289        Ok(scheduled_io)
290    }
291
292    /// Deregisters an I/O resource from the reactor.
293    pub(super) fn deregister_source(
294        &self,
295        registration: &Arc<ScheduledIo>,
296        source: &mut impl Source,
297    ) -> io::Result<()> {
298        // Deregister the source with the OS poller **first**
299        self.registry.deregister(source)?;
300
301        if self
302            .registrations
303            .deregister(&mut self.synced.lock(), registration)
304        {
305            self.unpark();
306        }
307
308        self.metrics.dec_fd_count();
309
310        Ok(())
311    }
312
313    fn release_pending_registrations(&self) {
314        if self.registrations.needs_release() {
315            self.registrations.release(&mut self.synced.lock());
316        }
317    }
318}
319
320impl fmt::Debug for Handle {
321    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
322        write!(f, "Handle")
323    }
324}
325
326impl Direction {
327    pub(super) fn mask(self) -> Ready {
328        match self {
329            Direction::Read => Ready::READABLE | Ready::READ_CLOSED,
330            Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED,
331        }
332    }
333}