tokio/runtime/scheduler/multi_thread/worker.rs

//! A scheduler is initialized with a fixed number of workers. Each worker is
//! driven by a thread. Each worker has a "core" which contains data such as the
//! run queue and other state. When `block_in_place` is called, the worker's
//! "core" is handed off to a new thread allowing the scheduler to continue to
//! make progress while the originating thread blocks.
//!
//! # Shutdown
//!
//! Shutting down the runtime involves the following steps:
//!
//!  1. The `Shared::close` method is called. This closes the inject queue and
//!     `OwnedTasks` instance and wakes up all worker threads.
//!
//!  2. Each worker thread observes the close signal next time it runs
//!     `Core::maintenance` by checking whether the inject queue is closed.
//!     The `Core::is_shutdown` flag is set to true.
//!
//!  3. The worker threads call `pre_shutdown` in parallel. Here, each worker
//!     will keep removing tasks from `OwnedTasks` until it is empty. No new
//!     tasks can be pushed to the `OwnedTasks` during or after this step as it
//!     was closed in step 1.
//!
//!  4. The workers call `Shared::shutdown` to enter the single-threaded phase of
//!     shutdown. These calls will push their core to `Shared::shutdown_cores`,
//!     and the last thread to push its core will finish the shutdown procedure.
//!
//!  5. The local run queue of each core is emptied, then the inject queue is
//!     emptied.
//!
//! At this point, shutdown has completed. It is not possible for any of the
//! collections to contain any tasks at this point, as each collection was
//! closed first, then emptied afterwards.
//!
//! ## Spawns during shutdown
//!
//! When spawning tasks during shutdown, there are two cases:
//!
//!  * The spawner observes the `OwnedTasks` being open, and the inject queue is
//!    closed.
//!  * The spawner observes the `OwnedTasks` being closed and doesn't check the
//!    inject queue.
//!
//! The first case can only happen if the `OwnedTasks::bind` call happens before
//! or during step 1 of shutdown. In this case, the runtime will clean up the
//! task in step 3 of shutdown.
//!
//! In the latter case, the task was not spawned and the task is immediately
//! cancelled by the spawner.
//!
//! The correctness of shutdown requires both the inject queue and `OwnedTasks`
//! collection to have a closed bit. With a close bit on only the inject queue,
//! spawning could run into a situation where a task is successfully bound long
//! after the runtime has shut down. With a close bit on only the `OwnedTasks`,
//! the first spawning situation could result in the notification being pushed
//! to the inject queue after step 5 of shutdown, which would leave a task in
//! the inject queue indefinitely. This would be a ref-count cycle and a memory
//! leak.
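//!
//! As an illustration only, the shutdown sequence above is typically driven
//! from user code by dropping the runtime (or calling
//! `Runtime::shutdown_timeout`); a minimal sketch:
//!
//! ```ignore
//! let rt = tokio::runtime::Builder::new_multi_thread()
//!     .worker_threads(2)
//!     .build()
//!     .unwrap();
//! rt.spawn(async { /* ... */ });
//!
//! // Dropping the runtime ends up calling `Shared::close` (step 1); the
//! // worker threads then carry out steps 2 through 5.
//! drop(rt);
//! ```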

use crate::loom::sync::{Arc, Mutex};
use crate::runtime;
use crate::runtime::scheduler::multi_thread::{
    idle, park, queue, Counters, Handle, Idle, Overflow, Parker, Stats, TraceStatus, Unparker,
};
use crate::runtime::scheduler::{inject, Defer, Lock};
use crate::runtime::task::OwnedTasks;
use crate::runtime::{
    blocking, driver, scheduler, task, Config, SchedulerMetrics, TimerFlavor, WorkerMetrics,
};
use crate::runtime::{context, TaskHooks};
use crate::task::coop;
use crate::util::atomic_cell::AtomicCell;
use crate::util::rand::{FastRand, RngSeedGenerator};

use std::cell::RefCell;
use std::task::Waker;
use std::thread;
use std::time::Duration;

mod metrics;

cfg_taskdump! {
    mod taskdump;
}

cfg_not_taskdump! {
    mod taskdump_mock;
}

#[cfg(all(tokio_unstable, feature = "time"))]
use crate::loom::sync::atomic::AtomicBool;

#[cfg(all(tokio_unstable, feature = "time"))]
use crate::runtime::time_alt;

#[cfg(all(tokio_unstable, feature = "time"))]
use crate::runtime::scheduler::util;

/// A scheduler worker
pub(super) struct Worker {
    /// Reference to scheduler's handle
    handle: Arc<Handle>,

    /// Index holding this worker's remote state
    index: usize,

    /// Used to hand off a worker's core to another thread.
    core: AtomicCell<Core>,
}

/// Core data
struct Core {
    /// Used to schedule bookkeeping tasks every so often.
    tick: u32,

    /// When a task is scheduled from a worker, it is stored in this slot. The
    /// worker will check this slot for a task **before** checking the run
    /// queue. This effectively results in the **last** scheduled task being run
    /// next (LIFO). This is an optimization for improving locality which
    /// benefits message passing patterns and helps to reduce latency.
    lifo_slot: Option<Notified>,

    /// When `true`, locally scheduled tasks go to the LIFO slot. When `false`,
    /// they go to the back of the `run_queue`.
    lifo_enabled: bool,

    /// The worker-local run queue.
    run_queue: queue::Local<Arc<Handle>>,

    #[cfg(all(tokio_unstable, feature = "time"))]
    time_context: time_alt::LocalContext,

    /// True if the worker is currently searching for more work. Searching
    /// involves attempting to steal from other workers.
    is_searching: bool,

    /// True if the scheduler is being shut down
    is_shutdown: bool,

    /// True if the scheduler is being traced
    is_traced: bool,

    /// Whether or not the worker has just returned from a park in which it
    /// parked on the I/O driver.
    had_driver: park::HadDriver,

    /// If `true`, the worker should eagerly notify another worker when polling
    /// the first task after returning from a park in which it parked on the I/O
    /// or time driver.
    enable_eager_driver_handoff: bool,

    /// Parker
    ///
    /// Stored in an `Option` as the parker is added / removed to make the
    /// borrow checker happy.
    park: Option<Parker>,

    /// Per-worker runtime stats
    stats: Stats,

    /// How often to check the global queue
    global_queue_interval: u32,

    /// Fast random number generator.
    rand: FastRand,
}

/// State shared across all workers
pub(crate) struct Shared {
    /// Per-worker remote state. All other workers have access to this and it
    /// is how they communicate with each other.
    remotes: Box<[Remote]>,

    /// Global task queue used to:
    ///  1. Submit work to the scheduler while **not** currently on a worker thread.
    ///  2. Submit work to the scheduler when a worker run queue is saturated.
    pub(super) inject: inject::Shared<Arc<Handle>>,

    /// Coordinates idle workers
    idle: Idle,

    /// Collection of all active tasks spawned onto this executor.
    pub(crate) owned: OwnedTasks<Arc<Handle>>,

    /// Data synchronized by the scheduler mutex
    pub(super) synced: Mutex<Synced>,

    /// Cores that have observed the shutdown signal
    ///
    /// The core is **not** placed back in the worker to prevent it from being
    /// stolen by a thread that was spawned as part of `block_in_place`.
    #[allow(clippy::vec_box)] // we're moving an already-boxed value
    shutdown_cores: Mutex<Vec<Box<Core>>>,

    /// The number of cores that have observed the trace signal.
    pub(super) trace_status: TraceStatus,

    /// Scheduler configuration options
    config: Config,

    /// Collects metrics from the runtime.
    pub(super) scheduler_metrics: SchedulerMetrics,

    pub(super) worker_metrics: Box<[WorkerMetrics]>,

    /// Only held to trigger some code on drop. This is used to get internal
    /// runtime metrics that can be useful when doing performance
    /// investigations. This does nothing (empty struct, no drop impl) unless
    /// the `tokio_internal_mt_counters` `cfg` flag is set.
    _counters: Counters,
}

/// Data synchronized by the scheduler mutex
pub(crate) struct Synced {
    /// Synchronized state for `Idle`.
    pub(super) idle: idle::Synced,

    /// Synchronized state for `Inject`.
    pub(crate) inject: inject::Synced,

    #[cfg(all(tokio_unstable, feature = "time"))]
    /// Timers pending registration.
    /// This is used to register a timer when the [`Core`]
    /// is not available on the current thread.
    inject_timers: Vec<time_alt::EntryHandle>,
}

/// Used to communicate with a worker from other threads.
struct Remote {
    /// Steals tasks from this worker.
    pub(super) steal: queue::Steal<Arc<Handle>>,

    /// Unparks the associated worker thread
    unpark: Unparker,
}

/// Thread-local context
pub(crate) struct Context {
    /// Worker
    worker: Arc<Worker>,

    /// Core data
    core: RefCell<Option<Box<Core>>>,

    /// Tasks to wake after resource drivers are polled. This is mostly to
    /// handle yielded tasks.
    pub(crate) defer: Defer,
}

/// Starts the workers
pub(crate) struct Launch(Vec<Arc<Worker>>);

/// Running a task may consume the core. If the core is still available when
/// running the task completes, it is returned. Otherwise, the worker will need
/// to stop processing.
type RunResult = Result<Box<Core>, ()>;

/// A notified task handle
type Notified = task::Notified<Arc<Handle>>;

/// Value picked out of thin air. Running the LIFO slot a handful of times
/// seems sufficient to benefit from locality. More than 3 times is probably
/// over-weighting. The value can be tuned in the future with data that shows
/// improvements.
const MAX_LIFO_POLLS_PER_TICK: usize = 3;

#[allow(clippy::too_many_arguments)]
pub(super) fn create(
    size: usize,
    park: Parker,
    driver_handle: driver::Handle,
    blocking_spawner: blocking::Spawner,
    seed_generator: RngSeedGenerator,
    config: Config,
    timer_flavor: TimerFlavor,
    name: Option<String>,
) -> (Arc<Handle>, Launch) {
    let mut cores = Vec::with_capacity(size);
    let mut remotes = Vec::with_capacity(size);
    let mut worker_metrics = Vec::with_capacity(size);

    // Create the local queues
    for _ in 0..size {
        let (steal, run_queue) = queue::local();

        let park = park.clone();
        let unpark = park.unpark();
        let metrics = WorkerMetrics::from_config(&config);
        let stats = Stats::new(&metrics);

        cores.push(Box::new(Core {
            tick: 0,
            lifo_slot: None,
            lifo_enabled: !config.disable_lifo_slot,
            run_queue,
            #[cfg(all(tokio_unstable, feature = "time"))]
            time_context: time_alt::LocalContext::new(),
            is_searching: false,
            is_shutdown: false,
            is_traced: false,
            enable_eager_driver_handoff: config.enable_eager_driver_handoff,
            had_driver: park::HadDriver::No,
            park: Some(park),
            global_queue_interval: stats.tuned_global_queue_interval(&config),
            stats,
            rand: FastRand::from_seed(config.seed_generator.next_seed()),
        }));

        remotes.push(Remote { steal, unpark });
        worker_metrics.push(metrics);
    }

    let (idle, idle_synced) = Idle::new(size);
    let (inject, inject_synced) = inject::Shared::new();

    let remotes_len = remotes.len();
    let handle = Arc::new(Handle {
        name,
        task_hooks: TaskHooks::from_config(&config),
        shared: Shared {
            remotes: remotes.into_boxed_slice(),
            inject,
            idle,
            owned: OwnedTasks::new(size),
            synced: Mutex::new(Synced {
                idle: idle_synced,
                inject: inject_synced,
                #[cfg(all(tokio_unstable, feature = "time"))]
                inject_timers: Vec::new(),
            }),
            shutdown_cores: Mutex::new(vec![]),
            trace_status: TraceStatus::new(remotes_len),
            config,
            scheduler_metrics: SchedulerMetrics::new(),
            worker_metrics: worker_metrics.into_boxed_slice(),
            _counters: Counters,
        },
        driver: driver_handle,
        blocking_spawner,
        seed_generator,
        timer_flavor,
        #[cfg(all(tokio_unstable, feature = "time"))]
        is_shutdown: AtomicBool::new(false),
    });

    let mut launch = Launch(vec![]);

    for (index, core) in cores.drain(..).enumerate() {
        launch.0.push(Arc::new(Worker {
            handle: handle.clone(),
            index,
            core: AtomicCell::new(Some(core)),
        }));
    }

    (handle, launch)
}

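/// Runs `f` on the current thread, handing this worker's core off to another
/// thread so the rest of the scheduler keeps making progress while `f` blocks.
///
/// A minimal usage sketch through the public API that reaches this function
/// (assuming a multi-threaded runtime; illustrative only):
///
/// ```ignore
/// #[tokio::main(flavor = "multi_thread")]
/// async fn main() {
///     tokio::task::block_in_place(|| {
///         // Blocking work here. The worker's core has been handed off, so
///         // other tasks keep running on the remaining worker threads.
///         std::thread::sleep(std::time::Duration::from_millis(10));
///     });
/// }
/// ```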
#[track_caller]
pub(crate) fn block_in_place<F, R>(f: F) -> R
where
    F: FnOnce() -> R,
{
    // Try to steal the worker core back
    struct Reset {
        take_core: bool,
        budget: coop::Budget,
    }

    impl Drop for Reset {
        fn drop(&mut self) {
            with_current(|maybe_cx| {
                if let Some(cx) = maybe_cx {
                    if self.take_core {
                        let core = cx.worker.core.take();

                        if core.is_some() {
                            cx.worker.handle.shared.worker_metrics[cx.worker.index]
                                .set_thread_id(thread::current().id());
                        }

                        let mut cx_core = cx.core.borrow_mut();
                        assert!(cx_core.is_none());
                        *cx_core = core;
                    }

                    // Reset the task budget as we are re-entering the
                    // runtime.
                    coop::set(self.budget);
                }
            });
        }
    }

    let mut had_entered = false;
    let mut take_core = false;

    let setup_result = with_current(|maybe_cx| {
        match (
            crate::runtime::context::current_enter_context(),
            maybe_cx.is_some(),
        ) {
            (context::EnterRuntime::Entered { .. }, true) => {
                // We are on a thread pool runtime thread, so we just need to
                // set up blocking.
                had_entered = true;
            }
            (
                context::EnterRuntime::Entered {
                    allow_block_in_place,
                },
                false,
            ) => {
                // We are on an executor, but _not_ on the thread pool. That is
                // _only_ okay if we are in a thread pool runtime's block_on
                // method:
                if allow_block_in_place {
                    had_entered = true;
                    return Ok(());
                } else {
                    // This probably means we are on the current_thread runtime or in a
                    // LocalSet, where it is _not_ okay to block.
                    return Err(
                        "can call blocking only when running on the multi-threaded runtime",
                    );
                }
            }
            (context::EnterRuntime::NotEntered, true) => {
                // This is a nested call to block_in_place (we already exited).
                // All the necessary setup has already been done.
                return Ok(());
            }
            (context::EnterRuntime::NotEntered, false) => {
                // We are outside of the tokio runtime, so blocking is fine.
                // We can also skip all of the thread pool blocking setup steps.
                return Ok(());
            }
        }

        let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");

        // Since deferred tasks don't stay on `core`, make sure to wake them
        // before blocking.
        cx.defer.wake();

        // Get the worker core. If none is set, then blocking is fine!
        let mut core = match cx.core.borrow_mut().take() {
            Some(core) => core,
            None => return Ok(()),
        };

        // If we heavily call `spawn_blocking`, there might be no available thread to
        // run this core. Except for the task in the lifo_slot, all tasks can be
        // stolen, so we move the task out of the lifo_slot to the run_queue.
        if let Some(task) = core.lifo_slot.take() {
            core.run_queue
                .push_back_or_overflow(task, &*cx.worker.handle, &mut core.stats);
        }

        // We are taking the core from the context and sending it to another
        // thread.
        take_core = true;

        // The parker should be set here
        assert!(core.park.is_some());

        // In order to block, the core must be sent to another thread for
        // execution.
        //
        // First, move the core back into the worker's shared core slot.
        cx.worker.core.set(core);

        // Next, clone the worker handle and send it to a new thread for
        // processing.
        //
        // Once the blocking task is done executing, we will attempt to
        // steal the core back.
        let worker = cx.worker.clone();
        runtime::spawn_blocking(move || run(worker));
        Ok(())
    });

    if let Err(panic_message) = setup_result {
        panic!("{}", panic_message);
    }

    if had_entered {
        // Unset the current task's budget. Blocking sections are not
        // constrained by task budgets.
        let _reset = Reset {
            take_core,
            budget: coop::stop(),
        };

        crate::runtime::context::exit_runtime(f)
    } else {
        f()
    }
}

impl Launch {
    pub(crate) fn launch(mut self) {
        for worker in self.0.drain(..) {
            runtime::spawn_blocking(move || run(worker));
        }
    }
}

fn run(worker: Arc<Worker>) {
    #[allow(dead_code)]
    struct AbortOnPanic;

    impl Drop for AbortOnPanic {
        fn drop(&mut self) {
            if std::thread::panicking() {
                eprintln!("worker thread panicking; aborting process");
                std::process::abort();
            }
        }
    }

    // Catching panics on worker threads in tests is quite tricky. Instead, when
    // debug assertions are enabled, we just abort the process.
    #[cfg(debug_assertions)]
    let _abort_on_panic = AbortOnPanic;

    // Acquire a core. If this fails, then another thread is running this
    // worker and there is nothing further to do.
    let core = match worker.core.take() {
        Some(core) => core,
        None => return,
    };

    worker.handle.shared.worker_metrics[worker.index].set_thread_id(thread::current().id());

    let handle = scheduler::Handle::MultiThread(worker.handle.clone());

    crate::runtime::context::enter_runtime(&handle, true, |_| {
        // Set the worker context.
        let cx = scheduler::Context::MultiThread(Context {
            worker,
            core: RefCell::new(None),
            defer: Defer::new(),
        });

        context::set_scheduler(&cx, || {
            let cx = cx.expect_multi_thread();

            // This should always be an error. It only returns a `Result` to support
            // using `?` to short circuit.
            assert!(cx.run(core).is_err());

            // Check if there are any deferred tasks to notify. This can happen when
            // the worker core is lost due to `block_in_place()` being called from
            // within the task.
            cx.defer.wake();
        });
    });
}

impl Context {
    fn run(&self, mut core: Box<Core>) -> RunResult {
        // Reset `lifo_enabled` here in case the core was previously stolen from
        // a task that had the LIFO slot disabled.
        self.reset_lifo_enabled(&mut core);

        // Start in the "processing scheduled tasks" state, as polling tasks
        // from the local queue will be one of the first things we do.
        core.stats.start_processing_scheduled_tasks();

        while !core.is_shutdown {
            self.assert_lifo_enabled_is_correct(&core);

            if core.is_traced {
                core = self.worker.handle.trace_core(core);
            }

            // Increment the tick
            core.tick();

            // Run maintenance, if needed
            core = self.maintenance(core);

            // First, check work available to the current worker.
            if let Some(task) = core.next_task(&self.worker) {
                core = self.run_task(task, core)?;
                continue;
            }

            // We consumed all work in the queues and will start searching for work.
            core.stats.end_processing_scheduled_tasks();

            // There is no more **local** work to process, try to steal work
            // from other workers.
            if let Some(task) = core.steal_work(&self.worker) {
                // Found work, switch back to processing
                core.stats.start_processing_scheduled_tasks();
                core = self.run_task(task, core)?;
            } else {
                // Wait for work
                core = if !self.defer.is_empty() {
                    self.park_yield(core)
                } else {
                    self.park(core)
                };
                core.stats.start_processing_scheduled_tasks();
            }
        }

        #[cfg(all(tokio_unstable, feature = "time"))]
        {
            match self.worker.handle.timer_flavor {
                TimerFlavor::Traditional => {}
                TimerFlavor::Alternative => {
                    util::time_alt::shutdown_local_timers(
                        &mut core.time_context.wheel,
                        &mut core.time_context.canc_rx,
                        self.worker.handle.take_remote_timers(),
                        &self.worker.handle.driver,
                    );
                }
            }
        }

        core.pre_shutdown(&self.worker);
        // Signal shutdown
        self.worker.handle.shutdown_core(core);
        Err(())
    }

    fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
        #[cfg(tokio_unstable)]
        let task_meta = task.task_meta();

        let task = self.worker.handle.shared.owned.assert_owner(task);

        // Make sure the worker is not in the **searching** state. This enables
        // another idle worker to try to steal work.
        let notified_parked_worker = core.transition_from_searching(&self.worker);

        // If the setting to wake eagerly when releasing the I/O driver is
        // enabled, and this worker had the driver, wake a parked worker to come
        // grab it from us.
        //
        // Note that this is only done when we are *actually* about to poll a
        // task, rather than whenever the worker has unparked. When the worker
        // has been unparked, it may not actually have any tasks to poll, and if
        // it's still holding the I/O driver, it should just go back to polling
        // the driver again, rather than trying to wake someone else spuriously.
        //
        // Note that this explicitly checks `cfg!(tokio_unstable)` in addition,
        // as that should result in this whole expression being eliminated at
        // compile-time when unstable features are disabled.
        if cfg!(tokio_unstable)
            && core.enable_eager_driver_handoff
            && core.had_driver == park::HadDriver::Yes
            && !notified_parked_worker
        // don't do it a second time
        {
            core.had_driver = park::HadDriver::No;
            self.worker.handle.notify_parked_local();
        }

        self.assert_lifo_enabled_is_correct(&core);

        // Measure the poll start time. Note that we may end up polling other
        // tasks under this measurement. In this case, the tasks came from the
        // LIFO slot and are considered part of the current task for scheduling
        // purposes. These tasks inherit the "parent"'s limits.
        core.stats.start_poll();

        // Make the core available to the runtime context
        *self.core.borrow_mut() = Some(core);

        // Run the task
        coop::budget(|| {
            // Unlike the poll time above, the poll start callback is attached to the task id,
            // so it is tightly associated with the actual poll invocation.
            #[cfg(tokio_unstable)]
            self.worker
                .handle
                .task_hooks
                .poll_start_callback(&task_meta);

            task.run();

            #[cfg(tokio_unstable)]
            self.worker.handle.task_hooks.poll_stop_callback(&task_meta);

            let mut lifo_polls = 0;

            // As long as there is budget remaining and a task exists in the
            // `lifo_slot`, then keep running.
            loop {
                // Check if we still have the core. If not, the core was stolen
                // by another worker.
                let mut core = match self.core.borrow_mut().take() {
                    Some(core) => core,
                    None => {
                        // In this case, we cannot call `reset_lifo_enabled()`
                        // because the core was stolen. The stealer will handle
                        // that at the top of `Context::run`.
                        return Err(());
                    }
                };

                // Check for a task in the LIFO slot
                let task = match core.lifo_slot.take() {
                    Some(task) => task,
                    None => {
                        self.reset_lifo_enabled(&mut core);
                        core.stats.end_poll();
                        return Ok(core);
                    }
                };

                if !coop::has_budget_remaining() {
                    core.stats.end_poll();

                    // Not enough budget left to run the LIFO task, push it to
                    // the back of the queue and return.
                    core.run_queue.push_back_or_overflow(
                        task,
                        &*self.worker.handle,
                        &mut core.stats,
                    );
                    // If we hit this point, the LIFO slot should be enabled.
                    // There is no need to reset it.
                    debug_assert!(core.lifo_enabled);
                    return Ok(core);
                }

                // Track that we are about to run a task from the LIFO slot.
                lifo_polls += 1;
                super::counters::inc_lifo_schedules();

                // Disable the LIFO slot if we reach our limit
                //
                // In ping-pong style workloads where task A notifies task B,
                // which notifies task A again, continuously prioritizing the
                // LIFO slot can cause starvation as these two tasks will
                // repeatedly schedule the other. To mitigate this, we limit the
                // number of times the LIFO slot is prioritized.
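                //
                // For example (illustrative), task A sends a message to task B
                // over a channel and B immediately replies: each send puts the
                // peer in the LIFO slot, so without this cap the pair could
                // monopolize the worker.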
                if lifo_polls >= MAX_LIFO_POLLS_PER_TICK {
                    core.lifo_enabled = false;
                    super::counters::inc_lifo_capped();
                }

                // Run the LIFO task, then loop
                *self.core.borrow_mut() = Some(core);
                let task = self.worker.handle.shared.owned.assert_owner(task);

                #[cfg(tokio_unstable)]
                let task_meta = task.task_meta();

                #[cfg(tokio_unstable)]
                self.worker
                    .handle
                    .task_hooks
                    .poll_start_callback(&task_meta);

                task.run();

                #[cfg(tokio_unstable)]
                self.worker.handle.task_hooks.poll_stop_callback(&task_meta);
            }
        })
    }

    fn reset_lifo_enabled(&self, core: &mut Core) {
        core.lifo_enabled = !self.worker.handle.shared.config.disable_lifo_slot;
    }

    fn assert_lifo_enabled_is_correct(&self, core: &Core) {
        debug_assert_eq!(
            core.lifo_enabled,
            !self.worker.handle.shared.config.disable_lifo_slot
        );
    }

    fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
        if core.tick % self.worker.handle.shared.config.event_interval == 0 {
            super::counters::inc_num_maintenance();

            core.stats.end_processing_scheduled_tasks();

            // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
            // to run without actually putting the thread to sleep.
            core = self.park_yield(core);

            // Run regularly scheduled maintenance
            core.maintenance(&self.worker);

            core.stats.start_processing_scheduled_tasks();
        }

        core
    }

    /// Parks the worker thread while waiting for tasks to execute.
    ///
    /// This function checks that there is indeed no more work left to be done
    /// before parking. Also important to note is that, before parking, the
    /// worker thread will try to take ownership of the driver (I/O/time) and
    /// dispatch any events that might have fired. Whenever a worker thread
    /// executes the driver loop, all woken tasks are scheduled in its own local
    /// queue until the queue saturates (ntasks > `LOCAL_QUEUE_CAPACITY`). When
    /// the local queue is saturated, the overflow tasks are added to the
    /// injection queue, from where other workers can pick them up. Also, we
    /// rely on the work-stealing algorithm to spread the tasks amongst workers
    /// after all the I/O events get dispatched.
    fn park(&self, mut core: Box<Core>) -> Box<Core> {
        if let Some(f) = &self.worker.handle.shared.config.before_park {
            f();
        }

        if core.transition_to_parked(&self.worker) {
            while !core.is_shutdown && !core.is_traced {
                core.stats.about_to_park();
                core.stats
                    .submit(&self.worker.handle.shared.worker_metrics[self.worker.index]);

                core = self.park_internal(core, None);

                core.stats.unparked();

                // Run regularly scheduled maintenance
                core.maintenance(&self.worker);

                if core.transition_from_parked(&self.worker) {
                    break;
                }
            }
        }

        if let Some(f) = &self.worker.handle.shared.config.after_unpark {
            f();
        }
        core
    }

    fn park_yield(&self, core: Box<Core>) -> Box<Core> {
        self.park_internal(core, Some(Duration::from_millis(0)))
    }

    fn park_internal(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> {
        self.assert_lifo_enabled_is_correct(&core);

        // Take the parker out of core
        let mut park = core.park.take().expect("park missing");
        // Store `core` in context
        *self.core.borrow_mut() = Some(core);

        #[cfg(feature = "time")]
        let (duration, auto_advance_duration) = match self.worker.handle.timer_flavor {
            TimerFlavor::Traditional => (duration, None::<Duration>),
            #[cfg(tokio_unstable)]
            TimerFlavor::Alternative => {
                // Must happen after taking out the parker, as `Handle::schedule_local`
                // will delay the notify if the parker has been taken out.
                //
                // See comments in `Handle::schedule_local` for more details.
                let MaintainLocalTimer {
                    park_duration: duration,
                    auto_advance_duration,
                } = self.maintain_local_timers_before_parking(duration);
                (duration, auto_advance_duration)
            }
        };

        // Park thread
        let had_driver = if let Some(timeout) = duration {
            park.park_timeout(&self.worker.handle.driver, timeout)
        } else {
            park.park(&self.worker.handle.driver)
        };

        self.defer.wake();

        #[cfg(feature = "time")]
        match self.worker.handle.timer_flavor {
            TimerFlavor::Traditional => {
                // suppress unused variable warning
                let _ = auto_advance_duration;
            }
            #[cfg(tokio_unstable)]
            TimerFlavor::Alternative => {
                // Must happen before placing the parker back, as `Handle::schedule_local`
                // will delay the notify if the parker is still in `core`.
                //
                // See comments in `Handle::schedule_local` for more details.
                self.maintain_local_timers_after_parking(auto_advance_duration);
            }
        }

        // Remove `core` from context
        core = self.core.borrow_mut().take().expect("core missing");

        // Place `park` back in `core`
        core.park = Some(park);
        core.had_driver = had_driver;

        if core.should_notify_others() {
            self.worker.handle.notify_parked_local();
        }
        core
    }

    pub(crate) fn defer(&self, waker: &Waker) {
        if self.core.borrow().is_none() {
            // If there is no core, then the worker is currently in a block_in_place. In this case,
            // we cannot use the defer queue as we aren't really in the current runtime.
            waker.wake_by_ref();
        } else {
            self.defer.defer(waker);
        }
    }

    #[cfg(all(tokio_unstable, feature = "time"))]
    /// Maintain local timers before parking on the resource driver.
    ///
    /// * Remove cancelled timers from the local timer wheel.
    /// * Register remote timers to the local timer wheel.
    /// * Adjust the park duration based on
    ///   * the next timer expiration time.
    ///   * whether auto-advancing is required (feature = "test-util").
    ///
    /// # Returns
    ///
    /// A [`MaintainLocalTimer`] holding the adjusted park duration and the
    /// auto-advance duration, if any.
    fn maintain_local_timers_before_parking(
        &self,
        park_duration: Option<Duration>,
    ) -> MaintainLocalTimer {
        let handle = &self.worker.handle;
        let mut wake_queue = time_alt::WakeQueue::new();

        let (should_yield, next_timer) = with_current(|maybe_cx| {
            let cx = maybe_cx.expect("function should be called when core is present");
            assert_eq!(
                Arc::as_ptr(&cx.worker.handle),
                Arc::as_ptr(&self.worker.handle),
                "function should be called on the exact same worker"
            );

            let mut maybe_core = cx.core.borrow_mut();
            let core = maybe_core.as_mut().expect("core missing");
            let time_cx = &mut core.time_context;

            util::time_alt::process_registration_queue(
                &mut time_cx.registration_queue,
                &mut time_cx.wheel,
                &time_cx.canc_tx,
                &mut wake_queue,
            );
            util::time_alt::insert_inject_timers(
                &mut time_cx.wheel,
                &time_cx.canc_tx,
                handle.take_remote_timers(),
                &mut wake_queue,
            );
            util::time_alt::remove_cancelled_timers(&mut time_cx.wheel, &mut time_cx.canc_rx);
            let should_yield = !wake_queue.is_empty();

            let next_timer = util::time_alt::next_expiration_time(&time_cx.wheel, &handle.driver);

            (should_yield, next_timer)
        });

        wake_queue.wake_all();

        if should_yield {
            MaintainLocalTimer {
                park_duration: Some(Duration::from_millis(0)),
                auto_advance_duration: None,
            }
        } else {
            // Get the minimum duration
            let dur = util::time_alt::min_duration(park_duration, next_timer);
            if util::time_alt::pre_auto_advance(&handle.driver, dur) {
                MaintainLocalTimer {
                    park_duration: Some(Duration::ZERO),
                    auto_advance_duration: dur,
                }
            } else {
                MaintainLocalTimer {
                    park_duration: dur,
                    auto_advance_duration: None,
                }
            }
        }
    }

    #[cfg(all(tokio_unstable, feature = "time"))]
    /// Maintain local timers after unparking from the resource driver.
    ///
    /// * Auto-advance time, if required (feature = "test-util").
    /// * Process expired timers.
    fn maintain_local_timers_after_parking(&self, auto_advance_duration: Option<Duration>) {
        let handle = &self.worker.handle;
        let mut wake_queue = time_alt::WakeQueue::new();

        with_current(|maybe_cx| {
            let cx = maybe_cx.expect("function should be called when core is present");
            assert_eq!(
                Arc::as_ptr(&cx.worker.handle),
                Arc::as_ptr(&self.worker.handle),
                "function should be called on the exact same worker"
            );

            let mut maybe_core = cx.core.borrow_mut();
            let core = maybe_core.as_mut().expect("core missing");
            let time_cx = &mut core.time_context;

            util::time_alt::post_auto_advance(&handle.driver, auto_advance_duration);
            util::time_alt::process_expired_timers(
                &mut time_cx.wheel,
                &handle.driver,
                &mut wake_queue,
            );
        });

        wake_queue.wake_all();
    }

    #[cfg(all(tokio_unstable, feature = "time"))]
    fn with_core<F, R>(&self, f: F) -> R
    where
        F: FnOnce(Option<&mut Core>) -> R,
    {
        match self.core.borrow_mut().as_mut() {
            Some(core) => f(Some(core)),
            None => f(None),
        }
    }

    #[cfg(all(tokio_unstable, feature = "time"))]
    pub(crate) fn with_time_temp_local_context<F, R>(&self, f: F) -> R
    where
        F: FnOnce(Option<time_alt::TempLocalContext<'_>>) -> R,
    {
        self.with_core(|maybe_core| match maybe_core {
            Some(core) if core.is_shutdown => f(Some(time_alt::TempLocalContext::new_shutdown())),
            Some(core) => f(Some(time_alt::TempLocalContext::new_running(
                &mut core.time_context,
            ))),
            None => f(None),
        })
    }

    #[cfg(tokio_unstable)]
    pub(crate) fn worker_index(&self) -> usize {
        self.worker.index
    }
}

impl Core {
    /// Increment the tick
    fn tick(&mut self) {
        self.tick = self.tick.wrapping_add(1);
    }

    /// Return the next notified task available to this worker.
    fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
        if self.tick % self.global_queue_interval == 0 {
            // Update the global queue interval, if needed
            self.tune_global_queue_interval(worker);

            worker
                .handle
                .next_remote_task()
                .or_else(|| self.next_local_task())
        } else {
            let maybe_task = self.next_local_task();

            if maybe_task.is_some() {
                return maybe_task;
            }

            if worker.inject().is_empty() {
                return None;
            }

            let cap = usize::min(
                // Other threads can only **remove** tasks from the current
                // worker's `run_queue`. So, we can be confident that by the
                // time we call `run_queue.push_back` below, there will be *at
                // least* `cap` available slots in the queue.
                //
                // Note that even though `next_local_task()` just returned
                // `None`, this may be different from `max_capacity()` if
                // another worker is currently stealing tasks from us.
                self.run_queue.remaining_slots(),
                // We want to make sure that all of the tasks we take end up in
                // the first half of the local queue. This ensures that the
                // tasks do not get pushed to the inject queue again if overflow
                // occurs, as overflow only affects tasks in the second half of
                // the local queue.
                //
                // Note that even if there are concurrent stealers, we do not
                // need to consider the value of `remaining_slots()` because a
                // future call to `push_overflow()` can only succeed once that
                // concurrent stealer has finished stealing, so at that point
                // the tasks we are adding now will be in the first half.
                self.run_queue.max_capacity() / 2,
            );

            // The worker is currently idle, pull a batch of work from the
            // injection queue. We don't want to pull *all* the work so other
            // workers can also get some.
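            //
            // As a worked example (illustrative numbers): with 4 workers and 9
            // tasks in the inject queue, `n` below starts from
            // min(9 / 4 + 1, cap) = min(3, cap).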
            let n = usize::min(
                worker.inject().len() / worker.handle.shared.remotes.len() + 1,
                cap,
            );

            // Take at least one task since the first task is returned directly
            // and not pushed onto the local queue.
            let n = usize::max(1, n);

            let mut synced = worker.handle.shared.synced.lock();
            // safety: passing in the correct `inject::Synced`.
            let mut tasks = unsafe { worker.inject().pop_n(&mut synced.inject, n) };

            // Pop the first task to return immediately
            let ret = tasks.next();

            // Push the rest onto the run queue
            self.run_queue.push_back(tasks);

            ret
        }
    }

    fn next_local_task(&mut self) -> Option<Notified> {
        self.lifo_slot.take().or_else(|| self.run_queue.pop())
    }

    /// Function responsible for stealing tasks from another worker
    ///
    /// Note: a worker will only actually try to steal if fewer than half of
    /// the workers are already searching for tasks. The idea is to make sure
    /// not all workers will be trying to steal at the same time.
    fn steal_work(&mut self, worker: &Worker) -> Option<Notified> {
        if !self.transition_to_searching(worker) {
            return None;
        }

        let num = worker.handle.shared.remotes.len();
        // Start from a random worker
        let start = self.rand.fastrand_n(num as u32) as usize;

        for i in 0..num {
            let i = (start + i) % num;

            // Don't steal from ourselves! We know we don't have work.
            if i == worker.index {
                continue;
            }

            let target = &worker.handle.shared.remotes[i];
            if let Some(task) = target
                .steal
                .steal_into(&mut self.run_queue, &mut self.stats)
            {
                return Some(task);
            }
        }

        // Fall back to checking the global queue
        worker.handle.next_remote_task()
    }

    fn transition_to_searching(&mut self, worker: &Worker) -> bool {
        if !self.is_searching {
            self.is_searching = worker.handle.shared.idle.transition_worker_to_searching();
        }

        self.is_searching
    }

    fn transition_from_searching(&mut self, worker: &Worker) -> bool {
        if !self.is_searching {
            return false;
        }

        self.is_searching = false;
        worker.handle.transition_worker_from_searching()
    }

    fn has_tasks(&self) -> bool {
        self.lifo_slot.is_some() || self.run_queue.has_tasks()
    }

    fn should_notify_others(&self) -> bool {
        // If there are tasks available to steal, but this worker is not
        // looking for tasks to steal, notify another worker.
        if self.is_searching {
            return false;
        }
        self.lifo_slot.is_some() as usize + self.run_queue.len() > 1
    }

    /// Prepares the worker state for parking.
    ///
    /// Returns true if the transition happened, false if there is work to do first.
    fn transition_to_parked(&mut self, worker: &Worker) -> bool {
        // Workers should not park if they have work to do
        if self.has_tasks() || self.is_traced {
            return false;
        }

        // When the final worker transitions **out** of searching to parked, it
        // must check all the queues one last time in case work materialized
        // between the last work scan and transitioning out of searching.
        let is_last_searcher = worker.handle.shared.idle.transition_worker_to_parked(
            &worker.handle.shared,
            worker.index,
            self.is_searching,
        );

        // The worker is no longer searching. This only updates the local
        // cache.
        self.is_searching = false;

        if is_last_searcher {
            worker.handle.notify_if_work_pending();
        }

        true
    }

    /// Returns `true` if the transition happened.
    fn transition_from_parked(&mut self, worker: &Worker) -> bool {
        // If a task is in the lifo slot/run queue, then we must unpark regardless of
        // being notified
        if self.has_tasks() {
            // When a worker wakes, it should only transition to the "searching"
            // state when the wake originates from another worker *or* a new task
            // is pushed. We do *not* want the worker to transition to "searching"
            // when it wakes when the I/O driver receives new events.
            self.is_searching = !worker
                .handle
                .shared
                .idle
                .unpark_worker_by_id(&worker.handle.shared, worker.index);
            return true;
        }

        if worker
            .handle
            .shared
            .idle
            .is_parked(&worker.handle.shared, worker.index)
        {
            return false;
        }

        // When unparked, the worker is in the searching state.
        self.is_searching = true;
        true
    }

    /// Runs maintenance work such as checking the pool's state.
    fn maintenance(&mut self, worker: &Worker) {
        self.stats
            .submit(&worker.handle.shared.worker_metrics[worker.index]);

        if !self.is_shutdown {
            // Check if the scheduler has been shut down
            let synced = worker.handle.shared.synced.lock();
            self.is_shutdown = worker.inject().is_closed(&synced.inject);
        }

        if !self.is_traced {
            // Check if the worker should be tracing.
            self.is_traced = worker.handle.shared.trace_status.trace_requested();
        }
    }

    /// Signals all tasks to shut down, and waits for them to complete. Must run
    /// before we enter the single-threaded phase of shutdown processing.
    fn pre_shutdown(&mut self, worker: &Worker) {
        // Start from a random inner list
        let start = self
            .rand
            .fastrand_n(worker.handle.shared.owned.get_shard_size() as u32);
        // Signal to all tasks to shut down.
        worker
            .handle
            .shared
            .owned
            .close_and_shutdown_all(start as usize);

        self.stats
            .submit(&worker.handle.shared.worker_metrics[worker.index]);
    }

    /// Shuts down the core.
    fn shutdown(&mut self, handle: &Handle) {
        // Take the parker out of the core
        let mut park = self.park.take().expect("park missing");

        // Drain the queue
        while self.next_local_task().is_some() {}

        park.shutdown(&handle.driver);
    }

    fn tune_global_queue_interval(&mut self, worker: &Worker) {
        let next = self
            .stats
            .tuned_global_queue_interval(&worker.handle.shared.config);

        // Smooth out jitter
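        // (e.g., with a current interval of 31, a tuned value of 32 is
        // ignored since abs_diff = 1 <= 2, while a tuned value of 61 is
        // applied; numbers are illustrative).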
        if u32::abs_diff(self.global_queue_interval, next) > 2 {
            self.global_queue_interval = next;
        }
    }
}

impl Worker {
    /// Returns a reference to the scheduler's injection queue.
    fn inject(&self) -> &inject::Shared<Arc<Handle>> {
        &self.handle.shared.inject
    }
}

impl Handle {
    pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) {
        with_current(|maybe_cx| {
            if let Some(cx) = maybe_cx {
                // Make sure the task is part of the **current** scheduler.
                if self.ptr_eq(&cx.worker.handle) {
                    // And the current thread still holds a core
                    if let Some(core) = cx.core.borrow_mut().as_mut() {
                        self.schedule_local(core, task, is_yield);
                        return;
                    }
                }
            }

            // Otherwise, use the inject queue.
            self.push_remote_task(task);
            self.notify_parked_remote();
        });
    }

    // Separated case to reduce LLVM codegen in `Handle::bind_new_task`.
    pub(super) fn schedule_option_task_without_yield(&self, task: Option<Notified>) {
        if let Some(task) = task {
            self.schedule_task(task, false);
        }
    }

    fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
        core.stats.inc_local_schedule_count();

        // Spawning from the worker thread. If scheduling a "yield" then the
        // task must always be pushed to the back of the queue, enabling other
        // tasks to be executed. If **not** a yield, then there is more
        // flexibility and the task may go to the front of the queue.
        let should_notify = if is_yield || !core.lifo_enabled {
            core.run_queue
                .push_back_or_overflow(task, self, &mut core.stats);
            true
        } else {
            // Push to the LIFO slot
            let prev = core.lifo_slot.take();
            let ret = prev.is_some();

            if let Some(prev) = prev {
                core.run_queue
                    .push_back_or_overflow(prev, self, &mut core.stats);
            }

            core.lifo_slot = Some(task);

            ret
        };

        // Only notify if not currently parked. If `park` is `None`, then the
        // scheduling is from a resource driver. As notifications often come in
        // batches, the notification is delayed until the park is complete.
        if should_notify && core.park.is_some() {
            self.notify_parked_local();
        }
    }

    fn next_remote_task(&self) -> Option<Notified> {
        if self.shared.inject.is_empty() {
            return None;
        }

        let mut synced = self.shared.synced.lock();
        // safety: passing in the correct `inject::Synced`
        unsafe { self.shared.inject.pop(&mut synced.inject) }
    }

    fn push_remote_task(&self, task: Notified) {
        self.shared.scheduler_metrics.inc_remote_schedule_count();

        let mut synced = self.shared.synced.lock();
        // safety: passing in the correct `inject::Synced`
        unsafe {
            self.shared.inject.push(&mut synced.inject, task);
        }
    }

    #[cfg(all(tokio_unstable, feature = "time"))]
    pub(crate) fn push_remote_timer(&self, hdl: time_alt::EntryHandle) {
        assert_eq!(self.timer_flavor, TimerFlavor::Alternative);
        {
            let mut synced = self.shared.synced.lock();
            synced.inject_timers.push(hdl);
        }
        self.notify_parked_remote();
    }

    #[cfg(all(tokio_unstable, feature = "time"))]
    pub(crate) fn take_remote_timers(&self) -> Vec<time_alt::EntryHandle> {
        assert_eq!(self.timer_flavor, TimerFlavor::Alternative);
        // It's ok to lose the race, as another worker is
        // draining the inject_timers.
        match self.shared.synced.try_lock() {
            Some(mut synced) => std::mem::take(&mut synced.inject_timers),
            None => Vec::new(),
        }
    }

    pub(super) fn close(&self) {
        if self
            .shared
            .inject
            .close(&mut self.shared.synced.lock().inject)
        {
            self.notify_all();
        }
    }

    /// Notify a parked worker.
    ///
    /// Returns `true` if a worker was notified, `false` otherwise.
    fn notify_parked_local(&self) -> bool {
        super::counters::inc_num_inc_notify_local();

        if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) {
            super::counters::inc_num_unparks_local();
            self.shared.remotes[index].unpark.unpark(&self.driver);
            true
        } else {
            false
        }
    }

    fn notify_parked_remote(&self) {
        if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) {
            self.shared.remotes[index].unpark.unpark(&self.driver);
        }
    }

    pub(super) fn notify_all(&self) {
        for remote in &self.shared.remotes[..] {
            remote.unpark.unpark(&self.driver);
        }
    }

    fn notify_if_work_pending(&self) {
        for remote in &self.shared.remotes[..] {
            if !remote.steal.is_empty() {
                self.notify_parked_local();
                return;
            }
        }

        if !self.shared.inject.is_empty() {
            self.notify_parked_local();
        }
    }

    /// Returns `true` if another parked worker was notified, `false` otherwise.
    fn transition_worker_from_searching(&self) -> bool {
        if self.shared.idle.transition_worker_from_searching() {
            // We are the final searching worker. Because work was found, we
            // need to notify another worker.
            self.notify_parked_local()
        } else {
            false
        }
    }

    /// Signals that a worker has observed the shutdown signal and has pushed
    /// its core to `Shared::shutdown_cores`.
    ///
    /// If all workers have reached this point, the final cleanup is performed.
    fn shutdown_core(&self, core: Box<Core>) {
        let mut cores = self.shared.shutdown_cores.lock();
        cores.push(core);

        if cores.len() != self.shared.remotes.len() {
            return;
        }

        debug_assert!(self.shared.owned.is_empty());

        for mut core in cores.drain(..) {
            core.shutdown(self);
        }

        // Drain the injection queue
        //
        // We already shut down every task, so we can simply drop the tasks.
        while let Some(task) = self.next_remote_task() {
            drop(task);
        }
    }

    fn ptr_eq(&self, other: &Handle) -> bool {
        std::ptr::eq(self, other)
    }
}

impl Overflow<Arc<Handle>> for Handle {
    fn push(&self, task: task::Notified<Arc<Handle>>) {
        self.push_remote_task(task);
    }

    fn push_batch<I>(&self, iter: I)
    where
        I: Iterator<Item = task::Notified<Arc<Handle>>>,
    {
        unsafe {
            self.shared.inject.push_batch(self, iter);
        }
    }
}

pub(crate) struct InjectGuard<'a> {
    lock: crate::loom::sync::MutexGuard<'a, Synced>,
}

impl<'a> AsMut<inject::Synced> for InjectGuard<'a> {
    fn as_mut(&mut self) -> &mut inject::Synced {
        &mut self.lock.inject
    }
}

impl<'a> Lock<inject::Synced> for &'a Handle {
    type Handle = InjectGuard<'a>;

    fn lock(self) -> Self::Handle {
        InjectGuard {
            lock: self.shared.synced.lock(),
        }
    }
}

#[cfg(all(tokio_unstable, feature = "time"))]
/// Returned by [`Context::maintain_local_timers_before_parking`].
struct MaintainLocalTimer {
    park_duration: Option<Duration>,
    auto_advance_duration: Option<Duration>,
}

#[track_caller]
fn with_current<R>(f: impl FnOnce(Option<&Context>) -> R) -> R {
    use scheduler::Context::MultiThread;

    context::with_scheduler(|ctx| match ctx {
        Some(MultiThread(ctx)) => f(Some(ctx)),
        _ => f(None),
    })
}