base/
generic_channel.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5//! Enum wrappers to be able to select different channel implementations at runtime.
6
7use std::fmt;
8use std::fmt::Display;
9use std::marker::PhantomData;
10use std::time::Duration;
11
12use crossbeam_channel::RecvTimeoutError;
13use ipc_channel::ipc::IpcError;
14use ipc_channel::router::ROUTER;
15use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
16use malloc_size_of_derive::MallocSizeOf;
17use serde::de::VariantAccess;
18use serde::{Deserialize, Deserializer, Serialize, Serializer};
19use servo_config::opts;
20
21mod callback;
22pub use callback::GenericCallback;
23mod oneshot;
24/// We want to discourage anybody from using the ipc_channel crate in servo and use 'GenericChannels' instead.
25/// 'GenericSharedMemory' is, however, still useful so we reexport it under a different name for future optimization.
26pub use ipc_channel::ipc::IpcSharedMemory as GenericSharedMemory;
27pub use oneshot::{GenericOneshotReceiver, GenericOneshotSender, oneshot};
28mod generic_channelset;
29pub use generic_channelset::{GenericReceiverSet, GenericSelectionResult};
30
31/// Abstraction of the ability to send a particular type of message cross-process.
32/// This can be used to ease the use of GenericSender sub-fields.
33pub trait GenericSend<T>
34where
35    T: serde::Serialize + for<'de> serde::Deserialize<'de>,
36{
37    /// send message T
38    fn send(&self, _: T) -> SendResult;
39    /// get underlying sender
40    fn sender(&self) -> GenericSender<T>;
41}
42
43/// A GenericSender that sends messages to a [GenericReceiver].
44///
45/// The sender supports sending messages cross-process, if servo is run in multiprocess mode.
46pub struct GenericSender<T: Serialize>(GenericSenderVariants<T>);
47
48/// The actual GenericSender variant.
49///
50/// This enum is private, so that outside code can't construct a GenericSender itself.
51/// This ensures that users can't construct a crossbeam variant in multiprocess mode.
52enum GenericSenderVariants<T: Serialize> {
53    Ipc(ipc_channel::ipc::IpcSender<T>),
54    /// A crossbeam-channel. To keep the API in sync with the Ipc variant when using a Router,
55    /// which propagates the IPC error, the inner type is a Result.
56    /// In the IPC case, the Router deserializes the message, which can fail, and sends
57    /// the result to a crossbeam receiver.
58    /// The crossbeam channel does not involve serializing, so we can't have this error,
59    /// but replicating the API allows us to have one channel type as the receiver
60    /// after routing the receiver .
61    Crossbeam(crossbeam_channel::Sender<Result<T, ipc_channel::Error>>),
62}
63
64fn serialize_generic_sender_variants<T: Serialize, S: Serializer>(
65    value: &GenericSenderVariants<T>,
66    s: S,
67) -> Result<S::Ok, S::Error> {
68    match value {
69        GenericSenderVariants::Ipc(sender) => {
70            s.serialize_newtype_variant("GenericSender", 0, "Ipc", sender)
71        },
72        // All GenericSenders will be IPC channels in multi-process mode, so sending a
73        // GenericChannel over existing IPC channels is no problem and won't fail.
74        // In single-process mode, we can also send GenericSenders over other GenericSenders
75        // just fine, since no serialization is required.
76        // The only reason we need / want serialization is to support sending GenericSenders
77        // over existing IPC channels **in single process mode**. This allows us to
78        // incrementally port channels to the GenericChannel, without needing to follow a
79        // top-to-bottom approach.
80        // Long-term we can remove this branch in the code again and replace it with
81        // unreachable, since likely all IPC channels would be GenericChannels.
82        GenericSenderVariants::Crossbeam(sender) => {
83            if opts::get().multiprocess {
84                return Err(serde::ser::Error::custom(
85                    "Crossbeam channel found in multiprocess mode!",
86                ));
87            } // We know everything is in one address-space, so we can "serialize" the sender by
88            // sending a leaked Box pointer.
89            let sender_clone_addr = Box::leak(Box::new(sender.clone())) as *mut _ as usize;
90            s.serialize_newtype_variant("GenericSender", 1, "Crossbeam", &sender_clone_addr)
91        },
92    }
93}
94
95impl<T: Serialize> Serialize for GenericSender<T> {
96    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
97        serialize_generic_sender_variants(&self.0, s)
98    }
99}
100
101struct GenericSenderVisitor<T> {
102    marker: PhantomData<T>,
103}
104
105impl<'de, T: Serialize + Deserialize<'de>> serde::de::Visitor<'de> for GenericSenderVisitor<T> {
106    type Value = GenericSenderVariants<T>;
107
108    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
109        formatter.write_str("a GenericSender variant")
110    }
111
112    fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
113    where
114        A: serde::de::EnumAccess<'de>,
115    {
116        #[derive(Deserialize)]
117        enum GenericSenderVariantNames {
118            Ipc,
119            Crossbeam,
120        }
121
122        let (variant_name, variant_data): (GenericSenderVariantNames, _) = data.variant()?;
123
124        match variant_name {
125            GenericSenderVariantNames::Ipc => variant_data
126                .newtype_variant::<ipc_channel::ipc::IpcSender<T>>()
127                .map(|sender| GenericSenderVariants::Ipc(sender)),
128            GenericSenderVariantNames::Crossbeam => {
129                if opts::get().multiprocess {
130                    return Err(serde::de::Error::custom(
131                        "Crossbeam channel found in multiprocess mode!",
132                    ));
133                }
134                let addr = variant_data.newtype_variant::<usize>()?;
135                let ptr = addr as *mut crossbeam_channel::Sender<Result<T, ipc_channel::Error>>;
136                // SAFETY: We know we are in the same address space as the sender, so we can safely
137                // reconstruct the Box.
138                #[expect(unsafe_code)]
139                let sender = unsafe { Box::from_raw(ptr) };
140                Ok(GenericSenderVariants::Crossbeam(*sender))
141            },
142        }
143    }
144}
145
146impl<'a, T: Serialize + Deserialize<'a>> Deserialize<'a> for GenericSender<T> {
147    fn deserialize<D>(d: D) -> Result<GenericSender<T>, D::Error>
148    where
149        D: Deserializer<'a>,
150    {
151        d.deserialize_enum(
152            "GenericSender",
153            &["Ipc", "Crossbeam"],
154            GenericSenderVisitor {
155                marker: PhantomData,
156            },
157        )
158        .map(|variant| GenericSender(variant))
159    }
160}
161
162impl<T> Clone for GenericSender<T>
163where
164    T: Serialize,
165{
166    fn clone(&self) -> Self {
167        match self.0 {
168            GenericSenderVariants::Ipc(ref chan) => {
169                GenericSender(GenericSenderVariants::Ipc(chan.clone()))
170            },
171            GenericSenderVariants::Crossbeam(ref chan) => {
172                GenericSender(GenericSenderVariants::Crossbeam(chan.clone()))
173            },
174        }
175    }
176}
177
178impl<T: Serialize> fmt::Debug for GenericSender<T> {
179    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
180        write!(f, "Sender(..)")
181    }
182}
183
184impl<T: Serialize> GenericSender<T> {
185    #[inline]
186    pub fn send(&self, msg: T) -> SendResult {
187        match self.0 {
188            GenericSenderVariants::Ipc(ref sender) => sender
189                .send(msg)
190                .map_err(|e| SendError::SerializationError(format!("{e}"))),
191            GenericSenderVariants::Crossbeam(ref sender) => {
192                sender.send(Ok(msg)).map_err(|_| SendError::Disconnected)
193            },
194        }
195    }
196}
197
198impl<T: Serialize> MallocSizeOf for GenericSender<T> {
199    fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
200        match &self.0 {
201            GenericSenderVariants::Ipc(ipc_sender) => ipc_sender.size_of(ops),
202            GenericSenderVariants::Crossbeam(sender) => sender.size_of(ops),
203        }
204    }
205}
206
/// Errors that can occur when sending on a generic channel.
#[derive(Debug)]
pub enum SendError {
    /// The channel is disconnected.
    Disconnected,
    /// Sending failed; the payload is the stringified underlying error.
    SerializationError(String),
}

impl Display for SendError {
    /// Uses the plain (non-alternate) `Debug` representation as the display text.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_fmt(format_args!("{self:?}"))
    }
}

/// Result of a send operation on a generic channel.
pub type SendResult = Result<(), SendError>;
220
221#[derive(Debug)]
222pub enum ReceiveError {
223    DeserializationFailed(String),
224    /// Io Error. May occur when using IPC.
225    Io(std::io::Error),
226    /// The channel was closed.
227    Disconnected,
228}
229
230impl From<IpcError> for ReceiveError {
231    fn from(e: IpcError) -> Self {
232        match e {
233            IpcError::Disconnected => ReceiveError::Disconnected,
234            IpcError::Bincode(reason) => ReceiveError::DeserializationFailed(reason.to_string()),
235            IpcError::Io(reason) => ReceiveError::Io(reason),
236        }
237    }
238}
239
240impl From<crossbeam_channel::RecvError> for ReceiveError {
241    fn from(_: crossbeam_channel::RecvError) -> Self {
242        ReceiveError::Disconnected
243    }
244}
245
246impl fmt::Display for ReceiveError {
247    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
248        match *self {
249            ReceiveError::DeserializationFailed(ref error) => {
250                write!(fmt, "deserialization error: {error}")
251            },
252            ReceiveError::Io(ref error) => write!(fmt, "io error: {error}"),
253            ReceiveError::Disconnected => write!(fmt, "disconnected"),
254        }
255    }
256}
257impl From<std::io::Error> for ReceiveError {
258    fn from(value: std::io::Error) -> Self {
259        ReceiveError::Io(value)
260    }
261}
262
263pub enum TryReceiveError {
264    Empty,
265    ReceiveError(ReceiveError),
266}
267
268impl From<crossbeam_channel::RecvTimeoutError> for TryReceiveError {
269    fn from(value: crossbeam_channel::RecvTimeoutError) -> Self {
270        match value {
271            RecvTimeoutError::Timeout => TryReceiveError::Empty,
272            RecvTimeoutError::Disconnected => {
273                TryReceiveError::ReceiveError(ReceiveError::Disconnected)
274            },
275        }
276    }
277}
278
279impl From<ipc_channel::ipc::TryRecvError> for TryReceiveError {
280    fn from(e: ipc_channel::ipc::TryRecvError) -> Self {
281        match e {
282            ipc_channel::ipc::TryRecvError::Empty => TryReceiveError::Empty,
283            ipc_channel::ipc::TryRecvError::IpcError(inner) => {
284                TryReceiveError::ReceiveError(inner.into())
285            },
286        }
287    }
288}
289
290impl From<crossbeam_channel::TryRecvError> for TryReceiveError {
291    fn from(e: crossbeam_channel::TryRecvError) -> Self {
292        match e {
293            crossbeam_channel::TryRecvError::Empty => TryReceiveError::Empty,
294            crossbeam_channel::TryRecvError::Disconnected => {
295                TryReceiveError::ReceiveError(ReceiveError::Disconnected)
296            },
297        }
298    }
299}
300
301pub type RoutedReceiver<T> = crossbeam_channel::Receiver<Result<T, ipc_channel::Error>>;
302pub type ReceiveResult<T> = Result<T, ReceiveError>;
303pub type TryReceiveResult<T> = Result<T, TryReceiveError>;
304pub type RoutedReceiverReceiveResult<T> =
305    Result<Result<T, ipc_channel::Error>, crossbeam_channel::RecvError>;
306
307pub fn to_receive_result<T>(receive_result: RoutedReceiverReceiveResult<T>) -> ReceiveResult<T> {
308    match receive_result {
309        Ok(Ok(msg)) => Ok(msg),
310        Err(_crossbeam_recv_err) => Err(ReceiveError::Disconnected),
311        Ok(Err(ipc_err)) => Err(ReceiveError::DeserializationFailed(ipc_err.to_string())),
312    }
313}
314
315#[derive(MallocSizeOf)]
316pub struct GenericReceiver<T>(GenericReceiverVariants<T>)
317where
318    T: for<'de> Deserialize<'de> + Serialize;
319
320impl<T> std::fmt::Debug for GenericReceiver<T>
321where
322    T: for<'de> Deserialize<'de> + Serialize,
323{
324    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
325        f.debug_tuple("GenericReceiver").finish()
326    }
327}
328
329#[derive(MallocSizeOf)]
330enum GenericReceiverVariants<T>
331where
332    T: for<'de> Deserialize<'de> + Serialize,
333{
334    Ipc(ipc_channel::ipc::IpcReceiver<T>),
335    Crossbeam(RoutedReceiver<T>),
336}
337
338impl<T> GenericReceiver<T>
339where
340    T: for<'de> Deserialize<'de> + Serialize,
341{
342    #[inline]
343    pub fn recv(&self) -> ReceiveResult<T> {
344        match self.0 {
345            GenericReceiverVariants::Ipc(ref receiver) => Ok(receiver.recv()?),
346            GenericReceiverVariants::Crossbeam(ref receiver) => {
347                // `recv()` returns an error if the channel is disconnected
348                let msg = receiver.recv()?;
349                // `msg` must be `ok` because the corresponding [`GenericSender::Crossbeam`] will
350                // unconditionally send an `Ok(T)`
351                Ok(msg.expect("Infallible"))
352            },
353        }
354    }
355
356    #[inline]
357    pub fn try_recv(&self) -> TryReceiveResult<T> {
358        match self.0 {
359            GenericReceiverVariants::Ipc(ref receiver) => Ok(receiver.try_recv()?),
360            GenericReceiverVariants::Crossbeam(ref receiver) => {
361                let msg = receiver.try_recv()?;
362                Ok(msg.expect("Infallible"))
363            },
364        }
365    }
366
367    /// Blocks up to the specific duration attempting to receive a message.
368    #[inline]
369    pub fn try_recv_timeout(&self, timeout: Duration) -> Result<T, TryReceiveError> {
370        match self.0 {
371            GenericReceiverVariants::Ipc(ref ipc_receiver) => {
372                ipc_receiver.try_recv_timeout(timeout).map_err(|e| e.into())
373            },
374            GenericReceiverVariants::Crossbeam(ref receiver) => {
375                match receiver.recv_timeout(timeout) {
376                    Ok(Ok(value)) => Ok(value),
377                    Ok(Err(_)) => unreachable!("Infallable"),
378                    Err(RecvTimeoutError::Disconnected) => {
379                        Err(TryReceiveError::ReceiveError(ReceiveError::Disconnected))
380                    },
381                    Err(RecvTimeoutError::Timeout) => Err(TryReceiveError::Empty),
382                }
383            },
384        }
385    }
386
387    /// Route to a crossbeam receiver, preserving any errors.
388    ///
389    /// For `Crossbeam` receivers this is a no-op, while for `Ipc` receivers
390    /// this creates a route.
391    #[inline]
392    pub fn route_preserving_errors(self) -> RoutedReceiver<T>
393    where
394        T: Send + 'static,
395    {
396        match self.0 {
397            GenericReceiverVariants::Ipc(ipc_receiver) => {
398                let (crossbeam_sender, crossbeam_receiver) = crossbeam_channel::unbounded();
399                let crossbeam_sender_clone = crossbeam_sender.clone();
400                ROUTER.add_typed_route(
401                    ipc_receiver,
402                    Box::new(move |message| {
403                        let _ = crossbeam_sender_clone.send(message);
404                    }),
405                );
406                crossbeam_receiver
407            },
408            GenericReceiverVariants::Crossbeam(receiver) => receiver,
409        }
410    }
411}
412
413impl<T> Serialize for GenericReceiver<T>
414where
415    T: for<'de> Deserialize<'de> + Serialize,
416{
417    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
418        match &self.0 {
419            GenericReceiverVariants::Ipc(receiver) => {
420                s.serialize_newtype_variant("GenericReceiver", 0, "Ipc", receiver)
421            },
422            GenericReceiverVariants::Crossbeam(receiver) => {
423                if opts::get().multiprocess {
424                    return Err(serde::ser::Error::custom(
425                        "Crossbeam channel found in multiprocess mode!",
426                    ));
427                } // We know everything is in one address-space, so we can "serialize" the receiver by
428                // sending a leaked Box pointer.
429                let receiver_clone_addr = Box::leak(Box::new(receiver.clone())) as *mut _ as usize;
430                s.serialize_newtype_variant("GenericReceiver", 1, "Crossbeam", &receiver_clone_addr)
431            },
432        }
433    }
434}
435
436struct GenericReceiverVisitor<T> {
437    marker: PhantomData<T>,
438}
439impl<'de, T> serde::de::Visitor<'de> for GenericReceiverVisitor<T>
440where
441    T: for<'a> Deserialize<'a> + Serialize,
442{
443    type Value = GenericReceiver<T>;
444
445    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
446        formatter.write_str("a GenericReceiver variant")
447    }
448
449    fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
450    where
451        A: serde::de::EnumAccess<'de>,
452    {
453        #[derive(Deserialize)]
454        enum GenericReceiverVariantNames {
455            Ipc,
456            Crossbeam,
457        }
458
459        let (variant_name, variant_data): (GenericReceiverVariantNames, _) = data.variant()?;
460
461        match variant_name {
462            GenericReceiverVariantNames::Ipc => variant_data
463                .newtype_variant::<ipc_channel::ipc::IpcReceiver<T>>()
464                .map(|receiver| GenericReceiver(GenericReceiverVariants::Ipc(receiver))),
465            GenericReceiverVariantNames::Crossbeam => {
466                if opts::get().multiprocess {
467                    return Err(serde::de::Error::custom(
468                        "Crossbeam channel found in multiprocess mode!",
469                    ));
470                }
471                let addr = variant_data.newtype_variant::<usize>()?;
472                let ptr = addr as *mut RoutedReceiver<T>;
473                // SAFETY: We know we are in the same address space as the sender, so we can safely
474                // reconstruct the Box.
475                #[expect(unsafe_code)]
476                let receiver = unsafe { Box::from_raw(ptr) };
477                Ok(GenericReceiver(GenericReceiverVariants::Crossbeam(
478                    *receiver,
479                )))
480            },
481        }
482    }
483}
484
485impl<'a, T> Deserialize<'a> for GenericReceiver<T>
486where
487    T: for<'de> Deserialize<'de> + Serialize,
488{
489    fn deserialize<D>(d: D) -> Result<GenericReceiver<T>, D::Error>
490    where
491        D: Deserializer<'a>,
492    {
493        d.deserialize_enum(
494            "GenericReceiver",
495            &["Ipc", "Crossbeam"],
496            GenericReceiverVisitor {
497                marker: PhantomData,
498            },
499        )
500    }
501}
502
503/// Private helper function to create a crossbeam based channel.
504///
505/// Do NOT make this function public!
506fn new_generic_channel_crossbeam<T>() -> (GenericSender<T>, GenericReceiver<T>)
507where
508    T: Serialize + for<'de> serde::Deserialize<'de>,
509{
510    let (tx, rx) = crossbeam_channel::unbounded();
511    (
512        GenericSender(GenericSenderVariants::Crossbeam(tx)),
513        GenericReceiver(GenericReceiverVariants::Crossbeam(rx)),
514    )
515}
516
517fn new_generic_channel_ipc<T>() -> Result<(GenericSender<T>, GenericReceiver<T>), std::io::Error>
518where
519    T: Serialize + for<'de> serde::Deserialize<'de>,
520{
521    ipc_channel::ipc::channel().map(|(tx, rx)| {
522        (
523            GenericSender(GenericSenderVariants::Ipc(tx)),
524            GenericReceiver(GenericReceiverVariants::Ipc(rx)),
525        )
526    })
527}
528
529/// Creates a Servo channel that can select different channel implementations based on multiprocess
530/// mode or not. If the scenario doesn't require message to pass process boundary, a simple
531/// crossbeam channel is preferred.
532pub fn channel<T>() -> Option<(GenericSender<T>, GenericReceiver<T>)>
533where
534    T: for<'de> Deserialize<'de> + Serialize,
535{
536    if servo_config::opts::get().multiprocess || servo_config::opts::get().force_ipc {
537        new_generic_channel_ipc().ok()
538    } else {
539        Some(new_generic_channel_crossbeam())
540    }
541}
542
#[cfg(test)]
mod single_process_channel_tests {
    //! These unit-tests check that ipc_channel and crossbeam_channel Senders and Receivers
    //! can be sent over each other without problems in single-process mode.
    //! In multiprocess mode we exclusively use `ipc_channel` anyway, which is ensured by
    //! `channel()` being the only way to construct `GenericSender` and Receiver pairs.
    use std::time::Duration;

    use crate::generic_channel::{
        GenericReceiver, GenericSender, new_generic_channel_crossbeam, new_generic_channel_ipc,
    };

    /// Sends the reply sender over the outer channel, answers `42` through it from a
    /// scoped thread and checks that the reply arrives on the reply receiver.
    fn assert_ping_pong(
        outer: (
            GenericSender<GenericSender<i32>>,
            GenericReceiver<GenericSender<i32>>,
        ),
        reply: (GenericSender<i32>, GenericReceiver<i32>),
    ) {
        let (outer_tx, outer_rx) = outer;
        let (reply_tx, reply_rx) = reply;

        outer_tx.send(reply_tx).expect("Send failed");

        std::thread::scope(|scope| {
            scope.spawn(move || {
                let reply_sender = outer_rx.recv().expect("Receive failed");
                reply_sender.send(42).expect("Sending reply failed");
            });
        });
        let res = reply_rx.recv().expect("Receive of reply failed");
        assert_eq!(res, 42);
    }

    /// Sends `()` from another thread one second before the three second timeout expires
    /// and checks that `try_recv_timeout` succeeds.
    fn assert_received_before_timeout(pair: (GenericSender<()>, GenericReceiver<()>)) {
        let (tx, rx) = pair;
        let timeout_duration = Duration::from_secs(3);
        std::thread::spawn(move || {
            std::thread::sleep(timeout_duration - Duration::from_secs(1));
            assert!(tx.send(()).is_ok());
        });
        let received = rx.try_recv_timeout(timeout_duration);
        assert!(received.is_ok());
    }

    #[test]
    fn generic_crossbeam_can_send() {
        let (sender, receiver) = new_generic_channel_crossbeam();
        sender.send(5).expect("Send failed");
        let received = receiver.recv().expect("Receive failed");
        assert_eq!(received, 5);
    }

    #[test]
    fn generic_crossbeam_ping_pong() {
        let outer = new_generic_channel_crossbeam();
        let reply = new_generic_channel_crossbeam();
        assert_ping_pong(outer, reply);
    }

    #[test]
    fn generic_ipc_ping_pong() {
        let outer = new_generic_channel_ipc().unwrap();
        let reply = new_generic_channel_ipc().unwrap();
        assert_ping_pong(outer, reply);
    }

    #[test]
    fn send_crossbeam_sender_over_ipc_channel() {
        let outer = new_generic_channel_ipc().unwrap();
        let reply = new_generic_channel_crossbeam();
        assert_ping_pong(outer, reply);
    }

    #[test]
    fn send_generic_ipc_channel_over_crossbeam() {
        let outer = new_generic_channel_crossbeam();
        let reply = new_generic_channel_ipc().unwrap();
        assert_ping_pong(outer, reply);
    }

    #[test]
    fn send_crossbeam_receiver_over_ipc_channel() {
        let (outer_tx, outer_rx) = new_generic_channel_ipc().unwrap();
        let (inner_tx, inner_rx) = new_generic_channel_crossbeam();

        outer_tx.send(inner_rx).expect("Send failed");
        inner_tx.send(42).expect("Send failed");

        std::thread::scope(|scope| {
            scope.spawn(move || {
                let another_receiver = outer_rx.recv().expect("Receive failed");
                let res = another_receiver.recv().expect("Receive failed");
                assert_eq!(res, 42);
            });
        });
    }

    #[test]
    fn test_timeout_ipc() {
        assert_received_before_timeout(new_generic_channel_ipc().unwrap());
    }

    #[test]
    fn test_timeout_crossbeam() {
        assert_received_before_timeout(new_generic_channel_crossbeam());
    }
}
668
/// These tests live in this file because they use the private `new_generic_channel_*`
/// helper functions.
#[cfg(test)]
mod generic_receiversets_tests {
    use std::thread;
    use std::time::Duration;

    use crate::generic_channel::generic_channelset::{
        GenericSelectionResult, create_crossbeam_receiver_set, create_ipc_receiver_set,
    };
    use crate::generic_channel::{new_generic_channel_crossbeam, new_generic_channel_ipc};

    /// Channel A sends immediately and channel B after a delay: A must be selected first.
    #[test]
    fn test_ipc_side1() {
        let (tx_a, rx_a) = new_generic_channel_ipc().unwrap();
        let (tx_b, rx_b) = new_generic_channel_ipc().unwrap();

        // Only clones move into the threads; the original senders stay alive in this scope.
        let tx_a_clone = tx_a.clone();
        let tx_b_clone = tx_b.clone();
        let mut receiver_set = create_ipc_receiver_set();
        let rx_a_index = receiver_set.add(rx_a);
        let _rx_b_index = receiver_set.add(rx_b);

        thread::spawn(move || {
            tx_a_clone.send(10).unwrap();
        });
        thread::spawn(move || {
            thread::sleep(Duration::from_secs(1));
            let _ = tx_b_clone.send(20); // this might error with closed channel
        });

        let selected = receiver_set.select();
        let first_result = selected.first().unwrap();
        assert_eq!(
            *first_result,
            GenericSelectionResult::MessageReceived(rx_a_index, 10)
        );
    }

    /// Channel B sends immediately and channel A after a delay: B must be selected first.
    #[test]
    fn test_ipc_side2() {
        let (tx_a, rx_a) = new_generic_channel_ipc().unwrap();
        let (tx_b, rx_b) = new_generic_channel_ipc().unwrap();

        // Only clones move into the threads; the original senders stay alive in this scope.
        let tx_a_clone = tx_a.clone();
        let tx_b_clone = tx_b.clone();
        let mut receiver_set = create_ipc_receiver_set();
        let _rx_a_index = receiver_set.add(rx_a);
        let rx_b_index = receiver_set.add(rx_b);

        thread::spawn(move || {
            thread::sleep(Duration::from_secs(1));
            let _ = tx_a_clone.send(10);
        });
        thread::spawn(move || {
            tx_b_clone.send(20).unwrap();
        });

        let selected = receiver_set.select();
        let first_result = selected.first().unwrap();
        assert_eq!(
            *first_result,
            GenericSelectionResult::MessageReceived(rx_b_index, 20)
        );
    }

    /// Crossbeam counterpart of `test_ipc_side1`.
    #[test]
    fn test_crossbeam_side1() {
        let (tx_a, rx_a) = new_generic_channel_crossbeam();
        let (tx_b, rx_b) = new_generic_channel_crossbeam();

        // Only clones move into the threads; the original senders stay alive in this scope.
        let tx_a_clone = tx_a.clone();
        let tx_b_clone = tx_b.clone();
        let mut receiver_set = create_crossbeam_receiver_set();
        let rx_a_index = receiver_set.add(rx_a);
        let _rx_b_index = receiver_set.add(rx_b);

        thread::spawn(move || {
            tx_a_clone.send(10).unwrap();
        });
        thread::spawn(move || {
            thread::sleep(Duration::from_secs(2));
            let _ = tx_b_clone.send(20);
        });

        let selected = receiver_set.select();
        let first_result = selected.first().unwrap();
        assert_eq!(
            *first_result,
            GenericSelectionResult::MessageReceived(rx_a_index, 10)
        );
    }

    /// Crossbeam counterpart of `test_ipc_side2`.
    #[test]
    fn test_crossbeam_side2() {
        let (tx_a, rx_a) = new_generic_channel_crossbeam();
        let (tx_b, rx_b) = new_generic_channel_crossbeam();

        // Only clones move into the threads; the original senders stay alive in this scope.
        let tx_a_clone = tx_a.clone();
        let tx_b_clone = tx_b.clone();
        let mut receiver_set = create_crossbeam_receiver_set();
        let _rx_a_index = receiver_set.add(rx_a);
        let rx_b_index = receiver_set.add(rx_b);

        thread::spawn(move || {
            thread::sleep(Duration::from_secs(2));
            let _ = tx_a_clone.send(10);
        });
        thread::spawn(move || {
            tx_b_clone.send(20).unwrap();
        });

        let selected = receiver_set.select();
        let first_result = selected.first().unwrap();
        assert_eq!(
            *first_result,
            GenericSelectionResult::MessageReceived(rx_b_index, 20)
        );
    }

    #[test]
    fn test_ipc_no_crash_on_disconnect() {
        // Test that we do not crash if a channel gets disconnected.
        // Channel B gets disconnected because its only sender is moved into the thread and
        // dropped once the thread has sent its message.
        let (tx_a, rx_a) = new_generic_channel_ipc().unwrap();
        let (tx_b, rx_b) = new_generic_channel_ipc().unwrap();

        // Only a clone of sender A moves into its thread; the original stays alive here.
        let tx_a_clone = tx_a.clone();
        let mut receiver_set = create_ipc_receiver_set();
        let _rx_a_index = receiver_set.add(rx_a);
        let rx_b_index = receiver_set.add(rx_b);

        thread::spawn(move || {
            thread::sleep(Duration::from_secs(2));
            let _ = tx_a_clone.send(10);
        });
        thread::spawn(move || {
            tx_b.send(20).unwrap();
        });
        thread::sleep(Duration::from_secs(1));
        let selected = receiver_set.select();
        let first_result = selected.first().unwrap();
        assert_eq!(
            *first_result,
            GenericSelectionResult::MessageReceived(rx_b_index, 20)
        );
    }

    #[test]
    fn test_crossbeam_no_crash_on_disconnect() {
        // Channel B gets disconnected because its only sender is moved into the thread and
        // dropped once the thread has sent its message.
        let (tx_a, rx_a) = new_generic_channel_crossbeam();
        let (tx_b, rx_b) = new_generic_channel_crossbeam();

        // Only a clone of sender A moves into its thread; the original stays alive here.
        let tx_a_clone = tx_a.clone();
        let mut receiver_set = create_crossbeam_receiver_set();
        let _rx_a_index = receiver_set.add(rx_a);
        let rx_b_index = receiver_set.add(rx_b);

        thread::spawn(move || {
            thread::sleep(Duration::from_secs(2));
            let _ = tx_a_clone.send(10);
        });
        thread::spawn(move || {
            tx_b.send(20).unwrap();
        });
        thread::sleep(Duration::from_secs(1));
        let selected = receiver_set.select();
        let first_result = selected.first().unwrap();
        assert_eq!(
            *first_result,
            GenericSelectionResult::MessageReceived(rx_b_index, 20)
        );
    }

    #[test]
    fn test_ipc_disconnect_correct_message() {
        // Test that dropping the only sender of channel B without sending anything is
        // reported as `ChannelClosed` for that channel.
        let (tx_a, rx_a) = new_generic_channel_ipc().unwrap();
        let (tx_b, rx_b) = new_generic_channel_ipc().unwrap();

        // Only a clone of sender A moves into its thread; the original stays alive here.
        let tx_a_clone = tx_a.clone();
        let mut receiver_set = create_ipc_receiver_set();
        let _rx_a_index = receiver_set.add(rx_a);
        let rx_b_index = receiver_set.add(rx_b);

        thread::spawn(move || {
            thread::sleep(Duration::from_secs(2));
            let _ = tx_a_clone.send(10);
        });
        thread::spawn(move || {
            drop(tx_b);
        });

        let selected = receiver_set.select();
        let first_result = selected.first().unwrap();
        assert_eq!(
            *first_result,
            GenericSelectionResult::ChannelClosed(rx_b_index)
        );
    }

    #[test]
    fn test_crossbeam_disconnect_correct_messaget() {
        // Crossbeam counterpart of `test_ipc_disconnect_correct_message`.
        let (tx_a, rx_a) = new_generic_channel_crossbeam();
        let (tx_b, rx_b) = new_generic_channel_crossbeam();

        // Only a clone of sender A moves into its thread; the original stays alive here.
        let tx_a_clone = tx_a.clone();
        let mut receiver_set = create_crossbeam_receiver_set();
        let _rx_a_index = receiver_set.add(rx_a);
        let rx_b_index = receiver_set.add(rx_b);

        thread::spawn(move || {
            thread::sleep(Duration::from_secs(2));
            let _ = tx_a_clone.send(10);
        });
        thread::spawn(move || {
            drop(tx_b);
        });

        let selected = receiver_set.select();
        let first_result = selected.first().unwrap();
        assert_eq!(
            *first_result,
            GenericSelectionResult::ChannelClosed(rx_b_index)
        );
    }
}