tokio_stream/wrappers/
tcp_listener.rs

1use crate::Stream;
2use std::io;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use tokio::net::{TcpListener, TcpStream};
6
7/// A wrapper around [`TcpListener`] that implements [`Stream`].
8///
9/// # Example
10///
11/// Accept connections from both IPv4 and IPv6 listeners in the same loop:
12///
13/// ```no_run
14/// # #[cfg(not(target_family = "wasm"))]
15/// # {
16/// use std::net::{Ipv4Addr, Ipv6Addr};
17///
18/// use tokio::net::TcpListener;
19/// use tokio_stream::{StreamExt, wrappers::TcpListenerStream};
20///
21/// # #[tokio::main(flavor = "current_thread")]
22/// # async fn main() -> std::io::Result<()> {
23/// let ipv4_listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 8080)).await?;
24/// let ipv6_listener = TcpListener::bind((Ipv6Addr::LOCALHOST, 8080)).await?;
25/// let ipv4_connections = TcpListenerStream::new(ipv4_listener);
26/// let ipv6_connections = TcpListenerStream::new(ipv6_listener);
27///
28/// let mut connections = ipv4_connections.merge(ipv6_connections);
29/// while let Some(tcp_stream) = connections.next().await {
30///     let stream = tcp_stream?;
31///     let peer_addr = stream.peer_addr()?;
32///     println!("accepted connection; peer address = {peer_addr}");
33/// }
34/// # Ok(())
35/// # }
36/// # }
37/// ```
38///
39/// [`TcpListener`]: struct@tokio::net::TcpListener
40/// [`Stream`]: trait@crate::Stream
41#[derive(Debug)]
42#[cfg_attr(docsrs, doc(cfg(feature = "net")))]
43pub struct TcpListenerStream {
44    inner: TcpListener,
45}
46
47impl TcpListenerStream {
48    /// Create a new `TcpListenerStream`.
49    pub fn new(listener: TcpListener) -> Self {
50        Self { inner: listener }
51    }
52
53    /// Get back the inner `TcpListener`.
54    pub fn into_inner(self) -> TcpListener {
55        self.inner
56    }
57}
58
59impl Stream for TcpListenerStream {
60    type Item = io::Result<TcpStream>;
61
62    fn poll_next(
63        self: Pin<&mut Self>,
64        cx: &mut Context<'_>,
65    ) -> Poll<Option<io::Result<TcpStream>>> {
66        match self.inner.poll_accept(cx) {
67            Poll::Ready(Ok((stream, _))) => Poll::Ready(Some(Ok(stream))),
68            Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
69            Poll::Pending => Poll::Pending,
70        }
71    }
72}
73
74impl AsRef<TcpListener> for TcpListenerStream {
75    fn as_ref(&self) -> &TcpListener {
76        &self.inner
77    }
78}
79
80impl AsMut<TcpListener> for TcpListenerStream {
81    fn as_mut(&mut self) -> &mut TcpListener {
82        &mut self.inner
83    }
84}