gstreamer/subclass/
buffer_pool.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::ptr;
4
5use glib::{
6    prelude::*,
7    subclass::{InitializingObject, prelude::*},
8    translate::*,
9};
10use libc::c_char;
11
12use super::prelude::*;
13use crate::{BufferPool, BufferPoolAcquireParams, BufferPoolConfigRef, ffi};
14
15pub trait BufferPoolImpl: GstObjectImpl + ObjectSubclass<Type: IsA<BufferPool>> {
16    fn acquire_buffer(
17        &self,
18        params: Option<&BufferPoolAcquireParams>,
19    ) -> Result<crate::Buffer, crate::FlowError> {
20        self.parent_acquire_buffer(params)
21    }
22
23    fn alloc_buffer(
24        &self,
25        params: Option<&BufferPoolAcquireParams>,
26    ) -> Result<crate::Buffer, crate::FlowError> {
27        self.parent_alloc_buffer(params)
28    }
29
30    fn flush_start(&self) {
31        self.parent_flush_start()
32    }
33
34    fn flush_stop(&self) {
35        self.parent_flush_stop()
36    }
37
38    fn free_buffer(&self, buffer: crate::Buffer) {
39        self.parent_free_buffer(buffer)
40    }
41
42    fn release_buffer(&self, buffer: crate::Buffer) {
43        self.parent_release_buffer(buffer)
44    }
45
46    fn reset_buffer(&self, buffer: &mut crate::BufferRef) {
47        self.parent_reset_buffer(buffer)
48    }
49
50    fn start(&self) -> bool {
51        self.parent_start()
52    }
53
54    fn stop(&self) -> bool {
55        self.parent_stop()
56    }
57
58    fn options() -> &'static [&'static str] {
59        &[]
60    }
61
62    fn set_config(&self, config: &mut BufferPoolConfigRef) -> bool {
63        self.parent_set_config(config)
64    }
65}
66
67pub trait BufferPoolImplExt: BufferPoolImpl {
68    fn parent_acquire_buffer(
69        &self,
70        params: Option<&BufferPoolAcquireParams>,
71    ) -> Result<crate::Buffer, crate::FlowError> {
72        unsafe {
73            let data = Self::type_data();
74            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
75            if let Some(f) = (*parent_class).acquire_buffer {
76                let params_ptr = mut_override(params.to_glib_none().0);
77                let mut buffer = std::ptr::null_mut();
78
79                let result = f(
80                    self.obj()
81                        .unsafe_cast_ref::<crate::BufferPool>()
82                        .to_glib_none()
83                        .0,
84                    &mut buffer,
85                    params_ptr,
86                );
87
88                crate::FlowSuccess::try_from_glib(result).map(|_| from_glib_full(buffer))
89            } else {
90                Err(crate::FlowError::NotSupported)
91            }
92        }
93    }
94
95    fn parent_alloc_buffer(
96        &self,
97        params: Option<&BufferPoolAcquireParams>,
98    ) -> Result<crate::Buffer, crate::FlowError> {
99        unsafe {
100            let data = Self::type_data();
101            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
102            if let Some(f) = (*parent_class).alloc_buffer {
103                let params_ptr = mut_override(params.to_glib_none().0);
104                let mut buffer = std::ptr::null_mut();
105
106                let result = f(
107                    self.obj()
108                        .unsafe_cast_ref::<crate::BufferPool>()
109                        .to_glib_none()
110                        .0,
111                    &mut buffer,
112                    params_ptr,
113                );
114
115                crate::FlowSuccess::try_from_glib(result).map(|_| from_glib_full(buffer))
116            } else {
117                Err(crate::FlowError::NotSupported)
118            }
119        }
120    }
121
122    fn parent_free_buffer(&self, buffer: crate::Buffer) {
123        unsafe {
124            let data = Self::type_data();
125            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
126            if let Some(f) = (*parent_class).free_buffer {
127                f(
128                    self.obj()
129                        .unsafe_cast_ref::<crate::BufferPool>()
130                        .to_glib_none()
131                        .0,
132                    buffer.into_glib_ptr(),
133                )
134            }
135        }
136    }
137
138    fn parent_release_buffer(&self, buffer: crate::Buffer) {
139        unsafe {
140            let data = Self::type_data();
141            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
142            if let Some(f) = (*parent_class).release_buffer {
143                f(
144                    self.obj()
145                        .unsafe_cast_ref::<crate::BufferPool>()
146                        .to_glib_none()
147                        .0,
148                    buffer.into_glib_ptr(),
149                )
150            }
151        }
152    }
153
154    fn parent_reset_buffer(&self, buffer: &mut crate::BufferRef) {
155        unsafe {
156            let data = Self::type_data();
157            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
158            if let Some(f) = (*parent_class).reset_buffer {
159                f(
160                    self.obj()
161                        .unsafe_cast_ref::<crate::BufferPool>()
162                        .to_glib_none()
163                        .0,
164                    buffer.as_mut_ptr(),
165                )
166            }
167        }
168    }
169
170    fn parent_start(&self) -> bool {
171        unsafe {
172            let data = Self::type_data();
173            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
174            if let Some(f) = (*parent_class).start {
175                let result = f(self
176                    .obj()
177                    .unsafe_cast_ref::<crate::BufferPool>()
178                    .to_glib_none()
179                    .0);
180
181                from_glib(result)
182            } else {
183                true
184            }
185        }
186    }
187
188    fn parent_stop(&self) -> bool {
189        unsafe {
190            let data = Self::type_data();
191            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
192            if let Some(f) = (*parent_class).stop {
193                let result = f(self
194                    .obj()
195                    .unsafe_cast_ref::<crate::BufferPool>()
196                    .to_glib_none()
197                    .0);
198
199                from_glib(result)
200            } else {
201                true
202            }
203        }
204    }
205
206    fn parent_set_config(&self, config: &mut BufferPoolConfigRef) -> bool {
207        unsafe {
208            let data = Self::type_data();
209            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
210            if let Some(f) = (*parent_class).set_config {
211                let result = f(
212                    self.obj()
213                        .unsafe_cast_ref::<crate::BufferPool>()
214                        .to_glib_none()
215                        .0,
216                    (*config).as_mut_ptr(),
217                );
218
219                from_glib(result)
220            } else {
221                false
222            }
223        }
224    }
225
226    fn parent_flush_start(&self) {
227        unsafe {
228            let data = Self::type_data();
229            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
230            if let Some(f) = (*parent_class).flush_start {
231                f(self
232                    .obj()
233                    .unsafe_cast_ref::<crate::BufferPool>()
234                    .to_glib_none()
235                    .0)
236            }
237        }
238    }
239
240    fn parent_flush_stop(&self) {
241        unsafe {
242            let data = Self::type_data();
243            let parent_class = data.as_ref().parent_class() as *mut ffi::GstBufferPoolClass;
244            if let Some(f) = (*parent_class).flush_stop {
245                f(self
246                    .obj()
247                    .unsafe_cast_ref::<crate::BufferPool>()
248                    .to_glib_none()
249                    .0)
250            }
251        }
252    }
253}
254
255impl<T: BufferPoolImpl> BufferPoolImplExt for T {}
256
257unsafe impl<T: BufferPoolImpl> IsSubclassable<T> for BufferPool {
258    fn class_init(klass: &mut glib::Class<Self>) {
259        Self::parent_class_init::<T>(klass);
260        let klass = klass.as_mut();
261        klass.acquire_buffer = Some(buffer_pool_acquire_buffer::<T>);
262        klass.alloc_buffer = Some(buffer_pool_alloc_buffer::<T>);
263        klass.release_buffer = Some(buffer_pool_release_buffer::<T>);
264        klass.reset_buffer = Some(buffer_pool_reset_buffer::<T>);
265        klass.start = Some(buffer_pool_start::<T>);
266        klass.stop = Some(buffer_pool_stop::<T>);
267        klass.get_options = Some(buffer_pool_get_options::<T>);
268        klass.set_config = Some(buffer_pool_set_config::<T>);
269        klass.flush_start = Some(buffer_pool_flush_start::<T>);
270        klass.flush_stop = Some(buffer_pool_flush_stop::<T>);
271        klass.free_buffer = Some(buffer_pool_free_buffer::<T>);
272    }
273
274    fn instance_init(instance: &mut InitializingObject<T>) {
275        Self::parent_instance_init(instance);
276
277        // Store the pool options in the instance data
278        // for later retrieval in buffer_pool_get_options
279        let options = T::options();
280        instance.set_instance_data(T::type_(), glib::StrV::from(options));
281    }
282}
283
284unsafe extern "C" fn buffer_pool_acquire_buffer<T: BufferPoolImpl>(
285    ptr: *mut ffi::GstBufferPool,
286    buffer: *mut *mut ffi::GstBuffer,
287    params: *mut ffi::GstBufferPoolAcquireParams,
288) -> ffi::GstFlowReturn {
289    unsafe {
290        let instance = &*(ptr as *mut T::Instance);
291        let imp = instance.imp();
292        let params: Option<BufferPoolAcquireParams> = from_glib_none(params);
293
294        match imp.acquire_buffer(params.as_ref()) {
295            Ok(b) => {
296                *buffer = b.into_glib_ptr();
297                ffi::GST_FLOW_OK
298            }
299            Err(err) => err.into_glib(),
300        }
301    }
302}
303
304unsafe extern "C" fn buffer_pool_alloc_buffer<T: BufferPoolImpl>(
305    ptr: *mut ffi::GstBufferPool,
306    buffer: *mut *mut ffi::GstBuffer,
307    params: *mut ffi::GstBufferPoolAcquireParams,
308) -> ffi::GstFlowReturn {
309    unsafe {
310        let instance = &*(ptr as *mut T::Instance);
311        let imp = instance.imp();
312        let params: Option<BufferPoolAcquireParams> = from_glib_none(params);
313
314        match imp.alloc_buffer(params.as_ref()) {
315            Ok(b) => {
316                *buffer = b.into_glib_ptr();
317                ffi::GST_FLOW_OK
318            }
319            Err(err) => err.into_glib(),
320        }
321    }
322}
323
324unsafe extern "C" fn buffer_pool_flush_start<T: BufferPoolImpl>(ptr: *mut ffi::GstBufferPool) {
325    unsafe {
326        // the GstBufferPool implementation calls this
327        // in finalize where the ref_count will already
328        // be zero and we are actually destroyed
329        // see: https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1645
330        if (*(ptr as *const glib::gobject_ffi::GObject)).ref_count == 0 {
331            // flush_start is a no-op in GstBufferPool
332            return;
333        }
334
335        let instance = &*(ptr as *mut T::Instance);
336        let imp = instance.imp();
337        imp.flush_start();
338    }
339}
340
341unsafe extern "C" fn buffer_pool_flush_stop<T: BufferPoolImpl>(ptr: *mut ffi::GstBufferPool) {
342    unsafe {
343        // the GstBufferPool implementation calls this
344        // in finalize where the ref_count will already
345        // be zero and we are actually destroyed
346        // see: https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1645
347        if (*(ptr as *const glib::gobject_ffi::GObject)).ref_count == 0 {
348            // flush_stop is a no-op in GstBufferPool
349            return;
350        }
351
352        let instance = &*(ptr as *mut T::Instance);
353        let imp = instance.imp();
354        imp.flush_stop();
355    }
356}
357
358unsafe extern "C" fn buffer_pool_free_buffer<T: BufferPoolImpl>(
359    ptr: *mut ffi::GstBufferPool,
360    buffer: *mut ffi::GstBuffer,
361) {
362    unsafe {
363        // the GstBufferPool implementation calls this
364        // in finalize where the ref_count will already
365        // be zero and we are actually destroyed
366        // see: https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1645
367        if (*(ptr as *const glib::gobject_ffi::GObject)).ref_count == 0 {
368            // As a workaround we call free_buffer directly on the
369            // GstBufferPool to prevent leaking the buffer
370            // This will NOT call free_buffer on a subclass.
371            let pool_class =
372                glib::Class::<crate::BufferPool>::from_type(crate::BufferPool::static_type())
373                    .unwrap();
374            let pool_class = pool_class.as_ref();
375            if let Some(f) = pool_class.free_buffer {
376                f(ptr, buffer)
377            }
378            return;
379        }
380
381        let instance = &*(ptr as *mut T::Instance);
382        let imp = instance.imp();
383        imp.free_buffer(from_glib_full(buffer));
384    }
385}
386
387unsafe extern "C" fn buffer_pool_release_buffer<T: BufferPoolImpl>(
388    ptr: *mut ffi::GstBufferPool,
389    buffer: *mut ffi::GstBuffer,
390) {
391    unsafe {
392        let instance = &*(ptr as *mut T::Instance);
393        let imp = instance.imp();
394        imp.release_buffer(from_glib_full(buffer));
395    }
396}
397
398unsafe extern "C" fn buffer_pool_reset_buffer<T: BufferPoolImpl>(
399    ptr: *mut ffi::GstBufferPool,
400    buffer: *mut ffi::GstBuffer,
401) {
402    unsafe {
403        let instance = &*(ptr as *mut T::Instance);
404        let imp = instance.imp();
405        imp.reset_buffer(crate::BufferRef::from_mut_ptr(buffer));
406    }
407}
408
409unsafe extern "C" fn buffer_pool_start<T: BufferPoolImpl>(
410    ptr: *mut ffi::GstBufferPool,
411) -> glib::ffi::gboolean {
412    unsafe {
413        let instance = &*(ptr as *mut T::Instance);
414        let imp = instance.imp();
415        imp.start().into_glib()
416    }
417}
418
419unsafe extern "C" fn buffer_pool_stop<T: BufferPoolImpl>(
420    ptr: *mut ffi::GstBufferPool,
421) -> glib::ffi::gboolean {
422    unsafe {
423        // the GstBufferPool implementation calls this
424        // in finalize where the ref_count will already
425        // be zero and we are actually destroyed
426        // see: https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1645
427        if (*(ptr as *const glib::gobject_ffi::GObject)).ref_count == 0 {
428            // As a workaround we call stop directly on the GstBufferPool
429            // This is needed because the default implementation clears
430            // the pool in stop.
431            let pool_class =
432                glib::Class::<crate::BufferPool>::from_type(crate::BufferPool::static_type())
433                    .unwrap();
434            let pool_class = pool_class.as_ref();
435            let result = if let Some(f) = pool_class.stop {
436                f(ptr)
437            } else {
438                true.into_glib()
439            };
440
441            return result;
442        }
443
444        let instance = &*(ptr as *mut T::Instance);
445        let imp = instance.imp();
446        imp.stop().into_glib()
447    }
448}
449
450unsafe extern "C" fn buffer_pool_get_options<T: BufferPoolImpl>(
451    ptr: *mut ffi::GstBufferPool,
452) -> *mut *const c_char {
453    unsafe {
454        let instance = &*(ptr as *mut T::Instance);
455        let imp = instance.imp();
456        T::instance_data::<glib::StrV>(imp, T::type_())
457            .map(|p| p.as_ptr() as *mut *const _)
458            .unwrap_or(ptr::null_mut())
459    }
460}
461
462unsafe extern "C" fn buffer_pool_set_config<T: BufferPoolImpl>(
463    ptr: *mut ffi::GstBufferPool,
464    config: *mut ffi::GstStructure,
465) -> glib::ffi::gboolean {
466    unsafe {
467        let instance = &*(ptr as *mut T::Instance);
468        let imp = instance.imp();
469        imp.set_config(BufferPoolConfigRef::from_glib_borrow_mut(config))
470            .into_glib()
471    }
472}
473
#[cfg(test)]
mod tests {
    use std::sync::{
        Arc,
        atomic::{AtomicBool, Ordering},
    };

    use super::*;
    use crate::prelude::*;

    pub mod imp {
        use super::*;

        /// Minimal pool subclass used to exercise the subclassing glue.
        #[derive(Default)]
        pub struct TestBufferPool;

        #[glib::object_subclass]
        impl ObjectSubclass for TestBufferPool {
            const NAME: &'static str = "TestBufferPool";
            type Type = super::TestBufferPool;
            type ParentType = BufferPool;
        }

        impl ObjectImpl for TestBufferPool {}
        impl GstObjectImpl for TestBufferPool {}
        impl BufferPoolImpl for TestBufferPool {
            fn options() -> &'static [&'static str] {
                &["TEST_OPTION"]
            }

            fn set_config(&self, config: &mut BufferPoolConfigRef) -> bool {
                // Double the requested size before chaining up so the tests can
                // observe that this override was actually called.
                let (caps, size, min_buffers, max_buffers) = config.params().unwrap();
                config.set_params(caps.as_ref(), size * 2, min_buffers, max_buffers);
                self.parent_set_config(config)
            }
        }
    }

    glib::wrapper! {
        pub struct TestBufferPool(ObjectSubclass<imp::TestBufferPool>) @extends BufferPool, crate::Object;
    }

    impl Default for TestBufferPool {
        fn default() -> Self {
            glib::Object::new()
        }
    }

    /// Creates a pool configured for exactly one buffer with a requested size
    /// of 1024 and activates it.
    fn active_pool() -> TestBufferPool {
        let pool = TestBufferPool::default();
        let mut config = pool.config();
        config.set_params(None, 1024, 1, 1);
        pool.set_config(config).expect("failed to set pool config");
        pool.set_active(true).expect("failed to activate pool");
        pool
    }

    /// Acquires a buffer from `pool`, panicking on failure.
    fn acquire_from(pool: &TestBufferPool) -> crate::Buffer {
        pool.acquire_buffer(None)
            .expect("failed to acquire buffer from pool")
    }

    /// Registers a weak reference on `buffer` and returns a flag that is set
    /// to `true` once the weak-ref notification fires.
    fn track_finalization(buffer: &mut crate::Buffer) -> Arc<AtomicBool> {
        let finalized = Arc::new(AtomicBool::new(false));
        unsafe {
            ffi::gst_mini_object_weak_ref(
                buffer.make_mut().upcast_mut().as_mut_ptr(),
                Some(buffer_finalized),
                // This strong reference is reclaimed in `buffer_finalized`.
                Arc::into_raw(finalized.clone()) as *mut _,
            )
        };
        finalized
    }

    /// Weak-ref notification: reclaims the `Arc` passed as user data and
    /// marks the flag as set.
    unsafe extern "C" fn buffer_finalized(
        data: *mut libc::c_void,
        _mini_object: *mut ffi::GstMiniObject,
    ) {
        unsafe {
            let flag = Arc::from_raw(data as *const AtomicBool);
            flag.store(true, Ordering::SeqCst);
        }
    }

    #[test]
    fn test_pool_options() {
        crate::init().unwrap();
        let pool = TestBufferPool::default();
        assert_eq!(pool.options(), vec!["TEST_OPTION"]);
    }

    #[test]
    fn test_pool_acquire() {
        crate::init().unwrap();
        let pool = active_pool();
        let buffer = acquire_from(&pool);
        // The subclass' set_config doubles the requested size of 1024.
        assert_eq!(buffer.size(), 2048);
    }

    #[test]
    fn test_pool_free_on_finalize() {
        crate::init().unwrap();
        let pool = active_pool();
        let mut buffer = acquire_from(&pool);
        let finalized = track_finalization(&mut buffer);
        // return the buffer to the pool
        drop(buffer);
        // dropping the pool should finalize it, which frees all allocated buffers
        drop(pool);
        assert!(finalized.load(Ordering::SeqCst));
    }

    #[test]
    fn test_pool_free_on_deactivate() {
        crate::init().unwrap();
        let pool = active_pool();
        let mut buffer = acquire_from(&pool);
        let finalized = track_finalization(&mut buffer);
        // return the buffer to the pool
        drop(buffer);
        // de-activating a pool should free all buffers
        pool.set_active(false).expect("failed to de-activate pool");
        assert!(finalized.load(Ordering::SeqCst));
    }
}