tokio/net/unix/pipe.rs
1//! Unix pipe types.
2
3use crate::io::interest::Interest;
4use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf, Ready};
5
6use mio::unix::pipe as mio_pipe;
7use std::fs::File;
8use std::io::{self, Read, Write};
9use std::os::unix::fs::OpenOptionsExt;
10use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
11use std::path::Path;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14
15cfg_io_util! {
16 use bytes::BufMut;
17}
18
19/// Creates a new anonymous Unix pipe.
20///
21/// This function will open a new pipe and associate both pipe ends with the default
22/// event loop.
23///
24/// If you need to create a pipe for communication with a spawned process, you can
25/// use [`Stdio::piped()`] instead.
26///
27/// [`Stdio::piped()`]: std::process::Stdio::piped
28///
29/// # Errors
30///
31/// If creating a pipe fails, this function will return with the related OS error.
32///
33/// # Examples
34///
35/// Create a pipe and pass the writing end to a spawned process.
36///
37/// ```no_run
38/// use tokio::net::unix::pipe;
39/// use tokio::process::Command;
40/// # use tokio::io::AsyncReadExt;
41/// # use std::error::Error;
42///
43/// # async fn dox() -> Result<(), Box<dyn Error>> {
44/// let (tx, mut rx) = pipe::pipe()?;
45/// let mut buffer = String::new();
46///
47/// let status = Command::new("echo")
48/// .arg("Hello, world!")
49/// .stdout(tx.into_blocking_fd()?)
50/// .status();
51/// rx.read_to_string(&mut buffer).await?;
52///
53/// assert!(status.await?.success());
54/// assert_eq!(buffer, "Hello, world!\n");
55/// # Ok(())
56/// # }
57/// ```
58///
59/// # Panics
60///
61/// This function panics if it is not called from within a runtime with
62/// IO enabled.
63///
64/// The runtime is usually set implicitly when this function is called
65/// from a future driven by a tokio runtime, otherwise runtime can be set
66/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
67pub fn pipe() -> io::Result<(Sender, Receiver)> {
68 let (tx, rx) = mio_pipe::new()?;
69 Ok((Sender::from_mio(tx)?, Receiver::from_mio(rx)?))
70}
71
72/// Options and flags which can be used to configure how a FIFO file is opened.
73///
74/// This builder allows configuring how to create a pipe end from a FIFO file.
75/// Generally speaking, when using `OpenOptions`, you'll first call [`new`],
76/// then chain calls to methods to set each option, then call either
77/// [`open_receiver`] or [`open_sender`], passing the path of the FIFO file you
78/// are trying to open. This will give you a [`io::Result`] with a pipe end
79/// inside that you can further operate on.
80///
81/// [`new`]: OpenOptions::new
82/// [`open_receiver`]: OpenOptions::open_receiver
83/// [`open_sender`]: OpenOptions::open_sender
84///
85/// # Examples
86///
87/// Opening a pair of pipe ends from a FIFO file:
88///
89/// ```no_run
90/// use tokio::net::unix::pipe;
91/// # use std::error::Error;
92///
93/// const FIFO_NAME: &str = "path/to/a/fifo";
94///
95/// # async fn dox() -> Result<(), Box<dyn Error>> {
96/// let rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?;
97/// let tx = pipe::OpenOptions::new().open_sender(FIFO_NAME)?;
98/// # Ok(())
99/// # }
100/// ```
101///
102/// Opening a [`Sender`] on Linux when you are sure the file is a FIFO:
103///
104/// ```ignore
105/// use tokio::net::unix::pipe;
106/// use nix::{unistd::mkfifo, sys::stat::Mode};
107/// # use std::error::Error;
108///
109/// // Our program has exclusive access to this path.
110/// const FIFO_NAME: &str = "path/to/a/new/fifo";
111///
112/// # async fn dox() -> Result<(), Box<dyn Error>> {
113/// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
114/// let tx = pipe::OpenOptions::new()
115/// .read_write(true)
116/// .unchecked(true)
117/// .open_sender(FIFO_NAME)?;
118/// # Ok(())
119/// # }
120/// ```
121#[derive(Clone, Debug)]
122pub struct OpenOptions {
123 #[cfg(any(target_os = "linux", target_os = "android"))]
124 read_write: bool,
125 unchecked: bool,
126}
127
128impl OpenOptions {
129 /// Creates a blank new set of options ready for configuration.
130 ///
131 /// All options are initially set to `false`.
132 pub fn new() -> OpenOptions {
133 OpenOptions {
134 #[cfg(any(target_os = "linux", target_os = "android"))]
135 read_write: false,
136 unchecked: false,
137 }
138 }
139
140 /// Sets the option for read-write access.
141 ///
142 /// This option, when true, will indicate that a FIFO file will be opened
143 /// in read-write access mode. This operation is not defined by the POSIX
144 /// standard and is only guaranteed to work on Linux.
145 ///
146 /// # Examples
147 ///
148 /// Opening a [`Sender`] even if there are no open reading ends:
149 ///
150 /// ```ignore
151 /// use tokio::net::unix::pipe;
152 ///
153 /// let tx = pipe::OpenOptions::new()
154 /// .read_write(true)
155 /// .open_sender("path/to/a/fifo");
156 /// ```
157 ///
158 /// Opening a resilient [`Receiver`] i.e. a reading pipe end which will not
159 /// fail with [`UnexpectedEof`] during reading if all writing ends of the
160 /// pipe close the FIFO file.
161 ///
162 /// [`UnexpectedEof`]: std::io::ErrorKind::UnexpectedEof
163 ///
164 /// ```ignore
165 /// use tokio::net::unix::pipe;
166 ///
167 /// let tx = pipe::OpenOptions::new()
168 /// .read_write(true)
169 /// .open_receiver("path/to/a/fifo");
170 /// ```
171 #[cfg(any(target_os = "linux", target_os = "android"))]
172 #[cfg_attr(docsrs, doc(cfg(any(target_os = "linux", target_os = "android"))))]
173 pub fn read_write(&mut self, value: bool) -> &mut Self {
174 self.read_write = value;
175 self
176 }
177
178 /// Sets the option to skip the check for FIFO file type.
179 ///
180 /// By default, [`open_receiver`] and [`open_sender`] functions will check
181 /// if the opened file is a FIFO file. Set this option to `true` if you are
182 /// sure the file is a FIFO file.
183 ///
184 /// [`open_receiver`]: OpenOptions::open_receiver
185 /// [`open_sender`]: OpenOptions::open_sender
186 ///
187 /// # Examples
188 ///
189 /// ```no_run
190 /// use tokio::net::unix::pipe;
191 /// use nix::{unistd::mkfifo, sys::stat::Mode};
192 /// # use std::error::Error;
193 ///
194 /// // Our program has exclusive access to this path.
195 /// const FIFO_NAME: &str = "path/to/a/new/fifo";
196 ///
197 /// # async fn dox() -> Result<(), Box<dyn Error>> {
198 /// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
199 /// let rx = pipe::OpenOptions::new()
200 /// .unchecked(true)
201 /// .open_receiver(FIFO_NAME)?;
202 /// # Ok(())
203 /// # }
204 /// ```
205 pub fn unchecked(&mut self, value: bool) -> &mut Self {
206 self.unchecked = value;
207 self
208 }
209
210 /// Creates a [`Receiver`] from a FIFO file with the options specified by `self`.
211 ///
212 /// This function will open the FIFO file at the specified path, possibly
213 /// check if it is a pipe, and associate the pipe with the default event
214 /// loop for reading.
215 ///
216 /// # Errors
217 ///
218 /// If the file type check fails, this function will fail with `io::ErrorKind::InvalidInput`.
219 /// This function may also fail with other standard OS errors.
220 ///
221 /// # Panics
222 ///
223 /// This function panics if it is not called from within a runtime with
224 /// IO enabled.
225 ///
226 /// The runtime is usually set implicitly when this function is called
227 /// from a future driven by a tokio runtime, otherwise runtime can be set
228 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
229 pub fn open_receiver<P: AsRef<Path>>(&self, path: P) -> io::Result<Receiver> {
230 let file = self.open(path.as_ref(), PipeEnd::Receiver)?;
231 Receiver::from_file_unchecked(file)
232 }
233
234 /// Creates a [`Sender`] from a FIFO file with the options specified by `self`.
235 ///
236 /// This function will open the FIFO file at the specified path, possibly
237 /// check if it is a pipe, and associate the pipe with the default event
238 /// loop for writing.
239 ///
240 /// # Errors
241 ///
242 /// If the file type check fails, this function will fail with `io::ErrorKind::InvalidInput`.
243 /// If the file is not opened in read-write access mode and the file is not
244 /// currently open for reading, this function will fail with `ENXIO`.
245 /// This function may also fail with other standard OS errors.
246 ///
247 /// # Panics
248 ///
249 /// This function panics if it is not called from within a runtime with
250 /// IO enabled.
251 ///
252 /// The runtime is usually set implicitly when this function is called
253 /// from a future driven by a tokio runtime, otherwise runtime can be set
254 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
255 pub fn open_sender<P: AsRef<Path>>(&self, path: P) -> io::Result<Sender> {
256 let file = self.open(path.as_ref(), PipeEnd::Sender)?;
257 Sender::from_file_unchecked(file)
258 }
259
260 fn open(&self, path: &Path, pipe_end: PipeEnd) -> io::Result<File> {
261 let mut options = std::fs::OpenOptions::new();
262 options
263 .read(pipe_end == PipeEnd::Receiver)
264 .write(pipe_end == PipeEnd::Sender)
265 .custom_flags(libc::O_NONBLOCK);
266
267 #[cfg(any(target_os = "linux", target_os = "android"))]
268 if self.read_write {
269 options.read(true).write(true);
270 }
271
272 let file = options.open(path)?;
273
274 if !self.unchecked && !is_pipe(file.as_fd())? {
275 return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
276 }
277
278 Ok(file)
279 }
280}
281
282impl Default for OpenOptions {
283 fn default() -> OpenOptions {
284 OpenOptions::new()
285 }
286}
287
288#[derive(Clone, Copy, PartialEq, Eq, Debug)]
289enum PipeEnd {
290 Sender,
291 Receiver,
292}
293
294/// Writing end of a Unix pipe.
295///
296/// It can be constructed from a FIFO file with [`OpenOptions::open_sender`].
297///
298/// Opening a named pipe for writing involves a few steps.
299/// Call to [`OpenOptions::open_sender`] might fail with an error indicating
300/// different things:
301///
302/// * [`io::ErrorKind::NotFound`] - There is no file at the specified path.
303/// * [`io::ErrorKind::InvalidInput`] - The file exists, but it is not a FIFO.
304/// * [`ENXIO`] - The file is a FIFO, but no process has it open for reading.
305/// Sleep for a while and try again.
306/// * Other OS errors not specific to opening FIFO files.
307///
308/// Opening a `Sender` from a FIFO file should look like this:
309///
310/// ```no_run
311/// use tokio::net::unix::pipe;
312/// use tokio::time::{self, Duration};
313///
314/// const FIFO_NAME: &str = "path/to/a/fifo";
315///
316/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
317/// // Wait for a reader to open the file.
318/// let tx = loop {
319/// match pipe::OpenOptions::new().open_sender(FIFO_NAME) {
320/// Ok(tx) => break tx,
321/// Err(e) if e.raw_os_error() == Some(libc::ENXIO) => {},
322/// Err(e) => return Err(e.into()),
323/// }
324///
325/// time::sleep(Duration::from_millis(50)).await;
326/// };
327/// # Ok(())
328/// # }
329/// ```
330///
331/// On Linux, it is possible to create a `Sender` without waiting in a sleeping
332/// loop. This is done by opening a named pipe in read-write access mode with
333/// `OpenOptions::read_write`. This way, a `Sender` can at the same time hold
334/// both a writing end and a reading end, and the latter allows to open a FIFO
335/// without [`ENXIO`] error since the pipe is open for reading as well.
336///
337/// `Sender` cannot be used to read from a pipe, so in practice the read access
338/// is only used when a FIFO is opened. However, using a `Sender` in read-write
339/// mode **may lead to lost data**, because written data will be dropped by the
340/// system as soon as all pipe ends are closed. To avoid lost data you have to
341/// make sure that a reading end has been opened before dropping a `Sender`.
342///
343/// Note that using read-write access mode with FIFO files is not defined by
344/// the POSIX standard and it is only guaranteed to work on Linux.
345///
346/// ```ignore
347/// use tokio::io::AsyncWriteExt;
348/// use tokio::net::unix::pipe;
349///
350/// const FIFO_NAME: &str = "path/to/a/fifo";
351///
352/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
353/// let mut tx = pipe::OpenOptions::new()
354/// .read_write(true)
355/// .open_sender(FIFO_NAME)?;
356///
357/// // Asynchronously write to the pipe before a reader.
358/// tx.write_all(b"hello world").await?;
359/// # Ok(())
360/// # }
361/// ```
362///
363/// [`ENXIO`]: https://docs.rs/libc/latest/libc/constant.ENXIO.html
364#[derive(Debug)]
365pub struct Sender {
366 io: PollEvented<mio_pipe::Sender>,
367}
368
369impl Sender {
370 fn from_mio(mio_tx: mio_pipe::Sender) -> io::Result<Sender> {
371 let io = PollEvented::new_with_interest(mio_tx, Interest::WRITABLE)?;
372 Ok(Sender { io })
373 }
374
375 /// Creates a new `Sender` from a [`File`].
376 ///
377 /// This function is intended to construct a pipe from a [`File`] representing
378 /// a special FIFO file. It will check if the file is a pipe and has write access,
379 /// set it in non-blocking mode and perform the conversion.
380 ///
381 /// # Errors
382 ///
383 /// Fails with `io::ErrorKind::InvalidInput` if the file is not a pipe or it
384 /// does not have write access. Also fails with any standard OS error if it occurs.
385 ///
386 /// # Panics
387 ///
388 /// This function panics if it is not called from within a runtime with
389 /// IO enabled.
390 ///
391 /// The runtime is usually set implicitly when this function is called
392 /// from a future driven by a tokio runtime, otherwise runtime can be set
393 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
394 pub fn from_file(file: File) -> io::Result<Sender> {
395 Sender::from_owned_fd(file.into())
396 }
397
398 /// Creates a new `Sender` from an [`OwnedFd`].
399 ///
400 /// This function is intended to construct a pipe from an [`OwnedFd`] representing
401 /// an anonymous pipe or a special FIFO file. It will check if the file descriptor
402 /// is a pipe and has write access, set it in non-blocking mode and perform the
403 /// conversion.
404 ///
405 /// # Errors
406 ///
407 /// Fails with `io::ErrorKind::InvalidInput` if the file descriptor is not a pipe
408 /// or it does not have write access. Also fails with any standard OS error if it
409 /// occurs.
410 ///
411 /// # Panics
412 ///
413 /// This function panics if it is not called from within a runtime with
414 /// IO enabled.
415 ///
416 /// The runtime is usually set implicitly when this function is called
417 /// from a future driven by a tokio runtime, otherwise runtime can be set
418 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
419 pub fn from_owned_fd(owned_fd: OwnedFd) -> io::Result<Sender> {
420 if !is_pipe(owned_fd.as_fd())? {
421 return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
422 }
423
424 let flags = get_file_flags(owned_fd.as_fd())?;
425 if has_write_access(flags) {
426 set_nonblocking(owned_fd.as_fd(), flags)?;
427 Sender::from_owned_fd_unchecked(owned_fd)
428 } else {
429 Err(io::Error::new(
430 io::ErrorKind::InvalidInput,
431 "not in O_WRONLY or O_RDWR access mode",
432 ))
433 }
434 }
435
436 /// Creates a new `Sender` from a [`File`] without checking pipe properties.
437 ///
438 /// This function is intended to construct a pipe from a File representing
439 /// a special FIFO file. The conversion assumes nothing about the underlying
440 /// file; it is left up to the user to make sure it is opened with write access,
441 /// represents a pipe and is set in non-blocking mode.
442 ///
443 /// # Examples
444 ///
445 /// ```no_run
446 /// use tokio::net::unix::pipe;
447 /// use std::fs::OpenOptions;
448 /// use std::os::unix::fs::{FileTypeExt, OpenOptionsExt};
449 /// # use std::error::Error;
450 ///
451 /// const FIFO_NAME: &str = "path/to/a/fifo";
452 ///
453 /// # async fn dox() -> Result<(), Box<dyn Error>> {
454 /// let file = OpenOptions::new()
455 /// .write(true)
456 /// .custom_flags(libc::O_NONBLOCK)
457 /// .open(FIFO_NAME)?;
458 /// if file.metadata()?.file_type().is_fifo() {
459 /// let tx = pipe::Sender::from_file_unchecked(file)?;
460 /// /* use the Sender */
461 /// }
462 /// # Ok(())
463 /// # }
464 /// ```
465 ///
466 /// # Panics
467 ///
468 /// This function panics if it is not called from within a runtime with
469 /// IO enabled.
470 ///
471 /// The runtime is usually set implicitly when this function is called
472 /// from a future driven by a tokio runtime, otherwise runtime can be set
473 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
474 pub fn from_file_unchecked(file: File) -> io::Result<Sender> {
475 Sender::from_owned_fd_unchecked(file.into())
476 }
477
478 /// Creates a new `Sender` from an [`OwnedFd`] without checking pipe properties.
479 ///
480 /// This function is intended to construct a pipe from an [`OwnedFd`] representing
481 /// an anonymous pipe or a special FIFO file. The conversion assumes nothing about
482 /// the underlying pipe; it is left up to the user to make sure that the file
483 /// descriptor represents the writing end of a pipe and the pipe is set in
484 /// non-blocking mode.
485 ///
486 /// # Panics
487 ///
488 /// This function panics if it is not called from within a runtime with
489 /// IO enabled.
490 ///
491 /// The runtime is usually set implicitly when this function is called
492 /// from a future driven by a tokio runtime, otherwise runtime can be set
493 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
494 pub fn from_owned_fd_unchecked(owned_fd: OwnedFd) -> io::Result<Sender> {
495 // Safety: OwnedFd represents a valid, open file descriptor.
496 let mio_tx = unsafe { mio_pipe::Sender::from_raw_fd(owned_fd.into_raw_fd()) };
497 Sender::from_mio(mio_tx)
498 }
499
500 /// Waits for any of the requested ready states.
501 ///
502 /// This function can be used instead of [`writable()`] to check the returned
503 /// ready set for [`Ready::WRITABLE`] and [`Ready::WRITE_CLOSED`] events.
504 ///
505 /// The function may complete without the pipe being ready. This is a
506 /// false-positive and attempting an operation will return with
507 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
508 /// [`Ready`] set, so you should always check the returned value and possibly
509 /// wait again if the requested states are not set.
510 ///
511 /// [`writable()`]: Self::writable
512 ///
513 /// # Cancel safety
514 ///
515 /// This method is cancel safe. Once a readiness event occurs, the method
516 /// will continue to return immediately until the readiness event is
517 /// consumed by an attempt to write that fails with `WouldBlock` or
518 /// `Poll::Pending`.
519 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
520 let event = self.io.registration().readiness(interest).await?;
521 Ok(event.ready)
522 }
523
524 /// Waits for the pipe to become writable.
525 ///
526 /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
527 /// paired with [`try_write()`].
528 ///
529 /// [`try_write()`]: Self::try_write
530 ///
531 /// # Examples
532 ///
533 /// ```no_run
534 /// use tokio::net::unix::pipe;
535 /// use std::io;
536 ///
537 /// #[tokio::main]
538 /// async fn main() -> io::Result<()> {
539 /// // Open a writing end of a fifo
540 /// let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo")?;
541 ///
542 /// loop {
543 /// // Wait for the pipe to be writable
544 /// tx.writable().await?;
545 ///
546 /// // Try to write data, this may still fail with `WouldBlock`
547 /// // if the readiness event is a false positive.
548 /// match tx.try_write(b"hello world") {
549 /// Ok(n) => {
550 /// break;
551 /// }
552 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
553 /// continue;
554 /// }
555 /// Err(e) => {
556 /// return Err(e.into());
557 /// }
558 /// }
559 /// }
560 ///
561 /// Ok(())
562 /// }
563 /// ```
564 pub async fn writable(&self) -> io::Result<()> {
565 self.ready(Interest::WRITABLE).await?;
566 Ok(())
567 }
568
569 /// Polls for write readiness.
570 ///
571 /// If the pipe is not currently ready for writing, this method will
572 /// store a clone of the `Waker` from the provided `Context`. When the pipe
573 /// becomes ready for writing, `Waker::wake` will be called on the waker.
574 ///
575 /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
576 /// the `Waker` from the `Context` passed to the most recent call is
577 /// scheduled to receive a wakeup.
578 ///
579 /// This function is intended for cases where creating and pinning a future
580 /// via [`writable`] is not feasible. Where possible, using [`writable`] is
581 /// preferred, as this supports polling from multiple tasks at once.
582 ///
583 /// [`writable`]: Self::writable
584 ///
585 /// # Return value
586 ///
587 /// The function returns:
588 ///
589 /// * `Poll::Pending` if the pipe is not ready for writing.
590 /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing.
591 /// * `Poll::Ready(Err(e))` if an error is encountered.
592 ///
593 /// # Errors
594 ///
595 /// This function may encounter any standard I/O error except `WouldBlock`.
596 pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
597 self.io.registration().poll_write_ready(cx).map_ok(|_| ())
598 }
599
600 /// Tries to write a buffer to the pipe, returning how many bytes were
601 /// written.
602 ///
603 /// The function will attempt to write the entire contents of `buf`, but
604 /// only part of the buffer may be written. If the length of `buf` is not
605 /// greater than `PIPE_BUF` (an OS constant, 4096 under Linux), then the
606 /// write is guaranteed to be atomic, i.e. either the entire content of
607 /// `buf` will be written or this method will fail with `WouldBlock`. There
608 /// is no such guarantee if `buf` is larger than `PIPE_BUF`.
609 ///
610 /// This function is usually paired with [`writable`].
611 ///
612 /// [`writable`]: Self::writable
613 ///
614 /// # Return
615 ///
616 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
617 /// number of bytes written. If the pipe is not ready to write data,
618 /// `Err(io::ErrorKind::WouldBlock)` is returned.
619 ///
620 /// # Notes
621 ///
622 /// To avoid unnecessary syscalls, this will only attempt the write
623 /// operation if the OS has informed Tokio that this pipe has become
624 /// writable. Because of this, `try_write()` may fail with a
625 /// [`WouldBlock`] error if Tokio has not yet heard from the OS that
626 /// this pipe has become writable.
627 ///
628 /// # Examples
629 ///
630 /// ```no_run
631 /// use tokio::net::unix::pipe;
632 /// use std::io;
633 ///
634 /// #[tokio::main]
635 /// async fn main() -> io::Result<()> {
636 /// // Open a writing end of a fifo
637 /// let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo")?;
638 ///
639 /// loop {
640 /// // Wait for the pipe to be writable
641 /// tx.writable().await?;
642 ///
643 /// // Try to write data, this may still fail with `WouldBlock`
644 /// // if the readiness event is a false positive.
645 /// match tx.try_write(b"hello world") {
646 /// Ok(n) => {
647 /// break;
648 /// }
649 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
650 /// continue;
651 /// }
652 /// Err(e) => {
653 /// return Err(e.into());
654 /// }
655 /// }
656 /// }
657 ///
658 /// Ok(())
659 /// }
660 /// ```
661 ///
662 /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
663 pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
664 self.io
665 .registration()
666 .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
667 }
668
669 /// Tries to write several buffers to the pipe, returning how many bytes
670 /// were written.
671 ///
672 /// Data is written from each buffer in order, with the final buffer read
673 /// from possible being only partially consumed. This method behaves
674 /// equivalently to a single call to [`try_write()`] with concatenated
675 /// buffers.
676 ///
677 /// If the total length of buffers is not greater than `PIPE_BUF` (an OS
678 /// constant, 4096 under Linux), then the write is guaranteed to be atomic,
679 /// i.e. either the entire contents of buffers will be written or this
680 /// method will fail with `WouldBlock`. There is no such guarantee if the
681 /// total length of buffers is greater than `PIPE_BUF`.
682 ///
683 /// This function is usually paired with [`writable`].
684 ///
685 /// [`try_write()`]: Self::try_write()
686 /// [`writable`]: Self::writable
687 ///
688 /// # Return
689 ///
690 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
691 /// number of bytes written. If the pipe is not ready to write data,
692 /// `Err(io::ErrorKind::WouldBlock)` is returned.
693 ///
694 /// # Notes
695 ///
696 /// To avoid unnecessary syscalls, this will only attempt the write
697 /// operation if the OS has informed Tokio that this pipe has become
698 /// writable. Because of this, `try_write_vectored()` may fail with a
699 /// [`WouldBlock`] error if Tokio has not yet heard from the OS that
700 /// this pipe has become writable.
701 ///
702 /// # Examples
703 ///
704 /// ```no_run
705 /// use tokio::net::unix::pipe;
706 /// use std::io;
707 ///
708 /// #[tokio::main]
709 /// async fn main() -> io::Result<()> {
710 /// // Open a writing end of a fifo
711 /// let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo")?;
712 ///
713 /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
714 ///
715 /// loop {
716 /// // Wait for the pipe to be writable
717 /// tx.writable().await?;
718 ///
719 /// // Try to write data, this may still fail with `WouldBlock`
720 /// // if the readiness event is a false positive.
721 /// match tx.try_write_vectored(&bufs) {
722 /// Ok(n) => {
723 /// break;
724 /// }
725 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
726 /// continue;
727 /// }
728 /// Err(e) => {
729 /// return Err(e.into());
730 /// }
731 /// }
732 /// }
733 ///
734 /// Ok(())
735 /// }
736 /// ```
737 ///
738 /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
739 pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
740 self.io
741 .registration()
742 .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
743 }
744
745 /// Tries to write from the socket using a user-provided IO operation.
746 ///
747 /// If the socket is ready, the provided closure is called. The closure
748 /// should attempt to perform IO operation on the socket by manually
749 /// calling the appropriate syscall. If the operation fails because the
750 /// socket is not actually ready, then the closure should return a
751 /// `WouldBlock` error and the readiness flag is cleared. The return value
752 /// of the closure is then returned by `try_io`.
753 ///
754 /// If the socket is not ready, then the closure is not called
755 /// and a `WouldBlock` error is returned.
756 ///
757 /// The closure should only return a `WouldBlock` error if it has performed
758 /// an IO operation on the socket that failed due to the socket not being
759 /// ready. Returning a `WouldBlock` error in any other situation will
760 /// incorrectly clear the readiness flag, which can cause the socket to
761 /// behave incorrectly.
762 ///
763 /// The closure should not perform the IO operation using any of the methods
764 /// defined on the Tokio `pipe::Sender` type, as this will mess with the
765 /// readiness flag and can cause the socket to behave incorrectly.
766 ///
767 /// Usually, [`writable()`] or [`ready()`] is used with this function.
768 ///
769 /// [`writable()`]: Self::writable()
770 /// [`ready()`]: Self::ready()
771 pub fn try_io<R>(&self, f: impl FnOnce() -> io::Result<R>) -> io::Result<R> {
772 self.io
773 .registration()
774 .try_io(Interest::WRITABLE, || self.io.try_io(f))
775 }
776
777 /// Converts the pipe into an [`OwnedFd`] in blocking mode.
778 ///
779 /// This function will deregister this pipe end from the event loop, set
780 /// it in blocking mode and perform the conversion.
781 pub fn into_blocking_fd(self) -> io::Result<OwnedFd> {
782 let fd = self.into_nonblocking_fd()?;
783 set_blocking(&fd)?;
784 Ok(fd)
785 }
786
787 /// Converts the pipe into an [`OwnedFd`] in nonblocking mode.
788 ///
789 /// This function will deregister this pipe end from the event loop and
790 /// perform the conversion. The returned file descriptor will be in nonblocking
791 /// mode.
792 pub fn into_nonblocking_fd(self) -> io::Result<OwnedFd> {
793 let mio_pipe = self.io.into_inner()?;
794
795 // Safety: the pipe is now deregistered from the event loop
796 // and we are the only owner of this pipe end.
797 let owned_fd = unsafe { OwnedFd::from_raw_fd(mio_pipe.into_raw_fd()) };
798
799 Ok(owned_fd)
800 }
801}
802
803impl AsyncWrite for Sender {
804 fn poll_write(
805 self: Pin<&mut Self>,
806 cx: &mut Context<'_>,
807 buf: &[u8],
808 ) -> Poll<io::Result<usize>> {
809 self.io.poll_write(cx, buf)
810 }
811
812 fn poll_write_vectored(
813 self: Pin<&mut Self>,
814 cx: &mut Context<'_>,
815 bufs: &[io::IoSlice<'_>],
816 ) -> Poll<io::Result<usize>> {
817 self.io.poll_write_vectored(cx, bufs)
818 }
819
820 fn is_write_vectored(&self) -> bool {
821 true
822 }
823
824 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
825 Poll::Ready(Ok(()))
826 }
827
828 fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
829 Poll::Ready(Ok(()))
830 }
831}
832
833impl AsRawFd for Sender {
834 fn as_raw_fd(&self) -> RawFd {
835 self.io.as_raw_fd()
836 }
837}
838
839impl AsFd for Sender {
840 fn as_fd(&self) -> BorrowedFd<'_> {
841 unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
842 }
843}
844
845/// Reading end of a Unix pipe.
846///
847/// It can be constructed from a FIFO file with [`OpenOptions::open_receiver`].
848///
849/// # Examples
850///
851/// Receiving messages from a named pipe in a loop:
852///
853/// ```no_run
854/// use tokio::net::unix::pipe;
855/// use tokio::io::{self, AsyncReadExt};
856///
857/// const FIFO_NAME: &str = "path/to/a/fifo";
858///
859/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
860/// let mut rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?;
861/// loop {
862/// let mut msg = vec![0; 256];
863/// match rx.read_exact(&mut msg).await {
864/// Ok(_) => {
865/// /* handle the message */
866/// }
867/// Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
868/// // Writing end has been closed, we should reopen the pipe.
869/// rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?;
870/// }
871/// Err(e) => return Err(e.into()),
872/// }
873/// }
874/// # }
875/// ```
876///
877/// On Linux, you can use a `Receiver` in read-write access mode to implement
878/// resilient reading from a named pipe. Unlike `Receiver` opened in read-only
879/// mode, read from a pipe in read-write mode will not fail with `UnexpectedEof`
880/// when the writing end is closed. This way, a `Receiver` can asynchronously
881/// wait for the next writer to open the pipe.
882///
883/// You should not use functions waiting for EOF such as [`read_to_end`] with
884/// a `Receiver` in read-write access mode, since it **may wait forever**.
885/// `Receiver` in this mode also holds an open writing end, which prevents
886/// receiving EOF.
887///
888/// To set the read-write access mode you can use `OpenOptions::read_write`.
889/// Note that using read-write access mode with FIFO files is not defined by
890/// the POSIX standard and it is only guaranteed to work on Linux.
891///
892/// ```ignore
893/// use tokio::net::unix::pipe;
894/// use tokio::io::AsyncReadExt;
895/// # use std::error::Error;
896///
897/// const FIFO_NAME: &str = "path/to/a/fifo";
898///
899/// # async fn dox() -> Result<(), Box<dyn Error>> {
900/// let mut rx = pipe::OpenOptions::new()
901/// .read_write(true)
902/// .open_receiver(FIFO_NAME)?;
903/// loop {
904/// let mut msg = vec![0; 256];
905/// rx.read_exact(&mut msg).await?;
906/// /* handle the message */
907/// }
908/// # }
909/// ```
910///
911/// [`read_to_end`]: crate::io::AsyncReadExt::read_to_end
912#[derive(Debug)]
913pub struct Receiver {
914 io: PollEvented<mio_pipe::Receiver>,
915}
916
917impl Receiver {
918 fn from_mio(mio_rx: mio_pipe::Receiver) -> io::Result<Receiver> {
919 let io = PollEvented::new_with_interest(mio_rx, Interest::READABLE)?;
920 Ok(Receiver { io })
921 }
922
923 /// Creates a new `Receiver` from a [`File`].
924 ///
925 /// This function is intended to construct a pipe from a [`File`] representing
926 /// a special FIFO file. It will check if the file is a pipe and has read access,
927 /// set it in non-blocking mode and perform the conversion.
928 ///
929 /// # Errors
930 ///
931 /// Fails with `io::ErrorKind::InvalidInput` if the file is not a pipe or it
932 /// does not have read access. Also fails with any standard OS error if it occurs.
933 ///
934 /// # Panics
935 ///
936 /// This function panics if it is not called from within a runtime with
937 /// IO enabled.
938 ///
939 /// The runtime is usually set implicitly when this function is called
940 /// from a future driven by a tokio runtime, otherwise runtime can be set
941 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
942 pub fn from_file(file: File) -> io::Result<Receiver> {
943 Receiver::from_owned_fd(file.into())
944 }
945
946 /// Creates a new `Receiver` from an [`OwnedFd`].
947 ///
948 /// This function is intended to construct a pipe from an [`OwnedFd`] representing
949 /// an anonymous pipe or a special FIFO file. It will check if the file descriptor
950 /// is a pipe and has read access, set it in non-blocking mode and perform the
951 /// conversion.
952 ///
953 /// # Errors
954 ///
955 /// Fails with `io::ErrorKind::InvalidInput` if the file descriptor is not a pipe
956 /// or it does not have read access. Also fails with any standard OS error if it
957 /// occurs.
958 ///
959 /// # Panics
960 ///
961 /// This function panics if it is not called from within a runtime with
962 /// IO enabled.
963 ///
964 /// The runtime is usually set implicitly when this function is called
965 /// from a future driven by a tokio runtime, otherwise runtime can be set
966 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
967 pub fn from_owned_fd(owned_fd: OwnedFd) -> io::Result<Receiver> {
968 if !is_pipe(owned_fd.as_fd())? {
969 return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
970 }
971
972 let flags = get_file_flags(owned_fd.as_fd())?;
973 if has_read_access(flags) {
974 set_nonblocking(owned_fd.as_fd(), flags)?;
975 Receiver::from_owned_fd_unchecked(owned_fd)
976 } else {
977 Err(io::Error::new(
978 io::ErrorKind::InvalidInput,
979 "not in O_RDONLY or O_RDWR access mode",
980 ))
981 }
982 }
983
984 /// Creates a new `Receiver` from a [`File`] without checking pipe properties.
985 ///
986 /// This function is intended to construct a pipe from a File representing
987 /// a special FIFO file. The conversion assumes nothing about the underlying
988 /// file; it is left up to the user to make sure it is opened with read access,
989 /// represents a pipe and is set in non-blocking mode.
990 ///
991 /// # Examples
992 ///
993 /// ```no_run
994 /// use tokio::net::unix::pipe;
995 /// use std::fs::OpenOptions;
996 /// use std::os::unix::fs::{FileTypeExt, OpenOptionsExt};
997 /// # use std::error::Error;
998 ///
999 /// const FIFO_NAME: &str = "path/to/a/fifo";
1000 ///
1001 /// # async fn dox() -> Result<(), Box<dyn Error>> {
1002 /// let file = OpenOptions::new()
1003 /// .read(true)
1004 /// .custom_flags(libc::O_NONBLOCK)
1005 /// .open(FIFO_NAME)?;
1006 /// if file.metadata()?.file_type().is_fifo() {
1007 /// let rx = pipe::Receiver::from_file_unchecked(file)?;
1008 /// /* use the Receiver */
1009 /// }
1010 /// # Ok(())
1011 /// # }
1012 /// ```
1013 ///
1014 /// # Panics
1015 ///
1016 /// This function panics if it is not called from within a runtime with
1017 /// IO enabled.
1018 ///
1019 /// The runtime is usually set implicitly when this function is called
1020 /// from a future driven by a tokio runtime, otherwise runtime can be set
1021 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
1022 pub fn from_file_unchecked(file: File) -> io::Result<Receiver> {
1023 Receiver::from_owned_fd_unchecked(file.into())
1024 }
1025
1026 /// Creates a new `Receiver` from an [`OwnedFd`] without checking pipe properties.
1027 ///
1028 /// This function is intended to construct a pipe from an [`OwnedFd`] representing
1029 /// an anonymous pipe or a special FIFO file. The conversion assumes nothing about
1030 /// the underlying pipe; it is left up to the user to make sure that the file
1031 /// descriptor represents the reading end of a pipe and the pipe is set in
1032 /// non-blocking mode.
1033 ///
1034 /// # Panics
1035 ///
1036 /// This function panics if it is not called from within a runtime with
1037 /// IO enabled.
1038 ///
1039 /// The runtime is usually set implicitly when this function is called
1040 /// from a future driven by a tokio runtime, otherwise runtime can be set
1041 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
1042 pub fn from_owned_fd_unchecked(owned_fd: OwnedFd) -> io::Result<Receiver> {
1043 // Safety: OwnedFd represents a valid, open file descriptor.
1044 let mio_rx = unsafe { mio_pipe::Receiver::from_raw_fd(owned_fd.into_raw_fd()) };
1045 Receiver::from_mio(mio_rx)
1046 }
1047
1048 /// Waits for any of the requested ready states.
1049 ///
1050 /// This function can be used instead of [`readable()`] to check the returned
1051 /// ready set for [`Ready::READABLE`] and [`Ready::READ_CLOSED`] events.
1052 ///
1053 /// The function may complete without the pipe being ready. This is a
1054 /// false-positive and attempting an operation will return with
1055 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
1056 /// [`Ready`] set, so you should always check the returned value and possibly
1057 /// wait again if the requested states are not set.
1058 ///
1059 /// [`readable()`]: Self::readable
1060 ///
1061 /// # Cancel safety
1062 ///
1063 /// This method is cancel safe. Once a readiness event occurs, the method
1064 /// will continue to return immediately until the readiness event is
1065 /// consumed by an attempt to read that fails with `WouldBlock` or
1066 /// `Poll::Pending`.
1067 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
1068 let event = self.io.registration().readiness(interest).await?;
1069 Ok(event.ready)
1070 }
1071
1072 /// Waits for the pipe to become readable.
1073 ///
1074 /// This function is equivalent to `ready(Interest::READABLE)` and is usually
1075 /// paired with [`try_read()`].
1076 ///
1077 /// [`try_read()`]: Self::try_read()
1078 ///
1079 /// # Examples
1080 ///
1081 /// ```no_run
1082 /// use tokio::net::unix::pipe;
1083 /// use std::io;
1084 ///
1085 /// #[tokio::main]
1086 /// async fn main() -> io::Result<()> {
1087 /// // Open a reading end of a fifo
1088 /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
1089 ///
1090 /// let mut msg = vec![0; 1024];
1091 ///
1092 /// loop {
1093 /// // Wait for the pipe to be readable
1094 /// rx.readable().await?;
1095 ///
1096 /// // Try to read data, this may still fail with `WouldBlock`
1097 /// // if the readiness event is a false positive.
1098 /// match rx.try_read(&mut msg) {
1099 /// Ok(n) => {
1100 /// msg.truncate(n);
1101 /// break;
1102 /// }
1103 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1104 /// continue;
1105 /// }
1106 /// Err(e) => {
1107 /// return Err(e.into());
1108 /// }
1109 /// }
1110 /// }
1111 ///
1112 /// println!("GOT = {:?}", msg);
1113 /// Ok(())
1114 /// }
1115 /// ```
1116 pub async fn readable(&self) -> io::Result<()> {
1117 self.ready(Interest::READABLE).await?;
1118 Ok(())
1119 }
1120
1121 /// Polls for read readiness.
1122 ///
1123 /// If the pipe is not currently ready for reading, this method will
1124 /// store a clone of the `Waker` from the provided `Context`. When the pipe
1125 /// becomes ready for reading, `Waker::wake` will be called on the waker.
1126 ///
1127 /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
1128 /// the `Waker` from the `Context` passed to the most recent call is
1129 /// scheduled to receive a wakeup.
1130 ///
1131 /// This function is intended for cases where creating and pinning a future
1132 /// via [`readable`] is not feasible. Where possible, using [`readable`] is
1133 /// preferred, as this supports polling from multiple tasks at once.
1134 ///
1135 /// [`readable`]: Self::readable
1136 ///
1137 /// # Return value
1138 ///
1139 /// The function returns:
1140 ///
1141 /// * `Poll::Pending` if the pipe is not ready for reading.
1142 /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading.
1143 /// * `Poll::Ready(Err(e))` if an error is encountered.
1144 ///
1145 /// # Errors
1146 ///
1147 /// This function may encounter any standard I/O error except `WouldBlock`.
1148 pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1149 self.io.registration().poll_read_ready(cx).map_ok(|_| ())
1150 }
1151
1152 /// Tries to read data from the pipe into the provided buffer, returning how
1153 /// many bytes were read.
1154 ///
1155 /// Reads any pending data from the pipe but does not wait for new data
1156 /// to arrive. On success, returns the number of bytes read. Because
1157 /// `try_read()` is non-blocking, the buffer does not have to be stored by
1158 /// the async task and can exist entirely on the stack.
1159 ///
1160 /// Usually [`readable()`] is used with this function.
1161 ///
1162 /// [`readable()`]: Self::readable()
1163 ///
1164 /// # Return
1165 ///
1166 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1167 /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
1168 ///
1169 /// 1. The pipe's writing end is closed and will no longer write data.
1170 /// 2. The specified buffer was 0 bytes in length.
1171 ///
1172 /// If the pipe is not ready to read data,
1173 /// `Err(io::ErrorKind::WouldBlock)` is returned.
1174 ///
1175 /// # Notes
1176 ///
1177 /// To avoid unnecessary syscalls, this will only attempt the read
1178 /// operation if the OS has informed Tokio that this pipe has become
1179 /// readable. Because of this, `try_read()` may fail with a
1180 /// [`WouldBlock`] error if Tokio has not yet heard from the OS that
1181 /// this pipe has become readable.
1182 ///
1183 /// # Examples
1184 ///
1185 /// ```no_run
1186 /// use tokio::net::unix::pipe;
1187 /// use std::io;
1188 ///
1189 /// #[tokio::main]
1190 /// async fn main() -> io::Result<()> {
1191 /// // Open a reading end of a fifo
1192 /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
1193 ///
1194 /// let mut msg = vec![0; 1024];
1195 ///
1196 /// loop {
1197 /// // Wait for the pipe to be readable
1198 /// rx.readable().await?;
1199 ///
1200 /// // Try to read data, this may still fail with `WouldBlock`
1201 /// // if the readiness event is a false positive.
1202 /// match rx.try_read(&mut msg) {
1203 /// Ok(n) => {
1204 /// msg.truncate(n);
1205 /// break;
1206 /// }
1207 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1208 /// continue;
1209 /// }
1210 /// Err(e) => {
1211 /// return Err(e.into());
1212 /// }
1213 /// }
1214 /// }
1215 ///
1216 /// println!("GOT = {:?}", msg);
1217 /// Ok(())
1218 /// }
1219 /// ```
1220 ///
1221 /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
1222 pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
1223 self.io
1224 .registration()
1225 .try_io(Interest::READABLE, || (&*self.io).read(buf))
1226 }
1227
1228 /// Tries to read data from the pipe into the provided buffers, returning
1229 /// how many bytes were read.
1230 ///
1231 /// Data is copied to fill each buffer in order, with the final buffer
1232 /// written to possibly being only partially filled. This method behaves
1233 /// equivalently to a single call to [`try_read()`] with concatenated
1234 /// buffers.
1235 ///
1236 /// Reads any pending data from the pipe but does not wait for new data
1237 /// to arrive. On success, returns the number of bytes read. Because
1238 /// `try_read_vectored()` is non-blocking, the buffer does not have to be
1239 /// stored by the async task and can exist entirely on the stack.
1240 ///
1241 /// Usually, [`readable()`] is used with this function.
1242 ///
1243 /// [`try_read()`]: Self::try_read()
1244 /// [`readable()`]: Self::readable()
1245 ///
1246 /// # Return
1247 ///
1248 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1249 /// number of bytes read. `Ok(0)` indicates the pipe's writing end is
1250 /// closed and will no longer write data. If the pipe is not ready to read
1251 /// data `Err(io::ErrorKind::WouldBlock)` is returned.
1252 ///
1253 /// # Notes
1254 ///
1255 /// To avoid unnecessary syscalls, this will only attempt the read
1256 /// operation if the OS has informed Tokio that this pipe has become
1257 /// readable. Because of this, `try_read_vectored()` may fail with a
1258 /// [`WouldBlock`] error if Tokio has not yet heard from the OS that
1259 /// this pipe has become readable.
1260 ///
1261 /// # Examples
1262 ///
1263 /// ```no_run
1264 /// use tokio::net::unix::pipe;
1265 /// use std::io;
1266 ///
1267 /// #[tokio::main]
1268 /// async fn main() -> io::Result<()> {
1269 /// // Open a reading end of a fifo
1270 /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
1271 ///
1272 /// loop {
1273 /// // Wait for the pipe to be readable
1274 /// rx.readable().await?;
1275 ///
1276 /// // Creating the buffer **after** the `await` prevents it from
1277 /// // being stored in the async task.
1278 /// let mut buf_a = [0; 512];
1279 /// let mut buf_b = [0; 1024];
1280 /// let mut bufs = [
1281 /// io::IoSliceMut::new(&mut buf_a),
1282 /// io::IoSliceMut::new(&mut buf_b),
1283 /// ];
1284 ///
1285 /// // Try to read data, this may still fail with `WouldBlock`
1286 /// // if the readiness event is a false positive.
1287 /// match rx.try_read_vectored(&mut bufs) {
1288 /// Ok(0) => break,
1289 /// Ok(n) => {
1290 /// println!("read {} bytes", n);
1291 /// }
1292 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1293 /// continue;
1294 /// }
1295 /// Err(e) => {
1296 /// return Err(e.into());
1297 /// }
1298 /// }
1299 /// }
1300 ///
1301 /// Ok(())
1302 /// }
1303 /// ```
1304 ///
1305 /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
1306 pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
1307 self.io
1308 .registration()
1309 .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
1310 }
1311
1312 /// Tries to read to the socket using a user-provided IO operation.
1313 ///
1314 /// If the socket is ready, the provided closure is called. The closure
1315 /// should attempt to perform IO operation on the socket by manually
1316 /// calling the appropriate syscall. If the operation fails because the
1317 /// socket is not actually ready, then the closure should return a
1318 /// `WouldBlock` error and the readiness flag is cleared. The return value
1319 /// of the closure is then returned by `try_io`.
1320 ///
1321 /// If the socket is not ready, then the closure is not called
1322 /// and a `WouldBlock` error is returned.
1323 ///
1324 /// The closure should only return a `WouldBlock` error if it has performed
1325 /// an IO operation on the socket that failed due to the socket not being
1326 /// ready. Returning a `WouldBlock` error in any other situation will
1327 /// incorrectly clear the readiness flag, which can cause the socket to
1328 /// behave incorrectly.
1329 ///
1330 /// The closure should not perform the IO operation using any of the methods
1331 /// defined on the Tokio `pipe::Receiver` type, as this will mess with the
1332 /// readiness flag and can cause the socket to behave incorrectly.
1333 ///
1334 /// Usually, [`readable()`] or [`ready()`] is used with this function.
1335 ///
1336 /// [`readable()`]: Self::readable()
1337 /// [`ready()`]: Self::ready()
1338 pub fn try_io<R>(&self, f: impl FnOnce() -> io::Result<R>) -> io::Result<R> {
1339 self.io
1340 .registration()
1341 .try_io(Interest::READABLE, || self.io.try_io(f))
1342 }
1343
1344 cfg_io_util! {
1345 /// Tries to read data from the pipe into the provided buffer, advancing the
1346 /// buffer's internal cursor, returning how many bytes were read.
1347 ///
1348 /// Reads any pending data from the pipe but does not wait for new data
1349 /// to arrive. On success, returns the number of bytes read. Because
1350 /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
1351 /// the async task and can exist entirely on the stack.
1352 ///
1353 /// Usually, [`readable()`] or [`ready()`] is used with this function.
1354 ///
1355 /// [`readable()`]: Self::readable
1356 /// [`ready()`]: Self::ready
1357 ///
1358 /// # Return
1359 ///
1360 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1361 /// number of bytes read. `Ok(0)` indicates the pipe's writing end is
1362 /// closed and will no longer write data. If the pipe is not ready to read
1363 /// data `Err(io::ErrorKind::WouldBlock)` is returned.
1364 ///
1365 /// # Notes
1366 ///
1367 /// To avoid unnecessary syscalls, this will only attempt the read
1368 /// operation if the OS has informed Tokio that this pipe has become
1369 /// readable. Because of this, `try_read_buf()` may fail with a
1370 /// [`WouldBlock`] error if Tokio has not yet heard from the OS that
1371 /// this pipe has become readable.
1372 ///
1373 /// # Examples
1374 ///
1375 /// ```no_run
1376 /// use tokio::net::unix::pipe;
1377 /// use std::io;
1378 ///
1379 /// #[tokio::main]
1380 /// async fn main() -> io::Result<()> {
1381 /// // Open a reading end of a fifo
1382 /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
1383 ///
1384 /// loop {
1385 /// // Wait for the pipe to be readable
1386 /// rx.readable().await?;
1387 ///
1388 /// let mut buf = Vec::with_capacity(4096);
1389 ///
1390 /// // Try to read data, this may still fail with `WouldBlock`
1391 /// // if the readiness event is a false positive.
1392 /// match rx.try_read_buf(&mut buf) {
1393 /// Ok(0) => break,
1394 /// Ok(n) => {
1395 /// println!("read {} bytes", n);
1396 /// }
1397 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1398 /// continue;
1399 /// }
1400 /// Err(e) => {
1401 /// return Err(e.into());
1402 /// }
1403 /// }
1404 /// }
1405 ///
1406 /// Ok(())
1407 /// }
1408 /// ```
1409 ///
1410 /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
1411 pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
1412 self.io.registration().try_io(Interest::READABLE, || {
1413 use std::io::Read;
1414
1415 let dst = buf.chunk_mut();
1416 let dst =
1417 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
1418
1419 // Safety: `mio_pipe::Receiver` uses a `std::fs::File` underneath,
1420 // which correctly handles reads into uninitialized memory.
1421 let n = (&*self.io).read(dst)?;
1422
1423 unsafe {
1424 buf.advance_mut(n);
1425 }
1426
1427 Ok(n)
1428 })
1429 }
1430 }
1431
1432 /// Converts the pipe into an [`OwnedFd`] in blocking mode.
1433 ///
1434 /// This function will deregister this pipe end from the event loop, set
1435 /// it in blocking mode and perform the conversion.
1436 pub fn into_blocking_fd(self) -> io::Result<OwnedFd> {
1437 let fd = self.into_nonblocking_fd()?;
1438 set_blocking(&fd)?;
1439 Ok(fd)
1440 }
1441
1442 /// Converts the pipe into an [`OwnedFd`] in nonblocking mode.
1443 ///
1444 /// This function will deregister this pipe end from the event loop and
1445 /// perform the conversion. Returned file descriptor will be in nonblocking
1446 /// mode.
1447 pub fn into_nonblocking_fd(self) -> io::Result<OwnedFd> {
1448 let mio_pipe = self.io.into_inner()?;
1449
1450 // Safety: the pipe is now deregistered from the event loop
1451 // and we are the only owner of this pipe end.
1452 let owned_fd = unsafe { OwnedFd::from_raw_fd(mio_pipe.into_raw_fd()) };
1453
1454 Ok(owned_fd)
1455 }
1456}
1457
1458impl AsyncRead for Receiver {
1459 fn poll_read(
1460 self: Pin<&mut Self>,
1461 cx: &mut Context<'_>,
1462 buf: &mut ReadBuf<'_>,
1463 ) -> Poll<io::Result<()>> {
1464 // Safety: `mio_pipe::Receiver` uses a `std::fs::File` underneath,
1465 // which correctly handles reads into uninitialized memory.
1466 unsafe { self.io.poll_read(cx, buf) }
1467 }
1468}
1469
1470impl AsRawFd for Receiver {
1471 fn as_raw_fd(&self) -> RawFd {
1472 self.io.as_raw_fd()
1473 }
1474}
1475
1476impl AsFd for Receiver {
1477 fn as_fd(&self) -> BorrowedFd<'_> {
1478 unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1479 }
1480}
1481
1482/// Checks if the file descriptor is a pipe or a FIFO.
1483fn is_pipe(fd: BorrowedFd<'_>) -> io::Result<bool> {
1484 // Safety: `libc::stat` is C-like struct used for syscalls and all-zero
1485 // byte pattern forms a valid value.
1486 let mut stat: libc::stat = unsafe { std::mem::zeroed() };
1487
1488 // Safety: it's safe to call `fstat` with a valid, open file descriptor
1489 // and a valid pointer to a `stat` struct.
1490 let r = unsafe { libc::fstat(fd.as_raw_fd(), &mut stat) };
1491
1492 if r == -1 {
1493 Err(io::Error::last_os_error())
1494 } else {
1495 Ok((stat.st_mode as libc::mode_t & libc::S_IFMT) == libc::S_IFIFO)
1496 }
1497}
1498
1499/// Gets file descriptor's flags by fcntl.
1500fn get_file_flags(fd: BorrowedFd<'_>) -> io::Result<libc::c_int> {
1501 // Safety: it's safe to use `fcntl` to read flags of a valid, open file descriptor.
1502 let flags = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_GETFL) };
1503 if flags < 0 {
1504 Err(io::Error::last_os_error())
1505 } else {
1506 Ok(flags)
1507 }
1508}
1509
1510/// Checks for `O_RDONLY` or `O_RDWR` access mode.
1511fn has_read_access(flags: libc::c_int) -> bool {
1512 let mode = flags & libc::O_ACCMODE;
1513 mode == libc::O_RDONLY || mode == libc::O_RDWR
1514}
1515
1516/// Checks for `O_WRONLY` or `O_RDWR` access mode.
1517fn has_write_access(flags: libc::c_int) -> bool {
1518 let mode = flags & libc::O_ACCMODE;
1519 mode == libc::O_WRONLY || mode == libc::O_RDWR
1520}
1521
1522/// Sets file descriptor's flags with `O_NONBLOCK` by fcntl.
1523fn set_nonblocking(fd: BorrowedFd<'_>, current_flags: libc::c_int) -> io::Result<()> {
1524 let flags = current_flags | libc::O_NONBLOCK;
1525
1526 if flags != current_flags {
1527 // Safety: it's safe to use `fcntl` to set the `O_NONBLOCK` flag of a valid,
1528 // open file descriptor.
1529 let ret = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_SETFL, flags) };
1530 if ret < 0 {
1531 return Err(io::Error::last_os_error());
1532 }
1533 }
1534
1535 Ok(())
1536}
1537
1538/// Removes `O_NONBLOCK` from fd's flags.
1539fn set_blocking<T: AsRawFd>(fd: &T) -> io::Result<()> {
1540 // Safety: it's safe to use `fcntl` to read flags of a valid, open file descriptor.
1541 let previous = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_GETFL) };
1542 if previous == -1 {
1543 return Err(io::Error::last_os_error());
1544 }
1545
1546 let new = previous & !libc::O_NONBLOCK;
1547
1548 // Safety: it's safe to use `fcntl` to unset the `O_NONBLOCK` flag of a valid,
1549 // open file descriptor.
1550 let r = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_SETFL, new) };
1551 if r == -1 {
1552 Err(io::Error::last_os_error())
1553 } else {
1554 Ok(())
1555 }
1556}