hyper/proto/h2/
ping.rs

1//! HTTP2 Ping usage
2//!
3//! hyper uses HTTP2 pings for two purposes:
4//!
5//! 1. Adaptive flow control using BDP
6//! 2. Connection keep-alive
7//!
8//! Both cases are optional.
9//!
10//! # BDP Algorithm
11//!
12//! 1. When receiving a DATA frame, if a BDP ping isn't outstanding:
13//!    1a. Record current time.
14//!    1b. Send a BDP ping.
15//! 2. Increment the number of received bytes.
16//! 3. When the BDP ping ack is received:
17//!    3a. Record duration from sent time.
18//!    3b. Merge RTT with a running average.
19//!    3c. Calculate bdp as bytes/rtt.
//!    3d. If the sampled bytes are at least 2/3 of the current bdp, set bdp to
//!        twice the sample (capped at a fixed limit) and update windows.
21
22use std::fmt;
23use std::future::Future;
24use std::pin::Pin;
25use std::sync::{Arc, Mutex};
26use std::task::{self, Poll};
27use std::time::{Duration, Instant};
28
29use h2::{Ping, PingPong};
30
31use crate::common::time::Time;
32use crate::rt::Sleep;
33
34type WindowSize = u32;
35
36pub(super) fn disabled() -> Recorder {
37    Recorder { shared: None }
38}
39
40pub(super) fn channel(ping_pong: PingPong, config: Config, __timer: Time) -> (Recorder, Ponger) {
41    debug_assert!(
42        config.is_enabled(),
43        "ping channel requires bdp or keep-alive config",
44    );
45
46    let bdp = config.bdp_initial_window.map(|wnd| Bdp {
47        bdp: wnd,
48        max_bandwidth: 0.0,
49        rtt: 0.0,
50        ping_delay: Duration::from_millis(100),
51        stable_count: 0,
52    });
53
54    let (bytes, next_bdp_at) = if bdp.is_some() {
55        (Some(0), Some(Instant::now()))
56    } else {
57        (None, None)
58    };
59
60    let keep_alive = config.keep_alive_interval.map(|interval| KeepAlive {
61        interval,
62        timeout: config.keep_alive_timeout,
63        while_idle: config.keep_alive_while_idle,
64        sleep: __timer.sleep(interval),
65        state: KeepAliveState::Init,
66        timer: __timer,
67    });
68
69    let last_read_at = keep_alive.as_ref().map(|_| Instant::now());
70
71    let shared = Arc::new(Mutex::new(Shared {
72        bytes,
73        last_read_at,
74        is_keep_alive_timed_out: false,
75        ping_pong,
76        ping_sent_at: None,
77        next_bdp_at,
78    }));
79
80    (
81        Recorder {
82            shared: Some(shared.clone()),
83        },
84        Ponger {
85            bdp,
86            keep_alive,
87            shared,
88        },
89    )
90}
91
/// Settings for the HTTP2 ping machinery (BDP sampling and keep-alive).
#[derive(Clone)]
pub(super) struct Config {
    /// If `Some`, BDP sampling is enabled and this value is the initial
    /// BDP estimate.
    pub(super) bdp_initial_window: Option<WindowSize>,
    /// If no frames are received in this amount of time, a PING frame is sent.
    pub(super) keep_alive_interval: Option<Duration>,
    /// After sending a keepalive PING, the connection will be closed if
    /// a pong is not received in this amount of time.
    pub(super) keep_alive_timeout: Duration,
    /// If true, sends pings even when there are no active streams.
    pub(super) keep_alive_while_idle: bool,
}
103
/// Cloneable handle used to report received frames to the ping state.
#[derive(Clone)]
pub(crate) struct Recorder {
    /// Shared ping state; `None` means this recorder is disabled and all
    /// of its methods are no-ops.
    shared: Option<Arc<Mutex<Shared>>>,
}
108
/// The polling half of the ping channel: watches for pongs and drives the
/// BDP and keep-alive logic.
pub(super) struct Ponger {
    /// BDP estimator; `None` when BDP is disabled.
    bdp: Option<Bdp>,
    /// Keep-alive state machine; `None` when keep-alive is disabled, and
    /// reset to `None` once the keep-alive has timed out.
    keep_alive: Option<KeepAlive>,
    /// State shared with every `Recorder` created alongside this `Ponger`.
    shared: Arc<Mutex<Shared>>,
}
114
/// Ping state shared (behind a mutex) between the `Recorder` handles and
/// the `Ponger`.
struct Shared {
    /// Handle used to send pings and to poll for their pongs.
    ping_pong: PingPong,
    /// When the currently outstanding ping was sent, or `None` if no ping
    /// is in flight.
    ping_sent_at: Option<Instant>,

    // bdp
    /// If `Some`, bdp is enabled, and this tracks how many bytes have been
    /// read during the current sample.
    bytes: Option<usize>,
    /// We delay a variable amount of time between BDP pings. This allows us
    /// to send fewer pings as the bandwidth stabilizes.
    next_bdp_at: Option<Instant>,

    // keep-alive
    /// If `Some`, keep-alive is enabled, and the Instant is when the
    /// connection last read a frame.
    last_read_at: Option<Instant>,

    /// Set once the keep-alive timeout has elapsed without a pong.
    is_keep_alive_timed_out: bool,
}
134
/// Running state of the bandwidth-delay-product estimator.
struct Bdp {
    /// Current BDP in bytes
    bdp: u32,
    /// Largest bandwidth we've seen so far.
    max_bandwidth: f64,
    /// Round trip time in seconds (a moving average; `0.0` until the first
    /// sample has been taken).
    rtt: f64,
    /// Delay the next ping by this amount.
    ///
    /// This will change depending on how stable the current bandwidth is.
    ping_delay: Duration,
    /// The count of ping round trips where BDP has stayed the same.
    stable_count: u32,
}
149
/// Keep-alive state machine and the timer that drives it.
struct KeepAlive {
    /// If no frames are received in this amount of time, a PING frame is sent.
    interval: Duration,
    /// After sending a keepalive PING, the connection will be closed if
    /// a pong is not received in this amount of time.
    timeout: Duration,
    /// If true, sends pings even when there are no active streams.
    while_idle: bool,
    /// Where the keep-alive cycle currently is.
    state: KeepAliveState,
    /// Single sleep future, reset for either the next ping deadline or the
    /// pong timeout depending on `state`.
    sleep: Pin<Box<dyn Sleep>>,
    /// Timer used to create and reset `sleep`.
    timer: Time,
}
162
/// The phases of a keep-alive cycle.
enum KeepAliveState {
    /// No ping is scheduled yet.
    Init,
    /// A ping is scheduled to be sent at the contained instant.
    Scheduled(Instant),
    /// A keep-alive ping was attempted; waiting for the pong or the timeout.
    PingSent,
}
168
/// Events produced by `Ponger::poll`.
pub(super) enum Ponged {
    /// A BDP sample produced a new window size.
    SizeUpdate(WindowSize),
    /// The keep-alive timeout elapsed without a pong being received.
    KeepAliveTimedOut,
}
173
/// Error value signalling that a keep-alive ping was not answered in time.
#[derive(Debug)]
pub(super) struct KeepAliveTimedOut;
176
177// ===== impl Config =====
178
179impl Config {
180    pub(super) fn is_enabled(&self) -> bool {
181        self.bdp_initial_window.is_some() || self.keep_alive_interval.is_some()
182    }
183}
184
185// ===== impl Recorder =====
186
187impl Recorder {
188    pub(crate) fn record_data(&self, len: usize) {
189        let shared = if let Some(ref shared) = self.shared {
190            shared
191        } else {
192            return;
193        };
194
195        let mut locked = shared.lock().unwrap();
196
197        locked.update_last_read_at();
198
199        // are we ready to send another bdp ping?
200        // if not, we don't need to record bytes either
201
202        if let Some(ref next_bdp_at) = locked.next_bdp_at {
203            if Instant::now() < *next_bdp_at {
204                return;
205            } else {
206                locked.next_bdp_at = None;
207            }
208        }
209
210        if let Some(ref mut bytes) = locked.bytes {
211            *bytes += len;
212        } else {
213            // no need to send bdp ping if bdp is disabled
214            return;
215        }
216
217        if !locked.is_ping_sent() {
218            locked.send_ping();
219        }
220    }
221
222    pub(crate) fn record_non_data(&self) {
223        let shared = if let Some(ref shared) = self.shared {
224            shared
225        } else {
226            return;
227        };
228
229        let mut locked = shared.lock().unwrap();
230
231        locked.update_last_read_at();
232    }
233
234    /// If the incoming stream is already closed, convert self into
235    /// a disabled reporter.
236    #[cfg(feature = "client")]
237    pub(super) fn for_stream(self, stream: &h2::RecvStream) -> Self {
238        if stream.is_end_stream() {
239            disabled()
240        } else {
241            self
242        }
243    }
244
245    pub(super) fn ensure_not_timed_out(&self) -> crate::Result<()> {
246        if let Some(ref shared) = self.shared {
247            let locked = shared.lock().unwrap();
248            if locked.is_keep_alive_timed_out {
249                return Err(KeepAliveTimedOut.crate_error());
250            }
251        }
252
253        // else
254        Ok(())
255    }
256}
257
258// ===== impl Ponger =====
259
impl Ponger {
    /// Drives the keep-alive state machine and checks for a pong.
    ///
    /// Returns `Poll::Ready(Ponged::SizeUpdate(..))` when a pong completes a
    /// BDP sample that yields a new window size, and
    /// `Poll::Ready(Ponged::KeepAliveTimedOut)` when the keep-alive timeout
    /// elapses while a pong is still pending. In every other case it returns
    /// `Poll::Pending`.
    pub(super) fn poll(&mut self, cx: &mut task::Context<'_>) -> Poll<Ponged> {
        // Captured up front; used both for the rtt and for the next bdp delay.
        let now = Instant::now();
        let mut locked = self.shared.lock().unwrap();
        let is_idle = self.is_idle();

        // Give keep-alive a chance to schedule and/or send its ping first.
        if let Some(ref mut ka) = self.keep_alive {
            ka.maybe_schedule(is_idle, &locked);
            ka.maybe_ping(cx, is_idle, &mut locked);
        }

        // Without an outstanding ping there is no pong to wait for.
        if !locked.is_ping_sent() {
            // XXX: this doesn't register a waker...?
            return Poll::Pending;
        }

        match locked.ping_pong.poll_pong(cx) {
            Poll::Ready(Ok(_pong)) => {
                let start = locked
                    .ping_sent_at
                    .expect("pong received implies ping_sent_at");
                // The ping is no longer outstanding.
                locked.ping_sent_at = None;
                let rtt = now - start;
                trace!("recv pong");

                // A pong counts as a read for keep-alive purposes, so the
                // next keep-alive ping is rescheduled from here.
                if let Some(ref mut ka) = self.keep_alive {
                    locked.update_last_read_at();
                    ka.maybe_schedule(is_idle, &locked);
                    ka.maybe_ping(cx, is_idle, &mut locked);
                }

                if let Some(ref mut bdp) = self.bdp {
                    let bytes = locked.bytes.expect("bdp enabled implies bytes");
                    locked.bytes = Some(0); // reset
                    trace!("received BDP ack; bytes = {}, rtt = {:?}", bytes, rtt);

                    let update = bdp.calculate(bytes, rtt);
                    // `calculate` may have adjusted `ping_delay`, so read it
                    // only afterwards.
                    locked.next_bdp_at = Some(now + bdp.ping_delay);
                    if let Some(update) = update {
                        return Poll::Ready(Ponged::SizeUpdate(update));
                    }
                }
            }
            Poll::Ready(Err(_e)) => {
                // The error is only logged; the poll still ends as pending.
                debug!("pong error: {}", _e);
            }
            Poll::Pending => {
                if let Some(ref mut ka) = self.keep_alive {
                    if let Err(KeepAliveTimedOut) = ka.maybe_timeout(cx) {
                        // Disable keep-alive and flag the timeout so that
                        // `Recorder::ensure_not_timed_out` reports it.
                        self.keep_alive = None;
                        locked.is_keep_alive_timed_out = true;
                        return Poll::Ready(Ponged::KeepAliveTimedOut);
                    }
                }
            }
        }

        // XXX: this doesn't register a waker...?
        Poll::Pending
    }

    /// Returns `true` when at most two strong references to the shared
    /// state exist.
    ///
    /// NOTE(review): presumably those two are this `Ponger` and the
    /// connection's own `Recorder`, with each active stream holding an
    /// additional clone — confirm against the callers.
    fn is_idle(&self) -> bool {
        Arc::strong_count(&self.shared) <= 2
    }
}
325
326// ===== impl Shared =====
327
328impl Shared {
329    fn send_ping(&mut self) {
330        match self.ping_pong.send_ping(Ping::opaque()) {
331            Ok(()) => {
332                self.ping_sent_at = Some(Instant::now());
333                trace!("sent ping");
334            }
335            Err(_err) => {
336                debug!("error sending ping: {}", _err);
337            }
338        }
339    }
340
341    fn is_ping_sent(&self) -> bool {
342        self.ping_sent_at.is_some()
343    }
344
345    fn update_last_read_at(&mut self) {
346        if self.last_read_at.is_some() {
347            self.last_read_at = Some(Instant::now());
348        }
349    }
350
351    fn last_read_at(&self) -> Instant {
352        self.last_read_at.expect("keep_alive expects last_read_at")
353    }
354}
355
356// ===== impl Bdp =====
357
/// Upper bound for the BDP estimate, in bytes (16 MiB).
///
/// Any higher than this likely will be hitting the TCP flow control.
const BDP_LIMIT: usize = 16 * 1024 * 1024;
360
361impl Bdp {
362    fn calculate(&mut self, bytes: usize, rtt: Duration) -> Option<WindowSize> {
363        // No need to do any math if we're at the limit.
364        if self.bdp as usize == BDP_LIMIT {
365            self.stabilize_delay();
366            return None;
367        }
368
369        // average the rtt
370        let rtt = seconds(rtt);
371        if self.rtt == 0.0 {
372            // First sample means rtt is first rtt.
373            self.rtt = rtt;
374        } else {
375            // Weigh this rtt as 1/8 for a moving average.
376            self.rtt += (rtt - self.rtt) * 0.125;
377        }
378
379        // calculate the current bandwidth
380        let bw = (bytes as f64) / (self.rtt * 1.5);
381        trace!("current bandwidth = {:.1}B/s", bw);
382
383        if bw < self.max_bandwidth {
384            // not a faster bandwidth, so don't update
385            self.stabilize_delay();
386            return None;
387        } else {
388            self.max_bandwidth = bw;
389        }
390
391        // if the current `bytes` sample is at least 2/3 the previous
392        // bdp, increase to double the current sample.
393        if bytes >= self.bdp as usize * 2 / 3 {
394            self.bdp = (bytes * 2).min(BDP_LIMIT) as WindowSize;
395            trace!("BDP increased to {}", self.bdp);
396
397            self.stable_count = 0;
398            self.ping_delay /= 2;
399            Some(self.bdp)
400        } else {
401            self.stabilize_delay();
402            None
403        }
404    }
405
406    fn stabilize_delay(&mut self) {
407        if self.ping_delay < Duration::from_secs(10) {
408            self.stable_count += 1;
409
410            if self.stable_count >= 2 {
411                self.ping_delay *= 4;
412                self.stable_count = 0;
413            }
414        }
415    }
416}
417
/// Converts a `Duration` into fractional seconds.
///
/// Delegates to the standard library's `Duration::as_secs_f64` rather than
/// hand-rolling the whole-seconds plus sub-second-nanoseconds arithmetic.
fn seconds(dur: Duration) -> f64 {
    dur.as_secs_f64()
}
423
424// ===== impl KeepAlive =====
425
426impl KeepAlive {
427    fn maybe_schedule(&mut self, is_idle: bool, shared: &Shared) {
428        match self.state {
429            KeepAliveState::Init => {
430                if !self.while_idle && is_idle {
431                    return;
432                }
433
434                self.schedule(shared);
435            }
436            KeepAliveState::PingSent => {
437                if shared.is_ping_sent() {
438                    return;
439                }
440                self.schedule(shared);
441            }
442            KeepAliveState::Scheduled(..) => (),
443        }
444    }
445
446    fn schedule(&mut self, shared: &Shared) {
447        let interval = shared.last_read_at() + self.interval;
448        self.state = KeepAliveState::Scheduled(interval);
449        self.timer.reset(&mut self.sleep, interval);
450    }
451
452    fn maybe_ping(&mut self, cx: &mut task::Context<'_>, is_idle: bool, shared: &mut Shared) {
453        match self.state {
454            KeepAliveState::Scheduled(at) => {
455                if Pin::new(&mut self.sleep).poll(cx).is_pending() {
456                    return;
457                }
458                // check if we've received a frame while we were scheduled
459                if shared.last_read_at() + self.interval > at {
460                    self.state = KeepAliveState::Init;
461                    cx.waker().wake_by_ref(); // schedule us again
462                    return;
463                }
464                if !self.while_idle && is_idle {
465                    trace!("keep-alive no need to ping when idle and while_idle=false");
466                    return;
467                }
468                trace!("keep-alive interval ({:?}) reached", self.interval);
469                shared.send_ping();
470                self.state = KeepAliveState::PingSent;
471                let timeout = Instant::now() + self.timeout;
472                self.timer.reset(&mut self.sleep, timeout);
473            }
474            KeepAliveState::Init | KeepAliveState::PingSent => (),
475        }
476    }
477
478    fn maybe_timeout(&mut self, cx: &mut task::Context<'_>) -> Result<(), KeepAliveTimedOut> {
479        match self.state {
480            KeepAliveState::PingSent => {
481                if Pin::new(&mut self.sleep).poll(cx).is_pending() {
482                    return Ok(());
483                }
484                trace!("keep-alive timeout ({:?}) reached", self.timeout);
485                Err(KeepAliveTimedOut)
486            }
487            KeepAliveState::Init | KeepAliveState::Scheduled(..) => Ok(()),
488        }
489    }
490}
491
492// ===== impl KeepAliveTimedOut =====
493
494impl KeepAliveTimedOut {
495    pub(super) fn crate_error(self) -> crate::Error {
496        crate::Error::new(crate::error::Kind::Http2).with(self)
497    }
498}
499
500impl fmt::Display for KeepAliveTimedOut {
501    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
502        f.write_str("keep-alive timed out")
503    }
504}
505
506impl std::error::Error for KeepAliveTimedOut {
507    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
508        Some(&crate::error::TimedOut)
509    }
510}