tokio/runtime/scheduler/multi_thread/
worker.rs

1//! A scheduler is initialized with a fixed number of workers. Each worker is
2//! driven by a thread. Each worker has a "core" which contains data such as the
3//! run queue and other state. When `block_in_place` is called, the worker's
4//! "core" is handed off to a new thread allowing the scheduler to continue to
5//! make progress while the originating thread blocks.
6//!
7//! # Shutdown
8//!
9//! Shutting down the runtime involves the following steps:
10//!
11//!  1. The Shared::close method is called. This closes the inject queue and
12//!     `OwnedTasks` instance and wakes up all worker threads.
13//!
14//!  2. Each worker thread observes the close signal next time it runs
15//!     Core::maintenance by checking whether the inject queue is closed.
16//!     The `Core::is_shutdown` flag is set to true.
17//!
18//!  3. The worker thread calls `pre_shutdown` in parallel. Here, the worker
19//!     will keep removing tasks from `OwnedTasks` until it is empty. No new
20//!     tasks can be pushed to the `OwnedTasks` during or after this step as it
21//!     was closed in step 1.
22//!
23//!  5. The workers call Shared::shutdown to enter the single-threaded phase of
24//!     shutdown. These calls will push their core to `Shared::shutdown_cores`,
25//!     and the last thread to push its core will finish the shutdown procedure.
26//!
27//!  6. The local run queue of each core is emptied, then the inject queue is
28//!     emptied.
29//!
30//! At this point, shutdown has completed. It is not possible for any of the
31//! collections to contain any tasks at this point, as each collection was
32//! closed first, then emptied afterwards.
33//!
34//! ## Spawns during shutdown
35//!
36//! When spawning tasks during shutdown, there are two cases:
37//!
38//!  * The spawner observes the `OwnedTasks` being open, and the inject queue is
39//!    closed.
40//!  * The spawner observes the `OwnedTasks` being closed and doesn't check the
41//!    inject queue.
42//!
43//! The first case can only happen if the `OwnedTasks::bind` call happens before
44//! or during step 1 of shutdown. In this case, the runtime will clean up the
45//! task in step 3 of shutdown.
46//!
47//! In the latter case, the task was not spawned and the task is immediately
48//! cancelled by the spawner.
49//!
//! The correctness of shutdown requires both the inject queue and `OwnedTasks`
//! collection to have a closed bit. With a closed bit on only the inject queue,
//! spawning could run into a situation where a task is successfully bound long
//! after the runtime has shut down. With a closed bit on only the `OwnedTasks`,
54//! the first spawning situation could result in the notification being pushed
55//! to the inject queue after step 6 of shutdown, which would leave a task in
56//! the inject queue indefinitely. This would be a ref-count cycle and a memory
57//! leak.
58
59use crate::loom::sync::{Arc, Mutex};
60use crate::runtime;
61use crate::runtime::scheduler::multi_thread::{
62    idle, park, queue, Counters, Handle, Idle, Overflow, Parker, Stats, TraceStatus, Unparker,
63};
64use crate::runtime::scheduler::{inject, Defer, Lock};
65use crate::runtime::task::OwnedTasks;
66use crate::runtime::{
67    blocking, driver, scheduler, task, Config, SchedulerMetrics, TimerFlavor, WorkerMetrics,
68};
69use crate::runtime::{context, TaskHooks};
70use crate::task::coop;
71use crate::util::atomic_cell::AtomicCell;
72use crate::util::rand::{FastRand, RngSeedGenerator};
73
74use std::cell::RefCell;
75use std::task::Waker;
76use std::thread;
77use std::time::Duration;
78
79mod metrics;
80
81cfg_taskdump! {
82    mod taskdump;
83}
84
85cfg_not_taskdump! {
86    mod taskdump_mock;
87}
88
89#[cfg(all(tokio_unstable, feature = "time"))]
90use crate::loom::sync::atomic::AtomicBool;
91
92#[cfg(all(tokio_unstable, feature = "time"))]
93use crate::runtime::time_alt;
94
95#[cfg(all(tokio_unstable, feature = "time"))]
96use crate::runtime::scheduler::util;
97
/// A scheduler worker.
///
/// A worker's scheduling state lives in a [`Core`]. The core is kept in an
/// atomic slot so that it can be handed to a different thread (see
/// `block_in_place`).
pub(super) struct Worker {
    /// Reference to the scheduler's handle.
    handle: Arc<Handle>,

    /// Index holding this worker's remote state. Also used to index
    /// `Shared::worker_metrics`.
    index: usize,

    /// Used to hand off a worker's core to another thread. Taking from this
    /// slot yields `None` when the core is not currently stored in it.
    core: AtomicCell<Core>,
}
109
/// Core data.
///
/// The state a thread needs in order to drive a worker: the local run queue,
/// the parker, statistics and a handful of flags.
struct Core {
    /// Used to schedule bookkeeping tasks every so often. Maintenance runs
    /// whenever `tick` is a multiple of the configured `event_interval`.
    tick: u32,

    /// When `true`, locally scheduled tasks go to the LIFO slot. When `false`,
    /// they go to the back of the `run_queue`.
    ///
    /// Cleared once `MAX_LIFO_POLLS_PER_TICK` LIFO polls have happened within
    /// one `run_task` call and restored by `Context::reset_lifo_enabled`.
    lifo_enabled: bool,

    /// The worker-local run queue.
    run_queue: queue::Local<Arc<Handle>>,

    /// Worker-local timer state; its `wheel` and `canc_rx` are shut down in
    /// `Context::run` when the alternative timer flavor is in use.
    #[cfg(all(tokio_unstable, feature = "time"))]
    time_context: time_alt::LocalContext,

    /// True if the worker is currently searching for more work. Searching
    /// involves attempting to steal from other workers.
    is_searching: bool,

    /// True if the scheduler is being shut down. The loop in `Context::run`
    /// exits once this is set.
    is_shutdown: bool,

    /// True if the scheduler is being traced.
    is_traced: bool,

    /// Whether or not the worker has just returned from a park in which we
    /// parked on the I/O driver.
    had_driver: park::HadDriver,

    /// If `true`, the worker should eagerly notify another worker when polling
    /// the first task after returning from a park in which it parked on the I/O
    /// or time driver.
    enable_eager_driver_handoff: bool,

    /// Parker.
    ///
    /// Stored in an `Option` as the parker is added / removed to make the
    /// borrow checker happy.
    park: Option<Parker>,

    /// Per-worker runtime stats.
    stats: Stats,

    /// How often to check the global queue.
    global_queue_interval: u32,

    /// Fast random number generator.
    rand: FastRand,
}
159
/// State shared across all workers.
pub(crate) struct Shared {
    /// Per-worker remote state. All other workers have access to this, and it
    /// is how they communicate with each other.
    remotes: Box<[Remote]>,

    /// Global task queue used to:
    ///  1. Submit work to the scheduler while **not** currently on a worker thread.
    ///  2. Submit work to the scheduler when a worker run queue is saturated.
    pub(super) inject: inject::Shared<Arc<Handle>>,

    /// Coordinates idle workers.
    idle: Idle,

    /// Collection of all active tasks spawned onto this executor.
    pub(crate) owned: OwnedTasks<Arc<Handle>>,

    /// Data synchronized by the scheduler mutex.
    pub(super) synced: Mutex<Synced>,

    /// Cores that have observed the shutdown signal.
    ///
    /// The core is **not** placed back in the worker, to prevent it from being
    /// stolen by a thread that was spawned as part of `block_in_place`.
    #[allow(clippy::vec_box)] // we're moving an already-boxed value
    shutdown_cores: Mutex<Vec<Box<Core>>>,

    /// The number of cores that have observed the trace signal.
    pub(super) trace_status: TraceStatus,

    /// Scheduler configuration options.
    config: Config,

    /// Collects metrics from the runtime.
    pub(super) scheduler_metrics: SchedulerMetrics,

    /// Per-worker metrics, indexed by `Worker::index`.
    pub(super) worker_metrics: Box<[WorkerMetrics]>,

    /// Only held to trigger some code on drop. This is used to get internal
    /// runtime metrics that can be useful when doing performance
    /// investigations. This does nothing (empty struct, no drop impl) unless
    /// the `tokio_internal_mt_counters` `cfg` flag is set.
    _counters: Counters,
}
204
/// Data synchronized by the scheduler mutex (`Shared::synced`).
pub(crate) struct Synced {
    /// Synchronized state for `Idle`.
    pub(super) idle: idle::Synced,

    /// Synchronized state for `Inject`.
    pub(crate) inject: inject::Synced,

    #[cfg(all(tokio_unstable, feature = "time"))]
    /// Timers pending to be registered.
    /// This is used to register a timer when the [`Core`]
    /// is not available in the current thread.
    inject_timers: Vec<time_alt::EntryHandle>,
}
219
/// Used to communicate with a worker from other threads.
///
/// One `Remote` is created per worker in `create`, alongside that worker's
/// [`Core`].
struct Remote {
    /// Steals tasks from this worker. This is the stealing half returned by
    /// `queue::local()`; the other half is the worker's `Core::run_queue`.
    pub(super) steal: queue::Steal<Arc<Handle>>,

    /// Unparks the associated worker thread.
    unpark: Unparker,
}
228
/// Thread-local context.
///
/// Created in `run` for the thread that is driving a worker.
pub(crate) struct Context {
    /// The worker being driven by this thread.
    worker: Arc<Worker>,

    /// Core data. `None` whenever the core is not parked in this context, for
    /// example after `block_in_place` moved it back into the worker's slot.
    core: RefCell<Option<Box<Core>>>,

    /// Tasks to wake after resource drivers are polled. This is mostly to
    /// handle yielded tasks.
    pub(crate) defer: Defer,
}
241
/// Starts the workers.
///
/// Holds the workers built by `create` until `Launch::launch` is called.
pub(crate) struct Launch(Vec<Arc<Worker>>);
244
/// Running a task may consume the core. If the core is still available when
/// running the task completes, it is returned as `Ok(core)`. Otherwise the
/// result is `Err(())` and the worker needs to stop processing.
type RunResult = Result<Box<Core>, ()>;
249
/// A notified task handle whose scheduler type is `Arc<Handle>`.
type Notified = task::Notified<Arc<Handle>>;
252
/// Maximum number of LIFO-slot polls within a single `run_task` call before
/// the LIFO slot is disabled.
///
/// Value picked out of thin air. Running the LIFO slot a handful of times
/// seems sufficient to benefit from locality. More than 3 times is probably
/// over-weighting. The value can be tuned in the future with data that shows
/// improvements.
const MAX_LIFO_POLLS_PER_TICK: usize = 3;
258
259#[allow(clippy::too_many_arguments)]
260pub(super) fn create(
261    size: usize,
262    park: Parker,
263    driver_handle: driver::Handle,
264    blocking_spawner: blocking::Spawner,
265    seed_generator: RngSeedGenerator,
266    config: Config,
267    timer_flavor: TimerFlavor,
268    name: Option<String>,
269) -> (Arc<Handle>, Launch) {
270    let mut cores = Vec::with_capacity(size);
271    let mut remotes = Vec::with_capacity(size);
272    let mut worker_metrics = Vec::with_capacity(size);
273
274    // Create the local queues
275    for _ in 0..size {
276        let (steal, run_queue) = queue::local();
277
278        let park = park.clone();
279        let unpark = park.unpark();
280        let metrics = WorkerMetrics::from_config(&config);
281        let stats = Stats::new(&metrics);
282
283        cores.push(Box::new(Core {
284            tick: 0,
285            lifo_enabled: !config.disable_lifo_slot,
286            run_queue,
287            #[cfg(all(tokio_unstable, feature = "time"))]
288            time_context: time_alt::LocalContext::new(),
289            is_searching: false,
290            is_shutdown: false,
291            is_traced: false,
292            enable_eager_driver_handoff: config.enable_eager_driver_handoff,
293            had_driver: park::HadDriver::No,
294            park: Some(park),
295            global_queue_interval: stats.tuned_global_queue_interval(&config),
296            stats,
297            rand: FastRand::from_seed(config.seed_generator.next_seed()),
298        }));
299
300        remotes.push(Remote { steal, unpark });
301        worker_metrics.push(metrics);
302    }
303
304    let (idle, idle_synced) = Idle::new(size);
305    let (inject, inject_synced) = inject::Shared::new();
306
307    let remotes_len = remotes.len();
308    let handle = Arc::new(Handle {
309        name,
310        task_hooks: TaskHooks::from_config(&config),
311        shared: Shared {
312            remotes: remotes.into_boxed_slice(),
313            inject,
314            idle,
315            owned: OwnedTasks::new(size),
316            synced: Mutex::new(Synced {
317                idle: idle_synced,
318                inject: inject_synced,
319                #[cfg(all(tokio_unstable, feature = "time"))]
320                inject_timers: Vec::new(),
321            }),
322            shutdown_cores: Mutex::new(vec![]),
323            trace_status: TraceStatus::new(remotes_len),
324            config,
325            scheduler_metrics: SchedulerMetrics::new(),
326            worker_metrics: worker_metrics.into_boxed_slice(),
327            _counters: Counters,
328        },
329        driver: driver_handle,
330        blocking_spawner,
331        seed_generator,
332        timer_flavor,
333        #[cfg(all(tokio_unstable, feature = "time"))]
334        is_shutdown: AtomicBool::new(false),
335    });
336
337    let mut launch = Launch(vec![]);
338
339    for (index, core) in cores.drain(..).enumerate() {
340        launch.0.push(Arc::new(Worker {
341            handle: handle.clone(),
342            index,
343            core: AtomicCell::new(Some(core)),
344        }));
345    }
346
347    (handle, launch)
348}
349
/// Runs `f` on the current thread as a blocking section.
///
/// When called on a worker thread that currently holds its core, the core is
/// moved back into the worker's shared slot and `run` is spawned on another
/// thread (via `runtime::spawn_blocking`) so the worker keeps making progress
/// while `f` blocks. When `f` finishes, the `Reset` guard tries to take the
/// core back and hands the budget captured by `coop::stop()` to `coop::set`.
///
/// If the thread has not entered a runtime, or this is a nested call, `f` is
/// invoked directly without any of that setup.
///
/// # Panics
///
/// Panics if the thread has entered a runtime but has no multi-thread worker
/// context and `allow_block_in_place` is `false`.
#[track_caller]
pub(crate) fn block_in_place<F, R>(f: F) -> R
where
    F: FnOnce() -> R,
{
    // Drop guard that runs once `f` has finished: tries to steal the worker
    // core back and resets the task budget.
    struct Reset {
        // Whether the core was handed off and should be reclaimed on drop.
        take_core: bool,
        // Budget returned by `coop::stop()` before running `f`.
        budget: coop::Budget,
    }

    impl Drop for Reset {
        fn drop(&mut self) {
            with_current(|maybe_cx| {
                if let Some(cx) = maybe_cx {
                    if self.take_core {
                        // `None` here means the core is not in the shared
                        // slot (another thread is holding it).
                        let core = cx.worker.core.take();

                        if core.is_some() {
                            // This thread drives the worker again, so record
                            // its thread id in the worker's metrics.
                            cx.worker.handle.shared.worker_metrics[cx.worker.index]
                                .set_thread_id(thread::current().id());
                        }

                        let mut cx_core = cx.core.borrow_mut();
                        assert!(cx_core.is_none());
                        *cx_core = core;
                    }

                    // Reset the task budget as we are re-entering the
                    // runtime.
                    coop::set(self.budget);
                }
            });
        }
    }

    // `had_entered`: the thread was inside a runtime, so `f` must run through
    // `exit_runtime`. `take_core`: the core was handed off and must be
    // reclaimed afterwards. Both are set from inside the closure below.
    let mut had_entered = false;
    let mut take_core = false;

    let setup_result = with_current(|maybe_cx| {
        match (
            crate::runtime::context::current_enter_context(),
            maybe_cx.is_some(),
        ) {
            (context::EnterRuntime::Entered { .. }, true) => {
                // We are on a thread pool runtime thread, so we just need to
                // set up blocking. This is the only arm that falls through
                // to the core hand-off below.
                had_entered = true;
            }
            (
                context::EnterRuntime::Entered {
                    allow_block_in_place,
                },
                false,
            ) => {
                // We are on an executor, but _not_ on the thread pool.  That is
                // _only_ okay if we are in a thread pool runtime's block_on
                // method:
                if allow_block_in_place {
                    had_entered = true;
                    return Ok(());
                } else {
                    // This probably means we are on the current_thread runtime or in a
                    // LocalSet, where it is _not_ okay to block.
                    return Err(
                        "can call blocking only when running on the multi-threaded runtime",
                    );
                }
            }
            (context::EnterRuntime::NotEntered, true) => {
                // This is a nested call to block_in_place (we already exited).
                // All the necessary setup has already been done.
                return Ok(());
            }
            (context::EnterRuntime::NotEntered, false) => {
                // We are outside of the tokio runtime, so blocking is fine.
                // We can also skip all of the thread pool blocking setup steps.
                return Ok(());
            }
        }

        let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");

        // Since deferred tasks don't stay on `core`, make sure to wake them
        // before blocking.
        cx.defer.wake();

        // Get the worker core. If none is set, then blocking is fine!
        let mut core = match cx.core.borrow_mut().take() {
            Some(core) => core,
            None => return Ok(()),
        };

        // If we heavily call `spawn_blocking`, there might be no available thread to
        // run this core. Except for the task in the lifo_slot, all tasks can be
        // stolen, so we move the task out of the lifo_slot to the run_queue.
        if let Some(task) = core.run_queue.pop_lifo() {
            core.run_queue
                .push_back_or_overflow(task, &*cx.worker.handle, &mut core.stats);
        }

        // We are taking the core from the context and sending it to another
        // thread.
        take_core = true;

        // The parker should be set here
        assert!(core.park.is_some());

        // In order to block, the core must be sent to another thread for
        // execution.
        //
        // First, move the core back into the worker's shared core slot.
        cx.worker.core.set(core);

        // Next, clone the worker handle and send it to a new thread for
        // processing.
        //
        // Once the blocking task is done executing, we will attempt to
        // steal the core back.
        let worker = cx.worker.clone();
        runtime::spawn_blocking(move || run(worker));
        Ok(())
    });

    // Panic outside of the `with_current` closure.
    if let Err(panic_message) = setup_result {
        panic!("{}", panic_message);
    }

    if had_entered {
        // Unset the current task's budget. Blocking sections are not
        // constrained by task budgets.
        let _reset = Reset {
            take_core,
            budget: coop::stop(),
        };

        crate::runtime::context::exit_runtime(f)
    } else {
        f()
    }
}
491
492impl Launch {
493    pub(crate) fn launch(mut self) {
494        for worker in self.0.drain(..) {
495            runtime::spawn_blocking(move || run(worker));
496        }
497    }
498}
499
500fn run(worker: Arc<Worker>) {
501    #[allow(dead_code)]
502    struct AbortOnPanic;
503
504    impl Drop for AbortOnPanic {
505        fn drop(&mut self) {
506            if std::thread::panicking() {
507                eprintln!("worker thread panicking; aborting process");
508                std::process::abort();
509            }
510        }
511    }
512
513    // Catching panics on worker threads in tests is quite tricky. Instead, when
514    // debug assertions are enabled, we just abort the process.
515    #[cfg(debug_assertions)]
516    let _abort_on_panic = AbortOnPanic;
517
518    // Acquire a core. If this fails, then another thread is running this
519    // worker and there is nothing further to do.
520    let core = match worker.core.take() {
521        Some(core) => core,
522        None => return,
523    };
524
525    worker.handle.shared.worker_metrics[worker.index].set_thread_id(thread::current().id());
526
527    let handle = scheduler::Handle::MultiThread(worker.handle.clone());
528
529    crate::runtime::context::enter_runtime(&handle, true, |_| {
530        // Set the worker context.
531        let cx = scheduler::Context::MultiThread(Context {
532            worker,
533            core: RefCell::new(None),
534            defer: Defer::new(),
535        });
536
537        context::set_scheduler(&cx, || {
538            let cx = cx.expect_multi_thread();
539
540            // This should always be an error. It only returns a `Result` to support
541            // using `?` to short circuit.
542            assert!(cx.run(core).is_err());
543
544            // Check if there are any deferred tasks to notify. This can happen when
545            // the worker core is lost due to `block_in_place()` being called from
546            // within the task.
547            cx.defer.wake();
548        });
549    });
550}
551
552impl Context {
553    fn run(&self, mut core: Box<Core>) -> RunResult {
554        // Reset `lifo_enabled` here in case the core was previously stolen from
555        // a task that had the LIFO slot disabled.
556        self.reset_lifo_enabled(&mut core);
557
558        // Start as "processing" tasks as polling tasks from the local queue
559        // will be one of the first things we do.
560        core.stats.start_processing_scheduled_tasks();
561
562        while !core.is_shutdown {
563            self.assert_lifo_enabled_is_correct(&core);
564
565            if core.is_traced {
566                core = self.worker.handle.trace_core(core);
567            }
568
569            // Increment the tick
570            core.tick();
571
572            // Run maintenance, if needed
573            core = self.maintenance(core);
574
575            // First, check work available to the current worker.
576            if let Some(task) = core.next_task(&self.worker) {
577                core = self.run_task(task, core)?;
578                continue;
579            }
580
581            // We consumed all work in the queues and will start searching for work.
582            core.stats.end_processing_scheduled_tasks();
583
584            // There is no more **local** work to process, try to steal work
585            // from other workers.
586            if let Some(task) = core.steal_work(&self.worker) {
587                // Found work, switch back to processing
588                core.stats.start_processing_scheduled_tasks();
589                core = self.run_task(task, core)?;
590            } else {
591                // Wait for work
592                core = if !self.defer.is_empty() {
593                    self.park_yield(core)
594                } else {
595                    self.park(core)
596                };
597                core.stats.start_processing_scheduled_tasks();
598            }
599        }
600
601        #[cfg(all(tokio_unstable, feature = "time"))]
602        {
603            match self.worker.handle.timer_flavor {
604                TimerFlavor::Traditional => {}
605                TimerFlavor::Alternative => {
606                    util::time_alt::shutdown_local_timers(
607                        &mut core.time_context.wheel,
608                        &mut core.time_context.canc_rx,
609                        self.worker.handle.take_remote_timers(),
610                        &self.worker.handle.driver,
611                    );
612                }
613            }
614        }
615
616        core.pre_shutdown(&self.worker);
617        // Signal shutdown
618        self.worker.handle.shutdown_core(core);
619        Err(())
620    }
621
    /// Polls `task`, then keeps polling tasks from the LIFO slot while budget
    /// remains.
    ///
    /// The core is stored in `self.core` while tasks run so that it is
    /// available to the runtime context. Returns `Ok(core)` if the core is
    /// still there once polling is done, or `Err(())` if it was taken away
    /// while a task was running.
    fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
        #[cfg(tokio_unstable)]
        let task_meta = task.task_meta();

        let task = self.worker.handle.shared.owned.assert_owner(task);

        // Make sure the worker is not in the **searching** state. This enables
        // another idle worker to try to steal work.
        let notified_parked_worker = core.transition_from_searching(&self.worker);

        // If the setting to wake eagerly when releasing the I/O driver is
        // enabled, and this worker had the driver, wake a parked worker to come
        // grab it from us.
        //
        // Note that this is only done when we are *actually* about to poll a
        // task, rather than whenever the worker has unparked. When the worker
        // has been unparked, it may not actually have any tasks to poll, and if
        // it's still holding the I/O driver, it should just go back to polling
        // the driver again, rather than trying to wake someone else spuriously.
        //
        // Note that this explicitly checks `cfg!(tokio_unstable)` in addition,
        // as that should result in this whole expression being eliminated at
        // compile-time when unstable features are disabled.
        if cfg!(tokio_unstable)
            && core.enable_eager_driver_handoff
            && core.had_driver == park::HadDriver::Yes
            && !notified_parked_worker
        // (if a parked worker was already notified above, don't do it a
        // second time)
        {
            core.had_driver = park::HadDriver::No;
            self.worker.handle.notify_parked_local();
        }

        self.assert_lifo_enabled_is_correct(&core);

        // Measure the poll start time. Note that we may end up polling other
        // tasks under this measurement. In this case, the tasks came from the
        // LIFO slot and are considered part of the current task for scheduling
        // purposes. These tasks inherit the "parent"'s limits.
        core.stats.start_poll();

        // Make the core available to the runtime context
        *self.core.borrow_mut() = Some(core);

        // Run the task
        coop::budget(|| {
            // Unlike the poll time above, poll start callback is attached to the task id,
            // so it is tightly associated with the actual poll invocation.
            #[cfg(tokio_unstable)]
            self.worker
                .handle
                .task_hooks
                .poll_start_callback(&task_meta);

            task.run();

            #[cfg(tokio_unstable)]
            self.worker.handle.task_hooks.poll_stop_callback(&task_meta);

            // Number of LIFO-slot tasks polled during this `run_task` call.
            let mut lifo_polls = 0;

            // As long as there is budget remaining and a task exists in the
            // `lifo_slot`, then keep running.
            loop {
                // Check if we still have the core. If not, the core was stolen
                // by another worker.
                let mut core = match self.core.borrow_mut().take() {
                    Some(core) => core,
                    None => {
                        // In this case, we cannot call `reset_lifo_enabled()`
                        // because the core was stolen. The stealer will handle
                        // that at the top of `Context::run`
                        return Err(());
                    }
                };

                // Check for a task in the LIFO slot
                let task = match core.run_queue.pop_lifo() {
                    Some(task) => task,
                    None => {
                        // Nothing left to run: restore the LIFO setting (it
                        // may have been disabled below) and finish the poll
                        // measurement.
                        self.reset_lifo_enabled(&mut core);
                        core.stats.end_poll();
                        return Ok(core);
                    }
                };

                if !coop::has_budget_remaining() {
                    core.stats.end_poll();

                    // Not enough budget left to run the LIFO task, push it to
                    // the back of the queue and return.
                    core.run_queue.push_back_or_overflow(
                        task,
                        &*self.worker.handle,
                        &mut core.stats,
                    );
                    // If we hit this point, the LIFO slot should be enabled.
                    // There is no need to reset it.
                    debug_assert!(core.lifo_enabled);
                    return Ok(core);
                }

                // Track that we are about to run a task from the LIFO slot.
                lifo_polls += 1;
                super::counters::inc_lifo_schedules();

                // Disable the LIFO slot if we reach our limit
                //
                // In ping-pong style workloads where task A notifies task B,
                // which notifies task A again, continuously prioritizing the
                // LIFO slot can cause starvation as these two tasks will
                // repeatedly schedule the other. To mitigate this, we limit the
                // number of times the LIFO slot is prioritized.
                if lifo_polls >= MAX_LIFO_POLLS_PER_TICK {
                    core.lifo_enabled = false;
                    super::counters::inc_lifo_capped();
                }

                // Run the LIFO task, then loop. The core goes back into the
                // context first so it is available while the task runs.
                *self.core.borrow_mut() = Some(core);
                let task = self.worker.handle.shared.owned.assert_owner(task);

                #[cfg(tokio_unstable)]
                let task_meta = task.task_meta();

                #[cfg(tokio_unstable)]
                self.worker
                    .handle
                    .task_hooks
                    .poll_start_callback(&task_meta);

                task.run();

                #[cfg(tokio_unstable)]
                self.worker.handle.task_hooks.poll_stop_callback(&task_meta);
            }
        })
    }
760
761    fn reset_lifo_enabled(&self, core: &mut Core) {
762        core.lifo_enabled = !self.worker.handle.shared.config.disable_lifo_slot;
763    }
764
765    fn assert_lifo_enabled_is_correct(&self, core: &Core) {
766        debug_assert_eq!(
767            core.lifo_enabled,
768            !self.worker.handle.shared.config.disable_lifo_slot
769        );
770    }
771
772    fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
773        if core.tick % self.worker.handle.shared.config.event_interval == 0 {
774            super::counters::inc_num_maintenance();
775
776            core.stats.end_processing_scheduled_tasks();
777
778            // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
779            // to run without actually putting the thread to sleep.
780            core = self.park_yield(core);
781
782            // Run regularly scheduled maintenance
783            core.maintenance(&self.worker);
784
785            core.stats.start_processing_scheduled_tasks();
786        }
787
788        core
789    }
790
791    /// Parks the worker thread while waiting for tasks to execute.
792    ///
793    /// This function checks if indeed there's no more work left to be done before parking.
794    /// Also important to notice that, before parking, the worker thread will try to take
795    /// ownership of the Driver (IO/Time) and dispatch any events that might have fired.
796    /// Whenever a worker thread executes the Driver loop, all waken tasks are scheduled
797    /// in its own local queue until the queue saturates (ntasks > `LOCAL_QUEUE_CAPACITY`).
798    /// When the local queue is saturated, the overflow tasks are added to the injection queue
799    /// from where other workers can pick them up.
800    /// Also, we rely on the workstealing algorithm to spread the tasks amongst workers
801    /// after all the IOs get dispatched
802    fn park(&self, mut core: Box<Core>) -> Box<Core> {
803        if let Some(f) = &self.worker.handle.shared.config.before_park {
804            f();
805        }
806
807        if core.transition_to_parked(&self.worker) {
808            while !core.is_shutdown && !core.is_traced {
809                core.stats.about_to_park();
810                core.stats
811                    .submit(&self.worker.handle.shared.worker_metrics[self.worker.index]);
812
813                core = self.park_internal(core, None);
814
815                core.stats.unparked();
816
817                // Run regularly scheduled maintenance
818                core.maintenance(&self.worker);
819
820                if core.transition_from_parked(&self.worker) {
821                    break;
822                }
823            }
824        }
825
826        if let Some(f) = &self.worker.handle.shared.config.after_unpark {
827            f();
828        }
829        core
830    }
831
832    fn park_yield(&self, core: Box<Core>) -> Box<Core> {
833        self.park_internal(core, Some(Duration::from_millis(0)))
834    }
835
836    fn park_internal(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> {
837        self.assert_lifo_enabled_is_correct(&core);
838
839        // Take the parker out of core
840        let mut park = core.park.take().expect("park missing");
841        // Store `core` in context
842        *self.core.borrow_mut() = Some(core);
843
844        #[cfg(feature = "time")]
845        let (duration, auto_advance_duration) = match self.worker.handle.timer_flavor {
846            TimerFlavor::Traditional => (duration, None::<Duration>),
847            #[cfg(tokio_unstable)]
848            TimerFlavor::Alternative => {
849                // Must happens after taking out the parker, as the `Handle::schedule_local`
850                // will delay the notify if the parker taken out.
851                //
852                // See comments in `Handle::schedule_local` for more details.
853                let MaintainLocalTimer {
854                    park_duration: duration,
855                    auto_advance_duration,
856                } = self.maintain_local_timers_before_parking(duration);
857                (duration, auto_advance_duration)
858            }
859        };
860
861        // Park thread
862        let had_driver = if let Some(timeout) = duration {
863            park.park_timeout(&self.worker.handle.driver, timeout)
864        } else {
865            park.park(&self.worker.handle.driver)
866        };
867
868        self.defer.wake();
869
870        #[cfg(feature = "time")]
871        match self.worker.handle.timer_flavor {
872            TimerFlavor::Traditional => {
873                // suppress unused variable warning
874                let _ = auto_advance_duration;
875            }
876            #[cfg(tokio_unstable)]
877            TimerFlavor::Alternative => {
878                // Must happens before placing back the parker, as the `Handle::schedule_local`
879                // will delay the notify if the parker is still in `core`.
880                //
881                // See comments in `Handle::schedule_local` for more details.
882                self.maintain_local_timers_after_parking(auto_advance_duration);
883            }
884        }
885
886        // Remove `core` from context
887        core = self.core.borrow_mut().take().expect("core missing");
888
889        // Place `park` back in `core`
890        core.park = Some(park);
891        core.had_driver = had_driver;
892
893        if core.should_notify_others() {
894            self.worker.handle.notify_parked_local();
895        }
896        core
897    }
898
899    pub(crate) fn defer(&self, waker: &Waker) {
900        if self.core.borrow().is_none() {
901            // If there is no core, then the worker is currently in a block_in_place. In this case,
902            // we cannot use the defer queue as we aren't really in the current runtime.
903            waker.wake_by_ref();
904        } else {
905            self.defer.defer(waker);
906        }
907    }
908
/// Maintain local timers before parking the resource driver.
///
/// * Process the local registration queue.
/// * Register remote timers to the local timer wheel.
/// * Remove cancelled timers from the local timer wheel.
/// * Adjust the park duration based on
///   * the next timer expiration time.
///   * whether auto-advancing is required (feature = "test-util").
///
/// # Returns
///
/// A [`MaintainLocalTimer`] holding the duration to park for and, when
/// auto-advancing is required, the duration to auto-advance by after
/// parking. If any waker was queued during maintenance, the park duration
/// is zero and no auto-advance is requested.
///
/// # Panics
///
/// Panics if there is no current multi-thread context, if that context
/// belongs to a different handle, or if the context holds no core.
#[cfg(all(tokio_unstable, feature = "time"))]
fn maintain_local_timers_before_parking(
    &self,
    park_duration: Option<Duration>,
) -> MaintainLocalTimer {
    let handle = &self.worker.handle;
    let mut wake_queue = time_alt::WakeQueue::new();

    let (should_yield, next_timer) = with_current(|maybe_cx| {
        let cx = maybe_cx.expect("function should be called when core is present");
        assert_eq!(
            Arc::as_ptr(&cx.worker.handle),
            Arc::as_ptr(&self.worker.handle),
            "function should be called on the exact same worker"
        );

        let mut core_slot = cx.core.borrow_mut();
        let time_cx = &mut core_slot.as_mut().expect("core missing").time_context;

        util::time_alt::process_registration_queue(
            &mut time_cx.registration_queue,
            &mut time_cx.wheel,
            &time_cx.canc_tx,
            &mut wake_queue,
        );
        util::time_alt::insert_inject_timers(
            &mut time_cx.wheel,
            &time_cx.canc_tx,
            handle.take_remote_timers(),
            &mut wake_queue,
        );
        util::time_alt::remove_cancelled_timers(&mut time_cx.wheel, &mut time_cx.canc_rx);

        // Pending wakers mean there is work to run, so the park must not block.
        let has_pending_wakes = !wake_queue.is_empty();
        let next_expiration =
            util::time_alt::next_expiration_time(&time_cx.wheel, &handle.driver);

        (has_pending_wakes, next_expiration)
    });

    wake_queue.wake_all();

    let (park_for, advance_by) = if should_yield {
        (Some(Duration::ZERO), None)
    } else {
        // Park for the shorter of the requested duration and the next timer.
        let dur = util::time_alt::min_duration(park_duration, next_timer);
        if util::time_alt::pre_auto_advance(&handle.driver, dur) {
            (Some(Duration::ZERO), dur)
        } else {
            (dur, None)
        }
    };

    MaintainLocalTimer {
        park_duration: park_for,
        auto_advance_duration: advance_by,
    }
}
983
/// Maintain local timers after unparking the resource driver.
///
/// * Auto-advance time, if required (feature = "test-util").
/// * Process expired timers.
///
/// Wakers collected while processing are woken after the core borrow has
/// been released.
///
/// # Panics
///
/// Panics if there is no current multi-thread context, if that context
/// belongs to a different handle, or if the context holds no core.
#[cfg(all(tokio_unstable, feature = "time"))]
fn maintain_local_timers_after_parking(&self, auto_advance_duration: Option<Duration>) {
    let driver = &self.worker.handle.driver;
    let mut wake_queue = time_alt::WakeQueue::new();

    with_current(|maybe_cx| {
        let cx = maybe_cx.expect("function should be called when core is present");
        assert_eq!(
            Arc::as_ptr(&cx.worker.handle),
            Arc::as_ptr(&self.worker.handle),
            "function should be called on the exact same worker"
        );

        let mut core_slot = cx.core.borrow_mut();
        let time_cx = &mut core_slot.as_mut().expect("core missing").time_context;

        util::time_alt::post_auto_advance(driver, auto_advance_duration);
        util::time_alt::process_expired_timers(&mut time_cx.wheel, driver, &mut wake_queue);
    });

    wake_queue.wake_all();
}
1015
/// Calls `f` with mutable access to the core stored in this context, or
/// with `None` when no core is currently stored.
///
/// The `RefCell` holding the core stays mutably borrowed for the whole
/// call to `f`.
#[cfg(all(tokio_unstable, feature = "time"))]
fn with_core<F, R>(&self, f: F) -> R
where
    F: FnOnce(Option<&mut Core>) -> R,
{
    let mut core_slot = self.core.borrow_mut();
    f(core_slot.as_deref_mut())
}
1026
/// Calls `f` with a temporary timer context derived from the current core.
///
/// * No core stored in the context: `f` receives `None`.
/// * Core flagged as shut down: `f` receives a shutdown context.
/// * Otherwise: `f` receives a running context that borrows the core's
///   `time_context`.
#[cfg(all(tokio_unstable, feature = "time"))]
pub(crate) fn with_time_temp_local_context<F, R>(&self, f: F) -> R
where
    F: FnOnce(Option<time_alt::TempLocalContext<'_>>) -> R,
{
    self.with_core(|maybe_core| {
        let temp_cx = maybe_core.map(|core| {
            if core.is_shutdown {
                time_alt::TempLocalContext::new_shutdown()
            } else {
                time_alt::TempLocalContext::new_running(&mut core.time_context)
            }
        });

        f(temp_cx)
    })
}
1040
/// Returns the `index` field of the worker this context belongs to.
#[cfg(tokio_unstable)]
pub(crate) fn worker_index(&self) -> usize {
    let worker = &self.worker;
    worker.index
}
1045}
1046
1047impl Core {
1048    /// Increment the tick
1049    fn tick(&mut self) {
1050        self.tick = self.tick.wrapping_add(1);
1051    }
1052
1053    /// Return the next notified task available to this worker.
1054    fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
1055        if self.tick % self.global_queue_interval == 0 {
1056            // Update the global queue interval, if needed
1057            self.tune_global_queue_interval(worker);
1058
1059            worker
1060                .handle
1061                .next_remote_task()
1062                .or_else(|| self.next_local_task())
1063        } else {
1064            let maybe_task = self.next_local_task();
1065
1066            if maybe_task.is_some() {
1067                return maybe_task;
1068            }
1069
1070            if worker.inject().is_empty() {
1071                return None;
1072            }
1073
1074            let cap = usize::min(
1075                // Other threads can only **remove** tasks from the current
1076                // worker's `run_queue`. So, we can be confident that by the
1077                // time we call `run_queue.push_back` below, there will be *at
1078                // least* `cap` available slots in the queue.
1079                //
1080                // Note that even though `next_local_task()` just returned
1081                // `None`, this may be different from `max_capacity()` if
1082                // another worker is currently stealing tasks from us.
1083                self.run_queue.remaining_slots(),
1084                // We want to make sure that all of the tasks we take end up in
1085                // the first half of the local queue. This ensures that the
1086                // tasks do not get pushed to the inject queue again if overflow
1087                // occurs, as overflow only affects tasks in the second half of
1088                // the local queue.
1089                //
1090                // Note that even if there are concurrent stealers, we do not
1091                // need to consider the value of `remaining_slots()` because a
1092                // future call to `push_overflow()` can only succeed once that
1093                // concurrent stealer has finished stealing, so at that point
1094                // the tasks we are adding now will be in the first half.
1095                self.run_queue.max_capacity() / 2,
1096            );
1097
1098            // The worker is currently idle, pull a batch of work from the
1099            // injection queue. We don't want to pull *all* the work so other
1100            // workers can also get some.
1101            let n = usize::min(
1102                worker.inject().len() / worker.handle.shared.remotes.len() + 1,
1103                cap,
1104            );
1105
1106            // Take at least one task since the first task is returned directly
1107            // and not pushed onto the local queue.
1108            let n = usize::max(1, n);
1109
1110            let mut synced = worker.handle.shared.synced.lock();
1111            // safety: passing in the correct `inject::Synced`.
1112            let mut tasks = unsafe { worker.inject().pop_n(&mut synced.inject, n) };
1113
1114            // Pop the first task to return immediately
1115            let ret = tasks.next();
1116
1117            // Push the rest of the on the run queue
1118            self.run_queue.push_back(tasks);
1119
1120            ret
1121        }
1122    }
1123
1124    fn next_local_task(&mut self) -> Option<Notified> {
1125        self.run_queue.pop_lifo().or_else(|| self.run_queue.pop())
1126    }
1127
1128    /// Function responsible for stealing tasks from another worker
1129    ///
1130    /// Note: Only if less than half the workers are searching for tasks to steal
1131    /// a new worker will actually try to steal. The idea is to make sure not all
1132    /// workers will be trying to steal at the same time.
1133    fn steal_work(&mut self, worker: &Worker) -> Option<Notified> {
1134        if !self.transition_to_searching(worker) {
1135            return None;
1136        }
1137
1138        let num = worker.handle.shared.remotes.len();
1139        // Start from a random worker
1140        let start = self.rand.fastrand_n(num as u32) as usize;
1141
1142        for i in 0..num {
1143            let i = (start + i) % num;
1144
1145            // Don't steal from ourself! We know we don't have work.
1146            if i == worker.index {
1147                continue;
1148            }
1149
1150            let target = &worker.handle.shared.remotes[i];
1151            if let Some(task) = target
1152                .steal
1153                .steal_into(&mut self.run_queue, &mut self.stats)
1154            {
1155                return Some(task);
1156            }
1157        }
1158
1159        // Fallback on checking the global queue
1160        worker.handle.next_remote_task()
1161    }
1162
1163    fn transition_to_searching(&mut self, worker: &Worker) -> bool {
1164        if !self.is_searching {
1165            self.is_searching = worker.handle.shared.idle.transition_worker_to_searching();
1166        }
1167
1168        self.is_searching
1169    }
1170
1171    fn transition_from_searching(&mut self, worker: &Worker) -> bool {
1172        if !self.is_searching {
1173            return false;
1174        }
1175
1176        self.is_searching = false;
1177        worker.handle.transition_worker_from_searching()
1178    }
1179
1180    fn has_tasks(&self) -> bool {
1181        self.run_queue.has_tasks()
1182    }
1183
1184    fn should_notify_others(&self) -> bool {
1185        // If there are tasks available to steal, but this worker is not
1186        // looking for tasks to steal, notify another worker.
1187        if self.is_searching {
1188            return false;
1189        }
1190        self.run_queue.len() > 1
1191    }
1192
1193    /// Prepares the worker state for parking.
1194    ///
1195    /// Returns true if the transition happened, false if there is work to do first.
1196    fn transition_to_parked(&mut self, worker: &Worker) -> bool {
1197        // Workers should not park if they have work to do
1198        if self.has_tasks() || self.is_traced {
1199            return false;
1200        }
1201
1202        // When the final worker transitions **out** of searching to parked, it
1203        // must check all the queues one last time in case work materialized
1204        // between the last work scan and transitioning out of searching.
1205        let is_last_searcher = worker.handle.shared.idle.transition_worker_to_parked(
1206            &worker.handle.shared,
1207            worker.index,
1208            self.is_searching,
1209        );
1210
1211        // The worker is no longer searching. Setting this is the local cache
1212        // only.
1213        self.is_searching = false;
1214
1215        if is_last_searcher {
1216            worker.handle.notify_if_work_pending();
1217        }
1218
1219        true
1220    }
1221
1222    /// Returns `true` if the transition happened.
1223    fn transition_from_parked(&mut self, worker: &Worker) -> bool {
1224        // If a task is in the lifo slot/run queue, then we must unpark regardless of
1225        // being notified
1226        if self.has_tasks() {
1227            // When a worker wakes, it should only transition to the "searching"
1228            // state when the wake originates from another worker *or* a new task
1229            // is pushed. We do *not* want the worker to transition to "searching"
1230            // when it wakes when the I/O driver receives new events.
1231            self.is_searching = !worker
1232                .handle
1233                .shared
1234                .idle
1235                .unpark_worker_by_id(&worker.handle.shared, worker.index);
1236            return true;
1237        }
1238
1239        if worker
1240            .handle
1241            .shared
1242            .idle
1243            .is_parked(&worker.handle.shared, worker.index)
1244        {
1245            return false;
1246        }
1247
1248        // When unparked, the worker is in the searching state.
1249        self.is_searching = true;
1250        true
1251    }
1252
1253    /// Runs maintenance work such as checking the pool's state.
1254    fn maintenance(&mut self, worker: &Worker) {
1255        self.stats
1256            .submit(&worker.handle.shared.worker_metrics[worker.index]);
1257
1258        if !self.is_shutdown {
1259            // Check if the scheduler has been shutdown
1260            let synced = worker.handle.shared.synced.lock();
1261            self.is_shutdown = worker.inject().is_closed(&synced.inject);
1262        }
1263
1264        if !self.is_traced {
1265            // Check if the worker should be tracing.
1266            self.is_traced = worker.handle.shared.trace_status.trace_requested();
1267        }
1268    }
1269
1270    /// Signals all tasks to shut down, and waits for them to complete. Must run
1271    /// before we enter the single-threaded phase of shutdown processing.
1272    fn pre_shutdown(&mut self, worker: &Worker) {
1273        // Start from a random inner list
1274        let start = self
1275            .rand
1276            .fastrand_n(worker.handle.shared.owned.get_shard_size() as u32);
1277        // Signal to all tasks to shut down.
1278        worker
1279            .handle
1280            .shared
1281            .owned
1282            .close_and_shutdown_all(start as usize);
1283
1284        self.stats
1285            .submit(&worker.handle.shared.worker_metrics[worker.index]);
1286    }
1287
1288    /// Shuts down the core.
1289    fn shutdown(&mut self, handle: &Handle) {
1290        // Take the core
1291        let mut park = self.park.take().expect("park missing");
1292
1293        // Drain the queue
1294        while self.next_local_task().is_some() {}
1295
1296        park.shutdown(&handle.driver);
1297    }
1298
1299    fn tune_global_queue_interval(&mut self, worker: &Worker) {
1300        let next = self
1301            .stats
1302            .tuned_global_queue_interval(&worker.handle.shared.config);
1303
1304        // Smooth out jitter
1305        if u32::abs_diff(self.global_queue_interval, next) > 2 {
1306            self.global_queue_interval = next;
1307        }
1308    }
1309}
1310
1311impl Worker {
1312    /// Returns a reference to the scheduler's injection queue.
1313    fn inject(&self) -> &inject::Shared<Arc<Handle>> {
1314        &self.handle.shared.inject
1315    }
1316}
1317
1318impl Handle {
1319    pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) {
1320        with_current(|maybe_cx| {
1321            if let Some(cx) = maybe_cx {
1322                // Make sure the task is part of the **current** scheduler.
1323                if self.ptr_eq(&cx.worker.handle) {
1324                    // And the current thread still holds a core
1325                    if let Some(core) = cx.core.borrow_mut().as_mut() {
1326                        self.schedule_local(core, task, is_yield);
1327                        return;
1328                    }
1329                }
1330            }
1331
1332            // Otherwise, use the inject queue.
1333            self.push_remote_task(task);
1334            self.notify_parked_remote();
1335        });
1336    }
1337
1338    // Separated case to reduce LLVM codegen in `Handle::bind_new_task`.
1339    pub(super) fn schedule_option_task_without_yield(&self, task: Option<Notified>) {
1340        if let Some(task) = task {
1341            self.schedule_task(task, false);
1342        }
1343    }
1344
1345    fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
1346        core.stats.inc_local_schedule_count();
1347
1348        // Spawning from the worker thread. If scheduling a "yield" then the
1349        // task must always be pushed to the back of the queue, enabling other
1350        // tasks to be executed. If **not** a yield, then there is more
1351        // flexibility and the task may go to the front of the queue.
1352        if is_yield || !core.lifo_enabled {
1353            core.run_queue
1354                .push_back_or_overflow(task, self, &mut core.stats);
1355        } else {
1356            // Push to the LIFO slot
1357            if let Some(prev) = core.run_queue.push_lifo(task) {
1358                // There was a previous task in the LIFO slot which needs
1359                // to be pushed to the back of the run queue.
1360                core.run_queue
1361                    .push_back_or_overflow(prev, self, &mut core.stats);
1362            }
1363        };
1364
1365        // Only notify if not currently parked. If `park` is `None`, then the
1366        // scheduling is from a resource driver. As notifications often come in
1367        // batches, the notification is delayed until the park is complete.
1368        if core.park.is_some() {
1369            self.notify_parked_local();
1370        }
1371    }
1372
1373    fn next_remote_task(&self) -> Option<Notified> {
1374        if self.shared.inject.is_empty() {
1375            return None;
1376        }
1377
1378        let mut synced = self.shared.synced.lock();
1379        // safety: passing in correct `idle::Synced`
1380        unsafe { self.shared.inject.pop(&mut synced.inject) }
1381    }
1382
1383    fn push_remote_task(&self, task: Notified) {
1384        self.shared.scheduler_metrics.inc_remote_schedule_count();
1385
1386        let mut synced = self.shared.synced.lock();
1387        // safety: passing in correct `idle::Synced`
1388        unsafe {
1389            self.shared.inject.push(&mut synced.inject, task);
1390        }
1391    }
1392
1393    #[cfg(all(tokio_unstable, feature = "time"))]
1394    pub(crate) fn push_remote_timer(&self, hdl: time_alt::EntryHandle) {
1395        assert_eq!(self.timer_flavor, TimerFlavor::Alternative);
1396        {
1397            let mut synced = self.shared.synced.lock();
1398            synced.inject_timers.push(hdl);
1399        }
1400        self.notify_parked_remote();
1401    }
1402
1403    #[cfg(all(tokio_unstable, feature = "time"))]
1404    pub(crate) fn take_remote_timers(&self) -> Vec<time_alt::EntryHandle> {
1405        assert_eq!(self.timer_flavor, TimerFlavor::Alternative);
1406        // It's ok to lost the race, as another worker is
1407        // draining the inject_timers.
1408        match self.shared.synced.try_lock() {
1409            Some(mut synced) => std::mem::take(&mut synced.inject_timers),
1410            None => Vec::new(),
1411        }
1412    }
1413
1414    pub(super) fn close(&self) {
1415        if self
1416            .shared
1417            .inject
1418            .close(&mut self.shared.synced.lock().inject)
1419        {
1420            self.notify_all();
1421        }
1422    }
1423
1424    /// Notify a parked worker.
1425    ///
1426    /// Returns `true` if a worker was notified, `false` otherwise.
1427    fn notify_parked_local(&self) -> bool {
1428        super::counters::inc_num_inc_notify_local();
1429
1430        if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) {
1431            super::counters::inc_num_unparks_local();
1432            self.shared.remotes[index].unpark.unpark(&self.driver);
1433            true
1434        } else {
1435            false
1436        }
1437    }
1438
1439    fn notify_parked_remote(&self) {
1440        if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) {
1441            self.shared.remotes[index].unpark.unpark(&self.driver);
1442        }
1443    }
1444
1445    pub(super) fn notify_all(&self) {
1446        for remote in &self.shared.remotes[..] {
1447            remote.unpark.unpark(&self.driver);
1448        }
1449    }
1450
1451    fn notify_if_work_pending(&self) {
1452        for remote in &self.shared.remotes[..] {
1453            if !remote.steal.is_empty() {
1454                self.notify_parked_local();
1455                return;
1456            }
1457        }
1458
1459        if !self.shared.inject.is_empty() {
1460            self.notify_parked_local();
1461        }
1462    }
1463
1464    /// Returns `true` if another parked worker was notified, `false` otherwise.
1465    fn transition_worker_from_searching(&self) -> bool {
1466        if self.shared.idle.transition_worker_from_searching() {
1467            // We are the final searching worker. Because work was found, we
1468            // need to notify another worker.
1469            self.notify_parked_local()
1470        } else {
1471            false
1472        }
1473    }
1474
1475    /// Signals that a worker has observed the shutdown signal and has replaced
1476    /// its core back into its handle.
1477    ///
1478    /// If all workers have reached this point, the final cleanup is performed.
1479    fn shutdown_core(&self, core: Box<Core>) {
1480        let mut cores = self.shared.shutdown_cores.lock();
1481        cores.push(core);
1482
1483        if cores.len() != self.shared.remotes.len() {
1484            return;
1485        }
1486
1487        debug_assert!(self.shared.owned.is_empty());
1488
1489        for mut core in cores.drain(..) {
1490            core.shutdown(self);
1491        }
1492
1493        // Drain the injection queue
1494        //
1495        // We already shut down every task, so we can simply drop the tasks.
1496        while let Some(task) = self.next_remote_task() {
1497            drop(task);
1498        }
1499    }
1500
1501    fn ptr_eq(&self, other: &Handle) -> bool {
1502        std::ptr::eq(self, other)
1503    }
1504}
1505
1506impl Overflow<Arc<Handle>> for Handle {
1507    fn push(&self, task: task::Notified<Arc<Handle>>) {
1508        self.push_remote_task(task);
1509    }
1510
1511    fn push_batch<I>(&self, iter: I)
1512    where
1513        I: Iterator<Item = task::Notified<Arc<Handle>>>,
1514    {
1515        unsafe {
1516            self.shared.inject.push_batch(self, iter);
1517        }
1518    }
1519}
1520
/// Lock guard over the scheduler's `Synced` state, returned by the
/// `Lock<inject::Synced>` impl for `&Handle`.
///
/// Its `AsMut` impl exposes only the `inject` field of the guarded state.
pub(crate) struct InjectGuard<'a> {
    // Guard for the mutex protecting `Synced`; the lock is held for as long
    // as this struct is alive.
    lock: crate::loom::sync::MutexGuard<'a, Synced>,
}
1524
1525impl<'a> AsMut<inject::Synced> for InjectGuard<'a> {
1526    fn as_mut(&mut self) -> &mut inject::Synced {
1527        &mut self.lock.inject
1528    }
1529}
1530
1531impl<'a> Lock<inject::Synced> for &'a Handle {
1532    type Handle = InjectGuard<'a>;
1533
1534    fn lock(self) -> Self::Handle {
1535        InjectGuard {
1536            lock: self.shared.synced.lock(),
1537        }
1538    }
1539}
1540
#[cfg(all(tokio_unstable, feature = "time"))]
/// Returned by [`Context::maintain_local_timers_before_parking`].
struct MaintainLocalTimer {
    /// Duration to park for; `None` parks without a timeout.
    park_duration: Option<Duration>,
    /// Duration handed to `maintain_local_timers_after_parking` for
    /// auto-advancing time; `None` when no auto-advance was requested.
    auto_advance_duration: Option<Duration>,
}
1547
1548#[track_caller]
1549fn with_current<R>(f: impl FnOnce(Option<&Context>) -> R) -> R {
1550    use scheduler::Context::MultiThread;
1551
1552    context::with_scheduler(|ctx| match ctx {
1553        Some(MultiThread(ctx)) => f(Some(ctx)),
1554        _ => f(None),
1555    })
1556}