hyper/proto/h2/
client.rs

1use std::{
2    convert::Infallible,
3    future::Future,
4    marker::PhantomData,
5    pin::Pin,
6    task::{Context, Poll},
7    time::Duration,
8};
9
10use crate::rt::{Read, Write};
11use bytes::Bytes;
12use futures_channel::mpsc::{Receiver, Sender};
13use futures_channel::{mpsc, oneshot};
14use futures_core::{ready, FusedFuture, FusedStream, Stream};
15use h2::client::{Builder, Connection, SendRequest};
16use h2::SendStream;
17use http::{Method, StatusCode};
18use pin_project_lite::pin_project;
19
20use super::ping::{Ponger, Recorder};
21use super::{ping, H2Upgraded, PipeToSendStream, SendBuf};
22use crate::body::{Body, Incoming as IncomingBody};
23use crate::client::dispatch::{Callback, SendWhen, TrySendError};
24use crate::common::either::Either;
25use crate::common::io::Compat;
26use crate::common::time::Time;
27use crate::ext::Protocol;
28use crate::headers;
29use crate::proto::h2::UpgradedSendStream;
30use crate::proto::Dispatched;
31use crate::rt::bounds::Http2ClientConnExec;
32use crate::upgrade::Upgraded;
33use crate::{Request, Response};
34use h2::client::ResponseFuture;
35
/// Receiving end of the client dispatch channel: each item is a `Request<B>`
/// paired with the callback through which its `Response<IncomingBody>` (or an
/// error) is delivered.
type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<IncomingBody>>;

/// An mpsc channel is used to help notify the `Connection` task when *all*
/// other handles to it have been dropped, so that it can shutdown.
type ConnDropRef = mpsc::Sender<Infallible>;

/// A oneshot channel watches the `Connection` task, and when it completes,
/// the "dispatch" task will be notified and can shutdown sooner.
type ConnEof = oneshot::Receiver<Infallible>;
45
// These defaults target the common deployment, which is usually not
// resource constrained; for such hosts the spec default of 64kb can be too
// limiting for performance.
const DEFAULT_CONN_WINDOW: u32 = 5 * 1024 * 1024; // 5mb
const DEFAULT_STREAM_WINDOW: u32 = 2 * 1024 * 1024; // 2mb
const DEFAULT_MAX_FRAME_SIZE: u32 = 16 * 1024; // 16kb
const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 1024; // 1mb
const DEFAULT_MAX_HEADER_LIST_SIZE: u32 = 16 * 1024; // 16kb

// How many streams the client may have open concurrently before the initial
// SETTINGS frame from the server has been received.
// The value follows the minimum that the HTTP/2 spec recommends endpoints
// advertise to their peers, so staying at or below it minimizes the chance
// that the local endpoint opens too many streams and has them rejected by
// the remote peer with the `REFUSED_STREAM` error.
const DEFAULT_INITIAL_MAX_SEND_STREAMS: usize = 100;
63
/// Tunables for an HTTP/2 client connection.
///
/// The values are consumed by `new_builder` (h2 `Builder` settings) and
/// `new_ping_config` (keep-alive / adaptive window settings). `Option`
/// fields are only applied to the builder when they are `Some`.
#[derive(Clone, Debug)]
pub(crate) struct Config {
    /// When `true`, `new_ping_config` seeds `bdp_initial_window` with
    /// `initial_stream_window_size`; when `false` it is left as `None`.
    pub(crate) adaptive_window: bool,
    /// Passed to `Builder::initial_connection_window_size`.
    pub(crate) initial_conn_window_size: u32,
    /// Passed to `Builder::initial_window_size`.
    pub(crate) initial_stream_window_size: u32,
    /// Passed to `Builder::initial_max_send_streams`.
    pub(crate) initial_max_send_streams: usize,
    /// Passed to `Builder::max_frame_size` when set.
    pub(crate) max_frame_size: Option<u32>,
    /// Passed to `Builder::max_header_list_size`.
    pub(crate) max_header_list_size: u32,
    /// Copied into `ping::Config::keep_alive_interval`.
    pub(crate) keep_alive_interval: Option<Duration>,
    /// Copied into `ping::Config::keep_alive_timeout`.
    pub(crate) keep_alive_timeout: Duration,
    /// Copied into `ping::Config::keep_alive_while_idle`.
    pub(crate) keep_alive_while_idle: bool,
    /// Passed to `Builder::max_concurrent_reset_streams` when set.
    pub(crate) max_concurrent_reset_streams: Option<usize>,
    /// Passed to `Builder::max_send_buffer_size`.
    pub(crate) max_send_buffer_size: usize,
    /// Passed to `Builder::max_pending_accept_reset_streams` when set.
    pub(crate) max_pending_accept_reset_streams: Option<usize>,
    /// Passed to `Builder::header_table_size` when set.
    pub(crate) header_table_size: Option<u32>,
    /// Passed to `Builder::max_concurrent_streams` when set.
    pub(crate) max_concurrent_streams: Option<u32>,
}
81
82impl Default for Config {
83    fn default() -> Config {
84        Config {
85            adaptive_window: false,
86            initial_conn_window_size: DEFAULT_CONN_WINDOW,
87            initial_stream_window_size: DEFAULT_STREAM_WINDOW,
88            initial_max_send_streams: DEFAULT_INITIAL_MAX_SEND_STREAMS,
89            max_frame_size: Some(DEFAULT_MAX_FRAME_SIZE),
90            max_header_list_size: DEFAULT_MAX_HEADER_LIST_SIZE,
91            keep_alive_interval: None,
92            keep_alive_timeout: Duration::from_secs(20),
93            keep_alive_while_idle: false,
94            max_concurrent_reset_streams: None,
95            max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
96            max_pending_accept_reset_streams: None,
97            header_table_size: None,
98            max_concurrent_streams: None,
99        }
100    }
101}
102
103fn new_builder(config: &Config) -> Builder {
104    let mut builder = Builder::default();
105    builder
106        .initial_max_send_streams(config.initial_max_send_streams)
107        .initial_window_size(config.initial_stream_window_size)
108        .initial_connection_window_size(config.initial_conn_window_size)
109        .max_header_list_size(config.max_header_list_size)
110        .max_send_buffer_size(config.max_send_buffer_size)
111        .enable_push(false);
112    if let Some(max) = config.max_frame_size {
113        builder.max_frame_size(max);
114    }
115    if let Some(max) = config.max_concurrent_reset_streams {
116        builder.max_concurrent_reset_streams(max);
117    }
118    if let Some(max) = config.max_pending_accept_reset_streams {
119        builder.max_pending_accept_reset_streams(max);
120    }
121    if let Some(size) = config.header_table_size {
122        builder.header_table_size(size);
123    }
124    if let Some(max) = config.max_concurrent_streams {
125        builder.max_concurrent_streams(max);
126    }
127    builder
128}
129
130fn new_ping_config(config: &Config) -> ping::Config {
131    ping::Config {
132        bdp_initial_window: if config.adaptive_window {
133            Some(config.initial_stream_window_size)
134        } else {
135            None
136        },
137        keep_alive_interval: config.keep_alive_interval,
138        keep_alive_timeout: config.keep_alive_timeout,
139        keep_alive_while_idle: config.keep_alive_while_idle,
140    }
141}
142
143pub(crate) async fn handshake<T, B, E>(
144    io: T,
145    req_rx: ClientRx<B>,
146    config: &Config,
147    mut exec: E,
148    timer: Time,
149) -> crate::Result<ClientTask<B, E, T>>
150where
151    T: Read + Write + Unpin,
152    B: Body + 'static,
153    B::Data: Send + 'static,
154    E: Http2ClientConnExec<B, T> + Unpin,
155    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
156{
157    let (h2_tx, mut conn) = new_builder(config)
158        .handshake::<_, SendBuf<B::Data>>(Compat::new(io))
159        .await
160        .map_err(crate::Error::new_h2)?;
161
162    // An mpsc channel is used entirely to detect when the
163    // 'Client' has been dropped. This is to get around a bug
164    // in h2 where dropping all SendRequests won't notify a
165    // parked Connection.
166    let (conn_drop_ref, conn_drop_rx) = mpsc::channel(1);
167    let (cancel_tx, conn_eof) = oneshot::channel();
168
169    let ping_config = new_ping_config(config);
170
171    let (conn, ping) = if ping_config.is_enabled() {
172        let pp = conn.ping_pong().expect("conn.ping_pong");
173        let (recorder, ponger) = ping::channel(pp, ping_config, timer);
174
175        let conn: Conn<_, B> = Conn::new(ponger, conn);
176        (Either::left(conn), recorder)
177    } else {
178        (Either::right(conn), ping::disabled())
179    };
180    let conn: ConnMapErr<T, B> = ConnMapErr {
181        conn,
182        is_terminated: false,
183    };
184
185    exec.execute_h2_future(H2ClientFuture::Task {
186        task: ConnTask::new(conn, conn_drop_rx, cancel_tx),
187    });
188
189    Ok(ClientTask {
190        ping,
191        conn_drop_ref,
192        conn_eof,
193        executor: exec,
194        h2_tx,
195        req_rx,
196        fut_ctx: None,
197        marker: PhantomData,
198    })
199}
200
pin_project! {
    /// An h2 `Connection` paired with the `Ponger` that watches its pings.
    ///
    /// Built by `handshake` only when the ping configuration is enabled.
    struct Conn<T, B>
    where
        B: Body,
    {
        // Polled before the connection on every wake-up; its results adjust
        // the connection's window sizes or end the connection.
        #[pin]
        ponger: Ponger,
        // The h2 connection being driven.
        #[pin]
        conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>,
    }
}
212
213impl<T, B> Conn<T, B>
214where
215    B: Body,
216    T: Read + Write + Unpin,
217{
218    fn new(ponger: Ponger, conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>) -> Self {
219        Conn { ponger, conn }
220    }
221}
222
223impl<T, B> Future for Conn<T, B>
224where
225    B: Body,
226    T: Read + Write + Unpin,
227{
228    type Output = Result<(), h2::Error>;
229
230    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
231        let mut this = self.project();
232        match this.ponger.poll(cx) {
233            Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
234                this.conn.set_target_window_size(wnd);
235                this.conn.set_initial_window_size(wnd)?;
236            }
237            Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
238                debug!("connection keep-alive timed out");
239                return Poll::Ready(Ok(()));
240            }
241            Poll::Pending => {}
242        }
243
244        Pin::new(&mut this.conn).poll(cx)
245    }
246}
247
pin_project! {
    /// Wraps the connection future, remembering whether it has completed.
    ///
    /// Its `Future` impl logs the connection error and yields `Err(())`
    /// instead; `is_terminated` backs the `FusedFuture` impl.
    // NOTE(review): the one-bound-per-line where clause looks like a
    // `pin_project!` grammar requirement; confirm before merging the bounds.
    struct ConnMapErr<T, B>
    where
        B: Body,
        T: Read,
        T: Write,
        T: Unpin,
    {
        // Either the ping-aware `Conn` or the bare h2 `Connection`.
        #[pin]
        conn: Either<Conn<T, B>, Connection<Compat<T>, SendBuf<<B as Body>::Data>>>,
        // Set to `true` once `conn` has returned `Poll::Ready`.
        #[pin]
        is_terminated: bool,
    }
}
262
263impl<T, B> Future for ConnMapErr<T, B>
264where
265    B: Body,
266    T: Read + Write + Unpin,
267{
268    type Output = Result<(), ()>;
269
270    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
271        let mut this = self.project();
272
273        if *this.is_terminated {
274            return Poll::Pending;
275        }
276        let polled = this.conn.poll(cx);
277        if polled.is_ready() {
278            *this.is_terminated = true;
279        }
280        polled.map_err(|_e| {
281            debug!(error = %_e, "connection error");
282        })
283    }
284}
285
impl<T, B> FusedFuture for ConnMapErr<T, B>
where
    B: Body,
    T: Read + Write + Unpin,
{
    /// Reports whether the wrapped connection has already returned
    /// `Poll::Ready` (the flag is set in the `Future` impl).
    fn is_terminated(&self) -> bool {
        self.is_terminated
    }
}
295
pin_project! {
    /// The spawned task that drives the connection.
    ///
    /// It completes when the connection completes. Separately, once the
    /// drop-notification channel yields, it drops `cancel_tx`.
    pub struct ConnTask<T, B>
    where
        B: Body,
        T: Read,
        T: Write,
        T: Unpin,
    {
        // Receiving half of the `ConnDropRef` channel; nothing can be sent
        // on it (`Infallible`), so it only yields once all senders are gone.
        #[pin]
        drop_rx: Receiver<Infallible>,
        // Sender half of the `ConnEof` oneshot; taken and dropped once
        // `drop_rx` yields.
        #[pin]
        cancel_tx: Option<oneshot::Sender<Infallible>>,
        // The connection future itself.
        #[pin]
        conn: ConnMapErr<T, B>,
    }
}
312
313impl<T, B> ConnTask<T, B>
314where
315    B: Body,
316    T: Read + Write + Unpin,
317{
318    fn new(
319        conn: ConnMapErr<T, B>,
320        drop_rx: Receiver<Infallible>,
321        cancel_tx: oneshot::Sender<Infallible>,
322    ) -> Self {
323        Self {
324            drop_rx,
325            cancel_tx: Some(cancel_tx),
326            conn,
327        }
328    }
329}
330
331impl<T, B> Future for ConnTask<T, B>
332where
333    B: Body,
334    T: Read + Write + Unpin,
335{
336    type Output = ();
337
338    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
339        let mut this = self.project();
340
341        if !this.conn.is_terminated() && Pin::new(&mut this.conn).poll(cx).is_ready() {
342            // ok or err, the `conn` has finished.
343            return Poll::Ready(());
344        }
345
346        if !this.drop_rx.is_terminated() && Pin::new(&mut this.drop_rx).poll_next(cx).is_ready() {
347            // mpsc has been dropped, hopefully polling
348            // the connection some more should start shutdown
349            // and then close.
350            trace!("send_request dropped, starting conn shutdown");
351            drop(this.cancel_tx.take().expect("ConnTask Future polled twice"));
352        }
353
354        Poll::Pending
355    }
356}
357
pin_project! {
    // The one future type handed to the executor by this module. Each
    // variant wraps one kind of background work:
    // - `Pipe`: streaming a request body into its h2 send stream,
    // - `Send`: waiting for a response and delivering it to the caller,
    // - `Task`: driving the connection itself.
    #[project = H2ClientFutureProject]
    pub enum H2ClientFuture<B, T>
    where
        B: http_body::Body,
        B: 'static,
        B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
        T: Read,
        T: Write,
        T: Unpin,
    {
        Pipe {
            #[pin]
            pipe: PipeMap<B>,
        },
        Send {
            #[pin]
            send_when: SendWhen<B>,
        },
        Task {
            #[pin]
            task: ConnTask<T, B>,
        },
    }
}
383
384impl<B, T> Future for H2ClientFuture<B, T>
385where
386    B: http_body::Body + 'static,
387    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
388    T: Read + Write + Unpin,
389{
390    type Output = ();
391
392    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
393        let this = self.project();
394
395        match this {
396            H2ClientFutureProject::Pipe { pipe } => pipe.poll(cx),
397            H2ClientFutureProject::Send { send_when } => send_when.poll(cx),
398            H2ClientFutureProject::Task { task } => task.poll(cx),
399        }
400    }
401}
402
/// Everything belonging to one request that has already been handed to h2
/// via `send_request`.
///
/// `ClientTask::poll` parks it in `fut_ctx` when `h2_tx.poll_ready` returns
/// `Pending` right after `send_request`, then passes it to `poll_pipe` on a
/// later poll.
struct FutCtx<B>
where
    B: Body,
{
    /// `true` when the request method is `CONNECT`.
    is_connect: bool,
    /// `true` when the body reported `is_end_stream()` before sending.
    eos: bool,
    /// The h2 future that resolves to the response.
    fut: ResponseFuture,
    /// The h2 stream on which the request body is sent.
    body_tx: SendStream<SendBuf<B::Data>>,
    /// The request body.
    body: B,
    /// Callback used to deliver the response or error to the caller.
    cb: Callback<Request<B>, Response<IncomingBody>>,
}

// Declared unconditionally, regardless of whether `B` is `Unpin`.
impl<B: Body> Unpin for FutCtx<B> {}
416
/// The dispatch task: receives requests from `req_rx`, sends them over the
/// h2 connection, and spawns the futures that stream bodies and deliver
/// responses.
pub(crate) struct ClientTask<B, E, T>
where
    B: Body,
    E: Unpin,
{
    /// Ping recorder, cloned into every spawned body and response future.
    ping: ping::Recorder,
    /// Handle whose clones are carried by spawned body pipes; see
    /// `ConnDropRef`.
    conn_drop_ref: ConnDropRef,
    /// Resolves (with an error) once the connection task is gone.
    conn_eof: ConnEof,
    /// Executor used to spawn `H2ClientFuture`s.
    executor: E,
    /// The h2 handle used to open new streams.
    h2_tx: SendRequest<SendBuf<B::Data>>,
    /// Incoming requests together with their response callbacks.
    req_rx: ClientRx<B>,
    /// A request already sent to h2 that is waiting for `h2_tx` to become
    /// ready again before its body and response futures are started.
    fut_ctx: Option<FutCtx<B>>,
    /// Carries the `T` type parameter, which no field stores directly.
    marker: PhantomData<T>,
}
431
impl<B, E, T> ClientTask<B, E, T>
where
    B: Body + 'static,
    E: Http2ClientConnExec<B, T> + Unpin,
    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    T: Read + Write + Unpin,
{
    /// Returns what the underlying h2 `SendRequest` handle reports for
    /// `is_extended_connect_protocol_enabled`.
    pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
        self.h2_tx.is_extended_connect_protocol_enabled()
    }
}
443
pin_project! {
    /// A request-body pipe plus the handles that must stay alive while the
    /// body is still being sent.
    ///
    /// Both handles are dropped when the pipe finishes.
    pub struct PipeMap<S>
    where
        S: Body,
    {
        // The future copying the body into the h2 send stream.
        #[pin]
        pipe: PipeToSendStream<S>,
        // Clone of the dispatch task's `ConnDropRef`; `None` once the pipe
        // has completed.
        #[pin]
        conn_drop_ref: Option<Sender<Infallible>>,
        // Clone of the ping recorder; `None` once the pipe has completed.
        #[pin]
        ping: Option<Recorder>,
    }
}
457
458impl<B> Future for PipeMap<B>
459where
460    B: http_body::Body,
461    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
462{
463    type Output = ();
464
465    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
466        let mut this = self.project();
467
468        match Pin::new(&mut this.pipe).poll(cx) {
469            Poll::Ready(result) => {
470                if let Err(_e) = result {
471                    debug!("client request body error: {}", _e);
472                }
473                drop(this.conn_drop_ref.take().expect("Future polled twice"));
474                drop(this.ping.take().expect("Future polled twice"));
475                return Poll::Ready(());
476            }
477            Poll::Pending => (),
478        };
479        Poll::Pending
480    }
481}
482
483impl<B, E, T> ClientTask<B, E, T>
484where
485    B: Body + 'static + Unpin,
486    B::Data: Send,
487    E: Http2ClientConnExec<B, T> + Unpin,
488    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
489    T: Read + Write + Unpin,
490{
491    fn poll_pipe(&mut self, f: FutCtx<B>, cx: &mut Context<'_>) {
492        let ping = self.ping.clone();
493
494        let send_stream = if !f.is_connect {
495            if !f.eos {
496                let mut pipe = PipeToSendStream::new(f.body, f.body_tx);
497
498                // eagerly see if the body pipe is ready and
499                // can thus skip allocating in the executor
500                match Pin::new(&mut pipe).poll(cx) {
501                    Poll::Ready(_) => (),
502                    Poll::Pending => {
503                        let conn_drop_ref = self.conn_drop_ref.clone();
504                        // keep the ping recorder's knowledge of an
505                        // "open stream" alive while this body is
506                        // still sending...
507                        let ping = ping.clone();
508
509                        let pipe = PipeMap {
510                            pipe,
511                            conn_drop_ref: Some(conn_drop_ref),
512                            ping: Some(ping),
513                        };
514                        // Clear send task
515                        self.executor
516                            .execute_h2_future(H2ClientFuture::Pipe { pipe });
517                    }
518                }
519            }
520
521            None
522        } else {
523            Some(f.body_tx)
524        };
525
526        self.executor.execute_h2_future(H2ClientFuture::Send {
527            send_when: SendWhen {
528                when: ResponseFutMap {
529                    fut: f.fut,
530                    ping: Some(ping),
531                    send_stream: Some(send_stream),
532                },
533                call_back: Some(f.cb),
534            },
535        });
536    }
537}
538
pin_project! {
    /// Waits for an h2 response and converts it into a hyper `Response`, or
    /// into an upgraded connection for a successful CONNECT.
    pub(crate) struct ResponseFutMap<B>
    where
        B: Body,
        B: 'static,
    {
        // The h2 future that resolves to the response.
        #[pin]
        fut: ResponseFuture,
        // Taken when `fut` resolves; `None` afterwards.
        #[pin]
        ping: Option<Recorder>,
        // Outer `Option`: taken when `fut` resolves. Inner `Option`: `Some`
        // only for CONNECT requests, which keep their send stream for the
        // upgrade.
        #[pin]
        send_stream: Option<Option<SendStream<SendBuf<<B as Body>::Data>>>>,
    }
}
553
554impl<B> Future for ResponseFutMap<B>
555where
556    B: Body + 'static,
557{
558    type Output = Result<Response<crate::body::Incoming>, (crate::Error, Option<Request<B>>)>;
559
560    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
561        let mut this = self.project();
562
563        let result = ready!(this.fut.poll(cx));
564
565        let ping = this.ping.take().expect("Future polled twice");
566        let send_stream = this.send_stream.take().expect("Future polled twice");
567
568        match result {
569            Ok(res) => {
570                // record that we got the response headers
571                ping.record_non_data();
572
573                let content_length = headers::content_length_parse_all(res.headers());
574                if let (Some(mut send_stream), StatusCode::OK) = (send_stream, res.status()) {
575                    if content_length.map_or(false, |len| len != 0) {
576                        warn!("h2 connect response with non-zero body not supported");
577
578                        send_stream.send_reset(h2::Reason::INTERNAL_ERROR);
579                        return Poll::Ready(Err((
580                            crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
581                            None::<Request<B>>,
582                        )));
583                    }
584                    let (parts, recv_stream) = res.into_parts();
585                    let mut res = Response::from_parts(parts, IncomingBody::empty());
586
587                    let (pending, on_upgrade) = crate::upgrade::pending();
588                    let io = H2Upgraded {
589                        ping,
590                        send_stream: unsafe { UpgradedSendStream::new(send_stream) },
591                        recv_stream,
592                        buf: Bytes::new(),
593                    };
594                    let upgraded = Upgraded::new(io, Bytes::new());
595
596                    pending.fulfill(upgraded);
597                    res.extensions_mut().insert(on_upgrade);
598
599                    Poll::Ready(Ok(res))
600                } else {
601                    let res = res.map(|stream| {
602                        let ping = ping.for_stream(&stream);
603                        IncomingBody::h2(stream, content_length.into(), ping)
604                    });
605                    Poll::Ready(Ok(res))
606                }
607            }
608            Err(err) => {
609                ping.ensure_not_timed_out().map_err(|e| (e, None))?;
610
611                debug!("client response error: {}", err);
612                Poll::Ready(Err((crate::Error::new_h2(err), None::<Request<B>>)))
613            }
614        }
615    }
616}
617
impl<B, E, T> Future for ClientTask<B, E, T>
where
    B: Body + 'static + Unpin,
    B::Data: Send,
    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    E: Http2ClientConnExec<B, T> + Unpin,
    T: Read + Write + Unpin,
{
    type Output = crate::Result<Dispatched>;

    /// Runs the dispatch loop.
    ///
    /// Each iteration:
    /// 1. waits for the h2 handle to be ready, ending the task if it reports
    ///    an error (a `NO_ERROR` reason counts as a graceful shutdown);
    /// 2. resumes a request parked in `fut_ctx`, if any;
    /// 3. otherwise takes the next request from `req_rx`, prepares its
    ///    headers, sends it to h2, and starts its body/response futures via
    ///    `poll_pipe`.
    ///
    /// Resolves with `Dispatched::Shutdown` when the request channel is
    /// closed or the connection task has gone away.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            match ready!(self.h2_tx.poll_ready(cx)) {
                Ok(()) => (),
                Err(err) => {
                    // A keep-alive timeout, if one was recorded, is reported
                    // in preference to the h2 error.
                    self.ping.ensure_not_timed_out()?;
                    return if err.reason() == Some(::h2::Reason::NO_ERROR) {
                        trace!("connection gracefully shutdown");
                        Poll::Ready(Ok(Dispatched::Shutdown))
                    } else {
                        Poll::Ready(Err(crate::Error::new_h2(err)))
                    };
                }
            };

            // If we were waiting on pending open
            // continue where we left off.
            if let Some(f) = self.fut_ctx.take() {
                self.poll_pipe(f, cx);
                continue;
            }

            match self.req_rx.poll_recv(cx) {
                Poll::Ready(Some((req, cb))) => {
                    // check that future hasn't been canceled already
                    if cb.is_canceled() {
                        trace!("request callback is canceled");
                        continue;
                    }
                    // Split off the body: h2 takes the head as a
                    // `Request<()>`, the body is streamed separately.
                    let (head, body) = req.into_parts();
                    let mut req = ::http::Request::from_parts(head, ());
                    super::strip_connection_headers(req.headers_mut(), true);
                    // Only add a content-length when the body size is known
                    // exactly, and either it is non-zero or the method has
                    // defined payload semantics.
                    if let Some(len) = body.size_hint().exact() {
                        if len != 0 || headers::method_has_defined_payload_semantics(req.method()) {
                            headers::set_content_length_if_missing(req.headers_mut(), len);
                        }
                    }

                    let is_connect = req.method() == Method::CONNECT;
                    let eos = body.is_end_stream();

                    if is_connect
                        && headers::content_length_parse_all(req.headers())
                            .map_or(false, |len| len != 0)
                    {
                        debug!("h2 connect request with non-zero body not supported");
                        cb.send(Err(TrySendError {
                            error: crate::Error::new_user_invalid_connect(),
                            message: None,
                        }));
                        continue;
                    }

                    // Replace the hyper `Protocol` extension, if present,
                    // with its inner value before handing the request to h2.
                    if let Some(protocol) = req.extensions_mut().remove::<Protocol>() {
                        req.extensions_mut().insert(protocol.into_inner());
                    }

                    // End-of-stream is only signalled up front for
                    // non-CONNECT requests whose body is already finished.
                    let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) {
                        Ok(ok) => ok,
                        Err(err) => {
                            debug!("client send request error: {}", err);
                            cb.send(Err(TrySendError {
                                error: crate::Error::new_h2(err),
                                message: None,
                            }));
                            continue;
                        }
                    };

                    let f = FutCtx {
                        is_connect,
                        eos,
                        fut,
                        body_tx,
                        body,
                        cb,
                    };

                    // Check poll_ready() again.
                    // If the call to send_request() resulted in the new stream being pending open
                    // we have to wait for the open to complete before accepting new requests.
                    match self.h2_tx.poll_ready(cx) {
                        Poll::Pending => {
                            // Save Context
                            self.fut_ctx = Some(f);
                            return Poll::Pending;
                        }
                        Poll::Ready(Ok(())) => (),
                        Poll::Ready(Err(err)) => {
                            // Fail this request; the next loop iteration
                            // polls `h2_tx` again at the top.
                            f.cb.send(Err(TrySendError {
                                error: crate::Error::new_h2(err),
                                message: None,
                            }));
                            continue;
                        }
                    }
                    self.poll_pipe(f, cx);
                    continue;
                }

                Poll::Ready(None) => {
                    trace!("client::dispatch::Sender dropped");
                    return Poll::Ready(Ok(Dispatched::Shutdown));
                }

                // No request is waiting: also watch for the connection task
                // going away, so this task does not linger.
                Poll::Pending => match ready!(Pin::new(&mut self.conn_eof).poll(cx)) {
                    // As of Rust 1.82, this pattern is no longer needed, and emits a warning.
                    // But we cannot remove it as long as MSRV is less than that.
                    #[allow(unused)]
                    Ok(never) => match never {},
                    Err(_conn_is_eof) => {
                        trace!("connection task is closed, closing dispatch task");
                        return Poll::Ready(Ok(Dispatched::Shutdown));
                    }
                },
            }
        }
    }
}