tokio/runtime/scheduler/
mod.rs

1cfg_rt! {
2    pub(crate) mod current_thread;
3    pub(crate) use current_thread::CurrentThread;
4
5    mod defer;
6    use defer::Defer;
7
8    pub(crate) mod inject;
9    pub(crate) use inject::Inject;
10
11    use crate::runtime::TaskHooks;
12
13    use crate::runtime::WorkerMetrics;
14}
15
16cfg_rt_multi_thread! {
17    mod block_in_place;
18    pub(crate) use block_in_place::block_in_place;
19
20    mod lock;
21    use lock::Lock;
22
23    pub(crate) mod multi_thread;
24    pub(crate) use multi_thread::MultiThread;
25}
26
27pub(super) mod util;
28
29use crate::runtime::driver;
30
/// Handle to a scheduler, with one variant per scheduler flavor that is
/// enabled at compile time.
#[derive(Clone, Debug)]
pub(crate) enum Handle {
    /// Shared handle to the current-thread scheduler.
    #[cfg(feature = "rt")]
    CurrentThread(Arc<current_thread::Handle>),

    /// Shared handle to the multi-thread scheduler.
    #[cfg(feature = "rt-multi-thread")]
    MultiThread(Arc<multi_thread::Handle>),

    // TODO: This variant only exists to avoid triggering "dead code" warnings
    // in many other places in the codebase. Remove it during a later cleanup.
    #[allow(dead_code)]
    #[cfg(not(feature = "rt"))]
    Disabled,
}
45
/// Scheduler context, with one variant per scheduler flavor that is enabled
/// at compile time. Only defined when the `rt` feature is on.
#[cfg(feature = "rt")]
pub(super) enum Context {
    /// Context belonging to the current-thread scheduler.
    CurrentThread(current_thread::Context),

    /// Context belonging to the multi-thread scheduler.
    #[cfg(feature = "rt-multi-thread")]
    MultiThread(multi_thread::Context),
}
53
54impl Handle {
55    #[cfg_attr(not(feature = "full"), allow(dead_code))]
56    pub(crate) fn driver(&self) -> &driver::Handle {
57        match *self {
58            #[cfg(feature = "rt")]
59            Handle::CurrentThread(ref h) => &h.driver,
60
61            #[cfg(feature = "rt-multi-thread")]
62            Handle::MultiThread(ref h) => &h.driver,
63
64            #[cfg(not(feature = "rt"))]
65            Handle::Disabled => unreachable!(),
66        }
67    }
68}
69
70cfg_rt! {
71    use crate::future::Future;
72    use crate::loom::sync::Arc;
73    use crate::runtime::{blocking, task::{Id, SpawnLocation}};
74    use crate::runtime::context;
75    use crate::task::JoinHandle;
76    use crate::util::RngSeedGenerator;
77    use std::task::Waker;
78
79    macro_rules! match_flavor {
80        ($self:expr, $ty:ident($h:ident) => $e:expr) => {
81            match $self {
82                $ty::CurrentThread($h) => $e,
83
84                #[cfg(feature = "rt-multi-thread")]
85                $ty::MultiThread($h) => $e,
86            }
87        }
88    }
89
90    impl Handle {
91        #[track_caller]
92        pub(crate) fn current() -> Handle {
93            match context::with_current(Clone::clone) {
94                Ok(handle) => handle,
95                Err(e) => panic!("{}", e),
96            }
97        }
98
99        pub(crate) fn blocking_spawner(&self) -> &blocking::Spawner {
100            match_flavor!(self, Handle(h) => &h.blocking_spawner)
101        }
102
103        pub(crate) fn is_local(&self) -> bool {
104            match self {
105                Handle::CurrentThread(h) => h.local_tid.is_some(),
106
107                #[cfg(feature = "rt-multi-thread")]
108                Handle::MultiThread(_) => false,
109            }
110        }
111
112        #[cfg(feature = "time")]
113        pub(crate) fn timer_flavor(&self) -> crate::runtime::TimerFlavor {
114            match self {
115                Handle::CurrentThread(_) => crate::runtime::TimerFlavor::Traditional,
116
117                #[cfg(feature = "rt-multi-thread")]
118                Handle::MultiThread(h) => h.timer_flavor,
119            }
120        }
121
122        #[cfg(all(tokio_unstable, feature = "rt-multi-thread", feature = "time"))]
123        /// Returns true if both handles belong to the same runtime instance.
124        pub(crate) fn is_same_runtime(&self, other: &Handle) -> bool {
125            match (self, other) {
126                (Handle::CurrentThread(a), Handle::CurrentThread(b)) => Arc::ptr_eq(a, b),
127                #[cfg(feature = "rt-multi-thread")]
128                (Handle::MultiThread(a), Handle::MultiThread(b)) => Arc::ptr_eq(a, b),
129                #[cfg(feature = "rt-multi-thread")]
130                _ => false, // different runtime types
131            }
132        }
133
134        #[cfg(all(tokio_unstable, feature = "rt-multi-thread", feature = "time"))]
135        /// Returns true if the runtime is shutting down.
136        pub(crate) fn is_shutdown(&self) -> bool {
137            match self {
138                Handle::CurrentThread(_) => panic!("the alternative timer implementation is not supported on CurrentThread runtime"),
139                Handle::MultiThread(h) => h.is_shutdown(),
140            }
141        }
142
143        #[cfg(all(tokio_unstable, feature = "rt-multi-thread", feature = "time"))]
144        /// Push a timer entry that was created outside of this runtime
145        /// into the runtime-global queue. The pushed timer will be
146        /// processed by a random worker thread.
147        pub(crate) fn push_remote_timer(&self, entry_hdl: crate::runtime::time_alt::EntryHandle) {
148            match self {
149                Handle::CurrentThread(_) => panic!("the alternative timer implementation is not supported on CurrentThread runtime"),
150                Handle::MultiThread(h) => h.push_remote_timer(entry_hdl),
151            }
152        }
153
154        /// Returns true if this is a local runtime and the runtime is owned by the current thread.
155        pub(crate) fn can_spawn_local_on_local_runtime(&self) -> bool {
156            match self {
157                Handle::CurrentThread(h) => h.local_tid.map(|x| std::thread::current().id() == x).unwrap_or(false),
158
159                #[cfg(feature = "rt-multi-thread")]
160                Handle::MultiThread(_) => false,
161            }
162        }
163
164        pub(crate) fn spawn<F>(&self, future: F, id: Id, spawned_at: SpawnLocation) -> JoinHandle<F::Output>
165        where
166            F: Future + Send + 'static,
167            F::Output: Send + 'static,
168        {
169            match self {
170                Handle::CurrentThread(h) => current_thread::Handle::spawn(h, future, id, spawned_at),
171
172                #[cfg(feature = "rt-multi-thread")]
173                Handle::MultiThread(h) => multi_thread::Handle::spawn(h, future, id, spawned_at),
174            }
175        }
176
177        /// Spawn a local task
178        ///
179        /// # Safety
180        ///
181        /// This should only be called in `LocalRuntime` if the runtime has been verified to be owned
182        /// by the current thread.
183        #[allow(irrefutable_let_patterns)]
184        #[track_caller]
185        pub(crate) unsafe fn spawn_local<F>(&self, future: F, id: Id, spawned_at: SpawnLocation) -> JoinHandle<F::Output>
186        where
187            F: Future + 'static,
188            F::Output: 'static,
189        {
190            if let Handle::CurrentThread(h) = self {
191                // Safety: caller guarantees that this is a `LocalRuntime`.
192                unsafe { current_thread::Handle::spawn_local(h, future, id, spawned_at) }
193            } else {
194                panic!("Only current_thread and LocalSet have spawn_local internals implemented")
195            }
196        }
197
198        pub(crate) fn shutdown(&self) {
199            match *self {
200                Handle::CurrentThread(_) => {},
201
202                #[cfg(feature = "rt-multi-thread")]
203                Handle::MultiThread(ref h) => h.shutdown(),
204            }
205        }
206
207        pub(crate) fn seed_generator(&self) -> &RngSeedGenerator {
208            match_flavor!(self, Handle(h) => &h.seed_generator)
209        }
210
211        pub(crate) fn as_current_thread(&self) -> &Arc<current_thread::Handle> {
212            match self {
213                Handle::CurrentThread(handle) => handle,
214                #[cfg(feature = "rt-multi-thread")]
215                _ => panic!("not a CurrentThread handle"),
216            }
217        }
218
219        pub(crate) fn hooks(&self) -> &TaskHooks {
220            match self {
221                Handle::CurrentThread(h) => &h.task_hooks,
222                #[cfg(feature = "rt-multi-thread")]
223                Handle::MultiThread(h) => &h.task_hooks,
224            }
225        }
226    }
227
228    impl Handle {
229        pub(crate) fn num_workers(&self) -> usize {
230            match self {
231                Handle::CurrentThread(_) => 1,
232                #[cfg(feature = "rt-multi-thread")]
233                Handle::MultiThread(handle) => handle.num_workers(),
234            }
235        }
236
237        pub(crate) fn num_alive_tasks(&self) -> usize {
238            match_flavor!(self, Handle(handle) => handle.num_alive_tasks())
239        }
240
241        pub(crate) fn injection_queue_depth(&self) -> usize {
242            match_flavor!(self, Handle(handle) => handle.injection_queue_depth())
243        }
244
245        pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
246            match_flavor!(self, Handle(handle) => handle.worker_metrics(worker))
247        }
248    }
249
250    cfg_unstable_metrics! {
251        use crate::runtime::SchedulerMetrics;
252
253        impl Handle {
254            cfg_64bit_metrics! {
255                pub(crate) fn spawned_tasks_count(&self) -> u64 {
256                    match_flavor!(self, Handle(handle) => handle.spawned_tasks_count())
257                }
258            }
259
260            pub(crate) fn num_blocking_threads(&self) -> usize {
261                match_flavor!(self, Handle(handle) => handle.num_blocking_threads())
262            }
263
264            pub(crate) fn num_idle_blocking_threads(&self) -> usize {
265                match_flavor!(self, Handle(handle) => handle.num_idle_blocking_threads())
266            }
267
268            pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
269                match_flavor!(self, Handle(handle) => handle.scheduler_metrics())
270            }
271
272            pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
273                match_flavor!(self, Handle(handle) => handle.worker_local_queue_depth(worker))
274            }
275
276            pub(crate) fn blocking_queue_depth(&self) -> usize {
277                match_flavor!(self, Handle(handle) => handle.blocking_queue_depth())
278            }
279        }
280    }
281
282    impl Context {
283        #[track_caller]
284        pub(crate) fn expect_current_thread(&self) -> &current_thread::Context {
285            match self {
286                Context::CurrentThread(context) => context,
287                #[cfg(feature = "rt-multi-thread")]
288                _ => panic!("expected `CurrentThread::Context`")
289            }
290        }
291
292        pub(crate) fn defer(&self, waker: &Waker) {
293            match_flavor!(self, Context(context) => context.defer(waker));
294        }
295
296        #[cfg(all(tokio_unstable, feature = "time", feature = "rt-multi-thread"))]
297        pub(crate) fn with_time_temp_local_context<F, R>(&self, f: F) -> R
298        where
299            F: FnOnce(Option<crate::runtime::time_alt::TempLocalContext<'_>>) -> R,
300        {
301            match self {
302                Context::CurrentThread(_) => panic!("the alternative timer implementation is not supported on CurrentThread runtime"),
303                Context::MultiThread(context) => context.with_time_temp_local_context(f),
304            }
305        }
306
307        cfg_rt_multi_thread! {
308            #[track_caller]
309            pub(crate) fn expect_multi_thread(&self) -> &multi_thread::Context {
310                match self {
311                    Context::MultiThread(context) => context,
312                    _ => panic!("expected `MultiThread::Context`")
313                }
314            }
315        }
316    }
317}
318
319cfg_not_rt! {
320    #[cfg(any(
321        feature = "net",
322        all(unix, feature = "process"),
323        all(unix, feature = "signal"),
324        feature = "time",
325    ))]
326    impl Handle {
327        #[track_caller]
328        pub(crate) fn current() -> Handle {
329            panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR)
330        }
331
332        #[cfg_attr(not(feature = "time"), allow(dead_code))]
333        #[track_caller]
334        pub(crate) fn timer_flavor(&self) -> crate::runtime::TimerFlavor {
335            panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR)
336        }
337    }
338}