1use std::fmt;
23use std::future::Future;
24use std::pin::Pin;
25use std::sync::{Arc, Mutex};
26use std::task::{self, Poll};
27use std::time::{Duration, Instant};
28
29use h2::{Ping, PingPong};
30
31use crate::common::lock::LockResultExt;
32use crate::common::time::Time;
33use crate::rt::Sleep;
34
/// Integer type used for the BDP window sizes tracked by `Bdp` and reported
/// through `Ponged::SizeUpdate`.
type WindowSize = u32;
36
37pub(super) fn disabled() -> Recorder {
38 Recorder { shared: None }
39}
40
41pub(super) fn channel(ping_pong: PingPong, config: Config, timer: Time) -> (Recorder, Ponger) {
42 debug_assert!(
43 config.is_enabled(),
44 "ping channel requires bdp or keep-alive config",
45 );
46
47 let bdp = config.bdp_initial_window.map(|wnd| Bdp {
48 bdp: wnd,
49 max_bandwidth: 0.0,
50 rtt: 0.0,
51 ping_delay: Duration::from_millis(100),
52 stable_count: 0,
53 });
54
55 let now = timer.now();
56
57 let (bytes, next_bdp_at) = if bdp.is_some() {
58 (Some(0), Some(now))
59 } else {
60 (None, None)
61 };
62
63 let keep_alive = config.keep_alive_interval.map(|interval| KeepAlive {
64 interval,
65 timeout: config.keep_alive_timeout,
66 while_idle: config.keep_alive_while_idle,
67 sleep: timer.sleep(interval),
68 state: KeepAliveState::Init,
69 timer: timer.clone(),
70 });
71
72 let last_read_at = keep_alive.as_ref().map(|_| now);
73
74 let shared = Arc::new(Mutex::new(Shared {
75 bytes,
76 last_read_at,
77 is_keep_alive_timed_out: false,
78 ping_pong,
79 ping_sent_at: None,
80 next_bdp_at,
81 timer,
82 }));
83
84 (
85 Recorder {
86 shared: Some(shared.clone()),
87 },
88 Ponger {
89 bdp,
90 keep_alive,
91 shared,
92 },
93 )
94}
95
/// Configuration for the ping machinery (BDP estimation and keep-alive).
#[derive(Clone)]
pub(super) struct Config {
    /// Initial window used to seed the BDP estimator; `None` disables BDP.
    pub(super) bdp_initial_window: Option<WindowSize>,
    /// Time after the last recorded read at which a keep-alive PING is sent;
    /// `None` disables keep-alive.
    pub(super) keep_alive_interval: Option<Duration>,
    /// How long to wait for the PONG after a keep-alive PING before the
    /// `Ponger` reports `Ponged::KeepAliveTimedOut`.
    pub(super) keep_alive_timeout: Duration,
    /// When `false`, keep-alive PINGs are neither scheduled nor sent while the
    /// `Ponger` considers itself idle.
    pub(super) keep_alive_while_idle: bool,
}
107
/// Cloneable handle used to record incoming activity into the shared state.
#[derive(Clone)]
pub(crate) struct Recorder {
    /// Shared state; `None` for a disabled recorder, whose methods are no-ops.
    shared: Option<Arc<Mutex<Shared>>>,
}
112
/// Polling half of the ping channel: drives keep-alive and handles PONGs.
pub(super) struct Ponger {
    /// BDP estimator; `None` when BDP estimation is disabled.
    bdp: Option<Bdp>,
    /// Keep-alive state machine; `None` when keep-alive is disabled or after
    /// it has timed out.
    keep_alive: Option<KeepAlive>,
    /// State shared with every `Recorder` created by the same `channel` call.
    shared: Arc<Mutex<Shared>>,
}
118
/// State shared (behind a mutex) between the `Recorder`s and the `Ponger`.
struct Shared {
    /// h2 handle used to send PINGs and to poll for PONGs.
    ping_pong: PingPong,
    /// When the outstanding PING was sent; `None` when no PING is in flight.
    ping_sent_at: Option<Instant>,

    /// Bytes recorded since the counter was last reset; `Some` only when BDP
    /// estimation is enabled.
    bytes: Option<usize>,
    /// While this is set to an instant still in the future, recorded data is
    /// neither counted nor used to trigger a PING.
    next_bdp_at: Option<Instant>,

    /// Time of the most recent recorded read; `Some` only when keep-alive is
    /// enabled.
    last_read_at: Option<Instant>,

    /// Set once the keep-alive timeout has fired.
    is_keep_alive_timed_out: bool,
    /// Time source used for all timestamps stored here.
    timer: Time,
}
139
/// State of the bandwidth-delay-product (BDP) estimator.
struct Bdp {
    /// Current BDP estimate (the window size last chosen).
    bdp: u32,
    /// Largest bandwidth sample observed so far (bytes per second).
    max_bandwidth: f64,
    /// Smoothed round-trip time in seconds; `0.0` until the first sample.
    rtt: f64,
    /// Delay after a PONG before data is sampled for BDP again.
    ping_delay: Duration,
    /// Number of non-increasing samples seen since `ping_delay` last changed
    /// or the estimate last grew.
    stable_count: u32,
}
154
/// Keep-alive state machine driven by `Ponger::poll`.
struct KeepAlive {
    /// Time after the last recorded read at which a PING is sent.
    interval: Duration,
    /// Time allowed for the PONG to arrive after a keep-alive PING.
    timeout: Duration,
    /// Whether PINGs are still scheduled and sent while idle.
    while_idle: bool,
    /// Current phase of the keep-alive cycle.
    state: KeepAliveState,
    /// Single sleep reused both for the interval and for the timeout.
    sleep: Pin<Box<dyn Sleep>>,
    /// Time source used to read "now" and to reset `sleep`.
    timer: Time,
}
167
/// Phases of the keep-alive cycle.
enum KeepAliveState {
    /// Nothing is scheduled yet.
    Init,
    /// The sleep is armed for the contained deadline (last read + interval).
    Scheduled(Instant),
    /// A keep-alive PING was attempted and the sleep is armed for the timeout.
    PingSent,
}
173
/// Events produced by `Ponger::poll`.
pub(super) enum Ponged {
    /// The BDP estimator picked a new, larger window size.
    SizeUpdate(WindowSize),
    /// No PONG arrived within the keep-alive timeout.
    KeepAliveTimedOut,
}
178
/// Error value signalling that the keep-alive timeout elapsed without a PONG.
#[derive(Debug)]
pub(super) struct KeepAliveTimedOut;
181
182impl Config {
185 pub(super) fn is_enabled(&self) -> bool {
186 self.bdp_initial_window.is_some() || self.keep_alive_interval.is_some()
187 }
188}
189
190impl Recorder {
193 pub(crate) fn record_data(&self, len: usize) {
194 let shared = if let Some(ref shared) = self.shared {
195 shared
196 } else {
197 return;
198 };
199
200 let mut locked = shared.lock().panic_if_poisoned();
201
202 locked.update_last_read_at();
203
204 if let Some(ref next_bdp_at) = locked.next_bdp_at {
208 if locked.timer.now() < *next_bdp_at {
209 return;
210 } else {
211 locked.next_bdp_at = None;
212 }
213 }
214
215 if let Some(ref mut bytes) = locked.bytes {
216 *bytes += len;
217 } else {
218 return;
220 }
221
222 if !locked.is_ping_sent() {
223 locked.send_ping();
224 }
225 }
226
227 pub(crate) fn record_non_data(&self) {
228 let shared = if let Some(ref shared) = self.shared {
229 shared
230 } else {
231 return;
232 };
233
234 let mut locked = shared.lock().panic_if_poisoned();
235
236 locked.update_last_read_at();
237 }
238
239 #[cfg(feature = "client")]
242 pub(super) fn for_stream(self, stream: &h2::RecvStream) -> Self {
243 if stream.is_end_stream() {
244 disabled()
245 } else {
246 self
247 }
248 }
249
250 pub(super) fn ensure_not_timed_out(&self) -> crate::Result<()> {
251 if let Some(ref shared) = self.shared {
252 let locked = shared.lock().panic_if_poisoned();
253 if locked.is_keep_alive_timed_out {
254 return Err(KeepAliveTimedOut.crate_error());
255 }
256 }
257
258 Ok(())
260 }
261}
262
263impl Ponger {
266 pub(super) fn poll(&mut self, cx: &mut task::Context<'_>) -> Poll<Ponged> {
267 let mut locked = self.shared.lock().panic_if_poisoned();
268 let now = locked.timer.now(); let is_idle = self.is_idle();
270
271 if let Some(ref mut ka) = self.keep_alive {
272 ka.maybe_schedule(is_idle, &locked);
273 ka.maybe_ping(cx, is_idle, &mut locked);
274 }
275
276 if !locked.is_ping_sent() {
277 return Poll::Pending;
279 }
280
281 match locked.ping_pong.poll_pong(cx) {
282 Poll::Ready(Ok(_pong)) => {
283 let start = locked
284 .ping_sent_at
285 .expect("pong received implies ping_sent_at");
286 locked.ping_sent_at = None;
287 let rtt = now - start;
288 trace!("recv pong");
289
290 if let Some(ref mut ka) = self.keep_alive {
291 locked.update_last_read_at();
292 ka.maybe_schedule(is_idle, &locked);
293 ka.maybe_ping(cx, is_idle, &mut locked);
294 }
295
296 if let Some(ref mut bdp) = self.bdp {
297 let bytes = locked.bytes.expect("bdp enabled implies bytes");
298 locked.bytes = Some(0); trace!("received BDP ack; bytes = {}, rtt = {:?}", bytes, rtt);
300
301 let update = bdp.calculate(bytes, rtt);
302 locked.next_bdp_at = Some(now + bdp.ping_delay);
303 if let Some(update) = update {
304 return Poll::Ready(Ponged::SizeUpdate(update));
305 }
306 }
307 }
308 Poll::Ready(Err(_e)) => {
309 debug!("pong error: {}", _e);
310 }
311 Poll::Pending => {
312 if let Some(ref mut ka) = self.keep_alive {
313 if let Err(KeepAliveTimedOut) = ka.maybe_timeout(cx) {
314 self.keep_alive = None;
315 locked.is_keep_alive_timed_out = true;
316 return Poll::Ready(Ponged::KeepAliveTimedOut);
317 }
318 }
319 }
320 }
321
322 Poll::Pending
324 }
325
326 fn is_idle(&self) -> bool {
327 Arc::strong_count(&self.shared) <= 2
328 }
329}
330
331impl Shared {
334 fn send_ping(&mut self) {
335 match self.ping_pong.send_ping(Ping::opaque()) {
336 Ok(()) => {
337 self.ping_sent_at = Some(self.timer.now());
338 trace!("sent ping");
339 }
340 Err(_err) => {
341 debug!("error sending ping: {}", _err);
342 }
343 }
344 }
345
346 fn is_ping_sent(&self) -> bool {
347 self.ping_sent_at.is_some()
348 }
349
350 fn update_last_read_at(&mut self) {
351 if self.last_read_at.is_some() {
352 self.last_read_at = Some(self.timer.now());
353 }
354 }
355
356 fn last_read_at(&self) -> Instant {
357 self.last_read_at.expect("keep_alive expects last_read_at")
358 }
359}
360
/// Upper bound (16 MiB) for the window size the BDP estimator will pick.
const BDP_LIMIT: usize = 1024 * 1024 * 16;
365
366impl Bdp {
367 fn calculate(&mut self, bytes: usize, rtt: Duration) -> Option<WindowSize> {
368 if self.bdp as usize == BDP_LIMIT {
370 self.stabilize_delay();
371 return None;
372 }
373
374 let rtt = seconds(rtt);
376 if self.rtt == 0.0 {
377 self.rtt = rtt;
379 } else {
380 self.rtt += (rtt - self.rtt) * 0.125;
382 }
383
384 let bw = (bytes as f64) / (self.rtt * 1.5);
386 trace!("current bandwidth = {:.1}B/s", bw);
387
388 if bw < self.max_bandwidth {
389 self.stabilize_delay();
391 return None;
392 } else {
393 self.max_bandwidth = bw;
394 }
395
396 if bytes >= self.bdp as usize * 2 / 3 {
399 self.bdp = (bytes * 2).min(BDP_LIMIT) as WindowSize;
400 trace!("BDP increased to {}", self.bdp);
401
402 self.stable_count = 0;
403 self.ping_delay /= 2;
404 Some(self.bdp)
405 } else {
406 self.stabilize_delay();
407 None
408 }
409 }
410
411 fn stabilize_delay(&mut self) {
412 if self.ping_delay < Duration::from_secs(10) {
413 self.stable_count += 1;
414
415 if self.stable_count >= 2 {
416 self.ping_delay *= 4;
417 self.stable_count = 0;
418 }
419 }
420 }
421}
422
/// Converts `dur` to fractional seconds as an `f64`.
///
/// Delegates to the standard library, which computes whole seconds plus
/// sub-second nanoseconds divided by one billion.
fn seconds(dur: Duration) -> f64 {
    dur.as_secs_f64()
}
428
429impl KeepAlive {
432 fn maybe_schedule(&mut self, is_idle: bool, shared: &Shared) {
433 match self.state {
434 KeepAliveState::Init => {
435 if !self.while_idle && is_idle {
436 return;
437 }
438
439 self.schedule(shared);
440 }
441 KeepAliveState::PingSent => {
442 if shared.is_ping_sent() {
443 return;
444 }
445 self.schedule(shared);
446 }
447 KeepAliveState::Scheduled(..) => (),
448 }
449 }
450
451 fn schedule(&mut self, shared: &Shared) {
452 let interval = shared.last_read_at() + self.interval;
453 self.state = KeepAliveState::Scheduled(interval);
454 self.timer.reset(&mut self.sleep, interval);
455 }
456
457 fn maybe_ping(&mut self, cx: &mut task::Context<'_>, is_idle: bool, shared: &mut Shared) {
458 match self.state {
459 KeepAliveState::Scheduled(at) => {
460 if Pin::new(&mut self.sleep).poll(cx).is_pending() {
461 return;
462 }
463 if shared.last_read_at() + self.interval > at {
465 self.state = KeepAliveState::Init;
466 cx.waker().wake_by_ref(); return;
468 }
469 if !self.while_idle && is_idle {
470 trace!("keep-alive no need to ping when idle and while_idle=false");
471 return;
472 }
473 trace!("keep-alive interval ({:?}) reached", self.interval);
474 shared.send_ping();
475 self.state = KeepAliveState::PingSent;
476 let timeout = self.timer.now() + self.timeout;
477 self.timer.reset(&mut self.sleep, timeout);
478 }
479 KeepAliveState::Init | KeepAliveState::PingSent => (),
480 }
481 }
482
483 fn maybe_timeout(&mut self, cx: &mut task::Context<'_>) -> Result<(), KeepAliveTimedOut> {
484 match self.state {
485 KeepAliveState::PingSent => {
486 if Pin::new(&mut self.sleep).poll(cx).is_pending() {
487 return Ok(());
488 }
489 trace!("keep-alive timeout ({:?}) reached", self.timeout);
490 Err(KeepAliveTimedOut)
491 }
492 KeepAliveState::Init | KeepAliveState::Scheduled(..) => Ok(()),
493 }
494 }
495}
496
497impl KeepAliveTimedOut {
500 pub(super) fn crate_error(self) -> crate::Error {
501 crate::Error::new(crate::error::Kind::Http2).with(self)
502 }
503}
504
505impl fmt::Display for KeepAliveTimedOut {
506 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
507 f.write_str("keep-alive timed out")
508 }
509}
510
511impl std::error::Error for KeepAliveTimedOut {
512 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
513 Some(&crate::error::TimedOut)
514 }
515}