Struct tokio::sync::futures::Notified

pub struct Notified<'a> {
    notify: &'a Notify,
    state: State,
    notify_waiters_calls: usize,
    waiter: Waiter,
}

Future returned from Notify::notified().

This future is fused: once it has completed, any subsequent call to poll immediately returns Poll::Ready.

Fields

notify: &'a Notify

The Notify being received on.

state: State

The current state of the receiving process.

notify_waiters_calls: usize

Number of calls to notify_waiters at the time of creation.

waiter: Waiter

Entry in the waiter LinkedList.

Implementations

impl Notified<'_>

pub fn enable(self: Pin<&mut Self>) -> bool

Adds this future to the list of futures that are ready to receive wakeups from calls to notify_one.

Polling the future also adds it to the list, so this method should only be used if you want to add the future to the list before the first call to poll. (In fact, this method is equivalent to calling poll except that no Waker is registered.)

This has no effect on notifications sent using notify_waiters, which are received as long as they happen after the creation of the Notified regardless of whether enable or poll has been called.

This method returns true if the Notified is ready. This happens in the following situations:

  1. The notify_waiters method was called between the creation of the Notified and the call to this method.
  2. This is the first call to enable or poll on this future, and the Notify was holding a permit from a previous call to notify_one. The call consumes the permit in that case.
  3. The future has previously been enabled or polled, and it has since then been marked ready by either consuming a permit from the Notify, or by a call to notify_one or notify_waiters that removed it from the list of futures ready to receive wakeups.

If this method returns true, any subsequent call to poll on the same future will immediately return Poll::Ready.

Examples

Unbounded multi-producer multi-consumer (mpmc) channel.

The call to enable is important. Without it, two calls to recv and two calls to send running in parallel could interleave as follows:

  1. Both calls to try_recv return None.
  2. Both new elements are added to the vector.
  3. The notify_one method is called twice, adding only a single permit to the Notify.
  4. Both calls to recv reach the Notified future. One of them consumes the permit, and the other sleeps forever.

Calling enable before try_recv adds the Notified futures to the list, so the notify_one calls in step 3 remove the futures from the list and mark them as notified instead of adding a permit to the Notify. This ensures that both futures are woken.

use tokio::sync::Notify;

use std::collections::VecDeque;
use std::sync::Mutex;

struct Channel<T> {
    messages: Mutex<VecDeque<T>>,
    notify_on_sent: Notify,
}

impl<T> Channel<T> {
    pub fn send(&self, msg: T) {
        let mut locked_queue = self.messages.lock().unwrap();
        locked_queue.push_back(msg);
        drop(locked_queue);

        // Send a notification to one of the tasks currently
        // waiting in a call to `recv`.
        self.notify_on_sent.notify_one();
    }

    pub fn try_recv(&self) -> Option<T> {
        let mut locked_queue = self.messages.lock().unwrap();
        locked_queue.pop_front()
    }

    pub async fn recv(&self) -> T {
        let future = self.notify_on_sent.notified();
        tokio::pin!(future);

        loop {
            // Make sure that no wakeup is lost if we get
            // `None` from `try_recv`.
            future.as_mut().enable();

            if let Some(msg) = self.try_recv() {
                return msg;
            }

            // Wait for a call to `notify_one`.
            //
            // This uses `.as_mut()` to avoid consuming the future,
            // which lets us call `Pin::set` below.
            future.as_mut().await;

            // Reset the future in case another call to
            // `try_recv` got the message before us.
            future.set(self.notify_on_sent.notified());
        }
    }
}

fn project(self: Pin<&mut Self>) -> (&Notify, &mut State, &usize, &Waiter)

A custom project implementation is used instead of pin-project-lite because a custom Drop implementation is needed.
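
As a rough illustration of what a hand-written projection looks like, the following sketch uses a hypothetical `Entry` type rather than the real `Notified` fields. It is not tokio's implementation; it only shows the general pattern of borrowing individual fields out of a pinned struct that also has its own destructor.

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

// Hypothetical stand-in for a pinned type with a custom destructor.
struct Entry {
    state: u8,
    calls: usize,
    _pin: PhantomPinned, // makes `Entry` `!Unpin`
}

impl Entry {
    // Hand-written projection: borrows each field separately.
    fn project(self: Pin<&mut Self>) -> (&mut u8, &usize) {
        // Safety: no field is moved out of `self`, and the returned
        // references are ordinary borrows of fields that are `Unpin`.
        unsafe {
            let this = self.get_unchecked_mut();
            (&mut this.state, &this.calls)
        }
    }
}

impl Drop for Entry {
    fn drop(&mut self) {
        // Custom cleanup goes here. The value must still be treated
        // as pinned, so nothing is moved out of `self`.
        self.state = 0;
    }
}

// Uses the projection to update one field while reading another.
fn advance(entry: Pin<&mut Entry>) -> (u8, usize) {
    let (state, calls) = entry.project();
    *state += 1;
    (*state, *calls)
}
```

A caller would pin an `Entry` (for example with `Box::pin`) and pass `entry.as_mut()` to `advance`.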

fn poll_notified(self: Pin<&mut Self>, waker: Option<&Waker>) -> Poll<()>

Trait Implementations

impl<'a> Debug for Notified<'a>

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl Drop for Notified<'_>

fn drop(&mut self)

Executes the destructor for this type.

impl Future for Notified<'_>

type Output = ()

The type of value produced on completion.

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>

Attempts to resolve the future to a final value, registering the current task for wakeup if the value is not yet available.

impl<'a> Send for Notified<'a>

impl<'a> Sync for Notified<'a>

Auto Trait Implementations

impl<'a> !Freeze for Notified<'a>

impl<'a> !RefUnwindSafe for Notified<'a>

impl<'a> !Unpin for Notified<'a>

impl<'a> !UnwindSafe for Notified<'a>

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<F> IntoFuture for F
where F: Future,

type Output = <F as Future>::Output

The output that the future will produce on completion.

type IntoFuture = F

Which kind of future are we turning this into?

fn into_future(self) -> <F as IntoFuture>::IntoFuture

Creates a future from a value.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.