gstreamer/
task_pool.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::ptr;
4
5use glib::{ffi::gpointer, prelude::*, translate::*};
6
7use crate::{TaskPool, ffi};
8
9unsafe extern "C" fn task_pool_trampoline<P: FnOnce() + Send + 'static>(data: gpointer) {
10    unsafe {
11        let func = Box::from_raw(data as *mut P);
12        func()
13    }
14}
15
16pub trait TaskPoolExtManual: IsA<TaskPool> + 'static {
17    #[doc(alias = "gst_task_pool_push")]
18    fn push<P: FnOnce() + Send + 'static>(
19        &self,
20        func: P,
21    ) -> Result<Option<TaskPoolTaskHandle>, glib::Error> {
22        unsafe {
23            let mut error = ptr::null_mut();
24            let func: Box<P> = Box::new(func);
25            let func = Box::into_raw(func);
26
27            let handle = ffi::gst_task_pool_push(
28                self.as_ref().to_glib_none().0,
29                Some(task_pool_trampoline::<P>),
30                func as gpointer,
31                &mut error,
32            );
33
34            if !error.is_null() {
35                debug_assert!(handle.is_null());
36
37                // Assume that task_pool_trampoline was
38                // not called and will not be called
39                drop(Box::from_raw(func));
40
41                return Err(from_glib_full(error));
42            }
43
44            let handle = ptr::NonNull::new(handle).map(|handle| TaskPoolTaskHandle {
45                handle,
46                task_pool: Some(self.as_ref().clone()),
47            });
48
49            Ok(handle)
50        }
51    }
52}
53
54impl<O: IsA<TaskPool>> TaskPoolExtManual for O {}
55
56impl TaskPool {
57    unsafe fn join(&self, id: ptr::NonNull<libc::c_void>) {
58        unsafe { ffi::gst_task_pool_join(self.to_glib_none().0, id.as_ptr()) }
59    }
60
61    #[cfg(feature = "v1_20")]
62    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
63    unsafe fn dispose_handle(&self, id: ptr::NonNull<libc::c_void>) {
64        unsafe { ffi::gst_task_pool_dispose_handle(self.to_glib_none().0, id.as_ptr()) }
65    }
66}
67
68// rustdoc-stripper-ignore-next
69/// A handle for a task which was pushed to a task pool.
70pub trait TaskHandle {
71    // rustdoc-stripper-ignore-next
72    /// Wait for the task to complete.
73    fn join(self);
74}
75
76impl TaskHandle for () {
77    fn join(self) {}
78}
79
80impl TaskHandle for std::convert::Infallible {
81    fn join(self) {}
82}
83
84// rustdoc-stripper-ignore-next
85/// An opaque handle for a task associated with a particular task pool.
86///
87/// Keeps a reference to the pool alive.
88///
89/// If the `v1_20` feature is enabled, requests the task pool to dispose of the handle when it is
90/// dropped. Otherwise, needs to be `join`ed to avoid a leak.
91#[cfg_attr(not(any(feature = "v1_20", docsrs)), must_use)]
92#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
93pub struct TaskPoolTaskHandle {
94    handle: ptr::NonNull<libc::c_void>,
95    task_pool: Option<TaskPool>,
96}
97
98impl TaskHandle for TaskPoolTaskHandle {
99    #[doc(alias = "gst_task_pool_join")]
100    fn join(mut self) {
101        let task_pool = self.task_pool.take().unwrap();
102        unsafe { task_pool.join(self.handle) }
103    }
104}
105
106impl Drop for TaskPoolTaskHandle {
107    #[doc(alias = "gst_task_pool_dispose_handle")]
108    #[inline]
109    fn drop(&mut self) {
110        if let Some(task_pool) = self.task_pool.take() {
111            cfg_if::cfg_if! {
112                if #[cfg(feature = "v1_20")] {
113                    unsafe { task_pool.dispose_handle(self.handle) }
114                } else {
115                    crate::warning!(crate::CAT_RUST, obj = &task_pool, "Leaked task handle");
116                }
117            }
118        }
119    }
120}
121
122#[cfg(test)]
123mod tests {
124    use std::sync::mpsc::{RecvError, channel};
125
126    use super::*;
127    use crate::prelude::*;
128
129    #[test]
130    fn test_simple() {
131        crate::init().unwrap();
132        let pool = TaskPool::new();
133        pool.prepare().unwrap();
134
135        let (sender, receiver) = channel();
136
137        let handle = pool
138            .push(move || {
139                sender.send(()).unwrap();
140            })
141            .unwrap();
142
143        assert_eq!(receiver.recv(), Ok(()));
144
145        if let Some(handle) = handle {
146            handle.join();
147        }
148
149        // Can't test try_recv here as the default task pool produces no
150        // handles and thus no way to wait for channel destruction
151        assert_eq!(receiver.recv(), Err(RecvError));
152
153        pool.cleanup();
154    }
155}