Skip to main content

servo_base/generic_channel/
buffered.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::RefCell;
6use std::mem;
7use std::panic::Location;
8
9use serde::Serialize;
10
11use super::{GenericSender, SendResult};
12
13/// A buffered sender that collects individual messages (`U`) and sends them
14/// as a single batched message (`T`) via a user-provided packing closure.
15///
16/// The buffer is flushed automatically when it reaches `max_buffer` items,
17/// or explicitly via [`flush`](GenericBufferedSender::flush).
18/// [`send_immediate`](GenericBufferedSender::send_immediate)
19/// combines the current buffer contents with the new message into a single
20/// packed message, ensuring ordering without an extra flush step.
21pub struct GenericBufferedSender<T, U>
22where
23    T: Serialize,
24{
25    sender: GenericSender<T>,
26    buffer: RefCell<Vec<U>>,
27    buffering: Box<dyn Fn(Vec<U>) -> T>,
28    max_buffer: usize,
29}
30
31impl<T: Serialize, U> GenericBufferedSender<T, U> {
32    /// Create a new buffered sender.
33    ///
34    /// * `sender` — the underlying `GenericSender<T>` that delivers packed messages.
35    /// * `buffering` — closure that packs a `Vec<U>` into a single `T`.
36    /// * `max_buffer` — automatic flush is triggered when the buffer reaches this size.
37    pub fn new(
38        sender: GenericSender<T>,
39        buffering: Box<dyn Fn(Vec<U>) -> T>,
40        max_buffer: usize,
41    ) -> Self {
42        Self {
43            sender,
44            buffer: RefCell::new(Vec::new()),
45            buffering,
46            max_buffer,
47        }
48    }
49
50    /// Returns `true` if the buffer contains no pending messages.
51    pub fn is_empty(&self) -> bool {
52        self.buffer.borrow().is_empty()
53    }
54
55    /// Returns the number of pending messages in the buffer.
56    pub fn len(&self) -> usize {
57        self.buffer.borrow().len()
58    }
59
60    /// Buffer a message for later batched delivery.
61    ///
62    /// If the buffer reaches `max_buffer` items an automatic flush is triggered.
63    pub fn send(&self, msg: U) -> SendResult {
64        if self.buffer.borrow().len() + 1 >= self.max_buffer {
65            self.send_immediate(msg)
66        } else {
67            self.buffer.borrow_mut().push(msg);
68            Ok(())
69        }
70    }
71
72    #[inline]
73    #[track_caller]
74    /// Buffer a message for later batched delivery.
75    ///
76    /// If the buffer reaches `max_buffer` items an automatic flush is triggered.
77    /// Errors on automatic flush are logged.
78    pub fn send_or_warn(&self, msg: U) {
79        if let Err(error) = self.send(msg) {
80            let location = Location::caller();
81            log::warn!("Failed to send buffered messages due to `{error}` at {location:?}");
82        }
83    }
84
85    /// Deliver a message immediately, combining it with any
86    /// buffered messages into a single packed `T`.
87    pub fn send_immediate(&self, msg: U) -> SendResult {
88        let mut buffer = self.buffer.borrow_mut();
89        buffer.push(msg);
90        let msgs = mem::take(&mut *buffer);
91        drop(buffer);
92        let packed = (self.buffering)(msgs);
93        self.sender.send(packed)
94    }
95
96    #[inline]
97    #[track_caller]
98    /// Deliver a message immediately, combining it with any
99    /// buffered messages into a single packed `T`.
100    ///
101    /// Errors are logged.
102    pub fn send_immediate_or_warn(&self, msg: U) {
103        if let Err(error) = self.send_immediate(msg) {
104            let location = Location::caller();
105            log::warn!(
106                "Failed to send (immediate) buffered messages due to `{error}` at {location:?}"
107            );
108        }
109    }
110
111    /// Flush all buffered messages by packing them into a single `T` and
112    /// sending it.
113    pub fn flush(&self) -> SendResult {
114        let mut buffer = self.buffer.borrow_mut();
115        if buffer.is_empty() {
116            return Ok(());
117        }
118        let msgs = mem::take(&mut *buffer);
119        drop(buffer);
120        let packed = (self.buffering)(msgs);
121        self.sender.send(packed)
122    }
123
124    #[inline]
125    #[track_caller]
126    /// Flush all buffered messages by packing them into a single `T` and sending it.
127    /// Errors are logged
128    pub fn flush_or_warn(&self) {
129        if let Err(error) = self.flush() {
130            let location = Location::caller();
131            log::warn!("Failed to flush buffered messages due to `{error}` at {location:?}");
132        }
133    }
134
135    /// Discard all buffered messages without sending them.
136    pub fn discard(&self) {
137        self.buffer.borrow_mut().clear();
138    }
139}
140
141impl<T: Serialize, U> Drop for GenericBufferedSender<T, U> {
142    fn drop(&mut self) {
143        // Best-effort flush on drop. Ignore send failures.
144        if !self.buffer.borrow().is_empty() {
145            let msgs = mem::take(&mut *self.buffer.borrow_mut());
146            let packed = (self.buffering)(msgs);
147            let _ = self.sender.send(packed);
148        }
149    }
150}