devtools/protocol.rs
1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5//! Low-level wire protocol implementation. Currently only supports
6//! [JSON packets](https://firefox-source-docs.mozilla.org/devtools/backend/protocol.html#json-packets).
7
8use std::io::{self, ErrorKind, Read, Write};
9use std::net::{Shutdown, SocketAddr, TcpStream};
10use std::sync::{Arc, Mutex};
11
12use malloc_size_of_derive::MallocSizeOf;
13use serde::Serialize;
14use serde_json::{self, Value, json};
15
16use crate::actor::ActorError;
17
18#[derive(Serialize)]
19#[serde(rename_all = "camelCase")]
20pub(crate) struct ActorDescription {
21 pub category: &'static str,
22 pub type_name: &'static str,
23 pub methods: Vec<Method>,
24}
25
26#[derive(Serialize)]
27pub(crate) struct Method {
28 pub name: &'static str,
29 pub request: Value,
30 pub response: Value,
31}
32
33pub trait JsonPacketStream {
34 fn write_json_packet<T: Serialize>(&mut self, message: &T) -> Result<(), ActorError>;
35 fn read_json_packet(&mut self) -> Result<Option<Value>, String>;
36}
37
38/// Wraps a Remote Debugging Protocol TCP stream, guaranteeing that network
39/// operations are synchronized when cloning across threads.
40#[derive(Clone, MallocSizeOf)]
41pub(crate) struct DevtoolsConnection {
42 /// Copy of [`TcpStream`] handle to use for receiving from the client.
43 ///
44 /// `TcpStream::read` is a mutating I/O operation that doesn't fit with `RwLock`.
45 /// We clone a single stream handle into two mutexes so we can still lock
46 /// reads and writes independently.
47 #[conditional_malloc_size_of]
48 receiver: Arc<Mutex<TcpStream>>,
49 /// Copy of [`TcpStream`] handle to use for sending bytes to the client.
50 #[conditional_malloc_size_of]
51 sender: Arc<Mutex<TcpStream>>,
52}
53
54impl From<TcpStream> for DevtoolsConnection {
55 fn from(value: TcpStream) -> Self {
56 Self {
57 receiver: Arc::new(Mutex::new(value.try_clone().unwrap())),
58 sender: Arc::new(Mutex::new(value)),
59 }
60 }
61}
62
63impl DevtoolsConnection {
64 /// Calls [`TcpStream::peer_addr`] on the underlying stream.
65 pub(crate) fn peer_addr(&self) -> io::Result<SocketAddr> {
66 self.receiver.lock().unwrap().peer_addr()
67 }
68
69 /// Calls [`TcpStream::shutdown`] on the underlying stream.
70 pub(crate) fn shutdown(&self, how: Shutdown) -> io::Result<()> {
71 self.receiver.lock().unwrap().shutdown(how)
72 }
73}
74
75impl JsonPacketStream for DevtoolsConnection {
76 fn write_json_packet<T: serde::Serialize>(&mut self, message: &T) -> Result<(), ActorError> {
77 let s = serde_json::to_string(message).map_err(|_| ActorError::Internal)?;
78 log::debug!("<- {}", s);
79 let mut stream = self.sender.lock().unwrap();
80 write!(*stream, "{}:{}", s.len(), s).map_err(|_| ActorError::Internal)
81 }
82
83 fn read_json_packet(&mut self) -> Result<Option<Value>, String> {
84 // https://firefox-source-docs.mozilla.org/devtools/backend/protocol.html#stream-transport
85 // In short, each JSON packet is [ascii length]:[JSON data of given length]
86 let mut buffer = vec![];
87 // Guard should be held until exactly one complete message has been read.
88 let mut stream = self.receiver.lock().unwrap();
89 loop {
90 let mut buf = [0];
91 match (*stream).read(&mut buf) {
92 Ok(0) => return Ok(None), // EOF
93 Ok(1) if buf[0] == b':' => {
94 let packet_len_str = String::from_utf8(buffer)
95 .map_err(|_| "nonvalid UTF8 in packet length".to_owned())?;
96 let packet_len = packet_len_str
97 .parse::<u64>()
98 .map_err(|_| "packet length missing / not parsable".to_owned())?;
99 let mut packet = String::new();
100 stream
101 // Temporarily clone stream to allow the object to be
102 // moved out of the guard. This is okay as long as we
103 // guarantee that the clone does not outlive the guard.
104 .try_clone()
105 .unwrap()
106 .take(packet_len)
107 .read_to_string(&mut packet)
108 .map_err(|e| e.to_string())?;
109 log::debug!("{}", packet);
110 return serde_json::from_str(&packet)
111 .map(Some)
112 .map_err(|e| e.to_string());
113 },
114 Ok(1) => buffer.push(buf[0]),
115 Ok(_) => unreachable!(),
116 Err(e) if e.kind() == ErrorKind::ConnectionReset => return Ok(None), // EOF
117 Err(e) => return Err(e.to_string()),
118 }
119 }
120 }
121}
122
123/// Wrapper around a client stream that guarantees request/reply invariants.
124///
125/// Client messages, which are always requests, are dispatched to Actor instances one at a time via
126/// [`crate::Actor::handle_message`]. In most cases, each request must be paired with exactly one
127/// reply from the same actor the request was sent to, where a reply is a message with no type. (If
128/// a message from the server has a type, it’s a notification, not a reply).
129///
130/// Unless a request is of one of the few types considered "one-way", failing to reply will almost
131/// always permanently break that actor, because either the client gets stuck waiting for a reply,
132/// or the client receives the reply for a subsequent request as if it was the reply for the current
133/// request. If an actor fails to reply to a request, we want the dispatcher
134/// ([`crate::ActorRegistry::handle_message`]) to send an error of type `unrecognizedPacketType`,
135/// to keep the conversation for that actor in sync.
136///
137/// Since replies come in all shapes and sizes, we want to allow Actor types to send replies without
138/// having to return them to the dispatcher. This wrapper type allows the dispatcher to check if a
139/// valid reply was sent, and guarantees that if the actor tries to send a reply, it’s actually a
140/// valid reply (see [`Self::is_valid_reply`]).
141///
142/// It does not currently guarantee anything about messages sent via the [`DevtoolsConnection`]
143/// released via [`Self::stream`] or the return value of [`Self::reply`].
144pub(crate) struct ClientRequest<'req, 'handled> {
145 /// Client stream.
146 stream: DevtoolsConnection,
147 /// Expected actor name.
148 actor_name: &'req str,
149 /// Flag allowing ActorRegistry to check for unhandled requests.
150 handled: &'handled mut bool,
151}
152
153impl ClientRequest<'_, '_> {
154 /// Run the given handler, with a new request that wraps the given client stream and expected actor name.
155 ///
156 /// Returns [`ActorError::UnrecognizedPacketType`] if the actor did not send a reply.
157 pub fn handle<'req>(
158 stream: DevtoolsConnection,
159 actor_name: &'req str,
160 handler: impl FnOnce(ClientRequest<'req, '_>) -> Result<(), ActorError>,
161 ) -> Result<(), ActorError> {
162 let mut sent = false;
163 let request = ClientRequest {
164 stream,
165 actor_name,
166 handled: &mut sent,
167 };
168 handler(request)?;
169
170 if sent {
171 Ok(())
172 } else {
173 Err(ActorError::UnrecognizedPacketType)
174 }
175 }
176}
177
178impl<'req> ClientRequest<'req, '_> {
179 /// Send the given reply to the request being handled.
180 ///
181 /// If successful, sets the sent flag and returns the underlying stream,
182 /// allowing other messages to be sent after replying to a request.
183 pub fn reply<T: Serialize>(mut self, reply: &T) -> Result<Self, ActorError> {
184 debug_assert!(self.is_valid_reply(reply), "Message is not a valid reply");
185 self.stream.write_json_packet(reply)?;
186 *self.handled = true;
187 Ok(self)
188 }
189
190 /// Like `reply`, but for cases where the actor no longer needs the stream.
191 pub fn reply_final<T: Serialize>(self, reply: &T) -> Result<(), ActorError> {
192 let _stream = self.reply(reply)?;
193 Ok(())
194 }
195
196 /// Return true iff the given message is a reply (has no `type` or `to`), and is from the expected actor.
197 ///
198 /// This incurs a runtime conversion to a BTreeMap, so it should only be used in debug assertions.
199 fn is_valid_reply<T: Serialize>(&self, message: &T) -> bool {
200 let reply = json!(message);
201 reply.get("from").and_then(|from| from.as_str()) == Some(self.actor_name) &&
202 reply.get("to").is_none() &&
203 reply.get("type").is_none()
204 }
205
206 /// Manually mark the request as handled, for one-way message types.
207 pub fn mark_handled(self) -> Self {
208 *self.handled = true;
209 self
210 }
211
212 /// Get a copy of the client connection.
213 pub fn stream(&self) -> DevtoolsConnection {
214 self.stream.clone()
215 }
216}
217
218/// Actors can also send other messages before replying to a request.
219impl JsonPacketStream for ClientRequest<'_, '_> {
220 fn write_json_packet<T: Serialize>(&mut self, message: &T) -> Result<(), ActorError> {
221 debug_assert!(
222 !self.is_valid_reply(message),
223 "Replies must use reply() or reply_final()"
224 );
225 self.stream.write_json_packet(message)
226 }
227
228 fn read_json_packet(&mut self) -> Result<Option<Value>, String> {
229 self.stream.read_json_packet()
230 }
231}