hyper/proto/h2/
ping.rs

//! HTTP2 Ping usage
//!
//! hyper uses HTTP2 pings for two purposes:
//!
//! 1. Adaptive flow control using BDP (bandwidth-delay product)
//! 2. Connection keep-alive
//!
//! Both cases are optional.
//!
//! # BDP Algorithm
//!
//! 1. When receiving a DATA frame, if a BDP ping isn't outstanding:
//!    1a. Record current time.
//!    1b. Send a BDP ping.
//! 2. Increment the number of received bytes.
//! 3. When the BDP ping ack is received:
//!    3a. Record duration from sent time.
//!    3b. Merge RTT with a running average.
//!    3c. Calculate the bandwidth as bytes / (rtt * 1.5).
//!    3d. If the bandwidth is not below the largest seen so far and the
//!        bytes sample is at least 2/3 of the current BDP, set the BDP to
//!        double the sample and update windows.
21
22use std::fmt;
23use std::future::Future;
24use std::pin::Pin;
25use std::sync::{Arc, Mutex};
26use std::task::{self, Poll};
27use std::time::{Duration, Instant};
28
29use h2::{Ping, PingPong};
30
31use crate::common::time::Time;
32use crate::rt::Sleep;
33
34type WindowSize = u32;
35
36pub(super) fn disabled() -> Recorder {
37    Recorder { shared: None }
38}
39
40pub(super) fn channel(ping_pong: PingPong, config: Config, timer: Time) -> (Recorder, Ponger) {
41    debug_assert!(
42        config.is_enabled(),
43        "ping channel requires bdp or keep-alive config",
44    );
45
46    let bdp = config.bdp_initial_window.map(|wnd| Bdp {
47        bdp: wnd,
48        max_bandwidth: 0.0,
49        rtt: 0.0,
50        ping_delay: Duration::from_millis(100),
51        stable_count: 0,
52    });
53
54    let now = timer.now();
55
56    let (bytes, next_bdp_at) = if bdp.is_some() {
57        (Some(0), Some(now))
58    } else {
59        (None, None)
60    };
61
62    let keep_alive = config.keep_alive_interval.map(|interval| KeepAlive {
63        interval,
64        timeout: config.keep_alive_timeout,
65        while_idle: config.keep_alive_while_idle,
66        sleep: timer.sleep(interval),
67        state: KeepAliveState::Init,
68        timer: timer.clone(),
69    });
70
71    let last_read_at = keep_alive.as_ref().map(|_| now);
72
73    let shared = Arc::new(Mutex::new(Shared {
74        bytes,
75        last_read_at,
76        is_keep_alive_timed_out: false,
77        ping_pong,
78        ping_sent_at: None,
79        next_bdp_at,
80        timer,
81    }));
82
83    (
84        Recorder {
85            shared: Some(shared.clone()),
86        },
87        Ponger {
88            bdp,
89            keep_alive,
90            shared,
91        },
92    )
93}
94
95#[derive(Clone)]
96pub(super) struct Config {
97    pub(super) bdp_initial_window: Option<WindowSize>,
98    /// If no frames are received in this amount of time, a PING frame is sent.
99    pub(super) keep_alive_interval: Option<Duration>,
100    /// After sending a keepalive PING, the connection will be closed if
101    /// a pong is not received in this amount of time.
102    pub(super) keep_alive_timeout: Duration,
103    /// If true, sends pings even when there are no active streams.
104    pub(super) keep_alive_while_idle: bool,
105}
106
107#[derive(Clone)]
108pub(crate) struct Recorder {
109    shared: Option<Arc<Mutex<Shared>>>,
110}
111
112pub(super) struct Ponger {
113    bdp: Option<Bdp>,
114    keep_alive: Option<KeepAlive>,
115    shared: Arc<Mutex<Shared>>,
116}
117
118struct Shared {
119    ping_pong: PingPong,
120    ping_sent_at: Option<Instant>,
121
122    // bdp
123    /// If `Some`, bdp is enabled, and this tracks how many bytes have been
124    /// read during the current sample.
125    bytes: Option<usize>,
126    /// We delay a variable amount of time between BDP pings. This allows us
127    /// to send less pings as the bandwidth stabilizes.
128    next_bdp_at: Option<Instant>,
129
130    // keep-alive
131    /// If `Some`, keep-alive is enabled, and the Instant is how long ago
132    /// the connection read the last frame.
133    last_read_at: Option<Instant>,
134
135    is_keep_alive_timed_out: bool,
136    timer: Time,
137}
138
/// Running estimate of the connection's bandwidth-delay product.
struct Bdp {
    /// The current BDP estimate, in bytes.
    bdp: u32,
    /// The highest bandwidth observed so far.
    max_bandwidth: f64,
    /// Averaged round trip time, in seconds. `0.0` until the first sample.
    rtt: f64,
    /// How long to wait before the next BDP ping.
    ///
    /// Adjusted according to how stable the measured bandwidth is.
    ping_delay: Duration,
    /// Number of ping round trips during which the BDP did not change.
    stable_count: u32,
}
153
154struct KeepAlive {
155    /// If no frames are received in this amount of time, a PING frame is sent.
156    interval: Duration,
157    /// After sending a keepalive PING, the connection will be closed if
158    /// a pong is not received in this amount of time.
159    timeout: Duration,
160    /// If true, sends pings even when there are no active streams.
161    while_idle: bool,
162    state: KeepAliveState,
163    sleep: Pin<Box<dyn Sleep>>,
164    timer: Time,
165}
166
/// The phases of one keep-alive cycle.
enum KeepAliveState {
    /// Nothing is scheduled yet.
    Init,
    /// The sleep timer has been set to fire at the contained instant.
    Scheduled(Instant),
    /// A keep-alive ping has been issued and its pong is awaited.
    PingSent,
}
172
173pub(super) enum Ponged {
174    SizeUpdate(WindowSize),
175    KeepAliveTimedOut,
176}
177
178#[derive(Debug)]
179pub(super) struct KeepAliveTimedOut;
180
181// ===== impl Config =====
182
183impl Config {
184    pub(super) fn is_enabled(&self) -> bool {
185        self.bdp_initial_window.is_some() || self.keep_alive_interval.is_some()
186    }
187}
188
189// ===== impl Recorder =====
190
191impl Recorder {
192    pub(crate) fn record_data(&self, len: usize) {
193        let shared = if let Some(ref shared) = self.shared {
194            shared
195        } else {
196            return;
197        };
198
199        let mut locked = shared.lock().unwrap();
200
201        locked.update_last_read_at();
202
203        // are we ready to send another bdp ping?
204        // if not, we don't need to record bytes either
205
206        if let Some(ref next_bdp_at) = locked.next_bdp_at {
207            if locked.timer.now() < *next_bdp_at {
208                return;
209            } else {
210                locked.next_bdp_at = None;
211            }
212        }
213
214        if let Some(ref mut bytes) = locked.bytes {
215            *bytes += len;
216        } else {
217            // no need to send bdp ping if bdp is disabled
218            return;
219        }
220
221        if !locked.is_ping_sent() {
222            locked.send_ping();
223        }
224    }
225
226    pub(crate) fn record_non_data(&self) {
227        let shared = if let Some(ref shared) = self.shared {
228            shared
229        } else {
230            return;
231        };
232
233        let mut locked = shared.lock().unwrap();
234
235        locked.update_last_read_at();
236    }
237
238    /// If the incoming stream is already closed, convert self into
239    /// a disabled reporter.
240    #[cfg(feature = "client")]
241    pub(super) fn for_stream(self, stream: &h2::RecvStream) -> Self {
242        if stream.is_end_stream() {
243            disabled()
244        } else {
245            self
246        }
247    }
248
249    pub(super) fn ensure_not_timed_out(&self) -> crate::Result<()> {
250        if let Some(ref shared) = self.shared {
251            let locked = shared.lock().unwrap();
252            if locked.is_keep_alive_timed_out {
253                return Err(KeepAliveTimedOut.crate_error());
254            }
255        }
256
257        // else
258        Ok(())
259    }
260}
261
262// ===== impl Ponger =====
263
264impl Ponger {
265    pub(super) fn poll(&mut self, cx: &mut task::Context<'_>) -> Poll<Ponged> {
266        let mut locked = self.shared.lock().unwrap();
267        let now = locked.timer.now(); // hoping this is fine to move within the lock
268        let is_idle = self.is_idle();
269
270        if let Some(ref mut ka) = self.keep_alive {
271            ka.maybe_schedule(is_idle, &locked);
272            ka.maybe_ping(cx, is_idle, &mut locked);
273        }
274
275        if !locked.is_ping_sent() {
276            // XXX: this doesn't register a waker...?
277            return Poll::Pending;
278        }
279
280        match locked.ping_pong.poll_pong(cx) {
281            Poll::Ready(Ok(_pong)) => {
282                let start = locked
283                    .ping_sent_at
284                    .expect("pong received implies ping_sent_at");
285                locked.ping_sent_at = None;
286                let rtt = now - start;
287                trace!("recv pong");
288
289                if let Some(ref mut ka) = self.keep_alive {
290                    locked.update_last_read_at();
291                    ka.maybe_schedule(is_idle, &locked);
292                    ka.maybe_ping(cx, is_idle, &mut locked);
293                }
294
295                if let Some(ref mut bdp) = self.bdp {
296                    let bytes = locked.bytes.expect("bdp enabled implies bytes");
297                    locked.bytes = Some(0); // reset
298                    trace!("received BDP ack; bytes = {}, rtt = {:?}", bytes, rtt);
299
300                    let update = bdp.calculate(bytes, rtt);
301                    locked.next_bdp_at = Some(now + bdp.ping_delay);
302                    if let Some(update) = update {
303                        return Poll::Ready(Ponged::SizeUpdate(update));
304                    }
305                }
306            }
307            Poll::Ready(Err(_e)) => {
308                debug!("pong error: {}", _e);
309            }
310            Poll::Pending => {
311                if let Some(ref mut ka) = self.keep_alive {
312                    if let Err(KeepAliveTimedOut) = ka.maybe_timeout(cx) {
313                        self.keep_alive = None;
314                        locked.is_keep_alive_timed_out = true;
315                        return Poll::Ready(Ponged::KeepAliveTimedOut);
316                    }
317                }
318            }
319        }
320
321        // XXX: this doesn't register a waker...?
322        Poll::Pending
323    }
324
325    fn is_idle(&self) -> bool {
326        Arc::strong_count(&self.shared) <= 2
327    }
328}
329
330// ===== impl Shared =====
331
332impl Shared {
333    fn send_ping(&mut self) {
334        match self.ping_pong.send_ping(Ping::opaque()) {
335            Ok(()) => {
336                self.ping_sent_at = Some(self.timer.now());
337                trace!("sent ping");
338            }
339            Err(_err) => {
340                debug!("error sending ping: {}", _err);
341            }
342        }
343    }
344
345    fn is_ping_sent(&self) -> bool {
346        self.ping_sent_at.is_some()
347    }
348
349    fn update_last_read_at(&mut self) {
350        if self.last_read_at.is_some() {
351            self.last_read_at = Some(self.timer.now());
352        }
353    }
354
355    fn last_read_at(&self) -> Instant {
356        self.last_read_at.expect("keep_alive expects last_read_at")
357    }
358}
359
360// ===== impl Bdp =====
361
/// Upper bound for the BDP estimate (16 MiB). Going beyond this would
/// likely just run into TCP flow control.
const BDP_LIMIT: usize = 16 * 1024 * 1024;
364
365impl Bdp {
366    fn calculate(&mut self, bytes: usize, rtt: Duration) -> Option<WindowSize> {
367        // No need to do any math if we're at the limit.
368        if self.bdp as usize == BDP_LIMIT {
369            self.stabilize_delay();
370            return None;
371        }
372
373        // average the rtt
374        let rtt = seconds(rtt);
375        if self.rtt == 0.0 {
376            // First sample means rtt is first rtt.
377            self.rtt = rtt;
378        } else {
379            // Weigh this rtt as 1/8 for a moving average.
380            self.rtt += (rtt - self.rtt) * 0.125;
381        }
382
383        // calculate the current bandwidth
384        let bw = (bytes as f64) / (self.rtt * 1.5);
385        trace!("current bandwidth = {:.1}B/s", bw);
386
387        if bw < self.max_bandwidth {
388            // not a faster bandwidth, so don't update
389            self.stabilize_delay();
390            return None;
391        } else {
392            self.max_bandwidth = bw;
393        }
394
395        // if the current `bytes` sample is at least 2/3 the previous
396        // bdp, increase to double the current sample.
397        if bytes >= self.bdp as usize * 2 / 3 {
398            self.bdp = (bytes * 2).min(BDP_LIMIT) as WindowSize;
399            trace!("BDP increased to {}", self.bdp);
400
401            self.stable_count = 0;
402            self.ping_delay /= 2;
403            Some(self.bdp)
404        } else {
405            self.stabilize_delay();
406            None
407        }
408    }
409
410    fn stabilize_delay(&mut self) {
411        if self.ping_delay < Duration::from_secs(10) {
412            self.stable_count += 1;
413
414            if self.stable_count >= 2 {
415                self.ping_delay *= 4;
416                self.stable_count = 0;
417            }
418        }
419    }
420}
421
/// Converts a `Duration` into a fractional number of seconds.
fn seconds(dur: Duration) -> f64 {
    dur.as_secs_f64()
}
427
428// ===== impl KeepAlive =====
429
430impl KeepAlive {
431    fn maybe_schedule(&mut self, is_idle: bool, shared: &Shared) {
432        match self.state {
433            KeepAliveState::Init => {
434                if !self.while_idle && is_idle {
435                    return;
436                }
437
438                self.schedule(shared);
439            }
440            KeepAliveState::PingSent => {
441                if shared.is_ping_sent() {
442                    return;
443                }
444                self.schedule(shared);
445            }
446            KeepAliveState::Scheduled(..) => (),
447        }
448    }
449
450    fn schedule(&mut self, shared: &Shared) {
451        let interval = shared.last_read_at() + self.interval;
452        self.state = KeepAliveState::Scheduled(interval);
453        self.timer.reset(&mut self.sleep, interval);
454    }
455
456    fn maybe_ping(&mut self, cx: &mut task::Context<'_>, is_idle: bool, shared: &mut Shared) {
457        match self.state {
458            KeepAliveState::Scheduled(at) => {
459                if Pin::new(&mut self.sleep).poll(cx).is_pending() {
460                    return;
461                }
462                // check if we've received a frame while we were scheduled
463                if shared.last_read_at() + self.interval > at {
464                    self.state = KeepAliveState::Init;
465                    cx.waker().wake_by_ref(); // schedule us again
466                    return;
467                }
468                if !self.while_idle && is_idle {
469                    trace!("keep-alive no need to ping when idle and while_idle=false");
470                    return;
471                }
472                trace!("keep-alive interval ({:?}) reached", self.interval);
473                shared.send_ping();
474                self.state = KeepAliveState::PingSent;
475                let timeout = self.timer.now() + self.timeout;
476                self.timer.reset(&mut self.sleep, timeout);
477            }
478            KeepAliveState::Init | KeepAliveState::PingSent => (),
479        }
480    }
481
482    fn maybe_timeout(&mut self, cx: &mut task::Context<'_>) -> Result<(), KeepAliveTimedOut> {
483        match self.state {
484            KeepAliveState::PingSent => {
485                if Pin::new(&mut self.sleep).poll(cx).is_pending() {
486                    return Ok(());
487                }
488                trace!("keep-alive timeout ({:?}) reached", self.timeout);
489                Err(KeepAliveTimedOut)
490            }
491            KeepAliveState::Init | KeepAliveState::Scheduled(..) => Ok(()),
492        }
493    }
494}
495
496// ===== impl KeepAliveTimedOut =====
497
498impl KeepAliveTimedOut {
499    pub(super) fn crate_error(self) -> crate::Error {
500        crate::Error::new(crate::error::Kind::Http2).with(self)
501    }
502}
503
504impl fmt::Display for KeepAliveTimedOut {
505    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
506        f.write_str("keep-alive timed out")
507    }
508}
509
510impl std::error::Error for KeepAliveTimedOut {
511    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
512        Some(&crate::error::TimedOut)
513    }
514}