Skip to main content

hyper/proto/h2/
ping.rs

//! HTTP2 Ping usage.
//!
//! hyper uses HTTP2 pings for two purposes:
//!
//! 1. Adaptive flow control using BDP
//! 2. Connection keep-alive
//!
//! Both cases are optional.
//!
//! # BDP Algorithm
//!
//! 1. When receiving a DATA frame (once the delay since the previous sample
//!    has elapsed), if a BDP ping isn't outstanding:
//!    1a. Record current time.
//!    1b. Send a BDP ping.
//! 2. Increment the number of received bytes.
//! 3. When the BDP ping ack is received:
//!    3a. Record duration from sent time.
//!    3b. Merge RTT with a running average.
//!    3c. Calculate bandwidth as bytes / (1.5 * rtt).
//!    3d. If the bandwidth is the highest seen so far and the bytes are at
//!        least 2/3 of the current bdp, set bdp to twice the bytes (capped)
//!        and update windows.
//!    3e. Adjust the delay before the next sample: shorter after growth,
//!        longer while the bdp stays stable.
21
22use std::fmt;
23use std::future::Future;
24use std::pin::Pin;
25use std::sync::{Arc, Mutex};
26use std::task::{self, Poll};
27use std::time::{Duration, Instant};
28
29use h2::{Ping, PingPong};
30
31use crate::common::lock::LockResultExt;
32use crate::common::time::Time;
33use crate::rt::Sleep;
34
35type WindowSize = u32;
36
37pub(super) fn disabled() -> Recorder {
38    Recorder { shared: None }
39}
40
41pub(super) fn channel(ping_pong: PingPong, config: Config, timer: Time) -> (Recorder, Ponger) {
42    debug_assert!(
43        config.is_enabled(),
44        "ping channel requires bdp or keep-alive config",
45    );
46
47    let bdp = config.bdp_initial_window.map(|wnd| Bdp {
48        bdp: wnd,
49        max_bandwidth: 0.0,
50        rtt: 0.0,
51        ping_delay: Duration::from_millis(100),
52        stable_count: 0,
53    });
54
55    let now = timer.now();
56
57    let (bytes, next_bdp_at) = if bdp.is_some() {
58        (Some(0), Some(now))
59    } else {
60        (None, None)
61    };
62
63    let keep_alive = config.keep_alive_interval.map(|interval| KeepAlive {
64        interval,
65        timeout: config.keep_alive_timeout,
66        while_idle: config.keep_alive_while_idle,
67        sleep: timer.sleep(interval),
68        state: KeepAliveState::Init,
69        timer: timer.clone(),
70    });
71
72    let last_read_at = keep_alive.as_ref().map(|_| now);
73
74    let shared = Arc::new(Mutex::new(Shared {
75        bytes,
76        last_read_at,
77        is_keep_alive_timed_out: false,
78        ping_pong,
79        ping_sent_at: None,
80        next_bdp_at,
81        timer,
82    }));
83
84    (
85        Recorder {
86            shared: Some(shared.clone()),
87        },
88        Ponger {
89            bdp,
90            keep_alive,
91            shared,
92        },
93    )
94}
95
96#[derive(Clone)]
97pub(super) struct Config {
98    pub(super) bdp_initial_window: Option<WindowSize>,
99    /// If no frames are received in this amount of time, a PING frame is sent.
100    pub(super) keep_alive_interval: Option<Duration>,
101    /// After sending a keepalive PING, the connection will be closed if
102    /// a pong is not received in this amount of time.
103    pub(super) keep_alive_timeout: Duration,
104    /// If true, sends pings even when there are no active streams.
105    pub(super) keep_alive_while_idle: bool,
106}
107
108#[derive(Clone)]
109pub(crate) struct Recorder {
110    shared: Option<Arc<Mutex<Shared>>>,
111}
112
113pub(super) struct Ponger {
114    bdp: Option<Bdp>,
115    keep_alive: Option<KeepAlive>,
116    shared: Arc<Mutex<Shared>>,
117}
118
119struct Shared {
120    ping_pong: PingPong,
121    ping_sent_at: Option<Instant>,
122
123    // bdp
124    /// If `Some`, bdp is enabled, and this tracks how many bytes have been
125    /// read during the current sample.
126    bytes: Option<usize>,
127    /// We delay a variable amount of time between BDP pings. This allows us
128    /// to send less pings as the bandwidth stabilizes.
129    next_bdp_at: Option<Instant>,
130
131    // keep-alive
132    /// If `Some`, keep-alive is enabled, and the Instant is how long ago
133    /// the connection read the last frame.
134    last_read_at: Option<Instant>,
135
136    is_keep_alive_timed_out: bool,
137    timer: Time,
138}
139
/// State of the bandwidth-delay-product estimator.
struct Bdp {
    /// Current BDP estimate in bytes. This is the window size reported when
    /// the estimate grows.
    bdp: u32,
    /// Highest bandwidth observed so far, in bytes per second.
    max_bandwidth: f64,
    /// Moving average of the round trip time, in seconds.
    rtt: f64,
    /// How long to wait after a BDP ack before the next sample may start.
    ///
    /// Shrinks while the estimate keeps growing and lengthens once it is
    /// stable.
    ping_delay: Duration,
    /// Round trips without growth since the last growth or delay increase.
    stable_count: u32,
}
154
155struct KeepAlive {
156    /// If no frames are received in this amount of time, a PING frame is sent.
157    interval: Duration,
158    /// After sending a keepalive PING, the connection will be closed if
159    /// a pong is not received in this amount of time.
160    timeout: Duration,
161    /// If true, sends pings even when there are no active streams.
162    while_idle: bool,
163    state: KeepAliveState,
164    sleep: Pin<Box<dyn Sleep>>,
165    timer: Time,
166}
167
168enum KeepAliveState {
169    Init,
170    Scheduled(Instant),
171    PingSent,
172}
173
174pub(super) enum Ponged {
175    SizeUpdate(WindowSize),
176    KeepAliveTimedOut,
177}
178
179#[derive(Debug)]
180pub(super) struct KeepAliveTimedOut;
181
182// ===== impl Config =====
183
184impl Config {
185    pub(super) fn is_enabled(&self) -> bool {
186        self.bdp_initial_window.is_some() || self.keep_alive_interval.is_some()
187    }
188}
189
190// ===== impl Recorder =====
191
192impl Recorder {
193    pub(crate) fn record_data(&self, len: usize) {
194        let shared = if let Some(ref shared) = self.shared {
195            shared
196        } else {
197            return;
198        };
199
200        let mut locked = shared.lock().panic_if_poisoned();
201
202        locked.update_last_read_at();
203
204        // are we ready to send another bdp ping?
205        // if not, we don't need to record bytes either
206
207        if let Some(ref next_bdp_at) = locked.next_bdp_at {
208            if locked.timer.now() < *next_bdp_at {
209                return;
210            } else {
211                locked.next_bdp_at = None;
212            }
213        }
214
215        if let Some(ref mut bytes) = locked.bytes {
216            *bytes += len;
217        } else {
218            // no need to send bdp ping if bdp is disabled
219            return;
220        }
221
222        if !locked.is_ping_sent() {
223            locked.send_ping();
224        }
225    }
226
227    pub(crate) fn record_non_data(&self) {
228        let shared = if let Some(ref shared) = self.shared {
229            shared
230        } else {
231            return;
232        };
233
234        let mut locked = shared.lock().panic_if_poisoned();
235
236        locked.update_last_read_at();
237    }
238
239    /// If the incoming stream is already closed, convert self into
240    /// a disabled reporter.
241    #[cfg(feature = "client")]
242    pub(super) fn for_stream(self, stream: &h2::RecvStream) -> Self {
243        if stream.is_end_stream() {
244            disabled()
245        } else {
246            self
247        }
248    }
249
250    pub(super) fn ensure_not_timed_out(&self) -> crate::Result<()> {
251        if let Some(ref shared) = self.shared {
252            let locked = shared.lock().panic_if_poisoned();
253            if locked.is_keep_alive_timed_out {
254                return Err(KeepAliveTimedOut.crate_error());
255            }
256        }
257
258        // else
259        Ok(())
260    }
261}
262
263// ===== impl Ponger =====
264
265impl Ponger {
266    pub(super) fn poll(&mut self, cx: &mut task::Context<'_>) -> Poll<Ponged> {
267        let mut locked = self.shared.lock().panic_if_poisoned();
268        let now = locked.timer.now(); // hoping this is fine to move within the lock
269        let is_idle = self.is_idle();
270
271        if let Some(ref mut ka) = self.keep_alive {
272            ka.maybe_schedule(is_idle, &locked);
273            ka.maybe_ping(cx, is_idle, &mut locked);
274        }
275
276        if !locked.is_ping_sent() {
277            // XXX: this doesn't register a waker...?
278            return Poll::Pending;
279        }
280
281        match locked.ping_pong.poll_pong(cx) {
282            Poll::Ready(Ok(_pong)) => {
283                let start = locked
284                    .ping_sent_at
285                    .expect("pong received implies ping_sent_at");
286                locked.ping_sent_at = None;
287                let rtt = now - start;
288                trace!("recv pong");
289
290                if let Some(ref mut ka) = self.keep_alive {
291                    locked.update_last_read_at();
292                    ka.maybe_schedule(is_idle, &locked);
293                    ka.maybe_ping(cx, is_idle, &mut locked);
294                }
295
296                if let Some(ref mut bdp) = self.bdp {
297                    let bytes = locked.bytes.expect("bdp enabled implies bytes");
298                    locked.bytes = Some(0); // reset
299                    trace!("received BDP ack; bytes = {}, rtt = {:?}", bytes, rtt);
300
301                    let update = bdp.calculate(bytes, rtt);
302                    locked.next_bdp_at = Some(now + bdp.ping_delay);
303                    if let Some(update) = update {
304                        return Poll::Ready(Ponged::SizeUpdate(update));
305                    }
306                }
307            }
308            Poll::Ready(Err(_e)) => {
309                debug!("pong error: {}", _e);
310            }
311            Poll::Pending => {
312                if let Some(ref mut ka) = self.keep_alive {
313                    if let Err(KeepAliveTimedOut) = ka.maybe_timeout(cx) {
314                        self.keep_alive = None;
315                        locked.is_keep_alive_timed_out = true;
316                        return Poll::Ready(Ponged::KeepAliveTimedOut);
317                    }
318                }
319            }
320        }
321
322        // XXX: this doesn't register a waker...?
323        Poll::Pending
324    }
325
326    fn is_idle(&self) -> bool {
327        Arc::strong_count(&self.shared) <= 2
328    }
329}
330
331// ===== impl Shared =====
332
333impl Shared {
334    fn send_ping(&mut self) {
335        match self.ping_pong.send_ping(Ping::opaque()) {
336            Ok(()) => {
337                self.ping_sent_at = Some(self.timer.now());
338                trace!("sent ping");
339            }
340            Err(_err) => {
341                debug!("error sending ping: {}", _err);
342            }
343        }
344    }
345
346    fn is_ping_sent(&self) -> bool {
347        self.ping_sent_at.is_some()
348    }
349
350    fn update_last_read_at(&mut self) {
351        if self.last_read_at.is_some() {
352            self.last_read_at = Some(self.timer.now());
353        }
354    }
355
356    fn last_read_at(&self) -> Instant {
357        self.last_read_at.expect("keep_alive expects last_read_at")
358    }
359}
360
361// ===== impl Bdp =====
362
363/// Any higher than this likely will be hitting the TCP flow control.
364const BDP_LIMIT: usize = 1024 * 1024 * 16;
365
366impl Bdp {
367    fn calculate(&mut self, bytes: usize, rtt: Duration) -> Option<WindowSize> {
368        // No need to do any math if we're at the limit.
369        if self.bdp as usize == BDP_LIMIT {
370            self.stabilize_delay();
371            return None;
372        }
373
374        // average the rtt
375        let rtt = seconds(rtt);
376        if self.rtt == 0.0 {
377            // First sample means rtt is first rtt.
378            self.rtt = rtt;
379        } else {
380            // Weigh this rtt as 1/8 for a moving average.
381            self.rtt += (rtt - self.rtt) * 0.125;
382        }
383
384        // calculate the current bandwidth
385        let bw = (bytes as f64) / (self.rtt * 1.5);
386        trace!("current bandwidth = {:.1}B/s", bw);
387
388        if bw < self.max_bandwidth {
389            // not a faster bandwidth, so don't update
390            self.stabilize_delay();
391            return None;
392        } else {
393            self.max_bandwidth = bw;
394        }
395
396        // if the current `bytes` sample is at least 2/3 the previous
397        // bdp, increase to double the current sample.
398        if bytes >= self.bdp as usize * 2 / 3 {
399            self.bdp = (bytes * 2).min(BDP_LIMIT) as WindowSize;
400            trace!("BDP increased to {}", self.bdp);
401
402            self.stable_count = 0;
403            self.ping_delay /= 2;
404            Some(self.bdp)
405        } else {
406            self.stabilize_delay();
407            None
408        }
409    }
410
411    fn stabilize_delay(&mut self) {
412        if self.ping_delay < Duration::from_secs(10) {
413            self.stable_count += 1;
414
415            if self.stable_count >= 2 {
416                self.ping_delay *= 4;
417                self.stable_count = 0;
418            }
419        }
420    }
421}
422
/// Converts a `Duration` into fractional seconds.
///
/// Uses the standard library's `Duration::as_secs_f64` instead of
/// hand-rolling the `secs + nanos / 1e9` arithmetic.
fn seconds(dur: Duration) -> f64 {
    dur.as_secs_f64()
}
428
429// ===== impl KeepAlive =====
430
431impl KeepAlive {
432    fn maybe_schedule(&mut self, is_idle: bool, shared: &Shared) {
433        match self.state {
434            KeepAliveState::Init => {
435                if !self.while_idle && is_idle {
436                    return;
437                }
438
439                self.schedule(shared);
440            }
441            KeepAliveState::PingSent => {
442                if shared.is_ping_sent() {
443                    return;
444                }
445                self.schedule(shared);
446            }
447            KeepAliveState::Scheduled(..) => (),
448        }
449    }
450
451    fn schedule(&mut self, shared: &Shared) {
452        let interval = shared.last_read_at() + self.interval;
453        self.state = KeepAliveState::Scheduled(interval);
454        self.timer.reset(&mut self.sleep, interval);
455    }
456
457    fn maybe_ping(&mut self, cx: &mut task::Context<'_>, is_idle: bool, shared: &mut Shared) {
458        match self.state {
459            KeepAliveState::Scheduled(at) => {
460                if Pin::new(&mut self.sleep).poll(cx).is_pending() {
461                    return;
462                }
463                // check if we've received a frame while we were scheduled
464                if shared.last_read_at() + self.interval > at {
465                    self.state = KeepAliveState::Init;
466                    cx.waker().wake_by_ref(); // schedule us again
467                    return;
468                }
469                if !self.while_idle && is_idle {
470                    trace!("keep-alive no need to ping when idle and while_idle=false");
471                    return;
472                }
473                trace!("keep-alive interval ({:?}) reached", self.interval);
474                shared.send_ping();
475                self.state = KeepAliveState::PingSent;
476                let timeout = self.timer.now() + self.timeout;
477                self.timer.reset(&mut self.sleep, timeout);
478            }
479            KeepAliveState::Init | KeepAliveState::PingSent => (),
480        }
481    }
482
483    fn maybe_timeout(&mut self, cx: &mut task::Context<'_>) -> Result<(), KeepAliveTimedOut> {
484        match self.state {
485            KeepAliveState::PingSent => {
486                if Pin::new(&mut self.sleep).poll(cx).is_pending() {
487                    return Ok(());
488                }
489                trace!("keep-alive timeout ({:?}) reached", self.timeout);
490                Err(KeepAliveTimedOut)
491            }
492            KeepAliveState::Init | KeepAliveState::Scheduled(..) => Ok(()),
493        }
494    }
495}
496
497// ===== impl KeepAliveTimedOut =====
498
499impl KeepAliveTimedOut {
500    pub(super) fn crate_error(self) -> crate::Error {
501        crate::Error::new(crate::error::Kind::Http2).with(self)
502    }
503}
504
505impl fmt::Display for KeepAliveTimedOut {
506    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
507        f.write_str("keep-alive timed out")
508    }
509}
510
511impl std::error::Error for KeepAliveTimedOut {
512    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
513        Some(&crate::error::TimedOut)
514    }
515}