tokio/runtime/scheduler/inject/rt_multi_thread.rs

use super::{Shared, Synced};

use crate::runtime::scheduler::Lock;
use crate::runtime::task;

use std::sync::atomic::Ordering::Release;

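// "Locking" an exclusive reference is a no-op: a `&mut Synced` already
// guarantees sole access, so it can serve as its own lock handle.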
impl<'a> Lock<Synced> for &'a mut Synced {
    type Handle = &'a mut Synced;

    fn lock(self) -> Self::Handle {
        self
    }
}

impl AsMut<Synced> for Synced {
    fn as_mut(&mut self) -> &mut Synced {
        self
    }
}

impl<T: 'static> Shared<T> {
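    /// Pushes a batch of tasks onto the injection queue in a single locked
    /// operation.
    ///
    /// # Safety
    ///
    /// Must be called with the `Synced` instance that guards this `Shared`
    /// queue.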
    #[inline]
    pub(crate) unsafe fn push_batch<L, I>(&self, shared: L, mut iter: I)
    where
        L: Lock<Synced>,
        I: Iterator<Item = task::Notified<T>>,
    {
        let first = match iter.next() {
            Some(first) => first.into_raw(),
            None => return,
        };

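        // Link the tasks into a singly-linked batch before taking the lock,
        // so the lock is held only for the final splice.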
        let mut prev = first;
        let mut counter = 1;

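        // Thread each remaining task onto the previous one via its
        // `queue_next` pointer, counting the batch size as we go.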
        iter.for_each(|next| {
            let next = next.into_raw();

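            // Safety: holding the `Notified` for a task grants exclusive
            // access to its `queue_next` field.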
            unsafe { prev.set_queue_next(Some(next)) };
            prev = next;
            counter += 1;
        });

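        // The batch now forms a linked list from `first` to `prev`; splice
        // it into the shared queue under the lock.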
        unsafe {
            self.push_batch_inner(shared, first, prev, counter);
        }
    }

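    /// Inserts a pre-linked chain of tasks into the queue.
    ///
    /// `batch_head` and `batch_tail` may point to the same task, in which
    /// case a single task is inserted.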
    #[inline]
    unsafe fn push_batch_inner<L>(
        &self,
        shared: L,
        batch_head: task::RawTask,
        batch_tail: task::RawTask,
        num: usize,
    ) where
        L: Lock<Synced>,
    {
        debug_assert!(unsafe { batch_tail.get_queue_next().is_none() });

        let mut synced = shared.lock();

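        // If the queue has been closed, refuse the batch: drop the lock and
        // release every task in the chain instead of enqueueing it.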
        if synced.as_mut().is_closed {
            drop(synced);

            let mut curr = Some(batch_head);

            while let Some(task) = curr {
                curr = unsafe { task.get_queue_next() };

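                // Reconstitute the `Notified` and drop it immediately,
                // releasing this batch's reference to the task.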
                let _ = unsafe { task::Notified::<T>::from_raw(task) };
            }

            return;
        }

        let synced = synced.as_mut();

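        // Splice the batch onto the end of the queue: link it after the
        // current tail, or make it the new head if the queue is empty.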
        if let Some(tail) = synced.tail {
            unsafe {
                tail.set_queue_next(Some(batch_head));
            }
        } else {
            synced.head = Some(batch_head);
        }

        synced.tail = Some(batch_tail);

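        // All writes to `len` happen while holding the lock, so an
        // unsynchronized load followed by a store cannot race; the `Release`
        // ordering publishes the new length to readers outside the lock.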
        let len = unsafe { self.len.unsync_load() };

        self.len.store(len + num, Release);
    }
}