tokio/fs/file.rs
1//! Types for working with [`File`].
2//!
3//! [`File`]: File
4
5use crate::fs::{asyncify, OpenOptions};
6use crate::io::blocking::{Buf, DEFAULT_MAX_BUF_SIZE};
7use crate::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
8use crate::sync::Mutex;
9
10use std::cmp;
11use std::fmt;
12use std::fs::{Metadata, Permissions};
13use std::future::Future;
14use std::io::{self, Seek, SeekFrom};
15use std::path::Path;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::task::{ready, Context, Poll};
19
20#[cfg(test)]
21use super::mocks::JoinHandle;
22#[cfg(test)]
23use super::mocks::MockFile as StdFile;
24#[cfg(test)]
25use super::mocks::{spawn_blocking, spawn_mandatory_blocking};
26#[cfg(not(test))]
27use crate::blocking::JoinHandle;
28#[cfg(not(test))]
29use crate::blocking::{spawn_blocking, spawn_mandatory_blocking};
30#[cfg(not(test))]
31use std::fs::File as StdFile;
32
33cfg_io_uring! {
34 #[cfg(not(test))]
35 use crate::spawn;
36}
37
38/// A reference to an open file on the filesystem.
39///
40/// This is a specialized version of [`std::fs::File`] for usage from the
41/// Tokio runtime.
42///
43/// An instance of a `File` can be read and/or written depending on what options
44/// it was opened with. Files also implement [`AsyncSeek`] to alter the logical
45/// cursor that the file contains internally.
46///
47/// A file will not be closed immediately when it goes out of scope if there
48/// are any IO operations that have not yet completed. To ensure that a file is
49/// closed immediately when it is dropped, you should call [`flush`] before
50/// dropping it. Note that this does not ensure that the file has been fully
51/// written to disk; the operating system might keep the changes around in an
52/// in-memory buffer. See the [`sync_all`] method for telling the OS to write
53/// the data to disk.
54///
55/// Reading and writing to a `File` is usually done using the convenience
56/// methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`] traits.
57///
58/// [`AsyncSeek`]: trait@crate::io::AsyncSeek
59/// [`flush`]: fn@crate::io::AsyncWriteExt::flush
60/// [`sync_all`]: fn@crate::fs::File::sync_all
61/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
62/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
63///
64/// # Examples
65///
66/// Create a new file and asynchronously write bytes to it:
67///
68/// ```no_run
69/// use tokio::fs::File;
70/// use tokio::io::AsyncWriteExt; // for write_all()
71///
72/// # async fn dox() -> std::io::Result<()> {
73/// let mut file = File::create("foo.txt").await?;
74/// file.write_all(b"hello, world!").await?;
75/// # Ok(())
76/// # }
77/// ```
78///
79/// Read the contents of a file into a buffer:
80///
81/// ```no_run
82/// use tokio::fs::File;
83/// use tokio::io::AsyncReadExt; // for read_to_end()
84///
85/// # async fn dox() -> std::io::Result<()> {
86/// let mut file = File::open("foo.txt").await?;
87///
88/// let mut contents = vec![];
89/// file.read_to_end(&mut contents).await?;
90///
91/// println!("len = {}", contents.len());
92/// # Ok(())
93/// # }
94/// ```
pub struct File {
    /// Shared handle to the underlying file. A clone is moved into every
    /// background task that operates on the file.
    std: Arc<StdFile>,
    /// Buffer and in-flight-operation state. The `&self` async methods lock
    /// it; the poll-based I/O traits reach it through `get_mut`.
    inner: Mutex<Inner>,
    /// Upper bound passed to the buffer when staging data for a single read
    /// or write operation.
    max_buf_size: usize,
}
100
/// Mutable state of a `File`, stored behind the mutex in `File::inner`.
struct Inner {
    /// Whether a background operation is currently in flight, plus the
    /// buffer when none is.
    state: State,

    /// Errors from writes/flushes are returned in write/flush calls. If a write
    /// error is observed while performing a read, it is saved until the next
    /// write / flush call.
    last_write_err: Option<io::ErrorKind>,

    /// Position recorded from the most recent successful seek result. It is
    /// what `poll_complete` reports when no operation is in flight.
    pos: u64,
}
111
/// Tracks whether a background operation is running for the file.
#[derive(Debug)]
enum State {
    /// No operation is running. The buffer is taken out of the `Option`
    /// when it is handed to a new operation.
    Idle(Option<Buf>),
    /// An operation is running; its task yields the operation's result
    /// together with the buffer it was given.
    Busy(JoinHandle<(Operation, Buf)>),
}
117
/// Result of a completed background operation, tagged by what was attempted.
#[derive(Debug)]
enum Operation {
    /// A read; `Ok` carries the byte count reported by the read.
    Read(io::Result<usize>),
    /// A write of the buffered data.
    Write(io::Result<()>),
    /// A seek; `Ok` carries the resulting position. `set_len` also reports
    /// its outcome through this variant.
    Seek(io::Result<u64>),
}
124
125impl File {
126 /// Attempts to open a file in read-only mode.
127 ///
128 /// See [`OpenOptions`] for more details.
129 ///
130 /// # Errors
131 ///
132 /// This function will return an error if called from outside of the Tokio
133 /// runtime or if path does not already exist. Other errors may also be
134 /// returned according to `OpenOptions::open`.
135 ///
136 /// # Examples
137 ///
138 /// ```no_run
139 /// use tokio::fs::File;
140 /// use tokio::io::AsyncReadExt;
141 ///
142 /// # async fn dox() -> std::io::Result<()> {
143 /// let mut file = File::open("foo.txt").await?;
144 ///
145 /// let mut contents = vec![];
146 /// file.read_to_end(&mut contents).await?;
147 ///
148 /// println!("len = {}", contents.len());
149 /// # Ok(())
150 /// # }
151 /// ```
152 ///
153 /// The [`read_to_end`] method is defined on the [`AsyncReadExt`] trait.
154 ///
155 /// [`read_to_end`]: fn@crate::io::AsyncReadExt::read_to_end
156 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
157 pub async fn open(path: impl AsRef<Path>) -> io::Result<File> {
158 Self::options().read(true).open(path).await
159 }
160
161 /// Opens a file in write-only mode.
162 ///
163 /// This function will create a file if it does not exist, and will truncate
164 /// it if it does.
165 ///
166 /// See [`OpenOptions`] for more details.
167 ///
168 /// # Errors
169 ///
170 /// Results in an error if called from outside of the Tokio runtime or if
171 /// the underlying [`create`] call results in an error.
172 ///
173 /// [`create`]: std::fs::File::create
174 ///
175 /// # Examples
176 ///
177 /// ```no_run
178 /// use tokio::fs::File;
179 /// use tokio::io::AsyncWriteExt;
180 ///
181 /// # async fn dox() -> std::io::Result<()> {
182 /// let mut file = File::create("foo.txt").await?;
183 /// file.write_all(b"hello, world!").await?;
184 /// # Ok(())
185 /// # }
186 /// ```
187 ///
188 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
189 ///
190 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
191 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
192 pub async fn create(path: impl AsRef<Path>) -> io::Result<File> {
193 Self::options()
194 .write(true)
195 .create(true)
196 .truncate(true)
197 .open(path)
198 .await
199 }
200
201 /// Opens a file in read-write mode.
202 ///
203 /// This function will create a file if it does not exist, or return an error
204 /// if it does. This way, if the call succeeds, the file returned is guaranteed
205 /// to be new.
206 ///
207 /// This option is useful because it is atomic. Otherwise between checking
208 /// whether a file exists and creating a new one, the file may have been
209 /// created by another process (a TOCTOU race condition / attack).
210 ///
211 /// This can also be written using `File::options().read(true).write(true).create_new(true).open(...)`.
212 ///
213 /// See [`OpenOptions`] for more details.
214 ///
215 /// # Examples
216 ///
217 /// ```no_run
218 /// use tokio::fs::File;
219 /// use tokio::io::AsyncWriteExt;
220 ///
221 /// # async fn dox() -> std::io::Result<()> {
222 /// let mut file = File::create_new("foo.txt").await?;
223 /// file.write_all(b"hello, world!").await?;
224 /// # Ok(())
225 /// # }
226 /// ```
227 ///
228 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
229 ///
230 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
231 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
232 pub async fn create_new<P: AsRef<Path>>(path: P) -> std::io::Result<File> {
233 Self::options()
234 .read(true)
235 .write(true)
236 .create_new(true)
237 .open(path)
238 .await
239 }
240
241 /// Returns a new [`OpenOptions`] object.
242 ///
243 /// This function returns a new `OpenOptions` object that you can use to
244 /// open or create a file with specific options if `open()` or `create()`
245 /// are not appropriate.
246 ///
247 /// It is equivalent to `OpenOptions::new()`, but allows you to write more
248 /// readable code. Instead of
249 /// `OpenOptions::new().append(true).open("example.log")`,
250 /// you can write `File::options().append(true).open("example.log")`. This
251 /// also avoids the need to import `OpenOptions`.
252 ///
253 /// See the [`OpenOptions::new`] function for more details.
254 ///
255 /// # Examples
256 ///
257 /// ```no_run
258 /// use tokio::fs::File;
259 /// use tokio::io::AsyncWriteExt;
260 ///
261 /// # async fn dox() -> std::io::Result<()> {
262 /// let mut f = File::options().append(true).open("example.log").await?;
263 /// f.write_all(b"new line\n").await?;
264 /// # Ok(())
265 /// # }
266 /// ```
    #[must_use]
    pub fn options() -> OpenOptions {
        // Thin alias for `OpenOptions::new()`; nothing is configured here.
        OpenOptions::new()
    }
271
272 /// Converts a [`std::fs::File`] to a [`tokio::fs::File`](File).
273 ///
274 /// # Examples
275 ///
276 /// ```no_run
277 /// // This line could block. It is not recommended to do this on the Tokio
278 /// // runtime.
279 /// let std_file = std::fs::File::open("foo.txt").unwrap();
280 /// let file = tokio::fs::File::from_std(std_file);
281 /// ```
282 pub fn from_std(std: StdFile) -> File {
283 File {
284 std: Arc::new(std),
285 inner: Mutex::new(Inner {
286 state: State::Idle(Some(Buf::with_capacity(0))),
287 last_write_err: None,
288 pos: 0,
289 }),
290 max_buf_size: DEFAULT_MAX_BUF_SIZE,
291 }
292 }
293
294 /// Attempts to sync all OS-internal metadata to disk.
295 ///
296 /// This function will attempt to ensure that all in-core data reaches the
297 /// filesystem before returning.
298 ///
299 /// # Examples
300 ///
301 /// ```no_run
302 /// use tokio::fs::File;
303 /// use tokio::io::AsyncWriteExt;
304 ///
305 /// # async fn dox() -> std::io::Result<()> {
306 /// let mut file = File::create("foo.txt").await?;
307 /// file.write_all(b"hello, world!").await?;
308 /// file.sync_all().await?;
309 /// # Ok(())
310 /// # }
311 /// ```
312 ///
313 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
314 ///
315 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
316 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
317 pub async fn sync_all(&self) -> io::Result<()> {
318 let mut inner = self.inner.lock().await;
319 inner.complete_inflight().await;
320
321 let std = self.std.clone();
322 asyncify(move || std.sync_all()).await
323 }
324
325 /// This function is similar to `sync_all`, except that it may not
326 /// synchronize file metadata to the filesystem.
327 ///
328 /// This is intended for use cases that must synchronize content, but don't
329 /// need the metadata on disk. The goal of this method is to reduce disk
330 /// operations.
331 ///
332 /// Note that some platforms may simply implement this in terms of `sync_all`.
333 ///
334 /// # Examples
335 ///
336 /// ```no_run
337 /// use tokio::fs::File;
338 /// use tokio::io::AsyncWriteExt;
339 ///
340 /// # async fn dox() -> std::io::Result<()> {
341 /// let mut file = File::create("foo.txt").await?;
342 /// file.write_all(b"hello, world!").await?;
343 /// file.sync_data().await?;
344 /// # Ok(())
345 /// # }
346 /// ```
347 ///
348 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
349 ///
350 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
351 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
352 pub async fn sync_data(&self) -> io::Result<()> {
353 let mut inner = self.inner.lock().await;
354 inner.complete_inflight().await;
355
356 let std = self.std.clone();
357 asyncify(move || std.sync_data()).await
358 }
359
    /// Truncates or extends the underlying file, updating the size of this file to become `size`.
361 ///
362 /// If the size is less than the current file's size, then the file will be
363 /// shrunk. If it is greater than the current file's size, then the file
364 /// will be extended to size and have all of the intermediate data filled in
365 /// with 0s.
366 ///
367 /// # Errors
368 ///
369 /// This function will return an error if the file is not opened for
370 /// writing.
371 ///
372 /// # Examples
373 ///
374 /// ```no_run
375 /// use tokio::fs::File;
376 /// use tokio::io::AsyncWriteExt;
377 ///
378 /// # async fn dox() -> std::io::Result<()> {
379 /// let mut file = File::create("foo.txt").await?;
380 /// file.write_all(b"hello, world!").await?;
381 /// file.set_len(10).await?;
382 /// # Ok(())
383 /// # }
384 /// ```
385 ///
386 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
387 ///
388 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
389 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
390 pub async fn set_len(&self, size: u64) -> io::Result<()> {
391 let mut inner = self.inner.lock().await;
392 inner.complete_inflight().await;
393
394 let mut buf = match inner.state {
395 State::Idle(ref mut buf_cell) => buf_cell.take().unwrap(),
396 _ => unreachable!(),
397 };
398
399 let seek = if !buf.is_empty() {
400 Some(SeekFrom::Current(buf.discard_read()))
401 } else {
402 None
403 };
404
405 let std = self.std.clone();
406
407 inner.state = State::Busy(spawn_blocking(move || {
408 let res = if let Some(seek) = seek {
409 (&*std).seek(seek).and_then(|_| std.set_len(size))
410 } else {
411 std.set_len(size)
412 }
413 .map(|()| 0); // the value is discarded later
414
415 // Return the result as a seek
416 (Operation::Seek(res), buf)
417 }));
418
419 let (op, buf) = match inner.state {
420 State::Idle(_) => unreachable!(),
421 State::Busy(ref mut rx) => rx.await?,
422 };
423
424 inner.state = State::Idle(Some(buf));
425
426 match op {
427 Operation::Seek(res) => res.map(|pos| {
428 inner.pos = pos;
429 }),
430 _ => unreachable!(),
431 }
432 }
433
434 /// Queries metadata about the underlying file.
435 ///
436 /// # Examples
437 ///
438 /// ```no_run
439 /// use tokio::fs::File;
440 ///
441 /// # async fn dox() -> std::io::Result<()> {
442 /// let file = File::open("foo.txt").await?;
443 /// let metadata = file.metadata().await?;
444 ///
445 /// println!("{:?}", metadata);
446 /// # Ok(())
447 /// # }
448 /// ```
449 pub async fn metadata(&self) -> io::Result<Metadata> {
450 let std = self.std.clone();
451 asyncify(move || std.metadata()).await
452 }
453
454 /// Creates a new `File` instance that shares the same underlying file handle
455 /// as the existing `File` instance. Reads, writes, and seeks will affect both
456 /// File instances simultaneously.
457 ///
458 /// # Examples
459 ///
460 /// ```no_run
461 /// use tokio::fs::File;
462 ///
463 /// # async fn dox() -> std::io::Result<()> {
464 /// let file = File::open("foo.txt").await?;
465 /// let file_clone = file.try_clone().await?;
466 /// # Ok(())
467 /// # }
468 /// ```
469 pub async fn try_clone(&self) -> io::Result<File> {
470 self.inner.lock().await.complete_inflight().await;
471 let std = self.std.clone();
472 let std_file = asyncify(move || std.try_clone()).await?;
473 let mut file = File::from_std(std_file);
474 file.set_max_buf_size(self.max_buf_size);
475 Ok(file)
476 }
477
478 /// Destructures `File` into a [`std::fs::File`]. This function is
479 /// async to allow any in-flight operations to complete.
480 ///
481 /// Use `File::try_into_std` to attempt conversion immediately.
482 ///
483 /// # Examples
484 ///
485 /// ```no_run
486 /// use tokio::fs::File;
487 ///
488 /// # async fn dox() -> std::io::Result<()> {
489 /// let tokio_file = File::open("foo.txt").await?;
490 /// let std_file = tokio_file.into_std().await;
491 /// # Ok(())
492 /// # }
493 /// ```
494 pub async fn into_std(mut self) -> StdFile {
495 self.inner.get_mut().complete_inflight().await;
496 Arc::try_unwrap(self.std).expect("Arc::try_unwrap failed")
497 }
498
499 /// Tries to immediately destructure `File` into a [`std::fs::File`].
500 ///
501 /// # Errors
502 ///
503 /// This function will return an error containing the file if some
504 /// operation is in-flight.
505 ///
506 /// # Examples
507 ///
508 /// ```no_run
509 /// use tokio::fs::File;
510 ///
511 /// # async fn dox() -> std::io::Result<()> {
512 /// let tokio_file = File::open("foo.txt").await?;
513 /// let std_file = tokio_file.try_into_std().unwrap();
514 /// # Ok(())
515 /// # }
516 /// ```
517 #[allow(clippy::result_large_err)]
518 pub fn try_into_std(mut self) -> Result<StdFile, Self> {
519 match Arc::try_unwrap(self.std) {
520 Ok(file) => Ok(file),
521 Err(std_file_arc) => {
522 self.std = std_file_arc;
523 Err(self)
524 }
525 }
526 }
527
528 /// Changes the permissions on the underlying file.
529 ///
530 /// # Platform-specific behavior
531 ///
532 /// This function currently corresponds to the `fchmod` function on Unix and
533 /// the `SetFileInformationByHandle` function on Windows. Note that, this
534 /// [may change in the future][changes].
535 ///
536 /// [changes]: https://doc.rust-lang.org/std/io/index.html#platform-specific-behavior
537 ///
538 /// # Errors
539 ///
    /// This function will return an error if the user lacks permission to
    /// change attributes on the underlying file. It may also return an error
    /// in other OS-specific, unspecified cases.
543 ///
544 /// # Examples
545 ///
546 /// ```no_run
547 /// use tokio::fs::File;
548 ///
549 /// # async fn dox() -> std::io::Result<()> {
550 /// let file = File::open("foo.txt").await?;
551 /// let mut perms = file.metadata().await?.permissions();
552 /// perms.set_readonly(true);
553 /// file.set_permissions(perms).await?;
554 /// # Ok(())
555 /// # }
556 /// ```
557 pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> {
558 let std = self.std.clone();
559 asyncify(move || std.set_permissions(perm)).await
560 }
561
562 /// Set the maximum buffer size for the underlying [`AsyncRead`] / [`AsyncWrite`] operation.
563 ///
564 /// Although Tokio uses a sensible default value for this buffer size, this function would be
565 /// useful for changing that default depending on the situation.
566 ///
567 /// # Examples
568 ///
569 /// ```no_run
570 /// use tokio::fs::File;
571 /// use tokio::io::AsyncWriteExt;
572 ///
573 /// # async fn dox() -> std::io::Result<()> {
574 /// let mut file = File::open("foo.txt").await?;
575 ///
576 /// // Set maximum buffer size to 8 MiB
577 /// file.set_max_buf_size(8 * 1024 * 1024);
578 ///
579 /// let mut buf = vec![1; 1024 * 1024 * 1024];
580 ///
581 /// // Write the 1 GiB buffer in chunks up to 8 MiB each.
582 /// file.write_all(&mut buf).await?;
583 /// # Ok(())
584 /// # }
585 /// ```
    pub fn set_max_buf_size(&mut self, max_buf_size: usize) {
        // Only the stored limit changes; later read and write calls consult
        // it when they size their buffers.
        self.max_buf_size = max_buf_size;
    }
589
    /// Get the maximum buffer size for the underlying [`AsyncRead`] / [`AsyncWrite`] operation.
    ///
    /// This is the value most recently passed to `set_max_buf_size`, or the
    /// limit the file was constructed with if it was never changed.
    pub fn max_buf_size(&self) -> usize {
        self.max_buf_size
    }
594}
595
596impl AsyncRead for File {
597 fn poll_read(
598 self: Pin<&mut Self>,
599 cx: &mut Context<'_>,
600 dst: &mut ReadBuf<'_>,
601 ) -> Poll<io::Result<()>> {
602 ready!(crate::trace::trace_leaf(cx));
603
604 let me = self.get_mut();
605 let inner = me.inner.get_mut();
606
607 loop {
608 match inner.state {
609 State::Idle(ref mut buf_cell) => {
610 let mut buf = buf_cell.take().unwrap();
611
612 if !buf.is_empty() || dst.remaining() == 0 {
613 buf.copy_to(dst);
614 *buf_cell = Some(buf);
615 return Poll::Ready(Ok(()));
616 }
617
618 let std = me.std.clone();
619
620 let max_buf_size = cmp::min(dst.remaining(), me.max_buf_size);
621 inner.state = State::Busy(Inner::poll_read_inner(std, buf, max_buf_size)?);
622 }
623 State::Busy(ref mut rx) => {
624 let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?;
625
626 match op {
627 Operation::Read(Ok(_)) => {
628 buf.copy_to(dst);
629 inner.state = State::Idle(Some(buf));
630 return Poll::Ready(Ok(()));
631 }
632 Operation::Read(Err(e)) => {
633 assert!(buf.is_empty());
634
635 inner.state = State::Idle(Some(buf));
636 return Poll::Ready(Err(e));
637 }
638 Operation::Write(Ok(())) => {
639 assert!(buf.is_empty());
640 inner.state = State::Idle(Some(buf));
641 continue;
642 }
643 Operation::Write(Err(e)) => {
644 assert!(inner.last_write_err.is_none());
645 inner.last_write_err = Some(e.kind());
646 inner.state = State::Idle(Some(buf));
647 }
648 Operation::Seek(result) => {
649 assert!(buf.is_empty());
650 inner.state = State::Idle(Some(buf));
651 if let Ok(pos) = result {
652 inner.pos = pos;
653 }
654 continue;
655 }
656 }
657 }
658 }
659 }
660 }
661}
662
663impl AsyncSeek for File {
664 fn start_seek(self: Pin<&mut Self>, mut pos: SeekFrom) -> io::Result<()> {
665 let me = self.get_mut();
666 let inner = me.inner.get_mut();
667
668 match inner.state {
669 State::Busy(_) => Err(io::Error::new(
670 io::ErrorKind::Other,
671 "other file operation is pending, call poll_complete before start_seek",
672 )),
673 State::Idle(ref mut buf_cell) => {
674 let mut buf = buf_cell.take().unwrap();
675
676 // Factor in any unread data from the buf
677 if !buf.is_empty() {
678 let n = buf.discard_read();
679
680 if let SeekFrom::Current(ref mut offset) = pos {
681 *offset += n;
682 }
683 }
684
685 let std = me.std.clone();
686
687 inner.state = State::Busy(spawn_blocking(move || {
688 let res = (&*std).seek(pos);
689 (Operation::Seek(res), buf)
690 }));
691 Ok(())
692 }
693 }
694 }
695
696 fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
697 ready!(crate::trace::trace_leaf(cx));
698 let inner = self.inner.get_mut();
699
700 loop {
701 match inner.state {
702 State::Idle(_) => return Poll::Ready(Ok(inner.pos)),
703 State::Busy(ref mut rx) => {
704 let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
705 inner.state = State::Idle(Some(buf));
706
707 match op {
708 Operation::Read(_) => {}
709 Operation::Write(Err(e)) => {
710 assert!(inner.last_write_err.is_none());
711 inner.last_write_err = Some(e.kind());
712 }
713 Operation::Write(_) => {}
714 Operation::Seek(res) => {
715 if let Ok(pos) = res {
716 inner.pos = pos;
717 }
718 return Poll::Ready(res);
719 }
720 }
721 }
722 }
723 }
724 }
725}
726
727impl AsyncWrite for File {
728 fn poll_write(
729 self: Pin<&mut Self>,
730 cx: &mut Context<'_>,
731 src: &[u8],
732 ) -> Poll<io::Result<usize>> {
733 ready!(crate::trace::trace_leaf(cx));
734 let me = self.get_mut();
735 let inner = me.inner.get_mut();
736
737 if let Some(e) = inner.last_write_err.take() {
738 return Poll::Ready(Err(e.into()));
739 }
740
741 loop {
742 match inner.state {
743 State::Idle(ref mut buf_cell) => {
744 let mut buf = buf_cell.take().unwrap();
745
746 let seek = if !buf.is_empty() {
747 Some(SeekFrom::Current(buf.discard_read()))
748 } else {
749 None
750 };
751
752 let n = buf.copy_from(src, me.max_buf_size);
753 let std = me.std.clone();
754
755 let blocking_task_join_handle = spawn_mandatory_blocking(move || {
756 let res = if let Some(seek) = seek {
757 (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
758 } else {
759 buf.write_to(&mut &*std)
760 };
761
762 (Operation::Write(res), buf)
763 })
764 .ok_or_else(|| {
765 io::Error::new(io::ErrorKind::Other, "background task failed")
766 })?;
767
768 inner.state = State::Busy(blocking_task_join_handle);
769
770 return Poll::Ready(Ok(n));
771 }
772 State::Busy(ref mut rx) => {
773 let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
774 inner.state = State::Idle(Some(buf));
775
776 match op {
777 Operation::Read(_) => {
778 // We don't care about the result here. The fact
779 // that the cursor has advanced will be reflected in
780 // the next iteration of the loop
781 continue;
782 }
783 Operation::Write(res) => {
784 // If the previous write was successful, continue.
785 // Otherwise, error.
786 res?;
787 continue;
788 }
789 Operation::Seek(_) => {
790 // Ignore the seek
791 continue;
792 }
793 }
794 }
795 }
796 }
797 }
798
799 fn poll_write_vectored(
800 self: Pin<&mut Self>,
801 cx: &mut Context<'_>,
802 bufs: &[io::IoSlice<'_>],
803 ) -> Poll<Result<usize, io::Error>> {
804 ready!(crate::trace::trace_leaf(cx));
805 let me = self.get_mut();
806 let inner = me.inner.get_mut();
807
808 if let Some(e) = inner.last_write_err.take() {
809 return Poll::Ready(Err(e.into()));
810 }
811
812 loop {
813 match inner.state {
814 State::Idle(ref mut buf_cell) => {
815 let mut buf = buf_cell.take().unwrap();
816
817 let seek = if !buf.is_empty() {
818 Some(SeekFrom::Current(buf.discard_read()))
819 } else {
820 None
821 };
822
823 let n = buf.copy_from_bufs(bufs, me.max_buf_size);
824 let std = me.std.clone();
825
826 let blocking_task_join_handle = spawn_mandatory_blocking(move || {
827 let res = if let Some(seek) = seek {
828 (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
829 } else {
830 buf.write_to(&mut &*std)
831 };
832
833 (Operation::Write(res), buf)
834 })
835 .ok_or_else(|| {
836 io::Error::new(io::ErrorKind::Other, "background task failed")
837 })?;
838
839 inner.state = State::Busy(blocking_task_join_handle);
840
841 return Poll::Ready(Ok(n));
842 }
843 State::Busy(ref mut rx) => {
844 let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
845 inner.state = State::Idle(Some(buf));
846
847 match op {
848 Operation::Read(_) => {
849 // We don't care about the result here. The fact
850 // that the cursor has advanced will be reflected in
851 // the next iteration of the loop
852 continue;
853 }
854 Operation::Write(res) => {
855 // If the previous write was successful, continue.
856 // Otherwise, error.
857 res?;
858 continue;
859 }
860 Operation::Seek(_) => {
861 // Ignore the seek
862 continue;
863 }
864 }
865 }
866 }
867 }
868 }
869
    fn is_write_vectored(&self) -> bool {
        // `poll_write_vectored` copies from all of the supplied slices into
        // the internal buffer, so vectored writes are supported directly.
        true
    }
873
874 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
875 ready!(crate::trace::trace_leaf(cx));
876 let inner = self.inner.get_mut();
877 inner.poll_flush(cx)
878 }
879
880 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
881 ready!(crate::trace::trace_leaf(cx));
882 self.poll_flush(cx)
883 }
884}
885
886impl From<StdFile> for File {
887 fn from(std: StdFile) -> Self {
888 Self::from_std(std)
889 }
890}
891
892impl fmt::Debug for File {
893 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
894 fmt.debug_struct("tokio::fs::File")
895 .field("std", &self.std)
896 .finish()
897 }
898}
899
900#[cfg(unix)]
901impl std::os::unix::io::AsRawFd for File {
902 fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
903 self.std.as_raw_fd()
904 }
905}
906
907#[cfg(unix)]
908impl std::os::unix::io::AsFd for File {
909 fn as_fd(&self) -> std::os::unix::io::BorrowedFd<'_> {
910 unsafe {
911 std::os::unix::io::BorrowedFd::borrow_raw(std::os::unix::io::AsRawFd::as_raw_fd(self))
912 }
913 }
914}
915
916#[cfg(unix)]
917impl std::os::unix::io::FromRawFd for File {
918 unsafe fn from_raw_fd(fd: std::os::unix::io::RawFd) -> Self {
919 // Safety: exactly the same safety contract as
920 // `std::os::unix::io::FromRawFd::from_raw_fd`.
921 unsafe { StdFile::from_raw_fd(fd).into() }
922 }
923}
924
925cfg_windows! {
926 use crate::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle, AsHandle, BorrowedHandle};
927
928 impl AsRawHandle for File {
929 fn as_raw_handle(&self) -> RawHandle {
930 self.std.as_raw_handle()
931 }
932 }
933
934 impl AsHandle for File {
935 fn as_handle(&self) -> BorrowedHandle<'_> {
936 unsafe {
937 BorrowedHandle::borrow_raw(
938 AsRawHandle::as_raw_handle(self),
939 )
940 }
941 }
942 }
943
944 impl FromRawHandle for File {
945 unsafe fn from_raw_handle(handle: RawHandle) -> Self {
946 // Safety: exactly the same safety contract as
947 // `FromRawHandle::from_raw_handle`.
948 unsafe { StdFile::from_raw_handle(handle).into() }
949 }
950 }
951}
952
953impl Inner {
954 fn poll_read_inner(
955 std: Arc<StdFile>,
956 buf: Buf,
957 max_buf_size: usize,
958 ) -> io::Result<JoinHandle<(Operation, Buf)>> {
959 // Unit tests use `MockFile` and the mock `spawn_blocking` infrastructure,
960 // which can't drive real io_uring operations. The io_uring read path
961 // is tested through integration tests in `tests/fs_uring_file_read.rs`.
962 #[cfg(all(
963 not(test),
964 tokio_unstable,
965 feature = "io-uring",
966 feature = "rt",
967 feature = "fs",
968 target_os = "linux",
969 ))]
970 {
971 if let Ok(handle) = crate::runtime::Handle::try_current() {
972 let driver_handle = handle.inner.driver().io();
973
974 if driver_handle.is_uring_ready(io_uring::opcode::Read::CODE) {
975 // Fast path: uring already initialized and Read supported.
976 let fd: crate::io::uring::utils::ArcFd = std;
977 return Ok(spawn(Self::uring_read(fd, buf, max_buf_size)));
978 }
979
980 if !driver_handle.is_uring_probed() {
981 // Not yet probed: lazy init inside an async task so
982 // `File::from_std()` can still benefit from io-uring.
983 return Ok(spawn(Self::lazy_init_read(std, buf, max_buf_size)));
984 }
985 // Probed but unsupported: fall through to spawn_blocking.
986 }
987 }
988
989 // Fallback: spawn_blocking
990 let join = Self::spawn_blocking_read(buf, std, max_buf_size);
991 Ok(join)
992 }
993
994 /// Perform an io-uring read with interrupt retry.
995 #[cfg(all(
996 not(test),
997 tokio_unstable,
998 feature = "io-uring",
999 feature = "rt",
1000 feature = "fs",
1001 target_os = "linux",
1002 ))]
1003 async fn uring_read(
1004 mut fd: crate::io::uring::utils::ArcFd,
1005 mut buf: Buf,
1006 max_buf_size: usize,
1007 ) -> (Operation, Buf) {
1008 use crate::runtime::driver::op::Op;
1009
1010 loop {
1011 let (res, r_fd, r_buf) =
1012 // u64::MAX to use and advance the file position
1013 Op::read_at(fd, buf, max_buf_size, u64::MAX).await;
1014 match res {
1015 Err(e) if e.kind() == io::ErrorKind::Interrupted => {
1016 buf = r_buf;
1017 fd = r_fd;
1018 continue;
1019 }
1020 Err(e) => break (Operation::Read(Err(e)), r_buf),
1021 Ok(n) => break (Operation::Read(Ok(n as usize)), r_buf),
1022 }
1023 }
1024 }
1025
1026 /// Attempt lazy io-uring initialization, then read via uring or fall back
1027 /// to a blocking read. Covers the `File::from_std()` path where
1028 /// `check_and_init()` hasn't been called yet.
1029 #[cfg(all(
1030 not(test),
1031 tokio_unstable,
1032 feature = "io-uring",
1033 feature = "rt",
1034 feature = "fs",
1035 target_os = "linux",
1036 ))]
1037 async fn lazy_init_read(std: Arc<StdFile>, buf: Buf, max_buf_size: usize) -> (Operation, Buf) {
1038 let handle = crate::runtime::Handle::current();
1039 let driver_handle = handle.inner.driver().io();
1040 if driver_handle
1041 .check_and_init(io_uring::opcode::Read::CODE)
1042 .await
1043 .unwrap_or(false)
1044 {
1045 let fd: crate::io::uring::utils::ArcFd = std;
1046 Self::uring_read(fd, buf, max_buf_size).await
1047 } else {
1048 match Self::spawn_blocking_read(buf, std, max_buf_size).await {
1049 Ok(result) => result,
1050 Err(e) => (
1051 Operation::Read(Err(io::Error::new(io::ErrorKind::Other, e))),
1052 Buf::with_capacity(0),
1053 ),
1054 }
1055 }
1056 }
1057
1058 fn spawn_blocking_read(
1059 buf: Buf,
1060 std: Arc<StdFile>,
1061 max_buf_size: usize,
1062 ) -> JoinHandle<(Operation, Buf)> {
1063 spawn_blocking(move || {
1064 let mut buf = buf;
1065 // SAFETY: the `Read` implementation of `std` does not
1066 // read from the buffer it is borrowing and correctly
1067 // reports the length of the data written into the buffer.
1068 let res = unsafe { buf.read_from(&mut &*std, max_buf_size) };
1069 (Operation::Read(res), buf)
1070 })
1071 }
1072
1073 async fn complete_inflight(&mut self) {
1074 use std::future::poll_fn;
1075
1076 poll_fn(|cx| self.poll_complete_inflight(cx)).await;
1077 }
1078
1079 fn poll_complete_inflight(&mut self, cx: &mut Context<'_>) -> Poll<()> {
1080 ready!(crate::trace::trace_leaf(cx));
1081 match self.poll_flush(cx) {
1082 Poll::Ready(Err(e)) => {
1083 self.last_write_err = Some(e.kind());
1084 Poll::Ready(())
1085 }
1086 Poll::Ready(Ok(())) => Poll::Ready(()),
1087 Poll::Pending => Poll::Pending,
1088 }
1089 }
1090
1091 fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
1092 if let Some(e) = self.last_write_err.take() {
1093 return Poll::Ready(Err(e.into()));
1094 }
1095
1096 let (op, buf) = match self.state {
1097 State::Idle(_) => return Poll::Ready(Ok(())),
1098 State::Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?,
1099 };
1100
1101 // The buffer is not used here
1102 self.state = State::Idle(Some(buf));
1103
1104 match op {
1105 Operation::Read(_) => Poll::Ready(Ok(())),
1106 Operation::Write(res) => Poll::Ready(res),
1107 Operation::Seek(_) => Poll::Ready(Ok(())),
1108 }
1109 }
1110}
1111
1112#[cfg(test)]
1113mod tests;