1use std::{
4 future,
5 mem::transmute,
6 pin::Pin,
7 sync::{Arc, Mutex},
8 task::{Context, Poll},
9};
10
11use futures_channel::mpsc::{self, UnboundedReceiver};
12use futures_core::Stream;
13use futures_util::{StreamExt, stream::FusedStream};
14use glib::{
15 ControlFlow,
16 ffi::{gboolean, gpointer},
17 prelude::*,
18 source::Priority,
19 translate::*,
20};
21
22use crate::{Bus, BusSyncReply, Message, MessageType, ffi};
23
24unsafe extern "C" fn trampoline_watch<F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static>(
25 bus: *mut ffi::GstBus,
26 msg: *mut ffi::GstMessage,
27 func: gpointer,
28) -> gboolean {
29 unsafe {
30 let func: &mut F = &mut *(func as *mut F);
31 func(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib()
32 }
33}
34
35unsafe extern "C" fn destroy_closure_watch<
36 F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
37>(
38 ptr: gpointer,
39) {
40 unsafe {
41 let _ = Box::<F>::from_raw(ptr as *mut _);
42 }
43}
44
45fn into_raw_watch<F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static>(func: F) -> gpointer {
46 #[allow(clippy::type_complexity)]
47 let func: Box<F> = Box::new(func);
48 Box::into_raw(func) as gpointer
49}
50
51unsafe extern "C" fn trampoline_watch_local<F: FnMut(&Bus, &Message) -> ControlFlow + 'static>(
52 bus: *mut ffi::GstBus,
53 msg: *mut ffi::GstMessage,
54 func: gpointer,
55) -> gboolean {
56 unsafe {
57 let func: &mut glib::thread_guard::ThreadGuard<F> =
58 &mut *(func as *mut glib::thread_guard::ThreadGuard<F>);
59 (func.get_mut())(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib()
60 }
61}
62
63unsafe extern "C" fn destroy_closure_watch_local<
64 F: FnMut(&Bus, &Message) -> ControlFlow + 'static,
65>(
66 ptr: gpointer,
67) {
68 unsafe {
69 let _ = Box::<glib::thread_guard::ThreadGuard<F>>::from_raw(ptr as *mut _);
70 }
71}
72
73fn into_raw_watch_local<F: FnMut(&Bus, &Message) -> ControlFlow + 'static>(func: F) -> gpointer {
74 #[allow(clippy::type_complexity)]
75 let func: Box<glib::thread_guard::ThreadGuard<F>> =
76 Box::new(glib::thread_guard::ThreadGuard::new(func));
77 Box::into_raw(func) as gpointer
78}
79
80unsafe extern "C" fn trampoline_sync<
81 F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
82>(
83 bus: *mut ffi::GstBus,
84 msg: *mut ffi::GstMessage,
85 func: gpointer,
86) -> ffi::GstBusSyncReply {
87 unsafe {
88 let f: &F = &*(func as *const F);
89 let res = f(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib();
90
91 if res == ffi::GST_BUS_DROP {
92 ffi::gst_mini_object_unref(msg as *mut _);
93 }
94
95 res
96 }
97}
98
99unsafe extern "C" fn destroy_closure_sync<
100 F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
101>(
102 ptr: gpointer,
103) {
104 unsafe {
105 let _ = Box::<F>::from_raw(ptr as *mut _);
106 }
107}
108
109fn into_raw_sync<F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static>(
110 func: F,
111) -> gpointer {
112 let func: Box<F> = Box::new(func);
113 Box::into_raw(func) as gpointer
114}
115
116impl Bus {
117 #[doc(alias = "gst_bus_add_signal_watch")]
118 #[doc(alias = "gst_bus_add_signal_watch_full")]
119 pub fn add_signal_watch_full(&self, priority: Priority) {
120 unsafe {
121 ffi::gst_bus_add_signal_watch_full(self.to_glib_none().0, priority.into_glib());
122 }
123 }
124
125 #[doc(alias = "gst_bus_create_watch")]
126 pub fn create_watch<F>(&self, name: Option<&str>, priority: Priority, func: F) -> glib::Source
127 where
128 F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
129 {
130 skip_assert_initialized!();
131 unsafe {
132 let source = ffi::gst_bus_create_watch(self.to_glib_none().0);
133 glib::ffi::g_source_set_callback(
134 source,
135 Some(transmute::<
136 *mut (),
137 unsafe extern "C" fn(glib::ffi::gpointer) -> i32,
138 >(trampoline_watch::<F> as *mut ())),
139 into_raw_watch(func),
140 Some(destroy_closure_watch::<F>),
141 );
142 glib::ffi::g_source_set_priority(source, priority.into_glib());
143
144 if let Some(name) = name {
145 glib::ffi::g_source_set_name(source, name.to_glib_none().0);
146 }
147
148 from_glib_full(source)
149 }
150 }
151
152 #[doc(alias = "gst_bus_add_watch")]
153 #[doc(alias = "gst_bus_add_watch_full")]
154 pub fn add_watch<F>(&self, func: F) -> Result<BusWatchGuard, glib::BoolError>
155 where
156 F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
157 {
158 unsafe {
159 let res = ffi::gst_bus_add_watch_full(
160 self.to_glib_none().0,
161 glib::ffi::G_PRIORITY_DEFAULT,
162 Some(trampoline_watch::<F>),
163 into_raw_watch(func),
164 Some(destroy_closure_watch::<F>),
165 );
166
167 if res == 0 {
168 Err(glib::bool_error!("Bus already has a watch"))
169 } else {
170 Ok(BusWatchGuard { bus: self.clone() })
171 }
172 }
173 }
174
175 #[doc(alias = "gst_bus_add_watch")]
176 #[doc(alias = "gst_bus_add_watch_full")]
177 pub fn add_watch_local<F>(&self, func: F) -> Result<BusWatchGuard, glib::BoolError>
178 where
179 F: FnMut(&Bus, &Message) -> ControlFlow + 'static,
180 {
181 unsafe {
182 let ctx = glib::MainContext::ref_thread_default();
183 let _acquire = ctx
184 .acquire()
185 .expect("thread default main context already acquired by another thread");
186
187 let res = ffi::gst_bus_add_watch_full(
188 self.to_glib_none().0,
189 glib::ffi::G_PRIORITY_DEFAULT,
190 Some(trampoline_watch_local::<F>),
191 into_raw_watch_local(func),
192 Some(destroy_closure_watch_local::<F>),
193 );
194
195 if res == 0 {
196 Err(glib::bool_error!("Bus already has a watch"))
197 } else {
198 Ok(BusWatchGuard { bus: self.clone() })
199 }
200 }
201 }
202
203 #[doc(alias = "gst_bus_set_sync_handler")]
204 pub fn set_sync_handler<F>(&self, func: F)
205 where
206 F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
207 {
208 unsafe {
209 let bus = self.to_glib_none().0;
210
211 #[allow(clippy::manual_dangling_ptr)]
212 #[cfg(not(feature = "v1_18"))]
213 {
214 static SET_ONCE_QUARK: std::sync::OnceLock<glib::Quark> =
215 std::sync::OnceLock::new();
216
217 let set_once_quark = SET_ONCE_QUARK
218 .get_or_init(|| glib::Quark::from_str("gstreamer-rs-sync-handler"));
219
220 if crate::version() < (1, 16, 3, 0) {
223 if !glib::gobject_ffi::g_object_get_qdata(
224 bus as *mut _,
225 set_once_quark.into_glib(),
226 )
227 .is_null()
228 {
229 panic!("Bus sync handler can only be set once");
230 }
231
232 glib::gobject_ffi::g_object_set_qdata(
233 bus as *mut _,
234 set_once_quark.into_glib(),
235 1 as *mut _,
236 );
237 }
238 }
239
240 ffi::gst_bus_set_sync_handler(
241 bus,
242 Some(trampoline_sync::<F>),
243 into_raw_sync(func),
244 Some(destroy_closure_sync::<F>),
245 )
246 }
247 }
248
249 pub fn unset_sync_handler(&self) {
250 #[cfg(not(feature = "v1_18"))]
251 {
252 if crate::version() < (1, 16, 3, 0) {
255 return;
256 }
257 }
258
259 unsafe {
260 use std::ptr;
261
262 ffi::gst_bus_set_sync_handler(self.to_glib_none().0, None, ptr::null_mut(), None)
263 }
264 }
265
266 #[doc(alias = "gst_bus_pop")]
267 pub fn iter(&self) -> Iter<'_> {
268 self.iter_timed(Some(crate::ClockTime::ZERO))
269 }
270
271 #[doc(alias = "gst_bus_timed_pop")]
272 pub fn iter_timed(&self, timeout: impl Into<Option<crate::ClockTime>>) -> Iter<'_> {
273 Iter {
274 bus: self,
275 timeout: timeout.into(),
276 }
277 }
278
279 #[doc(alias = "gst_bus_pop_filtered")]
280 pub fn iter_filtered<'a>(
281 &'a self,
282 msg_types: &'a [MessageType],
283 ) -> impl Iterator<Item = Message> + 'a {
284 self.iter_timed_filtered(Some(crate::ClockTime::ZERO), msg_types)
285 }
286
287 #[doc(alias = "gst_bus_timed_pop_filtered")]
288 pub fn iter_timed_filtered<'a>(
289 &'a self,
290 timeout: impl Into<Option<crate::ClockTime>>,
291 msg_types: &'a [MessageType],
292 ) -> impl Iterator<Item = Message> + 'a {
293 self.iter_timed(timeout)
294 .filter(move |msg| msg_types.contains(&msg.type_()))
295 }
296
297 #[doc(alias = "gst_bus_timed_pop_filtered")]
298 pub fn timed_pop_filtered(
299 &self,
300 timeout: impl Into<Option<crate::ClockTime>>,
301 msg_types: &[MessageType],
302 ) -> Option<Message> {
303 let Some(timeout) = timeout.into() else {
305 loop {
306 let msg = self.timed_pop(None)?;
307 if msg_types.contains(&msg.type_()) {
308 return Some(msg);
309 }
310 }
311 };
312
313 let total = timeout;
315 let start = std::time::Instant::now();
316
317 loop {
318 let elapsed = crate::ClockTime::from_nseconds(start.elapsed().as_nanos() as u64);
319
320 let remaining = total.checked_sub(elapsed)?;
322
323 let msg = self.timed_pop(Some(remaining))?;
324
325 if msg_types.contains(&msg.type_()) {
326 return Some(msg);
327 }
328
329 }
331 }
332
333 #[doc(alias = "gst_bus_pop_filtered")]
334 pub fn pop_filtered(&self, msg_types: &[MessageType]) -> Option<Message> {
335 loop {
336 let msg = self.pop()?;
337 if msg_types.contains(&msg.type_()) {
338 return Some(msg);
339 }
340 }
341 }
342
343 pub fn stream(&self) -> BusStream {
344 BusStream::new(self)
345 }
346
347 pub fn stream_filtered<'a>(
348 &self,
349 message_types: &'a [MessageType],
350 ) -> impl FusedStream<Item = Message> + Unpin + Send + 'a + use<'a> {
351 self.stream().filter(move |message| {
352 let message_type = message.type_();
353
354 future::ready(message_types.contains(&message_type))
355 })
356 }
357}
358
359#[must_use = "iterators are lazy and do nothing unless consumed"]
360#[derive(Debug)]
361pub struct Iter<'a> {
362 bus: &'a Bus,
363 timeout: Option<crate::ClockTime>,
364}
365
366impl Iterator for Iter<'_> {
367 type Item = Message;
368
369 fn next(&mut self) -> Option<Message> {
370 self.bus.timed_pop(self.timeout)
371 }
372}
373
374#[derive(Debug)]
375pub struct BusStream {
376 bus: glib::WeakRef<Bus>,
377 receiver: UnboundedReceiver<Message>,
378}
379
380impl BusStream {
381 fn new(bus: &Bus) -> Self {
382 skip_assert_initialized!();
383
384 let mutex = Arc::new(Mutex::new(()));
385 let (sender, receiver) = mpsc::unbounded();
386
387 let _mutex_guard = mutex.lock().unwrap();
393 bus.set_sync_handler({
394 let sender = sender.clone();
395 let mutex = mutex.clone();
396
397 move |_bus, message| {
398 let _mutex_guard = mutex.lock().unwrap();
399
400 let _ = sender.unbounded_send(message.to_owned());
401
402 BusSyncReply::Drop
403 }
404 });
405
406 while let Some(message) = bus.pop() {
408 let _ = sender.unbounded_send(message);
409 }
410
411 Self {
412 bus: bus.downgrade(),
413 receiver,
414 }
415 }
416}
417
418impl Drop for BusStream {
419 fn drop(&mut self) {
420 if let Some(bus) = self.bus.upgrade() {
421 bus.unset_sync_handler();
422 }
423 }
424}
425
426impl Stream for BusStream {
427 type Item = Message;
428
429 fn poll_next(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
430 self.receiver.poll_next_unpin(context)
431 }
432}
433
434impl FusedStream for BusStream {
435 fn is_terminated(&self) -> bool {
436 self.receiver.is_terminated()
437 }
438}
439
440#[derive(Debug)]
445#[must_use = "if unused the bus watch will immediately be removed"]
446pub struct BusWatchGuard {
447 bus: Bus,
448}
449
450impl Drop for BusWatchGuard {
451 fn drop(&mut self) {
452 let _ = self.bus.remove_watch();
453 }
454}
455
456#[cfg(test)]
457mod tests {
458 use std::{
459 sync::{Arc, Barrier, Mutex},
460 thread,
461 time::{Duration, Instant},
462 };
463
464 use super::*;
465
466 #[test]
467 fn test_sync_handler() {
468 crate::init().unwrap();
469
470 let bus = Bus::new();
471 let msgs = Arc::new(Mutex::new(Vec::new()));
472 let msgs_clone = msgs.clone();
473 bus.set_sync_handler(move |_, msg| {
474 msgs_clone.lock().unwrap().push(msg.clone());
475 BusSyncReply::Pass
476 });
477
478 bus.post(crate::message::Eos::new()).unwrap();
479
480 let msgs = msgs.lock().unwrap();
481 assert_eq!(msgs.len(), 1);
482 match msgs[0].view() {
483 crate::MessageView::Eos(_) => (),
484 _ => unreachable!(),
485 }
486 }
487
488 #[test]
489 fn test_bus_stream() {
490 crate::init().unwrap();
491
492 let bus = Bus::new();
493 let bus_stream = bus.stream();
494
495 let eos_message = crate::message::Eos::new();
496 bus.post(eos_message).unwrap();
497
498 let bus_future = StreamExt::into_future(bus_stream);
499 let (message, _) = futures_executor::block_on(bus_future);
500
501 match message.unwrap().view() {
502 crate::MessageView::Eos(_) => (),
503 _ => unreachable!(),
504 }
505 }
506
507 #[test]
508 fn test_bus_timed_pop_filtered_does_not_restart_timeout_for_filtered_messages() {
509 crate::init().unwrap();
510
511 let bus = Bus::new();
512 let bus_clone = bus.clone();
513
514 let timeout = Duration::from_millis(100);
515 let timeout_allowance = Duration::from_millis(50);
516
517 let barrier = Arc::new(Barrier::new(2));
518 let barrier_clone = barrier.clone();
519
520 let message_poster = thread::spawn(move || {
522 barrier_clone.wait();
524
525 let start = Instant::now();
526 while start.elapsed() < timeout * 3 {
527 bus_clone
528 .post(crate::message::DurationChanged::new())
529 .unwrap();
530 thread::sleep(Duration::from_millis(10));
531 }
532 });
533
534 barrier.wait();
536
537 let start = Instant::now();
538
539 let msg = bus.timed_pop_filtered(
540 crate::ClockTime::from_mseconds(timeout.as_millis() as u64),
541 &[MessageType::Eos],
542 );
543
544 let elapsed = start.elapsed();
545
546 assert!(msg.is_none());
547
548 assert!(
552 elapsed < timeout + timeout_allowance,
553 "timed_pop_filtered exceeded the requested timeout while filtering"
554 );
555
556 message_poster
557 .join()
558 .expect("message_poster thread panicked");
559 }
560}