tokio/runtime/builder.rs
1#![cfg_attr(loom, allow(unused_imports))]
2
3use crate::runtime::handle::Handle;
4use crate::runtime::{
5 blocking, driver, Callback, HistogramBuilder, Runtime, TaskCallback, TimerFlavor,
6};
7#[cfg(tokio_unstable)]
8use crate::runtime::{metrics::HistogramConfiguration, TaskMeta};
9
10use crate::runtime::{LocalOptions, LocalRuntime};
11use crate::util::rand::{RngSeed, RngSeedGenerator};
12
13use crate::runtime::blocking::BlockingPool;
14use crate::runtime::scheduler::CurrentThread;
15use std::fmt;
16use std::io;
17use std::thread::ThreadId;
18use std::time::Duration;
19
20/// Builds Tokio Runtime with custom configuration values.
21///
22/// Methods can be chained in order to set the configuration values. The
23/// Runtime is constructed by calling [`build`].
24///
25/// New instances of `Builder` are obtained via [`Builder::new_multi_thread`]
26/// or [`Builder::new_current_thread`].
27///
28/// See function level documentation for details on the various configuration
29/// settings.
30///
31/// [`build`]: method@Self::build
32/// [`Builder::new_multi_thread`]: method@Self::new_multi_thread
33/// [`Builder::new_current_thread`]: method@Self::new_current_thread
34///
35/// # Examples
36///
37/// ```
38/// # #[cfg(not(target_family = "wasm"))]
39/// # {
40/// use tokio::runtime::Builder;
41///
42/// fn main() {
43/// // build runtime
44/// let runtime = Builder::new_multi_thread()
45/// .worker_threads(4)
46/// .thread_name("my-custom-name")
47/// .thread_stack_size(3 * 1024 * 1024)
48/// .build()
49/// .unwrap();
50///
51/// // use runtime ...
52/// }
53/// # }
54/// ```
pub struct Builder {
    /// Runtime type
    kind: Kind,

    /// Name of the runtime.
    name: Option<String>,

    /// Whether or not to enable the I/O driver
    enable_io: bool,

    /// Numeric setting stored next to the I/O driver switch; `Builder::new`
    /// initializes it to `1024`.
    // NOTE(review): presumably the event capacity handed to the I/O driver —
    // confirm against the driver configuration code.
    nevents: usize,

    /// Whether or not to enable the time driver
    enable_time: bool,

    /// Whether or not the clock should start paused.
    start_paused: bool,

    /// The number of worker threads, used by Runtime.
    ///
    /// Only used when not using the current-thread executor.
    worker_threads: Option<usize>,

    /// Cap on thread usage: the limit for additional threads spawned for
    /// blocking operations.
    max_blocking_threads: usize,

    /// Name fn used for threads spawned by the runtime.
    pub(super) thread_name: ThreadNameFn,

    /// Stack size used for threads spawned by the runtime.
    pub(super) thread_stack_size: Option<usize>,

    /// Callback to run after each thread starts.
    pub(super) after_start: Option<Callback>,

    /// To run before each worker thread stops
    pub(super) before_stop: Option<Callback>,

    /// To run before each worker thread is parked.
    pub(super) before_park: Option<Callback>,

    /// To run after each thread is unparked.
    pub(super) after_unpark: Option<Callback>,

    /// To run before each task is spawned.
    pub(super) before_spawn: Option<TaskCallback>,

    /// To run before each poll
    #[cfg(tokio_unstable)]
    pub(super) before_poll: Option<TaskCallback>,

    /// To run after each poll
    #[cfg(tokio_unstable)]
    pub(super) after_poll: Option<TaskCallback>,

    /// To run after each task is terminated.
    pub(super) after_termination: Option<TaskCallback>,

    /// Customizable keep alive timeout for `BlockingPool`
    pub(super) keep_alive: Option<Duration>,

    /// How many ticks before pulling a task from the global/remote queue?
    ///
    /// When `None`, the value is unspecified and behavior details are left to
    /// the scheduler. Each scheduler flavor could choose to either pick its own
    /// default value or use some other strategy to decide when to poll from the
    /// global queue. For example, the multi-threaded scheduler uses a
    /// self-tuning strategy based on mean task poll times.
    pub(super) global_queue_interval: Option<u32>,

    /// How many ticks before yielding to the driver for timer and I/O events?
    pub(super) event_interval: u32,

    /// When true, the multi-threaded scheduler LIFO slot should not be used.
    ///
    /// This option should only be exposed as unstable.
    pub(super) disable_lifo_slot: bool,

    /// Specify a random number generator seed to provide deterministic results
    pub(super) seed_generator: RngSeedGenerator,

    /// When true, enables task poll count histogram instrumentation.
    pub(super) metrics_poll_count_histogram_enable: bool,

    /// Configures the task poll count histogram
    pub(super) metrics_poll_count_histogram: HistogramBuilder,

    /// How the runtime should respond to unhandled panics on spawned tasks.
    #[cfg(tokio_unstable)]
    pub(super) unhandled_panic: UnhandledPanic,

    /// Which timer implementation to use. `Builder::new` selects
    /// `TimerFlavor::Traditional`; `enable_alt_timer` switches this to
    /// `TimerFlavor::Alternative`.
    timer_flavor: TimerFlavor,

    /// Whether or not to enable eager hand-off for the I/O and time drivers (in
    /// `tokio_unstable`).
    enable_eager_driver_handoff: bool,
}
150
cfg_unstable! {
    /// How the runtime should respond to unhandled panics.
    ///
    /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic`
    /// to configure the runtime behavior when a spawned task panics.
    ///
    /// See [`Builder::unhandled_panic`] for more details.
    #[derive(Debug, Clone)]
    #[non_exhaustive]
    pub enum UnhandledPanic {
        /// The runtime should ignore panics on spawned tasks.
        ///
        /// The panic is forwarded to the task's [`JoinHandle`] and all spawned
        /// tasks continue running normally.
        ///
        /// This is the default behavior.
        ///
        /// # Examples
        ///
        /// ```
        /// # #[cfg(not(target_family = "wasm"))]
        /// # {
        /// use tokio::runtime::{self, UnhandledPanic};
        ///
        /// # pub fn main() {
        /// let rt = runtime::Builder::new_current_thread()
        ///     .unhandled_panic(UnhandledPanic::Ignore)
        ///     .build()
        ///     .unwrap();
        ///
        /// let task1 = rt.spawn(async { panic!("boom"); });
        /// let task2 = rt.spawn(async {
        ///     // This task completes normally
        ///     "done"
        /// });
        ///
        /// rt.block_on(async {
        ///     // The panic on the first task is forwarded to the `JoinHandle`
        ///     assert!(task1.await.is_err());
        ///
        ///     // The second task completes normally
        ///     assert!(task2.await.is_ok());
        /// })
        /// # }
        /// # }
        /// ```
        ///
        /// [`JoinHandle`]: struct@crate::task::JoinHandle
        Ignore,

        /// The runtime should shut down immediately if a spawned task panics.
        ///
        /// The runtime will shut down immediately even if the panicked task's
        /// [`JoinHandle`] is still available. All further spawned tasks will be
        /// immediately dropped and calls to [`Runtime::block_on`] will panic.
        ///
        /// # Examples
        ///
        /// ```should_panic
        /// use tokio::runtime::{self, UnhandledPanic};
        ///
        /// # pub fn main() {
        /// let rt = runtime::Builder::new_current_thread()
        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
        ///     .build()
        ///     .unwrap();
        ///
        /// rt.spawn(async { panic!("boom"); });
        /// rt.spawn(async {
        ///     // This task never completes.
        /// });
        ///
        /// rt.block_on(async {
        ///     // Do some work
        /// # loop { tokio::task::yield_now().await; }
        /// })
        /// # }
        /// ```
        ///
        /// [`JoinHandle`]: struct@crate::task::JoinHandle
        ShutdownRuntime,
    }
}
234
/// Shared, thread-safe closure that produces a thread name each time it is
/// called. Stored in `Builder::thread_name`.
pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
236
/// Scheduler flavor selected for the runtime being built; `Builder::build`
/// dispatches on this value.
#[derive(Clone, Copy)]
pub(crate) enum Kind {
    /// Selected by `Builder::new_current_thread`.
    CurrentThread,
    /// Selected by `Builder::new_multi_thread`; only exists when the
    /// `rt-multi-thread` feature is enabled.
    #[cfg(feature = "rt-multi-thread")]
    MultiThread,
}
243
244impl Builder {
245 /// Returns a new builder with the current thread scheduler selected.
246 ///
247 /// Configuration methods can be chained on the return value.
248 ///
249 /// To spawn non-`Send` tasks on the resulting runtime, combine it with a
250 /// [`LocalSet`], or call [`build_local`] to create a [`LocalRuntime`].
251 ///
252 /// [`LocalSet`]: crate::task::LocalSet
253 /// [`LocalRuntime`]: crate::runtime::LocalRuntime
254 /// [`build_local`]: crate::runtime::Builder::build_local
255 pub fn new_current_thread() -> Builder {
256 #[cfg(loom)]
257 const EVENT_INTERVAL: u32 = 4;
258 // The number `61` is fairly arbitrary. I believe this value was copied from golang.
259 #[cfg(not(loom))]
260 const EVENT_INTERVAL: u32 = 61;
261
262 Builder::new(Kind::CurrentThread, EVENT_INTERVAL)
263 }
264
265 /// Returns a new builder with the multi thread scheduler selected.
266 ///
267 /// Configuration methods can be chained on the return value.
#[cfg(feature = "rt-multi-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
pub fn new_multi_thread() -> Builder {
    // Number of ticks between yields to the driver.
    // The number `61` is fairly arbitrary. I believe this value was copied from golang.
    const EVENT_INTERVAL: u32 = 61;

    Self::new(Kind::MultiThread, EVENT_INTERVAL)
}
274
275 /// Returns a new runtime builder initialized with default configuration
276 /// values.
277 ///
278 /// Configuration methods can be chained on the return value.
279 pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder {
280 Builder {
281 kind,
282
283 // Default runtime name
284 name: None,
285
286 // I/O defaults to "off"
287 enable_io: false,
288 nevents: 1024,
289
290 // Time defaults to "off"
291 enable_time: false,
292
293 // The clock starts not-paused
294 start_paused: false,
295
296 // Read from environment variable first in multi-threaded mode.
297 // Default to lazy auto-detection (one thread per CPU core)
298 worker_threads: None,
299
300 max_blocking_threads: 512,
301
302 // Default thread name
303 thread_name: std::sync::Arc::new(|| "tokio-rt-worker".into()),
304
305 // Do not set a stack size by default
306 thread_stack_size: None,
307
308 // No worker thread callbacks
309 after_start: None,
310 before_stop: None,
311 before_park: None,
312 after_unpark: None,
313
314 before_spawn: None,
315 after_termination: None,
316
317 #[cfg(tokio_unstable)]
318 before_poll: None,
319 #[cfg(tokio_unstable)]
320 after_poll: None,
321
322 keep_alive: None,
323
324 // Defaults for these values depend on the scheduler kind, so we get them
325 // as parameters.
326 global_queue_interval: None,
327 event_interval,
328
329 seed_generator: RngSeedGenerator::new(RngSeed::new()),
330
331 #[cfg(tokio_unstable)]
332 unhandled_panic: UnhandledPanic::Ignore,
333
334 metrics_poll_count_histogram_enable: false,
335
336 metrics_poll_count_histogram: HistogramBuilder::default(),
337
338 disable_lifo_slot: false,
339
340 timer_flavor: TimerFlavor::Traditional,
341
342 // Eager driver handoff is disabled by default.
343 enable_eager_driver_handoff: false,
344 }
345 }
346
347 /// Enables both I/O and time drivers.
348 ///
349 /// Doing this is a shorthand for calling `enable_io` and `enable_time`
350 /// individually. If additional components are added to Tokio in the future,
351 /// `enable_all` will include these future components.
352 ///
353 /// # Examples
354 ///
355 /// ```
356 /// # #[cfg(not(target_family = "wasm"))]
357 /// # {
358 /// use tokio::runtime;
359 ///
360 /// let rt = runtime::Builder::new_multi_thread()
361 /// .enable_all()
362 /// .build()
363 /// .unwrap();
364 /// # }
365 /// ```
366 pub fn enable_all(&mut self) -> &mut Self {
367 #[cfg(any(
368 feature = "net",
369 all(unix, feature = "process"),
370 all(unix, feature = "signal")
371 ))]
372 self.enable_io();
373
374 #[cfg(all(
375 tokio_unstable,
376 feature = "io-uring",
377 feature = "rt",
378 feature = "fs",
379 target_os = "linux",
380 ))]
381 self.enable_io_uring();
382
383 #[cfg(feature = "time")]
384 self.enable_time();
385
386 self
387 }
388
389 /// Enables the alternative timer implementation, which is disabled by default.
390 ///
391 /// The alternative timer implementation is an unstable feature that may
392 /// provide better performance on multi-threaded runtimes with a large number
393 /// of worker threads.
394 ///
395 /// This option only applies to multi-threaded runtimes. Attempting to use
396 /// this option with any other runtime type will have no effect.
397 ///
398 /// [Click here to share your experience with the alternative timer](https://github.com/tokio-rs/tokio/issues/7745)
399 ///
400 /// # Examples
401 ///
402 /// ```
403 /// # #[cfg(not(target_family = "wasm"))]
404 /// # {
405 /// use tokio::runtime;
406 ///
407 /// let rt = runtime::Builder::new_multi_thread()
408 /// .enable_alt_timer()
409 /// .build()
410 /// .unwrap();
411 /// # }
412 /// ```
#[cfg(all(tokio_unstable, feature = "time", feature = "rt-multi-thread"))]
#[cfg_attr(
    docsrs,
    doc(cfg(all(tokio_unstable, feature = "time", feature = "rt-multi-thread")))
)]
pub fn enable_alt_timer(&mut self) -> &mut Self {
    // Turn the time driver on first, then record which timer flavor to use.
    self.enable_time();

    let flavor = TimerFlavor::Alternative;
    self.timer_flavor = flavor;

    self
}
423
424 /// Enable eager hand-off of the I/O and time drivers for multi-threaded
425 /// runtimes, which is disabled by default.
426 ///
427 /// When this option is enabled, a worker thread which has parked on the I/O
428 /// or time driver will notify another worker thread once it is preparing to
429 /// begin polling a task from the run queue, so that the notified worker can
430 /// begin polling the I/O or time driver. This can reduce the latency with
431 /// which I/O and timer notifications are processed, especially when some
432 /// tasks have polls that take a long time to complete. In addition, it can
433 /// reduce the risk of a deadlock which may occur when a task blocks the
434 /// worker thread which is holding the I/O or time driver until some other
435 /// task, which is waiting for a notification from *that* driver, unblocks
436 /// it.
437 ///
438 /// This option is disabled by default, as enabling it may potentially
439 /// increase contention due to extra synchronization in cross-driver
440 /// wakeups.
441 ///
442 /// This option only applies to multi-threaded runtimes. Attempting to use
443 /// this option with any other runtime type will have no effect.
444 ///
445 /// **Note**: This is an [unstable API][unstable]. Eager driver hand-off is
446 /// an experimental feature whose behavior may be removed or changed in 1.x
447 /// releases. See [the documentation on unstable features][unstable] for
448 /// details.
449 ///
450 /// [unstable]: crate#unstable-features
#[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "rt-multi-thread"))))]
pub fn enable_eager_driver_handoff(&mut self) -> &mut Self {
    // Only flips the flag; the builder stores it for later use.
    let enabled = true;
    self.enable_eager_driver_handoff = enabled;
    self
}
457
458 /// Sets the number of worker threads the `Runtime` will use.
459 ///
460 /// This can be any number above 0 though it is advised to keep this value
461 /// on the smaller side.
462 ///
463 /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`.
464 ///
465 /// # Default
466 ///
467 /// The default value is the number of cores available to the system.
468 ///
469 /// When using the `current_thread` runtime this method has no effect.
470 ///
471 /// # Examples
472 ///
473 /// ## Multi threaded runtime with 4 threads
474 ///
475 /// ```
476 /// # #[cfg(not(target_family = "wasm"))]
477 /// # {
478 /// use tokio::runtime;
479 ///
480 /// // This will spawn a work-stealing runtime with 4 worker threads.
481 /// let rt = runtime::Builder::new_multi_thread()
482 /// .worker_threads(4)
483 /// .build()
484 /// .unwrap();
485 ///
486 /// rt.spawn(async move {});
487 /// # }
488 /// ```
489 ///
490 /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`)
491 ///
492 /// ```
493 /// use tokio::runtime;
494 ///
495 /// // Create a runtime that _must_ be driven from a call
496 /// // to `Runtime::block_on`.
497 /// let rt = runtime::Builder::new_current_thread()
498 /// .build()
499 /// .unwrap();
500 ///
501 /// // This will run the runtime and future on the current thread
502 /// rt.block_on(async move {});
503 /// ```
504 ///
505 /// # Panics
506 ///
507 /// This will panic if `val` is not larger than `0`.
508 #[track_caller]
509 pub fn worker_threads(&mut self, val: usize) -> &mut Self {
510 assert!(val > 0, "Worker threads cannot be set to 0");
511 self.worker_threads = Some(val);
512 self
513 }
514
515 /// Specifies the limit for additional threads spawned by the Runtime.
516 ///
517 /// These threads are used for blocking operations like tasks spawned
518 /// through [`spawn_blocking`], this includes but is not limited to:
519 /// - [`fs`] operations
520 /// - dns resolution through [`ToSocketAddrs`]
521 /// - writing to [`Stdout`] or [`Stderr`]
522 /// - reading from [`Stdin`]
523 ///
524 /// Unlike the [`worker_threads`], they are not always active and will exit
525 /// if left idle for too long. You can change this timeout duration with [`thread_keep_alive`].
526 ///
527 /// It's recommended to not set this limit too low in order to avoid hanging on operations
528 /// requiring [`spawn_blocking`].
529 ///
530 /// The default value is 512.
531 ///
532 /// # Queue Behavior
533 ///
534 /// When a blocking task is submitted, it will be inserted into a queue. If available, one of
535 /// the idle threads will be notified to run the task. Otherwise, if the threshold set by this
536 /// method has not been reached, a new thread will be spawned. If no idle thread is available
537 /// and no more threads are allowed to be spawned, the task will remain in the queue until one
/// of the busy threads picks it up. Note that since the queue does not apply any backpressure,
539 /// it could potentially grow unbounded.
540 ///
541 /// # Panics
542 ///
543 /// This will panic if `val` is not larger than `0`.
544 ///
545 /// # Upgrading from 0.x
546 ///
547 /// In old versions `max_threads` limited both blocking and worker threads, but the
548 /// current `max_blocking_threads` does not include async worker threads in the count.
549 ///
550 /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
551 /// [`fs`]: mod@crate::fs
552 /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
553 /// [`Stdout`]: struct@crate::io::Stdout
554 /// [`Stdin`]: struct@crate::io::Stdin
555 /// [`Stderr`]: struct@crate::io::Stderr
556 /// [`worker_threads`]: Self::worker_threads
557 /// [`thread_keep_alive`]: Self::thread_keep_alive
558 #[track_caller]
559 #[cfg_attr(docsrs, doc(alias = "max_threads"))]
560 pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
561 assert!(val > 0, "Max blocking threads cannot be set to 0");
562 self.max_blocking_threads = val;
563 self
564 }
565
566 /// Sets name of threads spawned by the `Runtime`'s thread pool.
567 ///
568 /// The default name is "tokio-rt-worker".
569 ///
570 /// # Examples
571 ///
572 /// ```
573 /// # #[cfg(not(target_family = "wasm"))]
574 /// # {
575 /// # use tokio::runtime;
576 ///
577 /// # pub fn main() {
578 /// let rt = runtime::Builder::new_multi_thread()
579 /// .thread_name("my-pool")
580 /// .build();
581 /// # }
582 /// # }
583 /// ```
584 pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
585 let val = val.into();
586 self.thread_name = std::sync::Arc::new(move || val.clone());
587 self
588 }
589
590 /// Sets the name of the runtime.
591 ///
592 /// # Examples
593 ///
594 /// ```
595 /// # #[cfg(not(target_family = "wasm"))]
596 /// # {
597 /// # use tokio::runtime;
598 ///
599 /// # pub fn main() {
600 /// let rt = runtime::Builder::new_multi_thread()
601 /// .name("my-runtime")
602 /// .build();
603 /// # }
604 /// # }
605 /// ```
606 /// # Panics
607 ///
608 /// This function will panic if an empty value is passed as an argument.
609 ///
610 #[track_caller]
611 pub fn name(&mut self, val: impl Into<String>) -> &mut Self {
612 let val = val.into();
613 assert!(!val.trim().is_empty(), "runtime name shouldn't be empty");
614 self.name = Some(val);
615 self
616 }
617
618 /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool.
619 ///
620 /// The default name fn is `|| "tokio-rt-worker".into()`.
621 ///
622 /// # Examples
623 ///
624 /// ```
625 /// # #[cfg(not(target_family = "wasm"))]
626 /// # {
627 /// # use tokio::runtime;
628 /// # use std::sync::atomic::{AtomicUsize, Ordering};
629 /// # pub fn main() {
630 /// let rt = runtime::Builder::new_multi_thread()
631 /// .thread_name_fn(|| {
632 /// static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
633 /// let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
634 /// format!("my-pool-{}", id)
635 /// })
636 /// .build();
637 /// # }
638 /// # }
639 /// ```
640 pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self
641 where
642 F: Fn() -> String + Send + Sync + 'static,
643 {
644 self.thread_name = std::sync::Arc::new(f);
645 self
646 }
647
648 /// Sets the stack size (in bytes) for worker threads.
649 ///
650 /// The actual stack size may be greater than this value if the platform
651 /// specifies minimal stack size.
652 ///
653 /// The default stack size for spawned threads is 2 MiB, though this
654 /// particular stack size is subject to change in the future.
655 ///
656 /// # Examples
657 ///
658 /// ```
659 /// # #[cfg(not(target_family = "wasm"))]
660 /// # {
661 /// # use tokio::runtime;
662 ///
663 /// # pub fn main() {
664 /// let rt = runtime::Builder::new_multi_thread()
665 /// .thread_stack_size(32 * 1024)
666 /// .build();
667 /// # }
668 /// # }
669 /// ```
670 pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
671 self.thread_stack_size = Some(val);
672 self
673 }
674
675 /// Executes function `f` after each thread is started but before it starts
676 /// doing work.
677 ///
678 /// This is intended for bookkeeping and monitoring use cases.
679 ///
680 /// # Examples
681 ///
682 /// ```
683 /// # #[cfg(not(target_family = "wasm"))]
684 /// # {
685 /// # use tokio::runtime;
686 /// # pub fn main() {
687 /// let runtime = runtime::Builder::new_multi_thread()
688 /// .on_thread_start(|| {
689 /// println!("thread started");
690 /// })
691 /// .build();
692 /// # }
693 /// # }
694 /// ```
695 #[cfg(not(loom))]
696 pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
697 where
698 F: Fn() + Send + Sync + 'static,
699 {
700 self.after_start = Some(std::sync::Arc::new(f));
701 self
702 }
703
704 /// Executes function `f` before each thread stops.
705 ///
706 /// This is intended for bookkeeping and monitoring use cases.
707 ///
708 /// # Examples
709 ///
710 /// ```
711 /// # #[cfg(not(target_family = "wasm"))]
712 /// {
713 /// # use tokio::runtime;
714 /// # pub fn main() {
715 /// let runtime = runtime::Builder::new_multi_thread()
716 /// .on_thread_stop(|| {
717 /// println!("thread stopping");
718 /// })
719 /// .build();
720 /// # }
721 /// # }
722 /// ```
723 #[cfg(not(loom))]
724 pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
725 where
726 F: Fn() + Send + Sync + 'static,
727 {
728 self.before_stop = Some(std::sync::Arc::new(f));
729 self
730 }
731
732 /// Executes function `f` just before a thread is parked (goes idle).
733 /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
734 /// can be called, and may result in this thread being unparked immediately.
735 ///
736 /// This can be used to start work only when the executor is idle, or for bookkeeping
737 /// and monitoring purposes.
738 ///
739 /// Note: There can only be one park callback for a runtime; calling this function
740 /// more than once replaces the last callback defined, rather than adding to it.
741 ///
742 /// # Examples
743 ///
744 /// ## Multithreaded executor
745 /// ```
746 /// # #[cfg(not(target_family = "wasm"))]
747 /// # {
748 /// # use std::sync::Arc;
749 /// # use std::sync::atomic::{AtomicBool, Ordering};
750 /// # use tokio::runtime;
751 /// # use tokio::sync::Barrier;
752 /// # pub fn main() {
753 /// let once = AtomicBool::new(true);
754 /// let barrier = Arc::new(Barrier::new(2));
755 ///
756 /// let runtime = runtime::Builder::new_multi_thread()
757 /// .worker_threads(1)
758 /// .on_thread_park({
759 /// let barrier = barrier.clone();
760 /// move || {
761 /// let barrier = barrier.clone();
762 /// if once.swap(false, Ordering::Relaxed) {
763 /// tokio::spawn(async move { barrier.wait().await; });
764 /// }
765 /// }
766 /// })
767 /// .build()
768 /// .unwrap();
769 ///
770 /// runtime.block_on(async {
771 /// barrier.wait().await;
772 /// })
773 /// # }
774 /// # }
775 /// ```
776 /// ## Current thread executor
777 /// ```
778 /// # use std::sync::Arc;
779 /// # use std::sync::atomic::{AtomicBool, Ordering};
780 /// # use tokio::runtime;
781 /// # use tokio::sync::Barrier;
782 /// # pub fn main() {
783 /// let once = AtomicBool::new(true);
784 /// let barrier = Arc::new(Barrier::new(2));
785 ///
786 /// let runtime = runtime::Builder::new_current_thread()
787 /// .on_thread_park({
788 /// let barrier = barrier.clone();
789 /// move || {
790 /// let barrier = barrier.clone();
791 /// if once.swap(false, Ordering::Relaxed) {
792 /// tokio::spawn(async move { barrier.wait().await; });
793 /// }
794 /// }
795 /// })
796 /// .build()
797 /// .unwrap();
798 ///
799 /// runtime.block_on(async {
800 /// barrier.wait().await;
801 /// })
802 /// # }
803 /// ```
804 #[cfg(not(loom))]
805 pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
806 where
807 F: Fn() + Send + Sync + 'static,
808 {
809 self.before_park = Some(std::sync::Arc::new(f));
810 self
811 }
812
813 /// Executes function `f` just after a thread unparks (starts executing tasks).
814 ///
815 /// This is intended for bookkeeping and monitoring use cases; note that work
816 /// in this callback will increase latencies when the application has allowed one or
817 /// more runtime threads to go idle.
818 ///
819 /// Note: There can only be one unpark callback for a runtime; calling this function
820 /// more than once replaces the last callback defined, rather than adding to it.
821 ///
822 /// # Examples
823 ///
824 /// ```
825 /// # #[cfg(not(target_family = "wasm"))]
826 /// # {
827 /// # use tokio::runtime;
828 /// # pub fn main() {
829 /// let runtime = runtime::Builder::new_multi_thread()
830 /// .on_thread_unpark(|| {
831 /// println!("thread unparking");
832 /// })
833 /// .build();
834 ///
835 /// runtime.unwrap().block_on(async {
836 /// tokio::task::yield_now().await;
837 /// println!("Hello from Tokio!");
838 /// })
839 /// # }
840 /// # }
841 /// ```
842 #[cfg(not(loom))]
843 pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
844 where
845 F: Fn() + Send + Sync + 'static,
846 {
847 self.after_unpark = Some(std::sync::Arc::new(f));
848 self
849 }
850
851 /// Executes function `f` just before a task is spawned.
852 ///
853 /// `f` is called within the Tokio context, so functions like
854 /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
855 /// invoked immediately.
856 ///
857 /// This can be used for bookkeeping or monitoring purposes.
858 ///
859 /// Note: There can only be one spawn callback for a runtime; calling this function more
860 /// than once replaces the last callback defined, rather than adding to it.
861 ///
862 /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
863 ///
864 /// **Note**: This is an [unstable API][unstable]. The public API of this type
865 /// may break in 1.x releases. See [the documentation on unstable
866 /// features][unstable] for details.
867 ///
868 /// [unstable]: crate#unstable-features
869 ///
870 /// # Examples
871 ///
872 /// ```
873 /// # use tokio::runtime;
874 /// # pub fn main() {
875 /// let runtime = runtime::Builder::new_current_thread()
876 /// .on_task_spawn(|_| {
877 /// println!("spawning task");
878 /// })
879 /// .build()
880 /// .unwrap();
881 ///
882 /// runtime.block_on(async {
883 /// tokio::task::spawn(std::future::ready(()));
884 ///
885 /// for _ in 0..64 {
886 /// tokio::task::yield_now().await;
887 /// }
888 /// })
889 /// # }
890 /// ```
#[cfg(all(not(loom), tokio_unstable))]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
pub fn on_task_spawn<F>(&mut self, f: F) -> &mut Self
where
    F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
{
    // Only one spawn callback is kept: this replaces the previous one.
    let hook: TaskCallback = std::sync::Arc::new(f);
    self.before_spawn = Some(hook);
    self
}
900
901 /// Executes function `f` just before a task is polled
902 ///
903 /// `f` is called within the Tokio context, so functions like
904 /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
905 /// invoked immediately.
906 ///
907 /// **Note**: This is an [unstable API][unstable]. The public API of this type
908 /// may break in 1.x releases. See [the documentation on unstable
909 /// features][unstable] for details.
910 ///
911 /// [unstable]: crate#unstable-features
912 ///
913 /// # Examples
914 ///
915 /// ```
916 /// # #[cfg(not(target_family = "wasm"))]
917 /// # {
918 /// # use std::sync::{atomic::AtomicUsize, Arc};
919 /// # use tokio::task::yield_now;
920 /// # pub fn main() {
921 /// let poll_start_counter = Arc::new(AtomicUsize::new(0));
922 /// let poll_start = poll_start_counter.clone();
923 /// let rt = tokio::runtime::Builder::new_multi_thread()
924 /// .enable_all()
925 /// .on_before_task_poll(move |meta| {
926 /// println!("task {} is about to be polled", meta.id())
927 /// })
928 /// .build()
929 /// .unwrap();
930 /// let task = rt.spawn(async {
931 /// yield_now().await;
932 /// });
933 /// let _ = rt.block_on(task);
934 ///
935 /// # }
936 /// # }
937 /// ```
#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
pub fn on_before_task_poll<F>(&mut self, f: F) -> &mut Self
where
    F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
{
    // Overwrites any before-poll callback registered earlier.
    let hook: TaskCallback = std::sync::Arc::new(f);
    self.before_poll = Some(hook);
    self
}
947
948 /// Executes function `f` just after a task is polled
949 ///
950 /// `f` is called within the Tokio context, so functions like
951 /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
952 /// invoked immediately.
953 ///
954 /// **Note**: This is an [unstable API][unstable]. The public API of this type
955 /// may break in 1.x releases. See [the documentation on unstable
956 /// features][unstable] for details.
957 ///
958 /// [unstable]: crate#unstable-features
959 ///
960 /// # Examples
961 ///
962 /// ```
963 /// # #[cfg(not(target_family = "wasm"))]
964 /// # {
965 /// # use std::sync::{atomic::AtomicUsize, Arc};
966 /// # use tokio::task::yield_now;
967 /// # pub fn main() {
968 /// let poll_stop_counter = Arc::new(AtomicUsize::new(0));
969 /// let poll_stop = poll_stop_counter.clone();
970 /// let rt = tokio::runtime::Builder::new_multi_thread()
971 /// .enable_all()
972 /// .on_after_task_poll(move |meta| {
973 /// println!("task {} completed polling", meta.id());
974 /// })
975 /// .build()
976 /// .unwrap();
977 /// let task = rt.spawn(async {
978 /// yield_now().await;
979 /// });
980 /// let _ = rt.block_on(task);
981 ///
982 /// # }
983 /// # }
984 /// ```
#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
pub fn on_after_task_poll<F>(&mut self, f: F) -> &mut Self
where
    F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
{
    // Overwrites any after-poll callback registered earlier.
    let hook: TaskCallback = std::sync::Arc::new(f);
    self.after_poll = Some(hook);
    self
}
994
995 /// Executes function `f` just after a task is terminated.
996 ///
997 /// `f` is called within the Tokio context, so functions like
998 /// [`tokio::spawn`](crate::spawn) can be called.
999 ///
1000 /// This can be used for bookkeeping or monitoring purposes.
1001 ///
1002 /// Note: There can only be one task termination callback for a runtime; calling this
1003 /// function more than once replaces the last callback defined, rather than adding to it.
1004 ///
1005 /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
1006 ///
1007 /// **Note**: This is an [unstable API][unstable]. The public API of this type
1008 /// may break in 1.x releases. See [the documentation on unstable
1009 /// features][unstable] for details.
1010 ///
1011 /// [unstable]: crate#unstable-features
1012 ///
1013 /// # Examples
1014 ///
1015 /// ```
1016 /// # use tokio::runtime;
1017 /// # pub fn main() {
1018 /// let runtime = runtime::Builder::new_current_thread()
1019 /// .on_task_terminate(|_| {
1020 /// println!("killing task");
1021 /// })
1022 /// .build()
1023 /// .unwrap();
1024 ///
1025 /// runtime.block_on(async {
1026 /// tokio::task::spawn(std::future::ready(()));
1027 ///
1028 /// for _ in 0..64 {
1029 /// tokio::task::yield_now().await;
1030 /// }
1031 /// })
1032 /// # }
1033 /// ```
#[cfg(all(not(loom), tokio_unstable))]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
pub fn on_task_terminate<F>(&mut self, f: F) -> &mut Self
where
    F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
{
    // Wrap the closure in a shared pointer and store it; whatever was in
    // `after_termination` before this call is overwritten.
    let hook = std::sync::Arc::new(f);
    self.after_termination = Some(hook);
    self
}
1043
1044 /// Creates the configured `Runtime`.
1045 ///
1046 /// The returned `Runtime` instance is ready to spawn tasks.
1047 ///
1048 /// # Examples
1049 ///
1050 /// ```
1051 /// # #[cfg(not(target_family = "wasm"))]
1052 /// # {
1053 /// use tokio::runtime::Builder;
1054 ///
1055 /// let rt = Builder::new_multi_thread().build().unwrap();
1056 ///
1057 /// rt.block_on(async {
1058 /// println!("Hello from the Tokio runtime");
1059 /// });
1060 /// # }
1061 /// ```
1062 pub fn build(&mut self) -> io::Result<Runtime> {
1063 match &self.kind {
1064 Kind::CurrentThread => self.build_current_thread_runtime(),
1065 #[cfg(feature = "rt-multi-thread")]
1066 Kind::MultiThread => self.build_threaded_runtime(),
1067 }
1068 }
1069
1070 /// Creates the configured [`LocalRuntime`].
1071 ///
1072 /// The returned [`LocalRuntime`] instance is ready to spawn tasks.
1073 ///
1074 /// # Panics
1075 ///
1076 /// This will panic if the runtime is configured with [`new_multi_thread()`].
1077 ///
1078 /// [`new_multi_thread()`]: Builder::new_multi_thread
1079 ///
1080 /// # Examples
1081 ///
1082 /// ```
1083 /// use tokio::runtime::{Builder, LocalOptions};
1084 ///
1085 /// let rt = Builder::new_current_thread()
1086 /// .build_local(LocalOptions::default())
1087 /// .unwrap();
1088 ///
1089 /// rt.spawn_local(async {
1090 /// println!("Hello from the Tokio runtime");
1091 /// });
1092 /// ```
1093 #[allow(unused_variables, unreachable_patterns)]
1094 pub fn build_local(&mut self, options: LocalOptions) -> io::Result<LocalRuntime> {
1095 match &self.kind {
1096 Kind::CurrentThread => self.build_current_thread_local_runtime(),
1097 #[cfg(feature = "rt-multi-thread")]
1098 Kind::MultiThread => panic!("multi_thread is not supported for LocalRuntime"),
1099 }
1100 }
1101
1102 fn get_cfg(&self) -> driver::Cfg {
1103 driver::Cfg {
1104 enable_pause_time: match self.kind {
1105 Kind::CurrentThread => true,
1106 #[cfg(feature = "rt-multi-thread")]
1107 Kind::MultiThread => false,
1108 },
1109 enable_io: self.enable_io,
1110 enable_time: self.enable_time,
1111 start_paused: self.start_paused,
1112 nevents: self.nevents,
1113 timer_flavor: self.timer_flavor,
1114 }
1115 }
1116
1117 /// Sets a custom timeout for a thread in the blocking pool.
1118 ///
1119 /// By default, the timeout for a thread is set to 10 seconds. This can
1120 /// be overridden using `.thread_keep_alive()`.
1121 ///
1122 /// # Example
1123 ///
1124 /// ```
1125 /// # #[cfg(not(target_family = "wasm"))]
1126 /// # {
1127 /// # use tokio::runtime;
1128 /// # use std::time::Duration;
1129 /// # pub fn main() {
1130 /// let rt = runtime::Builder::new_multi_thread()
1131 /// .thread_keep_alive(Duration::from_millis(100))
1132 /// .build();
1133 /// # }
1134 /// # }
1135 /// ```
1136 pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
1137 self.keep_alive = Some(duration);
1138 self
1139 }
1140
1141 /// Sets the number of scheduler ticks after which the scheduler will poll the global
1142 /// task queue.
1143 ///
1144 /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
1145 ///
1146 /// By default the global queue interval is 31 for the current-thread scheduler. Please see
1147 /// [the module documentation] for the default behavior of the multi-thread scheduler.
1148 ///
1149 /// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming
1150 /// tasks. Setting the interval to a smaller value increases the fairness of the scheduler,
1151 /// at the cost of more synchronization overhead. That can be beneficial for prioritizing
1152 /// getting started on new work, especially if tasks frequently yield rather than complete
1153 /// or await on further I/O. Setting the interval to `1` will prioritize the global queue and
1154 /// tasks from the local queue will be executed only if the global queue is empty.
1155 /// Conversely, a higher value prioritizes existing work, and is a good choice when most
1156 /// tasks quickly complete polling.
1157 ///
1158 /// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing
1159 ///
1160 /// # Panics
1161 ///
1162 /// This function will panic if 0 is passed as an argument.
1163 ///
1164 /// # Examples
1165 ///
1166 /// ```
1167 /// # #[cfg(not(target_family = "wasm"))]
1168 /// # {
1169 /// # use tokio::runtime;
1170 /// # pub fn main() {
1171 /// let rt = runtime::Builder::new_multi_thread()
1172 /// .global_queue_interval(31)
1173 /// .build();
1174 /// # }
1175 /// # }
1176 /// ```
1177 #[track_caller]
1178 pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
1179 assert!(val > 0, "global_queue_interval must be greater than 0");
1180 self.global_queue_interval = Some(val);
1181 self
1182 }
1183
1184 /// Sets the number of scheduler ticks after which the scheduler will poll for
1185 /// external events (timers, I/O, and so on).
1186 ///
1187 /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
1188 ///
1189 /// By default, the event interval is `61` for all scheduler types.
1190 ///
1191 /// Setting the event interval determines the effective "priority" of delivering
1192 /// these external events (which may wake up additional tasks), compared to
1193 /// executing tasks that are currently ready to run. A smaller value is useful
1194 /// when tasks frequently spend a long time in polling, or infrequently yield,
1195 /// which can result in overly long delays picking up I/O events. Conversely,
1196 /// picking up new events requires extra synchronization and syscall overhead,
1197 /// so if tasks generally complete their polling quickly, a higher event interval
1198 /// will minimize that overhead while still keeping the scheduler responsive to
1199 /// events.
1200 ///
1201 /// # Panics
1202 ///
1203 /// This function will panic if 0 is passed as an argument.
1204 ///
1205 /// # Examples
1206 ///
1207 /// ```
1208 /// # #[cfg(not(target_family = "wasm"))]
1209 /// # {
1210 /// # use tokio::runtime;
1211 /// # pub fn main() {
1212 /// let rt = runtime::Builder::new_multi_thread()
1213 /// .event_interval(31)
1214 /// .build();
1215 /// # }
1216 /// # }
1217 /// ```
1218 #[track_caller]
1219 pub fn event_interval(&mut self, val: u32) -> &mut Self {
1220 assert!(val > 0, "event_interval must be greater than 0");
1221 self.event_interval = val;
1222 self
1223 }
1224
1225 cfg_unstable! {
1226 /// Configure how the runtime responds to an unhandled panic on a
1227 /// spawned task.
1228 ///
1229 /// By default, an unhandled panic (i.e. a panic not caught by
1230 /// [`std::panic::catch_unwind`]) has no impact on the runtime's
1231 /// execution. The panic's error value is forwarded to the task's
1232 /// [`JoinHandle`] and all other spawned tasks continue running.
1233 ///
1234 /// The `unhandled_panic` option enables configuring this behavior.
1235 ///
1236 /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
1237 /// spawned tasks have no impact on the runtime's execution.
1238 /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to
1239 /// shutdown immediately when a spawned task panics even if that
1240 /// task's `JoinHandle` has not been dropped. All other spawned tasks
1241 /// will immediately terminate and further calls to
1242 /// [`Runtime::block_on`] will panic.
1243 ///
1244 /// # Panics
1245 /// This method panics if called with [`UnhandledPanic::ShutdownRuntime`]
1246 /// on a runtime other than the current thread runtime.
1247 ///
1248 /// # Unstable
1249 ///
1250 /// This option is currently unstable and its implementation is
1251 /// incomplete. The API may change or be removed in the future. See
1252 /// issue [tokio-rs/tokio#4516] for more details.
1253 ///
1254 /// # Examples
1255 ///
1256 /// The following demonstrates a runtime configured to shutdown on
1257 /// panic. The first spawned task panics and results in the runtime
1258 /// shutting down. The second spawned task never has a chance to
1259 /// execute. The call to `block_on` will panic due to the runtime being
1260 /// forcibly shutdown.
1261 ///
1262 /// ```should_panic
1263 /// use tokio::runtime::{self, UnhandledPanic};
1264 ///
1265 /// # pub fn main() {
1266 /// let rt = runtime::Builder::new_current_thread()
1267 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime)
1268 /// .build()
1269 /// .unwrap();
1270 ///
1271 /// rt.spawn(async { panic!("boom"); });
1272 /// rt.spawn(async {
1273 /// // This task never completes.
1274 /// });
1275 ///
1276 /// rt.block_on(async {
1277 /// // Do some work
1278 /// # loop { tokio::task::yield_now().await; }
1279 /// })
1280 /// # }
1281 /// ```
1282 ///
1283 /// [`JoinHandle`]: struct@crate::task::JoinHandle
1284 /// [tokio-rs/tokio#4516]: https://github.com/tokio-rs/tokio/issues/4516
1285 pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self {
1286 if !matches!(self.kind, Kind::CurrentThread) && matches!(behavior, UnhandledPanic::ShutdownRuntime) {
1287 panic!("UnhandledPanic::ShutdownRuntime is only supported in current thread runtime");
1288 }
1289
1290 self.unhandled_panic = behavior;
1291 self
1292 }
1293
1294 /// Disables the LIFO task scheduler heuristic.
1295 ///
1296 /// The multi-threaded scheduler includes a heuristic for optimizing
1297 /// message-passing patterns. This heuristic results in the **last**
1298 /// scheduled task being polled first.
1299 ///
1300 /// To implement this heuristic, each worker thread has a slot which
1301 /// holds the task that should be polled next. In earlier versions of
1302 /// Tokio, this slot could not be stolen by other worker threads, which
1303 /// can result in lower total throughput when tasks tend to have longer
1304 /// poll times.
1305 ///
1306 /// This configuration option will disable this heuristic resulting in
1307 /// all scheduled tasks being pushed into the worker-local queue. This
1308 /// was intended as a workaround for the LIFO slot not being stealable.
1309 /// As of Tokio 1.51, tasks can be stolen from the LIFO slot. In a
1310 /// future version, this option may be deprecated.
1311 ///
1312 /// # Unstable
1313 ///
1314 /// This configuration option was considered a workaround for the LIFO
1315 /// slot not being stealable. Since this is no longer the case, we will
1316 /// revisit whether or not this option is necessary. See
1317 /// issue [tokio-rs/tokio#4941].
1318 ///
1319 /// # Examples
1320 ///
1321 /// ```
1322 /// # #[cfg(not(target_family = "wasm"))]
1323 /// # {
1324 /// use tokio::runtime;
1325 ///
1326 /// let rt = runtime::Builder::new_multi_thread()
1327 /// .disable_lifo_slot()
1328 /// .build()
1329 /// .unwrap();
1330 /// # }
1331 /// ```
1332 ///
1333 /// [tokio-rs/tokio-metrics]: https://github.com/tokio-rs/tokio-metrics
1334 /// [tokio-rs/tokio#4941]: https://github.com/tokio-rs/tokio/issues/4941
1335 pub fn disable_lifo_slot(&mut self) -> &mut Self {
1336 self.disable_lifo_slot = true;
1337 self
1338 }
1339
1340 /// Specifies the random number generation seed to use within all
1341 /// threads associated with the runtime being built.
1342 ///
1343 /// This option is intended to make certain parts of the runtime
1344 /// deterministic (e.g. the [`tokio::select!`] macro). In the case of
1345 /// [`tokio::select!`] it will ensure that the order that branches are
1346 /// polled is deterministic.
1347 ///
1348 /// In addition to the code specifying `rng_seed` and interacting with
1349 /// the runtime, the internals of Tokio and the Rust compiler may affect
1350 /// the sequences of random numbers. In order to ensure repeatable
1351 /// results, the version of Tokio, the versions of all other
1352 /// dependencies that interact with Tokio, and the Rust compiler version
1353 /// should also all remain constant.
1354 ///
1355 /// # Examples
1356 ///
1357 /// ```
1358 /// # use tokio::runtime::{self, RngSeed};
1359 /// # pub fn main() {
1360 /// let seed = RngSeed::from_bytes(b"place your seed here");
1361 /// let rt = runtime::Builder::new_current_thread()
1362 /// .rng_seed(seed)
1363 /// .build();
1364 /// # }
1365 /// ```
1366 ///
1367 /// [`tokio::select!`]: crate::select
1368 pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self {
1369 self.seed_generator = RngSeedGenerator::new(seed);
1370 self
1371 }
1372 }
1373
1374 cfg_unstable_metrics! {
1375 /// Enables tracking the distribution of task poll times.
1376 ///
1377 /// Task poll times are not instrumented by default as doing so requires
1378 /// calling [`Instant::now()`] twice per task poll, which could add
1379 /// measurable overhead. Use the [`Handle::metrics()`] to access the
1380 /// metrics data.
1381 ///
1382 /// The histogram uses fixed bucket sizes. In other words, the histogram
1383 /// buckets are not dynamic based on input values. Use the
1384 /// `metrics_poll_time_histogram` builder methods to configure the
1385 /// histogram details.
1386 ///
1387 /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1388 /// This has an extremely low memory footprint, but may not provide enough granularity. For
1389 /// better granularity with low memory usage, use [`metrics_poll_time_histogram_configuration()`]
1390 /// to select [`LogHistogram`] instead.
1391 ///
1392 /// # Examples
1393 ///
1394 /// ```
1395 /// # #[cfg(not(target_family = "wasm"))]
1396 /// # {
1397 /// use tokio::runtime;
1398 ///
1399 /// let rt = runtime::Builder::new_multi_thread()
1400 /// .enable_metrics_poll_time_histogram()
1401 /// .build()
1402 /// .unwrap();
1403 /// # // Test default values here
1404 /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) }
1405 /// # let m = rt.handle().metrics();
1406 /// # assert_eq!(m.poll_time_histogram_num_buckets(), 10);
1407 /// # assert_eq!(m.poll_time_histogram_bucket_range(0), us(0)..us(100));
1408 /// # assert_eq!(m.poll_time_histogram_bucket_range(1), us(100)..us(200));
1409 /// # }
1410 /// ```
1411 ///
1412 /// [`Handle::metrics()`]: crate::runtime::Handle::metrics
1413 /// [`Instant::now()`]: std::time::Instant::now
1414 /// [`LogHistogram`]: crate::runtime::LogHistogram
1415 /// [`metrics_poll_time_histogram_configuration()`]: Builder::metrics_poll_time_histogram_configuration
1416 pub fn enable_metrics_poll_time_histogram(&mut self) -> &mut Self {
1417 self.metrics_poll_count_histogram_enable = true;
1418 self
1419 }
1420
1421 /// Deprecated. Use [`enable_metrics_poll_time_histogram()`] instead.
1422 ///
1423 /// [`enable_metrics_poll_time_histogram()`]: Builder::enable_metrics_poll_time_histogram
1424 #[deprecated(note = "`poll_count_histogram` related methods have been renamed `poll_time_histogram` to better reflect their functionality.")]
1425 #[doc(hidden)]
1426 pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self {
1427 self.enable_metrics_poll_time_histogram()
1428 }
1429
1430 /// Sets the histogram scale for tracking the distribution of task poll
1431 /// times.
1432 ///
1433 /// Tracking the distribution of task poll times can be done using a
1434 /// linear or log scale. When using linear scale, each histogram bucket
1435 /// will represent the same range of poll times. When using log scale,
1436 /// each histogram bucket will cover a range twice as big as the
1437 /// previous bucket.
1438 ///
1439 /// **Default:** linear scale.
1440 ///
1441 /// # Examples
1442 ///
1443 /// ```
1444 /// # #[cfg(not(target_family = "wasm"))]
1445 /// # {
1446 /// use tokio::runtime::{self, HistogramScale};
1447 ///
1448 /// # #[allow(deprecated)]
1449 /// let rt = runtime::Builder::new_multi_thread()
1450 /// .enable_metrics_poll_time_histogram()
1451 /// .metrics_poll_count_histogram_scale(HistogramScale::Log)
1452 /// .build()
1453 /// .unwrap();
1454 /// # }
1455 /// ```
1456 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1457 pub fn metrics_poll_count_histogram_scale(&mut self, histogram_scale: crate::runtime::HistogramScale) -> &mut Self {
1458 self.metrics_poll_count_histogram.legacy_mut(|b|b.scale = histogram_scale);
1459 self
1460 }
1461
1462 /// Configure the histogram for tracking poll times
1463 ///
1464 /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1465 /// This has an extremely low memory footprint, but may not provide enough granularity. For
1466 /// better granularity with low memory usage, use [`LogHistogram`] instead.
1467 ///
1468 /// # Examples
1469 /// Configure a [`LogHistogram`] with [default configuration]:
1470 /// ```
1471 /// # #[cfg(not(target_family = "wasm"))]
1472 /// # {
1473 /// use tokio::runtime;
1474 /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1475 ///
1476 /// let rt = runtime::Builder::new_multi_thread()
1477 /// .enable_metrics_poll_time_histogram()
1478 /// .metrics_poll_time_histogram_configuration(
1479 /// HistogramConfiguration::log(LogHistogram::default())
1480 /// )
1481 /// .build()
1482 /// .unwrap();
1483 /// # }
1484 /// ```
1485 ///
1486 /// Configure a linear histogram with 100 buckets, each 10μs wide
1487 /// ```
1488 /// # #[cfg(not(target_family = "wasm"))]
1489 /// # {
1490 /// use tokio::runtime;
1491 /// use std::time::Duration;
1492 /// use tokio::runtime::HistogramConfiguration;
1493 ///
1494 /// let rt = runtime::Builder::new_multi_thread()
1495 /// .enable_metrics_poll_time_histogram()
1496 /// .metrics_poll_time_histogram_configuration(
1497 /// HistogramConfiguration::linear(Duration::from_micros(10), 100)
1498 /// )
1499 /// .build()
1500 /// .unwrap();
1501 /// # }
1502 /// ```
1503 ///
1504 /// Configure a [`LogHistogram`] with the following settings:
1505 /// - Measure times from 100ns to 120s
1506 /// - Max error of 0.1
1507 /// - No more than 1024 buckets
1508 /// ```
1509 /// # #[cfg(not(target_family = "wasm"))]
1510 /// # {
1511 /// use std::time::Duration;
1512 /// use tokio::runtime;
1513 /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1514 ///
1515 /// let rt = runtime::Builder::new_multi_thread()
1516 /// .enable_metrics_poll_time_histogram()
1517 /// .metrics_poll_time_histogram_configuration(
1518 /// HistogramConfiguration::log(LogHistogram::builder()
1519 /// .max_value(Duration::from_secs(120))
1520 /// .min_value(Duration::from_nanos(100))
1521 /// .max_error(0.1)
1522 /// .max_buckets(1024)
1523 /// .expect("configuration uses 488 buckets")
1524 /// )
1525 /// )
1526 /// .build()
1527 /// .unwrap();
1528 /// # }
1529 /// ```
1530 ///
1531 /// When migrating from the legacy histogram ([`HistogramScale::Log`]) and wanting
1532 /// to match the previous behavior, use `precision_exact(0)`. This creates a histogram
1533 /// where each bucket is twice the size of the previous bucket.
1534 /// ```rust
1535 /// use std::time::Duration;
1536 /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1537 /// let rt = tokio::runtime::Builder::new_current_thread()
1538 /// .enable_all()
1539 /// .enable_metrics_poll_time_histogram()
1540 /// .metrics_poll_time_histogram_configuration(HistogramConfiguration::log(
1541 /// LogHistogram::builder()
1542 /// .min_value(Duration::from_micros(20))
1543 /// .max_value(Duration::from_millis(4))
1544 /// // Set `precision_exact` to `0` to match `HistogramScale::Log`
1545 /// .precision_exact(0)
1546 /// .max_buckets(10)
1547 /// .unwrap(),
1548 /// ))
1549 /// .build()
1550 /// .unwrap();
1551 /// ```
1552 ///
1553 /// [`LogHistogram`]: crate::runtime::LogHistogram
1554 /// [default configuration]: crate::runtime::LogHistogramBuilder
1555 /// [`HistogramScale::Log`]: crate::runtime::HistogramScale::Log
1556 pub fn metrics_poll_time_histogram_configuration(&mut self, configuration: HistogramConfiguration) -> &mut Self {
1557 self.metrics_poll_count_histogram.histogram_type = configuration.inner;
1558 self
1559 }
1560
1561 /// Sets the histogram resolution for tracking the distribution of task
1562 /// poll times.
1563 ///
1564 /// The resolution is the histogram's first bucket's range. When using a
1565 /// linear histogram scale, each bucket will cover the same range. When
1566 /// using a log scale, each bucket will cover a range twice as big as
1567 /// the previous bucket. In the log case, the resolution represents the
1568 /// smallest bucket range.
1569 ///
1570 /// Note that, when using log scale, the resolution is rounded up to the
1571 /// nearest power of 2 in nanoseconds.
1572 ///
1573 /// **Default:** 100 microseconds.
1574 ///
1575 /// # Examples
1576 ///
1577 /// ```
1578 /// # #[cfg(not(target_family = "wasm"))]
1579 /// # {
1580 /// use tokio::runtime;
1581 /// use std::time::Duration;
1582 ///
1583 /// # #[allow(deprecated)]
1584 /// let rt = runtime::Builder::new_multi_thread()
1585 /// .enable_metrics_poll_time_histogram()
1586 /// .metrics_poll_count_histogram_resolution(Duration::from_micros(100))
1587 /// .build()
1588 /// .unwrap();
1589 /// # }
1590 /// ```
1591 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1592 pub fn metrics_poll_count_histogram_resolution(&mut self, resolution: Duration) -> &mut Self {
1593 assert!(resolution > Duration::from_secs(0));
1594 // Sanity check the argument and also make the cast below safe.
1595 assert!(resolution <= Duration::from_secs(1));
1596
1597 let resolution = resolution.as_nanos() as u64;
1598
1599 self.metrics_poll_count_histogram.legacy_mut(|b|b.resolution = resolution);
1600 self
1601 }
1602
1603 /// Sets the number of buckets for the histogram tracking the
1604 /// distribution of task poll times.
1605 ///
1606 /// The last bucket tracks all greater values that fall out of other
1607 /// ranges. So, configuring the histogram using a linear scale,
1608 /// resolution of 50ms, and 10 buckets, the 10th bucket will track task
1609 /// polls that take more than 450ms to complete.
1610 ///
1611 /// **Default:** 10
1612 ///
1613 /// # Examples
1614 ///
1615 /// ```
1616 /// # #[cfg(not(target_family = "wasm"))]
1617 /// # {
1618 /// use tokio::runtime;
1619 ///
1620 /// # #[allow(deprecated)]
1621 /// let rt = runtime::Builder::new_multi_thread()
1622 /// .enable_metrics_poll_time_histogram()
1623 /// .metrics_poll_count_histogram_buckets(15)
1624 /// .build()
1625 /// .unwrap();
1626 /// # }
1627 /// ```
1628 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1629 pub fn metrics_poll_count_histogram_buckets(&mut self, buckets: usize) -> &mut Self {
1630 self.metrics_poll_count_histogram.legacy_mut(|b|b.num_buckets = buckets);
1631 self
1632 }
1633 }
1634
1635 fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
1636 use crate::runtime::runtime::Scheduler;
1637
1638 let (scheduler, handle, blocking_pool) =
1639 self.build_current_thread_runtime_components(None)?;
1640
1641 Ok(Runtime::from_parts(
1642 Scheduler::CurrentThread(scheduler),
1643 handle,
1644 blocking_pool,
1645 ))
1646 }
1647
1648 fn build_current_thread_local_runtime(&mut self) -> io::Result<LocalRuntime> {
1649 use crate::runtime::local_runtime::LocalRuntimeScheduler;
1650
1651 let tid = std::thread::current().id();
1652
1653 let (scheduler, handle, blocking_pool) =
1654 self.build_current_thread_runtime_components(Some(tid))?;
1655
1656 Ok(LocalRuntime::from_parts(
1657 LocalRuntimeScheduler::CurrentThread(scheduler),
1658 handle,
1659 blocking_pool,
1660 ))
1661 }
1662
1663 fn build_current_thread_runtime_components(
1664 &mut self,
1665 local_tid: Option<ThreadId>,
1666 ) -> io::Result<(CurrentThread, Handle, BlockingPool)> {
1667 use crate::runtime::scheduler;
1668 use crate::runtime::Config;
1669
1670 let mut cfg = self.get_cfg();
1671 cfg.timer_flavor = TimerFlavor::Traditional;
1672 let (driver, driver_handle) = driver::Driver::new(cfg)?;
1673
1674 // Blocking pool
1675 let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
1676 let blocking_spawner = blocking_pool.spawner().clone();
1677
1678 // Generate a rng seed for this runtime.
1679 let seed_generator_1 = self.seed_generator.next_generator();
1680 let seed_generator_2 = self.seed_generator.next_generator();
1681
1682 // And now put a single-threaded scheduler on top of the timer. When
1683 // there are no futures ready to do something, it'll let the timer or
1684 // the reactor to generate some new stimuli for the futures to continue
1685 // in their life.
1686 let (scheduler, handle) = CurrentThread::new(
1687 driver,
1688 driver_handle,
1689 blocking_spawner,
1690 seed_generator_2,
1691 Config {
1692 before_park: self.before_park.clone(),
1693 after_unpark: self.after_unpark.clone(),
1694 before_spawn: self.before_spawn.clone(),
1695 #[cfg(tokio_unstable)]
1696 before_poll: self.before_poll.clone(),
1697 #[cfg(tokio_unstable)]
1698 after_poll: self.after_poll.clone(),
1699 after_termination: self.after_termination.clone(),
1700 global_queue_interval: self.global_queue_interval,
1701 event_interval: self.event_interval,
1702 #[cfg(tokio_unstable)]
1703 unhandled_panic: self.unhandled_panic.clone(),
1704 disable_lifo_slot: self.disable_lifo_slot,
1705 // This setting never makes sense for a current thread runtime,
1706 // as it only configures how the I/O driver is stolen across
1707 // workers.
1708 enable_eager_driver_handoff: false,
1709 seed_generator: seed_generator_1,
1710 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1711 },
1712 local_tid,
1713 self.name.clone(),
1714 );
1715
1716 let handle = Handle {
1717 inner: scheduler::Handle::CurrentThread(handle),
1718 };
1719
1720 Ok((scheduler, handle, blocking_pool))
1721 }
1722
1723 fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> {
1724 if self.metrics_poll_count_histogram_enable {
1725 Some(self.metrics_poll_count_histogram.clone())
1726 } else {
1727 None
1728 }
1729 }
1730}
1731
1732cfg_io_driver! {
1733 impl Builder {
1734 /// Enables the I/O driver.
1735 ///
1736 /// Doing this enables using net, process, signal, and some I/O types on
1737 /// the runtime.
1738 ///
1739 /// # Examples
1740 ///
1741 /// ```
1742 /// use tokio::runtime;
1743 ///
1744 /// let rt = runtime::Builder::new_multi_thread()
1745 /// .enable_io()
1746 /// .build()
1747 /// .unwrap();
1748 /// ```
1749 pub fn enable_io(&mut self) -> &mut Self {
1750 self.enable_io = true;
1751 self
1752 }
1753
1754 /// Enables the I/O driver and configures the max number of events to be
1755 /// processed per tick.
1756 ///
1757 /// # Examples
1758 ///
1759 /// ```
1760 /// use tokio::runtime;
1761 ///
1762 /// let rt = runtime::Builder::new_current_thread()
1763 /// .enable_io()
1764 /// .max_io_events_per_tick(1024)
1765 /// .build()
1766 /// .unwrap();
1767 /// ```
1768 pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self {
1769 self.nevents = capacity;
1770 self
1771 }
1772 }
1773}
1774
1775cfg_time! {
1776 impl Builder {
1777 /// Enables the time driver.
1778 ///
1779 /// Doing this enables using `tokio::time` on the runtime.
1780 ///
1781 /// # Examples
1782 ///
1783 /// ```
1784 /// # #[cfg(not(target_family = "wasm"))]
1785 /// # {
1786 /// use tokio::runtime;
1787 ///
1788 /// let rt = runtime::Builder::new_multi_thread()
1789 /// .enable_time()
1790 /// .build()
1791 /// .unwrap();
1792 /// # }
1793 /// ```
1794 pub fn enable_time(&mut self) -> &mut Self {
1795 self.enable_time = true;
1796 self
1797 }
1798 }
1799}
1800
1801cfg_io_uring! {
1802 impl Builder {
1803 /// Enables the tokio's io_uring driver.
1804 ///
1805 /// Doing this enables using io_uring operations on the runtime.
1806 ///
1807 /// # Examples
1808 ///
1809 /// ```
1810 /// use tokio::runtime;
1811 ///
1812 /// let rt = runtime::Builder::new_multi_thread()
1813 /// .enable_io_uring()
1814 /// .build()
1815 /// .unwrap();
1816 /// ```
1817 #[cfg_attr(docsrs, doc(cfg(feature = "io-uring")))]
1818 pub fn enable_io_uring(&mut self) -> &mut Self {
1819 // Currently, the uring flag is equivalent to `enable_io`.
1820 self.enable_io = true;
1821 self
1822 }
1823 }
1824}
1825
1826cfg_test_util! {
1827 impl Builder {
1828 /// Controls if the runtime's clock starts paused or advancing.
1829 ///
1830 /// Pausing time requires the current-thread runtime; construction of
1831 /// the runtime will panic otherwise.
1832 ///
1833 /// # Examples
1834 ///
1835 /// ```
1836 /// use tokio::runtime;
1837 ///
1838 /// let rt = runtime::Builder::new_current_thread()
1839 /// .enable_time()
1840 /// .start_paused(true)
1841 /// .build()
1842 /// .unwrap();
1843 /// ```
1844 pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
1845 self.start_paused = start_paused;
1846 self
1847 }
1848 }
1849}
1850
1851cfg_rt_multi_thread! {
1852 impl Builder {
1853 fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
1854 use crate::loom::sys::num_cpus;
1855 use crate::runtime::{Config, runtime::Scheduler};
1856 use crate::runtime::scheduler::{self, MultiThread};
1857
1858 let worker_threads = self.worker_threads.unwrap_or_else(num_cpus);
1859
1860 let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
1861
1862 // Create the blocking pool
1863 let blocking_pool =
1864 blocking::create_blocking_pool(self, self.max_blocking_threads + worker_threads);
1865 let blocking_spawner = blocking_pool.spawner().clone();
1866
1867 // Generate a rng seed for this runtime.
1868 let seed_generator_1 = self.seed_generator.next_generator();
1869 let seed_generator_2 = self.seed_generator.next_generator();
1870
1871 let (scheduler, handle, launch) = MultiThread::new(
1872 worker_threads,
1873 driver,
1874 driver_handle,
1875 blocking_spawner,
1876 seed_generator_2,
1877 Config {
1878 before_park: self.before_park.clone(),
1879 after_unpark: self.after_unpark.clone(),
1880 before_spawn: self.before_spawn.clone(),
1881 #[cfg(tokio_unstable)]
1882 before_poll: self.before_poll.clone(),
1883 #[cfg(tokio_unstable)]
1884 after_poll: self.after_poll.clone(),
1885 after_termination: self.after_termination.clone(),
1886 global_queue_interval: self.global_queue_interval,
1887 event_interval: self.event_interval,
1888 #[cfg(tokio_unstable)]
1889 unhandled_panic: self.unhandled_panic.clone(),
1890 disable_lifo_slot: self.disable_lifo_slot,
1891 enable_eager_driver_handoff: self.enable_eager_driver_handoff,
1892 seed_generator: seed_generator_1,
1893 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1894 },
1895 self.timer_flavor,
1896 self.name.clone(),
1897 );
1898
1899 let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };
1900
1901 // Spawn the thread pool workers
1902 let _enter = handle.enter();
1903 launch.launch();
1904
1905 Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
1906 }
1907 }
1908}
1909
1910impl fmt::Debug for Builder {
1911 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1912 let mut debug = fmt.debug_struct("Builder");
1913
1914 if let Some(name) = &self.name {
1915 debug.field("name", name);
1916 }
1917
1918 debug
1919 .field("worker_threads", &self.worker_threads)
1920 .field("max_blocking_threads", &self.max_blocking_threads)
1921 .field(
1922 "thread_name",
1923 &"<dyn Fn() -> String + Send + Sync + 'static>",
1924 )
1925 .field("thread_stack_size", &self.thread_stack_size)
1926 .field("after_start", &self.after_start.as_ref().map(|_| "..."))
1927 .field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
1928 .field("before_park", &self.before_park.as_ref().map(|_| "..."))
1929 .field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."))
1930 .field(
1931 "enable_eager_driver_handoff",
1932 &self.enable_eager_driver_handoff,
1933 );
1934
1935 if self.name.is_none() {
1936 debug.finish_non_exhaustive()
1937 } else {
1938 debug.finish()
1939 }
1940 }
1941}