gstreamer/subclass/
buffer_pool.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::ptr;
4
5use glib::{
6    prelude::*,
7    subclass::{prelude::*, InitializingObject},
8    translate::*,
9};
10use libc::c_char;
11
12use super::prelude::*;
13use crate::{ffi, BufferPool, BufferPoolAcquireParams, BufferPoolConfigRef};
14
15pub trait BufferPoolImpl: BufferPoolImplExt + GstObjectImpl + Send + Sync {
16    fn acquire_buffer(
17        &self,
18        params: Option<&BufferPoolAcquireParams>,
19    ) -> Result<crate::Buffer, crate::FlowError> {
20        self.parent_acquire_buffer(params)
21    }
22
23    fn alloc_buffer(
24        &self,
25        params: Option<&BufferPoolAcquireParams>,
26    ) -> Result<crate::Buffer, crate::FlowError> {
27        self.parent_alloc_buffer(params)
28    }
29
30    fn flush_start(&self) {
31        self.parent_flush_start()
32    }
33
34    fn flush_stop(&self) {
35        self.parent_flush_stop()
36    }
37
38    fn free_buffer(&self, buffer: crate::Buffer) {
39        self.parent_free_buffer(buffer)
40    }
41
42    fn release_buffer(&self, buffer: crate::Buffer) {
43        self.parent_release_buffer(buffer)
44    }
45
46    fn reset_buffer(&self, buffer: &mut crate::BufferRef) {
47        self.parent_reset_buffer(buffer)
48    }
49
50    fn start(&self) -> bool {
51        self.parent_start()
52    }
53
54    fn stop(&self) -> bool {
55        self.parent_stop()
56    }
57
58    fn options() -> &'static [&'static str] {
59        &[]
60    }
61
62    fn set_config(&self, config: &mut BufferPoolConfigRef) -> bool {
63        self.parent_set_config(config)
64    }
65}
66
67mod sealed {
68    pub trait Sealed {}
69    impl<T: super::BufferPoolImplExt> Sealed for T {}
70}
71
72pub trait BufferPoolImplExt: sealed::Sealed + ObjectSubclass {
73    fn parent_acquire_buffer(
74        &self,
75        params: Option<&BufferPoolAcquireParams>,
76    ) -> Result<crate::Buffer, crate::FlowError> {
77        unsafe {
78            let data = Self::type_data();
79            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
80            if let Some(f) = (*parent_class).acquire_buffer {
81                let params_ptr = mut_override(params.to_glib_none().0);
82                let mut buffer = std::ptr::null_mut();
83
84                let result = f(
85                    self.obj()
86                        .unsafe_cast_ref::<crate::BufferPool>()
87                        .to_glib_none()
88                        .0,
89                    &mut buffer,
90                    params_ptr,
91                );
92
93                crate::FlowSuccess::try_from_glib(result).map(|_| from_glib_full(buffer))
94            } else {
95                Err(crate::FlowError::NotSupported)
96            }
97        }
98    }
99
100    fn parent_alloc_buffer(
101        &self,
102        params: Option<&BufferPoolAcquireParams>,
103    ) -> Result<crate::Buffer, crate::FlowError> {
104        unsafe {
105            let data = Self::type_data();
106            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
107            if let Some(f) = (*parent_class).alloc_buffer {
108                let params_ptr = mut_override(params.to_glib_none().0);
109                let mut buffer = std::ptr::null_mut();
110
111                let result = f(
112                    self.obj()
113                        .unsafe_cast_ref::<crate::BufferPool>()
114                        .to_glib_none()
115                        .0,
116                    &mut buffer,
117                    params_ptr,
118                );
119
120                crate::FlowSuccess::try_from_glib(result).map(|_| from_glib_full(buffer))
121            } else {
122                Err(crate::FlowError::NotSupported)
123            }
124        }
125    }
126
127    fn parent_free_buffer(&self, buffer: crate::Buffer) {
128        unsafe {
129            let data = Self::type_data();
130            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
131            if let Some(f) = (*parent_class).free_buffer {
132                f(
133                    self.obj()
134                        .unsafe_cast_ref::<crate::BufferPool>()
135                        .to_glib_none()
136                        .0,
137                    buffer.into_glib_ptr(),
138                )
139            }
140        }
141    }
142
143    fn parent_release_buffer(&self, buffer: crate::Buffer) {
144        unsafe {
145            let data = Self::type_data();
146            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
147            if let Some(f) = (*parent_class).release_buffer {
148                f(
149                    self.obj()
150                        .unsafe_cast_ref::<crate::BufferPool>()
151                        .to_glib_none()
152                        .0,
153                    buffer.into_glib_ptr(),
154                )
155            }
156        }
157    }
158
159    fn parent_reset_buffer(&self, buffer: &mut crate::BufferRef) {
160        unsafe {
161            let data = Self::type_data();
162            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
163            if let Some(f) = (*parent_class).reset_buffer {
164                f(
165                    self.obj()
166                        .unsafe_cast_ref::<crate::BufferPool>()
167                        .to_glib_none()
168                        .0,
169                    buffer.as_mut_ptr(),
170                )
171            }
172        }
173    }
174
175    fn parent_start(&self) -> bool {
176        unsafe {
177            let data = Self::type_data();
178            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
179            if let Some(f) = (*parent_class).start {
180                let result = f(self
181                    .obj()
182                    .unsafe_cast_ref::<crate::BufferPool>()
183                    .to_glib_none()
184                    .0);
185
186                from_glib(result)
187            } else {
188                true
189            }
190        }
191    }
192
193    fn parent_stop(&self) -> bool {
194        unsafe {
195            let data = Self::type_data();
196            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
197            if let Some(f) = (*parent_class).stop {
198                let result = f(self
199                    .obj()
200                    .unsafe_cast_ref::<crate::BufferPool>()
201                    .to_glib_none()
202                    .0);
203
204                from_glib(result)
205            } else {
206                true
207            }
208        }
209    }
210
211    fn parent_set_config(&self, config: &mut BufferPoolConfigRef) -> bool {
212        unsafe {
213            let data = Self::type_data();
214            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
215            if let Some(f) = (*parent_class).set_config {
216                let result = f(
217                    self.obj()
218                        .unsafe_cast_ref::<crate::BufferPool>()
219                        .to_glib_none()
220                        .0,
221                    (*config).as_mut_ptr(),
222                );
223
224                from_glib(result)
225            } else {
226                false
227            }
228        }
229    }
230
231    fn parent_flush_start(&self) {
232        unsafe {
233            let data = Self::type_data();
234            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
235            if let Some(f) = (*parent_class).flush_start {
236                f(self
237                    .obj()
238                    .unsafe_cast_ref::<crate::BufferPool>()
239                    .to_glib_none()
240                    .0)
241            }
242        }
243    }
244
245    fn parent_flush_stop(&self) {
246        unsafe {
247            let data = Self::type_data();
248            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
249            if let Some(f) = (*parent_class).flush_stop {
250                f(self
251                    .obj()
252                    .unsafe_cast_ref::<crate::BufferPool>()
253                    .to_glib_none()
254                    .0)
255            }
256        }
257    }
258}
259
260impl<T: BufferPoolImpl> BufferPoolImplExt for T {}
261
262unsafe impl<T: BufferPoolImpl> IsSubclassable<T> for BufferPool {
263    fn class_init(klass: &mut glib::Class<Self>) {
264        Self::parent_class_init::<T>(klass);
265        let klass = klass.as_mut();
266        klass.acquire_buffer = Some(buffer_pool_acquire_buffer::<T>);
267        klass.alloc_buffer = Some(buffer_pool_alloc_buffer::<T>);
268        klass.release_buffer = Some(buffer_pool_release_buffer::<T>);
269        klass.reset_buffer = Some(buffer_pool_reset_buffer::<T>);
270        klass.start = Some(buffer_pool_start::<T>);
271        klass.stop = Some(buffer_pool_stop::<T>);
272        klass.get_options = Some(buffer_pool_get_options::<T>);
273        klass.set_config = Some(buffer_pool_set_config::<T>);
274        klass.flush_start = Some(buffer_pool_flush_start::<T>);
275        klass.flush_stop = Some(buffer_pool_flush_stop::<T>);
276        klass.free_buffer = Some(buffer_pool_free_buffer::<T>);
277    }
278
279    fn instance_init(instance: &mut InitializingObject<T>) {
280        Self::parent_instance_init(instance);
281
282        // Store the pool options in the instance data
283        // for later retrieval in buffer_pool_get_options
284        let options = T::options();
285        instance.set_instance_data(T::type_(), glib::StrV::from(options));
286    }
287}
288
289unsafe extern "C" fn buffer_pool_acquire_buffer<T: BufferPoolImpl>(
290    ptr: *mut ffi::GstBufferPool,
291    buffer: *mut *mut ffi::GstBuffer,
292    params: *mut ffi::GstBufferPoolAcquireParams,
293) -> ffi::GstFlowReturn {
294    let instance = &*(ptr as *mut T::Instance);
295    let imp = instance.imp();
296    let params: Option<BufferPoolAcquireParams> = from_glib_none(params);
297
298    match imp.acquire_buffer(params.as_ref()) {
299        Ok(b) => {
300            *buffer = b.into_glib_ptr();
301            ffi::GST_FLOW_OK
302        }
303        Err(err) => err.into_glib(),
304    }
305}
306
307unsafe extern "C" fn buffer_pool_alloc_buffer<T: BufferPoolImpl>(
308    ptr: *mut ffi::GstBufferPool,
309    buffer: *mut *mut ffi::GstBuffer,
310    params: *mut ffi::GstBufferPoolAcquireParams,
311) -> ffi::GstFlowReturn {
312    let instance = &*(ptr as *mut T::Instance);
313    let imp = instance.imp();
314    let params: Option<BufferPoolAcquireParams> = from_glib_none(params);
315
316    match imp.alloc_buffer(params.as_ref()) {
317        Ok(b) => {
318            *buffer = b.into_glib_ptr();
319            ffi::GST_FLOW_OK
320        }
321        Err(err) => err.into_glib(),
322    }
323}
324
325unsafe extern "C" fn buffer_pool_flush_start<T: BufferPoolImpl>(ptr: *mut ffi::GstBufferPool) {
326    // the GstBufferPool implementation calls this
327    // in finalize where the ref_count will already
328    // be zero and we are actually destroyed
329    // see: https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1645
330    if (*(ptr as *const glib::gobject_ffi::GObject)).ref_count == 0 {
331        // flush_start is a no-op in GstBufferPool
332        return;
333    }
334
335    let instance = &*(ptr as *mut T::Instance);
336    let imp = instance.imp();
337    imp.flush_start();
338}
339
340unsafe extern "C" fn buffer_pool_flush_stop<T: BufferPoolImpl>(ptr: *mut ffi::GstBufferPool) {
341    // the GstBufferPool implementation calls this
342    // in finalize where the ref_count will already
343    // be zero and we are actually destroyed
344    // see: https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1645
345    if (*(ptr as *const glib::gobject_ffi::GObject)).ref_count == 0 {
346        // flush_stop is a no-op in GstBufferPool
347        return;
348    }
349
350    let instance = &*(ptr as *mut T::Instance);
351    let imp = instance.imp();
352    imp.flush_stop();
353}
354
355unsafe extern "C" fn buffer_pool_free_buffer<T: BufferPoolImpl>(
356    ptr: *mut ffi::GstBufferPool,
357    buffer: *mut ffi::GstBuffer,
358) {
359    // the GstBufferPool implementation calls this
360    // in finalize where the ref_count will already
361    // be zero and we are actually destroyed
362    // see: https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1645
363    if (*(ptr as *const glib::gobject_ffi::GObject)).ref_count == 0 {
364        // As a workaround we call free_buffer directly on the
365        // GstBufferPool to prevent leaking the buffer
366        // This will NOT call free_buffer on a subclass.
367        let pool_class =
368            glib::Class::<crate::BufferPool>::from_type(crate::BufferPool::static_type()).unwrap();
369        let pool_class = pool_class.as_ref();
370        if let Some(f) = pool_class.free_buffer {
371            f(ptr, buffer)
372        }
373        return;
374    }
375
376    let instance = &*(ptr as *mut T::Instance);
377    let imp = instance.imp();
378    imp.free_buffer(from_glib_full(buffer));
379}
380
381unsafe extern "C" fn buffer_pool_release_buffer<T: BufferPoolImpl>(
382    ptr: *mut ffi::GstBufferPool,
383    buffer: *mut ffi::GstBuffer,
384) {
385    let instance = &*(ptr as *mut T::Instance);
386    let imp = instance.imp();
387    imp.release_buffer(from_glib_full(buffer));
388}
389
390unsafe extern "C" fn buffer_pool_reset_buffer<T: BufferPoolImpl>(
391    ptr: *mut ffi::GstBufferPool,
392    buffer: *mut ffi::GstBuffer,
393) {
394    let instance = &*(ptr as *mut T::Instance);
395    let imp = instance.imp();
396    imp.reset_buffer(crate::BufferRef::from_mut_ptr(buffer));
397}
398
399unsafe extern "C" fn buffer_pool_start<T: BufferPoolImpl>(
400    ptr: *mut ffi::GstBufferPool,
401) -> glib::ffi::gboolean {
402    let instance = &*(ptr as *mut T::Instance);
403    let imp = instance.imp();
404    imp.start().into_glib()
405}
406
407unsafe extern "C" fn buffer_pool_stop<T: BufferPoolImpl>(
408    ptr: *mut ffi::GstBufferPool,
409) -> glib::ffi::gboolean {
410    // the GstBufferPool implementation calls this
411    // in finalize where the ref_count will already
412    // be zero and we are actually destroyed
413    // see: https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1645
414    if (*(ptr as *const glib::gobject_ffi::GObject)).ref_count == 0 {
415        // As a workaround we call stop directly on the GstBufferPool
416        // This is needed because the default implementation clears
417        // the pool in stop.
418        let pool_class =
419            glib::Class::<crate::BufferPool>::from_type(crate::BufferPool::static_type()).unwrap();
420        let pool_class = pool_class.as_ref();
421        let result = if let Some(f) = pool_class.stop {
422            f(ptr)
423        } else {
424            true.into_glib()
425        };
426
427        return result;
428    }
429
430    let instance = &*(ptr as *mut T::Instance);
431    let imp = instance.imp();
432    imp.stop().into_glib()
433}
434
435unsafe extern "C" fn buffer_pool_get_options<T: BufferPoolImpl>(
436    ptr: *mut ffi::GstBufferPool,
437) -> *mut *const c_char {
438    let instance = &*(ptr as *mut T::Instance);
439    let imp = instance.imp();
440    T::instance_data::<glib::StrV>(imp, T::type_())
441        .map(|p| p.as_ptr() as *mut *const _)
442        .unwrap_or(ptr::null_mut())
443}
444
445unsafe extern "C" fn buffer_pool_set_config<T: BufferPoolImpl>(
446    ptr: *mut ffi::GstBufferPool,
447    config: *mut ffi::GstStructure,
448) -> glib::ffi::gboolean {
449    let instance = &*(ptr as *mut T::Instance);
450    let imp = instance.imp();
451    imp.set_config(BufferPoolConfigRef::from_glib_borrow_mut(config))
452        .into_glib()
453}
454
// Tests exercising a minimal `BufferPool` subclass: option reporting, the
// `set_config` override and buffer freeing on finalize / de-activation.
#[cfg(test)]
mod tests {
    use std::sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    };

    use super::*;
    use crate::prelude::*;

    pub mod imp {
        use super::*;

        // Subclass implementation used by the tests below.
        #[derive(Default)]
        pub struct TestBufferPool;

        impl ObjectImpl for TestBufferPool {}
        impl GstObjectImpl for TestBufferPool {}
        impl BufferPoolImpl for TestBufferPool {
            // Reports a single option so `test_pool_options` can check it.
            fn options() -> &'static [&'static str] {
                &["TEST_OPTION"]
            }

            // Doubles the configured buffer size before chaining up, which
            // `test_pool_acquire` observes on the acquired buffer.
            fn set_config(&self, config: &mut BufferPoolConfigRef) -> bool {
                let (caps, size, min_buffers, max_buffers) = config.params().unwrap();
                config.set_params(caps.as_ref(), size * 2, min_buffers, max_buffers);
                self.parent_set_config(config)
            }
        }

        #[glib::object_subclass]
        impl ObjectSubclass for TestBufferPool {
            const NAME: &'static str = "TestBufferPool";
            type Type = super::TestBufferPool;
            type ParentType = BufferPool;
        }
    }

    glib::wrapper! {
        pub struct TestBufferPool(ObjectSubclass<imp::TestBufferPool>) @extends BufferPool, crate::Object;
    }

    impl Default for TestBufferPool {
        fn default() -> Self {
            glib::Object::new()
        }
    }

    // The options returned by `BufferPoolImpl::options` are visible on the pool.
    #[test]
    fn test_pool_options() {
        crate::init().unwrap();
        let pool = TestBufferPool::default();
        assert_eq!(pool.options(), vec!["TEST_OPTION"]);
    }

    // The `set_config` override takes effect: 1024 is configured, 2048 is
    // the size of the acquired buffer.
    #[test]
    fn test_pool_acquire() {
        crate::init().unwrap();
        let pool = TestBufferPool::default();
        let mut config = pool.config();
        config.set_params(None, 1024, 1, 1);
        pool.set_config(config).expect("failed to set pool config");
        pool.set_active(true).expect("failed to activate pool");
        let buffer = pool
            .acquire_buffer(None)
            .expect("failed to acquire buffer from pool");
        assert_eq!(buffer.size(), 2048);
    }

    // Dropping the pool must free a buffer that was returned to it.
    #[test]
    fn test_pool_free_on_finalize() {
        crate::init().unwrap();
        let pool = TestBufferPool::default();
        let mut config = pool.config();
        config.set_params(None, 1024, 1, 1);
        pool.set_config(config).expect("failed to set pool config");
        pool.set_active(true).expect("failed to activate pool");
        let mut buffer = pool
            .acquire_buffer(None)
            .expect("failed to acquire buffer from pool");
        let finalized = Arc::new(AtomicBool::new(false));
        // Register a weak-ref notification on the buffer; the cloned `Arc` is
        // leaked here and reclaimed in `buffer_finalized`.
        unsafe {
            ffi::gst_mini_object_weak_ref(
                buffer.make_mut().upcast_mut().as_mut_ptr(),
                Some(buffer_finalized),
                Arc::into_raw(finalized.clone()) as *mut _,
            )
        };
        // return the buffer to the pool
        std::mem::drop(buffer);
        // drop should finalize the buffer pool which frees all allocated buffers
        std::mem::drop(pool);
        assert!(finalized.load(Ordering::SeqCst));
    }

    // De-activating the pool must free a buffer that was returned to it.
    #[test]
    fn test_pool_free_on_deactivate() {
        crate::init().unwrap();
        let pool = TestBufferPool::default();
        let mut config = pool.config();
        config.set_params(None, 1024, 1, 1);
        pool.set_config(config).expect("failed to set pool config");
        pool.set_active(true).expect("failed to activate pool");
        let mut buffer = pool
            .acquire_buffer(None)
            .expect("failed to acquire buffer from pool");
        let finalized = Arc::new(AtomicBool::new(false));
        // Register a weak-ref notification on the buffer; the cloned `Arc` is
        // leaked here and reclaimed in `buffer_finalized`.
        unsafe {
            ffi::gst_mini_object_weak_ref(
                buffer.make_mut().upcast_mut().as_mut_ptr(),
                Some(buffer_finalized),
                Arc::into_raw(finalized.clone()) as *mut _,
            )
        };
        // return the buffer to the pool
        std::mem::drop(buffer);
        // de-activating a pool should free all buffers
        pool.set_active(false).expect("failed to de-activate pool");
        assert!(finalized.load(Ordering::SeqCst));
    }

    // Weak-ref notification callback: takes back the `Arc` passed as user data
    // via `Arc::into_raw` and sets the flag it points to.
    unsafe extern "C" fn buffer_finalized(
        data: *mut libc::c_void,
        _mini_object: *mut ffi::GstMiniObject,
    ) {
        let finalized = Arc::from_raw(data as *const AtomicBool);
        finalized.store(true, Ordering::SeqCst);
    }
}
583}