mio/sys/unix/
pipe.rs

1//! Unix pipe.
2//!
3//! See the [`new`] function for documentation.
4
5use std::io;
6use std::os::fd::RawFd;
7
8pub(crate) fn new_raw() -> io::Result<[RawFd; 2]> {
9    let mut fds: [RawFd; 2] = [-1, -1];
10
11    #[cfg(any(
12        target_os = "android",
13        target_os = "dragonfly",
14        target_os = "freebsd",
15        target_os = "fuchsia",
16        target_os = "hurd",
17        target_os = "linux",
18        target_os = "netbsd",
19        target_os = "openbsd",
20        target_os = "illumos",
21        target_os = "redox",
22        target_os = "solaris",
23        target_os = "vita",
24        target_os = "cygwin",
25    ))]
26    unsafe {
27        if libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) != 0 {
28            return Err(io::Error::last_os_error());
29        }
30    }
31
32    #[cfg(any(
33        target_os = "aix",
34        target_os = "haiku",
35        target_os = "ios",
36        target_os = "macos",
37        target_os = "tvos",
38        target_os = "visionos",
39        target_os = "watchos",
40        target_os = "espidf",
41        target_os = "nto",
42    ))]
43    unsafe {
44        // For platforms that don't have `pipe2(2)` we need to manually set the
45        // correct flags on the file descriptor.
46        if libc::pipe(fds.as_mut_ptr()) != 0 {
47            return Err(io::Error::last_os_error());
48        }
49
50        for fd in &fds {
51            if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK) != 0
52                || libc::fcntl(*fd, libc::F_SETFD, libc::FD_CLOEXEC) != 0
53            {
54                let err = io::Error::last_os_error();
55                // Don't leak file descriptors. Can't handle closing error though.
56                let _ = libc::close(fds[0]);
57                let _ = libc::close(fds[1]);
58                return Err(err);
59            }
60        }
61    }
62
63    Ok(fds)
64}
65
66cfg_os_ext! {
67use std::fs::File;
68use std::io::{IoSlice, IoSliceMut, Read, Write};
69use std::os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd};
70use std::process::{ChildStderr, ChildStdin, ChildStdout};
71
72use crate::io_source::IoSource;
73use crate::{event, Interest, Registry, Token};
74
75/// Create a new non-blocking Unix pipe.
76///
/// This is a wrapper around Unix's [`pipe(2)`] system call and can be used as
/// an inter-process or inter-thread communication channel.
79///
/// This channel may be created before forking the process, with one end then
/// used in each process, e.g. the parent process keeps the sending end to send
/// commands to the child process.
83///
84/// [`pipe(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/pipe.html
85///
86/// # Events
87///
/// The [`Sender`] can be registered with [`WRITABLE`] interest to receive
/// [writable events], the [`Receiver`] with [`READABLE`] interest. Once data is
/// written to the `Sender` the `Receiver` will receive a [readable event].
91///
92/// In addition to those events, events will also be generated if the other side
93/// is dropped. To check if the `Sender` is dropped you'll need to check
94/// [`is_read_closed`] on events for the `Receiver`, if it returns true the
95/// `Sender` is dropped. On the `Sender` end check [`is_write_closed`], if it
96/// returns true the `Receiver` was dropped. Also see the second example below.
97///
98/// [`WRITABLE`]: Interest::WRITABLE
99/// [writable events]: event::Event::is_writable
100/// [`READABLE`]: Interest::READABLE
101/// [readable event]: event::Event::is_readable
102/// [`is_read_closed`]: event::Event::is_read_closed
103/// [`is_write_closed`]: event::Event::is_write_closed
104///
105/// # Deregistering
106///
107/// Both `Sender` and `Receiver` will deregister themselves when dropped,
108/// **iff** the file descriptors are not duplicated (via [`dup(2)`]).
109///
110/// [`dup(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/dup.html
111///
112/// # Examples
113///
/// Simple example that writes data into the sending end and reads it from the
/// receiving end.
116///
117/// ```
118/// use std::io::{self, Read, Write};
119///
120/// use mio::{Poll, Events, Interest, Token};
121/// use mio::unix::pipe;
122///
123/// // Unique tokens for the two ends of the channel.
124/// const PIPE_RECV: Token = Token(0);
125/// const PIPE_SEND: Token = Token(1);
126///
127/// # fn main() -> io::Result<()> {
128/// // Create our `Poll` instance and the `Events` container.
129/// let mut poll = Poll::new()?;
130/// let mut events = Events::with_capacity(8);
131///
132/// // Create a new pipe.
133/// let (mut sender, mut receiver) = pipe::new()?;
134///
135/// // Register both ends of the channel.
136/// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?;
137/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?;
138///
139/// const MSG: &[u8; 11] = b"Hello world";
140///
141/// loop {
142///     poll.poll(&mut events, None)?;
143///
144///     for event in events.iter() {
145///         match event.token() {
146///             PIPE_SEND => sender.write(MSG)
147///                 .and_then(|n| if n != MSG.len() {
148///                         // We'll consider a short write an error in this
149///                         // example. NOTE: we can't use `write_all` with
150///                         // non-blocking I/O.
151///                         Err(io::ErrorKind::WriteZero.into())
152///                     } else {
153///                         Ok(())
154///                     })?,
155///             PIPE_RECV => {
156///                 let mut buf = [0; 11];
157///                 let n = receiver.read(&mut buf)?;
158///                 println!("received: {:?}", &buf[0..n]);
159///                 assert_eq!(n, MSG.len());
160///                 assert_eq!(&buf, &*MSG);
161///                 return Ok(());
162///             },
163///             _ => unreachable!(),
164///         }
165///     }
166/// }
167/// # }
168/// ```
169///
170/// Example that receives an event once the `Sender` is dropped.
171///
172/// ```
173/// # use std::io;
174/// #
175/// # use mio::{Poll, Events, Interest, Token};
176/// # use mio::unix::pipe;
177/// #
178/// # const PIPE_RECV: Token = Token(0);
179/// # const PIPE_SEND: Token = Token(1);
180/// #
181/// # fn main() -> io::Result<()> {
182/// // Same setup as in the example above.
183/// let mut poll = Poll::new()?;
184/// let mut events = Events::with_capacity(8);
185///
186/// let (mut sender, mut receiver) = pipe::new()?;
187///
188/// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?;
189/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?;
190///
191/// // Drop the sender.
192/// drop(sender);
193///
194/// poll.poll(&mut events, None)?;
195///
196/// for event in events.iter() {
197///     match event.token() {
198///         PIPE_RECV if event.is_read_closed() => {
199///             // Detected that the sender was dropped.
200///             println!("Sender dropped!");
201///             return Ok(());
202///         },
203///         _ => unreachable!(),
204///     }
205/// }
206/// # unreachable!();
207/// # }
208/// ```
209pub fn new() -> io::Result<(Sender, Receiver)> {
210    let fds = new_raw()?;
211    // SAFETY: `new_raw` initialised the `fds` above.
212    let r = unsafe { Receiver::from_raw_fd(fds[0]) };
213    let w = unsafe { Sender::from_raw_fd(fds[1]) };
214    Ok((w, r))
215}
216
/// Sending end of a Unix pipe.
///
/// See [`new`] for documentation, including examples.
#[derive(Debug)]
pub struct Sender {
    // The pipe's file descriptor, held as a `File` inside an `IoSource`.
    inner: IoSource<File>,
}
224
225impl Sender {
226    /// Set the `Sender` into or out of non-blocking mode.
227    pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
228        set_nonblocking(self.inner.as_raw_fd(), nonblocking)
229    }
230
231    /// Execute an I/O operation ensuring that the socket receives more events
232    /// if it hits a [`WouldBlock`] error.
233    ///
234    /// # Notes
235    ///
236    /// This method is required to be called for **all** I/O operations to
237    /// ensure the user will receive events once the socket is ready again after
238    /// returning a [`WouldBlock`] error.
239    ///
240    /// [`WouldBlock`]: io::ErrorKind::WouldBlock
241    ///
242    /// # Examples
243    ///
244    /// ```
245    /// # use std::error::Error;
246    /// #
247    /// # fn main() -> Result<(), Box<dyn Error>> {
248    /// use std::io;
249    /// use std::os::fd::AsRawFd;
250    /// use mio::unix::pipe;
251    ///
252    /// let (sender, receiver) = pipe::new()?;
253    ///
254    /// // Wait until the sender is writable...
255    ///
256    /// // Write to the sender using a direct libc call, of course the
257    /// // `io::Write` implementation would be easier to use.
258    /// let buf = b"hello";
259    /// let n = sender.try_io(|| {
260    ///     let buf_ptr = &buf as *const _ as *const _;
261    ///     let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) };
262    ///     if res != -1 {
263    ///         Ok(res as usize)
264    ///     } else {
265    ///         // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure
266    ///         // should return `WouldBlock` error.
267    ///         Err(io::Error::last_os_error())
268    ///     }
269    /// })?;
270    /// eprintln!("write {} bytes", n);
271    ///
272    /// // Wait until the receiver is readable...
273    ///
274    /// // Read from the receiver using a direct libc call, of course the
275    /// // `io::Read` implementation would be easier to use.
276    /// let mut buf = [0; 512];
277    /// let n = receiver.try_io(|| {
278    ///     let buf_ptr = &mut buf as *mut _ as *mut _;
279    ///     let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) };
280    ///     if res != -1 {
281    ///         Ok(res as usize)
282    ///     } else {
283    ///         // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure
284    ///         // should return `WouldBlock` error.
285    ///         Err(io::Error::last_os_error())
286    ///     }
287    /// })?;
288    /// eprintln!("read {} bytes", n);
289    /// # Ok(())
290    /// # }
291    /// ```
292    pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
293    where
294        F: FnOnce() -> io::Result<T>,
295    {
296        self.inner.do_io(|_| f())
297    }
298}
299
// Event source implementation for the sending end: every method forwards its
// arguments unchanged to the wrapped `IoSource`.
impl event::Source for Sender {
    // Forwards registration with `registry` to the inner `IoSource`.
    fn register(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        self.inner.register(registry, token, interests)
    }

    // Forwards re-registration (new token and/or interests) to the inner
    // `IoSource`.
    fn reregister(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        self.inner.reregister(registry, token, interests)
    }

    // Forwards deregistration from `registry` to the inner `IoSource`.
    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
        self.inner.deregister(registry)
    }
}
323
324impl Write for Sender {
325    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
326        self.inner.do_io(|mut sender| sender.write(buf))
327    }
328
329    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
330        self.inner.do_io(|mut sender| sender.write_vectored(bufs))
331    }
332
333    fn flush(&mut self) -> io::Result<()> {
334        self.inner.do_io(|mut sender| sender.flush())
335    }
336}
337
338impl Write for &Sender {
339    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
340        self.inner.do_io(|mut sender| sender.write(buf))
341    }
342
343    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
344        self.inner.do_io(|mut sender| sender.write_vectored(bufs))
345    }
346
347    fn flush(&mut self) -> io::Result<()> {
348        self.inner.do_io(|mut sender| sender.flush())
349    }
350}
351
352/// # Notes
353///
354/// The underlying pipe is **not** set to non-blocking.
355impl From<ChildStdin> for Sender {
356    fn from(stdin: ChildStdin) -> Sender {
357        // Safety: `ChildStdin` is guaranteed to be a valid file descriptor.
358        unsafe { Sender::from_raw_fd(stdin.into_raw_fd()) }
359    }
360}
361
362impl FromRawFd for Sender {
363    unsafe fn from_raw_fd(fd: RawFd) -> Sender {
364        Sender {
365            inner: IoSource::new(File::from_raw_fd(fd)),
366        }
367    }
368}
369
impl AsRawFd for Sender {
    // Returns the raw descriptor reported by the inner `IoSource`; the
    // `Sender` itself is only borrowed.
    fn as_raw_fd(&self) -> RawFd {
        self.inner.as_raw_fd()
    }
}
375
376impl IntoRawFd for Sender {
377    fn into_raw_fd(self) -> RawFd {
378        self.inner.into_inner().into_raw_fd()
379    }
380}
381
382impl From<Sender> for OwnedFd {
383    fn from(sender: Sender) -> Self {
384        sender.inner.into_inner().into()
385    }
386}
387
impl AsFd for Sender {
    // Borrows the descriptor from the inner `IoSource`; the returned handle is
    // tied to the lifetime of `&self`.
    fn as_fd(&self) -> BorrowedFd<'_> {
        self.inner.as_fd()
    }
}
393
394impl From<OwnedFd> for Sender {
395    fn from(fd: OwnedFd) -> Self {
396        Sender {
397            inner: IoSource::new(File::from(fd)),
398        }
399    }
400}
401
/// Receiving end of a Unix pipe.
///
/// See [`new`] for documentation, including examples.
#[derive(Debug)]
pub struct Receiver {
    // The pipe's file descriptor, held as a `File` inside an `IoSource`.
    inner: IoSource<File>,
}
409
410impl Receiver {
411    /// Set the `Receiver` into or out of non-blocking mode.
412    pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
413        set_nonblocking(self.inner.as_raw_fd(), nonblocking)
414    }
415
416    /// Execute an I/O operation ensuring that the socket receives more events
417    /// if it hits a [`WouldBlock`] error.
418    ///
419    /// # Notes
420    ///
421    /// This method is required to be called for **all** I/O operations to
422    /// ensure the user will receive events once the socket is ready again after
423    /// returning a [`WouldBlock`] error.
424    ///
425    /// [`WouldBlock`]: io::ErrorKind::WouldBlock
426    ///
427    /// # Examples
428    ///
429    /// ```
430    /// # use std::error::Error;
431    /// #
432    /// # fn main() -> Result<(), Box<dyn Error>> {
433    /// use std::io;
434    /// use std::os::fd::AsRawFd;
435    /// use mio::unix::pipe;
436    ///
437    /// let (sender, receiver) = pipe::new()?;
438    ///
439    /// // Wait until the sender is writable...
440    ///
441    /// // Write to the sender using a direct libc call, of course the
442    /// // `io::Write` implementation would be easier to use.
443    /// let buf = b"hello";
444    /// let n = sender.try_io(|| {
445    ///     let buf_ptr = &buf as *const _ as *const _;
446    ///     let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) };
447    ///     if res != -1 {
448    ///         Ok(res as usize)
449    ///     } else {
450    ///         // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure
451    ///         // should return `WouldBlock` error.
452    ///         Err(io::Error::last_os_error())
453    ///     }
454    /// })?;
455    /// eprintln!("write {} bytes", n);
456    ///
457    /// // Wait until the receiver is readable...
458    ///
459    /// // Read from the receiver using a direct libc call, of course the
460    /// // `io::Read` implementation would be easier to use.
461    /// let mut buf = [0; 512];
462    /// let n = receiver.try_io(|| {
463    ///     let buf_ptr = &mut buf as *mut _ as *mut _;
464    ///     let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) };
465    ///     if res != -1 {
466    ///         Ok(res as usize)
467    ///     } else {
468    ///         // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure
469    ///         // should return `WouldBlock` error.
470    ///         Err(io::Error::last_os_error())
471    ///     }
472    /// })?;
473    /// eprintln!("read {} bytes", n);
474    /// # Ok(())
475    /// # }
476    /// ```
477    pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
478    where
479        F: FnOnce() -> io::Result<T>,
480    {
481        self.inner.do_io(|_| f())
482    }
483}
484
// Event source implementation for the receiving end: every method forwards its
// arguments unchanged to the wrapped `IoSource`.
impl event::Source for Receiver {
    // Forwards registration with `registry` to the inner `IoSource`.
    fn register(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        self.inner.register(registry, token, interests)
    }

    // Forwards re-registration (new token and/or interests) to the inner
    // `IoSource`.
    fn reregister(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        self.inner.reregister(registry, token, interests)
    }

    // Forwards deregistration from `registry` to the inner `IoSource`.
    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
        self.inner.deregister(registry)
    }
}
508
509impl Read for Receiver {
510    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
511        self.inner.do_io(|mut sender| sender.read(buf))
512    }
513
514    fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
515        self.inner.do_io(|mut sender| sender.read_vectored(bufs))
516    }
517}
518
519impl Read for &Receiver {
520    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
521        self.inner.do_io(|mut sender| sender.read(buf))
522    }
523
524    fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
525        self.inner.do_io(|mut sender| sender.read_vectored(bufs))
526    }
527}
528
529/// # Notes
530///
531/// The underlying pipe is **not** set to non-blocking.
532impl From<ChildStdout> for Receiver {
533    fn from(stdout: ChildStdout) -> Receiver {
534        // Safety: `ChildStdout` is guaranteed to be a valid file descriptor.
535        unsafe { Receiver::from_raw_fd(stdout.into_raw_fd()) }
536    }
537}
538
539/// # Notes
540///
541/// The underlying pipe is **not** set to non-blocking.
542impl From<ChildStderr> for Receiver {
543    fn from(stderr: ChildStderr) -> Receiver {
544        // Safety: `ChildStderr` is guaranteed to be a valid file descriptor.
545        unsafe { Receiver::from_raw_fd(stderr.into_raw_fd()) }
546    }
547}
548
549impl IntoRawFd for Receiver {
550    fn into_raw_fd(self) -> RawFd {
551        self.inner.into_inner().into_raw_fd()
552    }
553}
554
impl AsRawFd for Receiver {
    // Returns the raw descriptor reported by the inner `IoSource`; the
    // `Receiver` itself is only borrowed.
    fn as_raw_fd(&self) -> RawFd {
        self.inner.as_raw_fd()
    }
}
560
561impl FromRawFd for Receiver {
562    unsafe fn from_raw_fd(fd: RawFd) -> Receiver {
563        Receiver {
564            inner: IoSource::new(File::from_raw_fd(fd)),
565        }
566    }
567}
568
569impl From<Receiver> for OwnedFd {
570    fn from(receiver: Receiver) -> Self {
571        receiver.inner.into_inner().into()
572    }
573}
574
impl AsFd for Receiver {
    // Borrows the descriptor from the inner `IoSource`; the returned handle is
    // tied to the lifetime of `&self`.
    fn as_fd(&self) -> BorrowedFd<'_> {
        self.inner.as_fd()
    }
}
580
581impl From<OwnedFd> for Receiver {
582    fn from(fd: OwnedFd) -> Self {
583        Receiver {
584            inner: IoSource::new(File::from(fd)),
585        }
586    }
587}
588
589#[cfg(not(any(target_os = "aix", target_os = "illumos", target_os = "solaris", target_os = "vita")))]
590fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> {
591    let value = nonblocking as libc::c_int;
592    if unsafe { libc::ioctl(fd, libc::FIONBIO, &value) } == -1 {
593        Err(io::Error::last_os_error())
594    } else {
595        Ok(())
596    }
597}
598
// Switches `fd` into (`true`) or out of (`false`) non-blocking mode by
// reading the file status flags with `F_GETFL` and, if needed, writing them
// back with `O_NONBLOCK` toggled via `F_SETFL`. Returns the last OS error if
// either `fcntl` call fails.
#[cfg(any(target_os = "aix", target_os = "illumos", target_os = "solaris", target_os = "vita"))]
fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> {
    let current = unsafe { libc::fcntl(fd, libc::F_GETFL) };
    if current < 0 {
        return Err(io::Error::last_os_error());
    }

    let wanted = if nonblocking {
        current | libc::O_NONBLOCK
    } else {
        current & !libc::O_NONBLOCK
    };

    // Only issue the second syscall when the flag actually has to change.
    if wanted != current && unsafe { libc::fcntl(fd, libc::F_SETFL, wanted) } < 0 {
        return Err(io::Error::last_os_error());
    }

    Ok(())
}
620} // `cfg_os_ext!`.