1use std::fmt;
8use std::fmt::Display;
9use std::marker::PhantomData;
10use std::time::Duration;
11
12use crossbeam_channel::RecvTimeoutError;
13use ipc_channel::ipc::IpcError;
14use ipc_channel::router::ROUTER;
15use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
16use malloc_size_of_derive::MallocSizeOf;
17use serde::de::VariantAccess;
18use serde::{Deserialize, Deserializer, Serialize, Serializer};
19use servo_config::opts;
20
21mod callback;
22pub use callback::GenericCallback;
23mod oneshot;
24pub use ipc_channel::ipc::IpcSharedMemory as GenericSharedMemory;
27pub use oneshot::{GenericOneshotReceiver, GenericOneshotSender, oneshot};
28mod generic_channelset;
29pub use generic_channelset::{GenericReceiverSet, GenericSelectionResult};
30
31pub trait GenericSend<T>
34where
35 T: serde::Serialize + for<'de> serde::Deserialize<'de>,
36{
37 fn send(&self, _: T) -> SendResult;
39 fn sender(&self) -> GenericSender<T>;
41}
42
43pub struct GenericSender<T: Serialize>(GenericSenderVariants<T>);
47
48enum GenericSenderVariants<T: Serialize> {
53 Ipc(ipc_channel::ipc::IpcSender<T>),
54 Crossbeam(crossbeam_channel::Sender<Result<T, ipc_channel::Error>>),
62}
63
64fn serialize_generic_sender_variants<T: Serialize, S: Serializer>(
65 value: &GenericSenderVariants<T>,
66 s: S,
67) -> Result<S::Ok, S::Error> {
68 match value {
69 GenericSenderVariants::Ipc(sender) => {
70 s.serialize_newtype_variant("GenericSender", 0, "Ipc", sender)
71 },
72 GenericSenderVariants::Crossbeam(sender) => {
83 if opts::get().multiprocess {
84 return Err(serde::ser::Error::custom(
85 "Crossbeam channel found in multiprocess mode!",
86 ));
87 } let sender_clone_addr = Box::leak(Box::new(sender.clone())) as *mut _ as usize;
90 s.serialize_newtype_variant("GenericSender", 1, "Crossbeam", &sender_clone_addr)
91 },
92 }
93}
94
95impl<T: Serialize> Serialize for GenericSender<T> {
96 fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
97 serialize_generic_sender_variants(&self.0, s)
98 }
99}
100
101struct GenericSenderVisitor<T> {
102 marker: PhantomData<T>,
103}
104
105impl<'de, T: Serialize + Deserialize<'de>> serde::de::Visitor<'de> for GenericSenderVisitor<T> {
106 type Value = GenericSenderVariants<T>;
107
108 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
109 formatter.write_str("a GenericSender variant")
110 }
111
112 fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
113 where
114 A: serde::de::EnumAccess<'de>,
115 {
116 #[derive(Deserialize)]
117 enum GenericSenderVariantNames {
118 Ipc,
119 Crossbeam,
120 }
121
122 let (variant_name, variant_data): (GenericSenderVariantNames, _) = data.variant()?;
123
124 match variant_name {
125 GenericSenderVariantNames::Ipc => variant_data
126 .newtype_variant::<ipc_channel::ipc::IpcSender<T>>()
127 .map(|sender| GenericSenderVariants::Ipc(sender)),
128 GenericSenderVariantNames::Crossbeam => {
129 if opts::get().multiprocess {
130 return Err(serde::de::Error::custom(
131 "Crossbeam channel found in multiprocess mode!",
132 ));
133 }
134 let addr = variant_data.newtype_variant::<usize>()?;
135 let ptr = addr as *mut crossbeam_channel::Sender<Result<T, ipc_channel::Error>>;
136 #[expect(unsafe_code)]
139 let sender = unsafe { Box::from_raw(ptr) };
140 Ok(GenericSenderVariants::Crossbeam(*sender))
141 },
142 }
143 }
144}
145
146impl<'a, T: Serialize + Deserialize<'a>> Deserialize<'a> for GenericSender<T> {
147 fn deserialize<D>(d: D) -> Result<GenericSender<T>, D::Error>
148 where
149 D: Deserializer<'a>,
150 {
151 d.deserialize_enum(
152 "GenericSender",
153 &["Ipc", "Crossbeam"],
154 GenericSenderVisitor {
155 marker: PhantomData,
156 },
157 )
158 .map(|variant| GenericSender(variant))
159 }
160}
161
162impl<T> Clone for GenericSender<T>
163where
164 T: Serialize,
165{
166 fn clone(&self) -> Self {
167 match self.0 {
168 GenericSenderVariants::Ipc(ref chan) => {
169 GenericSender(GenericSenderVariants::Ipc(chan.clone()))
170 },
171 GenericSenderVariants::Crossbeam(ref chan) => {
172 GenericSender(GenericSenderVariants::Crossbeam(chan.clone()))
173 },
174 }
175 }
176}
177
178impl<T: Serialize> fmt::Debug for GenericSender<T> {
179 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
180 write!(f, "Sender(..)")
181 }
182}
183
184impl<T: Serialize> GenericSender<T> {
185 #[inline]
186 pub fn send(&self, msg: T) -> SendResult {
187 match self.0 {
188 GenericSenderVariants::Ipc(ref sender) => sender
189 .send(msg)
190 .map_err(|e| SendError::SerializationError(format!("{e}"))),
191 GenericSenderVariants::Crossbeam(ref sender) => {
192 sender.send(Ok(msg)).map_err(|_| SendError::Disconnected)
193 },
194 }
195 }
196}
197
198impl<T: Serialize> MallocSizeOf for GenericSender<T> {
199 fn size_of(&self, _ops: &mut MallocSizeOfOps) -> usize {
200 0
201 }
202}
203
204#[derive(Debug)]
205pub enum SendError {
206 Disconnected,
207 SerializationError(String),
208}
209
210impl Display for SendError {
211 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
212 write!(f, "{self:?}")
213 }
214}
215
216pub type SendResult = Result<(), SendError>;
217
218#[derive(Debug)]
219pub enum ReceiveError {
220 DeserializationFailed(String),
221 Io(std::io::Error),
223 Disconnected,
225}
226
227impl From<IpcError> for ReceiveError {
228 fn from(e: IpcError) -> Self {
229 match e {
230 IpcError::Disconnected => ReceiveError::Disconnected,
231 IpcError::Bincode(reason) => ReceiveError::DeserializationFailed(reason.to_string()),
232 IpcError::Io(reason) => ReceiveError::Io(reason),
233 }
234 }
235}
236
237impl From<crossbeam_channel::RecvError> for ReceiveError {
238 fn from(_: crossbeam_channel::RecvError) -> Self {
239 ReceiveError::Disconnected
240 }
241}
242
243impl fmt::Display for ReceiveError {
244 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
245 match *self {
246 ReceiveError::DeserializationFailed(ref error) => {
247 write!(fmt, "deserialization error: {error}")
248 },
249 ReceiveError::Io(ref error) => write!(fmt, "io error: {error}"),
250 ReceiveError::Disconnected => write!(fmt, "disconnected"),
251 }
252 }
253}
254impl From<std::io::Error> for ReceiveError {
255 fn from(value: std::io::Error) -> Self {
256 ReceiveError::Io(value)
257 }
258}
259
260pub enum TryReceiveError {
261 Empty,
262 ReceiveError(ReceiveError),
263}
264
265impl From<crossbeam_channel::RecvTimeoutError> for TryReceiveError {
266 fn from(value: crossbeam_channel::RecvTimeoutError) -> Self {
267 match value {
268 RecvTimeoutError::Timeout => TryReceiveError::Empty,
269 RecvTimeoutError::Disconnected => {
270 TryReceiveError::ReceiveError(ReceiveError::Disconnected)
271 },
272 }
273 }
274}
275
276impl From<ipc_channel::ipc::TryRecvError> for TryReceiveError {
277 fn from(e: ipc_channel::ipc::TryRecvError) -> Self {
278 match e {
279 ipc_channel::ipc::TryRecvError::Empty => TryReceiveError::Empty,
280 ipc_channel::ipc::TryRecvError::IpcError(inner) => {
281 TryReceiveError::ReceiveError(inner.into())
282 },
283 }
284 }
285}
286
287impl From<crossbeam_channel::TryRecvError> for TryReceiveError {
288 fn from(e: crossbeam_channel::TryRecvError) -> Self {
289 match e {
290 crossbeam_channel::TryRecvError::Empty => TryReceiveError::Empty,
291 crossbeam_channel::TryRecvError::Disconnected => {
292 TryReceiveError::ReceiveError(ReceiveError::Disconnected)
293 },
294 }
295 }
296}
297
298pub type RoutedReceiver<T> = crossbeam_channel::Receiver<Result<T, ipc_channel::Error>>;
299pub type ReceiveResult<T> = Result<T, ReceiveError>;
300pub type TryReceiveResult<T> = Result<T, TryReceiveError>;
301pub type RoutedReceiverReceiveResult<T> =
302 Result<Result<T, ipc_channel::Error>, crossbeam_channel::RecvError>;
303
304pub fn to_receive_result<T>(receive_result: RoutedReceiverReceiveResult<T>) -> ReceiveResult<T> {
305 match receive_result {
306 Ok(Ok(msg)) => Ok(msg),
307 Err(_crossbeam_recv_err) => Err(ReceiveError::Disconnected),
308 Ok(Err(ipc_err)) => Err(ReceiveError::DeserializationFailed(ipc_err.to_string())),
309 }
310}
311
312#[derive(MallocSizeOf)]
313pub struct GenericReceiver<T>(GenericReceiverVariants<T>)
314where
315 T: for<'de> Deserialize<'de> + Serialize;
316
317#[derive(MallocSizeOf)]
318enum GenericReceiverVariants<T>
319where
320 T: for<'de> Deserialize<'de> + Serialize,
321{
322 Ipc(ipc_channel::ipc::IpcReceiver<T>),
323 Crossbeam(RoutedReceiver<T>),
324}
325
326impl<T> GenericReceiver<T>
327where
328 T: for<'de> Deserialize<'de> + Serialize,
329{
330 #[inline]
331 pub fn recv(&self) -> ReceiveResult<T> {
332 match self.0 {
333 GenericReceiverVariants::Ipc(ref receiver) => Ok(receiver.recv()?),
334 GenericReceiverVariants::Crossbeam(ref receiver) => {
335 let msg = receiver.recv()?;
337 Ok(msg.expect("Infallible"))
340 },
341 }
342 }
343
344 #[inline]
345 pub fn try_recv(&self) -> TryReceiveResult<T> {
346 match self.0 {
347 GenericReceiverVariants::Ipc(ref receiver) => Ok(receiver.try_recv()?),
348 GenericReceiverVariants::Crossbeam(ref receiver) => {
349 let msg = receiver.try_recv()?;
350 Ok(msg.expect("Infallible"))
351 },
352 }
353 }
354
355 #[inline]
357 pub fn try_recv_timeout(&self, timeout: Duration) -> Result<T, TryReceiveError> {
358 match self.0 {
359 GenericReceiverVariants::Ipc(ref ipc_receiver) => {
360 ipc_receiver.try_recv_timeout(timeout).map_err(|e| e.into())
361 },
362 GenericReceiverVariants::Crossbeam(ref receiver) => {
363 match receiver.recv_timeout(timeout) {
364 Ok(Ok(value)) => Ok(value),
365 Ok(Err(_)) => unreachable!("Infallable"),
366 Err(RecvTimeoutError::Disconnected) => {
367 Err(TryReceiveError::ReceiveError(ReceiveError::Disconnected))
368 },
369 Err(RecvTimeoutError::Timeout) => Err(TryReceiveError::Empty),
370 }
371 },
372 }
373 }
374
375 #[inline]
380 pub fn route_preserving_errors(self) -> RoutedReceiver<T>
381 where
382 T: Send + 'static,
383 {
384 match self.0 {
385 GenericReceiverVariants::Ipc(ipc_receiver) => {
386 let (crossbeam_sender, crossbeam_receiver) = crossbeam_channel::unbounded();
387 let crossbeam_sender_clone = crossbeam_sender.clone();
388 ROUTER.add_typed_route(
389 ipc_receiver,
390 Box::new(move |message| {
391 let _ = crossbeam_sender_clone.send(message);
392 }),
393 );
394 crossbeam_receiver
395 },
396 GenericReceiverVariants::Crossbeam(receiver) => receiver,
397 }
398 }
399}
400
401impl<T> Serialize for GenericReceiver<T>
402where
403 T: for<'de> Deserialize<'de> + Serialize,
404{
405 fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
406 match &self.0 {
407 GenericReceiverVariants::Ipc(receiver) => {
408 s.serialize_newtype_variant("GenericReceiver", 0, "Ipc", receiver)
409 },
410 GenericReceiverVariants::Crossbeam(receiver) => {
411 if opts::get().multiprocess {
412 return Err(serde::ser::Error::custom(
413 "Crossbeam channel found in multiprocess mode!",
414 ));
415 } let receiver_clone_addr = Box::leak(Box::new(receiver.clone())) as *mut _ as usize;
418 s.serialize_newtype_variant("GenericReceiver", 1, "Crossbeam", &receiver_clone_addr)
419 },
420 }
421 }
422}
423
424struct GenericReceiverVisitor<T> {
425 marker: PhantomData<T>,
426}
427impl<'de, T> serde::de::Visitor<'de> for GenericReceiverVisitor<T>
428where
429 T: for<'a> Deserialize<'a> + Serialize,
430{
431 type Value = GenericReceiver<T>;
432
433 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
434 formatter.write_str("a GenericReceiver variant")
435 }
436
437 fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
438 where
439 A: serde::de::EnumAccess<'de>,
440 {
441 #[derive(Deserialize)]
442 enum GenericReceiverVariantNames {
443 Ipc,
444 Crossbeam,
445 }
446
447 let (variant_name, variant_data): (GenericReceiverVariantNames, _) = data.variant()?;
448
449 match variant_name {
450 GenericReceiverVariantNames::Ipc => variant_data
451 .newtype_variant::<ipc_channel::ipc::IpcReceiver<T>>()
452 .map(|receiver| GenericReceiver(GenericReceiverVariants::Ipc(receiver))),
453 GenericReceiverVariantNames::Crossbeam => {
454 if opts::get().multiprocess {
455 return Err(serde::de::Error::custom(
456 "Crossbeam channel found in multiprocess mode!",
457 ));
458 }
459 let addr = variant_data.newtype_variant::<usize>()?;
460 let ptr = addr as *mut RoutedReceiver<T>;
461 #[expect(unsafe_code)]
464 let receiver = unsafe { Box::from_raw(ptr) };
465 Ok(GenericReceiver(GenericReceiverVariants::Crossbeam(
466 *receiver,
467 )))
468 },
469 }
470 }
471}
472
473impl<'a, T> Deserialize<'a> for GenericReceiver<T>
474where
475 T: for<'de> Deserialize<'de> + Serialize,
476{
477 fn deserialize<D>(d: D) -> Result<GenericReceiver<T>, D::Error>
478 where
479 D: Deserializer<'a>,
480 {
481 d.deserialize_enum(
482 "GenericReceiver",
483 &["Ipc", "Crossbeam"],
484 GenericReceiverVisitor {
485 marker: PhantomData,
486 },
487 )
488 }
489}
490
491fn new_generic_channel_crossbeam<T>() -> (GenericSender<T>, GenericReceiver<T>)
495where
496 T: Serialize + for<'de> serde::Deserialize<'de>,
497{
498 let (tx, rx) = crossbeam_channel::unbounded();
499 (
500 GenericSender(GenericSenderVariants::Crossbeam(tx)),
501 GenericReceiver(GenericReceiverVariants::Crossbeam(rx)),
502 )
503}
504
505fn new_generic_channel_ipc<T>() -> Result<(GenericSender<T>, GenericReceiver<T>), std::io::Error>
506where
507 T: Serialize + for<'de> serde::Deserialize<'de>,
508{
509 ipc_channel::ipc::channel().map(|(tx, rx)| {
510 (
511 GenericSender(GenericSenderVariants::Ipc(tx)),
512 GenericReceiver(GenericReceiverVariants::Ipc(rx)),
513 )
514 })
515}
516
517pub fn channel<T>() -> Option<(GenericSender<T>, GenericReceiver<T>)>
521where
522 T: for<'de> Deserialize<'de> + Serialize,
523{
524 if servo_config::opts::get().multiprocess || servo_config::opts::get().force_ipc {
525 new_generic_channel_ipc().ok()
526 } else {
527 Some(new_generic_channel_crossbeam())
528 }
529}
530
531#[cfg(test)]
532mod single_process_channel_tests {
533 use crate::generic_channel::{new_generic_channel_crossbeam, new_generic_channel_ipc};
538
539 #[test]
540 fn generic_crossbeam_can_send() {
541 let (tx, rx) = new_generic_channel_crossbeam();
542 tx.send(5).expect("Send failed");
543 let val = rx.recv().expect("Receive failed");
544 assert_eq!(val, 5);
545 }
546
547 #[test]
548 fn generic_crossbeam_ping_pong() {
549 let (tx, rx) = new_generic_channel_crossbeam();
550 let (tx2, rx2) = new_generic_channel_crossbeam();
551
552 tx.send(tx2).expect("Send failed");
553
554 std::thread::scope(|s| {
555 s.spawn(move || {
556 let reply_sender = rx.recv().expect("Receive failed");
557 reply_sender.send(42).expect("Sending reply failed");
558 });
559 });
560 let res = rx2.recv().expect("Receive of reply failed");
561 assert_eq!(res, 42);
562 }
563
564 #[test]
565 fn generic_ipc_ping_pong() {
566 let (tx, rx) = new_generic_channel_ipc().unwrap();
567 let (tx2, rx2) = new_generic_channel_ipc().unwrap();
568
569 tx.send(tx2).expect("Send failed");
570
571 std::thread::scope(|s| {
572 s.spawn(move || {
573 let reply_sender = rx.recv().expect("Receive failed");
574 reply_sender.send(42).expect("Sending reply failed");
575 });
576 });
577 let res = rx2.recv().expect("Receive of reply failed");
578 assert_eq!(res, 42);
579 }
580
581 #[test]
582 fn send_crossbeam_sender_over_ipc_channel() {
583 let (tx, rx) = new_generic_channel_ipc().unwrap();
584 let (tx2, rx2) = new_generic_channel_crossbeam();
585
586 tx.send(tx2).expect("Send failed");
587
588 std::thread::scope(|s| {
589 s.spawn(move || {
590 let reply_sender = rx.recv().expect("Receive failed");
591 reply_sender.send(42).expect("Sending reply failed");
592 });
593 });
594 let res = rx2.recv().expect("Receive of reply failed");
595 assert_eq!(res, 42);
596 }
597
598 #[test]
599 fn send_generic_ipc_channel_over_crossbeam() {
600 let (tx, rx) = new_generic_channel_crossbeam();
601 let (tx2, rx2) = new_generic_channel_ipc().unwrap();
602
603 tx.send(tx2).expect("Send failed");
604
605 std::thread::scope(|s| {
606 s.spawn(move || {
607 let reply_sender = rx.recv().expect("Receive failed");
608 reply_sender.send(42).expect("Sending reply failed");
609 });
610 });
611 let res = rx2.recv().expect("Receive of reply failed");
612 assert_eq!(res, 42);
613 }
614
615 #[test]
616 fn send_crossbeam_receiver_over_ipc_channel() {
617 let (tx, rx) = new_generic_channel_ipc().unwrap();
618 let (tx2, rx2) = new_generic_channel_crossbeam();
619
620 tx.send(rx2).expect("Send failed");
621 tx2.send(42).expect("Send failed");
622
623 std::thread::scope(|s| {
624 s.spawn(move || {
625 let another_receiver = rx.recv().expect("Receive failed");
626 let res = another_receiver.recv().expect("Receive failed");
627 assert_eq!(res, 42);
628 });
629 });
630 }
631
632 #[test]
633 fn test_timeout_ipc() {
634 let (tx, rx) = new_generic_channel_ipc().unwrap();
635 let timeout_duration = std::time::Duration::from_secs(3);
636 std::thread::spawn(move || {
637 std::thread::sleep(timeout_duration - std::time::Duration::from_secs(1));
638 assert!(tx.send(()).is_ok());
639 });
640 let received = rx.try_recv_timeout(timeout_duration);
641 assert!(received.is_ok());
642 }
643
644 #[test]
645 fn test_timeout_crossbeam() {
646 let (tx, rx) = new_generic_channel_crossbeam();
647 let timeout_duration = std::time::Duration::from_secs(3);
648 std::thread::spawn(move || {
649 std::thread::sleep(timeout_duration - std::time::Duration::from_secs(1));
650 assert!(tx.send(()).is_ok());
651 });
652 let received = rx.try_recv_timeout(timeout_duration);
653 assert!(received.is_ok());
654 }
655}
656
657#[cfg(test)]
659mod generic_receiversets_tests {
660 use std::time::Duration;
661
662 use crate::generic_channel::generic_channelset::{
663 GenericSelectionResult, create_crossbeam_receiver_set, create_ipc_receiver_set,
664 };
665 use crate::generic_channel::{new_generic_channel_crossbeam, new_generic_channel_ipc};
666
667 #[test]
668 fn test_ipc_side1() {
669 let (snd1, recv1) = new_generic_channel_ipc().unwrap();
670 let (snd2, recv2) = new_generic_channel_ipc().unwrap();
671
672 let snd1_c = snd1.clone();
674 let snd2_c = snd2.clone();
675 let mut set = create_ipc_receiver_set();
676 let recv1_select_index = set.add(recv1);
677 let _recv2_select_index = set.add(recv2);
678
679 std::thread::spawn(move || {
680 snd1_c.send(10).unwrap();
681 });
682 std::thread::spawn(move || {
683 std::thread::sleep(Duration::from_secs(1));
684 let _ = snd2_c.send(20); });
686
687 let select_result = set.select();
688 let channel_result = select_result.first().unwrap();
689 assert_eq!(
690 *channel_result,
691 GenericSelectionResult::MessageReceived(recv1_select_index, 10)
692 );
693 }
694
695 #[test]
696 fn test_ipc_side2() {
697 let (snd1, recv1) = new_generic_channel_ipc().unwrap();
698 let (snd2, recv2) = new_generic_channel_ipc().unwrap();
699
700 let snd1_c = snd1.clone();
702 let snd2_c = snd2.clone();
703 let mut set = create_ipc_receiver_set();
704 let _recv1_select_index = set.add(recv1);
705 let recv2_select_index = set.add(recv2);
706
707 std::thread::spawn(move || {
708 std::thread::sleep(Duration::from_secs(1));
709 let _ = snd1_c.send(10);
710 });
711 std::thread::spawn(move || {
712 snd2_c.send(20).unwrap();
713 });
714
715 let select_result = set.select();
716 let channel_result = select_result.first().unwrap();
717 assert_eq!(
718 *channel_result,
719 GenericSelectionResult::MessageReceived(recv2_select_index, 20)
720 );
721 }
722
723 #[test]
724 fn test_crossbeam_side1() {
725 let (snd1, recv1) = new_generic_channel_crossbeam();
726 let (snd2, recv2) = new_generic_channel_crossbeam();
727
728 let snd1_c = snd1.clone();
730 let snd2_c = snd2.clone();
731 let mut set = create_crossbeam_receiver_set();
732 let recv1_select_index = set.add(recv1);
733 let _recv2_select_index = set.add(recv2);
734
735 std::thread::spawn(move || {
736 snd1_c.send(10).unwrap();
737 });
738 std::thread::spawn(move || {
739 std::thread::sleep(Duration::from_secs(2));
740 let _ = snd2_c.send(20);
741 });
742
743 let select_result = set.select();
744 let channel_result = select_result.first().unwrap();
745 assert_eq!(
746 *channel_result,
747 GenericSelectionResult::MessageReceived(recv1_select_index, 10)
748 );
749 }
750
751 #[test]
752 fn test_crossbeam_side2() {
753 let (snd1, recv1) = new_generic_channel_crossbeam();
754 let (snd2, recv2) = new_generic_channel_crossbeam();
755
756 let snd1_c = snd1.clone();
758 let snd2_c = snd2.clone();
759 let mut set = create_crossbeam_receiver_set();
760 let _recv1_select_index = set.add(recv1);
761 let recv2_select_index = set.add(recv2);
762
763 std::thread::spawn(move || {
764 std::thread::sleep(Duration::from_secs(2));
765 let _ = snd1_c.send(10);
766 });
767 std::thread::spawn(move || {
768 snd2_c.send(20).unwrap();
769 });
770
771 let select_result = set.select();
772 let channel_result = select_result.first().unwrap();
773 assert_eq!(
774 *channel_result,
775 GenericSelectionResult::MessageReceived(recv2_select_index, 20)
776 );
777 }
778
779 #[test]
780 fn test_ipc_no_crash_on_disconnect() {
781 let (snd1, recv1) = new_generic_channel_ipc().unwrap();
784 let (snd2, recv2) = new_generic_channel_ipc().unwrap();
785
786 let snd1_c = snd1.clone();
788 let mut set = create_ipc_receiver_set();
789 let _recv1_select_index = set.add(recv1);
790 let recv2_select_index = set.add(recv2);
791
792 std::thread::spawn(move || {
793 std::thread::sleep(Duration::from_secs(2));
794 let _ = snd1_c.send(10);
795 });
796 std::thread::spawn(move || {
797 snd2.send(20).unwrap();
798 });
799 std::thread::sleep(Duration::from_secs(1));
800 let select_result = set.select();
801 let channel_result = select_result.first().unwrap();
802 assert_eq!(
803 *channel_result,
804 GenericSelectionResult::MessageReceived(recv2_select_index, 20)
805 );
806 }
807
808 #[test]
809 fn test_crossbeam_no_crash_on_disconnect() {
810 let (snd1, recv1) = new_generic_channel_crossbeam();
812 let (snd2, recv2) = new_generic_channel_crossbeam();
813
814 let snd1_c = snd1.clone();
816 let mut set = create_crossbeam_receiver_set();
817 let _recv1_select_index = set.add(recv1);
818 let recv2_select_index = set.add(recv2);
819
820 std::thread::spawn(move || {
821 std::thread::sleep(Duration::from_secs(2));
822 let _ = snd1_c.send(10);
823 });
824 std::thread::spawn(move || {
825 snd2.send(20).unwrap();
826 });
827 std::thread::sleep(Duration::from_secs(1));
828 let select_result = set.select();
829 let channel_result = select_result.first().unwrap();
830 assert_eq!(
831 *channel_result,
832 GenericSelectionResult::MessageReceived(recv2_select_index, 20)
833 );
834 }
835
836 #[test]
837 fn test_ipc_disconnect_correct_message() {
838 let (snd1, recv1) = new_generic_channel_ipc().unwrap();
840 let (snd2, recv2) = new_generic_channel_ipc().unwrap();
841
842 let snd1_c = snd1.clone();
844 let mut set = create_ipc_receiver_set();
845 let _recv1_select_index = set.add(recv1);
846 let recv2_select_index = set.add(recv2);
847
848 std::thread::spawn(move || {
849 std::thread::sleep(Duration::from_secs(2));
850 let _ = snd1_c.send(10);
851 });
852 std::thread::spawn(move || {
853 drop(snd2);
854 });
855
856 let select_result = set.select();
857 let channel_result = select_result.first().unwrap();
858 assert_eq!(
859 *channel_result,
860 GenericSelectionResult::ChannelClosed(recv2_select_index)
861 );
862 }
863
864 #[test]
865 fn test_crossbeam_disconnect_correct_messaget() {
866 let (snd1, recv1) = new_generic_channel_crossbeam();
867 let (snd2, recv2) = new_generic_channel_crossbeam();
868
869 let snd1_c = snd1.clone();
871 let mut set = create_crossbeam_receiver_set();
872 let _recv1_select_index = set.add(recv1);
873 let recv2_select_index = set.add(recv2);
874
875 std::thread::spawn(move || {
876 std::thread::sleep(Duration::from_secs(2));
877 let _ = snd1_c.send(10);
878 });
879 std::thread::spawn(move || {
880 drop(snd2);
881 });
882
883 let select_result = set.select();
884 let channel_result = select_result.first().unwrap();
885 assert_eq!(
886 *channel_result,
887 GenericSelectionResult::ChannelClosed(recv2_select_index)
888 );
889 }
890}