1mod engines;
6
7use std::borrow::ToOwned;
8use std::collections::hash_map::Entry;
9use std::collections::{HashMap, VecDeque};
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::thread;
13
14use base::threadpool::ThreadPool;
15use ipc_channel::ipc::{self, IpcError, IpcReceiver, IpcSender};
16use log::{debug, warn};
17use rustc_hash::FxHashMap;
18use servo_config::pref;
19use servo_url::origin::ImmutableOrigin;
20use storage_traits::indexeddb_thread::{
21 AsyncOperation, BackendError, BackendResult, CreateObjectResult, DbResult, IndexedDBThreadMsg,
22 IndexedDBTxnMode, KeyPath, SyncOperation,
23};
24use uuid::Uuid;
25
26use crate::indexeddb::engines::{KvsEngine, KvsOperation, KvsTransaction, SqliteEngine};
27
/// Factory for spawning the IndexedDB manager thread and obtaining the
/// channel used to communicate with it.
pub trait IndexedDBThreadFactory {
    // `config_dir` is the profile directory under which databases are
    // persisted; `None` falls back to a relative "IndexedDB" directory.
    fn new(config_dir: Option<PathBuf>) -> Self;
}
31
32impl IndexedDBThreadFactory for IpcSender<IndexedDBThreadMsg> {
33 fn new(config_dir: Option<PathBuf>) -> IpcSender<IndexedDBThreadMsg> {
34 let (chan, port) = ipc::channel().unwrap();
35
36 let mut idb_base_dir = PathBuf::new();
37 if let Some(p) = config_dir {
38 idb_base_dir.push(p);
39 }
40 idb_base_dir.push("IndexedDB");
41
42 thread::Builder::new()
43 .name("IndexedDBManager".to_owned())
44 .spawn(move || {
45 IndexedDBManager::new(port, idb_base_dir).start();
46 })
47 .expect("Thread spawning failed");
48
49 chan
50 }
51}
52
/// Identity of a database: the (origin, name) pair. Used as the key in the
/// manager's database map and to derive the on-disk storage path.
#[derive(Clone, Eq, Hash, PartialEq)]
pub struct IndexedDBDescription {
    // Origin that owns the database (per-origin isolation).
    pub origin: ImmutableOrigin,
    // Script-visible database name.
    pub name: String,
}
58
59impl IndexedDBDescription {
60 const NAMESPACE_SERVO_IDB: &uuid::Uuid = &Uuid::from_bytes([
62 0x37, 0x9e, 0x56, 0xb0, 0x1a, 0x76, 0x44, 0xc2, 0xa0, 0xdb, 0xe2, 0x18, 0xc5, 0xc8, 0xa3,
63 0x5d,
64 ]);
65 pub(super) fn as_path(&self) -> PathBuf {
68 let mut path = PathBuf::new();
69
70 let origin_uuid = Uuid::new_v5(
72 Self::NAMESPACE_SERVO_IDB,
73 self.origin.ascii_serialization().as_bytes(),
74 );
75 let db_name_uuid = Uuid::new_v5(Self::NAMESPACE_SERVO_IDB, self.name.as_bytes());
76 path.push(origin_uuid.to_string());
77 path.push(db_name_uuid.to_string());
78
79 path
80 }
81}
82
/// Per-database state: the storage engine plus transactions that have been
/// queued but not yet run.
struct IndexedDBEnvironment<E: KvsEngine> {
    engine: E,
    // Pending transactions keyed by their serial number.
    transactions: FxHashMap<u64, KvsTransaction>,
    // Monotonic source of transaction serial numbers (bumped by
    // `SyncOperation::RegisterNewTxn`).
    serial_number_counter: u64,
}
88
89impl<E: KvsEngine> IndexedDBEnvironment<E> {
90 fn new(engine: E) -> IndexedDBEnvironment<E> {
91 IndexedDBEnvironment {
92 engine,
93 transactions: FxHashMap::default(),
94 serial_number_counter: 0,
95 }
96 }
97
98 fn queue_operation(
99 &mut self,
100 store_name: &str,
101 serial_number: u64,
102 mode: IndexedDBTxnMode,
103 operation: AsyncOperation,
104 ) {
105 self.transactions
106 .entry(serial_number)
107 .or_insert_with(|| KvsTransaction {
108 requests: VecDeque::new(),
109 mode,
110 })
111 .requests
112 .push_back(KvsOperation {
113 operation,
114 store_name: String::from(store_name),
115 });
116 }
117
118 fn start_transaction(&mut self, txn: u64, sender: Option<IpcSender<BackendResult<()>>>) {
120 if let Some(txn) = self.transactions.remove(&txn) {
123 let _ = self.engine.process_transaction(txn).blocking_recv();
124 }
125
126 if let Some(sender) = sender {
129 if sender.send(Ok(())).is_err() {
130 warn!("IDBTransaction starter dropped its channel");
131 }
132 }
133 }
134
135 fn has_key_generator(&self, store_name: &str) -> bool {
136 self.engine.has_key_generator(store_name)
137 }
138
139 fn key_path(&self, store_name: &str) -> Option<KeyPath> {
140 self.engine.key_path(store_name)
141 }
142
143 fn create_index(
144 &self,
145 store_name: &str,
146 index_name: String,
147 key_path: KeyPath,
148 unique: bool,
149 multi_entry: bool,
150 ) -> DbResult<CreateObjectResult> {
151 self.engine
152 .create_index(store_name, index_name, key_path, unique, multi_entry)
153 .map_err(|err| format!("{err:?}"))
154 }
155
156 fn delete_index(&self, store_name: &str, index_name: String) -> DbResult<()> {
157 self.engine
158 .delete_index(store_name, index_name)
159 .map_err(|err| format!("{err:?}"))
160 }
161
162 fn create_object_store(
163 &mut self,
164 store_name: &str,
165 key_path: Option<KeyPath>,
166 auto_increment: bool,
167 ) -> DbResult<CreateObjectResult> {
168 self.engine
169 .create_store(store_name, key_path, auto_increment)
170 .map_err(|err| format!("{err:?}"))
171 }
172
173 fn delete_object_store(&mut self, store_name: &str) -> DbResult<()> {
174 let result = self.engine.delete_store(store_name);
175 result.map_err(|err| format!("{err:?}"))
176 }
177
178 fn delete_database(self, sender: IpcSender<BackendResult<()>>) {
179 let result = self.engine.delete_database();
180 let _ = sender.send(
181 result
182 .map_err(|err| format!("{err:?}"))
183 .map_err(BackendError::from),
184 );
185 }
186
187 fn version(&self) -> DbResult<u64> {
188 self.engine.version().map_err(|err| format!("{err:?}"))
189 }
190
191 fn set_version(&mut self, version: u64) -> DbResult<()> {
192 self.engine
193 .set_version(version)
194 .map_err(|err| format!("{err:?}"))
195 }
196}
197
/// Owner of all open IndexedDB databases; runs the receive loop on the
/// "IndexedDBManager" thread.
struct IndexedDBManager {
    // Receiving end of the channel handed out by `IndexedDBThreadFactory`.
    port: IpcReceiver<IndexedDBThreadMsg>,
    // Root directory under which each database's files live.
    idb_base_dir: PathBuf,
    // Open databases, keyed by (origin, name).
    databases: HashMap<IndexedDBDescription, IndexedDBEnvironment<SqliteEngine>>,
    // Worker pool shared with every SqliteEngine instance.
    thread_pool: Arc<ThreadPool>,
}
204
205impl IndexedDBManager {
206 fn new(port: IpcReceiver<IndexedDBThreadMsg>, idb_base_dir: PathBuf) -> IndexedDBManager {
207 debug!("New indexedDBManager");
208
209 let thread_count = thread::available_parallelism()
213 .map(|i| i.get())
214 .unwrap_or(pref!(threadpools_fallback_worker_num) as usize)
215 .min(pref!(threadpools_indexeddb_workers_max).max(1) as usize);
216
217 IndexedDBManager {
218 port,
219 idb_base_dir,
220 databases: HashMap::new(),
221 thread_pool: Arc::new(ThreadPool::new(thread_count, "IndexedDB".to_string())),
222 }
223 }
224}
225
226impl IndexedDBManager {
227 fn start(&mut self) {
228 loop {
229 let message = match self.port.recv() {
232 Ok(msg) => msg,
233 Err(e) => match e {
234 IpcError::Disconnected => {
235 break;
236 },
237 other => {
238 warn!("Error in IndexedDB thread: {:?}", other);
239 continue;
240 },
241 },
242 };
243 match message {
244 IndexedDBThreadMsg::Sync(operation) => {
245 self.handle_sync_operation(operation);
246 },
247 IndexedDBThreadMsg::Async(origin, db_name, store_name, txn, mode, operation) => {
248 if let Some(db) = self.get_database_mut(origin, db_name) {
249 db.queue_operation(&store_name, txn, mode, operation);
251 db.start_transaction(txn, None);
256 }
257 },
258 }
259 }
260 }
261
262 fn get_database(
263 &self,
264 origin: ImmutableOrigin,
265 db_name: String,
266 ) -> Option<&IndexedDBEnvironment<SqliteEngine>> {
267 let idb_description = IndexedDBDescription {
268 origin,
269 name: db_name,
270 };
271
272 self.databases.get(&idb_description)
273 }
274
275 fn get_database_mut(
276 &mut self,
277 origin: ImmutableOrigin,
278 db_name: String,
279 ) -> Option<&mut IndexedDBEnvironment<SqliteEngine>> {
280 let idb_description = IndexedDBDescription {
281 origin,
282 name: db_name,
283 };
284
285 self.databases.get_mut(&idb_description)
286 }
287
288 fn handle_sync_operation(&mut self, operation: SyncOperation) {
289 match operation {
290 SyncOperation::CloseDatabase(sender, origin, db_name) => {
291 let idb_description = IndexedDBDescription {
292 origin,
293 name: db_name,
294 };
295 if let Some(_db) = self.databases.remove(&idb_description) {
296 }
298 let _ = sender.send(Ok(()));
299 },
300 SyncOperation::OpenDatabase(sender, origin, db_name, version) => {
301 let idb_description = IndexedDBDescription {
302 origin,
303 name: db_name,
304 };
305
306 let idb_base_dir = self.idb_base_dir.as_path();
307
308 let version = version.unwrap_or(0);
309
310 match self.databases.entry(idb_description.clone()) {
311 Entry::Vacant(e) => {
312 let db = IndexedDBEnvironment::new(
313 SqliteEngine::new(
314 idb_base_dir,
315 &idb_description,
316 self.thread_pool.clone(),
317 )
318 .expect("Failed to create sqlite engine"),
319 );
320 let _ = sender.send(db.version().unwrap_or(version));
321 e.insert(db);
322 },
323 Entry::Occupied(db) => {
324 let _ = sender.send(db.get().version().unwrap_or(version));
325 },
326 }
327 },
328 SyncOperation::DeleteDatabase(sender, origin, db_name) => {
329 let idb_description = IndexedDBDescription {
333 origin,
334 name: db_name,
335 };
336 if let Some(db) = self.databases.remove(&idb_description) {
337 db.delete_database(sender);
338 } else {
339 let _ = sender.send(Ok(()));
340 }
341 },
342 SyncOperation::HasKeyGenerator(sender, origin, db_name, store_name) => {
343 let result = self
344 .get_database(origin, db_name)
345 .map(|db| db.has_key_generator(&store_name));
346 let _ = sender.send(result.ok_or(BackendError::DbNotFound));
347 },
348 SyncOperation::KeyPath(sender, origin, db_name, store_name) => {
349 let result = self
350 .get_database(origin, db_name)
351 .map(|db| db.key_path(&store_name));
352 let _ = sender.send(result.ok_or(BackendError::DbNotFound));
353 },
354 SyncOperation::CreateIndex(
355 sender,
356 origin,
357 db_name,
358 store_name,
359 index_name,
360 key_path,
361 unique,
362 multi_entry,
363 ) => {
364 if let Some(db) = self.get_database(origin, db_name) {
365 let result =
366 db.create_index(&store_name, index_name, key_path, unique, multi_entry);
367 let _ = sender.send(result.map_err(BackendError::from));
368 } else {
369 let _ = sender.send(Err(BackendError::DbNotFound));
370 }
371 },
372 SyncOperation::DeleteIndex(sender, origin, db_name, store_name, index_name) => {
373 if let Some(db) = self.get_database(origin, db_name) {
374 let result = db.delete_index(&store_name, index_name);
375 let _ = sender.send(result.map_err(BackendError::from));
376 } else {
377 let _ = sender.send(Err(BackendError::DbNotFound));
378 }
379 },
380 SyncOperation::Commit(sender, _origin, _db_name, _txn) => {
381 let _ = sender.send(Ok(()));
383 },
384 SyncOperation::UpgradeVersion(sender, origin, db_name, _txn, version) => {
385 if let Some(db) = self.get_database_mut(origin, db_name) {
386 if version > db.version().unwrap_or(0) {
387 let _ = db.set_version(version);
388 }
389 let _ = sender.send(db.version().map_err(BackendError::from));
391 } else {
392 let _ = sender.send(Err(BackendError::DbNotFound));
393 }
394 },
395 SyncOperation::CreateObjectStore(
396 sender,
397 origin,
398 db_name,
399 store_name,
400 key_paths,
401 auto_increment,
402 ) => {
403 if let Some(db) = self.get_database_mut(origin, db_name) {
404 let result = db.create_object_store(&store_name, key_paths, auto_increment);
405 let _ = sender.send(result.map_err(BackendError::from));
406 } else {
407 let _ = sender.send(Err(BackendError::DbNotFound));
408 }
409 },
410 SyncOperation::DeleteObjectStore(sender, origin, db_name, store_name) => {
411 if let Some(db) = self.get_database_mut(origin, db_name) {
412 let result = db.delete_object_store(&store_name);
413 let _ = sender.send(result.map_err(BackendError::from));
414 } else {
415 let _ = sender.send(Err(BackendError::DbNotFound));
416 }
417 },
418 SyncOperation::StartTransaction(sender, origin, db_name, txn) => {
419 if let Some(db) = self.get_database_mut(origin, db_name) {
420 db.start_transaction(txn, Some(sender));
421 } else {
422 let _ = sender.send(Err(BackendError::DbNotFound));
423 }
424 },
425 SyncOperation::Version(sender, origin, db_name) => {
426 if let Some(db) = self.get_database(origin, db_name) {
427 let _ = sender.send(db.version().map_err(BackendError::from));
428 } else {
429 let _ = sender.send(Err(BackendError::DbNotFound));
430 }
431 },
432 SyncOperation::RegisterNewTxn(sender, origin, db_name) => {
433 if let Some(db) = self.get_database_mut(origin, db_name) {
434 db.serial_number_counter += 1;
435 let _ = sender.send(db.serial_number_counter);
436 }
437 },
438 SyncOperation::Exit(sender) => {
439 let _ = sender.send(());
441 },
442 }
443 }
444}