webgpu/poll_thread.rs
1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5//! Data and main loop of WGPU poll thread.
6//!
7//! This is roughly based on <https://github.com/LucentFlux/wgpu-async/blob/1322c7e3fcdfc1865a472c7bbbf0e2e06dcf4da8/src/wgpu_future.rs>
8
9use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
10use std::sync::{Arc, Mutex, MutexGuard};
11use std::thread::JoinHandle;
12
13use log::warn;
14
15use crate::wgc::global::Global;
16
/// Polls devices while there is something to poll.
///
/// This object corresponds to a thread that parks itself when there is no work
/// waiting on it, and otherwise calls `poll_all_devices` repeatedly, blocking
/// until the outstanding work is done.
///
/// The thread dies when this object is dropped, and all work in submission is done.
///
/// ## Example
/// ```ignore
/// let token = self.poller.token(); // create a new token
/// let callback = SubmittedWorkDoneClosure::from_rust(Box::from(move || {
///     drop(token); // drop token as closure has been fired
///     // ...
/// }));
/// let result = gfx_select!(queue_id => global.queue_on_submitted_work_done(queue_id, callback));
/// self.poller.wake(); // wake poller thread to actually poll
/// ```
#[derive(Debug)]
pub(crate) struct Poller {
    /// The number of closures that still need to be fired.
    /// When this is 0, the thread can park itself.
    work_count: Arc<AtomicUsize>,
    /// True if thread should die after all work in submission is done
    is_done: Arc<AtomicBool>,
    /// Handle to the WGPU poller thread (to be used for unparking the thread).
    ///
    /// Wrapped in `Option` so that `Drop` can take the handle out and join it.
    handle: Option<JoinHandle<()>>,
    /// Lock for device maintain calls (in poll_all_devices and queue_submit)
    ///
    /// This is a workaround for wgpu deadlocks: <https://github.com/gfx-rs/wgpu/issues/5572>
    lock: Arc<Mutex<()>>,
}
48
49#[inline]
50fn poll_all_devices(
51 global: &Arc<Global>,
52 more_work: &mut bool,
53 force_wait: bool,
54 lock: &Mutex<()>,
55) {
56 let _guard = lock.lock().unwrap();
57 match global.poll_all_devices(force_wait) {
58 Ok(all_queue_empty) => *more_work = !all_queue_empty,
59 Err(e) => warn!("Poller thread got `{e}` on poll_all_devices."),
60 }
61 // drop guard
62}
63
64impl Poller {
65 pub(crate) fn new(global: Arc<Global>) -> Self {
66 let work_count = Arc::new(AtomicUsize::new(0));
67 let is_done = Arc::new(AtomicBool::new(false));
68 let work = work_count.clone();
69 let done = is_done.clone();
70 let lock = Arc::new(Mutex::new(()));
71 Self {
72 work_count,
73 is_done,
74 lock: Arc::clone(&lock),
75 handle: Some(
76 std::thread::Builder::new()
77 .name("WGPU poller".into())
78 .spawn(move || {
79 while !done.load(Ordering::Acquire) {
80 let mut more_work = false;
81 // Do non-blocking poll unconditionally
82 // so every `ẁake` (even spurious) will do at least one poll.
83 // this is mostly useful for stuff that is deferred
84 // to maintain calls in wgpu (device resource destruction)
85 poll_all_devices(&global, &mut more_work, false, &lock);
86 while more_work || work.load(Ordering::Acquire) != 0 {
87 poll_all_devices(&global, &mut more_work, true, &lock);
88 }
89 std::thread::park(); // TODO: should we use timeout here
90 }
91 })
92 .expect("Spawning thread should not fail"),
93 ),
94 }
95 }
96
97 /// Creates a token of work
98 pub(crate) fn token(&self) -> WorkToken {
99 let prev = self.work_count.fetch_add(1, Ordering::AcqRel);
100 debug_assert!(
101 prev < usize::MAX,
102 "cannot have more than `usize::MAX` outstanding operations on the GPU"
103 );
104 WorkToken {
105 work_count: Arc::clone(&self.work_count),
106 }
107 }
108
109 /// Wakes the poller thread to start polling.
110 pub(crate) fn wake(&self) {
111 self.handle
112 .as_ref()
113 .expect("Poller thread does not exist!")
114 .thread()
115 .unpark();
116 }
117
118 /// Lock for device maintain calls (in poll_all_devices and queue_submit)
119 pub(crate) fn lock(&self) -> MutexGuard<'_, ()> {
120 self.lock.lock().unwrap()
121 }
122}
123
124impl Drop for Poller {
125 fn drop(&mut self) {
126 self.is_done.store(true, Ordering::Release);
127
128 let handle = self.handle.take().expect("Poller dropped twice");
129 handle.thread().unpark();
130 handle.join().expect("Poller thread panicked");
131 }
132}
133
/// Marker for one unit of enqueued work (a closure that is yet to be fired).
///
/// The work is counted as outstanding for as long as this token is alive;
/// dropping the token decrements the shared counter by one.
pub(crate) struct WorkToken {
    /// Counter shared with whoever handed out this token.
    work_count: Arc<AtomicUsize>,
}

impl Drop for WorkToken {
    /// Removes this token's unit of work from the shared counter.
    fn drop(&mut self) {
        // The previous counter value is of no interest here.
        let _previous = self.work_count.fetch_sub(1, Ordering::AcqRel);
    }
}