gstreamer/subclass/
task_pool.rs1use std::{
4 hash::{Hash, Hasher},
5 ptr,
6 sync::{Arc, Mutex},
7};
8
9use glib::{ffi::gpointer, subclass::prelude::*, translate::*};
10
11use super::prelude::*;
12use crate::{ffi, TaskHandle, TaskPool};
13
14pub trait TaskPoolImpl: GstObjectImpl + Send + Sync {
15 type Handle: TaskHandle;
22
23 fn prepare(&self) -> Result<(), glib::Error> {
28 Ok(())
29 }
30
31 fn cleanup(&self) {}
37
38 fn push(&self, func: TaskPoolFunction) -> Result<Option<Self::Handle>, glib::Error>;
45}
46
47unsafe impl<T: TaskPoolImpl> IsSubclassable<T> for TaskPool {
48 fn class_init(klass: &mut glib::Class<Self>) {
49 Self::parent_class_init::<T>(klass);
50 let klass = klass.as_mut();
51 klass.prepare = Some(task_pool_prepare::<T>);
52 klass.cleanup = Some(task_pool_cleanup::<T>);
53 klass.push = Some(task_pool_push::<T>);
54 klass.join = Some(task_pool_join::<T>);
55
56 #[cfg(feature = "v1_20")]
57 {
58 klass.dispose_handle = Some(task_pool_dispose_handle::<T>);
59 }
60 }
61}
62
63unsafe extern "C" fn task_pool_prepare<T: TaskPoolImpl>(
64 ptr: *mut ffi::GstTaskPool,
65 error: *mut *mut glib::ffi::GError,
66) {
67 let instance = &*(ptr as *mut T::Instance);
68 let imp = instance.imp();
69
70 match imp.prepare() {
71 Ok(()) => {}
72 Err(err) => {
73 if !error.is_null() {
74 *error = err.into_glib_ptr();
75 }
76 }
77 }
78}
79
80unsafe extern "C" fn task_pool_cleanup<T: TaskPoolImpl>(ptr: *mut ffi::GstTaskPool) {
81 let instance = &*(ptr as *mut T::Instance);
82 let imp = instance.imp();
83
84 imp.cleanup();
85}
86
87unsafe extern "C" fn task_pool_push<T: TaskPoolImpl>(
88 ptr: *mut ffi::GstTaskPool,
89 func: ffi::GstTaskPoolFunction,
90 user_data: gpointer,
91 error: *mut *mut glib::ffi::GError,
92) -> gpointer {
93 let instance = &*(ptr as *mut T::Instance);
94 let imp = instance.imp();
95
96 let func = TaskPoolFunction::new(func.expect("Tried to push null func"), user_data);
97
98 match imp.push(func.clone()) {
99 Ok(None) => ptr::null_mut(),
100 Ok(Some(handle)) => Box::into_raw(Box::new(handle)) as gpointer,
101 Err(err) => {
102 func.prevent_call();
103 if !error.is_null() {
104 *error = err.into_glib_ptr();
105 }
106 ptr::null_mut()
107 }
108 }
109}
110
111unsafe extern "C" fn task_pool_join<T: TaskPoolImpl>(ptr: *mut ffi::GstTaskPool, id: gpointer) {
112 if id.is_null() {
113 let wrap: Borrowed<TaskPool> = from_glib_borrow(ptr);
114 crate::warning!(
115 crate::CAT_RUST,
116 obj = wrap.as_ref(),
117 "Tried to join null handle"
118 );
119 return;
120 }
121
122 let handle = Box::from_raw(id as *mut T::Handle);
123 handle.join();
124}
125
126#[cfg(feature = "v1_20")]
127#[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
128unsafe extern "C" fn task_pool_dispose_handle<T: TaskPoolImpl>(
129 ptr: *mut ffi::GstTaskPool,
130 id: gpointer,
131) {
132 if id.is_null() {
133 let wrap: Borrowed<TaskPool> = from_glib_borrow(ptr);
134 crate::warning!(
135 crate::CAT_RUST,
136 obj = wrap.as_ref(),
137 "Tried to dispose null handle"
138 );
139 return;
140 }
141
142 let handle = Box::from_raw(id as *mut T::Handle);
143 drop(handle);
144}
145
146#[derive(Debug)]
149pub struct TaskPoolFunction(Arc<Mutex<Option<TaskPoolFunctionInner>>>);
150
151#[derive(Debug)]
155struct TaskPoolFunctionInner {
156 func: unsafe extern "C" fn(gpointer),
157 user_data: gpointer,
158 warn_on_drop: bool,
159}
160
161unsafe impl Send for TaskPoolFunctionInner {}
162
163impl TaskPoolFunction {
164 fn new(func: unsafe extern "C" fn(gpointer), user_data: gpointer) -> Self {
165 let inner = TaskPoolFunctionInner {
166 func,
167 user_data,
168 warn_on_drop: true,
169 };
170 Self(Arc::new(Mutex::new(Some(inner))))
171 }
172
173 #[inline]
174 fn clone(&self) -> Self {
175 Self(self.0.clone())
176 }
177
178 pub fn call(self) {
181 let mut inner = self
182 .0
183 .lock()
184 .unwrap()
185 .take()
186 .expect("TaskPoolFunction has already been dropped");
187 inner.warn_on_drop = false;
188 unsafe { (inner.func)(inner.user_data) }
189 }
190
191 fn prevent_call(self) {
192 let mut inner = self
193 .0
194 .lock()
195 .unwrap()
196 .take()
197 .expect("TaskPoolFunction has already been called");
198 inner.warn_on_drop = false;
199 drop(inner);
200 }
201
202 #[inline]
203 fn as_ptr(&self) -> *const Mutex<Option<TaskPoolFunctionInner>> {
204 Arc::as_ptr(&self.0)
205 }
206}
207
208impl Drop for TaskPoolFunctionInner {
209 fn drop(&mut self) {
210 if self.warn_on_drop {
211 crate::warning!(crate::CAT_RUST, "Leaked task function");
212 }
213 }
214}
215
216impl PartialEq for TaskPoolFunction {
217 fn eq(&self, other: &Self) -> bool {
218 self.as_ptr() == other.as_ptr()
219 }
220}
221
222impl Eq for TaskPoolFunction {}
223
224impl PartialOrd for TaskPoolFunction {
225 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
226 Some(self.cmp(other))
227 }
228}
229
230impl Ord for TaskPoolFunction {
231 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
232 self.as_ptr().cmp(&other.as_ptr())
233 }
234}
235
236impl Hash for TaskPoolFunction {
237 fn hash<H: Hasher>(&self, state: &mut H) {
238 self.as_ptr().hash(state)
239 }
240}
241
242#[cfg(test)]
243mod tests {
244 use std::{
245 sync::{
246 atomic,
247 mpsc::{channel, TryRecvError},
248 },
249 thread,
250 };
251
252 use super::*;
253 use crate::prelude::*;
254
255 pub mod imp {
256 use super::*;
257
258 #[derive(Default)]
259 pub struct TestPool {
260 pub(super) prepared: atomic::AtomicBool,
261 pub(super) cleaned_up: atomic::AtomicBool,
262 }
263
264 #[glib::object_subclass]
265 impl ObjectSubclass for TestPool {
266 const NAME: &'static str = "TestPool";
267 type Type = super::TestPool;
268 type ParentType = TaskPool;
269 }
270
271 impl ObjectImpl for TestPool {}
272
273 impl GstObjectImpl for TestPool {}
274
275 impl TaskPoolImpl for TestPool {
276 type Handle = TestHandle;
277
278 fn prepare(&self) -> Result<(), glib::Error> {
279 self.prepared.store(true, atomic::Ordering::SeqCst);
280 Ok(())
281 }
282
283 fn cleanup(&self) {
284 self.cleaned_up.store(true, atomic::Ordering::SeqCst);
285 }
286
287 fn push(&self, func: TaskPoolFunction) -> Result<Option<Self::Handle>, glib::Error> {
288 let handle = thread::spawn(move || func.call());
289 Ok(Some(TestHandle(handle)))
290 }
291 }
292
293 pub struct TestHandle(thread::JoinHandle<()>);
294
295 impl TaskHandle for TestHandle {
296 fn join(self) {
297 self.0.join().unwrap();
298 }
299 }
300 }
301
302 glib::wrapper! {
303 pub struct TestPool(ObjectSubclass<imp::TestPool>) @extends TaskPool, crate::Object;
304 }
305
306 unsafe impl Send for TestPool {}
307 unsafe impl Sync for TestPool {}
308
309 impl TestPool {
310 pub fn new() -> Self {
311 Self::default()
312 }
313 }
314
315 impl Default for TestPool {
316 fn default() -> Self {
317 glib::Object::new()
318 }
319 }
320
321 #[test]
322 fn test_simple_subclass() {
323 crate::init().unwrap();
324
325 let pool = TestPool::new();
326 pool.prepare().unwrap();
327
328 let (sender, receiver) = channel();
329
330 let handle = pool
331 .push(move || {
332 sender.send(()).unwrap();
333 })
334 .unwrap();
335 let handle = handle.unwrap();
336
337 assert_eq!(receiver.recv(), Ok(()));
338
339 handle.join();
340 assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
341
342 pool.cleanup();
343
344 let imp = pool.imp();
345 assert!(imp.prepared.load(atomic::Ordering::SeqCst));
346 assert!(imp.cleaned_up.load(atomic::Ordering::SeqCst));
347 }
348}