ipc_channel/router.rs
1// Copyright 2015 The Servo Project Developers. See the COPYRIGHT
2// file at the top-level directory of this distribution.
3//
4// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
5// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
7// option. This file may not be copied, modified, or distributed
8// except according to those terms.
9
10//! Routers allow converting IPC channels to crossbeam channels.
11//! The [RouterProxy] provides various methods to register
12//! `IpcReceiver<T>`s. The router will then either call the appropriate callback or route the
13//! message to a crossbeam `Sender<T>` or `Receiver<T>`. You should use the global `ROUTER` to
14//! access the `RouterProxy` methods (via `ROUTER`'s `Deref` for `RouterProxy`.
15
16use std::collections::HashMap;
17use std::sync::{LazyLock, Mutex};
18use std::thread::{self, JoinHandle};
19
20use crossbeam_channel::{self, Receiver, Sender};
21use serde::{Deserialize, Serialize};
22
23use crate::ipc::{
24 self, IpcMessage, IpcReceiver, IpcReceiverSet, IpcSelectionResult, IpcSender, OpaqueIpcReceiver,
25};
26
27/// Global object wrapping a `RouterProxy`.
28/// Add routes ([add_route](RouterProxy::add_route)), or convert IpcReceiver<T>
29/// to crossbeam channels (e.g. [route_ipc_receiver_to_new_crossbeam_receiver](RouterProxy::route_ipc_receiver_to_new_crossbeam_receiver))
30pub static ROUTER: LazyLock<RouterProxy> = LazyLock::new(RouterProxy::new);
31
32/// A `RouterProxy` provides methods for talking to the router. Calling
33/// [new](RouterProxy::new) automatically spins up a router thread which
34/// waits for events on its registered `IpcReceiver<T>`s. The `RouterProxy`'s
35/// methods communicate with the running router thread to register new
36/// `IpcReceiver<T>`'s
37pub struct RouterProxy {
38 comm: Mutex<RouterProxyComm>,
39}
40
41#[allow(clippy::new_without_default)]
42impl RouterProxy {
43 pub fn new() -> RouterProxy {
44 // Router acts like a receiver, running in its own thread with both
45 // receiver ends.
46 // Router proxy takes both sending ends.
47 let (msg_sender, msg_receiver) = crossbeam_channel::unbounded();
48 let (wakeup_sender, wakeup_receiver) = ipc::channel().unwrap();
49 let handle = thread::Builder::new()
50 .name("router-proxy".to_string())
51 .spawn(move || Router::new(msg_receiver, wakeup_receiver).run())
52 .expect("Failed to spawn router proxy thread");
53 RouterProxy {
54 comm: Mutex::new(RouterProxyComm {
55 msg_sender,
56 wakeup_sender,
57 shutdown: false,
58 handle: Some(handle),
59 }),
60 }
61 }
62
63 /// Add a new (receiver, callback) pair to the router, and send a wakeup message
64 /// to the router.
65 ///
66 /// Consider using [add_typed_route](Self::add_typed_route) instead, which prevents
67 /// mismatches between the receiver and callback types.
68 #[deprecated(since = "0.19.0", note = "please use 'add_typed_route' instead")]
69 pub fn add_route(&self, receiver: OpaqueIpcReceiver, callback: RouterHandler) {
70 let comm = self.comm.lock().unwrap();
71
72 if comm.shutdown {
73 return;
74 }
75
76 comm.msg_sender
77 .send(RouterMsg::AddRoute(receiver, callback))
78 .unwrap();
79 comm.wakeup_sender.send(()).unwrap();
80 }
81
82 /// Add a new `(receiver, callback)` pair to the router, and send a wakeup message
83 /// to the router.
84 ///
85 /// Unlike [add_route](Self::add_route) this method is strongly typed and guarantees
86 /// that the `receiver` and the `callback` use the same message type.
87 pub fn add_typed_route<T>(&self, receiver: IpcReceiver<T>, mut callback: TypedRouterHandler<T>)
88 where
89 T: Serialize + for<'de> Deserialize<'de> + 'static,
90 {
91 // Before passing the message on to the callback, turn it into the appropriate type
92 let modified_callback = move |msg: IpcMessage| {
93 let typed_message = msg.to::<T>();
94 callback(typed_message)
95 };
96
97 #[allow(deprecated)]
98 self.add_route(receiver.to_opaque(), Box::new(modified_callback));
99 }
100
101 /// Send a shutdown message to the router containing a ACK sender,
102 /// send a wakeup message to the router, and block on the ACK.
103 /// Calling it is idempotent,
104 /// which can be useful when running a multi-process system in single-process mode.
105 pub fn shutdown(&self) {
106 let mut comm = self.comm.lock().unwrap();
107
108 if comm.shutdown {
109 return;
110 }
111 comm.shutdown = true;
112
113 let (ack_sender, ack_receiver) = crossbeam_channel::unbounded();
114 comm.wakeup_sender
115 .send(())
116 .map(|_| {
117 comm.msg_sender
118 .send(RouterMsg::Shutdown(ack_sender))
119 .unwrap();
120 ack_receiver.recv().unwrap();
121 })
122 .unwrap();
123 comm.handle
124 .take()
125 .expect("Should have a join handle at shutdown")
126 .join()
127 .expect("Failed to join on the router proxy thread");
128 }
129
130 /// A convenience function to route an `IpcReceiver<T>` to an existing `Sender<T>`.
131 pub fn route_ipc_receiver_to_crossbeam_sender<T>(
132 &self,
133 ipc_receiver: IpcReceiver<T>,
134 crossbeam_sender: Sender<T>,
135 ) where
136 T: for<'de> Deserialize<'de> + Serialize + Send + 'static,
137 {
138 self.add_typed_route(
139 ipc_receiver,
140 Box::new(move |message| drop(crossbeam_sender.send(message.unwrap()))),
141 )
142 }
143
144 /// A convenience function to route an `IpcReceiver<T>` to a `Receiver<T>`: the most common
145 /// use of a `Router`.
146 pub fn route_ipc_receiver_to_new_crossbeam_receiver<T>(
147 &self,
148 ipc_receiver: IpcReceiver<T>,
149 ) -> Receiver<T>
150 where
151 T: for<'de> Deserialize<'de> + Serialize + Send + 'static,
152 {
153 let (crossbeam_sender, crossbeam_receiver) = crossbeam_channel::unbounded();
154 self.route_ipc_receiver_to_crossbeam_sender(ipc_receiver, crossbeam_sender);
155 crossbeam_receiver
156 }
157}
158
159struct RouterProxyComm {
160 msg_sender: Sender<RouterMsg>,
161 wakeup_sender: IpcSender<()>,
162 shutdown: bool,
163 handle: Option<JoinHandle<()>>,
164}
165
166/// Router runs in its own thread listening for events. Adds events to its IpcReceiverSet
167/// and listens for events using select().
168struct Router {
169 /// Get messages from RouterProxy.
170 msg_receiver: Receiver<RouterMsg>,
171 /// The ID/index of the special channel we use to identify messages from msg_receiver.
172 msg_wakeup_id: u64,
173 /// Set of all receivers which have been registered for us to select on.
174 ipc_receiver_set: IpcReceiverSet,
175 /// Maps ids to their handler functions.
176 handlers: HashMap<u64, RouterHandler>,
177}
178
179impl Router {
180 fn new(msg_receiver: Receiver<RouterMsg>, wakeup_receiver: IpcReceiver<()>) -> Router {
181 let mut ipc_receiver_set = IpcReceiverSet::new().unwrap();
182 let msg_wakeup_id = ipc_receiver_set.add(wakeup_receiver).unwrap();
183 Router {
184 msg_receiver,
185 msg_wakeup_id,
186 ipc_receiver_set,
187 handlers: HashMap::new(),
188 }
189 }
190
191 /// Continuously loop waiting for wakeup signals from router proxy.
192 /// Iterate over events either:
193 /// 1) If a message comes in from our special `wakeup_receiver` (identified through
194 /// msg_wakeup_id. Read message from `msg_receiver` and add a new receiver
195 /// to our receiver set.
196 /// 2) Call appropriate handler based on message id.
197 /// 3) Remove handler once channel closes.
198 fn run(&mut self) {
199 loop {
200 // Wait for events to come from our select() new channels are added to
201 // our ReceiverSet below.
202 let results = match self.ipc_receiver_set.select() {
203 Ok(results) => results,
204 Err(_) => break,
205 };
206
207 // Iterate over numerous events that were ready at this time.
208 for result in results.into_iter() {
209 match result {
210 // Message came from the RouterProxy. Listen on our `msg_receiver`
211 // channel.
212 IpcSelectionResult::MessageReceived(id, _) if id == self.msg_wakeup_id => {
213 match self.msg_receiver.recv().unwrap() {
214 RouterMsg::AddRoute(receiver, handler) => {
215 let new_receiver_id =
216 self.ipc_receiver_set.add_opaque(receiver).unwrap();
217 self.handlers.insert(new_receiver_id, handler);
218 },
219 RouterMsg::Shutdown(sender) => {
220 sender
221 .send(())
222 .expect("Failed to send comfirmation of shutdown.");
223 return;
224 },
225 }
226 },
227 // Event from one of our registered receivers, call callback.
228 IpcSelectionResult::MessageReceived(id, message) => {
229 self.handlers.get_mut(&id).unwrap()(message)
230 },
231 IpcSelectionResult::ChannelClosed(id) => {
232 let _ = self.handlers.remove(&id).unwrap();
233 },
234 }
235 }
236 }
237 }
238}
239
240enum RouterMsg {
241 /// Register the receiver OpaqueIpcReceiver for listening for events on.
242 /// When a message comes from this receiver, call RouterHandler.
243 AddRoute(OpaqueIpcReceiver, RouterHandler),
244 /// Shutdown the router, providing a sender to send an acknowledgement.
245 Shutdown(Sender<()>),
246}
247
248/// Function to call when a new event is received from the corresponding receiver.
249pub type RouterHandler = Box<dyn FnMut(IpcMessage) + Send>;
250
251/// Like [RouterHandler] but includes the type that will be passed to the callback
252pub type TypedRouterHandler<T> = Box<dyn FnMut(Result<T, bincode::Error>) + Send>;