1use super::*;
2use crate::codec::UserError;
3use crate::frame::{PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE};
4use crate::proto;
5
6use http::{HeaderMap, Request, Response};
7
8use std::cmp::Ordering;
9use std::io;
10use std::task::{Context, Poll, Waker};
11use std::time::Instant;
12
13#[derive(Debug)]
14pub(super) struct Recv {
15 init_window_sz: WindowSize,
17
18 flow: FlowControl,
20
21 in_flight_data: WindowSize,
23
24 next_stream_id: Result<StreamId, StreamIdOverflow>,
26
27 last_processed_id: StreamId,
29
30 max_stream_id: StreamId,
38
39 pending_window_updates: store::Queue<stream::NextWindowUpdate>,
41
42 pending_accept: store::Queue<stream::NextAccept>,
44
45 pending_reset_expired: store::Queue<stream::NextResetExpire>,
47
48 reset_duration: Duration,
50
51 buffer: Buffer<Event>,
53
54 refused: Option<StreamId>,
56
57 is_push_enabled: bool,
59
60 is_extended_connect_protocol_enabled: bool,
62}
63
64#[derive(Debug)]
65pub(super) enum Event {
66 Headers(peer::PollMessage),
67 Data(Bytes),
68 Trailers(HeaderMap),
69 InformationalHeaders(peer::PollMessage),
70}
71
72#[derive(Debug)]
73pub(super) enum RecvHeaderBlockError<T> {
74 Oversize(T),
75 State(Error),
76}
77
78#[derive(Debug)]
79pub(crate) enum Open {
80 PushPromise,
81 Headers,
82}
83
84impl Recv {
85 pub fn new(peer: peer::Dyn, config: &Config) -> Self {
86 let next_stream_id = if peer.is_server() { 1 } else { 2 };
87
88 let mut flow = FlowControl::new();
89
90 flow.inc_window(DEFAULT_INITIAL_WINDOW_SIZE)
93 .expect("invalid initial remote window size");
94 flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE).unwrap();
95
96 Recv {
97 init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
98 flow,
99 in_flight_data: 0 as WindowSize,
100 next_stream_id: Ok(next_stream_id.into()),
101 pending_window_updates: store::Queue::new(),
102 last_processed_id: StreamId::ZERO,
103 max_stream_id: StreamId::MAX,
104 pending_accept: store::Queue::new(),
105 pending_reset_expired: store::Queue::new(),
106 reset_duration: config.local_reset_duration,
107 buffer: Buffer::new(),
108 refused: None,
109 is_push_enabled: config.local_push_enabled,
110 is_extended_connect_protocol_enabled: config.extended_connect_protocol_enabled,
111 }
112 }
113
114 pub fn init_window_sz(&self) -> WindowSize {
116 self.init_window_sz
117 }
118
119 pub fn last_processed_id(&self) -> StreamId {
121 self.last_processed_id
122 }
123
124 pub fn open(
128 &mut self,
129 id: StreamId,
130 mode: Open,
131 counts: &mut Counts,
132 ) -> Result<Option<StreamId>, Error> {
133 assert!(self.refused.is_none());
134
135 counts.peer().ensure_can_open(id, mode)?;
136
137 let next_id = self.next_stream_id()?;
138 if id < next_id {
139 proto_err!(conn: "id ({:?}) < next_id ({:?})", id, next_id);
140 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
141 }
142
143 self.next_stream_id = id.next_id();
144
145 if !counts.can_inc_num_recv_streams() {
146 self.refused = Some(id);
147 return Ok(None);
148 }
149
150 Ok(Some(id))
151 }
152
153 pub fn recv_headers(
157 &mut self,
158 frame: frame::Headers,
159 stream: &mut store::Ptr,
160 counts: &mut Counts,
161 ) -> Result<(), RecvHeaderBlockError<Option<frame::Headers>>> {
162 tracing::trace!("opening stream; init_window={}", self.init_window_sz);
163 let is_initial = stream.state.recv_open(&frame)?;
164
165 if is_initial {
166 if frame.stream_id() > self.last_processed_id {
168 self.last_processed_id = frame.stream_id();
169 }
170
171 counts.inc_num_recv_streams(stream);
173 }
174
175 if !stream.content_length.is_head() {
176 use super::stream::ContentLength;
177 use http::header;
178
179 if let Some(content_length) = frame.fields().get(header::CONTENT_LENGTH) {
180 let content_length = match frame::parse_u64(content_length.as_bytes()) {
181 Ok(v) => v,
182 Err(_) => {
183 proto_err!(stream: "could not parse content-length; stream={:?}", stream.id);
184 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
185 }
186 };
187
188 stream.content_length = ContentLength::Remaining(content_length);
189 if frame.is_end_stream()
192 && content_length > 0
193 && frame
194 .pseudo()
195 .status
196 .map_or(true, |status| status != 204 && status != 304)
197 {
198 proto_err!(stream: "recv_headers with END_STREAM: content-length is not zero; stream={:?};", stream.id);
199 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
200 }
201 }
202 }
203
204 if frame.is_over_size() {
205 tracing::debug!(
217 "stream error REQUEST_HEADER_FIELDS_TOO_LARGE -- \
218 recv_headers: frame is over size; stream={:?}",
219 stream.id
220 );
221 return if counts.peer().is_server() && is_initial {
222 let mut res = frame::Headers::new(
223 stream.id,
224 frame::Pseudo::response(::http::StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE),
225 HeaderMap::new(),
226 );
227 res.set_end_stream();
228 Err(RecvHeaderBlockError::Oversize(Some(res)))
229 } else {
230 Err(RecvHeaderBlockError::Oversize(None))
231 };
232 }
233
234 let stream_id = frame.stream_id();
235 let (pseudo, fields) = frame.into_parts();
236
237 if pseudo.protocol.is_some()
238 && counts.peer().is_server()
239 && !self.is_extended_connect_protocol_enabled
240 {
241 proto_err!(stream: "cannot use :protocol if extended connect protocol is disabled; stream={:?}", stream.id);
242 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
243 }
244
245 if pseudo.status.is_some() && counts.peer().is_server() {
246 proto_err!(stream: "cannot use :status header for requests; stream={:?}", stream.id);
247 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
248 }
249
250 if !pseudo.is_informational() {
251 let message = counts
252 .peer()
253 .convert_poll_message(pseudo, fields, stream_id)?;
254
255 stream
257 .pending_recv
258 .push_back(&mut self.buffer, Event::Headers(message));
259 stream.notify_recv();
260
261 if counts.peer().is_server() {
264 self.pending_accept.push(stream);
267 }
268 } else {
269 let message = counts
272 .peer()
273 .convert_poll_message(pseudo, fields, stream_id)?;
274
275 tracing::trace!("Received informational response: stream_id={:?}", stream_id);
276
277 stream
280 .pending_recv
281 .push_back(&mut self.buffer, Event::InformationalHeaders(message));
282 stream.notify_recv();
283 }
284
285 Ok(())
286 }
287
288 pub fn take_request(&mut self, stream: &mut store::Ptr) -> Request<()> {
295 use super::peer::PollMessage::*;
296
297 match stream.pending_recv.pop_front(&mut self.buffer) {
298 Some(Event::Headers(Server(request))) => request,
299 _ => unreachable!("server stream queue must start with Headers"),
300 }
301 }
302
303 pub fn poll_pushed(
305 &mut self,
306 cx: &Context,
307 stream: &mut store::Ptr,
308 ) -> Poll<Option<Result<(Request<()>, store::Key), proto::Error>>> {
309 use super::peer::PollMessage::*;
310
311 let mut ppp = stream.pending_push_promises.take();
312 let pushed = ppp.pop(stream.store_mut()).map(|mut pushed| {
313 match pushed.pending_recv.pop_front(&mut self.buffer) {
314 Some(Event::Headers(Server(headers))) => (headers, pushed.key()),
315 _ => panic!("Headers not set on pushed stream"),
318 }
319 });
320 stream.pending_push_promises = ppp;
321 if let Some(p) = pushed {
322 Poll::Ready(Some(Ok(p)))
323 } else {
324 let is_open = stream.state.ensure_recv_open()?;
325
326 if is_open {
327 stream.push_task = Some(cx.waker().clone());
328 Poll::Pending
329 } else {
330 Poll::Ready(None)
331 }
332 }
333 }
334
335 pub fn poll_response(
337 &mut self,
338 cx: &Context,
339 stream: &mut store::Ptr,
340 ) -> Poll<Result<Response<()>, proto::Error>> {
341 use super::peer::PollMessage::*;
342
343 loop {
345 match stream.pending_recv.pop_front(&mut self.buffer) {
346 Some(Event::Headers(Client(response))) => return Poll::Ready(Ok(response)),
347 Some(Event::InformationalHeaders(_)) => {
348 tracing::trace!("Skipping informational response in poll_response - should be consumed via poll_informational; stream_id={:?}", stream.id);
349 continue;
350 }
351 Some(_) => panic!("poll_response called after response returned"),
352 None => {
353 if !stream.state.ensure_recv_open()? {
354 proto_err!(stream: "poll_response: stream={:?} is not opened;", stream.id);
355 return Poll::Ready(Err(Error::library_reset(
356 stream.id,
357 Reason::PROTOCOL_ERROR,
358 )));
359 }
360
361 stream.recv_task = Some(cx.waker().clone());
362 return Poll::Pending;
363 }
364 }
365 }
366 }
367
368 pub fn poll_informational(
370 &mut self,
371 cx: &Context,
372 stream: &mut store::Ptr,
373 ) -> Poll<Option<Result<Response<()>, proto::Error>>> {
374 use super::peer::PollMessage::*;
375
376 if let Some(event) = stream.pending_recv.pop_front(&mut self.buffer) {
379 match event {
380 Event::Headers(Client(response)) => {
381 stream
383 .pending_recv
384 .push_front(&mut self.buffer, Event::Headers(Client(response)));
385 return Poll::Ready(None);
386 }
387 Event::InformationalHeaders(Client(response)) => {
388 return Poll::Ready(Some(Ok(response)));
390 }
391 other => {
392 stream.pending_recv.push_front(&mut self.buffer, other);
394 }
395 }
396 }
397
398 if stream.state.ensure_recv_open()? {
400 stream.recv_task = Some(cx.waker().clone());
402 Poll::Pending
403 } else {
404 Poll::Ready(None)
406 }
407 }
408
409 pub fn recv_trailers(
411 &mut self,
412 frame: frame::Headers,
413 stream: &mut store::Ptr,
414 ) -> Result<(), Error> {
415 stream.state.recv_close()?;
417
418 if stream.ensure_content_length_zero().is_err() {
419 proto_err!(stream: "recv_trailers: content-length is not zero; stream={:?};", stream.id);
420 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
421 }
422
423 let trailers = frame.into_fields();
424
425 stream
427 .pending_recv
428 .push_back(&mut self.buffer, Event::Trailers(trailers));
429 stream.notify_recv();
430
431 Ok(())
432 }
433
434 pub fn release_connection_capacity(&mut self, capacity: WindowSize, task: &mut Option<Waker>) {
436 tracing::trace!(
437 "release_connection_capacity; size={}, connection in_flight_data={}",
438 capacity,
439 self.in_flight_data,
440 );
441
442 self.in_flight_data -= capacity;
444
445 let _res = self.flow.assign_capacity(capacity);
448 debug_assert!(_res.is_ok());
449
450 if self.flow.unclaimed_capacity().is_some() {
451 if let Some(task) = task.take() {
452 task.wake();
453 }
454 }
455 }
456
457 pub fn release_capacity(
459 &mut self,
460 capacity: WindowSize,
461 stream: &mut store::Ptr,
462 task: &mut Option<Waker>,
463 ) -> Result<(), UserError> {
464 tracing::trace!("release_capacity; size={}", capacity);
465
466 if capacity > stream.in_flight_recv_data {
467 return Err(UserError::ReleaseCapacityTooBig);
468 }
469
470 self.release_connection_capacity(capacity, task);
471
472 stream.in_flight_recv_data -= capacity;
474
475 let _res = stream.recv_flow.assign_capacity(capacity);
478 debug_assert!(_res.is_ok());
479
480 if stream.recv_flow.unclaimed_capacity().is_some() {
481 self.pending_window_updates.push(stream);
483
484 if let Some(task) = task.take() {
485 task.wake();
486 }
487 }
488
489 Ok(())
490 }
491
492 pub fn release_closed_capacity(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
494 debug_assert_eq!(stream.ref_count, 0);
495
496 if stream.in_flight_recv_data == 0 {
497 return;
498 }
499
500 tracing::trace!(
501 "auto-release closed stream ({:?}) capacity: {:?}",
502 stream.id,
503 stream.in_flight_recv_data,
504 );
505
506 self.release_connection_capacity(stream.in_flight_recv_data, task);
507 stream.in_flight_recv_data = 0;
508
509 self.clear_recv_buffer(stream);
510 }
511
512 pub fn set_target_connection_window(
525 &mut self,
526 target: WindowSize,
527 task: &mut Option<Waker>,
528 ) -> Result<(), Reason> {
529 tracing::trace!(
530 "set_target_connection_window; target={}; available={}, reserved={}",
531 target,
532 self.flow.available(),
533 self.in_flight_data,
534 );
535
536 let current = self
542 .flow
543 .available()
544 .add(self.in_flight_data)?
545 .checked_size();
546 if target > current {
547 self.flow.assign_capacity(target - current)?;
548 } else {
549 self.flow.claim_capacity(current - target)?;
550 }
551
552 if self.flow.unclaimed_capacity().is_some() {
556 if let Some(task) = task.take() {
557 task.wake();
558 }
559 }
560 Ok(())
561 }
562
563 pub(crate) fn apply_local_settings(
564 &mut self,
565 settings: &frame::Settings,
566 store: &mut Store,
567 ) -> Result<(), proto::Error> {
568 if let Some(val) = settings.is_extended_connect_protocol_enabled() {
569 self.is_extended_connect_protocol_enabled = val;
570 }
571
572 if let Some(target) = settings.initial_window_size() {
573 let old_sz = self.init_window_sz;
574 self.init_window_sz = target;
575
576 tracing::trace!("update_initial_window_size; new={}; old={}", target, old_sz,);
577
578 match target.cmp(&old_sz) {
595 Ordering::Less => {
596 let dec = old_sz - target;
598 tracing::trace!("decrementing all windows; dec={}", dec);
599
600 store.try_for_each(|mut stream| {
601 stream
602 .recv_flow
603 .dec_recv_window(dec)
604 .map_err(proto::Error::library_go_away)?;
605 Ok::<_, proto::Error>(())
606 })?;
607 }
608 Ordering::Greater => {
609 let inc = target - old_sz;
611 tracing::trace!("incrementing all windows; inc={}", inc);
612 store.try_for_each(|mut stream| {
613 stream
616 .recv_flow
617 .inc_window(inc)
618 .map_err(proto::Error::library_go_away)?;
619 stream
620 .recv_flow
621 .assign_capacity(inc)
622 .map_err(proto::Error::library_go_away)?;
623 Ok::<_, proto::Error>(())
624 })?;
625 }
626 Ordering::Equal => (),
627 }
628 }
629
630 Ok(())
631 }
632
633 pub fn is_end_stream(&self, stream: &store::Ptr) -> bool {
634 if !stream.state.is_recv_end_stream() {
635 return false;
636 }
637
638 stream.pending_recv.is_empty()
639 }
640
641 pub fn recv_data(&mut self, frame: frame::Data, stream: &mut store::Ptr) -> Result<(), Error> {
642 let sz = frame.flow_controlled_len();
644
645 assert!(sz <= MAX_WINDOW_SIZE as usize);
648
649 let sz = sz as WindowSize;
650
651 let is_ignoring_frame = stream.state.is_local_error();
652
653 if !is_ignoring_frame && !stream.state.is_recv_streaming() {
654 proto_err!(conn: "unexpected DATA frame; stream={:?}", stream.id);
660 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
661 }
662
663 tracing::trace!(
664 "recv_data; size={}; connection={}; stream={}",
665 sz,
666 self.flow.window_size(),
667 stream.recv_flow.window_size()
668 );
669
670 if is_ignoring_frame {
671 tracing::trace!(
672 "recv_data; frame ignored on locally reset {:?} for some time",
673 stream.id,
674 );
675 return self.ignore_data(sz);
676 }
677
678 self.consume_connection_window(sz)?;
681
682 if stream.recv_flow.window_size() < sz {
683 return Err(Error::library_reset(stream.id, Reason::FLOW_CONTROL_ERROR));
692 }
693
694 if stream.dec_content_length(frame.payload().len()).is_err() {
696 proto_err!(stream:
697 "recv_data: content-length overflow; stream={:?}; len={:?}",
698 stream.id,
699 frame.payload().len(),
700 );
701 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
702 }
703
704 if frame.is_end_stream() {
705 if stream.ensure_content_length_zero().is_err() {
706 proto_err!(stream:
707 "recv_data: content-length underflow; stream={:?}; len={:?}",
708 stream.id,
709 frame.payload().len(),
710 );
711 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
712 }
713
714 if stream.state.recv_close().is_err() {
715 proto_err!(conn: "recv_data: failed to transition to closed state; stream={:?}", stream.id);
716 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
717 }
718 }
719
720 if !stream.is_recv {
722 tracing::trace!(
723 "recv_data; frame ignored on stream release {:?} for some time",
724 stream.id,
725 );
726 self.release_connection_capacity(sz, &mut None);
727 return Ok(());
728 }
729
730 stream
732 .recv_flow
733 .send_data(sz)
734 .map_err(proto::Error::library_go_away)?;
735
736 stream.in_flight_recv_data += sz;
738
739 let padding = (frame.flow_controlled_len() - frame.payload().len()) as WindowSize;
742 if padding > 0 {
743 tracing::trace!(
744 "recv_data; auto-releasing padding of {:?} for {:?}",
745 padding,
746 stream.id,
747 );
748 let _res = self.release_capacity(padding, stream, &mut None);
749 debug_assert!(_res.is_ok());
751 }
752
753 let event = Event::Data(frame.into_payload());
754
755 stream.pending_recv.push_back(&mut self.buffer, event);
757 stream.notify_recv();
758
759 Ok(())
760 }
761
762 pub fn ignore_data(&mut self, sz: WindowSize) -> Result<(), Error> {
763 self.consume_connection_window(sz)?;
765
766 self.release_connection_capacity(sz, &mut None);
775 Ok(())
776 }
777
778 pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), Error> {
779 if self.flow.window_size() < sz {
780 tracing::debug!(
781 "connection error FLOW_CONTROL_ERROR -- window_size ({:?}) < sz ({:?});",
782 self.flow.window_size(),
783 sz,
784 );
785 return Err(Error::library_go_away(Reason::FLOW_CONTROL_ERROR));
786 }
787
788 self.flow.send_data(sz).map_err(Error::library_go_away)?;
790
791 self.in_flight_data += sz;
793 Ok(())
794 }
795
796 pub fn recv_push_promise(
797 &mut self,
798 frame: frame::PushPromise,
799 stream: &mut store::Ptr,
800 ) -> Result<(), Error> {
801 stream.state.reserve_remote()?;
802 if frame.is_over_size() {
803 tracing::debug!(
815 "stream error PROTOCOL_ERROR -- recv_push_promise: \
816 headers frame is over size; promised_id={:?};",
817 frame.promised_id(),
818 );
819 return Err(Error::library_reset(
820 frame.promised_id(),
821 Reason::PROTOCOL_ERROR,
822 ));
823 }
824
825 let promised_id = frame.promised_id();
826 let (pseudo, fields) = frame.into_parts();
827 let req = crate::server::Peer::convert_poll_message(pseudo, fields, promised_id)?;
828
829 if let Err(e) = frame::PushPromise::validate_request(&req) {
830 use PushPromiseHeaderError::*;
831 match e {
832 NotSafeAndCacheable => proto_err!(
833 stream:
834 "recv_push_promise: method {} is not safe and cacheable; promised_id={:?}",
835 req.method(),
836 promised_id,
837 ),
838 InvalidContentLength(e) => proto_err!(
839 stream:
840 "recv_push_promise; promised request has invalid content-length {:?}; promised_id={:?}",
841 e,
842 promised_id,
843 ),
844 }
845 return Err(Error::library_reset(promised_id, Reason::PROTOCOL_ERROR));
846 }
847
848 use super::peer::PollMessage::*;
849 stream
850 .pending_recv
851 .push_back(&mut self.buffer, Event::Headers(Server(req)));
852 stream.notify_recv();
853 stream.notify_push();
854 Ok(())
855 }
856
857 pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> {
859 if let Ok(next) = self.next_stream_id {
860 if id >= next {
861 tracing::debug!(
862 "stream ID implicitly closed, PROTOCOL_ERROR; stream={:?}",
863 id
864 );
865 return Err(Reason::PROTOCOL_ERROR);
866 }
867 }
868 Ok(())
871 }
872
873 pub fn recv_reset(
875 &mut self,
876 frame: frame::Reset,
877 stream: &mut Stream,
878 counts: &mut Counts,
879 ) -> Result<(), Error> {
880 if stream.is_pending_accept {
889 if counts.can_inc_num_remote_reset_streams() {
890 counts.inc_num_remote_reset_streams();
891 } else {
892 tracing::warn!(
893 "recv_reset; remotely-reset pending-accept streams reached limit ({:?})",
894 counts.max_remote_reset_streams(),
895 );
896 return Err(Error::library_go_away_data(
897 Reason::ENHANCE_YOUR_CALM,
898 "too_many_resets",
899 ));
900 }
901 }
902
903 stream.state.recv_reset(frame, stream.is_pending_send);
905
906 stream.notify_send();
907 stream.notify_recv();
908 stream.notify_push();
909
910 Ok(())
911 }
912
913 pub fn handle_error(&mut self, err: &proto::Error, stream: &mut Stream) {
915 stream.state.handle_error(err);
917
918 stream.notify_send();
920 stream.notify_recv();
921 stream.notify_push();
922 }
923
924 pub fn go_away(&mut self, last_processed_id: StreamId) {
925 assert!(self.max_stream_id >= last_processed_id);
926 self.max_stream_id = last_processed_id;
927 }
928
929 pub fn recv_eof(&mut self, stream: &mut Stream) {
930 stream.state.recv_eof();
931 stream.notify_send();
932 stream.notify_recv();
933 stream.notify_push();
934 }
935
936 pub(super) fn clear_recv_buffer(&mut self, stream: &mut Stream) {
937 while stream.pending_recv.pop_front(&mut self.buffer).is_some() {
938 }
940 }
941
942 pub fn max_stream_id(&self) -> StreamId {
946 self.max_stream_id
947 }
948
949 pub fn next_stream_id(&self) -> Result<StreamId, Error> {
950 if let Ok(id) = self.next_stream_id {
951 Ok(id)
952 } else {
953 Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
954 }
955 }
956
957 pub fn may_have_created_stream(&self, id: StreamId) -> bool {
958 if let Ok(next_id) = self.next_stream_id {
959 debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated(),);
961 id < next_id
962 } else {
963 true
964 }
965 }
966
967 pub(super) fn maybe_reset_next_stream_id(&mut self, id: StreamId) {
968 if let Ok(next_id) = self.next_stream_id {
969 debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated());
971 if id >= next_id {
972 self.next_stream_id = id.next_id();
973 }
974 }
975 }
976
977 pub fn ensure_can_reserve(&self) -> Result<(), Error> {
979 if !self.is_push_enabled {
980 proto_err!(conn: "recv_push_promise: push is disabled");
981 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
982 }
983
984 Ok(())
985 }
986
987 pub fn enqueue_reset_expiration(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
989 if !stream.state.is_local_error() || stream.is_pending_reset_expiration() {
990 return;
991 }
992
993 if counts.can_inc_num_reset_streams() {
994 counts.inc_num_reset_streams();
995 tracing::trace!("enqueue_reset_expiration; added {:?}", stream.id);
996 self.pending_reset_expired.push(stream);
997 } else {
998 tracing::trace!(
999 "enqueue_reset_expiration; dropped {:?}, over max_concurrent_reset_streams",
1000 stream.id
1001 );
1002 }
1003 }
1004
1005 pub fn send_pending_refusal<T, B>(
1007 &mut self,
1008 cx: &mut Context,
1009 dst: &mut Codec<T, Prioritized<B>>,
1010 ) -> Poll<io::Result<()>>
1011 where
1012 T: AsyncWrite + Unpin,
1013 B: Buf,
1014 {
1015 if let Some(stream_id) = self.refused {
1016 ready!(dst.poll_ready(cx))?;
1017
1018 let frame = frame::Reset::new(stream_id, Reason::REFUSED_STREAM);
1020
1021 dst.buffer(frame.into()).expect("invalid RST_STREAM frame");
1023 }
1024
1025 self.refused = None;
1026
1027 Poll::Ready(Ok(()))
1028 }
1029
1030 pub fn clear_expired_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) {
1031 if !self.pending_reset_expired.is_empty() {
1032 let now = Instant::now();
1033 let reset_duration = self.reset_duration;
1034 while let Some(stream) = self.pending_reset_expired.pop_if(store, |stream| {
1035 let reset_at = stream.reset_at.expect("reset_at must be set if in queue");
1036 now.saturating_duration_since(reset_at) > reset_duration
1040 }) {
1041 counts.transition_after(stream, true);
1042 }
1043 }
1044 }
1045
1046 pub fn clear_queues(
1047 &mut self,
1048 clear_pending_accept: bool,
1049 store: &mut Store,
1050 counts: &mut Counts,
1051 ) {
1052 self.clear_stream_window_update_queue(store, counts);
1053 self.clear_all_reset_streams(store, counts);
1054
1055 if clear_pending_accept {
1056 self.clear_all_pending_accept(store, counts);
1057 }
1058 }
1059
1060 fn clear_stream_window_update_queue(&mut self, store: &mut Store, counts: &mut Counts) {
1061 while let Some(stream) = self.pending_window_updates.pop(store) {
1062 counts.transition(stream, |_, stream| {
1063 tracing::trace!("clear_stream_window_update_queue; stream={:?}", stream.id);
1064 })
1065 }
1066 }
1067
1068 fn clear_all_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) {
1070 while let Some(stream) = self.pending_reset_expired.pop(store) {
1071 counts.transition_after(stream, true);
1072 }
1073 }
1074
1075 fn clear_all_pending_accept(&mut self, store: &mut Store, counts: &mut Counts) {
1076 while let Some(stream) = self.pending_accept.pop(store) {
1077 counts.transition_after(stream, false);
1078 }
1079 }
1080
1081 pub fn poll_complete<T, B>(
1082 &mut self,
1083 cx: &mut Context,
1084 store: &mut Store,
1085 counts: &mut Counts,
1086 dst: &mut Codec<T, Prioritized<B>>,
1087 ) -> Poll<io::Result<()>>
1088 where
1089 T: AsyncWrite + Unpin,
1090 B: Buf,
1091 {
1092 ready!(self.send_connection_window_update(cx, dst))?;
1094
1095 ready!(self.send_stream_window_updates(cx, store, counts, dst))?;
1097
1098 Poll::Ready(Ok(()))
1099 }
1100
1101 fn send_connection_window_update<T, B>(
1103 &mut self,
1104 cx: &mut Context,
1105 dst: &mut Codec<T, Prioritized<B>>,
1106 ) -> Poll<io::Result<()>>
1107 where
1108 T: AsyncWrite + Unpin,
1109 B: Buf,
1110 {
1111 if let Some(incr) = self.flow.unclaimed_capacity() {
1112 let frame = frame::WindowUpdate::new(StreamId::zero(), incr);
1113
1114 ready!(dst.poll_ready(cx))?;
1116
1117 dst.buffer(frame.into())
1119 .expect("invalid WINDOW_UPDATE frame");
1120
1121 self.flow
1123 .inc_window(incr)
1124 .expect("unexpected flow control state");
1125 }
1126
1127 Poll::Ready(Ok(()))
1128 }
1129
1130 pub fn send_stream_window_updates<T, B>(
1132 &mut self,
1133 cx: &mut Context,
1134 store: &mut Store,
1135 counts: &mut Counts,
1136 dst: &mut Codec<T, Prioritized<B>>,
1137 ) -> Poll<io::Result<()>>
1138 where
1139 T: AsyncWrite + Unpin,
1140 B: Buf,
1141 {
1142 loop {
1143 ready!(dst.poll_ready(cx))?;
1145
1146 let stream = match self.pending_window_updates.pop(store) {
1148 Some(stream) => stream,
1149 None => return Poll::Ready(Ok(())),
1150 };
1151
1152 counts.transition(stream, |_, stream| {
1153 tracing::trace!("pending_window_updates -- pop; stream={:?}", stream.id);
1154 debug_assert!(!stream.is_pending_window_update);
1155
1156 if !stream.state.is_recv_streaming() {
1157 return;
1164 }
1165
1166 if let Some(incr) = stream.recv_flow.unclaimed_capacity() {
1168 let frame = frame::WindowUpdate::new(stream.id, incr);
1170
1171 dst.buffer(frame.into())
1173 .expect("invalid WINDOW_UPDATE frame");
1174
1175 stream
1177 .recv_flow
1178 .inc_window(incr)
1179 .expect("unexpected flow control state");
1180 }
1181 })
1182 }
1183 }
1184
1185 pub fn next_incoming(&mut self, store: &mut Store) -> Option<store::Key> {
1186 self.pending_accept.pop(store).map(|ptr| ptr.key())
1187 }
1188
1189 pub fn poll_data(
1190 &mut self,
1191 cx: &Context,
1192 stream: &mut Stream,
1193 ) -> Poll<Option<Result<Bytes, proto::Error>>> {
1194 match stream.pending_recv.pop_front(&mut self.buffer) {
1195 Some(Event::Data(payload)) => Poll::Ready(Some(Ok(payload))),
1196 Some(event) => {
1197 stream.pending_recv.push_front(&mut self.buffer, event);
1199
1200 stream.notify_recv();
1209
1210 Poll::Ready(None)
1212 }
1213 None => self.schedule_recv(cx, stream),
1214 }
1215 }
1216
1217 pub fn poll_trailers(
1218 &mut self,
1219 cx: &Context,
1220 stream: &mut Stream,
1221 ) -> Poll<Option<Result<HeaderMap, proto::Error>>> {
1222 match stream.pending_recv.pop_front(&mut self.buffer) {
1223 Some(Event::Trailers(trailers)) => Poll::Ready(Some(Ok(trailers))),
1224 Some(event) => {
1225 stream.pending_recv.push_front(&mut self.buffer, event);
1227
1228 Poll::Pending
1229 }
1230 None => self.schedule_recv(cx, stream),
1231 }
1232 }
1233
1234 fn schedule_recv<T>(
1235 &mut self,
1236 cx: &Context,
1237 stream: &mut Stream,
1238 ) -> Poll<Option<Result<T, proto::Error>>> {
1239 if stream.state.ensure_recv_open()? {
1240 stream.recv_task = Some(cx.waker().clone());
1242 Poll::Pending
1243 } else {
1244 Poll::Ready(None)
1246 }
1247 }
1248}
1249
1250impl Open {
1253 pub fn is_push_promise(&self) -> bool {
1254 matches!(*self, Self::PushPromise)
1255 }
1256}
1257
1258impl<T> From<Error> for RecvHeaderBlockError<T> {
1261 fn from(err: Error) -> Self {
1262 RecvHeaderBlockError::State(err)
1263 }
1264}