tokio/runtime/scheduler/multi_thread/queue.rs
1//! Run-queue structures to support a work-stealing scheduler
2
3use crate::loom::cell::UnsafeCell;
4use crate::loom::sync::Arc;
5use crate::runtime::scheduler::multi_thread::{Overflow, Stats};
6use crate::runtime::task;
7
8use std::mem::{self, MaybeUninit};
9use std::ptr;
10use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
11
// Use wider integers when possible to increase ABA resilience.
//
// See issue #5041: <https://github.com/tokio-rs/tokio/issues/5041>.
//
// Invariant relied on by `pack`/`unpack`: two `UnsignedShort` values fit
// exactly into one `UnsignedLong`, so the combined head can be updated with a
// single atomic operation.
cfg_has_atomic_u64! {
    // Platforms with 64-bit atomics: pack two 32-bit indices into one `u64`.
    type UnsignedShort = u32;
    type UnsignedLong = u64;
    type AtomicUnsignedShort = crate::loom::sync::atomic::AtomicU32;
    type AtomicUnsignedLong = crate::loom::sync::atomic::AtomicU64;
}
cfg_not_has_atomic_u64! {
    // Fallback for targets without 64-bit atomics: two 16-bit indices in a `u32`.
    type UnsignedShort = u16;
    type UnsignedLong = u32;
    type AtomicUnsignedShort = crate::loom::sync::atomic::AtomicU16;
    type AtomicUnsignedLong = crate::loom::sync::atomic::AtomicU32;
}
27
/// Producer handle. May only be used from a single thread.
///
/// The owning worker pushes and pops through this handle; the `unsync_load`
/// calls on `tail` in the methods below rely on this single-thread
/// restriction.
pub(crate) struct Local<T: 'static> {
    inner: Arc<Inner<T>>,
}
32
/// Consumer handle. May be used from many threads.
///
/// Cloning is cheap (an `Arc` ref-count bump); all clones observe the same
/// underlying queue.
pub(crate) struct Steal<T: 'static>(Arc<Inner<T>>);
35
pub(crate) struct Inner<T: 'static> {
    /// Concurrently updated by many threads.
    ///
    /// Contains two `UnsignedShort` values. The `UnsignedShort` in the `LSB`
    /// half is the "real" head of the queue. The `UnsignedShort` in the `MSB`
    /// half is set by a stealer in process of stealing values. It represents
    /// the first value being stolen in the batch. The `UnsignedShort` indices
    /// are intentionally wider than strictly required for buffer indexing in
    /// order to provide ABA mitigation and make it possible to distinguish
    /// between full and empty buffers.
    ///
    /// When both `UnsignedShort` values are the same, there is no active
    /// stealer.
    ///
    /// Tracking an in-progress stealer prevents a wrapping scenario.
    head: AtomicUnsignedLong,

    /// Only updated by producer thread but read by many threads.
    tail: AtomicUnsignedShort,

    /// When a task is scheduled from a worker, it is stored in this slot. The
    /// worker will check this slot for a task **before** checking the run
    /// queue. This effectively results in the **last** scheduled task to be run
    /// next (LIFO). This is an optimization for improving locality which
    /// benefits message passing patterns and helps to reduce latency.
    lifo: task::AtomicNotified<T>,

    /// Elements. Slots in the range `[real head, tail)` hold initialized
    /// tasks; all other slots are uninitialized.
    buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>; LOCAL_QUEUE_CAPACITY]>,
}
65
// SAFETY: `Inner` holds `UnsafeCell` buffer slots, which are not `Sync` on
// their own. All access to the slots is coordinated through the `head`/`tail`
// atomics (single producer via `Local`, stealers claim ranges via CAS before
// reading), so sharing `Inner` across threads is sound under that protocol.
unsafe impl<T> Send for Inner<T> {}
unsafe impl<T> Sync for Inner<T> {}
68
#[cfg(not(loom))]
const LOCAL_QUEUE_CAPACITY: usize = 256;

// Shrink the size of the local queue when using loom. This shouldn't impact
// logic, but allows loom to test more edge cases in a reasonable amount of
// time.
#[cfg(loom)]
const LOCAL_QUEUE_CAPACITY: usize = 4;

// Bit mask used to map a monotonically increasing position to a buffer slot
// (`pos & MASK`). This only works because `LOCAL_QUEUE_CAPACITY` is a power
// of two.
const MASK: usize = LOCAL_QUEUE_CAPACITY - 1;
79
80// Constructing the fixed size array directly is very awkward. The only way to
81// do it is to repeat `UnsafeCell::new(MaybeUninit::uninit())` 256 times, as
82// the contents are not Copy. The trick with defining a const doesn't work for
83// generic types.
84fn make_fixed_size<T>(buffer: Box<[T]>) -> Box<[T; LOCAL_QUEUE_CAPACITY]> {
85 assert_eq!(buffer.len(), LOCAL_QUEUE_CAPACITY);
86
87 // safety: We check that the length is correct.
88 unsafe { Box::from_raw(Box::into_raw(buffer).cast()) }
89}
90
91/// Create a new local run-queue
92pub(crate) fn local<T: 'static>() -> (Steal<T>, Local<T>) {
93 let mut buffer = Vec::with_capacity(LOCAL_QUEUE_CAPACITY);
94
95 for _ in 0..LOCAL_QUEUE_CAPACITY {
96 buffer.push(UnsafeCell::new(MaybeUninit::uninit()));
97 }
98
99 let inner = Arc::new(Inner {
100 head: AtomicUnsignedLong::new(0),
101 tail: AtomicUnsignedShort::new(0),
102 lifo: task::AtomicNotified::empty(),
103 buffer: make_fixed_size(buffer.into_boxed_slice()),
104 });
105
106 let local = Local {
107 inner: inner.clone(),
108 };
109
110 let remote = Steal(inner);
111
112 (remote, local)
113}
114
115impl<T> Local<T> {
116 /// Returns the number of entries in the queue
117 pub(crate) fn len(&self) -> usize {
118 let (_, head) = unpack(self.inner.head.load(Acquire));
119 let lifo = self.inner.lifo.is_some() as usize;
120 // safety: this is the **only** thread that updates this cell.
121 let tail = unsafe { self.inner.tail.unsync_load() };
122 len(head, tail) + lifo
123 }
124
125 /// How many tasks can be pushed into the queue
126 pub(crate) fn remaining_slots(&self) -> usize {
127 let (steal, _) = unpack(self.inner.head.load(Acquire));
128 // safety: this is the **only** thread that updates this cell.
129 let tail = unsafe { self.inner.tail.unsync_load() };
130
131 LOCAL_QUEUE_CAPACITY - len(steal, tail)
132 }
133
134 pub(crate) fn max_capacity(&self) -> usize {
135 LOCAL_QUEUE_CAPACITY
136 }
137
138 /// Returns false if there are any entries in the queue
139 ///
140 /// Separate to `is_stealable` so that refactors of `is_stealable` to "protect"
141 /// some tasks from stealing won't affect this
142 pub(crate) fn has_tasks(&self) -> bool {
143 self.len() != 0
144 }
145
146 /// Pushes a batch of tasks to the back of the queue. All tasks must fit in
147 /// the local queue.
148 ///
149 /// # Panics
150 ///
151 /// The method panics if there is not enough capacity to fit in the queue.
152 pub(crate) fn push_back(&mut self, tasks: impl ExactSizeIterator<Item = task::Notified<T>>) {
153 let len = tasks.len();
154 assert!(len <= LOCAL_QUEUE_CAPACITY);
155
156 if len == 0 {
157 // Nothing to do
158 return;
159 }
160
161 let head = self.inner.head.load(Acquire);
162 let (steal, _) = unpack(head);
163
164 // safety: this is the **only** thread that updates this cell.
165 let mut tail = unsafe { self.inner.tail.unsync_load() };
166
167 if tail.wrapping_sub(steal) <= (LOCAL_QUEUE_CAPACITY - len) as UnsignedShort {
168 // Yes, this if condition is structured a bit weird (first block
169 // does nothing, second returns an error). It is this way to match
170 // `push_back_or_overflow`.
171 } else {
172 panic!()
173 }
174
175 for task in tasks {
176 let idx = tail as usize & MASK;
177
178 self.inner.buffer[idx].with_mut(|ptr| {
179 // Write the task to the slot
180 //
181 // Safety: There is only one producer and the above `if`
182 // condition ensures we don't touch a cell if there is a
183 // value, thus no consumer.
184 unsafe {
185 ptr::write((*ptr).as_mut_ptr(), task);
186 }
187 });
188
189 tail = tail.wrapping_add(1);
190 }
191
192 self.inner.tail.store(tail, Release);
193 }
194
195 /// Pushes a task to the back of the local queue, if there is not enough
196 /// capacity in the queue, this triggers the overflow operation.
197 ///
198 /// When the queue overflows, half of the current contents of the queue is
199 /// moved to the given Injection queue. This frees up capacity for more
200 /// tasks to be pushed into the local queue.
201 pub(crate) fn push_back_or_overflow<O: Overflow<T>>(
202 &mut self,
203 mut task: task::Notified<T>,
204 overflow: &O,
205 stats: &mut Stats,
206 ) {
207 let tail = loop {
208 let head = self.inner.head.load(Acquire);
209 let (steal, real) = unpack(head);
210
211 // safety: this is the **only** thread that updates this cell.
212 let tail = unsafe { self.inner.tail.unsync_load() };
213
214 if tail.wrapping_sub(steal) < LOCAL_QUEUE_CAPACITY as UnsignedShort {
215 // There is capacity for the task
216 break tail;
217 } else if steal != real {
218 // Concurrently stealing, this will free up capacity, so only
219 // push the task onto the inject queue
220 overflow.push(task);
221 return;
222 } else {
223 // Push the current task and half of the queue into the
224 // inject queue.
225 match self.push_overflow(task, real, tail, overflow, stats) {
226 Ok(_) => return,
227 // Lost the race, try again
228 Err(v) => {
229 task = v;
230 }
231 }
232 }
233 };
234
235 self.push_back_finish(task, tail);
236 }
237
238 // Second half of `push_back`
239 fn push_back_finish(&self, task: task::Notified<T>, tail: UnsignedShort) {
240 // Map the position to a slot index.
241 let idx = tail as usize & MASK;
242
243 self.inner.buffer[idx].with_mut(|ptr| {
244 // Write the task to the slot
245 //
246 // Safety: There is only one producer and the above `if`
247 // condition ensures we don't touch a cell if there is a
248 // value, thus no consumer.
249 unsafe {
250 ptr::write((*ptr).as_mut_ptr(), task);
251 }
252 });
253
254 // Make the task available. Synchronizes with a load in
255 // `steal_into2`.
256 self.inner.tail.store(tail.wrapping_add(1), Release);
257 }
258
259 /// Moves a batch of tasks into the inject queue.
260 ///
261 /// This will temporarily make some of the tasks unavailable to stealers.
262 /// Once `push_overflow` is done, a notification is sent out, so if other
263 /// workers "missed" some of the tasks during a steal, they will get
264 /// another opportunity.
265 #[inline(never)]
266 fn push_overflow<O: Overflow<T>>(
267 &mut self,
268 task: task::Notified<T>,
269 head: UnsignedShort,
270 tail: UnsignedShort,
271 overflow: &O,
272 stats: &mut Stats,
273 ) -> Result<(), task::Notified<T>> {
274 /// How many elements are we taking from the local queue.
275 ///
276 /// This is one less than the number of tasks pushed to the inject
277 /// queue as we are also inserting the `task` argument.
278 const NUM_TASKS_TAKEN: UnsignedShort = (LOCAL_QUEUE_CAPACITY / 2) as UnsignedShort;
279
280 assert_eq!(
281 tail.wrapping_sub(head) as usize,
282 LOCAL_QUEUE_CAPACITY,
283 "queue is not full; tail = {tail}; head = {head}"
284 );
285
286 // Claim all tasks.
287 //
288 // We are claiming the tasks **before** reading them out of the buffer.
289 // This is safe because only the **current** thread is able to push new
290 // tasks.
291 //
292 // There isn't really any need for memory ordering... Relaxed would
293 // work. This is because all tasks are pushed into the queue from the
294 // current thread (or memory has been acquired if the local queue handle
295 // moved).
296 if self
297 .inner
298 .head
299 .compare_exchange_weak(pack(head, head), pack(tail, tail), Release, Relaxed)
300 .is_err()
301 {
302 // We failed to claim the tasks, losing the race. Return out of
303 // this function and try the full `push` routine again. The queue
304 // may not be full anymore.
305 return Err(task);
306 }
307
308 // Add back the first half of tasks.
309 //
310 // We are doing it this way instead of just taking half of the tasks because we want the
311 // *second* half of the tasks, and if you just incremented `head` by `NUM_TASKS_TAKEN`,
312 // then you would be taking the first half instead of the second half.
313 //
314 // Pushing the second half of the local queue to the injection queue is better because when
315 // we take tasks *out* of the injection queue, we always place them in the first half. This
316 // means that if a task is in the second half, then we know for sure that this task is not
317 // a task we just got from the injection queue. This ensures that when we take a task out
318 // of the injection queue, then it will not be moved back into the injection queue (at
319 // least not until after we have polled it at least once).
320 //
321 // Note that if a concurrent worker tries to steal from us between these two operations and
322 // sees that the worker queue is empty, then that worker may go to sleep, and we do not
323 // notify it about these tasks becoming available for stealing again. Ordinarily this would
324 // be a problem, but it isn't in this case because the worker will be notified about the
325 // tasks we are adding to the injection queue instead, which ensures that the stealer wakes
326 // up again to take the tasks from the injection queue.
327 self.inner
328 .tail
329 .store(tail.wrapping_add(NUM_TASKS_TAKEN), Release);
330
331 /// An iterator that takes elements out of the run queue.
332 struct BatchTaskIter<'a, T: 'static> {
333 buffer: &'a [UnsafeCell<MaybeUninit<task::Notified<T>>>; LOCAL_QUEUE_CAPACITY],
334 head: UnsignedLong,
335 i: UnsignedLong,
336 }
337 impl<'a, T: 'static> Iterator for BatchTaskIter<'a, T> {
338 type Item = task::Notified<T>;
339
340 #[inline]
341 fn next(&mut self) -> Option<task::Notified<T>> {
342 if self.i == UnsignedLong::from(NUM_TASKS_TAKEN) {
343 None
344 } else {
345 let i_idx = self.i.wrapping_add(self.head) as usize & MASK;
346 let slot = &self.buffer[i_idx];
347
348 // safety: Our CAS from before has assumed exclusive ownership
349 // of the task pointers in this range.
350 let task = slot.with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });
351
352 self.i += 1;
353 Some(task)
354 }
355 }
356 }
357
358 // safety: The CAS above ensures that no consumer will look at these
359 // values again, and we are the only producer.
360 let batch_iter = BatchTaskIter {
361 buffer: &self.inner.buffer,
362 head: head.wrapping_add(NUM_TASKS_TAKEN) as UnsignedLong,
363 i: 0,
364 };
365 overflow.push_batch(batch_iter.chain(std::iter::once(task)));
366
367 // Add 1 to factor in the task currently being scheduled.
368 stats.incr_overflow_count();
369
370 Ok(())
371 }
372
373 /// Pops a task from the local queue.
374 pub(crate) fn pop(&mut self) -> Option<task::Notified<T>> {
375 let mut head = self.inner.head.load(Acquire);
376
377 let idx = loop {
378 let (steal, real) = unpack(head);
379
380 // safety: this is the **only** thread that updates this cell.
381 let tail = unsafe { self.inner.tail.unsync_load() };
382
383 if real == tail {
384 // queue is empty
385 return None;
386 }
387
388 let next_real = real.wrapping_add(1);
389
390 // If `steal == real` there are no concurrent stealers. Both `steal`
391 // and `real` are updated.
392 let next = if steal == real {
393 pack(next_real, next_real)
394 } else {
395 assert_ne!(steal, next_real);
396 pack(steal, next_real)
397 };
398
399 // Attempt to claim a task.
400 let res = self
401 .inner
402 .head
403 .compare_exchange_weak(head, next, AcqRel, Acquire);
404
405 match res {
406 Ok(_) => break real as usize & MASK,
407 Err(actual) => head = actual,
408 }
409 };
410
411 Some(self.inner.buffer[idx].with(|ptr| unsafe { ptr::read(ptr).assume_init() }))
412 }
413
414 /// Pushes a task to the LIFO slot, returning the task previously in the
415 /// LIFO slot (if there was one).
416 pub(crate) fn push_lifo(&self, task: task::Notified<T>) -> Option<task::Notified<T>> {
417 self.inner.lifo.swap(Some(task))
418 }
419
420 /// Pops the task currently held in the LIFO slot, if there is one;
421 /// otherwise, returns `None`.
422 pub(crate) fn pop_lifo(&self) -> Option<task::Notified<T>> {
423 // LIFO-suction!
424 self.inner.lifo.take()
425 }
426}
427
impl<T> Steal<T> {
    /// Returns the number of entries in the queue, including the LIFO slot.
    pub(crate) fn len(&self) -> usize {
        let (_, head) = unpack(self.0.head.load(Acquire));
        // Unlike `Local::len`, `tail` is loaded with `Acquire` here because
        // this handle may be used from threads other than the producer.
        let tail = self.0.tail.load(Acquire);
        let lifo = self.0.lifo.is_some() as usize;
        len(head, tail) + lifo
    }

    /// Return true if the queue is empty,
    /// false if there are any entries in the queue
    pub(crate) fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Steals half the tasks from self and place them into `dst`.
    ///
    /// Returns one of the stolen tasks for the caller to run immediately, or
    /// `None` if nothing could be stolen (including from the LIFO slot).
    pub(crate) fn steal_into(
        &self,
        dst: &mut Local<T>,
        dst_stats: &mut Stats,
    ) -> Option<task::Notified<T>> {
        // Safety: the caller is the only thread that mutates `dst.tail` and
        // holds a mutable reference.
        let dst_tail = unsafe { dst.inner.tail.unsync_load() };

        // To the caller, `dst` may **look** empty but still have values
        // contained in the buffer. If another thread is concurrently stealing
        // from `dst` there may not be enough capacity to steal.
        let (steal, _) = unpack(dst.inner.head.load(Acquire));

        // `dst` must have room for up to half of our queue's contents.
        if dst_tail.wrapping_sub(steal) > LOCAL_QUEUE_CAPACITY as UnsignedShort / 2 {
            // we *could* try to steal less here, but for simplicity, we're just
            // going to abort.
            return None;
        }

        // Steal the tasks into `dst`'s buffer. This does not yet expose the
        // tasks in `dst`.
        let mut n = self.steal_into2(dst, dst_tail);

        if n == 0 {
            // If no tasks were stolen, let's see if there's one in the LIFO
            // slot.
            let lifo = self.0.lifo.take();
            if lifo.is_some() {
                dst_stats.incr_steal_count(1);
                dst_stats.incr_steal_operations();
            }
            return lifo;
        }

        dst_stats.incr_steal_count(n as u16);
        dst_stats.incr_steal_operations();

        // We are returning a task here
        n -= 1;

        // The returned task is the last one written to `dst`'s buffer; it is
        // never exposed via `dst.tail`, so it stays private to this caller.
        let ret_pos = dst_tail.wrapping_add(n);
        let ret_idx = ret_pos as usize & MASK;

        // safety: the value was written as part of `steal_into2` and not
        // exposed to stealers, so no other thread can access it.
        let ret = dst.inner.buffer[ret_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });

        if n == 0 {
            // The `dst` queue is empty, but a single task was stolen
            return Some(ret);
        }

        // Make the stolen items available to consumers
        dst.inner.tail.store(dst_tail.wrapping_add(n), Release);

        Some(ret)
    }

    // Steal tasks from `self`, placing them into `dst`. Returns the number of
    // tasks that were stolen.
    //
    // Three phases: (1) CAS the real head forward while leaving the steal head
    // in place, which claims the range and blocks other stealers; (2) copy the
    // claimed tasks into `dst`'s buffer; (3) CAS the steal head up to match
    // the real head, releasing the queue for other stealers.
    fn steal_into2(&self, dst: &mut Local<T>, dst_tail: UnsignedShort) -> UnsignedShort {
        let mut prev_packed = self.0.head.load(Acquire);
        let mut next_packed;

        let n = loop {
            let (src_head_steal, src_head_real) = unpack(prev_packed);
            let src_tail = self.0.tail.load(Acquire);

            // If these two do not match, another thread is concurrently
            // stealing from the queue.
            if src_head_steal != src_head_real {
                return 0;
            }

            // Number of available tasks to steal; `n - n / 2` rounds up, so
            // we take half the queue rounded up.
            let n = src_tail.wrapping_sub(src_head_real);
            let n = n - n / 2;

            if n == 0 {
                // No tasks available to steal
                return 0;
            }

            // Update the real head index to acquire the tasks.
            let steal_to = src_head_real.wrapping_add(n);
            assert_ne!(src_head_steal, steal_to);
            next_packed = pack(src_head_steal, steal_to);

            // Claim all those tasks. This is done by incrementing the "real"
            // head but not the steal. By doing this, no other thread is able to
            // steal from this queue until the current thread completes.
            let res = self
                .0
                .head
                .compare_exchange_weak(prev_packed, next_packed, AcqRel, Acquire);

            match res {
                Ok(_) => break n,
                Err(actual) => prev_packed = actual,
            }
        };

        assert!(
            n <= LOCAL_QUEUE_CAPACITY as UnsignedShort / 2,
            "actual = {n}"
        );

        let (first, _) = unpack(next_packed);

        // Take all the tasks
        for i in 0..n {
            // Compute the positions
            let src_pos = first.wrapping_add(i);
            let dst_pos = dst_tail.wrapping_add(i);

            // Map to slots
            let src_idx = src_pos as usize & MASK;
            let dst_idx = dst_pos as usize & MASK;

            // Read the task
            //
            // safety: We acquired the task with the atomic exchange above.
            let task = self.0.buffer[src_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });

            // Write the task to the new slot
            //
            // safety: `dst` queue is empty and we are the only producer to
            // this queue.
            dst.inner.buffer[dst_idx]
                .with_mut(|ptr| unsafe { ptr::write((*ptr).as_mut_ptr(), task) });
        }

        let mut prev_packed = next_packed;

        // Update `src_head_steal` to match `src_head_real` signalling that the
        // stealing routine is complete.
        loop {
            let head = unpack(prev_packed).1;
            next_packed = pack(head, head);

            let res = self
                .0
                .head
                .compare_exchange_weak(prev_packed, next_packed, AcqRel, Acquire);

            match res {
                Ok(_) => return n,
                Err(actual) => prev_packed = actual,
            }
        }
    }
}
597
598impl<T> Clone for Steal<T> {
599 fn clone(&self) -> Steal<T> {
600 Steal(self.0.clone())
601 }
602}
603
604impl<T> Drop for Local<T> {
605 fn drop(&mut self) {
606 if !std::thread::panicking() {
607 assert!(self.pop().is_none(), "queue not empty");
608 assert!(self.pop_lifo().is_none(), "LIFO slot not empty");
609 }
610 }
611}
612
613/// Calculate the length of the queue using the head and tail.
614/// The `head` can be the `steal` or `real` head.
615fn len(head: UnsignedShort, tail: UnsignedShort) -> usize {
616 tail.wrapping_sub(head) as usize
617}
618
619/// Split the head value into the real head and the index a stealer is working
620/// on.
621fn unpack(n: UnsignedLong) -> (UnsignedShort, UnsignedShort) {
622 let real = n & UnsignedShort::MAX as UnsignedLong;
623 let steal = n >> (mem::size_of::<UnsignedShort>() * 8);
624
625 (steal as UnsignedShort, real as UnsignedShort)
626}
627
628/// Join the two head values
629fn pack(steal: UnsignedShort, real: UnsignedShort) -> UnsignedLong {
630 (real as UnsignedLong) | ((steal as UnsignedLong) << (mem::size_of::<UnsignedShort>() * 8))
631}
632
#[test]
fn test_local_queue_capacity() {
    // `pos & MASK` slot indexing (with `MASK = LOCAL_QUEUE_CAPACITY - 1`) is
    // only correct when the capacity is a power of two.
    assert!(LOCAL_QUEUE_CAPACITY.is_power_of_two());
    // NOTE(review): bound kept from the original test — presumably other
    // parts of the runtime rely on positions fitting in small integer types;
    // confirm before raising the capacity above 256.
    assert!(LOCAL_QUEUE_CAPACITY - 1 <= u8::MAX as usize);
}