tokio/fs/
file.rs

1//! Types for working with [`File`].
2//!
3//! [`File`]: File
4
5use crate::fs::{asyncify, OpenOptions};
6use crate::io::blocking::{Buf, DEFAULT_MAX_BUF_SIZE};
7use crate::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
8use crate::sync::Mutex;
9
10use std::cmp;
11use std::fmt;
12use std::fs::{Metadata, Permissions};
13use std::future::Future;
14use std::io::{self, Seek, SeekFrom};
15use std::path::Path;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::task::{ready, Context, Poll};
19
20#[cfg(test)]
21use super::mocks::JoinHandle;
22#[cfg(test)]
23use super::mocks::MockFile as StdFile;
24#[cfg(test)]
25use super::mocks::{spawn_blocking, spawn_mandatory_blocking};
26#[cfg(not(test))]
27use crate::blocking::JoinHandle;
28#[cfg(not(test))]
29use crate::blocking::{spawn_blocking, spawn_mandatory_blocking};
30#[cfg(not(test))]
31use std::fs::File as StdFile;
32
33cfg_io_uring! {
34    #[cfg(not(test))]
35    use crate::spawn;
36}
37
38/// A reference to an open file on the filesystem.
39///
40/// This is a specialized version of [`std::fs::File`] for usage from the
41/// Tokio runtime.
42///
43/// An instance of a `File` can be read and/or written depending on what options
44/// it was opened with. Files also implement [`AsyncSeek`] to alter the logical
45/// cursor that the file contains internally.
46///
47/// A file will not be closed immediately when it goes out of scope if there
48/// are any IO operations that have not yet completed. To ensure that a file is
49/// closed immediately when it is dropped, you should call [`flush`] before
50/// dropping it. Note that this does not ensure that the file has been fully
51/// written to disk; the operating system might keep the changes around in an
52/// in-memory buffer. See the [`sync_all`] method for telling the OS to write
53/// the data to disk.
54///
55/// Reading and writing to a `File` is usually done using the convenience
56/// methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`] traits.
57///
58/// [`AsyncSeek`]: trait@crate::io::AsyncSeek
59/// [`flush`]: fn@crate::io::AsyncWriteExt::flush
60/// [`sync_all`]: fn@crate::fs::File::sync_all
61/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
62/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
63///
64/// # Examples
65///
66/// Create a new file and asynchronously write bytes to it:
67///
68/// ```no_run
69/// use tokio::fs::File;
70/// use tokio::io::AsyncWriteExt; // for write_all()
71///
72/// # async fn dox() -> std::io::Result<()> {
73/// let mut file = File::create("foo.txt").await?;
74/// file.write_all(b"hello, world!").await?;
75/// # Ok(())
76/// # }
77/// ```
78///
79/// Read the contents of a file into a buffer:
80///
81/// ```no_run
82/// use tokio::fs::File;
83/// use tokio::io::AsyncReadExt; // for read_to_end()
84///
85/// # async fn dox() -> std::io::Result<()> {
86/// let mut file = File::open("foo.txt").await?;
87///
88/// let mut contents = vec![];
89/// file.read_to_end(&mut contents).await?;
90///
91/// println!("len = {}", contents.len());
92/// # Ok(())
93/// # }
94/// ```
pub struct File {
    /// Shared handle to the underlying file. A clone of this `Arc` is moved
    /// into every blocking task that operates on the file.
    std: Arc<StdFile>,
    /// Operation state and buffer. The `&self` methods take the lock; the
    /// `&mut self` poll methods reach it directly through `get_mut`.
    inner: Mutex<Inner>,
    /// Upper bound on how many bytes a single read or write may buffer.
    max_buf_size: usize,
}
100
/// Mutable state shared by all operations on a [`File`].
struct Inner {
    /// Whether a blocking task is currently running, together with the
    /// buffer (owned by the task while it runs).
    state: State,

    /// Errors from writes/flushes are returned in write/flush calls. If a write
    /// error is observed while performing a read, it is saved until the next
    /// write / flush call.
    last_write_err: Option<io::ErrorKind>,

    /// Cursor position recorded from the most recent successful seek-type
    /// operation; `poll_complete` reports it when no operation is pending.
    pos: u64,
}
111
/// Whether a blocking operation is currently running for the file.
#[derive(Debug)]
enum State {
    /// No blocking task is running. The buffer is stored here; it is taken
    /// out (leaving `None`) while a method prepares the next operation.
    Idle(Option<Buf>),
    /// A blocking task is running. It resolves to the operation's outcome
    /// together with the buffer that was handed to it.
    Busy(JoinHandle<(Operation, Buf)>),
}
117
/// Outcome of a finished blocking task, tagged by the kind of work it did.
#[derive(Debug)]
enum Operation {
    /// Result of a read into the buffer.
    Read(io::Result<usize>),
    /// Result of writing the buffer out to the file.
    Write(io::Result<()>),
    /// Result of a seek; on success the value is stored as the new cached
    /// position.
    Seek(io::Result<u64>),
}
124
125impl File {
126    /// Attempts to open a file in read-only mode.
127    ///
128    /// See [`OpenOptions`] for more details.
129    ///
130    /// # Errors
131    ///
132    /// This function will return an error if called from outside of the Tokio
133    /// runtime or if path does not already exist. Other errors may also be
134    /// returned according to `OpenOptions::open`.
135    ///
136    /// # Examples
137    ///
138    /// ```no_run
139    /// use tokio::fs::File;
140    /// use tokio::io::AsyncReadExt;
141    ///
142    /// # async fn dox() -> std::io::Result<()> {
143    /// let mut file = File::open("foo.txt").await?;
144    ///
145    /// let mut contents = vec![];
146    /// file.read_to_end(&mut contents).await?;
147    ///
148    /// println!("len = {}", contents.len());
149    /// # Ok(())
150    /// # }
151    /// ```
152    ///
153    /// The [`read_to_end`] method is defined on the [`AsyncReadExt`] trait.
154    ///
155    /// [`read_to_end`]: fn@crate::io::AsyncReadExt::read_to_end
156    /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
157    pub async fn open(path: impl AsRef<Path>) -> io::Result<File> {
158        Self::options().read(true).open(path).await
159    }
160
161    /// Opens a file in write-only mode.
162    ///
163    /// This function will create a file if it does not exist, and will truncate
164    /// it if it does.
165    ///
166    /// See [`OpenOptions`] for more details.
167    ///
168    /// # Errors
169    ///
170    /// Results in an error if called from outside of the Tokio runtime or if
171    /// the underlying [`create`] call results in an error.
172    ///
173    /// [`create`]: std::fs::File::create
174    ///
175    /// # Examples
176    ///
177    /// ```no_run
178    /// use tokio::fs::File;
179    /// use tokio::io::AsyncWriteExt;
180    ///
181    /// # async fn dox() -> std::io::Result<()> {
182    /// let mut file = File::create("foo.txt").await?;
183    /// file.write_all(b"hello, world!").await?;
184    /// # Ok(())
185    /// # }
186    /// ```
187    ///
188    /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
189    ///
190    /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
191    /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
192    pub async fn create(path: impl AsRef<Path>) -> io::Result<File> {
193        Self::options()
194            .write(true)
195            .create(true)
196            .truncate(true)
197            .open(path)
198            .await
199    }
200
201    /// Opens a file in read-write mode.
202    ///
203    /// This function will create a file if it does not exist, or return an error
204    /// if it does. This way, if the call succeeds, the file returned is guaranteed
205    /// to be new.
206    ///
207    /// This option is useful because it is atomic. Otherwise between checking
208    /// whether a file exists and creating a new one, the file may have been
209    /// created by another process (a TOCTOU race condition / attack).
210    ///
211    /// This can also be written using `File::options().read(true).write(true).create_new(true).open(...)`.
212    ///
213    /// See [`OpenOptions`] for more details.
214    ///
215    /// # Examples
216    ///
217    /// ```no_run
218    /// use tokio::fs::File;
219    /// use tokio::io::AsyncWriteExt;
220    ///
221    /// # async fn dox() -> std::io::Result<()> {
222    /// let mut file = File::create_new("foo.txt").await?;
223    /// file.write_all(b"hello, world!").await?;
224    /// # Ok(())
225    /// # }
226    /// ```
227    ///
228    /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
229    ///
230    /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
231    /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
232    pub async fn create_new<P: AsRef<Path>>(path: P) -> std::io::Result<File> {
233        Self::options()
234            .read(true)
235            .write(true)
236            .create_new(true)
237            .open(path)
238            .await
239    }
240
241    /// Returns a new [`OpenOptions`] object.
242    ///
243    /// This function returns a new `OpenOptions` object that you can use to
244    /// open or create a file with specific options if `open()` or `create()`
245    /// are not appropriate.
246    ///
247    /// It is equivalent to `OpenOptions::new()`, but allows you to write more
248    /// readable code. Instead of
249    /// `OpenOptions::new().append(true).open("example.log")`,
250    /// you can write `File::options().append(true).open("example.log")`. This
251    /// also avoids the need to import `OpenOptions`.
252    ///
253    /// See the [`OpenOptions::new`] function for more details.
254    ///
255    /// # Examples
256    ///
257    /// ```no_run
258    /// use tokio::fs::File;
259    /// use tokio::io::AsyncWriteExt;
260    ///
261    /// # async fn dox() -> std::io::Result<()> {
262    /// let mut f = File::options().append(true).open("example.log").await?;
263    /// f.write_all(b"new line\n").await?;
264    /// # Ok(())
265    /// # }
266    /// ```
    #[must_use]
    pub fn options() -> OpenOptions {
        // Pure delegation: the builder is returned untouched, so the caller
        // sets every flag explicitly.
        OpenOptions::new()
    }
271
272    /// Converts a [`std::fs::File`] to a [`tokio::fs::File`](File).
273    ///
274    /// # Examples
275    ///
276    /// ```no_run
277    /// // This line could block. It is not recommended to do this on the Tokio
278    /// // runtime.
279    /// let std_file = std::fs::File::open("foo.txt").unwrap();
280    /// let file = tokio::fs::File::from_std(std_file);
281    /// ```
282    pub fn from_std(std: StdFile) -> File {
283        File {
284            std: Arc::new(std),
285            inner: Mutex::new(Inner {
286                state: State::Idle(Some(Buf::with_capacity(0))),
287                last_write_err: None,
288                pos: 0,
289            }),
290            max_buf_size: DEFAULT_MAX_BUF_SIZE,
291        }
292    }
293
294    /// Attempts to sync all OS-internal metadata to disk.
295    ///
296    /// This function will attempt to ensure that all in-core data reaches the
297    /// filesystem before returning.
298    ///
299    /// # Examples
300    ///
301    /// ```no_run
302    /// use tokio::fs::File;
303    /// use tokio::io::AsyncWriteExt;
304    ///
305    /// # async fn dox() -> std::io::Result<()> {
306    /// let mut file = File::create("foo.txt").await?;
307    /// file.write_all(b"hello, world!").await?;
308    /// file.sync_all().await?;
309    /// # Ok(())
310    /// # }
311    /// ```
312    ///
313    /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
314    ///
315    /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
316    /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
317    pub async fn sync_all(&self) -> io::Result<()> {
318        let mut inner = self.inner.lock().await;
319        inner.complete_inflight().await;
320
321        let std = self.std.clone();
322        asyncify(move || std.sync_all()).await
323    }
324
325    /// This function is similar to `sync_all`, except that it may not
326    /// synchronize file metadata to the filesystem.
327    ///
328    /// This is intended for use cases that must synchronize content, but don't
329    /// need the metadata on disk. The goal of this method is to reduce disk
330    /// operations.
331    ///
332    /// Note that some platforms may simply implement this in terms of `sync_all`.
333    ///
334    /// # Examples
335    ///
336    /// ```no_run
337    /// use tokio::fs::File;
338    /// use tokio::io::AsyncWriteExt;
339    ///
340    /// # async fn dox() -> std::io::Result<()> {
341    /// let mut file = File::create("foo.txt").await?;
342    /// file.write_all(b"hello, world!").await?;
343    /// file.sync_data().await?;
344    /// # Ok(())
345    /// # }
346    /// ```
347    ///
348    /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
349    ///
350    /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
351    /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
352    pub async fn sync_data(&self) -> io::Result<()> {
353        let mut inner = self.inner.lock().await;
354        inner.complete_inflight().await;
355
356        let std = self.std.clone();
357        asyncify(move || std.sync_data()).await
358    }
359
    /// Truncates or extends the underlying file, updating the size of this file to become `size`.
    ///
    /// If `size` is less than the current file's size, then the file will be
    /// shrunk. If it is greater than the current file's size, then the file
    /// will be extended to `size` and have all of the intermediate data filled in
    /// with 0s.
366    ///
367    /// # Errors
368    ///
369    /// This function will return an error if the file is not opened for
370    /// writing.
371    ///
372    /// # Examples
373    ///
374    /// ```no_run
375    /// use tokio::fs::File;
376    /// use tokio::io::AsyncWriteExt;
377    ///
378    /// # async fn dox() -> std::io::Result<()> {
379    /// let mut file = File::create("foo.txt").await?;
380    /// file.write_all(b"hello, world!").await?;
381    /// file.set_len(10).await?;
382    /// # Ok(())
383    /// # }
384    /// ```
385    ///
386    /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
387    ///
388    /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
389    /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
390    pub async fn set_len(&self, size: u64) -> io::Result<()> {
391        let mut inner = self.inner.lock().await;
392        inner.complete_inflight().await;
393
394        let mut buf = match inner.state {
395            State::Idle(ref mut buf_cell) => buf_cell.take().unwrap(),
396            _ => unreachable!(),
397        };
398
399        let seek = if !buf.is_empty() {
400            Some(SeekFrom::Current(buf.discard_read()))
401        } else {
402            None
403        };
404
405        let std = self.std.clone();
406
407        inner.state = State::Busy(spawn_blocking(move || {
408            let res = if let Some(seek) = seek {
409                (&*std).seek(seek).and_then(|_| std.set_len(size))
410            } else {
411                std.set_len(size)
412            }
413            .map(|()| 0); // the value is discarded later
414
415            // Return the result as a seek
416            (Operation::Seek(res), buf)
417        }));
418
419        let (op, buf) = match inner.state {
420            State::Idle(_) => unreachable!(),
421            State::Busy(ref mut rx) => rx.await?,
422        };
423
424        inner.state = State::Idle(Some(buf));
425
426        match op {
427            Operation::Seek(res) => res.map(|pos| {
428                inner.pos = pos;
429            }),
430            _ => unreachable!(),
431        }
432    }
433
434    /// Queries metadata about the underlying file.
435    ///
436    /// # Examples
437    ///
438    /// ```no_run
439    /// use tokio::fs::File;
440    ///
441    /// # async fn dox() -> std::io::Result<()> {
442    /// let file = File::open("foo.txt").await?;
443    /// let metadata = file.metadata().await?;
444    ///
445    /// println!("{:?}", metadata);
446    /// # Ok(())
447    /// # }
448    /// ```
449    pub async fn metadata(&self) -> io::Result<Metadata> {
450        let std = self.std.clone();
451        asyncify(move || std.metadata()).await
452    }
453
454    /// Creates a new `File` instance that shares the same underlying file handle
455    /// as the existing `File` instance. Reads, writes, and seeks will affect both
456    /// File instances simultaneously.
457    ///
458    /// # Examples
459    ///
460    /// ```no_run
461    /// use tokio::fs::File;
462    ///
463    /// # async fn dox() -> std::io::Result<()> {
464    /// let file = File::open("foo.txt").await?;
465    /// let file_clone = file.try_clone().await?;
466    /// # Ok(())
467    /// # }
468    /// ```
469    pub async fn try_clone(&self) -> io::Result<File> {
470        self.inner.lock().await.complete_inflight().await;
471        let std = self.std.clone();
472        let std_file = asyncify(move || std.try_clone()).await?;
473        let mut file = File::from_std(std_file);
474        file.set_max_buf_size(self.max_buf_size);
475        Ok(file)
476    }
477
478    /// Destructures `File` into a [`std::fs::File`]. This function is
479    /// async to allow any in-flight operations to complete.
480    ///
481    /// Use `File::try_into_std` to attempt conversion immediately.
482    ///
483    /// # Examples
484    ///
485    /// ```no_run
486    /// use tokio::fs::File;
487    ///
488    /// # async fn dox() -> std::io::Result<()> {
489    /// let tokio_file = File::open("foo.txt").await?;
490    /// let std_file = tokio_file.into_std().await;
491    /// # Ok(())
492    /// # }
493    /// ```
494    pub async fn into_std(mut self) -> StdFile {
495        self.inner.get_mut().complete_inflight().await;
496        Arc::try_unwrap(self.std).expect("Arc::try_unwrap failed")
497    }
498
499    /// Tries to immediately destructure `File` into a [`std::fs::File`].
500    ///
501    /// # Errors
502    ///
503    /// This function will return an error containing the file if some
504    /// operation is in-flight.
505    ///
506    /// # Examples
507    ///
508    /// ```no_run
509    /// use tokio::fs::File;
510    ///
511    /// # async fn dox() -> std::io::Result<()> {
512    /// let tokio_file = File::open("foo.txt").await?;
513    /// let std_file = tokio_file.try_into_std().unwrap();
514    /// # Ok(())
515    /// # }
516    /// ```
517    #[allow(clippy::result_large_err)]
518    pub fn try_into_std(mut self) -> Result<StdFile, Self> {
519        match Arc::try_unwrap(self.std) {
520            Ok(file) => Ok(file),
521            Err(std_file_arc) => {
522                self.std = std_file_arc;
523                Err(self)
524            }
525        }
526    }
527
528    /// Changes the permissions on the underlying file.
529    ///
530    /// # Platform-specific behavior
531    ///
532    /// This function currently corresponds to the `fchmod` function on Unix and
533    /// the `SetFileInformationByHandle` function on Windows. Note that, this
534    /// [may change in the future][changes].
535    ///
536    /// [changes]: https://doc.rust-lang.org/std/io/index.html#platform-specific-behavior
537    ///
538    /// # Errors
539    ///
    /// This function will return an error if the user lacks permission to
    /// change attributes on the underlying file. It may also return an error
    /// in other os-specific unspecified cases.
543    ///
544    /// # Examples
545    ///
546    /// ```no_run
547    /// use tokio::fs::File;
548    ///
549    /// # async fn dox() -> std::io::Result<()> {
550    /// let file = File::open("foo.txt").await?;
551    /// let mut perms = file.metadata().await?.permissions();
552    /// perms.set_readonly(true);
553    /// file.set_permissions(perms).await?;
554    /// # Ok(())
555    /// # }
556    /// ```
557    pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> {
558        let std = self.std.clone();
559        asyncify(move || std.set_permissions(perm)).await
560    }
561
562    /// Set the maximum buffer size for the underlying [`AsyncRead`] / [`AsyncWrite`] operation.
563    ///
564    /// Although Tokio uses a sensible default value for this buffer size, this function would be
565    /// useful for changing that default depending on the situation.
566    ///
567    /// # Examples
568    ///
569    /// ```no_run
570    /// use tokio::fs::File;
571    /// use tokio::io::AsyncWriteExt;
572    ///
573    /// # async fn dox() -> std::io::Result<()> {
    /// let mut file = File::create("foo.txt").await?;
    ///
    /// // Set maximum buffer size to 8 MiB
    /// file.set_max_buf_size(8 * 1024 * 1024);
    ///
    /// let buf = vec![1; 1024 * 1024 * 1024];
    ///
    /// // Write the 1 GiB buffer in chunks up to 8 MiB each.
    /// file.write_all(&buf).await?;
583    /// # Ok(())
584    /// # }
585    /// ```
    pub fn set_max_buf_size(&mut self, max_buf_size: usize) {
        // Only the limit is stored here; it is consulted the next time a read
        // or write fills the internal buffer.
        self.max_buf_size = max_buf_size;
    }
589
590    /// Get the maximum buffer size for the underlying [`AsyncRead`] / [`AsyncWrite`] operation.
    pub fn max_buf_size(&self) -> usize {
        // `DEFAULT_MAX_BUF_SIZE` unless changed through `set_max_buf_size`.
        self.max_buf_size
    }
594}
595
596impl AsyncRead for File {
597    fn poll_read(
598        self: Pin<&mut Self>,
599        cx: &mut Context<'_>,
600        dst: &mut ReadBuf<'_>,
601    ) -> Poll<io::Result<()>> {
602        ready!(crate::trace::trace_leaf(cx));
603
604        let me = self.get_mut();
605        let inner = me.inner.get_mut();
606
607        loop {
608            match inner.state {
609                State::Idle(ref mut buf_cell) => {
610                    let mut buf = buf_cell.take().unwrap();
611
612                    if !buf.is_empty() || dst.remaining() == 0 {
613                        buf.copy_to(dst);
614                        *buf_cell = Some(buf);
615                        return Poll::Ready(Ok(()));
616                    }
617
618                    let std = me.std.clone();
619
620                    let max_buf_size = cmp::min(dst.remaining(), me.max_buf_size);
621                    inner.state = State::Busy(Inner::poll_read_inner(std, buf, max_buf_size)?);
622                }
623                State::Busy(ref mut rx) => {
624                    let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?;
625
626                    match op {
627                        Operation::Read(Ok(_)) => {
628                            buf.copy_to(dst);
629                            inner.state = State::Idle(Some(buf));
630                            return Poll::Ready(Ok(()));
631                        }
632                        Operation::Read(Err(e)) => {
633                            assert!(buf.is_empty());
634
635                            inner.state = State::Idle(Some(buf));
636                            return Poll::Ready(Err(e));
637                        }
638                        Operation::Write(Ok(())) => {
639                            assert!(buf.is_empty());
640                            inner.state = State::Idle(Some(buf));
641                            continue;
642                        }
643                        Operation::Write(Err(e)) => {
644                            assert!(inner.last_write_err.is_none());
645                            inner.last_write_err = Some(e.kind());
646                            inner.state = State::Idle(Some(buf));
647                        }
648                        Operation::Seek(result) => {
649                            assert!(buf.is_empty());
650                            inner.state = State::Idle(Some(buf));
651                            if let Ok(pos) = result {
652                                inner.pos = pos;
653                            }
654                            continue;
655                        }
656                    }
657                }
658            }
659        }
660    }
661}
662
663impl AsyncSeek for File {
664    fn start_seek(self: Pin<&mut Self>, mut pos: SeekFrom) -> io::Result<()> {
665        let me = self.get_mut();
666        let inner = me.inner.get_mut();
667
668        match inner.state {
669            State::Busy(_) => Err(io::Error::new(
670                io::ErrorKind::Other,
671                "other file operation is pending, call poll_complete before start_seek",
672            )),
673            State::Idle(ref mut buf_cell) => {
674                let mut buf = buf_cell.take().unwrap();
675
676                // Factor in any unread data from the buf
677                if !buf.is_empty() {
678                    let n = buf.discard_read();
679
680                    if let SeekFrom::Current(ref mut offset) = pos {
681                        *offset += n;
682                    }
683                }
684
685                let std = me.std.clone();
686
687                inner.state = State::Busy(spawn_blocking(move || {
688                    let res = (&*std).seek(pos);
689                    (Operation::Seek(res), buf)
690                }));
691                Ok(())
692            }
693        }
694    }
695
696    fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
697        ready!(crate::trace::trace_leaf(cx));
698        let inner = self.inner.get_mut();
699
700        loop {
701            match inner.state {
702                State::Idle(_) => return Poll::Ready(Ok(inner.pos)),
703                State::Busy(ref mut rx) => {
704                    let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
705                    inner.state = State::Idle(Some(buf));
706
707                    match op {
708                        Operation::Read(_) => {}
709                        Operation::Write(Err(e)) => {
710                            assert!(inner.last_write_err.is_none());
711                            inner.last_write_err = Some(e.kind());
712                        }
713                        Operation::Write(_) => {}
714                        Operation::Seek(res) => {
715                            if let Ok(pos) = res {
716                                inner.pos = pos;
717                            }
718                            return Poll::Ready(res);
719                        }
720                    }
721                }
722            }
723        }
724    }
725}
726
727impl AsyncWrite for File {
728    fn poll_write(
729        self: Pin<&mut Self>,
730        cx: &mut Context<'_>,
731        src: &[u8],
732    ) -> Poll<io::Result<usize>> {
733        ready!(crate::trace::trace_leaf(cx));
734        let me = self.get_mut();
735        let inner = me.inner.get_mut();
736
737        if let Some(e) = inner.last_write_err.take() {
738            return Poll::Ready(Err(e.into()));
739        }
740
741        loop {
742            match inner.state {
743                State::Idle(ref mut buf_cell) => {
744                    let mut buf = buf_cell.take().unwrap();
745
746                    let seek = if !buf.is_empty() {
747                        Some(SeekFrom::Current(buf.discard_read()))
748                    } else {
749                        None
750                    };
751
752                    let n = buf.copy_from(src, me.max_buf_size);
753                    let std = me.std.clone();
754
755                    let blocking_task_join_handle = spawn_mandatory_blocking(move || {
756                        let res = if let Some(seek) = seek {
757                            (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
758                        } else {
759                            buf.write_to(&mut &*std)
760                        };
761
762                        (Operation::Write(res), buf)
763                    })
764                    .ok_or_else(|| {
765                        io::Error::new(io::ErrorKind::Other, "background task failed")
766                    })?;
767
768                    inner.state = State::Busy(blocking_task_join_handle);
769
770                    return Poll::Ready(Ok(n));
771                }
772                State::Busy(ref mut rx) => {
773                    let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
774                    inner.state = State::Idle(Some(buf));
775
776                    match op {
777                        Operation::Read(_) => {
778                            // We don't care about the result here. The fact
779                            // that the cursor has advanced will be reflected in
780                            // the next iteration of the loop
781                            continue;
782                        }
783                        Operation::Write(res) => {
784                            // If the previous write was successful, continue.
785                            // Otherwise, error.
786                            res?;
787                            continue;
788                        }
789                        Operation::Seek(_) => {
790                            // Ignore the seek
791                            continue;
792                        }
793                    }
794                }
795            }
796        }
797    }
798
799    fn poll_write_vectored(
800        self: Pin<&mut Self>,
801        cx: &mut Context<'_>,
802        bufs: &[io::IoSlice<'_>],
803    ) -> Poll<Result<usize, io::Error>> {
804        ready!(crate::trace::trace_leaf(cx));
805        let me = self.get_mut();
806        let inner = me.inner.get_mut();
807
808        if let Some(e) = inner.last_write_err.take() {
809            return Poll::Ready(Err(e.into()));
810        }
811
812        loop {
813            match inner.state {
814                State::Idle(ref mut buf_cell) => {
815                    let mut buf = buf_cell.take().unwrap();
816
817                    let seek = if !buf.is_empty() {
818                        Some(SeekFrom::Current(buf.discard_read()))
819                    } else {
820                        None
821                    };
822
823                    let n = buf.copy_from_bufs(bufs, me.max_buf_size);
824                    let std = me.std.clone();
825
826                    let blocking_task_join_handle = spawn_mandatory_blocking(move || {
827                        let res = if let Some(seek) = seek {
828                            (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
829                        } else {
830                            buf.write_to(&mut &*std)
831                        };
832
833                        (Operation::Write(res), buf)
834                    })
835                    .ok_or_else(|| {
836                        io::Error::new(io::ErrorKind::Other, "background task failed")
837                    })?;
838
839                    inner.state = State::Busy(blocking_task_join_handle);
840
841                    return Poll::Ready(Ok(n));
842                }
843                State::Busy(ref mut rx) => {
844                    let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
845                    inner.state = State::Idle(Some(buf));
846
847                    match op {
848                        Operation::Read(_) => {
849                            // We don't care about the result here. The fact
850                            // that the cursor has advanced will be reflected in
851                            // the next iteration of the loop
852                            continue;
853                        }
854                        Operation::Write(res) => {
855                            // If the previous write was successful, continue.
856                            // Otherwise, error.
857                            res?;
858                            continue;
859                        }
860                        Operation::Seek(_) => {
861                            // Ignore the seek
862                            continue;
863                        }
864                    }
865                }
866            }
867        }
868    }
869
870    fn is_write_vectored(&self) -> bool {
871        true
872    }
873
874    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
875        ready!(crate::trace::trace_leaf(cx));
876        let inner = self.inner.get_mut();
877        inner.poll_flush(cx)
878    }
879
880    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
881        ready!(crate::trace::trace_leaf(cx));
882        self.poll_flush(cx)
883    }
884}
885
886impl From<StdFile> for File {
887    fn from(std: StdFile) -> Self {
888        Self::from_std(std)
889    }
890}
891
892impl fmt::Debug for File {
893    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
894        fmt.debug_struct("tokio::fs::File")
895            .field("std", &self.std)
896            .finish()
897    }
898}
899
#[cfg(unix)]
impl std::os::unix::io::AsRawFd for File {
    /// Returns the raw descriptor of the underlying file; ownership stays
    /// with this `File`.
    fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
        self.std.as_raw_fd()
    }
}
906
907#[cfg(unix)]
908impl std::os::unix::io::AsFd for File {
909    fn as_fd(&self) -> std::os::unix::io::BorrowedFd<'_> {
910        unsafe {
911            std::os::unix::io::BorrowedFd::borrow_raw(std::os::unix::io::AsRawFd::as_raw_fd(self))
912        }
913    }
914}
915
916#[cfg(unix)]
917impl std::os::unix::io::FromRawFd for File {
918    unsafe fn from_raw_fd(fd: std::os::unix::io::RawFd) -> Self {
919        // Safety: exactly the same safety contract as
920        // `std::os::unix::io::FromRawFd::from_raw_fd`.
921        unsafe { StdFile::from_raw_fd(fd).into() }
922    }
923}
924
925cfg_windows! {
926    use crate::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle, AsHandle, BorrowedHandle};
927
928    impl AsRawHandle for File {
929        fn as_raw_handle(&self) -> RawHandle {
930            self.std.as_raw_handle()
931        }
932    }
933
934    impl AsHandle for File {
935        fn as_handle(&self) -> BorrowedHandle<'_> {
936            unsafe {
937                BorrowedHandle::borrow_raw(
938                    AsRawHandle::as_raw_handle(self),
939                )
940            }
941        }
942    }
943
944    impl FromRawHandle for File {
945        unsafe fn from_raw_handle(handle: RawHandle) -> Self {
946            // Safety: exactly the same safety contract as
947            // `FromRawHandle::from_raw_handle`.
948            unsafe { StdFile::from_raw_handle(handle).into() }
949        }
950    }
951}
952
953impl Inner {
954    fn poll_read_inner(
955        std: Arc<StdFile>,
956        buf: Buf,
957        max_buf_size: usize,
958    ) -> io::Result<JoinHandle<(Operation, Buf)>> {
959        // Unit tests use `MockFile` and the mock `spawn_blocking` infrastructure,
960        // which can't drive real io_uring operations. The io_uring read path
961        // is tested through integration tests in `tests/fs_uring_file_read.rs`.
962        #[cfg(all(
963            not(test),
964            tokio_unstable,
965            feature = "io-uring",
966            feature = "rt",
967            feature = "fs",
968            target_os = "linux",
969        ))]
970        {
971            if let Ok(handle) = crate::runtime::Handle::try_current() {
972                let driver_handle = handle.inner.driver().io();
973
974                if driver_handle.is_uring_ready(io_uring::opcode::Read::CODE) {
975                    // Fast path: uring already initialized and Read supported.
976                    let fd: crate::io::uring::utils::ArcFd = std;
977                    return Ok(spawn(Self::uring_read(fd, buf, max_buf_size)));
978                }
979
980                if !driver_handle.is_uring_probed() {
981                    // Not yet probed: lazy init inside an async task so
982                    // `File::from_std()` can still benefit from io-uring.
983                    return Ok(spawn(Self::lazy_init_read(std, buf, max_buf_size)));
984                }
985                // Probed but unsupported: fall through to spawn_blocking.
986            }
987        }
988
989        // Fallback: spawn_blocking
990        let join = Self::spawn_blocking_read(buf, std, max_buf_size);
991        Ok(join)
992    }
993
/// Perform an io-uring read, retrying for as long as it is interrupted.
///
/// Each attempt hands `fd` and `buf` to `Op::read_at` and receives them
/// back alongside the result. An `Interrupted` error re-submits the read
/// with the returned descriptor and buffer; any other outcome ends the loop.
///
/// Returns the read result wrapped in `Operation::Read`, together with the
/// buffer handed back by the final attempt.
#[cfg(all(
    not(test),
    tokio_unstable,
    feature = "io-uring",
    feature = "rt",
    feature = "fs",
    target_os = "linux",
))]
async fn uring_read(
    mut fd: crate::io::uring::utils::ArcFd,
    mut buf: Buf,
    max_buf_size: usize,
) -> (Operation, Buf) {
    use crate::runtime::driver::op::Op;

    let (outcome, filled) = loop {
        // An offset of `u64::MAX` means: use and advance the file position.
        let (res, next_fd, next_buf) = Op::read_at(fd, buf, max_buf_size, u64::MAX).await;

        match res {
            Ok(n) => break (Ok(n as usize), next_buf),
            Err(e) if e.kind() != io::ErrorKind::Interrupted => break (Err(e), next_buf),
            Err(_) => {
                // Interrupted: go around again with what we were handed back.
                fd = next_fd;
                buf = next_buf;
            }
        }
    };

    (Operation::Read(outcome), filled)
}
1025
/// Attempt lazy io-uring initialization, then read through uring or fall
/// back to a blocking read.
///
/// This covers the `File::from_std()` path, where `check_and_init()` has
/// not been called yet. If the blocking fallback task cannot be joined, the
/// join error is reported as an `io::ErrorKind::Other` read error paired
/// with a zero-capacity buffer.
#[cfg(all(
    not(test),
    tokio_unstable,
    feature = "io-uring",
    feature = "rt",
    feature = "fs",
    target_os = "linux",
))]
async fn lazy_init_read(std: Arc<StdFile>, buf: Buf, max_buf_size: usize) -> (Operation, Buf) {
    let handle = crate::runtime::Handle::current();
    let driver_handle = handle.inner.driver().io();

    // A failed initialization is treated the same as "uring unavailable".
    let uring_available = driver_handle
        .check_and_init(io_uring::opcode::Read::CODE)
        .await
        .unwrap_or(false);

    if !uring_available {
        return Self::spawn_blocking_read(buf, std, max_buf_size)
            .await
            .unwrap_or_else(|join_err| {
                (
                    Operation::Read(Err(io::Error::new(io::ErrorKind::Other, join_err))),
                    Buf::with_capacity(0),
                )
            });
    }

    let fd: crate::io::uring::utils::ArcFd = std;
    Self::uring_read(fd, buf, max_buf_size).await
}
1057
1058    fn spawn_blocking_read(
1059        buf: Buf,
1060        std: Arc<StdFile>,
1061        max_buf_size: usize,
1062    ) -> JoinHandle<(Operation, Buf)> {
1063        spawn_blocking(move || {
1064            let mut buf = buf;
1065            // SAFETY: the `Read` implementation of `std` does not
1066            // read from the buffer it is borrowing and correctly
1067            // reports the length of the data written into the buffer.
1068            let res = unsafe { buf.read_from(&mut &*std, max_buf_size) };
1069            (Operation::Read(res), buf)
1070        })
1071    }
1072
1073    async fn complete_inflight(&mut self) {
1074        use std::future::poll_fn;
1075
1076        poll_fn(|cx| self.poll_complete_inflight(cx)).await;
1077    }
1078
1079    fn poll_complete_inflight(&mut self, cx: &mut Context<'_>) -> Poll<()> {
1080        ready!(crate::trace::trace_leaf(cx));
1081        match self.poll_flush(cx) {
1082            Poll::Ready(Err(e)) => {
1083                self.last_write_err = Some(e.kind());
1084                Poll::Ready(())
1085            }
1086            Poll::Ready(Ok(())) => Poll::Ready(()),
1087            Poll::Pending => Poll::Pending,
1088        }
1089    }
1090
1091    fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
1092        if let Some(e) = self.last_write_err.take() {
1093            return Poll::Ready(Err(e.into()));
1094        }
1095
1096        let (op, buf) = match self.state {
1097            State::Idle(_) => return Poll::Ready(Ok(())),
1098            State::Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?,
1099        };
1100
1101        // The buffer is not used here
1102        self.state = State::Idle(Some(buf));
1103
1104        match op {
1105            Operation::Read(_) => Poll::Ready(Ok(())),
1106            Operation::Write(res) => Poll::Ready(res),
1107            Operation::Seek(_) => Poll::Ready(Ok(())),
1108        }
1109    }
1110}
1111
1112#[cfg(test)]
1113mod tests;