Skip to main content

mio/sys/unix/selector/
epoll.rs

1use std::io;
2use std::os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, OwnedFd, RawFd};
3#[cfg(debug_assertions)]
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::time::Duration;
6
7use libc::{EPOLLET, EPOLLIN, EPOLLOUT, EPOLLPRI, EPOLLRDHUP};
8
9use crate::{Interest, Token};
10
11cfg_io_source! {
12    use std::ptr;
13}
14
/// Process-wide counter from which each newly created selector draws its
/// `SelectorId`. It starts at 1 and only exists in debug builds.
#[cfg(debug_assertions)]
static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
18
/// Selector backed by an epoll instance.
///
/// The selector owns the epoll file descriptor. Debug builds additionally
/// store an identifier for the selector.
#[derive(Debug)]
pub struct Selector {
    /// Identifier of this selector (debug builds only).
    #[cfg(debug_assertions)]
    id: usize,
    /// Owned epoll file descriptor; it is closed when the selector is dropped.
    ep: OwnedFd,
}
25
26impl Selector {
27    pub fn new() -> io::Result<Selector> {
28        // SAFETY: `epoll_create1(2)` ensures the fd is valid.
29        let ep = unsafe { OwnedFd::from_raw_fd(syscall!(epoll_create1(libc::EPOLL_CLOEXEC))?) };
30        Ok(Selector {
31            #[cfg(debug_assertions)]
32            id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
33            ep,
34        })
35    }
36
37    pub fn try_clone(&self) -> io::Result<Selector> {
38        self.ep.try_clone().map(|ep| Selector {
39            // It's the same selector, so we use the same id.
40            #[cfg(debug_assertions)]
41            id: self.id,
42            ep,
43        })
44    }
45
46    pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
47        let timeout = timeout
48            .map(|to| {
49                // `Duration::as_millis` truncates, so round up. This avoids
50                // turning sub-millisecond timeouts into a zero timeout, unless
51                // the caller explicitly requests that by specifying a zero
52                // timeout.
53                libc::c_int::try_from(
54                    to.checked_add(Duration::from_nanos(999_999))
55                        .unwrap_or(to)
56                        .as_millis(),
57                )
58                .unwrap_or(libc::c_int::MAX)
59            })
60            .unwrap_or(-1);
61
62        events.clear();
63        syscall!(epoll_wait(
64            self.ep.as_raw_fd(),
65            events.as_mut_ptr(),
66            events.capacity() as i32,
67            timeout,
68        ))
69        .map(|n_events| {
70            // This is safe because `epoll_wait` ensures that `n_events` are
71            // assigned.
72            unsafe { events.set_len(n_events as usize) };
73        })
74    }
75
76    pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
77        let mut event = libc::epoll_event {
78            events: interests_to_epoll(interests),
79            u64: usize::from(token) as u64,
80            #[cfg(target_os = "redox")]
81            _pad: 0,
82        };
83
84        let ep = self.ep.as_raw_fd();
85        syscall!(epoll_ctl(ep, libc::EPOLL_CTL_ADD, fd, &mut event)).map(|_| ())
86    }
87
88    cfg_any_os_ext! {
89    pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
90        let mut event = libc::epoll_event {
91            events: interests_to_epoll(interests),
92            u64: usize::from(token) as u64,
93            #[cfg(target_os = "redox")]
94            _pad: 0,
95        };
96
97        let ep = self.ep.as_raw_fd();
98        syscall!(epoll_ctl(ep, libc::EPOLL_CTL_MOD, fd, &mut event)).map(|_| ())
99    }
100
101    pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
102        let ep = self.ep.as_raw_fd();
103        syscall!(epoll_ctl(ep, libc::EPOLL_CTL_DEL, fd, ptr::null_mut())).map(|_| ())
104    }
105    }
106}
107
108cfg_io_source! {
109    impl Selector {
110        #[cfg(debug_assertions)]
111        pub fn id(&self) -> usize {
112            self.id
113        }
114    }
115}
116
117impl AsFd for Selector {
118    fn as_fd(&self) -> BorrowedFd<'_> {
119        self.ep.as_fd()
120    }
121}
122
123impl AsRawFd for Selector {
124    fn as_raw_fd(&self) -> RawFd {
125        self.ep.as_raw_fd()
126    }
127}
128
129fn interests_to_epoll(interests: Interest) -> u32 {
130    let mut kind = EPOLLET;
131
132    if interests.is_readable() {
133        kind = kind | EPOLLIN | EPOLLRDHUP;
134    }
135
136    if interests.is_writable() {
137        kind |= EPOLLOUT;
138    }
139
140    if interests.is_priority() {
141        kind |= EPOLLPRI;
142    }
143
144    kind as u32
145}
146
147pub type Event = libc::epoll_event;
148pub type Events = Vec<Event>;
149
150pub mod event {
151    use std::fmt;
152
153    use crate::sys::Event;
154    use crate::Token;
155
156    pub fn token(event: &Event) -> Token {
157        Token(event.u64 as usize)
158    }
159
160    pub fn is_readable(event: &Event) -> bool {
161        (event.events as libc::c_int & libc::EPOLLIN) != 0
162            || (event.events as libc::c_int & libc::EPOLLPRI) != 0
163    }
164
165    pub fn is_writable(event: &Event) -> bool {
166        (event.events as libc::c_int & libc::EPOLLOUT) != 0
167    }
168
169    pub fn is_error(event: &Event) -> bool {
170        (event.events as libc::c_int & libc::EPOLLERR) != 0
171    }
172
173    pub fn is_read_closed(event: &Event) -> bool {
174        // Both halves of the socket have closed
175        event.events as libc::c_int & libc::EPOLLHUP != 0
176            // Socket has received FIN or called shutdown(SHUT_RD)
177            || (event.events as libc::c_int & libc::EPOLLIN != 0
178                && event.events as libc::c_int & libc::EPOLLRDHUP != 0)
179    }
180
181    pub fn is_write_closed(event: &Event) -> bool {
182        // Both halves of the socket have closed
183        event.events as libc::c_int & libc::EPOLLHUP != 0
184            // Unix pipe write end has closed
185            || (event.events as libc::c_int & libc::EPOLLOUT != 0
186                && event.events as libc::c_int & libc::EPOLLERR != 0)
187            // The other side (read end) of a Unix pipe has closed.
188            || event.events as libc::c_int == libc::EPOLLERR
189    }
190
191    pub fn is_priority(event: &Event) -> bool {
192        (event.events as libc::c_int & libc::EPOLLPRI) != 0
193    }
194
195    pub fn is_aio(_: &Event) -> bool {
196        // Not supported in the kernel, only in libc.
197        false
198    }
199
200    pub fn is_lio(_: &Event) -> bool {
201        // Not supported.
202        false
203    }
204
205    pub fn debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result {
206        #[allow(clippy::trivially_copy_pass_by_ref)]
207        fn check_events(got: &u32, want: &libc::c_int) -> bool {
208            (*got as libc::c_int & want) != 0
209        }
210        debug_detail!(
211            EventsDetails(u32),
212            check_events,
213            libc::EPOLLIN,
214            libc::EPOLLPRI,
215            libc::EPOLLOUT,
216            libc::EPOLLRDNORM,
217            libc::EPOLLRDBAND,
218            libc::EPOLLWRNORM,
219            libc::EPOLLWRBAND,
220            libc::EPOLLMSG,
221            libc::EPOLLERR,
222            libc::EPOLLHUP,
223            libc::EPOLLET,
224            libc::EPOLLRDHUP,
225            libc::EPOLLONESHOT,
226            libc::EPOLLEXCLUSIVE,
227            libc::EPOLLWAKEUP,
228            libc::EPOLL_CLOEXEC,
229        );
230
231        // Can't reference fields in packed structures.
232        let e_u64 = event.u64;
233        f.debug_struct("epoll_event")
234            .field("events", &EventsDetails(event.events))
235            .field("u64", &e_u64)
236            .finish()
237    }
238}
239
240// No special requirement from the implementation around waking.
241pub(crate) use crate::sys::unix::waker::Waker;
242
243cfg_io_source! {
244    mod stateless_io_source;
245    pub(crate) use stateless_io_source::IoSourceState;
246}