async_compression/generic/bufread/
decoder.rs1use crate::{
2 codecs::DecodeV2,
3 core::util::{PartialBuffer, WriteBuffer},
4};
5
6use std::{io::Result, ops::ControlFlow};
7
8#[derive(Debug)]
9enum State {
10 Decoding,
11 Flushing,
12 Done,
13 Next,
14}
15
16#[derive(Debug)]
17pub struct Decoder {
18 state: State,
19 multiple_members: bool,
20}
21
22impl Default for Decoder {
23 fn default() -> Self {
24 Self {
25 state: State::Decoding,
26 multiple_members: false,
27 }
28 }
29}
30
31impl Decoder {
32 pub fn multiple_members(&mut self, enabled: bool) {
33 self.multiple_members = enabled;
34 }
35
36 pub fn do_poll_read(
37 &mut self,
38 output: &mut WriteBuffer<'_>,
39 decoder: &mut dyn DecodeV2,
40 input: &mut PartialBuffer<&[u8]>,
41 mut first: bool,
42 ) -> ControlFlow<Result<()>> {
43 loop {
44 self.state = match self.state {
45 State::Decoding => {
46 if input.unwritten().is_empty() && !first {
47 self.multiple_members = false;
50
51 State::Flushing
52 } else {
53 match decoder.decode(input, output) {
54 Ok(true) => State::Flushing,
55 Err(err) if !first => return ControlFlow::Break(Err(err)),
58 _ => break,
60 }
61 }
62 }
63
64 State::Flushing => {
65 match decoder.finish(output) {
66 Ok(true) => {
67 if self.multiple_members {
68 if let Err(err) = decoder.reinit() {
69 return ControlFlow::Break(Err(err));
70 }
71
72 first = true;
75 State::Next
76 } else {
77 State::Done
78 }
79 }
80 Ok(false) => State::Flushing,
81 Err(err) => return ControlFlow::Break(Err(err)),
82 }
83 }
84
85 State::Done => return ControlFlow::Break(Ok(())),
86
87 State::Next => {
88 if input.unwritten().is_empty() {
89 if first {
90 break;
92 }
93 State::Done
94 } else {
95 State::Decoding
96 }
97 }
98 };
99
100 if output.has_no_spare_space() {
101 return ControlFlow::Break(Ok(()));
102 }
103 }
104
105 if output.has_no_spare_space() {
106 ControlFlow::Break(Ok(()))
107 } else {
108 ControlFlow::Continue(())
109 }
110 }
111}
112
113macro_rules! impl_decoder {
114 () => {
115 use crate::generic::bufread::Decoder as GenericDecoder;
116
117 use std::{ops::ControlFlow, task::ready};
118
119 use pin_project_lite::pin_project;
120
121 pin_project! {
122 #[derive(Debug)]
123 pub struct Decoder<R, D> {
124 #[pin]
125 reader: R,
126 decoder: D,
127 inner: GenericDecoder,
128 }
129 }
130
131 impl<R: AsyncBufRead, D: DecodeV2> Decoder<R, D> {
132 pub fn new(reader: R, decoder: D) -> Self {
133 Self {
134 reader,
135 decoder,
136 inner: GenericDecoder::default(),
137 }
138 }
139 }
140
141 impl<R, D> Decoder<R, D> {
142 pub fn get_ref(&self) -> &R {
143 &self.reader
144 }
145
146 pub fn get_mut(&mut self) -> &mut R {
147 &mut self.reader
148 }
149
150 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
151 self.project().reader
152 }
153
154 pub fn into_inner(self) -> R {
155 self.reader
156 }
157
158 pub fn multiple_members(&mut self, enabled: bool) {
159 self.inner.multiple_members(enabled);
160 }
161 }
162
163 fn do_poll_read(
164 inner: &mut GenericDecoder,
165 decoder: &mut dyn DecodeV2,
166 mut reader: Pin<&mut dyn AsyncBufRead>,
167 cx: &mut Context<'_>,
168 output: &mut WriteBuffer<'_>,
169 ) -> Poll<Result<()>> {
170 if let ControlFlow::Break(res) =
171 inner.do_poll_read(output, decoder, &mut PartialBuffer::new(&[][..]), true)
172 {
173 return Poll::Ready(res);
174 }
175
176 loop {
177 let mut input = PartialBuffer::new(match reader.as_mut().poll_fill_buf(cx)? {
178 Poll::Ready(input) => input,
179 Poll::Pending if output.written().is_empty() => return Poll::Pending,
180 _ => return Poll::Ready(Ok(())),
181 });
182
183 let control_flow = inner.do_poll_read(output, decoder, &mut input, false);
184
185 let bytes_read = input.written().len();
186 reader.as_mut().consume(bytes_read);
187
188 if let ControlFlow::Break(res) = control_flow {
189 break Poll::Ready(res);
190 }
191 }
192 }
193
194 impl<R: AsyncBufRead, D: DecodeV2> Decoder<R, D> {
195 fn do_poll_read(
196 self: Pin<&mut Self>,
197 cx: &mut Context<'_>,
198 output: &mut WriteBuffer<'_>,
199 ) -> Poll<Result<()>> {
200 let this = self.project();
201
202 do_poll_read(this.inner, this.decoder, this.reader, cx, output)
203 }
204 }
205 };
206}
207pub(crate) use impl_decoder;