1use std::fmt;
8use std::fmt::Display;
9use std::marker::PhantomData;
10use std::sync::OnceLock;
11use std::time::Duration;
12
13use crossbeam_channel::RecvTimeoutError;
14use ipc_channel::IpcError;
15use ipc_channel::router::ROUTER;
16use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
17use malloc_size_of_derive::MallocSizeOf;
18use serde::de::VariantAccess;
19use serde::{Deserialize, Deserializer, Serialize, Serializer};
20use servo_config::opts;
21
22mod callback;
23pub use callback::GenericCallback;
24mod oneshot;
25mod shared_memory;
26pub use oneshot::{GenericOneshotReceiver, GenericOneshotSender, oneshot};
27pub use shared_memory::GenericSharedMemory;
28mod generic_channelset;
29pub use generic_channelset::{GenericReceiverSet, GenericSelectionResult};
30
31static USE_IPC: OnceLock<bool> = OnceLock::new();
33
34fn use_ipc() -> bool {
36 *USE_IPC.get_or_init(|| {
37 servo_config::opts::get().multiprocess || servo_config::opts::get().force_ipc
38 })
39}
40
41pub trait GenericSend<T>
44where
45 T: serde::Serialize + for<'de> serde::Deserialize<'de>,
46{
47 fn send(&self, _: T) -> SendResult;
49 fn sender(&self) -> GenericSender<T>;
51}
52
53pub struct GenericSender<T: Serialize>(GenericSenderVariants<T>);
57
58enum GenericSenderVariants<T: Serialize> {
63 Ipc(ipc_channel::ipc::IpcSender<T>),
64 Crossbeam(crossbeam_channel::Sender<Result<T, ipc_channel::IpcError>>),
72}
73
74fn serialize_generic_sender_variants<T: Serialize, S: Serializer>(
75 value: &GenericSenderVariants<T>,
76 s: S,
77) -> Result<S::Ok, S::Error> {
78 match value {
79 GenericSenderVariants::Ipc(sender) => {
80 s.serialize_newtype_variant("GenericSender", 0, "Ipc", sender)
81 },
82 GenericSenderVariants::Crossbeam(sender) => {
93 if opts::get().multiprocess {
94 return Err(serde::ser::Error::custom(
95 "Crossbeam channel found in multiprocess mode!",
96 ));
97 } let sender_clone_addr = Box::leak(Box::new(sender.clone())) as *mut _ as usize;
100 s.serialize_newtype_variant("GenericSender", 1, "Crossbeam", &sender_clone_addr)
101 },
102 }
103}
104
105impl<T: Serialize> Serialize for GenericSender<T> {
106 fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
107 serialize_generic_sender_variants(&self.0, s)
108 }
109}
110
111struct GenericSenderVisitor<T> {
112 marker: PhantomData<T>,
113}
114
115impl<'de, T: Serialize + Deserialize<'de>> serde::de::Visitor<'de> for GenericSenderVisitor<T> {
116 type Value = GenericSenderVariants<T>;
117
118 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
119 formatter.write_str("a GenericSender variant")
120 }
121
122 fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
123 where
124 A: serde::de::EnumAccess<'de>,
125 {
126 #[derive(Deserialize)]
127 enum GenericSenderVariantNames {
128 Ipc,
129 Crossbeam,
130 }
131
132 let (variant_name, variant_data): (GenericSenderVariantNames, _) = data.variant()?;
133
134 match variant_name {
135 GenericSenderVariantNames::Ipc => variant_data
136 .newtype_variant::<ipc_channel::ipc::IpcSender<T>>()
137 .map(|sender| GenericSenderVariants::Ipc(sender)),
138 GenericSenderVariantNames::Crossbeam => {
139 if opts::get().multiprocess {
140 return Err(serde::de::Error::custom(
141 "Crossbeam channel found in multiprocess mode!",
142 ));
143 }
144 let addr = variant_data.newtype_variant::<usize>()?;
145 let ptr = addr as *mut crossbeam_channel::Sender<Result<T, ipc_channel::IpcError>>;
146 #[expect(unsafe_code)]
149 let sender = unsafe { Box::from_raw(ptr) };
150 Ok(GenericSenderVariants::Crossbeam(*sender))
151 },
152 }
153 }
154}
155
156impl<'a, T: Serialize + Deserialize<'a>> Deserialize<'a> for GenericSender<T> {
157 fn deserialize<D>(d: D) -> Result<GenericSender<T>, D::Error>
158 where
159 D: Deserializer<'a>,
160 {
161 d.deserialize_enum(
162 "GenericSender",
163 &["Ipc", "Crossbeam"],
164 GenericSenderVisitor {
165 marker: PhantomData,
166 },
167 )
168 .map(|variant| GenericSender(variant))
169 }
170}
171
172impl<T> Clone for GenericSender<T>
173where
174 T: Serialize,
175{
176 fn clone(&self) -> Self {
177 match self.0 {
178 GenericSenderVariants::Ipc(ref chan) => {
179 GenericSender(GenericSenderVariants::Ipc(chan.clone()))
180 },
181 GenericSenderVariants::Crossbeam(ref chan) => {
182 GenericSender(GenericSenderVariants::Crossbeam(chan.clone()))
183 },
184 }
185 }
186}
187
188impl<T: Serialize> fmt::Debug for GenericSender<T> {
189 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
190 write!(f, "Sender(..)")
191 }
192}
193
194impl<T: Serialize> GenericSender<T> {
195 #[inline]
196 pub fn send(&self, msg: T) -> SendResult {
197 match self.0 {
198 GenericSenderVariants::Ipc(ref sender) => sender
199 .send(msg)
200 .map_err(|e| SendError::SerializationError(format!("{e}"))),
201 GenericSenderVariants::Crossbeam(ref sender) => {
202 sender.send(Ok(msg)).map_err(|_| SendError::Disconnected)
203 },
204 }
205 }
206}
207
208impl<T: Serialize> MallocSizeOf for GenericSender<T> {
209 fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
210 match &self.0 {
211 GenericSenderVariants::Ipc(ipc_sender) => ipc_sender.size_of(ops),
212 GenericSenderVariants::Crossbeam(sender) => sender.size_of(ops),
213 }
214 }
215}
216
217#[derive(Debug)]
218pub enum SendError {
219 Disconnected,
220 SerializationError(String),
221}
222
223impl From<IpcError> for SendError {
224 fn from(value: IpcError) -> Self {
225 match value {
226 IpcError::SerializationError(ser_de_error) => {
227 SendError::SerializationError(ser_de_error.to_string())
228 },
229 IpcError::Io(error) => {
230 log::error!("IO Error in ipc {:?}", error);
231 SendError::Disconnected
232 },
233 IpcError::Disconnected => SendError::Disconnected,
234 }
235 }
236}
237
238impl Display for SendError {
239 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
240 write!(f, "{self:?}")
241 }
242}
243
244pub type SendResult = Result<(), SendError>;
245
246#[derive(Debug)]
247pub enum ReceiveError {
248 DeserializationFailed(String),
249 Io(std::io::Error),
251 Disconnected,
253}
254
255impl From<IpcError> for ReceiveError {
256 fn from(e: IpcError) -> Self {
257 match e {
258 IpcError::Disconnected => ReceiveError::Disconnected,
259 IpcError::Io(reason) => ReceiveError::Io(reason),
260 IpcError::SerializationError(ser_de_error) => {
261 ReceiveError::DeserializationFailed(ser_de_error.to_string())
262 },
263 }
264 }
265}
266
267impl From<crossbeam_channel::RecvError> for ReceiveError {
268 fn from(_: crossbeam_channel::RecvError) -> Self {
269 ReceiveError::Disconnected
270 }
271}
272
273impl fmt::Display for ReceiveError {
274 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
275 match *self {
276 ReceiveError::DeserializationFailed(ref error) => {
277 write!(fmt, "deserialization error: {error}")
278 },
279 ReceiveError::Io(ref error) => write!(fmt, "io error: {error}"),
280 ReceiveError::Disconnected => write!(fmt, "disconnected"),
281 }
282 }
283}
284impl From<std::io::Error> for ReceiveError {
285 fn from(value: std::io::Error) -> Self {
286 ReceiveError::Io(value)
287 }
288}
289
290pub enum TryReceiveError {
291 Empty,
292 ReceiveError(ReceiveError),
293}
294
295impl From<crossbeam_channel::RecvTimeoutError> for TryReceiveError {
296 fn from(value: crossbeam_channel::RecvTimeoutError) -> Self {
297 match value {
298 RecvTimeoutError::Timeout => TryReceiveError::Empty,
299 RecvTimeoutError::Disconnected => {
300 TryReceiveError::ReceiveError(ReceiveError::Disconnected)
301 },
302 }
303 }
304}
305
306impl From<ipc_channel::TryRecvError> for TryReceiveError {
307 fn from(e: ipc_channel::TryRecvError) -> Self {
308 match e {
309 ipc_channel::TryRecvError::Empty => TryReceiveError::Empty,
310 ipc_channel::TryRecvError::IpcError(inner) => {
311 TryReceiveError::ReceiveError(inner.into())
312 },
313 }
314 }
315}
316
317impl From<crossbeam_channel::TryRecvError> for TryReceiveError {
318 fn from(e: crossbeam_channel::TryRecvError) -> Self {
319 match e {
320 crossbeam_channel::TryRecvError::Empty => TryReceiveError::Empty,
321 crossbeam_channel::TryRecvError::Disconnected => {
322 TryReceiveError::ReceiveError(ReceiveError::Disconnected)
323 },
324 }
325 }
326}
327
328pub type RoutedReceiver<T> = crossbeam_channel::Receiver<Result<T, ipc_channel::IpcError>>;
329pub type ReceiveResult<T> = Result<T, ReceiveError>;
330pub type TryReceiveResult<T> = Result<T, TryReceiveError>;
331pub type RoutedReceiverReceiveResult<T> =
332 Result<Result<T, ipc_channel::IpcError>, crossbeam_channel::RecvError>;
333
334pub fn to_receive_result<T>(receive_result: RoutedReceiverReceiveResult<T>) -> ReceiveResult<T> {
335 match receive_result {
336 Ok(Ok(msg)) => Ok(msg),
337 Err(_crossbeam_recv_err) => Err(ReceiveError::Disconnected),
338 Ok(Err(ipc_err)) => Err(ReceiveError::DeserializationFailed(ipc_err.to_string())),
339 }
340}
341
342#[derive(MallocSizeOf)]
343pub struct GenericReceiver<T>(GenericReceiverVariants<T>)
344where
345 T: for<'de> Deserialize<'de> + Serialize;
346
347impl<T> std::fmt::Debug for GenericReceiver<T>
348where
349 T: for<'de> Deserialize<'de> + Serialize,
350{
351 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
352 f.debug_tuple("GenericReceiver").finish()
353 }
354}
355
356#[derive(MallocSizeOf)]
357enum GenericReceiverVariants<T>
358where
359 T: for<'de> Deserialize<'de> + Serialize,
360{
361 Ipc(ipc_channel::ipc::IpcReceiver<T>),
362 Crossbeam(RoutedReceiver<T>),
363}
364
365impl<T> GenericReceiver<T>
366where
367 T: for<'de> Deserialize<'de> + Serialize,
368{
369 #[inline]
370 pub fn recv(&self) -> ReceiveResult<T> {
371 match self.0 {
372 GenericReceiverVariants::Ipc(ref receiver) => Ok(receiver.recv()?),
373 GenericReceiverVariants::Crossbeam(ref receiver) => {
374 let msg = receiver.recv()?;
376 Ok(msg.expect("Infallible"))
379 },
380 }
381 }
382
383 #[inline]
384 pub fn try_recv(&self) -> TryReceiveResult<T> {
385 match self.0 {
386 GenericReceiverVariants::Ipc(ref receiver) => Ok(receiver.try_recv()?),
387 GenericReceiverVariants::Crossbeam(ref receiver) => {
388 let msg = receiver.try_recv()?;
389 Ok(msg.expect("Infallible"))
390 },
391 }
392 }
393
394 #[inline]
396 pub fn try_recv_timeout(&self, timeout: Duration) -> Result<T, TryReceiveError> {
397 match self.0 {
398 GenericReceiverVariants::Ipc(ref ipc_receiver) => {
399 ipc_receiver.try_recv_timeout(timeout).map_err(|e| e.into())
400 },
401 GenericReceiverVariants::Crossbeam(ref receiver) => {
402 match receiver.recv_timeout(timeout) {
403 Ok(Ok(value)) => Ok(value),
404 Ok(Err(_)) => unreachable!("Infallable"),
405 Err(RecvTimeoutError::Disconnected) => {
406 Err(TryReceiveError::ReceiveError(ReceiveError::Disconnected))
407 },
408 Err(RecvTimeoutError::Timeout) => Err(TryReceiveError::Empty),
409 }
410 },
411 }
412 }
413
414 #[inline]
419 pub fn route_preserving_errors(self) -> RoutedReceiver<T>
420 where
421 T: Send + 'static,
422 {
423 match self.0 {
424 GenericReceiverVariants::Ipc(ipc_receiver) => {
425 let (crossbeam_sender, crossbeam_receiver) = crossbeam_channel::unbounded();
426 let crossbeam_sender_clone = crossbeam_sender.clone();
427 ROUTER.add_typed_route(
428 ipc_receiver,
429 Box::new(move |message| {
430 let _ = crossbeam_sender_clone
431 .send(message.map_err(IpcError::SerializationError));
432 }),
433 );
434 crossbeam_receiver
435 },
436 GenericReceiverVariants::Crossbeam(receiver) => receiver,
437 }
438 }
439}
440
441impl<T> Serialize for GenericReceiver<T>
442where
443 T: for<'de> Deserialize<'de> + Serialize,
444{
445 fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
446 match &self.0 {
447 GenericReceiverVariants::Ipc(receiver) => {
448 s.serialize_newtype_variant("GenericReceiver", 0, "Ipc", receiver)
449 },
450 GenericReceiverVariants::Crossbeam(receiver) => {
451 if opts::get().multiprocess {
452 return Err(serde::ser::Error::custom(
453 "Crossbeam channel found in multiprocess mode!",
454 ));
455 } let receiver_clone_addr = Box::leak(Box::new(receiver.clone())) as *mut _ as usize;
458 s.serialize_newtype_variant("GenericReceiver", 1, "Crossbeam", &receiver_clone_addr)
459 },
460 }
461 }
462}
463
464struct GenericReceiverVisitor<T> {
465 marker: PhantomData<T>,
466}
467impl<'de, T> serde::de::Visitor<'de> for GenericReceiverVisitor<T>
468where
469 T: for<'a> Deserialize<'a> + Serialize,
470{
471 type Value = GenericReceiver<T>;
472
473 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
474 formatter.write_str("a GenericReceiver variant")
475 }
476
477 fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
478 where
479 A: serde::de::EnumAccess<'de>,
480 {
481 #[derive(Deserialize)]
482 enum GenericReceiverVariantNames {
483 Ipc,
484 Crossbeam,
485 }
486
487 let (variant_name, variant_data): (GenericReceiverVariantNames, _) = data.variant()?;
488
489 match variant_name {
490 GenericReceiverVariantNames::Ipc => variant_data
491 .newtype_variant::<ipc_channel::ipc::IpcReceiver<T>>()
492 .map(|receiver| GenericReceiver(GenericReceiverVariants::Ipc(receiver))),
493 GenericReceiverVariantNames::Crossbeam => {
494 if use_ipc() {
495 return Err(serde::de::Error::custom(
496 "Crossbeam channel found in multiprocess mode!",
497 ));
498 }
499 let addr = variant_data.newtype_variant::<usize>()?;
500 let ptr = addr as *mut RoutedReceiver<T>;
501 #[expect(unsafe_code)]
504 let receiver = unsafe { Box::from_raw(ptr) };
505 Ok(GenericReceiver(GenericReceiverVariants::Crossbeam(
506 *receiver,
507 )))
508 },
509 }
510 }
511}
512
513impl<'a, T> Deserialize<'a> for GenericReceiver<T>
514where
515 T: for<'de> Deserialize<'de> + Serialize,
516{
517 fn deserialize<D>(d: D) -> Result<GenericReceiver<T>, D::Error>
518 where
519 D: Deserializer<'a>,
520 {
521 d.deserialize_enum(
522 "GenericReceiver",
523 &["Ipc", "Crossbeam"],
524 GenericReceiverVisitor {
525 marker: PhantomData,
526 },
527 )
528 }
529}
530
531fn new_generic_channel_crossbeam<T>() -> (GenericSender<T>, GenericReceiver<T>)
535where
536 T: Serialize + for<'de> serde::Deserialize<'de>,
537{
538 let (tx, rx) = crossbeam_channel::unbounded();
539 (
540 GenericSender(GenericSenderVariants::Crossbeam(tx)),
541 GenericReceiver(GenericReceiverVariants::Crossbeam(rx)),
542 )
543}
544
545fn new_generic_channel_ipc<T>() -> Result<(GenericSender<T>, GenericReceiver<T>), std::io::Error>
546where
547 T: Serialize + for<'de> serde::Deserialize<'de>,
548{
549 ipc_channel::ipc::channel().map(|(tx, rx)| {
550 (
551 GenericSender(GenericSenderVariants::Ipc(tx)),
552 GenericReceiver(GenericReceiverVariants::Ipc(rx)),
553 )
554 })
555}
556
557pub fn channel<T>() -> Option<(GenericSender<T>, GenericReceiver<T>)>
561where
562 T: for<'de> Deserialize<'de> + Serialize,
563{
564 if use_ipc() {
565 new_generic_channel_ipc().ok()
566 } else {
567 Some(new_generic_channel_crossbeam())
568 }
569}
570
571#[cfg(test)]
572mod single_process_channel_tests {
573 use crate::generic_channel::{new_generic_channel_crossbeam, new_generic_channel_ipc};
578
579 #[test]
580 fn generic_crossbeam_can_send() {
581 let (tx, rx) = new_generic_channel_crossbeam();
582 tx.send(5).expect("Send failed");
583 let val = rx.recv().expect("Receive failed");
584 assert_eq!(val, 5);
585 }
586
587 #[test]
588 fn generic_crossbeam_ping_pong() {
589 let (tx, rx) = new_generic_channel_crossbeam();
590 let (tx2, rx2) = new_generic_channel_crossbeam();
591
592 tx.send(tx2).expect("Send failed");
593
594 std::thread::scope(|s| {
595 s.spawn(move || {
596 let reply_sender = rx.recv().expect("Receive failed");
597 reply_sender.send(42).expect("Sending reply failed");
598 });
599 });
600 let res = rx2.recv().expect("Receive of reply failed");
601 assert_eq!(res, 42);
602 }
603
604 #[test]
605 fn generic_ipc_ping_pong() {
606 let (tx, rx) = new_generic_channel_ipc().unwrap();
607 let (tx2, rx2) = new_generic_channel_ipc().unwrap();
608
609 tx.send(tx2).expect("Send failed");
610
611 std::thread::scope(|s| {
612 s.spawn(move || {
613 let reply_sender = rx.recv().expect("Receive failed");
614 reply_sender.send(42).expect("Sending reply failed");
615 });
616 });
617 let res = rx2.recv().expect("Receive of reply failed");
618 assert_eq!(res, 42);
619 }
620
621 #[test]
622 fn send_crossbeam_sender_over_ipc_channel() {
623 let (tx, rx) = new_generic_channel_ipc().unwrap();
624 let (tx2, rx2) = new_generic_channel_crossbeam();
625
626 tx.send(tx2).expect("Send failed");
627
628 std::thread::scope(|s| {
629 s.spawn(move || {
630 let reply_sender = rx.recv().expect("Receive failed");
631 reply_sender.send(42).expect("Sending reply failed");
632 });
633 });
634 let res = rx2.recv().expect("Receive of reply failed");
635 assert_eq!(res, 42);
636 }
637
638 #[test]
639 fn send_generic_ipc_channel_over_crossbeam() {
640 let (tx, rx) = new_generic_channel_crossbeam();
641 let (tx2, rx2) = new_generic_channel_ipc().unwrap();
642
643 tx.send(tx2).expect("Send failed");
644
645 std::thread::scope(|s| {
646 s.spawn(move || {
647 let reply_sender = rx.recv().expect("Receive failed");
648 reply_sender.send(42).expect("Sending reply failed");
649 });
650 });
651 let res = rx2.recv().expect("Receive of reply failed");
652 assert_eq!(res, 42);
653 }
654
655 #[test]
656 fn send_crossbeam_receiver_over_ipc_channel() {
657 let (tx, rx) = new_generic_channel_ipc().unwrap();
658 let (tx2, rx2) = new_generic_channel_crossbeam();
659
660 tx.send(rx2).expect("Send failed");
661 tx2.send(42).expect("Send failed");
662
663 std::thread::scope(|s| {
664 s.spawn(move || {
665 let another_receiver = rx.recv().expect("Receive failed");
666 let res = another_receiver.recv().expect("Receive failed");
667 assert_eq!(res, 42);
668 });
669 });
670 }
671
672 #[test]
673 fn test_timeout_ipc() {
674 let (tx, rx) = new_generic_channel_ipc().unwrap();
675 let timeout_duration = std::time::Duration::from_secs(3);
676 std::thread::spawn(move || {
677 std::thread::sleep(timeout_duration - std::time::Duration::from_secs(1));
678 assert!(tx.send(()).is_ok());
679 });
680 let received = rx.try_recv_timeout(timeout_duration);
681 assert!(received.is_ok());
682 }
683
684 #[test]
685 fn test_timeout_crossbeam() {
686 let (tx, rx) = new_generic_channel_crossbeam();
687 let timeout_duration = std::time::Duration::from_secs(3);
688 std::thread::spawn(move || {
689 std::thread::sleep(timeout_duration - std::time::Duration::from_secs(1));
690 assert!(tx.send(()).is_ok());
691 });
692 let received = rx.try_recv_timeout(timeout_duration);
693 assert!(received.is_ok());
694 }
695}
696
697#[cfg(test)]
699mod generic_receiversets_tests {
700 use std::time::Duration;
701
702 use crate::generic_channel::generic_channelset::{
703 GenericSelectionResult, create_crossbeam_receiver_set, create_ipc_receiver_set,
704 };
705 use crate::generic_channel::{new_generic_channel_crossbeam, new_generic_channel_ipc};
706
707 #[test]
708 fn test_ipc_side1() {
709 let (snd1, recv1) = new_generic_channel_ipc().unwrap();
710 let (snd2, recv2) = new_generic_channel_ipc().unwrap();
711
712 let snd1_c = snd1.clone();
714 let snd2_c = snd2.clone();
715 let mut set = create_ipc_receiver_set();
716 let recv1_select_index = set.add(recv1);
717 let _recv2_select_index = set.add(recv2);
718
719 std::thread::spawn(move || {
720 snd1_c.send(10).unwrap();
721 });
722 std::thread::spawn(move || {
723 std::thread::sleep(Duration::from_secs(1));
724 let _ = snd2_c.send(20); });
726
727 let select_result = set.select();
728 let channel_result = select_result.first().unwrap();
729 assert_eq!(
730 *channel_result,
731 GenericSelectionResult::MessageReceived(recv1_select_index, 10)
732 );
733 }
734
735 #[test]
736 fn test_ipc_side2() {
737 let (snd1, recv1) = new_generic_channel_ipc().unwrap();
738 let (snd2, recv2) = new_generic_channel_ipc().unwrap();
739
740 let snd1_c = snd1.clone();
742 let snd2_c = snd2.clone();
743 let mut set = create_ipc_receiver_set();
744 let _recv1_select_index = set.add(recv1);
745 let recv2_select_index = set.add(recv2);
746
747 std::thread::spawn(move || {
748 std::thread::sleep(Duration::from_secs(1));
749 let _ = snd1_c.send(10);
750 });
751 std::thread::spawn(move || {
752 snd2_c.send(20).unwrap();
753 });
754
755 let select_result = set.select();
756 let channel_result = select_result.first().unwrap();
757 assert_eq!(
758 *channel_result,
759 GenericSelectionResult::MessageReceived(recv2_select_index, 20)
760 );
761 }
762
763 #[test]
764 fn test_crossbeam_side1() {
765 let (snd1, recv1) = new_generic_channel_crossbeam();
766 let (snd2, recv2) = new_generic_channel_crossbeam();
767
768 let snd1_c = snd1.clone();
770 let snd2_c = snd2.clone();
771 let mut set = create_crossbeam_receiver_set();
772 let recv1_select_index = set.add(recv1);
773 let _recv2_select_index = set.add(recv2);
774
775 std::thread::spawn(move || {
776 snd1_c.send(10).unwrap();
777 });
778 std::thread::spawn(move || {
779 std::thread::sleep(Duration::from_secs(2));
780 let _ = snd2_c.send(20);
781 });
782
783 let select_result = set.select();
784 let channel_result = select_result.first().unwrap();
785 assert_eq!(
786 *channel_result,
787 GenericSelectionResult::MessageReceived(recv1_select_index, 10)
788 );
789 }
790
791 #[test]
792 fn test_crossbeam_side2() {
793 let (snd1, recv1) = new_generic_channel_crossbeam();
794 let (snd2, recv2) = new_generic_channel_crossbeam();
795
796 let snd1_c = snd1.clone();
798 let snd2_c = snd2.clone();
799 let mut set = create_crossbeam_receiver_set();
800 let _recv1_select_index = set.add(recv1);
801 let recv2_select_index = set.add(recv2);
802
803 std::thread::spawn(move || {
804 std::thread::sleep(Duration::from_secs(2));
805 let _ = snd1_c.send(10);
806 });
807 std::thread::spawn(move || {
808 snd2_c.send(20).unwrap();
809 });
810
811 let select_result = set.select();
812 let channel_result = select_result.first().unwrap();
813 assert_eq!(
814 *channel_result,
815 GenericSelectionResult::MessageReceived(recv2_select_index, 20)
816 );
817 }
818
819 #[test]
820 fn test_ipc_no_crash_on_disconnect() {
821 let (snd1, recv1) = new_generic_channel_ipc().unwrap();
824 let (snd2, recv2) = new_generic_channel_ipc().unwrap();
825
826 let snd1_c = snd1.clone();
828 let mut set = create_ipc_receiver_set();
829 let _recv1_select_index = set.add(recv1);
830 let recv2_select_index = set.add(recv2);
831
832 std::thread::spawn(move || {
833 std::thread::sleep(Duration::from_secs(2));
834 let _ = snd1_c.send(10);
835 });
836 std::thread::spawn(move || {
837 snd2.send(20).unwrap();
838 });
839 std::thread::sleep(Duration::from_secs(1));
840 let select_result = set.select();
841 let channel_result = select_result.first().unwrap();
842 assert_eq!(
843 *channel_result,
844 GenericSelectionResult::MessageReceived(recv2_select_index, 20)
845 );
846 }
847
848 #[test]
849 fn test_crossbeam_no_crash_on_disconnect() {
850 let (snd1, recv1) = new_generic_channel_crossbeam();
852 let (snd2, recv2) = new_generic_channel_crossbeam();
853
854 let snd1_c = snd1.clone();
856 let mut set = create_crossbeam_receiver_set();
857 let _recv1_select_index = set.add(recv1);
858 let recv2_select_index = set.add(recv2);
859
860 std::thread::spawn(move || {
861 std::thread::sleep(Duration::from_secs(2));
862 let _ = snd1_c.send(10);
863 });
864 std::thread::spawn(move || {
865 snd2.send(20).unwrap();
866 });
867 std::thread::sleep(Duration::from_secs(1));
868 let select_result = set.select();
869 let channel_result = select_result.first().unwrap();
870 assert_eq!(
871 *channel_result,
872 GenericSelectionResult::MessageReceived(recv2_select_index, 20)
873 );
874 }
875
876 #[test]
877 fn test_ipc_disconnect_correct_message() {
878 let (snd1, recv1) = new_generic_channel_ipc().unwrap();
880 let (snd2, recv2) = new_generic_channel_ipc().unwrap();
881
882 let snd1_c = snd1.clone();
884 let mut set = create_ipc_receiver_set();
885 let _recv1_select_index = set.add(recv1);
886 let recv2_select_index = set.add(recv2);
887
888 std::thread::spawn(move || {
889 std::thread::sleep(Duration::from_secs(2));
890 let _ = snd1_c.send(10);
891 });
892 std::thread::spawn(move || {
893 drop(snd2);
894 });
895
896 let select_result = set.select();
897 let channel_result = select_result.first().unwrap();
898 assert_eq!(
899 *channel_result,
900 GenericSelectionResult::ChannelClosed(recv2_select_index)
901 );
902 }
903
904 #[test]
905 fn test_crossbeam_disconnect_correct_messaget() {
906 let (snd1, recv1) = new_generic_channel_crossbeam();
907 let (snd2, recv2) = new_generic_channel_crossbeam();
908
909 let snd1_c = snd1.clone();
911 let mut set = create_crossbeam_receiver_set();
912 let _recv1_select_index = set.add(recv1);
913 let recv2_select_index = set.add(recv2);
914
915 std::thread::spawn(move || {
916 std::thread::sleep(Duration::from_secs(2));
917 let _ = snd1_c.send(10);
918 });
919 std::thread::spawn(move || {
920 drop(snd2);
921 });
922
923 let select_result = set.select();
924 let channel_result = select_result.first().unwrap();
925 assert_eq!(
926 *channel_result,
927 GenericSelectionResult::ChannelClosed(recv2_select_index)
928 );
929 }
930}