tokio_stream/wrappers/tcp_listener.rs
1use crate::Stream;
2use std::io;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use tokio::net::{TcpListener, TcpStream};
6
7/// A wrapper around [`TcpListener`] that implements [`Stream`].
8///
9/// # Example
10///
11/// Accept connections from both IPv4 and IPv6 listeners in the same loop:
12///
13/// ```no_run
14/// # #[cfg(not(target_family = "wasm"))]
15/// # {
16/// use std::net::{Ipv4Addr, Ipv6Addr};
17///
18/// use tokio::net::TcpListener;
19/// use tokio_stream::{StreamExt, wrappers::TcpListenerStream};
20///
21/// # #[tokio::main(flavor = "current_thread")]
22/// # async fn main() -> std::io::Result<()> {
23/// let ipv4_listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 8080)).await?;
24/// let ipv6_listener = TcpListener::bind((Ipv6Addr::LOCALHOST, 8080)).await?;
25/// let ipv4_connections = TcpListenerStream::new(ipv4_listener);
26/// let ipv6_connections = TcpListenerStream::new(ipv6_listener);
27///
28/// let mut connections = ipv4_connections.merge(ipv6_connections);
29/// while let Some(tcp_stream) = connections.next().await {
30/// let stream = tcp_stream?;
31/// let peer_addr = stream.peer_addr()?;
32/// println!("accepted connection; peer address = {peer_addr}");
33/// }
34/// # Ok(())
35/// # }
36/// # }
37/// ```
38///
39/// [`TcpListener`]: struct@tokio::net::TcpListener
40/// [`Stream`]: trait@crate::Stream
41#[derive(Debug)]
42#[cfg_attr(docsrs, doc(cfg(feature = "net")))]
43pub struct TcpListenerStream {
44 inner: TcpListener,
45}
46
47impl TcpListenerStream {
48 /// Create a new `TcpListenerStream`.
49 pub fn new(listener: TcpListener) -> Self {
50 Self { inner: listener }
51 }
52
53 /// Get back the inner `TcpListener`.
54 pub fn into_inner(self) -> TcpListener {
55 self.inner
56 }
57}
58
59impl Stream for TcpListenerStream {
60 type Item = io::Result<TcpStream>;
61
62 fn poll_next(
63 self: Pin<&mut Self>,
64 cx: &mut Context<'_>,
65 ) -> Poll<Option<io::Result<TcpStream>>> {
66 match self.inner.poll_accept(cx) {
67 Poll::Ready(Ok((stream, _))) => Poll::Ready(Some(Ok(stream))),
68 Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
69 Poll::Pending => Poll::Pending,
70 }
71 }
72}
73
74impl AsRef<TcpListener> for TcpListenerStream {
75 fn as_ref(&self) -> &TcpListener {
76 &self.inner
77 }
78}
79
80impl AsMut<TcpListener> for TcpListenerStream {
81 fn as_mut(&mut self) -> &mut TcpListener {
82 &mut self.inner
83 }
84}