calloop/sources/ping/
eventfd.rs1use std::os::unix::io::{AsFd, BorrowedFd, OwnedFd};
22use std::sync::Arc;
23
24use rustix::event::{eventfd, EventfdFlags};
25use rustix::io::{read, write, Errno};
26
27use super::PingError;
28use crate::{
29 generic::Generic, EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory,
30};
31
// These are amounts written to the eventfd, not independent bit flags.

/// Amount written when the last `Ping` handle is dropped. It is the only odd
/// increment, so the reader detects it by testing bit 0 of the drained value.
const INCREMENT_CLOSE: u64 = 1;

/// Amount written for every regular ping. It is even, so no number of pings
/// can set bit 0 of the drained value.
const INCREMENT_PING: u64 = 2;
37
38#[inline]
39pub fn make_ping() -> std::io::Result<(Ping, PingSource)> {
40 let read = eventfd(0, EventfdFlags::CLOEXEC | EventfdFlags::NONBLOCK)?;
41
42 let fd = Arc::new(read);
48
49 let ping = Ping {
50 event: Arc::new(FlagOnDrop(Arc::clone(&fd))),
51 };
52
53 let source = PingSource {
54 event: Generic::new(ArcAsFd(fd), Interest::READ, Mode::Level),
55 };
56
57 Ok((ping, source))
58}
59
60#[inline]
63fn send_ping(fd: BorrowedFd<'_>, count: u64) -> std::io::Result<()> {
64 assert!(count > 0);
65 match write(fd, &count.to_ne_bytes()) {
66 Ok(_) => Ok(()),
68
69 Err(Errno::AGAIN) => Ok(()),
72
73 Err(e) => Err(e.into()),
75 }
76}
77
78#[inline]
79fn drain_ping(fd: BorrowedFd<'_>) -> std::io::Result<u64> {
80 const NBYTES: usize = 8;
82 let mut buf = [0u8; NBYTES];
83
84 match read(fd, &mut buf) {
85 Ok(NBYTES) => Ok(u64::from_ne_bytes(buf)),
88
89 Ok(_) => unreachable!(),
90
91 Err(e) => Err(e.into()),
93 }
94}
95
/// Newtype that lets a shared `Arc<OwnedFd>` be used wherever an [`AsFd`]
/// value is required.
///
/// NOTE(review): the standard library implements `AsFd` for `Arc<T>` since
/// Rust 1.64, so this wrapper may be removable depending on the crate's MSRV.
#[derive(Debug)]
struct ArcAsFd(Arc<OwnedFd>);

impl AsFd for ArcAsFd {
    /// Borrows the wrapped descriptor for as long as `self` is borrowed.
    ///
    /// The lifetime is spelled out as `'_` to match the trait declaration and
    /// the other `BorrowedFd<'_>` signatures in this file.
    fn as_fd(&self) -> BorrowedFd<'_> {
        self.0.as_fd()
    }
}
105
106#[derive(Debug)]
108pub struct PingSource {
109 event: Generic<ArcAsFd>,
110}
111
112impl EventSource for PingSource {
113 type Event = ();
114 type Metadata = ();
115 type Ret = ();
116 type Error = PingError;
117
118 fn process_events<C>(
119 &mut self,
120 readiness: Readiness,
121 token: Token,
122 mut callback: C,
123 ) -> Result<PostAction, Self::Error>
124 where
125 C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
126 {
127 self.event
128 .process_events(readiness, token, |_, fd| {
129 let counter = drain_ping(fd.as_fd())?;
130
131 let close = (counter & INCREMENT_CLOSE) != 0;
135 let ping = (counter & (u64::MAX - 1)) != 0;
136
137 if ping {
138 callback((), &mut ());
139 }
140
141 if close {
142 Ok(PostAction::Remove)
143 } else {
144 Ok(PostAction::Continue)
145 }
146 })
147 .map_err(|e| PingError(e.into()))
148 }
149
150 fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
151 self.event.register(poll, token_factory)
152 }
153
154 fn reregister(
155 &mut self,
156 poll: &mut Poll,
157 token_factory: &mut TokenFactory,
158 ) -> crate::Result<()> {
159 self.event.reregister(poll, token_factory)
160 }
161
162 fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
163 self.event.unregister(poll)
164 }
165}
166
167#[derive(Clone, Debug)]
168pub struct Ping {
169 event: Arc<FlagOnDrop>,
172}
173
174impl Ping {
175 pub fn ping(&self) {
177 if let Err(e) = send_ping(self.event.0.as_fd(), INCREMENT_PING) {
178 log::warn!("[calloop] Failed to write a ping: {:?}", e);
179 }
180 }
181}
182
183#[derive(Debug)]
186struct FlagOnDrop(Arc<OwnedFd>);
187
188impl Drop for FlagOnDrop {
189 fn drop(&mut self) {
190 if let Err(e) = send_ping(self.0.as_fd(), INCREMENT_CLOSE) {
191 log::warn!("[calloop] Failed to send close ping: {:?}", e);
192 }
193 }
194}