// tokio/net/unix/pipe.rs

//! Unix pipe types.

use crate::io::interest::Interest;
use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf, Ready};

use mio::unix::pipe as mio_pipe;
use std::fs::File;
use std::io::{self, Read, Write};
use std::os::unix::fs::OpenOptionsExt;
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};

cfg_io_util! {
    use bytes::BufMut;
}
18
19/// Creates a new anonymous Unix pipe.
20///
21/// This function will open a new pipe and associate both pipe ends with the default
22/// event loop.
23///
24/// If you need to create a pipe for communication with a spawned process, you can
25/// use [`Stdio::piped()`] instead.
26///
27/// [`Stdio::piped()`]: std::process::Stdio::piped
28///
29/// # Errors
30///
31/// If creating a pipe fails, this function will return with the related OS error.
32///
33/// # Examples
34///
35/// Create a pipe and pass the writing end to a spawned process.
36///
37/// ```no_run
38/// use tokio::net::unix::pipe;
39/// use tokio::process::Command;
40/// # use tokio::io::AsyncReadExt;
41/// # use std::error::Error;
42///
43/// # async fn dox() -> Result<(), Box<dyn Error>> {
44/// let (tx, mut rx) = pipe::pipe()?;
45/// let mut buffer = String::new();
46///
47/// let status = Command::new("echo")
48///     .arg("Hello, world!")
49///     .stdout(tx.into_blocking_fd()?)
50///     .status();
51/// rx.read_to_string(&mut buffer).await?;
52///
53/// assert!(status.await?.success());
54/// assert_eq!(buffer, "Hello, world!\n");
55/// # Ok(())
56/// # }
57/// ```
58///
59/// # Panics
60///
61/// This function panics if it is not called from within a runtime with
62/// IO enabled.
63///
64/// The runtime is usually set implicitly when this function is called
65/// from a future driven by a tokio runtime, otherwise runtime can be set
66/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
67pub fn pipe() -> io::Result<(Sender, Receiver)> {
68    let (tx, rx) = mio_pipe::new()?;
69    Ok((Sender::from_mio(tx)?, Receiver::from_mio(rx)?))
70}
71
/// Options and flags which can be used to configure how a FIFO file is opened.
///
/// This builder allows configuring how to create a pipe end from a FIFO file.
/// Generally speaking, when using `OpenOptions`, you'll first call [`new`],
/// then chain calls to methods to set each option, then call either
/// [`open_receiver`] or [`open_sender`], passing the path of the FIFO file you
/// are trying to open. This will give you an [`io::Result`] with a pipe end
/// inside that you can further operate on.
///
/// [`new`]: OpenOptions::new
/// [`open_receiver`]: OpenOptions::open_receiver
/// [`open_sender`]: OpenOptions::open_sender
///
/// # Examples
///
/// Opening a pair of pipe ends from a FIFO file:
///
/// ```no_run
/// use tokio::net::unix::pipe;
/// # use std::error::Error;
///
/// const FIFO_NAME: &str = "path/to/a/fifo";
///
/// # async fn dox() -> Result<(), Box<dyn Error>> {
/// let rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?;
/// let tx = pipe::OpenOptions::new().open_sender(FIFO_NAME)?;
/// # Ok(())
/// # }
/// ```
///
/// Opening a [`Sender`] on Linux when you are sure the file is a FIFO:
///
/// ```ignore
/// use tokio::net::unix::pipe;
/// use nix::{unistd::mkfifo, sys::stat::Mode};
/// # use std::error::Error;
///
/// // Our program has exclusive access to this path.
/// const FIFO_NAME: &str = "path/to/a/new/fifo";
///
/// # async fn dox() -> Result<(), Box<dyn Error>> {
/// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
/// let tx = pipe::OpenOptions::new()
///     .read_write(true)
///     .unchecked(true)
///     .open_sender(FIFO_NAME)?;
/// # Ok(())
/// # }
/// ```
#[derive(Clone, Debug)]
pub struct OpenOptions {
    /// Open the FIFO with both read and write access (Linux/Android builds only).
    #[cfg(any(target_os = "linux", target_os = "android"))]
    read_write: bool,
    /// Skip the check that the opened file is actually a FIFO.
    unchecked: bool,
}
127
128impl OpenOptions {
129    /// Creates a blank new set of options ready for configuration.
130    ///
131    /// All options are initially set to `false`.
132    pub fn new() -> OpenOptions {
133        OpenOptions {
134            #[cfg(any(target_os = "linux", target_os = "android"))]
135            read_write: false,
136            unchecked: false,
137        }
138    }
139
140    /// Sets the option for read-write access.
141    ///
142    /// This option, when true, will indicate that a FIFO file will be opened
143    /// in read-write access mode. This operation is not defined by the POSIX
144    /// standard and is only guaranteed to work on Linux.
145    ///
146    /// # Examples
147    ///
148    /// Opening a [`Sender`] even if there are no open reading ends:
149    ///
150    /// ```ignore
151    /// use tokio::net::unix::pipe;
152    ///
153    /// let tx = pipe::OpenOptions::new()
154    ///     .read_write(true)
155    ///     .open_sender("path/to/a/fifo");
156    /// ```
157    ///
158    /// Opening a resilient [`Receiver`] i.e. a reading pipe end which will not
159    /// fail with [`UnexpectedEof`] during reading if all writing ends of the
160    /// pipe close the FIFO file.
161    ///
162    /// [`UnexpectedEof`]: std::io::ErrorKind::UnexpectedEof
163    ///
164    /// ```ignore
165    /// use tokio::net::unix::pipe;
166    ///
167    /// let tx = pipe::OpenOptions::new()
168    ///     .read_write(true)
169    ///     .open_receiver("path/to/a/fifo");
170    /// ```
171    #[cfg(any(target_os = "linux", target_os = "android"))]
172    #[cfg_attr(docsrs, doc(cfg(any(target_os = "linux", target_os = "android"))))]
173    pub fn read_write(&mut self, value: bool) -> &mut Self {
174        self.read_write = value;
175        self
176    }
177
178    /// Sets the option to skip the check for FIFO file type.
179    ///
180    /// By default, [`open_receiver`] and [`open_sender`] functions will check
181    /// if the opened file is a FIFO file. Set this option to `true` if you are
182    /// sure the file is a FIFO file.
183    ///
184    /// [`open_receiver`]: OpenOptions::open_receiver
185    /// [`open_sender`]: OpenOptions::open_sender
186    ///
187    /// # Examples
188    ///
189    /// ```no_run
190    /// use tokio::net::unix::pipe;
191    /// use nix::{unistd::mkfifo, sys::stat::Mode};
192    /// # use std::error::Error;
193    ///
194    /// // Our program has exclusive access to this path.
195    /// const FIFO_NAME: &str = "path/to/a/new/fifo";
196    ///
197    /// # async fn dox() -> Result<(), Box<dyn Error>> {
198    /// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
199    /// let rx = pipe::OpenOptions::new()
200    ///     .unchecked(true)
201    ///     .open_receiver(FIFO_NAME)?;
202    /// # Ok(())
203    /// # }
204    /// ```
205    pub fn unchecked(&mut self, value: bool) -> &mut Self {
206        self.unchecked = value;
207        self
208    }
209
210    /// Creates a [`Receiver`] from a FIFO file with the options specified by `self`.
211    ///
212    /// This function will open the FIFO file at the specified path, possibly
213    /// check if it is a pipe, and associate the pipe with the default event
214    /// loop for reading.
215    ///
216    /// # Errors
217    ///
218    /// If the file type check fails, this function will fail with `io::ErrorKind::InvalidInput`.
219    /// This function may also fail with other standard OS errors.
220    ///
221    /// # Panics
222    ///
223    /// This function panics if it is not called from within a runtime with
224    /// IO enabled.
225    ///
226    /// The runtime is usually set implicitly when this function is called
227    /// from a future driven by a tokio runtime, otherwise runtime can be set
228    /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
229    pub fn open_receiver<P: AsRef<Path>>(&self, path: P) -> io::Result<Receiver> {
230        let file = self.open(path.as_ref(), PipeEnd::Receiver)?;
231        Receiver::from_file_unchecked(file)
232    }
233
234    /// Creates a [`Sender`] from a FIFO file with the options specified by `self`.
235    ///
236    /// This function will open the FIFO file at the specified path, possibly
237    /// check if it is a pipe, and associate the pipe with the default event
238    /// loop for writing.
239    ///
240    /// # Errors
241    ///
242    /// If the file type check fails, this function will fail with `io::ErrorKind::InvalidInput`.
243    /// If the file is not opened in read-write access mode and the file is not
244    /// currently open for reading, this function will fail with `ENXIO`.
245    /// This function may also fail with other standard OS errors.
246    ///
247    /// # Panics
248    ///
249    /// This function panics if it is not called from within a runtime with
250    /// IO enabled.
251    ///
252    /// The runtime is usually set implicitly when this function is called
253    /// from a future driven by a tokio runtime, otherwise runtime can be set
254    /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
255    pub fn open_sender<P: AsRef<Path>>(&self, path: P) -> io::Result<Sender> {
256        let file = self.open(path.as_ref(), PipeEnd::Sender)?;
257        Sender::from_file_unchecked(file)
258    }
259
260    fn open(&self, path: &Path, pipe_end: PipeEnd) -> io::Result<File> {
261        let mut options = std::fs::OpenOptions::new();
262        options
263            .read(pipe_end == PipeEnd::Receiver)
264            .write(pipe_end == PipeEnd::Sender)
265            .custom_flags(libc::O_NONBLOCK);
266
267        #[cfg(any(target_os = "linux", target_os = "android"))]
268        if self.read_write {
269            options.read(true).write(true);
270        }
271
272        let file = options.open(path)?;
273
274        if !self.unchecked && !is_pipe(file.as_fd())? {
275            return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
276        }
277
278        Ok(file)
279    }
280}
281
282impl Default for OpenOptions {
283    fn default() -> OpenOptions {
284        OpenOptions::new()
285    }
286}
287
/// Identifies which end of a pipe is being opened.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum PipeEnd {
    /// The writing end.
    Sender,
    /// The reading end.
    Receiver,
}
293
294/// Writing end of a Unix pipe.
295///
296/// It can be constructed from a FIFO file with [`OpenOptions::open_sender`].
297///
298/// Opening a named pipe for writing involves a few steps.
299/// Call to [`OpenOptions::open_sender`] might fail with an error indicating
300/// different things:
301///
302/// * [`io::ErrorKind::NotFound`] - There is no file at the specified path.
303/// * [`io::ErrorKind::InvalidInput`] - The file exists, but it is not a FIFO.
304/// * [`ENXIO`] - The file is a FIFO, but no process has it open for reading.
305///   Sleep for a while and try again.
306/// * Other OS errors not specific to opening FIFO files.
307///
308/// Opening a `Sender` from a FIFO file should look like this:
309///
310/// ```no_run
311/// use tokio::net::unix::pipe;
312/// use tokio::time::{self, Duration};
313///
314/// const FIFO_NAME: &str = "path/to/a/fifo";
315///
316/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
317/// // Wait for a reader to open the file.
318/// let tx = loop {
319///     match pipe::OpenOptions::new().open_sender(FIFO_NAME) {
320///         Ok(tx) => break tx,
321///         Err(e) if e.raw_os_error() == Some(libc::ENXIO) => {},
322///         Err(e) => return Err(e.into()),
323///     }
324///
325///     time::sleep(Duration::from_millis(50)).await;
326/// };
327/// # Ok(())
328/// # }
329/// ```
330///
331/// On Linux, it is possible to create a `Sender` without waiting in a sleeping
332/// loop. This is done by opening a named pipe in read-write access mode with
333/// `OpenOptions::read_write`. This way, a `Sender` can at the same time hold
334/// both a writing end and a reading end, and the latter allows to open a FIFO
335/// without [`ENXIO`] error since the pipe is open for reading as well.
336///
337/// `Sender` cannot be used to read from a pipe, so in practice the read access
338/// is only used when a FIFO is opened. However, using a `Sender` in read-write
339/// mode **may lead to lost data**, because written data will be dropped by the
340/// system as soon as all pipe ends are closed. To avoid lost data you have to
341/// make sure that a reading end has been opened before dropping a `Sender`.
342///
343/// Note that using read-write access mode with FIFO files is not defined by
344/// the POSIX standard and it is only guaranteed to work on Linux.
345///
346/// ```ignore
347/// use tokio::io::AsyncWriteExt;
348/// use tokio::net::unix::pipe;
349///
350/// const FIFO_NAME: &str = "path/to/a/fifo";
351///
352/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
353/// let mut tx = pipe::OpenOptions::new()
354///     .read_write(true)
355///     .open_sender(FIFO_NAME)?;
356///
357/// // Asynchronously write to the pipe before a reader.
358/// tx.write_all(b"hello world").await?;
359/// # Ok(())
360/// # }
361/// ```
362///
363/// [`ENXIO`]: https://docs.rs/libc/latest/libc/constant.ENXIO.html
364#[derive(Debug)]
365pub struct Sender {
366    io: PollEvented<mio_pipe::Sender>,
367}
368
369impl Sender {
370    fn from_mio(mio_tx: mio_pipe::Sender) -> io::Result<Sender> {
371        let io = PollEvented::new_with_interest(mio_tx, Interest::WRITABLE)?;
372        Ok(Sender { io })
373    }
374
375    /// Creates a new `Sender` from a [`File`].
376    ///
377    /// This function is intended to construct a pipe from a [`File`] representing
378    /// a special FIFO file. It will check if the file is a pipe and has write access,
379    /// set it in non-blocking mode and perform the conversion.
380    ///
381    /// # Errors
382    ///
383    /// Fails with `io::ErrorKind::InvalidInput` if the file is not a pipe or it
384    /// does not have write access. Also fails with any standard OS error if it occurs.
385    ///
386    /// # Panics
387    ///
388    /// This function panics if it is not called from within a runtime with
389    /// IO enabled.
390    ///
391    /// The runtime is usually set implicitly when this function is called
392    /// from a future driven by a tokio runtime, otherwise runtime can be set
393    /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
394    pub fn from_file(file: File) -> io::Result<Sender> {
395        Sender::from_owned_fd(file.into())
396    }
397
398    /// Creates a new `Sender` from an [`OwnedFd`].
399    ///
400    /// This function is intended to construct a pipe from an [`OwnedFd`] representing
401    /// an anonymous pipe or a special FIFO file. It will check if the file descriptor
402    /// is a pipe and has write access, set it in non-blocking mode and perform the
403    /// conversion.
404    ///
405    /// # Errors
406    ///
407    /// Fails with `io::ErrorKind::InvalidInput` if the file descriptor is not a pipe
408    /// or it does not have write access. Also fails with any standard OS error if it
409    /// occurs.
410    ///
411    /// # Panics
412    ///
413    /// This function panics if it is not called from within a runtime with
414    /// IO enabled.
415    ///
416    /// The runtime is usually set implicitly when this function is called
417    /// from a future driven by a tokio runtime, otherwise runtime can be set
418    /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
419    pub fn from_owned_fd(owned_fd: OwnedFd) -> io::Result<Sender> {
420        if !is_pipe(owned_fd.as_fd())? {
421            return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
422        }
423
424        let flags = get_file_flags(owned_fd.as_fd())?;
425        if has_write_access(flags) {
426            set_nonblocking(owned_fd.as_fd(), flags)?;
427            Sender::from_owned_fd_unchecked(owned_fd)
428        } else {
429            Err(io::Error::new(
430                io::ErrorKind::InvalidInput,
431                "not in O_WRONLY or O_RDWR access mode",
432            ))
433        }
434    }
435
436    /// Creates a new `Sender` from a [`File`] without checking pipe properties.
437    ///
438    /// This function is intended to construct a pipe from a File representing
439    /// a special FIFO file. The conversion assumes nothing about the underlying
440    /// file; it is left up to the user to make sure it is opened with write access,
441    /// represents a pipe and is set in non-blocking mode.
442    ///
443    /// # Examples
444    ///
445    /// ```no_run
446    /// use tokio::net::unix::pipe;
447    /// use std::fs::OpenOptions;
448    /// use std::os::unix::fs::{FileTypeExt, OpenOptionsExt};
449    /// # use std::error::Error;
450    ///
451    /// const FIFO_NAME: &str = "path/to/a/fifo";
452    ///
453    /// # async fn dox() -> Result<(), Box<dyn Error>> {
454    /// let file = OpenOptions::new()
455    ///     .write(true)
456    ///     .custom_flags(libc::O_NONBLOCK)
457    ///     .open(FIFO_NAME)?;
458    /// if file.metadata()?.file_type().is_fifo() {
459    ///     let tx = pipe::Sender::from_file_unchecked(file)?;
460    ///     /* use the Sender */
461    /// }
462    /// # Ok(())
463    /// # }
464    /// ```
465    ///
466    /// # Panics
467    ///
468    /// This function panics if it is not called from within a runtime with
469    /// IO enabled.
470    ///
471    /// The runtime is usually set implicitly when this function is called
472    /// from a future driven by a tokio runtime, otherwise runtime can be set
473    /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
474    pub fn from_file_unchecked(file: File) -> io::Result<Sender> {
475        Sender::from_owned_fd_unchecked(file.into())
476    }
477
478    /// Creates a new `Sender` from an [`OwnedFd`] without checking pipe properties.
479    ///
480    /// This function is intended to construct a pipe from an [`OwnedFd`] representing
481    /// an anonymous pipe or a special FIFO file. The conversion assumes nothing about
482    /// the underlying pipe; it is left up to the user to make sure that the file
483    /// descriptor represents the writing end of a pipe and the pipe is set in
484    /// non-blocking mode.
485    ///
486    /// # Panics
487    ///
488    /// This function panics if it is not called from within a runtime with
489    /// IO enabled.
490    ///
491    /// The runtime is usually set implicitly when this function is called
492    /// from a future driven by a tokio runtime, otherwise runtime can be set
493    /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
494    pub fn from_owned_fd_unchecked(owned_fd: OwnedFd) -> io::Result<Sender> {
495        // Safety: OwnedFd represents a valid, open file descriptor.
496        let mio_tx = unsafe { mio_pipe::Sender::from_raw_fd(owned_fd.into_raw_fd()) };
497        Sender::from_mio(mio_tx)
498    }
499
500    /// Waits for any of the requested ready states.
501    ///
502    /// This function can be used instead of [`writable()`] to check the returned
503    /// ready set for [`Ready::WRITABLE`] and [`Ready::WRITE_CLOSED`] events.
504    ///
505    /// The function may complete without the pipe being ready. This is a
506    /// false-positive and attempting an operation will return with
507    /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
508    /// [`Ready`] set, so you should always check the returned value and possibly
509    /// wait again if the requested states are not set.
510    ///
511    /// [`writable()`]: Self::writable
512    ///
513    /// # Cancel safety
514    ///
515    /// This method is cancel safe. Once a readiness event occurs, the method
516    /// will continue to return immediately until the readiness event is
517    /// consumed by an attempt to write that fails with `WouldBlock` or
518    /// `Poll::Pending`.
519    pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
520        let event = self.io.registration().readiness(interest).await?;
521        Ok(event.ready)
522    }
523
524    /// Waits for the pipe to become writable.
525    ///
526    /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
527    /// paired with [`try_write()`].
528    ///
529    /// [`try_write()`]: Self::try_write
530    ///
531    /// # Examples
532    ///
533    /// ```no_run
534    /// use tokio::net::unix::pipe;
535    /// use std::io;
536    ///
537    /// #[tokio::main]
538    /// async fn main() -> io::Result<()> {
539    ///     // Open a writing end of a fifo
540    ///     let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo")?;
541    ///
542    ///     loop {
543    ///         // Wait for the pipe to be writable
544    ///         tx.writable().await?;
545    ///
546    ///         // Try to write data, this may still fail with `WouldBlock`
547    ///         // if the readiness event is a false positive.
548    ///         match tx.try_write(b"hello world") {
549    ///             Ok(n) => {
550    ///                 break;
551    ///             }
552    ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
553    ///                 continue;
554    ///             }
555    ///             Err(e) => {
556    ///                 return Err(e.into());
557    ///             }
558    ///         }
559    ///     }
560    ///
561    ///     Ok(())
562    /// }
563    /// ```
564    pub async fn writable(&self) -> io::Result<()> {
565        self.ready(Interest::WRITABLE).await?;
566        Ok(())
567    }
568
569    /// Polls for write readiness.
570    ///
571    /// If the pipe is not currently ready for writing, this method will
572    /// store a clone of the `Waker` from the provided `Context`. When the pipe
573    /// becomes ready for writing, `Waker::wake` will be called on the waker.
574    ///
575    /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
576    /// the `Waker` from the `Context` passed to the most recent call is
577    /// scheduled to receive a wakeup.
578    ///
579    /// This function is intended for cases where creating and pinning a future
580    /// via [`writable`] is not feasible. Where possible, using [`writable`] is
581    /// preferred, as this supports polling from multiple tasks at once.
582    ///
583    /// [`writable`]: Self::writable
584    ///
585    /// # Return value
586    ///
587    /// The function returns:
588    ///
589    /// * `Poll::Pending` if the pipe is not ready for writing.
590    /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing.
591    /// * `Poll::Ready(Err(e))` if an error is encountered.
592    ///
593    /// # Errors
594    ///
595    /// This function may encounter any standard I/O error except `WouldBlock`.
596    pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
597        self.io.registration().poll_write_ready(cx).map_ok(|_| ())
598    }
599
600    /// Tries to write a buffer to the pipe, returning how many bytes were
601    /// written.
602    ///
603    /// The function will attempt to write the entire contents of `buf`, but
604    /// only part of the buffer may be written. If the length of `buf` is not
605    /// greater than `PIPE_BUF` (an OS constant, 4096 under Linux), then the
606    /// write is guaranteed to be atomic, i.e. either the entire content of
607    /// `buf` will be written or this method will fail with `WouldBlock`. There
608    /// is no such guarantee if `buf` is larger than `PIPE_BUF`.
609    ///
610    /// This function is usually paired with [`writable`].
611    ///
612    /// [`writable`]: Self::writable
613    ///
614    /// # Return
615    ///
616    /// If data is successfully written, `Ok(n)` is returned, where `n` is the
617    /// number of bytes written. If the pipe is not ready to write data,
618    /// `Err(io::ErrorKind::WouldBlock)` is returned.
619    ///
620    /// # Notes
621    ///
622    /// To avoid unnecessary syscalls, this will only attempt the write
623    /// operation if the OS has informed Tokio that this pipe has become
624    /// writable. Because of this, `try_write()` may fail with a
625    /// [`WouldBlock`] error if Tokio has not yet heard from the OS that
626    /// this pipe has become writable.
627    ///
628    /// # Examples
629    ///
630    /// ```no_run
631    /// use tokio::net::unix::pipe;
632    /// use std::io;
633    ///
634    /// #[tokio::main]
635    /// async fn main() -> io::Result<()> {
636    ///     // Open a writing end of a fifo
637    ///     let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo")?;
638    ///
639    ///     loop {
640    ///         // Wait for the pipe to be writable
641    ///         tx.writable().await?;
642    ///
643    ///         // Try to write data, this may still fail with `WouldBlock`
644    ///         // if the readiness event is a false positive.
645    ///         match tx.try_write(b"hello world") {
646    ///             Ok(n) => {
647    ///                 break;
648    ///             }
649    ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
650    ///                 continue;
651    ///             }
652    ///             Err(e) => {
653    ///                 return Err(e.into());
654    ///             }
655    ///         }
656    ///     }
657    ///
658    ///     Ok(())
659    /// }
660    /// ```
661    ///
662    /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
663    pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
664        self.io
665            .registration()
666            .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
667    }
668
669    /// Tries to write several buffers to the pipe, returning how many bytes
670    /// were written.
671    ///
672    /// Data is written from each buffer in order, with the final buffer read
673    /// from possible being only partially consumed. This method behaves
674    /// equivalently to a single call to [`try_write()`] with concatenated
675    /// buffers.
676    ///
677    /// If the total length of buffers is not greater than `PIPE_BUF` (an OS
678    /// constant, 4096 under Linux), then the write is guaranteed to be atomic,
679    /// i.e. either the entire contents of buffers will be written or this
680    /// method will fail with `WouldBlock`. There is no such guarantee if the
681    /// total length of buffers is greater than `PIPE_BUF`.
682    ///
683    /// This function is usually paired with [`writable`].
684    ///
685    /// [`try_write()`]: Self::try_write()
686    /// [`writable`]: Self::writable
687    ///
688    /// # Return
689    ///
690    /// If data is successfully written, `Ok(n)` is returned, where `n` is the
691    /// number of bytes written. If the pipe is not ready to write data,
692    /// `Err(io::ErrorKind::WouldBlock)` is returned.
693    ///
694    /// # Notes
695    ///
696    /// To avoid unnecessary syscalls, this will only attempt the write
697    /// operation if the OS has informed Tokio that this pipe has become
698    /// writable. Because of this, `try_write_vectored()` may fail with a
699    /// [`WouldBlock`] error if Tokio has not yet heard from the OS that
700    /// this pipe has become writable.
701    ///
702    /// # Examples
703    ///
704    /// ```no_run
705    /// use tokio::net::unix::pipe;
706    /// use std::io;
707    ///
708    /// #[tokio::main]
709    /// async fn main() -> io::Result<()> {
710    ///     // Open a writing end of a fifo
711    ///     let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo")?;
712    ///
713    ///     let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
714    ///
715    ///     loop {
716    ///         // Wait for the pipe to be writable
717    ///         tx.writable().await?;
718    ///
719    ///         // Try to write data, this may still fail with `WouldBlock`
720    ///         // if the readiness event is a false positive.
721    ///         match tx.try_write_vectored(&bufs) {
722    ///             Ok(n) => {
723    ///                 break;
724    ///             }
725    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
726    ///                 continue;
727    ///             }
728    ///             Err(e) => {
729    ///                 return Err(e.into());
730    ///             }
731    ///         }
732    ///     }
733    ///
734    ///     Ok(())
735    /// }
736    /// ```
737    ///
738    /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
739    pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
740        self.io
741            .registration()
742            .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
743    }
744
745    /// Tries to write from the socket using a user-provided IO operation.
746    ///
747    /// If the socket is ready, the provided closure is called. The closure
748    /// should attempt to perform IO operation on the socket by manually
749    /// calling the appropriate syscall. If the operation fails because the
750    /// socket is not actually ready, then the closure should return a
751    /// `WouldBlock` error and the readiness flag is cleared. The return value
752    /// of the closure is then returned by `try_io`.
753    ///
754    /// If the socket is not ready, then the closure is not called
755    /// and a `WouldBlock` error is returned.
756    ///
757    /// The closure should only return a `WouldBlock` error if it has performed
758    /// an IO operation on the socket that failed due to the socket not being
759    /// ready. Returning a `WouldBlock` error in any other situation will
760    /// incorrectly clear the readiness flag, which can cause the socket to
761    /// behave incorrectly.
762    ///
763    /// The closure should not perform the IO operation using any of the methods
764    /// defined on the Tokio `pipe::Sender` type, as this will mess with the
765    /// readiness flag and can cause the socket to behave incorrectly.
766    ///
767    /// Usually, [`writable()`] or [`ready()`] is used with this function.
768    ///
769    /// [`writable()`]: Self::writable()
770    /// [`ready()`]: Self::ready()
771    pub fn try_io<R>(&self, f: impl FnOnce() -> io::Result<R>) -> io::Result<R> {
772        self.io
773            .registration()
774            .try_io(Interest::WRITABLE, || self.io.try_io(f))
775    }
776
777    /// Converts the pipe into an [`OwnedFd`] in blocking mode.
778    ///
779    /// This function will deregister this pipe end from the event loop, set
780    /// it in blocking mode and perform the conversion.
781    pub fn into_blocking_fd(self) -> io::Result<OwnedFd> {
782        let fd = self.into_nonblocking_fd()?;
783        set_blocking(&fd)?;
784        Ok(fd)
785    }
786
787    /// Converts the pipe into an [`OwnedFd`] in nonblocking mode.
788    ///
789    /// This function will deregister this pipe end from the event loop and
790    /// perform the conversion. The returned file descriptor will be in nonblocking
791    /// mode.
792    pub fn into_nonblocking_fd(self) -> io::Result<OwnedFd> {
793        let mio_pipe = self.io.into_inner()?;
794
795        // Safety: the pipe is now deregistered from the event loop
796        // and we are the only owner of this pipe end.
797        let owned_fd = unsafe { OwnedFd::from_raw_fd(mio_pipe.into_raw_fd()) };
798
799        Ok(owned_fd)
800    }
801}
802
803impl AsyncWrite for Sender {
804    fn poll_write(
805        self: Pin<&mut Self>,
806        cx: &mut Context<'_>,
807        buf: &[u8],
808    ) -> Poll<io::Result<usize>> {
809        self.io.poll_write(cx, buf)
810    }
811
812    fn poll_write_vectored(
813        self: Pin<&mut Self>,
814        cx: &mut Context<'_>,
815        bufs: &[io::IoSlice<'_>],
816    ) -> Poll<io::Result<usize>> {
817        self.io.poll_write_vectored(cx, bufs)
818    }
819
820    fn is_write_vectored(&self) -> bool {
821        true
822    }
823
824    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
825        Poll::Ready(Ok(()))
826    }
827
828    fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
829        Poll::Ready(Ok(()))
830    }
831}
832
833impl AsRawFd for Sender {
834    fn as_raw_fd(&self) -> RawFd {
835        self.io.as_raw_fd()
836    }
837}
838
839impl AsFd for Sender {
840    fn as_fd(&self) -> BorrowedFd<'_> {
841        unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
842    }
843}
844
845/// Reading end of a Unix pipe.
846///
847/// It can be constructed from a FIFO file with [`OpenOptions::open_receiver`].
848///
849/// # Examples
850///
851/// Receiving messages from a named pipe in a loop:
852///
853/// ```no_run
854/// use tokio::net::unix::pipe;
855/// use tokio::io::{self, AsyncReadExt};
856///
857/// const FIFO_NAME: &str = "path/to/a/fifo";
858///
859/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
860/// let mut rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?;
861/// loop {
862///     let mut msg = vec![0; 256];
863///     match rx.read_exact(&mut msg).await {
864///         Ok(_) => {
865///             /* handle the message */
866///         }
867///         Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
868///             // Writing end has been closed, we should reopen the pipe.
869///             rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?;
870///         }
871///         Err(e) => return Err(e.into()),
872///     }
873/// }
874/// # }
875/// ```
876///
877/// On Linux, you can use a `Receiver` in read-write access mode to implement
878/// resilient reading from a named pipe. Unlike `Receiver` opened in read-only
879/// mode, read from a pipe in read-write mode will not fail with `UnexpectedEof`
880/// when the writing end is closed. This way, a `Receiver` can asynchronously
881/// wait for the next writer to open the pipe.
882///
883/// You should not use functions waiting for EOF such as [`read_to_end`] with
884/// a `Receiver` in read-write access mode, since it **may wait forever**.
885/// `Receiver` in this mode also holds an open writing end, which prevents
886/// receiving EOF.
887///
888/// To set the read-write access mode you can use `OpenOptions::read_write`.
889/// Note that using read-write access mode with FIFO files is not defined by
890/// the POSIX standard and it is only guaranteed to work on Linux.
891///
892/// ```ignore
893/// use tokio::net::unix::pipe;
894/// use tokio::io::AsyncReadExt;
895/// # use std::error::Error;
896///
897/// const FIFO_NAME: &str = "path/to/a/fifo";
898///
899/// # async fn dox() -> Result<(), Box<dyn Error>> {
900/// let mut rx = pipe::OpenOptions::new()
901///     .read_write(true)
902///     .open_receiver(FIFO_NAME)?;
903/// loop {
904///     let mut msg = vec![0; 256];
905///     rx.read_exact(&mut msg).await?;
906///     /* handle the message */
907/// }
908/// # }
909/// ```
910///
911/// [`read_to_end`]: crate::io::AsyncReadExt::read_to_end
#[derive(Debug)]
pub struct Receiver {
    // The mio pipe reading end wrapped in `PollEvented`; `from_mio` registers
    // it with `Interest::READABLE`.
    io: PollEvented<mio_pipe::Receiver>,
}
916
917impl Receiver {
918    fn from_mio(mio_rx: mio_pipe::Receiver) -> io::Result<Receiver> {
919        let io = PollEvented::new_with_interest(mio_rx, Interest::READABLE)?;
920        Ok(Receiver { io })
921    }
922
923    /// Creates a new `Receiver` from a [`File`].
924    ///
925    /// This function is intended to construct a pipe from a [`File`] representing
926    /// a special FIFO file. It will check if the file is a pipe and has read access,
927    /// set it in non-blocking mode and perform the conversion.
928    ///
929    /// # Errors
930    ///
931    /// Fails with `io::ErrorKind::InvalidInput` if the file is not a pipe or it
932    /// does not have read access. Also fails with any standard OS error if it occurs.
933    ///
934    /// # Panics
935    ///
936    /// This function panics if it is not called from within a runtime with
937    /// IO enabled.
938    ///
939    /// The runtime is usually set implicitly when this function is called
940    /// from a future driven by a tokio runtime, otherwise runtime can be set
941    /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
942    pub fn from_file(file: File) -> io::Result<Receiver> {
943        Receiver::from_owned_fd(file.into())
944    }
945
946    /// Creates a new `Receiver` from an [`OwnedFd`].
947    ///
948    /// This function is intended to construct a pipe from an [`OwnedFd`] representing
949    /// an anonymous pipe or a special FIFO file. It will check if the file descriptor
950    /// is a pipe and has read access, set it in non-blocking mode and perform the
951    /// conversion.
952    ///
953    /// # Errors
954    ///
955    /// Fails with `io::ErrorKind::InvalidInput` if the file descriptor is not a pipe
956    /// or it does not have read access. Also fails with any standard OS error if it
957    /// occurs.
958    ///
959    /// # Panics
960    ///
961    /// This function panics if it is not called from within a runtime with
962    /// IO enabled.
963    ///
964    /// The runtime is usually set implicitly when this function is called
965    /// from a future driven by a tokio runtime, otherwise runtime can be set
966    /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
967    pub fn from_owned_fd(owned_fd: OwnedFd) -> io::Result<Receiver> {
968        if !is_pipe(owned_fd.as_fd())? {
969            return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
970        }
971
972        let flags = get_file_flags(owned_fd.as_fd())?;
973        if has_read_access(flags) {
974            set_nonblocking(owned_fd.as_fd(), flags)?;
975            Receiver::from_owned_fd_unchecked(owned_fd)
976        } else {
977            Err(io::Error::new(
978                io::ErrorKind::InvalidInput,
979                "not in O_RDONLY or O_RDWR access mode",
980            ))
981        }
982    }
983
984    /// Creates a new `Receiver` from a [`File`] without checking pipe properties.
985    ///
    /// This function is intended to construct a pipe from a [`File`] representing
    /// a special FIFO file. The conversion assumes nothing about the underlying
988    /// file; it is left up to the user to make sure it is opened with read access,
989    /// represents a pipe and is set in non-blocking mode.
990    ///
991    /// # Examples
992    ///
993    /// ```no_run
994    /// use tokio::net::unix::pipe;
995    /// use std::fs::OpenOptions;
996    /// use std::os::unix::fs::{FileTypeExt, OpenOptionsExt};
997    /// # use std::error::Error;
998    ///
999    /// const FIFO_NAME: &str = "path/to/a/fifo";
1000    ///
1001    /// # async fn dox() -> Result<(), Box<dyn Error>> {
1002    /// let file = OpenOptions::new()
1003    ///     .read(true)
1004    ///     .custom_flags(libc::O_NONBLOCK)
1005    ///     .open(FIFO_NAME)?;
1006    /// if file.metadata()?.file_type().is_fifo() {
1007    ///     let rx = pipe::Receiver::from_file_unchecked(file)?;
1008    ///     /* use the Receiver */
1009    /// }
1010    /// # Ok(())
1011    /// # }
1012    /// ```
1013    ///
1014    /// # Panics
1015    ///
1016    /// This function panics if it is not called from within a runtime with
1017    /// IO enabled.
1018    ///
1019    /// The runtime is usually set implicitly when this function is called
1020    /// from a future driven by a tokio runtime, otherwise runtime can be set
1021    /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
1022    pub fn from_file_unchecked(file: File) -> io::Result<Receiver> {
1023        Receiver::from_owned_fd_unchecked(file.into())
1024    }
1025
1026    /// Creates a new `Receiver` from an [`OwnedFd`] without checking pipe properties.
1027    ///
1028    /// This function is intended to construct a pipe from an [`OwnedFd`] representing
1029    /// an anonymous pipe or a special FIFO file. The conversion assumes nothing about
1030    /// the underlying pipe; it is left up to the user to make sure that the file
1031    /// descriptor represents the reading end of a pipe and the pipe is set in
1032    /// non-blocking mode.
1033    ///
1034    /// # Panics
1035    ///
1036    /// This function panics if it is not called from within a runtime with
1037    /// IO enabled.
1038    ///
1039    /// The runtime is usually set implicitly when this function is called
1040    /// from a future driven by a tokio runtime, otherwise runtime can be set
1041    /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
1042    pub fn from_owned_fd_unchecked(owned_fd: OwnedFd) -> io::Result<Receiver> {
1043        // Safety: OwnedFd represents a valid, open file descriptor.
1044        let mio_rx = unsafe { mio_pipe::Receiver::from_raw_fd(owned_fd.into_raw_fd()) };
1045        Receiver::from_mio(mio_rx)
1046    }
1047
1048    /// Waits for any of the requested ready states.
1049    ///
1050    /// This function can be used instead of [`readable()`] to check the returned
1051    /// ready set for [`Ready::READABLE`] and [`Ready::READ_CLOSED`] events.
1052    ///
1053    /// The function may complete without the pipe being ready. This is a
1054    /// false-positive and attempting an operation will return with
1055    /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
1056    /// [`Ready`] set, so you should always check the returned value and possibly
1057    /// wait again if the requested states are not set.
1058    ///
1059    /// [`readable()`]: Self::readable
1060    ///
1061    /// # Cancel safety
1062    ///
1063    /// This method is cancel safe. Once a readiness event occurs, the method
1064    /// will continue to return immediately until the readiness event is
1065    /// consumed by an attempt to read that fails with `WouldBlock` or
1066    /// `Poll::Pending`.
1067    pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
1068        let event = self.io.registration().readiness(interest).await?;
1069        Ok(event.ready)
1070    }
1071
1072    /// Waits for the pipe to become readable.
1073    ///
1074    /// This function is equivalent to `ready(Interest::READABLE)` and is usually
1075    /// paired with [`try_read()`].
1076    ///
1077    /// [`try_read()`]: Self::try_read()
1078    ///
1079    /// # Examples
1080    ///
1081    /// ```no_run
1082    /// use tokio::net::unix::pipe;
1083    /// use std::io;
1084    ///
1085    /// #[tokio::main]
1086    /// async fn main() -> io::Result<()> {
1087    ///     // Open a reading end of a fifo
1088    ///     let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
1089    ///
1090    ///     let mut msg = vec![0; 1024];
1091    ///
1092    ///     loop {
1093    ///         // Wait for the pipe to be readable
1094    ///         rx.readable().await?;
1095    ///
1096    ///         // Try to read data, this may still fail with `WouldBlock`
1097    ///         // if the readiness event is a false positive.
1098    ///         match rx.try_read(&mut msg) {
1099    ///             Ok(n) => {
1100    ///                 msg.truncate(n);
1101    ///                 break;
1102    ///             }
1103    ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1104    ///                 continue;
1105    ///             }
1106    ///             Err(e) => {
1107    ///                 return Err(e.into());
1108    ///             }
1109    ///         }
1110    ///     }
1111    ///
1112    ///     println!("GOT = {:?}", msg);
1113    ///     Ok(())
1114    /// }
1115    /// ```
1116    pub async fn readable(&self) -> io::Result<()> {
1117        self.ready(Interest::READABLE).await?;
1118        Ok(())
1119    }
1120
1121    /// Polls for read readiness.
1122    ///
1123    /// If the pipe is not currently ready for reading, this method will
1124    /// store a clone of the `Waker` from the provided `Context`. When the pipe
1125    /// becomes ready for reading, `Waker::wake` will be called on the waker.
1126    ///
1127    /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
1128    /// the `Waker` from the `Context` passed to the most recent call is
1129    /// scheduled to receive a wakeup.
1130    ///
1131    /// This function is intended for cases where creating and pinning a future
1132    /// via [`readable`] is not feasible. Where possible, using [`readable`] is
1133    /// preferred, as this supports polling from multiple tasks at once.
1134    ///
1135    /// [`readable`]: Self::readable
1136    ///
1137    /// # Return value
1138    ///
1139    /// The function returns:
1140    ///
1141    /// * `Poll::Pending` if the pipe is not ready for reading.
1142    /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading.
1143    /// * `Poll::Ready(Err(e))` if an error is encountered.
1144    ///
1145    /// # Errors
1146    ///
1147    /// This function may encounter any standard I/O error except `WouldBlock`.
1148    pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1149        self.io.registration().poll_read_ready(cx).map_ok(|_| ())
1150    }
1151
1152    /// Tries to read data from the pipe into the provided buffer, returning how
1153    /// many bytes were read.
1154    ///
1155    /// Reads any pending data from the pipe but does not wait for new data
1156    /// to arrive. On success, returns the number of bytes read. Because
1157    /// `try_read()` is non-blocking, the buffer does not have to be stored by
1158    /// the async task and can exist entirely on the stack.
1159    ///
1160    /// Usually [`readable()`] is used with this function.
1161    ///
1162    /// [`readable()`]: Self::readable()
1163    ///
1164    /// # Return
1165    ///
1166    /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1167    /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
1168    ///
1169    /// 1. The pipe's writing end is closed and will no longer write data.
1170    /// 2. The specified buffer was 0 bytes in length.
1171    ///
1172    /// If the pipe is not ready to read data,
1173    /// `Err(io::ErrorKind::WouldBlock)` is returned.
1174    ///
1175    /// # Notes
1176    ///
1177    /// To avoid unnecessary syscalls, this will only attempt the read
1178    /// operation if the OS has informed Tokio that this pipe has become
1179    /// readable. Because of this, `try_read()` may fail with a
1180    /// [`WouldBlock`] error if Tokio has not yet heard from the OS that
1181    /// this pipe has become readable.
1182    ///
1183    /// # Examples
1184    ///
1185    /// ```no_run
1186    /// use tokio::net::unix::pipe;
1187    /// use std::io;
1188    ///
1189    /// #[tokio::main]
1190    /// async fn main() -> io::Result<()> {
1191    ///     // Open a reading end of a fifo
1192    ///     let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
1193    ///
1194    ///     let mut msg = vec![0; 1024];
1195    ///
1196    ///     loop {
1197    ///         // Wait for the pipe to be readable
1198    ///         rx.readable().await?;
1199    ///
1200    ///         // Try to read data, this may still fail with `WouldBlock`
1201    ///         // if the readiness event is a false positive.
1202    ///         match rx.try_read(&mut msg) {
1203    ///             Ok(n) => {
1204    ///                 msg.truncate(n);
1205    ///                 break;
1206    ///             }
1207    ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1208    ///                 continue;
1209    ///             }
1210    ///             Err(e) => {
1211    ///                 return Err(e.into());
1212    ///             }
1213    ///         }
1214    ///     }
1215    ///
1216    ///     println!("GOT = {:?}", msg);
1217    ///     Ok(())
1218    /// }
1219    /// ```
1220    ///
1221    /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
1222    pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
1223        self.io
1224            .registration()
1225            .try_io(Interest::READABLE, || (&*self.io).read(buf))
1226    }
1227
1228    /// Tries to read data from the pipe into the provided buffers, returning
1229    /// how many bytes were read.
1230    ///
1231    /// Data is copied to fill each buffer in order, with the final buffer
1232    /// written to possibly being only partially filled. This method behaves
1233    /// equivalently to a single call to [`try_read()`] with concatenated
1234    /// buffers.
1235    ///
1236    /// Reads any pending data from the pipe but does not wait for new data
1237    /// to arrive. On success, returns the number of bytes read. Because
1238    /// `try_read_vectored()` is non-blocking, the buffer does not have to be
1239    /// stored by the async task and can exist entirely on the stack.
1240    ///
1241    /// Usually, [`readable()`] is used with this function.
1242    ///
1243    /// [`try_read()`]: Self::try_read()
1244    /// [`readable()`]: Self::readable()
1245    ///
1246    /// # Return
1247    ///
1248    /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1249    /// number of bytes read. `Ok(0)` indicates the pipe's writing end is
1250    /// closed and will no longer write data. If the pipe is not ready to read
1251    /// data `Err(io::ErrorKind::WouldBlock)` is returned.
1252    ///
1253    /// # Notes
1254    ///
1255    /// To avoid unnecessary syscalls, this will only attempt the read
1256    /// operation if the OS has informed Tokio that this pipe has become
1257    /// readable. Because of this, `try_read_vectored()` may fail with a
1258    /// [`WouldBlock`] error if Tokio has not yet heard from the OS that
1259    /// this pipe has become readable.
1260    ///
1261    /// # Examples
1262    ///
1263    /// ```no_run
1264    /// use tokio::net::unix::pipe;
1265    /// use std::io;
1266    ///
1267    /// #[tokio::main]
1268    /// async fn main() -> io::Result<()> {
1269    ///     // Open a reading end of a fifo
1270    ///     let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
1271    ///
1272    ///     loop {
1273    ///         // Wait for the pipe to be readable
1274    ///         rx.readable().await?;
1275    ///
1276    ///         // Creating the buffer **after** the `await` prevents it from
1277    ///         // being stored in the async task.
1278    ///         let mut buf_a = [0; 512];
1279    ///         let mut buf_b = [0; 1024];
1280    ///         let mut bufs = [
1281    ///             io::IoSliceMut::new(&mut buf_a),
1282    ///             io::IoSliceMut::new(&mut buf_b),
1283    ///         ];
1284    ///
1285    ///         // Try to read data, this may still fail with `WouldBlock`
1286    ///         // if the readiness event is a false positive.
1287    ///         match rx.try_read_vectored(&mut bufs) {
1288    ///             Ok(0) => break,
1289    ///             Ok(n) => {
1290    ///                 println!("read {} bytes", n);
1291    ///             }
1292    ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1293    ///                 continue;
1294    ///             }
1295    ///             Err(e) => {
1296    ///                 return Err(e.into());
1297    ///             }
1298    ///         }
1299    ///     }
1300    ///
1301    ///     Ok(())
1302    /// }
1303    /// ```
1304    ///
1305    /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
1306    pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
1307        self.io
1308            .registration()
1309            .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
1310    }
1311
    /// Tries to read from the pipe using a user-provided IO operation.
1313    ///
    /// If the pipe is ready, the provided closure is called. The closure
    /// should attempt to perform an IO operation on the pipe by manually
    /// calling the appropriate syscall. If the operation fails because the
    /// pipe is not actually ready, then the closure should return a
    /// `WouldBlock` error and the readiness flag is cleared. The return value
    /// of the closure is then returned by `try_io`.
1320    ///
    /// If the pipe is not ready, then the closure is not called
    /// and a `WouldBlock` error is returned.
1323    ///
    /// The closure should only return a `WouldBlock` error if it has performed
    /// an IO operation on the pipe that failed due to the pipe not being
    /// ready. Returning a `WouldBlock` error in any other situation will
    /// incorrectly clear the readiness flag, which can cause the pipe to
    /// behave incorrectly.
1329    ///
    /// The closure should not perform the IO operation using any of the methods
    /// defined on the Tokio `pipe::Receiver` type, as this will mess with the
    /// readiness flag and can cause the pipe to behave incorrectly.
1333    ///
1334    /// Usually, [`readable()`] or [`ready()`] is used with this function.
1335    ///
1336    /// [`readable()`]: Self::readable()
1337    /// [`ready()`]: Self::ready()
1338    pub fn try_io<R>(&self, f: impl FnOnce() -> io::Result<R>) -> io::Result<R> {
1339        self.io
1340            .registration()
1341            .try_io(Interest::READABLE, || self.io.try_io(f))
1342    }
1343
1344    cfg_io_util! {
1345        /// Tries to read data from the pipe into the provided buffer, advancing the
1346        /// buffer's internal cursor, returning how many bytes were read.
1347        ///
1348        /// Reads any pending data from the pipe but does not wait for new data
1349        /// to arrive. On success, returns the number of bytes read. Because
1350        /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
1351        /// the async task and can exist entirely on the stack.
1352        ///
1353        /// Usually, [`readable()`] or [`ready()`] is used with this function.
1354        ///
1355        /// [`readable()`]: Self::readable
1356        /// [`ready()`]: Self::ready
1357        ///
1358        /// # Return
1359        ///
1360        /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1361        /// number of bytes read. `Ok(0)` indicates the pipe's writing end is
1362        /// closed and will no longer write data. If the pipe is not ready to read
1363        /// data `Err(io::ErrorKind::WouldBlock)` is returned.
1364        ///
1365        /// # Notes
1366        ///
1367        /// To avoid unnecessary syscalls, this will only attempt the read
1368        /// operation if the OS has informed Tokio that this pipe has become
1369        /// readable. Because of this, `try_read_buf()` may fail with a
1370        /// [`WouldBlock`] error if Tokio has not yet heard from the OS that
1371        /// this pipe has become readable.
1372        ///
1373        /// # Examples
1374        ///
1375        /// ```no_run
1376        /// use tokio::net::unix::pipe;
1377        /// use std::io;
1378        ///
1379        /// #[tokio::main]
1380        /// async fn main() -> io::Result<()> {
1381        ///     // Open a reading end of a fifo
1382        ///     let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
1383        ///
1384        ///     loop {
1385        ///         // Wait for the pipe to be readable
1386        ///         rx.readable().await?;
1387        ///
1388        ///         let mut buf = Vec::with_capacity(4096);
1389        ///
1390        ///         // Try to read data, this may still fail with `WouldBlock`
1391        ///         // if the readiness event is a false positive.
1392        ///         match rx.try_read_buf(&mut buf) {
1393        ///             Ok(0) => break,
1394        ///             Ok(n) => {
1395        ///                 println!("read {} bytes", n);
1396        ///             }
1397        ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1398        ///                 continue;
1399        ///             }
1400        ///             Err(e) => {
1401        ///                 return Err(e.into());
1402        ///             }
1403        ///         }
1404        ///     }
1405        ///
1406        ///     Ok(())
1407        /// }
1408        /// ```
1409        ///
1410        /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
1411        pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
1412            self.io.registration().try_io(Interest::READABLE, || {
1413                use std::io::Read;
1414
1415                let dst = buf.chunk_mut();
1416                let dst =
1417                    unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
1418
1419                // Safety: `mio_pipe::Receiver` uses a `std::fs::File` underneath,
1420                // which correctly handles reads into uninitialized memory.
1421                let n = (&*self.io).read(dst)?;
1422
1423                unsafe {
1424                    buf.advance_mut(n);
1425                }
1426
1427                Ok(n)
1428            })
1429        }
1430    }
1431
1432    /// Converts the pipe into an [`OwnedFd`] in blocking mode.
1433    ///
1434    /// This function will deregister this pipe end from the event loop, set
1435    /// it in blocking mode and perform the conversion.
1436    pub fn into_blocking_fd(self) -> io::Result<OwnedFd> {
1437        let fd = self.into_nonblocking_fd()?;
1438        set_blocking(&fd)?;
1439        Ok(fd)
1440    }
1441
1442    /// Converts the pipe into an [`OwnedFd`] in nonblocking mode.
1443    ///
    /// This function will deregister this pipe end from the event loop and
    /// perform the conversion. The returned file descriptor will be in nonblocking
    /// mode.
1447    pub fn into_nonblocking_fd(self) -> io::Result<OwnedFd> {
1448        let mio_pipe = self.io.into_inner()?;
1449
1450        // Safety: the pipe is now deregistered from the event loop
1451        // and we are the only owner of this pipe end.
1452        let owned_fd = unsafe { OwnedFd::from_raw_fd(mio_pipe.into_raw_fd()) };
1453
1454        Ok(owned_fd)
1455    }
1456}
1457
1458impl AsyncRead for Receiver {
1459    fn poll_read(
1460        self: Pin<&mut Self>,
1461        cx: &mut Context<'_>,
1462        buf: &mut ReadBuf<'_>,
1463    ) -> Poll<io::Result<()>> {
1464        // Safety: `mio_pipe::Receiver` uses a `std::fs::File` underneath,
1465        // which correctly handles reads into uninitialized memory.
1466        unsafe { self.io.poll_read(cx, buf) }
1467    }
1468}
1469
1470impl AsRawFd for Receiver {
1471    fn as_raw_fd(&self) -> RawFd {
1472        self.io.as_raw_fd()
1473    }
1474}
1475
1476impl AsFd for Receiver {
1477    fn as_fd(&self) -> BorrowedFd<'_> {
1478        unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1479    }
1480}
1481
1482/// Checks if the file descriptor is a pipe or a FIFO.
1483fn is_pipe(fd: BorrowedFd<'_>) -> io::Result<bool> {
1484    // Safety: `libc::stat` is C-like struct used for syscalls and all-zero
1485    // byte pattern forms a valid value.
1486    let mut stat: libc::stat = unsafe { std::mem::zeroed() };
1487
1488    // Safety: it's safe to call `fstat` with a valid, open file descriptor
1489    // and a valid pointer to a `stat` struct.
1490    let r = unsafe { libc::fstat(fd.as_raw_fd(), &mut stat) };
1491
1492    if r == -1 {
1493        Err(io::Error::last_os_error())
1494    } else {
1495        Ok((stat.st_mode as libc::mode_t & libc::S_IFMT) == libc::S_IFIFO)
1496    }
1497}
1498
1499/// Gets file descriptor's flags by fcntl.
1500fn get_file_flags(fd: BorrowedFd<'_>) -> io::Result<libc::c_int> {
1501    // Safety: it's safe to use `fcntl` to read flags of a valid, open file descriptor.
1502    let flags = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_GETFL) };
1503    if flags < 0 {
1504        Err(io::Error::last_os_error())
1505    } else {
1506        Ok(flags)
1507    }
1508}
1509
1510/// Checks for `O_RDONLY` or `O_RDWR` access mode.
1511fn has_read_access(flags: libc::c_int) -> bool {
1512    let mode = flags & libc::O_ACCMODE;
1513    mode == libc::O_RDONLY || mode == libc::O_RDWR
1514}
1515
1516/// Checks for `O_WRONLY` or `O_RDWR` access mode.
1517fn has_write_access(flags: libc::c_int) -> bool {
1518    let mode = flags & libc::O_ACCMODE;
1519    mode == libc::O_WRONLY || mode == libc::O_RDWR
1520}
1521
1522/// Sets file descriptor's flags with `O_NONBLOCK` by fcntl.
1523fn set_nonblocking(fd: BorrowedFd<'_>, current_flags: libc::c_int) -> io::Result<()> {
1524    let flags = current_flags | libc::O_NONBLOCK;
1525
1526    if flags != current_flags {
1527        // Safety: it's safe to use `fcntl` to set the `O_NONBLOCK` flag of a valid,
1528        // open file descriptor.
1529        let ret = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_SETFL, flags) };
1530        if ret < 0 {
1531            return Err(io::Error::last_os_error());
1532        }
1533    }
1534
1535    Ok(())
1536}
1537
1538/// Removes `O_NONBLOCK` from fd's flags.
1539fn set_blocking<T: AsRawFd>(fd: &T) -> io::Result<()> {
1540    // Safety: it's safe to use `fcntl` to read flags of a valid, open file descriptor.
1541    let previous = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_GETFL) };
1542    if previous == -1 {
1543        return Err(io::Error::last_os_error());
1544    }
1545
1546    let new = previous & !libc::O_NONBLOCK;
1547
1548    // Safety: it's safe to use `fcntl` to unset the `O_NONBLOCK` flag of a valid,
1549    // open file descriptor.
1550    let r = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_SETFL, new) };
1551    if r == -1 {
1552        Err(io::Error::last_os_error())
1553    } else {
1554        Ok(())
1555    }
1556}