gstreamer/
task_pool.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::ptr;
4
5use glib::{ffi::gpointer, prelude::*, translate::*};
6
7use crate::{ffi, TaskPool};
8
9unsafe extern "C" fn task_pool_trampoline<P: FnOnce() + Send + 'static>(data: gpointer) {
10    let func = Box::from_raw(data as *mut P);
11    func()
12}
13mod sealed {
14    pub trait Sealed {}
15    impl<T: super::IsA<super::TaskPool>> Sealed for T {}
16}
17
18pub trait TaskPoolExtManual: sealed::Sealed + IsA<TaskPool> + 'static {
19    #[doc(alias = "gst_task_pool_push")]
20    fn push<P: FnOnce() + Send + 'static>(
21        &self,
22        func: P,
23    ) -> Result<Option<TaskPoolTaskHandle>, glib::Error> {
24        unsafe {
25            let mut error = ptr::null_mut();
26            let func: Box<P> = Box::new(func);
27            let func = Box::into_raw(func);
28
29            let handle = ffi::gst_task_pool_push(
30                self.as_ref().to_glib_none().0,
31                Some(task_pool_trampoline::<P>),
32                func as gpointer,
33                &mut error,
34            );
35
36            if !error.is_null() {
37                debug_assert!(handle.is_null());
38
39                // Assume that task_pool_trampoline was
40                // not called and will not be called
41                drop(Box::from_raw(func));
42
43                return Err(from_glib_full(error));
44            }
45
46            let handle = ptr::NonNull::new(handle).map(|handle| TaskPoolTaskHandle {
47                handle,
48                task_pool: Some(self.as_ref().clone()),
49            });
50
51            Ok(handle)
52        }
53    }
54}
55
56impl<O: IsA<TaskPool>> TaskPoolExtManual for O {}
57
58impl TaskPool {
59    unsafe fn join(&self, id: ptr::NonNull<libc::c_void>) {
60        ffi::gst_task_pool_join(self.to_glib_none().0, id.as_ptr())
61    }
62
63    #[cfg(feature = "v1_20")]
64    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
65    unsafe fn dispose_handle(&self, id: ptr::NonNull<libc::c_void>) {
66        ffi::gst_task_pool_dispose_handle(self.to_glib_none().0, id.as_ptr())
67    }
68}
69
// rustdoc-stripper-ignore-next
/// Handle to a task that has been pushed to a task pool.
///
/// Joining takes the handle by value, so a given handle can be joined at
/// most once.
pub trait TaskHandle {
    // rustdoc-stripper-ignore-next
    /// Consumes the handle and waits until the associated task has completed.
    fn join(self);
}
77
78impl TaskHandle for () {
79    fn join(self) {}
80}
81
82impl TaskHandle for std::convert::Infallible {
83    fn join(self) {}
84}
85
86// rustdoc-stripper-ignore-next
87/// An opaque handle for a task associated with a particular task pool.
88///
89/// Keeps a reference to the pool alive.
90///
91/// If the `v1_20` feature is enabled, requests the task pool to dispose of the handle when it is
92/// dropped. Otherwise, needs to be `join`ed to avoid a leak.
93#[cfg_attr(not(any(feature = "v1_20", docsrs)), must_use)]
94#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
95pub struct TaskPoolTaskHandle {
96    handle: ptr::NonNull<libc::c_void>,
97    task_pool: Option<TaskPool>,
98}
99
100impl TaskHandle for TaskPoolTaskHandle {
101    #[doc(alias = "gst_task_pool_join")]
102    fn join(mut self) {
103        let task_pool = self.task_pool.take().unwrap();
104        unsafe { task_pool.join(self.handle) }
105    }
106}
107
108impl Drop for TaskPoolTaskHandle {
109    #[doc(alias = "gst_task_pool_dispose_handle")]
110    #[inline]
111    fn drop(&mut self) {
112        if let Some(task_pool) = self.task_pool.take() {
113            cfg_if::cfg_if! {
114                if #[cfg(feature = "v1_20")] {
115                    unsafe { task_pool.dispose_handle(self.handle) }
116                } else {
117                    crate::warning!(crate::CAT_RUST, obj = &task_pool, "Leaked task handle");
118                }
119            }
120        }
121    }
122}
123
#[cfg(test)]
mod tests {
    use std::sync::mpsc::{channel, RecvError};

    use super::*;
    use crate::prelude::*;

    // Pushes one closure to a freshly prepared pool and checks that it ran
    // and that its captured sender is eventually released.
    #[test]
    fn test_simple() {
        crate::init().unwrap();

        let pool = TaskPool::new();
        pool.prepare().unwrap();

        let (tx, rx) = channel();
        let task = move || {
            tx.send(()).unwrap();
        };

        let maybe_handle = pool.push(task).unwrap();

        // The task signals exactly once through the channel.
        assert_eq!(rx.recv(), Ok(()));

        if let Some(task_handle) = maybe_handle {
            task_handle.join();
        }

        // A blocking `recv` is used instead of `try_recv`: the default task
        // pool hands out no handles, so there is nothing to join on to know
        // when the sender captured by the closure has been destroyed.
        assert_eq!(rx.recv(), Err(RecvError));

        pool.cleanup();
    }
}