tokio/sync/mpsc/list.rs
//! A concurrent, lock-free, FIFO list.
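//!
//! A minimal usage sketch (illustrative only: these types are crate-private
//! and are driven by `chan.rs` in practice):
//!
//! ```ignore
//! let (tx, mut rx) = channel::<u32>();
//!
//! // Push a value, then pop it back out.
//! tx.push(1);
//! assert!(matches!(rx.try_pop(&tx), TryPopResult::Ok(1)));
//!
//! // Closing the send half claims one final slot that carries the
//! // closed flag instead of a value.
//! tx.close();
//! assert!(matches!(rx.try_pop(&tx), TryPopResult::Closed));
//! ```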

use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize};
use crate::loom::thread;
use crate::sync::mpsc::block::{self, Block};

use std::fmt;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};

/// List queue transmit handle.
pub(crate) struct Tx<T> {
    /// Tail in the `Block` mpmc list.
    block_tail: AtomicPtr<Block<T>>,

    /// Position to push the next message. This references a block and offset
    /// into the block.
    tail_position: AtomicUsize,
}

/// List queue receive handle.
pub(crate) struct Rx<T> {
    /// Pointer to the block being processed.
    head: NonNull<Block<T>>,

    /// Next slot index to process.
    index: usize,

    /// Pointer to the next block pending release.
    free_head: NonNull<Block<T>>,
}

/// Return value of `Rx::try_pop`.
pub(crate) enum TryPopResult<T> {
    /// Successfully popped a value.
    Ok(T),
    /// The channel is empty.
    ///
    /// Note that `list.rs` only tracks the close state set by senders. If
    /// the channel is closed by `Rx::close()`, `TryPopResult::Empty` is
    /// still returned, and the close state must be handled by `chan.rs`.
    Empty,
    /// The channel is empty and closed.
    ///
    /// Returned when the send half is closed (all senders dropped).
    Closed,
    /// The channel is not empty, but the first value is being written.
    Busy,
}

pub(crate) fn channel<T>() -> (Tx<T>, Rx<T>) {
    // Create the initial block shared between the tx and rx halves.
    let initial_block = Block::new(0);
    let initial_block_ptr = Box::into_raw(initial_block);

    let tx = Tx {
        block_tail: AtomicPtr::new(initial_block_ptr),
        tail_position: AtomicUsize::new(0),
    };

    let head = NonNull::new(initial_block_ptr).unwrap();

    let rx = Rx {
        head,
        index: 0,
        free_head: head,
    };

    (tx, rx)
}

impl<T> Tx<T> {
    /// Pushes a value into the list.
    pub(crate) fn push(&self, value: T) {
        // First, claim a slot for the value. `Acquire` is used here to
        // synchronize with the `fetch_add` in `reclaim_blocks`.
        let slot_index = self.tail_position.fetch_add(1, Acquire);

        // Load the current block and write the value.
        let block = self.find_block(slot_index);

        unsafe {
            // Write the value to the block.
            block.as_ref().write(slot_index, value);
        }
    }

    /// Closes the send half of the list.
    ///
    /// The process is similar to pushing a value: instead of writing a value
    /// and setting the ready flag, the `TX_CLOSED` flag is set on the block.
    pub(crate) fn close(&self) {
        // First, claim a slot. This is the last slot that will ever be
        // claimed.
        let slot_index = self.tail_position.fetch_add(1, Acquire);

        let block = self.find_block(slot_index);

        unsafe { block.as_ref().tx_close() }
    }

    fn find_block(&self, slot_index: usize) -> NonNull<Block<T>> {
        // The start index of the block that contains `slot_index`.
        let start_index = block::start_index(slot_index);

        // The index offset into the block.
        let offset = block::offset(slot_index);

        // Load the current tail of the block linked list.
        let mut block_ptr = self.block_tail.load(Acquire);

        let block = unsafe { &*block_ptr };

        // Calculate the distance between the tail pointer and the target
        // block.
        let distance = block.distance(start_index);

        // Decide if this call to `find_block` should attempt to update the
        // `block_tail` pointer.
        //
        // Updating `block_tail` is not always performed in order to reduce
        // contention.
        //
        // When set, as the routine walks the linked list, it attempts to
        // update `block_tail`. If the update cannot be performed,
        // `try_updating_tail` is unset.
        let mut try_updating_tail = distance > offset;

        // Walk the linked list of blocks until the block with `start_index`
        // is found.
        loop {
            let block = unsafe { &(*block_ptr) };

            if block.is_at_index(start_index) {
                return unsafe { NonNull::new_unchecked(block_ptr) };
            }

            let next_block = block
                .load_next(Acquire)
                // There is no allocated next block, grow the linked list.
                .unwrap_or_else(|| block.grow());

            // If the block is **not** final, then the tail pointer cannot be
            // advanced any more.
            try_updating_tail &= block.is_final();

            if try_updating_tail {
                // Advancing `block_tail` must happen when walking the linked
                // list. `block_tail` may not advance past any blocks that
                // are not "final". At the point a block is finalized, it is
                // unknown if there are any prior blocks that are
                // unfinalized, which makes it impossible to advance
                // `block_tail`.
                //
                // While walking the linked list, `block_tail` can be
                // advanced as long as finalized blocks are traversed.
                //
                // Release ordering is used to ensure that any subsequent
                // reads are able to see the memory pointed to by
                // `block_tail`.
                //
                // Acquire is not needed, as no "actual" values are accessed
                // here; the linked list is only being walked at this point.
                if self
                    .block_tail
                    .compare_exchange(block_ptr, next_block.as_ptr(), Release, Relaxed)
                    .is_ok()
                {
                    // Synchronize with any senders.
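                    // `fetch_add(0, _)` is a read-modify-write no-op: an RMW
                    // always reads the latest value in the modification
                    // order, so this observes every slot index claimed so
                    // far, even by senders racing with this one.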
                    let tail_position = self.tail_position.fetch_add(0, Release);

                    unsafe {
                        block.tx_release(tail_position);
                    }
                } else {
                    // A concurrent sender is also working on advancing
                    // `block_tail` and this thread is falling behind.
                    //
                    // Stop trying to advance the tail pointer.
                    try_updating_tail = false;
                }
            }

            block_ptr = next_block.as_ptr();

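            // Under loom, the yield lets the model checker interleave other
            // threads into this spin; in normal builds it is only a
            // scheduler hint.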
            thread::yield_now();
        }
    }

    /// # Safety
    ///
    /// Behavior is undefined if any of the following conditions are violated:
    ///
    /// - The `block` was created by [`Box::into_raw`].
    /// - The `block` is not currently part of any linked list.
    /// - The `block` is a valid pointer to a [`Block<T>`].
    pub(crate) unsafe fn reclaim_block(&self, mut block: NonNull<Block<T>>) {
        // The block has been removed from the linked list and ownership is
        // reclaimed.
        //
        // Before dropping the block, see if it can be reused by inserting it
        // back at the end of the linked list.
        //
        // First, reset the data.
        //
        // Safety: the caller guarantees the block is valid and not in any
        // list.
        unsafe {
            block.as_mut().reclaim();
        }

        let mut reused = false;

        // Attempt to insert the block at the end.
        //
        // Walk at most three times.
        let curr_ptr = self.block_tail.load(Acquire);

        // The pointer can never be null.
        debug_assert!(!curr_ptr.is_null());

        // Safety: `curr_ptr` is never null.
        let mut curr = unsafe { NonNull::new_unchecked(curr_ptr) };

        // TODO: Unify this logic with Block::grow
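        // The walk is bounded so reclamation stays cheap under contention:
        // if no free `next` slot is found within three hops, the block is
        // simply dropped below rather than reused.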
        for _ in 0..3 {
            match unsafe { curr.as_ref().try_push(&mut block, AcqRel, Acquire) } {
                Ok(()) => {
                    reused = true;
                    break;
                }
                Err(next) => {
                    curr = next;
                }
            }
        }

        if !reused {
            // Safety:
            //
            // 1. The caller guarantees the block is valid and not in any
            //    list.
            // 2. The block was created by `Box::into_raw`.
            let _ = unsafe { Box::from_raw(block.as_ptr()) };
        }
    }

    pub(crate) fn is_closed(&self) -> bool {
        let tail = self.block_tail.load(Acquire);

        unsafe {
            let tail_block = &*tail;
            tail_block.is_closed()
        }
    }
}

impl<T> fmt::Debug for Tx<T> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Tx")
            .field("block_tail", &self.block_tail.load(Relaxed))
            .field("tail_position", &self.tail_position.load(Relaxed))
            .finish()
    }
}

impl<T> Rx<T> {
    pub(crate) fn is_empty(&self, tx: &Tx<T>) -> bool {
        let block = unsafe { self.head.as_ref() };
        if block.has_value(self.index) {
            return false;
        }

        // A block may have no value at `index` "now" even though the list is
        // not empty, so checking the length is necessary to be sure.
        self.len(tx) == 0
    }

    pub(crate) fn len(&self, tx: &Tx<T>) -> usize {
        // When all the senders are dropped, one last slot will have been
        // claimed in the tail position, but it only marks the closed state.
        let tail_position = tx.tail_position.load(Acquire);
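        // `close()` claims that slot exactly like `push()` does but never
        // stores a value in it, so it is subtracted back out here.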
        tail_position - self.index - (tx.is_closed() as usize)
    }

    /// Pops the next value off the queue.
    pub(crate) fn pop(&mut self, tx: &Tx<T>) -> Option<block::Read<T>> {
        // Advance `head`, if needed.
        if !self.try_advancing_head() {
            return None;
        }

        self.reclaim_blocks(tx);

        unsafe {
            let block = self.head.as_ref();

            let ret = block.read(self.index);

            if let Some(block::Read::Value(..)) = ret {
                self.index = self.index.wrapping_add(1);
            }

            ret
        }
    }

    /// Pops the next value off the queue, detecting whether the block is
    /// busy or empty on failure.
    ///
    /// This function exists because `Rx::pop` can return `None` even if the
    /// channel's queue contains a message that has been completely written.
    /// This can happen if the fully delivered message is behind another
    /// message that is in the middle of being written to the block, since
    /// the channel can't return the messages out of order.
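    ///
    /// A sketch of how a caller might drive `try_pop` (illustrative only;
    /// `chan.rs` is the real consumer and `process` is a hypothetical
    /// stand-in):
    ///
    /// ```ignore
    /// loop {
    ///     match rx.try_pop(&tx) {
    ///         TryPopResult::Ok(value) => process(value),
    ///         TryPopResult::Closed => break,
    ///         // `Empty` and `Busy` mean there is nothing to return right
    ///         // now; the caller must park and retry via its own wakeup
    ///         // logic.
    ///         TryPopResult::Empty | TryPopResult::Busy => break,
    ///     }
    /// }
    /// ```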
    pub(crate) fn try_pop(&mut self, tx: &Tx<T>) -> TryPopResult<T> {
        let tail_position = tx.tail_position.load(Acquire);
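        // The tail is loaded *before* popping: if the pop fails while
        // `index` has already caught up to the claimed tail, the queue is
        // truly empty; otherwise some slot is claimed but not yet written.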
        let result = self.pop(tx);

        match result {
            Some(block::Read::Value(t)) => TryPopResult::Ok(t),
            Some(block::Read::Closed) => TryPopResult::Closed,
            None if tail_position == self.index => TryPopResult::Empty,
            None => TryPopResult::Busy,
        }
    }

    /// Tries advancing the block pointer to the block referenced by
    /// `self.index`.
    ///
    /// Returns `true` if successful, `false` if there is no next block to
    /// load.
    fn try_advancing_head(&mut self) -> bool {
        let block_index = block::start_index(self.index);

        loop {
            let next_block = {
                let block = unsafe { self.head.as_ref() };

                if block.is_at_index(block_index) {
                    return true;
                }

                block.load_next(Acquire)
            };

            let next_block = match next_block {
                Some(next_block) => next_block,
                None => {
                    return false;
                }
            };

            self.head = next_block;

            thread::yield_now();
        }
    }

    fn reclaim_blocks(&mut self, tx: &Tx<T>) {
        while self.free_head != self.head {
            unsafe {
                // Get a handle to the block that will be freed and update
                // `free_head` to point to the next block.
                let block = self.free_head;

                let observed_tail_position = block.as_ref().observed_tail_position();

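                // `observed_tail_position` is only set once the send half
                // has released the block; until then senders may still be
                // writing into it, so reclamation stops here.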
                let required_index = match observed_tail_position {
                    Some(i) => i,
                    None => return,
                };

                if required_index > self.index {
                    return;
                }

                // We may read the next pointer with `Relaxed` ordering as it
                // is guaranteed that the `reclaim_blocks` routine trails the
                // `recv` routine. Any memory accessed by `reclaim_blocks`
                // has already been acquired by `recv`.
                let next_block = block.as_ref().load_next(Relaxed);

                // Update the free list head.
                self.free_head = next_block.unwrap();

                // Push the emptied block onto the back of the queue, making
                // it available to senders.
                tx.reclaim_block(block);
            }

            thread::yield_now();
        }
    }

    /// Effectively `Drop`s all the blocks. Must only be called once, while
    /// the list is being dropped.
    pub(super) unsafe fn free_blocks(&mut self) {
        debug_assert_ne!(self.free_head, NonNull::dangling());

        let mut cur = Some(self.free_head);

        #[cfg(debug_assertions)]
        {
            // Point the handles at dangling sentinels so the debug assert
            // above fires if `free_blocks` is ever called more than once.
            self.free_head = NonNull::dangling();
            self.head = NonNull::dangling();
        }

        while let Some(block) = cur {
            cur = unsafe { block.as_ref() }.load_next(Relaxed);
            drop(unsafe { Box::from_raw(block.as_ptr()) });
        }
    }
}

impl<T> fmt::Debug for Rx<T> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Rx")
            .field("head", &self.head)
            .field("index", &self.index)
            .field("free_head", &self.free_head)
            .finish()
    }
}