ipc_channel/
ipc.rs

1// Copyright 2015 The Servo Project Developers. See the COPYRIGHT
2// file at the top-level directory of this distribution.
3//
4// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
5// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
7// option. This file may not be copied, modified, or distributed
8// except according to those terms.
9
10use crate::platform::{self, OsIpcChannel, OsIpcReceiver, OsIpcReceiverSet, OsIpcSender};
11use crate::platform::{
12    OsIpcOneShotServer, OsIpcSelectionResult, OsIpcSharedMemory, OsOpaqueIpcChannel,
13};
14
15use bincode;
16use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer};
17use std::cell::RefCell;
18use std::cmp::min;
19use std::error::Error as StdError;
20use std::fmt::{self, Debug, Formatter};
21use std::io;
22use std::marker::PhantomData;
23use std::mem;
24use std::ops::Deref;
25use std::time::Duration;
26
27thread_local! {
28    static OS_IPC_CHANNELS_FOR_DESERIALIZATION: RefCell<Vec<OsOpaqueIpcChannel>> =
29        const { RefCell::new(Vec::new()) };
30
31    static OS_IPC_SHARED_MEMORY_REGIONS_FOR_DESERIALIZATION:
32        RefCell<Vec<Option<OsIpcSharedMemory>>> = const { RefCell::new(Vec::new()) };
33
34    static OS_IPC_CHANNELS_FOR_SERIALIZATION: RefCell<Vec<OsIpcChannel>> = const { RefCell::new(Vec::new()) };
35
36    static OS_IPC_SHARED_MEMORY_REGIONS_FOR_SERIALIZATION: RefCell<Vec<OsIpcSharedMemory>> =
37        const { RefCell::new(Vec::new()) }
38}
39
40#[derive(Debug)]
41pub enum IpcError {
42    Bincode(bincode::Error),
43    Io(io::Error),
44    Disconnected,
45}
46
47impl fmt::Display for IpcError {
48    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
49        match *self {
50            IpcError::Bincode(ref err) => write!(fmt, "bincode error: {err}"),
51            IpcError::Io(ref err) => write!(fmt, "io error: {err}"),
52            IpcError::Disconnected => write!(fmt, "disconnected"),
53        }
54    }
55}
56
57impl StdError for IpcError {
58    fn source(&self) -> Option<&(dyn StdError + 'static)> {
59        match *self {
60            IpcError::Bincode(ref err) => Some(err),
61            IpcError::Io(ref err) => Some(err),
62            IpcError::Disconnected => None,
63        }
64    }
65}
66
67#[derive(Debug)]
68pub enum TryRecvError {
69    IpcError(IpcError),
70    Empty,
71}
72
73impl fmt::Display for TryRecvError {
74    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
75        match *self {
76            TryRecvError::IpcError(ref err) => write!(fmt, "ipc error: {err}"),
77            TryRecvError::Empty => write!(fmt, "empty"),
78        }
79    }
80}
81
82impl StdError for TryRecvError {
83    fn source(&self) -> Option<&(dyn StdError + 'static)> {
84        match *self {
85            TryRecvError::IpcError(ref err) => Some(err),
86            TryRecvError::Empty => None,
87        }
88    }
89}
90
91/// Create a connected [IpcSender] and [IpcReceiver] that
92/// transfer messages of a given type provided by type `T`
93/// or inferred by the types of messages sent by the sender.
94///
95/// Messages sent by the sender will be available to the
96/// receiver even if the sender or receiver has been moved
97/// to a different process. In addition, receivers and senders
98/// may be sent over an existing channel.
99///
100/// # Examples
101///
102/// ```
103/// # use ipc_channel::ipc;
104///
105/// let payload = "Hello, World!".to_owned();
106///
107/// // Create a channel
108/// let (tx, rx) = ipc::channel().unwrap();
109///
110/// // Send data
111/// tx.send(payload).unwrap();
112///
113/// // Receive the data
114/// let response = rx.recv().unwrap();
115///
116/// assert_eq!(response, "Hello, World!".to_owned());
117/// ```
118///
119/// [IpcSender]: struct.IpcSender.html
120/// [IpcReceiver]: struct.IpcReceiver.html
121pub fn channel<T>() -> Result<(IpcSender<T>, IpcReceiver<T>), io::Error>
122where
123    T: for<'de> Deserialize<'de> + Serialize,
124{
125    let (os_sender, os_receiver) = platform::channel()?;
126    let ipc_receiver = IpcReceiver {
127        os_receiver,
128        phantom: PhantomData,
129    };
130    let ipc_sender = IpcSender {
131        os_sender,
132        phantom: PhantomData,
133    };
134    Ok((ipc_sender, ipc_receiver))
135}
136
137/// Create a connected [IpcBytesSender] and [IpcBytesReceiver].
138///
139/// Note: The [IpcBytesSender] transfers messages of the type `[u8]`
140/// and the [IpcBytesReceiver] receives a `Vec<u8>`. This sender/receiver
141/// type does not serialize/deserialize messages through `serde`, making
142/// it more efficient where applicable.
143///
144/// # Examples
145///
146/// ```
147/// # use ipc_channel::ipc;
148///
149/// let payload = b"'Tis but a scratch!!";
150///
151/// // Create a channel
152/// let (tx, rx) = ipc::bytes_channel().unwrap();
153///
154/// // Send data
155/// tx.send(payload).unwrap();
156///
157/// // Receive the data
158/// let response = rx.recv().unwrap();
159///
160/// assert_eq!(response, payload);
161/// ```
162///
163/// [IpcBytesReceiver]: struct.IpcBytesReceiver.html
164/// [IpcBytesSender]: struct.IpcBytesSender.html
165pub fn bytes_channel() -> Result<(IpcBytesSender, IpcBytesReceiver), io::Error> {
166    let (os_sender, os_receiver) = platform::channel()?;
167    let ipc_bytes_receiver = IpcBytesReceiver { os_receiver };
168    let ipc_bytes_sender = IpcBytesSender { os_sender };
169    Ok((ipc_bytes_sender, ipc_bytes_receiver))
170}
171
172/// Receiving end of a channel using serialized messages.
173///
174/// # Examples
175///
176/// ## Blocking IO
177///
178/// ```
179/// # use ipc_channel::ipc;
180/// #
181/// # let (tx, rx) = ipc::channel().unwrap();
182/// #
183/// # let q = "Answer to the ultimate question of life, the universe, and everything";
184/// #
185/// # tx.send(q.to_owned()).unwrap();
186/// let response = rx.recv().unwrap();
187/// println!("Received data...");
188/// # assert_eq!(response, q);
189/// ```
190///
191/// ## Non-blocking IO
192///
193/// ```
194/// # use ipc_channel::ipc;
195/// #
196/// # let (tx, rx) = ipc::channel().unwrap();
197/// #
198/// # let answer = "42";
199/// #
200/// # tx.send(answer.to_owned()).unwrap();
201/// loop {
202///     match rx.try_recv() {
203///         Ok(res) => {
204///             // Do something interesting with your result
205///             println!("Received data...");
206///             break;
207///         },
208///         Err(_) => {
209///             // Do something else useful while we wait
210///             println!("Still waiting...");
211///         }
212///     }
213/// }
214/// ```
215///
216/// ## Embedding Receivers
217///
218/// ```
219/// # use ipc_channel::ipc;
220/// #
221/// let (tx, rx) = ipc::channel().unwrap();
222/// let (embedded_tx, embedded_rx) = ipc::channel().unwrap();
223/// # let data = [0x45, 0x6d, 0x62, 0x65, 0x64, 0x64, 0x65, 0x64, 0x00];
224/// // Send the IpcReceiver
225/// tx.send(embedded_rx).unwrap();
226/// # embedded_tx.send(data.to_owned()).unwrap();
227/// // Receive the sent IpcReceiver
228/// let received_rx = rx.recv().unwrap();
229/// // Receive any data sent to the received IpcReceiver
230/// let rx_data = received_rx.recv().unwrap();
231/// # assert_eq!(rx_data, data);
232/// ```
233///
234/// # Implementation details
235///
236/// Each [IpcReceiver] is backed by the OS specific implementations of `OsIpcReceiver`.
237///
238/// [IpcReceiver]: struct.IpcReceiver.html
239#[derive(Debug)]
240pub struct IpcReceiver<T> {
241    os_receiver: OsIpcReceiver,
242    phantom: PhantomData<T>,
243}
244
245impl<T> IpcReceiver<T>
246where
247    T: for<'de> Deserialize<'de> + Serialize,
248{
249    /// Blocking receive.
250    pub fn recv(&self) -> Result<T, IpcError> {
251        self.os_receiver.recv()?.to().map_err(IpcError::Bincode)
252    }
253
254    /// Non-blocking receive
255    pub fn try_recv(&self) -> Result<T, TryRecvError> {
256        self.os_receiver
257            .try_recv()?
258            .to()
259            .map_err(IpcError::Bincode)
260            .map_err(TryRecvError::IpcError)
261    }
262
263    /// Blocks for up to the specified duration attempting to receive a message.
264    ///
265    /// This may block for longer than the specified duration if the channel is busy. If your timeout
266    /// exceeds the duration that your operating system can represent in milliseconds, this may
267    /// block forever. At the time of writing, the smallest duration that may trigger this behavior
268    /// is over 24 days.
269    pub fn try_recv_timeout(&self, duration: Duration) -> Result<T, TryRecvError> {
270        self.os_receiver
271            .try_recv_timeout(duration)?
272            .to()
273            .map_err(IpcError::Bincode)
274            .map_err(TryRecvError::IpcError)
275    }
276
277    /// Erase the type of the channel.
278    ///
279    /// Useful for adding routes to a `RouterProxy`.
280    pub fn to_opaque(self) -> OpaqueIpcReceiver {
281        OpaqueIpcReceiver {
282            os_receiver: self.os_receiver,
283        }
284    }
285}
286
287impl<'de, T> Deserialize<'de> for IpcReceiver<T> {
288    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
289    where
290        D: Deserializer<'de>,
291    {
292        let os_receiver = deserialize_os_ipc_receiver(deserializer)?;
293        Ok(IpcReceiver {
294            os_receiver,
295            phantom: PhantomData,
296        })
297    }
298}
299
300impl<T> Serialize for IpcReceiver<T> {
301    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
302    where
303        S: Serializer,
304    {
305        serialize_os_ipc_receiver(&self.os_receiver, serializer)
306    }
307}
308
309/// Sending end of a channel using serialized messages.
310///
311///
312/// ## Embedding Senders
313///
314/// ```
315/// # use ipc_channel::ipc;
316/// #
317/// # let (tx, rx) = ipc::channel().unwrap();
318/// # let (embedded_tx, embedded_rx) = ipc::channel().unwrap();
319/// # let data = [0x45, 0x6d, 0x62, 0x65, 0x64, 0x64, 0x65, 0x64, 0x00];
320/// // Send the IpcSender
321/// tx.send(embedded_tx).unwrap();
322/// // Receive the sent IpcSender
323/// let received_tx = rx.recv().unwrap();
324/// // Send data from the received IpcSender
325/// received_tx.send(data.clone()).unwrap();
326/// # let rx_data = embedded_rx.recv().unwrap();
327/// # assert_eq!(rx_data, data);
328/// ```
329#[derive(Debug)]
330pub struct IpcSender<T> {
331    os_sender: OsIpcSender,
332    phantom: PhantomData<T>,
333}
334
335impl<T> Clone for IpcSender<T>
336where
337    T: Serialize,
338{
339    fn clone(&self) -> IpcSender<T> {
340        IpcSender {
341            os_sender: self.os_sender.clone(),
342            phantom: PhantomData,
343        }
344    }
345}
346
347impl<T> IpcSender<T>
348where
349    T: Serialize,
350{
351    /// Create an [IpcSender] connected to a previously defined [IpcOneShotServer].
352    ///
353    /// This function should not be called more than once per [IpcOneShotServer],
354    /// otherwise the behaviour is unpredictable.
355    /// See [issue 378](https://github.com/servo/ipc-channel/issues/378) for details.
356    ///
357    /// [IpcSender]: struct.IpcSender.html
358    /// [IpcOneShotServer]: struct.IpcOneShotServer.html
359    pub fn connect(name: String) -> Result<IpcSender<T>, io::Error> {
360        Ok(IpcSender {
361            os_sender: OsIpcSender::connect(name)?,
362            phantom: PhantomData,
363        })
364    }
365
366    /// Send data across the channel to the receiver.
367    pub fn send(&self, data: T) -> Result<(), bincode::Error> {
368        let mut bytes = Vec::with_capacity(4096);
369        OS_IPC_CHANNELS_FOR_SERIALIZATION.with(|os_ipc_channels_for_serialization| {
370            OS_IPC_SHARED_MEMORY_REGIONS_FOR_SERIALIZATION.with(
371                |os_ipc_shared_memory_regions_for_serialization| {
372                    let old_os_ipc_channels =
373                        mem::take(&mut *os_ipc_channels_for_serialization.borrow_mut());
374                    let old_os_ipc_shared_memory_regions = mem::take(
375                        &mut *os_ipc_shared_memory_regions_for_serialization.borrow_mut(),
376                    );
377                    let os_ipc_shared_memory_regions;
378                    let os_ipc_channels;
379                    {
380                        bincode::serialize_into(&mut bytes, &data)?;
381                        os_ipc_channels = mem::replace(
382                            &mut *os_ipc_channels_for_serialization.borrow_mut(),
383                            old_os_ipc_channels,
384                        );
385                        os_ipc_shared_memory_regions = mem::replace(
386                            &mut *os_ipc_shared_memory_regions_for_serialization.borrow_mut(),
387                            old_os_ipc_shared_memory_regions,
388                        );
389                    };
390                    Ok(self.os_sender.send(
391                        &bytes[..],
392                        os_ipc_channels,
393                        os_ipc_shared_memory_regions,
394                    )?)
395                },
396            )
397        })
398    }
399
400    pub fn to_opaque(self) -> OpaqueIpcSender {
401        OpaqueIpcSender {
402            os_sender: self.os_sender,
403        }
404    }
405}
406
407impl<'de, T> Deserialize<'de> for IpcSender<T> {
408    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
409    where
410        D: Deserializer<'de>,
411    {
412        let os_sender = deserialize_os_ipc_sender(deserializer)?;
413        Ok(IpcSender {
414            os_sender,
415            phantom: PhantomData,
416        })
417    }
418}
419
420impl<T> Serialize for IpcSender<T> {
421    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
422    where
423        S: Serializer,
424    {
425        serialize_os_ipc_sender(&self.os_sender, serializer)
426    }
427}
428
429/// Collection of [IpcReceiver]s moved into the set; thus creating a common
430/// (and exclusive) endpoint for receiving messages on any of the added
431/// channels.
432///
433/// # Examples
434///
435/// ```
436/// # use ipc_channel::ipc::{self, IpcReceiverSet, IpcSelectionResult};
437/// let data = vec![0x52, 0x75, 0x73, 0x74, 0x00];
438/// let (tx, rx) = ipc::channel().unwrap();
439/// let mut rx_set = IpcReceiverSet::new().unwrap();
440///
441/// // Add the receiver to the receiver set and send the data
442/// // from the sender
443/// let rx_id = rx_set.add(rx).unwrap();
444/// tx.send(data.clone()).unwrap();
445///
446/// // Poll the receiver set for any readable events
447/// for event in rx_set.select().unwrap() {
448///     match event {
449///         IpcSelectionResult::MessageReceived(id, message) => {
450///             let rx_data: Vec<u8> = message.to().unwrap();
451///             assert_eq!(id, rx_id);
452///             assert_eq!(data, rx_data);
453///             println!("Received: {:?} from {}...", data, id);
454///         },
455///         IpcSelectionResult::ChannelClosed(id) => {
456///             assert_eq!(id, rx_id);
457///             println!("No more data from {}...", id);
458///         }
459///     }
460/// }
461/// ```
462/// [IpcReceiver]: struct.IpcReceiver.html
463pub struct IpcReceiverSet {
464    os_receiver_set: OsIpcReceiverSet,
465}
466
467impl IpcReceiverSet {
468    /// Create a new empty [IpcReceiverSet].
469    ///
470    /// Receivers may then be added to the set with the [add]
471    /// method.
472    ///
473    /// [add]: #method.add
474    /// [IpcReceiverSet]: struct.IpcReceiverSet.html
475    pub fn new() -> Result<IpcReceiverSet, io::Error> {
476        Ok(IpcReceiverSet {
477            os_receiver_set: OsIpcReceiverSet::new()?,
478        })
479    }
480
481    /// Add and consume the [IpcReceiver] to the set of receivers to be polled.
482    /// [IpcReceiver]: struct.IpcReceiver.html
483    pub fn add<T>(&mut self, receiver: IpcReceiver<T>) -> Result<u64, io::Error>
484    where
485        T: for<'de> Deserialize<'de> + Serialize,
486    {
487        Ok(self.os_receiver_set.add(receiver.os_receiver)?)
488    }
489
490    /// Add an [OpaqueIpcReceiver] to the set of receivers to be polled.
491    /// [OpaqueIpcReceiver]: struct.OpaqueIpcReceiver.html
492    pub fn add_opaque(&mut self, receiver: OpaqueIpcReceiver) -> Result<u64, io::Error> {
493        Ok(self.os_receiver_set.add(receiver.os_receiver)?)
494    }
495
496    /// Wait for IPC messages received on any of the receivers in the set. The
497    /// method will return multiple events. An event may be either a message
498    /// received or a channel closed event.
499    ///
500    /// [IpcReceiver]: struct.IpcReceiver.html
501    pub fn select(&mut self) -> Result<Vec<IpcSelectionResult>, io::Error> {
502        let results = self.os_receiver_set.select()?;
503        Ok(results
504            .into_iter()
505            .map(|result| match result {
506                OsIpcSelectionResult::DataReceived(os_receiver_id, ipc_message) => {
507                    IpcSelectionResult::MessageReceived(os_receiver_id, ipc_message)
508                },
509                OsIpcSelectionResult::ChannelClosed(os_receiver_id) => {
510                    IpcSelectionResult::ChannelClosed(os_receiver_id)
511                },
512            })
513            .collect())
514    }
515}
516
517/// Shared memory descriptor that will be made accessible to the receiver
518/// of an IPC message that contains the discriptor.
519///
520/// # Examples
521/// ```
522/// # use ipc_channel::ipc::{self, IpcSharedMemory};
523/// # let (tx, rx) = ipc::channel().unwrap();
524/// # let data = [0x76, 0x69, 0x6d, 0x00];
525/// let shmem = IpcSharedMemory::from_bytes(&data);
526/// tx.send(shmem.clone()).unwrap();
527/// # let rx_shmem = rx.recv().unwrap();
528/// # assert_eq!(shmem, rx_shmem);
529/// ```
530#[derive(Clone, Debug, PartialEq)]
531pub struct IpcSharedMemory {
532    /// None represents no data (empty slice)
533    os_shared_memory: Option<OsIpcSharedMemory>,
534}
535
536impl Deref for IpcSharedMemory {
537    type Target = [u8];
538
539    #[inline]
540    fn deref(&self) -> &[u8] {
541        if let Some(os_shared_memory) = &self.os_shared_memory {
542            os_shared_memory
543        } else {
544            &[]
545        }
546    }
547}
548
549impl IpcSharedMemory {
550    /// Returns a mutable reference to the deref of this [`IpcSharedMemory`].
551    ///
552    /// # Safety
553    ///
554    /// This is safe if there is only one reader/writer on the data.
555    /// User can achieve this by not cloning [`IpcSharedMemory`]
556    /// and serializing/deserializing only once.
557    #[inline]
558    pub unsafe fn deref_mut(&mut self) -> &mut [u8] {
559        if let Some(os_shared_memory) = &mut self.os_shared_memory {
560            os_shared_memory.deref_mut()
561        } else {
562            &mut []
563        }
564    }
565}
566
567impl<'de> Deserialize<'de> for IpcSharedMemory {
568    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
569    where
570        D: Deserializer<'de>,
571    {
572        let index: usize = Deserialize::deserialize(deserializer)?;
573        if index == usize::MAX {
574            return Ok(IpcSharedMemory::empty());
575        }
576
577        let os_shared_memory = OS_IPC_SHARED_MEMORY_REGIONS_FOR_DESERIALIZATION.with(
578            |os_ipc_shared_memory_regions_for_deserialization| {
579                let mut regions =  os_ipc_shared_memory_regions_for_deserialization.borrow_mut();
580                let Some(region) = regions.get_mut(index) else {
581                    return Err(format!("Cannot consume shared memory region at index {index}, there are only {} regions available", regions.len()));
582                };
583
584                region.take().ok_or_else(|| format!("Shared memory region {index} has already been consumed"))
585            },
586        ).map_err(D::Error::custom)?;
587
588        Ok(IpcSharedMemory {
589            os_shared_memory: Some(os_shared_memory),
590        })
591    }
592}
593
594impl Serialize for IpcSharedMemory {
595    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
596    where
597        S: Serializer,
598    {
599        if let Some(os_shared_memory) = &self.os_shared_memory {
600            let index = OS_IPC_SHARED_MEMORY_REGIONS_FOR_SERIALIZATION.with(
601                |os_ipc_shared_memory_regions_for_serialization| {
602                    let mut os_ipc_shared_memory_regions_for_serialization =
603                        os_ipc_shared_memory_regions_for_serialization.borrow_mut();
604                    let index = os_ipc_shared_memory_regions_for_serialization.len();
605                    os_ipc_shared_memory_regions_for_serialization.push(os_shared_memory.clone());
606                    index
607                },
608            );
609            debug_assert!(index < usize::MAX);
610            index
611        } else {
612            usize::MAX
613        }
614        .serialize(serializer)
615    }
616}
617
618impl IpcSharedMemory {
619    const fn empty() -> Self {
620        Self {
621            os_shared_memory: None,
622        }
623    }
624
625    /// Create shared memory initialized with the bytes provided.
626    pub fn from_bytes(bytes: &[u8]) -> IpcSharedMemory {
627        if bytes.is_empty() {
628            IpcSharedMemory::empty()
629        } else {
630            IpcSharedMemory {
631                os_shared_memory: Some(OsIpcSharedMemory::from_bytes(bytes)),
632            }
633        }
634    }
635
636    /// Create a chunk of shared memory that is filled with the byte
637    /// provided.
638    pub fn from_byte(byte: u8, length: usize) -> IpcSharedMemory {
639        if length == 0 {
640            IpcSharedMemory::empty()
641        } else {
642            IpcSharedMemory {
643                os_shared_memory: Some(OsIpcSharedMemory::from_byte(byte, length)),
644            }
645        }
646    }
647}
648
649/// Result for readable events returned from [IpcReceiverSet::select].
650///
651/// [IpcReceiverSet::select]: struct.IpcReceiverSet.html#method.select
652pub enum IpcSelectionResult {
653    /// A message received from the [`IpcReceiver`] in the [`IpcMessage`] form,
654    /// identified by the `u64` value.
655    MessageReceived(u64, IpcMessage),
656    /// The channel has been closed for the [IpcReceiver] identified by the `u64` value.
657    /// [IpcReceiver]: struct.IpcReceiver.html
658    ChannelClosed(u64),
659}
660
661impl IpcSelectionResult {
662    /// Helper method to move the value out of the [IpcSelectionResult] if it
663    /// is [MessageReceived].
664    ///
665    /// # Panics
666    ///
667    /// If the result is [ChannelClosed] this call will panic.
668    ///
669    /// [IpcSelectionResult]: enum.IpcSelectionResult.html
670    /// [MessageReceived]: enum.IpcSelectionResult.html#variant.MessageReceived
671    /// [ChannelClosed]: enum.IpcSelectionResult.html#variant.ChannelClosed
672    pub fn unwrap(self) -> (u64, IpcMessage) {
673        match self {
674            IpcSelectionResult::MessageReceived(id, message) => (id, message),
675            IpcSelectionResult::ChannelClosed(id) => {
676                panic!("IpcSelectionResult::unwrap(): channel {id} closed")
677            },
678        }
679    }
680}
681
682/// Structure used to represent a raw message from an [`IpcSender`].
683///
684/// Use the [to] method to deserialize the raw result into the requested type.
685///
686/// [to]: #method.to
687#[derive(PartialEq)]
688pub struct IpcMessage {
689    pub(crate) data: Vec<u8>,
690    pub(crate) os_ipc_channels: Vec<OsOpaqueIpcChannel>,
691    pub(crate) os_ipc_shared_memory_regions: Vec<OsIpcSharedMemory>,
692}
693
694impl IpcMessage {
695    /// Create a new [`IpcMessage`] with data and without any [`OsOpaqueIpcChannel`]s and
696    /// [`OsIpcSharedMemory`] regions.
697    pub fn from_data(data: Vec<u8>) -> Self {
698        Self {
699            data,
700            os_ipc_channels: vec![],
701            os_ipc_shared_memory_regions: vec![],
702        }
703    }
704}
705
706impl Debug for IpcMessage {
707    fn fmt(&self, formatter: &mut Formatter) -> Result<(), fmt::Error> {
708        match String::from_utf8(self.data.clone()) {
709            Ok(string) => string.chars().take(256).collect::<String>().fmt(formatter),
710            Err(..) => self.data[0..min(self.data.len(), 256)].fmt(formatter),
711        }
712    }
713}
714
715impl IpcMessage {
716    pub(crate) fn new(
717        data: Vec<u8>,
718        os_ipc_channels: Vec<OsOpaqueIpcChannel>,
719        os_ipc_shared_memory_regions: Vec<OsIpcSharedMemory>,
720    ) -> IpcMessage {
721        IpcMessage {
722            data,
723            os_ipc_channels,
724            os_ipc_shared_memory_regions,
725        }
726    }
727
728    /// Deserialize the raw data in the contained message into the inferred type.
729    pub fn to<T>(mut self) -> Result<T, bincode::Error>
730    where
731        T: for<'de> Deserialize<'de> + Serialize,
732    {
733        OS_IPC_CHANNELS_FOR_DESERIALIZATION.with(|os_ipc_channels_for_deserialization| {
734            OS_IPC_SHARED_MEMORY_REGIONS_FOR_DESERIALIZATION.with(
735                |os_ipc_shared_memory_regions_for_deserialization| {
736                    mem::swap(
737                        &mut *os_ipc_channels_for_deserialization.borrow_mut(),
738                        &mut self.os_ipc_channels,
739                    );
740                    let old_ipc_shared_memory_regions_for_deserialization = mem::replace(
741                        &mut *os_ipc_shared_memory_regions_for_deserialization.borrow_mut(),
742                        self.os_ipc_shared_memory_regions
743                            .into_iter()
744                            .map(Some)
745                            .collect(),
746                    );
747                    let result = bincode::deserialize(&self.data[..]);
748                    *os_ipc_shared_memory_regions_for_deserialization.borrow_mut() =
749                        old_ipc_shared_memory_regions_for_deserialization;
750                    mem::swap(
751                        &mut *os_ipc_channels_for_deserialization.borrow_mut(),
752                        &mut self.os_ipc_channels,
753                    );
754                    /* Error check comes after doing cleanup,
755                     * since we need the cleanup both in the success and the error cases. */
756                    result
757                },
758            )
759        })
760    }
761}
762
763#[derive(Clone, Debug)]
764pub struct OpaqueIpcSender {
765    os_sender: OsIpcSender,
766}
767
768impl OpaqueIpcSender {
769    pub fn to<'de, T>(self) -> IpcSender<T>
770    where
771        T: Deserialize<'de> + Serialize,
772    {
773        IpcSender {
774            os_sender: self.os_sender,
775            phantom: PhantomData,
776        }
777    }
778}
779
780impl<'de> Deserialize<'de> for OpaqueIpcSender {
781    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
782    where
783        D: Deserializer<'de>,
784    {
785        let os_sender = deserialize_os_ipc_sender(deserializer)?;
786        Ok(OpaqueIpcSender { os_sender })
787    }
788}
789
790impl Serialize for OpaqueIpcSender {
791    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
792    where
793        S: Serializer,
794    {
795        serialize_os_ipc_sender(&self.os_sender, serializer)
796    }
797}
798
799#[derive(Debug)]
800pub struct OpaqueIpcReceiver {
801    os_receiver: OsIpcReceiver,
802}
803
804impl OpaqueIpcReceiver {
805    pub fn to<'de, T>(self) -> IpcReceiver<T>
806    where
807        T: Deserialize<'de> + Serialize,
808    {
809        IpcReceiver {
810            os_receiver: self.os_receiver,
811            phantom: PhantomData,
812        }
813    }
814}
815
816impl<'de> Deserialize<'de> for OpaqueIpcReceiver {
817    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
818    where
819        D: Deserializer<'de>,
820    {
821        let os_receiver = deserialize_os_ipc_receiver(deserializer)?;
822        Ok(OpaqueIpcReceiver { os_receiver })
823    }
824}
825
826impl Serialize for OpaqueIpcReceiver {
827    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
828    where
829        S: Serializer,
830    {
831        serialize_os_ipc_receiver(&self.os_receiver, serializer)
832    }
833}
834
835/// A server associated with a given name. The server is "one-shot" because
836/// it accepts only one connect request from a client.
837///
838/// # Examples
839///
840/// ## Basic Usage
841///
842/// ```
843/// use ipc_channel::ipc::{self, IpcOneShotServer, IpcSender, IpcReceiver};
844///
845/// let (server, server_name) = IpcOneShotServer::new().unwrap();
846/// let tx: IpcSender<Vec<u8>> = IpcSender::connect(server_name).unwrap();
847///
848/// tx.send(vec![0x10, 0x11, 0x12, 0x13]).unwrap();
849/// let (_, data): (_, Vec<u8>) = server.accept().unwrap();
850/// assert_eq!(data, vec![0x10, 0x11, 0x12, 0x13]);
851/// ```
852///
853/// ## Sending an [IpcSender]
854/// ```
855/// use ipc_channel::ipc::{self, IpcOneShotServer, IpcSender, IpcReceiver};
856/// let (server, name) = IpcOneShotServer::new().unwrap();
857///
858/// let (tx1, rx1): (IpcSender<Vec<u8>>, IpcReceiver<Vec<u8>>) = ipc::channel().unwrap();
859/// let tx0 = IpcSender::connect(name).unwrap();
860/// tx0.send(tx1).unwrap();
861///
862/// let (_, tx1): (_, IpcSender<Vec<u8>>) = server.accept().unwrap();
863/// tx1.send(vec![0x48, 0x65, 0x6b, 0x6b, 0x6f, 0x00]).unwrap();
864///
865/// let data = rx1.recv().unwrap();
866/// assert_eq!(data, vec![0x48, 0x65, 0x6b, 0x6b, 0x6f, 0x00]);
867/// ```
868/// [IpcSender]: struct.IpcSender.html
869pub struct IpcOneShotServer<T> {
870    os_server: OsIpcOneShotServer,
871    phantom: PhantomData<T>,
872}
873
874impl<T> IpcOneShotServer<T>
875where
876    T: for<'de> Deserialize<'de> + Serialize,
877{
878    pub fn new() -> Result<(IpcOneShotServer<T>, String), io::Error> {
879        let (os_server, name) = OsIpcOneShotServer::new()?;
880        Ok((
881            IpcOneShotServer {
882                os_server,
883                phantom: PhantomData,
884            },
885            name,
886        ))
887    }
888
889    pub fn accept(self) -> Result<(IpcReceiver<T>, T), bincode::Error> {
890        let (os_receiver, ipc_message) = self.os_server.accept()?;
891        Ok((
892            IpcReceiver {
893                os_receiver,
894                phantom: PhantomData,
895            },
896            ipc_message.to()?,
897        ))
898    }
899}
900
901/// Receiving end of a channel that does not used serialized messages.
902#[derive(Debug)]
903pub struct IpcBytesReceiver {
904    os_receiver: OsIpcReceiver,
905}
906
907impl IpcBytesReceiver {
908    /// Blocking receive.
909    #[inline]
910    pub fn recv(&self) -> Result<Vec<u8>, IpcError> {
911        match self.os_receiver.recv() {
912            Ok(ipc_message) => Ok(ipc_message.data),
913            Err(err) => Err(err.into()),
914        }
915    }
916
917    /// Non-blocking receive
918    pub fn try_recv(&self) -> Result<Vec<u8>, TryRecvError> {
919        match self.os_receiver.try_recv() {
920            Ok(ipc_message) => Ok(ipc_message.data),
921            Err(err) => Err(err.into()),
922        }
923    }
924}
925
926impl<'de> Deserialize<'de> for IpcBytesReceiver {
927    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
928    where
929        D: Deserializer<'de>,
930    {
931        let os_receiver = deserialize_os_ipc_receiver(deserializer)?;
932        Ok(IpcBytesReceiver { os_receiver })
933    }
934}
935
936impl Serialize for IpcBytesReceiver {
937    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
938    where
939        S: Serializer,
940    {
941        serialize_os_ipc_receiver(&self.os_receiver, serializer)
942    }
943}
944
945/// Sending end of a channel that does not used serialized messages.
946#[derive(Debug)]
947pub struct IpcBytesSender {
948    os_sender: OsIpcSender,
949}
950
951impl Clone for IpcBytesSender {
952    fn clone(&self) -> IpcBytesSender {
953        IpcBytesSender {
954            os_sender: self.os_sender.clone(),
955        }
956    }
957}
958
959impl<'de> Deserialize<'de> for IpcBytesSender {
960    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
961    where
962        D: Deserializer<'de>,
963    {
964        let os_sender = deserialize_os_ipc_sender(deserializer)?;
965        Ok(IpcBytesSender { os_sender })
966    }
967}
968
969impl Serialize for IpcBytesSender {
970    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
971    where
972        S: Serializer,
973    {
974        serialize_os_ipc_sender(&self.os_sender, serializer)
975    }
976}
977
978impl IpcBytesSender {
979    #[inline]
980    pub fn send(&self, data: &[u8]) -> Result<(), io::Error> {
981        self.os_sender
982            .send(data, vec![], vec![])
983            .map_err(io::Error::from)
984    }
985}
986
987fn serialize_os_ipc_sender<S>(os_ipc_sender: &OsIpcSender, serializer: S) -> Result<S::Ok, S::Error>
988where
989    S: Serializer,
990{
991    let index = OS_IPC_CHANNELS_FOR_SERIALIZATION.with(|os_ipc_channels_for_serialization| {
992        let mut os_ipc_channels_for_serialization = os_ipc_channels_for_serialization.borrow_mut();
993        let index = os_ipc_channels_for_serialization.len();
994        os_ipc_channels_for_serialization.push(OsIpcChannel::Sender(os_ipc_sender.clone()));
995        index
996    });
997    index.serialize(serializer)
998}
999
1000fn deserialize_os_ipc_sender<'de, D>(deserializer: D) -> Result<OsIpcSender, D::Error>
1001where
1002    D: Deserializer<'de>,
1003{
1004    let index: usize = Deserialize::deserialize(deserializer)?;
1005    OS_IPC_CHANNELS_FOR_DESERIALIZATION.with(|os_ipc_channels_for_deserialization| {
1006        // FIXME(pcwalton): This could panic if the data was corrupt and the index was out of
1007        // bounds. We should return an `Err` result instead.
1008        Ok(os_ipc_channels_for_deserialization.borrow_mut()[index].to_sender())
1009    })
1010}
1011
1012fn serialize_os_ipc_receiver<S>(
1013    os_receiver: &OsIpcReceiver,
1014    serializer: S,
1015) -> Result<S::Ok, S::Error>
1016where
1017    S: Serializer,
1018{
1019    let index = OS_IPC_CHANNELS_FOR_SERIALIZATION.with(|os_ipc_channels_for_serialization| {
1020        let mut os_ipc_channels_for_serialization = os_ipc_channels_for_serialization.borrow_mut();
1021        let index = os_ipc_channels_for_serialization.len();
1022        os_ipc_channels_for_serialization.push(OsIpcChannel::Receiver(os_receiver.consume()));
1023        index
1024    });
1025    index.serialize(serializer)
1026}
1027
1028fn deserialize_os_ipc_receiver<'de, D>(deserializer: D) -> Result<OsIpcReceiver, D::Error>
1029where
1030    D: Deserializer<'de>,
1031{
1032    let index: usize = Deserialize::deserialize(deserializer)?;
1033
1034    OS_IPC_CHANNELS_FOR_DESERIALIZATION.with(|os_ipc_channels_for_deserialization| {
1035        // FIXME(pcwalton): This could panic if the data was corrupt and the index was out
1036        // of bounds. We should return an `Err` result instead.
1037        Ok(os_ipc_channels_for_deserialization.borrow_mut()[index].to_receiver())
1038    })
1039}