swapper/lib.rs
1
2//! Concurrency control for atomic swap of ownership.
3//!
//! A common pattern for thread pools is that each thread owns a token,
//! and sometimes threads need to exchange tokens. A skeleton example
//! is:
7//!
8//! ```rust
9//! # use std::sync::mpsc::{Sender, Receiver};
10//! struct Token;
11//! enum Message {
12//! // Messages go here
13//! };
14//! struct Thread {
15//! sender_to_other_thread: Sender<Message>,
16//! receiver_from_other_thread: Receiver<Message>,
17//! token: Token,
18//! }
19//! impl Thread {
20//! fn swap_token(&mut self) {
21//! // This function should swap the token with the other thread.
22//! }
23//! fn handle(&mut self, message: Message) {
24//! match message {
25//! // Message handlers go here
26//! }
27//! }
28//! fn run(&mut self) {
29//! loop {
30//! let message = self.receiver_from_other_thread.recv();
31//! match message {
32//! Ok(message) => self.handle(message),
33//! Err(_) => return,
34//! }
35//! }
36//! }
37//! }
38//! ```
39//!
//! To do this with the Rust channels, ownership of the token is first
//! passed from the thread to the channel, then to the other thread,
//! resulting in a transitory state where the thread does not have the
//! token. Typically to work around this, the thread stores an `Option<Token>`
//! rather than a `Token`:
45//!
46//! ```rust
47//! # use std::sync::mpsc::{self, Sender, Receiver};
48//! # use std::mem;
49//! # struct Token;
50//! enum Message {
51//! SwapToken(Token, Sender<Token>),
52//! };
53//! struct Thread {
54//! sender_to_other_thread: Sender<Message>,
55//! receiver_from_other_thread: Receiver<Message>,
56//! token: Option<Token>, // ANNOYING Option
57//! }
58//! impl Thread {
59//! fn swap_token(&mut self) {
60//! let (sender, receiver) = mpsc::channel();
61//! let token = self.token.take().unwrap();
62//! self.sender_to_other_thread.send(Message::SwapToken(token, sender));
63//! let token = receiver.recv().unwrap();
64//! self.token = Some(token);
65//! }
66//! fn handle(&mut self, message: Message) {
67//! match message {
68//! Message::SwapToken(token, sender) => {
69//! let token = mem::replace(&mut self.token, Some(token)).unwrap();
70//! sender.send(token).unwrap();
71//! }
72//! }
73//! }
74//! }
75//! ```
76//!
77//! This crate provides a synchronization primitive for swapping ownership between threads.
78//! The API is similar to channels, except that rather than separate `send(T)` and `recv():T`
79//! methods, there is one `swap(T):T`, which swaps a `T` owned by one thread for a `T` owned
80//! by the other. For example, it allows an implementation of the thread pool which always
81//! owns a token.
82//!
83//! ```rust
84//! # use std::sync::mpsc::{self, Sender, Receiver};
85//! # use swapper::{self, Swapper};
86//! # struct Token;
87//! enum Message {
88//! SwapToken(Swapper<Token>),
89//! };
90//! struct Thread {
91//! sender_to_other_thread: Sender<Message>,
92//! receiver_from_other_thread: Receiver<Message>,
93//! token: Token,
94//! }
95//! impl Thread {
96//! fn swap_token(&mut self) {
97//! let (our_swapper, their_swapper) = swapper::swapper();
98//! self.sender_to_other_thread.send(Message::SwapToken(their_swapper));
99//! our_swapper.swap(&mut self.token).unwrap();
100//! }
101//! fn handle(&mut self, message: Message) {
102//! match message {
103//! Message::SwapToken(swapper) => swapper.swap(&mut self.token).unwrap(),
104//! }
105//! }
106//! }
107//! ```
108
109use std::mem;
110use std::ptr;
111use std::sync::Arc;
112use std::sync::atomic::AtomicPtr;
113use std::sync::atomic::Ordering;
114use std::sync::mpsc;
115use std::sync::mpsc::Receiver;
116use std::sync::mpsc::RecvError;
117use std::sync::mpsc::Sender;
118use std::sync::mpsc::SendError;
119
/// A concurrency control for swapping ownership between threads.
///
/// Swappers are created in connected pairs by `swapper()`. Calling `swap` on one half
/// exchanges a `T` with the value supplied to `swap` on the other half.
pub struct Swapper<T> {
    // Slot shared by both halves of the pair. It is null when nobody is waiting; otherwise
    // it points at the value of the half that is currently blocked inside `swap`.
    contents: Arc<AtomicPtr<T>>,
    // Receives the peer's signal that it has finished swapping with our published value.
    wait: Receiver<()>,
    // Signals the peer that we have finished swapping with its published value.
    notify: Sender<()>,
}
127
128impl<T: Send> Swapper<T> {
129 /// Swap data.
130 ///
131 /// If the other half of the swap pair is blocked waiting to swap, then it swaps ownership
132 /// of the data, then unblocks the other thread. Otherwise it blocks waiting to swap.
133 pub fn swap(&self, our_ref: &mut T) -> Result<(), SwapError> {
134 loop {
135 // Is the other thead blocked waiting to swap? If so, swap and unblock it.
136 let their_ptr = self.contents.swap(ptr::null_mut(), Ordering::AcqRel);
137 if let Some(their_ref) = unsafe { their_ptr.as_mut() } {
138 // The safety of this implementation depends on the other thread being blocked
139 // while this swap happens.
140 mem::swap(our_ref, their_ref);
141 // We have swapped ownership, so its now safe to unblock the other thread.
142 try!(self.notify.send(()));
143 return Ok(());
144 }
145 // Is the other thead not ready for a swap yet? If so, block waiting to swap.
146 let their_ptr = self.contents.compare_and_swap(ptr::null_mut(), our_ref, Ordering::AcqRel);
147 if their_ptr.is_null() {
148 try!(self.wait.recv());
149 return Ok(());
150 }
151 }
152 }
153}
154
155// Be explicit about implementing Send.
156unsafe impl<T: Send> Send for Swapper<T> {}
157
158/// Create a new pair of swappers.
159pub fn swapper<T>() -> (Swapper<T>, Swapper<T>) {
160 let contents = Arc::new(AtomicPtr::new(ptr::null_mut()));
161 let (notify_a, wait_a) = mpsc::channel();
162 let (notify_b, wait_b) = mpsc::channel();
163 let swapper_a = Swapper {
164 contents: contents.clone(),
165 notify: notify_b,
166 wait: wait_a,
167 };
168 let swapper_b = Swapper {
169 contents: contents,
170 notify: notify_a,
171 wait: wait_b,
172 };
173 (swapper_a, swapper_b)
174}
175
/// The error returned when a thread attempts to swap with a thread that has dropped its swapper.
///
/// The type carries no payload; it implements `Display` and `std::error::Error` so it can be
/// propagated with `?` into boxed or wrapped error types.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct SwapError(());

impl std::fmt::Display for SwapError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("the other half of the swapper pair has been dropped")
    }
}

impl std::error::Error for SwapError {}
179
180impl From<RecvError> for SwapError {
181 fn from(_: RecvError) -> SwapError {
182 SwapError(())
183 }
184}
185
186impl From<SendError<()>> for SwapError {
187 fn from(_: SendError<()>) -> SwapError {
188 SwapError(())
189 }
190}