servo_base/generic_channel/buffered.rs
1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::RefCell;
6use std::mem;
7use std::panic::Location;
8
9use serde::Serialize;
10
11use super::{GenericSender, SendResult};
12
13/// A buffered sender that collects individual messages (`U`) and sends them
14/// as a single batched message (`T`) via a user-provided packing closure.
15///
16/// The buffer is flushed automatically when it reaches `max_buffer` items,
17/// or explicitly via [`flush`](GenericBufferedSender::flush).
18/// [`send_immediate`](GenericBufferedSender::send_immediate)
19/// combines the current buffer contents with the new message into a single
20/// packed message, ensuring ordering without an extra flush step.
21pub struct GenericBufferedSender<T, U>
22where
23 T: Serialize,
24{
25 sender: GenericSender<T>,
26 buffer: RefCell<Vec<U>>,
27 buffering: Box<dyn Fn(Vec<U>) -> T>,
28 max_buffer: usize,
29}
30
31impl<T: Serialize, U> GenericBufferedSender<T, U> {
32 /// Create a new buffered sender.
33 ///
34 /// * `sender` — the underlying `GenericSender<T>` that delivers packed messages.
35 /// * `buffering` — closure that packs a `Vec<U>` into a single `T`.
36 /// * `max_buffer` — automatic flush is triggered when the buffer reaches this size.
37 pub fn new(
38 sender: GenericSender<T>,
39 buffering: Box<dyn Fn(Vec<U>) -> T>,
40 max_buffer: usize,
41 ) -> Self {
42 Self {
43 sender,
44 buffer: RefCell::new(Vec::new()),
45 buffering,
46 max_buffer,
47 }
48 }
49
50 /// Returns `true` if the buffer contains no pending messages.
51 pub fn is_empty(&self) -> bool {
52 self.buffer.borrow().is_empty()
53 }
54
55 /// Returns the number of pending messages in the buffer.
56 pub fn len(&self) -> usize {
57 self.buffer.borrow().len()
58 }
59
60 /// Buffer a message for later batched delivery.
61 ///
62 /// If the buffer reaches `max_buffer` items an automatic flush is triggered.
63 pub fn send(&self, msg: U) -> SendResult {
64 if self.buffer.borrow().len() + 1 >= self.max_buffer {
65 self.send_immediate(msg)
66 } else {
67 self.buffer.borrow_mut().push(msg);
68 Ok(())
69 }
70 }
71
72 #[inline]
73 #[track_caller]
74 /// Buffer a message for later batched delivery.
75 ///
76 /// If the buffer reaches `max_buffer` items an automatic flush is triggered.
77 /// Errors on automatic flush are logged.
78 pub fn send_or_warn(&self, msg: U) {
79 if let Err(error) = self.send(msg) {
80 let location = Location::caller();
81 log::warn!("Failed to send buffered messages due to `{error}` at {location:?}");
82 }
83 }
84
85 /// Deliver a message immediately, combining it with any
86 /// buffered messages into a single packed `T`.
87 pub fn send_immediate(&self, msg: U) -> SendResult {
88 let mut buffer = self.buffer.borrow_mut();
89 buffer.push(msg);
90 let msgs = mem::take(&mut *buffer);
91 drop(buffer);
92 let packed = (self.buffering)(msgs);
93 self.sender.send(packed)
94 }
95
96 #[inline]
97 #[track_caller]
98 /// Deliver a message immediately, combining it with any
99 /// buffered messages into a single packed `T`.
100 ///
101 /// Errors are logged.
102 pub fn send_immediate_or_warn(&self, msg: U) {
103 if let Err(error) = self.send_immediate(msg) {
104 let location = Location::caller();
105 log::warn!(
106 "Failed to send (immediate) buffered messages due to `{error}` at {location:?}"
107 );
108 }
109 }
110
111 /// Flush all buffered messages by packing them into a single `T` and
112 /// sending it.
113 pub fn flush(&self) -> SendResult {
114 let mut buffer = self.buffer.borrow_mut();
115 if buffer.is_empty() {
116 return Ok(());
117 }
118 let msgs = mem::take(&mut *buffer);
119 drop(buffer);
120 let packed = (self.buffering)(msgs);
121 self.sender.send(packed)
122 }
123
124 #[inline]
125 #[track_caller]
126 /// Flush all buffered messages by packing them into a single `T` and sending it.
127 /// Errors are logged
128 pub fn flush_or_warn(&self) {
129 if let Err(error) = self.flush() {
130 let location = Location::caller();
131 log::warn!("Failed to flush buffered messages due to `{error}` at {location:?}");
132 }
133 }
134
135 /// Discard all buffered messages without sending them.
136 pub fn discard(&self) {
137 self.buffer.borrow_mut().clear();
138 }
139}
140
141impl<T: Serialize, U> Drop for GenericBufferedSender<T, U> {
142 fn drop(&mut self) {
143 // Best-effort flush on drop. Ignore send failures.
144 if !self.buffer.borrow().is_empty() {
145 let msgs = mem::take(&mut *self.buffer.borrow_mut());
146 let packed = (self.buffering)(msgs);
147 let _ = self.sender.send(packed);
148 }
149 }
150}