mio/sys/unix/pipe.rs
1//! Unix pipe.
2//!
3//! See the [`new`] function for documentation.
4
5use std::io;
6use std::os::fd::RawFd;
7
8pub(crate) fn new_raw() -> io::Result<[RawFd; 2]> {
9 let mut fds: [RawFd; 2] = [-1, -1];
10
11 #[cfg(any(
12 target_os = "android",
13 target_os = "dragonfly",
14 target_os = "freebsd",
15 target_os = "fuchsia",
16 target_os = "hurd",
17 target_os = "linux",
18 target_os = "netbsd",
19 target_os = "openbsd",
20 target_os = "illumos",
21 target_os = "redox",
22 target_os = "solaris",
23 target_os = "vita",
24 target_os = "cygwin",
25 ))]
26 unsafe {
27 if libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) != 0 {
28 return Err(io::Error::last_os_error());
29 }
30 }
31
32 #[cfg(any(
33 target_os = "aix",
34 target_os = "haiku",
35 target_os = "ios",
36 target_os = "macos",
37 target_os = "tvos",
38 target_os = "visionos",
39 target_os = "watchos",
40 target_os = "espidf",
41 target_os = "nto",
42 ))]
43 unsafe {
44 // For platforms that don't have `pipe2(2)` we need to manually set the
45 // correct flags on the file descriptor.
46 if libc::pipe(fds.as_mut_ptr()) != 0 {
47 return Err(io::Error::last_os_error());
48 }
49
50 for fd in &fds {
51 if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK) != 0
52 || libc::fcntl(*fd, libc::F_SETFD, libc::FD_CLOEXEC) != 0
53 {
54 let err = io::Error::last_os_error();
55 // Don't leak file descriptors. Can't handle closing error though.
56 let _ = libc::close(fds[0]);
57 let _ = libc::close(fds[1]);
58 return Err(err);
59 }
60 }
61 }
62
63 Ok(fds)
64}
65
66cfg_os_ext! {
67use std::fs::File;
68use std::io::{IoSlice, IoSliceMut, Read, Write};
69use std::os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd};
70use std::process::{ChildStderr, ChildStdin, ChildStdout};
71
72use crate::io_source::IoSource;
73use crate::{event, Interest, Registry, Token};
74
75/// Create a new non-blocking Unix pipe.
76///
77/// This is a wrapper around Unix's [`pipe(2)`] system call and can be used as
78/// inter-process or thread communication channel.
79///
80/// This channel may be created before forking the process and then one end used
81/// in each process, e.g. the parent process has the sending end to send command
82/// to the child process.
83///
84/// [`pipe(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/pipe.html
85///
86/// # Events
87///
88/// The [`Sender`] can be registered with [`WRITABLE`] interest to receive
89/// [writable events], the [`Receiver`] with [`READABLE`] interest. Once data is
90/// written to the `Sender` the `Receiver` will receive an [readable event].
91///
92/// In addition to those events, events will also be generated if the other side
93/// is dropped. To check if the `Sender` is dropped you'll need to check
94/// [`is_read_closed`] on events for the `Receiver`, if it returns true the
95/// `Sender` is dropped. On the `Sender` end check [`is_write_closed`], if it
96/// returns true the `Receiver` was dropped. Also see the second example below.
97///
98/// [`WRITABLE`]: Interest::WRITABLE
99/// [writable events]: event::Event::is_writable
100/// [`READABLE`]: Interest::READABLE
101/// [readable event]: event::Event::is_readable
102/// [`is_read_closed`]: event::Event::is_read_closed
103/// [`is_write_closed`]: event::Event::is_write_closed
104///
105/// # Deregistering
106///
107/// Both `Sender` and `Receiver` will deregister themselves when dropped,
108/// **iff** the file descriptors are not duplicated (via [`dup(2)`]).
109///
110/// [`dup(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/dup.html
111///
112/// # Examples
113///
114/// Simple example that writes data into the sending end and read it from the
115/// receiving end.
116///
117/// ```
118/// use std::io::{self, Read, Write};
119///
120/// use mio::{Poll, Events, Interest, Token};
121/// use mio::unix::pipe;
122///
123/// // Unique tokens for the two ends of the channel.
124/// const PIPE_RECV: Token = Token(0);
125/// const PIPE_SEND: Token = Token(1);
126///
127/// # fn main() -> io::Result<()> {
128/// // Create our `Poll` instance and the `Events` container.
129/// let mut poll = Poll::new()?;
130/// let mut events = Events::with_capacity(8);
131///
132/// // Create a new pipe.
133/// let (mut sender, mut receiver) = pipe::new()?;
134///
135/// // Register both ends of the channel.
136/// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?;
137/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?;
138///
139/// const MSG: &[u8; 11] = b"Hello world";
140///
141/// loop {
142/// poll.poll(&mut events, None)?;
143///
144/// for event in events.iter() {
145/// match event.token() {
146/// PIPE_SEND => sender.write(MSG)
147/// .and_then(|n| if n != MSG.len() {
148/// // We'll consider a short write an error in this
149/// // example. NOTE: we can't use `write_all` with
150/// // non-blocking I/O.
151/// Err(io::ErrorKind::WriteZero.into())
152/// } else {
153/// Ok(())
154/// })?,
155/// PIPE_RECV => {
156/// let mut buf = [0; 11];
157/// let n = receiver.read(&mut buf)?;
158/// println!("received: {:?}", &buf[0..n]);
159/// assert_eq!(n, MSG.len());
160/// assert_eq!(&buf, &*MSG);
161/// return Ok(());
162/// },
163/// _ => unreachable!(),
164/// }
165/// }
166/// }
167/// # }
168/// ```
169///
170/// Example that receives an event once the `Sender` is dropped.
171///
172/// ```
173/// # use std::io;
174/// #
175/// # use mio::{Poll, Events, Interest, Token};
176/// # use mio::unix::pipe;
177/// #
178/// # const PIPE_RECV: Token = Token(0);
179/// # const PIPE_SEND: Token = Token(1);
180/// #
181/// # fn main() -> io::Result<()> {
182/// // Same setup as in the example above.
183/// let mut poll = Poll::new()?;
184/// let mut events = Events::with_capacity(8);
185///
186/// let (mut sender, mut receiver) = pipe::new()?;
187///
188/// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?;
189/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?;
190///
191/// // Drop the sender.
192/// drop(sender);
193///
194/// poll.poll(&mut events, None)?;
195///
196/// for event in events.iter() {
197/// match event.token() {
198/// PIPE_RECV if event.is_read_closed() => {
199/// // Detected that the sender was dropped.
200/// println!("Sender dropped!");
201/// return Ok(());
202/// },
203/// _ => unreachable!(),
204/// }
205/// }
206/// # unreachable!();
207/// # }
208/// ```
209pub fn new() -> io::Result<(Sender, Receiver)> {
210 let fds = new_raw()?;
211 // SAFETY: `new_raw` initialised the `fds` above.
212 let r = unsafe { Receiver::from_raw_fd(fds[0]) };
213 let w = unsafe { Sender::from_raw_fd(fds[1]) };
214 Ok((w, r))
215}
216
217/// Sending end of an Unix pipe.
218///
219/// See [`new`] for documentation, including examples.
220#[derive(Debug)]
221pub struct Sender {
222 inner: IoSource<File>,
223}
224
225impl Sender {
226 /// Set the `Sender` into or out of non-blocking mode.
227 pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
228 set_nonblocking(self.inner.as_raw_fd(), nonblocking)
229 }
230
231 /// Execute an I/O operation ensuring that the socket receives more events
232 /// if it hits a [`WouldBlock`] error.
233 ///
234 /// # Notes
235 ///
236 /// This method is required to be called for **all** I/O operations to
237 /// ensure the user will receive events once the socket is ready again after
238 /// returning a [`WouldBlock`] error.
239 ///
240 /// [`WouldBlock`]: io::ErrorKind::WouldBlock
241 ///
242 /// # Examples
243 ///
244 /// ```
245 /// # use std::error::Error;
246 /// #
247 /// # fn main() -> Result<(), Box<dyn Error>> {
248 /// use std::io;
249 /// use std::os::fd::AsRawFd;
250 /// use mio::unix::pipe;
251 ///
252 /// let (sender, receiver) = pipe::new()?;
253 ///
254 /// // Wait until the sender is writable...
255 ///
256 /// // Write to the sender using a direct libc call, of course the
257 /// // `io::Write` implementation would be easier to use.
258 /// let buf = b"hello";
259 /// let n = sender.try_io(|| {
260 /// let buf_ptr = &buf as *const _ as *const _;
261 /// let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) };
262 /// if res != -1 {
263 /// Ok(res as usize)
264 /// } else {
265 /// // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure
266 /// // should return `WouldBlock` error.
267 /// Err(io::Error::last_os_error())
268 /// }
269 /// })?;
270 /// eprintln!("write {} bytes", n);
271 ///
272 /// // Wait until the receiver is readable...
273 ///
274 /// // Read from the receiver using a direct libc call, of course the
275 /// // `io::Read` implementation would be easier to use.
276 /// let mut buf = [0; 512];
277 /// let n = receiver.try_io(|| {
278 /// let buf_ptr = &mut buf as *mut _ as *mut _;
279 /// let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) };
280 /// if res != -1 {
281 /// Ok(res as usize)
282 /// } else {
283 /// // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure
284 /// // should return `WouldBlock` error.
285 /// Err(io::Error::last_os_error())
286 /// }
287 /// })?;
288 /// eprintln!("read {} bytes", n);
289 /// # Ok(())
290 /// # }
291 /// ```
292 pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
293 where
294 F: FnOnce() -> io::Result<T>,
295 {
296 self.inner.do_io(|_| f())
297 }
298}
299
300impl event::Source for Sender {
301 fn register(
302 &mut self,
303 registry: &Registry,
304 token: Token,
305 interests: Interest,
306 ) -> io::Result<()> {
307 self.inner.register(registry, token, interests)
308 }
309
310 fn reregister(
311 &mut self,
312 registry: &Registry,
313 token: Token,
314 interests: Interest,
315 ) -> io::Result<()> {
316 self.inner.reregister(registry, token, interests)
317 }
318
319 fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
320 self.inner.deregister(registry)
321 }
322}
323
324impl Write for Sender {
325 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
326 self.inner.do_io(|mut sender| sender.write(buf))
327 }
328
329 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
330 self.inner.do_io(|mut sender| sender.write_vectored(bufs))
331 }
332
333 fn flush(&mut self) -> io::Result<()> {
334 self.inner.do_io(|mut sender| sender.flush())
335 }
336}
337
338impl Write for &Sender {
339 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
340 self.inner.do_io(|mut sender| sender.write(buf))
341 }
342
343 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
344 self.inner.do_io(|mut sender| sender.write_vectored(bufs))
345 }
346
347 fn flush(&mut self) -> io::Result<()> {
348 self.inner.do_io(|mut sender| sender.flush())
349 }
350}
351
352/// # Notes
353///
354/// The underlying pipe is **not** set to non-blocking.
355impl From<ChildStdin> for Sender {
356 fn from(stdin: ChildStdin) -> Sender {
357 // Safety: `ChildStdin` is guaranteed to be a valid file descriptor.
358 unsafe { Sender::from_raw_fd(stdin.into_raw_fd()) }
359 }
360}
361
362impl FromRawFd for Sender {
363 unsafe fn from_raw_fd(fd: RawFd) -> Sender {
364 Sender {
365 inner: IoSource::new(File::from_raw_fd(fd)),
366 }
367 }
368}
369
370impl AsRawFd for Sender {
371 fn as_raw_fd(&self) -> RawFd {
372 self.inner.as_raw_fd()
373 }
374}
375
376impl IntoRawFd for Sender {
377 fn into_raw_fd(self) -> RawFd {
378 self.inner.into_inner().into_raw_fd()
379 }
380}
381
382impl From<Sender> for OwnedFd {
383 fn from(sender: Sender) -> Self {
384 sender.inner.into_inner().into()
385 }
386}
387
388impl AsFd for Sender {
389 fn as_fd(&self) -> BorrowedFd<'_> {
390 self.inner.as_fd()
391 }
392}
393
394impl From<OwnedFd> for Sender {
395 fn from(fd: OwnedFd) -> Self {
396 Sender {
397 inner: IoSource::new(File::from(fd)),
398 }
399 }
400}
401
402/// Receiving end of an Unix pipe.
403///
404/// See [`new`] for documentation, including examples.
405#[derive(Debug)]
406pub struct Receiver {
407 inner: IoSource<File>,
408}
409
410impl Receiver {
411 /// Set the `Receiver` into or out of non-blocking mode.
412 pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
413 set_nonblocking(self.inner.as_raw_fd(), nonblocking)
414 }
415
416 /// Execute an I/O operation ensuring that the socket receives more events
417 /// if it hits a [`WouldBlock`] error.
418 ///
419 /// # Notes
420 ///
421 /// This method is required to be called for **all** I/O operations to
422 /// ensure the user will receive events once the socket is ready again after
423 /// returning a [`WouldBlock`] error.
424 ///
425 /// [`WouldBlock`]: io::ErrorKind::WouldBlock
426 ///
427 /// # Examples
428 ///
429 /// ```
430 /// # use std::error::Error;
431 /// #
432 /// # fn main() -> Result<(), Box<dyn Error>> {
433 /// use std::io;
434 /// use std::os::fd::AsRawFd;
435 /// use mio::unix::pipe;
436 ///
437 /// let (sender, receiver) = pipe::new()?;
438 ///
439 /// // Wait until the sender is writable...
440 ///
441 /// // Write to the sender using a direct libc call, of course the
442 /// // `io::Write` implementation would be easier to use.
443 /// let buf = b"hello";
444 /// let n = sender.try_io(|| {
445 /// let buf_ptr = &buf as *const _ as *const _;
446 /// let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) };
447 /// if res != -1 {
448 /// Ok(res as usize)
449 /// } else {
450 /// // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure
451 /// // should return `WouldBlock` error.
452 /// Err(io::Error::last_os_error())
453 /// }
454 /// })?;
455 /// eprintln!("write {} bytes", n);
456 ///
457 /// // Wait until the receiver is readable...
458 ///
459 /// // Read from the receiver using a direct libc call, of course the
460 /// // `io::Read` implementation would be easier to use.
461 /// let mut buf = [0; 512];
462 /// let n = receiver.try_io(|| {
463 /// let buf_ptr = &mut buf as *mut _ as *mut _;
464 /// let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) };
465 /// if res != -1 {
466 /// Ok(res as usize)
467 /// } else {
468 /// // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure
469 /// // should return `WouldBlock` error.
470 /// Err(io::Error::last_os_error())
471 /// }
472 /// })?;
473 /// eprintln!("read {} bytes", n);
474 /// # Ok(())
475 /// # }
476 /// ```
477 pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
478 where
479 F: FnOnce() -> io::Result<T>,
480 {
481 self.inner.do_io(|_| f())
482 }
483}
484
485impl event::Source for Receiver {
486 fn register(
487 &mut self,
488 registry: &Registry,
489 token: Token,
490 interests: Interest,
491 ) -> io::Result<()> {
492 self.inner.register(registry, token, interests)
493 }
494
495 fn reregister(
496 &mut self,
497 registry: &Registry,
498 token: Token,
499 interests: Interest,
500 ) -> io::Result<()> {
501 self.inner.reregister(registry, token, interests)
502 }
503
504 fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
505 self.inner.deregister(registry)
506 }
507}
508
509impl Read for Receiver {
510 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
511 self.inner.do_io(|mut sender| sender.read(buf))
512 }
513
514 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
515 self.inner.do_io(|mut sender| sender.read_vectored(bufs))
516 }
517}
518
519impl Read for &Receiver {
520 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
521 self.inner.do_io(|mut sender| sender.read(buf))
522 }
523
524 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
525 self.inner.do_io(|mut sender| sender.read_vectored(bufs))
526 }
527}
528
529/// # Notes
530///
531/// The underlying pipe is **not** set to non-blocking.
532impl From<ChildStdout> for Receiver {
533 fn from(stdout: ChildStdout) -> Receiver {
534 // Safety: `ChildStdout` is guaranteed to be a valid file descriptor.
535 unsafe { Receiver::from_raw_fd(stdout.into_raw_fd()) }
536 }
537}
538
539/// # Notes
540///
541/// The underlying pipe is **not** set to non-blocking.
542impl From<ChildStderr> for Receiver {
543 fn from(stderr: ChildStderr) -> Receiver {
544 // Safety: `ChildStderr` is guaranteed to be a valid file descriptor.
545 unsafe { Receiver::from_raw_fd(stderr.into_raw_fd()) }
546 }
547}
548
549impl IntoRawFd for Receiver {
550 fn into_raw_fd(self) -> RawFd {
551 self.inner.into_inner().into_raw_fd()
552 }
553}
554
555impl AsRawFd for Receiver {
556 fn as_raw_fd(&self) -> RawFd {
557 self.inner.as_raw_fd()
558 }
559}
560
561impl FromRawFd for Receiver {
562 unsafe fn from_raw_fd(fd: RawFd) -> Receiver {
563 Receiver {
564 inner: IoSource::new(File::from_raw_fd(fd)),
565 }
566 }
567}
568
569impl From<Receiver> for OwnedFd {
570 fn from(receiver: Receiver) -> Self {
571 receiver.inner.into_inner().into()
572 }
573}
574
575impl AsFd for Receiver {
576 fn as_fd(&self) -> BorrowedFd<'_> {
577 self.inner.as_fd()
578 }
579}
580
581impl From<OwnedFd> for Receiver {
582 fn from(fd: OwnedFd) -> Self {
583 Receiver {
584 inner: IoSource::new(File::from(fd)),
585 }
586 }
587}
588
589#[cfg(not(any(target_os = "aix", target_os = "illumos", target_os = "solaris", target_os = "vita")))]
590fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> {
591 let value = nonblocking as libc::c_int;
592 if unsafe { libc::ioctl(fd, libc::FIONBIO, &value) } == -1 {
593 Err(io::Error::last_os_error())
594 } else {
595 Ok(())
596 }
597}
598
599#[cfg(any(target_os = "aix", target_os = "illumos", target_os = "solaris", target_os = "vita"))]
600fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> {
601 let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) };
602 if flags < 0 {
603 return Err(io::Error::last_os_error());
604 }
605
606 let nflags = if nonblocking {
607 flags | libc::O_NONBLOCK
608 } else {
609 flags & !libc::O_NONBLOCK
610 };
611
612 if flags != nflags {
613 if unsafe { libc::fcntl(fd, libc::F_SETFL, nflags) } < 0 {
614 return Err(io::Error::last_os_error());
615 }
616 }
617
618 Ok(())
619}
620} // `cfg_os_ext!`.