Skip to main content

hyper/proto/h2/
mod.rs

1use std::error::Error as StdError;
2use std::future::Future;
3use std::io::{Cursor, IoSlice};
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use bytes::Buf;
8use futures_core::ready;
9use h2::SendStream;
10use http::header::{HeaderName, CONNECTION, TE, TRANSFER_ENCODING, UPGRADE};
11use http::HeaderMap;
12use pin_project_lite::pin_project;
13
14use crate::body::Body;
15
16pub(crate) mod ping;
17pub(crate) mod upgrade;
18
19cfg_client! {
20    pub(crate) mod client;
21    pub(crate) use self::client::ClientTask;
22}
23
24cfg_server! {
25    pub(crate) mod server;
26    pub(crate) use self::server::Server;
27}
28
/// Initial per-stream flow-control window size mandated by the HTTP/2 spec
/// (2^16 - 1 octets).
pub(crate) const SPEC_WINDOW_SIZE: u32 = (1 << 16) - 1;
31
32// List of connection headers from RFC 9110 Section 7.6.1
33//
34// TE headers are allowed in HTTP/2 requests as long as the value is "trailers", so they're
35// tested separately.
36static CONNECTION_HEADERS: [HeaderName; 4] = [
37    HeaderName::from_static("keep-alive"),
38    HeaderName::from_static("proxy-connection"),
39    TRANSFER_ENCODING,
40    UPGRADE,
41];
42
43fn strip_connection_headers(headers: &mut HeaderMap, is_request: bool) {
44    for header in &CONNECTION_HEADERS {
45        if headers.remove(header).is_some() {
46            warn!("Connection header illegal in HTTP/2: {}", header.as_str());
47        }
48    }
49
50    if is_request {
51        if headers
52            .get(TE)
53            .map_or(false, |te_header| te_header != "trailers")
54        {
55            warn!("TE headers not set to \"trailers\" are illegal in HTTP/2 requests");
56            headers.remove(TE);
57        }
58    } else if headers.remove(TE).is_some() {
59        warn!("TE headers illegal in HTTP/2 responses");
60    }
61
62    if let Some(header) = headers.remove(CONNECTION) {
63        warn!(
64            "Connection header illegal in HTTP/2: {}",
65            CONNECTION.as_str()
66        );
67        // A `Connection` header may have a comma-separated list of names of other headers that
68        // are meant for only this specific connection.
69        //
70        // Iterate these names and remove them as headers. Connection-specific headers are
71        // forbidden in HTTP2, as that information has been moved into frame types of the h2
72        // protocol.
73        if let Ok(header_contents) = header.to_str() {
74            for name in header_contents.split(',') {
75                let name = name.trim();
76                headers.remove(name);
77            }
78        }
79    }
80}
81
82// body adapters used by both Client and Server
83
84pin_project! {
85    pub(crate) struct PipeToSendStream<S>
86    where
87        S: Body,
88    {
89        body_tx: SendStream<SendBuf<S::Data>>,
90        data_done: bool,
91        // A data chunk that has been polled from the body but is still waiting
92        // for stream-level capacity before it can be shipped. Stored here so
93        // it survives across `Poll::Pending` returns from `poll_capacity`; if
94        // we left the chunk in a local, it would be dropped on every repoll.
95        buffered_data: Option<Peeked<S::Data>>,
96        #[pin]
97        stream: S,
98    }
99}
100
/// A data chunk taken from the body that is parked until it can be sent.
struct Peeked<D> {
    /// The chunk payload.
    data: D,
    /// Whether the body reported end-of-stream right after producing `data`.
    is_eos: bool,
}
105
106impl<S> PipeToSendStream<S>
107where
108    S: Body,
109{
110    fn new(stream: S, tx: SendStream<SendBuf<S::Data>>) -> PipeToSendStream<S> {
111        PipeToSendStream {
112            body_tx: tx,
113            data_done: false,
114            buffered_data: None,
115            stream,
116        }
117    }
118
119    #[cfg(feature = "client")]
120    fn send_reset(self: Pin<&mut Self>, reason: h2::Reason) {
121        self.project().body_tx.send_reset(reason);
122    }
123}
124
125impl<S> Future for PipeToSendStream<S>
126where
127    S: Body,
128    S::Error: Into<Box<dyn StdError + Send + Sync>>,
129{
130    type Output = crate::Result<()>;
131
132    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
133        let mut me = self.project();
134        loop {
135            // Register for RST_STREAM notification while we wait for the next
136            // body chunk or for send capacity, so the task wakes up if the
137            // peer resets the stream.
138            if let Poll::Ready(reason) = me
139                .body_tx
140                .poll_reset(cx)
141                .map_err(crate::Error::new_body_write)?
142            {
143                debug!("stream received RST_STREAM: {:?}", reason);
144                return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from(reason))));
145            }
146
147            // If a previously-polled chunk is still waiting for stream-level
148            // send capacity, drive that to completion before touching the
149            // body again.
150            if me.buffered_data.is_some() {
151                while me.body_tx.capacity() == 0 {
152                    match ready!(me.body_tx.poll_capacity(cx)) {
153                        Some(Ok(0)) => {}
154                        Some(Ok(_)) => break,
155                        Some(Err(e)) => return Poll::Ready(Err(crate::Error::new_body_write(e))),
156                        None => {
157                            // None means the stream is no longer in a
158                            // streaming state, we either finished it
159                            // somehow, or the remote reset us.
160                            return Poll::Ready(Err(crate::Error::new_body_write(
161                                "send stream capacity unexpectedly closed",
162                            )));
163                        }
164                    }
165                }
166
167                let peeked = me.buffered_data.take().expect("checked is_some above");
168                let buf = SendBuf::Buf(peeked.data);
169                me.body_tx
170                    .send_data(buf, peeked.is_eos)
171                    .map_err(crate::Error::new_body_write)?;
172
173                if peeked.is_eos {
174                    return Poll::Ready(Ok(()));
175                }
176                continue;
177            }
178
179            // Poll for the next body frame *before* reserving any connection
180            // flow-control capacity. Reserving capacity speculatively (even a
181            // single byte) pins that capacity on the connection-level window,
182            // which can deadlock a second stream when talking to peers that
183            // only emit WINDOW_UPDATE once their receive window is fully
184            // exhausted. See #4003.
185            match ready!(me.stream.as_mut().poll_frame(cx)) {
186                Some(Ok(frame)) => {
187                    if frame.is_data() {
188                        let chunk = frame.into_data().unwrap_or_else(|_| unreachable!());
189                        let is_eos = me.stream.is_end_stream();
190                        let len = chunk.remaining();
191                        trace!("send body chunk: {} bytes, eos={}", len, is_eos);
192
193                        if len == 0 {
194                            // Zero-length data frames need no capacity; send
195                            // them straight through so trailing empty frames
196                            // (e.g. an explicit end-of-stream marker) are
197                            // delivered.
198                            let buf = SendBuf::Buf(chunk);
199                            me.body_tx
200                                .send_data(buf, is_eos)
201                                .map_err(crate::Error::new_body_write)?;
202
203                            if is_eos {
204                                return Poll::Ready(Ok(()));
205                            }
206                            continue;
207                        }
208
209                        // Reserve exactly the chunk size so we never pin more
210                        // connection-level flow-control window than we are
211                        // about to consume. Stash the chunk in `self` so it
212                        // survives the upcoming `poll_capacity` wait even if
213                        // it returns `Poll::Pending`.
214                        me.body_tx.reserve_capacity(len);
215                        *me.buffered_data = Some(Peeked {
216                            data: chunk,
217                            is_eos,
218                        });
219                    } else if frame.is_trailers() {
220                        // no more DATA, so give any capacity back
221                        me.body_tx.reserve_capacity(0);
222                        me.body_tx
223                            .send_trailers(frame.into_trailers().unwrap_or_else(|_| unreachable!()))
224                            .map_err(crate::Error::new_body_write)?;
225                        return Poll::Ready(Ok(()));
226                    } else {
227                        trace!("discarding unknown frame");
228                        // loop again
229                    }
230                }
231                Some(Err(e)) => return Poll::Ready(Err(me.body_tx.on_user_err(e))),
232                None => {
233                    // no more frames means we're done here
234                    // but at this point, we haven't sent an EOS DATA, or
235                    // any trailers, so send an empty EOS DATA.
236                    return Poll::Ready(me.body_tx.send_eos_frame());
237                }
238            }
239        }
240    }
241}
242
243trait SendStreamExt {
244    fn on_user_err<E>(&mut self, err: E) -> crate::Error
245    where
246        E: Into<Box<dyn std::error::Error + Send + Sync>>;
247    fn send_eos_frame(&mut self) -> crate::Result<()>;
248}
249
250impl<B: Buf> SendStreamExt for SendStream<SendBuf<B>> {
251    fn on_user_err<E>(&mut self, err: E) -> crate::Error
252    where
253        E: Into<Box<dyn std::error::Error + Send + Sync>>,
254    {
255        let err = crate::Error::new_user_body(err);
256        debug!("send body user stream error: {}", err);
257        self.send_reset(err.h2_reason());
258        err
259    }
260
261    fn send_eos_frame(&mut self) -> crate::Result<()> {
262        trace!("send body eos");
263        self.send_data(SendBuf::None, true)
264            .map_err(crate::Error::new_body_write)
265    }
266}
267
/// Buffer type written to an h2 send stream.
#[repr(usize)]
enum SendBuf<B> {
    /// A caller-provided buffer of type `B`.
    Buf(B),
    /// An owned byte slice read through a cursor.
    Cursor(Cursor<Box<[u8]>>),
    /// No payload at all; behaves as an empty buffer.
    None,
}
274
275impl<B: Buf> Buf for SendBuf<B> {
276    #[inline]
277    fn remaining(&self) -> usize {
278        match *self {
279            Self::Buf(ref b) => b.remaining(),
280            Self::Cursor(ref c) => Buf::remaining(c),
281            Self::None => 0,
282        }
283    }
284
285    #[inline]
286    fn chunk(&self) -> &[u8] {
287        match *self {
288            Self::Buf(ref b) => b.chunk(),
289            Self::Cursor(ref c) => c.chunk(),
290            Self::None => &[],
291        }
292    }
293
294    #[inline]
295    fn advance(&mut self, cnt: usize) {
296        match *self {
297            Self::Buf(ref mut b) => b.advance(cnt),
298            Self::Cursor(ref mut c) => c.advance(cnt),
299            Self::None => {}
300        }
301    }
302
303    fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
304        match *self {
305            Self::Buf(ref b) => b.chunks_vectored(dst),
306            Self::Cursor(ref c) => c.chunks_vectored(dst),
307            Self::None => 0,
308        }
309    }
310}