zbus/connection/builder.rs
1use async_broadcast::Receiver as ActiveReceiver;
2#[cfg(not(feature = "tokio"))]
3use async_io::Async;
4use enumflags2::BitFlags;
5use event_listener::Event;
6#[cfg(not(feature = "tokio"))]
7use std::net::TcpStream;
8#[cfg(all(unix, not(feature = "tokio")))]
9use std::os::unix::net::UnixStream;
10use std::{
11 collections::{HashMap, HashSet},
12 vec,
13};
14#[cfg(feature = "tokio")]
15use tokio::net::TcpStream;
16#[cfg(all(unix, feature = "tokio"))]
17use tokio::net::UnixStream;
18#[cfg(feature = "tokio-vsock")]
19use tokio_vsock::VsockStream;
20#[cfg(all(windows, not(feature = "tokio")))]
21use uds_windows::UnixStream;
22#[cfg(all(feature = "vsock", not(feature = "tokio")))]
23use vsock::VsockStream;
24
25use zvariant::ObjectPath;
26
27#[cfg(feature = "bus-impl")]
28use crate::MessageStream;
29use crate::{
30 Connection, Error, Executor, Guid, OwnedGuid, Result,
31 address::{self, Address},
32 fdo::RequestNameFlags,
33 message::Message,
34 names::{InterfaceName, WellKnownName},
35 object_server::{ArcInterface, Interface},
36};
37
38use super::{
39 handshake::{AuthMechanism, Authenticated},
40 socket::{BoxedSplit, ReadHalf, Split, WriteHalf},
41};
42
/// Capacity of the main message queue applied by the builder when [`Builder::max_queued`] was
/// not called.
const DEFAULT_MAX_QUEUED: usize = 64;
44
/// What the builder connects over once it is built.
#[derive(Debug)]
enum Target {
    /// An already-connected Unix domain socket stream.
    #[cfg(any(unix, not(feature = "tokio")))]
    UnixStream(UnixStream),
    /// An already-connected TCP stream.
    TcpStream(TcpStream),
    /// An already-connected VSOCK stream.
    #[cfg(any(
        all(feature = "vsock", not(feature = "tokio")),
        feature = "tokio-vsock"
    ))]
    VsockStream(VsockStream),
    /// A D-Bus address; the connection to it is only established while building.
    Address(Address),
    /// A caller-provided socket that still has to go through authentication.
    Socket(Split<Box<dyn ReadHalf>, Box<dyn WriteHalf>>),
    /// A caller-provided socket for which the authentication handshake is skipped.
    AuthenticatedSocket(Split<Box<dyn ReadHalf>, Box<dyn WriteHalf>>),
}
59
/// Interfaces queued through [`Builder::serve_at`], keyed first by object path and then by
/// interface name.
type Interfaces<'a> = HashMap<ObjectPath<'a>, HashMap<InterfaceName<'static>, ArcInterface>>;
61
/// A builder for [`zbus::Connection`].
///
/// The builder allows setting the flags [`RequestNameFlags::AllowReplacement`] and
/// [`RequestNameFlags::ReplaceExisting`] when requesting names, but the flag
/// [`RequestNameFlags::DoNotQueue`] will always be enabled. The reasons are:
///
/// 1. There is no indication given to the caller of [`Self::build`] that the name(s) request was
///    enqueued and that the requested name might not be available right after building.
///
/// 2. The name may be acquired in between the time the name is requested and the
///    [`crate::fdo::NameAcquiredStream`] is constructed. As a result the service can miss the
///    [`crate::fdo::NameAcquired`] signal.
#[derive(Debug)]
#[must_use]
pub struct Builder<'a> {
    // What to connect over. Starts out as `Some` and is taken exactly once while connecting.
    target: Option<Target>,
    // Capacity of the main message queue; `DEFAULT_MAX_QUEUED` is used when unset.
    max_queued: Option<usize>,
    // This is only set for p2p server case or pre-authenticated sockets.
    guid: Option<Guid<'a>>,
    // Whether the connection is peer-to-peer rather than a connection to a message bus.
    #[cfg(feature = "p2p")]
    p2p: bool,
    // Whether to spawn the internal executor thread (only consulted without the `tokio` feature).
    internal_executor: bool,
    // Interfaces to add to the object server before the socket reader is started.
    interfaces: Interfaces<'a>,
    // Well-known names to request once the connection is up.
    names: HashSet<WellKnownName<'a>>,
    // Authentication mechanism handed to the handshake, if the caller picked one.
    auth_mechanism: Option<AuthMechanism>,
    // Unique name to use for the connection; only settable for peer-to-peer connections.
    #[cfg(feature = "bus-impl")]
    unique_name: Option<crate::names::UniqueName<'a>>,
    // Flags passed along with every name request made while building.
    request_name_flags: BitFlags<RequestNameFlags>,
    // Timeout for method calls, forwarded to the connection.
    method_timeout: Option<std::time::Duration>,
    // User id handed to the authentication handshake, if the caller provided one.
    user_id: Option<u32>,
}
93
94impl<'a> Builder<'a> {
95 /// Create a builder for the session/user message bus connection.
96 pub fn session() -> Result<Self> {
97 Ok(Self::new(Target::Address(Address::session()?)))
98 }
99
100 /// Create a builder for the system-wide message bus connection.
101 pub fn system() -> Result<Self> {
102 Ok(Self::new(Target::Address(Address::system()?)))
103 }
104
105 /// Create a builder for an IBus connection.
106 ///
107 /// IBus (Intelligent Input Bus) is an input method framework. This method creates a builder
108 /// that will query the IBus daemon for its D-Bus address using the `ibus address` command.
109 ///
110 /// # Platform Support
111 ///
112 /// This method is available on Unix-like systems where IBus is installed.
113 ///
114 /// # Errors
115 ///
116 /// Returns an error if:
117 /// - The `ibus` command is not found or fails to execute
118 /// - The IBus daemon is not running
119 /// - The command output cannot be parsed as a valid D-Bus address
120 ///
121 /// # Example
122 ///
123 /// ```no_run
124 /// # use std::error::Error;
125 /// # use zbus::connection::Builder;
126 /// # use zbus::block_on;
127 /// #
128 /// # block_on(async {
129 /// let conn = Builder::ibus()?
130 /// .build()
131 /// .await?;
132 ///
133 /// // Use the connection to interact with IBus services
134 /// # drop(conn);
135 /// # Ok::<(), zbus::Error>(())
136 /// # }).unwrap();
137 /// #
138 /// # Ok::<_, Box<dyn Error + Send + Sync>>(())
139 /// ```
140 #[cfg(unix)]
141 pub fn ibus() -> Result<Self> {
142 use crate::address::transport::{Ibus, Transport};
143 Ok(Self::new(Target::Address(Address::from(Transport::Ibus(
144 Ibus::new(),
145 )))))
146 }
147
148 /// Create a builder for a connection that will use the given [D-Bus bus address].
149 ///
150 /// # Example
151 ///
152 /// Here is an example of connecting to an IBus service:
153 ///
154 /// ```no_run
155 /// # use std::error::Error;
156 /// # use zbus::connection::Builder;
157 /// # use zbus::block_on;
158 /// #
159 /// # block_on(async {
160 /// let addr = "unix:\
161 /// path=/home/zeenix/.cache/ibus/dbus-ET0Xzrk9,\
162 /// guid=fdd08e811a6c7ebe1fef0d9e647230da";
163 /// let conn = Builder::address(addr)?
164 /// .build()
165 /// .await?;
166 ///
167 /// // Do something useful with `conn`..
168 /// # drop(conn);
169 /// # Ok::<(), zbus::Error>(())
170 /// # }).unwrap();
171 /// #
172 /// # Ok::<_, Box<dyn Error + Send + Sync>>(())
173 /// ```
174 ///
175 /// **Note:** The IBus address is different for each session. You can find the address for your
176 /// current session using `ibus address` command. For a more convenient way to connect to IBus,
177 /// see [`Builder::ibus`].
178 ///
179 /// [D-Bus bus address]: https://dbus.freedesktop.org/doc/dbus-specification.html#addresses
180 pub fn address<A>(address: A) -> Result<Self>
181 where
182 A: TryInto<Address>,
183 A::Error: Into<Error>,
184 {
185 Ok(Self::new(Target::Address(
186 address.try_into().map_err(Into::into)?,
187 )))
188 }
189
190 /// Create a builder for a connection that will use the given unix stream.
191 ///
192 /// If the default `async-io` feature is disabled, this method will expect a
193 /// [`tokio::net::UnixStream`](https://docs.rs/tokio/latest/tokio/net/struct.UnixStream.html)
194 /// argument.
195 ///
196 /// Since tokio currently [does not support Unix domain sockets][tuds] on Windows, this method
197 /// is not available when the `tokio` feature is enabled and building for Windows target.
198 ///
199 /// [tuds]: https://github.com/tokio-rs/tokio/issues/2201
200 #[cfg(any(unix, not(feature = "tokio")))]
201 pub fn unix_stream(stream: UnixStream) -> Self {
202 Self::new(Target::UnixStream(stream))
203 }
204
205 /// Create a builder for a connection that will use the given TCP stream.
206 ///
207 /// If the default `async-io` feature is disabled, this method will expect a
208 /// [`tokio::net::TcpStream`](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html)
209 /// argument.
210 pub fn tcp_stream(stream: TcpStream) -> Self {
211 Self::new(Target::TcpStream(stream))
212 }
213
214 /// Create a builder for a connection that will use the given VSOCK stream.
215 ///
216 /// This method is only available when either `vsock` or `tokio-vsock` feature is enabled. The
217 /// type of `stream` is `vsock::VsockStream` with `vsock` feature and `tokio_vsock::VsockStream`
218 /// with `tokio-vsock` feature.
219 #[cfg(any(
220 all(feature = "vsock", not(feature = "tokio")),
221 feature = "tokio-vsock"
222 ))]
223 pub fn vsock_stream(stream: VsockStream) -> Self {
224 Self::new(Target::VsockStream(stream))
225 }
226
227 /// Create a builder for a connection that will use the given socket.
228 pub fn socket<S: Into<BoxedSplit>>(socket: S) -> Self {
229 Self::new(Target::Socket(socket.into()))
230 }
231
232 /// Create a builder for a connection that will use the given pre-authenticated socket.
233 ///
234 /// This is similar to [`Builder::socket`], except that the socket is either already
235 /// authenticated or does not require authentication.
236 pub fn authenticated_socket<S, G>(socket: S, guid: G) -> Result<Self>
237 where
238 S: Into<BoxedSplit>,
239 G: TryInto<Guid<'a>>,
240 G::Error: Into<Error>,
241 {
242 let mut builder = Self::new(Target::AuthenticatedSocket(socket.into()));
243 builder.guid = Some(guid.try_into().map_err(Into::into)?);
244
245 Ok(builder)
246 }
247
248 /// Specify the mechanism to use during authentication.
249 pub fn auth_mechanism(mut self, auth_mechanism: AuthMechanism) -> Self {
250 self.auth_mechanism = Some(auth_mechanism);
251
252 self
253 }
254
255 /// Specify the user id during authentication.
256 ///
257 /// This can be useful when using [`AuthMechanism::External`] with `socat`
258 /// to avoid the host decide what uid to use and instead provide one
259 /// known to have access rights.
260 #[cfg(unix)]
261 pub fn user_id(mut self, id: u32) -> Self {
262 self.user_id = Some(id);
263
264 self
265 }
266
267 /// The to-be-created connection will be a peer-to-peer connection.
268 ///
269 /// This method is only available when the `p2p` feature is enabled.
270 #[cfg(feature = "p2p")]
271 pub fn p2p(mut self) -> Self {
272 self.p2p = true;
273
274 self
275 }
276
277 /// The to-be-created connection will be a server using the given GUID.
278 ///
279 /// The to-be-created connection will wait for incoming client authentication handshake and
280 /// negotiation messages, for peer-to-peer communications after successful creation.
281 ///
282 /// This method is only available when the `p2p` feature is enabled.
283 ///
284 /// **NOTE:** This method is redundant when using [`Builder::authenticated_socket`] since the
285 /// latter already sets the GUID for the connection and zbus doesn't differentiate between a
286 /// server and a client connection, except for authentication.
287 #[cfg(feature = "p2p")]
288 pub fn server<G>(mut self, guid: G) -> Result<Self>
289 where
290 G: TryInto<Guid<'a>>,
291 G::Error: Into<Error>,
292 {
293 self.guid = Some(guid.try_into().map_err(Into::into)?);
294
295 Ok(self)
296 }
297
298 /// Set the capacity of the main (unfiltered) queue.
299 ///
300 /// Since typically you'd want to set this at instantiation time, you can set it through the
301 /// builder.
302 ///
303 /// # Example
304 ///
305 /// ```
306 /// # use std::error::Error;
307 /// # use zbus::connection::Builder;
308 /// # use zbus::block_on;
309 /// #
310 /// # block_on(async {
311 /// let conn = Builder::session()?
312 /// .max_queued(30)
313 /// .build()
314 /// .await?;
315 /// assert_eq!(conn.max_queued(), 30);
316 ///
317 /// # Ok::<(), zbus::Error>(())
318 /// # }).unwrap();
319 /// #
320 /// // Do something useful with `conn`..
321 /// # Ok::<_, Box<dyn Error + Send + Sync>>(())
322 /// ```
323 pub fn max_queued(mut self, max: usize) -> Self {
324 self.max_queued = Some(max);
325
326 self
327 }
328
329 /// Enable or disable the internal executor thread.
330 ///
331 /// The thread is enabled by default.
332 ///
333 /// See [Connection::executor] for more details.
334 pub fn internal_executor(mut self, enabled: bool) -> Self {
335 self.internal_executor = enabled;
336
337 self
338 }
339
340 /// Register a D-Bus [`Interface`] to be served at a given path.
341 ///
342 /// This is similar to [`zbus::ObjectServer::at`], except that it allows you to have your
343 /// interfaces available immediately after the connection is established. Typically, this is
344 /// exactly what you'd want. Also in contrast to [`zbus::ObjectServer::at`], this method will
345 /// replace any previously added interface with the same name at the same path.
346 ///
347 /// Standard interfaces (Peer, Introspectable, Properties) are added on your behalf. If you
348 /// attempt to add yours, [`Builder::build()`] will fail.
349 pub fn serve_at<P, I>(mut self, path: P, iface: I) -> Result<Self>
350 where
351 I: Interface,
352 P: TryInto<ObjectPath<'a>>,
353 P::Error: Into<Error>,
354 {
355 let path = path.try_into().map_err(Into::into)?;
356 let entry = self.interfaces.entry(path).or_default();
357 entry.insert(I::name(), ArcInterface::new(iface));
358 Ok(self)
359 }
360
361 /// Register a well-known name for this connection on the bus.
362 ///
363 /// This is similar to [`zbus::Connection::request_name`], except the name is requested as part
364 /// of the connection setup ([`Builder::build`]), immediately after interfaces
365 /// registered (through [`Builder::serve_at`]) are advertised. Typically this is
366 /// exactly what you want.
367 ///
368 /// The methods [`Builder::allow_name_replacements`] and [`Builder::replace_existing_names`]
369 /// allow to set the [`zbus::fdo::RequestNameFlags`] used to request the name.
370 pub fn name<W>(mut self, well_known_name: W) -> Result<Self>
371 where
372 W: TryInto<WellKnownName<'a>>,
373 W::Error: Into<Error>,
374 {
375 let well_known_name = well_known_name.try_into().map_err(Into::into)?;
376 self.names.insert(well_known_name);
377
378 Ok(self)
379 }
380
381 /// Whether the [`zbus::fdo::RequestNameFlags::AllowReplacement`] flag will be set when
382 /// requesting names.
383 pub fn allow_name_replacements(mut self, allow_replacement: bool) -> Self {
384 self.request_name_flags
385 .set(RequestNameFlags::AllowReplacement, allow_replacement);
386 self
387 }
388
389 /// Whether the [`zbus::fdo::RequestNameFlags::ReplaceExisting`] flag will be set when
390 /// requesting names.
391 pub fn replace_existing_names(mut self, replace_existing: bool) -> Self {
392 self.request_name_flags
393 .set(RequestNameFlags::ReplaceExisting, replace_existing);
394 self
395 }
396
397 /// Set the unique name of the connection.
398 ///
399 /// This is mainly provided for bus implementations. All other users should not need to use this
400 /// method. Hence why this method is only available when the `bus-impl` feature is enabled.
401 ///
402 /// # Panics
403 ///
404 /// It will panic if the connection is to a message bus as it's the bus that assigns
405 /// peers their unique names.
406 #[cfg(feature = "bus-impl")]
407 pub fn unique_name<U>(mut self, unique_name: U) -> Result<Self>
408 where
409 U: TryInto<crate::names::UniqueName<'a>>,
410 U::Error: Into<Error>,
411 {
412 if !self.p2p {
413 panic!("unique name can only be set for peer-to-peer connections");
414 }
415 let name = unique_name.try_into().map_err(Into::into)?;
416 self.unique_name = Some(name);
417
418 Ok(self)
419 }
420
421 /// Set a timeout for method calls.
422 ///
423 /// Method calls will return
424 /// `zbus::Error::InputOutput(std::io::Error(kind: ErrorKind::TimedOut))` if a client does not
425 /// receive an answer from a service in time.
426 pub fn method_timeout(mut self, timeout: std::time::Duration) -> Self {
427 self.method_timeout = Some(timeout);
428
429 self
430 }
431
432 /// Build the connection, consuming the builder.
433 ///
434 /// # Errors
435 ///
436 /// Until server-side bus connection is supported, attempting to build such a connection will
437 /// result in a [`Error::Unsupported`] error.
438 pub async fn build(self) -> Result<Connection> {
439 let (conn, _) = self.build_inner(false).await?;
440 Ok(conn)
441 }
442
443 /// Build the connection and return a [`MessageStream`] to receive messages from it.
444 ///
445 /// This is equivalent to [`Self::build`] followed by `MessageStream::from(&conn)`, except
446 /// that the stream is set up **before** the socket-reader task is started. No messages can
447 /// therefore be lost in the window between `build()` returning and `MessageStream::from`
448 /// being called. Use this when the peer may pipeline traffic right after authentication —
449 /// e.g. a bus implementation reading a `Hello` method call from a just-connected client.
450 ///
451 /// To get the [`Connection`] out of the returned stream, use `Connection::from(&stream)` —
452 /// this is cheap (an `Arc` clone).
453 ///
454 /// This method is only available when the `bus-impl` feature is enabled.
455 ///
456 /// # Example
457 ///
458 /// ```
459 /// # use futures_util::StreamExt;
460 /// # use zbus::{
461 /// # Connection, Guid, block_on,
462 /// # connection::{Builder, socket::Channel},
463 /// # message::Message,
464 /// # };
465 /// #
466 /// # block_on(async {
467 /// let guid = Guid::generate();
468 /// let (c1, c2) = Channel::pair();
469 ///
470 /// // Bus client sends a method call right away (simulates pipelining after auth).
471 /// let client = Builder::authenticated_socket(c1, guid.clone())
472 /// .unwrap()
473 /// .build()
474 /// .await
475 /// .unwrap();
476 /// let hello = Message::method_call("/org/freedesktop/DBus", "Hello")
477 /// .unwrap()
478 /// .destination("org.freedesktop.DBus")
479 /// .unwrap()
480 /// .build(&())
481 /// .unwrap();
482 /// client.send(&hello).await.unwrap();
483 ///
484 /// // Server builds *after* the client has already sent.
485 /// let mut stream = Builder::authenticated_socket(c2, guid)
486 /// .unwrap()
487 /// .p2p()
488 /// .build_message_stream()
489 /// .await
490 /// .unwrap();
491 ///
492 /// let msg = stream.next().await.unwrap().unwrap();
493 /// assert_eq!(msg.header().member().unwrap().as_str(), "Hello");
494 ///
495 /// let _conn: Connection = (&stream).into();
496 /// # });
497 /// ```
498 #[cfg(feature = "bus-impl")]
499 pub async fn build_message_stream(self) -> Result<MessageStream> {
500 let (conn, msg_receiver) = self.build_inner(true).await?;
501 let msg_receiver = msg_receiver.expect("build_inner(true) always returns Some");
502
503 Ok(MessageStream::for_subscription_channel(
504 msg_receiver,
505 None,
506 &conn,
507 ))
508 }
509
510 async fn build_inner(
511 self,
512 activate_msg_stream: bool,
513 ) -> Result<(Connection, Option<ActiveReceiver<Result<Message>>>)> {
514 let executor = Executor::new();
515 #[cfg(not(feature = "tokio"))]
516 let internal_executor = self.internal_executor;
517 // Box the future as it's large and can cause stack overflow.
518 let conn =
519 Box::pin(executor.run(self.build_(executor.clone(), activate_msg_stream))).await?;
520
521 #[cfg(not(feature = "tokio"))]
522 start_internal_executor(&executor, internal_executor)?;
523
524 Ok(conn)
525 }
526
527 async fn build_(
528 mut self,
529 executor: Executor<'static>,
530 activate_msg_stream: bool,
531 ) -> Result<(Connection, Option<ActiveReceiver<Result<Message>>>)> {
532 #[cfg(feature = "p2p")]
533 let is_bus_conn = !self.p2p;
534 #[cfg(not(feature = "p2p"))]
535 let is_bus_conn = true;
536
537 let mut auth = self.connect(is_bus_conn).await?;
538
539 // SAFETY: `Authenticated` is always built with these fields set to `Some`.
540 let socket_read = auth.socket_read.take().unwrap();
541 let already_received_bytes = auth.already_received_bytes.drain(..).collect();
542 #[cfg(unix)]
543 let already_received_fds = auth.already_received_fds.drain(..).collect();
544
545 let mut conn = Connection::new(auth, is_bus_conn, executor, self.method_timeout).await?;
546 conn.set_max_queued(self.max_queued.unwrap_or(DEFAULT_MAX_QUEUED));
547
548 if !self.interfaces.is_empty() {
549 let object_server = conn.ensure_object_server(false);
550 for (path, interfaces) in self.interfaces {
551 for (name, iface) in interfaces {
552 let added = object_server
553 .add_arc_interface(path.clone(), name.clone(), iface.clone())
554 .await?;
555 if !added {
556 return Err(Error::InterfaceExists(name.clone(), path.to_owned()));
557 }
558 }
559 }
560
561 let started_event = Event::new();
562 let listener = started_event.listen();
563 conn.start_object_server(Some(started_event));
564
565 listener.await;
566 }
567
568 // Set up a message receiver before the socket-reader task is spawned so that the
569 // caller cannot miss early messages due to a race with the reader task.
570 let msg_receiver = activate_msg_stream.then(|| conn.inner.msg_receiver.activate_cloned());
571
572 // Start the socket reader task.
573 conn.init_socket_reader(
574 socket_read,
575 already_received_bytes,
576 #[cfg(unix)]
577 already_received_fds,
578 );
579
580 for name in self.names {
581 conn.request_name_with_flags(name, self.request_name_flags)
582 .await?;
583 }
584
585 Ok((conn, msg_receiver))
586 }
587
588 fn new(target: Target) -> Self {
589 Self {
590 target: Some(target),
591 #[cfg(feature = "p2p")]
592 p2p: false,
593 max_queued: None,
594 guid: None,
595 internal_executor: true,
596 interfaces: HashMap::new(),
597 names: HashSet::new(),
598 auth_mechanism: None,
599 #[cfg(feature = "bus-impl")]
600 unique_name: None,
601 request_name_flags: BitFlags::default(),
602 method_timeout: None,
603 user_id: None,
604 }
605 }
606
607 async fn connect(&mut self, is_bus_conn: bool) -> Result<Authenticated> {
608 #[cfg(not(feature = "bus-impl"))]
609 let unique_name = None;
610 #[cfg(feature = "bus-impl")]
611 let unique_name = self.unique_name.take().map(Into::into);
612
613 #[allow(unused_mut)]
614 let (mut stream, server_guid, authenticated) = self.target_connect().await?;
615 if authenticated {
616 let (socket_read, socket_write) = stream.take();
617 Ok(Authenticated {
618 #[cfg(unix)]
619 cap_unix_fd: socket_read.can_pass_unix_fd(),
620 socket_read: Some(socket_read),
621 socket_write,
622 // SAFETY: `server_guid` is provided as arg of `Builder::authenticated_socket`.
623 server_guid: server_guid.unwrap(),
624 already_received_bytes: vec![],
625 unique_name,
626 #[cfg(unix)]
627 already_received_fds: vec![],
628 })
629 } else {
630 #[cfg(feature = "p2p")]
631 match self.guid.take() {
632 None => {
633 // SASL Handshake
634 Authenticated::client(
635 stream,
636 server_guid,
637 self.auth_mechanism,
638 is_bus_conn,
639 self.user_id,
640 )
641 .await
642 }
643 Some(guid) => {
644 if !self.p2p {
645 return Err(Error::Unsupported);
646 }
647
648 let creds = stream.read_mut().peer_credentials().await?;
649 #[cfg(unix)]
650 let client_uid = self.user_id.or_else(|| creds.unix_user_id());
651 #[cfg(windows)]
652 let client_sid = creds.into_windows_sid();
653
654 Authenticated::server(
655 stream,
656 guid.to_owned().into(),
657 #[cfg(unix)]
658 client_uid,
659 #[cfg(windows)]
660 client_sid,
661 self.auth_mechanism,
662 unique_name,
663 )
664 .await
665 }
666 }
667
668 #[cfg(not(feature = "p2p"))]
669 Authenticated::client(
670 stream,
671 server_guid,
672 self.auth_mechanism,
673 is_bus_conn,
674 self.user_id,
675 )
676 .await
677 }
678 }
679
680 async fn target_connect(&mut self) -> Result<(BoxedSplit, Option<OwnedGuid>, bool)> {
681 let mut authenticated = false;
682 let mut guid = None;
683 // SAFETY: `self.target` is always `Some` from the beginning and this method is only called
684 // once.
685 let split = match self.target.take().unwrap() {
686 #[cfg(not(feature = "tokio"))]
687 Target::UnixStream(stream) => Async::new(stream)?.into(),
688 #[cfg(all(unix, feature = "tokio"))]
689 Target::UnixStream(stream) => stream.into(),
690 #[cfg(not(feature = "tokio"))]
691 Target::TcpStream(stream) => Async::new(stream)?.into(),
692 #[cfg(feature = "tokio")]
693 Target::TcpStream(stream) => stream.into(),
694 #[cfg(all(feature = "vsock", not(feature = "tokio")))]
695 Target::VsockStream(stream) => Async::new(stream)?.into(),
696 #[cfg(feature = "tokio-vsock")]
697 Target::VsockStream(stream) => stream.into(),
698 Target::Address(address) => {
699 guid = address.guid().map(|g| g.to_owned().into());
700 match address.connect().await? {
701 #[cfg(any(unix, not(feature = "tokio")))]
702 address::transport::Stream::Unix(stream) => stream.into(),
703 #[cfg(unix)]
704 address::transport::Stream::Unixexec(stream) => stream.into(),
705 address::transport::Stream::Tcp(stream) => stream.into(),
706 #[cfg(any(
707 all(feature = "vsock", not(feature = "tokio")),
708 feature = "tokio-vsock"
709 ))]
710 address::transport::Stream::Vsock(stream) => stream.into(),
711 }
712 }
713 Target::Socket(stream) => stream,
714 Target::AuthenticatedSocket(stream) => {
715 authenticated = true;
716 guid = self.guid.take().map(Into::into);
717 stream
718 }
719 };
720
721 Ok((split, guid, authenticated))
722 }
723}
724
725/// Start the internal executor thread.
726///
727/// Returns a dummy task that keep the executor ticking thread from exiting due to absence of any
728/// tasks until socket reader task kicks in.
729#[cfg(not(feature = "tokio"))]
730fn start_internal_executor(executor: &Executor<'static>, internal_executor: bool) -> Result<()> {
731 if internal_executor {
732 let executor = executor.clone();
733 std::thread::Builder::new()
734 .name("zbus::Connection executor".into())
735 .spawn(move || {
736 crate::utils::block_on(async move {
737 // Run as long as there is a task to run.
738 while !executor.is_empty() {
739 executor.tick().await;
740 }
741 })
742 })?;
743 }
744
745 Ok(())
746}