// hyper/rt/io.rs

1use std::fmt;
2use std::mem::MaybeUninit;
3use std::ops::DerefMut;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7// New IO traits? What?! Why, are you bonkers?
8//
9// I mean, yes, probably. But, here's the goals:
10//
11// 1. Supports poll-based IO operations.
12// 2. Opt-in vectored IO.
13// 3. Can use an optional buffer pool.
14// 4. Able to add completion-based (uring) IO eventually.
15//
16// Frankly, the last point is the entire reason we're doing this. We want to
17// have forwards-compatibility with an eventually stable io-uring runtime. We
18// don't need that to work right away. But it must be possible to add in here
19// without breaking hyper 1.0.
20//
// While we're in here, we should also consider small tweaks to poll_read or
// poll_write that would let even the "slow" path be faster, such as handling
// the case where someone didn't remember to forward along an `is_completion`
// call.
24
/// Reads bytes from a source.
///
/// This trait is similar to `std::io::Read`, but supports asynchronous reads.
///
/// # Implementing `Read`
///
/// Implementations should read data into the provided [`ReadBufCursor`] and
/// advance the cursor to indicate how many bytes were written. The simplest
/// and safest approach is to use [`ReadBufCursor::put_slice`]:
///
/// ```
/// use hyper::rt::{Read, ReadBufCursor};
/// use std::pin::Pin;
/// use std::task::{Context, Poll};
/// use std::io;
///
/// struct MyReader {
///     data: Vec<u8>,
///     position: usize,
/// }
///
/// impl Read for MyReader {
///     fn poll_read(
///         mut self: Pin<&mut Self>,
///         _cx: &mut Context<'_>,
///         mut buf: ReadBufCursor<'_>,
///     ) -> Poll<Result<(), io::Error>> {
///         let remaining_data = &self.data[self.position..];
///         if remaining_data.is_empty() {
///             // No more data to read, signal EOF by returning Ok without
///             // advancing the buffer
///             return Poll::Ready(Ok(()));
///         }
///
///         // Calculate how many bytes we can write
///         let to_copy = remaining_data.len().min(buf.remaining());
///         // Use put_slice to safely copy data and advance the cursor
///         buf.put_slice(&remaining_data[..to_copy]);
///
///         self.position += to_copy;
///         Poll::Ready(Ok(()))
///     }
/// }
/// ```
///
/// For more advanced use cases where you need direct access to the buffer
/// (e.g., when interfacing with APIs that write directly to a pointer),
/// you can use the unsafe [`ReadBufCursor::as_mut`] and [`ReadBufCursor::advance`]
/// methods. See their documentation for safety requirements.
pub trait Read {
    /// Attempts to read bytes into the `buf`.
    ///
    /// On success, returns `Poll::Ready(Ok(()))` and places data in the
    /// unfilled portion of `buf`. If no data was read (`buf.remaining()` is
    /// unchanged), it implies that EOF has been reached.
    ///
    /// If no data is available for reading, the method returns `Poll::Pending`
    /// and arranges for the current task (via `cx.waker()`) to receive a
    /// notification when the object becomes readable or is closed.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: ReadBufCursor<'_>,
    ) -> Poll<Result<(), std::io::Error>>;
}
90
/// Write bytes asynchronously.
///
/// This trait is similar to `std::io::Write`, but for asynchronous writes.
pub trait Write {
    /// Attempt to write bytes from `buf` into the destination.
    ///
    /// On success, returns `Poll::Ready(Ok(num_bytes_written))`. If
    /// successful, it must be guaranteed that `n <= buf.len()`. A return value
    /// of `0` means that the underlying object is no longer able to accept
    /// bytes, or that the provided buffer is empty.
    ///
    /// If the object is not ready for writing, the method returns
    /// `Poll::Pending` and arranges for the current task (via `cx.waker()`) to
    /// receive a notification when the object becomes writable or is closed.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, std::io::Error>>;

    /// Attempts to flush the object.
    ///
    /// On success, returns `Poll::Ready(Ok(()))`.
    ///
    /// If flushing cannot immediately complete, this method returns
    /// `Poll::Pending` and arranges for the current task (via `cx.waker()`) to
    /// receive a notification when the object can make progress.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>>;

    /// Attempts to shut down this writer.
    fn poll_shutdown(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), std::io::Error>>;

    /// Returns whether this writer has an efficient `poll_write_vectored`
    /// implementation.
    ///
    /// The default implementation returns `false`.
    fn is_write_vectored(&self) -> bool {
        false
    }

    /// Like `poll_write`, except that it writes from a slice of buffers.
    ///
    /// The default implementation delegates to `poll_write` with the first
    /// non-empty buffer, or with an empty slice if every buffer is empty.
    fn poll_write_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[std::io::IoSlice<'_>],
    ) -> Poll<Result<usize, std::io::Error>> {
        // Pick the first buffer that actually contains bytes; `IoSlice`
        // derefs to `[u8]`, so `&**b` borrows its contents.
        let buf = bufs
            .iter()
            .find(|b| !b.is_empty())
            .map_or(&[][..], |b| &**b);
        self.poll_write(cx, buf)
    }
}
147
/// A wrapper around a byte buffer that is incrementally filled and initialized.
///
/// This type is a sort of "double cursor". It tracks three regions in the
/// buffer: a region at the beginning of the buffer that has been logically
/// filled with data, a region that has been initialized at some point but not
/// yet logically filled, and a region at the end that may be uninitialized.
/// The filled region is guaranteed to be a subset of the initialized region.
///
/// In summary, the contents of the buffer can be visualized as:
///
/// ```not_rust
/// [             capacity              ]
/// [ filled |         unfilled         ]
/// [    initialized    | uninitialized ]
/// ```
///
/// It is undefined behavior to de-initialize any bytes from the uninitialized
/// region, since it is merely unknown whether this region is uninitialized or
/// not, and if part of it turns out to be initialized, it must stay initialized.
pub struct ReadBuf<'a> {
    // Backing storage; bytes at indices >= `init` may be uninitialized.
    raw: &'a mut [MaybeUninit<u8>],
    // Length of the filled region at the start of `raw` (invariant: filled <= init).
    filled: usize,
    // Length of the initialized region at the start of `raw`.
    init: usize,
}
172
/// The cursor part of a [`ReadBuf`], representing the unfilled portion.
///
/// This is created by calling [`ReadBuf::unfilled()`].
///
/// `ReadBufCursor` provides safe and unsafe methods for writing data into the
/// buffer:
///
/// - **Safe approach**: Use [`put_slice`](Self::put_slice) to copy data from
///   a slice. This handles initialization tracking and cursor advancement
///   automatically.
///
/// - **Unsafe approach**: For zero-copy scenarios or when interfacing with
///   low-level APIs, use [`as_mut`](Self::as_mut) to get a mutable slice
///   of `MaybeUninit<u8>`, then call [`advance`](Self::advance) after writing.
///   This is more efficient but requires careful attention to safety invariants.
///
/// # Example using safe methods
///
/// ```
/// use hyper::rt::ReadBuf;
///
/// let mut backing = [0u8; 64];
/// let mut read_buf = ReadBuf::new(&mut backing);
///
/// {
///     let mut cursor = read_buf.unfilled();
///     // put_slice handles everything safely
///     cursor.put_slice(b"hello");
/// }
///
/// assert_eq!(read_buf.filled(), b"hello");
/// ```
///
/// # Example using unsafe methods
///
/// ```
/// use hyper::rt::ReadBuf;
///
/// let mut backing = [0u8; 64];
/// let mut read_buf = ReadBuf::new(&mut backing);
///
/// {
///     let mut cursor = read_buf.unfilled();
///     // SAFETY: we will initialize exactly 5 bytes
///     let slice = unsafe { cursor.as_mut() };
///     slice[0].write(b'h');
///     slice[1].write(b'e');
///     slice[2].write(b'l');
///     slice[3].write(b'l');
///     slice[4].write(b'o');
///     // SAFETY: we have initialized 5 bytes
///     unsafe { cursor.advance(5) };
/// }
///
/// assert_eq!(read_buf.filled(), b"hello");
/// ```
#[derive(Debug)]
pub struct ReadBufCursor<'a> {
    // Exclusive view into the parent `ReadBuf`; all writes and cursor
    // advances go through this reference.
    buf: &'a mut ReadBuf<'a>,
}
233
impl<'data> ReadBuf<'data> {
    /// Create a new `ReadBuf` with a slice of initialized bytes.
    #[inline]
    pub fn new(raw: &'data mut [u8]) -> Self {
        let len = raw.len();
        Self {
            // SAFETY: We never de-init the bytes ourselves. `MaybeUninit<u8>`
            // has the same layout as `u8`, so the cast is sound.
            raw: unsafe { &mut *(raw as *mut [u8] as *mut [MaybeUninit<u8>]) },
            filled: 0,
            // The input was `&mut [u8]`, so every byte is already initialized.
            init: len,
        }
    }

    /// Create a new `ReadBuf` with a slice of uninitialized bytes.
    #[inline]
    pub fn uninit(raw: &'data mut [MaybeUninit<u8>]) -> Self {
        Self {
            raw,
            filled: 0,
            init: 0,
        }
    }

    /// Get a slice of the buffer that has been filled in with bytes.
    #[inline]
    pub fn filled(&self) -> &[u8] {
        // SAFETY: We only slice the filled part of the buffer, which is
        // always initialized (the type upholds `filled <= init`).
        unsafe { &*(&self.raw[0..self.filled] as *const [MaybeUninit<u8>] as *const [u8]) }
    }

    /// Get a cursor to the unfilled portion of the buffer.
    #[inline]
    pub fn unfilled<'cursor>(&'cursor mut self) -> ReadBufCursor<'cursor> {
        ReadBufCursor {
            // SAFETY: self.buf is never re-assigned, so it's safe to narrow
            // the lifetime: shortening `'data` to `'cursor` cannot be
            // observed through the cursor.
            buf: unsafe {
                std::mem::transmute::<&'cursor mut ReadBuf<'data>, &'cursor mut ReadBuf<'cursor>>(
                    self,
                )
            },
        }
    }

    /// Record that the first `n` bytes of the buffer are initialized.
    /// Monotonic: a value smaller than the current `init` is ignored.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that the first `n` bytes of `raw` really
    /// have been initialized.
    #[inline]
    #[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))]
    pub(crate) unsafe fn set_init(&mut self, n: usize) {
        self.init = self.init.max(n);
    }

    /// Record that the first `n` bytes of the buffer are filled.
    /// Monotonic: a value smaller than the current `filled` is ignored.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that the first `n` bytes hold valid data
    /// (and are therefore initialized), preserving `filled <= init`.
    #[inline]
    #[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))]
    pub(crate) unsafe fn set_filled(&mut self, n: usize) {
        self.filled = self.filled.max(n);
    }

    /// Number of filled bytes.
    #[inline]
    #[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))]
    pub(crate) fn len(&self) -> usize {
        self.filled
    }

    /// Number of initialized bytes.
    #[inline]
    #[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))]
    pub(crate) fn init_len(&self) -> usize {
        self.init
    }

    /// Number of unfilled bytes left (capacity minus filled).
    #[inline]
    fn remaining(&self) -> usize {
        self.capacity() - self.filled
    }

    /// Total size of the backing slice.
    #[inline]
    fn capacity(&self) -> usize {
        self.raw.len()
    }
}
312
313impl fmt::Debug for ReadBuf<'_> {
314    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
315        f.debug_struct("ReadBuf")
316            .field("filled", &self.filled)
317            .field("init", &self.init)
318            .field("capacity", &self.capacity())
319            .finish()
320    }
321}
322
impl ReadBufCursor<'_> {
    /// Access the unfilled part of the buffer.
    ///
    /// # Safety
    ///
    /// The caller must not uninitialize any bytes that may have been
    /// initialized before.
    #[inline]
    pub unsafe fn as_mut(&mut self) -> &mut [MaybeUninit<u8>] {
        &mut self.buf.raw[self.buf.filled..]
    }

    /// Advance the `filled` cursor by `n` bytes.
    ///
    /// # Safety
    ///
    /// The caller must take care that `n` more bytes have been initialized.
    #[inline]
    pub unsafe fn advance(&mut self, n: usize) {
        self.buf.filled = self.buf.filled.checked_add(n).expect("overflow");
        // Preserve the invariant `filled <= init` by growing `init` if needed.
        self.buf.init = self.buf.filled.max(self.buf.init);
    }

    /// Returns the number of bytes that can be written from the current
    /// position until the end of the buffer is reached.
    ///
    /// This value is equal to the length of the slice returned by `as_mut()`.
    #[inline]
    pub fn remaining(&self) -> usize {
        self.buf.remaining()
    }

    /// Transfer bytes into `self` from `src` and advance the cursor
    /// by the number of bytes written.
    ///
    /// # Panics
    ///
    /// `self` must have enough remaining capacity to contain all of `src`.
    #[inline]
    pub fn put_slice(&mut self, src: &[u8]) {
        assert!(
            self.buf.remaining() >= src.len(),
            "src.len() must fit in remaining()"
        );

        let amt = src.len();
        // Cannot overflow, asserted above
        let end = self.buf.filled + amt;

        // SAFETY: the destination range is in bounds (asserted above), and
        // `src` cannot overlap `self.buf.raw`, which we hold exclusively.
        unsafe {
            self.buf.raw[self.buf.filled..end]
                .as_mut_ptr()
                .cast::<u8>()
                .copy_from_nonoverlapping(src.as_ptr(), amt);
        }

        // Everything up to `end` is now both initialized and filled.
        if self.buf.init < end {
            self.buf.init = end;
        }
        self.buf.filled = end;
    }
}
386
// Implements `Read::poll_read` by delegating to the dereferenced target.
// Used for the `Box<T>` and `&mut T` forwarding impls below.
macro_rules! deref_async_read {
    () => {
        fn poll_read(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: ReadBufCursor<'_>,
        ) -> Poll<std::io::Result<()>> {
            // The target is `Unpin`, so creating a fresh `Pin` around it is sound.
            Pin::new(&mut **self).poll_read(cx, buf)
        }
    };
}
398
// `Box<T>` reads by forwarding to the boxed reader.
impl<T: ?Sized + Read + Unpin> Read for Box<T> {
    deref_async_read!();
}

// `&mut T` reads by forwarding to the referenced reader.
impl<T: ?Sized + Read + Unpin> Read for &mut T {
    deref_async_read!();
}
406
// `Pin<P>` reads by forwarding to the pinned target; unlike the impls above,
// this does not require the target to be `Unpin`.
impl<P> Read for Pin<P>
where
    P: DerefMut,
    P::Target: Read,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: ReadBufCursor<'_>,
    ) -> Poll<std::io::Result<()>> {
        // Project `Pin<&mut Pin<P>>` down to `Pin<&mut P::Target>`.
        pin_as_deref_mut(self).poll_read(cx, buf)
    }
}
420
// Implements every `Write` method by delegating to the dereferenced target.
// Used for the `Box<T>` and `&mut T` forwarding impls below.
macro_rules! deref_async_write {
    () => {
        fn poll_write(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<std::io::Result<usize>> {
            // The target is `Unpin`, so re-pinning it is sound.
            Pin::new(&mut **self).poll_write(cx, buf)
        }

        fn poll_write_vectored(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            bufs: &[std::io::IoSlice<'_>],
        ) -> Poll<std::io::Result<usize>> {
            Pin::new(&mut **self).poll_write_vectored(cx, bufs)
        }

        // Forwarded so the inner writer's answer (rather than the trait's
        // default `false`) is reported.
        fn is_write_vectored(&self) -> bool {
            (**self).is_write_vectored()
        }

        fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
            Pin::new(&mut **self).poll_flush(cx)
        }

        fn poll_shutdown(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<std::io::Result<()>> {
            Pin::new(&mut **self).poll_shutdown(cx)
        }
    };
}
455
// `Box<T>` writes by forwarding to the boxed writer.
impl<T: ?Sized + Write + Unpin> Write for Box<T> {
    deref_async_write!();
}

// `&mut T` writes by forwarding to the referenced writer.
impl<T: ?Sized + Write + Unpin> Write for &mut T {
    deref_async_write!();
}
463
// `Pin<P>` writes by forwarding to the pinned target; unlike the impls above,
// this does not require the target to be `Unpin`.
impl<P> Write for Pin<P>
where
    P: DerefMut,
    P::Target: Write,
{
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        pin_as_deref_mut(self).poll_write(cx, buf)
    }

    fn poll_write_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[std::io::IoSlice<'_>],
    ) -> Poll<std::io::Result<usize>> {
        pin_as_deref_mut(self).poll_write_vectored(cx, bufs)
    }

    // Report the inner writer's vectored-write support.
    fn is_write_vectored(&self) -> bool {
        (**self).is_write_vectored()
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        pin_as_deref_mut(self).poll_flush(cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        pin_as_deref_mut(self).poll_shutdown(cx)
    }
}
497
/// Polyfill for `Pin::as_deref_mut()`: projects `Pin<&mut Pin<P>>` down to
/// `Pin<&mut P::Target>`.
/// TODO: use Pin::as_deref_mut() instead once stabilized
fn pin_as_deref_mut<P: DerefMut>(pin: Pin<&mut Pin<P>>) -> Pin<&mut P::Target> {
    // SAFETY: we go directly from Pin<&mut Pin<P>> to Pin<&mut P::Target>,
    // without moving or giving out the &mut Pin<P> in the process. See
    // Pin::as_deref_mut() for more detail.
    let inner: &mut Pin<P> = unsafe { Pin::get_unchecked_mut(pin) };
    Pin::as_mut(inner)
}