storage/indexeddb/
mod.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5mod engines;
6
7use std::borrow::ToOwned;
8use std::collections::hash_map::Entry;
9use std::collections::{HashMap, VecDeque};
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::thread;
13
14use base::threadpool::ThreadPool;
15use ipc_channel::ipc::{self, IpcError, IpcReceiver, IpcSender};
16use log::{debug, warn};
17use rustc_hash::FxHashMap;
18use servo_config::pref;
19use servo_url::origin::ImmutableOrigin;
20use storage_traits::indexeddb_thread::{
21    AsyncOperation, BackendError, BackendResult, CreateObjectResult, DbResult, IndexedDBThreadMsg,
22    IndexedDBTxnMode, KeyPath, SyncOperation,
23};
24use uuid::Uuid;
25
26use crate::indexeddb::engines::{KvsEngine, KvsOperation, KvsTransaction, SqliteEngine};
27
/// Factory for handles to the IndexedDB backend.
///
/// Implemented in this file for `IpcSender<IndexedDBThreadMsg>`.
pub trait IndexedDBThreadFactory {
    /// Builds a new handle. `config_dir`, when present, is the directory the
    /// implementation uses as the parent of its own storage location.
    fn new(config_dir: Option<PathBuf>) -> Self;
}
31
32impl IndexedDBThreadFactory for IpcSender<IndexedDBThreadMsg> {
33    fn new(config_dir: Option<PathBuf>) -> IpcSender<IndexedDBThreadMsg> {
34        let (chan, port) = ipc::channel().unwrap();
35
36        let mut idb_base_dir = PathBuf::new();
37        if let Some(p) = config_dir {
38            idb_base_dir.push(p);
39        }
40        idb_base_dir.push("IndexedDB");
41
42        thread::Builder::new()
43            .name("IndexedDBManager".to_owned())
44            .spawn(move || {
45                IndexedDBManager::new(port, idb_base_dir).start();
46            })
47            .expect("Thread spawning failed");
48
49        chan
50    }
51}
52
53#[derive(Clone, Eq, Hash, PartialEq)]
54pub struct IndexedDBDescription {
55    pub origin: ImmutableOrigin,
56    pub name: String,
57}
58
59impl IndexedDBDescription {
60    // randomly generated namespace for our purposes
61    const NAMESPACE_SERVO_IDB: &uuid::Uuid = &Uuid::from_bytes([
62        0x37, 0x9e, 0x56, 0xb0, 0x1a, 0x76, 0x44, 0xc2, 0xa0, 0xdb, 0xe2, 0x18, 0xc5, 0xc8, 0xa3,
63        0x5d,
64    ]);
65    // Converts the database description to a folder name where all
66    // data for this database is stored
67    pub(super) fn as_path(&self) -> PathBuf {
68        let mut path = PathBuf::new();
69
70        // uuid v5 is deterministic
71        let origin_uuid = Uuid::new_v5(
72            Self::NAMESPACE_SERVO_IDB,
73            self.origin.ascii_serialization().as_bytes(),
74        );
75        let db_name_uuid = Uuid::new_v5(Self::NAMESPACE_SERVO_IDB, self.name.as_bytes());
76        path.push(origin_uuid.to_string());
77        path.push(db_name_uuid.to_string());
78
79        path
80    }
81}
82
83struct IndexedDBEnvironment<E: KvsEngine> {
84    engine: E,
85    transactions: FxHashMap<u64, KvsTransaction>,
86    serial_number_counter: u64,
87}
88
89impl<E: KvsEngine> IndexedDBEnvironment<E> {
90    fn new(engine: E) -> IndexedDBEnvironment<E> {
91        IndexedDBEnvironment {
92            engine,
93            transactions: FxHashMap::default(),
94            serial_number_counter: 0,
95        }
96    }
97
98    fn queue_operation(
99        &mut self,
100        store_name: &str,
101        serial_number: u64,
102        mode: IndexedDBTxnMode,
103        operation: AsyncOperation,
104    ) {
105        self.transactions
106            .entry(serial_number)
107            .or_insert_with(|| KvsTransaction {
108                requests: VecDeque::new(),
109                mode,
110            })
111            .requests
112            .push_back(KvsOperation {
113                operation,
114                store_name: String::from(store_name),
115            });
116    }
117
118    // Executes all requests for a transaction (without committing)
119    fn start_transaction(&mut self, txn: u64, sender: Option<IpcSender<BackendResult<()>>>) {
120        // FIXME:(arihant2math) find optimizations in this function
121        //   rather than on the engine level code (less repetition)
122        if let Some(txn) = self.transactions.remove(&txn) {
123            let _ = self.engine.process_transaction(txn).blocking_recv();
124        }
125
126        // We have a sender if the transaction is started manually, and they
127        // probably want to know when it is finished
128        if let Some(sender) = sender {
129            if sender.send(Ok(())).is_err() {
130                warn!("IDBTransaction starter dropped its channel");
131            }
132        }
133    }
134
135    fn has_key_generator(&self, store_name: &str) -> bool {
136        self.engine.has_key_generator(store_name)
137    }
138
139    fn key_path(&self, store_name: &str) -> Option<KeyPath> {
140        self.engine.key_path(store_name)
141    }
142
143    fn create_index(
144        &self,
145        store_name: &str,
146        index_name: String,
147        key_path: KeyPath,
148        unique: bool,
149        multi_entry: bool,
150    ) -> DbResult<CreateObjectResult> {
151        self.engine
152            .create_index(store_name, index_name, key_path, unique, multi_entry)
153            .map_err(|err| format!("{err:?}"))
154    }
155
156    fn delete_index(&self, store_name: &str, index_name: String) -> DbResult<()> {
157        self.engine
158            .delete_index(store_name, index_name)
159            .map_err(|err| format!("{err:?}"))
160    }
161
162    fn create_object_store(
163        &mut self,
164        store_name: &str,
165        key_path: Option<KeyPath>,
166        auto_increment: bool,
167    ) -> DbResult<CreateObjectResult> {
168        self.engine
169            .create_store(store_name, key_path, auto_increment)
170            .map_err(|err| format!("{err:?}"))
171    }
172
173    fn delete_object_store(&mut self, store_name: &str) -> DbResult<()> {
174        let result = self.engine.delete_store(store_name);
175        result.map_err(|err| format!("{err:?}"))
176    }
177
178    fn delete_database(self, sender: IpcSender<BackendResult<()>>) {
179        let result = self.engine.delete_database();
180        let _ = sender.send(
181            result
182                .map_err(|err| format!("{err:?}"))
183                .map_err(BackendError::from),
184        );
185    }
186
187    fn version(&self) -> DbResult<u64> {
188        self.engine.version().map_err(|err| format!("{err:?}"))
189    }
190
191    fn set_version(&mut self, version: u64) -> DbResult<()> {
192        self.engine
193            .set_version(version)
194            .map_err(|err| format!("{err:?}"))
195    }
196}
197
198struct IndexedDBManager {
199    port: IpcReceiver<IndexedDBThreadMsg>,
200    idb_base_dir: PathBuf,
201    databases: HashMap<IndexedDBDescription, IndexedDBEnvironment<SqliteEngine>>,
202    thread_pool: Arc<ThreadPool>,
203}
204
205impl IndexedDBManager {
206    fn new(port: IpcReceiver<IndexedDBThreadMsg>, idb_base_dir: PathBuf) -> IndexedDBManager {
207        debug!("New indexedDBManager");
208
209        // Uses an estimate of the system cpus to process IndexedDB transactions
210        // See https://doc.rust-lang.org/stable/std/thread/fn.available_parallelism.html
211        // If no information can be obtained about the system, uses 4 threads as a default
212        let thread_count = thread::available_parallelism()
213            .map(|i| i.get())
214            .unwrap_or(pref!(threadpools_fallback_worker_num) as usize)
215            .min(pref!(threadpools_indexeddb_workers_max).max(1) as usize);
216
217        IndexedDBManager {
218            port,
219            idb_base_dir,
220            databases: HashMap::new(),
221            thread_pool: Arc::new(ThreadPool::new(thread_count, "IndexedDB".to_string())),
222        }
223    }
224}
225
226impl IndexedDBManager {
227    fn start(&mut self) {
228        loop {
229            // FIXME:(arihant2math) No message *most likely* means that
230            // the ipc sender has been dropped, so we break the look
231            let message = match self.port.recv() {
232                Ok(msg) => msg,
233                Err(e) => match e {
234                    IpcError::Disconnected => {
235                        break;
236                    },
237                    other => {
238                        warn!("Error in IndexedDB thread: {:?}", other);
239                        continue;
240                    },
241                },
242            };
243            match message {
244                IndexedDBThreadMsg::Sync(operation) => {
245                    self.handle_sync_operation(operation);
246                },
247                IndexedDBThreadMsg::Async(origin, db_name, store_name, txn, mode, operation) => {
248                    if let Some(db) = self.get_database_mut(origin, db_name) {
249                        // Queues an operation for a transaction without starting it
250                        db.queue_operation(&store_name, txn, mode, operation);
251                        // FIXME:(arihant2math) Schedule transactions properly
252                        // while db.transactions.iter().any(|s| s.1.mode == IndexedDBTxnMode::Readwrite) {
253                        //     std::hint::spin_loop();
254                        // }
255                        db.start_transaction(txn, None);
256                    }
257                },
258            }
259        }
260    }
261
262    fn get_database(
263        &self,
264        origin: ImmutableOrigin,
265        db_name: String,
266    ) -> Option<&IndexedDBEnvironment<SqliteEngine>> {
267        let idb_description = IndexedDBDescription {
268            origin,
269            name: db_name,
270        };
271
272        self.databases.get(&idb_description)
273    }
274
275    fn get_database_mut(
276        &mut self,
277        origin: ImmutableOrigin,
278        db_name: String,
279    ) -> Option<&mut IndexedDBEnvironment<SqliteEngine>> {
280        let idb_description = IndexedDBDescription {
281            origin,
282            name: db_name,
283        };
284
285        self.databases.get_mut(&idb_description)
286    }
287
288    fn handle_sync_operation(&mut self, operation: SyncOperation) {
289        match operation {
290            SyncOperation::CloseDatabase(sender, origin, db_name) => {
291                let idb_description = IndexedDBDescription {
292                    origin,
293                    name: db_name,
294                };
295                if let Some(_db) = self.databases.remove(&idb_description) {
296                    // TODO: maybe a close database function should be added to the trait and called here?
297                }
298                let _ = sender.send(Ok(()));
299            },
300            SyncOperation::OpenDatabase(sender, origin, db_name, version) => {
301                let idb_description = IndexedDBDescription {
302                    origin,
303                    name: db_name,
304                };
305
306                let idb_base_dir = self.idb_base_dir.as_path();
307
308                let version = version.unwrap_or(0);
309
310                match self.databases.entry(idb_description.clone()) {
311                    Entry::Vacant(e) => {
312                        let db = IndexedDBEnvironment::new(
313                            SqliteEngine::new(
314                                idb_base_dir,
315                                &idb_description,
316                                self.thread_pool.clone(),
317                            )
318                            .expect("Failed to create sqlite engine"),
319                        );
320                        let _ = sender.send(db.version().unwrap_or(version));
321                        e.insert(db);
322                    },
323                    Entry::Occupied(db) => {
324                        let _ = sender.send(db.get().version().unwrap_or(version));
325                    },
326                }
327            },
328            SyncOperation::DeleteDatabase(sender, origin, db_name) => {
329                // https://w3c.github.io/IndexedDB/#delete-a-database
330                // Step 4. Let db be the database named name in storageKey,
331                // if one exists. Otherwise, return 0 (zero).
332                let idb_description = IndexedDBDescription {
333                    origin,
334                    name: db_name,
335                };
336                if let Some(db) = self.databases.remove(&idb_description) {
337                    db.delete_database(sender);
338                } else {
339                    let _ = sender.send(Ok(()));
340                }
341            },
342            SyncOperation::HasKeyGenerator(sender, origin, db_name, store_name) => {
343                let result = self
344                    .get_database(origin, db_name)
345                    .map(|db| db.has_key_generator(&store_name));
346                let _ = sender.send(result.ok_or(BackendError::DbNotFound));
347            },
348            SyncOperation::KeyPath(sender, origin, db_name, store_name) => {
349                let result = self
350                    .get_database(origin, db_name)
351                    .map(|db| db.key_path(&store_name));
352                let _ = sender.send(result.ok_or(BackendError::DbNotFound));
353            },
354            SyncOperation::CreateIndex(
355                sender,
356                origin,
357                db_name,
358                store_name,
359                index_name,
360                key_path,
361                unique,
362                multi_entry,
363            ) => {
364                if let Some(db) = self.get_database(origin, db_name) {
365                    let result =
366                        db.create_index(&store_name, index_name, key_path, unique, multi_entry);
367                    let _ = sender.send(result.map_err(BackendError::from));
368                } else {
369                    let _ = sender.send(Err(BackendError::DbNotFound));
370                }
371            },
372            SyncOperation::DeleteIndex(sender, origin, db_name, store_name, index_name) => {
373                if let Some(db) = self.get_database(origin, db_name) {
374                    let result = db.delete_index(&store_name, index_name);
375                    let _ = sender.send(result.map_err(BackendError::from));
376                } else {
377                    let _ = sender.send(Err(BackendError::DbNotFound));
378                }
379            },
380            SyncOperation::Commit(sender, _origin, _db_name, _txn) => {
381                // FIXME:(arihant2math) This does nothing at the moment
382                let _ = sender.send(Ok(()));
383            },
384            SyncOperation::UpgradeVersion(sender, origin, db_name, _txn, version) => {
385                if let Some(db) = self.get_database_mut(origin, db_name) {
386                    if version > db.version().unwrap_or(0) {
387                        let _ = db.set_version(version);
388                    }
389                    // erroring out if the version is not upgraded can be and non-replicable
390                    let _ = sender.send(db.version().map_err(BackendError::from));
391                } else {
392                    let _ = sender.send(Err(BackendError::DbNotFound));
393                }
394            },
395            SyncOperation::CreateObjectStore(
396                sender,
397                origin,
398                db_name,
399                store_name,
400                key_paths,
401                auto_increment,
402            ) => {
403                if let Some(db) = self.get_database_mut(origin, db_name) {
404                    let result = db.create_object_store(&store_name, key_paths, auto_increment);
405                    let _ = sender.send(result.map_err(BackendError::from));
406                } else {
407                    let _ = sender.send(Err(BackendError::DbNotFound));
408                }
409            },
410            SyncOperation::DeleteObjectStore(sender, origin, db_name, store_name) => {
411                if let Some(db) = self.get_database_mut(origin, db_name) {
412                    let result = db.delete_object_store(&store_name);
413                    let _ = sender.send(result.map_err(BackendError::from));
414                } else {
415                    let _ = sender.send(Err(BackendError::DbNotFound));
416                }
417            },
418            SyncOperation::StartTransaction(sender, origin, db_name, txn) => {
419                if let Some(db) = self.get_database_mut(origin, db_name) {
420                    db.start_transaction(txn, Some(sender));
421                } else {
422                    let _ = sender.send(Err(BackendError::DbNotFound));
423                }
424            },
425            SyncOperation::Version(sender, origin, db_name) => {
426                if let Some(db) = self.get_database(origin, db_name) {
427                    let _ = sender.send(db.version().map_err(BackendError::from));
428                } else {
429                    let _ = sender.send(Err(BackendError::DbNotFound));
430                }
431            },
432            SyncOperation::RegisterNewTxn(sender, origin, db_name) => {
433                if let Some(db) = self.get_database_mut(origin, db_name) {
434                    db.serial_number_counter += 1;
435                    let _ = sender.send(db.serial_number_counter);
436                }
437            },
438            SyncOperation::Exit(sender) => {
439                // FIXME:(rasviitanen) Nothing to do?
440                let _ = sender.send(());
441            },
442        }
443    }
444}