Skip to main content

hyper/proto/h2/
server.rs

1use std::error::Error as StdError;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use std::time::Duration;
6
7use bytes::Bytes;
8use futures_core::ready;
9use h2::server::{Connection, Handshake, SendResponse};
10use h2::{Reason, RecvStream};
11use http::{Method, Request};
12use pin_project_lite::pin_project;
13
14use super::{ping, PipeToSendStream, SendBuf};
15use crate::body::{Body, Incoming as IncomingBody};
16use crate::common::date;
17use crate::common::io::Compat;
18use crate::common::time::Time;
19use crate::ext::Protocol;
20use crate::headers;
21use crate::proto::h2::ping::Recorder;
22use crate::proto::Dispatched;
23use crate::rt::bounds::{Http2ServerConnExec, Http2UpgradedExec};
24use crate::rt::{Read, Write};
25use crate::service::HttpService;
26
27use crate::upgrade::{OnUpgrade, Pending, Upgraded};
28use crate::Response;
29
// These defaults target the "majority" deployment, which is usually not
// resource constrained; for such peers the spec default window of 64kb can
// be too limiting for performance.
//
// At the same time, a server more often has multiple clients connected, and
// so is more likely to use more resources than a client would.

/// Default connection-level flow-control window (1mb).
const DEFAULT_CONN_WINDOW: u32 = 1 << 20;
/// Default per-stream flow-control window (1mb).
const DEFAULT_STREAM_WINDOW: u32 = 1 << 20;
/// Default maximum frame size (16kb).
const DEFAULT_MAX_FRAME_SIZE: u32 = 16 * 1024;
/// Default cap on the send buffer size (400kb).
const DEFAULT_MAX_SEND_BUF_SIZE: usize = 400 * 1024;
/// Default maximum header list size (16kb).
const DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE: u32 = 16 * 1024;
/// Default limit handed to h2's `max_local_error_reset_streams`.
const DEFAULT_MAX_LOCAL_ERROR_RESET_STREAMS: usize = 1024;
42
/// HTTP/2 server connection settings read by `Server::new`.
#[derive(Debug, Clone)]
pub(crate) struct Config {
    /// When `true`, the initial stream window is handed to the ping config
    /// as its starting BDP window.
    pub(crate) adaptive_window: bool,
    /// Passed to the h2 builder's `initial_connection_window_size`.
    pub(crate) initial_conn_window_size: u32,
    /// Passed to the h2 builder's `initial_window_size`.
    pub(crate) initial_stream_window_size: u32,
    /// Passed to the h2 builder's `max_frame_size`.
    pub(crate) max_frame_size: u32,
    /// When `true`, the h2 builder's `enable_connect_protocol` is called.
    pub(crate) enable_connect_protocol: bool,
    /// Applied to the h2 builder only when `Some`.
    pub(crate) max_concurrent_streams: Option<u32>,
    /// Applied to the h2 builder only when `Some`.
    pub(crate) max_pending_accept_reset_streams: Option<usize>,
    /// Passed through to the h2 builder as-is, including `None`.
    pub(crate) max_local_error_reset_streams: Option<usize>,
    /// Copied into the ping config's `keep_alive_interval`.
    pub(crate) keep_alive_interval: Option<Duration>,
    /// Copied into the ping config's `keep_alive_timeout`.
    pub(crate) keep_alive_timeout: Duration,
    /// Passed to the h2 builder's `max_send_buffer_size`.
    pub(crate) max_send_buffer_size: usize,
    /// Applied to the h2 builder only when `Some`.
    pub(crate) header_table_size: Option<u32>,
    /// Passed to the h2 builder's `max_header_list_size`.
    pub(crate) max_header_list_size: u32,
    /// Whether responses get a `Date` header when the service set none.
    pub(crate) date_header: bool,
}
60
61impl Default for Config {
62    fn default() -> Config {
63        Config {
64            adaptive_window: false,
65            initial_conn_window_size: DEFAULT_CONN_WINDOW,
66            initial_stream_window_size: DEFAULT_STREAM_WINDOW,
67            max_frame_size: DEFAULT_MAX_FRAME_SIZE,
68            enable_connect_protocol: false,
69            max_concurrent_streams: Some(200),
70            max_pending_accept_reset_streams: None,
71            max_local_error_reset_streams: Some(DEFAULT_MAX_LOCAL_ERROR_RESET_STREAMS),
72            header_table_size: None,
73            keep_alive_interval: None,
74            keep_alive_timeout: Duration::from_secs(20),
75            max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
76            max_header_list_size: DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE,
77            date_header: true,
78        }
79    }
80}
81
82pin_project! {
83    pub(crate) struct Server<T, S, B, E>
84    where
85        S: HttpService<IncomingBody>,
86        B: Body,
87    {
88        exec: E,
89        timer: Time,
90        service: S,
91        state: State<T, B>,
92        date_header: bool,
93        close_pending: bool
94    }
95}
96
97enum State<T, B>
98where
99    B: Body,
100{
101    Handshaking {
102        ping_config: ping::Config,
103        hs: Handshake<Compat<T>, SendBuf<B::Data>>,
104    },
105    Serving(Serving<T, B>),
106}
107
108struct Serving<T, B>
109where
110    B: Body,
111{
112    ping: Option<(ping::Recorder, ping::Ponger)>,
113    conn: Connection<Compat<T>, SendBuf<B::Data>>,
114    closing: Option<crate::Error>,
115    date_header: bool,
116}
117
118impl<T, S, B, E> Server<T, S, B, E>
119where
120    T: Read + Write + Unpin,
121    S: HttpService<IncomingBody, ResBody = B>,
122    S::Error: Into<Box<dyn StdError + Send + Sync>>,
123    B: Body + 'static,
124    E: Http2ServerConnExec<S::Future, B>,
125{
126    pub(crate) fn new(
127        io: T,
128        service: S,
129        config: &Config,
130        exec: E,
131        timer: Time,
132    ) -> Server<T, S, B, E> {
133        let mut builder = h2::server::Builder::default();
134        builder
135            .initial_window_size(config.initial_stream_window_size)
136            .initial_connection_window_size(config.initial_conn_window_size)
137            .max_frame_size(config.max_frame_size)
138            .max_header_list_size(config.max_header_list_size)
139            .max_local_error_reset_streams(config.max_local_error_reset_streams)
140            .max_send_buffer_size(config.max_send_buffer_size);
141        if let Some(max) = config.max_concurrent_streams {
142            builder.max_concurrent_streams(max);
143        }
144        if let Some(max) = config.max_pending_accept_reset_streams {
145            builder.max_pending_accept_reset_streams(max);
146        }
147        if let Some(size) = config.header_table_size {
148            builder.header_table_size(size);
149        }
150        if config.enable_connect_protocol {
151            builder.enable_connect_protocol();
152        }
153        let handshake = builder.handshake(Compat::new(io));
154
155        let bdp = if config.adaptive_window {
156            Some(config.initial_stream_window_size)
157        } else {
158            None
159        };
160
161        let ping_config = ping::Config {
162            bdp_initial_window: bdp,
163            keep_alive_interval: config.keep_alive_interval,
164            keep_alive_timeout: config.keep_alive_timeout,
165            // If keep-alive is enabled for servers, always enabled while
166            // idle, so it can more aggressively close dead connections.
167            keep_alive_while_idle: true,
168        };
169
170        Server {
171            exec,
172            timer,
173            state: State::Handshaking {
174                ping_config,
175                hs: handshake,
176            },
177            service,
178            date_header: config.date_header,
179            close_pending: false,
180        }
181    }
182
183    pub(crate) fn graceful_shutdown(&mut self) {
184        trace!("graceful_shutdown");
185        match self.state {
186            State::Handshaking { .. } => {
187                self.close_pending = true;
188            }
189            State::Serving(ref mut srv) => {
190                if srv.closing.is_none() {
191                    srv.conn.graceful_shutdown();
192                }
193            }
194        }
195    }
196}
197
198impl<T, S, B, E> Future for Server<T, S, B, E>
199where
200    T: Read + Write + Unpin,
201    S: HttpService<IncomingBody, ResBody = B>,
202    S::Error: Into<Box<dyn StdError + Send + Sync>>,
203    B: Body + 'static,
204    E: Http2ServerConnExec<S::Future, B>,
205{
206    type Output = crate::Result<Dispatched>;
207
208    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
209        let me = &mut *self;
210        loop {
211            let next = match me.state {
212                State::Handshaking {
213                    ref mut hs,
214                    ref ping_config,
215                } => {
216                    let mut conn = ready!(Pin::new(hs).poll(cx).map_err(crate::Error::new_h2))?;
217                    let ping = if ping_config.is_enabled() {
218                        let pp = conn.ping_pong().expect("conn.ping_pong");
219                        Some(ping::channel(pp, ping_config.clone(), me.timer.clone()))
220                    } else {
221                        None
222                    };
223                    State::Serving(Serving {
224                        ping,
225                        conn,
226                        closing: None,
227                        date_header: me.date_header,
228                    })
229                }
230                State::Serving(ref mut srv) => {
231                    // graceful_shutdown was called before handshaking finished,
232                    if me.close_pending && srv.closing.is_none() {
233                        srv.conn.graceful_shutdown();
234                    }
235                    ready!(srv.poll_server(cx, &mut me.service, &mut me.exec))?;
236                    return Poll::Ready(Ok(Dispatched::Shutdown));
237                }
238            };
239            me.state = next;
240        }
241    }
242}
243
244impl<T, B> Serving<T, B>
245where
246    T: Read + Write + Unpin,
247    B: Body + 'static,
248{
249    fn poll_server<S, E>(
250        &mut self,
251        cx: &mut Context<'_>,
252        service: &mut S,
253        exec: &mut E,
254    ) -> Poll<crate::Result<()>>
255    where
256        S: HttpService<IncomingBody, ResBody = B>,
257        S::Error: Into<Box<dyn StdError + Send + Sync>>,
258        E: Http2ServerConnExec<S::Future, B>,
259    {
260        if self.closing.is_none() {
261            loop {
262                self.poll_ping(cx);
263
264                match ready!(self.conn.poll_accept(cx)) {
265                    Some(Ok((req, mut respond))) => {
266                        trace!("incoming request");
267                        let content_length = headers::content_length_parse_all(req.headers());
268                        let ping = self
269                            .ping
270                            .as_ref()
271                            .map(|ping| ping.0.clone())
272                            .unwrap_or_else(ping::disabled);
273
274                        // Record the headers received
275                        ping.record_non_data();
276
277                        let is_connect = req.method() == Method::CONNECT;
278                        let (mut parts, stream) = req.into_parts();
279                        let (mut req, connect_parts) = if !is_connect {
280                            (
281                                Request::from_parts(
282                                    parts,
283                                    IncomingBody::h2(stream, content_length.into(), ping),
284                                ),
285                                None,
286                            )
287                        } else {
288                            if content_length.map_or(false, |len| len != 0) {
289                                warn!("h2 connect request with non-zero body not supported");
290                                respond.send_reset(h2::Reason::INTERNAL_ERROR);
291                                return Poll::Ready(Ok(()));
292                            }
293                            let (pending, upgrade) = crate::upgrade::pending();
294                            debug_assert!(parts.extensions.get::<OnUpgrade>().is_none());
295                            parts.extensions.insert(upgrade);
296                            (
297                                Request::from_parts(parts, IncomingBody::empty()),
298                                Some(ConnectParts {
299                                    pending,
300                                    ping,
301                                    recv_stream: stream,
302                                }),
303                            )
304                        };
305
306                        if let Some(protocol) = req.extensions_mut().remove::<h2::ext::Protocol>() {
307                            req.extensions_mut().insert(Protocol::from_inner(protocol));
308                        }
309
310                        let fut = H2Stream::new(
311                            service.call(req),
312                            connect_parts,
313                            respond,
314                            self.date_header,
315                            exec.clone(),
316                        );
317
318                        exec.execute_h2stream(fut);
319                    }
320                    Some(Err(e)) => {
321                        return Poll::Ready(Err(crate::Error::new_h2(e)));
322                    }
323                    None => {
324                        // no more incoming streams...
325                        if let Some((ref ping, _)) = self.ping {
326                            ping.ensure_not_timed_out()?;
327                        }
328
329                        trace!("incoming connection complete");
330                        return Poll::Ready(Ok(()));
331                    }
332                }
333            }
334        }
335
336        debug_assert!(
337            self.closing.is_some(),
338            "poll_server broke loop without closing"
339        );
340
341        ready!(self.conn.poll_closed(cx).map_err(crate::Error::new_h2))?;
342
343        Poll::Ready(Err(self.closing.take().expect("polled after error")))
344    }
345
346    fn poll_ping(&mut self, cx: &mut Context<'_>) {
347        if let Some((_, ref mut estimator)) = self.ping {
348            match estimator.poll(cx) {
349                Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
350                    self.conn.set_target_window_size(wnd);
351                    let _ = self.conn.set_initial_window_size(wnd);
352                }
353                Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
354                    debug!("keep-alive timed out, closing connection");
355                    self.conn.abrupt_shutdown(h2::Reason::NO_ERROR);
356                }
357                Poll::Pending => {}
358            }
359        }
360    }
361}
362
pin_project! {
    // Future created for each accepted request: it awaits the service's
    // response, sends it on `reply`, and then streams the response body.
    #[allow(missing_debug_implementations)]
    pub struct H2Stream<F, B, E>
    where
        B: Body,
    {
        // Handle used to send the response, or to reset the stream on error.
        reply: SendResponse<SendBuf<B::Data>>,
        // Either the pending service future or the body being piped out.
        #[pin]
        state: H2StreamState<F, B>,
        // When true, a `Date` header is added if the response has none.
        date_header: bool,
        // Executor that runs the upgrade task for a successful CONNECT.
        exec: E,
    }
}
376
pin_project! {
    // The two phases of an `H2Stream`.
    #[project = H2StreamStateProj]
    enum H2StreamState<F, B>
    where
        B: Body,
    {
        // Waiting for the service future; `connect_parts` is `Some` only for
        // CONNECT requests.
        Service {
            #[pin]
            fut: F,
            connect_parts: Option<ConnectParts>,
        },
        // Response head has been sent; the body is being piped to the stream.
        Body {
            #[pin]
            pipe: PipeToSendStream<B>,
        },
    }
}
394
/// Pieces of a CONNECT request held back until the service responds, so the
/// stream can be turned into an upgraded connection on a successful response.
struct ConnectParts {
    /// Fulfilled with the `Upgraded` connection on a successful response.
    pending: Pending,
    /// Ping recorder handed to the upgrade pair.
    ping: Recorder,
    /// Receive half of the request stream, handed to the upgrade pair.
    recv_stream: RecvStream,
}
400
401impl<F, B, E> H2Stream<F, B, E>
402where
403    B: Body,
404{
405    fn new(
406        fut: F,
407        connect_parts: Option<ConnectParts>,
408        respond: SendResponse<SendBuf<B::Data>>,
409        date_header: bool,
410        exec: E,
411    ) -> H2Stream<F, B, E> {
412        H2Stream {
413            reply: respond,
414            state: H2StreamState::Service { fut, connect_parts },
415            date_header,
416            exec,
417        }
418    }
419}
420
// Sends the response head on `$me.reply` and evaluates to the resulting send
// stream. On failure it logs, resets the stream with INTERNAL_ERROR, and
// returns `Poll::Ready(Err(..))` from the enclosing function.
macro_rules! reply {
    ($me:expr, $res:expr, $eos:expr) => {{
        match $me.reply.send_response($res, $eos) {
            Err(send_err) => {
                debug!("send response error: {}", send_err);
                $me.reply.send_reset(Reason::INTERNAL_ERROR);
                return Poll::Ready(Err(crate::Error::new_h2(send_err)));
            }
            Ok(send_stream) => send_stream,
        }
    }};
}
433
434impl<F, B, Ex, E> H2Stream<F, B, Ex>
435where
436    F: Future<Output = Result<Response<B>, E>>,
437    B: Body,
438    B::Data: 'static,
439    B::Error: Into<Box<dyn StdError + Send + Sync>>,
440    Ex: Http2UpgradedExec<B::Data>,
441    E: Into<Box<dyn StdError + Send + Sync>>,
442{
443    fn poll2(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
444        let mut me = self.as_mut().project();
445        loop {
446            let next = match me.state.as_mut().project() {
447                H2StreamStateProj::Service {
448                    fut: h,
449                    connect_parts,
450                } => {
451                    let res = match h.poll(cx) {
452                        Poll::Ready(Ok(r)) => r,
453                        Poll::Pending => {
454                            // Response is not yet ready, so we want to check if the client has sent a
455                            // RST_STREAM frame which would cancel the current request.
456                            if let Poll::Ready(reason) =
457                                me.reply.poll_reset(cx).map_err(crate::Error::new_h2)?
458                            {
459                                debug!("stream received RST_STREAM: {:?}", reason);
460                                return Poll::Ready(Err(crate::Error::new_h2(reason.into())));
461                            }
462                            return Poll::Pending;
463                        }
464                        Poll::Ready(Err(e)) => {
465                            let err = crate::Error::new_user_service(e);
466                            warn!("http2 service errored: {}", err);
467                            me.reply.send_reset(err.h2_reason());
468                            return Poll::Ready(Err(err));
469                        }
470                    };
471
472                    let (head, body) = res.into_parts();
473                    let mut res = ::http::Response::from_parts(head, ());
474                    super::strip_connection_headers(res.headers_mut(), false);
475
476                    // set Date header if it isn't already set if instructed
477                    if *me.date_header {
478                        res.headers_mut()
479                            .entry(::http::header::DATE)
480                            .or_insert_with(date::update_and_header_value);
481                    }
482
483                    if let Some(connect_parts) = connect_parts.take() {
484                        if res.status().is_success() {
485                            if headers::content_length_parse_all(res.headers())
486                                .map_or(false, |len| len != 0)
487                            {
488                                warn!("h2 successful response to CONNECT request with body not supported");
489                                me.reply.send_reset(h2::Reason::INTERNAL_ERROR);
490                                return Poll::Ready(Err(crate::Error::new_user_header()));
491                            }
492                            if res
493                                .headers_mut()
494                                .remove(::http::header::CONTENT_LENGTH)
495                                .is_some()
496                            {
497                                warn!("successful response to CONNECT request disallows content-length header");
498                            }
499                            let send_stream = reply!(me, res, false);
500                            let (h2_up, up_task) = super::upgrade::pair(
501                                send_stream,
502                                connect_parts.recv_stream,
503                                connect_parts.ping,
504                            );
505                            connect_parts
506                                .pending
507                                .fulfill(Upgraded::new(h2_up, Bytes::new()));
508                            self.exec.execute_upgrade(up_task);
509                            return Poll::Ready(Ok(()));
510                        }
511                    }
512
513                    if !body.is_end_stream() {
514                        // automatically set Content-Length from body...
515                        if let Some(len) = body.size_hint().exact() {
516                            headers::set_content_length_if_missing(res.headers_mut(), len);
517                        }
518
519                        let body_tx = reply!(me, res, false);
520                        H2StreamState::Body {
521                            pipe: PipeToSendStream::new(body, body_tx),
522                        }
523                    } else {
524                        reply!(me, res, true);
525                        return Poll::Ready(Ok(()));
526                    }
527                }
528                H2StreamStateProj::Body { pipe } => {
529                    return pipe.poll(cx);
530                }
531            };
532            me.state.set(next);
533        }
534    }
535}
536
537impl<F, B, Ex, E> Future for H2Stream<F, B, Ex>
538where
539    F: Future<Output = Result<Response<B>, E>>,
540    B: Body,
541    B::Data: 'static,
542    B::Error: Into<Box<dyn StdError + Send + Sync>>,
543    Ex: Http2UpgradedExec<B::Data>,
544    E: Into<Box<dyn StdError + Send + Sync>>,
545{
546    type Output = ();
547
548    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
549        self.poll2(cx).map(|res| {
550            if let Err(_e) = res {
551                debug!("stream error: {}", _e);
552            }
553        })
554    }
555}