1use std::cell::RefCell;
6
7use base::generic_channel::{GenericSender, SendError, SendResult};
8use crossbeam_channel::{Receiver, Sender, TryRecvError, unbounded};
9use log::warn;
10use serde::Serialize;
11use tokio::sync::mpsc::UnboundedSender as TokioSender;
12use tokio::sync::oneshot::Sender as TokioOneshotSender;
13
14use crate::ServoError;
15
16pub(crate) struct ServoErrorSender {
20 sender: Sender<ServoError>,
21}
22
23impl ServoErrorSender {
24 pub(crate) fn raise_response_send_error(&self, error: SendError) {
25 if let Err(error) = self.sender.send(ServoError::ResponseFailedToSend(error)) {
26 warn!("Failed to send Servo error: {error:?}");
27 }
28 }
29}
30
31pub(crate) struct ServoErrorChannel {
35 sender: Sender<ServoError>,
36 receiver: Receiver<ServoError>,
37}
38
39impl Default for ServoErrorChannel {
40 fn default() -> Self {
41 let (sender, receiver) = unbounded();
42 Self { sender, receiver }
43 }
44}
45
46impl ServoErrorChannel {
47 pub(crate) fn sender(&self) -> ServoErrorSender {
48 ServoErrorSender {
49 sender: self.sender.clone(),
50 }
51 }
52
53 pub(crate) fn try_recv(&self) -> Option<ServoError> {
54 match self.receiver.try_recv() {
55 Ok(result) => Some(result),
56 Err(error) => {
57 debug_assert_eq!(error, TryRecvError::Empty);
58 None
59 },
60 }
61 }
62}
63
64pub(crate) trait AbstractSender {
65 type Message;
66 fn send(&self, value: Self::Message) -> SendResult;
67}
68
69impl<T: Serialize> AbstractSender for GenericSender<T> {
70 type Message = T;
71 fn send(&self, value: T) -> SendResult {
72 GenericSender::send(self, value)
73 }
74}
75
76impl<T> AbstractSender for TokioSender<T> {
77 type Message = T;
78 fn send(&self, value: T) -> SendResult {
79 TokioSender::send(self, value).map_err(|_| SendError::Disconnected)
80 }
81}
82
83pub(crate) struct OneshotSender<T>(RefCell<Option<TokioOneshotSender<T>>>);
84
85impl<T> From<TokioOneshotSender<T>> for OneshotSender<T> {
86 fn from(sender: TokioOneshotSender<T>) -> Self {
87 Self(RefCell::new(Some(sender)))
88 }
89}
90
91impl<T> AbstractSender for OneshotSender<T> {
92 type Message = T;
93 fn send(&self, value: T) -> SendResult {
94 let sender = self.0.borrow_mut().take();
95 if let Some(sender) = sender {
96 TokioOneshotSender::send(sender, value).map_err(|_| SendError::Disconnected)
97 } else {
98 Err(SendError::Disconnected)
99 }
100 }
101}
102
103pub(crate) struct IpcResponder<T> {
105 response_sender: Box<dyn AbstractSender<Message = T>>,
106 response_sent: bool,
107 default_response: Option<T>,
109}
110
111impl<T: Serialize + 'static> IpcResponder<T> {
112 pub(crate) fn new(response_sender: GenericSender<T>, default_response: T) -> Self {
113 Self {
114 response_sender: Box::new(response_sender),
115 response_sent: false,
116 default_response: Some(default_response),
117 }
118 }
119}
120
121impl<T: 'static> IpcResponder<T> {
122 pub(crate) fn new_same_process(
123 response_sender: Box<dyn AbstractSender<Message = T>>,
124 default_response: T,
125 ) -> Self {
126 Self {
127 response_sender,
128 response_sent: false,
129 default_response: Some(default_response),
130 }
131 }
132}
133
134impl<T> IpcResponder<T> {
135 pub(crate) fn send(&mut self, response: T) -> SendResult {
136 let result = self.response_sender.send(response);
137 self.response_sent = true;
138 result
139 }
140}
141
142impl<T> Drop for IpcResponder<T> {
143 fn drop(&mut self) {
144 if !self.response_sent {
145 let response = self
146 .default_response
147 .take()
148 .expect("Guaranteed by inherent impl");
149 let _ = self.response_sender.send(response);
152 }
153 }
154}