base/
generic_channel.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5//! Enum wrappers to be able to select different channel implementations at runtime.
6
7use std::fmt;
8use std::fmt::Display;
9use std::marker::PhantomData;
10use std::time::Duration;
11
12use crossbeam_channel::RecvTimeoutError;
13use ipc_channel::ipc::IpcError;
14use ipc_channel::router::ROUTER;
15use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
16use malloc_size_of_derive::MallocSizeOf;
17use serde::de::VariantAccess;
18use serde::{Deserialize, Deserializer, Serialize, Serializer};
19use servo_config::opts;
20
21mod callback;
22pub use callback::GenericCallback;
23mod oneshot;
24/// We want to discourage anybody from using the ipc_channel crate in servo and use 'GenericChannels' instead.
25/// 'GenericSharedMemory' is, however, still useful so we reexport it under a different name for future optimization.
26pub use ipc_channel::ipc::IpcSharedMemory as GenericSharedMemory;
27pub use oneshot::{GenericOneshotReceiver, GenericOneshotSender, oneshot};
28mod generic_channelset;
29pub use generic_channelset::{GenericReceiverSet, GenericSelectionResult};
30
31/// Abstraction of the ability to send a particular type of message cross-process.
32/// This can be used to ease the use of GenericSender sub-fields.
33pub trait GenericSend<T>
34where
35    T: serde::Serialize + for<'de> serde::Deserialize<'de>,
36{
37    /// send message T
38    fn send(&self, _: T) -> SendResult;
39    /// get underlying sender
40    fn sender(&self) -> GenericSender<T>;
41}
42
43/// A GenericSender that sends messages to a [GenericReceiver].
44///
45/// The sender supports sending messages cross-process, if servo is run in multiprocess mode.
46pub struct GenericSender<T: Serialize>(GenericSenderVariants<T>);
47
48/// The actual GenericSender variant.
49///
50/// This enum is private, so that outside code can't construct a GenericSender itself.
51/// This ensures that users can't construct a crossbeam variant in multiprocess mode.
52enum GenericSenderVariants<T: Serialize> {
53    Ipc(ipc_channel::ipc::IpcSender<T>),
54    /// A crossbeam-channel. To keep the API in sync with the Ipc variant when using a Router,
55    /// which propagates the IPC error, the inner type is a Result.
56    /// In the IPC case, the Router deserializes the message, which can fail, and sends
57    /// the result to a crossbeam receiver.
58    /// The crossbeam channel does not involve serializing, so we can't have this error,
59    /// but replicating the API allows us to have one channel type as the receiver
60    /// after routing the receiver .
61    Crossbeam(crossbeam_channel::Sender<Result<T, ipc_channel::Error>>),
62}
63
64fn serialize_generic_sender_variants<T: Serialize, S: Serializer>(
65    value: &GenericSenderVariants<T>,
66    s: S,
67) -> Result<S::Ok, S::Error> {
68    match value {
69        GenericSenderVariants::Ipc(sender) => {
70            s.serialize_newtype_variant("GenericSender", 0, "Ipc", sender)
71        },
72        // All GenericSenders will be IPC channels in multi-process mode, so sending a
73        // GenericChannel over existing IPC channels is no problem and won't fail.
74        // In single-process mode, we can also send GenericSenders over other GenericSenders
75        // just fine, since no serialization is required.
76        // The only reason we need / want serialization is to support sending GenericSenders
77        // over existing IPC channels **in single process mode**. This allows us to
78        // incrementally port channels to the GenericChannel, without needing to follow a
79        // top-to-bottom approach.
80        // Long-term we can remove this branch in the code again and replace it with
81        // unreachable, since likely all IPC channels would be GenericChannels.
82        GenericSenderVariants::Crossbeam(sender) => {
83            if opts::get().multiprocess {
84                return Err(serde::ser::Error::custom(
85                    "Crossbeam channel found in multiprocess mode!",
86                ));
87            } // We know everything is in one address-space, so we can "serialize" the sender by
88            // sending a leaked Box pointer.
89            let sender_clone_addr = Box::leak(Box::new(sender.clone())) as *mut _ as usize;
90            s.serialize_newtype_variant("GenericSender", 1, "Crossbeam", &sender_clone_addr)
91        },
92    }
93}
94
95impl<T: Serialize> Serialize for GenericSender<T> {
96    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
97        serialize_generic_sender_variants(&self.0, s)
98    }
99}
100
101struct GenericSenderVisitor<T> {
102    marker: PhantomData<T>,
103}
104
105impl<'de, T: Serialize + Deserialize<'de>> serde::de::Visitor<'de> for GenericSenderVisitor<T> {
106    type Value = GenericSenderVariants<T>;
107
108    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
109        formatter.write_str("a GenericSender variant")
110    }
111
112    fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
113    where
114        A: serde::de::EnumAccess<'de>,
115    {
116        #[derive(Deserialize)]
117        enum GenericSenderVariantNames {
118            Ipc,
119            Crossbeam,
120        }
121
122        let (variant_name, variant_data): (GenericSenderVariantNames, _) = data.variant()?;
123
124        match variant_name {
125            GenericSenderVariantNames::Ipc => variant_data
126                .newtype_variant::<ipc_channel::ipc::IpcSender<T>>()
127                .map(|sender| GenericSenderVariants::Ipc(sender)),
128            GenericSenderVariantNames::Crossbeam => {
129                if opts::get().multiprocess {
130                    return Err(serde::de::Error::custom(
131                        "Crossbeam channel found in multiprocess mode!",
132                    ));
133                }
134                let addr = variant_data.newtype_variant::<usize>()?;
135                let ptr = addr as *mut crossbeam_channel::Sender<Result<T, ipc_channel::Error>>;
136                // SAFETY: We know we are in the same address space as the sender, so we can safely
137                // reconstruct the Box.
138                #[expect(unsafe_code)]
139                let sender = unsafe { Box::from_raw(ptr) };
140                Ok(GenericSenderVariants::Crossbeam(*sender))
141            },
142        }
143    }
144}
145
146impl<'a, T: Serialize + Deserialize<'a>> Deserialize<'a> for GenericSender<T> {
147    fn deserialize<D>(d: D) -> Result<GenericSender<T>, D::Error>
148    where
149        D: Deserializer<'a>,
150    {
151        d.deserialize_enum(
152            "GenericSender",
153            &["Ipc", "Crossbeam"],
154            GenericSenderVisitor {
155                marker: PhantomData,
156            },
157        )
158        .map(|variant| GenericSender(variant))
159    }
160}
161
162impl<T> Clone for GenericSender<T>
163where
164    T: Serialize,
165{
166    fn clone(&self) -> Self {
167        match self.0 {
168            GenericSenderVariants::Ipc(ref chan) => {
169                GenericSender(GenericSenderVariants::Ipc(chan.clone()))
170            },
171            GenericSenderVariants::Crossbeam(ref chan) => {
172                GenericSender(GenericSenderVariants::Crossbeam(chan.clone()))
173            },
174        }
175    }
176}
177
178impl<T: Serialize> fmt::Debug for GenericSender<T> {
179    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
180        write!(f, "Sender(..)")
181    }
182}
183
184impl<T: Serialize> GenericSender<T> {
185    #[inline]
186    pub fn send(&self, msg: T) -> SendResult {
187        match self.0 {
188            GenericSenderVariants::Ipc(ref sender) => sender
189                .send(msg)
190                .map_err(|e| SendError::SerializationError(format!("{e}"))),
191            GenericSenderVariants::Crossbeam(ref sender) => {
192                sender.send(Ok(msg)).map_err(|_| SendError::Disconnected)
193            },
194        }
195    }
196}
197
198impl<T: Serialize> MallocSizeOf for GenericSender<T> {
199    fn size_of(&self, _ops: &mut MallocSizeOfOps) -> usize {
200        0
201    }
202}
203
/// Reasons why sending a message can fail.
#[derive(Debug)]
pub enum SendError {
    /// The channel is disconnected.
    Disconnected,
    /// The message could not be sent; carries the textual form of the underlying error.
    SerializationError(String),
}

impl Display for SendError {
    /// Uses the `Debug` representation as the display text.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{:?}", self)
    }
}

/// Result of a send operation.
pub type SendResult = Result<(), SendError>;
217
218#[derive(Debug)]
219pub enum ReceiveError {
220    DeserializationFailed(String),
221    /// Io Error. May occur when using IPC.
222    Io(std::io::Error),
223    /// The channel was closed.
224    Disconnected,
225}
226
227impl From<IpcError> for ReceiveError {
228    fn from(e: IpcError) -> Self {
229        match e {
230            IpcError::Disconnected => ReceiveError::Disconnected,
231            IpcError::Bincode(reason) => ReceiveError::DeserializationFailed(reason.to_string()),
232            IpcError::Io(reason) => ReceiveError::Io(reason),
233        }
234    }
235}
236
237impl From<crossbeam_channel::RecvError> for ReceiveError {
238    fn from(_: crossbeam_channel::RecvError) -> Self {
239        ReceiveError::Disconnected
240    }
241}
242
243impl fmt::Display for ReceiveError {
244    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
245        match *self {
246            ReceiveError::DeserializationFailed(ref error) => {
247                write!(fmt, "deserialization error: {error}")
248            },
249            ReceiveError::Io(ref error) => write!(fmt, "io error: {error}"),
250            ReceiveError::Disconnected => write!(fmt, "disconnected"),
251        }
252    }
253}
254impl From<std::io::Error> for ReceiveError {
255    fn from(value: std::io::Error) -> Self {
256        ReceiveError::Io(value)
257    }
258}
259
260pub enum TryReceiveError {
261    Empty,
262    ReceiveError(ReceiveError),
263}
264
265impl From<crossbeam_channel::RecvTimeoutError> for TryReceiveError {
266    fn from(value: crossbeam_channel::RecvTimeoutError) -> Self {
267        match value {
268            RecvTimeoutError::Timeout => TryReceiveError::Empty,
269            RecvTimeoutError::Disconnected => {
270                TryReceiveError::ReceiveError(ReceiveError::Disconnected)
271            },
272        }
273    }
274}
275
276impl From<ipc_channel::ipc::TryRecvError> for TryReceiveError {
277    fn from(e: ipc_channel::ipc::TryRecvError) -> Self {
278        match e {
279            ipc_channel::ipc::TryRecvError::Empty => TryReceiveError::Empty,
280            ipc_channel::ipc::TryRecvError::IpcError(inner) => {
281                TryReceiveError::ReceiveError(inner.into())
282            },
283        }
284    }
285}
286
287impl From<crossbeam_channel::TryRecvError> for TryReceiveError {
288    fn from(e: crossbeam_channel::TryRecvError) -> Self {
289        match e {
290            crossbeam_channel::TryRecvError::Empty => TryReceiveError::Empty,
291            crossbeam_channel::TryRecvError::Disconnected => {
292                TryReceiveError::ReceiveError(ReceiveError::Disconnected)
293            },
294        }
295    }
296}
297
298pub type RoutedReceiver<T> = crossbeam_channel::Receiver<Result<T, ipc_channel::Error>>;
299pub type ReceiveResult<T> = Result<T, ReceiveError>;
300pub type TryReceiveResult<T> = Result<T, TryReceiveError>;
301pub type RoutedReceiverReceiveResult<T> =
302    Result<Result<T, ipc_channel::Error>, crossbeam_channel::RecvError>;
303
304pub fn to_receive_result<T>(receive_result: RoutedReceiverReceiveResult<T>) -> ReceiveResult<T> {
305    match receive_result {
306        Ok(Ok(msg)) => Ok(msg),
307        Err(_crossbeam_recv_err) => Err(ReceiveError::Disconnected),
308        Ok(Err(ipc_err)) => Err(ReceiveError::DeserializationFailed(ipc_err.to_string())),
309    }
310}
311
312#[derive(MallocSizeOf)]
313pub struct GenericReceiver<T>(GenericReceiverVariants<T>)
314where
315    T: for<'de> Deserialize<'de> + Serialize;
316
317#[derive(MallocSizeOf)]
318enum GenericReceiverVariants<T>
319where
320    T: for<'de> Deserialize<'de> + Serialize,
321{
322    Ipc(ipc_channel::ipc::IpcReceiver<T>),
323    Crossbeam(RoutedReceiver<T>),
324}
325
326impl<T> GenericReceiver<T>
327where
328    T: for<'de> Deserialize<'de> + Serialize,
329{
330    #[inline]
331    pub fn recv(&self) -> ReceiveResult<T> {
332        match self.0 {
333            GenericReceiverVariants::Ipc(ref receiver) => Ok(receiver.recv()?),
334            GenericReceiverVariants::Crossbeam(ref receiver) => {
335                // `recv()` returns an error if the channel is disconnected
336                let msg = receiver.recv()?;
337                // `msg` must be `ok` because the corresponding [`GenericSender::Crossbeam`] will
338                // unconditionally send an `Ok(T)`
339                Ok(msg.expect("Infallible"))
340            },
341        }
342    }
343
344    #[inline]
345    pub fn try_recv(&self) -> TryReceiveResult<T> {
346        match self.0 {
347            GenericReceiverVariants::Ipc(ref receiver) => Ok(receiver.try_recv()?),
348            GenericReceiverVariants::Crossbeam(ref receiver) => {
349                let msg = receiver.try_recv()?;
350                Ok(msg.expect("Infallible"))
351            },
352        }
353    }
354
355    /// Blocks up to the specific duration attempting to receive a message.
356    #[inline]
357    pub fn try_recv_timeout(&self, timeout: Duration) -> Result<T, TryReceiveError> {
358        match self.0 {
359            GenericReceiverVariants::Ipc(ref ipc_receiver) => {
360                ipc_receiver.try_recv_timeout(timeout).map_err(|e| e.into())
361            },
362            GenericReceiverVariants::Crossbeam(ref receiver) => {
363                match receiver.recv_timeout(timeout) {
364                    Ok(Ok(value)) => Ok(value),
365                    Ok(Err(_)) => unreachable!("Infallable"),
366                    Err(RecvTimeoutError::Disconnected) => {
367                        Err(TryReceiveError::ReceiveError(ReceiveError::Disconnected))
368                    },
369                    Err(RecvTimeoutError::Timeout) => Err(TryReceiveError::Empty),
370                }
371            },
372        }
373    }
374
375    /// Route to a crossbeam receiver, preserving any errors.
376    ///
377    /// For `Crossbeam` receivers this is a no-op, while for `Ipc` receivers
378    /// this creates a route.
379    #[inline]
380    pub fn route_preserving_errors(self) -> RoutedReceiver<T>
381    where
382        T: Send + 'static,
383    {
384        match self.0 {
385            GenericReceiverVariants::Ipc(ipc_receiver) => {
386                let (crossbeam_sender, crossbeam_receiver) = crossbeam_channel::unbounded();
387                let crossbeam_sender_clone = crossbeam_sender.clone();
388                ROUTER.add_typed_route(
389                    ipc_receiver,
390                    Box::new(move |message| {
391                        let _ = crossbeam_sender_clone.send(message);
392                    }),
393                );
394                crossbeam_receiver
395            },
396            GenericReceiverVariants::Crossbeam(receiver) => receiver,
397        }
398    }
399}
400
401impl<T> Serialize for GenericReceiver<T>
402where
403    T: for<'de> Deserialize<'de> + Serialize,
404{
405    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
406        match &self.0 {
407            GenericReceiverVariants::Ipc(receiver) => {
408                s.serialize_newtype_variant("GenericReceiver", 0, "Ipc", receiver)
409            },
410            GenericReceiverVariants::Crossbeam(receiver) => {
411                if opts::get().multiprocess {
412                    return Err(serde::ser::Error::custom(
413                        "Crossbeam channel found in multiprocess mode!",
414                    ));
415                } // We know everything is in one address-space, so we can "serialize" the receiver by
416                // sending a leaked Box pointer.
417                let receiver_clone_addr = Box::leak(Box::new(receiver.clone())) as *mut _ as usize;
418                s.serialize_newtype_variant("GenericReceiver", 1, "Crossbeam", &receiver_clone_addr)
419            },
420        }
421    }
422}
423
424struct GenericReceiverVisitor<T> {
425    marker: PhantomData<T>,
426}
427impl<'de, T> serde::de::Visitor<'de> for GenericReceiverVisitor<T>
428where
429    T: for<'a> Deserialize<'a> + Serialize,
430{
431    type Value = GenericReceiver<T>;
432
433    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
434        formatter.write_str("a GenericReceiver variant")
435    }
436
437    fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
438    where
439        A: serde::de::EnumAccess<'de>,
440    {
441        #[derive(Deserialize)]
442        enum GenericReceiverVariantNames {
443            Ipc,
444            Crossbeam,
445        }
446
447        let (variant_name, variant_data): (GenericReceiverVariantNames, _) = data.variant()?;
448
449        match variant_name {
450            GenericReceiverVariantNames::Ipc => variant_data
451                .newtype_variant::<ipc_channel::ipc::IpcReceiver<T>>()
452                .map(|receiver| GenericReceiver(GenericReceiverVariants::Ipc(receiver))),
453            GenericReceiverVariantNames::Crossbeam => {
454                if opts::get().multiprocess {
455                    return Err(serde::de::Error::custom(
456                        "Crossbeam channel found in multiprocess mode!",
457                    ));
458                }
459                let addr = variant_data.newtype_variant::<usize>()?;
460                let ptr = addr as *mut RoutedReceiver<T>;
461                // SAFETY: We know we are in the same address space as the sender, so we can safely
462                // reconstruct the Box.
463                #[expect(unsafe_code)]
464                let receiver = unsafe { Box::from_raw(ptr) };
465                Ok(GenericReceiver(GenericReceiverVariants::Crossbeam(
466                    *receiver,
467                )))
468            },
469        }
470    }
471}
472
473impl<'a, T> Deserialize<'a> for GenericReceiver<T>
474where
475    T: for<'de> Deserialize<'de> + Serialize,
476{
477    fn deserialize<D>(d: D) -> Result<GenericReceiver<T>, D::Error>
478    where
479        D: Deserializer<'a>,
480    {
481        d.deserialize_enum(
482            "GenericReceiver",
483            &["Ipc", "Crossbeam"],
484            GenericReceiverVisitor {
485                marker: PhantomData,
486            },
487        )
488    }
489}
490
491/// Private helper function to create a crossbeam based channel.
492///
493/// Do NOT make this function public!
494fn new_generic_channel_crossbeam<T>() -> (GenericSender<T>, GenericReceiver<T>)
495where
496    T: Serialize + for<'de> serde::Deserialize<'de>,
497{
498    let (tx, rx) = crossbeam_channel::unbounded();
499    (
500        GenericSender(GenericSenderVariants::Crossbeam(tx)),
501        GenericReceiver(GenericReceiverVariants::Crossbeam(rx)),
502    )
503}
504
505fn new_generic_channel_ipc<T>() -> Result<(GenericSender<T>, GenericReceiver<T>), std::io::Error>
506where
507    T: Serialize + for<'de> serde::Deserialize<'de>,
508{
509    ipc_channel::ipc::channel().map(|(tx, rx)| {
510        (
511            GenericSender(GenericSenderVariants::Ipc(tx)),
512            GenericReceiver(GenericReceiverVariants::Ipc(rx)),
513        )
514    })
515}
516
517/// Creates a Servo channel that can select different channel implementations based on multiprocess
518/// mode or not. If the scenario doesn't require message to pass process boundary, a simple
519/// crossbeam channel is preferred.
520pub fn channel<T>() -> Option<(GenericSender<T>, GenericReceiver<T>)>
521where
522    T: for<'de> Deserialize<'de> + Serialize,
523{
524    if servo_config::opts::get().multiprocess || servo_config::opts::get().force_ipc {
525        new_generic_channel_ipc().ok()
526    } else {
527        Some(new_generic_channel_crossbeam())
528    }
529}
530
#[cfg(test)]
mod single_process_channel_tests {
    //! These unit-tests test that ipc_channel and crossbeam_channel Senders and Receivers
    //! can be sent over each other without problems in single-process mode.
    //! In multiprocess mode we exclusively use `ipc_channel` anyway, which is ensured due
    //! to `channel()` being the only way to construct `GenericSender` and Receiver pairs.
    use std::time::Duration;

    use crate::generic_channel::{
        GenericReceiver, GenericSender, new_generic_channel_crossbeam, new_generic_channel_ipc,
    };

    /// Sends `reply_tx` over the first channel, answers with `42` on it from a scoped
    /// thread, and checks that the answer arrives at `reply_rx`.
    fn assert_ping_pong(
        (tx, rx): (
            GenericSender<GenericSender<i32>>,
            GenericReceiver<GenericSender<i32>>,
        ),
        (reply_tx, reply_rx): (GenericSender<i32>, GenericReceiver<i32>),
    ) {
        tx.send(reply_tx).expect("Send failed");

        std::thread::scope(|scope| {
            scope.spawn(move || {
                let reply_sender = rx.recv().expect("Receive failed");
                reply_sender.send(42).expect("Sending reply failed");
            });
        });
        let reply = reply_rx.recv().expect("Receive of reply failed");
        assert_eq!(reply, 42);
    }

    /// Sends a message from another thread one second before the timeout expires and
    /// checks that `try_recv_timeout` receives it.
    fn assert_received_within_timeout((tx, rx): (GenericSender<()>, GenericReceiver<()>)) {
        let timeout_duration = Duration::from_secs(3);
        std::thread::spawn(move || {
            std::thread::sleep(timeout_duration - Duration::from_secs(1));
            assert!(tx.send(()).is_ok());
        });
        let received = rx.try_recv_timeout(timeout_duration);
        assert!(received.is_ok());
    }

    #[test]
    fn generic_crossbeam_can_send() {
        let (sender, receiver) = new_generic_channel_crossbeam();
        sender.send(5).expect("Send failed");
        let received = receiver.recv().expect("Receive failed");
        assert_eq!(received, 5);
    }

    #[test]
    fn generic_crossbeam_ping_pong() {
        assert_ping_pong(
            new_generic_channel_crossbeam(),
            new_generic_channel_crossbeam(),
        );
    }

    #[test]
    fn generic_ipc_ping_pong() {
        assert_ping_pong(
            new_generic_channel_ipc().unwrap(),
            new_generic_channel_ipc().unwrap(),
        );
    }

    #[test]
    fn send_crossbeam_sender_over_ipc_channel() {
        assert_ping_pong(
            new_generic_channel_ipc().unwrap(),
            new_generic_channel_crossbeam(),
        );
    }

    #[test]
    fn send_generic_ipc_channel_over_crossbeam() {
        assert_ping_pong(
            new_generic_channel_crossbeam(),
            new_generic_channel_ipc().unwrap(),
        );
    }

    #[test]
    fn send_crossbeam_receiver_over_ipc_channel() {
        let (outer_tx, outer_rx) = new_generic_channel_ipc().unwrap();
        let (inner_tx, inner_rx) = new_generic_channel_crossbeam();

        outer_tx.send(inner_rx).expect("Send failed");
        inner_tx.send(42).expect("Send failed");

        std::thread::scope(|scope| {
            scope.spawn(move || {
                let another_receiver = outer_rx.recv().expect("Receive failed");
                let received = another_receiver.recv().expect("Receive failed");
                assert_eq!(received, 42);
            });
        });
    }

    #[test]
    fn test_timeout_ipc() {
        assert_received_within_timeout(new_generic_channel_ipc().unwrap());
    }

    #[test]
    fn test_timeout_crossbeam() {
        assert_received_within_timeout(new_generic_channel_crossbeam());
    }
}
656
/// These tests need to live in this file because they use the private
/// `new_generic_channel_..` functions.
#[cfg(test)]
mod generic_receiversets_tests {
    use std::time::Duration;

    use crate::generic_channel::generic_channelset::{
        GenericSelectionResult, create_crossbeam_receiver_set, create_ipc_receiver_set,
    };
    use crate::generic_channel::{new_generic_channel_crossbeam, new_generic_channel_ipc};

    /// Channel 1 sends immediately, channel 2 after a delay: the first selection result
    /// must be the message of channel 1.
    #[test]
    fn test_ipc_side1() {
        let (snd1, recv1) = new_generic_channel_ipc().unwrap();
        let (snd2, recv2) = new_generic_channel_ipc().unwrap();

        // We keep the senders alive till all threads are done
        let snd1_c = snd1.clone();
        let snd2_c = snd2.clone();
        let mut set = create_ipc_receiver_set();
        let recv1_select_index = set.add(recv1);
        let _recv2_select_index = set.add(recv2);

        std::thread::spawn(move || {
            snd1_c.send(10).unwrap();
        });
        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_secs(1));
            let _ = snd2_c.send(20); // this might error with closed channel
        });

        let select_result = set.select();
        let channel_result = select_result.first().unwrap();
        assert_eq!(
            *channel_result,
            GenericSelectionResult::MessageReceived(recv1_select_index, 10)
        );
    }

    /// Channel 2 sends immediately, channel 1 after a delay: the first selection result
    /// must be the message of channel 2.
    #[test]
    fn test_ipc_side2() {
        let (snd1, recv1) = new_generic_channel_ipc().unwrap();
        let (snd2, recv2) = new_generic_channel_ipc().unwrap();

        // We keep the senders alive till all threads are done
        let snd1_c = snd1.clone();
        let snd2_c = snd2.clone();
        let mut set = create_ipc_receiver_set();
        let _recv1_select_index = set.add(recv1);
        let recv2_select_index = set.add(recv2);

        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_secs(1));
            let _ = snd1_c.send(10); // this might error with closed channel
        });
        std::thread::spawn(move || {
            snd2_c.send(20).unwrap();
        });

        let select_result = set.select();
        let channel_result = select_result.first().unwrap();
        assert_eq!(
            *channel_result,
            GenericSelectionResult::MessageReceived(recv2_select_index, 20)
        );
    }

    /// Crossbeam counterpart of `test_ipc_side1`.
    #[test]
    fn test_crossbeam_side1() {
        let (snd1, recv1) = new_generic_channel_crossbeam();
        let (snd2, recv2) = new_generic_channel_crossbeam();

        // We keep the senders alive till all threads are done
        let snd1_c = snd1.clone();
        let snd2_c = snd2.clone();
        let mut set = create_crossbeam_receiver_set();
        let recv1_select_index = set.add(recv1);
        let _recv2_select_index = set.add(recv2);

        std::thread::spawn(move || {
            snd1_c.send(10).unwrap();
        });
        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_secs(2));
            let _ = snd2_c.send(20);
        });

        let select_result = set.select();
        let channel_result = select_result.first().unwrap();
        assert_eq!(
            *channel_result,
            GenericSelectionResult::MessageReceived(recv1_select_index, 10)
        );
    }

    /// Crossbeam counterpart of `test_ipc_side2`.
    #[test]
    fn test_crossbeam_side2() {
        let (snd1, recv1) = new_generic_channel_crossbeam();
        let (snd2, recv2) = new_generic_channel_crossbeam();

        // We keep the senders alive till all threads are done
        let snd1_c = snd1.clone();
        let snd2_c = snd2.clone();
        let mut set = create_crossbeam_receiver_set();
        let _recv1_select_index = set.add(recv1);
        let recv2_select_index = set.add(recv2);

        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_secs(2));
            let _ = snd1_c.send(10);
        });
        std::thread::spawn(move || {
            snd2_c.send(20).unwrap();
        });

        let select_result = set.select();
        let channel_result = select_result.first().unwrap();
        assert_eq!(
            *channel_result,
            GenericSelectionResult::MessageReceived(recv2_select_index, 20)
        );
    }

    #[test]
    fn test_ipc_no_crash_on_disconnect() {
        // Test that we do not crash if a channel gets disconnected.
        // Channel 2 gets disconnected because snd2 gets moved into the thread and then falls out of scope
        let (snd1, recv1) = new_generic_channel_ipc().unwrap();
        let (snd2, recv2) = new_generic_channel_ipc().unwrap();

        // We keep the sender of channel 1 alive till all threads are done
        let snd1_c = snd1.clone();
        let mut set = create_ipc_receiver_set();
        let _recv1_select_index = set.add(recv1);
        let recv2_select_index = set.add(recv2);

        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_secs(2));
            let _ = snd1_c.send(10);
        });
        std::thread::spawn(move || {
            snd2.send(20).unwrap();
        });
        std::thread::sleep(Duration::from_secs(1));
        let select_result = set.select();
        let channel_result = select_result.first().unwrap();
        assert_eq!(
            *channel_result,
            GenericSelectionResult::MessageReceived(recv2_select_index, 20)
        );
    }

    #[test]
    fn test_crossbeam_no_crash_on_disconnect() {
        // Test that we do not crash if a channel gets disconnected.
        // Channel 2 gets disconnected because snd2 gets moved into the thread and then falls out of scope
        let (snd1, recv1) = new_generic_channel_crossbeam();
        let (snd2, recv2) = new_generic_channel_crossbeam();

        // We keep the sender of channel 1 alive till all threads are done
        let snd1_c = snd1.clone();
        let mut set = create_crossbeam_receiver_set();
        let _recv1_select_index = set.add(recv1);
        let recv2_select_index = set.add(recv2);

        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_secs(2));
            let _ = snd1_c.send(10);
        });
        std::thread::spawn(move || {
            snd2.send(20).unwrap();
        });
        std::thread::sleep(Duration::from_secs(1));
        let select_result = set.select();
        let channel_result = select_result.first().unwrap();
        assert_eq!(
            *channel_result,
            GenericSelectionResult::MessageReceived(recv2_select_index, 20)
        );
    }

    #[test]
    fn test_ipc_disconnect_correct_message() {
        // Test that a channel whose sender is dropped without sending anything is
        // reported as `ChannelClosed` for the correct receiver index.
        let (snd1, recv1) = new_generic_channel_ipc().unwrap();
        let (snd2, recv2) = new_generic_channel_ipc().unwrap();

        // We keep the sender of channel 1 alive till all threads are done
        let snd1_c = snd1.clone();
        let mut set = create_ipc_receiver_set();
        let _recv1_select_index = set.add(recv1);
        let recv2_select_index = set.add(recv2);

        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_secs(2));
            let _ = snd1_c.send(10);
        });
        std::thread::spawn(move || {
            drop(snd2);
        });

        let select_result = set.select();
        let channel_result = select_result.first().unwrap();
        assert_eq!(
            *channel_result,
            GenericSelectionResult::ChannelClosed(recv2_select_index)
        );
    }

    #[test]
    fn test_crossbeam_disconnect_correct_message() {
        // Test that a channel whose sender is dropped without sending anything is
        // reported as `ChannelClosed` for the correct receiver index.
        let (snd1, recv1) = new_generic_channel_crossbeam();
        let (snd2, recv2) = new_generic_channel_crossbeam();

        // We keep the sender of channel 1 alive till all threads are done
        let snd1_c = snd1.clone();
        let mut set = create_crossbeam_receiver_set();
        let _recv1_select_index = set.add(recv1);
        let recv2_select_index = set.add(recv2);

        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_secs(2));
            let _ = snd1_c.send(10);
        });
        std::thread::spawn(move || {
            drop(snd2);
        });

        let select_result = set.select();
        let channel_result = select_result.first().unwrap();
        assert_eq!(
            *channel_result,
            GenericSelectionResult::ChannelClosed(recv2_select_index)
        );
    }
}