devtools/
protocol.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5//! Low-level wire protocol implementation. Currently only supports
6//! [JSON packets](https://firefox-source-docs.mozilla.org/devtools/backend/protocol.html#json-packets).
7
8use std::io::{ErrorKind, Read, Write};
9use std::net::TcpStream;
10
11use log::debug;
12use serde::Serialize;
13use serde_json::{self, Value, json};
14
15use crate::actor::ActorError;
16
/// Serializable description of an actor type.
///
/// Field names are emitted in camelCase when serialized (`rename_all = "camelCase"`), so
/// `type_name` appears as `typeName` in the resulting JSON.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ActorDescription {
    /// Category label for the actor.
    // NOTE(review): the set of valid categories is not visible in this file — confirm with callers.
    pub category: &'static str,
    /// Name of the actor type; serialized under the key `typeName`.
    pub type_name: &'static str,
    /// The [`Method`] entries listed for this actor type.
    pub methods: Vec<Method>,
}
24
/// Serializable description of a single method, as listed in [`ActorDescription::methods`].
#[derive(Serialize)]
pub struct Method {
    /// Name of the method.
    pub name: &'static str,
    /// Arbitrary JSON value associated with the request side of the method.
    // NOTE(review): presumably a template describing the request packet — confirm with callers.
    pub request: Value,
    /// Arbitrary JSON value associated with the response side of the method.
    // NOTE(review): presumably a template describing the response packet — confirm with callers.
    pub response: Value,
}
31
/// A stream that can send and receive JSON packets.
///
/// The [`TcpStream`] implementation in this file frames each packet as
/// `[ascii length]:[JSON data of given length]`.
pub trait JsonPacketStream {
    /// Serialize `message` to JSON and write it to the stream as one packet.
    ///
    /// Returns an [`ActorError`] if the message could not be serialized or written.
    fn write_json_packet<T: Serialize>(&mut self, message: &T) -> Result<(), ActorError>;
    /// Read the next packet from the stream and parse it as JSON.
    ///
    /// Returns `Ok(Some(value))` for a successfully parsed packet and `Err` with a
    /// human-readable description when reading or parsing fails. The [`TcpStream`]
    /// implementation in this file returns `Ok(None)` when the stream reaches EOF or the
    /// connection is reset while waiting for a packet.
    fn read_json_packet(&mut self) -> Result<Option<Value>, String>;
}
36
37impl JsonPacketStream for TcpStream {
38    fn write_json_packet<T: Serialize>(&mut self, message: &T) -> Result<(), ActorError> {
39        let s = serde_json::to_string(message).map_err(|_| ActorError::Internal)?;
40        debug!("<- {}", s);
41        write!(self, "{}:{}", s.len(), s).map_err(|_| ActorError::Internal)?;
42        Ok(())
43    }
44
45    fn read_json_packet(&mut self) -> Result<Option<Value>, String> {
46        // https://firefox-source-docs.mozilla.org/devtools/backend/protocol.html#stream-transport
47        // In short, each JSON packet is [ascii length]:[JSON data of given length]
48        let mut buffer = vec![];
49        loop {
50            let mut buf = [0];
51            let byte = match self.read(&mut buf) {
52                Ok(0) => return Ok(None),                                            // EOF
53                Err(e) if e.kind() == ErrorKind::ConnectionReset => return Ok(None), // EOF
54                Ok(1) => buf[0],
55                Ok(_) => unreachable!(),
56                Err(e) => return Err(e.to_string()),
57            };
58            match byte {
59                b':' => {
60                    let packet_len_str = match String::from_utf8(buffer) {
61                        Ok(packet_len) => packet_len,
62                        Err(_) => return Err("nonvalid UTF8 in packet length".to_owned()),
63                    };
64                    let packet_len = match packet_len_str.parse::<u64>() {
65                        Ok(packet_len) => packet_len,
66                        Err(_) => return Err("packet length missing / not parsable".to_owned()),
67                    };
68                    let mut packet = String::new();
69                    self.take(packet_len)
70                        .read_to_string(&mut packet)
71                        .map_err(|e| e.to_string())?;
72                    debug!("{}", packet);
73                    return match serde_json::from_str(&packet) {
74                        Ok(json) => Ok(Some(json)),
75                        Err(err) => Err(err.to_string()),
76                    };
77                },
78                c => buffer.push(c),
79            }
80        }
81    }
82}
83
/// Wrapper around a client stream that guarantees request/reply invariants.
///
/// Client messages, which are always requests, are dispatched to Actor instances one at a time via
/// [`crate::Actor::handle_message`]. Each request must be paired with exactly one reply from the
/// same actor the request was sent to, where a reply is a message with no type (if a message from
/// the server has a type, it’s a notification, not a reply).
///
/// Failing to reply to a request will almost always permanently break that actor, because either
/// the client gets stuck waiting for a reply, or the client receives the reply for a subsequent
/// request as if it was the reply for the current request. If an actor fails to reply to a request,
/// we want the dispatcher ([`crate::ActorRegistry::handle_message`]) to send an error of type
/// `unrecognizedPacketType`, to keep the conversation for that actor in sync.
///
/// Since replies come in all shapes and sizes, we want to allow Actor types to send replies without
/// having to return them to the dispatcher. This wrapper type allows the dispatcher to check if a
/// valid reply was sent, and guarantees that if the actor tries to send a reply, it’s actually a
/// valid reply (see [`Self::is_valid_reply`]). The validity check is made with `debug_assert!`, so
/// it is only enforced in builds where debug assertions are enabled.
///
/// It does not currently guarantee anything about messages sent via the [`TcpStream`] released via
/// [`Self::try_clone_stream`] or the return value of [`Self::reply`].
pub struct ClientRequest<'req, 'sent> {
    /// Client stream that the reply and any other packets for this request are written to.
    stream: &'req mut TcpStream,
    /// Expected actor name; a valid reply must carry this value in its `from` field.
    actor_name: &'req str,
    /// Sent flag, allowing ActorRegistry to check for unhandled requests.
    ///
    /// Set to `true` by [`Self::reply`] once the reply has been written, and inspected by
    /// [`Self::handle`] after the handler returns.
    sent: &'sent mut bool,
}
112
113impl ClientRequest<'_, '_> {
114    /// Run the given handler, with a new request that wraps the given client stream and expected actor name.
115    ///
116    /// Returns [`ActorError::UnrecognizedPacketType`] if the actor did not send a reply.
117    pub fn handle<'req>(
118        client: &'req mut TcpStream,
119        actor_name: &'req str,
120        handler: impl FnOnce(ClientRequest<'req, '_>) -> Result<(), ActorError>,
121    ) -> Result<(), ActorError> {
122        let mut sent = false;
123        let request = ClientRequest {
124            stream: client,
125            actor_name,
126            sent: &mut sent,
127        };
128        handler(request)?;
129
130        if sent {
131            Ok(())
132        } else {
133            Err(ActorError::UnrecognizedPacketType)
134        }
135    }
136}
137
138impl<'req> ClientRequest<'req, '_> {
139    /// Send the given reply to the request being handled.
140    ///
141    /// If successful, sets the sent flag and returns the underlying stream,
142    /// allowing other messages to be sent after replying to a request.
143    pub fn reply<T: Serialize>(self, reply: &T) -> Result<&'req mut TcpStream, ActorError> {
144        debug_assert!(self.is_valid_reply(reply), "Message is not a valid reply");
145        self.stream.write_json_packet(reply)?;
146        *self.sent = true;
147        Ok(self.stream)
148    }
149
150    /// Like `reply`, but for cases where the actor no longer needs the stream.
151    pub fn reply_final<T: Serialize>(self, reply: &T) -> Result<(), ActorError> {
152        debug_assert!(self.is_valid_reply(reply), "Message is not a valid reply");
153        let _stream = self.reply(reply)?;
154        Ok(())
155    }
156
157    pub fn try_clone_stream(&self) -> std::io::Result<TcpStream> {
158        self.stream.try_clone()
159    }
160
161    /// Return true iff the given message is a reply (has no `type` or `to`), and is from the expected actor.
162    ///
163    /// This incurs a runtime conversion to a BTreeMap, so it should only be used in debug assertions.
164    fn is_valid_reply<T: Serialize>(&self, message: &T) -> bool {
165        let reply = json!(message);
166        reply.get("from").and_then(|from| from.as_str()) == Some(self.actor_name) &&
167            reply.get("to").is_none() &&
168            reply.get("type").is_none()
169    }
170}
171
172/// Actors can also send other messages before replying to a request.
173impl JsonPacketStream for ClientRequest<'_, '_> {
174    fn write_json_packet<T: Serialize>(&mut self, message: &T) -> Result<(), ActorError> {
175        debug_assert!(
176            !self.is_valid_reply(message),
177            "Replies must use reply() or reply_final()"
178        );
179        self.stream.write_json_packet(message)
180    }
181
182    fn read_json_packet(&mut self) -> Result<Option<Value>, String> {
183        self.stream.read_json_packet()
184    }
185}