base/generic_channel/
callback.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5//! # Generic Callbacks
6//!
7//! When sending cross-process messages, we sometimes want to run custom callbacks when the
8//! recipient has finished processing. The callback should run in the sender's address space, and
9//! could be something like enqueuing a task.
10//! In Multi-process mode we can implement this by providing an `IpcSender` to the recipient,
11//! which the recipient can use to send some data back to the senders process.
12//! To avoid blocking the sender, we can pass the callback to the ROUTER, which runs the callback
13//! when receiving the Ipc message.
14//! The callback will be run on every reply message from the recipient. `IpcSender`s are also
15//! `Clone`able, so the Router will sequentialise callbacks.
16//!
17//! ## Callback scenario visualization
18//!
19//! The following visualization showcases how Ipc and the router thread are currently used
20//! to run callbacks asynchronously on the sender process. The recipient may keep the
21//! ReplySender alive and send an arbitrary amount of messages / replies.
22//!
23//! ```none
24//!               Process A                      |              Process B
25//!                                              |
26//! +---------+   IPC: SendMessage(ReplySender)  |          +-------------+  clone  +-------------+
27//! | Sender  |-------------------------------------------> |  Recipient  | ------> | ReplySender |
28//! +---------+                                  |          +-------------+         +-------------+
29//!   |                                          |                 |                       |
30//!   | RegisterCallback A  +---------+          |  Send Reply 1   |        Send Reply 2   |
31//!   + ------------------> | Router  | <--------------------------+-----------------------+
32//!                         +---------+          |
33//!                             | A(reply1)      |
34//!                             | A(reply2)      |
35//!                             |     ...        |
36//!                             v                |
37//!                                              |
38//! ```
39//!
40//!
41//! ## Optimizing single-process mode.
42//!
43//! In Single-process mode, there is no need for the Recipient to send an IpcReply,
44//! since they are in the same address space and could just execute the callback directly.
45//! Since we want to create an abstraction over such callbacks, we need to consider constraints
46//! that the existing multiprocess Ipc solution imposes on us:
47//!
48//! - Support for `FnMut` callbacks (internal mutable state + multiple calls)
49//! - The abstraction should be `Clone`able
50//!
51//! These constraints motivate the [GenericCallback] type, which supports `FnMut` callbacks
52//! and is clonable. This requires wrapping the callback with `Arc<Mutex<>>`, which also adds
53//! synchronization, which could be something that existing callbacks rely on.
54//!
55//! ### Future work
56//!
57//! - Further abstractions for callbacks with fewer constraints, e.g. callbacks
58//!   which don't need to be cloned by the recipient, or non-mutable callbacks.
59//! - A tracing option to measure callback runtime and identify callbacks which misbehave (block)
60//!   for a long time.
61
62use std::fmt;
63use std::marker::PhantomData;
64use std::sync::{Arc, Mutex};
65
66use ipc_channel::ErrorKind;
67use ipc_channel::ipc::IpcSender;
68use ipc_channel::router::ROUTER;
69use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
70use serde::de::VariantAccess;
71use serde::{Deserialize, Deserializer, Serialize, Serializer};
72use servo_config::opts;
73
74use crate::generic_channel::{SendError, SendResult};
75
76/// The callback type of our messages.
77///
78/// This is equivalent to [TypedRouterHandler][ipc_channel::router::TypedRouterHandler],
79/// except that this type is not wrapped in a Box.
80/// The callback will be wrapped in either a Box or an Arc, depending on if it is run on
81/// the router, or passed to the recipient.
82pub type MsgCallback<T> = dyn FnMut(Result<T, ipc_channel::Error>) + Send;
83
84/// A mechanism to run a callback in the process this callback was constructed in.
85///
86/// The GenericCallback can be sent cross-process (in multi-process mode). In this case
87/// the callback will be executed on the [ROUTER] thread.
88/// In single-process mode the callback will be executed directly.
89pub struct GenericCallback<T>(GenericCallbackVariants<T>)
90where
91    T: Serialize + Send + 'static;
92
93enum GenericCallbackVariants<T>
94where
95    T: Serialize + Send + 'static,
96{
97    CrossProcess(IpcSender<T>),
98    InProcess(Arc<Mutex<MsgCallback<T>>>),
99}
100
101impl<T> Clone for GenericCallback<T>
102where
103    T: Serialize + Send + 'static,
104{
105    fn clone(&self) -> Self {
106        let variant = match &self.0 {
107            GenericCallbackVariants::CrossProcess(sender) => {
108                GenericCallbackVariants::CrossProcess((*sender).clone())
109            },
110            GenericCallbackVariants::InProcess(callback) => {
111                GenericCallbackVariants::InProcess(callback.clone())
112            },
113        };
114        GenericCallback(variant)
115    }
116}
117
118impl<T> MallocSizeOf for GenericCallback<T>
119where
120    T: Serialize + Send + 'static,
121{
122    fn size_of(&self, _ops: &mut MallocSizeOfOps) -> usize {
123        0
124    }
125}
126
127impl<T> GenericCallback<T>
128where
129    T: for<'de> Deserialize<'de> + Serialize + Send + 'static,
130{
131    /// Creates a new GenericCallback.
132    ///
133    /// The callback should not do any heavy work and not block.
134    pub fn new<F: FnMut(Result<T, ipc_channel::Error>) + Send + 'static>(
135        callback: F,
136    ) -> Result<Self, ipc_channel::Error> {
137        let generic_callback = if opts::get().multiprocess || opts::get().force_ipc {
138            let (ipc_sender, ipc_receiver) = ipc_channel::ipc::channel()?;
139            ROUTER.add_typed_route(ipc_receiver, Box::new(callback));
140            GenericCallback(GenericCallbackVariants::CrossProcess(ipc_sender))
141        } else {
142            let callback = Arc::new(Mutex::new(callback));
143            GenericCallback(GenericCallbackVariants::InProcess(callback))
144        };
145        Ok(generic_callback)
146    }
147
148    /// Send `value` to the callback.
149    ///
150    /// Note that a return value of `Ok()` simply means that value was sent successfully
151    /// to the callback. The callback itself does not return any value.
152    /// The caller may not assume that the callback is executed synchronously.
153    pub fn send(&self, value: T) -> SendResult {
154        match &self.0 {
155            GenericCallbackVariants::CrossProcess(sender) => {
156                sender.send(value).map_err(|error| match *error {
157                    ErrorKind::Io(_) => SendError::Disconnected,
158                    serialization_error => {
159                        SendError::SerializationError(serialization_error.to_string())
160                    },
161                })
162            },
163            GenericCallbackVariants::InProcess(callback) => {
164                let mut cb = callback.lock().expect("poisoned");
165                (*cb)(Ok(value));
166                Ok(())
167            },
168        }
169    }
170}
171
172impl<T> Serialize for GenericCallback<T>
173where
174    T: Serialize + Send + 'static,
175{
176    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
177        match &self.0 {
178            GenericCallbackVariants::CrossProcess(sender) => {
179                s.serialize_newtype_variant("GenericCallback", 0, "CrossProcess", sender)
180            },
181            // The only reason we need / want serialization in single-process mode is to support
182            // sending GenericCallbacks over existing IPC channels. This allows us to
183            // incrementally port IPC channels to the GenericChannel, without needing to follow a
184            // top-to-bottom approach.
185            // Long-term we can remove this branch in the code again and replace it with
186            // unreachable, since likely all IPC channels would be GenericChannels.
187            GenericCallbackVariants::InProcess(wrapped_callback) => {
188                if opts::get().multiprocess {
189                    return Err(serde::ser::Error::custom(
190                        "InProcess callback can't be serialized in multiprocess mode",
191                    ));
192                }
193                // Due to the signature of `serialize` we need to clone the Arc to get an owned
194                // pointer we can leak.
195                // We additionally need to Box to get a thin pointer.
196                let cloned_callback = Box::new(wrapped_callback.clone());
197                let sender_clone_addr = Box::leak(cloned_callback) as *mut Arc<_> as usize;
198                s.serialize_newtype_variant("GenericCallback", 1, "InProcess", &sender_clone_addr)
199            },
200        }
201    }
202}
203
204struct GenericCallbackVisitor<T> {
205    marker: PhantomData<T>,
206}
207
208impl<'de, T> serde::de::Visitor<'de> for GenericCallbackVisitor<T>
209where
210    T: Serialize + Deserialize<'de> + Send + 'static,
211{
212    type Value = GenericCallback<T>;
213
214    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
215        formatter.write_str("a GenericCallback variant")
216    }
217
218    fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
219    where
220        A: serde::de::EnumAccess<'de>,
221    {
222        #[derive(Deserialize)]
223        enum GenericCallbackVariantNames {
224            CrossProcess,
225            InProcess,
226        }
227
228        let (variant_name, variant_data): (GenericCallbackVariantNames, _) = data.variant()?;
229
230        match variant_name {
231            GenericCallbackVariantNames::CrossProcess => variant_data
232                .newtype_variant::<IpcSender<T>>()
233                .map(|sender| GenericCallback(GenericCallbackVariants::CrossProcess(sender))),
234            GenericCallbackVariantNames::InProcess => {
235                if opts::get().multiprocess {
236                    return Err(serde::de::Error::custom(
237                        "InProcess callback found in multiprocess mode",
238                    ));
239                }
240                let addr = variant_data.newtype_variant::<usize>()?;
241                let ptr = addr as *mut Arc<Mutex<_>>;
242                // SAFETY: We know we are in the same address space as the sender, so we can safely
243                // reconstruct the Arc, that we previously leaked with `into_raw` during
244                // serialization.
245                // Attention: Code reviewers should carefully compare the deserialization here
246                // with the serialization above.
247                #[allow(unsafe_code)]
248                let callback = unsafe { Box::from_raw(ptr) };
249                Ok(GenericCallback(GenericCallbackVariants::InProcess(
250                    *callback,
251                )))
252            },
253        }
254    }
255}
256
257impl<'a, T> Deserialize<'a> for GenericCallback<T>
258where
259    T: Serialize + Deserialize<'a> + Send + 'static,
260{
261    fn deserialize<D>(d: D) -> Result<GenericCallback<T>, D::Error>
262    where
263        D: Deserializer<'a>,
264    {
265        d.deserialize_enum(
266            "GenericCallback",
267            &["CrossProcess", "InProcess"],
268            GenericCallbackVisitor {
269                marker: PhantomData,
270            },
271        )
272    }
273}
274
275impl<T> fmt::Debug for GenericCallback<T>
276where
277    T: Serialize + Send + 'static,
278{
279    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
280        write!(f, "GenericCallback(..)")
281    }
282}
283
284#[cfg(test)]
285mod single_process_callback_test {
286    use std::sync::Arc;
287    use std::sync::atomic::{AtomicUsize, Ordering};
288
289    use crate::generic_channel::GenericCallback;
290
291    #[test]
292    fn generic_callback() {
293        let number = Arc::new(AtomicUsize::new(0));
294        let number_clone = number.clone();
295        let callback = move |msg: Result<usize, ipc_channel::Error>| {
296            number_clone.store(msg.unwrap(), Ordering::SeqCst)
297        };
298        let generic_callback = GenericCallback::new(callback).unwrap();
299        std::thread::scope(|s| {
300            s.spawn(move || generic_callback.send(42));
301        });
302        assert_eq!(number.load(Ordering::SeqCst), 42);
303    }
304
305    #[test]
306    fn generic_callback_via_generic_sender() {
307        let number = Arc::new(AtomicUsize::new(0));
308        let number_clone = number.clone();
309        let callback = move |msg: Result<usize, ipc_channel::Error>| {
310            number_clone.store(msg.unwrap(), Ordering::SeqCst)
311        };
312        let generic_callback = GenericCallback::new(callback).unwrap();
313        let (tx, rx) = crate::generic_channel::channel().unwrap();
314
315        tx.send(generic_callback).unwrap();
316        std::thread::scope(|s| {
317            s.spawn(move || {
318                let callback = rx.recv().unwrap();
319                callback.send(42).unwrap();
320            });
321        });
322        assert_eq!(number.load(Ordering::SeqCst), 42);
323    }
324
325    #[test]
326    fn generic_callback_via_ipc_sender() {
327        let number = Arc::new(AtomicUsize::new(0));
328        let number_clone = number.clone();
329        let callback = move |msg: Result<usize, ipc_channel::Error>| {
330            number_clone.store(msg.unwrap(), Ordering::SeqCst)
331        };
332        let generic_callback = GenericCallback::new(callback).unwrap();
333        let (tx, rx) = ipc_channel::ipc::channel().unwrap();
334
335        tx.send(generic_callback).unwrap();
336        std::thread::scope(|s| {
337            s.spawn(move || {
338                let callback = rx.recv().unwrap();
339                callback.send(42).unwrap();
340            });
341        });
342        assert_eq!(number.load(Ordering::SeqCst), 42);
343    }
344}