// tokio/sync/mpsc/list.rs
1//! A concurrent, lock-free, FIFO list.
2
3use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize};
4use crate::loom::thread;
5use crate::sync::mpsc::block::{self, Block};
6
7use std::fmt;
8use std::ptr::NonNull;
9use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
10
11/// List queue transmit handle.
/// List queue transmit handle.
///
/// Shared by all senders; both fields are only ever updated with atomic
/// operations, so `&self` methods are safe to call concurrently.
pub(crate) struct Tx<T> {
    /// Tail in the `Block` mpmc list.
    ///
    /// May trail the true tail: senders advance it cooperatively in
    /// `find_block`, and skip advancing it when contended.
    block_tail: AtomicPtr<Block<T>>,

    /// Position to push the next message. This references a block and offset
    /// into the block.
    ///
    /// Claimed with `fetch_add(1)`; `block::start_index` / `block::offset`
    /// map a position onto a block and a slot within it.
    tail_position: AtomicUsize,
}
20
21/// List queue receive handle
/// List queue receive handle
///
/// Owned by the single receiver; fields are plain (non-atomic) state.
pub(crate) struct Rx<T> {
    /// Pointer to the block being processed.
    head: NonNull<Block<T>>,

    /// Next slot index to process.
    ///
    /// Advanced (with wrapping) each time a value is read out of the block.
    index: usize,

    /// Pointer to the next block pending release.
    ///
    /// Trails `head`; blocks between `free_head` and `head` have been fully
    /// consumed and are handed back to senders in `reclaim_blocks`.
    free_head: NonNull<Block<T>>,
}
32
33/// Return value of `Rx::try_pop`.
/// Return value of `Rx::try_pop`.
pub(crate) enum TryPopResult<T> {
    /// Successfully popped a value.
    Ok(T),
    /// The channel is empty.
    ///
    /// Note that `list.rs` only tracks the close state set by senders. If the
    /// channel is closed by `Rx::close()`, then `TryPopResult::Empty` is still
    /// returned, and the close state needs to be handled by `chan.rs`.
    Empty,
    /// The channel is empty and closed.
    ///
    /// Returned when the send half is closed (all senders dropped).
    Closed,
    /// The channel is not empty, but the first value is being written.
    ///
    /// The slot has been claimed by a sender but its ready flag is not yet
    /// set, so the value cannot be read out of order.
    Busy,
}
50
51pub(crate) fn channel<T>() -> (Tx<T>, Rx<T>) {
52 // Create the initial block shared between the tx and rx halves.
53 let initial_block = Block::new(0);
54 let initial_block_ptr = Box::into_raw(initial_block);
55
56 let tx = Tx {
57 block_tail: AtomicPtr::new(initial_block_ptr),
58 tail_position: AtomicUsize::new(0),
59 };
60
61 let head = NonNull::new(initial_block_ptr).unwrap();
62
63 let rx = Rx {
64 head,
65 index: 0,
66 free_head: head,
67 };
68
69 (tx, rx)
70}
71
impl<T> Tx<T> {
    /// Pushes a value into the list.
    ///
    /// Claims the next slot with an atomic increment, then writes the value
    /// into the block that owns that slot (growing the list if needed).
    pub(crate) fn push(&self, value: T) {
        // First, claim a slot for the value. `Acquire` is used here to
        // synchronize with the `fetch_add` in `reclaim_blocks`.
        let slot_index = self.tail_position.fetch_add(1, Acquire);

        // Load the current block and write the value
        let block = self.find_block(slot_index);

        unsafe {
            // Write the value to the block
            block.as_ref().write(slot_index, value);
        }
    }

    /// Closes the send half of the list.
    ///
    /// Similar process as pushing a value, but instead of writing the value &
    /// setting the ready flag, the `TX_CLOSED` flag is set on the block.
    pub(crate) fn close(&self) {
        // First, claim a slot for the value. This is the last slot that will be
        // claimed.
        let slot_index = self.tail_position.fetch_add(1, Acquire);

        let block = self.find_block(slot_index);

        unsafe { block.as_ref().tx_close() }
    }

    /// Returns the block containing `slot_index`, walking (and growing) the
    /// linked list from `block_tail` as needed.
    ///
    /// While walking, the caller opportunistically helps advance `block_tail`
    /// past finalized blocks to reduce work for later senders.
    fn find_block(&self, slot_index: usize) -> NonNull<Block<T>> {
        // The start index of the block that contains `index`.
        let start_index = block::start_index(slot_index);

        // The index offset into the block
        let offset = block::offset(slot_index);

        // Load the current head of the block
        let mut block_ptr = self.block_tail.load(Acquire);

        let block = unsafe { &*block_ptr };

        // Calculate the distance between the tail ptr and the target block
        let distance = block.distance(start_index);

        // Decide if this call to `find_block` should attempt to update the
        // `block_tail` pointer.
        //
        // Updating `block_tail` is not always performed in order to reduce
        // contention.
        //
        // When set, as the routine walks the linked list, it attempts to update
        // `block_tail`. If the update cannot be performed, `try_updating_tail`
        // is unset.
        let mut try_updating_tail = distance > offset;

        // Walk the linked list of blocks until the block with `start_index` is
        // found.
        loop {
            let block = unsafe { &(*block_ptr) };

            if block.is_at_index(start_index) {
                return unsafe { NonNull::new_unchecked(block_ptr) };
            }

            let next_block = block
                .load_next(Acquire)
                // There is no allocated next block, grow the linked list.
                .unwrap_or_else(|| block.grow());

            // If the block is **not** final, then the tail pointer cannot be
            // advanced any more.
            try_updating_tail &= block.is_final();

            if try_updating_tail {
                // Advancing `block_tail` must happen when walking the linked
                // list. `block_tail` may not advance past any blocks that are
                // not "final". At the point a block is finalized, it is unknown
                // if there are any prior blocks that are unfinalized, which
                // makes it impossible to advance `block_tail`.
                //
                // While walking the linked list, `block_tail` can be advanced
                // as long as finalized blocks are traversed.
                //
                // Release ordering is used to ensure that any subsequent reads
                // are able to see the memory pointed to by `block_tail`.
                //
                // Acquire is not needed as any "actual" value is not accessed.
                // At this point, the linked list is walked to acquire blocks.
                if self
                    .block_tail
                    .compare_exchange(block_ptr, next_block.as_ptr(), Release, Relaxed)
                    .is_ok()
                {
                    // Synchronize with any senders
                    let tail_position = self.tail_position.fetch_add(0, Release);

                    unsafe {
                        block.tx_release(tail_position);
                    }
                } else {
                    // A concurrent sender is also working on advancing
                    // `block_tail` and this thread is falling behind.
                    //
                    // Stop trying to advance the tail pointer
                    try_updating_tail = false;
                }
            }

            block_ptr = next_block.as_ptr();

            // Under loom, this yields to let other modeled threads make
            // progress; a no-op in release builds.
            thread::yield_now();
        }
    }

    /// # Safety
    ///
    /// Behavior is undefined if any of the following conditions are violated:
    ///
    /// - The `block` was created by [`Box::into_raw`].
    /// - The `block` is not currently part of any linked list.
    /// - The `block` is a valid pointer to a [`Block<T>`].
    pub(crate) unsafe fn reclaim_block(&self, mut block: NonNull<Block<T>>) {
        // The block has been removed from the linked list and ownership
        // is reclaimed.
        //
        // Before dropping the block, see if it can be reused by
        // inserting it back at the end of the linked list.
        //
        // First, reset the data
        //
        // Safety: caller guarantees the block is valid and not in any list.
        unsafe {
            block.as_mut().reclaim();
        }

        let mut reused = false;

        // Attempt to insert the block at the end
        //
        // Walk at most three times
        let curr_ptr = self.block_tail.load(Acquire);

        // The pointer can never be null
        debug_assert!(!curr_ptr.is_null());

        // Safety: curr_ptr is never null.
        let mut curr = unsafe { NonNull::new_unchecked(curr_ptr) };

        // TODO: Unify this logic with Block::grow
        for _ in 0..3 {
            match unsafe { curr.as_ref().try_push(&mut block, AcqRel, Acquire) } {
                Ok(()) => {
                    reused = true;
                    break;
                }
                Err(next) => {
                    // `try_push` lost the race; retry on the block it found
                    // ahead of us. Give up after three attempts rather than
                    // contend further — dropping the block is always safe.
                    curr = next;
                }
            }
        }

        if !reused {
            // Safety:
            //
            // 1. Caller guarantees the block is valid and not in any list.
            // 2. The block was created by `Box::into_raw`.
            let _ = unsafe { Box::from_raw(block.as_ptr()) };
        }
    }
}
243
244impl<T> fmt::Debug for Tx<T> {
245 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
246 fmt.debug_struct("Tx")
247 .field("block_tail", &self.block_tail.load(Relaxed))
248 .field("tail_position", &self.tail_position.load(Relaxed))
249 .finish()
250 }
251}
252
impl<T> Rx<T> {
    /// Returns `true` if no fully-sent message is currently available.
    ///
    /// First checks the head slot directly (fast path), then falls back to
    /// computing the list length, since the head block can be empty while
    /// later blocks still hold values.
    pub(crate) fn is_empty(&self, tx: &Tx<T>) -> bool {
        let block = unsafe { self.head.as_ref() };
        if block.has_value(self.index) {
            return false;
        }

        // It is possible that a block has no value "now" but the list is still not empty.
        // To be sure, it is necessary to check the length of the list.
        self.len(tx) == 0
    }

    // Guaranteed to return true if `slot_index` is the fake message sent on channel close.
    // Guaranteed to return false if `slot_index` is a fully sent message.
    //
    // For messages that are partially sent, may return either true or false.
    fn is_maybe_closed(&self, tx: &Tx<T>, slot_index: usize) -> bool {
        let start_index = block::start_index(slot_index);

        let tail = tx.block_tail.load(Acquire);
        // SAFETY: Only the receiver frees blocks, so since we are the receiver, this will not be
        // freed right now.
        let tail_ref = unsafe { &*tail };
        if tail_ref.is_at_index(start_index) {
            return !tail_ref.has_value(slot_index);
        }

        // This method is optimized for checking whether the last value is present, so most of the
        // time it is in `block_tail`. However, this isn't always the case since it's possible
        // that the list was grown with an empty block, in which case `block_tail` points one block
        // too far. To handle this case, we walk the list from the head.
        let mut block_ptr = Some(self.head);

        while let Some(block) = block_ptr {
            // SAFETY: Only the receiver frees blocks, so since we are the receiver, this will not
            // be freed right now.
            let block_ref = unsafe { block.as_ref() };
            if block_ref.is_at_index(start_index) {
                return !block_ref.has_value(slot_index);
            }
            block_ptr = block_ref.load_next(Acquire);
        }
        // Block not found anywhere in the list; treat the slot as the
        // close marker (conservative per the contract above).
        true
    }

    /// Returns the number of fully-sent messages in the queue.
    ///
    /// Messages currently being written (including the fake "closed" slot)
    /// may or may not be counted, per the `is_maybe_closed` contract.
    pub(crate) fn len(&self, tx: &Tx<T>) -> usize {
        let tail_position = tx.tail_position.load(Acquire);
        let mut len = tail_position.wrapping_sub(self.index);
        // The sender's position can never be behind the receiver's.
        debug_assert!(0 <= len as isize);
        if len == 0 {
            return 0;
        }
        // There are messages present in the queue. However, it's possible that the last message is
        // a fake "closed" message that we do not wish to count. To avoid counting it, we do not
        // count the last message if the ready bit is unset.
        //
        // Note that it is also possible for the ready bit to be unset on a normal message, but
        // this happens only if that message is currently being sent *right now* in parallel on
        // another thread. That is okay because it is optional to count messages that are currently
        // being sent.
        if self.is_maybe_closed(tx, tail_position.wrapping_sub(1)) {
            len -= 1;
        }
        len
    }

    /// Pops the next value off the queue.
    ///
    /// Returns `None` when the head slot is not ready — either the queue is
    /// empty or the next message is still being written.
    pub(crate) fn pop(&mut self, tx: &Tx<T>) -> Option<block::Read<T>> {
        // Advance `head`, if needed
        if !self.try_advancing_head() {
            return None;
        }

        // Hand any fully-consumed blocks back to the senders.
        self.reclaim_blocks(tx);

        unsafe {
            let block = self.head.as_ref();

            let ret = block.read(self.index);

            if let Some(block::Read::Value(..)) = ret {
                // Only advance past real values; the `Closed` marker is
                // left in place so it can be observed again.
                self.index = self.index.wrapping_add(1);
            }

            ret
        }
    }

    /// Pops the next value off the queue, detecting whether the block
    /// is busy or empty on failure.
    ///
    /// This function exists because `Rx::pop` can return `None` even if the
    /// channel's queue contains a message that has been completely written.
    /// This can happen if the fully delivered message is behind another message
    /// that is in the middle of being written to the block, since the channel
    /// can't return the messages out of order.
    pub(crate) fn try_pop(&mut self, tx: &Tx<T>) -> TryPopResult<T> {
        // Snapshot the tail before popping so `Empty` vs `Busy` can be
        // distinguished: if `pop` fails while the tail is ahead of our
        // index, a slot has been claimed but not yet written.
        let tail_position = tx.tail_position.load(Acquire);
        let result = self.pop(tx);

        match result {
            Some(block::Read::Value(t)) => TryPopResult::Ok(t),
            Some(block::Read::Closed) => TryPopResult::Closed,
            None if tail_position == self.index => TryPopResult::Empty,
            None => TryPopResult::Busy,
        }
    }

    /// Tries advancing the block pointer to the block referenced by `self.index`.
    ///
    /// Returns `true` if successful, `false` if there is no next block to load.
    fn try_advancing_head(&mut self) -> bool {
        let block_index = block::start_index(self.index);

        loop {
            let next_block = {
                let block = unsafe { self.head.as_ref() };

                if block.is_at_index(block_index) {
                    return true;
                }

                block.load_next(Acquire)
            };

            let next_block = match next_block {
                Some(next_block) => next_block,
                None => {
                    return false;
                }
            };

            self.head = next_block;

            // Under loom, yield so other modeled threads can run.
            thread::yield_now();
        }
    }

    /// Releases blocks in `[free_head, head)` that senders have finished
    /// with, stopping at the first block still potentially in use.
    fn reclaim_blocks(&mut self, tx: &Tx<T>) {
        while self.free_head != self.head {
            unsafe {
                // Get a handle to the block that will be freed and update
                // `free_head` to point to the next block.
                let block = self.free_head;

                let observed_tail_position = block.as_ref().observed_tail_position();

                let required_index = match observed_tail_position {
                    Some(i) => i,
                    None => return,
                };

                // A sender may still be writing into this block; it cannot
                // be reclaimed until the receiver has consumed past it.
                if required_index > self.index {
                    return;
                }

                // We may read the next pointer with `Relaxed` ordering as it is
                // guaranteed that the `reclaim_blocks` routine trails the `recv`
                // routine. Any memory accessed by `reclaim_blocks` has already
                // been acquired by `recv`.
                let next_block = block.as_ref().load_next(Relaxed);

                // Update the free list head
                self.free_head = next_block.unwrap();

                // Push the emptied block onto the back of the queue, making it
                // available to senders.
                tx.reclaim_block(block);
            }

            thread::yield_now();
        }
    }

    /// Effectively `Drop` all the blocks. Should only be called once, when
    /// the list is dropping.
    pub(super) unsafe fn free_blocks(&mut self) {
        debug_assert_ne!(self.free_head, NonNull::dangling());

        let mut cur = Some(self.free_head);

        #[cfg(debug_assertions)]
        {
            // to trigger the debug assert above so as to catch that we
            // don't call `free_blocks` more than once.
            self.free_head = NonNull::dangling();
            self.head = NonNull::dangling();
        }

        while let Some(block) = cur {
            cur = unsafe { block.as_ref() }.load_next(Relaxed);
            drop(unsafe { Box::from_raw(block.as_ptr()) });
        }
    }
}
448
449impl<T> fmt::Debug for Rx<T> {
450 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
451 fmt.debug_struct("Rx")
452 .field("head", &self.head)
453 .field("index", &self.index)
454 .field("free_head", &self.free_head)
455 .finish()
456 }
457}