1use enumflags2::{BitFlags, bitflags};
4use event_listener::{Event, EventListener};
5use futures_core::{ready, stream};
6use ordered_stream::{FromFuture, Join, Map, OrderedStream, PollResult, join as join_streams};
7use std::{
8 collections::{HashMap, HashSet},
9 fmt,
10 future::Future,
11 ops::Deref,
12 pin::Pin,
13 sync::{Arc, OnceLock, RwLock, RwLockReadGuard},
14 task::{Context, Poll},
15};
16use tracing::{Instrument, debug, info_span, instrument, trace};
17
18use zbus_names::{BusName, InterfaceName, MemberName, UniqueName};
19use zvariant::{ObjectPath, OwnedValue, Str, Value};
20
21use crate::{
22 AsyncDrop, Connection, Error, Executor, MatchRule, MessageStream, OwnedMatchRule, Result, Task,
23 fdo::{self, IntrospectableProxy, NameOwnerChanged, PropertiesChangedStream, PropertiesProxy},
24 message::{Flags, Message, Sequence, Type},
25};
26
27mod builder;
28pub use builder::{Builder, CacheProperties};
29
30mod defaults;
31pub use defaults::Defaults;
32
/// A client-side handle for one interface of an object exposed by a D-Bus peer.
///
/// The connection, destination, object path, interface name and the optional property cache
/// all live in a [`ProxyInner`] behind an [`Arc`], so cloning a `Proxy` only bumps a reference
/// count and every clone shares the same state.
#[derive(Clone, Debug)]
pub struct Proxy<'a> {
    /// Shared, reference-counted state of the proxy.
    pub(crate) inner: Arc<ProxyInner<'a>>,
}
71
/// The part of [`ProxyInner`] that holds no borrowed data.
///
/// This type has its own [`Drop`] implementation, which queues removal of the match rule
/// stored in `dest_owner_change_match_rule` (if any).
pub(crate) struct ProxyInnerStatic {
    /// Connection used for every call and subscription made through the proxy.
    pub(crate) conn: Connection,
    /// Match rule for `NameOwnerChanged` signals about the destination; set at most once by
    /// `ProxyInner::subscribe_dest_owner_change`.
    dest_owner_change_match_rule: OnceLock<OwnedMatchRule>,
}
78
79impl fmt::Debug for ProxyInnerStatic {
80 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
81 f.debug_struct("ProxyInnerStatic")
82 .field(
83 "dest_owner_change_match_rule",
84 &self.dest_owner_change_match_rule,
85 )
86 .finish_non_exhaustive()
87 }
88}
89
/// Shared state behind a [`Proxy`].
#[derive(Debug)]
pub(crate) struct ProxyInner<'a> {
    /// Connection and owner-change match rule (the parts without borrowed data).
    inner_without_borrows: ProxyInnerStatic,
    /// Bus name of the peer the proxy talks to.
    pub(crate) destination: BusName<'a>,
    /// Path of the remote object.
    pub(crate) path: ObjectPath<'a>,
    /// Interface of the remote object this proxy targets.
    pub(crate) interface: InterfaceName<'a>,

    /// `None` when property caching is disabled (`CacheProperties::No`). Otherwise a cell that
    /// is filled on first use with the cache and the background task that maintains it.
    property_cache: Option<OnceLock<(Arc<PropertiesCache>, Task<()>)>>,
    /// Names of properties that are never stored in the cache; updates and invalidations for
    /// them are ignored by `PropertiesCache::update_cache`.
    uncached_properties: HashSet<Str<'a>>,
}
103
104impl Drop for ProxyInnerStatic {
105 fn drop(&mut self) {
106 if let Some(rule) = self.dest_owner_change_match_rule.take() {
107 self.conn.queue_remove_match(rule);
108 }
109 }
110}
111
/// A property change notification; the item type yielded by [`PropertyStream`].
///
/// The value itself is not stored here. It is read from (or fetched into) the shared cache on
/// demand via `get` / `get_raw`.
pub struct PropertyChanged<'a, T> {
    /// Name of the property that changed.
    name: &'a str,
    /// Cache shared with the proxy that produced this notification.
    properties: Arc<PropertiesCache>,
    /// Proxy used to fetch the value when the cache has none.
    proxy: Proxy<'a>,
    /// Marks the type the property value is converted to by `get`.
    phantom: std::marker::PhantomData<T>,
}
121
impl<T> PropertyChanged<'_, T> {
    /// The name of the property that changed.
    pub fn name(&self) -> &str {
        self.name
    }

    /// Returns the current value of the property as an untyped [`Value`].
    ///
    /// If the cache holds a value for the property, it is returned directly. If the cached
    /// value is absent (e.g. after an invalidation), the value is fetched from the peer with
    /// the properties proxy's `get`, stored in the cache and then returned.
    ///
    /// The returned guard holds the cache's read lock for as long as it is alive.
    ///
    /// # Errors
    ///
    /// Returns an error if fetching the value from the peer fails.
    ///
    /// # Panics
    ///
    /// Panics if the cache lock is poisoned or the cache has no entry for this property.
    pub async fn get_raw(&self) -> Result<impl Deref<Target = Value<'static>> + '_> {
        // Pairs the read guard with the property name so the value can be handed out by
        // reference, without cloning it out of the map.
        struct Wrapper<'w> {
            name: &'w str,
            values: RwLockReadGuard<'w, HashMap<String, PropertyValue>>,
        }

        impl Deref for Wrapper<'_> {
            type Target = Value<'static>;

            fn deref(&self) -> &Self::Target {
                self.values
                    .get(self.name)
                    .expect("PropertyStream with no corresponding property")
                    .value
                    .as_ref()
                    .expect("PropertyStream with no corresponding property")
            }
        }

        // Fast path: the value is cached. The extra scope makes sure the read guard is dropped
        // before the `.await` below when the value is missing.
        {
            let values = self.properties.values.read().expect("lock poisoned");
            if values
                .get(self.name)
                .expect("PropertyStream with no corresponding property")
                .value
                .is_some()
            {
                return Ok(Wrapper {
                    name: self.name,
                    values,
                });
            }
        }

        // Slow path: no cached value, so ask the peer.
        let properties_proxy = self.proxy.properties_proxy();
        let value = properties_proxy
            .get(self.proxy.inner.interface.clone(), self.name)
            .await
            .map_err(crate::Error::from)?;

        // Store the fetched value; the write guard is released at the end of this scope.
        {
            let mut values = self.properties.values.write().expect("lock poisoned");

            values
                .get_mut(self.name)
                .expect("PropertyStream with no corresponding property")
                .value = Some(value);
        }

        // NOTE(review): the write lock is released before the read lock is taken here. If the
        // property is invalidated in between, dereferencing the returned guard panics —
        // confirm whether this window matters in practice.
        Ok(Wrapper {
            name: self.name,
            values: self.properties.values.read().expect("lock poisoned"),
        })
    }
}
190
191impl<T> PropertyChanged<'_, T>
192where
193 T: TryFrom<zvariant::OwnedValue>,
194 T::Error: Into<crate::Error>,
195{
196 pub async fn get(&self) -> Result<T> {
202 self.get_raw()
203 .await
204 .and_then(|v| T::try_from(OwnedValue::try_from(&*v)?).map_err(Into::into))
205 }
206}
207
/// A stream of [`PropertyChanged`] notifications for a single property.
///
/// Created by `Proxy::receive_property_changed`.
#[derive(Debug)]
pub struct PropertyStream<'a, T> {
    /// Name of the property being watched.
    name: &'a str,
    /// Proxy whose property cache is observed.
    proxy: Proxy<'a>,
    /// Listener on the property's change event; replaced with a fresh one after every wake-up.
    changed_listener: EventListener,
    /// Marks the type that `PropertyChanged::get` converts the value to.
    phantom: std::marker::PhantomData<T>,
}
218
219impl<'a, T> stream::Stream for PropertyStream<'a, T>
220where
221 T: Unpin,
222{
223 type Item = PropertyChanged<'a, T>;
224
225 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
226 let m = self.get_mut();
227 let properties = match m.proxy.get_property_cache() {
228 Some(properties) => properties.clone(),
229 None => return Poll::Ready(None),
231 };
232 ready!(Pin::new(&mut m.changed_listener).poll(cx));
233
234 m.changed_listener = properties
235 .values
236 .read()
237 .expect("lock poisoned")
238 .get(m.name)
239 .expect("PropertyStream with no corresponding property")
240 .event
241 .listen();
242
243 Poll::Ready(Some(PropertyChanged {
244 name: m.name,
245 properties,
246 proxy: m.proxy.clone(),
247 phantom: std::marker::PhantomData,
248 }))
249 }
250}
251
/// Cache of the property values of one interface, shared between a proxy and the background
/// task that keeps it up to date.
#[derive(Debug)]
pub(crate) struct PropertiesCache {
    /// Cached values and their change events, keyed by property name.
    values: RwLock<HashMap<String, PropertyValue>>,
    /// Whether the initial population is still running, or how it ended.
    caching_result: RwLock<CachingResult>,
}
257
/// State of the initial population of a [`PropertiesCache`].
#[derive(Debug)]
enum CachingResult {
    /// Population is still in progress; `ready` is notified once it finishes.
    Caching { ready: Event },
    /// Population finished, successfully or with the stored error.
    Cached { result: Result<()> },
}
263
264impl PropertiesCache {
265 #[instrument(skip_all, level = "trace")]
266 fn new(
267 proxy: PropertiesProxy<'static>,
268 interface: InterfaceName<'static>,
269 executor: &Executor<'_>,
270 uncached_properties: HashSet<zvariant::Str<'static>>,
271 ) -> (Arc<Self>, Task<()>) {
272 let cache = Arc::new(PropertiesCache {
273 values: Default::default(),
274 caching_result: RwLock::new(CachingResult::Caching {
275 ready: Event::new(),
276 }),
277 });
278
279 let cache_clone = cache.clone();
280 let task_name = format!("{interface} proxy caching");
281 let proxy_caching = async move {
282 let result = cache_clone
283 .init(proxy, interface, uncached_properties)
284 .await;
285 let (prop_changes, interface, uncached_properties) = {
286 let mut caching_result = cache_clone.caching_result.write().expect("lock poisoned");
287 let ready = match &*caching_result {
288 CachingResult::Caching { ready } => ready,
289 _ => unreachable!(),
292 };
293 match result {
294 Ok((prop_changes, interface, uncached_properties)) => {
295 ready.notify(usize::MAX);
296 *caching_result = CachingResult::Cached { result: Ok(()) };
297
298 (prop_changes, interface, uncached_properties)
299 }
300 Err(e) => {
301 ready.notify(usize::MAX);
302 *caching_result = CachingResult::Cached { result: Err(e) };
303
304 return;
305 }
306 }
307 };
308
309 if let Err(e) = cache_clone
310 .keep_updated(prop_changes, interface, uncached_properties)
311 .await
312 {
313 debug!("Error keeping properties cache updated: {e}");
314 }
315 }
316 .instrument(info_span!("{}", task_name));
317 let task = executor.spawn(proxy_caching, &task_name);
318
319 (cache, task)
320 }
321
    /// Populates the cache with the current values of all properties of `interface`.
    ///
    /// The subscription to property-change signals is set up *before* the `GetAll` call is
    /// issued, and both are then consumed through a single joined stream.
    ///
    /// On success, returns the change stream (to be handed to `keep_updated`) along with the
    /// `interface` and `uncached_properties` arguments, which are passed through unchanged.
    ///
    /// # Errors
    ///
    /// Fails if subscribing to property changes fails, if the `GetAll` call cannot be sent or
    /// returns an error, or if its reply cannot be deserialized.
    ///
    /// # Panics
    ///
    /// Panics if `call_method_raw` returns no pending reply for the `GetAll` call.
    async fn init(
        &self,
        proxy: PropertiesProxy<'static>,
        interface: InterfaceName<'static>,
        uncached_properties: HashSet<zvariant::Str<'static>>,
    ) -> Result<(
        PropertiesChangedStream,
        InterfaceName<'static>,
        HashSet<zvariant::Str<'static>>,
    )> {
        use ordered_stream::OrderedStreamExt;

        let prop_changes = proxy.receive_properties_changed().await?.map(Either::Left);

        // Issue `GetAll` without waiting for the reply; the pending reply is wrapped into a
        // one-item stream so that it can be joined with the change signals.
        let get_all = proxy
            .inner()
            .connection()
            .call_method_raw(
                Some(proxy.inner().destination()),
                proxy.inner().path(),
                Some(proxy.inner().interface()),
                "GetAll",
                BitFlags::empty(),
                &interface,
            )
            .await
            .map(|r| FromFuture::from(r.expect("no reply")).map(Either::Right))?;

        let mut join = join_streams(prop_changes, get_all);

        loop {
            match join.next().await {
                // A change signal yielded ahead of the `GetAll` reply is dropped; presumably
                // the reply already reflects it — TODO confirm against `ordered_stream::join`
                // ordering semantics.
                Some(Either::Left(_update)) => {
                }
                Some(Either::Right(populate)) => {
                    // Initial population: every returned property counts as "changed" and
                    // nothing is invalidated.
                    populate?.body().deserialize().map(|values| {
                        self.update_cache(&uncached_properties, &values, &[], &interface);
                    })?;
                    break;
                }
                None => break,
            }
        }
        // The join may still hold one change signal it buffered while polling; apply it now so
        // that it is not lost when the join is taken apart below.
        if let Some((Either::Left(update), _)) = Pin::new(&mut join).take_buffered() {
            if let Ok(args) = update.args() {
                if args.interface_name == interface {
                    self.update_cache(
                        &uncached_properties,
                        &args.changed_properties,
                        &args.invalidated_properties,
                        &interface,
                    );
                }
            }
        }
        // Unwrap the join and the `map` adapter to get the plain change stream back.
        let prop_changes = join.into_inner().0.into_inner();

        Ok((prop_changes, interface, uncached_properties))
    }
388
389 #[instrument(skip_all, level = "trace")]
391 async fn keep_updated(
392 &self,
393 mut prop_changes: PropertiesChangedStream,
394 interface: InterfaceName<'static>,
395 uncached_properties: HashSet<zvariant::Str<'static>>,
396 ) -> Result<()> {
397 use futures_lite::StreamExt;
398
399 trace!("Listening for property changes on {interface}...");
400 while let Some(update) = prop_changes.next().await {
401 if let Ok(args) = update.args() {
402 if args.interface_name == interface {
403 self.update_cache(
404 &uncached_properties,
405 &args.changed_properties,
406 &args.invalidated_properties,
407 &interface,
408 );
409 }
410 }
411 }
412
413 Ok(())
414 }
415
416 fn update_cache(
417 &self,
418 uncached_properties: &HashSet<Str<'_>>,
419 changed: &HashMap<&str, Value<'_>>,
420 invalidated: &[&str],
421 interface: &InterfaceName<'_>,
422 ) {
423 let mut values = self.values.write().expect("lock poisoned");
424
425 for inval in invalidated {
426 if uncached_properties.contains(&Str::from(*inval)) {
427 debug!(
428 "Ignoring invalidation of uncached property `{}.{}`",
429 interface, inval
430 );
431 continue;
432 }
433 trace!("Property `{interface}.{inval}` invalidated");
434
435 if let Some(entry) = values.get_mut(*inval) {
436 entry.value = None;
437 entry.event.notify(usize::MAX);
438 }
439 }
440
441 for (property_name, value) in changed {
442 if uncached_properties.contains(&Str::from(*property_name)) {
443 debug!(
444 "Ignoring update of uncached property `{}.{}`",
445 interface, property_name
446 );
447 continue;
448 }
449 trace!("Property `{interface}.{property_name}` updated");
450
451 let entry = values.entry(property_name.to_string()).or_default();
452
453 let value = match OwnedValue::try_from(value) {
454 Ok(value) => value,
455 Err(e) => {
456 debug!(
457 "Failed to convert property `{interface}.{property_name}` to OwnedValue: {e}"
458 );
459 continue;
460 }
461 };
462 entry.value = Some(value);
463 entry.event.notify(usize::MAX);
464 }
465 }
466
467 pub(crate) async fn ready(&self) -> Result<()> {
469 let listener = match &*self.caching_result.read().expect("lock poisoned") {
470 CachingResult::Caching { ready } => ready.listen(),
471 CachingResult::Cached { result } => return result.clone(),
472 };
473 listener.await;
474
475 match &*self.caching_result.read().expect("lock poisoned") {
477 CachingResult::Caching { .. } => unreachable!(),
480 CachingResult::Cached { result } => result.clone(),
481 }
482 }
483}
484
485impl<'a> ProxyInner<'a> {
486 pub(crate) fn new(
487 conn: Connection,
488 destination: BusName<'a>,
489 path: ObjectPath<'a>,
490 interface: InterfaceName<'a>,
491 cache: CacheProperties,
492 uncached_properties: HashSet<Str<'a>>,
493 ) -> Self {
494 let property_cache = match cache {
495 CacheProperties::Yes | CacheProperties::Lazily => Some(OnceLock::new()),
496 CacheProperties::No => None,
497 };
498 Self {
499 inner_without_borrows: ProxyInnerStatic {
500 conn,
501 dest_owner_change_match_rule: OnceLock::new(),
502 },
503 destination,
504 path,
505 interface,
506 property_cache,
507 uncached_properties,
508 }
509 }
510
511 pub(crate) async fn subscribe_dest_owner_change(&self) -> Result<()> {
515 if !self.inner_without_borrows.conn.is_bus() {
516 return Ok(());
518 }
519
520 let well_known_name = match &self.destination {
521 BusName::WellKnown(well_known_name) => well_known_name,
522 BusName::Unique(_) => return Ok(()),
523 };
524
525 if self
526 .inner_without_borrows
527 .dest_owner_change_match_rule
528 .get()
529 .is_some()
530 {
531 return Ok(());
533 }
534
535 let conn = &self.inner_without_borrows.conn;
536 let signal_rule: OwnedMatchRule = MatchRule::builder()
537 .msg_type(Type::Signal)
538 .sender("org.freedesktop.DBus")?
539 .path("/org/freedesktop/DBus")?
540 .interface("org.freedesktop.DBus")?
541 .member("NameOwnerChanged")?
542 .add_arg(well_known_name.as_str())?
543 .build()
544 .to_owned()
545 .into();
546
547 conn.add_match(
548 signal_rule.clone(),
549 Some(MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED),
550 )
551 .await?;
552
553 if self
554 .inner_without_borrows
555 .dest_owner_change_match_rule
556 .set(signal_rule.clone())
557 .is_err()
558 {
559 conn.remove_match(signal_rule).await?;
561 }
562
563 Ok(())
564 }
565}
566
/// Queue limit passed along whenever a `NameOwnerChanged` match rule or message stream is set
/// up in this module.
const MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED: usize = 8;
568
569impl<'a> Proxy<'a> {
570 pub async fn new<D, P, I>(
572 conn: &Connection,
573 destination: D,
574 path: P,
575 interface: I,
576 ) -> Result<Proxy<'a>>
577 where
578 D: TryInto<BusName<'a>>,
579 P: TryInto<ObjectPath<'a>>,
580 I: TryInto<InterfaceName<'a>>,
581 D::Error: Into<Error>,
582 P::Error: Into<Error>,
583 I::Error: Into<Error>,
584 {
585 Builder::new(conn)
586 .destination(destination)?
587 .path(path)?
588 .interface(interface)?
589 .build()
590 .await
591 }
592
593 pub async fn new_owned<D, P, I>(
596 conn: Connection,
597 destination: D,
598 path: P,
599 interface: I,
600 ) -> Result<Proxy<'a>>
601 where
602 D: TryInto<BusName<'static>>,
603 P: TryInto<ObjectPath<'static>>,
604 I: TryInto<InterfaceName<'static>>,
605 D::Error: Into<Error>,
606 P::Error: Into<Error>,
607 I::Error: Into<Error>,
608 {
609 Builder::new(&conn)
610 .destination(destination)?
611 .path(path)?
612 .interface(interface)?
613 .build()
614 .await
615 }
616
    /// The connection this proxy sends its calls and subscriptions through.
    pub fn connection(&self) -> &Connection {
        &self.inner.inner_without_borrows.conn
    }
621
    /// The bus name of the peer this proxy talks to.
    pub fn destination(&self) -> &BusName<'a> {
        &self.inner.destination
    }
626
    /// The path of the remote object this proxy targets.
    pub fn path(&self) -> &ObjectPath<'a> {
        &self.inner.path
    }
631
    /// The name of the interface this proxy targets.
    pub fn interface(&self) -> &InterfaceName<'a> {
        &self.inner.interface
    }
636
637 pub async fn introspect(&self) -> fdo::Result<String> {
642 let proxy = IntrospectableProxy::builder(&self.inner.inner_without_borrows.conn)
643 .destination(&self.inner.destination)?
644 .path(&self.inner.path)?
645 .build()
646 .await?;
647
648 proxy.introspect().await
649 }
650
    /// Builds a non-caching [`PropertiesProxy`] that borrows this proxy's destination and
    /// path.
    ///
    /// # Panics
    ///
    /// Panics if the builder rejects the destination or path, or if `build_internal` fails.
    /// The inputs are already validated `BusName`/`ObjectPath` values, so this is presumably
    /// unreachable — TODO confirm against the builder implementation.
    fn properties_proxy(&self) -> PropertiesProxy<'_> {
        PropertiesProxy::builder(&self.inner.inner_without_borrows.conn)
            .destination(self.inner.destination.as_ref())
            .unwrap()
            .path(self.inner.path.as_ref())
            .unwrap()
            // Caching is switched off explicitly for this helper proxy.
            .cache_properties(CacheProperties::No)
            .build_internal()
            .unwrap()
            .into()
    }
665
    /// Builds a non-caching [`PropertiesProxy`] with owned (`'static`) copies of this proxy's
    /// destination and path, so that it can be moved into a spawned task.
    ///
    /// # Panics
    ///
    /// Panics if the builder rejects the destination or path, or if `build_internal` fails.
    /// The inputs are already validated `BusName`/`ObjectPath` values, so this is presumably
    /// unreachable — TODO confirm against the builder implementation.
    fn owned_properties_proxy(&self) -> PropertiesProxy<'static> {
        PropertiesProxy::builder(&self.inner.inner_without_borrows.conn)
            .destination(self.inner.destination.to_owned())
            .unwrap()
            .path(self.inner.path.to_owned())
            .unwrap()
            // Caching is switched off explicitly for this helper proxy.
            .cache_properties(CacheProperties::No)
            .build_internal()
            .unwrap()
            .into()
    }
680
681 pub(crate) fn get_property_cache(&self) -> Option<&Arc<PropertiesCache>> {
686 let cache = match &self.inner.property_cache {
687 Some(cache) => cache,
688 None => return None,
689 };
690 let (cache, _) = &cache.get_or_init(|| {
691 let proxy = self.owned_properties_proxy();
692 let interface = self.interface().to_owned();
693 let uncached_properties: HashSet<zvariant::Str<'static>> = self
694 .inner
695 .uncached_properties
696 .iter()
697 .map(|s| s.to_owned())
698 .collect();
699 let executor = self.connection().executor();
700
701 PropertiesCache::new(proxy, interface, executor, uncached_properties)
702 });
703
704 Some(cache)
705 }
706
707 pub fn cached_property<T>(&self, property_name: &str) -> Result<Option<T>>
714 where
715 T: TryFrom<OwnedValue>,
716 T::Error: Into<Error>,
717 {
718 self.cached_property_raw(property_name)
719 .as_deref()
720 .map(|v| T::try_from(OwnedValue::try_from(v)?).map_err(Into::into))
721 .transpose()
722 }
723
724 pub fn cached_property_raw<'p>(
729 &'p self,
730 property_name: &'p str,
731 ) -> Option<impl Deref<Target = Value<'static>> + 'p> {
732 if let Some(values) = self
733 .inner
734 .property_cache
735 .as_ref()
736 .and_then(OnceLock::get)
737 .map(|c| c.0.values.read().expect("lock poisoned"))
738 {
739 values
741 .get(property_name)
742 .and_then(|e| e.value.as_ref())?;
744
745 struct Wrapper<'a> {
746 values: RwLockReadGuard<'a, HashMap<String, PropertyValue>>,
747 property_name: &'a str,
748 }
749
750 impl Deref for Wrapper<'_> {
751 type Target = Value<'static>;
752
753 fn deref(&self) -> &Self::Target {
754 self.values
755 .get(self.property_name)
756 .and_then(|e| e.value.as_ref())
757 .map(|v| v.deref())
758 .expect("inexistent property")
759 }
760 }
761
762 Some(Wrapper {
763 values,
764 property_name,
765 })
766 } else {
767 None
768 }
769 }
770
771 async fn get_proxy_property(&self, property_name: &str) -> Result<OwnedValue> {
772 Ok(self
773 .properties_proxy()
774 .get(self.inner.interface.as_ref(), property_name)
775 .await?)
776 }
777
778 pub async fn get_property<T>(&self, property_name: &str) -> Result<T>
783 where
784 T: TryFrom<OwnedValue>,
785 T::Error: Into<Error>,
786 {
787 if let Some(cache) = self.get_property_cache() {
788 cache.ready().await?;
789 }
790 if let Some(value) = self.cached_property(property_name)? {
791 return Ok(value);
792 }
793
794 let value = self.get_proxy_property(property_name).await?;
795 value.try_into().map_err(Into::into)
796 }
797
798 pub async fn set_property<'t, T>(&self, property_name: &str, value: T) -> fdo::Result<()>
802 where
803 T: 't + Into<Value<'t>>,
804 {
805 self.properties_proxy()
806 .set(self.inner.interface.as_ref(), property_name, value.into())
807 .await
808 }
809
810 pub async fn call_method<'m, M, B>(&self, method_name: M, body: &B) -> Result<Message>
818 where
819 M: TryInto<MemberName<'m>>,
820 M::Error: Into<Error>,
821 B: serde::ser::Serialize + zvariant::DynamicType,
822 {
823 self.inner
824 .inner_without_borrows
825 .conn
826 .call_method(
827 Some(&self.inner.destination),
828 self.inner.path.as_str(),
829 Some(&self.inner.interface),
830 method_name,
831 body,
832 )
833 .await
834 }
835
836 pub async fn call<'m, M, B, R>(&self, method_name: M, body: &B) -> Result<R>
842 where
843 M: TryInto<MemberName<'m>>,
844 M::Error: Into<Error>,
845 B: serde::ser::Serialize + zvariant::DynamicType,
846 R: for<'d> zvariant::DynamicDeserialize<'d>,
847 {
848 let reply = self.call_method(method_name, body).await?;
849
850 reply.body().deserialize()
851 }
852
853 pub async fn call_with_flags<'m, M, B, R>(
863 &self,
864 method_name: M,
865 flags: BitFlags<MethodFlags>,
866 body: &B,
867 ) -> Result<Option<R>>
868 where
869 M: TryInto<MemberName<'m>>,
870 M::Error: Into<Error>,
871 B: serde::ser::Serialize + zvariant::DynamicType,
872 R: for<'d> zvariant::DynamicDeserialize<'d>,
873 {
874 let flags = flags.iter().map(Flags::from).collect::<BitFlags<_>>();
875 match self
876 .inner
877 .inner_without_borrows
878 .conn
879 .call_method_raw(
880 Some(self.destination()),
881 self.path(),
882 Some(self.interface()),
883 method_name,
884 flags,
885 body,
886 )
887 .await?
888 {
889 Some(reply) => reply.await?.body().deserialize().map(Some),
890 None => Ok(None),
891 }
892 }
893
894 pub async fn call_noreply<'m, M, B>(&self, method_name: M, body: &B) -> Result<()>
898 where
899 M: TryInto<MemberName<'m>>,
900 M::Error: Into<Error>,
901 B: serde::ser::Serialize + zvariant::DynamicType,
902 {
903 self.call_with_flags::<_, _, ()>(method_name, MethodFlags::NoReplyExpected.into(), body)
904 .await?;
905 Ok(())
906 }
907
908 pub async fn receive_signal<'m, M>(&self, signal_name: M) -> Result<SignalStream<'m>>
916 where
917 M: TryInto<MemberName<'m>>,
918 M::Error: Into<Error>,
919 {
920 self.receive_signal_with_args(signal_name, &[]).await
921 }
922
923 pub async fn receive_signal_with_args<'m, M>(
932 &self,
933 signal_name: M,
934 args: &[(u8, &str)],
935 ) -> Result<SignalStream<'m>>
936 where
937 M: TryInto<MemberName<'m>>,
938 M::Error: Into<Error>,
939 {
940 let signal_name = signal_name.try_into().map_err(Into::into)?;
941 self.receive_signals(Some(signal_name), args).await
942 }
943
944 async fn receive_signals<'m>(
945 &self,
946 signal_name: Option<MemberName<'m>>,
947 args: &[(u8, &str)],
948 ) -> Result<SignalStream<'m>> {
949 self.inner.subscribe_dest_owner_change().await?;
950
951 SignalStream::new(self.clone(), signal_name, args).await
952 }
953
    /// Creates a stream of all signals of this proxy's interface emitted by the remote
    /// object, i.e. without restricting the member name or arguments.
    pub async fn receive_all_signals(&self) -> Result<SignalStream<'static>> {
        self.receive_signals(None, &[]).await
    }
958
959 pub async fn receive_property_changed<'name: 'a, T>(
967 &self,
968 name: &'name str,
969 ) -> PropertyStream<'a, T> {
970 let properties = self.get_property_cache();
971 let changed_listener = if let Some(properties) = &properties {
972 let mut values = properties.values.write().expect("lock poisoned");
973 let entry = values
974 .entry(name.to_string())
975 .or_insert_with(PropertyValue::default);
976 let listener = entry.event.listen();
977 if entry.value.is_some() {
978 entry.event.notify(1);
979 }
980 listener
981 } else {
982 Event::new().listen()
983 };
984
985 PropertyStream {
986 name,
987 proxy: self.clone(),
988 changed_listener,
989 phantom: std::marker::PhantomData,
990 }
991 }
992
    /// Creates a stream that reports owner changes of this proxy's destination.
    ///
    /// Each item is the `new_owner` argument of a `NameOwnerChanged` signal whose first
    /// argument equals the destination name.
    ///
    /// # Errors
    ///
    /// Fails if the helper `DBusProxy` cannot be built or the signal subscription fails.
    ///
    /// # Panics
    ///
    /// The stream panics if the arguments of a received signal cannot be parsed.
    /// NOTE(review): confirm this `unwrap` cannot be reached with a well-behaved bus.
    pub async fn receive_owner_changed(&self) -> Result<OwnerChangedStream<'a>> {
        use ordered_stream::OrderedStreamExt;
        let dbus_proxy = fdo::DBusProxy::builder(self.connection())
            .cache_properties(CacheProperties::No)
            .build()
            .await?;
        Ok(OwnerChangedStream {
            stream: dbus_proxy
                // Match on argument 0 so only signals about our destination are delivered.
                .receive_name_owner_changed_with_args(&[(0, self.destination().as_str())])
                .await?
                // Boxed so that the closure type can be named in `OwnerChangedStreamMap`.
                .map(Box::new(move |signal| {
                    let args = signal.args().unwrap();

                    args.new_owner().as_ref().map(|owner| owner.to_owned())
                })),
            name: self.destination().clone(),
        })
    }
1022}
1023
/// A single entry of the property cache.
#[derive(Debug, Default)]
struct PropertyValue {
    /// The cached value; `None` until a value is stored, or after an invalidation.
    value: Option<OwnedValue>,
    /// Notified whenever `value` is updated or invalidated.
    event: Event,
}
1029
/// Flags that can be passed to `Proxy::call_with_flags`.
///
/// Each variant is translated to the message [`Flags`] variant of the same name before the
/// call is sent.
#[bitflags]
#[repr(u8)]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum MethodFlags {
    /// Maps to `Flags::NoReplyExpected`; used by `Proxy::call_noreply`.
    NoReplyExpected = 0x1,

    /// Maps to `Flags::NoAutoStart`.
    NoAutoStart = 0x2,

    /// Maps to `Flags::AllowInteractiveAuth`.
    AllowInteractiveAuth = 0x4,
}
1061
1062impl From<MethodFlags> for Flags {
1063 fn from(method_flag: MethodFlags) -> Self {
1064 match method_flag {
1065 MethodFlags::NoReplyExpected => Self::NoReplyExpected,
1066 MethodFlags::NoAutoStart => Self::NoAutoStart,
1067 MethodFlags::AllowInteractiveAuth => Self::AllowInteractiveAuth,
1068 }
1069 }
1070}
1071
/// The concrete stream type wrapped by [`OwnerChangedStream`]: a `NameOwnerChanged` signal
/// stream mapped through a boxed closure that extracts the new owner.
type OwnerChangedStreamMap = Map<
    fdo::NameOwnerChangedStream,
    Box<dyn FnMut(fdo::NameOwnerChanged) -> Option<UniqueName<'static>> + Send + Sync + Unpin>,
>;
1076
/// A stream of owner changes of a bus name, created by `Proxy::receive_owner_changed`.
///
/// Yields `Some(unique_name)` when the signal carries a new owner and `None` when it does not.
pub struct OwnerChangedStream<'a> {
    /// Underlying signal stream, already mapped to the new owner.
    stream: OwnerChangedStreamMap,
    /// The bus name whose ownership is being watched.
    name: BusName<'a>,
}
1084
impl<'a> OwnerChangedStream<'a> {
    /// The bus name whose ownership this stream watches.
    pub fn name(&self) -> &BusName<'a> {
        &self.name
    }
}
1091
1092impl stream::Stream for OwnerChangedStream<'_> {
1093 type Item = Option<UniqueName<'static>>;
1094
1095 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1096 OrderedStream::poll_next_before(self, cx, None).map(|res| res.into_data())
1097 }
1098}
1099
1100impl OrderedStream for OwnerChangedStream<'_> {
1101 type Data = Option<UniqueName<'static>>;
1102 type Ordering = Sequence;
1103
1104 fn poll_next_before(
1105 self: Pin<&mut Self>,
1106 cx: &mut Context<'_>,
1107 before: Option<&Self::Ordering>,
1108 ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
1109 Pin::new(&mut self.get_mut().stream).poll_next_before(cx, before)
1110 }
1111}
1112
/// A stream of signal messages emitted by a proxy's destination.
///
/// Created by `Proxy::receive_signal`, `Proxy::receive_signal_with_args` and
/// `Proxy::receive_all_signals`.
#[derive(Debug)]
pub struct SignalStream<'a> {
    /// The signal stream joined with, for well-known destinations, the stream of
    /// `NameOwnerChanged` signals about the destination (`None` for unique names).
    stream: Join<MessageStream, Option<MessageStream>>,
    /// Unique name currently owning the destination, if known; only messages sent by it are
    /// yielded.
    src_unique_name: Option<UniqueName<'static>>,
    /// The signal name the stream was created for; `None` when created for all signals.
    signal_name: Option<MemberName<'a>>,
}
1125
1126impl<'a> SignalStream<'a> {
    /// The name of the signal this stream was created for, or `None` if it was created for
    /// all signals.
    pub fn name(&self) -> Option<&MemberName<'a>> {
        self.signal_name.as_ref()
    }
1131
    /// Builds a signal stream for `proxy`.
    ///
    /// The match rule selects signals from the proxy's destination, path and interface,
    /// optionally narrowed by `signal_name` and the `(index, value)` pairs in `args`.
    ///
    /// For a unique-name destination that name is the expected sender. For a well-known
    /// name the current owner is resolved with `GetNameOwner`, while `NameOwnerChanged`
    /// signals for the name are watched so that an ownership change during set-up is not
    /// missed.
    ///
    /// # Errors
    ///
    /// Fails if a match rule cannot be built, a message stream cannot be created, the
    /// `GetNameOwner` call cannot be sent or its reply cannot be deserialized, or the
    /// connection closes while resolving the owner.
    ///
    /// # Panics
    ///
    /// Panics if `GetNameOwner` yields no pending reply, or if a message from the
    /// `NameOwnerChanged` stream is not a well-formed `NameOwnerChanged` signal.
    async fn new(
        proxy: Proxy<'_>,
        signal_name: Option<MemberName<'a>>,
        args: &[(u8, &str)],
    ) -> Result<SignalStream<'a>> {
        let mut rule_builder = MatchRule::builder()
            .msg_type(Type::Signal)
            .sender(proxy.destination())?
            .path(proxy.path())?
            .interface(proxy.interface())?;
        if let Some(name) = &signal_name {
            rule_builder = rule_builder.member(name)?;
        }
        for (i, arg) in args {
            rule_builder = rule_builder.arg(*i, *arg)?;
        }
        let signal_rule: OwnedMatchRule = rule_builder.build().to_owned().into();
        let conn = proxy.connection();

        let (src_unique_name, stream) = match proxy.destination().to_owned() {
            // Unique names never change owner: no owner tracking is needed.
            BusName::Unique(name) => (
                Some(name),
                join_streams(
                    MessageStream::for_match_rule(signal_rule, conn, None).await?,
                    None,
                ),
            ),
            BusName::WellKnown(name) => {
                use ordered_stream::OrderedStreamExt;

                // Start watching for owner changes before asking for the current owner.
                let name_owner_changed_rule = MatchRule::builder()
                    .msg_type(Type::Signal)
                    .sender("org.freedesktop.DBus")?
                    .path("/org/freedesktop/DBus")?
                    .interface("org.freedesktop.DBus")?
                    .member("NameOwnerChanged")?
                    .add_arg(name.as_str())?
                    .build();
                let name_owner_changed_stream = MessageStream::for_match_rule(
                    name_owner_changed_rule,
                    conn,
                    Some(MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED),
                )
                .await?
                .map(Either::Left);

                // The pending reply is wrapped into a one-item stream so it can be joined
                // with the owner-change signals.
                let get_name_owner = conn
                    .call_method_raw(
                        Some("org.freedesktop.DBus"),
                        "/org/freedesktop/DBus",
                        Some("org.freedesktop.DBus"),
                        "GetNameOwner",
                        BitFlags::empty(),
                        &name,
                    )
                    .await
                    .map(|r| FromFuture::from(r.expect("no reply")).map(Either::Right))?;

                let mut join = join_streams(name_owner_changed_stream, get_name_owner);

                // Whichever arrives first, an owner-change signal or the `GetNameOwner`
                // reply, determines the initial owner.
                let mut src_unique_name = loop {
                    match join.next().await {
                        Some(Either::Left(Ok(msg))) => {
                            let signal = NameOwnerChanged::from_message(msg)
                                .expect("`NameOwnerChanged` signal stream got wrong message");
                            {
                                break signal
                                    .args()
                                    // The match rule above requires a matching first argument,
                                    // so the arguments are expected to be present and valid.
                                    .expect("`NameOwnerChanged` signal has no args")
                                    .new_owner()
                                    .as_ref()
                                    .map(UniqueName::to_owned);
                            }
                        }
                        // Stream errors are skipped; keep waiting.
                        Some(Either::Left(Err(_))) => (),
                        Some(Either::Right(Ok(response))) => {
                            break Some(
                                response.body().deserialize::<UniqueName<'_>>()?.to_owned(),
                            );
                        }
                        Some(Either::Right(Err(e))) => {
                            // The lookup failing (presumably because the name has no owner
                            // yet) is not fatal: start without a known owner.
                            debug!("Failed to get owner of {name}: {e}");

                            break None;
                        }
                        None => {
                            return Err(Error::InputOutput(
                                std::io::Error::new(
                                    std::io::ErrorKind::BrokenPipe,
                                    "connection closed",
                                )
                                .into(),
                            ));
                        }
                    }
                };

                // Take the join apart. An owner-change signal it buffered but has not yet
                // yielded is applied here so that it is not lost.
                let (stream, _, queued) = join.into_inner();
                if let Some(msg) = queued.and_then(|e| match e.0 {
                    Either::Left(Ok(msg)) => Some(msg),
                    Either::Left(Err(_)) | Either::Right(_) => None,
                }) {
                    if let Some(signal) = NameOwnerChanged::from_message(msg) {
                        if let Ok(args) = signal.args() {
                            match (args.name(), args.new_owner().deref()) {
                                (BusName::WellKnown(n), Some(new_owner)) if n == &name => {
                                    src_unique_name = Some(new_owner.to_owned());
                                }
                                _ => (),
                            }
                        }
                    }
                }
                // Strip the `map` adapter to get the plain message stream back.
                let name_owner_changed_stream = stream.into_inner();

                let stream = join_streams(
                    MessageStream::for_match_rule(signal_rule, conn, None).await?,
                    Some(name_owner_changed_stream),
                );

                (src_unique_name, stream)
            }
        };

        Ok(SignalStream {
            stream,
            src_unique_name,
            signal_name,
        })
    }
1266
1267 fn filter(&mut self, msg: &Message) -> Result<bool> {
1268 let header = msg.header();
1269 let sender = header.sender();
1270 if sender == self.src_unique_name.as_ref() {
1271 return Ok(true);
1272 }
1273
1274 if let Some(signal) = NameOwnerChanged::from_message(msg.clone()) {
1276 let args = signal.args()?;
1277 self.src_unique_name = args.new_owner().as_ref().map(|n| n.to_owned());
1278 }
1279
1280 Ok(false)
1281 }
1282}
1283
1284impl stream::Stream for SignalStream<'_> {
1285 type Item = Message;
1286
1287 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1288 OrderedStream::poll_next_before(self, cx, None).map(|res| res.into_data())
1289 }
1290}
1291
1292impl OrderedStream for SignalStream<'_> {
1293 type Data = Message;
1294 type Ordering = Sequence;
1295
1296 fn poll_next_before(
1297 self: Pin<&mut Self>,
1298 cx: &mut Context<'_>,
1299 before: Option<&Self::Ordering>,
1300 ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
1301 let this = self.get_mut();
1302 loop {
1303 match ready!(OrderedStream::poll_next_before(
1304 Pin::new(&mut this.stream),
1305 cx,
1306 before
1307 )) {
1308 PollResult::Item { data, ordering } => {
1309 if let Ok(msg) = data {
1310 if let Ok(true) = this.filter(&msg) {
1311 return Poll::Ready(PollResult::Item {
1312 data: msg,
1313 ordering,
1314 });
1315 }
1316 }
1317 }
1318 PollResult::Terminated => return Poll::Ready(PollResult::Terminated),
1319 PollResult::NoneBefore => return Poll::Ready(PollResult::NoneBefore),
1320 }
1321 }
1322 }
1323}
1324
/// The stream is terminated exactly when the underlying joined stream reports termination.
impl stream::FusedStream for SignalStream<'_> {
    fn is_terminated(&self) -> bool {
        ordered_stream::FusedOrderedStream::is_terminated(&self.stream)
    }
}
1330
1331#[async_trait::async_trait]
1332impl AsyncDrop for SignalStream<'_> {
1333 async fn async_drop(self) {
1334 let (signals, names, _buffered) = self.stream.into_inner();
1335 signals.async_drop().await;
1336 if let Some(names) = names {
1337 names.async_drop().await;
1338 }
1339 }
1340}
1341
/// Unwraps a blocking proxy into the async proxy it contains.
#[cfg(feature = "blocking-api")]
impl<'a> From<crate::blocking::Proxy<'a>> for Proxy<'a> {
    fn from(proxy: crate::blocking::Proxy<'a>) -> Self {
        proxy.into_inner()
    }
}
1348
/// Common interface of types that wrap a [`Proxy`].
pub trait ProxyImpl<'c>
where
    Self: Sized,
{
    /// Returns a builder for this proxy type on the given connection.
    fn builder(conn: &Connection) -> Builder<'c, Self>;

    /// Consumes `self`, returning the underlying [`Proxy`].
    fn into_inner(self) -> Proxy<'c>;

    /// Borrows the underlying [`Proxy`].
    fn inner(&self) -> &Proxy<'c>;
}
1364
/// Minimal two-variant sum type, used in this module to give the two inputs of a stream join
/// a common item type.
enum Either<L, R> {
    /// Item from the first stream of the join.
    Left(L),
    /// Item from the second stream of the join.
    Right(R),
}
1369
// Integration tests; they connect to the session bus via `Connection::session()`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{connection, interface, object_server::SignalEmitter, proxy, utils::block_on};
    use futures_util::StreamExt;
    use ntest::timeout;
    use test_log::test;

    #[test]
    #[timeout(15000)]
    fn signal() {
        block_on(test_signal()).unwrap();
    }

    /// Checks that owner-change and `NameAcquired` signals are delivered when a second
    /// connection acquires a well-known name, and that owner-change streams report `None`
    /// once that connection is dropped.
    async fn test_signal() -> Result<()> {
        // `conn` observes; `dest_conn` is the peer that will own the well-known name.
        let conn = Connection::session().await?;
        let dest_conn = Connection::session().await?;
        let unique_name = dest_conn.unique_name().unwrap().clone();

        let well_known = "org.freedesktop.zbus.async.ProxySignalStreamTest";
        let proxy: Proxy<'_> = Builder::new(&conn)
            .destination(well_known)?
            .path("/does/not/matter")?
            .interface("does.not.matter")?
            .build()
            .await?;
        let mut owner_changed_stream = proxy.receive_owner_changed().await?;

        let proxy = fdo::DBusProxy::new(&dest_conn).await?;
        let mut name_acquired_stream = proxy
            .inner()
            .receive_signal_with_args("NameAcquired", &[(0, well_known)])
            .await?;

        // A property stream is created and dropped together with its proxy; the signal stream
        // created above must keep working afterwards.
        let prop_stream = proxy
            .inner()
            .receive_property_changed("SomeProp")
            .await
            .filter_map(|changed| async move {
                let v: Option<u32> = changed.get().await.ok();
                dbg!(v)
            });
        drop(proxy);
        drop(prop_stream);

        dest_conn.request_name(well_known).await?;

        let (new_owner, acquired_signal) =
            futures_util::join!(owner_changed_stream.next(), name_acquired_stream.next(),);

        assert_eq!(&new_owner.unwrap().unwrap(), &*unique_name);

        let acquired_signal = acquired_signal.unwrap();
        assert_eq!(
            acquired_signal.body().deserialize::<&str>().unwrap(),
            well_known
        );

        // Also watch the unique name itself, through a second proxy.
        let proxy = Proxy::new(&conn, &unique_name, "/does/not/matter", "does.not.matter").await?;
        let mut unique_name_changed_stream = proxy.receive_owner_changed().await?;

        drop(dest_conn);
        name_acquired_stream.async_drop().await;

        // With the owning connection gone, both names should lose their owner.
        let new_owner = owner_changed_stream.next().await;
        assert!(new_owner.unwrap().is_none());

        let new_unique_owner = unique_name_changed_stream.next().await;
        assert!(new_unique_owner.unwrap().is_none());

        Ok(())
    }

    #[test]
    #[timeout(15000)]
    fn signal_stream_deadlock() {
        block_on(test_signal_stream_deadlock()).unwrap();
    }

    /// Checks that setting up a second subscription completes while a signal stream on the
    /// same connection, limited to one queued message, is being flooded with signals. The
    /// surrounding `#[timeout]` turns a hang into a test failure.
    async fn test_signal_stream_deadlock() -> Result<()> {
        #[proxy(
            gen_blocking = false,
            default_path = "/org/zbus/Test",
            default_service = "org.zbus.Test.MR501",
            interface = "org.zbus.Test"
        )]
        trait Test {
            #[zbus(signal)]
            fn my_signal(&self, msg: &str) -> Result<()>;
        }

        struct TestIface;

        #[interface(name = "org.zbus.Test")]
        impl TestIface {
            #[zbus(signal)]
            async fn my_signal(context: &SignalEmitter<'_>, msg: &'static str) -> Result<()>;
        }

        let test_iface = TestIface;
        let server_conn = connection::Builder::session()?
            .name("org.zbus.Test.MR501")?
            .serve_at("/org/zbus/Test", test_iface)?
            .build()
            .await?;

        // The client queue is as small as possible so that it fills up immediately.
        let client_conn = connection::Builder::session()?
            .max_queued(1)
            .build()
            .await?;

        let test_proxy = TestProxy::new(&client_conn).await?;
        let test_prop_proxy = PropertiesProxy::builder(&client_conn)
            .destination("org.zbus.Test.MR501")?
            .path("/org/zbus/Test")?
            .build()
            .await?;

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        // Server task: emits bursts of signals until the receiving end of the channel is
        // dropped.
        let handle = {
            let tx = tx.clone();
            let conn = server_conn.clone();
            let server_fut = async move {
                use std::time::Duration;

                #[cfg(not(feature = "tokio"))]
                use async_io::Timer;

                #[cfg(feature = "tokio")]
                use tokio::time::sleep;

                let iface_ref = conn
                    .object_server()
                    .interface::<_, TestIface>("/org/zbus/Test")
                    .await
                    .unwrap();

                let context = iface_ref.signal_emitter();
                while !tx.is_closed() {
                    for _ in 0..10 {
                        TestIface::my_signal(context, "This is a test")
                            .await
                            .unwrap();
                    }

                    #[cfg(not(feature = "tokio"))]
                    Timer::after(Duration::from_millis(5)).await;

                    #[cfg(feature = "tokio")]
                    sleep(Duration::from_millis(5)).await;
                }
            };
            server_conn.executor().spawn(server_fut, "server_task")
        };

        // Consumes signals forever once the stream exists.
        let signal_fut = async {
            let mut signal_stream = test_proxy.receive_my_signal().await.unwrap();

            tx.send(()).await.unwrap();

            while let Some(_signal) = signal_stream.next().await {}
        };

        // Once the signal stream exists, create a second subscription on the same connection.
        let prop_fut = async move {
            rx.recv().await.unwrap();
            let _prop_stream = test_prop_proxy.receive_properties_changed().await.unwrap();
        };

        futures_util::pin_mut!(signal_fut);
        futures_util::pin_mut!(prop_fut);

        // `signal_fut` never completes on its own, so the select finishes when `prop_fut` does.
        futures_util::future::select(signal_fut, prop_fut).await;

        handle.await?;

        Ok(())
    }
}