zbus/connection/socket/
vsock.rs1#[cfg(feature = "tokio-vsock")]
2use super::{Socket, Split};
3
/// Read side of a vsock stream driven by `async-io` (built only when the `vsock` feature is on
/// and the `tokio` feature is off).
#[cfg(all(feature = "vsock", not(feature = "tokio")))]
#[async_trait::async_trait]
impl super::ReadHalf for std::sync::Arc<async_io::Async<vsock::VsockStream>> {
    /// Reads bytes from the stream into `buf`.
    ///
    /// On success the number of bytes read is returned; on unix targets it is paired with an
    /// empty vector (presumably the list of received file descriptors — confirm against the
    /// definition of `RecvmsgResult`). I/O errors are propagated unchanged.
    async fn recvmsg(&mut self, buf: &mut [u8]) -> super::RecvmsgResult {
        let len = futures_lite::AsyncReadExt::read(&mut self.as_ref(), buf).await?;

        // The shape of the success value differs per platform.
        #[cfg(unix)]
        let received = (len, vec![]);
        #[cfg(not(unix))]
        let received = len;

        Ok(received)
    }

    /// Always reports the anonymous authentication mechanism for this transport.
    fn auth_mechanism(&self) -> crate::AuthMechanism {
        use crate::AuthMechanism;

        AuthMechanism::Anonymous
    }
}
24
/// Write side of a vsock stream driven by `async-io` (built only when the `vsock` feature is on
/// and the `tokio` feature is off).
#[cfg(all(feature = "vsock", not(feature = "tokio")))]
#[async_trait::async_trait]
impl super::WriteHalf for std::sync::Arc<async_io::Async<vsock::VsockStream>> {
    /// Writes bytes from `buf` to the stream and returns how many were written.
    ///
    /// # Errors
    ///
    /// On unix, a non-empty `fds` slice is rejected with `InvalidInput` before anything is
    /// written. Any error from the underlying write is returned as is.
    async fn sendmsg(
        &mut self,
        buf: &[u8],
        #[cfg(unix)] fds: &[std::os::fd::BorrowedFd<'_>],
    ) -> std::io::Result<usize> {
        #[cfg(unix)]
        if !fds.is_empty() {
            let unsupported = std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "fds cannot be sent with a vsock stream",
            );
            return Err(unsupported);
        }

        futures_lite::AsyncWriteExt::write(&mut self.as_ref(), buf).await
    }

    /// Shuts down both directions of the underlying socket.
    ///
    /// The shutdown call is handed to `Task::spawn_blocking`; the closure owns a clone of the
    /// `Arc` so it can be moved into that task.
    async fn close(&mut self) -> std::io::Result<()> {
        let handle = self.clone();
        let shutdown_both = move || handle.get_ref().shutdown(std::net::Shutdown::Both);

        crate::Task::spawn_blocking(shutdown_both, "close socket").await
    }
}
55
/// Lets a `tokio_vsock::VsockStream` be used as a connection socket (requires the `tokio-vsock`
/// feature).
#[cfg(feature = "tokio-vsock")]
impl Socket for tokio_vsock::VsockStream {
    type ReadHalf = tokio_vsock::OwnedReadHalf;
    type WriteHalf = tokio_vsock::OwnedWriteHalf;

    /// Consumes the stream and returns its owned read and write halves.
    fn split(self) -> Split<Self::ReadHalf, Self::WriteHalf> {
        let (reader, writer) = self.into_split();

        Split {
            read: reader,
            write: writer,
        }
    }
}
67
/// Read side of a tokio-based vsock stream (requires the `tokio-vsock` feature).
#[cfg(feature = "tokio-vsock")]
#[async_trait::async_trait]
impl super::ReadHalf for tokio_vsock::OwnedReadHalf {
    /// Reads bytes from the stream into `buf`.
    ///
    /// On success the number of bytes read is returned; on unix targets it is paired with an
    /// empty vector (presumably the list of received file descriptors — confirm against the
    /// definition of `RecvmsgResult`). I/O errors are propagated unchanged.
    async fn recvmsg(&mut self, buf: &mut [u8]) -> super::RecvmsgResult {
        use tokio::io::{AsyncReadExt, ReadBuf};

        let mut target = ReadBuf::new(buf);
        self.read_buf(&mut target).await?;

        // The filled portion of the wrapper is exactly what this call read.
        let len = target.filled().len();
        #[cfg(unix)]
        let received = (len, vec![]);
        #[cfg(not(unix))]
        let received = len;

        Ok(received)
    }

    /// Always reports the anonymous authentication mechanism for this transport.
    fn auth_mechanism(&self) -> crate::conn::AuthMechanism {
        use crate::conn::AuthMechanism;

        AuthMechanism::Anonymous
    }
}
88
/// Write side of a tokio-based vsock stream (requires the `tokio-vsock` feature).
#[cfg(feature = "tokio-vsock")]
#[async_trait::async_trait]
impl super::WriteHalf for tokio_vsock::OwnedWriteHalf {
    /// Writes bytes from `buf` to the stream and returns how many were written.
    ///
    /// # Errors
    ///
    /// On unix, a non-empty `fds` slice is rejected with `InvalidInput` before anything is
    /// written. Any error from the underlying write is returned as is.
    async fn sendmsg(
        &mut self,
        buf: &[u8],
        #[cfg(unix)] fds: &[std::os::fd::BorrowedFd<'_>],
    ) -> std::io::Result<usize> {
        use tokio::io::AsyncWriteExt;

        #[cfg(unix)]
        if !fds.is_empty() {
            let unsupported = std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "fds cannot be sent with a vsock stream",
            );
            return Err(unsupported);
        }

        self.write(buf).await
    }

    /// Shuts down the write half via tokio's `AsyncWriteExt::shutdown`.
    async fn close(&mut self) -> std::io::Result<()> {
        let shutdown = tokio::io::AsyncWriteExt::shutdown(self);

        shutdown.await
    }
}