1use std::fmt;
8use std::fmt::Display;
9use std::marker::PhantomData;
10use std::panic::Location;
11use std::sync::OnceLock;
12use std::time::Duration;
13
14use crossbeam_channel::RecvTimeoutError;
15use ipc_channel::IpcError;
16use ipc_channel::router::ROUTER;
17use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
18use malloc_size_of_derive::MallocSizeOf;
19use serde::de::VariantAccess;
20use serde::{Deserialize, Deserializer, Serialize, Serializer};
21use servo_config::opts;
22
23mod callback;
24pub use callback::GenericCallback;
25mod lazy_callback;
26pub use lazy_callback::{CallbackSetter, LazyCallback, lazy_callback};
27mod oneshot;
28mod shared_memory;
29pub use oneshot::{GenericOneshotReceiver, GenericOneshotSender, oneshot};
30pub use shared_memory::GenericSharedMemory;
31mod generic_channelset;
32pub use generic_channelset::{GenericReceiverSet, GenericSelectionResult};
33mod buffered;
34pub use buffered::GenericBufferedSender;
35
36static USE_IPC: OnceLock<bool> = OnceLock::new();
38
39fn use_ipc() -> bool {
41 *USE_IPC.get_or_init(|| {
42 servo_config::opts::get().multiprocess || servo_config::opts::get().force_ipc
43 })
44}
45
46pub trait GenericSend<T>
49where
50 T: serde::Serialize + for<'de> serde::Deserialize<'de>,
51{
52 fn send(&self, _: T) -> SendResult;
54
55 #[track_caller]
60 fn send_or_warn(&self, message: T) {
61 if let Err(error) = self.send(message) {
62 let location = Location::caller();
63 log::warn!("Failed to send msg due to `{error}` at {location:?}");
64 }
65 }
66
67 fn send_or_ignore(&self, message: T) {
73 let _ = self.send(message);
74 }
75
76 fn sender(&self) -> GenericSender<T>;
78}
79
80pub struct GenericSender<T: Serialize>(GenericSenderVariants<T>);
84
85enum GenericSenderVariants<T: Serialize> {
90 Ipc(ipc_channel::ipc::IpcSender<T>),
91 Crossbeam(crossbeam_channel::Sender<Result<T, ipc_channel::IpcError>>),
99}
100
101fn serialize_generic_sender_variants<T: Serialize, S: Serializer>(
102 value: &GenericSenderVariants<T>,
103 s: S,
104) -> Result<S::Ok, S::Error> {
105 match value {
106 GenericSenderVariants::Ipc(sender) => {
107 s.serialize_newtype_variant("GenericSender", 0, "Ipc", sender)
108 },
109 GenericSenderVariants::Crossbeam(sender) => {
120 if opts::get().multiprocess {
121 return Err(serde::ser::Error::custom(
122 "Crossbeam channel found in multiprocess mode!",
123 ));
124 } let sender_clone_addr = Box::leak(Box::new(sender.clone())) as *mut _ as usize;
127 s.serialize_newtype_variant("GenericSender", 1, "Crossbeam", &sender_clone_addr)
128 },
129 }
130}
131
132impl<T: Serialize> Serialize for GenericSender<T> {
133 fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
134 serialize_generic_sender_variants(&self.0, s)
135 }
136}
137
138struct GenericSenderVisitor<T> {
139 marker: PhantomData<T>,
140}
141
142impl<'de, T: Serialize + Deserialize<'de>> serde::de::Visitor<'de> for GenericSenderVisitor<T> {
143 type Value = GenericSenderVariants<T>;
144
145 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
146 formatter.write_str("a GenericSender variant")
147 }
148
149 fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
150 where
151 A: serde::de::EnumAccess<'de>,
152 {
153 #[derive(Deserialize)]
154 enum GenericSenderVariantNames {
155 Ipc,
156 Crossbeam,
157 }
158
159 let (variant_name, variant_data): (GenericSenderVariantNames, _) = data.variant()?;
160
161 match variant_name {
162 GenericSenderVariantNames::Ipc => variant_data
163 .newtype_variant::<ipc_channel::ipc::IpcSender<T>>()
164 .map(|sender| GenericSenderVariants::Ipc(sender)),
165 GenericSenderVariantNames::Crossbeam => {
166 if opts::get().multiprocess {
167 return Err(serde::de::Error::custom(
168 "Crossbeam channel found in multiprocess mode!",
169 ));
170 }
171 let addr = variant_data.newtype_variant::<usize>()?;
172 let ptr = addr as *mut crossbeam_channel::Sender<Result<T, ipc_channel::IpcError>>;
173 #[expect(unsafe_code)]
176 let sender = unsafe { Box::from_raw(ptr) };
177 Ok(GenericSenderVariants::Crossbeam(*sender))
178 },
179 }
180 }
181}
182
183impl<'a, T: Serialize + Deserialize<'a>> Deserialize<'a> for GenericSender<T> {
184 fn deserialize<D>(d: D) -> Result<GenericSender<T>, D::Error>
185 where
186 D: Deserializer<'a>,
187 {
188 d.deserialize_enum(
189 "GenericSender",
190 &["Ipc", "Crossbeam"],
191 GenericSenderVisitor {
192 marker: PhantomData,
193 },
194 )
195 .map(|variant| GenericSender(variant))
196 }
197}
198
199impl<T> Clone for GenericSender<T>
200where
201 T: Serialize,
202{
203 fn clone(&self) -> Self {
204 match &self.0 {
205 GenericSenderVariants::Ipc(chan) => {
206 GenericSender(GenericSenderVariants::Ipc(chan.clone()))
207 },
208 GenericSenderVariants::Crossbeam(chan) => {
209 GenericSender(GenericSenderVariants::Crossbeam(chan.clone()))
210 },
211 }
212 }
213}
214
215impl<T: Serialize> fmt::Debug for GenericSender<T> {
216 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
217 write!(f, "Sender(..)")
218 }
219}
220
221impl<T: Serialize> GenericSender<T> {
222 #[inline]
223 pub fn send(&self, msg: T) -> SendResult {
224 match &self.0 {
225 GenericSenderVariants::Ipc(sender) => sender
226 .send(msg)
227 .map_err(|e| SendError::SerializationError(e.to_string())),
228 GenericSenderVariants::Crossbeam(sender) => {
229 sender.send(Ok(msg)).map_err(|_| SendError::Disconnected)
230 },
231 }
232 }
233
234 #[inline]
239 pub fn send_or_warn(&self, msg: T) {
240 if let Err(error) = self.send(msg) {
241 let location = Location::caller();
242 log::warn!("Failed to send msg due to `{error}` at {location:?}");
243 }
244 }
245
246 #[inline]
252 pub fn send_or_ignore(&self, msg: T) {
253 let _ = self.send(msg);
254 }
255}
256
257impl<T: Serialize> MallocSizeOf for GenericSender<T> {
258 fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
259 match &self.0 {
260 GenericSenderVariants::Ipc(ipc_sender) => ipc_sender.size_of(ops),
261 GenericSenderVariants::Crossbeam(sender) => sender.size_of(ops),
262 }
263 }
264}
265
266#[derive(Debug)]
267pub enum SendError {
268 Disconnected,
269 SerializationError(String),
270}
271
272impl From<IpcError> for SendError {
273 fn from(value: IpcError) -> Self {
274 match value {
275 IpcError::SerializationError(ser_de_error) => {
276 SendError::SerializationError(ser_de_error.to_string())
277 },
278 IpcError::Io(error) => {
279 log::error!("IO Error in ipc {:?}", error);
280 SendError::Disconnected
281 },
282 IpcError::Disconnected => SendError::Disconnected,
283 }
284 }
285}
286
287impl Display for SendError {
288 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
289 write!(f, "{self:?}")
290 }
291}
292
293pub type SendResult = Result<(), SendError>;
294
295#[derive(Debug)]
296pub enum ReceiveError {
297 DeserializationFailed(String),
298 Io(std::io::Error),
300 Disconnected,
302}
303
304impl From<IpcError> for ReceiveError {
305 fn from(e: IpcError) -> Self {
306 match e {
307 IpcError::Disconnected => ReceiveError::Disconnected,
308 IpcError::Io(reason) => ReceiveError::Io(reason),
309 IpcError::SerializationError(ser_de_error) => {
310 ReceiveError::DeserializationFailed(ser_de_error.to_string())
311 },
312 }
313 }
314}
315
316impl From<crossbeam_channel::RecvError> for ReceiveError {
317 fn from(_: crossbeam_channel::RecvError) -> Self {
318 ReceiveError::Disconnected
319 }
320}
321
322impl fmt::Display for ReceiveError {
323 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
324 match *self {
325 ReceiveError::DeserializationFailed(ref error) => {
326 write!(fmt, "deserialization error: {error}")
327 },
328 ReceiveError::Io(ref error) => write!(fmt, "io error: {error}"),
329 ReceiveError::Disconnected => write!(fmt, "disconnected"),
330 }
331 }
332}
333impl From<std::io::Error> for ReceiveError {
334 fn from(value: std::io::Error) -> Self {
335 ReceiveError::Io(value)
336 }
337}
338
339pub enum TryReceiveError {
340 Empty,
341 ReceiveError(ReceiveError),
342}
343
344impl From<crossbeam_channel::RecvTimeoutError> for TryReceiveError {
345 fn from(value: crossbeam_channel::RecvTimeoutError) -> Self {
346 match value {
347 RecvTimeoutError::Timeout => TryReceiveError::Empty,
348 RecvTimeoutError::Disconnected => {
349 TryReceiveError::ReceiveError(ReceiveError::Disconnected)
350 },
351 }
352 }
353}
354
355impl From<ipc_channel::TryRecvError> for TryReceiveError {
356 fn from(e: ipc_channel::TryRecvError) -> Self {
357 match e {
358 ipc_channel::TryRecvError::Empty => TryReceiveError::Empty,
359 ipc_channel::TryRecvError::IpcError(inner) => {
360 TryReceiveError::ReceiveError(inner.into())
361 },
362 }
363 }
364}
365
366impl From<crossbeam_channel::TryRecvError> for TryReceiveError {
367 fn from(e: crossbeam_channel::TryRecvError) -> Self {
368 match e {
369 crossbeam_channel::TryRecvError::Empty => TryReceiveError::Empty,
370 crossbeam_channel::TryRecvError::Disconnected => {
371 TryReceiveError::ReceiveError(ReceiveError::Disconnected)
372 },
373 }
374 }
375}
376
377pub type RoutedReceiver<T> = crossbeam_channel::Receiver<Result<T, ipc_channel::IpcError>>;
378pub type ReceiveResult<T> = Result<T, ReceiveError>;
379pub type TryReceiveResult<T> = Result<T, TryReceiveError>;
380pub type RoutedReceiverReceiveResult<T> =
381 Result<Result<T, ipc_channel::IpcError>, crossbeam_channel::RecvError>;
382
383pub fn to_receive_result<T>(receive_result: RoutedReceiverReceiveResult<T>) -> ReceiveResult<T> {
384 match receive_result {
385 Ok(Ok(msg)) => Ok(msg),
386 Err(_crossbeam_recv_err) => Err(ReceiveError::Disconnected),
387 Ok(Err(ipc_err)) => Err(ReceiveError::DeserializationFailed(ipc_err.to_string())),
388 }
389}
390
391#[derive(MallocSizeOf)]
392pub struct GenericReceiver<T>(GenericReceiverVariants<T>)
393where
394 T: for<'de> Deserialize<'de> + Serialize;
395
396impl<T> std::fmt::Debug for GenericReceiver<T>
397where
398 T: for<'de> Deserialize<'de> + Serialize,
399{
400 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
401 f.debug_tuple("GenericReceiver").finish()
402 }
403}
404
405#[derive(MallocSizeOf)]
406enum GenericReceiverVariants<T>
407where
408 T: for<'de> Deserialize<'de> + Serialize,
409{
410 Ipc(ipc_channel::ipc::IpcReceiver<T>),
411 Crossbeam(RoutedReceiver<T>),
412}
413
414impl<T> GenericReceiver<T>
415where
416 T: for<'de> Deserialize<'de> + Serialize,
417{
418 #[inline]
419 pub fn recv(&self) -> ReceiveResult<T> {
420 match &self.0 {
421 GenericReceiverVariants::Ipc(receiver) => Ok(receiver.recv()?),
422 GenericReceiverVariants::Crossbeam(receiver) => {
423 let msg = receiver.recv()?;
425 Ok(msg.expect("Infallible"))
428 },
429 }
430 }
431
432 #[inline]
433 pub fn try_recv(&self) -> TryReceiveResult<T> {
434 match &self.0 {
435 GenericReceiverVariants::Ipc(receiver) => Ok(receiver.try_recv()?),
436 GenericReceiverVariants::Crossbeam(receiver) => {
437 let msg = receiver.try_recv()?;
438 Ok(msg.expect("Infallible"))
439 },
440 }
441 }
442
443 #[inline]
445 pub fn try_recv_timeout(&self, timeout: Duration) -> Result<T, TryReceiveError> {
446 match &self.0 {
447 GenericReceiverVariants::Ipc(ipc_receiver) => {
448 ipc_receiver.try_recv_timeout(timeout).map_err(|e| e.into())
449 },
450 GenericReceiverVariants::Crossbeam(receiver) => match receiver.recv_timeout(timeout) {
451 Ok(Ok(value)) => Ok(value),
452 Ok(Err(_)) => unreachable!("Infallable"),
453 Err(RecvTimeoutError::Disconnected) => {
454 Err(TryReceiveError::ReceiveError(ReceiveError::Disconnected))
455 },
456 Err(RecvTimeoutError::Timeout) => Err(TryReceiveError::Empty),
457 },
458 }
459 }
460
461 #[inline]
466 pub fn route_preserving_errors(self) -> RoutedReceiver<T>
467 where
468 T: Send + 'static,
469 {
470 match self.0 {
471 GenericReceiverVariants::Ipc(ipc_receiver) => {
472 let (crossbeam_sender, crossbeam_receiver) = crossbeam_channel::unbounded();
473 let crossbeam_sender_clone = crossbeam_sender;
474 ROUTER.add_typed_route(
475 ipc_receiver,
476 Box::new(move |message| {
477 let _ = crossbeam_sender_clone
478 .send(message.map_err(IpcError::SerializationError));
479 }),
480 );
481 crossbeam_receiver
482 },
483 GenericReceiverVariants::Crossbeam(receiver) => receiver,
484 }
485 }
486}
487
488impl<T> Serialize for GenericReceiver<T>
489where
490 T: for<'de> Deserialize<'de> + Serialize,
491{
492 fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
493 match &self.0 {
494 GenericReceiverVariants::Ipc(receiver) => {
495 s.serialize_newtype_variant("GenericReceiver", 0, "Ipc", receiver)
496 },
497 GenericReceiverVariants::Crossbeam(receiver) => {
498 if opts::get().multiprocess {
499 return Err(serde::ser::Error::custom(
500 "Crossbeam channel found in multiprocess mode!",
501 ));
502 } let receiver_clone_addr = Box::leak(Box::new(receiver.clone())) as *mut _ as usize;
505 s.serialize_newtype_variant("GenericReceiver", 1, "Crossbeam", &receiver_clone_addr)
506 },
507 }
508 }
509}
510
511struct GenericReceiverVisitor<T> {
512 marker: PhantomData<T>,
513}
514impl<'de, T> serde::de::Visitor<'de> for GenericReceiverVisitor<T>
515where
516 T: for<'a> Deserialize<'a> + Serialize,
517{
518 type Value = GenericReceiver<T>;
519
520 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
521 formatter.write_str("a GenericReceiver variant")
522 }
523
524 fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
525 where
526 A: serde::de::EnumAccess<'de>,
527 {
528 #[derive(Deserialize)]
529 enum GenericReceiverVariantNames {
530 Ipc,
531 Crossbeam,
532 }
533
534 let (variant_name, variant_data): (GenericReceiverVariantNames, _) = data.variant()?;
535
536 match variant_name {
537 GenericReceiverVariantNames::Ipc => variant_data
538 .newtype_variant::<ipc_channel::ipc::IpcReceiver<T>>()
539 .map(|receiver| GenericReceiver(GenericReceiverVariants::Ipc(receiver))),
540 GenericReceiverVariantNames::Crossbeam => {
541 if use_ipc() {
542 return Err(serde::de::Error::custom(
543 "Crossbeam channel found in multiprocess mode!",
544 ));
545 }
546 let addr = variant_data.newtype_variant::<usize>()?;
547 let ptr = addr as *mut RoutedReceiver<T>;
548 #[expect(unsafe_code)]
551 let receiver = unsafe { Box::from_raw(ptr) };
552 Ok(GenericReceiver(GenericReceiverVariants::Crossbeam(
553 *receiver,
554 )))
555 },
556 }
557 }
558}
559
560impl<'a, T> Deserialize<'a> for GenericReceiver<T>
561where
562 T: for<'de> Deserialize<'de> + Serialize,
563{
564 fn deserialize<D>(d: D) -> Result<GenericReceiver<T>, D::Error>
565 where
566 D: Deserializer<'a>,
567 {
568 d.deserialize_enum(
569 "GenericReceiver",
570 &["Ipc", "Crossbeam"],
571 GenericReceiverVisitor {
572 marker: PhantomData,
573 },
574 )
575 }
576}
577
578fn new_generic_channel_crossbeam<T>() -> (GenericSender<T>, GenericReceiver<T>)
582where
583 T: Serialize + for<'de> serde::Deserialize<'de>,
584{
585 let (tx, rx) = crossbeam_channel::unbounded();
586 (
587 GenericSender(GenericSenderVariants::Crossbeam(tx)),
588 GenericReceiver(GenericReceiverVariants::Crossbeam(rx)),
589 )
590}
591
592fn new_generic_channel_ipc<T>() -> Result<(GenericSender<T>, GenericReceiver<T>), std::io::Error>
593where
594 T: Serialize + for<'de> serde::Deserialize<'de>,
595{
596 ipc_channel::ipc::channel().map(|(tx, rx)| {
597 (
598 GenericSender(GenericSenderVariants::Ipc(tx)),
599 GenericReceiver(GenericReceiverVariants::Ipc(rx)),
600 )
601 })
602}
603
604pub fn channel<T>() -> Option<(GenericSender<T>, GenericReceiver<T>)>
608where
609 T: for<'de> Deserialize<'de> + Serialize,
610{
611 if use_ipc() {
612 new_generic_channel_ipc().ok()
613 } else {
614 Some(new_generic_channel_crossbeam())
615 }
616}
617
618#[cfg(test)]
619mod single_process_channel_tests {
620 use crate::generic_channel::{new_generic_channel_crossbeam, new_generic_channel_ipc};
625
626 #[test]
627 fn generic_crossbeam_can_send() {
628 let (tx, rx) = new_generic_channel_crossbeam();
629 tx.send(5).expect("Send failed");
630 let val = rx.recv().expect("Receive failed");
631 assert_eq!(val, 5);
632 }
633
634 #[test]
635 fn generic_crossbeam_ping_pong() {
636 let (tx, rx) = new_generic_channel_crossbeam();
637 let (tx2, rx2) = new_generic_channel_crossbeam();
638
639 tx.send(tx2).expect("Send failed");
640
641 std::thread::scope(|s| {
642 s.spawn(move || {
643 let reply_sender = rx.recv().expect("Receive failed");
644 reply_sender.send(42).expect("Sending reply failed");
645 });
646 });
647 let res = rx2.recv().expect("Receive of reply failed");
648 assert_eq!(res, 42);
649 }
650
651 #[test]
652 fn generic_ipc_ping_pong() {
653 let (tx, rx) = new_generic_channel_ipc().unwrap();
654 let (tx2, rx2) = new_generic_channel_ipc().unwrap();
655
656 tx.send(tx2).expect("Send failed");
657
658 std::thread::scope(|s| {
659 s.spawn(move || {
660 let reply_sender = rx.recv().expect("Receive failed");
661 reply_sender.send(42).expect("Sending reply failed");
662 });
663 });
664 let res = rx2.recv().expect("Receive of reply failed");
665 assert_eq!(res, 42);
666 }
667
668 #[test]
669 fn send_crossbeam_sender_over_ipc_channel() {
670 let (tx, rx) = new_generic_channel_ipc().unwrap();
671 let (tx2, rx2) = new_generic_channel_crossbeam();
672
673 tx.send(tx2).expect("Send failed");
674
675 std::thread::scope(|s| {
676 s.spawn(move || {
677 let reply_sender = rx.recv().expect("Receive failed");
678 reply_sender.send(42).expect("Sending reply failed");
679 });
680 });
681 let res = rx2.recv().expect("Receive of reply failed");
682 assert_eq!(res, 42);
683 }
684
685 #[test]
686 fn send_generic_ipc_channel_over_crossbeam() {
687 let (tx, rx) = new_generic_channel_crossbeam();
688 let (tx2, rx2) = new_generic_channel_ipc().unwrap();
689
690 tx.send(tx2).expect("Send failed");
691
692 std::thread::scope(|s| {
693 s.spawn(move || {
694 let reply_sender = rx.recv().expect("Receive failed");
695 reply_sender.send(42).expect("Sending reply failed");
696 });
697 });
698 let res = rx2.recv().expect("Receive of reply failed");
699 assert_eq!(res, 42);
700 }
701
702 #[test]
703 fn send_crossbeam_receiver_over_ipc_channel() {
704 let (tx, rx) = new_generic_channel_ipc().unwrap();
705 let (tx2, rx2) = new_generic_channel_crossbeam();
706
707 tx.send(rx2).expect("Send failed");
708 tx2.send(42).expect("Send failed");
709
710 std::thread::scope(|s| {
711 s.spawn(move || {
712 let another_receiver = rx.recv().expect("Receive failed");
713 let res = another_receiver.recv().expect("Receive failed");
714 assert_eq!(res, 42);
715 });
716 });
717 }
718
719 #[test]
720 fn test_timeout_ipc() {
721 let (tx, rx) = new_generic_channel_ipc().unwrap();
722 let timeout_duration = std::time::Duration::from_secs(3);
723 std::thread::spawn(move || {
724 std::thread::sleep(timeout_duration - std::time::Duration::from_secs(1));
725 assert!(tx.send(()).is_ok());
726 });
727 let received = rx.try_recv_timeout(timeout_duration);
728 assert!(received.is_ok());
729 }
730
731 #[test]
732 fn test_timeout_crossbeam() {
733 let (tx, rx) = new_generic_channel_crossbeam();
734 let timeout_duration = std::time::Duration::from_secs(3);
735 std::thread::spawn(move || {
736 std::thread::sleep(timeout_duration - std::time::Duration::from_secs(1));
737 assert!(tx.send(()).is_ok());
738 });
739 let received = rx.try_recv_timeout(timeout_duration);
740 assert!(received.is_ok());
741 }
742}
743
744#[cfg(test)]
746mod generic_receiversets_tests {
747 use std::time::Duration;
748
749 use crate::generic_channel::generic_channelset::{
750 GenericSelectionResult, create_crossbeam_receiver_set, create_ipc_receiver_set,
751 };
752 use crate::generic_channel::{new_generic_channel_crossbeam, new_generic_channel_ipc};
753
754 #[test]
755 fn test_ipc_side1() {
756 let (snd1, recv1) = new_generic_channel_ipc().unwrap();
757 let (snd2, recv2) = new_generic_channel_ipc().unwrap();
758
759 let snd1_c = snd1.clone();
761 let snd2_c = snd2.clone();
762 let mut set = create_ipc_receiver_set();
763 let recv1_select_index = set.add(recv1);
764 let _recv2_select_index = set.add(recv2);
765
766 std::thread::spawn(move || {
767 snd1_c.send(10).unwrap();
768 });
769 std::thread::spawn(move || {
770 std::thread::sleep(Duration::from_secs(1));
771 let _ = snd2_c.send(20); });
773
774 let select_result = set.select();
775 let channel_result = select_result.first().unwrap();
776 assert_eq!(
777 *channel_result,
778 GenericSelectionResult::MessageReceived(recv1_select_index, 10)
779 );
780 }
781
782 #[test]
783 fn test_ipc_side2() {
784 let (snd1, recv1) = new_generic_channel_ipc().unwrap();
785 let (snd2, recv2) = new_generic_channel_ipc().unwrap();
786
787 let snd1_c = snd1.clone();
789 let snd2_c = snd2.clone();
790 let mut set = create_ipc_receiver_set();
791 let _recv1_select_index = set.add(recv1);
792 let recv2_select_index = set.add(recv2);
793
794 std::thread::spawn(move || {
795 std::thread::sleep(Duration::from_secs(1));
796 let _ = snd1_c.send(10);
797 });
798 std::thread::spawn(move || {
799 snd2_c.send(20).unwrap();
800 });
801
802 let select_result = set.select();
803 let channel_result = select_result.first().unwrap();
804 assert_eq!(
805 *channel_result,
806 GenericSelectionResult::MessageReceived(recv2_select_index, 20)
807 );
808 }
809
810 #[test]
811 fn test_crossbeam_side1() {
812 let (snd1, recv1) = new_generic_channel_crossbeam();
813 let (snd2, recv2) = new_generic_channel_crossbeam();
814
815 let snd1_c = snd1.clone();
817 let snd2_c = snd2.clone();
818 let mut set = create_crossbeam_receiver_set();
819 let recv1_select_index = set.add(recv1);
820 let _recv2_select_index = set.add(recv2);
821
822 std::thread::spawn(move || {
823 snd1_c.send(10).unwrap();
824 });
825 std::thread::spawn(move || {
826 std::thread::sleep(Duration::from_secs(2));
827 let _ = snd2_c.send(20);
828 });
829
830 let select_result = set.select();
831 let channel_result = select_result.first().unwrap();
832 assert_eq!(
833 *channel_result,
834 GenericSelectionResult::MessageReceived(recv1_select_index, 10)
835 );
836 }
837
838 #[test]
839 fn test_crossbeam_side2() {
840 let (snd1, recv1) = new_generic_channel_crossbeam();
841 let (snd2, recv2) = new_generic_channel_crossbeam();
842
843 let snd1_c = snd1.clone();
845 let snd2_c = snd2.clone();
846 let mut set = create_crossbeam_receiver_set();
847 let _recv1_select_index = set.add(recv1);
848 let recv2_select_index = set.add(recv2);
849
850 std::thread::spawn(move || {
851 std::thread::sleep(Duration::from_secs(2));
852 let _ = snd1_c.send(10);
853 });
854 std::thread::spawn(move || {
855 snd2_c.send(20).unwrap();
856 });
857
858 let select_result = set.select();
859 let channel_result = select_result.first().unwrap();
860 assert_eq!(
861 *channel_result,
862 GenericSelectionResult::MessageReceived(recv2_select_index, 20)
863 );
864 }
865
866 #[test]
867 fn test_ipc_no_crash_on_disconnect() {
868 let (snd1, recv1) = new_generic_channel_ipc().unwrap();
871 let (snd2, recv2) = new_generic_channel_ipc().unwrap();
872
873 let snd1_c = snd1.clone();
875 let mut set = create_ipc_receiver_set();
876 let _recv1_select_index = set.add(recv1);
877 let recv2_select_index = set.add(recv2);
878
879 std::thread::spawn(move || {
880 std::thread::sleep(Duration::from_secs(2));
881 let _ = snd1_c.send(10);
882 });
883 std::thread::spawn(move || {
884 snd2.send(20).unwrap();
885 });
886 std::thread::sleep(Duration::from_secs(1));
887 let select_result = set.select();
888 let channel_result = select_result.first().unwrap();
889 assert_eq!(
890 *channel_result,
891 GenericSelectionResult::MessageReceived(recv2_select_index, 20)
892 );
893 }
894
895 #[test]
896 fn test_crossbeam_no_crash_on_disconnect() {
897 let (snd1, recv1) = new_generic_channel_crossbeam();
899 let (snd2, recv2) = new_generic_channel_crossbeam();
900
901 let snd1_c = snd1.clone();
903 let mut set = create_crossbeam_receiver_set();
904 let _recv1_select_index = set.add(recv1);
905 let recv2_select_index = set.add(recv2);
906
907 std::thread::spawn(move || {
908 std::thread::sleep(Duration::from_secs(2));
909 let _ = snd1_c.send(10);
910 });
911 std::thread::spawn(move || {
912 snd2.send(20).unwrap();
913 });
914 std::thread::sleep(Duration::from_secs(1));
915 let select_result = set.select();
916 let channel_result = select_result.first().unwrap();
917 assert_eq!(
918 *channel_result,
919 GenericSelectionResult::MessageReceived(recv2_select_index, 20)
920 );
921 }
922
923 #[test]
924 fn test_ipc_disconnect_correct_message() {
925 let (snd1, recv1) = new_generic_channel_ipc().unwrap();
927 let (snd2, recv2) = new_generic_channel_ipc().unwrap();
928
929 let snd1_c = snd1.clone();
931 let mut set = create_ipc_receiver_set();
932 let _recv1_select_index = set.add(recv1);
933 let recv2_select_index = set.add(recv2);
934
935 std::thread::spawn(move || {
936 std::thread::sleep(Duration::from_secs(2));
937 let _ = snd1_c.send(10);
938 });
939 std::thread::spawn(move || {
940 drop(snd2);
941 });
942
943 let select_result = set.select();
944 let channel_result = select_result.first().unwrap();
945 assert_eq!(
946 *channel_result,
947 GenericSelectionResult::ChannelClosed(recv2_select_index)
948 );
949 }
950
951 #[test]
952 fn test_crossbeam_disconnect_correct_messaget() {
953 let (snd1, recv1) = new_generic_channel_crossbeam();
954 let (snd2, recv2) = new_generic_channel_crossbeam();
955
956 let snd1_c = snd1.clone();
958 let mut set = create_crossbeam_receiver_set();
959 let _recv1_select_index = set.add(recv1);
960 let recv2_select_index = set.add(recv2);
961
962 std::thread::spawn(move || {
963 std::thread::sleep(Duration::from_secs(2));
964 let _ = snd1_c.send(10);
965 });
966 std::thread::spawn(move || {
967 drop(snd2);
968 });
969
970 let select_result = set.select();
971 let channel_result = select_result.first().unwrap();
972 assert_eq!(
973 *channel_result,
974 GenericSelectionResult::ChannelClosed(recv2_select_index)
975 );
976 }
977}