tokio/runtime/
builder.rs

1#![cfg_attr(loom, allow(unused_imports))]
2
3use crate::runtime::handle::Handle;
4use crate::runtime::{
5    blocking, driver, Callback, HistogramBuilder, Runtime, TaskCallback, TimerFlavor,
6};
7#[cfg(tokio_unstable)]
8use crate::runtime::{metrics::HistogramConfiguration, TaskMeta};
9
10use crate::runtime::{LocalOptions, LocalRuntime};
11use crate::util::rand::{RngSeed, RngSeedGenerator};
12
13use crate::runtime::blocking::BlockingPool;
14use crate::runtime::scheduler::CurrentThread;
15use std::fmt;
16use std::io;
17use std::thread::ThreadId;
18use std::time::Duration;
19
20/// Builds Tokio Runtime with custom configuration values.
21///
22/// Methods can be chained in order to set the configuration values. The
23/// Runtime is constructed by calling [`build`].
24///
25/// New instances of `Builder` are obtained via [`Builder::new_multi_thread`]
26/// or [`Builder::new_current_thread`].
27///
28/// See function level documentation for details on the various configuration
29/// settings.
30///
31/// [`build`]: method@Self::build
32/// [`Builder::new_multi_thread`]: method@Self::new_multi_thread
33/// [`Builder::new_current_thread`]: method@Self::new_current_thread
34///
35/// # Examples
36///
37/// ```
38/// # #[cfg(not(target_family = "wasm"))]
39/// # {
40/// use tokio::runtime::Builder;
41///
42/// fn main() {
43///     // build runtime
44///     let runtime = Builder::new_multi_thread()
45///         .worker_threads(4)
46///         .thread_name("my-custom-name")
47///         .thread_stack_size(3 * 1024 * 1024)
48///         .build()
49///         .unwrap();
50///
51///     // use runtime ...
52/// }
53/// # }
54/// ```
55pub struct Builder {
56    /// Runtime type
57    kind: Kind,
58
59    /// Name of the runtime.
60    name: Option<String>,
61
62    /// Whether or not to enable the I/O driver
63    enable_io: bool,
64    nevents: usize,
65
66    /// Whether or not to enable the time driver
67    enable_time: bool,
68
69    /// Whether or not the clock should start paused.
70    start_paused: bool,
71
72    /// The number of worker threads, used by Runtime.
73    ///
74    /// Only used when not using the current-thread executor.
75    worker_threads: Option<usize>,
76
77    /// Cap on thread usage.
78    max_blocking_threads: usize,
79
80    /// Name fn used for threads spawned by the runtime.
81    pub(super) thread_name: ThreadNameFn,
82
83    /// Stack size used for threads spawned by the runtime.
84    pub(super) thread_stack_size: Option<usize>,
85
86    /// Callback to run after each thread starts.
87    pub(super) after_start: Option<Callback>,
88
89    /// To run before each worker thread stops
90    pub(super) before_stop: Option<Callback>,
91
92    /// To run before each worker thread is parked.
93    pub(super) before_park: Option<Callback>,
94
95    /// To run after each thread is unparked.
96    pub(super) after_unpark: Option<Callback>,
97
98    /// To run before each task is spawned.
99    pub(super) before_spawn: Option<TaskCallback>,
100
101    /// To run before each poll
102    #[cfg(tokio_unstable)]
103    pub(super) before_poll: Option<TaskCallback>,
104
105    /// To run after each poll
106    #[cfg(tokio_unstable)]
107    pub(super) after_poll: Option<TaskCallback>,
108
109    /// To run after each task is terminated.
110    pub(super) after_termination: Option<TaskCallback>,
111
112    /// Customizable keep alive timeout for `BlockingPool`
113    pub(super) keep_alive: Option<Duration>,
114
115    /// How many ticks before pulling a task from the global/remote queue?
116    ///
117    /// When `None`, the value is unspecified and behavior details are left to
118    /// the scheduler. Each scheduler flavor could choose to either pick its own
119    /// default value or use some other strategy to decide when to poll from the
120    /// global queue. For example, the multi-threaded scheduler uses a
121    /// self-tuning strategy based on mean task poll times.
122    pub(super) global_queue_interval: Option<u32>,
123
124    /// How many ticks before yielding to the driver for timer and I/O events?
125    pub(super) event_interval: u32,
126
127    /// When true, the multi-threade scheduler LIFO slot should not be used.
128    ///
129    /// This option should only be exposed as unstable.
130    pub(super) disable_lifo_slot: bool,
131
132    /// Specify a random number generator seed to provide deterministic results
133    pub(super) seed_generator: RngSeedGenerator,
134
135    /// When true, enables task poll count histogram instrumentation.
136    pub(super) metrics_poll_count_histogram_enable: bool,
137
138    /// Configures the task poll count histogram
139    pub(super) metrics_poll_count_histogram: HistogramBuilder,
140
141    #[cfg(tokio_unstable)]
142    pub(super) unhandled_panic: UnhandledPanic,
143
144    timer_flavor: TimerFlavor,
145
146    /// Whether or not to enable eager hand-off for the I/O and time drivers (in
147    /// `tokio_unstable`).
148    enable_eager_driver_handoff: bool,
149}
150
151cfg_unstable! {
152    /// How the runtime should respond to unhandled panics.
153    ///
154    /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic`
155    /// to configure the runtime behavior when a spawned task panics.
156    ///
157    /// See [`Builder::unhandled_panic`] for more details.
158    #[derive(Debug, Clone)]
159    #[non_exhaustive]
160    pub enum UnhandledPanic {
161        /// The runtime should ignore panics on spawned tasks.
162        ///
163        /// The panic is forwarded to the task's [`JoinHandle`] and all spawned
164        /// tasks continue running normally.
165        ///
166        /// This is the default behavior.
167        ///
168        /// # Examples
169        ///
170        /// ```
171        /// # #[cfg(not(target_family = "wasm"))]
172        /// # {
173        /// use tokio::runtime::{self, UnhandledPanic};
174        ///
175        /// # pub fn main() {
176        /// let rt = runtime::Builder::new_current_thread()
177        ///     .unhandled_panic(UnhandledPanic::Ignore)
178        ///     .build()
179        ///     .unwrap();
180        ///
181        /// let task1 = rt.spawn(async { panic!("boom"); });
182        /// let task2 = rt.spawn(async {
183        ///     // This task completes normally
184        ///     "done"
185        /// });
186        ///
187        /// rt.block_on(async {
188        ///     // The panic on the first task is forwarded to the `JoinHandle`
189        ///     assert!(task1.await.is_err());
190        ///
191        ///     // The second task completes normally
192        ///     assert!(task2.await.is_ok());
193        /// })
194        /// # }
195        /// # }
196        /// ```
197        ///
198        /// [`JoinHandle`]: struct@crate::task::JoinHandle
199        Ignore,
200
201        /// The runtime should immediately shutdown if a spawned task panics.
202        ///
203        /// The runtime will immediately shutdown even if the panicked task's
204        /// [`JoinHandle`] is still available. All further spawned tasks will be
205        /// immediately dropped and call to [`Runtime::block_on`] will panic.
206        ///
207        /// # Examples
208        ///
209        /// ```should_panic
210        /// use tokio::runtime::{self, UnhandledPanic};
211        ///
212        /// # pub fn main() {
213        /// let rt = runtime::Builder::new_current_thread()
214        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
215        ///     .build()
216        ///     .unwrap();
217        ///
218        /// rt.spawn(async { panic!("boom"); });
219        /// rt.spawn(async {
220        ///     // This task never completes.
221        /// });
222        ///
223        /// rt.block_on(async {
224        ///     // Do some work
225        /// # loop { tokio::task::yield_now().await; }
226        /// })
227        /// # }
228        /// ```
229        ///
230        /// [`JoinHandle`]: struct@crate::task::JoinHandle
231        ShutdownRuntime,
232    }
233}
234
235pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
236
237#[derive(Clone, Copy)]
238pub(crate) enum Kind {
239    CurrentThread,
240    #[cfg(feature = "rt-multi-thread")]
241    MultiThread,
242}
243
244impl Builder {
245    /// Returns a new builder with the current thread scheduler selected.
246    ///
247    /// Configuration methods can be chained on the return value.
248    ///
249    /// To spawn non-`Send` tasks on the resulting runtime, combine it with a
250    /// [`LocalSet`], or call [`build_local`] to create a [`LocalRuntime`].
251    ///
252    /// [`LocalSet`]: crate::task::LocalSet
253    /// [`LocalRuntime`]: crate::runtime::LocalRuntime
254    /// [`build_local`]: crate::runtime::Builder::build_local
255    pub fn new_current_thread() -> Builder {
256        #[cfg(loom)]
257        const EVENT_INTERVAL: u32 = 4;
258        // The number `61` is fairly arbitrary. I believe this value was copied from golang.
259        #[cfg(not(loom))]
260        const EVENT_INTERVAL: u32 = 61;
261
262        Builder::new(Kind::CurrentThread, EVENT_INTERVAL)
263    }
264
265    /// Returns a new builder with the multi thread scheduler selected.
266    ///
267    /// Configuration methods can be chained on the return value.
268    #[cfg(feature = "rt-multi-thread")]
269    #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
270    pub fn new_multi_thread() -> Builder {
271        // The number `61` is fairly arbitrary. I believe this value was copied from golang.
272        Builder::new(Kind::MultiThread, 61)
273    }
274
275    /// Returns a new runtime builder initialized with default configuration
276    /// values.
277    ///
278    /// Configuration methods can be chained on the return value.
279    pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder {
280        Builder {
281            kind,
282
283            // Default runtime name
284            name: None,
285
286            // I/O defaults to "off"
287            enable_io: false,
288            nevents: 1024,
289
290            // Time defaults to "off"
291            enable_time: false,
292
293            // The clock starts not-paused
294            start_paused: false,
295
296            // Read from environment variable first in multi-threaded mode.
297            // Default to lazy auto-detection (one thread per CPU core)
298            worker_threads: None,
299
300            max_blocking_threads: 512,
301
302            // Default thread name
303            thread_name: std::sync::Arc::new(|| "tokio-rt-worker".into()),
304
305            // Do not set a stack size by default
306            thread_stack_size: None,
307
308            // No worker thread callbacks
309            after_start: None,
310            before_stop: None,
311            before_park: None,
312            after_unpark: None,
313
314            before_spawn: None,
315            after_termination: None,
316
317            #[cfg(tokio_unstable)]
318            before_poll: None,
319            #[cfg(tokio_unstable)]
320            after_poll: None,
321
322            keep_alive: None,
323
324            // Defaults for these values depend on the scheduler kind, so we get them
325            // as parameters.
326            global_queue_interval: None,
327            event_interval,
328
329            seed_generator: RngSeedGenerator::new(RngSeed::new()),
330
331            #[cfg(tokio_unstable)]
332            unhandled_panic: UnhandledPanic::Ignore,
333
334            metrics_poll_count_histogram_enable: false,
335
336            metrics_poll_count_histogram: HistogramBuilder::default(),
337
338            disable_lifo_slot: false,
339
340            timer_flavor: TimerFlavor::Traditional,
341
342            // Eager driver handoff is disabled by default.
343            enable_eager_driver_handoff: false,
344        }
345    }
346
347    /// Enables both I/O and time drivers.
348    ///
349    /// Doing this is a shorthand for calling `enable_io` and `enable_time`
350    /// individually. If additional components are added to Tokio in the future,
351    /// `enable_all` will include these future components.
352    ///
353    /// # Examples
354    ///
355    /// ```
356    /// # #[cfg(not(target_family = "wasm"))]
357    /// # {
358    /// use tokio::runtime;
359    ///
360    /// let rt = runtime::Builder::new_multi_thread()
361    ///     .enable_all()
362    ///     .build()
363    ///     .unwrap();
364    /// # }
365    /// ```
366    pub fn enable_all(&mut self) -> &mut Self {
367        #[cfg(any(
368            feature = "net",
369            all(unix, feature = "process"),
370            all(unix, feature = "signal")
371        ))]
372        self.enable_io();
373
374        #[cfg(all(
375            tokio_unstable,
376            feature = "io-uring",
377            feature = "rt",
378            feature = "fs",
379            target_os = "linux",
380        ))]
381        self.enable_io_uring();
382
383        #[cfg(feature = "time")]
384        self.enable_time();
385
386        self
387    }
388
389    /// Enables the alternative timer implementation, which is disabled by default.
390    ///
391    /// The alternative timer implementation is an unstable feature that may
392    /// provide better performance on multi-threaded runtimes with a large number
393    /// of worker threads.
394    ///
395    /// This option only applies to multi-threaded runtimes. Attempting to use
396    /// this option with any other runtime type will have no effect.
397    ///
398    /// [Click here to share your experience with the alternative timer](https://github.com/tokio-rs/tokio/issues/7745)
399    ///
400    /// # Examples
401    ///
402    /// ```
403    /// # #[cfg(not(target_family = "wasm"))]
404    /// # {
405    /// use tokio::runtime;
406    ///
407    /// let rt = runtime::Builder::new_multi_thread()
408    ///   .enable_alt_timer()
409    ///   .build()
410    ///   .unwrap();
411    /// # }
412    /// ```
413    #[cfg(all(tokio_unstable, feature = "time", feature = "rt-multi-thread"))]
414    #[cfg_attr(
415        docsrs,
416        doc(cfg(all(tokio_unstable, feature = "time", feature = "rt-multi-thread")))
417    )]
418    pub fn enable_alt_timer(&mut self) -> &mut Self {
419        self.enable_time();
420        self.timer_flavor = TimerFlavor::Alternative;
421        self
422    }
423
424    /// Enable eager hand-off of the I/O and time drivers for multi-threaded
425    /// runtimes, which is disabled by default.
426    ///
427    /// When this option is enabled, a worker thread which has parked on the I/O
428    /// or time driver will notify another worker thread once it is preparing to
429    /// begin polling a task from the run queue, so that the notified worker can
430    /// begin polling the I/O or time driver. This can reduce the latency with
431    /// which I/O and timer notifications are processed, especially when some
432    /// tasks have polls that take a long time to complete. In addition, it can
433    /// reduce the risk of a deadlock which may occur when a task blocks the
434    /// worker thread which is holding the I/O or time driver until some other
435    /// task, which is waiting for a notification from *that* driver, unblocks
436    /// it.
437    ///
438    /// This option is disabled by default, as enabling it may potentially
439    /// increase contention due to extra synchronization in cross-driver
440    /// wakeups.
441    ///
442    /// This option only applies to multi-threaded runtimes. Attempting to use
443    /// this option with any other runtime type will have no effect.
444    ///
445    /// **Note**: This is an [unstable API][unstable]. Eager driver hand-off is
446    /// an experimental feature whose behavior may be removed or changed in 1.x
447    /// releases. See [the documentation on unstable features][unstable] for
448    /// details.
449    ///
450    /// [unstable]: crate#unstable-features
451    #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
452    #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "rt-multi-thread"))))]
453    pub fn enable_eager_driver_handoff(&mut self) -> &mut Self {
454        self.enable_eager_driver_handoff = true;
455        self
456    }
457
458    /// Sets the number of worker threads the `Runtime` will use.
459    ///
460    /// This can be any number above 0 though it is advised to keep this value
461    /// on the smaller side.
462    ///
463    /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`.
464    ///
465    /// # Default
466    ///
467    /// The default value is the number of cores available to the system.
468    ///
469    /// When using the `current_thread` runtime this method has no effect.
470    ///
471    /// # Examples
472    ///
473    /// ## Multi threaded runtime with 4 threads
474    ///
475    /// ```
476    /// # #[cfg(not(target_family = "wasm"))]
477    /// # {
478    /// use tokio::runtime;
479    ///
480    /// // This will spawn a work-stealing runtime with 4 worker threads.
481    /// let rt = runtime::Builder::new_multi_thread()
482    ///     .worker_threads(4)
483    ///     .build()
484    ///     .unwrap();
485    ///
486    /// rt.spawn(async move {});
487    /// # }
488    /// ```
489    ///
490    /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`)
491    ///
492    /// ```
493    /// use tokio::runtime;
494    ///
495    /// // Create a runtime that _must_ be driven from a call
496    /// // to `Runtime::block_on`.
497    /// let rt = runtime::Builder::new_current_thread()
498    ///     .build()
499    ///     .unwrap();
500    ///
501    /// // This will run the runtime and future on the current thread
502    /// rt.block_on(async move {});
503    /// ```
504    ///
505    /// # Panics
506    ///
507    /// This will panic if `val` is not larger than `0`.
508    #[track_caller]
509    pub fn worker_threads(&mut self, val: usize) -> &mut Self {
510        assert!(val > 0, "Worker threads cannot be set to 0");
511        self.worker_threads = Some(val);
512        self
513    }
514
515    /// Specifies the limit for additional threads spawned by the Runtime.
516    ///
517    /// These threads are used for blocking operations like tasks spawned
518    /// through [`spawn_blocking`], this includes but is not limited to:
519    /// - [`fs`] operations
520    /// - dns resolution through [`ToSocketAddrs`]
521    /// - writing to [`Stdout`] or [`Stderr`]
522    /// - reading from [`Stdin`]
523    ///
524    /// Unlike the [`worker_threads`], they are not always active and will exit
525    /// if left idle for too long. You can change this timeout duration with [`thread_keep_alive`].
526    ///
527    /// It's recommended to not set this limit too low in order to avoid hanging on operations
528    /// requiring [`spawn_blocking`].
529    ///
530    /// The default value is 512.
531    ///
532    /// # Queue Behavior
533    ///
534    /// When a blocking task is submitted, it will be inserted into a queue. If available, one of
535    /// the idle threads will be notified to run the task. Otherwise, if the threshold set by this
536    /// method has not been reached, a new thread will be spawned. If no idle thread is available
537    /// and no more threads are allowed to be spawned, the task will remain in the queue until one
538    /// of the busy threads pick it up. Note that since the queue does not apply any backpressure,
539    /// it could potentially grow unbounded.
540    ///
541    /// # Panics
542    ///
543    /// This will panic if `val` is not larger than `0`.
544    ///
545    /// # Upgrading from 0.x
546    ///
547    /// In old versions `max_threads` limited both blocking and worker threads, but the
548    /// current `max_blocking_threads` does not include async worker threads in the count.
549    ///
550    /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
551    /// [`fs`]: mod@crate::fs
552    /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
553    /// [`Stdout`]: struct@crate::io::Stdout
554    /// [`Stdin`]: struct@crate::io::Stdin
555    /// [`Stderr`]: struct@crate::io::Stderr
556    /// [`worker_threads`]: Self::worker_threads
557    /// [`thread_keep_alive`]: Self::thread_keep_alive
558    #[track_caller]
559    #[cfg_attr(docsrs, doc(alias = "max_threads"))]
560    pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
561        assert!(val > 0, "Max blocking threads cannot be set to 0");
562        self.max_blocking_threads = val;
563        self
564    }
565
566    /// Sets name of threads spawned by the `Runtime`'s thread pool.
567    ///
568    /// The default name is "tokio-rt-worker".
569    ///
570    /// # Examples
571    ///
572    /// ```
573    /// # #[cfg(not(target_family = "wasm"))]
574    /// # {
575    /// # use tokio::runtime;
576    ///
577    /// # pub fn main() {
578    /// let rt = runtime::Builder::new_multi_thread()
579    ///     .thread_name("my-pool")
580    ///     .build();
581    /// # }
582    /// # }
583    /// ```
584    pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
585        let val = val.into();
586        self.thread_name = std::sync::Arc::new(move || val.clone());
587        self
588    }
589
590    /// Sets the name of the runtime.
591    ///
592    /// # Examples
593    ///
594    /// ```
595    /// # #[cfg(not(target_family = "wasm"))]
596    /// # {
597    /// # use tokio::runtime;
598    ///
599    /// # pub fn main() {
600    /// let rt = runtime::Builder::new_multi_thread()
601    ///     .name("my-runtime")
602    ///     .build();
603    /// # }
604    /// # }
605    /// ```
606    /// # Panics
607    ///
608    /// This function will panic if an empty value is passed as an argument.
609    ///
610    #[track_caller]
611    pub fn name(&mut self, val: impl Into<String>) -> &mut Self {
612        let val = val.into();
613        assert!(!val.trim().is_empty(), "runtime name shouldn't be empty");
614        self.name = Some(val);
615        self
616    }
617
618    /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool.
619    ///
620    /// The default name fn is `|| "tokio-rt-worker".into()`.
621    ///
622    /// # Examples
623    ///
624    /// ```
625    /// # #[cfg(not(target_family = "wasm"))]
626    /// # {
627    /// # use tokio::runtime;
628    /// # use std::sync::atomic::{AtomicUsize, Ordering};
629    /// # pub fn main() {
630    /// let rt = runtime::Builder::new_multi_thread()
631    ///     .thread_name_fn(|| {
632    ///        static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
633    ///        let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
634    ///        format!("my-pool-{}", id)
635    ///     })
636    ///     .build();
637    /// # }
638    /// # }
639    /// ```
640    pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self
641    where
642        F: Fn() -> String + Send + Sync + 'static,
643    {
644        self.thread_name = std::sync::Arc::new(f);
645        self
646    }
647
648    /// Sets the stack size (in bytes) for worker threads.
649    ///
650    /// The actual stack size may be greater than this value if the platform
651    /// specifies minimal stack size.
652    ///
653    /// The default stack size for spawned threads is 2 MiB, though this
654    /// particular stack size is subject to change in the future.
655    ///
656    /// # Examples
657    ///
658    /// ```
659    /// # #[cfg(not(target_family = "wasm"))]
660    /// # {
661    /// # use tokio::runtime;
662    ///
663    /// # pub fn main() {
664    /// let rt = runtime::Builder::new_multi_thread()
665    ///     .thread_stack_size(32 * 1024)
666    ///     .build();
667    /// # }
668    /// # }
669    /// ```
670    pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
671        self.thread_stack_size = Some(val);
672        self
673    }
674
675    /// Executes function `f` after each thread is started but before it starts
676    /// doing work.
677    ///
678    /// This is intended for bookkeeping and monitoring use cases.
679    ///
680    /// # Examples
681    ///
682    /// ```
683    /// # #[cfg(not(target_family = "wasm"))]
684    /// # {
685    /// # use tokio::runtime;
686    /// # pub fn main() {
687    /// let runtime = runtime::Builder::new_multi_thread()
688    ///     .on_thread_start(|| {
689    ///         println!("thread started");
690    ///     })
691    ///     .build();
692    /// # }
693    /// # }
694    /// ```
695    #[cfg(not(loom))]
696    pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
697    where
698        F: Fn() + Send + Sync + 'static,
699    {
700        self.after_start = Some(std::sync::Arc::new(f));
701        self
702    }
703
704    /// Executes function `f` before each thread stops.
705    ///
706    /// This is intended for bookkeeping and monitoring use cases.
707    ///
708    /// # Examples
709    ///
710    /// ```
711    /// # #[cfg(not(target_family = "wasm"))]
712    /// {
713    /// # use tokio::runtime;
714    /// # pub fn main() {
715    /// let runtime = runtime::Builder::new_multi_thread()
716    ///     .on_thread_stop(|| {
717    ///         println!("thread stopping");
718    ///     })
719    ///     .build();
720    /// # }
721    /// # }
722    /// ```
723    #[cfg(not(loom))]
724    pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
725    where
726        F: Fn() + Send + Sync + 'static,
727    {
728        self.before_stop = Some(std::sync::Arc::new(f));
729        self
730    }
731
732    /// Executes function `f` just before a thread is parked (goes idle).
733    /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
734    /// can be called, and may result in this thread being unparked immediately.
735    ///
736    /// This can be used to start work only when the executor is idle, or for bookkeeping
737    /// and monitoring purposes.
738    ///
739    /// Note: There can only be one park callback for a runtime; calling this function
740    /// more than once replaces the last callback defined, rather than adding to it.
741    ///
742    /// # Examples
743    ///
744    /// ## Multithreaded executor
745    /// ```
746    /// # #[cfg(not(target_family = "wasm"))]
747    /// # {
748    /// # use std::sync::Arc;
749    /// # use std::sync::atomic::{AtomicBool, Ordering};
750    /// # use tokio::runtime;
751    /// # use tokio::sync::Barrier;
752    /// # pub fn main() {
753    /// let once = AtomicBool::new(true);
754    /// let barrier = Arc::new(Barrier::new(2));
755    ///
756    /// let runtime = runtime::Builder::new_multi_thread()
757    ///     .worker_threads(1)
758    ///     .on_thread_park({
759    ///         let barrier = barrier.clone();
760    ///         move || {
761    ///             let barrier = barrier.clone();
762    ///             if once.swap(false, Ordering::Relaxed) {
763    ///                 tokio::spawn(async move { barrier.wait().await; });
764    ///            }
765    ///         }
766    ///     })
767    ///     .build()
768    ///     .unwrap();
769    ///
770    /// runtime.block_on(async {
771    ///    barrier.wait().await;
772    /// })
773    /// # }
774    /// # }
775    /// ```
776    /// ## Current thread executor
777    /// ```
778    /// # use std::sync::Arc;
779    /// # use std::sync::atomic::{AtomicBool, Ordering};
780    /// # use tokio::runtime;
781    /// # use tokio::sync::Barrier;
782    /// # pub fn main() {
783    /// let once = AtomicBool::new(true);
784    /// let barrier = Arc::new(Barrier::new(2));
785    ///
786    /// let runtime = runtime::Builder::new_current_thread()
787    ///     .on_thread_park({
788    ///         let barrier = barrier.clone();
789    ///         move || {
790    ///             let barrier = barrier.clone();
791    ///             if once.swap(false, Ordering::Relaxed) {
792    ///                 tokio::spawn(async move { barrier.wait().await; });
793    ///            }
794    ///         }
795    ///     })
796    ///     .build()
797    ///     .unwrap();
798    ///
799    /// runtime.block_on(async {
800    ///    barrier.wait().await;
801    /// })
802    /// # }
803    /// ```
804    #[cfg(not(loom))]
805    pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
806    where
807        F: Fn() + Send + Sync + 'static,
808    {
809        self.before_park = Some(std::sync::Arc::new(f));
810        self
811    }
812
813    /// Executes function `f` just after a thread unparks (starts executing tasks).
814    ///
815    /// This is intended for bookkeeping and monitoring use cases; note that work
816    /// in this callback will increase latencies when the application has allowed one or
817    /// more runtime threads to go idle.
818    ///
819    /// Note: There can only be one unpark callback for a runtime; calling this function
820    /// more than once replaces the last callback defined, rather than adding to it.
821    ///
822    /// # Examples
823    ///
824    /// ```
825    /// # #[cfg(not(target_family = "wasm"))]
826    /// # {
827    /// # use tokio::runtime;
828    /// # pub fn main() {
829    /// let runtime = runtime::Builder::new_multi_thread()
830    ///     .on_thread_unpark(|| {
831    ///         println!("thread unparking");
832    ///     })
833    ///     .build();
834    ///
835    /// runtime.unwrap().block_on(async {
836    ///    tokio::task::yield_now().await;
837    ///    println!("Hello from Tokio!");
838    /// })
839    /// # }
840    /// # }
841    /// ```
842    #[cfg(not(loom))]
843    pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
844    where
845        F: Fn() + Send + Sync + 'static,
846    {
847        self.after_unpark = Some(std::sync::Arc::new(f));
848        self
849    }
850
851    /// Executes function `f` just before a task is spawned.
852    ///
853    /// `f` is called within the Tokio context, so functions like
854    /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
855    /// invoked immediately.
856    ///
857    /// This can be used for bookkeeping or monitoring purposes.
858    ///
859    /// Note: There can only be one spawn callback for a runtime; calling this function more
860    /// than once replaces the last callback defined, rather than adding to it.
861    ///
862    /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
863    ///
864    /// **Note**: This is an [unstable API][unstable]. The public API of this type
865    /// may break in 1.x releases. See [the documentation on unstable
866    /// features][unstable] for details.
867    ///
868    /// [unstable]: crate#unstable-features
869    ///
870    /// # Examples
871    ///
872    /// ```
873    /// # use tokio::runtime;
874    /// # pub fn main() {
875    /// let runtime = runtime::Builder::new_current_thread()
876    ///     .on_task_spawn(|_| {
877    ///         println!("spawning task");
878    ///     })
879    ///     .build()
880    ///     .unwrap();
881    ///
882    /// runtime.block_on(async {
883    ///     tokio::task::spawn(std::future::ready(()));
884    ///
885    ///     for _ in 0..64 {
886    ///         tokio::task::yield_now().await;
887    ///     }
888    /// })
889    /// # }
890    /// ```
891    #[cfg(all(not(loom), tokio_unstable))]
892    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
893    pub fn on_task_spawn<F>(&mut self, f: F) -> &mut Self
894    where
895        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
896    {
897        self.before_spawn = Some(std::sync::Arc::new(f));
898        self
899    }
900
901    /// Executes function `f` just before a task is polled
902    ///
903    /// `f` is called within the Tokio context, so functions like
904    /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
905    /// invoked immediately.
906    ///
907    /// **Note**: This is an [unstable API][unstable]. The public API of this type
908    /// may break in 1.x releases. See [the documentation on unstable
909    /// features][unstable] for details.
910    ///
911    /// [unstable]: crate#unstable-features
912    ///
913    /// # Examples
914    ///
915    /// ```
916    /// # #[cfg(not(target_family = "wasm"))]
917    /// # {
918    /// # use std::sync::{atomic::AtomicUsize, Arc};
919    /// # use tokio::task::yield_now;
920    /// # pub fn main() {
921    /// let poll_start_counter = Arc::new(AtomicUsize::new(0));
922    /// let poll_start = poll_start_counter.clone();
923    /// let rt = tokio::runtime::Builder::new_multi_thread()
924    ///     .enable_all()
925    ///     .on_before_task_poll(move |meta| {
926    ///         println!("task {} is about to be polled", meta.id())
927    ///     })
928    ///     .build()
929    ///     .unwrap();
930    /// let task = rt.spawn(async {
931    ///     yield_now().await;
932    /// });
933    /// let _ = rt.block_on(task);
934    ///
935    /// # }
936    /// # }
937    /// ```
938    #[cfg(tokio_unstable)]
939    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
940    pub fn on_before_task_poll<F>(&mut self, f: F) -> &mut Self
941    where
942        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
943    {
944        self.before_poll = Some(std::sync::Arc::new(f));
945        self
946    }
947
948    /// Executes function `f` just after a task is polled
949    ///
950    /// `f` is called within the Tokio context, so functions like
951    /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
952    /// invoked immediately.
953    ///
954    /// **Note**: This is an [unstable API][unstable]. The public API of this type
955    /// may break in 1.x releases. See [the documentation on unstable
956    /// features][unstable] for details.
957    ///
958    /// [unstable]: crate#unstable-features
959    ///
960    /// # Examples
961    ///
962    /// ```
963    /// # #[cfg(not(target_family = "wasm"))]
964    /// # {
965    /// # use std::sync::{atomic::AtomicUsize, Arc};
966    /// # use tokio::task::yield_now;
967    /// # pub fn main() {
968    /// let poll_stop_counter = Arc::new(AtomicUsize::new(0));
969    /// let poll_stop = poll_stop_counter.clone();
970    /// let rt = tokio::runtime::Builder::new_multi_thread()
971    ///     .enable_all()
972    ///     .on_after_task_poll(move |meta| {
973    ///         println!("task {} completed polling", meta.id());
974    ///     })
975    ///     .build()
976    ///     .unwrap();
977    /// let task = rt.spawn(async {
978    ///     yield_now().await;
979    /// });
980    /// let _ = rt.block_on(task);
981    ///
982    /// # }
983    /// # }
984    /// ```
985    #[cfg(tokio_unstable)]
986    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
987    pub fn on_after_task_poll<F>(&mut self, f: F) -> &mut Self
988    where
989        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
990    {
991        self.after_poll = Some(std::sync::Arc::new(f));
992        self
993    }
994
995    /// Executes function `f` just after a task is terminated.
996    ///
997    /// `f` is called within the Tokio context, so functions like
998    /// [`tokio::spawn`](crate::spawn) can be called.
999    ///
1000    /// This can be used for bookkeeping or monitoring purposes.
1001    ///
1002    /// Note: There can only be one task termination callback for a runtime; calling this
1003    /// function more than once replaces the last callback defined, rather than adding to it.
1004    ///
1005    /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
1006    ///
1007    /// **Note**: This is an [unstable API][unstable]. The public API of this type
1008    /// may break in 1.x releases. See [the documentation on unstable
1009    /// features][unstable] for details.
1010    ///
1011    /// [unstable]: crate#unstable-features
1012    ///
1013    /// # Examples
1014    ///
1015    /// ```
1016    /// # use tokio::runtime;
1017    /// # pub fn main() {
1018    /// let runtime = runtime::Builder::new_current_thread()
1019    ///     .on_task_terminate(|_| {
1020    ///         println!("killing task");
1021    ///     })
1022    ///     .build()
1023    ///     .unwrap();
1024    ///
1025    /// runtime.block_on(async {
1026    ///     tokio::task::spawn(std::future::ready(()));
1027    ///
1028    ///     for _ in 0..64 {
1029    ///         tokio::task::yield_now().await;
1030    ///     }
1031    /// })
1032    /// # }
1033    /// ```
1034    #[cfg(all(not(loom), tokio_unstable))]
1035    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
1036    pub fn on_task_terminate<F>(&mut self, f: F) -> &mut Self
1037    where
1038        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
1039    {
1040        self.after_termination = Some(std::sync::Arc::new(f));
1041        self
1042    }
1043
1044    /// Creates the configured `Runtime`.
1045    ///
1046    /// The returned `Runtime` instance is ready to spawn tasks.
1047    ///
1048    /// # Examples
1049    ///
1050    /// ```
1051    /// # #[cfg(not(target_family = "wasm"))]
1052    /// # {
1053    /// use tokio::runtime::Builder;
1054    ///
1055    /// let rt  = Builder::new_multi_thread().build().unwrap();
1056    ///
1057    /// rt.block_on(async {
1058    ///     println!("Hello from the Tokio runtime");
1059    /// });
1060    /// # }
1061    /// ```
1062    pub fn build(&mut self) -> io::Result<Runtime> {
1063        match &self.kind {
1064            Kind::CurrentThread => self.build_current_thread_runtime(),
1065            #[cfg(feature = "rt-multi-thread")]
1066            Kind::MultiThread => self.build_threaded_runtime(),
1067        }
1068    }
1069
1070    /// Creates the configured [`LocalRuntime`].
1071    ///
1072    /// The returned [`LocalRuntime`] instance is ready to spawn tasks.
1073    ///
1074    /// # Panics
1075    ///
1076    /// This will panic if the runtime is configured with [`new_multi_thread()`].
1077    ///
1078    /// [`new_multi_thread()`]: Builder::new_multi_thread
1079    ///
1080    /// # Examples
1081    ///
1082    /// ```
1083    /// use tokio::runtime::{Builder, LocalOptions};
1084    ///
1085    /// let rt = Builder::new_current_thread()
1086    ///     .build_local(LocalOptions::default())
1087    ///     .unwrap();
1088    ///
1089    /// rt.spawn_local(async {
1090    ///     println!("Hello from the Tokio runtime");
1091    /// });
1092    /// ```
1093    #[allow(unused_variables, unreachable_patterns)]
1094    pub fn build_local(&mut self, options: LocalOptions) -> io::Result<LocalRuntime> {
1095        match &self.kind {
1096            Kind::CurrentThread => self.build_current_thread_local_runtime(),
1097            #[cfg(feature = "rt-multi-thread")]
1098            Kind::MultiThread => panic!("multi_thread is not supported for LocalRuntime"),
1099        }
1100    }
1101
1102    fn get_cfg(&self) -> driver::Cfg {
1103        driver::Cfg {
1104            enable_pause_time: match self.kind {
1105                Kind::CurrentThread => true,
1106                #[cfg(feature = "rt-multi-thread")]
1107                Kind::MultiThread => false,
1108            },
1109            enable_io: self.enable_io,
1110            enable_time: self.enable_time,
1111            start_paused: self.start_paused,
1112            nevents: self.nevents,
1113            timer_flavor: self.timer_flavor,
1114        }
1115    }
1116
1117    /// Sets a custom timeout for a thread in the blocking pool.
1118    ///
1119    /// By default, the timeout for a thread is set to 10 seconds. This can
1120    /// be overridden using `.thread_keep_alive()`.
1121    ///
1122    /// # Example
1123    ///
1124    /// ```
1125    /// # #[cfg(not(target_family = "wasm"))]
1126    /// # {
1127    /// # use tokio::runtime;
1128    /// # use std::time::Duration;
1129    /// # pub fn main() {
1130    /// let rt = runtime::Builder::new_multi_thread()
1131    ///     .thread_keep_alive(Duration::from_millis(100))
1132    ///     .build();
1133    /// # }
1134    /// # }
1135    /// ```
1136    pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
1137        self.keep_alive = Some(duration);
1138        self
1139    }
1140
1141    /// Sets the number of scheduler ticks after which the scheduler will poll the global
1142    /// task queue.
1143    ///
1144    /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
1145    ///
1146    /// By default the global queue interval is 31 for the current-thread scheduler. Please see
1147    /// [the module documentation] for the default behavior of the multi-thread scheduler.
1148    ///
1149    /// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming
1150    /// tasks. Setting the interval to a smaller value increases the fairness of the scheduler,
1151    /// at the cost of more synchronization overhead. That can be beneficial for prioritizing
1152    /// getting started on new work, especially if tasks frequently yield rather than complete
1153    /// or await on further I/O. Setting the interval to `1` will prioritize the global queue and
1154    /// tasks from the local queue will be executed only if the global queue is empty.
1155    /// Conversely, a higher value prioritizes existing work, and is a good choice when most
1156    /// tasks quickly complete polling.
1157    ///
1158    /// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing
1159    ///
1160    /// # Panics
1161    ///
1162    /// This function will panic if 0 is passed as an argument.
1163    ///
1164    /// # Examples
1165    ///
1166    /// ```
1167    /// # #[cfg(not(target_family = "wasm"))]
1168    /// # {
1169    /// # use tokio::runtime;
1170    /// # pub fn main() {
1171    /// let rt = runtime::Builder::new_multi_thread()
1172    ///     .global_queue_interval(31)
1173    ///     .build();
1174    /// # }
1175    /// # }
1176    /// ```
1177    #[track_caller]
1178    pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
1179        assert!(val > 0, "global_queue_interval must be greater than 0");
1180        self.global_queue_interval = Some(val);
1181        self
1182    }
1183
1184    /// Sets the number of scheduler ticks after which the scheduler will poll for
1185    /// external events (timers, I/O, and so on).
1186    ///
1187    /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
1188    ///
1189    /// By default, the event interval is `61` for all scheduler types.
1190    ///
1191    /// Setting the event interval determines the effective "priority" of delivering
1192    /// these external events (which may wake up additional tasks), compared to
1193    /// executing tasks that are currently ready to run. A smaller value is useful
1194    /// when tasks frequently spend a long time in polling, or infrequently yield,
1195    /// which can result in overly long delays picking up I/O events. Conversely,
1196    /// picking up new events requires extra synchronization and syscall overhead,
1197    /// so if tasks generally complete their polling quickly, a higher event interval
1198    /// will minimize that overhead while still keeping the scheduler responsive to
1199    /// events.
1200    ///
1201    /// # Panics
1202    ///
1203    /// This function will panic if 0 is passed as an argument.
1204    ///
1205    /// # Examples
1206    ///
1207    /// ```
1208    /// # #[cfg(not(target_family = "wasm"))]
1209    /// # {
1210    /// # use tokio::runtime;
1211    /// # pub fn main() {
1212    /// let rt = runtime::Builder::new_multi_thread()
1213    ///     .event_interval(31)
1214    ///     .build();
1215    /// # }
1216    /// # }
1217    /// ```
1218    #[track_caller]
1219    pub fn event_interval(&mut self, val: u32) -> &mut Self {
1220        assert!(val > 0, "event_interval must be greater than 0");
1221        self.event_interval = val;
1222        self
1223    }
1224
1225    cfg_unstable! {
1226        /// Configure how the runtime responds to an unhandled panic on a
1227        /// spawned task.
1228        ///
1229        /// By default, an unhandled panic (i.e. a panic not caught by
1230        /// [`std::panic::catch_unwind`]) has no impact on the runtime's
1231        /// execution. The panic's error value is forwarded to the task's
1232        /// [`JoinHandle`] and all other spawned tasks continue running.
1233        ///
1234        /// The `unhandled_panic` option enables configuring this behavior.
1235        ///
1236        /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
1237        ///   spawned tasks have no impact on the runtime's execution.
1238        /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to
1239        ///   shutdown immediately when a spawned task panics even if that
1240        ///   task's `JoinHandle` has not been dropped. All other spawned tasks
1241        ///   will immediately terminate and further calls to
1242        ///   [`Runtime::block_on`] will panic.
1243        ///
1244        /// # Panics
1245        /// This method panics if called with [`UnhandledPanic::ShutdownRuntime`]
1246        /// on a runtime other than the current thread runtime.
1247        ///
1248        /// # Unstable
1249        ///
1250        /// This option is currently unstable and its implementation is
1251        /// incomplete. The API may change or be removed in the future. See
1252        /// issue [tokio-rs/tokio#4516] for more details.
1253        ///
1254        /// # Examples
1255        ///
1256        /// The following demonstrates a runtime configured to shutdown on
1257        /// panic. The first spawned task panics and results in the runtime
1258        /// shutting down. The second spawned task never has a chance to
1259        /// execute. The call to `block_on` will panic due to the runtime being
1260        /// forcibly shutdown.
1261        ///
1262        /// ```should_panic
1263        /// use tokio::runtime::{self, UnhandledPanic};
1264        ///
1265        /// # pub fn main() {
1266        /// let rt = runtime::Builder::new_current_thread()
1267        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
1268        ///     .build()
1269        ///     .unwrap();
1270        ///
1271        /// rt.spawn(async { panic!("boom"); });
1272        /// rt.spawn(async {
1273        ///     // This task never completes.
1274        /// });
1275        ///
1276        /// rt.block_on(async {
1277        ///     // Do some work
1278        /// # loop { tokio::task::yield_now().await; }
1279        /// })
1280        /// # }
1281        /// ```
1282        ///
1283        /// [`JoinHandle`]: struct@crate::task::JoinHandle
1284        /// [tokio-rs/tokio#4516]: https://github.com/tokio-rs/tokio/issues/4516
1285        pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self {
1286            if !matches!(self.kind, Kind::CurrentThread) && matches!(behavior, UnhandledPanic::ShutdownRuntime) {
1287                panic!("UnhandledPanic::ShutdownRuntime is only supported in current thread runtime");
1288            }
1289
1290            self.unhandled_panic = behavior;
1291            self
1292        }
1293
1294        /// Disables the LIFO task scheduler heuristic.
1295        ///
1296        /// The multi-threaded scheduler includes a heuristic for optimizing
1297        /// message-passing patterns. This heuristic results in the **last**
1298        /// scheduled task being polled first.
1299        ///
1300        /// To implement this heuristic, each worker thread has a slot which
1301        /// holds the task that should be polled next. In earlier versions of
1302        /// Tokio, this slot could not be stolen by other worker threads, which
1303        /// can result in lower total throughput when tasks tend to have longer
1304        /// poll times.
1305        ///
1306        /// This configuration option will disable this heuristic resulting in
1307        /// all scheduled tasks being pushed into the worker-local queue. This
1308        /// was intended as a workaround for the LIFO slot not being stealable.
1309        /// As of Tokio 1.51, tasks can be stolen from the LIFO slot. In a
1310        /// future version, this option may be deprecated.
1311        ///
1312        /// # Unstable
1313        ///
1314        /// This configuration option was considered a workaround for the LIFO
1315        /// slot not being stealable. Since this is no longer the case, we will
1316        /// revisit whether or not this option is necessary. See
1317        /// issue [tokio-rs/tokio#4941].
1318        ///
1319        /// # Examples
1320        ///
1321        /// ```
1322        /// # #[cfg(not(target_family = "wasm"))]
1323        /// # {
1324        /// use tokio::runtime;
1325        ///
1326        /// let rt = runtime::Builder::new_multi_thread()
1327        ///     .disable_lifo_slot()
1328        ///     .build()
1329        ///     .unwrap();
1330        /// # }
1331        /// ```
1332        ///
1333        /// [tokio-rs/tokio-metrics]: https://github.com/tokio-rs/tokio-metrics
1334        /// [tokio-rs/tokio#4941]: https://github.com/tokio-rs/tokio/issues/4941
1335        pub fn disable_lifo_slot(&mut self) -> &mut Self {
1336            self.disable_lifo_slot = true;
1337            self
1338        }
1339
1340        /// Specifies the random number generation seed to use within all
1341        /// threads associated with the runtime being built.
1342        ///
1343        /// This option is intended to make certain parts of the runtime
1344        /// deterministic (e.g. the [`tokio::select!`] macro). In the case of
1345        /// [`tokio::select!`] it will ensure that the order that branches are
1346        /// polled is deterministic.
1347        ///
1348        /// In addition to the code specifying `rng_seed` and interacting with
1349        /// the runtime, the internals of Tokio and the Rust compiler may affect
1350        /// the sequences of random numbers. In order to ensure repeatable
1351        /// results, the version of Tokio, the versions of all other
1352        /// dependencies that interact with Tokio, and the Rust compiler version
1353        /// should also all remain constant.
1354        ///
1355        /// # Examples
1356        ///
1357        /// ```
1358        /// # use tokio::runtime::{self, RngSeed};
1359        /// # pub fn main() {
1360        /// let seed = RngSeed::from_bytes(b"place your seed here");
1361        /// let rt = runtime::Builder::new_current_thread()
1362        ///     .rng_seed(seed)
1363        ///     .build();
1364        /// # }
1365        /// ```
1366        ///
1367        /// [`tokio::select!`]: crate::select
1368        pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self {
1369            self.seed_generator = RngSeedGenerator::new(seed);
1370            self
1371        }
1372    }
1373
1374    cfg_unstable_metrics! {
1375        /// Enables tracking the distribution of task poll times.
1376        ///
1377        /// Task poll times are not instrumented by default as doing so requires
1378        /// calling [`Instant::now()`] twice per task poll, which could add
1379        /// measurable overhead. Use the [`Handle::metrics()`] to access the
1380        /// metrics data.
1381        ///
1382        /// The histogram uses fixed bucket sizes. In other words, the histogram
1383        /// buckets are not dynamic based on input values. Use the
1384        /// `metrics_poll_time_histogram` builder methods to configure the
1385        /// histogram details.
1386        ///
1387        /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1388        /// This has an extremely low memory footprint, but may not provide enough granularity. For
1389        /// better granularity with low memory usage, use [`metrics_poll_time_histogram_configuration()`]
1390        /// to select [`LogHistogram`] instead.
1391        ///
1392        /// # Examples
1393        ///
1394        /// ```
1395        /// # #[cfg(not(target_family = "wasm"))]
1396        /// # {
1397        /// use tokio::runtime;
1398        ///
1399        /// let rt = runtime::Builder::new_multi_thread()
1400        ///     .enable_metrics_poll_time_histogram()
1401        ///     .build()
1402        ///     .unwrap();
1403        /// # // Test default values here
1404        /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) }
1405        /// # let m = rt.handle().metrics();
1406        /// # assert_eq!(m.poll_time_histogram_num_buckets(), 10);
1407        /// # assert_eq!(m.poll_time_histogram_bucket_range(0), us(0)..us(100));
1408        /// # assert_eq!(m.poll_time_histogram_bucket_range(1), us(100)..us(200));
1409        /// # }
1410        /// ```
1411        ///
1412        /// [`Handle::metrics()`]: crate::runtime::Handle::metrics
1413        /// [`Instant::now()`]: std::time::Instant::now
1414        /// [`LogHistogram`]: crate::runtime::LogHistogram
1415        /// [`metrics_poll_time_histogram_configuration()`]: Builder::metrics_poll_time_histogram_configuration
1416        pub fn enable_metrics_poll_time_histogram(&mut self) -> &mut Self {
1417            self.metrics_poll_count_histogram_enable = true;
1418            self
1419        }
1420
1421        /// Deprecated. Use [`enable_metrics_poll_time_histogram()`] instead.
1422        ///
1423        /// [`enable_metrics_poll_time_histogram()`]: Builder::enable_metrics_poll_time_histogram
1424        #[deprecated(note = "`poll_count_histogram` related methods have been renamed `poll_time_histogram` to better reflect their functionality.")]
1425        #[doc(hidden)]
1426        pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self {
1427            self.enable_metrics_poll_time_histogram()
1428        }
1429
1430        /// Sets the histogram scale for tracking the distribution of task poll
1431        /// times.
1432        ///
1433        /// Tracking the distribution of task poll times can be done using a
1434        /// linear or log scale. When using linear scale, each histogram bucket
1435        /// will represent the same range of poll times. When using log scale,
1436        /// each histogram bucket will cover a range twice as big as the
1437        /// previous bucket.
1438        ///
1439        /// **Default:** linear scale.
1440        ///
1441        /// # Examples
1442        ///
1443        /// ```
1444        /// # #[cfg(not(target_family = "wasm"))]
1445        /// # {
1446        /// use tokio::runtime::{self, HistogramScale};
1447        ///
1448        /// # #[allow(deprecated)]
1449        /// let rt = runtime::Builder::new_multi_thread()
1450        ///     .enable_metrics_poll_time_histogram()
1451        ///     .metrics_poll_count_histogram_scale(HistogramScale::Log)
1452        ///     .build()
1453        ///     .unwrap();
1454        /// # }
1455        /// ```
1456        #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1457        pub fn metrics_poll_count_histogram_scale(&mut self, histogram_scale: crate::runtime::HistogramScale) -> &mut Self {
1458            self.metrics_poll_count_histogram.legacy_mut(|b|b.scale = histogram_scale);
1459            self
1460        }
1461
1462        /// Configure the histogram for tracking poll times
1463        ///
1464        /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1465        /// This has an extremely low memory footprint, but may not provide enough granularity. For
1466        /// better granularity with low memory usage, use [`LogHistogram`] instead.
1467        ///
1468        /// # Examples
1469        /// Configure a [`LogHistogram`] with [default configuration]:
1470        /// ```
1471        /// # #[cfg(not(target_family = "wasm"))]
1472        /// # {
1473        /// use tokio::runtime;
1474        /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1475        ///
1476        /// let rt = runtime::Builder::new_multi_thread()
1477        ///     .enable_metrics_poll_time_histogram()
1478        ///     .metrics_poll_time_histogram_configuration(
1479        ///         HistogramConfiguration::log(LogHistogram::default())
1480        ///     )
1481        ///     .build()
1482        ///     .unwrap();
1483        /// # }
1484        /// ```
1485        ///
1486        /// Configure a linear histogram with 100 buckets, each 10μs wide
1487        /// ```
1488        /// # #[cfg(not(target_family = "wasm"))]
1489        /// # {
1490        /// use tokio::runtime;
1491        /// use std::time::Duration;
1492        /// use tokio::runtime::HistogramConfiguration;
1493        ///
1494        /// let rt = runtime::Builder::new_multi_thread()
1495        ///     .enable_metrics_poll_time_histogram()
1496        ///     .metrics_poll_time_histogram_configuration(
1497        ///         HistogramConfiguration::linear(Duration::from_micros(10), 100)
1498        ///     )
1499        ///     .build()
1500        ///     .unwrap();
1501        /// # }
1502        /// ```
1503        ///
1504        /// Configure a [`LogHistogram`] with the following settings:
1505        /// - Measure times from 100ns to 120s
1506        /// - Max error of 0.1
1507        /// - No more than 1024 buckets
1508        /// ```
1509        /// # #[cfg(not(target_family = "wasm"))]
1510        /// # {
1511        /// use std::time::Duration;
1512        /// use tokio::runtime;
1513        /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1514        ///
1515        /// let rt = runtime::Builder::new_multi_thread()
1516        ///     .enable_metrics_poll_time_histogram()
1517        ///     .metrics_poll_time_histogram_configuration(
1518        ///         HistogramConfiguration::log(LogHistogram::builder()
1519        ///             .max_value(Duration::from_secs(120))
1520        ///             .min_value(Duration::from_nanos(100))
1521        ///             .max_error(0.1)
1522        ///             .max_buckets(1024)
1523        ///             .expect("configuration uses 488 buckets")
1524        ///         )
1525        ///     )
1526        ///     .build()
1527        ///     .unwrap();
1528        /// # }
1529        /// ```
1530        ///
1531        /// When migrating from the legacy histogram ([`HistogramScale::Log`]) and wanting
1532        /// to match the previous behavior, use `precision_exact(0)`. This creates a histogram
1533        /// where each bucket is twice the size of the previous bucket.
1534        /// ```rust
1535        /// use std::time::Duration;
1536        /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1537        /// let rt = tokio::runtime::Builder::new_current_thread()
1538        ///     .enable_all()
1539        ///     .enable_metrics_poll_time_histogram()
1540        ///     .metrics_poll_time_histogram_configuration(HistogramConfiguration::log(
1541        ///         LogHistogram::builder()
1542        ///             .min_value(Duration::from_micros(20))
1543        ///             .max_value(Duration::from_millis(4))
1544        ///             // Set `precision_exact` to `0` to match `HistogramScale::Log`
1545        ///             .precision_exact(0)
1546        ///             .max_buckets(10)
1547        ///             .unwrap(),
1548        ///     ))
1549        ///     .build()
1550        ///     .unwrap();
1551        /// ```
1552        ///
1553        /// [`LogHistogram`]: crate::runtime::LogHistogram
1554        /// [default configuration]: crate::runtime::LogHistogramBuilder
1555        /// [`HistogramScale::Log`]: crate::runtime::HistogramScale::Log
1556        pub fn metrics_poll_time_histogram_configuration(&mut self, configuration: HistogramConfiguration) -> &mut Self {
1557            self.metrics_poll_count_histogram.histogram_type = configuration.inner;
1558            self
1559        }
1560
1561        /// Sets the histogram resolution for tracking the distribution of task
1562        /// poll times.
1563        ///
1564        /// The resolution is the histogram's first bucket's range. When using a
1565        /// linear histogram scale, each bucket will cover the same range. When
1566        /// using a log scale, each bucket will cover a range twice as big as
1567        /// the previous bucket. In the log case, the resolution represents the
1568        /// smallest bucket range.
1569        ///
1570        /// Note that, when using log scale, the resolution is rounded up to the
1571        /// nearest power of 2 in nanoseconds.
1572        ///
1573        /// **Default:** 100 microseconds.
1574        ///
1575        /// # Examples
1576        ///
1577        /// ```
1578        /// # #[cfg(not(target_family = "wasm"))]
1579        /// # {
1580        /// use tokio::runtime;
1581        /// use std::time::Duration;
1582        ///
1583        /// # #[allow(deprecated)]
1584        /// let rt = runtime::Builder::new_multi_thread()
1585        ///     .enable_metrics_poll_time_histogram()
1586        ///     .metrics_poll_count_histogram_resolution(Duration::from_micros(100))
1587        ///     .build()
1588        ///     .unwrap();
1589        /// # }
1590        /// ```
1591        #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1592        pub fn metrics_poll_count_histogram_resolution(&mut self, resolution: Duration) -> &mut Self {
1593            assert!(resolution > Duration::from_secs(0));
1594            // Sanity check the argument and also make the cast below safe.
1595            assert!(resolution <= Duration::from_secs(1));
1596
1597            let resolution = resolution.as_nanos() as u64;
1598
1599            self.metrics_poll_count_histogram.legacy_mut(|b|b.resolution = resolution);
1600            self
1601        }
1602
1603        /// Sets the number of buckets for the histogram tracking the
1604        /// distribution of task poll times.
1605        ///
1606        /// The last bucket tracks all greater values that fall out of other
1607        /// ranges. So, configuring the histogram using a linear scale,
1608        /// resolution of 50ms, and 10 buckets, the 10th bucket will track task
1609        /// polls that take more than 450ms to complete.
1610        ///
1611        /// **Default:** 10
1612        ///
1613        /// # Examples
1614        ///
1615        /// ```
1616        /// # #[cfg(not(target_family = "wasm"))]
1617        /// # {
1618        /// use tokio::runtime;
1619        ///
1620        /// # #[allow(deprecated)]
1621        /// let rt = runtime::Builder::new_multi_thread()
1622        ///     .enable_metrics_poll_time_histogram()
1623        ///     .metrics_poll_count_histogram_buckets(15)
1624        ///     .build()
1625        ///     .unwrap();
1626        /// # }
1627        /// ```
1628        #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1629        pub fn metrics_poll_count_histogram_buckets(&mut self, buckets: usize) -> &mut Self {
1630            self.metrics_poll_count_histogram.legacy_mut(|b|b.num_buckets = buckets);
1631            self
1632        }
1633    }
1634
1635    fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
1636        use crate::runtime::runtime::Scheduler;
1637
1638        let (scheduler, handle, blocking_pool) =
1639            self.build_current_thread_runtime_components(None)?;
1640
1641        Ok(Runtime::from_parts(
1642            Scheduler::CurrentThread(scheduler),
1643            handle,
1644            blocking_pool,
1645        ))
1646    }
1647
1648    fn build_current_thread_local_runtime(&mut self) -> io::Result<LocalRuntime> {
1649        use crate::runtime::local_runtime::LocalRuntimeScheduler;
1650
1651        let tid = std::thread::current().id();
1652
1653        let (scheduler, handle, blocking_pool) =
1654            self.build_current_thread_runtime_components(Some(tid))?;
1655
1656        Ok(LocalRuntime::from_parts(
1657            LocalRuntimeScheduler::CurrentThread(scheduler),
1658            handle,
1659            blocking_pool,
1660        ))
1661    }
1662
1663    fn build_current_thread_runtime_components(
1664        &mut self,
1665        local_tid: Option<ThreadId>,
1666    ) -> io::Result<(CurrentThread, Handle, BlockingPool)> {
1667        use crate::runtime::scheduler;
1668        use crate::runtime::Config;
1669
1670        let mut cfg = self.get_cfg();
1671        cfg.timer_flavor = TimerFlavor::Traditional;
1672        let (driver, driver_handle) = driver::Driver::new(cfg)?;
1673
1674        // Blocking pool
1675        let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
1676        let blocking_spawner = blocking_pool.spawner().clone();
1677
1678        // Generate a rng seed for this runtime.
1679        let seed_generator_1 = self.seed_generator.next_generator();
1680        let seed_generator_2 = self.seed_generator.next_generator();
1681
1682        // And now put a single-threaded scheduler on top of the timer. When
1683        // there are no futures ready to do something, it'll let the timer or
1684        // the reactor to generate some new stimuli for the futures to continue
1685        // in their life.
1686        let (scheduler, handle) = CurrentThread::new(
1687            driver,
1688            driver_handle,
1689            blocking_spawner,
1690            seed_generator_2,
1691            Config {
1692                before_park: self.before_park.clone(),
1693                after_unpark: self.after_unpark.clone(),
1694                before_spawn: self.before_spawn.clone(),
1695                #[cfg(tokio_unstable)]
1696                before_poll: self.before_poll.clone(),
1697                #[cfg(tokio_unstable)]
1698                after_poll: self.after_poll.clone(),
1699                after_termination: self.after_termination.clone(),
1700                global_queue_interval: self.global_queue_interval,
1701                event_interval: self.event_interval,
1702                #[cfg(tokio_unstable)]
1703                unhandled_panic: self.unhandled_panic.clone(),
1704                disable_lifo_slot: self.disable_lifo_slot,
1705                // This setting never makes sense for a current thread runtime,
1706                // as it only configures how the I/O driver is stolen across
1707                // workers.
1708                enable_eager_driver_handoff: false,
1709                seed_generator: seed_generator_1,
1710                metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1711            },
1712            local_tid,
1713            self.name.clone(),
1714        );
1715
1716        let handle = Handle {
1717            inner: scheduler::Handle::CurrentThread(handle),
1718        };
1719
1720        Ok((scheduler, handle, blocking_pool))
1721    }
1722
1723    fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> {
1724        if self.metrics_poll_count_histogram_enable {
1725            Some(self.metrics_poll_count_histogram.clone())
1726        } else {
1727            None
1728        }
1729    }
1730}
1731
1732cfg_io_driver! {
1733    impl Builder {
1734        /// Enables the I/O driver.
1735        ///
1736        /// Doing this enables using net, process, signal, and some I/O types on
1737        /// the runtime.
1738        ///
1739        /// # Examples
1740        ///
1741        /// ```
1742        /// use tokio::runtime;
1743        ///
1744        /// let rt = runtime::Builder::new_multi_thread()
1745        ///     .enable_io()
1746        ///     .build()
1747        ///     .unwrap();
1748        /// ```
1749        pub fn enable_io(&mut self) -> &mut Self {
1750            self.enable_io = true;
1751            self
1752        }
1753
1754        /// Enables the I/O driver and configures the max number of events to be
1755        /// processed per tick.
1756        ///
1757        /// # Examples
1758        ///
1759        /// ```
1760        /// use tokio::runtime;
1761        ///
1762        /// let rt = runtime::Builder::new_current_thread()
1763        ///     .enable_io()
1764        ///     .max_io_events_per_tick(1024)
1765        ///     .build()
1766        ///     .unwrap();
1767        /// ```
1768        pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self {
1769            self.nevents = capacity;
1770            self
1771        }
1772    }
1773}
1774
1775cfg_time! {
1776    impl Builder {
1777        /// Enables the time driver.
1778        ///
1779        /// Doing this enables using `tokio::time` on the runtime.
1780        ///
1781        /// # Examples
1782        ///
1783        /// ```
1784        /// # #[cfg(not(target_family = "wasm"))]
1785        /// # {
1786        /// use tokio::runtime;
1787        ///
1788        /// let rt = runtime::Builder::new_multi_thread()
1789        ///     .enable_time()
1790        ///     .build()
1791        ///     .unwrap();
1792        /// # }
1793        /// ```
1794        pub fn enable_time(&mut self) -> &mut Self {
1795            self.enable_time = true;
1796            self
1797        }
1798    }
1799}
1800
1801cfg_io_uring! {
1802    impl Builder {
1803        /// Enables the tokio's io_uring driver.
1804        ///
1805        /// Doing this enables using io_uring operations on the runtime.
1806        ///
1807        /// # Examples
1808        ///
1809        /// ```
1810        /// use tokio::runtime;
1811        ///
1812        /// let rt = runtime::Builder::new_multi_thread()
1813        ///     .enable_io_uring()
1814        ///     .build()
1815        ///     .unwrap();
1816        /// ```
1817        #[cfg_attr(docsrs, doc(cfg(feature = "io-uring")))]
1818        pub fn enable_io_uring(&mut self) -> &mut Self {
1819            // Currently, the uring flag is equivalent to `enable_io`.
1820            self.enable_io = true;
1821            self
1822        }
1823    }
1824}
1825
1826cfg_test_util! {
1827    impl Builder {
1828        /// Controls if the runtime's clock starts paused or advancing.
1829        ///
1830        /// Pausing time requires the current-thread runtime; construction of
1831        /// the runtime will panic otherwise.
1832        ///
1833        /// # Examples
1834        ///
1835        /// ```
1836        /// use tokio::runtime;
1837        ///
1838        /// let rt = runtime::Builder::new_current_thread()
1839        ///     .enable_time()
1840        ///     .start_paused(true)
1841        ///     .build()
1842        ///     .unwrap();
1843        /// ```
1844        pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
1845            self.start_paused = start_paused;
1846            self
1847        }
1848    }
1849}
1850
1851cfg_rt_multi_thread! {
1852    impl Builder {
1853        fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
1854            use crate::loom::sys::num_cpus;
1855            use crate::runtime::{Config, runtime::Scheduler};
1856            use crate::runtime::scheduler::{self, MultiThread};
1857
1858            let worker_threads = self.worker_threads.unwrap_or_else(num_cpus);
1859
1860            let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
1861
1862            // Create the blocking pool
1863            let blocking_pool =
1864                blocking::create_blocking_pool(self, self.max_blocking_threads + worker_threads);
1865            let blocking_spawner = blocking_pool.spawner().clone();
1866
1867            // Generate a rng seed for this runtime.
1868            let seed_generator_1 = self.seed_generator.next_generator();
1869            let seed_generator_2 = self.seed_generator.next_generator();
1870
1871            let (scheduler, handle, launch) = MultiThread::new(
1872                worker_threads,
1873                driver,
1874                driver_handle,
1875                blocking_spawner,
1876                seed_generator_2,
1877                Config {
1878                    before_park: self.before_park.clone(),
1879                    after_unpark: self.after_unpark.clone(),
1880                    before_spawn: self.before_spawn.clone(),
1881                    #[cfg(tokio_unstable)]
1882                    before_poll: self.before_poll.clone(),
1883                    #[cfg(tokio_unstable)]
1884                    after_poll: self.after_poll.clone(),
1885                    after_termination: self.after_termination.clone(),
1886                    global_queue_interval: self.global_queue_interval,
1887                    event_interval: self.event_interval,
1888                    #[cfg(tokio_unstable)]
1889                    unhandled_panic: self.unhandled_panic.clone(),
1890                    disable_lifo_slot: self.disable_lifo_slot,
1891                    enable_eager_driver_handoff: self.enable_eager_driver_handoff,
1892                    seed_generator: seed_generator_1,
1893                    metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1894                },
1895                self.timer_flavor,
1896                self.name.clone(),
1897            );
1898
1899            let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };
1900
1901            // Spawn the thread pool workers
1902            let _enter = handle.enter();
1903            launch.launch();
1904
1905            Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
1906        }
1907    }
1908}
1909
1910impl fmt::Debug for Builder {
1911    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1912        let mut debug = fmt.debug_struct("Builder");
1913
1914        if let Some(name) = &self.name {
1915            debug.field("name", name);
1916        }
1917
1918        debug
1919            .field("worker_threads", &self.worker_threads)
1920            .field("max_blocking_threads", &self.max_blocking_threads)
1921            .field(
1922                "thread_name",
1923                &"<dyn Fn() -> String + Send + Sync + 'static>",
1924            )
1925            .field("thread_stack_size", &self.thread_stack_size)
1926            .field("after_start", &self.after_start.as_ref().map(|_| "..."))
1927            .field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
1928            .field("before_park", &self.before_park.as_ref().map(|_| "..."))
1929            .field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."))
1930            .field(
1931                "enable_eager_driver_handoff",
1932                &self.enable_eager_driver_handoff,
1933            );
1934
1935        if self.name.is_none() {
1936            debug.finish_non_exhaustive()
1937        } else {
1938            debug.finish()
1939        }
1940    }
1941}