tokio/net/udp.rs
1use crate::io::{Interest, PollEvented, ReadBuf, Ready};
2use crate::net::{to_socket_addrs, ToSocketAddrs};
3use crate::util::check_socket_for_blocking;
4
5use std::fmt;
6use std::io;
7use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr};
8use std::task::{ready, Context, Poll};
9
10cfg_io_util! {
11 use bytes::BufMut;
12}
13
14cfg_net! {
15 /// A UDP socket.
16 ///
17 /// UDP is "connectionless", unlike TCP. Meaning, regardless of what address you've bound to, a `UdpSocket`
18 /// is free to communicate with many different remotes. In tokio there are basically two main ways to use `UdpSocket`:
19 ///
20 /// * one to many: [`bind`](`UdpSocket::bind`) and use [`send_to`](`UdpSocket::send_to`)
21 /// and [`recv_from`](`UdpSocket::recv_from`) to communicate with many different addresses
22 /// * one to one: [`connect`](`UdpSocket::connect`) and associate with a single address, using [`send`](`UdpSocket::send`)
23 /// and [`recv`](`UdpSocket::recv`) to communicate only with that remote address
24 ///
25 /// This type does not provide a `split` method, because this functionality
26 /// can be achieved by instead wrapping the socket in an [`Arc`]. Note that
27 /// you do not need a `Mutex` to share the `UdpSocket` — an `Arc<UdpSocket>`
28 /// is enough. This is because all of the methods take `&self` instead of
29 /// `&mut self`. Once you have wrapped it in an `Arc`, you can call
30 /// `.clone()` on the `Arc<UdpSocket>` to get multiple shared handles to the
31 /// same socket. An example of such usage can be found further down.
32 ///
33 /// [`Arc`]: std::sync::Arc
34 ///
35 /// # Streams
36 ///
37 /// If you need to listen over UDP and produce a [`Stream`], you can look
38 /// at [`UdpFramed`].
39 ///
40 /// [`UdpFramed`]: https://docs.rs/tokio-util/latest/tokio_util/udp/struct.UdpFramed.html
41 /// [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
42 ///
43 /// # Example: one to many (bind)
44 ///
45 /// Using `bind` we can create a simple echo server that sends and recv's with many different clients:
46 /// ```no_run
47 /// use tokio::net::UdpSocket;
48 /// use std::io;
49 ///
50 /// #[tokio::main]
51 /// async fn main() -> io::Result<()> {
52 /// let sock = UdpSocket::bind("0.0.0.0:8080").await?;
53 /// let mut buf = [0; 1024];
54 /// loop {
55 /// let (len, addr) = sock.recv_from(&mut buf).await?;
56 /// println!("{:?} bytes received from {:?}", len, addr);
57 ///
58 /// let len = sock.send_to(&buf[..len], addr).await?;
59 /// println!("{:?} bytes sent", len);
60 /// }
61 /// }
62 /// ```
63 ///
64 /// # Example: one to one (connect)
65 ///
66 /// Or using `connect` we can echo with a single remote address using `send` and `recv`:
67 /// ```no_run
68 /// use tokio::net::UdpSocket;
69 /// use std::io;
70 ///
71 /// #[tokio::main]
72 /// async fn main() -> io::Result<()> {
73 /// let sock = UdpSocket::bind("0.0.0.0:8080").await?;
74 ///
75 /// let remote_addr = "127.0.0.1:59611";
76 /// sock.connect(remote_addr).await?;
77 /// let mut buf = [0; 1024];
78 /// loop {
79 /// let len = sock.recv(&mut buf).await?;
80 /// println!("{:?} bytes received from {:?}", len, remote_addr);
81 ///
82 /// let len = sock.send(&buf[..len]).await?;
83 /// println!("{:?} bytes sent", len);
84 /// }
85 /// }
86 /// ```
87 ///
88 /// # Example: Splitting with `Arc`
89 ///
90 /// Because `send_to` and `recv_from` take `&self`. It's perfectly alright
91 /// to use an `Arc<UdpSocket>` and share the references to multiple tasks.
92 /// Here is a similar "echo" example that supports concurrent
93 /// sending/receiving:
94 ///
95 /// ```no_run
96 /// use tokio::{net::UdpSocket, sync::mpsc};
97 /// use std::{io, net::SocketAddr, sync::Arc};
98 ///
99 /// #[tokio::main]
100 /// async fn main() -> io::Result<()> {
101 /// let sock = UdpSocket::bind("0.0.0.0:8080".parse::<SocketAddr>().unwrap()).await?;
102 /// let r = Arc::new(sock);
103 /// let s = r.clone();
104 /// let (tx, mut rx) = mpsc::channel::<(Vec<u8>, SocketAddr)>(1_000);
105 ///
106 /// tokio::spawn(async move {
107 /// while let Some((bytes, addr)) = rx.recv().await {
108 /// let len = s.send_to(&bytes, &addr).await.unwrap();
109 /// println!("{:?} bytes sent", len);
110 /// }
111 /// });
112 ///
113 /// let mut buf = [0; 1024];
114 /// loop {
115 /// let (len, addr) = r.recv_from(&mut buf).await?;
116 /// println!("{:?} bytes received from {:?}", len, addr);
117 /// tx.send((buf[..len].to_vec(), addr)).await.unwrap();
118 /// }
119 /// }
120 /// ```
121 ///
122 pub struct UdpSocket {
123 io: PollEvented<mio::net::UdpSocket>,
124 }
125}
126
127impl UdpSocket {
128 /// This function will create a new UDP socket and attempt to bind it to
129 /// the `addr` provided.
130 ///
131 /// Binding with a port number of 0 will request that the OS assigns a port
132 /// to this listener. The port allocated can be queried via the `local_addr`
133 /// method.
134 ///
135 /// # Example
136 ///
137 /// ```no_run
138 /// use tokio::net::UdpSocket;
139 /// use std::io;
140 ///
141 /// #[tokio::main]
142 /// async fn main() -> io::Result<()> {
143 /// # if cfg!(miri) { return Ok(()); } // No `socket` in miri.
144 /// let sock = UdpSocket::bind("0.0.0.0:8080").await?;
145 /// // use `sock`
146 /// # let _ = sock;
147 /// Ok(())
148 /// }
149 /// ```
150 pub async fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<UdpSocket> {
151 let addrs = to_socket_addrs(addr).await?;
152 let mut last_err = None;
153
154 for addr in addrs {
155 match UdpSocket::bind_addr(addr) {
156 Ok(socket) => return Ok(socket),
157 Err(e) => last_err = Some(e),
158 }
159 }
160
161 Err(last_err.unwrap_or_else(|| {
162 io::Error::new(
163 io::ErrorKind::InvalidInput,
164 "could not resolve to any address",
165 )
166 }))
167 }
168
169 fn bind_addr(addr: SocketAddr) -> io::Result<UdpSocket> {
170 let sys = mio::net::UdpSocket::bind(addr)?;
171 UdpSocket::new(sys)
172 }
173
174 #[track_caller]
175 fn new(socket: mio::net::UdpSocket) -> io::Result<UdpSocket> {
176 let io = PollEvented::new(socket)?;
177 Ok(UdpSocket { io })
178 }
179
180 /// Creates new `UdpSocket` from a previously bound `std::net::UdpSocket`.
181 ///
182 /// This function is intended to be used to wrap a UDP socket from the
183 /// standard library in the Tokio equivalent.
184 ///
185 /// This can be used in conjunction with `socket2`'s `Socket` interface to
186 /// configure a socket before it's handed off, such as setting options like
187 /// `reuse_address` or binding to multiple addresses.
188 ///
189 /// # Notes
190 ///
191 /// The caller is responsible for ensuring that the socket is in
192 /// non-blocking mode. Otherwise all I/O operations on the socket
193 /// will block the thread, which will cause unexpected behavior.
194 /// Non-blocking mode can be set using [`set_nonblocking`].
195 ///
196 /// Passing a listener in blocking mode is always erroneous,
197 /// and the behavior in that case may change in the future.
198 /// For example, it could panic.
199 ///
200 /// [`set_nonblocking`]: std::net::UdpSocket::set_nonblocking
201 ///
202 /// # Panics
203 ///
204 /// This function panics if thread-local runtime is not set.
205 ///
206 /// The runtime is usually set implicitly when this function is called
207 /// from a future driven by a tokio runtime, otherwise runtime can be set
208 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
209 ///
210 /// # Example
211 ///
212 /// ```no_run
213 /// use tokio::net::UdpSocket;
214 /// # use std::{io, net::SocketAddr};
215 ///
216 /// # #[tokio::main]
217 /// # async fn main() -> io::Result<()> {
218 /// let addr = "0.0.0.0:8080".parse::<SocketAddr>().unwrap();
219 /// let std_sock = std::net::UdpSocket::bind(addr)?;
220 /// std_sock.set_nonblocking(true)?;
221 /// let sock = UdpSocket::from_std(std_sock)?;
222 /// // use `sock`
223 /// # Ok(())
224 /// # }
225 /// ```
226 #[track_caller]
227 pub fn from_std(socket: net::UdpSocket) -> io::Result<UdpSocket> {
228 check_socket_for_blocking(&socket)?;
229
230 let io = mio::net::UdpSocket::from_std(socket);
231 UdpSocket::new(io)
232 }
233
234 /// Turns a [`tokio::net::UdpSocket`] into a [`std::net::UdpSocket`].
235 ///
236 /// The returned [`std::net::UdpSocket`] will have nonblocking mode set as
237 /// `true`. Use [`set_nonblocking`] to change the blocking mode if needed.
238 ///
239 /// # Examples
240 ///
241 /// ```rust,no_run
242 /// use std::error::Error;
243 ///
244 /// #[tokio::main]
245 /// async fn main() -> Result<(), Box<dyn Error>> {
246 /// let tokio_socket = tokio::net::UdpSocket::bind("127.0.0.1:0").await?;
247 /// let std_socket = tokio_socket.into_std()?;
248 /// std_socket.set_nonblocking(false)?;
249 /// Ok(())
250 /// }
251 /// ```
252 ///
253 /// [`tokio::net::UdpSocket`]: UdpSocket
254 /// [`std::net::UdpSocket`]: std::net::UdpSocket
255 /// [`set_nonblocking`]: fn@std::net::UdpSocket::set_nonblocking
256 pub fn into_std(self) -> io::Result<std::net::UdpSocket> {
257 #[cfg(not(windows))]
258 {
259 use std::os::fd::{FromRawFd, IntoRawFd};
260 self.io
261 .into_inner()
262 .map(IntoRawFd::into_raw_fd)
263 .map(|raw_fd| unsafe { std::net::UdpSocket::from_raw_fd(raw_fd) })
264 }
265
266 #[cfg(windows)]
267 {
268 use std::os::windows::io::{FromRawSocket, IntoRawSocket};
269 self.io
270 .into_inner()
271 .map(|io| io.into_raw_socket())
272 .map(|raw_socket| unsafe { std::net::UdpSocket::from_raw_socket(raw_socket) })
273 }
274 }
275
276 fn as_socket(&self) -> socket2::SockRef<'_> {
277 socket2::SockRef::from(self)
278 }
279
280 /// Returns the local address that this socket is bound to.
281 ///
282 /// # Example
283 ///
284 /// ```no_run
285 /// use tokio::net::UdpSocket;
286 /// # use std::{io, net::SocketAddr};
287 ///
288 /// # #[tokio::main]
289 /// # async fn main() -> io::Result<()> {
290 /// let addr = "0.0.0.0:8080".parse::<SocketAddr>().unwrap();
291 /// let sock = UdpSocket::bind(addr).await?;
292 /// // the address the socket is bound to
293 /// let local_addr = sock.local_addr()?;
294 /// # Ok(())
295 /// # }
296 /// ```
297 pub fn local_addr(&self) -> io::Result<SocketAddr> {
298 self.io.local_addr()
299 }
300
301 /// Returns the socket address of the remote peer this socket was connected to.
302 ///
303 /// # Example
304 ///
305 /// ```
306 /// use tokio::net::UdpSocket;
307 ///
308 /// # use std::{io, net::SocketAddr};
309 /// # #[tokio::main]
310 /// # async fn main() -> io::Result<()> {
311 /// # if cfg!(miri) { return Ok(()); } // No `socket` in miri.
312 /// let addr = "0.0.0.0:8080".parse::<SocketAddr>().unwrap();
313 /// let peer = "127.0.0.1:11100".parse::<SocketAddr>().unwrap();
314 /// let sock = UdpSocket::bind(addr).await?;
315 /// sock.connect(peer).await?;
316 /// assert_eq!(peer, sock.peer_addr()?);
317 /// # Ok(())
318 /// # }
319 /// ```
320 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
321 self.io.peer_addr()
322 }
323
324 /// Connects the UDP socket setting the default destination for send() and
325 /// limiting packets that are read via `recv` from the address specified in
326 /// `addr`.
327 ///
328 /// # Example
329 ///
330 /// ```no_run
331 /// use tokio::net::UdpSocket;
332 /// # use std::{io, net::SocketAddr};
333 ///
334 /// # #[tokio::main]
335 /// # async fn main() -> io::Result<()> {
336 /// let sock = UdpSocket::bind("0.0.0.0:8080".parse::<SocketAddr>().unwrap()).await?;
337 ///
338 /// let remote_addr = "127.0.0.1:59600".parse::<SocketAddr>().unwrap();
339 /// sock.connect(remote_addr).await?;
340 /// let mut buf = [0u8; 32];
341 /// // recv from remote_addr
342 /// let len = sock.recv(&mut buf).await?;
343 /// // send to remote_addr
344 /// let _len = sock.send(&buf[..len]).await?;
345 /// # Ok(())
346 /// # }
347 /// ```
348 pub async fn connect<A: ToSocketAddrs>(&self, addr: A) -> io::Result<()> {
349 let addrs = to_socket_addrs(addr).await?;
350 let mut last_err = None;
351
352 for addr in addrs {
353 match self.io.connect(addr) {
354 Ok(()) => return Ok(()),
355 Err(e) => last_err = Some(e),
356 }
357 }
358
359 Err(last_err.unwrap_or_else(|| {
360 io::Error::new(
361 io::ErrorKind::InvalidInput,
362 "could not resolve to any address",
363 )
364 }))
365 }
366
367 /// Waits for any of the requested ready states.
368 ///
369 /// This function is usually paired with `try_recv()` or `try_send()`. It
370 /// can be used to concurrently `recv` / `send` to the same socket on a single
371 /// task without splitting the socket.
372 ///
373 /// The function may complete without the socket being ready. This is a
374 /// false-positive and attempting an operation will return with
375 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
376 /// [`Ready`] set, so you should always check the returned value and possibly
377 /// wait again if the requested states are not set.
378 ///
379 /// # Cancel safety
380 ///
381 /// This method is cancel safe. Once a readiness event occurs, the method
382 /// will continue to return immediately until the readiness event is
383 /// consumed by an attempt to read or write that fails with `WouldBlock` or
384 /// `Poll::Pending`.
385 ///
386 /// # Examples
387 ///
388 /// Concurrently receive from and send to the socket on the same task
389 /// without splitting.
390 ///
391 /// ```no_run
392 /// use tokio::io::{self, Interest};
393 /// use tokio::net::UdpSocket;
394 ///
395 /// #[tokio::main]
396 /// async fn main() -> io::Result<()> {
397 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
398 /// socket.connect("127.0.0.1:8081").await?;
399 ///
400 /// loop {
401 /// let ready = socket.ready(Interest::READABLE | Interest::WRITABLE).await?;
402 ///
403 /// if ready.is_readable() {
404 /// // The buffer is **not** included in the async task and will only exist
405 /// // on the stack.
406 /// let mut data = [0; 1024];
407 /// match socket.try_recv(&mut data[..]) {
408 /// Ok(n) => {
409 /// println!("received {:?}", &data[..n]);
410 /// }
411 /// // False-positive, continue
412 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
413 /// Err(e) => {
414 /// return Err(e);
415 /// }
416 /// }
417 /// }
418 ///
419 /// if ready.is_writable() {
420 /// // Write some data
421 /// match socket.try_send(b"hello world") {
422 /// Ok(n) => {
423 /// println!("sent {} bytes", n);
424 /// }
425 /// // False-positive, continue
426 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
427 /// Err(e) => {
428 /// return Err(e);
429 /// }
430 /// }
431 /// }
432 /// }
433 /// }
434 /// ```
435 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
436 let event = self.io.registration().readiness(interest).await?;
437 Ok(event.ready)
438 }
439
440 /// Waits for the socket to become writable.
441 ///
442 /// This function is equivalent to `ready(Interest::WRITABLE)` and is
443 /// usually paired with `try_send()` or `try_send_to()`.
444 ///
445 /// The function may complete without the socket being writable. This is a
446 /// false-positive and attempting a `try_send()` will return with
447 /// `io::ErrorKind::WouldBlock`.
448 ///
449 /// # Cancel safety
450 ///
451 /// This method is cancel safe. Once a readiness event occurs, the method
452 /// will continue to return immediately until the readiness event is
453 /// consumed by an attempt to write that fails with `WouldBlock` or
454 /// `Poll::Pending`.
455 ///
456 /// # Examples
457 ///
458 /// ```no_run
459 /// use tokio::net::UdpSocket;
460 /// use std::io;
461 ///
462 /// #[tokio::main]
463 /// async fn main() -> io::Result<()> {
464 /// // Bind socket
465 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
466 /// socket.connect("127.0.0.1:8081").await?;
467 ///
468 /// loop {
469 /// // Wait for the socket to be writable
470 /// socket.writable().await?;
471 ///
472 /// // Try to send data, this may still fail with `WouldBlock`
473 /// // if the readiness event is a false positive.
474 /// match socket.try_send(b"hello world") {
475 /// Ok(n) => {
476 /// break;
477 /// }
478 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
479 /// continue;
480 /// }
481 /// Err(e) => {
482 /// return Err(e);
483 /// }
484 /// }
485 /// }
486 ///
487 /// Ok(())
488 /// }
489 /// ```
490 pub async fn writable(&self) -> io::Result<()> {
491 self.ready(Interest::WRITABLE).await?;
492 Ok(())
493 }
494
495 /// Polls for write/send readiness.
496 ///
497 /// If the udp stream is not currently ready for sending, this method will
498 /// store a clone of the `Waker` from the provided `Context`. When the udp
499 /// stream becomes ready for sending, `Waker::wake` will be called on the
500 /// waker.
501 ///
502 /// Note that on multiple calls to `poll_send_ready` or `poll_send`, only
503 /// the `Waker` from the `Context` passed to the most recent call is
504 /// scheduled to receive a wakeup. (However, `poll_recv_ready` retains a
505 /// second, independent waker.)
506 ///
507 /// This function is intended for cases where creating and pinning a future
508 /// via [`writable`] is not feasible. Where possible, using [`writable`] is
509 /// preferred, as this supports polling from multiple tasks at once.
510 ///
511 /// # Return value
512 ///
513 /// The function returns:
514 ///
515 /// * `Poll::Pending` if the udp stream is not ready for writing.
516 /// * `Poll::Ready(Ok(()))` if the udp stream is ready for writing.
517 /// * `Poll::Ready(Err(e))` if an error is encountered.
518 ///
519 /// # Errors
520 ///
521 /// This function may encounter any standard I/O error except `WouldBlock`.
522 ///
523 /// [`writable`]: method@Self::writable
524 pub fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
525 self.io.registration().poll_write_ready(cx).map_ok(|_| ())
526 }
527
528 /// Sends data on the socket to the remote address that the socket is
529 /// connected to.
530 ///
531 /// The [`connect`] method will connect this socket to a remote address.
532 /// This method will fail if the socket is not connected.
533 ///
534 /// This method may fail with a [`ConnectionRefused`] error if the remote
535 /// address has replied with ICMP Unreachable to a previously sent packet.
536 /// However, this behavior depends on the OS.
537 ///
538 /// [`connect`]: method@Self::connect
539 /// [`ConnectionRefused`]: std::io::ErrorKind::ConnectionRefused
540 ///
541 /// # Return
542 ///
543 /// On success, the number of bytes sent is returned, otherwise, the
544 /// encountered error is returned.
545 ///
546 /// # Cancel safety
547 ///
548 /// This method is cancel safe. If `send` is used as the event in a
549 /// [`tokio::select!`](crate::select) statement and some other branch
550 /// completes first, then it is guaranteed that the message was not sent.
551 ///
552 /// # Examples
553 ///
554 /// ```no_run
555 /// use tokio::io;
556 /// use tokio::net::UdpSocket;
557 ///
558 /// #[tokio::main]
559 /// async fn main() -> io::Result<()> {
560 /// // Bind socket
561 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
562 /// socket.connect("127.0.0.1:8081").await?;
563 ///
564 /// // Send a message
565 /// socket.send(b"hello world").await?;
566 ///
567 /// Ok(())
568 /// }
569 /// ```
570 pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
571 self.io
572 .registration()
573 .async_io(Interest::WRITABLE, || self.io.send(buf))
574 .await
575 }
576
577 /// Attempts to send data on the socket to the remote address to which it
578 /// was previously `connect`ed.
579 ///
580 /// The [`connect`] method will connect this socket to a remote address.
581 /// This method will fail if the socket is not connected.
582 ///
583 /// Note that on multiple calls to a `poll_*` method in the send direction,
584 /// only the `Waker` from the `Context` passed to the most recent call will
585 /// be scheduled to receive a wakeup.
586 ///
587 /// # Return value
588 ///
589 /// The function returns:
590 ///
591 /// * `Poll::Pending` if the socket is not available to write
592 /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent
593 /// * `Poll::Ready(Err(e))` if an error is encountered.
594 ///
595 /// # Errors
596 ///
597 /// This function may encounter any standard I/O error except `WouldBlock`.
598 ///
599 /// [`connect`]: method@Self::connect
600 pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
601 self.io
602 .registration()
603 .poll_write_io(cx, || self.io.send(buf))
604 }
605
606 /// Tries to send data on the socket to the remote address to which it is
607 /// connected.
608 ///
609 /// When the socket buffer is full, `Err(io::ErrorKind::WouldBlock)` is
610 /// returned. This function is usually paired with `writable()`.
611 ///
612 /// # Returns
613 ///
614 /// If successful, `Ok(n)` is returned, where `n` is the number of bytes
615 /// sent. If the socket is not ready to send data,
616 /// `Err(ErrorKind::WouldBlock)` is returned.
617 ///
618 /// # Examples
619 ///
620 /// ```no_run
621 /// use tokio::net::UdpSocket;
622 /// use std::io;
623 ///
624 /// #[tokio::main]
625 /// async fn main() -> io::Result<()> {
626 /// // Bind a UDP socket
627 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
628 ///
629 /// // Connect to a peer
630 /// socket.connect("127.0.0.1:8081").await?;
631 ///
632 /// loop {
633 /// // Wait for the socket to be writable
634 /// socket.writable().await?;
635 ///
636 /// // Try to send data, this may still fail with `WouldBlock`
637 /// // if the readiness event is a false positive.
638 /// match socket.try_send(b"hello world") {
639 /// Ok(n) => {
640 /// break;
641 /// }
642 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
643 /// continue;
644 /// }
645 /// Err(e) => {
646 /// return Err(e);
647 /// }
648 /// }
649 /// }
650 ///
651 /// Ok(())
652 /// }
653 /// ```
654 pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> {
655 self.io
656 .registration()
657 .try_io(Interest::WRITABLE, || self.io.send(buf))
658 }
659
660 /// Waits for the socket to become readable.
661 ///
662 /// This function is equivalent to `ready(Interest::READABLE)` and is usually
663 /// paired with `try_recv()`.
664 ///
665 /// The function may complete without the socket being readable. This is a
666 /// false-positive and attempting a `try_recv()` will return with
667 /// `io::ErrorKind::WouldBlock`.
668 ///
669 /// # Cancel safety
670 ///
671 /// This method is cancel safe. Once a readiness event occurs, the method
672 /// will continue to return immediately until the readiness event is
673 /// consumed by an attempt to read that fails with `WouldBlock` or
674 /// `Poll::Pending`.
675 ///
676 /// # Examples
677 ///
678 /// ```no_run
679 /// use tokio::net::UdpSocket;
680 /// use std::io;
681 ///
682 /// #[tokio::main]
683 /// async fn main() -> io::Result<()> {
684 /// // Connect to a peer
685 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
686 /// socket.connect("127.0.0.1:8081").await?;
687 ///
688 /// loop {
689 /// // Wait for the socket to be readable
690 /// socket.readable().await?;
691 ///
692 /// // The buffer is **not** included in the async task and will
693 /// // only exist on the stack.
694 /// let mut buf = [0; 1024];
695 ///
696 /// // Try to recv data, this may still fail with `WouldBlock`
697 /// // if the readiness event is a false positive.
698 /// match socket.try_recv(&mut buf) {
699 /// Ok(n) => {
700 /// println!("GOT {:?}", &buf[..n]);
701 /// break;
702 /// }
703 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
704 /// continue;
705 /// }
706 /// Err(e) => {
707 /// return Err(e);
708 /// }
709 /// }
710 /// }
711 ///
712 /// Ok(())
713 /// }
714 /// ```
715 pub async fn readable(&self) -> io::Result<()> {
716 self.ready(Interest::READABLE).await?;
717 Ok(())
718 }
719
720 /// Polls for read/receive readiness.
721 ///
722 /// If the udp stream is not currently ready for receiving, this method will
723 /// store a clone of the `Waker` from the provided `Context`. When the udp
724 /// socket becomes ready for reading, `Waker::wake` will be called on the
725 /// waker.
726 ///
727 /// Note that on multiple calls to `poll_recv_ready`, `poll_recv` or
728 /// `poll_peek`, only the `Waker` from the `Context` passed to the most
729 /// recent call is scheduled to receive a wakeup. (However,
730 /// `poll_send_ready` retains a second, independent waker.)
731 ///
732 /// This function is intended for cases where creating and pinning a future
733 /// via [`readable`] is not feasible. Where possible, using [`readable`] is
734 /// preferred, as this supports polling from multiple tasks at once.
735 ///
736 /// # Return value
737 ///
738 /// The function returns:
739 ///
740 /// * `Poll::Pending` if the udp stream is not ready for reading.
741 /// * `Poll::Ready(Ok(()))` if the udp stream is ready for reading.
742 /// * `Poll::Ready(Err(e))` if an error is encountered.
743 ///
744 /// # Errors
745 ///
746 /// This function may encounter any standard I/O error except `WouldBlock`.
747 ///
748 /// [`readable`]: method@Self::readable
749 pub fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
750 self.io.registration().poll_read_ready(cx).map_ok(|_| ())
751 }
752
753 /// Receives a single datagram message on the socket from the remote address
754 /// to which it is connected. On success, returns the number of bytes read.
755 ///
756 /// The function must be called with valid byte array `buf` of sufficient
757 /// size to hold the message bytes. If a message is too long to fit in the
758 /// supplied buffer, excess bytes may be discarded.
759 ///
760 /// The [`connect`] method will connect this socket to a remote address.
761 /// This method will fail if the socket is not connected.
762 ///
763 /// # Cancel safety
764 ///
765 /// This method is cancel safe. If `recv` is used as the event in a
766 /// [`tokio::select!`](crate::select) statement and some other branch
767 /// completes first, it is guaranteed that no messages were received on this
768 /// socket.
769 ///
770 /// [`connect`]: method@Self::connect
771 ///
772 /// ```no_run
773 /// use tokio::net::UdpSocket;
774 /// use std::io;
775 ///
776 /// #[tokio::main]
777 /// async fn main() -> io::Result<()> {
778 /// // Bind socket
779 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
780 /// socket.connect("127.0.0.1:8081").await?;
781 ///
782 /// let mut buf = vec![0; 10];
783 /// let n = socket.recv(&mut buf).await?;
784 ///
785 /// println!("received {} bytes {:?}", n, &buf[..n]);
786 ///
787 /// Ok(())
788 /// }
789 /// ```
790 pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
791 self.io
792 .registration()
793 .async_io(Interest::READABLE | Interest::ERROR, || self.io.recv(buf))
794 .await
795 }
796
797 /// Attempts to receive a single datagram message on the socket from the remote
798 /// address to which it is `connect`ed.
799 ///
800 /// The [`connect`] method will connect this socket to a remote address. This method
801 /// resolves to an error if the socket is not connected.
802 ///
803 /// Note that on multiple calls to a `poll_*` method in the `recv` direction, only the
804 /// `Waker` from the `Context` passed to the most recent call will be scheduled to
805 /// receive a wakeup.
806 ///
807 /// # Return value
808 ///
809 /// The function returns:
810 ///
811 /// * `Poll::Pending` if the socket is not ready to read
812 /// * `Poll::Ready(Ok(()))` reads data `ReadBuf` if the socket is ready
813 /// * `Poll::Ready(Err(e))` if an error is encountered.
814 ///
815 /// # Errors
816 ///
817 /// This function may encounter any standard I/O error except `WouldBlock`.
818 ///
819 /// [`connect`]: method@Self::connect
820 pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
821 #[allow(clippy::blocks_in_conditions)]
822 let n = ready!(self.io.registration().poll_read_io(cx, || {
823 // Safety: will not read the maybe uninitialized bytes.
824 let b = unsafe {
825 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
826 };
827
828 self.io.recv(b)
829 }))?;
830
831 // Safety: We trust `recv` to have filled up `n` bytes in the buffer.
832 unsafe {
833 buf.assume_init(n);
834 }
835 buf.advance(n);
836 Poll::Ready(Ok(()))
837 }
838
839 /// Tries to receive a single datagram message on the socket from the remote
840 /// address to which it is connected. On success, returns the number of
841 /// bytes read.
842 ///
843 /// This method must be called with valid byte array `buf` of sufficient size
844 /// to hold the message bytes. If a message is too long to fit in the
845 /// supplied buffer, excess bytes may be discarded.
846 ///
847 /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is
848 /// returned. This function is usually paired with `readable()`.
849 ///
850 /// # Examples
851 ///
852 /// ```no_run
853 /// use tokio::net::UdpSocket;
854 /// use std::io;
855 ///
856 /// #[tokio::main]
857 /// async fn main() -> io::Result<()> {
858 /// // Connect to a peer
859 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
860 /// socket.connect("127.0.0.1:8081").await?;
861 ///
862 /// loop {
863 /// // Wait for the socket to be readable
864 /// socket.readable().await?;
865 ///
866 /// // The buffer is **not** included in the async task and will
867 /// // only exist on the stack.
868 /// let mut buf = [0; 1024];
869 ///
870 /// // Try to recv data, this may still fail with `WouldBlock`
871 /// // if the readiness event is a false positive.
872 /// match socket.try_recv(&mut buf) {
873 /// Ok(n) => {
874 /// println!("GOT {:?}", &buf[..n]);
875 /// break;
876 /// }
877 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
878 /// continue;
879 /// }
880 /// Err(e) => {
881 /// return Err(e);
882 /// }
883 /// }
884 /// }
885 ///
886 /// Ok(())
887 /// }
888 /// ```
889 pub fn try_recv(&self, buf: &mut [u8]) -> io::Result<usize> {
890 self.io
891 .registration()
892 .try_io(Interest::READABLE, || self.io.recv(buf))
893 }
894
895 cfg_io_util! {
896 /// Tries to receive data from the stream into the provided buffer, advancing the
897 /// buffer's internal cursor, returning how many bytes were read.
898 ///
899 /// This method must be called with valid byte array `buf` of sufficient size
900 /// to hold the message bytes. If a message is too long to fit in the
901 /// supplied buffer, excess bytes may be discarded.
902 ///
903 /// This method can be used even if `buf` is uninitialized.
904 ///
905 /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is
906 /// returned. This function is usually paired with `readable()`.
907 ///
908 /// # Examples
909 ///
910 /// ```no_run
911 /// use tokio::net::UdpSocket;
912 /// use std::io;
913 ///
914 /// #[tokio::main]
915 /// async fn main() -> io::Result<()> {
916 /// // Connect to a peer
917 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
918 /// socket.connect("127.0.0.1:8081").await?;
919 ///
920 /// loop {
921 /// // Wait for the socket to be readable
922 /// socket.readable().await?;
923 ///
924 /// let mut buf = Vec::with_capacity(1024);
925 ///
926 /// // Try to recv data, this may still fail with `WouldBlock`
927 /// // if the readiness event is a false positive.
928 /// match socket.try_recv_buf(&mut buf) {
929 /// Ok(n) => {
930 /// println!("GOT {:?}", &buf[..n]);
931 /// break;
932 /// }
933 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
934 /// continue;
935 /// }
936 /// Err(e) => {
937 /// return Err(e);
938 /// }
939 /// }
940 /// }
941 ///
942 /// Ok(())
943 /// }
944 /// ```
945 pub fn try_recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
946 self.io.registration().try_io(Interest::READABLE, || {
947 let dst = buf.chunk_mut();
948 let dst =
949 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
950
951 let n = (*self.io).recv(dst)?;
952
953 // Safety: We trust `UdpSocket::recv` to have filled up `n` bytes in the
954 // buffer.
955 unsafe {
956 buf.advance_mut(n);
957 }
958
959 Ok(n)
960 })
961 }
962
963 /// Receives a single datagram message on the socket from the remote address
964 /// to which it is connected, advancing the buffer's internal cursor,
965 /// returning how many bytes were read.
966 ///
967 /// This method must be called with valid byte array `buf` of sufficient size
968 /// to hold the message bytes. If a message is too long to fit in the
969 /// supplied buffer, excess bytes may be discarded.
970 ///
971 /// This method can be used even if `buf` is uninitialized.
972 ///
973 /// # Examples
974 ///
975 /// ```no_run
976 /// use tokio::net::UdpSocket;
977 /// use std::io;
978 ///
979 /// #[tokio::main]
980 /// async fn main() -> io::Result<()> {
981 /// // Connect to a peer
982 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
983 /// socket.connect("127.0.0.1:8081").await?;
984 ///
985 /// let mut buf = Vec::with_capacity(512);
986 /// let len = socket.recv_buf(&mut buf).await?;
987 ///
988 /// println!("received {} bytes {:?}", len, &buf[..len]);
989 ///
990 /// Ok(())
991 /// }
992 /// ```
993 pub async fn recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
994 self.io
995 .registration()
996 .async_io(Interest::READABLE | Interest::ERROR, || {
997 let dst = buf.chunk_mut();
998 let dst =
999 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
1000
1001 let n = (*self.io).recv(dst)?;
1002
1003 // Safety: We trust `UdpSocket::recv` to have filled up `n` bytes in the
1004 // buffer.
1005 unsafe {
1006 buf.advance_mut(n);
1007 }
1008
1009 Ok(n)
1010 })
1011 .await
1012 }
1013
1014 /// Tries to receive a single datagram message on the socket. On success,
1015 /// returns the number of bytes read and the origin.
1016 ///
1017 /// This method must be called with valid byte array `buf` of sufficient size
1018 /// to hold the message bytes. If a message is too long to fit in the
1019 /// supplied buffer, excess bytes may be discarded.
1020 ///
1021 /// This method can be used even if `buf` is uninitialized.
1022 ///
1023 /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is
1024 /// returned. This function is usually paired with `readable()`.
1025 ///
1026 /// # Notes
1027 /// Note that the socket address **cannot** be implicitly trusted, because it is relatively
1028 /// trivial to send a UDP datagram with a spoofed origin in a [packet injection attack].
1029 /// Because UDP is stateless and does not validate the origin of a packet,
1030 /// the attacker does not need to be able to intercept traffic in order to interfere.
1031 /// It is important to be aware of this when designing your application-level protocol.
1032 ///
1033 /// [packet injection attack]: https://en.wikipedia.org/wiki/Packet_injection
1034 ///
1035 /// # Examples
1036 ///
1037 /// ```no_run
1038 /// use tokio::net::UdpSocket;
1039 /// use std::io;
1040 ///
1041 /// #[tokio::main]
1042 /// async fn main() -> io::Result<()> {
1043 /// // Connect to a peer
1044 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
1045 ///
1046 /// loop {
1047 /// // Wait for the socket to be readable
1048 /// socket.readable().await?;
1049 ///
1050 /// let mut buf = Vec::with_capacity(1024);
1051 ///
1052 /// // Try to recv data, this may still fail with `WouldBlock`
1053 /// // if the readiness event is a false positive.
1054 /// match socket.try_recv_buf_from(&mut buf) {
1055 /// Ok((n, _addr)) => {
1056 /// println!("GOT {:?}", &buf[..n]);
1057 /// break;
1058 /// }
1059 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1060 /// continue;
1061 /// }
1062 /// Err(e) => {
1063 /// return Err(e);
1064 /// }
1065 /// }
1066 /// }
1067 ///
1068 /// Ok(())
1069 /// }
1070 /// ```
1071 pub fn try_recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> {
1072 self.io.registration().try_io(Interest::READABLE, || {
1073 let dst = buf.chunk_mut();
1074 let dst =
1075 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
1076
1077 let (n, addr) = (*self.io).recv_from(dst)?;
1078
1079 // Safety: We trust `UdpSocket::recv_from` to have filled up `n` bytes in the
1080 // buffer.
1081 unsafe {
1082 buf.advance_mut(n);
1083 }
1084
1085 Ok((n, addr))
1086 })
1087 }
1088
1089 /// Receives a single datagram message on the socket, advancing the
1090 /// buffer's internal cursor, returning how many bytes were read and the origin.
1091 ///
1092 /// This method must be called with valid byte array `buf` of sufficient size
1093 /// to hold the message bytes. If a message is too long to fit in the
1094 /// supplied buffer, excess bytes may be discarded.
1095 ///
1096 /// This method can be used even if `buf` is uninitialized.
1097 ///
1098 /// # Notes
1099 /// Note that the socket address **cannot** be implicitly trusted, because it is relatively
1100 /// trivial to send a UDP datagram with a spoofed origin in a [packet injection attack].
1101 /// Because UDP is stateless and does not validate the origin of a packet,
1102 /// the attacker does not need to be able to intercept traffic in order to interfere.
1103 /// It is important to be aware of this when designing your application-level protocol.
1104 ///
1105 /// [packet injection attack]: https://en.wikipedia.org/wiki/Packet_injection
1106 ///
1107 /// # Examples
1108 ///
1109 /// ```no_run
1110 /// use tokio::net::UdpSocket;
1111 /// use std::io;
1112 ///
1113 /// #[tokio::main]
1114 /// async fn main() -> io::Result<()> {
1115 /// // Connect to a peer
1116 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
1117 /// socket.connect("127.0.0.1:8081").await?;
1118 ///
1119 /// let mut buf = Vec::with_capacity(512);
1120 /// let (len, addr) = socket.recv_buf_from(&mut buf).await?;
1121 ///
1122 /// println!("received {:?} bytes from {:?}", len, addr);
1123 ///
1124 /// Ok(())
1125 /// }
1126 /// ```
1127 pub async fn recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> {
1128 self.io
1129 .registration()
1130 .async_io(Interest::READABLE | Interest::ERROR, || {
1131 let dst = buf.chunk_mut();
1132 let dst =
1133 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
1134
1135 let (n, addr) = (*self.io).recv_from(dst)?;
1136
1137 // Safety: We trust `UdpSocket::recv_from` to have filled up `n` bytes in the
1138 // buffer.
1139 unsafe {
1140 buf.advance_mut(n);
1141 }
1142
1143 Ok((n, addr))
1144 })
1145 .await
1146 }
1147 }
1148
1149 /// Sends data on the socket to the given address. On success, returns the
1150 /// number of bytes written.
1151 ///
1152 /// Address type can be any implementor of [`ToSocketAddrs`] trait. See its
1153 /// documentation for concrete examples.
1154 ///
1155 /// It is possible for `addr` to yield multiple addresses, but `send_to`
1156 /// will only send data to the first address yielded by `addr`.
1157 ///
1158 /// This will return an error when the IP version of the local socket does
1159 /// not match that returned from [`ToSocketAddrs`].
1160 ///
1161 /// [`ToSocketAddrs`]: crate::net::ToSocketAddrs
1162 ///
1163 /// # Cancel safety
1164 ///
1165 /// This method is cancel safe. If `send_to` is used as the event in a
1166 /// [`tokio::select!`](crate::select) statement and some other branch
1167 /// completes first, then it is guaranteed that the message was not sent.
1168 ///
1169 /// # Example
1170 ///
1171 /// ```no_run
1172 /// use tokio::net::UdpSocket;
1173 /// use std::io;
1174 ///
1175 /// #[tokio::main]
1176 /// async fn main() -> io::Result<()> {
1177 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
1178 /// let len = socket.send_to(b"hello world", "127.0.0.1:8081").await?;
1179 ///
1180 /// println!("Sent {} bytes", len);
1181 ///
1182 /// Ok(())
1183 /// }
1184 /// ```
1185 pub async fn send_to<A: ToSocketAddrs>(&self, buf: &[u8], addr: A) -> io::Result<usize> {
1186 let mut addrs = to_socket_addrs(addr).await?;
1187
1188 match addrs.next() {
1189 Some(target) => self.send_to_addr(buf, target).await,
1190 None => Err(io::Error::new(
1191 io::ErrorKind::InvalidInput,
1192 "no addresses to send data to",
1193 )),
1194 }
1195 }
1196
1197 /// Attempts to send data on the socket to a given address.
1198 ///
1199 /// Note that on multiple calls to a `poll_*` method in the send direction, only the
1200 /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1201 /// receive a wakeup.
1202 ///
1203 /// # Return value
1204 ///
1205 /// The function returns:
1206 ///
1207 /// * `Poll::Pending` if the socket is not ready to write
1208 /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent.
1209 /// * `Poll::Ready(Err(e))` if an error is encountered.
1210 ///
1211 /// # Errors
1212 ///
1213 /// This function may encounter any standard I/O error except `WouldBlock`.
1214 pub fn poll_send_to(
1215 &self,
1216 cx: &mut Context<'_>,
1217 buf: &[u8],
1218 target: SocketAddr,
1219 ) -> Poll<io::Result<usize>> {
1220 self.io
1221 .registration()
1222 .poll_write_io(cx, || self.io.send_to(buf, target))
1223 }
1224
1225 /// Tries to send data on the socket to the given address, but if the send is
1226 /// blocked this will return right away.
1227 ///
1228 /// This function is usually paired with `writable()`.
1229 ///
1230 /// # Returns
1231 ///
1232 /// If successful, returns the number of bytes sent
1233 ///
1234 /// Users should ensure that when the remote cannot receive, the
1235 /// [`ErrorKind::WouldBlock`] is properly handled. An error can also occur
1236 /// if the IP version of the socket does not match that of `target`.
1237 ///
1238 /// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock
1239 ///
1240 /// # Example
1241 ///
1242 /// ```no_run
1243 /// use tokio::net::UdpSocket;
1244 /// use std::error::Error;
1245 /// use std::io;
1246 ///
1247 /// #[tokio::main]
1248 /// async fn main() -> Result<(), Box<dyn Error>> {
1249 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
1250 ///
1251 /// let dst = "127.0.0.1:8081".parse()?;
1252 ///
1253 /// loop {
1254 /// socket.writable().await?;
1255 ///
1256 /// match socket.try_send_to(&b"hello world"[..], dst) {
1257 /// Ok(sent) => {
1258 /// println!("sent {} bytes", sent);
1259 /// break;
1260 /// }
1261 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1262 /// // Writable false positive.
1263 /// continue;
1264 /// }
1265 /// Err(e) => return Err(e.into()),
1266 /// }
1267 /// }
1268 ///
1269 /// Ok(())
1270 /// }
1271 /// ```
1272 pub fn try_send_to(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize> {
1273 self.io
1274 .registration()
1275 .try_io(Interest::WRITABLE, || self.io.send_to(buf, target))
1276 }
1277
1278 async fn send_to_addr(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize> {
1279 self.io
1280 .registration()
1281 .async_io(Interest::WRITABLE, || self.io.send_to(buf, target))
1282 .await
1283 }
1284
1285 /// Receives a single datagram message on the socket. On success, returns
1286 /// the number of bytes read and the origin.
1287 ///
1288 /// The function must be called with valid byte array `buf` of sufficient
1289 /// size to hold the message bytes. If a message is too long to fit in the
1290 /// supplied buffer, excess bytes may be discarded.
1291 ///
1292 /// # Cancel safety
1293 ///
1294 /// This method is cancel safe. If `recv_from` is used as the event in a
1295 /// [`tokio::select!`](crate::select) statement and some other branch
1296 /// completes first, it is guaranteed that no messages were received on this
1297 /// socket.
1298 ///
1299 /// # Example
1300 ///
1301 /// ```no_run
1302 /// use tokio::net::UdpSocket;
1303 /// use std::io;
1304 ///
1305 /// #[tokio::main]
1306 /// async fn main() -> io::Result<()> {
1307 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
1308 ///
1309 /// let mut buf = vec![0u8; 32];
1310 /// let (len, addr) = socket.recv_from(&mut buf).await?;
1311 ///
1312 /// println!("received {:?} bytes from {:?}", len, addr);
1313 ///
1314 /// Ok(())
1315 /// }
1316 /// ```
1317 ///
1318 /// # Notes
1319 /// Note that the socket address **cannot** be implicitly trusted, because it is relatively
1320 /// trivial to send a UDP datagram with a spoofed origin in a [packet injection attack].
1321 /// Because UDP is stateless and does not validate the origin of a packet,
1322 /// the attacker does not need to be able to intercept traffic in order to interfere.
1323 /// It is important to be aware of this when designing your application-level protocol.
1324 ///
1325 /// [packet injection attack]: https://en.wikipedia.org/wiki/Packet_injection
1326 pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1327 self.io
1328 .registration()
1329 .async_io(Interest::READABLE | Interest::ERROR, || {
1330 self.io.recv_from(buf)
1331 })
1332 .await
1333 }
1334
1335 /// Attempts to receive a single datagram on the socket.
1336 ///
1337 /// Note that on multiple calls to a `poll_*` method in the `recv` direction, only the
1338 /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1339 /// receive a wakeup.
1340 ///
1341 /// # Return value
1342 ///
1343 /// The function returns:
1344 ///
1345 /// * `Poll::Pending` if the socket is not ready to read
1346 /// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready
1347 /// * `Poll::Ready(Err(e))` if an error is encountered.
1348 ///
1349 /// # Errors
1350 ///
1351 /// This function may encounter any standard I/O error except `WouldBlock`.
1352 ///
1353 /// # Notes
1354 /// Note that the socket address **cannot** be implicitly trusted, because it is relatively
1355 /// trivial to send a UDP datagram with a spoofed origin in a [packet injection attack].
1356 /// Because UDP is stateless and does not validate the origin of a packet,
1357 /// the attacker does not need to be able to intercept traffic in order to interfere.
1358 /// It is important to be aware of this when designing your application-level protocol.
1359 ///
1360 /// [packet injection attack]: https://en.wikipedia.org/wiki/Packet_injection
1361 pub fn poll_recv_from(
1362 &self,
1363 cx: &mut Context<'_>,
1364 buf: &mut ReadBuf<'_>,
1365 ) -> Poll<io::Result<SocketAddr>> {
1366 #[allow(clippy::blocks_in_conditions)]
1367 let (n, addr) = ready!(self.io.registration().poll_read_io(cx, || {
1368 // Safety: will not read the maybe uninitialized bytes.
1369 let b = unsafe {
1370 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
1371 };
1372
1373 self.io.recv_from(b)
1374 }))?;
1375
1376 // Safety: We trust `recv` to have filled up `n` bytes in the buffer.
1377 unsafe {
1378 buf.assume_init(n);
1379 }
1380 buf.advance(n);
1381 Poll::Ready(Ok(addr))
1382 }
1383
1384 /// Tries to receive a single datagram message on the socket. On success,
1385 /// returns the number of bytes read and the origin.
1386 ///
1387 /// This method must be called with valid byte array `buf` of sufficient size
1388 /// to hold the message bytes. If a message is too long to fit in the
1389 /// supplied buffer, excess bytes may be discarded.
1390 ///
1391 /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is
1392 /// returned. This function is usually paired with `readable()`.
1393 ///
1394 /// # Notes
1395 ///
1396 /// Note that the socket address **cannot** be implicitly trusted, because it is relatively
1397 /// trivial to send a UDP datagram with a spoofed origin in a [packet injection attack].
1398 /// Because UDP is stateless and does not validate the origin of a packet,
1399 /// the attacker does not need to be able to intercept traffic in order to interfere.
1400 /// It is important to be aware of this when designing your application-level protocol.
1401 ///
1402 /// [packet injection attack]: https://en.wikipedia.org/wiki/Packet_injection
1403 ///
1404 /// # Examples
1405 ///
1406 /// ```no_run
1407 /// use tokio::net::UdpSocket;
1408 /// use std::io;
1409 ///
1410 /// #[tokio::main]
1411 /// async fn main() -> io::Result<()> {
1412 /// // Connect to a peer
1413 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
1414 ///
1415 /// loop {
1416 /// // Wait for the socket to be readable
1417 /// socket.readable().await?;
1418 ///
1419 /// // The buffer is **not** included in the async task and will
1420 /// // only exist on the stack.
1421 /// let mut buf = [0; 1024];
1422 ///
1423 /// // Try to recv data, this may still fail with `WouldBlock`
1424 /// // if the readiness event is a false positive.
1425 /// match socket.try_recv_from(&mut buf) {
1426 /// Ok((n, _addr)) => {
1427 /// println!("GOT {:?}", &buf[..n]);
1428 /// break;
1429 /// }
1430 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1431 /// continue;
1432 /// }
1433 /// Err(e) => {
1434 /// return Err(e);
1435 /// }
1436 /// }
1437 /// }
1438 ///
1439 /// Ok(())
1440 /// }
1441 /// ```
1442 pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1443 self.io
1444 .registration()
1445 .try_io(Interest::READABLE, || self.io.recv_from(buf))
1446 }
1447
1448 /// Tries to read or write from the socket using a user-provided IO operation.
1449 ///
1450 /// If the socket is ready, the provided closure is called. The closure
1451 /// should attempt to perform IO operation on the socket by manually
1452 /// calling the appropriate syscall. If the operation fails because the
1453 /// socket is not actually ready, then the closure should return a
1454 /// `WouldBlock` error and the readiness flag is cleared. The return value
1455 /// of the closure is then returned by `try_io`.
1456 ///
1457 /// If the socket is not ready, then the closure is not called
1458 /// and a `WouldBlock` error is returned.
1459 ///
1460 /// The closure should only return a `WouldBlock` error if it has performed
1461 /// an IO operation on the socket that failed due to the socket not being
1462 /// ready. Returning a `WouldBlock` error in any other situation will
1463 /// incorrectly clear the readiness flag, which can cause the socket to
1464 /// behave incorrectly.
1465 ///
1466 /// The closure should not perform the IO operation using any of the methods
1467 /// defined on the Tokio `UdpSocket` type, as this will mess with the
1468 /// readiness flag and can cause the socket to behave incorrectly.
1469 ///
1470 /// This method is not intended to be used with combined interests.
1471 /// The closure should perform only one type of IO operation, so it should not
1472 /// require more than one ready state. This method may panic or sleep forever
1473 /// if it is called with a combined interest.
1474 ///
1475 /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
1476 ///
1477 /// [`readable()`]: UdpSocket::readable()
1478 /// [`writable()`]: UdpSocket::writable()
1479 /// [`ready()`]: UdpSocket::ready()
1480 pub fn try_io<R>(
1481 &self,
1482 interest: Interest,
1483 f: impl FnOnce() -> io::Result<R>,
1484 ) -> io::Result<R> {
1485 self.io
1486 .registration()
1487 .try_io(interest, || self.io.try_io(f))
1488 }
1489
1490 /// Reads or writes from the socket using a user-provided IO operation.
1491 ///
1492 /// The readiness of the socket is awaited and when the socket is ready,
1493 /// the provided closure is called. The closure should attempt to perform
1494 /// IO operation on the socket by manually calling the appropriate syscall.
1495 /// If the operation fails because the socket is not actually ready,
1496 /// then the closure should return a `WouldBlock` error. In such case the
1497 /// readiness flag is cleared and the socket readiness is awaited again.
1498 /// This loop is repeated until the closure returns an `Ok` or an error
1499 /// other than `WouldBlock`.
1500 ///
1501 /// The closure should only return a `WouldBlock` error if it has performed
1502 /// an IO operation on the socket that failed due to the socket not being
1503 /// ready. Returning a `WouldBlock` error in any other situation will
1504 /// incorrectly clear the readiness flag, which can cause the socket to
1505 /// behave incorrectly.
1506 ///
1507 /// The closure should not perform the IO operation using any of the methods
1508 /// defined on the Tokio `UdpSocket` type, as this will mess with the
1509 /// readiness flag and can cause the socket to behave incorrectly.
1510 ///
1511 /// This method is not intended to be used with combined interests.
1512 /// The closure should perform only one type of IO operation, so it should not
1513 /// require more than one ready state. This method may panic or sleep forever
1514 /// if it is called with a combined interest.
1515 pub async fn async_io<R>(
1516 &self,
1517 interest: Interest,
1518 mut f: impl FnMut() -> io::Result<R>,
1519 ) -> io::Result<R> {
1520 self.io
1521 .registration()
1522 .async_io(interest, || self.io.try_io(&mut f))
1523 .await
1524 }
1525
1526 /// Receives a single datagram from the connected address without removing it from the queue.
1527 /// On success, returns the number of bytes read from whence the data came.
1528 ///
1529 /// # Notes
1530 ///
1531 /// On Windows, if the data is larger than the buffer specified, the buffer
1532 /// is filled with the first part of the data, and peek returns the error
1533 /// `WSAEMSGSIZE(10040)`. The excess data is lost.
1534 /// Make sure to always use a sufficiently large buffer to hold the
1535 /// maximum UDP packet size, which can be up to 65536 bytes in size.
1536 ///
1537 /// MacOS will return an error if you pass a zero-sized buffer.
1538 ///
1539 /// If you're merely interested in learning the sender of the data at the head of the queue,
1540 /// try [`peek_sender`].
1541 ///
1542 /// Note that the socket address **cannot** be implicitly trusted, because it is relatively
1543 /// trivial to send a UDP datagram with a spoofed origin in a [packet injection attack].
1544 /// Because UDP is stateless and does not validate the origin of a packet,
1545 /// the attacker does not need to be able to intercept traffic in order to interfere.
1546 /// It is important to be aware of this when designing your application-level protocol.
1547 ///
1548 /// # Examples
1549 ///
1550 /// ```no_run
1551 /// use tokio::net::UdpSocket;
1552 /// use std::io;
1553 ///
1554 /// #[tokio::main]
1555 /// async fn main() -> io::Result<()> {
1556 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
1557 ///
1558 /// let mut buf = vec![0u8; 32];
1559 /// let len = socket.peek(&mut buf).await?;
1560 ///
1561 /// println!("peeked {:?} bytes", len);
1562 ///
1563 /// Ok(())
1564 /// }
1565 /// ```
1566 ///
1567 /// [`peek_sender`]: method@Self::peek_sender
1568 /// [packet injection attack]: https://en.wikipedia.org/wiki/Packet_injection
1569 pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
1570 self.io
1571 .registration()
1572 .async_io(Interest::READABLE | Interest::ERROR, || self.io.peek(buf))
1573 .await
1574 }
1575
1576 /// Receives data from the connected address, without removing it from the input queue.
1577 ///
1578 /// # Notes
1579 ///
1580 /// Note that on multiple calls to a `poll_*` method in the `recv` direction, only the
1581 /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1582 /// receive a wakeup
1583 ///
1584 /// On Windows, if the data is larger than the buffer specified, the buffer
1585 /// is filled with the first part of the data, and peek returns the error
1586 /// `WSAEMSGSIZE(10040)`. The excess data is lost.
1587 /// Make sure to always use a sufficiently large buffer to hold the
1588 /// maximum UDP packet size, which can be up to 65536 bytes in size.
1589 ///
1590 /// MacOS will return an error if you pass a zero-sized buffer.
1591 ///
1592 /// If you're merely interested in learning the sender of the data at the head of the queue,
1593 /// try [`poll_peek_sender`].
1594 ///
1595 /// Note that the socket address **cannot** be implicitly trusted, because it is relatively
1596 /// trivial to send a UDP datagram with a spoofed origin in a [packet injection attack].
1597 /// Because UDP is stateless and does not validate the origin of a packet,
1598 /// the attacker does not need to be able to intercept traffic in order to interfere.
1599 /// It is important to be aware of this when designing your application-level protocol.
1600 ///
1601 /// # Return value
1602 ///
1603 /// The function returns:
1604 ///
1605 /// * `Poll::Pending` if the socket is not ready to read
1606 /// * `Poll::Ready(Ok(()))` reads data into `ReadBuf` if the socket is ready
1607 /// * `Poll::Ready(Err(e))` if an error is encountered.
1608 ///
1609 /// # Errors
1610 ///
1611 /// This function may encounter any standard I/O error except `WouldBlock`.
1612 ///
1613 /// [`poll_peek_sender`]: method@Self::poll_peek_sender
1614 /// [packet injection attack]: https://en.wikipedia.org/wiki/Packet_injection
1615 pub fn poll_peek(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
1616 #[allow(clippy::blocks_in_conditions)]
1617 let n = ready!(self.io.registration().poll_read_io(cx, || {
1618 // Safety: will not read the maybe uninitialized bytes.
1619 let b = unsafe {
1620 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
1621 };
1622
1623 self.io.peek(b)
1624 }))?;
1625
1626 // Safety: We trust `peek` to have filled up `n` bytes in the buffer.
1627 unsafe {
1628 buf.assume_init(n);
1629 }
1630 buf.advance(n);
1631 Poll::Ready(Ok(()))
1632 }
1633
1634 /// Tries to receive data on the connected address without removing it from the input queue.
1635 /// On success, returns the number of bytes read.
1636 ///
1637 /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is
1638 /// returned. This function is usually paired with `readable()`.
1639 ///
1640 /// # Notes
1641 ///
1642 /// On Windows, if the data is larger than the buffer specified, the buffer
1643 /// is filled with the first part of the data, and peek returns the error
1644 /// `WSAEMSGSIZE(10040)`. The excess data is lost.
1645 /// Make sure to always use a sufficiently large buffer to hold the
1646 /// maximum UDP packet size, which can be up to 65536 bytes in size.
1647 ///
1648 /// MacOS will return an error if you pass a zero-sized buffer.
1649 ///
1650 /// If you're merely interested in learning the sender of the data at the head of the queue,
1651 /// try [`try_peek_sender`].
1652 ///
1653 /// Note that the socket address **cannot** be implicitly trusted, because it is relatively
1654 /// trivial to send a UDP datagram with a spoofed origin in a [packet injection attack].
1655 /// Because UDP is stateless and does not validate the origin of a packet,
1656 /// the attacker does not need to be able to intercept traffic in order to interfere.
1657 /// It is important to be aware of this when designing your application-level protocol.
1658 ///
1659 /// [`try_peek_sender`]: method@Self::try_peek_sender
1660 /// [packet injection attack]: https://en.wikipedia.org/wiki/Packet_injection
1661 pub fn try_peek(&self, buf: &mut [u8]) -> io::Result<usize> {
1662 self.io
1663 .registration()
1664 .try_io(Interest::READABLE, || self.io.peek(buf))
1665 }
1666
1667 /// Receives data from the socket, without removing it from the input queue.
1668 /// On success, returns the number of bytes read and the address from whence
1669 /// the data came.
1670 ///
1671 /// # Notes
1672 ///
1673 /// On Windows, if the data is larger than the buffer specified, the buffer
1674 /// is filled with the first part of the data, and `peek_from` returns the error
1675 /// `WSAEMSGSIZE(10040)`. The excess data is lost.
1676 /// Make sure to always use a sufficiently large buffer to hold the
1677 /// maximum UDP packet size, which can be up to 65536 bytes in size.
1678 ///
1679 /// MacOS will return an error if you pass a zero-sized buffer.
1680 ///
1681 /// If you're merely interested in learning the sender of the data at the head of the queue,
1682 /// try [`peek_sender`].
1683 ///
1684 /// Note that the socket address **cannot** be implicitly trusted, because it is relatively
1685 /// trivial to send a UDP datagram with a spoofed origin in a [packet injection attack].
1686 /// Because UDP is stateless and does not validate the origin of a packet,
1687 /// the attacker does not need to be able to intercept traffic in order to interfere.
1688 /// It is important to be aware of this when designing your application-level protocol.
1689 ///
1690 /// # Examples
1691 ///
1692 /// ```no_run
1693 /// use tokio::net::UdpSocket;
1694 /// use std::io;
1695 ///
1696 /// #[tokio::main]
1697 /// async fn main() -> io::Result<()> {
1698 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
1699 ///
1700 /// let mut buf = vec![0u8; 32];
1701 /// let (len, addr) = socket.peek_from(&mut buf).await?;
1702 ///
1703 /// println!("peeked {:?} bytes from {:?}", len, addr);
1704 ///
1705 /// Ok(())
1706 /// }
1707 /// ```
1708 ///
1709 /// [`peek_sender`]: method@Self::peek_sender
1710 /// [packet injection attack]: https://en.wikipedia.org/wiki/Packet_injection
1711 pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1712 self.io
1713 .registration()
1714 .async_io(Interest::READABLE | Interest::ERROR, || {
1715 self.io.peek_from(buf)
1716 })
1717 .await
1718 }
1719
1720 /// Receives data from the socket, without removing it from the input queue.
1721 /// On success, returns the sending address of the datagram.
1722 ///
1723 /// # Notes
1724 ///
1725 /// Note that on multiple calls to a `poll_*` method in the `recv` direction, only the
1726 /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1727 /// receive a wakeup
1728 ///
1729 /// On Windows, if the data is larger than the buffer specified, the buffer
1730 /// is filled with the first part of the data, and peek returns the error
1731 /// `WSAEMSGSIZE(10040)`. The excess data is lost.
1732 /// Make sure to always use a sufficiently large buffer to hold the
1733 /// maximum UDP packet size, which can be up to 65536 bytes in size.
1734 ///
1735 /// MacOS will return an error if you pass a zero-sized buffer.
1736 ///
1737 /// If you're merely interested in learning the sender of the data at the head of the queue,
1738 /// try [`poll_peek_sender`].
1739 ///
1740 /// Note that the socket address **cannot** be implicitly trusted, because it is relatively
1741 /// trivial to send a UDP datagram with a spoofed origin in a [packet injection attack].
1742 /// Because UDP is stateless and does not validate the origin of a packet,
1743 /// the attacker does not need to be able to intercept traffic in order to interfere.
1744 /// It is important to be aware of this when designing your application-level protocol.
1745 ///
1746 /// # Return value
1747 ///
1748 /// The function returns:
1749 ///
1750 /// * `Poll::Pending` if the socket is not ready to read
1751 /// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready
1752 /// * `Poll::Ready(Err(e))` if an error is encountered.
1753 ///
1754 /// # Errors
1755 ///
1756 /// This function may encounter any standard I/O error except `WouldBlock`.
1757 ///
1758 /// [`poll_peek_sender`]: method@Self::poll_peek_sender
1759 /// [packet injection attack]: https://en.wikipedia.org/wiki/Packet_injection
1760 pub fn poll_peek_from(
1761 &self,
1762 cx: &mut Context<'_>,
1763 buf: &mut ReadBuf<'_>,
1764 ) -> Poll<io::Result<SocketAddr>> {
1765 #[allow(clippy::blocks_in_conditions)]
1766 let (n, addr) = ready!(self.io.registration().poll_read_io(cx, || {
1767 // Safety: will not read the maybe uninitialized bytes.
1768 let b = unsafe {
1769 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
1770 };
1771
1772 self.io.peek_from(b)
1773 }))?;
1774
1775 // Safety: We trust `recv` to have filled up `n` bytes in the buffer.
1776 unsafe {
1777 buf.assume_init(n);
1778 }
1779 buf.advance(n);
1780 Poll::Ready(Ok(addr))
1781 }
1782
1783 /// Tries to receive data on the socket without removing it from the input queue.
1784 /// On success, returns the number of bytes read and the sending address of the
1785 /// datagram.
1786 ///
1787 /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is
1788 /// returned. This function is usually paired with `readable()`.
1789 ///
1790 /// # Notes
1791 ///
1792 /// On Windows, if the data is larger than the buffer specified, the buffer
1793 /// is filled with the first part of the data, and peek returns the error
1794 /// `WSAEMSGSIZE(10040)`. The excess data is lost.
1795 /// Make sure to always use a sufficiently large buffer to hold the
1796 /// maximum UDP packet size, which can be up to 65536 bytes in size.
1797 ///
1798 /// MacOS will return an error if you pass a zero-sized buffer.
1799 ///
1800 /// If you're merely interested in learning the sender of the data at the head of the queue,
1801 /// try [`try_peek_sender`].
1802 ///
1803 /// Note that the socket address **cannot** be implicitly trusted, because it is relatively
1804 /// trivial to send a UDP datagram with a spoofed origin in a [packet injection attack].
1805 /// Because UDP is stateless and does not validate the origin of a packet,
1806 /// the attacker does not need to be able to intercept traffic in order to interfere.
1807 /// It is important to be aware of this when designing your application-level protocol.
1808 ///
1809 /// [`try_peek_sender`]: method@Self::try_peek_sender
1810 /// [packet injection attack]: https://en.wikipedia.org/wiki/Packet_injection
1811 pub fn try_peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1812 self.io
1813 .registration()
1814 .try_io(Interest::READABLE, || self.io.peek_from(buf))
1815 }
1816
1817 /// Retrieve the sender of the data at the head of the input queue, waiting if empty.
1818 ///
1819 /// This is equivalent to calling [`peek_from`] with a zero-sized buffer,
1820 /// but suppresses the `WSAEMSGSIZE` error on Windows and the "invalid argument" error on macOS.
1821 ///
1822 /// Note that the socket address **cannot** be implicitly trusted, because it is relatively
1823 /// trivial to send a UDP datagram with a spoofed origin in a [packet injection attack].
1824 /// Because UDP is stateless and does not validate the origin of a packet,
1825 /// the attacker does not need to be able to intercept traffic in order to interfere.
1826 /// It is important to be aware of this when designing your application-level protocol.
1827 ///
1828 /// [`peek_from`]: method@Self::peek_from
1829 /// [packet injection attack]: https://en.wikipedia.org/wiki/Packet_injection
1830 pub async fn peek_sender(&self) -> io::Result<SocketAddr> {
1831 self.io
1832 .registration()
1833 .async_io(Interest::READABLE | Interest::ERROR, || {
1834 self.peek_sender_inner()
1835 })
1836 .await
1837 }
1838
1839 /// Retrieve the sender of the data at the head of the input queue,
1840 /// scheduling a wakeup if empty.
1841 ///
1842 /// This is equivalent to calling [`poll_peek_from`] with a zero-sized buffer,
1843 /// but suppresses the `WSAEMSGSIZE` error on Windows and the "invalid argument" error on macOS.
1844 ///
1845 /// # Notes
1846 ///
1847 /// Note that on multiple calls to a `poll_*` method in the `recv` direction, only the
1848 /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1849 /// receive a wakeup.
1850 ///
1851 /// Note that the socket address **cannot** be implicitly trusted, because it is relatively
1852 /// trivial to send a UDP datagram with a spoofed origin in a [packet injection attack].
1853 /// Because UDP is stateless and does not validate the origin of a packet,
1854 /// the attacker does not need to be able to intercept traffic in order to interfere.
1855 /// It is important to be aware of this when designing your application-level protocol.
1856 ///
1857 /// [`poll_peek_from`]: method@Self::poll_peek_from
1858 /// [packet injection attack]: https://en.wikipedia.org/wiki/Packet_injection
1859 pub fn poll_peek_sender(&self, cx: &mut Context<'_>) -> Poll<io::Result<SocketAddr>> {
1860 self.io
1861 .registration()
1862 .poll_read_io(cx, || self.peek_sender_inner())
1863 }
1864
1865 /// Try to retrieve the sender of the data at the head of the input queue.
1866 ///
1867 /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is
1868 /// returned. This function is usually paired with `readable()`.
1869 ///
1870 /// Note that the socket address **cannot** be implicitly trusted, because it is relatively
1871 /// trivial to send a UDP datagram with a spoofed origin in a [packet injection attack].
1872 /// Because UDP is stateless and does not validate the origin of a packet,
1873 /// the attacker does not need to be able to intercept traffic in order to interfere.
1874 /// It is important to be aware of this when designing your application-level protocol.
1875 ///
1876 /// [packet injection attack]: https://en.wikipedia.org/wiki/Packet_injection
1877 pub fn try_peek_sender(&self) -> io::Result<SocketAddr> {
1878 self.io
1879 .registration()
1880 .try_io(Interest::READABLE, || self.peek_sender_inner())
1881 }
1882
1883 #[inline]
1884 fn peek_sender_inner(&self) -> io::Result<SocketAddr> {
1885 self.io.try_io(|| {
1886 self.as_socket()
1887 .peek_sender()?
1888 // May be `None` if the platform doesn't populate the sender for some reason.
1889 // In testing, that only occurred on macOS if you pass a zero-sized buffer,
1890 // but the implementation of `Socket::peek_sender()` covers that.
1891 .as_socket()
1892 .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "sender not available"))
1893 })
1894 }
1895
1896 /// Gets the value of the `SO_BROADCAST` option for this socket.
1897 ///
1898 /// For more information about this option, see [`set_broadcast`].
1899 ///
1900 /// [`set_broadcast`]: method@Self::set_broadcast
1901 pub fn broadcast(&self) -> io::Result<bool> {
1902 self.io.broadcast()
1903 }
1904
1905 /// Sets the value of the `SO_BROADCAST` option for this socket.
1906 ///
1907 /// When enabled, this socket is allowed to send packets to a broadcast
1908 /// address.
1909 pub fn set_broadcast(&self, on: bool) -> io::Result<()> {
1910 self.io.set_broadcast(on)
1911 }
1912
1913 /// Gets the value of the `IP_MULTICAST_LOOP` option for this socket.
1914 ///
1915 /// For more information about this option, see [`set_multicast_loop_v4`].
1916 ///
1917 /// [`set_multicast_loop_v4`]: method@Self::set_multicast_loop_v4
1918 pub fn multicast_loop_v4(&self) -> io::Result<bool> {
1919 self.io.multicast_loop_v4()
1920 }
1921
1922 /// Sets the value of the `IP_MULTICAST_LOOP` option for this socket.
1923 ///
1924 /// If enabled, multicast packets will be looped back to the local socket.
1925 ///
1926 /// # Note
1927 ///
1928 /// This may not have any effect on IPv6 sockets.
1929 pub fn set_multicast_loop_v4(&self, on: bool) -> io::Result<()> {
1930 self.io.set_multicast_loop_v4(on)
1931 }
1932
1933 /// Gets the value of the `IP_MULTICAST_TTL` option for this socket.
1934 ///
1935 /// For more information about this option, see [`set_multicast_ttl_v4`].
1936 ///
1937 /// [`set_multicast_ttl_v4`]: method@Self::set_multicast_ttl_v4
1938 pub fn multicast_ttl_v4(&self) -> io::Result<u32> {
1939 self.io.multicast_ttl_v4()
1940 }
1941
1942 /// Sets the value of the `IP_MULTICAST_TTL` option for this socket.
1943 ///
1944 /// Indicates the time-to-live value of outgoing multicast packets for
1945 /// this socket. The default value is 1 which means that multicast packets
1946 /// don't leave the local network unless explicitly requested.
1947 ///
1948 /// # Note
1949 ///
1950 /// This may not have any effect on IPv6 sockets.
1951 pub fn set_multicast_ttl_v4(&self, ttl: u32) -> io::Result<()> {
1952 self.io.set_multicast_ttl_v4(ttl)
1953 }
1954
1955 /// Gets the value of the `IPV6_MULTICAST_LOOP` option for this socket.
1956 ///
1957 /// For more information about this option, see [`set_multicast_loop_v6`].
1958 ///
1959 /// [`set_multicast_loop_v6`]: method@Self::set_multicast_loop_v6
1960 pub fn multicast_loop_v6(&self) -> io::Result<bool> {
1961 self.io.multicast_loop_v6()
1962 }
1963
1964 /// Sets the value of the `IPV6_MULTICAST_LOOP` option for this socket.
1965 ///
1966 /// Controls whether this socket sees the multicast packets it sends itself.
1967 ///
1968 /// # Note
1969 ///
1970 /// This may not have any effect on IPv4 sockets.
1971 pub fn set_multicast_loop_v6(&self, on: bool) -> io::Result<()> {
1972 self.io.set_multicast_loop_v6(on)
1973 }
1974
1975 /// Gets the value of the `IPV6_TCLASS` option for this socket.
1976 ///
1977 /// For more information about this option, see [`set_tclass_v6`].
1978 ///
1979 /// [`set_tclass_v6`]: Self::set_tclass_v6
1980 // https://docs.rs/socket2/0.6.1/src/socket2/sys/unix.rs.html#2541
1981 #[cfg(any(
1982 target_os = "android",
1983 target_os = "dragonfly",
1984 target_os = "freebsd",
1985 target_os = "fuchsia",
1986 target_os = "linux",
1987 target_os = "macos",
1988 target_os = "netbsd",
1989 target_os = "openbsd",
1990 target_os = "cygwin",
1991 ))]
1992 #[cfg_attr(
1993 docsrs,
1994 doc(cfg(any(
1995 target_os = "android",
1996 target_os = "dragonfly",
1997 target_os = "freebsd",
1998 target_os = "fuchsia",
1999 target_os = "linux",
2000 target_os = "macos",
2001 target_os = "netbsd",
2002 target_os = "openbsd",
2003 target_os = "cygwin",
2004 )))
2005 )]
2006 pub fn tclass_v6(&self) -> io::Result<u32> {
2007 self.as_socket().tclass_v6()
2008 }
2009
2010 /// Sets the value for the `IPV6_TCLASS` option on this socket.
2011 ///
2012 /// Specifies the traffic class field that is used in every packet
2013 /// sent from this socket.
2014 ///
2015 /// # Note
2016 ///
2017 /// This may not have any effect on IPv4 sockets.
2018 // https://docs.rs/socket2/0.6.1/src/socket2/sys/unix.rs.html#2566
2019 #[cfg(any(
2020 target_os = "android",
2021 target_os = "dragonfly",
2022 target_os = "freebsd",
2023 target_os = "fuchsia",
2024 target_os = "linux",
2025 target_os = "macos",
2026 target_os = "netbsd",
2027 target_os = "openbsd",
2028 target_os = "cygwin",
2029 ))]
2030 #[cfg_attr(
2031 docsrs,
2032 doc(cfg(any(
2033 target_os = "android",
2034 target_os = "dragonfly",
2035 target_os = "freebsd",
2036 target_os = "fuchsia",
2037 target_os = "linux",
2038 target_os = "macos",
2039 target_os = "netbsd",
2040 target_os = "openbsd",
2041 target_os = "cygwin",
2042 )))
2043 )]
2044 pub fn set_tclass_v6(&self, tclass: u32) -> io::Result<()> {
2045 self.as_socket().set_tclass_v6(tclass)
2046 }
2047
2048 /// Gets the value of the `IP_TTL` option for this socket.
2049 ///
2050 /// For more information about this option, see [`set_ttl`].
2051 ///
2052 /// [`set_ttl`]: method@Self::set_ttl
2053 ///
2054 /// # Examples
2055 ///
2056 /// ```no_run
2057 /// use tokio::net::UdpSocket;
2058 /// # use std::io;
2059 ///
2060 /// # async fn dox() -> io::Result<()> {
2061 /// let sock = UdpSocket::bind("127.0.0.1:8080").await?;
2062 ///
2063 /// println!("{:?}", sock.ttl()?);
2064 /// # Ok(())
2065 /// # }
2066 /// ```
2067 pub fn ttl(&self) -> io::Result<u32> {
2068 self.io.ttl()
2069 }
2070
2071 /// Sets the value for the `IP_TTL` option on this socket.
2072 ///
2073 /// This value sets the time-to-live field that is used in every packet sent
2074 /// from this socket.
2075 ///
2076 /// # Examples
2077 ///
2078 /// ```no_run
2079 /// use tokio::net::UdpSocket;
2080 /// # use std::io;
2081 ///
2082 /// # async fn dox() -> io::Result<()> {
2083 /// let sock = UdpSocket::bind("127.0.0.1:8080").await?;
2084 /// sock.set_ttl(60)?;
2085 ///
2086 /// # Ok(())
2087 /// # }
2088 /// ```
2089 pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
2090 self.io.set_ttl(ttl)
2091 }
2092
2093 /// Gets the value of the `IP_TOS` option for this socket.
2094 ///
2095 /// For more information about this option, see [`set_tos_v4`].
2096 ///
2097 /// [`set_tos_v4`]: Self::set_tos_v4
2098 // https://docs.rs/socket2/0.6.1/src/socket2/socket.rs.html#1585
2099 #[cfg(not(any(
2100 target_os = "fuchsia",
2101 target_os = "redox",
2102 target_os = "solaris",
2103 target_os = "illumos",
2104 target_os = "haiku",
2105 target_os = "wasi",
2106 )))]
2107 #[cfg_attr(
2108 docsrs,
2109 doc(cfg(not(any(
2110 target_os = "fuchsia",
2111 target_os = "redox",
2112 target_os = "solaris",
2113 target_os = "illumos",
2114 target_os = "haiku",
2115 target_os = "wasi",
2116 ))))
2117 )]
2118 pub fn tos_v4(&self) -> io::Result<u32> {
2119 self.as_socket().tos_v4()
2120 }
2121
2122 /// Deprecated. Use [`tos_v4()`] instead.
2123 ///
2124 /// [`tos_v4()`]: Self::tos_v4
2125 #[deprecated(
2126 note = "`tos` related methods have been renamed `tos_v4` since they are IPv4-specific."
2127 )]
2128 #[doc(hidden)]
2129 #[cfg(not(any(
2130 target_os = "fuchsia",
2131 target_os = "redox",
2132 target_os = "solaris",
2133 target_os = "illumos",
2134 target_os = "haiku",
2135 target_os = "wasi",
2136 )))]
2137 #[cfg_attr(
2138 docsrs,
2139 doc(cfg(not(any(
2140 target_os = "fuchsia",
2141 target_os = "redox",
2142 target_os = "solaris",
2143 target_os = "illumos",
2144 target_os = "haiku",
2145 target_os = "wasi",
2146 ))))
2147 )]
2148 pub fn tos(&self) -> io::Result<u32> {
2149 self.tos_v4()
2150 }
2151
2152 /// Sets the value for the `IP_TOS` option on this socket.
2153 ///
2154 /// This value sets the type-of-service field that is used in every packet
2155 /// sent from this socket.
2156 ///
2157 /// # Note
2158 ///
2159 /// - This may not have any effect on IPv6 sockets.
2160 /// - On Windows, `IP_TOS` is only supported on [Windows 8+ or
2161 /// Windows Server 2012+.](https://docs.microsoft.com/en-us/windows/win32/winsock/ipproto-ip-socket-options)
2162 // https://docs.rs/socket2/0.6.1/src/socket2/socket.rs.html#1566
2163 #[cfg(not(any(
2164 target_os = "fuchsia",
2165 target_os = "redox",
2166 target_os = "solaris",
2167 target_os = "illumos",
2168 target_os = "haiku",
2169 target_os = "wasi",
2170 )))]
2171 #[cfg_attr(
2172 docsrs,
2173 doc(cfg(not(any(
2174 target_os = "fuchsia",
2175 target_os = "redox",
2176 target_os = "solaris",
2177 target_os = "illumos",
2178 target_os = "haiku",
2179 target_os = "wasi",
2180 ))))
2181 )]
2182 pub fn set_tos_v4(&self, tos: u32) -> io::Result<()> {
2183 self.as_socket().set_tos_v4(tos)
2184 }
2185
2186 /// Deprecated. Use [`set_tos_v4()`] instead.
2187 ///
2188 /// [`set_tos_v4()`]: Self::set_tos_v4
2189 #[deprecated(
2190 note = "`tos` related methods have been renamed `tos_v4` since they are IPv4-specific."
2191 )]
2192 #[doc(hidden)]
2193 #[cfg(not(any(
2194 target_os = "fuchsia",
2195 target_os = "redox",
2196 target_os = "solaris",
2197 target_os = "illumos",
2198 target_os = "haiku",
2199 target_os = "wasi",
2200 )))]
2201 #[cfg_attr(
2202 docsrs,
2203 doc(cfg(not(any(
2204 target_os = "fuchsia",
2205 target_os = "redox",
2206 target_os = "solaris",
2207 target_os = "illumos",
2208 target_os = "haiku",
2209 target_os = "wasi",
2210 ))))
2211 )]
2212 pub fn set_tos(&self, tos: u32) -> io::Result<()> {
2213 self.set_tos_v4(tos)
2214 }
2215
2216 /// Gets the value for the `SO_BINDTODEVICE` option on this socket
2217 ///
2218 /// This value gets the socket-bound device's interface name.
2219 #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux",))]
2220 #[cfg_attr(
2221 docsrs,
2222 doc(cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux",)))
2223 )]
2224 pub fn device(&self) -> io::Result<Option<Vec<u8>>> {
2225 self.as_socket().device()
2226 }
2227
2228 /// Sets the value for the `SO_BINDTODEVICE` option on this socket
2229 ///
2230 /// If a socket is bound to an interface, only packets received from that
2231 /// particular interface are processed by the socket. Note that this only
2232 /// works for some socket types, particularly `AF_INET` sockets.
2233 ///
2234 /// If `interface` is `None` or an empty string it removes the binding.
2235 #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
2236 #[cfg_attr(
2237 docsrs,
2238 doc(cfg(all(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))))
2239 )]
2240 pub fn bind_device(&self, interface: Option<&[u8]>) -> io::Result<()> {
2241 self.as_socket().bind_device(interface)
2242 }
2243
2244 /// Executes an operation of the `IP_ADD_MEMBERSHIP` type.
2245 ///
2246 /// This function specifies a new multicast group for this socket to join.
2247 /// The address must be a valid multicast address, and `interface` is the
2248 /// address of the local interface with which the system should join the
2249 /// multicast group. If it's equal to `INADDR_ANY` then an appropriate
2250 /// interface is chosen by the system.
2251 pub fn join_multicast_v4(&self, multiaddr: Ipv4Addr, interface: Ipv4Addr) -> io::Result<()> {
2252 self.io.join_multicast_v4(&multiaddr, &interface)
2253 }
2254
2255 /// Executes an operation of the `IPV6_ADD_MEMBERSHIP` type.
2256 ///
2257 /// This function specifies a new multicast group for this socket to join.
2258 /// The address must be a valid multicast address, and `interface` is the
2259 /// index of the interface to join/leave (or 0 to indicate any interface).
2260 pub fn join_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> {
2261 self.io.join_multicast_v6(multiaddr, interface)
2262 }
2263
2264 /// Executes an operation of the `IP_DROP_MEMBERSHIP` type.
2265 ///
2266 /// For more information about this option, see [`join_multicast_v4`].
2267 ///
2268 /// [`join_multicast_v4`]: method@Self::join_multicast_v4
2269 pub fn leave_multicast_v4(&self, multiaddr: Ipv4Addr, interface: Ipv4Addr) -> io::Result<()> {
2270 self.io.leave_multicast_v4(&multiaddr, &interface)
2271 }
2272
2273 /// Executes an operation of the `IPV6_DROP_MEMBERSHIP` type.
2274 ///
2275 /// For more information about this option, see [`join_multicast_v6`].
2276 ///
2277 /// [`join_multicast_v6`]: method@Self::join_multicast_v6
2278 pub fn leave_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> {
2279 self.io.leave_multicast_v6(multiaddr, interface)
2280 }
2281
2282 /// Returns the value of the `SO_ERROR` option.
2283 ///
2284 /// # Examples
2285 /// ```
2286 /// use tokio::net::UdpSocket;
2287 /// use std::io;
2288 ///
2289 /// #[tokio::main]
2290 /// async fn main() -> io::Result<()> {
2291 /// # if cfg!(miri) { return Ok(()); } // No `socket` in miri.
2292 /// // Create a socket
2293 /// let socket = UdpSocket::bind("0.0.0.0:8080").await?;
2294 ///
2295 /// if let Ok(Some(err)) = socket.take_error() {
2296 /// println!("Got error: {:?}", err);
2297 /// }
2298 ///
2299 /// Ok(())
2300 /// }
2301 /// ```
2302 pub fn take_error(&self) -> io::Result<Option<io::Error>> {
2303 self.io.take_error()
2304 }
2305}
2306
2307impl TryFrom<std::net::UdpSocket> for UdpSocket {
2308 type Error = io::Error;
2309
2310 /// Consumes stream, returning the tokio I/O object.
2311 ///
2312 /// This is equivalent to
2313 /// [`UdpSocket::from_std(stream)`](UdpSocket::from_std).
2314 fn try_from(stream: std::net::UdpSocket) -> Result<Self, Self::Error> {
2315 Self::from_std(stream)
2316 }
2317}
2318
2319impl fmt::Debug for UdpSocket {
2320 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2321 self.io.fmt(f)
2322 }
2323}
2324
2325#[cfg(not(windows))]
2326mod sys {
2327 use super::UdpSocket;
2328 use std::os::fd::{AsFd, AsRawFd, BorrowedFd, RawFd};
2329
2330 impl AsRawFd for UdpSocket {
2331 fn as_raw_fd(&self) -> RawFd {
2332 self.io.as_raw_fd()
2333 }
2334 }
2335
2336 impl AsFd for UdpSocket {
2337 fn as_fd(&self) -> BorrowedFd<'_> {
2338 unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
2339 }
2340 }
2341}
2342
2343cfg_windows! {
2344 use crate::os::windows::io::{AsRawSocket, RawSocket};
2345 use crate::os::windows::io::{AsSocket, BorrowedSocket};
2346
2347 impl AsRawSocket for UdpSocket {
2348 fn as_raw_socket(&self) -> RawSocket {
2349 self.io.as_raw_socket()
2350 }
2351 }
2352
2353 impl AsSocket for UdpSocket {
2354 fn as_socket(&self) -> BorrowedSocket<'_> {
2355 unsafe { BorrowedSocket::borrow_raw(self.as_raw_socket()) }
2356 }
2357 }
2358}