zbus/object_server/
mod.rs

1//! The object server API.
2
3use std::{collections::HashMap, marker::PhantomData, sync::Arc};
4use tracing::{Instrument, debug, instrument, trace, trace_span};
5
6use zbus_names::InterfaceName;
7use zvariant::{ObjectPath, Value};
8
9use crate::{
10    Connection, Error, Result,
11    async_lock::RwLock,
12    connection::WeakConnection,
13    fdo,
14    fdo::ObjectManager,
15    message::{Header, Message},
16};
17
18mod interface;
19pub(crate) use interface::ArcInterface;
20pub use interface::{DispatchResult, Interface, InterfaceDeref, InterfaceDerefMut, InterfaceRef};
21
22mod signal_emitter;
23pub use signal_emitter::SignalEmitter;
24#[deprecated(since = "5.0.0", note = "Please use `SignalEmitter` instead.")]
25pub type SignalContext<'s> = SignalEmitter<'s>;
26
27mod dispatch_notifier;
28pub use dispatch_notifier::ResponseDispatchNotifier;
29
30mod node;
31pub(crate) use node::Node;
32
33/// An object server, holding server-side D-Bus objects & interfaces.
34///
35/// Object servers hold interfaces on various object paths, and expose them over D-Bus.
36///
37/// All object paths will have the standard interfaces implemented on your behalf, such as
38/// `org.freedesktop.DBus.Introspectable` or `org.freedesktop.DBus.Properties`.
39///
40/// # Example
41///
42/// This example exposes the `org.myiface.Example.Quit` method on the `/org/zbus/path`
43/// path.
44///
45/// ```no_run
46/// # use std::error::Error;
47/// use zbus::{Connection, interface};
48/// use event_listener::Event;
49/// # use async_io::block_on;
50///
51/// struct Example {
52///     // Interfaces are owned by the ObjectServer. They can have
53///     // `&mut self` methods.
54///     quit_event: Event,
55/// }
56///
57/// impl Example {
58///     fn new(quit_event: Event) -> Self {
59///         Self { quit_event }
60///     }
61/// }
62///
63/// #[interface(name = "org.myiface.Example")]
64/// impl Example {
65///     // This will be the "Quit" D-Bus method.
66///     async fn quit(&mut self) {
67///         self.quit_event.notify(1);
68///     }
69///
70///     // See `interface` documentation to learn
71///     // how to expose properties & signals as well.
72/// }
73///
74/// # block_on(async {
75/// let connection = Connection::session().await?;
76///
77/// let quit_event = Event::new();
78/// let quit_listener = quit_event.listen();
79/// let interface = Example::new(quit_event);
80/// connection
81///     .object_server()
82///     .at("/org/zbus/path", interface)
83///     .await?;
84///
85/// quit_listener.await;
86/// # Ok::<_, Box<dyn Error + Send + Sync>>(())
87/// # })?;
88/// # Ok::<_, Box<dyn Error + Send + Sync>>(())
89/// ```
#[derive(Debug, Clone)]
pub struct ObjectServer {
    // Weak handle to the associated connection; upgraded on demand by `connection()`.
    conn: WeakConnection,
    // Root (`/`) node of the object tree. Behind an `Arc`, so clones of the server share it.
    root: Arc<RwLock<Node>>,
}
95
96impl ObjectServer {
97    /// Create a new D-Bus `ObjectServer`.
98    pub(crate) fn new(conn: &Connection) -> Self {
99        Self {
100            conn: conn.into(),
101            root: Arc::new(RwLock::new(Node::new(
102                "/".try_into().expect("zvariant bug"),
103            ))),
104        }
105    }
106
107    pub(crate) fn root(&self) -> &RwLock<Node> {
108        &self.root
109    }
110
111    /// Register a D-Bus [`Interface`] at a given path (see the example above).
112    ///
113    /// Typically you'd want your interfaces to be registered immediately after the associated
114    /// connection is established and therefore use [`zbus::connection::Builder::serve_at`] instead.
115    /// However, there are situations where you'd need to register interfaces dynamically and that's
116    /// where this method becomes useful.
117    ///
118    /// If the interface already exists at this path, returns false.
119    pub async fn at<'p, P, I>(&self, path: P, iface: I) -> Result<bool>
120    where
121        I: Interface,
122        P: TryInto<ObjectPath<'p>>,
123        P::Error: Into<Error>,
124    {
125        self.add_arc_interface(path, I::name(), ArcInterface::new(iface))
126            .await
127    }
128
129    pub(crate) async fn add_arc_interface<'p, P>(
130        &self,
131        path: P,
132        name: InterfaceName<'static>,
133        arc_iface: ArcInterface,
134    ) -> Result<bool>
135    where
136        P: TryInto<ObjectPath<'p>>,
137        P::Error: Into<Error>,
138    {
139        let path = path.try_into().map_err(Into::into)?;
140        let mut root = self.root().write().await;
141        let (node, manager_path) = root.get_child_mut(&path, true);
142        let node = node.unwrap();
143        let added = node.add_arc_interface(name.clone(), arc_iface);
144        if added {
145            if name == ObjectManager::name() {
146                // Just added an object manager. Need to signal all managed objects under it.
147                let emitter = SignalEmitter::new(&self.connection(), path)?;
148                let objects = node.get_managed_objects(self, &self.connection()).await?;
149                for (path, owned_interfaces) in objects {
150                    let interfaces = owned_interfaces
151                        .iter()
152                        .map(|(i, props)| {
153                            let props = props
154                                .iter()
155                                .map(|(k, v)| Ok((k.as_str(), Value::try_from(v)?)))
156                                .collect::<Result<_>>();
157                            Ok((i.into(), props?))
158                        })
159                        .collect::<Result<_>>()?;
160                    ObjectManager::interfaces_added(&emitter, path.into(), interfaces).await?;
161                }
162            } else if let Some(manager_path) = manager_path {
163                let emitter = SignalEmitter::new(&self.connection(), manager_path.clone())?;
164                let mut interfaces = HashMap::new();
165                let owned_props = node
166                    .get_properties(self, &self.connection(), name.clone())
167                    .await?;
168                let props = owned_props
169                    .iter()
170                    .map(|(k, v)| Ok((k.as_str(), Value::try_from(v)?)))
171                    .collect::<Result<_>>()?;
172                interfaces.insert(name, props);
173
174                ObjectManager::interfaces_added(&emitter, path, interfaces).await?;
175            }
176        }
177
178        Ok(added)
179    }
180
181    /// Unregister a D-Bus [`Interface`] at a given path.
182    ///
183    /// If there are no more interfaces left at that path, destroys the object as well.
184    /// Returns whether the object was destroyed.
185    pub async fn remove<'p, I, P>(&self, path: P) -> Result<bool>
186    where
187        I: Interface,
188        P: TryInto<ObjectPath<'p>>,
189        P::Error: Into<Error>,
190    {
191        let path = path.try_into().map_err(Into::into)?;
192        let mut root = self.root.write().await;
193        let (node, manager_path) = root.get_child_mut(&path, false);
194        let node = node.ok_or(Error::InterfaceNotFound)?;
195        if !node.remove_interface(I::name()) {
196            return Err(Error::InterfaceNotFound);
197        }
198        if let Some(manager_path) = manager_path {
199            let ctxt = SignalEmitter::new(&self.connection(), manager_path.clone())?;
200            ObjectManager::interfaces_removed(&ctxt, path.clone(), (&[I::name()]).into()).await?;
201        }
202        if node.is_empty() {
203            let mut path_parts = path.rsplit('/').filter(|i| !i.is_empty());
204            let last_part = path_parts.next().unwrap();
205            let ppath = ObjectPath::from_string_unchecked(
206                path_parts.fold(String::new(), |a, p| format!("/{p}{a}")),
207            );
208            root.get_child_mut(&ppath, false)
209                .0
210                .unwrap()
211                .remove_node(last_part);
212            return Ok(true);
213        }
214        Ok(false)
215    }
216
217    /// Get the interface at the given path.
218    ///
219    /// # Errors
220    ///
221    /// If the interface is not registered at the given path, an `Error::InterfaceNotFound` error is
222    /// returned.
223    ///
224    /// # Examples
225    ///
226    /// The typical use of this is property changes outside of a dispatched handler:
227    ///
228    /// ```no_run
229    /// # use std::error::Error;
230    /// # use zbus::{Connection, interface};
231    /// # use async_io::block_on;
232    /// #
233    /// struct MyIface(u32);
234    ///
235    /// #[interface(name = "org.myiface.MyIface")]
236    /// impl MyIface {
237    ///      #[zbus(property)]
238    ///      async fn count(&self) -> u32 {
239    ///          self.0
240    ///      }
241    /// }
242    ///
243    /// # block_on(async {
244    /// # let connection = Connection::session().await?;
245    /// #
246    /// # let path = "/org/zbus/path";
247    /// # connection.object_server().at(path, MyIface(0)).await?;
248    /// let iface_ref = connection
249    ///     .object_server()
250    ///     .interface::<_, MyIface>(path).await?;
251    /// let mut iface = iface_ref.get_mut().await;
252    /// iface.0 = 42;
253    /// iface.count_changed(iface_ref.signal_emitter()).await?;
254    /// # Ok::<_, Box<dyn Error + Send + Sync>>(())
255    /// # })?;
256    /// #
257    /// # Ok::<_, Box<dyn Error + Send + Sync>>(())
258    /// ```
259    pub async fn interface<'p, P, I>(&self, path: P) -> Result<InterfaceRef<I>>
260    where
261        I: Interface,
262        P: TryInto<ObjectPath<'p>>,
263        P::Error: Into<Error>,
264    {
265        let path = path.try_into().map_err(Into::into)?;
266        let root = self.root().read().await;
267        let node = root.get_child(&path).ok_or(Error::InterfaceNotFound)?;
268
269        let lock = node
270            .interface_lock(I::name())
271            .ok_or(Error::InterfaceNotFound)?
272            .instance
273            .clone();
274
275        // Ensure what we return can later be dowcasted safely.
276        lock.read()
277            .await
278            .downcast_ref::<I>()
279            .ok_or(Error::InterfaceNotFound)?;
280
281        let conn = self.connection();
282        // SAFETY: We know that there is a valid path on the node as we already converted w/o error.
283        let emitter = SignalEmitter::new(&conn, path).unwrap().into_owned();
284
285        Ok(InterfaceRef {
286            emitter,
287            lock,
288            phantom: PhantomData,
289        })
290    }
291
292    async fn dispatch_call_to_iface(
293        &self,
294        iface: Arc<RwLock<dyn Interface>>,
295        connection: &Connection,
296        msg: &Message,
297        hdr: &Header<'_>,
298    ) -> fdo::Result<()> {
299        let member = hdr
300            .member()
301            .ok_or_else(|| fdo::Error::Failed("Missing member".into()))?;
302        let iface_name = hdr
303            .interface()
304            .ok_or_else(|| fdo::Error::Failed("Missing interface".into()))?;
305
306        trace!("acquiring read lock on interface `{}`", iface_name);
307        let read_lock = iface.read().await;
308        trace!("acquired read lock on interface `{}`", iface_name);
309        match read_lock.call(self, connection, msg, member.as_ref()) {
310            DispatchResult::NotFound => {
311                return Err(fdo::Error::UnknownMethod(format!(
312                    "Unknown method '{member}'"
313                )));
314            }
315            DispatchResult::Async(f) => {
316                return f.await.map_err(|e| match e {
317                    Error::FDO(e) => *e,
318                    e => fdo::Error::Failed(format!("{e}")),
319                });
320            }
321            DispatchResult::RequiresMut => {}
322        }
323        drop(read_lock);
324        trace!("acquiring write lock on interface `{}`", iface_name);
325        let mut write_lock = iface.write().await;
326        trace!("acquired write lock on interface `{}`", iface_name);
327        match write_lock.call_mut(self, connection, msg, member.as_ref()) {
328            DispatchResult::NotFound => {}
329            DispatchResult::RequiresMut => {}
330            DispatchResult::Async(f) => {
331                return f.await.map_err(|e| match e {
332                    Error::FDO(e) => *e,
333                    e => fdo::Error::Failed(format!("{e}")),
334                });
335            }
336        }
337        drop(write_lock);
338        Err(fdo::Error::UnknownMethod(format!(
339            "Unknown method '{member}'"
340        )))
341    }
342
343    async fn dispatch_method_call_try(
344        &self,
345        connection: &Connection,
346        msg: &Message,
347        hdr: &Header<'_>,
348    ) -> fdo::Result<()> {
349        let path = hdr
350            .path()
351            .ok_or_else(|| fdo::Error::Failed("Missing object path".into()))?;
352        let iface_name = hdr
353            .interface()
354            // TODO: In the absence of an INTERFACE field, if two or more interfaces on the same
355            // object have a method with the same name, it is undefined which of those
356            // methods will be invoked. Implementations may choose to either return an
357            // error, or deliver the message as though it had an arbitrary one of those
358            // interfaces.
359            .ok_or_else(|| fdo::Error::Failed("Missing interface".into()))?;
360        // Check that the message has a member before spawning.
361        // Note that an unknown member will still spawn a task. We should instead gather
362        // all the details for the call before spawning.
363        // See also https://github.com/z-galaxy/zbus/issues/674 for future of Interface.
364        let _ = hdr
365            .member()
366            .ok_or_else(|| fdo::Error::Failed("Missing member".into()))?;
367
368        // Ensure the root lock isn't held while dispatching the message. That
369        // way, the object server can be mutated during that time.
370        let (iface, with_spawn) = {
371            let root = self.root.read().await;
372
373            // D-Bus spec: org.freedesktop.DBus.Peer interface works on ANY path, even unregistered
374            // ones. See: https://dbus.freedesktop.org/doc/dbus-specification.html#standard-interfaces-peer
375            // Switch the path to "/" for Peer interface calls.
376            let path = if *iface_name == fdo::Peer::name() {
377                ObjectPath::from_static_str_unchecked("/")
378            } else {
379                path.clone()
380            };
381
382            let node = root
383                .get_child(&path)
384                .ok_or_else(|| fdo::Error::UnknownObject(format!("Unknown object '{path}'")))?;
385
386            let iface = node.interface_lock(iface_name.as_ref()).ok_or_else(|| {
387                fdo::Error::UnknownInterface(format!("Unknown interface '{iface_name}'"))
388            })?;
389            (iface.instance, iface.spawn_tasks_for_methods)
390        };
391
392        if with_spawn {
393            let executor = connection.executor().clone();
394            let task_name = format!("`{msg}` method dispatcher");
395            let connection = connection.clone();
396            let msg = msg.clone();
397            executor
398                .spawn(
399                    async move {
400                        let server = connection.object_server();
401                        let hdr = msg.header();
402                        if let Err(e) = server
403                            .dispatch_call_to_iface(iface, &connection, &msg, &hdr)
404                            .await
405                        {
406                            // When not spawning a task, this error is handled by the caller.
407                            debug!("Returning error: {}", e);
408                            if let Err(e) = connection.reply_dbus_error(&hdr, e).await {
409                                debug!(
410                                    "Error dispatching message. Message: {:?}, error: {:?}",
411                                    msg, e
412                                );
413                            }
414                        }
415                    }
416                    .instrument(trace_span!("{}", task_name)),
417                    &task_name,
418                )
419                .detach();
420            Ok(())
421        } else {
422            self.dispatch_call_to_iface(iface, connection, msg, hdr)
423                .await
424        }
425    }
426
427    /// Dispatch an incoming message to a registered interface.
428    ///
429    /// The object server will handle the message by:
430    ///
431    /// - looking up the called object path & interface,
432    ///
433    /// - calling the associated method if one exists,
434    ///
435    /// - returning a message (responding to the caller with either a return or error message) to
436    ///   the caller through the associated server connection.
437    ///
438    /// Returns an error if the message is malformed.
439    #[instrument(skip(self))]
440    pub(crate) async fn dispatch_call(&self, msg: &Message, hdr: &Header<'_>) -> Result<()> {
441        let conn = self.connection();
442
443        if let Err(e) = self.dispatch_method_call_try(&conn, msg, hdr).await {
444            debug!("Returning error: {}", e);
445            conn.reply_dbus_error(hdr, e).await?;
446        }
447        trace!("Handled: {}", msg);
448
449        Ok(())
450    }
451
452    pub(crate) fn connection(&self) -> Connection {
453        self.conn
454            .upgrade()
455            .expect("ObjectServer can't exist w/o an associated Connection")
456    }
457}
458
#[cfg(feature = "blocking-api")]
impl From<crate::blocking::ObjectServer> for ObjectServer {
    /// Convert the blocking wrapper into an async `ObjectServer` via its `into_inner`.
    fn from(blocking: crate::blocking::ObjectServer) -> Self {
        blocking.into_inner()
    }
}
464}