servo_base/generic_channel/buffered.rs
1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::RefCell;
6use std::mem;
7
8use serde::Serialize;
9
10use super::{GenericSender, SendResult};
11
12/// A buffered sender that collects individual messages (`U`) and sends them
13/// as a single batched message (`T`) via a user-provided packing closure.
14///
15/// The buffer is flushed automatically when it reaches `max_buffer` items,
16/// or explicitly via [`flush`](GenericBufferedSender::flush).
17/// [`send_immediate`](GenericBufferedSender::send_immediate)
18/// combines the current buffer contents with the new message into a single
19/// packed message, ensuring ordering without an extra flush step.
20pub struct GenericBufferedSender<T, U>
21where
22 T: Serialize,
23{
24 sender: GenericSender<T>,
25 buffer: RefCell<Vec<U>>,
26 buffering: Box<dyn Fn(Vec<U>) -> T>,
27 max_buffer: usize,
28}
29
30impl<T: Serialize, U> GenericBufferedSender<T, U> {
31 /// Create a new buffered sender.
32 ///
33 /// * `sender` — the underlying `GenericSender<T>` that delivers packed messages.
34 /// * `buffering` — closure that packs a `Vec<U>` into a single `T`.
35 /// * `max_buffer` — automatic flush is triggered when the buffer reaches this size.
36 pub fn new(
37 sender: GenericSender<T>,
38 buffering: Box<dyn Fn(Vec<U>) -> T>,
39 max_buffer: usize,
40 ) -> Self {
41 Self {
42 sender,
43 buffer: RefCell::new(Vec::new()),
44 buffering,
45 max_buffer,
46 }
47 }
48
49 /// Returns `true` if the buffer contains no pending messages.
50 pub fn is_empty(&self) -> bool {
51 self.buffer.borrow().is_empty()
52 }
53
54 /// Returns the number of pending messages in the buffer.
55 pub fn len(&self) -> usize {
56 self.buffer.borrow().len()
57 }
58
59 /// Change the auto-flush threshold.
60 pub fn set_max_buffer(&mut self, max: usize) {
61 self.max_buffer = max;
62 }
63
64 /// Buffer a message for later batched delivery.
65 ///
66 /// If the buffer reaches `max_buffer` items an automatic flush is triggered.
67 pub fn send(&self, msg: U) -> SendResult {
68 if self.buffer.borrow().len() + 1 >= self.max_buffer {
69 self.send_immediate(msg)
70 } else {
71 self.buffer.borrow_mut().push(msg);
72 Ok(())
73 }
74 }
75
76 /// Deliver a message immediately, combining it with any
77 /// buffered messages into a single packed `T`.
78 pub fn send_immediate(&self, msg: U) -> SendResult {
79 let mut buffer = self.buffer.borrow_mut();
80 buffer.push(msg);
81 let msgs = mem::take(&mut *buffer);
82 drop(buffer);
83 let packed = (self.buffering)(msgs);
84 self.sender.send(packed)
85 }
86
87 /// Flush all buffered messages by packing them into a single `T` and
88 /// sending it.
89 pub fn flush(&self) -> SendResult {
90 let mut buffer = self.buffer.borrow_mut();
91 if buffer.is_empty() {
92 return Ok(());
93 }
94 let msgs = mem::take(&mut *buffer);
95 drop(buffer);
96 let packed = (self.buffering)(msgs);
97 self.sender.send(packed)
98 }
99}
100
101impl<T: Serialize, U> Drop for GenericBufferedSender<T, U> {
102 fn drop(&mut self) {
103 // Best-effort flush on drop. Ignore send failures.
104 if !self.buffer.borrow().is_empty() {
105 let msgs = mem::take(&mut *self.buffer.borrow_mut());
106 let packed = (self.buffering)(msgs);
107 let _ = self.sender.send(packed);
108 }
109 }
110}