zbus/proxy/
mod.rs

1//! The client-side proxy API.
2
3use enumflags2::{BitFlags, bitflags};
4use event_listener::{Event, EventListener};
5use futures_core::{ready, stream};
6use ordered_stream::{FromFuture, Join, Map, OrderedStream, PollResult, join as join_streams};
7use std::{
8    collections::{HashMap, HashSet},
9    fmt,
10    future::Future,
11    ops::Deref,
12    pin::Pin,
13    sync::{Arc, OnceLock, RwLock, RwLockReadGuard},
14    task::{Context, Poll},
15};
16use tracing::{Instrument, debug, info_span, instrument, trace};
17
18use zbus_names::{BusName, InterfaceName, MemberName, UniqueName};
19use zvariant::{ObjectPath, OwnedValue, Str, Value};
20
21use crate::{
22    AsyncDrop, Connection, Error, Executor, MatchRule, MessageStream, OwnedMatchRule, Result, Task,
23    fdo::{self, IntrospectableProxy, NameOwnerChanged, PropertiesChangedStream, PropertiesProxy},
24    message::{Flags, Message, Sequence, Type},
25};
26
27mod builder;
28pub use builder::{Builder, CacheProperties};
29
30mod defaults;
31pub use defaults::Defaults;
32
/// A client-side interface proxy.
///
/// A `Proxy` is a helper to interact with an interface on a remote object.
///
/// Cloning a `Proxy` only clones the reference-counted pointer to its shared inner state.
///
/// # Example
///
/// ```
/// use std::result::Result;
/// use std::error::Error;
/// use zbus::{Connection, Proxy};
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
///     let connection = Connection::session().await?;
///     let p = Proxy::new(
///         &connection,
///         "org.freedesktop.DBus",
///         "/org/freedesktop/DBus",
///         "org.freedesktop.DBus",
///     ).await?;
///     // owned return value
///     let _id: String = p.call("GetId", &()).await?;
///     // borrowed return value
///     let body = p.call_method("GetId", &()).await?.body();
///     let _id: &str = body.deserialize()?;
///
///     Ok(())
/// }
/// ```
///
/// # Note
///
/// It is recommended to use the [`macro@crate::proxy`] macro, which provides a more
/// convenient and type-safe *façade* `Proxy` derived from a Rust trait.
#[derive(Clone, Debug)]
pub struct Proxy<'a> {
    /// State shared between all clones of this proxy.
    pub(crate) inner: Arc<ProxyInner<'a>>,
}
71
/// The part of the proxy state that carries no `'a` borrow: the connection and the match rule
/// bookkeeping.
///
/// This is required to avoid having the Drop impl extend the lifetime 'a, which breaks zbus_xmlgen
/// (and possibly other crates).
pub(crate) struct ProxyInnerStatic {
    /// The connection the proxy operates on.
    pub(crate) conn: Connection,
    /// The `NameOwnerChanged` match rule registered for the destination, once one has been set.
    /// Its removal is queued on the connection when this value is dropped.
    dest_owner_change_match_rule: OnceLock<OwnedMatchRule>,
}
78
79impl fmt::Debug for ProxyInnerStatic {
80    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
81        f.debug_struct("ProxyInnerStatic")
82            .field(
83                "dest_owner_change_match_rule",
84                &self.dest_owner_change_match_rule,
85            )
86            .finish_non_exhaustive()
87    }
88}
89
/// The state behind a [`Proxy`], shared by all of its clones.
#[derive(Debug)]
pub(crate) struct ProxyInner<'a> {
    /// Connection and match-rule bookkeeping, kept in a separate type without the `'a` borrow.
    inner_without_borrows: ProxyInnerStatic,
    /// The bus name of the peer this proxy talks to.
    pub(crate) destination: BusName<'a>,
    /// The object path on the peer.
    pub(crate) path: ObjectPath<'a>,
    /// The interface this proxy represents.
    pub(crate) interface: InterfaceName<'a>,

    /// Cache of property values.
    ///
    /// `None` when caching is disabled for the proxy; otherwise the cache and the task feeding
    /// it are created on first use.
    property_cache: Option<OnceLock<(Arc<PropertiesCache>, Task<()>)>>,
    /// Set of properties which do not get cached, by name.
    /// This overrides proxy-level caching behavior.
    uncached_properties: HashSet<Str<'a>>,
}
103
104impl Drop for ProxyInnerStatic {
105    fn drop(&mut self) {
106        if let Some(rule) = self.dest_owner_change_match_rule.take() {
107            self.conn.queue_remove_match(rule);
108        }
109    }
110}
111
/// A property changed event.
///
/// The property changed event generated by [`PropertyStream`].
pub struct PropertyChanged<'a, T> {
    /// Name of the property this event is about.
    name: &'a str,
    /// The cache the property value is read from (and written back to after a refetch).
    properties: Arc<PropertiesCache>,
    /// Proxy used to fetch the value from the peer when the cached one is missing.
    proxy: Proxy<'a>,
    /// Ties the event to the value type `T` without storing a `T`.
    phantom: std::marker::PhantomData<T>,
}
121
122impl<T> PropertyChanged<'_, T> {
123    /// The name of the property that changed.
124    pub fn name(&self) -> &str {
125        self.name
126    }
127
128    /// Get the raw value of the property that changed.
129    ///
130    /// If the notification signal contained the new value, it has been cached already and this call
131    /// will return that value. Otherwise (i.e. invalidated property), a D-Bus call is made to fetch
132    /// and cache the new value.
133    pub async fn get_raw(&self) -> Result<impl Deref<Target = Value<'static>> + '_> {
134        struct Wrapper<'w> {
135            name: &'w str,
136            values: RwLockReadGuard<'w, HashMap<String, PropertyValue>>,
137        }
138
139        impl Deref for Wrapper<'_> {
140            type Target = Value<'static>;
141
142            fn deref(&self) -> &Self::Target {
143                self.values
144                    .get(self.name)
145                    .expect("PropertyStream with no corresponding property")
146                    .value
147                    .as_ref()
148                    .expect("PropertyStream with no corresponding property")
149            }
150        }
151
152        {
153            let values = self.properties.values.read().expect("lock poisoned");
154            if values
155                .get(self.name)
156                .expect("PropertyStream with no corresponding property")
157                .value
158                .is_some()
159            {
160                return Ok(Wrapper {
161                    name: self.name,
162                    values,
163                });
164            }
165        }
166
167        // The property was invalidated, so we need to fetch the new value.
168        let properties_proxy = self.proxy.properties_proxy();
169        let value = properties_proxy
170            .get(self.proxy.inner.interface.clone(), self.name)
171            .await
172            .map_err(crate::Error::from)?;
173
174        // Save the new value
175        {
176            let mut values = self.properties.values.write().expect("lock poisoned");
177
178            values
179                .get_mut(self.name)
180                .expect("PropertyStream with no corresponding property")
181                .value = Some(value);
182        }
183
184        Ok(Wrapper {
185            name: self.name,
186            values: self.properties.values.read().expect("lock poisoned"),
187        })
188    }
189}
190
191impl<T> PropertyChanged<'_, T>
192where
193    T: TryFrom<zvariant::OwnedValue>,
194    T::Error: Into<crate::Error>,
195{
196    /// Get the value of the property that changed.
197    ///
198    /// If the notification signal contained the new value, it has been cached already and this call
199    /// will return that value. Otherwise (i.e. invalidated property), a D-Bus call is made to fetch
200    /// and cache the new value.
201    pub async fn get(&self) -> Result<T> {
202        self.get_raw()
203            .await
204            .and_then(|v| T::try_from(OwnedValue::try_from(&*v)?).map_err(Into::into))
205    }
206}
207
/// A [`stream::Stream`] implementation that yields property change notifications.
///
/// Use [`Proxy::receive_property_changed`] to create an instance of this type.
#[derive(Debug)]
pub struct PropertyStream<'a, T> {
    /// Name of the property being watched.
    name: &'a str,
    /// Proxy whose property cache provides the change events.
    proxy: Proxy<'a>,
    /// Listener for the next change event; replaced with a fresh one each time it fires.
    changed_listener: EventListener,
    /// Ties the stream to the value type `T` without storing a `T`.
    phantom: std::marker::PhantomData<T>,
}
218
219impl<'a, T> stream::Stream for PropertyStream<'a, T>
220where
221    T: Unpin,
222{
223    type Item = PropertyChanged<'a, T>;
224
225    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
226        let m = self.get_mut();
227        let properties = match m.proxy.get_property_cache() {
228            Some(properties) => properties.clone(),
229            // With no cache, we will get no updates; return immediately
230            None => return Poll::Ready(None),
231        };
232        ready!(Pin::new(&mut m.changed_listener).poll(cx));
233
234        m.changed_listener = properties
235            .values
236            .read()
237            .expect("lock poisoned")
238            .get(m.name)
239            .expect("PropertyStream with no corresponding property")
240            .event
241            .listen();
242
243        Poll::Ready(Some(PropertyChanged {
244            name: m.name,
245            properties,
246            proxy: m.proxy.clone(),
247            phantom: std::marker::PhantomData,
248        }))
249    }
250}
251
/// The cached property values of a proxy, together with the state of the initial population.
#[derive(Debug)]
pub(crate) struct PropertiesCache {
    /// Cached entries, keyed by property name.
    values: RwLock<HashMap<String, PropertyValue>>,
    /// Whether the initial population is still running, or how it ended.
    caching_result: RwLock<CachingResult>,
}
257
/// State of the initial population of a [`PropertiesCache`].
#[derive(Debug)]
enum CachingResult {
    /// Population has not finished yet; `ready` is notified when it does.
    Caching { ready: Event },
    /// Population has finished; `result` holds its outcome.
    Cached { result: Result<()> },
}
263
264impl PropertiesCache {
265    #[instrument(skip_all, level = "trace")]
266    fn new(
267        proxy: PropertiesProxy<'static>,
268        interface: InterfaceName<'static>,
269        executor: &Executor<'_>,
270        uncached_properties: HashSet<zvariant::Str<'static>>,
271    ) -> (Arc<Self>, Task<()>) {
272        let cache = Arc::new(PropertiesCache {
273            values: Default::default(),
274            caching_result: RwLock::new(CachingResult::Caching {
275                ready: Event::new(),
276            }),
277        });
278
279        let cache_clone = cache.clone();
280        let task_name = format!("{interface} proxy caching");
281        let proxy_caching = async move {
282            let result = cache_clone
283                .init(proxy, interface, uncached_properties)
284                .await;
285            let (prop_changes, interface, uncached_properties) = {
286                let mut caching_result = cache_clone.caching_result.write().expect("lock poisoned");
287                let ready = match &*caching_result {
288                    CachingResult::Caching { ready } => ready,
289                    // SAFETY: This is the only part of the code that changes this state and it's
290                    // only run once.
291                    _ => unreachable!(),
292                };
293                match result {
294                    Ok((prop_changes, interface, uncached_properties)) => {
295                        ready.notify(usize::MAX);
296                        *caching_result = CachingResult::Cached { result: Ok(()) };
297
298                        (prop_changes, interface, uncached_properties)
299                    }
300                    Err(e) => {
301                        ready.notify(usize::MAX);
302                        *caching_result = CachingResult::Cached { result: Err(e) };
303
304                        return;
305                    }
306                }
307            };
308
309            if let Err(e) = cache_clone
310                .keep_updated(prop_changes, interface, uncached_properties)
311                .await
312            {
313                debug!("Error keeping properties cache updated: {e}");
314            }
315        }
316        .instrument(info_span!("{}", task_name));
317        let task = executor.spawn(proxy_caching, &task_name);
318
319        (cache, task)
320    }
321
322    /// new() runs this in a task it spawns for initialization of properties cache.
323    async fn init(
324        &self,
325        proxy: PropertiesProxy<'static>,
326        interface: InterfaceName<'static>,
327        uncached_properties: HashSet<zvariant::Str<'static>>,
328    ) -> Result<(
329        PropertiesChangedStream,
330        InterfaceName<'static>,
331        HashSet<zvariant::Str<'static>>,
332    )> {
333        use ordered_stream::OrderedStreamExt;
334
335        let prop_changes = proxy.receive_properties_changed().await?.map(Either::Left);
336
337        let get_all = proxy
338            .inner()
339            .connection()
340            .call_method_raw(
341                Some(proxy.inner().destination()),
342                proxy.inner().path(),
343                Some(proxy.inner().interface()),
344                "GetAll",
345                BitFlags::empty(),
346                &interface,
347            )
348            .await
349            .map(|r| FromFuture::from(r.expect("no reply")).map(Either::Right))?;
350
351        let mut join = join_streams(prop_changes, get_all);
352
353        loop {
354            match join.next().await {
355                Some(Either::Left(_update)) => {
356                    // discard updates prior to the initial population
357                }
358                Some(Either::Right(populate)) => {
359                    populate?.body().deserialize().map(|values| {
360                        self.update_cache(&uncached_properties, &values, &[], &interface);
361                    })?;
362                    break;
363                }
364                None => break,
365            }
366        }
367        if let Some((Either::Left(update), _)) = Pin::new(&mut join).take_buffered() {
368            // if an update was buffered, then it happened after the get_all returned and needs to
369            // be applied before we discard the join
370            if let Ok(args) = update.args() {
371                if args.interface_name == interface {
372                    self.update_cache(
373                        &uncached_properties,
374                        &args.changed_properties,
375                        &args.invalidated_properties,
376                        &interface,
377                    );
378                }
379            }
380        }
381        // This is needed to avoid a "implementation of `OrderedStream` is not general enough"
382        // error that occurs if you apply the map and join to Pin::new(&mut prop_changes) instead
383        // of directly to the stream.
384        let prop_changes = join.into_inner().0.into_inner();
385
386        Ok((prop_changes, interface, uncached_properties))
387    }
388
389    /// new() runs this in a task it spawns for keeping the cache in sync.
390    #[instrument(skip_all, level = "trace")]
391    async fn keep_updated(
392        &self,
393        mut prop_changes: PropertiesChangedStream,
394        interface: InterfaceName<'static>,
395        uncached_properties: HashSet<zvariant::Str<'static>>,
396    ) -> Result<()> {
397        use futures_lite::StreamExt;
398
399        trace!("Listening for property changes on {interface}...");
400        while let Some(update) = prop_changes.next().await {
401            if let Ok(args) = update.args() {
402                if args.interface_name == interface {
403                    self.update_cache(
404                        &uncached_properties,
405                        &args.changed_properties,
406                        &args.invalidated_properties,
407                        &interface,
408                    );
409                }
410            }
411        }
412
413        Ok(())
414    }
415
416    fn update_cache(
417        &self,
418        uncached_properties: &HashSet<Str<'_>>,
419        changed: &HashMap<&str, Value<'_>>,
420        invalidated: &[&str],
421        interface: &InterfaceName<'_>,
422    ) {
423        let mut values = self.values.write().expect("lock poisoned");
424
425        for inval in invalidated {
426            if uncached_properties.contains(&Str::from(*inval)) {
427                debug!(
428                    "Ignoring invalidation of uncached property `{}.{}`",
429                    interface, inval
430                );
431                continue;
432            }
433            trace!("Property `{interface}.{inval}` invalidated");
434
435            if let Some(entry) = values.get_mut(*inval) {
436                entry.value = None;
437                entry.event.notify(usize::MAX);
438            }
439        }
440
441        for (property_name, value) in changed {
442            if uncached_properties.contains(&Str::from(*property_name)) {
443                debug!(
444                    "Ignoring update of uncached property `{}.{}`",
445                    interface, property_name
446                );
447                continue;
448            }
449            trace!("Property `{interface}.{property_name}` updated");
450
451            let entry = values.entry(property_name.to_string()).or_default();
452
453            let value = match OwnedValue::try_from(value) {
454                Ok(value) => value,
455                Err(e) => {
456                    debug!(
457                        "Failed to convert property `{interface}.{property_name}` to OwnedValue: {e}"
458                    );
459                    continue;
460                }
461            };
462            entry.value = Some(value);
463            entry.event.notify(usize::MAX);
464        }
465    }
466
467    /// Wait for the cache to be populated and return any error encountered during population.
468    pub(crate) async fn ready(&self) -> Result<()> {
469        let listener = match &*self.caching_result.read().expect("lock poisoned") {
470            CachingResult::Caching { ready } => ready.listen(),
471            CachingResult::Cached { result } => return result.clone(),
472        };
473        listener.await;
474
475        // It must be ready now.
476        match &*self.caching_result.read().expect("lock poisoned") {
477            // SAFETY: We were just notified that state has changed to `Cached` and we never go back
478            // to `Caching` once in `Cached`.
479            CachingResult::Caching { .. } => unreachable!(),
480            CachingResult::Cached { result } => result.clone(),
481        }
482    }
483}
484
485impl<'a> ProxyInner<'a> {
486    pub(crate) fn new(
487        conn: Connection,
488        destination: BusName<'a>,
489        path: ObjectPath<'a>,
490        interface: InterfaceName<'a>,
491        cache: CacheProperties,
492        uncached_properties: HashSet<Str<'a>>,
493    ) -> Self {
494        let property_cache = match cache {
495            CacheProperties::Yes | CacheProperties::Lazily => Some(OnceLock::new()),
496            CacheProperties::No => None,
497        };
498        Self {
499            inner_without_borrows: ProxyInnerStatic {
500                conn,
501                dest_owner_change_match_rule: OnceLock::new(),
502            },
503            destination,
504            path,
505            interface,
506            property_cache,
507            uncached_properties,
508        }
509    }
510
511    /// Subscribe to the "NameOwnerChanged" signal on the bus for our destination.
512    ///
513    /// If the destination is a unique name, we will not subscribe to the signal.
514    pub(crate) async fn subscribe_dest_owner_change(&self) -> Result<()> {
515        if !self.inner_without_borrows.conn.is_bus() {
516            // Names don't mean much outside the bus context.
517            return Ok(());
518        }
519
520        let well_known_name = match &self.destination {
521            BusName::WellKnown(well_known_name) => well_known_name,
522            BusName::Unique(_) => return Ok(()),
523        };
524
525        if self
526            .inner_without_borrows
527            .dest_owner_change_match_rule
528            .get()
529            .is_some()
530        {
531            // Already watching over the bus for any name updates so nothing to do here.
532            return Ok(());
533        }
534
535        let conn = &self.inner_without_borrows.conn;
536        let signal_rule: OwnedMatchRule = MatchRule::builder()
537            .msg_type(Type::Signal)
538            .sender("org.freedesktop.DBus")?
539            .path("/org/freedesktop/DBus")?
540            .interface("org.freedesktop.DBus")?
541            .member("NameOwnerChanged")?
542            .add_arg(well_known_name.as_str())?
543            .build()
544            .to_owned()
545            .into();
546
547        conn.add_match(
548            signal_rule.clone(),
549            Some(MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED),
550        )
551        .await?;
552
553        if self
554            .inner_without_borrows
555            .dest_owner_change_match_rule
556            .set(signal_rule.clone())
557            .is_err()
558        {
559            // we raced another destination_unique_name call and added it twice
560            conn.remove_match(signal_rule).await?;
561        }
562
563        Ok(())
564    }
565}
566
/// Limit passed to `Connection::add_match` when subscribing to `NameOwnerChanged` for the
/// destination; presumably the maximum number of such signals kept queued (confirm against
/// `add_match`).
const MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED: usize = 8;
568
569impl<'a> Proxy<'a> {
570    /// Create a new `Proxy` for the given destination/path/interface.
571    pub async fn new<D, P, I>(
572        conn: &Connection,
573        destination: D,
574        path: P,
575        interface: I,
576    ) -> Result<Proxy<'a>>
577    where
578        D: TryInto<BusName<'a>>,
579        P: TryInto<ObjectPath<'a>>,
580        I: TryInto<InterfaceName<'a>>,
581        D::Error: Into<Error>,
582        P::Error: Into<Error>,
583        I::Error: Into<Error>,
584    {
585        Builder::new(conn)
586            .destination(destination)?
587            .path(path)?
588            .interface(interface)?
589            .build()
590            .await
591    }
592
593    /// Create a new `Proxy` for the given destination/path/interface, taking ownership of all
594    /// passed arguments.
595    pub async fn new_owned<D, P, I>(
596        conn: Connection,
597        destination: D,
598        path: P,
599        interface: I,
600    ) -> Result<Proxy<'a>>
601    where
602        D: TryInto<BusName<'static>>,
603        P: TryInto<ObjectPath<'static>>,
604        I: TryInto<InterfaceName<'static>>,
605        D::Error: Into<Error>,
606        P::Error: Into<Error>,
607        I::Error: Into<Error>,
608    {
609        Builder::new(&conn)
610            .destination(destination)?
611            .path(path)?
612            .interface(interface)?
613            .build()
614            .await
615    }
616
    /// Get a reference to the associated connection.
    ///
    /// This is the connection held in the proxy's shared inner state.
    pub fn connection(&self) -> &Connection {
        &self.inner.inner_without_borrows.conn
    }
621
    /// Get a reference to the destination service name.
    ///
    /// This is the bus name that method calls made through this proxy are addressed to.
    pub fn destination(&self) -> &BusName<'a> {
        &self.inner.destination
    }
626
    /// Get a reference to the object path.
    ///
    /// This is the path of the remote object that method calls made through this proxy target.
    pub fn path(&self) -> &ObjectPath<'a> {
        &self.inner.path
    }
631
    /// Get a reference to the interface.
    ///
    /// This is the interface name used for method calls and property access through this proxy.
    pub fn interface(&self) -> &InterfaceName<'a> {
        &self.inner.interface
    }
636
637    /// Introspect the associated object, and return the XML description.
638    ///
639    /// See the [xml](https://docs.rs/zbus_xml) crate for parsing the
640    /// result.
641    pub async fn introspect(&self) -> fdo::Result<String> {
642        let proxy = IntrospectableProxy::builder(&self.inner.inner_without_borrows.conn)
643            .destination(&self.inner.destination)?
644            .path(&self.inner.path)?
645            .build()
646            .await?;
647
648        proxy.introspect().await
649    }
650
651    fn properties_proxy(&self) -> PropertiesProxy<'_> {
652        PropertiesProxy::builder(&self.inner.inner_without_borrows.conn)
653            // Safe because already checked earlier
654            .destination(self.inner.destination.as_ref())
655            .unwrap()
656            // Safe because already checked earlier
657            .path(self.inner.path.as_ref())
658            .unwrap()
659            // does not have properties
660            .cache_properties(CacheProperties::No)
661            .build_internal()
662            .unwrap()
663            .into()
664    }
665
666    fn owned_properties_proxy(&self) -> PropertiesProxy<'static> {
667        PropertiesProxy::builder(&self.inner.inner_without_borrows.conn)
668            // Safe because already checked earlier
669            .destination(self.inner.destination.to_owned())
670            .unwrap()
671            // Safe because already checked earlier
672            .path(self.inner.path.to_owned())
673            .unwrap()
674            // does not have properties
675            .cache_properties(CacheProperties::No)
676            .build_internal()
677            .unwrap()
678            .into()
679    }
680
681    /// Get the cache, starting it in the background if needed.
682    ///
683    /// Use PropertiesCache::ready() to wait for the cache to be populated and to get any errors
684    /// encountered in the population.
685    pub(crate) fn get_property_cache(&self) -> Option<&Arc<PropertiesCache>> {
686        let cache = match &self.inner.property_cache {
687            Some(cache) => cache,
688            None => return None,
689        };
690        let (cache, _) = &cache.get_or_init(|| {
691            let proxy = self.owned_properties_proxy();
692            let interface = self.interface().to_owned();
693            let uncached_properties: HashSet<zvariant::Str<'static>> = self
694                .inner
695                .uncached_properties
696                .iter()
697                .map(|s| s.to_owned())
698                .collect();
699            let executor = self.connection().executor();
700
701            PropertiesCache::new(proxy, interface, executor, uncached_properties)
702        });
703
704        Some(cache)
705    }
706
707    /// Get the cached value of the property `property_name`.
708    ///
709    /// This returns `None` if the property is not in the cache.  This could be because the cache
710    /// was invalidated by an update, because caching was disabled for this property or proxy, or
711    /// because the cache has not yet been populated.  Use `get_property` to fetch the value from
712    /// the peer.
713    pub fn cached_property<T>(&self, property_name: &str) -> Result<Option<T>>
714    where
715        T: TryFrom<OwnedValue>,
716        T::Error: Into<Error>,
717    {
718        self.cached_property_raw(property_name)
719            .as_deref()
720            .map(|v| T::try_from(OwnedValue::try_from(v)?).map_err(Into::into))
721            .transpose()
722    }
723
724    /// Get the cached value of the property `property_name`.
725    ///
726    /// Same as `cached_property`, but gives you access to the raw value stored in the cache. This
727    /// is useful if you want to avoid allocations and cloning.
728    pub fn cached_property_raw<'p>(
729        &'p self,
730        property_name: &'p str,
731    ) -> Option<impl Deref<Target = Value<'static>> + 'p> {
732        if let Some(values) = self
733            .inner
734            .property_cache
735            .as_ref()
736            .and_then(OnceLock::get)
737            .map(|c| c.0.values.read().expect("lock poisoned"))
738        {
739            // ensure that the property is in the cache.
740            values
741                .get(property_name)
742                // if the property value has not yet been cached, this will return None.
743                .and_then(|e| e.value.as_ref())?;
744
745            struct Wrapper<'a> {
746                values: RwLockReadGuard<'a, HashMap<String, PropertyValue>>,
747                property_name: &'a str,
748            }
749
750            impl Deref for Wrapper<'_> {
751                type Target = Value<'static>;
752
753                fn deref(&self) -> &Self::Target {
754                    self.values
755                        .get(self.property_name)
756                        .and_then(|e| e.value.as_ref())
757                        .map(|v| v.deref())
758                        .expect("inexistent property")
759                }
760            }
761
762            Some(Wrapper {
763                values,
764                property_name,
765            })
766        } else {
767            None
768        }
769    }
770
771    async fn get_proxy_property(&self, property_name: &str) -> Result<OwnedValue> {
772        Ok(self
773            .properties_proxy()
774            .get(self.inner.interface.as_ref(), property_name)
775            .await?)
776    }
777
778    /// Get the property `property_name`.
779    ///
780    /// Get the property value from the cache (if caching is enabled) or call the
781    /// `Get` method of the `org.freedesktop.DBus.Properties` interface.
782    pub async fn get_property<T>(&self, property_name: &str) -> Result<T>
783    where
784        T: TryFrom<OwnedValue>,
785        T::Error: Into<Error>,
786    {
787        if let Some(cache) = self.get_property_cache() {
788            cache.ready().await?;
789        }
790        if let Some(value) = self.cached_property(property_name)? {
791            return Ok(value);
792        }
793
794        let value = self.get_proxy_property(property_name).await?;
795        value.try_into().map_err(Into::into)
796    }
797
798    /// Set the property `property_name`.
799    ///
800    /// Effectively, call the `Set` method of the `org.freedesktop.DBus.Properties` interface.
801    pub async fn set_property<'t, T>(&self, property_name: &str, value: T) -> fdo::Result<()>
802    where
803        T: 't + Into<Value<'t>>,
804    {
805        self.properties_proxy()
806            .set(self.inner.interface.as_ref(), property_name, value.into())
807            .await
808    }
809
810    /// Call a method and return the reply.
811    ///
812    /// Typically, you would want to use [`call`] method instead. Use this method if you need to
813    /// deserialize the reply message manually (this way, you can avoid the memory
814    /// allocation/copying, by deserializing the reply to an unowned type).
815    ///
816    /// [`call`]: struct.Proxy.html#method.call
817    pub async fn call_method<'m, M, B>(&self, method_name: M, body: &B) -> Result<Message>
818    where
819        M: TryInto<MemberName<'m>>,
820        M::Error: Into<Error>,
821        B: serde::ser::Serialize + zvariant::DynamicType,
822    {
823        self.inner
824            .inner_without_borrows
825            .conn
826            .call_method(
827                Some(&self.inner.destination),
828                self.inner.path.as_str(),
829                Some(&self.inner.interface),
830                method_name,
831                body,
832            )
833            .await
834    }
835
836    /// Call a method and return the reply body.
837    ///
838    /// Use [`call_method`] instead if you need to deserialize the reply manually/separately.
839    ///
840    /// [`call_method`]: struct.Proxy.html#method.call_method
841    pub async fn call<'m, M, B, R>(&self, method_name: M, body: &B) -> Result<R>
842    where
843        M: TryInto<MemberName<'m>>,
844        M::Error: Into<Error>,
845        B: serde::ser::Serialize + zvariant::DynamicType,
846        R: for<'d> zvariant::DynamicDeserialize<'d>,
847    {
848        let reply = self.call_method(method_name, body).await?;
849
850        reply.body().deserialize()
851    }
852
853    /// Call a method and return the reply body, optionally supplying a set of
854    /// method flags to control the way the method call message is sent and handled.
855    ///
856    /// Use [`call`] instead if you do not need any special handling via additional flags.
857    /// If the `NoReplyExpected` flag is passed, this will return None immediately
858    /// after sending the message, similar to [`call_noreply`].
859    ///
860    /// [`call`]: struct.Proxy.html#method.call
861    /// [`call_noreply`]: struct.Proxy.html#method.call_noreply
862    pub async fn call_with_flags<'m, M, B, R>(
863        &self,
864        method_name: M,
865        flags: BitFlags<MethodFlags>,
866        body: &B,
867    ) -> Result<Option<R>>
868    where
869        M: TryInto<MemberName<'m>>,
870        M::Error: Into<Error>,
871        B: serde::ser::Serialize + zvariant::DynamicType,
872        R: for<'d> zvariant::DynamicDeserialize<'d>,
873    {
874        let flags = flags.iter().map(Flags::from).collect::<BitFlags<_>>();
875        match self
876            .inner
877            .inner_without_borrows
878            .conn
879            .call_method_raw(
880                Some(self.destination()),
881                self.path(),
882                Some(self.interface()),
883                method_name,
884                flags,
885                body,
886            )
887            .await?
888        {
889            Some(reply) => reply.await?.body().deserialize().map(Some),
890            None => Ok(None),
891        }
892    }
893
894    /// Call a method without expecting a reply.
895    ///
896    /// This sets the `NoReplyExpected` flag on the calling message and does not wait for a reply.
897    pub async fn call_noreply<'m, M, B>(&self, method_name: M, body: &B) -> Result<()>
898    where
899        M: TryInto<MemberName<'m>>,
900        M::Error: Into<Error>,
901        B: serde::ser::Serialize + zvariant::DynamicType,
902    {
903        self.call_with_flags::<_, _, ()>(method_name, MethodFlags::NoReplyExpected.into(), body)
904            .await?;
905        Ok(())
906    }
907
908    /// Create a stream for the signal named `signal_name`.
909    ///
910    /// # Errors
911    ///
912    /// Apart from general I/O errors that can result from socket communications, calling this
913    /// method will also result in an error if the destination service has not yet registered its
914    /// well-known name with the bus (assuming you're using the well-known name as destination).
915    pub async fn receive_signal<'m, M>(&self, signal_name: M) -> Result<SignalStream<'m>>
916    where
917        M: TryInto<MemberName<'m>>,
918        M::Error: Into<Error>,
919    {
920        self.receive_signal_with_args(signal_name, &[]).await
921    }
922
923    /// Same as [`Proxy::receive_signal`] but with a filter.
924    ///
925    /// The D-Bus specification allows you to filter signals by their arguments, which helps avoid
926    /// a lot of unnecessary traffic and processing since the filter is run on the server side. Use
927    /// this method where possible. Note that this filtering is limited to arguments of string
928    /// types.
929    ///
930    /// The arguments are passed as tuples of argument index and expected value.
931    pub async fn receive_signal_with_args<'m, M>(
932        &self,
933        signal_name: M,
934        args: &[(u8, &str)],
935    ) -> Result<SignalStream<'m>>
936    where
937        M: TryInto<MemberName<'m>>,
938        M::Error: Into<Error>,
939    {
940        let signal_name = signal_name.try_into().map_err(Into::into)?;
941        self.receive_signals(Some(signal_name), args).await
942    }
943
944    async fn receive_signals<'m>(
945        &self,
946        signal_name: Option<MemberName<'m>>,
947        args: &[(u8, &str)],
948    ) -> Result<SignalStream<'m>> {
949        self.inner.subscribe_dest_owner_change().await?;
950
951        SignalStream::new(self.clone(), signal_name, args).await
952    }
953
954    /// Create a stream for all signals emitted by this service.
955    pub async fn receive_all_signals(&self) -> Result<SignalStream<'static>> {
956        self.receive_signals(None, &[]).await
957    }
958
959    /// Get a stream to receive property changed events.
960    ///
961    /// Note that zbus doesn't queue the updates. If the listener is slower than the receiver, it
962    /// will only receive the last update.
963    ///
964    /// The stream will yield the current value first, then wait for the value changes. If caching
965    /// is not enabled on this proxy, the resulting stream will not return any events.
966    pub async fn receive_property_changed<'name: 'a, T>(
967        &self,
968        name: &'name str,
969    ) -> PropertyStream<'a, T> {
970        let properties = self.get_property_cache();
971        let changed_listener = if let Some(properties) = &properties {
972            let mut values = properties.values.write().expect("lock poisoned");
973            let entry = values
974                .entry(name.to_string())
975                .or_insert_with(PropertyValue::default);
976            let listener = entry.event.listen();
977            if entry.value.is_some() {
978                entry.event.notify(1);
979            }
980            listener
981        } else {
982            Event::new().listen()
983        };
984
985        PropertyStream {
986            name,
987            proxy: self.clone(),
988            changed_listener,
989            phantom: std::marker::PhantomData,
990        }
991    }
992
993    /// Get a stream to receive destination owner changed events.
994    ///
995    /// If the proxy destination is a unique name, the stream will be notified of the peer
996    /// disconnection from the bus (with a `None` value).
997    ///
998    /// If the proxy destination is a well-known name, the stream will be notified whenever the name
999    /// owner is changed, either by a new peer being granted ownership (`Some` value) or when the
1000    /// name is released (with a `None` value).
1001    ///
1002    /// Note that zbus doesn't queue the updates. If the listener is slower than the receiver, it
1003    /// will only receive the last update.
1004    pub async fn receive_owner_changed(&self) -> Result<OwnerChangedStream<'a>> {
1005        use ordered_stream::OrderedStreamExt;
1006        let dbus_proxy = fdo::DBusProxy::builder(self.connection())
1007            .cache_properties(CacheProperties::No)
1008            .build()
1009            .await?;
1010        Ok(OwnerChangedStream {
1011            stream: dbus_proxy
1012                .receive_name_owner_changed_with_args(&[(0, self.destination().as_str())])
1013                .await?
1014                .map(Box::new(move |signal| {
1015                    let args = signal.args().unwrap();
1016
1017                    args.new_owner().as_ref().map(|owner| owner.to_owned())
1018                })),
1019            name: self.destination().clone(),
1020        })
1021    }
1022}
1023
/// The cached state of a single property.
#[derive(Debug, Default)]
struct PropertyValue {
    /// The cached value, or `None` if no value is cached for the property.
    value: Option<OwnedValue>,
    /// The event that [`Proxy::receive_property_changed`] registers its listeners on.
    event: Event,
}
1029
/// Flags to use with [`Proxy::call_with_flags`].
///
/// Each flag is converted into the message [`Flags`] variant of the same name before the method
/// call is sent.
#[bitflags]
#[repr(u8)]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum MethodFlags {
    /// No response is expected from this method call, regardless of whether the
    /// signature for the interface method indicates a reply type. When passed,
    /// [`Proxy::call_with_flags`] will return `Ok(None)` immediately after successfully
    /// sending the method call.
    ///
    /// Errors encountered while *making* the call will still be returned as
    /// an `Err` variant, but any errors that are triggered by the receiver's
    /// handling of the call will not be delivered.
    NoReplyExpected = 0x1,

    /// When set on a call whose destination is a message bus, this flag will instruct
    /// the bus not to [launch][al] a service to handle the call if no application
    /// on the bus owns the requested name.
    ///
    /// This flag is ignored when using a peer-to-peer connection.
    ///
    /// [al]: https://dbus.freedesktop.org/doc/dbus-specification.html#message-bus-starting-services
    NoAutoStart = 0x2,

    /// Indicates to the receiver that this client is prepared to wait for interactive
    /// authorization, which might take a considerable time to complete. For example, the receiver
    /// may query the user for confirmation via [polkit] or a similar framework.
    ///
    /// [polkit]: https://gitlab.freedesktop.org/polkit/polkit/
    AllowInteractiveAuth = 0x4,
}
1061
1062impl From<MethodFlags> for Flags {
1063    fn from(method_flag: MethodFlags) -> Self {
1064        match method_flag {
1065            MethodFlags::NoReplyExpected => Self::NoReplyExpected,
1066            MethodFlags::NoAutoStart => Self::NoAutoStart,
1067            MethodFlags::AllowInteractiveAuth => Self::AllowInteractiveAuth,
1068        }
1069    }
1070}
1071
/// The stream type wrapped by [`OwnerChangedStream`]: a `NameOwnerChanged` signal stream whose
/// items are mapped (through a boxed closure) to the new owner of the name, if any.
type OwnerChangedStreamMap = Map<
    fdo::NameOwnerChangedStream,
    Box<dyn FnMut(fdo::NameOwnerChanged) -> Option<UniqueName<'static>> + Send + Sync + Unpin>,
>;
1076
/// A [`stream::Stream`] implementation that yields `UniqueName` when the bus owner changes.
///
/// Each item is the unique name of the new owner, or `None` if there is no new owner.
///
/// Use [`Proxy::receive_owner_changed`] to create an instance of this type.
pub struct OwnerChangedStream<'a> {
    /// The underlying `NameOwnerChanged` stream, mapped to the new owner.
    stream: OwnerChangedStreamMap,
    /// The bus name whose ownership is being tracked.
    name: BusName<'a>,
}
1084
impl<'a> OwnerChangedStream<'a> {
    /// The bus name being tracked.
    ///
    /// This is the destination of the proxy that [`Proxy::receive_owner_changed`] was called on.
    pub fn name(&self) -> &BusName<'a> {
        &self.name
    }
}
1091
1092impl stream::Stream for OwnerChangedStream<'_> {
1093    type Item = Option<UniqueName<'static>>;
1094
1095    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1096        OrderedStream::poll_next_before(self, cx, None).map(|res| res.into_data())
1097    }
1098}
1099
1100impl OrderedStream for OwnerChangedStream<'_> {
1101    type Data = Option<UniqueName<'static>>;
1102    type Ordering = Sequence;
1103
1104    fn poll_next_before(
1105        self: Pin<&mut Self>,
1106        cx: &mut Context<'_>,
1107        before: Option<&Self::Ordering>,
1108    ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
1109        Pin::new(&mut self.get_mut().stream).poll_next_before(cx, before)
1110    }
1111}
1112
/// A [`stream::Stream`] implementation that yields signal [messages](`Message`).
///
/// Use [`Proxy::receive_signal`] to create an instance of this type.
///
/// This type uses a [`MessageStream::for_match_rule`] internally and therefore the note about match
/// rule registration and [`AsyncDrop`] in its documentation applies here as well.
#[derive(Debug)]
pub struct SignalStream<'a> {
    /// The stream of signals, joined with the `NameOwnerChanged` stream for the destination. The
    /// latter is only present (`Some`) when the destination is a well-known name.
    stream: Join<MessageStream, Option<MessageStream>>,
    /// The unique name that the sender of a message is compared against when filtering; `None`
    /// if no owner is known.
    src_unique_name: Option<UniqueName<'static>>,
    /// The signal being listened for; `None` when no member filter is applied.
    signal_name: Option<MemberName<'a>>,
}
1125
impl<'a> SignalStream<'a> {
    /// The signal name.
    ///
    /// This is `None` if the stream was created without a member filter.
    pub fn name(&self) -> Option<&MemberName<'a>> {
        self.signal_name.as_ref()
    }

    /// Create a stream of the signals emitted by the destination of `proxy`.
    ///
    /// The match rule is built from the destination, path and interface of `proxy`, plus
    /// `signal_name` (if any) and the `(index, value)` argument filters in `args`.
    ///
    /// For a well-known destination, the current owner of the name is resolved here and a
    /// `NameOwnerChanged` stream is kept around to follow later ownership changes.
    async fn new(
        proxy: Proxy<'_>,
        signal_name: Option<MemberName<'a>>,
        args: &[(u8, &str)],
    ) -> Result<SignalStream<'a>> {
        let mut rule_builder = MatchRule::builder()
            .msg_type(Type::Signal)
            .sender(proxy.destination())?
            .path(proxy.path())?
            .interface(proxy.interface())?;
        if let Some(name) = &signal_name {
            rule_builder = rule_builder.member(name)?;
        }
        for (i, arg) in args {
            rule_builder = rule_builder.arg(*i, *arg)?;
        }
        let signal_rule: OwnedMatchRule = rule_builder.build().to_owned().into();
        let conn = proxy.connection();

        let (src_unique_name, stream) = match proxy.destination().to_owned() {
            // A unique name is its own owner, so no `NameOwnerChanged` stream is needed.
            BusName::Unique(name) => (
                Some(name),
                join_streams(
                    MessageStream::for_match_rule(signal_rule, conn, None).await?,
                    None,
                ),
            ),
            BusName::WellKnown(name) => {
                use ordered_stream::OrderedStreamExt;

                // The `NameOwnerChanged` stream is created before `GetNameOwner` is called
                // (presumably so that an ownership change racing with that call is not missed).
                let name_owner_changed_rule = MatchRule::builder()
                    .msg_type(Type::Signal)
                    .sender("org.freedesktop.DBus")?
                    .path("/org/freedesktop/DBus")?
                    .interface("org.freedesktop.DBus")?
                    .member("NameOwnerChanged")?
                    .add_arg(name.as_str())?
                    .build();
                let name_owner_changed_stream = MessageStream::for_match_rule(
                    name_owner_changed_rule,
                    conn,
                    Some(MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED),
                )
                .await?
                .map(Either::Left);

                let get_name_owner = conn
                    .call_method_raw(
                        Some("org.freedesktop.DBus"),
                        "/org/freedesktop/DBus",
                        Some("org.freedesktop.DBus"),
                        "GetNameOwner",
                        BitFlags::empty(),
                        &name,
                    )
                    .await
                    .map(|r| FromFuture::from(r.expect("no reply")).map(Either::Right))?;

                // Whichever of the two sources yields first determines the initial owner.
                let mut join = join_streams(name_owner_changed_stream, get_name_owner);

                let mut src_unique_name = loop {
                    match join.next().await {
                        Some(Either::Left(Ok(msg))) => {
                            let signal = NameOwnerChanged::from_message(msg)
                                .expect("`NameOwnerChanged` signal stream got wrong message");
                            {
                                break signal
                                    .args()
                                    // The filtering code couldn't have let this through if args
                                    // were not in order.
                                    .expect("`NameOwnerChanged` signal has no args")
                                    .new_owner()
                                    .as_ref()
                                    .map(UniqueName::to_owned);
                            }
                        }
                        // Errors from the `NameOwnerChanged` stream are ignored; keep waiting.
                        Some(Either::Left(Err(_))) => (),
                        Some(Either::Right(Ok(response))) => {
                            break Some(
                                response.body().deserialize::<UniqueName<'_>>()?.to_owned(),
                            );
                        }
                        Some(Either::Right(Err(e))) => {
                            // Probably the name is not owned. Not a problem but let's still log it.
                            debug!("Failed to get owner of {name}: {e}");

                            break None;
                        }
                        None => {
                            return Err(Error::InputOutput(
                                std::io::Error::new(
                                    std::io::ErrorKind::BrokenPipe,
                                    "connection closed",
                                )
                                .into(),
                            ));
                        }
                    }
                };

                // Let's take into account any buffered NameOwnerChanged signal.
                let (stream, _, queued) = join.into_inner();
                if let Some(msg) = queued.and_then(|e| match e.0 {
                    Either::Left(Ok(msg)) => Some(msg),
                    Either::Left(Err(_)) | Either::Right(_) => None,
                }) {
                    if let Some(signal) = NameOwnerChanged::from_message(msg) {
                        if let Ok(args) = signal.args() {
                            match (args.name(), args.new_owner().deref()) {
                                (BusName::WellKnown(n), Some(new_owner)) if n == &name => {
                                    src_unique_name = Some(new_owner.to_owned());
                                }
                                _ => (),
                            }
                        }
                    }
                }
                // Unwrap the `Either::Left` mapping to get the plain message stream back.
                let name_owner_changed_stream = stream.into_inner();

                let stream = join_streams(
                    MessageStream::for_match_rule(signal_rule, conn, None).await?,
                    Some(name_owner_changed_stream),
                );

                (src_unique_name, stream)
            }
        };

        Ok(SignalStream {
            stream,
            src_unique_name,
            signal_name,
        })
    }

    /// Decide whether `msg` should be yielded by the stream.
    ///
    /// Returns `Ok(true)` if the sender of `msg` equals the tracked unique name. Otherwise, if
    /// `msg` is a `NameOwnerChanged` signal, the tracked unique name is updated to the new owner
    /// and `Ok(false)` is returned.
    fn filter(&mut self, msg: &Message) -> Result<bool> {
        let header = msg.header();
        let sender = header.sender();
        if sender == self.src_unique_name.as_ref() {
            return Ok(true);
        }

        // The src_unique_name must be maintained in lock-step with the applied filter
        if let Some(signal) = NameOwnerChanged::from_message(msg.clone()) {
            let args = signal.args()?;
            self.src_unique_name = args.new_owner().as_ref().map(|n| n.to_owned());
        }

        Ok(false)
    }
}
1283
1284impl stream::Stream for SignalStream<'_> {
1285    type Item = Message;
1286
1287    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1288        OrderedStream::poll_next_before(self, cx, None).map(|res| res.into_data())
1289    }
1290}
1291
1292impl OrderedStream for SignalStream<'_> {
1293    type Data = Message;
1294    type Ordering = Sequence;
1295
1296    fn poll_next_before(
1297        self: Pin<&mut Self>,
1298        cx: &mut Context<'_>,
1299        before: Option<&Self::Ordering>,
1300    ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
1301        let this = self.get_mut();
1302        loop {
1303            match ready!(OrderedStream::poll_next_before(
1304                Pin::new(&mut this.stream),
1305                cx,
1306                before
1307            )) {
1308                PollResult::Item { data, ordering } => {
1309                    if let Ok(msg) = data {
1310                        if let Ok(true) = this.filter(&msg) {
1311                            return Poll::Ready(PollResult::Item {
1312                                data: msg,
1313                                ordering,
1314                            });
1315                        }
1316                    }
1317                }
1318                PollResult::Terminated => return Poll::Ready(PollResult::Terminated),
1319                PollResult::NoneBefore => return Poll::Ready(PollResult::NoneBefore),
1320            }
1321        }
1322    }
1323}
1324
1325impl stream::FusedStream for SignalStream<'_> {
1326    fn is_terminated(&self) -> bool {
1327        ordered_stream::FusedOrderedStream::is_terminated(&self.stream)
1328    }
1329}
1330
1331#[async_trait::async_trait]
1332impl AsyncDrop for SignalStream<'_> {
1333    async fn async_drop(self) {
1334        let (signals, names, _buffered) = self.stream.into_inner();
1335        signals.async_drop().await;
1336        if let Some(names) = names {
1337            names.async_drop().await;
1338        }
1339    }
1340}
1341
#[cfg(feature = "blocking-api")]
impl<'a> From<crate::blocking::Proxy<'a>> for Proxy<'a> {
    /// Unwrap a blocking proxy into the async proxy it wraps.
    fn from(blocking: crate::blocking::Proxy<'a>) -> Self {
        blocking.into_inner()
    }
}
1348
/// This trait is implemented by all async proxies, which are generated with the
/// [`proxy`](macro@zbus::proxy) macro.
///
/// It gives generic code access to the builder of a proxy type and to the [`Proxy`] that the
/// proxy type wraps.
pub trait ProxyImpl<'c>
where
    Self: Sized,
{
    /// Return a customizable builder for this proxy.
    ///
    /// The builder is created for the connection `conn`.
    fn builder(conn: &Connection) -> Builder<'c, Self>;

    /// Consume `self`, returning the underlying `zbus::Proxy`.
    fn into_inner(self) -> Proxy<'c>;

    /// The reference to the underlying `zbus::Proxy`.
    fn inner(&self) -> &Proxy<'c>;
}
1364
/// A value that is one of two types.
///
/// Used by `SignalStream::new` to tell apart the items of the two streams it joins: the
/// `NameOwnerChanged` signals (`Left`) and the `GetNameOwner` reply (`Right`).
enum Either<L, R> {
    /// An item of the first ("left") source.
    Left(L),
    /// An item of the second ("right") source.
    Right(R),
}
1369
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{connection, interface, object_server::SignalEmitter, proxy, utils::block_on};
    use futures_util::StreamExt;
    use ntest::timeout;
    use test_log::test;

    #[test]
    #[timeout(15000)]
    fn signal() {
        block_on(test_signal()).unwrap();
    }

    /// Checks the owner-changed and `NameAcquired` streams while a second connection acquires a
    /// well-known name and is then dropped.
    async fn test_signal() -> Result<()> {
        // Register a well-known name with the session bus and ensure we get the appropriate
        // signals called for that.
        let conn = Connection::session().await?;
        let dest_conn = Connection::session().await?;
        let unique_name = dest_conn.unique_name().unwrap().clone();

        let well_known = "org.freedesktop.zbus.async.ProxySignalStreamTest";
        let proxy: Proxy<'_> = Builder::new(&conn)
            .destination(well_known)?
            .path("/does/not/matter")?
            .interface("does.not.matter")?
            .build()
            .await?;
        let mut owner_changed_stream = proxy.receive_owner_changed().await?;

        let proxy = fdo::DBusProxy::new(&dest_conn).await?;
        let mut name_acquired_stream = proxy
            .inner()
            .receive_signal_with_args("NameAcquired", &[(0, well_known)])
            .await?;

        // This property stream is created and then dropped below without ever being polled.
        let prop_stream = proxy
            .inner()
            .receive_property_changed("SomeProp")
            .await
            .filter_map(|changed| async move {
                let v: Option<u32> = changed.get().await.ok();
                dbg!(v)
            });
        drop(proxy);
        drop(prop_stream);

        dest_conn.request_name(well_known).await?;

        let (new_owner, acquired_signal) =
            futures_util::join!(owner_changed_stream.next(), name_acquired_stream.next(),);

        assert_eq!(&new_owner.unwrap().unwrap(), &*unique_name);

        let acquired_signal = acquired_signal.unwrap();
        assert_eq!(
            acquired_signal.body().deserialize::<&str>().unwrap(),
            well_known
        );

        // Also track the owner through the unique name of the destination connection.
        let proxy = Proxy::new(&conn, &unique_name, "/does/not/matter", "does.not.matter").await?;
        let mut unique_name_changed_stream = proxy.receive_owner_changed().await?;

        drop(dest_conn);
        name_acquired_stream.async_drop().await;

        // There shouldn't be an owner anymore.
        let new_owner = owner_changed_stream.next().await;
        assert!(new_owner.unwrap().is_none());

        let new_unique_owner = unique_name_changed_stream.next().await;
        assert!(new_unique_owner.unwrap().is_none());

        Ok(())
    }

    #[test]
    #[timeout(15000)]
    fn signal_stream_deadlock() {
        block_on(test_signal_stream_deadlock()).unwrap();
    }

    /// Tests deadlocking in signal reception when the message queue is full.
    ///
    /// Creates a connection with a small message queue, and a service that
    /// emits signals at a high rate. First a listener is created that listens
    /// for that signal which should fill the small queue. Then another signal
    /// listener is created against another signal. Previously, this second
    /// call to add the match rule never resolved and resulted in a deadlock.
    async fn test_signal_stream_deadlock() -> Result<()> {
        #[proxy(
            gen_blocking = false,
            default_path = "/org/zbus/Test",
            default_service = "org.zbus.Test.MR501",
            interface = "org.zbus.Test"
        )]
        trait Test {
            #[zbus(signal)]
            fn my_signal(&self, msg: &str) -> Result<()>;
        }

        struct TestIface;

        #[interface(name = "org.zbus.Test")]
        impl TestIface {
            #[zbus(signal)]
            async fn my_signal(context: &SignalEmitter<'_>, msg: &'static str) -> Result<()>;
        }

        let test_iface = TestIface;
        let server_conn = connection::Builder::session()?
            .name("org.zbus.Test.MR501")?
            .serve_at("/org/zbus/Test", test_iface)?
            .build()
            .await?;

        // The client connection gets a queue of a single message.
        let client_conn = connection::Builder::session()?
            .max_queued(1)
            .build()
            .await?;

        let test_proxy = TestProxy::new(&client_conn).await?;
        let test_prop_proxy = PropertiesProxy::builder(&client_conn)
            .destination("org.zbus.Test.MR501")?
            .path("/org/zbus/Test")?
            .build()
            .await?;

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        // Server task: emit bursts of 10 signals every 5 ms until the channel is closed.
        let handle = {
            let tx = tx.clone();
            let conn = server_conn.clone();
            let server_fut = async move {
                use std::time::Duration;

                #[cfg(not(feature = "tokio"))]
                use async_io::Timer;

                #[cfg(feature = "tokio")]
                use tokio::time::sleep;

                let iface_ref = conn
                    .object_server()
                    .interface::<_, TestIface>("/org/zbus/Test")
                    .await
                    .unwrap();

                let context = iface_ref.signal_emitter();
                while !tx.is_closed() {
                    for _ in 0..10 {
                        TestIface::my_signal(context, "This is a test")
                            .await
                            .unwrap();
                    }

                    #[cfg(not(feature = "tokio"))]
                    Timer::after(Duration::from_millis(5)).await;

                    #[cfg(feature = "tokio")]
                    sleep(Duration::from_millis(5)).await;
                }
            };
            server_conn.executor().spawn(server_fut, "server_task")
        };

        // First listener: subscribes to `MySignal`, signals the second listener through the
        // channel and then keeps consuming the signals.
        let signal_fut = async {
            let mut signal_stream = test_proxy.receive_my_signal().await.unwrap();

            tx.send(()).await.unwrap();

            while let Some(_signal) = signal_stream.next().await {}
        };

        // Second listener: once the first one is set up, subscribes to another signal.
        let prop_fut = async move {
            rx.recv().await.unwrap();
            let _prop_stream = test_prop_proxy.receive_properties_changed().await.unwrap();
        };

        futures_util::pin_mut!(signal_fut);
        futures_util::pin_mut!(prop_fut);

        futures_util::future::select(signal_fut, prop_fut).await;

        handle.await?;

        Ok(())
    }
}