1use std::ptr;
4
5use glib::{ffi::gpointer, prelude::*, translate::*};
6
7use crate::{ffi, TaskPool};
8
9unsafe extern "C" fn task_pool_trampoline<P: FnOnce() + Send + 'static>(data: gpointer) {
10 let func = Box::from_raw(data as *mut P);
11 func()
12}
13
14pub trait TaskPoolExtManual: IsA<TaskPool> + 'static {
15 #[doc(alias = "gst_task_pool_push")]
16 fn push<P: FnOnce() + Send + 'static>(
17 &self,
18 func: P,
19 ) -> Result<Option<TaskPoolTaskHandle>, glib::Error> {
20 unsafe {
21 let mut error = ptr::null_mut();
22 let func: Box<P> = Box::new(func);
23 let func = Box::into_raw(func);
24
25 let handle = ffi::gst_task_pool_push(
26 self.as_ref().to_glib_none().0,
27 Some(task_pool_trampoline::<P>),
28 func as gpointer,
29 &mut error,
30 );
31
32 if !error.is_null() {
33 debug_assert!(handle.is_null());
34
35 drop(Box::from_raw(func));
38
39 return Err(from_glib_full(error));
40 }
41
42 let handle = ptr::NonNull::new(handle).map(|handle| TaskPoolTaskHandle {
43 handle,
44 task_pool: Some(self.as_ref().clone()),
45 });
46
47 Ok(handle)
48 }
49 }
50}
51
52impl<O: IsA<TaskPool>> TaskPoolExtManual for O {}
53
54impl TaskPool {
55 unsafe fn join(&self, id: ptr::NonNull<libc::c_void>) {
56 ffi::gst_task_pool_join(self.to_glib_none().0, id.as_ptr())
57 }
58
59 #[cfg(feature = "v1_20")]
60 #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
61 unsafe fn dispose_handle(&self, id: ptr::NonNull<libc::c_void>) {
62 ffi::gst_task_pool_dispose_handle(self.to_glib_none().0, id.as_ptr())
63 }
64}
65
/// A handle to a task that can be joined.
pub trait TaskHandle {
    /// Consumes the handle and joins the task it refers to.
    ///
    /// What joining means is defined by the implementor; for handles that do
    /// not refer to any task this may do nothing at all.
    fn join(self);
}
73
74impl TaskHandle for () {
75 fn join(self) {}
76}
77
78impl TaskHandle for std::convert::Infallible {
79 fn join(self) {}
80}
81
82#[cfg_attr(not(any(feature = "v1_20", docsrs)), must_use)]
90#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
91pub struct TaskPoolTaskHandle {
92 handle: ptr::NonNull<libc::c_void>,
93 task_pool: Option<TaskPool>,
94}
95
96impl TaskHandle for TaskPoolTaskHandle {
97 #[doc(alias = "gst_task_pool_join")]
98 fn join(mut self) {
99 let task_pool = self.task_pool.take().unwrap();
100 unsafe { task_pool.join(self.handle) }
101 }
102}
103
104impl Drop for TaskPoolTaskHandle {
105 #[doc(alias = "gst_task_pool_dispose_handle")]
106 #[inline]
107 fn drop(&mut self) {
108 if let Some(task_pool) = self.task_pool.take() {
109 cfg_if::cfg_if! {
110 if #[cfg(feature = "v1_20")] {
111 unsafe { task_pool.dispose_handle(self.handle) }
112 } else {
113 crate::warning!(crate::CAT_RUST, obj = &task_pool, "Leaked task handle");
114 }
115 }
116 }
117 }
118}
119
#[cfg(test)]
mod tests {
    use std::sync::mpsc::{channel, RecvError};

    use super::*;
    use crate::prelude::*;

    /// Pushes one closure to a fresh pool and checks that it runs once and
    /// then releases the sending half of the channel.
    #[test]
    fn test_simple() {
        crate::init().unwrap();

        let pool = TaskPool::new();
        pool.prepare().unwrap();

        let (tx, rx) = channel();
        let handle = pool.push(move || tx.send(()).unwrap()).unwrap();

        // The task must deliver exactly one message.
        assert_eq!(rx.recv(), Ok(()));

        if let Some(task) = handle {
            task.join();
        }

        // Once the closure has been dropped, the sender is gone and the
        // channel reports disconnection.
        assert_eq!(rx.recv(), Err(RecvError));

        pool.cleanup();
    }
}