Crate swapper

source ·
Expand description

Concurrency control for atomic swap of ownership.

A common pattern for thread pools is that each thread owns a token, and some times threads need to exchange tokens. A skeleton example is:

struct Token;
enum Message {
   // Messages go here
};
struct Thread {
   sender_to_other_thread: Sender<Message>,
   receiver_from_other_thread: Receiver<Message>,
   token: Token,
}
impl Thread {
   fn swap_token(&mut self) {
      // This function should swap the token with the other thread.
   }
   fn handle(&mut self, message: Message) {
       match message {
          // Message handlers go here
       }
   }
   fn run(&mut self) {
      loop {
         let message = self.receiver_from_other_thread.recv();
         match message {
            Ok(message) => self.handle(message),
            Err(_) => return,
         }
      }
   }
}

To do this with the Rust channels, ownership of the token is first passed from the thread to the channel, then to the other thead, resulting in a transitory state where the thread does not have the token. Typically to work round this, the thread stores an Option<Token> rather than a Token:

enum Message {
   SwapToken(Token, Sender<Token>),
};
struct Thread {
   sender_to_other_thread: Sender<Message>,
   receiver_from_other_thread: Receiver<Message>,
   token: Option<Token>, // ANNOYING Option
}
impl Thread {
   fn swap_token(&mut self) {
      let (sender, receiver) = mpsc::channel();
      let token = self.token.take().unwrap();
      self.sender_to_other_thread.send(Message::SwapToken(token, sender));
      let token = receiver.recv().unwrap();
      self.token = Some(token);
   }
   fn handle(&mut self, message: Message) {
       match message {
          Message::SwapToken(token, sender) => {
             let token = mem::replace(&mut self.token, Some(token)).unwrap();
             sender.send(token).unwrap();
          }
       }
   }
}

This crate provides a synchronization primitive for swapping ownership between threads. The API is similar to channels, except that rather than separate send(T) and recv():T methods, there is one swap(T):T, which swaps a T owned by one thread for a T owned by the other. For example, it allows an implementation of the thread pool which always owns a token.

enum Message {
   SwapToken(Swapper<Token>),
};
struct Thread {
   sender_to_other_thread: Sender<Message>,
   receiver_from_other_thread: Receiver<Message>,
   token: Token,
}
impl Thread {
   fn swap_token(&mut self) {
      let (our_swapper, their_swapper) = swapper::swapper();
      self.sender_to_other_thread.send(Message::SwapToken(their_swapper));
      our_swapper.swap(&mut self.token).unwrap();
   }
   fn handle(&mut self, message: Message) {
       match message {
          Message::SwapToken(swapper) => swapper.swap(&mut self.token).unwrap(),
       }
   }
}

Structs§

  • The error returned when a thread attempts to swap with a thread that has dropped its swapper.
  • A concurrency control for swapping ownership between threads.

Functions§

  • Create a new pair of swappers.