hyper/proto/h2/
mod.rs

1use std::error::Error as StdError;
2use std::future::Future;
3use std::io::{Cursor, IoSlice};
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use bytes::Buf;
8use futures_core::ready;
9use h2::SendStream;
10use http::header::{HeaderName, CONNECTION, TE, TRANSFER_ENCODING, UPGRADE};
11use http::HeaderMap;
12use pin_project_lite::pin_project;
13
14use crate::body::Body;
15
16pub(crate) mod ping;
17pub(crate) mod upgrade;
18
19cfg_client! {
20    pub(crate) mod client;
21    pub(crate) use self::client::ClientTask;
22}
23
24cfg_server! {
25    pub(crate) mod server;
26    pub(crate) use self::server::Server;
27}
28
/// Default initial stream window size defined by the HTTP/2 specification:
/// 2^16 - 1 (65,535) octets.
pub(crate) const SPEC_WINDOW_SIZE: u32 = (1 << 16) - 1;
31
// Connection-specific header names from RFC 9110 Section 7.6.1 that
// `strip_connection_headers` removes unconditionally.
//
// `TE` is deliberately absent: TE headers are allowed in HTTP/2 requests as long as the
// value is "trailers", so they're tested separately. `Connection` itself is also handled
// separately, because its value may name further headers that have to be removed.
static CONNECTION_HEADERS: [HeaderName; 4] = [
    HeaderName::from_static("keep-alive"),
    HeaderName::from_static("proxy-connection"),
    TRANSFER_ENCODING,
    UPGRADE,
];
42
43fn strip_connection_headers(headers: &mut HeaderMap, is_request: bool) {
44    for header in &CONNECTION_HEADERS {
45        if headers.remove(header).is_some() {
46            warn!("Connection header illegal in HTTP/2: {}", header.as_str());
47        }
48    }
49
50    if is_request {
51        if headers
52            .get(TE)
53            .map_or(false, |te_header| te_header != "trailers")
54        {
55            warn!("TE headers not set to \"trailers\" are illegal in HTTP/2 requests");
56            headers.remove(TE);
57        }
58    } else if headers.remove(TE).is_some() {
59        warn!("TE headers illegal in HTTP/2 responses");
60    }
61
62    if let Some(header) = headers.remove(CONNECTION) {
63        warn!(
64            "Connection header illegal in HTTP/2: {}",
65            CONNECTION.as_str()
66        );
67        let header_contents = header.to_str().unwrap();
68
69        // A `Connection` header may have a comma-separated list of names of other headers that
70        // are meant for only this specific connection.
71        //
72        // Iterate these names and remove them as headers. Connection-specific headers are
73        // forbidden in HTTP2, as that information has been moved into frame types of the h2
74        // protocol.
75        for name in header_contents.split(',') {
76            let name = name.trim();
77            headers.remove(name);
78        }
79    }
80}
81
82// body adapters used by both Client and Server
83
pin_project! {
    /// Future that forwards the frames produced by a [`Body`] into an h2 [`SendStream`].
    pub(crate) struct PipeToSendStream<S>
    where
        S: Body,
    {
        // Sending half of the h2 stream that DATA frames and trailers are written to.
        body_tx: SendStream<SendBuf<S::Data>>,
        // Initialised to `false` by `new`; not read anywhere in this part of the file.
        data_done: bool,
        // The body being forwarded. Pinned so that `poll` can call `poll_frame` on it.
        #[pin]
        stream: S,
    }
}
95
96impl<S> PipeToSendStream<S>
97where
98    S: Body,
99{
100    fn new(stream: S, tx: SendStream<SendBuf<S::Data>>) -> PipeToSendStream<S> {
101        PipeToSendStream {
102            body_tx: tx,
103            data_done: false,
104            stream,
105        }
106    }
107}
108
109impl<S> Future for PipeToSendStream<S>
110where
111    S: Body,
112    S::Error: Into<Box<dyn StdError + Send + Sync>>,
113{
114    type Output = crate::Result<()>;
115
116    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
117        let mut me = self.project();
118        loop {
119            // we don't have the next chunk of data yet, so just reserve 1 byte to make
120            // sure there's some capacity available. h2 will handle the capacity management
121            // for the actual body chunk.
122            me.body_tx.reserve_capacity(1);
123
124            if me.body_tx.capacity() == 0 {
125                loop {
126                    match ready!(me.body_tx.poll_capacity(cx)) {
127                        Some(Ok(0)) => {}
128                        Some(Ok(_)) => break,
129                        Some(Err(e)) => return Poll::Ready(Err(crate::Error::new_body_write(e))),
130                        None => {
131                            // None means the stream is no longer in a
132                            // streaming state, we either finished it
133                            // somehow, or the remote reset us.
134                            return Poll::Ready(Err(crate::Error::new_body_write(
135                                "send stream capacity unexpectedly closed",
136                            )));
137                        }
138                    }
139                }
140            } else if let Poll::Ready(reason) = me
141                .body_tx
142                .poll_reset(cx)
143                .map_err(crate::Error::new_body_write)?
144            {
145                debug!("stream received RST_STREAM: {:?}", reason);
146                return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from(reason))));
147            }
148
149            match ready!(me.stream.as_mut().poll_frame(cx)) {
150                Some(Ok(frame)) => {
151                    if frame.is_data() {
152                        let chunk = frame.into_data().unwrap_or_else(|_| unreachable!());
153                        let is_eos = me.stream.is_end_stream();
154                        trace!(
155                            "send body chunk: {} bytes, eos={}",
156                            chunk.remaining(),
157                            is_eos,
158                        );
159
160                        let buf = SendBuf::Buf(chunk);
161                        me.body_tx
162                            .send_data(buf, is_eos)
163                            .map_err(crate::Error::new_body_write)?;
164
165                        if is_eos {
166                            return Poll::Ready(Ok(()));
167                        }
168                    } else if frame.is_trailers() {
169                        // no more DATA, so give any capacity back
170                        me.body_tx.reserve_capacity(0);
171                        me.body_tx
172                            .send_trailers(frame.into_trailers().unwrap_or_else(|_| unreachable!()))
173                            .map_err(crate::Error::new_body_write)?;
174                        return Poll::Ready(Ok(()));
175                    } else {
176                        trace!("discarding unknown frame");
177                        // loop again
178                    }
179                }
180                Some(Err(e)) => return Poll::Ready(Err(me.body_tx.on_user_err(e))),
181                None => {
182                    // no more frames means we're done here
183                    // but at this point, we haven't sent an EOS DATA, or
184                    // any trailers, so send an empty EOS DATA.
185                    return Poll::Ready(me.body_tx.send_eos_frame());
186                }
187            }
188        }
189    }
190}
191
192trait SendStreamExt {
193    fn on_user_err<E>(&mut self, err: E) -> crate::Error
194    where
195        E: Into<Box<dyn std::error::Error + Send + Sync>>;
196    fn send_eos_frame(&mut self) -> crate::Result<()>;
197}
198
199impl<B: Buf> SendStreamExt for SendStream<SendBuf<B>> {
200    fn on_user_err<E>(&mut self, err: E) -> crate::Error
201    where
202        E: Into<Box<dyn std::error::Error + Send + Sync>>,
203    {
204        let err = crate::Error::new_user_body(err);
205        debug!("send body user stream error: {}", err);
206        self.send_reset(err.h2_reason());
207        err
208    }
209
210    fn send_eos_frame(&mut self) -> crate::Result<()> {
211        trace!("send body eos");
212        self.send_data(SendBuf::None, true)
213            .map_err(crate::Error::new_body_write)
214    }
215}
216
/// Buffer type carried by the h2 `SendStream` used in this module.
#[repr(usize)]
enum SendBuf<B> {
    /// A body data chunk; this is what `PipeToSendStream` sends for each DATA frame.
    Buf(B),
    /// An owned byte buffer read through a cursor. Not constructed in this part of the
    /// file.
    Cursor(Cursor<Box<[u8]>>),
    /// No bytes at all; used for the empty end-of-stream DATA frame.
    None,
}
223
224impl<B: Buf> Buf for SendBuf<B> {
225    #[inline]
226    fn remaining(&self) -> usize {
227        match *self {
228            Self::Buf(ref b) => b.remaining(),
229            Self::Cursor(ref c) => Buf::remaining(c),
230            Self::None => 0,
231        }
232    }
233
234    #[inline]
235    fn chunk(&self) -> &[u8] {
236        match *self {
237            Self::Buf(ref b) => b.chunk(),
238            Self::Cursor(ref c) => c.chunk(),
239            Self::None => &[],
240        }
241    }
242
243    #[inline]
244    fn advance(&mut self, cnt: usize) {
245        match *self {
246            Self::Buf(ref mut b) => b.advance(cnt),
247            Self::Cursor(ref mut c) => c.advance(cnt),
248            Self::None => {}
249        }
250    }
251
252    fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
253        match *self {
254            Self::Buf(ref b) => b.chunks_vectored(dst),
255            Self::Cursor(ref c) => c.chunks_vectored(dst),
256            Self::None => 0,
257        }
258    }
259}