1use std::{
2 error::Error as StdError,
3 future::Future,
4 marker::Unpin,
5 pin::Pin,
6 task::{Context, Poll},
7};
8
9use crate::rt::{Read, Write};
10use bytes::{Buf, Bytes};
11use futures_core::ready;
12use http::Request;
13
14use super::{Http1Transaction, Wants};
15use crate::body::{Body, DecodedLength, Incoming as IncomingBody};
16#[cfg(feature = "client")]
17use crate::client::dispatch::TrySendError;
18use crate::common::task;
19use crate::proto::{BodyLength, Conn, Dispatched, MessageHead, RequestHead};
20use crate::upgrade::OnUpgrade;
21
22pub(crate) struct Dispatcher<D, Bs: Body, I, T> {
23 conn: Conn<I, Bs::Data, T>,
24 dispatch: D,
25 body_tx: SenderDropGuard,
26 body_rx: Pin<Box<Option<Bs>>>,
27 is_closing: bool,
28}
29
30pub(crate) trait Dispatch {
31 type PollItem;
32 type PollBody;
33 type PollError;
34 type RecvItem;
35 fn poll_msg(
36 self: Pin<&mut Self>,
37 cx: &mut Context<'_>,
38 ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>>;
39 fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, IncomingBody)>)
40 -> crate::Result<()>;
41 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ()>>;
42 fn should_poll(&self) -> bool;
43}
44
45cfg_server! {
46 use crate::service::HttpService;
47
48 pub(crate) struct Server<S: HttpService<B>, B> {
49 in_flight: Pin<Box<Option<S::Future>>>,
50 pub(crate) service: S,
51 }
52}
53
54cfg_client! {
55 pin_project_lite::pin_project! {
56 pub(crate) struct Client<B> {
57 callback: Option<crate::client::dispatch::Callback<Request<B>, http::Response<IncomingBody>>>,
58 #[pin]
59 rx: ClientRx<B>,
60 rx_closed: bool,
61 }
62 }
63
64 type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, http::Response<IncomingBody>>;
65}
66
67impl<D, Bs, I, T> Dispatcher<D, Bs, I, T>
68where
69 D: Dispatch<
70 PollItem = MessageHead<T::Outgoing>,
71 PollBody = Bs,
72 RecvItem = MessageHead<T::Incoming>,
73 > + Unpin,
74 D::PollError: Into<Box<dyn StdError + Send + Sync>>,
75 I: Read + Write + Unpin,
76 T: Http1Transaction + Unpin,
77 Bs: Body + 'static,
78 Bs::Error: Into<Box<dyn StdError + Send + Sync>>,
79{
80 pub(crate) fn new(dispatch: D, conn: Conn<I, Bs::Data, T>) -> Self {
81 Dispatcher {
82 conn,
83 dispatch,
84 body_tx: SenderDropGuard::none(),
85 body_rx: Box::pin(None),
86 is_closing: false,
87 }
88 }
89
90 #[cfg(feature = "server")]
91 pub(crate) fn disable_keep_alive(&mut self) {
92 self.conn.disable_keep_alive();
93
94 if self.conn.is_write_closed() || self.conn.has_initial_read_write_state() {
98 self.close();
99 }
100 }
101
102 pub(crate) fn into_inner(self) -> (I, Bytes, D) {
103 let (io, buf) = self.conn.into_inner();
104 (io, buf, self.dispatch)
105 }
106
107 pub(crate) fn poll_without_shutdown(
113 &mut self,
114 cx: &mut Context<'_>,
115 ) -> Poll<crate::Result<()>> {
116 Pin::new(self).poll_catch(cx, false).map_ok(|ds| {
117 if let Dispatched::Upgrade(pending) = ds {
118 pending.manual();
119 }
120 })
121 }
122
123 fn poll_catch(
124 &mut self,
125 cx: &mut Context<'_>,
126 should_shutdown: bool,
127 ) -> Poll<crate::Result<Dispatched>> {
128 Poll::Ready(ready!(self.poll_inner(cx, should_shutdown)).or_else(|e| {
129 if let Some(mut body) = self.body_tx.take() {
132 body.send_error(crate::Error::new_body("connection error"));
133 }
134 self.dispatch.recv_msg(Err(e))?;
139 Ok(Dispatched::Shutdown)
140 }))
141 }
142
143 fn poll_inner(
144 &mut self,
145 cx: &mut Context<'_>,
146 should_shutdown: bool,
147 ) -> Poll<crate::Result<Dispatched>> {
148 T::update_date();
149
150 ready!(self.poll_loop(cx))?;
151
152 if self.is_done() {
153 if let Some(pending) = self.conn.pending_upgrade() {
154 self.conn.take_error()?;
155 return Poll::Ready(Ok(Dispatched::Upgrade(pending)));
156 } else if should_shutdown {
157 ready!(self.conn.poll_shutdown(cx)).map_err(crate::Error::new_shutdown)?;
158 }
159 self.conn.take_error()?;
160 Poll::Ready(Ok(Dispatched::Shutdown))
161 } else {
162 Poll::Pending
163 }
164 }
165
166 fn poll_loop(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
167 for _ in 0..16 {
173 let _ = self.poll_read(cx)?;
174 let write_ready = self.poll_write(cx)?.is_ready();
175 let flush_ready = self.poll_flush(cx)?.is_ready();
176
177 let wants_write_again = self.can_write_again() && (write_ready || flush_ready);
182
183 let wants_read_again = self.conn.wants_read_again();
192
193 if !(wants_write_again || wants_read_again) {
196 return Poll::Ready(Ok(()));
197 }
198
199 if !wants_read_again && wants_write_again {
204 if self.poll_write(cx)?.is_pending() {
208 return Poll::Ready(Ok(()));
209 }
210 }
211 }
212 trace!("poll_loop yielding (self = {:p})", self);
213 task::yield_now(cx).map(|never| match never {})
214 }
215
216 fn poll_read(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
217 loop {
218 if self.is_closing {
219 return Poll::Ready(Ok(()));
220 } else if self.conn.can_read_head() {
221 ready!(self.poll_read_head(cx))?;
222 } else if let Some(mut body) = self.body_tx.take() {
223 if self.conn.can_read_body() {
224 match body.poll_ready(cx) {
225 Poll::Ready(Ok(())) => (),
226 Poll::Pending => {
227 self.body_tx.set(body);
228 return Poll::Pending;
229 }
230 Poll::Ready(Err(_canceled)) => {
231 trace!("body receiver dropped before eof, draining or closing");
234 self.conn.poll_drain_or_close_read(cx);
235 continue;
236 }
237 }
238 match self.conn.poll_read_body(cx) {
239 Poll::Ready(Some(Ok(frame))) => {
240 if frame.is_data() {
241 let chunk = frame.into_data().unwrap_or_else(|_| unreachable!());
242 match body.try_send_data(chunk) {
243 Ok(()) => {
244 self.body_tx.set(body);
245 }
246 Err(_canceled) => {
247 if self.conn.can_read_body() {
248 trace!("body receiver dropped before eof, closing");
249 self.conn.close_read();
250 }
251 }
252 }
253 } else if frame.is_trailers() {
254 let trailers =
255 frame.into_trailers().unwrap_or_else(|_| unreachable!());
256 match body.try_send_trailers(trailers) {
257 Ok(()) => {
258 self.body_tx.set(body);
259 }
260 Err(_canceled) => {
261 if self.conn.can_read_body() {
262 trace!("body receiver dropped before eof, closing");
263 self.conn.close_read();
264 }
265 }
266 }
267 } else {
268 error!("unexpected frame");
270 }
271 }
272 Poll::Ready(None) => {
273 }
275 Poll::Pending => {
276 self.body_tx.set(body);
277 return Poll::Pending;
278 }
279 Poll::Ready(Some(Err(e))) => {
280 body.send_error(crate::Error::new_body(e));
281 }
282 }
283 } else {
284 }
286 } else {
287 return self.conn.poll_read_keep_alive(cx);
288 }
289 }
290 }
291
292 fn poll_read_head(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
293 match ready!(self.dispatch.poll_ready(cx)) {
295 Ok(()) => (),
296 Err(()) => {
297 trace!("dispatch no longer receiving messages");
298 self.close();
299 return Poll::Ready(Ok(()));
300 }
301 }
302
303 match ready!(self.conn.poll_read_head(cx)) {
305 Some(Ok((mut head, body_len, wants))) => {
306 let body = match body_len {
307 DecodedLength::ZERO => IncomingBody::empty(),
308 other => {
309 let (tx, rx) =
310 IncomingBody::new_channel(other, wants.contains(Wants::EXPECT));
311 self.body_tx.set(tx);
312 rx
313 }
314 };
315 if wants.contains(Wants::UPGRADE) {
316 let upgrade = self.conn.on_upgrade();
317 debug_assert!(!upgrade.is_none(), "empty upgrade");
318 debug_assert!(
319 head.extensions.get::<OnUpgrade>().is_none(),
320 "OnUpgrade already set"
321 );
322 head.extensions.insert(upgrade);
323 }
324 self.dispatch.recv_msg(Ok((head, body)))?;
325 Poll::Ready(Ok(()))
326 }
327 Some(Err(err)) => {
328 debug!("read_head error: {}", err);
329 self.dispatch.recv_msg(Err(err))?;
330 self.close();
334 Poll::Ready(Ok(()))
335 }
336 None => {
337 debug_assert!(self.conn.is_read_closed());
341 if self.conn.is_write_closed() {
342 self.close();
343 }
344 Poll::Ready(Ok(()))
345 }
346 }
347 }
348
349 fn poll_write(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
350 loop {
351 if self.is_closing {
352 return Poll::Ready(Ok(()));
353 } else if self.body_rx.is_none()
354 && self.conn.can_write_head()
355 && self.dispatch.should_poll()
356 {
357 if let Some(msg) = ready!(Pin::new(&mut self.dispatch).poll_msg(cx)) {
358 let (head, body) = msg.map_err(crate::Error::new_user_service)?;
359
360 let body_type = if body.is_end_stream() {
361 self.body_rx.set(None);
362 None
363 } else {
364 let btype = body
365 .size_hint()
366 .exact()
367 .map(BodyLength::Known)
368 .or(Some(BodyLength::Unknown));
369 self.body_rx.set(Some(body));
370 btype
371 };
372 self.conn.write_head(head, body_type);
373 } else {
374 self.close();
375 return Poll::Ready(Ok(()));
376 }
377 } else if !self.conn.can_buffer_body() {
378 ready!(self.poll_flush(cx))?;
379 } else {
380 if let (Some(mut body), clear_body) =
382 OptGuard::new(self.body_rx.as_mut()).guard_mut()
383 {
384 debug_assert!(!*clear_body, "opt guard defaults to keeping body");
385 if !self.conn.can_write_body() {
386 trace!(
387 "no more write body allowed, user body is_end_stream = {}",
388 body.is_end_stream(),
389 );
390 *clear_body = true;
391 continue;
392 }
393
394 let item = ready!(body.as_mut().poll_frame(cx));
395 if let Some(item) = item {
396 let frame = item.map_err(|e| {
397 *clear_body = true;
398 crate::Error::new_user_body(e)
399 })?;
400
401 if frame.is_data() {
402 let chunk = frame.into_data().unwrap_or_else(|_| unreachable!());
403 let eos = body.is_end_stream();
404 if eos {
405 *clear_body = true;
406 if chunk.remaining() == 0 {
407 trace!("discarding empty chunk");
408 self.conn.end_body()?;
409 } else {
410 self.conn.write_body_and_end(chunk);
411 }
412 } else {
413 if chunk.remaining() == 0 {
414 trace!("discarding empty chunk");
415 continue;
416 }
417 self.conn.write_body(chunk);
418 }
419 } else if frame.is_trailers() {
420 *clear_body = true;
421 self.conn.write_trailers(
422 frame.into_trailers().unwrap_or_else(|_| unreachable!()),
423 );
424 } else {
425 trace!("discarding unknown frame");
426 continue;
427 }
428 } else {
429 *clear_body = true;
430 self.conn.end_body()?;
431 }
432 } else {
433 if self.conn.can_write_body() {
435 self.conn.end_body()?;
436 } else {
437 return Poll::Pending;
438 }
439 }
440 }
441 }
442 }
443
444 fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
445 self.conn.poll_flush(cx).map_err(|err| {
446 debug!("error writing: {}", err);
447 crate::Error::new_body_write(err)
448 })
449 }
450
451 fn close(&mut self) {
452 self.is_closing = true;
453 self.conn.close_read();
454 self.conn.close_write();
455 }
456
457 fn can_write_again(&mut self) -> bool {
460 !self.is_closing && self.body_rx.is_some() && self.conn.can_write_body()
461 }
462
463 fn is_done(&self) -> bool {
464 if self.is_closing {
465 return true;
466 }
467
468 let read_done = self.conn.is_read_closed();
469
470 if !T::should_read_first() && read_done {
471 true
473 } else {
474 let write_done = self.conn.is_write_closed()
475 || (!self.dispatch.should_poll() && self.body_rx.is_none());
476 read_done && write_done
477 }
478 }
479}
480
481impl<D, Bs, I, T> Future for Dispatcher<D, Bs, I, T>
482where
483 D: Dispatch<
484 PollItem = MessageHead<T::Outgoing>,
485 PollBody = Bs,
486 RecvItem = MessageHead<T::Incoming>,
487 > + Unpin,
488 D::PollError: Into<Box<dyn StdError + Send + Sync>>,
489 I: Read + Write + Unpin,
490 T: Http1Transaction + Unpin,
491 Bs: Body + 'static,
492 Bs::Error: Into<Box<dyn StdError + Send + Sync>>,
493{
494 type Output = crate::Result<Dispatched>;
495
496 #[inline]
497 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
498 self.poll_catch(cx, true)
499 }
500}
501
502struct OptGuard<'a, T>(Pin<&'a mut Option<T>>, bool);
507
508impl<'a, T> OptGuard<'a, T> {
509 fn new(pin: Pin<&'a mut Option<T>>) -> Self {
510 OptGuard(pin, false)
511 }
512
513 fn guard_mut(&mut self) -> (Option<Pin<&mut T>>, &mut bool) {
514 (self.0.as_mut().as_pin_mut(), &mut self.1)
515 }
516}
517
518impl<T> Drop for OptGuard<'_, T> {
519 fn drop(&mut self) {
520 if self.1 {
521 self.0.set(None);
522 }
523 }
524}
525
526struct SenderDropGuard(Option<crate::body::Sender>);
535
536impl SenderDropGuard {
537 fn none() -> Self {
538 SenderDropGuard(None)
539 }
540
541 fn set(&mut self, sender: crate::body::Sender) {
542 self.0 = Some(sender);
543 }
544
545 fn take(&mut self) -> Option<crate::body::Sender> {
546 self.0.take()
547 }
548}
549
550impl Drop for SenderDropGuard {
551 fn drop(&mut self) {
552 if let Some(mut sender) = self.0.take() {
553 sender.send_error(crate::Error::new_incomplete());
554 }
555 }
556}
557
558cfg_server! {
561 impl<S, B> Server<S, B>
562 where
563 S: HttpService<B>,
564 {
565 pub(crate) fn new(service: S) -> Server<S, B> {
566 Server {
567 in_flight: Box::pin(None),
568 service,
569 }
570 }
571
572 pub(crate) fn into_service(self) -> S {
573 self.service
574 }
575 }
576
577 impl<S: HttpService<B>, B> Unpin for Server<S, B> {}
579
580 impl<S, Bs> Dispatch for Server<S, IncomingBody>
581 where
582 S: HttpService<IncomingBody, ResBody = Bs>,
583 S::Error: Into<Box<dyn StdError + Send + Sync>>,
584 Bs: Body,
585 {
586 type PollItem = MessageHead<http::StatusCode>;
587 type PollBody = Bs;
588 type PollError = S::Error;
589 type RecvItem = RequestHead;
590
591 fn poll_msg(
592 mut self: Pin<&mut Self>,
593 cx: &mut Context<'_>,
594 ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>> {
595 let mut this = self.as_mut();
596 let ret = if let Some(ref mut fut) = this.in_flight.as_mut().as_pin_mut() {
597 let resp = ready!(fut.as_mut().poll(cx)?);
598 let (parts, body) = resp.into_parts();
599 let head = MessageHead {
600 version: parts.version,
601 subject: parts.status,
602 headers: parts.headers,
603 extensions: parts.extensions,
604 };
605 Poll::Ready(Some(Ok((head, body))))
606 } else {
607 unreachable!("poll_msg shouldn't be called if no inflight");
608 };
609
610 this.in_flight.set(None);
612 ret
613 }
614
615 fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, IncomingBody)>) -> crate::Result<()> {
616 let (msg, body) = msg?;
617 let mut req = Request::new(body);
618 *req.method_mut() = msg.subject.0;
619 *req.uri_mut() = msg.subject.1;
620 *req.headers_mut() = msg.headers;
621 *req.version_mut() = msg.version;
622 *req.extensions_mut() = msg.extensions;
623 let fut = self.service.call(req);
624 self.in_flight.set(Some(fut));
625 Ok(())
626 }
627
628 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
629 if self.in_flight.is_some() {
630 Poll::Pending
631 } else {
632 Poll::Ready(Ok(()))
633 }
634 }
635
636 fn should_poll(&self) -> bool {
637 self.in_flight.is_some()
638 }
639 }
640}
641
642cfg_client! {
645 use std::convert::Infallible;
646
647 impl<B> Client<B> {
648 pub(crate) fn new(rx: ClientRx<B>) -> Client<B> {
649 Client {
650 callback: None,
651 rx,
652 rx_closed: false,
653 }
654 }
655 }
656
657 impl<B> Dispatch for Client<B>
658 where
659 B: Body,
660 {
661 type PollItem = RequestHead;
662 type PollBody = B;
663 type PollError = Infallible;
664 type RecvItem = crate::proto::ResponseHead;
665
666 fn poll_msg(
667 mut self: Pin<&mut Self>,
668 cx: &mut Context<'_>,
669 ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Infallible>>> {
670 let mut this = self.as_mut();
671 debug_assert!(!this.rx_closed);
672 match this.rx.poll_recv(cx) {
673 Poll::Ready(Some((req, mut cb))) => {
674 match cb.poll_canceled(cx) {
676 Poll::Ready(()) => {
677 trace!("request canceled");
678 Poll::Ready(None)
679 }
680 Poll::Pending => {
681 let (parts, body) = req.into_parts();
682 let head = RequestHead {
683 version: parts.version,
684 subject: crate::proto::RequestLine(parts.method, parts.uri),
685 headers: parts.headers,
686 extensions: parts.extensions,
687 };
688 this.callback = Some(cb);
689 Poll::Ready(Some(Ok((head, body))))
690 }
691 }
692 }
693 Poll::Ready(None) => {
694 trace!("client tx closed");
696 this.rx_closed = true;
697 Poll::Ready(None)
698 }
699 Poll::Pending => Poll::Pending,
700 }
701 }
702
703 fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, IncomingBody)>) -> crate::Result<()> {
704 match msg {
705 Ok((msg, body)) => {
706 if let Some(cb) = self.callback.take() {
707 let res = msg.into_response(body);
708 cb.send(Ok(res));
709 Ok(())
710 } else {
711 Err(crate::Error::new_unexpected_message())
715 }
716 }
717 Err(err) => {
718 if let Some(cb) = self.callback.take() {
719 cb.send(Err(TrySendError {
720 error: err,
721 message: None,
722 }));
723 Ok(())
724 } else if !self.rx_closed {
725 self.rx.close();
726 if let Some((req, cb)) = self.rx.try_recv() {
727 trace!("canceling queued request with connection error: {}", err);
728 cb.send(Err(TrySendError {
731 error: crate::Error::new_canceled().with(err),
732 message: Some(req),
733 }));
734 Ok(())
735 } else {
736 Err(err)
737 }
738 } else {
739 Err(err)
740 }
741 }
742 }
743 }
744
745 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
746 match self.callback {
747 Some(ref mut cb) => match cb.poll_canceled(cx) {
748 Poll::Ready(()) => {
749 trace!("callback receiver has dropped");
750 Poll::Ready(Err(()))
751 }
752 Poll::Pending => Poll::Ready(Ok(())),
753 },
754 None => Poll::Ready(Err(())),
755 }
756 }
757
758 fn should_poll(&self) -> bool {
759 self.callback.is_none()
760 }
761 }
762}
763
764#[cfg(test)]
765mod tests {
766 use super::*;
767 use crate::common::io::Compat;
768 use crate::proto::h1::ClientTransaction;
769 use std::time::Duration;
770
771 #[test]
772 fn client_read_bytes_before_writing_request() {
773 let _ = pretty_env_logger::try_init();
774
775 tokio_test::task::spawn(()).enter(|cx, _| {
776 let (io, mut handle) = tokio_test::io::Builder::new().build_with_handle();
777
778 let (mut tx, rx) = crate::client::dispatch::channel();
781 let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(Compat::new(io));
782 let mut dispatcher = Dispatcher::new(Client::new(rx), conn);
783
784 assert!(Pin::new(&mut dispatcher).poll(cx).is_pending());
786
787 handle.read(b"HTTP/1.1 200 OK\r\n\r\n");
790
791 let mut res_rx = tx
792 .try_send(crate::Request::new(IncomingBody::empty()))
793 .unwrap();
794
795 tokio_test::assert_ready_ok!(Pin::new(&mut dispatcher).poll(cx));
796 let err = tokio_test::assert_ready_ok!(Pin::new(&mut res_rx).poll(cx))
797 .expect_err("callback should send error");
798
799 match (err.error.is_canceled(), err.message.as_ref()) {
800 (true, Some(_)) => (),
801 _ => panic!("expected Canceled, got {:?}", err),
802 }
803 });
804 }
805
806 #[cfg(not(miri))]
807 #[tokio::test]
808 async fn client_flushing_is_not_ready_for_next_request() {
809 let _ = pretty_env_logger::try_init();
810
811 let (io, _handle) = tokio_test::io::Builder::new()
812 .write(b"POST / HTTP/1.1\r\ncontent-length: 4\r\n\r\n")
813 .read(b"HTTP/1.1 200 OK\r\ncontent-length: 0\r\n\r\n")
814 .wait(std::time::Duration::from_secs(2))
815 .build_with_handle();
816
817 let (mut tx, rx) = crate::client::dispatch::channel();
818 let mut conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(Compat::new(io));
819 conn.set_write_strategy_queue();
820
821 let dispatcher = Dispatcher::new(Client::new(rx), conn);
822 let _dispatcher = tokio::spawn(async move { dispatcher.await });
823
824 let body = {
825 let (mut tx, body) = IncomingBody::new_channel(DecodedLength::new(4), false);
826 tx.try_send_data("reee".into()).unwrap();
827 body
828 };
829
830 let req = crate::Request::builder().method("POST").body(body).unwrap();
831
832 let res = tx.try_send(req).unwrap().await.expect("response");
833 drop(res);
834
835 assert!(!tx.is_ready());
836 }
837
838 #[cfg(not(miri))]
839 #[tokio::test]
840 async fn body_empty_chunks_ignored() {
841 let _ = pretty_env_logger::try_init();
842
843 let io = tokio_test::io::Builder::new()
844 .wait(Duration::from_secs(5))
846 .build();
847
848 let (mut tx, rx) = crate::client::dispatch::channel();
849 let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(Compat::new(io));
850 let mut dispatcher = tokio_test::task::spawn(Dispatcher::new(Client::new(rx), conn));
851
852 assert!(dispatcher.poll().is_pending());
854
855 let body = {
856 let (mut tx, body) = IncomingBody::channel();
857 tx.try_send_data("".into()).unwrap();
858 body
859 };
860
861 let _res_rx = tx.try_send(crate::Request::new(body)).unwrap();
862
863 assert!(dispatcher.poll().is_pending());
866 }
867}