zbus/abstractions/
executor.rs

1#[cfg(not(feature = "tokio"))]
2use async_executor::Executor as AsyncExecutor;
3#[cfg(not(feature = "tokio"))]
4use async_task::Task as AsyncTask;
5#[cfg(not(feature = "tokio"))]
6use std::sync::Arc;
7use std::{
8    future::Future,
9    io::Result,
10    pin::Pin,
11    task::{Context, Poll},
12};
13#[cfg(feature = "tokio")]
14use std::{future::pending, io::Error, marker::PhantomData};
15#[cfg(feature = "tokio")]
16use tokio::task::JoinHandle;
17
18/// A wrapper around the underlying runtime/executor.
19///
20/// This is used to run asynchronous tasks internally and allows integration with various runtimes.
21/// See [`crate::Connection::executor`] for an example of integration with external runtimes.
22///
23/// **Note:** You can (and should) completely ignore this type when building with `tokio` feature
24/// enabled.
25#[cfg(not(feature = "tokio"))]
26#[derive(Debug, Clone)]
27pub struct Executor<'a> {
28    executor: Arc<AsyncExecutor<'a>>,
29}
30#[cfg(feature = "tokio")]
31#[derive(Debug, Clone)]
32pub struct Executor<'a> {
33    phantom: PhantomData<&'a ()>,
34}
35
36impl Executor<'_> {
37    /// Spawns a task onto the executor.
38    #[doc(hidden)]
39    pub fn spawn<T: Send + 'static>(
40        &self,
41        future: impl Future<Output = T> + Send + 'static,
42        #[allow(unused)] name: &str,
43    ) -> Task<T> {
44        #[cfg(not(feature = "tokio"))]
45        {
46            Task(Some(self.executor.spawn(future)))
47        }
48
49        #[cfg(feature = "tokio")]
50        {
51            #[cfg(tokio_unstable)]
52            {
53                Task(Some(
54                    tokio::task::Builder::new()
55                        .name(name)
56                        .spawn(future)
57                        // SAFETY: Looking at the code, this call always returns an `Ok`.
58                        .unwrap(),
59                ))
60            }
61            #[cfg(not(tokio_unstable))]
62            {
63                Task(Some(tokio::task::spawn(future)))
64            }
65        }
66    }
67
68    /// Return `true` if there are no unfinished tasks.
69    ///
70    /// With `tokio` feature enabled, this always returns `true`.
71    pub fn is_empty(&self) -> bool {
72        #[cfg(not(feature = "tokio"))]
73        {
74            self.executor.is_empty()
75        }
76
77        #[cfg(feature = "tokio")]
78        true
79    }
80
81    /// Runs a single task.
82    ///
83    /// With `tokio` feature enabled, its a noop and never returns.
84    pub async fn tick(&self) {
85        #[cfg(not(feature = "tokio"))]
86        {
87            self.executor.tick().await
88        }
89
90        #[cfg(feature = "tokio")]
91        {
92            pending().await
93        }
94    }
95
96    /// Create a new `Executor`.
97    pub(crate) fn new() -> Self {
98        #[cfg(not(feature = "tokio"))]
99        {
100            Self {
101                executor: Arc::new(AsyncExecutor::new()),
102            }
103        }
104
105        #[cfg(feature = "tokio")]
106        {
107            Self {
108                phantom: PhantomData,
109            }
110        }
111    }
112
113    /// Runs the executor until the given future completes.
114    ///
115    /// With `tokio` feature enabled, it just awaits on the `future`.
116    pub(crate) async fn run<T>(&self, future: impl Future<Output = T>) -> T {
117        #[cfg(not(feature = "tokio"))]
118        {
119            self.executor.run(future).await
120        }
121        #[cfg(feature = "tokio")]
122        {
123            future.await
124        }
125    }
126}
127
128/// A wrapper around the task API of the underlying runtime/executor.
129///
130/// This follows the semantics of `async_task::Task` on drop:
131///
132/// * it will be cancelled, rather than detached. For detaching, use the `detach` method.
133/// * errors from the task cancellation will will be ignored. If you need to know about task errors,
134///   convert the task to a `FallibleTask` using the `fallible` method.
135#[cfg(not(feature = "tokio"))]
136#[doc(hidden)]
137#[derive(Debug)]
138pub struct Task<T>(Option<AsyncTask<T>>);
139#[cfg(feature = "tokio")]
140#[doc(hidden)]
141#[derive(Debug)]
142pub struct Task<T>(Option<JoinHandle<T>>);
143
144impl<T> Task<T> {
145    /// Detaches the task to let it keep running in the background.
146    #[allow(unused_mut)]
147    #[allow(unused)]
148    pub fn detach(mut self) {
149        #[cfg(not(feature = "tokio"))]
150        {
151            self.0.take().expect("async_task::Task is none").detach()
152        }
153
154        #[cfg(feature = "tokio")]
155        {
156            self.0.take().expect("tokio::task::JoinHandle is none");
157        }
158    }
159}
160
161impl<T> Task<T>
162where
163    T: Send + 'static,
164{
165    /// Launch the given blocking function in a task.
166    #[allow(unused)]
167    pub(crate) fn spawn_blocking<F>(f: F, #[allow(unused)] name: &str) -> Self
168    where
169        F: FnOnce() -> T + Send + 'static,
170    {
171        #[cfg(not(feature = "tokio"))]
172        {
173            Self(Some(blocking::unblock(f)))
174        }
175
176        #[cfg(feature = "tokio")]
177        {
178            #[cfg(tokio_unstable)]
179            {
180                Self(Some(
181                    tokio::task::Builder::new()
182                        .name(name)
183                        .spawn_blocking(f)
184                        // SAFETY: Looking at the code, this call always returns an `Ok`.
185                        .unwrap(),
186                ))
187            }
188            #[cfg(not(tokio_unstable))]
189            {
190                Self(Some(tokio::task::spawn_blocking(f)))
191            }
192        }
193    }
194}
195
196impl<T> Drop for Task<T> {
197    fn drop(&mut self) {
198        #[cfg(feature = "tokio")]
199        {
200            if let Some(join_handle) = self.0.take() {
201                join_handle.abort();
202            }
203        }
204    }
205}
206
207impl<T> Future for Task<T> {
208    type Output = Result<T>;
209
210    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
211        #[cfg(not(feature = "tokio"))]
212        {
213            Pin::new(&mut self.get_mut().0.as_mut().expect("async_task::Task is none"))
214                .poll(cx)
215                .map(|r| Ok(r))
216        }
217
218        #[cfg(feature = "tokio")]
219        {
220            Pin::new(
221                &mut self
222                    .get_mut()
223                    .0
224                    .as_mut()
225                    .expect("tokio::task::JoinHandle is none"),
226            )
227            .poll(cx)
228            .map(|r| match r {
229                Ok(v) => Ok(v),
230                Err(e) => {
231                    if e.is_cancelled() {
232                        Err(Error::other("tokio::task cancelled"))
233                    } else {
234                        panic!("tokio::task::JoinHandle error: {e}")
235                    }
236                }
237            })
238        }
239    }
240}