base/generic_channel/
generic_channelset.rs1use ipc_channel::ipc::{IpcReceiverSet, IpcSelectionResult};
5use serde::{Deserialize, Serialize};
6
7use crate::generic_channel::{GenericReceiver, GenericReceiverVariants};
8
9pub struct GenericReceiverSet<T: Serialize + for<'de> Deserialize<'de>>(
29 GenericReceiverSetVariants<T>,
30);
31
32impl<T: Serialize + for<'de> Deserialize<'de>> Default for GenericReceiverSet<T> {
33 fn default() -> Self {
34 Self::new()
35 }
36}
37enum GenericReceiverSetVariants<T: for<'de> Deserialize<'de>> {
38 Ipc(ipc_channel::ipc::IpcReceiverSet),
39 Crossbeam(Vec<crossbeam_channel::Receiver<Result<T, ipc_channel::Error>>>),
40}
41
/// Test-only constructor that always builds an IPC-backed [`GenericReceiverSet`].
///
/// # Panics
///
/// Panics if the underlying `IpcReceiverSet` cannot be created.
#[cfg(test)]
pub fn create_ipc_receiver_set<T>() -> GenericReceiverSet<T>
where
    T: Serialize + for<'de> serde::Deserialize<'de>,
{
    let ipc_set = IpcReceiverSet::new().expect("Could not create ipc receiver");
    GenericReceiverSet(GenericReceiverSetVariants::Ipc(ipc_set))
}
49
/// Test-only constructor that always builds a crossbeam-backed
/// [`GenericReceiverSet`] containing no receivers.
#[cfg(test)]
pub fn create_crossbeam_receiver_set<T>() -> GenericReceiverSet<T>
where
    T: Serialize + for<'de> serde::Deserialize<'de>,
{
    let no_receivers = Vec::new();
    GenericReceiverSet(GenericReceiverSetVariants::Crossbeam(no_receivers))
}
55
/// One outcome produced by selecting on a `GenericReceiverSet`.
#[derive(Debug, PartialEq)]
pub enum GenericSelectionResult<T> {
    /// A message arrived; carries the id of the channel it came from and the
    /// message itself.
    MessageReceived(u64, T),
    /// The channel with the given id was closed.
    ChannelClosed(u64),
    /// Something went wrong; carries a textual description of the error.
    Error(String),
}
67
68impl<T: Serialize + for<'de> serde::Deserialize<'de>> From<IpcSelectionResult>
69 for GenericSelectionResult<T>
70{
71 fn from(value: IpcSelectionResult) -> Self {
72 match value {
73 IpcSelectionResult::MessageReceived(channel_id, ipc_message) => {
74 match ipc_message.to() {
75 Ok(value) => GenericSelectionResult::MessageReceived(channel_id, value),
76 Err(error) => GenericSelectionResult::Error(error.to_string()),
77 }
78 },
79 IpcSelectionResult::ChannelClosed(channel_id) => {
80 GenericSelectionResult::ChannelClosed(channel_id)
81 },
82 }
83 }
84}
85
86impl<T: Serialize + for<'de> Deserialize<'de>> GenericReceiverSet<T> {
87 pub fn new() -> GenericReceiverSet<T> {
89 if servo_config::opts::get().multiprocess || servo_config::opts::get().force_ipc {
90 GenericReceiverSet(GenericReceiverSetVariants::Ipc(
91 IpcReceiverSet::new().expect("Could not create ipc receiver"),
92 ))
93 } else {
94 GenericReceiverSet(GenericReceiverSetVariants::Crossbeam(vec![]))
95 }
96 }
97
98 pub fn add(&mut self, receiver: GenericReceiver<T>) -> u64 {
100 match (&mut self.0, receiver.0) {
101 (
102 GenericReceiverSetVariants::Ipc(ipc_receiver_set),
103 GenericReceiverVariants::Ipc(ipc_receiver),
104 ) => ipc_receiver_set
105 .add(ipc_receiver)
106 .expect("Could not add channel"),
107 (GenericReceiverSetVariants::Ipc(_), GenericReceiverVariants::Crossbeam(_)) => {
108 unreachable!()
109 },
110 (GenericReceiverSetVariants::Crossbeam(_), GenericReceiverVariants::Ipc(_)) => {
111 unreachable!()
112 },
113 (
114 GenericReceiverSetVariants::Crossbeam(receivers),
115 GenericReceiverVariants::Crossbeam(receiver),
116 ) => {
117 let index = receivers.len() as u64;
118 receivers.push(receiver);
119 index
120 },
121 }
122 }
123
124 pub fn select(&mut self) -> Vec<GenericSelectionResult<T>> {
126 match &mut self.0 {
127 GenericReceiverSetVariants::Ipc(ipc_receiver_set) => ipc_receiver_set
128 .select()
129 .map(|result_value| {
130 result_value
131 .into_iter()
132 .map(|selection_result| selection_result.into())
133 .collect()
134 })
135 .unwrap_or_else(|e| vec![GenericSelectionResult::Error(e.to_string())]),
136 GenericReceiverSetVariants::Crossbeam(receivers) => {
137 let mut sel = crossbeam_channel::Select::new();
138 let _selected_receivers = receivers
140 .iter()
141 .map(|receiver| sel.recv(receiver))
142 .collect::<Vec<usize>>();
143 let selector = sel.select();
144 let index = selector.index();
145 let Some(receiver) = receivers.get(index) else {
146 return vec![GenericSelectionResult::ChannelClosed(index as u64)];
147 };
148 let Ok(result) = selector.recv(receiver) else {
149 return vec![GenericSelectionResult::ChannelClosed(index as u64)];
150 };
151
152 vec![GenericSelectionResult::MessageReceived(
153 index.try_into().unwrap(),
154 result.unwrap(),
155 )]
156 },
157 }
158 }
159}