Skip to main content

hyper/proto/h2/
client.rs

1use std::{
2    convert::Infallible,
3    future::Future,
4    marker::PhantomData,
5    pin::Pin,
6    task::{Context, Poll},
7    time::Duration,
8};
9
10use crate::rt::{Read, Write};
11use bytes::Bytes;
12use futures_channel::mpsc::{Receiver, Sender};
13use futures_channel::{mpsc, oneshot};
14use futures_core::{ready, FusedFuture, FusedStream, Stream};
15use h2::client::{Builder, Connection, SendRequest};
16use h2::SendStream;
17use http::{Method, StatusCode};
18use pin_project_lite::pin_project;
19
20use super::ping::{Ponger, Recorder};
21use super::{ping, PipeToSendStream, SendBuf};
22use crate::body::{Body, Incoming as IncomingBody};
23use crate::client::dispatch::{Callback, SendWhen, TrySendError};
24use crate::common::either::Either;
25use crate::common::io::Compat;
26use crate::common::time::Time;
27use crate::ext::Protocol;
28use crate::headers;
29use crate::proto::Dispatched;
30use crate::rt::bounds::{Http2ClientConnExec, Http2UpgradedExec};
31use crate::upgrade::Upgraded;
32use crate::{Request, Response};
33use h2::client::ResponseFuture;
34
// Receiving half of the client dispatch channel. `ClientTask::poll` pulls
// `(request, callback)` pairs out of it with `poll_recv`.
type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<IncomingBody>>;

///// An mpsc channel is used to help notify the `Connection` task when *all*
///// other handles to it have been dropped, so that it can shut down.
///// The item type is `Infallible`: no value can ever be sent, only the
///// closing of the channel is observed.
type ConnDropRef = mpsc::Sender<Infallible>;

///// A oneshot channel watches the `Connection` task, and when it completes,
///// the "dispatch" task will be notified and can shut down sooner.
///// As above, `Infallible` means only the drop of the sender is observable.
type ConnEof = oneshot::Receiver<Infallible>;
44
// Our defaults are chosen for the "majority" case, which usually is not
// resource constrained, and so the spec default of 64kb can be too limiting
// for performance.
//
// These constants seed `Config::default()` below.
const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024 * 5; // 5 MiB
const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024 * 2; // 2 MiB
const DEFAULT_MAX_FRAME_SIZE: u32 = 1024 * 16; // 16 KiB
const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 1024; // 1 MiB
const DEFAULT_MAX_HEADER_LIST_SIZE: u32 = 1024 * 16; // 16 KiB

// The maximum number of concurrent streams that the client is allowed to open
// before it receives the initial SETTINGS frame from the server.
// This default value is derived from what the HTTP/2 spec recommends as the
// minimum value that endpoints advertise to their peers. It means that using
// this value will minimize the chance of the failure where the local endpoint
// attempts to open too many streams and gets rejected by the remote peer with
// the `REFUSED_STREAM` error.
const DEFAULT_INITIAL_MAX_SEND_STREAMS: usize = 100;
62
/// HTTP/2 client connection settings.
///
/// Consumed by `new_builder` (h2 `Builder` options) and `new_ping_config`
/// (keep-alive / adaptive window options).
#[derive(Clone, Debug)]
pub(crate) struct Config {
    /// When `true`, `new_ping_config` seeds `bdp_initial_window` with
    /// `initial_stream_window_size`; when `false` it is `None`.
    pub(crate) adaptive_window: bool,
    /// Passed to `Builder::initial_connection_window_size`.
    pub(crate) initial_conn_window_size: u32,
    /// Passed to `Builder::initial_window_size`.
    pub(crate) initial_stream_window_size: u32,
    /// Passed to `Builder::initial_max_send_streams`.
    pub(crate) initial_max_send_streams: usize,
    /// Passed to `Builder::max_frame_size` only when `Some`.
    pub(crate) max_frame_size: Option<u32>,
    /// Passed to `Builder::max_header_list_size`.
    pub(crate) max_header_list_size: u32,
    /// Copied into `ping::Config::keep_alive_interval`.
    pub(crate) keep_alive_interval: Option<Duration>,
    /// Copied into `ping::Config::keep_alive_timeout`.
    pub(crate) keep_alive_timeout: Duration,
    /// Copied into `ping::Config::keep_alive_while_idle`.
    pub(crate) keep_alive_while_idle: bool,
    /// Passed to `Builder::max_concurrent_reset_streams` only when `Some`.
    pub(crate) max_concurrent_reset_streams: Option<usize>,
    /// Passed to `Builder::max_send_buffer_size`.
    pub(crate) max_send_buffer_size: usize,
    /// Passed to `Builder::max_pending_accept_reset_streams` only when `Some`.
    pub(crate) max_pending_accept_reset_streams: Option<usize>,
    /// Passed as-is (including `None`) to `Builder::max_local_error_reset_streams`.
    pub(crate) max_local_error_reset_streams: Option<usize>,
    /// Passed to `Builder::header_table_size` only when `Some`.
    pub(crate) header_table_size: Option<u32>,
    /// Passed to `Builder::max_concurrent_streams` only when `Some`.
    pub(crate) max_concurrent_streams: Option<u32>,
    /// Passed to `Builder::reset_stream_duration` only when `Some`.
    pub(crate) reset_stream_duration: Option<Duration>,
}
82
83impl Default for Config {
84    fn default() -> Config {
85        Config {
86            adaptive_window: false,
87            initial_conn_window_size: DEFAULT_CONN_WINDOW,
88            initial_stream_window_size: DEFAULT_STREAM_WINDOW,
89            initial_max_send_streams: DEFAULT_INITIAL_MAX_SEND_STREAMS,
90            max_frame_size: Some(DEFAULT_MAX_FRAME_SIZE),
91            max_header_list_size: DEFAULT_MAX_HEADER_LIST_SIZE,
92            keep_alive_interval: None,
93            keep_alive_timeout: Duration::from_secs(20),
94            keep_alive_while_idle: false,
95            max_concurrent_reset_streams: None,
96            max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
97            max_pending_accept_reset_streams: None,
98            max_local_error_reset_streams: Some(1024),
99            header_table_size: None,
100            max_concurrent_streams: None,
101            reset_stream_duration: None,
102        }
103    }
104}
105
106fn new_builder(config: &Config) -> Builder {
107    let mut builder = Builder::default();
108    builder
109        .initial_max_send_streams(config.initial_max_send_streams)
110        .initial_window_size(config.initial_stream_window_size)
111        .initial_connection_window_size(config.initial_conn_window_size)
112        .max_header_list_size(config.max_header_list_size)
113        .max_send_buffer_size(config.max_send_buffer_size)
114        .max_local_error_reset_streams(config.max_local_error_reset_streams)
115        .enable_push(false);
116    if let Some(max) = config.max_frame_size {
117        builder.max_frame_size(max);
118    }
119    if let Some(max) = config.max_concurrent_reset_streams {
120        builder.max_concurrent_reset_streams(max);
121    }
122    if let Some(max) = config.max_pending_accept_reset_streams {
123        builder.max_pending_accept_reset_streams(max);
124    }
125    if let Some(size) = config.header_table_size {
126        builder.header_table_size(size);
127    }
128    if let Some(max) = config.max_concurrent_streams {
129        builder.max_concurrent_streams(max);
130    }
131    if let Some(dur) = config.reset_stream_duration {
132        builder.reset_stream_duration(dur);
133    }
134    builder
135}
136
137fn new_ping_config(config: &Config) -> ping::Config {
138    ping::Config {
139        bdp_initial_window: if config.adaptive_window {
140            Some(config.initial_stream_window_size)
141        } else {
142            None
143        },
144        keep_alive_interval: config.keep_alive_interval,
145        keep_alive_timeout: config.keep_alive_timeout,
146        keep_alive_while_idle: config.keep_alive_while_idle,
147    }
148}
149
150pub(crate) async fn handshake<T, B, E>(
151    io: T,
152    req_rx: ClientRx<B>,
153    config: &Config,
154    mut exec: E,
155    timer: Time,
156) -> crate::Result<ClientTask<B, E, T>>
157where
158    T: Read + Write + Unpin,
159    B: Body + 'static,
160    B::Data: Send + 'static,
161    E: Http2ClientConnExec<B, T> + Clone + Unpin,
162    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
163{
164    let (h2_tx, mut conn) = new_builder(config)
165        .handshake::<_, SendBuf<B::Data>>(Compat::new(io))
166        .await
167        .map_err(crate::Error::new_h2)?;
168
169    // An mpsc channel is used entirely to detect when the
170    // 'Client' has been dropped. This is to get around a bug
171    // in h2 where dropping all SendRequests won't notify a
172    // parked Connection.
173    let (conn_drop_ref, conn_drop_rx) = mpsc::channel(1);
174    let (cancel_tx, conn_eof) = oneshot::channel();
175
176    let ping_config = new_ping_config(config);
177
178    let (conn, ping) = if ping_config.is_enabled() {
179        let pp = conn.ping_pong().expect("conn.ping_pong");
180        let (recorder, ponger) = ping::channel(pp, ping_config, timer);
181
182        let conn: Conn<_, B> = Conn::new(ponger, conn);
183        (Either::left(conn), recorder)
184    } else {
185        (Either::right(conn), ping::disabled())
186    };
187    let conn: ConnMapErr<T, B> = ConnMapErr {
188        conn,
189        is_terminated: false,
190    };
191
192    exec.execute_h2_future(H2ClientFuture::Task {
193        task: ConnTask::new(conn, conn_drop_rx, cancel_tx),
194    });
195
196    Ok(ClientTask {
197        ping,
198        conn_drop_ref,
199        conn_eof,
200        executor: exec,
201        h2_tx,
202        req_rx,
203        fut_ctx: None,
204        marker: PhantomData,
205    })
206}
207
// An h2 `Connection` paired with the `Ponger` that is polled alongside it.
// Built by `handshake` only when the ping configuration is enabled.
pin_project! {
    struct Conn<T, B>
    where
        B: Body,
    {
        // Polled first in `Conn::poll`; its results adjust window sizes or
        // end the connection on keep-alive timeout.
        #[pin]
        ponger: Ponger,
        // The underlying h2 connection, driven after the ponger.
        #[pin]
        conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>,
    }
}
219
220impl<T, B> Conn<T, B>
221where
222    B: Body,
223    T: Read + Write + Unpin,
224{
225    fn new(ponger: Ponger, conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>) -> Self {
226        Conn { ponger, conn }
227    }
228}
229
230impl<T, B> Future for Conn<T, B>
231where
232    B: Body,
233    T: Read + Write + Unpin,
234{
235    type Output = Result<(), h2::Error>;
236
237    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
238        let mut this = self.project();
239        match this.ponger.poll(cx) {
240            Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
241                this.conn.set_target_window_size(wnd);
242                this.conn.set_initial_window_size(wnd)?;
243            }
244            Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
245                debug!("connection keep-alive timed out");
246                return Poll::Ready(Ok(()));
247            }
248            Poll::Pending => {}
249        }
250
251        Pin::new(&mut this.conn).poll(cx)
252    }
253}
254
// Wraps either connection flavour (with or without a ponger), discards the
// error value after logging it, and remembers whether it has completed.
pin_project! {
    struct ConnMapErr<T, B>
    where
        B: Body,
        T: Read,
        T: Write,
        T: Unpin,
    {
        // Left: connection with keep-alive/BDP ponger. Right: bare h2 connection.
        #[pin]
        conn: Either<Conn<T, B>, Connection<Compat<T>, SendBuf<<B as Body>::Data>>>,
        // Set to `true` once `conn` has returned `Poll::Ready`; afterwards
        // `poll` always returns `Poll::Pending`.
        #[pin]
        is_terminated: bool,
    }
}
269
270impl<T, B> Future for ConnMapErr<T, B>
271where
272    B: Body,
273    T: Read + Write + Unpin,
274{
275    type Output = Result<(), ()>;
276
277    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
278        let mut this = self.project();
279
280        if *this.is_terminated {
281            return Poll::Pending;
282        }
283        let polled = this.conn.poll(cx);
284        if polled.is_ready() {
285            *this.is_terminated = true;
286        }
287        polled.map_err(|_e| {
288            debug!(error = %_e, "connection error");
289        })
290    }
291}
292
impl<T, B> FusedFuture for ConnMapErr<T, B>
where
    B: Body,
    T: Read + Write + Unpin,
{
    /// Reports whether the wrapped connection has already returned
    /// `Poll::Ready` (the flag is set in `poll`).
    fn is_terminated(&self) -> bool {
        self.is_terminated
    }
}
302
// The future handed to the executor by `handshake` to drive the connection.
pin_project! {
    pub struct ConnTask<T, B>
    where
        B: Body,
        T: Read,
        T: Write,
        T: Unpin,
    {
        // Becomes ready once every `ConnDropRef` sender has been dropped.
        #[pin]
        drop_rx: Receiver<Infallible>,
        // Dropped (never sent on) to signal the dispatch task's `ConnEof`.
        #[pin]
        cancel_tx: Option<oneshot::Sender<Infallible>>,
        // The connection being driven.
        #[pin]
        conn: ConnMapErr<T, B>,
    }
}
319
320impl<T, B> ConnTask<T, B>
321where
322    B: Body,
323    T: Read + Write + Unpin,
324{
325    fn new(
326        conn: ConnMapErr<T, B>,
327        drop_rx: Receiver<Infallible>,
328        cancel_tx: oneshot::Sender<Infallible>,
329    ) -> Self {
330        Self {
331            drop_rx,
332            cancel_tx: Some(cancel_tx),
333            conn,
334        }
335    }
336}
337
338impl<T, B> Future for ConnTask<T, B>
339where
340    B: Body,
341    T: Read + Write + Unpin,
342{
343    type Output = ();
344
345    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
346        let mut this = self.project();
347
348        if !this.conn.is_terminated() && Pin::new(&mut this.conn).poll(cx).is_ready() {
349            // ok or err, the `conn` has finished.
350            return Poll::Ready(());
351        }
352
353        if !this.drop_rx.is_terminated() && Pin::new(&mut this.drop_rx).poll_next(cx).is_ready() {
354            // mpsc has been dropped, hopefully polling
355            // the connection some more should start shutdown
356            // and then close.
357            trace!("send_request dropped, starting conn shutdown");
358            drop(this.cancel_tx.take().expect("ConnTask Future polled twice"));
359        }
360
361        Poll::Pending
362    }
363}
364
// The single future type handed to the HTTP/2 client executor via
// `execute_h2_future`. Each variant wraps one kind of background work.
pin_project! {
    #[project = H2ClientFutureProject]
    pub enum H2ClientFuture<B, T, E>
    where
        B: http_body::Body,
        B: 'static,
        B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
        T: Read,
        T: Write,
        T: Unpin,
    {
        // Streams a request body into its h2 send stream (spawned by `poll_pipe`).
        Pipe {
            #[pin]
            pipe: PipeMap<B>,
        },
        // Waits for the response and hands it to the request's callback
        // (spawned by `poll_pipe`).
        Send {
            #[pin]
            send_when: SendWhen<B, E>,
        },
        // Drives the connection itself (spawned by `handshake`).
        Task {
            #[pin]
            task: ConnTask<T, B>,
        },
    }
}
390
391impl<B, T, E> Future for H2ClientFuture<B, T, E>
392where
393    B: http_body::Body + 'static,
394    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
395    T: Read + Write + Unpin,
396    E: Http2UpgradedExec<B::Data>,
397{
398    type Output = ();
399
400    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
401        let this = self.project();
402
403        match this {
404            H2ClientFutureProject::Pipe { pipe } => pipe.poll(cx),
405            H2ClientFutureProject::Send { send_when } => send_when.poll(cx),
406            H2ClientFutureProject::Task { task } => task.poll(cx),
407        }
408    }
409}
410
/// Everything needed to finish dispatching one request after
/// `send_request` has been called.
///
/// `ClientTask::poll` parks one of these in `fut_ctx` when the send handle
/// is not ready again yet, and resumes it through `poll_pipe` later.
struct FutCtx<B>
where
    B: Body,
{
    /// Whether the request method is `CONNECT`.
    is_connect: bool,
    /// Whether the request body reported end-of-stream up front.
    eos: bool,
    /// The h2 future resolving to the response.
    fut: ResponseFuture,
    /// The h2 stream the request body is written to.
    body_tx: SendStream<SendBuf<B::Data>>,
    /// The request body.
    body: B,
    /// Callback used to deliver the response (or error) to the caller.
    cb: Callback<Request<B>, Response<IncomingBody>>,
}

// Declared `Unpin` unconditionally, independent of whether `B` is `Unpin`.
impl<B: Body> Unpin for FutCtx<B> {}
424
/// The HTTP/2 client dispatch task returned by `handshake`.
///
/// Its `Future` impl receives requests from `req_rx`, sends them on the
/// h2 connection, and spawns helper futures on `executor`.
pub(crate) struct ClientTask<B, E, T>
where
    B: Body,
    E: Unpin,
{
    /// Ping recorder; cloned into the per-request futures.
    ping: ping::Recorder,
    /// Sender whose clones keep the connection task informed that handles
    /// are still alive.
    conn_drop_ref: ConnDropRef,
    /// Resolves (with an error) once the connection task has gone away.
    conn_eof: ConnEof,
    /// Executor used to spawn `H2ClientFuture`s.
    executor: E,
    /// The h2 handle used to open new request streams.
    h2_tx: SendRequest<SendBuf<B::Data>>,
    /// Incoming `(request, callback)` pairs from the client handle.
    req_rx: ClientRx<B>,
    /// A request that has been sent but is waiting for `h2_tx` to be ready
    /// again before its body/response futures are spawned.
    fut_ctx: Option<FutCtx<B>>,
    /// Ties the IO type `T` to this task without storing a value of it.
    marker: PhantomData<T>,
}
439
impl<B, E, T> ClientTask<B, E, T>
where
    B: Body + 'static,
    E: Http2ClientConnExec<B, T> + Unpin,
    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    T: Read + Write + Unpin,
{
    /// Returns what the underlying h2 `SendRequest` reports for
    /// `is_extended_connect_protocol_enabled`.
    pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
        self.h2_tx.is_extended_connect_protocol_enabled()
    }
    /// Returns what the underlying h2 `SendRequest` reports for
    /// `current_max_send_streams`.
    pub(crate) fn current_max_send_streams(&self) -> usize {
        self.h2_tx.current_max_send_streams()
    }
    /// Returns what the underlying h2 `SendRequest` reports for
    /// `current_max_recv_streams`.
    pub(crate) fn current_max_recv_streams(&self) -> usize {
        self.h2_tx.current_max_recv_streams()
    }
}
457
// Future that streams one request body into its h2 send stream, while
// holding the resources that must stay alive for as long as it is sending.
pin_project! {
    pub struct PipeMap<S>
    where
        S: Body,
    {
        // The body-to-stream pipe being driven.
        #[pin]
        pipe: PipeToSendStream<S>,
        // Held until the pipe finishes or is cancelled, then dropped.
        #[pin]
        conn_drop_ref: Option<Sender<Infallible>>,
        // Ping recorder clone, held until the pipe finishes or is cancelled.
        #[pin]
        ping: Option<Recorder>,
        // Receives `()` when the response side asks for the stream to be
        // reset; set to `None` once the sender is gone.
        cancel_rx: Option<oneshot::Receiver<()>>,
    }
}
472
473impl<B> Future for PipeMap<B>
474where
475    B: http_body::Body,
476    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
477{
478    type Output = ();
479
480    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
481        let mut this = self.project();
482
483        // Check if the client cancelled the request (e.g. dropped the
484        // response future due to a timeout). If so, reset the h2 stream
485        // so that a RST_STREAM is sent and flow-control capacity is freed.
486        let cancel_result = this.cancel_rx.as_mut().map(|rx| Pin::new(rx).poll(cx));
487        match cancel_result {
488            Some(Poll::Ready(Ok(()))) => {
489                debug!("client request body send cancelled, resetting stream");
490                this.pipe.as_mut().send_reset(h2::Reason::CANCEL);
491                drop(this.conn_drop_ref.take().expect("Future polled twice"));
492                drop(this.ping.take().expect("Future polled twice"));
493                return Poll::Ready(());
494            }
495            Some(Poll::Ready(Err(_))) => {
496                // Sender dropped without cancelling (normal response or error).
497                // Stop polling the receiver.
498                *this.cancel_rx = None;
499            }
500            Some(Poll::Pending) | None => {}
501        }
502
503        match Pin::new(&mut this.pipe).poll(cx) {
504            Poll::Ready(result) => {
505                if let Err(_e) = result {
506                    debug!("client request body error: {}", _e);
507                }
508                drop(this.conn_drop_ref.take().expect("Future polled twice"));
509                drop(this.ping.take().expect("Future polled twice"));
510                return Poll::Ready(());
511            }
512            Poll::Pending => (),
513        };
514        Poll::Pending
515    }
516}
517
518impl<B, E, T> ClientTask<B, E, T>
519where
520    B: Body + 'static + Unpin,
521    B::Data: Send,
522    E: Http2ClientConnExec<B, T> + Clone + Unpin,
523    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
524    T: Read + Write + Unpin,
525{
526    fn poll_pipe(&mut self, f: FutCtx<B>, cx: &mut Context<'_>) {
527        let ping = self.ping.clone();
528
529        // A one-shot channel so that send_task can tell pipe_task to
530        // reset the stream when the client cancels the request.
531        let (cancel_tx, cancel_rx) = oneshot::channel::<()>();
532
533        let send_stream = if !f.is_connect {
534            if !f.eos {
535                let mut pipe = PipeToSendStream::new(f.body, f.body_tx);
536
537                // eagerly see if the body pipe is ready and
538                // can thus skip allocating in the executor
539                match Pin::new(&mut pipe).poll(cx) {
540                    Poll::Ready(_) => (),
541                    Poll::Pending => {
542                        let conn_drop_ref = self.conn_drop_ref.clone();
543                        // keep the ping recorder's knowledge of an
544                        // "open stream" alive while this body is
545                        // still sending...
546                        let ping = ping.clone();
547
548                        let pipe = PipeMap {
549                            pipe,
550                            conn_drop_ref: Some(conn_drop_ref),
551                            ping: Some(ping),
552                            cancel_rx: Some(cancel_rx),
553                        };
554                        // Clear send task
555                        self.executor
556                            .execute_h2_future(H2ClientFuture::Pipe { pipe });
557                    }
558                }
559            }
560
561            None
562        } else {
563            Some(f.body_tx)
564        };
565
566        self.executor.execute_h2_future(H2ClientFuture::Send {
567            send_when: SendWhen {
568                when: ResponseFutMap {
569                    fut: f.fut,
570                    ping: Some(ping),
571                    send_stream: Some(send_stream),
572                    exec: self.executor.clone(),
573                    cancel_tx: Some(cancel_tx),
574                },
575                call_back: Some(f.cb),
576            },
577        });
578    }
579}
580
// Future that waits for the h2 response of one request and converts it into
// a hyper `Response`, or into an error.
pin_project! {
    pub(crate) struct ResponseFutMap<B, E>
    where
        B: Body,
        B: 'static,
    {
        // The h2 response future.
        #[pin]
        fut: ResponseFuture,
        // Ping recorder; taken when `fut` resolves.
        ping: Option<Recorder>,
        // Outer `Option`: taken when `fut` resolves. Inner `Option`: `Some`
        // only for CONNECT requests (see `poll_pipe`).
        #[pin]
        send_stream: Option<Option<SendStream<SendBuf<<B as Body>::Data>>>>,
        // Executor used to spawn the upgrade task for CONNECT responses.
        exec: E,
        // Used by `cancel` to tell the body pipe to reset the stream.
        cancel_tx: Option<oneshot::Sender<()>>,
    }
}
596
597impl<B: Body + 'static, E> ResponseFutMap<B, E> {
598    /// Signal the `pipe_task` to reset the stream (e.g. on client cancellation).
599    pub(crate) fn cancel(self: Pin<&mut Self>) {
600        if let Some(cancel_tx) = self.project().cancel_tx.take() {
601            let _ = cancel_tx.send(());
602        }
603    }
604}
605
606impl<B, E> Future for ResponseFutMap<B, E>
607where
608    B: Body + 'static,
609    E: Http2UpgradedExec<B::Data>,
610{
611    type Output = Result<Response<crate::body::Incoming>, (crate::Error, Option<Request<B>>)>;
612
613    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
614        let mut this = self.as_mut().project();
615
616        let result = ready!(this.fut.poll(cx));
617
618        let ping = this.ping.take().expect("Future polled twice");
619        let send_stream = this.send_stream.take().expect("Future polled twice");
620
621        match result {
622            Ok(res) => {
623                // record that we got the response headers
624                ping.record_non_data();
625
626                let content_length = headers::content_length_parse_all(res.headers());
627                if let (Some(mut send_stream), StatusCode::OK) = (send_stream, res.status()) {
628                    if content_length.map_or(false, |len| len != 0) {
629                        warn!("h2 connect response with non-zero body not supported");
630
631                        send_stream.send_reset(h2::Reason::INTERNAL_ERROR);
632                        return Poll::Ready(Err((
633                            crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
634                            None::<Request<B>>,
635                        )));
636                    }
637                    let (parts, recv_stream) = res.into_parts();
638                    let mut res = Response::from_parts(parts, IncomingBody::empty());
639
640                    let (pending, on_upgrade) = crate::upgrade::pending();
641
642                    let (h2_up, up_task) = super::upgrade::pair(send_stream, recv_stream, ping);
643                    self.exec.execute_upgrade(up_task);
644                    let upgraded = Upgraded::new(h2_up, Bytes::new());
645
646                    pending.fulfill(upgraded);
647                    res.extensions_mut().insert(on_upgrade);
648
649                    Poll::Ready(Ok(res))
650                } else {
651                    let res = res.map(|stream| {
652                        let ping = ping.for_stream(&stream);
653                        IncomingBody::h2(stream, content_length.into(), ping)
654                    });
655                    Poll::Ready(Ok(res))
656                }
657            }
658            Err(err) => {
659                ping.ensure_not_timed_out().map_err(|e| (e, None))?;
660
661                debug!("client response error: {}", err);
662                Poll::Ready(Err((crate::Error::new_h2(err), None::<Request<B>>)))
663            }
664        }
665    }
666}
667
impl<B, E, T> Future for ClientTask<B, E, T>
where
    B: Body + 'static + Unpin,
    B::Data: Send,
    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    E: Http2ClientConnExec<B, T> + Clone + Unpin,
    T: Read + Write + Unpin,
{
    type Output = crate::Result<Dispatched>;

    /// The dispatch loop: waits for the h2 send handle to be ready, resumes
    /// any parked request, then pulls the next request off `req_rx` and
    /// sends it.
    ///
    /// Resolves to `Dispatched::Shutdown` when the connection shuts down
    /// with `NO_ERROR`, when the request sender is dropped, or when the
    /// connection task has ended; resolves to an error for any other h2
    /// error or a keep-alive timeout.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            // Nothing can be sent until the h2 handle says it is ready.
            match ready!(self.h2_tx.poll_ready(cx)) {
                Ok(()) => (),
                Err(err) => {
                    // Report a keep-alive timeout in preference to the h2 error.
                    self.ping.ensure_not_timed_out()?;
                    return if err.reason() == Some(::h2::Reason::NO_ERROR) {
                        trace!("connection gracefully shutdown");
                        Poll::Ready(Ok(Dispatched::Shutdown))
                    } else {
                        Poll::Ready(Err(crate::Error::new_h2(err)))
                    };
                }
            };

            // If we were waiting on pending open
            // continue where we left off.
            if let Some(f) = self.fut_ctx.take() {
                self.poll_pipe(f, cx);
                continue;
            }

            match self.req_rx.poll_recv(cx) {
                Poll::Ready(Some((req, cb))) => {
                    // check that future hasn't been canceled already
                    if cb.is_canceled() {
                        trace!("request callback is canceled");
                        continue;
                    }
                    // Split off the body; h2 gets a body-less request head.
                    let (head, body) = req.into_parts();
                    let mut req = ::http::Request::from_parts(head, ());
                    super::strip_connection_headers(req.headers_mut(), true);
                    // Only fill in content-length when the body size is known
                    // exactly, and skip it for empty bodies unless the method
                    // has defined payload semantics.
                    if let Some(len) = body.size_hint().exact() {
                        if len != 0 || headers::method_has_defined_payload_semantics(req.method()) {
                            headers::set_content_length_if_missing(req.headers_mut(), len);
                        }
                    }

                    let is_connect = req.method() == Method::CONNECT;
                    let eos = body.is_end_stream();

                    // Reject CONNECT requests that declare a non-empty body.
                    if is_connect
                        && headers::content_length_parse_all(req.headers())
                            .map_or(false, |len| len != 0)
                    {
                        debug!("h2 connect request with non-zero body not supported");
                        cb.send(Err(TrySendError {
                            error: crate::Error::new_user_invalid_connect(),
                            message: None,
                        }));
                        continue;
                    }

                    // Swap hyper's `Protocol` extension for its inner value
                    // before handing the request to h2.
                    if let Some(protocol) = req.extensions_mut().remove::<Protocol>() {
                        req.extensions_mut().insert(protocol.into_inner());
                    }

                    // End the stream immediately only for non-CONNECT requests
                    // whose body is already at end-of-stream.
                    let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) {
                        Ok(ok) => ok,
                        Err(err) => {
                            debug!("client send request error: {}", err);
                            cb.send(Err(TrySendError {
                                error: crate::Error::new_h2(err),
                                message: None,
                            }));
                            continue;
                        }
                    };

                    let f = FutCtx {
                        is_connect,
                        eos,
                        fut,
                        body_tx,
                        body,
                        cb,
                    };

                    // Check poll_ready() again.
                    // If the call to send_request() resulted in the new stream being pending open
                    // we have to wait for the open to complete before accepting new requests.
                    match self.h2_tx.poll_ready(cx) {
                        Poll::Pending => {
                            // Save Context
                            self.fut_ctx = Some(f);
                            return Poll::Pending;
                        }
                        Poll::Ready(Ok(())) => (),
                        Poll::Ready(Err(err)) => {
                            // Fail this request only; the next loop iteration
                            // polls `h2_tx` again.
                            f.cb.send(Err(TrySendError {
                                error: crate::Error::new_h2(err),
                                message: None,
                            }));
                            continue;
                        }
                    }
                    self.poll_pipe(f, cx);
                    continue;
                }

                Poll::Ready(None) => {
                    trace!("client::dispatch::Sender dropped");
                    return Poll::Ready(Ok(Dispatched::Shutdown));
                }

                // No request waiting: stay pending unless the connection task
                // has ended in the meantime.
                Poll::Pending => match ready!(Pin::new(&mut self.conn_eof).poll(cx)) {
                    // As of Rust 1.82, this pattern is no longer needed, and emits a warning.
                    // But we cannot remove it as long as MSRV is less than that.
                    #[allow(unused)]
                    Ok(never) => match never {},
                    Err(_conn_is_eof) => {
                        trace!("connection task is closed, closing dispatch task");
                        return Poll::Ready(Ok(Dispatched::Shutdown));
                    }
                },
            }
        }
    }
}