tokio/runtime/
driver.rs

1//! Abstracts out the entire chain of runtime sub-drivers into common types.
2
3// Eventually, this file will see significant refactoring / cleanup. For now, we
4// don't need to worry much about dead code with certain feature permutations.
5#![cfg_attr(
6    any(not(all(tokio_unstable, feature = "full")), target_family = "wasm"),
7    allow(dead_code)
8)]
9
10use crate::runtime::park::{ParkThread, UnparkThread};
11
12use std::io;
13use std::time::Duration;
14
15#[derive(Debug)]
16pub(crate) struct Driver {
17    inner: TimeDriver,
18}
19
20#[derive(Debug)]
21pub(crate) struct Handle {
22    /// IO driver handle
23    pub(crate) io: IoHandle,
24
25    /// Signal driver handle
26    #[cfg_attr(any(not(unix), loom), allow(dead_code))]
27    pub(crate) signal: SignalHandle,
28
29    /// Time driver handle
30    pub(crate) time: TimeHandle,
31
32    /// Source of `Instant::now()`
33    #[cfg_attr(not(all(feature = "time", feature = "test-util")), allow(dead_code))]
34    pub(crate) clock: Clock,
35}
36
37pub(crate) struct Cfg {
38    pub(crate) enable_io: bool,
39    pub(crate) enable_time: bool,
40    pub(crate) enable_pause_time: bool,
41    pub(crate) start_paused: bool,
42    pub(crate) nevents: usize,
43    pub(crate) timer_flavor: crate::runtime::TimerFlavor,
44}
45
46impl Driver {
47    pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> {
48        let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io, cfg.nevents)?;
49
50        let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);
51
52        let (time_driver, time_handle) =
53            create_time_driver(cfg.enable_time, cfg.timer_flavor, io_stack, &clock);
54
55        Ok((
56            Self { inner: time_driver },
57            Handle {
58                io: io_handle,
59                signal: signal_handle,
60                time: time_handle,
61                clock,
62            },
63        ))
64    }
65
66    pub(crate) fn park(&mut self, handle: &Handle) {
67        self.inner.park(handle);
68    }
69
70    pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) {
71        self.inner.park_timeout(handle, duration);
72    }
73
74    pub(crate) fn shutdown(&mut self, handle: &Handle) {
75        self.inner.shutdown(handle);
76    }
77}
78
79impl Handle {
80    pub(crate) fn unpark(&self) {
81        #[cfg(feature = "time")]
82        if let Some(handle) = &self.time {
83            handle.unpark();
84        }
85
86        self.io.unpark();
87    }
88
89    cfg_io_driver! {
90        #[track_caller]
91        pub(crate) fn io(&self) -> &crate::runtime::io::Handle {
92            self.io
93                .as_ref()
94                .expect("A Tokio 1.x context was found, but IO is disabled. Call `enable_io` on the runtime builder to enable IO.")
95        }
96    }
97
98    cfg_signal_internal_and_unix! {
99        #[track_caller]
100        pub(crate) fn signal(&self) -> &crate::runtime::signal::Handle {
101            self.signal
102                .as_ref()
103                .expect("there is no signal driver running, must be called from the context of Tokio runtime")
104        }
105    }
106
107    cfg_time! {
108        /// Returns a reference to the time driver handle.
109        ///
110        /// Panics if no time driver is present.
111        #[track_caller]
112        pub(crate) fn time(&self) -> &crate::runtime::time::Handle {
113            self.time
114                .as_ref()
115                .expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.")
116        }
117
118        #[cfg(tokio_unstable)]
119        pub(crate) fn with_time<F, R>(&self, f: F) -> R
120        where
121            F: FnOnce(Option<&crate::runtime::time::Handle>) -> R,
122        {
123            f(self.time.as_ref())
124        }
125
126        pub(crate) fn clock(&self) -> &Clock {
127            &self.clock
128        }
129    }
130}
131
132// ===== io driver =====
133
134cfg_io_driver! {
135    pub(crate) type IoDriver = crate::runtime::io::Driver;
136
137    #[derive(Debug)]
138    pub(crate) enum IoStack {
139        Enabled(ProcessDriver),
140        Disabled(ParkThread),
141    }
142
143    #[derive(Debug)]
144    pub(crate) enum IoHandle {
145        Enabled(crate::runtime::io::Handle),
146        Disabled(UnparkThread),
147    }
148
149    fn create_io_stack(enabled: bool, nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
150        #[cfg(loom)]
151        assert!(!enabled);
152
153        let ret = if enabled {
154            let (io_driver, io_handle) = crate::runtime::io::Driver::new(nevents)?;
155
156            let (signal_driver, signal_handle) = create_signal_driver(io_driver, &io_handle)?;
157            let process_driver = create_process_driver(signal_driver);
158
159            (IoStack::Enabled(process_driver), IoHandle::Enabled(io_handle), signal_handle)
160        } else {
161            let park_thread = ParkThread::new();
162            let unpark_thread = park_thread.unpark();
163            (IoStack::Disabled(park_thread), IoHandle::Disabled(unpark_thread), Default::default())
164        };
165
166        Ok(ret)
167    }
168
169    impl IoStack {
170        pub(crate) fn park(&mut self, handle: &Handle) {
171            match self {
172                IoStack::Enabled(v) => v.park(handle),
173                IoStack::Disabled(v) => v.park(),
174            }
175        }
176
177        pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) {
178            match self {
179                IoStack::Enabled(v) => v.park_timeout(handle, duration),
180                IoStack::Disabled(v) => v.park_timeout(duration),
181            }
182        }
183
184        pub(crate) fn shutdown(&mut self, handle: &Handle) {
185            match self {
186                IoStack::Enabled(v) => v.shutdown(handle),
187                IoStack::Disabled(v) => v.shutdown(),
188            }
189        }
190    }
191
192    impl IoHandle {
193        pub(crate) fn unpark(&self) {
194            match self {
195                IoHandle::Enabled(handle) => handle.unpark(),
196                IoHandle::Disabled(handle) => handle.unpark(),
197            }
198        }
199
200        pub(crate) fn as_ref(&self) -> Option<&crate::runtime::io::Handle> {
201            match self {
202                IoHandle::Enabled(v) => Some(v),
203                IoHandle::Disabled(..) => None,
204            }
205        }
206    }
207}
208
209cfg_not_io_driver! {
210    pub(crate) type IoHandle = UnparkThread;
211
212    #[derive(Debug)]
213    pub(crate) struct IoStack(ParkThread);
214
215    fn create_io_stack(_enabled: bool, _nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
216        let park_thread = ParkThread::new();
217        let unpark_thread = park_thread.unpark();
218        Ok((IoStack(park_thread), unpark_thread, Default::default()))
219    }
220
221    impl IoStack {
222        pub(crate) fn park(&mut self, _handle: &Handle) {
223            self.0.park();
224        }
225
226        pub(crate) fn park_timeout(&mut self, _handle: &Handle, duration: Duration) {
227            self.0.park_timeout(duration);
228        }
229
230        pub(crate) fn shutdown(&mut self, _handle: &Handle) {
231            self.0.shutdown();
232        }
233
234        /// This is not a "real" driver, so it is not considered enabled.
235        pub(crate) fn is_enabled(&self) -> bool {
236            false
237        }
238    }
239}
240
241// ===== signal driver =====
242
243cfg_signal_internal_and_unix! {
244    type SignalDriver = crate::runtime::signal::Driver;
245    pub(crate) type SignalHandle = Option<crate::runtime::signal::Handle>;
246
247    fn create_signal_driver(io_driver: IoDriver, io_handle: &crate::runtime::io::Handle) -> io::Result<(SignalDriver, SignalHandle)> {
248        let driver = crate::runtime::signal::Driver::new(io_driver, io_handle)?;
249        let handle = driver.handle();
250        Ok((driver, Some(handle)))
251    }
252}
253
254cfg_not_signal_internal! {
255    pub(crate) type SignalHandle = ();
256
257    cfg_io_driver! {
258        type SignalDriver = IoDriver;
259
260        fn create_signal_driver(io_driver: IoDriver, _io_handle: &crate::runtime::io::Handle) -> io::Result<(SignalDriver, SignalHandle)> {
261            Ok((io_driver, ()))
262        }
263    }
264}
265
266// ===== process driver =====
267
268cfg_process_driver! {
269    type ProcessDriver = crate::runtime::process::Driver;
270
271    fn create_process_driver(signal_driver: SignalDriver) -> ProcessDriver {
272        ProcessDriver::new(signal_driver)
273    }
274}
275
276cfg_not_process_driver! {
277    cfg_io_driver! {
278        type ProcessDriver = SignalDriver;
279
280        fn create_process_driver(signal_driver: SignalDriver) -> ProcessDriver {
281            signal_driver
282        }
283    }
284}
285
286// ===== time driver =====
287
288cfg_time! {
289    #[derive(Debug)]
290    pub(crate) enum TimeDriver {
291        Enabled {
292            driver: crate::runtime::time::Driver,
293        },
294        EnabledAlt(IoStack),
295        Disabled(IoStack),
296    }
297
298    pub(crate) type Clock = crate::time::Clock;
299    pub(crate) type TimeHandle = Option<crate::runtime::time::Handle>;
300
301    fn create_clock(enable_pausing: bool, start_paused: bool) -> Clock {
302        crate::time::Clock::new(enable_pausing, start_paused)
303    }
304
305    fn create_time_driver(
306        enable: bool,
307        timer_flavor: crate::runtime::TimerFlavor,
308        io_stack: IoStack,
309        clock: &Clock,
310    ) -> (TimeDriver, TimeHandle) {
311        if enable {
312            match timer_flavor {
313                crate::runtime::TimerFlavor::Traditional => {
314                    let (driver, handle) = crate::runtime::time::Driver::new(io_stack, clock);
315                    (TimeDriver::Enabled { driver }, Some(handle))
316                }
317                #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
318                crate::runtime::TimerFlavor::Alternative => {
319                    (TimeDriver::EnabledAlt(io_stack), Some(crate::runtime::time::Driver::new_alt(clock)))
320                }
321            }
322        } else {
323            (TimeDriver::Disabled(io_stack), None)
324        }
325    }
326
327    impl TimeDriver {
328        pub(crate) fn park(&mut self, handle: &Handle) {
329            match self {
330                TimeDriver::Enabled { driver, .. } => driver.park(handle),
331                TimeDriver::EnabledAlt(v) => v.park(handle),
332                TimeDriver::Disabled(v) => v.park(handle),
333            }
334        }
335
336        pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) {
337            match self {
338                TimeDriver::Enabled { driver } => driver.park_timeout(handle, duration),
339                TimeDriver::EnabledAlt(v) => v.park_timeout(handle, duration),
340                TimeDriver::Disabled(v) => v.park_timeout(handle, duration),
341            }
342        }
343
344        pub(crate) fn shutdown(&mut self, handle: &Handle) {
345            match self {
346                TimeDriver::Enabled { driver } => driver.shutdown(handle),
347                TimeDriver::EnabledAlt(v) => v.shutdown(handle),
348                TimeDriver::Disabled(v) => v.shutdown(handle),
349            }
350        }
351    }
352}
353
354cfg_not_time! {
355    type TimeDriver = IoStack;
356
357    pub(crate) type Clock = ();
358    pub(crate) type TimeHandle = ();
359
360    fn create_clock(_enable_pausing: bool, _start_paused: bool) -> Clock {
361        ()
362    }
363
364    fn create_time_driver(
365        _enable: bool,
366        _timer_flavor: crate::runtime::TimerFlavor,
367        io_stack: IoStack,
368        _clock: &Clock,
369    ) -> (TimeDriver, TimeHandle) {
370        (io_stack, ())
371    }
372}
373
374cfg_io_uring! {
375    pub(crate) mod op;
376}