zbus/connection/
builder.rs

1#[cfg(not(feature = "tokio"))]
2use async_io::Async;
3use enumflags2::BitFlags;
4use event_listener::Event;
5#[cfg(not(feature = "tokio"))]
6use std::net::TcpStream;
7#[cfg(all(unix, not(feature = "tokio")))]
8use std::os::unix::net::UnixStream;
9use std::{
10    collections::{HashMap, HashSet},
11    vec,
12};
13#[cfg(feature = "tokio")]
14use tokio::net::TcpStream;
15#[cfg(all(unix, feature = "tokio"))]
16use tokio::net::UnixStream;
17#[cfg(feature = "tokio-vsock")]
18use tokio_vsock::VsockStream;
19#[cfg(all(windows, not(feature = "tokio")))]
20use uds_windows::UnixStream;
21#[cfg(all(feature = "vsock", not(feature = "tokio")))]
22use vsock::VsockStream;
23
24use zvariant::ObjectPath;
25
26use crate::{
27    Connection, Error, Executor, Guid, OwnedGuid, Result,
28    address::{self, Address},
29    fdo::RequestNameFlags,
30    names::{InterfaceName, WellKnownName},
31    object_server::{ArcInterface, Interface},
32};
33
34use super::{
35    handshake::{AuthMechanism, Authenticated},
36    socket::{BoxedSplit, ReadHalf, Split, WriteHalf},
37};
38
39const DEFAULT_MAX_QUEUED: usize = 64;
40
41#[derive(Debug)]
42enum Target {
43    #[cfg(any(unix, not(feature = "tokio")))]
44    UnixStream(UnixStream),
45    TcpStream(TcpStream),
46    #[cfg(any(
47        all(feature = "vsock", not(feature = "tokio")),
48        feature = "tokio-vsock"
49    ))]
50    VsockStream(VsockStream),
51    Address(Address),
52    Socket(Split<Box<dyn ReadHalf>, Box<dyn WriteHalf>>),
53    AuthenticatedSocket(Split<Box<dyn ReadHalf>, Box<dyn WriteHalf>>),
54}
55
56type Interfaces<'a> = HashMap<ObjectPath<'a>, HashMap<InterfaceName<'static>, ArcInterface>>;
57
58/// A builder for [`zbus::Connection`].
59///
60/// The builder allows setting the flags [`RequestNameFlags::AllowReplacement`] and
61/// [`RequestNameFlags::ReplaceExisting`] when requesting names, but the flag
62/// [`RequestNameFlags::DoNotQueue`] will always be enabled. The reasons are:
63///
64/// 1. There is no indication given to the caller of [`Self::build`] that the name(s) request was
65///    enqueued and that the requested name might not be available right after building.
66///
67/// 2. The name may be acquired in between the time the name is requested and the
68///    [`crate::fdo::NameAcquiredStream`] is constructed. As a result the service can miss the
69///    [`crate::fdo::NameAcquired`] signal.
70#[derive(Debug)]
71#[must_use]
72pub struct Builder<'a> {
73    target: Option<Target>,
74    max_queued: Option<usize>,
75    // This is only set for p2p server case or pre-authenticated sockets.
76    guid: Option<Guid<'a>>,
77    #[cfg(feature = "p2p")]
78    p2p: bool,
79    internal_executor: bool,
80    interfaces: Interfaces<'a>,
81    names: HashSet<WellKnownName<'a>>,
82    auth_mechanism: Option<AuthMechanism>,
83    #[cfg(feature = "bus-impl")]
84    unique_name: Option<crate::names::UniqueName<'a>>,
85    request_name_flags: BitFlags<RequestNameFlags>,
86    method_timeout: Option<std::time::Duration>,
87    user_id: Option<u32>,
88}
89
90impl<'a> Builder<'a> {
91    /// Create a builder for the session/user message bus connection.
92    pub fn session() -> Result<Self> {
93        Ok(Self::new(Target::Address(Address::session()?)))
94    }
95
96    /// Create a builder for the system-wide message bus connection.
97    pub fn system() -> Result<Self> {
98        Ok(Self::new(Target::Address(Address::system()?)))
99    }
100
101    /// Create a builder for an IBus connection.
102    ///
103    /// IBus (Intelligent Input Bus) is an input method framework. This method creates a builder
104    /// that will query the IBus daemon for its D-Bus address using the `ibus address` command.
105    ///
106    /// # Platform Support
107    ///
108    /// This method is available on Unix-like systems where IBus is installed.
109    ///
110    /// # Errors
111    ///
112    /// Returns an error if:
113    /// - The `ibus` command is not found or fails to execute
114    /// - The IBus daemon is not running
115    /// - The command output cannot be parsed as a valid D-Bus address
116    ///
117    /// # Example
118    ///
119    /// ```no_run
120    /// # use std::error::Error;
121    /// # use zbus::connection::Builder;
122    /// # use zbus::block_on;
123    /// #
124    /// # block_on(async {
125    /// let conn = Builder::ibus()?
126    ///     .build()
127    ///     .await?;
128    ///
129    /// // Use the connection to interact with IBus services
130    /// # drop(conn);
131    /// # Ok::<(), zbus::Error>(())
132    /// # }).unwrap();
133    /// #
134    /// # Ok::<_, Box<dyn Error + Send + Sync>>(())
135    /// ```
136    #[cfg(unix)]
137    pub fn ibus() -> Result<Self> {
138        use crate::address::transport::{Ibus, Transport};
139        Ok(Self::new(Target::Address(Address::from(Transport::Ibus(
140            Ibus::new(),
141        )))))
142    }
143
144    /// Create a builder for a connection that will use the given [D-Bus bus address].
145    ///
146    /// # Example
147    ///
148    /// Here is an example of connecting to an IBus service:
149    ///
150    /// ```no_run
151    /// # use std::error::Error;
152    /// # use zbus::connection::Builder;
153    /// # use zbus::block_on;
154    /// #
155    /// # block_on(async {
156    /// let addr = "unix:\
157    ///     path=/home/zeenix/.cache/ibus/dbus-ET0Xzrk9,\
158    ///     guid=fdd08e811a6c7ebe1fef0d9e647230da";
159    /// let conn = Builder::address(addr)?
160    ///     .build()
161    ///     .await?;
162    ///
163    /// // Do something useful with `conn`..
164    /// #     drop(conn);
165    /// #     Ok::<(), zbus::Error>(())
166    /// # }).unwrap();
167    /// #
168    /// # Ok::<_, Box<dyn Error + Send + Sync>>(())
169    /// ```
170    ///
171    /// **Note:** The IBus address is different for each session. You can find the address for your
172    /// current session using `ibus address` command. For a more convenient way to connect to IBus,
173    /// see [`Builder::ibus`].
174    ///
175    /// [D-Bus bus address]: https://dbus.freedesktop.org/doc/dbus-specification.html#addresses
176    pub fn address<A>(address: A) -> Result<Self>
177    where
178        A: TryInto<Address>,
179        A::Error: Into<Error>,
180    {
181        Ok(Self::new(Target::Address(
182            address.try_into().map_err(Into::into)?,
183        )))
184    }
185
186    /// Create a builder for a connection that will use the given unix stream.
187    ///
188    /// If the default `async-io` feature is disabled, this method will expect a
189    /// [`tokio::net::UnixStream`](https://docs.rs/tokio/latest/tokio/net/struct.UnixStream.html)
190    /// argument.
191    ///
192    /// Since tokio currently [does not support Unix domain sockets][tuds] on Windows, this method
193    /// is not available when the `tokio` feature is enabled and building for Windows target.
194    ///
195    /// [tuds]: https://github.com/tokio-rs/tokio/issues/2201
196    #[cfg(any(unix, not(feature = "tokio")))]
197    pub fn unix_stream(stream: UnixStream) -> Self {
198        Self::new(Target::UnixStream(stream))
199    }
200
201    /// Create a builder for a connection that will use the given TCP stream.
202    ///
203    /// If the default `async-io` feature is disabled, this method will expect a
204    /// [`tokio::net::TcpStream`](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html)
205    /// argument.
206    pub fn tcp_stream(stream: TcpStream) -> Self {
207        Self::new(Target::TcpStream(stream))
208    }
209
210    /// Create a builder for a connection that will use the given VSOCK stream.
211    ///
212    /// This method is only available when either `vsock` or `tokio-vsock` feature is enabled. The
213    /// type of `stream` is `vsock::VsockStream` with `vsock` feature and `tokio_vsock::VsockStream`
214    /// with `tokio-vsock` feature.
215    #[cfg(any(
216        all(feature = "vsock", not(feature = "tokio")),
217        feature = "tokio-vsock"
218    ))]
219    pub fn vsock_stream(stream: VsockStream) -> Self {
220        Self::new(Target::VsockStream(stream))
221    }
222
223    /// Create a builder for a connection that will use the given socket.
224    pub fn socket<S: Into<BoxedSplit>>(socket: S) -> Self {
225        Self::new(Target::Socket(socket.into()))
226    }
227
228    /// Create a builder for a connection that will use the given pre-authenticated socket.
229    ///
230    /// This is similar to [`Builder::socket`], except that the socket is either already
231    /// authenticated or does not require authentication.
232    pub fn authenticated_socket<S, G>(socket: S, guid: G) -> Result<Self>
233    where
234        S: Into<BoxedSplit>,
235        G: TryInto<Guid<'a>>,
236        G::Error: Into<Error>,
237    {
238        let mut builder = Self::new(Target::AuthenticatedSocket(socket.into()));
239        builder.guid = Some(guid.try_into().map_err(Into::into)?);
240
241        Ok(builder)
242    }
243
244    /// Specify the mechanism to use during authentication.
245    pub fn auth_mechanism(mut self, auth_mechanism: AuthMechanism) -> Self {
246        self.auth_mechanism = Some(auth_mechanism);
247
248        self
249    }
250
251    /// Specify the user id during authentication.
252    ///
253    /// This can be useful when using [`AuthMechanism::External`] with `socat`
254    /// to avoid the host decide what uid to use and instead provide one
255    /// known to have access rights.
256    #[cfg(unix)]
257    pub fn user_id(mut self, id: u32) -> Self {
258        self.user_id = Some(id);
259
260        self
261    }
262
263    /// The to-be-created connection will be a peer-to-peer connection.
264    ///
265    /// This method is only available when the `p2p` feature is enabled.
266    #[cfg(feature = "p2p")]
267    pub fn p2p(mut self) -> Self {
268        self.p2p = true;
269
270        self
271    }
272
273    /// The to-be-created connection will be a server using the given GUID.
274    ///
275    /// The to-be-created connection will wait for incoming client authentication handshake and
276    /// negotiation messages, for peer-to-peer communications after successful creation.
277    ///
278    /// This method is only available when the `p2p` feature is enabled.
279    ///
280    /// **NOTE:** This method is redundant when using [`Builder::authenticated_socket`] since the
281    /// latter already sets the GUID for the connection and zbus doesn't differentiate between a
282    /// server and a client connection, except for authentication.
283    #[cfg(feature = "p2p")]
284    pub fn server<G>(mut self, guid: G) -> Result<Self>
285    where
286        G: TryInto<Guid<'a>>,
287        G::Error: Into<Error>,
288    {
289        self.guid = Some(guid.try_into().map_err(Into::into)?);
290
291        Ok(self)
292    }
293
294    /// Set the capacity of the main (unfiltered) queue.
295    ///
296    /// Since typically you'd want to set this at instantiation time, you can set it through the
297    /// builder.
298    ///
299    /// # Example
300    ///
301    /// ```
302    /// # use std::error::Error;
303    /// # use zbus::connection::Builder;
304    /// # use zbus::block_on;
305    /// #
306    /// # block_on(async {
307    /// let conn = Builder::session()?
308    ///     .max_queued(30)
309    ///     .build()
310    ///     .await?;
311    /// assert_eq!(conn.max_queued(), 30);
312    ///
313    /// #     Ok::<(), zbus::Error>(())
314    /// # }).unwrap();
315    /// #
316    /// // Do something useful with `conn`..
317    /// # Ok::<_, Box<dyn Error + Send + Sync>>(())
318    /// ```
319    pub fn max_queued(mut self, max: usize) -> Self {
320        self.max_queued = Some(max);
321
322        self
323    }
324
325    /// Enable or disable the internal executor thread.
326    ///
327    /// The thread is enabled by default.
328    ///
329    /// See [Connection::executor] for more details.
330    pub fn internal_executor(mut self, enabled: bool) -> Self {
331        self.internal_executor = enabled;
332
333        self
334    }
335
336    /// Register a D-Bus [`Interface`] to be served at a given path.
337    ///
338    /// This is similar to [`zbus::ObjectServer::at`], except that it allows you to have your
339    /// interfaces available immediately after the connection is established. Typically, this is
340    /// exactly what you'd want. Also in contrast to [`zbus::ObjectServer::at`], this method will
341    /// replace any previously added interface with the same name at the same path.
342    ///
343    /// Standard interfaces (Peer, Introspectable, Properties) are added on your behalf. If you
344    /// attempt to add yours, [`Builder::build()`] will fail.
345    pub fn serve_at<P, I>(mut self, path: P, iface: I) -> Result<Self>
346    where
347        I: Interface,
348        P: TryInto<ObjectPath<'a>>,
349        P::Error: Into<Error>,
350    {
351        let path = path.try_into().map_err(Into::into)?;
352        let entry = self.interfaces.entry(path).or_default();
353        entry.insert(I::name(), ArcInterface::new(iface));
354        Ok(self)
355    }
356
357    /// Register a well-known name for this connection on the bus.
358    ///
359    /// This is similar to [`zbus::Connection::request_name`], except the name is requested as part
360    /// of the connection setup ([`Builder::build`]), immediately after interfaces
361    /// registered (through [`Builder::serve_at`]) are advertised. Typically this is
362    /// exactly what you want.
363    ///
364    /// The methods [`Builder::allow_name_replacements`] and [`Builder::replace_existing_names`]
365    /// allow to set the [`zbus::fdo::RequestNameFlags`] used to request the name.
366    pub fn name<W>(mut self, well_known_name: W) -> Result<Self>
367    where
368        W: TryInto<WellKnownName<'a>>,
369        W::Error: Into<Error>,
370    {
371        let well_known_name = well_known_name.try_into().map_err(Into::into)?;
372        self.names.insert(well_known_name);
373
374        Ok(self)
375    }
376
377    /// Whether the [`zbus::fdo::RequestNameFlags::AllowReplacement`] flag will be set when
378    /// requesting names.
379    pub fn allow_name_replacements(mut self, allow_replacement: bool) -> Self {
380        self.request_name_flags
381            .set(RequestNameFlags::AllowReplacement, allow_replacement);
382        self
383    }
384
385    /// Whether the [`zbus::fdo::RequestNameFlags::ReplaceExisting`] flag will be set when
386    /// requesting names.
387    pub fn replace_existing_names(mut self, replace_existing: bool) -> Self {
388        self.request_name_flags
389            .set(RequestNameFlags::ReplaceExisting, replace_existing);
390        self
391    }
392
393    /// Set the unique name of the connection.
394    ///
395    /// This is mainly provided for bus implementations. All other users should not need to use this
396    /// method. Hence why this method is only available when the `bus-impl` feature is enabled.
397    ///
398    /// # Panics
399    ///
400    /// It will panic if the connection is to a message bus as it's the bus that assigns
401    /// peers their unique names.
402    #[cfg(feature = "bus-impl")]
403    pub fn unique_name<U>(mut self, unique_name: U) -> Result<Self>
404    where
405        U: TryInto<crate::names::UniqueName<'a>>,
406        U::Error: Into<Error>,
407    {
408        if !self.p2p {
409            panic!("unique name can only be set for peer-to-peer connections");
410        }
411        let name = unique_name.try_into().map_err(Into::into)?;
412        self.unique_name = Some(name);
413
414        Ok(self)
415    }
416
417    /// Set a timeout for method calls.
418    ///
419    /// Method calls will return
420    /// `zbus::Error::InputOutput(std::io::Error(kind: ErrorKind::TimedOut))` if a client does not
421    /// receive an answer from a service in time.
422    pub fn method_timeout(mut self, timeout: std::time::Duration) -> Self {
423        self.method_timeout = Some(timeout);
424
425        self
426    }
427
428    /// Build the connection, consuming the builder.
429    ///
430    /// # Errors
431    ///
432    /// Until server-side bus connection is supported, attempting to build such a connection will
433    /// result in a [`Error::Unsupported`] error.
434    pub async fn build(self) -> Result<Connection> {
435        let executor = Executor::new();
436        #[cfg(not(feature = "tokio"))]
437        let internal_executor = self.internal_executor;
438        // Box the future as it's large and can cause stack overflow.
439        let conn = Box::pin(executor.run(self.build_(executor.clone()))).await?;
440
441        #[cfg(not(feature = "tokio"))]
442        start_internal_executor(&executor, internal_executor)?;
443
444        Ok(conn)
445    }
446
447    async fn build_(mut self, executor: Executor<'static>) -> Result<Connection> {
448        #[cfg(feature = "p2p")]
449        let is_bus_conn = !self.p2p;
450        #[cfg(not(feature = "p2p"))]
451        let is_bus_conn = true;
452
453        let mut auth = self.connect(is_bus_conn).await?;
454
455        // SAFETY: `Authenticated` is always built with these fields set to `Some`.
456        let socket_read = auth.socket_read.take().unwrap();
457        let already_received_bytes = auth.already_received_bytes.drain(..).collect();
458        #[cfg(unix)]
459        let already_received_fds = auth.already_received_fds.drain(..).collect();
460
461        let mut conn = Connection::new(auth, is_bus_conn, executor, self.method_timeout).await?;
462        conn.set_max_queued(self.max_queued.unwrap_or(DEFAULT_MAX_QUEUED));
463
464        if !self.interfaces.is_empty() {
465            let object_server = conn.ensure_object_server(false);
466            for (path, interfaces) in self.interfaces {
467                for (name, iface) in interfaces {
468                    let added = object_server
469                        .add_arc_interface(path.clone(), name.clone(), iface.clone())
470                        .await?;
471                    if !added {
472                        return Err(Error::InterfaceExists(name.clone(), path.to_owned()));
473                    }
474                }
475            }
476
477            let started_event = Event::new();
478            let listener = started_event.listen();
479            conn.start_object_server(Some(started_event));
480
481            listener.await;
482        }
483
484        // Start the socket reader task.
485        conn.init_socket_reader(
486            socket_read,
487            already_received_bytes,
488            #[cfg(unix)]
489            already_received_fds,
490        );
491
492        for name in self.names {
493            conn.request_name_with_flags(name, self.request_name_flags)
494                .await?;
495        }
496
497        Ok(conn)
498    }
499
500    fn new(target: Target) -> Self {
501        Self {
502            target: Some(target),
503            #[cfg(feature = "p2p")]
504            p2p: false,
505            max_queued: None,
506            guid: None,
507            internal_executor: true,
508            interfaces: HashMap::new(),
509            names: HashSet::new(),
510            auth_mechanism: None,
511            #[cfg(feature = "bus-impl")]
512            unique_name: None,
513            request_name_flags: BitFlags::default(),
514            method_timeout: None,
515            user_id: None,
516        }
517    }
518
519    async fn connect(&mut self, is_bus_conn: bool) -> Result<Authenticated> {
520        #[cfg(not(feature = "bus-impl"))]
521        let unique_name = None;
522        #[cfg(feature = "bus-impl")]
523        let unique_name = self.unique_name.take().map(Into::into);
524
525        #[allow(unused_mut)]
526        let (mut stream, server_guid, authenticated) = self.target_connect().await?;
527        if authenticated {
528            let (socket_read, socket_write) = stream.take();
529            Ok(Authenticated {
530                #[cfg(unix)]
531                cap_unix_fd: socket_read.can_pass_unix_fd(),
532                socket_read: Some(socket_read),
533                socket_write,
534                // SAFETY: `server_guid` is provided as arg of `Builder::authenticated_socket`.
535                server_guid: server_guid.unwrap(),
536                already_received_bytes: vec![],
537                unique_name,
538                #[cfg(unix)]
539                already_received_fds: vec![],
540            })
541        } else {
542            #[cfg(feature = "p2p")]
543            match self.guid.take() {
544                None => {
545                    // SASL Handshake
546                    Authenticated::client(
547                        stream,
548                        server_guid,
549                        self.auth_mechanism,
550                        is_bus_conn,
551                        self.user_id,
552                    )
553                    .await
554                }
555                Some(guid) => {
556                    if !self.p2p {
557                        return Err(Error::Unsupported);
558                    }
559
560                    let creds = stream.read_mut().peer_credentials().await?;
561                    #[cfg(unix)]
562                    let client_uid = self.user_id.or_else(|| creds.unix_user_id());
563                    #[cfg(windows)]
564                    let client_sid = creds.into_windows_sid();
565
566                    Authenticated::server(
567                        stream,
568                        guid.to_owned().into(),
569                        #[cfg(unix)]
570                        client_uid,
571                        #[cfg(windows)]
572                        client_sid,
573                        self.auth_mechanism,
574                        unique_name,
575                    )
576                    .await
577                }
578            }
579
580            #[cfg(not(feature = "p2p"))]
581            Authenticated::client(
582                stream,
583                server_guid,
584                self.auth_mechanism,
585                is_bus_conn,
586                self.user_id,
587            )
588            .await
589        }
590    }
591
592    async fn target_connect(&mut self) -> Result<(BoxedSplit, Option<OwnedGuid>, bool)> {
593        let mut authenticated = false;
594        let mut guid = None;
595        // SAFETY: `self.target` is always `Some` from the beginning and this method is only called
596        // once.
597        let split = match self.target.take().unwrap() {
598            #[cfg(not(feature = "tokio"))]
599            Target::UnixStream(stream) => Async::new(stream)?.into(),
600            #[cfg(all(unix, feature = "tokio"))]
601            Target::UnixStream(stream) => stream.into(),
602            #[cfg(not(feature = "tokio"))]
603            Target::TcpStream(stream) => Async::new(stream)?.into(),
604            #[cfg(feature = "tokio")]
605            Target::TcpStream(stream) => stream.into(),
606            #[cfg(all(feature = "vsock", not(feature = "tokio")))]
607            Target::VsockStream(stream) => Async::new(stream)?.into(),
608            #[cfg(feature = "tokio-vsock")]
609            Target::VsockStream(stream) => stream.into(),
610            Target::Address(address) => {
611                guid = address.guid().map(|g| g.to_owned().into());
612                match address.connect().await? {
613                    #[cfg(any(unix, not(feature = "tokio")))]
614                    address::transport::Stream::Unix(stream) => stream.into(),
615                    #[cfg(unix)]
616                    address::transport::Stream::Unixexec(stream) => stream.into(),
617                    address::transport::Stream::Tcp(stream) => stream.into(),
618                    #[cfg(any(
619                        all(feature = "vsock", not(feature = "tokio")),
620                        feature = "tokio-vsock"
621                    ))]
622                    address::transport::Stream::Vsock(stream) => stream.into(),
623                }
624            }
625            Target::Socket(stream) => stream,
626            Target::AuthenticatedSocket(stream) => {
627                authenticated = true;
628                guid = self.guid.take().map(Into::into);
629                stream
630            }
631        };
632
633        Ok((split, guid, authenticated))
634    }
635}
636
637/// Start the internal executor thread.
638///
639/// Returns a dummy task that keep the executor ticking thread from exiting due to absence of any
640/// tasks until socket reader task kicks in.
641#[cfg(not(feature = "tokio"))]
642fn start_internal_executor(executor: &Executor<'static>, internal_executor: bool) -> Result<()> {
643    if internal_executor {
644        let executor = executor.clone();
645        std::thread::Builder::new()
646            .name("zbus::Connection executor".into())
647            .spawn(move || {
648                crate::utils::block_on(async move {
649                    // Run as long as there is a task to run.
650                    while !executor.is_empty() {
651                        executor.tick().await;
652                    }
653                })
654            })?;
655    }
656
657    Ok(())
658}