1use std::{collections::HashMap, marker::PhantomData, sync::Arc};
4use tracing::{Instrument, debug, instrument, trace, trace_span};
5
6use zbus_names::InterfaceName;
7use zvariant::{ObjectPath, Value};
8
9use crate::{
10 Connection, Error, Result,
11 async_lock::RwLock,
12 connection::WeakConnection,
13 fdo,
14 fdo::ObjectManager,
15 message::{Header, Message},
16};
17
18mod interface;
19pub(crate) use interface::ArcInterface;
20#[allow(deprecated)]
21pub use interface::DispatchResult;
22pub use interface::{DispatchResult2, Interface, InterfaceDeref, InterfaceDerefMut, InterfaceRef};
23
24mod signal_emitter;
25pub use signal_emitter::SignalEmitter;
26#[deprecated(since = "5.0.0", note = "Please use `SignalEmitter` instead.")]
27pub type SignalContext<'s> = SignalEmitter<'s>;
28
29mod dispatch_notifier;
30pub use dispatch_notifier::ResponseDispatchNotifier;
31
32mod node;
33pub(crate) use node::Node;
34
35#[derive(Debug, Clone)]
93pub struct ObjectServer {
94 conn: WeakConnection,
95 root: Arc<RwLock<Node>>,
96}
97
98impl ObjectServer {
99 pub(crate) fn new(conn: &Connection) -> Self {
101 Self {
102 conn: conn.into(),
103 root: Arc::new(RwLock::new(Node::new(
104 "/".try_into().expect("zvariant bug"),
105 ))),
106 }
107 }
108
109 pub(crate) fn root(&self) -> &RwLock<Node> {
110 &self.root
111 }
112
113 pub async fn at<'p, P, I>(&self, path: P, iface: I) -> Result<bool>
122 where
123 I: Interface,
124 P: TryInto<ObjectPath<'p>>,
125 P::Error: Into<Error>,
126 {
127 self.add_arc_interface(path, I::name(), ArcInterface::new(iface))
128 .await
129 }
130
131 pub(crate) async fn add_arc_interface<'p, P>(
132 &self,
133 path: P,
134 name: InterfaceName<'static>,
135 arc_iface: ArcInterface,
136 ) -> Result<bool>
137 where
138 P: TryInto<ObjectPath<'p>>,
139 P::Error: Into<Error>,
140 {
141 let path = path.try_into().map_err(Into::into)?;
142 let mut root = self.root().write().await;
143 let (node, manager_path) = root.get_child_mut(&path, true);
144 let node = node.unwrap();
145 let added = node.add_arc_interface(name.clone(), arc_iface);
146 if added {
147 if name == ObjectManager::name() {
148 let emitter = SignalEmitter::new(&self.connection(), path)?;
150 let objects = node.get_managed_objects(self, &self.connection()).await?;
151 for (path, owned_interfaces) in objects {
152 let interfaces = owned_interfaces
153 .iter()
154 .map(|(i, props)| {
155 let props = props
156 .iter()
157 .map(|(k, v)| Ok((k.as_str(), Value::try_from(v)?)))
158 .collect::<Result<_>>();
159 Ok((i.into(), props?))
160 })
161 .collect::<Result<_>>()?;
162 ObjectManager::interfaces_added(&emitter, path.into(), interfaces).await?;
163 }
164 } else if let Some(manager_path) = manager_path {
165 let emitter = SignalEmitter::new(&self.connection(), manager_path.clone())?;
166 let mut interfaces = HashMap::new();
167 let owned_props = node
168 .get_properties(self, &self.connection(), name.clone())
169 .await?;
170 let props = owned_props
171 .iter()
172 .map(|(k, v)| Ok((k.as_str(), Value::try_from(v)?)))
173 .collect::<Result<_>>()?;
174 interfaces.insert(name, props);
175
176 ObjectManager::interfaces_added(&emitter, path, interfaces).await?;
177 }
178 }
179
180 Ok(added)
181 }
182
183 pub async fn remove<'p, I, P>(&self, path: P) -> Result<bool>
188 where
189 I: Interface,
190 P: TryInto<ObjectPath<'p>>,
191 P::Error: Into<Error>,
192 {
193 let path = path.try_into().map_err(Into::into)?;
194 let mut root = self.root.write().await;
195 let (node, manager_path) = root.get_child_mut(&path, false);
196 let node = node.ok_or(Error::InterfaceNotFound)?;
197 if !node.remove_interface(I::name()) {
198 return Err(Error::InterfaceNotFound);
199 }
200 if let Some(manager_path) = manager_path {
201 let ctxt = SignalEmitter::new(&self.connection(), manager_path.clone())?;
202 ObjectManager::interfaces_removed(&ctxt, path.clone(), (&[I::name()]).into()).await?;
203 }
204 if node.is_empty() {
205 let mut path_parts = path.rsplit('/').filter(|i| !i.is_empty());
206 let last_part = path_parts.next().unwrap();
207 let ppath = ObjectPath::from_string_unchecked(
208 path_parts.fold(String::new(), |a, p| format!("/{p}{a}")),
209 );
210 root.get_child_mut(&ppath, false)
211 .0
212 .unwrap()
213 .remove_node(last_part);
214 return Ok(true);
215 }
216 Ok(false)
217 }
218
219 pub async fn interface<'p, P, I>(&self, path: P) -> Result<InterfaceRef<I>>
262 where
263 I: Interface,
264 P: TryInto<ObjectPath<'p>>,
265 P::Error: Into<Error>,
266 {
267 let path = path.try_into().map_err(Into::into)?;
268 let root = self.root().read().await;
269 let node = root.get_child(&path).ok_or(Error::InterfaceNotFound)?;
270
271 let lock = node
272 .interface_lock(I::name())
273 .ok_or(Error::InterfaceNotFound)?
274 .instance
275 .clone();
276
277 lock.read()
279 .await
280 .downcast_ref::<I>()
281 .ok_or(Error::InterfaceNotFound)?;
282
283 let conn = self.connection();
284 let emitter = SignalEmitter::new(&conn, path).unwrap().into_owned();
286
287 Ok(InterfaceRef {
288 emitter,
289 lock,
290 phantom: PhantomData,
291 })
292 }
293
294 async fn dispatch_call_to_iface(
295 &self,
296 iface: Arc<RwLock<dyn Interface>>,
297 connection: &Connection,
298 msg: &Message,
299 hdr: &Header<'_>,
300 ) -> fdo::Result<()> {
301 let member = hdr
302 .member()
303 .ok_or_else(|| fdo::Error::Failed("Missing member".into()))?;
304 let iface_name = hdr
305 .interface()
306 .ok_or_else(|| fdo::Error::Failed("Missing interface".into()))?;
307
308 trace!("acquiring read lock on interface `{}`", iface_name);
309 let read_lock = iface.read().await;
310 trace!("acquired read lock on interface `{}`", iface_name);
311 match read_lock.call(self, connection, msg, member.as_ref()) {
312 DispatchResult2::NotFound => {
313 return Err(fdo::Error::UnknownMethod(format!(
314 "Unknown method '{member}'"
315 )));
316 }
317 DispatchResult2::Async(f) => {
318 return f.await;
319 }
320 DispatchResult2::RequiresMut => {}
321 }
322 drop(read_lock);
323 trace!("acquiring write lock on interface `{}`", iface_name);
324 let mut write_lock = iface.write().await;
325 trace!("acquired write lock on interface `{}`", iface_name);
326 match write_lock.call_mut(self, connection, msg, member.as_ref()) {
327 DispatchResult2::NotFound => {}
328 DispatchResult2::RequiresMut => {}
329 DispatchResult2::Async(f) => {
330 return f.await;
331 }
332 }
333 drop(write_lock);
334 Err(fdo::Error::UnknownMethod(format!(
335 "Unknown method '{member}'"
336 )))
337 }
338
339 async fn dispatch_method_call_try(
340 &self,
341 connection: &Connection,
342 msg: &Message,
343 hdr: &Header<'_>,
344 ) -> fdo::Result<()> {
345 let path = hdr
346 .path()
347 .ok_or_else(|| fdo::Error::Failed("Missing object path".into()))?;
348 let iface_name = hdr
349 .interface()
350 .ok_or_else(|| fdo::Error::Failed("Missing interface".into()))?;
356 let _ = hdr
361 .member()
362 .ok_or_else(|| fdo::Error::Failed("Missing member".into()))?;
363
364 let (iface, with_spawn) = {
367 let root = self.root.read().await;
368
369 let path = if *iface_name == fdo::Peer::name() {
373 ObjectPath::from_static_str_unchecked("/")
374 } else {
375 path.clone()
376 };
377
378 let node = root
379 .get_child(&path)
380 .ok_or_else(|| fdo::Error::UnknownObject(format!("Unknown object '{path}'")))?;
381
382 let iface = node.interface_lock(iface_name.as_ref()).ok_or_else(|| {
383 fdo::Error::UnknownInterface(format!("Unknown interface '{iface_name}'"))
384 })?;
385 (iface.instance, iface.spawn_tasks_for_methods)
386 };
387
388 if with_spawn {
389 let executor = connection.executor().clone();
390 let task_name = format!("`{msg}` method dispatcher");
391 let connection = connection.clone();
392 let msg = msg.clone();
393 executor
394 .spawn(
395 async move {
396 let server = connection.object_server();
397 let hdr = msg.header();
398 if let Err(e) = server
399 .dispatch_call_to_iface(iface, &connection, &msg, &hdr)
400 .await
401 {
402 debug!("Returning error: {}", e);
404 if let Err(e) = connection.reply_dbus_error(&hdr, e).await {
405 debug!(
406 "Error dispatching message. Message: {:?}, error: {:?}",
407 msg, e
408 );
409 }
410 }
411 }
412 .instrument(trace_span!("{}", task_name)),
413 &task_name,
414 )
415 .detach();
416 Ok(())
417 } else {
418 self.dispatch_call_to_iface(iface, connection, msg, hdr)
419 .await
420 }
421 }
422
423 #[instrument(skip(self))]
436 pub(crate) async fn dispatch_call(&self, msg: &Message, hdr: &Header<'_>) -> Result<()> {
437 let conn = self.connection();
438
439 if let Err(e) = self.dispatch_method_call_try(&conn, msg, hdr).await {
440 debug!("Returning error: {}", e);
441 conn.reply_dbus_error(hdr, e).await?;
442 }
443 trace!("Handled: {}", msg);
444
445 Ok(())
446 }
447
448 pub(crate) fn connection(&self) -> Connection {
449 self.conn
450 .upgrade()
451 .expect("ObjectServer can't exist w/o an associated Connection")
452 }
453}
454
455#[cfg(feature = "blocking-api")]
456impl From<crate::blocking::ObjectServer> for ObjectServer {
457 fn from(server: crate::blocking::ObjectServer) -> Self {
458 server.into_inner()
459 }
460}