storage/indexeddb/
mod.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5mod engines;
6
7use std::borrow::ToOwned;
8use std::collections::hash_map::Entry;
9use std::collections::{HashMap, HashSet, VecDeque};
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::thread;
13
14use base::generic_channel::{self, GenericReceiver, GenericSender, ReceiveError};
15use base::threadpool::ThreadPool;
16use log::{debug, error, warn};
17use profile_traits::generic_callback::GenericCallback;
18use rusqlite::Error as RusqliteError;
19use rustc_hash::FxHashMap;
20use servo_config::pref;
21use servo_url::origin::ImmutableOrigin;
22use storage_traits::indexeddb::{
23    AsyncOperation, BackendError, BackendResult, ConnectionMsg, CreateObjectResult, DatabaseInfo,
24    DbResult, IndexedDBIndex, IndexedDBObjectStore, IndexedDBThreadMsg, IndexedDBTxnMode, KeyPath,
25    SyncOperation,
26};
27use uuid::Uuid;
28
29use crate::indexeddb::engines::{KvsEngine, KvsOperation, KvsTransaction, SqliteEngine};
30use crate::shared::is_sqlite_disk_full_error;
31
32pub trait IndexedDBThreadFactory {
33    fn new(config_dir: Option<PathBuf>) -> Self;
34}
35
36impl IndexedDBThreadFactory for GenericSender<IndexedDBThreadMsg> {
37    fn new(config_dir: Option<PathBuf>) -> GenericSender<IndexedDBThreadMsg> {
38        let (chan, port) = generic_channel::channel().unwrap();
39
40        let mut idb_base_dir = PathBuf::new();
41        if let Some(p) = config_dir {
42            idb_base_dir.push(p);
43        }
44        idb_base_dir.push("IndexedDB");
45
46        thread::Builder::new()
47            .name("IndexedDBManager".to_owned())
48            .spawn(move || {
49                IndexedDBManager::new(port, idb_base_dir).start();
50            })
51            .expect("Thread spawning failed");
52
53        chan
54    }
55}
56
57/// A key used to track databases.
58/// TODO: use a storage key.
59#[derive(Clone, Debug, Eq, Hash, PartialEq)]
60pub struct IndexedDBDescription {
61    pub origin: ImmutableOrigin,
62    pub name: String,
63}
64
65impl IndexedDBDescription {
66    // randomly generated namespace for our purposes
67    const NAMESPACE_SERVO_IDB: &uuid::Uuid = &Uuid::from_bytes([
68        0x37, 0x9e, 0x56, 0xb0, 0x1a, 0x76, 0x44, 0xc2, 0xa0, 0xdb, 0xe2, 0x18, 0xc5, 0xc8, 0xa3,
69        0x5d,
70    ]);
71    // Converts the database description to a folder name where all
72    // data for this database is stored
73    pub(super) fn as_path(&self) -> PathBuf {
74        let mut path = PathBuf::new();
75
76        // uuid v5 is deterministic
77        let origin_uuid = Uuid::new_v5(
78            Self::NAMESPACE_SERVO_IDB,
79            self.origin.ascii_serialization().as_bytes(),
80        );
81        let db_name_uuid = Uuid::new_v5(Self::NAMESPACE_SERVO_IDB, self.name.as_bytes());
82        path.push(origin_uuid.to_string());
83        path.push(db_name_uuid.to_string());
84
85        path
86    }
87}
88
89struct IndexedDBEnvironment<E: KvsEngine> {
90    engine: E,
91    transactions: FxHashMap<u64, KvsTransaction>,
92}
93
94impl<E: KvsEngine> IndexedDBEnvironment<E> {
95    fn new(engine: E) -> IndexedDBEnvironment<E> {
96        IndexedDBEnvironment {
97            engine,
98            transactions: FxHashMap::default(),
99        }
100    }
101
102    fn queue_operation(
103        &mut self,
104        store_name: &str,
105        serial_number: u64,
106        mode: IndexedDBTxnMode,
107        operation: AsyncOperation,
108    ) {
109        self.transactions
110            .entry(serial_number)
111            .or_insert_with(|| KvsTransaction {
112                requests: VecDeque::new(),
113                mode,
114            })
115            .requests
116            .push_back(KvsOperation {
117                operation,
118                store_name: String::from(store_name),
119            });
120    }
121
122    // Executes all requests for a transaction (without committing)
123    fn start_transaction(&mut self, txn: u64, sender: Option<GenericSender<BackendResult<()>>>) {
124        // FIXME:(arihant2math) find optimizations in this function
125        //   rather than on the engine level code (less repetition)
126        if let Some(txn) = self.transactions.remove(&txn) {
127            let _ = self.engine.process_transaction(txn).blocking_recv();
128        }
129
130        // We have a sender if the transaction is started manually, and they
131        // probably want to know when it is finished
132        if let Some(sender) = sender {
133            if sender.send(Ok(())).is_err() {
134                warn!("IDBTransaction starter dropped its channel");
135            }
136        }
137    }
138
139    fn has_key_generator(&self, store_name: &str) -> bool {
140        self.engine.has_key_generator(store_name)
141    }
142
143    fn key_path(&self, store_name: &str) -> Option<KeyPath> {
144        self.engine.key_path(store_name)
145    }
146
147    fn indexes(&self, store_name: &str) -> DbResult<Vec<IndexedDBIndex>> {
148        self.engine
149            .indexes(store_name)
150            .map_err(|err| format!("{err:?}"))
151    }
152
153    fn create_index(
154        &self,
155        store_name: &str,
156        index_name: String,
157        key_path: KeyPath,
158        unique: bool,
159        multi_entry: bool,
160    ) -> DbResult<CreateObjectResult> {
161        self.engine
162            .create_index(store_name, index_name, key_path, unique, multi_entry)
163            .map_err(|err| format!("{err:?}"))
164    }
165
166    fn delete_index(&self, store_name: &str, index_name: String) -> DbResult<()> {
167        self.engine
168            .delete_index(store_name, index_name)
169            .map_err(|err| format!("{err:?}"))
170    }
171
172    fn create_object_store(
173        &mut self,
174        store_name: &str,
175        key_path: Option<KeyPath>,
176        auto_increment: bool,
177    ) -> DbResult<CreateObjectResult> {
178        self.engine
179            .create_store(store_name, key_path, auto_increment)
180            .map_err(|err| format!("{err:?}"))
181    }
182
183    fn delete_object_store(&mut self, store_name: &str) -> DbResult<()> {
184        let result = self.engine.delete_store(store_name);
185        result.map_err(|err| format!("{err:?}"))
186    }
187
188    fn delete_database(self) -> BackendResult<()> {
189        let result = self.engine.delete_database();
190        result
191            .map_err(|err| format!("{err:?}"))
192            .map_err(BackendError::from)
193    }
194
195    fn version(&self) -> Result<u64, E::Error> {
196        self.engine.version()
197    }
198
199    fn set_version(&mut self, version: u64) -> DbResult<()> {
200        self.engine
201            .set_version(version)
202            .map_err(|err| format!("{err:?}"))
203    }
204}
205
206fn backend_error_from_sqlite_error(err: RusqliteError) -> BackendError {
207    if is_sqlite_disk_full_error(&err) {
208        BackendError::QuotaExceeded
209    } else {
210        BackendError::DbErr(format!("{err:?}"))
211    }
212}
213
214/// <https://w3c.github.io/IndexedDB/#request-open-request>
215/// Used here to implement the
216/// <https://w3c.github.io/IndexedDB/#connection-queue>
217enum OpenRequest {
218    Open {
219        /// The callback used to send a result to script.
220        sender: GenericCallback<ConnectionMsg>,
221
222        /// The name of the database.
223        db_name: String,
224
225        /// Optionnaly, a requested db version.
226        /// Note: when the open algorithm starts, this will be mutated and set to something as per the algo.
227        version: Option<u64>,
228
229        /// Optionnaly, a version pending ugrade.
230        /// Used as <https://w3c.github.io/IndexedDB/#request-processed-flag>
231        pending_upgrade: Option<VersionUpgrade>,
232
233        /// This request is pending on these connections to close.
234        pending_close: HashSet<Uuid>,
235
236        /// This request is pending on these connections to fire a versionchange event.
237        /// Note: This starts as equal to `pending_close`, but when all events have fired,
238        /// not all connections need to have closed, in which case the `blocked` event
239        /// is fired on this request.
240        pending_versionchange: HashSet<Uuid>,
241
242        id: Uuid,
243    },
244    Delete {
245        /// The callback used to send a result to script.
246        sender: GenericCallback<BackendResult<u64>>,
247
248        /// The origin of the request.
249        /// TODO: storage key.
250        /// Note: will be used when the full spec is implemented.
251        _origin: ImmutableOrigin,
252
253        /// The name of the database.
254        /// Note: will be used when the full spec is implemented.
255        _db_name: String,
256
257        /// <https://w3c.github.io/IndexedDB/#request-processed-flag>
258        processed: bool,
259
260        id: Uuid,
261    },
262}
263
264impl OpenRequest {
265    fn get_id(&self) -> Uuid {
266        let id = match self {
267            OpenRequest::Open {
268                sender: _,
269                db_name: _,
270                version: _,
271                pending_upgrade: _,
272                pending_close: _,
273                pending_versionchange: _,
274                id,
275            } => id,
276            OpenRequest::Delete {
277                sender: _,
278                _origin: _,
279                _db_name: _,
280                processed: _,
281                id,
282            } => id,
283        };
284        *id
285    }
286
287    fn is_open(&self) -> bool {
288        match self {
289            OpenRequest::Open {
290                sender: _,
291                db_name: _,
292                version: _,
293                pending_upgrade: _,
294                pending_close: _,
295                pending_versionchange: _,
296                id: _,
297            } => true,
298            OpenRequest::Delete {
299                sender: _,
300                _origin: _,
301                _db_name: _,
302                processed: _,
303                id: _,
304            } => false,
305        }
306    }
307
308    /// An open request can be pending either an upgrade,
309    /// or the closing of other connections.
310    fn is_pending(&self) -> bool {
311        match self {
312            OpenRequest::Open {
313                sender: _,
314                db_name: _,
315                version: _,
316                pending_upgrade,
317                pending_close,
318                pending_versionchange,
319                id: _,
320            } => {
321                pending_upgrade.is_some() ||
322                    !pending_close.is_empty() ||
323                    !pending_versionchange.is_empty()
324            },
325            OpenRequest::Delete {
326                sender: _,
327                _origin: _,
328                _db_name: _,
329                processed,
330                id: _,
331            } => !processed,
332        }
333    }
334
335    /// Abort the open request,
336    /// optionally returning a version to revert to.
337    fn abort(&self) -> Option<u64> {
338        match self {
339            OpenRequest::Open {
340                sender,
341                db_name,
342                version: _,
343                pending_close: _,
344                pending_versionchange: _,
345                pending_upgrade,
346                id,
347            } => {
348                if sender
349                    .send(ConnectionMsg::AbortError {
350                        name: db_name.clone(),
351                        id: *id,
352                    })
353                    .is_err()
354                {
355                    error!("Failed to send ConnectionMsg::Connection to script.");
356                };
357                pending_upgrade.as_ref().map(|upgrade| upgrade.old)
358            },
359            OpenRequest::Delete {
360                sender,
361                _origin: _,
362                _db_name: _,
363                processed: _,
364                id: _,
365            } => {
366                if sender.send(Err(BackendError::DbNotFound)).is_err() {
367                    error!("Failed to send result of database delete to script.");
368                };
369                None
370            },
371        }
372    }
373}
374
375struct VersionUpgrade {
376    old: u64,
377    new: u64,
378}
379
380/// <https://w3c.github.io/IndexedDB/#connection>
381struct Connection {
382    /// <https://w3c.github.io/IndexedDB/#connection-close-pending-flag>
383    close_pending: bool,
384
385    /// The callback used to send a result to script.
386    sender: GenericCallback<ConnectionMsg>,
387}
388
389struct IndexedDBManager {
390    port: GenericReceiver<IndexedDBThreadMsg>,
391    idb_base_dir: PathBuf,
392    databases: HashMap<IndexedDBDescription, IndexedDBEnvironment<SqliteEngine>>,
393    thread_pool: Arc<ThreadPool>,
394
395    /// A global counter to produce unique transaction ids.
396    /// TODO: remove once db connections lifecyle is managed.
397    /// A global counter is only necessary because of how deleting a db currently
398    /// does not wait for connection to close and transactions to finish.
399    serial_number_counter: u64,
400
401    /// <https://w3c.github.io/IndexedDB/#connection-queue>
402    connection_queues: HashMap<IndexedDBDescription, VecDeque<OpenRequest>>,
403
404    /// <https://w3c.github.io/IndexedDB/#connection>
405    connections: HashMap<IndexedDBDescription, HashMap<Uuid, Connection>>,
406}
407
408impl IndexedDBManager {
409    fn new(port: GenericReceiver<IndexedDBThreadMsg>, idb_base_dir: PathBuf) -> IndexedDBManager {
410        debug!("New indexedDBManager");
411
412        // Uses an estimate of the system cpus to process IndexedDB transactions
413        // See https://doc.rust-lang.org/stable/std/thread/fn.available_parallelism.html
414        // If no information can be obtained about the system, uses 4 threads as a default
415        let thread_count = thread::available_parallelism()
416            .map(|i| i.get())
417            .unwrap_or(pref!(threadpools_fallback_worker_num) as usize)
418            .min(pref!(threadpools_indexeddb_workers_max).max(1) as usize);
419
420        IndexedDBManager {
421            port,
422            idb_base_dir,
423            databases: HashMap::new(),
424            thread_pool: Arc::new(ThreadPool::new(thread_count, "IndexedDB".to_string())),
425            serial_number_counter: 0,
426            connection_queues: Default::default(),
427            connections: Default::default(),
428        }
429    }
430}
431
432impl IndexedDBManager {
433    fn start(&mut self) {
434        loop {
435            // FIXME:(arihant2math) No message *most likely* means that
436            // the ipc sender has been dropped, so we break the look
437            let message = match self.port.recv() {
438                Ok(msg) => msg,
439                Err(ReceiveError::Disconnected) => {
440                    break;
441                },
442                Err(e) => {
443                    warn!("Error in IndexedDB thread: {e:?}");
444                    continue;
445                },
446            };
447            match message {
448                IndexedDBThreadMsg::Sync(SyncOperation::Exit(sender)) => {
449                    let _ = sender.send(());
450                    break;
451                },
452                IndexedDBThreadMsg::Sync(operation) => {
453                    self.handle_sync_operation(operation);
454                },
455                IndexedDBThreadMsg::Async(origin, db_name, store_name, txn, mode, operation) => {
456                    if let Some(db) = self.get_database_mut(origin, db_name) {
457                        // Queues an operation for a transaction without starting it
458                        db.queue_operation(&store_name, txn, mode, operation);
459                        // FIXME:(arihant2math) Schedule transactions properly
460                        // while db.transactions.iter().any(|s| s.1.mode == IndexedDBTxnMode::Readwrite) {
461                        //     std::hint::spin_loop();
462                        // }
463                        db.start_transaction(txn, None);
464                    }
465                },
466                IndexedDBThreadMsg::OpenTransactionInactive { name, origin } => {
467                    self.handle_open_transaction_inactive(name, origin);
468                },
469            }
470        }
471    }
472
473    /// Handle when an open transaction becomes inactive.
474    fn handle_open_transaction_inactive(&mut self, name: String, origin: ImmutableOrigin) {
475        let key = IndexedDBDescription { name, origin };
476        let Some(queue) = self.connection_queues.get_mut(&key) else {
477            return debug_assert!(false, "A connection queue should exist.");
478        };
479        let Some(open_request) = queue.pop_front() else {
480            return debug_assert!(false, "A pending open request should exist.");
481        };
482        let OpenRequest::Open {
483            sender,
484            db_name,
485            version: _,
486            pending_upgrade,
487            pending_close: _,
488            pending_versionchange: _,
489            id,
490        } = open_request
491        else {
492            return;
493        };
494        let Some(VersionUpgrade { old: _, new }) = pending_upgrade else {
495            return debug_assert!(false, "A pending version upgrade should exist.");
496        };
497        if sender
498            .send(ConnectionMsg::Connection {
499                id,
500                name: db_name,
501                version: new,
502                upgraded: true,
503            })
504            .is_err()
505        {
506            error!("Failed to send ConnectionMsg::Connection to script.");
507        };
508
509        self.advance_connection_queue(key);
510    }
511
512    /// Run the next open request in the queue.
513    fn advance_connection_queue(&mut self, key: IndexedDBDescription) {
514        loop {
515            let is_open = {
516                let Some(queue) = self.connection_queues.get_mut(&key) else {
517                    return;
518                };
519                if queue.is_empty() {
520                    return;
521                }
522                queue.front().expect("Queue is not empty.").is_open()
523            };
524
525            if is_open {
526                self.open_database(key.clone());
527            } else {
528                self.delete_database(key.clone());
529            }
530
531            let was_pruned = self.maybe_remove_front_from_queue(&key);
532
533            if !was_pruned {
534                // Note: requests to delete a database are, at this point in the implementation,
535                // done in one step; so we can continue on to the next request.
536                // Request to open a connection consists of multiple async steps, so we must break if
537                // it is still pending.
538                break;
539            }
540        }
541    }
542
543    /// Remove the record at the front if it is not pending.
544    fn maybe_remove_front_from_queue(&mut self, key: &IndexedDBDescription) -> bool {
545        let (is_empty, was_pruned) = {
546            let Some(queue) = self.connection_queues.get_mut(key) else {
547                debug_assert!(false, "A connection queue should exist.");
548                return false;
549            };
550            let mut pruned = false;
551            let front_is_pending = queue.front().map(|record| record.is_pending());
552            if let Some(is_pending) = front_is_pending {
553                if !is_pending {
554                    queue.pop_front().expect("Queue has a non-pending item.");
555                    pruned = true
556                }
557            }
558            (queue.is_empty(), pruned)
559        };
560        if is_empty {
561            self.connection_queues.remove(key);
562        }
563        was_pruned
564    }
565
566    fn remove_connection(&mut self, key: &IndexedDBDescription, id: &Uuid) {
567        let is_empty = {
568            let Some(connections) = self.connections.get_mut(key) else {
569                return debug!("Connection already removed.");
570            };
571            connections.remove(id);
572            connections.is_empty()
573        };
574
575        if is_empty {
576            self.connections.remove(key);
577        }
578    }
579
580    /// Aborting the current upgrade for an origin.
581    // https://w3c.github.io/IndexedDB/#abort-an-upgrade-transaction
582    /// Note: this only reverts the version at this point.
583    fn abort_pending_upgrade(&mut self, name: String, id: Uuid, origin: ImmutableOrigin) {
584        let key = IndexedDBDescription {
585            name,
586            origin: origin.clone(),
587        };
588        let old = {
589            let Some(queue) = self.connection_queues.get_mut(&key) else {
590                return debug_assert!(
591                    false,
592                    "There should be a connection queue for the aborted upgrade."
593                );
594            };
595            let Some(open_request) = queue.pop_front() else {
596                return debug_assert!(false, "There should be an open request to upgrade.");
597            };
598            if open_request.get_id() != id {
599                return debug_assert!(
600                    false,
601                    "Open request to abort should be at the head of the queue."
602                );
603            }
604            open_request.abort()
605        };
606        if let Some(old_version) = old {
607            let Some(db) = self.databases.get_mut(&key) else {
608                return debug_assert!(false, "Db should have been created");
609            };
610            // Step 3: Set connection’s version to database’s version if database previously existed
611            //  or 0 (zero) if database was newly created.
612            let res = db.set_version(old_version);
613            debug_assert!(res.is_ok(), "Setting a db version should not fail.");
614        }
615
616        self.remove_connection(&key, &id);
617
618        self.advance_connection_queue(key);
619    }
620
621    /// Aborting all upgrades for an origin
622    // https://w3c.github.io/IndexedDB/#abort-an-upgrade-transaction
623    /// Note: this only reverts the version at this point.
624    fn abort_pending_upgrades(
625        &mut self,
626        pending_upgrades: HashMap<String, HashSet<Uuid>>,
627        origin: ImmutableOrigin,
628    ) {
629        for (name, ids) in pending_upgrades.into_iter() {
630            let mut version_to_revert: Option<u64> = None;
631            let key = IndexedDBDescription {
632                name,
633                origin: origin.clone(),
634            };
635            for id in ids.iter() {
636                self.remove_connection(&key, id);
637            }
638            {
639                let is_empty = {
640                    let Some(queue) = self.connection_queues.get_mut(&key) else {
641                        continue;
642                    };
643                    queue.retain_mut(|open_request| {
644                        if ids.contains(&open_request.get_id()) {
645                            let old = open_request.abort();
646                            if version_to_revert.is_none() {
647                                if let Some(old) = old {
648                                    version_to_revert = Some(old);
649                                }
650                            }
651                            false
652                        } else {
653                            true
654                        }
655                    });
656                    queue.is_empty()
657                };
658                if is_empty {
659                    self.connection_queues.remove(&key);
660                }
661            }
662            if let Some(version) = version_to_revert {
663                let Some(db) = self.databases.get_mut(&key) else {
664                    return debug_assert!(false, "Db should have been created");
665                };
666                // Step 3: Set connection’s version to database’s version if database previously existed
667                //  or 0 (zero) if database was newly created.
668                let res = db.set_version(version);
669                debug_assert!(res.is_ok(), "Setting a db version should not fail.");
670            }
671        }
672    }
673
674    /// <https://w3c.github.io/IndexedDB/#open-a-database-connection>
675    fn open_a_database_connection(
676        &mut self,
677        sender: GenericCallback<ConnectionMsg>,
678        origin: ImmutableOrigin,
679        db_name: String,
680        version: Option<u64>,
681        id: Uuid,
682    ) {
683        let key = IndexedDBDescription {
684            name: db_name.clone(),
685            origin: origin.clone(),
686        };
687        let open_request = OpenRequest::Open {
688            sender,
689            db_name,
690            version,
691            pending_close: Default::default(),
692            pending_versionchange: Default::default(),
693            pending_upgrade: None,
694            id,
695        };
696        let should_continue = {
697            // Step 1: Let queue be the connection queue for storageKey and name.
698            let queue = self.connection_queues.entry(key.clone()).or_default();
699
700            // Step 2: Add request to queue.
701            queue.push_back(open_request);
702            queue.len() == 1
703        };
704
705        // Step 3: Wait until all previous requests in queue have been processed.
706        if should_continue {
707            self.open_database(key.clone());
708            self.maybe_remove_front_from_queue(&key);
709        }
710    }
711
712    fn get_database(
713        &self,
714        origin: ImmutableOrigin,
715        db_name: String,
716    ) -> Option<&IndexedDBEnvironment<SqliteEngine>> {
717        let idb_description = IndexedDBDescription {
718            origin,
719            name: db_name,
720        };
721
722        self.databases.get(&idb_description)
723    }
724
725    fn get_database_mut(
726        &mut self,
727        origin: ImmutableOrigin,
728        db_name: String,
729    ) -> Option<&mut IndexedDBEnvironment<SqliteEngine>> {
730        let idb_description = IndexedDBDescription {
731            origin,
732            name: db_name,
733        };
734
735        self.databases.get_mut(&idb_description)
736    }
737
738    /// <https://w3c.github.io/IndexedDB/#upgrade-a-database>
739    /// To upgrade a database with connection (a connection),
740    /// a new version, and a request, run these steps:
741    /// TODO: connection and request.
742    fn upgrade_database(&mut self, key: IndexedDBDescription, new_version: u64) {
743        let Some(queue) = self.connection_queues.get_mut(&key) else {
744            return debug_assert!(false, "A connection queue should exist.");
745        };
746        let Some(open_request) = queue.front_mut() else {
747            return debug_assert!(false, "An open request should be in the queue.");
748        };
749        let OpenRequest::Open {
750            sender,
751            db_name,
752            version: _,
753            id,
754            pending_close: _,
755            pending_versionchange: _,
756            pending_upgrade,
757        } = open_request
758        else {
759            return;
760        };
761
762        // Step 1: Let db be connection’s database.
763        // TODO: connection.
764        let db = self
765            .databases
766            .get_mut(&key)
767            .expect("Db should have been opened.");
768
769        // Step 2: Let transaction be a new upgrade transaction with connection used as connection.
770        let transaction_id = self.serial_number_counter;
771        self.serial_number_counter += 1;
772
773        // Step 3: Set transaction’s scope to connection’s object store set.
774        // Step 4: Set db’s upgrade transaction to transaction.
775        // Step 5: Set transaction’s state to inactive.
776        // Step 6: Start transaction.
777        // TODO: implement transactions and their lifecyle.
778
779        // Step 7: Let old version be db’s version.
780        let old_version = db.version().expect("DB should have a version.");
781
782        // Step 8: Set db’s version to version.
783        // This change is considered part of the transaction,
784        // and so if the transaction is aborted, this change is reverted.
785        // TODO: wrap in transaction.
786        db.set_version(new_version)
787            .expect("Setting the version should not fail");
788
789        // Step 9: Set request’s processed flag to true.
790        // TODO: implement requests.
791
792        // Step 10: Queue a database task to run these steps:
793        if sender
794            .send(ConnectionMsg::Upgrade {
795                id: *id,
796                name: db_name.clone(),
797                version: new_version,
798                old_version,
799                transaction: transaction_id,
800            })
801            .is_err()
802        {
803            error!("Couldn't queue task for indexeddb upgrade event.");
804        }
805
806        // Step 11: Wait for transaction to finish.
807        let _ = pending_upgrade.insert(VersionUpgrade {
808            old: old_version,
809            new: new_version,
810        });
811    }
812
813    /// <https://w3c.github.io/IndexedDB/#open-a-database-connection>
814    fn handle_version_change_done(
815        &mut self,
816        name: String,
817        from_id: Uuid,
818        old_version: u64,
819        origin: ImmutableOrigin,
820    ) {
821        let key = IndexedDBDescription {
822            name: name.clone(),
823            origin: origin.clone(),
824        };
825        let (can_upgrade, version) = {
826            let Some(queue) = self.connection_queues.get_mut(&key) else {
827                return debug_assert!(false, "A connection queue should exist.");
828            };
829            let Some(open_request) = queue.front_mut() else {
830                return debug_assert!(false, "An open request should be in the queue.");
831            };
832            let OpenRequest::Open {
833                sender,
834                db_name: _,
835                version,
836                id,
837                pending_upgrade: _,
838                pending_versionchange,
839                pending_close,
840            } = open_request
841            else {
842                return debug_assert!(
843                    false,
844                    "An request to open a connection should be in the queue."
845                );
846            };
847            debug_assert!(
848                pending_versionchange.contains(&from_id),
849                "The open request should be pending on the versionchange event for the connection sending the message."
850            );
851
852            pending_versionchange.remove(&from_id);
853
854            // Step 10.3: Wait for all of the events to be fired.
855            if !pending_versionchange.is_empty() {
856                return;
857            }
858
859            let Some(version) = *version else {
860                return debug_assert!(
861                    false,
862                    "An upgrade version should have been determined by now."
863                );
864            };
865
866            // Step 10.4: If any of the connections in openConnections are still not closed,
867            // queue a database task to fire a version change event named blocked
868            // at request with db’s version and version.
869            if !pending_close.is_empty() &&
870                sender
871                    .send(ConnectionMsg::Blocked {
872                        name,
873                        id: *id,
874                        version,
875                        old_version,
876                    })
877                    .is_err()
878            {
879                return debug!("Script exit during indexeddb database open");
880            }
881
882            (pending_close.is_empty(), version)
883        };
884
885        // Step 10.5: Wait until all connections in openConnections are closed.
886        // Note: if we still need to wait, the algorithm will continue in the handling of the close message.
887        if can_upgrade {
888            // Step 10.6: Run upgrade a database using connection, version and request.
889            self.upgrade_database(key.clone(), version);
890
891            let was_pruned = self.maybe_remove_front_from_queue(&key);
892            if was_pruned {
893                self.advance_connection_queue(key);
894            }
895        }
896    }
897
898    /// <https://w3c.github.io/IndexedDB/#open-a-database-connection>
899    /// The part where the open request is ready for processing.
900    fn open_database(&mut self, key: IndexedDBDescription) {
901        let Some(queue) = self.connection_queues.get_mut(&key) else {
902            return debug_assert!(false, "A connection queue should exist.");
903        };
904        let Some(open_request) = queue.front_mut() else {
905            return debug_assert!(false, "An open request should be in the queue.");
906        };
907        let OpenRequest::Open {
908            sender,
909            db_name,
910            version,
911            id,
912            pending_upgrade: _,
913            pending_close,
914            pending_versionchange,
915        } = open_request
916        else {
917            return debug_assert!(
918                false,
919                "An request to open a connection should be in the queue."
920            );
921        };
922
923        let idb_base_dir = self.idb_base_dir.as_path();
924        let requested_version = *version;
925
926        // Step 4: Let db be the database named name in origin, or null otherwise.
927        let db_version = match self.databases.entry(key.clone()) {
928            Entry::Vacant(e) => {
929                // Step 5: If version is undefined, let version be 1 if db is null, or db’s version otherwise.
930                // Note: done below with the zero as first tuple item.
931
932                // https://www.w3.org/TR/IndexedDB/#open-a-database-connection
933                // Step 6: If db is null, let db be a new database
934                // with name name, version 0 (zero), and with no object stores.
935                // If this fails for any reason, return an appropriate error
936                // (e.g. a "QuotaExceededError" or "UnknownError" DOMException).
937                let engine = match SqliteEngine::new(idb_base_dir, &key, self.thread_pool.clone()) {
938                    Ok(engine) => engine,
939                    Err(err) => {
940                        let error = backend_error_from_sqlite_error(err);
941                        if let Err(e) = sender.send(ConnectionMsg::DatabaseError {
942                            id: *id,
943                            name: db_name.clone(),
944                            error,
945                        }) {
946                            debug!("Script exit during indexeddb database open {:?}", e);
947                        }
948                        return;
949                    },
950                };
951                let created_db_path = engine.created_db_path();
952                let db = IndexedDBEnvironment::new(engine);
953                let db_version = match db.version() {
954                    Ok(version) => version,
955                    Err(err) => {
956                        let error = backend_error_from_sqlite_error(err);
957                        if let Err(e) = sender.send(ConnectionMsg::DatabaseError {
958                            id: *id,
959                            name: db_name.clone(),
960                            error,
961                        }) {
962                            debug!("Script exit during indexeddb database open {:?}", e);
963                        }
964                        return;
965                    },
966                };
967
968                *version = if created_db_path {
969                    Some(requested_version.unwrap_or(1))
970                } else {
971                    Some(requested_version.unwrap_or(db_version))
972                };
973
974                e.insert(db);
975                db_version
976            },
977            Entry::Occupied(db) => {
978                let db_version = match db.get().version() {
979                    Ok(version) => version,
980                    Err(err) => {
981                        let error = backend_error_from_sqlite_error(err);
982                        if let Err(e) = sender.send(ConnectionMsg::DatabaseError {
983                            id: *id,
984                            name: db_name.clone(),
985                            error,
986                        }) {
987                            debug!("Script exit during indexeddb database open {:?}", e);
988                        }
989                        return;
990                    },
991                };
992                // Step 5: If version is undefined, let version be 1 if db is null, or db’s version otherwise.
993                *version = Some(requested_version.unwrap_or(db_version));
994                db_version
995            },
996        };
997
998        let Some(version) = *version else {
999            return debug_assert!(
1000                false,
1001                "An upgrade version should have been determined by now."
1002            );
1003        };
1004
1005        // Step 7: If db’s version is greater than version,
1006        // return a newly created "VersionError" DOMException
1007        // and abort these steps.
1008        if version < db_version {
1009            if sender
1010                .send(ConnectionMsg::VersionError {
1011                    name: db_name.clone(),
1012                    id: *id,
1013                })
1014                .is_err()
1015            {
1016                debug!("Script exit during indexeddb database open");
1017            }
1018            return;
1019        }
1020
1021        // Step 8: Let connection be a new connection to db.
1022        // Step 9: Set connection’s version to version.
1023        let connection = Connection {
1024            close_pending: false,
1025            sender: sender.clone(),
1026        };
1027        let entry = self.connections.entry(key.clone()).or_default();
1028        entry.insert(*id, connection);
1029
1030        // Step 10: If db’s version is less than version, then:
1031        if db_version < version {
1032            // Step 10.1: Let openConnections be the set of all connections,
1033            // except connection, associated with db.
1034            let open_connections = entry
1035                .iter_mut()
1036                .filter(|(other_id, conn)| !conn.close_pending && *other_id != id);
1037            for (id_to_close, conn) in open_connections {
1038                // Step 10.2: For each entry of openConnections
1039                // queue a database task to fire a version change event
1040                // named versionchange at entry with db’s version and version.
1041                if conn
1042                    .sender
1043                    .send(ConnectionMsg::VersionChange {
1044                        name: db_name.clone(),
1045                        id: *id_to_close,
1046                        version,
1047                        old_version: db_version,
1048                    })
1049                    .is_err()
1050                {
1051                    error!("Failed to send ConnectionMsg::Connection to script.");
1052                };
1053                pending_close.insert(*id_to_close);
1054                pending_versionchange.insert(*id_to_close);
1055            }
1056            if !pending_close.is_empty() {
1057                // Step 10.3: Wait for all of the events to be fired.
1058                return;
1059            }
1060
1061            // Step 10.6: Run upgrade a database using connection, version and request.
1062            self.upgrade_database(key, version);
1063            return;
1064        }
1065
1066        // Step 11:
1067        if sender
1068            .send(ConnectionMsg::Connection {
1069                name: db_name.clone(),
1070                id: *id,
1071                version: db_version,
1072                upgraded: false,
1073            })
1074            .is_err()
1075        {
1076            error!("Failed to send ConnectionMsg::Connection to script.");
1077        };
1078    }
1079
1080    /// <https://www.w3.org/TR/IndexedDB/#delete-a-database>
1081    /// The part adding the request to the connection queue.
1082    fn start_delete_database(
1083        &mut self,
1084        key: IndexedDBDescription,
1085        id: Uuid,
1086        sender: GenericCallback<BackendResult<u64>>,
1087    ) {
1088        let open_request = OpenRequest::Delete {
1089            sender,
1090            _origin: key.origin.clone(),
1091            _db_name: key.name.clone(),
1092            processed: false,
1093            id,
1094        };
1095
1096        let should_continue = {
1097            // Step 1: Let queue be the connection queue for storageKey and name.
1098            let queue = self.connection_queues.entry(key.clone()).or_default();
1099
1100            // Step 2: Add request to queue.
1101            queue.push_back(open_request);
1102            queue.len() == 1
1103        };
1104
1105        // Step 3: Wait until all previous requests in queue have been processed.
1106        if should_continue {
1107            self.delete_database(key.clone());
1108            self.maybe_remove_front_from_queue(&key);
1109        }
1110    }
1111
1112    /// <https://www.w3.org/TR/IndexedDB/#delete-a-database>
1113    fn delete_database(&mut self, key: IndexedDBDescription) {
1114        let Some(queue) = self.connection_queues.get_mut(&key) else {
1115            return debug_assert!(false, "A connection queue should exist.");
1116        };
1117        let Some(open_request) = queue.front_mut() else {
1118            return debug_assert!(false, "An open request should be in the queue.");
1119        };
1120        let OpenRequest::Delete {
1121            sender,
1122            _origin: _,
1123            _db_name: _,
1124            processed,
1125            id: _,
1126        } = open_request
1127        else {
1128            return debug_assert!(
1129                false,
1130                "An request to open a connection should be in the queue."
1131            );
1132        };
1133
1134        // Step4: Let db be the database named name in storageKey, if one exists. Otherwise, return 0 (zero).
1135        let version = if let Some(db) = self.databases.remove(&key) {
1136            // Step 5: Let openConnections be the set of all connections associated with db.
1137            // Step6: For each entry of openConnections that does not have its close pending flag set to true,
1138            // queue a database task to fire a version change event named versionchange
1139            // at entry with db’s version and null.
1140            // Step 7: Wait for all of the events to be fired.
1141            // Step 8: If any of the connections in openConnections are still not closed,
1142            // queue a database task to fire a version change event
1143            // named blocked at request with db’s version and null.
1144            // Step 9: Wait until all connections in openConnections are closed.
1145            // TODO: implement connections.
1146
1147            // Step 10: Let version be db’s version.
1148            let res = db.version();
1149            let Ok(version) = res else {
1150                if sender
1151                    .send(BackendResult::Err(BackendError::DbErr(
1152                        res.unwrap_err().to_string(),
1153                    )))
1154                    .is_err()
1155                {
1156                    debug!("Script went away during pending database delete.");
1157                }
1158                return;
1159            };
1160
1161            // Step 11: Delete db.
1162            // If this fails for any reason,
1163            // return an appropriate error (e.g. a QuotaExceededError, or an "UnknownError" DOMException).
1164            if let Err(err) = db.delete_database() {
1165                if sender
1166                    .send(BackendResult::Err(BackendError::DbErr(err.to_string())))
1167                    .is_err()
1168                {
1169                    debug!("Script went away during pending database delete.");
1170                }
1171                return;
1172            };
1173
1174            version
1175        } else {
1176            0
1177        };
1178
1179        // step 12: Return version.
1180        if sender.send(BackendResult::Ok(version)).is_err() {
1181            debug!("Script went away during pending database delete.");
1182        }
1183
1184        *processed = true;
1185    }
1186
1187    /// <https://w3c.github.io/IndexedDB/#closing-connection>
1188    fn close_database(&mut self, origin: ImmutableOrigin, id: Uuid, name: String) {
1189        // Step 1: Set connection’s close pending flag to true.
1190        // TODO: seems like a script only flag.
1191
1192        // Step 2: If the forced flag is true,
1193        // then for each transaction created using connection
1194        // run abort a transaction with transaction and newly created "AbortError" DOMException.
1195        // Step 3: Wait for all transactions created using connection to complete.
1196        // Once they are complete, connection is closed.
1197        // TODO: transaction lifecycle.
1198
1199        // Step 4: If the forced flag is true, then fire an event named close at connection.
1200        // TODO: implement, probably only on the script side of things.
1201
1202        // Note: below we are continuing
1203        // <https://w3c.github.io/IndexedDB/#open-a-database-connection>
1204        // in the case that an open request is waiting for connections to close.
1205        let key = IndexedDBDescription { origin, name };
1206        let (can_upgrade, version) = {
1207            self.remove_connection(&key, &id);
1208
1209            let Some(queue) = self.connection_queues.get_mut(&key) else {
1210                return;
1211            };
1212            let Some(open_request) = queue.front_mut() else {
1213                return;
1214            };
1215            if let OpenRequest::Open {
1216                sender: _,
1217                db_name: _,
1218                version,
1219                id: _,
1220                pending_upgrade,
1221                pending_versionchange,
1222                pending_close,
1223            } = open_request
1224            {
1225                pending_close.remove(&id);
1226                (
1227                    // Note: need to exclude requests that have already started upgrading.
1228                    pending_close.is_empty() &&
1229                        pending_versionchange.is_empty() &&
1230                        !pending_upgrade.is_some(),
1231                    *version,
1232                )
1233            } else {
1234                (false, None)
1235            }
1236        };
1237
1238        // <https://w3c.github.io/IndexedDB/#open-a-database-connection>
1239        // Step 10.3: Wait for all of the events to be fired.
1240        // Step 10.5: Wait until all connections in openConnections are closed.
1241        // Note: both conditions must be checked here,
1242        // because that is the condition enabling the upgrade to proceed.
1243        if can_upgrade {
1244            // Step 10.6: Run upgrade a database using connection, version and request.
1245            let Some(version) = version else {
1246                return debug_assert!(
1247                    false,
1248                    "An upgrade version should have been determined by now."
1249                );
1250            };
1251            self.upgrade_database(key.clone(), version);
1252
1253            let was_pruned = self.maybe_remove_front_from_queue(&key);
1254            if was_pruned {
1255                self.advance_connection_queue(key);
1256            }
1257        }
1258    }
1259
1260    fn handle_sync_operation(&mut self, operation: SyncOperation) {
1261        match operation {
1262            SyncOperation::GetDatabases(sender, origin) => {
1263                // The in-parallel steps of https://www.w3.org/TR/IndexedDB/#dom-idbfactory-databases
1264
1265                // Step 4.1 Let databases be the set of databases in storageKey.
1266                // If this cannot be determined for any reason,
1267                // then queue a database task to reject p with an appropriate error
1268                // (e.g. an "UnknownError" DOMException) and terminate these steps.
1269                // TODO: separate database and connection concepts.
1270                // For now using `self.databases`, which track connections.
1271
1272                // Step 4.2: Let result be a new list.
1273                let info_list: Vec<DatabaseInfo> = self
1274                    .databases
1275                    .iter()
1276                    .filter_map(|(description, info)| {
1277                        // Step 4.3: For each db of databases:
1278                        if let Ok(version) = info.version() {
1279                            // Step 4.3.4: If db’s version is 0, then continue.
1280                            if version == 0 {
1281                                None
1282                            } else {
1283                                // Step 4.3.5: Let info be a new IDBDatabaseInfo dictionary.
1284                                // Step 4.3.6: Set info’s name dictionary member to db’s name.
1285                                // Step 4.3.7: Set info’s version dictionary member to db’s version.
1286                                // Step 4.3.8: Append info to result.
1287                                if description.origin == origin {
1288                                    Some(DatabaseInfo {
1289                                        name: description.name.clone(),
1290                                        version,
1291                                    })
1292                                } else {
1293                                    None
1294                                }
1295                            }
1296                        } else {
1297                            None
1298                        }
1299                    })
1300                    .collect();
1301
1302                // Note: if anything went wrong, we reply with an error.
1303                let result = if info_list.len() == self.databases.len() {
1304                    Ok(info_list)
1305                } else {
1306                    Err(BackendError::DbErr(
1307                        "Unknown error getting database info.".to_string(),
1308                    ))
1309                };
1310
1311                // Step 4.4: Queue a database task to resolve p with result.
1312                if sender.send(result).is_err() {
1313                    debug!("Couldn't send SyncOperation::GetDatabases reply.");
1314                }
1315            },
1316            SyncOperation::CloseDatabase(origin, id, db_name) => {
1317                self.close_database(origin, id, db_name);
1318            },
1319            SyncOperation::OpenDatabase(sender, origin, db_name, version, id) => {
1320                self.open_a_database_connection(sender, origin, db_name, version, id);
1321            },
1322            SyncOperation::AbortPendingUpgrades {
1323                pending_upgrades,
1324                origin,
1325            } => {
1326                self.abort_pending_upgrades(pending_upgrades, origin);
1327            },
1328            SyncOperation::AbortPendingUpgrade { name, id, origin } => {
1329                self.abort_pending_upgrade(name, id, origin);
1330            },
1331            SyncOperation::DeleteDatabase(callback, origin, db_name, id) => {
1332                let idb_description = IndexedDBDescription {
1333                    origin,
1334                    name: db_name,
1335                };
1336                self.start_delete_database(idb_description, id, callback);
1337            },
1338            SyncOperation::GetObjectStore(sender, origin, db_name, store_name) => {
1339                // FIXME:(arihant2math) Should we error out more aggressively here?
1340                let result = self
1341                    .get_database(origin, db_name)
1342                    .map(|db| IndexedDBObjectStore {
1343                        key_path: db.key_path(&store_name),
1344                        has_key_generator: db.has_key_generator(&store_name),
1345                        indexes: db.indexes(&store_name).unwrap_or_default(),
1346                        name: store_name,
1347                    });
1348                let _ = sender.send(result.ok_or(BackendError::DbNotFound));
1349            },
1350            SyncOperation::CreateIndex(
1351                origin,
1352                db_name,
1353                store_name,
1354                index_name,
1355                key_path,
1356                unique,
1357                multi_entry,
1358            ) => {
1359                if let Some(db) = self.get_database(origin, db_name) {
1360                    let _ = db.create_index(&store_name, index_name, key_path, unique, multi_entry);
1361                }
1362            },
1363            SyncOperation::DeleteIndex(origin, db_name, store_name, index_name) => {
1364                if let Some(db) = self.get_database(origin, db_name) {
1365                    let _ = db.delete_index(&store_name, index_name);
1366                }
1367            },
1368            SyncOperation::Commit(sender, _origin, _db_name, _txn) => {
1369                // FIXME:(arihant2math) This does nothing at the moment
1370                let _ = sender.send(Ok(()));
1371            },
1372            SyncOperation::UpgradeVersion(sender, origin, db_name, _txn, version) => {
1373                if let Some(db) = self.get_database_mut(origin, db_name) {
1374                    if version > db.version().unwrap_or(0) {
1375                        let _ = db.set_version(version);
1376                    }
1377                    // erroring out if the version is not upgraded can be and non-replicable
1378                    let _ = sender.send(db.version().map_err(backend_error_from_sqlite_error));
1379                } else {
1380                    let _ = sender.send(Err(BackendError::DbNotFound));
1381                }
1382            },
1383            SyncOperation::CreateObjectStore(
1384                sender,
1385                origin,
1386                db_name,
1387                store_name,
1388                key_paths,
1389                auto_increment,
1390            ) => {
1391                if let Some(db) = self.get_database_mut(origin, db_name) {
1392                    let result = db.create_object_store(&store_name, key_paths, auto_increment);
1393                    let _ = sender.send(result.map_err(BackendError::from));
1394                } else {
1395                    let _ = sender.send(Err(BackendError::DbNotFound));
1396                }
1397            },
1398            SyncOperation::DeleteObjectStore(sender, origin, db_name, store_name) => {
1399                if let Some(db) = self.get_database_mut(origin, db_name) {
1400                    let result = db.delete_object_store(&store_name);
1401                    let _ = sender.send(result.map_err(BackendError::from));
1402                } else {
1403                    let _ = sender.send(Err(BackendError::DbNotFound));
1404                }
1405            },
1406            SyncOperation::StartTransaction(sender, origin, db_name, txn) => {
1407                if let Some(db) = self.get_database_mut(origin, db_name) {
1408                    db.start_transaction(txn, Some(sender));
1409                } else {
1410                    let _ = sender.send(Err(BackendError::DbNotFound));
1411                }
1412            },
1413            SyncOperation::Version(sender, origin, db_name) => {
1414                if let Some(db) = self.get_database(origin, db_name) {
1415                    let _ = sender.send(db.version().map_err(backend_error_from_sqlite_error));
1416                } else {
1417                    let _ = sender.send(Err(BackendError::DbNotFound));
1418                }
1419            },
1420            SyncOperation::RegisterNewTxn(sender, _origin, _db_name) => {
1421                // Note: ignoring origin and name for now,
1422                // but those could be used again when implementing
1423                // lifecycle.
1424                let transaction_id = self.serial_number_counter;
1425                self.serial_number_counter += 1;
1426                let _ = sender.send(transaction_id);
1427            },
1428            SyncOperation::NotifyEndOfVersionChange {
1429                name,
1430                id,
1431                old_version,
1432                origin,
1433            } => {
1434                self.handle_version_change_done(name, id, old_version, origin);
1435            },
1436            SyncOperation::Exit(_) => {
1437                unreachable!("We must've already broken out of event loop.");
1438            },
1439        }
1440    }
1441}