zbus/connection/
mod.rs

1//! Connection API.
2use async_broadcast::{InactiveReceiver, Receiver, Sender as Broadcaster, broadcast};
3use enumflags2::BitFlags;
4use event_listener::{Event, EventListener};
5use ordered_stream::{OrderedFuture, OrderedStream, PollResult};
6use std::{
7    collections::HashMap,
8    io::{self, ErrorKind},
9    num::NonZeroU32,
10    pin::Pin,
11    sync::{Arc, OnceLock, Weak},
12    task::{Context, Poll},
13    time::Duration,
14};
15use tracing::{Instrument, debug, info_span, instrument, trace, trace_span, warn};
16use zbus_names::{BusName, ErrorName, InterfaceName, MemberName, OwnedUniqueName, WellKnownName};
17use zvariant::ObjectPath;
18
19use futures_core::Future;
20use futures_lite::StreamExt;
21
22use crate::{
23    DBusError, Error, Executor, MatchRule, MessageStream, ObjectServer, OwnedGuid, OwnedMatchRule,
24    Result, Task,
25    async_lock::{Mutex, Semaphore, SemaphorePermit},
26    fdo::{ConnectionCredentials, ReleaseNameReply, RequestNameFlags, RequestNameReply},
27    is_flatpak,
28    message::{Flags, Message, Type},
29    timeout::timeout,
30};
31
32mod builder;
33pub use builder::Builder;
34
35pub mod socket;
36pub use socket::Socket;
37
38mod socket_reader;
39use socket_reader::SocketReader;
40
41pub(crate) mod handshake;
42pub use handshake::AuthMechanism;
43use handshake::Authenticated;
44
// Default capacity of an incoming message queue (the `Connection` docs quote 64 as the default).
const DEFAULT_MAX_QUEUED: usize = 64;
// NOTE(review): presumably the default capacity of the channel behind `method_return_receiver`;
// its use is outside this part of the file — confirm.
const DEFAULT_MAX_METHOD_RETURN_QUEUED: usize = 8;
47
/// Inner state shared by `Connection` and `WeakConnection`.
///
/// `Connection` holds this behind an `Arc`, so every clone of a connection refers to the same
/// instance.
#[derive(Debug)]
pub(crate) struct ConnectionInner {
    // NOTE(review): presumably the GUID the server reported during the handshake — confirm.
    server_guid: OwnedGuid,
    // Whether messages carrying file descriptors can be sent. `Connection::send` fails with
    // `Error::Unsupported` for such messages when this is `false`.
    #[cfg(unix)]
    cap_unix_fd: bool,
    // NOTE(review): presumably `true` for a bus connection and `false` for a p2p one — confirm.
    #[cfg(feature = "p2p")]
    bus_conn: bool,
    // Unique name of this connection; set at most once.
    unique_name: OnceLock<OwnedUniqueName>,
    // Well-known names registered through the `Connection` API, together with their status
    // (owned or queued).
    registered_names: Mutex<HashMap<WellKnownName<'static>, NameStatus>>,

    // Notified by `Connection::send` each time a message is about to be written.
    activity_event: Arc<Event>,
    // Write half of the socket; `Connection::send` locks it while writing a message.
    socket_write: Mutex<Box<dyn socket::WriteHalf>>,

    // Our executor; the name-monitoring tasks are spawned on it.
    executor: Executor<'static>,

    // Socket reader task
    #[allow(unused)]
    socket_reader_task: OnceLock<Task<()>>,

    // Inactive receivers that are cloned and activated on demand. `call_method_raw` activates a
    // clone of `method_return_receiver` to wait for the reply to a method call.
    pub(crate) msg_receiver: InactiveReceiver<Result<Message>>,
    pub(crate) method_return_receiver: InactiveReceiver<Result<Message>>,
    // Broadcasters keyed by match rule.
    // NOTE(review): the `None` key is presumably the unfiltered queue — confirm.
    msg_senders: Arc<Mutex<HashMap<Option<OwnedMatchRule>, MsgBroadcaster>>>,

    subscriptions: Mutex<Subscriptions>,

    object_server: OnceLock<ObjectServer>,
    object_server_dispatch_task: OnceLock<Task<()>>,

    // Notified from `Drop` when this inner state goes away.
    drop_event: Event,

    // NOTE(review): presumably what `method_timeout()` returns, which `call_method` uses to bound
    // the wait for a reply — confirm.
    method_timeout: Option<Duration>,
    // Cache the credentials.
    credentials: OnceLock<Arc<ConnectionCredentials>>,
}
84
impl Drop for ConnectionInner {
    // Wakes every listener registered on `drop_event` when the shared inner state is dropped.
    fn drop(&mut self) {
        // Notify anyone waiting that the connection is going away. Since we're being dropped, it's
        // not possible for any new listeners to be created after this notification, so this is
        // race-free.
        self.drop_event.notify(usize::MAX);
    }
}
93
// Per-match-rule subscription state: each rule maps to a `u64` paired with an inactive receiver of
// `Result<Message>` items.
// NOTE(review): the `u64` is presumably the number of active subscribers for the rule — confirm.
type Subscriptions = HashMap<OwnedMatchRule, (u64, InactiveReceiver<Result<Message>>)>;

// Sending half of an `async_broadcast` channel of `Result<Message>` items.
pub(crate) type MsgBroadcaster = Broadcaster<Result<Message>>;
97
98/// A D-Bus connection.
99///
100/// A connection to a D-Bus bus, or a direct peer.
101///
102/// Once created, the connection is authenticated and negotiated and messages can be sent or
103/// received, such as [method calls] or [signals].
104///
105/// For higher-level message handling (typed functions, introspection, documentation reasons etc),
106/// it is recommended to wrap the low-level D-Bus messages into Rust functions with the
107/// [`macro@crate::proxy`] and [`macro@crate::interface`] macros instead of doing it directly on a
108/// `Connection`.
109///
110/// Typically, a connection is made to the session bus with [`Connection::session`], or to the
111/// system bus with [`Connection::system`]. Then the connection is used with [`crate::Proxy`]
112/// instances or the on-demand [`ObjectServer`] instance that can be accessed through
113/// [`Connection::object_server`].
114///
115/// `Connection` implements [`Clone`] and cloning it is a very cheap operation, as the underlying
116/// data is not cloned. This makes it very convenient to share the connection between different
117/// parts of your code. `Connection` also implements [`std::marker::Sync`] and [`std::marker::Send`]
118/// so you can send and share a connection instance across threads as well.
119///
/// `Connection` keeps internal queues of incoming messages. The default capacity of each of these
/// is 64. The capacity of the main (unfiltered) queue is configurable through the
/// [`set_max_queued`] method. When the queue is full, no more messages can be received until room
/// is created for more. This is why it's important to ensure that all [`crate::MessageStream`] and
/// [`crate::blocking::MessageIterator`] instances are continuously polled and iterated on,
/// respectively.
126///
127/// For sending messages you can use the [`Connection::send`] method.
128///
129/// To gracefully close a connection while waiting for any outstanding method calls to complete,
130/// use [`Connection::graceful_shutdown`]. To immediately close a connection in a way that will
131/// disrupt any outstanding method calls, use [`Connection::close`]. If you do not need the
132/// shutdown to be immediate and do not care about waiting for outstanding method calls, you can
133/// also simply drop the `Connection` instance, which will act similarly to spawning
134/// `graceful_shutdown` in the background.
135///
136/// [method calls]: struct.Connection.html#method.call_method
137/// [signals]: struct.Connection.html#method.emit_signal
138/// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html
139/// [`set_max_queued`]: struct.Connection.html#method.set_max_queued
140///
141/// ### Examples
142///
143/// #### Get the session bus ID
144///
145/// ```
146/// # zbus::block_on(async {
147/// use zbus::Connection;
148///
149/// let connection = Connection::session().await?;
150///
151/// let reply_body = connection
152///     .call_method(
153///         Some("org.freedesktop.DBus"),
154///         "/org/freedesktop/DBus",
155///         Some("org.freedesktop.DBus"),
156///         "GetId",
157///         &(),
158///     )
159///     .await?
160///     .body();
161///
162/// let id: &str = reply_body.deserialize()?;
163/// println!("Unique ID of the bus: {}", id);
164/// # Ok::<(), zbus::Error>(())
165/// # }).unwrap();
166/// ```
167///
168/// #### Monitoring all messages
169///
170/// Let's eavesdrop on the session bus 😈 using the [Monitor] interface:
171///
172/// ```rust,no_run
173/// # zbus::block_on(async {
174/// use futures_util::stream::TryStreamExt;
175/// use zbus::{Connection, MessageStream};
176///
177/// let connection = Connection::session().await?;
178///
179/// connection
180///     .call_method(
181///         Some("org.freedesktop.DBus"),
182///         "/org/freedesktop/DBus",
183///         Some("org.freedesktop.DBus.Monitoring"),
184///         "BecomeMonitor",
185///         &(&[] as &[&str], 0u32),
186///     )
187///     .await?;
188///
189/// let mut stream = MessageStream::from(connection);
190/// while let Some(msg) = stream.try_next().await? {
191///     println!("Got message: {}", msg);
192/// }
193///
194/// # Ok::<(), zbus::Error>(())
195/// # }).unwrap();
196/// ```
197///
198/// This should print something like:
199///
200/// ```console
201/// Got message: Signal NameAcquired from org.freedesktop.DBus
202/// Got message: Signal NameLost from org.freedesktop.DBus
203/// Got message: Method call GetConnectionUnixProcessID from :1.1324
204/// Got message: Error org.freedesktop.DBus.Error.NameHasNoOwner:
205///              Could not get PID of name ':1.1332': no such name from org.freedesktop.DBus
206/// Got message: Method call AddMatch from :1.918
207/// Got message: Method return from org.freedesktop.DBus
208/// ```
209///
210/// [Monitor]: https://dbus.freedesktop.org/doc/dbus-specification.html#bus-messages-become-monitor
#[derive(Clone, Debug)]
#[must_use = "Dropping a `Connection` will close the underlying socket."]
pub struct Connection {
    // Shared state. Cloning a `Connection` only clones this `Arc`, never the state itself.
    pub(crate) inner: Arc<ConnectionInner>,
}
216
/// A method call whose completion can be awaited or joined with other streams.
///
/// This is useful for cache population method calls, where joining the [`JoinableStream`] with
/// an update signal stream can be used to ensure that cache updates are not overwritten by a cache
/// population whose task is scheduled later.
#[derive(Debug)]
pub(crate) struct PendingMethodCall {
    // Stream the reply is read from. Reset to `None` once a matching reply has been handed out,
    // after which polling yields `None`.
    stream: Option<MessageStream>,
    // Serial number of the sent method call; incoming messages are matched against it through
    // their reply serial.
    serial: NonZeroU32,
}
227
228impl Future for PendingMethodCall {
229    type Output = Result<Message>;
230
231    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
232        self.poll_before(cx, None).map(|ret| {
233            ret.map(|(_, r)| r).unwrap_or_else(|| {
234                Err(crate::Error::InputOutput(
235                    io::Error::new(ErrorKind::BrokenPipe, "socket closed").into(),
236                ))
237            })
238        })
239    }
240}
241
242impl OrderedFuture for PendingMethodCall {
243    type Output = Result<Message>;
244    type Ordering = zbus::message::Sequence;
245
246    fn poll_before(
247        self: Pin<&mut Self>,
248        cx: &mut Context<'_>,
249        before: Option<&Self::Ordering>,
250    ) -> Poll<Option<(Self::Ordering, Self::Output)>> {
251        let this = self.get_mut();
252        if let Some(stream) = &mut this.stream {
253            loop {
254                match Pin::new(&mut *stream).poll_next_before(cx, before) {
255                    Poll::Ready(PollResult::Item {
256                        data: Ok(msg),
257                        ordering,
258                    }) => {
259                        if msg.header().reply_serial() != Some(this.serial) {
260                            continue;
261                        }
262                        let res = match msg.message_type() {
263                            Type::Error => Err(msg.into()),
264                            Type::MethodReturn => Ok(msg),
265                            _ => continue,
266                        };
267                        this.stream = None;
268                        return Poll::Ready(Some((ordering, res)));
269                    }
270                    Poll::Ready(PollResult::Item {
271                        data: Err(e),
272                        ordering,
273                    }) => {
274                        return Poll::Ready(Some((ordering, Err(e))));
275                    }
276
277                    Poll::Ready(PollResult::NoneBefore) => {
278                        return Poll::Ready(None);
279                    }
280                    Poll::Ready(PollResult::Terminated) => {
281                        return Poll::Ready(None);
282                    }
283                    Poll::Pending => return Poll::Pending,
284                }
285            }
286        }
287        Poll::Ready(None)
288    }
289}
290
291impl Connection {
292    /// Send `msg` to the peer.
293    pub async fn send(&self, msg: &Message) -> Result<()> {
294        #[cfg(unix)]
295        if !msg.data().fds().is_empty() && !self.inner.cap_unix_fd {
296            return Err(Error::Unsupported);
297        }
298
299        self.inner.activity_event.notify(usize::MAX);
300        let mut write = self.inner.socket_write.lock().await;
301
302        write.send_message(msg).await
303    }
304
305    /// Send a method call.
306    ///
307    /// Create a method-call message, send it over the connection, then wait for the reply.
308    ///
309    /// On successful reply, an `Ok(Message)` is returned. On error, an `Err` is returned. D-Bus
310    /// error replies are returned as [`Error::MethodError`].
311    pub async fn call_method<'d, 'p, 'i, 'm, D, P, I, M, B>(
312        &self,
313        destination: Option<D>,
314        path: P,
315        interface: Option<I>,
316        method_name: M,
317        body: &B,
318    ) -> Result<Message>
319    where
320        D: TryInto<BusName<'d>>,
321        P: TryInto<ObjectPath<'p>>,
322        I: TryInto<InterfaceName<'i>>,
323        M: TryInto<MemberName<'m>>,
324        D::Error: Into<Error>,
325        P::Error: Into<Error>,
326        I::Error: Into<Error>,
327        M::Error: Into<Error>,
328        B: serde::ser::Serialize + zvariant::DynamicType,
329    {
330        let method = self
331            .call_method_raw(
332                destination,
333                path,
334                interface,
335                method_name,
336                BitFlags::empty(),
337                body,
338            )
339            .await?
340            .expect("no reply");
341
342        if let Some(tout) = self.method_timeout() {
343            timeout(method, tout).await
344        } else {
345            method.await
346        }
347    }
348
349    /// Send a method call.
350    ///
351    /// Send the given message, which must be a method call, over the connection and return an
352    /// object that allows the reply to be retrieved.  Typically you'd want to use
353    /// [`Connection::call_method`] instead.
354    ///
355    /// If the `flags` do not contain `MethodFlags::NoReplyExpected`, the return value is
356    /// guaranteed to be `Ok(Some(_))`, if there was no error encountered.
357    ///
358    /// INTERNAL NOTE: If this method is ever made pub, flags should become `BitFlags<MethodFlags>`.
359    pub(crate) async fn call_method_raw<'d, 'p, 'i, 'm, D, P, I, M, B>(
360        &self,
361        destination: Option<D>,
362        path: P,
363        interface: Option<I>,
364        method_name: M,
365        flags: BitFlags<Flags>,
366        body: &B,
367    ) -> Result<Option<PendingMethodCall>>
368    where
369        D: TryInto<BusName<'d>>,
370        P: TryInto<ObjectPath<'p>>,
371        I: TryInto<InterfaceName<'i>>,
372        M: TryInto<MemberName<'m>>,
373        D::Error: Into<Error>,
374        P::Error: Into<Error>,
375        I::Error: Into<Error>,
376        M::Error: Into<Error>,
377        B: serde::ser::Serialize + zvariant::DynamicType,
378    {
379        let _permit = acquire_serial_num_semaphore().await;
380
381        let mut builder = Message::method_call(path, method_name)?;
382        if let Some(sender) = self.unique_name() {
383            builder = builder.sender(sender)?
384        }
385        if let Some(destination) = destination {
386            builder = builder.destination(destination)?
387        }
388        if let Some(interface) = interface {
389            builder = builder.interface(interface)?
390        }
391        for flag in flags {
392            builder = builder.with_flags(flag)?;
393        }
394        let msg = builder.build(body)?;
395
396        let msg_receiver = self.inner.method_return_receiver.activate_cloned();
397        let stream = Some(MessageStream::for_subscription_channel(
398            msg_receiver,
399            // This is a lie but we only use the stream internally so it's fine.
400            None,
401            self,
402        ));
403        let serial = msg.primary_header().serial_num();
404        self.send(&msg).await?;
405        if flags.contains(Flags::NoReplyExpected) {
406            Ok(None)
407        } else {
408            Ok(Some(PendingMethodCall { stream, serial }))
409        }
410    }
411
412    /// Emit a signal.
413    ///
414    /// Create a signal message, and send it over the connection.
415    pub async fn emit_signal<'d, 'p, 'i, 'm, D, P, I, M, B>(
416        &self,
417        destination: Option<D>,
418        path: P,
419        interface: I,
420        signal_name: M,
421        body: &B,
422    ) -> Result<()>
423    where
424        D: TryInto<BusName<'d>>,
425        P: TryInto<ObjectPath<'p>>,
426        I: TryInto<InterfaceName<'i>>,
427        M: TryInto<MemberName<'m>>,
428        D::Error: Into<Error>,
429        P::Error: Into<Error>,
430        I::Error: Into<Error>,
431        M::Error: Into<Error>,
432        B: serde::ser::Serialize + zvariant::DynamicType,
433    {
434        let _permit = acquire_serial_num_semaphore().await;
435
436        let mut b = Message::signal(path, interface, signal_name)?;
437        if let Some(sender) = self.unique_name() {
438            b = b.sender(sender)?;
439        }
440        if let Some(destination) = destination {
441            b = b.destination(destination)?;
442        }
443        let m = b.build(body)?;
444
445        self.send(&m).await
446    }
447
448    /// Reply to a message.
449    ///
450    /// Given an existing message (likely a method call), send a reply back to the caller with the
451    /// given `body`.
452    pub async fn reply<B>(&self, call: &zbus::message::Header<'_>, body: &B) -> Result<()>
453    where
454        B: serde::ser::Serialize + zvariant::DynamicType,
455    {
456        let _permit = acquire_serial_num_semaphore().await;
457
458        let mut b = Message::method_return(call)?;
459        if let Some(sender) = self.unique_name() {
460            b = b.sender(sender)?;
461        }
462        let m = b.build(body)?;
463        self.send(&m).await
464    }
465
466    /// Reply an error to a message.
467    ///
468    /// Given an existing message (likely a method call), send an error reply back to the caller
469    /// with the given `error_name` and `body`.
470    pub async fn reply_error<'e, E, B>(
471        &self,
472        call: &zbus::message::Header<'_>,
473        error_name: E,
474        body: &B,
475    ) -> Result<()>
476    where
477        B: serde::ser::Serialize + zvariant::DynamicType,
478        E: TryInto<ErrorName<'e>>,
479        E::Error: Into<Error>,
480    {
481        let _permit = acquire_serial_num_semaphore().await;
482
483        let mut b = Message::error(call, error_name)?;
484        if let Some(sender) = self.unique_name() {
485            b = b.sender(sender)?;
486        }
487        let m = b.build(body)?;
488        self.send(&m).await
489    }
490
491    /// Reply an error to a message.
492    ///
493    /// Given an existing message (likely a method call), send an error reply back to the caller
494    /// using one of the standard interface reply types.
495    pub async fn reply_dbus_error(
496        &self,
497        call: &zbus::message::Header<'_>,
498        err: impl DBusError,
499    ) -> Result<()> {
500        let _permit = acquire_serial_num_semaphore().await;
501
502        let m = err.create_reply(call)?;
503        self.send(&m).await
504    }
505
506    /// Register a well-known name for this connection.
507    ///
508    /// When connecting to a bus, the name is requested from the bus. In case of p2p connection, the
509    /// name (if requested) is used for self-identification.
510    ///
511    /// You can request multiple names for the same connection. Use [`Connection::release_name`] for
512    /// deregistering names registered through this method.
513    ///
514    /// Note that exclusive ownership without queueing is requested (using
515    /// [`RequestNameFlags::ReplaceExisting`] and [`RequestNameFlags::DoNotQueue`] flags) since that
516    /// is the most typical case. If that is not what you want, you should use
517    /// [`Connection::request_name_with_flags`] instead (but make sure then that name is requested
518    /// **after** you've set up your service implementation with the `ObjectServer`).
519    ///
520    /// # Caveats
521    ///
522    /// The associated `ObjectServer` will only handle method calls destined for the unique name of
523    /// this connection or any of the registered well-known names. If no well-known name is
524    /// registered, the method calls destined to all well-known names will be handled.
525    ///
526    /// Since names registered through any other means than `Connection` or [`Builder`]
527    /// API are not known to the connection, method calls destined to those names will only be
528    /// handled by the associated `ObjectServer` if none of the names are registered through
529    /// `Connection*` API. Simply put, either register all the names through `Connection*` API or
530    /// none of them.
531    ///
532    /// # Errors
533    ///
534    /// Fails with `zbus::Error::NameTaken` if the name is already owned by another peer.
535    pub async fn request_name<'w, W>(&self, well_known_name: W) -> Result<()>
536    where
537        W: TryInto<WellKnownName<'w>>,
538        W::Error: Into<Error>,
539    {
540        self.request_name_with_flags(well_known_name, BitFlags::default())
541            .await
542            .map(|_| ())
543    }
544
    /// Register a well-known name for this connection.
    ///
    /// This is the same as [`Connection::request_name`] but allows you to specify the flags to use
    /// when requesting the name.
    ///
    /// If the [`RequestNameFlags::DoNotQueue`] flag is not specified and the request ends up in the
    /// queue, you can use [`crate::fdo::NameAcquiredStream`] to be notified when the name is
    /// acquired. A queued name request can be cancelled using [`Connection::release_name`].
    ///
    /// If the [`RequestNameFlags::AllowReplacement`] flag is specified, the requested name can be
    /// lost if another peer requests the same name. You can use [`crate::fdo::NameLostStream`] to
    /// be notified when the name is lost.
    ///
    /// If the name is already tracked by this connection, `AlreadyOwner` or `InQueue` is returned
    /// right away without contacting the bus. On a non-bus (p2p) connection the name is simply
    /// recorded and `PrimaryOwner` is returned.
    ///
    /// # Example
    ///
    /// ```
    /// #
    /// # zbus::block_on(async {
    /// use zbus::{Connection, fdo::{DBusProxy, RequestNameFlags, RequestNameReply}};
    /// use enumflags2::BitFlags;
    /// use futures_util::stream::StreamExt;
    ///
    /// let name = "org.freedesktop.zbus.QueuedNameTest";
    /// let conn1 = Connection::session().await?;
    /// // This should just work right away.
    /// conn1.request_name_with_flags(name, RequestNameFlags::DoNotQueue.into()).await?;
    ///
    /// let conn2 = Connection::session().await?;
    /// // A second request from another connection will fail with `DoNotQueue` flag, which is
    /// // implicit with `request_name` method.
    /// assert!(conn2.request_name(name).await.is_err());
    ///
    /// // Now let's try w/o `DoNotQueue` and we should be queued.
    /// let reply = conn2
    ///     .request_name_with_flags(name, RequestNameFlags::AllowReplacement.into())
    ///     .await?;
    /// assert_eq!(reply, RequestNameReply::InQueue);
    /// // Another request should just give us the same response.
    /// let reply = conn2
    ///     // The flags on subsequent requests will however be ignored.
    ///     .request_name_with_flags(name, BitFlags::empty())
    ///     .await?;
    /// assert_eq!(reply, RequestNameReply::InQueue);
    /// let mut acquired_stream = DBusProxy::new(&conn2)
    ///     .await?
    ///     .receive_name_acquired()
    ///     .await?;
    /// assert!(conn1.release_name(name).await?);
    /// // This would have waited forever if `conn1` hadn't just released the name.
    /// let acquired = acquired_stream.next().await.unwrap();
    /// assert_eq!(acquired.args().unwrap().name, name);
    ///
    /// // conn2 made the mistake of being too nice and allowed name replacement, so conn1 should be
    /// // able to take it back.
    /// let mut lost_stream = DBusProxy::new(&conn2)
    ///     .await?
    ///     .receive_name_lost()
    ///     .await?;
    /// conn1.request_name(name).await?;
    /// let lost = lost_stream.next().await.unwrap();
    /// assert_eq!(lost.args().unwrap().name, name);
    ///
    /// # Ok::<(), zbus::Error>(())
    /// # }).unwrap();
    /// ```
    ///
    /// # Caveats
    ///
    /// * Same as that of [`Connection::request_name`].
    /// * If you wish to track changes to name ownership after this call, make sure that the
    ///   [`crate::fdo::NameAcquiredStream`] and/or [`crate::fdo::NameLostStream`] instance(s) are
    ///   created **before** calling this method. Otherwise, you may lose the signal if it's emitted
    ///   after this call but just before the stream instance gets created.
    ///
    /// # Errors
    ///
    /// Fails with `zbus::Error::NameTaken` if the bus replies with [`RequestNameReply::Exists`].
    /// Errors from converting the name, subscribing to the name signals or calling the bus are
    /// passed on.
    pub async fn request_name_with_flags<'w, W>(
        &self,
        well_known_name: W,
        flags: BitFlags<RequestNameFlags>,
    ) -> Result<RequestNameReply>
    where
        W: TryInto<WellKnownName<'w>>,
        W::Error: Into<Error>,
    {
        let well_known_name = well_known_name.try_into().map_err(Into::into)?;

        // Warn if requesting a name before setting up the object server, as this can cause
        // method calls to be lost.
        if self.is_bus() && self.inner.object_server.get().is_none() {
            warn!(
                "Requesting name `{well_known_name}` before setting up the object server. \
                Method calls arriving before interfaces are registered may be lost. \
                Consider using `connection::Builder::serve_at()` and `::name()` instead.",
            );
        }
        // We keep the lock until the end of this function so that the (possibly) spawned task
        // doesn't end up accessing the name entry before it's inserted.
        let mut names = self.inner.registered_names.lock().await;

        // A name we already track is answered locally, without asking the bus again.
        match names.get(&well_known_name) {
            Some(NameStatus::Owner(_)) => return Ok(RequestNameReply::AlreadyOwner),
            Some(NameStatus::Queued(_)) => return Ok(RequestNameReply::InQueue),
            None => (),
        }

        // Without a bus there is nobody to ask: just record the name as owned.
        if !self.is_bus() {
            names.insert(well_known_name.to_owned(), NameStatus::Owner(None));

            return Ok(RequestNameReply::PrimaryOwner);
        }

        // Subscribe to both name signals before issuing the request.
        let acquired_match_rule = MatchRule::fdo_signal_builder("NameAcquired")
            .arg(0, well_known_name.as_ref())
            .unwrap()
            .build();
        let mut acquired_stream = self.add_match(acquired_match_rule.into(), None).await?;
        let lost_match_rule = MatchRule::fdo_signal_builder("NameLost")
            .arg(0, well_known_name.as_ref())
            .unwrap()
            .build();
        let mut lost_stream = self.add_match(lost_match_rule.into(), None).await?;
        let reply = self
            .call_method(
                Some("org.freedesktop.DBus"),
                "/org/freedesktop/DBus",
                Some("org.freedesktop.DBus"),
                "RequestName",
                &(well_known_name.clone(), flags),
            )
            .await?
            .body()
            .deserialize::<RequestNameReply>()?;
        let lost_task_name = format!("monitor_name_lost{{name={well_known_name}}}");
        let lost_task_name_span = info_span!("monitor_name_lost", name = %well_known_name);
        // The name can only be taken away from us if replacement is allowed, so the `NameLost`
        // monitor is only prepared in that case. It is spawned once the name is actually owned.
        let name_lost_fut = if flags.contains(RequestNameFlags::AllowReplacement) {
            let weak_conn = WeakConnection::from(self);
            let well_known_name = well_known_name.to_owned();
            Some(
                async move {
                    loop {
                        let signal = lost_stream.next().await;
                        // Stop monitoring once the connection itself is gone.
                        let inner = match weak_conn.upgrade() {
                            Some(conn) => conn.inner.clone(),
                            None => break,
                        };

                        match signal {
                            Some(signal) => match signal {
                                Ok(_) => {
                                    tracing::info!(
                                        "Connection `{}` lost name `{}`",
                                        // SAFETY: This is a bus connection so the unique name
                                        // can't be `None`.
                                        inner.unique_name.get().unwrap(),
                                        well_known_name
                                    );
                                    inner.registered_names.lock().await.remove(&well_known_name);

                                    break;
                                }
                                Err(e) => warn!("Failed to parse `NameLost` signal: {}", e),
                            },
                            None => {
                                trace!("`NameLost` signal stream closed");
                                // This is a very strange state we end up in. Now the name in
                                // question remains in the queue
                                // forever. Maybe we can do better here but I
                                // think it's a very unlikely scenario anyway.
                                //
                                // Can happen if the connection is lost/dropped but then the whole
                                // `Connection` instance will go away soon anyway and hence this
                                // strange state along with it.
                                break;
                            }
                        }
                    }
                }
                .instrument(lost_task_name_span),
            )
        } else {
            None
        };
        let status = match reply {
            RequestNameReply::InQueue => {
                // We are queued: spawn a task that waits for `NameAcquired` and then promotes the
                // entry from `Queued` to `Owner`.
                let weak_conn = WeakConnection::from(self);
                let well_known_name = well_known_name.to_owned();
                let task_name = format!("monitor_name_acquired{{name={well_known_name}}}");
                let task_name_span = info_span!("monitor_name_acquired", name = %well_known_name);
                let task = self.executor().spawn(
                    async move {
                        loop {
                            let signal = acquired_stream.next().await;
                            let inner = match weak_conn.upgrade() {
                                Some(conn) => conn.inner.clone(),
                                None => break,
                            };
                            match signal {
                                Some(signal) => match signal {
                                    Ok(_) => {
                                        let mut names = inner.registered_names.lock().await;
                                        if let Some(status) = names.get_mut(&well_known_name) {
                                            let task = name_lost_fut.map(|fut| {
                                                inner.executor.spawn(fut, &lost_task_name)
                                            });
                                            *status = NameStatus::Owner(task);

                                            break;
                                        }
                                        // else the name was released in the meantime. :shrug:
                                    }
                                    Err(e) => warn!("Failed to parse `NameAcquired` signal: {}", e),
                                },
                                None => {
                                    trace!("`NameAcquired` signal stream closed");
                                    // See comment above for similar state in case of `NameLost`
                                    // stream.
                                    break;
                                }
                            }
                        }
                    }
                    .instrument(task_name_span),
                    &task_name,
                );

                NameStatus::Queued(task)
            }
            RequestNameReply::PrimaryOwner | RequestNameReply::AlreadyOwner => {
                // We own the name right away, so the `NameLost` monitor (if any) starts now.
                let task = name_lost_fut.map(|fut| self.executor().spawn(fut, &lost_task_name));

                NameStatus::Owner(task)
            }
            RequestNameReply::Exists => return Err(Error::NameTaken),
        };

        names.insert(well_known_name.to_owned(), status);

        Ok(reply)
    }
782
783    /// Deregister a previously registered well-known name for this service on the bus.
784    ///
785    /// Use this method to deregister a well-known name, registered through
786    /// [`Connection::request_name`].
787    ///
788    /// Unless an error is encountered, returns `Ok(true)` if name was previously registered with
789    /// the bus through `self` and it has now been successfully deregistered, `Ok(false)` if name
790    /// was not previously registered or already deregistered.
791    pub async fn release_name<'w, W>(&self, well_known_name: W) -> Result<bool>
792    where
793        W: TryInto<WellKnownName<'w>>,
794        W::Error: Into<Error>,
795    {
796        let well_known_name: WellKnownName<'w> = well_known_name.try_into().map_err(Into::into)?;
797        let mut names = self.inner.registered_names.lock().await;
798        // FIXME: Should be possible to avoid cloning/allocation here
799        if names.remove(&well_known_name.to_owned()).is_none() {
800            return Ok(false);
801        };
802
803        if !self.is_bus() {
804            return Ok(true);
805        }
806
807        self.call_method(
808            Some("org.freedesktop.DBus"),
809            "/org/freedesktop/DBus",
810            Some("org.freedesktop.DBus"),
811            "ReleaseName",
812            &well_known_name,
813        )
814        .await?
815        .body()
816        .deserialize::<ReleaseNameReply>()
817        .map(|r| r == ReleaseNameReply::Released)
818    }
819
820    /// Check if `self` is a connection to a message bus.
821    ///
822    /// This will return `false` for p2p connections. When the `p2p` feature is disabled, this will
823    /// always return `true`.
824    pub fn is_bus(&self) -> bool {
825        #[cfg(feature = "p2p")]
826        {
827            self.inner.bus_conn
828        }
829        #[cfg(not(feature = "p2p"))]
830        {
831            true
832        }
833    }
834
835    /// The unique name of the connection, if set/applicable.
836    ///
837    /// The unique name is assigned by the message bus, or set manually using
838    /// [`Connection::set_unique_name`].
839    pub fn unique_name(&self) -> Option<&OwnedUniqueName> {
840        self.inner.unique_name.get()
841    }
842
/// Install the unique name of the connection, provided none is set yet.
///
/// This exists mainly for bus implementations, which is why it is gated behind the `bus-impl`
/// feature. Regular users should never need it.
///
/// # Errors
///
/// Returns an error if `unique_name` cannot be converted into an [`OwnedUniqueName`].
///
/// # Panics
///
/// This method panics if the unique name is already set. It will always panic if the connection
/// is to a message bus as it's the bus that assigns peers their unique names.
#[cfg(feature = "bus-impl")]
pub fn set_unique_name<U>(&self, unique_name: U) -> Result<()>
where
    U: TryInto<OwnedUniqueName>,
    U::Error: Into<Error>,
{
    unique_name
        .try_into()
        .map(|name: OwnedUniqueName| self.set_unique_name_(name))
        .map_err(Into::into)
}
863
864    /// The capacity of the main (unfiltered) queue.
865    pub fn max_queued(&self) -> usize {
866        self.inner.msg_receiver.capacity()
867    }
868
869    /// Set the capacity of the main (unfiltered) queue.
870    pub fn set_max_queued(&mut self, max: usize) {
871        self.inner.msg_receiver.clone().set_capacity(max);
872    }
873
874    /// The server's GUID.
875    pub fn server_guid(&self) -> &OwnedGuid {
876        &self.inner.server_guid
877    }
878
879    /// The underlying executor.
880    ///
881    /// When a connection is built with internal_executor set to false, zbus will not spawn a
882    /// thread to run the executor. You're responsible to continuously [tick the executor][tte].
883    /// Failure to do so will result in hangs.
884    ///
885    /// # Examples
886    ///
887    /// Here is how one would typically run the zbus executor through tokio's scheduler:
888    ///
889    /// ```
890    /// use zbus::connection::Builder;
891    /// use tokio::task::spawn;
892    ///
893    /// # struct SomeIface;
894    /// #
895    /// # #[zbus::interface]
896    /// # impl SomeIface {
897    /// # }
898    /// #
899    /// #[tokio::main]
900    /// async fn main() {
901    ///     let conn = Builder::session()
902    ///         .unwrap()
903    ///         .internal_executor(false)
904    /// #         // This is only for testing a deadlock that used to happen with this combo.
905    /// #         .serve_at("/some/iface", SomeIface)
906    /// #         .unwrap()
907    ///         .build()
908    ///         .await
909    ///         .unwrap();
910    ///     {
911    ///        let conn = conn.clone();
912    ///        spawn(async move {
913    ///            loop {
914    ///                conn.executor().tick().await;
915    ///            }
916    ///        });
917    ///     }
918    ///
919    ///     // All your other async code goes here.
920    /// }
921    /// ```
922    ///
923    /// **Note**: zbus 2.1 added support for tight integration with tokio. This means, if you use
924    /// zbus with tokio, you do not need to worry about this at all. All you need to do is enable
925    /// `tokio` feature. You should also disable the (default) `async-io` feature in your
926    /// `Cargo.toml` to avoid unused dependencies. Also note that **prior** to zbus 3.0, disabling
927    /// `async-io` was required to enable tight `tokio` integration.
928    ///
929    /// [tte]: https://docs.rs/async-executor/1.4.1/async_executor/struct.Executor.html#method.tick
930    pub fn executor(&self) -> &Executor<'static> {
931        &self.inner.executor
932    }
933
934    /// Get a reference to the associated [`ObjectServer`].
935    ///
936    /// The `ObjectServer` is created on-demand.
937    ///
938    /// **Note**: Once the `ObjectServer` is created, it will be replying to all method calls
939    /// received on `self`. If you want to manually reply to method calls, do not use this
940    /// method (or any of the `ObjectServer` related API).
941    pub fn object_server(&self) -> &ObjectServer {
942        self.ensure_object_server(true)
943    }
944
945    pub(crate) fn ensure_object_server(&self, start: bool) -> &ObjectServer {
946        self.inner
947            .object_server
948            .get_or_init(move || self.setup_object_server(start, None))
949    }
950
951    fn setup_object_server(&self, start: bool, started_event: Option<Event>) -> ObjectServer {
952        if start {
953            self.start_object_server(started_event);
954        }
955
956        ObjectServer::new(self)
957    }
958
959    #[instrument(skip(self))]
960    pub(crate) fn start_object_server(&self, started_event: Option<Event>) {
961        self.inner.object_server_dispatch_task.get_or_init(|| {
962            trace!("starting ObjectServer task");
963            let weak_conn = WeakConnection::from(self);
964
965            self.inner.executor.spawn(
966                async move {
967                    let mut stream = match weak_conn.upgrade() {
968                        Some(conn) => {
969                            let mut builder = MatchRule::builder().msg_type(Type::MethodCall);
970                            if let Some(unique_name) = conn.unique_name() {
971                                builder = builder.destination(&**unique_name).expect("unique name");
972                            }
973                            let rule = builder.build();
974                            match conn.add_match(rule.into(), None).await {
975                                Ok(stream) => stream,
976                                Err(e) => {
977                                    // Very unlikely but can happen I guess if connection is closed.
978                                    debug!("Failed to create message stream: {}", e);
979
980                                    return;
981                                }
982                            }
983                        }
984                        None => {
985                            trace!("Connection is gone, stopping associated object server task");
986
987                            return;
988                        }
989                    };
990                    if let Some(started_event) = started_event {
991                        started_event.notify(1);
992                    }
993
994                    trace!("waiting for incoming method call messages..");
995                    while let Some(msg) = stream.next().await.and_then(|m| {
996                        if let Err(e) = &m {
997                            debug!("Error while reading from object server stream: {:?}", e);
998                        }
999                        m.ok()
1000                    }) {
1001                        if let Some(conn) = weak_conn.upgrade() {
1002                            let hdr = msg.header();
1003                            // If we're connected to a bus, skip the destination check as the
1004                            // server will only send us method calls destined to us.
1005                            if !conn.is_bus() {
1006                                match hdr.destination() {
1007                                    // Unique name is already checked by the match rule.
1008                                    Some(BusName::Unique(_)) | None => (),
1009                                    Some(BusName::WellKnown(dest)) => {
1010                                        let names = conn.inner.registered_names.lock().await;
1011                                        // destination doesn't matter if no name has been registered
1012                                        // (probably means the name is registered through external
1013                                        // means).
1014                                        if !names.is_empty() && !names.contains_key(dest) {
1015                                            trace!(
1016                                                "Got a method call for a different destination: {}",
1017                                                dest
1018                                            );
1019
1020                                            continue;
1021                                        }
1022                                    }
1023                                }
1024                            }
1025                            let server = conn.object_server();
1026                            if let Err(e) = server.dispatch_call(&msg, &hdr).await {
1027                                debug!(
1028                                    "Error dispatching message. Message: {:?}, error: {:?}",
1029                                    msg, e
1030                                );
1031                            }
1032                        } else {
1033                            // If connection is completely gone, no reason to keep running the task
1034                            // anymore.
1035                            trace!("Connection is gone, stopping associated object server task");
1036                            break;
1037                        }
1038                    }
1039                }
1040                .instrument(info_span!("obj_server_task")),
1041                "obj_server_task",
1042            )
1043        });
1044    }
1045
1046    pub(crate) async fn add_match(
1047        &self,
1048        rule: OwnedMatchRule,
1049        max_queued: Option<usize>,
1050    ) -> Result<Receiver<Result<Message>>> {
1051        use std::collections::hash_map::Entry;
1052
1053        if self.inner.msg_senders.lock().await.is_empty() {
1054            // This only happens if socket reader task has errored out.
1055            return Err(Error::InputOutput(Arc::new(io::Error::new(
1056                io::ErrorKind::BrokenPipe,
1057                "Socket reader task has errored out",
1058            ))));
1059        }
1060
1061        let mut subscriptions = self.inner.subscriptions.lock().await;
1062        let msg_type = rule.msg_type().unwrap_or(Type::Signal);
1063        match subscriptions.entry(rule.clone()) {
1064            Entry::Vacant(e) => {
1065                let max_queued = max_queued.unwrap_or(DEFAULT_MAX_QUEUED);
1066                let (sender, mut receiver) = broadcast(max_queued);
1067                receiver.set_await_active(false);
1068                if self.is_bus() && msg_type == Type::Signal {
1069                    self.call_method(
1070                        Some("org.freedesktop.DBus"),
1071                        "/org/freedesktop/DBus",
1072                        Some("org.freedesktop.DBus"),
1073                        "AddMatch",
1074                        &e.key(),
1075                    )
1076                    .await?;
1077                }
1078                e.insert((1, receiver.clone().deactivate()));
1079                self.inner
1080                    .msg_senders
1081                    .lock()
1082                    .await
1083                    .insert(Some(rule), sender);
1084
1085                Ok(receiver)
1086            }
1087            Entry::Occupied(mut e) => {
1088                let (num_subscriptions, receiver) = e.get_mut();
1089                *num_subscriptions += 1;
1090                if let Some(max_queued) = max_queued {
1091                    if max_queued > receiver.capacity() {
1092                        receiver.set_capacity(max_queued);
1093                    }
1094                }
1095
1096                Ok(receiver.activate_cloned())
1097            }
1098        }
1099    }
1100
1101    pub(crate) async fn remove_match(&self, rule: OwnedMatchRule) -> Result<bool> {
1102        use std::collections::hash_map::Entry;
1103        let mut subscriptions = self.inner.subscriptions.lock().await;
1104        // TODO when it becomes stable, use HashMap::raw_entry and only require expr: &str
1105        // (both here and in add_match)
1106        let msg_type = rule.msg_type().unwrap_or(Type::Signal);
1107        match subscriptions.entry(rule) {
1108            Entry::Vacant(_) => Ok(false),
1109            Entry::Occupied(mut e) => {
1110                let rule = e.key().inner().clone();
1111                e.get_mut().0 -= 1;
1112                if e.get().0 == 0 {
1113                    if self.is_bus() && msg_type == Type::Signal {
1114                        self.call_method(
1115                            Some("org.freedesktop.DBus"),
1116                            "/org/freedesktop/DBus",
1117                            Some("org.freedesktop.DBus"),
1118                            "RemoveMatch",
1119                            &rule,
1120                        )
1121                        .await?;
1122                    }
1123                    e.remove();
1124                    self.inner
1125                        .msg_senders
1126                        .lock()
1127                        .await
1128                        .remove(&Some(rule.into()));
1129                }
1130                Ok(true)
1131            }
1132        }
1133    }
1134
1135    pub(crate) fn queue_remove_match(&self, rule: OwnedMatchRule) {
1136        let conn = self.clone();
1137        let task_name = format!("Remove match `{}`", *rule);
1138        let remove_match =
1139            async move { conn.remove_match(rule).await }.instrument(trace_span!("{}", task_name));
1140        self.inner.executor.spawn(remove_match, &task_name).detach()
1141    }
1142
1143    /// The method_timeout (if any). See [Builder::method_timeout] for details.
1144    pub fn method_timeout(&self) -> Option<Duration> {
1145        self.inner.method_timeout
1146    }
1147
1148    pub(crate) async fn new(
1149        auth: Authenticated,
1150        #[allow(unused)] bus_connection: bool,
1151        executor: Executor<'static>,
1152        method_timeout: Option<Duration>,
1153    ) -> Result<Self> {
1154        #[cfg(unix)]
1155        let cap_unix_fd = auth.cap_unix_fd;
1156
1157        macro_rules! create_msg_broadcast_channel {
1158            ($size:expr) => {{
1159                let (msg_sender, msg_receiver) = broadcast($size);
1160                let mut msg_receiver = msg_receiver.deactivate();
1161                msg_receiver.set_await_active(false);
1162
1163                (msg_sender, msg_receiver)
1164            }};
1165        }
1166        // The unfiltered message channel.
1167        let (msg_sender, msg_receiver) = create_msg_broadcast_channel!(DEFAULT_MAX_QUEUED);
1168        let mut msg_senders = HashMap::new();
1169        msg_senders.insert(None, msg_sender);
1170
1171        // The special method return & error channel.
1172        let (method_return_sender, method_return_receiver) =
1173            create_msg_broadcast_channel!(DEFAULT_MAX_METHOD_RETURN_QUEUED);
1174        let rule = MatchRule::builder()
1175            .msg_type(Type::MethodReturn)
1176            .build()
1177            .into();
1178        msg_senders.insert(Some(rule), method_return_sender.clone());
1179        let rule = MatchRule::builder().msg_type(Type::Error).build().into();
1180        msg_senders.insert(Some(rule), method_return_sender);
1181        let msg_senders = Arc::new(Mutex::new(msg_senders));
1182        let subscriptions = Mutex::new(HashMap::new());
1183
1184        let connection = Self {
1185            inner: Arc::new(ConnectionInner {
1186                activity_event: Arc::new(Event::new()),
1187                socket_write: Mutex::new(auth.socket_write),
1188                server_guid: auth.server_guid,
1189                #[cfg(unix)]
1190                cap_unix_fd,
1191                #[cfg(feature = "p2p")]
1192                bus_conn: bus_connection,
1193                unique_name: OnceLock::new(),
1194                subscriptions,
1195                object_server: OnceLock::new(),
1196                object_server_dispatch_task: OnceLock::new(),
1197                executor,
1198                socket_reader_task: OnceLock::new(),
1199                msg_senders,
1200                msg_receiver,
1201                method_return_receiver,
1202                registered_names: Mutex::new(HashMap::new()),
1203                drop_event: Event::new(),
1204                method_timeout,
1205                credentials: OnceLock::new(),
1206            }),
1207        };
1208
1209        if let Some(unique_name) = auth.unique_name {
1210            connection.set_unique_name_(unique_name);
1211        }
1212
1213        Ok(connection)
1214    }
1215
1216    /// Create a `Connection` to the session/user message bus.
1217    pub async fn session() -> Result<Self> {
1218        Builder::session()?.build().await
1219    }
1220
1221    /// Create a `Connection` to the system-wide message bus.
1222    pub async fn system() -> Result<Self> {
1223        Builder::system()?.build().await
1224    }
1225
1226    /// Return a listener, notified on various connection activity.
1227    ///
1228    /// This function is meant for the caller to implement idle or timeout on inactivity.
1229    pub fn monitor_activity(&self) -> EventListener {
1230        self.inner.activity_event.listen()
1231    }
1232
1233    /// Return the peer credentials.
1234    ///
1235    /// The fields are populated on the best effort basis. Some or all fields may not even make
1236    /// sense for certain sockets or on certain platforms and hence will be set to `None`.
1237    ///
1238    /// This method caches the credentials on the first call for you.
1239    ///
1240    /// # Caveats
1241    ///
1242    /// Currently `linux_security_label` field is not populated.
1243    pub async fn peer_creds(&self) -> io::Result<&Arc<ConnectionCredentials>> {
1244        let mut socket_write = self.inner.socket_write.lock().await;
1245
1246        // Keeping the `socket_write` lock guard ensures that this isn't racy.
1247        if let Some(creds) = self.inner.credentials.get() {
1248            return Ok(creds);
1249        }
1250
1251        self.inner
1252            .credentials
1253            .set(socket_write.peer_credentials().await.map(Arc::new)?)
1254            .expect("credentials cache set more than once");
1255
1256        Ok(self
1257            .inner
1258            .credentials
1259            .get()
1260            .expect("credentials should have been set"))
1261    }
1262
1263    /// Return the peer credentials.
1264    ///
1265    /// The fields are populated on the best effort basis. Some or all fields may not even make
1266    /// sense for certain sockets or on certain platforms and hence will be set to `None`.
1267    ///
1268    /// # Caveats
1269    ///
1270    /// Currently `linux_security_label` field is not populated.
1271    #[deprecated(since = "5.13.0", note = "Use `peer_creds` instead")]
1272    pub async fn peer_credentials(&self) -> io::Result<ConnectionCredentials> {
1273        self.inner
1274            .socket_write
1275            .lock()
1276            .await
1277            .peer_credentials()
1278            .await
1279    }
1280
1281    /// Close the connection.
1282    ///
1283    /// After this call, all reading and writing operations will fail.
1284    pub async fn close(self) -> Result<()> {
1285        self.inner.activity_event.notify(usize::MAX);
1286        self.inner
1287            .socket_write
1288            .lock()
1289            .await
1290            .close()
1291            .await
1292            .map_err(Into::into)
1293    }
1294
1295    /// Gracefully close the connection, waiting for all other references to be dropped.
1296    ///
1297    /// This will not disrupt any incoming or outgoing method calls, and will await their
1298    /// completion.
1299    ///
1300    /// # Caveats
1301    ///
1302    /// * This will not prevent new incoming messages from keeping the connection alive (and
1303    ///   indefinitely delaying this method's completion).
1304    ///
1305    /// * The shutdown will not complete until the underlying connection is fully dropped, so beware
1306    ///   of deadlocks if you are holding any other clones of this `Connection`.
1307    ///
1308    /// # Example
1309    ///
1310    /// ```
1311    /// # use std::error::Error;
1312    /// # use zbus::connection::Builder;
1313    /// # use zbus::interface;
1314    /// #
1315    /// # struct MyInterface;
1316    /// #
1317    /// # #[interface(name = "foo.bar.baz")]
1318    /// # impl MyInterface {
1319    /// #     async fn do_thing(&self) {}
1320    /// # }
1321    /// #
1322    /// # #[tokio::main]
1323    /// # async fn main() -> Result<(), Box<dyn Error>> {
1324    /// let conn = Builder::session()?
1325    ///     .name("foo.bar.baz")?
1326    ///     .serve_at("/foo/bar/baz", MyInterface)?
1327    ///     .build()
1328    ///     .await?;
1329    ///
1330    /// # let some_exit_condition = std::future::ready(());
1331    /// some_exit_condition.await;
1332    ///
1333    /// conn.graceful_shutdown().await;
1334    /// #
1335    /// # Ok(())
1336    /// # }
1337    /// ```
1338    pub async fn graceful_shutdown(self) {
1339        let listener = self.inner.drop_event.listen();
1340        drop(self);
1341        listener.await;
1342    }
1343
1344    pub(crate) fn init_socket_reader(
1345        &self,
1346        socket_read: Box<dyn socket::ReadHalf>,
1347        already_read: Vec<u8>,
1348        #[cfg(unix)] already_received_fds: Vec<std::os::fd::OwnedFd>,
1349    ) {
1350        let inner = &self.inner;
1351        inner
1352            .socket_reader_task
1353            .set(
1354                SocketReader::new(
1355                    socket_read,
1356                    inner.msg_senders.clone(),
1357                    already_read,
1358                    #[cfg(unix)]
1359                    already_received_fds,
1360                    inner.activity_event.clone(),
1361                )
1362                .spawn(&inner.executor),
1363            )
1364            .expect("Attempted to set `socket_reader_task` twice");
1365    }
1366
1367    fn set_unique_name_(&self, name: OwnedUniqueName) {
1368        self.inner
1369            .unique_name
1370            .set(name)
1371            // programmer (probably our) error if this fails.
1372            .expect("unique name already set");
1373    }
1374}
1375
/// Unwrap a blocking connection into the async connection it is built on.
#[cfg(feature = "blocking-api")]
impl From<crate::blocking::Connection> for Connection {
    fn from(blocking: crate::blocking::Connection) -> Self {
        blocking.into_inner()
    }
}
1382
// Internal API that allows keeping a weak connection ref around.
//
// Only a `Weak` pointer to the shared connection state is held, so the state may be gone by
// the time the handle is used; `upgrade` reports that by returning `None`.
#[derive(Debug, Clone)]
pub(crate) struct WeakConnection {
    // Non-owning pointer to the state shared by all `Connection` clones.
    inner: Weak<ConnectionInner>,
}
1388
1389impl WeakConnection {
1390    /// Upgrade to a Connection.
1391    pub fn upgrade(&self) -> Option<Connection> {
1392        self.inner.upgrade().map(|inner| Connection { inner })
1393    }
1394}
1395
1396impl From<&Connection> for WeakConnection {
1397    fn from(conn: &Connection) -> Self {
1398        Self {
1399            inner: Arc::downgrade(&conn.inner),
1400        }
1401    }
1402}
1403
// Bookkeeping entry for a well-known name requested through this connection.
//
// The tasks are stored but never read (hence the `allow(unused)`); presumably they are kept
// only so that they live exactly as long as the entry does. TODO confirm against `Task`'s drop
// behavior.
#[derive(Debug)]
enum NameStatus {
    // The task waits for name lost signal if owner allows replacement.
    Owner(#[allow(unused)] Option<Task<()>>),
    // The task waits for name acquisition signal.
    Queued(#[allow(unused)] Task<()>),
}
1411
1412static SERIAL_NUM_SEMAPHORE: Semaphore = Semaphore::new(1);
1413
1414// Make message creation and sending an atomic operation, using an async
1415// semaphore if flatpak portal is detected to workaround an xdg-dbus-proxy issue:
1416//
1417// https://github.com/flatpak/xdg-dbus-proxy/issues/46
1418async fn acquire_serial_num_semaphore() -> Option<SemaphorePermit<'static>> {
1419    if is_flatpak() {
1420        Some(SERIAL_NUM_SEMAPHORE.acquire().await)
1421    } else {
1422        None
1423    }
1424}
1425
#[cfg(test)]
mod tests {
    use super::*;
    use crate::fdo::DBusProxy;
    use ntest::timeout;
    use std::{pin::pin, time::Duration};
    use test_log::test;

    // Windows only: the autolaunched session bus must be discoverable and accept connections.
    #[cfg(windows)]
    #[test]
    fn connect_autolaunch_session_bus() {
        let address =
            crate::win32::autolaunch_bus_address().expect("Unable to get session bus address");
        let outcome = crate::block_on(async { address.connect().await });

        outcome.expect("Unable to connect to session bus");
    }

    // macOS only: the session bus advertised through launchd must accept connections.
    #[cfg(target_os = "macos")]
    #[test]
    fn connect_launchd_session_bus() {
        use crate::address::{Address, Transport, transport::Launchd};

        let connecting = async {
            let launchd = Launchd::new("DBUS_LAUNCHD_SESSION_BUS_SOCKET");
            let address = Address::from(Transport::Launchd(launchd));
            address.connect().await
        };

        crate::block_on(connecting).expect("Unable to connect to session bus");
    }

    #[test]
    #[timeout(15000)]
    fn disconnect_on_drop() {
        // Reproducer for https://github.com/z-galaxy/zbus/issues/308 where setting up the
        // objectserver would cause the connection to not disconnect on drop.
        crate::utils::block_on(test_disconnect_on_drop());
    }

    // Serves an interface under a well-known name, drops the serving connection and checks
    // from a second connection that the name really disappears from the bus.
    async fn test_disconnect_on_drop() {
        #[derive(Default)]
        struct MyInterface {}

        #[crate::interface(name = "dev.peelz.FooBar.Baz")]
        impl MyInterface {
            fn do_thing(&self) {}
        }

        let name = "dev.peelz.foobar";
        let service = Builder::session()
            .unwrap()
            .name(name)
            .unwrap()
            .serve_at("/dev/peelz/FooBar", MyInterface::default())
            .unwrap()
            .build()
            .await
            .unwrap();

        // Watch, from an independent connection, for the name losing its owner.
        let observer = Connection::session().await.unwrap();
        let dbus = DBusProxy::new(&observer).await.unwrap();
        let mut owner_changes = dbus
            .receive_name_owner_changed_with_args(&[(0, name), (2, "")])
            .await
            .unwrap();

        drop(service);

        // If the connection is not dropped, this will hang forever.
        owner_changes.next().await.unwrap();

        // Let's still make sure the name is gone.
        let still_owned = dbus.name_has_owner(name.try_into().unwrap()).await.unwrap();
        assert!(!still_owned);
    }

    #[tokio::test(start_paused = true)]
    #[timeout(15000)]
    async fn test_graceful_shutdown() {
        // If we have a second reference, it should wait until we drop it.
        let first = Connection::session().await.unwrap();
        let second = first.clone();
        let mut shutdown = pin!(first.graceful_shutdown());
        // Due to start_paused above, tokio will auto-advance time once the runtime is idle.
        // See https://docs.rs/tokio/latest/tokio/time/fn.pause.html.
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_secs(u64::MAX)) => {},
            _ = &mut shutdown => {
                panic!("Graceful shutdown unexpectedly completed");
            }
        }

        drop(second);
        shutdown.await;

        // An outstanding method call should also be sufficient to keep the connection alive.
        struct GracefulInterface {
            method_called: Event,
            wait_before_return: Option<EventListener>,
            announce_done: Event,
        }

        #[crate::interface(name = "dev.peelz.TestGracefulShutdown")]
        impl GracefulInterface {
            async fn do_thing(&mut self) {
                self.method_called.notify(1);
                if let Some(listener) = self.wait_before_return.take() {
                    listener.await;
                }
                self.announce_done.notify(1);
            }
        }

        // Each event is paired with a listener created up front, so that no notification can
        // slip through unnoticed.
        let method_called = Event::new();
        let call_received = method_called.listen();

        let release_call = Event::new();
        let wait_before_return = Some(release_call.listen());

        let announce_done = Event::new();
        let call_finished = announce_done.listen();

        let interface = GracefulInterface {
            method_called,
            wait_before_return,
            announce_done,
        };

        let name = "dev.peelz.TestGracefulShutdown";
        let obj = "/dev/peelz/TestGracefulShutdown";
        let service = Builder::session()
            .unwrap()
            .name(name)
            .unwrap()
            .serve_at(obj, interface)
            .unwrap()
            .build()
            .await
            .unwrap();

        // Call the method from another connection - it won't return until we tell it to.
        let caller = Connection::session().await.unwrap();
        tokio::spawn(async move {
            caller
                .call_method(Some(name), obj, Some(name), "DoThing", &())
                .await
                .unwrap();
        });

        // Avoid races - make sure we've actually received the method call before we drop our
        // Connection handle.
        call_received.await;

        let mut shutdown = pin!(service.graceful_shutdown());
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_secs(u64::MAX)) => {},
            _ = &mut shutdown => {
                // While that method call is outstanding, graceful shutdown should not complete.
                panic!("Graceful shutdown unexpectedly completed");
            }
        }

        // If we let the call complete, then the shutdown should complete eventually.
        release_call.notify(1);
        shutdown.await;

        // The method call should have been allowed to finish properly.
        call_finished.await;
    }
}
1594
1595#[cfg(feature = "p2p")]
1596#[cfg(test)]
1597mod p2p_tests {
1598    use event_listener::Event;
1599    use futures_util::TryStreamExt;
1600    use ntest::timeout;
1601    use test_log::test;
1602    use zvariant::{Endian, NATIVE_ENDIAN};
1603
1604    use super::{Builder, Connection, socket};
1605    use crate::{Guid, Message, MessageStream, Result, conn::AuthMechanism};
1606
    // Shared p2p scenario run by the transport-specific tests in this module.
    //
    // Same numbered client and server are already paired up. A forwarding task relays messages
    // between `server1` and `client2` in both directions, so the method call sent on `client1`
    // below is expected to show up on `server2`, and the replies travel back the same way.
    //
    // Returns an error if any send/receive/deserialize step fails; panics on assertion failures.
    async fn test_p2p(
        server1: Connection,
        client1: Connection,
        server2: Connection,
        client2: Connection,
    ) -> Result<()> {
        // Relay: every message read from `server1` is re-sent on `client2`.
        let forward1 = {
            let stream = MessageStream::from(server1.clone());
            let sink = client2.clone();

            stream.try_for_each(move |msg| {
                // Each produced future needs its own handle to the sink.
                let sink = sink.clone();
                async move { sink.send(&msg).await }
            })
        };
        // Relay in the opposite direction: `client2` -> `server1`.
        let forward2 = {
            let stream = MessageStream::from(client2.clone());
            let sink = server1.clone();

            stream.try_for_each(move |msg| {
                let sink = sink.clone();
                async move { sink.send(&msg).await }
            })
        };
        // Run both relays on `client1`'s executor. The handle is bound to a named variable so it
        // stays alive until the end of this function.
        // NOTE(review): presumably dropping the handle would cancel the task — confirm.
        let _forward_task = client1.executor().spawn(
            async move { futures_util::try_join!(forward1, forward2) },
            "forward_task",
        );

        // `server_ready`: notified by the server side once its message stream exists; the client
        // waits for it before sending the call.
        let server_ready = Event::new();
        let server_ready_listener = server_ready.listen();
        // `client_done`: notified by the client once it has received its first message; the
        // server side waits for it before finishing.
        let client_done = Event::new();
        let client_done_listener = client_done.listen();

        let server_future = async move {
            // Create the stream before announcing readiness.
            let mut stream = MessageStream::from(&server2);
            server_ready.notify(1);
            // Skip everything until the `Test` method call arrives, then check its payload.
            let method = loop {
                let m = stream.try_next().await?.unwrap();
                if m.to_string() == "Method call Test" {
                    assert_eq!(m.body().deserialize::<u64>().unwrap(), 64);
                    break m;
                }
            };

            // Send another message first to check the queueing function on client side.
            server2
                .emit_signal(None::<()>, "/", "org.zbus.p2p", "ASignalForYou", &())
                .await?;
            server2.reply(&method.header(), &("yay")).await?;
            // Do not finish until the client reports it has received a message.
            client_done_listener.await;

            Ok(())
        };

        let client_future = async move {
            // Create the stream before sending anything.
            let mut stream = MessageStream::from(&client1);
            server_ready_listener.await;
            // We want to set non-native endian to ensure that:
            // 1. the message is actually encoded with the specified endian.
            // 2. the server side is able to decode it and replies in the same encoding.
            let endian = match NATIVE_ENDIAN {
                Endian::Little => Endian::Big,
                Endian::Big => Endian::Little,
            };
            let method = Message::method_call("/", "Test")?
                .interface("org.zbus.p2p")?
                .endian(endian)
                .build(&64u64)?;
            client1.send(&method).await?;
            // Check we didn't miss the signal that was sent during the call.
            let m = stream.try_next().await?.unwrap();
            // Release the server side as soon as the first message has been received; the
            // content of that message is only asserted afterwards.
            client_done.notify(1);
            assert_eq!(m.to_string(), "Signal ASignalForYou");
            // The next message must be the method return.
            let reply = stream.try_next().await?.unwrap();
            assert_eq!(reply.to_string(), "Method return");
            // Check if the reply was in the non-native endian.
            assert_eq!(Endian::from(reply.primary_header().endian_sig()), endian);
            // The reply body becomes the value of this future.
            reply.body().deserialize::<String>()
        };

        // Run both sides concurrently; `val` is the string the client deserialized from the reply.
        let (val, _) = futures_util::try_join!(client_future, server_future,)?;
        assert_eq!(val, "yay");

        Ok(())
    }
1694
1695    #[test]
1696    #[timeout(15000)]
1697    fn tcp_p2p() {
1698        crate::utils::block_on(test_tcp_p2p()).unwrap();
1699    }
1700
1701    async fn test_tcp_p2p() -> Result<()> {
1702        let (server1, client1) = tcp_p2p_pipe().await?;
1703        let (server2, client2) = tcp_p2p_pipe().await?;
1704
1705        test_p2p(server1, client1, server2, client2).await
1706    }
1707
1708    async fn tcp_p2p_pipe() -> Result<(Connection, Connection)> {
1709        let guid = Guid::generate();
1710
1711        #[cfg(not(feature = "tokio"))]
1712        let (server_conn_builder, client_conn_builder) = {
1713            let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
1714            let addr = listener.local_addr().unwrap();
1715            let p1 = std::net::TcpStream::connect(addr).unwrap();
1716            let p0 = listener.incoming().next().unwrap().unwrap();
1717
1718            (
1719                Builder::tcp_stream(p0)
1720                    .server(guid)
1721                    .unwrap()
1722                    .p2p()
1723                    .auth_mechanism(AuthMechanism::Anonymous),
1724                Builder::tcp_stream(p1).p2p(),
1725            )
1726        };
1727
1728        #[cfg(feature = "tokio")]
1729        let (server_conn_builder, client_conn_builder) = {
1730            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1731            let addr = listener.local_addr().unwrap();
1732            let p1 = tokio::net::TcpStream::connect(addr).await.unwrap();
1733            let p0 = listener.accept().await.unwrap().0;
1734
1735            (
1736                Builder::tcp_stream(p0)
1737                    .server(guid)
1738                    .unwrap()
1739                    .p2p()
1740                    .auth_mechanism(AuthMechanism::Anonymous),
1741                Builder::tcp_stream(p1).p2p(),
1742            )
1743        };
1744
1745        futures_util::try_join!(server_conn_builder.build(), client_conn_builder.build())
1746    }
1747
1748    #[cfg(unix)]
1749    #[test]
1750    #[timeout(15000)]
1751    fn unix_p2p() {
1752        crate::utils::block_on(test_unix_p2p()).unwrap();
1753    }
1754
1755    #[cfg(unix)]
1756    async fn test_unix_p2p() -> Result<()> {
1757        let (server1, client1) = unix_p2p_pipe().await?;
1758        let (server2, client2) = unix_p2p_pipe().await?;
1759
1760        test_p2p(server1, client1, server2, client2).await
1761    }
1762
1763    #[cfg(unix)]
1764    async fn unix_p2p_pipe() -> Result<(Connection, Connection)> {
1765        #[cfg(not(feature = "tokio"))]
1766        use std::os::unix::net::UnixStream;
1767        #[cfg(feature = "tokio")]
1768        use tokio::net::UnixStream;
1769        #[cfg(all(windows, not(feature = "tokio")))]
1770        use uds_windows::UnixStream;
1771
1772        let guid = Guid::generate();
1773
1774        let (p0, p1) = UnixStream::pair().unwrap();
1775
1776        futures_util::try_join!(
1777            Builder::unix_stream(p1).p2p().build(),
1778            Builder::unix_stream(p0).server(guid).unwrap().p2p().build(),
1779        )
1780    }
1781
1782    #[cfg(any(
1783        all(feature = "vsock", not(feature = "tokio")),
1784        feature = "tokio-vsock"
1785    ))]
1786    #[test]
1787    #[timeout(15000)]
1788    fn vsock_connect() {
1789        let _ = crate::utils::block_on(test_vsock_connect()).unwrap();
1790    }
1791
1792    #[cfg(any(
1793        all(feature = "vsock", not(feature = "tokio")),
1794        feature = "tokio-vsock"
1795    ))]
1796    async fn test_vsock_connect() -> Result<(Connection, Connection)> {
1797        #[cfg(feature = "tokio-vsock")]
1798        use futures_util::StreamExt;
1799
1800        let guid = Guid::generate();
1801
1802        #[cfg(all(feature = "vsock", not(feature = "tokio")))]
1803        let listener = vsock::VsockListener::bind_with_cid_port(vsock::VMADDR_CID_LOCAL, u32::MAX)?;
1804        #[cfg(feature = "tokio-vsock")]
1805        let listener = tokio_vsock::VsockListener::bind(tokio_vsock::VsockAddr::new(1, u32::MAX))?;
1806
1807        let addr = listener.local_addr()?;
1808        let addr = format!("vsock:cid={},port={},guid={guid}", addr.cid(), addr.port());
1809
1810        let server = async {
1811            #[cfg(all(feature = "vsock", not(feature = "tokio")))]
1812            let server =
1813                crate::Task::spawn_blocking(move || listener.incoming().next(), "").await?;
1814            #[cfg(feature = "tokio-vsock")]
1815            let server = listener.incoming().next().await;
1816            Builder::vsock_stream(server.unwrap()?)
1817                .server(guid)?
1818                .p2p()
1819                .auth_mechanism(AuthMechanism::Anonymous)
1820                .build()
1821                .await
1822        };
1823
1824        let client = crate::connection::Builder::address(addr.as_str())?
1825            .p2p()
1826            .build();
1827
1828        futures_util::try_join!(server, client)
1829    }
1830
1831    #[cfg(any(
1832        all(feature = "vsock", not(feature = "tokio")),
1833        feature = "tokio-vsock"
1834    ))]
1835    #[test]
1836    #[timeout(15000)]
1837    fn vsock_p2p() {
1838        crate::utils::block_on(test_vsock_p2p()).unwrap();
1839    }
1840
1841    #[cfg(any(
1842        all(feature = "vsock", not(feature = "tokio")),
1843        feature = "tokio-vsock"
1844    ))]
1845    async fn test_vsock_p2p() -> Result<()> {
1846        let (server1, client1) = vsock_p2p_pipe().await?;
1847        let (server2, client2) = vsock_p2p_pipe().await?;
1848
1849        test_p2p(server1, client1, server2, client2).await
1850    }
1851
1852    #[cfg(all(feature = "vsock", not(feature = "tokio")))]
1853    async fn vsock_p2p_pipe() -> Result<(Connection, Connection)> {
1854        let guid = Guid::generate();
1855
1856        let listener =
1857            vsock::VsockListener::bind_with_cid_port(vsock::VMADDR_CID_LOCAL, u32::MAX).unwrap();
1858        let addr = listener.local_addr().unwrap();
1859        let client = vsock::VsockStream::connect(&addr).unwrap();
1860        let server = listener.incoming().next().unwrap().unwrap();
1861
1862        futures_util::try_join!(
1863            Builder::vsock_stream(server)
1864                .server(guid)
1865                .unwrap()
1866                .p2p()
1867                .auth_mechanism(AuthMechanism::Anonymous)
1868                .build(),
1869            Builder::vsock_stream(client).p2p().build(),
1870        )
1871    }
1872
1873    #[cfg(feature = "tokio-vsock")]
1874    async fn vsock_p2p_pipe() -> Result<(Connection, Connection)> {
1875        use futures_util::StreamExt;
1876        use tokio_vsock::VsockAddr;
1877
1878        let guid = Guid::generate();
1879
1880        let listener = tokio_vsock::VsockListener::bind(VsockAddr::new(1, u32::MAX)).unwrap();
1881        let addr = listener.local_addr().unwrap();
1882        let client = tokio_vsock::VsockStream::connect(addr).await.unwrap();
1883        let server = listener.incoming().next().await.unwrap().unwrap();
1884
1885        futures_util::try_join!(
1886            Builder::vsock_stream(server)
1887                .server(guid)
1888                .unwrap()
1889                .p2p()
1890                .auth_mechanism(AuthMechanism::Anonymous)
1891                .build(),
1892            Builder::vsock_stream(client).p2p().build(),
1893        )
1894    }
1895
1896    #[test]
1897    #[timeout(15000)]
1898    fn channel_pair() {
1899        crate::utils::block_on(test_channel_pair()).unwrap();
1900    }
1901
1902    async fn test_channel_pair() -> Result<()> {
1903        let (server1, client1) = create_channel_pair().await;
1904        let (server2, client2) = create_channel_pair().await;
1905
1906        test_p2p(server1, client1, server2, client2).await
1907    }
1908
1909    async fn create_channel_pair() -> (Connection, Connection) {
1910        let (a, b) = socket::Channel::pair();
1911
1912        let guid = crate::Guid::generate();
1913        let conn1 = Builder::authenticated_socket(a, guid.clone())
1914            .unwrap()
1915            .p2p()
1916            .build()
1917            .await
1918            .unwrap();
1919        let conn2 = Builder::authenticated_socket(b, guid)
1920            .unwrap()
1921            .p2p()
1922            .build()
1923            .await
1924            .unwrap();
1925
1926        (conn1, conn2)
1927    }
1928}