zbus/connection/builder.rs
1#[cfg(not(feature = "tokio"))]
2use async_io::Async;
3use enumflags2::BitFlags;
4use event_listener::Event;
5#[cfg(not(feature = "tokio"))]
6use std::net::TcpStream;
7#[cfg(all(unix, not(feature = "tokio")))]
8use std::os::unix::net::UnixStream;
9use std::{
10 collections::{HashMap, HashSet},
11 vec,
12};
13#[cfg(feature = "tokio")]
14use tokio::net::TcpStream;
15#[cfg(all(unix, feature = "tokio"))]
16use tokio::net::UnixStream;
17#[cfg(feature = "tokio-vsock")]
18use tokio_vsock::VsockStream;
19#[cfg(all(windows, not(feature = "tokio")))]
20use uds_windows::UnixStream;
21#[cfg(all(feature = "vsock", not(feature = "tokio")))]
22use vsock::VsockStream;
23
24use zvariant::ObjectPath;
25
26use crate::{
27 Connection, Error, Executor, Guid, OwnedGuid, Result,
28 address::{self, Address},
29 fdo::RequestNameFlags,
30 names::{InterfaceName, WellKnownName},
31 object_server::{ArcInterface, Interface},
32};
33
34use super::{
35 handshake::{AuthMechanism, Authenticated},
36 socket::{BoxedSplit, ReadHalf, Split, WriteHalf},
37};
38
39const DEFAULT_MAX_QUEUED: usize = 64;
40
41#[derive(Debug)]
42enum Target {
43 #[cfg(any(unix, not(feature = "tokio")))]
44 UnixStream(UnixStream),
45 TcpStream(TcpStream),
46 #[cfg(any(
47 all(feature = "vsock", not(feature = "tokio")),
48 feature = "tokio-vsock"
49 ))]
50 VsockStream(VsockStream),
51 Address(Address),
52 Socket(Split<Box<dyn ReadHalf>, Box<dyn WriteHalf>>),
53 AuthenticatedSocket(Split<Box<dyn ReadHalf>, Box<dyn WriteHalf>>),
54}
55
56type Interfaces<'a> = HashMap<ObjectPath<'a>, HashMap<InterfaceName<'static>, ArcInterface>>;
57
58/// A builder for [`zbus::Connection`].
59///
60/// The builder allows setting the flags [`RequestNameFlags::AllowReplacement`] and
61/// [`RequestNameFlags::ReplaceExisting`] when requesting names, but the flag
62/// [`RequestNameFlags::DoNotQueue`] will always be enabled. The reasons are:
63///
64/// 1. There is no indication given to the caller of [`Self::build`] that the name(s) request was
65/// enqueued and that the requested name might not be available right after building.
66///
67/// 2. The name may be acquired in between the time the name is requested and the
68/// [`crate::fdo::NameAcquiredStream`] is constructed. As a result the service can miss the
69/// [`crate::fdo::NameAcquired`] signal.
70#[derive(Debug)]
71#[must_use]
72pub struct Builder<'a> {
73 target: Option<Target>,
74 max_queued: Option<usize>,
75 // This is only set for p2p server case or pre-authenticated sockets.
76 guid: Option<Guid<'a>>,
77 #[cfg(feature = "p2p")]
78 p2p: bool,
79 internal_executor: bool,
80 interfaces: Interfaces<'a>,
81 names: HashSet<WellKnownName<'a>>,
82 auth_mechanism: Option<AuthMechanism>,
83 #[cfg(feature = "bus-impl")]
84 unique_name: Option<crate::names::UniqueName<'a>>,
85 request_name_flags: BitFlags<RequestNameFlags>,
86 method_timeout: Option<std::time::Duration>,
87 user_id: Option<u32>,
88}
89
90impl<'a> Builder<'a> {
91 /// Create a builder for the session/user message bus connection.
92 pub fn session() -> Result<Self> {
93 Ok(Self::new(Target::Address(Address::session()?)))
94 }
95
96 /// Create a builder for the system-wide message bus connection.
97 pub fn system() -> Result<Self> {
98 Ok(Self::new(Target::Address(Address::system()?)))
99 }
100
101 /// Create a builder for an IBus connection.
102 ///
103 /// IBus (Intelligent Input Bus) is an input method framework. This method creates a builder
104 /// that will query the IBus daemon for its D-Bus address using the `ibus address` command.
105 ///
106 /// # Platform Support
107 ///
108 /// This method is available on Unix-like systems where IBus is installed.
109 ///
110 /// # Errors
111 ///
112 /// Returns an error if:
113 /// - The `ibus` command is not found or fails to execute
114 /// - The IBus daemon is not running
115 /// - The command output cannot be parsed as a valid D-Bus address
116 ///
117 /// # Example
118 ///
119 /// ```no_run
120 /// # use std::error::Error;
121 /// # use zbus::connection::Builder;
122 /// # use zbus::block_on;
123 /// #
124 /// # block_on(async {
125 /// let conn = Builder::ibus()?
126 /// .build()
127 /// .await?;
128 ///
129 /// // Use the connection to interact with IBus services
130 /// # drop(conn);
131 /// # Ok::<(), zbus::Error>(())
132 /// # }).unwrap();
133 /// #
134 /// # Ok::<_, Box<dyn Error + Send + Sync>>(())
135 /// ```
136 #[cfg(unix)]
137 pub fn ibus() -> Result<Self> {
138 use crate::address::transport::{Ibus, Transport};
139 Ok(Self::new(Target::Address(Address::from(Transport::Ibus(
140 Ibus::new(),
141 )))))
142 }
143
144 /// Create a builder for a connection that will use the given [D-Bus bus address].
145 ///
146 /// # Example
147 ///
148 /// Here is an example of connecting to an IBus service:
149 ///
150 /// ```no_run
151 /// # use std::error::Error;
152 /// # use zbus::connection::Builder;
153 /// # use zbus::block_on;
154 /// #
155 /// # block_on(async {
156 /// let addr = "unix:\
157 /// path=/home/zeenix/.cache/ibus/dbus-ET0Xzrk9,\
158 /// guid=fdd08e811a6c7ebe1fef0d9e647230da";
159 /// let conn = Builder::address(addr)?
160 /// .build()
161 /// .await?;
162 ///
163 /// // Do something useful with `conn`..
164 /// # drop(conn);
165 /// # Ok::<(), zbus::Error>(())
166 /// # }).unwrap();
167 /// #
168 /// # Ok::<_, Box<dyn Error + Send + Sync>>(())
169 /// ```
170 ///
171 /// **Note:** The IBus address is different for each session. You can find the address for your
172 /// current session using `ibus address` command. For a more convenient way to connect to IBus,
173 /// see [`Builder::ibus`].
174 ///
175 /// [D-Bus bus address]: https://dbus.freedesktop.org/doc/dbus-specification.html#addresses
176 pub fn address<A>(address: A) -> Result<Self>
177 where
178 A: TryInto<Address>,
179 A::Error: Into<Error>,
180 {
181 Ok(Self::new(Target::Address(
182 address.try_into().map_err(Into::into)?,
183 )))
184 }
185
186 /// Create a builder for a connection that will use the given unix stream.
187 ///
188 /// If the default `async-io` feature is disabled, this method will expect a
189 /// [`tokio::net::UnixStream`](https://docs.rs/tokio/latest/tokio/net/struct.UnixStream.html)
190 /// argument.
191 ///
192 /// Since tokio currently [does not support Unix domain sockets][tuds] on Windows, this method
193 /// is not available when the `tokio` feature is enabled and building for Windows target.
194 ///
195 /// [tuds]: https://github.com/tokio-rs/tokio/issues/2201
196 #[cfg(any(unix, not(feature = "tokio")))]
197 pub fn unix_stream(stream: UnixStream) -> Self {
198 Self::new(Target::UnixStream(stream))
199 }
200
201 /// Create a builder for a connection that will use the given TCP stream.
202 ///
203 /// If the default `async-io` feature is disabled, this method will expect a
204 /// [`tokio::net::TcpStream`](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html)
205 /// argument.
206 pub fn tcp_stream(stream: TcpStream) -> Self {
207 Self::new(Target::TcpStream(stream))
208 }
209
210 /// Create a builder for a connection that will use the given VSOCK stream.
211 ///
212 /// This method is only available when either `vsock` or `tokio-vsock` feature is enabled. The
213 /// type of `stream` is `vsock::VsockStream` with `vsock` feature and `tokio_vsock::VsockStream`
214 /// with `tokio-vsock` feature.
215 #[cfg(any(
216 all(feature = "vsock", not(feature = "tokio")),
217 feature = "tokio-vsock"
218 ))]
219 pub fn vsock_stream(stream: VsockStream) -> Self {
220 Self::new(Target::VsockStream(stream))
221 }
222
223 /// Create a builder for a connection that will use the given socket.
224 pub fn socket<S: Into<BoxedSplit>>(socket: S) -> Self {
225 Self::new(Target::Socket(socket.into()))
226 }
227
228 /// Create a builder for a connection that will use the given pre-authenticated socket.
229 ///
230 /// This is similar to [`Builder::socket`], except that the socket is either already
231 /// authenticated or does not require authentication.
232 pub fn authenticated_socket<S, G>(socket: S, guid: G) -> Result<Self>
233 where
234 S: Into<BoxedSplit>,
235 G: TryInto<Guid<'a>>,
236 G::Error: Into<Error>,
237 {
238 let mut builder = Self::new(Target::AuthenticatedSocket(socket.into()));
239 builder.guid = Some(guid.try_into().map_err(Into::into)?);
240
241 Ok(builder)
242 }
243
244 /// Specify the mechanism to use during authentication.
245 pub fn auth_mechanism(mut self, auth_mechanism: AuthMechanism) -> Self {
246 self.auth_mechanism = Some(auth_mechanism);
247
248 self
249 }
250
251 /// Specify the user id during authentication.
252 ///
253 /// This can be useful when using [`AuthMechanism::External`] with `socat`
254 /// to avoid the host decide what uid to use and instead provide one
255 /// known to have access rights.
256 #[cfg(unix)]
257 pub fn user_id(mut self, id: u32) -> Self {
258 self.user_id = Some(id);
259
260 self
261 }
262
263 /// The to-be-created connection will be a peer-to-peer connection.
264 ///
265 /// This method is only available when the `p2p` feature is enabled.
266 #[cfg(feature = "p2p")]
267 pub fn p2p(mut self) -> Self {
268 self.p2p = true;
269
270 self
271 }
272
273 /// The to-be-created connection will be a server using the given GUID.
274 ///
275 /// The to-be-created connection will wait for incoming client authentication handshake and
276 /// negotiation messages, for peer-to-peer communications after successful creation.
277 ///
278 /// This method is only available when the `p2p` feature is enabled.
279 ///
280 /// **NOTE:** This method is redundant when using [`Builder::authenticated_socket`] since the
281 /// latter already sets the GUID for the connection and zbus doesn't differentiate between a
282 /// server and a client connection, except for authentication.
283 #[cfg(feature = "p2p")]
284 pub fn server<G>(mut self, guid: G) -> Result<Self>
285 where
286 G: TryInto<Guid<'a>>,
287 G::Error: Into<Error>,
288 {
289 self.guid = Some(guid.try_into().map_err(Into::into)?);
290
291 Ok(self)
292 }
293
294 /// Set the capacity of the main (unfiltered) queue.
295 ///
296 /// Since typically you'd want to set this at instantiation time, you can set it through the
297 /// builder.
298 ///
299 /// # Example
300 ///
301 /// ```
302 /// # use std::error::Error;
303 /// # use zbus::connection::Builder;
304 /// # use zbus::block_on;
305 /// #
306 /// # block_on(async {
307 /// let conn = Builder::session()?
308 /// .max_queued(30)
309 /// .build()
310 /// .await?;
311 /// assert_eq!(conn.max_queued(), 30);
312 ///
313 /// # Ok::<(), zbus::Error>(())
314 /// # }).unwrap();
315 /// #
316 /// // Do something useful with `conn`..
317 /// # Ok::<_, Box<dyn Error + Send + Sync>>(())
318 /// ```
319 pub fn max_queued(mut self, max: usize) -> Self {
320 self.max_queued = Some(max);
321
322 self
323 }
324
325 /// Enable or disable the internal executor thread.
326 ///
327 /// The thread is enabled by default.
328 ///
329 /// See [Connection::executor] for more details.
330 pub fn internal_executor(mut self, enabled: bool) -> Self {
331 self.internal_executor = enabled;
332
333 self
334 }
335
336 /// Register a D-Bus [`Interface`] to be served at a given path.
337 ///
338 /// This is similar to [`zbus::ObjectServer::at`], except that it allows you to have your
339 /// interfaces available immediately after the connection is established. Typically, this is
340 /// exactly what you'd want. Also in contrast to [`zbus::ObjectServer::at`], this method will
341 /// replace any previously added interface with the same name at the same path.
342 ///
343 /// Standard interfaces (Peer, Introspectable, Properties) are added on your behalf. If you
344 /// attempt to add yours, [`Builder::build()`] will fail.
345 pub fn serve_at<P, I>(mut self, path: P, iface: I) -> Result<Self>
346 where
347 I: Interface,
348 P: TryInto<ObjectPath<'a>>,
349 P::Error: Into<Error>,
350 {
351 let path = path.try_into().map_err(Into::into)?;
352 let entry = self.interfaces.entry(path).or_default();
353 entry.insert(I::name(), ArcInterface::new(iface));
354 Ok(self)
355 }
356
357 /// Register a well-known name for this connection on the bus.
358 ///
359 /// This is similar to [`zbus::Connection::request_name`], except the name is requested as part
360 /// of the connection setup ([`Builder::build`]), immediately after interfaces
361 /// registered (through [`Builder::serve_at`]) are advertised. Typically this is
362 /// exactly what you want.
363 ///
364 /// The methods [`Builder::allow_name_replacements`] and [`Builder::replace_existing_names`]
365 /// allow to set the [`zbus::fdo::RequestNameFlags`] used to request the name.
366 pub fn name<W>(mut self, well_known_name: W) -> Result<Self>
367 where
368 W: TryInto<WellKnownName<'a>>,
369 W::Error: Into<Error>,
370 {
371 let well_known_name = well_known_name.try_into().map_err(Into::into)?;
372 self.names.insert(well_known_name);
373
374 Ok(self)
375 }
376
377 /// Whether the [`zbus::fdo::RequestNameFlags::AllowReplacement`] flag will be set when
378 /// requesting names.
379 pub fn allow_name_replacements(mut self, allow_replacement: bool) -> Self {
380 self.request_name_flags
381 .set(RequestNameFlags::AllowReplacement, allow_replacement);
382 self
383 }
384
385 /// Whether the [`zbus::fdo::RequestNameFlags::ReplaceExisting`] flag will be set when
386 /// requesting names.
387 pub fn replace_existing_names(mut self, replace_existing: bool) -> Self {
388 self.request_name_flags
389 .set(RequestNameFlags::ReplaceExisting, replace_existing);
390 self
391 }
392
393 /// Set the unique name of the connection.
394 ///
395 /// This is mainly provided for bus implementations. All other users should not need to use this
396 /// method. Hence why this method is only available when the `bus-impl` feature is enabled.
397 ///
398 /// # Panics
399 ///
400 /// It will panic if the connection is to a message bus as it's the bus that assigns
401 /// peers their unique names.
402 #[cfg(feature = "bus-impl")]
403 pub fn unique_name<U>(mut self, unique_name: U) -> Result<Self>
404 where
405 U: TryInto<crate::names::UniqueName<'a>>,
406 U::Error: Into<Error>,
407 {
408 if !self.p2p {
409 panic!("unique name can only be set for peer-to-peer connections");
410 }
411 let name = unique_name.try_into().map_err(Into::into)?;
412 self.unique_name = Some(name);
413
414 Ok(self)
415 }
416
417 /// Set a timeout for method calls.
418 ///
419 /// Method calls will return
420 /// `zbus::Error::InputOutput(std::io::Error(kind: ErrorKind::TimedOut))` if a client does not
421 /// receive an answer from a service in time.
422 pub fn method_timeout(mut self, timeout: std::time::Duration) -> Self {
423 self.method_timeout = Some(timeout);
424
425 self
426 }
427
428 /// Build the connection, consuming the builder.
429 ///
430 /// # Errors
431 ///
432 /// Until server-side bus connection is supported, attempting to build such a connection will
433 /// result in a [`Error::Unsupported`] error.
434 pub async fn build(self) -> Result<Connection> {
435 let executor = Executor::new();
436 #[cfg(not(feature = "tokio"))]
437 let internal_executor = self.internal_executor;
438 // Box the future as it's large and can cause stack overflow.
439 let conn = Box::pin(executor.run(self.build_(executor.clone()))).await?;
440
441 #[cfg(not(feature = "tokio"))]
442 start_internal_executor(&executor, internal_executor)?;
443
444 Ok(conn)
445 }
446
447 async fn build_(mut self, executor: Executor<'static>) -> Result<Connection> {
448 #[cfg(feature = "p2p")]
449 let is_bus_conn = !self.p2p;
450 #[cfg(not(feature = "p2p"))]
451 let is_bus_conn = true;
452
453 let mut auth = self.connect(is_bus_conn).await?;
454
455 // SAFETY: `Authenticated` is always built with these fields set to `Some`.
456 let socket_read = auth.socket_read.take().unwrap();
457 let already_received_bytes = auth.already_received_bytes.drain(..).collect();
458 #[cfg(unix)]
459 let already_received_fds = auth.already_received_fds.drain(..).collect();
460
461 let mut conn = Connection::new(auth, is_bus_conn, executor, self.method_timeout).await?;
462 conn.set_max_queued(self.max_queued.unwrap_or(DEFAULT_MAX_QUEUED));
463
464 if !self.interfaces.is_empty() {
465 let object_server = conn.ensure_object_server(false);
466 for (path, interfaces) in self.interfaces {
467 for (name, iface) in interfaces {
468 let added = object_server
469 .add_arc_interface(path.clone(), name.clone(), iface.clone())
470 .await?;
471 if !added {
472 return Err(Error::InterfaceExists(name.clone(), path.to_owned()));
473 }
474 }
475 }
476
477 let started_event = Event::new();
478 let listener = started_event.listen();
479 conn.start_object_server(Some(started_event));
480
481 listener.await;
482 }
483
484 // Start the socket reader task.
485 conn.init_socket_reader(
486 socket_read,
487 already_received_bytes,
488 #[cfg(unix)]
489 already_received_fds,
490 );
491
492 for name in self.names {
493 conn.request_name_with_flags(name, self.request_name_flags)
494 .await?;
495 }
496
497 Ok(conn)
498 }
499
500 fn new(target: Target) -> Self {
501 Self {
502 target: Some(target),
503 #[cfg(feature = "p2p")]
504 p2p: false,
505 max_queued: None,
506 guid: None,
507 internal_executor: true,
508 interfaces: HashMap::new(),
509 names: HashSet::new(),
510 auth_mechanism: None,
511 #[cfg(feature = "bus-impl")]
512 unique_name: None,
513 request_name_flags: BitFlags::default(),
514 method_timeout: None,
515 user_id: None,
516 }
517 }
518
519 async fn connect(&mut self, is_bus_conn: bool) -> Result<Authenticated> {
520 #[cfg(not(feature = "bus-impl"))]
521 let unique_name = None;
522 #[cfg(feature = "bus-impl")]
523 let unique_name = self.unique_name.take().map(Into::into);
524
525 #[allow(unused_mut)]
526 let (mut stream, server_guid, authenticated) = self.target_connect().await?;
527 if authenticated {
528 let (socket_read, socket_write) = stream.take();
529 Ok(Authenticated {
530 #[cfg(unix)]
531 cap_unix_fd: socket_read.can_pass_unix_fd(),
532 socket_read: Some(socket_read),
533 socket_write,
534 // SAFETY: `server_guid` is provided as arg of `Builder::authenticated_socket`.
535 server_guid: server_guid.unwrap(),
536 already_received_bytes: vec![],
537 unique_name,
538 #[cfg(unix)]
539 already_received_fds: vec![],
540 })
541 } else {
542 #[cfg(feature = "p2p")]
543 match self.guid.take() {
544 None => {
545 // SASL Handshake
546 Authenticated::client(
547 stream,
548 server_guid,
549 self.auth_mechanism,
550 is_bus_conn,
551 self.user_id,
552 )
553 .await
554 }
555 Some(guid) => {
556 if !self.p2p {
557 return Err(Error::Unsupported);
558 }
559
560 let creds = stream.read_mut().peer_credentials().await?;
561 #[cfg(unix)]
562 let client_uid = self.user_id.or_else(|| creds.unix_user_id());
563 #[cfg(windows)]
564 let client_sid = creds.into_windows_sid();
565
566 Authenticated::server(
567 stream,
568 guid.to_owned().into(),
569 #[cfg(unix)]
570 client_uid,
571 #[cfg(windows)]
572 client_sid,
573 self.auth_mechanism,
574 unique_name,
575 )
576 .await
577 }
578 }
579
580 #[cfg(not(feature = "p2p"))]
581 Authenticated::client(
582 stream,
583 server_guid,
584 self.auth_mechanism,
585 is_bus_conn,
586 self.user_id,
587 )
588 .await
589 }
590 }
591
592 async fn target_connect(&mut self) -> Result<(BoxedSplit, Option<OwnedGuid>, bool)> {
593 let mut authenticated = false;
594 let mut guid = None;
595 // SAFETY: `self.target` is always `Some` from the beginning and this method is only called
596 // once.
597 let split = match self.target.take().unwrap() {
598 #[cfg(not(feature = "tokio"))]
599 Target::UnixStream(stream) => Async::new(stream)?.into(),
600 #[cfg(all(unix, feature = "tokio"))]
601 Target::UnixStream(stream) => stream.into(),
602 #[cfg(not(feature = "tokio"))]
603 Target::TcpStream(stream) => Async::new(stream)?.into(),
604 #[cfg(feature = "tokio")]
605 Target::TcpStream(stream) => stream.into(),
606 #[cfg(all(feature = "vsock", not(feature = "tokio")))]
607 Target::VsockStream(stream) => Async::new(stream)?.into(),
608 #[cfg(feature = "tokio-vsock")]
609 Target::VsockStream(stream) => stream.into(),
610 Target::Address(address) => {
611 guid = address.guid().map(|g| g.to_owned().into());
612 match address.connect().await? {
613 #[cfg(any(unix, not(feature = "tokio")))]
614 address::transport::Stream::Unix(stream) => stream.into(),
615 #[cfg(unix)]
616 address::transport::Stream::Unixexec(stream) => stream.into(),
617 address::transport::Stream::Tcp(stream) => stream.into(),
618 #[cfg(any(
619 all(feature = "vsock", not(feature = "tokio")),
620 feature = "tokio-vsock"
621 ))]
622 address::transport::Stream::Vsock(stream) => stream.into(),
623 }
624 }
625 Target::Socket(stream) => stream,
626 Target::AuthenticatedSocket(stream) => {
627 authenticated = true;
628 guid = self.guid.take().map(Into::into);
629 stream
630 }
631 };
632
633 Ok((split, guid, authenticated))
634 }
635}
636
637/// Start the internal executor thread.
638///
639/// Returns a dummy task that keep the executor ticking thread from exiting due to absence of any
640/// tasks until socket reader task kicks in.
641#[cfg(not(feature = "tokio"))]
642fn start_internal_executor(executor: &Executor<'static>, internal_executor: bool) -> Result<()> {
643 if internal_executor {
644 let executor = executor.clone();
645 std::thread::Builder::new()
646 .name("zbus::Connection executor".into())
647 .spawn(move || {
648 crate::utils::block_on(async move {
649 // Run as long as there is a task to run.
650 while !executor.is_empty() {
651 executor.tick().await;
652 }
653 })
654 })?;
655 }
656
657 Ok(())
658}