net/indexeddb/
idb_thread.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4use std::borrow::ToOwned;
5use std::collections::hash_map::Entry;
6use std::collections::{HashMap, VecDeque};
7use std::path::PathBuf;
8use std::sync::Arc;
9use std::thread;
10
11use ipc_channel::ipc::{self, IpcError, IpcReceiver, IpcSender};
12use log::{debug, warn};
13use net_traits::indexeddb_thread::{
14    AsyncOperation, BackendError, BackendResult, CreateObjectResult, DbResult, IndexedDBThreadMsg,
15    IndexedDBTxnMode, KeyPath, SyncOperation,
16};
17use servo_config::pref;
18use servo_url::origin::ImmutableOrigin;
19use uuid::Uuid;
20
21use crate::indexeddb::engines::{KvsEngine, KvsOperation, KvsTransaction, SqliteEngine};
22use crate::resource_thread::CoreResourceThreadPool;
23
/// Constructor abstraction for a handle to the IndexedDB backend.
///
/// The implementation in this file spawns the manager thread and returns the
/// IPC sender used to reach it.
pub trait IndexedDBThreadFactory {
    /// Builds the handle.
    ///
    /// `config_dir` is the optional directory under which the implementation
    /// in this file places its `IndexedDB` storage folder.
    fn new(config_dir: Option<PathBuf>) -> Self;
}
27
28impl IndexedDBThreadFactory for IpcSender<IndexedDBThreadMsg> {
29    fn new(config_dir: Option<PathBuf>) -> IpcSender<IndexedDBThreadMsg> {
30        let (chan, port) = ipc::channel().unwrap();
31
32        let mut idb_base_dir = PathBuf::new();
33        if let Some(p) = config_dir {
34            idb_base_dir.push(p);
35        }
36        idb_base_dir.push("IndexedDB");
37
38        thread::Builder::new()
39            .name("IndexedDBManager".to_owned())
40            .spawn(move || {
41                IndexedDBManager::new(port, idb_base_dir).start();
42            })
43            .expect("Thread spawning failed");
44
45        chan
46    }
47}
48
49#[derive(Clone, Eq, Hash, PartialEq)]
50pub struct IndexedDBDescription {
51    pub origin: ImmutableOrigin,
52    pub name: String,
53}
54
55impl IndexedDBDescription {
56    // randomly generated namespace for our purposes
57    const NAMESPACE_SERVO_IDB: &uuid::Uuid = &Uuid::from_bytes([
58        0x37, 0x9e, 0x56, 0xb0, 0x1a, 0x76, 0x44, 0xc2, 0xa0, 0xdb, 0xe2, 0x18, 0xc5, 0xc8, 0xa3,
59        0x5d,
60    ]);
61    // Converts the database description to a folder name where all
62    // data for this database is stored
63    pub(super) fn as_path(&self) -> PathBuf {
64        let mut path = PathBuf::new();
65
66        // uuid v5 is deterministic
67        let origin_uuid = Uuid::new_v5(
68            Self::NAMESPACE_SERVO_IDB,
69            self.origin.ascii_serialization().as_bytes(),
70        );
71        let db_name_uuid = Uuid::new_v5(Self::NAMESPACE_SERVO_IDB, self.name.as_bytes());
72        path.push(origin_uuid.to_string());
73        path.push(db_name_uuid.to_string());
74
75        path
76    }
77}
78
79struct IndexedDBEnvironment<E: KvsEngine> {
80    engine: E,
81    transactions: HashMap<u64, KvsTransaction>,
82    serial_number_counter: u64,
83}
84
85impl<E: KvsEngine> IndexedDBEnvironment<E> {
86    fn new(engine: E) -> IndexedDBEnvironment<E> {
87        IndexedDBEnvironment {
88            engine,
89            transactions: HashMap::new(),
90            serial_number_counter: 0,
91        }
92    }
93
94    fn queue_operation(
95        &mut self,
96        store_name: &str,
97        serial_number: u64,
98        mode: IndexedDBTxnMode,
99        operation: AsyncOperation,
100    ) {
101        self.transactions
102            .entry(serial_number)
103            .or_insert_with(|| KvsTransaction {
104                requests: VecDeque::new(),
105                mode,
106            })
107            .requests
108            .push_back(KvsOperation {
109                operation,
110                store_name: String::from(store_name),
111            });
112    }
113
114    // Executes all requests for a transaction (without committing)
115    fn start_transaction(&mut self, txn: u64, sender: Option<IpcSender<BackendResult<()>>>) {
116        // FIXME:(arihant2math) find optimizations in this function
117        //   rather than on the engine level code (less repetition)
118        if let Some(txn) = self.transactions.remove(&txn) {
119            let _ = self.engine.process_transaction(txn).blocking_recv();
120        }
121
122        // We have a sender if the transaction is started manually, and they
123        // probably want to know when it is finished
124        if let Some(sender) = sender {
125            if sender.send(Ok(())).is_err() {
126                warn!("IDBTransaction starter dropped its channel");
127            }
128        }
129    }
130
131    fn has_key_generator(&self, store_name: &str) -> bool {
132        self.engine.has_key_generator(store_name)
133    }
134
135    fn key_path(&self, store_name: &str) -> Option<KeyPath> {
136        self.engine.key_path(store_name)
137    }
138
139    fn create_index(
140        &self,
141        store_name: &str,
142        index_name: String,
143        key_path: KeyPath,
144        unique: bool,
145        multi_entry: bool,
146    ) -> DbResult<CreateObjectResult> {
147        self.engine
148            .create_index(store_name, index_name, key_path, unique, multi_entry)
149            .map_err(|err| format!("{err:?}"))
150    }
151
152    fn delete_index(&self, store_name: &str, index_name: String) -> DbResult<()> {
153        self.engine
154            .delete_index(store_name, index_name)
155            .map_err(|err| format!("{err:?}"))
156    }
157
158    fn create_object_store(
159        &mut self,
160        store_name: &str,
161        key_path: Option<KeyPath>,
162        auto_increment: bool,
163    ) -> DbResult<CreateObjectResult> {
164        self.engine
165            .create_store(store_name, key_path, auto_increment)
166            .map_err(|err| format!("{err:?}"))
167    }
168
169    fn delete_object_store(&mut self, store_name: &str) -> DbResult<()> {
170        let result = self.engine.delete_store(store_name);
171        result.map_err(|err| format!("{err:?}"))
172    }
173
174    fn delete_database(self, sender: IpcSender<BackendResult<()>>) {
175        let result = self.engine.delete_database();
176        let _ = sender.send(
177            result
178                .map_err(|err| format!("{err:?}"))
179                .map_err(BackendError::from),
180        );
181    }
182
183    fn version(&self) -> DbResult<u64> {
184        self.engine.version().map_err(|err| format!("{err:?}"))
185    }
186
187    fn set_version(&mut self, version: u64) -> DbResult<()> {
188        self.engine
189            .set_version(version)
190            .map_err(|err| format!("{err:?}"))
191    }
192}
193
194struct IndexedDBManager {
195    port: IpcReceiver<IndexedDBThreadMsg>,
196    idb_base_dir: PathBuf,
197    databases: HashMap<IndexedDBDescription, IndexedDBEnvironment<SqliteEngine>>,
198    thread_pool: Arc<CoreResourceThreadPool>,
199}
200
201impl IndexedDBManager {
202    fn new(port: IpcReceiver<IndexedDBThreadMsg>, idb_base_dir: PathBuf) -> IndexedDBManager {
203        debug!("New indexedDBManager");
204
205        // Uses an estimate of the system cpus to process IndexedDB transactions
206        // See https://doc.rust-lang.org/stable/std/thread/fn.available_parallelism.html
207        // If no information can be obtained about the system, uses 4 threads as a default
208        let thread_count = thread::available_parallelism()
209            .map(|i| i.get())
210            .unwrap_or(pref!(threadpools_fallback_worker_num) as usize)
211            .min(pref!(threadpools_indexeddb_workers_max).max(1) as usize);
212
213        IndexedDBManager {
214            port,
215            idb_base_dir,
216            databases: HashMap::new(),
217            thread_pool: Arc::new(CoreResourceThreadPool::new(
218                thread_count,
219                "IndexedDB".to_string(),
220            )),
221        }
222    }
223}
224
225impl IndexedDBManager {
226    fn start(&mut self) {
227        loop {
228            // FIXME:(arihant2math) No message *most likely* means that
229            // the ipc sender has been dropped, so we break the look
230            let message = match self.port.recv() {
231                Ok(msg) => msg,
232                Err(e) => match e {
233                    IpcError::Disconnected => {
234                        break;
235                    },
236                    other => {
237                        warn!("Error in IndexedDB thread: {:?}", other);
238                        continue;
239                    },
240                },
241            };
242            match message {
243                IndexedDBThreadMsg::Sync(operation) => {
244                    self.handle_sync_operation(operation);
245                },
246                IndexedDBThreadMsg::Async(origin, db_name, store_name, txn, mode, operation) => {
247                    if let Some(db) = self.get_database_mut(origin, db_name) {
248                        // Queues an operation for a transaction without starting it
249                        db.queue_operation(&store_name, txn, mode, operation);
250                        // FIXME:(arihant2math) Schedule transactions properly
251                        // while db.transactions.iter().any(|s| s.1.mode == IndexedDBTxnMode::Readwrite) {
252                        //     std::hint::spin_loop();
253                        // }
254                        db.start_transaction(txn, None);
255                    }
256                },
257            }
258        }
259    }
260
261    fn get_database(
262        &self,
263        origin: ImmutableOrigin,
264        db_name: String,
265    ) -> Option<&IndexedDBEnvironment<SqliteEngine>> {
266        let idb_description = IndexedDBDescription {
267            origin,
268            name: db_name,
269        };
270
271        self.databases.get(&idb_description)
272    }
273
274    fn get_database_mut(
275        &mut self,
276        origin: ImmutableOrigin,
277        db_name: String,
278    ) -> Option<&mut IndexedDBEnvironment<SqliteEngine>> {
279        let idb_description = IndexedDBDescription {
280            origin,
281            name: db_name,
282        };
283
284        self.databases.get_mut(&idb_description)
285    }
286
287    fn handle_sync_operation(&mut self, operation: SyncOperation) {
288        match operation {
289            SyncOperation::CloseDatabase(sender, origin, db_name) => {
290                let idb_description = IndexedDBDescription {
291                    origin,
292                    name: db_name,
293                };
294                if let Some(_db) = self.databases.remove(&idb_description) {
295                    // TODO: maybe a close database function should be added to the trait and called here?
296                }
297                let _ = sender.send(Ok(()));
298            },
299            SyncOperation::OpenDatabase(sender, origin, db_name, version) => {
300                let idb_description = IndexedDBDescription {
301                    origin,
302                    name: db_name,
303                };
304
305                let idb_base_dir = self.idb_base_dir.as_path();
306
307                let version = version.unwrap_or(0);
308
309                match self.databases.entry(idb_description.clone()) {
310                    Entry::Vacant(e) => {
311                        let db = IndexedDBEnvironment::new(
312                            SqliteEngine::new(
313                                idb_base_dir,
314                                &idb_description,
315                                self.thread_pool.clone(),
316                            )
317                            .expect("Failed to create sqlite engine"),
318                        );
319                        let _ = sender.send(db.version().unwrap_or(version));
320                        e.insert(db);
321                    },
322                    Entry::Occupied(db) => {
323                        let _ = sender.send(db.get().version().unwrap_or(version));
324                    },
325                }
326            },
327            SyncOperation::DeleteDatabase(sender, origin, db_name) => {
328                // https://w3c.github.io/IndexedDB/#delete-a-database
329                // Step 4. Let db be the database named name in storageKey,
330                // if one exists. Otherwise, return 0 (zero).
331                let idb_description = IndexedDBDescription {
332                    origin,
333                    name: db_name,
334                };
335                if let Some(db) = self.databases.remove(&idb_description) {
336                    db.delete_database(sender);
337                } else {
338                    let _ = sender.send(Ok(()));
339                }
340            },
341            SyncOperation::HasKeyGenerator(sender, origin, db_name, store_name) => {
342                let result = self
343                    .get_database(origin, db_name)
344                    .map(|db| db.has_key_generator(&store_name));
345                let _ = sender.send(result.ok_or(BackendError::DbNotFound));
346            },
347            SyncOperation::KeyPath(sender, origin, db_name, store_name) => {
348                let result = self
349                    .get_database(origin, db_name)
350                    .map(|db| db.key_path(&store_name));
351                let _ = sender.send(result.ok_or(BackendError::DbNotFound));
352            },
353            SyncOperation::CreateIndex(
354                sender,
355                origin,
356                db_name,
357                store_name,
358                index_name,
359                key_path,
360                unique,
361                multi_entry,
362            ) => {
363                if let Some(db) = self.get_database(origin, db_name) {
364                    let result =
365                        db.create_index(&store_name, index_name, key_path, unique, multi_entry);
366                    let _ = sender.send(result.map_err(BackendError::from));
367                } else {
368                    let _ = sender.send(Err(BackendError::DbNotFound));
369                }
370            },
371            SyncOperation::DeleteIndex(sender, origin, db_name, store_name, index_name) => {
372                if let Some(db) = self.get_database(origin, db_name) {
373                    let result = db.delete_index(&store_name, index_name);
374                    let _ = sender.send(result.map_err(BackendError::from));
375                } else {
376                    let _ = sender.send(Err(BackendError::DbNotFound));
377                }
378            },
379            SyncOperation::Commit(sender, _origin, _db_name, _txn) => {
380                // FIXME:(arihant2math) This does nothing at the moment
381                let _ = sender.send(Ok(()));
382            },
383            SyncOperation::UpgradeVersion(sender, origin, db_name, _txn, version) => {
384                if let Some(db) = self.get_database_mut(origin, db_name) {
385                    if version > db.version().unwrap_or(0) {
386                        let _ = db.set_version(version);
387                    }
388                    // erroring out if the version is not upgraded can be and non-replicable
389                    let _ = sender.send(db.version().map_err(BackendError::from));
390                } else {
391                    let _ = sender.send(Err(BackendError::DbNotFound));
392                }
393            },
394            SyncOperation::CreateObjectStore(
395                sender,
396                origin,
397                db_name,
398                store_name,
399                key_paths,
400                auto_increment,
401            ) => {
402                if let Some(db) = self.get_database_mut(origin, db_name) {
403                    let result = db.create_object_store(&store_name, key_paths, auto_increment);
404                    let _ = sender.send(result.map_err(BackendError::from));
405                } else {
406                    let _ = sender.send(Err(BackendError::DbNotFound));
407                }
408            },
409            SyncOperation::DeleteObjectStore(sender, origin, db_name, store_name) => {
410                if let Some(db) = self.get_database_mut(origin, db_name) {
411                    let result = db.delete_object_store(&store_name);
412                    let _ = sender.send(result.map_err(BackendError::from));
413                } else {
414                    let _ = sender.send(Err(BackendError::DbNotFound));
415                }
416            },
417            SyncOperation::StartTransaction(sender, origin, db_name, txn) => {
418                if let Some(db) = self.get_database_mut(origin, db_name) {
419                    db.start_transaction(txn, Some(sender));
420                } else {
421                    let _ = sender.send(Err(BackendError::DbNotFound));
422                }
423            },
424            SyncOperation::Version(sender, origin, db_name) => {
425                if let Some(db) = self.get_database(origin, db_name) {
426                    let _ = sender.send(db.version().map_err(BackendError::from));
427                } else {
428                    let _ = sender.send(Err(BackendError::DbNotFound));
429                }
430            },
431            SyncOperation::RegisterNewTxn(sender, origin, db_name) => {
432                if let Some(db) = self.get_database_mut(origin, db_name) {
433                    db.serial_number_counter += 1;
434                    let _ = sender.send(db.serial_number_counter);
435                }
436            },
437            SyncOperation::Exit(sender) => {
438                // FIXME:(rasviitanen) Nothing to do?
439                let _ = sender.send(());
440            },
441        }
442    }
443}