1use crate::latch::Latch;
2use crate::unwind;
3use crossbeam_deque::{Injector, Steal};
4use std::any::Any;
5use std::cell::UnsafeCell;
6use std::mem;
7use std::sync::Arc;
8
9pub(super) enum JobResult<T> {
10    None,
11    Ok(T),
12    Panic(Box<dyn Any + Send>),
13}
14
15pub(super) trait Job {
21    unsafe fn execute(this: *const ());
25}
26
27pub(super) struct JobRef {
34    pointer: *const (),
35    execute_fn: unsafe fn(*const ()),
36}
37
38unsafe impl Send for JobRef {}
39unsafe impl Sync for JobRef {}
40
41impl JobRef {
42    pub(super) unsafe fn new<T>(data: *const T) -> JobRef
45    where
46        T: Job,
47    {
48        JobRef {
50            pointer: data as *const (),
51            execute_fn: <T as Job>::execute,
52        }
53    }
54
55    #[inline]
58    pub(super) fn id(&self) -> impl Eq {
59        (self.pointer, self.execute_fn)
60    }
61
62    #[inline]
63    pub(super) unsafe fn execute(self) {
64        (self.execute_fn)(self.pointer)
65    }
66}
67
68pub(super) struct StackJob<L, F, R>
73where
74    L: Latch + Sync,
75    F: FnOnce(bool) -> R + Send,
76    R: Send,
77{
78    pub(super) latch: L,
79    func: UnsafeCell<Option<F>>,
80    result: UnsafeCell<JobResult<R>>,
81}
82
83impl<L, F, R> StackJob<L, F, R>
84where
85    L: Latch + Sync,
86    F: FnOnce(bool) -> R + Send,
87    R: Send,
88{
89    pub(super) fn new(func: F, latch: L) -> StackJob<L, F, R> {
90        StackJob {
91            latch,
92            func: UnsafeCell::new(Some(func)),
93            result: UnsafeCell::new(JobResult::None),
94        }
95    }
96
97    pub(super) unsafe fn as_job_ref(&self) -> JobRef {
98        JobRef::new(self)
99    }
100
101    pub(super) unsafe fn run_inline(self, stolen: bool) -> R {
102        self.func.into_inner().unwrap()(stolen)
103    }
104
105    pub(super) unsafe fn into_result(self) -> R {
106        self.result.into_inner().into_return_value()
107    }
108}
109
110impl<L, F, R> Job for StackJob<L, F, R>
111where
112    L: Latch + Sync,
113    F: FnOnce(bool) -> R + Send,
114    R: Send,
115{
116    unsafe fn execute(this: *const ()) {
117        let this = &*(this as *const Self);
118        let abort = unwind::AbortIfPanic;
119        let func = (*this.func.get()).take().unwrap();
120        (*this.result.get()) = JobResult::call(func);
121        Latch::set(&this.latch);
122        mem::forget(abort);
123    }
124}
125
126pub(super) struct HeapJob<BODY>
133where
134    BODY: FnOnce() + Send,
135{
136    job: BODY,
137}
138
139impl<BODY> HeapJob<BODY>
140where
141    BODY: FnOnce() + Send,
142{
143    pub(super) fn new(job: BODY) -> Box<Self> {
144        Box::new(HeapJob { job })
145    }
146
147    pub(super) unsafe fn into_job_ref(self: Box<Self>) -> JobRef {
151        JobRef::new(Box::into_raw(self))
152    }
153
154    pub(super) fn into_static_job_ref(self: Box<Self>) -> JobRef
156    where
157        BODY: 'static,
158    {
159        unsafe { self.into_job_ref() }
160    }
161}
162
163impl<BODY> Job for HeapJob<BODY>
164where
165    BODY: FnOnce() + Send,
166{
167    unsafe fn execute(this: *const ()) {
168        let this = Box::from_raw(this as *mut Self);
169        (this.job)();
170    }
171}
172
173pub(super) struct ArcJob<BODY>
176where
177    BODY: Fn() + Send + Sync,
178{
179    job: BODY,
180}
181
182impl<BODY> ArcJob<BODY>
183where
184    BODY: Fn() + Send + Sync,
185{
186    pub(super) fn new(job: BODY) -> Arc<Self> {
187        Arc::new(ArcJob { job })
188    }
189
190    pub(super) unsafe fn as_job_ref(this: &Arc<Self>) -> JobRef {
194        JobRef::new(Arc::into_raw(Arc::clone(this)))
195    }
196
197    pub(super) fn as_static_job_ref(this: &Arc<Self>) -> JobRef
199    where
200        BODY: 'static,
201    {
202        unsafe { Self::as_job_ref(this) }
203    }
204}
205
206impl<BODY> Job for ArcJob<BODY>
207where
208    BODY: Fn() + Send + Sync,
209{
210    unsafe fn execute(this: *const ()) {
211        let this = Arc::from_raw(this as *mut Self);
212        (this.job)();
213    }
214}
215
216impl<T> JobResult<T> {
217    fn call(func: impl FnOnce(bool) -> T) -> Self {
218        match unwind::halt_unwinding(|| func(true)) {
219            Ok(x) => JobResult::Ok(x),
220            Err(x) => JobResult::Panic(x),
221        }
222    }
223
224    pub(super) fn into_return_value(self) -> T {
229        match self {
230            JobResult::None => unreachable!(),
231            JobResult::Ok(x) => x,
232            JobResult::Panic(x) => unwind::resume_unwinding(x),
233        }
234    }
235}
236
237pub(super) struct JobFifo {
239    inner: Injector<JobRef>,
240}
241
242impl JobFifo {
243    pub(super) fn new() -> Self {
244        JobFifo {
245            inner: Injector::new(),
246        }
247    }
248
249    pub(super) unsafe fn push(&self, job_ref: JobRef) -> JobRef {
250        self.inner.push(job_ref);
254        JobRef::new(self)
255    }
256}
257
258impl Job for JobFifo {
259    unsafe fn execute(this: *const ()) {
260        let this = &*(this as *const Self);
262        loop {
263            match this.inner.steal() {
264                Steal::Success(job_ref) => break job_ref.execute(),
265                Steal::Empty => panic!("FIFO is empty"),
266                Steal::Retry => {}
267            }
268        }
269    }
270}