tokio/runtime/scheduler/multi_thread/
queue.rs

1//! Run-queue structures to support a work-stealing scheduler
2
3use crate::loom::cell::UnsafeCell;
4use crate::loom::sync::Arc;
5use crate::runtime::scheduler::multi_thread::{Overflow, Stats};
6use crate::runtime::task;
7
8use std::mem::{self, MaybeUninit};
9use std::ptr;
10use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
11
// Use wider integers when possible to increase ABA resilience.
//
// See issue #5041: <https://github.com/tokio-rs/tokio/issues/5041>.
//
// `UnsignedShort` holds a single free-running queue position; `UnsignedLong`
// holds two of them packed into one word (see `pack`/`unpack` at the bottom
// of this file), so it must be exactly twice as wide.
cfg_has_atomic_u64! {
    type UnsignedShort = u32;
    type UnsignedLong = u64;
    type AtomicUnsignedShort = crate::loom::sync::atomic::AtomicU32;
    type AtomicUnsignedLong = crate::loom::sync::atomic::AtomicU64;
}
cfg_not_has_atomic_u64! {
    type UnsignedShort = u16;
    type UnsignedLong = u32;
    type AtomicUnsignedShort = crate::loom::sync::atomic::AtomicU16;
    type AtomicUnsignedLong = crate::loom::sync::atomic::AtomicU32;
}
27
/// Producer handle. May only be used from a single thread.
///
/// Holds the only handle allowed to push into the queue; methods taking
/// `&mut self` rely on this single-producer property for their unsynchronized
/// `tail` loads.
pub(crate) struct Local<T: 'static> {
    inner: Arc<Inner<T>>,
}
32
/// Consumer handle. May be used from many threads.
///
/// Cheaply cloneable (see the `Clone` impl below); every clone shares the
/// same underlying buffer via `Arc`.
pub(crate) struct Steal<T: 'static>(Arc<Inner<T>>);
35
pub(crate) struct Inner<T: 'static> {
    /// Concurrently updated by many threads.
    ///
    /// Contains two `UnsignedShort` values. The low (`LSB`) half is the "real"
    /// head of the queue. The `UnsignedShort` in the high (`MSB`) half is set
    /// by a stealer in process of stealing values. It represents the first
    /// value being stolen in the batch. The `UnsignedShort` indices are
    /// intentionally wider than strictly required for buffer indexing in order
    /// to provide ABA mitigation and make it possible to distinguish between
    /// full and empty buffers.
    ///
    /// When both `UnsignedShort` values are the same, there is no active
    /// stealer.
    ///
    /// Tracking an in-progress stealer prevents a wrapping scenario.
    head: AtomicUnsignedLong,

    /// Only updated by producer thread but read by many threads.
    tail: AtomicUnsignedShort,

    /// When a task is scheduled from a worker, it is stored in this slot. The
    /// worker will check this slot for a task **before** checking the run
    /// queue. This effectively results in the **last** scheduled task to be run
    /// next (LIFO). This is an optimization for improving locality which
    /// benefits message passing patterns and helps to reduce latency.
    lifo: task::AtomicNotified<T>,

    /// Elements. Only the slots at positions `head..tail` (masked by `MASK`)
    /// hold initialized values; all other slots are uninitialized memory.
    buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>; LOCAL_QUEUE_CAPACITY]>,
}
65
// SAFETY: all cross-thread access to `buffer` is mediated by the `head`/`tail`
// atomics (see the `Local`/`Steal` methods below), which hand out exclusive
// ownership of slot ranges before they are read or written.
// NOTE(review): there is no `T: Send` bound here — soundness presumably relies
// on how `task::Notified<T>` is constrained by the task system; confirm.
unsafe impl<T> Send for Inner<T> {}
unsafe impl<T> Sync for Inner<T> {}
68
#[cfg(not(loom))]
const LOCAL_QUEUE_CAPACITY: usize = 256;

// Shrink the size of the local queue when using loom. This shouldn't impact
// logic, but allows loom to test more edge cases in a reasonable amount of
// time.
#[cfg(loom)]
const LOCAL_QUEUE_CAPACITY: usize = 4;

// Bitmask mapping a free-running position to a buffer slot index; relies on
// the capacity being a power of two (true for both 256 and 4 above).
const MASK: usize = LOCAL_QUEUE_CAPACITY - 1;
79
80// Constructing the fixed size array directly is very awkward. The only way to
81// do it is to repeat `UnsafeCell::new(MaybeUninit::uninit())` 256 times, as
82// the contents are not Copy. The trick with defining a const doesn't work for
83// generic types.
84fn make_fixed_size<T>(buffer: Box<[T]>) -> Box<[T; LOCAL_QUEUE_CAPACITY]> {
85    assert_eq!(buffer.len(), LOCAL_QUEUE_CAPACITY);
86
87    // safety: We check that the length is correct.
88    unsafe { Box::from_raw(Box::into_raw(buffer).cast()) }
89}
90
91/// Create a new local run-queue
92pub(crate) fn local<T: 'static>() -> (Steal<T>, Local<T>) {
93    let mut buffer = Vec::with_capacity(LOCAL_QUEUE_CAPACITY);
94
95    for _ in 0..LOCAL_QUEUE_CAPACITY {
96        buffer.push(UnsafeCell::new(MaybeUninit::uninit()));
97    }
98
99    let inner = Arc::new(Inner {
100        head: AtomicUnsignedLong::new(0),
101        tail: AtomicUnsignedShort::new(0),
102        lifo: task::AtomicNotified::empty(),
103        buffer: make_fixed_size(buffer.into_boxed_slice()),
104    });
105
106    let local = Local {
107        inner: inner.clone(),
108    };
109
110    let remote = Steal(inner);
111
112    (remote, local)
113}
114
impl<T> Local<T> {
    /// Returns the number of entries in the queue
    ///
    /// An occupied LIFO slot counts as one entry.
    pub(crate) fn len(&self) -> usize {
        let (_, head) = unpack(self.inner.head.load(Acquire));
        let lifo = self.inner.lifo.is_some() as usize;
        // safety: this is the **only** thread that updates this cell.
        let tail = unsafe { self.inner.tail.unsync_load() };
        len(head, tail) + lifo
    }

    /// How many tasks can be pushed into the queue
    ///
    /// Measured against the `steal` head, so slots still owned by an
    /// in-progress stealer are not counted as free.
    pub(crate) fn remaining_slots(&self) -> usize {
        let (steal, _) = unpack(self.inner.head.load(Acquire));
        // safety: this is the **only** thread that updates this cell.
        let tail = unsafe { self.inner.tail.unsync_load() };

        LOCAL_QUEUE_CAPACITY - len(steal, tail)
    }

    /// Returns the fixed capacity of the queue buffer.
    pub(crate) fn max_capacity(&self) -> usize {
        LOCAL_QUEUE_CAPACITY
    }

    /// Returns `true` if there are any entries in the queue
    ///
    /// Separate to `is_stealable` so that refactors of `is_stealable` to "protect"
    /// some tasks from stealing won't affect this
    pub(crate) fn has_tasks(&self) -> bool {
        self.len() != 0
    }

    /// Pushes a batch of tasks to the back of the queue. All tasks must fit in
    /// the local queue.
    ///
    /// # Panics
    ///
    /// The method panics if there is not enough capacity to fit in the queue.
    pub(crate) fn push_back(&mut self, tasks: impl ExactSizeIterator<Item = task::Notified<T>>) {
        let len = tasks.len();
        assert!(len <= LOCAL_QUEUE_CAPACITY);

        if len == 0 {
            // Nothing to do
            return;
        }

        let head = self.inner.head.load(Acquire);
        let (steal, _) = unpack(head);

        // safety: this is the **only** thread that updates this cell.
        let mut tail = unsafe { self.inner.tail.unsync_load() };

        // Capacity is checked against the `steal` head: slots in the middle of
        // being stolen are still occupied from the producer's point of view.
        if tail.wrapping_sub(steal) <= (LOCAL_QUEUE_CAPACITY - len) as UnsignedShort {
            // Yes, this if condition is structured a bit weird (first block
            // does nothing, second returns an error). It is this way to match
            // `push_back_or_overflow`.
        } else {
            panic!()
        }

        for task in tasks {
            let idx = tail as usize & MASK;

            self.inner.buffer[idx].with_mut(|ptr| {
                // Write the task to the slot
                //
                // Safety: There is only one producer and the above `if`
                // condition ensures we don't touch a cell if there is a
                // value, thus no consumer.
                unsafe {
                    ptr::write((*ptr).as_mut_ptr(), task);
                }
            });

            tail = tail.wrapping_add(1);
        }

        // Publish all the slot writes at once. The `Release` store pairs with
        // `Acquire` loads of `tail` by consumers and stealers.
        self.inner.tail.store(tail, Release);
    }

    /// Pushes a task to the back of the local queue, if there is not enough
    /// capacity in the queue, this triggers the overflow operation.
    ///
    /// When the queue overflows, half of the current contents of the queue is
    /// moved to the given Injection queue. This frees up capacity for more
    /// tasks to be pushed into the local queue.
    pub(crate) fn push_back_or_overflow<O: Overflow<T>>(
        &mut self,
        mut task: task::Notified<T>,
        overflow: &O,
        stats: &mut Stats,
    ) {
        // Loop because `push_overflow` can lose a CAS race against a stealer,
        // in which case the queue may no longer be full and we retry.
        let tail = loop {
            let head = self.inner.head.load(Acquire);
            let (steal, real) = unpack(head);

            // safety: this is the **only** thread that updates this cell.
            let tail = unsafe { self.inner.tail.unsync_load() };

            if tail.wrapping_sub(steal) < LOCAL_QUEUE_CAPACITY as UnsignedShort {
                // There is capacity for the task
                break tail;
            } else if steal != real {
                // Concurrently stealing, this will free up capacity, so only
                // push the task onto the inject queue
                overflow.push(task);
                return;
            } else {
                // Push the current task and half of the queue into the
                // inject queue.
                match self.push_overflow(task, real, tail, overflow, stats) {
                    Ok(_) => return,
                    // Lost the race, try again
                    Err(v) => {
                        task = v;
                    }
                }
            }
        };

        self.push_back_finish(task, tail);
    }

    // Second half of `push_back`: writes `task` into the slot at `tail` and
    // publishes it by advancing the tail.
    fn push_back_finish(&self, task: task::Notified<T>, tail: UnsignedShort) {
        // Map the position to a slot index.
        let idx = tail as usize & MASK;

        self.inner.buffer[idx].with_mut(|ptr| {
            // Write the task to the slot
            //
            // Safety: There is only one producer and the above `if`
            // condition ensures we don't touch a cell if there is a
            // value, thus no consumer.
            unsafe {
                ptr::write((*ptr).as_mut_ptr(), task);
            }
        });

        // Make the task available. Synchronizes with a load in
        // `steal_into2`.
        self.inner.tail.store(tail.wrapping_add(1), Release);
    }

    /// Moves a batch of tasks into the inject queue.
    ///
    /// This will temporarily make some of the tasks unavailable to stealers.
    /// Once `push_overflow` is done, a notification is sent out, so if other
    /// workers "missed" some of the tasks during a steal, they will get
    /// another opportunity.
    ///
    /// Returns `Err(task)` (handing the task back to the caller) when the
    /// claiming CAS loses a race against a concurrent stealer.
    #[inline(never)]
    fn push_overflow<O: Overflow<T>>(
        &mut self,
        task: task::Notified<T>,
        head: UnsignedShort,
        tail: UnsignedShort,
        overflow: &O,
        stats: &mut Stats,
    ) -> Result<(), task::Notified<T>> {
        /// How many elements are we taking from the local queue.
        ///
        /// This is one less than the number of tasks pushed to the inject
        /// queue as we are also inserting the `task` argument.
        const NUM_TASKS_TAKEN: UnsignedShort = (LOCAL_QUEUE_CAPACITY / 2) as UnsignedShort;

        assert_eq!(
            tail.wrapping_sub(head) as usize,
            LOCAL_QUEUE_CAPACITY,
            "queue is not full; tail = {tail}; head = {head}"
        );

        // Claim all tasks.
        //
        // We are claiming the tasks **before** reading them out of the buffer.
        // This is safe because only the **current** thread is able to push new
        // tasks.
        //
        // There isn't really any need for memory ordering... Relaxed would
        // work. This is because all tasks are pushed into the queue from the
        // current thread (or memory has been acquired if the local queue handle
        // moved).
        if self
            .inner
            .head
            .compare_exchange_weak(pack(head, head), pack(tail, tail), Release, Relaxed)
            .is_err()
        {
            // We failed to claim the tasks, losing the race. Return out of
            // this function and try the full `push` routine again. The queue
            // may not be full anymore.
            return Err(task);
        }

        // Add back the first half of tasks.
        //
        // We are doing it this way instead of just taking half of the tasks because we want the
        // *second* half of the tasks, and if you just incremented `head` by `NUM_TASKS_TAKEN`,
        // then you would be taking the first half instead of the second half.
        //
        // Pushing the second half of the local queue to the injection queue is better because when
        // we take tasks *out* of the injection queue, we always place them in the first half. This
        // means that if a task is in the second half, then we know for sure that this task is not
        // a task we just got from the injection queue. This ensures that when we take a task out
        // of the injection queue, then it will not be moved back into the injection queue (at
        // least not until after we have polled it at least once).
        //
        // Note that if a concurrent worker tries to steal from us between these two operations and
        // sees that the worker queue is empty, then that worker may go to sleep, and we do not
        // notify it about these tasks becoming available for stealing again. Ordinarily this would
        // be a problem, but it isn't in this case because the worker will be notified about the
        // tasks we are adding to the injection queue instead, which ensures that the stealer wakes
        // up again to take the tasks from the injection queue.
        self.inner
            .tail
            .store(tail.wrapping_add(NUM_TASKS_TAKEN), Release);

        /// An iterator that takes elements out of the run queue.
        struct BatchTaskIter<'a, T: 'static> {
            buffer: &'a [UnsafeCell<MaybeUninit<task::Notified<T>>>; LOCAL_QUEUE_CAPACITY],
            head: UnsignedLong,
            i: UnsignedLong,
        }
        impl<'a, T: 'static> Iterator for BatchTaskIter<'a, T> {
            type Item = task::Notified<T>;

            #[inline]
            fn next(&mut self) -> Option<task::Notified<T>> {
                if self.i == UnsignedLong::from(NUM_TASKS_TAKEN) {
                    None
                } else {
                    let i_idx = self.i.wrapping_add(self.head) as usize & MASK;
                    let slot = &self.buffer[i_idx];

                    // safety: Our CAS from before has assumed exclusive ownership
                    // of the task pointers in this range.
                    let task = slot.with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });

                    self.i += 1;
                    Some(task)
                }
            }
        }

        // safety: The CAS above ensures that no consumer will look at these
        // values again, and we are the only producer.
        let batch_iter = BatchTaskIter {
            buffer: &self.inner.buffer,
            head: head.wrapping_add(NUM_TASKS_TAKEN) as UnsignedLong,
            i: 0,
        };
        overflow.push_batch(batch_iter.chain(std::iter::once(task)));

        // Add 1 to factor in the task currently being scheduled.
        stats.incr_overflow_count();

        Ok(())
    }

    /// Pops a task from the local queue.
    ///
    /// Retries the head CAS until it either claims the slot at the real head
    /// or observes an empty queue.
    pub(crate) fn pop(&mut self) -> Option<task::Notified<T>> {
        let mut head = self.inner.head.load(Acquire);

        let idx = loop {
            let (steal, real) = unpack(head);

            // safety: this is the **only** thread that updates this cell.
            let tail = unsafe { self.inner.tail.unsync_load() };

            if real == tail {
                // queue is empty
                return None;
            }

            let next_real = real.wrapping_add(1);

            // If `steal == real` there are no concurrent stealers. Both `steal`
            // and `real` are updated.
            let next = if steal == real {
                pack(next_real, next_real)
            } else {
                assert_ne!(steal, next_real);
                pack(steal, next_real)
            };

            // Attempt to claim a task.
            let res = self
                .inner
                .head
                .compare_exchange_weak(head, next, AcqRel, Acquire);

            match res {
                Ok(_) => break real as usize & MASK,
                Err(actual) => head = actual,
            }
        };

        Some(self.inner.buffer[idx].with(|ptr| unsafe { ptr::read(ptr).assume_init() }))
    }

    /// Pushes a task to the LIFO slot, returning the task previously in the
    /// LIFO slot (if there was one).
    pub(crate) fn push_lifo(&self, task: task::Notified<T>) -> Option<task::Notified<T>> {
        self.inner.lifo.swap(Some(task))
    }

    /// Pops the task currently held in the LIFO slot, if there is one;
    /// otherwise, returns `None`.
    pub(crate) fn pop_lifo(&self) -> Option<task::Notified<T>> {
        // LIFO-suction!
        self.inner.lifo.take()
    }
}
427
impl<T> Steal<T> {
    /// Returns the number of entries in the queue
    ///
    /// An occupied LIFO slot counts as one entry. The value is a snapshot and
    /// may be stale by the time the caller acts on it.
    pub(crate) fn len(&self) -> usize {
        let (_, head) = unpack(self.0.head.load(Acquire));
        let tail = self.0.tail.load(Acquire);
        let lifo = self.0.lifo.is_some() as usize;
        len(head, tail) + lifo
    }

    /// Return true if the queue is empty,
    /// false if there are any entries in the queue
    pub(crate) fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Steals half the tasks from self and place them into `dst`.
    ///
    /// Returns one stolen task directly (for the stealing worker to run
    /// immediately); the remainder is published into `dst`'s buffer. Falls
    /// back to taking the LIFO slot when the buffer yields nothing.
    pub(crate) fn steal_into(
        &self,
        dst: &mut Local<T>,
        dst_stats: &mut Stats,
    ) -> Option<task::Notified<T>> {
        // Safety: the caller is the only thread that mutates `dst.tail` and
        // holds a mutable reference.
        let dst_tail = unsafe { dst.inner.tail.unsync_load() };

        // To the caller, `dst` may **look** empty but still have values
        // contained in the buffer. If another thread is concurrently stealing
        // from `dst` there may not be enough capacity to steal.
        let (steal, _) = unpack(dst.inner.head.load(Acquire));

        if dst_tail.wrapping_sub(steal) > LOCAL_QUEUE_CAPACITY as UnsignedShort / 2 {
            // we *could* try to steal less here, but for simplicity, we're just
            // going to abort.
            return None;
        }

        // Steal the tasks into `dst`'s buffer. This does not yet expose the
        // tasks in `dst`.
        let mut n = self.steal_into2(dst, dst_tail);

        if n == 0 {
            // If no tasks were stolen, let's see if there's one in the LIFO
            // slot.
            let lifo = self.0.lifo.take();
            if lifo.is_some() {
                dst_stats.incr_steal_count(1);
                dst_stats.incr_steal_operations();
            }
            return lifo;
        }

        dst_stats.incr_steal_count(n as u16);
        dst_stats.incr_steal_operations();

        // We are returning a task here
        n -= 1;

        // The last task copied into `dst` is handed straight to the caller.
        let ret_pos = dst_tail.wrapping_add(n);
        let ret_idx = ret_pos as usize & MASK;

        // safety: the value was written as part of `steal_into2` and not
        // exposed to stealers, so no other thread can access it.
        let ret = dst.inner.buffer[ret_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });

        if n == 0 {
            // The `dst` queue is empty, but a single task was stolen
            return Some(ret);
        }

        // Make the stolen items available to consumers
        dst.inner.tail.store(dst_tail.wrapping_add(n), Release);

        Some(ret)
    }

    // Steal tasks from `self`, placing them into `dst`. Returns the number of
    // tasks that were stolen.
    //
    // Two-phase protocol: first advance only the "real" head (claiming the
    // range and blocking other stealers), then copy the tasks, then bring the
    // "steal" half up to match, releasing the claim.
    fn steal_into2(&self, dst: &mut Local<T>, dst_tail: UnsignedShort) -> UnsignedShort {
        let mut prev_packed = self.0.head.load(Acquire);
        let mut next_packed;

        let n = loop {
            let (src_head_steal, src_head_real) = unpack(prev_packed);
            let src_tail = self.0.tail.load(Acquire);

            // If these two do not match, another thread is concurrently
            // stealing from the queue.
            if src_head_steal != src_head_real {
                return 0;
            }

            // Number of available tasks to steal: half, rounded up
            // (`n - n / 2 == ceil(n / 2)`).
            let n = src_tail.wrapping_sub(src_head_real);
            let n = n - n / 2;

            if n == 0 {
                // No tasks available to steal
                return 0;
            }

            // Update the real head index to acquire the tasks.
            let steal_to = src_head_real.wrapping_add(n);
            assert_ne!(src_head_steal, steal_to);
            next_packed = pack(src_head_steal, steal_to);

            // Claim all those tasks. This is done by incrementing the "real"
            // head but not the steal. By doing this, no other thread is able to
            // steal from this queue until the current thread completes.
            let res = self
                .0
                .head
                .compare_exchange_weak(prev_packed, next_packed, AcqRel, Acquire);

            match res {
                Ok(_) => break n,
                Err(actual) => prev_packed = actual,
            }
        };

        assert!(
            n <= LOCAL_QUEUE_CAPACITY as UnsignedShort / 2,
            "actual = {n}"
        );

        let (first, _) = unpack(next_packed);

        // Take all the tasks
        for i in 0..n {
            // Compute the positions
            let src_pos = first.wrapping_add(i);
            let dst_pos = dst_tail.wrapping_add(i);

            // Map to slots
            let src_idx = src_pos as usize & MASK;
            let dst_idx = dst_pos as usize & MASK;

            // Read the task
            //
            // safety: We acquired the task with the atomic exchange above.
            let task = self.0.buffer[src_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });

            // Write the task to the new slot
            //
            // safety: `dst` queue is empty and we are the only producer to
            // this queue.
            dst.inner.buffer[dst_idx]
                .with_mut(|ptr| unsafe { ptr::write((*ptr).as_mut_ptr(), task) });
        }

        let mut prev_packed = next_packed;

        // Update `src_head_steal` to match `src_head_real` signalling that the
        // stealing routine is complete.
        loop {
            let head = unpack(prev_packed).1;
            next_packed = pack(head, head);

            let res = self
                .0
                .head
                .compare_exchange_weak(prev_packed, next_packed, AcqRel, Acquire);

            match res {
                Ok(_) => return n,
                Err(actual) => prev_packed = actual,
            }
        }
    }
}
597
598impl<T> Clone for Steal<T> {
599    fn clone(&self) -> Steal<T> {
600        Steal(self.0.clone())
601    }
602}
603
impl<T> Drop for Local<T> {
    fn drop(&mut self) {
        // Dropping a non-empty queue would leak the remaining tasks; assert
        // emptiness as an invariant check. Skipped while panicking so a
        // double-panic doesn't abort the process and mask the original error.
        if !std::thread::panicking() {
            assert!(self.pop().is_none(), "queue not empty");
            assert!(self.pop_lifo().is_none(), "LIFO slot not empty");
        }
    }
}
612
/// Calculate the length of the queue using the head and tail.
/// The `head` can be the `steal` or `real` head.
///
/// Both indices are free-running counters, so wrapping subtraction yields the
/// correct count even after either counter wraps around its integer range.
fn len(head: UnsignedShort, tail: UnsignedShort) -> usize {
    tail.wrapping_sub(head) as usize
}
618
619/// Split the head value into the real head and the index a stealer is working
620/// on.
621fn unpack(n: UnsignedLong) -> (UnsignedShort, UnsignedShort) {
622    let real = n & UnsignedShort::MAX as UnsignedLong;
623    let steal = n >> (mem::size_of::<UnsignedShort>() * 8);
624
625    (steal as UnsignedShort, real as UnsignedShort)
626}
627
628/// Join the two head values
629fn pack(steal: UnsignedShort, real: UnsignedShort) -> UnsignedLong {
630    (real as UnsignedLong) | ((steal as UnsignedLong) << (mem::size_of::<UnsignedShort>() * 8))
631}
632
#[test]
fn test_local_queue_capacity() {
    // NOTE(review): guards that every buffer index fits in a `u8` — presumably
    // some consumer stores queue positions in a byte; confirm which one before
    // raising the capacity.
    assert!(LOCAL_QUEUE_CAPACITY - 1 <= u8::MAX as usize);
}