base/generic_channel/
generic_channelset.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4use ipc_channel::ipc::{IpcReceiverSet, IpcSelectionResult};
5use serde::{Deserialize, Serialize};
6
7use crate::generic_channel::{GenericReceiver, GenericReceiverVariants};
8
9/// A GenericReceiverSet. Allows you to wait on multiple GenericReceivers.
10/// Automatically selects either Ipc or crossbeam depending on multiprocess mode.
11/// # Examples
12/// ```
13/// # let mut rx_set = GenericReceiverSet::new();
14/// # let private_channel = generic_channel::channel().unwrap();
15/// # let public_channel = generic_channel::channel().unwrap();
16/// # let reporter_channel = generic_channel::channel().unwrap();
17/// # let private_id = rx_set.add(private_receiver);
18/// # let public_id = rx_set.add(public_receiver);
19/// # let reporter_id = rx_set.add(memory_reporter);
20/// # for received in rx_set.select().into_iter() {
21/// #     match received {
22/// #         GenericSelectionResult::ChannelClosed(_) => continue,
23/// #         GenericSelectionResult::Error => println!("Found selection error"),
24/// #         GenericSelectionResult::MessageReceived(id, msg) => {
25/// #     }
26/// # }
27/// ```
28pub struct GenericReceiverSet<T: Serialize + for<'de> Deserialize<'de>>(
29    GenericReceiverSetVariants<T>,
30);
31
32impl<T: Serialize + for<'de> Deserialize<'de>> Default for GenericReceiverSet<T> {
33    fn default() -> Self {
34        Self::new()
35    }
36}
37enum GenericReceiverSetVariants<T: for<'de> Deserialize<'de>> {
38    Ipc(ipc_channel::ipc::IpcReceiverSet),
39    Crossbeam(Vec<crossbeam_channel::Receiver<Result<T, ipc_channel::Error>>>),
40}
41
42#[cfg(test)]
43pub fn create_ipc_receiver_set<T: Serialize + for<'de> serde::Deserialize<'de>>()
44-> GenericReceiverSet<T> {
45    GenericReceiverSet(GenericReceiverSetVariants::Ipc(
46        IpcReceiverSet::new().expect("Could not create ipc receiver"),
47    ))
48}
49
50#[cfg(test)]
51pub fn create_crossbeam_receiver_set<T: Serialize + for<'de> serde::Deserialize<'de>>()
52-> GenericReceiverSet<T> {
53    GenericReceiverSet(GenericReceiverSetVariants::Crossbeam(vec![]))
54}
55
56/// Result for readable events returned from [GenericReceiverSet::select].
57#[derive(Debug, PartialEq)]
58pub enum GenericSelectionResult<T> {
59    /// A message received from the [`GenericReceiver`],
60    /// identified by the `u64` value and Deserialized into a 'T'.
61    MessageReceived(u64, T),
62    /// The channel has been closed for the [GenericReceiver] identified by the `u64` value.
63    ChannelClosed(u64),
64    /// An error occured decoding the message.
65    Error(String),
66}
67
68impl<T: Serialize + for<'de> serde::Deserialize<'de>> From<IpcSelectionResult>
69    for GenericSelectionResult<T>
70{
71    fn from(value: IpcSelectionResult) -> Self {
72        match value {
73            IpcSelectionResult::MessageReceived(channel_id, ipc_message) => {
74                match ipc_message.to() {
75                    Ok(value) => GenericSelectionResult::MessageReceived(channel_id, value),
76                    Err(error) => GenericSelectionResult::Error(error.to_string()),
77                }
78            },
79            IpcSelectionResult::ChannelClosed(channel_id) => {
80                GenericSelectionResult::ChannelClosed(channel_id)
81            },
82        }
83    }
84}
85
86impl<T: Serialize + for<'de> Deserialize<'de>> GenericReceiverSet<T> {
87    /// Create a new ReceiverSet.
88    pub fn new() -> GenericReceiverSet<T> {
89        if servo_config::opts::get().multiprocess || servo_config::opts::get().force_ipc {
90            GenericReceiverSet(GenericReceiverSetVariants::Ipc(
91                IpcReceiverSet::new().expect("Could not create ipc receiver"),
92            ))
93        } else {
94            GenericReceiverSet(GenericReceiverSetVariants::Crossbeam(vec![]))
95        }
96    }
97
98    /// Add a receiver to the set.
99    pub fn add(&mut self, receiver: GenericReceiver<T>) -> u64 {
100        match (&mut self.0, receiver.0) {
101            (
102                GenericReceiverSetVariants::Ipc(ipc_receiver_set),
103                GenericReceiverVariants::Ipc(ipc_receiver),
104            ) => ipc_receiver_set
105                .add(ipc_receiver)
106                .expect("Could not add channel"),
107            (GenericReceiverSetVariants::Ipc(_), GenericReceiverVariants::Crossbeam(_)) => {
108                unreachable!()
109            },
110            (GenericReceiverSetVariants::Crossbeam(_), GenericReceiverVariants::Ipc(_)) => {
111                unreachable!()
112            },
113            (
114                GenericReceiverSetVariants::Crossbeam(receivers),
115                GenericReceiverVariants::Crossbeam(receiver),
116            ) => {
117                let index = receivers.len() as u64;
118                receivers.push(receiver);
119                index
120            },
121        }
122    }
123
124    /// Block until at least one of the Receivers receives a message and return a vector of the received messages.
125    pub fn select(&mut self) -> Vec<GenericSelectionResult<T>> {
126        match &mut self.0 {
127            GenericReceiverSetVariants::Ipc(ipc_receiver_set) => ipc_receiver_set
128                .select()
129                .map(|result_value| {
130                    result_value
131                        .into_iter()
132                        .map(|selection_result| selection_result.into())
133                        .collect()
134                })
135                .unwrap_or_else(|e| vec![GenericSelectionResult::Error(e.to_string())]),
136            GenericReceiverSetVariants::Crossbeam(receivers) => {
137                let mut sel = crossbeam_channel::Select::new();
138                // we need to add all the receivers to the set
139                let _selected_receivers = receivers
140                    .iter()
141                    .map(|receiver| sel.recv(receiver))
142                    .collect::<Vec<usize>>();
143                let selector = sel.select();
144                let index = selector.index();
145                let Some(receiver) = receivers.get(index) else {
146                    return vec![GenericSelectionResult::ChannelClosed(index as u64)];
147                };
148                let Ok(result) = selector.recv(receiver) else {
149                    return vec![GenericSelectionResult::ChannelClosed(index as u64)];
150                };
151
152                vec![GenericSelectionResult::MessageReceived(
153                    index.try_into().unwrap(),
154                    result.unwrap(),
155                )]
156            },
157        }
158    }
159}