Skip to main content

servo_base/generic_channel/
buffered.rs

/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::RefCell;
6use std::mem;
7
8use serde::Serialize;
9
10use super::{GenericSender, SendResult};
11
12/// A buffered sender that collects individual messages (`U`) and sends them
13/// as a single batched message (`T`) via a user-provided packing closure.
14///
15/// The buffer is flushed automatically when it reaches `max_buffer` items,
16/// or explicitly via [`flush`](GenericBufferedSender::flush).
17/// [`send_immediate`](GenericBufferedSender::send_immediate)
18/// combines the current buffer contents with the new message into a single
19/// packed message, ensuring ordering without an extra flush step.
20pub struct GenericBufferedSender<T, U>
21where
22    T: Serialize,
23{
24    sender: GenericSender<T>,
25    buffer: RefCell<Vec<U>>,
26    buffering: Box<dyn Fn(Vec<U>) -> T>,
27    max_buffer: usize,
28}
29
30impl<T: Serialize, U> GenericBufferedSender<T, U> {
31    /// Create a new buffered sender.
32    ///
33    /// * `sender` — the underlying `GenericSender<T>` that delivers packed messages.
34    /// * `buffering` — closure that packs a `Vec<U>` into a single `T`.
35    /// * `max_buffer` — automatic flush is triggered when the buffer reaches this size.
36    pub fn new(
37        sender: GenericSender<T>,
38        buffering: Box<dyn Fn(Vec<U>) -> T>,
39        max_buffer: usize,
40    ) -> Self {
41        Self {
42            sender,
43            buffer: RefCell::new(Vec::new()),
44            buffering,
45            max_buffer,
46        }
47    }
48
49    /// Returns `true` if the buffer contains no pending messages.
50    pub fn is_empty(&self) -> bool {
51        self.buffer.borrow().is_empty()
52    }
53
54    /// Returns the number of pending messages in the buffer.
55    pub fn len(&self) -> usize {
56        self.buffer.borrow().len()
57    }
58
59    /// Change the auto-flush threshold.
60    pub fn set_max_buffer(&mut self, max: usize) {
61        self.max_buffer = max;
62    }
63
64    /// Buffer a message for later batched delivery.
65    ///
66    /// If the buffer reaches `max_buffer` items an automatic flush is triggered.
67    pub fn send(&self, msg: U) -> SendResult {
68        if self.buffer.borrow().len() + 1 >= self.max_buffer {
69            self.send_immediate(msg)
70        } else {
71            self.buffer.borrow_mut().push(msg);
72            Ok(())
73        }
74    }
75
76    /// Deliver a message immediately, combining it with any
77    /// buffered messages into a single packed `T`.
78    pub fn send_immediate(&self, msg: U) -> SendResult {
79        let mut buffer = self.buffer.borrow_mut();
80        buffer.push(msg);
81        let msgs = mem::take(&mut *buffer);
82        drop(buffer);
83        let packed = (self.buffering)(msgs);
84        self.sender.send(packed)
85    }
86
87    /// Flush all buffered messages by packing them into a single `T` and
88    /// sending it.
89    pub fn flush(&self) -> SendResult {
90        let mut buffer = self.buffer.borrow_mut();
91        if buffer.is_empty() {
92            return Ok(());
93        }
94        let msgs = mem::take(&mut *buffer);
95        drop(buffer);
96        let packed = (self.buffering)(msgs);
97        self.sender.send(packed)
98    }
99}
100
101impl<T: Serialize, U> Drop for GenericBufferedSender<T, U> {
102    fn drop(&mut self) {
103        // Best-effort flush on drop. Ignore send failures.
104        if !self.buffer.borrow().is_empty() {
105            let msgs = mem::take(&mut *self.buffer.borrow_mut());
106            let packed = (self.buffering)(msgs);
107            let _ = self.sender.send(packed);
108        }
109    }
110}