base/
generic_channel.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5//! Enum wrappers to be able to select different channel implementations at runtime.
6
7use std::fmt;
8use std::fmt::Display;
9use std::marker::PhantomData;
10
11use ipc_channel::ipc::IpcError;
12use ipc_channel::router::ROUTER;
13use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
14use serde::de::VariantAccess;
15use serde::{Deserialize, Deserializer, Serialize, Serializer};
16use servo_config::opts;
17
18mod callback;
19pub use callback::GenericCallback;
20
21/// Abstraction of the ability to send a particular type of message cross-process.
22/// This can be used to ease the use of GenericSender sub-fields.
23pub trait GenericSend<T>
24where
25    T: serde::Serialize + for<'de> serde::Deserialize<'de>,
26{
27    /// send message T
28    fn send(&self, _: T) -> SendResult;
29    /// get underlying sender
30    fn sender(&self) -> GenericSender<T>;
31}
32
/// A GenericSender that sends messages to a [GenericReceiver].
///
/// The sender supports sending messages cross-process, if servo is run in multiprocess mode.
///
/// The wrapped variant enum is private, so code outside this module cannot build a
/// sender directly from an arbitrary channel.
pub struct GenericSender<T: Serialize>(GenericSenderVariants<T>);
37
/// The actual GenericSender variant.
///
/// This enum is private, so that outside code can't construct a GenericSender itself.
/// This ensures that users can't construct a crossbeam variant in multiprocess mode.
enum GenericSenderVariants<T: Serialize> {
    /// An `ipc_channel` sender.
    Ipc(ipc_channel::ipc::IpcSender<T>),
    /// A crossbeam-channel. To keep the API in sync with the Ipc variant when using a Router,
    /// which propagates the IPC error, the inner type is a Result.
    /// In the IPC case, the Router deserializes the message, which can fail, and sends
    /// the result to a crossbeam receiver.
    /// The crossbeam channel does not involve serializing, so we can't have this error,
    /// but replicating the API allows us to have one channel type as the receiver
    /// after routing the receiver.
    Crossbeam(crossbeam_channel::Sender<Result<T, ipc_channel::Error>>),
}
53
54impl<T: Serialize> Serialize for GenericSender<T> {
55    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
56        match &self.0 {
57            GenericSenderVariants::Ipc(sender) => {
58                s.serialize_newtype_variant("GenericSender", 0, "Ipc", sender)
59            },
60            // All GenericSenders will be IPC channels in multi-process mode, so sending a
61            // GenericChannel over existing IPC channels is no problem and won't fail.
62            // In single-process mode, we can also send GenericSenders over other GenericSenders
63            // just fine, since no serialization is required.
64            // The only reason we need / want serialization is to support sending GenericSenders
65            // over existing IPC channels **in single process mode**. This allows us to
66            // incrementally port channels to the GenericChannel, without needing to follow a
67            // top-to-bottom approach.
68            // Long-term we can remove this branch in the code again and replace it with
69            // unreachable, since likely all IPC channels would be GenericChannels.
70            GenericSenderVariants::Crossbeam(sender) => {
71                if opts::get().multiprocess {
72                    return Err(serde::ser::Error::custom(
73                        "Crossbeam channel found in multiprocess mode!",
74                    ));
75                } // We know everything is in one address-space, so we can "serialize" the sender by
76                // sending a leaked Box pointer.
77                let sender_clone_addr = Box::leak(Box::new(sender.clone())) as *mut _ as usize;
78                s.serialize_newtype_variant("GenericSender", 1, "Crossbeam", &sender_clone_addr)
79            },
80        }
81    }
82}
83
/// Serde visitor that rebuilds a [`GenericSender`] from its enum representation.
struct GenericSenderVisitor<T> {
    /// Ties the visitor to the message type `T` without storing a value of it.
    marker: PhantomData<T>,
}
87
88impl<'de, T: Serialize + Deserialize<'de>> serde::de::Visitor<'de> for GenericSenderVisitor<T> {
89    type Value = GenericSender<T>;
90
91    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
92        formatter.write_str("a GenericSender variant")
93    }
94
95    fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
96    where
97        A: serde::de::EnumAccess<'de>,
98    {
99        #[derive(Deserialize)]
100        enum GenericSenderVariantNames {
101            Ipc,
102            Crossbeam,
103        }
104
105        let (variant_name, variant_data): (GenericSenderVariantNames, _) = data.variant()?;
106
107        match variant_name {
108            GenericSenderVariantNames::Ipc => variant_data
109                .newtype_variant::<ipc_channel::ipc::IpcSender<T>>()
110                .map(|sender| GenericSender(GenericSenderVariants::Ipc(sender))),
111            GenericSenderVariantNames::Crossbeam => {
112                if opts::get().multiprocess {
113                    return Err(serde::de::Error::custom(
114                        "Crossbeam channel found in multiprocess mode!",
115                    ));
116                }
117                let addr = variant_data.newtype_variant::<usize>()?;
118                let ptr = addr as *mut crossbeam_channel::Sender<Result<T, ipc_channel::Error>>;
119                // SAFETY: We know we are in the same address space as the sender, so we can safely
120                // reconstruct the Box.
121                #[allow(unsafe_code)]
122                let sender = unsafe { Box::from_raw(ptr) };
123                Ok(GenericSender(GenericSenderVariants::Crossbeam(*sender)))
124            },
125        }
126    }
127}
128
129impl<'a, T: Serialize + Deserialize<'a>> Deserialize<'a> for GenericSender<T> {
130    fn deserialize<D>(d: D) -> Result<GenericSender<T>, D::Error>
131    where
132        D: Deserializer<'a>,
133    {
134        d.deserialize_enum(
135            "GenericSender",
136            &["Ipc", "Crossbeam"],
137            GenericSenderVisitor {
138                marker: PhantomData,
139            },
140        )
141    }
142}
143
144impl<T> Clone for GenericSender<T>
145where
146    T: Serialize,
147{
148    fn clone(&self) -> Self {
149        match self.0 {
150            GenericSenderVariants::Ipc(ref chan) => {
151                GenericSender(GenericSenderVariants::Ipc(chan.clone()))
152            },
153            GenericSenderVariants::Crossbeam(ref chan) => {
154                GenericSender(GenericSenderVariants::Crossbeam(chan.clone()))
155            },
156        }
157    }
158}
159
160impl<T: Serialize> fmt::Debug for GenericSender<T> {
161    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
162        write!(f, "Sender(..)")
163    }
164}
165
166impl<T: Serialize> GenericSender<T> {
167    #[inline]
168    pub fn send(&self, msg: T) -> SendResult {
169        match self.0 {
170            GenericSenderVariants::Ipc(ref sender) => sender
171                .send(msg)
172                .map_err(|e| SendError::SerializationError(format!("{e}"))),
173            GenericSenderVariants::Crossbeam(ref sender) => {
174                sender.send(Ok(msg)).map_err(|_| SendError::Disconnected)
175            },
176        }
177    }
178}
179
impl<T: Serialize> MallocSizeOf for GenericSender<T> {
    /// Always reports zero bytes; the memory behind the wrapped channel is not measured.
    fn size_of(&self, _ops: &mut MallocSizeOfOps) -> usize {
        0
    }
}
185
/// Error returned when sending through a [`GenericSender`] fails.
#[derive(Debug)]
pub enum SendError {
    /// Sending on the crossbeam variant failed.
    Disconnected,
    /// Sending on the IPC variant failed; carries the display text of the underlying error.
    // NOTE(review): every IPC send failure is mapped to this variant, not only
    // serialization failures — confirm that this is intended.
    SerializationError(String),
}
191
192impl Display for SendError {
193    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
194        write!(f, "{self:?}")
195    }
196}
197
/// Result of sending a message through a [`GenericSender`].
pub type SendResult = Result<(), SendError>;
199
/// Error returned when receiving from a [`GenericReceiver`] fails.
#[derive(Debug)]
pub enum ReceiveError {
    /// The message could not be deserialized; carries the text of the underlying error.
    DeserializationFailed(String),
    /// Io Error. May occur when using IPC.
    Io(std::io::Error),
    /// The channel was closed.
    Disconnected,
}
208
209impl From<IpcError> for ReceiveError {
210    fn from(e: IpcError) -> Self {
211        match e {
212            IpcError::Disconnected => ReceiveError::Disconnected,
213            IpcError::Bincode(reason) => ReceiveError::DeserializationFailed(reason.to_string()),
214            IpcError::Io(reason) => ReceiveError::Io(reason),
215        }
216    }
217}
218
219impl From<crossbeam_channel::RecvError> for ReceiveError {
220    fn from(_: crossbeam_channel::RecvError) -> Self {
221        ReceiveError::Disconnected
222    }
223}
224
225pub enum TryReceiveError {
226    Empty,
227    ReceiveError(ReceiveError),
228}
229
230impl From<ipc_channel::ipc::TryRecvError> for TryReceiveError {
231    fn from(e: ipc_channel::ipc::TryRecvError) -> Self {
232        match e {
233            ipc_channel::ipc::TryRecvError::Empty => TryReceiveError::Empty,
234            ipc_channel::ipc::TryRecvError::IpcError(inner) => {
235                TryReceiveError::ReceiveError(inner.into())
236            },
237        }
238    }
239}
240
241impl From<crossbeam_channel::TryRecvError> for TryReceiveError {
242    fn from(e: crossbeam_channel::TryRecvError) -> Self {
243        match e {
244            crossbeam_channel::TryRecvError::Empty => TryReceiveError::Empty,
245            crossbeam_channel::TryRecvError::Disconnected => {
246                TryReceiveError::ReceiveError(ReceiveError::Disconnected)
247            },
248        }
249    }
250}
251
/// A crossbeam receiver whose items are `Result<T, ipc_channel::Error>`.
pub type RoutedReceiver<T> = crossbeam_channel::Receiver<Result<T, ipc_channel::Error>>;
/// Result of receiving a message of type `T`.
pub type ReceiveResult<T> = Result<T, ReceiveError>;
/// Result of a receive attempt that may also report [`TryReceiveError::Empty`].
pub type TryReceiveResult<T> = Result<T, TryReceiveError>;
/// A crossbeam receive result whose item is itself a `Result<T, ipc_channel::Error>`,
/// matching the item type of a [`RoutedReceiver`].
pub type RoutedReceiverReceiveResult<T> =
    Result<Result<T, ipc_channel::Error>, crossbeam_channel::RecvError>;
257
258pub fn to_receive_result<T>(receive_result: RoutedReceiverReceiveResult<T>) -> ReceiveResult<T> {
259    match receive_result {
260        Ok(Ok(msg)) => Ok(msg),
261        Err(_crossbeam_recv_err) => Err(ReceiveError::Disconnected),
262        Ok(Err(ipc_err)) => Err(ReceiveError::DeserializationFailed(ipc_err.to_string())),
263    }
264}
265
/// The receiving half of a generic channel; the counterpart of [`GenericSender`].
///
/// The wrapped variant enum is private, so which channel implementation backs the
/// receiver is not visible to outside code.
pub struct GenericReceiver<T>(GenericReceiverVariants<T>)
where
    T: for<'de> Deserialize<'de> + Serialize;
269
/// The actual GenericReceiver variant.
enum GenericReceiverVariants<T>
where
    T: for<'de> Deserialize<'de> + Serialize,
{
    /// An `ipc_channel` receiver.
    Ipc(ipc_channel::ipc::IpcReceiver<T>),
    /// A crossbeam receiver whose items are `Result<T, ipc_channel::Error>`.
    Crossbeam(RoutedReceiver<T>),
}
277
278impl<T> GenericReceiver<T>
279where
280    T: for<'de> Deserialize<'de> + Serialize,
281{
282    #[inline]
283    pub fn recv(&self) -> ReceiveResult<T> {
284        match self.0 {
285            GenericReceiverVariants::Ipc(ref receiver) => Ok(receiver.recv()?),
286            GenericReceiverVariants::Crossbeam(ref receiver) => {
287                // `recv()` returns an error if the channel is disconnected
288                let msg = receiver.recv()?;
289                // `msg` must be `ok` because the corresponding [`GenericSender::Crossbeam`] will
290                // unconditionally send an `Ok(T)`
291                Ok(msg.expect("Infallible"))
292            },
293        }
294    }
295
296    #[inline]
297    pub fn try_recv(&self) -> TryReceiveResult<T> {
298        match self.0 {
299            GenericReceiverVariants::Ipc(ref receiver) => Ok(receiver.try_recv()?),
300            GenericReceiverVariants::Crossbeam(ref receiver) => {
301                let msg = receiver.try_recv()?;
302                Ok(msg.expect("Infallible"))
303            },
304        }
305    }
306
307    /// Route to a crossbeam receiver, preserving any errors.
308    ///
309    /// For `Crossbeam` receivers this is a no-op, while for `Ipc` receivers
310    /// this creates a route.
311    #[inline]
312    pub fn route_preserving_errors(self) -> RoutedReceiver<T>
313    where
314        T: Send + 'static,
315    {
316        match self.0 {
317            GenericReceiverVariants::Ipc(ipc_receiver) => {
318                let (crossbeam_sender, crossbeam_receiver) = crossbeam_channel::unbounded();
319                let crossbeam_sender_clone = crossbeam_sender.clone();
320                ROUTER.add_typed_route(
321                    ipc_receiver,
322                    Box::new(move |message| {
323                        let _ = crossbeam_sender_clone.send(message);
324                    }),
325                );
326                crossbeam_receiver
327            },
328            GenericReceiverVariants::Crossbeam(receiver) => receiver,
329        }
330    }
331}
332
333impl<T> Serialize for GenericReceiver<T>
334where
335    T: for<'de> Deserialize<'de> + Serialize,
336{
337    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
338        match &self.0 {
339            GenericReceiverVariants::Ipc(receiver) => {
340                s.serialize_newtype_variant("GenericReceiver", 0, "Ipc", receiver)
341            },
342            GenericReceiverVariants::Crossbeam(receiver) => {
343                if opts::get().multiprocess {
344                    return Err(serde::ser::Error::custom(
345                        "Crossbeam channel found in multiprocess mode!",
346                    ));
347                } // We know everything is in one address-space, so we can "serialize" the receiver by
348                // sending a leaked Box pointer.
349                let receiver_clone_addr = Box::leak(Box::new(receiver.clone())) as *mut _ as usize;
350                s.serialize_newtype_variant("GenericReceiver", 1, "Crossbeam", &receiver_clone_addr)
351            },
352        }
353    }
354}
355
/// Serde visitor that rebuilds a [`GenericReceiver`] from its enum representation.
struct GenericReceiverVisitor<T> {
    /// Ties the visitor to the message type `T` without storing a value of it.
    marker: PhantomData<T>,
}
359impl<'de, T> serde::de::Visitor<'de> for GenericReceiverVisitor<T>
360where
361    T: for<'a> Deserialize<'a> + Serialize,
362{
363    type Value = GenericReceiver<T>;
364
365    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
366        formatter.write_str("a GenericReceiver variant")
367    }
368
369    fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
370    where
371        A: serde::de::EnumAccess<'de>,
372    {
373        #[derive(Deserialize)]
374        enum GenericReceiverVariantNames {
375            Ipc,
376            Crossbeam,
377        }
378
379        let (variant_name, variant_data): (GenericReceiverVariantNames, _) = data.variant()?;
380
381        match variant_name {
382            GenericReceiverVariantNames::Ipc => variant_data
383                .newtype_variant::<ipc_channel::ipc::IpcReceiver<T>>()
384                .map(|receiver| GenericReceiver(GenericReceiverVariants::Ipc(receiver))),
385            GenericReceiverVariantNames::Crossbeam => {
386                if opts::get().multiprocess {
387                    return Err(serde::de::Error::custom(
388                        "Crossbeam channel found in multiprocess mode!",
389                    ));
390                }
391                let addr = variant_data.newtype_variant::<usize>()?;
392                let ptr = addr as *mut RoutedReceiver<T>;
393                // SAFETY: We know we are in the same address space as the sender, so we can safely
394                // reconstruct the Box.
395                #[allow(unsafe_code)]
396                let receiver = unsafe { Box::from_raw(ptr) };
397                Ok(GenericReceiver(GenericReceiverVariants::Crossbeam(
398                    *receiver,
399                )))
400            },
401        }
402    }
403}
404
405impl<'a, T> Deserialize<'a> for GenericReceiver<T>
406where
407    T: for<'de> Deserialize<'de> + Serialize,
408{
409    fn deserialize<D>(d: D) -> Result<GenericReceiver<T>, D::Error>
410    where
411        D: Deserializer<'a>,
412    {
413        d.deserialize_enum(
414            "GenericReceiver",
415            &["Ipc", "Crossbeam"],
416            GenericReceiverVisitor {
417                marker: PhantomData,
418            },
419        )
420    }
421}
422
423/// Private helper function to create a crossbeam based channel.
424///
425/// Do NOT make this function public!
426fn new_generic_channel_crossbeam<T>() -> (GenericSender<T>, GenericReceiver<T>)
427where
428    T: Serialize + for<'de> serde::Deserialize<'de>,
429{
430    let (tx, rx) = crossbeam_channel::unbounded();
431    (
432        GenericSender(GenericSenderVariants::Crossbeam(tx)),
433        GenericReceiver(GenericReceiverVariants::Crossbeam(rx)),
434    )
435}
436
437fn new_generic_channel_ipc<T>() -> Result<(GenericSender<T>, GenericReceiver<T>), std::io::Error>
438where
439    T: Serialize + for<'de> serde::Deserialize<'de>,
440{
441    ipc_channel::ipc::channel().map(|(tx, rx)| {
442        (
443            GenericSender(GenericSenderVariants::Ipc(tx)),
444            GenericReceiver(GenericReceiverVariants::Ipc(rx)),
445        )
446    })
447}
448
449/// Creates a Servo channel that can select different channel implementations based on multiprocess
450/// mode or not. If the scenario doesn't require message to pass process boundary, a simple
451/// crossbeam channel is preferred.
452pub fn channel<T>() -> Option<(GenericSender<T>, GenericReceiver<T>)>
453where
454    T: for<'de> Deserialize<'de> + Serialize,
455{
456    if servo_config::opts::get().multiprocess || servo_config::opts::get().force_ipc {
457        new_generic_channel_ipc().ok()
458    } else {
459        Some(new_generic_channel_crossbeam())
460    }
461}
462
#[cfg(test)]
mod single_process_channel_tests {
    //! Unit tests checking that `ipc_channel` and `crossbeam_channel` backed senders and
    //! receivers can be sent over each other without problems in single-process mode.
    //! In multiprocess mode we exclusively use `ipc_channel` anyway, which is ensured due
    //! to `channel()` being the only way to construct `GenericSender` and Receiver pairs.
    use crate::generic_channel::{
        GenericReceiver, GenericSender, new_generic_channel_crossbeam, new_generic_channel_ipc,
    };

    /// Sends `reply_tx` over the outer channel, lets a scoped thread answer with `42`
    /// through it, and checks that the answer arrives on `reply_rx`.
    fn assert_ping_pong(
        outer_tx: GenericSender<GenericSender<i32>>,
        outer_rx: GenericReceiver<GenericSender<i32>>,
        reply_tx: GenericSender<i32>,
        reply_rx: GenericReceiver<i32>,
    ) {
        outer_tx.send(reply_tx).expect("Send failed");

        std::thread::scope(|scope| {
            scope.spawn(move || {
                let reply_sender = outer_rx.recv().expect("Receive failed");
                reply_sender.send(42).expect("Sending reply failed");
            });
        });

        let reply = reply_rx.recv().expect("Receive of reply failed");
        assert_eq!(reply, 42);
    }

    #[test]
    fn generic_crossbeam_can_send() {
        let (sender, receiver) = new_generic_channel_crossbeam();
        sender.send(5).expect("Send failed");
        let received = receiver.recv().expect("Receive failed");
        assert_eq!(received, 5);
    }

    #[test]
    fn generic_crossbeam_ping_pong() {
        let (outer_tx, outer_rx) = new_generic_channel_crossbeam();
        let (reply_tx, reply_rx) = new_generic_channel_crossbeam();
        assert_ping_pong(outer_tx, outer_rx, reply_tx, reply_rx);
    }

    #[test]
    fn generic_ipc_ping_pong() {
        let (outer_tx, outer_rx) = new_generic_channel_ipc().unwrap();
        let (reply_tx, reply_rx) = new_generic_channel_ipc().unwrap();
        assert_ping_pong(outer_tx, outer_rx, reply_tx, reply_rx);
    }

    #[test]
    fn send_crossbeam_sender_over_ipc_channel() {
        let (outer_tx, outer_rx) = new_generic_channel_ipc().unwrap();
        let (reply_tx, reply_rx) = new_generic_channel_crossbeam();
        assert_ping_pong(outer_tx, outer_rx, reply_tx, reply_rx);
    }

    #[test]
    fn send_generic_ipc_channel_over_crossbeam() {
        let (outer_tx, outer_rx) = new_generic_channel_crossbeam();
        let (reply_tx, reply_rx) = new_generic_channel_ipc().unwrap();
        assert_ping_pong(outer_tx, outer_rx, reply_tx, reply_rx);
    }

    #[test]
    fn send_crossbeam_receiver_over_ipc_channel() {
        let (outer_tx, outer_rx) = new_generic_channel_ipc().unwrap();
        let (inner_tx, inner_rx) = new_generic_channel_crossbeam();

        // The receiver itself travels over the IPC channel; the value is queued afterwards.
        outer_tx.send(inner_rx).expect("Send failed");
        inner_tx.send(42).expect("Send failed");

        std::thread::scope(|scope| {
            scope.spawn(move || {
                let another_receiver = outer_rx.recv().expect("Receive failed");
                let received = another_receiver.recv().expect("Receive failed");
                assert_eq!(received, 42);
            });
        });
    }
}