1use std::{
2 convert::Infallible,
3 future::Future,
4 marker::PhantomData,
5 pin::Pin,
6 task::{Context, Poll},
7 time::Duration,
8};
9
10use crate::rt::{Read, Write};
11use bytes::Bytes;
12use futures_channel::mpsc::{Receiver, Sender};
13use futures_channel::{mpsc, oneshot};
14use futures_core::{ready, FusedFuture, FusedStream, Stream};
15use h2::client::{Builder, Connection, SendRequest};
16use h2::SendStream;
17use http::{Method, StatusCode};
18use pin_project_lite::pin_project;
19
20use super::ping::{Ponger, Recorder};
21use super::{ping, H2Upgraded, PipeToSendStream, SendBuf};
22use crate::body::{Body, Incoming as IncomingBody};
23use crate::client::dispatch::{Callback, SendWhen, TrySendError};
24use crate::common::either::Either;
25use crate::common::io::Compat;
26use crate::common::time::Time;
27use crate::ext::Protocol;
28use crate::headers;
29use crate::proto::h2::UpgradedSendStream;
30use crate::proto::Dispatched;
31use crate::rt::bounds::Http2ClientConnExec;
32use crate::upgrade::Upgraded;
33use crate::{Request, Response};
34use h2::client::ResponseFuture;
35
36type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<IncomingBody>>;
37
38type ConnDropRef = mpsc::Sender<Infallible>;
41
42type ConnEof = oneshot::Receiver<Infallible>;
45
/// Default connection-level flow-control window: 5 MiB.
const DEFAULT_CONN_WINDOW: u32 = 5 * 1024 * 1024;
/// Default per-stream flow-control window: 2 MiB.
const DEFAULT_STREAM_WINDOW: u32 = 2 * 1024 * 1024;
/// Default maximum frame size: 16 KiB.
const DEFAULT_MAX_FRAME_SIZE: u32 = 16 * 1024;
/// Default upper bound for the send buffer: 1 MiB.
const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 1024;
/// Default maximum header list size: 16 KiB.
const DEFAULT_MAX_HEADER_LIST_SIZE: u32 = 16 * 1024;
/// Default value for the builder's `initial_max_send_streams` setting.
const DEFAULT_INITIAL_MAX_SEND_STREAMS: usize = 100;
63
/// Settings for an HTTP/2 client connection.
///
/// Most fields are forwarded to the `h2` client builder by `new_builder`; `adaptive_window` and
/// the `keep_alive_*` fields are turned into the ping configuration by `new_ping_config`.
#[derive(Clone, Debug)]
pub(crate) struct Config {
    /// When `true`, the ping configuration receives the initial stream window as its starting
    /// BDP window; when `false`, it receives none.
    pub(crate) adaptive_window: bool,
    /// Initial connection-level flow-control window.
    pub(crate) initial_conn_window_size: u32,
    /// Initial per-stream flow-control window.
    pub(crate) initial_stream_window_size: u32,
    /// Value passed to the builder's `initial_max_send_streams` setting.
    pub(crate) initial_max_send_streams: usize,
    /// Maximum frame size; `None` leaves the builder's own default in place.
    pub(crate) max_frame_size: Option<u32>,
    /// Maximum header list size.
    pub(crate) max_header_list_size: u32,
    /// Keep-alive interval handed to the ping configuration.
    pub(crate) keep_alive_interval: Option<Duration>,
    /// Keep-alive timeout handed to the ping configuration.
    pub(crate) keep_alive_timeout: Duration,
    /// Keep-alive-while-idle flag handed to the ping configuration.
    pub(crate) keep_alive_while_idle: bool,
    /// Maximum concurrent reset streams; `None` leaves the builder's own default in place.
    pub(crate) max_concurrent_reset_streams: Option<usize>,
    /// Maximum send buffer size.
    pub(crate) max_send_buffer_size: usize,
    /// Maximum pending-accept reset streams; `None` leaves the builder's own default in place.
    pub(crate) max_pending_accept_reset_streams: Option<usize>,
    /// Header table size; `None` leaves the builder's own default in place.
    pub(crate) header_table_size: Option<u32>,
    /// Maximum concurrent streams; `None` leaves the builder's own default in place.
    pub(crate) max_concurrent_streams: Option<u32>,
}
81
82impl Default for Config {
83 fn default() -> Config {
84 Config {
85 adaptive_window: false,
86 initial_conn_window_size: DEFAULT_CONN_WINDOW,
87 initial_stream_window_size: DEFAULT_STREAM_WINDOW,
88 initial_max_send_streams: DEFAULT_INITIAL_MAX_SEND_STREAMS,
89 max_frame_size: Some(DEFAULT_MAX_FRAME_SIZE),
90 max_header_list_size: DEFAULT_MAX_HEADER_LIST_SIZE,
91 keep_alive_interval: None,
92 keep_alive_timeout: Duration::from_secs(20),
93 keep_alive_while_idle: false,
94 max_concurrent_reset_streams: None,
95 max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
96 max_pending_accept_reset_streams: None,
97 header_table_size: None,
98 max_concurrent_streams: None,
99 }
100 }
101}
102
103fn new_builder(config: &Config) -> Builder {
104 let mut builder = Builder::default();
105 builder
106 .initial_max_send_streams(config.initial_max_send_streams)
107 .initial_window_size(config.initial_stream_window_size)
108 .initial_connection_window_size(config.initial_conn_window_size)
109 .max_header_list_size(config.max_header_list_size)
110 .max_send_buffer_size(config.max_send_buffer_size)
111 .enable_push(false);
112 if let Some(max) = config.max_frame_size {
113 builder.max_frame_size(max);
114 }
115 if let Some(max) = config.max_concurrent_reset_streams {
116 builder.max_concurrent_reset_streams(max);
117 }
118 if let Some(max) = config.max_pending_accept_reset_streams {
119 builder.max_pending_accept_reset_streams(max);
120 }
121 if let Some(size) = config.header_table_size {
122 builder.header_table_size(size);
123 }
124 if let Some(max) = config.max_concurrent_streams {
125 builder.max_concurrent_streams(max);
126 }
127 builder
128}
129
130fn new_ping_config(config: &Config) -> ping::Config {
131 ping::Config {
132 bdp_initial_window: if config.adaptive_window {
133 Some(config.initial_stream_window_size)
134 } else {
135 None
136 },
137 keep_alive_interval: config.keep_alive_interval,
138 keep_alive_timeout: config.keep_alive_timeout,
139 keep_alive_while_idle: config.keep_alive_while_idle,
140 }
141}
142
143pub(crate) async fn handshake<T, B, E>(
144 io: T,
145 req_rx: ClientRx<B>,
146 config: &Config,
147 mut exec: E,
148 timer: Time,
149) -> crate::Result<ClientTask<B, E, T>>
150where
151 T: Read + Write + Unpin,
152 B: Body + 'static,
153 B::Data: Send + 'static,
154 E: Http2ClientConnExec<B, T> + Unpin,
155 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
156{
157 let (h2_tx, mut conn) = new_builder(config)
158 .handshake::<_, SendBuf<B::Data>>(Compat::new(io))
159 .await
160 .map_err(crate::Error::new_h2)?;
161
162 let (conn_drop_ref, conn_drop_rx) = mpsc::channel(1);
167 let (cancel_tx, conn_eof) = oneshot::channel();
168
169 let ping_config = new_ping_config(config);
170
171 let (conn, ping) = if ping_config.is_enabled() {
172 let pp = conn.ping_pong().expect("conn.ping_pong");
173 let (recorder, ponger) = ping::channel(pp, ping_config, timer);
174
175 let conn: Conn<_, B> = Conn::new(ponger, conn);
176 (Either::left(conn), recorder)
177 } else {
178 (Either::right(conn), ping::disabled())
179 };
180 let conn: ConnMapErr<T, B> = ConnMapErr {
181 conn,
182 is_terminated: false,
183 };
184
185 exec.execute_h2_future(H2ClientFuture::Task {
186 task: ConnTask::new(conn, conn_drop_rx, cancel_tx),
187 });
188
189 Ok(ClientTask {
190 ping,
191 conn_drop_ref,
192 conn_eof,
193 executor: exec,
194 h2_tx,
195 req_rx,
196 fut_ctx: None,
197 marker: PhantomData,
198 })
199}
200
201pin_project! {
202 struct Conn<T, B>
203 where
204 B: Body,
205 {
206 #[pin]
207 ponger: Ponger,
208 #[pin]
209 conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>,
210 }
211}
212
213impl<T, B> Conn<T, B>
214where
215 B: Body,
216 T: Read + Write + Unpin,
217{
218 fn new(ponger: Ponger, conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>) -> Self {
219 Conn { ponger, conn }
220 }
221}
222
223impl<T, B> Future for Conn<T, B>
224where
225 B: Body,
226 T: Read + Write + Unpin,
227{
228 type Output = Result<(), h2::Error>;
229
230 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
231 let mut this = self.project();
232 match this.ponger.poll(cx) {
233 Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
234 this.conn.set_target_window_size(wnd);
235 this.conn.set_initial_window_size(wnd)?;
236 }
237 Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
238 debug!("connection keep-alive timed out");
239 return Poll::Ready(Ok(()));
240 }
241 Poll::Pending => {}
242 }
243
244 Pin::new(&mut this.conn).poll(cx)
245 }
246}
247
248pin_project! {
249 struct ConnMapErr<T, B>
250 where
251 B: Body,
252 T: Read,
253 T: Write,
254 T: Unpin,
255 {
256 #[pin]
257 conn: Either<Conn<T, B>, Connection<Compat<T>, SendBuf<<B as Body>::Data>>>,
258 #[pin]
259 is_terminated: bool,
260 }
261}
262
263impl<T, B> Future for ConnMapErr<T, B>
264where
265 B: Body,
266 T: Read + Write + Unpin,
267{
268 type Output = Result<(), ()>;
269
270 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
271 let mut this = self.project();
272
273 if *this.is_terminated {
274 return Poll::Pending;
275 }
276 let polled = this.conn.poll(cx);
277 if polled.is_ready() {
278 *this.is_terminated = true;
279 }
280 polled.map_err(|_e| {
281 debug!(error = %_e, "connection error");
282 })
283 }
284}
285
286impl<T, B> FusedFuture for ConnMapErr<T, B>
287where
288 B: Body,
289 T: Read + Write + Unpin,
290{
291 fn is_terminated(&self) -> bool {
292 self.is_terminated
293 }
294}
295
296pin_project! {
297 pub struct ConnTask<T, B>
298 where
299 B: Body,
300 T: Read,
301 T: Write,
302 T: Unpin,
303 {
304 #[pin]
305 drop_rx: Receiver<Infallible>,
306 #[pin]
307 cancel_tx: Option<oneshot::Sender<Infallible>>,
308 #[pin]
309 conn: ConnMapErr<T, B>,
310 }
311}
312
313impl<T, B> ConnTask<T, B>
314where
315 B: Body,
316 T: Read + Write + Unpin,
317{
318 fn new(
319 conn: ConnMapErr<T, B>,
320 drop_rx: Receiver<Infallible>,
321 cancel_tx: oneshot::Sender<Infallible>,
322 ) -> Self {
323 Self {
324 drop_rx,
325 cancel_tx: Some(cancel_tx),
326 conn,
327 }
328 }
329}
330
331impl<T, B> Future for ConnTask<T, B>
332where
333 B: Body,
334 T: Read + Write + Unpin,
335{
336 type Output = ();
337
338 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
339 let mut this = self.project();
340
341 if !this.conn.is_terminated() && Pin::new(&mut this.conn).poll(cx).is_ready() {
342 return Poll::Ready(());
344 }
345
346 if !this.drop_rx.is_terminated() && Pin::new(&mut this.drop_rx).poll_next(cx).is_ready() {
347 trace!("send_request dropped, starting conn shutdown");
351 drop(this.cancel_tx.take().expect("ConnTask Future polled twice"));
352 }
353
354 Poll::Pending
355 }
356}
357
358pin_project! {
359 #[project = H2ClientFutureProject]
360 pub enum H2ClientFuture<B, T>
361 where
362 B: http_body::Body,
363 B: 'static,
364 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
365 T: Read,
366 T: Write,
367 T: Unpin,
368 {
369 Pipe {
370 #[pin]
371 pipe: PipeMap<B>,
372 },
373 Send {
374 #[pin]
375 send_when: SendWhen<B>,
376 },
377 Task {
378 #[pin]
379 task: ConnTask<T, B>,
380 },
381 }
382}
383
384impl<B, T> Future for H2ClientFuture<B, T>
385where
386 B: http_body::Body + 'static,
387 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
388 T: Read + Write + Unpin,
389{
390 type Output = ();
391
392 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
393 let this = self.project();
394
395 match this {
396 H2ClientFutureProject::Pipe { pipe } => pipe.poll(cx),
397 H2ClientFutureProject::Send { send_when } => send_when.poll(cx),
398 H2ClientFutureProject::Task { task } => task.poll(cx),
399 }
400 }
401}
402
403struct FutCtx<B>
404where
405 B: Body,
406{
407 is_connect: bool,
408 eos: bool,
409 fut: ResponseFuture,
410 body_tx: SendStream<SendBuf<B::Data>>,
411 body: B,
412 cb: Callback<Request<B>, Response<IncomingBody>>,
413}
414
415impl<B: Body> Unpin for FutCtx<B> {}
416
417pub(crate) struct ClientTask<B, E, T>
418where
419 B: Body,
420 E: Unpin,
421{
422 ping: ping::Recorder,
423 conn_drop_ref: ConnDropRef,
424 conn_eof: ConnEof,
425 executor: E,
426 h2_tx: SendRequest<SendBuf<B::Data>>,
427 req_rx: ClientRx<B>,
428 fut_ctx: Option<FutCtx<B>>,
429 marker: PhantomData<T>,
430}
431
432impl<B, E, T> ClientTask<B, E, T>
433where
434 B: Body + 'static,
435 E: Http2ClientConnExec<B, T> + Unpin,
436 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
437 T: Read + Write + Unpin,
438{
439 pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
440 self.h2_tx.is_extended_connect_protocol_enabled()
441 }
442}
443
444pin_project! {
445 pub struct PipeMap<S>
446 where
447 S: Body,
448 {
449 #[pin]
450 pipe: PipeToSendStream<S>,
451 #[pin]
452 conn_drop_ref: Option<Sender<Infallible>>,
453 #[pin]
454 ping: Option<Recorder>,
455 }
456}
457
458impl<B> Future for PipeMap<B>
459where
460 B: http_body::Body,
461 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
462{
463 type Output = ();
464
465 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
466 let mut this = self.project();
467
468 match Pin::new(&mut this.pipe).poll(cx) {
469 Poll::Ready(result) => {
470 if let Err(_e) = result {
471 debug!("client request body error: {}", _e);
472 }
473 drop(this.conn_drop_ref.take().expect("Future polled twice"));
474 drop(this.ping.take().expect("Future polled twice"));
475 return Poll::Ready(());
476 }
477 Poll::Pending => (),
478 };
479 Poll::Pending
480 }
481}
482
483impl<B, E, T> ClientTask<B, E, T>
484where
485 B: Body + 'static + Unpin,
486 B::Data: Send,
487 E: Http2ClientConnExec<B, T> + Unpin,
488 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
489 T: Read + Write + Unpin,
490{
491 fn poll_pipe(&mut self, f: FutCtx<B>, cx: &mut Context<'_>) {
492 let ping = self.ping.clone();
493
494 let send_stream = if !f.is_connect {
495 if !f.eos {
496 let mut pipe = PipeToSendStream::new(f.body, f.body_tx);
497
498 match Pin::new(&mut pipe).poll(cx) {
501 Poll::Ready(_) => (),
502 Poll::Pending => {
503 let conn_drop_ref = self.conn_drop_ref.clone();
504 let ping = ping.clone();
508
509 let pipe = PipeMap {
510 pipe,
511 conn_drop_ref: Some(conn_drop_ref),
512 ping: Some(ping),
513 };
514 self.executor
516 .execute_h2_future(H2ClientFuture::Pipe { pipe });
517 }
518 }
519 }
520
521 None
522 } else {
523 Some(f.body_tx)
524 };
525
526 self.executor.execute_h2_future(H2ClientFuture::Send {
527 send_when: SendWhen {
528 when: ResponseFutMap {
529 fut: f.fut,
530 ping: Some(ping),
531 send_stream: Some(send_stream),
532 },
533 call_back: Some(f.cb),
534 },
535 });
536 }
537}
538
539pin_project! {
540 pub(crate) struct ResponseFutMap<B>
541 where
542 B: Body,
543 B: 'static,
544 {
545 #[pin]
546 fut: ResponseFuture,
547 #[pin]
548 ping: Option<Recorder>,
549 #[pin]
550 send_stream: Option<Option<SendStream<SendBuf<<B as Body>::Data>>>>,
551 }
552}
553
554impl<B> Future for ResponseFutMap<B>
555where
556 B: Body + 'static,
557{
558 type Output = Result<Response<crate::body::Incoming>, (crate::Error, Option<Request<B>>)>;
559
560 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
561 let mut this = self.project();
562
563 let result = ready!(this.fut.poll(cx));
564
565 let ping = this.ping.take().expect("Future polled twice");
566 let send_stream = this.send_stream.take().expect("Future polled twice");
567
568 match result {
569 Ok(res) => {
570 ping.record_non_data();
572
573 let content_length = headers::content_length_parse_all(res.headers());
574 if let (Some(mut send_stream), StatusCode::OK) = (send_stream, res.status()) {
575 if content_length.map_or(false, |len| len != 0) {
576 warn!("h2 connect response with non-zero body not supported");
577
578 send_stream.send_reset(h2::Reason::INTERNAL_ERROR);
579 return Poll::Ready(Err((
580 crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
581 None::<Request<B>>,
582 )));
583 }
584 let (parts, recv_stream) = res.into_parts();
585 let mut res = Response::from_parts(parts, IncomingBody::empty());
586
587 let (pending, on_upgrade) = crate::upgrade::pending();
588 let io = H2Upgraded {
589 ping,
590 send_stream: unsafe { UpgradedSendStream::new(send_stream) },
591 recv_stream,
592 buf: Bytes::new(),
593 };
594 let upgraded = Upgraded::new(io, Bytes::new());
595
596 pending.fulfill(upgraded);
597 res.extensions_mut().insert(on_upgrade);
598
599 Poll::Ready(Ok(res))
600 } else {
601 let res = res.map(|stream| {
602 let ping = ping.for_stream(&stream);
603 IncomingBody::h2(stream, content_length.into(), ping)
604 });
605 Poll::Ready(Ok(res))
606 }
607 }
608 Err(err) => {
609 ping.ensure_not_timed_out().map_err(|e| (e, None))?;
610
611 debug!("client response error: {}", err);
612 Poll::Ready(Err((crate::Error::new_h2(err), None::<Request<B>>)))
613 }
614 }
615 }
616}
617
618impl<B, E, T> Future for ClientTask<B, E, T>
619where
620 B: Body + 'static + Unpin,
621 B::Data: Send,
622 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
623 E: Http2ClientConnExec<B, T> + Unpin,
624 T: Read + Write + Unpin,
625{
626 type Output = crate::Result<Dispatched>;
627
628 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
629 loop {
630 match ready!(self.h2_tx.poll_ready(cx)) {
631 Ok(()) => (),
632 Err(err) => {
633 self.ping.ensure_not_timed_out()?;
634 return if err.reason() == Some(::h2::Reason::NO_ERROR) {
635 trace!("connection gracefully shutdown");
636 Poll::Ready(Ok(Dispatched::Shutdown))
637 } else {
638 Poll::Ready(Err(crate::Error::new_h2(err)))
639 };
640 }
641 };
642
643 if let Some(f) = self.fut_ctx.take() {
646 self.poll_pipe(f, cx);
647 continue;
648 }
649
650 match self.req_rx.poll_recv(cx) {
651 Poll::Ready(Some((req, cb))) => {
652 if cb.is_canceled() {
654 trace!("request callback is canceled");
655 continue;
656 }
657 let (head, body) = req.into_parts();
658 let mut req = ::http::Request::from_parts(head, ());
659 super::strip_connection_headers(req.headers_mut(), true);
660 if let Some(len) = body.size_hint().exact() {
661 if len != 0 || headers::method_has_defined_payload_semantics(req.method()) {
662 headers::set_content_length_if_missing(req.headers_mut(), len);
663 }
664 }
665
666 let is_connect = req.method() == Method::CONNECT;
667 let eos = body.is_end_stream();
668
669 if is_connect
670 && headers::content_length_parse_all(req.headers())
671 .map_or(false, |len| len != 0)
672 {
673 debug!("h2 connect request with non-zero body not supported");
674 cb.send(Err(TrySendError {
675 error: crate::Error::new_user_invalid_connect(),
676 message: None,
677 }));
678 continue;
679 }
680
681 if let Some(protocol) = req.extensions_mut().remove::<Protocol>() {
682 req.extensions_mut().insert(protocol.into_inner());
683 }
684
685 let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) {
686 Ok(ok) => ok,
687 Err(err) => {
688 debug!("client send request error: {}", err);
689 cb.send(Err(TrySendError {
690 error: crate::Error::new_h2(err),
691 message: None,
692 }));
693 continue;
694 }
695 };
696
697 let f = FutCtx {
698 is_connect,
699 eos,
700 fut,
701 body_tx,
702 body,
703 cb,
704 };
705
706 match self.h2_tx.poll_ready(cx) {
710 Poll::Pending => {
711 self.fut_ctx = Some(f);
713 return Poll::Pending;
714 }
715 Poll::Ready(Ok(())) => (),
716 Poll::Ready(Err(err)) => {
717 f.cb.send(Err(TrySendError {
718 error: crate::Error::new_h2(err),
719 message: None,
720 }));
721 continue;
722 }
723 }
724 self.poll_pipe(f, cx);
725 continue;
726 }
727
728 Poll::Ready(None) => {
729 trace!("client::dispatch::Sender dropped");
730 return Poll::Ready(Ok(Dispatched::Shutdown));
731 }
732
733 Poll::Pending => match ready!(Pin::new(&mut self.conn_eof).poll(cx)) {
734 #[allow(unused)]
737 Ok(never) => match never {},
738 Err(_conn_is_eof) => {
739 trace!("connection task is closed, closing dispatch task");
740 return Poll::Ready(Ok(Dispatched::Shutdown));
741 }
742 },
743 }
744 }
745 }
746}