1use std::fmt;
2#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use bytes::Bytes;
8#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
9use futures_channel::{mpsc, oneshot};
10#[cfg(all(
11 any(feature = "http1", feature = "http2"),
12 any(feature = "client", feature = "server")
13))]
14use futures_core::ready;
15#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
16use futures_core::{stream::FusedStream, Stream}; #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
18use http::HeaderMap;
19use http_body::{Body, Frame, SizeHint};
20
21#[cfg(all(
22 any(feature = "http1", feature = "http2"),
23 any(feature = "client", feature = "server")
24))]
25use super::DecodedLength;
26#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
27use crate::common::watch;
28#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
29use crate::proto::h2::ping;
30
/// Sending half of the data channel: each item is either a chunk of body
/// bytes or the error that terminates the body.
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
/// One-shot sender used to hand the trailing `HeaderMap` to the body.
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
type TrailersSender = oneshot::Sender<HeaderMap>;
35
36#[must_use = "streams do nothing unless polled"]
52pub struct Incoming {
53 kind: Kind,
54}
55
/// The source backing an `Incoming` body.
enum Kind {
    /// No frames at all: polling yields `None` immediately and the size hint
    /// is exactly zero.
    Empty,
    /// Frames arrive over in-process channels fed by a `Sender`.
    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
    Chan {
        /// Length reported through `size_hint`; adjusted with `sub_if` for
        /// every chunk that is handed out.
        content_length: DecodedLength,
        /// Set to `WANT_READY` on every poll so the paired `Sender` can see
        /// that the body has been polled.
        want_tx: watch::Sender,
        /// Stream of data chunks, or the error that ends the body.
        data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
        /// Polled for trailers once `data_rx` has finished.
        trailers_rx: oneshot::Receiver<HeaderMap>,
    },
    /// Frames are read from an HTTP/2 receive stream.
    #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
    H2 {
        /// Length reported through `size_hint`; adjusted with `sub_if` for
        /// every chunk that is handed out.
        content_length: DecodedLength,
        /// Becomes `true` once `recv.poll_data` has yielded `None`; after that
        /// only trailers are polled.
        data_done: bool,
        /// Notified about every data chunk and about received trailers.
        ping: ping::Recorder,
        /// The underlying HTTP/2 stream.
        recv: h2::RecvStream,
    },
    /// Frames are supplied by an FFI user body.
    #[cfg(feature = "ffi")]
    Ffi(crate::ffi::UserBody),
}
75
/// The sending half paired with an `Incoming` created by
/// `Incoming::new_channel`.
///
/// Data chunks (or a terminating error) go through `data_tx`; trailers go
/// through `trailers_tx`, which can be used at most once.
#[must_use = "Sender does nothing unless sent on"]
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
pub(crate) struct Sender {
    /// Observes the "want" state published by the receiving body.
    want_rx: watch::Receiver,
    /// Channel for data chunks and the terminating error.
    data_tx: BodySender,
    /// `Some` until trailers have been sent (or a send was attempted).
    trailers_tx: Option<TrailersSender>,
}
96
/// "Want" state in which `Sender::poll_want` reports `Poll::Pending`; it is
/// the initial state of a channel created with `wanter == true`.
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
const WANT_PENDING: usize = 1;
/// "Want" state in which `Sender::poll_want` reports readiness; the body
/// publishes it every time it is polled.
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
const WANT_READY: usize = 2;
101
102impl Incoming {
103 #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
107 #[inline]
108 #[cfg(test)]
109 pub(crate) fn channel() -> (Sender, Incoming) {
110 Self::new_channel(DecodedLength::CHUNKED, false)
111 }
112
113 #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
114 pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Incoming) {
115 let (data_tx, data_rx) = mpsc::channel(0);
116 let (trailers_tx, trailers_rx) = oneshot::channel();
117
118 let want = if wanter { WANT_PENDING } else { WANT_READY };
121
122 let (want_tx, want_rx) = watch::channel(want);
123
124 let tx = Sender {
125 want_rx,
126 data_tx,
127 trailers_tx: Some(trailers_tx),
128 };
129 let rx = Incoming::new(Kind::Chan {
130 content_length,
131 want_tx,
132 data_rx,
133 trailers_rx,
134 });
135
136 (tx, rx)
137 }
138
139 fn new(kind: Kind) -> Incoming {
140 Incoming { kind }
141 }
142
143 #[allow(dead_code)]
144 pub(crate) fn empty() -> Incoming {
145 Incoming::new(Kind::Empty)
146 }
147
148 #[cfg(feature = "ffi")]
149 pub(crate) fn ffi() -> Incoming {
150 Incoming::new(Kind::Ffi(crate::ffi::UserBody::new()))
151 }
152
153 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
154 pub(crate) fn h2(
155 recv: h2::RecvStream,
156 mut content_length: DecodedLength,
157 ping: ping::Recorder,
158 ) -> Self {
159 if !content_length.is_exact() && recv.is_end_stream() {
162 content_length = DecodedLength::ZERO;
163 }
164
165 Incoming::new(Kind::H2 {
166 data_done: false,
167 ping,
168 content_length,
169 recv,
170 })
171 }
172
173 #[cfg(feature = "ffi")]
174 pub(crate) fn as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody {
175 match self.kind {
176 Kind::Ffi(ref mut body) => return body,
177 _ => {
178 self.kind = Kind::Ffi(crate::ffi::UserBody::new());
179 }
180 }
181
182 match self.kind {
183 Kind::Ffi(ref mut body) => body,
184 _ => unreachable!(),
185 }
186 }
187}
188
189impl Body for Incoming {
190 type Data = Bytes;
191 type Error = crate::Error;
192
193 fn poll_frame(
194 #[cfg_attr(
195 not(all(
196 any(feature = "http1", feature = "http2"),
197 any(feature = "client", feature = "server")
198 )),
199 allow(unused_mut)
200 )]
201 mut self: Pin<&mut Self>,
202 #[cfg_attr(
203 not(all(
204 any(feature = "http1", feature = "http2"),
205 any(feature = "client", feature = "server")
206 )),
207 allow(unused_variables)
208 )]
209 cx: &mut Context<'_>,
210 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
211 match self.kind {
212 Kind::Empty => Poll::Ready(None),
213 #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
214 Kind::Chan {
215 content_length: ref mut len,
216 ref mut data_rx,
217 ref mut want_tx,
218 ref mut trailers_rx,
219 } => {
220 want_tx.send(WANT_READY);
221
222 if !data_rx.is_terminated() {
223 if let Some(chunk) = ready!(Pin::new(data_rx).poll_next(cx)?) {
224 len.sub_if(chunk.len() as u64);
225 return Poll::Ready(Some(Ok(Frame::data(chunk))));
226 }
227 }
228
229 match ready!(Pin::new(trailers_rx).poll(cx)) {
231 Ok(t) => Poll::Ready(Some(Ok(Frame::trailers(t)))),
232 Err(_) => Poll::Ready(None),
233 }
234 }
235 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
236 Kind::H2 {
237 ref mut data_done,
238 ref ping,
239 recv: ref mut h2,
240 content_length: ref mut len,
241 } => {
242 if !*data_done {
243 match ready!(h2.poll_data(cx)) {
244 Some(Ok(bytes)) => {
245 let _ = h2.flow_control().release_capacity(bytes.len());
246 len.sub_if(bytes.len() as u64);
247 ping.record_data(bytes.len());
248 return Poll::Ready(Some(Ok(Frame::data(bytes))));
249 }
250 Some(Err(e)) => {
251 return match e.reason() {
252 Some(h2::Reason::NO_ERROR) | Some(h2::Reason::CANCEL) => {
255 Poll::Ready(None)
256 }
257 _ => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
258 };
259 }
260 None => {
261 *data_done = true;
262 }
264 }
265 }
266
267 match ready!(h2.poll_trailers(cx)) {
269 Ok(t) => {
270 ping.record_non_data();
271 Poll::Ready(Ok(t.map(Frame::trailers)).transpose())
272 }
273 Err(e) => Poll::Ready(Some(Err(crate::Error::new_h2(e)))),
274 }
275 }
276
277 #[cfg(feature = "ffi")]
278 Kind::Ffi(ref mut body) => body.poll_data(cx),
279 }
280 }
281
282 fn is_end_stream(&self) -> bool {
283 match self.kind {
284 Kind::Empty => true,
285 #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
286 Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO,
287 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
288 Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(),
289 #[cfg(feature = "ffi")]
290 Kind::Ffi(..) => false,
291 }
292 }
293
294 fn size_hint(&self) -> SizeHint {
295 #[cfg(all(
296 any(feature = "http1", feature = "http2"),
297 any(feature = "client", feature = "server")
298 ))]
299 fn opt_len(decoded_length: DecodedLength) -> SizeHint {
300 if let Some(content_length) = decoded_length.into_opt() {
301 SizeHint::with_exact(content_length)
302 } else {
303 SizeHint::default()
304 }
305 }
306
307 match self.kind {
308 Kind::Empty => SizeHint::with_exact(0),
309 #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
310 Kind::Chan { content_length, .. } => opt_len(content_length),
311 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
312 Kind::H2 { content_length, .. } => opt_len(content_length),
313 #[cfg(feature = "ffi")]
314 Kind::Ffi(..) => SizeHint::default(),
315 }
316 }
317}
318
319impl fmt::Debug for Incoming {
320 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
321 #[cfg(any(
322 all(
323 any(feature = "http1", feature = "http2"),
324 any(feature = "client", feature = "server")
325 ),
326 feature = "ffi"
327 ))]
328 #[derive(Debug)]
329 struct Streaming;
330 #[derive(Debug)]
331 struct Empty;
332
333 let mut builder = f.debug_tuple("Body");
334 match self.kind {
335 Kind::Empty => builder.field(&Empty),
336 #[cfg(any(
337 all(
338 any(feature = "http1", feature = "http2"),
339 any(feature = "client", feature = "server")
340 ),
341 feature = "ffi"
342 ))]
343 _ => builder.field(&Streaming),
344 };
345
346 builder.finish()
347 }
348}
349
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
impl Sender {
    /// Checks whether another chunk can be sent.
    ///
    /// Stays pending until the "want" state allows sending, then defers to
    /// the data channel. A closed channel is reported as a "closed" error.
    pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
        // Gate on the receiver's "want" state first.
        match self.poll_want(cx) {
            Poll::Ready(Ok(())) => {}
            Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
            Poll::Pending => return Poll::Pending,
        }

        let channel_state = self.data_tx.poll_ready(cx);
        channel_state.map_err(|_| crate::Error::new_closed())
    }

    /// Translates the current "want" value into a poll result.
    fn poll_want(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
        match self.want_rx.load(cx) {
            WANT_READY => Poll::Ready(Ok(())),
            WANT_PENDING => Poll::Pending,
            watch::CLOSED => Poll::Ready(Err(crate::Error::new_closed())),
            other => unreachable!("want_rx value: {}", other),
        }
    }

    /// Async wrapper around [`Sender::poll_ready`].
    #[cfg(test)]
    async fn ready(&mut self) -> crate::Result<()> {
        futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
    }

    /// Waits until the channel is ready, then sends `chunk`.
    #[cfg(test)]
    #[allow(unused)]
    pub(crate) async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> {
        self.ready().await?;
        let outcome = self.data_tx.try_send(Ok(chunk));
        outcome.map_err(|_| crate::Error::new_closed())
    }

    /// Sends `trailers`.
    ///
    /// Fails with a "closed" error if the trailers sender was already taken
    /// or the receiving side is gone.
    #[allow(unused)]
    pub(crate) async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> {
        match self.trailers_tx.take() {
            Some(tx) => tx.send(trailers).map_err(|_| crate::Error::new_closed()),
            None => Err(crate::Error::new_closed()),
        }
    }

    /// Attempts to send `chunk` without waiting; hands the chunk back on
    /// failure.
    #[cfg(feature = "http1")]
    pub(crate) fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
        match self.data_tx.try_send(Ok(chunk)) {
            Ok(()) => Ok(()),
            // The rejected item is the `Ok(chunk)` that was just passed in.
            Err(err) => Err(err.into_inner().expect("just sent Ok")),
        }
    }

    /// Attempts to send `trailers`.
    ///
    /// Returns `Err(None)` if the trailers sender was already taken, and
    /// `Err(Some(trailers))` if the send itself failed.
    #[cfg(feature = "http1")]
    pub(crate) fn try_send_trailers(
        &mut self,
        trailers: HeaderMap,
    ) -> Result<(), Option<HeaderMap>> {
        match self.trailers_tx.take() {
            Some(tx) => tx.send(trailers).map_err(Some),
            None => Err(None),
        }
    }

    /// Consumes the sender, ending the body with a "body write aborted"
    /// error.
    #[cfg(test)]
    pub(crate) fn abort(mut self) {
        let aborted = crate::Error::new_body_write_aborted();
        self.send_error(aborted);
    }

    /// Sends `err` as the terminating item of the body, ignoring failure.
    ///
    /// The error goes through a clone of the data sender; the
    /// `channel_abort_when_buffer_is_full` test in this file relies on it
    /// still being delivered after a chunk is already queued.
    pub(crate) fn send_error(&mut self, err: crate::Error) {
        let mut error_tx = self.data_tx.clone();
        let _ = error_tx.try_send(Err(err));
    }
}
440
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
impl fmt::Debug for Sender {
    /// Formats as `Sender(Closed)` when the "want" state is `watch::CLOSED`
    /// and as `Sender(Open)` otherwise.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        #[derive(Debug)]
        struct Open;
        #[derive(Debug)]
        struct Closed;

        let mut tuple = f.debug_tuple("Sender");
        if self.want_rx.peek() == watch::CLOSED {
            tuple.field(&Closed);
        } else {
            tuple.field(&Open);
        }

        tuple.finish()
    }
}
458
#[cfg(test)]
mod tests {
    use std::mem;
    use std::task::Poll;

    use super::{Body, Incoming, SizeHint};
    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
    use super::{DecodedLength, Sender};
    use http_body_util::BodyExt;

    /// Guards against accidental growth of `Incoming` and `Sender`.
    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
    #[test]
    fn test_size_of() {
        let body_size = mem::size_of::<Incoming>();
        let body_expected_size = mem::size_of::<u64>() * 5;
        assert!(
            body_size <= body_expected_size,
            "Body size = {} <= {}",
            body_size,
            body_expected_size,
        );

        assert_eq!(
            mem::size_of::<Sender>(),
            mem::size_of::<usize>() * 5,
            "Sender"
        );

        // `Option<Sender>` must not need extra space for its discriminant.
        assert_eq!(
            mem::size_of::<Sender>(),
            mem::size_of::<Option<Sender>>(),
            "Option<Sender>"
        );
    }

    /// `size_hint` is exact for empty and fixed-length bodies and
    /// unbounded for chunked ones.
    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
    #[test]
    fn size_hint() {
        fn eq(body: Incoming, b: SizeHint, note: &str) {
            let a = body.size_hint();
            assert_eq!(a.lower(), b.lower(), "lower for {:?}", note);
            assert_eq!(a.upper(), b.upper(), "upper for {:?}", note);
        }

        eq(Incoming::empty(), SizeHint::with_exact(0), "empty");

        eq(Incoming::channel().1, SizeHint::new(), "channel");

        eq(
            Incoming::new_channel(DecodedLength::new(4), false).1,
            SizeHint::with_exact(4),
            "channel with length",
        );
    }

    /// Aborting the sender surfaces a "body write aborted" error.
    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
    #[cfg(not(miri))]
    #[tokio::test]
    async fn channel_abort() {
        let (tx, mut rx) = Incoming::channel();

        tx.abort();

        let err = rx.frame().await.unwrap().unwrap_err();
        assert!(err.is_body_write_aborted(), "{:?}", err);
    }

    /// The abort error is still delivered when a chunk is already queued.
    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
    #[cfg(not(miri))]
    #[tokio::test]
    async fn channel_abort_when_buffer_is_full() {
        let (mut tx, mut rx) = Incoming::channel();

        tx.try_send_data("chunk 1".into()).expect("send 1");
        tx.abort();

        let chunk1 = rx
            .frame()
            .await
            .expect("item 1")
            .expect("chunk 1")
            .into_data()
            .unwrap();
        assert_eq!(chunk1, "chunk 1");

        let err = rx.frame().await.unwrap().unwrap_err();
        assert!(err.is_body_write_aborted(), "{:?}", err);
    }

    /// One chunk can be queued without the receiver polling; a second is
    /// rejected and handed back.
    // Gated like the other channel tests: `Incoming::channel` and
    // `Sender::try_send_data` only exist when a client or server is enabled.
    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
    #[test]
    fn channel_buffers_one() {
        let (mut tx, _rx) = Incoming::channel();

        tx.try_send_data("chunk 1".into()).expect("send 1");

        let chunk2 = tx.try_send_data("chunk 2".into()).expect_err("send 2");
        assert_eq!(chunk2, "chunk 2");
    }

    /// Dropping the sender without sending anything ends the body cleanly.
    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
    #[cfg(not(miri))]
    #[tokio::test]
    async fn channel_empty() {
        let (_, mut rx) = Incoming::channel();

        assert!(rx.frame().await.is_none());
    }

    /// Without a wanter the sender is ready right away.
    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
    #[test]
    fn channel_ready() {
        let (mut tx, _rx) = Incoming::new_channel(DecodedLength::CHUNKED, false);

        let mut tx_ready = tokio_test::task::spawn(tx.ready());

        assert!(tx_ready.poll().is_ready(), "tx is ready immediately");
    }

    /// With a wanter the sender only becomes ready after the body is polled.
    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
    #[test]
    fn channel_wanter() {
        let (mut tx, mut rx) = Incoming::new_channel(DecodedLength::CHUNKED, true);

        let mut tx_ready = tokio_test::task::spawn(tx.ready());
        let mut rx_data = tokio_test::task::spawn(rx.frame());

        assert!(
            tx_ready.poll().is_pending(),
            "tx isn't ready before rx has been polled"
        );

        assert!(rx_data.poll().is_pending(), "poll rx.data");
        assert!(tx_ready.is_woken(), "rx poll wakes tx");

        assert!(
            tx_ready.poll().is_ready(),
            "tx is ready after rx has been polled"
        );
    }

    /// Dropping the body wakes a waiting sender with a "closed" error.
    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
    #[test]
    fn channel_notices_closure() {
        let (mut tx, rx) = Incoming::new_channel(DecodedLength::CHUNKED, true);

        let mut tx_ready = tokio_test::task::spawn(tx.ready());

        assert!(
            tx_ready.poll().is_pending(),
            "tx isn't ready before rx has been polled"
        );

        drop(rx);
        assert!(tx_ready.is_woken(), "dropping rx wakes tx");

        match tx_ready.poll() {
            Poll::Ready(Err(ref e)) if e.is_closed() => (),
            unexpected => panic!("tx poll ready unexpected: {:?}", unexpected),
        }
    }
}