async_compression/generic/bufread/
encoder.rs

1use crate::{
2    codecs::EncodeV2,
3    core::util::{PartialBuffer, WriteBuffer},
4};
5use std::{io::Result, ops::ControlFlow};
6
/// Phase of the encoding state machine driven by `Encoder::do_poll_read`.
#[derive(Debug)]
enum State {
    /// Input is being fed to the encoder.
    ///
    /// The counter accumulates `input.written().len()` after each `encode`
    /// call and starts again at `0` once a flush has completed. A non-zero
    /// counter with no input available triggers a move to `Flushing`.
    Encoding(usize),

    /// `flush` is being called on the encoder until it reports `Ok(true)`.
    Flushing,

    /// `finish` is being called on the encoder until it reports `Ok(true)`.
    /// Entered when the supplied input has no unwritten bytes left.
    Finishing,

    /// The encoder has finished; every further call returns immediately.
    Done,
}
14
15#[derive(Debug)]
16pub struct Encoder {
17    state: State,
18}
19
20impl Default for Encoder {
21    fn default() -> Self {
22        Self {
23            state: State::Encoding(0),
24        }
25    }
26}
27
28impl Encoder {
29    /// `input` - should be `None` if `Poll::Pending`.
30    pub fn do_poll_read(
31        &mut self,
32        output: &mut WriteBuffer<'_>,
33        encoder: &mut dyn EncodeV2,
34        mut input: Option<&mut PartialBuffer<&[u8]>>,
35    ) -> ControlFlow<Result<()>> {
36        loop {
37            self.state = match &mut self.state {
38                State::Encoding(read) => match input.as_mut() {
39                    None => {
40                        if *read == 0 {
41                            if output.written().is_empty() {
42                                // Poll for more data
43                                break;
44                            } else {
45                                return ControlFlow::Break(Ok(()));
46                            }
47                        } else {
48                            State::Flushing
49                        }
50                    }
51                    Some(input) => {
52                        if input.unwritten().is_empty() {
53                            State::Finishing
54                        } else {
55                            if let Err(err) = encoder.encode(input, output) {
56                                return ControlFlow::Break(Err(err));
57                            }
58
59                            *read += input.written().len();
60
61                            // Poll for more data
62                            break;
63                        }
64                    }
65                },
66
67                State::Flushing => match encoder.flush(output) {
68                    Ok(true) => {
69                        self.state = State::Encoding(0);
70
71                        // Poll for more data
72                        break;
73                    }
74                    Ok(false) => State::Flushing,
75                    Err(err) => return ControlFlow::Break(Err(err)),
76                },
77
78                State::Finishing => match encoder.finish(output) {
79                    Ok(true) => State::Done,
80                    Ok(false) => State::Finishing,
81                    Err(err) => return ControlFlow::Break(Err(err)),
82                },
83
84                State::Done => return ControlFlow::Break(Ok(())),
85            };
86
87            if output.has_no_spare_space() {
88                return ControlFlow::Break(Ok(()));
89            }
90        }
91
92        if output.has_no_spare_space() {
93            ControlFlow::Break(Ok(()))
94        } else {
95            ControlFlow::Continue(())
96        }
97    }
98}
99
/// Expands to a buffered-reader `Encoder<R, E>` built on top of the generic
/// state machine in `crate::generic::bufread::Encoder`.
///
/// The expansion site is expected to have `AsyncBufRead`, `EncodeV2`,
/// `PartialBuffer`, `WriteBuffer`, `Pin`, `Context`, `Poll` and `Result` in
/// scope; the macro itself only imports the names listed at its top.
macro_rules! impl_encoder {
    () => {
        use crate::generic::bufread::Encoder as GenericEncoder;

        use std::{ops::ControlFlow, task::ready};

        use pin_project_lite::pin_project;

        pin_project! {
            /// Wraps a buffered reader `R` and a codec `E`, together with the
            /// generic state machine that sequences encode/flush/finish.
            #[derive(Debug)]
            pub struct Encoder<R, E> {
                // Source of the bytes to encode; structurally pinned.
                #[pin]
                reader: R,
                // Codec that is handed to the state machine on every poll.
                encoder: E,
                // Runtime-agnostic state machine.
                inner: GenericEncoder,
            }
        }

        impl<R: AsyncBufRead, E: EncodeV2> Encoder<R, E> {
            /// Creates an encoder reading from `reader` and encoding with
            /// `encoder`, starting from a default state machine.
            pub fn new(reader: R, encoder: E) -> Self {
                let inner = GenericEncoder::default();
                Self {
                    reader,
                    encoder,
                    inner,
                }
            }

            /// Same as [`Self::new`]; the `_cap` argument is ignored.
            pub fn with_capacity(reader: R, encoder: E, _cap: usize) -> Self {
                Self::new(reader, encoder)
            }
        }

        impl<R, E> Encoder<R, E> {
            /// Shared reference to the underlying reader.
            pub fn get_ref(&self) -> &R {
                let Self { reader, .. } = self;
                reader
            }

            /// Mutable reference to the underlying reader.
            pub fn get_mut(&mut self) -> &mut R {
                let Self { reader, .. } = self;
                reader
            }

            /// Pinned mutable reference to the underlying reader.
            pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
                let projected = self.project();
                projected.reader
            }

            /// Shared reference to the codec.
            pub(crate) fn get_encoder_ref(&self) -> &E {
                let Self { encoder, .. } = self;
                encoder
            }

            /// Consumes the wrapper and returns the underlying reader.
            pub fn into_inner(self) -> R {
                self.reader
            }
        }

        /// Non-generic core of the poll loop.
        ///
        /// First steps the state machine without input, then repeatedly
        /// polls `reader` for a buffer, hands it to the state machine and
        /// consumes from `reader` whatever `written()` reports.
        ///
        /// Returns `Poll::Pending` only when the reader is pending and
        /// `output` is still empty; a reader error is propagated as
        /// `Poll::Ready(Err(_))`.
        fn do_poll_read(
            inner: &mut GenericEncoder,
            encoder: &mut dyn EncodeV2,
            mut reader: Pin<&mut dyn AsyncBufRead>,
            cx: &mut Context<'_>,
            output: &mut WriteBuffer<'_>,
        ) -> Poll<Result<()>> {
            // Step the state machine once before touching the reader.
            match inner.do_poll_read(output, encoder, None) {
                ControlFlow::Break(outcome) => return Poll::Ready(outcome),
                ControlFlow::Continue(()) => {}
            }

            loop {
                let polled = reader.as_mut().poll_fill_buf(cx);
                let mut chunk = match polled {
                    Poll::Ready(filled) => Some(PartialBuffer::new(filled?)),
                    Poll::Pending => None,
                };

                let step = inner.do_poll_read(output, encoder, chunk.as_mut());

                // `None` means the reader was pending; otherwise the number
                // of bytes reported by `written()`, which must be consumed
                // from the reader even when the state machine broke out.
                let consumed = chunk.map(|buf| buf.written().len());
                if let Some(len) = consumed {
                    reader.as_mut().consume(len);
                }

                if let ControlFlow::Break(outcome) = step {
                    return Poll::Ready(outcome);
                }

                if consumed.is_none() {
                    return if output.written().is_empty() {
                        Poll::Pending
                    } else {
                        Poll::Ready(Ok(()))
                    };
                }
            }
        }

        impl<R: AsyncBufRead, E: EncodeV2> Encoder<R, E> {
            /// Projects the pinned fields and delegates to the free
            /// `do_poll_read` function above.
            fn do_poll_read(
                self: Pin<&mut Self>,
                cx: &mut Context<'_>,
                output: &mut WriteBuffer<'_>,
            ) -> Poll<Result<()>> {
                let projected = self.project();

                do_poll_read(
                    projected.inner,
                    projected.encoder,
                    projected.reader,
                    cx,
                    output,
                )
            }
        }
    };
}
206pub(crate) use impl_encoder;