// hyper/proto/h1/dispatch.rs

1use std::{
2    error::Error as StdError,
3    future::Future,
4    marker::Unpin,
5    pin::Pin,
6    task::{Context, Poll},
7};
8
9use crate::rt::{Read, Write};
10use bytes::{Buf, Bytes};
11use futures_core::ready;
12use http::Request;
13
14use super::{Http1Transaction, Wants};
15use crate::body::{Body, DecodedLength, Incoming as IncomingBody};
16#[cfg(feature = "client")]
17use crate::client::dispatch::TrySendError;
18use crate::common::task;
19use crate::proto::{BodyLength, Conn, Dispatched, MessageHead, RequestHead};
20use crate::upgrade::OnUpgrade;
21
/// Drives one HTTP/1 connection: incoming messages read from `conn` are
/// handed to `dispatch`, and the messages `dispatch` produces are written
/// back out through `conn`.
pub(crate) struct Dispatcher<D, Bs: Body, I, T> {
    /// The connection that heads and bodies are read from and written to.
    conn: Conn<I, Bs::Data, T>,
    /// Source of outgoing messages and sink for incoming ones
    /// (see the `Dispatch` trait).
    dispatch: D,
    /// Sender for the incoming body currently being streamed, if any. Held in
    /// a guard that reports an error to the receiver if it is dropped while
    /// still set.
    body_tx: SenderDropGuard,
    /// The outgoing body currently being written, if any.
    body_rx: Pin<Box<Option<Bs>>>,
    /// Set by `close()`. Once set, `poll_read` and `poll_write` return
    /// immediately and `is_done()` reports `true`.
    is_closing: bool,
}
29
/// The role-specific half of a `Dispatcher`: it supplies outgoing messages
/// and consumes incoming ones. This file provides a `Server` and a `Client`
/// implementation.
pub(crate) trait Dispatch {
    /// Head type of an outgoing message.
    type PollItem;
    /// Body type of an outgoing message.
    type PollBody;
    /// Error produced while obtaining an outgoing message.
    type PollError;
    /// Head type of an incoming message.
    type RecvItem;
    /// Polls for the next outgoing message (head and body).
    ///
    /// The dispatcher writes the head of a `Some(Ok(..))` result to the
    /// connection, turns `Some(Err(..))` into a connection error, and closes
    /// the connection on `None`.
    fn poll_msg(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>>;
    /// Delivers an incoming message, or the error that ended the connection.
    ///
    /// Returning `Err` makes the dispatcher surface that error from its own
    /// poll.
    fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, IncomingBody)>)
        -> crate::Result<()>;
    /// Reports whether another incoming message can be accepted.
    ///
    /// `Ready(Err(()))` means no further messages will ever be accepted; the
    /// dispatcher then closes the connection.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ()>>;
    /// Whether `poll_msg` should be called at all right now.
    fn should_poll(&self) -> bool;
}
44
45cfg_server! {
46    use crate::service::HttpService;
47
    /// Server-side `Dispatch`: each incoming request is passed to `service`,
    /// and the future it returns is polled for the response.
    pub(crate) struct Server<S: HttpService<B>, B> {
        /// Future returned by `service.call(..)` for the request currently
        /// being handled; `None` when no request is in flight.
        in_flight: Pin<Box<Option<S::Future>>>,
        /// The service that handles requests.
        pub(crate) service: S,
    }
52}
53
54cfg_client! {
    pin_project_lite::pin_project! {
        /// Client-side `Dispatch`: requests arrive on `rx`, and each response
        /// (or error) is sent back through the callback that came with the
        /// request.
        pub(crate) struct Client<B> {
            // Callback for the request that is currently awaiting its response.
            callback: Option<crate::client::dispatch::Callback<Request<B>, http::Response<IncomingBody>>>,
            // Queue of requests submitted by the user-facing sender handle.
            #[pin]
            rx: ClientRx<B>,
            // Set once `rx` has reported that the sender handle is gone.
            rx_closed: bool,
        }
    }

    // Receiving end of the client request channel: yields a request together
    // with the callback used to answer it.
    type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, http::Response<IncomingBody>>;
65}
66
67impl<D, Bs, I, T> Dispatcher<D, Bs, I, T>
68where
69    D: Dispatch<
70            PollItem = MessageHead<T::Outgoing>,
71            PollBody = Bs,
72            RecvItem = MessageHead<T::Incoming>,
73        > + Unpin,
74    D::PollError: Into<Box<dyn StdError + Send + Sync>>,
75    I: Read + Write + Unpin,
76    T: Http1Transaction + Unpin,
77    Bs: Body + 'static,
78    Bs::Error: Into<Box<dyn StdError + Send + Sync>>,
79{
80    pub(crate) fn new(dispatch: D, conn: Conn<I, Bs::Data, T>) -> Self {
81        Dispatcher {
82            conn,
83            dispatch,
84            body_tx: SenderDropGuard::none(),
85            body_rx: Box::pin(None),
86            is_closing: false,
87        }
88    }
89
90    #[cfg(feature = "server")]
91    pub(crate) fn disable_keep_alive(&mut self) {
92        self.conn.disable_keep_alive();
93
94        // If keep alive has been disabled and no read or write has been seen on
95        // the connection yet, we must be in a state where the server is being asked to
96        // shut down before any data has been seen on the connection
97        if self.conn.is_write_closed() || self.conn.has_initial_read_write_state() {
98            self.close();
99        }
100    }
101
102    pub(crate) fn into_inner(self) -> (I, Bytes, D) {
103        let (io, buf) = self.conn.into_inner();
104        (io, buf, self.dispatch)
105    }
106
107    /// Run this dispatcher until HTTP says this connection is done,
108    /// but don't call `Write::shutdown` on the underlying IO.
109    ///
110    /// This is useful for old-style HTTP upgrades, but ignores
111    /// newer-style upgrade API.
112    pub(crate) fn poll_without_shutdown(
113        &mut self,
114        cx: &mut Context<'_>,
115    ) -> Poll<crate::Result<()>> {
116        Pin::new(self).poll_catch(cx, false).map_ok(|ds| {
117            if let Dispatched::Upgrade(pending) = ds {
118                pending.manual();
119            }
120        })
121    }
122
123    fn poll_catch(
124        &mut self,
125        cx: &mut Context<'_>,
126        should_shutdown: bool,
127    ) -> Poll<crate::Result<Dispatched>> {
128        Poll::Ready(ready!(self.poll_inner(cx, should_shutdown)).or_else(|e| {
129            // Be sure to alert a streaming body of the failure with a
130            // more specific error than the drop guard would provide.
131            if let Some(mut body) = self.body_tx.take() {
132                body.send_error(crate::Error::new_body("connection error"));
133            }
134            // An error means we're shutting down either way.
135            // We just try to give the error to the user,
136            // and close the connection with an Ok. If we
137            // cannot give it to the user, then return the Err.
138            self.dispatch.recv_msg(Err(e))?;
139            Ok(Dispatched::Shutdown)
140        }))
141    }
142
143    fn poll_inner(
144        &mut self,
145        cx: &mut Context<'_>,
146        should_shutdown: bool,
147    ) -> Poll<crate::Result<Dispatched>> {
148        T::update_date();
149
150        ready!(self.poll_loop(cx))?;
151
152        if self.is_done() {
153            if let Some(pending) = self.conn.pending_upgrade() {
154                self.conn.take_error()?;
155                return Poll::Ready(Ok(Dispatched::Upgrade(pending)));
156            } else if should_shutdown {
157                ready!(self.conn.poll_shutdown(cx)).map_err(crate::Error::new_shutdown)?;
158            }
159            self.conn.take_error()?;
160            Poll::Ready(Ok(Dispatched::Shutdown))
161        } else {
162            Poll::Pending
163        }
164    }
165
    /// Repeatedly polls the read, write and flush sides of the connection.
    ///
    /// Returns `Ready(Ok(()))` as soon as neither side asks for another pass
    /// (or a re-checked write is still pending). If all 16 passes are used
    /// up, control is handed to `task::yield_now` instead, which presumably
    /// schedules a wake-up and returns `Pending` so other tasks can run.
    /// Any error from the three polls is returned immediately.
    fn poll_loop(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
        // Limit the looping on this connection, in case it is ready far too
        // often, so that other futures don't starve.
        //
        // 16 was chosen arbitrarily, as that is the number of pipelined requests
        // benchmarks often use. Perhaps it should be a config option instead.
        for _ in 0..16 {
            // Only errors from the read side matter here; whether it is
            // ready or pending is deliberately ignored.
            let _ = self.poll_read(cx)?;
            let write_ready = self.poll_write(cx)?.is_ready();
            let flush_ready = self.poll_flush(cx)?.is_ready();

            // If we can write more body and the connection is ready, we should
            // write again. If we return `Ready(Ok(()))` here, we will yield
            // without a guaranteed wake-up from the write side of the connection.
            // This would lead to a deadlock if we also don't expect reads.
            let wants_write_again = self.can_write_again() && (write_ready || flush_ready);

            // This could happen if reading paused before blocking on IO,
            // such as getting to the end of a framed message, but then
            // writing/flushing set the state back to Init. In that case,
            // if the read buffer still had bytes, we'd want to try poll_read
            // again, or else we wouldn't ever be woken up again.
            //
            // Using this instead of task::current() and notify() inside
            // the Conn is noticeably faster in pipelined benchmarks.
            let wants_read_again = self.conn.wants_read_again();

            // If we cannot write or read again, we yield and rely on the
            // wake-up from the connection futures.
            if !(wants_write_again || wants_read_again) {
                return Poll::Ready(Ok(()));
            }

            // If we are continuing only because "wants_write_again", re-check whether a second
            // write poll can make progress. `poll_flush` can be ready even when there is no
            // buffered data and the request body is still pending, so relying on the previous
            // readiness can hot-loop.
            if !wants_read_again && wants_write_again {
                // Write was previously pending, but may have become ready since polling flush, so
                // we need to check it again. If it is still pending, it is safe to yield and rely
                // on wake-up from the connection futures.
                if self.poll_write(cx)?.is_pending() {
                    return Poll::Ready(Ok(()));
                }
            }
        }
        trace!("poll_loop yielding (self = {:p})", self);
        task::yield_now(cx).map(|never| match never {})
    }
215
    /// Drives the read side of the connection.
    ///
    /// Each pass of the loop does one of the following:
    /// - returns `Ready(Ok(()))` if the dispatcher is closing;
    /// - reads a message head when the connection can read one;
    /// - forwards one incoming body frame to the body sender held in
    ///   `body_tx`, if there is one;
    /// - otherwise returns the result of the connection's keep-alive read
    ///   poll.
    ///
    /// The body sender is taken out of `body_tx` while it is used and is put
    /// back only on the paths that keep streaming. Every other path lets it
    /// drop, which ends the body for the receiver.
    fn poll_read(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
        loop {
            if self.is_closing {
                return Poll::Ready(Ok(()));
            } else if self.conn.can_read_head() {
                ready!(self.poll_read_head(cx))?;
            } else if let Some(mut body) = self.body_tx.take() {
                if self.conn.can_read_body() {
                    // First make sure the receiving side can take more data.
                    match body.poll_ready(cx) {
                        Poll::Ready(Ok(())) => (),
                        Poll::Pending => {
                            self.body_tx.set(body);
                            return Poll::Pending;
                        }
                        Poll::Ready(Err(_canceled)) => {
                            // user doesn't care about the body
                            // so we should stop reading
                            trace!("body receiver dropped before eof, draining or closing");
                            self.conn.poll_drain_or_close_read(cx);
                            continue;
                        }
                    }
                    match self.conn.poll_read_body(cx) {
                        Poll::Ready(Some(Ok(frame))) => {
                            if frame.is_data() {
                                let chunk = frame.into_data().unwrap_or_else(|_| unreachable!());
                                match body.try_send_data(chunk) {
                                    Ok(()) => {
                                        self.body_tx.set(body);
                                    }
                                    Err(_canceled) => {
                                        if self.conn.can_read_body() {
                                            trace!("body receiver dropped before eof, closing");
                                            self.conn.close_read();
                                        }
                                    }
                                }
                            } else if frame.is_trailers() {
                                let trailers =
                                    frame.into_trailers().unwrap_or_else(|_| unreachable!());
                                match body.try_send_trailers(trailers) {
                                    Ok(()) => {
                                        self.body_tx.set(body);
                                    }
                                    Err(_canceled) => {
                                        if self.conn.can_read_body() {
                                            trace!("body receiver dropped before eof, closing");
                                            self.conn.close_read();
                                        }
                                    }
                                }
                            } else {
                                // we should have dropped all unknown frames in poll_read_body
                                error!("unexpected frame");
                            }
                        }
                        Poll::Ready(None) => {
                            // just drop, the body will close automatically
                        }
                        Poll::Pending => {
                            self.body_tx.set(body);
                            return Poll::Pending;
                        }
                        Poll::Ready(Some(Err(e))) => {
                            // Pass the read failure on to the body receiver.
                            body.send_error(crate::Error::new_body(e));
                        }
                    }
                } else {
                    // just drop, the body will close automatically
                }
            } else {
                return self.conn.poll_read_keep_alive(cx);
            }
        }
    }
291
    /// Reads one message head from the connection and hands it to the
    /// dispatch side.
    ///
    /// Nothing is read until `dispatch.poll_ready` says a message can be
    /// accepted; if it reports `Err(())` the dispatcher is closed. A
    /// non-empty body gets a fresh channel whose sender is stored in
    /// `body_tx`, so later `poll_read` calls can stream the body into it.
    /// Returns `Err` only when `dispatch.recv_msg` rejects what it was given.
    fn poll_read_head(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
        // can dispatch receive, or does it still care about other incoming message?
        match ready!(self.dispatch.poll_ready(cx)) {
            Ok(()) => (),
            Err(()) => {
                trace!("dispatch no longer receiving messages");
                self.close();
                return Poll::Ready(Ok(()));
            }
        }

        // dispatch is ready for a message, try to read one
        match ready!(self.conn.poll_read_head(cx)) {
            Some(Ok((mut head, body_len, wants))) => {
                let body = match body_len {
                    DecodedLength::ZERO => IncomingBody::empty(),
                    other => {
                        let (tx, rx) =
                            IncomingBody::new_channel(other, wants.contains(Wants::EXPECT));
                        self.body_tx.set(tx);
                        rx
                    }
                };
                // Attach the upgrade handle to the message's extensions.
                if wants.contains(Wants::UPGRADE) {
                    let upgrade = self.conn.on_upgrade();
                    debug_assert!(!upgrade.is_none(), "empty upgrade");
                    debug_assert!(
                        head.extensions.get::<OnUpgrade>().is_none(),
                        "OnUpgrade already set"
                    );
                    head.extensions.insert(upgrade);
                }
                self.dispatch.recv_msg(Ok((head, body)))?;
                Poll::Ready(Ok(()))
            }
            Some(Err(err)) => {
                debug!("read_head error: {}", err);
                self.dispatch.recv_msg(Err(err))?;
                // if here, the dispatcher gave the user the error
                // somewhere else. we still need to shutdown, but
                // not as a second error.
                self.close();
                Poll::Ready(Ok(()))
            }
            None => {
                // read eof, the write side will have been closed too unless
                // allow_read_close was set to true, in which case just do
                // nothing...
                debug_assert!(self.conn.is_read_closed());
                if self.conn.is_write_closed() {
                    self.close();
                }
                Poll::Ready(Ok(()))
            }
        }
    }
348
    /// Drives the write side of the connection.
    ///
    /// Each pass of the loop does one of the following:
    /// - returns `Ready(Ok(()))` if the dispatcher is closing;
    /// - asks `dispatch` for the next message and writes its head, when no
    ///   body is being written and the connection can write a head;
    /// - flushes, when the connection cannot buffer more body data;
    /// - otherwise pulls one frame from the outgoing body and writes it.
    ///
    /// Returns `Pending` when the dispatch, the flush or the body is not
    /// ready, or when there is no body and no body can be written.
    fn poll_write(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
        loop {
            if self.is_closing {
                return Poll::Ready(Ok(()));
            } else if self.body_rx.is_none()
                && self.conn.can_write_head()
                && self.dispatch.should_poll()
            {
                if let Some(msg) = ready!(Pin::new(&mut self.dispatch).poll_msg(cx)) {
                    let (head, body) = msg.map_err(crate::Error::new_user_service)?;

                    // A body that is already at end-of-stream is not stored;
                    // the head is then written with no body length.
                    let body_type = if body.is_end_stream() {
                        self.body_rx.set(None);
                        None
                    } else {
                        let btype = body
                            .size_hint()
                            .exact()
                            .map(BodyLength::Known)
                            .or(Some(BodyLength::Unknown));
                        self.body_rx.set(Some(body));
                        btype
                    };
                    self.conn.write_head(head, body_type);
                } else {
                    // The dispatch side has no more messages to send.
                    self.close();
                    return Poll::Ready(Ok(()));
                }
            } else if !self.conn.can_buffer_body() {
                ready!(self.poll_flush(cx))?;
            } else {
                // A new scope is needed :(
                // The guard clears `body_rx` when it goes out of scope, but
                // only if `clear_body` has been set to true by then.
                if let (Some(mut body), clear_body) =
                    OptGuard::new(self.body_rx.as_mut()).guard_mut()
                {
                    debug_assert!(!*clear_body, "opt guard defaults to keeping body");
                    if !self.conn.can_write_body() {
                        trace!(
                            "no more write body allowed, user body is_end_stream = {}",
                            body.is_end_stream(),
                        );
                        *clear_body = true;
                        continue;
                    }

                    let item = ready!(body.as_mut().poll_frame(cx));
                    if let Some(item) = item {
                        let frame = item.map_err(|e| {
                            *clear_body = true;
                            crate::Error::new_user_body(e)
                        })?;

                        if frame.is_data() {
                            let chunk = frame.into_data().unwrap_or_else(|_| unreachable!());
                            let eos = body.is_end_stream();
                            if eos {
                                *clear_body = true;
                                if chunk.remaining() == 0 {
                                    trace!("discarding empty chunk");
                                    self.conn.end_body()?;
                                } else {
                                    self.conn.write_body_and_end(chunk);
                                }
                            } else {
                                if chunk.remaining() == 0 {
                                    trace!("discarding empty chunk");
                                    continue;
                                }
                                self.conn.write_body(chunk);
                            }
                        } else if frame.is_trailers() {
                            *clear_body = true;
                            self.conn.write_trailers(
                                frame.into_trailers().unwrap_or_else(|_| unreachable!()),
                            );
                        } else {
                            trace!("discarding unknown frame");
                            continue;
                        }
                    } else {
                        // The body yielded no further frame: finish it.
                        *clear_body = true;
                        self.conn.end_body()?;
                    }
                } else {
                    // If there's no body_rx, end the body
                    if self.conn.can_write_body() {
                        self.conn.end_body()?;
                    } else {
                        return Poll::Pending;
                    }
                }
            }
        }
    }
443
444    fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
445        self.conn.poll_flush(cx).map_err(|err| {
446            debug!("error writing: {}", err);
447            crate::Error::new_body_write(err)
448        })
449    }
450
451    fn close(&mut self) {
452        self.is_closing = true;
453        self.conn.close_read();
454        self.conn.close_write();
455    }
456
457    /// If there is pending data in `body_rx`, and the connection is still in a body-writing state,
458    /// we can make progress writing if the connection is ready.
459    fn can_write_again(&mut self) -> bool {
460        !self.is_closing && self.body_rx.is_some() && self.conn.can_write_body()
461    }
462
463    fn is_done(&self) -> bool {
464        if self.is_closing {
465            return true;
466        }
467
468        let read_done = self.conn.is_read_closed();
469
470        if !T::should_read_first() && read_done {
471            // a client that cannot read may was well be done.
472            true
473        } else {
474            let write_done = self.conn.is_write_closed()
475                || (!self.dispatch.should_poll() && self.body_rx.is_none());
476            read_done && write_done
477        }
478    }
479}
480
481impl<D, Bs, I, T> Future for Dispatcher<D, Bs, I, T>
482where
483    D: Dispatch<
484            PollItem = MessageHead<T::Outgoing>,
485            PollBody = Bs,
486            RecvItem = MessageHead<T::Incoming>,
487        > + Unpin,
488    D::PollError: Into<Box<dyn StdError + Send + Sync>>,
489    I: Read + Write + Unpin,
490    T: Http1Transaction + Unpin,
491    Bs: Body + 'static,
492    Bs::Error: Into<Box<dyn StdError + Send + Sync>>,
493{
494    type Output = crate::Result<Dispatched>;
495
496    #[inline]
497    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
498        self.poll_catch(cx, true)
499    }
500}
501
502// ===== impl OptGuard =====
503
/// Drop guard around a pinned `Option`.
///
/// It lends out the contained value mutably together with a flag; if the
/// flag is `true` when the guard is dropped, the `Option` is reset to `None`.
/// The flag starts out `false`, so by default the value is kept.
struct OptGuard<'a, T> {
    slot: Pin<&'a mut Option<T>>,
    clear_on_drop: bool,
}

impl<'a, T> OptGuard<'a, T> {
    /// Wraps `pin` without requesting that it be cleared.
    fn new(pin: Pin<&'a mut Option<T>>) -> Self {
        Self {
            slot: pin,
            clear_on_drop: false,
        }
    }

    /// Returns the pinned inner value (if any) and the clear-on-drop flag.
    fn guard_mut(&mut self) -> (Option<Pin<&mut T>>, &mut bool) {
        let Self {
            slot,
            clear_on_drop,
        } = self;
        (slot.as_mut().as_pin_mut(), clear_on_drop)
    }
}

impl<T> Drop for OptGuard<'_, T> {
    fn drop(&mut self) {
        if !self.clear_on_drop {
            return;
        }
        self.slot.set(None);
    }
}
525
526// ===== impl SenderDropGuard =====
527
528/// A drop guard for the body `Sender`.
529///
530/// If the `Dispatcher` future is dropped (e.g. the runtime driving the
531/// connection is shut down) while it still owns a body `Sender`, the guard
532/// sends an incomplete-message error so the receiver sees an error instead
533/// of a silent, clean end-of-stream.
534struct SenderDropGuard(Option<crate::body::Sender>);
535
536impl SenderDropGuard {
537    fn none() -> Self {
538        SenderDropGuard(None)
539    }
540
541    fn set(&mut self, sender: crate::body::Sender) {
542        self.0 = Some(sender);
543    }
544
545    fn take(&mut self) -> Option<crate::body::Sender> {
546        self.0.take()
547    }
548}
549
550impl Drop for SenderDropGuard {
551    fn drop(&mut self) {
552        if let Some(mut sender) = self.0.take() {
553            sender.send_error(crate::Error::new_incomplete());
554        }
555    }
556}
557
558// ===== impl Server =====
559
560cfg_server! {
561    impl<S, B> Server<S, B>
562    where
563        S: HttpService<B>,
564    {
565        pub(crate) fn new(service: S) -> Server<S, B> {
566            Server {
567                in_flight: Box::pin(None),
568                service,
569            }
570        }
571
572        pub(crate) fn into_service(self) -> S {
573            self.service
574        }
575    }
576
    // Service is never pinned: the only pinned state, `in_flight`, lives in
    // its own `Pin<Box<..>>`, so `Server` itself may be moved freely.
    impl<S: HttpService<B>, B> Unpin for Server<S, B> {}
579
580    impl<S, Bs> Dispatch for Server<S, IncomingBody>
581    where
582        S: HttpService<IncomingBody, ResBody = Bs>,
583        S::Error: Into<Box<dyn StdError + Send + Sync>>,
584        Bs: Body,
585    {
586        type PollItem = MessageHead<http::StatusCode>;
587        type PollBody = Bs;
588        type PollError = S::Error;
589        type RecvItem = RequestHead;
590
591        fn poll_msg(
592            mut self: Pin<&mut Self>,
593            cx: &mut Context<'_>,
594        ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>> {
595            let mut this = self.as_mut();
596            let ret = if let Some(ref mut fut) = this.in_flight.as_mut().as_pin_mut() {
597                let resp = ready!(fut.as_mut().poll(cx)?);
598                let (parts, body) = resp.into_parts();
599                let head = MessageHead {
600                    version: parts.version,
601                    subject: parts.status,
602                    headers: parts.headers,
603                    extensions: parts.extensions,
604                };
605                Poll::Ready(Some(Ok((head, body))))
606            } else {
607                unreachable!("poll_msg shouldn't be called if no inflight");
608            };
609
610            // Since in_flight finished, remove it
611            this.in_flight.set(None);
612            ret
613        }
614
615        fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, IncomingBody)>) -> crate::Result<()> {
616            let (msg, body) = msg?;
617            let mut req = Request::new(body);
618            *req.method_mut() = msg.subject.0;
619            *req.uri_mut() = msg.subject.1;
620            *req.headers_mut() = msg.headers;
621            *req.version_mut() = msg.version;
622            *req.extensions_mut() = msg.extensions;
623            let fut = self.service.call(req);
624            self.in_flight.set(Some(fut));
625            Ok(())
626        }
627
628        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
629            if self.in_flight.is_some() {
630                Poll::Pending
631            } else {
632                Poll::Ready(Ok(()))
633            }
634        }
635
636        fn should_poll(&self) -> bool {
637            self.in_flight.is_some()
638        }
639    }
640}
641
642// ===== impl Client =====
643
644cfg_client! {
645    use std::convert::Infallible;
646
647    impl<B> Client<B> {
648        pub(crate) fn new(rx: ClientRx<B>) -> Client<B> {
649            Client {
650                callback: None,
651                rx,
652                rx_closed: false,
653            }
654        }
655    }
656
    /// Client role: outgoing messages are the requests taken from `rx`, and
    /// each incoming response is delivered through the callback that came
    /// with its request.
    impl<B> Dispatch for Client<B>
    where
        B: Body,
    {
        type PollItem = RequestHead;
        type PollBody = B;
        type PollError = Infallible;
        type RecvItem = crate::proto::ResponseHead;

        /// Takes the next request from `rx` and splits it into head and body,
        /// remembering its callback for the response.
        ///
        /// Yields `None` when the request was already canceled, or when the
        /// sender handle has been dropped (which also sets `rx_closed`).
        fn poll_msg(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Infallible>>> {
            let mut this = self.as_mut();
            debug_assert!(!this.rx_closed);
            match this.rx.poll_recv(cx) {
                Poll::Ready(Some((req, mut cb))) => {
                    // check that future hasn't been canceled already
                    match cb.poll_canceled(cx) {
                        Poll::Ready(()) => {
                            trace!("request canceled");
                            Poll::Ready(None)
                        }
                        Poll::Pending => {
                            let (parts, body) = req.into_parts();
                            let head = RequestHead {
                                version: parts.version,
                                subject: crate::proto::RequestLine(parts.method, parts.uri),
                                headers: parts.headers,
                                extensions: parts.extensions,
                            };
                            this.callback = Some(cb);
                            Poll::Ready(Some(Ok((head, body))))
                        }
                    }
                }
                Poll::Ready(None) => {
                    // user has dropped sender handle
                    trace!("client tx closed");
                    this.rx_closed = true;
                    Poll::Ready(None)
                }
                Poll::Pending => Poll::Pending,
            }
        }

        /// Delivers a response or a connection error to the waiting request.
        ///
        /// With no callback waiting, a response is reported as an
        /// unexpected-message error. An error is passed to one request still
        /// queued in `rx`, if there is one, marked as canceled with the
        /// unsent request attached; otherwise it is returned to the caller.
        fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, IncomingBody)>) -> crate::Result<()> {
            match msg {
                Ok((msg, body)) => {
                    if let Some(cb) = self.callback.take() {
                        let res = msg.into_response(body);
                        cb.send(Ok(res));
                        Ok(())
                    } else {
                        // Getting here is likely a bug! An error should have happened
                        // in Conn::require_empty_read() before ever parsing a
                        // full message!
                        Err(crate::Error::new_unexpected_message())
                    }
                }
                Err(err) => {
                    if let Some(cb) = self.callback.take() {
                        // The request was already started, so no message is
                        // handed back with the error.
                        cb.send(Err(TrySendError {
                            error: err,
                            message: None,
                        }));
                        Ok(())
                    } else if !self.rx_closed {
                        // Close the channel first, then check for a request
                        // that was queued but never started.
                        self.rx.close();
                        if let Some((req, cb)) = self.rx.try_recv() {
                            trace!("canceling queued request with connection error: {}", err);
                            // in this case, the message was never even started, so it's safe to tell
                            // the user that the request was completely canceled
                            cb.send(Err(TrySendError {
                                error: crate::Error::new_canceled().with(err),
                                message: Some(req),
                            }));
                            Ok(())
                        } else {
                            Err(err)
                        }
                    } else {
                        Err(err)
                    }
                }
            }
        }

        /// Ready to receive a response only while a callback is waiting and
        /// its receiver is still alive; otherwise reports `Err(())`.
        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
            match self.callback {
                Some(ref mut cb) => match cb.poll_canceled(cx) {
                    Poll::Ready(()) => {
                        trace!("callback receiver has dropped");
                        Poll::Ready(Err(()))
                    }
                    Poll::Pending => Poll::Ready(Ok(())),
                },
                None => Poll::Ready(Err(())),
            }
        }

        /// A new request is only polled for while no response is awaited.
        fn should_poll(&self) -> bool {
            self.callback.is_none()
        }
    }
762}
763
// Unit tests that drive a client `Dispatcher` over a mock IO object.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::common::io::Compat;
    use crate::proto::h1::ClientTransaction;
    use std::time::Duration;

    // A response that shows up on the IO before any request was written must
    // cancel the queued request: its callback receives a canceled error that
    // still carries the unsent request.
    #[test]
    fn client_read_bytes_before_writing_request() {
        let _ = pretty_env_logger::try_init();

        tokio_test::task::spawn(()).enter(|cx, _| {
            let (io, mut handle) = tokio_test::io::Builder::new().build_with_handle();

            // Block at 0 for now, but we will release this response before
            // the request is ready to write later...
            let (mut tx, rx) = crate::client::dispatch::channel();
            let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(Compat::new(io));
            let mut dispatcher = Dispatcher::new(Client::new(rx), conn);

            // First poll is needed to allow tx to send...
            assert!(Pin::new(&mut dispatcher).poll(cx).is_pending());

            // Unblock our IO, which has a response before we've sent request!
            //
            handle.read(b"HTTP/1.1 200 OK\r\n\r\n");

            let mut res_rx = tx
                .try_send(crate::Request::new(IncomingBody::empty()))
                .unwrap();

            tokio_test::assert_ready_ok!(Pin::new(&mut dispatcher).poll(cx));
            let err = tokio_test::assert_ready_ok!(Pin::new(&mut res_rx).poll(cx))
                .expect_err("callback should send error");

            match (err.error.is_canceled(), err.message.as_ref()) {
                (true, Some(_)) => (),
                _ => panic!("expected Canceled, got {:?}", err),
            }
        });
    }

    // After the response to a POST has been received, the sender must not yet
    // report ready for another request. NOTE(review): the mock IO only
    // expects the request head to be written, so the body is presumably
    // still unflushed at that point -- confirm against tokio_test::io.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn client_flushing_is_not_ready_for_next_request() {
        let _ = pretty_env_logger::try_init();

        let (io, _handle) = tokio_test::io::Builder::new()
            .write(b"POST / HTTP/1.1\r\ncontent-length: 4\r\n\r\n")
            .read(b"HTTP/1.1 200 OK\r\ncontent-length: 0\r\n\r\n")
            .wait(std::time::Duration::from_secs(2))
            .build_with_handle();

        let (mut tx, rx) = crate::client::dispatch::channel();
        let mut conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(Compat::new(io));
        conn.set_write_strategy_queue();

        let dispatcher = Dispatcher::new(Client::new(rx), conn);
        let _dispatcher = tokio::spawn(async move { dispatcher.await });

        // A four-byte request body, matching the content-length above.
        let body = {
            let (mut tx, body) = IncomingBody::new_channel(DecodedLength::new(4), false);
            tx.try_send_data("reee".into()).unwrap();
            body
        };

        let req = crate::Request::builder().method("POST").body(body).unwrap();

        let res = tx.try_send(req).unwrap().await.expect("response");
        drop(res);

        assert!(!tx.is_ready());
    }

    // An empty data chunk from the request body must be discarded rather than
    // passed on to the connection.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn body_empty_chunks_ignored() {
        let _ = pretty_env_logger::try_init();

        let io = tokio_test::io::Builder::new()
            // no reading or writing, just be blocked for the test...
            .wait(Duration::from_secs(5))
            .build();

        let (mut tx, rx) = crate::client::dispatch::channel();
        let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(Compat::new(io));
        let mut dispatcher = tokio_test::task::spawn(Dispatcher::new(Client::new(rx), conn));

        // First poll is needed to allow tx to send...
        assert!(dispatcher.poll().is_pending());

        // A body whose only chunk is empty.
        let body = {
            let (mut tx, body) = IncomingBody::channel();
            tx.try_send_data("".into()).unwrap();
            body
        };

        let _res_rx = tx.try_send(crate::Request::new(body)).unwrap();

        // Ensure conn.write_body wasn't called with the empty chunk.
        // If it is, it will trigger an assertion.
        assert!(dispatcher.poll().is_pending());
    }
}