1use std::fmt;
23use std::future::Future;
24use std::pin::Pin;
25use std::sync::{Arc, Mutex};
26use std::task::{self, Poll};
27use std::time::{Duration, Instant};
28
29use h2::{Ping, PingPong};
30
31use crate::common::time::Time;
32use crate::rt::Sleep;
33
/// Integer type used for the window sizes produced by the BDP estimator.
type WindowSize = u32;
35
36pub(super) fn disabled() -> Recorder {
37 Recorder { shared: None }
38}
39
40pub(super) fn channel(ping_pong: PingPong, config: Config, timer: Time) -> (Recorder, Ponger) {
41 debug_assert!(
42 config.is_enabled(),
43 "ping channel requires bdp or keep-alive config",
44 );
45
46 let bdp = config.bdp_initial_window.map(|wnd| Bdp {
47 bdp: wnd,
48 max_bandwidth: 0.0,
49 rtt: 0.0,
50 ping_delay: Duration::from_millis(100),
51 stable_count: 0,
52 });
53
54 let now = timer.now();
55
56 let (bytes, next_bdp_at) = if bdp.is_some() {
57 (Some(0), Some(now))
58 } else {
59 (None, None)
60 };
61
62 let keep_alive = config.keep_alive_interval.map(|interval| KeepAlive {
63 interval,
64 timeout: config.keep_alive_timeout,
65 while_idle: config.keep_alive_while_idle,
66 sleep: timer.sleep(interval),
67 state: KeepAliveState::Init,
68 timer: timer.clone(),
69 });
70
71 let last_read_at = keep_alive.as_ref().map(|_| now);
72
73 let shared = Arc::new(Mutex::new(Shared {
74 bytes,
75 last_read_at,
76 is_keep_alive_timed_out: false,
77 ping_pong,
78 ping_sent_at: None,
79 next_bdp_at,
80 timer,
81 }));
82
83 (
84 Recorder {
85 shared: Some(shared.clone()),
86 },
87 Ponger {
88 bdp,
89 keep_alive,
90 shared,
91 },
92 )
93}
94
95#[derive(Clone)]
96pub(super) struct Config {
97 pub(super) bdp_initial_window: Option<WindowSize>,
98 pub(super) keep_alive_interval: Option<Duration>,
100 pub(super) keep_alive_timeout: Duration,
103 pub(super) keep_alive_while_idle: bool,
105}
106
107#[derive(Clone)]
108pub(crate) struct Recorder {
109 shared: Option<Arc<Mutex<Shared>>>,
110}
111
112pub(super) struct Ponger {
113 bdp: Option<Bdp>,
114 keep_alive: Option<KeepAlive>,
115 shared: Arc<Mutex<Shared>>,
116}
117
118struct Shared {
119 ping_pong: PingPong,
120 ping_sent_at: Option<Instant>,
121
122 bytes: Option<usize>,
126 next_bdp_at: Option<Instant>,
129
130 last_read_at: Option<Instant>,
134
135 is_keep_alive_timed_out: bool,
136 timer: Time,
137}
138
/// Running state of the bandwidth-delay-product estimator.
struct Bdp {
    /// Current estimate, in bytes.
    bdp: u32,
    /// Largest bandwidth sample accepted so far, in bytes per second.
    max_bandwidth: f64,
    /// Smoothed round-trip time, in seconds; `0.0` until the first sample.
    rtt: f64,
    /// Delay applied before the next BDP ping; halved when the estimate
    /// grows and lengthened as it stabilizes.
    ping_delay: Duration,
    /// Number of consecutive samples that did not grow the estimate since
    /// the delay was last adjusted.
    stable_count: u32,
}
153
154struct KeepAlive {
155 interval: Duration,
157 timeout: Duration,
160 while_idle: bool,
162 state: KeepAliveState,
163 sleep: Pin<Box<dyn Sleep>>,
164 timer: Time,
165}
166
/// Phase of the keep-alive cycle.
enum KeepAliveState {
    /// Nothing is scheduled yet.
    Init,
    /// The sleep has been armed for the contained deadline.
    Scheduled(Instant),
    /// A keep-alive ping was issued and the sleep now tracks the timeout.
    PingSent,
}
172
173pub(super) enum Ponged {
174 SizeUpdate(WindowSize),
175 KeepAliveTimedOut,
176}
177
178#[derive(Debug)]
179pub(super) struct KeepAliveTimedOut;
180
181impl Config {
184 pub(super) fn is_enabled(&self) -> bool {
185 self.bdp_initial_window.is_some() || self.keep_alive_interval.is_some()
186 }
187}
188
189impl Recorder {
192 pub(crate) fn record_data(&self, len: usize) {
193 let shared = if let Some(ref shared) = self.shared {
194 shared
195 } else {
196 return;
197 };
198
199 let mut locked = shared.lock().unwrap();
200
201 locked.update_last_read_at();
202
203 if let Some(ref next_bdp_at) = locked.next_bdp_at {
207 if locked.timer.now() < *next_bdp_at {
208 return;
209 } else {
210 locked.next_bdp_at = None;
211 }
212 }
213
214 if let Some(ref mut bytes) = locked.bytes {
215 *bytes += len;
216 } else {
217 return;
219 }
220
221 if !locked.is_ping_sent() {
222 locked.send_ping();
223 }
224 }
225
226 pub(crate) fn record_non_data(&self) {
227 let shared = if let Some(ref shared) = self.shared {
228 shared
229 } else {
230 return;
231 };
232
233 let mut locked = shared.lock().unwrap();
234
235 locked.update_last_read_at();
236 }
237
238 #[cfg(feature = "client")]
241 pub(super) fn for_stream(self, stream: &h2::RecvStream) -> Self {
242 if stream.is_end_stream() {
243 disabled()
244 } else {
245 self
246 }
247 }
248
249 pub(super) fn ensure_not_timed_out(&self) -> crate::Result<()> {
250 if let Some(ref shared) = self.shared {
251 let locked = shared.lock().unwrap();
252 if locked.is_keep_alive_timed_out {
253 return Err(KeepAliveTimedOut.crate_error());
254 }
255 }
256
257 Ok(())
259 }
260}
261
262impl Ponger {
265 pub(super) fn poll(&mut self, cx: &mut task::Context<'_>) -> Poll<Ponged> {
266 let mut locked = self.shared.lock().unwrap();
267 let now = locked.timer.now(); let is_idle = self.is_idle();
269
270 if let Some(ref mut ka) = self.keep_alive {
271 ka.maybe_schedule(is_idle, &locked);
272 ka.maybe_ping(cx, is_idle, &mut locked);
273 }
274
275 if !locked.is_ping_sent() {
276 return Poll::Pending;
278 }
279
280 match locked.ping_pong.poll_pong(cx) {
281 Poll::Ready(Ok(_pong)) => {
282 let start = locked
283 .ping_sent_at
284 .expect("pong received implies ping_sent_at");
285 locked.ping_sent_at = None;
286 let rtt = now - start;
287 trace!("recv pong");
288
289 if let Some(ref mut ka) = self.keep_alive {
290 locked.update_last_read_at();
291 ka.maybe_schedule(is_idle, &locked);
292 ka.maybe_ping(cx, is_idle, &mut locked);
293 }
294
295 if let Some(ref mut bdp) = self.bdp {
296 let bytes = locked.bytes.expect("bdp enabled implies bytes");
297 locked.bytes = Some(0); trace!("received BDP ack; bytes = {}, rtt = {:?}", bytes, rtt);
299
300 let update = bdp.calculate(bytes, rtt);
301 locked.next_bdp_at = Some(now + bdp.ping_delay);
302 if let Some(update) = update {
303 return Poll::Ready(Ponged::SizeUpdate(update));
304 }
305 }
306 }
307 Poll::Ready(Err(_e)) => {
308 debug!("pong error: {}", _e);
309 }
310 Poll::Pending => {
311 if let Some(ref mut ka) = self.keep_alive {
312 if let Err(KeepAliveTimedOut) = ka.maybe_timeout(cx) {
313 self.keep_alive = None;
314 locked.is_keep_alive_timed_out = true;
315 return Poll::Ready(Ponged::KeepAliveTimedOut);
316 }
317 }
318 }
319 }
320
321 Poll::Pending
323 }
324
325 fn is_idle(&self) -> bool {
326 Arc::strong_count(&self.shared) <= 2
327 }
328}
329
330impl Shared {
333 fn send_ping(&mut self) {
334 match self.ping_pong.send_ping(Ping::opaque()) {
335 Ok(()) => {
336 self.ping_sent_at = Some(self.timer.now());
337 trace!("sent ping");
338 }
339 Err(_err) => {
340 debug!("error sending ping: {}", _err);
341 }
342 }
343 }
344
345 fn is_ping_sent(&self) -> bool {
346 self.ping_sent_at.is_some()
347 }
348
349 fn update_last_read_at(&mut self) {
350 if self.last_read_at.is_some() {
351 self.last_read_at = Some(self.timer.now());
352 }
353 }
354
355 fn last_read_at(&self) -> Instant {
356 self.last_read_at.expect("keep_alive expects last_read_at")
357 }
358}
359
/// Upper bound for the BDP estimate: 16 MiB.
const BDP_LIMIT: usize = 16 * 1024 * 1024;
364
365impl Bdp {
366 fn calculate(&mut self, bytes: usize, rtt: Duration) -> Option<WindowSize> {
367 if self.bdp as usize == BDP_LIMIT {
369 self.stabilize_delay();
370 return None;
371 }
372
373 let rtt = seconds(rtt);
375 if self.rtt == 0.0 {
376 self.rtt = rtt;
378 } else {
379 self.rtt += (rtt - self.rtt) * 0.125;
381 }
382
383 let bw = (bytes as f64) / (self.rtt * 1.5);
385 trace!("current bandwidth = {:.1}B/s", bw);
386
387 if bw < self.max_bandwidth {
388 self.stabilize_delay();
390 return None;
391 } else {
392 self.max_bandwidth = bw;
393 }
394
395 if bytes >= self.bdp as usize * 2 / 3 {
398 self.bdp = (bytes * 2).min(BDP_LIMIT) as WindowSize;
399 trace!("BDP increased to {}", self.bdp);
400
401 self.stable_count = 0;
402 self.ping_delay /= 2;
403 Some(self.bdp)
404 } else {
405 self.stabilize_delay();
406 None
407 }
408 }
409
410 fn stabilize_delay(&mut self) {
411 if self.ping_delay < Duration::from_secs(10) {
412 self.stable_count += 1;
413
414 if self.stable_count >= 2 {
415 self.ping_delay *= 4;
416 self.stable_count = 0;
417 }
418 }
419 }
420}
421
/// Converts `dur` to fractional seconds.
///
/// Delegates to the standard library, which evaluates the same
/// whole-seconds plus nanoseconds / 1e9 expression as the hand-written
/// conversion it replaces.
fn seconds(dur: Duration) -> f64 {
    dur.as_secs_f64()
}
427
428impl KeepAlive {
431 fn maybe_schedule(&mut self, is_idle: bool, shared: &Shared) {
432 match self.state {
433 KeepAliveState::Init => {
434 if !self.while_idle && is_idle {
435 return;
436 }
437
438 self.schedule(shared);
439 }
440 KeepAliveState::PingSent => {
441 if shared.is_ping_sent() {
442 return;
443 }
444 self.schedule(shared);
445 }
446 KeepAliveState::Scheduled(..) => (),
447 }
448 }
449
450 fn schedule(&mut self, shared: &Shared) {
451 let interval = shared.last_read_at() + self.interval;
452 self.state = KeepAliveState::Scheduled(interval);
453 self.timer.reset(&mut self.sleep, interval);
454 }
455
456 fn maybe_ping(&mut self, cx: &mut task::Context<'_>, is_idle: bool, shared: &mut Shared) {
457 match self.state {
458 KeepAliveState::Scheduled(at) => {
459 if Pin::new(&mut self.sleep).poll(cx).is_pending() {
460 return;
461 }
462 if shared.last_read_at() + self.interval > at {
464 self.state = KeepAliveState::Init;
465 cx.waker().wake_by_ref(); return;
467 }
468 if !self.while_idle && is_idle {
469 trace!("keep-alive no need to ping when idle and while_idle=false");
470 return;
471 }
472 trace!("keep-alive interval ({:?}) reached", self.interval);
473 shared.send_ping();
474 self.state = KeepAliveState::PingSent;
475 let timeout = self.timer.now() + self.timeout;
476 self.timer.reset(&mut self.sleep, timeout);
477 }
478 KeepAliveState::Init | KeepAliveState::PingSent => (),
479 }
480 }
481
482 fn maybe_timeout(&mut self, cx: &mut task::Context<'_>) -> Result<(), KeepAliveTimedOut> {
483 match self.state {
484 KeepAliveState::PingSent => {
485 if Pin::new(&mut self.sleep).poll(cx).is_pending() {
486 return Ok(());
487 }
488 trace!("keep-alive timeout ({:?}) reached", self.timeout);
489 Err(KeepAliveTimedOut)
490 }
491 KeepAliveState::Init | KeepAliveState::Scheduled(..) => Ok(()),
492 }
493 }
494}
495
496impl KeepAliveTimedOut {
499 pub(super) fn crate_error(self) -> crate::Error {
500 crate::Error::new(crate::error::Kind::Http2).with(self)
501 }
502}
503
504impl fmt::Display for KeepAliveTimedOut {
505 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
506 f.write_str("keep-alive timed out")
507 }
508}
509
510impl std::error::Error for KeepAliveTimedOut {
511 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
512 Some(&crate::error::TimedOut)
513 }
514}