h2/server.rs
1//! Server implementation of the HTTP/2 protocol.
2//!
3//! # Getting started
4//!
5//! Running an HTTP/2 server requires the caller to manage accepting the
6//! connections as well as getting the connections to a state that is ready to
7//! begin the HTTP/2 handshake. See [here](../index.html#handshake) for more
8//! details.
9//!
10//! This could be as basic as using Tokio's [`TcpListener`] to accept
11//! connections, but usually it means using either ALPN or HTTP/1.1 protocol
12//! upgrades.
13//!
14//! Once a connection is obtained, it is passed to [`handshake`],
15//! which will begin the [HTTP/2 handshake]. This returns a future that
16//! completes once the handshake process is performed and HTTP/2 streams may
17//! be received.
18//!
19//! [`handshake`] uses default configuration values. There are a number of
20//! settings that can be changed by using [`Builder`] instead.
21//!
22//! # Inbound streams
23//!
24//! The [`Connection`] instance is used to accept inbound HTTP/2 streams. It
25//! does this by implementing [`futures::Stream`]. When a new stream is
26//! received, a call to [`Connection::accept`] will return `(request, response)`.
27//! The `request` handle (of type [`http::Request<RecvStream>`]) contains the
28//! HTTP request head as well as provides a way to receive the inbound data
29//! stream and the trailers. The `response` handle (of type [`SendResponse`])
30//! allows responding to the request, stream the response payload, send
31//! trailers, and send push promises.
32//!
33//! The send ([`SendStream`]) and receive ([`RecvStream`]) halves of the stream
34//! can be operated independently.
35//!
36//! # Managing the connection
37//!
38//! The [`Connection`] instance is used to manage connection state. The caller
39//! is required to call either [`Connection::accept`] or
40//! [`Connection::poll_close`] in order to advance the connection state. Simply
41//! operating on [`SendStream`] or [`RecvStream`] will have no effect unless the
42//! connection state is advanced.
43//!
44//! It is not required to call **both** [`Connection::accept`] and
45//! [`Connection::poll_close`]. If the caller is ready to accept a new stream,
46//! then only [`Connection::accept`] should be called. When the caller **does
47//! not** want to accept a new stream, [`Connection::poll_close`] should be
48//! called.
49//!
50//! The [`Connection`] instance should only be dropped once
51//! [`Connection::poll_close`] returns `Ready`. Once [`Connection::accept`]
52//! returns `Ready(None)`, there will no longer be any more inbound streams. At
53//! this point, only [`Connection::poll_close`] should be called.
54//!
55//! # Shutting down the server
56//!
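//! A graceful shutdown is started by calling
//! [`Connection::graceful_shutdown`]. The connection must continue to be polled
//! afterwards; it closes once all active streams have completed. To close the
//! connection without waiting for outstanding streams, use
//! [`Connection::abrupt_shutdown`].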
59//!
60//! # Example
61//!
62//! A basic HTTP/2 server example that runs over TCP and assumes [prior
63//! knowledge], i.e. both the client and the server assume that the TCP socket
64//! will use the HTTP/2 protocol without prior negotiation.
65//!
66//! ```no_run
67//! use h2::server;
68//! use http::{Response, StatusCode};
69//! use tokio::net::TcpListener;
70//!
71//! #[tokio::main]
72//! pub async fn main() {
73//! let mut listener = TcpListener::bind("127.0.0.1:5928").await.unwrap();
74//!
75//! // Accept all incoming TCP connections.
76//! loop {
77//! if let Ok((socket, _peer_addr)) = listener.accept().await {
78//! // Spawn a new task to process each connection.
79//! tokio::spawn(async {
80//! // Start the HTTP/2 connection handshake
81//! let mut h2 = server::handshake(socket).await.unwrap();
82//! // Accept all inbound HTTP/2 streams sent over the
83//! // connection.
84//! while let Some(request) = h2.accept().await {
85//! let (request, mut respond) = request.unwrap();
86//! println!("Received request: {:?}", request);
87//!
88//! // Build a response with no body
89//! let response = Response::builder()
90//! .status(StatusCode::OK)
91//! .body(())
92//! .unwrap();
93//!
94//! // Send the response back to the client
95//! respond.send_response(response, true)
96//! .unwrap();
97//! }
98//!
99//! });
100//! }
101//! }
102//! }
103//! ```
104//!
105//! [prior knowledge]: http://httpwg.org/specs/rfc7540.html#known-http
106//! [`handshake`]: fn.handshake.html
107//! [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
108//! [`Builder`]: struct.Builder.html
109//! [`Connection`]: struct.Connection.html
//! [`Connection::accept`]: struct.Connection.html#method.accept
//! [`Connection::poll_close`]: struct.Connection.html#method.poll_closed
//! [`futures::Stream`]: https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html
//! [`http::Request<RecvStream>`]: ../struct.RecvStream.html
//! [`RecvStream`]: ../struct.RecvStream.html
//! [`SendStream`]: ../struct.SendStream.html
//! [`TcpListener`]: https://docs.rs/tokio/latest/tokio/net/struct.TcpListener.html
117
118use crate::codec::{Codec, UserError};
119use crate::frame::{self, Pseudo, PushPromiseHeaderError, Reason, Settings, StreamId};
120use crate::proto::{self, Config, Error, Prioritized};
121use crate::{FlowControl, PingPong, RecvStream, SendStream};
122
123use bytes::{Buf, Bytes};
124use http::{HeaderMap, Method, Request, Response};
125use std::future::Future;
126use std::pin::Pin;
127use std::task::{Context, Poll};
128use std::time::Duration;
129use std::{fmt, io};
130use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
131use tracing::instrument::{Instrument, Instrumented};
132
/// In progress HTTP/2 connection handshake future.
///
/// Values of this type are returned by [`handshake`]. This type implements
/// `Future`, yielding a `Connection` instance once the handshake has
/// completed.
///
/// The handshake is completed once the connection preface is fully received
/// from the client **and** the initial settings frame is sent to the client.
///
/// The handshake future does not wait for the initial settings frame from the
/// client.
///
/// See [module] level docs for more details.
///
/// [module]: index.html
/// [`handshake`]: fn.handshake.html
#[must_use = "futures do nothing unless polled"]
pub struct Handshake<T, B: Buf = Bytes> {
    /// The config to pass to Connection::new after handshake succeeds.
    builder: Builder,
    /// The current state of the handshake.
    state: Handshaking<T, B>,
    /// Tracing span that tracks the handshake.
    span: tracing::Span,
}
156
/// Accepts inbound HTTP/2 streams on a connection.
///
/// A `Connection` is backed by an I/O resource (usually a TCP socket) and
/// implements the HTTP/2 server logic for that connection. It is responsible
/// for receiving inbound streams initiated by the client as well as driving the
/// internal state forward.
///
/// `Connection` values are created by calling [`handshake`]. Once a
/// `Connection` value is obtained, the caller must call [`accept`] or
/// [`poll_closed`] in order to drive the internal connection state forward.
///
/// See [module level] documentation for more details
///
/// [module level]: index.html
/// [`handshake`]: fn.handshake.html
/// [`accept`]: struct.Connection.html#method.accept
/// [`poll_closed`]: struct.Connection.html#method.poll_closed
///
/// # Examples
///
/// ```
/// # use tokio::io::{AsyncRead, AsyncWrite};
/// # use h2::server;
/// # use h2::server::*;
/// #
/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) {
/// let mut server = server::handshake(my_io).await.unwrap();
/// while let Some(request) = server.accept().await {
///     tokio::spawn(async move {
///         let (request, respond) = request.unwrap();
///         // Process the request and send the response back to the client
///         // using `respond`.
///     });
/// }
/// # }
/// #
/// # pub fn main() {}
/// ```
#[must_use = "streams do nothing unless polled"]
pub struct Connection<T, B: Buf> {
    /// Protocol-level connection, specialized for the server [`Peer`].
    connection: proto::Connection<T, Peer, B>,
}
199
/// Builds server connections with custom configuration values.
///
/// Methods can be chained in order to set the configuration values.
///
/// The server is constructed by calling [`handshake`] and passing the I/O
/// handle that will back the HTTP/2 server.
///
/// New instances of `Builder` are obtained via [`Builder::new`].
///
/// See function level documentation for details on the various server
/// configuration settings.
///
/// [`Builder::new`]: struct.Builder.html#method.new
/// [`handshake`]: struct.Builder.html#method.handshake
///
/// # Examples
///
/// ```
/// # use tokio::io::{AsyncRead, AsyncWrite};
/// # use h2::server::*;
/// #
/// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
/// # -> Handshake<T>
/// # {
/// // `server_fut` is a future representing the completion of the HTTP/2
/// // handshake.
/// let server_fut = Builder::new()
///     .initial_window_size(1_000_000)
///     .max_concurrent_streams(1000)
///     .handshake(my_io);
/// # server_fut
/// # }
/// #
/// # pub fn main() {}
/// ```
#[derive(Clone, Debug)]
pub struct Builder {
    /// Time to keep locally reset streams around before reaping.
    reset_stream_duration: Duration,

    /// Maximum number of locally reset streams to keep at a time.
    reset_stream_max: usize,

    /// Maximum number of remotely reset streams to allow in the pending
    /// accept queue.
    pending_accept_reset_stream_max: usize,

    /// Initial `Settings` frame to send as part of the handshake.
    settings: Settings,

    /// Initial target window size for new connections.
    ///
    /// `None` means no explicit target has been configured.
    initial_target_connection_window_size: Option<u32>,

    /// Maximum amount of bytes to "buffer" for writing per stream.
    max_send_buffer_size: usize,

    /// Maximum number of locally reset streams due to protocol error across
    /// the lifetime of the connection.
    ///
    /// When this gets exceeded, we issue GOAWAYs. `None` disables the limit.
    local_max_error_reset_streams: Option<usize>,
}
262
/// Send a response back to the client
///
/// A `SendResponse` instance is provided when receiving a request and is used
/// to send the associated response back to the client. It is also used to
/// explicitly reset the stream with a custom reason.
///
/// It will also be used to initiate push promises linked with the associated
/// stream.
///
/// If the `SendResponse` instance is dropped without sending a response, then
/// the HTTP/2 stream will be reset.
///
/// See [module] level docs for more details.
///
/// [module]: index.html
#[derive(Debug)]
pub struct SendResponse<B: Buf> {
    /// Reference to the protocol-level stream this response belongs to.
    inner: proto::StreamRef<B>,
}
282
/// Send a response to a promised request
///
/// A `SendPushedResponse` instance is provided when promising a request and is used
/// to send the associated response to the client. It is also used to
/// explicitly reset the stream with a custom reason.
///
/// It can not be used to initiate push promises.
///
/// If the `SendPushedResponse` instance is dropped without sending a response, then
/// the HTTP/2 stream will be reset.
///
/// See [module] level docs for more details.
///
/// [module]: index.html
pub struct SendPushedResponse<B: Buf> {
    /// The wrapped response handle for the promised stream.
    inner: SendResponse<B>,
}
300
301// Manual implementation necessary because of rust-lang/rust#26925
302impl<B: Buf + fmt::Debug> fmt::Debug for SendPushedResponse<B> {
303 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
304 write!(f, "SendPushedResponse {{ {:?} }}", self.inner)
305 }
306}
307
/// Stages of an in-progress handshake.
///
/// A new handshake starts in `Flushing` (see `Connection::handshake2`).
enum Handshaking<T, B: Buf> {
    /// State 1. Connection is flushing pending SETTINGS frame.
    Flushing(Instrumented<Flush<T, Prioritized<B>>>),
    /// State 2. Connection is waiting for the client preface.
    ReadingPreface(Instrumented<ReadPreface<T, Prioritized<B>>>),
    /// State 3. Handshake is done, polling again would panic.
    Done,
}
317
/// Flush a Sink
///
/// Used as the first stage of the server handshake to flush the buffered
/// SETTINGS frame.
struct Flush<T, B> {
    // NOTE(review): presumably `None` once the codec has been handed on after
    // the flush completes — confirm against the `Future` impl.
    codec: Option<Codec<T, B>>,
}
322
/// Read the client connection preface
struct ReadPreface<T, B> {
    // NOTE(review): presumably `None` once the codec has been handed on after
    // the preface is read — confirm against the `Future` impl.
    codec: Option<Codec<T, B>>,
    // NOTE(review): looks like the number of preface bytes consumed so far —
    // confirm against the `Future` impl.
    pos: usize,
}
328
/// Marker type naming the server side of a connection.
///
/// It is used as the peer type parameter of `proto::Connection` inside
/// [`Connection`].
#[derive(Debug)]
pub(crate) struct Peer;
331
/// The 24-byte client connection preface that a client sends to start an
/// HTTP/2 connection.
const PREFACE: [u8; 24] = *b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
333
334/// Creates a new configured HTTP/2 server with default configuration
335/// values backed by `io`.
336///
337/// It is expected that `io` already be in an appropriate state to commence
338/// the [HTTP/2 handshake]. See [Handshake] for more details.
339///
340/// Returns a future which resolves to the [`Connection`] instance once the
341/// HTTP/2 handshake has been completed. The returned [`Connection`]
342/// instance will be using default configuration values. Use [`Builder`] to
343/// customize the configuration values used by a [`Connection`] instance.
344///
345/// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
346/// [Handshake]: ../index.html#handshake
347/// [`Connection`]: struct.Connection.html
348///
349/// # Examples
350///
351/// ```
352/// # use tokio::io::{AsyncRead, AsyncWrite};
353/// # use h2::server;
354/// # use h2::server::*;
355/// #
356/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
357/// # {
358/// let connection = server::handshake(my_io).await.unwrap();
359/// // The HTTP/2 handshake has completed, now use `connection` to
360/// // accept inbound HTTP/2 streams.
361/// # }
362/// #
363/// # pub fn main() {}
364/// ```
365pub fn handshake<T>(io: T) -> Handshake<T, Bytes>
366where
367 T: AsyncRead + AsyncWrite + Unpin,
368{
369 Builder::new().handshake(io)
370}
371
372// ===== impl Connection =====
373
374impl<T, B> Connection<T, B>
375where
376 T: AsyncRead + AsyncWrite + Unpin,
377 B: Buf,
378{
379 fn handshake2(io: T, builder: Builder) -> Handshake<T, B> {
380 let span = tracing::trace_span!("server_handshake");
381 let entered = span.enter();
382
383 // Create the codec.
384 let mut codec = Codec::new(io);
385
386 if let Some(max) = builder.settings.max_frame_size() {
387 codec.set_max_recv_frame_size(max as usize);
388 }
389
390 if let Some(max) = builder.settings.max_header_list_size() {
391 codec.set_max_recv_header_list_size(max as usize);
392 }
393
394 // Send initial settings frame.
395 codec
396 .buffer(builder.settings.clone().into())
397 .expect("invalid SETTINGS frame");
398
399 // Create the handshake future.
400 let state =
401 Handshaking::Flushing(Flush::new(codec).instrument(tracing::trace_span!("flush")));
402
403 drop(entered);
404
405 Handshake {
406 builder,
407 state,
408 span,
409 }
410 }
411
412 /// Accept the next incoming request on this connection.
413 pub async fn accept(
414 &mut self,
415 ) -> Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>> {
416 crate::poll_fn(move |cx| self.poll_accept(cx)).await
417 }
418
419 #[doc(hidden)]
420 pub fn poll_accept(
421 &mut self,
422 cx: &mut Context<'_>,
423 ) -> Poll<Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>>> {
424 // Always try to advance the internal state. Getting Pending also is
425 // needed to allow this function to return Pending.
426 if self.poll_closed(cx)?.is_ready() {
427 // If the socket is closed, don't return anything
428 // TODO: drop any pending streams
429 return Poll::Ready(None);
430 }
431
432 if let Some(inner) = self.connection.next_incoming() {
433 tracing::trace!("received incoming");
434 let (head, _) = inner.take_request().into_parts();
435 let body = RecvStream::new(FlowControl::new(inner.clone_to_opaque()));
436
437 let request = Request::from_parts(head, body);
438 let respond = SendResponse { inner };
439
440 return Poll::Ready(Some(Ok((request, respond))));
441 }
442
443 Poll::Pending
444 }
445
    /// Sets the target window size for the whole connection.
    ///
    /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
    /// frame will be immediately sent to the remote, increasing the connection
    /// level window by `size - current_value`.
    ///
    /// If `size` is less than the current value, nothing will happen
    /// immediately. However, as window capacity is released by
    /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
    /// out until the number of "in flight" bytes drops below `size`.
    ///
    /// The default value is 65,535.
    ///
    /// See [`FlowControl`] documentation and the [library level] docs for more
    /// details.
    ///
    /// # Panics
    ///
    /// Panics if `size` is greater than `proto::MAX_WINDOW_SIZE`.
    ///
    /// [`FlowControl`]: ../struct.FlowControl.html
    /// [library level]: ../index.html#flow-control
    pub fn set_target_window_size(&mut self, size: u32) {
        assert!(size <= proto::MAX_WINDOW_SIZE);
        self.connection.set_target_window_size(size);
    }
467
    /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
    /// flow control for received data.
    ///
    /// The `SETTINGS` will be sent to the remote, and only applied once the
    /// remote acknowledges the change.
    ///
    /// This can be used to increase or decrease the window size for existing
    /// streams.
    ///
    /// # Errors
    ///
    /// Returns an error if a previous call is still pending acknowledgement
    /// from the remote endpoint.
    ///
    /// # Panics
    ///
    /// Panics if `size` is greater than `proto::MAX_WINDOW_SIZE`.
    pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
        assert!(size <= proto::MAX_WINDOW_SIZE);
        // `?` converts the protocol-level error into `crate::Error`.
        self.connection.set_initial_window_size(size)?;
        Ok(())
    }
486
    /// Enables the [extended CONNECT protocol].
    ///
    /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
    ///
    /// # Errors
    ///
    /// Returns an error if a previous call is still pending acknowledgement
    /// from the remote endpoint.
    pub fn enable_connect_protocol(&mut self) -> Result<(), crate::Error> {
        // `?` converts the protocol-level error into `crate::Error`.
        self.connection.set_enable_connect_protocol()?;
        Ok(())
    }
499
500 /// Returns `Ready` when the underlying connection has closed.
501 ///
502 /// If any new inbound streams are received during a call to `poll_closed`,
503 /// they will be queued and returned on the next call to [`poll_accept`].
504 ///
505 /// This function will advance the internal connection state, driving
506 /// progress on all the other handles (e.g. [`RecvStream`] and [`SendStream`]).
507 ///
508 /// See [here](index.html#managing-the-connection) for more details.
509 ///
510 /// [`poll_accept`]: struct.Connection.html#method.poll_accept
511 /// [`RecvStream`]: ../struct.RecvStream.html
512 /// [`SendStream`]: ../struct.SendStream.html
513 pub fn poll_closed(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
514 self.connection.poll(cx).map_err(Into::into)
515 }
516
    /// Sets the connection to a GOAWAY state.
    ///
    /// Does not terminate the connection. Must continue being polled to close
    /// connection.
    ///
    /// After flushing the GOAWAY frame, the connection is closed. Any
    /// outstanding streams do not prevent the connection from closing. This
    /// should usually be reserved for shutting down when something bad
    /// external to `h2` has happened, and open streams cannot be properly
    /// handled.
    ///
    /// `reason` is passed on as the reason for the GOAWAY.
    ///
    /// For graceful shutdowns, see [`graceful_shutdown`](Connection::graceful_shutdown).
    pub fn abrupt_shutdown(&mut self, reason: Reason) {
        self.connection.go_away_from_user(reason);
    }
532
    /// Starts a [graceful shutdown][1] process.
    ///
    /// Must continue being polled to close connection.
    ///
    /// It's possible to receive more requests after calling this method, since
    /// they might have been in-flight from the client already. After about
    /// 1 RTT, no new requests should be accepted. Once all active streams
    /// have completed, the connection is closed.
    ///
    /// To close without waiting for active streams, see
    /// [`abrupt_shutdown`](Connection::abrupt_shutdown).
    ///
    /// [1]: http://httpwg.org/specs/rfc7540.html#GOAWAY
    pub fn graceful_shutdown(&mut self) {
        self.connection.go_away_gracefully();
    }
546
547 /// Takes a `PingPong` instance from the connection.
548 ///
549 /// # Note
550 ///
551 /// This may only be called once. Calling multiple times will return `None`.
552 pub fn ping_pong(&mut self) -> Option<PingPong> {
553 self.connection.take_user_pings().map(PingPong::new)
554 }
555
    /// Checks if there are any streams
    ///
    /// Forwards to the underlying protocol connection's `has_streams`.
    pub fn has_streams(&self) -> bool {
        self.connection.has_streams()
    }
560
    /// Returns the maximum number of concurrent streams that may be initiated
    /// by the server on this connection.
    ///
    /// This limit is configured by the client peer by sending the
    /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame.
    /// This method returns the currently acknowledged value received from the
    /// remote.
    ///
    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
    pub fn max_concurrent_send_streams(&self) -> usize {
        self.connection.max_send_streams()
    }
573
    /// Returns the maximum number of concurrent streams that may be initiated
    /// by the client on this connection.
    ///
    /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS`
    /// parameter][1] sent in a `SETTINGS` frame that has been
    /// acknowledged by the remote peer. The value to be sent is configured by
    /// the [`Builder::max_concurrent_streams`][2] method before handshaking
    /// with the remote peer.
    ///
    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
    /// [2]: ../struct.Builder.html#method.max_concurrent_streams
    pub fn max_concurrent_recv_streams(&self) -> usize {
        self.connection.max_recv_streams()
    }
588
    // Could disappear at anytime.
    //
    // Only compiled with the `unstable` feature and hidden from the docs;
    // forwards to the underlying protocol connection.
    #[doc(hidden)]
    #[cfg(feature = "unstable")]
    pub fn num_wired_streams(&self) -> usize {
        self.connection.num_wired_streams()
    }
595}
596
/// `Stream` adapter for [`Connection`], available with the `stream` feature.
///
/// Every item is what `Connection::poll_accept` yields: an accepted request
/// paired with its response handle, or a connection error.
#[cfg(feature = "stream")]
impl<T, B> futures_core::Stream for Connection<T, B>
where
    T: AsyncRead + AsyncWrite + Unpin,
    B: Buf,
{
    type Item = Result<(Request<RecvStream>, SendResponse<B>), crate::Error>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Unwrap the pin to reach the inherent polling method.
        let conn: &mut Self = Pin::get_mut(self);
        conn.poll_accept(cx)
    }
}
609
610impl<T, B> fmt::Debug for Connection<T, B>
611where
612 T: fmt::Debug,
613 B: fmt::Debug + Buf,
614{
615 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
616 fmt.debug_struct("Connection")
617 .field("connection", &self.connection)
618 .finish()
619 }
620}
621
622// ===== impl Builder =====
623
624impl Builder {
625 /// Returns a new server builder instance initialized with default
626 /// configuration values.
627 ///
628 /// Configuration methods can be chained on the return value.
629 ///
630 /// # Examples
631 ///
632 /// ```
633 /// # use tokio::io::{AsyncRead, AsyncWrite};
634 /// # use h2::server::*;
635 /// #
636 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
637 /// # -> Handshake<T>
638 /// # {
639 /// // `server_fut` is a future representing the completion of the HTTP/2
640 /// // handshake.
641 /// let server_fut = Builder::new()
642 /// .initial_window_size(1_000_000)
643 /// .max_concurrent_streams(1000)
644 /// .handshake(my_io);
645 /// # server_fut
646 /// # }
647 /// #
648 /// # pub fn main() {}
649 /// ```
650 pub fn new() -> Builder {
651 Builder {
652 reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
653 reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
654 pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX,
655 settings: Settings::default(),
656 initial_target_connection_window_size: None,
657 max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
658
659 local_max_error_reset_streams: Some(proto::DEFAULT_LOCAL_RESET_COUNT_MAX),
660 }
661 }
662
663 /// Indicates the initial window size (in octets) for stream-level
664 /// flow control for received data.
665 ///
666 /// The initial window of a stream is used as part of flow control. For more
667 /// details, see [`FlowControl`].
668 ///
669 /// The default value is 65,535.
670 ///
671 /// [`FlowControl`]: ../struct.FlowControl.html
672 ///
673 /// # Examples
674 ///
675 /// ```
676 /// # use tokio::io::{AsyncRead, AsyncWrite};
677 /// # use h2::server::*;
678 /// #
679 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
680 /// # -> Handshake<T>
681 /// # {
682 /// // `server_fut` is a future representing the completion of the HTTP/2
683 /// // handshake.
684 /// let server_fut = Builder::new()
685 /// .initial_window_size(1_000_000)
686 /// .handshake(my_io);
687 /// # server_fut
688 /// # }
689 /// #
690 /// # pub fn main() {}
691 /// ```
692 pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
693 self.settings.set_initial_window_size(Some(size));
694 self
695 }
696
697 /// Indicates the initial window size (in octets) for connection-level flow control
698 /// for received data.
699 ///
700 /// The initial window of a connection is used as part of flow control. For more details,
701 /// see [`FlowControl`].
702 ///
703 /// The default value is 65,535.
704 ///
705 /// [`FlowControl`]: ../struct.FlowControl.html
706 ///
707 /// # Examples
708 ///
709 /// ```
710 /// # use tokio::io::{AsyncRead, AsyncWrite};
711 /// # use h2::server::*;
712 /// #
713 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
714 /// # -> Handshake<T>
715 /// # {
716 /// // `server_fut` is a future representing the completion of the HTTP/2
717 /// // handshake.
718 /// let server_fut = Builder::new()
719 /// .initial_connection_window_size(1_000_000)
720 /// .handshake(my_io);
721 /// # server_fut
722 /// # }
723 /// #
724 /// # pub fn main() {}
725 /// ```
726 pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
727 self.initial_target_connection_window_size = Some(size);
728 self
729 }
730
731 /// Indicates the size (in octets) of the largest HTTP/2 frame payload that the
732 /// configured server is able to accept.
733 ///
734 /// The sender may send data frames that are **smaller** than this value,
735 /// but any data larger than `max` will be broken up into multiple `DATA`
736 /// frames.
737 ///
738 /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
739 ///
740 /// # Examples
741 ///
742 /// ```
743 /// # use tokio::io::{AsyncRead, AsyncWrite};
744 /// # use h2::server::*;
745 /// #
746 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
747 /// # -> Handshake<T>
748 /// # {
749 /// // `server_fut` is a future representing the completion of the HTTP/2
750 /// // handshake.
751 /// let server_fut = Builder::new()
752 /// .max_frame_size(1_000_000)
753 /// .handshake(my_io);
754 /// # server_fut
755 /// # }
756 /// #
757 /// # pub fn main() {}
758 /// ```
759 ///
760 /// # Panics
761 ///
762 /// This function panics if `max` is not within the legal range specified
763 /// above.
764 pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
765 self.settings.set_max_frame_size(Some(max));
766 self
767 }
768
769 /// Sets the max size of received header frames.
770 ///
771 /// This advisory setting informs a peer of the maximum size of header list
772 /// that the sender is prepared to accept, in octets. The value is based on
773 /// the uncompressed size of header fields, including the length of the name
774 /// and value in octets plus an overhead of 32 octets for each header field.
775 ///
776 /// This setting is also used to limit the maximum amount of data that is
777 /// buffered to decode HEADERS frames.
778 ///
779 /// # Examples
780 ///
781 /// ```
782 /// # use tokio::io::{AsyncRead, AsyncWrite};
783 /// # use h2::server::*;
784 /// #
785 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
786 /// # -> Handshake<T>
787 /// # {
788 /// // `server_fut` is a future representing the completion of the HTTP/2
789 /// // handshake.
790 /// let server_fut = Builder::new()
791 /// .max_header_list_size(16 * 1024)
792 /// .handshake(my_io);
793 /// # server_fut
794 /// # }
795 /// #
796 /// # pub fn main() {}
797 /// ```
798 pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
799 self.settings.set_max_header_list_size(Some(max));
800 self
801 }
802
803 /// Sets the header table size.
804 ///
805 /// This setting informs the peer of the maximum size of the header compression
806 /// table used to encode header blocks, in octets. The encoder may select any value
807 /// equal to or less than the header table size specified by the sender.
808 ///
809 /// The default value is 4,096.
810 pub fn header_table_size(&mut self, size: u32) -> &mut Self {
811 self.settings.set_header_table_size(Some(size));
812 self
813 }
814
815 /// Sets the maximum number of concurrent streams.
816 ///
817 /// The maximum concurrent streams setting only controls the maximum number
818 /// of streams that can be initiated by the remote peer. In other words,
819 /// when this setting is set to 100, this does not limit the number of
820 /// concurrent streams that can be created by the caller.
821 ///
822 /// It is recommended that this value be no smaller than 100, so as to not
823 /// unnecessarily limit parallelism. However, any value is legal, including
824 /// 0. If `max` is set to 0, then the remote will not be permitted to
825 /// initiate streams.
826 ///
827 /// Note that streams in the reserved state, i.e., push promises that have
828 /// been reserved but the stream has not started, do not count against this
829 /// setting.
830 ///
831 /// Also note that if the remote *does* exceed the value set here, it is not
832 /// a protocol level error. Instead, the `h2` library will immediately reset
833 /// the stream.
834 ///
835 /// See [Section 5.1.2] in the HTTP/2 spec for more details.
836 ///
837 /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
838 ///
839 /// # Examples
840 ///
841 /// ```
842 /// # use tokio::io::{AsyncRead, AsyncWrite};
843 /// # use h2::server::*;
844 /// #
845 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
846 /// # -> Handshake<T>
847 /// # {
848 /// // `server_fut` is a future representing the completion of the HTTP/2
849 /// // handshake.
850 /// let server_fut = Builder::new()
851 /// .max_concurrent_streams(1000)
852 /// .handshake(my_io);
853 /// # server_fut
854 /// # }
855 /// #
856 /// # pub fn main() {}
857 /// ```
858 pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
859 self.settings.set_max_concurrent_streams(Some(max));
860 self
861 }
862
    /// Sets the maximum number of concurrent locally reset streams.
    ///
    /// When a stream is explicitly reset by either calling
    /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
    /// before completing the stream, the HTTP/2 specification requires that
    /// any further frames received for that stream must be ignored for "some
    /// time".
    ///
    /// In order to satisfy the specification, internal state must be maintained
    /// to implement the behavior. This state grows linearly with the number of
    /// streams that are locally reset.
    ///
    /// The `max_concurrent_reset_streams` setting sets an upper bound on the
    /// amount of state that is maintained. When this max value is reached, the
    /// oldest reset stream is purged from memory.
    ///
    /// Once the stream has been fully purged from memory, any additional frames
    /// received for that stream will result in a connection level protocol
    /// error, forcing the connection to terminate.
    ///
    /// The default value is currently 50.
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::server::*;
    /// #
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Handshake<T>
    /// # {
    /// // `server_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let server_fut = Builder::new()
    ///     .max_concurrent_reset_streams(1000)
    ///     .handshake(my_io);
    /// # server_fut
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
        self.reset_stream_max = max;
        self
    }
908
909 /// Sets the maximum number of local resets due to protocol errors made by the remote end.
910 ///
911 /// Invalid frames and many other protocol errors will lead to resets being generated for those streams.
912 /// Too many of these often indicate a malicious client, and there are attacks which can abuse this to DOS servers.
913 /// This limit protects against these DOS attacks by limiting the amount of resets we can be forced to generate.
914 ///
915 /// When the number of local resets exceeds this threshold, the server will issue GOAWAYs with an error code of
916 /// `ENHANCE_YOUR_CALM` to the client.
917 ///
918 /// If you really want to disable this, supply [`Option::None`] here.
919 /// Disabling this is not recommended and may expose you to DOS attacks.
920 ///
921 /// The default value is currently 1024, but could change.
922 pub fn max_local_error_reset_streams(&mut self, max: Option<usize>) -> &mut Self {
923 self.local_max_error_reset_streams = max;
924 self
925 }
926
927 /// Sets the maximum number of pending-accept remotely-reset streams.
928 ///
929 /// Streams that have been received by the peer, but not accepted by the
930 /// user, can also receive a RST_STREAM. This is a legitimate pattern: one
931 /// could send a request and then shortly after, realize it is not needed,
932 /// sending a CANCEL.
933 ///
934 /// However, since those streams are now "closed", they don't count towards
935 /// the max concurrent streams. So, they will sit in the accept queue,
936 /// using memory.
937 ///
938 /// When the number of remotely-reset streams sitting in the pending-accept
939 /// queue reaches this maximum value, a connection error with the code of
940 /// `ENHANCE_YOUR_CALM` will be sent to the peer, and returned by the
941 /// `Future`.
942 ///
943 /// The default value is currently 20, but could change.
944 ///
945 /// # Examples
946 ///
947 ///
948 /// ```
949 /// # use tokio::io::{AsyncRead, AsyncWrite};
950 /// # use h2::server::*;
951 /// #
952 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
953 /// # -> Handshake<T>
954 /// # {
955 /// // `server_fut` is a future representing the completion of the HTTP/2
956 /// // handshake.
957 /// let server_fut = Builder::new()
958 /// .max_pending_accept_reset_streams(100)
959 /// .handshake(my_io);
960 /// # server_fut
961 /// # }
962 /// #
963 /// # pub fn main() {}
964 /// ```
965 pub fn max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self {
966 self.pending_accept_reset_stream_max = max;
967 self
968 }
969
970 /// Sets the maximum send buffer size per stream.
971 ///
972 /// Once a stream has buffered up to (or over) the maximum, the stream's
973 /// flow control will not "poll" additional capacity. Once bytes for the
974 /// stream have been written to the connection, the send buffer capacity
975 /// will be freed up again.
976 ///
977 /// The default is currently ~400KB, but may change.
978 ///
979 /// # Panics
980 ///
981 /// This function panics if `max` is larger than `u32::MAX`.
982 pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
983 assert!(max <= u32::MAX as usize);
984 self.max_send_buffer_size = max;
985 self
986 }
987
988 /// Sets the maximum number of concurrent locally reset streams.
989 ///
990 /// When a stream is explicitly reset by either calling
991 /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
992 /// before completing the stream, the HTTP/2 specification requires that
993 /// any further frames received for that stream must be ignored for "some
994 /// time".
995 ///
996 /// In order to satisfy the specification, internal state must be maintained
997 /// to implement the behavior. This state grows linearly with the number of
998 /// streams that are locally reset.
999 ///
1000 /// The `reset_stream_duration` setting configures the max amount of time
1001 /// this state will be maintained in memory. Once the duration elapses, the
1002 /// stream state is purged from memory.
1003 ///
1004 /// Once the stream has been fully purged from memory, any additional frames
1005 /// received for that stream will result in a connection level protocol
1006 /// error, forcing the connection to terminate.
1007 ///
1008 /// The default value is currently 1 second.
1009 ///
1010 /// # Examples
1011 ///
1012 /// ```
1013 /// # use tokio::io::{AsyncRead, AsyncWrite};
1014 /// # use h2::server::*;
1015 /// # use std::time::Duration;
1016 /// #
1017 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1018 /// # -> Handshake<T>
1019 /// # {
1020 /// // `server_fut` is a future representing the completion of the HTTP/2
1021 /// // handshake.
1022 /// let server_fut = Builder::new()
1023 /// .reset_stream_duration(Duration::from_secs(10))
1024 /// .handshake(my_io);
1025 /// # server_fut
1026 /// # }
1027 /// #
1028 /// # pub fn main() {}
1029 /// ```
1030 pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
1031 self.reset_stream_duration = dur;
1032 self
1033 }
1034
1035 /// Enables the [extended CONNECT protocol].
1036 ///
1037 /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
1038 pub fn enable_connect_protocol(&mut self) -> &mut Self {
1039 self.settings.set_enable_connect_protocol(Some(1));
1040 self
1041 }
1042
1043 /// Creates a new configured HTTP/2 server backed by `io`.
1044 ///
1045 /// It is expected that `io` already be in an appropriate state to commence
1046 /// the [HTTP/2 handshake]. See [Handshake] for more details.
1047 ///
1048 /// Returns a future which resolves to the [`Connection`] instance once the
1049 /// HTTP/2 handshake has been completed.
1050 ///
1051 /// This function also allows the caller to configure the send payload data
1052 /// type. See [Outbound data type] for more details.
1053 ///
1054 /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1055 /// [Handshake]: ../index.html#handshake
1056 /// [`Connection`]: struct.Connection.html
1057 /// [Outbound data type]: ../index.html#outbound-data-type.
1058 ///
1059 /// # Examples
1060 ///
1061 /// Basic usage:
1062 ///
1063 /// ```
1064 /// # use tokio::io::{AsyncRead, AsyncWrite};
1065 /// # use h2::server::*;
1066 /// #
1067 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1068 /// # -> Handshake<T>
1069 /// # {
1070 /// // `server_fut` is a future representing the completion of the HTTP/2
1071 /// // handshake.
1072 /// let server_fut = Builder::new()
1073 /// .handshake(my_io);
1074 /// # server_fut
1075 /// # }
1076 /// #
1077 /// # pub fn main() {}
1078 /// ```
1079 ///
1080 /// Configures the send-payload data type. In this case, the outbound data
1081 /// type will be `&'static [u8]`.
1082 ///
1083 /// ```
1084 /// # use tokio::io::{AsyncRead, AsyncWrite};
1085 /// # use h2::server::*;
1086 /// #
1087 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1088 /// # -> Handshake<T, &'static [u8]>
1089 /// # {
1090 /// // `server_fut` is a future representing the completion of the HTTP/2
1091 /// // handshake.
1092 /// let server_fut: Handshake<_, &'static [u8]> = Builder::new()
1093 /// .handshake(my_io);
1094 /// # server_fut
1095 /// # }
1096 /// #
1097 /// # pub fn main() {}
1098 /// ```
1099 pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B>
1100 where
1101 T: AsyncRead + AsyncWrite + Unpin,
1102 B: Buf,
1103 {
1104 Connection::handshake2(io, self.clone())
1105 }
1106}
1107
1108impl Default for Builder {
1109 fn default() -> Builder {
1110 Builder::new()
1111 }
1112}
1113
1114// ===== impl SendResponse =====
1115
1116impl<B: Buf> SendResponse<B> {
1117 /// Send an interim informational response (1xx status codes)
1118 ///
1119 /// This method can be called multiple times before calling `send_response()`
1120 /// to send the final response. Only 1xx status codes are allowed.
1121 ///
1122 /// Interim informational responses are used to provide early feedback to the client
1123 /// before the final response is ready. Common examples include:
1124 /// - 100 Continue: Indicates the client should continue with the request
1125 /// - 103 Early Hints: Provides early hints about resources to preload
1126 ///
1127 /// # Arguments
1128 /// * `response` - HTTP response with 1xx status code and headers
1129 ///
1130 /// # Returns
1131 /// * `Ok(())` - Interim Informational response sent successfully
1132 /// * `Err(Error)` - Failed to send (invalid status code, connection error, etc.)
1133 ///
1134 /// # Examples
1135 /// ```rust
1136 /// use h2::server;
1137 /// use http::{Response, StatusCode};
1138 ///
1139 /// # async fn example(mut send_response: h2::server::SendResponse<bytes::Bytes>) -> Result<(), h2::Error> {
1140 /// // Send 100 Continue before processing request body
1141 /// let continue_response = Response::builder()
1142 /// .status(StatusCode::CONTINUE)
1143 /// .body(())
1144 /// .unwrap();
1145 /// send_response.send_informational(continue_response)?;
1146 ///
1147 /// // Later send the final response
1148 /// let final_response = Response::builder()
1149 /// .status(StatusCode::OK)
1150 /// .body(())
1151 /// .unwrap();
1152 /// let _stream = send_response.send_response(final_response, false)?;
1153 /// # Ok(())
1154 /// # }
1155 /// ```
1156 ///
1157 /// # Errors
1158 /// This method will return an error if:
1159 /// - The response status code is not in the 1xx range
1160 /// - The final response has already been sent
1161 /// - There is a connection-level error
1162 pub fn send_informational(&mut self, response: Response<()>) -> Result<(), crate::Error> {
1163 let stream_id = self.inner.stream_id();
1164 let status = response.status();
1165
1166 tracing::trace!(
1167 "send_informational called with status: {} on stream: {:?}",
1168 status,
1169 stream_id
1170 );
1171
1172 // Validate that this is an informational response (1xx status code)
1173 if !response.status().is_informational() {
1174 tracing::trace!(
1175 "invalid informational status code: {} on stream: {:?}",
1176 status,
1177 stream_id
1178 );
1179 return Err(crate::Error::from(
1180 UserError::InvalidInformationalStatusCode,
1181 ));
1182 }
1183
1184 tracing::trace!(
1185 "converting informational response to HEADERS frame without END_STREAM flag for stream: {:?}",
1186 stream_id
1187 );
1188
1189 let frame = Peer::convert_send_message(
1190 stream_id, response, false, // NOT end_of_stream for informational responses
1191 );
1192
1193 tracing::trace!(
1194 "sending interim informational headers frame for stream: {:?}",
1195 stream_id
1196 );
1197
1198 // Use the proper H2 streams API for sending interim informational headers
1199 // This bypasses the normal response flow and allows multiple informational responses
1200 let result = self
1201 .inner
1202 .send_informational_headers(frame)
1203 .map_err(Into::into);
1204
1205 match &result {
1206 Ok(()) => tracing::trace!(
1207 "Successfully sent informational headers for stream: {:?}",
1208 stream_id
1209 ),
1210 Err(e) => tracing::trace!(
1211 "Failed to send informational headers for stream: {:?}: {:?}",
1212 stream_id,
1213 e
1214 ),
1215 }
1216
1217 result
1218 }
1219
1220 /// Send a response to a client request.
1221 ///
1222 /// On success, a [`SendStream`] instance is returned. This instance can be
1223 /// used to stream the response body and send trailers.
1224 ///
1225 /// If a body or trailers will be sent on the returned [`SendStream`]
1226 /// instance, then `end_of_stream` must be set to `false` when calling this
1227 /// function.
1228 ///
1229 /// The [`SendResponse`] instance is already associated with a received
1230 /// request. This function may only be called once per instance and only if
1231 /// [`send_reset`] has not been previously called.
1232 ///
1233 /// [`SendResponse`]: #
1234 /// [`SendStream`]: ../struct.SendStream.html
1235 /// [`send_reset`]: #method.send_reset
1236 pub fn send_response(
1237 &mut self,
1238 response: Response<()>,
1239 end_of_stream: bool,
1240 ) -> Result<SendStream<B>, crate::Error> {
1241 self.inner
1242 .send_response(response, end_of_stream)
1243 .map(|_| SendStream::new(self.inner.clone()))
1244 .map_err(Into::into)
1245 }
1246
1247 /// Push a request and response to the client
1248 ///
1249 /// On success, a [`SendResponse`] instance is returned.
1250 ///
1251 /// [`SendResponse`]: #
1252 pub fn push_request(
1253 &mut self,
1254 request: Request<()>,
1255 ) -> Result<SendPushedResponse<B>, crate::Error> {
1256 self.inner
1257 .send_push_promise(request)
1258 .map(|inner| SendPushedResponse {
1259 inner: SendResponse { inner },
1260 })
1261 .map_err(Into::into)
1262 }
1263
1264 /// Send a stream reset to the peer.
1265 ///
1266 /// This essentially cancels the stream, including any inbound or outbound
1267 /// data streams.
1268 ///
1269 /// If this function is called before [`send_response`], a call to
1270 /// [`send_response`] will result in an error.
1271 ///
1272 /// If this function is called while a [`SendStream`] instance is active,
1273 /// any further use of the instance will result in an error.
1274 ///
1275 /// This function should only be called once.
1276 ///
1277 /// [`send_response`]: #method.send_response
1278 /// [`SendStream`]: ../struct.SendStream.html
1279 pub fn send_reset(&mut self, reason: Reason) {
1280 self.inner.send_reset(reason)
1281 }
1282
1283 /// Polls to be notified when the client resets this stream.
1284 ///
1285 /// If stream is still open, this returns `Poll::Pending`, and
1286 /// registers the task to be notified if a `RST_STREAM` is received.
1287 ///
1288 /// If a `RST_STREAM` frame is received for this stream, calling this
1289 /// method will yield the `Reason` for the reset.
1290 ///
1291 /// # Error
1292 ///
1293 /// Calling this method after having called `send_response` will return
1294 /// a user error.
1295 pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
1296 self.inner.poll_reset(cx, proto::PollReset::AwaitingHeaders)
1297 }
1298
1299 /// Returns the stream ID of the response stream.
1300 ///
1301 /// # Panics
1302 ///
1303 /// If the lock on the stream store has been poisoned.
1304 pub fn stream_id(&self) -> crate::StreamId {
1305 crate::StreamId::from_internal(self.inner.stream_id())
1306 }
1307}
1308
1309// ===== impl SendPushedResponse =====
1310
1311impl<B: Buf> SendPushedResponse<B> {
1312 /// Send a response to a promised request.
1313 ///
1314 /// On success, a [`SendStream`] instance is returned. This instance can be
1315 /// used to stream the response body and send trailers.
1316 ///
1317 /// If a body or trailers will be sent on the returned [`SendStream`]
1318 /// instance, then `end_of_stream` must be set to `false` when calling this
1319 /// function.
1320 ///
1321 /// The [`SendPushedResponse`] instance is associated with a promised
1322 /// request. This function may only be called once per instance and only if
1323 /// [`send_reset`] has not been previously called.
1324 ///
1325 /// [`SendPushedResponse`]: #
1326 /// [`SendStream`]: ../struct.SendStream.html
1327 /// [`send_reset`]: #method.send_reset
1328 pub fn send_response(
1329 &mut self,
1330 response: Response<()>,
1331 end_of_stream: bool,
1332 ) -> Result<SendStream<B>, crate::Error> {
1333 self.inner.send_response(response, end_of_stream)
1334 }
1335
1336 /// Send a stream reset to the peer.
1337 ///
1338 /// This essentially cancels the stream, including any inbound or outbound
1339 /// data streams.
1340 ///
1341 /// If this function is called before [`send_response`], a call to
1342 /// [`send_response`] will result in an error.
1343 ///
1344 /// If this function is called while a [`SendStream`] instance is active,
1345 /// any further use of the instance will result in an error.
1346 ///
1347 /// This function should only be called once.
1348 ///
1349 /// [`send_response`]: #method.send_response
1350 /// [`SendStream`]: ../struct.SendStream.html
1351 pub fn send_reset(&mut self, reason: Reason) {
1352 self.inner.send_reset(reason)
1353 }
1354
1355 /// Polls to be notified when the client resets this stream.
1356 ///
1357 /// If stream is still open, this returns `Poll::Pending`, and
1358 /// registers the task to be notified if a `RST_STREAM` is received.
1359 ///
1360 /// If a `RST_STREAM` frame is received for this stream, calling this
1361 /// method will yield the `Reason` for the reset.
1362 ///
1363 /// # Error
1364 ///
1365 /// Calling this method after having called `send_response` will return
1366 /// a user error.
1367 pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
1368 self.inner.poll_reset(cx)
1369 }
1370
1371 /// Returns the stream ID of the response stream.
1372 ///
1373 /// # Panics
1374 ///
1375 /// If the lock on the stream store has been poisoned.
1376 pub fn stream_id(&self) -> crate::StreamId {
1377 self.inner.stream_id()
1378 }
1379}
1380
1381// ===== impl Flush =====
1382
1383impl<T, B: Buf> Flush<T, B> {
1384 fn new(codec: Codec<T, B>) -> Self {
1385 Flush { codec: Some(codec) }
1386 }
1387}
1388
1389impl<T, B> Future for Flush<T, B>
1390where
1391 T: AsyncWrite + Unpin,
1392 B: Buf,
1393{
1394 type Output = Result<Codec<T, B>, crate::Error>;
1395
1396 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1397 // Flush the codec
1398 ready!(self.codec.as_mut().unwrap().flush(cx)).map_err(crate::Error::from_io)?;
1399
1400 // Return the codec
1401 Poll::Ready(Ok(self.codec.take().unwrap()))
1402 }
1403}
1404
1405impl<T, B: Buf> ReadPreface<T, B> {
1406 fn new(codec: Codec<T, B>) -> Self {
1407 ReadPreface {
1408 codec: Some(codec),
1409 pos: 0,
1410 }
1411 }
1412
1413 fn inner_mut(&mut self) -> &mut T {
1414 self.codec.as_mut().unwrap().get_mut()
1415 }
1416}
1417
1418impl<T, B> Future for ReadPreface<T, B>
1419where
1420 T: AsyncRead + Unpin,
1421 B: Buf,
1422{
1423 type Output = Result<Codec<T, B>, crate::Error>;
1424
1425 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1426 let mut buf = [0; 24];
1427 let mut rem = PREFACE.len() - self.pos;
1428
1429 while rem > 0 {
1430 let mut buf = ReadBuf::new(&mut buf[..rem]);
1431 ready!(Pin::new(self.inner_mut()).poll_read(cx, &mut buf))
1432 .map_err(crate::Error::from_io)?;
1433 let n = buf.filled().len();
1434 if n == 0 {
1435 return Poll::Ready(Err(crate::Error::from_io(io::Error::new(
1436 io::ErrorKind::UnexpectedEof,
1437 "connection closed before reading preface",
1438 ))));
1439 }
1440
1441 if &PREFACE[self.pos..self.pos + n] != buf.filled() {
1442 proto_err!(conn: "read_preface: invalid preface");
1443 // TODO: Should this just write the GO_AWAY frame directly?
1444 return Poll::Ready(Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into()));
1445 }
1446
1447 self.pos += n;
1448 rem -= n; // TODO test
1449 }
1450
1451 Poll::Ready(Ok(self.codec.take().unwrap()))
1452 }
1453}
1454
1455// ===== impl Handshake =====
1456
1457impl<T, B: Buf> Future for Handshake<T, B>
1458where
1459 T: AsyncRead + AsyncWrite + Unpin,
1460 B: Buf,
1461{
1462 type Output = Result<Connection<T, B>, crate::Error>;
1463
1464 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1465 let span = self.span.clone(); // XXX(eliza): T_T
1466 let _e = span.enter();
1467 tracing::trace!(state = ?self.state);
1468
1469 loop {
1470 match &mut self.state {
1471 Handshaking::Flushing(flush) => {
1472 // We're currently flushing a pending SETTINGS frame. Poll the
1473 // flush future, and, if it's completed, advance our state to wait
1474 // for the client preface.
1475 let codec = match Pin::new(flush).poll(cx)? {
1476 Poll::Pending => {
1477 tracing::trace!(flush.poll = %"Pending");
1478 return Poll::Pending;
1479 }
1480 Poll::Ready(flushed) => {
1481 tracing::trace!(flush.poll = %"Ready");
1482 flushed
1483 }
1484 };
1485 self.state = Handshaking::ReadingPreface(
1486 ReadPreface::new(codec).instrument(tracing::trace_span!("read_preface")),
1487 );
1488 }
1489 Handshaking::ReadingPreface(read) => {
1490 let codec = ready!(Pin::new(read).poll(cx)?);
1491
1492 self.state = Handshaking::Done;
1493
1494 let connection = proto::Connection::new(
1495 codec,
1496 Config {
1497 next_stream_id: 2.into(),
1498 // Server does not need to locally initiate any streams
1499 initial_max_send_streams: 0,
1500 max_send_buffer_size: self.builder.max_send_buffer_size,
1501 reset_stream_duration: self.builder.reset_stream_duration,
1502 reset_stream_max: self.builder.reset_stream_max,
1503 remote_reset_stream_max: self.builder.pending_accept_reset_stream_max,
1504 local_error_reset_streams_max: self
1505 .builder
1506 .local_max_error_reset_streams,
1507 settings: self.builder.settings.clone(),
1508 },
1509 );
1510
1511 tracing::trace!("connection established!");
1512 let mut c = Connection { connection };
1513 if let Some(sz) = self.builder.initial_target_connection_window_size {
1514 c.set_target_window_size(sz);
1515 }
1516
1517 return Poll::Ready(Ok(c));
1518 }
1519 Handshaking::Done => {
1520 panic!("Handshaking::poll() called again after handshaking was complete")
1521 }
1522 }
1523 }
1524 }
1525}
1526
1527impl<T, B> fmt::Debug for Handshake<T, B>
1528where
1529 T: AsyncRead + AsyncWrite + fmt::Debug,
1530 B: fmt::Debug + Buf,
1531{
1532 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1533 write!(fmt, "server::Handshake")
1534 }
1535}
1536
1537impl Peer {
1538 pub fn convert_send_message(
1539 id: StreamId,
1540 response: Response<()>,
1541 end_of_stream: bool,
1542 ) -> frame::Headers {
1543 use http::response::Parts;
1544
1545 // Extract the components of the HTTP request
1546 let (
1547 Parts {
1548 status, headers, ..
1549 },
1550 _,
1551 ) = response.into_parts();
1552
1553 // Build the set pseudo header set. All requests will include `method`
1554 // and `path`.
1555 let pseudo = Pseudo::response(status);
1556
1557 // Create the HEADERS frame
1558 let mut frame = frame::Headers::new(id, pseudo, headers);
1559
1560 if end_of_stream {
1561 frame.set_end_stream()
1562 }
1563
1564 frame
1565 }
1566
1567 pub fn convert_push_message(
1568 stream_id: StreamId,
1569 promised_id: StreamId,
1570 request: Request<()>,
1571 ) -> Result<frame::PushPromise, UserError> {
1572 use http::request::Parts;
1573
1574 if let Err(e) = frame::PushPromise::validate_request(&request) {
1575 use PushPromiseHeaderError::*;
1576 match e {
1577 NotSafeAndCacheable => tracing::debug!(
1578 ?promised_id,
1579 "convert_push_message: method {} is not safe and cacheable",
1580 request.method(),
1581 ),
1582 InvalidContentLength(e) => tracing::debug!(
1583 ?promised_id,
1584 "convert_push_message; promised request has invalid content-length {:?}",
1585 e,
1586 ),
1587 }
1588 return Err(UserError::MalformedHeaders);
1589 }
1590
1591 // Extract the components of the HTTP request
1592 let (
1593 Parts {
1594 method,
1595 uri,
1596 headers,
1597 ..
1598 },
1599 _,
1600 ) = request.into_parts();
1601
1602 let pseudo = Pseudo::request(method, uri, None);
1603
1604 Ok(frame::PushPromise::new(
1605 stream_id,
1606 promised_id,
1607 pseudo,
1608 headers,
1609 ))
1610 }
1611}
1612
1613impl proto::Peer for Peer {
1614 type Poll = Request<()>;
1615
1616 const NAME: &'static str = "Server";
1617
1618 /*
1619 fn is_server() -> bool {
1620 true
1621 }
1622 */
1623
1624 fn r#dyn() -> proto::DynPeer {
1625 proto::DynPeer::Server
1626 }
1627
1628 fn convert_poll_message(
1629 pseudo: Pseudo,
1630 fields: HeaderMap,
1631 stream_id: StreamId,
1632 ) -> Result<Self::Poll, Error> {
1633 use http::{uri, Version};
1634
1635 let mut b = Request::builder();
1636
1637 macro_rules! malformed {
1638 ($($arg:tt)*) => {{
1639 tracing::debug!($($arg)*);
1640 return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1641 }}
1642 }
1643
1644 b = b.version(Version::HTTP_2);
1645
1646 let is_connect;
1647 if let Some(method) = pseudo.method {
1648 is_connect = method == Method::CONNECT;
1649 b = b.method(method);
1650 } else {
1651 malformed!("malformed headers: missing method");
1652 }
1653
1654 let has_protocol = pseudo.protocol.is_some();
1655 if has_protocol {
1656 if is_connect {
1657 // Assert that we have the right type.
1658 b = b.extension::<crate::ext::Protocol>(pseudo.protocol.unwrap());
1659 } else {
1660 malformed!("malformed headers: :protocol on non-CONNECT request");
1661 }
1662 }
1663
1664 if pseudo.status.is_some() {
1665 malformed!("malformed headers: :status field on request");
1666 }
1667
1668 // Convert the URI
1669 let mut parts = uri::Parts::default();
1670
1671 // A request translated from HTTP/1 must not include the :authority
1672 // header
1673 if let Some(authority) = pseudo.authority {
1674 let maybe_authority = uri::Authority::from_maybe_shared(authority.clone().into_inner());
1675 parts.authority = Some(maybe_authority.or_else(|why| {
1676 malformed!(
1677 "malformed headers: malformed authority ({:?}): {}",
1678 authority,
1679 why,
1680 )
1681 })?);
1682 }
1683
1684 // A :scheme is required, except CONNECT.
1685 if let Some(scheme) = pseudo.scheme {
1686 if is_connect && !has_protocol {
1687 malformed!("malformed headers: :scheme in CONNECT");
1688 }
1689 let maybe_scheme = scheme.parse();
1690 let scheme = maybe_scheme.or_else(|why| {
1691 malformed!(
1692 "malformed headers: malformed scheme ({:?}): {}",
1693 scheme,
1694 why,
1695 )
1696 })?;
1697
1698 // It's not possible to build an `Uri` from a scheme and path. So,
1699 // after validating is was a valid scheme, we just have to drop it
1700 // if there isn't an :authority.
1701 if parts.authority.is_some() {
1702 parts.scheme = Some(scheme);
1703 }
1704 } else if !is_connect || has_protocol {
1705 malformed!("malformed headers: missing scheme");
1706 }
1707
1708 if let Some(path) = pseudo.path {
1709 if is_connect && !has_protocol {
1710 malformed!("malformed headers: :path in CONNECT");
1711 }
1712
1713 // This cannot be empty
1714 if path.is_empty() {
1715 malformed!("malformed headers: missing path");
1716 }
1717
1718 let maybe_path = uri::PathAndQuery::from_maybe_shared(path.clone().into_inner());
1719 parts.path_and_query = Some(maybe_path.or_else(|why| {
1720 malformed!("malformed headers: malformed path ({:?}): {}", path, why,)
1721 })?);
1722 } else if is_connect && has_protocol {
1723 malformed!("malformed headers: missing path in extended CONNECT");
1724 }
1725
1726 b = b.uri(parts);
1727
1728 let mut request = match b.body(()) {
1729 Ok(request) => request,
1730 Err(e) => {
1731 // TODO: Should there be more specialized handling for different
1732 // kinds of errors
1733 proto_err!(stream: "error building request: {}; stream={:?}", e, stream_id);
1734 return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1735 }
1736 };
1737
1738 *request.headers_mut() = fields;
1739
1740 Ok(request)
1741 }
1742}
1743
1744// ===== impl Handshaking =====
1745
1746impl<T, B> fmt::Debug for Handshaking<T, B>
1747where
1748 B: Buf,
1749{
1750 #[inline]
1751 fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
1752 match *self {
1753 Handshaking::Flushing(_) => f.write_str("Flushing(_)"),
1754 Handshaking::ReadingPreface(_) => f.write_str("ReadingPreface(_)"),
1755 Handshaking::Done => f.write_str("Done"),
1756 }
1757 }
1758}