hyper/rt/io.rs
1use std::fmt;
2use std::mem::MaybeUninit;
3use std::ops::DerefMut;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7// New IO traits? What?! Why, are you bonkers?
8//
9// I mean, yes, probably. But, here's the goals:
10//
11// 1. Supports poll-based IO operations.
12// 2. Opt-in vectored IO.
13// 3. Can use an optional buffer pool.
14// 4. Able to add completion-based (uring) IO eventually.
15//
16// Frankly, the last point is the entire reason we're doing this. We want to
17// have forwards-compatibility with an eventually stable io-uring runtime. We
18// don't need that to work right away. But it must be possible to add in here
19// without breaking hyper 1.0.
20//
21// While in here, if there's small tweaks to poll_read or poll_write that would
22// allow even the "slow" path to be faster, such as if someone didn't remember
23// to forward along an `is_completion` call.
24
25/// Reads bytes from a source.
26///
27/// This trait is similar to `std::io::Read`, but supports asynchronous reads.
28///
29/// # Implementing `Read`
30///
31/// Implementations should read data into the provided [`ReadBufCursor`] and
32/// advance the cursor to indicate how many bytes were written. The simplest
33/// and safest approach is to use [`ReadBufCursor::put_slice`]:
34///
35/// ```
36/// use hyper::rt::{Read, ReadBufCursor};
37/// use std::pin::Pin;
38/// use std::task::{Context, Poll};
39/// use std::io;
40///
41/// struct MyReader {
42/// data: Vec<u8>,
43/// position: usize,
44/// }
45///
46/// impl Read for MyReader {
47/// fn poll_read(
48/// mut self: Pin<&mut Self>,
49/// _cx: &mut Context<'_>,
50/// mut buf: ReadBufCursor<'_>,
51/// ) -> Poll<Result<(), io::Error>> {
52/// let remaining_data = &self.data[self.position..];
53/// if remaining_data.is_empty() {
54/// // No more data to read, signal EOF by returning Ok without
55/// // advancing the buffer
56/// return Poll::Ready(Ok(()));
57/// }
58///
59/// // Calculate how many bytes we can write
60/// let to_copy = remaining_data.len().min(buf.remaining());
61/// // Use put_slice to safely copy data and advance the cursor
62/// buf.put_slice(&remaining_data[..to_copy]);
63///
64/// self.position += to_copy;
65/// Poll::Ready(Ok(()))
66/// }
67/// }
68/// ```
69///
70/// For more advanced use cases where you need direct access to the buffer
71/// (e.g., when interfacing with APIs that write directly to a pointer),
72/// you can use the unsafe [`ReadBufCursor::as_mut`] and [`ReadBufCursor::advance`]
73/// methods. See their documentation for safety requirements.
74pub trait Read {
75 /// Attempts to read bytes into the `buf`.
76 ///
77 /// On success, returns `Poll::Ready(Ok(()))` and places data in the
78 /// unfilled portion of `buf`. If no data was read (`buf.remaining()` is
79 /// unchanged), it implies that EOF has been reached.
80 ///
81 /// If no data is available for reading, the method returns `Poll::Pending`
82 /// and arranges for the current task (via `cx.waker()`) to receive a
83 /// notification when the object becomes readable or is closed.
84 fn poll_read(
85 self: Pin<&mut Self>,
86 cx: &mut Context<'_>,
87 buf: ReadBufCursor<'_>,
88 ) -> Poll<Result<(), std::io::Error>>;
89}
90
91/// Write bytes asynchronously.
92///
93/// This trait is similar to `std::io::Write`, but for asynchronous writes.
94pub trait Write {
95 /// Attempt to write bytes from `buf` into the destination.
96 ///
97 /// On success, returns `Poll::Ready(Ok(num_bytes_written)))`. If
98 /// successful, it must be guaranteed that `n <= buf.len()`. A return value
99 /// of `0` means that the underlying object is no longer able to accept
100 /// bytes, or that the provided buffer is empty.
101 ///
102 /// If the object is not ready for writing, the method returns
103 /// `Poll::Pending` and arranges for the current task (via `cx.waker()`) to
104 /// receive a notification when the object becomes writable or is closed.
105 fn poll_write(
106 self: Pin<&mut Self>,
107 cx: &mut Context<'_>,
108 buf: &[u8],
109 ) -> Poll<Result<usize, std::io::Error>>;
110
111 /// Attempts to flush the object.
112 ///
113 /// On success, returns `Poll::Ready(Ok(()))`.
114 ///
115 /// If flushing cannot immediately complete, this method returns
116 /// `Poll::Pending` and arranges for the current task (via `cx.waker()`) to
117 /// receive a notification when the object can make progress.
118 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>>;
119
120 /// Attempts to shut down this writer.
121 fn poll_shutdown(
122 self: Pin<&mut Self>,
123 cx: &mut Context<'_>,
124 ) -> Poll<Result<(), std::io::Error>>;
125
126 /// Returns whether this writer has an efficient `poll_write_vectored`
127 /// implementation.
128 ///
129 /// The default implementation returns `false`.
130 fn is_write_vectored(&self) -> bool {
131 false
132 }
133
134 /// Like `poll_write`, except that it writes from a slice of buffers.
135 fn poll_write_vectored(
136 self: Pin<&mut Self>,
137 cx: &mut Context<'_>,
138 bufs: &[std::io::IoSlice<'_>],
139 ) -> Poll<Result<usize, std::io::Error>> {
140 let buf = bufs
141 .iter()
142 .find(|b| !b.is_empty())
143 .map_or(&[][..], |b| &**b);
144 self.poll_write(cx, buf)
145 }
146}
147
148/// A wrapper around a byte buffer that is incrementally filled and initialized.
149///
150/// This type is a sort of "double cursor". It tracks three regions in the
151/// buffer: a region at the beginning of the buffer that has been logically
152/// filled with data, a region that has been initialized at some point but not
153/// yet logically filled, and a region at the end that may be uninitialized.
154/// The filled region is guaranteed to be a subset of the initialized region.
155///
156/// In summary, the contents of the buffer can be visualized as:
157///
158/// ```not_rust
159/// [ capacity ]
160/// [ filled | unfilled ]
161/// [ initialized | uninitialized ]
162/// ```
163///
164/// It is undefined behavior to de-initialize any bytes from the uninitialized
165/// region, since it is merely unknown whether this region is uninitialized or
166/// not, and if part of it turns out to be initialized, it must stay initialized.
167pub struct ReadBuf<'a> {
168 raw: &'a mut [MaybeUninit<u8>],
169 filled: usize,
170 init: usize,
171}
172
173/// The cursor part of a [`ReadBuf`], representing the unfilled portion.
174///
175/// This is created by calling [`ReadBuf::unfilled()`].
176///
177/// `ReadBufCursor` provides safe and unsafe methods for writing data into the
178/// buffer:
179///
180/// - **Safe approach**: Use [`put_slice`](Self::put_slice) to copy data from
181/// a slice. This handles initialization tracking and cursor advancement
182/// automatically.
183///
184/// - **Unsafe approach**: For zero-copy scenarios or when interfacing with
185/// low-level APIs, use [`as_mut`](Self::as_mut) to get a mutable slice
186/// of `MaybeUninit<u8>`, then call [`advance`](Self::advance) after writing.
187/// This is more efficient but requires careful attention to safety invariants.
188///
189/// # Example using safe methods
190///
191/// ```
192/// use hyper::rt::ReadBuf;
193///
194/// let mut backing = [0u8; 64];
195/// let mut read_buf = ReadBuf::new(&mut backing);
196///
197/// {
198/// let mut cursor = read_buf.unfilled();
199/// // put_slice handles everything safely
200/// cursor.put_slice(b"hello");
201/// }
202///
203/// assert_eq!(read_buf.filled(), b"hello");
204/// ```
205///
206/// # Example using unsafe methods
207///
208/// ```
209/// use hyper::rt::ReadBuf;
210///
211/// let mut backing = [0u8; 64];
212/// let mut read_buf = ReadBuf::new(&mut backing);
213///
214/// {
215/// let mut cursor = read_buf.unfilled();
216/// // SAFETY: we will initialize exactly 5 bytes
217/// let slice = unsafe { cursor.as_mut() };
218/// slice[0].write(b'h');
219/// slice[1].write(b'e');
220/// slice[2].write(b'l');
221/// slice[3].write(b'l');
222/// slice[4].write(b'o');
223/// // SAFETY: we have initialized 5 bytes
224/// unsafe { cursor.advance(5) };
225/// }
226///
227/// assert_eq!(read_buf.filled(), b"hello");
228/// ```
229#[derive(Debug)]
230pub struct ReadBufCursor<'a> {
231 buf: &'a mut ReadBuf<'a>,
232}
233
234impl<'data> ReadBuf<'data> {
235 /// Create a new `ReadBuf` with a slice of initialized bytes.
236 #[inline]
237 pub fn new(raw: &'data mut [u8]) -> Self {
238 let len = raw.len();
239 Self {
240 // SAFETY: We never de-init the bytes ourselves.
241 raw: unsafe { &mut *(raw as *mut [u8] as *mut [MaybeUninit<u8>]) },
242 filled: 0,
243 init: len,
244 }
245 }
246
247 /// Create a new `ReadBuf` with a slice of uninitialized bytes.
248 #[inline]
249 pub fn uninit(raw: &'data mut [MaybeUninit<u8>]) -> Self {
250 Self {
251 raw,
252 filled: 0,
253 init: 0,
254 }
255 }
256
257 /// Get a slice of the buffer that has been filled in with bytes.
258 #[inline]
259 pub fn filled(&self) -> &[u8] {
260 // SAFETY: We only slice the filled part of the buffer, which is always valid
261 unsafe { &*(&self.raw[0..self.filled] as *const [MaybeUninit<u8>] as *const [u8]) }
262 }
263
264 /// Get a cursor to the unfilled portion of the buffer.
265 #[inline]
266 pub fn unfilled<'cursor>(&'cursor mut self) -> ReadBufCursor<'cursor> {
267 ReadBufCursor {
268 // SAFETY: self.buf is never re-assigned, so its safe to narrow
269 // the lifetime.
270 buf: unsafe {
271 std::mem::transmute::<&'cursor mut ReadBuf<'data>, &'cursor mut ReadBuf<'cursor>>(
272 self,
273 )
274 },
275 }
276 }
277
278 #[inline]
279 #[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))]
280 pub(crate) unsafe fn set_init(&mut self, n: usize) {
281 self.init = self.init.max(n);
282 }
283
284 #[inline]
285 #[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))]
286 pub(crate) unsafe fn set_filled(&mut self, n: usize) {
287 self.filled = self.filled.max(n);
288 }
289
290 #[inline]
291 #[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))]
292 pub(crate) fn len(&self) -> usize {
293 self.filled
294 }
295
296 #[inline]
297 #[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))]
298 pub(crate) fn init_len(&self) -> usize {
299 self.init
300 }
301
302 #[inline]
303 fn remaining(&self) -> usize {
304 self.capacity() - self.filled
305 }
306
307 #[inline]
308 fn capacity(&self) -> usize {
309 self.raw.len()
310 }
311}
312
313impl fmt::Debug for ReadBuf<'_> {
314 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
315 f.debug_struct("ReadBuf")
316 .field("filled", &self.filled)
317 .field("init", &self.init)
318 .field("capacity", &self.capacity())
319 .finish()
320 }
321}
322
323impl ReadBufCursor<'_> {
324 /// Access the unfilled part of the buffer.
325 ///
326 /// # Safety
327 ///
328 /// The caller must not uninitialize any bytes that may have been
329 /// initialized before.
330 #[inline]
331 pub unsafe fn as_mut(&mut self) -> &mut [MaybeUninit<u8>] {
332 &mut self.buf.raw[self.buf.filled..]
333 }
334
335 /// Advance the `filled` cursor by `n` bytes.
336 ///
337 /// # Safety
338 ///
339 /// The caller must take care that `n` more bytes have been initialized.
340 #[inline]
341 pub unsafe fn advance(&mut self, n: usize) {
342 self.buf.filled = self.buf.filled.checked_add(n).expect("overflow");
343 self.buf.init = self.buf.filled.max(self.buf.init);
344 }
345
346 /// Returns the number of bytes that can be written from the current
347 /// position until the end of the buffer is reached.
348 ///
349 /// This value is equal to the length of the slice returned by `as_mut()``.
350 #[inline]
351 pub fn remaining(&self) -> usize {
352 self.buf.remaining()
353 }
354
355 /// Transfer bytes into `self` from `src` and advance the cursor
356 /// by the number of bytes written.
357 ///
358 /// # Panics
359 ///
360 /// `self` must have enough remaining capacity to contain all of `src`.
361 #[inline]
362 pub fn put_slice(&mut self, src: &[u8]) {
363 assert!(
364 self.buf.remaining() >= src.len(),
365 "src.len() must fit in remaining()"
366 );
367
368 let amt = src.len();
369 // Cannot overflow, asserted above
370 let end = self.buf.filled + amt;
371
372 // Safety: the length is asserted above
373 unsafe {
374 self.buf.raw[self.buf.filled..end]
375 .as_mut_ptr()
376 .cast::<u8>()
377 .copy_from_nonoverlapping(src.as_ptr(), amt);
378 }
379
380 if self.buf.init < end {
381 self.buf.init = end;
382 }
383 self.buf.filled = end;
384 }
385}
386
387macro_rules! deref_async_read {
388 () => {
389 fn poll_read(
390 mut self: Pin<&mut Self>,
391 cx: &mut Context<'_>,
392 buf: ReadBufCursor<'_>,
393 ) -> Poll<std::io::Result<()>> {
394 Pin::new(&mut **self).poll_read(cx, buf)
395 }
396 };
397}
398
399impl<T: ?Sized + Read + Unpin> Read for Box<T> {
400 deref_async_read!();
401}
402
403impl<T: ?Sized + Read + Unpin> Read for &mut T {
404 deref_async_read!();
405}
406
407impl<P> Read for Pin<P>
408where
409 P: DerefMut,
410 P::Target: Read,
411{
412 fn poll_read(
413 self: Pin<&mut Self>,
414 cx: &mut Context<'_>,
415 buf: ReadBufCursor<'_>,
416 ) -> Poll<std::io::Result<()>> {
417 pin_as_deref_mut(self).poll_read(cx, buf)
418 }
419}
420
421macro_rules! deref_async_write {
422 () => {
423 fn poll_write(
424 mut self: Pin<&mut Self>,
425 cx: &mut Context<'_>,
426 buf: &[u8],
427 ) -> Poll<std::io::Result<usize>> {
428 Pin::new(&mut **self).poll_write(cx, buf)
429 }
430
431 fn poll_write_vectored(
432 mut self: Pin<&mut Self>,
433 cx: &mut Context<'_>,
434 bufs: &[std::io::IoSlice<'_>],
435 ) -> Poll<std::io::Result<usize>> {
436 Pin::new(&mut **self).poll_write_vectored(cx, bufs)
437 }
438
439 fn is_write_vectored(&self) -> bool {
440 (**self).is_write_vectored()
441 }
442
443 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
444 Pin::new(&mut **self).poll_flush(cx)
445 }
446
447 fn poll_shutdown(
448 mut self: Pin<&mut Self>,
449 cx: &mut Context<'_>,
450 ) -> Poll<std::io::Result<()>> {
451 Pin::new(&mut **self).poll_shutdown(cx)
452 }
453 };
454}
455
456impl<T: ?Sized + Write + Unpin> Write for Box<T> {
457 deref_async_write!();
458}
459
460impl<T: ?Sized + Write + Unpin> Write for &mut T {
461 deref_async_write!();
462}
463
464impl<P> Write for Pin<P>
465where
466 P: DerefMut,
467 P::Target: Write,
468{
469 fn poll_write(
470 self: Pin<&mut Self>,
471 cx: &mut Context<'_>,
472 buf: &[u8],
473 ) -> Poll<std::io::Result<usize>> {
474 pin_as_deref_mut(self).poll_write(cx, buf)
475 }
476
477 fn poll_write_vectored(
478 self: Pin<&mut Self>,
479 cx: &mut Context<'_>,
480 bufs: &[std::io::IoSlice<'_>],
481 ) -> Poll<std::io::Result<usize>> {
482 pin_as_deref_mut(self).poll_write_vectored(cx, bufs)
483 }
484
485 fn is_write_vectored(&self) -> bool {
486 (**self).is_write_vectored()
487 }
488
489 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
490 pin_as_deref_mut(self).poll_flush(cx)
491 }
492
493 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
494 pin_as_deref_mut(self).poll_shutdown(cx)
495 }
496}
497
498/// Polyfill for Pin::as_deref_mut()
499/// TODO: use Pin::as_deref_mut() instead once stabilized
500fn pin_as_deref_mut<P: DerefMut>(pin: Pin<&mut Pin<P>>) -> Pin<&mut P::Target> {
501 // SAFETY: we go directly from Pin<&mut Pin<P>> to Pin<&mut P::Target>, without moving or
502 // giving out the &mut Pin<P> in the process. See Pin::as_deref_mut() for more detail.
503 unsafe { pin.get_unchecked_mut() }.as_mut()
504}