tokio/runtime/scheduler/current_thread/mod.rs
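//! The current-thread scheduler: all tasks are polled on whichever thread is
//! currently blocked in `block_on`.
//!
//! A brief orientation for the types below: the mutable scheduler state lives
//! in `Core`, which is stored in an `AtomicCell` inside `CurrentThread`. A
//! thread entering `block_on` steals the core (wrapped in a `CoreGuard`),
//! uses it to poll tasks and drive the resource drivers, and places it back
//! on exit so another thread can take over. `Handle` carries the state shared
//! across threads: the injection queue, the collection of owned tasks,
//! configuration, and metrics.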

use crate::loom::sync::atomic::AtomicBool;
use crate::loom::sync::Arc;
use crate::runtime::driver::{self, Driver};
use crate::runtime::scheduler::{self, Defer, Inject};
use crate::runtime::task::{
    self, JoinHandle, OwnedTasks, Schedule, SpawnLocation, Task, TaskHarnessScheduleHooks,
};
use crate::runtime::{
    blocking, context, Config, MetricsBatch, SchedulerMetrics, TaskHooks, TaskMeta, WorkerMetrics,
};
use crate::sync::notify::Notify;
use crate::util::atomic_cell::AtomicCell;
use crate::util::{waker_ref, RngSeedGenerator, Wake, WakerRef};

use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::{poll_fn, Future};
use std::sync::atomic::Ordering::{AcqRel, Release};
use std::task::Poll::{Pending, Ready};
use std::task::Waker;
use std::thread::ThreadId;
use std::time::Duration;
use std::{fmt, thread};

/// Executes tasks on the current thread
pub(crate) struct CurrentThread {
    /// Core scheduler data is acquired by a thread entering `block_on`.
    core: AtomicCell<Core>,

    /// Notifier for waking up other threads to steal the
    /// driver.
    notify: Notify,
}

/// Handle to the current thread scheduler
pub(crate) struct Handle {
    /// Scheduler state shared across threads
    shared: Shared,

    /// Resource driver handles
    pub(crate) driver: driver::Handle,

    /// Blocking pool spawner
    pub(crate) blocking_spawner: blocking::Spawner,

    /// Current random number generator seed
    pub(crate) seed_generator: RngSeedGenerator,

    /// User-supplied hooks to invoke for task events
    pub(crate) task_hooks: TaskHooks,

    /// If this is a `LocalRuntime`, flags the owning thread ID.
    pub(crate) local_tid: Option<ThreadId>,
}

/// Data required for executing the scheduler. The struct is passed around to
/// a function that will perform the scheduling work and acts as a capability token.
struct Core {
    /// Scheduler run queue
    tasks: VecDeque<Notified>,

    /// Current tick
    tick: u32,

    /// Runtime driver
    ///
    /// The driver is removed before starting to park the thread
    driver: Option<Driver>,

    /// Metrics batch
    metrics: MetricsBatch,

    /// How often to check the global queue
    global_queue_interval: u32,

    /// True if a task panicked without being handled and the runtime is
    /// configured to shut down on unhandled panic.
    unhandled_panic: bool,
}

/// Scheduler state shared between threads.
struct Shared {
    /// Remote run queue
    inject: Inject<Arc<Handle>>,

    /// Collection of all active tasks spawned onto this executor.
    owned: OwnedTasks<Arc<Handle>>,

    /// Indicates whether the thread blocked in `block_on` was woken.
    woken: AtomicBool,

    /// Scheduler configuration options
    config: Config,

    /// Keeps track of various runtime metrics.
    scheduler_metrics: SchedulerMetrics,

    /// This scheduler only has one worker.
    worker_metrics: WorkerMetrics,
}

/// Thread-local context.
///
/// `pub(crate)` so that it can be stored in `runtime::context`.
pub(crate) struct Context {
    /// Scheduler handle
    handle: Arc<Handle>,

    /// Scheduler core, enabling the holder of `Context` to execute the
    /// scheduler.
    core: RefCell<Option<Box<Core>>>,

    /// Deferred tasks, usually ones that called `task::yield_now()`.
    pub(crate) defer: Defer,
}

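/// A task that has been notified (scheduled) and may be pushed onto a run
/// queue.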
type Notified = task::Notified<Arc<Handle>>;

/// Initial queue capacity.
const INITIAL_CAPACITY: usize = 64;

/// Used if none is specified. This is a temporary constant and will be removed
/// as we unify tuning logic between the multi-thread and current-thread
/// schedulers.
const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 = 31;

impl CurrentThread {
    pub(crate) fn new(
        driver: Driver,
        driver_handle: driver::Handle,
        blocking_spawner: blocking::Spawner,
        seed_generator: RngSeedGenerator,
        config: Config,
        local_tid: Option<ThreadId>,
    ) -> (CurrentThread, Arc<Handle>) {
        let worker_metrics = WorkerMetrics::from_config(&config);
        worker_metrics.set_thread_id(thread::current().id());

        // Get the configured global queue interval, or use the default.
        let global_queue_interval = config
            .global_queue_interval
            .unwrap_or(DEFAULT_GLOBAL_QUEUE_INTERVAL);

        let handle = Arc::new(Handle {
            task_hooks: TaskHooks {
                task_spawn_callback: config.before_spawn.clone(),
                task_terminate_callback: config.after_termination.clone(),
                #[cfg(tokio_unstable)]
                before_poll_callback: config.before_poll.clone(),
                #[cfg(tokio_unstable)]
                after_poll_callback: config.after_poll.clone(),
            },
            shared: Shared {
                inject: Inject::new(),
                owned: OwnedTasks::new(1),
                woken: AtomicBool::new(false),
                config,
                scheduler_metrics: SchedulerMetrics::new(),
                worker_metrics,
            },
            driver: driver_handle,
            blocking_spawner,
            seed_generator,
            local_tid,
        });

        let core = AtomicCell::new(Some(Box::new(Core {
            tasks: VecDeque::with_capacity(INITIAL_CAPACITY),
            tick: 0,
            driver: Some(driver),
            metrics: MetricsBatch::new(&handle.shared.worker_metrics),
            global_queue_interval,
            unhandled_panic: false,
        })));

        let scheduler = CurrentThread {
            core,
            notify: Notify::new(),
        };

        (scheduler, handle)
    }

    #[track_caller]
    pub(crate) fn block_on<F: Future>(&self, handle: &scheduler::Handle, future: F) -> F::Output {
        pin!(future);

        crate::runtime::context::enter_runtime(handle, false, |blocking| {
            let handle = handle.as_current_thread();

            // Attempt to steal the scheduler core and block on the future
            // with it; otherwise, select on a notification that the core is
            // available or on the future completing.
            loop {
                if let Some(core) = self.take_core(handle) {
                    handle
                        .shared
                        .worker_metrics
                        .set_thread_id(thread::current().id());
                    return core.block_on(future);
                } else {
                    let notified = self.notify.notified();
                    pin!(notified);

                    if let Some(out) = blocking
                        .block_on(poll_fn(|cx| {
                            if notified.as_mut().poll(cx).is_ready() {
                                return Ready(None);
                            }

                            if let Ready(out) = future.as_mut().poll(cx) {
                                return Ready(Some(out));
                            }

                            Pending
                        }))
                        .expect("Failed to `Enter::block_on`")
                    {
                        return out;
                    }
                }
            }
        })
    }

    fn take_core(&self, handle: &Arc<Handle>) -> Option<CoreGuard<'_>> {
        let core = self.core.take()?;

        Some(CoreGuard {
            context: scheduler::Context::CurrentThread(Context {
                handle: handle.clone(),
                core: RefCell::new(Some(core)),
                defer: Defer::new(),
            }),
            scheduler: self,
        })
    }

    pub(crate) fn shutdown(&mut self, handle: &scheduler::Handle) {
        let handle = handle.as_current_thread();

        // If the core was never placed back (e.g. `block_on` panicked) and we
        // are currently panicking, return quietly instead of panicking again
        // to avoid a double panic.
        let core = match self.take_core(handle) {
            Some(core) => core,
            None if std::thread::panicking() => return,
            None => panic!("Oh no! We never placed the Core back, this is a bug!"),
        };

        // Check that the thread-local is not being destroyed
        let tls_available = context::with_current(|_| ()).is_ok();

        if tls_available {
            core.enter(|core, _context| {
                let core = shutdown2(core, handle);
                (core, ())
            });
        } else {
            // Shutdown without setting the context. `tokio::spawn` calls will
            // fail, but those will fail either way because the thread-local is
            // not available anymore.
            let context = core.context.expect_current_thread();
            let core = context.core.borrow_mut().take().unwrap();

            let core = shutdown2(core, handle);
            *context.core.borrow_mut() = Some(core);
        }
    }
}

fn shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core> {
    // Drain the OwnedTasks collection. This call also closes the
    // collection, ensuring that no tasks are ever pushed after this
    // call returns.
    handle.shared.owned.close_and_shutdown_all(0);

    // Drain local queue
    // We already shut down every task, so we just need to drop the task.
    while let Some(task) = core.next_local_task(handle) {
        drop(task);
    }

    // Close the injection queue
    handle.shared.inject.close();

    // Drain remote queue
    while let Some(task) = handle.shared.inject.pop() {
        drop(task);
    }

    assert!(handle.shared.owned.is_empty());

    // Submit metrics
    core.submit_metrics(handle);

    // Shutdown the resource drivers
    if let Some(driver) = core.driver.as_mut() {
        driver.shutdown(&handle.driver);
    }

    core
}

impl fmt::Debug for CurrentThread {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("CurrentThread").finish()
    }
}

// ===== impl Core =====

impl Core {
    /// Increment the current tick.
    fn tick(&mut self) {
        self.tick = self.tick.wrapping_add(1);
    }

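    // Fairness between the local and remote (injection) queues: on every
    // `global_queue_interval`-th tick, the remote queue is checked first so
    // that tasks scheduled from outside the runtime are not starved by a busy
    // local queue; on all other ticks, the local queue has priority.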
    fn next_task(&mut self, handle: &Handle) -> Option<Notified> {
        if self.tick % self.global_queue_interval == 0 {
            handle
                .next_remote_task()
                .or_else(|| self.next_local_task(handle))
        } else {
            self.next_local_task(handle)
                .or_else(|| handle.next_remote_task())
        }
    }

    fn next_local_task(&mut self, handle: &Handle) -> Option<Notified> {
        let ret = self.tasks.pop_front();
        handle
            .shared
            .worker_metrics
            .set_queue_depth(self.tasks.len());
        ret
    }

    fn push_task(&mut self, handle: &Handle, task: Notified) {
        self.tasks.push_back(task);
        self.metrics.inc_local_schedule_count();
        handle
            .shared
            .worker_metrics
            .set_queue_depth(self.tasks.len());
    }

    fn submit_metrics(&mut self, handle: &Handle) {
        self.metrics.submit(&handle.shared.worker_metrics, 0);
    }
}

#[cfg(feature = "taskdump")]
fn wake_deferred_tasks_and_free(context: &Context) {
    let wakers = context.defer.take_deferred();
    for waker in wakers {
        waker.wake();
    }
}

// ===== impl Context =====

impl Context {
    /// Execute the closure with the given scheduler core stored in the
    /// thread-local context.
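    ///
    /// The closure is run under a cooperative scheduling budget
    /// (`task::coop::budget`), and `start_poll`/`end_poll` metrics are
    /// recorded around it.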
    fn run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
        core.metrics.start_poll();
        let mut ret = self.enter(core, || crate::task::coop::budget(f));
        ret.0.metrics.end_poll();
        ret
    }

    /// Blocks the current thread until an event is received by the driver,
    /// including I/O events, timer events, ...
    fn park(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
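        // The driver is taken out of `core` here because `core` itself is
        // moved into the thread-local context while the `before_park` /
        // `after_unpark` hooks and the park run, so the driver has to be
        // held separately by this function.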
        let mut driver = core.driver.take().expect("driver missing");

        if let Some(f) = &handle.shared.config.before_park {
            let (c, ()) = self.enter(core, || f());
            core = c;
        }

        // This check fails if `before_park` spawned a task for us to run
        // instead of parking the thread.
        if core.tasks.is_empty() {
            // Park until the thread is signaled
            core.metrics.about_to_park();
            core.submit_metrics(handle);

            core = self.park_internal(core, handle, &mut driver, None);

            core.metrics.unparked();
            core.submit_metrics(handle);
        }

        if let Some(f) = &handle.shared.config.after_unpark {
            let (c, ()) = self.enter(core, || f());
            core = c;
        }

        core.driver = Some(driver);
        core
    }

    /// Checks the driver for new events without blocking the thread.
    fn park_yield(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
        let mut driver = core.driver.take().expect("driver missing");

        core.submit_metrics(handle);

        core = self.park_internal(core, handle, &mut driver, Some(Duration::from_millis(0)));

        core.driver = Some(driver);
        core
    }

    fn park_internal(
        &self,
        core: Box<Core>,
        handle: &Handle,
        driver: &mut Driver,
        duration: Option<Duration>,
    ) -> Box<Core> {
        let (core, ()) = self.enter(core, || {
            match duration {
                Some(dur) => driver.park_timeout(&handle.driver, dur),
                None => driver.park(&handle.driver),
            }
            self.defer.wake();
        });

        core
    }

    fn enter<R>(&self, core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
        // Store the scheduler core in the thread-local context
        //
        // A drop-guard is employed at a higher level.
        *self.core.borrow_mut() = Some(core);

        // Execute the closure while tracking the execution budget
        let ret = f();

        // Take the scheduler core back
        let core = self.core.borrow_mut().take().expect("core missing");
        (core, ret)
    }

    pub(crate) fn defer(&self, waker: &Waker) {
        self.defer.defer(waker);
    }
}

// ===== impl Handle =====

impl Handle {
    /// Spawns a future onto the `CurrentThread` scheduler
    #[track_caller]
    pub(crate) fn spawn<F>(
        me: &Arc<Self>,
        future: F,
        id: crate::runtime::task::Id,
        spawned_at: SpawnLocation,
    ) -> JoinHandle<F::Output>
    where
        F: crate::future::Future + Send + 'static,
        F::Output: Send + 'static,
    {
        let (handle, notified) = me.shared.owned.bind(future, me.clone(), id, spawned_at);

        me.task_hooks.spawn(&TaskMeta {
            id,
            spawned_at,
            _phantom: Default::default(),
        });

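        // `bind` yields no `Notified` when the owned collection has already
        // been closed (the runtime is shutting down); in that case the task
        // is not scheduled.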
        if let Some(notified) = notified {
            me.schedule(notified);
        }

        handle
    }

    /// Spawn a task which isn't safe to send across thread boundaries onto the runtime.
    ///
    /// # Safety
    ///
    /// This should only be used when this is a `LocalRuntime` or in another case where the runtime
    /// provably cannot be driven from or moved to different threads from the one on which the task
    /// is spawned.
    #[track_caller]
    pub(crate) unsafe fn spawn_local<F>(
        me: &Arc<Self>,
        future: F,
        id: crate::runtime::task::Id,
        spawned_at: SpawnLocation,
    ) -> JoinHandle<F::Output>
    where
        F: crate::future::Future + 'static,
        F::Output: 'static,
    {
        // Safety: the caller guarantees that this is only called on a `LocalRuntime`.
        let (handle, notified) = unsafe {
            me.shared
                .owned
                .bind_local(future, me.clone(), id, spawned_at)
        };

        me.task_hooks.spawn(&TaskMeta {
            id,
            spawned_at,
            _phantom: Default::default(),
        });

        if let Some(notified) = notified {
            me.schedule(notified);
        }

        handle
    }

    /// Capture a snapshot of this runtime's state.
    #[cfg(all(
        tokio_unstable,
        feature = "taskdump",
        target_os = "linux",
        any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
    ))]
    pub(crate) fn dump(&self) -> crate::runtime::Dump {
        use crate::runtime::dump;
        use task::trace::trace_current_thread;

        let mut traces = vec![];

        // todo: how to make this work outside of a runtime context?
        context::with_scheduler(|maybe_context| {
            // drain the local queue
            let context = if let Some(context) = maybe_context {
                context.expect_current_thread()
            } else {
                return;
            };
            let mut maybe_core = context.core.borrow_mut();
            let core = if let Some(core) = maybe_core.as_mut() {
                core
            } else {
                return;
            };
            let local = &mut core.tasks;

            if self.shared.inject.is_closed() {
                return;
            }

            traces = trace_current_thread(&self.shared.owned, local, &self.shared.inject)
                .into_iter()
                .map(|(id, trace)| dump::Task::new(id, trace))
                .collect();

            // Avoid double borrow panic
            drop(maybe_core);

            // Taking a taskdump could wake every task, but we probably don't
            // want the `yield_now` vector to be that large under normal
            // circumstances. Therefore, we free its allocation.
            wake_deferred_tasks_and_free(context);
        });

        dump::Dump::new(traces)
    }

    fn next_remote_task(&self) -> Option<Notified> {
        self.shared.inject.pop()
    }

    fn waker_ref(me: &Arc<Self>) -> WakerRef<'_> {
        // Set `woken` to true when entering `block_on`, ensuring the outer
        // future is polled at least once when the scheduling loop starts.
        me.shared.woken.store(true, Release);
        waker_ref(me)
    }

    // Reset `woken` to false and return the previous value. The flag is set
    // by `Wake::wake_by_ref` and checked at the top of the `block_on` loop
    // to decide whether the outer future needs to be polled again.
    pub(crate) fn reset_woken(&self) -> bool {
        self.shared.woken.swap(false, AcqRel)
    }

    pub(crate) fn num_alive_tasks(&self) -> usize {
        self.shared.owned.num_alive_tasks()
    }

    pub(crate) fn injection_queue_depth(&self) -> usize {
        self.shared.inject.len()
    }

    pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
        assert_eq!(0, worker);
        &self.shared.worker_metrics
    }
}

cfg_unstable_metrics! {
    impl Handle {
        pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
            &self.shared.scheduler_metrics
        }

        pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
            self.worker_metrics(worker).queue_depth()
        }

        pub(crate) fn num_blocking_threads(&self) -> usize {
            self.blocking_spawner.num_threads()
        }

        pub(crate) fn num_idle_blocking_threads(&self) -> usize {
            self.blocking_spawner.num_idle_threads()
        }

        pub(crate) fn blocking_queue_depth(&self) -> usize {
            self.blocking_spawner.queue_depth()
        }

        cfg_64bit_metrics! {
            pub(crate) fn spawned_tasks_count(&self) -> u64 {
                self.shared.owned.spawned_tasks_count()
            }
        }
    }
}

use std::num::NonZeroU64;

impl Handle {
    pub(crate) fn owned_id(&self) -> NonZeroU64 {
        self.shared.owned.id
    }
}

impl fmt::Debug for Handle {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("current_thread::Handle { ... }").finish()
    }
}

// ===== impl Schedule =====

impl Schedule for Arc<Handle> {
    fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
        self.shared.owned.remove(task)
    }

    fn schedule(&self, task: task::Notified<Self>) {
        use scheduler::Context::CurrentThread;

        context::with_scheduler(|maybe_cx| match maybe_cx {
            Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
                let mut core = cx.core.borrow_mut();

                // If `None`, the runtime is shutting down, so there is no need
                // to schedule the task.
                if let Some(core) = core.as_mut() {
                    core.push_task(self, task);
                }
            }
            _ => {
                // Track that a task was scheduled from **outside** of the runtime.
                self.shared.scheduler_metrics.inc_remote_schedule_count();

                // Schedule the task
                self.shared.inject.push(task);
                self.driver.unpark();
            }
        });
    }

    fn hooks(&self) -> TaskHarnessScheduleHooks {
        TaskHarnessScheduleHooks {
            task_terminate_callback: self.task_hooks.task_terminate_callback.clone(),
        }
    }

    cfg_unstable! {
        fn unhandled_panic(&self) {
            use crate::runtime::UnhandledPanic;

            match self.shared.config.unhandled_panic {
                UnhandledPanic::Ignore => {
                    // Do nothing
                }
                UnhandledPanic::ShutdownRuntime => {
                    use scheduler::Context::CurrentThread;

                    // This hook is only called from within the runtime, so
                    // `context::with_scheduler` should match with `&self`, i.e.
                    // there is no opportunity for a nested scheduler to be
                    // called.
                    context::with_scheduler(|maybe_cx| match maybe_cx {
                        Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
                            let mut core = cx.core.borrow_mut();

                            // If `None`, the runtime is shutting down, so there is no need to signal shutdown
                            if let Some(core) = core.as_mut() {
                                core.unhandled_panic = true;
                                self.shared.owned.close_and_shutdown_all(0);
                            }
                        }
                        _ => unreachable!("runtime core not set in CURRENT thread-local"),
                    })
                }
            }
        }
    }
}

impl Wake for Handle {
    fn wake(arc_self: Arc<Self>) {
        Wake::wake_by_ref(&arc_self);
    }

    /// Wake by reference
    fn wake_by_ref(arc_self: &Arc<Self>) {
        arc_self.shared.woken.store(true, Release);
        arc_self.driver.unpark();
    }
}

// ===== CoreGuard =====

/// Used to ensure we always place the `Core` value back into its slot in
/// `CurrentThread`, even if the future panics.
struct CoreGuard<'a> {
    context: scheduler::Context,
    scheduler: &'a CurrentThread,
}

impl CoreGuard<'_> {
    #[track_caller]
    fn block_on<F: Future>(self, future: F) -> F::Output {
        let ret = self.enter(|mut core, context| {
            let waker = Handle::waker_ref(&context.handle);
            let mut cx = std::task::Context::from_waker(&waker);

            pin!(future);

            core.metrics.start_processing_scheduled_tasks();

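            // The scheduling loop: poll the `block_on` future whenever the
            // `woken` flag was set, then run up to `event_interval` scheduled
            // tasks, then yield to the driver (park) and start over.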
            'outer: loop {
                let handle = &context.handle;

                if handle.reset_woken() {
                    let (c, res) = context.enter(core, || {
                        crate::task::coop::budget(|| future.as_mut().poll(&mut cx))
                    });

                    core = c;

                    if let Ready(v) = res {
                        return (core, Some(v));
                    }
                }

                for _ in 0..handle.shared.config.event_interval {
                    // Make sure we didn't hit an unhandled_panic
                    if core.unhandled_panic {
                        return (core, None);
                    }

                    core.tick();

                    let entry = core.next_task(handle);

                    let task = match entry {
                        Some(entry) => entry,
                        None => {
                            core.metrics.end_processing_scheduled_tasks();

                            core = if !context.defer.is_empty() {
                                context.park_yield(core, handle)
                            } else {
                                context.park(core, handle)
                            };

                            core.metrics.start_processing_scheduled_tasks();

                            // Try polling the `block_on` future next
                            continue 'outer;
                        }
                    };

                    let task = context.handle.shared.owned.assert_owner(task);

                    #[cfg(tokio_unstable)]
                    let task_meta = task.task_meta();

                    let (c, ()) = context.run_task(core, || {
                        #[cfg(tokio_unstable)]
                        context.handle.task_hooks.poll_start_callback(&task_meta);

                        task.run();

                        #[cfg(tokio_unstable)]
                        context.handle.task_hooks.poll_stop_callback(&task_meta);
                    });

                    core = c;
                }

                core.metrics.end_processing_scheduled_tasks();

                // Yield to the driver; this drives the timer and pulls any
                // pending I/O events.
                core = context.park_yield(core, handle);

                core.metrics.start_processing_scheduled_tasks();
            }
        });

        match ret {
            Some(ret) => ret,
            None => {
                // A task panicked and the runtime is configured to shut down
                // on unhandled panic, so the loop exited before the `block_on`
                // future completed.
                panic!("a spawned task panicked and the runtime is configured to shut down on unhandled panic");
            }
        }
    }

    /// Enters the scheduler context. This sets the queue and other necessary
    /// scheduler state in the thread-local.
    fn enter<F, R>(self, f: F) -> R
    where
        F: FnOnce(Box<Core>, &Context) -> (Box<Core>, R),
    {
        let context = self.context.expect_current_thread();

        // Remove `core` from `context` to pass into the closure.
        let core = context.core.borrow_mut().take().expect("core missing");

        // Call the closure and place `core` back
        let (core, ret) = context::set_scheduler(&self.context, || f(core, context));

        *context.core.borrow_mut() = Some(core);

        ret
    }
}

impl Drop for CoreGuard<'_> {
    fn drop(&mut self) {
        let context = self.context.expect_current_thread();

        if let Some(core) = context.core.borrow_mut().take() {
            // Place the scheduler core back into its slot so other threads
            // can pick it up and drive it.
            self.scheduler.core.set(core);

            // Wake up other possible threads that could steal the driver.
            self.scheduler.notify.notify_one();
        }
    }
}