tokio/util/
idle_notified_set.rs

1//! This module defines an `IdleNotifiedSet`, which is a collection of elements.
2//! Each element is intended to correspond to a task, and the collection will
3//! keep track of which tasks have had their waker notified, and which have not.
4//!
5//! Each entry in the set holds some user-specified value. The value's type is
6//! specified using the `T` parameter. It will usually be a `JoinHandle` or
7//! similar.
8
9use std::marker::PhantomPinned;
10use std::mem::ManuallyDrop;
11use std::ptr::NonNull;
12use std::task::{Context, Waker};
13
14use crate::loom::cell::UnsafeCell;
15use crate::loom::sync::{Arc, Mutex};
16use crate::util::linked_list::{self, Link};
17use crate::util::{waker_ref, Wake};
18
19type LinkedList<T> =
20    linked_list::LinkedList<ListEntry<T>, <ListEntry<T> as linked_list::Link>::Target>;
21
22/// This is the main handle to the collection.
23pub(crate) struct IdleNotifiedSet<T> {
24    lists: Arc<Lists<T>>,
25    length: usize,
26}
27
28/// A handle to an entry that is guaranteed to be stored in the idle or notified
29/// list of its `IdleNotifiedSet`. This value borrows the `IdleNotifiedSet`
30/// mutably to prevent the entry from being moved to the `Neither` list, which
31/// only the `IdleNotifiedSet` may do.
32///
33/// The main consequence of being stored in one of the lists is that the `value`
34/// field has not yet been consumed.
35///
36/// Note: This entry can be moved from the idle to the notified list while this
37/// object exists by waking its waker.
38pub(crate) struct EntryInOneOfTheLists<'a, T> {
39    entry: Arc<ListEntry<T>>,
40    set: &'a mut IdleNotifiedSet<T>,
41}
42
43type Lists<T> = Mutex<ListsInner<T>>;
44
45/// The linked lists hold strong references to the `ListEntry` items, and the
46/// `ListEntry` items also hold a strong reference back to the Lists object, but
47/// the destructor of the `IdleNotifiedSet` will clear the two lists, so once
48/// that object is destroyed, no ref-cycles will remain.
49struct ListsInner<T> {
50    notified: LinkedList<T>,
51    idle: LinkedList<T>,
52    /// Whenever an element in the `notified` list is woken, this waker will be
53    /// notified and consumed, if it exists.
54    waker: Option<Waker>,
55}
56
57/// Which of the two lists in the shared Lists object is this entry stored in?
58///
59/// If the value is `Idle`, then an entry's waker may move it to the notified
60/// list. Otherwise, only the `IdleNotifiedSet` may move it.
61///
62/// If the value is `Neither`, then it is still possible that the entry is in
63/// some third external list (this happens in `drain`).
64#[derive(Copy, Clone, Eq, PartialEq)]
65enum List {
66    Notified,
67    Idle,
68    Neither,
69}
70
71/// An entry in the list.
72///
73/// # Safety
74///
75/// The `my_list` field must only be accessed while holding the mutex in
76/// `parent`. It is an invariant that the value of `my_list` corresponds to
77/// which linked list in the `parent` holds this entry. Once this field takes
78/// the value `Neither`, then it may never be modified again.
79///
80/// If the value of `my_list` is `Notified` or `Idle`, then the `pointers` field
81/// must only be accessed while holding the mutex. If the value of `my_list` is
82/// `Neither`, then the `pointers` field may be accessed by the
83/// `IdleNotifiedSet` (this happens inside `drain`).
84///
85/// The `value` field is owned by the `IdleNotifiedSet` and may only be accessed
86/// by the `IdleNotifiedSet`. The operation that sets the value of `my_list` to
87/// `Neither` assumes ownership of the `value`, and it must either drop it or
88/// move it out from this entry to prevent it from getting leaked. (Since the
89/// two linked lists are emptied in the destructor of `IdleNotifiedSet`, the
90/// value should not be leaked.)
91struct ListEntry<T> {
92    /// The linked list pointers of the list this entry is in.
93    pointers: linked_list::Pointers<ListEntry<T>>,
94    /// Pointer to the shared `Lists` struct.
95    parent: Arc<Lists<T>>,
96    /// The value stored in this entry.
97    value: UnsafeCell<ManuallyDrop<T>>,
98    /// Used to remember which list this entry is in.
99    my_list: UnsafeCell<List>,
100    /// Required by the `linked_list::Pointers` field.
101    _pin: PhantomPinned,
102}
103
104generate_addr_of_methods! {
105    impl<T> ListEntry<T> {
106        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<ListEntry<T>>> {
107            &self.pointers
108        }
109    }
110}
111
112// With mutable access to the `IdleNotifiedSet`, you can get mutable access to
113// the values.
114unsafe impl<T: Send> Send for IdleNotifiedSet<T> {}
115// With the current API we strictly speaking don't even need `T: Sync`, but we
116// require it anyway to support adding &self APIs that access the values in the
117// future.
118unsafe impl<T: Sync> Sync for IdleNotifiedSet<T> {}
119
120// These impls control when it is safe to create a Waker. Since the waker does
121// not allow access to the value in any way (including its destructor), it is
122// not necessary for `T` to be Send or Sync.
123unsafe impl<T> Send for ListEntry<T> {}
124unsafe impl<T> Sync for ListEntry<T> {}
125
126impl<T> IdleNotifiedSet<T> {
127    /// Create a new `IdleNotifiedSet`.
128    pub(crate) fn new() -> Self {
129        let lists = Mutex::new(ListsInner {
130            notified: LinkedList::new(),
131            idle: LinkedList::new(),
132            waker: None,
133        });
134
135        IdleNotifiedSet {
136            lists: Arc::new(lists),
137            length: 0,
138        }
139    }
140
141    pub(crate) fn len(&self) -> usize {
142        self.length
143    }
144
145    pub(crate) fn is_empty(&self) -> bool {
146        self.length == 0
147    }
148
149    /// Insert the given value into the `idle` list.
150    pub(crate) fn insert_idle(&mut self, value: T) -> EntryInOneOfTheLists<'_, T> {
151        self.length += 1;
152
153        let entry = Arc::new(ListEntry {
154            parent: self.lists.clone(),
155            value: UnsafeCell::new(ManuallyDrop::new(value)),
156            my_list: UnsafeCell::new(List::Idle),
157            pointers: linked_list::Pointers::new(),
158            _pin: PhantomPinned,
159        });
160
161        {
162            let mut lock = self.lists.lock();
163            lock.idle.push_front(entry.clone());
164        }
165
166        // Safety: We just put the entry in the idle list, so it is in one of the lists.
167        EntryInOneOfTheLists { entry, set: self }
168    }
169
170    /// Pop an entry from the notified list to poll it. The entry is moved to
171    /// the idle list atomically.
172    pub(crate) fn pop_notified(&mut self, waker: &Waker) -> Option<EntryInOneOfTheLists<'_, T>> {
173        // We don't decrement the length because this call moves the entry to
174        // the idle list rather than removing it.
175        if self.length == 0 {
176            // Fast path.
177            return None;
178        }
179
180        let mut lock = self.lists.lock();
181
182        let should_update_waker = match lock.waker.as_mut() {
183            Some(cur_waker) => !waker.will_wake(cur_waker),
184            None => true,
185        };
186        if should_update_waker {
187            lock.waker = Some(waker.clone());
188        }
189
190        // Pop the entry, returning None if empty.
191        let entry = lock.notified.pop_back()?;
192
193        lock.idle.push_front(entry.clone());
194
195        // Safety: We are holding the lock.
196        entry.my_list.with_mut(|ptr| unsafe {
197            *ptr = List::Idle;
198        });
199
200        drop(lock);
201
202        // Safety: We just put the entry in the idle list, so it is in one of the lists.
203        Some(EntryInOneOfTheLists { entry, set: self })
204    }
205
206    /// Tries to pop an entry from the notified list to poll it. The entry is moved to
207    /// the idle list atomically.
208    pub(crate) fn try_pop_notified(&mut self) -> Option<EntryInOneOfTheLists<'_, T>> {
209        // We don't decrement the length because this call moves the entry to
210        // the idle list rather than removing it.
211        if self.length == 0 {
212            // Fast path.
213            return None;
214        }
215
216        let mut lock = self.lists.lock();
217
218        // Pop the entry, returning None if empty.
219        let entry = lock.notified.pop_back()?;
220
221        lock.idle.push_front(entry.clone());
222
223        // Safety: We are holding the lock.
224        entry.my_list.with_mut(|ptr| unsafe {
225            *ptr = List::Idle;
226        });
227
228        drop(lock);
229
230        // Safety: We just put the entry in the idle list, so it is in one of the lists.
231        Some(EntryInOneOfTheLists { entry, set: self })
232    }
233
234    /// Call a function on every element in this list.
235    pub(crate) fn for_each<F: FnMut(&mut T)>(&mut self, mut func: F) {
236        fn get_ptrs<T>(list: &mut LinkedList<T>, ptrs: &mut Vec<*mut T>) {
237            let mut node = list.last();
238
239            while let Some(entry) = node {
240                ptrs.push(entry.value.with_mut(|ptr| {
241                    let ptr: *mut ManuallyDrop<T> = ptr;
242                    let ptr: *mut T = ptr.cast();
243                    ptr
244                }));
245
246                let prev = entry.pointers.get_prev();
247                node = prev.map(|prev| unsafe { &*prev.as_ptr() });
248            }
249        }
250
251        // Atomically get a raw pointer to the value of every entry.
252        //
253        // Since this only locks the mutex once, it is not possible for a value
254        // to get moved from the idle list to the notified list during the
255        // operation, which would otherwise result in some value being listed
256        // twice.
257        let mut ptrs = Vec::with_capacity(self.len());
258        {
259            let mut lock = self.lists.lock();
260
261            get_ptrs(&mut lock.idle, &mut ptrs);
262            get_ptrs(&mut lock.notified, &mut ptrs);
263        }
264        debug_assert_eq!(ptrs.len(), ptrs.capacity());
265
266        for ptr in ptrs {
267            // Safety: When we grabbed the pointers, the entries were in one of
268            // the two lists. This means that their value was valid at the time,
269            // and it must still be valid because we are the IdleNotifiedSet,
270            // and only we can remove an entry from the two lists. (It's
271            // possible that an entry is moved from one list to the other during
272            // this loop, but that is ok.)
273            func(unsafe { &mut *ptr });
274        }
275    }
276
277    /// Remove all entries in both lists, applying some function to each element.
278    ///
279    /// The closure is called on all elements even if it panics. Having it panic
280    /// twice is a double-panic, and will abort the application.
281    pub(crate) fn drain<F: FnMut(T)>(&mut self, func: F) {
282        if self.length == 0 {
283            // Fast path.
284            return;
285        }
286        self.length = 0;
287
288        // The LinkedList is not cleared on panic, so we use a bomb to clear it.
289        //
290        // This value has the invariant that any entry in its `all_entries` list
291        // has `my_list` set to `Neither` and that the value has not yet been
292        // dropped.
293        struct AllEntries<T, F: FnMut(T)> {
294            all_entries: LinkedList<T>,
295            func: F,
296        }
297
298        impl<T, F: FnMut(T)> AllEntries<T, F> {
299            fn pop_next(&mut self) -> bool {
300                if let Some(entry) = self.all_entries.pop_back() {
301                    // Safety: We just took this value from the list, so we can
302                    // destroy the value in the entry.
303                    entry
304                        .value
305                        .with_mut(|ptr| unsafe { (self.func)(ManuallyDrop::take(&mut *ptr)) });
306                    true
307                } else {
308                    false
309                }
310            }
311        }
312
313        impl<T, F: FnMut(T)> Drop for AllEntries<T, F> {
314            fn drop(&mut self) {
315                while self.pop_next() {}
316            }
317        }
318
319        let mut all_entries = AllEntries {
320            all_entries: LinkedList::new(),
321            func,
322        };
323
324        // Atomically move all entries to the new linked list in the AllEntries
325        // object.
326        {
327            let mut lock = self.lists.lock();
328            unsafe {
329                // Safety: We are holding the lock and `all_entries` is a new
330                // LinkedList.
331                move_to_new_list(&mut lock.idle, &mut all_entries.all_entries);
332                move_to_new_list(&mut lock.notified, &mut all_entries.all_entries);
333            }
334        }
335
336        // Keep destroying entries in the list until it is empty.
337        //
338        // If the closure panics, then the destructor of the `AllEntries` bomb
339        // ensures that we keep running the destructor on the remaining values.
340        // A second panic will abort the program.
341        while all_entries.pop_next() {}
342    }
343}
344
345/// # Safety
346///
347/// The mutex for the entries must be held, and the target list must be such
348/// that setting `my_list` to `Neither` is ok.
349unsafe fn move_to_new_list<T>(from: &mut LinkedList<T>, to: &mut LinkedList<T>) {
350    while let Some(entry) = from.pop_back() {
351        entry.my_list.with_mut(|ptr| {
352            // Safety: pointer is accessed while holding the mutex.
353            unsafe {
354                *ptr = List::Neither;
355            }
356        });
357        to.push_front(entry);
358    }
359}
360
361impl<'a, T> EntryInOneOfTheLists<'a, T> {
362    /// Remove this entry from the list it is in, returning the value associated
363    /// with the entry.
364    ///
365    /// This consumes the value, since it is no longer guaranteed to be in a
366    /// list.
367    pub(crate) fn remove(self) -> T {
368        self.set.length -= 1;
369
370        {
371            let mut lock = self.set.lists.lock();
372
373            // Safety: We are holding the lock so there is no race, and we will
374            // remove the entry afterwards to uphold invariants.
375            let old_my_list = self.entry.my_list.with_mut(|ptr| unsafe {
376                let old_my_list = *ptr;
377                *ptr = List::Neither;
378                old_my_list
379            });
380
381            let list = match old_my_list {
382                List::Idle => &mut lock.idle,
383                List::Notified => &mut lock.notified,
384                // An entry in one of the lists is in one of the lists.
385                List::Neither => unreachable!(),
386            };
387
388            unsafe {
389                // Safety: We just checked that the entry is in this particular
390                // list.
391                list.remove(ListEntry::as_raw(&self.entry)).unwrap();
392            }
393        }
394
395        // By setting `my_list` to `Neither`, we have taken ownership of the
396        // value. We return it to the caller.
397        //
398        // Safety: We have a mutable reference to the `IdleNotifiedSet` that
399        // owns this entry, so we can use its permission to access the value.
400        self.entry
401            .value
402            .with_mut(|ptr| unsafe { ManuallyDrop::take(&mut *ptr) })
403    }
404
405    /// Access the value in this entry together with a context for its waker.
406    pub(crate) fn with_value_and_context<F, U>(&mut self, func: F) -> U
407    where
408        F: FnOnce(&mut T, &mut Context<'_>) -> U,
409        T: 'static,
410    {
411        let waker = waker_ref(&self.entry);
412
413        let mut context = Context::from_waker(&waker);
414
415        // Safety: We have a mutable reference to the `IdleNotifiedSet` that
416        // owns this entry, so we can use its permission to access the value.
417        self.entry
418            .value
419            .with_mut(|ptr| unsafe { func(&mut *ptr, &mut context) })
420    }
421}
422
423impl<T> Drop for IdleNotifiedSet<T> {
424    fn drop(&mut self) {
425        // Clear both lists.
426        self.drain(drop);
427
428        #[cfg(debug_assertions)]
429        if !std::thread::panicking() {
430            let lock = self.lists.lock();
431            assert!(lock.idle.is_empty());
432            assert!(lock.notified.is_empty());
433        }
434    }
435}
436
437impl<T: 'static> Wake for ListEntry<T> {
438    fn wake_by_ref(me: &Arc<Self>) {
439        let mut lock = me.parent.lock();
440
441        // Safety: We are holding the lock and we will update the lists to
442        // maintain invariants.
443        let old_my_list = me.my_list.with_mut(|ptr| unsafe {
444            let old_my_list = *ptr;
445            if old_my_list == List::Idle {
446                *ptr = List::Notified;
447            }
448            old_my_list
449        });
450
451        if old_my_list == List::Idle {
452            // We move ourself to the notified list.
453            let me = unsafe {
454                // Safety: We just checked that we are in this particular list.
455                lock.idle.remove(ListEntry::as_raw(me)).unwrap()
456            };
457            lock.notified.push_front(me);
458
459            if let Some(waker) = lock.waker.take() {
460                drop(lock);
461                waker.wake();
462            }
463        }
464    }
465
466    fn wake(me: Arc<Self>) {
467        Self::wake_by_ref(&me);
468    }
469}
470
471/// # Safety
472///
473/// `ListEntry` is forced to be !Unpin.
474unsafe impl<T> linked_list::Link for ListEntry<T> {
475    type Handle = Arc<ListEntry<T>>;
476    type Target = ListEntry<T>;
477
478    fn as_raw(handle: &Self::Handle) -> NonNull<ListEntry<T>> {
479        let ptr: *const ListEntry<T> = Arc::as_ptr(handle);
480        // Safety: We can't get a null pointer from `Arc::as_ptr`.
481        unsafe { NonNull::new_unchecked(ptr as *mut ListEntry<T>) }
482    }
483
484    unsafe fn from_raw(ptr: NonNull<ListEntry<T>>) -> Arc<ListEntry<T>> {
485        unsafe { Arc::from_raw(ptr.as_ptr()) }
486    }
487
488    unsafe fn pointers(
489        target: NonNull<ListEntry<T>>,
490    ) -> NonNull<linked_list::Pointers<ListEntry<T>>> {
491        unsafe { ListEntry::addr_of_pointers(target) }
492    }
493}
494
495#[cfg(all(test, not(loom)))]
496mod tests {
497    use crate::runtime::Builder;
498    use crate::task::JoinSet;
499
500    // A test that runs under miri.
501    //
502    // https://github.com/tokio-rs/tokio/pull/5693
503    #[test]
504    fn join_set_test() {
505        let rt = Builder::new_current_thread().build().unwrap();
506
507        let mut set = JoinSet::new();
508        set.spawn_on(futures::future::ready(()), rt.handle());
509
510        rt.block_on(set.join_next()).unwrap().unwrap();
511    }
512}