1use enumflags2::{BitFlags, bitflags};
4use event_listener::{Event, EventListener};
5use futures_core::{ready, stream};
6use ordered_stream::{FromFuture, Join, Map, OrderedStream, PollResult, join as join_streams};
7use std::{
8 collections::{HashMap, HashSet},
9 fmt,
10 future::Future,
11 ops::Deref,
12 pin::Pin,
13 sync::{Arc, OnceLock, RwLock, RwLockReadGuard},
14 task::{Context, Poll},
15};
16use tracing::{Instrument, debug, info_span, instrument, trace, warn};
17
18use zbus_names::{BusName, InterfaceName, MemberName, UniqueName};
19use zvariant::{ObjectPath, OwnedValue, Str, Value};
20
21use crate::{
22 AsyncDrop, Connection, Error, Executor, MatchRule, MessageStream, OwnedMatchRule, Result, Task,
23 fdo::{self, IntrospectableProxy, NameOwnerChanged, PropertiesChangedStream, PropertiesProxy},
24 message::{Flags, Message, Sequence, Type},
25};
26
27mod builder;
28pub use builder::{Builder, CacheProperties};
29
30mod defaults;
31pub use defaults::Defaults;
32
33#[derive(Clone, Debug)]
68pub struct Proxy<'a> {
69 pub(crate) inner: Arc<ProxyInner<'a>>,
70}
71
72pub(crate) struct ProxyInnerStatic {
75 pub(crate) conn: Connection,
76 dest_owner_change_match_rule: OnceLock<OwnedMatchRule>,
77}
78
79impl fmt::Debug for ProxyInnerStatic {
80 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
81 f.debug_struct("ProxyInnerStatic")
82 .field(
83 "dest_owner_change_match_rule",
84 &self.dest_owner_change_match_rule,
85 )
86 .finish_non_exhaustive()
87 }
88}
89
90#[derive(Debug)]
91pub(crate) struct ProxyInner<'a> {
92 inner_without_borrows: ProxyInnerStatic,
93 pub(crate) destination: BusName<'a>,
94 pub(crate) path: ObjectPath<'a>,
95 pub(crate) interface: InterfaceName<'a>,
96
97 property_cache: Option<OnceLock<(Arc<PropertiesCache>, Task<()>)>>,
99 uncached_properties: HashSet<Str<'a>>,
102}
103
104impl Drop for ProxyInnerStatic {
105 fn drop(&mut self) {
106 if let Some(rule) = self.dest_owner_change_match_rule.take() {
107 self.conn.queue_remove_match(rule);
108 }
109 }
110}
111
112pub struct PropertyChanged<'a, T> {
116 name: &'a str,
117 properties: Arc<PropertiesCache>,
118 proxy: Proxy<'a>,
119 phantom: std::marker::PhantomData<T>,
120}
121
122impl<T> PropertyChanged<'_, T> {
123 pub fn name(&self) -> &str {
125 self.name
126 }
127
128 pub async fn get_raw(&self) -> Result<impl Deref<Target = Value<'static>> + '_> {
134 struct Wrapper<'w> {
135 name: &'w str,
136 values: RwLockReadGuard<'w, HashMap<String, PropertyValue>>,
137 }
138
139 impl Deref for Wrapper<'_> {
140 type Target = Value<'static>;
141
142 fn deref(&self) -> &Self::Target {
143 self.values
144 .get(self.name)
145 .expect("PropertyStream with no corresponding property")
146 .value
147 .as_ref()
148 .expect("PropertyStream with no corresponding property")
149 }
150 }
151
152 {
153 let values = self.properties.values.read().expect("lock poisoned");
154 if values
155 .get(self.name)
156 .expect("PropertyStream with no corresponding property")
157 .value
158 .is_some()
159 {
160 return Ok(Wrapper {
161 name: self.name,
162 values,
163 });
164 }
165 }
166
167 let properties_proxy = self.proxy.properties_proxy();
169 let value = properties_proxy
170 .get(self.proxy.inner.interface.clone(), self.name)
171 .await
172 .map_err(crate::Error::from)?;
173
174 {
176 let mut values = self.properties.values.write().expect("lock poisoned");
177
178 values
179 .get_mut(self.name)
180 .expect("PropertyStream with no corresponding property")
181 .value = Some(value);
182 }
183
184 Ok(Wrapper {
185 name: self.name,
186 values: self.properties.values.read().expect("lock poisoned"),
187 })
188 }
189}
190
191impl<T> PropertyChanged<'_, T>
192where
193 T: TryFrom<zvariant::OwnedValue>,
194 T::Error: Into<crate::Error>,
195{
196 pub async fn get(&self) -> Result<T> {
202 self.get_raw()
203 .await
204 .and_then(|v| T::try_from(OwnedValue::try_from(&*v)?).map_err(Into::into))
205 }
206}
207
208#[derive(Debug)]
212pub struct PropertyStream<'a, T> {
213 name: &'a str,
214 proxy: Proxy<'a>,
215 changed_listener: EventListener,
216 phantom: std::marker::PhantomData<T>,
217}
218
219impl<'a, T> stream::Stream for PropertyStream<'a, T>
220where
221 T: Unpin,
222{
223 type Item = PropertyChanged<'a, T>;
224
225 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
226 let m = self.get_mut();
227 let properties = match m.proxy.get_property_cache() {
228 Some(properties) => properties.clone(),
229 None => return Poll::Ready(None),
231 };
232 ready!(Pin::new(&mut m.changed_listener).poll(cx));
233
234 m.changed_listener = properties
235 .values
236 .read()
237 .expect("lock poisoned")
238 .get(m.name)
239 .expect("PropertyStream with no corresponding property")
240 .event
241 .listen();
242
243 Poll::Ready(Some(PropertyChanged {
244 name: m.name,
245 properties,
246 proxy: m.proxy.clone(),
247 phantom: std::marker::PhantomData,
248 }))
249 }
250}
251
252#[derive(Debug)]
253pub(crate) struct PropertiesCache {
254 values: RwLock<HashMap<String, PropertyValue>>,
255 caching_result: RwLock<CachingResult>,
256}
257
258#[derive(Debug)]
259enum CachingResult {
260 Caching { ready: Event },
261 Cached { result: Result<()> },
262}
263
264impl PropertiesCache {
265 #[instrument(skip_all, level = "trace")]
266 fn new(
267 proxy: PropertiesProxy<'static>,
268 interface: InterfaceName<'static>,
269 executor: &Executor<'_>,
270 uncached_properties: HashSet<zvariant::Str<'static>>,
271 ) -> (Arc<Self>, Task<()>) {
272 let cache = Arc::new(PropertiesCache {
273 values: Default::default(),
274 caching_result: RwLock::new(CachingResult::Caching {
275 ready: Event::new(),
276 }),
277 });
278
279 let cache_clone = cache.clone();
280 let task_name = format!("{interface} proxy caching");
281 let proxy_caching = async move {
282 let result = cache_clone
283 .init(proxy, interface, uncached_properties)
284 .await;
285 let (prop_changes, interface, uncached_properties) = {
286 let mut caching_result = cache_clone.caching_result.write().expect("lock poisoned");
287 let ready = match &*caching_result {
288 CachingResult::Caching { ready } => ready,
289 _ => unreachable!(),
292 };
293 match result {
294 Ok((prop_changes, interface, uncached_properties)) => {
295 ready.notify(usize::MAX);
296 *caching_result = CachingResult::Cached { result: Ok(()) };
297
298 (prop_changes, interface, uncached_properties)
299 }
300 Err(e) => {
301 warn!(
302 "Failed to populate properties cache via GetAll: {e}. \
303 Property change streams will not produce values."
304 );
305 ready.notify(usize::MAX);
306 *caching_result = CachingResult::Cached { result: Err(e) };
307
308 return;
309 }
310 }
311 };
312
313 if let Err(e) = cache_clone
314 .keep_updated(prop_changes, interface, uncached_properties)
315 .await
316 {
317 debug!("Error keeping properties cache updated: {e}");
318 }
319 }
320 .instrument(info_span!("{}", task_name));
321 let task = executor.spawn(proxy_caching, &task_name);
322
323 (cache, task)
324 }
325
326 async fn init(
328 &self,
329 proxy: PropertiesProxy<'static>,
330 interface: InterfaceName<'static>,
331 uncached_properties: HashSet<zvariant::Str<'static>>,
332 ) -> Result<(
333 PropertiesChangedStream,
334 InterfaceName<'static>,
335 HashSet<zvariant::Str<'static>>,
336 )> {
337 use ordered_stream::OrderedStreamExt;
338
339 let prop_changes = proxy.receive_properties_changed().await?.map(Either::Left);
340
341 let get_all = proxy
342 .inner()
343 .connection()
344 .call_method_raw(
345 Some(proxy.inner().destination()),
346 proxy.inner().path(),
347 Some(proxy.inner().interface()),
348 "GetAll",
349 BitFlags::empty(),
350 &interface,
351 )
352 .await
353 .map(|r| FromFuture::from(r.expect("no reply")).map(Either::Right))?;
354
355 let mut join = join_streams(prop_changes, get_all);
356
357 loop {
358 match join.next().await {
359 Some(Either::Left(_update)) => {
360 }
362 Some(Either::Right(populate)) => {
363 populate?.body().deserialize().map(|values| {
364 self.update_cache(&uncached_properties, &values, &[], &interface);
365 })?;
366 break;
367 }
368 None => break,
369 }
370 }
371 if let Some((Either::Left(update), _)) = Pin::new(&mut join).take_buffered() {
372 if let Ok(args) = update.args() {
375 if args.interface_name == interface {
376 self.update_cache(
377 &uncached_properties,
378 &args.changed_properties,
379 &args.invalidated_properties,
380 &interface,
381 );
382 }
383 }
384 }
385 let prop_changes = join.into_inner().0.into_inner();
389
390 Ok((prop_changes, interface, uncached_properties))
391 }
392
393 #[instrument(skip_all, level = "trace")]
395 async fn keep_updated(
396 &self,
397 mut prop_changes: PropertiesChangedStream,
398 interface: InterfaceName<'static>,
399 uncached_properties: HashSet<zvariant::Str<'static>>,
400 ) -> Result<()> {
401 use futures_lite::StreamExt;
402
403 trace!("Listening for property changes on {interface}...");
404 while let Some(update) = prop_changes.next().await {
405 if let Ok(args) = update.args() {
406 if args.interface_name == interface {
407 self.update_cache(
408 &uncached_properties,
409 &args.changed_properties,
410 &args.invalidated_properties,
411 &interface,
412 );
413 }
414 }
415 }
416
417 Ok(())
418 }
419
420 fn update_cache(
421 &self,
422 uncached_properties: &HashSet<Str<'_>>,
423 changed: &HashMap<&str, Value<'_>>,
424 invalidated: &[&str],
425 interface: &InterfaceName<'_>,
426 ) {
427 let mut values = self.values.write().expect("lock poisoned");
428
429 for inval in invalidated {
430 if uncached_properties.contains(&Str::from(*inval)) {
431 debug!(
432 "Ignoring invalidation of uncached property `{}.{}`",
433 interface, inval
434 );
435 continue;
436 }
437 trace!("Property `{interface}.{inval}` invalidated");
438
439 if let Some(entry) = values.get_mut(*inval) {
440 entry.value = None;
441 entry.event.notify(usize::MAX);
442 }
443 }
444
445 for (property_name, value) in changed {
446 if uncached_properties.contains(&Str::from(*property_name)) {
447 debug!(
448 "Ignoring update of uncached property `{}.{}`",
449 interface, property_name
450 );
451 continue;
452 }
453 trace!("Property `{interface}.{property_name}` updated");
454
455 let entry = values.entry(property_name.to_string()).or_default();
456
457 let value = match OwnedValue::try_from(value) {
458 Ok(value) => value,
459 Err(e) => {
460 debug!(
461 "Failed to convert property `{interface}.{property_name}` to OwnedValue: {e}"
462 );
463 continue;
464 }
465 };
466 entry.value = Some(value);
467 entry.event.notify(usize::MAX);
468 }
469 }
470
471 pub(crate) async fn ready(&self) -> Result<()> {
473 let listener = match &*self.caching_result.read().expect("lock poisoned") {
474 CachingResult::Caching { ready } => ready.listen(),
475 CachingResult::Cached { result } => return result.clone(),
476 };
477 listener.await;
478
479 match &*self.caching_result.read().expect("lock poisoned") {
481 CachingResult::Caching { .. } => unreachable!(),
484 CachingResult::Cached { result } => result.clone(),
485 }
486 }
487}
488
489impl<'a> ProxyInner<'a> {
490 pub(crate) fn new(
491 conn: Connection,
492 destination: BusName<'a>,
493 path: ObjectPath<'a>,
494 interface: InterfaceName<'a>,
495 cache: CacheProperties,
496 uncached_properties: HashSet<Str<'a>>,
497 ) -> Self {
498 let property_cache = match cache {
499 CacheProperties::Yes | CacheProperties::Lazily => Some(OnceLock::new()),
500 CacheProperties::No => None,
501 };
502 Self {
503 inner_without_borrows: ProxyInnerStatic {
504 conn,
505 dest_owner_change_match_rule: OnceLock::new(),
506 },
507 destination,
508 path,
509 interface,
510 property_cache,
511 uncached_properties,
512 }
513 }
514
515 pub(crate) async fn subscribe_dest_owner_change(&self) -> Result<()> {
519 if !self.inner_without_borrows.conn.is_bus() {
520 return Ok(());
522 }
523
524 let well_known_name = match &self.destination {
525 BusName::WellKnown(well_known_name) => well_known_name,
526 BusName::Unique(_) => return Ok(()),
527 };
528
529 if self
530 .inner_without_borrows
531 .dest_owner_change_match_rule
532 .get()
533 .is_some()
534 {
535 return Ok(());
537 }
538
539 let conn = &self.inner_without_borrows.conn;
540 let signal_rule: OwnedMatchRule = MatchRule::builder()
541 .msg_type(Type::Signal)
542 .sender("org.freedesktop.DBus")?
543 .path("/org/freedesktop/DBus")?
544 .interface("org.freedesktop.DBus")?
545 .member("NameOwnerChanged")?
546 .add_arg(well_known_name.as_str())?
547 .build()
548 .to_owned()
549 .into();
550
551 conn.add_match(
552 signal_rule.clone(),
553 Some(MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED),
554 )
555 .await?;
556
557 if self
558 .inner_without_borrows
559 .dest_owner_change_match_rule
560 .set(signal_rule.clone())
561 .is_err()
562 {
563 conn.remove_match(signal_rule).await?;
565 }
566
567 Ok(())
568 }
569}
570
571const MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED: usize = 8;
572
573impl<'a> Proxy<'a> {
574 pub async fn new<D, P, I>(
576 conn: &Connection,
577 destination: D,
578 path: P,
579 interface: I,
580 ) -> Result<Proxy<'a>>
581 where
582 D: TryInto<BusName<'a>>,
583 P: TryInto<ObjectPath<'a>>,
584 I: TryInto<InterfaceName<'a>>,
585 D::Error: Into<Error>,
586 P::Error: Into<Error>,
587 I::Error: Into<Error>,
588 {
589 Builder::new(conn)
590 .destination(destination)?
591 .path(path)?
592 .interface(interface)?
593 .build()
594 .await
595 }
596
597 pub async fn new_owned<D, P, I>(
600 conn: Connection,
601 destination: D,
602 path: P,
603 interface: I,
604 ) -> Result<Proxy<'a>>
605 where
606 D: TryInto<BusName<'static>>,
607 P: TryInto<ObjectPath<'static>>,
608 I: TryInto<InterfaceName<'static>>,
609 D::Error: Into<Error>,
610 P::Error: Into<Error>,
611 I::Error: Into<Error>,
612 {
613 Builder::new(&conn)
614 .destination(destination)?
615 .path(path)?
616 .interface(interface)?
617 .build()
618 .await
619 }
620
621 pub fn connection(&self) -> &Connection {
623 &self.inner.inner_without_borrows.conn
624 }
625
626 pub fn destination(&self) -> &BusName<'a> {
628 &self.inner.destination
629 }
630
631 pub fn path(&self) -> &ObjectPath<'a> {
633 &self.inner.path
634 }
635
636 pub fn interface(&self) -> &InterfaceName<'a> {
638 &self.inner.interface
639 }
640
641 pub async fn introspect(&self) -> fdo::Result<String> {
646 let proxy = IntrospectableProxy::builder(&self.inner.inner_without_borrows.conn)
647 .destination(&self.inner.destination)?
648 .path(&self.inner.path)?
649 .build()
650 .await?;
651
652 proxy.introspect().await
653 }
654
655 fn properties_proxy(&self) -> PropertiesProxy<'_> {
656 PropertiesProxy::builder(&self.inner.inner_without_borrows.conn)
657 .destination(self.inner.destination.as_ref())
659 .unwrap()
660 .path(self.inner.path.as_ref())
662 .unwrap()
663 .cache_properties(CacheProperties::No)
665 .build_internal()
666 .unwrap()
667 .into()
668 }
669
670 fn owned_properties_proxy(&self) -> PropertiesProxy<'static> {
671 PropertiesProxy::builder(&self.inner.inner_without_borrows.conn)
672 .destination(self.inner.destination.to_owned())
674 .unwrap()
675 .path(self.inner.path.to_owned())
677 .unwrap()
678 .cache_properties(CacheProperties::No)
680 .build_internal()
681 .unwrap()
682 .into()
683 }
684
685 pub(crate) fn get_property_cache(&self) -> Option<&Arc<PropertiesCache>> {
690 let cache = match &self.inner.property_cache {
691 Some(cache) => cache,
692 None => return None,
693 };
694 let (cache, _) = &cache.get_or_init(|| {
695 let proxy = self.owned_properties_proxy();
696 let interface = self.interface().to_owned();
697 let uncached_properties: HashSet<zvariant::Str<'static>> = self
698 .inner
699 .uncached_properties
700 .iter()
701 .map(|s| s.to_owned())
702 .collect();
703 let executor = self.connection().executor();
704
705 PropertiesCache::new(proxy, interface, executor, uncached_properties)
706 });
707
708 Some(cache)
709 }
710
711 pub fn cached_property<T>(&self, property_name: &str) -> Result<Option<T>>
718 where
719 T: TryFrom<OwnedValue>,
720 T::Error: Into<Error>,
721 {
722 self.cached_property_raw(property_name)
723 .as_deref()
724 .map(|v| T::try_from(OwnedValue::try_from(v)?).map_err(Into::into))
725 .transpose()
726 }
727
728 pub fn cached_property_raw<'p>(
733 &'p self,
734 property_name: &'p str,
735 ) -> Option<impl Deref<Target = Value<'static>> + 'p> {
736 if let Some(values) = self
737 .inner
738 .property_cache
739 .as_ref()
740 .and_then(OnceLock::get)
741 .map(|c| c.0.values.read().expect("lock poisoned"))
742 {
743 values
745 .get(property_name)
746 .and_then(|e| e.value.as_ref())?;
748
749 struct Wrapper<'a> {
750 values: RwLockReadGuard<'a, HashMap<String, PropertyValue>>,
751 property_name: &'a str,
752 }
753
754 impl Deref for Wrapper<'_> {
755 type Target = Value<'static>;
756
757 fn deref(&self) -> &Self::Target {
758 self.values
759 .get(self.property_name)
760 .and_then(|e| e.value.as_ref())
761 .map(|v| v.deref())
762 .expect("inexistent property")
763 }
764 }
765
766 Some(Wrapper {
767 values,
768 property_name,
769 })
770 } else {
771 None
772 }
773 }
774
775 async fn get_proxy_property(&self, property_name: &str) -> Result<OwnedValue> {
776 Ok(self
777 .properties_proxy()
778 .get(self.inner.interface.as_ref(), property_name)
779 .await?)
780 }
781
782 pub async fn get_property<T>(&self, property_name: &str) -> Result<T>
787 where
788 T: TryFrom<OwnedValue>,
789 T::Error: Into<Error>,
790 {
791 if let Some(cache) = self.get_property_cache() {
792 cache.ready().await?;
793 }
794 if let Some(value) = self.cached_property(property_name)? {
795 return Ok(value);
796 }
797
798 let value = self.get_proxy_property(property_name).await?;
799 value.try_into().map_err(Into::into)
800 }
801
802 pub async fn set_property<'t, T>(&self, property_name: &str, value: T) -> fdo::Result<()>
806 where
807 T: 't + Into<Value<'t>>,
808 {
809 self.properties_proxy()
810 .set(self.inner.interface.as_ref(), property_name, value.into())
811 .await
812 }
813
814 pub async fn call_method<'m, M, B>(&self, method_name: M, body: &B) -> Result<Message>
822 where
823 M: TryInto<MemberName<'m>>,
824 M::Error: Into<Error>,
825 B: serde::ser::Serialize + zvariant::DynamicType,
826 {
827 self.inner
828 .inner_without_borrows
829 .conn
830 .call_method(
831 Some(&self.inner.destination),
832 self.inner.path.as_str(),
833 Some(&self.inner.interface),
834 method_name,
835 body,
836 )
837 .await
838 }
839
840 pub async fn call<'m, M, B, R>(&self, method_name: M, body: &B) -> Result<R>
846 where
847 M: TryInto<MemberName<'m>>,
848 M::Error: Into<Error>,
849 B: serde::ser::Serialize + zvariant::DynamicType,
850 R: for<'d> zvariant::DynamicDeserialize<'d>,
851 {
852 let reply = self.call_method(method_name, body).await?;
853
854 reply.body().deserialize()
855 }
856
857 pub async fn call_with_flags<'m, M, B, R>(
867 &self,
868 method_name: M,
869 flags: BitFlags<MethodFlags>,
870 body: &B,
871 ) -> Result<Option<R>>
872 where
873 M: TryInto<MemberName<'m>>,
874 M::Error: Into<Error>,
875 B: serde::ser::Serialize + zvariant::DynamicType,
876 R: for<'d> zvariant::DynamicDeserialize<'d>,
877 {
878 let flags = flags.iter().map(Flags::from).collect::<BitFlags<_>>();
879 match self
880 .inner
881 .inner_without_borrows
882 .conn
883 .call_method_raw(
884 Some(self.destination()),
885 self.path(),
886 Some(self.interface()),
887 method_name,
888 flags,
889 body,
890 )
891 .await?
892 {
893 Some(reply) => reply.await?.body().deserialize().map(Some),
894 None => Ok(None),
895 }
896 }
897
898 pub async fn call_noreply<'m, M, B>(&self, method_name: M, body: &B) -> Result<()>
902 where
903 M: TryInto<MemberName<'m>>,
904 M::Error: Into<Error>,
905 B: serde::ser::Serialize + zvariant::DynamicType,
906 {
907 self.call_with_flags::<_, _, ()>(method_name, MethodFlags::NoReplyExpected.into(), body)
908 .await?;
909 Ok(())
910 }
911
912 pub async fn receive_signal<'m, M>(&self, signal_name: M) -> Result<SignalStream<'m>>
920 where
921 M: TryInto<MemberName<'m>>,
922 M::Error: Into<Error>,
923 {
924 self.receive_signal_with_args(signal_name, &[]).await
925 }
926
927 pub async fn receive_signal_with_args<'m, M>(
936 &self,
937 signal_name: M,
938 args: &[(u8, &str)],
939 ) -> Result<SignalStream<'m>>
940 where
941 M: TryInto<MemberName<'m>>,
942 M::Error: Into<Error>,
943 {
944 let signal_name = signal_name.try_into().map_err(Into::into)?;
945 self.receive_signals(Some(signal_name), args).await
946 }
947
948 async fn receive_signals<'m>(
949 &self,
950 signal_name: Option<MemberName<'m>>,
951 args: &[(u8, &str)],
952 ) -> Result<SignalStream<'m>> {
953 self.inner.subscribe_dest_owner_change().await?;
954
955 SignalStream::new(self.clone(), signal_name, args).await
956 }
957
958 pub async fn receive_all_signals(&self) -> Result<SignalStream<'static>> {
960 self.receive_signals(None, &[]).await
961 }
962
963 pub async fn receive_property_changed<'name: 'a, T>(
971 &self,
972 name: &'name str,
973 ) -> PropertyStream<'a, T> {
974 let properties = self.get_property_cache();
975 let changed_listener = if let Some(properties) = &properties {
976 let mut values = properties.values.write().expect("lock poisoned");
977 let entry = values
978 .entry(name.to_string())
979 .or_insert_with(PropertyValue::default);
980 let listener = entry.event.listen();
981 if entry.value.is_some() {
982 entry.event.notify(1);
983 }
984 listener
985 } else {
986 Event::new().listen()
987 };
988
989 PropertyStream {
990 name,
991 proxy: self.clone(),
992 changed_listener,
993 phantom: std::marker::PhantomData,
994 }
995 }
996
997 pub async fn receive_owner_changed(&self) -> Result<OwnerChangedStream<'a>> {
1009 use ordered_stream::OrderedStreamExt;
1010 let dbus_proxy = fdo::DBusProxy::builder(self.connection())
1011 .cache_properties(CacheProperties::No)
1012 .build()
1013 .await?;
1014 Ok(OwnerChangedStream {
1015 stream: dbus_proxy
1016 .receive_name_owner_changed_with_args(&[(0, self.destination().as_str())])
1017 .await?
1018 .map(Box::new(move |signal| {
1019 let args = signal.args().unwrap();
1020
1021 args.new_owner().as_ref().map(|owner| owner.to_owned())
1022 })),
1023 name: self.destination().clone(),
1024 })
1025 }
1026}
1027
1028#[derive(Debug, Default)]
1029struct PropertyValue {
1030 value: Option<OwnedValue>,
1031 event: Event,
1032}
1033
1034#[bitflags]
1036#[repr(u8)]
1037#[derive(Debug, Copy, Clone, PartialEq, Eq)]
1038pub enum MethodFlags {
1039 NoReplyExpected = 0x1,
1048
1049 NoAutoStart = 0x2,
1057
1058 AllowInteractiveAuth = 0x4,
1064}
1065
1066impl From<MethodFlags> for Flags {
1067 fn from(method_flag: MethodFlags) -> Self {
1068 match method_flag {
1069 MethodFlags::NoReplyExpected => Self::NoReplyExpected,
1070 MethodFlags::NoAutoStart => Self::NoAutoStart,
1071 MethodFlags::AllowInteractiveAuth => Self::AllowInteractiveAuth,
1072 }
1073 }
1074}
1075
1076type OwnerChangedStreamMap = Map<
1077 fdo::NameOwnerChangedStream,
1078 Box<dyn FnMut(fdo::NameOwnerChanged) -> Option<UniqueName<'static>> + Send + Sync + Unpin>,
1079>;
1080
1081pub struct OwnerChangedStream<'a> {
1085 stream: OwnerChangedStreamMap,
1086 name: BusName<'a>,
1087}
1088
1089impl<'a> OwnerChangedStream<'a> {
1090 pub fn name(&self) -> &BusName<'a> {
1092 &self.name
1093 }
1094}
1095
1096impl stream::Stream for OwnerChangedStream<'_> {
1097 type Item = Option<UniqueName<'static>>;
1098
1099 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1100 OrderedStream::poll_next_before(self, cx, None).map(|res| res.into_data())
1101 }
1102}
1103
1104impl OrderedStream for OwnerChangedStream<'_> {
1105 type Data = Option<UniqueName<'static>>;
1106 type Ordering = Sequence;
1107
1108 fn poll_next_before(
1109 self: Pin<&mut Self>,
1110 cx: &mut Context<'_>,
1111 before: Option<&Self::Ordering>,
1112 ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
1113 Pin::new(&mut self.get_mut().stream).poll_next_before(cx, before)
1114 }
1115}
1116
1117#[derive(Debug)]
1124pub struct SignalStream<'a> {
1125 stream: Join<MessageStream, Option<MessageStream>>,
1126 src_unique_name: Option<UniqueName<'static>>,
1127 signal_name: Option<MemberName<'a>>,
1128}
1129
1130impl<'a> SignalStream<'a> {
1131 pub fn name(&self) -> Option<&MemberName<'a>> {
1133 self.signal_name.as_ref()
1134 }
1135
1136 async fn new(
1137 proxy: Proxy<'_>,
1138 signal_name: Option<MemberName<'a>>,
1139 args: &[(u8, &str)],
1140 ) -> Result<SignalStream<'a>> {
1141 let mut rule_builder = MatchRule::builder()
1142 .msg_type(Type::Signal)
1143 .sender(proxy.destination())?
1144 .path(proxy.path())?
1145 .interface(proxy.interface())?;
1146 if let Some(name) = &signal_name {
1147 rule_builder = rule_builder.member(name)?;
1148 }
1149 for (i, arg) in args {
1150 rule_builder = rule_builder.arg(*i, *arg)?;
1151 }
1152 let signal_rule: OwnedMatchRule = rule_builder.build().to_owned().into();
1153 let conn = proxy.connection();
1154
1155 let (src_unique_name, stream) = match proxy.destination().to_owned() {
1156 BusName::Unique(name) => (
1157 Some(name),
1158 join_streams(
1159 MessageStream::for_match_rule(signal_rule, conn, None).await?,
1160 None,
1161 ),
1162 ),
1163 BusName::WellKnown(name) => {
1164 use ordered_stream::OrderedStreamExt;
1165
1166 let name_owner_changed_rule = MatchRule::builder()
1167 .msg_type(Type::Signal)
1168 .sender("org.freedesktop.DBus")?
1169 .path("/org/freedesktop/DBus")?
1170 .interface("org.freedesktop.DBus")?
1171 .member("NameOwnerChanged")?
1172 .add_arg(name.as_str())?
1173 .build();
1174 let name_owner_changed_stream = MessageStream::for_match_rule(
1175 name_owner_changed_rule,
1176 conn,
1177 Some(MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED),
1178 )
1179 .await?
1180 .map(Either::Left);
1181
1182 let get_name_owner = conn
1183 .call_method_raw(
1184 Some("org.freedesktop.DBus"),
1185 "/org/freedesktop/DBus",
1186 Some("org.freedesktop.DBus"),
1187 "GetNameOwner",
1188 BitFlags::empty(),
1189 &name,
1190 )
1191 .await
1192 .map(|r| FromFuture::from(r.expect("no reply")).map(Either::Right))?;
1193
1194 let mut join = join_streams(name_owner_changed_stream, get_name_owner);
1195
1196 let mut src_unique_name = loop {
1197 match join.next().await {
1198 Some(Either::Left(Ok(msg))) => {
1199 let signal = NameOwnerChanged::from_message(msg)
1200 .expect("`NameOwnerChanged` signal stream got wrong message");
1201 {
1202 break signal
1203 .args()
1204 .expect("`NameOwnerChanged` signal has no args")
1207 .new_owner()
1208 .as_ref()
1209 .map(UniqueName::to_owned);
1210 }
1211 }
1212 Some(Either::Left(Err(_))) => (),
1213 Some(Either::Right(Ok(response))) => {
1214 break Some(
1215 response.body().deserialize::<UniqueName<'_>>()?.to_owned(),
1216 );
1217 }
1218 Some(Either::Right(Err(e))) => {
1219 debug!("Failed to get owner of {name}: {e}");
1221
1222 break None;
1223 }
1224 None => {
1225 return Err(Error::InputOutput(
1226 std::io::Error::new(
1227 std::io::ErrorKind::BrokenPipe,
1228 "connection closed",
1229 )
1230 .into(),
1231 ));
1232 }
1233 }
1234 };
1235
1236 let (stream, _, queued) = join.into_inner();
1238 if let Some(msg) = queued.and_then(|e| match e.0 {
1239 Either::Left(Ok(msg)) => Some(msg),
1240 Either::Left(Err(_)) | Either::Right(_) => None,
1241 }) {
1242 if let Some(signal) = NameOwnerChanged::from_message(msg) {
1243 if let Ok(args) = signal.args() {
1244 match (args.name(), args.new_owner().deref()) {
1245 (BusName::WellKnown(n), Some(new_owner)) if n == &name => {
1246 src_unique_name = Some(new_owner.to_owned());
1247 }
1248 _ => (),
1249 }
1250 }
1251 }
1252 }
1253 let name_owner_changed_stream = stream.into_inner();
1254
1255 let stream = join_streams(
1256 MessageStream::for_match_rule(signal_rule, conn, None).await?,
1257 Some(name_owner_changed_stream),
1258 );
1259
1260 (src_unique_name, stream)
1261 }
1262 };
1263
1264 Ok(SignalStream {
1265 stream,
1266 src_unique_name,
1267 signal_name,
1268 })
1269 }
1270
1271 fn filter(&mut self, msg: &Message) -> Result<bool> {
1272 let header = msg.header();
1273 let sender = header.sender();
1274 if sender == self.src_unique_name.as_ref() {
1275 return Ok(true);
1276 }
1277
1278 if let Some(signal) = NameOwnerChanged::from_message(msg.clone()) {
1280 let args = signal.args()?;
1281 self.src_unique_name = args.new_owner().as_ref().map(|n| n.to_owned());
1282 }
1283
1284 Ok(false)
1285 }
1286}
1287
1288impl stream::Stream for SignalStream<'_> {
1289 type Item = Message;
1290
1291 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1292 OrderedStream::poll_next_before(self, cx, None).map(|res| res.into_data())
1293 }
1294}
1295
1296impl OrderedStream for SignalStream<'_> {
1297 type Data = Message;
1298 type Ordering = Sequence;
1299
1300 fn poll_next_before(
1301 self: Pin<&mut Self>,
1302 cx: &mut Context<'_>,
1303 before: Option<&Self::Ordering>,
1304 ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
1305 let this = self.get_mut();
1306 loop {
1307 match ready!(OrderedStream::poll_next_before(
1308 Pin::new(&mut this.stream),
1309 cx,
1310 before
1311 )) {
1312 PollResult::Item { data, ordering } => {
1313 if let Ok(msg) = data {
1314 if let Ok(true) = this.filter(&msg) {
1315 return Poll::Ready(PollResult::Item {
1316 data: msg,
1317 ordering,
1318 });
1319 }
1320 }
1321 }
1322 PollResult::Terminated => return Poll::Ready(PollResult::Terminated),
1323 PollResult::NoneBefore => return Poll::Ready(PollResult::NoneBefore),
1324 }
1325 }
1326 }
1327}
1328
1329impl stream::FusedStream for SignalStream<'_> {
1330 fn is_terminated(&self) -> bool {
1331 ordered_stream::FusedOrderedStream::is_terminated(&self.stream)
1332 }
1333}
1334
1335#[async_trait::async_trait]
1336impl AsyncDrop for SignalStream<'_> {
1337 async fn async_drop(self) {
1338 let (signals, names, _buffered) = self.stream.into_inner();
1339 signals.async_drop().await;
1340 if let Some(names) = names {
1341 names.async_drop().await;
1342 }
1343 }
1344}
1345
1346#[cfg(feature = "blocking-api")]
1347impl<'a> From<crate::blocking::Proxy<'a>> for Proxy<'a> {
1348 fn from(proxy: crate::blocking::Proxy<'a>) -> Self {
1349 proxy.into_inner()
1350 }
1351}
1352
1353pub trait ProxyImpl<'c>
1356where
1357 Self: Sized,
1358{
1359 fn builder(conn: &Connection) -> Builder<'c, Self>;
1361
1362 fn into_inner(self) -> Proxy<'c>;
1364
1365 fn inner(&self) -> &Proxy<'c>;
1367}
1368
1369enum Either<L, R> {
1370 Left(L),
1371 Right(R),
1372}
1373
1374#[cfg(test)]
1375mod tests {
1376 use super::*;
1377 use crate::{connection, interface, object_server::SignalEmitter, proxy, utils::block_on};
1378 use futures_util::StreamExt;
1379 use ntest::timeout;
1380 use test_log::test;
1381
1382 #[test]
1383 #[timeout(15000)]
1384 fn signal() {
1385 block_on(test_signal()).unwrap();
1386 }
1387
1388 async fn test_signal() -> Result<()> {
1389 let conn = Connection::session().await?;
1392 let dest_conn = Connection::session().await?;
1393 let unique_name = dest_conn.unique_name().unwrap().clone();
1394
1395 let well_known = "org.freedesktop.zbus.async.ProxySignalStreamTest";
1396 let proxy: Proxy<'_> = Builder::new(&conn)
1397 .destination(well_known)?
1398 .path("/does/not/matter")?
1399 .interface("does.not.matter")?
1400 .build()
1401 .await?;
1402 let mut owner_changed_stream = proxy.receive_owner_changed().await?;
1403
1404 let proxy = fdo::DBusProxy::new(&dest_conn).await?;
1405 let mut name_acquired_stream = proxy
1406 .inner()
1407 .receive_signal_with_args("NameAcquired", &[(0, well_known)])
1408 .await?;
1409
1410 let prop_stream = proxy
1411 .inner()
1412 .receive_property_changed("SomeProp")
1413 .await
1414 .filter_map(|changed| async move {
1415 let v: Option<u32> = changed.get().await.ok();
1416 dbg!(v)
1417 });
1418 drop(proxy);
1419 drop(prop_stream);
1420
1421 dest_conn.request_name(well_known).await?;
1422
1423 let (new_owner, acquired_signal) =
1424 futures_util::join!(owner_changed_stream.next(), name_acquired_stream.next(),);
1425
1426 assert_eq!(&new_owner.unwrap().unwrap(), &*unique_name);
1427
1428 let acquired_signal = acquired_signal.unwrap();
1429 assert_eq!(
1430 acquired_signal.body().deserialize::<&str>().unwrap(),
1431 well_known
1432 );
1433
1434 let proxy = Proxy::new(&conn, &unique_name, "/does/not/matter", "does.not.matter").await?;
1435 let mut unique_name_changed_stream = proxy.receive_owner_changed().await?;
1436
1437 drop(dest_conn);
1438 name_acquired_stream.async_drop().await;
1439
1440 let new_owner = owner_changed_stream.next().await;
1442 assert!(new_owner.unwrap().is_none());
1443
1444 let new_unique_owner = unique_name_changed_stream.next().await;
1445 assert!(new_unique_owner.unwrap().is_none());
1446
1447 Ok(())
1448 }
1449
1450 #[test]
1451 #[timeout(15000)]
1452 fn signal_stream_deadlock() {
1453 block_on(test_signal_stream_deadlock()).unwrap();
1454 }
1455
1456 async fn test_signal_stream_deadlock() -> Result<()> {
1464 #[proxy(
1465 gen_blocking = false,
1466 default_path = "/org/zbus/Test",
1467 default_service = "org.zbus.Test.MR501",
1468 interface = "org.zbus.Test"
1469 )]
1470 trait Test {
1471 #[zbus(signal)]
1472 fn my_signal(&self, msg: &str) -> Result<()>;
1473 }
1474
1475 struct TestIface;
1476
1477 #[interface(name = "org.zbus.Test")]
1478 impl TestIface {
1479 #[zbus(signal)]
1480 async fn my_signal(context: &SignalEmitter<'_>, msg: &'static str) -> Result<()>;
1481 }
1482
1483 let test_iface = TestIface;
1484 let server_conn = connection::Builder::session()?
1485 .name("org.zbus.Test.MR501")?
1486 .serve_at("/org/zbus/Test", test_iface)?
1487 .build()
1488 .await?;
1489
1490 let client_conn = connection::Builder::session()?
1491 .max_queued(1)
1492 .build()
1493 .await?;
1494
1495 let test_proxy = TestProxy::new(&client_conn).await?;
1496 let test_prop_proxy = PropertiesProxy::builder(&client_conn)
1497 .destination("org.zbus.Test.MR501")?
1498 .path("/org/zbus/Test")?
1499 .build()
1500 .await?;
1501
1502 let (tx, mut rx) = tokio::sync::mpsc::channel(1);
1503
1504 let handle = {
1505 let tx = tx.clone();
1506 let conn = server_conn.clone();
1507 let server_fut = async move {
1508 use std::time::Duration;
1509
1510 #[cfg(not(feature = "tokio"))]
1511 use async_io::Timer;
1512
1513 #[cfg(feature = "tokio")]
1514 use tokio::time::sleep;
1515
1516 let iface_ref = conn
1517 .object_server()
1518 .interface::<_, TestIface>("/org/zbus/Test")
1519 .await
1520 .unwrap();
1521
1522 let context = iface_ref.signal_emitter();
1523 while !tx.is_closed() {
1524 for _ in 0..10 {
1525 TestIface::my_signal(context, "This is a test")
1526 .await
1527 .unwrap();
1528 }
1529
1530 #[cfg(not(feature = "tokio"))]
1531 Timer::after(Duration::from_millis(5)).await;
1532
1533 #[cfg(feature = "tokio")]
1534 sleep(Duration::from_millis(5)).await;
1535 }
1536 };
1537 server_conn.executor().spawn(server_fut, "server_task")
1538 };
1539
1540 let signal_fut = async {
1541 let mut signal_stream = test_proxy.receive_my_signal().await.unwrap();
1542
1543 tx.send(()).await.unwrap();
1544
1545 while let Some(_signal) = signal_stream.next().await {}
1546 };
1547
1548 let prop_fut = async move {
1549 rx.recv().await.unwrap();
1550 let _prop_stream = test_prop_proxy.receive_properties_changed().await.unwrap();
1551 };
1552
1553 futures_util::pin_mut!(signal_fut);
1554 futures_util::pin_mut!(prop_fut);
1555
1556 futures_util::future::select(signal_fut, prop_fut).await;
1557
1558 handle.await?;
1559
1560 Ok(())
1561 }
1562}