devtools/
protocol.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5//! Low-level wire protocol implementation. Currently only supports
6//! [JSON packets](https://firefox-source-docs.mozilla.org/devtools/backend/protocol.html#json-packets).
7
8use std::io::{self, ErrorKind, Read, Write};
9use std::net::{Shutdown, SocketAddr, TcpStream};
10use std::sync::{Arc, Mutex};
11
12use log::debug;
13use malloc_size_of_derive::MallocSizeOf;
14use serde::Serialize;
15use serde_json::{self, Value, json};
16
17use crate::actor::ActorError;
18
19#[derive(Serialize)]
20#[serde(rename_all = "camelCase")]
21pub(crate) struct ActorDescription {
22    pub category: &'static str,
23    pub type_name: &'static str,
24    pub methods: Vec<Method>,
25}
26
27#[derive(Serialize)]
28pub(crate) struct Method {
29    pub name: &'static str,
30    pub request: Value,
31    pub response: Value,
32}
33
34pub trait JsonPacketStream {
35    fn write_json_packet<T: Serialize>(&mut self, message: &T) -> Result<(), ActorError>;
36    fn read_json_packet(&mut self) -> Result<Option<Value>, String>;
37}
38
39impl JsonPacketStream for TcpStream {
40    fn write_json_packet<T: Serialize>(&mut self, message: &T) -> Result<(), ActorError> {
41        let s = serde_json::to_string(message).map_err(|_| ActorError::Internal)?;
42        debug!("<- {}", s);
43        write!(self, "{}:{}", s.len(), s).map_err(|_| ActorError::Internal)?;
44        Ok(())
45    }
46
47    fn read_json_packet(&mut self) -> Result<Option<Value>, String> {
48        // https://firefox-source-docs.mozilla.org/devtools/backend/protocol.html#stream-transport
49        // In short, each JSON packet is [ascii length]:[JSON data of given length]
50        let mut buffer = vec![];
51        loop {
52            let mut buf = [0];
53            let byte = match self.read(&mut buf) {
54                Ok(0) => return Ok(None),                                            // EOF
55                Err(e) if e.kind() == ErrorKind::ConnectionReset => return Ok(None), // EOF
56                Ok(1) => buf[0],
57                Ok(_) => unreachable!(),
58                Err(e) => return Err(e.to_string()),
59            };
60            match byte {
61                b':' => {
62                    let packet_len_str = match String::from_utf8(buffer) {
63                        Ok(packet_len) => packet_len,
64                        Err(_) => return Err("nonvalid UTF8 in packet length".to_owned()),
65                    };
66                    let packet_len = match packet_len_str.parse::<u64>() {
67                        Ok(packet_len) => packet_len,
68                        Err(_) => return Err("packet length missing / not parsable".to_owned()),
69                    };
70                    let mut packet = String::new();
71                    self.take(packet_len)
72                        .read_to_string(&mut packet)
73                        .map_err(|e| e.to_string())?;
74                    debug!("{}", packet);
75                    return match serde_json::from_str(&packet) {
76                        Ok(json) => Ok(Some(json)),
77                        Err(err) => Err(err.to_string()),
78                    };
79                },
80                c => buffer.push(c),
81            }
82        }
83    }
84}
85
86/// Wraps a Remote Debugging Protocol TCP stream, guaranteeing that network
87/// operations are synchronized when cloning across threads.
88#[derive(Clone, MallocSizeOf)]
89pub(crate) struct DevtoolsConnection {
90    /// Copy of [`TcpStream`] handle to use for receiving from the client.
91    ///
92    /// `TcpStream::read` is a mutating I/O operation that doesn't fit with `RwLock`.
93    /// We clone a single stream handle into two mutexes so we can still lock
94    /// reads and writes independently.
95    #[conditional_malloc_size_of]
96    receiver: Arc<Mutex<TcpStream>>,
97    /// Copy of [`TcpStream`] handle to use for sending bytes to the client.
98    #[conditional_malloc_size_of]
99    sender: Arc<Mutex<TcpStream>>,
100}
101
102impl From<TcpStream> for DevtoolsConnection {
103    fn from(value: TcpStream) -> Self {
104        Self {
105            receiver: Arc::new(Mutex::new(value.try_clone().unwrap())),
106            sender: Arc::new(Mutex::new(value)),
107        }
108    }
109}
110
111impl DevtoolsConnection {
112    /// Calls [`TcpStream::peer_addr`] on the underlying stream.
113    pub(crate) fn peer_addr(&self) -> io::Result<SocketAddr> {
114        self.receiver.lock().unwrap().peer_addr()
115    }
116
117    /// Calls [`TcpStream::shutdown`] on the underlying stream.
118    pub(crate) fn shutdown(&self, how: Shutdown) -> io::Result<()> {
119        self.receiver.lock().unwrap().shutdown(how)
120    }
121}
122
123impl JsonPacketStream for DevtoolsConnection {
124    fn write_json_packet<T: serde::Serialize>(&mut self, message: &T) -> Result<(), ActorError> {
125        let s = serde_json::to_string(message).map_err(|_| ActorError::Internal)?;
126        log::debug!("<- {}", s);
127        let mut stream = self.sender.lock().unwrap();
128        write!(*stream, "{}:{}", s.len(), s).map_err(|_| ActorError::Internal)
129    }
130
131    fn read_json_packet(&mut self) -> Result<Option<Value>, String> {
132        // https://firefox-source-docs.mozilla.org/devtools/backend/protocol.html#stream-transport
133        // In short, each JSON packet is [ascii length]:[JSON data of given length]
134        let mut buffer = vec![];
135        // Guard should be held until exactly one complete message has been read.
136        let mut stream = self.receiver.lock().unwrap();
137        loop {
138            let mut buf = [0];
139            match (*stream).read(&mut buf) {
140                Ok(0) => return Ok(None), // EOF
141                Ok(1) if buf[0] == b':' => {
142                    let packet_len_str = String::from_utf8(buffer)
143                        .map_err(|_| "nonvalid UTF8 in packet length".to_owned())?;
144                    let packet_len = packet_len_str
145                        .parse::<u64>()
146                        .map_err(|_| "packet length missing / not parsable".to_owned())?;
147                    let mut packet = String::new();
148                    stream
149                        // Temporarily clone stream to allow the object to be
150                        // moved out of the guard. This is okay as long as we
151                        // guarantee that the clone does not outlive the guard.
152                        .try_clone()
153                        .unwrap()
154                        .take(packet_len)
155                        .read_to_string(&mut packet)
156                        .map_err(|e| e.to_string())?;
157                    log::debug!("{}", packet);
158                    return serde_json::from_str(&packet)
159                        .map(Some)
160                        .map_err(|e| e.to_string());
161                },
162                Ok(1) => buffer.push(buf[0]),
163                Ok(_) => unreachable!(),
164                Err(e) if e.kind() == ErrorKind::ConnectionReset => return Ok(None), // EOF
165                Err(e) => return Err(e.to_string()),
166            }
167        }
168    }
169}
170
171/// Wrapper around a client stream that guarantees request/reply invariants.
172///
173/// Client messages, which are always requests, are dispatched to Actor instances one at a time via
174/// [`crate::Actor::handle_message`]. In most cases, each request must be paired with exactly one
175/// reply from the same actor the request was sent to, where a reply is a message with no type. (If
176/// a message from the server has a type, it’s a notification, not a reply).
177///
178/// Unless a request is of one of the few types considered "one-way", failing to reply will almost
179/// always permanently break that actor, because either the client gets stuck waiting for a reply,
180/// or the client receives the reply for a subsequent request as if it was the reply for the current
181/// request. If an actor fails to reply to a request, we want the dispatcher
182/// ([`crate::ActorRegistry::handle_message`]) to send an error of type `unrecognizedPacketType`,
183/// to keep the conversation for that actor in sync.
184///
185/// Since replies come in all shapes and sizes, we want to allow Actor types to send replies without
186/// having to return them to the dispatcher. This wrapper type allows the dispatcher to check if a
187/// valid reply was sent, and guarantees that if the actor tries to send a reply, it’s actually a
188/// valid reply (see [`Self::is_valid_reply`]).
189///
190/// It does not currently guarantee anything about messages sent via the [`DevtoolsConnection`]
191/// released via [`Self::stream`] or the return value of [`Self::reply`].
192pub(crate) struct ClientRequest<'req, 'handled> {
193    /// Client stream.
194    stream: DevtoolsConnection,
195    /// Expected actor name.
196    actor_name: &'req str,
197    /// Flag allowing ActorRegistry to check for unhandled requests.
198    handled: &'handled mut bool,
199}
200
201impl ClientRequest<'_, '_> {
202    /// Run the given handler, with a new request that wraps the given client stream and expected actor name.
203    ///
204    /// Returns [`ActorError::UnrecognizedPacketType`] if the actor did not send a reply.
205    pub fn handle<'req>(
206        stream: DevtoolsConnection,
207        actor_name: &'req str,
208        handler: impl FnOnce(ClientRequest<'req, '_>) -> Result<(), ActorError>,
209    ) -> Result<(), ActorError> {
210        let mut sent = false;
211        let request = ClientRequest {
212            stream,
213            actor_name,
214            handled: &mut sent,
215        };
216        handler(request)?;
217
218        if sent {
219            Ok(())
220        } else {
221            Err(ActorError::UnrecognizedPacketType)
222        }
223    }
224}
225
226impl<'req> ClientRequest<'req, '_> {
227    /// Send the given reply to the request being handled.
228    ///
229    /// If successful, sets the sent flag and returns the underlying stream,
230    /// allowing other messages to be sent after replying to a request.
231    pub fn reply<T: Serialize>(mut self, reply: &T) -> Result<Self, ActorError> {
232        debug_assert!(self.is_valid_reply(reply), "Message is not a valid reply");
233        self.stream.write_json_packet(reply)?;
234        *self.handled = true;
235        Ok(self)
236    }
237
238    /// Like `reply`, but for cases where the actor no longer needs the stream.
239    pub fn reply_final<T: Serialize>(self, reply: &T) -> Result<(), ActorError> {
240        let _stream = self.reply(reply)?;
241        Ok(())
242    }
243
244    /// Return true iff the given message is a reply (has no `type` or `to`), and is from the expected actor.
245    ///
246    /// This incurs a runtime conversion to a BTreeMap, so it should only be used in debug assertions.
247    fn is_valid_reply<T: Serialize>(&self, message: &T) -> bool {
248        let reply = json!(message);
249        reply.get("from").and_then(|from| from.as_str()) == Some(self.actor_name) &&
250            reply.get("to").is_none() &&
251            reply.get("type").is_none()
252    }
253
254    /// Manually mark the request as handled, for one-way message types.
255    pub fn mark_handled(self) -> Self {
256        *self.handled = true;
257        self
258    }
259
260    /// Get a copy of the client connection.
261    pub fn stream(&self) -> DevtoolsConnection {
262        self.stream.clone()
263    }
264}
265
266/// Actors can also send other messages before replying to a request.
267impl JsonPacketStream for ClientRequest<'_, '_> {
268    fn write_json_packet<T: Serialize>(&mut self, message: &T) -> Result<(), ActorError> {
269        debug_assert!(
270            !self.is_valid_reply(message),
271            "Replies must use reply() or reply_final()"
272        );
273        self.stream.write_json_packet(message)
274    }
275
276    fn read_json_packet(&mut self) -> Result<Option<Value>, String> {
277        self.stream.read_json_packet()
278    }
279}