hyper/client/conn/http1.rs
1//! HTTP/1 client connections.
2
3use std::error::Error as StdError;
4use std::fmt;
5use std::future::Future;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9use crate::rt::{Read, Write};
10use bytes::Bytes;
11use futures_core::ready;
12use http::{Request, Response};
13use httparse::ParserConfig;
14
15use super::super::dispatch::{self, TrySendError};
16use crate::body::{Body, Incoming as IncomingBody};
17use crate::proto;
18
19type Dispatcher<T, B> =
20 proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, proto::h1::ClientTransaction>;
21
22/// The sender side of an established connection.
23pub struct SendRequest<B> {
24 dispatch: dispatch::Sender<Request<B>, Response<IncomingBody>>,
25}
26
27/// Deconstructed parts of a `Connection`.
28///
29/// This allows taking apart a `Connection` at a later time, in order to
30/// reclaim the IO object, and additional related pieces.
31#[derive(Debug)]
32#[non_exhaustive]
33pub struct Parts<T> {
34 /// The original IO object used in the handshake.
35 pub io: T,
36 /// A buffer of bytes that have been read but not processed as HTTP.
37 ///
38 /// For instance, if the `Connection` is used for an HTTP upgrade request,
39 /// it is possible the server sent back the first bytes of the new protocol
40 /// along with the response upgrade.
41 ///
42 /// You will want to check for any existing bytes if you plan to continue
43 /// communicating on the IO object.
44 pub read_buf: Bytes,
45}
46
47/// A future that processes all HTTP state for the IO object.
48///
49/// In most cases, this should just be spawned into an executor, so that it
50/// can process incoming and outgoing messages, notice hangups, and the like.
51///
52/// Instances of this type are typically created via the [`handshake`] function.
53///
54/// # Drop behavior
55///
56/// Dropping the `Connection` will close the underlying IO resource.
57/// Any in-flight requests that have not received a response will be
58/// interrupted. If graceful shutdown is desired, poll the connection
59/// until it completes instead of dropping.
60#[must_use = "futures do nothing unless polled"]
61pub struct Connection<T, B>
62where
63 T: Read + Write,
64 B: Body + 'static,
65{
66 inner: Dispatcher<T, B>,
67}
68
69impl<T, B> Connection<T, B>
70where
71 T: Read + Write + Unpin,
72 B: Body + 'static,
73 B::Error: Into<Box<dyn StdError + Send + Sync>>,
74{
75 /// Return the inner IO object, and additional information.
76 ///
77 /// Only works for HTTP/1 connections. HTTP/2 connections will panic.
78 pub fn into_parts(self) -> Parts<T> {
79 let (io, read_buf, _) = self.inner.into_inner();
80 Parts { io, read_buf }
81 }
82
83 /// Poll the connection for completion, but without calling `shutdown`
84 /// on the underlying IO.
85 ///
86 /// This is useful to allow running a connection while doing an HTTP
87 /// upgrade. Once the upgrade is completed, the connection would be "done",
88 /// but it is not desired to actually shutdown the IO object. Instead you
89 /// would take it back using `into_parts`.
90 ///
91 /// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html)
92 /// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html)
93 /// to work with this function; or use the `without_shutdown` wrapper.
94 pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
95 self.inner.poll_without_shutdown(cx)
96 }
97
98 /// Prevent shutdown of the underlying IO object at the end of service the request,
99 /// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
100 pub async fn without_shutdown(self) -> crate::Result<Parts<T>> {
101 let mut conn = Some(self);
102 crate::common::future::poll_fn(move |cx| -> Poll<crate::Result<Parts<T>>> {
103 ready!(conn
104 .as_mut()
105 .expect("client connection polled after completion")
106 .poll_without_shutdown(cx))?;
107 Poll::Ready(Ok(conn
108 .take()
109 .expect("client connection missing before completion")
110 .into_parts()))
111 })
112 .await
113 }
114}
115
116/// A builder to configure an HTTP connection.
117///
118/// After setting options, the builder is used to create a handshake future.
119///
120/// **Note**: The default values of options are *not considered stable*. They
121/// are subject to change at any time.
122#[derive(Clone, Debug)]
123pub struct Builder {
124 h09_responses: bool,
125 h1_parser_config: ParserConfig,
126 h1_writev: Option<bool>,
127 h1_title_case_headers: bool,
128 h1_preserve_header_case: bool,
129 h1_max_headers: Option<usize>,
130 #[cfg(feature = "ffi")]
131 h1_preserve_header_order: bool,
132 h1_read_buf_exact_size: Option<usize>,
133 h1_max_buf_size: Option<usize>,
134}
135
136/// Returns a handshake future over some IO.
137///
138/// This is a shortcut for `Builder::new().handshake(io)`.
139/// See [`client::conn`](crate::client::conn) for more.
140pub async fn handshake<T, B>(io: T) -> crate::Result<(SendRequest<B>, Connection<T, B>)>
141where
142 T: Read + Write + Unpin,
143 B: Body + 'static,
144 B::Data: Send,
145 B::Error: Into<Box<dyn StdError + Send + Sync>>,
146{
147 Builder::new().handshake(io).await
148}
149
150// ===== impl SendRequest
151
152impl<B> SendRequest<B> {
153 /// Polls to determine whether this sender can be used yet for a request.
154 ///
155 /// If the associated connection is closed, this returns an Error.
156 pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
157 self.dispatch.poll_ready(cx)
158 }
159
160 /// Waits until the dispatcher is ready.
161 ///
162 /// If the associated connection is closed, this returns an Error.
163 pub async fn ready(&mut self) -> crate::Result<()> {
164 crate::common::future::poll_fn(|cx| self.poll_ready(cx)).await
165 }
166
167 /// Checks if the connection is currently ready to send a request.
168 ///
169 /// # Note
170 ///
171 /// This is mostly a hint. Due to inherent latency of networks, it is
172 /// possible that even after checking this is ready, sending a request
173 /// may still fail because the connection was closed in the meantime.
174 pub fn is_ready(&self) -> bool {
175 self.dispatch.is_ready()
176 }
177
178 /// Checks if the connection side has been closed.
179 pub fn is_closed(&self) -> bool {
180 self.dispatch.is_closed()
181 }
182}
183
184impl<B> SendRequest<B>
185where
186 B: Body + 'static,
187{
188 /// Sends a `Request` on the associated connection.
189 ///
190 /// Returns a future that if successful, yields the `Response`.
191 ///
192 /// `req` must have a `Host` header.
193 ///
194 /// # Uri
195 ///
196 /// The `Uri` of the request is serialized as-is.
197 ///
198 /// - Usually you want origin-form (`/path?query`).
199 /// - For sending to an HTTP proxy, you want to send in absolute-form
200 /// (`https://hyper.rs/guides`).
201 ///
202 /// This is however not enforced or validated and it is up to the user
203 /// of this method to ensure the `Uri` is correct for their intended purpose.
204 ///
205 /// # Cancel safety
206 ///
207 /// Dropping the returned future is the supported way to cancel an
208 /// in-flight HTTP/1 request. Because HTTP/1 has no in-protocol way to
209 /// abort a single request without affecting the shared connection,
210 /// hyper closes the underlying connection when a request future is
211 /// dropped before completion. Any subsequent calls on the same
212 /// [`SendRequest`] will return a `canceled` error.
213 pub fn send_request(
214 &mut self,
215 req: Request<B>,
216 ) -> impl Future<Output = crate::Result<Response<IncomingBody>>> {
217 let sent = self.dispatch.send(req);
218
219 async move {
220 match sent {
221 Ok(rx) => match rx.await {
222 Ok(Ok(resp)) => Ok(resp),
223 Ok(Err(err)) => Err(err),
224 // this is definite bug if it happens, but it shouldn't happen!
225 Err(_canceled) => panic!("dispatch dropped without returning error"),
226 },
227 Err(_req) => {
228 debug!("connection was not ready");
229 Err(crate::Error::new_canceled().with("connection was not ready"))
230 }
231 }
232 }
233 }
234
235 /// Sends a `Request` on the associated connection.
236 ///
237 /// Returns a future that if successful, yields the `Response`.
238 ///
239 /// # Error
240 ///
241 /// If there was an error before trying to serialize the request to the
242 /// connection, the message will be returned as part of this error.
243 pub fn try_send_request(
244 &mut self,
245 req: Request<B>,
246 ) -> impl Future<Output = Result<Response<IncomingBody>, TrySendError<Request<B>>>> {
247 let sent = self.dispatch.try_send(req);
248 async move {
249 match sent {
250 Ok(rx) => match rx.await {
251 Ok(Ok(res)) => Ok(res),
252 Ok(Err(err)) => Err(err),
253 // this is definite bug if it happens, but it shouldn't happen!
254 Err(_) => panic!("dispatch dropped without returning error"),
255 },
256 Err(req) => {
257 debug!("connection was not ready");
258 let error = crate::Error::new_canceled().with("connection was not ready");
259 Err(TrySendError {
260 error,
261 message: Some(req),
262 })
263 }
264 }
265 }
266 }
267}
268
269impl<B> fmt::Debug for SendRequest<B> {
270 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
271 f.debug_struct("SendRequest").finish()
272 }
273}
274
275// ===== impl Connection
276
277impl<T, B> Connection<T, B>
278where
279 T: Read + Write + Unpin + Send,
280 B: Body + 'static,
281 B::Error: Into<Box<dyn StdError + Send + Sync>>,
282{
283 /// Enable this connection to support higher-level HTTP upgrades.
284 ///
285 /// See [the `upgrade` module](crate::upgrade) for more.
286 pub fn with_upgrades(self) -> upgrades::UpgradeableConnection<T, B> {
287 upgrades::UpgradeableConnection { inner: Some(self) }
288 }
289}
290
291impl<T, B> fmt::Debug for Connection<T, B>
292where
293 T: Read + Write + fmt::Debug,
294 B: Body + 'static,
295{
296 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
297 f.debug_struct("Connection").finish()
298 }
299}
300
301impl<T, B> Future for Connection<T, B>
302where
303 T: Read + Write + Unpin,
304 B: Body + 'static,
305 B::Data: Send,
306 B::Error: Into<Box<dyn StdError + Send + Sync>>,
307{
308 type Output = crate::Result<()>;
309
310 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
311 match ready!(Pin::new(&mut self.inner).poll(cx))? {
312 proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
313 proto::Dispatched::Upgrade(pending) => {
314 // With no `Send` bound on `I`, we can't try to do
315 // upgrades here. In case a user was trying to use
316 // `upgrade` with this API, send a special
317 // error letting them know about that.
318 pending.manual();
319 Poll::Ready(Ok(()))
320 }
321 }
322 }
323}
324
325// ===== impl Builder
326
327impl Builder {
328 /// Creates a new connection builder.
329 #[inline]
330 pub fn new() -> Builder {
331 Builder {
332 h09_responses: false,
333 h1_writev: None,
334 h1_read_buf_exact_size: None,
335 h1_parser_config: Default::default(),
336 h1_title_case_headers: false,
337 h1_preserve_header_case: false,
338 h1_max_headers: None,
339 #[cfg(feature = "ffi")]
340 h1_preserve_header_order: false,
341 h1_max_buf_size: None,
342 }
343 }
344
345 /// Set whether HTTP/0.9 responses should be tolerated.
346 ///
347 /// Default is false.
348 pub fn http09_responses(&mut self, enabled: bool) -> &mut Builder {
349 self.h09_responses = enabled;
350 self
351 }
352
353 /// Set whether HTTP/1 connections will accept spaces between header names
354 /// and the colon that follow them in responses.
355 ///
356 /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
357 /// to say about it:
358 ///
359 /// > No whitespace is allowed between the header field-name and colon. In
360 /// > the past, differences in the handling of such whitespace have led to
361 /// > security vulnerabilities in request routing and response handling. A
362 /// > server MUST reject any received request message that contains
363 /// > whitespace between a header field-name and colon with a response code
364 /// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a
365 /// > response message before forwarding the message downstream.
366 ///
367 /// Default is false.
368 ///
369 /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
370 pub fn allow_spaces_after_header_name_in_responses(&mut self, enabled: bool) -> &mut Builder {
371 self.h1_parser_config
372 .allow_spaces_after_header_name_in_responses(enabled);
373 self
374 }
375
376 /// Set whether HTTP/1 connections will accept obsolete line folding for
377 /// header values.
378 ///
379 /// Newline codepoints (`\r` and `\n`) will be transformed to spaces when
380 /// parsing.
381 ///
382 /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
383 /// to say about it:
384 ///
385 /// > A server that receives an obs-fold in a request message that is not
386 /// > within a message/http container MUST either reject the message by
387 /// > sending a 400 (Bad Request), preferably with a representation
388 /// > explaining that obsolete line folding is unacceptable, or replace
389 /// > each received obs-fold with one or more SP octets prior to
390 /// > interpreting the field value or forwarding the message downstream.
391 ///
392 /// > A proxy or gateway that receives an obs-fold in a response message
393 /// > that is not within a message/http container MUST either discard the
394 /// > message and replace it with a 502 (Bad Gateway) response, preferably
395 /// > with a representation explaining that unacceptable line folding was
396 /// > received, or replace each received obs-fold with one or more SP
397 /// > octets prior to interpreting the field value or forwarding the
398 /// > message downstream.
399 ///
400 /// > A user agent that receives an obs-fold in a response message that is
401 /// > not within a message/http container MUST replace each received
402 /// > obs-fold with one or more SP octets prior to interpreting the field
403 /// > value.
404 ///
405 /// Default is false.
406 ///
407 /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
408 pub fn allow_obsolete_multiline_headers_in_responses(&mut self, enabled: bool) -> &mut Builder {
409 self.h1_parser_config
410 .allow_obsolete_multiline_headers_in_responses(enabled);
411 self
412 }
413
414 /// Set whether HTTP/1 connections will silently ignored malformed header lines.
415 ///
416 /// If this is enabled and a header line does not start with a valid header
417 /// name, or does not include a colon at all, the line will be silently ignored
418 /// and no error will be reported.
419 ///
420 /// Default is false.
421 pub fn ignore_invalid_headers_in_responses(&mut self, enabled: bool) -> &mut Builder {
422 self.h1_parser_config
423 .ignore_invalid_headers_in_responses(enabled);
424 self
425 }
426
427 /// Set whether HTTP/1 connections should try to use vectored writes,
428 /// or always flatten into a single buffer.
429 ///
430 /// Note that setting this to false may mean more copies of body data,
431 /// but may also improve performance when an IO transport doesn't
432 /// support vectored writes well, such as most TLS implementations.
433 ///
434 /// Setting this to true will force hyper to use queued strategy,
435 /// which may eliminate unnecessary cloning on some TLS backends.
436 ///
437 /// Default is `auto`. In this mode hyper will try to guess which
438 /// mode to use.
439 pub fn writev(&mut self, enabled: bool) -> &mut Builder {
440 self.h1_writev = Some(enabled);
441 self
442 }
443
444 /// Set whether HTTP/1 connections will write header names as title case at
445 /// the socket level.
446 ///
447 /// Default is false.
448 pub fn title_case_headers(&mut self, enabled: bool) -> &mut Builder {
449 self.h1_title_case_headers = enabled;
450 self
451 }
452
453 /// Set whether to support preserving original header cases.
454 ///
455 /// Currently, this will record the original cases received, and store them
456 /// in a private extension on the `Response`. It will also look for and use
457 /// such an extension in any provided `Request`.
458 ///
459 /// Since the relevant extension is still private, there is no way to
460 /// interact with the original cases. The only effect this can have now is
461 /// to forward the cases in a proxy-like fashion.
462 ///
463 /// Default is false.
464 pub fn preserve_header_case(&mut self, enabled: bool) -> &mut Builder {
465 self.h1_preserve_header_case = enabled;
466 self
467 }
468
469 /// Set the maximum number of headers.
470 ///
471 /// When a response is received, the parser will reserve a buffer to store headers for optimal
472 /// performance.
473 ///
474 /// If client receives more headers than the buffer size, the error "message header too large"
475 /// is returned.
476 ///
477 /// Note that headers is allocated on the stack by default, which has higher performance. After
478 /// setting this value, headers will be allocated in heap memory, that is, heap memory
479 /// allocation will occur for each response, and there will be a performance drop of about 5%.
480 ///
481 /// Default is 100.
482 pub fn max_headers(&mut self, val: usize) -> &mut Self {
483 self.h1_max_headers = Some(val);
484 self
485 }
486
487 /// Set whether to support preserving original header order.
488 ///
489 /// Currently, this will record the order in which headers are received, and store this
490 /// ordering in a private extension on the `Response`. It will also look for and use
491 /// such an extension in any provided `Request`.
492 ///
493 /// Default is false.
494 #[cfg(feature = "ffi")]
495 pub fn preserve_header_order(&mut self, enabled: bool) -> &mut Builder {
496 self.h1_preserve_header_order = enabled;
497 self
498 }
499
500 /// Sets the exact size of the read buffer to *always* use.
501 ///
502 /// Note that setting this option unsets the `max_buf_size` option.
503 ///
504 /// Default is an adaptive read buffer.
505 pub fn read_buf_exact_size(&mut self, sz: Option<usize>) -> &mut Builder {
506 self.h1_read_buf_exact_size = sz;
507 self.h1_max_buf_size = None;
508 self
509 }
510
511 /// Set the maximum buffer size for the connection.
512 ///
513 /// Default is ~400kb.
514 ///
515 /// Note that setting this option unsets the `read_exact_buf_size` option.
516 ///
517 /// # Panics
518 ///
519 /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
520 pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
521 assert!(
522 max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE,
523 "the max_buf_size cannot be smaller than the minimum that h1 specifies."
524 );
525
526 self.h1_max_buf_size = Some(max);
527 self.h1_read_buf_exact_size = None;
528 self
529 }
530
531 /// Constructs a connection with the configured options and IO.
532 /// See [`client::conn`](crate::client::conn) for more.
533 ///
534 /// Note, if [`Connection`] is not `await`-ed, [`SendRequest`] will
535 /// do nothing.
536 pub fn handshake<T, B>(
537 &self,
538 io: T,
539 ) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
540 where
541 T: Read + Write + Unpin,
542 B: Body + 'static,
543 B::Data: Send,
544 B::Error: Into<Box<dyn StdError + Send + Sync>>,
545 {
546 let opts = self.clone();
547
548 async move {
549 trace!("client handshake HTTP/1");
550
551 let (tx, rx) = dispatch::channel();
552 let mut conn = proto::Conn::new(io);
553 conn.set_h1_parser_config(opts.h1_parser_config);
554 if let Some(writev) = opts.h1_writev {
555 if writev {
556 conn.set_write_strategy_queue();
557 } else {
558 conn.set_write_strategy_flatten();
559 }
560 }
561 if opts.h1_title_case_headers {
562 conn.set_title_case_headers();
563 }
564 if opts.h1_preserve_header_case {
565 conn.set_preserve_header_case();
566 }
567 if let Some(max_headers) = opts.h1_max_headers {
568 conn.set_http1_max_headers(max_headers);
569 }
570 #[cfg(feature = "ffi")]
571 if opts.h1_preserve_header_order {
572 conn.set_preserve_header_order();
573 }
574
575 if opts.h09_responses {
576 conn.set_h09_responses();
577 }
578
579 if let Some(sz) = opts.h1_read_buf_exact_size {
580 conn.set_read_buf_exact_size(sz);
581 }
582 if let Some(max) = opts.h1_max_buf_size {
583 conn.set_max_buf_size(max);
584 }
585 let cd = proto::h1::dispatch::Client::new(rx);
586 let proto = proto::h1::Dispatcher::new(cd, conn);
587
588 Ok((SendRequest { dispatch: tx }, Connection { inner: proto }))
589 }
590 }
591}
592
593mod upgrades {
594 use super::{Connection, Context, Future, Parts, Pin, Poll, Read, StdError, Write};
595 use crate::body::Body;
596 use crate::proto::Dispatched;
597 use crate::upgrade::Upgraded;
598 use futures_core::ready;
599 // A future binding a connection with a Service with Upgrade support.
600 //
601 // This type is unnameable outside the crate.
602 #[must_use = "futures do nothing unless polled"]
603 #[allow(missing_debug_implementations)]
604 pub struct UpgradeableConnection<T, B>
605 where
606 T: Read + Write + Unpin + Send + 'static,
607 B: Body + 'static,
608 B::Error: Into<Box<dyn StdError + Send + Sync>>,
609 {
610 pub(super) inner: Option<Connection<T, B>>,
611 }
612
613 impl<I, B> Future for UpgradeableConnection<I, B>
614 where
615 I: Read + Write + Unpin + Send + 'static,
616 B: Body + 'static,
617 B::Data: Send,
618 B::Error: Into<Box<dyn StdError + Send + Sync>>,
619 {
620 type Output = crate::Result<()>;
621
622 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
623 match ready!(Pin::new(
624 &mut self
625 .inner
626 .as_mut()
627 .expect("upgradeable client connection polled after upgrade")
628 .inner,
629 )
630 .poll(cx))
631 {
632 Ok(Dispatched::Shutdown) => Poll::Ready(Ok(())),
633 Ok(Dispatched::Upgrade(pending)) => {
634 let Parts { io, read_buf } = self
635 .inner
636 .take()
637 .expect("upgradeable client connection missing after upgrade")
638 .into_parts();
639 pending.fulfill(Upgraded::new(io, read_buf));
640 Poll::Ready(Ok(()))
641 }
642 Err(e) => Poll::Ready(Err(e)),
643 }
644 }
645 }
646}