1use super::*;
2use crate::codec::UserError;
3use crate::frame::{PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE};
4use crate::proto;
5
6use http::{HeaderMap, Request, Response};
7
8use std::cmp::Ordering;
9use std::io;
10use std::task::{Context, Poll, Waker};
11use std::time::Instant;
12
#[derive(Debug)]
pub(super) struct Recv {
    /// Initial window size of locally initiated streams
    init_window_sz: WindowSize,

    /// Connection level flow control governing received data
    flow: FlowControl,

    /// Amount of connection window capacity currently used by outstanding streams.
    in_flight_data: WindowSize,

    /// The lowest stream ID that is still idle
    next_stream_id: Result<StreamId, StreamIdOverflow>,

    /// The stream ID of the last processed stream
    last_processed_id: StreamId,

    /// Any streams with a higher ID are ignored.
    ///
    /// This starts as MAX, but is lowered when a GOAWAY is received.
    max_stream_id: StreamId,

    /// Streams that have pending window updates to flush to the peer
    pending_window_updates: store::Queue<stream::NextWindowUpdate>,

    /// New streams to be accepted
    pending_accept: store::Queue<stream::NextAccept>,

    /// Locally reset streams that should be reaped when they expire
    pending_reset_expired: store::Queue<stream::NextResetExpire>,

    /// How long locally reset streams should ignore received frames
    reset_duration: Duration,

    /// Holds frames that are waiting to be read
    buffer: Buffer<Event>,

    /// Refused StreamId, this represents a RST_STREAM frame that must be sent
    /// out (see `send_pending_refusal`)
    refused: Option<StreamId>,

    /// If push promises are allowed to be received.
    is_push_enabled: bool,

    /// If the extended CONNECT protocol (`:protocol` pseudo-header) is enabled.
    is_extended_connect_protocol_enabled: bool,
}
63
/// An event buffered on a stream's receive queue until the user task reads it.
#[derive(Debug)]
pub(super) enum Event {
    /// A complete (non-informational) HEADERS block, decoded per-peer.
    Headers(peer::PollMessage),
    /// A DATA frame payload.
    Data(Bytes),
    /// Trailing headers received with END_STREAM.
    Trailers(HeaderMap),
    /// An informational (1xx) response headers block.
    InformationalHeaders(peer::PollMessage),
}
71
/// Error returned by `Recv::recv_headers`.
#[derive(Debug)]
pub(super) enum RecvHeaderBlockError<T> {
    /// The decoded header block exceeded the maximum size. For an initial
    /// request on a server, the payload carries a 431 response to send back.
    Oversize(T),
    /// A stream- or connection-level protocol error.
    State(Error),
}
77
/// How a remote stream is being opened.
#[derive(Debug)]
pub(crate) enum Open {
    /// Opened (reserved) via a PUSH_PROMISE frame.
    PushPromise,
    /// Opened via a HEADERS frame.
    Headers,
}
83
84impl Recv {
85 pub fn new(peer: peer::Dyn, config: &Config) -> Self {
86 let next_stream_id = if peer.is_server() { 1 } else { 2 };
87
88 let mut flow = FlowControl::new();
89
90 flow.inc_window(DEFAULT_INITIAL_WINDOW_SIZE)
93 .expect("invalid initial remote window size");
94 flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE).unwrap();
95
96 Recv {
97 init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
98 flow,
99 in_flight_data: 0 as WindowSize,
100 next_stream_id: Ok(next_stream_id.into()),
101 pending_window_updates: store::Queue::new(),
102 last_processed_id: StreamId::ZERO,
103 max_stream_id: StreamId::MAX,
104 pending_accept: store::Queue::new(),
105 pending_reset_expired: store::Queue::new(),
106 reset_duration: config.local_reset_duration,
107 buffer: Buffer::new(),
108 refused: None,
109 is_push_enabled: config.local_push_enabled,
110 is_extended_connect_protocol_enabled: config.extended_connect_protocol_enabled,
111 }
112 }
113
    /// Returns the current initial receive window size assigned to new
    /// streams (updated by `apply_local_settings`).
    pub fn init_window_sz(&self) -> WindowSize {
        self.init_window_sz
    }
118
    /// Returns the ID of the last stream for which a HEADERS frame was
    /// processed (see `recv_headers`).
    pub fn last_processed_id(&self) -> StreamId {
        self.last_processed_id
    }
123
    /// Updates state to reflect a new remotely-opened stream.
    ///
    /// Validates that `id` is legal for the peer/mode and strictly
    /// increasing. Returns `Ok(None)` (recording the ID in `self.refused`)
    /// when the local stream-concurrency limit prevents accepting the stream.
    pub fn open(
        &mut self,
        id: StreamId,
        mode: Open,
        counts: &mut Counts,
    ) -> Result<Option<StreamId>, Error> {
        // A previously refused stream must have been handled (its RST_STREAM
        // flushed, clearing `refused`) before opening another.
        assert!(self.refused.is_none());

        counts.peer().ensure_can_open(id, mode)?;

        let next_id = self.next_stream_id()?;
        if id < next_id {
            // Stream IDs must be monotonically increasing; going backwards is
            // a connection-level PROTOCOL_ERROR.
            proto_err!(conn: "id ({:?}) < next_id ({:?})", id, next_id);
            return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
        }

        self.next_stream_id = id.next_id();

        if !counts.can_inc_num_recv_streams() {
            // Over the concurrency limit: remember the ID so a
            // RST_STREAM(REFUSED_STREAM) can be sent.
            self.refused = Some(id);
            return Ok(None);
        }

        Ok(Some(id))
    }
152
    /// Transitions the stream state based on receiving a HEADERS frame and
    /// buffers the decoded message for the user task.
    ///
    /// Enforces content-length consistency, header-block size limits, and
    /// pseudo-header rules (`:protocol`, `:status`). Informational (1xx)
    /// responses are buffered as `Event::InformationalHeaders`.
    pub fn recv_headers(
        &mut self,
        frame: frame::Headers,
        stream: &mut store::Ptr,
        counts: &mut Counts,
    ) -> Result<(), RecvHeaderBlockError<Option<frame::Headers>>> {
        tracing::trace!("opening stream; init_window={}", self.init_window_sz);
        let is_initial = stream.state.recv_open(&frame)?;

        if is_initial {
            // Track the highest stream ID that has had headers processed.
            if frame.stream_id() > self.last_processed_id {
                self.last_processed_id = frame.stream_id();
            }

            // Increment the number of concurrent streams
            counts.inc_num_recv_streams(stream);
        }

        if !stream.content_length.is_head() {
            use super::stream::ContentLength;
            use http::header;

            if let Some(content_length) = frame.fields().get(header::CONTENT_LENGTH) {
                let content_length = match frame::parse_u64(content_length.as_bytes()) {
                    Ok(v) => v,
                    Err(_) => {
                        proto_err!(stream: "could not parse content-length; stream={:?}", stream.id);
                        return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
                    }
                };

                stream.content_length = ContentLength::Remaining(content_length);
                // END_STREAM on the header block with a non-zero
                // content-length is malformed, except for responses that
                // never carry a body (204/304).
                if frame.is_end_stream()
                    && content_length > 0
                    && frame
                        .pseudo()
                        .status
                        .map_or(true, |status| status != 204 && status != 304)
                {
                    proto_err!(stream: "recv_headers with END_STREAM: content-length is not zero; stream={:?};", stream.id);
                    return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
                }
            }
        }

        if frame.is_over_size() {
            // The decoded header block exceeded the limit. For an initial
            // request on a server we can still answer with a 431 response;
            // otherwise the caller must reset the stream.
            tracing::debug!(
                "stream error REQUEST_HEADER_FIELDS_TOO_LARGE -- \
                 recv_headers: frame is over size; stream={:?}",
                stream.id
            );
            return if counts.peer().is_server() && is_initial {
                let mut res = frame::Headers::new(
                    stream.id,
                    frame::Pseudo::response(::http::StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE),
                    HeaderMap::new(),
                );
                res.set_end_stream();
                Err(RecvHeaderBlockError::Oversize(Some(res)))
            } else {
                Err(RecvHeaderBlockError::Oversize(None))
            };
        }

        let stream_id = frame.stream_id();
        let (pseudo, fields) = frame.into_parts();

        // `:protocol` on a request is only allowed when extended CONNECT is
        // enabled (see `is_extended_connect_protocol_enabled`).
        if pseudo.protocol.is_some()
            && counts.peer().is_server()
            && !self.is_extended_connect_protocol_enabled
        {
            proto_err!(stream: "cannot use :protocol if extended connect protocol is disabled; stream={:?}", stream.id);
            return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
        }

        // `:status` is a response pseudo-header; illegal on requests.
        if pseudo.status.is_some() && counts.peer().is_server() {
            proto_err!(stream: "cannot use :status header for requests; stream={:?}", stream.id);
            return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
        }

        if !pseudo.is_informational() {
            let message = counts
                .peer()
                .convert_poll_message(pseudo, fields, stream_id)?;

            // Push the frame onto the stream's recv buffer and notify any
            // task waiting on it.
            stream
                .pending_recv
                .push_back(&mut self.buffer, Event::Headers(message));
            stream.notify_recv();

            // Only servers accept new streams from headers; queue the stream
            // to be handed to the accept loop.
            if counts.peer().is_server() {
                self.pending_accept.push(stream);
            }
        } else {
            // Informational (1xx) responses are buffered as a distinct event
            // so the final response can still be observed by `poll_response`.
            let message = counts
                .peer()
                .convert_poll_message(pseudo, fields, stream_id)?;

            tracing::trace!("Received informational response: stream_id={:?}", stream_id);

            stream
                .pending_recv
                .push_back(&mut self.buffer, Event::InformationalHeaders(message));
            stream.notify_recv();
        }

        Ok(())
    }
287
    /// Called by the server to take the request that was in progress.
    ///
    /// The first queued event must be the request headers — `recv_headers`
    /// pushes them before the stream is queued for accept; anything else is
    /// a bug (hence `unreachable!`).
    pub fn take_request(&mut self, stream: &mut store::Ptr) -> Request<()> {
        use super::peer::PollMessage::*;

        match stream.pending_recv.pop_front(&mut self.buffer) {
            Some(Event::Headers(Server(request))) => request,
            _ => unreachable!("server stream queue must start with Headers"),
        }
    }
302
    /// Polls for the next pushed (PUSH_PROMISE) request associated with
    /// `stream`, yielding the promised request and the promised stream's key.
    pub fn poll_pushed(
        &mut self,
        cx: &Context,
        stream: &mut store::Ptr,
    ) -> Poll<Option<Result<(Request<()>, store::Key), proto::Error>>> {
        use super::peer::PollMessage::*;

        let mut ppp = stream.pending_push_promises.take();
        // Pop the next promised stream; its first buffered event must be the
        // promised request headers (pushed by `recv_push_promise`).
        let pushed = ppp.pop(stream.store_mut()).map(|mut pushed| {
            match pushed.pending_recv.pop_front(&mut self.buffer) {
                Some(Event::Headers(Server(headers))) => (headers, pushed.key()),
                // `recv_push_promise` always queues headers first; anything
                // else is a bug.
                _ => panic!("Headers not set on pushed stream"),
            }
        });
        stream.pending_push_promises = ppp;
        if let Some(p) = pushed {
            Poll::Ready(Some(Ok(p)))
        } else {
            let is_open = stream.state.ensure_recv_open()?;

            if is_open {
                // Register to be woken when another push promise arrives.
                stream.push_task = Some(cx.waker().clone());
                Poll::Pending
            } else {
                Poll::Ready(None)
            }
        }
    }
334
    /// Polls for the final (non-informational) response on `stream`.
    ///
    /// Queued informational (1xx) responses are skipped here; they are meant
    /// to be consumed via `poll_informational`.
    pub fn poll_response(
        &mut self,
        cx: &Context,
        stream: &mut store::Ptr,
    ) -> Poll<Result<Response<()>, proto::Error>> {
        use super::peer::PollMessage::*;

        loop {
            match stream.pending_recv.pop_front(&mut self.buffer) {
                Some(Event::Headers(Client(response))) => return Poll::Ready(Ok(response)),
                Some(Event::InformationalHeaders(_)) => {
                    tracing::trace!("Skipping informational response in poll_response - should be consumed via poll_informational; stream_id={:?}", stream.id);
                    continue;
                }
                // Data/trailers before a response is a caller bug.
                Some(_) => panic!("poll_response called after response returned"),
                None => {
                    if !stream.state.ensure_recv_open()? {
                        proto_err!(stream: "poll_response: stream={:?} is not opened;", stream.id);
                        return Poll::Ready(Err(Error::library_reset(
                            stream.id,
                            Reason::PROTOCOL_ERROR,
                        )));
                    }

                    // Register to be woken when the response arrives.
                    stream.recv_task = Some(cx.waker().clone());
                    return Poll::Pending;
                }
            }
        }
    }
367
    /// Polls for the next informational (1xx) response on `stream`.
    ///
    /// Returns `Ready(None)` once the final response headers are at the front
    /// of the queue (they are pushed back for `poll_response` to consume), or
    /// when the stream is closed.
    pub fn poll_informational(
        &mut self,
        cx: &Context,
        stream: &mut store::Ptr,
    ) -> Poll<Option<Result<Response<()>, proto::Error>>> {
        use super::peer::PollMessage::*;

        if let Some(event) = stream.pending_recv.pop_front(&mut self.buffer) {
            match event {
                Event::Headers(Client(response)) => {
                    // Final response: restore it for `poll_response`.
                    stream
                        .pending_recv
                        .push_front(&mut self.buffer, Event::Headers(Client(response)));
                    return Poll::Ready(None);
                }
                Event::InformationalHeaders(Client(response)) => {
                    return Poll::Ready(Some(Ok(response)));
                }
                other => {
                    // Non-headers event (e.g. data/trailers): restore it and
                    // fall through to wait.
                    // NOTE(review): in this case no further informational
                    // response can arrive, yet we register the waker and
                    // return Pending below — confirm callers drain via
                    // `poll_response` first, or consider Ready(None) here.
                    stream.pending_recv.push_front(&mut self.buffer, other);
                }
            }
        }

        if stream.state.ensure_recv_open()? {
            // Register to be woken when more frames arrive.
            stream.recv_task = Some(cx.waker().clone());
            Poll::Pending
        } else {
            Poll::Ready(None)
        }
    }
408
    /// Transitions the stream based on receiving trailing headers, verifying
    /// that the declared content-length was fully consumed.
    pub fn recv_trailers(
        &mut self,
        frame: frame::Headers,
        stream: &mut store::Ptr,
    ) -> Result<(), Error> {
        // Transition the state: trailers close the receive half.
        stream.state.recv_close()?;

        if stream.ensure_content_length_zero().is_err() {
            proto_err!(stream: "recv_trailers: content-length is not zero; stream={:?};", stream.id);
            return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
        }

        let trailers = frame.into_fields();

        // Push the frame onto the stream's recv buffer and notify the
        // waiting user task.
        stream
            .pending_recv
            .push_back(&mut self.buffer, Event::Trailers(trailers));
        stream.notify_recv();

        Ok(())
    }
433
    /// Releases `capacity` bytes of connection-level flow-control window
    /// after received data has been consumed, waking `task` if a connection
    /// WINDOW_UPDATE is now ready to be sent.
    pub fn release_connection_capacity(&mut self, capacity: WindowSize, task: &mut Option<Waker>) {
        tracing::trace!(
            "release_connection_capacity; size={}, connection in_flight_data={}",
            capacity,
            self.in_flight_data,
        );

        // Decrement in-flight data
        self.in_flight_data -= capacity;

        // Assign capacity back to the connection window
        let _res = self.flow.assign_capacity(capacity);
        debug_assert!(_res.is_ok());

        // If a WINDOW_UPDATE can now be advertised, wake the connection task.
        if self.flow.unclaimed_capacity().is_some() {
            if let Some(task) = task.take() {
                task.wake();
            }
        }
    }
456
    /// Releases `capacity` bytes of stream-level (and connection-level)
    /// flow-control window back to the peer.
    ///
    /// # Errors
    /// Returns `UserError::ReleaseCapacityTooBig` if more is released than is
    /// currently in flight for the stream.
    pub fn release_capacity(
        &mut self,
        capacity: WindowSize,
        stream: &mut store::Ptr,
        task: &mut Option<Waker>,
    ) -> Result<(), UserError> {
        tracing::trace!("release_capacity; size={}", capacity);

        if capacity > stream.in_flight_recv_data {
            return Err(UserError::ReleaseCapacityTooBig);
        }

        self.release_connection_capacity(capacity, task);

        // Decrement in-flight data
        stream.in_flight_recv_data -= capacity;

        // Assign capacity back to the stream window
        let _res = stream.recv_flow.assign_capacity(capacity);
        debug_assert!(_res.is_ok());

        if stream.recv_flow.unclaimed_capacity().is_some() {
            // Queue the stream for sending a WINDOW_UPDATE frame.
            self.pending_window_updates.push(stream);

            if let Some(task) = task.take() {
                task.wake();
            }
        }

        Ok(())
    }
491
    /// Releases any outstanding receive capacity for a stream whose last user
    /// handle was dropped (`ref_count == 0`), then clears its buffered events.
    pub fn release_closed_capacity(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
        debug_assert_eq!(stream.ref_count, 0);

        // Nothing in flight: nothing to release.
        if stream.in_flight_recv_data == 0 {
            return;
        }

        tracing::trace!(
            "auto-release closed stream ({:?}) capacity: {:?}",
            stream.id,
            stream.in_flight_recv_data,
        );

        self.release_connection_capacity(stream.in_flight_recv_data, task);
        stream.in_flight_recv_data = 0;

        self.clear_recv_buffer(stream);
    }
511
    /// Sets the "target" connection-level flow-control window.
    ///
    /// Grows or shrinks the assigned capacity so that
    /// `available + in_flight_data` converges on `target`; if new capacity
    /// became advertisable, `task` is woken so a WINDOW_UPDATE can be sent.
    pub fn set_target_connection_window(
        &mut self,
        target: WindowSize,
        task: &mut Option<Waker>,
    ) -> Result<(), Reason> {
        tracing::trace!(
            "set_target_connection_window; target={}; available={}, reserved={}",
            target,
            self.flow.available(),
            self.in_flight_data,
        );

        // The current effective window is the available capacity plus the
        // data currently reserved by in-flight stream data.
        let current = self
            .flow
            .available()
            .add(self.in_flight_data)?
            .checked_size();
        if target > current {
            self.flow.assign_capacity(target - current)?;
        } else {
            self.flow.claim_capacity(current - target)?;
        }

        // If the change produced unclaimed capacity, a WINDOW_UPDATE is ready
        // to be sent: notify the connection task.
        if self.flow.unclaimed_capacity().is_some() {
            if let Some(task) = task.take() {
                task.wake();
            }
        }
        Ok(())
    }
562
    /// Applies locally-sent settings once acknowledged by the peer.
    ///
    /// Updates the extended-CONNECT flag and, per RFC 7540 §6.9.2, adjusts
    /// the receive window of *all* existing streams by the difference between
    /// the new and old SETTINGS_INITIAL_WINDOW_SIZE.
    pub(crate) fn apply_local_settings(
        &mut self,
        settings: &frame::Settings,
        store: &mut Store,
    ) -> Result<(), proto::Error> {
        if let Some(val) = settings.is_extended_connect_protocol_enabled() {
            self.is_extended_connect_protocol_enabled = val;
        }

        if let Some(target) = settings.initial_window_size() {
            let old_sz = self.init_window_sz;
            self.init_window_sz = target;

            tracing::trace!("update_initial_window_size; new={}; old={}", target, old_sz,);

            match target.cmp(&old_sz) {
                Ordering::Less => {
                    // Shrink every stream's receive window by the delta.
                    let dec = old_sz - target;
                    tracing::trace!("decrementing all windows; dec={}", dec);

                    store.try_for_each(|mut stream| {
                        stream
                            .recv_flow
                            .dec_recv_window(dec)
                            .map_err(proto::Error::library_go_away)?;
                        Ok::<_, proto::Error>(())
                    })?;
                }
                Ordering::Greater => {
                    // Grow every stream's receive window by the delta and
                    // assign the new capacity so it can be advertised.
                    let inc = target - old_sz;
                    tracing::trace!("incrementing all windows; inc={}", inc);
                    store.try_for_each(|mut stream| {
                        stream
                            .recv_flow
                            .inc_window(inc)
                            .map_err(proto::Error::library_go_away)?;
                        stream
                            .recv_flow
                            .assign_capacity(inc)
                            .map_err(proto::Error::library_go_away)?;
                        Ok::<_, proto::Error>(())
                    })?;
                }
                Ordering::Equal => (),
            }
        }

        Ok(())
    }
632
633 pub fn is_end_stream(&self, stream: &store::Ptr) -> bool {
634 if !stream.state.is_recv_end_stream() {
635 return false;
636 }
637
638 stream.pending_recv.is_empty()
639 }
640
    /// Handles a received DATA frame for `stream`.
    ///
    /// Enforces connection- and stream-level flow control and content-length
    /// accounting, then buffers the payload for the user task. Frames on
    /// locally-reset streams are consumed and their capacity released.
    pub fn recv_data(&mut self, frame: frame::Data, stream: &mut store::Ptr) -> Result<(), Error> {
        let sz = frame.flow_controlled_len();

        // Oversized frames should have been rejected at the codec layer;
        // this is only a sanity check.
        assert!(sz <= MAX_WINDOW_SIZE as usize);

        let sz = sz as WindowSize;

        let is_ignoring_frame = stream.state.is_local_error();

        if !is_ignoring_frame && !stream.state.is_recv_streaming() {
            // Receiving a DATA frame when not expecting one is a
            // connection-level protocol error.
            proto_err!(conn: "unexpected DATA frame; stream={:?}", stream.id);
            return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
        }

        tracing::trace!(
            "recv_data; size={}; connection={}; stream={}",
            sz,
            self.flow.window_size(),
            stream.recv_flow.window_size()
        );

        if is_ignoring_frame {
            // The stream was reset locally; the peer may not have seen the
            // reset yet, so consume-and-release the window without buffering.
            tracing::trace!(
                "recv_data; frame ignored on locally reset {:?} for some time",
                stream.id,
            );
            return self.ignore_data(sz);
        }

        // Consume connection window *before* the stream check so connection
        // accounting stays correct even when the stream errors below.
        self.consume_connection_window(sz)?;

        if stream.recv_flow.window_size() < sz {
            // Exceeding the stream window is a stream-level flow control
            // error (RFC 7540 §6.9).
            return Err(Error::library_reset(stream.id, Reason::FLOW_CONTROL_ERROR));
        }

        if stream.dec_content_length(frame.payload().len()).is_err() {
            // More data than the declared content-length.
            proto_err!(stream:
                "recv_data: content-length overflow; stream={:?}; len={:?}",
                stream.id,
                frame.payload().len(),
            );
            return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
        }

        if frame.is_end_stream() {
            // The body ended; the declared content-length must be exhausted.
            if stream.ensure_content_length_zero().is_err() {
                proto_err!(stream:
                    "recv_data: content-length underflow; stream={:?}; len={:?}",
                    stream.id,
                    frame.payload().len(),
                );
                return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
            }

            if stream.state.recv_close().is_err() {
                proto_err!(conn: "recv_data: failed to transition to closed state; stream={:?}", stream.id);
                return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
            }
        }

        if !stream.is_recv {
            // The user released the receive half: don't buffer, release the
            // connection capacity right away so the window doesn't leak.
            tracing::trace!(
                "recv_data; frame ignored on stream release {:?} for some time",
                stream.id,
            );
            self.release_connection_capacity(sz, &mut None);
            return Ok(());
        }

        // Update stream-level flow control
        stream
            .recv_flow
            .send_data(sz)
            .map_err(proto::Error::library_go_away)?;

        // Track the data as in-flight until the user releases it
        stream.in_flight_recv_data += sz;

        if let Some(padded_len) = frame.padded_len() {
            // Padding is never handed to the user; auto-release its capacity.
            tracing::trace!(
                "recv_data; auto-releasing padded length of {:?} for {:?}",
                padded_len,
                stream.id,
            );
            let _res = self.release_capacity(padded_len.into(), stream, &mut None);
            debug_assert!(_res.is_ok());
        }

        let event = Event::Data(frame.into_payload());

        // Push the payload onto the recv buffer and notify the waiting task.
        stream.pending_recv.push_back(&mut self.buffer, event);
        stream.notify_recv();

        Ok(())
    }
759
    /// Consumes then immediately releases connection window for `sz` bytes of
    /// data on an ignored (e.g. locally reset) stream, so connection-level
    /// flow control stays balanced.
    pub fn ignore_data(&mut self, sz: WindowSize) -> Result<(), Error> {
        // Still enforce the connection window: the peer spent it.
        self.consume_connection_window(sz)?;

        // Release it right back, since the data is discarded.
        self.release_connection_capacity(sz, &mut None);
        Ok(())
    }
775
    /// Consumes `sz` bytes from the connection-level receive window.
    ///
    /// # Errors
    /// Returns a FLOW_CONTROL_ERROR GOAWAY if the peer sent more data than
    /// the connection window allows.
    pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), Error> {
        if self.flow.window_size() < sz {
            tracing::debug!(
                "connection error FLOW_CONTROL_ERROR -- window_size ({:?}) < sz ({:?});",
                self.flow.window_size(),
                sz,
            );
            return Err(Error::library_go_away(Reason::FLOW_CONTROL_ERROR));
        }

        // Update connection-level flow control
        self.flow.send_data(sz).map_err(Error::library_go_away)?;

        // Track the data as in-flight until released
        self.in_flight_data += sz;
        Ok(())
    }
793
    /// Handles a received PUSH_PROMISE frame: reserves the remote stream,
    /// validates the promised request, and queues it on the promised stream.
    pub fn recv_push_promise(
        &mut self,
        frame: frame::PushPromise,
        stream: &mut store::Ptr,
    ) -> Result<(), Error> {
        stream.state.reserve_remote()?;
        if frame.is_over_size() {
            // An over-size promised header block resets the *promised*
            // stream, not the connection.
            tracing::debug!(
                "stream error PROTOCOL_ERROR -- recv_push_promise: \
                 headers frame is over size; promised_id={:?};",
                frame.promised_id(),
            );
            return Err(Error::library_reset(
                frame.promised_id(),
                Reason::PROTOCOL_ERROR,
            ));
        }

        let promised_id = frame.promised_id();
        let (pseudo, fields) = frame.into_parts();
        // Pushed promises always carry a *request*, so decode with the
        // server-side peer conversion regardless of our role.
        let req = crate::server::Peer::convert_poll_message(pseudo, fields, promised_id)?;

        if let Err(e) = frame::PushPromise::validate_request(&req) {
            use PushPromiseHeaderError::*;
            match e {
                NotSafeAndCacheable => proto_err!(
                    stream:
                    "recv_push_promise: method {} is not safe and cacheable; promised_id={:?}",
                    req.method(),
                    promised_id,
                ),
                InvalidContentLength(e) => proto_err!(
                    stream:
                    "recv_push_promise; promised request has invalid content-length {:?}; promised_id={:?}",
                    e,
                    promised_id,
                ),
            }
            return Err(Error::library_reset(promised_id, Reason::PROTOCOL_ERROR));
        }

        use super::peer::PollMessage::*;
        // Queue the promised request on the promised stream and wake any
        // tasks waiting for data or pushes.
        stream
            .pending_recv
            .push_back(&mut self.buffer, Event::Headers(Server(req)));
        stream.notify_recv();
        stream.notify_push();
        Ok(())
    }
854
855 pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> {
857 if let Ok(next) = self.next_stream_id {
858 if id >= next {
859 tracing::debug!(
860 "stream ID implicitly closed, PROTOCOL_ERROR; stream={:?}",
861 id
862 );
863 return Err(Reason::PROTOCOL_ERROR);
864 }
865 }
866 Ok(())
869 }
870
    /// Handles a received RST_STREAM frame.
    ///
    /// A reset against a stream still waiting to be accepted counts against
    /// the remote-reset limit (defense against reset floods); exceeding it
    /// tears the connection down with ENHANCE_YOUR_CALM.
    pub fn recv_reset(
        &mut self,
        frame: frame::Reset,
        stream: &mut Stream,
        counts: &mut Counts,
    ) -> Result<(), Error> {
        if stream.is_pending_accept {
            if counts.can_inc_num_remote_reset_streams() {
                counts.inc_num_remote_reset_streams();
            } else {
                tracing::warn!(
                    "recv_reset; remotely-reset pending-accept streams reached limit ({:?})",
                    counts.max_remote_reset_streams(),
                );
                return Err(Error::library_go_away_data(
                    Reason::ENHANCE_YOUR_CALM,
                    "too_many_resets",
                ));
            }
        }

        // Notify the stream state of the reset
        stream.state.recv_reset(frame, stream.is_pending_send);

        // Wake every task waiting on this stream.
        stream.notify_send();
        stream.notify_recv();
        stream.notify_push();

        Ok(())
    }
910
    /// Handles a connection-level error by propagating it to the stream state
    /// and waking every task waiting on the stream.
    pub fn handle_error(&mut self, err: &proto::Error, stream: &mut Stream) {
        // Record the error on the stream
        stream.state.handle_error(err);

        // Wake any waiting tasks so they observe the error.
        stream.notify_send();
        stream.notify_recv();
        stream.notify_push();
    }
921
    /// Records `last_processed_id` from a GOAWAY; streams with a higher ID
    /// will be ignored from now on.
    pub fn go_away(&mut self, last_processed_id: StreamId) {
        // Successive GOAWAY frames may only lower the last-stream ID.
        assert!(self.max_stream_id >= last_processed_id);
        self.max_stream_id = last_processed_id;
    }
926
    /// Handles transport EOF: transitions the stream state and wakes all
    /// waiting tasks so they observe the closed connection.
    pub fn recv_eof(&mut self, stream: &mut Stream) {
        stream.state.recv_eof();
        stream.notify_send();
        stream.notify_recv();
        stream.notify_push();
    }
933
934 pub(super) fn clear_recv_buffer(&mut self, stream: &mut Stream) {
935 while stream.pending_recv.pop_front(&mut self.buffer).is_some() {
936 }
938 }
939
    /// Returns the highest stream ID still processed; starts at
    /// `StreamId::MAX` and is lowered by `go_away` when a GOAWAY is received.
    pub fn max_stream_id(&self) -> StreamId {
        self.max_stream_id
    }
946
947 pub fn next_stream_id(&self) -> Result<StreamId, Error> {
948 if let Ok(id) = self.next_stream_id {
949 Ok(id)
950 } else {
951 Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
952 }
953 }
954
955 pub fn may_have_created_stream(&self, id: StreamId) -> bool {
956 if let Ok(next_id) = self.next_stream_id {
957 debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated(),);
959 id < next_id
960 } else {
961 true
962 }
963 }
964
    /// Advances `next_stream_id` past `id` when `id` is at or beyond it, so
    /// an ID observed out of band is never treated as openable again.
    pub(super) fn maybe_reset_next_stream_id(&mut self, id: StreamId) {
        if let Ok(next_id) = self.next_stream_id {
            // Both IDs must come from the same initiator's ID space.
            debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated());
            if id >= next_id {
                self.next_stream_id = id.next_id();
            }
        }
    }
974
975 pub fn ensure_can_reserve(&self) -> Result<(), Error> {
977 if !self.is_push_enabled {
978 proto_err!(conn: "recv_push_promise: push is disabled");
979 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
980 }
981
982 Ok(())
983 }
984
    /// Schedules a locally-reset stream to be reaped once `reset_duration`
    /// elapses (see `clear_expired_reset_streams`).
    ///
    /// Streams beyond the configured reset-tracking limit are simply not
    /// tracked.
    pub fn enqueue_reset_expiration(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
        // Only locally-errored streams are tracked, and only once.
        if !stream.state.is_local_error() || stream.is_pending_reset_expiration() {
            return;
        }

        if counts.can_inc_num_reset_streams() {
            counts.inc_num_reset_streams();
            tracing::trace!("enqueue_reset_expiration; added {:?}", stream.id);
            self.pending_reset_expired.push(stream);
        } else {
            tracing::trace!(
                "enqueue_reset_expiration; dropped {:?}, over max_concurrent_reset_streams",
                stream.id
            );
        }
    }
1002
    /// Sends the pending RST_STREAM(REFUSED_STREAM) recorded by `open`, if
    /// any. Returns `Pending` (leaving `refused` set) when the codec is not
    /// ready to buffer the frame.
    pub fn send_pending_refusal<T, B>(
        &mut self,
        cx: &mut Context,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> Poll<io::Result<()>>
    where
        T: AsyncWrite + Unpin,
        B: Buf,
    {
        if let Some(stream_id) = self.refused {
            // Ensure the codec has capacity first.
            ready!(dst.poll_ready(cx))?;

            // Create the RST_STREAM frame
            let frame = frame::Reset::new(stream_id, Reason::REFUSED_STREAM);

            // Buffer the frame
            dst.buffer(frame.into()).expect("invalid RST_STREAM frame");
        }

        self.refused = None;

        Poll::Ready(Ok(()))
    }
1027
    /// Reaps locally-reset streams whose grace period (`reset_duration`) has
    /// elapsed, transitioning each out of the store.
    pub fn clear_expired_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) {
        if !self.pending_reset_expired.is_empty() {
            let now = Instant::now();
            let reset_duration = self.reset_duration;
            while let Some(stream) = self.pending_reset_expired.pop_if(store, |stream| {
                let reset_at = stream.reset_at.expect("reset_at must be set if in queue");
                // Saturating, so a clock that went backwards cannot panic.
                now.saturating_duration_since(reset_at) > reset_duration
            }) {
                counts.transition_after(stream, true);
            }
        }
    }
1043
    /// Clears the window-update and reset-expiration queues; the
    /// pending-accept queue is cleared only when `clear_pending_accept` is
    /// set.
    pub fn clear_queues(
        &mut self,
        clear_pending_accept: bool,
        store: &mut Store,
        counts: &mut Counts,
    ) {
        self.clear_stream_window_update_queue(store, counts);
        self.clear_all_reset_streams(store, counts);

        if clear_pending_accept {
            self.clear_all_pending_accept(store, counts);
        }
    }
1057
    /// Drains the pending window-update queue without sending any frames,
    /// transitioning each stream through `counts`.
    fn clear_stream_window_update_queue(&mut self, store: &mut Store, counts: &mut Counts) {
        while let Some(stream) = self.pending_window_updates.pop(store) {
            counts.transition(stream, |_, stream| {
                tracing::trace!("clear_stream_window_update_queue; stream={:?}", stream.id);
            })
        }
    }
1065
    /// Drains the reset-expiration queue, transitioning every stream out
    /// regardless of whether its grace period has elapsed.
    fn clear_all_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) {
        while let Some(stream) = self.pending_reset_expired.pop(store) {
            counts.transition_after(stream, true);
        }
    }
1072
1073 fn clear_all_pending_accept(&mut self, store: &mut Store, counts: &mut Counts) {
1074 while let Some(stream) = self.pending_accept.pop(store) {
1075 counts.transition_after(stream, false);
1076 }
1077 }
1078
    /// Flushes all pending connection- and stream-level WINDOW_UPDATE frames
    /// to `dst`.
    pub fn poll_complete<T, B>(
        &mut self,
        cx: &mut Context,
        store: &mut Store,
        counts: &mut Counts,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> Poll<io::Result<()>>
    where
        T: AsyncWrite + Unpin,
        B: Buf,
    {
        // Send any pending connection-level window update
        ready!(self.send_connection_window_update(cx, dst))?;

        // Send any pending stream-level window updates
        ready!(self.send_stream_window_updates(cx, store, counts, dst))?;

        Poll::Ready(Ok(()))
    }
1098
    /// Sends a connection-level WINDOW_UPDATE if there is unclaimed capacity
    /// to advertise.
    fn send_connection_window_update<T, B>(
        &mut self,
        cx: &mut Context,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> Poll<io::Result<()>>
    where
        T: AsyncWrite + Unpin,
        B: Buf,
    {
        if let Some(incr) = self.flow.unclaimed_capacity() {
            let frame = frame::WindowUpdate::new(StreamId::zero(), incr);

            // Ensure the codec has capacity
            ready!(dst.poll_ready(cx))?;

            // Buffer the WINDOW_UPDATE frame
            dst.buffer(frame.into())
                .expect("invalid WINDOW_UPDATE frame");

            // Mark the increment as claimed, now that it has been
            // advertised to the peer.
            self.flow
                .inc_window(incr)
                .expect("unexpected flow control state");
        }

        Poll::Ready(Ok(()))
    }
1127
    /// Sends a stream-level WINDOW_UPDATE for every stream queued in
    /// `pending_window_updates` that still has unclaimed capacity.
    pub fn send_stream_window_updates<T, B>(
        &mut self,
        cx: &mut Context,
        store: &mut Store,
        counts: &mut Counts,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> Poll<io::Result<()>>
    where
        T: AsyncWrite + Unpin,
        B: Buf,
    {
        loop {
            // Ensure the codec has capacity before popping a stream
            ready!(dst.poll_ready(cx))?;

            // Get the next stream with a pending update
            let stream = match self.pending_window_updates.pop(store) {
                Some(stream) => stream,
                None => return Poll::Ready(Ok(())),
            };

            counts.transition(stream, |_, stream| {
                tracing::trace!("pending_window_updates -- pop; stream={:?}", stream.id);
                debug_assert!(!stream.is_pending_window_update);

                if !stream.state.is_recv_streaming() {
                    // No need to advertise window on a stream that is no
                    // longer receiving data.
                    return;
                }

                if let Some(incr) = stream.recv_flow.unclaimed_capacity() {
                    // Create the WINDOW_UPDATE frame
                    let frame = frame::WindowUpdate::new(stream.id, incr);

                    // Buffer it
                    dst.buffer(frame.into())
                        .expect("invalid WINDOW_UPDATE frame");

                    // Mark the increment as claimed
                    stream
                        .recv_flow
                        .inc_window(incr)
                        .expect("unexpected flow control state");
                }
            })
        }
    }
1182
1183 pub fn next_incoming(&mut self, store: &mut Store) -> Option<store::Key> {
1184 self.pending_accept.pop(store).map(|ptr| ptr.key())
1185 }
1186
    /// Polls for the next DATA payload on `stream`.
    ///
    /// A non-data event at the head of the queue (e.g. trailers) ends the
    /// data phase: the event is pushed back and `Ready(None)` is returned.
    pub fn poll_data(
        &mut self,
        cx: &Context,
        stream: &mut Stream,
    ) -> Poll<Option<Result<Bytes, proto::Error>>> {
        match stream.pending_recv.pop_front(&mut self.buffer) {
            Some(Event::Data(payload)) => Poll::Ready(Some(Ok(payload))),
            Some(event) => {
                // Not data: restore the event for the next poll.
                stream.pending_recv.push_front(&mut self.buffer, event);

                // Notify the recv task in case `poll_trailers` was called
                // before data finished; usually a no-op since `recv_task` is
                // typically `None` at this point.
                stream.notify_recv();

                // No more data frames
                Poll::Ready(None)
            }
            None => self.schedule_recv(cx, stream),
        }
    }
1214
    /// Polls for trailing headers on `stream`.
    pub fn poll_trailers(
        &mut self,
        cx: &Context,
        stream: &mut Stream,
    ) -> Poll<Option<Result<HeaderMap, proto::Error>>> {
        match stream.pending_recv.pop_front(&mut self.buffer) {
            Some(Event::Trailers(trailers)) => Poll::Ready(Some(Ok(trailers))),
            Some(event) => {
                // Not trailers: restore the event and pend — remaining data
                // must still be consumed via `poll_data` first.
                stream.pending_recv.push_front(&mut self.buffer, event);

                Poll::Pending
            }
            None => self.schedule_recv(cx, stream),
        }
    }
1231
    /// Registers the current task to be woken when a new event arrives on
    /// `stream`, or resolves with `None`/an error when the receive half is
    /// already closed.
    fn schedule_recv<T>(
        &mut self,
        cx: &Context,
        stream: &mut Stream,
    ) -> Poll<Option<Result<T, proto::Error>>> {
        if stream.state.ensure_recv_open()? {
            // Request to get notified once more frames arrive
            stream.recv_task = Some(cx.waker().clone());
            Poll::Pending
        } else {
            // No more frames will be received
            Poll::Ready(None)
        }
    }
1246}
1247
1248impl Open {
1251 pub fn is_push_promise(&self) -> bool {
1252 matches!(*self, Self::PushPromise)
1253 }
1254}
1255
1256impl<T> From<Error> for RecvHeaderBlockError<T> {
1259 fn from(err: Error) -> Self {
1260 RecvHeaderBlockError::State(err)
1261 }
1262}