1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
//! An SPSC broadcast channel.
//!
//! - The value can only be a `usize`.
//! - The consumer is only notified if the sent value differs from the previous one.
//! - The value `0` is reserved for closed.

use futures_util::task::AtomicWaker;
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};
use std::task;

/// The payload type carried by the channel.
type Value = usize;

/// Reserved value marking the channel as closed; it is sent when the `Sender` is dropped.
pub(crate) const CLOSED: usize = 0;

/// Creates a connected `Sender`/`Receiver` pair whose shared slot starts at `initial`.
///
/// `initial` must not be `CLOSED` (`0`), which is reserved; this is only
/// checked in builds with debug assertions enabled.
pub(crate) fn channel(initial: Value) -> (Sender, Receiver) {
    debug_assert!(
        initial != CLOSED,
        "watch::channel initial state of 0 is reserved"
    );

    let value = AtomicUsize::new(initial);
    let waker = AtomicWaker::new();

    // Both halves hold a handle to the same reference-counted state.
    let rx_shared = Arc::new(Shared { value, waker });
    let tx_shared = Arc::clone(&rx_shared);

    let tx = Sender { shared: tx_shared };
    let rx = Receiver { shared: rx_shared };
    (tx, rx)
}

/// Sending half of the channel.
pub(crate) struct Sender {
    /// Handle to the state shared with the `Receiver`.
    shared: Arc<Shared>,
}

/// Receiving half of the channel.
pub(crate) struct Receiver {
    /// Handle to the state shared with the `Sender`.
    shared: Arc<Shared>,
}

/// State shared between the `Sender` and the `Receiver`.
struct Shared {
    /// The most recently sent value; `CLOSED` (`0`) once the `Sender` has been dropped.
    value: AtomicUsize,
    /// Waker registered by `Receiver::load` and woken by `Sender::send` when the value changes.
    waker: AtomicWaker,
}

impl Sender {
    /// Stores `value` in the shared slot and wakes the registered waker, but
    /// only when `value` differs from the value that was previously stored.
    pub(crate) fn send(&mut self, value: Value) {
        let previous = self.shared.value.swap(value, Ordering::SeqCst);
        if previous == value {
            // Nothing changed, so the receiver is not notified.
            return;
        }
        self.shared.waker.wake();
    }
}

/// Dropping the `Sender` closes the channel.
impl Drop for Sender {
    fn drop(&mut self) {
        // Publish the reserved `CLOSED` value; `send` wakes the receiver
        // unless the stored value was already `CLOSED`.
        self.send(CLOSED);
    }
}

impl Receiver {
    pub(crate) fn load(&mut self, cx: &mut task::Context<'_>) -> Value {
        self.shared.waker.register(cx.waker());
        self.shared.value.load(Ordering::SeqCst)
    }

    pub(crate) fn peek(&self) -> Value {
        self.shared.value.load(Ordering::Relaxed)
    }
}