Skip to main content

zbus/connection/
builder.rs

1use async_broadcast::Receiver as ActiveReceiver;
2#[cfg(not(feature = "tokio"))]
3use async_io::Async;
4use enumflags2::BitFlags;
5use event_listener::Event;
6#[cfg(not(feature = "tokio"))]
7use std::net::TcpStream;
8#[cfg(all(unix, not(feature = "tokio")))]
9use std::os::unix::net::UnixStream;
10use std::{
11    collections::{HashMap, HashSet},
12    vec,
13};
14#[cfg(feature = "tokio")]
15use tokio::net::TcpStream;
16#[cfg(all(unix, feature = "tokio"))]
17use tokio::net::UnixStream;
18#[cfg(feature = "tokio-vsock")]
19use tokio_vsock::VsockStream;
20#[cfg(all(windows, not(feature = "tokio")))]
21use uds_windows::UnixStream;
22#[cfg(all(feature = "vsock", not(feature = "tokio")))]
23use vsock::VsockStream;
24
25use zvariant::ObjectPath;
26
27#[cfg(feature = "bus-impl")]
28use crate::MessageStream;
29use crate::{
30    Connection, Error, Executor, Guid, OwnedGuid, Result,
31    address::{self, Address},
32    fdo::RequestNameFlags,
33    message::Message,
34    names::{InterfaceName, WellKnownName},
35    object_server::{ArcInterface, Interface},
36};
37
38use super::{
39    handshake::{AuthMechanism, Authenticated},
40    socket::{BoxedSplit, ReadHalf, Split, WriteHalf},
41};
42
43const DEFAULT_MAX_QUEUED: usize = 64;
44
45#[derive(Debug)]
46enum Target {
47    #[cfg(any(unix, not(feature = "tokio")))]
48    UnixStream(UnixStream),
49    TcpStream(TcpStream),
50    #[cfg(any(
51        all(feature = "vsock", not(feature = "tokio")),
52        feature = "tokio-vsock"
53    ))]
54    VsockStream(VsockStream),
55    Address(Address),
56    Socket(Split<Box<dyn ReadHalf>, Box<dyn WriteHalf>>),
57    AuthenticatedSocket(Split<Box<dyn ReadHalf>, Box<dyn WriteHalf>>),
58}
59
60type Interfaces<'a> = HashMap<ObjectPath<'a>, HashMap<InterfaceName<'static>, ArcInterface>>;
61
62/// A builder for [`zbus::Connection`].
63///
64/// The builder allows setting the flags [`RequestNameFlags::AllowReplacement`] and
65/// [`RequestNameFlags::ReplaceExisting`] when requesting names, but the flag
66/// [`RequestNameFlags::DoNotQueue`] will always be enabled. The reasons are:
67///
68/// 1. There is no indication given to the caller of [`Self::build`] that the name(s) request was
69///    enqueued and that the requested name might not be available right after building.
70///
71/// 2. The name may be acquired in between the time the name is requested and the
72///    [`crate::fdo::NameAcquiredStream`] is constructed. As a result the service can miss the
73///    [`crate::fdo::NameAcquired`] signal.
74#[derive(Debug)]
75#[must_use]
76pub struct Builder<'a> {
77    target: Option<Target>,
78    max_queued: Option<usize>,
79    // This is only set for p2p server case or pre-authenticated sockets.
80    guid: Option<Guid<'a>>,
81    #[cfg(feature = "p2p")]
82    p2p: bool,
83    internal_executor: bool,
84    interfaces: Interfaces<'a>,
85    names: HashSet<WellKnownName<'a>>,
86    auth_mechanism: Option<AuthMechanism>,
87    #[cfg(feature = "bus-impl")]
88    unique_name: Option<crate::names::UniqueName<'a>>,
89    request_name_flags: BitFlags<RequestNameFlags>,
90    method_timeout: Option<std::time::Duration>,
91    user_id: Option<u32>,
92}
93
94impl<'a> Builder<'a> {
95    /// Create a builder for the session/user message bus connection.
96    pub fn session() -> Result<Self> {
97        Ok(Self::new(Target::Address(Address::session()?)))
98    }
99
100    /// Create a builder for the system-wide message bus connection.
101    pub fn system() -> Result<Self> {
102        Ok(Self::new(Target::Address(Address::system()?)))
103    }
104
105    /// Create a builder for an IBus connection.
106    ///
107    /// IBus (Intelligent Input Bus) is an input method framework. This method creates a builder
108    /// that will query the IBus daemon for its D-Bus address using the `ibus address` command.
109    ///
110    /// # Platform Support
111    ///
112    /// This method is available on Unix-like systems where IBus is installed.
113    ///
114    /// # Errors
115    ///
116    /// Returns an error if:
117    /// - The `ibus` command is not found or fails to execute
118    /// - The IBus daemon is not running
119    /// - The command output cannot be parsed as a valid D-Bus address
120    ///
121    /// # Example
122    ///
123    /// ```no_run
124    /// # use std::error::Error;
125    /// # use zbus::connection::Builder;
126    /// # use zbus::block_on;
127    /// #
128    /// # block_on(async {
129    /// let conn = Builder::ibus()?
130    ///     .build()
131    ///     .await?;
132    ///
133    /// // Use the connection to interact with IBus services
134    /// # drop(conn);
135    /// # Ok::<(), zbus::Error>(())
136    /// # }).unwrap();
137    /// #
138    /// # Ok::<_, Box<dyn Error + Send + Sync>>(())
139    /// ```
140    #[cfg(unix)]
141    pub fn ibus() -> Result<Self> {
142        use crate::address::transport::{Ibus, Transport};
143        Ok(Self::new(Target::Address(Address::from(Transport::Ibus(
144            Ibus::new(),
145        )))))
146    }
147
148    /// Create a builder for a connection that will use the given [D-Bus bus address].
149    ///
150    /// # Example
151    ///
152    /// Here is an example of connecting to an IBus service:
153    ///
154    /// ```no_run
155    /// # use std::error::Error;
156    /// # use zbus::connection::Builder;
157    /// # use zbus::block_on;
158    /// #
159    /// # block_on(async {
160    /// let addr = "unix:\
161    ///     path=/home/zeenix/.cache/ibus/dbus-ET0Xzrk9,\
162    ///     guid=fdd08e811a6c7ebe1fef0d9e647230da";
163    /// let conn = Builder::address(addr)?
164    ///     .build()
165    ///     .await?;
166    ///
167    /// // Do something useful with `conn`..
168    /// #     drop(conn);
169    /// #     Ok::<(), zbus::Error>(())
170    /// # }).unwrap();
171    /// #
172    /// # Ok::<_, Box<dyn Error + Send + Sync>>(())
173    /// ```
174    ///
175    /// **Note:** The IBus address is different for each session. You can find the address for your
176    /// current session using `ibus address` command. For a more convenient way to connect to IBus,
177    /// see [`Builder::ibus`].
178    ///
179    /// [D-Bus bus address]: https://dbus.freedesktop.org/doc/dbus-specification.html#addresses
180    pub fn address<A>(address: A) -> Result<Self>
181    where
182        A: TryInto<Address>,
183        A::Error: Into<Error>,
184    {
185        Ok(Self::new(Target::Address(
186            address.try_into().map_err(Into::into)?,
187        )))
188    }
189
190    /// Create a builder for a connection that will use the given unix stream.
191    ///
192    /// If the default `async-io` feature is disabled, this method will expect a
193    /// [`tokio::net::UnixStream`](https://docs.rs/tokio/latest/tokio/net/struct.UnixStream.html)
194    /// argument.
195    ///
196    /// Since tokio currently [does not support Unix domain sockets][tuds] on Windows, this method
197    /// is not available when the `tokio` feature is enabled and building for Windows target.
198    ///
199    /// [tuds]: https://github.com/tokio-rs/tokio/issues/2201
200    #[cfg(any(unix, not(feature = "tokio")))]
201    pub fn unix_stream(stream: UnixStream) -> Self {
202        Self::new(Target::UnixStream(stream))
203    }
204
205    /// Create a builder for a connection that will use the given TCP stream.
206    ///
207    /// If the default `async-io` feature is disabled, this method will expect a
208    /// [`tokio::net::TcpStream`](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html)
209    /// argument.
210    pub fn tcp_stream(stream: TcpStream) -> Self {
211        Self::new(Target::TcpStream(stream))
212    }
213
214    /// Create a builder for a connection that will use the given VSOCK stream.
215    ///
216    /// This method is only available when either `vsock` or `tokio-vsock` feature is enabled. The
217    /// type of `stream` is `vsock::VsockStream` with `vsock` feature and `tokio_vsock::VsockStream`
218    /// with `tokio-vsock` feature.
219    #[cfg(any(
220        all(feature = "vsock", not(feature = "tokio")),
221        feature = "tokio-vsock"
222    ))]
223    pub fn vsock_stream(stream: VsockStream) -> Self {
224        Self::new(Target::VsockStream(stream))
225    }
226
227    /// Create a builder for a connection that will use the given socket.
228    pub fn socket<S: Into<BoxedSplit>>(socket: S) -> Self {
229        Self::new(Target::Socket(socket.into()))
230    }
231
232    /// Create a builder for a connection that will use the given pre-authenticated socket.
233    ///
234    /// This is similar to [`Builder::socket`], except that the socket is either already
235    /// authenticated or does not require authentication.
236    pub fn authenticated_socket<S, G>(socket: S, guid: G) -> Result<Self>
237    where
238        S: Into<BoxedSplit>,
239        G: TryInto<Guid<'a>>,
240        G::Error: Into<Error>,
241    {
242        let mut builder = Self::new(Target::AuthenticatedSocket(socket.into()));
243        builder.guid = Some(guid.try_into().map_err(Into::into)?);
244
245        Ok(builder)
246    }
247
248    /// Specify the mechanism to use during authentication.
249    pub fn auth_mechanism(mut self, auth_mechanism: AuthMechanism) -> Self {
250        self.auth_mechanism = Some(auth_mechanism);
251
252        self
253    }
254
255    /// Specify the user id during authentication.
256    ///
257    /// This can be useful when using [`AuthMechanism::External`] with `socat`
258    /// to avoid the host decide what uid to use and instead provide one
259    /// known to have access rights.
260    #[cfg(unix)]
261    pub fn user_id(mut self, id: u32) -> Self {
262        self.user_id = Some(id);
263
264        self
265    }
266
267    /// The to-be-created connection will be a peer-to-peer connection.
268    ///
269    /// This method is only available when the `p2p` feature is enabled.
270    #[cfg(feature = "p2p")]
271    pub fn p2p(mut self) -> Self {
272        self.p2p = true;
273
274        self
275    }
276
277    /// The to-be-created connection will be a server using the given GUID.
278    ///
279    /// The to-be-created connection will wait for incoming client authentication handshake and
280    /// negotiation messages, for peer-to-peer communications after successful creation.
281    ///
282    /// This method is only available when the `p2p` feature is enabled.
283    ///
284    /// **NOTE:** This method is redundant when using [`Builder::authenticated_socket`] since the
285    /// latter already sets the GUID for the connection and zbus doesn't differentiate between a
286    /// server and a client connection, except for authentication.
287    #[cfg(feature = "p2p")]
288    pub fn server<G>(mut self, guid: G) -> Result<Self>
289    where
290        G: TryInto<Guid<'a>>,
291        G::Error: Into<Error>,
292    {
293        self.guid = Some(guid.try_into().map_err(Into::into)?);
294
295        Ok(self)
296    }
297
298    /// Set the capacity of the main (unfiltered) queue.
299    ///
300    /// Since typically you'd want to set this at instantiation time, you can set it through the
301    /// builder.
302    ///
303    /// # Example
304    ///
305    /// ```
306    /// # use std::error::Error;
307    /// # use zbus::connection::Builder;
308    /// # use zbus::block_on;
309    /// #
310    /// # block_on(async {
311    /// let conn = Builder::session()?
312    ///     .max_queued(30)
313    ///     .build()
314    ///     .await?;
315    /// assert_eq!(conn.max_queued(), 30);
316    ///
317    /// #     Ok::<(), zbus::Error>(())
318    /// # }).unwrap();
319    /// #
320    /// // Do something useful with `conn`..
321    /// # Ok::<_, Box<dyn Error + Send + Sync>>(())
322    /// ```
323    pub fn max_queued(mut self, max: usize) -> Self {
324        self.max_queued = Some(max);
325
326        self
327    }
328
329    /// Enable or disable the internal executor thread.
330    ///
331    /// The thread is enabled by default.
332    ///
333    /// See [Connection::executor] for more details.
334    pub fn internal_executor(mut self, enabled: bool) -> Self {
335        self.internal_executor = enabled;
336
337        self
338    }
339
340    /// Register a D-Bus [`Interface`] to be served at a given path.
341    ///
342    /// This is similar to [`zbus::ObjectServer::at`], except that it allows you to have your
343    /// interfaces available immediately after the connection is established. Typically, this is
344    /// exactly what you'd want. Also in contrast to [`zbus::ObjectServer::at`], this method will
345    /// replace any previously added interface with the same name at the same path.
346    ///
347    /// Standard interfaces (Peer, Introspectable, Properties) are added on your behalf. If you
348    /// attempt to add yours, [`Builder::build()`] will fail.
349    pub fn serve_at<P, I>(mut self, path: P, iface: I) -> Result<Self>
350    where
351        I: Interface,
352        P: TryInto<ObjectPath<'a>>,
353        P::Error: Into<Error>,
354    {
355        let path = path.try_into().map_err(Into::into)?;
356        let entry = self.interfaces.entry(path).or_default();
357        entry.insert(I::name(), ArcInterface::new(iface));
358        Ok(self)
359    }
360
361    /// Register a well-known name for this connection on the bus.
362    ///
363    /// This is similar to [`zbus::Connection::request_name`], except the name is requested as part
364    /// of the connection setup ([`Builder::build`]), immediately after interfaces
365    /// registered (through [`Builder::serve_at`]) are advertised. Typically this is
366    /// exactly what you want.
367    ///
368    /// The methods [`Builder::allow_name_replacements`] and [`Builder::replace_existing_names`]
369    /// allow to set the [`zbus::fdo::RequestNameFlags`] used to request the name.
370    pub fn name<W>(mut self, well_known_name: W) -> Result<Self>
371    where
372        W: TryInto<WellKnownName<'a>>,
373        W::Error: Into<Error>,
374    {
375        let well_known_name = well_known_name.try_into().map_err(Into::into)?;
376        self.names.insert(well_known_name);
377
378        Ok(self)
379    }
380
381    /// Whether the [`zbus::fdo::RequestNameFlags::AllowReplacement`] flag will be set when
382    /// requesting names.
383    pub fn allow_name_replacements(mut self, allow_replacement: bool) -> Self {
384        self.request_name_flags
385            .set(RequestNameFlags::AllowReplacement, allow_replacement);
386        self
387    }
388
389    /// Whether the [`zbus::fdo::RequestNameFlags::ReplaceExisting`] flag will be set when
390    /// requesting names.
391    pub fn replace_existing_names(mut self, replace_existing: bool) -> Self {
392        self.request_name_flags
393            .set(RequestNameFlags::ReplaceExisting, replace_existing);
394        self
395    }
396
397    /// Set the unique name of the connection.
398    ///
399    /// This is mainly provided for bus implementations. All other users should not need to use this
400    /// method. Hence why this method is only available when the `bus-impl` feature is enabled.
401    ///
402    /// # Panics
403    ///
404    /// It will panic if the connection is to a message bus as it's the bus that assigns
405    /// peers their unique names.
406    #[cfg(feature = "bus-impl")]
407    pub fn unique_name<U>(mut self, unique_name: U) -> Result<Self>
408    where
409        U: TryInto<crate::names::UniqueName<'a>>,
410        U::Error: Into<Error>,
411    {
412        if !self.p2p {
413            panic!("unique name can only be set for peer-to-peer connections");
414        }
415        let name = unique_name.try_into().map_err(Into::into)?;
416        self.unique_name = Some(name);
417
418        Ok(self)
419    }
420
421    /// Set a timeout for method calls.
422    ///
423    /// Method calls will return
424    /// `zbus::Error::InputOutput(std::io::Error(kind: ErrorKind::TimedOut))` if a client does not
425    /// receive an answer from a service in time.
426    pub fn method_timeout(mut self, timeout: std::time::Duration) -> Self {
427        self.method_timeout = Some(timeout);
428
429        self
430    }
431
432    /// Build the connection, consuming the builder.
433    ///
434    /// # Errors
435    ///
436    /// Until server-side bus connection is supported, attempting to build such a connection will
437    /// result in a [`Error::Unsupported`] error.
438    pub async fn build(self) -> Result<Connection> {
439        let (conn, _) = self.build_inner(false).await?;
440        Ok(conn)
441    }
442
443    /// Build the connection and return a [`MessageStream`] to receive messages from it.
444    ///
445    /// This is equivalent to [`Self::build`] followed by `MessageStream::from(&conn)`, except
446    /// that the stream is set up **before** the socket-reader task is started. No messages can
447    /// therefore be lost in the window between `build()` returning and `MessageStream::from`
448    /// being called. Use this when the peer may pipeline traffic right after authentication —
449    /// e.g. a bus implementation reading a `Hello` method call from a just-connected client.
450    ///
451    /// To get the [`Connection`] out of the returned stream, use `Connection::from(&stream)` —
452    /// this is cheap (an `Arc` clone).
453    ///
454    /// This method is only available when the `bus-impl` feature is enabled.
455    ///
456    /// # Example
457    ///
458    /// ```
459    /// # use futures_util::StreamExt;
460    /// # use zbus::{
461    /// #     Connection, Guid, block_on,
462    /// #     connection::{Builder, socket::Channel},
463    /// #     message::Message,
464    /// # };
465    /// #
466    /// # block_on(async {
467    /// let guid = Guid::generate();
468    /// let (c1, c2) = Channel::pair();
469    ///
470    /// // Bus client sends a method call right away (simulates pipelining after auth).
471    /// let client = Builder::authenticated_socket(c1, guid.clone())
472    ///     .unwrap()
473    ///     .build()
474    ///     .await
475    ///     .unwrap();
476    /// let hello = Message::method_call("/org/freedesktop/DBus", "Hello")
477    ///     .unwrap()
478    ///     .destination("org.freedesktop.DBus")
479    ///     .unwrap()
480    ///     .build(&())
481    ///     .unwrap();
482    /// client.send(&hello).await.unwrap();
483    ///
484    /// // Server builds *after* the client has already sent.
485    /// let mut stream = Builder::authenticated_socket(c2, guid)
486    ///     .unwrap()
487    ///     .p2p()
488    ///     .build_message_stream()
489    ///     .await
490    ///     .unwrap();
491    ///
492    /// let msg = stream.next().await.unwrap().unwrap();
493    /// assert_eq!(msg.header().member().unwrap().as_str(), "Hello");
494    ///
495    /// let _conn: Connection = (&stream).into();
496    /// # });
497    /// ```
498    #[cfg(feature = "bus-impl")]
499    pub async fn build_message_stream(self) -> Result<MessageStream> {
500        let (conn, msg_receiver) = self.build_inner(true).await?;
501        let msg_receiver = msg_receiver.expect("build_inner(true) always returns Some");
502
503        Ok(MessageStream::for_subscription_channel(
504            msg_receiver,
505            None,
506            &conn,
507        ))
508    }
509
510    async fn build_inner(
511        self,
512        activate_msg_stream: bool,
513    ) -> Result<(Connection, Option<ActiveReceiver<Result<Message>>>)> {
514        let executor = Executor::new();
515        #[cfg(not(feature = "tokio"))]
516        let internal_executor = self.internal_executor;
517        // Box the future as it's large and can cause stack overflow.
518        let conn =
519            Box::pin(executor.run(self.build_(executor.clone(), activate_msg_stream))).await?;
520
521        #[cfg(not(feature = "tokio"))]
522        start_internal_executor(&executor, internal_executor)?;
523
524        Ok(conn)
525    }
526
527    async fn build_(
528        mut self,
529        executor: Executor<'static>,
530        activate_msg_stream: bool,
531    ) -> Result<(Connection, Option<ActiveReceiver<Result<Message>>>)> {
532        #[cfg(feature = "p2p")]
533        let is_bus_conn = !self.p2p;
534        #[cfg(not(feature = "p2p"))]
535        let is_bus_conn = true;
536
537        let mut auth = self.connect(is_bus_conn).await?;
538
539        // SAFETY: `Authenticated` is always built with these fields set to `Some`.
540        let socket_read = auth.socket_read.take().unwrap();
541        let already_received_bytes = auth.already_received_bytes.drain(..).collect();
542        #[cfg(unix)]
543        let already_received_fds = auth.already_received_fds.drain(..).collect();
544
545        let mut conn = Connection::new(auth, is_bus_conn, executor, self.method_timeout).await?;
546        conn.set_max_queued(self.max_queued.unwrap_or(DEFAULT_MAX_QUEUED));
547
548        if !self.interfaces.is_empty() {
549            let object_server = conn.ensure_object_server(false);
550            for (path, interfaces) in self.interfaces {
551                for (name, iface) in interfaces {
552                    let added = object_server
553                        .add_arc_interface(path.clone(), name.clone(), iface.clone())
554                        .await?;
555                    if !added {
556                        return Err(Error::InterfaceExists(name.clone(), path.to_owned()));
557                    }
558                }
559            }
560
561            let started_event = Event::new();
562            let listener = started_event.listen();
563            conn.start_object_server(Some(started_event));
564
565            listener.await;
566        }
567
568        // Set up a message receiver before the socket-reader task is spawned so that the
569        // caller cannot miss early messages due to a race with the reader task.
570        let msg_receiver = activate_msg_stream.then(|| conn.inner.msg_receiver.activate_cloned());
571
572        // Start the socket reader task.
573        conn.init_socket_reader(
574            socket_read,
575            already_received_bytes,
576            #[cfg(unix)]
577            already_received_fds,
578        );
579
580        for name in self.names {
581            conn.request_name_with_flags(name, self.request_name_flags)
582                .await?;
583        }
584
585        Ok((conn, msg_receiver))
586    }
587
588    fn new(target: Target) -> Self {
589        Self {
590            target: Some(target),
591            #[cfg(feature = "p2p")]
592            p2p: false,
593            max_queued: None,
594            guid: None,
595            internal_executor: true,
596            interfaces: HashMap::new(),
597            names: HashSet::new(),
598            auth_mechanism: None,
599            #[cfg(feature = "bus-impl")]
600            unique_name: None,
601            request_name_flags: BitFlags::default(),
602            method_timeout: None,
603            user_id: None,
604        }
605    }
606
607    async fn connect(&mut self, is_bus_conn: bool) -> Result<Authenticated> {
608        #[cfg(not(feature = "bus-impl"))]
609        let unique_name = None;
610        #[cfg(feature = "bus-impl")]
611        let unique_name = self.unique_name.take().map(Into::into);
612
613        #[allow(unused_mut)]
614        let (mut stream, server_guid, authenticated) = self.target_connect().await?;
615        if authenticated {
616            let (socket_read, socket_write) = stream.take();
617            Ok(Authenticated {
618                #[cfg(unix)]
619                cap_unix_fd: socket_read.can_pass_unix_fd(),
620                socket_read: Some(socket_read),
621                socket_write,
622                // SAFETY: `server_guid` is provided as arg of `Builder::authenticated_socket`.
623                server_guid: server_guid.unwrap(),
624                already_received_bytes: vec![],
625                unique_name,
626                #[cfg(unix)]
627                already_received_fds: vec![],
628            })
629        } else {
630            #[cfg(feature = "p2p")]
631            match self.guid.take() {
632                None => {
633                    // SASL Handshake
634                    Authenticated::client(
635                        stream,
636                        server_guid,
637                        self.auth_mechanism,
638                        is_bus_conn,
639                        self.user_id,
640                    )
641                    .await
642                }
643                Some(guid) => {
644                    if !self.p2p {
645                        return Err(Error::Unsupported);
646                    }
647
648                    let creds = stream.read_mut().peer_credentials().await?;
649                    #[cfg(unix)]
650                    let client_uid = self.user_id.or_else(|| creds.unix_user_id());
651                    #[cfg(windows)]
652                    let client_sid = creds.into_windows_sid();
653
654                    Authenticated::server(
655                        stream,
656                        guid.to_owned().into(),
657                        #[cfg(unix)]
658                        client_uid,
659                        #[cfg(windows)]
660                        client_sid,
661                        self.auth_mechanism,
662                        unique_name,
663                    )
664                    .await
665                }
666            }
667
668            #[cfg(not(feature = "p2p"))]
669            Authenticated::client(
670                stream,
671                server_guid,
672                self.auth_mechanism,
673                is_bus_conn,
674                self.user_id,
675            )
676            .await
677        }
678    }
679
680    async fn target_connect(&mut self) -> Result<(BoxedSplit, Option<OwnedGuid>, bool)> {
681        let mut authenticated = false;
682        let mut guid = None;
683        // SAFETY: `self.target` is always `Some` from the beginning and this method is only called
684        // once.
685        let split = match self.target.take().unwrap() {
686            #[cfg(not(feature = "tokio"))]
687            Target::UnixStream(stream) => Async::new(stream)?.into(),
688            #[cfg(all(unix, feature = "tokio"))]
689            Target::UnixStream(stream) => stream.into(),
690            #[cfg(not(feature = "tokio"))]
691            Target::TcpStream(stream) => Async::new(stream)?.into(),
692            #[cfg(feature = "tokio")]
693            Target::TcpStream(stream) => stream.into(),
694            #[cfg(all(feature = "vsock", not(feature = "tokio")))]
695            Target::VsockStream(stream) => Async::new(stream)?.into(),
696            #[cfg(feature = "tokio-vsock")]
697            Target::VsockStream(stream) => stream.into(),
698            Target::Address(address) => {
699                guid = address.guid().map(|g| g.to_owned().into());
700                match address.connect().await? {
701                    #[cfg(any(unix, not(feature = "tokio")))]
702                    address::transport::Stream::Unix(stream) => stream.into(),
703                    #[cfg(unix)]
704                    address::transport::Stream::Unixexec(stream) => stream.into(),
705                    address::transport::Stream::Tcp(stream) => stream.into(),
706                    #[cfg(any(
707                        all(feature = "vsock", not(feature = "tokio")),
708                        feature = "tokio-vsock"
709                    ))]
710                    address::transport::Stream::Vsock(stream) => stream.into(),
711                }
712            }
713            Target::Socket(stream) => stream,
714            Target::AuthenticatedSocket(stream) => {
715                authenticated = true;
716                guid = self.guid.take().map(Into::into);
717                stream
718            }
719        };
720
721        Ok((split, guid, authenticated))
722    }
723}
724
725/// Start the internal executor thread.
726///
727/// Returns a dummy task that keep the executor ticking thread from exiting due to absence of any
728/// tasks until socket reader task kicks in.
729#[cfg(not(feature = "tokio"))]
730fn start_internal_executor(executor: &Executor<'static>, internal_executor: bool) -> Result<()> {
731    if internal_executor {
732        let executor = executor.clone();
733        std::thread::Builder::new()
734            .name("zbus::Connection executor".into())
735            .spawn(move || {
736                crate::utils::block_on(async move {
737                    // Run as long as there is a task to run.
738                    while !executor.is_empty() {
739                        executor.tick().await;
740                    }
741                })
742            })?;
743    }
744
745    Ok(())
746}