// h2/proto/streams/prioritize.rs
1use super::store::Resolve;
2use super::*;
3
4use crate::frame::Reason;
5
6use crate::codec::UserError;
7use crate::codec::UserError::*;
8
9use bytes::buf::Take;
10use std::{
11 cmp::{self, Ordering},
12 fmt, io, mem,
13 task::{Context, Poll, Waker},
14};
15
/// # Warning
///
/// Queued streams are ordered by stream ID, as we need to ensure that
/// lower-numbered streams are sent headers before higher-numbered ones.
/// This is because "idle" stream IDs – those which have been initiated but
/// have yet to receive frames – will be implicitly closed on receipt of a
/// frame on a higher stream ID. If these queues were not ordered by stream
/// IDs, some mechanism would be necessary to ensure that the lowest-numbered
/// idle stream is opened first.
#[derive(Debug)]
pub(super) struct Prioritize {
    /// Queue of streams waiting for socket capacity to send a frame.
    pending_send: store::Queue<stream::NextSend>,

    /// Queue of streams waiting for window capacity to produce data.
    pending_capacity: store::Queue<stream::NextSendCapacity>,

    /// Streams waiting for capacity due to max concurrency
    ///
    /// The `SendRequest` handle is `Clone`. This enables initiating requests
    /// from many tasks. However, offering this capability while supporting
    /// backpressure at some level is tricky. If there are many `SendRequest`
    /// handles and a single stream becomes available, which handle gets
    /// assigned that stream? Maybe that handle is no longer ready to send a
    /// request.
    ///
    /// The strategy used is to allow each `SendRequest` handle one buffered
    /// request. A `SendRequest` handle is ready to send a request if it has no
    /// associated buffered requests. This is the same strategy as `mpsc` in the
    /// futures library.
    pending_open: store::Queue<stream::NextOpen>,

    /// Connection level flow control governing sent data
    flow: FlowControl,

    /// Stream ID of the last stream opened.
    last_opened_id: StreamId,

    /// What `DATA` frame is currently being sent in the codec.
    in_flight_data_frame: InFlightData,

    /// The maximum amount of bytes a stream should buffer.
    max_buffer_size: usize,
}
60
/// Tracks which stream (if any) currently has a `DATA` frame buffered in the
/// codec, so a partially-written frame can be reclaimed for the right stream.
#[derive(Debug, Eq, PartialEq)]
enum InFlightData {
    /// There is no `DATA` frame in flight.
    Nothing,
    /// There is a `DATA` frame in flight belonging to the given stream.
    DataFrame(store::Key),
    /// There was a `DATA` frame, but the stream's queue was since cleared;
    /// the in-flight frame must not be reclaimed.
    Drop,
}
70
/// A frame payload wrapped in `Take` so that only the flow-control-permitted
/// number of bytes is written to the codec, remembering which stream it
/// belongs to so a partially-written frame can be reclaimed.
pub(crate) struct Prioritized<B> {
    // The buffer, capped at the number of bytes allowed to be sent
    inner: Take<B>,

    // Whether the original frame carried end-of-stream (the flag may have
    // been cleared on the wire if the frame was split; see `pop_frame`).
    end_of_stream: bool,

    // The stream that this is associated with
    stream: store::Key,
}
80
81// ===== impl Prioritize =====
82
83impl Prioritize {
    /// Creates a new `Prioritize`, initializing the connection-level send
    /// window from the remote's initial window size.
    pub fn new(config: &Config) -> Prioritize {
        let mut flow = FlowControl::new();

        flow.inc_window(config.remote_init_window_sz)
            .expect("invalid initial window size");

        // TODO: proper error handling
        let _res = flow.assign_capacity(config.remote_init_window_sz);
        debug_assert!(_res.is_ok());

        tracing::trace!("Prioritize::new; flow={:?}", flow);

        Prioritize {
            pending_send: store::Queue::new(),
            pending_capacity: store::Queue::new(),
            pending_open: store::Queue::new(),
            flow,
            last_opened_id: StreamId::ZERO,
            in_flight_data_frame: InFlightData::Nothing,
            max_buffer_size: config.local_max_buffer_size,
        }
    }
106
    /// Returns the maximum number of bytes a single stream should buffer.
    pub(crate) fn max_buffer_size(&self) -> usize {
        self.max_buffer_size
    }
110
    /// Queue a frame to be sent to the remote
    ///
    /// Appends the frame to the stream's send buffer and schedules the stream
    /// for sending, waking `task` if one is registered.
    pub fn queue_frame<B>(
        &mut self,
        frame: Frame<B>,
        buffer: &mut Buffer<Frame<B>>,
        stream: &mut store::Ptr,
        task: &mut Option<Waker>,
    ) {
        let span = tracing::trace_span!("Prioritize::queue_frame", ?stream.id);
        let _e = span.enter();
        // Queue the frame in the buffer
        stream.pending_send.push_back(buffer, frame);
        self.schedule_send(stream, task);
    }
125
126 pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
127 // If the stream is waiting to be opened, nothing more to do.
128 if stream.is_send_ready() {
129 tracing::trace!(?stream.id, "schedule_send");
130 // Queue the stream
131 self.pending_send.push(stream);
132
133 // Notify the connection.
134 if let Some(task) = task.take() {
135 task.wake();
136 }
137 }
138 }
139
    /// Queues a stream that must wait for the concurrency limit before it can
    /// be opened; it is popped again by `pop_pending_open`.
    pub fn queue_open(&mut self, stream: &mut store::Ptr) {
        self.pending_open.push(stream);
    }
143
    /// Send a data frame
    ///
    /// Buffers the frame on the stream, implicitly requests enough send
    /// capacity to eventually flush it, and either queues it for sending (if
    /// capacity is available) or parks it on the stream's pending queue.
    ///
    /// Errors with `PayloadTooBig` if the payload exceeds the maximum window
    /// size, or `InactiveStreamId`/`UnexpectedFrameType` if the stream is not
    /// in a send-streaming state.
    pub fn send_data<B>(
        &mut self,
        frame: frame::Data<B>,
        buffer: &mut Buffer<Frame<B>>,
        stream: &mut store::Ptr,
        counts: &mut Counts,
        task: &mut Option<Waker>,
    ) -> Result<(), UserError>
    where
        B: Buf,
    {
        let sz = frame.payload().remaining();

        if sz > MAX_WINDOW_SIZE as usize {
            return Err(UserError::PayloadTooBig);
        }

        let sz = sz as WindowSize;

        if !stream.state.is_send_streaming() {
            if stream.state.is_closed() {
                return Err(InactiveStreamId);
            } else {
                return Err(UnexpectedFrameType);
            }
        }

        // Update the buffered data counter
        stream.buffered_send_data += sz as usize;

        let span =
            tracing::trace_span!("send_data", sz, requested = stream.requested_send_capacity);
        let _e = span.enter();
        tracing::trace!(buffered = stream.buffered_send_data);

        // Implicitly request more send capacity if not enough has been
        // requested yet.
        if (stream.requested_send_capacity as usize) < stream.buffered_send_data {
            // Update the target requested capacity
            stream.requested_send_capacity =
                cmp::min(stream.buffered_send_data, WindowSize::MAX as usize) as WindowSize;

            // `try_assign_capacity` will queue the stream to `pending_capacity` if the capacity
            // cannot be assigned at the time it is called.
            self.try_assign_capacity(stream);
        }

        if frame.is_end_stream() {
            stream.state.send_close();
            // Reserving zero trims any excess reserved capacity back to what
            // is actually buffered.
            self.reserve_capacity(0, stream, counts);
        }

        tracing::trace!(
            available = %stream.send_flow.available(),
            buffered = stream.buffered_send_data,
        );

        // The `stream.buffered_send_data == 0` check is here so that, if a zero
        // length data frame is queued to the front (there is no previously
        // queued data), it gets sent out immediately even if there is no
        // available send window.
        //
        // Sending out zero length data frames can be done to signal
        // end-of-stream.
        //
        if stream.send_flow.available() > 0 || stream.buffered_send_data == 0 {
            // The stream currently has capacity to send the data frame, so
            // queue it up and notify the connection task.
            self.queue_frame(frame.into(), buffer, stream, task);
        } else {
            // The stream has no capacity to send the frame now, save it but
            // don't notify the connection task. Once additional capacity
            // becomes available, the frame will be flushed.
            stream.pending_send.push_back(buffer, frame.into());
        }

        Ok(())
    }
223
    /// Request capacity to send data
    ///
    /// The effective request is `capacity` plus whatever is already buffered,
    /// since the buffered data must also eventually be sent. Shrinking the
    /// request reclaims excess assigned capacity for the connection; growing
    /// it tries to assign more via `try_assign_capacity`.
    pub fn reserve_capacity(
        &mut self,
        capacity: WindowSize,
        stream: &mut store::Ptr,
        counts: &mut Counts,
    ) {
        let span = tracing::trace_span!(
            "reserve_capacity",
            ?stream.id,
            requested = capacity,
            effective = (capacity as usize) + stream.buffered_send_data,
            curr = stream.requested_send_capacity
        );
        let _e = span.enter();

        // Actual capacity is `capacity` + the current amount of buffered data.
        // If it were less, then we could never send out the buffered data.
        let capacity = (capacity as usize) + stream.buffered_send_data;

        match capacity.cmp(&(stream.requested_send_capacity as usize)) {
            Ordering::Equal => {
                // Nothing to do
            }
            Ordering::Less => {
                // Update the target requested capacity
                stream.requested_send_capacity = capacity as WindowSize;

                // Currently available capacity assigned to the stream
                let available = stream.send_flow.available().as_size();

                // If the stream has more assigned capacity than requested, reclaim
                // some for the connection
                if available as usize > capacity {
                    let diff = available - capacity as WindowSize;

                    // TODO: proper error handling
                    let _res = stream.send_flow.claim_capacity(diff);
                    debug_assert!(_res.is_ok());

                    self.assign_connection_capacity(diff, stream, counts);
                }
            }
            Ordering::Greater => {
                // If trying to *add* capacity, but the stream send side is closed,
                // there's nothing to be done.
                if stream.state.is_send_closed() {
                    return;
                }

                // Update the target requested capacity
                stream.requested_send_capacity =
                    cmp::min(capacity, WindowSize::MAX as usize) as WindowSize;

                // Try to assign additional capacity to the stream. If none is
                // currently available, the stream will be queued to receive some
                // when more becomes available.
                self.try_assign_capacity(stream);
            }
        }
    }
285
    /// Handles a stream-level `WINDOW_UPDATE` from the peer, growing the
    /// stream's send window by `inc` and assigning newly usable capacity.
    ///
    /// Errors if the window increment would overflow the window.
    pub fn recv_stream_window_update(
        &mut self,
        inc: WindowSize,
        stream: &mut store::Ptr,
    ) -> Result<(), Reason> {
        let span = tracing::trace_span!(
            "recv_stream_window_update",
            ?stream.id,
            ?stream.state,
            inc,
            flow = ?stream.send_flow
        );
        let _e = span.enter();

        if stream.state.is_send_closed() && stream.buffered_send_data == 0 {
            // We can't send any data, so don't bother doing anything else.
            return Ok(());
        }

        // Update the stream level flow control.
        stream.send_flow.inc_window(inc)?;

        // If the stream is waiting on additional capacity, then this will
        // assign it (if available on the connection) and notify the producer
        self.try_assign_capacity(stream);

        Ok(())
    }
314
    /// Handles a connection-level `WINDOW_UPDATE` from the peer, growing the
    /// connection send window and distributing the new capacity to streams
    /// waiting in `pending_capacity`.
    pub fn recv_connection_window_update(
        &mut self,
        inc: WindowSize,
        store: &mut Store,
        counts: &mut Counts,
    ) -> Result<(), Reason> {
        // Update the connection's window
        self.flow.inc_window(inc)?;

        self.assign_connection_capacity(inc, store, counts);
        Ok(())
    }
327
    /// Reclaim all capacity assigned to the stream and re-assign it to the
    /// connection
    pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
        let available = stream.send_flow.available().as_size();
        if available > 0 {
            // TODO: proper error handling
            let _res = stream.send_flow.claim_capacity(available);
            debug_assert!(_res.is_ok());
            // Re-assign all capacity to the connection
            self.assign_connection_capacity(available, stream, counts);
        }
    }
340
    /// Reclaim just reserved capacity, not buffered capacity, and re-assign
    /// it to the connection
    pub fn reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
        // only reclaim reserved capacity that isn't already buffered
        if stream.send_flow.available().as_size() as usize > stream.buffered_send_data {
            let reserved =
                stream.send_flow.available().as_size() - stream.buffered_send_data as WindowSize;

            // Panic safety: due to how `reserved` is computed it can't be greater
            // than what's available.
            stream
                .send_flow
                .claim_capacity(reserved)
                .expect("window size should be greater than reserved");

            self.assign_connection_capacity(reserved, stream, counts);
        }
    }
359
    /// Drains the `pending_capacity` queue, transitioning each stream's
    /// counts without assigning any capacity.
    pub fn clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts) {
        let span = tracing::trace_span!("clear_pending_capacity");
        let _e = span.enter();
        while let Some(stream) = self.pending_capacity.pop(store) {
            counts.transition(stream, |_, stream| {
                tracing::trace!(?stream.id, "clear_pending_capacity");
            })
        }
    }
369
    /// Adds `inc` to the connection-level send capacity, then distributes
    /// whatever is available to streams waiting in `pending_capacity`.
    pub fn assign_connection_capacity<R>(
        &mut self,
        inc: WindowSize,
        store: &mut R,
        counts: &mut Counts,
    ) where
        R: Resolve,
    {
        let span = tracing::trace_span!("assign_connection_capacity", inc);
        let _e = span.enter();

        // TODO: proper error handling
        let _res = self.flow.assign_capacity(inc);
        debug_assert!(_res.is_ok());

        // Assign newly acquired capacity to streams pending capacity.
        while self.flow.available() > 0 {
            let stream = match self.pending_capacity.pop(store) {
                Some(stream) => stream,
                None => return,
            };

            // Streams pending capacity may have been reset before capacity
            // became available. In that case, the stream won't want any
            // capacity, and so we shouldn't "transition" on it, but just evict
            // it and continue the loop.
            if !(stream.state.is_send_streaming() || stream.buffered_send_data > 0) {
                continue;
            }

            counts.transition(stream, |_, stream| {
                // Try to assign capacity to the stream. This will also re-queue the
                // stream if there isn't enough connection level capacity to fulfill
                // the capacity request.
                self.try_assign_capacity(stream);
            })
        }
    }
408
    /// Request capacity to send data
    ///
    /// Attempts to move available connection-level capacity onto `stream`, up
    /// to the amount the stream has requested (bounded by its own window). If
    /// the connection cannot satisfy the request now, the stream is queued to
    /// `pending_capacity`; if data is buffered and sendable, the stream is
    /// also scheduled on `pending_send`.
    fn try_assign_capacity(&mut self, stream: &mut store::Ptr) {
        // Streams over the max concurrent count should not have capacity
        // assigned, to avoid starving the connection capacity for open streams
        if stream.is_pending_open {
            return;
        }

        let total_requested = stream.requested_send_capacity;

        // Total requested should never go below actual assigned
        // (Note: the window size can go lower than assigned)
        debug_assert!(stream.send_flow.available() <= total_requested as usize);

        // The amount of additional capacity that the stream requests.
        // Don't assign more than the window has available!
        let additional = cmp::min(
            total_requested - stream.send_flow.available().as_size(),
            // Can't assign more than what is available
            stream.send_flow.window_size() - stream.send_flow.available().as_size(),
        );
        let span = tracing::trace_span!("try_assign_capacity", ?stream.id);
        let _e = span.enter();
        tracing::trace!(
            requested = total_requested,
            additional,
            buffered = stream.buffered_send_data,
            window = stream.send_flow.window_size(),
            conn = %self.flow.available()
        );

        if additional == 0 {
            // Nothing more to do
            return;
        }

        // The stream may have been reset or closed since capacity was requested.
        if !stream.state.is_send_streaming() && stream.buffered_send_data == 0 {
            return;
        }

        // The amount of currently available capacity on the connection
        let conn_available = self.flow.available().as_size();

        // First check if capacity is immediately available
        if conn_available > 0 {
            // The amount of capacity to assign to the stream
            // TODO: Should prioritization factor into this?
            let assign = cmp::min(conn_available, additional);

            tracing::trace!(capacity = assign, "assigning");

            // Assign the capacity to the stream
            stream.assign_capacity(assign, self.max_buffer_size);

            // Claim the capacity from the connection
            // TODO: proper error handling
            let _res = self.flow.claim_capacity(assign);
            debug_assert!(_res.is_ok());
        }

        tracing::trace!(
            available = %stream.send_flow.available(),
            requested = stream.requested_send_capacity,
            buffered = stream.buffered_send_data,
            has_unavailable = %stream.send_flow.has_unavailable()
        );

        if stream.send_flow.available() < stream.requested_send_capacity as usize
            && stream.send_flow.has_unavailable()
        {
            // The stream requires additional capacity and the stream's
            // window has available capacity, but the connection window
            // does not.
            //
            // In this case, the stream needs to be queued up for when the
            // connection has more capacity.
            self.pending_capacity.push(stream);
        }

        // If data is buffered and the stream is send ready, then
        // schedule the stream for execution
        if stream.buffered_send_data > 0 && stream.is_send_ready() {
            // TODO: This assertion isn't *exactly* correct. There can still be
            // buffered send data while the stream's pending send queue is
            // empty. This can happen when a large data frame is in the process
            // of being **partially** sent. Once the window has been sent, the
            // data frame will be returned to the prioritization layer to be
            // re-scheduled.
            //
            // That said, it would be nice to figure out how to make this
            // assertion correctly.
            //
            // debug_assert!(!stream.pending_send.is_empty());

            self.pending_send.push(stream);
        }
    }
507
    /// Drives pending frames into the codec until either nothing is left to
    /// send (flushes and returns `Ready(Ok(()))`) or the codec is not ready
    /// (returns `Pending`).
    pub fn poll_complete<T, B>(
        &mut self,
        cx: &mut Context,
        buffer: &mut Buffer<Frame<B>>,
        store: &mut Store,
        counts: &mut Counts,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> Poll<io::Result<()>>
    where
        T: AsyncWrite + Unpin,
        B: Buf,
    {
        // Ensure codec is ready
        ready!(dst.poll_ready(cx))?;

        // Reclaim any frame that has previously been written
        self.reclaim_frame(buffer, store, dst);

        // The max frame length
        let max_frame_len = dst.max_send_frame_size();

        tracing::trace!("poll_complete");

        loop {
            // Open at most one pending stream per iteration, putting it at
            // the front of the send queue so its headers go out first.
            if let Some(mut stream) = self.pop_pending_open(store, counts) {
                self.pending_send.push_front(&mut stream);
                self.try_assign_capacity(&mut stream);
            }

            match self.pop_frame(buffer, store, max_frame_len, counts) {
                Some(frame) => {
                    tracing::trace!(?frame, "writing");

                    debug_assert_eq!(self.in_flight_data_frame, InFlightData::Nothing);
                    if let Frame::Data(ref frame) = frame {
                        self.in_flight_data_frame = InFlightData::DataFrame(frame.payload().stream);
                    }
                    dst.buffer(frame).expect("invalid frame");

                    // Ensure the codec is ready to try the loop again.
                    ready!(dst.poll_ready(cx))?;

                    // Because, always try to reclaim...
                    self.reclaim_frame(buffer, store, dst);
                }
                None => {
                    // Try to flush the codec.
                    ready!(dst.flush(cx))?;

                    // This might release a data frame...
                    if !self.reclaim_frame(buffer, store, dst) {
                        return Poll::Ready(Ok(()));
                    }

                    // No need to poll ready as poll_complete() does this for
                    // us...
                }
            }
        }
    }
568
    /// Tries to reclaim a pending data frame from the codec.
    ///
    /// Returns true if a frame was reclaimed.
    ///
    /// When a data frame is written to the codec, it may not be written in its
    /// entirety (large chunks are split up into potentially many data frames).
    /// In this case, the stream needs to be reprioritized.
    fn reclaim_frame<T, B>(
        &mut self,
        buffer: &mut Buffer<Frame<B>>,
        store: &mut Store,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> bool
    where
        B: Buf,
    {
        let span = tracing::trace_span!("try_reclaim_frame");
        let _e = span.enter();

        // First check if there are any data chunks to take back
        if let Some(frame) = dst.take_last_data_frame() {
            self.reclaim_frame_inner(buffer, store, frame)
        } else {
            false
        }
    }
595
    /// Handles a data frame taken back from the codec: if any payload bytes
    /// remain unwritten, the frame (with its end-of-stream flag restored) is
    /// pushed back onto the front of its stream's queue.
    ///
    /// Returns true if the frame was re-queued.
    fn reclaim_frame_inner<B>(
        &mut self,
        buffer: &mut Buffer<Frame<B>>,
        store: &mut Store,
        frame: frame::Data<Prioritized<B>>,
    ) -> bool
    where
        B: Buf,
    {
        tracing::trace!(
            ?frame,
            sz = frame.payload().inner.get_ref().remaining(),
            "reclaimed"
        );

        let mut eos = false;
        let key = frame.payload().stream;

        match mem::replace(&mut self.in_flight_data_frame, InFlightData::Nothing) {
            InFlightData::Nothing => panic!("wasn't expecting a frame to reclaim"),
            InFlightData::Drop => {
                // The stream's queue was cleared while this frame was in
                // flight; it must not be re-queued.
                tracing::trace!("not reclaiming frame for cancelled stream");
                return false;
            }
            InFlightData::DataFrame(k) => {
                debug_assert_eq!(k, key);
            }
        }

        let mut frame = frame.map(|prioritized| {
            // TODO: Ensure fully written
            eos = prioritized.end_of_stream;
            prioritized.inner.into_inner()
        });

        if frame.payload().has_remaining() {
            let mut stream = store.resolve(key);

            if eos {
                frame.set_end_stream(true);
            }

            self.push_back_frame(frame.into(), buffer, &mut stream);

            return true;
        }

        false
    }
645
    /// Push the frame to the front of the stream's deque, scheduling the
    /// stream if needed.
    fn push_back_frame<B>(
        &mut self,
        frame: Frame<B>,
        buffer: &mut Buffer<Frame<B>>,
        stream: &mut store::Ptr,
    ) {
        // Push the frame to the front of the stream's deque
        stream.pending_send.push_front(buffer, frame);

        // If needed, schedule the sender; without capacity the stream will be
        // rescheduled when a window update arrives.
        if stream.send_flow.available() > 0 {
            debug_assert!(!stream.pending_send.is_empty());
            self.pending_send.push(stream);
        }
    }
663
    /// Drops every frame queued on `stream` and resets its send-capacity
    /// accounting. If a frame from this stream is currently in flight in the
    /// codec, it is marked so it will not be reclaimed later.
    pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) {
        let span = tracing::trace_span!("clear_queue", ?stream.id);
        let _e = span.enter();

        // TODO: make this more efficient?
        while let Some(frame) = stream.pending_send.pop_front(buffer) {
            tracing::trace!(?frame, "dropping");
        }

        stream.buffered_send_data = 0;
        stream.requested_send_capacity = 0;
        if let InFlightData::DataFrame(key) = self.in_flight_data_frame {
            if stream.key() == key {
                // This stream could get cleaned up now - don't allow the buffered frame to get reclaimed.
                self.in_flight_data_frame = InFlightData::Drop;
            }
        }
    }
682
    /// Drains the `pending_send` queue, applying any scheduled reset to each
    /// stream before transitioning its counts.
    pub fn clear_pending_send(&mut self, store: &mut Store, counts: &mut Counts) {
        while let Some(mut stream) = self.pending_send.pop(store) {
            let is_pending_reset = stream.is_pending_reset_expiration();
            if let Some(reason) = stream.state.get_scheduled_reset() {
                stream.set_reset(reason, Initiator::Library);
            }
            counts.transition_after(stream, is_pending_reset);
        }
    }
692
693 pub fn clear_pending_open(&mut self, store: &mut Store, counts: &mut Counts) {
694 while let Some(stream) = self.pending_open.pop(store) {
695 let is_pending_reset = stream.is_pending_reset_expiration();
696 counts.transition_after(stream, is_pending_reset);
697 }
698 }
699
    /// Pops the next frame to write from the first sendable stream, enforcing
    /// stream- and connection-level flow control. Data frames may be
    /// truncated to `max_len` or the available window; the remainder is sent
    /// later after the `Prioritized` wrapper is reclaimed.
    fn pop_frame<B>(
        &mut self,
        buffer: &mut Buffer<Frame<B>>,
        store: &mut Store,
        max_len: usize,
        counts: &mut Counts,
    ) -> Option<Frame<Prioritized<B>>>
    where
        B: Buf,
    {
        let span = tracing::trace_span!("pop_frame");
        let _e = span.enter();

        loop {
            match self.pending_send.pop(store) {
                Some(mut stream) => {
                    let span = tracing::trace_span!("popped", ?stream.id, ?stream.state);
                    let _e = span.enter();

                    // It's possible that this stream, besides having data to send,
                    // is also queued to send a reset, and thus is already in the queue
                    // to wait for "some time" after a reset.
                    //
                    // To be safe, we just always ask the stream.
                    let is_pending_reset = stream.is_pending_reset_expiration();

                    tracing::trace!(is_pending_reset);

                    let frame = match stream.pending_send.pop_front(buffer) {
                        Some(Frame::Data(mut frame)) => {
                            // Get the amount of capacity remaining for stream's
                            // window.
                            let stream_capacity = stream.send_flow.available();
                            let sz = frame.payload().remaining();

                            tracing::trace!(
                                sz,
                                eos = frame.is_end_stream(),
                                window = %stream_capacity,
                                available = %stream.send_flow.available(),
                                requested = stream.requested_send_capacity,
                                buffered = stream.buffered_send_data,
                                "data frame"
                            );

                            // Zero length data frames always have capacity to
                            // be sent.
                            if sz > 0 && stream_capacity == 0 {
                                tracing::trace!("stream capacity is 0");

                                // Ensure that the stream is waiting for
                                // connection level capacity
                                //
                                // TODO: uncomment
                                // debug_assert!(stream.is_pending_send_capacity);

                                // The stream has no more capacity, this can
                                // happen if the remote reduced the stream
                                // window. In this case, we need to buffer the
                                // frame and wait for a window update...
                                stream.pending_send.push_front(buffer, frame.into());

                                continue;
                            }

                            // Only send up to the max frame length
                            let len = cmp::min(sz, max_len);

                            // Only send up to the stream's window capacity
                            let len =
                                cmp::min(len, stream_capacity.as_size() as usize) as WindowSize;

                            // There *must* be enough connection level
                            // capacity at this point.
                            debug_assert!(len <= self.flow.window_size());

                            // Check if the stream level window the peer knows is available. In some
                            // scenarios, maybe the window we know is available but the window which
                            // peer knows is not.
                            if len > 0 && len > stream.send_flow.window_size() {
                                stream.pending_send.push_front(buffer, frame.into());
                                continue;
                            }

                            tracing::trace!(len, "sending data frame");

                            // Update the flow control
                            tracing::trace_span!("updating stream flow").in_scope(|| {
                                stream.send_data(len, self.max_buffer_size);

                                // Assign the capacity back to the connection that
                                // was just consumed from the stream in the previous
                                // line.
                                // TODO: proper error handling
                                let _res = self.flow.assign_capacity(len);
                                debug_assert!(_res.is_ok());
                            });

                            let (eos, len) = tracing::trace_span!("updating connection flow")
                                .in_scope(|| {
                                    // TODO: proper error handling
                                    let _res = self.flow.send_data(len);
                                    debug_assert!(_res.is_ok());

                                    // Wrap the frame's data payload to ensure that the
                                    // correct amount of data gets written.

                                    let eos = frame.is_end_stream();
                                    let len = len as usize;

                                    // If the frame is being split, the wire frame
                                    // must not carry END_STREAM; the original flag
                                    // is preserved in `eos` for the wrapper.
                                    if frame.payload().remaining() > len {
                                        frame.set_end_stream(false);
                                    }
                                    (eos, len)
                                });

                            Frame::Data(frame.map(|buf| Prioritized {
                                inner: buf.take(len),
                                end_of_stream: eos,
                                stream: stream.key(),
                            }))
                        }
                        Some(Frame::PushPromise(pp)) => {
                            let mut pushed =
                                stream.store_mut().find_mut(&pp.promised_id()).unwrap();
                            pushed.is_pending_push = false;
                            // Transition stream from pending_push to pending_open
                            // if possible
                            if !pushed.pending_send.is_empty() {
                                if counts.can_inc_num_send_streams() {
                                    counts.inc_num_send_streams(&mut pushed);
                                    self.pending_send.push(&mut pushed);
                                } else {
                                    self.queue_open(&mut pushed);
                                }
                            }
                            Frame::PushPromise(pp)
                        }
                        Some(frame) => frame.map(|_| {
                            unreachable!(
                                "Frame::map closure will only be called \
                                 on DATA frames."
                            )
                        }),
                        None => {
                            if let Some(reason) = stream.state.get_scheduled_reset() {
                                stream.set_reset(reason, Initiator::Library);

                                let frame = frame::Reset::new(stream.id, reason);
                                Frame::Reset(frame)
                            } else {
                                // If the stream receives a RESET from the peer, it may have
                                // had data buffered to be sent, but all the frames are cleared
                                // in clear_queue(). Instead of doing O(N) traversal through queue
                                // to remove, lets just ignore the stream here.
                                tracing::trace!("removing dangling stream from pending_send");
                                // Since this should only happen as a consequence of `clear_queue`,
                                // we must be in a closed state of some kind.
                                debug_assert!(stream.state.is_closed());
                                counts.transition_after(stream, is_pending_reset);
                                continue;
                            }
                        }
                    };

                    tracing::trace!("pop_frame; frame={:?}", frame);

                    if cfg!(debug_assertions) && stream.state.is_idle() {
                        debug_assert!(stream.id > self.last_opened_id);
                        self.last_opened_id = stream.id;
                    }

                    if !stream.pending_send.is_empty() || stream.state.is_scheduled_reset() {
                        // TODO: Only requeue the sender IF it is ready to send
                        // the next frame. i.e. don't requeue it if the next
                        // frame is a data frame and the stream does not have
                        // any more capacity.
                        self.pending_send.push(&mut stream);
                    }

                    counts.transition_after(stream, is_pending_reset);

                    return Some(frame);
                }
                None => return None,
            }
        }
    }
888
889 fn pop_pending_open<'s>(
890 &mut self,
891 store: &'s mut Store,
892 counts: &mut Counts,
893 ) -> Option<store::Ptr<'s>> {
894 tracing::trace!("schedule_pending_open");
895 // check for any pending open streams
896 if counts.can_inc_num_send_streams() {
897 if let Some(mut stream) = self.pending_open.pop(store) {
898 tracing::trace!("schedule_pending_open; stream={:?}", stream.id);
899
900 counts.inc_num_send_streams(&mut stream);
901 stream.notify_send();
902 return Some(stream);
903 }
904 }
905
906 None
907 }
908}
909
910// ===== impl Prioritized =====
911
impl<B> Buf for Prioritized<B>
where
    B: Buf,
{
    // All methods delegate to the inner `Take<B>`, which caps how many bytes
    // of the underlying buffer are exposed to the codec.
    fn remaining(&self) -> usize {
        self.inner.remaining()
    }

    fn chunk(&self) -> &[u8] {
        self.inner.chunk()
    }

    fn chunks_vectored<'a>(&'a self, dst: &mut [std::io::IoSlice<'a>]) -> usize {
        self.inner.chunks_vectored(dst)
    }

    fn advance(&mut self, cnt: usize) {
        self.inner.advance(cnt)
    }
}
932
impl<B: Buf> fmt::Debug for Prioritized<B> {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("Prioritized")
            // `get_ref` reports the underlying buffer's remaining bytes,
            // not just the portion exposed by the `Take` cap.
            .field("remaining", &self.inner.get_ref().remaining())
            .field("end_of_stream", &self.end_of_stream)
            .field("stream", &self.stream)
            .finish()
    }
}