tokio/runtime/scheduler/current_thread/mod.rs

1use crate::loom::sync::atomic::AtomicBool;
2use crate::loom::sync::Arc;
3use crate::runtime::driver::{self, Driver};
4use crate::runtime::scheduler::{self, Defer, Inject};
5use crate::runtime::task::{
6    self, JoinHandle, OwnedTasks, Schedule, SpawnLocation, Task, TaskHarnessScheduleHooks,
7};
8use crate::runtime::{
9    blocking, context, Config, MetricsBatch, SchedulerMetrics, TaskHooks, TaskMeta, WorkerMetrics,
10};
11use crate::sync::notify::Notify;
12use crate::util::atomic_cell::AtomicCell;
13use crate::util::{waker_ref, RngSeedGenerator, Wake, WakerRef};
14
15use std::cell::RefCell;
16use std::collections::VecDeque;
17use std::future::{poll_fn, Future};
18use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
19use std::task::Poll::{Pending, Ready};
20use std::task::Waker;
21use std::thread::ThreadId;
22use std::time::Duration;
23use std::{fmt, thread};
24
/// Executes tasks on the current thread
pub(crate) struct CurrentThread {
    /// Core scheduler data is acquired by a thread entering `block_on`.
    ///
    /// Held in an `AtomicCell` so ownership of the core can be handed off
    /// between threads: whichever thread successfully `take()`s it drives
    /// the scheduler.
    core: AtomicCell<Core>,

    /// Notifier for waking up other threads to steal the
    /// driver.
    notify: Notify,
}
34
/// Handle to the current thread scheduler
pub(crate) struct Handle {
    /// The name of the runtime
    name: Option<String>,

    /// Scheduler state shared across threads
    shared: Shared,

    /// Resource driver handles
    pub(crate) driver: driver::Handle,

    /// Blocking pool spawner
    pub(crate) blocking_spawner: blocking::Spawner,

    /// Current random number generator seed
    pub(crate) seed_generator: RngSeedGenerator,

    /// User-supplied hooks to invoke for things
    pub(crate) task_hooks: TaskHooks,

    /// If this is a `LocalRuntime`, flags the owning thread ID.
    pub(crate) local_tid: Option<ThreadId>,
}
58
/// Data required for executing the scheduler. The struct is passed around to
/// a function that will perform the scheduling work and acts as a capability token.
struct Core {
    /// Scheduler run queue (local queue; the remote queue lives in
    /// `Shared::inject`).
    tasks: VecDeque<Notified>,

    /// Current tick
    ///
    /// Incremented once per scheduled task; used to decide when to check the
    /// remote queue ahead of the local one (see `Core::next_task`).
    tick: u32,

    /// Runtime driver
    ///
    /// The driver is removed before starting to park the thread
    driver: Option<Driver>,

    /// Metrics batch
    metrics: MetricsBatch,

    /// How often to check the global queue
    global_queue_interval: u32,

    /// True if a task panicked without being handled and the runtime is
    /// configured to shutdown on unhandled panic.
    unhandled_panic: bool,
}
83
/// Scheduler state shared between threads.
struct Shared {
    /// Remote run queue
    inject: Inject<Arc<Handle>>,

    /// Collection of all active tasks spawned onto this executor.
    owned: OwnedTasks<Arc<Handle>>,

    /// Indicates whether the blocked on thread was woken.
    ///
    /// Set by the waker (`Wake::wake_by_ref`), cleared and read by the
    /// scheduler loop via `Handle::reset_woken`.
    woken: AtomicBool,

    /// Scheduler configuration options
    config: Config,

    /// Keeps track of various runtime metrics.
    scheduler_metrics: SchedulerMetrics,

    /// This scheduler only has one worker.
    worker_metrics: WorkerMetrics,
}
104
/// Thread-local context.
///
/// pub(crate) to store in `runtime::context`.
pub(crate) struct Context {
    /// Scheduler handle
    handle: Arc<Handle>,

    /// Scheduler core, enabling the holder of `Context` to execute the
    /// scheduler.
    ///
    /// `None` while the core has been temporarily moved out (e.g. during
    /// shutdown or while a closure runs with the core passed by value).
    core: RefCell<Option<Box<Core>>>,

    /// Deferred tasks, usually ones that called `task::yield_now()`.
    pub(crate) defer: Defer,
}
119
/// A notified task bound to this scheduler's handle type.
type Notified = task::Notified<Arc<Handle>>;

/// Initial queue capacity.
const INITIAL_CAPACITY: usize = 64;

/// Used if none is specified. This is a temporary constant and will be removed
/// as we unify tuning logic between the multi-thread and current-thread
/// schedulers.
const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 = 31;
129
impl CurrentThread {
    /// Creates a new current-thread scheduler and the shared `Handle` used to
    /// spawn tasks onto it.
    ///
    /// The `Driver` is stored inside the scheduler `Core`, which in turn sits
    /// in an `AtomicCell` until a thread enters `block_on` and takes it.
    pub(crate) fn new(
        driver: Driver,
        driver_handle: driver::Handle,
        blocking_spawner: blocking::Spawner,
        seed_generator: RngSeedGenerator,
        config: Config,
        local_tid: Option<ThreadId>,
        name: Option<String>,
    ) -> (CurrentThread, Arc<Handle>) {
        let worker_metrics = WorkerMetrics::from_config(&config);
        worker_metrics.set_thread_id(thread::current().id());

        // Get the configured global queue interval, or use the default.
        let global_queue_interval = config
            .global_queue_interval
            .unwrap_or(DEFAULT_GLOBAL_QUEUE_INTERVAL);

        let handle = Arc::new(Handle {
            name,
            task_hooks: TaskHooks {
                task_spawn_callback: config.before_spawn.clone(),
                task_terminate_callback: config.after_termination.clone(),
                #[cfg(tokio_unstable)]
                before_poll_callback: config.before_poll.clone(),
                #[cfg(tokio_unstable)]
                after_poll_callback: config.after_poll.clone(),
            },
            shared: Shared {
                inject: Inject::new(),
                // This scheduler has exactly one worker.
                owned: OwnedTasks::new(1),
                woken: AtomicBool::new(false),
                config,
                scheduler_metrics: SchedulerMetrics::new(),
                worker_metrics,
            },
            driver: driver_handle,
            blocking_spawner,
            seed_generator,
            local_tid,
        });

        let core = AtomicCell::new(Some(Box::new(Core {
            tasks: VecDeque::with_capacity(INITIAL_CAPACITY),
            tick: 0,
            driver: Some(driver),
            metrics: MetricsBatch::new(&handle.shared.worker_metrics),
            global_queue_interval,
            unhandled_panic: false,
        })));

        let scheduler = CurrentThread {
            core,
            notify: Notify::new(),
        };

        (scheduler, handle)
    }

    /// Runs `future` to completion on the current thread.
    ///
    /// Only one thread may hold the scheduler core at a time. If another
    /// thread currently owns it, this thread polls the future directly while
    /// also waiting on a notification that the core has been released, then
    /// retries the steal.
    #[track_caller]
    pub(crate) fn block_on<F: Future>(&self, handle: &scheduler::Handle, future: F) -> F::Output {
        pin!(future);

        crate::runtime::context::enter_runtime(handle, false, |blocking| {
            let handle = handle.as_current_thread();

            // Attempt to steal the scheduler core and block_on the future if we can
            // there, otherwise, lets select on a notification that the core is
            // available or the future is complete.
            loop {
                if let Some(core) = self.take_core(handle) {
                    handle
                        .shared
                        .worker_metrics
                        .set_thread_id(thread::current().id());
                    return core.block_on(future);
                } else {
                    let notified = self.notify.notified();
                    pin!(notified);

                    if let Some(out) = blocking
                        .block_on(poll_fn(|cx| {
                            if notified.as_mut().poll(cx).is_ready() {
                                // Core released: return to the outer loop
                                // and retry the steal.
                                return Ready(None);
                            }

                            if let Ready(out) = future.as_mut().poll(cx) {
                                return Ready(Some(out));
                            }

                            Pending
                        }))
                        .expect("Failed to `Enter::block_on`")
                    {
                        return out;
                    }
                }
            }
        })
    }

    /// Attempts to take ownership of the scheduler core, returning a guard
    /// that places the core back into the slot on drop. Returns `None` if
    /// another thread currently owns the core.
    fn take_core(&self, handle: &Arc<Handle>) -> Option<CoreGuard<'_>> {
        let core = self.core.take()?;

        Some(CoreGuard {
            context: scheduler::Context::CurrentThread(Context {
                handle: handle.clone(),
                core: RefCell::new(Some(core)),
                defer: Defer::new(),
            }),
            scheduler: self,
        })
    }

    /// Shuts the scheduler down: drains all task queues and shuts down the
    /// resource drivers (see `shutdown2` for the ordering).
    pub(crate) fn shutdown(&mut self, handle: &scheduler::Handle) {
        let handle = handle.as_current_thread();

        // Avoid a double panic if we are currently panicking and
        // the lock may be poisoned.

        let core = match self.take_core(handle) {
            Some(core) => core,
            None if std::thread::panicking() => return,
            None => panic!("Oh no! We never placed the Core back, this is a bug!"),
        };

        // Check that the thread-local is not being destroyed
        let tls_available = context::with_current(|_| ()).is_ok();

        if tls_available {
            core.enter(|core, _context| {
                let core = shutdown2(core, handle);
                (core, ())
            });
        } else {
            // Shutdown without setting the context. `tokio::spawn` calls will
            // fail, but those will fail either way because the thread-local is
            // not available anymore.
            let context = core.context.expect_current_thread();
            let core = context.core.borrow_mut().take().unwrap();

            let core = shutdown2(core, handle);
            *context.core.borrow_mut() = Some(core);
        }
    }
}
276
/// Performs the actual shutdown work, returning the `Core` so the caller can
/// put it back in place.
///
/// Ordering matters here: the owned collection is closed first so no new
/// tasks can be pushed, then both queues are drained, and only then is the
/// driver shut down.
fn shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core> {
    // Drain the OwnedTasks collection. This call also closes the
    // collection, ensuring that no tasks are ever pushed after this
    // call returns.
    handle.shared.owned.close_and_shutdown_all(0);

    // Drain local queue
    // We already shut down every task, so we just need to drop the task.
    while let Some(task) = core.next_local_task(handle) {
        drop(task);
    }

    // Close the injection queue
    handle.shared.inject.close();

    // Drain remote queue
    while let Some(task) = handle.shared.inject.pop() {
        drop(task);
    }

    assert!(handle.shared.owned.is_empty());

    // Submit metrics
    core.submit_metrics(handle);

    // Shutdown the resource drivers
    if let Some(driver) = core.driver.as_mut() {
        driver.shutdown(&handle.driver);
    }

    core
}
309
310impl fmt::Debug for CurrentThread {
311    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
312        fmt.debug_struct("CurrentThread").finish()
313    }
314}
315
316// ===== impl Core =====
317
318impl Core {
319    /// Get and increment the current tick
320    fn tick(&mut self) {
321        self.tick = self.tick.wrapping_add(1);
322    }
323
324    fn next_task(&mut self, handle: &Handle) -> Option<Notified> {
325        if self.tick % self.global_queue_interval == 0 {
326            handle
327                .next_remote_task()
328                .or_else(|| self.next_local_task(handle))
329        } else {
330            self.next_local_task(handle)
331                .or_else(|| handle.next_remote_task())
332        }
333    }
334
335    fn next_local_task(&mut self, handle: &Handle) -> Option<Notified> {
336        let ret = self.tasks.pop_front();
337        handle
338            .shared
339            .worker_metrics
340            .set_queue_depth(self.tasks.len());
341        ret
342    }
343
344    fn push_task(&mut self, handle: &Handle, task: Notified) {
345        self.tasks.push_back(task);
346        self.metrics.inc_local_schedule_count();
347        handle
348            .shared
349            .worker_metrics
350            .set_queue_depth(self.tasks.len());
351    }
352
353    fn submit_metrics(&mut self, handle: &Handle) {
354        self.metrics.submit(&handle.shared.worker_metrics, 0);
355    }
356}
357
/// Takes the deferred wakers out of the context (releasing the `Defer`
/// buffer) and wakes each one. Used after capturing a taskdump.
#[cfg(feature = "taskdump")]
fn wake_deferred_tasks_and_free(context: &Context) {
    context
        .defer
        .take_deferred()
        .into_iter()
        .for_each(Waker::wake);
}
365
366// ===== impl Context =====
367
impl Context {
    /// Runs a single task's poll with `core` stored in the thread-local
    /// context, tracking poll metrics and the cooperative budget around it.
    fn run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
        core.metrics.start_poll();
        let mut ret = self.enter(core, || crate::task::coop::budget(f));
        ret.0.metrics.end_poll();
        ret
    }

    /// Blocks the current thread until an event is received by the driver,
    /// including I/O events, timer events, ...
    fn park(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
        // The driver is moved out of `core` while parking; it is restored
        // before returning.
        let mut driver = core.driver.take().expect("driver missing");

        if let Some(f) = &handle.shared.config.before_park {
            // Run the user's `before_park` callback with the scheduler
            // context set.
            let (c, ()) = self.enter(core, || f());
            core = c;
        }

        // If `before_park` spawns a task (or otherwise schedules work for us), then we should not
        // park the thread.
        if !self.has_pending_work(&core) {
            // Park until the thread is signaled
            core.metrics.about_to_park();
            core.submit_metrics(handle);

            core = self.park_internal(core, handle, &mut driver, None);

            core.metrics.unparked();
            core.submit_metrics(handle);
        }

        if let Some(f) = &handle.shared.config.after_unpark {
            let (c, ()) = self.enter(core, || f());
            core = c;
        }

        core.driver = Some(driver);
        core
    }

    /// Checks the driver for new events without blocking the thread.
    fn park_yield(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
        let mut driver = core.driver.take().expect("driver missing");

        core.submit_metrics(handle);

        // A zero-duration timeout polls the driver without sleeping.
        core = self.park_internal(core, handle, &mut driver, Some(Duration::from_millis(0)));

        core.driver = Some(driver);
        core
    }

    /// Returns `true` if the scheduler has work to do before parking:
    /// queued local tasks, deferred wakers, or a pending wakeup of the
    /// `block_on` future.
    fn has_pending_work(&self, core: &Core) -> bool {
        !core.tasks.is_empty() || !self.defer.is_empty() || self.handle.shared.woken.load(Acquire)
    }

    /// Parks on the driver (optionally with a timeout), then wakes any
    /// deferred tasks so they get rescheduled.
    fn park_internal(
        &self,
        core: Box<Core>,
        handle: &Handle,
        driver: &mut Driver,
        duration: Option<Duration>,
    ) -> Box<Core> {
        let (core, ()) = self.enter(core, || {
            match duration {
                Some(dur) => driver.park_timeout(&handle.driver, dur),
                None => driver.park(&handle.driver),
            }
            self.defer.wake();
        });

        core
    }

    /// Executes the closure with the given scheduler core stored in the
    /// thread-local context, then takes the core back out.
    fn enter<R>(&self, core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
        // Store the scheduler core in the thread-local context
        //
        // A drop-guard is employed at a higher level.
        *self.core.borrow_mut() = Some(core);

        // Execute the closure while tracking the execution budget
        let ret = f();

        // Take the scheduler core back
        let core = self.core.borrow_mut().take().expect("core missing");
        (core, ret)
    }

    /// Defers a waker to be invoked the next time the scheduler parks or
    /// yields to the driver.
    pub(crate) fn defer(&self, waker: &Waker) {
        self.defer.defer(waker);
    }
}
462
463// ===== impl Handle =====
464
impl Handle {
    /// Spawns a future onto the `CurrentThread` scheduler
    #[track_caller]
    pub(crate) fn spawn<F>(
        me: &Arc<Self>,
        future: F,
        id: crate::runtime::task::Id,
        spawned_at: SpawnLocation,
    ) -> JoinHandle<F::Output>
    where
        F: crate::future::Future + Send + 'static,
        F::Output: Send + 'static,
    {
        // Bind the task to the owned-tasks collection; `notified` is `Some`
        // when the task needs to be scheduled.
        let (handle, notified) = me.shared.owned.bind(future, me.clone(), id, spawned_at);

        // Invoke the user-supplied spawn hook, if any.
        me.task_hooks.spawn(&TaskMeta {
            id,
            spawned_at,
            _phantom: Default::default(),
        });

        if let Some(notified) = notified {
            me.schedule(notified);
        }

        handle
    }

    /// Spawn a task which isn't safe to send across thread boundaries onto the runtime.
    ///
    /// # Safety
    ///
    /// This should only be used when this is a `LocalRuntime` or in another case where the runtime
    /// provably cannot be driven from or moved to different threads from the one on which the task
    /// is spawned.
    #[track_caller]
    pub(crate) unsafe fn spawn_local<F>(
        me: &Arc<Self>,
        future: F,
        id: crate::runtime::task::Id,
        spawned_at: SpawnLocation,
    ) -> JoinHandle<F::Output>
    where
        F: crate::future::Future + 'static,
        F::Output: 'static,
    {
        // Safety: the caller guarantees that this is only called on a `LocalRuntime`.
        let (handle, notified) = unsafe {
            me.shared
                .owned
                .bind_local(future, me.clone(), id, spawned_at)
        };

        me.task_hooks.spawn(&TaskMeta {
            id,
            spawned_at,
            _phantom: Default::default(),
        });

        if let Some(notified) = notified {
            me.schedule(notified);
        }

        handle
    }

    /// Capture a snapshot of this runtime's state.
    #[cfg(all(
        tokio_unstable,
        feature = "taskdump",
        target_os = "linux",
        any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
    ))]
    pub(crate) fn dump(&self) -> crate::runtime::Dump {
        use crate::runtime::dump;
        use task::trace::trace_current_thread;

        let mut traces = vec![];

        // todo: how to make this work outside of a runtime context?
        context::with_scheduler(|maybe_context| {
            // drain the local queue
            let context = if let Some(context) = maybe_context {
                context.expect_current_thread()
            } else {
                return;
            };
            let mut maybe_core = context.core.borrow_mut();
            let core = if let Some(core) = maybe_core.as_mut() {
                core
            } else {
                return;
            };
            let local = &mut core.tasks;

            // Skip tracing when the runtime is shutting down.
            if self.shared.inject.is_closed() {
                return;
            }

            traces = trace_current_thread(&self.shared.owned, local, &self.shared.inject)
                .into_iter()
                .map(|(id, trace)| dump::Task::new(id, trace))
                .collect();

            // Avoid double borrow panic
            drop(maybe_core);

            // Taking a taskdump could wake every task, but we probably don't want
            // the `yield_now` vector to be that large under normal circumstances.
            // Therefore, we free its allocation.
            wake_deferred_tasks_and_free(context);
        });

        dump::Dump::new(traces)
    }

    /// Pops a task from the remote (injection) queue.
    fn next_remote_task(&self) -> Option<Notified> {
        self.shared.inject.pop()
    }

    /// Returns a waker for the `block_on` future, priming the `woken` flag.
    fn waker_ref(me: &Arc<Self>) -> WakerRef<'_> {
        // Set woken to true when enter block_on, ensure outer future
        // be polled for the first time when enter loop
        me.shared.woken.store(true, Release);
        waker_ref(me)
    }

    // reset woken to false and return original value
    pub(crate) fn reset_woken(&self) -> bool {
        self.shared.woken.swap(false, AcqRel)
    }

    /// Number of tasks currently alive on this scheduler.
    pub(crate) fn num_alive_tasks(&self) -> usize {
        self.shared.owned.num_alive_tasks()
    }

    /// Number of tasks waiting in the injection queue.
    pub(crate) fn injection_queue_depth(&self) -> usize {
        self.shared.inject.len()
    }

    /// Returns the metrics for worker `worker`; this scheduler has exactly
    /// one worker, so only index 0 is valid.
    pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
        assert_eq!(0, worker);
        &self.shared.worker_metrics
    }
}
610
cfg_unstable_metrics! {
    impl Handle {
        /// Returns scheduler-wide metrics (e.g. remote schedule counts).
        pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
            &self.shared.scheduler_metrics
        }

        /// Returns the local run-queue depth of the (single) worker.
        pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
            self.worker_metrics(worker).queue_depth()
        }

        /// Number of threads in the blocking pool.
        pub(crate) fn num_blocking_threads(&self) -> usize {
            self.blocking_spawner.num_threads()
        }

        /// Number of idle threads in the blocking pool.
        pub(crate) fn num_idle_blocking_threads(&self) -> usize {
            self.blocking_spawner.num_idle_threads()
        }

        /// Depth of the blocking pool's work queue.
        pub(crate) fn blocking_queue_depth(&self) -> usize {
            self.blocking_spawner.queue_depth()
        }

        cfg_64bit_metrics! {
            /// Total number of tasks spawned on this scheduler.
            pub(crate) fn spawned_tasks_count(&self) -> u64 {
                self.shared.owned.spawned_tasks_count()
            }
        }
    }
}
640
641use std::num::NonZeroU64;
642
643impl Handle {
644    pub(crate) fn owned_id(&self) -> NonZeroU64 {
645        self.shared.owned.id
646    }
647
648    pub(crate) fn name(&self) -> Option<&str> {
649        self.name.as_deref()
650    }
651}
652
653impl fmt::Debug for Handle {
654    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
655        fmt.debug_struct("current_thread::Handle { ... }").finish()
656    }
657}
658
659// ===== impl Shared =====
660
impl Schedule for Arc<Handle> {
    /// Removes a completed task from the owned-tasks collection.
    fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
        self.shared.owned.remove(task)
    }

    /// Schedules a notified task: onto the local run queue when called from
    /// within this runtime, otherwise onto the injection queue (followed by
    /// unparking the driver).
    fn schedule(&self, task: task::Notified<Self>) {
        use scheduler::Context::CurrentThread;

        context::with_scheduler(|maybe_cx| match maybe_cx {
            // Only take the local path when the thread-local context belongs
            // to *this* scheduler instance.
            Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
                let mut core = cx.core.borrow_mut();

                // If `None`, the runtime is shutting down, so there is no need
                // to schedule the task.
                if let Some(core) = core.as_mut() {
                    core.push_task(self, task);
                }
            }
            _ => {
                // Track that a task was scheduled from **outside** of the runtime.
                self.shared.scheduler_metrics.inc_remote_schedule_count();

                // Schedule the task
                self.shared.inject.push(task);
                self.driver.unpark();
            }
        });
    }

    fn hooks(&self) -> TaskHarnessScheduleHooks {
        TaskHarnessScheduleHooks {
            task_terminate_callback: self.task_hooks.task_terminate_callback.clone(),
        }
    }

    cfg_unstable! {
        /// Reacts to an unhandled task panic according to the configured
        /// `unhandled_panic` policy.
        fn unhandled_panic(&self) {
            use crate::runtime::UnhandledPanic;

            match self.shared.config.unhandled_panic {
                UnhandledPanic::Ignore => {
                    // Do nothing
                }
                UnhandledPanic::ShutdownRuntime => {
                    use scheduler::Context::CurrentThread;

                    // This hook is only called from within the runtime, so
                    // `context::with_scheduler` should match with `&self`, i.e.
                    // there is no opportunity for a nested scheduler to be
                    // called.
                    context::with_scheduler(|maybe_cx| match maybe_cx {
                        Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
                            let mut core = cx.core.borrow_mut();

                            // If `None`, the runtime is shutting down, so there is no need to signal shutdown
                            if let Some(core) = core.as_mut() {
                                core.unhandled_panic = true;
                                self.shared.owned.close_and_shutdown_all(0);
                            }
                        }
                        _ => unreachable!("runtime core not set in CURRENT thread-local"),
                    })
                }
            }
        }
    }
}
728
impl Wake for Handle {
    fn wake(arc_self: Arc<Self>) {
        Wake::wake_by_ref(&arc_self);
    }

    /// Wake by reference
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // Set the woken flag. If it was already set, a wakeup is already
        // pending and there is nothing more to do. The `Release` ordering
        // publishes the flag to `reset_woken` / `has_pending_work`, which
        // read it with `Acquire` semantics.
        let already_woken = arc_self.shared.woken.swap(true, Release);

        if !already_woken {
            use scheduler::Context::CurrentThread;

            // If we are already running on the runtime, then it's not required to wake up the
            // runtime.
            context::with_scheduler(|maybe_cx| match maybe_cx {
                Some(CurrentThread(cx)) if Arc::ptr_eq(arc_self, &cx.handle) => {}
                _ => {
                    arc_self.driver.unpark();
                }
            });
        }
    }
}
752
753// ===== CoreGuard =====
754
/// Used to ensure we always place the `Core` value back into its slot in
/// `CurrentThread`, even if the future panics.
struct CoreGuard<'a> {
    /// Thread-local scheduler context holding the `Core`.
    context: scheduler::Context,
    /// The scheduler whose `core` slot (and `notify`) this guard restores on drop.
    scheduler: &'a CurrentThread,
}
761
impl CoreGuard<'_> {
    /// Runs the scheduler loop, driving `future` to completion.
    ///
    /// Panics if a spawned task panicked and the runtime is configured to
    /// shut down on unhandled panics.
    #[track_caller]
    fn block_on<F: Future>(self, future: F) -> F::Output {
        let ret = self.enter(|mut core, context| {
            let waker = Handle::waker_ref(&context.handle);
            let mut cx = std::task::Context::from_waker(&waker);

            pin!(future);

            core.metrics.start_processing_scheduled_tasks();

            'outer: loop {
                let handle = &context.handle;

                // Only poll the `block_on` future when its waker fired.
                // `waker_ref` primes the flag, so the first iteration
                // always polls.
                if handle.reset_woken() {
                    let (c, res) = context.enter(core, || {
                        crate::task::coop::budget(|| future.as_mut().poll(&mut cx))
                    });

                    core = c;

                    if let Ready(v) = res {
                        return (core, Some(v));
                    }
                }

                // Run up to `event_interval` scheduled tasks before yielding
                // to the driver.
                for _ in 0..handle.shared.config.event_interval {
                    // Make sure we didn't hit an unhandled_panic
                    if core.unhandled_panic {
                        return (core, None);
                    }

                    core.tick();

                    let entry = core.next_task(handle);

                    let task = match entry {
                        Some(entry) => entry,
                        None => {
                            core.metrics.end_processing_scheduled_tasks();

                            // No queued tasks: park on the driver. Use the
                            // non-blocking variant when there is still
                            // pending work to come back to.
                            core = if context.has_pending_work(&core) {
                                context.park_yield(core, handle)
                            } else {
                                context.park(core, handle)
                            };

                            core.metrics.start_processing_scheduled_tasks();

                            // Try polling the `block_on` future next
                            continue 'outer;
                        }
                    };

                    let task = context.handle.shared.owned.assert_owner(task);

                    #[cfg(tokio_unstable)]
                    let task_meta = task.task_meta();

                    let (c, ()) = context.run_task(core, || {
                        #[cfg(tokio_unstable)]
                        context.handle.task_hooks.poll_start_callback(&task_meta);

                        task.run();

                        #[cfg(tokio_unstable)]
                        context.handle.task_hooks.poll_stop_callback(&task_meta);
                    });

                    core = c;
                }

                core.metrics.end_processing_scheduled_tasks();

                // Yield to the driver, this drives the timer and pulls any
                // pending I/O events.
                core = context.park_yield(core, handle);

                core.metrics.start_processing_scheduled_tasks();
            }
        });

        match ret {
            Some(ret) => ret,
            None => {
                // A spawned task panicked with `unhandled_panic` set to
                // shut down the runtime (see the `unhandled_panic` check above).
                panic!("a spawned task panicked and the runtime is configured to shut down on unhandled panic");
            }
        }
    }

    /// Enters the scheduler context. This sets the queue and other necessary
    /// scheduler state in the thread-local.
    fn enter<F, R>(self, f: F) -> R
    where
        F: FnOnce(Box<Core>, &Context) -> (Box<Core>, R),
    {
        let context = self.context.expect_current_thread();

        // Remove `core` from `context` to pass into the closure.
        let core = context.core.borrow_mut().take().expect("core missing");

        // Call the closure and place `core` back
        let (core, ret) = context::set_scheduler(&self.context, || f(core, context));

        *context.core.borrow_mut() = Some(core);

        ret
    }
}
872
impl Drop for CoreGuard<'_> {
    /// Returns the `Core` to the scheduler's slot (even on panic, since this
    /// runs during unwinding) and notifies waiting threads.
    fn drop(&mut self) {
        let context = self.context.expect_current_thread();

        // If the core is no longer in the context, there is nothing to
        // restore.
        if let Some(core) = context.core.borrow_mut().take() {
            // Replace old scheduler back into the state to allow
            // other threads to pick it up and drive it.
            self.scheduler.core.set(core);

            // Wake up other possible threads that could steal the driver.
            self.scheduler.notify.notify_one();
        }
    }
}