futures_util/lock/bilock.rs
1//! Futures-powered synchronization primitives.
2
3use alloc::boxed::Box;
4use alloc::sync::Arc;
5use core::cell::UnsafeCell;
6use core::ops::{Deref, DerefMut};
7use core::pin::Pin;
8use core::sync::atomic::AtomicPtr;
9use core::sync::atomic::Ordering::SeqCst;
10use core::{fmt, ptr};
11#[cfg(feature = "bilock")]
12use futures_core::future::Future;
13use futures_core::task::{Context, Poll, Waker};
14
15/// A type of futures-powered synchronization primitive which is a mutex between
16/// two possible owners.
17///
18/// This primitive is not as generic as a full-blown mutex but is sufficient for
19/// many use cases where there are only two possible owners of a resource. The
20/// implementation of `BiLock` can be more optimized for just the two possible
21/// owners.
22///
23/// Note that it's possible to use this lock through a poll-style interface with
24/// the `poll_lock` method but you can also use it as a future with the `lock`
25/// method that consumes a `BiLock` and returns a future that will resolve when
26/// it's locked.
27///
28/// A `BiLock` is typically used for "split" operations where data which serves
29/// two purposes wants to be split into two to be worked with separately. For
30/// example a TCP stream could be both a reader and a writer or a framing layer
31/// could be both a stream and a sink for messages. A `BiLock` enables splitting
32/// these two and then using each independently in a futures-powered fashion.
33///
34/// This type is only available when the `bilock` feature of this
35/// library is activated.
36#[derive(Debug)]
37#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
38pub struct BiLock<T> {
39 arc: Arc<Inner<T>>,
40}
41
/// State shared by the two `BiLock` handles of a pair.
#[derive(Debug)]
struct Inner<T> {
    /// Lock word, interpreted by its address: null means unlocked, address `1`
    /// means locked with nobody parked, and any other value is a raw
    /// `Box<Waker>` pointer (from `Box::into_raw`) for the task parked on the
    /// lock.
    state: AtomicPtr<Waker>,
    /// The protected value. `into_value` moves it out with `take()`; since
    /// `Inner` implements `Drop`, its fields cannot be moved out by
    /// destructuring.
    value: Option<UnsafeCell<T>>,
}

// SAFETY: `state` is an atomic, and `value` is only reached through a
// `BiLockGuard`, which `BiLock::poll_lock` creates solely for the handle that
// swapped `state` away from null; `BiLock::unlock` (run when that guard is
// dropped) is what stores null again. The two handles therefore never touch
// `T` at the same time, so sending or sharing `Inner` at most hands `T` over
// to another thread, which `T: Send` permits.
unsafe impl<T: Send> Send for Inner<T> {}
unsafe impl<T: Send> Sync for Inner<T> {}
50
51impl<T> BiLock<T> {
52 /// Creates a new `BiLock` protecting the provided data.
53 ///
54 /// Two handles to the lock are returned, and these are the only two handles
55 /// that will ever be available to the lock. These can then be sent to separate
56 /// tasks to be managed there.
57 ///
58 /// The data behind the bilock is considered to be pinned, which allows `Pin`
59 /// references to locked data. However, this means that the locked value
60 /// will only be available through `Pin<&mut T>` (not `&mut T`) unless `T` is `Unpin`.
61 /// Similarly, reuniting the lock and extracting the inner value is only
62 /// possible when `T` is `Unpin`.
63 pub fn new(t: T) -> (Self, Self) {
64 let arc = Arc::new(Inner {
65 state: AtomicPtr::new(ptr::null_mut()),
66 value: Some(UnsafeCell::new(t)),
67 });
68
69 (Self { arc: arc.clone() }, Self { arc })
70 }
71
72 /// Attempt to acquire this lock, returning `Pending` if it can't be
73 /// acquired.
74 ///
75 /// This function will acquire the lock in a nonblocking fashion, returning
76 /// immediately if the lock is already held. If the lock is successfully
77 /// acquired then `Poll::Ready` is returned with a value that represents
78 /// the locked value (and can be used to access the protected data). The
79 /// lock is unlocked when the returned `BiLockGuard` is dropped.
80 ///
81 /// If the lock is already held then this function will return
82 /// `Poll::Pending`. In this case the current task will also be scheduled
83 /// to receive a notification when the lock would otherwise become
84 /// available.
85 ///
86 /// # Panics
87 ///
88 /// This function will panic if called outside the context of a future's
89 /// task.
90 pub fn poll_lock(&self, cx: &mut Context<'_>) -> Poll<BiLockGuard<'_, T>> {
91 let mut waker = None;
92 loop {
93 let n = self.arc.state.swap(invalid_ptr(1), SeqCst);
94 match n as usize {
95 // Woohoo, we grabbed the lock!
96 0 => return Poll::Ready(BiLockGuard { bilock: self }),
97
98 // Oops, someone else has locked the lock
99 1 => {}
100
101 // A task was previously blocked on this lock, likely our task,
102 // so we need to update that task.
103 _ => unsafe {
104 let mut prev = Box::from_raw(n);
105 *prev = cx.waker().clone();
106 waker = Some(prev);
107 },
108 }
109
110 // type ascription for safety's sake!
111 let me: Box<Waker> = waker.take().unwrap_or_else(|| Box::new(cx.waker().clone()));
112 let me = Box::into_raw(me);
113
114 match self.arc.state.compare_exchange(invalid_ptr(1), me, SeqCst, SeqCst) {
115 // The lock is still locked, but we've now parked ourselves, so
116 // just report that we're scheduled to receive a notification.
117 Ok(_) => return Poll::Pending,
118
119 // Oops, looks like the lock was unlocked after our swap above
120 // and before the compare_exchange. Deallocate what we just
121 // allocated and go through the loop again.
122 Err(n) if n.is_null() => unsafe {
123 waker = Some(Box::from_raw(me));
124 },
125
126 // The top of this loop set the previous state to 1, so if we
127 // failed the CAS above then it's because the previous value was
128 // *not* zero or one. This indicates that a task was blocked,
129 // but we're trying to acquire the lock and there's only one
130 // other reference of the lock, so it should be impossible for
131 // that task to ever block itself.
132 Err(n) => panic!("invalid state: {}", n as usize),
133 }
134 }
135 }
136
137 /// Perform a "blocking lock" of this lock, consuming this lock handle and
138 /// returning a future to the acquired lock.
139 ///
140 /// This function consumes the `BiLock<T>` and returns a sentinel future,
141 /// `BiLockAcquire<T>`. The returned future will resolve to
142 /// `BiLockGuard<T>`.
143 ///
144 /// Note that the returned future will never resolve to an error.
145 #[cfg(feature = "bilock")]
146 #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
147 pub fn lock(&self) -> BiLockAcquire<'_, T> {
148 BiLockAcquire { bilock: self }
149 }
150
151 /// Returns `true` only if the other `BiLock<T>` originated from the same call to `BiLock::new`.
152 pub fn is_pair_of(&self, other: &Self) -> bool {
153 Arc::ptr_eq(&self.arc, &other.arc)
154 }
155
156 /// Attempts to put the two "halves" of a `BiLock<T>` back together and
157 /// recover the original value. Succeeds only if the two `BiLock<T>`s
158 /// originated from the same call to `BiLock::new`.
159 pub fn reunite(self, other: Self) -> Result<T, ReuniteError<T>>
160 where
161 T: Unpin,
162 {
163 if self.is_pair_of(&other) {
164 drop(other);
165 let inner = Arc::try_unwrap(self.arc)
166 .ok()
167 .expect("futures: try_unwrap failed in BiLock<T>::reunite");
168 Ok(unsafe { inner.into_value() })
169 } else {
170 Err(ReuniteError(self, other))
171 }
172 }
173
174 fn unlock(&self) {
175 let n = self.arc.state.swap(ptr::null_mut(), SeqCst);
176 match n as usize {
177 // we've locked the lock, shouldn't be possible for us to see an
178 // unlocked lock.
179 0 => panic!("invalid unlocked state"),
180
181 // Ok, no one else tried to get the lock, we're done.
182 1 => {}
183
184 // Another task has parked themselves on this lock, let's wake them
185 // up as its now their turn.
186 _ => unsafe {
187 Box::from_raw(n).wake();
188 },
189 }
190 }
191}
192
193impl<T: Unpin> Inner<T> {
194 unsafe fn into_value(mut self) -> T {
195 self.value.take().unwrap().into_inner()
196 }
197}
198
199impl<T> Drop for Inner<T> {
200 fn drop(&mut self) {
201 assert!(self.state.load(SeqCst).is_null());
202 }
203}
204
205/// Error indicating two `BiLock<T>`s were not two halves of a whole, and
206/// thus could not be `reunite`d.
207#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
208pub struct ReuniteError<T>(pub BiLock<T>, pub BiLock<T>);
209
210impl<T> fmt::Debug for ReuniteError<T> {
211 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212 f.debug_tuple("ReuniteError").field(&"...").finish()
213 }
214}
215
216impl<T> fmt::Display for ReuniteError<T> {
217 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
218 write!(f, "tried to reunite two BiLocks that don't form a pair")
219 }
220}
221
222#[cfg(feature = "std")]
223impl<T: core::any::Any> std::error::Error for ReuniteError<T> {}
224
225/// Returned RAII guard from the `poll_lock` method.
226///
227/// This structure acts as a sentinel to the data in the `BiLock<T>` itself,
228/// implementing `Deref` and `DerefMut` to `T`. When dropped, the lock will be
229/// unlocked.
230#[derive(Debug)]
231#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
232pub struct BiLockGuard<'a, T> {
233 bilock: &'a BiLock<T>,
234}
235
236// We allow parallel access to T via Deref, so Sync bound is also needed here.
237unsafe impl<T: Send + Sync> Sync for BiLockGuard<'_, T> {}
238
239impl<T> Deref for BiLockGuard<'_, T> {
240 type Target = T;
241 fn deref(&self) -> &T {
242 unsafe { &*self.bilock.arc.value.as_ref().unwrap().get() }
243 }
244}
245
246impl<T: Unpin> DerefMut for BiLockGuard<'_, T> {
247 fn deref_mut(&mut self) -> &mut T {
248 unsafe { &mut *self.bilock.arc.value.as_ref().unwrap().get() }
249 }
250}
251
252impl<T> BiLockGuard<'_, T> {
253 /// Get a mutable pinned reference to the locked value.
254 pub fn as_pin_mut(&mut self) -> Pin<&mut T> {
255 // Safety: we never allow moving a !Unpin value out of a bilock, nor
256 // allow mutable access to it
257 unsafe { Pin::new_unchecked(&mut *self.bilock.arc.value.as_ref().unwrap().get()) }
258 }
259}
260
261impl<T> Drop for BiLockGuard<'_, T> {
262 fn drop(&mut self) {
263 self.bilock.unlock();
264 }
265}
266
/// Future created by `BiLock::lock`; it completes with a `BiLockGuard` once
/// the lock has been acquired.
#[cfg(feature = "bilock")]
#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct BiLockAcquire<'a, T> {
    /// The handle whose lock is being acquired.
    bilock: &'a BiLock<T>,
}

// Pinning is never projected to fields
#[cfg(feature = "bilock")]
impl<T> Unpin for BiLockAcquire<'_, T> {}

#[cfg(feature = "bilock")]
impl<'a, T> Future for BiLockAcquire<'a, T> {
    type Output = BiLockGuard<'a, T>;

    /// Forwards to `BiLock::poll_lock` on the borrowed handle.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Copy the reference out so the guard borrows for `'a`, not for the
        // duration of this call.
        let handle: &'a BiLock<T> = self.bilock;
        handle.poll_lock(cx)
    }
}
289
// Based on core::ptr::invalid_mut. Equivalent to `addr as *mut T`, but is strict-provenance compatible.
/// Returns a pointer whose address is `addr`.
///
/// The result is only meant to be compared or stored as a sentinel; it must
/// not be dereferenced.
#[inline]
fn invalid_ptr<T>(addr: usize) -> *mut T {
    // Offsetting a null byte pointer with `wrapping_add` yields exactly the
    // address `addr` without an integer-to-pointer cast, and needs no `unsafe`.
    ptr::null_mut::<u8>().wrapping_add(addr).cast()
}