1use std::error::Error as StdError;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use std::time::Duration;
6
7use bytes::Bytes;
8use futures_core::ready;
9use h2::server::{Connection, Handshake, SendResponse};
10use h2::{Reason, RecvStream};
11use http::{Method, Request};
12use pin_project_lite::pin_project;
13
14use super::{ping, PipeToSendStream, SendBuf};
15use crate::body::{Body, Incoming as IncomingBody};
16use crate::common::date;
17use crate::common::io::Compat;
18use crate::common::time::Time;
19use crate::ext::Protocol;
20use crate::headers;
21use crate::proto::h2::ping::Recorder;
22use crate::proto::Dispatched;
23use crate::rt::bounds::{Http2ServerConnExec, Http2UpgradedExec};
24use crate::rt::{Read, Write};
25use crate::service::HttpService;
26
27use crate::upgrade::{OnUpgrade, Pending, Upgraded};
28use crate::Response;
29
// Default tuning values for an HTTP/2 server connection. They are only
// consumed by `Config::default()`.

/// Default connection-level window size: 1 MiB.
const DEFAULT_CONN_WINDOW: u32 = 1 << 20;
/// Default per-stream window size: 1 MiB.
const DEFAULT_STREAM_WINDOW: u32 = 1 << 20;
/// Default maximum frame size: 16 KiB.
const DEFAULT_MAX_FRAME_SIZE: u32 = 16 * 1024;
/// Default maximum send buffer size: 400 KiB.
const DEFAULT_MAX_SEND_BUF_SIZE: usize = 400 * 1024;
/// Default maximum header list size: 16 KiB.
const DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE: u32 = 16 * 1024;
/// Default limit handed to h2's `max_local_error_reset_streams`.
const DEFAULT_MAX_LOCAL_ERROR_RESET_STREAMS: usize = 1024;
42
/// Settings for an HTTP/2 server connection.
///
/// `Server::new` reads these once: most values are forwarded to
/// `h2::server::Builder`, the keep-alive and adaptive-window values go into
/// the ping configuration, and `date_header` is kept for response handling.
#[derive(Clone, Debug)]
pub(crate) struct Config {
    /// When `true`, `initial_stream_window_size` is also used as the initial
    /// BDP window of the ping configuration; when `false` no BDP window is set.
    pub(crate) adaptive_window: bool,
    /// Passed to the builder's `initial_connection_window_size`.
    pub(crate) initial_conn_window_size: u32,
    /// Passed to the builder's `initial_window_size`.
    pub(crate) initial_stream_window_size: u32,
    /// Passed to the builder's `max_frame_size`.
    pub(crate) max_frame_size: u32,
    /// When `true`, `enable_connect_protocol()` is called on the builder.
    pub(crate) enable_connect_protocol: bool,
    /// Passed to the builder's `max_concurrent_streams` only when `Some`.
    pub(crate) max_concurrent_streams: Option<u32>,
    /// Passed to the builder's `max_pending_accept_reset_streams` only when `Some`.
    pub(crate) max_pending_accept_reset_streams: Option<usize>,
    /// Always passed (as an `Option`) to the builder's `max_local_error_reset_streams`.
    pub(crate) max_local_error_reset_streams: Option<usize>,
    /// Keep-alive interval forwarded to the ping configuration.
    pub(crate) keep_alive_interval: Option<Duration>,
    /// Keep-alive timeout forwarded to the ping configuration.
    pub(crate) keep_alive_timeout: Duration,
    /// Passed to the builder's `max_send_buffer_size`.
    pub(crate) max_send_buffer_size: usize,
    /// Passed to the builder's `header_table_size` only when `Some`.
    pub(crate) header_table_size: Option<u32>,
    /// Passed to the builder's `max_header_list_size`.
    pub(crate) max_header_list_size: u32,
    /// When `true`, responses get a `Date` header if they do not already carry one.
    pub(crate) date_header: bool,
}
60
61impl Default for Config {
62 fn default() -> Config {
63 Config {
64 adaptive_window: false,
65 initial_conn_window_size: DEFAULT_CONN_WINDOW,
66 initial_stream_window_size: DEFAULT_STREAM_WINDOW,
67 max_frame_size: DEFAULT_MAX_FRAME_SIZE,
68 enable_connect_protocol: false,
69 max_concurrent_streams: Some(200),
70 max_pending_accept_reset_streams: None,
71 max_local_error_reset_streams: Some(DEFAULT_MAX_LOCAL_ERROR_RESET_STREAMS),
72 header_table_size: None,
73 keep_alive_interval: None,
74 keep_alive_timeout: Duration::from_secs(20),
75 max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
76 max_header_list_size: DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE,
77 date_header: true,
78 }
79 }
80}
81
82pin_project! {
83 pub(crate) struct Server<T, S, B, E>
84 where
85 S: HttpService<IncomingBody>,
86 B: Body,
87 {
88 exec: E,
89 timer: Time,
90 service: S,
91 state: State<T, B>,
92 date_header: bool,
93 close_pending: bool
94 }
95}
96
97enum State<T, B>
98where
99 B: Body,
100{
101 Handshaking {
102 ping_config: ping::Config,
103 hs: Handshake<Compat<T>, SendBuf<B::Data>>,
104 },
105 Serving(Serving<T, B>),
106}
107
108struct Serving<T, B>
109where
110 B: Body,
111{
112 ping: Option<(ping::Recorder, ping::Ponger)>,
113 conn: Connection<Compat<T>, SendBuf<B::Data>>,
114 closing: Option<crate::Error>,
115 date_header: bool,
116}
117
118impl<T, S, B, E> Server<T, S, B, E>
119where
120 T: Read + Write + Unpin,
121 S: HttpService<IncomingBody, ResBody = B>,
122 S::Error: Into<Box<dyn StdError + Send + Sync>>,
123 B: Body + 'static,
124 E: Http2ServerConnExec<S::Future, B>,
125{
126 pub(crate) fn new(
127 io: T,
128 service: S,
129 config: &Config,
130 exec: E,
131 timer: Time,
132 ) -> Server<T, S, B, E> {
133 let mut builder = h2::server::Builder::default();
134 builder
135 .initial_window_size(config.initial_stream_window_size)
136 .initial_connection_window_size(config.initial_conn_window_size)
137 .max_frame_size(config.max_frame_size)
138 .max_header_list_size(config.max_header_list_size)
139 .max_local_error_reset_streams(config.max_local_error_reset_streams)
140 .max_send_buffer_size(config.max_send_buffer_size);
141 if let Some(max) = config.max_concurrent_streams {
142 builder.max_concurrent_streams(max);
143 }
144 if let Some(max) = config.max_pending_accept_reset_streams {
145 builder.max_pending_accept_reset_streams(max);
146 }
147 if let Some(size) = config.header_table_size {
148 builder.header_table_size(size);
149 }
150 if config.enable_connect_protocol {
151 builder.enable_connect_protocol();
152 }
153 let handshake = builder.handshake(Compat::new(io));
154
155 let bdp = if config.adaptive_window {
156 Some(config.initial_stream_window_size)
157 } else {
158 None
159 };
160
161 let ping_config = ping::Config {
162 bdp_initial_window: bdp,
163 keep_alive_interval: config.keep_alive_interval,
164 keep_alive_timeout: config.keep_alive_timeout,
165 keep_alive_while_idle: true,
168 };
169
170 Server {
171 exec,
172 timer,
173 state: State::Handshaking {
174 ping_config,
175 hs: handshake,
176 },
177 service,
178 date_header: config.date_header,
179 close_pending: false,
180 }
181 }
182
183 pub(crate) fn graceful_shutdown(&mut self) {
184 trace!("graceful_shutdown");
185 match self.state {
186 State::Handshaking { .. } => {
187 self.close_pending = true;
188 }
189 State::Serving(ref mut srv) => {
190 if srv.closing.is_none() {
191 srv.conn.graceful_shutdown();
192 }
193 }
194 }
195 }
196}
197
198impl<T, S, B, E> Future for Server<T, S, B, E>
199where
200 T: Read + Write + Unpin,
201 S: HttpService<IncomingBody, ResBody = B>,
202 S::Error: Into<Box<dyn StdError + Send + Sync>>,
203 B: Body + 'static,
204 E: Http2ServerConnExec<S::Future, B>,
205{
206 type Output = crate::Result<Dispatched>;
207
208 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
209 let me = &mut *self;
210 loop {
211 let next = match me.state {
212 State::Handshaking {
213 ref mut hs,
214 ref ping_config,
215 } => {
216 let mut conn = ready!(Pin::new(hs).poll(cx).map_err(crate::Error::new_h2))?;
217 let ping = if ping_config.is_enabled() {
218 let pp = conn.ping_pong().expect("conn.ping_pong");
219 Some(ping::channel(pp, ping_config.clone(), me.timer.clone()))
220 } else {
221 None
222 };
223 State::Serving(Serving {
224 ping,
225 conn,
226 closing: None,
227 date_header: me.date_header,
228 })
229 }
230 State::Serving(ref mut srv) => {
231 if me.close_pending && srv.closing.is_none() {
233 srv.conn.graceful_shutdown();
234 }
235 ready!(srv.poll_server(cx, &mut me.service, &mut me.exec))?;
236 return Poll::Ready(Ok(Dispatched::Shutdown));
237 }
238 };
239 me.state = next;
240 }
241 }
242}
243
244impl<T, B> Serving<T, B>
245where
246 T: Read + Write + Unpin,
247 B: Body + 'static,
248{
249 fn poll_server<S, E>(
250 &mut self,
251 cx: &mut Context<'_>,
252 service: &mut S,
253 exec: &mut E,
254 ) -> Poll<crate::Result<()>>
255 where
256 S: HttpService<IncomingBody, ResBody = B>,
257 S::Error: Into<Box<dyn StdError + Send + Sync>>,
258 E: Http2ServerConnExec<S::Future, B>,
259 {
260 if self.closing.is_none() {
261 loop {
262 self.poll_ping(cx);
263
264 match ready!(self.conn.poll_accept(cx)) {
265 Some(Ok((req, mut respond))) => {
266 trace!("incoming request");
267 let content_length = headers::content_length_parse_all(req.headers());
268 let ping = self
269 .ping
270 .as_ref()
271 .map(|ping| ping.0.clone())
272 .unwrap_or_else(ping::disabled);
273
274 ping.record_non_data();
276
277 let is_connect = req.method() == Method::CONNECT;
278 let (mut parts, stream) = req.into_parts();
279 let (mut req, connect_parts) = if !is_connect {
280 (
281 Request::from_parts(
282 parts,
283 IncomingBody::h2(stream, content_length.into(), ping),
284 ),
285 None,
286 )
287 } else {
288 if content_length.map_or(false, |len| len != 0) {
289 warn!("h2 connect request with non-zero body not supported");
290 respond.send_reset(h2::Reason::INTERNAL_ERROR);
291 return Poll::Ready(Ok(()));
292 }
293 let (pending, upgrade) = crate::upgrade::pending();
294 debug_assert!(parts.extensions.get::<OnUpgrade>().is_none());
295 parts.extensions.insert(upgrade);
296 (
297 Request::from_parts(parts, IncomingBody::empty()),
298 Some(ConnectParts {
299 pending,
300 ping,
301 recv_stream: stream,
302 }),
303 )
304 };
305
306 if let Some(protocol) = req.extensions_mut().remove::<h2::ext::Protocol>() {
307 req.extensions_mut().insert(Protocol::from_inner(protocol));
308 }
309
310 let fut = H2Stream::new(
311 service.call(req),
312 connect_parts,
313 respond,
314 self.date_header,
315 exec.clone(),
316 );
317
318 exec.execute_h2stream(fut);
319 }
320 Some(Err(e)) => {
321 return Poll::Ready(Err(crate::Error::new_h2(e)));
322 }
323 None => {
324 if let Some((ref ping, _)) = self.ping {
326 ping.ensure_not_timed_out()?;
327 }
328
329 trace!("incoming connection complete");
330 return Poll::Ready(Ok(()));
331 }
332 }
333 }
334 }
335
336 debug_assert!(
337 self.closing.is_some(),
338 "poll_server broke loop without closing"
339 );
340
341 ready!(self.conn.poll_closed(cx).map_err(crate::Error::new_h2))?;
342
343 Poll::Ready(Err(self.closing.take().expect("polled after error")))
344 }
345
346 fn poll_ping(&mut self, cx: &mut Context<'_>) {
347 if let Some((_, ref mut estimator)) = self.ping {
348 match estimator.poll(cx) {
349 Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
350 self.conn.set_target_window_size(wnd);
351 let _ = self.conn.set_initial_window_size(wnd);
352 }
353 Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
354 debug!("keep-alive timed out, closing connection");
355 self.conn.abrupt_shutdown(h2::Reason::NO_ERROR);
356 }
357 Poll::Pending => {}
358 }
359 }
360 }
361}
362
363pin_project! {
364 #[allow(missing_debug_implementations)]
365 pub struct H2Stream<F, B, E>
366 where
367 B: Body,
368 {
369 reply: SendResponse<SendBuf<B::Data>>,
370 #[pin]
371 state: H2StreamState<F, B>,
372 date_header: bool,
373 exec: E,
374 }
375}
376
377pin_project! {
378 #[project = H2StreamStateProj]
379 enum H2StreamState<F, B>
380 where
381 B: Body,
382 {
383 Service {
384 #[pin]
385 fut: F,
386 connect_parts: Option<ConnectParts>,
387 },
388 Body {
389 #[pin]
390 pipe: PipeToSendStream<B>,
391 },
392 }
393}
394
395struct ConnectParts {
396 pending: Pending,
397 ping: Recorder,
398 recv_stream: RecvStream,
399}
400
401impl<F, B, E> H2Stream<F, B, E>
402where
403 B: Body,
404{
405 fn new(
406 fut: F,
407 connect_parts: Option<ConnectParts>,
408 respond: SendResponse<SendBuf<B::Data>>,
409 date_header: bool,
410 exec: E,
411 ) -> H2Stream<F, B, E> {
412 H2Stream {
413 reply: respond,
414 state: H2StreamState::Service { fut, connect_parts },
415 date_header,
416 exec,
417 }
418 }
419}
420
// Sends the response head `$res` on `$me.reply` and evaluates to the
// resulting send stream. `$eos` states whether the head also ends the stream.
//
// On failure the stream is reset with `INTERNAL_ERROR` and the *enclosing
// function* returns `Poll::Ready(Err(..))`, so this may only be used inside
// functions returning `Poll<crate::Result<_>>`.
macro_rules! reply {
    ($me:expr, $res:expr, $eos:expr) => {{
        match $me.reply.send_response($res, $eos) {
            Err(send_err) => {
                debug!("send response error: {}", send_err);
                $me.reply.send_reset(Reason::INTERNAL_ERROR);
                return Poll::Ready(Err(crate::Error::new_h2(send_err)));
            }
            Ok(send_stream) => send_stream,
        }
    }};
}
433
434impl<F, B, Ex, E> H2Stream<F, B, Ex>
435where
436 F: Future<Output = Result<Response<B>, E>>,
437 B: Body,
438 B::Data: 'static,
439 B::Error: Into<Box<dyn StdError + Send + Sync>>,
440 Ex: Http2UpgradedExec<B::Data>,
441 E: Into<Box<dyn StdError + Send + Sync>>,
442{
443 fn poll2(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
444 let mut me = self.as_mut().project();
445 loop {
446 let next = match me.state.as_mut().project() {
447 H2StreamStateProj::Service {
448 fut: h,
449 connect_parts,
450 } => {
451 let res = match h.poll(cx) {
452 Poll::Ready(Ok(r)) => r,
453 Poll::Pending => {
454 if let Poll::Ready(reason) =
457 me.reply.poll_reset(cx).map_err(crate::Error::new_h2)?
458 {
459 debug!("stream received RST_STREAM: {:?}", reason);
460 return Poll::Ready(Err(crate::Error::new_h2(reason.into())));
461 }
462 return Poll::Pending;
463 }
464 Poll::Ready(Err(e)) => {
465 let err = crate::Error::new_user_service(e);
466 warn!("http2 service errored: {}", err);
467 me.reply.send_reset(err.h2_reason());
468 return Poll::Ready(Err(err));
469 }
470 };
471
472 let (head, body) = res.into_parts();
473 let mut res = ::http::Response::from_parts(head, ());
474 super::strip_connection_headers(res.headers_mut(), false);
475
476 if *me.date_header {
478 res.headers_mut()
479 .entry(::http::header::DATE)
480 .or_insert_with(date::update_and_header_value);
481 }
482
483 if let Some(connect_parts) = connect_parts.take() {
484 if res.status().is_success() {
485 if headers::content_length_parse_all(res.headers())
486 .map_or(false, |len| len != 0)
487 {
488 warn!("h2 successful response to CONNECT request with body not supported");
489 me.reply.send_reset(h2::Reason::INTERNAL_ERROR);
490 return Poll::Ready(Err(crate::Error::new_user_header()));
491 }
492 if res
493 .headers_mut()
494 .remove(::http::header::CONTENT_LENGTH)
495 .is_some()
496 {
497 warn!("successful response to CONNECT request disallows content-length header");
498 }
499 let send_stream = reply!(me, res, false);
500 let (h2_up, up_task) = super::upgrade::pair(
501 send_stream,
502 connect_parts.recv_stream,
503 connect_parts.ping,
504 );
505 connect_parts
506 .pending
507 .fulfill(Upgraded::new(h2_up, Bytes::new()));
508 self.exec.execute_upgrade(up_task);
509 return Poll::Ready(Ok(()));
510 }
511 }
512
513 if !body.is_end_stream() {
514 if let Some(len) = body.size_hint().exact() {
516 headers::set_content_length_if_missing(res.headers_mut(), len);
517 }
518
519 let body_tx = reply!(me, res, false);
520 H2StreamState::Body {
521 pipe: PipeToSendStream::new(body, body_tx),
522 }
523 } else {
524 reply!(me, res, true);
525 return Poll::Ready(Ok(()));
526 }
527 }
528 H2StreamStateProj::Body { pipe } => {
529 return pipe.poll(cx);
530 }
531 };
532 me.state.set(next);
533 }
534 }
535}
536
537impl<F, B, Ex, E> Future for H2Stream<F, B, Ex>
538where
539 F: Future<Output = Result<Response<B>, E>>,
540 B: Body,
541 B::Data: 'static,
542 B::Error: Into<Box<dyn StdError + Send + Sync>>,
543 Ex: Http2UpgradedExec<B::Data>,
544 E: Into<Box<dyn StdError + Send + Sync>>,
545{
546 type Output = ();
547
548 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
549 self.poll2(cx).map(|res| {
550 if let Err(_e) = res {
551 debug!("stream error: {}", _e);
552 }
553 })
554 }
555}