use crate::loom::sync::atomic::AtomicBool;
use crate::loom::sync::Arc;
use crate::runtime::driver::{self, Driver};
use crate::runtime::scheduler::{self, Defer, Inject};
use crate::runtime::task::{
    self, JoinHandle, OwnedTasks, Schedule, SpawnLocation, Task, TaskHarnessScheduleHooks,
};
use crate::runtime::{
    blocking, context, Config, MetricsBatch, SchedulerMetrics, TaskHooks, TaskMeta, WorkerMetrics,
};
use crate::sync::notify::Notify;
use crate::util::atomic_cell::AtomicCell;
use crate::util::{waker_ref, RngSeedGenerator, Wake, WakerRef};

use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::{poll_fn, Future};
use std::num::NonZeroU64;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::task::Poll::{Pending, Ready};
use std::task::Waker;
use std::thread::ThreadId;
use std::time::Duration;
use std::{fmt, thread};

/// Executes tasks on the current thread.
pub(crate) struct CurrentThread {
    /// Core scheduler data is acquired by a thread entering `block_on`.
    core: AtomicCell<Core>,

    /// Notifier for waking up other threads to steal the
    /// driver.
    notify: Notify,
}

/// Handle to the current thread scheduler.
pub(crate) struct Handle {
    /// Name of this runtime, if one was set.
    name: Option<String>,

    /// Scheduler state shared across threads.
    shared: Shared,

    /// Resource driver handles.
    pub(crate) driver: driver::Handle,

    /// Blocking pool spawner.
    pub(crate) blocking_spawner: blocking::Spawner,

    /// Current random number generator seed.
    pub(crate) seed_generator: RngSeedGenerator,

    /// User-supplied hooks to invoke for things.
    pub(crate) task_hooks: TaskHooks,

    /// If this is a `LocalRuntime`, flags the owning thread ID.
    pub(crate) local_tid: Option<ThreadId>,
}

/// Data required for executing the scheduler. The struct is passed around to
/// a function that will perform the scheduling work and acts as a capability token.
struct Core {
    /// Scheduler run queue.
    tasks: VecDeque<Notified>,

    /// Current tick.
    tick: u32,

    /// Runtime driver.
    ///
    /// The driver is removed before starting to park the thread.
    driver: Option<Driver>,

    /// Metrics batch.
    metrics: MetricsBatch,

    /// How often to check the global queue.
    global_queue_interval: u32,

    /// True if a task panicked without being handled and the runtime is
    /// configured to shut down on unhandled panic.
    unhandled_panic: bool,
}

/// Scheduler state shared between threads.
struct Shared {
    /// Remote run queue.
    inject: Inject<Arc<Handle>>,

    /// Collection of all active tasks spawned onto this executor.
    owned: OwnedTasks<Arc<Handle>>,

    /// Indicates whether the blocked-on thread was woken.
    woken: AtomicBool,

    /// Scheduler configuration options.
    config: Config,

    /// Keeps track of various runtime metrics.
    scheduler_metrics: SchedulerMetrics,

    /// This scheduler only has one worker.
    worker_metrics: WorkerMetrics,
}

/// Thread-local context.
///
/// `pub(crate)` so it can be stored in `runtime::context`.
pub(crate) struct Context {
    /// Scheduler handle.
    handle: Arc<Handle>,

    /// Scheduler core, enabling the holder of `Context` to execute the
    /// scheduler.
    core: RefCell<Option<Box<Core>>>,

    /// Deferred tasks, usually ones that called `task::yield_now()`.
    pub(crate) defer: Defer,
}

type Notified = task::Notified<Arc<Handle>>;

/// Initial queue capacity.
const INITIAL_CAPACITY: usize = 64;

/// Used if no global queue interval is specified. This is the interval, in
/// ticks, at which the remote (inject) queue is checked before the local one.
const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 = 31;

impl CurrentThread {
    pub(crate) fn new(
        driver: Driver,
        driver_handle: driver::Handle,
        blocking_spawner: blocking::Spawner,
        seed_generator: RngSeedGenerator,
        config: Config,
        local_tid: Option<ThreadId>,
        name: Option<String>,
    ) -> (CurrentThread, Arc<Handle>) {
        let worker_metrics = WorkerMetrics::from_config(&config);
        worker_metrics.set_thread_id(thread::current().id());

        // Get the configured global queue interval, or use the default.
        let global_queue_interval = config
            .global_queue_interval
            .unwrap_or(DEFAULT_GLOBAL_QUEUE_INTERVAL);

        let handle = Arc::new(Handle {
            name,
            task_hooks: TaskHooks {
                task_spawn_callback: config.before_spawn.clone(),
                task_terminate_callback: config.after_termination.clone(),
                #[cfg(tokio_unstable)]
                before_poll_callback: config.before_poll.clone(),
                #[cfg(tokio_unstable)]
                after_poll_callback: config.after_poll.clone(),
            },
            shared: Shared {
                inject: Inject::new(),
                owned: OwnedTasks::new(1),
                woken: AtomicBool::new(false),
                config,
                scheduler_metrics: SchedulerMetrics::new(),
                worker_metrics,
            },
            driver: driver_handle,
            blocking_spawner,
            seed_generator,
            local_tid,
        });

        let core = AtomicCell::new(Some(Box::new(Core {
            tasks: VecDeque::with_capacity(INITIAL_CAPACITY),
            tick: 0,
            driver: Some(driver),
            metrics: MetricsBatch::new(&handle.shared.worker_metrics),
            global_queue_interval,
            unhandled_panic: false,
        })));

        let scheduler = CurrentThread {
            core,
            notify: Notify::new(),
        };

        (scheduler, handle)
    }

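    /// Runs the provided future to completion on this scheduler.
    ///
    /// A rough sketch of how this entry point is reached through the public
    /// API (illustrative only; the plumbing lives outside this module):
    ///
    /// ```ignore
    /// let rt = tokio::runtime::Builder::new_current_thread()
    ///     .enable_all()
    ///     .build()
    ///     .unwrap();
    ///
    /// // `Runtime::block_on` bottoms out in `CurrentThread::block_on`.
    /// let out = rt.block_on(async { 1 + 1 });
    /// assert_eq!(out, 2);
    /// ```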
    #[track_caller]
    pub(crate) fn block_on<F: Future>(&self, handle: &scheduler::Handle, future: F) -> F::Output {
        pin!(future);

        crate::runtime::context::enter_runtime(handle, false, |blocking| {
            let handle = handle.as_current_thread();

            // Attempt to steal the scheduler core and block_on the future if
            // we can there; otherwise, select on a notification that the core
            // is available or on the future completing.
            loop {
                if let Some(core) = self.take_core(handle) {
                    handle
                        .shared
                        .worker_metrics
                        .set_thread_id(thread::current().id());
                    return core.block_on(future);
                } else {
                    let notified = self.notify.notified();
                    pin!(notified);

                    if let Some(out) = blocking
                        .block_on(poll_fn(|cx| {
                            if notified.as_mut().poll(cx).is_ready() {
                                return Ready(None);
                            }

                            if let Ready(out) = future.as_mut().poll(cx) {
                                return Ready(Some(out));
                            }

                            Pending
                        }))
                        .expect("Failed to `Enter::block_on`")
                    {
                        return out;
                    }
                }
            }
        })
    }

    fn take_core(&self, handle: &Arc<Handle>) -> Option<CoreGuard<'_>> {
        let core = self.core.take()?;

        Some(CoreGuard {
            context: scheduler::Context::CurrentThread(Context {
                handle: handle.clone(),
                core: RefCell::new(Some(core)),
                defer: Defer::new(),
            }),
            scheduler: self,
        })
    }

    pub(crate) fn shutdown(&mut self, handle: &scheduler::Handle) {
        let handle = handle.as_current_thread();

        // Avoid a double panic if we are currently panicking and
        // the lock may be poisoned.
        let core = match self.take_core(handle) {
            Some(core) => core,
            None if std::thread::panicking() => return,
            None => panic!("Oh no! We never placed the Core back, this is a bug!"),
        };

        // Check that the thread-local is not being destroyed.
        let tls_available = context::with_current(|_| ()).is_ok();

        if tls_available {
            core.enter(|core, _context| {
                let core = shutdown2(core, handle);
                (core, ())
            });
        } else {
            // Shutdown without setting the context. `tokio::spawn` calls will
            // fail, but those will fail either way because the thread-local is
            // not available anymore.
            let context = core.context.expect_current_thread();
            let core = context.core.borrow_mut().take().unwrap();

            let core = shutdown2(core, handle);
            *context.core.borrow_mut() = Some(core);
        }
    }
}

fn shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core> {
    // Drain the OwnedTasks collection. This call also closes the
    // collection, ensuring that no tasks are ever pushed after this
    // call returns.
    handle.shared.owned.close_and_shutdown_all(0);

    // Drain the local queue. Every task was already shut down above, so the
    // remaining handles only need to be dropped.
    while let Some(task) = core.next_local_task(handle) {
        drop(task);
    }

    // Close the injection queue.
    handle.shared.inject.close();

    // Drain the remote queue.
    while let Some(task) = handle.shared.inject.pop() {
        drop(task);
    }

    assert!(handle.shared.owned.is_empty());

    // Submit metrics.
    core.submit_metrics(handle);

    // Shutdown the resource drivers.
    if let Some(driver) = core.driver.as_mut() {
        driver.shutdown(&handle.driver);
    }

    core
}

impl fmt::Debug for CurrentThread {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("CurrentThread").finish()
    }
}

impl Core {
    /// Increments the current tick, wrapping on overflow.
    fn tick(&mut self) {
        self.tick = self.tick.wrapping_add(1);
    }

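    /// Picks the next task to run, checking the remote (inject) queue first
    /// on every `global_queue_interval`-th tick so that remote wakeups cannot
    /// be starved by a busy local queue.
    ///
    /// Illustrative schedule, assuming the default interval of 31 (the tick
    /// is incremented before each call):
    ///
    /// ```text
    /// ticks  1..=30 -> local queue first, inject queue as fallback
    /// tick   31     -> inject queue first, local queue as fallback
    /// ticks 32..=61 -> local queue first, ...
    /// ```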
    fn next_task(&mut self, handle: &Handle) -> Option<Notified> {
        if self.tick % self.global_queue_interval == 0 {
            handle
                .next_remote_task()
                .or_else(|| self.next_local_task(handle))
        } else {
            self.next_local_task(handle)
                .or_else(|| handle.next_remote_task())
        }
    }

    fn next_local_task(&mut self, handle: &Handle) -> Option<Notified> {
        let ret = self.tasks.pop_front();
        handle
            .shared
            .worker_metrics
            .set_queue_depth(self.tasks.len());
        ret
    }

    fn push_task(&mut self, handle: &Handle, task: Notified) {
        self.tasks.push_back(task);
        self.metrics.inc_local_schedule_count();
        handle
            .shared
            .worker_metrics
            .set_queue_depth(self.tasks.len());
    }

    fn submit_metrics(&mut self, handle: &Handle) {
        self.metrics.submit(&handle.shared.worker_metrics, 0);
    }
}

// Wakes any deferred tasks, consuming the list of deferred wakers.
#[cfg(feature = "taskdump")]
fn wake_deferred_tasks_and_free(context: &Context) {
    let wakers = context.defer.take_deferred();
    for waker in wakers {
        waker.wake();
    }
}

impl Context {
    /// Executes the closure with the given scheduler core stored in the
    /// thread-local context, tracking the poll in metrics and granting the
    /// task a cooperative scheduling budget.
    fn run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
        core.metrics.start_poll();
        let mut ret = self.enter(core, || crate::task::coop::budget(f));
        ret.0.metrics.end_poll();
        ret
    }

    /// Blocks the current thread until an event is received by the driver,
    /// including new events, timer events, etc.
    fn park(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
        let mut driver = core.driver.take().expect("driver missing");

        if let Some(f) = &handle.shared.config.before_park {
            let (c, ()) = self.enter(core, || f());
            core = c;
        }

        // This check will fail if `before_park` spawned a task for us to run
        // instead of parking the thread.
        if !self.has_pending_work(&core) {
            // Park until the thread is signaled.
            core.metrics.about_to_park();
            core.submit_metrics(handle);

            core = self.park_internal(core, handle, &mut driver, None);

            core.metrics.unparked();
            core.submit_metrics(handle);
        }

        if let Some(f) = &handle.shared.config.after_unpark {
            let (c, ()) = self.enter(core, || f());
            core = c;
        }

        core.driver = Some(driver);
        core
    }

    /// Checks the driver for new events without blocking the thread, using a
    /// zero-duration park.
    fn park_yield(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
        let mut driver = core.driver.take().expect("driver missing");

        core.submit_metrics(handle);

        core = self.park_internal(core, handle, &mut driver, Some(Duration::from_millis(0)));

        core.driver = Some(driver);
        core
    }

    fn has_pending_work(&self, core: &Core) -> bool {
        !core.tasks.is_empty() || !self.defer.is_empty() || self.handle.shared.woken.load(Acquire)
    }

    fn park_internal(
        &self,
        core: Box<Core>,
        handle: &Handle,
        driver: &mut Driver,
        duration: Option<Duration>,
    ) -> Box<Core> {
        let (core, ()) = self.enter(core, || {
            match duration {
                Some(dur) => driver.park_timeout(&handle.driver, dur),
                None => driver.park(&handle.driver),
            }
            self.defer.wake();
        });

        core
    }

    fn enter<R>(&self, core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
        // Store the scheduler core in the thread-local context.
        //
        // A drop-guard is employed at a higher level.
        *self.core.borrow_mut() = Some(core);

        // Execute the closure while tracking the execution budget.
        let ret = f();

        // Take the scheduler core back.
        let core = self.core.borrow_mut().take().expect("core missing");
        (core, ret)
    }

    pub(crate) fn defer(&self, waker: &Waker) {
        self.defer.defer(waker);
    }
}

impl Handle {
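    /// Spawns a future onto the `CurrentThread` scheduler.
    ///
    /// A rough sketch of how this is reached from the public API
    /// (illustrative only):
    ///
    /// ```ignore
    /// let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
    /// rt.block_on(async {
    ///     // `tokio::spawn` routes through `Handle::spawn` below.
    ///     let join = tokio::spawn(async { 40 + 2 });
    ///     assert_eq!(join.await.unwrap(), 42);
    /// });
    /// ```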
    #[track_caller]
    pub(crate) fn spawn<F>(
        me: &Arc<Self>,
        future: F,
        id: crate::runtime::task::Id,
        spawned_at: SpawnLocation,
    ) -> JoinHandle<F::Output>
    where
        F: crate::future::Future + Send + 'static,
        F::Output: Send + 'static,
    {
        let (handle, notified) = me.shared.owned.bind(future, me.clone(), id, spawned_at);

        me.task_hooks.spawn(&TaskMeta {
            id,
            spawned_at,
            _phantom: Default::default(),
        });

        if let Some(notified) = notified {
            me.schedule(notified);
        }

        handle
    }

    /// Spawn a task which isn't safe to send across thread boundaries onto
    /// the runtime.
    ///
    /// # Safety
    ///
    /// This should only be used when this is a `LocalRuntime` or in another
    /// case where the runtime provably cannot be driven by, or moved to, a
    /// thread other than the one on which the task is spawned.
    #[track_caller]
    pub(crate) unsafe fn spawn_local<F>(
        me: &Arc<Self>,
        future: F,
        id: crate::runtime::task::Id,
        spawned_at: SpawnLocation,
    ) -> JoinHandle<F::Output>
    where
        F: crate::future::Future + 'static,
        F::Output: 'static,
    {
        // SAFETY: upheld by the caller, per this function's safety contract.
        let (handle, notified) = unsafe {
            me.shared
                .owned
                .bind_local(future, me.clone(), id, spawned_at)
        };

        me.task_hooks.spawn(&TaskMeta {
            id,
            spawned_at,
            _phantom: Default::default(),
        });

        if let Some(notified) = notified {
            me.schedule(notified);
        }

        handle
    }

    #[cfg(all(
        tokio_unstable,
        feature = "taskdump",
        target_os = "linux",
        any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
    ))]
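    /// Captures a snapshot of this runtime's state.
    ///
    /// A rough sketch of the public (unstable) entry point, assuming a binary
    /// built with `--cfg tokio_unstable` and the `taskdump` feature:
    ///
    /// ```ignore
    /// let handle = tokio::runtime::Handle::current();
    /// let dump = handle.dump().await;
    /// for task in dump.tasks().iter() {
    ///     println!("{}", task.trace());
    /// }
    /// ```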
    pub(crate) fn dump(&self) -> crate::runtime::Dump {
        use crate::runtime::dump;
        use task::trace::trace_current_thread;

        let mut traces = vec![];

        context::with_scheduler(|maybe_context| {
            // If a scheduler context is not available on this thread, a trace
            // cannot be produced.
            let context = if let Some(context) = maybe_context {
                context.expect_current_thread()
            } else {
                return;
            };
            let mut maybe_core = context.core.borrow_mut();
            let core = if let Some(core) = maybe_core.as_mut() {
                core
            } else {
                return;
            };
            let local = &mut core.tasks;

            if self.shared.inject.is_closed() {
                return;
            }

            traces = trace_current_thread(&self.shared.owned, local, &self.shared.inject)
                .into_iter()
                .map(|(id, trace)| dump::Task::new(id, trace))
                .collect();

            // Release the borrow on the core before waking deferred tasks, as
            // their wakers may need to borrow it again when rescheduling.
            drop(maybe_core);

            wake_deferred_tasks_and_free(context);
        });

        dump::Dump::new(traces)
    }

    fn next_remote_task(&self) -> Option<Notified> {
        self.shared.inject.pop()
    }

    fn waker_ref(me: &Arc<Self>) -> WakerRef<'_> {
        // Set `woken` to true when entering `block_on`, ensuring the outer
        // future is polled at least once on the first loop iteration.
        me.shared.woken.store(true, Release);
        waker_ref(me)
    }

    /// Resets `woken` to false, returning the previous value.
    pub(crate) fn reset_woken(&self) -> bool {
        self.shared.woken.swap(false, AcqRel)
    }

    pub(crate) fn num_alive_tasks(&self) -> usize {
        self.shared.owned.num_alive_tasks()
    }

    pub(crate) fn injection_queue_depth(&self) -> usize {
        self.shared.inject.len()
    }

    pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
        assert_eq!(0, worker);
        &self.shared.worker_metrics
    }
}

cfg_unstable_metrics! {
    impl Handle {
        pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
            &self.shared.scheduler_metrics
        }

        pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
            self.worker_metrics(worker).queue_depth()
        }

        pub(crate) fn num_blocking_threads(&self) -> usize {
            self.blocking_spawner.num_threads()
        }

        pub(crate) fn num_idle_blocking_threads(&self) -> usize {
            self.blocking_spawner.num_idle_threads()
        }

        pub(crate) fn blocking_queue_depth(&self) -> usize {
            self.blocking_spawner.queue_depth()
        }

        cfg_64bit_metrics! {
            pub(crate) fn spawned_tasks_count(&self) -> u64 {
                self.shared.owned.spawned_tasks_count()
            }
        }
    }
}

impl Handle {
    pub(crate) fn owned_id(&self) -> NonZeroU64 {
        self.shared.owned.id
    }

    pub(crate) fn name(&self) -> Option<&str> {
        self.name.as_deref()
    }
}

impl fmt::Debug for Handle {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("current_thread::Handle { ... }").finish()
    }
}

impl Schedule for Arc<Handle> {
    /// Removes the released task from the collection of owned tasks,
    /// returning it if it was present.
    fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
        self.shared.owned.remove(task)
    }

    fn schedule(&self, task: task::Notified<Self>) {
        use scheduler::Context::CurrentThread;

        context::with_scheduler(|maybe_cx| match maybe_cx {
            Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
                let mut core = cx.core.borrow_mut();

                // If `core` is `None`, the runtime is shutting down, so there
                // is no need to schedule the task.
                if let Some(core) = core.as_mut() {
                    core.push_task(self, task);
                }
            }
            _ => {
                // Track that a task was scheduled from **outside** of the runtime.
                self.shared.scheduler_metrics.inc_remote_schedule_count();

                // Schedule the task on the remote queue and unpark the driver.
                self.shared.inject.push(task);
                self.driver.unpark();
            }
        });
    }

    fn hooks(&self) -> TaskHarnessScheduleHooks {
        TaskHarnessScheduleHooks {
            task_terminate_callback: self.task_hooks.task_terminate_callback.clone(),
        }
    }

    cfg_unstable! {
        fn unhandled_panic(&self) {
            use crate::runtime::UnhandledPanic;

            match self.shared.config.unhandled_panic {
                UnhandledPanic::Ignore => {
                    // Do nothing
                }
                UnhandledPanic::ShutdownRuntime => {
                    use scheduler::Context::CurrentThread;

                    // This hook is only called from within the runtime, so
                    // `context::with_scheduler` should match with `&self`, i.e.
                    // there is no opportunity for a nested scheduler to be
                    // called.
                    context::with_scheduler(|maybe_cx| match maybe_cx {
                        Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
                            let mut core = cx.core.borrow_mut();

                            // If `core` is `None`, the runtime is shutting
                            // down, so there is no need to signal shutdown.
                            if let Some(core) = core.as_mut() {
                                core.unhandled_panic = true;
                                self.shared.owned.close_and_shutdown_all(0);
                            }
                        }
                        _ => unreachable!("runtime core not set in CURRENT thread-local"),
                    })
                }
            }
        }
    }
}

impl Wake for Handle {
    fn wake(arc_self: Arc<Self>) {
        Wake::wake_by_ref(&arc_self);
    }

    fn wake_by_ref(arc_self: &Arc<Self>) {
        // Flag the wakeup, remembering whether the flag was already set.
        let already_woken = arc_self.shared.woken.swap(true, Release);

        if !already_woken {
            use scheduler::Context::CurrentThread;

            context::with_scheduler(|maybe_cx| match maybe_cx {
                // If this is the thread currently driving the scheduler, the
                // main loop will observe the `woken` flag on its own.
                Some(CurrentThread(cx)) if Arc::ptr_eq(arc_self, &cx.handle) => {}
                _ => {
                    arc_self.driver.unpark();
                }
            });
        }
    }
}

/// Used to ensure we always place the `Core` value back into its slot in
/// `CurrentThread`, even if the future panics.
struct CoreGuard<'a> {
    context: scheduler::Context,
    scheduler: &'a CurrentThread,
}

impl CoreGuard<'_> {
    #[track_caller]
    fn block_on<F: Future>(self, future: F) -> F::Output {
        let ret = self.enter(|mut core, context| {
            let waker = Handle::waker_ref(&context.handle);
            let mut cx = std::task::Context::from_waker(&waker);

            pin!(future);

            core.metrics.start_processing_scheduled_tasks();

            'outer: loop {
                let handle = &context.handle;

                if handle.reset_woken() {
                    let (c, res) = context.enter(core, || {
                        crate::task::coop::budget(|| future.as_mut().poll(&mut cx))
                    });

                    core = c;

                    if let Ready(v) = res {
                        return (core, Some(v));
                    }
                }

                for _ in 0..handle.shared.config.event_interval {
                    // Make sure we didn't hit an unhandled panic.
                    if core.unhandled_panic {
                        return (core, None);
                    }

                    core.tick();

                    let entry = core.next_task(handle);

                    let task = match entry {
                        Some(entry) => entry,
                        None => {
                            core.metrics.end_processing_scheduled_tasks();

                            core = if context.has_pending_work(&core) {
                                context.park_yield(core, handle)
                            } else {
                                context.park(core, handle)
                            };

                            core.metrics.start_processing_scheduled_tasks();

                            // Try polling the `block_on` future next.
                            continue 'outer;
                        }
                    };

                    let task = context.handle.shared.owned.assert_owner(task);

                    #[cfg(tokio_unstable)]
                    let task_meta = task.task_meta();

                    let (c, ()) = context.run_task(core, || {
                        #[cfg(tokio_unstable)]
                        context.handle.task_hooks.poll_start_callback(&task_meta);

                        task.run();

                        #[cfg(tokio_unstable)]
                        context.handle.task_hooks.poll_stop_callback(&task_meta);
                    });

                    core = c;
                }

                core.metrics.end_processing_scheduled_tasks();

                // Yield to the driver, this drives the timer and pulls any
                // pending I/O events.
                core = context.park_yield(core, handle);

                core.metrics.start_processing_scheduled_tasks();
            }
        });

        match ret {
            Some(ret) => ret,
            None => {
                // `block_on` panicked.
                panic!("a spawned task panicked and the runtime is configured to shut down on unhandled panic");
            }
        }
    }

    /// Enters the scheduler context. This sets the queue and other necessary
    /// scheduler state in the thread-local.
    fn enter<F, R>(self, f: F) -> R
    where
        F: FnOnce(Box<Core>, &Context) -> (Box<Core>, R),
    {
        let context = self.context.expect_current_thread();

        // Remove `core` from `context` to pass into the closure.
        let core = context.core.borrow_mut().take().expect("core missing");

        // Call the closure and place `core` back.
        let (core, ret) = context::set_scheduler(&self.context, || f(core, context));

        *context.core.borrow_mut() = Some(core);

        ret
    }
}

impl Drop for CoreGuard<'_> {
    fn drop(&mut self) {
        let context = self.context.expect_current_thread();

        if let Some(core) = context.core.borrow_mut().take() {
            // Replace the old scheduler back into the state to allow
            // other threads to pick it up and drive it.
            self.scheduler.core.set(core);

            // Wake up other possible threads that could steal the driver.
            self.scheduler.notify.notify_one();
        }
    }
}