async_compression/generic/bufread/
encoder.rs1use crate::{
2 codecs::EncodeV2,
3 core::util::{PartialBuffer, WriteBuffer},
4};
5use std::{io::Result, ops::ControlFlow};
6
/// Phase of the state machine stepped by `Encoder::do_poll_read`.
#[derive(Debug)]
enum State {
    /// Input is being handed to the codec. The payload is a byte counter:
    /// when it is non-zero and no input is available, the machine moves on
    /// to `Flushing`.
    Encoding(usize),
    /// The codec's `flush` is called until it reports completion, after
    /// which the machine returns to `Encoding(0)`.
    Flushing,
    /// The codec's `finish` is called until it reports completion, after
    /// which the machine moves to `Done`.
    Finishing,
    /// Finishing has completed; every further step returns immediately.
    Done,
}
14
15#[derive(Debug)]
16pub struct Encoder {
17 state: State,
18}
19
20impl Default for Encoder {
21 fn default() -> Self {
22 Self {
23 state: State::Encoding(0),
24 }
25 }
26}
27
28impl Encoder {
29 pub fn do_poll_read(
31 &mut self,
32 output: &mut WriteBuffer<'_>,
33 encoder: &mut dyn EncodeV2,
34 mut input: Option<&mut PartialBuffer<&[u8]>>,
35 ) -> ControlFlow<Result<()>> {
36 loop {
37 self.state = match self.state {
38 State::Encoding(mut read) => match input.as_mut() {
39 None => {
40 if read == 0 {
41 if output.written().is_empty() {
42 break;
44 } else {
45 return ControlFlow::Break(Ok(()));
46 }
47 } else {
48 State::Flushing
49 }
50 }
51 Some(input) => {
52 if input.unwritten().is_empty() {
53 State::Finishing
54 } else {
55 if let Err(err) = encoder.encode(input, output) {
56 return ControlFlow::Break(Err(err));
57 }
58
59 read += input.written().len();
60
61 break;
63 }
64 }
65 },
66
67 State::Flushing => match encoder.flush(output) {
68 Ok(true) => {
69 self.state = State::Encoding(0);
70
71 break;
73 }
74 Ok(false) => State::Flushing,
75 Err(err) => return ControlFlow::Break(Err(err)),
76 },
77
78 State::Finishing => match encoder.finish(output) {
79 Ok(true) => State::Done,
80 Ok(false) => State::Finishing,
81 Err(err) => return ControlFlow::Break(Err(err)),
82 },
83
84 State::Done => return ControlFlow::Break(Ok(())),
85 };
86
87 if output.has_no_spare_space() {
88 return ControlFlow::Break(Ok(()));
89 }
90 }
91
92 if output.has_no_spare_space() {
93 ControlFlow::Break(Ok(()))
94 } else {
95 ControlFlow::Continue(())
96 }
97 }
98}
99
/// Generates the reader-backed `Encoder<R, E>` type in the invoking module.
///
/// The expansion refers to `AsyncBufRead`, `EncodeV2`, `PartialBuffer`,
/// `WriteBuffer`, `Pin`, `Context`, `Poll` and `Result` without importing
/// them, so those names must already be in scope where the macro is invoked.
macro_rules! impl_encoder {
    () => {
        use crate::generic::bufread::Encoder as GenericEncoder;

        use std::ops::ControlFlow;

        use futures_core::ready;
        use pin_project_lite::pin_project;

        pin_project! {
            /// Encoder that pulls input from a buffered reader `R` and
            /// encodes it with the codec `E`.
            #[derive(Debug)]
            pub struct Encoder<R, E> {
                // Source of the bytes to encode.
                #[pin]
                reader: R,
                // Codec that performs the encoding.
                encoder: E,
                // Reader-independent state machine shared by all wrappers.
                inner: GenericEncoder,
            }
        }

        impl<R: AsyncBufRead, E: EncodeV2> Encoder<R, E> {
            /// Wraps `reader`, encoding its contents with `encoder`.
            pub fn new(reader: R, encoder: E) -> Self {
                Self {
                    reader,
                    encoder,
                    inner: GenericEncoder::default(),
                }
            }

            /// Same as [`Self::new`]; the capacity argument is not used.
            pub fn with_capacity(reader: R, encoder: E, _cap: usize) -> Self {
                Self::new(reader, encoder)
            }
        }

        impl<R, E> Encoder<R, E> {
            /// Shared reference to the wrapped reader.
            pub fn get_ref(&self) -> &R {
                &self.reader
            }

            /// Mutable reference to the wrapped reader.
            pub fn get_mut(&mut self) -> &mut R {
                &mut self.reader
            }

            /// Pinned mutable reference to the wrapped reader.
            pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
                let projected = self.project();
                projected.reader
            }

            /// Shared reference to the codec.
            pub(crate) fn get_encoder_ref(&self) -> &E {
                &self.encoder
            }

            /// Unwraps this encoder, returning the reader.
            pub fn into_inner(self) -> R {
                self.reader
            }
        }

        /// Type-erased driver: feeds the reader's buffered data through the
        /// state machine until it asks to stop or the reader is not ready.
        fn do_poll_read(
            inner: &mut GenericEncoder,
            encoder: &mut dyn EncodeV2,
            mut reader: Pin<&mut dyn AsyncBufRead>,
            cx: &mut Context<'_>,
            output: &mut WriteBuffer<'_>,
        ) -> Poll<Result<()>> {
            // First step the state machine without any input.
            if let ControlFlow::Break(res) = inner.do_poll_read(output, encoder, None) {
                return Poll::Ready(res);
            }

            loop {
                // `None` stands for a reader that is not ready yet.
                let mut chunk = match reader.as_mut().poll_fill_buf(cx) {
                    Poll::Ready(filled) => Some(PartialBuffer::new(filled?)),
                    Poll::Pending => None,
                };
                let reader_pending = chunk.is_none();

                let step = inner.do_poll_read(output, encoder, chunk.as_mut());

                // Report to the reader how much of its buffer was taken.
                if let Some(taken) = chunk.map(|buf| buf.written().len()) {
                    reader.as_mut().consume(taken);
                }

                match step {
                    ControlFlow::Break(res) => return Poll::Ready(res),
                    ControlFlow::Continue(()) if reader_pending => return Poll::Pending,
                    ControlFlow::Continue(()) => {}
                }
            }
        }

        impl<R: AsyncBufRead, E: EncodeV2> Encoder<R, E> {
            /// Projects the pinned fields and delegates to the type-erased
            /// `do_poll_read` driver.
            fn do_poll_read(
                self: Pin<&mut Self>,
                cx: &mut Context<'_>,
                output: &mut WriteBuffer<'_>,
            ) -> Poll<Result<()>> {
                let fields = self.project();

                do_poll_read(fields.inner, fields.encoder, fields.reader, cx, output)
            }
        }
    };
}
203pub(crate) use impl_encoder;