// tokio/sync/mpsc/list.rs
1//! A concurrent, lock-free, FIFO list.
2
3use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize};
4use crate::loom::thread;
5use crate::sync::mpsc::block::{self, Block};
6
7use std::fmt;
8use std::ptr::NonNull;
9use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
10
/// List queue transmit handle.
///
/// Shared by all senders; every field is atomic so `push`/`close` can be
/// called concurrently through `&self`.
pub(crate) struct Tx<T> {
    /// Tail in the `Block` mpmc list.
    ///
    /// Points at (or near) the block currently being written; `find_block`
    /// opportunistically advances it as the list grows.
    block_tail: AtomicPtr<Block<T>>,

    /// Position to push the next message. This references a block and offset
    /// into the block (split apart via `block::start_index` /
    /// `block::offset`).
    tail_position: AtomicUsize,
}
20
/// List queue receive handle.
///
/// Owned by the single receiver; no field is atomic because only one thread
/// reads from the list at a time.
pub(crate) struct Rx<T> {
    /// Pointer to the block being processed.
    head: NonNull<Block<T>>,

    /// Next slot index to process (a global index, not an offset into
    /// `head`).
    index: usize,

    /// Pointer to the next block pending release. Trails `head`; the span
    /// `free_head..head` is the set of fully-consumed blocks awaiting
    /// reclamation in `reclaim_blocks`.
    free_head: NonNull<Block<T>>,
}
32
/// Return value of `Rx::try_pop`.
pub(crate) enum TryPopResult<T> {
    /// Successfully popped a value.
    Ok(T),
    /// The channel is empty.
    ///
    /// Note that `list.rs` only tracks the close state set by senders. If the
    /// channel is closed by `Rx::close()`, then `TryPopResult::Empty` is still
    /// returned, and the close state needs to be handled by `chan.rs`.
    Empty,
    /// The channel is empty and closed.
    ///
    /// Returned when the send half is closed (all senders dropped).
    Closed,
    /// The channel is not empty, but the first value is being written.
    ///
    /// Distinguishes "nothing to read yet" from "a writer is mid-`write` on
    /// the slot at the front of the queue".
    Busy,
}
50
51pub(crate) fn channel<T>() -> (Tx<T>, Rx<T>) {
52    // Create the initial block shared between the tx and rx halves.
53    let initial_block = Block::new(0);
54    let initial_block_ptr = Box::into_raw(initial_block);
55
56    let tx = Tx {
57        block_tail: AtomicPtr::new(initial_block_ptr),
58        tail_position: AtomicUsize::new(0),
59    };
60
61    let head = NonNull::new(initial_block_ptr).unwrap();
62
63    let rx = Rx {
64        head,
65        index: 0,
66        free_head: head,
67    };
68
69    (tx, rx)
70}
71
impl<T> Tx<T> {
    /// Pushes a value into the list.
    pub(crate) fn push(&self, value: T) {
        // First, claim a slot for the value. `Acquire` is used here to
        // synchronize with the `fetch_add` in `reclaim_blocks`.
        let slot_index = self.tail_position.fetch_add(1, Acquire);

        // Load the block that owns the claimed slot, growing the list if
        // necessary, and write the value.
        let block = self.find_block(slot_index);

        unsafe {
            // Write the value to the block. `write` also sets the slot's
            // ready flag, publishing the value to the receiver.
            block.as_ref().write(slot_index, value);
        }
    }

    /// Closes the send half of the list.
    ///
    /// Similar process as pushing a value, but instead of writing the value &
    /// setting the ready flag, the `TX_CLOSED` flag is set on the block.
    pub(crate) fn close(&self) {
        // First, claim a slot for the value. This is the last slot that will be
        // claimed.
        let slot_index = self.tail_position.fetch_add(1, Acquire);

        let block = self.find_block(slot_index);

        unsafe { block.as_ref().tx_close() }
    }

    /// Returns the block containing `slot_index`, walking the linked list
    /// from `block_tail` and growing it when the end is reached.
    ///
    /// As a side effect, may advance `block_tail` past fully finalized
    /// blocks to reduce the walk length for later callers.
    fn find_block(&self, slot_index: usize) -> NonNull<Block<T>> {
        // The start index of the block that contains `index`.
        let start_index = block::start_index(slot_index);

        // The index offset into the block
        let offset = block::offset(slot_index);

        // Load the current tail pointer of the block list; the walk starts
        // from here.
        let mut block_ptr = self.block_tail.load(Acquire);

        let block = unsafe { &*block_ptr };

        // Calculate the distance between the tail ptr and the target block
        let distance = block.distance(start_index);

        // Decide if this call to `find_block` should attempt to update the
        // `block_tail` pointer.
        //
        // Updating `block_tail` is not always performed in order to reduce
        // contention.
        //
        // When set, as the routine walks the linked list, it attempts to update
        // `block_tail`. If the update cannot be performed, `try_updating_tail`
        // is unset.
        let mut try_updating_tail = distance > offset;

        // Walk the linked list of blocks until the block with `start_index` is
        // found.
        loop {
            let block = unsafe { &(*block_ptr) };

            if block.is_at_index(start_index) {
                return unsafe { NonNull::new_unchecked(block_ptr) };
            }

            let next_block = block
                .load_next(Acquire)
                // There is no allocated next block, grow the linked list.
                .unwrap_or_else(|| block.grow());

            // If the block is **not** final, then the tail pointer cannot be
            // advanced any more.
            try_updating_tail &= block.is_final();

            if try_updating_tail {
                // Advancing `block_tail` must happen when walking the linked
                // list. `block_tail` may not advance past any blocks that are
                // not "final". At the point a block is finalized, it is unknown
                // if there are any prior blocks that are unfinalized, which
                // makes it impossible to advance `block_tail`.
                //
                // While walking the linked list, `block_tail` can be advanced
                // as long as finalized blocks are traversed.
                //
                // Release ordering is used to ensure that any subsequent reads
                // are able to see the memory pointed to by `block_tail`.
                //
                // Acquire is not needed as any "actual" value is not accessed.
                // At this point, the linked list is walked to acquire blocks.
                if self
                    .block_tail
                    .compare_exchange(block_ptr, next_block.as_ptr(), Release, Relaxed)
                    .is_ok()
                {
                    // Synchronize with any senders
                    let tail_position = self.tail_position.fetch_add(0, Release);

                    unsafe {
                        block.tx_release(tail_position);
                    }
                } else {
                    // A concurrent sender is also working on advancing
                    // `block_tail` and this thread is falling behind.
                    //
                    // Stop trying to advance the tail pointer
                    try_updating_tail = false;
                }
            }

            block_ptr = next_block.as_ptr();

            // Loom-aware yield: gives other (model) threads a chance to run
            // between walk steps.
            thread::yield_now();
        }
    }

    /// Takes ownership of a block removed from the list, attempting to reuse
    /// it before freeing it.
    ///
    /// # Safety
    ///
    /// Behavior is undefined if any of the following conditions are violated:
    ///
    /// - The `block` was created by [`Box::into_raw`].
    /// - The `block` is not currently part of any linked list.
    /// - The `block` is a valid pointer to a [`Block<T>`].
    pub(crate) unsafe fn reclaim_block(&self, mut block: NonNull<Block<T>>) {
        // The block has been removed from the linked list and ownership
        // is reclaimed.
        //
        // Before dropping the block, see if it can be reused by
        // inserting it back at the end of the linked list.
        //
        // First, reset the data
        //
        // Safety: caller guarantees the block is valid and not in any list.
        unsafe {
            block.as_mut().reclaim();
        }

        let mut reused = false;

        // Attempt to insert the block at the end
        //
        // Walk at most three times
        let curr_ptr = self.block_tail.load(Acquire);

        // The pointer can never be null
        debug_assert!(!curr_ptr.is_null());

        // Safety: curr_ptr is never null.
        let mut curr = unsafe { NonNull::new_unchecked(curr_ptr) };

        // TODO: Unify this logic with Block::grow
        for _ in 0..3 {
            match unsafe { curr.as_ref().try_push(&mut block, AcqRel, Acquire) } {
                Ok(()) => {
                    reused = true;
                    break;
                }
                Err(next) => {
                    // `try_push` failed because `curr` already has a next
                    // block; continue the walk from there.
                    curr = next;
                }
            }
        }

        if !reused {
            // Safety:
            //
            // 1. Caller guarantees the block is valid and not in any list.
            // 2. The block was created by `Box::into_raw`.
            let _ = unsafe { Box::from_raw(block.as_ptr()) };
        }
    }
}
243
244impl<T> fmt::Debug for Tx<T> {
245    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
246        fmt.debug_struct("Tx")
247            .field("block_tail", &self.block_tail.load(Relaxed))
248            .field("tail_position", &self.tail_position.load(Relaxed))
249            .finish()
250    }
251}
252
impl<T> Rx<T> {
    /// Returns `true` if no fully written message is currently available.
    pub(crate) fn is_empty(&self, tx: &Tx<T>) -> bool {
        // Fast path: the slot the receiver would read next already holds a
        // value.
        let block = unsafe { self.head.as_ref() };
        if block.has_value(self.index) {
            return false;
        }

        // It is possible that a block has no value "now" but the list is still not empty.
        // To be sure, it is necessary to check the length of the list.
        self.len(tx) == 0
    }

    // Guaranteed to return true if `slot_index` is the fake message sent on channel close.
    // Guaranteed to return false if `slot_index` is a fully sent message.
    //
    // For messages that are partially sent, may return either true or false.
    fn is_maybe_closed(&self, tx: &Tx<T>, slot_index: usize) -> bool {
        let start_index = block::start_index(slot_index);

        let tail = tx.block_tail.load(Acquire);
        // SAFETY: Only the receiver frees blocks, so since we are the receiver, this will not be
        // freed right now.
        let tail_ref = unsafe { &*tail };
        if tail_ref.is_at_index(start_index) {
            return !tail_ref.has_value(slot_index);
        }

        // This method is optimized for checking whether the last value is present, so most of the
        // time it is in `block_tail`. However, this isn't always the case since it's possible
        // that the list was grown with an empty block, in which case `block_tail` points one block
        // too far. To handle this case, we walk the list from the head.
        let mut block_ptr = Some(self.head);

        while let Some(block) = block_ptr {
            // SAFETY: Only the receiver frees blocks, so since we are the receiver, this will not
            // be freed right now.
            let block_ref = unsafe { block.as_ref() };
            if block_ref.is_at_index(start_index) {
                return !block_ref.has_value(slot_index);
            }
            block_ptr = block_ref.load_next(Acquire);
        }
        // The walk never found the block; treat the slot as not-a-value,
        // which is allowed by this method's contract for partial sends.
        true
    }

    /// Returns the number of fully (or concurrently being) sent messages in
    /// the queue, excluding the fake "closed" message.
    pub(crate) fn len(&self, tx: &Tx<T>) -> usize {
        let tail_position = tx.tail_position.load(Acquire);
        // The receiver's index always trails the tail position, so this
        // subtraction cannot actually wrap.
        let mut len = tail_position.wrapping_sub(self.index);
        debug_assert!(0 <= len as isize);
        if len == 0 {
            return 0;
        }
        // There are messages present in the queue. However, it's possible that the last message is
        // a fake "closed" message that we do not wish to count. To avoid counting it, we do not
        // count the last message if the ready bit is unset.
        //
        // Note that it is also possible for the ready bit to be unset on a normal message, but
        // this happens only if that message is currently being sent *right now* in parallel on
        // another thread. That is okay because it is optional to count messages that are currently
        // being sent.
        if self.is_maybe_closed(tx, tail_position.wrapping_sub(1)) {
            len -= 1;
        }
        len
    }

    /// Pops the next value off the queue.
    pub(crate) fn pop(&mut self, tx: &Tx<T>) -> Option<block::Read<T>> {
        // Advance `head`, if needed
        if !self.try_advancing_head() {
            return None;
        }

        // Opportunistically return consumed blocks to the senders.
        self.reclaim_blocks(tx);

        unsafe {
            let block = self.head.as_ref();

            let ret = block.read(self.index);

            if let Some(block::Read::Value(..)) = ret {
                // Only advance past slots that actually yielded a value; a
                // `Closed` read leaves `index` in place.
                self.index = self.index.wrapping_add(1);
            }

            ret
        }
    }

    /// Pops the next value off the queue, detecting whether the block
    /// is busy or empty on failure.
    ///
    /// This function exists because `Rx::pop` can return `None` even if the
    /// channel's queue contains a message that has been completely written.
    /// This can happen if the fully delivered message is behind another message
    /// that is in the middle of being written to the block, since the channel
    /// can't return the messages out of order.
    pub(crate) fn try_pop(&mut self, tx: &Tx<T>) -> TryPopResult<T> {
        // Snapshot the tail before popping so `Empty` vs `Busy` can be
        // distinguished below.
        let tail_position = tx.tail_position.load(Acquire);
        let result = self.pop(tx);

        match result {
            Some(block::Read::Value(t)) => TryPopResult::Ok(t),
            Some(block::Read::Closed) => TryPopResult::Closed,
            // Nothing popped and nothing claimed beyond our index: empty.
            None if tail_position == self.index => TryPopResult::Empty,
            // Nothing popped but slots have been claimed: a write is in
            // flight.
            None => TryPopResult::Busy,
        }
    }

    /// Tries advancing the block pointer to the block referenced by `self.index`.
    ///
    /// Returns `true` if successful, `false` if there is no next block to load.
    fn try_advancing_head(&mut self) -> bool {
        let block_index = block::start_index(self.index);

        loop {
            let next_block = {
                let block = unsafe { self.head.as_ref() };

                if block.is_at_index(block_index) {
                    return true;
                }

                block.load_next(Acquire)
            };

            let next_block = match next_block {
                Some(next_block) => next_block,
                None => {
                    return false;
                }
            };

            self.head = next_block;

            thread::yield_now();
        }
    }

    /// Hands every fully consumed block between `free_head` and `head` back
    /// to the send half via `Tx::reclaim_block`.
    fn reclaim_blocks(&mut self, tx: &Tx<T>) {
        while self.free_head != self.head {
            unsafe {
                // Get a handle to the block that will be freed and update
                // `free_head` to point to the next block.
                let block = self.free_head;

                let observed_tail_position = block.as_ref().observed_tail_position();

                let required_index = match observed_tail_position {
                    // Senders may still touch this block until the receiver
                    // has consumed up to `i`; stop if no position was
                    // recorded.
                    Some(i) => i,
                    None => return,
                };

                if required_index > self.index {
                    return;
                }

                // We may read the next pointer with `Relaxed` ordering as it is
                // guaranteed that the `reclaim_blocks` routine trails the `recv`
                // routine. Any memory accessed by `reclaim_blocks` has already
                // been acquired by `recv`.
                let next_block = block.as_ref().load_next(Relaxed);

                // Update the free list head
                self.free_head = next_block.unwrap();

                // Push the emptied block onto the back of the queue, making it
                // available to senders.
                tx.reclaim_block(block);
            }

            thread::yield_now();
        }
    }

    /// Effectively `Drop` all the blocks. Should only be called once, when
    /// the list is dropping.
    ///
    /// # Safety
    ///
    /// Must be called at most once, and only when no sender can still access
    /// the blocks — presumably guaranteed by the channel's drop path in
    /// `chan.rs`; TODO(review): confirm against the caller.
    pub(super) unsafe fn free_blocks(&mut self) {
        debug_assert_ne!(self.free_head, NonNull::dangling());

        let mut cur = Some(self.free_head);

        #[cfg(debug_assertions)]
        {
            // to trigger the debug assert above so as to catch that we
            // don't call `free_blocks` more than once.
            self.free_head = NonNull::dangling();
            self.head = NonNull::dangling();
        }

        while let Some(block) = cur {
            cur = unsafe { block.as_ref() }.load_next(Relaxed);
            drop(unsafe { Box::from_raw(block.as_ptr()) });
        }
    }
}
448
449impl<T> fmt::Debug for Rx<T> {
450    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
451        fmt.debug_struct("Rx")
452            .field("head", &self.head)
453            .field("index", &self.index)
454            .field("free_head", &self.free_head)
455            .finish()
456    }
457}