1use std::fmt;
8use std::fmt::Display;
9use std::marker::PhantomData;
10use std::sync::OnceLock;
11use std::time::Duration;
12
13use crossbeam_channel::RecvTimeoutError;
14use ipc_channel::IpcError;
15use ipc_channel::router::ROUTER;
16use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
17use malloc_size_of_derive::MallocSizeOf;
18use serde::de::VariantAccess;
19use serde::{Deserialize, Deserializer, Serialize, Serializer};
20use servo_config::opts;
21
22mod callback;
23pub use callback::GenericCallback;
24mod lazy_callback;
25pub use lazy_callback::{CallbackSetter, LazyCallback, lazy_callback};
26mod oneshot;
27mod shared_memory;
28pub use oneshot::{GenericOneshotReceiver, GenericOneshotSender, oneshot};
29pub use shared_memory::GenericSharedMemory;
30mod generic_channelset;
31pub use generic_channelset::{GenericReceiverSet, GenericSelectionResult};
32
33static USE_IPC: OnceLock<bool> = OnceLock::new();
35
36fn use_ipc() -> bool {
38 *USE_IPC.get_or_init(|| {
39 servo_config::opts::get().multiprocess || servo_config::opts::get().force_ipc
40 })
41}
42
43pub trait GenericSend<T>
46where
47 T: serde::Serialize + for<'de> serde::Deserialize<'de>,
48{
49 fn send(&self, _: T) -> SendResult;
51 fn sender(&self) -> GenericSender<T>;
53}
54
55pub struct GenericSender<T: Serialize>(GenericSenderVariants<T>);
59
60enum GenericSenderVariants<T: Serialize> {
65 Ipc(ipc_channel::ipc::IpcSender<T>),
66 Crossbeam(crossbeam_channel::Sender<Result<T, ipc_channel::IpcError>>),
74}
75
76fn serialize_generic_sender_variants<T: Serialize, S: Serializer>(
77 value: &GenericSenderVariants<T>,
78 s: S,
79) -> Result<S::Ok, S::Error> {
80 match value {
81 GenericSenderVariants::Ipc(sender) => {
82 s.serialize_newtype_variant("GenericSender", 0, "Ipc", sender)
83 },
84 GenericSenderVariants::Crossbeam(sender) => {
95 if opts::get().multiprocess {
96 return Err(serde::ser::Error::custom(
97 "Crossbeam channel found in multiprocess mode!",
98 ));
99 } let sender_clone_addr = Box::leak(Box::new(sender.clone())) as *mut _ as usize;
102 s.serialize_newtype_variant("GenericSender", 1, "Crossbeam", &sender_clone_addr)
103 },
104 }
105}
106
107impl<T: Serialize> Serialize for GenericSender<T> {
108 fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
109 serialize_generic_sender_variants(&self.0, s)
110 }
111}
112
113struct GenericSenderVisitor<T> {
114 marker: PhantomData<T>,
115}
116
117impl<'de, T: Serialize + Deserialize<'de>> serde::de::Visitor<'de> for GenericSenderVisitor<T> {
118 type Value = GenericSenderVariants<T>;
119
120 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
121 formatter.write_str("a GenericSender variant")
122 }
123
124 fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
125 where
126 A: serde::de::EnumAccess<'de>,
127 {
128 #[derive(Deserialize)]
129 enum GenericSenderVariantNames {
130 Ipc,
131 Crossbeam,
132 }
133
134 let (variant_name, variant_data): (GenericSenderVariantNames, _) = data.variant()?;
135
136 match variant_name {
137 GenericSenderVariantNames::Ipc => variant_data
138 .newtype_variant::<ipc_channel::ipc::IpcSender<T>>()
139 .map(|sender| GenericSenderVariants::Ipc(sender)),
140 GenericSenderVariantNames::Crossbeam => {
141 if opts::get().multiprocess {
142 return Err(serde::de::Error::custom(
143 "Crossbeam channel found in multiprocess mode!",
144 ));
145 }
146 let addr = variant_data.newtype_variant::<usize>()?;
147 let ptr = addr as *mut crossbeam_channel::Sender<Result<T, ipc_channel::IpcError>>;
148 #[expect(unsafe_code)]
151 let sender = unsafe { Box::from_raw(ptr) };
152 Ok(GenericSenderVariants::Crossbeam(*sender))
153 },
154 }
155 }
156}
157
158impl<'a, T: Serialize + Deserialize<'a>> Deserialize<'a> for GenericSender<T> {
159 fn deserialize<D>(d: D) -> Result<GenericSender<T>, D::Error>
160 where
161 D: Deserializer<'a>,
162 {
163 d.deserialize_enum(
164 "GenericSender",
165 &["Ipc", "Crossbeam"],
166 GenericSenderVisitor {
167 marker: PhantomData,
168 },
169 )
170 .map(|variant| GenericSender(variant))
171 }
172}
173
174impl<T> Clone for GenericSender<T>
175where
176 T: Serialize,
177{
178 fn clone(&self) -> Self {
179 match self.0 {
180 GenericSenderVariants::Ipc(ref chan) => {
181 GenericSender(GenericSenderVariants::Ipc(chan.clone()))
182 },
183 GenericSenderVariants::Crossbeam(ref chan) => {
184 GenericSender(GenericSenderVariants::Crossbeam(chan.clone()))
185 },
186 }
187 }
188}
189
190impl<T: Serialize> fmt::Debug for GenericSender<T> {
191 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
192 write!(f, "Sender(..)")
193 }
194}
195
196impl<T: Serialize> GenericSender<T> {
197 #[inline]
198 pub fn send(&self, msg: T) -> SendResult {
199 match self.0 {
200 GenericSenderVariants::Ipc(ref sender) => sender
201 .send(msg)
202 .map_err(|e| SendError::SerializationError(format!("{e}"))),
203 GenericSenderVariants::Crossbeam(ref sender) => {
204 sender.send(Ok(msg)).map_err(|_| SendError::Disconnected)
205 },
206 }
207 }
208}
209
210impl<T: Serialize> MallocSizeOf for GenericSender<T> {
211 fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
212 match &self.0 {
213 GenericSenderVariants::Ipc(ipc_sender) => ipc_sender.size_of(ops),
214 GenericSenderVariants::Crossbeam(sender) => sender.size_of(ops),
215 }
216 }
217}
218
219#[derive(Debug)]
220pub enum SendError {
221 Disconnected,
222 SerializationError(String),
223}
224
225impl From<IpcError> for SendError {
226 fn from(value: IpcError) -> Self {
227 match value {
228 IpcError::SerializationError(ser_de_error) => {
229 SendError::SerializationError(ser_de_error.to_string())
230 },
231 IpcError::Io(error) => {
232 log::error!("IO Error in ipc {:?}", error);
233 SendError::Disconnected
234 },
235 IpcError::Disconnected => SendError::Disconnected,
236 }
237 }
238}
239
240impl Display for SendError {
241 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
242 write!(f, "{self:?}")
243 }
244}
245
246pub type SendResult = Result<(), SendError>;
247
248#[derive(Debug)]
249pub enum ReceiveError {
250 DeserializationFailed(String),
251 Io(std::io::Error),
253 Disconnected,
255}
256
257impl From<IpcError> for ReceiveError {
258 fn from(e: IpcError) -> Self {
259 match e {
260 IpcError::Disconnected => ReceiveError::Disconnected,
261 IpcError::Io(reason) => ReceiveError::Io(reason),
262 IpcError::SerializationError(ser_de_error) => {
263 ReceiveError::DeserializationFailed(ser_de_error.to_string())
264 },
265 }
266 }
267}
268
269impl From<crossbeam_channel::RecvError> for ReceiveError {
270 fn from(_: crossbeam_channel::RecvError) -> Self {
271 ReceiveError::Disconnected
272 }
273}
274
275impl fmt::Display for ReceiveError {
276 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
277 match *self {
278 ReceiveError::DeserializationFailed(ref error) => {
279 write!(fmt, "deserialization error: {error}")
280 },
281 ReceiveError::Io(ref error) => write!(fmt, "io error: {error}"),
282 ReceiveError::Disconnected => write!(fmt, "disconnected"),
283 }
284 }
285}
286impl From<std::io::Error> for ReceiveError {
287 fn from(value: std::io::Error) -> Self {
288 ReceiveError::Io(value)
289 }
290}
291
292pub enum TryReceiveError {
293 Empty,
294 ReceiveError(ReceiveError),
295}
296
297impl From<crossbeam_channel::RecvTimeoutError> for TryReceiveError {
298 fn from(value: crossbeam_channel::RecvTimeoutError) -> Self {
299 match value {
300 RecvTimeoutError::Timeout => TryReceiveError::Empty,
301 RecvTimeoutError::Disconnected => {
302 TryReceiveError::ReceiveError(ReceiveError::Disconnected)
303 },
304 }
305 }
306}
307
308impl From<ipc_channel::TryRecvError> for TryReceiveError {
309 fn from(e: ipc_channel::TryRecvError) -> Self {
310 match e {
311 ipc_channel::TryRecvError::Empty => TryReceiveError::Empty,
312 ipc_channel::TryRecvError::IpcError(inner) => {
313 TryReceiveError::ReceiveError(inner.into())
314 },
315 }
316 }
317}
318
319impl From<crossbeam_channel::TryRecvError> for TryReceiveError {
320 fn from(e: crossbeam_channel::TryRecvError) -> Self {
321 match e {
322 crossbeam_channel::TryRecvError::Empty => TryReceiveError::Empty,
323 crossbeam_channel::TryRecvError::Disconnected => {
324 TryReceiveError::ReceiveError(ReceiveError::Disconnected)
325 },
326 }
327 }
328}
329
330pub type RoutedReceiver<T> = crossbeam_channel::Receiver<Result<T, ipc_channel::IpcError>>;
331pub type ReceiveResult<T> = Result<T, ReceiveError>;
332pub type TryReceiveResult<T> = Result<T, TryReceiveError>;
333pub type RoutedReceiverReceiveResult<T> =
334 Result<Result<T, ipc_channel::IpcError>, crossbeam_channel::RecvError>;
335
336pub fn to_receive_result<T>(receive_result: RoutedReceiverReceiveResult<T>) -> ReceiveResult<T> {
337 match receive_result {
338 Ok(Ok(msg)) => Ok(msg),
339 Err(_crossbeam_recv_err) => Err(ReceiveError::Disconnected),
340 Ok(Err(ipc_err)) => Err(ReceiveError::DeserializationFailed(ipc_err.to_string())),
341 }
342}
343
344#[derive(MallocSizeOf)]
345pub struct GenericReceiver<T>(GenericReceiverVariants<T>)
346where
347 T: for<'de> Deserialize<'de> + Serialize;
348
349impl<T> std::fmt::Debug for GenericReceiver<T>
350where
351 T: for<'de> Deserialize<'de> + Serialize,
352{
353 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
354 f.debug_tuple("GenericReceiver").finish()
355 }
356}
357
358#[derive(MallocSizeOf)]
359enum GenericReceiverVariants<T>
360where
361 T: for<'de> Deserialize<'de> + Serialize,
362{
363 Ipc(ipc_channel::ipc::IpcReceiver<T>),
364 Crossbeam(RoutedReceiver<T>),
365}
366
367impl<T> GenericReceiver<T>
368where
369 T: for<'de> Deserialize<'de> + Serialize,
370{
371 #[inline]
372 pub fn recv(&self) -> ReceiveResult<T> {
373 match self.0 {
374 GenericReceiverVariants::Ipc(ref receiver) => Ok(receiver.recv()?),
375 GenericReceiverVariants::Crossbeam(ref receiver) => {
376 let msg = receiver.recv()?;
378 Ok(msg.expect("Infallible"))
381 },
382 }
383 }
384
385 #[inline]
386 pub fn try_recv(&self) -> TryReceiveResult<T> {
387 match self.0 {
388 GenericReceiverVariants::Ipc(ref receiver) => Ok(receiver.try_recv()?),
389 GenericReceiverVariants::Crossbeam(ref receiver) => {
390 let msg = receiver.try_recv()?;
391 Ok(msg.expect("Infallible"))
392 },
393 }
394 }
395
396 #[inline]
398 pub fn try_recv_timeout(&self, timeout: Duration) -> Result<T, TryReceiveError> {
399 match self.0 {
400 GenericReceiverVariants::Ipc(ref ipc_receiver) => {
401 ipc_receiver.try_recv_timeout(timeout).map_err(|e| e.into())
402 },
403 GenericReceiverVariants::Crossbeam(ref receiver) => {
404 match receiver.recv_timeout(timeout) {
405 Ok(Ok(value)) => Ok(value),
406 Ok(Err(_)) => unreachable!("Infallable"),
407 Err(RecvTimeoutError::Disconnected) => {
408 Err(TryReceiveError::ReceiveError(ReceiveError::Disconnected))
409 },
410 Err(RecvTimeoutError::Timeout) => Err(TryReceiveError::Empty),
411 }
412 },
413 }
414 }
415
416 #[inline]
421 pub fn route_preserving_errors(self) -> RoutedReceiver<T>
422 where
423 T: Send + 'static,
424 {
425 match self.0 {
426 GenericReceiverVariants::Ipc(ipc_receiver) => {
427 let (crossbeam_sender, crossbeam_receiver) = crossbeam_channel::unbounded();
428 let crossbeam_sender_clone = crossbeam_sender;
429 ROUTER.add_typed_route(
430 ipc_receiver,
431 Box::new(move |message| {
432 let _ = crossbeam_sender_clone
433 .send(message.map_err(IpcError::SerializationError));
434 }),
435 );
436 crossbeam_receiver
437 },
438 GenericReceiverVariants::Crossbeam(receiver) => receiver,
439 }
440 }
441}
442
443impl<T> Serialize for GenericReceiver<T>
444where
445 T: for<'de> Deserialize<'de> + Serialize,
446{
447 fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
448 match &self.0 {
449 GenericReceiverVariants::Ipc(receiver) => {
450 s.serialize_newtype_variant("GenericReceiver", 0, "Ipc", receiver)
451 },
452 GenericReceiverVariants::Crossbeam(receiver) => {
453 if opts::get().multiprocess {
454 return Err(serde::ser::Error::custom(
455 "Crossbeam channel found in multiprocess mode!",
456 ));
457 } let receiver_clone_addr = Box::leak(Box::new(receiver.clone())) as *mut _ as usize;
460 s.serialize_newtype_variant("GenericReceiver", 1, "Crossbeam", &receiver_clone_addr)
461 },
462 }
463 }
464}
465
466struct GenericReceiverVisitor<T> {
467 marker: PhantomData<T>,
468}
469impl<'de, T> serde::de::Visitor<'de> for GenericReceiverVisitor<T>
470where
471 T: for<'a> Deserialize<'a> + Serialize,
472{
473 type Value = GenericReceiver<T>;
474
475 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
476 formatter.write_str("a GenericReceiver variant")
477 }
478
479 fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
480 where
481 A: serde::de::EnumAccess<'de>,
482 {
483 #[derive(Deserialize)]
484 enum GenericReceiverVariantNames {
485 Ipc,
486 Crossbeam,
487 }
488
489 let (variant_name, variant_data): (GenericReceiverVariantNames, _) = data.variant()?;
490
491 match variant_name {
492 GenericReceiverVariantNames::Ipc => variant_data
493 .newtype_variant::<ipc_channel::ipc::IpcReceiver<T>>()
494 .map(|receiver| GenericReceiver(GenericReceiverVariants::Ipc(receiver))),
495 GenericReceiverVariantNames::Crossbeam => {
496 if use_ipc() {
497 return Err(serde::de::Error::custom(
498 "Crossbeam channel found in multiprocess mode!",
499 ));
500 }
501 let addr = variant_data.newtype_variant::<usize>()?;
502 let ptr = addr as *mut RoutedReceiver<T>;
503 #[expect(unsafe_code)]
506 let receiver = unsafe { Box::from_raw(ptr) };
507 Ok(GenericReceiver(GenericReceiverVariants::Crossbeam(
508 *receiver,
509 )))
510 },
511 }
512 }
513}
514
515impl<'a, T> Deserialize<'a> for GenericReceiver<T>
516where
517 T: for<'de> Deserialize<'de> + Serialize,
518{
519 fn deserialize<D>(d: D) -> Result<GenericReceiver<T>, D::Error>
520 where
521 D: Deserializer<'a>,
522 {
523 d.deserialize_enum(
524 "GenericReceiver",
525 &["Ipc", "Crossbeam"],
526 GenericReceiverVisitor {
527 marker: PhantomData,
528 },
529 )
530 }
531}
532
533fn new_generic_channel_crossbeam<T>() -> (GenericSender<T>, GenericReceiver<T>)
537where
538 T: Serialize + for<'de> serde::Deserialize<'de>,
539{
540 let (tx, rx) = crossbeam_channel::unbounded();
541 (
542 GenericSender(GenericSenderVariants::Crossbeam(tx)),
543 GenericReceiver(GenericReceiverVariants::Crossbeam(rx)),
544 )
545}
546
547fn new_generic_channel_ipc<T>() -> Result<(GenericSender<T>, GenericReceiver<T>), std::io::Error>
548where
549 T: Serialize + for<'de> serde::Deserialize<'de>,
550{
551 ipc_channel::ipc::channel().map(|(tx, rx)| {
552 (
553 GenericSender(GenericSenderVariants::Ipc(tx)),
554 GenericReceiver(GenericReceiverVariants::Ipc(rx)),
555 )
556 })
557}
558
559pub fn channel<T>() -> Option<(GenericSender<T>, GenericReceiver<T>)>
563where
564 T: for<'de> Deserialize<'de> + Serialize,
565{
566 if use_ipc() {
567 new_generic_channel_ipc().ok()
568 } else {
569 Some(new_generic_channel_crossbeam())
570 }
571}
572
573#[cfg(test)]
574mod single_process_channel_tests {
575 use crate::generic_channel::{new_generic_channel_crossbeam, new_generic_channel_ipc};
580
581 #[test]
582 fn generic_crossbeam_can_send() {
583 let (tx, rx) = new_generic_channel_crossbeam();
584 tx.send(5).expect("Send failed");
585 let val = rx.recv().expect("Receive failed");
586 assert_eq!(val, 5);
587 }
588
589 #[test]
590 fn generic_crossbeam_ping_pong() {
591 let (tx, rx) = new_generic_channel_crossbeam();
592 let (tx2, rx2) = new_generic_channel_crossbeam();
593
594 tx.send(tx2).expect("Send failed");
595
596 std::thread::scope(|s| {
597 s.spawn(move || {
598 let reply_sender = rx.recv().expect("Receive failed");
599 reply_sender.send(42).expect("Sending reply failed");
600 });
601 });
602 let res = rx2.recv().expect("Receive of reply failed");
603 assert_eq!(res, 42);
604 }
605
606 #[test]
607 fn generic_ipc_ping_pong() {
608 let (tx, rx) = new_generic_channel_ipc().unwrap();
609 let (tx2, rx2) = new_generic_channel_ipc().unwrap();
610
611 tx.send(tx2).expect("Send failed");
612
613 std::thread::scope(|s| {
614 s.spawn(move || {
615 let reply_sender = rx.recv().expect("Receive failed");
616 reply_sender.send(42).expect("Sending reply failed");
617 });
618 });
619 let res = rx2.recv().expect("Receive of reply failed");
620 assert_eq!(res, 42);
621 }
622
623 #[test]
624 fn send_crossbeam_sender_over_ipc_channel() {
625 let (tx, rx) = new_generic_channel_ipc().unwrap();
626 let (tx2, rx2) = new_generic_channel_crossbeam();
627
628 tx.send(tx2).expect("Send failed");
629
630 std::thread::scope(|s| {
631 s.spawn(move || {
632 let reply_sender = rx.recv().expect("Receive failed");
633 reply_sender.send(42).expect("Sending reply failed");
634 });
635 });
636 let res = rx2.recv().expect("Receive of reply failed");
637 assert_eq!(res, 42);
638 }
639
640 #[test]
641 fn send_generic_ipc_channel_over_crossbeam() {
642 let (tx, rx) = new_generic_channel_crossbeam();
643 let (tx2, rx2) = new_generic_channel_ipc().unwrap();
644
645 tx.send(tx2).expect("Send failed");
646
647 std::thread::scope(|s| {
648 s.spawn(move || {
649 let reply_sender = rx.recv().expect("Receive failed");
650 reply_sender.send(42).expect("Sending reply failed");
651 });
652 });
653 let res = rx2.recv().expect("Receive of reply failed");
654 assert_eq!(res, 42);
655 }
656
657 #[test]
658 fn send_crossbeam_receiver_over_ipc_channel() {
659 let (tx, rx) = new_generic_channel_ipc().unwrap();
660 let (tx2, rx2) = new_generic_channel_crossbeam();
661
662 tx.send(rx2).expect("Send failed");
663 tx2.send(42).expect("Send failed");
664
665 std::thread::scope(|s| {
666 s.spawn(move || {
667 let another_receiver = rx.recv().expect("Receive failed");
668 let res = another_receiver.recv().expect("Receive failed");
669 assert_eq!(res, 42);
670 });
671 });
672 }
673
674 #[test]
675 fn test_timeout_ipc() {
676 let (tx, rx) = new_generic_channel_ipc().unwrap();
677 let timeout_duration = std::time::Duration::from_secs(3);
678 std::thread::spawn(move || {
679 std::thread::sleep(timeout_duration - std::time::Duration::from_secs(1));
680 assert!(tx.send(()).is_ok());
681 });
682 let received = rx.try_recv_timeout(timeout_duration);
683 assert!(received.is_ok());
684 }
685
686 #[test]
687 fn test_timeout_crossbeam() {
688 let (tx, rx) = new_generic_channel_crossbeam();
689 let timeout_duration = std::time::Duration::from_secs(3);
690 std::thread::spawn(move || {
691 std::thread::sleep(timeout_duration - std::time::Duration::from_secs(1));
692 assert!(tx.send(()).is_ok());
693 });
694 let received = rx.try_recv_timeout(timeout_duration);
695 assert!(received.is_ok());
696 }
697}
698
699#[cfg(test)]
701mod generic_receiversets_tests {
702 use std::time::Duration;
703
704 use crate::generic_channel::generic_channelset::{
705 GenericSelectionResult, create_crossbeam_receiver_set, create_ipc_receiver_set,
706 };
707 use crate::generic_channel::{new_generic_channel_crossbeam, new_generic_channel_ipc};
708
709 #[test]
710 fn test_ipc_side1() {
711 let (snd1, recv1) = new_generic_channel_ipc().unwrap();
712 let (snd2, recv2) = new_generic_channel_ipc().unwrap();
713
714 let snd1_c = snd1.clone();
716 let snd2_c = snd2.clone();
717 let mut set = create_ipc_receiver_set();
718 let recv1_select_index = set.add(recv1);
719 let _recv2_select_index = set.add(recv2);
720
721 std::thread::spawn(move || {
722 snd1_c.send(10).unwrap();
723 });
724 std::thread::spawn(move || {
725 std::thread::sleep(Duration::from_secs(1));
726 let _ = snd2_c.send(20); });
728
729 let select_result = set.select();
730 let channel_result = select_result.first().unwrap();
731 assert_eq!(
732 *channel_result,
733 GenericSelectionResult::MessageReceived(recv1_select_index, 10)
734 );
735 }
736
737 #[test]
738 fn test_ipc_side2() {
739 let (snd1, recv1) = new_generic_channel_ipc().unwrap();
740 let (snd2, recv2) = new_generic_channel_ipc().unwrap();
741
742 let snd1_c = snd1.clone();
744 let snd2_c = snd2.clone();
745 let mut set = create_ipc_receiver_set();
746 let _recv1_select_index = set.add(recv1);
747 let recv2_select_index = set.add(recv2);
748
749 std::thread::spawn(move || {
750 std::thread::sleep(Duration::from_secs(1));
751 let _ = snd1_c.send(10);
752 });
753 std::thread::spawn(move || {
754 snd2_c.send(20).unwrap();
755 });
756
757 let select_result = set.select();
758 let channel_result = select_result.first().unwrap();
759 assert_eq!(
760 *channel_result,
761 GenericSelectionResult::MessageReceived(recv2_select_index, 20)
762 );
763 }
764
765 #[test]
766 fn test_crossbeam_side1() {
767 let (snd1, recv1) = new_generic_channel_crossbeam();
768 let (snd2, recv2) = new_generic_channel_crossbeam();
769
770 let snd1_c = snd1.clone();
772 let snd2_c = snd2.clone();
773 let mut set = create_crossbeam_receiver_set();
774 let recv1_select_index = set.add(recv1);
775 let _recv2_select_index = set.add(recv2);
776
777 std::thread::spawn(move || {
778 snd1_c.send(10).unwrap();
779 });
780 std::thread::spawn(move || {
781 std::thread::sleep(Duration::from_secs(2));
782 let _ = snd2_c.send(20);
783 });
784
785 let select_result = set.select();
786 let channel_result = select_result.first().unwrap();
787 assert_eq!(
788 *channel_result,
789 GenericSelectionResult::MessageReceived(recv1_select_index, 10)
790 );
791 }
792
793 #[test]
794 fn test_crossbeam_side2() {
795 let (snd1, recv1) = new_generic_channel_crossbeam();
796 let (snd2, recv2) = new_generic_channel_crossbeam();
797
798 let snd1_c = snd1.clone();
800 let snd2_c = snd2.clone();
801 let mut set = create_crossbeam_receiver_set();
802 let _recv1_select_index = set.add(recv1);
803 let recv2_select_index = set.add(recv2);
804
805 std::thread::spawn(move || {
806 std::thread::sleep(Duration::from_secs(2));
807 let _ = snd1_c.send(10);
808 });
809 std::thread::spawn(move || {
810 snd2_c.send(20).unwrap();
811 });
812
813 let select_result = set.select();
814 let channel_result = select_result.first().unwrap();
815 assert_eq!(
816 *channel_result,
817 GenericSelectionResult::MessageReceived(recv2_select_index, 20)
818 );
819 }
820
821 #[test]
822 fn test_ipc_no_crash_on_disconnect() {
823 let (snd1, recv1) = new_generic_channel_ipc().unwrap();
826 let (snd2, recv2) = new_generic_channel_ipc().unwrap();
827
828 let snd1_c = snd1.clone();
830 let mut set = create_ipc_receiver_set();
831 let _recv1_select_index = set.add(recv1);
832 let recv2_select_index = set.add(recv2);
833
834 std::thread::spawn(move || {
835 std::thread::sleep(Duration::from_secs(2));
836 let _ = snd1_c.send(10);
837 });
838 std::thread::spawn(move || {
839 snd2.send(20).unwrap();
840 });
841 std::thread::sleep(Duration::from_secs(1));
842 let select_result = set.select();
843 let channel_result = select_result.first().unwrap();
844 assert_eq!(
845 *channel_result,
846 GenericSelectionResult::MessageReceived(recv2_select_index, 20)
847 );
848 }
849
850 #[test]
851 fn test_crossbeam_no_crash_on_disconnect() {
852 let (snd1, recv1) = new_generic_channel_crossbeam();
854 let (snd2, recv2) = new_generic_channel_crossbeam();
855
856 let snd1_c = snd1.clone();
858 let mut set = create_crossbeam_receiver_set();
859 let _recv1_select_index = set.add(recv1);
860 let recv2_select_index = set.add(recv2);
861
862 std::thread::spawn(move || {
863 std::thread::sleep(Duration::from_secs(2));
864 let _ = snd1_c.send(10);
865 });
866 std::thread::spawn(move || {
867 snd2.send(20).unwrap();
868 });
869 std::thread::sleep(Duration::from_secs(1));
870 let select_result = set.select();
871 let channel_result = select_result.first().unwrap();
872 assert_eq!(
873 *channel_result,
874 GenericSelectionResult::MessageReceived(recv2_select_index, 20)
875 );
876 }
877
878 #[test]
879 fn test_ipc_disconnect_correct_message() {
880 let (snd1, recv1) = new_generic_channel_ipc().unwrap();
882 let (snd2, recv2) = new_generic_channel_ipc().unwrap();
883
884 let snd1_c = snd1.clone();
886 let mut set = create_ipc_receiver_set();
887 let _recv1_select_index = set.add(recv1);
888 let recv2_select_index = set.add(recv2);
889
890 std::thread::spawn(move || {
891 std::thread::sleep(Duration::from_secs(2));
892 let _ = snd1_c.send(10);
893 });
894 std::thread::spawn(move || {
895 drop(snd2);
896 });
897
898 let select_result = set.select();
899 let channel_result = select_result.first().unwrap();
900 assert_eq!(
901 *channel_result,
902 GenericSelectionResult::ChannelClosed(recv2_select_index)
903 );
904 }
905
906 #[test]
907 fn test_crossbeam_disconnect_correct_messaget() {
908 let (snd1, recv1) = new_generic_channel_crossbeam();
909 let (snd2, recv2) = new_generic_channel_crossbeam();
910
911 let snd1_c = snd1.clone();
913 let mut set = create_crossbeam_receiver_set();
914 let _recv1_select_index = set.add(recv1);
915 let recv2_select_index = set.add(recv2);
916
917 std::thread::spawn(move || {
918 std::thread::sleep(Duration::from_secs(2));
919 let _ = snd1_c.send(10);
920 });
921 std::thread::spawn(move || {
922 drop(snd2);
923 });
924
925 let select_result = set.select();
926 let channel_result = select_result.first().unwrap();
927 assert_eq!(
928 *channel_result,
929 GenericSelectionResult::ChannelClosed(recv2_select_index)
930 );
931 }
932}