Skip to main content

servo_base/generic_channel/
mod.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5//! Enum wrappers to be able to select different channel implementations at runtime.
6
7use std::fmt;
8use std::fmt::Display;
9use std::marker::PhantomData;
10use std::panic::Location;
11use std::sync::OnceLock;
12use std::time::Duration;
13
14use crossbeam_channel::RecvTimeoutError;
15use ipc_channel::IpcError;
16use ipc_channel::router::ROUTER;
17use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
18use malloc_size_of_derive::MallocSizeOf;
19use serde::de::VariantAccess;
20use serde::{Deserialize, Deserializer, Serialize, Serializer};
21use servo_config::opts;
22
23mod callback;
24pub use callback::GenericCallback;
25mod lazy_callback;
26pub use lazy_callback::{CallbackSetter, LazyCallback, lazy_callback};
27mod oneshot;
28mod shared_memory;
29pub use oneshot::{GenericOneshotReceiver, GenericOneshotSender, oneshot};
30pub use shared_memory::GenericSharedMemory;
31mod generic_channelset;
32pub use generic_channelset::{GenericReceiverSet, GenericSelectionResult};
33mod buffered;
34pub use buffered::GenericBufferedSender;
35
36/// Cache for being in Ipc Mode
37static USE_IPC: OnceLock<bool> = OnceLock::new();
38
39/// Return if we should be in IPC Mode
40fn use_ipc() -> bool {
41    *USE_IPC.get_or_init(|| {
42        servo_config::opts::get().multiprocess || servo_config::opts::get().force_ipc
43    })
44}
45
46/// Abstraction of the ability to send a particular type of message cross-process.
47/// This can be used to ease the use of GenericSender sub-fields.
48pub trait GenericSend<T>
49where
50    T: serde::Serialize + for<'de> serde::Deserialize<'de>,
51{
52    /// send message T
53    fn send(&self, _: T) -> SendResult;
54
55    /// Send a message T and log any error (instead of returning it).
56    ///
57    /// In cases where channel closure is possible (because the receiver does not exist anymore),
58    /// this convenience method can be used to ignore the result and log the error as a warning.
59    #[track_caller]
60    fn send_or_warn(&self, message: T) {
61        if let Err(error) = self.send(message) {
62            let location = Location::caller();
63            log::warn!("Failed to send msg due to `{error}` at {location:?}");
64        }
65    }
66
67    /// Send a message T and ignore the result
68    ///
69    /// In cases where channel closure is expected to happen intermittently, and the sender
70    /// doesn't care about the result, this is a short form for `let _ = GenericSend::send();`,
71    /// which makes the intent clearer.
72    fn send_or_ignore(&self, message: T) {
73        let _ = self.send(message);
74    }
75
76    /// get underlying sender
77    fn sender(&self) -> GenericSender<T>;
78}
79
80/// A GenericSender that sends messages to a [GenericReceiver].
81///
82/// The sender supports sending messages cross-process, if servo is run in multiprocess mode.
83pub struct GenericSender<T: Serialize>(GenericSenderVariants<T>);
84
85/// The actual GenericSender variant.
86///
87/// This enum is private, so that outside code can't construct a GenericSender itself.
88/// This ensures that users can't construct a crossbeam variant in multiprocess mode.
89enum GenericSenderVariants<T: Serialize> {
90    Ipc(ipc_channel::ipc::IpcSender<T>),
91    /// A crossbeam-channel. To keep the API in sync with the Ipc variant when using a Router,
92    /// which propagates the IPC error, the inner type is a Result.
93    /// In the IPC case, the Router deserializes the message, which can fail, and sends
94    /// the result to a crossbeam receiver.
95    /// The crossbeam channel does not involve serializing, so we can't have this error,
96    /// but replicating the API allows us to have one channel type as the receiver
97    /// after routing the receiver .
98    Crossbeam(crossbeam_channel::Sender<Result<T, ipc_channel::IpcError>>),
99}
100
101fn serialize_generic_sender_variants<T: Serialize, S: Serializer>(
102    value: &GenericSenderVariants<T>,
103    s: S,
104) -> Result<S::Ok, S::Error> {
105    match value {
106        GenericSenderVariants::Ipc(sender) => {
107            s.serialize_newtype_variant("GenericSender", 0, "Ipc", sender)
108        },
109        // All GenericSenders will be IPC channels in multi-process mode, so sending a
110        // GenericChannel over existing IPC channels is no problem and won't fail.
111        // In single-process mode, we can also send GenericSenders over other GenericSenders
112        // just fine, since no serialization is required.
113        // The only reason we need / want serialization is to support sending GenericSenders
114        // over existing IPC channels **in single process mode**. This allows us to
115        // incrementally port channels to the GenericChannel, without needing to follow a
116        // top-to-bottom approach.
117        // Long-term we can remove this branch in the code again and replace it with
118        // unreachable, since likely all IPC channels would be GenericChannels.
119        GenericSenderVariants::Crossbeam(sender) => {
120            if opts::get().multiprocess {
121                return Err(serde::ser::Error::custom(
122                    "Crossbeam channel found in multiprocess mode!",
123                ));
124            } // We know everything is in one address-space, so we can "serialize" the sender by
125            // sending a leaked Box pointer.
126            let sender_clone_addr = Box::leak(Box::new(sender.clone())) as *mut _ as usize;
127            s.serialize_newtype_variant("GenericSender", 1, "Crossbeam", &sender_clone_addr)
128        },
129    }
130}
131
132impl<T: Serialize> Serialize for GenericSender<T> {
133    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
134        serialize_generic_sender_variants(&self.0, s)
135    }
136}
137
/// Serde visitor that rebuilds a [`GenericSenderVariants`] from its serialized enum form.
struct GenericSenderVisitor<T> {
    /// Ties the visitor to the message type `T` without storing a value of it.
    marker: PhantomData<T>,
}
141
142impl<'de, T: Serialize + Deserialize<'de>> serde::de::Visitor<'de> for GenericSenderVisitor<T> {
143    type Value = GenericSenderVariants<T>;
144
145    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
146        formatter.write_str("a GenericSender variant")
147    }
148
149    fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
150    where
151        A: serde::de::EnumAccess<'de>,
152    {
153        #[derive(Deserialize)]
154        enum GenericSenderVariantNames {
155            Ipc,
156            Crossbeam,
157        }
158
159        let (variant_name, variant_data): (GenericSenderVariantNames, _) = data.variant()?;
160
161        match variant_name {
162            GenericSenderVariantNames::Ipc => variant_data
163                .newtype_variant::<ipc_channel::ipc::IpcSender<T>>()
164                .map(|sender| GenericSenderVariants::Ipc(sender)),
165            GenericSenderVariantNames::Crossbeam => {
166                if opts::get().multiprocess {
167                    return Err(serde::de::Error::custom(
168                        "Crossbeam channel found in multiprocess mode!",
169                    ));
170                }
171                let addr = variant_data.newtype_variant::<usize>()?;
172                let ptr = addr as *mut crossbeam_channel::Sender<Result<T, ipc_channel::IpcError>>;
173                // SAFETY: We know we are in the same address space as the sender, so we can safely
174                // reconstruct the Box.
175                #[expect(unsafe_code)]
176                let sender = unsafe { Box::from_raw(ptr) };
177                Ok(GenericSenderVariants::Crossbeam(*sender))
178            },
179        }
180    }
181}
182
183impl<'a, T: Serialize + Deserialize<'a>> Deserialize<'a> for GenericSender<T> {
184    fn deserialize<D>(d: D) -> Result<GenericSender<T>, D::Error>
185    where
186        D: Deserializer<'a>,
187    {
188        d.deserialize_enum(
189            "GenericSender",
190            &["Ipc", "Crossbeam"],
191            GenericSenderVisitor {
192                marker: PhantomData,
193            },
194        )
195        .map(|variant| GenericSender(variant))
196    }
197}
198
199impl<T> Clone for GenericSender<T>
200where
201    T: Serialize,
202{
203    fn clone(&self) -> Self {
204        match &self.0 {
205            GenericSenderVariants::Ipc(chan) => {
206                GenericSender(GenericSenderVariants::Ipc(chan.clone()))
207            },
208            GenericSenderVariants::Crossbeam(chan) => {
209                GenericSender(GenericSenderVariants::Crossbeam(chan.clone()))
210            },
211        }
212    }
213}
214
215impl<T: Serialize> fmt::Debug for GenericSender<T> {
216    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
217        write!(f, "Sender(..)")
218    }
219}
220
221impl<T: Serialize> GenericSender<T> {
222    #[inline]
223    pub fn send(&self, msg: T) -> SendResult {
224        match &self.0 {
225            GenericSenderVariants::Ipc(sender) => sender
226                .send(msg)
227                .map_err(|e| SendError::SerializationError(e.to_string())),
228            GenericSenderVariants::Crossbeam(sender) => {
229                sender.send(Ok(msg)).map_err(|_| SendError::Disconnected)
230            },
231        }
232    }
233
234    /// Send a message T and log any error (instead of returning it).
235    ///
236    /// In cases where channel closure is possible (because the receiver does not exist anymore),
237    /// this convenience method can be used to ignore the result and log the error as a warning.
238    #[inline]
239    pub fn send_or_warn(&self, msg: T) {
240        if let Err(error) = self.send(msg) {
241            let location = Location::caller();
242            log::warn!("Failed to send msg due to `{error}` at {location:?}");
243        }
244    }
245
246    /// Send a message T and ignore the result
247    ///
248    /// In cases where channel closure is expected to happen intermittently, and the sender
249    /// doesn't care about the result, this is a short form for `let _ = GenericSender::send();`,
250    /// which makes the intent clearer.
251    #[inline]
252    pub fn send_or_ignore(&self, msg: T) {
253        let _ = self.send(msg);
254    }
255}
256
257impl<T: Serialize> MallocSizeOf for GenericSender<T> {
258    fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
259        match &self.0 {
260            GenericSenderVariants::Ipc(ipc_sender) => ipc_sender.size_of(ops),
261            GenericSenderVariants::Crossbeam(sender) => sender.size_of(ops),
262        }
263    }
264}
265
/// Reasons why sending on a generic channel can fail.
#[derive(Debug)]
pub enum SendError {
    /// The message could not be delivered because the channel is disconnected.
    ///
    /// IPC I/O errors converted from an `IpcError` are reported as this variant as well.
    Disconnected,
    /// Sending failed in the serialization layer; holds the error rendered as a string.
    SerializationError(String),
}
271
272impl From<IpcError> for SendError {
273    fn from(value: IpcError) -> Self {
274        match value {
275            IpcError::SerializationError(ser_de_error) => {
276                SendError::SerializationError(ser_de_error.to_string())
277            },
278            IpcError::Io(error) => {
279                log::error!("IO Error in ipc {:?}", error);
280                SendError::Disconnected
281            },
282            IpcError::Disconnected => SendError::Disconnected,
283        }
284    }
285}
286
287impl Display for SendError {
288    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
289        write!(f, "{self:?}")
290    }
291}
292
293pub type SendResult = Result<(), SendError>;
294
/// Reasons why receiving from a generic channel can fail.
#[derive(Debug)]
pub enum ReceiveError {
    /// The incoming message could not be deserialized; holds the error rendered as a string.
    DeserializationFailed(String),
    /// An I/O error, which may occur when IPC is in use.
    Io(std::io::Error),
    /// The channel has been closed.
    Disconnected,
}
303
304impl From<IpcError> for ReceiveError {
305    fn from(e: IpcError) -> Self {
306        match e {
307            IpcError::Disconnected => ReceiveError::Disconnected,
308            IpcError::Io(reason) => ReceiveError::Io(reason),
309            IpcError::SerializationError(ser_de_error) => {
310                ReceiveError::DeserializationFailed(ser_de_error.to_string())
311            },
312        }
313    }
314}
315
316impl From<crossbeam_channel::RecvError> for ReceiveError {
317    fn from(_: crossbeam_channel::RecvError) -> Self {
318        ReceiveError::Disconnected
319    }
320}
321
322impl fmt::Display for ReceiveError {
323    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
324        match *self {
325            ReceiveError::DeserializationFailed(ref error) => {
326                write!(fmt, "deserialization error: {error}")
327            },
328            ReceiveError::Io(ref error) => write!(fmt, "io error: {error}"),
329            ReceiveError::Disconnected => write!(fmt, "disconnected"),
330        }
331    }
332}
333impl From<std::io::Error> for ReceiveError {
334    fn from(value: std::io::Error) -> Self {
335        ReceiveError::Io(value)
336    }
337}
338
339pub enum TryReceiveError {
340    Empty,
341    ReceiveError(ReceiveError),
342}
343
344impl From<crossbeam_channel::RecvTimeoutError> for TryReceiveError {
345    fn from(value: crossbeam_channel::RecvTimeoutError) -> Self {
346        match value {
347            RecvTimeoutError::Timeout => TryReceiveError::Empty,
348            RecvTimeoutError::Disconnected => {
349                TryReceiveError::ReceiveError(ReceiveError::Disconnected)
350            },
351        }
352    }
353}
354
355impl From<ipc_channel::TryRecvError> for TryReceiveError {
356    fn from(e: ipc_channel::TryRecvError) -> Self {
357        match e {
358            ipc_channel::TryRecvError::Empty => TryReceiveError::Empty,
359            ipc_channel::TryRecvError::IpcError(inner) => {
360                TryReceiveError::ReceiveError(inner.into())
361            },
362        }
363    }
364}
365
366impl From<crossbeam_channel::TryRecvError> for TryReceiveError {
367    fn from(e: crossbeam_channel::TryRecvError) -> Self {
368        match e {
369            crossbeam_channel::TryRecvError::Empty => TryReceiveError::Empty,
370            crossbeam_channel::TryRecvError::Disconnected => {
371                TryReceiveError::ReceiveError(ReceiveError::Disconnected)
372            },
373        }
374    }
375}
376
377pub type RoutedReceiver<T> = crossbeam_channel::Receiver<Result<T, ipc_channel::IpcError>>;
378pub type ReceiveResult<T> = Result<T, ReceiveError>;
379pub type TryReceiveResult<T> = Result<T, TryReceiveError>;
380pub type RoutedReceiverReceiveResult<T> =
381    Result<Result<T, ipc_channel::IpcError>, crossbeam_channel::RecvError>;
382
383pub fn to_receive_result<T>(receive_result: RoutedReceiverReceiveResult<T>) -> ReceiveResult<T> {
384    match receive_result {
385        Ok(Ok(msg)) => Ok(msg),
386        Err(_crossbeam_recv_err) => Err(ReceiveError::Disconnected),
387        Ok(Err(ipc_err)) => Err(ReceiveError::DeserializationFailed(ipc_err.to_string())),
388    }
389}
390
391#[derive(MallocSizeOf)]
392pub struct GenericReceiver<T>(GenericReceiverVariants<T>)
393where
394    T: for<'de> Deserialize<'de> + Serialize;
395
396impl<T> std::fmt::Debug for GenericReceiver<T>
397where
398    T: for<'de> Deserialize<'de> + Serialize,
399{
400    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
401        f.debug_tuple("GenericReceiver").finish()
402    }
403}
404
405#[derive(MallocSizeOf)]
406enum GenericReceiverVariants<T>
407where
408    T: for<'de> Deserialize<'de> + Serialize,
409{
410    Ipc(ipc_channel::ipc::IpcReceiver<T>),
411    Crossbeam(RoutedReceiver<T>),
412}
413
414impl<T> GenericReceiver<T>
415where
416    T: for<'de> Deserialize<'de> + Serialize,
417{
418    #[inline]
419    pub fn recv(&self) -> ReceiveResult<T> {
420        match &self.0 {
421            GenericReceiverVariants::Ipc(receiver) => Ok(receiver.recv()?),
422            GenericReceiverVariants::Crossbeam(receiver) => {
423                // `recv()` returns an error if the channel is disconnected
424                let msg = receiver.recv()?;
425                // `msg` must be `ok` because the corresponding [`GenericSender::Crossbeam`] will
426                // unconditionally send an `Ok(T)`
427                Ok(msg.expect("Infallible"))
428            },
429        }
430    }
431
432    #[inline]
433    pub fn try_recv(&self) -> TryReceiveResult<T> {
434        match &self.0 {
435            GenericReceiverVariants::Ipc(receiver) => Ok(receiver.try_recv()?),
436            GenericReceiverVariants::Crossbeam(receiver) => {
437                let msg = receiver.try_recv()?;
438                Ok(msg.expect("Infallible"))
439            },
440        }
441    }
442
443    /// Blocks up to the specific duration attempting to receive a message.
444    #[inline]
445    pub fn try_recv_timeout(&self, timeout: Duration) -> Result<T, TryReceiveError> {
446        match &self.0 {
447            GenericReceiverVariants::Ipc(ipc_receiver) => {
448                ipc_receiver.try_recv_timeout(timeout).map_err(|e| e.into())
449            },
450            GenericReceiverVariants::Crossbeam(receiver) => match receiver.recv_timeout(timeout) {
451                Ok(Ok(value)) => Ok(value),
452                Ok(Err(_)) => unreachable!("Infallable"),
453                Err(RecvTimeoutError::Disconnected) => {
454                    Err(TryReceiveError::ReceiveError(ReceiveError::Disconnected))
455                },
456                Err(RecvTimeoutError::Timeout) => Err(TryReceiveError::Empty),
457            },
458        }
459    }
460
461    /// Route to a crossbeam receiver, preserving any errors.
462    ///
463    /// For `Crossbeam` receivers this is a no-op, while for `Ipc` receivers
464    /// this creates a route.
465    #[inline]
466    pub fn route_preserving_errors(self) -> RoutedReceiver<T>
467    where
468        T: Send + 'static,
469    {
470        match self.0 {
471            GenericReceiverVariants::Ipc(ipc_receiver) => {
472                let (crossbeam_sender, crossbeam_receiver) = crossbeam_channel::unbounded();
473                let crossbeam_sender_clone = crossbeam_sender;
474                ROUTER.add_typed_route(
475                    ipc_receiver,
476                    Box::new(move |message| {
477                        let _ = crossbeam_sender_clone
478                            .send(message.map_err(IpcError::SerializationError));
479                    }),
480                );
481                crossbeam_receiver
482            },
483            GenericReceiverVariants::Crossbeam(receiver) => receiver,
484        }
485    }
486}
487
488impl<T> Serialize for GenericReceiver<T>
489where
490    T: for<'de> Deserialize<'de> + Serialize,
491{
492    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
493        match &self.0 {
494            GenericReceiverVariants::Ipc(receiver) => {
495                s.serialize_newtype_variant("GenericReceiver", 0, "Ipc", receiver)
496            },
497            GenericReceiverVariants::Crossbeam(receiver) => {
498                if opts::get().multiprocess {
499                    return Err(serde::ser::Error::custom(
500                        "Crossbeam channel found in multiprocess mode!",
501                    ));
502                } // We know everything is in one address-space, so we can "serialize" the receiver by
503                // sending a leaked Box pointer.
504                let receiver_clone_addr = Box::leak(Box::new(receiver.clone())) as *mut _ as usize;
505                s.serialize_newtype_variant("GenericReceiver", 1, "Crossbeam", &receiver_clone_addr)
506            },
507        }
508    }
509}
510
/// Serde visitor that rebuilds a [`GenericReceiver`] from its serialized enum form.
struct GenericReceiverVisitor<T> {
    /// Ties the visitor to the message type `T` without storing a value of it.
    marker: PhantomData<T>,
}
514impl<'de, T> serde::de::Visitor<'de> for GenericReceiverVisitor<T>
515where
516    T: for<'a> Deserialize<'a> + Serialize,
517{
518    type Value = GenericReceiver<T>;
519
520    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
521        formatter.write_str("a GenericReceiver variant")
522    }
523
524    fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
525    where
526        A: serde::de::EnumAccess<'de>,
527    {
528        #[derive(Deserialize)]
529        enum GenericReceiverVariantNames {
530            Ipc,
531            Crossbeam,
532        }
533
534        let (variant_name, variant_data): (GenericReceiverVariantNames, _) = data.variant()?;
535
536        match variant_name {
537            GenericReceiverVariantNames::Ipc => variant_data
538                .newtype_variant::<ipc_channel::ipc::IpcReceiver<T>>()
539                .map(|receiver| GenericReceiver(GenericReceiverVariants::Ipc(receiver))),
540            GenericReceiverVariantNames::Crossbeam => {
541                if use_ipc() {
542                    return Err(serde::de::Error::custom(
543                        "Crossbeam channel found in multiprocess mode!",
544                    ));
545                }
546                let addr = variant_data.newtype_variant::<usize>()?;
547                let ptr = addr as *mut RoutedReceiver<T>;
548                // SAFETY: We know we are in the same address space as the sender, so we can safely
549                // reconstruct the Box.
550                #[expect(unsafe_code)]
551                let receiver = unsafe { Box::from_raw(ptr) };
552                Ok(GenericReceiver(GenericReceiverVariants::Crossbeam(
553                    *receiver,
554                )))
555            },
556        }
557    }
558}
559
560impl<'a, T> Deserialize<'a> for GenericReceiver<T>
561where
562    T: for<'de> Deserialize<'de> + Serialize,
563{
564    fn deserialize<D>(d: D) -> Result<GenericReceiver<T>, D::Error>
565    where
566        D: Deserializer<'a>,
567    {
568        d.deserialize_enum(
569            "GenericReceiver",
570            &["Ipc", "Crossbeam"],
571            GenericReceiverVisitor {
572                marker: PhantomData,
573            },
574        )
575    }
576}
577
578/// Private helper function to create a crossbeam based channel.
579///
580/// Do NOT make this function public!
581fn new_generic_channel_crossbeam<T>() -> (GenericSender<T>, GenericReceiver<T>)
582where
583    T: Serialize + for<'de> serde::Deserialize<'de>,
584{
585    let (tx, rx) = crossbeam_channel::unbounded();
586    (
587        GenericSender(GenericSenderVariants::Crossbeam(tx)),
588        GenericReceiver(GenericReceiverVariants::Crossbeam(rx)),
589    )
590}
591
592fn new_generic_channel_ipc<T>() -> Result<(GenericSender<T>, GenericReceiver<T>), std::io::Error>
593where
594    T: Serialize + for<'de> serde::Deserialize<'de>,
595{
596    ipc_channel::ipc::channel().map(|(tx, rx)| {
597        (
598            GenericSender(GenericSenderVariants::Ipc(tx)),
599            GenericReceiver(GenericReceiverVariants::Ipc(rx)),
600        )
601    })
602}
603
604/// Creates a Servo channel that can select different channel implementations based on multiprocess
605/// mode or not. If the scenario doesn't require message to pass process boundary, a simple
606/// crossbeam channel is preferred.
607pub fn channel<T>() -> Option<(GenericSender<T>, GenericReceiver<T>)>
608where
609    T: for<'de> Deserialize<'de> + Serialize,
610{
611    if use_ipc() {
612        new_generic_channel_ipc().ok()
613    } else {
614        Some(new_generic_channel_crossbeam())
615    }
616}
617
#[cfg(test)]
mod single_process_channel_tests {
    //! Unit tests checking that, in single-process mode, senders and receivers backed by
    //! `ipc_channel` and by `crossbeam_channel` can be sent over each other without problems.
    //! Multiprocess mode exclusively uses `ipc_channel` anyway, which is ensured by `channel()`
    //! being the only way to construct `GenericSender` and Receiver pairs.
    use std::thread;
    use std::time::Duration;

    use crate::generic_channel::{new_generic_channel_crossbeam, new_generic_channel_ipc};

    /// A value sent on a crossbeam-backed channel arrives unchanged.
    #[test]
    fn generic_crossbeam_can_send() {
        let (sender, receiver) = new_generic_channel_crossbeam();
        sender.send(5).expect("Send failed");
        assert_eq!(receiver.recv().expect("Receive failed"), 5);
    }

    /// A crossbeam sender travels over a crossbeam channel and is usable afterwards.
    #[test]
    fn generic_crossbeam_ping_pong() {
        let (request_tx, request_rx) = new_generic_channel_crossbeam();
        let (reply_tx, reply_rx) = new_generic_channel_crossbeam();

        request_tx.send(reply_tx).expect("Send failed");

        // The scope only returns once the responder thread has finished.
        thread::scope(|scope| {
            scope.spawn(move || {
                let reply_sender = request_rx.recv().expect("Receive failed");
                reply_sender.send(42).expect("Sending reply failed");
            });
        });
        assert_eq!(reply_rx.recv().expect("Receive of reply failed"), 42);
    }

    /// An IPC sender travels over an IPC channel and is usable afterwards.
    #[test]
    fn generic_ipc_ping_pong() {
        let (request_tx, request_rx) = new_generic_channel_ipc().unwrap();
        let (reply_tx, reply_rx) = new_generic_channel_ipc().unwrap();

        request_tx.send(reply_tx).expect("Send failed");

        thread::scope(|scope| {
            scope.spawn(move || {
                let reply_sender = request_rx.recv().expect("Receive failed");
                reply_sender.send(42).expect("Sending reply failed");
            });
        });
        assert_eq!(reply_rx.recv().expect("Receive of reply failed"), 42);
    }

    /// A crossbeam sender travels over an IPC channel and is usable afterwards.
    #[test]
    fn send_crossbeam_sender_over_ipc_channel() {
        let (request_tx, request_rx) = new_generic_channel_ipc().unwrap();
        let (reply_tx, reply_rx) = new_generic_channel_crossbeam();

        request_tx.send(reply_tx).expect("Send failed");

        thread::scope(|scope| {
            scope.spawn(move || {
                let reply_sender = request_rx.recv().expect("Receive failed");
                reply_sender.send(42).expect("Sending reply failed");
            });
        });
        assert_eq!(reply_rx.recv().expect("Receive of reply failed"), 42);
    }

    /// An IPC sender travels over a crossbeam channel and is usable afterwards.
    #[test]
    fn send_generic_ipc_channel_over_crossbeam() {
        let (request_tx, request_rx) = new_generic_channel_crossbeam();
        let (reply_tx, reply_rx) = new_generic_channel_ipc().unwrap();

        request_tx.send(reply_tx).expect("Send failed");

        thread::scope(|scope| {
            scope.spawn(move || {
                let reply_sender = request_rx.recv().expect("Receive failed");
                reply_sender.send(42).expect("Sending reply failed");
            });
        });
        assert_eq!(reply_rx.recv().expect("Receive of reply failed"), 42);
    }

    /// A crossbeam receiver travels over an IPC channel and still yields its queued message.
    #[test]
    fn send_crossbeam_receiver_over_ipc_channel() {
        let (carrier_tx, carrier_rx) = new_generic_channel_ipc().unwrap();
        let (value_tx, value_rx) = new_generic_channel_crossbeam();

        carrier_tx.send(value_rx).expect("Send failed");
        value_tx.send(42).expect("Send failed");

        thread::scope(|scope| {
            scope.spawn(move || {
                let another_receiver = carrier_rx.recv().expect("Receive failed");
                let received = another_receiver.recv().expect("Receive failed");
                assert_eq!(received, 42);
            });
        });
    }

    /// A message sent one second before the deadline is received on an IPC channel.
    #[test]
    fn test_timeout_ipc() {
        let (sender, receiver) = new_generic_channel_ipc().unwrap();
        let timeout_duration = Duration::from_secs(3);
        thread::spawn(move || {
            thread::sleep(timeout_duration - Duration::from_secs(1));
            assert!(sender.send(()).is_ok());
        });
        assert!(receiver.try_recv_timeout(timeout_duration).is_ok());
    }

    /// A message sent one second before the deadline is received on a crossbeam channel.
    #[test]
    fn test_timeout_crossbeam() {
        let (sender, receiver) = new_generic_channel_crossbeam();
        let timeout_duration = Duration::from_secs(3);
        thread::spawn(move || {
            thread::sleep(timeout_duration - Duration::from_secs(1));
            assert!(sender.send(()).is_ok());
        });
        assert!(receiver.try_recv_timeout(timeout_duration).is_ok());
    }
}
743
/// These tests need to be in here because they use the `new_generic_channel_..` functions.
745#[cfg(test)]
746mod generic_receiversets_tests {
747    use std::time::Duration;
748
749    use crate::generic_channel::generic_channelset::{
750        GenericSelectionResult, create_crossbeam_receiver_set, create_ipc_receiver_set,
751    };
752    use crate::generic_channel::{new_generic_channel_crossbeam, new_generic_channel_ipc};
753
754    #[test]
755    fn test_ipc_side1() {
756        let (snd1, recv1) = new_generic_channel_ipc().unwrap();
757        let (snd2, recv2) = new_generic_channel_ipc().unwrap();
758
759        // We keep the senders alive till all threads are done
760        let snd1_c = snd1.clone();
761        let snd2_c = snd2.clone();
762        let mut set = create_ipc_receiver_set();
763        let recv1_select_index = set.add(recv1);
764        let _recv2_select_index = set.add(recv2);
765
766        std::thread::spawn(move || {
767            snd1_c.send(10).unwrap();
768        });
769        std::thread::spawn(move || {
770            std::thread::sleep(Duration::from_secs(1));
771            let _ = snd2_c.send(20); // this might error with closed channel
772        });
773
774        let select_result = set.select();
775        let channel_result = select_result.first().unwrap();
776        assert_eq!(
777            *channel_result,
778            GenericSelectionResult::MessageReceived(recv1_select_index, 10)
779        );
780    }
781
782    #[test]
783    fn test_ipc_side2() {
784        let (snd1, recv1) = new_generic_channel_ipc().unwrap();
785        let (snd2, recv2) = new_generic_channel_ipc().unwrap();
786
787        // We keep the senders alive till all threads are done
788        let snd1_c = snd1.clone();
789        let snd2_c = snd2.clone();
790        let mut set = create_ipc_receiver_set();
791        let _recv1_select_index = set.add(recv1);
792        let recv2_select_index = set.add(recv2);
793
794        std::thread::spawn(move || {
795            std::thread::sleep(Duration::from_secs(1));
796            let _ = snd1_c.send(10);
797        });
798        std::thread::spawn(move || {
799            snd2_c.send(20).unwrap();
800        });
801
802        let select_result = set.select();
803        let channel_result = select_result.first().unwrap();
804        assert_eq!(
805            *channel_result,
806            GenericSelectionResult::MessageReceived(recv2_select_index, 20)
807        );
808    }
809
810    #[test]
811    fn test_crossbeam_side1() {
812        let (snd1, recv1) = new_generic_channel_crossbeam();
813        let (snd2, recv2) = new_generic_channel_crossbeam();
814
815        // We keep the senders alive till all threads are done
816        let snd1_c = snd1.clone();
817        let snd2_c = snd2.clone();
818        let mut set = create_crossbeam_receiver_set();
819        let recv1_select_index = set.add(recv1);
820        let _recv2_select_index = set.add(recv2);
821
822        std::thread::spawn(move || {
823            snd1_c.send(10).unwrap();
824        });
825        std::thread::spawn(move || {
826            std::thread::sleep(Duration::from_secs(2));
827            let _ = snd2_c.send(20);
828        });
829
830        let select_result = set.select();
831        let channel_result = select_result.first().unwrap();
832        assert_eq!(
833            *channel_result,
834            GenericSelectionResult::MessageReceived(recv1_select_index, 10)
835        );
836    }
837
838    #[test]
839    fn test_crossbeam_side2() {
840        let (snd1, recv1) = new_generic_channel_crossbeam();
841        let (snd2, recv2) = new_generic_channel_crossbeam();
842
843        // We keep the senders alive till all threads are done
844        let snd1_c = snd1.clone();
845        let snd2_c = snd2.clone();
846        let mut set = create_crossbeam_receiver_set();
847        let _recv1_select_index = set.add(recv1);
848        let recv2_select_index = set.add(recv2);
849
850        std::thread::spawn(move || {
851            std::thread::sleep(Duration::from_secs(2));
852            let _ = snd1_c.send(10);
853        });
854        std::thread::spawn(move || {
855            snd2_c.send(20).unwrap();
856        });
857
858        let select_result = set.select();
859        let channel_result = select_result.first().unwrap();
860        assert_eq!(
861            *channel_result,
862            GenericSelectionResult::MessageReceived(recv2_select_index, 20)
863        );
864    }
865
866    #[test]
867    fn test_ipc_no_crash_on_disconnect() {
868        // Test that we do not crash if a channel gets disconnected.
869        // Channel 2 gets disconnected because snd2 gets moved into the thread and then falls out of scope
870        let (snd1, recv1) = new_generic_channel_ipc().unwrap();
871        let (snd2, recv2) = new_generic_channel_ipc().unwrap();
872
873        // We keep the senders alive till all threads are done
874        let snd1_c = snd1.clone();
875        let mut set = create_ipc_receiver_set();
876        let _recv1_select_index = set.add(recv1);
877        let recv2_select_index = set.add(recv2);
878
879        std::thread::spawn(move || {
880            std::thread::sleep(Duration::from_secs(2));
881            let _ = snd1_c.send(10);
882        });
883        std::thread::spawn(move || {
884            snd2.send(20).unwrap();
885        });
886        std::thread::sleep(Duration::from_secs(1));
887        let select_result = set.select();
888        let channel_result = select_result.first().unwrap();
889        assert_eq!(
890            *channel_result,
891            GenericSelectionResult::MessageReceived(recv2_select_index, 20)
892        );
893    }
894
895    #[test]
896    fn test_crossbeam_no_crash_on_disconnect() {
897        // Channel 2 gets disconnected because snd2 gets moved into the thread and then falls out of scope
898        let (snd1, recv1) = new_generic_channel_crossbeam();
899        let (snd2, recv2) = new_generic_channel_crossbeam();
900
901        // We keep the senders alive till all threads are done
902        let snd1_c = snd1.clone();
903        let mut set = create_crossbeam_receiver_set();
904        let _recv1_select_index = set.add(recv1);
905        let recv2_select_index = set.add(recv2);
906
907        std::thread::spawn(move || {
908            std::thread::sleep(Duration::from_secs(2));
909            let _ = snd1_c.send(10);
910        });
911        std::thread::spawn(move || {
912            snd2.send(20).unwrap();
913        });
914        std::thread::sleep(Duration::from_secs(1));
915        let select_result = set.select();
916        let channel_result = select_result.first().unwrap();
917        assert_eq!(
918            *channel_result,
919            GenericSelectionResult::MessageReceived(recv2_select_index, 20)
920        );
921    }
922
923    #[test]
924    fn test_ipc_disconnect_correct_message() {
925        // Test that we do not crash if a channel gets disconnected.
926        let (snd1, recv1) = new_generic_channel_ipc().unwrap();
927        let (snd2, recv2) = new_generic_channel_ipc().unwrap();
928
929        // We keep the senders alive till all threads are done
930        let snd1_c = snd1.clone();
931        let mut set = create_ipc_receiver_set();
932        let _recv1_select_index = set.add(recv1);
933        let recv2_select_index = set.add(recv2);
934
935        std::thread::spawn(move || {
936            std::thread::sleep(Duration::from_secs(2));
937            let _ = snd1_c.send(10);
938        });
939        std::thread::spawn(move || {
940            drop(snd2);
941        });
942
943        let select_result = set.select();
944        let channel_result = select_result.first().unwrap();
945        assert_eq!(
946            *channel_result,
947            GenericSelectionResult::ChannelClosed(recv2_select_index)
948        );
949    }
950
951    #[test]
952    fn test_crossbeam_disconnect_correct_messaget() {
953        let (snd1, recv1) = new_generic_channel_crossbeam();
954        let (snd2, recv2) = new_generic_channel_crossbeam();
955
956        // We keep the senders alive till all threads are done
957        let snd1_c = snd1.clone();
958        let mut set = create_crossbeam_receiver_set();
959        let _recv1_select_index = set.add(recv1);
960        let recv2_select_index = set.add(recv2);
961
962        std::thread::spawn(move || {
963            std::thread::sleep(Duration::from_secs(2));
964            let _ = snd1_c.send(10);
965        });
966        std::thread::spawn(move || {
967            drop(snd2);
968        });
969
970        let select_result = set.select();
971        let channel_result = select_result.first().unwrap();
972        assert_eq!(
973            *channel_result,
974            GenericSelectionResult::ChannelClosed(recv2_select_index)
975        );
976    }
977}