ipc_channel/platform/unix/mod.rs

// Copyright 2015 The Servo Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use crate::ipc::{self, IpcMessage};
use bincode;
use fnv::FnvHasher;
use libc::{
    self, cmsghdr, linger, CMSG_DATA, CMSG_LEN, CMSG_SPACE, MAP_FAILED, MAP_SHARED, PROT_READ,
    PROT_WRITE, SOCK_SEQPACKET, SOL_SOCKET,
};
use libc::{c_char, c_int, c_void, getsockopt, SO_LINGER, S_IFMT, S_IFSOCK};
use libc::{iovec, msghdr, off_t, recvmsg, sendmsg};
use libc::{sa_family_t, setsockopt, size_t, sockaddr, sockaddr_un, socketpair, socklen_t};
use libc::{EAGAIN, EWOULDBLOCK};
use mio::unix::SourceFd;
use mio::{Events, Interest, Poll, Token};
use std::cell::Cell;
use std::cmp;
use std::collections::HashMap;
use std::convert::TryInto;
use std::error::Error as StdError;
use std::ffi::{c_uint, CString};
use std::fmt::{self, Debug, Formatter};
use std::hash::BuildHasherDefault;
use std::io;
use std::mem;
use std::ops::{Deref, RangeFrom};
use std::os::fd::RawFd;
use std::ptr;
use std::slice;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, LazyLock};
use std::thread;
use std::time::{Duration, UNIX_EPOCH};
use tempfile::{Builder, TempDir};

const MAX_FDS_IN_CMSG: u32 = 64;

// The value Linux returns for SO_SNDBUF
// is not the size we are actually allowed to use...
// Empirically, we have to deduct 32 bytes from that.
const RESERVED_SIZE: usize = 32;

#[cfg(any(target_os = "linux", target_os = "illumos"))]
const SOCK_FLAGS: c_int = libc::SOCK_CLOEXEC;
#[cfg(not(any(target_os = "linux", target_os = "illumos")))]
const SOCK_FLAGS: c_int = 0;

#[cfg(any(target_os = "linux", target_os = "illumos"))]
const RECVMSG_FLAGS: c_int = libc::MSG_CMSG_CLOEXEC;
#[cfg(not(any(target_os = "linux", target_os = "illumos")))]
const RECVMSG_FLAGS: c_int = 0;

#[cfg(target_env = "gnu")]
type IovLen = usize;
#[cfg(target_env = "gnu")]
type MsgControlLen = size_t;

#[cfg(not(target_env = "gnu"))]
type IovLen = i32;
#[cfg(not(target_env = "gnu"))]
type MsgControlLen = socklen_t;

unsafe fn new_sockaddr_un(path: *const c_char) -> (sockaddr_un, usize) {
    let mut sockaddr: sockaddr_un = mem::zeroed();
    libc::strncpy(
        sockaddr.sun_path.as_mut_ptr(),
        path,
        sockaddr.sun_path.len() - 1,
    );
    sockaddr.sun_family = libc::AF_UNIX as sa_family_t;
    (sockaddr, mem::size_of::<sockaddr_un>())
}

static SYSTEM_SENDBUF_SIZE: LazyLock<usize> = LazyLock::new(|| {
    let (tx, _) = channel().expect("Failed to obtain a socket for checking maximum send size");
    tx.get_system_sendbuf_size()
        .expect("Failed to obtain maximum send size for socket")
});

// The pid of the current process which is used to create unique IDs
static PID: LazyLock<u32> = LazyLock::new(std::process::id);

// A global count used to create unique IDs
static SHM_COUNT: AtomicUsize = AtomicUsize::new(0);

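// Creates a connected (sender, receiver) pair backed by a
// `socketpair(AF_UNIX, SOCK_SEQPACKET)`. A minimal usage sketch, with error
// handling elided and hypothetical variable names:
//
//     let (tx, rx) = channel()?;
//     tx.send(b"ping", vec![], vec![])?;
//     let message = rx.recv()?;
//
// Either end can later be transferred to another process inside a message
// (see `OsIpcChannel`).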
pub fn channel() -> Result<(OsIpcSender, OsIpcReceiver), UnixError> {
    let mut results = [0, 0];
    unsafe {
        if socketpair(
            libc::AF_UNIX,
            SOCK_SEQPACKET | SOCK_FLAGS,
            0,
            &mut results[0],
        ) >= 0
        {
            Ok((
                OsIpcSender::from_fd(results[0]),
                OsIpcReceiver::from_fd(results[1]),
            ))
        } else {
            Err(UnixError::last())
        }
    }
}

#[derive(Clone, Copy)]
struct PollEntry {
    pub id: u64,
    pub fd: RawFd,
}

#[derive(PartialEq, Debug)]
pub struct OsIpcReceiver {
    fd: Cell<c_int>,
}

impl Drop for OsIpcReceiver {
    fn drop(&mut self) {
        unsafe {
            let fd = self.fd.get();
            if fd >= 0 {
                let result = libc::close(fd);
                assert!(
                    thread::panicking() || result == 0,
                    "closed receiver (fd: {}): {}",
                    fd,
                    UnixError::last(),
                );
            }
        }
    }
}

impl OsIpcReceiver {
    fn from_fd(fd: c_int) -> OsIpcReceiver {
        OsIpcReceiver { fd: Cell::new(fd) }
    }

    fn consume_fd(&self) -> c_int {
        let fd = self.fd.get();
        self.fd.set(-1);
        fd
    }

    pub fn consume(&self) -> OsIpcReceiver {
        OsIpcReceiver::from_fd(self.consume_fd())
    }

    #[allow(clippy::type_complexity)]
    pub fn recv(&self) -> Result<IpcMessage, UnixError> {
        recv(self.fd.get(), BlockingMode::Blocking)
    }

    #[allow(clippy::type_complexity)]
    pub fn try_recv(&self) -> Result<IpcMessage, UnixError> {
        recv(self.fd.get(), BlockingMode::Nonblocking)
    }

    #[allow(clippy::type_complexity)]
    pub fn try_recv_timeout(&self, duration: Duration) -> Result<IpcMessage, UnixError> {
        recv(self.fd.get(), BlockingMode::Timeout(duration))
    }
}

#[derive(PartialEq, Debug)]
struct SharedFileDescriptor(c_int);

impl Drop for SharedFileDescriptor {
    fn drop(&mut self) {
        unsafe {
            let result = libc::close(self.0);
            assert!(thread::panicking() || result == 0);
        }
    }
}

#[derive(PartialEq, Debug, Clone)]
pub struct OsIpcSender {
    fd: Arc<SharedFileDescriptor>,
}

impl OsIpcSender {
    fn from_fd(fd: c_int) -> OsIpcSender {
        OsIpcSender {
            fd: Arc::new(SharedFileDescriptor(fd)),
        }
    }

    /// Maximum size of the kernel buffer used for transfers over this channel.
    ///
    /// Note: This is *not* the actual maximal packet size we are allowed to use...
    /// Some of it is reserved by the kernel for bookkeeping.
    fn get_system_sendbuf_size(&self) -> Result<usize, UnixError> {
        unsafe {
            let mut socket_sendbuf_size: c_int = 0;
            let mut socket_sendbuf_size_len = mem::size_of::<c_int>() as socklen_t;
            if getsockopt(
                self.fd.0,
                libc::SOL_SOCKET,
                libc::SO_SNDBUF,
                &mut socket_sendbuf_size as *mut _ as *mut c_void,
                &mut socket_sendbuf_size_len as *mut socklen_t,
            ) < 0
            {
                return Err(UnixError::last());
            }
            Ok(socket_sendbuf_size.try_into().unwrap())
        }
    }

    /// Calculate maximum payload data size per fragment.
    ///
    /// It is the total size of the kernel buffer, minus the part reserved by the kernel.
    ///
    /// The `sendbuf_size` passed in should usually be the maximum kernel buffer size,
    /// i.e. the value of *SYSTEM_SENDBUF_SIZE --
    /// except after getting ENOBUFS, in which case it needs to be reduced.
    fn fragment_size(sendbuf_size: usize) -> usize {
        sendbuf_size - RESERVED_SIZE
    }

    /// Calculate maximum payload data size of first fragment.
    ///
    /// This one is smaller than regular fragments, because it carries the message (size) header.
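    ///
    /// Worked example (illustrative numbers, assuming a 64-bit target): with a
    /// 212992-byte kernel send buffer, `fragment_size` gives 212992 - 32 = 212960;
    /// subtracting the 8-byte length header gives 212952, and the `& (!8usize + 1)`
    /// mask below (equivalent to `& !7`) rounds down to a multiple of 8 -- here
    /// 212952 is already aligned, so the first fragment can carry up to 212952
    /// payload bytes.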
    fn first_fragment_size(sendbuf_size: usize) -> usize {
        (Self::fragment_size(sendbuf_size) - mem::size_of::<usize>()) & (!8usize + 1)
        // Ensure optimal alignment.
    }

    /// Maximum data size that can be transferred over this channel in a single packet.
    ///
    /// This is the size of the main data chunk only --
    /// it's independent of any auxiliary data (FDs) transferred along with it.
    ///
    /// A send on this channel won't block for transfers up to this size
    /// under normal circumstances.
    /// (It might still block if heavy memory pressure causes ENOBUFS,
    /// forcing us to reduce the packet size.)
    pub fn get_max_fragment_size() -> usize {
        Self::first_fragment_size(*SYSTEM_SENDBUF_SIZE)
    }

    pub fn send(
        &self,
        data: &[u8],
        channels: Vec<OsIpcChannel>,
        shared_memory_regions: Vec<OsIpcSharedMemory>,
    ) -> Result<(), UnixError> {
        let mut fds = Vec::new();
        for channel in channels.iter() {
            fds.push(channel.fd());
        }
        for shared_memory_region in shared_memory_regions.iter() {
            fds.push(shared_memory_region.store.fd());
        }

        // `len` is the total length of the message.
        // Its value will be sent as a message header before the payload data.
        //
        // Not to be confused with the length of the data to send in this packet
        // (i.e. the length of the data buffer passed in),
        // which in a fragmented send will be smaller than the total message length.
        fn send_first_fragment(
            sender_fd: c_int,
            fds: &[c_int],
            data_buffer: &[u8],
            len: usize,
        ) -> Result<(), UnixError> {
            let result = unsafe {
                let cmsg_length = mem::size_of_val(fds) as c_uint;
                let (cmsg_buffer, cmsg_space) = if cmsg_length > 0 {
                    let cmsg_buffer =
                        libc::malloc(CMSG_SPACE(cmsg_length) as usize) as *mut cmsghdr;
                    if cmsg_buffer.is_null() {
                        return Err(UnixError::last());
                    }
                    (*cmsg_buffer).cmsg_len = CMSG_LEN(cmsg_length) as MsgControlLen;
                    (*cmsg_buffer).cmsg_level = libc::SOL_SOCKET;
                    (*cmsg_buffer).cmsg_type = libc::SCM_RIGHTS;

                    ptr::copy_nonoverlapping(
                        fds.as_ptr(),
                        CMSG_DATA(cmsg_buffer) as *mut c_int,
                        fds.len(),
                    );
                    (cmsg_buffer, CMSG_SPACE(cmsg_length))
                } else {
                    (ptr::null_mut(), 0)
                };

                let mut iovec = [
                    // First fragment begins with a header recording the total data length.
                    //
                    // The receiver uses this to determine
                    // whether it already got the entire message,
                    // or needs to receive additional fragments -- and if so, how much.
                    iovec {
                        iov_base: &len as *const _ as *mut c_void,
                        iov_len: mem::size_of_val(&len),
                    },
                    iovec {
                        iov_base: data_buffer.as_ptr() as *mut c_void,
                        iov_len: data_buffer.len(),
                    },
                ];

                let msghdr = new_msghdr(&mut iovec, cmsg_buffer, cmsg_space as MsgControlLen);
                let result = sendmsg(sender_fd, &msghdr, 0);
                libc::free(cmsg_buffer as *mut c_void);
                result
            };

            if result > 0 {
                Ok(())
            } else {
                Err(UnixError::last())
            }
        }

        fn send_followup_fragment(sender_fd: c_int, data_buffer: &[u8]) -> Result<(), UnixError> {
            let result = unsafe {
                libc::send(
                    sender_fd,
                    data_buffer.as_ptr() as *const c_void,
                    data_buffer.len(),
                    0,
                )
            };

            if result > 0 {
                Ok(())
            } else {
                Err(UnixError::last())
            }
        }

        let mut sendbuf_size = *SYSTEM_SENDBUF_SIZE;

        /// Reduce send buffer size after getting ENOBUFS,
        /// i.e. when the kernel failed to allocate a large enough buffer.
        ///
        /// (If the buffer already was significantly smaller
        /// than the memory page size though,
        /// it means something else must have gone wrong;
        /// so there is no point in further downsizing,
        /// and we error out instead.)
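        ///
        /// Worked example (illustrative numbers): with `sendbuf_size` = 212992
        /// and a failed send of 100000 bytes, halving gives 106496, which is
        /// still >= 100000, so the retry uses 100000 / 2 = 50000 instead.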
        fn downsize(sendbuf_size: &mut usize, sent_size: usize) -> Result<(), ()> {
            if sent_size > 2000 {
                *sendbuf_size /= 2;
                // Make certain we end up with less than what we tried before...
                if *sendbuf_size >= sent_size {
                    *sendbuf_size = sent_size / 2;
                }
                Ok(())
            } else {
                Err(())
            }
        }

        // If the message is small enough, try sending it in a single fragment.
        if data.len() <= Self::get_max_fragment_size() {
            match send_first_fragment(self.fd.0, &fds[..], data, data.len()) {
                Ok(_) => return Ok(()),
                Err(error) => {
                    // ENOBUFS means the kernel failed to allocate a buffer large enough
                    // to actually transfer the message,
                    // although the message was small enough to fit the maximum send size --
                    // so we have to proceed with a fragmented send nevertheless,
                    // using a reduced send buffer size.
                    //
                    // Any other errors we might get here are non-recoverable.
                    if !(matches!(error, UnixError::Errno(libc::ENOBUFS))
                        && downsize(&mut sendbuf_size, data.len()).is_ok())
                    {
                        return Err(error);
                    }
                },
            }
        }

        // The packet is too big. Fragmentation time!
        //
        // Create a dedicated channel to send all but the first fragment.
        // This way we avoid fragments of different messages interleaving in the receiver.
        //
        // The receiver end of the channel is sent with the first fragment
        // along with any other file descriptors that are to be transferred in the message.
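        //
        // Rough wire layout of a fragmented send (a sketch of what the code
        // below produces, not a separate format definition):
        //
        //   main channel, packet 1:  [ total_len: usize ][ data[..first_fragment_size] ]
        //                            ancillary: SCM_RIGHTS { user fds..., dedicated_rx fd }
        //   dedicated channel:       [ data[pos..pos + fragment_size] ]  (one packet per fragment)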
        let (dedicated_tx, dedicated_rx) = channel()?;
        // Extract FD handle without consuming the Receiver, so the FD doesn't get closed.
        fds.push(dedicated_rx.fd.get());

        // Split up the packet into fragments.
        let mut byte_position = 0;
        while byte_position < data.len() {
            let end_byte_position;
            let result = if byte_position == 0 {
                // First fragment. No offset; but contains message header (total size).
                // The auxiliary data (FDs) is also sent along with this one.

                // This fragment always uses the full allowable buffer size.
                end_byte_position = Self::first_fragment_size(sendbuf_size);
                send_first_fragment(self.fd.0, &fds[..], &data[..end_byte_position], data.len())
            } else {
                // Followup fragment. No header; but offset by amount of data already sent.

                end_byte_position = cmp::min(
                    byte_position + Self::fragment_size(sendbuf_size),
                    data.len(),
                );
                send_followup_fragment(dedicated_tx.fd.0, &data[byte_position..end_byte_position])
            };

            if let Err(error) = result {
                if matches!(error, UnixError::Errno(libc::ENOBUFS))
                    && downsize(&mut sendbuf_size, end_byte_position - byte_position).is_ok()
                {
                    // If the kernel failed to allocate a buffer large enough for the packet,
                    // retry with a smaller size (if possible).
                    continue;
                } else {
                    return Err(error);
                }
            }

            byte_position = end_byte_position;
        }

        Ok(())
    }

    pub fn connect(name: String) -> Result<OsIpcSender, UnixError> {
        let name = CString::new(name).unwrap();
        unsafe {
            let fd = libc::socket(libc::AF_UNIX, SOCK_SEQPACKET | SOCK_FLAGS, 0);
            let (sockaddr, len) = new_sockaddr_un(name.as_ptr());
            if libc::connect(
                fd,
                &sockaddr as *const _ as *const sockaddr,
                len as socklen_t,
            ) < 0
            {
                return Err(UnixError::last());
            }

            Ok(OsIpcSender::from_fd(fd))
        }
    }
}

#[derive(PartialEq, Debug)]
pub enum OsIpcChannel {
    Sender(OsIpcSender),
    Receiver(OsIpcReceiver),
}

impl OsIpcChannel {
    fn fd(&self) -> c_int {
        match *self {
            OsIpcChannel::Sender(ref sender) => sender.fd.0,
            OsIpcChannel::Receiver(ref receiver) => receiver.fd.get(),
        }
    }
}

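// A receiver set multiplexes several receivers over a single `mio::Poll`.
// A minimal usage sketch, with error handling elided and hypothetical
// variable names:
//
//     let mut set = OsIpcReceiverSet::new()?;
//     let id = set.add(rx)?;
//     for result in set.select()? {
//         match result {
//             OsIpcSelectionResult::DataReceived(who, message) => { /* who == id */ },
//             OsIpcSelectionResult::ChannelClosed(who) => { /* all senders dropped */ },
//         }
//     }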
pub struct OsIpcReceiverSet {
    incrementor: RangeFrom<u64>,
    poll: Poll,
    pollfds: HashMap<Token, PollEntry, BuildHasherDefault<FnvHasher>>,
    events: Events,
}

impl Drop for OsIpcReceiverSet {
    fn drop(&mut self) {
        for &PollEntry { id: _, fd } in self.pollfds.values() {
            let result = unsafe { libc::close(fd) };
            assert!(thread::panicking() || result == 0);
        }
    }
}

impl OsIpcReceiverSet {
    pub fn new() -> Result<OsIpcReceiverSet, UnixError> {
        let fnv = BuildHasherDefault::<FnvHasher>::default();
        Ok(OsIpcReceiverSet {
            incrementor: 0..,
            poll: Poll::new()?,
            pollfds: HashMap::with_hasher(fnv),
            events: Events::with_capacity(10),
        })
    }

    pub fn add(&mut self, receiver: OsIpcReceiver) -> Result<u64, UnixError> {
        let last_index = self.incrementor.next().unwrap();
        let fd = receiver.consume_fd();
        let fd_token = Token(fd as usize);
        let poll_entry = PollEntry { id: last_index, fd };
        self.poll
            .registry()
            .register(&mut SourceFd(&fd), fd_token, Interest::READABLE)?;
        self.pollfds.insert(fd_token, poll_entry);
        Ok(last_index)
    }

    pub fn select(&mut self) -> Result<Vec<OsIpcSelectionResult>, UnixError> {
        // Poll until we receive at least one event.
        loop {
            match self.poll.poll(&mut self.events, None) {
                Ok(()) if !self.events.is_empty() => break,
                Ok(()) => {},
                Err(ref error) => {
                    if error.kind() != io::ErrorKind::Interrupted {
                        return Err(UnixError::last());
                    }
                },
            }

            if !self.events.is_empty() {
                break;
            }
        }

        let mut selection_results = Vec::new();
        for event in self.events.iter() {
            // We only register this `Poll` for readable events.
            assert!(event.is_readable());

            let event_token = event.token();
            let poll_entry = *self
                .pollfds
                .get(&event_token)
                .expect("Got event for unknown token.");
            loop {
                match recv(poll_entry.fd, BlockingMode::Nonblocking) {
                    Ok(ipc_message) => {
                        selection_results.push(OsIpcSelectionResult::DataReceived(
                            poll_entry.id,
                            ipc_message,
                        ));
                    },
                    Err(err) if err.channel_is_closed() => {
                        self.pollfds.remove(&event_token).unwrap();
                        self.poll
                            .registry()
                            .deregister(&mut SourceFd(&poll_entry.fd))
                            .unwrap();
                        unsafe {
                            libc::close(poll_entry.fd);
                        }

                        selection_results.push(OsIpcSelectionResult::ChannelClosed(poll_entry.id));
                        break;
                    },
                    Err(UnixError::Errno(code)) if code == EWOULDBLOCK => {
                        // We tried to read another message from the file descriptor and
                        // it would have blocked, so we have exhausted all of the data
                        // pending to read.
                        break;
                    },
                    Err(err) => return Err(err),
                }
            }
        }

        Ok(selection_results)
    }
}

pub enum OsIpcSelectionResult {
    DataReceived(u64, IpcMessage),
    ChannelClosed(u64),
}

impl OsIpcSelectionResult {
    pub fn unwrap(self) -> (u64, IpcMessage) {
        match self {
            OsIpcSelectionResult::DataReceived(id, ipc_message) => (id, ipc_message),
            OsIpcSelectionResult::ChannelClosed(id) => {
                panic!("OsIpcSelectionResult::unwrap(): receiver ID {id} was closed!")
            },
        }
    }
}

#[derive(PartialEq, Debug)]
pub struct OsOpaqueIpcChannel {
    fd: c_int,
}

impl Drop for OsOpaqueIpcChannel {
    fn drop(&mut self) {
        // Make sure we don't leak!
        //
        // The `OsOpaqueIpcChannel` objects should always be used,
        // i.e. converted with `to_sender()` or `to_receiver()` --
        // so the value should already be unset before the object gets dropped.
        debug_assert!(self.fd == -1);
    }
}

impl OsOpaqueIpcChannel {
    fn from_fd(fd: c_int) -> OsOpaqueIpcChannel {
        OsOpaqueIpcChannel { fd }
    }

    pub fn to_sender(&mut self) -> OsIpcSender {
        OsIpcSender::from_fd(mem::replace(&mut self.fd, -1))
    }

    pub fn to_receiver(&mut self) -> OsIpcReceiver {
        OsIpcReceiver::from_fd(mem::replace(&mut self.fd, -1))
    }
}

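// A one-shot server binds a listening socket inside a private temporary
// directory and hands the socket path back as its name. A minimal usage
// sketch, with error handling elided and hypothetical variable names:
//
//     let (server, name) = OsIpcOneShotServer::new()?;
//     // Pass `name` to the other process, which runs:
//     //     let tx = OsIpcSender::connect(name)?;
//     //     tx.send(b"hello", vec![], vec![])?;
//     let (rx, first_message) = server.accept()?;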
pub struct OsIpcOneShotServer {
    fd: c_int,

    // Object representing the temporary directory the socket was created in.
    // The directory is automatically deleted (along with the socket inside it)
    // when this field is dropped.
    _temp_dir: TempDir,
}

impl Drop for OsIpcOneShotServer {
    fn drop(&mut self) {
        unsafe {
            let result = libc::close(self.fd);
            assert!(thread::panicking() || result == 0);
        }
    }
}

impl OsIpcOneShotServer {
    pub fn new() -> Result<(OsIpcOneShotServer, String), UnixError> {
        unsafe {
            let fd = libc::socket(libc::AF_UNIX, SOCK_SEQPACKET | SOCK_FLAGS, 0);
            let temp_dir = Builder::new().tempdir()?;
            let socket_path = temp_dir.path().join("socket");
            let path_string = socket_path.to_str().unwrap();

            let path_c_string = CString::new(path_string).unwrap();
            let (sockaddr, len) = new_sockaddr_un(path_c_string.as_ptr());
            if libc::bind(
                fd,
                &sockaddr as *const _ as *const sockaddr,
                len as socklen_t,
            ) != 0
            {
                return Err(UnixError::last());
            }

            if libc::listen(fd, 10) != 0 {
                return Err(UnixError::last());
            }

            Ok((
                OsIpcOneShotServer {
                    fd,
                    _temp_dir: temp_dir,
                },
                path_string.to_string(),
            ))
        }
    }

    #[allow(clippy::type_complexity)]
    pub fn accept(self) -> Result<(OsIpcReceiver, IpcMessage), UnixError> {
        unsafe {
            let sockaddr: *mut sockaddr = ptr::null_mut();
            let sockaddr_len: *mut socklen_t = ptr::null_mut();
            let client_fd = libc::accept(self.fd, sockaddr, sockaddr_len);
            if client_fd < 0 {
                return Err(UnixError::last());
            }
            make_socket_lingering(client_fd)?;

            let receiver = OsIpcReceiver::from_fd(client_fd);
            let ipc_message = receiver.recv()?;
            Ok((receiver, ipc_message))
        }
    }
}

// Make sure that the kernel doesn't return errors to readers if there's still data left after we
// close our end.
//
// See, for example, https://github.com/servo/ipc-channel/issues/29
fn make_socket_lingering(sockfd: c_int) -> Result<(), UnixError> {
    let linger = linger {
        l_onoff: 1,
        l_linger: 30,
    };
    let err = unsafe {
        setsockopt(
            sockfd,
            SOL_SOCKET,
            SO_LINGER,
            &linger as *const _ as *const c_void,
            mem::size_of::<linger>() as socklen_t,
        )
    };
    if err < 0 {
        let error = UnixError::last();
        if let UnixError::Errno(libc::EINVAL) = error {
            // If the other side of the connection is already closed, POSIX.1-2024 (and earlier
            // versions) require that setsockopt return EINVAL [1]. This is a bit unfortunate
            // because SO_LINGER for a closed socket is logically a no-op, which is why some OSes
            // like Linux don't follow this part of the spec. But other OSes like illumos do return
            // EINVAL here.
            //
            // SO_LINGER is widely understood and EINVAL should not occur for any other reason, so
            // accept those errors.
            //
            // Another option would be to call make_socket_lingering on the initial socket created
            // by libc::socket, but whether accept inherits a particular option is
            // implementation-defined [2]. This means that special-casing EINVAL is the most
            // portable thing to do.
            //
            // [1] https://pubs.opengroup.org/onlinepubs/9799919799/functions/setsockopt.html:
            //     "[EINVAL] The specified option is invalid at the specified socket level or the
            //     socket has been shut down."
            //
            // [2] https://pubs.opengroup.org/onlinepubs/9799919799/functions/accept.html: "It is
            //     implementation-defined which socket options, if any, on the accepted socket will
            //     have a default value determined by a value previously customized by setsockopt()
            //     on socket, rather than the default value used for other new sockets."
        } else {
            return Err(error);
        }
    }
    Ok(())
}

struct BackingStore {
    fd: c_int,
}

impl BackingStore {
    pub fn new(length: usize) -> BackingStore {
        let count = SHM_COUNT.fetch_add(1, Ordering::Relaxed);
        let timestamp = UNIX_EPOCH.elapsed().unwrap();
        let name = CString::new(format!(
            "/ipc-channel-shared-memory.{}.{}.{}.{}",
            count,
            *PID,
            timestamp.as_secs(),
            timestamp.subsec_nanos()
        ))
        .unwrap();
        let fd = create_shmem(name, length);
        Self::from_fd(fd)
    }

    pub fn from_fd(fd: c_int) -> BackingStore {
        BackingStore { fd }
    }

    pub fn fd(&self) -> c_int {
        self.fd
    }

    pub unsafe fn map_file(&self, length: Option<size_t>) -> (*mut u8, size_t) {
        let length = length.unwrap_or_else(|| {
            let mut st = mem::MaybeUninit::uninit();
            if libc::fstat(self.fd, st.as_mut_ptr()) != 0 {
                panic!("error stating fd {}: {}", self.fd, UnixError::last());
            }
            st.assume_init().st_size as size_t
        });
        if length == 0 {
            // This will cause `mmap` to fail, so handle it explicitly.
            return (ptr::null_mut(), length);
        }
        let address = libc::mmap(
            ptr::null_mut(),
            length,
            PROT_READ | PROT_WRITE,
            MAP_SHARED,
            self.fd,
            0,
        );
        assert!(!address.is_null());
        assert!(address != MAP_FAILED);
        (address as *mut u8, length)
    }
}

impl Drop for BackingStore {
    fn drop(&mut self) {
        unsafe {
            let result = libc::close(self.fd);
            assert!(thread::panicking() || result == 0);
        }
    }
}

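// A shared memory region is a memfd-backed mapping whose file descriptor is
// passed alongside a message. A minimal usage sketch, with error handling
// elided and hypothetical variable names:
//
//     let shm = OsIpcSharedMemory::from_bytes(b"payload");
//     tx.send(b"header", vec![], vec![shm])?;
//
// On the receiving side the region comes back in the shared-memory part of
// the resulting `IpcMessage`, already mapped and readable through its
// `Deref<Target = [u8]>` impl.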
pub struct OsIpcSharedMemory {
    ptr: *mut u8,
    length: usize,
    store: BackingStore,
}

unsafe impl Send for OsIpcSharedMemory {}
unsafe impl Sync for OsIpcSharedMemory {}

impl Drop for OsIpcSharedMemory {
    fn drop(&mut self) {
        unsafe {
            if !self.ptr.is_null() {
                let result = libc::munmap(self.ptr as *mut c_void, self.length);
                assert!(thread::panicking() || result == 0);
            }
        }
    }
}

impl Clone for OsIpcSharedMemory {
    fn clone(&self) -> OsIpcSharedMemory {
        unsafe {
            let store = BackingStore::from_fd(libc::dup(self.store.fd()));
            let (address, _) = store.map_file(Some(self.length));
            OsIpcSharedMemory::from_raw_parts(address, self.length, store)
        }
    }
}

impl PartialEq for OsIpcSharedMemory {
    fn eq(&self, other: &OsIpcSharedMemory) -> bool {
        **self == **other
    }
}

impl Debug for OsIpcSharedMemory {
    fn fmt(&self, formatter: &mut Formatter) -> Result<(), fmt::Error> {
        (**self).fmt(formatter)
    }
}

impl Deref for OsIpcSharedMemory {
    type Target = [u8];

    #[inline]
    fn deref(&self) -> &[u8] {
        unsafe { slice::from_raw_parts(self.ptr, self.length) }
    }
}

impl OsIpcSharedMemory {
    /// # Safety
    ///
    /// This is safe if there is only one reader/writer on the data.
    /// User can achieve this by not cloning [`IpcSharedMemory`]
    /// and serializing/deserializing only once.
    #[inline]
    pub unsafe fn deref_mut(&mut self) -> &mut [u8] {
        unsafe { slice::from_raw_parts_mut(self.ptr, self.length) }
    }
}

impl OsIpcSharedMemory {
    unsafe fn from_raw_parts(
        ptr: *mut u8,
        length: usize,
        store: BackingStore,
    ) -> OsIpcSharedMemory {
        OsIpcSharedMemory { ptr, length, store }
    }

    unsafe fn from_fd(fd: c_int) -> OsIpcSharedMemory {
        let store = BackingStore::from_fd(fd);
        let (ptr, length) = store.map_file(None);
        OsIpcSharedMemory::from_raw_parts(ptr, length, store)
    }

    pub fn from_byte(byte: u8, length: usize) -> OsIpcSharedMemory {
        unsafe {
            let store = BackingStore::new(length);
            let (address, _) = store.map_file(Some(length));
            for element in slice::from_raw_parts_mut(address, length) {
                *element = byte;
            }
            OsIpcSharedMemory::from_raw_parts(address, length, store)
        }
    }

    pub fn from_bytes(bytes: &[u8]) -> OsIpcSharedMemory {
        unsafe {
            let store = BackingStore::new(bytes.len());
            let (address, _) = store.map_file(Some(bytes.len()));
            ptr::copy_nonoverlapping(bytes.as_ptr(), address, bytes.len());
            OsIpcSharedMemory::from_raw_parts(address, bytes.len(), store)
        }
    }
}

#[derive(Debug)]
pub enum UnixError {
    Errno(c_int),
    ChannelClosed,
    IoError(io::Error),
}

impl UnixError {
    fn last() -> UnixError {
        UnixError::Errno(io::Error::last_os_error().raw_os_error().unwrap())
    }

    #[allow(dead_code)]
    pub fn channel_is_closed(&self) -> bool {
        matches!(self, UnixError::ChannelClosed)
    }
}

impl fmt::Display for UnixError {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        match self {
            UnixError::Errno(errno) => {
                fmt::Display::fmt(&io::Error::from_raw_os_error(*errno), fmt)
            },
            UnixError::ChannelClosed => write!(fmt, "All senders for this socket closed"),
            UnixError::IoError(e) => write!(fmt, "{e}"),
        }
    }
}

impl StdError for UnixError {}

impl From<UnixError> for bincode::Error {
    fn from(unix_error: UnixError) -> Self {
        io::Error::from(unix_error).into()
    }
}

impl From<UnixError> for io::Error {
    fn from(unix_error: UnixError) -> io::Error {
        match unix_error {
            UnixError::Errno(errno) => io::Error::from_raw_os_error(errno),
            UnixError::ChannelClosed => io::Error::new(io::ErrorKind::ConnectionReset, unix_error),
            UnixError::IoError(e) => e,
        }
    }
}

impl From<UnixError> for ipc::IpcError {
    fn from(error: UnixError) -> Self {
        match error {
            UnixError::ChannelClosed => ipc::IpcError::Disconnected,
            e => ipc::IpcError::Io(io::Error::from(e)),
        }
    }
}

impl From<UnixError> for ipc::TryRecvError {
    fn from(error: UnixError) -> Self {
        match error {
            UnixError::ChannelClosed => ipc::TryRecvError::IpcError(ipc::IpcError::Disconnected),
            UnixError::Errno(code) if code == EAGAIN || code == EWOULDBLOCK => {
                ipc::TryRecvError::Empty
            },
            e => ipc::TryRecvError::IpcError(ipc::IpcError::Io(io::Error::from(e))),
        }
    }
}

impl From<io::Error> for UnixError {
    fn from(e: io::Error) -> UnixError {
        if let Some(errno) = e.raw_os_error() {
            UnixError::Errno(errno)
        } else {
            assert!(e.kind() == io::ErrorKind::ConnectionReset);
            UnixError::ChannelClosed
        }
    }
}

#[derive(Copy, Clone)]
enum BlockingMode {
    Blocking,
    Nonblocking,
    Timeout(Duration),
}

#[allow(clippy::uninit_vec, clippy::type_complexity)]
fn recv(fd: c_int, blocking_mode: BlockingMode) -> Result<IpcMessage, UnixError> {
    let (mut channels, mut shared_memory_regions) = (Vec::new(), Vec::new());

    // The first fragment begins with a header recording the total data length.
    //
    // We use this to determine whether we already got the entire message,
    // or need to receive additional fragments -- and if so, how much.
    let mut total_size = 0usize;
    let mut main_data_buffer;
    unsafe {
        // Allocate a buffer without initialising the memory.
        main_data_buffer = Vec::with_capacity(OsIpcSender::get_max_fragment_size());
        main_data_buffer.set_len(OsIpcSender::get_max_fragment_size());

        let mut iovec = [
            iovec {
                iov_base: &mut total_size as *mut _ as *mut c_void,
                iov_len: mem::size_of_val(&total_size),
            },
            iovec {
                iov_base: main_data_buffer.as_mut_ptr() as *mut c_void,
                iov_len: main_data_buffer.len(),
            },
        ];
        let mut cmsg = UnixCmsg::new(&mut iovec)?;

        let bytes_read = cmsg.recv(fd, blocking_mode)?;
        main_data_buffer.set_len(bytes_read - mem::size_of_val(&total_size));

        let cmsg_fds = CMSG_DATA(cmsg.cmsg_buffer) as *const c_int;
        let cmsg_length = cmsg.msghdr.msg_controllen;
        let channel_length = if cmsg_length == 0 {
            0
        } else {
            // The control header is followed by an array of FDs. The size of the control header is
            // determined by CMSG_SPACE. (On Linux this would be the same as CMSG_ALIGN, but that isn't
            // exposed by libc. CMSG_SPACE(0) is the portable version of that.)
            (cmsg.cmsg_len() - CMSG_SPACE(0) as size_t) / mem::size_of::<c_int>()
        };
        for index in 0..channel_length {
            let fd = *cmsg_fds.add(index);
            if is_socket(fd) {
                channels.push(OsOpaqueIpcChannel::from_fd(fd));
                continue;
            }
            shared_memory_regions.push(OsIpcSharedMemory::from_fd(fd));
        }
    }

    if total_size == main_data_buffer.len() {
        // Fast path: no fragments.
        return Ok(IpcMessage::new(
            main_data_buffer,
            channels,
            shared_memory_regions,
        ));
    }

    // Reassemble fragments.
    //
    // The initial fragment carries the receive end of a dedicated channel
    // through which all the remaining fragments will be coming in.
    let dedicated_rx = channels.pop().unwrap().to_receiver();

    // Extend the buffer to hold the entire message, without initialising the memory.
    let len = main_data_buffer.len();
    main_data_buffer.reserve_exact(total_size - len);

    // Receive followup fragments directly into the main buffer.
    while main_data_buffer.len() < total_size {
        let write_pos = main_data_buffer.len();
        let end_pos = cmp::min(
            write_pos + OsIpcSender::fragment_size(*SYSTEM_SENDBUF_SIZE),
            total_size,
        );
        let result = unsafe {
            assert!(end_pos <= main_data_buffer.capacity());
            main_data_buffer.set_len(end_pos);

            // Integer underflow could make the following code unsound...
            assert!(end_pos >= write_pos);

            // Note: we always use blocking mode for followup fragments,
            // to make sure that once we start receiving a multi-fragment message,
            // we don't abort in the middle of it...
            let result = libc::recv(
                dedicated_rx.fd.get(),
                main_data_buffer[write_pos..].as_mut_ptr() as *mut c_void,
                end_pos - write_pos,
                0,
            );
            main_data_buffer.set_len(write_pos + cmp::max(result, 0) as usize);
            result
        };

        match result.cmp(&0) {
            cmp::Ordering::Greater => continue,
            cmp::Ordering::Equal => return Err(UnixError::ChannelClosed),
            cmp::Ordering::Less => return Err(UnixError::last()),
        }
    }

    Ok(IpcMessage::new(
        main_data_buffer,
        channels,
        shared_memory_regions,
    ))
}

// https://github.com/servo/ipc-channel/issues/192
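// On some libc targets `msghdr` has private padding fields, so it cannot be
// built with a plain struct literal; instead we zero-initialize it and then
// fill in only the public fields (see the issue linked above).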
fn new_msghdr(iovec: &mut [iovec], cmsg_buffer: *mut cmsghdr, cmsg_space: MsgControlLen) -> msghdr {
    let mut msghdr: msghdr = unsafe { mem::zeroed() };
    msghdr.msg_name = ptr::null_mut();
    msghdr.msg_namelen = 0;
    msghdr.msg_iov = iovec.as_mut_ptr();
    msghdr.msg_iovlen = iovec.len() as IovLen;
    msghdr.msg_control = cmsg_buffer as *mut c_void;
    msghdr.msg_controllen = cmsg_space;
    msghdr.msg_flags = 0;
    msghdr
}

fn create_shmem(name: CString, length: usize) -> c_int {
    unsafe {
        let fd = libc::memfd_create(name.as_ptr(), libc::MFD_CLOEXEC);
        assert!(fd >= 0);
        assert_eq!(libc::ftruncate(fd, length as off_t), 0);
        fd
    }
}

struct UnixCmsg {
    cmsg_buffer: *mut cmsghdr,
    msghdr: msghdr,
}

unsafe impl Send for UnixCmsg {}

impl Drop for UnixCmsg {
    fn drop(&mut self) {
        unsafe {
            libc::free(self.cmsg_buffer as *mut c_void);
        }
    }
}

impl UnixCmsg {
    unsafe fn new(iovec: &mut [iovec]) -> Result<UnixCmsg, UnixError> {
        let cmsg_length = CMSG_SPACE(MAX_FDS_IN_CMSG * (mem::size_of::<c_int>() as c_uint));
        let cmsg_buffer = libc::malloc(cmsg_length as usize) as *mut cmsghdr;
        if cmsg_buffer.is_null() {
            return Err(UnixError::last());
        }
        Ok(UnixCmsg {
            cmsg_buffer,
            msghdr: new_msghdr(iovec, cmsg_buffer, cmsg_length as MsgControlLen),
        })
    }

    unsafe fn recv(&mut self, fd: c_int, blocking_mode: BlockingMode) -> Result<usize, UnixError> {
        match blocking_mode {
            BlockingMode::Nonblocking => {
                if libc::fcntl(fd, libc::F_SETFL, libc::O_NONBLOCK) < 0 {
                    return Err(UnixError::last());
                }
            },
            BlockingMode::Timeout(duration) => {
                let events = libc::POLLIN | libc::POLLPRI | libc::POLLRDHUP;

                let mut fd = [libc::pollfd {
                    fd,
                    events,
                    revents: 0,
                }];
                let result = libc::poll(
                    fd.as_mut_ptr(),
                    fd.len() as _,
                    duration.as_millis().try_into().unwrap_or(-1),
                );

                match result.cmp(&0) {
                    cmp::Ordering::Equal => return Err(UnixError::Errno(EAGAIN)),
                    cmp::Ordering::Less => return Err(UnixError::last()),
                    cmp::Ordering::Greater => {},
                }
            },
            BlockingMode::Blocking => {},
        }

        let result = recvmsg(fd, &mut self.msghdr, RECVMSG_FLAGS);

        let result = match result.cmp(&0) {
            cmp::Ordering::Equal => Err(UnixError::ChannelClosed),
            cmp::Ordering::Less => Err(UnixError::last()),
            cmp::Ordering::Greater => Ok(result as usize),
        };

        if let BlockingMode::Nonblocking = blocking_mode {
            if libc::fcntl(fd, libc::F_SETFL, 0) < 0 {
                return Err(UnixError::last());
            }
        }
        result
    }

    unsafe fn cmsg_len(&self) -> size_t {
        (*(self.msghdr.msg_control as *const cmsghdr)).cmsg_len as size_t
    }
}

fn is_socket(fd: c_int) -> bool {
    unsafe {
        let mut st = mem::MaybeUninit::uninit();
        if libc::fstat(fd, st.as_mut_ptr()) != 0 {
            return false;
        }
        (st.assume_init().st_mode & S_IFMT) == S_IFSOCK
    }
}