1use alloc::{Allocator, SliceWrapper, SliceWrapperMut};
2use core::marker::PhantomData;
3use core::ops::Range;
4use core::{any, mem};
5#[cfg(feature = "std")]
6use std;
7
8use super::backward_references::{AnyHasher, BrotliEncoderParams, CloneWithAlloc, UnionHasher};
9use super::encode::{
10 hasher_setup, BrotliEncoderDestroyInstance, BrotliEncoderMaxCompressedSize,
11 BrotliEncoderOperation, SanitizeParams,
12};
13use super::BrotliAlloc;
14use crate::concat::{BroCatli, BroCatliResult};
15use crate::enc::combined_alloc::{alloc_default, allocate};
16use crate::enc::encode::BrotliEncoderStateStruct;
17
/// Error reported by [`OwnedRetriever`] when the shared value cannot be viewed or
/// reclaimed. It carries no information (`()`).
pub type PoisonedThreadError = ();

/// Opaque payload describing a low-level thread failure when the `std` feature is
/// enabled: a boxed, sendable `Any` value.
// NOTE(review): presumably the panic payload returned by a failed thread join —
// confirm against the spawner implementations.
#[cfg(feature = "std")]
pub type LowLevelThreadError = std::boxed::Box<dyn any::Any + Send + 'static>;
/// Placeholder for a low-level thread failure when the `std` feature is disabled.
#[cfg(not(feature = "std"))]
pub type LowLevelThreadError = ();
24
/// Types that can be built from a [`LowLevelThreadError`] payload.
pub trait AnyBoxConstructor {
    /// Wraps `data` in `Self`.
    fn new(data: LowLevelThreadError) -> Self;
}
28
/// A handle to pending work that can be consumed to obtain its outcome.
///
/// `T` is the value produced on success and `U` the error produced on failure.
pub trait Joinable<T: Send + 'static, U: Send + 'static>: Sized {
    /// Consumes the handle and returns the result of the work.
    fn join(self) -> Result<T, U>;
}
/// Errors reported by the multi-threaded compression entry points in this file.
#[derive(Debug)]
pub enum BrotliEncoderThreadError {
    /// An output buffer was exhausted, either while a chunk was being compressed
    /// or while the compressed chunks were being concatenated.
    InsufficientOutputSpace,
    // NOTE(review): not constructed anywhere in the visible part of this file;
    // presumably meant for a concatenation that stops before consuming a whole chunk.
    ConcatenationDidNotProcessFullFile,
    /// Concatenating a compressed chunk returned the contained unexpected result.
    ConcatenationError(BroCatliResult),
    /// Finishing the concatenated stream returned the contained non-success result.
    ConcatenationFinalizationError(BroCatliResult),
    /// The shared input could not be viewed or reclaimed, or the result of the
    /// chunk compressed on the calling thread was unavailable.
    OtherThreadPanic,
    /// A low-level thread failure, built through [`AnyBoxConstructor::new`].
    ThreadExecError(LowLevelThreadError),
}
41
42impl AnyBoxConstructor for BrotliEncoderThreadError {
43 fn new(data: LowLevelThreadError) -> Self {
44 BrotliEncoderThreadError::ThreadExecError(data)
45 }
46}
47
48fn set_pending_error(
49 pending_error: &mut Option<BrotliEncoderThreadError>,
50 error: BrotliEncoderThreadError,
51) {
52 if pending_error.is_none() {
53 *pending_error = Some(error);
54 }
55}
56
/// The compressed output of one chunk of the input.
pub struct CompressedFileChunk<Alloc: BrotliAlloc + Send + 'static>
where
    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
{
    /// Allocator-owned buffer holding the compressed bytes; it may be larger than
    /// `data_size`.
    data_backing: <Alloc as Allocator<u8>>::AllocatedMemory,
    /// Number of valid compressed bytes at the start of `data_backing`.
    data_size: usize,
}
/// What a compression worker hands back: its outcome plus the allocator it was given.
pub struct CompressionThreadResult<Alloc: BrotliAlloc + Send + 'static>
where
    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
{
    /// The compressed chunk, or the error that prevented producing it.
    compressed: Result<CompressedFileChunk<Alloc>, BrotliEncoderThreadError>,
    /// The allocator, returned so the caller can free `compressed` and reuse it.
    alloc: Alloc,
}
/// State of a per-thread allocator slot (see [`SendAlloc`]).
pub enum InternalSendAlloc<
    ReturnVal: Send + 'static,
    ExtraInput: Send + 'static,
    Alloc: BrotliAlloc + Send + 'static,
    Join: Joinable<ReturnVal, BrotliEncoderThreadError>,
> where
    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
{
    /// The allocator and its extra input are present and available.
    A(Alloc, ExtraInput),
    /// A join handle for pending work that yields a `ReturnVal`.
    // NOTE(review): presumably stored here by the spawner when it hands the
    // allocator to a worker — confirm against the spawner implementations.
    Join(Join),
    /// Placeholder left behind while the previous contents are moved out.
    SpawningOrJoining(PhantomData<ReturnVal>),
}
83impl<
84 ReturnVal: Send + 'static,
85 ExtraInput: Send + 'static,
86 Alloc: BrotliAlloc + Send + 'static,
87 Join: Joinable<ReturnVal, BrotliEncoderThreadError>,
88 > InternalSendAlloc<ReturnVal, ExtraInput, Alloc, Join>
89where
90 <Alloc as Allocator<u8>>::AllocatedMemory: Send,
91{
92 fn unwrap_input(&mut self) -> (&mut Alloc, &mut ExtraInput) {
93 match *self {
94 InternalSendAlloc::A(ref mut alloc, ref mut extra) => (alloc, extra),
95 _ => panic!("Bad state for allocator"),
96 }
97 }
98}
99
/// Public wrapper around [`InternalSendAlloc`]: one allocator slot per worker,
/// holding either the allocator with its extra input, a join handle, or a
/// placeholder.
pub struct SendAlloc<
    ReturnValue: Send + 'static,
    ExtraInput: Send + 'static,
    Alloc: BrotliAlloc + Send + 'static,
    Join: Joinable<ReturnValue, BrotliEncoderThreadError>,
>(pub InternalSendAlloc<ReturnValue, ExtraInput, Alloc, Join>)
where
    <Alloc as Allocator<u8>>::AllocatedMemory: Send;
109
110impl<
111 ReturnValue: Send + 'static,
112 ExtraInput: Send + 'static,
113 Alloc: BrotliAlloc + Send + 'static,
114 Join: Joinable<ReturnValue, BrotliEncoderThreadError>,
115 > SendAlloc<ReturnValue, ExtraInput, Alloc, Join>
116where
117 <Alloc as Allocator<u8>>::AllocatedMemory: Send,
118{
119 pub fn new(alloc: Alloc, extra_input: ExtraInput) -> Self {
120 SendAlloc::<ReturnValue, ExtraInput, Alloc, Join>(InternalSendAlloc::A(alloc, extra_input))
121 }
122 pub fn unwrap_or(self, other: Alloc, other_extra: ExtraInput) -> (Alloc, ExtraInput) {
123 match self.0 {
124 InternalSendAlloc::A(alloc, extra_input) => (alloc, extra_input),
125 InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
126 (other, other_extra)
127 }
128 }
129 }
130 fn unwrap_view_mut(&mut self) -> (&mut Alloc, &mut ExtraInput) {
131 match self.0 {
132 InternalSendAlloc::A(ref mut alloc, ref mut extra_input) => (alloc, extra_input),
133 InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
134 panic!("Item permanently borrowed/leaked")
135 }
136 }
137 }
138 pub fn unwrap(self) -> (Alloc, ExtraInput) {
139 match self.0 {
140 InternalSendAlloc::A(alloc, extra_input) => (alloc, extra_input),
141 InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
142 panic!("Item permanently borrowed/leaked")
143 }
144 }
145 }
146 pub fn replace_with_default(&mut self) -> (Alloc, ExtraInput) {
147 match mem::replace(
148 &mut self.0,
149 InternalSendAlloc::SpawningOrJoining(PhantomData),
150 ) {
151 InternalSendAlloc::A(alloc, extra_input) => (alloc, extra_input),
152 InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
153 panic!("Item permanently borrowed/leaked")
154 }
155 }
156 }
157}
158
/// Storage behind [`Owned`]: either the value itself or a marker that it has
/// been moved out.
pub enum InternalOwned<T> {
    /// The value is present.
    Item(T),
    /// The value has been taken and is not available.
    Borrowed,
}

/// A value that may be temporarily moved out and put back later.
pub struct Owned<T>(pub InternalOwned<T>);

impl<T> Owned<T> {
    /// Wraps `data` as a present value.
    pub fn new(data: T) -> Self {
        Owned(InternalOwned::Item(data))
    }

    /// Returns the value if present, otherwise `other`.
    pub fn unwrap_or(self, other: T) -> T {
        match self.0 {
            InternalOwned::Item(value) => value,
            InternalOwned::Borrowed => other,
        }
    }

    /// Returns the value.
    ///
    /// # Panics
    /// Panics if the value has been moved out.
    pub fn unwrap(self) -> T {
        match self.0 {
            InternalOwned::Item(value) => value,
            InternalOwned::Borrowed => panic!("Item permanently borrowed"),
        }
    }

    /// Borrows the value.
    ///
    /// # Panics
    /// Panics if the value has been moved out.
    pub fn view(&self) -> &T {
        match &self.0 {
            InternalOwned::Item(value) => value,
            InternalOwned::Borrowed => panic!("Item permanently borrowed"),
        }
    }
}
192
/// Access to a shared value of type `U` that can be inspected in place and
/// eventually reclaimed by value.
pub trait OwnedRetriever<U: Send + 'static> {
    /// Calls `f` with a shared reference to the value and returns its result,
    /// or an error if the value cannot be accessed.
    fn view<T, F: FnOnce(&U) -> T>(&self, f: F) -> Result<T, PoisonedThreadError>;
    /// Consumes the retriever and returns the value, or an error if it cannot
    /// be reclaimed.
    fn unwrap(self) -> Result<U, PoisonedThreadError>;
}
197
#[cfg(feature = "std")]
impl<U: Send + 'static> OwnedRetriever<U> for std::sync::Arc<std::sync::RwLock<U>> {
    /// Runs `f` under a read lock; fails if the lock is poisoned.
    fn view<T, F: FnOnce(&U) -> T>(&self, f: F) -> Result<T, PoisonedThreadError> {
        self.read()
            .map(|guard| f(&*guard))
            .map_err(|_| PoisonedThreadError::default())
    }

    /// Reclaims the value; fails if other references to the `Arc` are still
    /// alive or if the lock is poisoned.
    fn unwrap(self) -> Result<U, PoisonedThreadError> {
        let rwlock =
            std::sync::Arc::try_unwrap(self).map_err(|_| PoisonedThreadError::default())?;
        rwlock
            .into_inner()
            .map_err(|_| PoisonedThreadError::default())
    }
}
216
/// A spawner that runs work items sharing one input of type `U`, taking the work
/// as a generic closure type `F`.
pub trait BatchSpawnable<
    ReturnValue: Send + 'static,
    ExtraInput: Send + 'static,
    Alloc: BrotliAlloc + Send + 'static,
    U: Send + 'static + Sync,
> where
    <Alloc as Allocator<u8>>::AllocatedMemory: Send + 'static,
{
    /// Handle used to retrieve the `ReturnValue` of one spawned work item.
    type JoinHandle: Joinable<ReturnValue, BrotliEncoderThreadError>;
    /// Handle through which the shared input can be viewed and finally reclaimed.
    type FinalJoinHandle: OwnedRetriever<U>;
    /// Builds the shared-input handle from `input`.
    // NOTE(review): presumably moves the value out of `input`, leaving it
    // `Borrowed` — confirm against the implementations.
    fn make_spawner(&mut self, input: &mut Owned<U>) -> Self::FinalJoinHandle;
    /// Starts work item `index` of `num_threads`.
    ///
    /// `f` takes, in order, the extra input, the index, the thread count, a
    /// reference to the shared input, and the allocator.
    // NOTE(review): presumably takes the allocator and extra input out of `alloc`
    // and stores the join handle there — confirm against the implementations.
    fn spawn<F: Fn(ExtraInput, usize, usize, &U, Alloc) -> ReturnValue + Send + 'static + Copy>(
        &mut self,
        handle: &mut Self::FinalJoinHandle,
        alloc: &mut SendAlloc<ReturnValue, ExtraInput, Alloc, Self::JoinHandle>,
        index: usize,
        num_threads: usize,
        f: F,
    );
}
245
/// Same shape as [`BatchSpawnable`], except that `spawn` takes the work as a plain
/// function pointer instead of a generic closure type.
pub trait BatchSpawnableLite<
    ReturnValue: Send + 'static,
    ExtraInput: Send + 'static,
    Alloc: BrotliAlloc + Send + 'static,
    U: Send + 'static + Sync,
> where
    <Alloc as Allocator<u8>>::AllocatedMemory: Send + 'static,
{
    /// Handle used to retrieve the `ReturnValue` of one spawned work item.
    type JoinHandle: Joinable<ReturnValue, BrotliEncoderThreadError>;
    /// Handle through which the shared input can be viewed and finally reclaimed.
    type FinalJoinHandle: OwnedRetriever<U>;
    /// Builds the shared-input handle from `input`.
    // NOTE(review): presumably moves the value out of `input`, leaving it
    // `Borrowed` — confirm against the implementations.
    fn make_spawner(&mut self, input: &mut Owned<U>) -> Self::FinalJoinHandle;
    /// Starts work item `index` of `num_threads`.
    ///
    /// `f` takes, in order, the extra input, the index, the thread count, a
    /// reference to the shared input, and the allocator. `CompressMulti` expects
    /// `alloc_per_thread` to hold a join handle once this returns.
    fn spawn(
        &mut self,
        handle: &mut Self::FinalJoinHandle,
        alloc_per_thread: &mut SendAlloc<ReturnValue, ExtraInput, Alloc, Self::JoinHandle>,
        index: usize,
        num_threads: usize,
        f: fn(ExtraInput, usize, usize, &U, Alloc) -> ReturnValue,
    );
}
266pub fn CompressMultiSlice<
285 Alloc: BrotliAlloc + Send + 'static,
286 Spawner: BatchSpawnableLite<
287 CompressionThreadResult<Alloc>,
288 UnionHasher<Alloc>,
289 Alloc,
290 (
291 <Alloc as Allocator<u8>>::AllocatedMemory,
292 BrotliEncoderParams,
293 ),
294 >,
295>(
296 params: &BrotliEncoderParams,
297 input_slice: &[u8],
298 output: &mut [u8],
299 alloc_per_thread: &mut [SendAlloc<
300 CompressionThreadResult<Alloc>,
301 UnionHasher<Alloc>,
302 Alloc,
303 Spawner::JoinHandle,
304 >],
305 thread_spawner: &mut Spawner,
306) -> Result<usize, BrotliEncoderThreadError>
307where
308 <Alloc as Allocator<u8>>::AllocatedMemory: Send + Sync,
309 <Alloc as Allocator<u16>>::AllocatedMemory: Send + Sync,
310 <Alloc as Allocator<u32>>::AllocatedMemory: Send + Sync,
311{
312 let input = if let InternalSendAlloc::A(ref mut alloc, ref _extra) = alloc_per_thread[0].0 {
313 let mut input = allocate::<u8, _>(alloc, input_slice.len());
314 input.slice_mut().clone_from_slice(input_slice);
315 input
316 } else {
317 alloc_default::<u8, Alloc>()
318 };
319 let mut owned_input = Owned::new(input);
320 let ret = CompressMulti(
321 params,
322 &mut owned_input,
323 output,
324 alloc_per_thread,
325 thread_spawner,
326 );
327 if let InternalSendAlloc::A(ref mut alloc, ref _extra) = alloc_per_thread[0].0 {
328 <Alloc as Allocator<u8>>::free_cell(alloc, owned_input.unwrap());
329 }
330 ret
331}
332
/// Returns the part of a `file_size`-byte input assigned to worker `thread_index`
/// out of `num_threads`.
///
/// The ranges of consecutive indices are contiguous, and together the indices
/// `0..num_threads` cover `0..file_size` exactly.
///
/// The products are computed in `u128`, so `thread_index * file_size` cannot
/// overflow `usize` (which it otherwise can for large inputs on 32-bit targets).
///
/// # Panics
/// Panics if `num_threads` is zero.
fn get_range(thread_index: usize, num_threads: usize, file_size: usize) -> Range<usize> {
    let index = thread_index as u128;
    let size = file_size as u128;
    let threads = num_threads as u128;
    // For `thread_index < num_threads` both bounds are at most `file_size`,
    // so narrowing back to `usize` is lossless.
    let start = (index * size / threads) as usize;
    let end = ((index + 1) * size / threads) as usize;
    start..end
}
336
337fn compress_part<Alloc: BrotliAlloc + Send + 'static, SliceW: SliceWrapper<u8>>(
338 hasher: UnionHasher<Alloc>,
339 thread_index: usize,
340 num_threads: usize,
341 input_and_params: &(SliceW, BrotliEncoderParams),
342 mut alloc: Alloc,
343) -> CompressionThreadResult<Alloc>
344where
345 <Alloc as Allocator<u8>>::AllocatedMemory: Send + 'static,
346{
347 let mut range = get_range(thread_index, num_threads, input_and_params.0.len());
348 let mut mem = allocate::<u8, _>(
349 &mut alloc,
350 BrotliEncoderMaxCompressedSize(range.end - range.start),
351 );
352 let mut state = BrotliEncoderStateStruct::new(alloc);
353 state.params = input_and_params.1.clone();
354 if thread_index != 0 {
355 state.params.catable = true; state.params.magic_number = false; }
358 state.params.appendable = true; if thread_index != 0 {
360 state.set_custom_dictionary_with_optional_precomputed_hasher(
361 range.start,
362 &input_and_params.0.slice()[..range.start],
363 hasher,
364 true,
365 );
366 }
367 let mut out_offset = 0usize;
368 let compression_result;
369 let mut available_out = mem.len();
370 loop {
371 let mut next_in_offset = 0usize;
372 let mut available_in = range.end - range.start;
373 let result = state.compress_stream(
374 BrotliEncoderOperation::BROTLI_OPERATION_FINISH,
375 &mut available_in,
376 &input_and_params.0.slice()[range.clone()],
377 &mut next_in_offset,
378 &mut available_out,
379 mem.slice_mut(),
380 &mut out_offset,
381 &mut None,
382 &mut |_a, _b, _c, _d| (),
383 );
384 let new_range = range.start + next_in_offset..range.end;
385 range = new_range;
386 if result {
387 compression_result = Ok(out_offset);
388 break;
389 } else if available_out == 0 {
390 compression_result = Err(BrotliEncoderThreadError::InsufficientOutputSpace); break;
392 }
393 }
394 BrotliEncoderDestroyInstance(&mut state);
395 match compression_result {
396 Ok(size) => CompressionThreadResult::<Alloc> {
397 compressed: Ok(CompressedFileChunk {
398 data_backing: mem,
399 data_size: size,
400 }),
401 alloc: state.m8,
402 },
403 Err(e) => {
404 <Alloc as Allocator<u8>>::free_cell(&mut state.m8, mem);
405 CompressionThreadResult::<Alloc> {
406 compressed: Err(e),
407 alloc: state.m8,
408 }
409 }
410 }
411}
412
/// Compresses `owned_input` with one worker per entry of `alloc_per_thread` and
/// concatenates the per-worker streams into `output`.
///
/// The input is moved out of `owned_input` (leaving it `Borrowed`) and bundled
/// with a copy of `params` for sharing; it is put back before returning whenever
/// the spawner hands it back. Chunks `0..num_threads - 1` are started through
/// `thread_spawner`; the last chunk is compressed on the calling thread.
///
/// When `params.favor_cpu_efficiency` is set and there is more than one worker,
/// a single hasher is fed the input of the preceding chunk before each worker is
/// started, and each worker receives a clone of it.
///
/// Returns the number of bytes written to `output`, or the first error seen.
/// Every slot whose result was retrieved is reset to hold its allocator again.
///
/// # Panics
/// Panics if `owned_input` is already `Borrowed`, if `alloc_per_thread` is
/// empty, or if a slot that should hold a join handle does not.
pub fn CompressMulti<
    Alloc: BrotliAlloc + Send + 'static,
    SliceW: SliceWrapper<u8> + Send + 'static + Sync,
    Spawner: BatchSpawnableLite<
        CompressionThreadResult<Alloc>,
        UnionHasher<Alloc>,
        Alloc,
        (SliceW, BrotliEncoderParams),
    >,
>(
    params: &BrotliEncoderParams,
    owned_input: &mut Owned<SliceW>,
    output: &mut [u8],
    alloc_per_thread: &mut [SendAlloc<
        CompressionThreadResult<Alloc>,
        UnionHasher<Alloc>,
        Alloc,
        Spawner::JoinHandle,
    >],
    thread_spawner: &mut Spawner,
) -> Result<usize, BrotliEncoderThreadError>
where
    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
    <Alloc as Allocator<u16>>::AllocatedMemory: Send,
    <Alloc as Allocator<u32>>::AllocatedMemory: Send,
{
    let num_threads = alloc_per_thread.len();
    // Take the input out of the caller's slot and pair it with the parameters so
    // both can be shared through the spawner.
    let actually_owned_mem = mem::replace(owned_input, Owned(InternalOwned::Borrowed));
    let mut owned_input_pair = Owned::new((actually_owned_mem.unwrap(), params.clone()));
    let mut spawner_and_input = thread_spawner.make_spawner(&mut owned_input_pair);
    // Chunk 0 never needs a primed hasher, so it is started right away.
    if num_threads > 1 {
        thread_spawner.spawn(
            &mut spawner_and_input,
            &mut alloc_per_thread[0],
            0,
            num_threads,
            compress_part,
        );
    }
    let mut compression_last_thread_result;
    if num_threads > 1 && params.favor_cpu_efficiency {
        let mut local_params = params.clone();
        SanitizeParams(&mut local_params);
        let mut hasher = UnionHasher::Uninit;
        // The shared hasher is set up with the allocator of the last slot, which
        // stays on this thread.
        hasher_setup(
            alloc_per_thread[num_threads - 1].0.unwrap_input().0,
            &mut hasher,
            &mut local_params,
            None, &[],
            0,
            0,
            false,
        );
        let mut setup_error = false;
        for thread_index in 1..num_threads {
            // Feed the hasher with the chunk that precedes `thread_index`.
            let res = spawner_and_input.view(|input_and_params: &(SliceW, BrotliEncoderParams)| {
                let range = get_range(thread_index - 1, num_threads, input_and_params.0.len());
                let overlap = hasher.StoreLookahead().wrapping_sub(1);
                if range.end - range.start > overlap {
                    hasher.BulkStoreRange(
                        input_and_params.0.slice(),
                        usize::MAX,
                        if range.start > overlap {
                            range.start - overlap
                        } else {
                            0
                        },
                        range.end - overlap,
                    );
                }
            });
            if let Err(_e) = res {
                setup_error = true;
                break;
            }
            // Every chunk but the last gets a clone of the hasher and its own
            // worker; the last chunk uses `hasher` itself further below.
            if thread_index + 1 != num_threads {
                {
                    let (alloc, out_hasher) = alloc_per_thread[thread_index].unwrap_view_mut();
                    *out_hasher = hasher.clone_with_alloc(alloc);
                }
                thread_spawner.spawn(
                    &mut spawner_and_input,
                    &mut alloc_per_thread[thread_index],
                    thread_index,
                    num_threads,
                    compress_part,
                );
            }
        }
        if setup_error {
            // The shared input could not be viewed: join the workers that were
            // already started, free their output, give the allocators back to
            // their slots, and bail out.
            let mut setup_result = Err(BrotliEncoderThreadError::OtherThreadPanic);
            for thread in alloc_per_thread.iter_mut() {
                match mem::replace(
                    &mut thread.0,
                    InternalSendAlloc::SpawningOrJoining(PhantomData),
                ) {
                    InternalSendAlloc::Join(join) => match join.join() {
                        Ok(mut thread_result) => {
                            if let Ok(compressed_out) = thread_result.compressed {
                                <Alloc as Allocator<u8>>::free_cell(
                                    &mut thread_result.alloc,
                                    compressed_out.data_backing,
                                );
                            }
                            thread.0 =
                                InternalSendAlloc::A(thread_result.alloc, UnionHasher::Uninit);
                        }
                        Err(join_error) => setup_result = Err(join_error),
                    },
                    // Slots that were never handed to a worker are put back unchanged.
                    other => thread.0 = other,
                }
            }
            if let Ok(retrieved_owned_input) = spawner_and_input.unwrap() {
                *owned_input = Owned::new(retrieved_owned_input.0);
            }
            return setup_result;
        }
        // Compress the last chunk on this thread with the fully primed hasher.
        let (alloc, _extra) = alloc_per_thread[num_threads - 1].replace_with_default();
        compression_last_thread_result = spawner_and_input.view(move |input_and_params:&(SliceW, BrotliEncoderParams)| -> CompressionThreadResult<Alloc> {
            compress_part(hasher,
                          num_threads - 1,
                          num_threads,
                          input_and_params,
                          alloc,
            )
        });
    } else {
        // No shared hasher: start the remaining workers without any priming.
        if num_threads > 1 {
            for thread_index in 1..num_threads - 1 {
                thread_spawner.spawn(
                    &mut spawner_and_input,
                    &mut alloc_per_thread[thread_index],
                    thread_index,
                    num_threads,
                    compress_part,
                );
            }
        }
        // Compress the last (or only) chunk on this thread.
        let (alloc, _extra) = alloc_per_thread[num_threads - 1].replace_with_default();
        compression_last_thread_result = spawner_and_input.view(move |input_and_params:&(SliceW, BrotliEncoderParams)| -> CompressionThreadResult<Alloc> {
            compress_part(UnionHasher::Uninit,
                          num_threads - 1,
                          num_threads,
                          input_and_params,
                          alloc,
            )
        });
    }
    let mut compression_result = Ok(0usize);
    // Only the first error is kept; later chunks are still collected so their
    // buffers are freed and their allocators returned.
    let mut pending_error = None;
    let mut out_file_size = 0usize;
    let mut bro_cat_li = BroCatli::new();
    for (index, thread) in alloc_per_thread.iter_mut().enumerate() {
        let cur_result = if index + 1 == num_threads {
            // The last chunk was compressed on this thread.
            match mem::replace(&mut compression_last_thread_result, Err(())) {
                Ok(result) => Some(result),
                Err(_err) => {
                    set_pending_error(
                        &mut pending_error,
                        BrotliEncoderThreadError::OtherThreadPanic,
                    );
                    None
                }
            }
        } else {
            // Every other slot must hold the join handle of its worker.
            match mem::replace(
                &mut thread.0,
                InternalSendAlloc::SpawningOrJoining(PhantomData),
            ) {
                InternalSendAlloc::A(_, _) | InternalSendAlloc::SpawningOrJoining(_) => {
                    panic!("Thread not properly spawned")
                }
                InternalSendAlloc::Join(join) => match join.join() {
                    Ok(result) => Some(result),
                    Err(err) => {
                        set_pending_error(&mut pending_error, err);
                        None
                    }
                },
            }
        };
        if let Some(mut cur_result) = cur_result {
            match cur_result.compressed {
                Ok(compressed_out) => {
                    // Once an error is pending the chunk is no longer appended,
                    // but its buffer is still freed below.
                    if pending_error.is_none() {
                        bro_cat_li.new_brotli_file();
                        let mut in_offset = 0usize;
                        let cat_result = bro_cat_li.stream(
                            &compressed_out.data_backing.slice()[..compressed_out.data_size],
                            &mut in_offset,
                            output,
                            &mut out_file_size,
                        );
                        match cat_result {
                            BroCatliResult::Success | BroCatliResult::NeedsMoreInput => {
                                compression_result = Ok(out_file_size);
                            }
                            BroCatliResult::NeedsMoreOutput => {
                                set_pending_error(
                                    &mut pending_error,
                                    BrotliEncoderThreadError::InsufficientOutputSpace,
                                );
                            }
                            err => {
                                set_pending_error(
                                    &mut pending_error,
                                    BrotliEncoderThreadError::ConcatenationError(err),
                                );
                            }
                        }
                    }
                    <Alloc as Allocator<u8>>::free_cell(
                        &mut cur_result.alloc,
                        compressed_out.data_backing,
                    );
                }
                Err(e) => {
                    set_pending_error(&mut pending_error, e);
                }
            }
            // Give the allocator back to its slot for reuse by the caller.
            thread.0 = InternalSendAlloc::A(cur_result.alloc, UnionHasher::Uninit);
        }
    }
    if let Some(error) = pending_error {
        compression_result = Err(error);
    }
    // The concatenated stream is only finished when every chunk was appended.
    if compression_result.is_ok() {
        match bro_cat_li.finish(output, &mut out_file_size) {
            BroCatliResult::Success => compression_result = Ok(out_file_size),
            err => {
                compression_result = Err(BrotliEncoderThreadError::ConcatenationFinalizationError(
                    err,
                ))
            }
        }
    }
    // Hand the input back to the caller; failing to reclaim it is reported as
    // an error unless an earlier error is already being returned.
    if let Ok(retrieved_owned_input) = spawner_and_input.unwrap() {
        *owned_input = Owned::new(retrieved_owned_input.0); } else if compression_result.is_ok() {
        compression_result = Err(BrotliEncoderThreadError::OtherThreadPanic);
    }
    compression_result
}
662
663mod test;