tokio/util/
sharded_list.rs1use std::ptr::NonNull;
2use std::sync::atomic::Ordering;
3
4use crate::loom::sync::{Mutex, MutexGuard};
5use crate::util::metric_atomics::{MetricAtomicU64, MetricAtomicUsize};
6
7use super::linked_list::{Link, LinkedList};
8
9pub(crate) struct ShardedList<L, T> {
16 lists: Box<[Mutex<LinkedList<L, T>>]>,
17 added: MetricAtomicU64,
18 count: MetricAtomicUsize,
19 shard_mask: usize,
20}
21
22pub(crate) unsafe trait ShardedListItem: Link {
29 unsafe fn get_shard_id(target: NonNull<Self::Target>) -> usize;
33}
34
35impl<L, T> ShardedList<L, T> {
36 pub(crate) fn new(sharded_size: usize) -> Self {
38 assert!(sharded_size.is_power_of_two());
39
40 let shard_mask = sharded_size - 1;
41 let mut lists = Vec::with_capacity(sharded_size);
42 for _ in 0..sharded_size {
43 lists.push(Mutex::new(LinkedList::<L, T>::new()))
44 }
45 Self {
46 lists: lists.into_boxed_slice(),
47 added: MetricAtomicU64::new(0),
48 count: MetricAtomicUsize::new(0),
49 shard_mask,
50 }
51 }
52}
53
54pub(crate) struct ShardGuard<'a, L, T> {
56 lock: MutexGuard<'a, LinkedList<L, T>>,
57 added: &'a MetricAtomicU64,
58 count: &'a MetricAtomicUsize,
59 id: usize,
60}
61
62impl<L: ShardedListItem> ShardedList<L, L::Target> {
63 pub(crate) fn pop_back(&self, shard_id: usize) -> Option<L::Handle> {
66 let mut lock = self.shard_inner(shard_id);
67 let node = lock.pop_back();
68 if node.is_some() {
69 self.count.decrement();
70 }
71 node
72 }
73
74 pub(crate) unsafe fn remove(&self, node: NonNull<L::Target>) -> Option<L::Handle> {
83 let id = unsafe { L::get_shard_id(node) };
84 let mut lock = self.shard_inner(id);
85 let node = unsafe { lock.remove(node) };
88 if node.is_some() {
89 self.count.decrement();
90 }
91 node
92 }
93
94 pub(crate) fn lock_shard(&self, val: &L::Handle) -> ShardGuard<'_, L, L::Target> {
96 let id = unsafe { L::get_shard_id(L::as_raw(val)) };
97 ShardGuard {
98 lock: self.shard_inner(id),
99 added: &self.added,
100 count: &self.count,
101 id,
102 }
103 }
104
105 pub(crate) fn len(&self) -> usize {
107 self.count.load(Ordering::Relaxed)
108 }
109
110 cfg_64bit_metrics! {
111 pub(crate) fn added(&self) -> u64 {
113 self.added.load(Ordering::Relaxed)
114 }
115 }
116
117 pub(crate) fn is_empty(&self) -> bool {
119 self.len() == 0
120 }
121
122 pub(crate) fn shard_size(&self) -> usize {
126 self.shard_mask + 1
127 }
128
129 #[inline]
130 fn shard_inner(&self, id: usize) -> MutexGuard<'_, LinkedList<L, <L as Link>::Target>> {
131 unsafe { self.lists.get_unchecked(id & self.shard_mask).lock() }
133 }
134}
135
136impl<'a, L: ShardedListItem> ShardGuard<'a, L, L::Target> {
137 pub(crate) fn push(mut self, val: L::Handle) {
139 let id = unsafe { L::get_shard_id(L::as_raw(&val)) };
140 assert_eq!(id, self.id);
141 self.lock.push_front(val);
142 self.added.add(1, Ordering::Relaxed);
143 self.count.increment();
144 }
145}
146
147cfg_taskdump! {
148 impl<L: ShardedListItem> ShardedList<L, L::Target> {
149 pub(crate) fn for_each<F>(&self, mut f: F)
150 where
151 F: FnMut(&L::Handle),
152 {
153 let mut guards = Vec::with_capacity(self.lists.len());
154 for list in self.lists.iter() {
155 guards.push(list.lock());
156 }
157 for g in &mut guards {
158 g.for_each(&mut f);
159 }
160 }
161 }
162}