servo_base/generic_channel/
mod.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5//! Enum wrappers to be able to select different channel implementations at runtime.
6
7use std::fmt;
8use std::fmt::Display;
9use std::marker::PhantomData;
10use std::sync::OnceLock;
11use std::time::Duration;
12
13use crossbeam_channel::RecvTimeoutError;
14use ipc_channel::IpcError;
15use ipc_channel::router::ROUTER;
16use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
17use malloc_size_of_derive::MallocSizeOf;
18use serde::de::VariantAccess;
19use serde::{Deserialize, Deserializer, Serialize, Serializer};
20use servo_config::opts;
21
22mod callback;
23pub use callback::GenericCallback;
24mod lazy_callback;
25pub use lazy_callback::{CallbackSetter, LazyCallback, lazy_callback};
26mod oneshot;
27mod shared_memory;
28pub use oneshot::{GenericOneshotReceiver, GenericOneshotSender, oneshot};
29pub use shared_memory::GenericSharedMemory;
30mod generic_channelset;
31pub use generic_channelset::{GenericReceiverSet, GenericSelectionResult};
32
33/// Cache for being in Ipc Mode
34static USE_IPC: OnceLock<bool> = OnceLock::new();
35
36/// Return if we should be in IPC Mode
37fn use_ipc() -> bool {
38    *USE_IPC.get_or_init(|| {
39        servo_config::opts::get().multiprocess || servo_config::opts::get().force_ipc
40    })
41}
42
43/// Abstraction of the ability to send a particular type of message cross-process.
44/// This can be used to ease the use of GenericSender sub-fields.
45pub trait GenericSend<T>
46where
47    T: serde::Serialize + for<'de> serde::Deserialize<'de>,
48{
49    /// send message T
50    fn send(&self, _: T) -> SendResult;
51    /// get underlying sender
52    fn sender(&self) -> GenericSender<T>;
53}
54
55/// A GenericSender that sends messages to a [GenericReceiver].
56///
57/// The sender supports sending messages cross-process, if servo is run in multiprocess mode.
58pub struct GenericSender<T: Serialize>(GenericSenderVariants<T>);
59
60/// The actual GenericSender variant.
61///
62/// This enum is private, so that outside code can't construct a GenericSender itself.
63/// This ensures that users can't construct a crossbeam variant in multiprocess mode.
64enum GenericSenderVariants<T: Serialize> {
65    Ipc(ipc_channel::ipc::IpcSender<T>),
66    /// A crossbeam-channel. To keep the API in sync with the Ipc variant when using a Router,
67    /// which propagates the IPC error, the inner type is a Result.
68    /// In the IPC case, the Router deserializes the message, which can fail, and sends
69    /// the result to a crossbeam receiver.
70    /// The crossbeam channel does not involve serializing, so we can't have this error,
71    /// but replicating the API allows us to have one channel type as the receiver
72    /// after routing the receiver .
73    Crossbeam(crossbeam_channel::Sender<Result<T, ipc_channel::IpcError>>),
74}
75
76fn serialize_generic_sender_variants<T: Serialize, S: Serializer>(
77    value: &GenericSenderVariants<T>,
78    s: S,
79) -> Result<S::Ok, S::Error> {
80    match value {
81        GenericSenderVariants::Ipc(sender) => {
82            s.serialize_newtype_variant("GenericSender", 0, "Ipc", sender)
83        },
84        // All GenericSenders will be IPC channels in multi-process mode, so sending a
85        // GenericChannel over existing IPC channels is no problem and won't fail.
86        // In single-process mode, we can also send GenericSenders over other GenericSenders
87        // just fine, since no serialization is required.
88        // The only reason we need / want serialization is to support sending GenericSenders
89        // over existing IPC channels **in single process mode**. This allows us to
90        // incrementally port channels to the GenericChannel, without needing to follow a
91        // top-to-bottom approach.
92        // Long-term we can remove this branch in the code again and replace it with
93        // unreachable, since likely all IPC channels would be GenericChannels.
94        GenericSenderVariants::Crossbeam(sender) => {
95            if opts::get().multiprocess {
96                return Err(serde::ser::Error::custom(
97                    "Crossbeam channel found in multiprocess mode!",
98                ));
99            } // We know everything is in one address-space, so we can "serialize" the sender by
100            // sending a leaked Box pointer.
101            let sender_clone_addr = Box::leak(Box::new(sender.clone())) as *mut _ as usize;
102            s.serialize_newtype_variant("GenericSender", 1, "Crossbeam", &sender_clone_addr)
103        },
104    }
105}
106
107impl<T: Serialize> Serialize for GenericSender<T> {
108    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
109        serialize_generic_sender_variants(&self.0, s)
110    }
111}
112
/// Serde visitor that rebuilds a [`GenericSenderVariants`] from its serialized enum form.
struct GenericSenderVisitor<T> {
    /// Ties the visitor to the message type `T` without storing a value of it.
    marker: PhantomData<T>,
}
116
117impl<'de, T: Serialize + Deserialize<'de>> serde::de::Visitor<'de> for GenericSenderVisitor<T> {
118    type Value = GenericSenderVariants<T>;
119
120    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
121        formatter.write_str("a GenericSender variant")
122    }
123
124    fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
125    where
126        A: serde::de::EnumAccess<'de>,
127    {
128        #[derive(Deserialize)]
129        enum GenericSenderVariantNames {
130            Ipc,
131            Crossbeam,
132        }
133
134        let (variant_name, variant_data): (GenericSenderVariantNames, _) = data.variant()?;
135
136        match variant_name {
137            GenericSenderVariantNames::Ipc => variant_data
138                .newtype_variant::<ipc_channel::ipc::IpcSender<T>>()
139                .map(|sender| GenericSenderVariants::Ipc(sender)),
140            GenericSenderVariantNames::Crossbeam => {
141                if opts::get().multiprocess {
142                    return Err(serde::de::Error::custom(
143                        "Crossbeam channel found in multiprocess mode!",
144                    ));
145                }
146                let addr = variant_data.newtype_variant::<usize>()?;
147                let ptr = addr as *mut crossbeam_channel::Sender<Result<T, ipc_channel::IpcError>>;
148                // SAFETY: We know we are in the same address space as the sender, so we can safely
149                // reconstruct the Box.
150                #[expect(unsafe_code)]
151                let sender = unsafe { Box::from_raw(ptr) };
152                Ok(GenericSenderVariants::Crossbeam(*sender))
153            },
154        }
155    }
156}
157
158impl<'a, T: Serialize + Deserialize<'a>> Deserialize<'a> for GenericSender<T> {
159    fn deserialize<D>(d: D) -> Result<GenericSender<T>, D::Error>
160    where
161        D: Deserializer<'a>,
162    {
163        d.deserialize_enum(
164            "GenericSender",
165            &["Ipc", "Crossbeam"],
166            GenericSenderVisitor {
167                marker: PhantomData,
168            },
169        )
170        .map(|variant| GenericSender(variant))
171    }
172}
173
174impl<T> Clone for GenericSender<T>
175where
176    T: Serialize,
177{
178    fn clone(&self) -> Self {
179        match &self.0 {
180            GenericSenderVariants::Ipc(chan) => {
181                GenericSender(GenericSenderVariants::Ipc(chan.clone()))
182            },
183            GenericSenderVariants::Crossbeam(chan) => {
184                GenericSender(GenericSenderVariants::Crossbeam(chan.clone()))
185            },
186        }
187    }
188}
189
190impl<T: Serialize> fmt::Debug for GenericSender<T> {
191    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
192        write!(f, "Sender(..)")
193    }
194}
195
196impl<T: Serialize> GenericSender<T> {
197    #[inline]
198    pub fn send(&self, msg: T) -> SendResult {
199        match &self.0 {
200            GenericSenderVariants::Ipc(sender) => sender
201                .send(msg)
202                .map_err(|e| SendError::SerializationError(e.to_string())),
203            GenericSenderVariants::Crossbeam(sender) => {
204                sender.send(Ok(msg)).map_err(|_| SendError::Disconnected)
205            },
206        }
207    }
208}
209
210impl<T: Serialize> MallocSizeOf for GenericSender<T> {
211    fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
212        match &self.0 {
213            GenericSenderVariants::Ipc(ipc_sender) => ipc_sender.size_of(ops),
214            GenericSenderVariants::Crossbeam(sender) => sender.size_of(ops),
215        }
216    }
217}
218
/// Reasons a send on a generic channel can fail.
#[derive(Debug)]
pub enum SendError {
    /// The channel is disconnected.
    Disconnected,
    /// The message could not be serialized; carries a textual description of the cause.
    SerializationError(String),
}
224
225impl From<IpcError> for SendError {
226    fn from(value: IpcError) -> Self {
227        match value {
228            IpcError::SerializationError(ser_de_error) => {
229                SendError::SerializationError(ser_de_error.to_string())
230            },
231            IpcError::Io(error) => {
232                log::error!("IO Error in ipc {:?}", error);
233                SendError::Disconnected
234            },
235            IpcError::Disconnected => SendError::Disconnected,
236        }
237    }
238}
239
240impl Display for SendError {
241    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
242        write!(f, "{self:?}")
243    }
244}
245
246pub type SendResult = Result<(), SendError>;
247
/// Reasons a receive on a generic channel can fail.
#[derive(Debug)]
pub enum ReceiveError {
    /// The incoming message could not be deserialized; carries a description of the cause.
    DeserializationFailed(String),
    /// Io Error. May occur when using IPC.
    Io(std::io::Error),
    /// The channel was closed.
    Disconnected,
}
256
257impl From<IpcError> for ReceiveError {
258    fn from(e: IpcError) -> Self {
259        match e {
260            IpcError::Disconnected => ReceiveError::Disconnected,
261            IpcError::Io(reason) => ReceiveError::Io(reason),
262            IpcError::SerializationError(ser_de_error) => {
263                ReceiveError::DeserializationFailed(ser_de_error.to_string())
264            },
265        }
266    }
267}
268
269impl From<crossbeam_channel::RecvError> for ReceiveError {
270    fn from(_: crossbeam_channel::RecvError) -> Self {
271        ReceiveError::Disconnected
272    }
273}
274
275impl fmt::Display for ReceiveError {
276    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
277        match *self {
278            ReceiveError::DeserializationFailed(ref error) => {
279                write!(fmt, "deserialization error: {error}")
280            },
281            ReceiveError::Io(ref error) => write!(fmt, "io error: {error}"),
282            ReceiveError::Disconnected => write!(fmt, "disconnected"),
283        }
284    }
285}
286impl From<std::io::Error> for ReceiveError {
287    fn from(value: std::io::Error) -> Self {
288        ReceiveError::Io(value)
289    }
290}
291
292pub enum TryReceiveError {
293    Empty,
294    ReceiveError(ReceiveError),
295}
296
297impl From<crossbeam_channel::RecvTimeoutError> for TryReceiveError {
298    fn from(value: crossbeam_channel::RecvTimeoutError) -> Self {
299        match value {
300            RecvTimeoutError::Timeout => TryReceiveError::Empty,
301            RecvTimeoutError::Disconnected => {
302                TryReceiveError::ReceiveError(ReceiveError::Disconnected)
303            },
304        }
305    }
306}
307
308impl From<ipc_channel::TryRecvError> for TryReceiveError {
309    fn from(e: ipc_channel::TryRecvError) -> Self {
310        match e {
311            ipc_channel::TryRecvError::Empty => TryReceiveError::Empty,
312            ipc_channel::TryRecvError::IpcError(inner) => {
313                TryReceiveError::ReceiveError(inner.into())
314            },
315        }
316    }
317}
318
319impl From<crossbeam_channel::TryRecvError> for TryReceiveError {
320    fn from(e: crossbeam_channel::TryRecvError) -> Self {
321        match e {
322            crossbeam_channel::TryRecvError::Empty => TryReceiveError::Empty,
323            crossbeam_channel::TryRecvError::Disconnected => {
324                TryReceiveError::ReceiveError(ReceiveError::Disconnected)
325            },
326        }
327    }
328}
329
330pub type RoutedReceiver<T> = crossbeam_channel::Receiver<Result<T, ipc_channel::IpcError>>;
331pub type ReceiveResult<T> = Result<T, ReceiveError>;
332pub type TryReceiveResult<T> = Result<T, TryReceiveError>;
333pub type RoutedReceiverReceiveResult<T> =
334    Result<Result<T, ipc_channel::IpcError>, crossbeam_channel::RecvError>;
335
336pub fn to_receive_result<T>(receive_result: RoutedReceiverReceiveResult<T>) -> ReceiveResult<T> {
337    match receive_result {
338        Ok(Ok(msg)) => Ok(msg),
339        Err(_crossbeam_recv_err) => Err(ReceiveError::Disconnected),
340        Ok(Err(ipc_err)) => Err(ReceiveError::DeserializationFailed(ipc_err.to_string())),
341    }
342}
343
344#[derive(MallocSizeOf)]
345pub struct GenericReceiver<T>(GenericReceiverVariants<T>)
346where
347    T: for<'de> Deserialize<'de> + Serialize;
348
349impl<T> std::fmt::Debug for GenericReceiver<T>
350where
351    T: for<'de> Deserialize<'de> + Serialize,
352{
353    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
354        f.debug_tuple("GenericReceiver").finish()
355    }
356}
357
358#[derive(MallocSizeOf)]
359enum GenericReceiverVariants<T>
360where
361    T: for<'de> Deserialize<'de> + Serialize,
362{
363    Ipc(ipc_channel::ipc::IpcReceiver<T>),
364    Crossbeam(RoutedReceiver<T>),
365}
366
367impl<T> GenericReceiver<T>
368where
369    T: for<'de> Deserialize<'de> + Serialize,
370{
371    #[inline]
372    pub fn recv(&self) -> ReceiveResult<T> {
373        match &self.0 {
374            GenericReceiverVariants::Ipc(receiver) => Ok(receiver.recv()?),
375            GenericReceiverVariants::Crossbeam(receiver) => {
376                // `recv()` returns an error if the channel is disconnected
377                let msg = receiver.recv()?;
378                // `msg` must be `ok` because the corresponding [`GenericSender::Crossbeam`] will
379                // unconditionally send an `Ok(T)`
380                Ok(msg.expect("Infallible"))
381            },
382        }
383    }
384
385    #[inline]
386    pub fn try_recv(&self) -> TryReceiveResult<T> {
387        match &self.0 {
388            GenericReceiverVariants::Ipc(receiver) => Ok(receiver.try_recv()?),
389            GenericReceiverVariants::Crossbeam(receiver) => {
390                let msg = receiver.try_recv()?;
391                Ok(msg.expect("Infallible"))
392            },
393        }
394    }
395
396    /// Blocks up to the specific duration attempting to receive a message.
397    #[inline]
398    pub fn try_recv_timeout(&self, timeout: Duration) -> Result<T, TryReceiveError> {
399        match &self.0 {
400            GenericReceiverVariants::Ipc(ipc_receiver) => {
401                ipc_receiver.try_recv_timeout(timeout).map_err(|e| e.into())
402            },
403            GenericReceiverVariants::Crossbeam(receiver) => match receiver.recv_timeout(timeout) {
404                Ok(Ok(value)) => Ok(value),
405                Ok(Err(_)) => unreachable!("Infallable"),
406                Err(RecvTimeoutError::Disconnected) => {
407                    Err(TryReceiveError::ReceiveError(ReceiveError::Disconnected))
408                },
409                Err(RecvTimeoutError::Timeout) => Err(TryReceiveError::Empty),
410            },
411        }
412    }
413
414    /// Route to a crossbeam receiver, preserving any errors.
415    ///
416    /// For `Crossbeam` receivers this is a no-op, while for `Ipc` receivers
417    /// this creates a route.
418    #[inline]
419    pub fn route_preserving_errors(self) -> RoutedReceiver<T>
420    where
421        T: Send + 'static,
422    {
423        match self.0 {
424            GenericReceiverVariants::Ipc(ipc_receiver) => {
425                let (crossbeam_sender, crossbeam_receiver) = crossbeam_channel::unbounded();
426                let crossbeam_sender_clone = crossbeam_sender;
427                ROUTER.add_typed_route(
428                    ipc_receiver,
429                    Box::new(move |message| {
430                        let _ = crossbeam_sender_clone
431                            .send(message.map_err(IpcError::SerializationError));
432                    }),
433                );
434                crossbeam_receiver
435            },
436            GenericReceiverVariants::Crossbeam(receiver) => receiver,
437        }
438    }
439}
440
441impl<T> Serialize for GenericReceiver<T>
442where
443    T: for<'de> Deserialize<'de> + Serialize,
444{
445    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
446        match &self.0 {
447            GenericReceiverVariants::Ipc(receiver) => {
448                s.serialize_newtype_variant("GenericReceiver", 0, "Ipc", receiver)
449            },
450            GenericReceiverVariants::Crossbeam(receiver) => {
451                if opts::get().multiprocess {
452                    return Err(serde::ser::Error::custom(
453                        "Crossbeam channel found in multiprocess mode!",
454                    ));
455                } // We know everything is in one address-space, so we can "serialize" the receiver by
456                // sending a leaked Box pointer.
457                let receiver_clone_addr = Box::leak(Box::new(receiver.clone())) as *mut _ as usize;
458                s.serialize_newtype_variant("GenericReceiver", 1, "Crossbeam", &receiver_clone_addr)
459            },
460        }
461    }
462}
463
/// Serde visitor that rebuilds a [`GenericReceiver`] from its serialized enum form.
struct GenericReceiverVisitor<T> {
    /// Ties the visitor to the message type `T` without storing a value of it.
    marker: PhantomData<T>,
}
467impl<'de, T> serde::de::Visitor<'de> for GenericReceiverVisitor<T>
468where
469    T: for<'a> Deserialize<'a> + Serialize,
470{
471    type Value = GenericReceiver<T>;
472
473    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
474        formatter.write_str("a GenericReceiver variant")
475    }
476
477    fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
478    where
479        A: serde::de::EnumAccess<'de>,
480    {
481        #[derive(Deserialize)]
482        enum GenericReceiverVariantNames {
483            Ipc,
484            Crossbeam,
485        }
486
487        let (variant_name, variant_data): (GenericReceiverVariantNames, _) = data.variant()?;
488
489        match variant_name {
490            GenericReceiverVariantNames::Ipc => variant_data
491                .newtype_variant::<ipc_channel::ipc::IpcReceiver<T>>()
492                .map(|receiver| GenericReceiver(GenericReceiverVariants::Ipc(receiver))),
493            GenericReceiverVariantNames::Crossbeam => {
494                if use_ipc() {
495                    return Err(serde::de::Error::custom(
496                        "Crossbeam channel found in multiprocess mode!",
497                    ));
498                }
499                let addr = variant_data.newtype_variant::<usize>()?;
500                let ptr = addr as *mut RoutedReceiver<T>;
501                // SAFETY: We know we are in the same address space as the sender, so we can safely
502                // reconstruct the Box.
503                #[expect(unsafe_code)]
504                let receiver = unsafe { Box::from_raw(ptr) };
505                Ok(GenericReceiver(GenericReceiverVariants::Crossbeam(
506                    *receiver,
507                )))
508            },
509        }
510    }
511}
512
513impl<'a, T> Deserialize<'a> for GenericReceiver<T>
514where
515    T: for<'de> Deserialize<'de> + Serialize,
516{
517    fn deserialize<D>(d: D) -> Result<GenericReceiver<T>, D::Error>
518    where
519        D: Deserializer<'a>,
520    {
521        d.deserialize_enum(
522            "GenericReceiver",
523            &["Ipc", "Crossbeam"],
524            GenericReceiverVisitor {
525                marker: PhantomData,
526            },
527        )
528    }
529}
530
531/// Private helper function to create a crossbeam based channel.
532///
533/// Do NOT make this function public!
534fn new_generic_channel_crossbeam<T>() -> (GenericSender<T>, GenericReceiver<T>)
535where
536    T: Serialize + for<'de> serde::Deserialize<'de>,
537{
538    let (tx, rx) = crossbeam_channel::unbounded();
539    (
540        GenericSender(GenericSenderVariants::Crossbeam(tx)),
541        GenericReceiver(GenericReceiverVariants::Crossbeam(rx)),
542    )
543}
544
545fn new_generic_channel_ipc<T>() -> Result<(GenericSender<T>, GenericReceiver<T>), std::io::Error>
546where
547    T: Serialize + for<'de> serde::Deserialize<'de>,
548{
549    ipc_channel::ipc::channel().map(|(tx, rx)| {
550        (
551            GenericSender(GenericSenderVariants::Ipc(tx)),
552            GenericReceiver(GenericReceiverVariants::Ipc(rx)),
553        )
554    })
555}
556
557/// Creates a Servo channel that can select different channel implementations based on multiprocess
558/// mode or not. If the scenario doesn't require message to pass process boundary, a simple
559/// crossbeam channel is preferred.
560pub fn channel<T>() -> Option<(GenericSender<T>, GenericReceiver<T>)>
561where
562    T: for<'de> Deserialize<'de> + Serialize,
563{
564    if use_ipc() {
565        new_generic_channel_ipc().ok()
566    } else {
567        Some(new_generic_channel_crossbeam())
568    }
569}
570
#[cfg(test)]
mod single_process_channel_tests {
    //! Checks that `ipc_channel` and `crossbeam_channel` backed senders and receivers can be
    //! sent over each other without problems in single-process mode.
    //! In multiprocess mode we exclusively use `ipc_channel` anyway, which is ensured due
    //! to `channel()` being the only way to construct `GenericSender` and Receiver pairs.
    use std::thread;
    use std::time::Duration;

    use crate::generic_channel::{new_generic_channel_crossbeam, new_generic_channel_ipc};

    #[test]
    fn generic_crossbeam_can_send() {
        let (sender, receiver) = new_generic_channel_crossbeam();
        sender.send(5).expect("Send failed");
        let received = receiver.recv().expect("Receive failed");
        assert_eq!(received, 5);
    }

    #[test]
    fn generic_crossbeam_ping_pong() {
        let (outer_sender, outer_receiver) = new_generic_channel_crossbeam();
        let (reply_sender, reply_receiver) = new_generic_channel_crossbeam();

        outer_sender.send(reply_sender).expect("Send failed");

        thread::scope(|scope| {
            scope.spawn(move || {
                let reply_to = outer_receiver.recv().expect("Receive failed");
                reply_to.send(42).expect("Sending reply failed");
            });
        });
        let reply = reply_receiver.recv().expect("Receive of reply failed");
        assert_eq!(reply, 42);
    }

    #[test]
    fn generic_ipc_ping_pong() {
        let (outer_sender, outer_receiver) = new_generic_channel_ipc().unwrap();
        let (reply_sender, reply_receiver) = new_generic_channel_ipc().unwrap();

        outer_sender.send(reply_sender).expect("Send failed");

        thread::scope(|scope| {
            scope.spawn(move || {
                let reply_to = outer_receiver.recv().expect("Receive failed");
                reply_to.send(42).expect("Sending reply failed");
            });
        });
        let reply = reply_receiver.recv().expect("Receive of reply failed");
        assert_eq!(reply, 42);
    }

    #[test]
    fn send_crossbeam_sender_over_ipc_channel() {
        let (outer_sender, outer_receiver) = new_generic_channel_ipc().unwrap();
        let (reply_sender, reply_receiver) = new_generic_channel_crossbeam();

        outer_sender.send(reply_sender).expect("Send failed");

        thread::scope(|scope| {
            scope.spawn(move || {
                let reply_to = outer_receiver.recv().expect("Receive failed");
                reply_to.send(42).expect("Sending reply failed");
            });
        });
        let reply = reply_receiver.recv().expect("Receive of reply failed");
        assert_eq!(reply, 42);
    }

    #[test]
    fn send_generic_ipc_channel_over_crossbeam() {
        let (outer_sender, outer_receiver) = new_generic_channel_crossbeam();
        let (reply_sender, reply_receiver) = new_generic_channel_ipc().unwrap();

        outer_sender.send(reply_sender).expect("Send failed");

        thread::scope(|scope| {
            scope.spawn(move || {
                let reply_to = outer_receiver.recv().expect("Receive failed");
                reply_to.send(42).expect("Sending reply failed");
            });
        });
        let reply = reply_receiver.recv().expect("Receive of reply failed");
        assert_eq!(reply, 42);
    }

    #[test]
    fn send_crossbeam_receiver_over_ipc_channel() {
        let (outer_sender, outer_receiver) = new_generic_channel_ipc().unwrap();
        let (value_sender, value_receiver) = new_generic_channel_crossbeam();

        // Ship the receiving half across, then queue a value for it to pick up.
        outer_sender.send(value_receiver).expect("Send failed");
        value_sender.send(42).expect("Send failed");

        thread::scope(|scope| {
            scope.spawn(move || {
                let another_receiver = outer_receiver.recv().expect("Receive failed");
                let value = another_receiver.recv().expect("Receive failed");
                assert_eq!(value, 42);
            });
        });
    }

    #[test]
    fn test_timeout_ipc() {
        let (sender, receiver) = new_generic_channel_ipc().unwrap();
        let timeout = Duration::from_secs(3);
        // The message is sent one second before the receive would time out.
        thread::spawn(move || {
            thread::sleep(timeout - Duration::from_secs(1));
            assert!(sender.send(()).is_ok());
        });
        let outcome = receiver.try_recv_timeout(timeout);
        assert!(outcome.is_ok());
    }

    #[test]
    fn test_timeout_crossbeam() {
        let (sender, receiver) = new_generic_channel_crossbeam();
        let timeout = Duration::from_secs(3);
        // The message is sent one second before the receive would time out.
        thread::spawn(move || {
            thread::sleep(timeout - Duration::from_secs(1));
            assert!(sender.send(()).is_ok());
        });
        let outcome = receiver.try_recv_timeout(timeout);
        assert!(outcome.is_ok());
    }
}
696
/// These tests need to live in this file because they use the private
/// `new_generic_channel_..` helper functions.
#[cfg(test)]
mod generic_receiversets_tests {
    use std::thread;
    use std::time::Duration;

    use crate::generic_channel::generic_channelset::{
        GenericSelectionResult, create_crossbeam_receiver_set, create_ipc_receiver_set,
    };
    use crate::generic_channel::{new_generic_channel_crossbeam, new_generic_channel_ipc};

    #[test]
    fn test_ipc_side1() {
        let (snd1, recv1) = new_generic_channel_ipc().unwrap();
        let (snd2, recv2) = new_generic_channel_ipc().unwrap();

        // Only clones move into the threads; the originals stay alive until the test ends.
        let snd1_c = snd1.clone();
        let snd2_c = snd2.clone();
        let mut set = create_ipc_receiver_set();
        let recv1_select_index = set.add(recv1);
        let _recv2_select_index = set.add(recv2);

        thread::spawn(move || {
            snd1_c.send(10).unwrap();
        });
        thread::spawn(move || {
            thread::sleep(Duration::from_secs(1));
            let _ = snd2_c.send(20); // this might error with closed channel
        });

        let select_result = set.select();
        let first_result = select_result.first().unwrap();
        assert_eq!(
            *first_result,
            GenericSelectionResult::MessageReceived(recv1_select_index, 10)
        );
    }

    #[test]
    fn test_ipc_side2() {
        let (snd1, recv1) = new_generic_channel_ipc().unwrap();
        let (snd2, recv2) = new_generic_channel_ipc().unwrap();

        // Only clones move into the threads; the originals stay alive until the test ends.
        let snd1_c = snd1.clone();
        let snd2_c = snd2.clone();
        let mut set = create_ipc_receiver_set();
        let _recv1_select_index = set.add(recv1);
        let recv2_select_index = set.add(recv2);

        thread::spawn(move || {
            thread::sleep(Duration::from_secs(1));
            let _ = snd1_c.send(10); // this might error with closed channel
        });
        thread::spawn(move || {
            snd2_c.send(20).unwrap();
        });

        let select_result = set.select();
        let first_result = select_result.first().unwrap();
        assert_eq!(
            *first_result,
            GenericSelectionResult::MessageReceived(recv2_select_index, 20)
        );
    }

    #[test]
    fn test_crossbeam_side1() {
        let (snd1, recv1) = new_generic_channel_crossbeam();
        let (snd2, recv2) = new_generic_channel_crossbeam();

        // Only clones move into the threads; the originals stay alive until the test ends.
        let snd1_c = snd1.clone();
        let snd2_c = snd2.clone();
        let mut set = create_crossbeam_receiver_set();
        let recv1_select_index = set.add(recv1);
        let _recv2_select_index = set.add(recv2);

        thread::spawn(move || {
            snd1_c.send(10).unwrap();
        });
        thread::spawn(move || {
            thread::sleep(Duration::from_secs(2));
            let _ = snd2_c.send(20);
        });

        let select_result = set.select();
        let first_result = select_result.first().unwrap();
        assert_eq!(
            *first_result,
            GenericSelectionResult::MessageReceived(recv1_select_index, 10)
        );
    }

    #[test]
    fn test_crossbeam_side2() {
        let (snd1, recv1) = new_generic_channel_crossbeam();
        let (snd2, recv2) = new_generic_channel_crossbeam();

        // Only clones move into the threads; the originals stay alive until the test ends.
        let snd1_c = snd1.clone();
        let snd2_c = snd2.clone();
        let mut set = create_crossbeam_receiver_set();
        let _recv1_select_index = set.add(recv1);
        let recv2_select_index = set.add(recv2);

        thread::spawn(move || {
            thread::sleep(Duration::from_secs(2));
            let _ = snd1_c.send(10);
        });
        thread::spawn(move || {
            snd2_c.send(20).unwrap();
        });

        let select_result = set.select();
        let first_result = select_result.first().unwrap();
        assert_eq!(
            *first_result,
            GenericSelectionResult::MessageReceived(recv2_select_index, 20)
        );
    }

    #[test]
    fn test_ipc_no_crash_on_disconnect() {
        // Test that we do not crash if a channel gets disconnected.
        // Channel 2 gets disconnected because snd2 is moved into the thread and dropped
        // there once the message has been sent.
        let (snd1, recv1) = new_generic_channel_ipc().unwrap();
        let (snd2, recv2) = new_generic_channel_ipc().unwrap();

        // snd1 itself stays alive until the test ends; only its clone moves into a thread.
        let snd1_c = snd1.clone();
        let mut set = create_ipc_receiver_set();
        let _recv1_select_index = set.add(recv1);
        let recv2_select_index = set.add(recv2);

        thread::spawn(move || {
            thread::sleep(Duration::from_secs(2));
            let _ = snd1_c.send(10);
        });
        thread::spawn(move || {
            snd2.send(20).unwrap();
        });
        // Give the second thread time to send and drop its sender before selecting.
        thread::sleep(Duration::from_secs(1));
        let select_result = set.select();
        let first_result = select_result.first().unwrap();
        assert_eq!(
            *first_result,
            GenericSelectionResult::MessageReceived(recv2_select_index, 20)
        );
    }

    #[test]
    fn test_crossbeam_no_crash_on_disconnect() {
        // Test that we do not crash if a channel gets disconnected.
        // Channel 2 gets disconnected because snd2 is moved into the thread and dropped
        // there once the message has been sent.
        let (snd1, recv1) = new_generic_channel_crossbeam();
        let (snd2, recv2) = new_generic_channel_crossbeam();

        // snd1 itself stays alive until the test ends; only its clone moves into a thread.
        let snd1_c = snd1.clone();
        let mut set = create_crossbeam_receiver_set();
        let _recv1_select_index = set.add(recv1);
        let recv2_select_index = set.add(recv2);

        thread::spawn(move || {
            thread::sleep(Duration::from_secs(2));
            let _ = snd1_c.send(10);
        });
        thread::spawn(move || {
            snd2.send(20).unwrap();
        });
        // Give the second thread time to send and drop its sender before selecting.
        thread::sleep(Duration::from_secs(1));
        let select_result = set.select();
        let first_result = select_result.first().unwrap();
        assert_eq!(
            *first_result,
            GenericSelectionResult::MessageReceived(recv2_select_index, 20)
        );
    }

    #[test]
    fn test_ipc_disconnect_correct_message() {
        // Test that a sender dropped without sending anything is reported as `ChannelClosed`
        // for the right receiver.
        let (snd1, recv1) = new_generic_channel_ipc().unwrap();
        let (snd2, recv2) = new_generic_channel_ipc().unwrap();

        // snd1 itself stays alive until the test ends; only its clone moves into a thread.
        let snd1_c = snd1.clone();
        let mut set = create_ipc_receiver_set();
        let _recv1_select_index = set.add(recv1);
        let recv2_select_index = set.add(recv2);

        thread::spawn(move || {
            thread::sleep(Duration::from_secs(2));
            let _ = snd1_c.send(10);
        });
        thread::spawn(move || {
            drop(snd2);
        });

        let select_result = set.select();
        let first_result = select_result.first().unwrap();
        assert_eq!(
            *first_result,
            GenericSelectionResult::ChannelClosed(recv2_select_index)
        );
    }

    #[test]
    fn test_crossbeam_disconnect_correct_message() {
        // Test that a sender dropped without sending anything is reported as `ChannelClosed`
        // for the right receiver.
        let (snd1, recv1) = new_generic_channel_crossbeam();
        let (snd2, recv2) = new_generic_channel_crossbeam();

        // snd1 itself stays alive until the test ends; only its clone moves into a thread.
        let snd1_c = snd1.clone();
        let mut set = create_crossbeam_receiver_set();
        let _recv1_select_index = set.add(recv1);
        let recv2_select_index = set.add(recv2);

        thread::spawn(move || {
            thread::sleep(Duration::from_secs(2));
            let _ = snd1_c.send(10);
        });
        thread::spawn(move || {
            drop(snd2);
        });

        let select_result = set.select();
        let first_result = select_result.first().unwrap();
        assert_eq!(
            *first_result,
            GenericSelectionResult::ChannelClosed(recv2_select_index)
        );
    }
}