1use std::error::Error as StdError;
2use std::fmt;
3use std::io;
4use std::task::{Context, Poll};
5
6use bytes::{BufMut, Bytes, BytesMut};
7use futures_core::ready;
8use http::{HeaderMap, HeaderName, HeaderValue};
9use http_body::Frame;
10
11use super::io::MemRead;
12use super::role::DEFAULT_MAX_HEADERS;
13use super::DecodedLength;
14
15use self::Kind::{Chunked, Eof, Length};
16
/// Upper bound on the number of chunk-extension bytes a decoder accepts.
///
/// `ChunkedState::read_extension` bumps a running counter for every extension
/// byte and fails with `InvalidData` once the counter reaches this value.
const CHUNKED_EXTENSIONS_LIMIT: u64 = 1024 * 16;

/// Fallback limit, in bytes, for buffered trailer data.
///
/// `Decoder::decode` uses it when no `h1_max_header_size` was configured.
const TRAILER_LIMIT: usize = 1024 * 16;
26
/// Incremental decoder for an incoming message body.
///
/// All state is kept in `kind`, which selects between length-delimited,
/// chunked and read-until-EOF framing.
#[derive(Clone, PartialEq)]
pub(crate) struct Decoder {
    // Framing strategy together with its per-strategy progress.
    kind: Kind,
}
35
/// Framing strategy of a `Decoder`, including its progress so far.
#[derive(Debug, Clone, PartialEq)]
enum Kind {
    /// Fixed-size body; the payload is the number of bytes still to be read.
    Length(u64),
    /// Chunked body driven by the `ChunkedState` state machine.
    Chunked {
        // Current position in the chunked grammar.
        state: ChunkedState,
        // Size of the current chunk while parsing the size line, then the
        // number of bytes of that chunk still to be read.
        chunk_len: u64,
        // Running count of chunk-extension bytes seen (never reset here).
        extensions_cnt: u64,
        // Raw trailer bytes; stays `None` until a trailer byte is seen.
        trailers_buf: Option<BytesMut>,
        // Number of trailer lines terminated so far.
        trailers_cnt: usize,
        // Optional cap on trailer count; `DEFAULT_MAX_HEADERS` when `None`.
        h1_max_headers: Option<usize>,
        // Optional cap on trailer bytes; `TRAILER_LIMIT` when `None`.
        h1_max_header_size: Option<usize>,
    },
    /// Body that runs until the reader reports EOF; the flag becomes `true`
    /// once an empty read has been observed.
    Eof(bool),
}
68
/// Positions of the chunked-body state machine.
///
/// `ChunkedState::step` dispatches each variant to the matching `read_*`
/// helper, which consumes input and returns the next state.
#[derive(Debug, PartialEq, Clone, Copy)]
enum ChunkedState {
    /// Expecting the first hex digit of a chunk size.
    Start,
    /// Inside the hex digits of a chunk size.
    Size,
    /// Whitespace after the chunk size.
    SizeLws,
    /// Inside a chunk extension.
    Extension,
    /// Expecting the LF that ends the chunk size line.
    SizeLf,
    /// Reading chunk payload bytes.
    Body,
    /// Expecting the CR after a chunk payload.
    BodyCr,
    /// Expecting the LF after a chunk payload.
    BodyLf,
    /// Inside a trailer line.
    Trailer,
    /// Expecting the LF that ends a trailer line.
    TrailerLf,
    /// After the last chunk or a trailer line: CR ends the body, any other
    /// byte starts a trailer line.
    EndCr,
    /// Expecting the final LF of the body.
    EndLf,
    /// The body has been fully consumed.
    End,
}
85
86impl Decoder {
87 pub(crate) fn length(x: u64) -> Decoder {
90 Decoder {
91 kind: Kind::Length(x),
92 }
93 }
94
95 pub(crate) fn chunked(
96 h1_max_headers: Option<usize>,
97 h1_max_header_size: Option<usize>,
98 ) -> Decoder {
99 Decoder {
100 kind: Kind::Chunked {
101 state: ChunkedState::new(),
102 chunk_len: 0,
103 extensions_cnt: 0,
104 trailers_buf: None,
105 trailers_cnt: 0,
106 h1_max_headers,
107 h1_max_header_size,
108 },
109 }
110 }
111
112 pub(crate) fn eof() -> Decoder {
113 Decoder {
114 kind: Kind::Eof(false),
115 }
116 }
117
118 pub(super) fn new(
119 len: DecodedLength,
120 h1_max_headers: Option<usize>,
121 h1_max_header_size: Option<usize>,
122 ) -> Self {
123 match len {
124 DecodedLength::CHUNKED => Decoder::chunked(h1_max_headers, h1_max_header_size),
125 DecodedLength::CLOSE_DELIMITED => Decoder::eof(),
126 length => Decoder::length(length.danger_len()),
127 }
128 }
129
130 pub(crate) fn is_eof(&self) -> bool {
133 matches!(
134 self.kind,
135 Length(0)
136 | Chunked {
137 state: ChunkedState::End,
138 ..
139 }
140 | Eof(true)
141 )
142 }
143
144 pub(crate) fn decode<R: MemRead>(
145 &mut self,
146 cx: &mut Context<'_>,
147 body: &mut R,
148 ) -> Poll<Result<Frame<Bytes>, io::Error>> {
149 trace!("decode; state={:?}", self.kind);
150 match self.kind {
151 Length(ref mut remaining) => {
152 if *remaining == 0 {
153 Poll::Ready(Ok(Frame::data(Bytes::new())))
154 } else {
155 let to_read = *remaining as usize;
156 let buf = ready!(body.read_mem(cx, to_read))?;
157 let num = buf.as_ref().len() as u64;
158 if num > *remaining {
159 *remaining = 0;
160 } else if num == 0 {
161 return Poll::Ready(Err(io::Error::new(
162 io::ErrorKind::UnexpectedEof,
163 IncompleteBody,
164 )));
165 } else {
166 *remaining -= num;
167 }
168 Poll::Ready(Ok(Frame::data(buf)))
169 }
170 }
171 Chunked {
172 ref mut state,
173 ref mut chunk_len,
174 ref mut extensions_cnt,
175 ref mut trailers_buf,
176 ref mut trailers_cnt,
177 ref h1_max_headers,
178 ref h1_max_header_size,
179 } => {
180 let h1_max_headers = h1_max_headers.unwrap_or(DEFAULT_MAX_HEADERS);
181 let h1_max_header_size = h1_max_header_size.unwrap_or(TRAILER_LIMIT);
182 loop {
183 let mut buf = None;
184 *state = ready!(state.step(
186 cx,
187 body,
188 StepArgs {
189 chunk_size: chunk_len,
190 extensions_cnt,
191 chunk_buf: &mut buf,
192 trailers_buf,
193 trailers_cnt,
194 max_headers_cnt: h1_max_headers,
195 max_headers_bytes: h1_max_header_size,
196 }
197 ))?;
198 if *state == ChunkedState::End {
199 trace!("end of chunked");
200
201 if trailers_buf.is_some() {
202 trace!("found possible trailers");
203
204 if *trailers_cnt >= h1_max_headers {
206 return Poll::Ready(Err(io::Error::new(
207 io::ErrorKind::InvalidData,
208 "chunk trailers count overflow",
209 )));
210 }
211 match decode_trailers(
212 &mut trailers_buf.take().expect("Trailer is None"),
213 *trailers_cnt,
214 ) {
215 Ok(headers) => {
216 return Poll::Ready(Ok(Frame::trailers(headers)));
217 }
218 Err(e) => {
219 return Poll::Ready(Err(e));
220 }
221 }
222 }
223
224 return Poll::Ready(Ok(Frame::data(Bytes::new())));
225 }
226 if let Some(buf) = buf {
227 return Poll::Ready(Ok(Frame::data(buf)));
228 }
229 }
230 }
231 Eof(ref mut is_eof) => {
232 if *is_eof {
233 Poll::Ready(Ok(Frame::data(Bytes::new())))
234 } else {
235 body.read_mem(cx, 8192).map_ok(|slice| {
239 *is_eof = slice.is_empty();
240 Frame::data(slice)
241 })
242 }
243 }
244 }
245 }
246
247 #[cfg(test)]
248 async fn decode_fut<R: MemRead>(&mut self, body: &mut R) -> Result<Frame<Bytes>, io::Error> {
249 futures_util::future::poll_fn(move |cx| self.decode(cx, body)).await
250 }
251}
252
253impl fmt::Debug for Decoder {
254 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
255 fmt::Debug::fmt(&self.kind, f)
256 }
257}
258
// Reads exactly one byte from `$rdr`.
//
// Propagates `Pending` and read errors to the caller, and returns an
// `UnexpectedEof` error from the enclosing function when the reader yields
// an empty buffer.
macro_rules! byte (
    ($rdr:ident, $cx:expr) => ({
        let chunk = ready!($rdr.read_mem($cx, 1))?;
        match chunk.first() {
            Some(&b) => b,
            None => {
                return Poll::Ready(Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "unexpected EOF during chunk size line",
                )));
            }
        }
    })
);
270
// Unwraps the `Option` produced by a checked arithmetic operation, returning
// an `InvalidData` error from the enclosing function on `None`.
macro_rules! or_overflow {
    ($e:expr) => {
        if let Some(val) = $e {
            val
        } else {
            return Poll::Ready(Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "invalid chunk size: overflow",
            )));
        }
    };
}
282
// Appends `$byte` to the trailer buffer, then returns an `InvalidData` error
// from the enclosing function if the buffer length has reached `$limit`.
macro_rules! put_u8 {
    ($trailers_buf:expr, $byte:expr, $limit:expr) => {{
        let sink = $trailers_buf;
        sink.put_u8($byte);

        if sink.len() >= $limit {
            return Poll::Ready(Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "chunk trailers bytes over limit",
            )));
        }
    }};
}
295
/// Mutable decoder state and limits handed to `ChunkedState::step`.
struct StepArgs<'a> {
    // Chunk size being parsed, then the bytes remaining in the current chunk.
    chunk_size: &'a mut u64,
    // Receives payload bytes when a body step reads some.
    chunk_buf: &'a mut Option<Bytes>,
    // Running count of chunk-extension bytes.
    extensions_cnt: &'a mut u64,
    // Raw trailer bytes collected so far, if any.
    trailers_buf: &'a mut Option<BytesMut>,
    // Number of trailer lines terminated so far.
    trailers_cnt: &'a mut usize,
    // Limit compared against `trailers_cnt`.
    max_headers_cnt: usize,
    // Limit compared against the length of `trailers_buf`.
    max_headers_bytes: usize,
}
305
306impl ChunkedState {
307 fn new() -> ChunkedState {
308 ChunkedState::Start
309 }
310 fn step<R: MemRead>(
311 &self,
312 cx: &mut Context<'_>,
313 body: &mut R,
314 StepArgs {
315 chunk_size,
316 chunk_buf,
317 extensions_cnt,
318 trailers_buf,
319 trailers_cnt,
320 max_headers_cnt,
321 max_headers_bytes,
322 }: StepArgs<'_>,
323 ) -> Poll<Result<ChunkedState, io::Error>> {
324 use self::ChunkedState::*;
325 match *self {
326 Start => ChunkedState::read_start(cx, body, chunk_size),
327 Size => ChunkedState::read_size(cx, body, chunk_size),
328 SizeLws => ChunkedState::read_size_lws(cx, body),
329 Extension => ChunkedState::read_extension(cx, body, extensions_cnt),
330 SizeLf => ChunkedState::read_size_lf(cx, body, *chunk_size),
331 Body => ChunkedState::read_body(cx, body, chunk_size, chunk_buf),
332 BodyCr => ChunkedState::read_body_cr(cx, body),
333 BodyLf => ChunkedState::read_body_lf(cx, body),
334 Trailer => ChunkedState::read_trailer(cx, body, trailers_buf, max_headers_bytes),
335 TrailerLf => ChunkedState::read_trailer_lf(
336 cx,
337 body,
338 trailers_buf,
339 trailers_cnt,
340 max_headers_cnt,
341 max_headers_bytes,
342 ),
343 EndCr => ChunkedState::read_end_cr(cx, body, trailers_buf, max_headers_bytes),
344 EndLf => ChunkedState::read_end_lf(cx, body, trailers_buf, max_headers_bytes),
345 End => Poll::Ready(Ok(ChunkedState::End)),
346 }
347 }
348
349 fn read_start<R: MemRead>(
350 cx: &mut Context<'_>,
351 rdr: &mut R,
352 size: &mut u64,
353 ) -> Poll<Result<ChunkedState, io::Error>> {
354 trace!("Read chunk start");
355
356 let radix = 16;
357 match byte!(rdr, cx) {
358 b @ b'0'..=b'9' => {
359 *size = or_overflow!(size.checked_mul(radix));
360 *size = or_overflow!(size.checked_add((b - b'0') as u64));
361 }
362 b @ b'a'..=b'f' => {
363 *size = or_overflow!(size.checked_mul(radix));
364 *size = or_overflow!(size.checked_add((b + 10 - b'a') as u64));
365 }
366 b @ b'A'..=b'F' => {
367 *size = or_overflow!(size.checked_mul(radix));
368 *size = or_overflow!(size.checked_add((b + 10 - b'A') as u64));
369 }
370 _ => {
371 return Poll::Ready(Err(io::Error::new(
372 io::ErrorKind::InvalidInput,
373 "Invalid chunk size line: missing size digit",
374 )));
375 }
376 }
377
378 Poll::Ready(Ok(ChunkedState::Size))
379 }
380
381 fn read_size<R: MemRead>(
382 cx: &mut Context<'_>,
383 rdr: &mut R,
384 size: &mut u64,
385 ) -> Poll<Result<ChunkedState, io::Error>> {
386 trace!("Read chunk hex size");
387
388 let radix = 16;
389 match byte!(rdr, cx) {
390 b @ b'0'..=b'9' => {
391 *size = or_overflow!(size.checked_mul(radix));
392 *size = or_overflow!(size.checked_add((b - b'0') as u64));
393 }
394 b @ b'a'..=b'f' => {
395 *size = or_overflow!(size.checked_mul(radix));
396 *size = or_overflow!(size.checked_add((b + 10 - b'a') as u64));
397 }
398 b @ b'A'..=b'F' => {
399 *size = or_overflow!(size.checked_mul(radix));
400 *size = or_overflow!(size.checked_add((b + 10 - b'A') as u64));
401 }
402 b'\t' | b' ' => return Poll::Ready(Ok(ChunkedState::SizeLws)),
403 b';' => return Poll::Ready(Ok(ChunkedState::Extension)),
404 b'\r' => return Poll::Ready(Ok(ChunkedState::SizeLf)),
405 _ => {
406 return Poll::Ready(Err(io::Error::new(
407 io::ErrorKind::InvalidInput,
408 "Invalid chunk size line: Invalid Size",
409 )));
410 }
411 }
412 Poll::Ready(Ok(ChunkedState::Size))
413 }
414 fn read_size_lws<R: MemRead>(
415 cx: &mut Context<'_>,
416 rdr: &mut R,
417 ) -> Poll<Result<ChunkedState, io::Error>> {
418 trace!("read_size_lws");
419 match byte!(rdr, cx) {
420 b'\t' | b' ' => Poll::Ready(Ok(ChunkedState::SizeLws)),
422 b';' => Poll::Ready(Ok(ChunkedState::Extension)),
423 b'\r' => Poll::Ready(Ok(ChunkedState::SizeLf)),
424 _ => Poll::Ready(Err(io::Error::new(
425 io::ErrorKind::InvalidInput,
426 "Invalid chunk size linear white space",
427 ))),
428 }
429 }
430 fn read_extension<R: MemRead>(
431 cx: &mut Context<'_>,
432 rdr: &mut R,
433 extensions_cnt: &mut u64,
434 ) -> Poll<Result<ChunkedState, io::Error>> {
435 trace!("read_extension");
436 match byte!(rdr, cx) {
443 b'\r' => Poll::Ready(Ok(ChunkedState::SizeLf)),
444 b'\n' => Poll::Ready(Err(io::Error::new(
445 io::ErrorKind::InvalidData,
446 "invalid chunk extension contains newline",
447 ))),
448 _ => {
449 *extensions_cnt += 1;
450 if *extensions_cnt >= CHUNKED_EXTENSIONS_LIMIT {
451 Poll::Ready(Err(io::Error::new(
452 io::ErrorKind::InvalidData,
453 "chunk extensions over limit",
454 )))
455 } else {
456 Poll::Ready(Ok(ChunkedState::Extension))
457 }
458 } }
460 }
461 fn read_size_lf<R: MemRead>(
462 cx: &mut Context<'_>,
463 rdr: &mut R,
464 size: u64,
465 ) -> Poll<Result<ChunkedState, io::Error>> {
466 trace!("Chunk size is {:?}", size);
467 match byte!(rdr, cx) {
468 b'\n' => {
469 if size == 0 {
470 Poll::Ready(Ok(ChunkedState::EndCr))
471 } else {
472 debug!("incoming chunked header: {0:#X} ({0} bytes)", size);
473 Poll::Ready(Ok(ChunkedState::Body))
474 }
475 }
476 _ => Poll::Ready(Err(io::Error::new(
477 io::ErrorKind::InvalidInput,
478 "Invalid chunk size LF",
479 ))),
480 }
481 }
482
483 fn read_body<R: MemRead>(
484 cx: &mut Context<'_>,
485 rdr: &mut R,
486 rem: &mut u64,
487 buf: &mut Option<Bytes>,
488 ) -> Poll<Result<ChunkedState, io::Error>> {
489 trace!("Chunked read, remaining={:?}", rem);
490
491 let rem_cap = match *rem {
493 r if r > usize::MAX as u64 => usize::MAX,
494 r => r as usize,
495 };
496
497 let to_read = rem_cap;
498 let slice = ready!(rdr.read_mem(cx, to_read))?;
499 let count = slice.len();
500
501 if count == 0 {
502 *rem = 0;
503 return Poll::Ready(Err(io::Error::new(
504 io::ErrorKind::UnexpectedEof,
505 IncompleteBody,
506 )));
507 }
508 *buf = Some(slice);
509 *rem -= count as u64;
510
511 if *rem > 0 {
512 Poll::Ready(Ok(ChunkedState::Body))
513 } else {
514 Poll::Ready(Ok(ChunkedState::BodyCr))
515 }
516 }
517 fn read_body_cr<R: MemRead>(
518 cx: &mut Context<'_>,
519 rdr: &mut R,
520 ) -> Poll<Result<ChunkedState, io::Error>> {
521 match byte!(rdr, cx) {
522 b'\r' => Poll::Ready(Ok(ChunkedState::BodyLf)),
523 _ => Poll::Ready(Err(io::Error::new(
524 io::ErrorKind::InvalidInput,
525 "Invalid chunk body CR",
526 ))),
527 }
528 }
529 fn read_body_lf<R: MemRead>(
530 cx: &mut Context<'_>,
531 rdr: &mut R,
532 ) -> Poll<Result<ChunkedState, io::Error>> {
533 match byte!(rdr, cx) {
534 b'\n' => Poll::Ready(Ok(ChunkedState::Start)),
535 _ => Poll::Ready(Err(io::Error::new(
536 io::ErrorKind::InvalidInput,
537 "Invalid chunk body LF",
538 ))),
539 }
540 }
541
542 fn read_trailer<R: MemRead>(
543 cx: &mut Context<'_>,
544 rdr: &mut R,
545 trailers_buf: &mut Option<BytesMut>,
546 h1_max_header_size: usize,
547 ) -> Poll<Result<ChunkedState, io::Error>> {
548 trace!("read_trailer");
549 let byte = byte!(rdr, cx);
550
551 put_u8!(
552 trailers_buf.as_mut().expect("trailers_buf is None"),
553 byte,
554 h1_max_header_size
555 );
556
557 match byte {
558 b'\r' => Poll::Ready(Ok(ChunkedState::TrailerLf)),
559 _ => Poll::Ready(Ok(ChunkedState::Trailer)),
560 }
561 }
562
563 fn read_trailer_lf<R: MemRead>(
564 cx: &mut Context<'_>,
565 rdr: &mut R,
566 trailers_buf: &mut Option<BytesMut>,
567 trailers_cnt: &mut usize,
568 h1_max_headers: usize,
569 h1_max_header_size: usize,
570 ) -> Poll<Result<ChunkedState, io::Error>> {
571 let byte = byte!(rdr, cx);
572 match byte {
573 b'\n' => {
574 if *trailers_cnt >= h1_max_headers {
575 return Poll::Ready(Err(io::Error::new(
576 io::ErrorKind::InvalidData,
577 "chunk trailers count overflow",
578 )));
579 }
580 *trailers_cnt += 1;
581
582 put_u8!(
583 trailers_buf.as_mut().expect("trailers_buf is None"),
584 byte,
585 h1_max_header_size
586 );
587
588 Poll::Ready(Ok(ChunkedState::EndCr))
589 }
590 _ => Poll::Ready(Err(io::Error::new(
591 io::ErrorKind::InvalidInput,
592 "Invalid trailer end LF",
593 ))),
594 }
595 }
596
597 fn read_end_cr<R: MemRead>(
598 cx: &mut Context<'_>,
599 rdr: &mut R,
600 trailers_buf: &mut Option<BytesMut>,
601 h1_max_header_size: usize,
602 ) -> Poll<Result<ChunkedState, io::Error>> {
603 let byte = byte!(rdr, cx);
604 match byte {
605 b'\r' => {
606 if let Some(trailers_buf) = trailers_buf {
607 put_u8!(trailers_buf, byte, h1_max_header_size);
608 }
609 Poll::Ready(Ok(ChunkedState::EndLf))
610 }
611 byte => {
612 match trailers_buf {
613 None => {
614 let mut buf = BytesMut::with_capacity(64);
616 buf.put_u8(byte);
617 *trailers_buf = Some(buf);
618 }
619 Some(ref mut trailers_buf) => {
620 put_u8!(trailers_buf, byte, h1_max_header_size);
621 }
622 }
623
624 Poll::Ready(Ok(ChunkedState::Trailer))
625 }
626 }
627 }
628 fn read_end_lf<R: MemRead>(
629 cx: &mut Context<'_>,
630 rdr: &mut R,
631 trailers_buf: &mut Option<BytesMut>,
632 h1_max_header_size: usize,
633 ) -> Poll<Result<ChunkedState, io::Error>> {
634 let byte = byte!(rdr, cx);
635 match byte {
636 b'\n' => {
637 if let Some(trailers_buf) = trailers_buf {
638 put_u8!(trailers_buf, byte, h1_max_header_size);
639 }
640 Poll::Ready(Ok(ChunkedState::End))
641 }
642 _ => Poll::Ready(Err(io::Error::new(
643 io::ErrorKind::InvalidInput,
644 "Invalid chunk end LF",
645 ))),
646 }
647 }
648}
649
650fn decode_trailers(buf: &mut BytesMut, count: usize) -> Result<HeaderMap, io::Error> {
652 let mut trailers = HeaderMap::new();
653 let mut headers = vec![httparse::EMPTY_HEADER; count];
654 let res = httparse::parse_headers(buf, &mut headers);
655 match res {
656 Ok(httparse::Status::Complete((_, headers))) => {
657 for header in headers.iter() {
658 use std::convert::TryFrom;
659 let name = match HeaderName::try_from(header.name) {
660 Ok(name) => name,
661 Err(_) => {
662 return Err(io::Error::new(
663 io::ErrorKind::InvalidInput,
664 format!("Invalid header name: {:?}", &header),
665 ));
666 }
667 };
668
669 let value = match HeaderValue::from_bytes(header.value) {
670 Ok(value) => value,
671 Err(_) => {
672 return Err(io::Error::new(
673 io::ErrorKind::InvalidInput,
674 format!("Invalid header value: {:?}", &header),
675 ));
676 }
677 };
678
679 trailers.insert(name, value);
680 }
681
682 Ok(trailers)
683 }
684 Ok(httparse::Status::Partial) => Err(io::Error::new(
685 io::ErrorKind::InvalidInput,
686 "Partial header",
687 )),
688 Err(e) => Err(io::Error::new(io::ErrorKind::InvalidInput, e)),
689 }
690}
691
/// Error payload used when the input ends before the announced body length
/// has been read.
#[derive(Debug)]
struct IncompleteBody;

impl fmt::Display for IncompleteBody {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("end of file before message length reached")
    }
}

impl StdError for IncompleteBody {}
702
703#[cfg(test)]
704mod tests {
705 use super::*;
706 use crate::rt::{Read, ReadBuf};
707 use std::pin::Pin;
708 use std::time::Duration;
709
710 impl MemRead for &[u8] {
711 fn read_mem(&mut self, _: &mut Context<'_>, len: usize) -> Poll<io::Result<Bytes>> {
712 let n = std::cmp::min(len, self.len());
713 if n > 0 {
714 let (a, b) = self.split_at(n);
715 let buf = Bytes::copy_from_slice(a);
716 *self = b;
717 Poll::Ready(Ok(buf))
718 } else {
719 Poll::Ready(Ok(Bytes::new()))
720 }
721 }
722 }
723
724 impl MemRead for &mut (dyn Read + Unpin) {
725 fn read_mem(&mut self, cx: &mut Context<'_>, len: usize) -> Poll<io::Result<Bytes>> {
726 let mut v = vec![0; len];
727 let mut buf = ReadBuf::new(&mut v);
728 ready!(Pin::new(self).poll_read(cx, buf.unfilled())?);
729 Poll::Ready(Ok(Bytes::copy_from_slice(buf.filled())))
730 }
731 }
732
733 impl MemRead for Bytes {
734 fn read_mem(&mut self, _: &mut Context<'_>, len: usize) -> Poll<io::Result<Bytes>> {
735 let n = std::cmp::min(len, self.len());
736 let ret = self.split_to(n);
737 Poll::Ready(Ok(ret))
738 }
739 }
740
741 #[cfg(not(miri))]
752 #[tokio::test]
753 async fn test_read_chunk_size() {
754 use std::io::ErrorKind::{InvalidData, InvalidInput, UnexpectedEof};
755
756 async fn read(s: &str) -> u64 {
757 let mut state = ChunkedState::new();
758 let rdr = &mut s.as_bytes();
759 let mut size = 0;
760 let mut ext_cnt = 0;
761 let mut trailers_cnt = 0;
762 loop {
763 let result = futures_util::future::poll_fn(|cx| {
764 state.step(
765 cx,
766 rdr,
767 StepArgs {
768 chunk_size: &mut size,
769 extensions_cnt: &mut ext_cnt,
770 chunk_buf: &mut None,
771 trailers_buf: &mut None,
772 trailers_cnt: &mut trailers_cnt,
773 max_headers_cnt: DEFAULT_MAX_HEADERS,
774 max_headers_bytes: TRAILER_LIMIT,
775 },
776 )
777 })
778 .await;
779 let desc = format!("read_size failed for {:?}", s);
780 state = result.expect(&desc);
781 if state == ChunkedState::Body || state == ChunkedState::EndCr {
782 break;
783 }
784 }
785 size
786 }
787
788 async fn read_err(s: &str, expected_err: io::ErrorKind) {
789 let mut state = ChunkedState::new();
790 let rdr = &mut s.as_bytes();
791 let mut size = 0;
792 let mut ext_cnt = 0;
793 let mut trailers_cnt = 0;
794 loop {
795 let result = futures_util::future::poll_fn(|cx| {
796 state.step(
797 cx,
798 rdr,
799 StepArgs {
800 chunk_size: &mut size,
801 extensions_cnt: &mut ext_cnt,
802 chunk_buf: &mut None,
803 trailers_buf: &mut None,
804 trailers_cnt: &mut trailers_cnt,
805 max_headers_cnt: DEFAULT_MAX_HEADERS,
806 max_headers_bytes: TRAILER_LIMIT,
807 },
808 )
809 })
810 .await;
811 state = match result {
812 Ok(s) => s,
813 Err(e) => {
814 assert!(
815 expected_err == e.kind(),
816 "Reading {:?}, expected {:?}, but got {:?}",
817 s,
818 expected_err,
819 e.kind()
820 );
821 return;
822 }
823 };
824 if state == ChunkedState::Body || state == ChunkedState::End {
825 panic!("Was Ok. Expected Err for {:?}", s);
826 }
827 }
828 }
829
830 assert_eq!(1, read("1\r\n").await);
831 assert_eq!(1, read("01\r\n").await);
832 assert_eq!(0, read("0\r\n").await);
833 assert_eq!(0, read("00\r\n").await);
834 assert_eq!(10, read("A\r\n").await);
835 assert_eq!(10, read("a\r\n").await);
836 assert_eq!(255, read("Ff\r\n").await);
837 assert_eq!(255, read("Ff \r\n").await);
838 read_err("F\rF", InvalidInput).await;
840 read_err("F", UnexpectedEof).await;
841 read_err("\r\n\r\n", InvalidInput).await;
843 read_err("\r\n", InvalidInput).await;
844 read_err("X\r\n", InvalidInput).await;
846 read_err("1X\r\n", InvalidInput).await;
847 read_err("-\r\n", InvalidInput).await;
848 read_err("-1\r\n", InvalidInput).await;
849 assert_eq!(1, read("1;extension\r\n").await);
851 assert_eq!(10, read("a;ext name=value\r\n").await);
852 assert_eq!(1, read("1;extension;extension2\r\n").await);
853 assert_eq!(1, read("1;;; ;\r\n").await);
854 assert_eq!(2, read("2; extension...\r\n").await);
855 assert_eq!(3, read("3 ; extension=123\r\n").await);
856 assert_eq!(3, read("3 ;\r\n").await);
857 assert_eq!(3, read("3 ; \r\n").await);
858 read_err("1 invalid extension\r\n", InvalidInput).await;
860 read_err("1 A\r\n", InvalidInput).await;
861 read_err("1;no CRLF", UnexpectedEof).await;
862 read_err("1;reject\nnewlines\r\n", InvalidData).await;
863 read_err("f0000000000000003\r\n", InvalidData).await;
865 }
866
867 #[cfg(not(miri))]
868 #[tokio::test]
869 async fn test_read_sized_early_eof() {
870 let mut bytes = &b"foo bar"[..];
871 let mut decoder = Decoder::length(10);
872 assert_eq!(
873 decoder
874 .decode_fut(&mut bytes)
875 .await
876 .unwrap()
877 .data_ref()
878 .unwrap()
879 .len(),
880 7
881 );
882 let e = decoder.decode_fut(&mut bytes).await.unwrap_err();
883 assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof);
884 }
885
886 #[cfg(not(miri))]
887 #[tokio::test]
888 async fn test_read_chunked_early_eof() {
889 let mut bytes = &b"\
890 9\r\n\
891 foo bar\
892 "[..];
893 let mut decoder = Decoder::chunked(None, None);
894 assert_eq!(
895 decoder
896 .decode_fut(&mut bytes)
897 .await
898 .unwrap()
899 .data_ref()
900 .unwrap()
901 .len(),
902 7
903 );
904 let e = decoder.decode_fut(&mut bytes).await.unwrap_err();
905 assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof);
906 }
907
908 #[cfg(not(miri))]
909 #[tokio::test]
910 async fn test_read_chunked_single_read() {
911 let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\n"[..];
912 let buf = Decoder::chunked(None, None)
913 .decode_fut(&mut mock_buf)
914 .await
915 .expect("decode")
916 .into_data()
917 .expect("unknown frame type");
918 assert_eq!(16, buf.len());
919 let result = String::from_utf8(buf.as_ref().to_vec()).expect("decode String");
920 assert_eq!("1234567890abcdef", &result);
921 }
922
923 #[tokio::test]
924 async fn test_read_chunked_with_missing_zero_digit() {
925 let mut mock_buf = &b"1\r\nZ\r\n\r\n\r\n"[..];
927 let mut decoder = Decoder::chunked(None, None);
928 let buf = decoder
929 .decode_fut(&mut mock_buf)
930 .await
931 .expect("decode")
932 .into_data()
933 .expect("unknown frame type");
934 assert_eq!("Z", buf);
935
936 let err = decoder
937 .decode_fut(&mut mock_buf)
938 .await
939 .expect_err("decode 2");
940 assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
941 }
942
943 #[tokio::test]
944 async fn test_read_chunked_extensions_over_limit() {
945 let per_chunk = super::CHUNKED_EXTENSIONS_LIMIT * 2 / 3;
948 let mut scratch = vec![];
949 for _ in 0..2 {
950 scratch.extend(b"1;");
951 scratch.extend(b"x".repeat(per_chunk as usize));
952 scratch.extend(b"\r\nA\r\n");
953 }
954 scratch.extend(b"0\r\n\r\n");
955 let mut mock_buf = Bytes::from(scratch);
956
957 let mut decoder = Decoder::chunked(None, None);
958 let buf1 = decoder
959 .decode_fut(&mut mock_buf)
960 .await
961 .expect("decode1")
962 .into_data()
963 .expect("unknown frame type");
964 assert_eq!(&buf1[..], b"A");
965
966 let err = decoder
967 .decode_fut(&mut mock_buf)
968 .await
969 .expect_err("decode2");
970 assert_eq!(err.kind(), io::ErrorKind::InvalidData);
971 assert_eq!(err.to_string(), "chunk extensions over limit");
972 }
973
974 #[cfg(not(miri))]
975 #[tokio::test]
976 async fn test_read_chunked_trailer_with_missing_lf() {
977 let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\nbad\r\r\n"[..];
978 let mut decoder = Decoder::chunked(None, None);
979 decoder.decode_fut(&mut mock_buf).await.expect("decode");
980 let e = decoder.decode_fut(&mut mock_buf).await.unwrap_err();
981 assert_eq!(e.kind(), io::ErrorKind::InvalidInput);
982 }
983
984 #[cfg(not(miri))]
985 #[tokio::test]
986 async fn test_read_chunked_after_eof() {
987 let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\n\r\n"[..];
988 let mut decoder = Decoder::chunked(None, None);
989
990 let buf = decoder
992 .decode_fut(&mut mock_buf)
993 .await
994 .unwrap()
995 .into_data()
996 .expect("unknown frame type");
997 assert_eq!(16, buf.len());
998 let result = String::from_utf8(buf.as_ref().to_vec()).expect("decode String");
999 assert_eq!("1234567890abcdef", &result);
1000
1001 let buf = decoder
1003 .decode_fut(&mut mock_buf)
1004 .await
1005 .expect("decode")
1006 .into_data()
1007 .expect("unknown frame type");
1008 assert_eq!(0, buf.len());
1009
1010 let buf = decoder
1012 .decode_fut(&mut mock_buf)
1013 .await
1014 .expect("decode")
1015 .into_data()
1016 .expect("unknown frame type");
1017 assert_eq!(0, buf.len());
1018 }
1019
1020 async fn read_async(mut decoder: Decoder, content: &[u8], block_at: usize) -> String {
1023 let mut outs = Vec::new();
1024
1025 let mut ins = crate::common::io::Compat::new(if block_at == 0 {
1026 tokio_test::io::Builder::new()
1027 .wait(Duration::from_millis(10))
1028 .read(content)
1029 .build()
1030 } else {
1031 tokio_test::io::Builder::new()
1032 .read(&content[..block_at])
1033 .wait(Duration::from_millis(10))
1034 .read(&content[block_at..])
1035 .build()
1036 });
1037
1038 let mut ins = &mut ins as &mut (dyn Read + Unpin);
1039
1040 loop {
1041 let buf = decoder
1042 .decode_fut(&mut ins)
1043 .await
1044 .expect("unexpected decode error")
1045 .into_data()
1046 .expect("unexpected frame type");
1047 if buf.is_empty() {
1048 break; }
1050 outs.extend(buf.as_ref());
1051 }
1052
1053 String::from_utf8(outs).expect("decode String")
1054 }
1055
1056 async fn all_async_cases(content: &str, expected: &str, decoder: Decoder) {
1059 let content_len = content.len();
1060 for block_at in 0..content_len {
1061 let actual = read_async(decoder.clone(), content.as_bytes(), block_at).await;
1062 assert_eq!(expected, &actual) }
1064 }
1065
1066 #[cfg(not(miri))]
1067 #[tokio::test]
1068 async fn test_read_length_async() {
1069 let content = "foobar";
1070 all_async_cases(content, content, Decoder::length(content.len() as u64)).await;
1071 }
1072
1073 #[cfg(not(miri))]
1074 #[tokio::test]
1075 async fn test_read_chunked_async() {
1076 let content = "3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n";
1077 let expected = "foobar";
1078 all_async_cases(content, expected, Decoder::chunked(None, None)).await;
1079 }
1080
1081 #[cfg(not(miri))]
1082 #[tokio::test]
1083 async fn test_read_eof_async() {
1084 let content = "foobar";
1085 all_async_cases(content, content, Decoder::eof()).await;
1086 }
1087
1088 #[cfg(all(feature = "nightly", not(miri)))]
1089 #[bench]
1090 fn bench_decode_chunked_1kb(b: &mut test::Bencher) {
1091 let rt = new_runtime();
1092
1093 const LEN: usize = 1024;
1094 let mut vec = Vec::new();
1095 vec.extend(format!("{:x}\r\n", LEN).as_bytes());
1096 vec.extend(&[0; LEN][..]);
1097 vec.extend(b"\r\n");
1098 let content = Bytes::from(vec);
1099
1100 b.bytes = LEN as u64;
1101
1102 b.iter(|| {
1103 let mut decoder = Decoder::chunked(None, None);
1104 rt.block_on(async {
1105 let mut raw = content.clone();
1106 let chunk = decoder
1107 .decode_fut(&mut raw)
1108 .await
1109 .unwrap()
1110 .into_data()
1111 .unwrap();
1112 assert_eq!(chunk.len(), LEN);
1113 });
1114 });
1115 }
1116
1117 #[cfg(all(feature = "nightly", not(miri)))]
1118 #[bench]
1119 fn bench_decode_length_1kb(b: &mut test::Bencher) {
1120 let rt = new_runtime();
1121
1122 const LEN: usize = 1024;
1123 let content = Bytes::from(&[0; LEN][..]);
1124 b.bytes = LEN as u64;
1125
1126 b.iter(|| {
1127 let mut decoder = Decoder::length(LEN as u64);
1128 rt.block_on(async {
1129 let mut raw = content.clone();
1130 let chunk = decoder
1131 .decode_fut(&mut raw)
1132 .await
1133 .unwrap()
1134 .into_data()
1135 .unwrap();
1136 assert_eq!(chunk.len(), LEN);
1137 });
1138 });
1139 }
1140
1141 #[cfg(feature = "nightly")]
1142 fn new_runtime() -> tokio::runtime::Runtime {
1143 tokio::runtime::Builder::new_current_thread()
1144 .enable_all()
1145 .build()
1146 .expect("rt build")
1147 }
1148
1149 #[test]
1150 fn test_decode_trailers() {
1151 let mut buf = BytesMut::new();
1152 buf.extend_from_slice(
1153 b"Expires: Wed, 21 Oct 2015 07:28:00 GMT\r\nX-Stream-Error: failed to decode\r\n\r\n",
1154 );
1155 let headers = decode_trailers(&mut buf, 2).expect("decode_trailers");
1156 assert_eq!(headers.len(), 2);
1157 assert_eq!(
1158 headers.get("Expires").unwrap(),
1159 "Wed, 21 Oct 2015 07:28:00 GMT"
1160 );
1161 assert_eq!(headers.get("X-Stream-Error").unwrap(), "failed to decode");
1162 }
1163
1164 #[tokio::test]
1165 async fn test_trailer_max_headers_enforced() {
1166 let h1_max_headers = 10;
1167 let mut scratch = vec![];
1168 scratch.extend(b"10\r\n1234567890abcdef\r\n0\r\n");
1169 for i in 0..h1_max_headers {
1170 scratch.extend(format!("trailer{}: {}\r\n", i, i).as_bytes());
1171 }
1172 scratch.extend(b"\r\n");
1173 let mut mock_buf = Bytes::from(scratch);
1174
1175 let mut decoder = Decoder::chunked(Some(h1_max_headers), None);
1176
1177 let buf = decoder
1179 .decode_fut(&mut mock_buf)
1180 .await
1181 .unwrap()
1182 .into_data()
1183 .expect("unknown frame type");
1184 assert_eq!(16, buf.len());
1185
1186 let err = decoder
1188 .decode_fut(&mut mock_buf)
1189 .await
1190 .expect_err("trailer fields over limit");
1191 assert_eq!(err.kind(), io::ErrorKind::InvalidData);
1192 }
1193
1194 #[tokio::test]
1195 async fn test_trailer_max_header_size_huge_trailer() {
1196 let max_header_size = 1024;
1197 let mut scratch = vec![];
1198 scratch.extend(b"10\r\n1234567890abcdef\r\n0\r\n");
1199 scratch.extend(format!("huge_trailer: {}\r\n", "x".repeat(max_header_size)).as_bytes());
1200 scratch.extend(b"\r\n");
1201 let mut mock_buf = Bytes::from(scratch);
1202
1203 let mut decoder = Decoder::chunked(None, Some(max_header_size));
1204
1205 let buf = decoder
1207 .decode_fut(&mut mock_buf)
1208 .await
1209 .unwrap()
1210 .into_data()
1211 .expect("unknown frame type");
1212 assert_eq!(16, buf.len());
1213
1214 let err = decoder
1216 .decode_fut(&mut mock_buf)
1217 .await
1218 .expect_err("trailers over limit");
1219 assert_eq!(err.kind(), io::ErrorKind::InvalidData);
1220 }
1221
1222 #[tokio::test]
1223 async fn test_trailer_max_header_size_many_small_trailers() {
1224 let max_headers = 10;
1225 let header_size = 64;
1226 let mut scratch = vec![];
1227 scratch.extend(b"10\r\n1234567890abcdef\r\n0\r\n");
1228
1229 for i in 0..max_headers {
1230 scratch.extend(format!("trailer{}: {}\r\n", i, "x".repeat(header_size)).as_bytes());
1231 }
1232
1233 scratch.extend(b"\r\n");
1234 let mut mock_buf = Bytes::from(scratch);
1235
1236 let mut decoder = Decoder::chunked(None, Some(max_headers * header_size));
1237
1238 let buf = decoder
1240 .decode_fut(&mut mock_buf)
1241 .await
1242 .unwrap()
1243 .into_data()
1244 .expect("unknown frame type");
1245 assert_eq!(16, buf.len());
1246
1247 let err = decoder
1249 .decode_fut(&mut mock_buf)
1250 .await
1251 .expect_err("trailers over limit");
1252 assert_eq!(err.kind(), io::ErrorKind::InvalidData);
1253 }
1254}