tokio/sync/mpsc/block.rs

use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize};

use std::alloc::Layout;
use std::mem::MaybeUninit;
use std::ops;
use std::ptr::{self, NonNull};
use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Release};

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` messages.
pub(crate) struct Block<T> {
    /// The header fields.
    header: BlockHeader<T>,

    /// Array containing values pushed into the block. Values are stored in a
    /// contiguous array in order to improve cache line behavior when reading.
    /// The values must be manually dropped.
    values: Values<T>,
}

/// Extra fields for a `Block<T>`.
struct BlockHeader<T> {
    /// The start index of this block.
    ///
    /// Slots in this block have indices in `start_index .. start_index + BLOCK_CAP`.
    start_index: usize,

    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Bitfield tracking slots that are ready to have their values consumed.
    ready_slots: AtomicUsize,

    /// The observed `tail_position` value *after* the block has been passed by
    /// `block_tail`.
    observed_tail_position: UnsafeCell<usize>,
}

pub(crate) enum Read<T> {
    Value(T),
    Closed,
}

#[repr(transparent)]
struct Values<T>([UnsafeCell<MaybeUninit<T>>; BLOCK_CAP]);

use super::BLOCK_CAP;

/// Masks an index to get the block identifier.
const BLOCK_MASK: usize = !(BLOCK_CAP - 1);

/// Masks an index to get the value offset in a block.
const SLOT_MASK: usize = BLOCK_CAP - 1;

/// Flag tracking that a block has gone through the sender's release routine.
///
/// When this is set, the receiver may consider freeing the block.
const RELEASED: usize = 1 << BLOCK_CAP;

/// Flag tracking all senders dropped.
///
/// When this flag is set, the send half of the channel has closed.
const TX_CLOSED: usize = RELEASED << 1;

/// Mask covering all bits used to track slot readiness.
const READY_MASK: usize = RELEASED - 1;
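
// For illustration, assuming `BLOCK_CAP = 32` (the value used on 64-bit
// targets), `ready_slots` is laid out as:
//
//     bits 0..32 - one "ready" bit per slot in the block (`READY_MASK`)
//     bit  32    - `RELEASED`
//     bit  33    - `TX_CLOSED`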

/// Returns the index of the first slot in the block referenced by `slot_index`.
#[inline(always)]
pub(crate) fn start_index(slot_index: usize) -> usize {
    BLOCK_MASK & slot_index
}

/// Returns the offset into the block referenced by `slot_index`.
#[inline(always)]
pub(crate) fn offset(slot_index: usize) -> usize {
    SLOT_MASK & slot_index
}
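
// Worked example, again assuming `BLOCK_CAP = 32`: for `slot_index = 70`,
// `start_index(70) == 64` (the block covering slots `64..96`) and
// `offset(70) == 6` (the slot's position within that block).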

generate_addr_of_methods! {
    impl<T> Block<T> {
        unsafe fn addr_of_header(self: NonNull<Self>) -> NonNull<BlockHeader<T>> {
            &self.header
        }

        unsafe fn addr_of_values(self: NonNull<Self>) -> NonNull<Values<T>> {
            &self.values
        }
    }
}

impl<T> Block<T> {
    pub(crate) fn new(start_index: usize) -> Box<Block<T>> {
        unsafe {
            // Allocate the block on the heap.
            // SAFETY: The size of the Block<T> is non-zero, since it is at least the size of the header.
            let block = std::alloc::alloc(Layout::new::<Block<T>>()) as *mut Block<T>;
            let block = match NonNull::new(block) {
                Some(block) => block,
                None => std::alloc::handle_alloc_error(Layout::new::<Block<T>>()),
            };

            // Write the header to the block.
            Block::addr_of_header(block).as_ptr().write(BlockHeader {
                // The absolute index in the channel of the first slot in the block.
                start_index,

                // Pointer to the next block in the linked list.
                next: AtomicPtr::new(ptr::null_mut()),

                ready_slots: AtomicUsize::new(0),

                observed_tail_position: UnsafeCell::new(0),
            });

            // Initialize the values array.
            Values::initialize(Block::addr_of_values(block));

            // Convert the pointer to a `Box`.
            // Safety: The raw pointer was allocated using the global allocator, and with
            // the layout for a `Block<T>`, so it's valid to convert it to a box.
            Box::from_raw(block.as_ptr())
        }
    }

    /// Returns `true` if the block matches the given index.
    pub(crate) fn is_at_index(&self, index: usize) -> bool {
        debug_assert!(offset(index) == 0);
        self.header.start_index == index
    }

    /// Returns the number of blocks between `self` and the block at the
    /// specified index.
    ///
    /// `other_index` must represent a block *after* `self`.
    pub(crate) fn distance(&self, other_index: usize) -> usize {
        debug_assert!(offset(other_index) == 0);
        other_index.wrapping_sub(self.header.start_index) / BLOCK_CAP
    }

    /// Reads the value at the given offset.
    ///
    /// Returns `None` if the slot is empty.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * No concurrent access to the slot.
    pub(crate) unsafe fn read(&self, slot_index: usize) -> Option<Read<T>> {
        let offset = offset(slot_index);

        let ready_bits = self.header.ready_slots.load(Acquire);

        if !is_ready(ready_bits, offset) {
            if is_tx_closed(ready_bits) {
                return Some(Read::Closed);
            }

            return None;
        }

        // Get the value
        //
        // Safety:
        //
        // 1. The caller guarantees that there is no concurrent access to the slot.
        // 2. The `UnsafeCell` always gives us a valid pointer to the value.
        let value = self.values[offset].with(|ptr| unsafe { ptr::read(ptr) });

        // Safety: the ready bit is set, so the value has been initialized.
        Some(Read::Value(unsafe { value.assume_init() }))
    }

    /// Returns `true` if *this* block has a value in the given slot.
    ///
    /// Always returns `false` when given an index from a different block.
    pub(crate) fn has_value(&self, slot_index: usize) -> bool {
        if slot_index < self.header.start_index {
            return false;
        }
        if slot_index >= self.header.start_index + super::BLOCK_CAP {
            return false;
        }

        let offset = offset(slot_index);
        let ready_bits = self.header.ready_slots.load(Acquire);
        is_ready(ready_bits, offset)
    }

    /// Writes a value to the block at the given offset.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * The slot is empty.
    /// * No concurrent access to the slot.
    pub(crate) unsafe fn write(&self, slot_index: usize, value: T) {
        // Get the offset into the block
        let slot_offset = offset(slot_index);

        self.values[slot_offset].with_mut(|ptr| {
            // Safety: the caller guarantees that there is no concurrent access to the slot
            unsafe {
                ptr::write(ptr, MaybeUninit::new(value));
            }
        });

        // Release the value. After this point, the slot ref may no longer
        // be used. It is possible for the receiver to free the memory at
        // any point.
        self.set_ready(slot_offset);
    }
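
    // A slot's lifecycle, as implemented by `write`, `set_ready`, and `read`:
    // the sender writes the value into the slot's `UnsafeCell`, then publishes
    // it by setting the slot's ready bit with a `Release` store; the receiver
    // observes that bit with an `Acquire` load before reading the value out of
    // the cell.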

    /// Signal to the receiver that the sender half of the list is closed.
    pub(crate) unsafe fn tx_close(&self) {
        self.header.ready_slots.fetch_or(TX_CLOSED, Release);
    }

    pub(crate) unsafe fn is_closed(&self) -> bool {
        let ready_bits = self.header.ready_slots.load(Acquire);
        is_tx_closed(ready_bits)
    }

    /// Resets the block to a blank state. This enables reusing blocks in the
    /// channel.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * All slots are empty.
    /// * The caller holds a unique pointer to the block.
    pub(crate) unsafe fn reclaim(&mut self) {
        self.header.start_index = 0;
        self.header.next = AtomicPtr::new(ptr::null_mut());
        self.header.ready_slots = AtomicUsize::new(0);
    }

    /// Releases the block to the rx half for freeing.
    ///
    /// This function is called by the tx half once it can be guaranteed that no
    /// more senders will attempt to access the block.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * The block will no longer be accessed by any sender.
    pub(crate) unsafe fn tx_release(&self, tail_position: usize) {
        // Track the observed tail_position. Any sender targeting a greater
        // tail_position is guaranteed to not access this block.
        self.header
            .observed_tail_position
            // Safety:
            //
            // 1. The caller guarantees unique access to the block.
            // 2. The `UnsafeCell` always gives us a valid pointer.
            .with_mut(|ptr| unsafe { *ptr = tail_position });

        // Set the released bit, signalling to the receiver that it is safe to
        // free the block's memory as soon as all slots **prior** to
        // `observed_tail_position` have been filled.
        self.header.ready_slots.fetch_or(RELEASED, Release);
    }

    /// Mark a slot as ready
    fn set_ready(&self, slot: usize) {
        let mask = 1 << slot;
        self.header.ready_slots.fetch_or(mask, Release);
    }

    /// Returns `true` when all slots have their `ready` bits set.
    ///
    /// This indicates that the block is in its final state and will no longer
    /// be mutated.
    pub(crate) fn is_final(&self) -> bool {
        self.header.ready_slots.load(Acquire) & READY_MASK == READY_MASK
    }

    /// Returns the `observed_tail_position` value, if set
    pub(crate) fn observed_tail_position(&self) -> Option<usize> {
        if 0 == RELEASED & self.header.ready_slots.load(Acquire) {
            None
        } else {
            Some(
                self.header
                    .observed_tail_position
                    .with(|ptr| unsafe { *ptr }),
            )
        }
    }

    /// Loads the next block
    pub(crate) fn load_next(&self, ordering: Ordering) -> Option<NonNull<Block<T>>> {
        let ret = NonNull::new(self.header.next.load(ordering));

        debug_assert!(unsafe {
            ret.map_or(true, |block| {
                block.as_ref().header.start_index == self.header.start_index.wrapping_add(BLOCK_CAP)
            })
        });

        ret
    }

    /// Pushes `block` as the next block in the list.
    ///
    /// Returns `Ok` if successful; otherwise, a pointer to the next block in
    /// the list is returned.
    ///
    /// This requires that the next pointer is null.
    ///
    /// # Ordering
    ///
    /// This performs a compare-and-swap on `next` using the `success` and
    /// `failure` orderings provided by the caller.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * `block` is not freed until it has been removed from the list.
    pub(crate) unsafe fn try_push(
        &self,
        block: &mut NonNull<Block<T>>,
        success: Ordering,
        failure: Ordering,
    ) -> Result<(), NonNull<Block<T>>> {
        // Safety: caller guarantees that `block` is valid.
        unsafe { block.as_mut() }.header.start_index =
            self.header.start_index.wrapping_add(BLOCK_CAP);

        let next_ptr = self
            .header
            .next
            .compare_exchange(ptr::null_mut(), block.as_ptr(), success, failure)
            .unwrap_or_else(|x| x);

        match NonNull::new(next_ptr) {
            Some(next_ptr) => Err(next_ptr),
            None => Ok(()),
        }
    }
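
    // On failure, `try_push` hands back the block that is already linked as
    // `next`; `grow` below uses that pointer to keep walking the list and
    // retry the push at the tail.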

    /// Grows the `Block` linked list by allocating and appending a new block.
    ///
    /// The next block in the linked list is returned. This may or may not be
    /// the one allocated by the function call.
    ///
    /// # Implementation
    ///
    /// It is assumed that `self.next` is null. A new block is allocated with
    /// `start_index` set to be the next block. A compare-and-swap is performed
    /// with `AcqRel` memory ordering. If the compare-and-swap is successful, the
    /// newly allocated block is released to other threads walking the block
    /// linked list. If the compare-and-swap fails, the current thread acquires
    /// the next block in the linked list, allowing the current thread to access
    /// the slots.
    pub(crate) fn grow(&self) -> NonNull<Block<T>> {
        // Create the new block. It is assumed that the block will become the
        // next one after `&self`. If this turns out to not be the case,
        // `start_index` is updated accordingly.
        let new_block = Block::new(self.header.start_index + BLOCK_CAP);

        let mut new_block = unsafe { NonNull::new_unchecked(Box::into_raw(new_block)) };

        // Attempt to store the block. The first compare-and-swap attempt is
        // "unrolled" due to minor differences in logic
        //
        // `AcqRel` is used as the ordering **only** when attempting the
        // compare-and-swap on self.next.
        //
        // If the compare-and-swap fails, then the actual value of the cell is
        // returned from this function and accessed by the caller. Given this,
        // the memory must be acquired.
        //
        // `Release` ensures that the newly allocated block is available to
        // other threads acquiring the next pointer.
        let next = NonNull::new(
            self.header
                .next
                .compare_exchange(ptr::null_mut(), new_block.as_ptr(), AcqRel, Acquire)
                .unwrap_or_else(|x| x),
        );

        let next = match next {
            Some(next) => next,
            None => {
                // The compare-and-swap succeeded and the newly allocated block
                // is successfully pushed.
                return new_block;
            }
        };

        // There already is a next block in the linked list. The newly allocated
        // block could be dropped and the discovered next block returned;
        // however, that would be wasteful. Instead, the linked list is walked
        // by repeatedly attempting to compare-and-swap the pointer into the
        // `next` register until the compare-and-swap succeeds.
        //
        // Care is taken to update new_block's start_index field as appropriate.

        let mut curr = next;

        // TODO: Should this iteration be capped?
        loop {
            let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel, Acquire) };

            curr = match actual {
                Ok(()) => {
                    return next;
                }
                Err(curr) => curr,
            };

            crate::loom::thread::yield_now();
        }
    }
}

/// Returns `true` if the specified slot has a value ready to be consumed.
fn is_ready(bits: usize, slot: usize) -> bool {
    let mask = 1 << slot;
    mask == mask & bits
}

/// Returns `true` if the closed flag has been set.
fn is_tx_closed(bits: usize) -> bool {
    TX_CLOSED == bits & TX_CLOSED
}

impl<T> Values<T> {
    /// Initialize a `Values` struct from a pointer.
    ///
    /// # Safety
    ///
    /// The raw pointer must be valid for writing a `Values<T>`.
    unsafe fn initialize(_value: NonNull<Values<T>>) {
        // When fuzzing, `UnsafeCell` needs to be initialized.
        if_loom! {
            let p = _value.as_ptr() as *mut UnsafeCell<MaybeUninit<T>>;
            for i in 0..BLOCK_CAP {
                unsafe {
                    p.add(i).write(UnsafeCell::new(MaybeUninit::uninit()));
                }
            }
        }
    }
}

impl<T> ops::Index<usize> for Values<T> {
    type Output = UnsafeCell<MaybeUninit<T>>;

    fn index(&self, index: usize) -> &Self::Output {
        self.0.index(index)
    }
}

#[cfg(all(test, not(loom)))]
#[test]
fn assert_no_stack_overflow() {
    // https://github.com/tokio-rs/tokio/issues/5293

    struct Foo {
        _a: [u8; 2_000_000],
    }

    assert_eq!(
        Layout::new::<MaybeUninit<Block<Foo>>>(),
        Layout::new::<Block<Foo>>()
    );

    let _block = Block::<Foo>::new(0);
}
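
// Small, illustrative sanity checks; they exercise only items defined in this
// file.

#[cfg(all(test, not(loom)))]
#[test]
fn index_math() {
    // Checks the index helpers, assuming only that `BLOCK_CAP` is a power of
    // two (as the masks above require).
    let block = Block::<usize>::new(0);
    assert!(block.is_at_index(0));
    assert_eq!(start_index(BLOCK_CAP + 1), BLOCK_CAP);
    assert_eq!(offset(BLOCK_CAP + 1), 1);
    assert_eq!(block.distance(2 * BLOCK_CAP), 2);
}

#[cfg(all(test, not(loom)))]
#[test]
fn write_then_read_slot() {
    // Checks the slot protocol sketched above: writing a value sets the slot's
    // ready bit, after which `read` returns the value.
    let block = Block::<usize>::new(0);
    assert!(!block.has_value(0));

    // Safety: the slot is empty and there is no concurrent access.
    unsafe { block.write(0, 42) };
    assert!(block.has_value(0));

    // Safety: there is no concurrent access to the slot.
    match unsafe { block.read(0) } {
        Some(Read::Value(v)) => assert_eq!(v, 42),
        _ => panic!("expected a value"),
    }
}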