// zbus/object_server/mod.rs

1//! The object server API.
2
3use std::{collections::HashMap, marker::PhantomData, sync::Arc};
4use tracing::{Instrument, debug, instrument, trace, trace_span};
5
6use zbus_names::InterfaceName;
7use zvariant::{ObjectPath, Value};
8
9use crate::{
10    Connection, Error, Result,
11    async_lock::RwLock,
12    connection::WeakConnection,
13    fdo,
14    fdo::ObjectManager,
15    message::{Header, Message},
16};
17
18mod interface;
19pub(crate) use interface::ArcInterface;
20#[allow(deprecated)]
21pub use interface::DispatchResult;
22pub use interface::{DispatchResult2, Interface, InterfaceDeref, InterfaceDerefMut, InterfaceRef};
23
24mod signal_emitter;
25pub use signal_emitter::SignalEmitter;
/// Deprecated alias for [`SignalEmitter`], kept for backward compatibility.
#[deprecated(since = "5.0.0", note = "Please use `SignalEmitter` instead.")]
pub type SignalContext<'s> = SignalEmitter<'s>;
28
29mod dispatch_notifier;
30pub use dispatch_notifier::ResponseDispatchNotifier;
31
32mod node;
33pub(crate) use node::Node;
34
/// An object server, holding server-side D-Bus objects & interfaces.
///
/// Object servers hold interfaces on various object paths, and expose them over D-Bus.
///
/// All object paths will have the standard interfaces implemented on your behalf, such as
/// `org.freedesktop.DBus.Introspectable` or `org.freedesktop.DBus.Properties`.
///
/// # Example
///
/// This example exposes the `org.myiface.Example.Quit` method on the `/org/zbus/path`
/// path.
///
/// ```no_run
/// # use std::error::Error;
/// use zbus::{Connection, interface};
/// use event_listener::Event;
/// # use async_io::block_on;
///
/// struct Example {
///     // Interfaces are owned by the ObjectServer. They can have
///     // `&mut self` methods.
///     quit_event: Event,
/// }
///
/// impl Example {
///     fn new(quit_event: Event) -> Self {
///         Self { quit_event }
///     }
/// }
///
/// #[interface(name = "org.myiface.Example")]
/// impl Example {
///     // This will be the "Quit" D-Bus method.
///     async fn quit(&mut self) {
///         self.quit_event.notify(1);
///     }
///
///     // See `interface` documentation to learn
///     // how to expose properties & signals as well.
/// }
///
/// # block_on(async {
/// let connection = Connection::session().await?;
///
/// let quit_event = Event::new();
/// let quit_listener = quit_event.listen();
/// let interface = Example::new(quit_event);
/// connection
///     .object_server()
///     .at("/org/zbus/path", interface)
///     .await?;
///
/// quit_listener.await;
/// # Ok::<_, Box<dyn Error + Send + Sync>>(())
/// # })?;
/// # Ok::<_, Box<dyn Error + Send + Sync>>(())
/// ```
#[derive(Debug, Clone)]
pub struct ObjectServer {
    // Weak handle to the associated connection. It is upgraded on demand by `connection()`,
    // which panics if the connection no longer exists.
    conn: WeakConnection,
    // Root node of the object tree (created for the `/` path in `new`). It sits behind an
    // `Arc`, so clones of the server share the same tree.
    root: Arc<RwLock<Node>>,
}
97
98impl ObjectServer {
99    /// Create a new D-Bus `ObjectServer`.
100    pub(crate) fn new(conn: &Connection) -> Self {
101        Self {
102            conn: conn.into(),
103            root: Arc::new(RwLock::new(Node::new(
104                "/".try_into().expect("zvariant bug"),
105            ))),
106        }
107    }
108
109    pub(crate) fn root(&self) -> &RwLock<Node> {
110        &self.root
111    }
112
113    /// Register a D-Bus [`Interface`] at a given path (see the example above).
114    ///
115    /// Typically you'd want your interfaces to be registered immediately after the associated
116    /// connection is established and therefore use [`zbus::connection::Builder::serve_at`] instead.
117    /// However, there are situations where you'd need to register interfaces dynamically and that's
118    /// where this method becomes useful.
119    ///
120    /// If the interface already exists at this path, returns false.
121    pub async fn at<'p, P, I>(&self, path: P, iface: I) -> Result<bool>
122    where
123        I: Interface,
124        P: TryInto<ObjectPath<'p>>,
125        P::Error: Into<Error>,
126    {
127        self.add_arc_interface(path, I::name(), ArcInterface::new(iface))
128            .await
129    }
130
131    pub(crate) async fn add_arc_interface<'p, P>(
132        &self,
133        path: P,
134        name: InterfaceName<'static>,
135        arc_iface: ArcInterface,
136    ) -> Result<bool>
137    where
138        P: TryInto<ObjectPath<'p>>,
139        P::Error: Into<Error>,
140    {
141        let path = path.try_into().map_err(Into::into)?;
142        let mut root = self.root().write().await;
143        let (node, manager_path) = root.get_child_mut(&path, true);
144        let node = node.unwrap();
145        let added = node.add_arc_interface(name.clone(), arc_iface);
146        if added {
147            if name == ObjectManager::name() {
148                // Just added an object manager. Need to signal all managed objects under it.
149                let emitter = SignalEmitter::new(&self.connection(), path)?;
150                let objects = node.get_managed_objects(self, &self.connection()).await?;
151                for (path, owned_interfaces) in objects {
152                    let interfaces = owned_interfaces
153                        .iter()
154                        .map(|(i, props)| {
155                            let props = props
156                                .iter()
157                                .map(|(k, v)| Ok((k.as_str(), Value::try_from(v)?)))
158                                .collect::<Result<_>>();
159                            Ok((i.into(), props?))
160                        })
161                        .collect::<Result<_>>()?;
162                    ObjectManager::interfaces_added(&emitter, path.into(), interfaces).await?;
163                }
164            } else if let Some(manager_path) = manager_path {
165                let emitter = SignalEmitter::new(&self.connection(), manager_path.clone())?;
166                let mut interfaces = HashMap::new();
167                let owned_props = node
168                    .get_properties(self, &self.connection(), name.clone())
169                    .await?;
170                let props = owned_props
171                    .iter()
172                    .map(|(k, v)| Ok((k.as_str(), Value::try_from(v)?)))
173                    .collect::<Result<_>>()?;
174                interfaces.insert(name, props);
175
176                ObjectManager::interfaces_added(&emitter, path, interfaces).await?;
177            }
178        }
179
180        Ok(added)
181    }
182
183    /// Unregister a D-Bus [`Interface`] at a given path.
184    ///
185    /// If there are no more interfaces left at that path, destroys the object as well.
186    /// Returns whether the object was destroyed.
187    pub async fn remove<'p, I, P>(&self, path: P) -> Result<bool>
188    where
189        I: Interface,
190        P: TryInto<ObjectPath<'p>>,
191        P::Error: Into<Error>,
192    {
193        let path = path.try_into().map_err(Into::into)?;
194        let mut root = self.root.write().await;
195        let (node, manager_path) = root.get_child_mut(&path, false);
196        let node = node.ok_or(Error::InterfaceNotFound)?;
197        if !node.remove_interface(I::name()) {
198            return Err(Error::InterfaceNotFound);
199        }
200        if let Some(manager_path) = manager_path {
201            let ctxt = SignalEmitter::new(&self.connection(), manager_path.clone())?;
202            ObjectManager::interfaces_removed(&ctxt, path.clone(), (&[I::name()]).into()).await?;
203        }
204        if node.is_empty() {
205            let mut path_parts = path.rsplit('/').filter(|i| !i.is_empty());
206            let last_part = path_parts.next().unwrap();
207            let ppath = ObjectPath::from_string_unchecked(
208                path_parts.fold(String::new(), |a, p| format!("/{p}{a}")),
209            );
210            root.get_child_mut(&ppath, false)
211                .0
212                .unwrap()
213                .remove_node(last_part);
214            return Ok(true);
215        }
216        Ok(false)
217    }
218
219    /// Get the interface at the given path.
220    ///
221    /// # Errors
222    ///
223    /// If the interface is not registered at the given path, an `Error::InterfaceNotFound` error is
224    /// returned.
225    ///
226    /// # Examples
227    ///
228    /// The typical use of this is property changes outside of a dispatched handler:
229    ///
230    /// ```no_run
231    /// # use std::error::Error;
232    /// # use zbus::{Connection, interface};
233    /// # use async_io::block_on;
234    /// #
235    /// struct MyIface(u32);
236    ///
237    /// #[interface(name = "org.myiface.MyIface")]
238    /// impl MyIface {
239    ///      #[zbus(property)]
240    ///      async fn count(&self) -> u32 {
241    ///          self.0
242    ///      }
243    /// }
244    ///
245    /// # block_on(async {
246    /// # let connection = Connection::session().await?;
247    /// #
248    /// # let path = "/org/zbus/path";
249    /// # connection.object_server().at(path, MyIface(0)).await?;
250    /// let iface_ref = connection
251    ///     .object_server()
252    ///     .interface::<_, MyIface>(path).await?;
253    /// let mut iface = iface_ref.get_mut().await;
254    /// iface.0 = 42;
255    /// iface.count_changed(iface_ref.signal_emitter()).await?;
256    /// # Ok::<_, Box<dyn Error + Send + Sync>>(())
257    /// # })?;
258    /// #
259    /// # Ok::<_, Box<dyn Error + Send + Sync>>(())
260    /// ```
261    pub async fn interface<'p, P, I>(&self, path: P) -> Result<InterfaceRef<I>>
262    where
263        I: Interface,
264        P: TryInto<ObjectPath<'p>>,
265        P::Error: Into<Error>,
266    {
267        let path = path.try_into().map_err(Into::into)?;
268        let root = self.root().read().await;
269        let node = root.get_child(&path).ok_or(Error::InterfaceNotFound)?;
270
271        let lock = node
272            .interface_lock(I::name())
273            .ok_or(Error::InterfaceNotFound)?
274            .instance
275            .clone();
276
277        // Ensure what we return can later be dowcasted safely.
278        lock.read()
279            .await
280            .downcast_ref::<I>()
281            .ok_or(Error::InterfaceNotFound)?;
282
283        let conn = self.connection();
284        // SAFETY: We know that there is a valid path on the node as we already converted w/o error.
285        let emitter = SignalEmitter::new(&conn, path).unwrap().into_owned();
286
287        Ok(InterfaceRef {
288            emitter,
289            lock,
290            phantom: PhantomData,
291        })
292    }
293
294    async fn dispatch_call_to_iface(
295        &self,
296        iface: Arc<RwLock<dyn Interface>>,
297        connection: &Connection,
298        msg: &Message,
299        hdr: &Header<'_>,
300    ) -> fdo::Result<()> {
301        let member = hdr
302            .member()
303            .ok_or_else(|| fdo::Error::Failed("Missing member".into()))?;
304        let iface_name = hdr
305            .interface()
306            .ok_or_else(|| fdo::Error::Failed("Missing interface".into()))?;
307
308        trace!("acquiring read lock on interface `{}`", iface_name);
309        let read_lock = iface.read().await;
310        trace!("acquired read lock on interface `{}`", iface_name);
311        match read_lock.call(self, connection, msg, member.as_ref()) {
312            DispatchResult2::NotFound => {
313                return Err(fdo::Error::UnknownMethod(format!(
314                    "Unknown method '{member}'"
315                )));
316            }
317            DispatchResult2::Async(f) => {
318                return f.await;
319            }
320            DispatchResult2::RequiresMut => {}
321        }
322        drop(read_lock);
323        trace!("acquiring write lock on interface `{}`", iface_name);
324        let mut write_lock = iface.write().await;
325        trace!("acquired write lock on interface `{}`", iface_name);
326        match write_lock.call_mut(self, connection, msg, member.as_ref()) {
327            DispatchResult2::NotFound => {}
328            DispatchResult2::RequiresMut => {}
329            DispatchResult2::Async(f) => {
330                return f.await;
331            }
332        }
333        drop(write_lock);
334        Err(fdo::Error::UnknownMethod(format!(
335            "Unknown method '{member}'"
336        )))
337    }
338
339    async fn dispatch_method_call_try(
340        &self,
341        connection: &Connection,
342        msg: &Message,
343        hdr: &Header<'_>,
344    ) -> fdo::Result<()> {
345        let path = hdr
346            .path()
347            .ok_or_else(|| fdo::Error::Failed("Missing object path".into()))?;
348        let iface_name = hdr
349            .interface()
350            // TODO: In the absence of an INTERFACE field, if two or more interfaces on the same
351            // object have a method with the same name, it is undefined which of those
352            // methods will be invoked. Implementations may choose to either return an
353            // error, or deliver the message as though it had an arbitrary one of those
354            // interfaces.
355            .ok_or_else(|| fdo::Error::Failed("Missing interface".into()))?;
356        // Check that the message has a member before spawning.
357        // Note that an unknown member will still spawn a task. We should instead gather
358        // all the details for the call before spawning.
359        // See also https://github.com/z-galaxy/zbus/issues/674 for future of Interface.
360        let _ = hdr
361            .member()
362            .ok_or_else(|| fdo::Error::Failed("Missing member".into()))?;
363
364        // Ensure the root lock isn't held while dispatching the message. That
365        // way, the object server can be mutated during that time.
366        let (iface, with_spawn) = {
367            let root = self.root.read().await;
368
369            // D-Bus spec: org.freedesktop.DBus.Peer interface works on ANY path, even unregistered
370            // ones. See: https://dbus.freedesktop.org/doc/dbus-specification.html#standard-interfaces-peer
371            // Switch the path to "/" for Peer interface calls.
372            let path = if *iface_name == fdo::Peer::name() {
373                ObjectPath::from_static_str_unchecked("/")
374            } else {
375                path.clone()
376            };
377
378            let node = root
379                .get_child(&path)
380                .ok_or_else(|| fdo::Error::UnknownObject(format!("Unknown object '{path}'")))?;
381
382            let iface = node.interface_lock(iface_name.as_ref()).ok_or_else(|| {
383                fdo::Error::UnknownInterface(format!("Unknown interface '{iface_name}'"))
384            })?;
385            (iface.instance, iface.spawn_tasks_for_methods)
386        };
387
388        if with_spawn {
389            let executor = connection.executor().clone();
390            let task_name = format!("`{msg}` method dispatcher");
391            let connection = connection.clone();
392            let msg = msg.clone();
393            executor
394                .spawn(
395                    async move {
396                        let server = connection.object_server();
397                        let hdr = msg.header();
398                        if let Err(e) = server
399                            .dispatch_call_to_iface(iface, &connection, &msg, &hdr)
400                            .await
401                        {
402                            // When not spawning a task, this error is handled by the caller.
403                            debug!("Returning error: {}", e);
404                            if let Err(e) = connection.reply_dbus_error(&hdr, e).await {
405                                debug!(
406                                    "Error dispatching message. Message: {:?}, error: {:?}",
407                                    msg, e
408                                );
409                            }
410                        }
411                    }
412                    .instrument(trace_span!("{}", task_name)),
413                    &task_name,
414                )
415                .detach();
416            Ok(())
417        } else {
418            self.dispatch_call_to_iface(iface, connection, msg, hdr)
419                .await
420        }
421    }
422
423    /// Dispatch an incoming message to a registered interface.
424    ///
425    /// The object server will handle the message by:
426    ///
427    /// - looking up the called object path & interface,
428    ///
429    /// - calling the associated method if one exists,
430    ///
431    /// - returning a message (responding to the caller with either a return or error message) to
432    ///   the caller through the associated server connection.
433    ///
434    /// Returns an error if the message is malformed.
435    #[instrument(skip(self))]
436    pub(crate) async fn dispatch_call(&self, msg: &Message, hdr: &Header<'_>) -> Result<()> {
437        let conn = self.connection();
438
439        if let Err(e) = self.dispatch_method_call_try(&conn, msg, hdr).await {
440            debug!("Returning error: {}", e);
441            conn.reply_dbus_error(hdr, e).await?;
442        }
443        trace!("Handled: {}", msg);
444
445        Ok(())
446    }
447
448    pub(crate) fn connection(&self) -> Connection {
449        self.conn
450            .upgrade()
451            .expect("ObjectServer can't exist w/o an associated Connection")
452    }
453}
454
#[cfg(feature = "blocking-api")]
impl From<crate::blocking::ObjectServer> for ObjectServer {
    /// Convert a blocking `ObjectServer` into this type by taking the value returned from
    /// its `into_inner()`.
    fn from(blocking: crate::blocking::ObjectServer) -> Self {
        blocking.into_inner()
    }
}