hyper/body/
incoming.rs

1use std::fmt;
2#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use bytes::Bytes;
8#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
9use futures_channel::{mpsc, oneshot};
10#[cfg(all(
11    any(feature = "http1", feature = "http2"),
12    any(feature = "client", feature = "server")
13))]
14use futures_core::ready;
15#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
16use futures_core::{stream::FusedStream, Stream}; // for mpsc::Receiver
17#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
18use http::HeaderMap;
19use http_body::{Body, Frame, SizeHint};
20
21#[cfg(all(
22    any(feature = "http1", feature = "http2"),
23    any(feature = "client", feature = "server")
24))]
25use super::DecodedLength;
26#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
27use crate::common::watch;
28#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
29use crate::proto::h2::ping;
30
31#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
32type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
33#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
34type TrailersSender = oneshot::Sender<HeaderMap>;
35
36/// A stream of `Bytes`, used when receiving bodies from the network.
37///
38/// Note that Users should not instantiate this struct directly. When working with the hyper client,
39/// `Incoming` is returned to you in responses. Similarly, when operating with the hyper server,
40/// it is provided within requests.
41///
42/// # Examples
43///
44/// ```rust,ignore
45/// async fn echo(
46///    req: Request<hyper::body::Incoming>,
47/// ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
48///    //Here, you can process `Incoming`
49/// }
50/// ```
51#[must_use = "streams do nothing unless polled"]
52pub struct Incoming {
53    kind: Kind,
54}
55
56enum Kind {
57    Empty,
58    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
59    Chan {
60        content_length: DecodedLength,
61        want_tx: watch::Sender,
62        data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
63        trailers_rx: oneshot::Receiver<HeaderMap>,
64    },
65    #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
66    H2 {
67        content_length: DecodedLength,
68        data_done: bool,
69        ping: ping::Recorder,
70        recv: h2::RecvStream,
71    },
72    #[cfg(feature = "ffi")]
73    Ffi(crate::ffi::UserBody),
74}
75
76/// A sender half created through [`Body::channel()`].
77///
78/// Useful when wanting to stream chunks from another thread.
79///
80/// ## Body Closing
81///
82/// Note that the request body will always be closed normally when the sender is dropped (meaning
83/// that the empty terminating chunk will be sent to the remote). If you desire to close the
84/// connection with an incomplete response (e.g. in the case of an error during asynchronous
85/// processing), call the [`Sender::abort()`] method to abort the body in an abnormal fashion.
86///
87/// [`Body::channel()`]: struct.Body.html#method.channel
88/// [`Sender::abort()`]: struct.Sender.html#method.abort
89#[must_use = "Sender does nothing unless sent on"]
90#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
91pub(crate) struct Sender {
92    want_rx: watch::Receiver,
93    data_tx: BodySender,
94    trailers_tx: Option<TrailersSender>,
95}
96
97#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
98const WANT_PENDING: usize = 1;
99#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
100const WANT_READY: usize = 2;
101
102impl Incoming {
103    /// Create a `Body` stream with an associated sender half.
104    ///
105    /// Useful when wanting to stream chunks from another thread.
106    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
107    #[inline]
108    #[cfg(test)]
109    pub(crate) fn channel() -> (Sender, Incoming) {
110        Self::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false)
111    }
112
113    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
114    pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Incoming) {
115        let (data_tx, data_rx) = mpsc::channel(0);
116        let (trailers_tx, trailers_rx) = oneshot::channel();
117
118        // If wanter is true, `Sender::poll_ready()` won't becoming ready
119        // until the `Body` has been polled for data once.
120        let want = if wanter { WANT_PENDING } else { WANT_READY };
121
122        let (want_tx, want_rx) = watch::channel(want);
123
124        let tx = Sender {
125            want_rx,
126            data_tx,
127            trailers_tx: Some(trailers_tx),
128        };
129        let rx = Incoming::new(Kind::Chan {
130            content_length,
131            want_tx,
132            data_rx,
133            trailers_rx,
134        });
135
136        (tx, rx)
137    }
138
139    fn new(kind: Kind) -> Incoming {
140        Incoming { kind }
141    }
142
143    #[allow(dead_code)]
144    pub(crate) fn empty() -> Incoming {
145        Incoming::new(Kind::Empty)
146    }
147
148    #[cfg(feature = "ffi")]
149    pub(crate) fn ffi() -> Incoming {
150        Incoming::new(Kind::Ffi(crate::ffi::UserBody::new()))
151    }
152
153    #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
154    pub(crate) fn h2(
155        recv: h2::RecvStream,
156        mut content_length: DecodedLength,
157        ping: ping::Recorder,
158    ) -> Self {
159        // If the stream is already EOS, then the "unknown length" is clearly
160        // actually ZERO.
161        if !content_length.is_exact() && recv.is_end_stream() {
162            content_length = DecodedLength::ZERO;
163        }
164
165        Incoming::new(Kind::H2 {
166            data_done: false,
167            ping,
168            content_length,
169            recv,
170        })
171    }
172
173    #[cfg(feature = "ffi")]
174    pub(crate) fn as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody {
175        match self.kind {
176            Kind::Ffi(ref mut body) => return body,
177            _ => {
178                self.kind = Kind::Ffi(crate::ffi::UserBody::new());
179            }
180        }
181
182        match self.kind {
183            Kind::Ffi(ref mut body) => body,
184            _ => unreachable!(),
185        }
186    }
187}
188
189impl Body for Incoming {
190    type Data = Bytes;
191    type Error = crate::Error;
192
193    fn poll_frame(
194        #[cfg_attr(
195            not(all(
196                any(feature = "http1", feature = "http2"),
197                any(feature = "client", feature = "server")
198            )),
199            allow(unused_mut)
200        )]
201        mut self: Pin<&mut Self>,
202        #[cfg_attr(
203            not(all(
204                any(feature = "http1", feature = "http2"),
205                any(feature = "client", feature = "server")
206            )),
207            allow(unused_variables)
208        )]
209        cx: &mut Context<'_>,
210    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
211        match self.kind {
212            Kind::Empty => Poll::Ready(None),
213            #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
214            Kind::Chan {
215                content_length: ref mut len,
216                ref mut data_rx,
217                ref mut want_tx,
218                ref mut trailers_rx,
219            } => {
220                want_tx.send(WANT_READY);
221
222                if !data_rx.is_terminated() {
223                    if let Some(chunk) = ready!(Pin::new(data_rx).poll_next(cx)?) {
224                        len.sub_if(chunk.len() as u64);
225                        return Poll::Ready(Some(Ok(Frame::data(chunk))));
226                    }
227                }
228
229                // check trailers after data is terminated
230                match ready!(Pin::new(trailers_rx).poll(cx)) {
231                    Ok(t) => Poll::Ready(Some(Ok(Frame::trailers(t)))),
232                    Err(_) => Poll::Ready(None),
233                }
234            }
235            #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
236            Kind::H2 {
237                ref mut data_done,
238                ref ping,
239                recv: ref mut h2,
240                content_length: ref mut len,
241            } => {
242                if !*data_done {
243                    match ready!(h2.poll_data(cx)) {
244                        Some(Ok(bytes)) => {
245                            let _ = h2.flow_control().release_capacity(bytes.len());
246                            len.sub_if(bytes.len() as u64);
247                            ping.record_data(bytes.len());
248                            return Poll::Ready(Some(Ok(Frame::data(bytes))));
249                        }
250                        Some(Err(e)) => {
251                            return match e.reason() {
252                                // These reasons should cause the body reading to stop, but not fail it.
253                                // The same logic as for `Read for H2Upgraded` is applied here.
254                                Some(h2::Reason::NO_ERROR) | Some(h2::Reason::CANCEL) => {
255                                    Poll::Ready(None)
256                                }
257                                _ => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
258                            };
259                        }
260                        None => {
261                            *data_done = true;
262                            // fall through to trailers
263                        }
264                    }
265                }
266
267                // after data, check trailers
268                match ready!(h2.poll_trailers(cx)) {
269                    Ok(t) => {
270                        ping.record_non_data();
271                        Poll::Ready(Ok(t.map(Frame::trailers)).transpose())
272                    }
273                    Err(e) => Poll::Ready(Some(Err(crate::Error::new_h2(e)))),
274                }
275            }
276
277            #[cfg(feature = "ffi")]
278            Kind::Ffi(ref mut body) => body.poll_data(cx),
279        }
280    }
281
282    fn is_end_stream(&self) -> bool {
283        match self.kind {
284            Kind::Empty => true,
285            #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
286            Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO,
287            #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
288            Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(),
289            #[cfg(feature = "ffi")]
290            Kind::Ffi(..) => false,
291        }
292    }
293
294    fn size_hint(&self) -> SizeHint {
295        #[cfg(all(
296            any(feature = "http1", feature = "http2"),
297            any(feature = "client", feature = "server")
298        ))]
299        fn opt_len(decoded_length: DecodedLength) -> SizeHint {
300            if let Some(content_length) = decoded_length.into_opt() {
301                SizeHint::with_exact(content_length)
302            } else {
303                SizeHint::default()
304            }
305        }
306
307        match self.kind {
308            Kind::Empty => SizeHint::with_exact(0),
309            #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
310            Kind::Chan { content_length, .. } => opt_len(content_length),
311            #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
312            Kind::H2 { content_length, .. } => opt_len(content_length),
313            #[cfg(feature = "ffi")]
314            Kind::Ffi(..) => SizeHint::default(),
315        }
316    }
317}
318
319impl fmt::Debug for Incoming {
320    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
321        #[cfg(any(
322            all(
323                any(feature = "http1", feature = "http2"),
324                any(feature = "client", feature = "server")
325            ),
326            feature = "ffi"
327        ))]
328        #[derive(Debug)]
329        struct Streaming;
330        #[derive(Debug)]
331        struct Empty;
332
333        let mut builder = f.debug_tuple("Body");
334        match self.kind {
335            Kind::Empty => builder.field(&Empty),
336            #[cfg(any(
337                all(
338                    any(feature = "http1", feature = "http2"),
339                    any(feature = "client", feature = "server")
340                ),
341                feature = "ffi"
342            ))]
343            _ => builder.field(&Streaming),
344        };
345
346        builder.finish()
347    }
348}
349
350#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
351impl Sender {
352    /// Check to see if this `Sender` can send more data.
353    pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
354        // Check if the receiver end has tried polling for the body yet
355        ready!(self.poll_want(cx)?);
356        self.data_tx
357            .poll_ready(cx)
358            .map_err(|_| crate::Error::new_closed())
359    }
360
361    fn poll_want(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
362        match self.want_rx.load(cx) {
363            WANT_READY => Poll::Ready(Ok(())),
364            WANT_PENDING => Poll::Pending,
365            watch::CLOSED => Poll::Ready(Err(crate::Error::new_closed())),
366            unexpected => unreachable!("want_rx value: {}", unexpected),
367        }
368    }
369
370    #[cfg(test)]
371    async fn ready(&mut self) -> crate::Result<()> {
372        futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
373    }
374
375    /// Send data on data channel when it is ready.
376    #[cfg(test)]
377    #[allow(unused)]
378    pub(crate) async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> {
379        self.ready().await?;
380        self.data_tx
381            .try_send(Ok(chunk))
382            .map_err(|_| crate::Error::new_closed())
383    }
384
385    /// Send trailers on trailers channel.
386    #[allow(unused)]
387    pub(crate) async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> {
388        let tx = match self.trailers_tx.take() {
389            Some(tx) => tx,
390            None => return Err(crate::Error::new_closed()),
391        };
392        tx.send(trailers).map_err(|_| crate::Error::new_closed())
393    }
394
395    /// Try to send data on this channel.
396    ///
397    /// # Errors
398    ///
399    /// Returns `Err(Bytes)` if the channel could not (currently) accept
400    /// another `Bytes`.
401    ///
402    /// # Note
403    ///
404    /// This is mostly useful for when trying to send from some other thread
405    /// that doesn't have an async context. If in an async context, prefer
406    /// `send_data()` instead.
407    #[cfg(feature = "http1")]
408    pub(crate) fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
409        self.data_tx
410            .try_send(Ok(chunk))
411            .map_err(|err| err.into_inner().expect("just sent Ok"))
412    }
413
414    #[cfg(feature = "http1")]
415    pub(crate) fn try_send_trailers(
416        &mut self,
417        trailers: HeaderMap,
418    ) -> Result<(), Option<HeaderMap>> {
419        let tx = match self.trailers_tx.take() {
420            Some(tx) => tx,
421            None => return Err(None),
422        };
423
424        tx.send(trailers).map_err(Some)
425    }
426
427    #[cfg(test)]
428    pub(crate) fn abort(mut self) {
429        self.send_error(crate::Error::new_body_write_aborted());
430    }
431
432    pub(crate) fn send_error(&mut self, err: crate::Error) {
433        let _ = self
434            .data_tx
435            // clone so the send works even if buffer is full
436            .clone()
437            .try_send(Err(err));
438    }
439}
440
441#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
442impl fmt::Debug for Sender {
443    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
444        #[derive(Debug)]
445        struct Open;
446        #[derive(Debug)]
447        struct Closed;
448
449        let mut builder = f.debug_tuple("Sender");
450        match self.want_rx.peek() {
451            watch::CLOSED => builder.field(&Closed),
452            _ => builder.field(&Open),
453        };
454
455        builder.finish()
456    }
457}
458
459#[cfg(test)]
460mod tests {
461    use std::mem;
462    use std::task::Poll;
463
464    use super::{Body, Incoming, SizeHint};
465    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
466    use super::{DecodedLength, Sender};
467    use http_body_util::BodyExt;
468
469    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
470    #[test]
471    fn test_size_of() {
472        // These are mostly to help catch *accidentally* increasing
473        // the size by too much.
474
475        let body_size = mem::size_of::<Incoming>();
476        let body_expected_size = mem::size_of::<u64>() * 5;
477        assert!(
478            body_size <= body_expected_size,
479            "Body size = {} <= {}",
480            body_size,
481            body_expected_size,
482        );
483
484        //assert_eq!(body_size, mem::size_of::<Option<Incoming>>(), "Option<Incoming>");
485
486        assert_eq!(
487            mem::size_of::<Sender>(),
488            mem::size_of::<usize>() * 5,
489            "Sender"
490        );
491
492        assert_eq!(
493            mem::size_of::<Sender>(),
494            mem::size_of::<Option<Sender>>(),
495            "Option<Sender>"
496        );
497    }
498
499    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
500    #[test]
501    fn size_hint() {
502        fn eq(body: Incoming, b: SizeHint, note: &str) {
503            let a = body.size_hint();
504            assert_eq!(a.lower(), b.lower(), "lower for {:?}", note);
505            assert_eq!(a.upper(), b.upper(), "upper for {:?}", note);
506        }
507
508        eq(Incoming::empty(), SizeHint::with_exact(0), "empty");
509
510        eq(Incoming::channel().1, SizeHint::new(), "channel");
511
512        eq(
513            Incoming::new_channel(DecodedLength::new(4), /*wanter =*/ false).1,
514            SizeHint::with_exact(4),
515            "channel with length",
516        );
517    }
518
519    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
520    #[cfg(not(miri))]
521    #[tokio::test]
522    async fn channel_abort() {
523        let (tx, mut rx) = Incoming::channel();
524
525        tx.abort();
526
527        let err = rx.frame().await.unwrap().unwrap_err();
528        assert!(err.is_body_write_aborted(), "{:?}", err);
529    }
530
531    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
532    #[cfg(all(not(miri), feature = "http1"))]
533    #[tokio::test]
534    async fn channel_abort_when_buffer_is_full() {
535        let (mut tx, mut rx) = Incoming::channel();
536
537        tx.try_send_data("chunk 1".into()).expect("send 1");
538        // buffer is full, but can still send abort
539        tx.abort();
540
541        let chunk1 = rx
542            .frame()
543            .await
544            .expect("item 1")
545            .expect("chunk 1")
546            .into_data()
547            .unwrap();
548        assert_eq!(chunk1, "chunk 1");
549
550        let err = rx.frame().await.unwrap().unwrap_err();
551        assert!(err.is_body_write_aborted(), "{:?}", err);
552    }
553
554    #[cfg(feature = "http1")]
555    #[test]
556    fn channel_buffers_one() {
557        let (mut tx, _rx) = Incoming::channel();
558
559        tx.try_send_data("chunk 1".into()).expect("send 1");
560
561        // buffer is now full
562        let chunk2 = tx.try_send_data("chunk 2".into()).expect_err("send 2");
563        assert_eq!(chunk2, "chunk 2");
564    }
565
566    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
567    #[cfg(not(miri))]
568    #[tokio::test]
569    async fn channel_empty() {
570        let (_, mut rx) = Incoming::channel();
571
572        assert!(rx.frame().await.is_none());
573    }
574
575    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
576    #[test]
577    fn channel_ready() {
578        let (mut tx, _rx) = Incoming::new_channel(DecodedLength::CHUNKED, /*wanter = */ false);
579
580        let mut tx_ready = tokio_test::task::spawn(tx.ready());
581
582        assert!(tx_ready.poll().is_ready(), "tx is ready immediately");
583    }
584
585    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
586    #[test]
587    fn channel_wanter() {
588        let (mut tx, mut rx) =
589            Incoming::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);
590
591        let mut tx_ready = tokio_test::task::spawn(tx.ready());
592        let mut rx_data = tokio_test::task::spawn(rx.frame());
593
594        assert!(
595            tx_ready.poll().is_pending(),
596            "tx isn't ready before rx has been polled"
597        );
598
599        assert!(rx_data.poll().is_pending(), "poll rx.data");
600        assert!(tx_ready.is_woken(), "rx poll wakes tx");
601
602        assert!(
603            tx_ready.poll().is_ready(),
604            "tx is ready after rx has been polled"
605        );
606    }
607
608    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
609    #[test]
610    fn channel_notices_closure() {
611        let (mut tx, rx) = Incoming::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);
612
613        let mut tx_ready = tokio_test::task::spawn(tx.ready());
614
615        assert!(
616            tx_ready.poll().is_pending(),
617            "tx isn't ready before rx has been polled"
618        );
619
620        drop(rx);
621        assert!(tx_ready.is_woken(), "dropping rx wakes tx");
622
623        match tx_ready.poll() {
624            Poll::Ready(Err(ref e)) if e.is_closed() => (),
625            unexpected => panic!("tx poll ready unexpected: {:?}", unexpected),
626        }
627    }
628}