1use std::io::{self, ErrorKind, Read, Write};
9use std::net::{Shutdown, SocketAddr, TcpStream};
10use std::sync::{Arc, Mutex};
11
12use log::debug;
13use malloc_size_of_derive::MallocSizeOf;
14use serde::Serialize;
15use serde_json::{self, Value, json};
16
17use crate::actor::ActorError;
18
19#[derive(Serialize)]
20#[serde(rename_all = "camelCase")]
21pub(crate) struct ActorDescription {
22 pub category: &'static str,
23 pub type_name: &'static str,
24 pub methods: Vec<Method>,
25}
26
27#[derive(Serialize)]
28pub(crate) struct Method {
29 pub name: &'static str,
30 pub request: Value,
31 pub response: Value,
32}
33
34pub trait JsonPacketStream {
35 fn write_json_packet<T: Serialize>(&mut self, message: &T) -> Result<(), ActorError>;
36 fn read_json_packet(&mut self) -> Result<Option<Value>, String>;
37}
38
39impl JsonPacketStream for TcpStream {
40 fn write_json_packet<T: Serialize>(&mut self, message: &T) -> Result<(), ActorError> {
41 let s = serde_json::to_string(message).map_err(|_| ActorError::Internal)?;
42 debug!("<- {}", s);
43 write!(self, "{}:{}", s.len(), s).map_err(|_| ActorError::Internal)?;
44 Ok(())
45 }
46
47 fn read_json_packet(&mut self) -> Result<Option<Value>, String> {
48 let mut buffer = vec![];
51 loop {
52 let mut buf = [0];
53 let byte = match self.read(&mut buf) {
54 Ok(0) => return Ok(None), Err(e) if e.kind() == ErrorKind::ConnectionReset => return Ok(None), Ok(1) => buf[0],
57 Ok(_) => unreachable!(),
58 Err(e) => return Err(e.to_string()),
59 };
60 match byte {
61 b':' => {
62 let packet_len_str = match String::from_utf8(buffer) {
63 Ok(packet_len) => packet_len,
64 Err(_) => return Err("nonvalid UTF8 in packet length".to_owned()),
65 };
66 let packet_len = match packet_len_str.parse::<u64>() {
67 Ok(packet_len) => packet_len,
68 Err(_) => return Err("packet length missing / not parsable".to_owned()),
69 };
70 let mut packet = String::new();
71 self.take(packet_len)
72 .read_to_string(&mut packet)
73 .map_err(|e| e.to_string())?;
74 debug!("{}", packet);
75 return match serde_json::from_str(&packet) {
76 Ok(json) => Ok(Some(json)),
77 Err(err) => Err(err.to_string()),
78 };
79 },
80 c => buffer.push(c),
81 }
82 }
83 }
84}
85
86#[derive(Clone, MallocSizeOf)]
89pub(crate) struct DevtoolsConnection {
90 #[conditional_malloc_size_of]
96 receiver: Arc<Mutex<TcpStream>>,
97 #[conditional_malloc_size_of]
99 sender: Arc<Mutex<TcpStream>>,
100}
101
102impl From<TcpStream> for DevtoolsConnection {
103 fn from(value: TcpStream) -> Self {
104 Self {
105 receiver: Arc::new(Mutex::new(value.try_clone().unwrap())),
106 sender: Arc::new(Mutex::new(value)),
107 }
108 }
109}
110
111impl DevtoolsConnection {
112 pub(crate) fn peer_addr(&self) -> io::Result<SocketAddr> {
114 self.receiver.lock().unwrap().peer_addr()
115 }
116
117 pub(crate) fn shutdown(&self, how: Shutdown) -> io::Result<()> {
119 self.receiver.lock().unwrap().shutdown(how)
120 }
121}
122
123impl JsonPacketStream for DevtoolsConnection {
124 fn write_json_packet<T: serde::Serialize>(&mut self, message: &T) -> Result<(), ActorError> {
125 let s = serde_json::to_string(message).map_err(|_| ActorError::Internal)?;
126 log::debug!("<- {}", s);
127 let mut stream = self.sender.lock().unwrap();
128 write!(*stream, "{}:{}", s.len(), s).map_err(|_| ActorError::Internal)
129 }
130
131 fn read_json_packet(&mut self) -> Result<Option<Value>, String> {
132 let mut buffer = vec![];
135 let mut stream = self.receiver.lock().unwrap();
137 loop {
138 let mut buf = [0];
139 match (*stream).read(&mut buf) {
140 Ok(0) => return Ok(None), Ok(1) if buf[0] == b':' => {
142 let packet_len_str = String::from_utf8(buffer)
143 .map_err(|_| "nonvalid UTF8 in packet length".to_owned())?;
144 let packet_len = packet_len_str
145 .parse::<u64>()
146 .map_err(|_| "packet length missing / not parsable".to_owned())?;
147 let mut packet = String::new();
148 stream
149 .try_clone()
153 .unwrap()
154 .take(packet_len)
155 .read_to_string(&mut packet)
156 .map_err(|e| e.to_string())?;
157 log::debug!("{}", packet);
158 return serde_json::from_str(&packet)
159 .map(Some)
160 .map_err(|e| e.to_string());
161 },
162 Ok(1) => buffer.push(buf[0]),
163 Ok(_) => unreachable!(),
164 Err(e) if e.kind() == ErrorKind::ConnectionReset => return Ok(None), Err(e) => return Err(e.to_string()),
166 }
167 }
168 }
169}
170
171pub(crate) struct ClientRequest<'req, 'handled> {
193 stream: DevtoolsConnection,
195 actor_name: &'req str,
197 handled: &'handled mut bool,
199}
200
201impl ClientRequest<'_, '_> {
202 pub fn handle<'req>(
206 stream: DevtoolsConnection,
207 actor_name: &'req str,
208 handler: impl FnOnce(ClientRequest<'req, '_>) -> Result<(), ActorError>,
209 ) -> Result<(), ActorError> {
210 let mut sent = false;
211 let request = ClientRequest {
212 stream,
213 actor_name,
214 handled: &mut sent,
215 };
216 handler(request)?;
217
218 if sent {
219 Ok(())
220 } else {
221 Err(ActorError::UnrecognizedPacketType)
222 }
223 }
224}
225
226impl<'req> ClientRequest<'req, '_> {
227 pub fn reply<T: Serialize>(mut self, reply: &T) -> Result<Self, ActorError> {
232 debug_assert!(self.is_valid_reply(reply), "Message is not a valid reply");
233 self.stream.write_json_packet(reply)?;
234 *self.handled = true;
235 Ok(self)
236 }
237
238 pub fn reply_final<T: Serialize>(self, reply: &T) -> Result<(), ActorError> {
240 let _stream = self.reply(reply)?;
241 Ok(())
242 }
243
244 fn is_valid_reply<T: Serialize>(&self, message: &T) -> bool {
248 let reply = json!(message);
249 reply.get("from").and_then(|from| from.as_str()) == Some(self.actor_name) &&
250 reply.get("to").is_none() &&
251 reply.get("type").is_none()
252 }
253
254 pub fn mark_handled(self) -> Self {
256 *self.handled = true;
257 self
258 }
259
260 pub fn stream(&self) -> DevtoolsConnection {
262 self.stream.clone()
263 }
264}
265
266impl JsonPacketStream for ClientRequest<'_, '_> {
268 fn write_json_packet<T: Serialize>(&mut self, message: &T) -> Result<(), ActorError> {
269 debug_assert!(
270 !self.is_valid_reply(message),
271 "Replies must use reply() or reply_final()"
272 );
273 self.stream.write_json_packet(message)
274 }
275
276 fn read_json_packet(&mut self) -> Result<Option<Value>, String> {
277 self.stream.read_json_packet()
278 }
279}