1use std::fmt;
2#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use bytes::Bytes;
8#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
9use futures_channel::{mpsc, oneshot};
10#[cfg(all(
11 any(feature = "http1", feature = "http2"),
12 any(feature = "client", feature = "server")
13))]
14use futures_core::ready;
15#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
16use futures_core::{stream::FusedStream, Stream}; #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
18use http::HeaderMap;
19use http_body::{Body, Frame, SizeHint};
20
21#[cfg(all(
22 any(feature = "http1", feature = "http2"),
23 any(feature = "client", feature = "server")
24))]
25use super::DecodedLength;
26#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
27use crate::common::watch;
28#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
29use crate::proto::h2::ping;
30
/// Sending half of the bounded channel over which a [`Sender`] pushes body
/// data chunks, or a terminal error, to a channel-backed [`Incoming`].
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
/// Sending half of the one-shot channel over which a [`Sender`] delivers the
/// trailer headers.
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
type TrailersSender = oneshot::Sender<HeaderMap>;
35
36#[must_use = "streams do nothing unless polled"]
52pub struct Incoming {
53 kind: Kind,
54}
55
/// The backends an [`Incoming`] body can read from.
enum Kind {
    /// A body with no frames at all; polling it ends immediately.
    Empty,
    /// A body fed through in-process channels by a [`Sender`].
    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
    Chan {
        /// Length reported by `size_hint`; each received chunk's length is
        /// passed to `sub_if` on it.
        content_length: DecodedLength,
        /// Set to `WANT_READY` whenever the body is polled, which is what
        /// `Sender::poll_want` observes.
        want_tx: watch::Sender,
        /// Data chunks (or an error) sent by the `Sender`.
        data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
        /// Trailers, polled only once `data_rx` yields no more data.
        trailers_rx: oneshot::Receiver<HeaderMap>,
    },
    /// A body read from an `h2` receive stream.
    #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
    H2 {
        /// Length reported by `size_hint`; each received chunk's length is
        /// passed to `sub_if` on it.
        content_length: DecodedLength,
        /// Set once `poll_data` has returned `None`, so later polls go
        /// straight to the trailers.
        data_done: bool,
        /// Notified about every data chunk and about trailers.
        ping: ping::Recorder,
        /// The underlying stream that data and trailers are polled from.
        recv: h2::RecvStream,
    },
    /// A body whose frames are produced by the FFI user body.
    #[cfg(feature = "ffi")]
    Ffi(crate::ffi::UserBody),
}
75
/// The producing half paired with a channel-backed [`Incoming`].
///
/// Created together with its body by `Incoming::new_channel`.
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
#[must_use = "Sender does nothing unless sent on"]
pub(crate) struct Sender {
    /// Reports whether the body has asked for data yet, or has been closed.
    want_rx: watch::Receiver,
    /// Channel carrying data chunks (or an error) to the body.
    data_tx: BodySender,
    /// One-shot channel for trailers; `None` once a send has been attempted.
    trailers_tx: Option<TrailersSender>,
}
96
/// Watch value meaning the body has not been polled yet; `Sender::poll_want`
/// reports `Poll::Pending` while it is set.
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
const WANT_PENDING: usize = 1;
/// Watch value meaning the body wants data; `Sender::poll_want` reports
/// readiness once it is set.
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
const WANT_READY: usize = 2;
101
102impl Incoming {
103 #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
107 #[inline]
108 #[cfg(test)]
109 pub(crate) fn channel() -> (Sender, Incoming) {
110 Self::new_channel(DecodedLength::CHUNKED, false)
111 }
112
113 #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
114 pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Incoming) {
115 let (data_tx, data_rx) = mpsc::channel(0);
116 let (trailers_tx, trailers_rx) = oneshot::channel();
117
118 let want = if wanter { WANT_PENDING } else { WANT_READY };
121
122 let (want_tx, want_rx) = watch::channel(want);
123
124 let tx = Sender {
125 want_rx,
126 data_tx,
127 trailers_tx: Some(trailers_tx),
128 };
129 let rx = Incoming::new(Kind::Chan {
130 content_length,
131 want_tx,
132 data_rx,
133 trailers_rx,
134 });
135
136 (tx, rx)
137 }
138
139 fn new(kind: Kind) -> Incoming {
140 Incoming { kind }
141 }
142
143 #[allow(dead_code)]
144 pub(crate) fn empty() -> Incoming {
145 Incoming::new(Kind::Empty)
146 }
147
148 #[cfg(feature = "ffi")]
149 pub(crate) fn ffi() -> Incoming {
150 Incoming::new(Kind::Ffi(crate::ffi::UserBody::new()))
151 }
152
153 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
154 pub(crate) fn h2(
155 recv: h2::RecvStream,
156 mut content_length: DecodedLength,
157 ping: ping::Recorder,
158 ) -> Self {
159 if !content_length.is_exact() && recv.is_end_stream() {
162 content_length = DecodedLength::ZERO;
163 }
164
165 Incoming::new(Kind::H2 {
166 data_done: false,
167 ping,
168 content_length,
169 recv,
170 })
171 }
172
173 #[cfg(feature = "ffi")]
174 pub(crate) fn as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody {
175 match self.kind {
176 Kind::Ffi(ref mut body) => return body,
177 _ => {
178 self.kind = Kind::Ffi(crate::ffi::UserBody::new());
179 }
180 }
181
182 match self.kind {
183 Kind::Ffi(ref mut body) => body,
184 _ => unreachable!(),
185 }
186 }
187}
188
189impl Body for Incoming {
190 type Data = Bytes;
191 type Error = crate::Error;
192
193 fn poll_frame(
194 #[cfg_attr(
195 not(all(
196 any(feature = "http1", feature = "http2"),
197 any(feature = "client", feature = "server")
198 )),
199 allow(unused_mut)
200 )]
201 mut self: Pin<&mut Self>,
202 #[cfg_attr(
203 not(all(
204 any(feature = "http1", feature = "http2"),
205 any(feature = "client", feature = "server")
206 )),
207 allow(unused_variables)
208 )]
209 cx: &mut Context<'_>,
210 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
211 match self.kind {
212 Kind::Empty => Poll::Ready(None),
213 #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
214 Kind::Chan {
215 content_length: ref mut len,
216 ref mut data_rx,
217 ref mut want_tx,
218 ref mut trailers_rx,
219 } => {
220 want_tx.send(WANT_READY);
221
222 if !data_rx.is_terminated() {
223 if let Some(chunk) = ready!(Pin::new(data_rx).poll_next(cx)?) {
224 len.sub_if(chunk.len() as u64);
225 return Poll::Ready(Some(Ok(Frame::data(chunk))));
226 }
227 }
228
229 match ready!(Pin::new(trailers_rx).poll(cx)) {
231 Ok(t) => Poll::Ready(Some(Ok(Frame::trailers(t)))),
232 Err(_) => Poll::Ready(None),
233 }
234 }
235 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
236 Kind::H2 {
237 ref mut data_done,
238 ref ping,
239 recv: ref mut h2,
240 content_length: ref mut len,
241 } => {
242 if !*data_done {
243 match ready!(h2.poll_data(cx)) {
244 Some(Ok(bytes)) => {
245 let _ = h2.flow_control().release_capacity(bytes.len());
246 len.sub_if(bytes.len() as u64);
247 ping.record_data(bytes.len());
248 return Poll::Ready(Some(Ok(Frame::data(bytes))));
249 }
250 Some(Err(e)) => {
251 if let Some(h2::Reason::NO_ERROR) = e.reason() {
252 return Poll::Ready(None);
256 } else {
257 return Poll::Ready(Some(Err(crate::Error::new_body(e))));
258 }
259 }
260 None => {
261 *data_done = true;
262 }
264 }
265 }
266
267 match ready!(h2.poll_trailers(cx)) {
269 Ok(t) => {
270 ping.record_non_data();
271 Poll::Ready(Ok(t.map(Frame::trailers)).transpose())
272 }
273 Err(e) => {
274 if let Some(h2::Reason::NO_ERROR) = e.reason() {
275 Poll::Ready(None)
279 } else {
280 Poll::Ready(Some(Err(crate::Error::new_h2(e))))
281 }
282 }
283 }
284 }
285
286 #[cfg(feature = "ffi")]
287 Kind::Ffi(ref mut body) => body.poll_data(cx),
288 }
289 }
290
291 fn is_end_stream(&self) -> bool {
292 match self.kind {
293 Kind::Empty => true,
294 #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
295 Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO,
296 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
297 Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(),
298 #[cfg(feature = "ffi")]
299 Kind::Ffi(..) => false,
300 }
301 }
302
303 fn size_hint(&self) -> SizeHint {
304 #[cfg(all(
305 any(feature = "http1", feature = "http2"),
306 any(feature = "client", feature = "server")
307 ))]
308 fn opt_len(decoded_length: DecodedLength) -> SizeHint {
309 if let Some(content_length) = decoded_length.into_opt() {
310 SizeHint::with_exact(content_length)
311 } else {
312 SizeHint::default()
313 }
314 }
315
316 match self.kind {
317 Kind::Empty => SizeHint::with_exact(0),
318 #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
319 Kind::Chan { content_length, .. } => opt_len(content_length),
320 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
321 Kind::H2 { content_length, .. } => opt_len(content_length),
322 #[cfg(feature = "ffi")]
323 Kind::Ffi(..) => SizeHint::default(),
324 }
325 }
326}
327
328impl fmt::Debug for Incoming {
329 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
330 #[cfg(any(
331 all(
332 any(feature = "http1", feature = "http2"),
333 any(feature = "client", feature = "server")
334 ),
335 feature = "ffi"
336 ))]
337 #[derive(Debug)]
338 struct Streaming;
339 #[derive(Debug)]
340 struct Empty;
341
342 let mut builder = f.debug_tuple("Body");
343 match self.kind {
344 Kind::Empty => builder.field(&Empty),
345 #[cfg(any(
346 all(
347 any(feature = "http1", feature = "http2"),
348 any(feature = "client", feature = "server")
349 ),
350 feature = "ffi"
351 ))]
352 _ => builder.field(&Streaming),
353 };
354
355 builder.finish()
356 }
357}
358
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
impl Sender {
    /// Polls until the body has asked for data and the data channel can
    /// accept another chunk.
    ///
    /// Resolves to a "closed" error if the watch reports `watch::CLOSED` or
    /// the data channel reports an error.
    pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
        // First make sure the receiving side has shown interest in the body.
        match self.poll_want(cx) {
            Poll::Ready(Ok(())) => {}
            Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
            Poll::Pending => return Poll::Pending,
        }

        self.data_tx
            .poll_ready(cx)
            .map_err(|_| crate::Error::new_closed())
    }

    /// Translates the current watch value into readiness.
    ///
    /// # Panics
    ///
    /// Panics if the watch holds a value other than `WANT_READY`,
    /// `WANT_PENDING` or `watch::CLOSED`.
    fn poll_want(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
        match self.want_rx.load(cx) {
            WANT_READY => Poll::Ready(Ok(())),
            WANT_PENDING => Poll::Pending,
            watch::CLOSED => Poll::Ready(Err(crate::Error::new_closed())),
            other => unreachable!("want_rx value: {}", other),
        }
    }

    /// Async wrapper around [`Sender::poll_ready`], for tests.
    #[cfg(test)]
    async fn ready(&mut self) -> crate::Result<()> {
        futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
    }

    /// Waits until the sender is ready, then sends `chunk`; for tests.
    ///
    /// Any failure to enqueue the chunk is reported as a "closed" error.
    #[cfg(test)]
    #[allow(unused)]
    pub(crate) async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> {
        self.ready().await?;

        let sent = self.data_tx.try_send(Ok(chunk));
        sent.map_err(|_| crate::Error::new_closed())
    }

    /// Sends the trailers.
    ///
    /// Fails with a "closed" error if a trailers send was already attempted
    /// or if the one-shot send itself fails.
    #[allow(unused)]
    pub(crate) async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> {
        match self.trailers_tx.take() {
            Some(tx) => tx.send(trailers).map_err(|_| crate::Error::new_closed()),
            None => Err(crate::Error::new_closed()),
        }
    }

    /// Tries to enqueue `chunk` without waiting.
    ///
    /// On failure the chunk is handed back to the caller in `Err`.
    pub(crate) fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
        let sent = self.data_tx.try_send(Ok(chunk));
        sent.map_err(|err| err.into_inner().expect("just sent Ok"))
    }

    /// Tries to send the trailers without waiting.
    ///
    /// Returns `Err(None)` if a trailers send was already attempted, or
    /// `Err(Some(trailers))` with the unsent trailers if the one-shot send
    /// fails.
    pub(crate) fn try_send_trailers(
        &mut self,
        trailers: HeaderMap,
    ) -> Result<(), Option<HeaderMap>> {
        match self.trailers_tx.take() {
            Some(tx) => tx.send(trailers).map_err(Some),
            None => Err(None),
        }
    }

    /// Consumes the sender after sending a "body write aborted" error to the
    /// body; for tests.
    #[cfg(test)]
    pub(crate) fn abort(mut self) {
        let aborted = crate::Error::new_body_write_aborted();
        self.send_error(aborted);
    }

    /// Sends `err` to the body on a best-effort basis; a failed send is
    /// ignored.
    pub(crate) fn send_error(&mut self, err: crate::Error) {
        // The error is sent through a clone of the sender rather than
        // `self.data_tx` itself.
        let mut error_tx = self.data_tx.clone();
        let _ = error_tx.try_send(Err(err));
    }
}
449
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
impl fmt::Debug for Sender {
    /// Formats as `Sender(Closed)` when the watch reports `watch::CLOSED`,
    /// and as `Sender(Open)` otherwise.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Marker types that exist only to supply the field names in the output.
        #[derive(Debug)]
        struct Open;
        #[derive(Debug)]
        struct Closed;

        let mut tuple = f.debug_tuple("Sender");
        if self.want_rx.peek() == watch::CLOSED {
            tuple.field(&Closed);
        } else {
            tuple.field(&Open);
        }

        tuple.finish()
    }
}
467
// Every test here needs the channel-backed body, which only exists with
// `http1` plus `client` or `server`. The gate lives on the module so that no
// single test can be compiled without the imports and constructors it uses.
#[cfg(all(
    test,
    feature = "http1",
    any(feature = "client", feature = "server")
))]
mod tests {
    use std::mem;
    use std::task::Poll;

    use super::{Body, Incoming, SizeHint};
    use super::{DecodedLength, Sender};
    use http_body_util::BodyExt;

    /// Guards against accidentally growing `Incoming` or `Sender`.
    #[test]
    fn test_size_of() {
        let body_size = mem::size_of::<Incoming>();
        let body_expected_size = mem::size_of::<u64>() * 5;
        assert!(
            body_size <= body_expected_size,
            "Body size = {} <= {}",
            body_size,
            body_expected_size,
        );

        assert_eq!(
            mem::size_of::<Sender>(),
            mem::size_of::<usize>() * 5,
            "Sender"
        );

        assert_eq!(
            mem::size_of::<Sender>(),
            mem::size_of::<Option<Sender>>(),
            "Option<Sender>"
        );
    }

    /// The size hint is exact for empty and known-length bodies, and the
    /// default for a chunked channel.
    #[test]
    fn size_hint() {
        fn eq(body: Incoming, b: SizeHint, note: &str) {
            let a = body.size_hint();
            assert_eq!(a.lower(), b.lower(), "lower for {:?}", note);
            assert_eq!(a.upper(), b.upper(), "upper for {:?}", note);
        }

        eq(Incoming::empty(), SizeHint::with_exact(0), "empty");

        eq(Incoming::channel().1, SizeHint::new(), "channel");

        eq(
            Incoming::new_channel(DecodedLength::new(4), false).1,
            SizeHint::with_exact(4),
            "channel with length",
        );
    }

    /// Aborting the sender surfaces a "body write aborted" error on the body.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn channel_abort() {
        let (tx, mut rx) = Incoming::channel();

        tx.abort();

        let err = rx.frame().await.unwrap().unwrap_err();
        assert!(err.is_body_write_aborted(), "{:?}", err);
    }

    /// The abort error is still delivered, after the buffered chunk, when the
    /// channel already holds a chunk.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn channel_abort_when_buffer_is_full() {
        let (mut tx, mut rx) = Incoming::channel();

        tx.try_send_data("chunk 1".into()).expect("send 1");
        tx.abort();

        let chunk1 = rx
            .frame()
            .await
            .expect("item 1")
            .expect("chunk 1")
            .into_data()
            .unwrap();
        assert_eq!(chunk1, "chunk 1");

        let err = rx.frame().await.unwrap().unwrap_err();
        assert!(err.is_body_write_aborted(), "{:?}", err);
    }

    /// One chunk can be buffered without the body being polled; the second
    /// `try_send_data` is rejected and hands the chunk back.
    #[test]
    fn channel_buffers_one() {
        let (mut tx, _rx) = Incoming::channel();

        tx.try_send_data("chunk 1".into()).expect("send 1");

        let chunk2 = tx.try_send_data("chunk 2".into()).expect_err("send 2");
        assert_eq!(chunk2, "chunk 2");
    }

    /// Dropping the sender without sending anything ends the body.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn channel_empty() {
        let (_, mut rx) = Incoming::channel();

        assert!(rx.frame().await.is_none());
    }

    /// Without a wanter, the sender is ready before the body is ever polled.
    #[test]
    fn channel_ready() {
        let (mut tx, _rx) = Incoming::new_channel(DecodedLength::CHUNKED, false);

        let mut tx_ready = tokio_test::task::spawn(tx.ready());

        assert!(tx_ready.poll().is_ready(), "tx is ready immediately");
    }

    /// With a wanter, the sender only becomes ready after the body has been
    /// polled, and that poll wakes the sender.
    #[test]
    fn channel_wanter() {
        let (mut tx, mut rx) = Incoming::new_channel(DecodedLength::CHUNKED, true);

        let mut tx_ready = tokio_test::task::spawn(tx.ready());
        let mut rx_data = tokio_test::task::spawn(rx.frame());

        assert!(
            tx_ready.poll().is_pending(),
            "tx isn't ready before rx has been polled"
        );

        assert!(rx_data.poll().is_pending(), "poll rx.data");
        assert!(tx_ready.is_woken(), "rx poll wakes tx");

        assert!(
            tx_ready.poll().is_ready(),
            "tx is ready after rx has been polled"
        );
    }

    /// Dropping the body wakes a pending sender, which then sees a "closed"
    /// error.
    #[test]
    fn channel_notices_closure() {
        let (mut tx, rx) = Incoming::new_channel(DecodedLength::CHUNKED, true);

        let mut tx_ready = tokio_test::task::spawn(tx.ready());

        assert!(
            tx_ready.poll().is_pending(),
            "tx isn't ready before rx has been polled"
        );

        drop(rx);
        assert!(tx_ready.is_woken(), "dropping rx wakes tx");

        match tx_ready.poll() {
            Poll::Ready(Err(ref e)) if e.is_closed() => (),
            unexpected => panic!("tx poll ready unexpected: {:?}", unexpected),
        }
    }
}