gstreamer/
bus.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{
4    future,
5    mem::transmute,
6    pin::Pin,
7    sync::{Arc, Mutex},
8    task::{Context, Poll},
9};
10
11use futures_channel::mpsc::{self, UnboundedReceiver};
12use futures_core::Stream;
13use futures_util::{stream::FusedStream, StreamExt};
14use glib::{
15    ffi::{gboolean, gpointer},
16    prelude::*,
17    source::Priority,
18    translate::*,
19    ControlFlow,
20};
21
22use crate::{ffi, Bus, BusSyncReply, Message, MessageType};
23
24unsafe extern "C" fn trampoline_watch<F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static>(
25    bus: *mut ffi::GstBus,
26    msg: *mut ffi::GstMessage,
27    func: gpointer,
28) -> gboolean {
29    let func: &mut F = &mut *(func as *mut F);
30    func(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib()
31}
32
33unsafe extern "C" fn destroy_closure_watch<
34    F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
35>(
36    ptr: gpointer,
37) {
38    let _ = Box::<F>::from_raw(ptr as *mut _);
39}
40
41fn into_raw_watch<F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static>(func: F) -> gpointer {
42    #[allow(clippy::type_complexity)]
43    let func: Box<F> = Box::new(func);
44    Box::into_raw(func) as gpointer
45}
46
47unsafe extern "C" fn trampoline_watch_local<F: FnMut(&Bus, &Message) -> ControlFlow + 'static>(
48    bus: *mut ffi::GstBus,
49    msg: *mut ffi::GstMessage,
50    func: gpointer,
51) -> gboolean {
52    let func: &mut glib::thread_guard::ThreadGuard<F> =
53        &mut *(func as *mut glib::thread_guard::ThreadGuard<F>);
54    (func.get_mut())(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib()
55}
56
57unsafe extern "C" fn destroy_closure_watch_local<
58    F: FnMut(&Bus, &Message) -> ControlFlow + 'static,
59>(
60    ptr: gpointer,
61) {
62    let _ = Box::<glib::thread_guard::ThreadGuard<F>>::from_raw(ptr as *mut _);
63}
64
65fn into_raw_watch_local<F: FnMut(&Bus, &Message) -> ControlFlow + 'static>(func: F) -> gpointer {
66    #[allow(clippy::type_complexity)]
67    let func: Box<glib::thread_guard::ThreadGuard<F>> =
68        Box::new(glib::thread_guard::ThreadGuard::new(func));
69    Box::into_raw(func) as gpointer
70}
71
72unsafe extern "C" fn trampoline_sync<
73    F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
74>(
75    bus: *mut ffi::GstBus,
76    msg: *mut ffi::GstMessage,
77    func: gpointer,
78) -> ffi::GstBusSyncReply {
79    let f: &F = &*(func as *const F);
80    let res = f(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib();
81
82    if res == ffi::GST_BUS_DROP {
83        ffi::gst_mini_object_unref(msg as *mut _);
84    }
85
86    res
87}
88
89unsafe extern "C" fn destroy_closure_sync<
90    F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
91>(
92    ptr: gpointer,
93) {
94    let _ = Box::<F>::from_raw(ptr as *mut _);
95}
96
97fn into_raw_sync<F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static>(
98    func: F,
99) -> gpointer {
100    let func: Box<F> = Box::new(func);
101    Box::into_raw(func) as gpointer
102}
103
104impl Bus {
105    #[doc(alias = "gst_bus_add_signal_watch")]
106    #[doc(alias = "gst_bus_add_signal_watch_full")]
107    pub fn add_signal_watch_full(&self, priority: Priority) {
108        unsafe {
109            ffi::gst_bus_add_signal_watch_full(self.to_glib_none().0, priority.into_glib());
110        }
111    }
112
113    #[doc(alias = "gst_bus_create_watch")]
114    pub fn create_watch<F>(&self, name: Option<&str>, priority: Priority, func: F) -> glib::Source
115    where
116        F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
117    {
118        skip_assert_initialized!();
119        unsafe {
120            let source = ffi::gst_bus_create_watch(self.to_glib_none().0);
121            glib::ffi::g_source_set_callback(
122                source,
123                Some(transmute::<
124                    *mut (),
125                    unsafe extern "C" fn(glib::ffi::gpointer) -> i32,
126                >(trampoline_watch::<F> as *mut ())),
127                into_raw_watch(func),
128                Some(destroy_closure_watch::<F>),
129            );
130            glib::ffi::g_source_set_priority(source, priority.into_glib());
131
132            if let Some(name) = name {
133                glib::ffi::g_source_set_name(source, name.to_glib_none().0);
134            }
135
136            from_glib_full(source)
137        }
138    }
139
140    #[doc(alias = "gst_bus_add_watch")]
141    #[doc(alias = "gst_bus_add_watch_full")]
142    pub fn add_watch<F>(&self, func: F) -> Result<BusWatchGuard, glib::BoolError>
143    where
144        F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
145    {
146        unsafe {
147            let res = ffi::gst_bus_add_watch_full(
148                self.to_glib_none().0,
149                glib::ffi::G_PRIORITY_DEFAULT,
150                Some(trampoline_watch::<F>),
151                into_raw_watch(func),
152                Some(destroy_closure_watch::<F>),
153            );
154
155            if res == 0 {
156                Err(glib::bool_error!("Bus already has a watch"))
157            } else {
158                Ok(BusWatchGuard { bus: self.clone() })
159            }
160        }
161    }
162
163    #[doc(alias = "gst_bus_add_watch")]
164    #[doc(alias = "gst_bus_add_watch_full")]
165    pub fn add_watch_local<F>(&self, func: F) -> Result<BusWatchGuard, glib::BoolError>
166    where
167        F: FnMut(&Bus, &Message) -> ControlFlow + 'static,
168    {
169        unsafe {
170            let ctx = glib::MainContext::ref_thread_default();
171            let _acquire = ctx
172                .acquire()
173                .expect("thread default main context already acquired by another thread");
174
175            let res = ffi::gst_bus_add_watch_full(
176                self.to_glib_none().0,
177                glib::ffi::G_PRIORITY_DEFAULT,
178                Some(trampoline_watch_local::<F>),
179                into_raw_watch_local(func),
180                Some(destroy_closure_watch_local::<F>),
181            );
182
183            if res == 0 {
184                Err(glib::bool_error!("Bus already has a watch"))
185            } else {
186                Ok(BusWatchGuard { bus: self.clone() })
187            }
188        }
189    }
190
191    #[doc(alias = "gst_bus_set_sync_handler")]
192    pub fn set_sync_handler<F>(&self, func: F)
193    where
194        F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
195    {
196        unsafe {
197            let bus = self.to_glib_none().0;
198
199            #[cfg(not(feature = "v1_18"))]
200            {
201                static SET_ONCE_QUARK: std::sync::OnceLock<glib::Quark> =
202                    std::sync::OnceLock::new();
203
204                let set_once_quark = SET_ONCE_QUARK
205                    .get_or_init(|| glib::Quark::from_str("gstreamer-rs-sync-handler"));
206
207                // This is not thread-safe before 1.16.3, see
208                // https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/merge_requests/416
209                if crate::version() < (1, 16, 3, 0) {
210                    if !glib::gobject_ffi::g_object_get_qdata(
211                        bus as *mut _,
212                        set_once_quark.into_glib(),
213                    )
214                    .is_null()
215                    {
216                        panic!("Bus sync handler can only be set once");
217                    }
218
219                    glib::gobject_ffi::g_object_set_qdata(
220                        bus as *mut _,
221                        set_once_quark.into_glib(),
222                        1 as *mut _,
223                    );
224                }
225            }
226
227            ffi::gst_bus_set_sync_handler(
228                bus,
229                Some(trampoline_sync::<F>),
230                into_raw_sync(func),
231                Some(destroy_closure_sync::<F>),
232            )
233        }
234    }
235
236    pub fn unset_sync_handler(&self) {
237        #[cfg(not(feature = "v1_18"))]
238        {
239            // This is not thread-safe before 1.16.3, see
240            // https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/merge_requests/416
241            if crate::version() < (1, 16, 3, 0) {
242                return;
243            }
244        }
245
246        unsafe {
247            use std::ptr;
248
249            ffi::gst_bus_set_sync_handler(self.to_glib_none().0, None, ptr::null_mut(), None)
250        }
251    }
252
253    #[doc(alias = "gst_bus_pop")]
254    pub fn iter(&self) -> Iter {
255        self.iter_timed(Some(crate::ClockTime::ZERO))
256    }
257
258    #[doc(alias = "gst_bus_timed_pop")]
259    pub fn iter_timed(&self, timeout: impl Into<Option<crate::ClockTime>>) -> Iter {
260        Iter {
261            bus: self,
262            timeout: timeout.into(),
263        }
264    }
265
266    #[doc(alias = "gst_bus_pop_filtered")]
267    pub fn iter_filtered<'a>(
268        &'a self,
269        msg_types: &'a [MessageType],
270    ) -> impl Iterator<Item = Message> + 'a {
271        self.iter_timed_filtered(Some(crate::ClockTime::ZERO), msg_types)
272    }
273
274    #[doc(alias = "gst_bus_timed_pop_filtered")]
275    pub fn iter_timed_filtered<'a>(
276        &'a self,
277        timeout: impl Into<Option<crate::ClockTime>>,
278        msg_types: &'a [MessageType],
279    ) -> impl Iterator<Item = Message> + 'a {
280        self.iter_timed(timeout)
281            .filter(move |msg| msg_types.contains(&msg.type_()))
282    }
283
284    #[doc(alias = "gst_bus_timed_pop_filtered")]
285    pub fn timed_pop_filtered(
286        &self,
287        timeout: impl Into<Option<crate::ClockTime>> + Clone,
288        msg_types: &[MessageType],
289    ) -> Option<Message> {
290        loop {
291            let msg = self.timed_pop(timeout.clone())?;
292            if msg_types.contains(&msg.type_()) {
293                return Some(msg);
294            }
295        }
296    }
297
298    #[doc(alias = "gst_bus_pop_filtered")]
299    pub fn pop_filtered(&self, msg_types: &[MessageType]) -> Option<Message> {
300        loop {
301            let msg = self.pop()?;
302            if msg_types.contains(&msg.type_()) {
303                return Some(msg);
304            }
305        }
306    }
307
308    pub fn stream(&self) -> BusStream {
309        BusStream::new(self)
310    }
311
312    pub fn stream_filtered<'a>(
313        &self,
314        message_types: &'a [MessageType],
315    ) -> impl FusedStream<Item = Message> + Unpin + Send + 'a {
316        self.stream().filter(move |message| {
317            let message_type = message.type_();
318
319            future::ready(message_types.contains(&message_type))
320        })
321    }
322}
323
324#[derive(Debug)]
325pub struct Iter<'a> {
326    bus: &'a Bus,
327    timeout: Option<crate::ClockTime>,
328}
329
330impl Iterator for Iter<'_> {
331    type Item = Message;
332
333    fn next(&mut self) -> Option<Message> {
334        self.bus.timed_pop(self.timeout)
335    }
336}
337
338#[derive(Debug)]
339pub struct BusStream {
340    bus: glib::WeakRef<Bus>,
341    receiver: UnboundedReceiver<Message>,
342}
343
344impl BusStream {
345    fn new(bus: &Bus) -> Self {
346        skip_assert_initialized!();
347
348        let mutex = Arc::new(Mutex::new(()));
349        let (sender, receiver) = mpsc::unbounded();
350
351        // Use a mutex to ensure that the sync handler is not putting any messages into the sender
352        // until we have removed all previously queued messages from the bus.
353        // This makes sure that the messages are staying in order.
354        //
355        // We could use the bus' object lock here but a separate mutex seems safer.
356        let _mutex_guard = mutex.lock().unwrap();
357        bus.set_sync_handler({
358            let sender = sender.clone();
359            let mutex = mutex.clone();
360
361            move |_bus, message| {
362                let _mutex_guard = mutex.lock().unwrap();
363
364                let _ = sender.unbounded_send(message.to_owned());
365
366                BusSyncReply::Drop
367            }
368        });
369
370        // First pop all messages that might've been previously queued before creating the bus stream.
371        while let Some(message) = bus.pop() {
372            let _ = sender.unbounded_send(message);
373        }
374
375        Self {
376            bus: bus.downgrade(),
377            receiver,
378        }
379    }
380}
381
382impl Drop for BusStream {
383    fn drop(&mut self) {
384        if let Some(bus) = self.bus.upgrade() {
385            bus.unset_sync_handler();
386        }
387    }
388}
389
390impl Stream for BusStream {
391    type Item = Message;
392
393    fn poll_next(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
394        self.receiver.poll_next_unpin(context)
395    }
396}
397
398impl FusedStream for BusStream {
399    fn is_terminated(&self) -> bool {
400        self.receiver.is_terminated()
401    }
402}
403
404// rustdoc-stripper-ignore-next
405/// Manages ownership of the bus watch added to a bus with [`Bus::add_watch`] or [`Bus::add_watch_local`]
406///
407/// When dropped the bus watch is removed from the bus.
408#[derive(Debug)]
409#[must_use = "if unused the bus watch will immediately be removed"]
410pub struct BusWatchGuard {
411    bus: Bus,
412}
413
414impl Drop for BusWatchGuard {
415    fn drop(&mut self) {
416        let _ = self.bus.remove_watch();
417    }
418}
419
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use super::*;

    // A sync handler that passes messages on must see the posted EOS message.
    #[test]
    fn test_sync_handler() {
        crate::init().unwrap();

        let bus = Bus::new();
        let received = Arc::new(Mutex::new(Vec::new()));

        bus.set_sync_handler({
            let received = Arc::clone(&received);
            move |_, msg| {
                received.lock().unwrap().push(msg.clone());
                BusSyncReply::Pass
            }
        });

        bus.post(crate::message::Eos::new()).unwrap();

        let received = received.lock().unwrap();
        assert_eq!(received.len(), 1);
        let crate::MessageView::Eos(_) = received[0].view() else {
            unreachable!()
        };
    }

    // A message posted after the stream was created must come out of the stream.
    #[test]
    fn test_bus_stream() {
        crate::init().unwrap();

        let bus = Bus::new();
        let stream = bus.stream();

        bus.post(crate::message::Eos::new()).unwrap();

        let (first, _) = futures_executor::block_on(stream.into_future());

        let first = first.unwrap();
        let crate::MessageView::Eos(_) = first.view() else {
            unreachable!()
        };
    }
}