tokio_util/codec/
framed_read.rs1use crate::codec::framed_impl::{FramedImpl, ReadFrame};
2use crate::codec::Decoder;
3
4use futures_core::Stream;
5use tokio::io::AsyncRead;
6
7use bytes::BytesMut;
8use futures_sink::Sink;
9use pin_project_lite::pin_project;
10use std::fmt;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
14use super::FramedParts;
15
16pin_project! {
17    pub struct FramedRead<T, D> {
32        #[pin]
33        inner: FramedImpl<T, D, ReadFrame>,
34    }
35}
36
37impl<T, D> FramedRead<T, D> {
40    pub fn new(inner: T, decoder: D) -> FramedRead<T, D> {
42        FramedRead {
43            inner: FramedImpl {
44                inner,
45                codec: decoder,
46                state: Default::default(),
47            },
48        }
49    }
50
51    pub fn with_capacity(inner: T, decoder: D, capacity: usize) -> FramedRead<T, D> {
54        FramedRead {
55            inner: FramedImpl {
56                inner,
57                codec: decoder,
58                state: ReadFrame {
59                    eof: false,
60                    is_readable: false,
61                    buffer: BytesMut::with_capacity(capacity),
62                    has_errored: false,
63                },
64            },
65        }
66    }
67
68    pub fn get_ref(&self) -> &T {
75        &self.inner.inner
76    }
77
78    pub fn get_mut(&mut self) -> &mut T {
85        &mut self.inner.inner
86    }
87
88    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
95        self.project().inner.project().inner
96    }
97
98    pub fn into_inner(self) -> T {
104        self.inner.inner
105    }
106
107    pub fn decoder(&self) -> &D {
109        &self.inner.codec
110    }
111
112    pub fn decoder_mut(&mut self) -> &mut D {
114        &mut self.inner.codec
115    }
116
117    pub fn map_decoder<C, F>(self, map: F) -> FramedRead<T, C>
120    where
121        F: FnOnce(D) -> C,
122    {
123        let FramedImpl {
125            inner,
126            state,
127            codec,
128        } = self.inner;
129        FramedRead {
130            inner: FramedImpl {
131                inner,
132                state,
133                codec: map(codec),
134            },
135        }
136    }
137
138    pub fn decoder_pin_mut(self: Pin<&mut Self>) -> &mut D {
140        self.project().inner.project().codec
141    }
142
143    pub fn read_buffer(&self) -> &BytesMut {
145        &self.inner.state.buffer
146    }
147
148    pub fn read_buffer_mut(&mut self) -> &mut BytesMut {
150        &mut self.inner.state.buffer
151    }
152
153    pub fn into_parts(self) -> FramedParts<T, D> {
156        FramedParts {
157            io: self.inner.inner,
158            codec: self.inner.codec,
159            read_buf: self.inner.state.buffer,
160            write_buf: BytesMut::new(),
161            _priv: (),
162        }
163    }
164}
165
166impl<T, D> Stream for FramedRead<T, D>
168where
169    T: AsyncRead,
170    D: Decoder,
171{
172    type Item = Result<D::Item, D::Error>;
173
174    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
175        self.project().inner.poll_next(cx)
176    }
177}
178
179impl<T, I, D> Sink<I> for FramedRead<T, D>
181where
182    T: Sink<I>,
183{
184    type Error = T::Error;
185
186    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
187        self.project().inner.project().inner.poll_ready(cx)
188    }
189
190    fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
191        self.project().inner.project().inner.start_send(item)
192    }
193
194    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
195        self.project().inner.project().inner.poll_flush(cx)
196    }
197
198    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
199        self.project().inner.project().inner.poll_close(cx)
200    }
201}
202
203impl<T, D> fmt::Debug for FramedRead<T, D>
204where
205    T: fmt::Debug,
206    D: fmt::Debug,
207{
208    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209        f.debug_struct("FramedRead")
210            .field("inner", &self.get_ref())
211            .field("decoder", &self.decoder())
212            .field("eof", &self.inner.state.eof)
213            .field("is_readable", &self.inner.state.is_readable)
214            .field("buffer", &self.read_buffer())
215            .finish()
216    }
217}