storage/indexeddb/
mod.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5mod engines;
6
7use std::borrow::ToOwned;
8use std::collections::hash_map::Entry;
9use std::collections::{HashMap, VecDeque};
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::thread;
13
14use base::threadpool::ThreadPool;
15use ipc_channel::ipc::{self, IpcError, IpcReceiver, IpcSender};
16use log::{debug, warn};
17use rustc_hash::FxHashMap;
18use servo_config::pref;
19use servo_url::origin::ImmutableOrigin;
20use storage_traits::indexeddb::{
21    AsyncOperation, BackendError, BackendResult, CreateObjectResult, DbResult, IndexedDBThreadMsg,
22    IndexedDBTxnMode, KeyPath, SyncOperation,
23};
24use uuid::Uuid;
25
26use crate::indexeddb::engines::{KvsEngine, KvsOperation, KvsTransaction, SqliteEngine};
27
28pub trait IndexedDBThreadFactory {
29    fn new(config_dir: Option<PathBuf>) -> Self;
30}
31
32impl IndexedDBThreadFactory for IpcSender<IndexedDBThreadMsg> {
33    fn new(config_dir: Option<PathBuf>) -> IpcSender<IndexedDBThreadMsg> {
34        let (chan, port) = ipc::channel().unwrap();
35
36        let mut idb_base_dir = PathBuf::new();
37        if let Some(p) = config_dir {
38            idb_base_dir.push(p);
39        }
40        idb_base_dir.push("IndexedDB");
41
42        thread::Builder::new()
43            .name("IndexedDBManager".to_owned())
44            .spawn(move || {
45                IndexedDBManager::new(port, idb_base_dir).start();
46            })
47            .expect("Thread spawning failed");
48
49        chan
50    }
51}
52
53#[derive(Clone, Eq, Hash, PartialEq)]
54pub struct IndexedDBDescription {
55    pub origin: ImmutableOrigin,
56    pub name: String,
57}
58
59impl IndexedDBDescription {
60    // randomly generated namespace for our purposes
61    const NAMESPACE_SERVO_IDB: &uuid::Uuid = &Uuid::from_bytes([
62        0x37, 0x9e, 0x56, 0xb0, 0x1a, 0x76, 0x44, 0xc2, 0xa0, 0xdb, 0xe2, 0x18, 0xc5, 0xc8, 0xa3,
63        0x5d,
64    ]);
65    // Converts the database description to a folder name where all
66    // data for this database is stored
67    pub(super) fn as_path(&self) -> PathBuf {
68        let mut path = PathBuf::new();
69
70        // uuid v5 is deterministic
71        let origin_uuid = Uuid::new_v5(
72            Self::NAMESPACE_SERVO_IDB,
73            self.origin.ascii_serialization().as_bytes(),
74        );
75        let db_name_uuid = Uuid::new_v5(Self::NAMESPACE_SERVO_IDB, self.name.as_bytes());
76        path.push(origin_uuid.to_string());
77        path.push(db_name_uuid.to_string());
78
79        path
80    }
81}
82
83struct IndexedDBEnvironment<E: KvsEngine> {
84    engine: E,
85    transactions: FxHashMap<u64, KvsTransaction>,
86    serial_number_counter: u64,
87}
88
89impl<E: KvsEngine> IndexedDBEnvironment<E> {
90    fn new(engine: E) -> IndexedDBEnvironment<E> {
91        IndexedDBEnvironment {
92            engine,
93            transactions: FxHashMap::default(),
94            serial_number_counter: 0,
95        }
96    }
97
98    fn queue_operation(
99        &mut self,
100        store_name: &str,
101        serial_number: u64,
102        mode: IndexedDBTxnMode,
103        operation: AsyncOperation,
104    ) {
105        self.transactions
106            .entry(serial_number)
107            .or_insert_with(|| KvsTransaction {
108                requests: VecDeque::new(),
109                mode,
110            })
111            .requests
112            .push_back(KvsOperation {
113                operation,
114                store_name: String::from(store_name),
115            });
116    }
117
118    // Executes all requests for a transaction (without committing)
119    fn start_transaction(&mut self, txn: u64, sender: Option<IpcSender<BackendResult<()>>>) {
120        // FIXME:(arihant2math) find optimizations in this function
121        //   rather than on the engine level code (less repetition)
122        if let Some(txn) = self.transactions.remove(&txn) {
123            let _ = self.engine.process_transaction(txn).blocking_recv();
124        }
125
126        // We have a sender if the transaction is started manually, and they
127        // probably want to know when it is finished
128        if let Some(sender) = sender {
129            if sender.send(Ok(())).is_err() {
130                warn!("IDBTransaction starter dropped its channel");
131            }
132        }
133    }
134
135    fn has_key_generator(&self, store_name: &str) -> bool {
136        self.engine.has_key_generator(store_name)
137    }
138
139    fn key_path(&self, store_name: &str) -> Option<KeyPath> {
140        self.engine.key_path(store_name)
141    }
142
143    fn create_index(
144        &self,
145        store_name: &str,
146        index_name: String,
147        key_path: KeyPath,
148        unique: bool,
149        multi_entry: bool,
150    ) -> DbResult<CreateObjectResult> {
151        self.engine
152            .create_index(store_name, index_name, key_path, unique, multi_entry)
153            .map_err(|err| format!("{err:?}"))
154    }
155
156    fn delete_index(&self, store_name: &str, index_name: String) -> DbResult<()> {
157        self.engine
158            .delete_index(store_name, index_name)
159            .map_err(|err| format!("{err:?}"))
160    }
161
162    fn create_object_store(
163        &mut self,
164        store_name: &str,
165        key_path: Option<KeyPath>,
166        auto_increment: bool,
167    ) -> DbResult<CreateObjectResult> {
168        self.engine
169            .create_store(store_name, key_path, auto_increment)
170            .map_err(|err| format!("{err:?}"))
171    }
172
173    fn delete_object_store(&mut self, store_name: &str) -> DbResult<()> {
174        let result = self.engine.delete_store(store_name);
175        result.map_err(|err| format!("{err:?}"))
176    }
177
178    fn delete_database(self, sender: IpcSender<BackendResult<()>>) {
179        let result = self.engine.delete_database();
180        let _ = sender.send(
181            result
182                .map_err(|err| format!("{err:?}"))
183                .map_err(BackendError::from),
184        );
185    }
186
187    fn version(&self) -> DbResult<u64> {
188        self.engine.version().map_err(|err| format!("{err:?}"))
189    }
190
191    fn set_version(&mut self, version: u64) -> DbResult<()> {
192        self.engine
193            .set_version(version)
194            .map_err(|err| format!("{err:?}"))
195    }
196}
197
198struct IndexedDBManager {
199    port: IpcReceiver<IndexedDBThreadMsg>,
200    idb_base_dir: PathBuf,
201    databases: HashMap<IndexedDBDescription, IndexedDBEnvironment<SqliteEngine>>,
202    thread_pool: Arc<ThreadPool>,
203}
204
205impl IndexedDBManager {
206    fn new(port: IpcReceiver<IndexedDBThreadMsg>, idb_base_dir: PathBuf) -> IndexedDBManager {
207        debug!("New indexedDBManager");
208
209        // Uses an estimate of the system cpus to process IndexedDB transactions
210        // See https://doc.rust-lang.org/stable/std/thread/fn.available_parallelism.html
211        // If no information can be obtained about the system, uses 4 threads as a default
212        let thread_count = thread::available_parallelism()
213            .map(|i| i.get())
214            .unwrap_or(pref!(threadpools_fallback_worker_num) as usize)
215            .min(pref!(threadpools_indexeddb_workers_max).max(1) as usize);
216
217        IndexedDBManager {
218            port,
219            idb_base_dir,
220            databases: HashMap::new(),
221            thread_pool: Arc::new(ThreadPool::new(thread_count, "IndexedDB".to_string())),
222        }
223    }
224}
225
226impl IndexedDBManager {
227    fn start(&mut self) {
228        loop {
229            // FIXME:(arihant2math) No message *most likely* means that
230            // the ipc sender has been dropped, so we break the look
231            let message = match self.port.recv() {
232                Ok(msg) => msg,
233                Err(e) => match e {
234                    IpcError::Disconnected => {
235                        break;
236                    },
237                    other => {
238                        warn!("Error in IndexedDB thread: {:?}", other);
239                        continue;
240                    },
241                },
242            };
243            match message {
244                IndexedDBThreadMsg::Sync(SyncOperation::Exit(sender)) => {
245                    let _ = sender.send(());
246                    break;
247                },
248                IndexedDBThreadMsg::Sync(operation) => {
249                    self.handle_sync_operation(operation);
250                },
251                IndexedDBThreadMsg::Async(origin, db_name, store_name, txn, mode, operation) => {
252                    if let Some(db) = self.get_database_mut(origin, db_name) {
253                        // Queues an operation for a transaction without starting it
254                        db.queue_operation(&store_name, txn, mode, operation);
255                        // FIXME:(arihant2math) Schedule transactions properly
256                        // while db.transactions.iter().any(|s| s.1.mode == IndexedDBTxnMode::Readwrite) {
257                        //     std::hint::spin_loop();
258                        // }
259                        db.start_transaction(txn, None);
260                    }
261                },
262            }
263        }
264    }
265
266    fn get_database(
267        &self,
268        origin: ImmutableOrigin,
269        db_name: String,
270    ) -> Option<&IndexedDBEnvironment<SqliteEngine>> {
271        let idb_description = IndexedDBDescription {
272            origin,
273            name: db_name,
274        };
275
276        self.databases.get(&idb_description)
277    }
278
279    fn get_database_mut(
280        &mut self,
281        origin: ImmutableOrigin,
282        db_name: String,
283    ) -> Option<&mut IndexedDBEnvironment<SqliteEngine>> {
284        let idb_description = IndexedDBDescription {
285            origin,
286            name: db_name,
287        };
288
289        self.databases.get_mut(&idb_description)
290    }
291
292    fn handle_sync_operation(&mut self, operation: SyncOperation) {
293        match operation {
294            SyncOperation::CloseDatabase(sender, origin, db_name) => {
295                let idb_description = IndexedDBDescription {
296                    origin,
297                    name: db_name,
298                };
299                if let Some(_db) = self.databases.remove(&idb_description) {
300                    // TODO: maybe a close database function should be added to the trait and called here?
301                }
302                let _ = sender.send(Ok(()));
303            },
304            SyncOperation::OpenDatabase(sender, origin, db_name, version) => {
305                let idb_description = IndexedDBDescription {
306                    origin,
307                    name: db_name,
308                };
309
310                let idb_base_dir = self.idb_base_dir.as_path();
311
312                let version = version.unwrap_or(0);
313
314                match self.databases.entry(idb_description.clone()) {
315                    Entry::Vacant(e) => {
316                        let db = IndexedDBEnvironment::new(
317                            SqliteEngine::new(
318                                idb_base_dir,
319                                &idb_description,
320                                self.thread_pool.clone(),
321                            )
322                            .expect("Failed to create sqlite engine"),
323                        );
324                        let _ = sender.send(db.version().unwrap_or(version));
325                        e.insert(db);
326                    },
327                    Entry::Occupied(db) => {
328                        let _ = sender.send(db.get().version().unwrap_or(version));
329                    },
330                }
331            },
332            SyncOperation::DeleteDatabase(sender, origin, db_name) => {
333                // https://w3c.github.io/IndexedDB/#delete-a-database
334                // Step 4. Let db be the database named name in storageKey,
335                // if one exists. Otherwise, return 0 (zero).
336                let idb_description = IndexedDBDescription {
337                    origin,
338                    name: db_name,
339                };
340                if let Some(db) = self.databases.remove(&idb_description) {
341                    db.delete_database(sender);
342                } else {
343                    let _ = sender.send(Ok(()));
344                }
345            },
346            SyncOperation::HasKeyGenerator(sender, origin, db_name, store_name) => {
347                let result = self
348                    .get_database(origin, db_name)
349                    .map(|db| db.has_key_generator(&store_name));
350                let _ = sender.send(result.ok_or(BackendError::DbNotFound));
351            },
352            SyncOperation::KeyPath(sender, origin, db_name, store_name) => {
353                let result = self
354                    .get_database(origin, db_name)
355                    .map(|db| db.key_path(&store_name));
356                let _ = sender.send(result.ok_or(BackendError::DbNotFound));
357            },
358            SyncOperation::CreateIndex(
359                sender,
360                origin,
361                db_name,
362                store_name,
363                index_name,
364                key_path,
365                unique,
366                multi_entry,
367            ) => {
368                if let Some(db) = self.get_database(origin, db_name) {
369                    let result =
370                        db.create_index(&store_name, index_name, key_path, unique, multi_entry);
371                    let _ = sender.send(result.map_err(BackendError::from));
372                } else {
373                    let _ = sender.send(Err(BackendError::DbNotFound));
374                }
375            },
376            SyncOperation::DeleteIndex(sender, origin, db_name, store_name, index_name) => {
377                if let Some(db) = self.get_database(origin, db_name) {
378                    let result = db.delete_index(&store_name, index_name);
379                    let _ = sender.send(result.map_err(BackendError::from));
380                } else {
381                    let _ = sender.send(Err(BackendError::DbNotFound));
382                }
383            },
384            SyncOperation::Commit(sender, _origin, _db_name, _txn) => {
385                // FIXME:(arihant2math) This does nothing at the moment
386                let _ = sender.send(Ok(()));
387            },
388            SyncOperation::UpgradeVersion(sender, origin, db_name, _txn, version) => {
389                if let Some(db) = self.get_database_mut(origin, db_name) {
390                    if version > db.version().unwrap_or(0) {
391                        let _ = db.set_version(version);
392                    }
393                    // erroring out if the version is not upgraded can be and non-replicable
394                    let _ = sender.send(db.version().map_err(BackendError::from));
395                } else {
396                    let _ = sender.send(Err(BackendError::DbNotFound));
397                }
398            },
399            SyncOperation::CreateObjectStore(
400                sender,
401                origin,
402                db_name,
403                store_name,
404                key_paths,
405                auto_increment,
406            ) => {
407                if let Some(db) = self.get_database_mut(origin, db_name) {
408                    let result = db.create_object_store(&store_name, key_paths, auto_increment);
409                    let _ = sender.send(result.map_err(BackendError::from));
410                } else {
411                    let _ = sender.send(Err(BackendError::DbNotFound));
412                }
413            },
414            SyncOperation::DeleteObjectStore(sender, origin, db_name, store_name) => {
415                if let Some(db) = self.get_database_mut(origin, db_name) {
416                    let result = db.delete_object_store(&store_name);
417                    let _ = sender.send(result.map_err(BackendError::from));
418                } else {
419                    let _ = sender.send(Err(BackendError::DbNotFound));
420                }
421            },
422            SyncOperation::StartTransaction(sender, origin, db_name, txn) => {
423                if let Some(db) = self.get_database_mut(origin, db_name) {
424                    db.start_transaction(txn, Some(sender));
425                } else {
426                    let _ = sender.send(Err(BackendError::DbNotFound));
427                }
428            },
429            SyncOperation::Version(sender, origin, db_name) => {
430                if let Some(db) = self.get_database(origin, db_name) {
431                    let _ = sender.send(db.version().map_err(BackendError::from));
432                } else {
433                    let _ = sender.send(Err(BackendError::DbNotFound));
434                }
435            },
436            SyncOperation::RegisterNewTxn(sender, origin, db_name) => {
437                if let Some(db) = self.get_database_mut(origin, db_name) {
438                    db.serial_number_counter += 1;
439                    let _ = sender.send(db.serial_number_counter);
440                }
441            },
442            SyncOperation::Exit(_) => {
443                unreachable!("We must've already broken out of event loop.");
444            },
445        }
446    }
447}