zbus/abstractions/
executor.rs1#[cfg(not(feature = "tokio"))]
2use async_executor::Executor as AsyncExecutor;
3#[cfg(not(feature = "tokio"))]
4use async_task::Task as AsyncTask;
5#[cfg(not(feature = "tokio"))]
6use std::sync::Arc;
7use std::{
8 future::Future,
9 io::Result,
10 pin::Pin,
11 task::{Context, Poll},
12};
13#[cfg(feature = "tokio")]
14use std::{future::pending, io::Error, marker::PhantomData};
15#[cfg(feature = "tokio")]
16use tokio::task::JoinHandle;
17
18#[cfg(not(feature = "tokio"))]
26#[derive(Debug, Clone)]
27pub struct Executor<'a> {
28 executor: Arc<AsyncExecutor<'a>>,
29}
/// Handle to the executor that drives internally spawned tasks.
///
/// This definition is compiled only when the `tokio` feature is enabled. It
/// stores no executor of its own; the only field is a marker that keeps the
/// `'a` lifetime parameter in use so the type has the same shape in both
/// feature configurations.
#[cfg(feature = "tokio")]
#[derive(Clone, Debug)]
pub struct Executor<'a> {
    /// Zero-sized marker tying the otherwise unused lifetime `'a` to the type.
    phantom: PhantomData<&'a ()>,
}
35
36impl Executor<'_> {
37 #[doc(hidden)]
39 pub fn spawn<T: Send + 'static>(
40 &self,
41 future: impl Future<Output = T> + Send + 'static,
42 #[allow(unused)] name: &str,
43 ) -> Task<T> {
44 #[cfg(not(feature = "tokio"))]
45 {
46 Task(Some(self.executor.spawn(future)))
47 }
48
49 #[cfg(feature = "tokio")]
50 {
51 #[cfg(tokio_unstable)]
52 {
53 Task(Some(
54 tokio::task::Builder::new()
55 .name(name)
56 .spawn(future)
57 .unwrap(),
59 ))
60 }
61 #[cfg(not(tokio_unstable))]
62 {
63 Task(Some(tokio::task::spawn(future)))
64 }
65 }
66 }
67
68 pub fn is_empty(&self) -> bool {
72 #[cfg(not(feature = "tokio"))]
73 {
74 self.executor.is_empty()
75 }
76
77 #[cfg(feature = "tokio")]
78 true
79 }
80
81 pub async fn tick(&self) {
85 #[cfg(not(feature = "tokio"))]
86 {
87 self.executor.tick().await
88 }
89
90 #[cfg(feature = "tokio")]
91 {
92 pending().await
93 }
94 }
95
96 pub(crate) fn new() -> Self {
98 #[cfg(not(feature = "tokio"))]
99 {
100 Self {
101 executor: Arc::new(AsyncExecutor::new()),
102 }
103 }
104
105 #[cfg(feature = "tokio")]
106 {
107 Self {
108 phantom: PhantomData,
109 }
110 }
111 }
112
113 pub(crate) async fn run<T>(&self, future: impl Future<Output = T>) -> T {
117 #[cfg(not(feature = "tokio"))]
118 {
119 self.executor.run(future).await
120 }
121 #[cfg(feature = "tokio")]
122 {
123 future.await
124 }
125 }
126}
127
128#[cfg(not(feature = "tokio"))]
136#[doc(hidden)]
137#[derive(Debug)]
138pub struct Task<T>(Option<AsyncTask<T>>);
/// Handle to a spawned task (variant used when the `tokio` feature is
/// enabled).
///
/// Wraps a `tokio::task::JoinHandle`. The `Option` is `Some` for the whole
/// useful life of the handle and only becomes `None` once the inner handle has
/// been taken out, either by `detach` or while the `Task` is being dropped.
#[cfg(feature = "tokio")]
#[derive(Debug)]
#[doc(hidden)]
pub struct Task<T>(Option<JoinHandle<T>>);
143
144impl<T> Task<T> {
145 #[allow(unused_mut)]
147 #[allow(unused)]
148 pub fn detach(mut self) {
149 #[cfg(not(feature = "tokio"))]
150 {
151 self.0.take().expect("async_task::Task is none").detach()
152 }
153
154 #[cfg(feature = "tokio")]
155 {
156 self.0.take().expect("tokio::task::JoinHandle is none");
157 }
158 }
159}
160
161impl<T> Task<T>
162where
163 T: Send + 'static,
164{
165 #[allow(unused)]
167 pub(crate) fn spawn_blocking<F>(f: F, #[allow(unused)] name: &str) -> Self
168 where
169 F: FnOnce() -> T + Send + 'static,
170 {
171 #[cfg(not(feature = "tokio"))]
172 {
173 Self(Some(blocking::unblock(f)))
174 }
175
176 #[cfg(feature = "tokio")]
177 {
178 #[cfg(tokio_unstable)]
179 {
180 Self(Some(
181 tokio::task::Builder::new()
182 .name(name)
183 .spawn_blocking(f)
184 .unwrap(),
186 ))
187 }
188 #[cfg(not(tokio_unstable))]
189 {
190 Self(Some(tokio::task::spawn_blocking(f)))
191 }
192 }
193 }
194}
195
196impl<T> Drop for Task<T> {
197 fn drop(&mut self) {
198 #[cfg(feature = "tokio")]
199 {
200 if let Some(join_handle) = self.0.take() {
201 join_handle.abort();
202 }
203 }
204 }
205}
206
207impl<T> Future for Task<T> {
208 type Output = Result<T>;
209
210 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
211 #[cfg(not(feature = "tokio"))]
212 {
213 Pin::new(&mut self.get_mut().0.as_mut().expect("async_task::Task is none"))
214 .poll(cx)
215 .map(|r| Ok(r))
216 }
217
218 #[cfg(feature = "tokio")]
219 {
220 Pin::new(
221 &mut self
222 .get_mut()
223 .0
224 .as_mut()
225 .expect("tokio::task::JoinHandle is none"),
226 )
227 .poll(cx)
228 .map(|r| match r {
229 Ok(v) => Ok(v),
230 Err(e) => {
231 if e.is_cancelled() {
232 Err(Error::other("tokio::task cancelled"))
233 } else {
234 panic!("tokio::task::JoinHandle error: {e}")
235 }
236 }
237 })
238 }
239 }
240}