webgpu/
poll_thread.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5//! Data and main loop of WGPU poll thread.
6//!
7//! This is roughly based on <https://github.com/LucentFlux/wgpu-async/blob/1322c7e3fcdfc1865a472c7bbbf0e2e06dcf4da8/src/wgpu_future.rs>
8
9use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
10use std::sync::{Arc, Mutex, MutexGuard};
11use std::thread::JoinHandle;
12
13use log::warn;
14
15use crate::wgc::global::Global;
16
/// Polls devices while there is something to poll.
///
/// This object corresponds to a thread ("WGPU poller") that parks itself when there is no
/// work. Once woken, it calls `poll_all_devices` repeatedly, blocking, for as long as queues
/// report unfinished work or [`WorkToken`]s are outstanding.
///
/// The thread exits when this object is dropped, after all outstanding work is done.
///
/// ## Example
///
/// The snippet is illustrative only (it uses `self` and `gfx_select!`), so it is not compiled
/// as a doctest.
///
/// ```ignore
/// let token = self.poller.token(); // create a new token
/// let callback = SubmittedWorkDoneClosure::from_rust(Box::from(move || {
///    drop(token); // drop token as closure has been fired
///    // ...
/// }));
/// let result = gfx_select!(queue_id => global.queue_on_submitted_work_done(queue_id, callback));
/// self.poller.wake(); // wake poller thread to actually poll
/// ```
#[derive(Debug)]
pub(crate) struct Poller {
    /// The number of closures that still need to be fired.
    /// When this is 0, the thread can park itself.
    work_count: Arc<AtomicUsize>,
    /// True if the thread should exit after all work in submission is done.
    is_done: Arc<AtomicBool>,
    /// Handle to the WGPU poller thread (used to unpark and join it).
    ///
    /// `Some` from construction until `Drop` takes it.
    handle: Option<JoinHandle<()>>,
    /// Lock for device maintain calls (in `poll_all_devices` and `queue_submit`).
    ///
    /// This is a workaround for wgpu deadlocks: <https://github.com/gfx-rs/wgpu/issues/5572>
    lock: Arc<Mutex<()>>,
}
48
49#[inline]
50fn poll_all_devices(
51    global: &Arc<Global>,
52    more_work: &mut bool,
53    force_wait: bool,
54    lock: &Mutex<()>,
55) {
56    let _guard = lock.lock().unwrap();
57    match global.poll_all_devices(force_wait) {
58        Ok(all_queue_empty) => *more_work = !all_queue_empty,
59        Err(e) => warn!("Poller thread got `{e}` on poll_all_devices."),
60    }
61    // drop guard
62}
63
64impl Poller {
65    pub(crate) fn new(global: Arc<Global>) -> Self {
66        let work_count = Arc::new(AtomicUsize::new(0));
67        let is_done = Arc::new(AtomicBool::new(false));
68        let work = work_count.clone();
69        let done = is_done.clone();
70        let lock = Arc::new(Mutex::new(()));
71        Self {
72            work_count,
73            is_done,
74            lock: Arc::clone(&lock),
75            handle: Some(
76                std::thread::Builder::new()
77                    .name("WGPU poller".into())
78                    .spawn(move || {
79                        while !done.load(Ordering::Acquire) {
80                            let mut more_work = false;
81                            // Do non-blocking poll unconditionally
82                            // so every `ẁake` (even spurious) will do at least one poll.
83                            // this is mostly useful for stuff that is deferred
84                            // to maintain calls in wgpu (device resource destruction)
85                            poll_all_devices(&global, &mut more_work, false, &lock);
86                            while more_work || work.load(Ordering::Acquire) != 0 {
87                                poll_all_devices(&global, &mut more_work, true, &lock);
88                            }
89                            std::thread::park(); // TODO: should we use timeout here
90                        }
91                    })
92                    .expect("Spawning thread should not fail"),
93            ),
94        }
95    }
96
97    /// Creates a token of work
98    pub(crate) fn token(&self) -> WorkToken {
99        let prev = self.work_count.fetch_add(1, Ordering::AcqRel);
100        debug_assert!(
101            prev < usize::MAX,
102            "cannot have more than `usize::MAX` outstanding operations on the GPU"
103        );
104        WorkToken {
105            work_count: Arc::clone(&self.work_count),
106        }
107    }
108
109    /// Wakes the poller thread to start polling.
110    pub(crate) fn wake(&self) {
111        self.handle
112            .as_ref()
113            .expect("Poller thread does not exist!")
114            .thread()
115            .unpark();
116    }
117
118    /// Lock for device maintain calls (in poll_all_devices and queue_submit)
119    pub(crate) fn lock(&self) -> MutexGuard<'_, ()> {
120        self.lock.lock().unwrap()
121    }
122}
123
124impl Drop for Poller {
125    fn drop(&mut self) {
126        self.is_done.store(true, Ordering::Release);
127
128        let handle = self.handle.take().expect("Poller dropped twice");
129        handle.thread().unpark();
130        handle.join().expect("Poller thread panicked");
131    }
132}
133
/// RAII guard marking one unit of enqueued work (a closure that has not been fired yet).
///
/// [`Poller::token`] increments the shared work counter when creating it, and dropping the
/// token decrements it again. While the counter is non-zero, the poller thread keeps polling
/// instead of parking.
#[must_use = "dropping a `WorkToken` immediately releases the work it is meant to track"]
#[derive(Debug)]
pub(crate) struct WorkToken {
    work_count: Arc<AtomicUsize>,
}
139
140impl Drop for WorkToken {
141    fn drop(&mut self) {
142        self.work_count.fetch_sub(1, Ordering::AcqRel);
143    }
144}