base/
generic_channel.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5//! Enum wrappers to be able to select different channel implementations at runtime.
6
7use std::fmt;
8use std::fmt::Display;
9use std::marker::PhantomData;
10use std::sync::OnceLock;
11use std::time::Duration;
12
13use crossbeam_channel::RecvTimeoutError;
14use ipc_channel::IpcError;
15use ipc_channel::router::ROUTER;
16use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
17use malloc_size_of_derive::MallocSizeOf;
18use serde::de::VariantAccess;
19use serde::{Deserialize, Deserializer, Serialize, Serializer};
20use servo_config::opts;
21
22mod callback;
23pub use callback::GenericCallback;
24mod lazy_callback;
25pub use lazy_callback::{CallbackSetter, LazyCallback, lazy_callback};
26mod oneshot;
27mod shared_memory;
28pub use oneshot::{GenericOneshotReceiver, GenericOneshotSender, oneshot};
29pub use shared_memory::GenericSharedMemory;
30mod generic_channelset;
31pub use generic_channelset::{GenericReceiverSet, GenericSelectionResult};
32
33/// Cache for being in Ipc Mode
34static USE_IPC: OnceLock<bool> = OnceLock::new();
35
36/// Return if we should be in IPC Mode
37fn use_ipc() -> bool {
38    *USE_IPC.get_or_init(|| {
39        servo_config::opts::get().multiprocess || servo_config::opts::get().force_ipc
40    })
41}
42
43/// Abstraction of the ability to send a particular type of message cross-process.
44/// This can be used to ease the use of GenericSender sub-fields.
45pub trait GenericSend<T>
46where
47    T: serde::Serialize + for<'de> serde::Deserialize<'de>,
48{
49    /// send message T
50    fn send(&self, _: T) -> SendResult;
51    /// get underlying sender
52    fn sender(&self) -> GenericSender<T>;
53}
54
55/// A GenericSender that sends messages to a [GenericReceiver].
56///
57/// The sender supports sending messages cross-process, if servo is run in multiprocess mode.
58pub struct GenericSender<T: Serialize>(GenericSenderVariants<T>);
59
60/// The actual GenericSender variant.
61///
62/// This enum is private, so that outside code can't construct a GenericSender itself.
63/// This ensures that users can't construct a crossbeam variant in multiprocess mode.
64enum GenericSenderVariants<T: Serialize> {
65    Ipc(ipc_channel::ipc::IpcSender<T>),
66    /// A crossbeam-channel. To keep the API in sync with the Ipc variant when using a Router,
67    /// which propagates the IPC error, the inner type is a Result.
68    /// In the IPC case, the Router deserializes the message, which can fail, and sends
69    /// the result to a crossbeam receiver.
70    /// The crossbeam channel does not involve serializing, so we can't have this error,
71    /// but replicating the API allows us to have one channel type as the receiver
72    /// after routing the receiver .
73    Crossbeam(crossbeam_channel::Sender<Result<T, ipc_channel::IpcError>>),
74}
75
76fn serialize_generic_sender_variants<T: Serialize, S: Serializer>(
77    value: &GenericSenderVariants<T>,
78    s: S,
79) -> Result<S::Ok, S::Error> {
80    match value {
81        GenericSenderVariants::Ipc(sender) => {
82            s.serialize_newtype_variant("GenericSender", 0, "Ipc", sender)
83        },
84        // All GenericSenders will be IPC channels in multi-process mode, so sending a
85        // GenericChannel over existing IPC channels is no problem and won't fail.
86        // In single-process mode, we can also send GenericSenders over other GenericSenders
87        // just fine, since no serialization is required.
88        // The only reason we need / want serialization is to support sending GenericSenders
89        // over existing IPC channels **in single process mode**. This allows us to
90        // incrementally port channels to the GenericChannel, without needing to follow a
91        // top-to-bottom approach.
92        // Long-term we can remove this branch in the code again and replace it with
93        // unreachable, since likely all IPC channels would be GenericChannels.
94        GenericSenderVariants::Crossbeam(sender) => {
95            if opts::get().multiprocess {
96                return Err(serde::ser::Error::custom(
97                    "Crossbeam channel found in multiprocess mode!",
98                ));
99            } // We know everything is in one address-space, so we can "serialize" the sender by
100            // sending a leaked Box pointer.
101            let sender_clone_addr = Box::leak(Box::new(sender.clone())) as *mut _ as usize;
102            s.serialize_newtype_variant("GenericSender", 1, "Crossbeam", &sender_clone_addr)
103        },
104    }
105}
106
107impl<T: Serialize> Serialize for GenericSender<T> {
108    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
109        serialize_generic_sender_variants(&self.0, s)
110    }
111}
112
113struct GenericSenderVisitor<T> {
114    marker: PhantomData<T>,
115}
116
117impl<'de, T: Serialize + Deserialize<'de>> serde::de::Visitor<'de> for GenericSenderVisitor<T> {
118    type Value = GenericSenderVariants<T>;
119
120    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
121        formatter.write_str("a GenericSender variant")
122    }
123
124    fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
125    where
126        A: serde::de::EnumAccess<'de>,
127    {
128        #[derive(Deserialize)]
129        enum GenericSenderVariantNames {
130            Ipc,
131            Crossbeam,
132        }
133
134        let (variant_name, variant_data): (GenericSenderVariantNames, _) = data.variant()?;
135
136        match variant_name {
137            GenericSenderVariantNames::Ipc => variant_data
138                .newtype_variant::<ipc_channel::ipc::IpcSender<T>>()
139                .map(|sender| GenericSenderVariants::Ipc(sender)),
140            GenericSenderVariantNames::Crossbeam => {
141                if opts::get().multiprocess {
142                    return Err(serde::de::Error::custom(
143                        "Crossbeam channel found in multiprocess mode!",
144                    ));
145                }
146                let addr = variant_data.newtype_variant::<usize>()?;
147                let ptr = addr as *mut crossbeam_channel::Sender<Result<T, ipc_channel::IpcError>>;
148                // SAFETY: We know we are in the same address space as the sender, so we can safely
149                // reconstruct the Box.
150                #[expect(unsafe_code)]
151                let sender = unsafe { Box::from_raw(ptr) };
152                Ok(GenericSenderVariants::Crossbeam(*sender))
153            },
154        }
155    }
156}
157
158impl<'a, T: Serialize + Deserialize<'a>> Deserialize<'a> for GenericSender<T> {
159    fn deserialize<D>(d: D) -> Result<GenericSender<T>, D::Error>
160    where
161        D: Deserializer<'a>,
162    {
163        d.deserialize_enum(
164            "GenericSender",
165            &["Ipc", "Crossbeam"],
166            GenericSenderVisitor {
167                marker: PhantomData,
168            },
169        )
170        .map(|variant| GenericSender(variant))
171    }
172}
173
174impl<T> Clone for GenericSender<T>
175where
176    T: Serialize,
177{
178    fn clone(&self) -> Self {
179        match self.0 {
180            GenericSenderVariants::Ipc(ref chan) => {
181                GenericSender(GenericSenderVariants::Ipc(chan.clone()))
182            },
183            GenericSenderVariants::Crossbeam(ref chan) => {
184                GenericSender(GenericSenderVariants::Crossbeam(chan.clone()))
185            },
186        }
187    }
188}
189
190impl<T: Serialize> fmt::Debug for GenericSender<T> {
191    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
192        write!(f, "Sender(..)")
193    }
194}
195
196impl<T: Serialize> GenericSender<T> {
197    #[inline]
198    pub fn send(&self, msg: T) -> SendResult {
199        match self.0 {
200            GenericSenderVariants::Ipc(ref sender) => sender
201                .send(msg)
202                .map_err(|e| SendError::SerializationError(format!("{e}"))),
203            GenericSenderVariants::Crossbeam(ref sender) => {
204                sender.send(Ok(msg)).map_err(|_| SendError::Disconnected)
205            },
206        }
207    }
208}
209
210impl<T: Serialize> MallocSizeOf for GenericSender<T> {
211    fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
212        match &self.0 {
213            GenericSenderVariants::Ipc(ipc_sender) => ipc_sender.size_of(ops),
214            GenericSenderVariants::Crossbeam(sender) => sender.size_of(ops),
215        }
216    }
217}
218
219#[derive(Debug)]
220pub enum SendError {
221    Disconnected,
222    SerializationError(String),
223}
224
225impl From<IpcError> for SendError {
226    fn from(value: IpcError) -> Self {
227        match value {
228            IpcError::SerializationError(ser_de_error) => {
229                SendError::SerializationError(ser_de_error.to_string())
230            },
231            IpcError::Io(error) => {
232                log::error!("IO Error in ipc {:?}", error);
233                SendError::Disconnected
234            },
235            IpcError::Disconnected => SendError::Disconnected,
236        }
237    }
238}
239
240impl Display for SendError {
241    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
242        write!(f, "{self:?}")
243    }
244}
245
246pub type SendResult = Result<(), SendError>;
247
248#[derive(Debug)]
249pub enum ReceiveError {
250    DeserializationFailed(String),
251    /// Io Error. May occur when using IPC.
252    Io(std::io::Error),
253    /// The channel was closed.
254    Disconnected,
255}
256
257impl From<IpcError> for ReceiveError {
258    fn from(e: IpcError) -> Self {
259        match e {
260            IpcError::Disconnected => ReceiveError::Disconnected,
261            IpcError::Io(reason) => ReceiveError::Io(reason),
262            IpcError::SerializationError(ser_de_error) => {
263                ReceiveError::DeserializationFailed(ser_de_error.to_string())
264            },
265        }
266    }
267}
268
269impl From<crossbeam_channel::RecvError> for ReceiveError {
270    fn from(_: crossbeam_channel::RecvError) -> Self {
271        ReceiveError::Disconnected
272    }
273}
274
275impl fmt::Display for ReceiveError {
276    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
277        match *self {
278            ReceiveError::DeserializationFailed(ref error) => {
279                write!(fmt, "deserialization error: {error}")
280            },
281            ReceiveError::Io(ref error) => write!(fmt, "io error: {error}"),
282            ReceiveError::Disconnected => write!(fmt, "disconnected"),
283        }
284    }
285}
286impl From<std::io::Error> for ReceiveError {
287    fn from(value: std::io::Error) -> Self {
288        ReceiveError::Io(value)
289    }
290}
291
292pub enum TryReceiveError {
293    Empty,
294    ReceiveError(ReceiveError),
295}
296
297impl From<crossbeam_channel::RecvTimeoutError> for TryReceiveError {
298    fn from(value: crossbeam_channel::RecvTimeoutError) -> Self {
299        match value {
300            RecvTimeoutError::Timeout => TryReceiveError::Empty,
301            RecvTimeoutError::Disconnected => {
302                TryReceiveError::ReceiveError(ReceiveError::Disconnected)
303            },
304        }
305    }
306}
307
308impl From<ipc_channel::TryRecvError> for TryReceiveError {
309    fn from(e: ipc_channel::TryRecvError) -> Self {
310        match e {
311            ipc_channel::TryRecvError::Empty => TryReceiveError::Empty,
312            ipc_channel::TryRecvError::IpcError(inner) => {
313                TryReceiveError::ReceiveError(inner.into())
314            },
315        }
316    }
317}
318
319impl From<crossbeam_channel::TryRecvError> for TryReceiveError {
320    fn from(e: crossbeam_channel::TryRecvError) -> Self {
321        match e {
322            crossbeam_channel::TryRecvError::Empty => TryReceiveError::Empty,
323            crossbeam_channel::TryRecvError::Disconnected => {
324                TryReceiveError::ReceiveError(ReceiveError::Disconnected)
325            },
326        }
327    }
328}
329
330pub type RoutedReceiver<T> = crossbeam_channel::Receiver<Result<T, ipc_channel::IpcError>>;
331pub type ReceiveResult<T> = Result<T, ReceiveError>;
332pub type TryReceiveResult<T> = Result<T, TryReceiveError>;
333pub type RoutedReceiverReceiveResult<T> =
334    Result<Result<T, ipc_channel::IpcError>, crossbeam_channel::RecvError>;
335
336pub fn to_receive_result<T>(receive_result: RoutedReceiverReceiveResult<T>) -> ReceiveResult<T> {
337    match receive_result {
338        Ok(Ok(msg)) => Ok(msg),
339        Err(_crossbeam_recv_err) => Err(ReceiveError::Disconnected),
340        Ok(Err(ipc_err)) => Err(ReceiveError::DeserializationFailed(ipc_err.to_string())),
341    }
342}
343
344#[derive(MallocSizeOf)]
345pub struct GenericReceiver<T>(GenericReceiverVariants<T>)
346where
347    T: for<'de> Deserialize<'de> + Serialize;
348
349impl<T> std::fmt::Debug for GenericReceiver<T>
350where
351    T: for<'de> Deserialize<'de> + Serialize,
352{
353    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
354        f.debug_tuple("GenericReceiver").finish()
355    }
356}
357
358#[derive(MallocSizeOf)]
359enum GenericReceiverVariants<T>
360where
361    T: for<'de> Deserialize<'de> + Serialize,
362{
363    Ipc(ipc_channel::ipc::IpcReceiver<T>),
364    Crossbeam(RoutedReceiver<T>),
365}
366
367impl<T> GenericReceiver<T>
368where
369    T: for<'de> Deserialize<'de> + Serialize,
370{
371    #[inline]
372    pub fn recv(&self) -> ReceiveResult<T> {
373        match self.0 {
374            GenericReceiverVariants::Ipc(ref receiver) => Ok(receiver.recv()?),
375            GenericReceiverVariants::Crossbeam(ref receiver) => {
376                // `recv()` returns an error if the channel is disconnected
377                let msg = receiver.recv()?;
378                // `msg` must be `ok` because the corresponding [`GenericSender::Crossbeam`] will
379                // unconditionally send an `Ok(T)`
380                Ok(msg.expect("Infallible"))
381            },
382        }
383    }
384
385    #[inline]
386    pub fn try_recv(&self) -> TryReceiveResult<T> {
387        match self.0 {
388            GenericReceiverVariants::Ipc(ref receiver) => Ok(receiver.try_recv()?),
389            GenericReceiverVariants::Crossbeam(ref receiver) => {
390                let msg = receiver.try_recv()?;
391                Ok(msg.expect("Infallible"))
392            },
393        }
394    }
395
396    /// Blocks up to the specific duration attempting to receive a message.
397    #[inline]
398    pub fn try_recv_timeout(&self, timeout: Duration) -> Result<T, TryReceiveError> {
399        match self.0 {
400            GenericReceiverVariants::Ipc(ref ipc_receiver) => {
401                ipc_receiver.try_recv_timeout(timeout).map_err(|e| e.into())
402            },
403            GenericReceiverVariants::Crossbeam(ref receiver) => {
404                match receiver.recv_timeout(timeout) {
405                    Ok(Ok(value)) => Ok(value),
406                    Ok(Err(_)) => unreachable!("Infallable"),
407                    Err(RecvTimeoutError::Disconnected) => {
408                        Err(TryReceiveError::ReceiveError(ReceiveError::Disconnected))
409                    },
410                    Err(RecvTimeoutError::Timeout) => Err(TryReceiveError::Empty),
411                }
412            },
413        }
414    }
415
416    /// Route to a crossbeam receiver, preserving any errors.
417    ///
418    /// For `Crossbeam` receivers this is a no-op, while for `Ipc` receivers
419    /// this creates a route.
420    #[inline]
421    pub fn route_preserving_errors(self) -> RoutedReceiver<T>
422    where
423        T: Send + 'static,
424    {
425        match self.0 {
426            GenericReceiverVariants::Ipc(ipc_receiver) => {
427                let (crossbeam_sender, crossbeam_receiver) = crossbeam_channel::unbounded();
428                let crossbeam_sender_clone = crossbeam_sender;
429                ROUTER.add_typed_route(
430                    ipc_receiver,
431                    Box::new(move |message| {
432                        let _ = crossbeam_sender_clone
433                            .send(message.map_err(IpcError::SerializationError));
434                    }),
435                );
436                crossbeam_receiver
437            },
438            GenericReceiverVariants::Crossbeam(receiver) => receiver,
439        }
440    }
441}
442
443impl<T> Serialize for GenericReceiver<T>
444where
445    T: for<'de> Deserialize<'de> + Serialize,
446{
447    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
448        match &self.0 {
449            GenericReceiverVariants::Ipc(receiver) => {
450                s.serialize_newtype_variant("GenericReceiver", 0, "Ipc", receiver)
451            },
452            GenericReceiverVariants::Crossbeam(receiver) => {
453                if opts::get().multiprocess {
454                    return Err(serde::ser::Error::custom(
455                        "Crossbeam channel found in multiprocess mode!",
456                    ));
457                } // We know everything is in one address-space, so we can "serialize" the receiver by
458                // sending a leaked Box pointer.
459                let receiver_clone_addr = Box::leak(Box::new(receiver.clone())) as *mut _ as usize;
460                s.serialize_newtype_variant("GenericReceiver", 1, "Crossbeam", &receiver_clone_addr)
461            },
462        }
463    }
464}
465
466struct GenericReceiverVisitor<T> {
467    marker: PhantomData<T>,
468}
469impl<'de, T> serde::de::Visitor<'de> for GenericReceiverVisitor<T>
470where
471    T: for<'a> Deserialize<'a> + Serialize,
472{
473    type Value = GenericReceiver<T>;
474
475    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
476        formatter.write_str("a GenericReceiver variant")
477    }
478
479    fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
480    where
481        A: serde::de::EnumAccess<'de>,
482    {
483        #[derive(Deserialize)]
484        enum GenericReceiverVariantNames {
485            Ipc,
486            Crossbeam,
487        }
488
489        let (variant_name, variant_data): (GenericReceiverVariantNames, _) = data.variant()?;
490
491        match variant_name {
492            GenericReceiverVariantNames::Ipc => variant_data
493                .newtype_variant::<ipc_channel::ipc::IpcReceiver<T>>()
494                .map(|receiver| GenericReceiver(GenericReceiverVariants::Ipc(receiver))),
495            GenericReceiverVariantNames::Crossbeam => {
496                if use_ipc() {
497                    return Err(serde::de::Error::custom(
498                        "Crossbeam channel found in multiprocess mode!",
499                    ));
500                }
501                let addr = variant_data.newtype_variant::<usize>()?;
502                let ptr = addr as *mut RoutedReceiver<T>;
503                // SAFETY: We know we are in the same address space as the sender, so we can safely
504                // reconstruct the Box.
505                #[expect(unsafe_code)]
506                let receiver = unsafe { Box::from_raw(ptr) };
507                Ok(GenericReceiver(GenericReceiverVariants::Crossbeam(
508                    *receiver,
509                )))
510            },
511        }
512    }
513}
514
515impl<'a, T> Deserialize<'a> for GenericReceiver<T>
516where
517    T: for<'de> Deserialize<'de> + Serialize,
518{
519    fn deserialize<D>(d: D) -> Result<GenericReceiver<T>, D::Error>
520    where
521        D: Deserializer<'a>,
522    {
523        d.deserialize_enum(
524            "GenericReceiver",
525            &["Ipc", "Crossbeam"],
526            GenericReceiverVisitor {
527                marker: PhantomData,
528            },
529        )
530    }
531}
532
533/// Private helper function to create a crossbeam based channel.
534///
535/// Do NOT make this function public!
536fn new_generic_channel_crossbeam<T>() -> (GenericSender<T>, GenericReceiver<T>)
537where
538    T: Serialize + for<'de> serde::Deserialize<'de>,
539{
540    let (tx, rx) = crossbeam_channel::unbounded();
541    (
542        GenericSender(GenericSenderVariants::Crossbeam(tx)),
543        GenericReceiver(GenericReceiverVariants::Crossbeam(rx)),
544    )
545}
546
547fn new_generic_channel_ipc<T>() -> Result<(GenericSender<T>, GenericReceiver<T>), std::io::Error>
548where
549    T: Serialize + for<'de> serde::Deserialize<'de>,
550{
551    ipc_channel::ipc::channel().map(|(tx, rx)| {
552        (
553            GenericSender(GenericSenderVariants::Ipc(tx)),
554            GenericReceiver(GenericReceiverVariants::Ipc(rx)),
555        )
556    })
557}
558
559/// Creates a Servo channel that can select different channel implementations based on multiprocess
560/// mode or not. If the scenario doesn't require message to pass process boundary, a simple
561/// crossbeam channel is preferred.
562pub fn channel<T>() -> Option<(GenericSender<T>, GenericReceiver<T>)>
563where
564    T: for<'de> Deserialize<'de> + Serialize,
565{
566    if use_ipc() {
567        new_generic_channel_ipc().ok()
568    } else {
569        Some(new_generic_channel_crossbeam())
570    }
571}
572
573#[cfg(test)]
574mod single_process_channel_tests {
575    //! These unit-tests test that ipc_channel and crossbeam_channel Senders and Receivers
576    //! can be sent over each other without problems in single-process mode.
577    //! In multiprocess mode we exclusively use `ipc_channel` anyway, which is ensured due
578    //! to `channel()` being the only way to construct `GenericSender` and Receiver pairs.
579    use crate::generic_channel::{new_generic_channel_crossbeam, new_generic_channel_ipc};
580
581    #[test]
582    fn generic_crossbeam_can_send() {
583        let (tx, rx) = new_generic_channel_crossbeam();
584        tx.send(5).expect("Send failed");
585        let val = rx.recv().expect("Receive failed");
586        assert_eq!(val, 5);
587    }
588
589    #[test]
590    fn generic_crossbeam_ping_pong() {
591        let (tx, rx) = new_generic_channel_crossbeam();
592        let (tx2, rx2) = new_generic_channel_crossbeam();
593
594        tx.send(tx2).expect("Send failed");
595
596        std::thread::scope(|s| {
597            s.spawn(move || {
598                let reply_sender = rx.recv().expect("Receive failed");
599                reply_sender.send(42).expect("Sending reply failed");
600            });
601        });
602        let res = rx2.recv().expect("Receive of reply failed");
603        assert_eq!(res, 42);
604    }
605
606    #[test]
607    fn generic_ipc_ping_pong() {
608        let (tx, rx) = new_generic_channel_ipc().unwrap();
609        let (tx2, rx2) = new_generic_channel_ipc().unwrap();
610
611        tx.send(tx2).expect("Send failed");
612
613        std::thread::scope(|s| {
614            s.spawn(move || {
615                let reply_sender = rx.recv().expect("Receive failed");
616                reply_sender.send(42).expect("Sending reply failed");
617            });
618        });
619        let res = rx2.recv().expect("Receive of reply failed");
620        assert_eq!(res, 42);
621    }
622
623    #[test]
624    fn send_crossbeam_sender_over_ipc_channel() {
625        let (tx, rx) = new_generic_channel_ipc().unwrap();
626        let (tx2, rx2) = new_generic_channel_crossbeam();
627
628        tx.send(tx2).expect("Send failed");
629
630        std::thread::scope(|s| {
631            s.spawn(move || {
632                let reply_sender = rx.recv().expect("Receive failed");
633                reply_sender.send(42).expect("Sending reply failed");
634            });
635        });
636        let res = rx2.recv().expect("Receive of reply failed");
637        assert_eq!(res, 42);
638    }
639
640    #[test]
641    fn send_generic_ipc_channel_over_crossbeam() {
642        let (tx, rx) = new_generic_channel_crossbeam();
643        let (tx2, rx2) = new_generic_channel_ipc().unwrap();
644
645        tx.send(tx2).expect("Send failed");
646
647        std::thread::scope(|s| {
648            s.spawn(move || {
649                let reply_sender = rx.recv().expect("Receive failed");
650                reply_sender.send(42).expect("Sending reply failed");
651            });
652        });
653        let res = rx2.recv().expect("Receive of reply failed");
654        assert_eq!(res, 42);
655    }
656
657    #[test]
658    fn send_crossbeam_receiver_over_ipc_channel() {
659        let (tx, rx) = new_generic_channel_ipc().unwrap();
660        let (tx2, rx2) = new_generic_channel_crossbeam();
661
662        tx.send(rx2).expect("Send failed");
663        tx2.send(42).expect("Send failed");
664
665        std::thread::scope(|s| {
666            s.spawn(move || {
667                let another_receiver = rx.recv().expect("Receive failed");
668                let res = another_receiver.recv().expect("Receive failed");
669                assert_eq!(res, 42);
670            });
671        });
672    }
673
674    #[test]
675    fn test_timeout_ipc() {
676        let (tx, rx) = new_generic_channel_ipc().unwrap();
677        let timeout_duration = std::time::Duration::from_secs(3);
678        std::thread::spawn(move || {
679            std::thread::sleep(timeout_duration - std::time::Duration::from_secs(1));
680            assert!(tx.send(()).is_ok());
681        });
682        let received = rx.try_recv_timeout(timeout_duration);
683        assert!(received.is_ok());
684    }
685
686    #[test]
687    fn test_timeout_crossbeam() {
688        let (tx, rx) = new_generic_channel_crossbeam();
689        let timeout_duration = std::time::Duration::from_secs(3);
690        std::thread::spawn(move || {
691            std::thread::sleep(timeout_duration - std::time::Duration::from_secs(1));
692            assert!(tx.send(()).is_ok());
693        });
694        let received = rx.try_recv_timeout(timeout_duration);
695        assert!(received.is_ok());
696    }
697}
698
699/// This tests need to be in here because they use the 'new_generic_channel_..' methods
700#[cfg(test)]
701mod generic_receiversets_tests {
702    use std::time::Duration;
703
704    use crate::generic_channel::generic_channelset::{
705        GenericSelectionResult, create_crossbeam_receiver_set, create_ipc_receiver_set,
706    };
707    use crate::generic_channel::{new_generic_channel_crossbeam, new_generic_channel_ipc};
708
709    #[test]
710    fn test_ipc_side1() {
711        let (snd1, recv1) = new_generic_channel_ipc().unwrap();
712        let (snd2, recv2) = new_generic_channel_ipc().unwrap();
713
714        // We keep the senders alive till all threads are done
715        let snd1_c = snd1.clone();
716        let snd2_c = snd2.clone();
717        let mut set = create_ipc_receiver_set();
718        let recv1_select_index = set.add(recv1);
719        let _recv2_select_index = set.add(recv2);
720
721        std::thread::spawn(move || {
722            snd1_c.send(10).unwrap();
723        });
724        std::thread::spawn(move || {
725            std::thread::sleep(Duration::from_secs(1));
726            let _ = snd2_c.send(20); // this might error with closed channel
727        });
728
729        let select_result = set.select();
730        let channel_result = select_result.first().unwrap();
731        assert_eq!(
732            *channel_result,
733            GenericSelectionResult::MessageReceived(recv1_select_index, 10)
734        );
735    }
736
737    #[test]
738    fn test_ipc_side2() {
739        let (snd1, recv1) = new_generic_channel_ipc().unwrap();
740        let (snd2, recv2) = new_generic_channel_ipc().unwrap();
741
742        // We keep the senders alive till all threads are done
743        let snd1_c = snd1.clone();
744        let snd2_c = snd2.clone();
745        let mut set = create_ipc_receiver_set();
746        let _recv1_select_index = set.add(recv1);
747        let recv2_select_index = set.add(recv2);
748
749        std::thread::spawn(move || {
750            std::thread::sleep(Duration::from_secs(1));
751            let _ = snd1_c.send(10);
752        });
753        std::thread::spawn(move || {
754            snd2_c.send(20).unwrap();
755        });
756
757        let select_result = set.select();
758        let channel_result = select_result.first().unwrap();
759        assert_eq!(
760            *channel_result,
761            GenericSelectionResult::MessageReceived(recv2_select_index, 20)
762        );
763    }
764
765    #[test]
766    fn test_crossbeam_side1() {
767        let (snd1, recv1) = new_generic_channel_crossbeam();
768        let (snd2, recv2) = new_generic_channel_crossbeam();
769
770        // We keep the senders alive till all threads are done
771        let snd1_c = snd1.clone();
772        let snd2_c = snd2.clone();
773        let mut set = create_crossbeam_receiver_set();
774        let recv1_select_index = set.add(recv1);
775        let _recv2_select_index = set.add(recv2);
776
777        std::thread::spawn(move || {
778            snd1_c.send(10).unwrap();
779        });
780        std::thread::spawn(move || {
781            std::thread::sleep(Duration::from_secs(2));
782            let _ = snd2_c.send(20);
783        });
784
785        let select_result = set.select();
786        let channel_result = select_result.first().unwrap();
787        assert_eq!(
788            *channel_result,
789            GenericSelectionResult::MessageReceived(recv1_select_index, 10)
790        );
791    }
792
793    #[test]
794    fn test_crossbeam_side2() {
795        let (snd1, recv1) = new_generic_channel_crossbeam();
796        let (snd2, recv2) = new_generic_channel_crossbeam();
797
798        // We keep the senders alive till all threads are done
799        let snd1_c = snd1.clone();
800        let snd2_c = snd2.clone();
801        let mut set = create_crossbeam_receiver_set();
802        let _recv1_select_index = set.add(recv1);
803        let recv2_select_index = set.add(recv2);
804
805        std::thread::spawn(move || {
806            std::thread::sleep(Duration::from_secs(2));
807            let _ = snd1_c.send(10);
808        });
809        std::thread::spawn(move || {
810            snd2_c.send(20).unwrap();
811        });
812
813        let select_result = set.select();
814        let channel_result = select_result.first().unwrap();
815        assert_eq!(
816            *channel_result,
817            GenericSelectionResult::MessageReceived(recv2_select_index, 20)
818        );
819    }
820
821    #[test]
822    fn test_ipc_no_crash_on_disconnect() {
823        // Test that we do not crash if a channel gets disconnected.
824        // Channel 2 gets disconnected because snd2 gets moved into the thread and then falls out of scope
825        let (snd1, recv1) = new_generic_channel_ipc().unwrap();
826        let (snd2, recv2) = new_generic_channel_ipc().unwrap();
827
828        // We keep the senders alive till all threads are done
829        let snd1_c = snd1.clone();
830        let mut set = create_ipc_receiver_set();
831        let _recv1_select_index = set.add(recv1);
832        let recv2_select_index = set.add(recv2);
833
834        std::thread::spawn(move || {
835            std::thread::sleep(Duration::from_secs(2));
836            let _ = snd1_c.send(10);
837        });
838        std::thread::spawn(move || {
839            snd2.send(20).unwrap();
840        });
841        std::thread::sleep(Duration::from_secs(1));
842        let select_result = set.select();
843        let channel_result = select_result.first().unwrap();
844        assert_eq!(
845            *channel_result,
846            GenericSelectionResult::MessageReceived(recv2_select_index, 20)
847        );
848    }
849
850    #[test]
851    fn test_crossbeam_no_crash_on_disconnect() {
852        // Channel 2 gets disconnected because snd2 gets moved into the thread and then falls out of scope
853        let (snd1, recv1) = new_generic_channel_crossbeam();
854        let (snd2, recv2) = new_generic_channel_crossbeam();
855
856        // We keep the senders alive till all threads are done
857        let snd1_c = snd1.clone();
858        let mut set = create_crossbeam_receiver_set();
859        let _recv1_select_index = set.add(recv1);
860        let recv2_select_index = set.add(recv2);
861
862        std::thread::spawn(move || {
863            std::thread::sleep(Duration::from_secs(2));
864            let _ = snd1_c.send(10);
865        });
866        std::thread::spawn(move || {
867            snd2.send(20).unwrap();
868        });
869        std::thread::sleep(Duration::from_secs(1));
870        let select_result = set.select();
871        let channel_result = select_result.first().unwrap();
872        assert_eq!(
873            *channel_result,
874            GenericSelectionResult::MessageReceived(recv2_select_index, 20)
875        );
876    }
877
878    #[test]
879    fn test_ipc_disconnect_correct_message() {
880        // Test that we do not crash if a channel gets disconnected.
881        let (snd1, recv1) = new_generic_channel_ipc().unwrap();
882        let (snd2, recv2) = new_generic_channel_ipc().unwrap();
883
884        // We keep the senders alive till all threads are done
885        let snd1_c = snd1.clone();
886        let mut set = create_ipc_receiver_set();
887        let _recv1_select_index = set.add(recv1);
888        let recv2_select_index = set.add(recv2);
889
890        std::thread::spawn(move || {
891            std::thread::sleep(Duration::from_secs(2));
892            let _ = snd1_c.send(10);
893        });
894        std::thread::spawn(move || {
895            drop(snd2);
896        });
897
898        let select_result = set.select();
899        let channel_result = select_result.first().unwrap();
900        assert_eq!(
901            *channel_result,
902            GenericSelectionResult::ChannelClosed(recv2_select_index)
903        );
904    }
905
906    #[test]
907    fn test_crossbeam_disconnect_correct_messaget() {
908        let (snd1, recv1) = new_generic_channel_crossbeam();
909        let (snd2, recv2) = new_generic_channel_crossbeam();
910
911        // We keep the senders alive till all threads are done
912        let snd1_c = snd1.clone();
913        let mut set = create_crossbeam_receiver_set();
914        let _recv1_select_index = set.add(recv1);
915        let recv2_select_index = set.add(recv2);
916
917        std::thread::spawn(move || {
918            std::thread::sleep(Duration::from_secs(2));
919            let _ = snd1_c.send(10);
920        });
921        std::thread::spawn(move || {
922            drop(snd2);
923        });
924
925        let select_result = set.select();
926        let channel_result = select_result.first().unwrap();
927        assert_eq!(
928            *channel_result,
929            GenericSelectionResult::ChannelClosed(recv2_select_index)
930        );
931    }
932}