use crate::loom::sync::atomic::AtomicBool;
use crate::loom::sync::Arc;
use crate::runtime::driver::{self, Driver};
use crate::runtime::scheduler::{self, Defer, Inject};
use crate::runtime::task::{
    self, JoinHandle, OwnedTasks, Schedule, SpawnLocation, Task, TaskHarnessScheduleHooks,
};
use crate::runtime::{
    blocking, context, Config, MetricsBatch, SchedulerMetrics, TaskHooks, TaskMeta, WorkerMetrics,
};
use crate::sync::notify::Notify;
use crate::util::atomic_cell::AtomicCell;
use crate::util::{waker_ref, RngSeedGenerator, Wake, WakerRef};

use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::{poll_fn, Future};
use std::sync::atomic::Ordering::{AcqRel, Release};
use std::task::Poll::{Pending, Ready};
use std::task::Waker;
use std::thread::ThreadId;
use std::time::Duration;
use std::{fmt, thread};

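/// Executes tasks on the current thread.
///
/// Only one thread may drive the scheduler at a time: the `Core` value is
/// claimed by the thread entering `block_on` and handed back when that call
/// completes.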
pub(crate) struct CurrentThread {
    /// Core scheduler data. The value is `Some` when the core is idle and is
    /// taken by the thread entering `block_on`.
    core: AtomicCell<Core>,

    /// Notifier for waking up other threads waiting to claim the core.
    notify: Notify,
}

/// Handle to the current-thread scheduler.
pub(crate) struct Handle {
    /// Scheduler state shared across threads.
    shared: Shared,

    /// Resource driver handles.
    pub(crate) driver: driver::Handle,

    /// Blocking pool spawner.
    pub(crate) blocking_spawner: blocking::Spawner,

    /// Current random number generator seed.
    pub(crate) seed_generator: RngSeedGenerator,

    /// User-supplied hooks invoked at task lifecycle events.
    pub(crate) task_hooks: TaskHooks,

    /// If this is a `LocalRuntime`, flags the owning thread ID.
    pub(crate) local_tid: Option<ThreadId>,
}

/// Data required for executing the scheduler. The struct is passed around to
/// the function performing the scheduling work and acts as a capability token.
struct Core {
    /// Scheduler run queue.
    tasks: VecDeque<Notified>,

    /// Current tick.
    tick: u32,

    /// Runtime driver. The driver is removed before starting to park the
    /// thread.
    driver: Option<Driver>,

    /// Metrics batch.
    metrics: MetricsBatch,

    /// How often to check the global (injection) queue.
    global_queue_interval: u32,

    /// True if a task panicked without being handled and the runtime is
    /// configured to shut down on unhandled panics.
    unhandled_panic: bool,
}

/// Scheduler state shared between threads.
struct Shared {
    /// Remote run queue.
    inject: Inject<Arc<Handle>>,

    /// Collection of all active tasks spawned onto this executor.
    owned: OwnedTasks<Arc<Handle>>,

    /// Indicates whether the blocked-on thread was woken.
    woken: AtomicBool,

    /// Scheduler configuration options.
    config: Config,

    /// Keeps track of various runtime metrics.
    scheduler_metrics: SchedulerMetrics,

    /// This scheduler only has one worker.
    worker_metrics: WorkerMetrics,
}

/// Thread-local context.
pub(crate) struct Context {
    /// Scheduler handle.
    handle: Arc<Handle>,

    /// Scheduler core, enabling the holder of `Context` to execute the
    /// scheduler.
    core: RefCell<Option<Box<Core>>>,

    /// Deferred tasks, usually ones that called `task::yield_now()`.
    pub(crate) defer: Defer,
}

type Notified = task::Notified<Arc<Handle>>;

/// Initial queue capacity.
const INITIAL_CAPACITY: usize = 64;

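/// How often the scheduler checks the remote (injection) queue before the
/// local queue, used when the configuration does not specify an interval.
/// Polling the remote queue periodically ensures tasks woken from other
/// threads are not starved by locally scheduled work.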
const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 = 31;

impl CurrentThread {
    pub(crate) fn new(
        driver: Driver,
        driver_handle: driver::Handle,
        blocking_spawner: blocking::Spawner,
        seed_generator: RngSeedGenerator,
        config: Config,
        local_tid: Option<ThreadId>,
    ) -> (CurrentThread, Arc<Handle>) {
        let worker_metrics = WorkerMetrics::from_config(&config);
        worker_metrics.set_thread_id(thread::current().id());

        let global_queue_interval = config
            .global_queue_interval
            .unwrap_or(DEFAULT_GLOBAL_QUEUE_INTERVAL);

        let handle = Arc::new(Handle {
            task_hooks: TaskHooks {
                task_spawn_callback: config.before_spawn.clone(),
                task_terminate_callback: config.after_termination.clone(),
                #[cfg(tokio_unstable)]
                before_poll_callback: config.before_poll.clone(),
                #[cfg(tokio_unstable)]
                after_poll_callback: config.after_poll.clone(),
            },
            shared: Shared {
                inject: Inject::new(),
                owned: OwnedTasks::new(1),
                woken: AtomicBool::new(false),
                config,
                scheduler_metrics: SchedulerMetrics::new(),
                worker_metrics,
            },
            driver: driver_handle,
            blocking_spawner,
            seed_generator,
            local_tid,
        });

        let core = AtomicCell::new(Some(Box::new(Core {
            tasks: VecDeque::with_capacity(INITIAL_CAPACITY),
            tick: 0,
            driver: Some(driver),
            metrics: MetricsBatch::new(&handle.shared.worker_metrics),
            global_queue_interval,
            unhandled_panic: false,
        })));

        let scheduler = CurrentThread {
            core,
            notify: Notify::new(),
        };

        (scheduler, handle)
    }

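    /// Blocks the current thread on `future`. If another thread currently
    /// holds the scheduler core, the future is polled from the blocking
    /// context while waiting for the core to become available.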
    #[track_caller]
    pub(crate) fn block_on<F: Future>(&self, handle: &scheduler::Handle, future: F) -> F::Output {
        pin!(future);

        crate::runtime::context::enter_runtime(handle, false, |blocking| {
            let handle = handle.as_current_thread();

            // Attempt to steal the scheduler core and block on the future if
            // we can; otherwise, wait on a notification that the core is
            // available or for the future to complete.
            loop {
                if let Some(core) = self.take_core(handle) {
                    handle
                        .shared
                        .worker_metrics
                        .set_thread_id(thread::current().id());
                    return core.block_on(future);
                } else {
                    let notified = self.notify.notified();
                    pin!(notified);

                    if let Some(out) = blocking
                        .block_on(poll_fn(|cx| {
                            if notified.as_mut().poll(cx).is_ready() {
                                return Ready(None);
                            }

                            if let Ready(out) = future.as_mut().poll(cx) {
                                return Ready(Some(out));
                            }

                            Pending
                        }))
                        .expect("Failed to `Enter::block_on`")
                    {
                        return out;
                    }
                }
            }
        })
    }

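    /// Attempts to claim the scheduler core, returning `None` if another
    /// thread currently holds it.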
    fn take_core(&self, handle: &Arc<Handle>) -> Option<CoreGuard<'_>> {
        let core = self.core.take()?;

        Some(CoreGuard {
            context: scheduler::Context::CurrentThread(Context {
                handle: handle.clone(),
                core: RefCell::new(Some(core)),
                defer: Defer::new(),
            }),
            scheduler: self,
        })
    }

    pub(crate) fn shutdown(&mut self, handle: &scheduler::Handle) {
        let handle = handle.as_current_thread();

        // Avoid a double panic if we are already panicking and the core was
        // never placed back.
        let core = match self.take_core(handle) {
            Some(core) => core,
            None if std::thread::panicking() => return,
            None => panic!("Oh no! We never placed the Core back, this is a bug!"),
        };

        // Check that the thread-local is not being destroyed.
        let tls_available = context::with_current(|_| ()).is_ok();

        if tls_available {
            core.enter(|core, _context| {
                let core = shutdown2(core, handle);
                (core, ())
            });
        } else {
            // Shut down without setting the runtime context. `tokio::spawn`
            // calls will fail, but those would fail anyway because the
            // thread-local is no longer available.
            let context = core.context.expect_current_thread();
            let core = context.core.borrow_mut().take().unwrap();

            let core = shutdown2(core, handle);
            *context.core.borrow_mut() = Some(core);
        }
    }
}

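/// Performs the shutdown sequence: closes and drains the owned-task
/// collection, drains the local and injection queues, flushes metrics, and
/// finally shuts down the resource drivers.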
fn shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core> {
    // Close the OwnedTasks collection and shut down all active tasks. This
    // also ensures no new tasks are pushed after this call returns.
    handle.shared.owned.close_and_shutdown_all(0);

    // Drain the local queue. Every task has already been shut down, so
    // dropping the notifications is sufficient.
    while let Some(task) = core.next_local_task(handle) {
        drop(task);
    }

    // Close the injection queue...
    handle.shared.inject.close();

    // ...and drain it.
    while let Some(task) = handle.shared.inject.pop() {
        drop(task);
    }

    assert!(handle.shared.owned.is_empty());

    // Submit metrics one last time.
    core.submit_metrics(handle);

    // Shut down the resource drivers (I/O, time, ...).
    if let Some(driver) = core.driver.as_mut() {
        driver.shutdown(&handle.driver);
    }

    core
}

impl fmt::Debug for CurrentThread {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("CurrentThread").finish()
    }
}

impl Core {
    /// Increments the current tick, wrapping on overflow.
    fn tick(&mut self) {
        self.tick = self.tick.wrapping_add(1);
    }

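    /// Picks the next task to run. Every `global_queue_interval` ticks, the
    /// remote (injection) queue is polled first so remotely scheduled tasks
    /// cannot be starved by the local queue; otherwise the local queue has
    /// priority.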
    fn next_task(&mut self, handle: &Handle) -> Option<Notified> {
        if self.tick % self.global_queue_interval == 0 {
            handle
                .next_remote_task()
                .or_else(|| self.next_local_task(handle))
        } else {
            self.next_local_task(handle)
                .or_else(|| handle.next_remote_task())
        }
    }

    fn next_local_task(&mut self, handle: &Handle) -> Option<Notified> {
        let ret = self.tasks.pop_front();
        handle
            .shared
            .worker_metrics
            .set_queue_depth(self.tasks.len());
        ret
    }

    fn push_task(&mut self, handle: &Handle, task: Notified) {
        self.tasks.push_back(task);
        self.metrics.inc_local_schedule_count();
        handle
            .shared
            .worker_metrics
            .set_queue_depth(self.tasks.len());
    }

    fn submit_metrics(&mut self, handle: &Handle) {
        self.metrics.submit(&handle.shared.worker_metrics, 0);
    }
}

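/// Wakes all deferred tasks and frees the buffer that held them. Used by the
/// task-dump code below, which can defer far more wakers than normal
/// operation and should not keep the enlarged allocation around.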
#[cfg(feature = "taskdump")]
fn wake_deferred_tasks_and_free(context: &Context) {
    let wakers = context.defer.take_deferred();
    for waker in wakers {
        waker.wake();
    }
}

impl Context {
    /// Executes the closure while tracking the poll in the metrics batch and
    /// enforcing the cooperative-scheduling budget.
    fn run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
        core.metrics.start_poll();
        let mut ret = self.enter(core, || crate::task::coop::budget(f));
        ret.0.metrics.end_poll();
        ret
    }

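    /// Blocks until an event is received by the driver, including I/O events
    /// and timer events.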
    fn park(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
        let mut driver = core.driver.take().expect("driver missing");

        if let Some(f) = &handle.shared.config.before_park {
            let (c, ()) = self.enter(core, || f());
            core = c;
        }

        // This check will fail if `before_park` spawned a task for us to run
        // instead of parking the thread.
        if core.tasks.is_empty() {
            // Park until the thread is signaled.
            core.metrics.about_to_park();
            core.submit_metrics(handle);

            core = self.park_internal(core, handle, &mut driver, None);

            core.metrics.unparked();
            core.submit_metrics(handle);
        }

        if let Some(f) = &handle.shared.config.after_unpark {
            let (c, ()) = self.enter(core, || f());
            core = c;
        }

        core.driver = Some(driver);
        core
    }

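    /// Checks the driver for new events without blocking the thread: parking
    /// with a zero-duration timeout collects ready I/O and timer events and
    /// returns immediately.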
    fn park_yield(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
        let mut driver = core.driver.take().expect("driver missing");

        core.submit_metrics(handle);

        core = self.park_internal(core, handle, &mut driver, Some(Duration::from_millis(0)));

        core.driver = Some(driver);
        core
    }

    /// Parks the driver, with or without a timeout, then wakes any tasks
    /// whose wakeups were deferred while the scheduler was running.
    fn park_internal(
        &self,
        core: Box<Core>,
        handle: &Handle,
        driver: &mut Driver,
        duration: Option<Duration>,
    ) -> Box<Core> {
        let (core, ()) = self.enter(core, || {
            match duration {
                Some(dur) => driver.park_timeout(&handle.driver, dur),
                None => driver.park(&handle.driver),
            }
            self.defer.wake();
        });

        core
    }

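    /// Runs the closure with the scheduler core stored in the thread-local
    /// context, taking the core back out afterwards. A drop guard at a higher
    /// level (`CoreGuard`) retrieves the core from the context even if the
    /// closure panics.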
    fn enter<R>(&self, core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
        // Store the scheduler core in the thread-local context.
        *self.core.borrow_mut() = Some(core);

        // Execute the closure.
        let ret = f();

        // Take the scheduler core back out of the context.
        let core = self.core.borrow_mut().take().expect("core missing");
        (core, ret)
    }

    pub(crate) fn defer(&self, waker: &Waker) {
        self.defer.defer(waker);
    }
}

impl Handle {
    /// Spawns a future onto the `CurrentThread` scheduler.
    #[track_caller]
    pub(crate) fn spawn<F>(
        me: &Arc<Self>,
        future: F,
        id: crate::runtime::task::Id,
        spawned_at: SpawnLocation,
    ) -> JoinHandle<F::Output>
    where
        F: crate::future::Future + Send + 'static,
        F::Output: Send + 'static,
    {
        let (handle, notified) = me.shared.owned.bind(future, me.clone(), id, spawned_at);

        me.task_hooks.spawn(&TaskMeta {
            id,
            spawned_at,
            _phantom: Default::default(),
        });

        if let Some(notified) = notified {
            me.schedule(notified);
        }

        handle
    }

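    /// Spawns a future that is not `Send` onto the scheduler.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that the runtime cannot be driven from, or
    /// moved to, a thread other than the one the task is spawned on (for
    /// example, a `LocalRuntime` whose owning thread matches `local_tid`).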
    #[track_caller]
    pub(crate) unsafe fn spawn_local<F>(
        me: &Arc<Self>,
        future: F,
        id: crate::runtime::task::Id,
        spawned_at: SpawnLocation,
    ) -> JoinHandle<F::Output>
    where
        F: crate::future::Future + 'static,
        F::Output: 'static,
    {
        let (handle, notified) = unsafe {
            me.shared
                .owned
                .bind_local(future, me.clone(), id, spawned_at)
        };

        me.task_hooks.spawn(&TaskMeta {
            id,
            spawned_at,
            _phantom: Default::default(),
        });

        if let Some(notified) = notified {
            me.schedule(notified);
        }

        handle
    }

    #[cfg(all(
        tokio_unstable,
        feature = "taskdump",
        target_os = "linux",
        any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
    ))]
    pub(crate) fn dump(&self) -> crate::runtime::Dump {
        use crate::runtime::dump;
        use task::trace::trace_current_thread;

        let mut traces = vec![];

        context::with_scheduler(|maybe_context| {
            // Get the scheduler context; bail if we are not inside a
            // current-thread runtime.
            let context = if let Some(context) = maybe_context {
                context.expect_current_thread()
            } else {
                return;
            };
            let mut maybe_core = context.core.borrow_mut();
            let core = if let Some(core) = maybe_core.as_mut() {
                core
            } else {
                return;
            };
            let local = &mut core.tasks;

            if self.shared.inject.is_closed() {
                return;
            }

            traces = trace_current_thread(&self.shared.owned, local, &self.shared.inject)
                .into_iter()
                .map(|(id, trace)| dump::Task::new(id, trace))
                .collect();

            // Avoid a double-borrow panic below.
            drop(maybe_core);

            wake_deferred_tasks_and_free(context);
        });

        dump::Dump::new(traces)
    }

    fn next_remote_task(&self) -> Option<Notified> {
        self.shared.inject.pop()
    }

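    /// Returns a waker for the `block_on` future. `woken` is set to `true`
    /// up front so the outer future is polled at least once on entering the
    /// scheduling loop.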
    fn waker_ref(me: &Arc<Self>) -> WakerRef<'_> {
        me.shared.woken.store(true, Release);
        waker_ref(me)
    }

    /// Resets the `woken` flag, returning its previous value.
    pub(crate) fn reset_woken(&self) -> bool {
        self.shared.woken.swap(false, AcqRel)
    }

    pub(crate) fn num_alive_tasks(&self) -> usize {
        self.shared.owned.num_alive_tasks()
    }

    pub(crate) fn injection_queue_depth(&self) -> usize {
        self.shared.inject.len()
    }

    pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
        // The current-thread scheduler has exactly one worker.
        assert_eq!(0, worker);
        &self.shared.worker_metrics
    }
}

cfg_unstable_metrics! {
    impl Handle {
        pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
            &self.shared.scheduler_metrics
        }

        pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
            self.worker_metrics(worker).queue_depth()
        }

        pub(crate) fn num_blocking_threads(&self) -> usize {
            self.blocking_spawner.num_threads()
        }

        pub(crate) fn num_idle_blocking_threads(&self) -> usize {
            self.blocking_spawner.num_idle_threads()
        }

        pub(crate) fn blocking_queue_depth(&self) -> usize {
            self.blocking_spawner.queue_depth()
        }

        cfg_64bit_metrics! {
            pub(crate) fn spawned_tasks_count(&self) -> u64 {
                self.shared.owned.spawned_tasks_count()
            }
        }
    }
}

use std::num::NonZeroU64;

impl Handle {
    pub(crate) fn owned_id(&self) -> NonZeroU64 {
        self.shared.owned.id
    }
}

impl fmt::Debug for Handle {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("current_thread::Handle { ... }").finish()
    }
}

impl Schedule for Arc<Handle> {
    fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
        self.shared.owned.remove(task)
    }

    fn schedule(&self, task: task::Notified<Self>) {
        use scheduler::Context::CurrentThread;

        context::with_scheduler(|maybe_cx| match maybe_cx {
            Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
                let mut core = cx.core.borrow_mut();

                // If `None`, the runtime is shutting down, so there is no
                // need to schedule the task.
                if let Some(core) = core.as_mut() {
                    core.push_task(self, task);
                }
            }
            _ => {
                // Track that a task was scheduled from **outside** of the runtime.
                self.shared.scheduler_metrics.inc_remote_schedule_count();

                // Schedule the task on the remote queue and unpark the driver.
                self.shared.inject.push(task);
                self.driver.unpark();
            }
        });
    }

    fn hooks(&self) -> TaskHarnessScheduleHooks {
        TaskHarnessScheduleHooks {
            task_terminate_callback: self.task_hooks.task_terminate_callback.clone(),
        }
    }

    cfg_unstable! {
        fn unhandled_panic(&self) {
            use crate::runtime::UnhandledPanic;

            match self.shared.config.unhandled_panic {
                UnhandledPanic::Ignore => {
                    // Do nothing.
                }
                UnhandledPanic::ShutdownRuntime => {
                    use scheduler::Context::CurrentThread;

                    // This hook is only called from within the runtime, so
                    // the thread-local scheduler context must be set.
                    context::with_scheduler(|maybe_cx| match maybe_cx {
                        Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
                            let mut core = cx.core.borrow_mut();

                            // If `None`, the runtime is already shutting down.
                            if let Some(core) = core.as_mut() {
                                core.unhandled_panic = true;
                                self.shared.owned.close_and_shutdown_all(0);
                            }
                        }
                        _ => unreachable!("runtime core not set in CURRENT thread-local"),
                    })
                }
            }
        }
    }
}

impl Wake for Handle {
    fn wake(arc_self: Arc<Self>) {
        Wake::wake_by_ref(&arc_self);
    }

    fn wake_by_ref(arc_self: &Arc<Self>) {
        // Record the wakeup and unpark the driver so the scheduling loop
        // re-polls the `block_on` future.
        arc_self.shared.woken.store(true, Release);
        arc_self.driver.unpark();
    }
}

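/// Guard that owns the scheduler core while a thread runs `block_on`.
///
/// Ensures the `Core` value is always placed back into its slot in
/// `CurrentThread`, even if the future panics.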
struct CoreGuard<'a> {
    context: scheduler::Context,
    scheduler: &'a CurrentThread,
}

impl CoreGuard<'_> {
    #[track_caller]
    fn block_on<F: Future>(self, future: F) -> F::Output {
        let ret = self.enter(|mut core, context| {
            let waker = Handle::waker_ref(&context.handle);
            let mut cx = std::task::Context::from_waker(&waker);

            pin!(future);

            core.metrics.start_processing_scheduled_tasks();

            'outer: loop {
                let handle = &context.handle;

                // The `block_on` future was woken; poll it within the
                // cooperative budget.
                if handle.reset_woken() {
                    let (c, res) = context.enter(core, || {
                        crate::task::coop::budget(|| future.as_mut().poll(&mut cx))
                    });

                    core = c;

                    if let Ready(v) = res {
                        return (core, Some(v));
                    }
                }

                for _ in 0..handle.shared.config.event_interval {
                    // Make sure we didn't hit an unhandled panic.
                    if core.unhandled_panic {
                        return (core, None);
                    }

                    core.tick();

                    let entry = core.next_task(handle);

                    let task = match entry {
                        Some(entry) => entry,
                        None => {
                            core.metrics.end_processing_scheduled_tasks();

                            // Park with a zero timeout if there are deferred
                            // wakers to process; otherwise park until an
                            // event arrives.
                            core = if !context.defer.is_empty() {
                                context.park_yield(core, handle)
                            } else {
                                context.park(core, handle)
                            };

                            core.metrics.start_processing_scheduled_tasks();

                            // Try polling the `block_on` future next.
                            continue 'outer;
                        }
                    };

                    let task = context.handle.shared.owned.assert_owner(task);

                    #[cfg(tokio_unstable)]
                    let task_meta = task.task_meta();

                    let (c, ()) = context.run_task(core, || {
                        #[cfg(tokio_unstable)]
                        context.handle.task_hooks.poll_start_callback(&task_meta);

                        task.run();

                        #[cfg(tokio_unstable)]
                        context.handle.task_hooks.poll_stop_callback(&task_meta);
                    });

                    core = c;
                }

                // Yield to the driver; this drives the timer and pulls any
                // pending I/O events.
                core.metrics.end_processing_scheduled_tasks();

                core = context.park_yield(core, handle);

                core.metrics.start_processing_scheduled_tasks();
            }
        });

        match ret {
            Some(ret) => ret,
            None => {
                // `None` is only returned on an unhandled task panic with
                // `UnhandledPanic::ShutdownRuntime` configured.
                panic!("a spawned task panicked and the runtime is configured to shut down on unhandled panic");
            }
        }
    }

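    /// Enters the scheduler context with this guard's `Core`, placing the
    /// core back into the context slot once the closure returns.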
    fn enter<F, R>(self, f: F) -> R
    where
        F: FnOnce(Box<Core>, &Context) -> (Box<Core>, R),
    {
        let context = self.context.expect_current_thread();

        // Remove `core` from the context to pass into the closure.
        let core = context.core.borrow_mut().take().expect("core missing");

        // Call the closure, then place `core` back.
        let (core, ret) = context::set_scheduler(&self.context, || f(core, context));

        *context.core.borrow_mut() = Some(core);

        ret
    }
}

impl Drop for CoreGuard<'_> {
    fn drop(&mut self) {
        let context = self.context.expect_current_thread();

        if let Some(core) = context.core.borrow_mut().take() {
            // Replace the core in the scheduler so another thread can claim
            // it, then notify one waiting thread.
            self.scheduler.core.set(core);

            self.scheduler.notify.notify_one();
        }
    }
}