Skip to main content

brotli/enc/threading/
mod.rs

1use alloc::{Allocator, SliceWrapper, SliceWrapperMut};
2use core::marker::PhantomData;
3use core::ops::Range;
4use core::{any, mem};
5#[cfg(feature = "std")]
6use std;
7
8use super::backward_references::{AnyHasher, BrotliEncoderParams, CloneWithAlloc, UnionHasher};
9use super::encode::{
10    hasher_setup, BrotliEncoderDestroyInstance, BrotliEncoderMaxCompressedSize,
11    BrotliEncoderOperation, SanitizeParams,
12};
13use super::BrotliAlloc;
14use crate::concat::{BroCatli, BroCatliResult};
15use crate::enc::combined_alloc::{alloc_default, allocate};
16use crate::enc::encode::BrotliEncoderStateStruct;
17
18pub type PoisonedThreadError = ();
19
20#[cfg(feature = "std")]
21pub type LowLevelThreadError = std::boxed::Box<dyn any::Any + Send + 'static>;
22#[cfg(not(feature = "std"))]
23pub type LowLevelThreadError = ();
24
25pub trait AnyBoxConstructor {
26    fn new(data: LowLevelThreadError) -> Self;
27}
28
29pub trait Joinable<T: Send + 'static, U: Send + 'static>: Sized {
30    fn join(self) -> Result<T, U>;
31}
32#[derive(Debug)]
33pub enum BrotliEncoderThreadError {
34    InsufficientOutputSpace,
35    ConcatenationDidNotProcessFullFile,
36    ConcatenationError(BroCatliResult),
37    ConcatenationFinalizationError(BroCatliResult),
38    OtherThreadPanic,
39    ThreadExecError(LowLevelThreadError),
40}
41
42impl AnyBoxConstructor for BrotliEncoderThreadError {
43    fn new(data: LowLevelThreadError) -> Self {
44        BrotliEncoderThreadError::ThreadExecError(data)
45    }
46}
47
48fn set_pending_error(
49    pending_error: &mut Option<BrotliEncoderThreadError>,
50    error: BrotliEncoderThreadError,
51) {
52    if pending_error.is_none() {
53        *pending_error = Some(error);
54    }
55}
56
57pub struct CompressedFileChunk<Alloc: BrotliAlloc + Send + 'static>
58where
59    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
60{
61    data_backing: <Alloc as Allocator<u8>>::AllocatedMemory,
62    data_size: usize,
63}
64pub struct CompressionThreadResult<Alloc: BrotliAlloc + Send + 'static>
65where
66    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
67{
68    compressed: Result<CompressedFileChunk<Alloc>, BrotliEncoderThreadError>,
69    alloc: Alloc,
70}
71pub enum InternalSendAlloc<
72    ReturnVal: Send + 'static,
73    ExtraInput: Send + 'static,
74    Alloc: BrotliAlloc + Send + 'static,
75    Join: Joinable<ReturnVal, BrotliEncoderThreadError>,
76> where
77    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
78{
79    A(Alloc, ExtraInput),
80    Join(Join),
81    SpawningOrJoining(PhantomData<ReturnVal>),
82}
83impl<
84        ReturnVal: Send + 'static,
85        ExtraInput: Send + 'static,
86        Alloc: BrotliAlloc + Send + 'static,
87        Join: Joinable<ReturnVal, BrotliEncoderThreadError>,
88    > InternalSendAlloc<ReturnVal, ExtraInput, Alloc, Join>
89where
90    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
91{
92    fn unwrap_input(&mut self) -> (&mut Alloc, &mut ExtraInput) {
93        match *self {
94            InternalSendAlloc::A(ref mut alloc, ref mut extra) => (alloc, extra),
95            _ => panic!("Bad state for allocator"),
96        }
97    }
98}
99
100pub struct SendAlloc<
101    ReturnValue: Send + 'static,
102    ExtraInput: Send + 'static,
103    Alloc: BrotliAlloc + Send + 'static,
104    Join: Joinable<ReturnValue, BrotliEncoderThreadError>,
105>(pub InternalSendAlloc<ReturnValue, ExtraInput, Alloc, Join>)
106//FIXME pub
107where
108    <Alloc as Allocator<u8>>::AllocatedMemory: Send;
109
110impl<
111        ReturnValue: Send + 'static,
112        ExtraInput: Send + 'static,
113        Alloc: BrotliAlloc + Send + 'static,
114        Join: Joinable<ReturnValue, BrotliEncoderThreadError>,
115    > SendAlloc<ReturnValue, ExtraInput, Alloc, Join>
116where
117    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
118{
119    pub fn new(alloc: Alloc, extra_input: ExtraInput) -> Self {
120        SendAlloc::<ReturnValue, ExtraInput, Alloc, Join>(InternalSendAlloc::A(alloc, extra_input))
121    }
122    pub fn unwrap_or(self, other: Alloc, other_extra: ExtraInput) -> (Alloc, ExtraInput) {
123        match self.0 {
124            InternalSendAlloc::A(alloc, extra_input) => (alloc, extra_input),
125            InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
126                (other, other_extra)
127            }
128        }
129    }
130    fn unwrap_view_mut(&mut self) -> (&mut Alloc, &mut ExtraInput) {
131        match self.0 {
132            InternalSendAlloc::A(ref mut alloc, ref mut extra_input) => (alloc, extra_input),
133            InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
134                panic!("Item permanently borrowed/leaked")
135            }
136        }
137    }
138    pub fn unwrap(self) -> (Alloc, ExtraInput) {
139        match self.0 {
140            InternalSendAlloc::A(alloc, extra_input) => (alloc, extra_input),
141            InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
142                panic!("Item permanently borrowed/leaked")
143            }
144        }
145    }
146    pub fn replace_with_default(&mut self) -> (Alloc, ExtraInput) {
147        match mem::replace(
148            &mut self.0,
149            InternalSendAlloc::SpawningOrJoining(PhantomData),
150        ) {
151            InternalSendAlloc::A(alloc, extra_input) => (alloc, extra_input),
152            InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
153                panic!("Item permanently borrowed/leaked")
154            }
155        }
156    }
157}
158
159pub enum InternalOwned<T> {
160    // FIXME pub
161    Item(T),
162    Borrowed,
163}
164
165pub struct Owned<T>(pub InternalOwned<T>); // FIXME pub
166impl<T> Owned<T> {
167    pub fn new(data: T) -> Self {
168        Owned::<T>(InternalOwned::Item(data))
169    }
170    pub fn unwrap_or(self, other: T) -> T {
171        if let InternalOwned::Item(x) = self.0 {
172            x
173        } else {
174            other
175        }
176    }
177    pub fn unwrap(self) -> T {
178        if let InternalOwned::Item(x) = self.0 {
179            x
180        } else {
181            panic!("Item permanently borrowed")
182        }
183    }
184    pub fn view(&self) -> &T {
185        if let InternalOwned::Item(ref x) = self.0 {
186            x
187        } else {
188            panic!("Item permanently borrowed")
189        }
190    }
191}
192
193pub trait OwnedRetriever<U: Send + 'static> {
194    fn view<T, F: FnOnce(&U) -> T>(&self, f: F) -> Result<T, PoisonedThreadError>;
195    fn unwrap(self) -> Result<U, PoisonedThreadError>;
196}
197
198#[cfg(feature = "std")]
199impl<U: Send + 'static> OwnedRetriever<U> for std::sync::Arc<std::sync::RwLock<U>> {
200    fn view<T, F: FnOnce(&U) -> T>(&self, f: F) -> Result<T, PoisonedThreadError> {
201        match self.read() {
202            Ok(ref u) => Ok(f(u)),
203            Err(_) => Err(PoisonedThreadError::default()),
204        }
205    }
206    fn unwrap(self) -> Result<U, PoisonedThreadError> {
207        match std::sync::Arc::try_unwrap(self) {
208            Ok(rwlock) => match rwlock.into_inner() {
209                Ok(u) => Ok(u),
210                Err(_) => Err(PoisonedThreadError::default()),
211            },
212            Err(_) => Err(PoisonedThreadError::default()),
213        }
214    }
215}
216
217pub trait BatchSpawnable<
218    ReturnValue: Send + 'static,
219    ExtraInput: Send + 'static,
220    Alloc: BrotliAlloc + Send + 'static,
221    U: Send + 'static + Sync,
222> where
223    <Alloc as Allocator<u8>>::AllocatedMemory: Send + 'static,
224{
225    type JoinHandle: Joinable<ReturnValue, BrotliEncoderThreadError>;
226    type FinalJoinHandle: OwnedRetriever<U>;
227    // this function takes in an input slice
228    // a SendAlloc per thread and converts them all into JoinHandle
229    // the input is borrowed until the joins complete
230    // owned is set to borrowed
231    // the final join handle is a r/w lock which will return the SliceW to the owner
232    // the FinalJoinHandle is only to be called when each individual JoinHandle has been examined
233    // the function is called with the thread_index, the num_threads, a reference to the slice under a read lock,
234    // and an allocator from the alloc_per_thread
235    fn make_spawner(&mut self, input: &mut Owned<U>) -> Self::FinalJoinHandle;
236    fn spawn<F: Fn(ExtraInput, usize, usize, &U, Alloc) -> ReturnValue + Send + 'static + Copy>(
237        &mut self,
238        handle: &mut Self::FinalJoinHandle,
239        alloc: &mut SendAlloc<ReturnValue, ExtraInput, Alloc, Self::JoinHandle>,
240        index: usize,
241        num_threads: usize,
242        f: F,
243    );
244}
245
246pub trait BatchSpawnableLite<
247    ReturnValue: Send + 'static,
248    ExtraInput: Send + 'static,
249    Alloc: BrotliAlloc + Send + 'static,
250    U: Send + 'static + Sync,
251> where
252    <Alloc as Allocator<u8>>::AllocatedMemory: Send + 'static,
253{
254    type JoinHandle: Joinable<ReturnValue, BrotliEncoderThreadError>;
255    type FinalJoinHandle: OwnedRetriever<U>;
256    fn make_spawner(&mut self, input: &mut Owned<U>) -> Self::FinalJoinHandle;
257    fn spawn(
258        &mut self,
259        handle: &mut Self::FinalJoinHandle,
260        alloc_per_thread: &mut SendAlloc<ReturnValue, ExtraInput, Alloc, Self::JoinHandle>,
261        index: usize,
262        num_threads: usize,
263        f: fn(ExtraInput, usize, usize, &U, Alloc) -> ReturnValue,
264    );
265}
266/*
267impl<ReturnValue:Send+'static,
268     ExtraInput:Send+'static,
269     Alloc:BrotliAlloc+Send+'static,
270     U:Send+'static+Sync>
271     BatchSpawnableLite<T, Alloc, U> for BatchSpawnable<T, Alloc, U> {
272  type JoinHandle = <Self as BatchSpawnable<T, Alloc, U>>::JoinHandle;
273  type FinalJoinHandle = <Self as BatchSpawnable<T, Alloc, U>>::FinalJoinHandle;
274  fn batch_spawn(
275    &mut self,
276    input: &mut Owned<U>,
277    alloc_per_thread:&mut [SendAlloc<ReturnValue, ExtraInput, Alloc, Self::JoinHandle>],
278    f: fn(usize, usize, &U, Alloc) -> T,
279  ) -> Self::FinalJoinHandle {
280   <Self as BatchSpawnable<ReturnValue, ExtraInput,  Alloc, U>>::batch_spawn(self, input, alloc_per_thread, f)
281  }
282}*/
283
284pub fn CompressMultiSlice<
285    Alloc: BrotliAlloc + Send + 'static,
286    Spawner: BatchSpawnableLite<
287        CompressionThreadResult<Alloc>,
288        UnionHasher<Alloc>,
289        Alloc,
290        (
291            <Alloc as Allocator<u8>>::AllocatedMemory,
292            BrotliEncoderParams,
293        ),
294    >,
295>(
296    params: &BrotliEncoderParams,
297    input_slice: &[u8],
298    output: &mut [u8],
299    alloc_per_thread: &mut [SendAlloc<
300        CompressionThreadResult<Alloc>,
301        UnionHasher<Alloc>,
302        Alloc,
303        Spawner::JoinHandle,
304    >],
305    thread_spawner: &mut Spawner,
306) -> Result<usize, BrotliEncoderThreadError>
307where
308    <Alloc as Allocator<u8>>::AllocatedMemory: Send + Sync,
309    <Alloc as Allocator<u16>>::AllocatedMemory: Send + Sync,
310    <Alloc as Allocator<u32>>::AllocatedMemory: Send + Sync,
311{
312    let input = if let InternalSendAlloc::A(ref mut alloc, ref _extra) = alloc_per_thread[0].0 {
313        let mut input = allocate::<u8, _>(alloc, input_slice.len());
314        input.slice_mut().clone_from_slice(input_slice);
315        input
316    } else {
317        alloc_default::<u8, Alloc>()
318    };
319    let mut owned_input = Owned::new(input);
320    let ret = CompressMulti(
321        params,
322        &mut owned_input,
323        output,
324        alloc_per_thread,
325        thread_spawner,
326    );
327    if let InternalSendAlloc::A(ref mut alloc, ref _extra) = alloc_per_thread[0].0 {
328        <Alloc as Allocator<u8>>::free_cell(alloc, owned_input.unwrap());
329    }
330    ret
331}
332
333fn get_range(thread_index: usize, num_threads: usize, file_size: usize) -> Range<usize> {
334    ((thread_index * file_size) / num_threads)..(((thread_index + 1) * file_size) / num_threads)
335}
336
337fn compress_part<Alloc: BrotliAlloc + Send + 'static, SliceW: SliceWrapper<u8>>(
338    hasher: UnionHasher<Alloc>,
339    thread_index: usize,
340    num_threads: usize,
341    input_and_params: &(SliceW, BrotliEncoderParams),
342    mut alloc: Alloc,
343) -> CompressionThreadResult<Alloc>
344where
345    <Alloc as Allocator<u8>>::AllocatedMemory: Send + 'static,
346{
347    let mut range = get_range(thread_index, num_threads, input_and_params.0.len());
348    let mut mem = allocate::<u8, _>(
349        &mut alloc,
350        BrotliEncoderMaxCompressedSize(range.end - range.start),
351    );
352    let mut state = BrotliEncoderStateStruct::new(alloc);
353    state.params = input_and_params.1.clone();
354    if thread_index != 0 {
355        state.params.catable = true; // make sure we can concatenate this to the other work results
356        state.params.magic_number = false; // no reason to pepper this around
357    }
358    state.params.appendable = true; // make sure we are at least appendable, so that future items can be catted in
359    if thread_index != 0 {
360        state.set_custom_dictionary_with_optional_precomputed_hasher(
361            range.start,
362            &input_and_params.0.slice()[..range.start],
363            hasher,
364            true,
365        );
366    }
367    let mut out_offset = 0usize;
368    let compression_result;
369    let mut available_out = mem.len();
370    loop {
371        let mut next_in_offset = 0usize;
372        let mut available_in = range.end - range.start;
373        let result = state.compress_stream(
374            BrotliEncoderOperation::BROTLI_OPERATION_FINISH,
375            &mut available_in,
376            &input_and_params.0.slice()[range.clone()],
377            &mut next_in_offset,
378            &mut available_out,
379            mem.slice_mut(),
380            &mut out_offset,
381            &mut None,
382            &mut |_a, _b, _c, _d| (),
383        );
384        let new_range = range.start + next_in_offset..range.end;
385        range = new_range;
386        if result {
387            compression_result = Ok(out_offset);
388            break;
389        } else if available_out == 0 {
390            compression_result = Err(BrotliEncoderThreadError::InsufficientOutputSpace); // mark no space??
391            break;
392        }
393    }
394    BrotliEncoderDestroyInstance(&mut state);
395    match compression_result {
396        Ok(size) => CompressionThreadResult::<Alloc> {
397            compressed: Ok(CompressedFileChunk {
398                data_backing: mem,
399                data_size: size,
400            }),
401            alloc: state.m8,
402        },
403        Err(e) => {
404            <Alloc as Allocator<u8>>::free_cell(&mut state.m8, mem);
405            CompressionThreadResult::<Alloc> {
406                compressed: Err(e),
407                alloc: state.m8,
408            }
409        }
410    }
411}
412
413pub fn CompressMulti<
414    Alloc: BrotliAlloc + Send + 'static,
415    SliceW: SliceWrapper<u8> + Send + 'static + Sync,
416    Spawner: BatchSpawnableLite<
417        CompressionThreadResult<Alloc>,
418        UnionHasher<Alloc>,
419        Alloc,
420        (SliceW, BrotliEncoderParams),
421    >,
422>(
423    params: &BrotliEncoderParams,
424    owned_input: &mut Owned<SliceW>,
425    output: &mut [u8],
426    alloc_per_thread: &mut [SendAlloc<
427        CompressionThreadResult<Alloc>,
428        UnionHasher<Alloc>,
429        Alloc,
430        Spawner::JoinHandle,
431    >],
432    thread_spawner: &mut Spawner,
433) -> Result<usize, BrotliEncoderThreadError>
434where
435    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
436    <Alloc as Allocator<u16>>::AllocatedMemory: Send,
437    <Alloc as Allocator<u32>>::AllocatedMemory: Send,
438{
439    let num_threads = alloc_per_thread.len();
440    let actually_owned_mem = mem::replace(owned_input, Owned(InternalOwned::Borrowed));
441    let mut owned_input_pair = Owned::new((actually_owned_mem.unwrap(), params.clone()));
442    // start thread spawner
443    let mut spawner_and_input = thread_spawner.make_spawner(&mut owned_input_pair);
444    if num_threads > 1 {
445        // spawn first thread without "custom dictionary" while we compute the custom dictionary for other work items
446        thread_spawner.spawn(
447            &mut spawner_and_input,
448            &mut alloc_per_thread[0],
449            0,
450            num_threads,
451            compress_part,
452        );
453    }
454    // populate all hashers at once, cloning them one by one
455    let mut compression_last_thread_result;
456    if num_threads > 1 && params.favor_cpu_efficiency {
457        let mut local_params = params.clone();
458        SanitizeParams(&mut local_params);
459        let mut hasher = UnionHasher::Uninit;
460        hasher_setup(
461            alloc_per_thread[num_threads - 1].0.unwrap_input().0,
462            &mut hasher,
463            &mut local_params,
464            None, // No unwrappable custom dict used here.
465            &[],
466            0,
467            0,
468            false,
469        );
470        let mut setup_error = false;
471        for thread_index in 1..num_threads {
472            let res = spawner_and_input.view(|input_and_params: &(SliceW, BrotliEncoderParams)| {
473                let range = get_range(thread_index - 1, num_threads, input_and_params.0.len());
474                let overlap = hasher.StoreLookahead().wrapping_sub(1);
475                if range.end - range.start > overlap {
476                    hasher.BulkStoreRange(
477                        input_and_params.0.slice(),
478                        usize::MAX,
479                        if range.start > overlap {
480                            range.start - overlap
481                        } else {
482                            0
483                        },
484                        range.end - overlap,
485                    );
486                }
487            });
488            if let Err(_e) = res {
489                setup_error = true;
490                break;
491            }
492            if thread_index + 1 != num_threads {
493                {
494                    let (alloc, out_hasher) = alloc_per_thread[thread_index].unwrap_view_mut();
495                    *out_hasher = hasher.clone_with_alloc(alloc);
496                }
497                thread_spawner.spawn(
498                    &mut spawner_and_input,
499                    &mut alloc_per_thread[thread_index],
500                    thread_index,
501                    num_threads,
502                    compress_part,
503                );
504            }
505        }
506        if setup_error {
507            let mut setup_result = Err(BrotliEncoderThreadError::OtherThreadPanic);
508            for thread in alloc_per_thread.iter_mut() {
509                match mem::replace(
510                    &mut thread.0,
511                    InternalSendAlloc::SpawningOrJoining(PhantomData),
512                ) {
513                    InternalSendAlloc::Join(join) => match join.join() {
514                        Ok(mut thread_result) => {
515                            if let Ok(compressed_out) = thread_result.compressed {
516                                <Alloc as Allocator<u8>>::free_cell(
517                                    &mut thread_result.alloc,
518                                    compressed_out.data_backing,
519                                );
520                            }
521                            thread.0 =
522                                InternalSendAlloc::A(thread_result.alloc, UnionHasher::Uninit);
523                        }
524                        Err(join_error) => setup_result = Err(join_error),
525                    },
526                    other => thread.0 = other,
527                }
528            }
529            if let Ok(retrieved_owned_input) = spawner_and_input.unwrap() {
530                *owned_input = Owned::new(retrieved_owned_input.0);
531            }
532            return setup_result;
533        }
534        let (alloc, _extra) = alloc_per_thread[num_threads - 1].replace_with_default();
535        compression_last_thread_result = spawner_and_input.view(move |input_and_params:&(SliceW, BrotliEncoderParams)| -> CompressionThreadResult<Alloc> {
536        compress_part(hasher,
537                      num_threads - 1,
538                      num_threads,
539                      input_and_params,
540                      alloc,
541        )
542      });
543    } else {
544        if num_threads > 1 {
545            for thread_index in 1..num_threads - 1 {
546                thread_spawner.spawn(
547                    &mut spawner_and_input,
548                    &mut alloc_per_thread[thread_index],
549                    thread_index,
550                    num_threads,
551                    compress_part,
552                );
553            }
554        }
555        let (alloc, _extra) = alloc_per_thread[num_threads - 1].replace_with_default();
556        compression_last_thread_result = spawner_and_input.view(move |input_and_params:&(SliceW, BrotliEncoderParams)| -> CompressionThreadResult<Alloc> {
557        compress_part(UnionHasher::Uninit,
558                      num_threads - 1,
559                      num_threads,
560                      input_and_params,
561                      alloc,
562        )
563      });
564    }
565    let mut compression_result = Ok(0usize);
566    let mut pending_error = None;
567    let mut out_file_size = 0usize;
568    let mut bro_cat_li = BroCatli::new();
569    for (index, thread) in alloc_per_thread.iter_mut().enumerate() {
570        let cur_result = if index + 1 == num_threads {
571            match mem::replace(&mut compression_last_thread_result, Err(())) {
572                Ok(result) => Some(result),
573                Err(_err) => {
574                    set_pending_error(
575                        &mut pending_error,
576                        BrotliEncoderThreadError::OtherThreadPanic,
577                    );
578                    None
579                }
580            }
581        } else {
582            match mem::replace(
583                &mut thread.0,
584                InternalSendAlloc::SpawningOrJoining(PhantomData),
585            ) {
586                InternalSendAlloc::A(_, _) | InternalSendAlloc::SpawningOrJoining(_) => {
587                    panic!("Thread not properly spawned")
588                }
589                InternalSendAlloc::Join(join) => match join.join() {
590                    Ok(result) => Some(result),
591                    Err(err) => {
592                        set_pending_error(&mut pending_error, err);
593                        None
594                    }
595                },
596            }
597        };
598        if let Some(mut cur_result) = cur_result {
599            match cur_result.compressed {
600                Ok(compressed_out) => {
601                    if pending_error.is_none() {
602                        bro_cat_li.new_brotli_file();
603                        let mut in_offset = 0usize;
604                        let cat_result = bro_cat_li.stream(
605                            &compressed_out.data_backing.slice()[..compressed_out.data_size],
606                            &mut in_offset,
607                            output,
608                            &mut out_file_size,
609                        );
610                        match cat_result {
611                            BroCatliResult::Success | BroCatliResult::NeedsMoreInput => {
612                                compression_result = Ok(out_file_size);
613                            }
614                            BroCatliResult::NeedsMoreOutput => {
615                                set_pending_error(
616                                    &mut pending_error,
617                                    BrotliEncoderThreadError::InsufficientOutputSpace,
618                                );
619                                // not enough space
620                            }
621                            err => {
622                                set_pending_error(
623                                    &mut pending_error,
624                                    BrotliEncoderThreadError::ConcatenationError(err),
625                                );
626                                // misc error
627                            }
628                        }
629                    }
630                    <Alloc as Allocator<u8>>::free_cell(
631                        &mut cur_result.alloc,
632                        compressed_out.data_backing,
633                    );
634                }
635                Err(e) => {
636                    set_pending_error(&mut pending_error, e);
637                }
638            }
639            thread.0 = InternalSendAlloc::A(cur_result.alloc, UnionHasher::Uninit);
640        }
641    }
642    if let Some(error) = pending_error {
643        compression_result = Err(error);
644    }
645    if compression_result.is_ok() {
646        match bro_cat_li.finish(output, &mut out_file_size) {
647            BroCatliResult::Success => compression_result = Ok(out_file_size),
648            err => {
649                compression_result = Err(BrotliEncoderThreadError::ConcatenationFinalizationError(
650                    err,
651                ))
652            }
653        }
654    }
655    if let Ok(retrieved_owned_input) = spawner_and_input.unwrap() {
656        *owned_input = Owned::new(retrieved_owned_input.0); // return the input to its rightful owner before returning
657    } else if compression_result.is_ok() {
658        compression_result = Err(BrotliEncoderThreadError::OtherThreadPanic);
659    }
660    compression_result
661}
662
663mod test;