tokio/runtime/scheduler/multi_thread/
handle.rs

1use crate::future::Future;
2use crate::loom::sync::Arc;
3use crate::runtime::scheduler::multi_thread::worker;
4use crate::runtime::task::{Notified, Task, TaskHarnessScheduleHooks};
5use crate::runtime::{
6    blocking, driver,
7    task::{self, JoinHandle, SpawnLocation},
8    TaskHooks, TaskMeta, TimerFlavor,
9};
10use crate::util::RngSeedGenerator;
11
12use std::fmt;
13use std::num::NonZeroU64;
14
15mod metrics;
16
17cfg_taskdump! {
18    mod taskdump;
19}
20
21#[cfg(all(tokio_unstable, feature = "time"))]
22use crate::loom::sync::atomic::{AtomicBool, Ordering::SeqCst};
23
24/// Handle to the multi thread scheduler
25pub(crate) struct Handle {
26    /// The name of the runtime
27    pub(super) name: Option<String>,
28
29    /// Task spawner
30    pub(super) shared: worker::Shared,
31
32    /// Resource driver handles
33    pub(crate) driver: driver::Handle,
34
35    /// Blocking pool spawner
36    pub(crate) blocking_spawner: blocking::Spawner,
37
38    /// Current random number generator seed
39    pub(crate) seed_generator: RngSeedGenerator,
40
41    /// User-supplied hooks to invoke for things
42    pub(crate) task_hooks: TaskHooks,
43
44    #[cfg_attr(not(feature = "time"), allow(dead_code))]
45    /// Timer flavor used by the runtime
46    pub(crate) timer_flavor: TimerFlavor,
47
48    #[cfg(all(tokio_unstable, feature = "time"))]
49    /// Indicates that the runtime is shutting down.
50    pub(crate) is_shutdown: AtomicBool,
51}
52
53impl Handle {
54    /// Spawns a future onto the thread pool
55    pub(crate) fn spawn<F>(
56        me: &Arc<Self>,
57        future: F,
58        id: task::Id,
59        spawned_at: SpawnLocation,
60    ) -> JoinHandle<F::Output>
61    where
62        F: crate::future::Future + Send + 'static,
63        F::Output: Send + 'static,
64    {
65        Self::bind_new_task(me, future, id, spawned_at)
66    }
67
68    #[cfg(all(tokio_unstable, feature = "time"))]
69    pub(crate) fn is_shutdown(&self) -> bool {
70        self.is_shutdown
71            .load(crate::loom::sync::atomic::Ordering::SeqCst)
72    }
73
74    pub(crate) fn shutdown(&self) {
75        self.close();
76        #[cfg(all(tokio_unstable, feature = "time"))]
77        self.is_shutdown.store(true, SeqCst);
78    }
79
80    #[track_caller]
81    pub(super) fn bind_new_task<T>(
82        me: &Arc<Self>,
83        future: T,
84        id: task::Id,
85        spawned_at: SpawnLocation,
86    ) -> JoinHandle<T::Output>
87    where
88        T: Future + Send + 'static,
89        T::Output: Send + 'static,
90    {
91        let (handle, notified) = me.shared.owned.bind(future, me.clone(), id, spawned_at);
92
93        me.task_hooks.spawn(&TaskMeta {
94            id,
95            spawned_at,
96            _phantom: Default::default(),
97        });
98
99        me.schedule_option_task_without_yield(notified);
100
101        handle
102    }
103}
104
105impl task::Schedule for Arc<Handle> {
106    fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
107        self.shared.owned.remove(task)
108    }
109
110    fn schedule(&self, task: Notified<Self>) {
111        self.schedule_task(task, false);
112    }
113
114    fn hooks(&self) -> TaskHarnessScheduleHooks {
115        TaskHarnessScheduleHooks {
116            task_terminate_callback: self.task_hooks.task_terminate_callback.clone(),
117        }
118    }
119
120    fn yield_now(&self, task: Notified<Self>) {
121        self.schedule_task(task, true);
122    }
123}
124
125impl Handle {
126    pub(crate) fn owned_id(&self) -> NonZeroU64 {
127        self.shared.owned.id
128    }
129
130    pub(crate) fn name(&self) -> Option<&str> {
131        self.name.as_deref()
132    }
133}
134
135impl fmt::Debug for Handle {
136    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
137        fmt.debug_struct("multi_thread::Handle { ... }").finish()
138    }
139}