async_compression/generic/write/
encoder.rs

1use crate::{
2    codecs::EncodeV2,
3    core::util::{PartialBuffer, WriteBuffer},
4    generic::write::AsyncBufWrite,
5};
6use futures_core::ready;
7use std::{
8    io,
9    pin::Pin,
10    task::{Context, Poll},
11};
12
13#[derive(Debug)]
14enum State {
15    Encoding,
16    Finishing,
17    Done,
18}
19
20#[derive(Debug)]
21pub struct Encoder {
22    state: State,
23}
24
25impl Default for Encoder {
26    fn default() -> Self {
27        Self {
28            state: State::Encoding,
29        }
30    }
31}
32
33impl Encoder {
34    fn do_poll_write(
35        &mut self,
36        cx: &mut Context<'_>,
37        input: &mut PartialBuffer<&[u8]>,
38        mut writer: Pin<&mut dyn AsyncBufWrite>,
39        encoder: &mut dyn EncodeV2,
40    ) -> Poll<io::Result<()>> {
41        loop {
42            let mut output = ready!(writer.as_mut().poll_partial_flush_buf(cx))?;
43            let output = &mut output.write_buffer;
44
45            self.state = match self.state {
46                State::Encoding => {
47                    encoder.encode(input, output)?;
48                    State::Encoding
49                }
50
51                State::Finishing | State::Done => {
52                    break Poll::Ready(Err(io::Error::other("Write after close")))
53                }
54            };
55
56            if input.unwritten().is_empty() {
57                break Poll::Ready(Ok(()));
58            }
59        }
60    }
61
62    pub fn poll_write(
63        &mut self,
64        cx: &mut Context<'_>,
65        buf: &[u8],
66        writer: Pin<&mut dyn AsyncBufWrite>,
67        encoder: &mut dyn EncodeV2,
68    ) -> Poll<io::Result<usize>> {
69        if buf.is_empty() {
70            return Poll::Ready(Ok(0));
71        }
72
73        let mut input = PartialBuffer::new(buf);
74
75        match self.do_poll_write(cx, &mut input, writer, encoder)? {
76            Poll::Pending if input.written().is_empty() => Poll::Pending,
77            _ => Poll::Ready(Ok(input.written().len())),
78        }
79    }
80
81    pub fn do_poll_flush(
82        &mut self,
83        cx: &mut Context<'_>,
84        mut writer: Pin<&mut dyn AsyncBufWrite>,
85        encoder: &mut dyn EncodeV2,
86    ) -> Poll<io::Result<()>> {
87        loop {
88            let mut output = ready!(writer.as_mut().poll_partial_flush_buf(cx))?;
89            let output = &mut output.write_buffer;
90
91            let done = match self.state {
92                State::Encoding => encoder.flush(output)?,
93
94                State::Finishing | State::Done => {
95                    break Poll::Ready(Err(io::Error::other("Flush after close")))
96                }
97            };
98
99            if done {
100                break Poll::Ready(Ok(()));
101            }
102        }
103    }
104
105    pub fn do_poll_close(
106        &mut self,
107        cx: &mut Context<'_>,
108        mut writer: Pin<&mut dyn AsyncBufWrite>,
109        encoder: &mut dyn EncodeV2,
110    ) -> Poll<io::Result<()>> {
111        loop {
112            let mut output = ready!(writer.as_mut().poll_partial_flush_buf(cx))?;
113            let output = &mut output.write_buffer;
114
115            self.state = match self.state {
116                State::Encoding | State::Finishing => {
117                    if encoder.finish(output)? {
118                        State::Done
119                    } else {
120                        State::Finishing
121                    }
122                }
123
124                State::Done => State::Done,
125            };
126
127            if let State::Done = self.state {
128                break Poll::Ready(Ok(()));
129            }
130        }
131    }
132}
133
134macro_rules! impl_encoder {
135    ($poll_close: tt) => {
136        use crate::{codecs::EncodeV2, generic::write::Encoder as GenericEncoder};
137        use futures_core::ready;
138        use pin_project_lite::pin_project;
139
140        pin_project! {
141            #[derive(Debug)]
142            pub struct Encoder<W, E> {
143                #[pin]
144                writer: BufWriter<W>,
145                encoder: E,
146                inner: GenericEncoder,
147            }
148        }
149
150        impl<W, E> Encoder<W, E> {
151            pub fn get_ref(&self) -> &W {
152                self.writer.get_ref()
153            }
154
155            pub fn get_mut(&mut self) -> &mut W {
156                self.writer.get_mut()
157            }
158
159            pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> {
160                self.project().writer.get_pin_mut()
161            }
162
163            pub(crate) fn get_encoder_ref(&self) -> &E {
164                &self.encoder
165            }
166
167            pub fn into_inner(self) -> W {
168                self.writer.into_inner()
169            }
170
171            pub fn new(writer: W, encoder: E) -> Self {
172                Self {
173                    writer: BufWriter::new(writer),
174                    encoder,
175                    inner: Default::default(),
176                }
177            }
178
179            pub fn with_capacity(writer: W, encoder: E, cap: usize) -> Self {
180                Self {
181                    writer: BufWriter::with_capacity(cap, writer),
182                    encoder,
183                    inner: Default::default(),
184                }
185            }
186        }
187
188        impl<W: AsyncWrite, E: EncodeV2> AsyncWrite for Encoder<W, E> {
189            fn poll_write(
190                self: Pin<&mut Self>,
191                cx: &mut Context<'_>,
192                buf: &[u8],
193            ) -> Poll<io::Result<usize>> {
194                let this = self.project();
195                this.inner.poll_write(cx, buf, this.writer, this.encoder)
196            }
197
198            fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
199                let mut this = self.project();
200
201                ready!(this
202                    .inner
203                    .do_poll_flush(cx, this.writer.as_mut(), this.encoder))?;
204                this.writer.poll_flush(cx)
205            }
206
207            fn $poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
208                let mut this = self.project();
209
210                ready!(this
211                    .inner
212                    .do_poll_close(cx, this.writer.as_mut(), this.encoder))?;
213                this.writer.$poll_close(cx)
214            }
215        }
216
217        impl<W: AsyncBufRead, E> AsyncBufRead for Encoder<W, E> {
218            fn poll_fill_buf(
219                self: Pin<&mut Self>,
220                cx: &mut Context<'_>,
221            ) -> Poll<io::Result<&[u8]>> {
222                self.get_pin_mut().poll_fill_buf(cx)
223            }
224
225            fn consume(self: Pin<&mut Self>, amt: usize) {
226                self.get_pin_mut().consume(amt)
227            }
228        }
229    };
230}
231pub(crate) use impl_encoder;