swapper/
lib.rs

1
2//! Concurrency control for atomic swap of ownership.
3//!
//! A common pattern for thread pools is that each thread owns a token,
//! and sometimes threads need to exchange tokens. A skeleton example
//! is:
7//!
8//! ```rust
9//! # use std::sync::mpsc::{Sender, Receiver};
10//! struct Token;
11//! enum Message {
12//!    // Messages go here
13//! };
14//! struct Thread {
15//!    sender_to_other_thread: Sender<Message>,
16//!    receiver_from_other_thread: Receiver<Message>,
17//!    token: Token,
18//! }
19//! impl Thread {
20//!    fn swap_token(&mut self) {
21//!       // This function should swap the token with the other thread.
22//!    }
23//!    fn handle(&mut self, message: Message) {
24//!        match message {
25//!           // Message handlers go here
26//!        }
27//!    }
28//!    fn run(&mut self) {
29//!       loop {
30//!          let message = self.receiver_from_other_thread.recv();
31//!          match message {
32//!             Ok(message) => self.handle(message),
33//!             Err(_) => return,
34//!          }
35//!       }
36//!    }
37//! }
38//! ```
39//!
//! To do this with the Rust channels, ownership of the token is first
//! passed from the thread to the channel, then to the other thread,
//! resulting in a transitory state where the thread does not have the
//! token. Typically, to work around this, the thread stores an `Option<Token>`
//! rather than a `Token`:
45//!
46//! ```rust
47//! # use std::sync::mpsc::{self, Sender, Receiver};
48//! # use std::mem;
49//! # struct Token;
50//! enum Message {
51//!    SwapToken(Token, Sender<Token>),
52//! };
53//! struct Thread {
54//!    sender_to_other_thread: Sender<Message>,
55//!    receiver_from_other_thread: Receiver<Message>,
56//!    token: Option<Token>, // ANNOYING Option
57//! }
58//! impl Thread {
59//!    fn swap_token(&mut self) {
60//!       let (sender, receiver) = mpsc::channel();
61//!       let token = self.token.take().unwrap();
62//!       self.sender_to_other_thread.send(Message::SwapToken(token, sender));
63//!       let token = receiver.recv().unwrap();
64//!       self.token = Some(token);
65//!    }
66//!    fn handle(&mut self, message: Message) {
67//!        match message {
68//!           Message::SwapToken(token, sender) => {
69//!              let token = mem::replace(&mut self.token, Some(token)).unwrap();
70//!              sender.send(token).unwrap();
71//!           }
72//!        }
73//!    }
74//! }
75//! ```
76//!
77//! This crate provides a synchronization primitive for swapping ownership between threads.
78//! The API is similar to channels, except that rather than separate `send(T)` and `recv():T`
79//! methods, there is one `swap(T):T`, which swaps a `T` owned by one thread for a `T` owned
80//! by the other. For example, it allows an implementation of the thread pool which always
81//! owns a token.
82//!
83//! ```rust
84//! # use std::sync::mpsc::{self, Sender, Receiver};
85//! # use swapper::{self, Swapper};
86//! # struct Token;
87//! enum Message {
88//!    SwapToken(Swapper<Token>),
89//! };
90//! struct Thread {
91//!    sender_to_other_thread: Sender<Message>,
92//!    receiver_from_other_thread: Receiver<Message>,
93//!    token: Token,
94//! }
95//! impl Thread {
96//!    fn swap_token(&mut self) {
97//!       let (our_swapper, their_swapper) = swapper::swapper();
98//!       self.sender_to_other_thread.send(Message::SwapToken(their_swapper));
99//!       our_swapper.swap(&mut self.token).unwrap();
100//!    }
101//!    fn handle(&mut self, message: Message) {
102//!        match message {
103//!           Message::SwapToken(swapper) => swapper.swap(&mut self.token).unwrap(),
104//!        }
105//!    }
106//! }
107//! ```
108
109use std::mem;
110use std::ptr;
111use std::sync::Arc;
112use std::sync::atomic::AtomicPtr;
113use std::sync::atomic::Ordering;
114use std::sync::mpsc;
115use std::sync::mpsc::Receiver;
116use std::sync::mpsc::RecvError;
117use std::sync::mpsc::Sender;
118use std::sync::mpsc::SendError;
119
/// A concurrency control for swapping ownership between threads.
///
/// Swappers are created in pairs by `swapper()`; each half exchanges values with the
/// other half through `swap`.
pub struct Swapper<T> {
    /// Pointer slot shared by both halves of the pair. It is null when nobody is waiting,
    /// and holds a pointer to the waiting half's value while that half is blocked in `swap`.
    contents: Arc<AtomicPtr<T>>,
    /// Receives the wake-up message sent by the other half once it has performed the swap.
    wait: Receiver<()>,
    /// Sends the wake-up message to the other half's `wait` channel.
    notify: Sender<()>,
}
127
128impl<T: Send> Swapper<T> {
129    /// Swap data.
130    ///
131    /// If the other half of the swap pair is blocked waiting to swap, then it swaps ownership
132    /// of the data, then unblocks the other thread. Otherwise it blocks waiting to swap.
133    pub fn swap(&self, our_ref: &mut T) -> Result<(), SwapError> {
134        loop {
135            // Is the other thead blocked waiting to swap? If so, swap and unblock it.
136            let their_ptr = self.contents.swap(ptr::null_mut(), Ordering::AcqRel);
137            if let Some(their_ref) = unsafe { their_ptr.as_mut() } {
138                // The safety of this implementation depends on the other thread being blocked
139                // while this swap happens.
140                mem::swap(our_ref, their_ref);
141                // We have swapped ownership, so its now safe to unblock the other thread.
142                try!(self.notify.send(()));
143                return Ok(());
144            }
145            // Is the other thead not ready for a swap yet? If so, block waiting to swap.
146            let their_ptr = self.contents.compare_and_swap(ptr::null_mut(), our_ref, Ordering::AcqRel);
147            if their_ptr.is_null() {
148                try!(self.wait.recv());
149                return Ok(());
150            }
151        }
152    }
153}
154
// Be explicit about implementing Send.
//
// Every field type is `Send` regardless of `T` (an `AtomicPtr<T>` carries no bound on
// `T`), yet `swap` moves `T` values between threads through that pointer. Writing the
// impl out with a `T: Send` bound takes the place of the automatically derived one, so a
// `Swapper<T>` can only be sent to another thread when its payload can be too.
unsafe impl<T: Send> Send for Swapper<T> {}
157
158/// Create a new pair of swappers.
159pub fn swapper<T>() -> (Swapper<T>, Swapper<T>) {
160    let contents = Arc::new(AtomicPtr::new(ptr::null_mut()));
161    let (notify_a, wait_a) = mpsc::channel();
162    let (notify_b, wait_b) = mpsc::channel();
163    let swapper_a = Swapper {
164        contents: contents.clone(),
165        notify: notify_b,
166        wait: wait_a,
167    };
168    let swapper_b = Swapper {
169        contents: contents,
170        notify: notify_a,
171        wait: wait_b,
172    };
173    (swapper_a, swapper_b)
174}
175
/// The error returned when a thread attempts to swap with a thread that has dropped its swapper.
///
/// The private unit field keeps the type opaque: it cannot be constructed outside this
/// module. It implements `Display` and `std::error::Error` so that it can be printed and
/// propagated as a boxed error.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct SwapError(());

impl std::fmt::Display for SwapError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("the other swapper has been dropped")
    }
}

impl std::error::Error for SwapError {}
179
180impl From<RecvError> for SwapError {
181    fn from(_: RecvError) -> SwapError {
182        SwapError(())
183    }
184}
185
186impl From<SendError<()>> for SwapError {
187    fn from(_: SendError<()>) -> SwapError {
188        SwapError(())
189    }
190}