zbus/connection/
socket_reader.rs1use std::{collections::HashMap, sync::Arc};
2
3use event_listener::Event;
4use tracing::{debug, instrument, trace};
5
6use crate::{
7 async_lock::Mutex, connection::MsgBroadcaster, Executor, Message, OwnedMatchRule, Task,
8};
9
10use super::socket::ReadHalf;
11
12#[derive(Debug)]
13pub(crate) struct SocketReader {
14 socket: Box<dyn ReadHalf>,
15 senders: Arc<Mutex<HashMap<Option<OwnedMatchRule>, MsgBroadcaster>>>,
16 already_received_bytes: Vec<u8>,
17 #[cfg(unix)]
18 already_received_fds: Vec<std::os::fd::OwnedFd>,
19 prev_seq: u64,
20 activity_event: Arc<Event>,
21}
22
23impl SocketReader {
24 pub fn new(
25 socket: Box<dyn ReadHalf>,
26 senders: Arc<Mutex<HashMap<Option<OwnedMatchRule>, MsgBroadcaster>>>,
27 already_received_bytes: Vec<u8>,
28 #[cfg(unix)] already_received_fds: Vec<std::os::fd::OwnedFd>,
29 activity_event: Arc<Event>,
30 ) -> Self {
31 Self {
32 socket,
33 senders,
34 already_received_bytes,
35 #[cfg(unix)]
36 already_received_fds,
37 prev_seq: 0,
38 activity_event,
39 }
40 }
41
42 pub fn spawn(self, executor: &Executor<'_>) -> Task<()> {
43 executor.spawn(self.receive_msg(), "socket reader")
44 }
45
46 #[instrument(name = "socket reader", skip(self))]
48 async fn receive_msg(mut self) {
49 loop {
50 trace!("Waiting for message on the socket..");
51 let msg = self.read_socket().await;
52 match &msg {
53 Ok(msg) => trace!("Message received on the socket: {:?}", msg),
54 Err(e) => trace!("Error reading from the socket: {:?}", e),
55 };
56
57 let mut senders = self.senders.lock().await;
58 for (rule, sender) in &*senders {
59 if let Ok(msg) = &msg {
60 if let Some(rule) = rule.as_ref() {
61 match rule.matches(msg) {
62 Ok(true) => (),
63 Ok(false) => continue,
64 Err(e) => {
65 debug!("Error matching message against rule: {:?}", e);
66
67 continue;
68 }
69 }
70 }
71 }
72
73 if let Err(e) = sender.broadcast_direct(msg.clone()).await {
74 if rule.is_some() {
82 trace!(
83 "Error broadcasting message to stream for `{:?}`: {:?}",
84 rule,
85 e
86 );
87 }
88 }
89 }
90 trace!("Broadcasted to all streams: {:?}", msg);
91
92 if msg.is_err() {
93 senders.clear();
94 trace!("Socket reading task stopped");
95
96 return;
97 }
98 }
99 }
100
101 #[instrument(skip(self))]
102 async fn read_socket(&mut self) -> crate::Result<Message> {
103 self.activity_event.notify(usize::MAX);
104 let seq = self.prev_seq + 1;
105 let msg = self
106 .socket
107 .receive_message(
108 seq,
109 &mut self.already_received_bytes,
110 #[cfg(unix)]
111 &mut self.already_received_fds,
112 )
113 .await?;
114 self.prev_seq = seq;
115
116 Ok(msg)
117 }
118}