1use std::error::Error as StdError;
2use std::future::Future;
3use std::io::{Cursor, IoSlice};
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use bytes::Buf;
8use futures_core::ready;
9use h2::SendStream;
10use http::header::{HeaderName, CONNECTION, TE, TRANSFER_ENCODING, UPGRADE};
11use http::HeaderMap;
12use pin_project_lite::pin_project;
13
14use crate::body::Body;
15
16pub(crate) mod ping;
17pub(crate) mod upgrade;
18
19cfg_client! {
20 pub(crate) mod client;
21 pub(crate) use self::client::ClientTask;
22}
23
24cfg_server! {
25 pub(crate) mod server;
26 pub(crate) use self::server::Server;
27}
28
/// 65,535 bytes: the initial flow-control window size that the HTTP/2 specification
/// (RFC 9113, section 6.9.2) assigns to new streams and connections.
// NOTE(review): presumably used as the default window size by the client/server
// configuration -- the users of this constant are outside this part of the file.
pub(crate) const SPEC_WINDOW_SIZE: u32 = 65_535;
31
// Connection-specific header names that `strip_connection_headers` removes
// unconditionally from every header map it is given.
//
// `Connection` and `TE` are deliberately absent from this list and are handled
// separately by `strip_connection_headers`: `TE` is tolerated in requests when its
// value is "trailers", and the value of `Connection` is inspected to find further
// header names to remove.
static CONNECTION_HEADERS: [HeaderName; 4] = [
    HeaderName::from_static("keep-alive"),
    HeaderName::from_static("proxy-connection"),
    TRANSFER_ENCODING,
    UPGRADE,
];
42
43fn strip_connection_headers(headers: &mut HeaderMap, is_request: bool) {
44 for header in &CONNECTION_HEADERS {
45 if headers.remove(header).is_some() {
46 warn!("Connection header illegal in HTTP/2: {}", header.as_str());
47 }
48 }
49
50 if is_request {
51 if headers
52 .get(TE)
53 .map_or(false, |te_header| te_header != "trailers")
54 {
55 warn!("TE headers not set to \"trailers\" are illegal in HTTP/2 requests");
56 headers.remove(TE);
57 }
58 } else if headers.remove(TE).is_some() {
59 warn!("TE headers illegal in HTTP/2 responses");
60 }
61
62 if let Some(header) = headers.remove(CONNECTION) {
63 warn!(
64 "Connection header illegal in HTTP/2: {}",
65 CONNECTION.as_str()
66 );
67 if let Ok(header_contents) = header.to_str() {
74 for name in header_contents.split(',') {
75 let name = name.trim();
76 headers.remove(name);
77 }
78 }
79 }
80}
81
pin_project! {
    /// Future that forwards the frames of a [`Body`] into an HTTP/2 send stream.
    ///
    /// Polling it (see the `Future` impl) writes data frames and trailers to `body_tx`
    /// and resolves once the body has ended or an error has occurred.
    pub(crate) struct PipeToSendStream<S>
    where
        S: Body,
    {
        // Sending half of the h2 stream that the body is written to.
        body_tx: SendStream<SendBuf<S::Data>>,
        // Initialised to `false` by `new`; not read or updated by any code visible in
        // this part of the file. NOTE(review): possibly a leftover -- confirm before
        // removing.
        data_done: bool,
        // A non-empty data chunk already pulled from `stream` that is waiting for send
        // capacity before it is handed to `body_tx`.
        buffered_data: Option<Peeked<S::Data>>,
        // The body being forwarded. Pinned because it is polled through `Pin<&mut S>`.
        #[pin]
        stream: S,
    }
}
100
/// A data chunk taken from the body together with the end-of-stream flag observed when
/// it was read, held until it can be written to the send stream.
struct Peeked<D> {
    /// The chunk's payload.
    data: D,
    /// Value of `Body::is_end_stream` right after the chunk was polled; passed on as the
    /// end-of-stream flag when the chunk is sent.
    is_eos: bool,
}
105
106impl<S> PipeToSendStream<S>
107where
108 S: Body,
109{
110 fn new(stream: S, tx: SendStream<SendBuf<S::Data>>) -> PipeToSendStream<S> {
111 PipeToSendStream {
112 body_tx: tx,
113 data_done: false,
114 buffered_data: None,
115 stream,
116 }
117 }
118
119 #[cfg(feature = "client")]
120 fn send_reset(self: Pin<&mut Self>, reason: h2::Reason) {
121 self.project().body_tx.send_reset(reason);
122 }
123}
124
/// Drives the body to completion, writing each of its frames to the HTTP/2 send stream.
///
/// The future resolves to `Ok(())` once an end-of-stream data frame or trailers have been
/// sent. It resolves to an error when the stream is reset by the peer, when writing to
/// the send stream fails, or when the body itself yields an error (in which case the
/// stream is reset via `on_user_err`).
impl<S> Future for PipeToSendStream<S>
where
    S: Body,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
{
    type Output = crate::Result<()>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut me = self.project();
        loop {
            // Check for a reset on every pass, before doing any other work. Because `cx`
            // is handed to `poll_reset` here, a reset can wake this task even while it is
            // parked below waiting for capacity or for the next body frame.
            if let Poll::Ready(reason) = me
                .body_tx
                .poll_reset(cx)
                .map_err(crate::Error::new_body_write)?
            {
                debug!("stream received RST_STREAM: {:?}", reason);
                return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from(reason))));
            }

            // A chunk was read from the body on an earlier pass: flush it before asking
            // the body for more.
            if me.buffered_data.is_some() {
                // Hold the chunk back until the stream reports some capacity. The chunk
                // stays in `buffered_data` while waiting, so returning `Pending` through
                // `ready!` does not lose it.
                while me.body_tx.capacity() == 0 {
                    match ready!(me.body_tx.poll_capacity(cx)) {
                        // Woken without any capacity being assigned; keep waiting.
                        Some(Ok(0)) => {}
                        Some(Ok(_)) => break,
                        Some(Err(e)) => return Poll::Ready(Err(crate::Error::new_body_write(e))),
                        None => {
                            // No further capacity notifications will arrive, yet there
                            // is still data to write: report it as a write error.
                            return Poll::Ready(Err(crate::Error::new_body_write(
                                "send stream capacity unexpectedly closed",
                            )));
                        }
                    }
                }

                let peeked = me.buffered_data.take().expect("checked is_some above");
                let buf = SendBuf::Buf(peeked.data);
                me.body_tx
                    .send_data(buf, peeked.is_eos)
                    .map_err(crate::Error::new_body_write)?;

                if peeked.is_eos {
                    return Poll::Ready(Ok(()));
                }
                // More frames may follow; start the next pass (reset check included).
                continue;
            }

            match ready!(me.stream.as_mut().poll_frame(cx)) {
                Some(Ok(frame)) => {
                    if frame.is_data() {
                        // `is_data()` was just checked, so `into_data` cannot fail.
                        let chunk = frame.into_data().unwrap_or_else(|_| unreachable!());
                        let is_eos = me.stream.is_end_stream();
                        let len = chunk.remaining();
                        trace!("send body chunk: {} bytes, eos={}", len, is_eos);

                        if len == 0 {
                            // An empty chunk is sent right away instead of being
                            // buffered: there are no bytes to reserve capacity for.
                            let buf = SendBuf::Buf(chunk);
                            me.body_tx
                                .send_data(buf, is_eos)
                                .map_err(crate::Error::new_body_write)?;

                            if is_eos {
                                return Poll::Ready(Ok(()));
                            }
                            continue;
                        }

                        // Request capacity for the whole chunk and park it; the next
                        // pass of the loop waits for capacity and then sends it.
                        me.body_tx.reserve_capacity(len);
                        *me.buffered_data = Some(Peeked {
                            data: chunk,
                            is_eos,
                        });
                    } else if frame.is_trailers() {
                        // No more data will be sent, so stop requesting capacity.
                        me.body_tx.reserve_capacity(0);
                        me.body_tx
                            .send_trailers(frame.into_trailers().unwrap_or_else(|_| unreachable!()))
                            .map_err(crate::Error::new_body_write)?;
                        return Poll::Ready(Ok(()));
                    } else {
                        // Neither data nor trailers: ignore the frame and loop again.
                        trace!("discarding unknown frame");
                    }
                }
                // The body failed: reset the stream and surface the user's error.
                Some(Err(e)) => return Poll::Ready(Err(me.body_tx.on_user_err(e))),
                None => {
                    // The body ended, but every path that sends an end-of-stream data
                    // frame or trailers returns above, so neither has been sent yet.
                    // Finish the stream with an empty end-of-stream data frame.
                    return Poll::Ready(me.body_tx.send_eos_frame());
                }
            }
        }
    }
}
242
/// Helper methods added to the h2 send stream for finishing or aborting a body.
trait SendStreamExt {
    /// Converts a body error into a `crate::Error` and resets the stream because of it.
    ///
    /// Returns the converted error so the caller can propagate it.
    fn on_user_err<E>(&mut self, err: E) -> crate::Error
    where
        E: Into<Box<dyn std::error::Error + Send + Sync>>;
    /// Sends an empty data frame with the end-of-stream flag set.
    fn send_eos_frame(&mut self) -> crate::Result<()>;
}
249
250impl<B: Buf> SendStreamExt for SendStream<SendBuf<B>> {
251 fn on_user_err<E>(&mut self, err: E) -> crate::Error
252 where
253 E: Into<Box<dyn std::error::Error + Send + Sync>>,
254 {
255 let err = crate::Error::new_user_body(err);
256 debug!("send body user stream error: {}", err);
257 self.send_reset(err.h2_reason());
258 err
259 }
260
261 fn send_eos_frame(&mut self) -> crate::Result<()> {
262 trace!("send body eos");
263 self.send_data(SendBuf::None, true)
264 .map_err(crate::Error::new_body_write)
265 }
266}
267
/// Buffer type written to the HTTP/2 send stream.
///
/// Implements `Buf` by delegating to whichever variant is active.
#[repr(usize)]
enum SendBuf<B> {
    /// A data chunk produced by a body.
    Buf(B),
    /// An owned byte slice with a read position.
    // NOTE(review): not constructed anywhere in the code visible here; presumably used
    // by the client/server submodules -- confirm.
    Cursor(Cursor<Box<[u8]>>),
    /// No payload at all; used for the empty end-of-stream frame.
    None,
}
274
275impl<B: Buf> Buf for SendBuf<B> {
276 #[inline]
277 fn remaining(&self) -> usize {
278 match *self {
279 Self::Buf(ref b) => b.remaining(),
280 Self::Cursor(ref c) => Buf::remaining(c),
281 Self::None => 0,
282 }
283 }
284
285 #[inline]
286 fn chunk(&self) -> &[u8] {
287 match *self {
288 Self::Buf(ref b) => b.chunk(),
289 Self::Cursor(ref c) => c.chunk(),
290 Self::None => &[],
291 }
292 }
293
294 #[inline]
295 fn advance(&mut self, cnt: usize) {
296 match *self {
297 Self::Buf(ref mut b) => b.advance(cnt),
298 Self::Cursor(ref mut c) => c.advance(cnt),
299 Self::None => {}
300 }
301 }
302
303 fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
304 match *self {
305 Self::Buf(ref b) => b.chunks_vectored(dst),
306 Self::Cursor(ref c) => c.chunks_vectored(dst),
307 Self::None => 0,
308 }
309 }
310}