tokio/runtime/scheduler/multi_thread/
handle.rs

1use crate::future::Future;
2use crate::loom::sync::Arc;
3use crate::runtime::scheduler::multi_thread::worker;
4use crate::runtime::task::{Notified, Task, TaskHarnessScheduleHooks};
5use crate::runtime::{
6    blocking, driver,
7    task::{self, JoinHandle, SpawnLocation},
8    TaskHooks, TaskMeta, TimerFlavor,
9};
10use crate::util::RngSeedGenerator;
11
12use std::fmt;
13use std::num::NonZeroU64;
14
15mod metrics;
16
17cfg_taskdump! {
18    mod taskdump;
19}
20
21#[cfg(all(tokio_unstable, feature = "time"))]
22use crate::loom::sync::atomic::{AtomicBool, Ordering::SeqCst};
23
/// Handle to the multi thread scheduler.
///
/// Groups the scheduler's shared worker state with the driver, blocking-pool,
/// seed-generator and hook handles listed below. Spawning takes an
/// `&Arc<Handle>`, and `task::Schedule` is implemented for `Arc<Handle>`.
pub(crate) struct Handle {
    /// Task spawner.
    ///
    /// Its `owned` collection is where new tasks are bound, where released
    /// tasks are removed from, and where `owned_id` reads its id.
    pub(super) shared: worker::Shared,

    /// Resource driver handles.
    pub(crate) driver: driver::Handle,

    /// Blocking pool spawner.
    pub(crate) blocking_spawner: blocking::Spawner,

    /// Current random number generator seed.
    pub(crate) seed_generator: RngSeedGenerator,

    /// User-supplied task hooks.
    ///
    /// The spawn hook is invoked with the task's metadata whenever a task is
    /// bound, and `task_terminate_callback` is cloned into the hooks handed
    /// to the task harness.
    pub(crate) task_hooks: TaskHooks,

    // The `dead_code` lint is silenced only when the `time` feature is off.
    #[cfg_attr(not(feature = "time"), allow(dead_code))]
    /// Timer flavor used by the runtime.
    pub(crate) timer_flavor: TimerFlavor,

    #[cfg(all(tokio_unstable, feature = "time"))]
    /// Indicates that the runtime is shutting down.
    ///
    /// Stored as `true` by `Handle::shutdown` and read by
    /// `Handle::is_shutdown`; the field only exists when both
    /// `tokio_unstable` and the `time` feature are enabled.
    pub(crate) is_shutdown: AtomicBool,
}
49
50impl Handle {
51    /// Spawns a future onto the thread pool
52    pub(crate) fn spawn<F>(
53        me: &Arc<Self>,
54        future: F,
55        id: task::Id,
56        spawned_at: SpawnLocation,
57    ) -> JoinHandle<F::Output>
58    where
59        F: crate::future::Future + Send + 'static,
60        F::Output: Send + 'static,
61    {
62        Self::bind_new_task(me, future, id, spawned_at)
63    }
64
65    #[cfg(all(tokio_unstable, feature = "time"))]
66    pub(crate) fn is_shutdown(&self) -> bool {
67        self.is_shutdown
68            .load(crate::loom::sync::atomic::Ordering::SeqCst)
69    }
70
71    pub(crate) fn shutdown(&self) {
72        self.close();
73        #[cfg(all(tokio_unstable, feature = "time"))]
74        self.is_shutdown.store(true, SeqCst);
75    }
76
77    #[track_caller]
78    pub(super) fn bind_new_task<T>(
79        me: &Arc<Self>,
80        future: T,
81        id: task::Id,
82        spawned_at: SpawnLocation,
83    ) -> JoinHandle<T::Output>
84    where
85        T: Future + Send + 'static,
86        T::Output: Send + 'static,
87    {
88        let (handle, notified) = me.shared.owned.bind(future, me.clone(), id, spawned_at);
89
90        me.task_hooks.spawn(&TaskMeta {
91            id,
92            spawned_at,
93            _phantom: Default::default(),
94        });
95
96        me.schedule_option_task_without_yield(notified);
97
98        handle
99    }
100}
101
102impl task::Schedule for Arc<Handle> {
103    fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
104        self.shared.owned.remove(task)
105    }
106
107    fn schedule(&self, task: Notified<Self>) {
108        self.schedule_task(task, false);
109    }
110
111    fn hooks(&self) -> TaskHarnessScheduleHooks {
112        TaskHarnessScheduleHooks {
113            task_terminate_callback: self.task_hooks.task_terminate_callback.clone(),
114        }
115    }
116
117    fn yield_now(&self, task: Notified<Self>) {
118        self.schedule_task(task, true);
119    }
120}
121
122impl Handle {
123    pub(crate) fn owned_id(&self) -> NonZeroU64 {
124        self.shared.owned.id
125    }
126}
127
128impl fmt::Debug for Handle {
129    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
130        fmt.debug_struct("multi_thread::Handle { ... }").finish()
131    }
132}