Skip to main content

hyper/body/
incoming.rs

1use std::fmt;
2#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use bytes::Bytes;
8#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
9use futures_channel::{mpsc, oneshot};
10#[cfg(all(
11    any(feature = "http1", feature = "http2"),
12    any(feature = "client", feature = "server")
13))]
14use futures_core::ready;
15#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
16use futures_core::{stream::FusedStream, Stream}; // for mpsc::Receiver
17#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
18use http::HeaderMap;
19use http_body::{Body, Frame, SizeHint};
20
21#[cfg(all(
22    any(feature = "http1", feature = "http2"),
23    any(feature = "client", feature = "server")
24))]
25use super::DecodedLength;
26#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
27use crate::common::watch;
28#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
29use crate::proto::h2::ping;
30
31#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
32type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
33#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
34type TrailersSender = oneshot::Sender<HeaderMap>;
35
36/// A stream of `Bytes`, used when receiving bodies from the network.
37///
38/// Note that Users should not instantiate this struct directly. When working with the hyper client,
39/// `Incoming` is returned to you in responses. Similarly, when operating with the hyper server,
40/// it is provided within requests.
41///
42/// # Examples
43///
44/// ```rust,ignore
45/// async fn echo(
46///    req: Request<hyper::body::Incoming>,
47/// ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
48///    //Here, you can process `Incoming`
49/// }
50/// ```
51#[must_use = "streams do nothing unless polled"]
52pub struct Incoming {
53    kind: Kind,
54}
55
56enum Kind {
57    Empty,
58    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
59    Chan {
60        content_length: DecodedLength,
61        want_tx: watch::Sender,
62        data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
63        trailers_rx: oneshot::Receiver<HeaderMap>,
64    },
65    #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
66    H2 {
67        content_length: DecodedLength,
68        data_done: bool,
69        ping: ping::Recorder,
70        recv: h2::RecvStream,
71    },
72    #[cfg(feature = "ffi")]
73    Ffi(crate::ffi::UserBody),
74}
75
76/// A sender half created through [`Body::channel()`].
77///
78/// Useful when wanting to stream chunks from another thread.
79///
80/// ## Body Closing
81///
82/// Note that the request body will always be closed normally when the sender is dropped (meaning
83/// that the empty terminating chunk will be sent to the remote). If you desire to close the
84/// connection with an incomplete response (e.g. in the case of an error during asynchronous
85/// processing), call the [`Sender::abort()`] method to abort the body in an abnormal fashion.
86///
87/// [`Body::channel()`]: struct.Body.html#method.channel
88/// [`Sender::abort()`]: struct.Sender.html#method.abort
89#[must_use = "Sender does nothing unless sent on"]
90#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
91pub(crate) struct Sender {
92    want_rx: watch::Receiver,
93    data_tx: BodySender,
94    trailers_tx: Option<TrailersSender>,
95}
96
97#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
98const WANT_PENDING: usize = 1;
99#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
100const WANT_READY: usize = 2;
101
102impl Incoming {
103    /// Create a `Body` stream with an associated sender half.
104    ///
105    /// Useful when wanting to stream chunks from another thread.
106    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
107    #[inline]
108    #[cfg(test)]
109    pub(crate) fn channel() -> (Sender, Incoming) {
110        Self::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false)
111    }
112
113    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
114    pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Incoming) {
115        let (data_tx, data_rx) = mpsc::channel(0);
116        let (trailers_tx, trailers_rx) = oneshot::channel();
117
118        // If wanter is true, `Sender::poll_ready()` won't becoming ready
119        // until the `Body` has been polled for data once.
120        let want = if wanter { WANT_PENDING } else { WANT_READY };
121
122        let (want_tx, want_rx) = watch::channel(want);
123
124        let tx = Sender {
125            want_rx,
126            data_tx,
127            trailers_tx: Some(trailers_tx),
128        };
129        let rx = Incoming::new(Kind::Chan {
130            content_length,
131            want_tx,
132            data_rx,
133            trailers_rx,
134        });
135
136        (tx, rx)
137    }
138
139    fn new(kind: Kind) -> Incoming {
140        Incoming { kind }
141    }
142
143    #[allow(dead_code)]
144    pub(crate) fn empty() -> Incoming {
145        Incoming::new(Kind::Empty)
146    }
147
148    #[cfg(feature = "ffi")]
149    pub(crate) fn ffi() -> Incoming {
150        Incoming::new(Kind::Ffi(crate::ffi::UserBody::new()))
151    }
152
153    #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
154    pub(crate) fn h2(
155        recv: h2::RecvStream,
156        mut content_length: DecodedLength,
157        ping: ping::Recorder,
158    ) -> Self {
159        // If the stream is already EOS, then the "unknown length" is clearly
160        // actually ZERO.
161        if !content_length.is_exact() && recv.is_end_stream() {
162            content_length = DecodedLength::ZERO;
163        }
164
165        Incoming::new(Kind::H2 {
166            data_done: false,
167            ping,
168            content_length,
169            recv,
170        })
171    }
172
173    #[cfg(feature = "ffi")]
174    pub(crate) fn as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody {
175        match self.kind {
176            Kind::Ffi(ref mut body) => return body,
177            _ => {
178                self.kind = Kind::Ffi(crate::ffi::UserBody::new());
179            }
180        }
181
182        match self.kind {
183            Kind::Ffi(ref mut body) => body,
184            _ => unreachable!(),
185        }
186    }
187}
188
189impl Body for Incoming {
190    type Data = Bytes;
191    type Error = crate::Error;
192
193    fn poll_frame(
194        #[cfg_attr(
195            not(all(
196                any(feature = "http1", feature = "http2"),
197                any(feature = "client", feature = "server")
198            )),
199            allow(unused_mut)
200        )]
201        mut self: Pin<&mut Self>,
202        #[cfg_attr(
203            not(all(
204                any(feature = "http1", feature = "http2"),
205                any(feature = "client", feature = "server")
206            )),
207            allow(unused_variables)
208        )]
209        cx: &mut Context<'_>,
210    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
211        match self.kind {
212            Kind::Empty => Poll::Ready(None),
213            #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
214            Kind::Chan {
215                content_length: ref mut len,
216                ref mut data_rx,
217                ref mut want_tx,
218                ref mut trailers_rx,
219            } => {
220                want_tx.send(WANT_READY);
221
222                if !data_rx.is_terminated() {
223                    if let Some(chunk) = ready!(Pin::new(data_rx).poll_next(cx)?) {
224                        len.sub_if(chunk.len() as u64);
225                        return Poll::Ready(Some(Ok(Frame::data(chunk))));
226                    }
227                }
228
229                // check trailers after data is terminated
230                match ready!(Pin::new(trailers_rx).poll(cx)) {
231                    Ok(t) => Poll::Ready(Some(Ok(Frame::trailers(t)))),
232                    Err(_) => Poll::Ready(None),
233                }
234            }
235            #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
236            Kind::H2 {
237                ref mut data_done,
238                ref ping,
239                recv: ref mut h2,
240                content_length: ref mut len,
241            } => {
242                if !*data_done {
243                    match ready!(h2.poll_data(cx)) {
244                        Some(Ok(bytes)) => {
245                            let _ = h2.flow_control().release_capacity(bytes.len());
246                            len.sub_if(bytes.len() as u64);
247                            ping.record_data(bytes.len());
248                            return Poll::Ready(Some(Ok(Frame::data(bytes))));
249                        }
250                        Some(Err(e)) => {
251                            if let Some(h2::Reason::NO_ERROR) = e.reason() {
252                                // As mentioned in RFC 7540 Section 8.1, a RST_STREAM with NO_ERROR
253                                // indicates an early response, and should cause the body reading
254                                // to stop, but not fail it:
255                                return Poll::Ready(None);
256                            } else {
257                                return Poll::Ready(Some(Err(crate::Error::new_body(e))));
258                            }
259                        }
260                        None => {
261                            *data_done = true;
262                            // fall through to trailers
263                        }
264                    }
265                }
266
267                // after data, check trailers
268                match ready!(h2.poll_trailers(cx)) {
269                    Ok(t) => {
270                        ping.record_non_data();
271                        Poll::Ready(Ok(t.map(Frame::trailers)).transpose())
272                    }
273                    Err(e) => {
274                        if let Some(h2::Reason::NO_ERROR) = e.reason() {
275                            // Same as above, a RST_STREAM with NO_ERROR indicates an early
276                            // response, and should cause reading the trailers to stop, but
277                            // not fail it:
278                            Poll::Ready(None)
279                        } else {
280                            Poll::Ready(Some(Err(crate::Error::new_h2(e))))
281                        }
282                    }
283                }
284            }
285
286            #[cfg(feature = "ffi")]
287            Kind::Ffi(ref mut body) => body.poll_data(cx),
288        }
289    }
290
291    fn is_end_stream(&self) -> bool {
292        match self.kind {
293            Kind::Empty => true,
294            #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
295            Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO,
296            #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
297            Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(),
298            #[cfg(feature = "ffi")]
299            Kind::Ffi(..) => false,
300        }
301    }
302
303    fn size_hint(&self) -> SizeHint {
304        #[cfg(all(
305            any(feature = "http1", feature = "http2"),
306            any(feature = "client", feature = "server")
307        ))]
308        fn opt_len(decoded_length: DecodedLength) -> SizeHint {
309            if let Some(content_length) = decoded_length.into_opt() {
310                SizeHint::with_exact(content_length)
311            } else {
312                SizeHint::default()
313            }
314        }
315
316        match self.kind {
317            Kind::Empty => SizeHint::with_exact(0),
318            #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
319            Kind::Chan { content_length, .. } => opt_len(content_length),
320            #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
321            Kind::H2 { content_length, .. } => opt_len(content_length),
322            #[cfg(feature = "ffi")]
323            Kind::Ffi(..) => SizeHint::default(),
324        }
325    }
326}
327
328impl fmt::Debug for Incoming {
329    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
330        #[cfg(any(
331            all(
332                any(feature = "http1", feature = "http2"),
333                any(feature = "client", feature = "server")
334            ),
335            feature = "ffi"
336        ))]
337        #[derive(Debug)]
338        struct Streaming;
339        #[derive(Debug)]
340        struct Empty;
341
342        let mut builder = f.debug_tuple("Body");
343        match self.kind {
344            Kind::Empty => builder.field(&Empty),
345            #[cfg(any(
346                all(
347                    any(feature = "http1", feature = "http2"),
348                    any(feature = "client", feature = "server")
349                ),
350                feature = "ffi"
351            ))]
352            _ => builder.field(&Streaming),
353        };
354
355        builder.finish()
356    }
357}
358
359#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
360impl Sender {
361    /// Check to see if this `Sender` can send more data.
362    pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
363        // Check if the receiver end has tried polling for the body yet
364        ready!(self.poll_want(cx)?);
365        self.data_tx
366            .poll_ready(cx)
367            .map_err(|_| crate::Error::new_closed())
368    }
369
370    fn poll_want(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
371        match self.want_rx.load(cx) {
372            WANT_READY => Poll::Ready(Ok(())),
373            WANT_PENDING => Poll::Pending,
374            watch::CLOSED => Poll::Ready(Err(crate::Error::new_closed())),
375            unexpected => unreachable!("want_rx value: {}", unexpected),
376        }
377    }
378
379    #[cfg(test)]
380    async fn ready(&mut self) -> crate::Result<()> {
381        futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
382    }
383
384    /// Send data on data channel when it is ready.
385    #[cfg(test)]
386    #[allow(unused)]
387    pub(crate) async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> {
388        self.ready().await?;
389        self.data_tx
390            .try_send(Ok(chunk))
391            .map_err(|_| crate::Error::new_closed())
392    }
393
394    /// Send trailers on trailers channel.
395    #[allow(unused)]
396    pub(crate) async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> {
397        let tx = match self.trailers_tx.take() {
398            Some(tx) => tx,
399            None => return Err(crate::Error::new_closed()),
400        };
401        tx.send(trailers).map_err(|_| crate::Error::new_closed())
402    }
403
404    /// Try to send data on this channel.
405    ///
406    /// # Errors
407    ///
408    /// Returns `Err(Bytes)` if the channel could not (currently) accept
409    /// another `Bytes`.
410    ///
411    /// # Note
412    ///
413    /// This is mostly useful for when trying to send from some other thread
414    /// that doesn't have an async context. If in an async context, prefer
415    /// `send_data()` instead.
416    #[cfg(feature = "http1")]
417    pub(crate) fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
418        self.data_tx
419            .try_send(Ok(chunk))
420            .map_err(|err| err.into_inner().expect("just sent Ok"))
421    }
422
423    #[cfg(feature = "http1")]
424    pub(crate) fn try_send_trailers(
425        &mut self,
426        trailers: HeaderMap,
427    ) -> Result<(), Option<HeaderMap>> {
428        let tx = match self.trailers_tx.take() {
429            Some(tx) => tx,
430            None => return Err(None),
431        };
432
433        tx.send(trailers).map_err(Some)
434    }
435
436    #[cfg(test)]
437    pub(crate) fn abort(mut self) {
438        self.send_error(crate::Error::new_body_write_aborted());
439    }
440
441    pub(crate) fn send_error(&mut self, err: crate::Error) {
442        let _ = self
443            .data_tx
444            // clone so the send works even if buffer is full
445            .clone()
446            .try_send(Err(err));
447    }
448}
449
450#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
451impl fmt::Debug for Sender {
452    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
453        #[derive(Debug)]
454        struct Open;
455        #[derive(Debug)]
456        struct Closed;
457
458        let mut builder = f.debug_tuple("Sender");
459        match self.want_rx.peek() {
460            watch::CLOSED => builder.field(&Closed),
461            _ => builder.field(&Open),
462        };
463
464        builder.finish()
465    }
466}
467
468#[cfg(test)]
469mod tests {
470    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
471    use std::mem;
472    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
473    use std::task::Poll;
474
475    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
476    use super::{Body, Incoming, SizeHint};
477    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
478    use super::{DecodedLength, Sender};
479    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
480    use http_body_util::BodyExt;
481
482    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
483    #[test]
484    fn test_size_of() {
485        // These are mostly to help catch *accidentally* increasing
486        // the size by too much.
487
488        let body_size = mem::size_of::<Incoming>();
489        let body_expected_size = mem::size_of::<u64>() * 5;
490        assert!(
491            body_size <= body_expected_size,
492            "Body size = {} <= {}",
493            body_size,
494            body_expected_size,
495        );
496
497        //assert_eq!(body_size, mem::size_of::<Option<Incoming>>(), "Option<Incoming>");
498
499        assert_eq!(
500            mem::size_of::<Sender>(),
501            mem::size_of::<usize>() * 5,
502            "Sender"
503        );
504
505        assert_eq!(
506            mem::size_of::<Sender>(),
507            mem::size_of::<Option<Sender>>(),
508            "Option<Sender>"
509        );
510    }
511
512    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
513    #[test]
514    fn size_hint() {
515        fn eq(body: Incoming, b: SizeHint, note: &str) {
516            let a = body.size_hint();
517            assert_eq!(a.lower(), b.lower(), "lower for {:?}", note);
518            assert_eq!(a.upper(), b.upper(), "upper for {:?}", note);
519        }
520
521        eq(Incoming::empty(), SizeHint::with_exact(0), "empty");
522
523        eq(Incoming::channel().1, SizeHint::new(), "channel");
524
525        eq(
526            Incoming::new_channel(DecodedLength::new(4), /*wanter =*/ false).1,
527            SizeHint::with_exact(4),
528            "channel with length",
529        );
530    }
531
532    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
533    #[cfg(not(miri))]
534    #[tokio::test]
535    async fn channel_abort() {
536        let (tx, mut rx) = Incoming::channel();
537
538        tx.abort();
539
540        let err = rx.frame().await.unwrap().unwrap_err();
541        assert!(err.is_body_write_aborted(), "{:?}", err);
542    }
543
544    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
545    #[cfg(all(not(miri), feature = "http1"))]
546    #[tokio::test]
547    async fn channel_abort_when_buffer_is_full() {
548        let (mut tx, mut rx) = Incoming::channel();
549
550        tx.try_send_data("chunk 1".into()).expect("send 1");
551        // buffer is full, but can still send abort
552        tx.abort();
553
554        let chunk1 = rx
555            .frame()
556            .await
557            .expect("item 1")
558            .expect("chunk 1")
559            .into_data()
560            .unwrap();
561        assert_eq!(chunk1, "chunk 1");
562
563        let err = rx.frame().await.unwrap().unwrap_err();
564        assert!(err.is_body_write_aborted(), "{:?}", err);
565    }
566
567    #[cfg(feature = "http1")]
568    #[test]
569    fn channel_buffers_one() {
570        let (mut tx, _rx) = Incoming::channel();
571
572        tx.try_send_data("chunk 1".into()).expect("send 1");
573
574        // buffer is now full
575        let chunk2 = tx.try_send_data("chunk 2".into()).expect_err("send 2");
576        assert_eq!(chunk2, "chunk 2");
577    }
578
579    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
580    #[cfg(not(miri))]
581    #[tokio::test]
582    async fn channel_empty() {
583        let (_, mut rx) = Incoming::channel();
584
585        assert!(rx.frame().await.is_none());
586    }
587
588    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
589    #[test]
590    fn channel_ready() {
591        let (mut tx, _rx) = Incoming::new_channel(DecodedLength::CHUNKED, /*wanter = */ false);
592
593        let mut tx_ready = tokio_test::task::spawn(tx.ready());
594
595        assert!(tx_ready.poll().is_ready(), "tx is ready immediately");
596    }
597
598    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
599    #[test]
600    fn channel_wanter() {
601        let (mut tx, mut rx) =
602            Incoming::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);
603
604        let mut tx_ready = tokio_test::task::spawn(tx.ready());
605        let mut rx_data = tokio_test::task::spawn(rx.frame());
606
607        assert!(
608            tx_ready.poll().is_pending(),
609            "tx isn't ready before rx has been polled"
610        );
611
612        assert!(rx_data.poll().is_pending(), "poll rx.data");
613        assert!(tx_ready.is_woken(), "rx poll wakes tx");
614
615        assert!(
616            tx_ready.poll().is_ready(),
617            "tx is ready after rx has been polled"
618        );
619    }
620
621    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
622    #[test]
623    fn channel_notices_closure() {
624        let (mut tx, rx) = Incoming::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);
625
626        let mut tx_ready = tokio_test::task::spawn(tx.ready());
627
628        assert!(
629            tx_ready.poll().is_pending(),
630            "tx isn't ready before rx has been polled"
631        );
632
633        drop(rx);
634        assert!(tx_ready.is_woken(), "dropping rx wakes tx");
635
636        match tx_ready.poll() {
637            Poll::Ready(Err(ref e)) if e.is_closed() => (),
638            unexpected => panic!("tx poll ready unexpected: {:?}", unexpected),
639        }
640    }
641}