Skip to main content

h2/proto/streams/
recv.rs

1use super::*;
2use crate::codec::UserError;
3use crate::frame::{PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE};
4use crate::proto;
5
6use http::{HeaderMap, Request, Response};
7
8use std::cmp::Ordering;
9use std::io;
10use std::task::{Context, Poll, Waker};
11use std::time::Instant;
12
13#[derive(Debug)]
14pub(super) struct Recv {
15    /// Initial window size of remote initiated streams
16    init_window_sz: WindowSize,
17
18    /// Connection level flow control governing received data
19    flow: FlowControl,
20
21    /// Amount of connection window capacity currently used by outstanding streams.
22    in_flight_data: WindowSize,
23
24    /// The lowest stream ID that is still idle
25    next_stream_id: Result<StreamId, StreamIdOverflow>,
26
27    /// The stream ID of the last processed stream
28    last_processed_id: StreamId,
29
30    /// Any streams with a higher ID are ignored.
31    ///
32    /// This starts as MAX, but is lowered when a GOAWAY is received.
33    ///
34    /// > After sending a GOAWAY frame, the sender can discard frames for
35    /// > streams initiated by the receiver with identifiers higher than
36    /// > the identified last stream.
37    max_stream_id: StreamId,
38
39    /// Streams that have pending window updates
40    pending_window_updates: store::Queue<stream::NextWindowUpdate>,
41
42    /// New streams to be accepted
43    pending_accept: store::Queue<stream::NextAccept>,
44
45    /// Locally reset streams that should be reaped when they expire
46    pending_reset_expired: store::Queue<stream::NextResetExpire>,
47
48    /// How long locally reset streams should ignore received frames
49    reset_duration: Duration,
50
51    /// Holds frames that are waiting to be read
52    buffer: Buffer<Event>,
53
54    /// Refused StreamId, this represents a frame that must be sent out.
55    refused: Option<StreamId>,
56
57    /// If push promises are allowed to be received.
58    is_push_enabled: bool,
59
60    /// If extended connect protocol is enabled.
61    is_extended_connect_protocol_enabled: bool,
62}
63
64#[derive(Debug)]
65pub(super) enum Event {
66    Headers(peer::PollMessage),
67    Data(Bytes),
68    Trailers(HeaderMap),
69    InformationalHeaders(peer::PollMessage),
70}
71
72#[derive(Debug)]
73pub(super) enum RecvHeaderBlockError<T> {
74    Oversize(T),
75    State(Error),
76}
77
/// How a remotely initiated stream is being opened; passed to `Recv::open`.
#[derive(Debug)]
pub(crate) enum Open {
    /// The stream is opened by a push promise.
    PushPromise,
    /// The stream is opened by a headers frame.
    Headers,
}
83
84impl Recv {
85    pub fn new(peer: peer::Dyn, config: &Config) -> Self {
86        let next_stream_id = if peer.is_server() { 1 } else { 2 };
87
88        let mut flow = FlowControl::new();
89
90        // connections always have the default window size, regardless of
91        // settings
92        flow.inc_window(DEFAULT_INITIAL_WINDOW_SIZE)
93            .expect("invalid initial remote window size");
94        flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE).unwrap();
95
96        Recv {
97            init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
98            flow,
99            in_flight_data: 0 as WindowSize,
100            next_stream_id: Ok(next_stream_id.into()),
101            pending_window_updates: store::Queue::new(),
102            last_processed_id: StreamId::ZERO,
103            max_stream_id: StreamId::MAX,
104            pending_accept: store::Queue::new(),
105            pending_reset_expired: store::Queue::new(),
106            reset_duration: config.local_reset_duration,
107            buffer: Buffer::new(),
108            refused: None,
109            is_push_enabled: config.local_push_enabled,
110            is_extended_connect_protocol_enabled: config.extended_connect_protocol_enabled,
111        }
112    }
113
114    /// Returns the initial receive window size
115    pub fn init_window_sz(&self) -> WindowSize {
116        self.init_window_sz
117    }
118
119    /// Returns the ID of the last processed stream
120    pub fn last_processed_id(&self) -> StreamId {
121        self.last_processed_id
122    }
123
124    /// Update state reflecting a new, remotely opened stream
125    ///
126    /// Returns the stream state if successful. `None` if refused
127    pub fn open(
128        &mut self,
129        id: StreamId,
130        mode: Open,
131        counts: &mut Counts,
132    ) -> Result<Option<StreamId>, Error> {
133        assert!(self.refused.is_none());
134
135        counts.peer().ensure_can_open(id, mode)?;
136
137        let next_id = self.next_stream_id()?;
138        if id < next_id {
139            proto_err!(conn: "id ({:?}) < next_id ({:?})", id, next_id);
140            return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
141        }
142
143        self.next_stream_id = id.next_id();
144
145        if !counts.can_inc_num_recv_streams() {
146            self.refused = Some(id);
147            return Ok(None);
148        }
149
150        Ok(Some(id))
151    }
152
153    /// Transition the stream state based on receiving headers
154    ///
155    /// The caller ensures that the frame represents headers and not trailers.
156    pub fn recv_headers(
157        &mut self,
158        frame: frame::Headers,
159        stream: &mut store::Ptr,
160        counts: &mut Counts,
161    ) -> Result<(), RecvHeaderBlockError<Option<frame::Headers>>> {
162        tracing::trace!("opening stream; init_window={}", self.init_window_sz);
163        let is_initial = stream.state.recv_open(&frame)?;
164
165        if is_initial {
166            // TODO: be smarter about this logic
167            if frame.stream_id() > self.last_processed_id {
168                self.last_processed_id = frame.stream_id();
169            }
170
171            // Increment the number of concurrent streams
172            counts.inc_num_recv_streams(stream);
173        }
174
175        if !stream.content_length.is_head() {
176            use super::stream::ContentLength;
177            use http::header;
178
179            if let Some(content_length) = frame.fields().get(header::CONTENT_LENGTH) {
180                let content_length = match frame::parse_u64(content_length.as_bytes()) {
181                    Ok(v) => v,
182                    Err(_) => {
183                        proto_err!(stream: "could not parse content-length; stream={:?}", stream.id);
184                        return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
185                    }
186                };
187
188                stream.content_length = ContentLength::Remaining(content_length);
189                // END_STREAM on headers frame with non-zero content-length is malformed.
190                // https://datatracker.ietf.org/doc/html/rfc9113#section-8.1.1
191                if frame.is_end_stream()
192                    && content_length > 0
193                    && frame
194                        .pseudo()
195                        .status
196                        .map_or(true, |status| status != 204 && status != 304)
197                {
198                    proto_err!(stream: "recv_headers with END_STREAM: content-length is not zero; stream={:?};", stream.id);
199                    return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
200                }
201            }
202        }
203
204        if frame.is_over_size() {
205            // A frame is over size if the decoded header block was bigger than
206            // SETTINGS_MAX_HEADER_LIST_SIZE.
207            //
208            // > A server that receives a larger header block than it is willing
209            // > to handle can send an HTTP 431 (Request Header Fields Too
210            // > Large) status code [RFC6585]. A client can discard responses
211            // > that it cannot process.
212            //
213            // So, if peer is a server, we'll send a 431. In either case,
214            // an error is recorded, which will send a REFUSED_STREAM,
215            // since we don't want any of the data frames either.
216            tracing::debug!(
217                "stream error REQUEST_HEADER_FIELDS_TOO_LARGE -- \
218                 recv_headers: frame is over size; stream={:?}",
219                stream.id
220            );
221            return if counts.peer().is_server() && is_initial {
222                let mut res = frame::Headers::new(
223                    stream.id,
224                    frame::Pseudo::response(::http::StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE),
225                    HeaderMap::new(),
226                );
227                res.set_end_stream();
228                Err(RecvHeaderBlockError::Oversize(Some(res)))
229            } else {
230                Err(RecvHeaderBlockError::Oversize(None))
231            };
232        }
233
234        let stream_id = frame.stream_id();
235        let (pseudo, fields) = frame.into_parts();
236
237        if pseudo.protocol.is_some()
238            && counts.peer().is_server()
239            && !self.is_extended_connect_protocol_enabled
240        {
241            proto_err!(stream: "cannot use :protocol if extended connect protocol is disabled; stream={:?}", stream.id);
242            return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
243        }
244
245        if pseudo.status.is_some() && counts.peer().is_server() {
246            proto_err!(stream: "cannot use :status header for requests; stream={:?}", stream.id);
247            return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
248        }
249
250        if !pseudo.is_informational() {
251            let message = counts
252                .peer()
253                .convert_poll_message(pseudo, fields, stream_id)?;
254
255            // Push the frame onto the stream's recv buffer
256            stream
257                .pending_recv
258                .push_back(&mut self.buffer, Event::Headers(message));
259            stream.notify_recv();
260
261            // Only servers can receive a headers frame that initiates the stream.
262            // This is verified in `Streams` before calling this function.
263            if counts.peer().is_server() {
264                // Correctness: never push a stream to `pending_accept` without having the
265                // corresponding headers frame pushed to `stream.pending_recv`.
266                self.pending_accept.push(stream);
267            }
268        } else {
269            // This is an informational response (1xx status code)
270            // Convert to response and store it for polling
271            let message = counts
272                .peer()
273                .convert_poll_message(pseudo, fields, stream_id)?;
274
275            tracing::trace!("Received informational response: stream_id={:?}", stream_id);
276
277            // Push the informational response onto the stream's recv buffer
278            // with a special event type so it can be polled separately
279            stream
280                .pending_recv
281                .push_back(&mut self.buffer, Event::InformationalHeaders(message));
282            stream.notify_recv();
283        }
284
285        Ok(())
286    }
287
288    /// Called by the server to get the request
289    ///
290    /// # Panics
291    ///
292    /// Panics if `stream.pending_recv` has no `Event::Headers` queued.
293    ///
294    pub fn take_request(&mut self, stream: &mut store::Ptr) -> Request<()> {
295        use super::peer::PollMessage::*;
296
297        match stream.pending_recv.pop_front(&mut self.buffer) {
298            Some(Event::Headers(Server(request))) => request,
299            _ => unreachable!("server stream queue must start with Headers"),
300        }
301    }
302
303    /// Called by the client to get pushed response
304    pub fn poll_pushed(
305        &mut self,
306        cx: &Context,
307        stream: &mut store::Ptr,
308    ) -> Poll<Option<Result<(Request<()>, store::Key), proto::Error>>> {
309        use super::peer::PollMessage::*;
310
311        let mut ppp = stream.pending_push_promises.take();
312        let pushed = ppp.pop(stream.store_mut()).map(|mut pushed| {
313            match pushed.pending_recv.pop_front(&mut self.buffer) {
314                Some(Event::Headers(Server(headers))) => (headers, pushed.key()),
315                // When frames are pushed into the queue, it is verified that
316                // the first frame is a HEADERS frame.
317                _ => panic!("Headers not set on pushed stream"),
318            }
319        });
320        stream.pending_push_promises = ppp;
321        if let Some(p) = pushed {
322            Poll::Ready(Some(Ok(p)))
323        } else {
324            let is_open = stream.state.ensure_recv_open()?;
325
326            if is_open {
327                stream.push_task = Some(cx.waker().clone());
328                Poll::Pending
329            } else {
330                Poll::Ready(None)
331            }
332        }
333    }
334
335    /// Called by the client to get the response
336    pub fn poll_response(
337        &mut self,
338        cx: &Context,
339        stream: &mut store::Ptr,
340    ) -> Poll<Result<Response<()>, proto::Error>> {
341        use super::peer::PollMessage::*;
342
343        // Skip over any interim informational headers to find the main response
344        loop {
345            match stream.pending_recv.pop_front(&mut self.buffer) {
346                Some(Event::Headers(Client(response))) => return Poll::Ready(Ok(response)),
347                Some(Event::InformationalHeaders(_)) => {
348                    tracing::trace!("Skipping informational response in poll_response - should be consumed via poll_informational; stream_id={:?}", stream.id);
349                    continue;
350                }
351                Some(_) => panic!("poll_response called after response returned"),
352                None => {
353                    if !stream.state.ensure_recv_open()? {
354                        proto_err!(stream: "poll_response: stream={:?} is not opened;",  stream.id);
355                        return Poll::Ready(Err(Error::library_reset(
356                            stream.id,
357                            Reason::PROTOCOL_ERROR,
358                        )));
359                    }
360
361                    stream.recv_task = Some(cx.waker().clone());
362                    return Poll::Pending;
363                }
364            }
365        }
366    }
367
368    /// Called by the client to get informational responses (1xx status codes)
369    pub fn poll_informational(
370        &mut self,
371        cx: &Context,
372        stream: &mut store::Ptr,
373    ) -> Poll<Option<Result<Response<()>, proto::Error>>> {
374        use super::peer::PollMessage::*;
375
376        // Try to pop the front event and check if it's an informational response
377        // If it's not, we put it back
378        if let Some(event) = stream.pending_recv.pop_front(&mut self.buffer) {
379            match event {
380                Event::Headers(Client(response)) => {
381                    // Final response
382                    stream
383                        .pending_recv
384                        .push_front(&mut self.buffer, Event::Headers(Client(response)));
385                    return Poll::Ready(None);
386                }
387                Event::InformationalHeaders(Client(response)) => {
388                    // Found an informational response, return it
389                    return Poll::Ready(Some(Ok(response)));
390                }
391                other => {
392                    // Not an informational response, put it back at the front
393                    stream.pending_recv.push_front(&mut self.buffer, other);
394                }
395            }
396        }
397
398        // No informational response available at the front
399        if stream.state.ensure_recv_open()? {
400            // Request to get notified once more frames arrive
401            stream.recv_task = Some(cx.waker().clone());
402            Poll::Pending
403        } else {
404            // No more frames will be received
405            Poll::Ready(None)
406        }
407    }
408
409    /// Transition the stream based on receiving trailers
410    pub fn recv_trailers(
411        &mut self,
412        frame: frame::Headers,
413        stream: &mut store::Ptr,
414    ) -> Result<(), Error> {
415        // Transition the state
416        stream.state.recv_close()?;
417
418        if stream.ensure_content_length_zero().is_err() {
419            proto_err!(stream: "recv_trailers: content-length is not zero; stream={:?};",  stream.id);
420            return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
421        }
422
423        let trailers = frame.into_fields();
424
425        // Push the frame onto the stream's recv buffer
426        stream
427            .pending_recv
428            .push_back(&mut self.buffer, Event::Trailers(trailers));
429        stream.notify_recv();
430
431        Ok(())
432    }
433
434    /// Releases capacity of the connection
435    pub fn release_connection_capacity(&mut self, capacity: WindowSize, task: &mut Option<Waker>) {
436        tracing::trace!(
437            "release_connection_capacity; size={}, connection in_flight_data={}",
438            capacity,
439            self.in_flight_data,
440        );
441
442        // Decrement in-flight data
443        self.in_flight_data -= capacity;
444
445        // Assign capacity to connection
446        // TODO: proper error handling
447        let _res = self.flow.assign_capacity(capacity);
448        debug_assert!(_res.is_ok());
449
450        if self.flow.unclaimed_capacity().is_some() {
451            if let Some(task) = task.take() {
452                task.wake();
453            }
454        }
455    }
456
457    /// Releases capacity back to the connection & stream
458    pub fn release_capacity(
459        &mut self,
460        capacity: WindowSize,
461        stream: &mut store::Ptr,
462        task: &mut Option<Waker>,
463    ) -> Result<(), UserError> {
464        tracing::trace!("release_capacity; size={}", capacity);
465
466        if capacity > stream.in_flight_recv_data {
467            return Err(UserError::ReleaseCapacityTooBig);
468        }
469
470        self.release_connection_capacity(capacity, task);
471
472        // Decrement in-flight data
473        stream.in_flight_recv_data -= capacity;
474
475        // Assign capacity to stream
476        // TODO: proper error handling
477        let _res = stream.recv_flow.assign_capacity(capacity);
478        debug_assert!(_res.is_ok());
479
480        if stream.recv_flow.unclaimed_capacity().is_some() {
481            // Queue the stream for sending the WINDOW_UPDATE frame.
482            self.pending_window_updates.push(stream);
483
484            if let Some(task) = task.take() {
485                task.wake();
486            }
487        }
488
489        Ok(())
490    }
491
492    /// Release any unclaimed capacity for a closed stream.
493    pub fn release_closed_capacity(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
494        debug_assert_eq!(stream.ref_count, 0);
495
496        if stream.in_flight_recv_data == 0 {
497            return;
498        }
499
500        tracing::trace!(
501            "auto-release closed stream ({:?}) capacity: {:?}",
502            stream.id,
503            stream.in_flight_recv_data,
504        );
505
506        self.release_connection_capacity(stream.in_flight_recv_data, task);
507        stream.in_flight_recv_data = 0;
508
509        self.clear_recv_buffer(stream);
510    }
511
512    /// Set the "target" connection window size.
513    ///
514    /// By default, all new connections start with 64kb of window size. As
515    /// streams used and release capacity, we will send WINDOW_UPDATEs for the
516    /// connection to bring it back up to the initial "target".
517    ///
518    /// Setting a target means that we will try to tell the peer about
519    /// WINDOW_UPDATEs so the peer knows it has about `target` window to use
520    /// for the whole connection.
521    ///
522    /// The `task` is an optional parked task for the `Connection` that might
523    /// be blocked on needing more window capacity.
524    pub fn set_target_connection_window(
525        &mut self,
526        target: WindowSize,
527        task: &mut Option<Waker>,
528    ) -> Result<(), Reason> {
529        tracing::trace!(
530            "set_target_connection_window; target={}; available={}, reserved={}",
531            target,
532            self.flow.available(),
533            self.in_flight_data,
534        );
535
536        // The current target connection window is our `available` plus any
537        // in-flight data reserved by streams.
538        //
539        // Update the flow controller with the difference between the new
540        // target and the current target.
541        let current = self
542            .flow
543            .available()
544            .add(self.in_flight_data)?
545            .checked_size();
546        if target > current {
547            self.flow.assign_capacity(target - current)?;
548        } else {
549            self.flow.claim_capacity(current - target)?;
550        }
551
552        // If changing the target capacity means we gained a bunch of capacity,
553        // enough that we went over the update threshold, then schedule sending
554        // a connection WINDOW_UPDATE.
555        if self.flow.unclaimed_capacity().is_some() {
556            if let Some(task) = task.take() {
557                task.wake();
558            }
559        }
560        Ok(())
561    }
562
563    pub(crate) fn apply_local_settings(
564        &mut self,
565        settings: &frame::Settings,
566        store: &mut Store,
567    ) -> Result<(), proto::Error> {
568        if let Some(val) = settings.is_extended_connect_protocol_enabled() {
569            self.is_extended_connect_protocol_enabled = val;
570        }
571
572        if let Some(target) = settings.initial_window_size() {
573            let old_sz = self.init_window_sz;
574            self.init_window_sz = target;
575
576            tracing::trace!("update_initial_window_size; new={}; old={}", target, old_sz,);
577
578            // Per RFC 7540 ยง6.9.2:
579            //
580            // In addition to changing the flow-control window for streams that are
581            // not yet active, a SETTINGS frame can alter the initial flow-control
582            // window size for streams with active flow-control windows (that is,
583            // streams in the "open" or "half-closed (remote)" state). When the
584            // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust
585            // the size of all stream flow-control windows that it maintains by the
586            // difference between the new value and the old value.
587            //
588            // A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available
589            // space in a flow-control window to become negative. A sender MUST
590            // track the negative flow-control window and MUST NOT send new
591            // flow-controlled frames until it receives WINDOW_UPDATE frames that
592            // cause the flow-control window to become positive.
593
594            match target.cmp(&old_sz) {
595                Ordering::Less => {
596                    // We must decrease the (local) window on every open stream.
597                    let dec = old_sz - target;
598                    tracing::trace!("decrementing all windows; dec={}", dec);
599
600                    store.try_for_each(|mut stream| {
601                        stream
602                            .recv_flow
603                            .dec_recv_window(dec)
604                            .map_err(proto::Error::library_go_away)?;
605                        Ok::<_, proto::Error>(())
606                    })?;
607                }
608                Ordering::Greater => {
609                    // We must increase the (local) window on every open stream.
610                    let inc = target - old_sz;
611                    tracing::trace!("incrementing all windows; inc={}", inc);
612                    store.try_for_each(|mut stream| {
613                        // XXX: Shouldn't the peer have already noticed our
614                        // overflow and sent us a GOAWAY?
615                        stream
616                            .recv_flow
617                            .inc_window(inc)
618                            .map_err(proto::Error::library_go_away)?;
619                        stream
620                            .recv_flow
621                            .assign_capacity(inc)
622                            .map_err(proto::Error::library_go_away)?;
623                        Ok::<_, proto::Error>(())
624                    })?;
625                }
626                Ordering::Equal => (),
627            }
628        }
629
630        Ok(())
631    }
632
633    pub fn is_end_stream(&self, stream: &store::Ptr) -> bool {
634        if !stream.state.is_recv_end_stream() {
635            return false;
636        }
637
638        stream.pending_recv.is_empty()
639    }
640
641    pub fn recv_data(&mut self, frame: frame::Data, stream: &mut store::Ptr) -> Result<(), Error> {
642        // could include padding
643        let sz = frame.flow_controlled_len();
644
645        // This should have been enforced at the codec::FramedRead layer, so
646        // this is just a sanity check.
647        assert!(sz <= MAX_WINDOW_SIZE as usize);
648
649        let sz = sz as WindowSize;
650
651        let is_ignoring_frame = stream.state.is_local_error();
652
653        if !is_ignoring_frame && !stream.state.is_recv_streaming() {
654            // TODO: There are cases where this can be a stream error of
655            // STREAM_CLOSED instead...
656
657            // Receiving a DATA frame when not expecting one is a protocol
658            // error.
659            proto_err!(conn: "unexpected DATA frame; stream={:?}", stream.id);
660            return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
661        }
662
663        tracing::trace!(
664            "recv_data; size={}; connection={}; stream={}",
665            sz,
666            self.flow.window_size(),
667            stream.recv_flow.window_size()
668        );
669
670        if is_ignoring_frame {
671            tracing::trace!(
672                "recv_data; frame ignored on locally reset {:?} for some time",
673                stream.id,
674            );
675            return self.ignore_data(sz);
676        }
677
678        // Ensure that there is enough capacity on the connection before acting
679        // on the stream.
680        self.consume_connection_window(sz)?;
681
682        if stream.recv_flow.window_size() < sz {
683            // http://httpwg.org/specs/rfc7540.html#WINDOW_UPDATE
684            // > A receiver MAY respond with a stream error (Section 5.4.2) or
685            // > connection error (Section 5.4.1) of type FLOW_CONTROL_ERROR if
686            // > it is unable to accept a frame.
687            //
688            // So, for violating the **stream** window, we can send either a
689            // stream or connection error. We've opted to send a stream
690            // error.
691            return Err(Error::library_reset(stream.id, Reason::FLOW_CONTROL_ERROR));
692        }
693
694        // use payload len, padding doesn't count for content-length
695        if stream.dec_content_length(frame.payload().len()).is_err() {
696            proto_err!(stream:
697                "recv_data: content-length overflow; stream={:?}; len={:?}",
698                stream.id,
699                frame.payload().len(),
700            );
701            return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
702        }
703
704        if frame.is_end_stream() {
705            if stream.ensure_content_length_zero().is_err() {
706                proto_err!(stream:
707                    "recv_data: content-length underflow; stream={:?}; len={:?}",
708                    stream.id,
709                    frame.payload().len(),
710                );
711                return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
712            }
713
714            if stream.state.recv_close().is_err() {
715                proto_err!(conn: "recv_data: failed to transition to closed state; stream={:?}", stream.id);
716                return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
717            }
718        }
719
720        // Received a frame, but no one cared about it. fix issue#648
721        if !stream.is_recv {
722            tracing::trace!(
723                "recv_data; frame ignored on stream release {:?} for some time",
724                stream.id,
725            );
726            self.release_connection_capacity(sz, &mut None);
727            return Ok(());
728        }
729
730        // Update stream level flow control
731        stream
732            .recv_flow
733            .send_data(sz)
734            .map_err(proto::Error::library_go_away)?;
735
736        // Track the data as in-flight
737        stream.in_flight_recv_data += sz;
738
739        // Auto-release padding overhead (pad_len field + padding bytes),
740        // since the user only sees the data payload via `payload()`.
741        let padding = (frame.flow_controlled_len() - frame.payload().len()) as WindowSize;
742        if padding > 0 {
743            tracing::trace!(
744                "recv_data; auto-releasing padding of {:?} for {:?}",
745                padding,
746                stream.id,
747            );
748            let _res = self.release_capacity(padding, stream, &mut None);
749            // cannot fail, we JUST added more in_flight data above.
750            debug_assert!(_res.is_ok());
751        }
752
753        let event = Event::Data(frame.into_payload());
754
755        // Push the frame onto the recv buffer
756        stream.pending_recv.push_back(&mut self.buffer, event);
757        stream.notify_recv();
758
759        Ok(())
760    }
761
762    pub fn ignore_data(&mut self, sz: WindowSize) -> Result<(), Error> {
763        // Ensure that there is enough capacity on the connection...
764        self.consume_connection_window(sz)?;
765
766        // Since we are ignoring this frame,
767        // we aren't returning the frame to the user. That means they
768        // have no way to release the capacity back to the connection. So
769        // we have to release it automatically.
770        //
771        // This call doesn't send a WINDOW_UPDATE immediately, just marks
772        // the capacity as available to be reclaimed. When the available
773        // capacity meets a threshold, a WINDOW_UPDATE is then sent.
774        self.release_connection_capacity(sz, &mut None);
775        Ok(())
776    }
777
778    pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), Error> {
779        if self.flow.window_size() < sz {
780            tracing::debug!(
781                "connection error FLOW_CONTROL_ERROR -- window_size ({:?}) < sz ({:?});",
782                self.flow.window_size(),
783                sz,
784            );
785            return Err(Error::library_go_away(Reason::FLOW_CONTROL_ERROR));
786        }
787
788        // Update connection level flow control
789        self.flow.send_data(sz).map_err(Error::library_go_away)?;
790
791        // Track the data as in-flight
792        self.in_flight_data += sz;
793        Ok(())
794    }
795
796    pub fn recv_push_promise(
797        &mut self,
798        frame: frame::PushPromise,
799        stream: &mut store::Ptr,
800    ) -> Result<(), Error> {
801        stream.state.reserve_remote()?;
802        if frame.is_over_size() {
803            // A frame is over size if the decoded header block was bigger than
804            // SETTINGS_MAX_HEADER_LIST_SIZE.
805            //
806            // > A server that receives a larger header block than it is willing
807            // > to handle can send an HTTP 431 (Request Header Fields Too
808            // > Large) status code [RFC6585]. A client can discard responses
809            // > that it cannot process.
810            //
811            // So, if peer is a server, we'll send a 431. In either case,
812            // an error is recorded, which will send a PROTOCOL_ERROR,
813            // since we don't want any of the data frames either.
814            tracing::debug!(
815                "stream error PROTOCOL_ERROR -- recv_push_promise: \
816                 headers frame is over size; promised_id={:?};",
817                frame.promised_id(),
818            );
819            return Err(Error::library_reset(
820                frame.promised_id(),
821                Reason::PROTOCOL_ERROR,
822            ));
823        }
824
825        let promised_id = frame.promised_id();
826        let (pseudo, fields) = frame.into_parts();
827        let req = crate::server::Peer::convert_poll_message(pseudo, fields, promised_id)?;
828
829        if let Err(e) = frame::PushPromise::validate_request(&req) {
830            use PushPromiseHeaderError::*;
831            match e {
832                NotSafeAndCacheable => proto_err!(
833                    stream:
834                    "recv_push_promise: method {} is not safe and cacheable; promised_id={:?}",
835                    req.method(),
836                    promised_id,
837                ),
838                InvalidContentLength(e) => proto_err!(
839                    stream:
840                    "recv_push_promise; promised request has invalid content-length {:?}; promised_id={:?}",
841                    e,
842                    promised_id,
843                ),
844            }
845            return Err(Error::library_reset(promised_id, Reason::PROTOCOL_ERROR));
846        }
847
848        use super::peer::PollMessage::*;
849        stream
850            .pending_recv
851            .push_back(&mut self.buffer, Event::Headers(Server(req)));
852        stream.notify_recv();
853        stream.notify_push();
854        Ok(())
855    }
856
857    /// Ensures that `id` is not in the `Idle` state.
858    pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> {
859        if let Ok(next) = self.next_stream_id {
860            if id >= next {
861                tracing::debug!(
862                    "stream ID implicitly closed, PROTOCOL_ERROR; stream={:?}",
863                    id
864                );
865                return Err(Reason::PROTOCOL_ERROR);
866            }
867        }
868        // if next_stream_id is overflowed, that's ok.
869
870        Ok(())
871    }
872
873    /// Handle remote sending an explicit RST_STREAM.
874    pub fn recv_reset(
875        &mut self,
876        frame: frame::Reset,
877        stream: &mut Stream,
878        counts: &mut Counts,
879    ) -> Result<(), Error> {
880        // Reseting a stream that the user hasn't accepted is possible,
881        // but should be done with care. These streams will continue
882        // to take up memory in the accept queue, but will no longer be
883        // counted as "concurrent" streams.
884        //
885        // So, we have a separate limit for these.
886        //
887        // See https://github.com/hyperium/hyper/issues/2877
888        if stream.is_pending_accept {
889            if counts.can_inc_num_remote_reset_streams() {
890                counts.inc_num_remote_reset_streams();
891            } else {
892                tracing::warn!(
893                    "recv_reset; remotely-reset pending-accept streams reached limit ({:?})",
894                    counts.max_remote_reset_streams(),
895                );
896                return Err(Error::library_go_away_data(
897                    Reason::ENHANCE_YOUR_CALM,
898                    "too_many_resets",
899                ));
900            }
901        }
902
903        // Notify the stream
904        stream.state.recv_reset(frame, stream.is_pending_send);
905
906        stream.notify_send();
907        stream.notify_recv();
908        stream.notify_push();
909
910        Ok(())
911    }
912
913    /// Handle a connection-level error
914    pub fn handle_error(&mut self, err: &proto::Error, stream: &mut Stream) {
915        // Receive an error
916        stream.state.handle_error(err);
917
918        // If a receiver is waiting, notify it
919        stream.notify_send();
920        stream.notify_recv();
921        stream.notify_push();
922    }
923
924    pub fn go_away(&mut self, last_processed_id: StreamId) {
925        assert!(self.max_stream_id >= last_processed_id);
926        self.max_stream_id = last_processed_id;
927    }
928
929    pub fn recv_eof(&mut self, stream: &mut Stream) {
930        stream.state.recv_eof();
931        stream.notify_send();
932        stream.notify_recv();
933        stream.notify_push();
934    }
935
936    pub(super) fn clear_recv_buffer(&mut self, stream: &mut Stream) {
937        while stream.pending_recv.pop_front(&mut self.buffer).is_some() {
938            // drop it
939        }
940    }
941
942    /// Get the max ID of streams we can receive.
943    ///
944    /// This gets lowered if we send a GOAWAY frame.
945    pub fn max_stream_id(&self) -> StreamId {
946        self.max_stream_id
947    }
948
949    pub fn next_stream_id(&self) -> Result<StreamId, Error> {
950        if let Ok(id) = self.next_stream_id {
951            Ok(id)
952        } else {
953            Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
954        }
955    }
956
957    pub fn may_have_created_stream(&self, id: StreamId) -> bool {
958        if let Ok(next_id) = self.next_stream_id {
959            // Peer::is_local_init should have been called beforehand
960            debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated(),);
961            id < next_id
962        } else {
963            true
964        }
965    }
966
967    pub(super) fn maybe_reset_next_stream_id(&mut self, id: StreamId) {
968        if let Ok(next_id) = self.next_stream_id {
969            // !Peer::is_local_init should have been called beforehand
970            debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated());
971            if id >= next_id {
972                self.next_stream_id = id.next_id();
973            }
974        }
975    }
976
977    /// Returns true if the remote peer can reserve a stream with the given ID.
978    pub fn ensure_can_reserve(&self) -> Result<(), Error> {
979        if !self.is_push_enabled {
980            proto_err!(conn: "recv_push_promise: push is disabled");
981            return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
982        }
983
984        Ok(())
985    }
986
987    /// Add a locally reset stream to queue to be eventually reaped.
988    pub fn enqueue_reset_expiration(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
989        if !stream.state.is_local_error() || stream.is_pending_reset_expiration() {
990            return;
991        }
992
993        if counts.can_inc_num_reset_streams() {
994            counts.inc_num_reset_streams();
995            tracing::trace!("enqueue_reset_expiration; added {:?}", stream.id);
996            self.pending_reset_expired.push(stream);
997        } else {
998            tracing::trace!(
999                "enqueue_reset_expiration; dropped {:?}, over max_concurrent_reset_streams",
1000                stream.id
1001            );
1002        }
1003    }
1004
1005    /// Send any pending refusals.
1006    pub fn send_pending_refusal<T, B>(
1007        &mut self,
1008        cx: &mut Context,
1009        dst: &mut Codec<T, Prioritized<B>>,
1010    ) -> Poll<io::Result<()>>
1011    where
1012        T: AsyncWrite + Unpin,
1013        B: Buf,
1014    {
1015        if let Some(stream_id) = self.refused {
1016            ready!(dst.poll_ready(cx))?;
1017
1018            // Create the RST_STREAM frame
1019            let frame = frame::Reset::new(stream_id, Reason::REFUSED_STREAM);
1020
1021            // Buffer the frame
1022            dst.buffer(frame.into()).expect("invalid RST_STREAM frame");
1023        }
1024
1025        self.refused = None;
1026
1027        Poll::Ready(Ok(()))
1028    }
1029
1030    pub fn clear_expired_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) {
1031        if !self.pending_reset_expired.is_empty() {
1032            let now = Instant::now();
1033            let reset_duration = self.reset_duration;
1034            while let Some(stream) = self.pending_reset_expired.pop_if(store, |stream| {
1035                let reset_at = stream.reset_at.expect("reset_at must be set if in queue");
1036                // rust-lang/rust#86470 tracks a bug in the standard library where `Instant`
1037                // subtraction can panic (because, on some platforms, `Instant` isn't actually
1038                // monotonic). We use a saturating operation to avoid this panic here.
1039                now.saturating_duration_since(reset_at) > reset_duration
1040            }) {
1041                counts.transition_after(stream, true);
1042            }
1043        }
1044    }
1045
1046    pub fn clear_queues(
1047        &mut self,
1048        clear_pending_accept: bool,
1049        store: &mut Store,
1050        counts: &mut Counts,
1051    ) {
1052        self.clear_stream_window_update_queue(store, counts);
1053        self.clear_all_reset_streams(store, counts);
1054
1055        if clear_pending_accept {
1056            self.clear_all_pending_accept(store, counts);
1057        }
1058    }
1059
1060    fn clear_stream_window_update_queue(&mut self, store: &mut Store, counts: &mut Counts) {
1061        while let Some(stream) = self.pending_window_updates.pop(store) {
1062            counts.transition(stream, |_, stream| {
1063                tracing::trace!("clear_stream_window_update_queue; stream={:?}", stream.id);
1064            })
1065        }
1066    }
1067
1068    /// Called on EOF
1069    fn clear_all_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) {
1070        while let Some(stream) = self.pending_reset_expired.pop(store) {
1071            counts.transition_after(stream, true);
1072        }
1073    }
1074
1075    fn clear_all_pending_accept(&mut self, store: &mut Store, counts: &mut Counts) {
1076        while let Some(stream) = self.pending_accept.pop(store) {
1077            counts.transition_after(stream, false);
1078        }
1079    }
1080
1081    pub fn poll_complete<T, B>(
1082        &mut self,
1083        cx: &mut Context,
1084        store: &mut Store,
1085        counts: &mut Counts,
1086        dst: &mut Codec<T, Prioritized<B>>,
1087    ) -> Poll<io::Result<()>>
1088    where
1089        T: AsyncWrite + Unpin,
1090        B: Buf,
1091    {
1092        // Send any pending connection level window updates
1093        ready!(self.send_connection_window_update(cx, dst))?;
1094
1095        // Send any pending stream level window updates
1096        ready!(self.send_stream_window_updates(cx, store, counts, dst))?;
1097
1098        Poll::Ready(Ok(()))
1099    }
1100
1101    /// Send connection level window update
1102    fn send_connection_window_update<T, B>(
1103        &mut self,
1104        cx: &mut Context,
1105        dst: &mut Codec<T, Prioritized<B>>,
1106    ) -> Poll<io::Result<()>>
1107    where
1108        T: AsyncWrite + Unpin,
1109        B: Buf,
1110    {
1111        if let Some(incr) = self.flow.unclaimed_capacity() {
1112            let frame = frame::WindowUpdate::new(StreamId::zero(), incr);
1113
1114            // Ensure the codec has capacity
1115            ready!(dst.poll_ready(cx))?;
1116
1117            // Buffer the WINDOW_UPDATE frame
1118            dst.buffer(frame.into())
1119                .expect("invalid WINDOW_UPDATE frame");
1120
1121            // Update flow control
1122            self.flow
1123                .inc_window(incr)
1124                .expect("unexpected flow control state");
1125        }
1126
1127        Poll::Ready(Ok(()))
1128    }
1129
1130    /// Send stream level window update
1131    pub fn send_stream_window_updates<T, B>(
1132        &mut self,
1133        cx: &mut Context,
1134        store: &mut Store,
1135        counts: &mut Counts,
1136        dst: &mut Codec<T, Prioritized<B>>,
1137    ) -> Poll<io::Result<()>>
1138    where
1139        T: AsyncWrite + Unpin,
1140        B: Buf,
1141    {
1142        loop {
1143            // Ensure the codec has capacity
1144            ready!(dst.poll_ready(cx))?;
1145
1146            // Get the next stream
1147            let stream = match self.pending_window_updates.pop(store) {
1148                Some(stream) => stream,
1149                None => return Poll::Ready(Ok(())),
1150            };
1151
1152            counts.transition(stream, |_, stream| {
1153                tracing::trace!("pending_window_updates -- pop; stream={:?}", stream.id);
1154                debug_assert!(!stream.is_pending_window_update);
1155
1156                if !stream.state.is_recv_streaming() {
1157                    // No need to send window updates on the stream if the stream is
1158                    // no longer receiving data.
1159                    //
1160                    // TODO: is this correct? We could possibly send a window
1161                    // update on a ReservedRemote stream if we already know
1162                    // we want to stream the data faster...
1163                    return;
1164                }
1165
1166                // TODO: de-dup
1167                if let Some(incr) = stream.recv_flow.unclaimed_capacity() {
1168                    // Create the WINDOW_UPDATE frame
1169                    let frame = frame::WindowUpdate::new(stream.id, incr);
1170
1171                    // Buffer it
1172                    dst.buffer(frame.into())
1173                        .expect("invalid WINDOW_UPDATE frame");
1174
1175                    // Update flow control
1176                    stream
1177                        .recv_flow
1178                        .inc_window(incr)
1179                        .expect("unexpected flow control state");
1180                }
1181            })
1182        }
1183    }
1184
1185    pub fn next_incoming(&mut self, store: &mut Store) -> Option<store::Key> {
1186        self.pending_accept.pop(store).map(|ptr| ptr.key())
1187    }
1188
1189    pub fn poll_data(
1190        &mut self,
1191        cx: &Context,
1192        stream: &mut Stream,
1193    ) -> Poll<Option<Result<Bytes, proto::Error>>> {
1194        match stream.pending_recv.pop_front(&mut self.buffer) {
1195            Some(Event::Data(payload)) => Poll::Ready(Some(Ok(payload))),
1196            Some(event) => {
1197                // Frame is trailer
1198                stream.pending_recv.push_front(&mut self.buffer, event);
1199
1200                // Notify the recv task. This is done just in case
1201                // `poll_trailers` was called.
1202                //
1203                // It is very likely that `notify_recv` will just be a no-op (as
1204                // the task will be None), so this isn't really much of a
1205                // performance concern. It also means we don't have to track
1206                // state to see if `poll_trailers` was called before `poll_data`
1207                // returned `None`.
1208                stream.notify_recv();
1209
1210                // No more data frames
1211                Poll::Ready(None)
1212            }
1213            None => self.schedule_recv(cx, stream),
1214        }
1215    }
1216
1217    pub fn poll_trailers(
1218        &mut self,
1219        cx: &Context,
1220        stream: &mut Stream,
1221    ) -> Poll<Option<Result<HeaderMap, proto::Error>>> {
1222        match stream.pending_recv.pop_front(&mut self.buffer) {
1223            Some(Event::Trailers(trailers)) => Poll::Ready(Some(Ok(trailers))),
1224            Some(event) => {
1225                // Frame is not trailers.. not ready to poll trailers yet.
1226                stream.pending_recv.push_front(&mut self.buffer, event);
1227
1228                Poll::Pending
1229            }
1230            None => self.schedule_recv(cx, stream),
1231        }
1232    }
1233
1234    fn schedule_recv<T>(
1235        &mut self,
1236        cx: &Context,
1237        stream: &mut Stream,
1238    ) -> Poll<Option<Result<T, proto::Error>>> {
1239        if stream.state.ensure_recv_open()? {
1240            // Request to get notified once more frames arrive
1241            stream.recv_task = Some(cx.waker().clone());
1242            Poll::Pending
1243        } else {
1244            // No more frames will be received
1245            Poll::Ready(None)
1246        }
1247    }
1248}
1249
1250// ===== impl Open =====
1251
1252impl Open {
1253    pub fn is_push_promise(&self) -> bool {
1254        matches!(*self, Self::PushPromise)
1255    }
1256}
1257
1258// ===== impl RecvHeaderBlockError =====
1259
1260impl<T> From<Error> for RecvHeaderBlockError<T> {
1261    fn from(err: Error) -> Self {
1262        RecvHeaderBlockError::State(err)
1263    }
1264}