1use std::ptr;
4
5use glib::{ffi::gpointer, prelude::*, translate::*};
6
7use crate::{TaskPool, ffi};
8
9unsafe extern "C" fn task_pool_trampoline<P: FnOnce() + Send + 'static>(data: gpointer) {
10 unsafe {
11 let func = Box::from_raw(data as *mut P);
12 func()
13 }
14}
15
16pub trait TaskPoolExtManual: IsA<TaskPool> + 'static {
17 #[doc(alias = "gst_task_pool_push")]
18 fn push<P: FnOnce() + Send + 'static>(
19 &self,
20 func: P,
21 ) -> Result<Option<TaskPoolTaskHandle>, glib::Error> {
22 unsafe {
23 let mut error = ptr::null_mut();
24 let func: Box<P> = Box::new(func);
25 let func = Box::into_raw(func);
26
27 let handle = ffi::gst_task_pool_push(
28 self.as_ref().to_glib_none().0,
29 Some(task_pool_trampoline::<P>),
30 func as gpointer,
31 &mut error,
32 );
33
34 if !error.is_null() {
35 debug_assert!(handle.is_null());
36
37 drop(Box::from_raw(func));
40
41 return Err(from_glib_full(error));
42 }
43
44 let handle = ptr::NonNull::new(handle).map(|handle| TaskPoolTaskHandle {
45 handle,
46 task_pool: Some(self.as_ref().clone()),
47 });
48
49 Ok(handle)
50 }
51 }
52}
53
// Blanket implementation: every type that is a `TaskPool` (per `IsA`) gets the
// default methods of `TaskPoolExtManual`.
impl<O: IsA<TaskPool>> TaskPoolExtManual for O {}
55
impl TaskPool {
    /// Calls `gst_task_pool_join` on this pool with the raw task handle `id`.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably `id` must be a handle returned by
    /// `gst_task_pool_push` on this same pool that has not yet been joined or
    /// disposed — confirm against the GStreamer documentation.
    unsafe fn join(&self, id: ptr::NonNull<libc::c_void>) {
        unsafe { ffi::gst_task_pool_join(self.to_glib_none().0, id.as_ptr()) }
    }

    /// Calls `gst_task_pool_dispose_handle` on this pool with the raw task
    /// handle `id`. Only compiled when the `v1_20` feature is enabled.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably `id` must be a handle returned by
    /// `gst_task_pool_push` on this same pool that has not yet been joined or
    /// disposed — confirm against the GStreamer documentation.
    #[cfg(feature = "v1_20")]
    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
    unsafe fn dispose_handle(&self, id: ptr::NonNull<libc::c_void>) {
        unsafe { ffi::gst_task_pool_dispose_handle(self.to_glib_none().0, id.as_ptr()) }
    }
}
67
/// A handle to a task that can be joined.
pub trait TaskHandle {
    /// Joins the task, consuming the handle.
    ///
    /// NOTE(review): presumably this waits for the task to complete — the
    /// blocking behaviour is defined by each implementation.
    fn join(self);
}
75
// The unit type is a task handle for which joining does nothing.
impl TaskHandle for () {
    fn join(self) {}
}
79
// `Infallible` has no values, so this `join` can never actually be called; the
// impl only lets the type be named where a `TaskHandle` is required.
impl TaskHandle for std::convert::Infallible {
    fn join(self) {}
}
83
/// Handle for a task pushed onto a [`TaskPool`], as returned by
/// `TaskPoolExtManual::push`.
///
/// The handle stores a clone of the pool it was created from.
///
/// Joining the handle passes it to `gst_task_pool_join`. If it is dropped
/// without being joined, it is passed to `gst_task_pool_dispose_handle` when
/// the `v1_20` feature is enabled; otherwise only a "Leaked task handle"
/// warning is logged, which is presumably why the type is `#[must_use]` in
/// that configuration.
#[cfg_attr(not(any(feature = "v1_20", docsrs)), must_use)]
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TaskPoolTaskHandle {
    // Raw, non-null task handle as returned by `gst_task_pool_push`.
    handle: ptr::NonNull<libc::c_void>,
    // Pool that issued `handle`. Set to `Some` on creation and taken (left as
    // `None`) by `join` and by `Drop`.
    task_pool: Option<TaskPool>,
}
97
98impl TaskHandle for TaskPoolTaskHandle {
99 #[doc(alias = "gst_task_pool_join")]
100 fn join(mut self) {
101 let task_pool = self.task_pool.take().unwrap();
102 unsafe { task_pool.join(self.handle) }
103 }
104}
105
106impl Drop for TaskPoolTaskHandle {
107 #[doc(alias = "gst_task_pool_dispose_handle")]
108 #[inline]
109 fn drop(&mut self) {
110 if let Some(task_pool) = self.task_pool.take() {
111 cfg_if::cfg_if! {
112 if #[cfg(feature = "v1_20")] {
113 unsafe { task_pool.dispose_handle(self.handle) }
114 } else {
115 crate::warning!(crate::CAT_RUST, obj = &task_pool, "Leaked task handle");
116 }
117 }
118 }
119 }
120}
121
#[cfg(test)]
mod tests {
    use std::sync::mpsc::{RecvError, channel};

    use super::*;
    use crate::prelude::*;

    /// Pushes one closure onto a fresh pool and checks that it runs once and
    /// that its captured sender is gone afterwards.
    #[test]
    fn test_simple() {
        crate::init().unwrap();

        let pool = TaskPool::new();
        pool.prepare().unwrap();

        let (tx, rx) = channel();
        let pushed = pool.push(move || tx.send(()).unwrap()).unwrap();

        // The task sends exactly one message.
        assert_eq!(rx.recv(), Ok(()));

        // A pool may return no handle at all; join only when there is one.
        if let Some(task) = pushed {
            task.join();
        }

        // The only sender was moved into the closure, so once the closure has
        // run and been dropped the channel reports disconnection.
        assert_eq!(rx.recv(), Err(RecvError));

        pool.cleanup();
    }
}