zbus/connection/socket/
vsock.rs

1#[cfg(feature = "tokio-vsock")]
2use super::{Socket, Split};
3
#[cfg(all(feature = "vsock", not(feature = "tokio")))]
#[async_trait::async_trait]
impl super::ReadHalf for std::sync::Arc<async_io::Async<vsock::VsockStream>> {
    /// Reads bytes from the shared vsock stream into `buf`.
    ///
    /// On success the result carries the number of bytes read. On Unix that count is paired
    /// with an empty vector as the second tuple element; elsewhere the bare count is returned.
    ///
    /// # Errors
    ///
    /// Any I/O error reported by the underlying read is returned unchanged.
    async fn recvmsg(&mut self, buf: &mut [u8]) -> super::RecvmsgResult {
        let bytes_read = futures_lite::AsyncReadExt::read(&mut self.as_ref(), buf).await?;

        #[cfg(unix)]
        let outcome = (bytes_read, vec![]);
        #[cfg(not(unix))]
        let outcome = bytes_read;

        Ok(outcome)
    }

    /// Always reports the `Anonymous` authentication mechanism for this socket type.
    fn auth_mechanism(&self) -> crate::AuthMechanism {
        crate::AuthMechanism::Anonymous
    }
}
24
#[cfg(all(feature = "vsock", not(feature = "tokio")))]
#[async_trait::async_trait]
impl super::WriteHalf for std::sync::Arc<async_io::Async<vsock::VsockStream>> {
    /// Writes `buf` to the shared vsock stream and returns the number of bytes written.
    ///
    /// # Errors
    ///
    /// On Unix, a non-empty `fds` slice is rejected with an `InvalidInput` error before any
    /// write is attempted. Otherwise, whatever error the underlying write reports is returned.
    async fn sendmsg(
        &mut self,
        buf: &[u8],
        #[cfg(unix)] fds: &[std::os::fd::BorrowedFd<'_>],
    ) -> std::io::Result<usize> {
        #[cfg(unix)]
        if !fds.is_empty() {
            let rejection = std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "fds cannot be sent with a vsock stream",
            );
            return Err(rejection);
        }

        futures_lite::AsyncWriteExt::write(&mut self.as_ref(), buf).await
    }

    /// Shuts down both directions of the stream.
    ///
    /// The `shutdown` call runs on a clone of the `Arc`, handed to
    /// `crate::Task::spawn_blocking`, whose result is awaited and returned.
    async fn close(&mut self) -> std::io::Result<()> {
        // The closure must own its handle to the stream, hence the extra `Arc` reference.
        let shared = std::sync::Arc::clone(&*self);
        let shutdown_both = move || shared.get_ref().shutdown(std::net::Shutdown::Both);

        crate::Task::spawn_blocking(shutdown_both, "close socket").await
    }
}
55
#[cfg(feature = "tokio-vsock")]
impl Socket for tokio_vsock::VsockStream {
    type ReadHalf = tokio_vsock::OwnedReadHalf;
    type WriteHalf = tokio_vsock::OwnedWriteHalf;

    /// Consumes the stream and returns its owned read and write halves as a [`Split`].
    fn split(self) -> Split<Self::ReadHalf, Self::WriteHalf> {
        let (reader, writer) = self.into_split();

        Split {
            read: reader,
            write: writer,
        }
    }
}
67
#[cfg(feature = "tokio-vsock")]
#[async_trait::async_trait]
impl super::ReadHalf for tokio_vsock::OwnedReadHalf {
    /// Reads bytes from the vsock read half into `buf`.
    ///
    /// The count reported is the length of the filled portion of a `ReadBuf` wrapped around
    /// `buf`. On Unix that count is paired with an empty vector as the second tuple element.
    ///
    /// # Errors
    ///
    /// Any I/O error reported by the underlying read is returned unchanged.
    async fn recvmsg(&mut self, buf: &mut [u8]) -> super::RecvmsgResult {
        use tokio::io::{AsyncReadExt, ReadBuf};

        let mut dest = ReadBuf::new(buf);
        self.read_buf(&mut dest).await?;
        let bytes_read = dest.filled().len();

        #[cfg(unix)]
        let outcome = (bytes_read, vec![]);
        #[cfg(not(unix))]
        let outcome = bytes_read;

        Ok(outcome)
    }

    /// Always reports the `Anonymous` authentication mechanism for this socket type.
    fn auth_mechanism(&self) -> crate::conn::AuthMechanism {
        crate::conn::AuthMechanism::Anonymous
    }
}
88
#[cfg(feature = "tokio-vsock")]
#[async_trait::async_trait]
impl super::WriteHalf for tokio_vsock::OwnedWriteHalf {
    /// Writes `buf` to the vsock write half and returns the number of bytes written.
    ///
    /// # Errors
    ///
    /// On Unix, a non-empty `fds` slice is rejected with an `InvalidInput` error before any
    /// write is attempted. Otherwise, whatever error the underlying write reports is returned.
    async fn sendmsg(
        &mut self,
        buf: &[u8],
        #[cfg(unix)] fds: &[std::os::fd::BorrowedFd<'_>],
    ) -> std::io::Result<usize> {
        use tokio::io::AsyncWriteExt;

        #[cfg(unix)]
        if !fds.is_empty() {
            let rejection = std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "fds cannot be sent with a vsock stream",
            );
            return Err(rejection);
        }

        self.write(buf).await
    }

    /// Shuts down the write half via `AsyncWriteExt::shutdown` and returns its result.
    async fn close(&mut self) -> std::io::Result<()> {
        tokio::io::AsyncWriteExt::shutdown(self).await
    }
}
114}