gstreamer/subclass/
buffer_pool.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::ptr;
4
5use glib::{
6    prelude::*,
7    subclass::{prelude::*, InitializingObject},
8    translate::*,
9};
10use libc::c_char;
11
12use super::prelude::*;
13use crate::{ffi, BufferPool, BufferPoolAcquireParams, BufferPoolConfigRef};
14
15pub trait BufferPoolImpl: GstObjectImpl + ObjectSubclass<Type: IsA<BufferPool>> {
16    fn acquire_buffer(
17        &self,
18        params: Option<&BufferPoolAcquireParams>,
19    ) -> Result<crate::Buffer, crate::FlowError> {
20        self.parent_acquire_buffer(params)
21    }
22
23    fn alloc_buffer(
24        &self,
25        params: Option<&BufferPoolAcquireParams>,
26    ) -> Result<crate::Buffer, crate::FlowError> {
27        self.parent_alloc_buffer(params)
28    }
29
30    fn flush_start(&self) {
31        self.parent_flush_start()
32    }
33
34    fn flush_stop(&self) {
35        self.parent_flush_stop()
36    }
37
38    fn free_buffer(&self, buffer: crate::Buffer) {
39        self.parent_free_buffer(buffer)
40    }
41
42    fn release_buffer(&self, buffer: crate::Buffer) {
43        self.parent_release_buffer(buffer)
44    }
45
46    fn reset_buffer(&self, buffer: &mut crate::BufferRef) {
47        self.parent_reset_buffer(buffer)
48    }
49
50    fn start(&self) -> bool {
51        self.parent_start()
52    }
53
54    fn stop(&self) -> bool {
55        self.parent_stop()
56    }
57
58    fn options() -> &'static [&'static str] {
59        &[]
60    }
61
62    fn set_config(&self, config: &mut BufferPoolConfigRef) -> bool {
63        self.parent_set_config(config)
64    }
65}
66
67pub trait BufferPoolImplExt: BufferPoolImpl {
68    fn parent_acquire_buffer(
69        &self,
70        params: Option<&BufferPoolAcquireParams>,
71    ) -> Result<crate::Buffer, crate::FlowError> {
72        unsafe {
73            let data = Self::type_data();
74            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
75            if let Some(f) = (*parent_class).acquire_buffer {
76                let params_ptr = mut_override(params.to_glib_none().0);
77                let mut buffer = std::ptr::null_mut();
78
79                let result = f(
80                    self.obj()
81                        .unsafe_cast_ref::<crate::BufferPool>()
82                        .to_glib_none()
83                        .0,
84                    &mut buffer,
85                    params_ptr,
86                );
87
88                crate::FlowSuccess::try_from_glib(result).map(|_| from_glib_full(buffer))
89            } else {
90                Err(crate::FlowError::NotSupported)
91            }
92        }
93    }
94
95    fn parent_alloc_buffer(
96        &self,
97        params: Option<&BufferPoolAcquireParams>,
98    ) -> Result<crate::Buffer, crate::FlowError> {
99        unsafe {
100            let data = Self::type_data();
101            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
102            if let Some(f) = (*parent_class).alloc_buffer {
103                let params_ptr = mut_override(params.to_glib_none().0);
104                let mut buffer = std::ptr::null_mut();
105
106                let result = f(
107                    self.obj()
108                        .unsafe_cast_ref::<crate::BufferPool>()
109                        .to_glib_none()
110                        .0,
111                    &mut buffer,
112                    params_ptr,
113                );
114
115                crate::FlowSuccess::try_from_glib(result).map(|_| from_glib_full(buffer))
116            } else {
117                Err(crate::FlowError::NotSupported)
118            }
119        }
120    }
121
122    fn parent_free_buffer(&self, buffer: crate::Buffer) {
123        unsafe {
124            let data = Self::type_data();
125            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
126            if let Some(f) = (*parent_class).free_buffer {
127                f(
128                    self.obj()
129                        .unsafe_cast_ref::<crate::BufferPool>()
130                        .to_glib_none()
131                        .0,
132                    buffer.into_glib_ptr(),
133                )
134            }
135        }
136    }
137
138    fn parent_release_buffer(&self, buffer: crate::Buffer) {
139        unsafe {
140            let data = Self::type_data();
141            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
142            if let Some(f) = (*parent_class).release_buffer {
143                f(
144                    self.obj()
145                        .unsafe_cast_ref::<crate::BufferPool>()
146                        .to_glib_none()
147                        .0,
148                    buffer.into_glib_ptr(),
149                )
150            }
151        }
152    }
153
154    fn parent_reset_buffer(&self, buffer: &mut crate::BufferRef) {
155        unsafe {
156            let data = Self::type_data();
157            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
158            if let Some(f) = (*parent_class).reset_buffer {
159                f(
160                    self.obj()
161                        .unsafe_cast_ref::<crate::BufferPool>()
162                        .to_glib_none()
163                        .0,
164                    buffer.as_mut_ptr(),
165                )
166            }
167        }
168    }
169
170    fn parent_start(&self) -> bool {
171        unsafe {
172            let data = Self::type_data();
173            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
174            if let Some(f) = (*parent_class).start {
175                let result = f(self
176                    .obj()
177                    .unsafe_cast_ref::<crate::BufferPool>()
178                    .to_glib_none()
179                    .0);
180
181                from_glib(result)
182            } else {
183                true
184            }
185        }
186    }
187
188    fn parent_stop(&self) -> bool {
189        unsafe {
190            let data = Self::type_data();
191            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
192            if let Some(f) = (*parent_class).stop {
193                let result = f(self
194                    .obj()
195                    .unsafe_cast_ref::<crate::BufferPool>()
196                    .to_glib_none()
197                    .0);
198
199                from_glib(result)
200            } else {
201                true
202            }
203        }
204    }
205
206    fn parent_set_config(&self, config: &mut BufferPoolConfigRef) -> bool {
207        unsafe {
208            let data = Self::type_data();
209            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
210            if let Some(f) = (*parent_class).set_config {
211                let result = f(
212                    self.obj()
213                        .unsafe_cast_ref::<crate::BufferPool>()
214                        .to_glib_none()
215                        .0,
216                    (*config).as_mut_ptr(),
217                );
218
219                from_glib(result)
220            } else {
221                false
222            }
223        }
224    }
225
226    fn parent_flush_start(&self) {
227        unsafe {
228            let data = Self::type_data();
229            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
230            if let Some(f) = (*parent_class).flush_start {
231                f(self
232                    .obj()
233                    .unsafe_cast_ref::<crate::BufferPool>()
234                    .to_glib_none()
235                    .0)
236            }
237        }
238    }
239
240    fn parent_flush_stop(&self) {
241        unsafe {
242            let data = Self::type_data();
243            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
244            if let Some(f) = (*parent_class).flush_stop {
245                f(self
246                    .obj()
247                    .unsafe_cast_ref::<crate::BufferPool>()
248                    .to_glib_none()
249                    .0)
250            }
251        }
252    }
253}
254
255impl<T: BufferPoolImpl> BufferPoolImplExt for T {}
256
257unsafe impl<T: BufferPoolImpl> IsSubclassable<T> for BufferPool {
258    fn class_init(klass: &mut glib::Class<Self>) {
259        Self::parent_class_init::<T>(klass);
260        let klass = klass.as_mut();
261        klass.acquire_buffer = Some(buffer_pool_acquire_buffer::<T>);
262        klass.alloc_buffer = Some(buffer_pool_alloc_buffer::<T>);
263        klass.release_buffer = Some(buffer_pool_release_buffer::<T>);
264        klass.reset_buffer = Some(buffer_pool_reset_buffer::<T>);
265        klass.start = Some(buffer_pool_start::<T>);
266        klass.stop = Some(buffer_pool_stop::<T>);
267        klass.get_options = Some(buffer_pool_get_options::<T>);
268        klass.set_config = Some(buffer_pool_set_config::<T>);
269        klass.flush_start = Some(buffer_pool_flush_start::<T>);
270        klass.flush_stop = Some(buffer_pool_flush_stop::<T>);
271        klass.free_buffer = Some(buffer_pool_free_buffer::<T>);
272    }
273
274    fn instance_init(instance: &mut InitializingObject<T>) {
275        Self::parent_instance_init(instance);
276
277        // Store the pool options in the instance data
278        // for later retrieval in buffer_pool_get_options
279        let options = T::options();
280        instance.set_instance_data(T::type_(), glib::StrV::from(options));
281    }
282}
283
284unsafe extern "C" fn buffer_pool_acquire_buffer<T: BufferPoolImpl>(
285    ptr: *mut ffi::GstBufferPool,
286    buffer: *mut *mut ffi::GstBuffer,
287    params: *mut ffi::GstBufferPoolAcquireParams,
288) -> ffi::GstFlowReturn {
289    let instance = &*(ptr as *mut T::Instance);
290    let imp = instance.imp();
291    let params: Option<BufferPoolAcquireParams> = from_glib_none(params);
292
293    match imp.acquire_buffer(params.as_ref()) {
294        Ok(b) => {
295            *buffer = b.into_glib_ptr();
296            ffi::GST_FLOW_OK
297        }
298        Err(err) => err.into_glib(),
299    }
300}
301
302unsafe extern "C" fn buffer_pool_alloc_buffer<T: BufferPoolImpl>(
303    ptr: *mut ffi::GstBufferPool,
304    buffer: *mut *mut ffi::GstBuffer,
305    params: *mut ffi::GstBufferPoolAcquireParams,
306) -> ffi::GstFlowReturn {
307    let instance = &*(ptr as *mut T::Instance);
308    let imp = instance.imp();
309    let params: Option<BufferPoolAcquireParams> = from_glib_none(params);
310
311    match imp.alloc_buffer(params.as_ref()) {
312        Ok(b) => {
313            *buffer = b.into_glib_ptr();
314            ffi::GST_FLOW_OK
315        }
316        Err(err) => err.into_glib(),
317    }
318}
319
320unsafe extern "C" fn buffer_pool_flush_start<T: BufferPoolImpl>(ptr: *mut ffi::GstBufferPool) {
321    // the GstBufferPool implementation calls this
322    // in finalize where the ref_count will already
323    // be zero and we are actually destroyed
324    // see: https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1645
325    if (*(ptr as *const glib::gobject_ffi::GObject)).ref_count == 0 {
326        // flush_start is a no-op in GstBufferPool
327        return;
328    }
329
330    let instance = &*(ptr as *mut T::Instance);
331    let imp = instance.imp();
332    imp.flush_start();
333}
334
335unsafe extern "C" fn buffer_pool_flush_stop<T: BufferPoolImpl>(ptr: *mut ffi::GstBufferPool) {
336    // the GstBufferPool implementation calls this
337    // in finalize where the ref_count will already
338    // be zero and we are actually destroyed
339    // see: https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1645
340    if (*(ptr as *const glib::gobject_ffi::GObject)).ref_count == 0 {
341        // flush_stop is a no-op in GstBufferPool
342        return;
343    }
344
345    let instance = &*(ptr as *mut T::Instance);
346    let imp = instance.imp();
347    imp.flush_stop();
348}
349
350unsafe extern "C" fn buffer_pool_free_buffer<T: BufferPoolImpl>(
351    ptr: *mut ffi::GstBufferPool,
352    buffer: *mut ffi::GstBuffer,
353) {
354    // the GstBufferPool implementation calls this
355    // in finalize where the ref_count will already
356    // be zero and we are actually destroyed
357    // see: https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1645
358    if (*(ptr as *const glib::gobject_ffi::GObject)).ref_count == 0 {
359        // As a workaround we call free_buffer directly on the
360        // GstBufferPool to prevent leaking the buffer
361        // This will NOT call free_buffer on a subclass.
362        let pool_class =
363            glib::Class::<crate::BufferPool>::from_type(crate::BufferPool::static_type()).unwrap();
364        let pool_class = pool_class.as_ref();
365        if let Some(f) = pool_class.free_buffer {
366            f(ptr, buffer)
367        }
368        return;
369    }
370
371    let instance = &*(ptr as *mut T::Instance);
372    let imp = instance.imp();
373    imp.free_buffer(from_glib_full(buffer));
374}
375
376unsafe extern "C" fn buffer_pool_release_buffer<T: BufferPoolImpl>(
377    ptr: *mut ffi::GstBufferPool,
378    buffer: *mut ffi::GstBuffer,
379) {
380    let instance = &*(ptr as *mut T::Instance);
381    let imp = instance.imp();
382    imp.release_buffer(from_glib_full(buffer));
383}
384
385unsafe extern "C" fn buffer_pool_reset_buffer<T: BufferPoolImpl>(
386    ptr: *mut ffi::GstBufferPool,
387    buffer: *mut ffi::GstBuffer,
388) {
389    let instance = &*(ptr as *mut T::Instance);
390    let imp = instance.imp();
391    imp.reset_buffer(crate::BufferRef::from_mut_ptr(buffer));
392}
393
394unsafe extern "C" fn buffer_pool_start<T: BufferPoolImpl>(
395    ptr: *mut ffi::GstBufferPool,
396) -> glib::ffi::gboolean {
397    let instance = &*(ptr as *mut T::Instance);
398    let imp = instance.imp();
399    imp.start().into_glib()
400}
401
402unsafe extern "C" fn buffer_pool_stop<T: BufferPoolImpl>(
403    ptr: *mut ffi::GstBufferPool,
404) -> glib::ffi::gboolean {
405    // the GstBufferPool implementation calls this
406    // in finalize where the ref_count will already
407    // be zero and we are actually destroyed
408    // see: https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1645
409    if (*(ptr as *const glib::gobject_ffi::GObject)).ref_count == 0 {
410        // As a workaround we call stop directly on the GstBufferPool
411        // This is needed because the default implementation clears
412        // the pool in stop.
413        let pool_class =
414            glib::Class::<crate::BufferPool>::from_type(crate::BufferPool::static_type()).unwrap();
415        let pool_class = pool_class.as_ref();
416        let result = if let Some(f) = pool_class.stop {
417            f(ptr)
418        } else {
419            true.into_glib()
420        };
421
422        return result;
423    }
424
425    let instance = &*(ptr as *mut T::Instance);
426    let imp = instance.imp();
427    imp.stop().into_glib()
428}
429
430unsafe extern "C" fn buffer_pool_get_options<T: BufferPoolImpl>(
431    ptr: *mut ffi::GstBufferPool,
432) -> *mut *const c_char {
433    let instance = &*(ptr as *mut T::Instance);
434    let imp = instance.imp();
435    T::instance_data::<glib::StrV>(imp, T::type_())
436        .map(|p| p.as_ptr() as *mut *const _)
437        .unwrap_or(ptr::null_mut())
438}
439
440unsafe extern "C" fn buffer_pool_set_config<T: BufferPoolImpl>(
441    ptr: *mut ffi::GstBufferPool,
442    config: *mut ffi::GstStructure,
443) -> glib::ffi::gboolean {
444    let instance = &*(ptr as *mut T::Instance);
445    let imp = instance.imp();
446    imp.set_config(BufferPoolConfigRef::from_glib_borrow_mut(config))
447        .into_glib()
448}
449
450#[cfg(test)]
451mod tests {
452    use std::sync::{
453        atomic::{AtomicBool, Ordering},
454        Arc,
455    };
456
457    use super::*;
458    use crate::prelude::*;
459
460    pub mod imp {
461        use super::*;
462
463        #[derive(Default)]
464        pub struct TestBufferPool;
465
466        impl ObjectImpl for TestBufferPool {}
467        impl GstObjectImpl for TestBufferPool {}
468        impl BufferPoolImpl for TestBufferPool {
469            fn options() -> &'static [&'static str] {
470                &["TEST_OPTION"]
471            }
472
473            fn set_config(&self, config: &mut BufferPoolConfigRef) -> bool {
474                let (caps, size, min_buffers, max_buffers) = config.params().unwrap();
475                config.set_params(caps.as_ref(), size * 2, min_buffers, max_buffers);
476                self.parent_set_config(config)
477            }
478        }
479
480        #[glib::object_subclass]
481        impl ObjectSubclass for TestBufferPool {
482            const NAME: &'static str = "TestBufferPool";
483            type Type = super::TestBufferPool;
484            type ParentType = BufferPool;
485        }
486    }
487
488    glib::wrapper! {
489        pub struct TestBufferPool(ObjectSubclass<imp::TestBufferPool>) @extends BufferPool, crate::Object;
490    }
491
492    impl Default for TestBufferPool {
493        fn default() -> Self {
494            glib::Object::new()
495        }
496    }
497
498    #[test]
499    fn test_pool_options() {
500        crate::init().unwrap();
501        let pool = TestBufferPool::default();
502        assert_eq!(pool.options(), vec!["TEST_OPTION"]);
503    }
504
505    #[test]
506    fn test_pool_acquire() {
507        crate::init().unwrap();
508        let pool = TestBufferPool::default();
509        let mut config = pool.config();
510        config.set_params(None, 1024, 1, 1);
511        pool.set_config(config).expect("failed to set pool config");
512        pool.set_active(true).expect("failed to activate pool");
513        let buffer = pool
514            .acquire_buffer(None)
515            .expect("failed to acquire buffer from pool");
516        assert_eq!(buffer.size(), 2048);
517    }
518
519    #[test]
520    fn test_pool_free_on_finalize() {
521        crate::init().unwrap();
522        let pool = TestBufferPool::default();
523        let mut config = pool.config();
524        config.set_params(None, 1024, 1, 1);
525        pool.set_config(config).expect("failed to set pool config");
526        pool.set_active(true).expect("failed to activate pool");
527        let mut buffer = pool
528            .acquire_buffer(None)
529            .expect("failed to acquire buffer from pool");
530        let finalized = Arc::new(AtomicBool::new(false));
531        unsafe {
532            ffi::gst_mini_object_weak_ref(
533                buffer.make_mut().upcast_mut().as_mut_ptr(),
534                Some(buffer_finalized),
535                Arc::into_raw(finalized.clone()) as *mut _,
536            )
537        };
538        // return the buffer to the pool
539        std::mem::drop(buffer);
540        // drop should finalize the buffer pool which frees all allocated buffers
541        std::mem::drop(pool);
542        assert!(finalized.load(Ordering::SeqCst));
543    }
544
545    #[test]
546    fn test_pool_free_on_deactivate() {
547        crate::init().unwrap();
548        let pool = TestBufferPool::default();
549        let mut config = pool.config();
550        config.set_params(None, 1024, 1, 1);
551        pool.set_config(config).expect("failed to set pool config");
552        pool.set_active(true).expect("failed to activate pool");
553        let mut buffer = pool
554            .acquire_buffer(None)
555            .expect("failed to acquire buffer from pool");
556        let finalized = Arc::new(AtomicBool::new(false));
557        unsafe {
558            ffi::gst_mini_object_weak_ref(
559                buffer.make_mut().upcast_mut().as_mut_ptr(),
560                Some(buffer_finalized),
561                Arc::into_raw(finalized.clone()) as *mut _,
562            )
563        };
564        // return the buffer to the pool
565        std::mem::drop(buffer);
566        // de-activating a poll should free all buffers
567        pool.set_active(false).expect("failed to de-activate pool");
568        assert!(finalized.load(Ordering::SeqCst));
569    }
570
571    unsafe extern "C" fn buffer_finalized(
572        data: *mut libc::c_void,
573        _mini_object: *mut ffi::GstMiniObject,
574    ) {
575        let finalized = Arc::from_raw(data as *const AtomicBool);
576        finalized.store(true, Ordering::SeqCst);
577    }
578}