1use std::fmt;
8use std::fmt::Display;
9use std::marker::PhantomData;
10use std::time::Duration;
11
12use crossbeam_channel::RecvTimeoutError;
13use ipc_channel::ipc::IpcError;
14use ipc_channel::router::ROUTER;
15use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
16use malloc_size_of_derive::MallocSizeOf;
17use serde::de::VariantAccess;
18use serde::{Deserialize, Deserializer, Serialize, Serializer};
19use servo_config::opts;
20
21mod callback;
22pub use callback::GenericCallback;
23mod oneshot;
24pub use ipc_channel::ipc::IpcSharedMemory as GenericSharedMemory;
27pub use oneshot::{GenericOneshotReceiver, GenericOneshotSender, oneshot};
28mod generic_channelset;
29pub use generic_channelset::{GenericReceiverSet, GenericSelectionResult};
30
31pub trait GenericSend<T>
34where
35 T: serde::Serialize + for<'de> serde::Deserialize<'de>,
36{
37 fn send(&self, _: T) -> SendResult;
39 fn sender(&self) -> GenericSender<T>;
41}
42
43pub struct GenericSender<T: Serialize>(GenericSenderVariants<T>);
47
48enum GenericSenderVariants<T: Serialize> {
53 Ipc(ipc_channel::ipc::IpcSender<T>),
54 Crossbeam(crossbeam_channel::Sender<Result<T, ipc_channel::Error>>),
62}
63
64fn serialize_generic_sender_variants<T: Serialize, S: Serializer>(
65 value: &GenericSenderVariants<T>,
66 s: S,
67) -> Result<S::Ok, S::Error> {
68 match value {
69 GenericSenderVariants::Ipc(sender) => {
70 s.serialize_newtype_variant("GenericSender", 0, "Ipc", sender)
71 },
72 GenericSenderVariants::Crossbeam(sender) => {
83 if opts::get().multiprocess {
84 return Err(serde::ser::Error::custom(
85 "Crossbeam channel found in multiprocess mode!",
86 ));
87 } let sender_clone_addr = Box::leak(Box::new(sender.clone())) as *mut _ as usize;
90 s.serialize_newtype_variant("GenericSender", 1, "Crossbeam", &sender_clone_addr)
91 },
92 }
93}
94
95impl<T: Serialize> Serialize for GenericSender<T> {
96 fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
97 serialize_generic_sender_variants(&self.0, s)
98 }
99}
100
101struct GenericSenderVisitor<T> {
102 marker: PhantomData<T>,
103}
104
105impl<'de, T: Serialize + Deserialize<'de>> serde::de::Visitor<'de> for GenericSenderVisitor<T> {
106 type Value = GenericSenderVariants<T>;
107
108 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
109 formatter.write_str("a GenericSender variant")
110 }
111
112 fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
113 where
114 A: serde::de::EnumAccess<'de>,
115 {
116 #[derive(Deserialize)]
117 enum GenericSenderVariantNames {
118 Ipc,
119 Crossbeam,
120 }
121
122 let (variant_name, variant_data): (GenericSenderVariantNames, _) = data.variant()?;
123
124 match variant_name {
125 GenericSenderVariantNames::Ipc => variant_data
126 .newtype_variant::<ipc_channel::ipc::IpcSender<T>>()
127 .map(|sender| GenericSenderVariants::Ipc(sender)),
128 GenericSenderVariantNames::Crossbeam => {
129 if opts::get().multiprocess {
130 return Err(serde::de::Error::custom(
131 "Crossbeam channel found in multiprocess mode!",
132 ));
133 }
134 let addr = variant_data.newtype_variant::<usize>()?;
135 let ptr = addr as *mut crossbeam_channel::Sender<Result<T, ipc_channel::Error>>;
136 #[expect(unsafe_code)]
139 let sender = unsafe { Box::from_raw(ptr) };
140 Ok(GenericSenderVariants::Crossbeam(*sender))
141 },
142 }
143 }
144}
145
146impl<'a, T: Serialize + Deserialize<'a>> Deserialize<'a> for GenericSender<T> {
147 fn deserialize<D>(d: D) -> Result<GenericSender<T>, D::Error>
148 where
149 D: Deserializer<'a>,
150 {
151 d.deserialize_enum(
152 "GenericSender",
153 &["Ipc", "Crossbeam"],
154 GenericSenderVisitor {
155 marker: PhantomData,
156 },
157 )
158 .map(|variant| GenericSender(variant))
159 }
160}
161
162impl<T> Clone for GenericSender<T>
163where
164 T: Serialize,
165{
166 fn clone(&self) -> Self {
167 match self.0 {
168 GenericSenderVariants::Ipc(ref chan) => {
169 GenericSender(GenericSenderVariants::Ipc(chan.clone()))
170 },
171 GenericSenderVariants::Crossbeam(ref chan) => {
172 GenericSender(GenericSenderVariants::Crossbeam(chan.clone()))
173 },
174 }
175 }
176}
177
178impl<T: Serialize> fmt::Debug for GenericSender<T> {
179 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
180 write!(f, "Sender(..)")
181 }
182}
183
184impl<T: Serialize> GenericSender<T> {
185 #[inline]
186 pub fn send(&self, msg: T) -> SendResult {
187 match self.0 {
188 GenericSenderVariants::Ipc(ref sender) => sender
189 .send(msg)
190 .map_err(|e| SendError::SerializationError(format!("{e}"))),
191 GenericSenderVariants::Crossbeam(ref sender) => {
192 sender.send(Ok(msg)).map_err(|_| SendError::Disconnected)
193 },
194 }
195 }
196}
197
198impl<T: Serialize> MallocSizeOf for GenericSender<T> {
199 fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
200 match &self.0 {
201 GenericSenderVariants::Ipc(ipc_sender) => ipc_sender.size_of(ops),
202 GenericSenderVariants::Crossbeam(sender) => sender.size_of(ops),
203 }
204 }
205}
206
207#[derive(Debug)]
208pub enum SendError {
209 Disconnected,
210 SerializationError(String),
211}
212
213impl Display for SendError {
214 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
215 write!(f, "{self:?}")
216 }
217}
218
219pub type SendResult = Result<(), SendError>;
220
221#[derive(Debug)]
222pub enum ReceiveError {
223 DeserializationFailed(String),
224 Io(std::io::Error),
226 Disconnected,
228}
229
230impl From<IpcError> for ReceiveError {
231 fn from(e: IpcError) -> Self {
232 match e {
233 IpcError::Disconnected => ReceiveError::Disconnected,
234 IpcError::Bincode(reason) => ReceiveError::DeserializationFailed(reason.to_string()),
235 IpcError::Io(reason) => ReceiveError::Io(reason),
236 }
237 }
238}
239
240impl From<crossbeam_channel::RecvError> for ReceiveError {
241 fn from(_: crossbeam_channel::RecvError) -> Self {
242 ReceiveError::Disconnected
243 }
244}
245
246impl fmt::Display for ReceiveError {
247 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
248 match *self {
249 ReceiveError::DeserializationFailed(ref error) => {
250 write!(fmt, "deserialization error: {error}")
251 },
252 ReceiveError::Io(ref error) => write!(fmt, "io error: {error}"),
253 ReceiveError::Disconnected => write!(fmt, "disconnected"),
254 }
255 }
256}
257impl From<std::io::Error> for ReceiveError {
258 fn from(value: std::io::Error) -> Self {
259 ReceiveError::Io(value)
260 }
261}
262
263pub enum TryReceiveError {
264 Empty,
265 ReceiveError(ReceiveError),
266}
267
268impl From<crossbeam_channel::RecvTimeoutError> for TryReceiveError {
269 fn from(value: crossbeam_channel::RecvTimeoutError) -> Self {
270 match value {
271 RecvTimeoutError::Timeout => TryReceiveError::Empty,
272 RecvTimeoutError::Disconnected => {
273 TryReceiveError::ReceiveError(ReceiveError::Disconnected)
274 },
275 }
276 }
277}
278
279impl From<ipc_channel::ipc::TryRecvError> for TryReceiveError {
280 fn from(e: ipc_channel::ipc::TryRecvError) -> Self {
281 match e {
282 ipc_channel::ipc::TryRecvError::Empty => TryReceiveError::Empty,
283 ipc_channel::ipc::TryRecvError::IpcError(inner) => {
284 TryReceiveError::ReceiveError(inner.into())
285 },
286 }
287 }
288}
289
290impl From<crossbeam_channel::TryRecvError> for TryReceiveError {
291 fn from(e: crossbeam_channel::TryRecvError) -> Self {
292 match e {
293 crossbeam_channel::TryRecvError::Empty => TryReceiveError::Empty,
294 crossbeam_channel::TryRecvError::Disconnected => {
295 TryReceiveError::ReceiveError(ReceiveError::Disconnected)
296 },
297 }
298 }
299}
300
301pub type RoutedReceiver<T> = crossbeam_channel::Receiver<Result<T, ipc_channel::Error>>;
302pub type ReceiveResult<T> = Result<T, ReceiveError>;
303pub type TryReceiveResult<T> = Result<T, TryReceiveError>;
304pub type RoutedReceiverReceiveResult<T> =
305 Result<Result<T, ipc_channel::Error>, crossbeam_channel::RecvError>;
306
307pub fn to_receive_result<T>(receive_result: RoutedReceiverReceiveResult<T>) -> ReceiveResult<T> {
308 match receive_result {
309 Ok(Ok(msg)) => Ok(msg),
310 Err(_crossbeam_recv_err) => Err(ReceiveError::Disconnected),
311 Ok(Err(ipc_err)) => Err(ReceiveError::DeserializationFailed(ipc_err.to_string())),
312 }
313}
314
315#[derive(MallocSizeOf)]
316pub struct GenericReceiver<T>(GenericReceiverVariants<T>)
317where
318 T: for<'de> Deserialize<'de> + Serialize;
319
320impl<T> std::fmt::Debug for GenericReceiver<T>
321where
322 T: for<'de> Deserialize<'de> + Serialize,
323{
324 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
325 f.debug_tuple("GenericReceiver").finish()
326 }
327}
328
329#[derive(MallocSizeOf)]
330enum GenericReceiverVariants<T>
331where
332 T: for<'de> Deserialize<'de> + Serialize,
333{
334 Ipc(ipc_channel::ipc::IpcReceiver<T>),
335 Crossbeam(RoutedReceiver<T>),
336}
337
338impl<T> GenericReceiver<T>
339where
340 T: for<'de> Deserialize<'de> + Serialize,
341{
342 #[inline]
343 pub fn recv(&self) -> ReceiveResult<T> {
344 match self.0 {
345 GenericReceiverVariants::Ipc(ref receiver) => Ok(receiver.recv()?),
346 GenericReceiverVariants::Crossbeam(ref receiver) => {
347 let msg = receiver.recv()?;
349 Ok(msg.expect("Infallible"))
352 },
353 }
354 }
355
356 #[inline]
357 pub fn try_recv(&self) -> TryReceiveResult<T> {
358 match self.0 {
359 GenericReceiverVariants::Ipc(ref receiver) => Ok(receiver.try_recv()?),
360 GenericReceiverVariants::Crossbeam(ref receiver) => {
361 let msg = receiver.try_recv()?;
362 Ok(msg.expect("Infallible"))
363 },
364 }
365 }
366
367 #[inline]
369 pub fn try_recv_timeout(&self, timeout: Duration) -> Result<T, TryReceiveError> {
370 match self.0 {
371 GenericReceiverVariants::Ipc(ref ipc_receiver) => {
372 ipc_receiver.try_recv_timeout(timeout).map_err(|e| e.into())
373 },
374 GenericReceiverVariants::Crossbeam(ref receiver) => {
375 match receiver.recv_timeout(timeout) {
376 Ok(Ok(value)) => Ok(value),
377 Ok(Err(_)) => unreachable!("Infallable"),
378 Err(RecvTimeoutError::Disconnected) => {
379 Err(TryReceiveError::ReceiveError(ReceiveError::Disconnected))
380 },
381 Err(RecvTimeoutError::Timeout) => Err(TryReceiveError::Empty),
382 }
383 },
384 }
385 }
386
387 #[inline]
392 pub fn route_preserving_errors(self) -> RoutedReceiver<T>
393 where
394 T: Send + 'static,
395 {
396 match self.0 {
397 GenericReceiverVariants::Ipc(ipc_receiver) => {
398 let (crossbeam_sender, crossbeam_receiver) = crossbeam_channel::unbounded();
399 let crossbeam_sender_clone = crossbeam_sender.clone();
400 ROUTER.add_typed_route(
401 ipc_receiver,
402 Box::new(move |message| {
403 let _ = crossbeam_sender_clone.send(message);
404 }),
405 );
406 crossbeam_receiver
407 },
408 GenericReceiverVariants::Crossbeam(receiver) => receiver,
409 }
410 }
411}
412
413impl<T> Serialize for GenericReceiver<T>
414where
415 T: for<'de> Deserialize<'de> + Serialize,
416{
417 fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
418 match &self.0 {
419 GenericReceiverVariants::Ipc(receiver) => {
420 s.serialize_newtype_variant("GenericReceiver", 0, "Ipc", receiver)
421 },
422 GenericReceiverVariants::Crossbeam(receiver) => {
423 if opts::get().multiprocess {
424 return Err(serde::ser::Error::custom(
425 "Crossbeam channel found in multiprocess mode!",
426 ));
427 } let receiver_clone_addr = Box::leak(Box::new(receiver.clone())) as *mut _ as usize;
430 s.serialize_newtype_variant("GenericReceiver", 1, "Crossbeam", &receiver_clone_addr)
431 },
432 }
433 }
434}
435
436struct GenericReceiverVisitor<T> {
437 marker: PhantomData<T>,
438}
439impl<'de, T> serde::de::Visitor<'de> for GenericReceiverVisitor<T>
440where
441 T: for<'a> Deserialize<'a> + Serialize,
442{
443 type Value = GenericReceiver<T>;
444
445 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
446 formatter.write_str("a GenericReceiver variant")
447 }
448
449 fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
450 where
451 A: serde::de::EnumAccess<'de>,
452 {
453 #[derive(Deserialize)]
454 enum GenericReceiverVariantNames {
455 Ipc,
456 Crossbeam,
457 }
458
459 let (variant_name, variant_data): (GenericReceiverVariantNames, _) = data.variant()?;
460
461 match variant_name {
462 GenericReceiverVariantNames::Ipc => variant_data
463 .newtype_variant::<ipc_channel::ipc::IpcReceiver<T>>()
464 .map(|receiver| GenericReceiver(GenericReceiverVariants::Ipc(receiver))),
465 GenericReceiverVariantNames::Crossbeam => {
466 if opts::get().multiprocess {
467 return Err(serde::de::Error::custom(
468 "Crossbeam channel found in multiprocess mode!",
469 ));
470 }
471 let addr = variant_data.newtype_variant::<usize>()?;
472 let ptr = addr as *mut RoutedReceiver<T>;
473 #[expect(unsafe_code)]
476 let receiver = unsafe { Box::from_raw(ptr) };
477 Ok(GenericReceiver(GenericReceiverVariants::Crossbeam(
478 *receiver,
479 )))
480 },
481 }
482 }
483}
484
485impl<'a, T> Deserialize<'a> for GenericReceiver<T>
486where
487 T: for<'de> Deserialize<'de> + Serialize,
488{
489 fn deserialize<D>(d: D) -> Result<GenericReceiver<T>, D::Error>
490 where
491 D: Deserializer<'a>,
492 {
493 d.deserialize_enum(
494 "GenericReceiver",
495 &["Ipc", "Crossbeam"],
496 GenericReceiverVisitor {
497 marker: PhantomData,
498 },
499 )
500 }
501}
502
503fn new_generic_channel_crossbeam<T>() -> (GenericSender<T>, GenericReceiver<T>)
507where
508 T: Serialize + for<'de> serde::Deserialize<'de>,
509{
510 let (tx, rx) = crossbeam_channel::unbounded();
511 (
512 GenericSender(GenericSenderVariants::Crossbeam(tx)),
513 GenericReceiver(GenericReceiverVariants::Crossbeam(rx)),
514 )
515}
516
517fn new_generic_channel_ipc<T>() -> Result<(GenericSender<T>, GenericReceiver<T>), std::io::Error>
518where
519 T: Serialize + for<'de> serde::Deserialize<'de>,
520{
521 ipc_channel::ipc::channel().map(|(tx, rx)| {
522 (
523 GenericSender(GenericSenderVariants::Ipc(tx)),
524 GenericReceiver(GenericReceiverVariants::Ipc(rx)),
525 )
526 })
527}
528
529pub fn channel<T>() -> Option<(GenericSender<T>, GenericReceiver<T>)>
533where
534 T: for<'de> Deserialize<'de> + Serialize,
535{
536 if servo_config::opts::get().multiprocess || servo_config::opts::get().force_ipc {
537 new_generic_channel_ipc().ok()
538 } else {
539 Some(new_generic_channel_crossbeam())
540 }
541}
542
543#[cfg(test)]
544mod single_process_channel_tests {
545 use crate::generic_channel::{new_generic_channel_crossbeam, new_generic_channel_ipc};
550
551 #[test]
552 fn generic_crossbeam_can_send() {
553 let (tx, rx) = new_generic_channel_crossbeam();
554 tx.send(5).expect("Send failed");
555 let val = rx.recv().expect("Receive failed");
556 assert_eq!(val, 5);
557 }
558
559 #[test]
560 fn generic_crossbeam_ping_pong() {
561 let (tx, rx) = new_generic_channel_crossbeam();
562 let (tx2, rx2) = new_generic_channel_crossbeam();
563
564 tx.send(tx2).expect("Send failed");
565
566 std::thread::scope(|s| {
567 s.spawn(move || {
568 let reply_sender = rx.recv().expect("Receive failed");
569 reply_sender.send(42).expect("Sending reply failed");
570 });
571 });
572 let res = rx2.recv().expect("Receive of reply failed");
573 assert_eq!(res, 42);
574 }
575
576 #[test]
577 fn generic_ipc_ping_pong() {
578 let (tx, rx) = new_generic_channel_ipc().unwrap();
579 let (tx2, rx2) = new_generic_channel_ipc().unwrap();
580
581 tx.send(tx2).expect("Send failed");
582
583 std::thread::scope(|s| {
584 s.spawn(move || {
585 let reply_sender = rx.recv().expect("Receive failed");
586 reply_sender.send(42).expect("Sending reply failed");
587 });
588 });
589 let res = rx2.recv().expect("Receive of reply failed");
590 assert_eq!(res, 42);
591 }
592
593 #[test]
594 fn send_crossbeam_sender_over_ipc_channel() {
595 let (tx, rx) = new_generic_channel_ipc().unwrap();
596 let (tx2, rx2) = new_generic_channel_crossbeam();
597
598 tx.send(tx2).expect("Send failed");
599
600 std::thread::scope(|s| {
601 s.spawn(move || {
602 let reply_sender = rx.recv().expect("Receive failed");
603 reply_sender.send(42).expect("Sending reply failed");
604 });
605 });
606 let res = rx2.recv().expect("Receive of reply failed");
607 assert_eq!(res, 42);
608 }
609
610 #[test]
611 fn send_generic_ipc_channel_over_crossbeam() {
612 let (tx, rx) = new_generic_channel_crossbeam();
613 let (tx2, rx2) = new_generic_channel_ipc().unwrap();
614
615 tx.send(tx2).expect("Send failed");
616
617 std::thread::scope(|s| {
618 s.spawn(move || {
619 let reply_sender = rx.recv().expect("Receive failed");
620 reply_sender.send(42).expect("Sending reply failed");
621 });
622 });
623 let res = rx2.recv().expect("Receive of reply failed");
624 assert_eq!(res, 42);
625 }
626
627 #[test]
628 fn send_crossbeam_receiver_over_ipc_channel() {
629 let (tx, rx) = new_generic_channel_ipc().unwrap();
630 let (tx2, rx2) = new_generic_channel_crossbeam();
631
632 tx.send(rx2).expect("Send failed");
633 tx2.send(42).expect("Send failed");
634
635 std::thread::scope(|s| {
636 s.spawn(move || {
637 let another_receiver = rx.recv().expect("Receive failed");
638 let res = another_receiver.recv().expect("Receive failed");
639 assert_eq!(res, 42);
640 });
641 });
642 }
643
644 #[test]
645 fn test_timeout_ipc() {
646 let (tx, rx) = new_generic_channel_ipc().unwrap();
647 let timeout_duration = std::time::Duration::from_secs(3);
648 std::thread::spawn(move || {
649 std::thread::sleep(timeout_duration - std::time::Duration::from_secs(1));
650 assert!(tx.send(()).is_ok());
651 });
652 let received = rx.try_recv_timeout(timeout_duration);
653 assert!(received.is_ok());
654 }
655
656 #[test]
657 fn test_timeout_crossbeam() {
658 let (tx, rx) = new_generic_channel_crossbeam();
659 let timeout_duration = std::time::Duration::from_secs(3);
660 std::thread::spawn(move || {
661 std::thread::sleep(timeout_duration - std::time::Duration::from_secs(1));
662 assert!(tx.send(()).is_ok());
663 });
664 let received = rx.try_recv_timeout(timeout_duration);
665 assert!(received.is_ok());
666 }
667}
668
669#[cfg(test)]
671mod generic_receiversets_tests {
672 use std::time::Duration;
673
674 use crate::generic_channel::generic_channelset::{
675 GenericSelectionResult, create_crossbeam_receiver_set, create_ipc_receiver_set,
676 };
677 use crate::generic_channel::{new_generic_channel_crossbeam, new_generic_channel_ipc};
678
679 #[test]
680 fn test_ipc_side1() {
681 let (snd1, recv1) = new_generic_channel_ipc().unwrap();
682 let (snd2, recv2) = new_generic_channel_ipc().unwrap();
683
684 let snd1_c = snd1.clone();
686 let snd2_c = snd2.clone();
687 let mut set = create_ipc_receiver_set();
688 let recv1_select_index = set.add(recv1);
689 let _recv2_select_index = set.add(recv2);
690
691 std::thread::spawn(move || {
692 snd1_c.send(10).unwrap();
693 });
694 std::thread::spawn(move || {
695 std::thread::sleep(Duration::from_secs(1));
696 let _ = snd2_c.send(20); });
698
699 let select_result = set.select();
700 let channel_result = select_result.first().unwrap();
701 assert_eq!(
702 *channel_result,
703 GenericSelectionResult::MessageReceived(recv1_select_index, 10)
704 );
705 }
706
707 #[test]
708 fn test_ipc_side2() {
709 let (snd1, recv1) = new_generic_channel_ipc().unwrap();
710 let (snd2, recv2) = new_generic_channel_ipc().unwrap();
711
712 let snd1_c = snd1.clone();
714 let snd2_c = snd2.clone();
715 let mut set = create_ipc_receiver_set();
716 let _recv1_select_index = set.add(recv1);
717 let recv2_select_index = set.add(recv2);
718
719 std::thread::spawn(move || {
720 std::thread::sleep(Duration::from_secs(1));
721 let _ = snd1_c.send(10);
722 });
723 std::thread::spawn(move || {
724 snd2_c.send(20).unwrap();
725 });
726
727 let select_result = set.select();
728 let channel_result = select_result.first().unwrap();
729 assert_eq!(
730 *channel_result,
731 GenericSelectionResult::MessageReceived(recv2_select_index, 20)
732 );
733 }
734
735 #[test]
736 fn test_crossbeam_side1() {
737 let (snd1, recv1) = new_generic_channel_crossbeam();
738 let (snd2, recv2) = new_generic_channel_crossbeam();
739
740 let snd1_c = snd1.clone();
742 let snd2_c = snd2.clone();
743 let mut set = create_crossbeam_receiver_set();
744 let recv1_select_index = set.add(recv1);
745 let _recv2_select_index = set.add(recv2);
746
747 std::thread::spawn(move || {
748 snd1_c.send(10).unwrap();
749 });
750 std::thread::spawn(move || {
751 std::thread::sleep(Duration::from_secs(2));
752 let _ = snd2_c.send(20);
753 });
754
755 let select_result = set.select();
756 let channel_result = select_result.first().unwrap();
757 assert_eq!(
758 *channel_result,
759 GenericSelectionResult::MessageReceived(recv1_select_index, 10)
760 );
761 }
762
763 #[test]
764 fn test_crossbeam_side2() {
765 let (snd1, recv1) = new_generic_channel_crossbeam();
766 let (snd2, recv2) = new_generic_channel_crossbeam();
767
768 let snd1_c = snd1.clone();
770 let snd2_c = snd2.clone();
771 let mut set = create_crossbeam_receiver_set();
772 let _recv1_select_index = set.add(recv1);
773 let recv2_select_index = set.add(recv2);
774
775 std::thread::spawn(move || {
776 std::thread::sleep(Duration::from_secs(2));
777 let _ = snd1_c.send(10);
778 });
779 std::thread::spawn(move || {
780 snd2_c.send(20).unwrap();
781 });
782
783 let select_result = set.select();
784 let channel_result = select_result.first().unwrap();
785 assert_eq!(
786 *channel_result,
787 GenericSelectionResult::MessageReceived(recv2_select_index, 20)
788 );
789 }
790
791 #[test]
792 fn test_ipc_no_crash_on_disconnect() {
793 let (snd1, recv1) = new_generic_channel_ipc().unwrap();
796 let (snd2, recv2) = new_generic_channel_ipc().unwrap();
797
798 let snd1_c = snd1.clone();
800 let mut set = create_ipc_receiver_set();
801 let _recv1_select_index = set.add(recv1);
802 let recv2_select_index = set.add(recv2);
803
804 std::thread::spawn(move || {
805 std::thread::sleep(Duration::from_secs(2));
806 let _ = snd1_c.send(10);
807 });
808 std::thread::spawn(move || {
809 snd2.send(20).unwrap();
810 });
811 std::thread::sleep(Duration::from_secs(1));
812 let select_result = set.select();
813 let channel_result = select_result.first().unwrap();
814 assert_eq!(
815 *channel_result,
816 GenericSelectionResult::MessageReceived(recv2_select_index, 20)
817 );
818 }
819
820 #[test]
821 fn test_crossbeam_no_crash_on_disconnect() {
822 let (snd1, recv1) = new_generic_channel_crossbeam();
824 let (snd2, recv2) = new_generic_channel_crossbeam();
825
826 let snd1_c = snd1.clone();
828 let mut set = create_crossbeam_receiver_set();
829 let _recv1_select_index = set.add(recv1);
830 let recv2_select_index = set.add(recv2);
831
832 std::thread::spawn(move || {
833 std::thread::sleep(Duration::from_secs(2));
834 let _ = snd1_c.send(10);
835 });
836 std::thread::spawn(move || {
837 snd2.send(20).unwrap();
838 });
839 std::thread::sleep(Duration::from_secs(1));
840 let select_result = set.select();
841 let channel_result = select_result.first().unwrap();
842 assert_eq!(
843 *channel_result,
844 GenericSelectionResult::MessageReceived(recv2_select_index, 20)
845 );
846 }
847
848 #[test]
849 fn test_ipc_disconnect_correct_message() {
850 let (snd1, recv1) = new_generic_channel_ipc().unwrap();
852 let (snd2, recv2) = new_generic_channel_ipc().unwrap();
853
854 let snd1_c = snd1.clone();
856 let mut set = create_ipc_receiver_set();
857 let _recv1_select_index = set.add(recv1);
858 let recv2_select_index = set.add(recv2);
859
860 std::thread::spawn(move || {
861 std::thread::sleep(Duration::from_secs(2));
862 let _ = snd1_c.send(10);
863 });
864 std::thread::spawn(move || {
865 drop(snd2);
866 });
867
868 let select_result = set.select();
869 let channel_result = select_result.first().unwrap();
870 assert_eq!(
871 *channel_result,
872 GenericSelectionResult::ChannelClosed(recv2_select_index)
873 );
874 }
875
876 #[test]
877 fn test_crossbeam_disconnect_correct_messaget() {
878 let (snd1, recv1) = new_generic_channel_crossbeam();
879 let (snd2, recv2) = new_generic_channel_crossbeam();
880
881 let snd1_c = snd1.clone();
883 let mut set = create_crossbeam_receiver_set();
884 let _recv1_select_index = set.add(recv1);
885 let recv2_select_index = set.add(recv2);
886
887 std::thread::spawn(move || {
888 std::thread::sleep(Duration::from_secs(2));
889 let _ = snd1_c.send(10);
890 });
891 std::thread::spawn(move || {
892 drop(snd2);
893 });
894
895 let select_result = set.select();
896 let channel_result = select_result.first().unwrap();
897 assert_eq!(
898 *channel_result,
899 GenericSelectionResult::ChannelClosed(recv2_select_index)
900 );
901 }
902}