async_compression/generic/bufread/
encoder.rs

1use crate::{
2    codecs::EncodeV2,
3    core::util::{PartialBuffer, WriteBuffer},
4};
5use std::{io::Result, ops::ControlFlow};
6
/// Internal phase of the generic buffered-read encoder state machine.
#[derive(Debug)]
enum State {
    /// Feeding input to the encoder. The payload is a byte counter that
    /// `do_poll_read` compares against zero to decide whether a flush is
    /// needed when no input is available.
    Encoding(usize),
    /// Calling `flush` on the encoder until it reports completion.
    Flushing,
    /// Calling `finish` on the encoder until it reports completion; entered
    /// when an empty input buffer is supplied.
    Finishing,
    /// `finish` has completed; further calls return immediately with `Ok(())`.
    Done,
}
14
/// Encoding state machine driven through `do_poll_read`.
///
/// It stores only the current [`State`]; the codec, the input buffer and the
/// output buffer are all passed in by the caller on every call.
#[derive(Debug)]
pub struct Encoder {
    // Current phase of the state machine.
    state: State,
}
19
20impl Default for Encoder {
21    fn default() -> Self {
22        Self {
23            state: State::Encoding(0),
24        }
25    }
26}
27
28impl Encoder {
29    /// `input` - should be `None` if `Poll::Pending`.
30    pub fn do_poll_read(
31        &mut self,
32        output: &mut WriteBuffer<'_>,
33        encoder: &mut dyn EncodeV2,
34        mut input: Option<&mut PartialBuffer<&[u8]>>,
35    ) -> ControlFlow<Result<()>> {
36        loop {
37            self.state = match self.state {
38                State::Encoding(mut read) => match input.as_mut() {
39                    None => {
40                        if read == 0 {
41                            if output.written().is_empty() {
42                                // Poll for more data
43                                break;
44                            } else {
45                                return ControlFlow::Break(Ok(()));
46                            }
47                        } else {
48                            State::Flushing
49                        }
50                    }
51                    Some(input) => {
52                        if input.unwritten().is_empty() {
53                            State::Finishing
54                        } else {
55                            if let Err(err) = encoder.encode(input, output) {
56                                return ControlFlow::Break(Err(err));
57                            }
58
59                            read += input.written().len();
60
61                            // Poll for more data
62                            break;
63                        }
64                    }
65                },
66
67                State::Flushing => match encoder.flush(output) {
68                    Ok(true) => {
69                        self.state = State::Encoding(0);
70
71                        // Poll for more data
72                        break;
73                    }
74                    Ok(false) => State::Flushing,
75                    Err(err) => return ControlFlow::Break(Err(err)),
76                },
77
78                State::Finishing => match encoder.finish(output) {
79                    Ok(true) => State::Done,
80                    Ok(false) => State::Finishing,
81                    Err(err) => return ControlFlow::Break(Err(err)),
82                },
83
84                State::Done => return ControlFlow::Break(Ok(())),
85            };
86
87            if output.has_no_spare_space() {
88                return ControlFlow::Break(Ok(()));
89            }
90        }
91
92        if output.has_no_spare_space() {
93            ControlFlow::Break(Ok(()))
94        } else {
95            ControlFlow::Continue(())
96        }
97    }
98}
99
// Expands to a reader-side `Encoder<R, E>` built on the generic state machine.
//
// The expansion refers to `AsyncBufRead`, `EncodeV2`, `PartialBuffer`,
// `WriteBuffer`, `Pin`, `Context`, `Poll` and `Result` without importing them,
// so they are expected to be in scope where the macro is invoked.
macro_rules! impl_encoder {
    () => {
        use crate::generic::bufread::Encoder as GenericEncoder;

        use std::ops::ControlFlow;

        use futures_core::ready;
        use pin_project_lite::pin_project;

        pin_project! {
            // Wraps a buffered reader and a codec; `inner` tracks the
            // encoding state between polls.
            #[derive(Debug)]
            pub struct Encoder<R, E> {
                #[pin]
                reader: R,
                encoder: E,
                inner: GenericEncoder,
            }
        }

        impl<R: AsyncBufRead, E: EncodeV2> Encoder<R, E> {
            /// Creates an encoder reading from `reader` and encoding with
            /// `encoder`, starting from the default state.
            pub fn new(reader: R, encoder: E) -> Self {
                let inner = GenericEncoder::default();
                Self {
                    reader,
                    encoder,
                    inner,
                }
            }

            /// Same as [`Self::new`]; the capacity argument is ignored.
            pub fn with_capacity(reader: R, encoder: E, _cap: usize) -> Self {
                Self::new(reader, encoder)
            }
        }

        impl<R, E> Encoder<R, E> {
            /// Returns a shared reference to the underlying reader.
            pub fn get_ref(&self) -> &R {
                &self.reader
            }

            /// Returns a mutable reference to the underlying reader.
            pub fn get_mut(&mut self) -> &mut R {
                &mut self.reader
            }

            /// Returns a pinned mutable reference to the underlying reader.
            pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
                let projected = self.project();
                projected.reader
            }

            /// Returns a shared reference to the codec.
            pub(crate) fn get_encoder_ref(&self) -> &E {
                &self.encoder
            }

            /// Consumes the wrapper and returns the underlying reader.
            pub fn into_inner(self) -> R {
                self.reader
            }
        }

        // Type-erased driver: polls `reader` for input, feeds it through the
        // state machine and consumes whatever the codec accepted.
        fn do_poll_read(
            inner: &mut GenericEncoder,
            encoder: &mut dyn EncodeV2,
            mut reader: Pin<&mut dyn AsyncBufRead>,
            cx: &mut Context<'_>,
            output: &mut WriteBuffer<'_>,
        ) -> Poll<Result<()>> {
            // First step the state machine without input, before touching
            // the reader at all.
            match inner.do_poll_read(output, encoder, None) {
                ControlFlow::Break(res) => return Poll::Ready(res),
                ControlFlow::Continue(()) => {}
            }

            loop {
                // A pending reader is passed to the state machine as `None`.
                let mut input = match reader.as_mut().poll_fill_buf(cx) {
                    Poll::Ready(filled) => Some(PartialBuffer::new(filled?)),
                    Poll::Pending => None,
                };

                let step = inner.do_poll_read(output, encoder, input.as_mut());

                // Mark the bytes the codec accepted as consumed. The length is
                // read before `consume` so the borrow of the reader's buffer
                // has ended by then.
                let reader_pending = match input {
                    None => true,
                    Some(buf) => {
                        let consumed = buf.written().len();
                        reader.as_mut().consume(consumed);
                        false
                    }
                };

                match step {
                    ControlFlow::Break(res) => return Poll::Ready(res),
                    ControlFlow::Continue(()) if reader_pending => return Poll::Pending,
                    ControlFlow::Continue(()) => {}
                }
            }
        }

        impl<R: AsyncBufRead, E: EncodeV2> Encoder<R, E> {
            // Projects the pinned fields and delegates to the type-erased
            // driver above.
            fn do_poll_read(
                self: Pin<&mut Self>,
                cx: &mut Context<'_>,
                output: &mut WriteBuffer<'_>,
            ) -> Poll<Result<()>> {
                let fields = self.project();

                do_poll_read(fields.inner, fields.encoder, fields.reader, cx, output)
            }
        }
    };
}
203pub(crate) use impl_encoder;