1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
//! Eventfd based implementation of the ping event source.
//!
//! # Implementation notes
//!
//! The eventfd is a much lighter signalling mechanism provided by the Linux
//! kernel. Rather than write an arbitrary sequence of bytes, it only has a
//! 64-bit counter.
//!
//! To avoid closing the eventfd early, `make_ping()` places the `OwnedFd` in an
//! `Arc` that is shared between the senders and the event source, so the fd is
//! only closed once every holder has dropped its handle. When all the senders
//! are dropped, a wrapper `FlagOnDrop` handles signalling this to the event
//! source by writing a close increment to the eventfd. If the source is
//! dropped before the senders, pings are still written to the eventfd (which
//! the senders keep open), but nothing reads them.
//!
//! To differentiate between regular ping events and close ping events, we add 2
//! to the counter for regular events and 1 for close events. In the source we
//! can then check the LSB and if it's set, we know it was a close event. This
//! only works if a close event never fires more than once.

use std::os::unix::io::{AsFd, BorrowedFd, OwnedFd};
use std::sync::Arc;

use rustix::event::{eventfd, EventfdFlags};
use rustix::io::{read, write, Errno};

use super::PingError;
use crate::{
    generic::Generic, EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory,
};

// These are not bitfields! They are increments to add to the eventfd counter.
// Every regular ping adds INCREMENT_PING (2), which never changes the least
// significant bit of the counter. The close notification adds INCREMENT_CLOSE
// (1) and is only written from `FlagOnDrop::drop`. Since the fd can only be
// closed once, we can effectively use the INCREMENT_CLOSE value as a bitmask
// when checking the counter that was read back.
const INCREMENT_PING: u64 = 0x2;
const INCREMENT_CLOSE: u64 = 0x1;

/// Creates a `Ping` sender and the `PingSource` it wakes up, both backed by a
/// single eventfd.
///
/// # Errors
///
/// Returns the error reported by `eventfd()` if the descriptor cannot be
/// created.
#[inline]
pub fn make_ping() -> std::io::Result<(Ping, PingSource)> {
    let flags = EventfdFlags::CLOEXEC | EventfdFlags::NONBLOCK;

    // There is only one fd for both ends. If the sending side closed it when
    // its last copy went away, the receiving side would lose it too, so the fd
    // lives in an `Arc` and is only closed after every holder has dropped it.
    let shared_fd = Arc::new(eventfd(0, flags)?);

    // The sender gets its own handle to the fd, wrapped so that dropping the
    // last sender notifies the source.
    let sender = Ping {
        event: Arc::new(FlagOnDrop(Arc::clone(&shared_fd))),
    };

    // The source takes over the original handle and watches it for
    // readability in level-triggered mode.
    let receiver = PingSource {
        event: Generic::new(ArcAsFd(shared_fd), Interest::READ, Mode::Level),
    };

    Ok((sender, receiver))
}

// Helper functions for the event source IO.

/// Adds `count` to the eventfd counter behind `fd`.
///
/// # Errors
///
/// Returns any write error other than `EAGAIN`.
///
/// # Panics
///
/// Panics if `count` is zero.
#[inline]
fn send_ping(fd: BorrowedFd<'_>, count: u64) -> std::io::Result<()> {
    assert!(count > 0);

    // The eventfd expects the increment as a native-endian 64-bit integer.
    let payload = count.to_ne_bytes();

    match write(fd, &payload) {
        // Either the write went through and will wake up the loop, or the
        // counter is already at its cap, in which case earlier writes will
        // wake up the loop anyway.
        Ok(_) | Err(Errno::AGAIN) => Ok(()),

        // Everything else is a genuine failure.
        Err(errno) => Err(errno.into()),
    }
}

/// Reads the eventfd counter behind `fd` and returns its value.
///
/// # Errors
///
/// Propagates any error returned by the read.
///
/// # Panics
///
/// Panics if the read succeeds with anything other than 8 bytes.
#[inline]
fn drain_ping(fd: BorrowedFd<'_>) -> std::io::Result<u64> {
    // The eventfd counter is effectively a u64.
    const COUNTER_SIZE: usize = std::mem::size_of::<u64>();
    let mut raw = [0u8; COUNTER_SIZE];

    // Any read error is handed straight back to the caller.
    let len = read(fd, &mut raw)?;

    // Reading from an eventfd should only ever produce 8 bytes, so no looping
    // is required and a short read is treated as impossible.
    if len != COUNTER_SIZE {
        unreachable!();
    }

    Ok(u64::from_ne_bytes(raw))
}

// Newtype that lets a shared `Arc<OwnedFd>` be used wherever an `AsFd` value
// is required.
//
// Rust 1.64.0 adds an `AsFd` implementation for `Arc`, so this won't be needed
#[derive(Debug)]
struct ArcAsFd(Arc<OwnedFd>);

impl AsFd for ArcAsFd {
    // Borrows the wrapped descriptor. The `'_` makes it visible in the
    // signature that the returned `BorrowedFd` is tied to `&self`, matching
    // how the helper functions in this file spell the type.
    fn as_fd(&self) -> BorrowedFd<'_> {
        self.0.as_fd()
    }
}

// The event source is simply a generic source wrapping the eventfd that is
// shared with the `Ping` senders.
#[derive(Debug)]
pub struct PingSource {
    // `Generic` source over the shared fd. `make_ping()` creates it with read
    // interest in level-triggered mode.
    event: Generic<ArcAsFd>,
}

impl EventSource for PingSource {
    type Event = ();
    type Metadata = ();
    type Ret = ();
    type Error = PingError;

    // Drains the eventfd counter and decodes it: `callback` is invoked once if
    // any ping was recorded, and the source asks to be removed if the close
    // increment was seen. Errors from the inner source are wrapped in
    // `PingError`.
    fn process_events<C>(
        &mut self,
        readiness: Readiness,
        token: Token,
        mut callback: C,
    ) -> Result<PostAction, Self::Error>
    where
        C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
    {
        let outcome = self.event.process_events(readiness, token, |_, source_fd| {
            let counter = drain_ping(source_fd.as_fd())?;

            // The LSB tells us whether the senders were all dropped. Any
            // other bit being set means at least one ping was sent. Both can
            // be true for the same read.
            let was_closed = (counter & INCREMENT_CLOSE) != 0;
            let was_pinged = (counter & !INCREMENT_CLOSE) != 0;

            if was_pinged {
                callback((), &mut ());
            }

            Ok(if was_closed {
                PostAction::Remove
            } else {
                PostAction::Continue
            })
        });

        outcome.map_err(|e| PingError(e.into()))
    }

    // Registration is delegated entirely to the inner generic source.
    fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
        self.event.register(poll, token_factory)
    }

    // Re-registration is delegated entirely to the inner generic source.
    fn reregister(
        &mut self,
        poll: &mut Poll,
        token_factory: &mut TokenFactory,
    ) -> crate::Result<()> {
        self.event.reregister(poll, token_factory)
    }

    // Unregistration is delegated entirely to the inner generic source.
    fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
        self.event.unregister(poll)
    }
}

// Sending half of the ping pair. It is `Clone`, and every clone shares the
// same inner `FlagOnDrop`.
#[derive(Clone, Debug)]
pub struct Ping {
    // This is an Arc because it's potentially shared with clones. The last one
    // dropped needs to signal to the event source via the eventfd.
    event: Arc<FlagOnDrop>,
}

impl Ping {
    /// Send a ping to the `PingSource`.
    ///
    /// A failed write is not reported to the caller; it is only logged as a
    /// warning.
    pub fn ping(&self) {
        let fd = self.event.0.as_fd();

        match send_ping(fd, INCREMENT_PING) {
            Ok(()) => {}
            Err(e) => {
                log::warn!("[calloop] Failed to write a ping: {:?}", e);
            }
        }
    }
}

/// Notifies the `PingSource` when it is dropped, by writing the close
/// increment to the eventfd. There should only ever be one of these per
/// `PingSource`.
#[derive(Debug)]
struct FlagOnDrop(Arc<OwnedFd>);

impl Drop for FlagOnDrop {
    /// Sends the close ping. A failure cannot be returned from `drop`, so it
    /// is logged as a warning instead.
    fn drop(&mut self) {
        let outcome = send_ping(self.0.as_fd(), INCREMENT_CLOSE);

        if let Err(err) = outcome {
            log::warn!("[calloop] Failed to send close ping: {:?}", err);
        }
    }
}