Skip to main content

servo_base/generic_channel/
generic_channelset.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4use ipc_channel::ipc::{IpcReceiverSet, IpcSelectionResult};
5use serde::{Deserialize, Serialize};
6
7use crate::generic_channel::{GenericReceiver, GenericReceiverVariants, use_ipc};
8
9/// A GenericReceiverSet. Allows you to wait on multiple GenericReceivers.
10/// Automatically selects either Ipc or crossbeam depending on multiprocess mode.
11/// # Examples
12/// ```ignore
13/// let mut rx_set = GenericReceiverSet::new();
14/// let private_channel = generic_channel::channel().unwrap();
15/// let public_channel = generic_channel::channel().unwrap();
16/// let reporter_channel = generic_channel::channel().unwrap();
17/// let private_id = rx_set.add(private_receiver);
18/// let public_id = rx_set.add(public_receiver);
19/// let reporter_id = rx_set.add(memory_reporter);
20/// for received in rx_set.select().into_iter() {
21///     match received {
22///         GenericSelectionResult::ChannelClosed(_) => continue,
23///         GenericSelectionResult::Error => println!("Found selection error"),
24///         GenericSelectionResult::MessageReceived(id, msg) => { /*...*/ }
25///     }
26/// }
27/// ```
28pub struct GenericReceiverSet<T: Serialize + for<'de> Deserialize<'de>>(
29    GenericReceiverSetVariants<T>,
30);
31
32impl<T: Serialize + for<'de> Deserialize<'de>> Default for GenericReceiverSet<T> {
33    fn default() -> Self {
34        Self::new()
35    }
36}
37enum GenericReceiverSetVariants<T: for<'de> Deserialize<'de>> {
38    Ipc(ipc_channel::ipc::IpcReceiverSet),
39    Crossbeam(Vec<crossbeam_channel::Receiver<Result<T, ipc_channel::IpcError>>>),
40}
41
42#[cfg(test)]
43pub fn create_ipc_receiver_set<T: Serialize + for<'de> serde::Deserialize<'de>>()
44-> GenericReceiverSet<T> {
45    GenericReceiverSet(GenericReceiverSetVariants::Ipc(
46        IpcReceiverSet::new().expect("Could not create ipc receiver"),
47    ))
48}
49
50#[cfg(test)]
51pub fn create_crossbeam_receiver_set<T: Serialize + for<'de> serde::Deserialize<'de>>()
52-> GenericReceiverSet<T> {
53    GenericReceiverSet(GenericReceiverSetVariants::Crossbeam(vec![]))
54}
55
56/// Result for readable events returned from [GenericReceiverSet::select].
57#[derive(Debug, PartialEq)]
58pub enum GenericSelectionResult<T> {
59    /// A message received from the [`GenericReceiver`],
60    /// identified by the `u64` value and Deserialized into a 'T'.
61    MessageReceived(u64, T),
62    /// The channel has been closed for the [GenericReceiver] identified by the `u64` value.
63    ChannelClosed(u64),
64    /// An error occurred decoding the message.
65    Error(String),
66}
67
68impl<T: Serialize + for<'de> serde::Deserialize<'de>> From<IpcSelectionResult>
69    for GenericSelectionResult<T>
70{
71    fn from(value: IpcSelectionResult) -> Self {
72        match value {
73            IpcSelectionResult::MessageReceived(channel_id, ipc_message) => {
74                match ipc_message.to() {
75                    Ok(value) => GenericSelectionResult::MessageReceived(channel_id, value),
76                    Err(error) => GenericSelectionResult::Error(error.to_string()),
77                }
78            },
79            IpcSelectionResult::ChannelClosed(channel_id) => {
80                GenericSelectionResult::ChannelClosed(channel_id)
81            },
82        }
83    }
84}
85
86impl<T: Serialize + for<'de> Deserialize<'de>> GenericReceiverSet<T> {
87    /// Create a new ReceiverSet.
88    pub fn new() -> GenericReceiverSet<T> {
89        if use_ipc() {
90            GenericReceiverSet(GenericReceiverSetVariants::Ipc(
91                IpcReceiverSet::new().expect("Could not create ipc receiver"),
92            ))
93        } else {
94            GenericReceiverSet(GenericReceiverSetVariants::Crossbeam(vec![]))
95        }
96    }
97
98    /// Add a receiver to the set.
99    pub fn add(&mut self, receiver: GenericReceiver<T>) -> u64 {
100        match (&mut self.0, receiver.0) {
101            (
102                GenericReceiverSetVariants::Ipc(ipc_receiver_set),
103                GenericReceiverVariants::Ipc(ipc_receiver),
104            ) => ipc_receiver_set
105                .add(ipc_receiver)
106                .expect("Could not add channel"),
107            (GenericReceiverSetVariants::Ipc(_), GenericReceiverVariants::Crossbeam(_)) => {
108                unreachable!()
109            },
110            (GenericReceiverSetVariants::Crossbeam(_), GenericReceiverVariants::Ipc(_)) => {
111                unreachable!()
112            },
113            (
114                GenericReceiverSetVariants::Crossbeam(receivers),
115                GenericReceiverVariants::Crossbeam(receiver),
116            ) => {
117                let index = receivers.len() as u64;
118                receivers.push(receiver);
119                index
120            },
121        }
122    }
123
124    /// Create a [`Selector`] that owns the underlying select state.
125    ///
126    /// # Examples
127    ///
128    /// ```no_run
129    ///  use servo_base::generic_channel::{self, GenericReceiverSet};
130    ///
131    ///  let (_, receiver_one) = generic_channel::channel::<()>().unwrap();
132    ///  let (_, receiver_two) = generic_channel::channel::<()>().unwrap();
133    ///  let mut rx_set = GenericReceiverSet::<()>::new();
134    ///  let _select_idx_1 = rx_set.add(receiver_one);
135    ///  let _select_idx_2 = rx_set.add(receiver_two);
136    ///  // Build the Selector once, before the loop if all receivers are known in advance.
137    ///  let mut selector = rx_set.selector();
138    ///  loop {
139    ///    for received in selector.select().into_iter() {
140    ///      // do something
141    ///    }
142    ///  }
143    /// ```
144    pub fn selector(&mut self) -> Selector<'_, T> {
145        let inner = match &mut self.0 {
146            GenericReceiverSetVariants::Ipc(set) => SelectorInner::Ipc(set),
147            GenericReceiverSetVariants::Crossbeam(receivers) => {
148                let mut sel = crossbeam_channel::Select::new();
149                for receiver in receivers.iter() {
150                    sel.recv(receiver);
151                }
152                SelectorInner::Crossbeam {
153                    receivers: receivers.as_slice(),
154                    sel,
155                }
156            },
157        };
158        Selector { inner }
159    }
160
161    /// One-shot select. Builds a [`Selector`], runs select once and drops it.
162    ///
163    /// For usage in loops consider using [`GenericReceiverSet::selector()`] to build the selector
164    /// once upfront.
165    pub fn select(&mut self) -> Vec<GenericSelectionResult<T>> {
166        self.selector().select()
167    }
168}
169
170/// Borrows of a [`GenericReceiverSet`] used to drive repeated `select` calls
171/// without rebuilding the underlying `crossbeam_channel::Select` each time.
172/// See [`GenericReceiverSet::selector`].
173pub struct Selector<'a, T: Serialize + for<'de> Deserialize<'de>> {
174    inner: SelectorInner<'a, T>,
175}
176
177enum SelectorInner<'a, T: for<'de> Deserialize<'de>> {
178    Ipc(&'a mut IpcReceiverSet),
179    Crossbeam {
180        receivers: &'a [crossbeam_channel::Receiver<Result<T, ipc_channel::IpcError>>],
181        sel: crossbeam_channel::Select<'a>,
182    },
183}
184
185impl<'a, T: Serialize + for<'de> Deserialize<'de>> Selector<'a, T> {
186    /// Block until at least one of the Receivers receives a message.
187    ///
188    /// Note: The IPC variant can return multiple results in one call.
189    /// The crossbeam variant always returns exactly one.
190    pub fn select(&mut self) -> Vec<GenericSelectionResult<T>> {
191        match &mut self.inner {
192            SelectorInner::Ipc(ipc_receiver_set) => ipc_receiver_set
193                .select()
194                .map(|result_value| {
195                    result_value
196                        .into_iter()
197                        .map(|selection_result| selection_result.into())
198                        .collect()
199                })
200                .unwrap_or_else(|e| vec![GenericSelectionResult::Error(e.to_string())]),
201            SelectorInner::Crossbeam { receivers, sel } => {
202                let selected = sel.select();
203                let index = selected.index();
204                let selection_result = match receivers.get(index) {
205                    None => GenericSelectionResult::ChannelClosed(index as u64),
206                    Some(receiver) => match selected.recv(receiver) {
207                        Ok(Ok(value)) => {
208                            GenericSelectionResult::MessageReceived(index as u64, value)
209                        },
210                        Ok(Err(error)) => GenericSelectionResult::Error(error.to_string()),
211                        Err(_) => GenericSelectionResult::ChannelClosed(index as u64),
212                    },
213                };
214                vec![selection_result]
215            },
216        }
217    }
218}