async_compression/generic/bufread/
encoder.rs1use crate::{
2 codecs::EncodeV2,
3 core::util::{PartialBuffer, WriteBuffer},
4};
5use std::{io::Result, ops::ControlFlow};
6
/// Phases of the generic encoder's state machine.
#[derive(Debug)]
enum State {
    /// Input is being fed to the encoder. The payload is a running total of
    /// the input bytes taken since it was last reset to `0`; the state
    /// machine only ever compares it against zero.
    Encoding(usize),
    /// The encoder's `flush` is being driven until it reports completion.
    Flushing,
    /// The encoder's `finish` is being driven until it reports completion.
    Finishing,
    /// Finishing has completed; no further output will be produced.
    Done,
}
14
15#[derive(Debug)]
16pub struct Encoder {
17 state: State,
18}
19
20impl Default for Encoder {
21 fn default() -> Self {
22 Self {
23 state: State::Encoding(0),
24 }
25 }
26}
27
28impl Encoder {
29 pub fn do_poll_read(
31 &mut self,
32 output: &mut WriteBuffer<'_>,
33 encoder: &mut dyn EncodeV2,
34 mut input: Option<&mut PartialBuffer<&[u8]>>,
35 ) -> ControlFlow<Result<()>> {
36 loop {
37 self.state = match &mut self.state {
38 State::Encoding(read) => match input.as_mut() {
39 None => {
40 if *read == 0 {
41 if output.written().is_empty() {
42 break;
44 } else {
45 return ControlFlow::Break(Ok(()));
46 }
47 } else {
48 State::Flushing
49 }
50 }
51 Some(input) => {
52 if input.unwritten().is_empty() {
53 State::Finishing
54 } else {
55 if let Err(err) = encoder.encode(input, output) {
56 return ControlFlow::Break(Err(err));
57 }
58
59 *read += input.written().len();
60
61 break;
63 }
64 }
65 },
66
67 State::Flushing => match encoder.flush(output) {
68 Ok(true) => {
69 self.state = State::Encoding(0);
70
71 break;
73 }
74 Ok(false) => State::Flushing,
75 Err(err) => return ControlFlow::Break(Err(err)),
76 },
77
78 State::Finishing => match encoder.finish(output) {
79 Ok(true) => State::Done,
80 Ok(false) => State::Finishing,
81 Err(err) => return ControlFlow::Break(Err(err)),
82 },
83
84 State::Done => return ControlFlow::Break(Ok(())),
85 };
86
87 if output.has_no_spare_space() {
88 return ControlFlow::Break(Ok(()));
89 }
90 }
91
92 if output.has_no_spare_space() {
93 ControlFlow::Break(Ok(()))
94 } else {
95 ControlFlow::Continue(())
96 }
97 }
98}
99
/// Generates the `Encoder<R, E>` buffered-reader wrapper around the generic
/// encoder state machine.
///
/// The expansion refers to `AsyncBufRead`, `EncodeV2`, `Pin`, `Context`,
/// `Poll`, `Result`, `WriteBuffer` and `PartialBuffer` without importing
/// them, so those names must be in scope where the macro is invoked.
macro_rules! impl_encoder {
    () => {
        use crate::generic::bufread::Encoder as GenericEncoder;

        use std::{ops::ControlFlow, task::ready};

        use pin_project_lite::pin_project;

        pin_project! {
            /// Couples a buffered reader with a codec and the generic
            /// encoder state.
            #[derive(Debug)]
            pub struct Encoder<R, E> {
                // Source of the uncompressed input.
                #[pin]
                reader: R,
                // Codec that performs encode / flush / finish.
                encoder: E,
                // Runtime-independent state machine.
                inner: GenericEncoder,
            }
        }

        impl<R: AsyncBufRead, E: EncodeV2> Encoder<R, E> {
            /// Wraps `reader`, encoding its contents with `encoder`.
            pub fn new(reader: R, encoder: E) -> Self {
                let inner = GenericEncoder::default();
                Self {
                    reader,
                    encoder,
                    inner,
                }
            }

            /// Same as [`Self::new`]; the capacity argument is ignored.
            pub fn with_capacity(reader: R, encoder: E, _cap: usize) -> Self {
                Self::new(reader, encoder)
            }
        }

        impl<R, E> Encoder<R, E> {
            /// Returns a shared reference to the wrapped reader.
            pub fn get_ref(&self) -> &R {
                let Self { reader, .. } = self;
                reader
            }

            /// Returns a mutable reference to the wrapped reader.
            pub fn get_mut(&mut self) -> &mut R {
                let Self { reader, .. } = self;
                reader
            }

            /// Returns a pinned mutable reference to the wrapped reader.
            pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
                let projected = self.project();
                projected.reader
            }

            /// Returns a shared reference to the codec.
            pub(crate) fn get_encoder_ref(&self) -> &E {
                let Self { encoder, .. } = self;
                encoder
            }

            /// Consumes the wrapper and returns the wrapped reader.
            pub fn into_inner(self) -> R {
                let Self { reader, .. } = self;
                reader
            }
        }

        /// Type-erased polling loop shared by every `Encoder<R, E>`
        /// instantiation: fills `output` from `reader` through `encoder`.
        fn do_poll_read(
            inner: &mut GenericEncoder,
            encoder: &mut dyn EncodeV2,
            mut reader: Pin<&mut dyn AsyncBufRead>,
            cx: &mut Context<'_>,
            output: &mut WriteBuffer<'_>,
        ) -> Poll<Result<()>> {
            // Step the state machine once without input before polling the
            // reader; it may already decide that this call should return.
            match inner.do_poll_read(output, encoder, None) {
                ControlFlow::Break(res) => return Poll::Ready(res),
                ControlFlow::Continue(()) => {}
            }

            loop {
                // A pending reader is represented as "no input".
                let mut input = match reader.as_mut().poll_fill_buf(cx) {
                    Poll::Ready(res) => Some(PartialBuffer::new(res?)),
                    Poll::Pending => None,
                };

                let step = inner.do_poll_read(output, encoder, input.as_mut());

                // Tell the reader how much of its buffer was consumed, and
                // remember whether it was pending.
                let reader_pending = match input {
                    Some(buf) => {
                        let consumed = buf.written().len();
                        reader.as_mut().consume(consumed);
                        false
                    }
                    None => true,
                };

                if let ControlFlow::Break(res) = step {
                    return Poll::Ready(res);
                }

                if reader_pending {
                    // Only report `Pending` when there is nothing to hand
                    // back; otherwise return the bytes already written.
                    return if output.written().is_empty() {
                        Poll::Pending
                    } else {
                        Poll::Ready(Ok(()))
                    };
                }
            }
        }

        impl<R: AsyncBufRead, E: EncodeV2> Encoder<R, E> {
            /// Projects the pinned fields and delegates to the type-erased
            /// `do_poll_read` function.
            fn do_poll_read(
                self: Pin<&mut Self>,
                cx: &mut Context<'_>,
                output: &mut WriteBuffer<'_>,
            ) -> Poll<Result<()>> {
                let projected = self.project();

                do_poll_read(
                    projected.inner,
                    projected.encoder,
                    projected.reader,
                    cx,
                    output,
                )
            }
        }
    };
}
206pub(crate) use impl_encoder;