pub struct Semaphore {
ll_sem: Semaphore,
}
Expand description
Counting semaphore performing asynchronous permit acquisition.
A semaphore maintains a set of permits. Permits are used to synchronize access to a shared resource. A semaphore differs from a mutex in that it can allow more than one concurrent caller to access the shared resource at a time.
When acquire
is called and the semaphore has remaining permits, the
function immediately returns a permit. However, if no remaining permits are
available, acquire
(asynchronously) waits until an outstanding permit is
dropped. At this point, the freed permit is assigned to the caller.
This Semaphore
is fair, which means that permits are given out in the order
they were requested. This fairness is also applied when acquire_many
gets
involved, so if a call to acquire_many
at the front of the queue requests
more permits than currently available, this can prevent a call to acquire
from completing, even if the semaphore has enough permits complete the call
to acquire
.
To use the Semaphore
in a poll function, you can use the PollSemaphore
utility.
§Examples
Basic usage:
use tokio::sync::{Semaphore, TryAcquireError};
#[tokio::main]
async fn main() {
let semaphore = Semaphore::new(3);
let a_permit = semaphore.acquire().await.unwrap();
let two_permits = semaphore.acquire_many(2).await.unwrap();
assert_eq!(semaphore.available_permits(), 0);
let permit_attempt = semaphore.try_acquire();
assert_eq!(permit_attempt.err(), Some(TryAcquireError::NoPermits));
}
§Limit the number of simultaneously opened files in your program
Most operating systems have limits on the number of open file handles. Even in systems without explicit limits, resource constraints implicitly set an upper bound on the number of open files. If your program attempts to open a large number of files and exceeds this limit, it will result in an error.
This example uses a Semaphore with 100 permits. By acquiring a permit from the Semaphore before accessing a file, you ensure that your program opens no more than 100 files at a time. When trying to open the 101st file, the program will wait until a permit becomes available before proceeding to open another file.
use std::io::Result;
use tokio::fs::File;
use tokio::sync::Semaphore;
use tokio::io::AsyncWriteExt;
static PERMITS: Semaphore = Semaphore::const_new(100);
async fn write_to_file(message: &[u8]) -> Result<()> {
let _permit = PERMITS.acquire().await.unwrap();
let mut buffer = File::create("example.txt").await?;
buffer.write_all(message).await?;
Ok(()) // Permit goes out of scope here, and is available again for acquisition
}
§Limit the number of outgoing requests being sent at the same time
In some scenarios, it might be required to limit the number of outgoing requests being sent in parallel. This could be due to limits of a consumed API or the network resources of the system the application is running on.
This example uses an Arc<Semaphore>
with 10 permits. Each task spawned is
given a reference to the semaphore by cloning the Arc<Semaphore>
. Before
a task sends a request, it must acquire a permit from the semaphore by
calling Semaphore::acquire
. This ensures that at most 10 requests are
sent in parallel at any given time. After a task has sent a request, it
drops the permit to allow other tasks to send requests.
use std::sync::Arc;
use tokio::sync::Semaphore;
#[tokio::main]
async fn main() {
// Define maximum number of parallel requests.
let semaphore = Arc::new(Semaphore::new(10));
// Spawn many tasks that will send requests.
let mut jhs = Vec::new();
for task_id in 0..100 {
let semaphore = semaphore.clone();
let jh = tokio::spawn(async move {
// Acquire permit before sending request.
let _permit = semaphore.acquire().await.unwrap();
// Send the request.
let response = send_request(task_id).await;
// Drop the permit after the request has been sent.
drop(_permit);
// Handle response.
// ...
response
});
jhs.push(jh);
}
// Collect responses from tasks.
let mut responses = Vec::new();
for jh in jhs {
let response = jh.await.unwrap();
responses.push(response);
}
// Process responses.
// ...
}
§Limit the number of incoming requests being handled at the same time
Similar to limiting the number of simultaneously opened files, network handles are a limited resource. Allowing an unbounded amount of requests to be processed could result in a denial-of-service, among many other issues.
This example uses an Arc<Semaphore>
instead of a global variable.
To limit the number of requests that can be processed at the time,
we acquire a permit for each task before spawning it. Once acquired,
a new task is spawned; and once finished, the permit is dropped inside
of the task to allow others to spawn. Permits must be acquired via
Semaphore::acquire_owned
to be movable across the task boundary.
(Since our semaphore is not a global variable — if it was, then acquire
would be enough.)
use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> std::io::Result<()> {
let semaphore = Arc::new(Semaphore::new(3));
let listener = TcpListener::bind("127.0.0.1:8080").await?;
loop {
// Acquire permit before accepting the next socket.
//
// We use `acquire_owned` so that we can move `permit` into
// other tasks.
let permit = semaphore.clone().acquire_owned().await.unwrap();
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
// Do work using the socket.
handle_connection(&mut socket).await;
// Drop socket while the permit is still live.
drop(socket);
// Drop the permit, so more tasks can be created.
drop(permit);
});
}
}
§Prevent tests from running in parallel
By default, Rust runs tests in the same file in parallel. However, in some cases, running two tests in parallel may lead to problems. For example, this can happen when tests use the same database.
Consider the following scenario:
test_insert
: Inserts a key-value pair into the database, then retrieves the value using the same key to verify the insertion.test_update
: Inserts a key, then updates the key to a new value and verifies that the value has been accurately updated.test_others
: A third test that doesn’t modify the database state. It can run in parallel with the other tests.
In this example, test_insert
and test_update
need to run in sequence to
work, but it doesn’t matter which test runs first. We can leverage a
semaphore with a single permit to address this challenge.
use tokio::sync::Semaphore;
// Initialize a static semaphore with only one permit, which is used to
// prevent test_insert and test_update from running in parallel.
static PERMIT: Semaphore = Semaphore::const_new(1);
// Initialize the database that will be used by the subsequent tests.
static DB: Database = Database::setup();
#[tokio::test]
async fn test_insert() {
// Acquire permit before proceeding. Since the semaphore has only one permit,
// the test will wait if the permit is already acquired by other tests.
let permit = PERMIT.acquire().await.unwrap();
// Do the actual test stuff with database
// Insert a key-value pair to database
let (key, value) = ("name", 0);
DB.insert(key, value).await;
// Verify that the value has been inserted correctly.
assert_eq!(DB.get(key).await, value);
// Undo the insertion, so the database is empty at the end of the test.
DB.delete(key).await;
// Drop permit. This allows the other test to start running.
drop(permit);
}
#[tokio::test]
async fn test_update() {
// Acquire permit before proceeding. Since the semaphore has only one permit,
// the test will wait if the permit is already acquired by other tests.
let permit = PERMIT.acquire().await.unwrap();
// Do the same insert.
let (key, value) = ("name", 0);
DB.insert(key, value).await;
// Update the existing value with a new one.
let new_value = 1;
DB.update(key, new_value).await;
// Verify that the value has been updated correctly.
assert_eq!(DB.get(key).await, new_value);
// Undo any modificattion.
DB.delete(key).await;
// Drop permit. This allows the other test to start running.
drop(permit);
}
#[tokio::test]
async fn test_others() {
// This test can run in parallel with test_insert and test_update,
// so it does not use PERMIT.
}
§Rate limiting using a token bucket
This example showcases the add_permits
and SemaphorePermit::forget
methods.
Many applications and systems have constraints on the rate at which certain operations should occur. Exceeding this rate can result in suboptimal performance or even errors.
This example implements rate limiting using a token bucket. A token bucket is a form of rate limiting that doesn’t kick in immediately, to allow for short bursts of incoming requests that arrive at the same time.
With a token bucket, each incoming request consumes a token, and the tokens are refilled at a certain rate that defines the rate limit. When a burst of requests arrives, tokens are immediately given out until the bucket is empty. Once the bucket is empty, requests will have to wait for new tokens to be added.
Unlike the example that limits how many requests can be handled at the same time, we do not add tokens back when we finish handling a request. Instead, tokens are added only by a timer task.
Note that this implementation is suboptimal when the duration is small, because it consumes a lot of cpu constantly looping and sleeping.
use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::time::{interval, Duration};
struct TokenBucket {
sem: Arc<Semaphore>,
jh: tokio::task::JoinHandle<()>,
}
impl TokenBucket {
fn new(duration: Duration, capacity: usize) -> Self {
let sem = Arc::new(Semaphore::new(capacity));
// refills the tokens at the end of each interval
let jh = tokio::spawn({
let sem = sem.clone();
let mut interval = interval(duration);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
async move {
loop {
interval.tick().await;
if sem.available_permits() < capacity {
sem.add_permits(1);
}
}
}
});
Self { jh, sem }
}
async fn acquire(&self) {
// This can return an error if the semaphore is closed, but we
// never close it, so this error can never happen.
let permit = self.sem.acquire().await.unwrap();
// To avoid releasing the permit back to the semaphore, we use
// the `SemaphorePermit::forget` method.
permit.forget();
}
}
impl Drop for TokenBucket {
fn drop(&mut self) {
// Kill the background task so it stops taking up resources when we
// don't need it anymore.
self.jh.abort();
}
}
#[tokio::main]
async fn main() {
let capacity = 5;
let update_interval = Duration::from_secs_f32(1.0 / capacity as f32);
let bucket = TokenBucket::new(update_interval, capacity);
for _ in 0..5 {
bucket.acquire().await;
// do the operation
}
}
Fields§
§ll_sem: Semaphore
The low level semaphore
Implementations§
source§impl Semaphore
impl Semaphore
sourcepub const MAX_PERMITS: usize = 2_305_843_009_213_693_951usize
pub const MAX_PERMITS: usize = 2_305_843_009_213_693_951usize
The maximum number of permits which a semaphore can hold. It is usize::MAX >> 3
.
Exceeding this limit typically results in a panic.
sourcepub fn new(permits: usize) -> Self
pub fn new(permits: usize) -> Self
Creates a new semaphore with the initial number of permits.
Panics if permits
exceeds Semaphore::MAX_PERMITS
.
sourcepub const fn const_new(permits: usize) -> Self
pub const fn const_new(permits: usize) -> Self
Creates a new semaphore with the initial number of permits.
When using the tracing
unstable feature, a Semaphore
created with
const_new
will not be instrumented. As such, it will not be visible
in tokio-console
. Instead, Semaphore::new
should be used to
create an instrumented object if that is needed.
§Examples
use tokio::sync::Semaphore;
static SEM: Semaphore = Semaphore::const_new(10);
sourcepub(crate) fn new_closed() -> Self
pub(crate) fn new_closed() -> Self
Creates a new closed semaphore with 0 permits.
sourcepub(crate) const fn const_new_closed() -> Self
pub(crate) const fn const_new_closed() -> Self
Creates a new closed semaphore with 0 permits.
sourcepub fn available_permits(&self) -> usize
pub fn available_permits(&self) -> usize
Returns the current number of available permits.
sourcepub fn add_permits(&self, n: usize)
pub fn add_permits(&self, n: usize)
Adds n
new permits to the semaphore.
The maximum number of permits is Semaphore::MAX_PERMITS
, and this function will panic if the limit is exceeded.
sourcepub fn forget_permits(&self, n: usize) -> usize
pub fn forget_permits(&self, n: usize) -> usize
Decrease a semaphore’s permits by a maximum of n
.
If there are insufficient permits and it’s not possible to reduce by n
,
return the number of permits that were actually reduced.
sourcepub async fn acquire(&self) -> Result<SemaphorePermit<'_>, AcquireError>
pub async fn acquire(&self) -> Result<SemaphorePermit<'_>, AcquireError>
Acquires a permit from the semaphore.
If the semaphore has been closed, this returns an AcquireError
.
Otherwise, this returns a SemaphorePermit
representing the
acquired permit.
§Cancel safety
This method uses a queue to fairly distribute permits in the order they
were requested. Cancelling a call to acquire
makes you lose your place
in the queue.
§Examples
use tokio::sync::Semaphore;
#[tokio::main]
async fn main() {
let semaphore = Semaphore::new(2);
let permit_1 = semaphore.acquire().await.unwrap();
assert_eq!(semaphore.available_permits(), 1);
let permit_2 = semaphore.acquire().await.unwrap();
assert_eq!(semaphore.available_permits(), 0);
drop(permit_1);
assert_eq!(semaphore.available_permits(), 1);
}
sourcepub async fn acquire_many(
&self,
n: u32,
) -> Result<SemaphorePermit<'_>, AcquireError>
pub async fn acquire_many( &self, n: u32, ) -> Result<SemaphorePermit<'_>, AcquireError>
Acquires n
permits from the semaphore.
If the semaphore has been closed, this returns an AcquireError
.
Otherwise, this returns a SemaphorePermit
representing the
acquired permits.
§Cancel safety
This method uses a queue to fairly distribute permits in the order they
were requested. Cancelling a call to acquire_many
makes you lose your
place in the queue.
§Examples
use tokio::sync::Semaphore;
#[tokio::main]
async fn main() {
let semaphore = Semaphore::new(5);
let permit = semaphore.acquire_many(3).await.unwrap();
assert_eq!(semaphore.available_permits(), 2);
}
sourcepub fn try_acquire(&self) -> Result<SemaphorePermit<'_>, TryAcquireError>
pub fn try_acquire(&self) -> Result<SemaphorePermit<'_>, TryAcquireError>
Tries to acquire a permit from the semaphore.
If the semaphore has been closed, this returns a TryAcquireError::Closed
and a TryAcquireError::NoPermits
if there are no permits left. Otherwise,
this returns a SemaphorePermit
representing the acquired permits.
§Examples
use tokio::sync::{Semaphore, TryAcquireError};
let semaphore = Semaphore::new(2);
let permit_1 = semaphore.try_acquire().unwrap();
assert_eq!(semaphore.available_permits(), 1);
let permit_2 = semaphore.try_acquire().unwrap();
assert_eq!(semaphore.available_permits(), 0);
let permit_3 = semaphore.try_acquire();
assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
sourcepub fn try_acquire_many(
&self,
n: u32,
) -> Result<SemaphorePermit<'_>, TryAcquireError>
pub fn try_acquire_many( &self, n: u32, ) -> Result<SemaphorePermit<'_>, TryAcquireError>
Tries to acquire n
permits from the semaphore.
If the semaphore has been closed, this returns a TryAcquireError::Closed
and a TryAcquireError::NoPermits
if there are not enough permits left.
Otherwise, this returns a SemaphorePermit
representing the acquired permits.
§Examples
use tokio::sync::{Semaphore, TryAcquireError};
let semaphore = Semaphore::new(4);
let permit_1 = semaphore.try_acquire_many(3).unwrap();
assert_eq!(semaphore.available_permits(), 1);
let permit_2 = semaphore.try_acquire_many(2);
assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));
sourcepub async fn acquire_owned(
self: Arc<Self>,
) -> Result<OwnedSemaphorePermit, AcquireError>
pub async fn acquire_owned( self: Arc<Self>, ) -> Result<OwnedSemaphorePermit, AcquireError>
Acquires a permit from the semaphore.
The semaphore must be wrapped in an Arc
to call this method.
If the semaphore has been closed, this returns an AcquireError
.
Otherwise, this returns a OwnedSemaphorePermit
representing the
acquired permit.
§Cancel safety
This method uses a queue to fairly distribute permits in the order they
were requested. Cancelling a call to acquire_owned
makes you lose your
place in the queue.
§Examples
use std::sync::Arc;
use tokio::sync::Semaphore;
#[tokio::main]
async fn main() {
let semaphore = Arc::new(Semaphore::new(3));
let mut join_handles = Vec::new();
for _ in 0..5 {
let permit = semaphore.clone().acquire_owned().await.unwrap();
join_handles.push(tokio::spawn(async move {
// perform task...
// explicitly own `permit` in the task
drop(permit);
}));
}
for handle in join_handles {
handle.await.unwrap();
}
}
sourcepub async fn acquire_many_owned(
self: Arc<Self>,
n: u32,
) -> Result<OwnedSemaphorePermit, AcquireError>
pub async fn acquire_many_owned( self: Arc<Self>, n: u32, ) -> Result<OwnedSemaphorePermit, AcquireError>
Acquires n
permits from the semaphore.
The semaphore must be wrapped in an Arc
to call this method.
If the semaphore has been closed, this returns an AcquireError
.
Otherwise, this returns a OwnedSemaphorePermit
representing the
acquired permit.
§Cancel safety
This method uses a queue to fairly distribute permits in the order they
were requested. Cancelling a call to acquire_many_owned
makes you lose
your place in the queue.
§Examples
use std::sync::Arc;
use tokio::sync::Semaphore;
#[tokio::main]
async fn main() {
let semaphore = Arc::new(Semaphore::new(10));
let mut join_handles = Vec::new();
for _ in 0..5 {
let permit = semaphore.clone().acquire_many_owned(2).await.unwrap();
join_handles.push(tokio::spawn(async move {
// perform task...
// explicitly own `permit` in the task
drop(permit);
}));
}
for handle in join_handles {
handle.await.unwrap();
}
}
sourcepub fn try_acquire_owned(
self: Arc<Self>,
) -> Result<OwnedSemaphorePermit, TryAcquireError>
pub fn try_acquire_owned( self: Arc<Self>, ) -> Result<OwnedSemaphorePermit, TryAcquireError>
Tries to acquire a permit from the semaphore.
The semaphore must be wrapped in an Arc
to call this method. If
the semaphore has been closed, this returns a TryAcquireError::Closed
and a TryAcquireError::NoPermits
if there are no permits left.
Otherwise, this returns a OwnedSemaphorePermit
representing the
acquired permit.
§Examples
use std::sync::Arc;
use tokio::sync::{Semaphore, TryAcquireError};
let semaphore = Arc::new(Semaphore::new(2));
let permit_1 = Arc::clone(&semaphore).try_acquire_owned().unwrap();
assert_eq!(semaphore.available_permits(), 1);
let permit_2 = Arc::clone(&semaphore).try_acquire_owned().unwrap();
assert_eq!(semaphore.available_permits(), 0);
let permit_3 = semaphore.try_acquire_owned();
assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
sourcepub fn try_acquire_many_owned(
self: Arc<Self>,
n: u32,
) -> Result<OwnedSemaphorePermit, TryAcquireError>
pub fn try_acquire_many_owned( self: Arc<Self>, n: u32, ) -> Result<OwnedSemaphorePermit, TryAcquireError>
Tries to acquire n
permits from the semaphore.
The semaphore must be wrapped in an Arc
to call this method. If
the semaphore has been closed, this returns a TryAcquireError::Closed
and a TryAcquireError::NoPermits
if there are no permits left.
Otherwise, this returns a OwnedSemaphorePermit
representing the
acquired permit.
§Examples
use std::sync::Arc;
use tokio::sync::{Semaphore, TryAcquireError};
let semaphore = Arc::new(Semaphore::new(4));
let permit_1 = Arc::clone(&semaphore).try_acquire_many_owned(3).unwrap();
assert_eq!(semaphore.available_permits(), 1);
let permit_2 = semaphore.try_acquire_many_owned(2);
assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));
sourcepub fn close(&self)
pub fn close(&self)
Closes the semaphore.
This prevents the semaphore from issuing new permits and notifies all pending waiters.
§Examples
use tokio::sync::Semaphore;
use std::sync::Arc;
use tokio::sync::TryAcquireError;
#[tokio::main]
async fn main() {
let semaphore = Arc::new(Semaphore::new(1));
let semaphore2 = semaphore.clone();
tokio::spawn(async move {
let permit = semaphore.acquire_many(2).await;
assert!(permit.is_err());
println!("waiter received error");
});
println!("closing semaphore");
semaphore2.close();
// Cannot obtain more permits
assert_eq!(semaphore2.try_acquire().err(), Some(TryAcquireError::Closed))
}