async_compression/generic/write/
encoder.rs1use crate::{
2 codecs::EncodeV2,
3 core::util::{PartialBuffer, WriteBuffer},
4 generic::write::AsyncBufWrite,
5};
6use futures_core::ready;
7use std::{
8 io,
9 pin::Pin,
10 task::{Context, Poll},
11};
12
13#[derive(Debug)]
14enum State {
15 Encoding,
16 Finishing,
17 Done,
18}
19
20#[derive(Debug)]
21pub struct Encoder {
22 state: State,
23}
24
25impl Default for Encoder {
26 fn default() -> Self {
27 Self {
28 state: State::Encoding,
29 }
30 }
31}
32
33impl Encoder {
34 fn do_poll_write(
35 &mut self,
36 cx: &mut Context<'_>,
37 input: &mut PartialBuffer<&[u8]>,
38 mut writer: Pin<&mut dyn AsyncBufWrite>,
39 encoder: &mut dyn EncodeV2,
40 ) -> Poll<io::Result<()>> {
41 loop {
42 let mut output = ready!(writer.as_mut().poll_partial_flush_buf(cx))?;
43 let output = &mut output.write_buffer;
44
45 self.state = match self.state {
46 State::Encoding => {
47 encoder.encode(input, output)?;
48 State::Encoding
49 }
50
51 State::Finishing | State::Done => {
52 break Poll::Ready(Err(io::Error::other("Write after close")))
53 }
54 };
55
56 if input.unwritten().is_empty() {
57 break Poll::Ready(Ok(()));
58 }
59 }
60 }
61
62 pub fn poll_write(
63 &mut self,
64 cx: &mut Context<'_>,
65 buf: &[u8],
66 writer: Pin<&mut dyn AsyncBufWrite>,
67 encoder: &mut dyn EncodeV2,
68 ) -> Poll<io::Result<usize>> {
69 if buf.is_empty() {
70 return Poll::Ready(Ok(0));
71 }
72
73 let mut input = PartialBuffer::new(buf);
74
75 match self.do_poll_write(cx, &mut input, writer, encoder)? {
76 Poll::Pending if input.written().is_empty() => Poll::Pending,
77 _ => Poll::Ready(Ok(input.written().len())),
78 }
79 }
80
81 pub fn do_poll_flush(
82 &mut self,
83 cx: &mut Context<'_>,
84 mut writer: Pin<&mut dyn AsyncBufWrite>,
85 encoder: &mut dyn EncodeV2,
86 ) -> Poll<io::Result<()>> {
87 loop {
88 let mut output = ready!(writer.as_mut().poll_partial_flush_buf(cx))?;
89 let output = &mut output.write_buffer;
90
91 let done = match self.state {
92 State::Encoding => encoder.flush(output)?,
93
94 State::Finishing | State::Done => {
95 break Poll::Ready(Err(io::Error::other("Flush after close")))
96 }
97 };
98
99 if done {
100 break Poll::Ready(Ok(()));
101 }
102 }
103 }
104
105 pub fn do_poll_close(
106 &mut self,
107 cx: &mut Context<'_>,
108 mut writer: Pin<&mut dyn AsyncBufWrite>,
109 encoder: &mut dyn EncodeV2,
110 ) -> Poll<io::Result<()>> {
111 loop {
112 let mut output = ready!(writer.as_mut().poll_partial_flush_buf(cx))?;
113 let output = &mut output.write_buffer;
114
115 self.state = match self.state {
116 State::Encoding | State::Finishing => {
117 if encoder.finish(output)? {
118 State::Done
119 } else {
120 State::Finishing
121 }
122 }
123
124 State::Done => State::Done,
125 };
126
127 if let State::Done = self.state {
128 break Poll::Ready(Ok(()));
129 }
130 }
131 }
132}
133
134macro_rules! impl_encoder {
135 ($poll_close: tt) => {
136 use crate::{codecs::EncodeV2, generic::write::Encoder as GenericEncoder};
137 use futures_core::ready;
138 use pin_project_lite::pin_project;
139
140 pin_project! {
141 #[derive(Debug)]
142 pub struct Encoder<W, E> {
143 #[pin]
144 writer: BufWriter<W>,
145 encoder: E,
146 inner: GenericEncoder,
147 }
148 }
149
150 impl<W, E> Encoder<W, E> {
151 pub fn get_ref(&self) -> &W {
152 self.writer.get_ref()
153 }
154
155 pub fn get_mut(&mut self) -> &mut W {
156 self.writer.get_mut()
157 }
158
159 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> {
160 self.project().writer.get_pin_mut()
161 }
162
163 pub(crate) fn get_encoder_ref(&self) -> &E {
164 &self.encoder
165 }
166
167 pub fn into_inner(self) -> W {
168 self.writer.into_inner()
169 }
170
171 pub fn new(writer: W, encoder: E) -> Self {
172 Self {
173 writer: BufWriter::new(writer),
174 encoder,
175 inner: Default::default(),
176 }
177 }
178
179 pub fn with_capacity(writer: W, encoder: E, cap: usize) -> Self {
180 Self {
181 writer: BufWriter::with_capacity(cap, writer),
182 encoder,
183 inner: Default::default(),
184 }
185 }
186 }
187
188 impl<W: AsyncWrite, E: EncodeV2> AsyncWrite for Encoder<W, E> {
189 fn poll_write(
190 self: Pin<&mut Self>,
191 cx: &mut Context<'_>,
192 buf: &[u8],
193 ) -> Poll<io::Result<usize>> {
194 let this = self.project();
195 this.inner.poll_write(cx, buf, this.writer, this.encoder)
196 }
197
198 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
199 let mut this = self.project();
200
201 ready!(this
202 .inner
203 .do_poll_flush(cx, this.writer.as_mut(), this.encoder))?;
204 this.writer.poll_flush(cx)
205 }
206
207 fn $poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
208 let mut this = self.project();
209
210 ready!(this
211 .inner
212 .do_poll_close(cx, this.writer.as_mut(), this.encoder))?;
213 this.writer.$poll_close(cx)
214 }
215 }
216
217 impl<W: AsyncBufRead, E> AsyncBufRead for Encoder<W, E> {
218 fn poll_fill_buf(
219 self: Pin<&mut Self>,
220 cx: &mut Context<'_>,
221 ) -> Poll<io::Result<&[u8]>> {
222 self.get_pin_mut().poll_fill_buf(cx)
223 }
224
225 fn consume(self: Pin<&mut Self>, amt: usize) {
226 self.get_pin_mut().consume(amt)
227 }
228 }
229 };
230}
231pub(crate) use impl_encoder;