1use std::fmt;
8use std::fmt::Display;
9use std::marker::PhantomData;
10use std::sync::OnceLock;
11use std::time::Duration;
12
13use crossbeam_channel::RecvTimeoutError;
14use ipc_channel::IpcError;
15use ipc_channel::router::ROUTER;
16use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
17use malloc_size_of_derive::MallocSizeOf;
18use serde::de::VariantAccess;
19use serde::{Deserialize, Deserializer, Serialize, Serializer};
20use servo_config::opts;
21
22mod callback;
23pub use callback::GenericCallback;
24mod lazy_callback;
25pub use lazy_callback::{CallbackSetter, LazyCallback, lazy_callback};
26mod oneshot;
27mod shared_memory;
28pub use oneshot::{GenericOneshotReceiver, GenericOneshotSender, oneshot};
29pub use shared_memory::GenericSharedMemory;
30mod generic_channelset;
31pub use generic_channelset::{GenericReceiverSet, GenericSelectionResult};
32
33static USE_IPC: OnceLock<bool> = OnceLock::new();
35
36fn use_ipc() -> bool {
38 *USE_IPC.get_or_init(|| {
39 servo_config::opts::get().multiprocess || servo_config::opts::get().force_ipc
40 })
41}
42
43pub trait GenericSend<T>
46where
47 T: serde::Serialize + for<'de> serde::Deserialize<'de>,
48{
49 fn send(&self, _: T) -> SendResult;
51 fn sender(&self) -> GenericSender<T>;
53}
54
55pub struct GenericSender<T: Serialize>(GenericSenderVariants<T>);
59
60enum GenericSenderVariants<T: Serialize> {
65 Ipc(ipc_channel::ipc::IpcSender<T>),
66 Crossbeam(crossbeam_channel::Sender<Result<T, ipc_channel::IpcError>>),
74}
75
76fn serialize_generic_sender_variants<T: Serialize, S: Serializer>(
77 value: &GenericSenderVariants<T>,
78 s: S,
79) -> Result<S::Ok, S::Error> {
80 match value {
81 GenericSenderVariants::Ipc(sender) => {
82 s.serialize_newtype_variant("GenericSender", 0, "Ipc", sender)
83 },
84 GenericSenderVariants::Crossbeam(sender) => {
95 if opts::get().multiprocess {
96 return Err(serde::ser::Error::custom(
97 "Crossbeam channel found in multiprocess mode!",
98 ));
99 } let sender_clone_addr = Box::leak(Box::new(sender.clone())) as *mut _ as usize;
102 s.serialize_newtype_variant("GenericSender", 1, "Crossbeam", &sender_clone_addr)
103 },
104 }
105}
106
107impl<T: Serialize> Serialize for GenericSender<T> {
108 fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
109 serialize_generic_sender_variants(&self.0, s)
110 }
111}
112
113struct GenericSenderVisitor<T> {
114 marker: PhantomData<T>,
115}
116
117impl<'de, T: Serialize + Deserialize<'de>> serde::de::Visitor<'de> for GenericSenderVisitor<T> {
118 type Value = GenericSenderVariants<T>;
119
120 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
121 formatter.write_str("a GenericSender variant")
122 }
123
124 fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
125 where
126 A: serde::de::EnumAccess<'de>,
127 {
128 #[derive(Deserialize)]
129 enum GenericSenderVariantNames {
130 Ipc,
131 Crossbeam,
132 }
133
134 let (variant_name, variant_data): (GenericSenderVariantNames, _) = data.variant()?;
135
136 match variant_name {
137 GenericSenderVariantNames::Ipc => variant_data
138 .newtype_variant::<ipc_channel::ipc::IpcSender<T>>()
139 .map(|sender| GenericSenderVariants::Ipc(sender)),
140 GenericSenderVariantNames::Crossbeam => {
141 if opts::get().multiprocess {
142 return Err(serde::de::Error::custom(
143 "Crossbeam channel found in multiprocess mode!",
144 ));
145 }
146 let addr = variant_data.newtype_variant::<usize>()?;
147 let ptr = addr as *mut crossbeam_channel::Sender<Result<T, ipc_channel::IpcError>>;
148 #[expect(unsafe_code)]
151 let sender = unsafe { Box::from_raw(ptr) };
152 Ok(GenericSenderVariants::Crossbeam(*sender))
153 },
154 }
155 }
156}
157
158impl<'a, T: Serialize + Deserialize<'a>> Deserialize<'a> for GenericSender<T> {
159 fn deserialize<D>(d: D) -> Result<GenericSender<T>, D::Error>
160 where
161 D: Deserializer<'a>,
162 {
163 d.deserialize_enum(
164 "GenericSender",
165 &["Ipc", "Crossbeam"],
166 GenericSenderVisitor {
167 marker: PhantomData,
168 },
169 )
170 .map(|variant| GenericSender(variant))
171 }
172}
173
174impl<T> Clone for GenericSender<T>
175where
176 T: Serialize,
177{
178 fn clone(&self) -> Self {
179 match &self.0 {
180 GenericSenderVariants::Ipc(chan) => {
181 GenericSender(GenericSenderVariants::Ipc(chan.clone()))
182 },
183 GenericSenderVariants::Crossbeam(chan) => {
184 GenericSender(GenericSenderVariants::Crossbeam(chan.clone()))
185 },
186 }
187 }
188}
189
190impl<T: Serialize> fmt::Debug for GenericSender<T> {
191 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
192 write!(f, "Sender(..)")
193 }
194}
195
196impl<T: Serialize> GenericSender<T> {
197 #[inline]
198 pub fn send(&self, msg: T) -> SendResult {
199 match &self.0 {
200 GenericSenderVariants::Ipc(sender) => sender
201 .send(msg)
202 .map_err(|e| SendError::SerializationError(e.to_string())),
203 GenericSenderVariants::Crossbeam(sender) => {
204 sender.send(Ok(msg)).map_err(|_| SendError::Disconnected)
205 },
206 }
207 }
208}
209
210impl<T: Serialize> MallocSizeOf for GenericSender<T> {
211 fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
212 match &self.0 {
213 GenericSenderVariants::Ipc(ipc_sender) => ipc_sender.size_of(ops),
214 GenericSenderVariants::Crossbeam(sender) => sender.size_of(ops),
215 }
216 }
217}
218
219#[derive(Debug)]
220pub enum SendError {
221 Disconnected,
222 SerializationError(String),
223}
224
225impl From<IpcError> for SendError {
226 fn from(value: IpcError) -> Self {
227 match value {
228 IpcError::SerializationError(ser_de_error) => {
229 SendError::SerializationError(ser_de_error.to_string())
230 },
231 IpcError::Io(error) => {
232 log::error!("IO Error in ipc {:?}", error);
233 SendError::Disconnected
234 },
235 IpcError::Disconnected => SendError::Disconnected,
236 }
237 }
238}
239
240impl Display for SendError {
241 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
242 write!(f, "{self:?}")
243 }
244}
245
246pub type SendResult = Result<(), SendError>;
247
248#[derive(Debug)]
249pub enum ReceiveError {
250 DeserializationFailed(String),
251 Io(std::io::Error),
253 Disconnected,
255}
256
257impl From<IpcError> for ReceiveError {
258 fn from(e: IpcError) -> Self {
259 match e {
260 IpcError::Disconnected => ReceiveError::Disconnected,
261 IpcError::Io(reason) => ReceiveError::Io(reason),
262 IpcError::SerializationError(ser_de_error) => {
263 ReceiveError::DeserializationFailed(ser_de_error.to_string())
264 },
265 }
266 }
267}
268
269impl From<crossbeam_channel::RecvError> for ReceiveError {
270 fn from(_: crossbeam_channel::RecvError) -> Self {
271 ReceiveError::Disconnected
272 }
273}
274
275impl fmt::Display for ReceiveError {
276 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
277 match *self {
278 ReceiveError::DeserializationFailed(ref error) => {
279 write!(fmt, "deserialization error: {error}")
280 },
281 ReceiveError::Io(ref error) => write!(fmt, "io error: {error}"),
282 ReceiveError::Disconnected => write!(fmt, "disconnected"),
283 }
284 }
285}
286impl From<std::io::Error> for ReceiveError {
287 fn from(value: std::io::Error) -> Self {
288 ReceiveError::Io(value)
289 }
290}
291
292pub enum TryReceiveError {
293 Empty,
294 ReceiveError(ReceiveError),
295}
296
297impl From<crossbeam_channel::RecvTimeoutError> for TryReceiveError {
298 fn from(value: crossbeam_channel::RecvTimeoutError) -> Self {
299 match value {
300 RecvTimeoutError::Timeout => TryReceiveError::Empty,
301 RecvTimeoutError::Disconnected => {
302 TryReceiveError::ReceiveError(ReceiveError::Disconnected)
303 },
304 }
305 }
306}
307
308impl From<ipc_channel::TryRecvError> for TryReceiveError {
309 fn from(e: ipc_channel::TryRecvError) -> Self {
310 match e {
311 ipc_channel::TryRecvError::Empty => TryReceiveError::Empty,
312 ipc_channel::TryRecvError::IpcError(inner) => {
313 TryReceiveError::ReceiveError(inner.into())
314 },
315 }
316 }
317}
318
319impl From<crossbeam_channel::TryRecvError> for TryReceiveError {
320 fn from(e: crossbeam_channel::TryRecvError) -> Self {
321 match e {
322 crossbeam_channel::TryRecvError::Empty => TryReceiveError::Empty,
323 crossbeam_channel::TryRecvError::Disconnected => {
324 TryReceiveError::ReceiveError(ReceiveError::Disconnected)
325 },
326 }
327 }
328}
329
330pub type RoutedReceiver<T> = crossbeam_channel::Receiver<Result<T, ipc_channel::IpcError>>;
331pub type ReceiveResult<T> = Result<T, ReceiveError>;
332pub type TryReceiveResult<T> = Result<T, TryReceiveError>;
333pub type RoutedReceiverReceiveResult<T> =
334 Result<Result<T, ipc_channel::IpcError>, crossbeam_channel::RecvError>;
335
336pub fn to_receive_result<T>(receive_result: RoutedReceiverReceiveResult<T>) -> ReceiveResult<T> {
337 match receive_result {
338 Ok(Ok(msg)) => Ok(msg),
339 Err(_crossbeam_recv_err) => Err(ReceiveError::Disconnected),
340 Ok(Err(ipc_err)) => Err(ReceiveError::DeserializationFailed(ipc_err.to_string())),
341 }
342}
343
344#[derive(MallocSizeOf)]
345pub struct GenericReceiver<T>(GenericReceiverVariants<T>)
346where
347 T: for<'de> Deserialize<'de> + Serialize;
348
349impl<T> std::fmt::Debug for GenericReceiver<T>
350where
351 T: for<'de> Deserialize<'de> + Serialize,
352{
353 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
354 f.debug_tuple("GenericReceiver").finish()
355 }
356}
357
358#[derive(MallocSizeOf)]
359enum GenericReceiverVariants<T>
360where
361 T: for<'de> Deserialize<'de> + Serialize,
362{
363 Ipc(ipc_channel::ipc::IpcReceiver<T>),
364 Crossbeam(RoutedReceiver<T>),
365}
366
367impl<T> GenericReceiver<T>
368where
369 T: for<'de> Deserialize<'de> + Serialize,
370{
371 #[inline]
372 pub fn recv(&self) -> ReceiveResult<T> {
373 match &self.0 {
374 GenericReceiverVariants::Ipc(receiver) => Ok(receiver.recv()?),
375 GenericReceiverVariants::Crossbeam(receiver) => {
376 let msg = receiver.recv()?;
378 Ok(msg.expect("Infallible"))
381 },
382 }
383 }
384
385 #[inline]
386 pub fn try_recv(&self) -> TryReceiveResult<T> {
387 match &self.0 {
388 GenericReceiverVariants::Ipc(receiver) => Ok(receiver.try_recv()?),
389 GenericReceiverVariants::Crossbeam(receiver) => {
390 let msg = receiver.try_recv()?;
391 Ok(msg.expect("Infallible"))
392 },
393 }
394 }
395
396 #[inline]
398 pub fn try_recv_timeout(&self, timeout: Duration) -> Result<T, TryReceiveError> {
399 match &self.0 {
400 GenericReceiverVariants::Ipc(ipc_receiver) => {
401 ipc_receiver.try_recv_timeout(timeout).map_err(|e| e.into())
402 },
403 GenericReceiverVariants::Crossbeam(receiver) => match receiver.recv_timeout(timeout) {
404 Ok(Ok(value)) => Ok(value),
405 Ok(Err(_)) => unreachable!("Infallable"),
406 Err(RecvTimeoutError::Disconnected) => {
407 Err(TryReceiveError::ReceiveError(ReceiveError::Disconnected))
408 },
409 Err(RecvTimeoutError::Timeout) => Err(TryReceiveError::Empty),
410 },
411 }
412 }
413
414 #[inline]
419 pub fn route_preserving_errors(self) -> RoutedReceiver<T>
420 where
421 T: Send + 'static,
422 {
423 match self.0 {
424 GenericReceiverVariants::Ipc(ipc_receiver) => {
425 let (crossbeam_sender, crossbeam_receiver) = crossbeam_channel::unbounded();
426 let crossbeam_sender_clone = crossbeam_sender;
427 ROUTER.add_typed_route(
428 ipc_receiver,
429 Box::new(move |message| {
430 let _ = crossbeam_sender_clone
431 .send(message.map_err(IpcError::SerializationError));
432 }),
433 );
434 crossbeam_receiver
435 },
436 GenericReceiverVariants::Crossbeam(receiver) => receiver,
437 }
438 }
439}
440
441impl<T> Serialize for GenericReceiver<T>
442where
443 T: for<'de> Deserialize<'de> + Serialize,
444{
445 fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
446 match &self.0 {
447 GenericReceiverVariants::Ipc(receiver) => {
448 s.serialize_newtype_variant("GenericReceiver", 0, "Ipc", receiver)
449 },
450 GenericReceiverVariants::Crossbeam(receiver) => {
451 if opts::get().multiprocess {
452 return Err(serde::ser::Error::custom(
453 "Crossbeam channel found in multiprocess mode!",
454 ));
455 } let receiver_clone_addr = Box::leak(Box::new(receiver.clone())) as *mut _ as usize;
458 s.serialize_newtype_variant("GenericReceiver", 1, "Crossbeam", &receiver_clone_addr)
459 },
460 }
461 }
462}
463
464struct GenericReceiverVisitor<T> {
465 marker: PhantomData<T>,
466}
467impl<'de, T> serde::de::Visitor<'de> for GenericReceiverVisitor<T>
468where
469 T: for<'a> Deserialize<'a> + Serialize,
470{
471 type Value = GenericReceiver<T>;
472
473 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
474 formatter.write_str("a GenericReceiver variant")
475 }
476
477 fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
478 where
479 A: serde::de::EnumAccess<'de>,
480 {
481 #[derive(Deserialize)]
482 enum GenericReceiverVariantNames {
483 Ipc,
484 Crossbeam,
485 }
486
487 let (variant_name, variant_data): (GenericReceiverVariantNames, _) = data.variant()?;
488
489 match variant_name {
490 GenericReceiverVariantNames::Ipc => variant_data
491 .newtype_variant::<ipc_channel::ipc::IpcReceiver<T>>()
492 .map(|receiver| GenericReceiver(GenericReceiverVariants::Ipc(receiver))),
493 GenericReceiverVariantNames::Crossbeam => {
494 if use_ipc() {
495 return Err(serde::de::Error::custom(
496 "Crossbeam channel found in multiprocess mode!",
497 ));
498 }
499 let addr = variant_data.newtype_variant::<usize>()?;
500 let ptr = addr as *mut RoutedReceiver<T>;
501 #[expect(unsafe_code)]
504 let receiver = unsafe { Box::from_raw(ptr) };
505 Ok(GenericReceiver(GenericReceiverVariants::Crossbeam(
506 *receiver,
507 )))
508 },
509 }
510 }
511}
512
513impl<'a, T> Deserialize<'a> for GenericReceiver<T>
514where
515 T: for<'de> Deserialize<'de> + Serialize,
516{
517 fn deserialize<D>(d: D) -> Result<GenericReceiver<T>, D::Error>
518 where
519 D: Deserializer<'a>,
520 {
521 d.deserialize_enum(
522 "GenericReceiver",
523 &["Ipc", "Crossbeam"],
524 GenericReceiverVisitor {
525 marker: PhantomData,
526 },
527 )
528 }
529}
530
531fn new_generic_channel_crossbeam<T>() -> (GenericSender<T>, GenericReceiver<T>)
535where
536 T: Serialize + for<'de> serde::Deserialize<'de>,
537{
538 let (tx, rx) = crossbeam_channel::unbounded();
539 (
540 GenericSender(GenericSenderVariants::Crossbeam(tx)),
541 GenericReceiver(GenericReceiverVariants::Crossbeam(rx)),
542 )
543}
544
545fn new_generic_channel_ipc<T>() -> Result<(GenericSender<T>, GenericReceiver<T>), std::io::Error>
546where
547 T: Serialize + for<'de> serde::Deserialize<'de>,
548{
549 ipc_channel::ipc::channel().map(|(tx, rx)| {
550 (
551 GenericSender(GenericSenderVariants::Ipc(tx)),
552 GenericReceiver(GenericReceiverVariants::Ipc(rx)),
553 )
554 })
555}
556
557pub fn channel<T>() -> Option<(GenericSender<T>, GenericReceiver<T>)>
561where
562 T: for<'de> Deserialize<'de> + Serialize,
563{
564 if use_ipc() {
565 new_generic_channel_ipc().ok()
566 } else {
567 Some(new_generic_channel_crossbeam())
568 }
569}
570
571#[cfg(test)]
572mod single_process_channel_tests {
573 use crate::generic_channel::{new_generic_channel_crossbeam, new_generic_channel_ipc};
578
579 #[test]
580 fn generic_crossbeam_can_send() {
581 let (tx, rx) = new_generic_channel_crossbeam();
582 tx.send(5).expect("Send failed");
583 let val = rx.recv().expect("Receive failed");
584 assert_eq!(val, 5);
585 }
586
587 #[test]
588 fn generic_crossbeam_ping_pong() {
589 let (tx, rx) = new_generic_channel_crossbeam();
590 let (tx2, rx2) = new_generic_channel_crossbeam();
591
592 tx.send(tx2).expect("Send failed");
593
594 std::thread::scope(|s| {
595 s.spawn(move || {
596 let reply_sender = rx.recv().expect("Receive failed");
597 reply_sender.send(42).expect("Sending reply failed");
598 });
599 });
600 let res = rx2.recv().expect("Receive of reply failed");
601 assert_eq!(res, 42);
602 }
603
604 #[test]
605 fn generic_ipc_ping_pong() {
606 let (tx, rx) = new_generic_channel_ipc().unwrap();
607 let (tx2, rx2) = new_generic_channel_ipc().unwrap();
608
609 tx.send(tx2).expect("Send failed");
610
611 std::thread::scope(|s| {
612 s.spawn(move || {
613 let reply_sender = rx.recv().expect("Receive failed");
614 reply_sender.send(42).expect("Sending reply failed");
615 });
616 });
617 let res = rx2.recv().expect("Receive of reply failed");
618 assert_eq!(res, 42);
619 }
620
621 #[test]
622 fn send_crossbeam_sender_over_ipc_channel() {
623 let (tx, rx) = new_generic_channel_ipc().unwrap();
624 let (tx2, rx2) = new_generic_channel_crossbeam();
625
626 tx.send(tx2).expect("Send failed");
627
628 std::thread::scope(|s| {
629 s.spawn(move || {
630 let reply_sender = rx.recv().expect("Receive failed");
631 reply_sender.send(42).expect("Sending reply failed");
632 });
633 });
634 let res = rx2.recv().expect("Receive of reply failed");
635 assert_eq!(res, 42);
636 }
637
638 #[test]
639 fn send_generic_ipc_channel_over_crossbeam() {
640 let (tx, rx) = new_generic_channel_crossbeam();
641 let (tx2, rx2) = new_generic_channel_ipc().unwrap();
642
643 tx.send(tx2).expect("Send failed");
644
645 std::thread::scope(|s| {
646 s.spawn(move || {
647 let reply_sender = rx.recv().expect("Receive failed");
648 reply_sender.send(42).expect("Sending reply failed");
649 });
650 });
651 let res = rx2.recv().expect("Receive of reply failed");
652 assert_eq!(res, 42);
653 }
654
655 #[test]
656 fn send_crossbeam_receiver_over_ipc_channel() {
657 let (tx, rx) = new_generic_channel_ipc().unwrap();
658 let (tx2, rx2) = new_generic_channel_crossbeam();
659
660 tx.send(rx2).expect("Send failed");
661 tx2.send(42).expect("Send failed");
662
663 std::thread::scope(|s| {
664 s.spawn(move || {
665 let another_receiver = rx.recv().expect("Receive failed");
666 let res = another_receiver.recv().expect("Receive failed");
667 assert_eq!(res, 42);
668 });
669 });
670 }
671
672 #[test]
673 fn test_timeout_ipc() {
674 let (tx, rx) = new_generic_channel_ipc().unwrap();
675 let timeout_duration = std::time::Duration::from_secs(3);
676 std::thread::spawn(move || {
677 std::thread::sleep(timeout_duration - std::time::Duration::from_secs(1));
678 assert!(tx.send(()).is_ok());
679 });
680 let received = rx.try_recv_timeout(timeout_duration);
681 assert!(received.is_ok());
682 }
683
684 #[test]
685 fn test_timeout_crossbeam() {
686 let (tx, rx) = new_generic_channel_crossbeam();
687 let timeout_duration = std::time::Duration::from_secs(3);
688 std::thread::spawn(move || {
689 std::thread::sleep(timeout_duration - std::time::Duration::from_secs(1));
690 assert!(tx.send(()).is_ok());
691 });
692 let received = rx.try_recv_timeout(timeout_duration);
693 assert!(received.is_ok());
694 }
695}
696
697#[cfg(test)]
699mod generic_receiversets_tests {
700 use std::time::Duration;
701
702 use crate::generic_channel::generic_channelset::{
703 GenericSelectionResult, create_crossbeam_receiver_set, create_ipc_receiver_set,
704 };
705 use crate::generic_channel::{new_generic_channel_crossbeam, new_generic_channel_ipc};
706
707 #[test]
708 fn test_ipc_side1() {
709 let (snd1, recv1) = new_generic_channel_ipc().unwrap();
710 let (snd2, recv2) = new_generic_channel_ipc().unwrap();
711
712 let snd1_c = snd1.clone();
714 let snd2_c = snd2.clone();
715 let mut set = create_ipc_receiver_set();
716 let recv1_select_index = set.add(recv1);
717 let _recv2_select_index = set.add(recv2);
718
719 std::thread::spawn(move || {
720 snd1_c.send(10).unwrap();
721 });
722 std::thread::spawn(move || {
723 std::thread::sleep(Duration::from_secs(1));
724 let _ = snd2_c.send(20); });
726
727 let select_result = set.select();
728 let channel_result = select_result.first().unwrap();
729 assert_eq!(
730 *channel_result,
731 GenericSelectionResult::MessageReceived(recv1_select_index, 10)
732 );
733 }
734
735 #[test]
736 fn test_ipc_side2() {
737 let (snd1, recv1) = new_generic_channel_ipc().unwrap();
738 let (snd2, recv2) = new_generic_channel_ipc().unwrap();
739
740 let snd1_c = snd1.clone();
742 let snd2_c = snd2.clone();
743 let mut set = create_ipc_receiver_set();
744 let _recv1_select_index = set.add(recv1);
745 let recv2_select_index = set.add(recv2);
746
747 std::thread::spawn(move || {
748 std::thread::sleep(Duration::from_secs(1));
749 let _ = snd1_c.send(10);
750 });
751 std::thread::spawn(move || {
752 snd2_c.send(20).unwrap();
753 });
754
755 let select_result = set.select();
756 let channel_result = select_result.first().unwrap();
757 assert_eq!(
758 *channel_result,
759 GenericSelectionResult::MessageReceived(recv2_select_index, 20)
760 );
761 }
762
763 #[test]
764 fn test_crossbeam_side1() {
765 let (snd1, recv1) = new_generic_channel_crossbeam();
766 let (snd2, recv2) = new_generic_channel_crossbeam();
767
768 let snd1_c = snd1.clone();
770 let snd2_c = snd2.clone();
771 let mut set = create_crossbeam_receiver_set();
772 let recv1_select_index = set.add(recv1);
773 let _recv2_select_index = set.add(recv2);
774
775 std::thread::spawn(move || {
776 snd1_c.send(10).unwrap();
777 });
778 std::thread::spawn(move || {
779 std::thread::sleep(Duration::from_secs(2));
780 let _ = snd2_c.send(20);
781 });
782
783 let select_result = set.select();
784 let channel_result = select_result.first().unwrap();
785 assert_eq!(
786 *channel_result,
787 GenericSelectionResult::MessageReceived(recv1_select_index, 10)
788 );
789 }
790
791 #[test]
792 fn test_crossbeam_side2() {
793 let (snd1, recv1) = new_generic_channel_crossbeam();
794 let (snd2, recv2) = new_generic_channel_crossbeam();
795
796 let snd1_c = snd1.clone();
798 let snd2_c = snd2.clone();
799 let mut set = create_crossbeam_receiver_set();
800 let _recv1_select_index = set.add(recv1);
801 let recv2_select_index = set.add(recv2);
802
803 std::thread::spawn(move || {
804 std::thread::sleep(Duration::from_secs(2));
805 let _ = snd1_c.send(10);
806 });
807 std::thread::spawn(move || {
808 snd2_c.send(20).unwrap();
809 });
810
811 let select_result = set.select();
812 let channel_result = select_result.first().unwrap();
813 assert_eq!(
814 *channel_result,
815 GenericSelectionResult::MessageReceived(recv2_select_index, 20)
816 );
817 }
818
819 #[test]
820 fn test_ipc_no_crash_on_disconnect() {
821 let (snd1, recv1) = new_generic_channel_ipc().unwrap();
824 let (snd2, recv2) = new_generic_channel_ipc().unwrap();
825
826 let snd1_c = snd1.clone();
828 let mut set = create_ipc_receiver_set();
829 let _recv1_select_index = set.add(recv1);
830 let recv2_select_index = set.add(recv2);
831
832 std::thread::spawn(move || {
833 std::thread::sleep(Duration::from_secs(2));
834 let _ = snd1_c.send(10);
835 });
836 std::thread::spawn(move || {
837 snd2.send(20).unwrap();
838 });
839 std::thread::sleep(Duration::from_secs(1));
840 let select_result = set.select();
841 let channel_result = select_result.first().unwrap();
842 assert_eq!(
843 *channel_result,
844 GenericSelectionResult::MessageReceived(recv2_select_index, 20)
845 );
846 }
847
848 #[test]
849 fn test_crossbeam_no_crash_on_disconnect() {
850 let (snd1, recv1) = new_generic_channel_crossbeam();
852 let (snd2, recv2) = new_generic_channel_crossbeam();
853
854 let snd1_c = snd1.clone();
856 let mut set = create_crossbeam_receiver_set();
857 let _recv1_select_index = set.add(recv1);
858 let recv2_select_index = set.add(recv2);
859
860 std::thread::spawn(move || {
861 std::thread::sleep(Duration::from_secs(2));
862 let _ = snd1_c.send(10);
863 });
864 std::thread::spawn(move || {
865 snd2.send(20).unwrap();
866 });
867 std::thread::sleep(Duration::from_secs(1));
868 let select_result = set.select();
869 let channel_result = select_result.first().unwrap();
870 assert_eq!(
871 *channel_result,
872 GenericSelectionResult::MessageReceived(recv2_select_index, 20)
873 );
874 }
875
876 #[test]
877 fn test_ipc_disconnect_correct_message() {
878 let (snd1, recv1) = new_generic_channel_ipc().unwrap();
880 let (snd2, recv2) = new_generic_channel_ipc().unwrap();
881
882 let snd1_c = snd1.clone();
884 let mut set = create_ipc_receiver_set();
885 let _recv1_select_index = set.add(recv1);
886 let recv2_select_index = set.add(recv2);
887
888 std::thread::spawn(move || {
889 std::thread::sleep(Duration::from_secs(2));
890 let _ = snd1_c.send(10);
891 });
892 std::thread::spawn(move || {
893 drop(snd2);
894 });
895
896 let select_result = set.select();
897 let channel_result = select_result.first().unwrap();
898 assert_eq!(
899 *channel_result,
900 GenericSelectionResult::ChannelClosed(recv2_select_index)
901 );
902 }
903
904 #[test]
905 fn test_crossbeam_disconnect_correct_messaget() {
906 let (snd1, recv1) = new_generic_channel_crossbeam();
907 let (snd2, recv2) = new_generic_channel_crossbeam();
908
909 let snd1_c = snd1.clone();
911 let mut set = create_crossbeam_receiver_set();
912 let _recv1_select_index = set.add(recv1);
913 let recv2_select_index = set.add(recv2);
914
915 std::thread::spawn(move || {
916 std::thread::sleep(Duration::from_secs(2));
917 let _ = snd1_c.send(10);
918 });
919 std::thread::spawn(move || {
920 drop(snd2);
921 });
922
923 let select_result = set.select();
924 let channel_result = select_result.first().unwrap();
925 assert_eq!(
926 *channel_result,
927 GenericSelectionResult::ChannelClosed(recv2_select_index)
928 );
929 }
930}