gstreamer_app/
app_sink.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{
4    mem, panic,
5    pin::Pin,
6    ptr,
7    sync::{Arc, Mutex},
8    task::{Context, Poll, Waker},
9};
10
11#[cfg(not(panic = "abort"))]
12use std::sync::atomic::{AtomicBool, Ordering};
13
14use futures_core::Stream;
15use glib::{ffi::gpointer, prelude::*, translate::*};
16
17use crate::{ffi, AppSink};
18
19#[allow(clippy::type_complexity)]
20pub struct AppSinkCallbacks {
21    eos: Option<Box<dyn FnMut(&AppSink) + Send + 'static>>,
22    new_preroll: Option<
23        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
24    >,
25    new_sample: Option<
26        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
27    >,
28    new_event: Option<Box<dyn FnMut(&AppSink) -> bool + Send + 'static>>,
29    propose_allocation:
30        Option<Box<dyn FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static>>,
31    #[cfg(not(panic = "abort"))]
32    panicked: AtomicBool,
33    callbacks: ffi::GstAppSinkCallbacks,
34}
35
36unsafe impl Send for AppSinkCallbacks {}
37unsafe impl Sync for AppSinkCallbacks {}
38
39impl AppSinkCallbacks {
40    pub fn builder() -> AppSinkCallbacksBuilder {
41        skip_assert_initialized!();
42        AppSinkCallbacksBuilder {
43            eos: None,
44            new_preroll: None,
45            new_sample: None,
46            new_event: None,
47            propose_allocation: None,
48        }
49    }
50}
51
52#[allow(clippy::type_complexity)]
53#[must_use = "The builder must be built to be used"]
54pub struct AppSinkCallbacksBuilder {
55    eos: Option<Box<dyn FnMut(&AppSink) + Send + 'static>>,
56    new_preroll: Option<
57        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
58    >,
59    new_sample: Option<
60        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
61    >,
62    new_event: Option<Box<dyn FnMut(&AppSink) -> bool + Send + 'static>>,
63    propose_allocation:
64        Option<Box<dyn FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static>>,
65}
66
67impl AppSinkCallbacksBuilder {
68    pub fn eos<F: FnMut(&AppSink) + Send + 'static>(self, eos: F) -> Self {
69        Self {
70            eos: Some(Box::new(eos)),
71            ..self
72        }
73    }
74
75    pub fn eos_if<F: FnMut(&AppSink) + Send + 'static>(self, eos: F, predicate: bool) -> Self {
76        if predicate {
77            self.eos(eos)
78        } else {
79            self
80        }
81    }
82
83    pub fn eos_if_some<F: FnMut(&AppSink) + Send + 'static>(self, eos: Option<F>) -> Self {
84        if let Some(eos) = eos {
85            self.eos(eos)
86        } else {
87            self
88        }
89    }
90
91    pub fn new_preroll<
92        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
93    >(
94        self,
95        new_preroll: F,
96    ) -> Self {
97        Self {
98            new_preroll: Some(Box::new(new_preroll)),
99            ..self
100        }
101    }
102
103    pub fn new_preroll_if<
104        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
105    >(
106        self,
107        new_preroll: F,
108        predicate: bool,
109    ) -> Self {
110        if predicate {
111            self.new_preroll(new_preroll)
112        } else {
113            self
114        }
115    }
116
117    pub fn new_preroll_if_some<
118        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
119    >(
120        self,
121        new_preroll: Option<F>,
122    ) -> Self {
123        if let Some(new_preroll) = new_preroll {
124            self.new_preroll(new_preroll)
125        } else {
126            self
127        }
128    }
129
130    pub fn new_sample<
131        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
132    >(
133        self,
134        new_sample: F,
135    ) -> Self {
136        Self {
137            new_sample: Some(Box::new(new_sample)),
138            ..self
139        }
140    }
141
142    pub fn new_sample_if<
143        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
144    >(
145        self,
146        new_sample: F,
147        predicate: bool,
148    ) -> Self {
149        if predicate {
150            self.new_sample(new_sample)
151        } else {
152            self
153        }
154    }
155
156    pub fn new_sample_if_some<
157        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
158    >(
159        self,
160        new_sample: Option<F>,
161    ) -> Self {
162        if let Some(new_sample) = new_sample {
163            self.new_sample(new_sample)
164        } else {
165            self
166        }
167    }
168
169    #[cfg(feature = "v1_20")]
170    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
171    pub fn new_event<F: FnMut(&AppSink) -> bool + Send + 'static>(self, new_event: F) -> Self {
172        Self {
173            new_event: Some(Box::new(new_event)),
174            ..self
175        }
176    }
177
178    #[cfg(feature = "v1_20")]
179    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
180    pub fn new_event_if<F: FnMut(&AppSink) -> bool + Send + 'static>(
181        self,
182        new_event: F,
183        predicate: bool,
184    ) -> Self {
185        if predicate {
186            self.new_event(new_event)
187        } else {
188            self
189        }
190    }
191
192    #[cfg(feature = "v1_20")]
193    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
194    pub fn new_event_if_some<F: FnMut(&AppSink) -> bool + Send + 'static>(
195        self,
196        new_event: Option<F>,
197    ) -> Self {
198        if let Some(new_event) = new_event {
199            self.new_event(new_event)
200        } else {
201            self
202        }
203    }
204
205    #[cfg(feature = "v1_24")]
206    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
207    pub fn propose_allocation<
208        F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
209    >(
210        self,
211        propose_allocation: F,
212    ) -> Self {
213        Self {
214            propose_allocation: Some(Box::new(propose_allocation)),
215            ..self
216        }
217    }
218
219    #[cfg(feature = "v1_24")]
220    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
221    pub fn propose_allocation_if<
222        F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
223    >(
224        self,
225        propose_allocation: F,
226        predicate: bool,
227    ) -> Self {
228        if predicate {
229            self.propose_allocation(propose_allocation)
230        } else {
231            self
232        }
233    }
234
235    #[cfg(feature = "v1_24")]
236    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
237    pub fn propose_allocation_if_some<
238        F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
239    >(
240        self,
241        propose_allocation: Option<F>,
242    ) -> Self {
243        if let Some(propose_allocation) = propose_allocation {
244            self.propose_allocation(propose_allocation)
245        } else {
246            self
247        }
248    }
249
250    #[must_use = "Building the callbacks without using them has no effect"]
251    pub fn build(self) -> AppSinkCallbacks {
252        let have_eos = self.eos.is_some();
253        let have_new_preroll = self.new_preroll.is_some();
254        let have_new_sample = self.new_sample.is_some();
255        let have_new_event = self.new_event.is_some();
256        let have_propose_allocation = self.propose_allocation.is_some();
257
258        AppSinkCallbacks {
259            eos: self.eos,
260            new_preroll: self.new_preroll,
261            new_sample: self.new_sample,
262            new_event: self.new_event,
263            propose_allocation: self.propose_allocation,
264            #[cfg(not(panic = "abort"))]
265            panicked: AtomicBool::new(false),
266            callbacks: ffi::GstAppSinkCallbacks {
267                eos: if have_eos { Some(trampoline_eos) } else { None },
268                new_preroll: if have_new_preroll {
269                    Some(trampoline_new_preroll)
270                } else {
271                    None
272                },
273                new_sample: if have_new_sample {
274                    Some(trampoline_new_sample)
275                } else {
276                    None
277                },
278                new_event: if have_new_event {
279                    Some(trampoline_new_event)
280                } else {
281                    None
282                },
283                propose_allocation: if have_propose_allocation {
284                    Some(trampoline_propose_allocation)
285                } else {
286                    None
287                },
288                _gst_reserved: [ptr::null_mut(), ptr::null_mut()],
289            },
290        }
291    }
292}
293
294unsafe extern "C" fn trampoline_eos(appsink: *mut ffi::GstAppSink, callbacks: gpointer) {
295    let callbacks = callbacks as *mut AppSinkCallbacks;
296    let element: Borrowed<AppSink> = from_glib_borrow(appsink);
297
298    #[cfg(not(panic = "abort"))]
299    if (*callbacks).panicked.load(Ordering::Relaxed) {
300        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
301        gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
302        return;
303    }
304
305    if let Some(ref mut eos) = (*callbacks).eos {
306        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| eos(&element)));
307        match result {
308            Ok(result) => result,
309            Err(err) => {
310                #[cfg(panic = "abort")]
311                {
312                    unreachable!("{err:?}");
313                }
314                #[cfg(not(panic = "abort"))]
315                {
316                    (*callbacks).panicked.store(true, Ordering::Relaxed);
317                    gst::subclass::post_panic_error_message(
318                        element.upcast_ref(),
319                        element.upcast_ref(),
320                        Some(err),
321                    );
322                }
323            }
324        }
325    }
326}
327
328unsafe extern "C" fn trampoline_new_preroll(
329    appsink: *mut ffi::GstAppSink,
330    callbacks: gpointer,
331) -> gst::ffi::GstFlowReturn {
332    let callbacks = callbacks as *mut AppSinkCallbacks;
333    let element: Borrowed<AppSink> = from_glib_borrow(appsink);
334
335    #[cfg(not(panic = "abort"))]
336    if (*callbacks).panicked.load(Ordering::Relaxed) {
337        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
338        gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
339        return gst::FlowReturn::Error.into_glib();
340    }
341
342    let ret = if let Some(ref mut new_preroll) = (*callbacks).new_preroll {
343        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_preroll(&element).into()));
344        match result {
345            Ok(result) => result,
346            Err(err) => {
347                #[cfg(panic = "abort")]
348                {
349                    unreachable!("{err:?}");
350                }
351                #[cfg(not(panic = "abort"))]
352                {
353                    (*callbacks).panicked.store(true, Ordering::Relaxed);
354                    gst::subclass::post_panic_error_message(
355                        element.upcast_ref(),
356                        element.upcast_ref(),
357                        Some(err),
358                    );
359
360                    gst::FlowReturn::Error
361                }
362            }
363        }
364    } else {
365        gst::FlowReturn::Error
366    };
367
368    ret.into_glib()
369}
370
371unsafe extern "C" fn trampoline_new_sample(
372    appsink: *mut ffi::GstAppSink,
373    callbacks: gpointer,
374) -> gst::ffi::GstFlowReturn {
375    let callbacks = callbacks as *mut AppSinkCallbacks;
376    let element: Borrowed<AppSink> = from_glib_borrow(appsink);
377
378    #[cfg(not(panic = "abort"))]
379    if (*callbacks).panicked.load(Ordering::Relaxed) {
380        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
381        gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
382        return gst::FlowReturn::Error.into_glib();
383    }
384
385    let ret = if let Some(ref mut new_sample) = (*callbacks).new_sample {
386        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_sample(&element).into()));
387        match result {
388            Ok(result) => result,
389            Err(err) => {
390                #[cfg(panic = "abort")]
391                {
392                    unreachable!("{err:?}");
393                }
394                #[cfg(not(panic = "abort"))]
395                {
396                    (*callbacks).panicked.store(true, Ordering::Relaxed);
397                    gst::subclass::post_panic_error_message(
398                        element.upcast_ref(),
399                        element.upcast_ref(),
400                        Some(err),
401                    );
402
403                    gst::FlowReturn::Error
404                }
405            }
406        }
407    } else {
408        gst::FlowReturn::Error
409    };
410
411    ret.into_glib()
412}
413
414unsafe extern "C" fn trampoline_new_event(
415    appsink: *mut ffi::GstAppSink,
416    callbacks: gpointer,
417) -> glib::ffi::gboolean {
418    let callbacks = callbacks as *mut AppSinkCallbacks;
419    let element: Borrowed<AppSink> = from_glib_borrow(appsink);
420
421    #[cfg(not(panic = "abort"))]
422    if (*callbacks).panicked.load(Ordering::Relaxed) {
423        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
424        gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
425        return false.into_glib();
426    }
427
428    let ret = if let Some(ref mut new_event) = (*callbacks).new_event {
429        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_event(&element)));
430        match result {
431            Ok(result) => result,
432            Err(err) => {
433                #[cfg(panic = "abort")]
434                {
435                    unreachable!("{err:?}");
436                }
437                #[cfg(not(panic = "abort"))]
438                {
439                    (*callbacks).panicked.store(true, Ordering::Relaxed);
440                    gst::subclass::post_panic_error_message(
441                        element.upcast_ref(),
442                        element.upcast_ref(),
443                        Some(err),
444                    );
445
446                    false
447                }
448            }
449        }
450    } else {
451        false
452    };
453
454    ret.into_glib()
455}
456
457unsafe extern "C" fn trampoline_propose_allocation(
458    appsink: *mut ffi::GstAppSink,
459    query: *mut gst::ffi::GstQuery,
460    callbacks: gpointer,
461) -> glib::ffi::gboolean {
462    let callbacks = callbacks as *mut AppSinkCallbacks;
463    let element: Borrowed<AppSink> = from_glib_borrow(appsink);
464
465    #[cfg(not(panic = "abort"))]
466    if (*callbacks).panicked.load(Ordering::Relaxed) {
467        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
468        gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
469        return false.into_glib();
470    }
471
472    let ret = if let Some(ref mut propose_allocation) = (*callbacks).propose_allocation {
473        let query = match gst::QueryRef::from_mut_ptr(query).view_mut() {
474            gst::QueryViewMut::Allocation(allocation) => allocation,
475            _ => unreachable!(),
476        };
477        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
478            propose_allocation(&element, query)
479        }));
480        match result {
481            Ok(result) => result,
482            Err(err) => {
483                #[cfg(panic = "abort")]
484                {
485                    unreachable!("{err:?}");
486                }
487                #[cfg(not(panic = "abort"))]
488                {
489                    (*callbacks).panicked.store(true, Ordering::Relaxed);
490                    gst::subclass::post_panic_error_message(
491                        element.upcast_ref(),
492                        element.upcast_ref(),
493                        Some(err),
494                    );
495                    false
496                }
497            }
498        }
499    } else {
500        false
501    };
502
503    ret.into_glib()
504}
505
506unsafe extern "C" fn destroy_callbacks(ptr: gpointer) {
507    let _ = Box::<AppSinkCallbacks>::from_raw(ptr as *mut _);
508}
509
510impl AppSink {
511    // rustdoc-stripper-ignore-next
512    /// Creates a new builder-pattern struct instance to construct [`AppSink`] objects.
513    ///
514    /// This method returns an instance of [`AppSinkBuilder`](crate::builders::AppSinkBuilder) which can be used to create [`AppSink`] objects.
515    pub fn builder() -> AppSinkBuilder {
516        assert_initialized_main_thread!();
517        AppSinkBuilder::new()
518    }
519
520    #[doc(alias = "gst_app_sink_set_callbacks")]
521    pub fn set_callbacks(&self, callbacks: AppSinkCallbacks) {
522        unsafe {
523            let sink = self.to_glib_none().0;
524
525            #[cfg(not(feature = "v1_18"))]
526            {
527                static SET_ONCE_QUARK: std::sync::OnceLock<glib::Quark> =
528                    std::sync::OnceLock::new();
529
530                let set_once_quark = SET_ONCE_QUARK
531                    .get_or_init(|| glib::Quark::from_str("gstreamer-rs-app-sink-callbacks"));
532
533                // This is not thread-safe before 1.16.3, see
534                // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570
535                if gst::version() < (1, 16, 3, 0) {
536                    if !glib::gobject_ffi::g_object_get_qdata(
537                        sink as *mut _,
538                        set_once_quark.into_glib(),
539                    )
540                    .is_null()
541                    {
542                        panic!("AppSink callbacks can only be set once");
543                    }
544
545                    glib::gobject_ffi::g_object_set_qdata(
546                        sink as *mut _,
547                        set_once_quark.into_glib(),
548                        1 as *mut _,
549                    );
550                }
551            }
552
553            ffi::gst_app_sink_set_callbacks(
554                sink,
555                mut_override(&callbacks.callbacks),
556                Box::into_raw(Box::new(callbacks)) as *mut _,
557                Some(destroy_callbacks),
558            );
559        }
560    }
561
562    #[doc(alias = "drop-out-of-segment")]
563    pub fn drops_out_of_segment(&self) -> bool {
564        unsafe {
565            from_glib(gst_base::ffi::gst_base_sink_get_drop_out_of_segment(
566                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
567            ))
568        }
569    }
570
571    #[doc(alias = "max-bitrate")]
572    #[doc(alias = "gst_base_sink_get_max_bitrate")]
573    pub fn max_bitrate(&self) -> u64 {
574        unsafe {
575            gst_base::ffi::gst_base_sink_get_max_bitrate(
576                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
577            )
578        }
579    }
580
581    #[doc(alias = "max-lateness")]
582    #[doc(alias = "gst_base_sink_get_max_lateness")]
583    pub fn max_lateness(&self) -> i64 {
584        unsafe {
585            gst_base::ffi::gst_base_sink_get_max_lateness(
586                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
587            )
588        }
589    }
590
591    #[doc(alias = "processing-deadline")]
592    #[cfg(feature = "v1_16")]
593    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
594    #[doc(alias = "gst_base_sink_get_processing_deadline")]
595    pub fn processing_deadline(&self) -> gst::ClockTime {
596        unsafe {
597            try_from_glib(gst_base::ffi::gst_base_sink_get_processing_deadline(
598                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
599            ))
600            .expect("undefined processing_deadline")
601        }
602    }
603
604    #[doc(alias = "render-delay")]
605    #[doc(alias = "gst_base_sink_get_render_delay")]
606    pub fn render_delay(&self) -> gst::ClockTime {
607        unsafe {
608            try_from_glib(gst_base::ffi::gst_base_sink_get_render_delay(
609                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
610            ))
611            .expect("undefined render_delay")
612        }
613    }
614
615    #[cfg(feature = "v1_18")]
616    #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
617    #[doc(alias = "gst_base_sink_get_stats")]
618    pub fn stats(&self) -> gst::Structure {
619        unsafe {
620            from_glib_full(gst_base::ffi::gst_base_sink_get_stats(
621                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
622            ))
623        }
624    }
625
626    #[doc(alias = "sync")]
627    pub fn is_sync(&self) -> bool {
628        unsafe {
629            from_glib(gst_base::ffi::gst_base_sink_get_sync(
630                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
631            ))
632        }
633    }
634
635    #[doc(alias = "throttle-time")]
636    #[doc(alias = "gst_base_sink_get_throttle_time")]
637    pub fn throttle_time(&self) -> u64 {
638        unsafe {
639            gst_base::ffi::gst_base_sink_get_throttle_time(
640                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
641            )
642        }
643    }
644
645    #[doc(alias = "ts-offset")]
646    #[doc(alias = "gst_base_sink_get_ts_offset")]
647    pub fn ts_offset(&self) -> gst::ClockTimeDiff {
648        unsafe {
649            gst_base::ffi::gst_base_sink_get_ts_offset(
650                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
651            )
652        }
653    }
654
655    #[doc(alias = "async")]
656    #[doc(alias = "gst_base_sink_is_async_enabled")]
657    pub fn is_async(&self) -> bool {
658        unsafe {
659            from_glib(gst_base::ffi::gst_base_sink_is_async_enabled(
660                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
661            ))
662        }
663    }
664
665    #[doc(alias = "last-sample")]
666    pub fn enables_last_sample(&self) -> bool {
667        unsafe {
668            from_glib(gst_base::ffi::gst_base_sink_is_last_sample_enabled(
669                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
670            ))
671        }
672    }
673
674    #[doc(alias = "qos")]
675    #[doc(alias = "gst_base_sink_is_qos_enabled")]
676    pub fn is_qos(&self) -> bool {
677        unsafe {
678            from_glib(gst_base::ffi::gst_base_sink_is_qos_enabled(
679                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
680            ))
681        }
682    }
683
684    #[doc(alias = "async")]
685    #[doc(alias = "gst_base_sink_set_async_enabled")]
686    pub fn set_async(&self, enabled: bool) {
687        unsafe {
688            gst_base::ffi::gst_base_sink_set_async_enabled(
689                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
690                enabled.into_glib(),
691            );
692        }
693    }
694
695    #[doc(alias = "drop-out-of-segment")]
696    #[doc(alias = "gst_base_sink_set_drop_out_of_segment")]
697    pub fn set_drop_out_of_segment(&self, drop_out_of_segment: bool) {
698        unsafe {
699            gst_base::ffi::gst_base_sink_set_drop_out_of_segment(
700                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
701                drop_out_of_segment.into_glib(),
702            );
703        }
704    }
705
706    #[doc(alias = "last-sample")]
707    pub fn set_enable_last_sample(&self, enabled: bool) {
708        unsafe {
709            gst_base::ffi::gst_base_sink_set_last_sample_enabled(
710                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
711                enabled.into_glib(),
712            );
713        }
714    }
715
716    #[doc(alias = "max-bitrate")]
717    #[doc(alias = "gst_base_sink_set_max_bitrate")]
718    pub fn set_max_bitrate(&self, max_bitrate: u64) {
719        unsafe {
720            gst_base::ffi::gst_base_sink_set_max_bitrate(
721                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
722                max_bitrate,
723            );
724        }
725    }
726
727    #[doc(alias = "max-lateness")]
728    #[doc(alias = "gst_base_sink_set_max_lateness")]
729    pub fn set_max_lateness(&self, max_lateness: i64) {
730        unsafe {
731            gst_base::ffi::gst_base_sink_set_max_lateness(
732                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
733                max_lateness,
734            );
735        }
736    }
737
738    #[doc(alias = "processing-deadline")]
739    #[cfg(feature = "v1_16")]
740    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
741    #[doc(alias = "gst_base_sink_set_processing_deadline")]
742    pub fn set_processing_deadline(&self, processing_deadline: gst::ClockTime) {
743        unsafe {
744            gst_base::ffi::gst_base_sink_set_processing_deadline(
745                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
746                processing_deadline.into_glib(),
747            );
748        }
749    }
750
751    #[doc(alias = "qos")]
752    #[doc(alias = "gst_base_sink_set_qos_enabled")]
753    pub fn set_qos(&self, enabled: bool) {
754        unsafe {
755            gst_base::ffi::gst_base_sink_set_qos_enabled(
756                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
757                enabled.into_glib(),
758            );
759        }
760    }
761
762    #[doc(alias = "render-delay")]
763    #[doc(alias = "gst_base_sink_set_render_delay")]
764    pub fn set_render_delay(&self, delay: gst::ClockTime) {
765        unsafe {
766            gst_base::ffi::gst_base_sink_set_render_delay(
767                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
768                delay.into_glib(),
769            );
770        }
771    }
772
773    #[doc(alias = "sync")]
774    #[doc(alias = "gst_base_sink_set_sync")]
775    pub fn set_sync(&self, sync: bool) {
776        unsafe {
777            gst_base::ffi::gst_base_sink_set_sync(
778                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
779                sync.into_glib(),
780            );
781        }
782    }
783
784    #[doc(alias = "throttle-time")]
785    #[doc(alias = "gst_base_sink_set_throttle_time")]
786    pub fn set_throttle_time(&self, throttle: u64) {
787        unsafe {
788            gst_base::ffi::gst_base_sink_set_throttle_time(
789                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
790                throttle,
791            );
792        }
793    }
794
795    #[doc(alias = "ts-offset")]
796    #[doc(alias = "gst_base_sink_set_ts_offset")]
797    pub fn set_ts_offset(&self, offset: gst::ClockTimeDiff) {
798        unsafe {
799            gst_base::ffi::gst_base_sink_set_ts_offset(
800                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
801                offset,
802            );
803        }
804    }
805
806    #[doc(alias = "async")]
807    pub fn connect_async_notify<F: Fn(&Self) + Send + Sync + 'static>(
808        &self,
809        f: F,
810    ) -> glib::SignalHandlerId {
811        unsafe extern "C" fn notify_async_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
812            this: *mut ffi::GstAppSink,
813            _param_spec: glib::ffi::gpointer,
814            f: glib::ffi::gpointer,
815        ) {
816            let f: &F = &*(f as *const F);
817            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
818        }
819        unsafe {
820            let f: Box<F> = Box::new(f);
821            glib::signal::connect_raw(
822                self.as_ptr() as *mut _,
823                b"notify::async\0".as_ptr() as *const _,
824                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
825                    notify_async_trampoline::<F> as *const (),
826                )),
827                Box::into_raw(f),
828            )
829        }
830    }
831
832    #[doc(alias = "blocksize")]
833    pub fn connect_blocksize_notify<F: Fn(&Self) + Send + Sync + 'static>(
834        &self,
835        f: F,
836    ) -> glib::SignalHandlerId {
837        unsafe extern "C" fn notify_blocksize_trampoline<
838            F: Fn(&AppSink) + Send + Sync + 'static,
839        >(
840            this: *mut ffi::GstAppSink,
841            _param_spec: glib::ffi::gpointer,
842            f: glib::ffi::gpointer,
843        ) {
844            let f: &F = &*(f as *const F);
845            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
846        }
847        unsafe {
848            let f: Box<F> = Box::new(f);
849            glib::signal::connect_raw(
850                self.as_ptr() as *mut _,
851                b"notify::blocksize\0".as_ptr() as *const _,
852                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
853                    notify_blocksize_trampoline::<F> as *const (),
854                )),
855                Box::into_raw(f),
856            )
857        }
858    }
859
860    #[doc(alias = "enable-last-sample")]
861    pub fn connect_enable_last_sample_notify<F: Fn(&Self) + Send + Sync + 'static>(
862        &self,
863        f: F,
864    ) -> glib::SignalHandlerId {
865        unsafe extern "C" fn notify_enable_last_sample_trampoline<
866            F: Fn(&AppSink) + Send + Sync + 'static,
867        >(
868            this: *mut ffi::GstAppSink,
869            _param_spec: glib::ffi::gpointer,
870            f: glib::ffi::gpointer,
871        ) {
872            let f: &F = &*(f as *const F);
873            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
874        }
875        unsafe {
876            let f: Box<F> = Box::new(f);
877            glib::signal::connect_raw(
878                self.as_ptr() as *mut _,
879                b"notify::enable-last-sample\0".as_ptr() as *const _,
880                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
881                    notify_enable_last_sample_trampoline::<F> as *const (),
882                )),
883                Box::into_raw(f),
884            )
885        }
886    }
887
888    #[doc(alias = "last-sample")]
889    pub fn connect_last_sample_notify<F: Fn(&Self) + Send + Sync + 'static>(
890        &self,
891        f: F,
892    ) -> glib::SignalHandlerId {
893        unsafe extern "C" fn notify_last_sample_trampoline<
894            F: Fn(&AppSink) + Send + Sync + 'static,
895        >(
896            this: *mut ffi::GstAppSink,
897            _param_spec: glib::ffi::gpointer,
898            f: glib::ffi::gpointer,
899        ) {
900            let f: &F = &*(f as *const F);
901            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
902        }
903        unsafe {
904            let f: Box<F> = Box::new(f);
905            glib::signal::connect_raw(
906                self.as_ptr() as *mut _,
907                b"notify::last-sample\0".as_ptr() as *const _,
908                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
909                    notify_last_sample_trampoline::<F> as *const (),
910                )),
911                Box::into_raw(f),
912            )
913        }
914    }
915
916    #[doc(alias = "max-bitrate")]
917    pub fn connect_max_bitrate_notify<F: Fn(&Self) + Send + Sync + 'static>(
918        &self,
919        f: F,
920    ) -> glib::SignalHandlerId {
921        unsafe extern "C" fn notify_max_bitrate_trampoline<
922            F: Fn(&AppSink) + Send + Sync + 'static,
923        >(
924            this: *mut ffi::GstAppSink,
925            _param_spec: glib::ffi::gpointer,
926            f: glib::ffi::gpointer,
927        ) {
928            let f: &F = &*(f as *const F);
929            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
930        }
931        unsafe {
932            let f: Box<F> = Box::new(f);
933            glib::signal::connect_raw(
934                self.as_ptr() as *mut _,
935                b"notify::max-bitrate\0".as_ptr() as *const _,
936                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
937                    notify_max_bitrate_trampoline::<F> as *const (),
938                )),
939                Box::into_raw(f),
940            )
941        }
942    }
943
944    #[doc(alias = "max-lateness")]
945    pub fn connect_max_lateness_notify<F: Fn(&Self) + Send + Sync + 'static>(
946        &self,
947        f: F,
948    ) -> glib::SignalHandlerId {
949        unsafe extern "C" fn notify_max_lateness_trampoline<
950            F: Fn(&AppSink) + Send + Sync + 'static,
951        >(
952            this: *mut ffi::GstAppSink,
953            _param_spec: glib::ffi::gpointer,
954            f: glib::ffi::gpointer,
955        ) {
956            let f: &F = &*(f as *const F);
957            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
958        }
959        unsafe {
960            let f: Box<F> = Box::new(f);
961            glib::signal::connect_raw(
962                self.as_ptr() as *mut _,
963                b"notify::max-lateness\0".as_ptr() as *const _,
964                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
965                    notify_max_lateness_trampoline::<F> as *const (),
966                )),
967                Box::into_raw(f),
968            )
969        }
970    }
971
972    #[cfg(feature = "v1_16")]
973    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
974    #[doc(alias = "processing-deadline")]
975    pub fn connect_processing_deadline_notify<F: Fn(&Self) + Send + Sync + 'static>(
976        &self,
977        f: F,
978    ) -> glib::SignalHandlerId {
979        unsafe extern "C" fn notify_processing_deadline_trampoline<
980            F: Fn(&AppSink) + Send + Sync + 'static,
981        >(
982            this: *mut ffi::GstAppSink,
983            _param_spec: glib::ffi::gpointer,
984            f: glib::ffi::gpointer,
985        ) {
986            let f: &F = &*(f as *const F);
987            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
988        }
989        unsafe {
990            let f: Box<F> = Box::new(f);
991            glib::signal::connect_raw(
992                self.as_ptr() as *mut _,
993                b"notify::processing-deadline\0".as_ptr() as *const _,
994                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
995                    notify_processing_deadline_trampoline::<F> as *const (),
996                )),
997                Box::into_raw(f),
998            )
999        }
1000    }
1001
1002    #[doc(alias = "qos")]
1003    pub fn connect_qos_notify<F: Fn(&Self) + Send + Sync + 'static>(
1004        &self,
1005        f: F,
1006    ) -> glib::SignalHandlerId {
1007        unsafe extern "C" fn notify_qos_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1008            this: *mut ffi::GstAppSink,
1009            _param_spec: glib::ffi::gpointer,
1010            f: glib::ffi::gpointer,
1011        ) {
1012            let f: &F = &*(f as *const F);
1013            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1014        }
1015        unsafe {
1016            let f: Box<F> = Box::new(f);
1017            glib::signal::connect_raw(
1018                self.as_ptr() as *mut _,
1019                b"notify::qos\0".as_ptr() as *const _,
1020                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1021                    notify_qos_trampoline::<F> as *const (),
1022                )),
1023                Box::into_raw(f),
1024            )
1025        }
1026    }
1027
1028    #[doc(alias = "render-delay")]
1029    pub fn connect_render_delay_notify<F: Fn(&Self) + Send + Sync + 'static>(
1030        &self,
1031        f: F,
1032    ) -> glib::SignalHandlerId {
1033        unsafe extern "C" fn notify_render_delay_trampoline<
1034            F: Fn(&AppSink) + Send + Sync + 'static,
1035        >(
1036            this: *mut ffi::GstAppSink,
1037            _param_spec: glib::ffi::gpointer,
1038            f: glib::ffi::gpointer,
1039        ) {
1040            let f: &F = &*(f as *const F);
1041            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1042        }
1043        unsafe {
1044            let f: Box<F> = Box::new(f);
1045            glib::signal::connect_raw(
1046                self.as_ptr() as *mut _,
1047                b"notify::render-delay\0".as_ptr() as *const _,
1048                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1049                    notify_render_delay_trampoline::<F> as *const (),
1050                )),
1051                Box::into_raw(f),
1052            )
1053        }
1054    }
1055
1056    #[cfg(feature = "v1_18")]
1057    #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
1058    #[doc(alias = "stats")]
1059    pub fn connect_stats_notify<F: Fn(&Self) + Send + Sync + 'static>(
1060        &self,
1061        f: F,
1062    ) -> glib::SignalHandlerId {
1063        unsafe extern "C" fn notify_stats_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1064            this: *mut ffi::GstAppSink,
1065            _param_spec: glib::ffi::gpointer,
1066            f: glib::ffi::gpointer,
1067        ) {
1068            let f: &F = &*(f as *const F);
1069            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1070        }
1071        unsafe {
1072            let f: Box<F> = Box::new(f);
1073            glib::signal::connect_raw(
1074                self.as_ptr() as *mut _,
1075                b"notify::stats\0".as_ptr() as *const _,
1076                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1077                    notify_stats_trampoline::<F> as *const (),
1078                )),
1079                Box::into_raw(f),
1080            )
1081        }
1082    }
1083
1084    #[doc(alias = "sync")]
1085    pub fn connect_sync_notify<F: Fn(&Self) + Send + Sync + 'static>(
1086        &self,
1087        f: F,
1088    ) -> glib::SignalHandlerId {
1089        unsafe extern "C" fn notify_sync_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1090            this: *mut ffi::GstAppSink,
1091            _param_spec: glib::ffi::gpointer,
1092            f: glib::ffi::gpointer,
1093        ) {
1094            let f: &F = &*(f as *const F);
1095            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1096        }
1097        unsafe {
1098            let f: Box<F> = Box::new(f);
1099            glib::signal::connect_raw(
1100                self.as_ptr() as *mut _,
1101                b"notify::sync\0".as_ptr() as *const _,
1102                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1103                    notify_sync_trampoline::<F> as *const (),
1104                )),
1105                Box::into_raw(f),
1106            )
1107        }
1108    }
1109
1110    #[doc(alias = "throttle-time")]
1111    pub fn connect_throttle_time_notify<F: Fn(&Self) + Send + Sync + 'static>(
1112        &self,
1113        f: F,
1114    ) -> glib::SignalHandlerId {
1115        unsafe extern "C" fn notify_throttle_time_trampoline<
1116            F: Fn(&AppSink) + Send + Sync + 'static,
1117        >(
1118            this: *mut ffi::GstAppSink,
1119            _param_spec: glib::ffi::gpointer,
1120            f: glib::ffi::gpointer,
1121        ) {
1122            let f: &F = &*(f as *const F);
1123            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1124        }
1125        unsafe {
1126            let f: Box<F> = Box::new(f);
1127            glib::signal::connect_raw(
1128                self.as_ptr() as *mut _,
1129                b"notify::throttle-time\0".as_ptr() as *const _,
1130                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1131                    notify_throttle_time_trampoline::<F> as *const (),
1132                )),
1133                Box::into_raw(f),
1134            )
1135        }
1136    }
1137
1138    #[doc(alias = "ts-offset")]
1139    pub fn connect_ts_offset_notify<F: Fn(&Self) + Send + Sync + 'static>(
1140        &self,
1141        f: F,
1142    ) -> glib::SignalHandlerId {
1143        unsafe extern "C" fn notify_ts_offset_trampoline<
1144            F: Fn(&AppSink) + Send + Sync + 'static,
1145        >(
1146            this: *mut ffi::GstAppSink,
1147            _param_spec: glib::ffi::gpointer,
1148            f: glib::ffi::gpointer,
1149        ) {
1150            let f: &F = &*(f as *const F);
1151            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1152        }
1153        unsafe {
1154            let f: Box<F> = Box::new(f);
1155            glib::signal::connect_raw(
1156                self.as_ptr() as *mut _,
1157                b"notify::ts-offset\0".as_ptr() as *const _,
1158                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1159                    notify_ts_offset_trampoline::<F> as *const (),
1160                )),
1161                Box::into_raw(f),
1162            )
1163        }
1164    }
1165
1166    pub fn stream(&self) -> AppSinkStream {
1167        AppSinkStream::new(self)
1168    }
1169}
1170
1171// rustdoc-stripper-ignore-next
1172/// A [builder-pattern] type to construct [`AppSink`] objects.
1173///
1174/// [builder-pattern]: https://doc.rust-lang.org/1.0.0/style/ownership/builders.html
1175#[must_use = "The builder must be built to be used"]
1176pub struct AppSinkBuilder {
1177    builder: glib::object::ObjectBuilder<'static, AppSink>,
1178    callbacks: Option<AppSinkCallbacks>,
1179    drop_out_of_segment: Option<bool>,
1180}
1181
1182impl AppSinkBuilder {
1183    fn new() -> Self {
1184        Self {
1185            builder: glib::Object::builder(),
1186            callbacks: None,
1187            drop_out_of_segment: None,
1188        }
1189    }
1190
1191    // rustdoc-stripper-ignore-next
1192    /// Build the [`AppSink`].
1193    #[must_use = "Building the object from the builder is usually expensive and is not expected to have side effects"]
1194    pub fn build(self) -> AppSink {
1195        let appsink = self.builder.build();
1196
1197        if let Some(callbacks) = self.callbacks {
1198            appsink.set_callbacks(callbacks);
1199        }
1200
1201        if let Some(drop_out_of_segment) = self.drop_out_of_segment {
1202            appsink.set_drop_out_of_segment(drop_out_of_segment);
1203        }
1204
1205        appsink
1206    }
1207
1208    pub fn async_(self, async_: bool) -> Self {
1209        Self {
1210            builder: self.builder.property("async", async_),
1211            ..self
1212        }
1213    }
1214
1215    pub fn buffer_list(self, buffer_list: bool) -> Self {
1216        Self {
1217            builder: self.builder.property("buffer-list", buffer_list),
1218            ..self
1219        }
1220    }
1221
1222    pub fn callbacks(self, callbacks: AppSinkCallbacks) -> Self {
1223        Self {
1224            callbacks: Some(callbacks),
1225            ..self
1226        }
1227    }
1228
1229    pub fn caps(self, caps: &gst::Caps) -> Self {
1230        Self {
1231            builder: self.builder.property("caps", caps),
1232            ..self
1233        }
1234    }
1235
1236    pub fn drop(self, drop: bool) -> Self {
1237        Self {
1238            builder: self.builder.property("drop", drop),
1239            ..self
1240        }
1241    }
1242
1243    pub fn drop_out_of_segment(self, drop_out_of_segment: bool) -> Self {
1244        Self {
1245            builder: self
1246                .builder
1247                .property("drop-out-of-segment", drop_out_of_segment),
1248            ..self
1249        }
1250    }
1251
1252    pub fn enable_last_sample(self, enable_last_sample: bool) -> Self {
1253        Self {
1254            builder: self
1255                .builder
1256                .property("enable-last-sample", enable_last_sample),
1257            ..self
1258        }
1259    }
1260
1261    pub fn max_bitrate(self, max_bitrate: u64) -> Self {
1262        Self {
1263            builder: self.builder.property("max-bitrate", max_bitrate),
1264            ..self
1265        }
1266    }
1267
1268    pub fn max_buffers(self, max_buffers: u32) -> Self {
1269        Self {
1270            builder: self.builder.property("max-buffers", max_buffers),
1271            ..self
1272        }
1273    }
1274
1275    pub fn max_lateness(self, max_lateness: i64) -> Self {
1276        Self {
1277            builder: self.builder.property("max-lateness", max_lateness),
1278            ..self
1279        }
1280    }
1281
1282    #[cfg(feature = "v1_16")]
1283    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
1284    pub fn processing_deadline(self, processing_deadline: gst::ClockTime) -> Self {
1285        Self {
1286            builder: self
1287                .builder
1288                .property("processing-deadline", processing_deadline),
1289            ..self
1290        }
1291    }
1292
1293    pub fn qos(self, qos: bool) -> Self {
1294        Self {
1295            builder: self.builder.property("qos", qos),
1296            ..self
1297        }
1298    }
1299
1300    pub fn render_delay(self, render_delay: Option<gst::ClockTime>) -> Self {
1301        Self {
1302            builder: self.builder.property("render-delay", render_delay),
1303            ..self
1304        }
1305    }
1306
1307    pub fn sync(self, sync: bool) -> Self {
1308        Self {
1309            builder: self.builder.property("sync", sync),
1310            ..self
1311        }
1312    }
1313
1314    pub fn throttle_time(self, throttle_time: u64) -> Self {
1315        Self {
1316            builder: self.builder.property("throttle-time", throttle_time),
1317            ..self
1318        }
1319    }
1320
1321    pub fn ts_offset(self, ts_offset: gst::ClockTimeDiff) -> Self {
1322        Self {
1323            builder: self.builder.property("ts-offset", ts_offset),
1324            ..self
1325        }
1326    }
1327
1328    pub fn wait_on_eos(self, wait_on_eos: bool) -> Self {
1329        Self {
1330            builder: self.builder.property("wait-on-eos", wait_on_eos),
1331            ..self
1332        }
1333    }
1334
1335    #[cfg(feature = "v1_24")]
1336    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
1337    pub fn max_time(self, max_time: Option<gst::ClockTime>) -> Self {
1338        Self {
1339            builder: self.builder.property("max-time", max_time),
1340            ..self
1341        }
1342    }
1343
1344    #[cfg(feature = "v1_24")]
1345    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
1346    pub fn max_bytes(self, max_bytes: u64) -> Self {
1347        Self {
1348            builder: self.builder.property("max-bytes", max_bytes),
1349            ..self
1350        }
1351    }
1352
1353    pub fn name(self, name: impl Into<glib::GString>) -> Self {
1354        Self {
1355            builder: self.builder.property("name", name.into()),
1356            ..self
1357        }
1358    }
1359}
1360
1361#[derive(Debug)]
1362pub struct AppSinkStream {
1363    app_sink: glib::WeakRef<AppSink>,
1364    waker_reference: Arc<Mutex<Option<Waker>>>,
1365}
1366
1367impl AppSinkStream {
1368    fn new(app_sink: &AppSink) -> Self {
1369        skip_assert_initialized!();
1370
1371        let waker_reference = Arc::new(Mutex::new(None as Option<Waker>));
1372
1373        app_sink.set_callbacks(
1374            AppSinkCallbacks::builder()
1375                .new_sample({
1376                    let waker_reference = Arc::clone(&waker_reference);
1377
1378                    move |_| {
1379                        if let Some(waker) = waker_reference.lock().unwrap().take() {
1380                            waker.wake();
1381                        }
1382
1383                        Ok(gst::FlowSuccess::Ok)
1384                    }
1385                })
1386                .eos({
1387                    let waker_reference = Arc::clone(&waker_reference);
1388
1389                    move |_| {
1390                        if let Some(waker) = waker_reference.lock().unwrap().take() {
1391                            waker.wake();
1392                        }
1393                    }
1394                })
1395                .build(),
1396        );
1397
1398        Self {
1399            app_sink: app_sink.downgrade(),
1400            waker_reference,
1401        }
1402    }
1403}
1404
1405impl Drop for AppSinkStream {
1406    fn drop(&mut self) {
1407        #[cfg(not(feature = "v1_18"))]
1408        {
1409            // This is not thread-safe before 1.16.3, see
1410            // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570
1411            if gst::version() >= (1, 16, 3, 0) {
1412                if let Some(app_sink) = self.app_sink.upgrade() {
1413                    app_sink.set_callbacks(AppSinkCallbacks::builder().build());
1414                }
1415            }
1416        }
1417    }
1418}
1419
1420impl Stream for AppSinkStream {
1421    type Item = gst::Sample;
1422
1423    fn poll_next(self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
1424        let mut waker = self.waker_reference.lock().unwrap();
1425
1426        let Some(app_sink) = self.app_sink.upgrade() else {
1427            return Poll::Ready(None);
1428        };
1429
1430        app_sink
1431            .try_pull_sample(gst::ClockTime::ZERO)
1432            .map(|sample| Poll::Ready(Some(sample)))
1433            .unwrap_or_else(|| {
1434                if app_sink.is_eos() {
1435                    return Poll::Ready(None);
1436                }
1437
1438                waker.replace(context.waker().to_owned());
1439
1440                Poll::Pending
1441            })
1442    }
1443}
1444
1445#[cfg(test)]
1446mod tests {
1447    use futures_util::StreamExt;
1448    use gst::prelude::*;
1449
1450    use super::*;
1451
1452    #[test]
1453    fn test_app_sink_stream() {
1454        gst::init().unwrap();
1455
1456        let videotestsrc = gst::ElementFactory::make("videotestsrc")
1457            .property("num-buffers", 5)
1458            .build()
1459            .unwrap();
1460        let appsink = gst::ElementFactory::make("appsink").build().unwrap();
1461
1462        let pipeline = gst::Pipeline::new();
1463        pipeline.add(&videotestsrc).unwrap();
1464        pipeline.add(&appsink).unwrap();
1465
1466        videotestsrc.link(&appsink).unwrap();
1467
1468        let app_sink_stream = appsink.dynamic_cast::<AppSink>().unwrap().stream();
1469        let samples_future = app_sink_stream.collect::<Vec<gst::Sample>>();
1470
1471        pipeline.set_state(gst::State::Playing).unwrap();
1472        let samples = futures_executor::block_on(samples_future);
1473        pipeline.set_state(gst::State::Null).unwrap();
1474
1475        assert_eq!(samples.len(), 5);
1476    }
1477}