gstreamer/subclass/
task_pool.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{
4    hash::{Hash, Hasher},
5    ptr,
6    sync::{Arc, Mutex},
7};
8
9use glib::{ffi::gpointer, subclass::prelude::*, translate::*};
10
11use super::prelude::*;
12use crate::{ffi, TaskHandle, TaskPool};
13
14pub trait TaskPoolImpl: GstObjectImpl + Send + Sync {
15    // rustdoc-stripper-ignore-next
16    /// Handle to be returned from the `push` function to allow the caller to wait for the task's
17    /// completion.
18    ///
19    /// If unneeded, you can specify `()` or [`Infallible`](std::convert::Infallible) for a handle
20    /// that does nothing on `join` or drop.
21    type Handle: TaskHandle;
22
23    // rustdoc-stripper-ignore-next
24    /// Prepare the task pool to accept tasks.
25    ///
26    /// This defaults to doing nothing.
27    fn prepare(&self) -> Result<(), glib::Error> {
28        Ok(())
29    }
30
31    // rustdoc-stripper-ignore-next
32    /// Clean up, rejecting further tasks and waiting for all accepted tasks to be stopped.
33    ///
34    /// This is mainly used internally to ensure proper cleanup of internal data structures in test
35    /// suites.
36    fn cleanup(&self) {}
37
38    // rustdoc-stripper-ignore-next
39    /// Deliver a task to the pool.
40    ///
41    /// If returning `Ok`, you need to call the `func` eventually.
42    ///
43    /// If returning `Err`, the `func` must be dropped without calling it.
44    fn push(&self, func: TaskPoolFunction) -> Result<Option<Self::Handle>, glib::Error>;
45}
46
47unsafe impl<T: TaskPoolImpl> IsSubclassable<T> for TaskPool {
48    fn class_init(klass: &mut glib::Class<Self>) {
49        Self::parent_class_init::<T>(klass);
50        let klass = klass.as_mut();
51        klass.prepare = Some(task_pool_prepare::<T>);
52        klass.cleanup = Some(task_pool_cleanup::<T>);
53        klass.push = Some(task_pool_push::<T>);
54        klass.join = Some(task_pool_join::<T>);
55
56        #[cfg(feature = "v1_20")]
57        {
58            klass.dispose_handle = Some(task_pool_dispose_handle::<T>);
59        }
60    }
61}
62
63unsafe extern "C" fn task_pool_prepare<T: TaskPoolImpl>(
64    ptr: *mut ffi::GstTaskPool,
65    error: *mut *mut glib::ffi::GError,
66) {
67    let instance = &*(ptr as *mut T::Instance);
68    let imp = instance.imp();
69
70    match imp.prepare() {
71        Ok(()) => {}
72        Err(err) => {
73            if !error.is_null() {
74                *error = err.into_glib_ptr();
75            }
76        }
77    }
78}
79
80unsafe extern "C" fn task_pool_cleanup<T: TaskPoolImpl>(ptr: *mut ffi::GstTaskPool) {
81    let instance = &*(ptr as *mut T::Instance);
82    let imp = instance.imp();
83
84    imp.cleanup();
85}
86
87unsafe extern "C" fn task_pool_push<T: TaskPoolImpl>(
88    ptr: *mut ffi::GstTaskPool,
89    func: ffi::GstTaskPoolFunction,
90    user_data: gpointer,
91    error: *mut *mut glib::ffi::GError,
92) -> gpointer {
93    let instance = &*(ptr as *mut T::Instance);
94    let imp = instance.imp();
95
96    let func = TaskPoolFunction::new(func.expect("Tried to push null func"), user_data);
97
98    match imp.push(func.clone()) {
99        Ok(None) => ptr::null_mut(),
100        Ok(Some(handle)) => Box::into_raw(Box::new(handle)) as gpointer,
101        Err(err) => {
102            func.prevent_call();
103            if !error.is_null() {
104                *error = err.into_glib_ptr();
105            }
106            ptr::null_mut()
107        }
108    }
109}
110
111unsafe extern "C" fn task_pool_join<T: TaskPoolImpl>(ptr: *mut ffi::GstTaskPool, id: gpointer) {
112    if id.is_null() {
113        let wrap: Borrowed<TaskPool> = from_glib_borrow(ptr);
114        crate::warning!(
115            crate::CAT_RUST,
116            obj = wrap.as_ref(),
117            "Tried to join null handle"
118        );
119        return;
120    }
121
122    let handle = Box::from_raw(id as *mut T::Handle);
123    handle.join();
124}
125
126#[cfg(feature = "v1_20")]
127#[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
128unsafe extern "C" fn task_pool_dispose_handle<T: TaskPoolImpl>(
129    ptr: *mut ffi::GstTaskPool,
130    id: gpointer,
131) {
132    if id.is_null() {
133        let wrap: Borrowed<TaskPool> = from_glib_borrow(ptr);
134        crate::warning!(
135            crate::CAT_RUST,
136            obj = wrap.as_ref(),
137            "Tried to dispose null handle"
138        );
139        return;
140    }
141
142    let handle = Box::from_raw(id as *mut T::Handle);
143    drop(handle);
144}
145
146// rustdoc-stripper-ignore-next
147/// Function the task pool should execute, provided to [`push`](TaskPoolImpl::push).
148#[derive(Debug)]
149pub struct TaskPoolFunction(Arc<Mutex<Option<TaskPoolFunctionInner>>>);
150
151// `Arc<Mutex<Option<…>>>` is required so that we can enforce that the function
152// has not been called and will never be called after `push` returns `Err`.
153
154#[derive(Debug)]
155struct TaskPoolFunctionInner {
156    func: unsafe extern "C" fn(gpointer),
157    user_data: gpointer,
158    warn_on_drop: bool,
159}
160
161unsafe impl Send for TaskPoolFunctionInner {}
162
163impl TaskPoolFunction {
164    fn new(func: unsafe extern "C" fn(gpointer), user_data: gpointer) -> Self {
165        let inner = TaskPoolFunctionInner {
166            func,
167            user_data,
168            warn_on_drop: true,
169        };
170        Self(Arc::new(Mutex::new(Some(inner))))
171    }
172
173    #[inline]
174    fn clone(&self) -> Self {
175        Self(self.0.clone())
176    }
177
178    // rustdoc-stripper-ignore-next
179    /// Consume and execute the function.
180    pub fn call(self) {
181        let mut inner = self
182            .0
183            .lock()
184            .unwrap()
185            .take()
186            .expect("TaskPoolFunction has already been dropped");
187        inner.warn_on_drop = false;
188        unsafe { (inner.func)(inner.user_data) }
189    }
190
191    fn prevent_call(self) {
192        let mut inner = self
193            .0
194            .lock()
195            .unwrap()
196            .take()
197            .expect("TaskPoolFunction has already been called");
198        inner.warn_on_drop = false;
199        drop(inner);
200    }
201
202    #[inline]
203    fn as_ptr(&self) -> *const Mutex<Option<TaskPoolFunctionInner>> {
204        Arc::as_ptr(&self.0)
205    }
206}
207
208impl Drop for TaskPoolFunctionInner {
209    fn drop(&mut self) {
210        if self.warn_on_drop {
211            crate::warning!(crate::CAT_RUST, "Leaked task function");
212        }
213    }
214}
215
216impl PartialEq for TaskPoolFunction {
217    fn eq(&self, other: &Self) -> bool {
218        self.as_ptr() == other.as_ptr()
219    }
220}
221
222impl Eq for TaskPoolFunction {}
223
224impl PartialOrd for TaskPoolFunction {
225    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
226        Some(self.cmp(other))
227    }
228}
229
230impl Ord for TaskPoolFunction {
231    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
232        self.as_ptr().cmp(&other.as_ptr())
233    }
234}
235
236impl Hash for TaskPoolFunction {
237    fn hash<H: Hasher>(&self, state: &mut H) {
238        self.as_ptr().hash(state)
239    }
240}
241
242#[cfg(test)]
243mod tests {
244    use std::{
245        sync::{
246            atomic,
247            mpsc::{channel, TryRecvError},
248        },
249        thread,
250    };
251
252    use super::*;
253    use crate::prelude::*;
254
255    pub mod imp {
256        use super::*;
257
258        #[derive(Default)]
259        pub struct TestPool {
260            pub(super) prepared: atomic::AtomicBool,
261            pub(super) cleaned_up: atomic::AtomicBool,
262        }
263
264        #[glib::object_subclass]
265        impl ObjectSubclass for TestPool {
266            const NAME: &'static str = "TestPool";
267            type Type = super::TestPool;
268            type ParentType = TaskPool;
269        }
270
271        impl ObjectImpl for TestPool {}
272
273        impl GstObjectImpl for TestPool {}
274
275        impl TaskPoolImpl for TestPool {
276            type Handle = TestHandle;
277
278            fn prepare(&self) -> Result<(), glib::Error> {
279                self.prepared.store(true, atomic::Ordering::SeqCst);
280                Ok(())
281            }
282
283            fn cleanup(&self) {
284                self.cleaned_up.store(true, atomic::Ordering::SeqCst);
285            }
286
287            fn push(&self, func: TaskPoolFunction) -> Result<Option<Self::Handle>, glib::Error> {
288                let handle = thread::spawn(move || func.call());
289                Ok(Some(TestHandle(handle)))
290            }
291        }
292
293        pub struct TestHandle(thread::JoinHandle<()>);
294
295        impl TaskHandle for TestHandle {
296            fn join(self) {
297                self.0.join().unwrap();
298            }
299        }
300    }
301
302    glib::wrapper! {
303        pub struct TestPool(ObjectSubclass<imp::TestPool>) @extends TaskPool, crate::Object;
304    }
305
306    unsafe impl Send for TestPool {}
307    unsafe impl Sync for TestPool {}
308
309    impl TestPool {
310        pub fn new() -> Self {
311            Self::default()
312        }
313    }
314
315    impl Default for TestPool {
316        fn default() -> Self {
317            glib::Object::new()
318        }
319    }
320
321    #[test]
322    fn test_simple_subclass() {
323        crate::init().unwrap();
324
325        let pool = TestPool::new();
326        pool.prepare().unwrap();
327
328        let (sender, receiver) = channel();
329
330        let handle = pool
331            .push(move || {
332                sender.send(()).unwrap();
333            })
334            .unwrap();
335        let handle = handle.unwrap();
336
337        assert_eq!(receiver.recv(), Ok(()));
338
339        handle.join();
340        assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
341
342        pool.cleanup();
343
344        let imp = pool.imp();
345        assert!(imp.prepared.load(atomic::Ordering::SeqCst));
346        assert!(imp.cleaned_up.load(atomic::Ordering::SeqCst));
347    }
348}