base/
generic_channel.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5//! Enum wrappers to be able to select different channel implementations at runtime.
6
7use std::fmt;
8use std::fmt::Display;
9use std::marker::PhantomData;
10use std::sync::OnceLock;
11use std::time::Duration;
12
13use crossbeam_channel::RecvTimeoutError;
14use ipc_channel::IpcError;
15use ipc_channel::router::ROUTER;
16use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
17use malloc_size_of_derive::MallocSizeOf;
18use serde::de::VariantAccess;
19use serde::{Deserialize, Deserializer, Serialize, Serializer};
20use servo_config::opts;
21
22mod callback;
23pub use callback::GenericCallback;
24mod oneshot;
25mod shared_memory;
26pub use oneshot::{GenericOneshotReceiver, GenericOneshotSender, oneshot};
27pub use shared_memory::GenericSharedMemory;
28mod generic_channelset;
29pub use generic_channelset::{GenericReceiverSet, GenericSelectionResult};
30
31/// Cache for being in Ipc Mode
32static USE_IPC: OnceLock<bool> = OnceLock::new();
33
34/// Return if we should be in IPC Mode
35fn use_ipc() -> bool {
36    *USE_IPC.get_or_init(|| {
37        servo_config::opts::get().multiprocess || servo_config::opts::get().force_ipc
38    })
39}
40
41/// Abstraction of the ability to send a particular type of message cross-process.
42/// This can be used to ease the use of GenericSender sub-fields.
43pub trait GenericSend<T>
44where
45    T: serde::Serialize + for<'de> serde::Deserialize<'de>,
46{
47    /// send message T
48    fn send(&self, _: T) -> SendResult;
49    /// get underlying sender
50    fn sender(&self) -> GenericSender<T>;
51}
52
53/// A GenericSender that sends messages to a [GenericReceiver].
54///
55/// The sender supports sending messages cross-process, if servo is run in multiprocess mode.
56pub struct GenericSender<T: Serialize>(GenericSenderVariants<T>);
57
58/// The actual GenericSender variant.
59///
60/// This enum is private, so that outside code can't construct a GenericSender itself.
61/// This ensures that users can't construct a crossbeam variant in multiprocess mode.
62enum GenericSenderVariants<T: Serialize> {
63    Ipc(ipc_channel::ipc::IpcSender<T>),
64    /// A crossbeam-channel. To keep the API in sync with the Ipc variant when using a Router,
65    /// which propagates the IPC error, the inner type is a Result.
66    /// In the IPC case, the Router deserializes the message, which can fail, and sends
67    /// the result to a crossbeam receiver.
68    /// The crossbeam channel does not involve serializing, so we can't have this error,
69    /// but replicating the API allows us to have one channel type as the receiver
70    /// after routing the receiver .
71    Crossbeam(crossbeam_channel::Sender<Result<T, ipc_channel::IpcError>>),
72}
73
74fn serialize_generic_sender_variants<T: Serialize, S: Serializer>(
75    value: &GenericSenderVariants<T>,
76    s: S,
77) -> Result<S::Ok, S::Error> {
78    match value {
79        GenericSenderVariants::Ipc(sender) => {
80            s.serialize_newtype_variant("GenericSender", 0, "Ipc", sender)
81        },
82        // All GenericSenders will be IPC channels in multi-process mode, so sending a
83        // GenericChannel over existing IPC channels is no problem and won't fail.
84        // In single-process mode, we can also send GenericSenders over other GenericSenders
85        // just fine, since no serialization is required.
86        // The only reason we need / want serialization is to support sending GenericSenders
87        // over existing IPC channels **in single process mode**. This allows us to
88        // incrementally port channels to the GenericChannel, without needing to follow a
89        // top-to-bottom approach.
90        // Long-term we can remove this branch in the code again and replace it with
91        // unreachable, since likely all IPC channels would be GenericChannels.
92        GenericSenderVariants::Crossbeam(sender) => {
93            if opts::get().multiprocess {
94                return Err(serde::ser::Error::custom(
95                    "Crossbeam channel found in multiprocess mode!",
96                ));
97            } // We know everything is in one address-space, so we can "serialize" the sender by
98            // sending a leaked Box pointer.
99            let sender_clone_addr = Box::leak(Box::new(sender.clone())) as *mut _ as usize;
100            s.serialize_newtype_variant("GenericSender", 1, "Crossbeam", &sender_clone_addr)
101        },
102    }
103}
104
105impl<T: Serialize> Serialize for GenericSender<T> {
106    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
107        serialize_generic_sender_variants(&self.0, s)
108    }
109}
110
111struct GenericSenderVisitor<T> {
112    marker: PhantomData<T>,
113}
114
115impl<'de, T: Serialize + Deserialize<'de>> serde::de::Visitor<'de> for GenericSenderVisitor<T> {
116    type Value = GenericSenderVariants<T>;
117
118    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
119        formatter.write_str("a GenericSender variant")
120    }
121
122    fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
123    where
124        A: serde::de::EnumAccess<'de>,
125    {
126        #[derive(Deserialize)]
127        enum GenericSenderVariantNames {
128            Ipc,
129            Crossbeam,
130        }
131
132        let (variant_name, variant_data): (GenericSenderVariantNames, _) = data.variant()?;
133
134        match variant_name {
135            GenericSenderVariantNames::Ipc => variant_data
136                .newtype_variant::<ipc_channel::ipc::IpcSender<T>>()
137                .map(|sender| GenericSenderVariants::Ipc(sender)),
138            GenericSenderVariantNames::Crossbeam => {
139                if opts::get().multiprocess {
140                    return Err(serde::de::Error::custom(
141                        "Crossbeam channel found in multiprocess mode!",
142                    ));
143                }
144                let addr = variant_data.newtype_variant::<usize>()?;
145                let ptr = addr as *mut crossbeam_channel::Sender<Result<T, ipc_channel::IpcError>>;
146                // SAFETY: We know we are in the same address space as the sender, so we can safely
147                // reconstruct the Box.
148                #[expect(unsafe_code)]
149                let sender = unsafe { Box::from_raw(ptr) };
150                Ok(GenericSenderVariants::Crossbeam(*sender))
151            },
152        }
153    }
154}
155
156impl<'a, T: Serialize + Deserialize<'a>> Deserialize<'a> for GenericSender<T> {
157    fn deserialize<D>(d: D) -> Result<GenericSender<T>, D::Error>
158    where
159        D: Deserializer<'a>,
160    {
161        d.deserialize_enum(
162            "GenericSender",
163            &["Ipc", "Crossbeam"],
164            GenericSenderVisitor {
165                marker: PhantomData,
166            },
167        )
168        .map(|variant| GenericSender(variant))
169    }
170}
171
172impl<T> Clone for GenericSender<T>
173where
174    T: Serialize,
175{
176    fn clone(&self) -> Self {
177        match self.0 {
178            GenericSenderVariants::Ipc(ref chan) => {
179                GenericSender(GenericSenderVariants::Ipc(chan.clone()))
180            },
181            GenericSenderVariants::Crossbeam(ref chan) => {
182                GenericSender(GenericSenderVariants::Crossbeam(chan.clone()))
183            },
184        }
185    }
186}
187
188impl<T: Serialize> fmt::Debug for GenericSender<T> {
189    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
190        write!(f, "Sender(..)")
191    }
192}
193
194impl<T: Serialize> GenericSender<T> {
195    #[inline]
196    pub fn send(&self, msg: T) -> SendResult {
197        match self.0 {
198            GenericSenderVariants::Ipc(ref sender) => sender
199                .send(msg)
200                .map_err(|e| SendError::SerializationError(format!("{e}"))),
201            GenericSenderVariants::Crossbeam(ref sender) => {
202                sender.send(Ok(msg)).map_err(|_| SendError::Disconnected)
203            },
204        }
205    }
206}
207
208impl<T: Serialize> MallocSizeOf for GenericSender<T> {
209    fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
210        match &self.0 {
211            GenericSenderVariants::Ipc(ipc_sender) => ipc_sender.size_of(ops),
212            GenericSenderVariants::Crossbeam(sender) => sender.size_of(ops),
213        }
214    }
215}
216
217#[derive(Debug)]
218pub enum SendError {
219    Disconnected,
220    SerializationError(String),
221}
222
223impl From<IpcError> for SendError {
224    fn from(value: IpcError) -> Self {
225        match value {
226            IpcError::SerializationError(ser_de_error) => {
227                SendError::SerializationError(ser_de_error.to_string())
228            },
229            IpcError::Io(error) => {
230                log::error!("IO Error in ipc {:?}", error);
231                SendError::Disconnected
232            },
233            IpcError::Disconnected => SendError::Disconnected,
234        }
235    }
236}
237
238impl Display for SendError {
239    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
240        write!(f, "{self:?}")
241    }
242}
243
244pub type SendResult = Result<(), SendError>;
245
246#[derive(Debug)]
247pub enum ReceiveError {
248    DeserializationFailed(String),
249    /// Io Error. May occur when using IPC.
250    Io(std::io::Error),
251    /// The channel was closed.
252    Disconnected,
253}
254
255impl From<IpcError> for ReceiveError {
256    fn from(e: IpcError) -> Self {
257        match e {
258            IpcError::Disconnected => ReceiveError::Disconnected,
259            IpcError::Io(reason) => ReceiveError::Io(reason),
260            IpcError::SerializationError(ser_de_error) => {
261                ReceiveError::DeserializationFailed(ser_de_error.to_string())
262            },
263        }
264    }
265}
266
267impl From<crossbeam_channel::RecvError> for ReceiveError {
268    fn from(_: crossbeam_channel::RecvError) -> Self {
269        ReceiveError::Disconnected
270    }
271}
272
273impl fmt::Display for ReceiveError {
274    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
275        match *self {
276            ReceiveError::DeserializationFailed(ref error) => {
277                write!(fmt, "deserialization error: {error}")
278            },
279            ReceiveError::Io(ref error) => write!(fmt, "io error: {error}"),
280            ReceiveError::Disconnected => write!(fmt, "disconnected"),
281        }
282    }
283}
284impl From<std::io::Error> for ReceiveError {
285    fn from(value: std::io::Error) -> Self {
286        ReceiveError::Io(value)
287    }
288}
289
290pub enum TryReceiveError {
291    Empty,
292    ReceiveError(ReceiveError),
293}
294
295impl From<crossbeam_channel::RecvTimeoutError> for TryReceiveError {
296    fn from(value: crossbeam_channel::RecvTimeoutError) -> Self {
297        match value {
298            RecvTimeoutError::Timeout => TryReceiveError::Empty,
299            RecvTimeoutError::Disconnected => {
300                TryReceiveError::ReceiveError(ReceiveError::Disconnected)
301            },
302        }
303    }
304}
305
306impl From<ipc_channel::TryRecvError> for TryReceiveError {
307    fn from(e: ipc_channel::TryRecvError) -> Self {
308        match e {
309            ipc_channel::TryRecvError::Empty => TryReceiveError::Empty,
310            ipc_channel::TryRecvError::IpcError(inner) => {
311                TryReceiveError::ReceiveError(inner.into())
312            },
313        }
314    }
315}
316
317impl From<crossbeam_channel::TryRecvError> for TryReceiveError {
318    fn from(e: crossbeam_channel::TryRecvError) -> Self {
319        match e {
320            crossbeam_channel::TryRecvError::Empty => TryReceiveError::Empty,
321            crossbeam_channel::TryRecvError::Disconnected => {
322                TryReceiveError::ReceiveError(ReceiveError::Disconnected)
323            },
324        }
325    }
326}
327
328pub type RoutedReceiver<T> = crossbeam_channel::Receiver<Result<T, ipc_channel::IpcError>>;
329pub type ReceiveResult<T> = Result<T, ReceiveError>;
330pub type TryReceiveResult<T> = Result<T, TryReceiveError>;
331pub type RoutedReceiverReceiveResult<T> =
332    Result<Result<T, ipc_channel::IpcError>, crossbeam_channel::RecvError>;
333
334pub fn to_receive_result<T>(receive_result: RoutedReceiverReceiveResult<T>) -> ReceiveResult<T> {
335    match receive_result {
336        Ok(Ok(msg)) => Ok(msg),
337        Err(_crossbeam_recv_err) => Err(ReceiveError::Disconnected),
338        Ok(Err(ipc_err)) => Err(ReceiveError::DeserializationFailed(ipc_err.to_string())),
339    }
340}
341
342#[derive(MallocSizeOf)]
343pub struct GenericReceiver<T>(GenericReceiverVariants<T>)
344where
345    T: for<'de> Deserialize<'de> + Serialize;
346
347impl<T> std::fmt::Debug for GenericReceiver<T>
348where
349    T: for<'de> Deserialize<'de> + Serialize,
350{
351    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
352        f.debug_tuple("GenericReceiver").finish()
353    }
354}
355
356#[derive(MallocSizeOf)]
357enum GenericReceiverVariants<T>
358where
359    T: for<'de> Deserialize<'de> + Serialize,
360{
361    Ipc(ipc_channel::ipc::IpcReceiver<T>),
362    Crossbeam(RoutedReceiver<T>),
363}
364
365impl<T> GenericReceiver<T>
366where
367    T: for<'de> Deserialize<'de> + Serialize,
368{
369    #[inline]
370    pub fn recv(&self) -> ReceiveResult<T> {
371        match self.0 {
372            GenericReceiverVariants::Ipc(ref receiver) => Ok(receiver.recv()?),
373            GenericReceiverVariants::Crossbeam(ref receiver) => {
374                // `recv()` returns an error if the channel is disconnected
375                let msg = receiver.recv()?;
376                // `msg` must be `ok` because the corresponding [`GenericSender::Crossbeam`] will
377                // unconditionally send an `Ok(T)`
378                Ok(msg.expect("Infallible"))
379            },
380        }
381    }
382
383    #[inline]
384    pub fn try_recv(&self) -> TryReceiveResult<T> {
385        match self.0 {
386            GenericReceiverVariants::Ipc(ref receiver) => Ok(receiver.try_recv()?),
387            GenericReceiverVariants::Crossbeam(ref receiver) => {
388                let msg = receiver.try_recv()?;
389                Ok(msg.expect("Infallible"))
390            },
391        }
392    }
393
394    /// Blocks up to the specific duration attempting to receive a message.
395    #[inline]
396    pub fn try_recv_timeout(&self, timeout: Duration) -> Result<T, TryReceiveError> {
397        match self.0 {
398            GenericReceiverVariants::Ipc(ref ipc_receiver) => {
399                ipc_receiver.try_recv_timeout(timeout).map_err(|e| e.into())
400            },
401            GenericReceiverVariants::Crossbeam(ref receiver) => {
402                match receiver.recv_timeout(timeout) {
403                    Ok(Ok(value)) => Ok(value),
404                    Ok(Err(_)) => unreachable!("Infallable"),
405                    Err(RecvTimeoutError::Disconnected) => {
406                        Err(TryReceiveError::ReceiveError(ReceiveError::Disconnected))
407                    },
408                    Err(RecvTimeoutError::Timeout) => Err(TryReceiveError::Empty),
409                }
410            },
411        }
412    }
413
414    /// Route to a crossbeam receiver, preserving any errors.
415    ///
416    /// For `Crossbeam` receivers this is a no-op, while for `Ipc` receivers
417    /// this creates a route.
418    #[inline]
419    pub fn route_preserving_errors(self) -> RoutedReceiver<T>
420    where
421        T: Send + 'static,
422    {
423        match self.0 {
424            GenericReceiverVariants::Ipc(ipc_receiver) => {
425                let (crossbeam_sender, crossbeam_receiver) = crossbeam_channel::unbounded();
426                let crossbeam_sender_clone = crossbeam_sender.clone();
427                ROUTER.add_typed_route(
428                    ipc_receiver,
429                    Box::new(move |message| {
430                        let _ = crossbeam_sender_clone
431                            .send(message.map_err(IpcError::SerializationError));
432                    }),
433                );
434                crossbeam_receiver
435            },
436            GenericReceiverVariants::Crossbeam(receiver) => receiver,
437        }
438    }
439}
440
441impl<T> Serialize for GenericReceiver<T>
442where
443    T: for<'de> Deserialize<'de> + Serialize,
444{
445    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
446        match &self.0 {
447            GenericReceiverVariants::Ipc(receiver) => {
448                s.serialize_newtype_variant("GenericReceiver", 0, "Ipc", receiver)
449            },
450            GenericReceiverVariants::Crossbeam(receiver) => {
451                if opts::get().multiprocess {
452                    return Err(serde::ser::Error::custom(
453                        "Crossbeam channel found in multiprocess mode!",
454                    ));
455                } // We know everything is in one address-space, so we can "serialize" the receiver by
456                // sending a leaked Box pointer.
457                let receiver_clone_addr = Box::leak(Box::new(receiver.clone())) as *mut _ as usize;
458                s.serialize_newtype_variant("GenericReceiver", 1, "Crossbeam", &receiver_clone_addr)
459            },
460        }
461    }
462}
463
464struct GenericReceiverVisitor<T> {
465    marker: PhantomData<T>,
466}
467impl<'de, T> serde::de::Visitor<'de> for GenericReceiverVisitor<T>
468where
469    T: for<'a> Deserialize<'a> + Serialize,
470{
471    type Value = GenericReceiver<T>;
472
473    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
474        formatter.write_str("a GenericReceiver variant")
475    }
476
477    fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
478    where
479        A: serde::de::EnumAccess<'de>,
480    {
481        #[derive(Deserialize)]
482        enum GenericReceiverVariantNames {
483            Ipc,
484            Crossbeam,
485        }
486
487        let (variant_name, variant_data): (GenericReceiverVariantNames, _) = data.variant()?;
488
489        match variant_name {
490            GenericReceiverVariantNames::Ipc => variant_data
491                .newtype_variant::<ipc_channel::ipc::IpcReceiver<T>>()
492                .map(|receiver| GenericReceiver(GenericReceiverVariants::Ipc(receiver))),
493            GenericReceiverVariantNames::Crossbeam => {
494                if use_ipc() {
495                    return Err(serde::de::Error::custom(
496                        "Crossbeam channel found in multiprocess mode!",
497                    ));
498                }
499                let addr = variant_data.newtype_variant::<usize>()?;
500                let ptr = addr as *mut RoutedReceiver<T>;
501                // SAFETY: We know we are in the same address space as the sender, so we can safely
502                // reconstruct the Box.
503                #[expect(unsafe_code)]
504                let receiver = unsafe { Box::from_raw(ptr) };
505                Ok(GenericReceiver(GenericReceiverVariants::Crossbeam(
506                    *receiver,
507                )))
508            },
509        }
510    }
511}
512
513impl<'a, T> Deserialize<'a> for GenericReceiver<T>
514where
515    T: for<'de> Deserialize<'de> + Serialize,
516{
517    fn deserialize<D>(d: D) -> Result<GenericReceiver<T>, D::Error>
518    where
519        D: Deserializer<'a>,
520    {
521        d.deserialize_enum(
522            "GenericReceiver",
523            &["Ipc", "Crossbeam"],
524            GenericReceiverVisitor {
525                marker: PhantomData,
526            },
527        )
528    }
529}
530
531/// Private helper function to create a crossbeam based channel.
532///
533/// Do NOT make this function public!
534fn new_generic_channel_crossbeam<T>() -> (GenericSender<T>, GenericReceiver<T>)
535where
536    T: Serialize + for<'de> serde::Deserialize<'de>,
537{
538    let (tx, rx) = crossbeam_channel::unbounded();
539    (
540        GenericSender(GenericSenderVariants::Crossbeam(tx)),
541        GenericReceiver(GenericReceiverVariants::Crossbeam(rx)),
542    )
543}
544
545fn new_generic_channel_ipc<T>() -> Result<(GenericSender<T>, GenericReceiver<T>), std::io::Error>
546where
547    T: Serialize + for<'de> serde::Deserialize<'de>,
548{
549    ipc_channel::ipc::channel().map(|(tx, rx)| {
550        (
551            GenericSender(GenericSenderVariants::Ipc(tx)),
552            GenericReceiver(GenericReceiverVariants::Ipc(rx)),
553        )
554    })
555}
556
557/// Creates a Servo channel that can select different channel implementations based on multiprocess
558/// mode or not. If the scenario doesn't require message to pass process boundary, a simple
559/// crossbeam channel is preferred.
560pub fn channel<T>() -> Option<(GenericSender<T>, GenericReceiver<T>)>
561where
562    T: for<'de> Deserialize<'de> + Serialize,
563{
564    if use_ipc() {
565        new_generic_channel_ipc().ok()
566    } else {
567        Some(new_generic_channel_crossbeam())
568    }
569}
570
#[cfg(test)]
mod single_process_channel_tests {
    //! Checks that `ipc_channel`- and `crossbeam_channel`-backed senders and receivers can
    //! be sent over one another without problems in single-process mode.
    //! Multiprocess mode exclusively uses `ipc_channel` anyway, which is ensured by
    //! `channel()` being the only way to construct `GenericSender` and Receiver pairs.
    use std::time::Duration;

    use crate::generic_channel::{
        GenericReceiver, GenericSender, new_generic_channel_crossbeam, new_generic_channel_ipc,
    };

    /// Ships the reply sender through the carrier channel, answers with 42 from a scoped
    /// thread and checks that the answer arrives on the reply receiver.
    fn assert_sender_survives_transfer(
        (carrier_tx, carrier_rx): (
            GenericSender<GenericSender<i32>>,
            GenericReceiver<GenericSender<i32>>,
        ),
        (reply_tx, reply_rx): (GenericSender<i32>, GenericReceiver<i32>),
    ) {
        carrier_tx.send(reply_tx).expect("Send failed");

        std::thread::scope(|scope| {
            scope.spawn(move || {
                let reply_sender = carrier_rx.recv().expect("Receive failed");
                reply_sender.send(42).expect("Sending reply failed");
            });
        });
        let answer = reply_rx.recv().expect("Receive of reply failed");
        assert_eq!(answer, 42);
    }

    /// Sends a unit message one second before a three second receive timeout runs out and
    /// checks that the timed receive succeeds.
    fn assert_delayed_message_beats_timeout((tx, rx): (GenericSender<()>, GenericReceiver<()>)) {
        let timeout_duration = Duration::from_secs(3);
        std::thread::spawn(move || {
            std::thread::sleep(timeout_duration - Duration::from_secs(1));
            assert!(tx.send(()).is_ok());
        });
        let received = rx.try_recv_timeout(timeout_duration);
        assert!(received.is_ok());
    }

    #[test]
    fn generic_crossbeam_can_send() {
        let (sender, receiver) = new_generic_channel_crossbeam();
        sender.send(5).expect("Send failed");
        let received = receiver.recv().expect("Receive failed");
        assert_eq!(received, 5);
    }

    #[test]
    fn generic_crossbeam_ping_pong() {
        assert_sender_survives_transfer(
            new_generic_channel_crossbeam(),
            new_generic_channel_crossbeam(),
        );
    }

    #[test]
    fn generic_ipc_ping_pong() {
        assert_sender_survives_transfer(
            new_generic_channel_ipc().unwrap(),
            new_generic_channel_ipc().unwrap(),
        );
    }

    #[test]
    fn send_crossbeam_sender_over_ipc_channel() {
        assert_sender_survives_transfer(
            new_generic_channel_ipc().unwrap(),
            new_generic_channel_crossbeam(),
        );
    }

    #[test]
    fn send_generic_ipc_channel_over_crossbeam() {
        assert_sender_survives_transfer(
            new_generic_channel_crossbeam(),
            new_generic_channel_ipc().unwrap(),
        );
    }

    #[test]
    fn send_crossbeam_receiver_over_ipc_channel() {
        let (carrier_tx, carrier_rx) = new_generic_channel_ipc().unwrap();
        let (payload_tx, payload_rx) = new_generic_channel_crossbeam();

        // The receiver travels through the IPC channel while a message is already queued.
        carrier_tx.send(payload_rx).expect("Send failed");
        payload_tx.send(42).expect("Send failed");

        std::thread::scope(|scope| {
            scope.spawn(move || {
                let transferred_receiver = carrier_rx.recv().expect("Receive failed");
                let value = transferred_receiver.recv().expect("Receive failed");
                assert_eq!(value, 42);
            });
        });
    }

    #[test]
    fn test_timeout_ipc() {
        assert_delayed_message_beats_timeout(new_generic_channel_ipc().unwrap());
    }

    #[test]
    fn test_timeout_crossbeam() {
        assert_delayed_message_beats_timeout(new_generic_channel_crossbeam());
    }
}
696
/// These tests live in this file because they use the private `new_generic_channel_..`
/// constructors.
#[cfg(test)]
mod generic_receiversets_tests {
    use std::time::Duration;

    use crate::generic_channel::generic_channelset::{
        GenericSelectionResult, create_crossbeam_receiver_set, create_ipc_receiver_set,
    };
    use crate::generic_channel::{new_generic_channel_crossbeam, new_generic_channel_ipc};

    /// IPC: the first channel sends immediately, the second one a second later.
    #[test]
    fn test_ipc_side1() {
        let (first_tx, first_rx) = new_generic_channel_ipc().unwrap();
        let (second_tx, second_rx) = new_generic_channel_ipc().unwrap();

        // The original senders stay alive in this function while the threads use clones.
        let first_tx_worker = first_tx.clone();
        let second_tx_worker = second_tx.clone();
        let mut receivers = create_ipc_receiver_set();
        let first_index = receivers.add(first_rx);
        let _second_index = receivers.add(second_rx);

        std::thread::spawn(move || {
            first_tx_worker.send(10).unwrap();
        });
        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_secs(1));
            // This send may fail with a closed channel, so the result is ignored.
            let _ = second_tx_worker.send(20);
        });

        let results = receivers.select();
        let first_result = results.first().unwrap();
        assert_eq!(
            *first_result,
            GenericSelectionResult::MessageReceived(first_index, 10)
        );
    }

    /// IPC: the second channel sends immediately, the first one a second later.
    #[test]
    fn test_ipc_side2() {
        let (first_tx, first_rx) = new_generic_channel_ipc().unwrap();
        let (second_tx, second_rx) = new_generic_channel_ipc().unwrap();

        // The original senders stay alive in this function while the threads use clones.
        let first_tx_worker = first_tx.clone();
        let second_tx_worker = second_tx.clone();
        let mut receivers = create_ipc_receiver_set();
        let _first_index = receivers.add(first_rx);
        let second_index = receivers.add(second_rx);

        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_secs(1));
            let _ = first_tx_worker.send(10);
        });
        std::thread::spawn(move || {
            second_tx_worker.send(20).unwrap();
        });

        let results = receivers.select();
        let first_result = results.first().unwrap();
        assert_eq!(
            *first_result,
            GenericSelectionResult::MessageReceived(second_index, 20)
        );
    }

    /// Crossbeam: the first channel sends immediately, the second one two seconds later.
    #[test]
    fn test_crossbeam_side1() {
        let (first_tx, first_rx) = new_generic_channel_crossbeam();
        let (second_tx, second_rx) = new_generic_channel_crossbeam();

        // The original senders stay alive in this function while the threads use clones.
        let first_tx_worker = first_tx.clone();
        let second_tx_worker = second_tx.clone();
        let mut receivers = create_crossbeam_receiver_set();
        let first_index = receivers.add(first_rx);
        let _second_index = receivers.add(second_rx);

        std::thread::spawn(move || {
            first_tx_worker.send(10).unwrap();
        });
        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_secs(2));
            let _ = second_tx_worker.send(20);
        });

        let results = receivers.select();
        let first_result = results.first().unwrap();
        assert_eq!(
            *first_result,
            GenericSelectionResult::MessageReceived(first_index, 10)
        );
    }

    /// Crossbeam: the second channel sends immediately, the first one two seconds later.
    #[test]
    fn test_crossbeam_side2() {
        let (first_tx, first_rx) = new_generic_channel_crossbeam();
        let (second_tx, second_rx) = new_generic_channel_crossbeam();

        // The original senders stay alive in this function while the threads use clones.
        let first_tx_worker = first_tx.clone();
        let second_tx_worker = second_tx.clone();
        let mut receivers = create_crossbeam_receiver_set();
        let _first_index = receivers.add(first_rx);
        let second_index = receivers.add(second_rx);

        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_secs(2));
            let _ = first_tx_worker.send(10);
        });
        std::thread::spawn(move || {
            second_tx_worker.send(20).unwrap();
        });

        let results = receivers.select();
        let first_result = results.first().unwrap();
        assert_eq!(
            *first_result,
            GenericSelectionResult::MessageReceived(second_index, 20)
        );
    }

    /// IPC: selecting must not crash when a channel gets disconnected.
    ///
    /// The second sender is moved into its thread and goes out of scope after sending, which
    /// disconnects the second channel.
    #[test]
    fn test_ipc_no_crash_on_disconnect() {
        let (first_tx, first_rx) = new_generic_channel_ipc().unwrap();
        let (second_tx, second_rx) = new_generic_channel_ipc().unwrap();

        // Only the first sender is kept alive in this function.
        let first_tx_worker = first_tx.clone();
        let mut receivers = create_ipc_receiver_set();
        let _first_index = receivers.add(first_rx);
        let second_index = receivers.add(second_rx);

        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_secs(2));
            let _ = first_tx_worker.send(10);
        });
        std::thread::spawn(move || {
            second_tx.send(20).unwrap();
        });
        std::thread::sleep(Duration::from_secs(1));
        let results = receivers.select();
        let first_result = results.first().unwrap();
        assert_eq!(
            *first_result,
            GenericSelectionResult::MessageReceived(second_index, 20)
        );
    }

    /// Crossbeam: selecting must not crash when a channel gets disconnected.
    ///
    /// The second sender is moved into its thread and goes out of scope after sending, which
    /// disconnects the second channel.
    #[test]
    fn test_crossbeam_no_crash_on_disconnect() {
        let (first_tx, first_rx) = new_generic_channel_crossbeam();
        let (second_tx, second_rx) = new_generic_channel_crossbeam();

        // Only the first sender is kept alive in this function.
        let first_tx_worker = first_tx.clone();
        let mut receivers = create_crossbeam_receiver_set();
        let _first_index = receivers.add(first_rx);
        let second_index = receivers.add(second_rx);

        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_secs(2));
            let _ = first_tx_worker.send(10);
        });
        std::thread::spawn(move || {
            second_tx.send(20).unwrap();
        });
        std::thread::sleep(Duration::from_secs(1));
        let results = receivers.select();
        let first_result = results.first().unwrap();
        assert_eq!(
            *first_result,
            GenericSelectionResult::MessageReceived(second_index, 20)
        );
    }

    /// IPC: dropping a sender without sending is reported as a closed channel.
    #[test]
    fn test_ipc_disconnect_correct_message() {
        let (first_tx, first_rx) = new_generic_channel_ipc().unwrap();
        let (second_tx, second_rx) = new_generic_channel_ipc().unwrap();

        // Only the first sender is kept alive in this function.
        let first_tx_worker = first_tx.clone();
        let mut receivers = create_ipc_receiver_set();
        let _first_index = receivers.add(first_rx);
        let second_index = receivers.add(second_rx);

        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_secs(2));
            let _ = first_tx_worker.send(10);
        });
        std::thread::spawn(move || {
            drop(second_tx);
        });

        let results = receivers.select();
        let first_result = results.first().unwrap();
        assert_eq!(
            *first_result,
            GenericSelectionResult::ChannelClosed(second_index)
        );
    }

    /// Crossbeam: dropping a sender without sending is reported as a closed channel.
    #[test]
    fn test_crossbeam_disconnect_correct_messaget() {
        let (first_tx, first_rx) = new_generic_channel_crossbeam();
        let (second_tx, second_rx) = new_generic_channel_crossbeam();

        // Only the first sender is kept alive in this function.
        let first_tx_worker = first_tx.clone();
        let mut receivers = create_crossbeam_receiver_set();
        let _first_index = receivers.add(first_rx);
        let second_index = receivers.add(second_rx);

        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_secs(2));
            let _ = first_tx_worker.send(10);
        });
        std::thread::spawn(move || {
            drop(second_tx);
        });

        let results = receivers.select();
        let first_result = results.first().unwrap();
        assert_eq!(
            *first_result,
            GenericSelectionResult::ChannelClosed(second_index)
        );
    }
}