1use std::fmt;
23use std::future::Future;
24use std::pin::Pin;
25use std::sync::{Arc, Mutex};
26use std::task::{self, Poll};
27use std::time::{Duration, Instant};
28
29use h2::{Ping, PingPong};
30
31use crate::common::time::Time;
32use crate::rt::Sleep;
33
34type WindowSize = u32;
35
36pub(super) fn disabled() -> Recorder {
37 Recorder { shared: None }
38}
39
40pub(super) fn channel(ping_pong: PingPong, config: Config, __timer: Time) -> (Recorder, Ponger) {
41 debug_assert!(
42 config.is_enabled(),
43 "ping channel requires bdp or keep-alive config",
44 );
45
46 let bdp = config.bdp_initial_window.map(|wnd| Bdp {
47 bdp: wnd,
48 max_bandwidth: 0.0,
49 rtt: 0.0,
50 ping_delay: Duration::from_millis(100),
51 stable_count: 0,
52 });
53
54 let (bytes, next_bdp_at) = if bdp.is_some() {
55 (Some(0), Some(Instant::now()))
56 } else {
57 (None, None)
58 };
59
60 let keep_alive = config.keep_alive_interval.map(|interval| KeepAlive {
61 interval,
62 timeout: config.keep_alive_timeout,
63 while_idle: config.keep_alive_while_idle,
64 sleep: __timer.sleep(interval),
65 state: KeepAliveState::Init,
66 timer: __timer,
67 });
68
69 let last_read_at = keep_alive.as_ref().map(|_| Instant::now());
70
71 let shared = Arc::new(Mutex::new(Shared {
72 bytes,
73 last_read_at,
74 is_keep_alive_timed_out: false,
75 ping_pong,
76 ping_sent_at: None,
77 next_bdp_at,
78 }));
79
80 (
81 Recorder {
82 shared: Some(shared.clone()),
83 },
84 Ponger {
85 bdp,
86 keep_alive,
87 shared,
88 },
89 )
90}
91
92#[derive(Clone)]
93pub(super) struct Config {
94 pub(super) bdp_initial_window: Option<WindowSize>,
95 pub(super) keep_alive_interval: Option<Duration>,
97 pub(super) keep_alive_timeout: Duration,
100 pub(super) keep_alive_while_idle: bool,
102}
103
104#[derive(Clone)]
105pub(crate) struct Recorder {
106 shared: Option<Arc<Mutex<Shared>>>,
107}
108
109pub(super) struct Ponger {
110 bdp: Option<Bdp>,
111 keep_alive: Option<KeepAlive>,
112 shared: Arc<Mutex<Shared>>,
113}
114
115struct Shared {
116 ping_pong: PingPong,
117 ping_sent_at: Option<Instant>,
118
119 bytes: Option<usize>,
123 next_bdp_at: Option<Instant>,
126
127 last_read_at: Option<Instant>,
131
132 is_keep_alive_timed_out: bool,
133}
134
135struct Bdp {
136 bdp: u32,
138 max_bandwidth: f64,
140 rtt: f64,
142 ping_delay: Duration,
146 stable_count: u32,
148}
149
150struct KeepAlive {
151 interval: Duration,
153 timeout: Duration,
156 while_idle: bool,
158 state: KeepAliveState,
159 sleep: Pin<Box<dyn Sleep>>,
160 timer: Time,
161}
162
163enum KeepAliveState {
164 Init,
165 Scheduled(Instant),
166 PingSent,
167}
168
169pub(super) enum Ponged {
170 SizeUpdate(WindowSize),
171 KeepAliveTimedOut,
172}
173
174#[derive(Debug)]
175pub(super) struct KeepAliveTimedOut;
176
177impl Config {
180 pub(super) fn is_enabled(&self) -> bool {
181 self.bdp_initial_window.is_some() || self.keep_alive_interval.is_some()
182 }
183}
184
185impl Recorder {
188 pub(crate) fn record_data(&self, len: usize) {
189 let shared = if let Some(ref shared) = self.shared {
190 shared
191 } else {
192 return;
193 };
194
195 let mut locked = shared.lock().unwrap();
196
197 locked.update_last_read_at();
198
199 if let Some(ref next_bdp_at) = locked.next_bdp_at {
203 if Instant::now() < *next_bdp_at {
204 return;
205 } else {
206 locked.next_bdp_at = None;
207 }
208 }
209
210 if let Some(ref mut bytes) = locked.bytes {
211 *bytes += len;
212 } else {
213 return;
215 }
216
217 if !locked.is_ping_sent() {
218 locked.send_ping();
219 }
220 }
221
222 pub(crate) fn record_non_data(&self) {
223 let shared = if let Some(ref shared) = self.shared {
224 shared
225 } else {
226 return;
227 };
228
229 let mut locked = shared.lock().unwrap();
230
231 locked.update_last_read_at();
232 }
233
234 #[cfg(feature = "client")]
237 pub(super) fn for_stream(self, stream: &h2::RecvStream) -> Self {
238 if stream.is_end_stream() {
239 disabled()
240 } else {
241 self
242 }
243 }
244
245 pub(super) fn ensure_not_timed_out(&self) -> crate::Result<()> {
246 if let Some(ref shared) = self.shared {
247 let locked = shared.lock().unwrap();
248 if locked.is_keep_alive_timed_out {
249 return Err(KeepAliveTimedOut.crate_error());
250 }
251 }
252
253 Ok(())
255 }
256}
257
258impl Ponger {
261 pub(super) fn poll(&mut self, cx: &mut task::Context<'_>) -> Poll<Ponged> {
262 let now = Instant::now();
263 let mut locked = self.shared.lock().unwrap();
264 let is_idle = self.is_idle();
265
266 if let Some(ref mut ka) = self.keep_alive {
267 ka.maybe_schedule(is_idle, &locked);
268 ka.maybe_ping(cx, is_idle, &mut locked);
269 }
270
271 if !locked.is_ping_sent() {
272 return Poll::Pending;
274 }
275
276 match locked.ping_pong.poll_pong(cx) {
277 Poll::Ready(Ok(_pong)) => {
278 let start = locked
279 .ping_sent_at
280 .expect("pong received implies ping_sent_at");
281 locked.ping_sent_at = None;
282 let rtt = now - start;
283 trace!("recv pong");
284
285 if let Some(ref mut ka) = self.keep_alive {
286 locked.update_last_read_at();
287 ka.maybe_schedule(is_idle, &locked);
288 ka.maybe_ping(cx, is_idle, &mut locked);
289 }
290
291 if let Some(ref mut bdp) = self.bdp {
292 let bytes = locked.bytes.expect("bdp enabled implies bytes");
293 locked.bytes = Some(0); trace!("received BDP ack; bytes = {}, rtt = {:?}", bytes, rtt);
295
296 let update = bdp.calculate(bytes, rtt);
297 locked.next_bdp_at = Some(now + bdp.ping_delay);
298 if let Some(update) = update {
299 return Poll::Ready(Ponged::SizeUpdate(update));
300 }
301 }
302 }
303 Poll::Ready(Err(_e)) => {
304 debug!("pong error: {}", _e);
305 }
306 Poll::Pending => {
307 if let Some(ref mut ka) = self.keep_alive {
308 if let Err(KeepAliveTimedOut) = ka.maybe_timeout(cx) {
309 self.keep_alive = None;
310 locked.is_keep_alive_timed_out = true;
311 return Poll::Ready(Ponged::KeepAliveTimedOut);
312 }
313 }
314 }
315 }
316
317 Poll::Pending
319 }
320
321 fn is_idle(&self) -> bool {
322 Arc::strong_count(&self.shared) <= 2
323 }
324}
325
326impl Shared {
329 fn send_ping(&mut self) {
330 match self.ping_pong.send_ping(Ping::opaque()) {
331 Ok(()) => {
332 self.ping_sent_at = Some(Instant::now());
333 trace!("sent ping");
334 }
335 Err(_err) => {
336 debug!("error sending ping: {}", _err);
337 }
338 }
339 }
340
341 fn is_ping_sent(&self) -> bool {
342 self.ping_sent_at.is_some()
343 }
344
345 fn update_last_read_at(&mut self) {
346 if self.last_read_at.is_some() {
347 self.last_read_at = Some(Instant::now());
348 }
349 }
350
351 fn last_read_at(&self) -> Instant {
352 self.last_read_at.expect("keep_alive expects last_read_at")
353 }
354}
355
356const BDP_LIMIT: usize = 1024 * 1024 * 16;
360
361impl Bdp {
362 fn calculate(&mut self, bytes: usize, rtt: Duration) -> Option<WindowSize> {
363 if self.bdp as usize == BDP_LIMIT {
365 self.stabilize_delay();
366 return None;
367 }
368
369 let rtt = seconds(rtt);
371 if self.rtt == 0.0 {
372 self.rtt = rtt;
374 } else {
375 self.rtt += (rtt - self.rtt) * 0.125;
377 }
378
379 let bw = (bytes as f64) / (self.rtt * 1.5);
381 trace!("current bandwidth = {:.1}B/s", bw);
382
383 if bw < self.max_bandwidth {
384 self.stabilize_delay();
386 return None;
387 } else {
388 self.max_bandwidth = bw;
389 }
390
391 if bytes >= self.bdp as usize * 2 / 3 {
394 self.bdp = (bytes * 2).min(BDP_LIMIT) as WindowSize;
395 trace!("BDP increased to {}", self.bdp);
396
397 self.stable_count = 0;
398 self.ping_delay /= 2;
399 Some(self.bdp)
400 } else {
401 self.stabilize_delay();
402 None
403 }
404 }
405
406 fn stabilize_delay(&mut self) {
407 if self.ping_delay < Duration::from_secs(10) {
408 self.stable_count += 1;
409
410 if self.stable_count >= 2 {
411 self.ping_delay *= 4;
412 self.stable_count = 0;
413 }
414 }
415 }
416}
417
418fn seconds(dur: Duration) -> f64 {
419 const NANOS_PER_SEC: f64 = 1_000_000_000.0;
420 let secs = dur.as_secs() as f64;
421 secs + (dur.subsec_nanos() as f64) / NANOS_PER_SEC
422}
423
424impl KeepAlive {
427 fn maybe_schedule(&mut self, is_idle: bool, shared: &Shared) {
428 match self.state {
429 KeepAliveState::Init => {
430 if !self.while_idle && is_idle {
431 return;
432 }
433
434 self.schedule(shared);
435 }
436 KeepAliveState::PingSent => {
437 if shared.is_ping_sent() {
438 return;
439 }
440 self.schedule(shared);
441 }
442 KeepAliveState::Scheduled(..) => (),
443 }
444 }
445
446 fn schedule(&mut self, shared: &Shared) {
447 let interval = shared.last_read_at() + self.interval;
448 self.state = KeepAliveState::Scheduled(interval);
449 self.timer.reset(&mut self.sleep, interval);
450 }
451
452 fn maybe_ping(&mut self, cx: &mut task::Context<'_>, is_idle: bool, shared: &mut Shared) {
453 match self.state {
454 KeepAliveState::Scheduled(at) => {
455 if Pin::new(&mut self.sleep).poll(cx).is_pending() {
456 return;
457 }
458 if shared.last_read_at() + self.interval > at {
460 self.state = KeepAliveState::Init;
461 cx.waker().wake_by_ref(); return;
463 }
464 if !self.while_idle && is_idle {
465 trace!("keep-alive no need to ping when idle and while_idle=false");
466 return;
467 }
468 trace!("keep-alive interval ({:?}) reached", self.interval);
469 shared.send_ping();
470 self.state = KeepAliveState::PingSent;
471 let timeout = Instant::now() + self.timeout;
472 self.timer.reset(&mut self.sleep, timeout);
473 }
474 KeepAliveState::Init | KeepAliveState::PingSent => (),
475 }
476 }
477
478 fn maybe_timeout(&mut self, cx: &mut task::Context<'_>) -> Result<(), KeepAliveTimedOut> {
479 match self.state {
480 KeepAliveState::PingSent => {
481 if Pin::new(&mut self.sleep).poll(cx).is_pending() {
482 return Ok(());
483 }
484 trace!("keep-alive timeout ({:?}) reached", self.timeout);
485 Err(KeepAliveTimedOut)
486 }
487 KeepAliveState::Init | KeepAliveState::Scheduled(..) => Ok(()),
488 }
489 }
490}
491
492impl KeepAliveTimedOut {
495 pub(super) fn crate_error(self) -> crate::Error {
496 crate::Error::new(crate::error::Kind::Http2).with(self)
497 }
498}
499
500impl fmt::Display for KeepAliveTimedOut {
501 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
502 f.write_str("keep-alive timed out")
503 }
504}
505
506impl std::error::Error for KeepAliveTimedOut {
507 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
508 Some(&crate::error::TimedOut)
509 }
510}