1use std::{
4 future,
5 mem::transmute,
6 pin::Pin,
7 sync::{Arc, Mutex},
8 task::{Context, Poll},
9};
10
11use futures_channel::mpsc::{self, UnboundedReceiver};
12use futures_core::Stream;
13use futures_util::{stream::FusedStream, StreamExt};
14use glib::{
15 ffi::{gboolean, gpointer},
16 prelude::*,
17 source::Priority,
18 translate::*,
19 ControlFlow,
20};
21
22use crate::{ffi, Bus, BusSyncReply, Message, MessageType};
23
24unsafe extern "C" fn trampoline_watch<F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static>(
25 bus: *mut ffi::GstBus,
26 msg: *mut ffi::GstMessage,
27 func: gpointer,
28) -> gboolean {
29 let func: &mut F = &mut *(func as *mut F);
30 func(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib()
31}
32
33unsafe extern "C" fn destroy_closure_watch<
34 F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
35>(
36 ptr: gpointer,
37) {
38 let _ = Box::<F>::from_raw(ptr as *mut _);
39}
40
41fn into_raw_watch<F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static>(func: F) -> gpointer {
42 #[allow(clippy::type_complexity)]
43 let func: Box<F> = Box::new(func);
44 Box::into_raw(func) as gpointer
45}
46
47unsafe extern "C" fn trampoline_watch_local<F: FnMut(&Bus, &Message) -> ControlFlow + 'static>(
48 bus: *mut ffi::GstBus,
49 msg: *mut ffi::GstMessage,
50 func: gpointer,
51) -> gboolean {
52 let func: &mut glib::thread_guard::ThreadGuard<F> =
53 &mut *(func as *mut glib::thread_guard::ThreadGuard<F>);
54 (func.get_mut())(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib()
55}
56
57unsafe extern "C" fn destroy_closure_watch_local<
58 F: FnMut(&Bus, &Message) -> ControlFlow + 'static,
59>(
60 ptr: gpointer,
61) {
62 let _ = Box::<glib::thread_guard::ThreadGuard<F>>::from_raw(ptr as *mut _);
63}
64
65fn into_raw_watch_local<F: FnMut(&Bus, &Message) -> ControlFlow + 'static>(func: F) -> gpointer {
66 #[allow(clippy::type_complexity)]
67 let func: Box<glib::thread_guard::ThreadGuard<F>> =
68 Box::new(glib::thread_guard::ThreadGuard::new(func));
69 Box::into_raw(func) as gpointer
70}
71
72unsafe extern "C" fn trampoline_sync<
73 F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
74>(
75 bus: *mut ffi::GstBus,
76 msg: *mut ffi::GstMessage,
77 func: gpointer,
78) -> ffi::GstBusSyncReply {
79 let f: &F = &*(func as *const F);
80 let res = f(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib();
81
82 if res == ffi::GST_BUS_DROP {
83 ffi::gst_mini_object_unref(msg as *mut _);
84 }
85
86 res
87}
88
89unsafe extern "C" fn destroy_closure_sync<
90 F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
91>(
92 ptr: gpointer,
93) {
94 let _ = Box::<F>::from_raw(ptr as *mut _);
95}
96
97fn into_raw_sync<F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static>(
98 func: F,
99) -> gpointer {
100 let func: Box<F> = Box::new(func);
101 Box::into_raw(func) as gpointer
102}
103
104impl Bus {
105 #[doc(alias = "gst_bus_add_signal_watch")]
106 #[doc(alias = "gst_bus_add_signal_watch_full")]
107 pub fn add_signal_watch_full(&self, priority: Priority) {
108 unsafe {
109 ffi::gst_bus_add_signal_watch_full(self.to_glib_none().0, priority.into_glib());
110 }
111 }
112
113 #[doc(alias = "gst_bus_create_watch")]
114 pub fn create_watch<F>(&self, name: Option<&str>, priority: Priority, func: F) -> glib::Source
115 where
116 F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
117 {
118 skip_assert_initialized!();
119 unsafe {
120 let source = ffi::gst_bus_create_watch(self.to_glib_none().0);
121 glib::ffi::g_source_set_callback(
122 source,
123 Some(transmute::<
124 *mut (),
125 unsafe extern "C" fn(glib::ffi::gpointer) -> i32,
126 >(trampoline_watch::<F> as *mut ())),
127 into_raw_watch(func),
128 Some(destroy_closure_watch::<F>),
129 );
130 glib::ffi::g_source_set_priority(source, priority.into_glib());
131
132 if let Some(name) = name {
133 glib::ffi::g_source_set_name(source, name.to_glib_none().0);
134 }
135
136 from_glib_full(source)
137 }
138 }
139
140 #[doc(alias = "gst_bus_add_watch")]
141 #[doc(alias = "gst_bus_add_watch_full")]
142 pub fn add_watch<F>(&self, func: F) -> Result<BusWatchGuard, glib::BoolError>
143 where
144 F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
145 {
146 unsafe {
147 let res = ffi::gst_bus_add_watch_full(
148 self.to_glib_none().0,
149 glib::ffi::G_PRIORITY_DEFAULT,
150 Some(trampoline_watch::<F>),
151 into_raw_watch(func),
152 Some(destroy_closure_watch::<F>),
153 );
154
155 if res == 0 {
156 Err(glib::bool_error!("Bus already has a watch"))
157 } else {
158 Ok(BusWatchGuard { bus: self.clone() })
159 }
160 }
161 }
162
163 #[doc(alias = "gst_bus_add_watch")]
164 #[doc(alias = "gst_bus_add_watch_full")]
165 pub fn add_watch_local<F>(&self, func: F) -> Result<BusWatchGuard, glib::BoolError>
166 where
167 F: FnMut(&Bus, &Message) -> ControlFlow + 'static,
168 {
169 unsafe {
170 let ctx = glib::MainContext::ref_thread_default();
171 let _acquire = ctx
172 .acquire()
173 .expect("thread default main context already acquired by another thread");
174
175 let res = ffi::gst_bus_add_watch_full(
176 self.to_glib_none().0,
177 glib::ffi::G_PRIORITY_DEFAULT,
178 Some(trampoline_watch_local::<F>),
179 into_raw_watch_local(func),
180 Some(destroy_closure_watch_local::<F>),
181 );
182
183 if res == 0 {
184 Err(glib::bool_error!("Bus already has a watch"))
185 } else {
186 Ok(BusWatchGuard { bus: self.clone() })
187 }
188 }
189 }
190
191 #[doc(alias = "gst_bus_set_sync_handler")]
192 pub fn set_sync_handler<F>(&self, func: F)
193 where
194 F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
195 {
196 unsafe {
197 let bus = self.to_glib_none().0;
198
199 #[cfg(not(feature = "v1_18"))]
200 {
201 static SET_ONCE_QUARK: std::sync::OnceLock<glib::Quark> =
202 std::sync::OnceLock::new();
203
204 let set_once_quark = SET_ONCE_QUARK
205 .get_or_init(|| glib::Quark::from_str("gstreamer-rs-sync-handler"));
206
207 if crate::version() < (1, 16, 3, 0) {
210 if !glib::gobject_ffi::g_object_get_qdata(
211 bus as *mut _,
212 set_once_quark.into_glib(),
213 )
214 .is_null()
215 {
216 panic!("Bus sync handler can only be set once");
217 }
218
219 glib::gobject_ffi::g_object_set_qdata(
220 bus as *mut _,
221 set_once_quark.into_glib(),
222 1 as *mut _,
223 );
224 }
225 }
226
227 ffi::gst_bus_set_sync_handler(
228 bus,
229 Some(trampoline_sync::<F>),
230 into_raw_sync(func),
231 Some(destroy_closure_sync::<F>),
232 )
233 }
234 }
235
236 pub fn unset_sync_handler(&self) {
237 #[cfg(not(feature = "v1_18"))]
238 {
239 if crate::version() < (1, 16, 3, 0) {
242 return;
243 }
244 }
245
246 unsafe {
247 use std::ptr;
248
249 ffi::gst_bus_set_sync_handler(self.to_glib_none().0, None, ptr::null_mut(), None)
250 }
251 }
252
253 #[doc(alias = "gst_bus_pop")]
254 pub fn iter(&self) -> Iter {
255 self.iter_timed(Some(crate::ClockTime::ZERO))
256 }
257
258 #[doc(alias = "gst_bus_timed_pop")]
259 pub fn iter_timed(&self, timeout: impl Into<Option<crate::ClockTime>>) -> Iter {
260 Iter {
261 bus: self,
262 timeout: timeout.into(),
263 }
264 }
265
266 #[doc(alias = "gst_bus_pop_filtered")]
267 pub fn iter_filtered<'a>(
268 &'a self,
269 msg_types: &'a [MessageType],
270 ) -> impl Iterator<Item = Message> + 'a {
271 self.iter_timed_filtered(Some(crate::ClockTime::ZERO), msg_types)
272 }
273
274 #[doc(alias = "gst_bus_timed_pop_filtered")]
275 pub fn iter_timed_filtered<'a>(
276 &'a self,
277 timeout: impl Into<Option<crate::ClockTime>>,
278 msg_types: &'a [MessageType],
279 ) -> impl Iterator<Item = Message> + 'a {
280 self.iter_timed(timeout)
281 .filter(move |msg| msg_types.contains(&msg.type_()))
282 }
283
284 #[doc(alias = "gst_bus_timed_pop_filtered")]
285 pub fn timed_pop_filtered(
286 &self,
287 timeout: impl Into<Option<crate::ClockTime>> + Clone,
288 msg_types: &[MessageType],
289 ) -> Option<Message> {
290 loop {
291 let msg = self.timed_pop(timeout.clone())?;
292 if msg_types.contains(&msg.type_()) {
293 return Some(msg);
294 }
295 }
296 }
297
298 #[doc(alias = "gst_bus_pop_filtered")]
299 pub fn pop_filtered(&self, msg_types: &[MessageType]) -> Option<Message> {
300 loop {
301 let msg = self.pop()?;
302 if msg_types.contains(&msg.type_()) {
303 return Some(msg);
304 }
305 }
306 }
307
308 pub fn stream(&self) -> BusStream {
309 BusStream::new(self)
310 }
311
312 pub fn stream_filtered<'a>(
313 &self,
314 message_types: &'a [MessageType],
315 ) -> impl FusedStream<Item = Message> + Unpin + Send + 'a {
316 self.stream().filter(move |message| {
317 let message_type = message.type_();
318
319 future::ready(message_types.contains(&message_type))
320 })
321 }
322}
323
324#[derive(Debug)]
325pub struct Iter<'a> {
326 bus: &'a Bus,
327 timeout: Option<crate::ClockTime>,
328}
329
330impl Iterator for Iter<'_> {
331 type Item = Message;
332
333 fn next(&mut self) -> Option<Message> {
334 self.bus.timed_pop(self.timeout)
335 }
336}
337
338#[derive(Debug)]
339pub struct BusStream {
340 bus: glib::WeakRef<Bus>,
341 receiver: UnboundedReceiver<Message>,
342}
343
344impl BusStream {
345 fn new(bus: &Bus) -> Self {
346 skip_assert_initialized!();
347
348 let mutex = Arc::new(Mutex::new(()));
349 let (sender, receiver) = mpsc::unbounded();
350
351 let _mutex_guard = mutex.lock().unwrap();
357 bus.set_sync_handler({
358 let sender = sender.clone();
359 let mutex = mutex.clone();
360
361 move |_bus, message| {
362 let _mutex_guard = mutex.lock().unwrap();
363
364 let _ = sender.unbounded_send(message.to_owned());
365
366 BusSyncReply::Drop
367 }
368 });
369
370 while let Some(message) = bus.pop() {
372 let _ = sender.unbounded_send(message);
373 }
374
375 Self {
376 bus: bus.downgrade(),
377 receiver,
378 }
379 }
380}
381
382impl Drop for BusStream {
383 fn drop(&mut self) {
384 if let Some(bus) = self.bus.upgrade() {
385 bus.unset_sync_handler();
386 }
387 }
388}
389
390impl Stream for BusStream {
391 type Item = Message;
392
393 fn poll_next(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
394 self.receiver.poll_next_unpin(context)
395 }
396}
397
398impl FusedStream for BusStream {
399 fn is_terminated(&self) -> bool {
400 self.receiver.is_terminated()
401 }
402}
403
404#[derive(Debug)]
409#[must_use = "if unused the bus watch will immediately be removed"]
410pub struct BusWatchGuard {
411 bus: Bus,
412}
413
414impl Drop for BusWatchGuard {
415 fn drop(&mut self) {
416 let _ = self.bus.remove_watch();
417 }
418}
419
420#[cfg(test)]
421mod tests {
422 use std::sync::{Arc, Mutex};
423
424 use super::*;
425
426 #[test]
427 fn test_sync_handler() {
428 crate::init().unwrap();
429
430 let bus = Bus::new();
431 let msgs = Arc::new(Mutex::new(Vec::new()));
432 let msgs_clone = msgs.clone();
433 bus.set_sync_handler(move |_, msg| {
434 msgs_clone.lock().unwrap().push(msg.clone());
435 BusSyncReply::Pass
436 });
437
438 bus.post(crate::message::Eos::new()).unwrap();
439
440 let msgs = msgs.lock().unwrap();
441 assert_eq!(msgs.len(), 1);
442 match msgs[0].view() {
443 crate::MessageView::Eos(_) => (),
444 _ => unreachable!(),
445 }
446 }
447
448 #[test]
449 fn test_bus_stream() {
450 crate::init().unwrap();
451
452 let bus = Bus::new();
453 let bus_stream = bus.stream();
454
455 let eos_message = crate::message::Eos::new();
456 bus.post(eos_message).unwrap();
457
458 let bus_future = bus_stream.into_future();
459 let (message, _) = futures_executor::block_on(bus_future);
460
461 match message.unwrap().view() {
462 crate::MessageView::Eos(_) => (),
463 _ => unreachable!(),
464 }
465 }
466}