tokio_util/future.rs
1//! An extension trait for Futures that provides a variety of convenient adapters.
2
3mod with_cancellation_token;
4use with_cancellation_token::{WithCancellationTokenFuture, WithCancellationTokenFutureOwned};
5
6use std::future::Future;
7
8use crate::sync::CancellationToken;
9
10/// A trait which contains a variety of convenient adapters and utilities for `Future`s.
11pub trait FutureExt: Future {
12    cfg_time! {
13        /// A wrapper around [`tokio::time::timeout`], with the advantage that it is easier to write
14        /// fluent call chains.
15        ///
16        /// # Examples
17        ///
18        /// ```rust
19        /// use tokio::{sync::oneshot, time::Duration};
20        /// use tokio_util::future::FutureExt;
21        ///
22        /// # async fn dox() {
23        /// let (_tx, rx) = oneshot::channel::<()>();
24        ///
25        /// let res = rx.timeout(Duration::from_millis(10)).await;
26        /// assert!(res.is_err());
27        /// # }
28        /// ```
29        #[track_caller]
30        fn timeout(self, timeout: std::time::Duration) -> tokio::time::Timeout<Self>
31        where
32            Self: Sized,
33        {
34            tokio::time::timeout(timeout, self)
35        }
36
37        /// A wrapper around [`tokio::time::timeout_at`], with the advantage that it is easier to write
38        /// fluent call chains.
39        ///
40        /// # Examples
41        ///
42        /// ```rust
43        /// use tokio::{sync::oneshot, time::{Duration, Instant}};
44        /// use tokio_util::future::FutureExt;
45        ///
46        /// # async fn dox() {
47        /// let (_tx, rx) = oneshot::channel::<()>();
48        /// let deadline = Instant::now() + Duration::from_millis(10);
49        ///
50        /// let res = rx.timeout_at(deadline).await;
51        /// assert!(res.is_err());
52        /// # }
53        /// ```
54        fn timeout_at(self, deadline: tokio::time::Instant) -> tokio::time::Timeout<Self>
55        where
56            Self: Sized,
57        {
58            tokio::time::timeout_at(deadline, self)
59        }
60    }
61
62    /// Similar to [`CancellationToken::run_until_cancelled`],
63    /// but with the advantage that it is easier to write fluent call chains.
64    ///
65    /// # Fairness
66    ///
67    /// Calling this on an already-cancelled token directly returns `None`.
68    /// For all subsequent polls, in case of concurrent completion and
69    /// cancellation, this is biased towards the `self` future completion.
70    ///
71    /// # Examples
72    ///
73    /// ```rust
74    /// use tokio::sync::oneshot;
75    /// use tokio_util::future::FutureExt;
76    /// use tokio_util::sync::CancellationToken;
77    ///
78    /// # async fn dox() {
79    /// let (_tx, rx) = oneshot::channel::<()>();
80    /// let token = CancellationToken::new();
81    /// let token_clone = token.clone();
82    /// tokio::spawn(async move {
83    ///     tokio::time::sleep(std::time::Duration::from_millis(10)).await;
84    ///     token.cancel();
85    /// });
86    /// assert!(rx.with_cancellation_token(&token_clone).await.is_none())
87    /// # }
88    /// ```
89    fn with_cancellation_token(
90        self,
91        cancellation_token: &CancellationToken,
92    ) -> WithCancellationTokenFuture<'_, Self>
93    where
94        Self: Sized,
95    {
96        WithCancellationTokenFuture::new(cancellation_token, self)
97    }
98
99    /// Similar to [`CancellationToken::run_until_cancelled_owned`],
100    /// but with the advantage that it is easier to write fluent call chains.
101    ///
102    /// # Fairness
103    ///
104    /// Calling this on an already-cancelled token directly returns `None`.
105    /// For all subsequent polls, in case of concurrent completion and
106    /// cancellation, this is biased towards the `self` future completion.
107    ///
108    /// # Examples
109    ///
110    /// ```rust
111    /// use tokio::sync::oneshot;
112    /// use tokio_util::future::FutureExt;
113    /// use tokio_util::sync::CancellationToken;
114    ///
115    /// # async fn dox() {
116    /// let (_tx, rx) = oneshot::channel::<()>();
117    /// let token = CancellationToken::new();
118    /// let token_clone = token.clone();
119    /// tokio::spawn(async move {
120    ///     tokio::time::sleep(std::time::Duration::from_millis(10)).await;
121    ///     token.cancel();
122    /// });
123    /// assert!(rx.with_cancellation_token_owned(token_clone).await.is_none())
124    /// # }
125    /// ```
126    fn with_cancellation_token_owned(
127        self,
128        cancellation_token: CancellationToken,
129    ) -> WithCancellationTokenFutureOwned<Self>
130    where
131        Self: Sized,
132    {
133        WithCancellationTokenFutureOwned::new(cancellation_token, self)
134    }
135}
136
137impl<T: Future + ?Sized> FutureExt for T {}