storage/indexeddb/
mod.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5mod engines;
6
7use std::borrow::ToOwned;
8use std::collections::hash_map::Entry;
9use std::collections::{HashMap, HashSet, VecDeque};
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::thread;
13
14use base::generic_channel::{self, GenericReceiver, GenericSender, ReceiveError};
15use base::threadpool::ThreadPool;
16use log::{debug, error, warn};
17use profile_traits::generic_callback::GenericCallback;
18use rusqlite::Error as RusqliteError;
19use rustc_hash::FxHashMap;
20use servo_config::pref;
21use servo_url::origin::ImmutableOrigin;
22use storage_traits::indexeddb::{
23    AsyncOperation, BackendError, BackendResult, CreateObjectResult, DatabaseInfo, DbResult,
24    IndexedDBThreadMsg, IndexedDBTxnMode, KeyPath, OpenDatabaseResult, SyncOperation,
25};
26use uuid::Uuid;
27
28use crate::indexeddb::engines::{KvsEngine, KvsOperation, KvsTransaction, SqliteEngine};
29use crate::shared::is_sqlite_disk_full_error;
30
/// Constructor interface for types that act as a handle to the IndexedDB backend.
pub trait IndexedDBThreadFactory {
    /// Builds `Self` from an optional configuration directory.
    fn new(config_dir: Option<PathBuf>) -> Self;
}
34
35impl IndexedDBThreadFactory for GenericSender<IndexedDBThreadMsg> {
36    fn new(config_dir: Option<PathBuf>) -> GenericSender<IndexedDBThreadMsg> {
37        let (chan, port) = generic_channel::channel().unwrap();
38
39        let mut idb_base_dir = PathBuf::new();
40        if let Some(p) = config_dir {
41            idb_base_dir.push(p);
42        }
43        idb_base_dir.push("IndexedDB");
44
45        thread::Builder::new()
46            .name("IndexedDBManager".to_owned())
47            .spawn(move || {
48                IndexedDBManager::new(port, idb_base_dir).start();
49            })
50            .expect("Thread spawning failed");
51
52        chan
53    }
54}
55
56/// A key used to track databases.
57/// TODO: use a storage key.
58#[derive(Clone, Debug, Eq, Hash, PartialEq)]
59pub struct IndexedDBDescription {
60    pub origin: ImmutableOrigin,
61    pub name: String,
62}
63
64impl IndexedDBDescription {
65    // randomly generated namespace for our purposes
66    const NAMESPACE_SERVO_IDB: &uuid::Uuid = &Uuid::from_bytes([
67        0x37, 0x9e, 0x56, 0xb0, 0x1a, 0x76, 0x44, 0xc2, 0xa0, 0xdb, 0xe2, 0x18, 0xc5, 0xc8, 0xa3,
68        0x5d,
69    ]);
70    // Converts the database description to a folder name where all
71    // data for this database is stored
72    pub(super) fn as_path(&self) -> PathBuf {
73        let mut path = PathBuf::new();
74
75        // uuid v5 is deterministic
76        let origin_uuid = Uuid::new_v5(
77            Self::NAMESPACE_SERVO_IDB,
78            self.origin.ascii_serialization().as_bytes(),
79        );
80        let db_name_uuid = Uuid::new_v5(Self::NAMESPACE_SERVO_IDB, self.name.as_bytes());
81        path.push(origin_uuid.to_string());
82        path.push(db_name_uuid.to_string());
83
84        path
85    }
86}
87
88struct IndexedDBEnvironment<E: KvsEngine> {
89    engine: E,
90    transactions: FxHashMap<u64, KvsTransaction>,
91}
92
93impl<E: KvsEngine> IndexedDBEnvironment<E> {
94    fn new(engine: E) -> IndexedDBEnvironment<E> {
95        IndexedDBEnvironment {
96            engine,
97            transactions: FxHashMap::default(),
98        }
99    }
100
101    fn queue_operation(
102        &mut self,
103        store_name: &str,
104        serial_number: u64,
105        mode: IndexedDBTxnMode,
106        operation: AsyncOperation,
107    ) {
108        self.transactions
109            .entry(serial_number)
110            .or_insert_with(|| KvsTransaction {
111                requests: VecDeque::new(),
112                mode,
113            })
114            .requests
115            .push_back(KvsOperation {
116                operation,
117                store_name: String::from(store_name),
118            });
119    }
120
121    // Executes all requests for a transaction (without committing)
122    fn start_transaction(&mut self, txn: u64, sender: Option<GenericSender<BackendResult<()>>>) {
123        // FIXME:(arihant2math) find optimizations in this function
124        //   rather than on the engine level code (less repetition)
125        if let Some(txn) = self.transactions.remove(&txn) {
126            let _ = self.engine.process_transaction(txn).blocking_recv();
127        }
128
129        // We have a sender if the transaction is started manually, and they
130        // probably want to know when it is finished
131        if let Some(sender) = sender {
132            if sender.send(Ok(())).is_err() {
133                warn!("IDBTransaction starter dropped its channel");
134            }
135        }
136    }
137
138    fn has_key_generator(&self, store_name: &str) -> bool {
139        self.engine.has_key_generator(store_name)
140    }
141
142    fn key_path(&self, store_name: &str) -> Option<KeyPath> {
143        self.engine.key_path(store_name)
144    }
145
146    fn create_index(
147        &self,
148        store_name: &str,
149        index_name: String,
150        key_path: KeyPath,
151        unique: bool,
152        multi_entry: bool,
153    ) -> DbResult<CreateObjectResult> {
154        self.engine
155            .create_index(store_name, index_name, key_path, unique, multi_entry)
156            .map_err(|err| format!("{err:?}"))
157    }
158
159    fn delete_index(&self, store_name: &str, index_name: String) -> DbResult<()> {
160        self.engine
161            .delete_index(store_name, index_name)
162            .map_err(|err| format!("{err:?}"))
163    }
164
165    fn create_object_store(
166        &mut self,
167        store_name: &str,
168        key_path: Option<KeyPath>,
169        auto_increment: bool,
170    ) -> DbResult<CreateObjectResult> {
171        self.engine
172            .create_store(store_name, key_path, auto_increment)
173            .map_err(|err| format!("{err:?}"))
174    }
175
176    fn delete_object_store(&mut self, store_name: &str) -> DbResult<()> {
177        let result = self.engine.delete_store(store_name);
178        result.map_err(|err| format!("{err:?}"))
179    }
180
181    fn delete_database(self) -> BackendResult<()> {
182        let result = self.engine.delete_database();
183        result
184            .map_err(|err| format!("{err:?}"))
185            .map_err(BackendError::from)
186    }
187
188    fn version(&self) -> Result<u64, E::Error> {
189        self.engine.version()
190    }
191
192    fn set_version(&mut self, version: u64) -> DbResult<()> {
193        self.engine
194            .set_version(version)
195            .map_err(|err| format!("{err:?}"))
196    }
197}
198
199fn backend_error_from_sqlite_error(err: RusqliteError) -> BackendError {
200    if is_sqlite_disk_full_error(&err) {
201        BackendError::QuotaExceeded
202    } else {
203        BackendError::DbErr(format!("{err:?}"))
204    }
205}
206
207/// <https://w3c.github.io/IndexedDB/#request-open-request>
208/// Used here to implement the
209/// <https://w3c.github.io/IndexedDB/#connection-queue>
210enum OpenRequest {
211    Open {
212        /// The callback used to send a result to script.
213        sender: GenericCallback<BackendResult<OpenDatabaseResult>>,
214
215        /// The origin of the request.
216        /// TODO: storage key.
217        origin: ImmutableOrigin,
218
219        /// The name of the database.
220        db_name: String,
221
222        /// Optionnaly, a requested db version.
223        version: Option<u64>,
224
225        /// Optionnaly, a version pending ugrade.
226        /// Used as <https://w3c.github.io/IndexedDB/#request-processed-flag>
227        pending_upgrade: Option<VersionUpgrade>,
228
229        id: Uuid,
230    },
231    Delete {
232        /// The callback used to send a result to script.
233        sender: GenericCallback<BackendResult<u64>>,
234
235        /// The origin of the request.
236        /// TODO: storage key.
237        /// Note: will be used when the full spec is implemented.
238        _origin: ImmutableOrigin,
239
240        /// The name of the database.
241        /// Note: will be used when the full spec is implemented.
242        _db_name: String,
243
244        /// <https://w3c.github.io/IndexedDB/#request-processed-flag>
245        processed: bool,
246
247        id: Uuid,
248    },
249}
250
251impl OpenRequest {
252    fn get_id(&self) -> Uuid {
253        let id = match self {
254            OpenRequest::Open {
255                sender: _,
256                origin: _,
257                db_name: _,
258                version: _,
259                pending_upgrade: _,
260                id,
261            } => id,
262            OpenRequest::Delete {
263                sender: _,
264                _origin: _,
265                _db_name: _,
266                processed: _,
267                id,
268            } => id,
269        };
270        *id
271    }
272
273    fn is_open(&self) -> bool {
274        match self {
275            OpenRequest::Open {
276                sender: _,
277                origin: _,
278                db_name: _,
279                version: _,
280                pending_upgrade: _,
281                id: _,
282            } => true,
283            OpenRequest::Delete {
284                sender: _,
285                _origin: _,
286                _db_name: _,
287                processed: _,
288                id: _,
289            } => false,
290        }
291    }
292    fn is_pending(&self) -> bool {
293        match self {
294            OpenRequest::Open {
295                sender: _,
296                origin: _,
297                db_name: _,
298                version: _,
299                pending_upgrade,
300                id: _,
301            } => pending_upgrade.is_some(),
302            OpenRequest::Delete {
303                sender: _,
304                _origin: _,
305                _db_name: _,
306                processed,
307                id: _,
308            } => !processed,
309        }
310    }
311
312    /// Abort the open request,
313    /// optionally returning a version to revert to.
314    fn abort(&self) -> Option<u64> {
315        match self {
316            OpenRequest::Open {
317                sender,
318                origin: _,
319                db_name: _,
320                version: _,
321                pending_upgrade,
322                id: _,
323            } => {
324                if sender.send(Ok(OpenDatabaseResult::AbortError)).is_err() {
325                    error!("Failed to send OpenDatabaseResult::Connection to script.");
326                };
327                pending_upgrade.as_ref().map(|upgrade| upgrade.old)
328            },
329            OpenRequest::Delete {
330                sender,
331                _origin: _,
332                _db_name: _,
333                processed: _,
334                id: _,
335            } => {
336                if sender.send(Err(BackendError::DbNotFound)).is_err() {
337                    error!("Failed to send result of database delete to script.");
338                };
339                None
340            },
341        }
342    }
343}
344
/// The version change recorded on an open request while its upgrade is pending.
struct VersionUpgrade {
    /// The database version before the upgrade started.
    old: u64,
    /// The database version being upgraded to.
    new: u64,
}
349
/// <https://w3c.github.io/IndexedDB/#connection>
/// Compared and hashed by value (version plus close pending flag).
#[derive(Clone, Eq, Hash, PartialEq)]
struct Connection {
    /// <https://w3c.github.io/IndexedDB/#connection-version>
    version: u64,

    /// <https://w3c.github.io/IndexedDB/#connection-close-pending-flag>
    close_pending: bool,
}
359
360struct IndexedDBManager {
361    port: GenericReceiver<IndexedDBThreadMsg>,
362    idb_base_dir: PathBuf,
363    databases: HashMap<IndexedDBDescription, IndexedDBEnvironment<SqliteEngine>>,
364    thread_pool: Arc<ThreadPool>,
365
366    /// A global counter to produce unique transaction ids.
367    /// TODO: remove once db connections lifecyle is managed.
368    /// A global counter is only necessary because of how deleting a db currently
369    /// does not wait for connection to close and transactions to finish.
370    serial_number_counter: u64,
371
372    /// <https://w3c.github.io/IndexedDB/#connection-queue>
373    connection_queues: HashMap<IndexedDBDescription, VecDeque<OpenRequest>>,
374
375    /// <https://w3c.github.io/IndexedDB/#connection>
376    connections: HashMap<IndexedDBDescription, HashSet<Connection>>,
377}
378
379impl IndexedDBManager {
380    fn new(port: GenericReceiver<IndexedDBThreadMsg>, idb_base_dir: PathBuf) -> IndexedDBManager {
381        debug!("New indexedDBManager");
382
383        // Uses an estimate of the system cpus to process IndexedDB transactions
384        // See https://doc.rust-lang.org/stable/std/thread/fn.available_parallelism.html
385        // If no information can be obtained about the system, uses 4 threads as a default
386        let thread_count = thread::available_parallelism()
387            .map(|i| i.get())
388            .unwrap_or(pref!(threadpools_fallback_worker_num) as usize)
389            .min(pref!(threadpools_indexeddb_workers_max).max(1) as usize);
390
391        IndexedDBManager {
392            port,
393            idb_base_dir,
394            databases: HashMap::new(),
395            thread_pool: Arc::new(ThreadPool::new(thread_count, "IndexedDB".to_string())),
396            serial_number_counter: 0,
397            connection_queues: Default::default(),
398            connections: Default::default(),
399        }
400    }
401}
402
403impl IndexedDBManager {
404    fn start(&mut self) {
405        loop {
406            // FIXME:(arihant2math) No message *most likely* means that
407            // the ipc sender has been dropped, so we break the look
408            let message = match self.port.recv() {
409                Ok(msg) => msg,
410                Err(ReceiveError::Disconnected) => {
411                    break;
412                },
413                Err(e) => {
414                    warn!("Error in IndexedDB thread: {e:?}");
415                    continue;
416                },
417            };
418            match message {
419                IndexedDBThreadMsg::Sync(SyncOperation::Exit(sender)) => {
420                    let _ = sender.send(());
421                    break;
422                },
423                IndexedDBThreadMsg::Sync(operation) => {
424                    self.handle_sync_operation(operation);
425                },
426                IndexedDBThreadMsg::Async(origin, db_name, store_name, txn, mode, operation) => {
427                    if let Some(db) = self.get_database_mut(origin, db_name) {
428                        // Queues an operation for a transaction without starting it
429                        db.queue_operation(&store_name, txn, mode, operation);
430                        // FIXME:(arihant2math) Schedule transactions properly
431                        // while db.transactions.iter().any(|s| s.1.mode == IndexedDBTxnMode::Readwrite) {
432                        //     std::hint::spin_loop();
433                        // }
434                        db.start_transaction(txn, None);
435                    }
436                },
437                IndexedDBThreadMsg::OpenTransactionInactive { name, origin } => {
438                    self.handle_open_transaction_inactive(name, origin);
439                },
440            }
441        }
442    }
443
444    /// Handle when an open transaction becomes inactive.
445    fn handle_open_transaction_inactive(&mut self, name: String, origin: ImmutableOrigin) {
446        let key = IndexedDBDescription { name, origin };
447        let Some(queue) = self.connection_queues.get_mut(&key) else {
448            return debug_assert!(false, "A connection queue should exist.");
449        };
450        let Some(open_request) = queue.pop_front() else {
451            return debug_assert!(false, "A pending open request should exist.");
452        };
453        let OpenRequest::Open {
454            sender,
455            origin: _,
456            db_name: _,
457            version: _,
458            pending_upgrade,
459            id: _,
460        } = open_request
461        else {
462            return;
463        };
464        let Some(VersionUpgrade { old: _, new }) = pending_upgrade else {
465            return debug_assert!(false, "A pending version upgrade should exist.");
466        };
467        if sender
468            .send(Ok(OpenDatabaseResult::Connection {
469                version: new,
470                upgraded: true,
471            }))
472            .is_err()
473        {
474            error!("Failed to send OpenDatabaseResult::Connection to script.");
475        };
476
477        self.advance_connection_queue(key);
478    }
479
480    /// Run the next open request in the queue.
481    fn advance_connection_queue(&mut self, key: IndexedDBDescription) {
482        loop {
483            let is_open = {
484                let Some(queue) = self.connection_queues.get_mut(&key) else {
485                    return;
486                };
487                if queue.is_empty() {
488                    return;
489                }
490                queue.front().expect("Queue is not empty.").is_open()
491            };
492
493            if is_open {
494                self.open_database(key.clone());
495            } else {
496                self.delete_database(key.clone());
497            }
498
499            let was_pruned = self.maybe_remove_front_from_queue(&key);
500
501            if !was_pruned {
502                // Note: requests to delete a database are, at this point in the implementation,
503                // done in one step; so we can continue on to the next request.
504                // Request to open a connection consists of multiple async steps, so we must break if
505                // it is still pending.
506                break;
507            }
508        }
509    }
510
511    /// Remove the record at the front if it is not pending.
512    fn maybe_remove_front_from_queue(&mut self, key: &IndexedDBDescription) -> bool {
513        let (is_empty, was_pruned) = {
514            let Some(queue) = self.connection_queues.get_mut(key) else {
515                debug_assert!(false, "A connection queue should exist.");
516                return false;
517            };
518            let mut pruned = false;
519            let front_is_pending = queue.front().map(|record| record.is_pending());
520            if let Some(is_pending) = front_is_pending {
521                if !is_pending {
522                    queue.pop_front().expect("Queue has a non-pending item.");
523                    pruned = true
524                }
525            }
526            (queue.is_empty(), pruned)
527        };
528        if is_empty {
529            self.connection_queues.remove(key);
530        }
531        was_pruned
532    }
533
534    /// Aborting the current upgrade for an origin.
535    // https://w3c.github.io/IndexedDB/#abort-an-upgrade-transaction
536    /// Note: this only reverts the version at this point.
537    fn abort_pending_upgrade(&mut self, name: String, id: Uuid, origin: ImmutableOrigin) {
538        let key = IndexedDBDescription {
539            name,
540            origin: origin.clone(),
541        };
542        let old = {
543            let Some(queue) = self.connection_queues.get_mut(&key) else {
544                return debug_assert!(
545                    false,
546                    "There should be a connection queue for the aborted upgrade."
547                );
548            };
549            let Some(open_request) = queue.pop_front() else {
550                return debug_assert!(false, "There should be an open request to upgrade.");
551            };
552            if open_request.get_id() != id {
553                return debug_assert!(
554                    false,
555                    "Open request to abort should be at the head of the queue."
556                );
557            }
558            open_request.abort()
559        };
560        if let Some(old_version) = old {
561            let Some(db) = self.databases.get_mut(&key) else {
562                return debug_assert!(false, "Db should have been created");
563            };
564            // Step 3: Set connection’s version to database’s version if database previously existed
565            //  or 0 (zero) if database was newly created.
566            let res = db.set_version(old_version);
567            debug_assert!(res.is_ok(), "Setting a db version should not fail.");
568        }
569
570        self.advance_connection_queue(key);
571    }
572
573    /// Aborting all upgrades for an origin
574    // https://w3c.github.io/IndexedDB/#abort-an-upgrade-transaction
575    /// Note: this only reverts the version at this point.
576    fn abort_pending_upgrades(
577        &mut self,
578        pending_upgrades: HashMap<String, HashSet<Uuid>>,
579        origin: ImmutableOrigin,
580    ) {
581        for (name, ids) in pending_upgrades.into_iter() {
582            let mut version_to_revert: Option<u64> = None;
583            let key = IndexedDBDescription {
584                name,
585                origin: origin.clone(),
586            };
587            {
588                let is_empty = {
589                    let Some(queue) = self.connection_queues.get_mut(&key) else {
590                        continue;
591                    };
592                    queue.retain_mut(|open_request| {
593                        if ids.contains(&open_request.get_id()) {
594                            let old = open_request.abort();
595                            if version_to_revert.is_none() {
596                                if let Some(old) = old {
597                                    version_to_revert = Some(old);
598                                }
599                            }
600                            false
601                        } else {
602                            true
603                        }
604                    });
605                    queue.is_empty()
606                };
607                if is_empty {
608                    self.connection_queues.remove(&key);
609                }
610            }
611            if let Some(version) = version_to_revert {
612                let Some(db) = self.databases.get_mut(&key) else {
613                    return debug_assert!(false, "Db should have been created");
614                };
615                // Step 3: Set connection’s version to database’s version if database previously existed
616                //  or 0 (zero) if database was newly created.
617                let res = db.set_version(version);
618                debug_assert!(res.is_ok(), "Setting a db version should not fail.");
619            }
620        }
621    }
622
623    /// <https://w3c.github.io/IndexedDB/#open-a-database-connection>
624    fn open_a_database_connection(
625        &mut self,
626        sender: GenericCallback<BackendResult<OpenDatabaseResult>>,
627        origin: ImmutableOrigin,
628        db_name: String,
629        version: Option<u64>,
630        id: Uuid,
631    ) {
632        let key = IndexedDBDescription {
633            name: db_name.clone(),
634            origin: origin.clone(),
635        };
636        let open_request = OpenRequest::Open {
637            sender,
638            origin,
639            db_name,
640            version,
641            pending_upgrade: None,
642            id,
643        };
644        let should_continue = {
645            // Step 1: Let queue be the connection queue for storageKey and name.
646            let queue = self.connection_queues.entry(key.clone()).or_default();
647
648            // Step 2: Add request to queue.
649            queue.push_back(open_request);
650            queue.len() == 1
651        };
652
653        // Step 3: Wait until all previous requests in queue have been processed.
654        if should_continue {
655            self.open_database(key.clone());
656            self.maybe_remove_front_from_queue(&key);
657        }
658    }
659
660    fn get_database(
661        &self,
662        origin: ImmutableOrigin,
663        db_name: String,
664    ) -> Option<&IndexedDBEnvironment<SqliteEngine>> {
665        let idb_description = IndexedDBDescription {
666            origin,
667            name: db_name,
668        };
669
670        self.databases.get(&idb_description)
671    }
672
673    fn get_database_mut(
674        &mut self,
675        origin: ImmutableOrigin,
676        db_name: String,
677    ) -> Option<&mut IndexedDBEnvironment<SqliteEngine>> {
678        let idb_description = IndexedDBDescription {
679            origin,
680            name: db_name,
681        };
682
683        self.databases.get_mut(&idb_description)
684    }
685
686    /// <https://w3c.github.io/IndexedDB/#upgrade-a-database>
687    /// To upgrade a database with connection (a connection),
688    /// a new version, and a request, run these steps:
689    /// TODO: connection and request.
690    fn upgrade_database(
691        &mut self,
692        idb_description: IndexedDBDescription,
693        key: IndexedDBDescription,
694        new_version: u64,
695    ) {
696        let Some(queue) = self.connection_queues.get_mut(&key) else {
697            return debug_assert!(false, "A connection queue should exist.");
698        };
699        let Some(open_request) = queue.front_mut() else {
700            return debug_assert!(false, "An open request should be in the queue.");
701        };
702        let OpenRequest::Open {
703            sender,
704            origin: _,
705            db_name: _,
706            version: _,
707            id: _,
708            pending_upgrade,
709        } = open_request
710        else {
711            return;
712        };
713
714        // Step 1: Let db be connection’s database.
715        // TODO: connection.
716        let db = self
717            .databases
718            .get_mut(&idb_description)
719            .expect("Db should have been opened.");
720
721        // Step 2: Let transaction be a new upgrade transaction with connection used as connection.
722        let transaction_id = self.serial_number_counter;
723        self.serial_number_counter += 1;
724
725        // Step 3: Set transaction’s scope to connection’s object store set.
726        // Step 4: Set db’s upgrade transaction to transaction.
727        // Step 5: Set transaction’s state to inactive.
728        // Step 6: Start transaction.
729        // TODO: implement transactions and their lifecyle.
730
731        // Step 7: Let old version be db’s version.
732        let old_version = db.version().expect("DB should have a version.");
733
734        // Step 8: Set db’s version to version.
735        // This change is considered part of the transaction,
736        // and so if the transaction is aborted, this change is reverted.
737        // TODO: wrap in transaction.
738        db.set_version(new_version)
739            .expect("Setting the version should not fail");
740
741        // Step 9: Set request’s processed flag to true.
742        // TODO: implement requests.
743
744        // Step 10: Queue a database task to run these steps:
745        if sender
746            .send(Ok(OpenDatabaseResult::Upgrade {
747                version: new_version,
748                old_version,
749                transaction: transaction_id,
750            }))
751            .is_err()
752        {
753            error!("Couldn't queue task for indexeddb upgrade event.");
754        }
755
756        // Step 11: Wait for transaction to finish.
757        let _ = pending_upgrade.insert(VersionUpgrade {
758            old: old_version,
759            new: new_version,
760        });
761    }
762
763    /// <https://w3c.github.io/IndexedDB/#open-a-database-connection>
764    /// The part where the open request is ready for processing.
765    fn open_database(&mut self, key: IndexedDBDescription) {
766        let Some(queue) = self.connection_queues.get_mut(&key) else {
767            return debug_assert!(false, "A connection queue should exist.");
768        };
769        let Some(open_request) = queue.front_mut() else {
770            return debug_assert!(false, "An open request should be in the queue.");
771        };
772        let OpenRequest::Open {
773            sender,
774            origin,
775            db_name,
776            version,
777            id: _,
778            pending_upgrade: _,
779        } = open_request
780        else {
781            return debug_assert!(
782                false,
783                "An request to open a connection should be in the queue."
784            );
785        };
786
787        let idb_description = IndexedDBDescription {
788            origin: origin.clone(),
789            name: db_name.clone(),
790        };
791
792        let idb_base_dir = self.idb_base_dir.as_path();
793        let requested_version = version;
794
795        // Step 4: Let db be the database named name in origin, or null otherwise.
796        let (db_version, version) = match self.databases.entry(idb_description.clone()) {
797            Entry::Vacant(e) => {
798                // Step 5: If version is undefined, let version be 1 if db is null, or db’s version otherwise.
799                // Note: done below with the zero as first tuple item.
800
801                // https://www.w3.org/TR/IndexedDB/#open-a-database-connection
802                // Step 6: If db is null, let db be a new database
803                // with name name, version 0 (zero), and with no object stores.
804                // If this fails for any reason, return an appropriate error
805                // (e.g. a "QuotaExceededError" or "UnknownError" DOMException).
806                let engine = match SqliteEngine::new(
807                    idb_base_dir,
808                    &idb_description,
809                    self.thread_pool.clone(),
810                ) {
811                    Ok(engine) => engine,
812                    Err(err) => {
813                        if let Err(e) = sender.send(Err(backend_error_from_sqlite_error(err))) {
814                            debug!("Script exit during indexeddb database open {:?}", e);
815                        }
816                        return;
817                    },
818                };
819                let created_db_path = engine.created_db_path();
820                let db = IndexedDBEnvironment::new(engine);
821                let db_version = match db.version() {
822                    Ok(version) => version,
823                    Err(err) => {
824                        if let Err(e) = sender.send(Err(backend_error_from_sqlite_error(err))) {
825                            debug!("Script exit during indexeddb database open {:?}", e);
826                        }
827
828                        return;
829                    },
830                };
831
832                let version = if created_db_path {
833                    requested_version.unwrap_or(1)
834                } else {
835                    requested_version.unwrap_or(db_version)
836                };
837
838                e.insert(db);
839                (db_version, version)
840            },
841            Entry::Occupied(db) => {
842                let db_version = match db.get().version() {
843                    Ok(version) => version,
844                    Err(err) => {
845                        if let Err(e) = sender.send(Err(backend_error_from_sqlite_error(err))) {
846                            debug!("Script exit during indexeddb database open {:?}", e);
847                        }
848                        return;
849                    },
850                };
851                // Step 5: If version is undefined, let version be 1 if db is null, or db’s version otherwise.
852                (db_version, requested_version.unwrap_or(db_version))
853            },
854        };
855
856        // Step 7: If db’s version is greater than version,
857        // return a newly created "VersionError" DOMException
858        // and abort these steps.
859        if version < db_version {
860            if sender.send(Ok(OpenDatabaseResult::VersionError)).is_err() {
861                debug!("Script exit during indexeddb database open");
862            }
863            return;
864        }
865
866        // Step 8: Let connection be a new connection to db.
867        // Step 9: Set connection’s version to version.
868        let connection = Connection {
869            version,
870            close_pending: false,
871        };
872        let entry = self.connections.entry(idb_description.clone()).or_default();
873        let open_connections = entry.clone();
874        entry.insert(connection);
875
876        // Step 10: If db’s version is less than version, then:
877        if db_version < version {
878            // Step 10.1: Let openConnections be the set of all connections,
879            // except connection, associated with db.
880            // Note: done above with `open_connections`.
881
882            // Step 10.2: For each entry of openConnections
883            for _conn in open_connections
884                .into_iter()
885                .filter(|conn| conn.close_pending)
886            {
887                // TODO: queue a database task to fire a version change event
888                // named versionchange at entry with db’s version and version.
889            }
890
891            // that does not have its close pending flag set to true,
892            // queue a database task to fire a version change event
893            // named versionchange at entry with db’s version and version.
894            // Step 10.3: Wait for all of the events to be fired.
895            // Step 10.4: If any of the connections in openConnections are still not closed,
896            // queue a database task to fire a version change event named blocked
897            // at request with db’s version and version.
898            // Step 10.5: Wait until all connections in openConnections are closed.
899            // TODO: implement connections.
900
901            // Step 10.6: Run upgrade a database using connection, version and request.
902            self.upgrade_database(idb_description, key, version);
903            return;
904        }
905
906        // Step 11:
907        if sender
908            .send(Ok(OpenDatabaseResult::Connection {
909                version: db_version,
910                upgraded: false,
911            }))
912            .is_err()
913        {
914            error!("Failed to send OpenDatabaseResult::Connection to script.");
915        };
916    }
917
918    /// <https://www.w3.org/TR/IndexedDB/#delete-a-database>
919    /// The part adding the request to the connection queue.
920    fn start_delete_database(
921        &mut self,
922        key: IndexedDBDescription,
923        id: Uuid,
924        sender: GenericCallback<BackendResult<u64>>,
925    ) {
926        let open_request = OpenRequest::Delete {
927            sender,
928            _origin: key.origin.clone(),
929            _db_name: key.name.clone(),
930            processed: false,
931            id,
932        };
933
934        let should_continue = {
935            // Step 1: Let queue be the connection queue for storageKey and name.
936            let queue = self.connection_queues.entry(key.clone()).or_default();
937
938            // Step 2: Add request to queue.
939            queue.push_back(open_request);
940            queue.len() == 1
941        };
942
943        // Step 3: Wait until all previous requests in queue have been processed.
944        if should_continue {
945            self.delete_database(key.clone());
946            self.maybe_remove_front_from_queue(&key);
947        }
948    }
949
950    /// <https://www.w3.org/TR/IndexedDB/#delete-a-database>
951    fn delete_database(&mut self, key: IndexedDBDescription) {
952        let Some(queue) = self.connection_queues.get_mut(&key) else {
953            return debug_assert!(false, "A connection queue should exist.");
954        };
955        let Some(open_request) = queue.front_mut() else {
956            return debug_assert!(false, "An open request should be in the queue.");
957        };
958        let OpenRequest::Delete {
959            sender,
960            _origin: _,
961            _db_name: _,
962            processed,
963            id: _,
964        } = open_request
965        else {
966            return debug_assert!(
967                false,
968                "An request to open a connection should be in the queue."
969            );
970        };
971
972        // Step4: Let db be the database named name in storageKey, if one exists. Otherwise, return 0 (zero).
973        let version = if let Some(db) = self.databases.remove(&key) {
974            // Step 5: Let openConnections be the set of all connections associated with db.
975            // Step6: For each entry of openConnections that does not have its close pending flag set to true,
976            // queue a database task to fire a version change event named versionchange
977            // at entry with db’s version and null.
978            // Step 7: Wait for all of the events to be fired.
979            // Step 8: If any of the connections in openConnections are still not closed,
980            // queue a database task to fire a version change event
981            // named blocked at request with db’s version and null.
982            // Step 9: Wait until all connections in openConnections are closed.
983            // TODO: implement connections.
984
985            // Step 10: Let version be db’s version.
986            let res = db.version();
987            let Ok(version) = res else {
988                if sender
989                    .send(BackendResult::Err(BackendError::DbErr(
990                        res.unwrap_err().to_string(),
991                    )))
992                    .is_err()
993                {
994                    debug!("Script went away during pending database delete.");
995                }
996                return;
997            };
998
999            // Step 11: Delete db.
1000            // If this fails for any reason,
1001            // return an appropriate error (e.g. a QuotaExceededError, or an "UnknownError" DOMException).
1002            if let Err(err) = db.delete_database() {
1003                if sender
1004                    .send(BackendResult::Err(BackendError::DbErr(err.to_string())))
1005                    .is_err()
1006                {
1007                    debug!("Script went away during pending database delete.");
1008                }
1009                return;
1010            };
1011
1012            version
1013        } else {
1014            0
1015        };
1016
1017        // step 12: Return version.
1018        if sender.send(BackendResult::Ok(version)).is_err() {
1019            debug!("Script went away during pending database delete.");
1020        }
1021
1022        *processed = true;
1023    }
1024
1025    fn handle_sync_operation(&mut self, operation: SyncOperation) {
1026        match operation {
1027            SyncOperation::GetDatabases(sender, origin) => {
1028                // The in-parallel steps of https://www.w3.org/TR/IndexedDB/#dom-idbfactory-databases
1029
1030                // Step 4.1 Let databases be the set of databases in storageKey.
1031                // If this cannot be determined for any reason,
1032                // then queue a database task to reject p with an appropriate error
1033                // (e.g. an "UnknownError" DOMException) and terminate these steps.
1034                // TODO: separate database and connection concepts.
1035                // For now using `self.databases`, which track connections.
1036
1037                // Step 4.2: Let result be a new list.
1038                let info_list: Vec<DatabaseInfo> = self
1039                    .databases
1040                    .iter()
1041                    .filter_map(|(description, info)| {
1042                        // Step 4.3: For each db of databases:
1043                        if let Ok(version) = info.version() {
1044                            // Step 4.3.4: If db’s version is 0, then continue.
1045                            if version == 0 {
1046                                None
1047                            } else {
1048                                // Step 4.3.5: Let info be a new IDBDatabaseInfo dictionary.
1049                                // Step 4.3.6: Set info’s name dictionary member to db’s name.
1050                                // Step 4.3.7: Set info’s version dictionary member to db’s version.
1051                                // Step 4.3.8: Append info to result.
1052                                if description.origin == origin {
1053                                    Some(DatabaseInfo {
1054                                        name: description.name.clone(),
1055                                        version,
1056                                    })
1057                                } else {
1058                                    None
1059                                }
1060                            }
1061                        } else {
1062                            None
1063                        }
1064                    })
1065                    .collect();
1066
1067                // Note: if anything went wrong, we reply with an error.
1068                let result = if info_list.len() == self.databases.len() {
1069                    Ok(info_list)
1070                } else {
1071                    Err(BackendError::DbErr(
1072                        "Unknown error getting database info.".to_string(),
1073                    ))
1074                };
1075
1076                // Step 4.4: Queue a database task to resolve p with result.
1077                if sender.send(result).is_err() {
1078                    debug!("Couldn't send SyncOperation::GetDatabases reply.");
1079                }
1080            },
1081            SyncOperation::CloseDatabase(sender, origin, db_name) => {
1082                // TODO: Wait for all transactions created using connection to complete.
1083                // Note: current behavior is as if the `forced` flag is always set.
1084                // TODO: do not delete the database, only the connection.
1085                let idb_description = IndexedDBDescription {
1086                    origin,
1087                    name: db_name,
1088                };
1089                if let Some(_db) = self.databases.remove(&idb_description) {
1090                    // TODO: maybe a close database function should be added to the trait and called here?
1091                }
1092                let _ = sender.send(Ok(()));
1093            },
1094            SyncOperation::OpenDatabase(sender, origin, db_name, version, id) => {
1095                self.open_a_database_connection(sender, origin, db_name, version, id);
1096            },
1097            SyncOperation::AbortPendingUpgrades {
1098                pending_upgrades,
1099                origin,
1100            } => {
1101                self.abort_pending_upgrades(pending_upgrades, origin);
1102            },
1103            SyncOperation::AbortPendingUpgrade { name, id, origin } => {
1104                self.abort_pending_upgrade(name, id, origin);
1105            },
1106            SyncOperation::DeleteDatabase(callback, origin, db_name, id) => {
1107                let idb_description = IndexedDBDescription {
1108                    origin,
1109                    name: db_name,
1110                };
1111                self.start_delete_database(idb_description, id, callback);
1112            },
1113            SyncOperation::HasKeyGenerator(sender, origin, db_name, store_name) => {
1114                let result = self
1115                    .get_database(origin, db_name)
1116                    .map(|db| db.has_key_generator(&store_name));
1117                let _ = sender.send(result.ok_or(BackendError::DbNotFound));
1118            },
1119            SyncOperation::KeyPath(sender, origin, db_name, store_name) => {
1120                let result = self
1121                    .get_database(origin, db_name)
1122                    .map(|db| db.key_path(&store_name));
1123                let _ = sender.send(result.ok_or(BackendError::DbNotFound));
1124            },
1125            SyncOperation::CreateIndex(
1126                sender,
1127                origin,
1128                db_name,
1129                store_name,
1130                index_name,
1131                key_path,
1132                unique,
1133                multi_entry,
1134            ) => {
1135                if let Some(db) = self.get_database(origin, db_name) {
1136                    let result =
1137                        db.create_index(&store_name, index_name, key_path, unique, multi_entry);
1138                    let _ = sender.send(result.map_err(BackendError::from));
1139                } else {
1140                    let _ = sender.send(Err(BackendError::DbNotFound));
1141                }
1142            },
1143            SyncOperation::DeleteIndex(sender, origin, db_name, store_name, index_name) => {
1144                if let Some(db) = self.get_database(origin, db_name) {
1145                    let result = db.delete_index(&store_name, index_name);
1146                    let _ = sender.send(result.map_err(BackendError::from));
1147                } else {
1148                    let _ = sender.send(Err(BackendError::DbNotFound));
1149                }
1150            },
1151            SyncOperation::Commit(sender, _origin, _db_name, _txn) => {
1152                // FIXME:(arihant2math) This does nothing at the moment
1153                let _ = sender.send(Ok(()));
1154            },
1155            SyncOperation::UpgradeVersion(sender, origin, db_name, _txn, version) => {
1156                if let Some(db) = self.get_database_mut(origin, db_name) {
1157                    if version > db.version().unwrap_or(0) {
1158                        let _ = db.set_version(version);
1159                    }
1160                    // erroring out if the version is not upgraded can be and non-replicable
1161                    let _ = sender.send(db.version().map_err(backend_error_from_sqlite_error));
1162                } else {
1163                    let _ = sender.send(Err(BackendError::DbNotFound));
1164                }
1165            },
1166            SyncOperation::CreateObjectStore(
1167                sender,
1168                origin,
1169                db_name,
1170                store_name,
1171                key_paths,
1172                auto_increment,
1173            ) => {
1174                if let Some(db) = self.get_database_mut(origin, db_name) {
1175                    let result = db.create_object_store(&store_name, key_paths, auto_increment);
1176                    let _ = sender.send(result.map_err(BackendError::from));
1177                } else {
1178                    let _ = sender.send(Err(BackendError::DbNotFound));
1179                }
1180            },
1181            SyncOperation::DeleteObjectStore(sender, origin, db_name, store_name) => {
1182                if let Some(db) = self.get_database_mut(origin, db_name) {
1183                    let result = db.delete_object_store(&store_name);
1184                    let _ = sender.send(result.map_err(BackendError::from));
1185                } else {
1186                    let _ = sender.send(Err(BackendError::DbNotFound));
1187                }
1188            },
1189            SyncOperation::StartTransaction(sender, origin, db_name, txn) => {
1190                if let Some(db) = self.get_database_mut(origin, db_name) {
1191                    db.start_transaction(txn, Some(sender));
1192                } else {
1193                    let _ = sender.send(Err(BackendError::DbNotFound));
1194                }
1195            },
1196            SyncOperation::Version(sender, origin, db_name) => {
1197                if let Some(db) = self.get_database(origin, db_name) {
1198                    let _ = sender.send(db.version().map_err(backend_error_from_sqlite_error));
1199                } else {
1200                    let _ = sender.send(Err(BackendError::DbNotFound));
1201                }
1202            },
1203            SyncOperation::RegisterNewTxn(sender, _origin, _db_name) => {
1204                // Note: ignoring origin and name for now,
1205                // but those could be used again when implementing
1206                // lifecycle.
1207                let transaction_id = self.serial_number_counter;
1208                self.serial_number_counter += 1;
1209                let _ = sender.send(transaction_id);
1210            },
1211            SyncOperation::Exit(_) => {
1212                unreachable!("We must've already broken out of event loop.");
1213            },
1214        }
1215    }
1216}