Skip to main content

hyper/client/conn/
http1.rs

1//! HTTP/1 client connections.
2
3use std::error::Error as StdError;
4use std::fmt;
5use std::future::Future;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9use crate::rt::{Read, Write};
10use bytes::Bytes;
11use futures_core::ready;
12use http::{Request, Response};
13use httparse::ParserConfig;
14
15use super::super::dispatch::{self, TrySendError};
16use crate::body::{Body, Incoming as IncomingBody};
17use crate::proto;
18
19type Dispatcher<T, B> =
20    proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, proto::h1::ClientTransaction>;
21
22/// The sender side of an established connection.
23pub struct SendRequest<B> {
24    dispatch: dispatch::Sender<Request<B>, Response<IncomingBody>>,
25}
26
27/// Deconstructed parts of a `Connection`.
28///
29/// This allows taking apart a `Connection` at a later time, in order to
30/// reclaim the IO object, and additional related pieces.
31#[derive(Debug)]
32#[non_exhaustive]
33pub struct Parts<T> {
34    /// The original IO object used in the handshake.
35    pub io: T,
36    /// A buffer of bytes that have been read but not processed as HTTP.
37    ///
38    /// For instance, if the `Connection` is used for an HTTP upgrade request,
39    /// it is possible the server sent back the first bytes of the new protocol
40    /// along with the response upgrade.
41    ///
42    /// You will want to check for any existing bytes if you plan to continue
43    /// communicating on the IO object.
44    pub read_buf: Bytes,
45}
46
47/// A future that processes all HTTP state for the IO object.
48///
49/// In most cases, this should just be spawned into an executor, so that it
50/// can process incoming and outgoing messages, notice hangups, and the like.
51///
52/// Instances of this type are typically created via the [`handshake`] function.
53///
54/// # Drop behavior
55///
56/// Dropping the `Connection` will close the underlying IO resource.
57/// Any in-flight requests that have not received a response will be
58/// interrupted. If graceful shutdown is desired, poll the connection
59/// until it completes instead of dropping.
60#[must_use = "futures do nothing unless polled"]
61pub struct Connection<T, B>
62where
63    T: Read + Write,
64    B: Body + 'static,
65{
66    inner: Dispatcher<T, B>,
67}
68
69impl<T, B> Connection<T, B>
70where
71    T: Read + Write + Unpin,
72    B: Body + 'static,
73    B::Error: Into<Box<dyn StdError + Send + Sync>>,
74{
75    /// Return the inner IO object, and additional information.
76    ///
77    /// Only works for HTTP/1 connections. HTTP/2 connections will panic.
78    pub fn into_parts(self) -> Parts<T> {
79        let (io, read_buf, _) = self.inner.into_inner();
80        Parts { io, read_buf }
81    }
82
83    /// Poll the connection for completion, but without calling `shutdown`
84    /// on the underlying IO.
85    ///
86    /// This is useful to allow running a connection while doing an HTTP
87    /// upgrade. Once the upgrade is completed, the connection would be "done",
88    /// but it is not desired to actually shutdown the IO object. Instead you
89    /// would take it back using `into_parts`.
90    ///
91    /// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html)
92    /// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html)
93    /// to work with this function; or use the `without_shutdown` wrapper.
94    pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
95        self.inner.poll_without_shutdown(cx)
96    }
97
98    /// Prevent shutdown of the underlying IO object at the end of service the request,
99    /// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
100    pub async fn without_shutdown(self) -> crate::Result<Parts<T>> {
101        let mut conn = Some(self);
102        crate::common::future::poll_fn(move |cx| -> Poll<crate::Result<Parts<T>>> {
103            ready!(conn
104                .as_mut()
105                .expect("client connection polled after completion")
106                .poll_without_shutdown(cx))?;
107            Poll::Ready(Ok(conn
108                .take()
109                .expect("client connection missing before completion")
110                .into_parts()))
111        })
112        .await
113    }
114}
115
116/// A builder to configure an HTTP connection.
117///
118/// After setting options, the builder is used to create a handshake future.
119///
120/// **Note**: The default values of options are *not considered stable*. They
121/// are subject to change at any time.
122#[derive(Clone, Debug)]
123pub struct Builder {
124    h09_responses: bool,
125    h1_parser_config: ParserConfig,
126    h1_writev: Option<bool>,
127    h1_title_case_headers: bool,
128    h1_preserve_header_case: bool,
129    h1_max_headers: Option<usize>,
130    #[cfg(feature = "ffi")]
131    h1_preserve_header_order: bool,
132    h1_read_buf_exact_size: Option<usize>,
133    h1_max_buf_size: Option<usize>,
134}
135
136/// Returns a handshake future over some IO.
137///
138/// This is a shortcut for `Builder::new().handshake(io)`.
139/// See [`client::conn`](crate::client::conn) for more.
140pub async fn handshake<T, B>(io: T) -> crate::Result<(SendRequest<B>, Connection<T, B>)>
141where
142    T: Read + Write + Unpin,
143    B: Body + 'static,
144    B::Data: Send,
145    B::Error: Into<Box<dyn StdError + Send + Sync>>,
146{
147    Builder::new().handshake(io).await
148}
149
150// ===== impl SendRequest
151
152impl<B> SendRequest<B> {
153    /// Polls to determine whether this sender can be used yet for a request.
154    ///
155    /// If the associated connection is closed, this returns an Error.
156    pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
157        self.dispatch.poll_ready(cx)
158    }
159
160    /// Waits until the dispatcher is ready.
161    ///
162    /// If the associated connection is closed, this returns an Error.
163    pub async fn ready(&mut self) -> crate::Result<()> {
164        crate::common::future::poll_fn(|cx| self.poll_ready(cx)).await
165    }
166
167    /// Checks if the connection is currently ready to send a request.
168    ///
169    /// # Note
170    ///
171    /// This is mostly a hint. Due to inherent latency of networks, it is
172    /// possible that even after checking this is ready, sending a request
173    /// may still fail because the connection was closed in the meantime.
174    pub fn is_ready(&self) -> bool {
175        self.dispatch.is_ready()
176    }
177
178    /// Checks if the connection side has been closed.
179    pub fn is_closed(&self) -> bool {
180        self.dispatch.is_closed()
181    }
182}
183
184impl<B> SendRequest<B>
185where
186    B: Body + 'static,
187{
188    /// Sends a `Request` on the associated connection.
189    ///
190    /// Returns a future that if successful, yields the `Response`.
191    ///
192    /// `req` must have a `Host` header.
193    ///
194    /// # Uri
195    ///
196    /// The `Uri` of the request is serialized as-is.
197    ///
198    /// - Usually you want origin-form (`/path?query`).
199    /// - For sending to an HTTP proxy, you want to send in absolute-form
200    ///   (`https://hyper.rs/guides`).
201    ///
202    /// This is however not enforced or validated and it is up to the user
203    /// of this method to ensure the `Uri` is correct for their intended purpose.
204    ///
205    /// # Cancel safety
206    ///
207    /// Dropping the returned future is the supported way to cancel an
208    /// in-flight HTTP/1 request. Because HTTP/1 has no in-protocol way to
209    /// abort a single request without affecting the shared connection,
210    /// hyper closes the underlying connection when a request future is
211    /// dropped before completion. Any subsequent calls on the same
212    /// [`SendRequest`] will return a `canceled` error.
213    pub fn send_request(
214        &mut self,
215        req: Request<B>,
216    ) -> impl Future<Output = crate::Result<Response<IncomingBody>>> {
217        let sent = self.dispatch.send(req);
218
219        async move {
220            match sent {
221                Ok(rx) => match rx.await {
222                    Ok(Ok(resp)) => Ok(resp),
223                    Ok(Err(err)) => Err(err),
224                    // this is definite bug if it happens, but it shouldn't happen!
225                    Err(_canceled) => panic!("dispatch dropped without returning error"),
226                },
227                Err(_req) => {
228                    debug!("connection was not ready");
229                    Err(crate::Error::new_canceled().with("connection was not ready"))
230                }
231            }
232        }
233    }
234
235    /// Sends a `Request` on the associated connection.
236    ///
237    /// Returns a future that if successful, yields the `Response`.
238    ///
239    /// # Error
240    ///
241    /// If there was an error before trying to serialize the request to the
242    /// connection, the message will be returned as part of this error.
243    pub fn try_send_request(
244        &mut self,
245        req: Request<B>,
246    ) -> impl Future<Output = Result<Response<IncomingBody>, TrySendError<Request<B>>>> {
247        let sent = self.dispatch.try_send(req);
248        async move {
249            match sent {
250                Ok(rx) => match rx.await {
251                    Ok(Ok(res)) => Ok(res),
252                    Ok(Err(err)) => Err(err),
253                    // this is definite bug if it happens, but it shouldn't happen!
254                    Err(_) => panic!("dispatch dropped without returning error"),
255                },
256                Err(req) => {
257                    debug!("connection was not ready");
258                    let error = crate::Error::new_canceled().with("connection was not ready");
259                    Err(TrySendError {
260                        error,
261                        message: Some(req),
262                    })
263                }
264            }
265        }
266    }
267}
268
269impl<B> fmt::Debug for SendRequest<B> {
270    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
271        f.debug_struct("SendRequest").finish()
272    }
273}
274
275// ===== impl Connection
276
277impl<T, B> Connection<T, B>
278where
279    T: Read + Write + Unpin + Send,
280    B: Body + 'static,
281    B::Error: Into<Box<dyn StdError + Send + Sync>>,
282{
283    /// Enable this connection to support higher-level HTTP upgrades.
284    ///
285    /// See [the `upgrade` module](crate::upgrade) for more.
286    pub fn with_upgrades(self) -> upgrades::UpgradeableConnection<T, B> {
287        upgrades::UpgradeableConnection { inner: Some(self) }
288    }
289}
290
291impl<T, B> fmt::Debug for Connection<T, B>
292where
293    T: Read + Write + fmt::Debug,
294    B: Body + 'static,
295{
296    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
297        f.debug_struct("Connection").finish()
298    }
299}
300
301impl<T, B> Future for Connection<T, B>
302where
303    T: Read + Write + Unpin,
304    B: Body + 'static,
305    B::Data: Send,
306    B::Error: Into<Box<dyn StdError + Send + Sync>>,
307{
308    type Output = crate::Result<()>;
309
310    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
311        match ready!(Pin::new(&mut self.inner).poll(cx))? {
312            proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
313            proto::Dispatched::Upgrade(pending) => {
314                // With no `Send` bound on `I`, we can't try to do
315                // upgrades here. In case a user was trying to use
316                // `upgrade` with this API, send a special
317                // error letting them know about that.
318                pending.manual();
319                Poll::Ready(Ok(()))
320            }
321        }
322    }
323}
324
325// ===== impl Builder
326
327impl Builder {
328    /// Creates a new connection builder.
329    #[inline]
330    pub fn new() -> Builder {
331        Builder {
332            h09_responses: false,
333            h1_writev: None,
334            h1_read_buf_exact_size: None,
335            h1_parser_config: Default::default(),
336            h1_title_case_headers: false,
337            h1_preserve_header_case: false,
338            h1_max_headers: None,
339            #[cfg(feature = "ffi")]
340            h1_preserve_header_order: false,
341            h1_max_buf_size: None,
342        }
343    }
344
345    /// Set whether HTTP/0.9 responses should be tolerated.
346    ///
347    /// Default is false.
348    pub fn http09_responses(&mut self, enabled: bool) -> &mut Builder {
349        self.h09_responses = enabled;
350        self
351    }
352
353    /// Set whether HTTP/1 connections will accept spaces between header names
354    /// and the colon that follow them in responses.
355    ///
356    /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
357    /// to say about it:
358    ///
359    /// > No whitespace is allowed between the header field-name and colon. In
360    /// > the past, differences in the handling of such whitespace have led to
361    /// > security vulnerabilities in request routing and response handling. A
362    /// > server MUST reject any received request message that contains
363    /// > whitespace between a header field-name and colon with a response code
364    /// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a
365    /// > response message before forwarding the message downstream.
366    ///
367    /// Default is false.
368    ///
369    /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
370    pub fn allow_spaces_after_header_name_in_responses(&mut self, enabled: bool) -> &mut Builder {
371        self.h1_parser_config
372            .allow_spaces_after_header_name_in_responses(enabled);
373        self
374    }
375
376    /// Set whether HTTP/1 connections will accept obsolete line folding for
377    /// header values.
378    ///
379    /// Newline codepoints (`\r` and `\n`) will be transformed to spaces when
380    /// parsing.
381    ///
382    /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
383    /// to say about it:
384    ///
385    /// > A server that receives an obs-fold in a request message that is not
386    /// > within a message/http container MUST either reject the message by
387    /// > sending a 400 (Bad Request), preferably with a representation
388    /// > explaining that obsolete line folding is unacceptable, or replace
389    /// > each received obs-fold with one or more SP octets prior to
390    /// > interpreting the field value or forwarding the message downstream.
391    ///
392    /// > A proxy or gateway that receives an obs-fold in a response message
393    /// > that is not within a message/http container MUST either discard the
394    /// > message and replace it with a 502 (Bad Gateway) response, preferably
395    /// > with a representation explaining that unacceptable line folding was
396    /// > received, or replace each received obs-fold with one or more SP
397    /// > octets prior to interpreting the field value or forwarding the
398    /// > message downstream.
399    ///
400    /// > A user agent that receives an obs-fold in a response message that is
401    /// > not within a message/http container MUST replace each received
402    /// > obs-fold with one or more SP octets prior to interpreting the field
403    /// > value.
404    ///
405    /// Default is false.
406    ///
407    /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
408    pub fn allow_obsolete_multiline_headers_in_responses(&mut self, enabled: bool) -> &mut Builder {
409        self.h1_parser_config
410            .allow_obsolete_multiline_headers_in_responses(enabled);
411        self
412    }
413
414    /// Set whether HTTP/1 connections will silently ignored malformed header lines.
415    ///
416    /// If this is enabled and a header line does not start with a valid header
417    /// name, or does not include a colon at all, the line will be silently ignored
418    /// and no error will be reported.
419    ///
420    /// Default is false.
421    pub fn ignore_invalid_headers_in_responses(&mut self, enabled: bool) -> &mut Builder {
422        self.h1_parser_config
423            .ignore_invalid_headers_in_responses(enabled);
424        self
425    }
426
427    /// Set whether HTTP/1 connections should try to use vectored writes,
428    /// or always flatten into a single buffer.
429    ///
430    /// Note that setting this to false may mean more copies of body data,
431    /// but may also improve performance when an IO transport doesn't
432    /// support vectored writes well, such as most TLS implementations.
433    ///
434    /// Setting this to true will force hyper to use queued strategy,
435    /// which may eliminate unnecessary cloning on some TLS backends.
436    ///
437    /// Default is `auto`. In this mode hyper will try to guess which
438    /// mode to use.
439    pub fn writev(&mut self, enabled: bool) -> &mut Builder {
440        self.h1_writev = Some(enabled);
441        self
442    }
443
444    /// Set whether HTTP/1 connections will write header names as title case at
445    /// the socket level.
446    ///
447    /// Default is false.
448    pub fn title_case_headers(&mut self, enabled: bool) -> &mut Builder {
449        self.h1_title_case_headers = enabled;
450        self
451    }
452
453    /// Set whether to support preserving original header cases.
454    ///
455    /// Currently, this will record the original cases received, and store them
456    /// in a private extension on the `Response`. It will also look for and use
457    /// such an extension in any provided `Request`.
458    ///
459    /// Since the relevant extension is still private, there is no way to
460    /// interact with the original cases. The only effect this can have now is
461    /// to forward the cases in a proxy-like fashion.
462    ///
463    /// Default is false.
464    pub fn preserve_header_case(&mut self, enabled: bool) -> &mut Builder {
465        self.h1_preserve_header_case = enabled;
466        self
467    }
468
469    /// Set the maximum number of headers.
470    ///
471    /// When a response is received, the parser will reserve a buffer to store headers for optimal
472    /// performance.
473    ///
474    /// If client receives more headers than the buffer size, the error "message header too large"
475    /// is returned.
476    ///
477    /// Note that headers is allocated on the stack by default, which has higher performance. After
478    /// setting this value, headers will be allocated in heap memory, that is, heap memory
479    /// allocation will occur for each response, and there will be a performance drop of about 5%.
480    ///
481    /// Default is 100.
482    pub fn max_headers(&mut self, val: usize) -> &mut Self {
483        self.h1_max_headers = Some(val);
484        self
485    }
486
487    /// Set whether to support preserving original header order.
488    ///
489    /// Currently, this will record the order in which headers are received, and store this
490    /// ordering in a private extension on the `Response`. It will also look for and use
491    /// such an extension in any provided `Request`.
492    ///
493    /// Default is false.
494    #[cfg(feature = "ffi")]
495    pub fn preserve_header_order(&mut self, enabled: bool) -> &mut Builder {
496        self.h1_preserve_header_order = enabled;
497        self
498    }
499
500    /// Sets the exact size of the read buffer to *always* use.
501    ///
502    /// Note that setting this option unsets the `max_buf_size` option.
503    ///
504    /// Default is an adaptive read buffer.
505    pub fn read_buf_exact_size(&mut self, sz: Option<usize>) -> &mut Builder {
506        self.h1_read_buf_exact_size = sz;
507        self.h1_max_buf_size = None;
508        self
509    }
510
511    /// Set the maximum buffer size for the connection.
512    ///
513    /// Default is ~400kb.
514    ///
515    /// Note that setting this option unsets the `read_exact_buf_size` option.
516    ///
517    /// # Panics
518    ///
519    /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
520    pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
521        assert!(
522            max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE,
523            "the max_buf_size cannot be smaller than the minimum that h1 specifies."
524        );
525
526        self.h1_max_buf_size = Some(max);
527        self.h1_read_buf_exact_size = None;
528        self
529    }
530
531    /// Constructs a connection with the configured options and IO.
532    /// See [`client::conn`](crate::client::conn) for more.
533    ///
534    /// Note, if [`Connection`] is not `await`-ed, [`SendRequest`] will
535    /// do nothing.
536    pub fn handshake<T, B>(
537        &self,
538        io: T,
539    ) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
540    where
541        T: Read + Write + Unpin,
542        B: Body + 'static,
543        B::Data: Send,
544        B::Error: Into<Box<dyn StdError + Send + Sync>>,
545    {
546        let opts = self.clone();
547
548        async move {
549            trace!("client handshake HTTP/1");
550
551            let (tx, rx) = dispatch::channel();
552            let mut conn = proto::Conn::new(io);
553            conn.set_h1_parser_config(opts.h1_parser_config);
554            if let Some(writev) = opts.h1_writev {
555                if writev {
556                    conn.set_write_strategy_queue();
557                } else {
558                    conn.set_write_strategy_flatten();
559                }
560            }
561            if opts.h1_title_case_headers {
562                conn.set_title_case_headers();
563            }
564            if opts.h1_preserve_header_case {
565                conn.set_preserve_header_case();
566            }
567            if let Some(max_headers) = opts.h1_max_headers {
568                conn.set_http1_max_headers(max_headers);
569            }
570            #[cfg(feature = "ffi")]
571            if opts.h1_preserve_header_order {
572                conn.set_preserve_header_order();
573            }
574
575            if opts.h09_responses {
576                conn.set_h09_responses();
577            }
578
579            if let Some(sz) = opts.h1_read_buf_exact_size {
580                conn.set_read_buf_exact_size(sz);
581            }
582            if let Some(max) = opts.h1_max_buf_size {
583                conn.set_max_buf_size(max);
584            }
585            let cd = proto::h1::dispatch::Client::new(rx);
586            let proto = proto::h1::Dispatcher::new(cd, conn);
587
588            Ok((SendRequest { dispatch: tx }, Connection { inner: proto }))
589        }
590    }
591}
592
593mod upgrades {
594    use super::{Connection, Context, Future, Parts, Pin, Poll, Read, StdError, Write};
595    use crate::body::Body;
596    use crate::proto::Dispatched;
597    use crate::upgrade::Upgraded;
598    use futures_core::ready;
599    // A future binding a connection with a Service with Upgrade support.
600    //
601    // This type is unnameable outside the crate.
602    #[must_use = "futures do nothing unless polled"]
603    #[allow(missing_debug_implementations)]
604    pub struct UpgradeableConnection<T, B>
605    where
606        T: Read + Write + Unpin + Send + 'static,
607        B: Body + 'static,
608        B::Error: Into<Box<dyn StdError + Send + Sync>>,
609    {
610        pub(super) inner: Option<Connection<T, B>>,
611    }
612
613    impl<I, B> Future for UpgradeableConnection<I, B>
614    where
615        I: Read + Write + Unpin + Send + 'static,
616        B: Body + 'static,
617        B::Data: Send,
618        B::Error: Into<Box<dyn StdError + Send + Sync>>,
619    {
620        type Output = crate::Result<()>;
621
622        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
623            match ready!(Pin::new(
624                &mut self
625                    .inner
626                    .as_mut()
627                    .expect("upgradeable client connection polled after upgrade")
628                    .inner,
629            )
630            .poll(cx))
631            {
632                Ok(Dispatched::Shutdown) => Poll::Ready(Ok(())),
633                Ok(Dispatched::Upgrade(pending)) => {
634                    let Parts { io, read_buf } = self
635                        .inner
636                        .take()
637                        .expect("upgradeable client connection missing after upgrade")
638                        .into_parts();
639                    pending.fulfill(Upgraded::new(io, read_buf));
640                    Poll::Ready(Ok(()))
641                }
642                Err(e) => Poll::Ready(Err(e)),
643            }
644        }
645    }
646}