pub struct Notify {
state: AtomicUsize,
waiters: Mutex<LinkedList<Waiter, <Waiter as Link>::Target>>,
}
Expand description
Notifies a single task to wake up.
Notify
provides a basic mechanism to notify a single task of an event.
Notify
itself does not carry any data. Instead, it is to be used to signal
another task to perform an operation.
A Notify
can be thought of as a Semaphore
starting with 0 permits. The
notified().await
method waits for a permit to become available, and
notify_one()
sets a permit if there currently are no available
permits.
The synchronization details of Notify
are similar to
thread::park
and Thread::unpark
from std. A Notify
value contains a single permit. notified().await
waits for the permit to
be made available, consumes the permit, and resumes. notify_one()
sets
the permit, waking a pending task if there is one.
If notify_one()
is called before notified().await
, then the next
call to notified().await
will complete immediately, consuming the permit.
Any subsequent calls to notified().await
will wait for a new permit.
If notify_one()
is called multiple times before notified().await
,
only a single permit is stored. The next call to notified().await
will
complete immediately, but the one after will wait for a new permit.
§Examples
Basic usage.
use tokio::sync::Notify;
use std::sync::Arc;
#[tokio::main]
async fn main() {
let notify = Arc::new(Notify::new());
let notify2 = notify.clone();
let handle = tokio::spawn(async move {
notify2.notified().await;
println!("received notification");
});
println!("sending notification");
notify.notify_one();
// Wait for task to receive notification.
handle.await.unwrap();
}
Unbound multi-producer single-consumer (mpsc) channel.
No wakeups can be lost when using this channel because the call to
notify_one()
will store a permit in the Notify
, which the following call
to notified()
will consume.
use tokio::sync::Notify;
use std::collections::VecDeque;
use std::sync::Mutex;
struct Channel<T> {
values: Mutex<VecDeque<T>>,
notify: Notify,
}
impl<T> Channel<T> {
pub fn send(&self, value: T) {
self.values.lock().unwrap()
.push_back(value);
// Notify the consumer a value is available
self.notify.notify_one();
}
// This is a single-consumer channel, so several concurrent calls to
// `recv` are not allowed.
pub async fn recv(&self) -> T {
loop {
// Drain values
if let Some(value) = self.values.lock().unwrap().pop_front() {
return value;
}
// Wait for values to be available
self.notify.notified().await;
}
}
}
Unbound multi-producer multi-consumer (mpmc) channel.
The call to enable
is important because otherwise if you have two
calls to recv
and two calls to send
in parallel, the following could
happen:
- Both calls to
try_recv
returnNone
. - Both new elements are added to the vector.
- The
notify_one
method is called twice, adding only a single permit to theNotify
. - Both calls to
recv
reach theNotified
future. One of them consumes the permit, and the other sleeps forever.
By adding the Notified
futures to the list by calling enable
before
try_recv
, the notify_one
calls in step three would remove the
futures from the list and mark them notified instead of adding a permit
to the Notify
. This ensures that both futures are woken.
Notice that this failure can only happen if there are two concurrent calls
to recv
. This is why the mpsc example above does not require a call to
enable
.
use tokio::sync::Notify;
use std::collections::VecDeque;
use std::sync::Mutex;
struct Channel<T> {
messages: Mutex<VecDeque<T>>,
notify_on_sent: Notify,
}
impl<T> Channel<T> {
pub fn send(&self, msg: T) {
let mut locked_queue = self.messages.lock().unwrap();
locked_queue.push_back(msg);
drop(locked_queue);
// Send a notification to one of the calls currently
// waiting in a call to `recv`.
self.notify_on_sent.notify_one();
}
pub fn try_recv(&self) -> Option<T> {
let mut locked_queue = self.messages.lock().unwrap();
locked_queue.pop_front()
}
pub async fn recv(&self) -> T {
let future = self.notify_on_sent.notified();
tokio::pin!(future);
loop {
// Make sure that no wakeup is lost if we get
// `None` from `try_recv`.
future.as_mut().enable();
if let Some(msg) = self.try_recv() {
return msg;
}
// Wait for a call to `notify_one`.
//
// This uses `.as_mut()` to avoid consuming the future,
// which lets us call `Pin::set` below.
future.as_mut().await;
// Reset the future in case another call to
// `try_recv` got the message before us.
future.set(self.notify_on_sent.notified());
}
}
}
Fields§
§state: AtomicUsize
§waiters: Mutex<LinkedList<Waiter, <Waiter as Link>::Target>>
Implementations§
source§impl Notify
impl Notify
sourcepub fn new() -> Notify
pub fn new() -> Notify
Create a new Notify
, initialized without a permit.
§Examples
use tokio::sync::Notify;
let notify = Notify::new();
sourcepub const fn const_new() -> Notify
pub const fn const_new() -> Notify
Create a new Notify
, initialized without a permit.
When using the tracing
unstable feature, a Notify
created with
const_new
will not be instrumented. As such, it will not be visible
in tokio-console
. Instead, Notify::new
should be used to create
an instrumented object if that is needed.
§Examples
use tokio::sync::Notify;
static NOTIFY: Notify = Notify::const_new();
sourcepub fn notified(&self) -> Notified<'_> ⓘ
pub fn notified(&self) -> Notified<'_> ⓘ
Wait for a notification.
Equivalent to:
async fn notified(&self);
Each Notify
value holds a single permit. If a permit is available from
an earlier call to notify_one()
, then notified().await
will complete
immediately, consuming that permit. Otherwise, notified().await
waits
for a permit to be made available by the next call to notify_one()
.
The Notified
future is not guaranteed to receive wakeups from calls to
notify_one()
if it has not yet been polled. See the documentation for
Notified::enable()
for more details.
The Notified
future is guaranteed to receive wakeups from
notify_waiters()
as soon as it has been created, even if it has not
yet been polled.
§Cancel safety
This method uses a queue to fairly distribute notifications in the order
they were requested. Cancelling a call to notified
makes you lose your
place in the queue.
§Examples
use tokio::sync::Notify;
use std::sync::Arc;
#[tokio::main]
async fn main() {
let notify = Arc::new(Notify::new());
let notify2 = notify.clone();
tokio::spawn(async move {
notify2.notified().await;
println!("received notification");
});
println!("sending notification");
notify.notify_one();
}
sourcepub fn notify_one(&self)
pub fn notify_one(&self)
Notifies the first waiting task.
If a task is currently waiting, that task is notified. Otherwise, a
permit is stored in this Notify
value and the next call to
notified().await
will complete immediately consuming the permit made
available by this call to notify_one()
.
At most one permit may be stored by Notify
. Many sequential calls to
notify_one
will result in a single permit being stored. The next call to
notified().await
will complete immediately, but the one after that
will wait.
§Examples
use tokio::sync::Notify;
use std::sync::Arc;
#[tokio::main]
async fn main() {
let notify = Arc::new(Notify::new());
let notify2 = notify.clone();
tokio::spawn(async move {
notify2.notified().await;
println!("received notification");
});
println!("sending notification");
notify.notify_one();
}
sourcepub fn notify_last(&self)
pub fn notify_last(&self)
Notifies the last waiting task.
This function behaves similar to notify_one
. The only difference is that it wakes
the most recently added waiter instead of the oldest waiter.
Check the notify_one()
documentation for more info and
examples.
fn notify_with_strategy(&self, strategy: NotifyOneStrategy)
sourcepub fn notify_waiters(&self)
pub fn notify_waiters(&self)
Notifies all waiting tasks.
If a task is currently waiting, that task is notified. Unlike with
notify_one()
, no permit is stored to be used by the next call to
notified().await
. The purpose of this method is to notify all
already registered waiters. Registering for notification is done by
acquiring an instance of the Notified
future via calling notified()
.
§Examples
use tokio::sync::Notify;
use std::sync::Arc;
#[tokio::main]
async fn main() {
let notify = Arc::new(Notify::new());
let notify2 = notify.clone();
let notified1 = notify.notified();
let notified2 = notify.notified();
let handle = tokio::spawn(async move {
println!("sending notifications");
notify2.notify_waiters();
});
notified1.await;
notified2.await;
println!("received notifications");
}