tokio_util/codec/framed.rs
1use crate::codec::decoder::Decoder;
2use crate::codec::encoder::Encoder;
3use crate::codec::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame};
4
5use futures_core::Stream;
6use tokio::io::{AsyncRead, AsyncWrite};
7
8use bytes::BytesMut;
9use futures_sink::Sink;
10use pin_project_lite::pin_project;
11use std::fmt;
12use std::io;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15
16pin_project! {
17    /// A unified [`Stream`] and [`Sink`] interface to an underlying I/O object, using
18    /// the `Encoder` and `Decoder` traits to encode and decode frames.
19    ///
20    /// You can create a `Framed` instance by using the [`Decoder::framed`] adapter, or
21    /// by using the `new` function seen below.
22    ///
23    /// # Cancellation safety
24    ///
25    /// * [`futures_util::sink::SinkExt::send`]: if send is used as the event in a
26    /// `tokio::select!` statement and some other branch completes first, then it is
27    /// guaranteed that the message was not sent, but the message itself is lost.
28    /// * [`tokio_stream::StreamExt::next`]: This method is cancel safe. The returned
29    /// future only holds onto a reference to the underlying stream, so dropping it will
30    /// never lose a value.
31    ///
32    /// [`Stream`]: futures_core::Stream
33    /// [`Sink`]: futures_sink::Sink
34    /// [`AsyncRead`]: tokio::io::AsyncRead
35    /// [`Decoder::framed`]: crate::codec::Decoder::framed()
36    /// [`futures_util::sink::SinkExt::send`]: futures_util::sink::SinkExt::send
37    /// [`tokio_stream::StreamExt::next`]: https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.next
38    pub struct Framed<T, U> {
39        #[pin]
40        inner: FramedImpl<T, U, RWFrames>
41    }
42}
43
44impl<T, U> Framed<T, U> {
45    /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
46    /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data.
47    ///
48    /// Raw I/O objects work with byte sequences, but higher-level code usually
49    /// wants to batch these into meaningful chunks, called "frames". This
50    /// method layers framing on top of an I/O object, by using the codec
51    /// traits to handle encoding and decoding of messages frames. Note that
52    /// the incoming and outgoing frame types may be distinct.
53    ///
54    /// This function returns a *single* object that is both [`Stream`] and
55    /// [`Sink`]; grouping this into a single object is often useful for layering
56    /// things like gzip or TLS, which require both read and write access to the
57    /// underlying object.
58    ///
59    /// If you want to work more directly with the streams and sink, consider
60    /// calling [`split`] on the `Framed` returned by this method, which will
61    /// break them into separate objects, allowing them to interact more easily.
62    ///
63    /// Note that, for some byte sources, the stream can be resumed after an EOF
64    /// by reading from it, even after it has returned `None`. Repeated attempts
65    /// to do so, without new data available, continue to return `None` without
66    /// creating more (closing) frames.
67    ///
68    /// [`Stream`]: futures_core::Stream
69    /// [`Sink`]: futures_sink::Sink
70    /// [`Decode`]: crate::codec::Decoder
71    /// [`Encoder`]: crate::codec::Encoder
72    /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
73    pub fn new(inner: T, codec: U) -> Framed<T, U> {
74        Framed {
75            inner: FramedImpl {
76                inner,
77                codec,
78                state: Default::default(),
79            },
80        }
81    }
82
83    /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
84    /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data,
85    /// with a specific read buffer initial capacity.
86    ///
87    /// Raw I/O objects work with byte sequences, but higher-level code usually
88    /// wants to batch these into meaningful chunks, called "frames". This
89    /// method layers framing on top of an I/O object, by using the codec
90    /// traits to handle encoding and decoding of messages frames. Note that
91    /// the incoming and outgoing frame types may be distinct.
92    ///
93    /// This function returns a *single* object that is both [`Stream`] and
94    /// [`Sink`]; grouping this into a single object is often useful for layering
95    /// things like gzip or TLS, which require both read and write access to the
96    /// underlying object.
97    ///
98    /// If you want to work more directly with the streams and sink, consider
99    /// calling [`split`] on the `Framed` returned by this method, which will
100    /// break them into separate objects, allowing them to interact more easily.
101    ///
102    /// [`Stream`]: futures_core::Stream
103    /// [`Sink`]: futures_sink::Sink
104    /// [`Decode`]: crate::codec::Decoder
105    /// [`Encoder`]: crate::codec::Encoder
106    /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
107    pub fn with_capacity(inner: T, codec: U, capacity: usize) -> Framed<T, U> {
108        Framed {
109            inner: FramedImpl {
110                inner,
111                codec,
112                state: RWFrames {
113                    read: ReadFrame {
114                        eof: false,
115                        is_readable: false,
116                        buffer: BytesMut::with_capacity(capacity),
117                        has_errored: false,
118                    },
119                    write: WriteFrame {
120                        buffer: BytesMut::with_capacity(capacity),
121                        backpressure_boundary: capacity,
122                    },
123                },
124            },
125        }
126    }
127
128    /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
129    /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data.
130    ///
131    /// Raw I/O objects work with byte sequences, but higher-level code usually
132    /// wants to batch these into meaningful chunks, called "frames". This
133    /// method layers framing on top of an I/O object, by using the `Codec`
134    /// traits to handle encoding and decoding of messages frames. Note that
135    /// the incoming and outgoing frame types may be distinct.
136    ///
137    /// This function returns a *single* object that is both [`Stream`] and
138    /// [`Sink`]; grouping this into a single object is often useful for layering
139    /// things like gzip or TLS, which require both read and write access to the
140    /// underlying object.
141    ///
142    /// This objects takes a stream and a `readbuffer` and a `writebuffer`. These field
143    /// can be obtained from an existing `Framed` with the [`into_parts`] method.
144    ///
145    /// If you want to work more directly with the streams and sink, consider
146    /// calling [`split`] on the `Framed` returned by this method, which will
147    /// break them into separate objects, allowing them to interact more easily.
148    ///
149    /// [`Stream`]: futures_core::Stream
150    /// [`Sink`]: futures_sink::Sink
151    /// [`Decoder`]: crate::codec::Decoder
152    /// [`Encoder`]: crate::codec::Encoder
153    /// [`into_parts`]: crate::codec::Framed::into_parts()
154    /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
155    pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> {
156        Framed {
157            inner: FramedImpl {
158                inner: parts.io,
159                codec: parts.codec,
160                state: RWFrames {
161                    read: parts.read_buf.into(),
162                    write: parts.write_buf.into(),
163                },
164            },
165        }
166    }
167
168    /// Returns a reference to the underlying I/O stream wrapped by
169    /// `Framed`.
170    ///
171    /// Note that care should be taken to not tamper with the underlying stream
172    /// of data coming in as it may corrupt the stream of frames otherwise
173    /// being worked with.
174    pub fn get_ref(&self) -> &T {
175        &self.inner.inner
176    }
177
178    /// Returns a mutable reference to the underlying I/O stream wrapped by
179    /// `Framed`.
180    ///
181    /// Note that care should be taken to not tamper with the underlying stream
182    /// of data coming in as it may corrupt the stream of frames otherwise
183    /// being worked with.
184    pub fn get_mut(&mut self) -> &mut T {
185        &mut self.inner.inner
186    }
187
188    /// Returns a pinned mutable reference to the underlying I/O stream wrapped by
189    /// `Framed`.
190    ///
191    /// Note that care should be taken to not tamper with the underlying stream
192    /// of data coming in as it may corrupt the stream of frames otherwise
193    /// being worked with.
194    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
195        self.project().inner.project().inner
196    }
197
198    /// Returns a reference to the underlying codec wrapped by
199    /// `Framed`.
200    ///
201    /// Note that care should be taken to not tamper with the underlying codec
202    /// as it may corrupt the stream of frames otherwise being worked with.
203    pub fn codec(&self) -> &U {
204        &self.inner.codec
205    }
206
207    /// Returns a mutable reference to the underlying codec wrapped by
208    /// `Framed`.
209    ///
210    /// Note that care should be taken to not tamper with the underlying codec
211    /// as it may corrupt the stream of frames otherwise being worked with.
212    pub fn codec_mut(&mut self) -> &mut U {
213        &mut self.inner.codec
214    }
215
216    /// Maps the codec `U` to `C`, preserving the read and write buffers
217    /// wrapped by `Framed`.
218    ///
219    /// Note that care should be taken to not tamper with the underlying codec
220    /// as it may corrupt the stream of frames otherwise being worked with.
221    pub fn map_codec<C, F>(self, map: F) -> Framed<T, C>
222    where
223        F: FnOnce(U) -> C,
224    {
225        // This could be potentially simplified once rust-lang/rust#86555 hits stable
226        let parts = self.into_parts();
227        Framed::from_parts(FramedParts {
228            io: parts.io,
229            codec: map(parts.codec),
230            read_buf: parts.read_buf,
231            write_buf: parts.write_buf,
232            _priv: (),
233        })
234    }
235
236    /// Returns a mutable reference to the underlying codec wrapped by
237    /// `Framed`.
238    ///
239    /// Note that care should be taken to not tamper with the underlying codec
240    /// as it may corrupt the stream of frames otherwise being worked with.
241    pub fn codec_pin_mut(self: Pin<&mut Self>) -> &mut U {
242        self.project().inner.project().codec
243    }
244
245    /// Returns a reference to the read buffer.
246    pub fn read_buffer(&self) -> &BytesMut {
247        &self.inner.state.read.buffer
248    }
249
250    /// Returns a mutable reference to the read buffer.
251    pub fn read_buffer_mut(&mut self) -> &mut BytesMut {
252        &mut self.inner.state.read.buffer
253    }
254
255    /// Returns a reference to the write buffer.
256    pub fn write_buffer(&self) -> &BytesMut {
257        &self.inner.state.write.buffer
258    }
259
260    /// Returns a mutable reference to the write buffer.
261    pub fn write_buffer_mut(&mut self) -> &mut BytesMut {
262        &mut self.inner.state.write.buffer
263    }
264
265    /// Returns backpressure boundary
266    pub fn backpressure_boundary(&self) -> usize {
267        self.inner.state.write.backpressure_boundary
268    }
269
270    /// Updates backpressure boundary
271    pub fn set_backpressure_boundary(&mut self, boundary: usize) {
272        self.inner.state.write.backpressure_boundary = boundary;
273    }
274
275    /// Consumes the `Framed`, returning its underlying I/O stream.
276    ///
277    /// Note that care should be taken to not tamper with the underlying stream
278    /// of data coming in as it may corrupt the stream of frames otherwise
279    /// being worked with.
280    pub fn into_inner(self) -> T {
281        self.inner.inner
282    }
283
284    /// Consumes the `Framed`, returning its underlying I/O stream, the buffer
285    /// with unprocessed data, and the codec.
286    ///
287    /// Note that care should be taken to not tamper with the underlying stream
288    /// of data coming in as it may corrupt the stream of frames otherwise
289    /// being worked with.
290    pub fn into_parts(self) -> FramedParts<T, U> {
291        FramedParts {
292            io: self.inner.inner,
293            codec: self.inner.codec,
294            read_buf: self.inner.state.read.buffer,
295            write_buf: self.inner.state.write.buffer,
296            _priv: (),
297        }
298    }
299}
300
301// This impl just defers to the underlying FramedImpl
302impl<T, U> Stream for Framed<T, U>
303where
304    T: AsyncRead,
305    U: Decoder,
306{
307    type Item = Result<U::Item, U::Error>;
308
309    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
310        self.project().inner.poll_next(cx)
311    }
312}
313
314// This impl just defers to the underlying FramedImpl
315impl<T, I, U> Sink<I> for Framed<T, U>
316where
317    T: AsyncWrite,
318    U: Encoder<I>,
319    U::Error: From<io::Error>,
320{
321    type Error = U::Error;
322
323    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
324        self.project().inner.poll_ready(cx)
325    }
326
327    fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
328        self.project().inner.start_send(item)
329    }
330
331    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
332        self.project().inner.poll_flush(cx)
333    }
334
335    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
336        self.project().inner.poll_close(cx)
337    }
338}
339
340impl<T, U> fmt::Debug for Framed<T, U>
341where
342    T: fmt::Debug,
343    U: fmt::Debug,
344{
345    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
346        f.debug_struct("Framed")
347            .field("io", self.get_ref())
348            .field("codec", self.codec())
349            .finish()
350    }
351}
352
353/// `FramedParts` contains an export of the data of a Framed transport.
354/// It can be used to construct a new [`Framed`] with a different codec.
355/// It contains all current buffers and the inner transport.
356///
357/// [`Framed`]: crate::codec::Framed
358#[derive(Debug)]
359#[allow(clippy::manual_non_exhaustive)]
360pub struct FramedParts<T, U> {
361    /// The inner transport used to read bytes to and write bytes to
362    pub io: T,
363
364    /// The codec
365    pub codec: U,
366
367    /// The buffer with read but unprocessed data.
368    pub read_buf: BytesMut,
369
370    /// A buffer with unprocessed data which are not written yet.
371    pub write_buf: BytesMut,
372
373    /// This private field allows us to add additional fields in the future in a
374    /// backwards compatible way.
375    pub(crate) _priv: (),
376}
377
378impl<T, U> FramedParts<T, U> {
379    /// Create a new, default, `FramedParts`
380    pub fn new<I>(io: T, codec: U) -> FramedParts<T, U>
381    where
382        U: Encoder<I>,
383    {
384        FramedParts {
385            io,
386            codec,
387            read_buf: BytesMut::new(),
388            write_buf: BytesMut::new(),
389            _priv: (),
390        }
391    }
392}