1use std::fmt;
8use std::fmt::Display;
9use std::marker::PhantomData;
10
11use ipc_channel::ipc::IpcError;
12use ipc_channel::router::ROUTER;
13use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
14use serde::de::VariantAccess;
15use serde::{Deserialize, Deserializer, Serialize, Serializer};
16use servo_config::opts;
17
18mod callback;
19pub use callback::GenericCallback;
20
21pub trait GenericSend<T>
24where
25 T: serde::Serialize + for<'de> serde::Deserialize<'de>,
26{
27 fn send(&self, _: T) -> SendResult;
29 fn sender(&self) -> GenericSender<T>;
31}
32
33pub struct GenericSender<T: Serialize>(GenericSenderVariants<T>);
37
38enum GenericSenderVariants<T: Serialize> {
43 Ipc(ipc_channel::ipc::IpcSender<T>),
44 Crossbeam(crossbeam_channel::Sender<Result<T, ipc_channel::Error>>),
52}
53
54impl<T: Serialize> Serialize for GenericSender<T> {
55 fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
56 match &self.0 {
57 GenericSenderVariants::Ipc(sender) => {
58 s.serialize_newtype_variant("GenericSender", 0, "Ipc", sender)
59 },
60 GenericSenderVariants::Crossbeam(sender) => {
71 if opts::get().multiprocess {
72 return Err(serde::ser::Error::custom(
73 "Crossbeam channel found in multiprocess mode!",
74 ));
75 } let sender_clone_addr = Box::leak(Box::new(sender.clone())) as *mut _ as usize;
78 s.serialize_newtype_variant("GenericSender", 1, "Crossbeam", &sender_clone_addr)
79 },
80 }
81 }
82}
83
84struct GenericSenderVisitor<T> {
85 marker: PhantomData<T>,
86}
87
88impl<'de, T: Serialize + Deserialize<'de>> serde::de::Visitor<'de> for GenericSenderVisitor<T> {
89 type Value = GenericSender<T>;
90
91 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
92 formatter.write_str("a GenericSender variant")
93 }
94
95 fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
96 where
97 A: serde::de::EnumAccess<'de>,
98 {
99 #[derive(Deserialize)]
100 enum GenericSenderVariantNames {
101 Ipc,
102 Crossbeam,
103 }
104
105 let (variant_name, variant_data): (GenericSenderVariantNames, _) = data.variant()?;
106
107 match variant_name {
108 GenericSenderVariantNames::Ipc => variant_data
109 .newtype_variant::<ipc_channel::ipc::IpcSender<T>>()
110 .map(|sender| GenericSender(GenericSenderVariants::Ipc(sender))),
111 GenericSenderVariantNames::Crossbeam => {
112 if opts::get().multiprocess {
113 return Err(serde::de::Error::custom(
114 "Crossbeam channel found in multiprocess mode!",
115 ));
116 }
117 let addr = variant_data.newtype_variant::<usize>()?;
118 let ptr = addr as *mut crossbeam_channel::Sender<Result<T, ipc_channel::Error>>;
119 #[allow(unsafe_code)]
122 let sender = unsafe { Box::from_raw(ptr) };
123 Ok(GenericSender(GenericSenderVariants::Crossbeam(*sender)))
124 },
125 }
126 }
127}
128
129impl<'a, T: Serialize + Deserialize<'a>> Deserialize<'a> for GenericSender<T> {
130 fn deserialize<D>(d: D) -> Result<GenericSender<T>, D::Error>
131 where
132 D: Deserializer<'a>,
133 {
134 d.deserialize_enum(
135 "GenericSender",
136 &["Ipc", "Crossbeam"],
137 GenericSenderVisitor {
138 marker: PhantomData,
139 },
140 )
141 }
142}
143
144impl<T> Clone for GenericSender<T>
145where
146 T: Serialize,
147{
148 fn clone(&self) -> Self {
149 match self.0 {
150 GenericSenderVariants::Ipc(ref chan) => {
151 GenericSender(GenericSenderVariants::Ipc(chan.clone()))
152 },
153 GenericSenderVariants::Crossbeam(ref chan) => {
154 GenericSender(GenericSenderVariants::Crossbeam(chan.clone()))
155 },
156 }
157 }
158}
159
160impl<T: Serialize> fmt::Debug for GenericSender<T> {
161 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
162 write!(f, "Sender(..)")
163 }
164}
165
166impl<T: Serialize> GenericSender<T> {
167 #[inline]
168 pub fn send(&self, msg: T) -> SendResult {
169 match self.0 {
170 GenericSenderVariants::Ipc(ref sender) => sender
171 .send(msg)
172 .map_err(|e| SendError::SerializationError(format!("{e}"))),
173 GenericSenderVariants::Crossbeam(ref sender) => {
174 sender.send(Ok(msg)).map_err(|_| SendError::Disconnected)
175 },
176 }
177 }
178}
179
180impl<T: Serialize> MallocSizeOf for GenericSender<T> {
181 fn size_of(&self, _ops: &mut MallocSizeOfOps) -> usize {
182 0
183 }
184}
185
186#[derive(Debug)]
187pub enum SendError {
188 Disconnected,
189 SerializationError(String),
190}
191
192impl Display for SendError {
193 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
194 write!(f, "{self:?}")
195 }
196}
197
198pub type SendResult = Result<(), SendError>;
199
200#[derive(Debug)]
201pub enum ReceiveError {
202 DeserializationFailed(String),
203 Io(std::io::Error),
205 Disconnected,
207}
208
209impl From<IpcError> for ReceiveError {
210 fn from(e: IpcError) -> Self {
211 match e {
212 IpcError::Disconnected => ReceiveError::Disconnected,
213 IpcError::Bincode(reason) => ReceiveError::DeserializationFailed(reason.to_string()),
214 IpcError::Io(reason) => ReceiveError::Io(reason),
215 }
216 }
217}
218
219impl From<crossbeam_channel::RecvError> for ReceiveError {
220 fn from(_: crossbeam_channel::RecvError) -> Self {
221 ReceiveError::Disconnected
222 }
223}
224
225pub enum TryReceiveError {
226 Empty,
227 ReceiveError(ReceiveError),
228}
229
230impl From<ipc_channel::ipc::TryRecvError> for TryReceiveError {
231 fn from(e: ipc_channel::ipc::TryRecvError) -> Self {
232 match e {
233 ipc_channel::ipc::TryRecvError::Empty => TryReceiveError::Empty,
234 ipc_channel::ipc::TryRecvError::IpcError(inner) => {
235 TryReceiveError::ReceiveError(inner.into())
236 },
237 }
238 }
239}
240
241impl From<crossbeam_channel::TryRecvError> for TryReceiveError {
242 fn from(e: crossbeam_channel::TryRecvError) -> Self {
243 match e {
244 crossbeam_channel::TryRecvError::Empty => TryReceiveError::Empty,
245 crossbeam_channel::TryRecvError::Disconnected => {
246 TryReceiveError::ReceiveError(ReceiveError::Disconnected)
247 },
248 }
249 }
250}
251
252pub type RoutedReceiver<T> = crossbeam_channel::Receiver<Result<T, ipc_channel::Error>>;
253pub type ReceiveResult<T> = Result<T, ReceiveError>;
254pub type TryReceiveResult<T> = Result<T, TryReceiveError>;
255pub type RoutedReceiverReceiveResult<T> =
256 Result<Result<T, ipc_channel::Error>, crossbeam_channel::RecvError>;
257
258pub fn to_receive_result<T>(receive_result: RoutedReceiverReceiveResult<T>) -> ReceiveResult<T> {
259 match receive_result {
260 Ok(Ok(msg)) => Ok(msg),
261 Err(_crossbeam_recv_err) => Err(ReceiveError::Disconnected),
262 Ok(Err(ipc_err)) => Err(ReceiveError::DeserializationFailed(ipc_err.to_string())),
263 }
264}
265
266pub struct GenericReceiver<T>(GenericReceiverVariants<T>)
267where
268 T: for<'de> Deserialize<'de> + Serialize;
269
270enum GenericReceiverVariants<T>
271where
272 T: for<'de> Deserialize<'de> + Serialize,
273{
274 Ipc(ipc_channel::ipc::IpcReceiver<T>),
275 Crossbeam(RoutedReceiver<T>),
276}
277
278impl<T> GenericReceiver<T>
279where
280 T: for<'de> Deserialize<'de> + Serialize,
281{
282 #[inline]
283 pub fn recv(&self) -> ReceiveResult<T> {
284 match self.0 {
285 GenericReceiverVariants::Ipc(ref receiver) => Ok(receiver.recv()?),
286 GenericReceiverVariants::Crossbeam(ref receiver) => {
287 let msg = receiver.recv()?;
289 Ok(msg.expect("Infallible"))
292 },
293 }
294 }
295
296 #[inline]
297 pub fn try_recv(&self) -> TryReceiveResult<T> {
298 match self.0 {
299 GenericReceiverVariants::Ipc(ref receiver) => Ok(receiver.try_recv()?),
300 GenericReceiverVariants::Crossbeam(ref receiver) => {
301 let msg = receiver.try_recv()?;
302 Ok(msg.expect("Infallible"))
303 },
304 }
305 }
306
307 #[inline]
312 pub fn route_preserving_errors(self) -> RoutedReceiver<T>
313 where
314 T: Send + 'static,
315 {
316 match self.0 {
317 GenericReceiverVariants::Ipc(ipc_receiver) => {
318 let (crossbeam_sender, crossbeam_receiver) = crossbeam_channel::unbounded();
319 let crossbeam_sender_clone = crossbeam_sender.clone();
320 ROUTER.add_typed_route(
321 ipc_receiver,
322 Box::new(move |message| {
323 let _ = crossbeam_sender_clone.send(message);
324 }),
325 );
326 crossbeam_receiver
327 },
328 GenericReceiverVariants::Crossbeam(receiver) => receiver,
329 }
330 }
331}
332
333impl<T> Serialize for GenericReceiver<T>
334where
335 T: for<'de> Deserialize<'de> + Serialize,
336{
337 fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
338 match &self.0 {
339 GenericReceiverVariants::Ipc(receiver) => {
340 s.serialize_newtype_variant("GenericReceiver", 0, "Ipc", receiver)
341 },
342 GenericReceiverVariants::Crossbeam(receiver) => {
343 if opts::get().multiprocess {
344 return Err(serde::ser::Error::custom(
345 "Crossbeam channel found in multiprocess mode!",
346 ));
347 } let receiver_clone_addr = Box::leak(Box::new(receiver.clone())) as *mut _ as usize;
350 s.serialize_newtype_variant("GenericReceiver", 1, "Crossbeam", &receiver_clone_addr)
351 },
352 }
353 }
354}
355
356struct GenericReceiverVisitor<T> {
357 marker: PhantomData<T>,
358}
359impl<'de, T> serde::de::Visitor<'de> for GenericReceiverVisitor<T>
360where
361 T: for<'a> Deserialize<'a> + Serialize,
362{
363 type Value = GenericReceiver<T>;
364
365 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
366 formatter.write_str("a GenericReceiver variant")
367 }
368
369 fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
370 where
371 A: serde::de::EnumAccess<'de>,
372 {
373 #[derive(Deserialize)]
374 enum GenericReceiverVariantNames {
375 Ipc,
376 Crossbeam,
377 }
378
379 let (variant_name, variant_data): (GenericReceiverVariantNames, _) = data.variant()?;
380
381 match variant_name {
382 GenericReceiverVariantNames::Ipc => variant_data
383 .newtype_variant::<ipc_channel::ipc::IpcReceiver<T>>()
384 .map(|receiver| GenericReceiver(GenericReceiverVariants::Ipc(receiver))),
385 GenericReceiverVariantNames::Crossbeam => {
386 if opts::get().multiprocess {
387 return Err(serde::de::Error::custom(
388 "Crossbeam channel found in multiprocess mode!",
389 ));
390 }
391 let addr = variant_data.newtype_variant::<usize>()?;
392 let ptr = addr as *mut RoutedReceiver<T>;
393 #[allow(unsafe_code)]
396 let receiver = unsafe { Box::from_raw(ptr) };
397 Ok(GenericReceiver(GenericReceiverVariants::Crossbeam(
398 *receiver,
399 )))
400 },
401 }
402 }
403}
404
405impl<'a, T> Deserialize<'a> for GenericReceiver<T>
406where
407 T: for<'de> Deserialize<'de> + Serialize,
408{
409 fn deserialize<D>(d: D) -> Result<GenericReceiver<T>, D::Error>
410 where
411 D: Deserializer<'a>,
412 {
413 d.deserialize_enum(
414 "GenericReceiver",
415 &["Ipc", "Crossbeam"],
416 GenericReceiverVisitor {
417 marker: PhantomData,
418 },
419 )
420 }
421}
422
423fn new_generic_channel_crossbeam<T>() -> (GenericSender<T>, GenericReceiver<T>)
427where
428 T: Serialize + for<'de> serde::Deserialize<'de>,
429{
430 let (tx, rx) = crossbeam_channel::unbounded();
431 (
432 GenericSender(GenericSenderVariants::Crossbeam(tx)),
433 GenericReceiver(GenericReceiverVariants::Crossbeam(rx)),
434 )
435}
436
437fn new_generic_channel_ipc<T>() -> Result<(GenericSender<T>, GenericReceiver<T>), std::io::Error>
438where
439 T: Serialize + for<'de> serde::Deserialize<'de>,
440{
441 ipc_channel::ipc::channel().map(|(tx, rx)| {
442 (
443 GenericSender(GenericSenderVariants::Ipc(tx)),
444 GenericReceiver(GenericReceiverVariants::Ipc(rx)),
445 )
446 })
447}
448
449pub fn channel<T>() -> Option<(GenericSender<T>, GenericReceiver<T>)>
453where
454 T: for<'de> Deserialize<'de> + Serialize,
455{
456 if servo_config::opts::get().multiprocess || servo_config::opts::get().force_ipc {
457 new_generic_channel_ipc().ok()
458 } else {
459 Some(new_generic_channel_crossbeam())
460 }
461}
462
463#[cfg(test)]
464mod single_process_channel_tests {
465 use crate::generic_channel::{new_generic_channel_crossbeam, new_generic_channel_ipc};
470
471 #[test]
472 fn generic_crossbeam_can_send() {
473 let (tx, rx) = new_generic_channel_crossbeam();
474 tx.send(5).expect("Send failed");
475 let val = rx.recv().expect("Receive failed");
476 assert_eq!(val, 5);
477 }
478
479 #[test]
480 fn generic_crossbeam_ping_pong() {
481 let (tx, rx) = new_generic_channel_crossbeam();
482 let (tx2, rx2) = new_generic_channel_crossbeam();
483
484 tx.send(tx2).expect("Send failed");
485
486 std::thread::scope(|s| {
487 s.spawn(move || {
488 let reply_sender = rx.recv().expect("Receive failed");
489 reply_sender.send(42).expect("Sending reply failed");
490 });
491 });
492 let res = rx2.recv().expect("Receive of reply failed");
493 assert_eq!(res, 42);
494 }
495
496 #[test]
497 fn generic_ipc_ping_pong() {
498 let (tx, rx) = new_generic_channel_ipc().unwrap();
499 let (tx2, rx2) = new_generic_channel_ipc().unwrap();
500
501 tx.send(tx2).expect("Send failed");
502
503 std::thread::scope(|s| {
504 s.spawn(move || {
505 let reply_sender = rx.recv().expect("Receive failed");
506 reply_sender.send(42).expect("Sending reply failed");
507 });
508 });
509 let res = rx2.recv().expect("Receive of reply failed");
510 assert_eq!(res, 42);
511 }
512
513 #[test]
514 fn send_crossbeam_sender_over_ipc_channel() {
515 let (tx, rx) = new_generic_channel_ipc().unwrap();
516 let (tx2, rx2) = new_generic_channel_crossbeam();
517
518 tx.send(tx2).expect("Send failed");
519
520 std::thread::scope(|s| {
521 s.spawn(move || {
522 let reply_sender = rx.recv().expect("Receive failed");
523 reply_sender.send(42).expect("Sending reply failed");
524 });
525 });
526 let res = rx2.recv().expect("Receive of reply failed");
527 assert_eq!(res, 42);
528 }
529
530 #[test]
531 fn send_generic_ipc_channel_over_crossbeam() {
532 let (tx, rx) = new_generic_channel_crossbeam();
533 let (tx2, rx2) = new_generic_channel_ipc().unwrap();
534
535 tx.send(tx2).expect("Send failed");
536
537 std::thread::scope(|s| {
538 s.spawn(move || {
539 let reply_sender = rx.recv().expect("Receive failed");
540 reply_sender.send(42).expect("Sending reply failed");
541 });
542 });
543 let res = rx2.recv().expect("Receive of reply failed");
544 assert_eq!(res, 42);
545 }
546
547 #[test]
548 fn send_crossbeam_receiver_over_ipc_channel() {
549 let (tx, rx) = new_generic_channel_ipc().unwrap();
550 let (tx2, rx2) = new_generic_channel_crossbeam();
551
552 tx.send(rx2).expect("Send failed");
553 tx2.send(42).expect("Send failed");
554
555 std::thread::scope(|s| {
556 s.spawn(move || {
557 let another_receiver = rx.recv().expect("Receive failed");
558 let res = another_receiver.recv().expect("Receive failed");
559 assert_eq!(res, 42);
560 });
561 });
562 }
563}