gstreamer/
task_pool.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::ptr;
4
5use glib::{ffi::gpointer, prelude::*, translate::*};
6
7use crate::{ffi, TaskPool};
8
9unsafe extern "C" fn task_pool_trampoline<P: FnOnce() + Send + 'static>(data: gpointer) {
10    let func = Box::from_raw(data as *mut P);
11    func()
12}
13
14pub trait TaskPoolExtManual: IsA<TaskPool> + 'static {
15    #[doc(alias = "gst_task_pool_push")]
16    fn push<P: FnOnce() + Send + 'static>(
17        &self,
18        func: P,
19    ) -> Result<Option<TaskPoolTaskHandle>, glib::Error> {
20        unsafe {
21            let mut error = ptr::null_mut();
22            let func: Box<P> = Box::new(func);
23            let func = Box::into_raw(func);
24
25            let handle = ffi::gst_task_pool_push(
26                self.as_ref().to_glib_none().0,
27                Some(task_pool_trampoline::<P>),
28                func as gpointer,
29                &mut error,
30            );
31
32            if !error.is_null() {
33                debug_assert!(handle.is_null());
34
35                // Assume that task_pool_trampoline was
36                // not called and will not be called
37                drop(Box::from_raw(func));
38
39                return Err(from_glib_full(error));
40            }
41
42            let handle = ptr::NonNull::new(handle).map(|handle| TaskPoolTaskHandle {
43                handle,
44                task_pool: Some(self.as_ref().clone()),
45            });
46
47            Ok(handle)
48        }
49    }
50}
51
52impl<O: IsA<TaskPool>> TaskPoolExtManual for O {}
53
54impl TaskPool {
55    unsafe fn join(&self, id: ptr::NonNull<libc::c_void>) {
56        ffi::gst_task_pool_join(self.to_glib_none().0, id.as_ptr())
57    }
58
59    #[cfg(feature = "v1_20")]
60    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
61    unsafe fn dispose_handle(&self, id: ptr::NonNull<libc::c_void>) {
62        ffi::gst_task_pool_dispose_handle(self.to_glib_none().0, id.as_ptr())
63    }
64}
65
// rustdoc-stripper-ignore-next
/// Common interface of handles that refer to a task pushed to a task pool.
pub trait TaskHandle {
    // rustdoc-stripper-ignore-next
    /// Blocks until the task behind this handle has finished.
    ///
    /// Takes the handle by value, so it cannot be joined a second time.
    fn join(self);
}
73
74impl TaskHandle for () {
75    fn join(self) {}
76}
77
78impl TaskHandle for std::convert::Infallible {
79    fn join(self) {}
80}
81
82// rustdoc-stripper-ignore-next
83/// An opaque handle for a task associated with a particular task pool.
84///
85/// Keeps a reference to the pool alive.
86///
87/// If the `v1_20` feature is enabled, requests the task pool to dispose of the handle when it is
88/// dropped. Otherwise, needs to be `join`ed to avoid a leak.
89#[cfg_attr(not(any(feature = "v1_20", docsrs)), must_use)]
90#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
91pub struct TaskPoolTaskHandle {
92    handle: ptr::NonNull<libc::c_void>,
93    task_pool: Option<TaskPool>,
94}
95
96impl TaskHandle for TaskPoolTaskHandle {
97    #[doc(alias = "gst_task_pool_join")]
98    fn join(mut self) {
99        let task_pool = self.task_pool.take().unwrap();
100        unsafe { task_pool.join(self.handle) }
101    }
102}
103
104impl Drop for TaskPoolTaskHandle {
105    #[doc(alias = "gst_task_pool_dispose_handle")]
106    #[inline]
107    fn drop(&mut self) {
108        if let Some(task_pool) = self.task_pool.take() {
109            cfg_if::cfg_if! {
110                if #[cfg(feature = "v1_20")] {
111                    unsafe { task_pool.dispose_handle(self.handle) }
112                } else {
113                    crate::warning!(crate::CAT_RUST, obj = &task_pool, "Leaked task handle");
114                }
115            }
116        }
117    }
118}
119
#[cfg(test)]
mod tests {
    use std::sync::mpsc::{channel, RecvError};

    use super::*;
    use crate::prelude::*;

    // Pushes one closure to a freshly prepared pool and checks that it runs
    // and is dropped afterwards.
    #[test]
    fn test_simple() {
        crate::init().unwrap();

        let pool = TaskPool::new();
        pool.prepare().unwrap();

        let (tx, rx) = channel();

        // The closure owns the only sender, so the channel closes as soon as
        // the closure has been consumed.
        let maybe_handle = pool.push(move || tx.send(()).unwrap()).unwrap();

        // The task ran and delivered its message.
        assert_eq!(rx.recv(), Ok(()));

        if let Some(task) = maybe_handle {
            task.join();
        }

        // A blocking `recv` is used rather than `try_recv`: the default task
        // pool produces no handles, so there is nothing to join on in order to
        // wait for the sender to be destroyed.
        assert_eq!(rx.recv(), Err(RecvError));

        pool.cleanup();
    }
}