1use crate::ipc::{self, IpcMessage};
11use bincode;
12use fnv::FnvHasher;
13use libc::{
14 self, cmsghdr, linger, CMSG_DATA, CMSG_LEN, CMSG_SPACE, MAP_FAILED, MAP_SHARED, PROT_READ,
15 PROT_WRITE, SOCK_SEQPACKET, SOL_SOCKET,
16};
17use libc::{c_char, c_int, c_void, getsockopt, SO_LINGER, S_IFMT, S_IFSOCK};
18use libc::{iovec, msghdr, off_t, recvmsg, sendmsg};
19use libc::{sa_family_t, setsockopt, size_t, sockaddr, sockaddr_un, socketpair, socklen_t};
20use libc::{EAGAIN, EWOULDBLOCK};
21use mio::unix::SourceFd;
22use mio::{Events, Interest, Poll, Token};
23use std::cell::Cell;
24use std::cmp;
25use std::collections::HashMap;
26use std::convert::TryInto;
27use std::error::Error as StdError;
28use std::ffi::{c_uint, CString};
29use std::fmt::{self, Debug, Formatter};
30use std::hash::BuildHasherDefault;
31use std::io;
32use std::mem;
33use std::ops::{Deref, RangeFrom};
34use std::os::fd::RawFd;
35use std::ptr;
36use std::slice;
37use std::sync::atomic::{AtomicUsize, Ordering};
38use std::sync::{Arc, LazyLock};
39use std::thread;
40use std::time::{Duration, UNIX_EPOCH};
41use tempfile::{Builder, TempDir};
42
43const MAX_FDS_IN_CMSG: u32 = 64;
44
45const RESERVED_SIZE: usize = 32;
49
50#[cfg(any(target_os = "linux", target_os = "illumos"))]
51const SOCK_FLAGS: c_int = libc::SOCK_CLOEXEC;
52#[cfg(not(any(target_os = "linux", target_os = "illumos")))]
53const SOCK_FLAGS: c_int = 0;
54
55#[cfg(any(target_os = "linux", target_os = "illumos"))]
56const RECVMSG_FLAGS: c_int = libc::MSG_CMSG_CLOEXEC;
57#[cfg(not(any(target_os = "linux", target_os = "illumos")))]
58const RECVMSG_FLAGS: c_int = 0;
59
60#[cfg(target_env = "gnu")]
61type IovLen = usize;
62#[cfg(target_env = "gnu")]
63type MsgControlLen = size_t;
64
65#[cfg(not(target_env = "gnu"))]
66type IovLen = i32;
67#[cfg(not(target_env = "gnu"))]
68type MsgControlLen = socklen_t;
69
70unsafe fn new_sockaddr_un(path: *const c_char) -> (sockaddr_un, usize) {
71 let mut sockaddr: sockaddr_un = mem::zeroed();
72 libc::strncpy(
73 sockaddr.sun_path.as_mut_ptr(),
74 path,
75 sockaddr.sun_path.len() - 1,
76 );
77 sockaddr.sun_family = libc::AF_UNIX as sa_family_t;
78 (sockaddr, mem::size_of::<sockaddr_un>())
79}
80
81static SYSTEM_SENDBUF_SIZE: LazyLock<usize> = LazyLock::new(|| {
82 let (tx, _) = channel().expect("Failed to obtain a socket for checking maximum send size");
83 tx.get_system_sendbuf_size()
84 .expect("Failed to obtain maximum send size for socket")
85});
86
87static PID: LazyLock<u32> = LazyLock::new(std::process::id);
89
90static SHM_COUNT: AtomicUsize = AtomicUsize::new(0);
92
93pub fn channel() -> Result<(OsIpcSender, OsIpcReceiver), UnixError> {
94 let mut results = [0, 0];
95 unsafe {
96 if socketpair(
97 libc::AF_UNIX,
98 SOCK_SEQPACKET | SOCK_FLAGS,
99 0,
100 &mut results[0],
101 ) >= 0
102 {
103 Ok((
104 OsIpcSender::from_fd(results[0]),
105 OsIpcReceiver::from_fd(results[1]),
106 ))
107 } else {
108 Err(UnixError::last())
109 }
110 }
111}
112
113#[derive(Clone, Copy)]
114struct PollEntry {
115 pub id: u64,
116 pub fd: RawFd,
117}
118
119#[derive(PartialEq, Debug)]
120pub struct OsIpcReceiver {
121 fd: Cell<c_int>,
122}
123
124impl Drop for OsIpcReceiver {
125 fn drop(&mut self) {
126 unsafe {
127 let fd = self.fd.get();
128 if fd >= 0 {
129 let result = libc::close(fd);
130 assert!(
131 thread::panicking() || result == 0,
132 "closed receiver (fd: {}): {}",
133 fd,
134 UnixError::last(),
135 );
136 }
137 }
138 }
139}
140
141impl OsIpcReceiver {
142 fn from_fd(fd: c_int) -> OsIpcReceiver {
143 OsIpcReceiver { fd: Cell::new(fd) }
144 }
145
146 fn consume_fd(&self) -> c_int {
147 let fd = self.fd.get();
148 self.fd.set(-1);
149 fd
150 }
151
152 pub fn consume(&self) -> OsIpcReceiver {
153 OsIpcReceiver::from_fd(self.consume_fd())
154 }
155
156 #[allow(clippy::type_complexity)]
157 pub fn recv(&self) -> Result<IpcMessage, UnixError> {
158 recv(self.fd.get(), BlockingMode::Blocking)
159 }
160
161 #[allow(clippy::type_complexity)]
162 pub fn try_recv(&self) -> Result<IpcMessage, UnixError> {
163 recv(self.fd.get(), BlockingMode::Nonblocking)
164 }
165
166 #[allow(clippy::type_complexity)]
167 pub fn try_recv_timeout(&self, duration: Duration) -> Result<IpcMessage, UnixError> {
168 recv(self.fd.get(), BlockingMode::Timeout(duration))
169 }
170}
171
172#[derive(PartialEq, Debug)]
173struct SharedFileDescriptor(c_int);
174
175impl Drop for SharedFileDescriptor {
176 fn drop(&mut self) {
177 unsafe {
178 let result = libc::close(self.0);
179 assert!(thread::panicking() || result == 0);
180 }
181 }
182}
183
184#[derive(PartialEq, Debug, Clone)]
185pub struct OsIpcSender {
186 fd: Arc<SharedFileDescriptor>,
187}
188
189impl OsIpcSender {
190 fn from_fd(fd: c_int) -> OsIpcSender {
191 OsIpcSender {
192 fd: Arc::new(SharedFileDescriptor(fd)),
193 }
194 }
195
196 fn get_system_sendbuf_size(&self) -> Result<usize, UnixError> {
201 unsafe {
202 let mut socket_sendbuf_size: c_int = 0;
203 let mut socket_sendbuf_size_len = mem::size_of::<c_int>() as socklen_t;
204 if getsockopt(
205 self.fd.0,
206 libc::SOL_SOCKET,
207 libc::SO_SNDBUF,
208 &mut socket_sendbuf_size as *mut _ as *mut c_void,
209 &mut socket_sendbuf_size_len as *mut socklen_t,
210 ) < 0
211 {
212 return Err(UnixError::last());
213 }
214 Ok(socket_sendbuf_size.try_into().unwrap())
215 }
216 }
217
218 fn fragment_size(sendbuf_size: usize) -> usize {
226 sendbuf_size - RESERVED_SIZE
227 }
228
229 fn first_fragment_size(sendbuf_size: usize) -> usize {
233 (Self::fragment_size(sendbuf_size) - mem::size_of::<usize>()) & (!8usize + 1)
234 }
236
237 pub fn get_max_fragment_size() -> usize {
247 Self::first_fragment_size(*SYSTEM_SENDBUF_SIZE)
248 }
249
250 pub fn send(
251 &self,
252 data: &[u8],
253 channels: Vec<OsIpcChannel>,
254 shared_memory_regions: Vec<OsIpcSharedMemory>,
255 ) -> Result<(), UnixError> {
256 let mut fds = Vec::new();
257 for channel in channels.iter() {
258 fds.push(channel.fd());
259 }
260 for shared_memory_region in shared_memory_regions.iter() {
261 fds.push(shared_memory_region.store.fd());
262 }
263
264 fn send_first_fragment(
271 sender_fd: c_int,
272 fds: &[c_int],
273 data_buffer: &[u8],
274 len: usize,
275 ) -> Result<(), UnixError> {
276 let result = unsafe {
277 let cmsg_length = mem::size_of_val(fds) as c_uint;
278 let (cmsg_buffer, cmsg_space) = if cmsg_length > 0 {
279 let cmsg_buffer =
280 libc::malloc(CMSG_SPACE(cmsg_length) as usize) as *mut cmsghdr;
281 if cmsg_buffer.is_null() {
282 return Err(UnixError::last());
283 }
284 (*cmsg_buffer).cmsg_len = CMSG_LEN(cmsg_length) as MsgControlLen;
285 (*cmsg_buffer).cmsg_level = libc::SOL_SOCKET;
286 (*cmsg_buffer).cmsg_type = libc::SCM_RIGHTS;
287
288 ptr::copy_nonoverlapping(
289 fds.as_ptr(),
290 CMSG_DATA(cmsg_buffer) as *mut c_int,
291 fds.len(),
292 );
293 (cmsg_buffer, CMSG_SPACE(cmsg_length))
294 } else {
295 (ptr::null_mut(), 0)
296 };
297
298 let mut iovec = [
299 iovec {
305 iov_base: &len as *const _ as *mut c_void,
306 iov_len: mem::size_of_val(&len),
307 },
308 iovec {
309 iov_base: data_buffer.as_ptr() as *mut c_void,
310 iov_len: data_buffer.len(),
311 },
312 ];
313
314 let msghdr = new_msghdr(&mut iovec, cmsg_buffer, cmsg_space as MsgControlLen);
315 let result = sendmsg(sender_fd, &msghdr, 0);
316 libc::free(cmsg_buffer as *mut c_void);
317 result
318 };
319
320 if result > 0 {
321 Ok(())
322 } else {
323 Err(UnixError::last())
324 }
325 }
326
327 fn send_followup_fragment(sender_fd: c_int, data_buffer: &[u8]) -> Result<(), UnixError> {
328 let result = unsafe {
329 libc::send(
330 sender_fd,
331 data_buffer.as_ptr() as *const c_void,
332 data_buffer.len(),
333 0,
334 )
335 };
336
337 if result > 0 {
338 Ok(())
339 } else {
340 Err(UnixError::last())
341 }
342 }
343
344 let mut sendbuf_size = *SYSTEM_SENDBUF_SIZE;
345
346 fn downsize(sendbuf_size: &mut usize, sent_size: usize) -> Result<(), ()> {
355 if sent_size > 2000 {
356 *sendbuf_size /= 2;
357 if *sendbuf_size >= sent_size {
359 *sendbuf_size = sent_size / 2;
360 }
361 Ok(())
362 } else {
363 Err(())
364 }
365 }
366
367 if data.len() <= Self::get_max_fragment_size() {
369 match send_first_fragment(self.fd.0, &fds[..], data, data.len()) {
370 Ok(_) => return Ok(()),
371 Err(error) => {
372 if !(matches!(error, UnixError::Errno(libc::ENOBUFS))
380 && downsize(&mut sendbuf_size, data.len()).is_ok())
381 {
382 return Err(error);
383 }
384 },
385 }
386 }
387
388 let (dedicated_tx, dedicated_rx) = channel()?;
396 fds.push(dedicated_rx.fd.get());
398
399 let mut byte_position = 0;
401 while byte_position < data.len() {
402 let end_byte_position;
403 let result = if byte_position == 0 {
404 end_byte_position = Self::first_fragment_size(sendbuf_size);
409 send_first_fragment(self.fd.0, &fds[..], &data[..end_byte_position], data.len())
410 } else {
411 end_byte_position = cmp::min(
414 byte_position + Self::fragment_size(sendbuf_size),
415 data.len(),
416 );
417 send_followup_fragment(dedicated_tx.fd.0, &data[byte_position..end_byte_position])
418 };
419
420 if let Err(error) = result {
421 if matches!(error, UnixError::Errno(libc::ENOBUFS))
422 && downsize(&mut sendbuf_size, end_byte_position - byte_position).is_ok()
423 {
424 continue;
427 } else {
428 return Err(error);
429 }
430 }
431
432 byte_position = end_byte_position;
433 }
434
435 Ok(())
436 }
437
438 pub fn connect(name: String) -> Result<OsIpcSender, UnixError> {
439 let name = CString::new(name).unwrap();
440 unsafe {
441 let fd = libc::socket(libc::AF_UNIX, SOCK_SEQPACKET | SOCK_FLAGS, 0);
442 let (sockaddr, len) = new_sockaddr_un(name.as_ptr());
443 if libc::connect(
444 fd,
445 &sockaddr as *const _ as *const sockaddr,
446 len as socklen_t,
447 ) < 0
448 {
449 return Err(UnixError::last());
450 }
451
452 Ok(OsIpcSender::from_fd(fd))
453 }
454 }
455}
456
457#[derive(PartialEq, Debug)]
458pub enum OsIpcChannel {
459 Sender(OsIpcSender),
460 Receiver(OsIpcReceiver),
461}
462
463impl OsIpcChannel {
464 fn fd(&self) -> c_int {
465 match *self {
466 OsIpcChannel::Sender(ref sender) => sender.fd.0,
467 OsIpcChannel::Receiver(ref receiver) => receiver.fd.get(),
468 }
469 }
470}
471
472pub struct OsIpcReceiverSet {
473 incrementor: RangeFrom<u64>,
474 poll: Poll,
475 pollfds: HashMap<Token, PollEntry, BuildHasherDefault<FnvHasher>>,
476 events: Events,
477}
478
479impl Drop for OsIpcReceiverSet {
480 fn drop(&mut self) {
481 for &PollEntry { id: _, fd } in self.pollfds.values() {
482 let result = unsafe { libc::close(fd) };
483 assert!(thread::panicking() || result == 0);
484 }
485 }
486}
487
488impl OsIpcReceiverSet {
489 pub fn new() -> Result<OsIpcReceiverSet, UnixError> {
490 let fnv = BuildHasherDefault::<FnvHasher>::default();
491 Ok(OsIpcReceiverSet {
492 incrementor: 0..,
493 poll: Poll::new()?,
494 pollfds: HashMap::with_hasher(fnv),
495 events: Events::with_capacity(10),
496 })
497 }
498
499 pub fn add(&mut self, receiver: OsIpcReceiver) -> Result<u64, UnixError> {
500 let last_index = self.incrementor.next().unwrap();
501 let fd = receiver.consume_fd();
502 let fd_token = Token(fd as usize);
503 let poll_entry = PollEntry { id: last_index, fd };
504 self.poll
505 .registry()
506 .register(&mut SourceFd(&fd), fd_token, Interest::READABLE)?;
507 self.pollfds.insert(fd_token, poll_entry);
508 Ok(last_index)
509 }
510
511 pub fn select(&mut self) -> Result<Vec<OsIpcSelectionResult>, UnixError> {
512 loop {
514 match self.poll.poll(&mut self.events, None) {
515 Ok(()) if !self.events.is_empty() => break,
516 Ok(()) => {},
517 Err(ref error) => {
518 if error.kind() != io::ErrorKind::Interrupted {
519 return Err(UnixError::last());
520 }
521 },
522 }
523
524 if !self.events.is_empty() {
525 break;
526 }
527 }
528
529 let mut selection_results = Vec::new();
530 for event in self.events.iter() {
531 assert!(event.is_readable());
533
534 let event_token = event.token();
535 let poll_entry = *self
536 .pollfds
537 .get(&event_token)
538 .expect("Got event for unknown token.");
539 loop {
540 match recv(poll_entry.fd, BlockingMode::Nonblocking) {
541 Ok(ipc_message) => {
542 selection_results.push(OsIpcSelectionResult::DataReceived(
543 poll_entry.id,
544 ipc_message,
545 ));
546 },
547 Err(err) if err.channel_is_closed() => {
548 self.pollfds.remove(&event_token).unwrap();
549 self.poll
550 .registry()
551 .deregister(&mut SourceFd(&poll_entry.fd))
552 .unwrap();
553 unsafe {
554 libc::close(poll_entry.fd);
555 }
556
557 selection_results.push(OsIpcSelectionResult::ChannelClosed(poll_entry.id));
558 break;
559 },
560 Err(UnixError::Errno(code)) if code == EWOULDBLOCK => {
561 break;
565 },
566 Err(err) => return Err(err),
567 }
568 }
569 }
570
571 Ok(selection_results)
572 }
573}
574
575pub enum OsIpcSelectionResult {
576 DataReceived(u64, IpcMessage),
577 ChannelClosed(u64),
578}
579
580impl OsIpcSelectionResult {
581 pub fn unwrap(self) -> (u64, IpcMessage) {
582 match self {
583 OsIpcSelectionResult::DataReceived(id, ipc_message) => (id, ipc_message),
584 OsIpcSelectionResult::ChannelClosed(id) => {
585 panic!("OsIpcSelectionResult::unwrap(): receiver ID {id} was closed!")
586 },
587 }
588 }
589}
590
591#[derive(PartialEq, Debug)]
592pub struct OsOpaqueIpcChannel {
593 fd: c_int,
594}
595
596impl Drop for OsOpaqueIpcChannel {
597 fn drop(&mut self) {
598 debug_assert!(self.fd == -1);
604 }
605}
606
607impl OsOpaqueIpcChannel {
608 fn from_fd(fd: c_int) -> OsOpaqueIpcChannel {
609 OsOpaqueIpcChannel { fd }
610 }
611
612 pub fn to_sender(&mut self) -> OsIpcSender {
613 OsIpcSender::from_fd(mem::replace(&mut self.fd, -1))
614 }
615
616 pub fn to_receiver(&mut self) -> OsIpcReceiver {
617 OsIpcReceiver::from_fd(mem::replace(&mut self.fd, -1))
618 }
619}
620
621pub struct OsIpcOneShotServer {
622 fd: c_int,
623
624 _temp_dir: TempDir,
628}
629
630impl Drop for OsIpcOneShotServer {
631 fn drop(&mut self) {
632 unsafe {
633 let result = libc::close(self.fd);
634 assert!(thread::panicking() || result == 0);
635 }
636 }
637}
638
639impl OsIpcOneShotServer {
640 pub fn new() -> Result<(OsIpcOneShotServer, String), UnixError> {
641 unsafe {
642 let fd = libc::socket(libc::AF_UNIX, SOCK_SEQPACKET | SOCK_FLAGS, 0);
643 let temp_dir = Builder::new().tempdir()?;
644 let socket_path = temp_dir.path().join("socket");
645 let path_string = socket_path.to_str().unwrap();
646
647 let path_c_string = CString::new(path_string).unwrap();
648 let (sockaddr, len) = new_sockaddr_un(path_c_string.as_ptr());
649 if libc::bind(
650 fd,
651 &sockaddr as *const _ as *const sockaddr,
652 len as socklen_t,
653 ) != 0
654 {
655 return Err(UnixError::last());
656 }
657
658 if libc::listen(fd, 10) != 0 {
659 return Err(UnixError::last());
660 }
661
662 Ok((
663 OsIpcOneShotServer {
664 fd,
665 _temp_dir: temp_dir,
666 },
667 path_string.to_string(),
668 ))
669 }
670 }
671
672 #[allow(clippy::type_complexity)]
673 pub fn accept(self) -> Result<(OsIpcReceiver, IpcMessage), UnixError> {
674 unsafe {
675 let sockaddr: *mut sockaddr = ptr::null_mut();
676 let sockaddr_len: *mut socklen_t = ptr::null_mut();
677 let client_fd = libc::accept(self.fd, sockaddr, sockaddr_len);
678 if client_fd < 0 {
679 return Err(UnixError::last());
680 }
681 make_socket_lingering(client_fd)?;
682
683 let receiver = OsIpcReceiver::from_fd(client_fd);
684 let ipc_message = receiver.recv()?;
685 Ok((receiver, ipc_message))
686 }
687 }
688}
689
690fn make_socket_lingering(sockfd: c_int) -> Result<(), UnixError> {
695 let linger = linger {
696 l_onoff: 1,
697 l_linger: 30,
698 };
699 let err = unsafe {
700 setsockopt(
701 sockfd,
702 SOL_SOCKET,
703 SO_LINGER,
704 &linger as *const _ as *const c_void,
705 mem::size_of::<linger>() as socklen_t,
706 )
707 };
708 if err < 0 {
709 let error = UnixError::last();
710 if let UnixError::Errno(libc::EINVAL) = error {
711 } else {
734 return Err(error);
735 }
736 }
737 Ok(())
738}
739
740struct BackingStore {
741 fd: c_int,
742}
743
744impl BackingStore {
745 pub fn new(length: usize) -> BackingStore {
746 let count = SHM_COUNT.fetch_add(1, Ordering::Relaxed);
747 let timestamp = UNIX_EPOCH.elapsed().unwrap();
748 let name = CString::new(format!(
749 "/ipc-channel-shared-memory.{}.{}.{}.{}",
750 count,
751 *PID,
752 timestamp.as_secs(),
753 timestamp.subsec_nanos()
754 ))
755 .unwrap();
756 let fd = create_shmem(name, length);
757 Self::from_fd(fd)
758 }
759
760 pub fn from_fd(fd: c_int) -> BackingStore {
761 BackingStore { fd }
762 }
763
764 pub fn fd(&self) -> c_int {
765 self.fd
766 }
767
768 pub unsafe fn map_file(&self, length: Option<size_t>) -> (*mut u8, size_t) {
769 let length = length.unwrap_or_else(|| {
770 let mut st = mem::MaybeUninit::uninit();
771 if libc::fstat(self.fd, st.as_mut_ptr()) != 0 {
772 panic!("error stating fd {}: {}", self.fd, UnixError::last());
773 }
774 st.assume_init().st_size as size_t
775 });
776 if length == 0 {
777 return (ptr::null_mut(), length);
779 }
780 let address = libc::mmap(
781 ptr::null_mut(),
782 length,
783 PROT_READ | PROT_WRITE,
784 MAP_SHARED,
785 self.fd,
786 0,
787 );
788 assert!(!address.is_null());
789 assert!(address != MAP_FAILED);
790 (address as *mut u8, length)
791 }
792}
793
794impl Drop for BackingStore {
795 fn drop(&mut self) {
796 unsafe {
797 let result = libc::close(self.fd);
798 assert!(thread::panicking() || result == 0);
799 }
800 }
801}
802
803pub struct OsIpcSharedMemory {
804 ptr: *mut u8,
805 length: usize,
806 store: BackingStore,
807}
808
809unsafe impl Send for OsIpcSharedMemory {}
810unsafe impl Sync for OsIpcSharedMemory {}
811
812impl Drop for OsIpcSharedMemory {
813 fn drop(&mut self) {
814 unsafe {
815 if !self.ptr.is_null() {
816 let result = libc::munmap(self.ptr as *mut c_void, self.length);
817 assert!(thread::panicking() || result == 0);
818 }
819 }
820 }
821}
822
823impl Clone for OsIpcSharedMemory {
824 fn clone(&self) -> OsIpcSharedMemory {
825 unsafe {
826 let store = BackingStore::from_fd(libc::dup(self.store.fd()));
827 let (address, _) = store.map_file(Some(self.length));
828 OsIpcSharedMemory::from_raw_parts(address, self.length, store)
829 }
830 }
831}
832
833impl PartialEq for OsIpcSharedMemory {
834 fn eq(&self, other: &OsIpcSharedMemory) -> bool {
835 **self == **other
836 }
837}
838
839impl Debug for OsIpcSharedMemory {
840 fn fmt(&self, formatter: &mut Formatter) -> Result<(), fmt::Error> {
841 (**self).fmt(formatter)
842 }
843}
844
845impl Deref for OsIpcSharedMemory {
846 type Target = [u8];
847
848 #[inline]
849 fn deref(&self) -> &[u8] {
850 unsafe { slice::from_raw_parts(self.ptr, self.length) }
851 }
852}
853
854impl OsIpcSharedMemory {
855 #[inline]
861 pub unsafe fn deref_mut(&mut self) -> &mut [u8] {
862 unsafe { slice::from_raw_parts_mut(self.ptr, self.length) }
863 }
864}
865
866impl OsIpcSharedMemory {
867 unsafe fn from_raw_parts(
868 ptr: *mut u8,
869 length: usize,
870 store: BackingStore,
871 ) -> OsIpcSharedMemory {
872 OsIpcSharedMemory { ptr, length, store }
873 }
874
875 unsafe fn from_fd(fd: c_int) -> OsIpcSharedMemory {
876 let store = BackingStore::from_fd(fd);
877 let (ptr, length) = store.map_file(None);
878 OsIpcSharedMemory::from_raw_parts(ptr, length, store)
879 }
880
881 pub fn from_byte(byte: u8, length: usize) -> OsIpcSharedMemory {
882 unsafe {
883 let store = BackingStore::new(length);
884 let (address, _) = store.map_file(Some(length));
885 for element in slice::from_raw_parts_mut(address, length) {
886 *element = byte;
887 }
888 OsIpcSharedMemory::from_raw_parts(address, length, store)
889 }
890 }
891
892 pub fn from_bytes(bytes: &[u8]) -> OsIpcSharedMemory {
893 unsafe {
894 let store = BackingStore::new(bytes.len());
895 let (address, _) = store.map_file(Some(bytes.len()));
896 ptr::copy_nonoverlapping(bytes.as_ptr(), address, bytes.len());
897 OsIpcSharedMemory::from_raw_parts(address, bytes.len(), store)
898 }
899 }
900}
901
902#[derive(Debug)]
903pub enum UnixError {
904 Errno(c_int),
905 ChannelClosed,
906 IoError(io::Error),
907}
908
909impl UnixError {
910 fn last() -> UnixError {
911 UnixError::Errno(io::Error::last_os_error().raw_os_error().unwrap())
912 }
913
914 #[allow(dead_code)]
915 pub fn channel_is_closed(&self) -> bool {
916 matches!(self, UnixError::ChannelClosed)
917 }
918}
919
920impl fmt::Display for UnixError {
921 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
922 match self {
923 UnixError::Errno(errno) => {
924 fmt::Display::fmt(&io::Error::from_raw_os_error(*errno), fmt)
925 },
926 UnixError::ChannelClosed => write!(fmt, "All senders for this socket closed"),
927 UnixError::IoError(e) => write!(fmt, "{e}"),
928 }
929 }
930}
931
932impl StdError for UnixError {}
933
934impl From<UnixError> for bincode::Error {
935 fn from(unix_error: UnixError) -> Self {
936 io::Error::from(unix_error).into()
937 }
938}
939
940impl From<UnixError> for io::Error {
941 fn from(unix_error: UnixError) -> io::Error {
942 match unix_error {
943 UnixError::Errno(errno) => io::Error::from_raw_os_error(errno),
944 UnixError::ChannelClosed => io::Error::new(io::ErrorKind::ConnectionReset, unix_error),
945 UnixError::IoError(e) => e,
946 }
947 }
948}
949
950impl From<UnixError> for ipc::IpcError {
951 fn from(error: UnixError) -> Self {
952 match error {
953 UnixError::ChannelClosed => ipc::IpcError::Disconnected,
954 e => ipc::IpcError::Io(io::Error::from(e)),
955 }
956 }
957}
958
959impl From<UnixError> for ipc::TryRecvError {
960 fn from(error: UnixError) -> Self {
961 match error {
962 UnixError::ChannelClosed => ipc::TryRecvError::IpcError(ipc::IpcError::Disconnected),
963 UnixError::Errno(code) if code == EAGAIN || code == EWOULDBLOCK => {
964 ipc::TryRecvError::Empty
965 },
966 e => ipc::TryRecvError::IpcError(ipc::IpcError::Io(io::Error::from(e))),
967 }
968 }
969}
970
971impl From<io::Error> for UnixError {
972 fn from(e: io::Error) -> UnixError {
973 if let Some(errno) = e.raw_os_error() {
974 UnixError::Errno(errno)
975 } else {
976 assert!(e.kind() == io::ErrorKind::ConnectionReset);
977 UnixError::ChannelClosed
978 }
979 }
980}
981
982#[derive(Copy, Clone)]
983enum BlockingMode {
984 Blocking,
985 Nonblocking,
986 Timeout(Duration),
987}
988
989#[allow(clippy::uninit_vec, clippy::type_complexity)]
990fn recv(fd: c_int, blocking_mode: BlockingMode) -> Result<IpcMessage, UnixError> {
991 let (mut channels, mut shared_memory_regions) = (Vec::new(), Vec::new());
992
993 let mut total_size = 0usize;
998 let mut main_data_buffer;
999 unsafe {
1000 main_data_buffer = Vec::with_capacity(OsIpcSender::get_max_fragment_size());
1002 main_data_buffer.set_len(OsIpcSender::get_max_fragment_size());
1003
1004 let mut iovec = [
1005 iovec {
1006 iov_base: &mut total_size as *mut _ as *mut c_void,
1007 iov_len: mem::size_of_val(&total_size),
1008 },
1009 iovec {
1010 iov_base: main_data_buffer.as_mut_ptr() as *mut c_void,
1011 iov_len: main_data_buffer.len(),
1012 },
1013 ];
1014 let mut cmsg = UnixCmsg::new(&mut iovec)?;
1015
1016 let bytes_read = cmsg.recv(fd, blocking_mode)?;
1017 main_data_buffer.set_len(bytes_read - mem::size_of_val(&total_size));
1018
1019 let cmsg_fds = CMSG_DATA(cmsg.cmsg_buffer) as *const c_int;
1020 let cmsg_length = cmsg.msghdr.msg_controllen;
1021 let channel_length = if cmsg_length == 0 {
1022 0
1023 } else {
1024 (cmsg.cmsg_len() - CMSG_SPACE(0) as size_t) / mem::size_of::<c_int>()
1028 };
1029 for index in 0..channel_length {
1030 let fd = *cmsg_fds.add(index);
1031 if is_socket(fd) {
1032 channels.push(OsOpaqueIpcChannel::from_fd(fd));
1033 continue;
1034 }
1035 shared_memory_regions.push(OsIpcSharedMemory::from_fd(fd));
1036 }
1037 }
1038
1039 if total_size == main_data_buffer.len() {
1040 return Ok(IpcMessage::new(
1042 main_data_buffer,
1043 channels,
1044 shared_memory_regions,
1045 ));
1046 }
1047
1048 let dedicated_rx = channels.pop().unwrap().to_receiver();
1053
1054 let len = main_data_buffer.len();
1056 main_data_buffer.reserve_exact(total_size - len);
1057
1058 while main_data_buffer.len() < total_size {
1060 let write_pos = main_data_buffer.len();
1061 let end_pos = cmp::min(
1062 write_pos + OsIpcSender::fragment_size(*SYSTEM_SENDBUF_SIZE),
1063 total_size,
1064 );
1065 let result = unsafe {
1066 assert!(end_pos <= main_data_buffer.capacity());
1067 main_data_buffer.set_len(end_pos);
1068
1069 assert!(end_pos >= write_pos);
1071
1072 let result = libc::recv(
1076 dedicated_rx.fd.get(),
1077 main_data_buffer[write_pos..].as_mut_ptr() as *mut c_void,
1078 end_pos - write_pos,
1079 0,
1080 );
1081 main_data_buffer.set_len(write_pos + cmp::max(result, 0) as usize);
1082 result
1083 };
1084
1085 match result.cmp(&0) {
1086 cmp::Ordering::Greater => continue,
1087 cmp::Ordering::Equal => return Err(UnixError::ChannelClosed),
1088 cmp::Ordering::Less => return Err(UnixError::last()),
1089 }
1090 }
1091
1092 Ok(IpcMessage::new(
1093 main_data_buffer,
1094 channels,
1095 shared_memory_regions,
1096 ))
1097}
1098
1099fn new_msghdr(iovec: &mut [iovec], cmsg_buffer: *mut cmsghdr, cmsg_space: MsgControlLen) -> msghdr {
1101 let mut msghdr: msghdr = unsafe { mem::zeroed() };
1102 msghdr.msg_name = ptr::null_mut();
1103 msghdr.msg_namelen = 0;
1104 msghdr.msg_iov = iovec.as_mut_ptr();
1105 msghdr.msg_iovlen = iovec.len() as IovLen;
1106 msghdr.msg_control = cmsg_buffer as *mut c_void;
1107 msghdr.msg_controllen = cmsg_space;
1108 msghdr.msg_flags = 0;
1109 msghdr
1110}
1111
1112fn create_shmem(name: CString, length: usize) -> c_int {
1113 unsafe {
1114 let fd = libc::memfd_create(name.as_ptr(), libc::MFD_CLOEXEC);
1115 assert!(fd >= 0);
1116 assert_eq!(libc::ftruncate(fd, length as off_t), 0);
1117 fd
1118 }
1119}
1120
1121struct UnixCmsg {
1122 cmsg_buffer: *mut cmsghdr,
1123 msghdr: msghdr,
1124}
1125
1126unsafe impl Send for UnixCmsg {}
1127
1128impl Drop for UnixCmsg {
1129 fn drop(&mut self) {
1130 unsafe {
1131 libc::free(self.cmsg_buffer as *mut c_void);
1132 }
1133 }
1134}
1135
1136impl UnixCmsg {
1137 unsafe fn new(iovec: &mut [iovec]) -> Result<UnixCmsg, UnixError> {
1138 let cmsg_length = CMSG_SPACE(MAX_FDS_IN_CMSG * (mem::size_of::<c_int>() as c_uint));
1139 let cmsg_buffer = libc::malloc(cmsg_length as usize) as *mut cmsghdr;
1140 if cmsg_buffer.is_null() {
1141 return Err(UnixError::last());
1142 }
1143 Ok(UnixCmsg {
1144 cmsg_buffer,
1145 msghdr: new_msghdr(iovec, cmsg_buffer, cmsg_length as MsgControlLen),
1146 })
1147 }
1148
1149 unsafe fn recv(&mut self, fd: c_int, blocking_mode: BlockingMode) -> Result<usize, UnixError> {
1150 match blocking_mode {
1151 BlockingMode::Nonblocking => {
1152 if libc::fcntl(fd, libc::F_SETFL, libc::O_NONBLOCK) < 0 {
1153 return Err(UnixError::last());
1154 }
1155 },
1156 BlockingMode::Timeout(duration) => {
1157 let events = libc::POLLIN | libc::POLLPRI | libc::POLLRDHUP;
1158
1159 let mut fd = [libc::pollfd {
1160 fd,
1161 events,
1162 revents: 0,
1163 }];
1164 let result = libc::poll(
1165 fd.as_mut_ptr(),
1166 fd.len() as _,
1167 duration.as_millis().try_into().unwrap_or(-1),
1168 );
1169
1170 match result.cmp(&0) {
1171 cmp::Ordering::Equal => return Err(UnixError::Errno(EAGAIN)),
1172 cmp::Ordering::Less => return Err(UnixError::last()),
1173 cmp::Ordering::Greater => {},
1174 }
1175 },
1176 BlockingMode::Blocking => {},
1177 }
1178
1179 let result = recvmsg(fd, &mut self.msghdr, RECVMSG_FLAGS);
1180
1181 let result = match result.cmp(&0) {
1182 cmp::Ordering::Equal => Err(UnixError::ChannelClosed),
1183 cmp::Ordering::Less => Err(UnixError::last()),
1184 cmp::Ordering::Greater => Ok(result as usize),
1185 };
1186
1187 if let BlockingMode::Nonblocking = blocking_mode {
1188 if libc::fcntl(fd, libc::F_SETFL, 0) < 0 {
1189 return Err(UnixError::last());
1190 }
1191 }
1192 result
1193 }
1194
1195 unsafe fn cmsg_len(&self) -> size_t {
1196 (*(self.msghdr.msg_control as *const cmsghdr)).cmsg_len as size_t
1197 }
1198}
1199
1200fn is_socket(fd: c_int) -> bool {
1201 unsafe {
1202 let mut st = mem::MaybeUninit::uninit();
1203 if libc::fstat(fd, st.as_mut_ptr()) != 0 {
1204 return false;
1205 }
1206 (st.assume_init().st_mode & S_IFMT) == S_IFSOCK
1207 }
1208}