gstreamer/
bus.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{
4    future,
5    mem::transmute,
6    pin::Pin,
7    sync::{Arc, Mutex},
8    task::{Context, Poll},
9};
10
11use futures_channel::mpsc::{self, UnboundedReceiver};
12use futures_core::Stream;
13use futures_util::{StreamExt, stream::FusedStream};
14use glib::{
15    ControlFlow,
16    ffi::{gboolean, gpointer},
17    prelude::*,
18    source::Priority,
19    translate::*,
20};
21
22use crate::{Bus, BusSyncReply, Message, MessageType, ffi};
23
24unsafe extern "C" fn trampoline_watch<F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static>(
25    bus: *mut ffi::GstBus,
26    msg: *mut ffi::GstMessage,
27    func: gpointer,
28) -> gboolean {
29    unsafe {
30        let func: &mut F = &mut *(func as *mut F);
31        func(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib()
32    }
33}
34
35unsafe extern "C" fn destroy_closure_watch<
36    F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
37>(
38    ptr: gpointer,
39) {
40    unsafe {
41        let _ = Box::<F>::from_raw(ptr as *mut _);
42    }
43}
44
45fn into_raw_watch<F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static>(func: F) -> gpointer {
46    #[allow(clippy::type_complexity)]
47    let func: Box<F> = Box::new(func);
48    Box::into_raw(func) as gpointer
49}
50
51unsafe extern "C" fn trampoline_watch_local<F: FnMut(&Bus, &Message) -> ControlFlow + 'static>(
52    bus: *mut ffi::GstBus,
53    msg: *mut ffi::GstMessage,
54    func: gpointer,
55) -> gboolean {
56    unsafe {
57        let func: &mut glib::thread_guard::ThreadGuard<F> =
58            &mut *(func as *mut glib::thread_guard::ThreadGuard<F>);
59        (func.get_mut())(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib()
60    }
61}
62
63unsafe extern "C" fn destroy_closure_watch_local<
64    F: FnMut(&Bus, &Message) -> ControlFlow + 'static,
65>(
66    ptr: gpointer,
67) {
68    unsafe {
69        let _ = Box::<glib::thread_guard::ThreadGuard<F>>::from_raw(ptr as *mut _);
70    }
71}
72
73fn into_raw_watch_local<F: FnMut(&Bus, &Message) -> ControlFlow + 'static>(func: F) -> gpointer {
74    #[allow(clippy::type_complexity)]
75    let func: Box<glib::thread_guard::ThreadGuard<F>> =
76        Box::new(glib::thread_guard::ThreadGuard::new(func));
77    Box::into_raw(func) as gpointer
78}
79
80unsafe extern "C" fn trampoline_sync<
81    F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
82>(
83    bus: *mut ffi::GstBus,
84    msg: *mut ffi::GstMessage,
85    func: gpointer,
86) -> ffi::GstBusSyncReply {
87    unsafe {
88        let f: &F = &*(func as *const F);
89        let res = f(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib();
90
91        if res == ffi::GST_BUS_DROP {
92            ffi::gst_mini_object_unref(msg as *mut _);
93        }
94
95        res
96    }
97}
98
99unsafe extern "C" fn destroy_closure_sync<
100    F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
101>(
102    ptr: gpointer,
103) {
104    unsafe {
105        let _ = Box::<F>::from_raw(ptr as *mut _);
106    }
107}
108
109fn into_raw_sync<F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static>(
110    func: F,
111) -> gpointer {
112    let func: Box<F> = Box::new(func);
113    Box::into_raw(func) as gpointer
114}
115
116impl Bus {
117    #[doc(alias = "gst_bus_add_signal_watch")]
118    #[doc(alias = "gst_bus_add_signal_watch_full")]
119    pub fn add_signal_watch_full(&self, priority: Priority) {
120        unsafe {
121            ffi::gst_bus_add_signal_watch_full(self.to_glib_none().0, priority.into_glib());
122        }
123    }
124
125    #[doc(alias = "gst_bus_create_watch")]
126    pub fn create_watch<F>(&self, name: Option<&str>, priority: Priority, func: F) -> glib::Source
127    where
128        F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
129    {
130        skip_assert_initialized!();
131        unsafe {
132            let source = ffi::gst_bus_create_watch(self.to_glib_none().0);
133            glib::ffi::g_source_set_callback(
134                source,
135                Some(transmute::<
136                    *mut (),
137                    unsafe extern "C" fn(glib::ffi::gpointer) -> i32,
138                >(trampoline_watch::<F> as *mut ())),
139                into_raw_watch(func),
140                Some(destroy_closure_watch::<F>),
141            );
142            glib::ffi::g_source_set_priority(source, priority.into_glib());
143
144            if let Some(name) = name {
145                glib::ffi::g_source_set_name(source, name.to_glib_none().0);
146            }
147
148            from_glib_full(source)
149        }
150    }
151
152    #[doc(alias = "gst_bus_add_watch")]
153    #[doc(alias = "gst_bus_add_watch_full")]
154    pub fn add_watch<F>(&self, func: F) -> Result<BusWatchGuard, glib::BoolError>
155    where
156        F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
157    {
158        unsafe {
159            let res = ffi::gst_bus_add_watch_full(
160                self.to_glib_none().0,
161                glib::ffi::G_PRIORITY_DEFAULT,
162                Some(trampoline_watch::<F>),
163                into_raw_watch(func),
164                Some(destroy_closure_watch::<F>),
165            );
166
167            if res == 0 {
168                Err(glib::bool_error!("Bus already has a watch"))
169            } else {
170                Ok(BusWatchGuard { bus: self.clone() })
171            }
172        }
173    }
174
175    #[doc(alias = "gst_bus_add_watch")]
176    #[doc(alias = "gst_bus_add_watch_full")]
177    pub fn add_watch_local<F>(&self, func: F) -> Result<BusWatchGuard, glib::BoolError>
178    where
179        F: FnMut(&Bus, &Message) -> ControlFlow + 'static,
180    {
181        unsafe {
182            let ctx = glib::MainContext::ref_thread_default();
183            let _acquire = ctx
184                .acquire()
185                .expect("thread default main context already acquired by another thread");
186
187            let res = ffi::gst_bus_add_watch_full(
188                self.to_glib_none().0,
189                glib::ffi::G_PRIORITY_DEFAULT,
190                Some(trampoline_watch_local::<F>),
191                into_raw_watch_local(func),
192                Some(destroy_closure_watch_local::<F>),
193            );
194
195            if res == 0 {
196                Err(glib::bool_error!("Bus already has a watch"))
197            } else {
198                Ok(BusWatchGuard { bus: self.clone() })
199            }
200        }
201    }
202
203    #[doc(alias = "gst_bus_set_sync_handler")]
204    pub fn set_sync_handler<F>(&self, func: F)
205    where
206        F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
207    {
208        unsafe {
209            let bus = self.to_glib_none().0;
210
211            #[allow(clippy::manual_dangling_ptr)]
212            #[cfg(not(feature = "v1_18"))]
213            {
214                static SET_ONCE_QUARK: std::sync::OnceLock<glib::Quark> =
215                    std::sync::OnceLock::new();
216
217                let set_once_quark = SET_ONCE_QUARK
218                    .get_or_init(|| glib::Quark::from_str("gstreamer-rs-sync-handler"));
219
220                // This is not thread-safe before 1.16.3, see
221                // https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/merge_requests/416
222                if crate::version() < (1, 16, 3, 0) {
223                    if !glib::gobject_ffi::g_object_get_qdata(
224                        bus as *mut _,
225                        set_once_quark.into_glib(),
226                    )
227                    .is_null()
228                    {
229                        panic!("Bus sync handler can only be set once");
230                    }
231
232                    glib::gobject_ffi::g_object_set_qdata(
233                        bus as *mut _,
234                        set_once_quark.into_glib(),
235                        1 as *mut _,
236                    );
237                }
238            }
239
240            ffi::gst_bus_set_sync_handler(
241                bus,
242                Some(trampoline_sync::<F>),
243                into_raw_sync(func),
244                Some(destroy_closure_sync::<F>),
245            )
246        }
247    }
248
249    pub fn unset_sync_handler(&self) {
250        #[cfg(not(feature = "v1_18"))]
251        {
252            // This is not thread-safe before 1.16.3, see
253            // https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/merge_requests/416
254            if crate::version() < (1, 16, 3, 0) {
255                return;
256            }
257        }
258
259        unsafe {
260            use std::ptr;
261
262            ffi::gst_bus_set_sync_handler(self.to_glib_none().0, None, ptr::null_mut(), None)
263        }
264    }
265
266    #[doc(alias = "gst_bus_pop")]
267    pub fn iter(&self) -> Iter<'_> {
268        self.iter_timed(Some(crate::ClockTime::ZERO))
269    }
270
271    #[doc(alias = "gst_bus_timed_pop")]
272    pub fn iter_timed(&self, timeout: impl Into<Option<crate::ClockTime>>) -> Iter<'_> {
273        Iter {
274            bus: self,
275            timeout: timeout.into(),
276        }
277    }
278
279    #[doc(alias = "gst_bus_pop_filtered")]
280    pub fn iter_filtered<'a>(
281        &'a self,
282        msg_types: &'a [MessageType],
283    ) -> impl Iterator<Item = Message> + 'a {
284        self.iter_timed_filtered(Some(crate::ClockTime::ZERO), msg_types)
285    }
286
287    #[doc(alias = "gst_bus_timed_pop_filtered")]
288    pub fn iter_timed_filtered<'a>(
289        &'a self,
290        timeout: impl Into<Option<crate::ClockTime>>,
291        msg_types: &'a [MessageType],
292    ) -> impl Iterator<Item = Message> + 'a {
293        self.iter_timed(timeout)
294            .filter(move |msg| msg_types.contains(&msg.type_()))
295    }
296
297    #[doc(alias = "gst_bus_timed_pop_filtered")]
298    pub fn timed_pop_filtered(
299        &self,
300        timeout: impl Into<Option<crate::ClockTime>>,
301        msg_types: &[MessageType],
302    ) -> Option<Message> {
303        // Infinite wait: just loop forever
304        let Some(timeout) = timeout.into() else {
305            loop {
306                let msg = self.timed_pop(None)?;
307                if msg_types.contains(&msg.type_()) {
308                    return Some(msg);
309                }
310            }
311        };
312
313        // Finite timeout
314        let total = timeout;
315        let start = std::time::Instant::now();
316
317        loop {
318            let elapsed = crate::ClockTime::from_nseconds(start.elapsed().as_nanos() as u64);
319
320            // If timeout budget is exhausted, return None
321            let remaining = total.checked_sub(elapsed)?;
322
323            let msg = self.timed_pop(Some(remaining))?;
324
325            if msg_types.contains(&msg.type_()) {
326                return Some(msg);
327            }
328
329            // Discard non-matching messages without restarting the timeout
330        }
331    }
332
333    #[doc(alias = "gst_bus_pop_filtered")]
334    pub fn pop_filtered(&self, msg_types: &[MessageType]) -> Option<Message> {
335        loop {
336            let msg = self.pop()?;
337            if msg_types.contains(&msg.type_()) {
338                return Some(msg);
339            }
340        }
341    }
342
343    pub fn stream(&self) -> BusStream {
344        BusStream::new(self)
345    }
346
347    pub fn stream_filtered<'a>(
348        &self,
349        message_types: &'a [MessageType],
350    ) -> impl FusedStream<Item = Message> + Unpin + Send + 'a + use<'a> {
351        self.stream().filter(move |message| {
352            let message_type = message.type_();
353
354            future::ready(message_types.contains(&message_type))
355        })
356    }
357}
358
359#[must_use = "iterators are lazy and do nothing unless consumed"]
360#[derive(Debug)]
361pub struct Iter<'a> {
362    bus: &'a Bus,
363    timeout: Option<crate::ClockTime>,
364}
365
366impl Iterator for Iter<'_> {
367    type Item = Message;
368
369    fn next(&mut self) -> Option<Message> {
370        self.bus.timed_pop(self.timeout)
371    }
372}
373
374#[derive(Debug)]
375pub struct BusStream {
376    bus: glib::WeakRef<Bus>,
377    receiver: UnboundedReceiver<Message>,
378}
379
380impl BusStream {
381    fn new(bus: &Bus) -> Self {
382        skip_assert_initialized!();
383
384        let mutex = Arc::new(Mutex::new(()));
385        let (sender, receiver) = mpsc::unbounded();
386
387        // Use a mutex to ensure that the sync handler is not putting any messages into the sender
388        // until we have removed all previously queued messages from the bus.
389        // This makes sure that the messages are staying in order.
390        //
391        // We could use the bus' object lock here but a separate mutex seems safer.
392        let _mutex_guard = mutex.lock().unwrap();
393        bus.set_sync_handler({
394            let sender = sender.clone();
395            let mutex = mutex.clone();
396
397            move |_bus, message| {
398                let _mutex_guard = mutex.lock().unwrap();
399
400                let _ = sender.unbounded_send(message.to_owned());
401
402                BusSyncReply::Drop
403            }
404        });
405
406        // First pop all messages that might've been previously queued before creating the bus stream.
407        while let Some(message) = bus.pop() {
408            let _ = sender.unbounded_send(message);
409        }
410
411        Self {
412            bus: bus.downgrade(),
413            receiver,
414        }
415    }
416}
417
418impl Drop for BusStream {
419    fn drop(&mut self) {
420        if let Some(bus) = self.bus.upgrade() {
421            bus.unset_sync_handler();
422        }
423    }
424}
425
426impl Stream for BusStream {
427    type Item = Message;
428
429    fn poll_next(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
430        self.receiver.poll_next_unpin(context)
431    }
432}
433
434impl FusedStream for BusStream {
435    fn is_terminated(&self) -> bool {
436        self.receiver.is_terminated()
437    }
438}
439
440// rustdoc-stripper-ignore-next
441/// Manages ownership of the bus watch added to a bus with [`Bus::add_watch`] or [`Bus::add_watch_local`]
442///
443/// When dropped the bus watch is removed from the bus.
444#[derive(Debug)]
445#[must_use = "if unused the bus watch will immediately be removed"]
446pub struct BusWatchGuard {
447    bus: Bus,
448}
449
450impl Drop for BusWatchGuard {
451    fn drop(&mut self) {
452        let _ = self.bus.remove_watch();
453    }
454}
455
#[cfg(test)]
mod tests {
    use std::{
        sync::{Arc, Barrier, Mutex},
        thread,
        time::{Duration, Instant},
    };

    use super::*;

    // A sync handler sees a message posted on the bus.
    #[test]
    fn test_sync_handler() {
        crate::init().unwrap();

        let bus = Bus::new();
        let received = Arc::new(Mutex::new(Vec::new()));

        bus.set_sync_handler({
            let received = Arc::clone(&received);
            move |_, msg| {
                received.lock().unwrap().push(msg.clone());
                BusSyncReply::Pass
            }
        });

        bus.post(crate::message::Eos::new()).unwrap();

        let received = received.lock().unwrap();
        assert_eq!(received.len(), 1);

        let crate::MessageView::Eos(_) = received[0].view() else {
            unreachable!()
        };
    }

    // A message posted after creating the stream is yielded by the stream.
    #[test]
    fn test_bus_stream() {
        crate::init().unwrap();

        let bus = Bus::new();
        let stream = bus.stream();

        bus.post(crate::message::Eos::new()).unwrap();

        let (first, _) = futures_executor::block_on(StreamExt::into_future(stream));

        let first = first.unwrap();
        let crate::MessageView::Eos(_) = first.view() else {
            unreachable!()
        };
    }

    #[test]
    fn test_bus_timed_pop_filtered_does_not_restart_timeout_for_filtered_messages() {
        crate::init().unwrap();

        let bus = Bus::new();

        let timeout = Duration::from_millis(100);
        let timeout_allowance = Duration::from_millis(50);

        let start_line = Arc::new(Barrier::new(2));

        // Post non-matching messages for longer than the timeout
        let message_poster = thread::spawn({
            let bus = bus.clone();
            let start_line = Arc::clone(&start_line);

            move || {
                // Signal that posting is about to start
                start_line.wait();

                let posting_started = Instant::now();
                while posting_started.elapsed() < timeout * 3 {
                    bus.post(crate::message::DurationChanged::new()).unwrap();
                    thread::sleep(Duration::from_millis(10));
                }
            }
        });

        // Wait until the poster thread is ready
        start_line.wait();

        let wait_started = Instant::now();
        let popped = bus.timed_pop_filtered(
            crate::ClockTime::from_mseconds(timeout.as_millis() as u64),
            &[MessageType::Eos],
        );
        let waited = wait_started.elapsed();

        assert!(popped.is_none());

        // While filtering messages, `timed_pop_filtered` must not restart the
        // timeout for each discarded message. The timeout applies to the full
        // wait for a matching message.
        assert!(
            waited < timeout + timeout_allowance,
            "timed_pop_filtered exceeded the requested timeout while filtering"
        );

        message_poster
            .join()
            .expect("message_poster thread panicked");
    }
}
560}