rayon/iter/collect/
consumer.rs1use super::super::plumbing::*;
2use crate::SendPtr;
3use std::marker::PhantomData;
4use std::ptr;
5
6pub(super) struct CollectConsumer<'c, T: Send> {
7 start: SendPtr<T>,
9 len: usize,
10 marker: PhantomData<&'c mut T>,
11}
12
13impl<T: Send> CollectConsumer<'_, T> {
14 pub(super) fn appender(vec: &mut Vec<T>, len: usize) -> CollectConsumer<'_, T> {
16 let start = vec.len();
17 assert!(vec.capacity() - start >= len);
18
19 unsafe { CollectConsumer::new(vec.as_mut_ptr().add(start), len) }
23 }
24}
25
26impl<'c, T: Send + 'c> CollectConsumer<'c, T> {
27 unsafe fn new(start: *mut T, len: usize) -> Self {
30 CollectConsumer {
31 start: SendPtr(start),
32 len,
33 marker: PhantomData,
34 }
35 }
36}
37
38#[must_use]
43pub(super) struct CollectResult<'c, T> {
44 start: SendPtr<T>,
48 total_len: usize,
49 initialized_len: usize,
51 invariant_lifetime: PhantomData<&'c mut &'c mut [T]>,
54}
55
56unsafe impl<'c, T> Send for CollectResult<'c, T> where T: Send {}
57
58impl<'c, T> CollectResult<'c, T> {
59 pub(super) fn len(&self) -> usize {
61 self.initialized_len
62 }
63
64 pub(super) fn release_ownership(mut self) -> usize {
66 let ret = self.initialized_len;
67 self.initialized_len = 0;
68 ret
69 }
70}
71
72impl<'c, T> Drop for CollectResult<'c, T> {
73 fn drop(&mut self) {
74 unsafe {
77 ptr::drop_in_place(ptr::slice_from_raw_parts_mut(
78 self.start.0,
79 self.initialized_len,
80 ));
81 }
82 }
83}
84
85impl<'c, T: Send + 'c> Consumer<T> for CollectConsumer<'c, T> {
86 type Folder = CollectResult<'c, T>;
87 type Reducer = CollectReducer;
88 type Result = CollectResult<'c, T>;
89
90 fn split_at(self, index: usize) -> (Self, Self, CollectReducer) {
91 let CollectConsumer { start, len, .. } = self;
92
93 unsafe {
96 assert!(index <= len);
97 (
98 CollectConsumer::new(start.0, index),
99 CollectConsumer::new(start.0.add(index), len - index),
100 CollectReducer,
101 )
102 }
103 }
104
105 fn into_folder(self) -> Self::Folder {
106 CollectResult {
109 start: self.start,
110 total_len: self.len,
111 initialized_len: 0,
112 invariant_lifetime: PhantomData,
113 }
114 }
115
116 fn full(&self) -> bool {
117 false
118 }
119}
120
121impl<'c, T: Send + 'c> Folder<T> for CollectResult<'c, T> {
122 type Result = Self;
123
124 fn consume(mut self, item: T) -> Self {
125 assert!(
126 self.initialized_len < self.total_len,
127 "too many values pushed to consumer"
128 );
129
130 unsafe {
133 self.start.0.add(self.initialized_len).write(item);
135 self.initialized_len += 1;
136 }
137
138 self
139 }
140
141 fn complete(self) -> Self::Result {
142 self
145 }
146
147 fn full(&self) -> bool {
148 false
149 }
150}
151
152impl<'c, T: Send + 'c> UnindexedConsumer<T> for CollectConsumer<'c, T> {
155 fn split_off_left(&self) -> Self {
156 unreachable!("CollectConsumer must be indexed!")
157 }
158 fn to_reducer(&self) -> Self::Reducer {
159 CollectReducer
160 }
161}
162
163pub(super) struct CollectReducer;
166
167impl<'c, T> Reducer<CollectResult<'c, T>> for CollectReducer {
168 fn reduce(
169 self,
170 mut left: CollectResult<'c, T>,
171 right: CollectResult<'c, T>,
172 ) -> CollectResult<'c, T> {
173 unsafe {
177 let left_end = left.start.0.add(left.initialized_len);
178 if left_end == right.start.0 {
179 left.total_len += right.total_len;
180 left.initialized_len += right.release_ownership();
181 }
182 left
183 }
184 }
185}