ipc_channel/
router.rs

1// Copyright 2015 The Servo Project Developers. See the COPYRIGHT
2// file at the top-level directory of this distribution.
3//
4// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
5// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
7// option. This file may not be copied, modified, or distributed
8// except according to those terms.
9
//! Routers allow converting IPC channels to crossbeam channels.
//! The [RouterProxy] provides various methods to register
//! `IpcReceiver<T>`s. The router will then either call the appropriate callback or route the
//! message to a crossbeam `Sender<T>` or `Receiver<T>`. You should use the global `ROUTER` to
//! access the `RouterProxy` methods (via `ROUTER`'s `Deref` for `RouterProxy`).
15
16use std::collections::HashMap;
17use std::sync::{LazyLock, Mutex};
18use std::thread::{self, JoinHandle};
19
20use crossbeam_channel::{self, Receiver, Sender};
21use serde::{Deserialize, Serialize};
22
23use crate::ipc::{
24    self, IpcMessage, IpcReceiver, IpcReceiverSet, IpcSelectionResult, IpcSender, OpaqueIpcReceiver,
25};
26
/// Global object wrapping a `RouterProxy`, created lazily on first access.
///
/// Add routes ([add_typed_route](RouterProxy::add_typed_route)), or convert an
/// `IpcReceiver<T>` to crossbeam channels (e.g.
/// [route_ipc_receiver_to_new_crossbeam_receiver](RouterProxy::route_ipc_receiver_to_new_crossbeam_receiver)).
pub static ROUTER: LazyLock<RouterProxy> = LazyLock::new(RouterProxy::new);
31
/// A `RouterProxy` provides methods for talking to the router. Calling
/// [new](RouterProxy::new) automatically spins up a router thread which
/// waits for events on its registered `IpcReceiver<T>`s. The `RouterProxy`'s
/// methods communicate with the running router thread to register new
/// `IpcReceiver<T>`s.
pub struct RouterProxy {
    /// Senders towards the router thread, the shutdown flag and the thread's join
    /// handle, kept behind a mutex so every method can work through `&self`.
    comm: Mutex<RouterProxyComm>,
}
40
41#[allow(clippy::new_without_default)]
42impl RouterProxy {
43    pub fn new() -> RouterProxy {
44        // Router acts like a receiver, running in its own thread with both
45        // receiver ends.
46        // Router proxy takes both sending ends.
47        let (msg_sender, msg_receiver) = crossbeam_channel::unbounded();
48        let (wakeup_sender, wakeup_receiver) = ipc::channel().unwrap();
49        let handle = thread::Builder::new()
50            .name("router-proxy".to_string())
51            .spawn(move || Router::new(msg_receiver, wakeup_receiver).run())
52            .expect("Failed to spawn router proxy thread");
53        RouterProxy {
54            comm: Mutex::new(RouterProxyComm {
55                msg_sender,
56                wakeup_sender,
57                shutdown: false,
58                handle: Some(handle),
59            }),
60        }
61    }
62
63    /// Add a new (receiver, callback) pair to the router, and send a wakeup message
64    /// to the router.
65    ///
66    /// Consider using [add_typed_route](Self::add_typed_route) instead, which prevents
67    /// mismatches between the receiver and callback types.
68    #[deprecated(since = "0.19.0", note = "please use 'add_typed_route' instead")]
69    pub fn add_route(&self, receiver: OpaqueIpcReceiver, callback: RouterHandler) {
70        let comm = self.comm.lock().unwrap();
71
72        if comm.shutdown {
73            return;
74        }
75
76        comm.msg_sender
77            .send(RouterMsg::AddRoute(receiver, callback))
78            .unwrap();
79        comm.wakeup_sender.send(()).unwrap();
80    }
81
82    /// Add a new `(receiver, callback)` pair to the router, and send a wakeup message
83    /// to the router.
84    ///
85    /// Unlike [add_route](Self::add_route) this method is strongly typed and guarantees
86    /// that the `receiver` and the `callback` use the same message type.
87    pub fn add_typed_route<T>(&self, receiver: IpcReceiver<T>, mut callback: TypedRouterHandler<T>)
88    where
89        T: Serialize + for<'de> Deserialize<'de> + 'static,
90    {
91        // Before passing the message on to the callback, turn it into the appropriate type
92        let modified_callback = move |msg: IpcMessage| {
93            let typed_message = msg.to::<T>();
94            callback(typed_message)
95        };
96
97        #[allow(deprecated)]
98        self.add_route(receiver.to_opaque(), Box::new(modified_callback));
99    }
100
101    /// Send a shutdown message to the router containing a ACK sender,
102    /// send a wakeup message to the router, and block on the ACK.
103    /// Calling it is idempotent,
104    /// which can be useful when running a multi-process system in single-process mode.
105    pub fn shutdown(&self) {
106        let mut comm = self.comm.lock().unwrap();
107
108        if comm.shutdown {
109            return;
110        }
111        comm.shutdown = true;
112
113        let (ack_sender, ack_receiver) = crossbeam_channel::unbounded();
114        comm.wakeup_sender
115            .send(())
116            .map(|_| {
117                comm.msg_sender
118                    .send(RouterMsg::Shutdown(ack_sender))
119                    .unwrap();
120                ack_receiver.recv().unwrap();
121            })
122            .unwrap();
123        comm.handle
124            .take()
125            .expect("Should have a join handle at shutdown")
126            .join()
127            .expect("Failed to join on the router proxy thread");
128    }
129
130    /// A convenience function to route an `IpcReceiver<T>` to an existing `Sender<T>`.
131    pub fn route_ipc_receiver_to_crossbeam_sender<T>(
132        &self,
133        ipc_receiver: IpcReceiver<T>,
134        crossbeam_sender: Sender<T>,
135    ) where
136        T: for<'de> Deserialize<'de> + Serialize + Send + 'static,
137    {
138        self.add_typed_route(
139            ipc_receiver,
140            Box::new(move |message| drop(crossbeam_sender.send(message.unwrap()))),
141        )
142    }
143
144    /// A convenience function to route an `IpcReceiver<T>` to a `Receiver<T>`: the most common
145    /// use of a `Router`.
146    pub fn route_ipc_receiver_to_new_crossbeam_receiver<T>(
147        &self,
148        ipc_receiver: IpcReceiver<T>,
149    ) -> Receiver<T>
150    where
151        T: for<'de> Deserialize<'de> + Serialize + Send + 'static,
152    {
153        let (crossbeam_sender, crossbeam_receiver) = crossbeam_channel::unbounded();
154        self.route_ipc_receiver_to_crossbeam_sender(ipc_receiver, crossbeam_sender);
155        crossbeam_receiver
156    }
157}
158
/// State a `RouterProxy` keeps for communicating with its router thread.
struct RouterProxyComm {
    /// Carries `RouterMsg` commands to the router thread.
    msg_sender: Sender<RouterMsg>,
    /// Signals the router thread that a command is waiting on the message channel.
    wakeup_sender: IpcSender<()>,
    /// Set once `RouterProxy::shutdown` has started; route additions and further
    /// shutdown calls made afterwards return without doing anything.
    shutdown: bool,
    /// Join handle of the router thread; taken (leaving `None`) when the thread is
    /// joined during shutdown.
    handle: Option<JoinHandle<()>>,
}
165
/// Router runs in its own thread listening for events. Adds receivers to its
/// `IpcReceiverSet` and listens for events using `select()`.
struct Router {
    /// Get messages from RouterProxy.
    msg_receiver: Receiver<RouterMsg>,
    /// The ID of the special wakeup channel within `ipc_receiver_set`; a selection
    /// result carrying this ID tells us to read a message from `msg_receiver`.
    msg_wakeup_id: u64,
    /// Set of all receivers which have been registered for us to select on.
    ipc_receiver_set: IpcReceiverSet,
    /// Maps receiver IDs to their handler functions.
    handlers: HashMap<u64, RouterHandler>,
}
178
179impl Router {
180    fn new(msg_receiver: Receiver<RouterMsg>, wakeup_receiver: IpcReceiver<()>) -> Router {
181        let mut ipc_receiver_set = IpcReceiverSet::new().unwrap();
182        let msg_wakeup_id = ipc_receiver_set.add(wakeup_receiver).unwrap();
183        Router {
184            msg_receiver,
185            msg_wakeup_id,
186            ipc_receiver_set,
187            handlers: HashMap::new(),
188        }
189    }
190
191    /// Continuously loop waiting for wakeup signals from router proxy.
192    /// Iterate over events either:
193    /// 1) If a message comes in from our special `wakeup_receiver` (identified through
194    ///    msg_wakeup_id. Read message from `msg_receiver` and add a new receiver
195    ///    to our receiver set.
196    /// 2) Call appropriate handler based on message id.
197    /// 3) Remove handler once channel closes.
198    fn run(&mut self) {
199        loop {
200            // Wait for events to come from our select() new channels are added to
201            // our ReceiverSet below.
202            let results = match self.ipc_receiver_set.select() {
203                Ok(results) => results,
204                Err(_) => break,
205            };
206
207            // Iterate over numerous events that were ready at this time.
208            for result in results.into_iter() {
209                match result {
210                    // Message came from the RouterProxy. Listen on our `msg_receiver`
211                    // channel.
212                    IpcSelectionResult::MessageReceived(id, _) if id == self.msg_wakeup_id => {
213                        match self.msg_receiver.recv().unwrap() {
214                            RouterMsg::AddRoute(receiver, handler) => {
215                                let new_receiver_id =
216                                    self.ipc_receiver_set.add_opaque(receiver).unwrap();
217                                self.handlers.insert(new_receiver_id, handler);
218                            },
219                            RouterMsg::Shutdown(sender) => {
220                                sender
221                                    .send(())
222                                    .expect("Failed to send comfirmation of shutdown.");
223                                return;
224                            },
225                        }
226                    },
227                    // Event from one of our registered receivers, call callback.
228                    IpcSelectionResult::MessageReceived(id, message) => {
229                        self.handlers.get_mut(&id).unwrap()(message)
230                    },
231                    IpcSelectionResult::ChannelClosed(id) => {
232                        let _ = self.handlers.remove(&id).unwrap();
233                    },
234                }
235            }
236        }
237    }
238}
239
/// Commands sent from the `RouterProxy` to the router thread.
enum RouterMsg {
    /// Register the `OpaqueIpcReceiver` for listening for events on.
    /// When a message comes from this receiver, call the `RouterHandler`.
    AddRoute(OpaqueIpcReceiver, RouterHandler),
    /// Shut down the router, providing a sender to send an acknowledgement.
    Shutdown(Sender<()>),
}
247
/// Function to call when a new event is received from the corresponding receiver.
/// The event is handed over as an untyped `IpcMessage`.
pub type RouterHandler = Box<dyn FnMut(IpcMessage) + Send>;
250
/// Like [RouterHandler] but includes the type that will be passed to the callback:
/// the callback receives either the message converted to `T` or the `bincode` error
/// produced by that conversion.
pub type TypedRouterHandler<T> = Box<dyn FnMut(Result<T, bincode::Error>) + Send>;