storage/indexeddb/
mod.rs

/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5mod engines;
6
7use std::borrow::ToOwned;
8use std::collections::hash_map::Entry;
9use std::collections::{HashMap, HashSet, VecDeque};
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::thread;
13
14use base::generic_channel::{self, GenericReceiver, GenericSender, ReceiveError};
15use base::threadpool::ThreadPool;
16use log::{debug, error, warn};
17use malloc_size_of::MallocSizeOf;
18use malloc_size_of_derive::MallocSizeOf;
19use profile_traits::generic_callback::GenericCallback;
20use profile_traits::mem::{
21    ProcessReports, ProfilerChan as MemProfilerChan, Report, ReportKind, perform_memory_report,
22};
23use profile_traits::path;
24use rusqlite::Error as RusqliteError;
25use rustc_hash::{FxHashMap, FxHashSet};
26use servo_config::pref;
27use servo_url::origin::ImmutableOrigin;
28use storage_traits::indexeddb::{
29    AsyncOperation, BackendError, BackendResult, ConnectionMsg, CreateObjectResult, DatabaseInfo,
30    DbResult, IndexedDBIndex, IndexedDBObjectStore, IndexedDBThreadMsg, IndexedDBTxnMode, KeyPath,
31    SyncOperation, TxnCompleteMsg,
32};
33use uuid::Uuid;
34
35use crate::indexeddb::engines::{KvsEngine, KvsOperation, KvsTransaction, SqliteEngine};
36use crate::shared::is_sqlite_disk_full_error;
37
/// Constructor trait implemented on the sender half of the IndexedDB
/// channel: building the sender also spawns the manager thread behind it.
pub trait IndexedDBThreadFactory {
    /// Spawn the IndexedDB manager thread and return a sender for talking to it.
    /// `config_dir` is the base directory under which the "IndexedDB" data
    /// directory is created; `None` yields a relative "IndexedDB" path.
    fn new(config_dir: Option<PathBuf>, mem_profiler_chan: MemProfilerChan) -> Self;
}
41
42impl IndexedDBThreadFactory for GenericSender<IndexedDBThreadMsg> {
43    fn new(
44        config_dir: Option<PathBuf>,
45        mem_profiler_chan: MemProfilerChan,
46    ) -> GenericSender<IndexedDBThreadMsg> {
47        let (chan, port) = generic_channel::channel().unwrap();
48        let chan2 = chan.clone();
49
50        let mut idb_base_dir = PathBuf::new();
51        if let Some(p) = config_dir {
52            idb_base_dir.push(p);
53        }
54        idb_base_dir.push("IndexedDB");
55
56        let manager_sender = chan.clone();
57
58        thread::Builder::new()
59            .name("IndexedDBManager".to_owned())
60            .spawn(move || {
61                mem_profiler_chan.run_with_memory_reporting(
62                    || IndexedDBManager::new(port, manager_sender, idb_base_dir).start(),
63                    String::from("indexedDB-reporter"),
64                    chan2,
65                    IndexedDBThreadMsg::CollectMemoryReport,
66                );
67            })
68            .expect("Thread spawning failed");
69
70        chan
71    }
72}
73
/// A key used to track databases.
/// TODO: use a storage key.
#[derive(Clone, Debug, Eq, Hash, MallocSizeOf, PartialEq)]
pub struct IndexedDBDescription {
    /// The origin owning the database (TODO: replace with a storage key).
    pub origin: ImmutableOrigin,
    /// The database name as requested by script.
    pub name: String,
}
81
82impl IndexedDBDescription {
83    // randomly generated namespace for our purposes
84    const NAMESPACE_SERVO_IDB: &uuid::Uuid = &Uuid::from_bytes([
85        0x37, 0x9e, 0x56, 0xb0, 0x1a, 0x76, 0x44, 0xc2, 0xa0, 0xdb, 0xe2, 0x18, 0xc5, 0xc8, 0xa3,
86        0x5d,
87    ]);
88    // Converts the database description to a folder name where all
89    // data for this database is stored
90    pub(super) fn as_path(&self) -> PathBuf {
91        let mut path = PathBuf::new();
92
93        // uuid v5 is deterministic
94        let origin_uuid = Uuid::new_v5(
95            Self::NAMESPACE_SERVO_IDB,
96            self.origin.ascii_serialization().as_bytes(),
97        );
98        let db_name_uuid = Uuid::new_v5(Self::NAMESPACE_SERVO_IDB, self.name.as_bytes());
99        path.push(origin_uuid.to_string());
100        path.push(db_name_uuid.to_string());
101
102        path
103    }
104}
105
/// Scheduling metadata tracked per transaction.
#[derive(MallocSizeOf)]
struct TxnInfo {
    // Monotonic creation order; scheduling compares this to find
    // earlier-created overlapping transactions.
    created_seq: u64,
    // The transaction's mode (readonly / readwrite / versionchange).
    mode: IndexedDBTxnMode,
    // Names of the object stores in the transaction's scope.
    scope: HashSet<String>,
    // False once the transaction is no longer active; only live
    // transactions can block later overlapping ones.
    live: bool,
}
113
/// Per-database environment: owns the storage engine plus all transaction
/// scheduling state for one database.
#[derive(MallocSizeOf)]
struct IndexedDBEnvironment<E: KvsEngine> {
    // Underlying key-value storage engine.
    engine: E,
    // Sender back to the manager thread, used to report batch completion.
    manager_sender: GenericSender<IndexedDBThreadMsg>,
    // Queued (not yet executed) requests, per transaction id.
    transactions: FxHashMap<u64, KvsTransaction>,
    // Next value handed out as `TxnInfo::created_seq`.
    next_created_seq: u64,
    // Scheduling metadata, per transaction id.
    txn_info: FxHashMap<u64, TxnInfo>,
    // FIFO queues of transactions waiting to be scheduled, split by mode.
    queued_readwrite: VecDeque<u64>,
    queued_readonly: VecDeque<u64>,
    // Fast membership checks + de-dup for queued_*.
    queued_readwrite_set: FxHashSet<u64>,
    queued_readonly_set: FxHashSet<u64>,
    // At most one readwrite transaction runs at a time.
    running_readwrite: Option<u64>,
    // Readonly transactions currently executing a batch in the engine.
    running_readonly: HashSet<u64>,
    // Lowest request id not yet reported handled, per transaction.
    handled_next_unhandled_request_id: FxHashMap<u64, u64>,
    // Request ids handled out of order, waiting for the gap to close.
    handled_pending: FxHashMap<u64, HashSet<u64>>,
    // Callbacks to fire when a transaction becomes eligible to commit.
    pending_commit_callbacks: FxHashMap<u64, Vec<GenericCallback<TxnCompleteMsg>>>,
}
132
133impl<E: KvsEngine> IndexedDBEnvironment<E> {
134    fn new(
135        engine: E,
136        manager_sender: GenericSender<IndexedDBThreadMsg>,
137    ) -> IndexedDBEnvironment<E> {
138        IndexedDBEnvironment {
139            engine,
140            manager_sender,
141            transactions: FxHashMap::default(),
142            next_created_seq: 0,
143            txn_info: FxHashMap::default(),
144            queued_readwrite: VecDeque::new(),
145            queued_readonly: VecDeque::new(),
146            queued_readwrite_set: FxHashSet::default(),
147            queued_readonly_set: FxHashSet::default(),
148            running_readwrite: None,
149            running_readonly: HashSet::new(),
150            handled_next_unhandled_request_id: FxHashMap::default(),
151            handled_pending: FxHashMap::default(),
152            pending_commit_callbacks: FxHashMap::default(),
153        }
154    }
155
156    fn register_transaction(&mut self, txn: u64, mode: IndexedDBTxnMode, scope: Vec<String>) {
157        if self.txn_info.contains_key(&txn) {
158            return;
159        }
160        let created_seq = self.next_created_seq;
161        self.next_created_seq += 1;
162        self.txn_info.insert(
163            txn,
164            TxnInfo {
165                created_seq,
166                mode: mode.clone(),
167                scope: scope.into_iter().collect(),
168                live: true,
169            },
170        );
171        self.transactions
172            .entry(txn)
173            .or_insert_with(|| KvsTransaction {
174                requests: VecDeque::new(),
175                mode,
176            });
177    }
178
179    fn scopes_overlap(a: &TxnInfo, b: &TxnInfo) -> bool {
180        a.scope.iter().any(|store| b.scope.contains(store))
181    }
182
183    fn earlier_overlapping_live_exists<F>(&self, txn: u64, predicate: F) -> bool
184    where
185        F: Fn(&TxnInfo) -> bool,
186    {
187        let Some(current) = self.txn_info.get(&txn) else {
188            return false;
189        };
190        self.txn_info.iter().any(|(other_txn, other)| {
191            *other_txn != txn &&
192                other.live &&
193                other.created_seq < current.created_seq &&
194                Self::scopes_overlap(current, other) &&
195                predicate(other)
196        })
197    }
198
199    fn can_start_by_spec(&self, txn: u64) -> bool {
200        let Some(info) = self.txn_info.get(&txn) else {
201            return false;
202        };
203        match info.mode {
204            IndexedDBTxnMode::Readonly => !self.earlier_overlapping_live_exists(txn, |other| {
205                other.mode != IndexedDBTxnMode::Readonly
206            }),
207            IndexedDBTxnMode::Readwrite | IndexedDBTxnMode::Versionchange => {
208                !self.earlier_overlapping_live_exists(txn, |_other| true)
209            },
210        }
211    }
212
213    fn enqueue_txn(&mut self, txn: u64, mode: &IndexedDBTxnMode) {
214        match mode {
215            IndexedDBTxnMode::Readonly => {
216                if self.queued_readonly_set.insert(txn) {
217                    self.queued_readonly.push_back(txn);
218                }
219            },
220            _ => {
221                if self.queued_readwrite_set.insert(txn) {
222                    self.queued_readwrite.push_back(txn);
223                }
224            },
225        }
226    }
227
228    fn queue_operation(
229        &mut self,
230        store_name: &str,
231        serial_number: u64,
232        mode: IndexedDBTxnMode,
233        operation: AsyncOperation,
234    ) {
235        let mut enqueue_mode = None;
236        match self.transactions.entry(serial_number) {
237            Entry::Occupied(mut entry) => {
238                let transaction = entry.get_mut();
239                let transaction_mode = transaction.mode.clone();
240                let was_empty = transaction.requests.is_empty();
241                transaction.requests.push_back(KvsOperation {
242                    operation,
243                    store_name: String::from(store_name),
244                });
245                if was_empty {
246                    // If the queue was empty and we just added work, we must enqueue the txn
247                    // even if it is currently running a prior batch. The next batch will run
248                    // after EngineTxnBatchComplete triggers scheduling again.
249                    enqueue_mode = Some(transaction_mode);
250                }
251            },
252            Entry::Vacant(entry) => {
253                entry
254                    .insert(KvsTransaction {
255                        requests: VecDeque::new(),
256                        mode: mode.clone(),
257                    })
258                    .requests
259                    .push_back(KvsOperation {
260                        operation,
261                        store_name: String::from(store_name),
262                    });
263                enqueue_mode = Some(mode);
264            },
265        };
266        if let Some(mode) = enqueue_mode {
267            self.enqueue_txn(serial_number, &mode);
268        }
269    }
270
    /// <https://w3c.github.io/IndexedDB/#transaction-scheduling>
    ///
    /// One scheduling pass: drain a snapshot of each queue, starting every
    /// eligible readonly transaction, then at most one readwrite
    /// transaction. Transactions that cannot start yet (still running a
    /// prior batch, or blocked by an earlier overlapping transaction) are
    /// pushed back onto their queue for a later pass.
    fn schedule_transactions(&mut self, origin: ImmutableOrigin, db_name: &str) {
        // Snapshot the queue length so items re-queued below are not
        // reprocessed within this same pass.
        let readonly_len = self.queued_readonly.len();
        for _ in 0..readonly_len {
            let Some(txn) = self.queued_readonly.pop_front() else {
                break;
            };

            self.queued_readonly_set.remove(&txn);

            let Some(transaction) = self.transactions.get(&txn) else {
                continue;
            };
            if self.running_readonly.contains(&txn) {
                // Already executing a batch: keep it queued only if more
                // work arrived in the meantime.
                if !transaction.requests.is_empty() && self.queued_readonly_set.insert(txn) {
                    self.queued_readonly.push_back(txn);
                }
                continue;
            }
            if transaction.requests.is_empty() {
                continue;
            }
            if !self.can_start_by_spec(txn) {
                // Blocked by an earlier overlapping transaction; retry on a
                // later pass.
                if self.queued_readonly_set.insert(txn) {
                    self.queued_readonly.push_back(txn);
                }
                continue;
            }

            // Mark running and start async.
            // DO NOT clear here; clear on EngineTxnBatchComplete.
            self.running_readonly.insert(txn);
            self.start_transaction(origin.clone(), db_name.to_string(), txn, None);
        }

        // Start at most one readwrite txn.
        if self.running_readwrite.is_some() {
            return;
        }
        let readwrite_len = self.queued_readwrite.len();
        for _ in 0..readwrite_len {
            let Some(txn) = self.queued_readwrite.pop_front() else {
                break;
            };

            self.queued_readwrite_set.remove(&txn);

            let Some(transaction) = self.transactions.get(&txn) else {
                continue;
            };

            if self.running_readwrite == Some(txn) {
                // Still running a prior batch; re-queue if it has new work.
                if !transaction.requests.is_empty() && self.queued_readwrite_set.insert(txn) {
                    self.queued_readwrite.push_back(txn);
                }
                continue;
            }

            if transaction.requests.is_empty() {
                continue;
            }
            if !self.can_start_by_spec(txn) {
                if self.queued_readwrite_set.insert(txn) {
                    self.queued_readwrite.push_back(txn);
                }
                continue;
            }

            // Mark running and start async.
            // DO NOT clear here; clear on EngineTxnBatchComplete.
            self.running_readwrite = Some(txn);
            self.start_transaction(origin, db_name.to_string(), txn, None);
            return;
        }
    }
346
    // Executes all requests for a transaction (without committing).
    //
    // `sender`, when present, belongs to a manual starter and is notified
    // once the engine finishes this batch (or immediately when there is
    // nothing to do or the transaction is unknown).
    fn start_transaction(
        &mut self,
        origin: ImmutableOrigin,
        db_name: String,
        txn: u64,
        sender: Option<GenericSender<BackendResult<()>>>,
    ) {
        // https://w3c.github.io/IndexedDB/#transaction-lifetime
        // Take the current queued batch of requests for this txn.
        // If more requests arrive while the engine is running, they’ll be queued into
        // transaction.requests and scheduled in a later batch once we receive EngineTxnBatchComplete.
        let (mode, requests) = match self.transactions.get_mut(&txn) {
            Some(transaction) => {
                let mode = transaction.mode.clone();
                // mem::take leaves an empty queue behind for later arrivals.
                let requests = std::mem::take(&mut transaction.requests);
                (mode, requests)
            },
            None => {
                // If a manual starter is waiting, treat as "nothing to do".
                if let Some(sender) = sender {
                    let _ = sender.send(Ok(()));
                }
                return;
            },
        };

        if requests.is_empty() {
            if let Some(sender) = sender {
                let _ = sender.send(Ok(()));
            }
            // Important: if there was no work, do NOT send EngineTxnBatchComplete,
            // otherwise we can create a pointless reschedule loop.
            return;
        }

        let manager_sender = self.manager_sender.clone();
        self.engine.process_transaction(
            KvsTransaction { mode, requests },
            Box::new(move || {
                // Notify the manager thread when the engine finishes so it can:
                // - clear running_readonly / running_readwrite
                // - re-run scheduling (maybe start next queued txn or next batch)
                if let Err(err) = manager_sender.send(IndexedDBThreadMsg::EngineTxnBatchComplete {
                    origin,
                    db_name,
                    txn,
                }) {
                    error!(
                        "Failed to send IndexedDBThreadMsg::EngineTxnBatchComplete: {:?}",
                        err
                    );
                    // TODO: Prevent backend/manager shutdown while a transaction batch is
                    // running so completion notification cannot be dropped in normal flow.
                }

                // We have a sender if the transaction is started manually, and they
                // probably want to know when it is finished.
                if let Some(sender) = sender {
                    let _ = sender.send(Ok(()));
                }
            }),
        );
    }
411
412    fn mark_request_handled(&mut self, txn: u64, request_id: u64) {
413        let current = self
414            .handled_next_unhandled_request_id
415            .get(&txn)
416            .copied()
417            .unwrap_or(0);
418        if request_id == current {
419            let mut next = current + 1;
420            if let Some(pending) = self.handled_pending.get_mut(&txn) {
421                while pending.remove(&next) {
422                    next += 1;
423                }
424                if pending.is_empty() {
425                    self.handled_pending.remove(&txn);
426                }
427            }
428            self.handled_next_unhandled_request_id.insert(txn, next);
429        } else if request_id > current {
430            self.handled_pending
431                .entry(txn)
432                .or_default()
433                .insert(request_id);
434        }
435    }
436
437    fn can_notify_txn_maybe_commit(&self, txn: u64) -> bool {
438        if self.running_readwrite == Some(txn) || self.running_readonly.contains(&txn) {
439            return false;
440        }
441        // Avoid if the txn is still queued or has queued operations.
442        if self.queued_readonly_set.contains(&txn) || self.queued_readwrite_set.contains(&txn) {
443            return false;
444        }
445        match self.transactions.get(&txn) {
446            Some(t) => t.requests.is_empty(),
447            None => true,
448        }
449    }
450
    /// A transaction can commit right now when the spec's start conditions
    /// hold for it and it has no pending or in-flight work.
    fn can_commit_now(&self, txn: u64) -> bool {
        self.can_start_by_spec(txn) && self.can_notify_txn_maybe_commit(txn)
    }
454
455    fn queue_pending_commit_callback(
456        &mut self,
457        txn: u64,
458        callback: GenericCallback<TxnCompleteMsg>,
459    ) {
460        self.pending_commit_callbacks
461            .entry(txn)
462            .or_default()
463            .push(callback);
464    }
465
466    fn take_pending_commit_callbacks(&mut self, txn: u64) -> Vec<GenericCallback<TxnCompleteMsg>> {
467        self.pending_commit_callbacks
468            .remove(&txn)
469            .unwrap_or_default()
470    }
471
472    fn maybe_commit_candidates(&self) -> Vec<u64> {
473        let mut candidates: Vec<(u64, u64)> = self
474            .txn_info
475            .iter()
476            .filter_map(|(txn, info)| {
477                if !info.live || !self.can_commit_now(*txn) {
478                    return None;
479                }
480                Some((*txn, info.created_seq))
481            })
482            .collect();
483        candidates.sort_by_key(|(_, created_seq)| *created_seq);
484        candidates.into_iter().map(|(txn, _)| txn).collect()
485    }
486
487    fn finish_transaction(&mut self, txn: u64) {
488        if let Some(info) = self.txn_info.get_mut(&txn) {
489            info.live = false;
490        }
491        self.txn_info.remove(&txn);
492        self.transactions.remove(&txn);
493        self.queued_readonly.retain(|queued| *queued != txn);
494        self.queued_readwrite.retain(|queued| *queued != txn);
495        self.queued_readonly_set.remove(&txn);
496        self.queued_readwrite_set.remove(&txn);
497        if self.running_readwrite == Some(txn) {
498            self.running_readwrite = None;
499        }
500        self.running_readonly.remove(&txn);
501        self.handled_next_unhandled_request_id.remove(&txn);
502        self.handled_pending.remove(&txn);
503        self.pending_commit_callbacks.remove(&txn);
504    }
505
    /// Abort `txn`: discard its queued requests, queue slots, and running
    /// flags, but deliberately keep its `txn_info` entry so later
    /// overlapping transactions remain blocked until script reports
    /// TransactionFinished.
    fn abort_transaction(&mut self, txn: u64) {
        // Keep scheduling metadata until script reports TransactionFinished.
        // https://w3c.github.io/IndexedDB/#transaction-lifetime
        self.transactions.remove(&txn);
        self.queued_readonly.retain(|queued| *queued != txn);
        self.queued_readwrite.retain(|queued| *queued != txn);
        self.queued_readonly_set.remove(&txn);
        self.queued_readwrite_set.remove(&txn);
        if self.running_readwrite == Some(txn) {
            self.running_readwrite = None;
        }
        self.running_readonly.remove(&txn);
        self.handled_next_unhandled_request_id.remove(&txn);
        self.handled_pending.remove(&txn);
        self.pending_commit_callbacks.remove(&txn);
    }
522
    /// Current value of the store's key generator, if it has one.
    fn key_generator_current_number(&self, store_name: &str) -> Option<i32> {
        self.engine.key_generator_current_number(store_name)
    }

    /// The key path configured for `store_name`, if any.
    fn key_path(&self, store_name: &str) -> Option<KeyPath> {
        self.engine.key_path(store_name)
    }
530
    /// Names of all object stores in this database, with any engine error
    /// stringified for transport to script.
    fn object_store_names(&self) -> DbResult<Vec<String>> {
        self.engine
            .object_store_names()
            .map_err(|err| format!("{err:?}"))
    }

    /// All indexes defined on `store_name`.
    fn indexes(&self, store_name: &str) -> DbResult<Vec<IndexedDBIndex>> {
        self.engine
            .indexes(store_name)
            .map_err(|err| format!("{err:?}"))
    }
542
    /// Create index `index_name` on `store_name` with the given key path
    /// and uniqueness/multi-entry flags.
    fn create_index(
        &self,
        store_name: &str,
        index_name: String,
        key_path: KeyPath,
        unique: bool,
        multi_entry: bool,
    ) -> DbResult<CreateObjectResult> {
        self.engine
            .create_index(store_name, index_name, key_path, unique, multi_entry)
            .map_err(|err| format!("{err:?}"))
    }

    /// Delete index `index_name` from `store_name`.
    fn delete_index(&self, store_name: &str, index_name: String) -> DbResult<()> {
        self.engine
            .delete_index(store_name, index_name)
            .map_err(|err| format!("{err:?}"))
    }
561
    /// Create object store `store_name` with the given key path and
    /// auto-increment setting.
    fn create_object_store(
        &mut self,
        store_name: &str,
        key_path: Option<KeyPath>,
        auto_increment: bool,
    ) -> DbResult<CreateObjectResult> {
        self.engine
            .create_store(store_name, key_path, auto_increment)
            .map_err(|err| format!("{err:?}"))
    }

    /// Delete object store `store_name` together with all of its indexes.
    /// Fails early if any index deletion fails, leaving the store in place.
    fn delete_object_store(&mut self, store_name: &str) -> DbResult<()> {
        // IndexedDB §5.5: "For upgrade transactions this includes changes to the set of object stores and indexes".
        // Delete index metadata first, then remove the store metadata row.
        let indexes = self
            .engine
            .indexes(store_name)
            .map_err(|err| format!("{err:?}"))?;
        for index in indexes {
            self.engine
                .delete_index(store_name, index.name)
                .map_err(|err| format!("{err:?}"))?;
        }
        self.engine
            .delete_store(store_name)
            .map_err(|err| format!("{err:?}"))
    }
589
    /// Delete the whole database; consumes this environment since nothing
    /// can be done with it afterwards.
    fn delete_database(self) -> BackendResult<()> {
        let result = self.engine.delete_database();
        result
            .map_err(|err| format!("{err:?}"))
            .map_err(BackendError::from)
    }

    /// The database's current version, as reported by the engine.
    fn version(&self) -> Result<u64, E::Error> {
        self.engine.version()
    }

    /// Persist a new database version.
    fn set_version(&mut self, version: u64) -> DbResult<()> {
        self.engine
            .set_version(version)
            .map_err(|err| format!("{err:?}"))
    }
606}
607
608fn backend_error_from_sqlite_error(err: RusqliteError) -> BackendError {
609    if is_sqlite_disk_full_error(&err) {
610        BackendError::QuotaExceeded
611    } else {
612        BackendError::DbErr(format!("{err:?}"))
613    }
614}
615
/// <https://w3c.github.io/IndexedDB/#request-open-request>
/// Used here to implement the
/// <https://w3c.github.io/IndexedDB/#connection-queue>
#[derive(MallocSizeOf)]
enum OpenRequest {
    /// A request to open a connection to a database.
    Open {
        /// The callback used to send a result to script.
        sender: GenericCallback<ConnectionMsg>,

        /// The name of the database.
        db_name: String,

        /// Optionally, a requested db version.
        /// Note: when the open algorithm starts, this will be mutated and set to something as per the algo.
        version: Option<u64>,

        /// <https://w3c.github.io/IndexedDB/#request-processed-flag>
        processed: bool,

        /// Optionally, a version pending upgrade.
        pending_upgrade: Option<VersionUpgrade>,

        /// This request is pending on these connections to close.
        pending_close: HashSet<Uuid>,

        /// This request is pending on these connections to fire a versionchange event.
        /// Note: This starts as equal to `pending_close`, but when all events have fired,
        /// not all connections need to have closed, in which case the `blocked` event
        /// is fired on this request.
        pending_versionchange: HashSet<Uuid>,

        /// Unique id of this request.
        id: Uuid,
    },
    /// A request to delete a database.
    Delete {
        /// The callback used to send a result to script.
        sender: GenericCallback<BackendResult<u64>>,

        /// The origin of the request.
        /// TODO: storage key.
        /// Note: will be used when the full spec is implemented.
        _origin: ImmutableOrigin,

        /// The name of the database.
        /// Note: will be used when the full spec is implemented.
        _db_name: String,

        /// <https://w3c.github.io/IndexedDB/#request-processed-flag>
        processed: bool,

        /// Unique id of this request.
        id: Uuid,
    },
}
668
669impl OpenRequest {
670    fn get_id(&self) -> Uuid {
671        let id = match self {
672            OpenRequest::Open {
673                sender: _,
674                db_name: _,
675                version: _,
676                processed: _,
677                pending_upgrade: _,
678                pending_close: _,
679                pending_versionchange: _,
680                id,
681            } => id,
682            OpenRequest::Delete {
683                sender: _,
684                _origin: _,
685                _db_name: _,
686                processed: _,
687                id,
688            } => id,
689        };
690        *id
691    }
692
693    fn is_open(&self) -> bool {
694        match self {
695            OpenRequest::Open {
696                sender: _,
697                db_name: _,
698                version: _,
699                processed: _,
700                pending_upgrade: _,
701                pending_close: _,
702                pending_versionchange: _,
703                id: _,
704            } => true,
705            OpenRequest::Delete {
706                sender: _,
707                _origin: _,
708                _db_name: _,
709                processed: _,
710                id: _,
711            } => false,
712        }
713    }
714
715    /// An open request remains pending until it has been processed,
716    /// and while waiting on upgrade completion or other connections.
717    fn is_pending(&self) -> bool {
718        match self {
719            OpenRequest::Open {
720                sender: _,
721                db_name: _,
722                version: _,
723                processed,
724                pending_upgrade,
725                pending_close,
726                pending_versionchange,
727                id: _,
728            } => {
729                !processed ||
730                    pending_upgrade.is_some() ||
731                    !pending_close.is_empty() ||
732                    !pending_versionchange.is_empty()
733            },
734            OpenRequest::Delete {
735                sender: _,
736                _origin: _,
737                _db_name: _,
738                processed,
739                id: _,
740            } => !processed,
741        }
742    }
743
744    /// Abort the open request,
745    /// optionally returning a version to revert to.
746    fn abort(&self) -> Option<u64> {
747        match self {
748            OpenRequest::Open {
749                sender,
750                db_name,
751                version: _,
752                processed: _,
753                pending_close: _,
754                pending_versionchange: _,
755                pending_upgrade,
756                id,
757            } => {
758                if sender
759                    .send(ConnectionMsg::AbortError {
760                        name: db_name.clone(),
761                        id: *id,
762                    })
763                    .is_err()
764                {
765                    error!("Failed to send ConnectionMsg::Connection to script.");
766                };
767                pending_upgrade.as_ref().map(|upgrade| upgrade.old)
768            },
769            OpenRequest::Delete {
770                sender,
771                _origin: _,
772                _db_name: _,
773                processed: _,
774                id: _,
775            } => {
776                if sender.send(Err(BackendError::DbNotFound)).is_err() {
777                    error!("Failed to send result of database delete to script.");
778                };
779                None
780            },
781        }
782    }
783}
784
/// A version change in progress for a database.
#[derive(MallocSizeOf)]
struct VersionUpgrade {
    // Version before the upgrade; reverted to when the upgrade is aborted.
    old: u64,
    // Version being upgraded to.
    new: u64,
    // Serial number of the associated versionchange transaction.
    transaction: u64,
}
791
/// <https://w3c.github.io/IndexedDB/#connection>
/// One open script connection to a database.
#[derive(MallocSizeOf)]
struct Connection {
    /// <https://w3c.github.io/IndexedDB/#connection-close-pending-flag>
    close_pending: bool,

    /// The callback used to send a result to script.
    sender: GenericCallback<ConnectionMsg>,
}
801
/// The manager owning all IndexedDB databases and their message loop.
struct IndexedDBManager {
    // Receiver for messages from script and from engine completion callbacks.
    port: GenericReceiver<IndexedDBThreadMsg>,
    // Sender handed to environments so engine callbacks can message back.
    manager_sender: GenericSender<IndexedDBThreadMsg>,
    // Root directory under which each database's folder lives.
    idb_base_dir: PathBuf,
    // One environment per known database.
    databases: HashMap<IndexedDBDescription, IndexedDBEnvironment<SqliteEngine>>,
    // Worker pool (sized in `new`) for processing IndexedDB transactions.
    thread_pool: Arc<ThreadPool>,

    /// A global counter to produce unique transaction ids.
    /// TODO: remove once db connections lifecycle is managed.
    /// A global counter is only necessary because of how deleting a db currently
    /// does not wait for connection to close and transactions to finish.
    serial_number_counter: u64,

    /// <https://w3c.github.io/IndexedDB/#connection-queue>
    connection_queues: HashMap<IndexedDBDescription, VecDeque<OpenRequest>>,

    /// <https://w3c.github.io/IndexedDB/#connection>
    connections: HashMap<IndexedDBDescription, HashMap<Uuid, Connection>>,
}
821
822impl IndexedDBManager {
823    fn new(
824        port: GenericReceiver<IndexedDBThreadMsg>,
825        manager_sender: GenericSender<IndexedDBThreadMsg>,
826        idb_base_dir: PathBuf,
827    ) -> IndexedDBManager {
828        debug!("New indexedDBManager");
829
830        // Uses an estimate of the system cpus to process IndexedDB transactions
831        // See https://doc.rust-lang.org/stable/std/thread/fn.available_parallelism.html
832        // If no information can be obtained about the system, uses 4 threads as a default
833        let thread_count = thread::available_parallelism()
834            .map(|i| i.get())
835            .unwrap_or(pref!(threadpools_fallback_worker_num) as usize)
836            .min(pref!(threadpools_indexeddb_workers_max).max(1) as usize);
837
838        IndexedDBManager {
839            port,
840            manager_sender,
841            idb_base_dir,
842            databases: HashMap::new(),
843            thread_pool: Arc::new(ThreadPool::new(thread_count, "IndexedDB".to_string())),
844            serial_number_counter: 0,
845            connection_queues: Default::default(),
846            connections: Default::default(),
847        }
848    }
849}
850
851impl IndexedDBManager {
    /// Main event loop of the IndexedDB manager thread.
    ///
    /// Receives messages from `self.port` until the channel disconnects or an
    /// explicit `SyncOperation::Exit` is received, dispatching sync operations,
    /// async per-store operations, engine batch-completion notifications, and
    /// memory-report requests.
    fn start(&mut self) {
        loop {
            // FIXME:(arihant2math) No message *most likely* means that
            // the ipc sender has been dropped, so we break the loop
            let message = match self.port.recv() {
                Ok(msg) => msg,
                Err(ReceiveError::Disconnected) => {
                    break;
                },
                Err(e) => {
                    // Non-fatal receive error: log and keep serving messages.
                    warn!("Error in IndexedDB thread: {e:?}");
                    continue;
                },
            };
            match message {
                IndexedDBThreadMsg::Sync(SyncOperation::Exit(sender)) => {
                    // Acknowledge shutdown (best-effort) and leave the loop.
                    let _ = sender.send(());
                    break;
                },
                IndexedDBThreadMsg::Sync(operation) => {
                    self.handle_sync_operation(operation);
                },
                IndexedDBThreadMsg::Async(
                    origin,
                    db_name,
                    store_name,
                    txn,
                    _request_id,
                    mode,
                    operation,
                ) => {
                    // Unknown databases are silently ignored here; the
                    // operation is simply dropped.
                    if let Some(db) = self.get_database_mut(origin.clone(), db_name.clone()) {
                        // Queues an operation for a transaction without starting it
                        db.queue_operation(&store_name, txn, mode, operation);
                        db.schedule_transactions(origin, &db_name);
                    }
                },
                IndexedDBThreadMsg::EngineTxnBatchComplete {
                    origin,
                    db_name,
                    txn,
                } => {
                    let should_notify =
                        if let Some(db) = self.get_database_mut(origin.clone(), db_name.clone()) {
                            // Decide which running flag to clear based on txn mode.
                            let mode = db.transactions.get(&txn).map(|t| t.mode.clone());

                            match mode {
                                Some(IndexedDBTxnMode::Readonly) => {
                                    db.running_readonly.remove(&txn);
                                },
                                Some(_) => {
                                    // Read-write transactions run exclusively;
                                    // only clear the flag if it is ours.
                                    if db.running_readwrite == Some(txn) {
                                        db.running_readwrite = None;
                                    }
                                },
                                None => {
                                    // txn might have been aborted/removed; nothing to clear
                                },
                            }

                            // If more requests were queued while this batch was running,
                            // schedule again now.
                            db.schedule_transactions(origin.clone(), &db_name);
                            db.can_notify_txn_maybe_commit(txn)
                        } else {
                            false
                        };
                    if should_notify {
                        // Re-enter the sync path so deferred commit callbacks
                        // and connection notifications are handled uniformly.
                        self.handle_sync_operation(SyncOperation::TxnMaybeCommit {
                            origin,
                            db_name,
                            txn,
                        });
                    }
                },
                IndexedDBThreadMsg::CollectMemoryReport(sender) => {
                    let reports = self.collect_memory_reports();
                    sender.send(ProcessReports::new(reports));
                },
            }
        }
    }
935
936    fn dispatch_txn_maybe_commit(&self, origin: ImmutableOrigin, db_name: String, txn: u64) {
937        let key = IndexedDBDescription {
938            origin,
939            name: db_name.clone(),
940        };
941        let Some(connections) = self.connections.get(&key) else {
942            return;
943        };
944        for connection in connections.values() {
945            if connection.close_pending {
946                continue;
947            }
948            let _ = connection.sender.send(ConnectionMsg::TxnMaybeCommit {
949                db_name: db_name.clone(),
950                txn,
951            });
952        }
953    }
954
955    fn handle_txn_maybe_commit(&mut self, origin: ImmutableOrigin, db_name: String, txn: u64) {
956        let key = IndexedDBDescription {
957            origin: origin.clone(),
958            name: db_name.clone(),
959        };
960        let callbacks = {
961            let Some(db) = self.databases.get_mut(&key) else {
962                return;
963            };
964            if !db.can_commit_now(txn) {
965                return;
966            }
967            db.take_pending_commit_callbacks(txn)
968        };
969
970        if callbacks.is_empty() {
971            self.dispatch_txn_maybe_commit(origin, db_name, txn);
972            return;
973        }
974
975        for callback in callbacks {
976            if callback
977                .send(TxnCompleteMsg {
978                    origin: origin.clone(),
979                    db_name: db_name.clone(),
980                    txn,
981                    result: Ok(()),
982                })
983                .is_err()
984            {
985                error!(
986                    "Failed to send deferred commit completion for db '{}' txn {}.",
987                    db_name, txn
988                );
989            }
990        }
991    }
992
    /// Handle when an upgrade transaction finishes in script.
    ///
    /// On commit: the open request at the head of the connection queue (which
    /// must be waiting on this exact transaction) is removed, answered with a
    /// `Connection` message, and the queue is advanced. On abort: the head
    /// request's id is captured and `abort_pending_upgrade` rolls it back.
    fn handle_upgrade_transaction_finished(
        &mut self,
        name: String,
        origin: ImmutableOrigin,
        txn: u64,
        committed: bool,
    ) {
        let key = IndexedDBDescription {
            name: name.clone(),
            origin: origin.clone(),
        };

        if committed {
            let Some(queue) = self.connection_queues.get_mut(&key) else {
                return debug_assert!(false, "A connection queue should exist.");
            };
            let Some(front) = queue.front() else {
                return debug_assert!(false, "A pending open request should exist.");
            };
            // Only act if the head request has a pending upgrade...
            let OpenRequest::Open {
                pending_upgrade: Some(pending_upgrade),
                ..
            } = front
            else {
                return;
            };
            // ...and that upgrade is for the transaction that just finished.
            if pending_upgrade.transaction != txn {
                return;
            }

            // The checks above passed against a shared borrow; now take
            // ownership of the request so its sender can be used.
            let Some(open_request) = queue.pop_front() else {
                return;
            };
            let OpenRequest::Open {
                sender,
                db_name,
                version: _,
                processed: _,
                pending_upgrade: Some(pending_upgrade),
                pending_close: _,
                pending_versionchange: _,
                id,
            } = open_request
            else {
                return;
            };
            let VersionUpgrade { new, .. } = pending_upgrade;
            // Fetch the (possibly just upgraded) object store names so the
            // new connection starts with an up-to-date store set.
            let object_store_names = self
                .databases
                .get(&key)
                .and_then(|db| db.object_store_names().ok())
                .unwrap_or_default();
            if sender
                .send(ConnectionMsg::Connection {
                    id,
                    name: db_name,
                    version: new,
                    upgraded: true,
                    object_store_names,
                })
                .is_err()
            {
                error!("Failed to send ConnectionMsg::Connection to script.");
            };

            // The head request is done; process the next queued request.
            self.advance_connection_queue(key);
            return;
        }

        // Aborted: capture the id of the head request waiting on this
        // transaction, then roll back its upgrade.
        let request_id = {
            let Some(queue) = self.connection_queues.get_mut(&key) else {
                return debug_assert!(false, "A connection queue should exist.");
            };
            let Some(front) = queue.front() else {
                return debug_assert!(false, "A pending open request should exist.");
            };
            let OpenRequest::Open {
                pending_upgrade: Some(pending_upgrade),
                id,
                ..
            } = front
            else {
                return;
            };
            if pending_upgrade.transaction != txn {
                return;
            }
            *id
        };

        self.abort_pending_upgrade(name, request_id, origin);
    }
1086
1087    /// Run the next open request in the queue.
1088    fn advance_connection_queue(&mut self, key: IndexedDBDescription) {
1089        loop {
1090            let is_open = {
1091                let Some(queue) = self.connection_queues.get_mut(&key) else {
1092                    return;
1093                };
1094                if queue.is_empty() {
1095                    return;
1096                }
1097                queue.front().expect("Queue is not empty.").is_open()
1098            };
1099
1100            if is_open {
1101                self.open_database(key.clone());
1102            } else {
1103                self.delete_database(key.clone());
1104            }
1105
1106            let was_pruned = self.maybe_remove_front_from_queue(&key);
1107
1108            if !was_pruned {
1109                // Note: requests to delete a database are, at this point in the implementation,
1110                // done in one step; so we can continue on to the next request.
1111                // Request to open a connection consists of multiple async steps, so we must break if
1112                // it is still pending.
1113                break;
1114            }
1115        }
1116    }
1117
1118    /// Remove the record at the front if it is not pending.
1119    fn maybe_remove_front_from_queue(&mut self, key: &IndexedDBDescription) -> bool {
1120        let (is_empty, was_pruned) = {
1121            let Some(queue) = self.connection_queues.get_mut(key) else {
1122                debug_assert!(false, "A connection queue should exist.");
1123                return false;
1124            };
1125            let mut pruned = false;
1126            let front_is_pending = queue.front().map(|record| record.is_pending());
1127            if let Some(is_pending) = front_is_pending {
1128                if !is_pending {
1129                    queue.pop_front().expect("Queue has a non-pending item.");
1130                    pruned = true
1131                }
1132            }
1133            (queue.is_empty(), pruned)
1134        };
1135        if is_empty {
1136            self.connection_queues.remove(key);
1137        }
1138        was_pruned
1139    }
1140
1141    fn remove_connection(&mut self, key: &IndexedDBDescription, id: &Uuid) {
1142        let is_empty = {
1143            let Some(connections) = self.connections.get_mut(key) else {
1144                return debug!("Connection already removed.");
1145            };
1146            connections.remove(id);
1147            connections.is_empty()
1148        };
1149
1150        if is_empty {
1151            self.connections.remove(key);
1152        }
1153    }
1154
1155    /// Aborting the current upgrade for an origin.
1156    // https://w3c.github.io/IndexedDB/#abort-an-upgrade-transaction
1157    /// Note: this only reverts the version at this point.
1158    fn abort_pending_upgrade(&mut self, name: String, id: Uuid, origin: ImmutableOrigin) {
1159        let key = IndexedDBDescription {
1160            name,
1161            origin: origin.clone(),
1162        };
1163        let old = {
1164            let Some(queue) = self.connection_queues.get_mut(&key) else {
1165                return debug_assert!(
1166                    false,
1167                    "There should be a connection queue for the aborted upgrade."
1168                );
1169            };
1170            let Some(open_request) = queue.pop_front() else {
1171                return debug_assert!(false, "There should be an open request to upgrade.");
1172            };
1173            if open_request.get_id() != id {
1174                return debug_assert!(
1175                    false,
1176                    "Open request to abort should be at the head of the queue."
1177                );
1178            }
1179            open_request.abort()
1180        };
1181        if let Some(old_version) = old {
1182            if old_version == 0 {
1183                // IndexedDB §5.8 "Aborting an upgrade transaction" sets connection version to 0
1184                // for newly created databases; Servo also drops the just-created backend entry
1185                // so it is not observable via `indexedDB.databases()` after the abort.
1186                // https://w3c.github.io/IndexedDB/#abort-an-upgrade-transaction
1187                // Invariant: aborting initial creation leaves no database entry behind.
1188                self.databases.remove(&key);
1189            } else {
1190                let Some(db) = self.databases.get_mut(&key) else {
1191                    return debug_assert!(false, "Db should have been created");
1192                };
1193                // Step 3: Set connection’s version to database’s version if database previously existed
1194                //  or 0 (zero) if database was newly created.
1195                let res = db.set_version(old_version);
1196                debug_assert!(res.is_ok(), "Setting a db version should not fail.");
1197            }
1198        }
1199
1200        self.remove_connection(&key, &id);
1201
1202        self.advance_connection_queue(key);
1203    }
1204
1205    /// Aborting all upgrades for an origin
1206    // https://w3c.github.io/IndexedDB/#abort-an-upgrade-transaction
1207    /// Note: this only reverts the version at this point.
1208    fn abort_pending_upgrades(
1209        &mut self,
1210        pending_upgrades: HashMap<String, HashSet<Uuid>>,
1211        origin: ImmutableOrigin,
1212    ) {
1213        for (name, ids) in pending_upgrades.into_iter() {
1214            let mut version_to_revert: Option<u64> = None;
1215            let key = IndexedDBDescription {
1216                name,
1217                origin: origin.clone(),
1218            };
1219            for id in ids.iter() {
1220                self.remove_connection(&key, id);
1221            }
1222            {
1223                let is_empty = {
1224                    let Some(queue) = self.connection_queues.get_mut(&key) else {
1225                        continue;
1226                    };
1227                    queue.retain_mut(|open_request| {
1228                        if ids.contains(&open_request.get_id()) {
1229                            let old = open_request.abort();
1230                            if version_to_revert.is_none() {
1231                                if let Some(old) = old {
1232                                    version_to_revert = Some(old);
1233                                }
1234                            }
1235                            false
1236                        } else {
1237                            true
1238                        }
1239                    });
1240                    queue.is_empty()
1241                };
1242                if is_empty {
1243                    self.connection_queues.remove(&key);
1244                }
1245            }
1246            if let Some(version) = version_to_revert {
1247                if version == 0 {
1248                    // IndexedDB §5.8 "Aborting an upgrade transaction" sets connection version to 0
1249                    // for newly created databases; Servo also drops the just-created backend entry
1250                    // so it is not observable via `indexedDB.databases()` after the abort.
1251                    // https://w3c.github.io/IndexedDB/#abort-an-upgrade-transaction
1252                    // Invariant: aborted initial upgrades must not remain as version-0 databases.
1253                    self.databases.remove(&key);
1254                } else {
1255                    let Some(db) = self.databases.get_mut(&key) else {
1256                        return debug_assert!(false, "Db should have been created");
1257                    };
1258                    // Step 3: Set connection’s version to database’s version if database previously existed
1259                    //  or 0 (zero) if database was newly created.
1260                    let res = db.set_version(version);
1261                    debug_assert!(res.is_ok(), "Setting a db version should not fail.");
1262                }
1263            }
1264        }
1265    }
1266
1267    /// <https://w3c.github.io/IndexedDB/#open-a-database-connection>
1268    fn open_a_database_connection(
1269        &mut self,
1270        sender: GenericCallback<ConnectionMsg>,
1271        origin: ImmutableOrigin,
1272        db_name: String,
1273        version: Option<u64>,
1274        id: Uuid,
1275    ) {
1276        let key = IndexedDBDescription {
1277            name: db_name.clone(),
1278            origin: origin.clone(),
1279        };
1280        let open_request = OpenRequest::Open {
1281            sender,
1282            db_name,
1283            version,
1284            processed: false,
1285            pending_close: Default::default(),
1286            pending_versionchange: Default::default(),
1287            pending_upgrade: None,
1288            id,
1289        };
1290        let should_continue = {
1291            // Step 1: Let queue be the connection queue for storageKey and name.
1292            let queue = self.connection_queues.entry(key.clone()).or_default();
1293
1294            // Step 2: Add request to queue.
1295            queue.push_back(open_request);
1296            queue.len() == 1
1297        };
1298
1299        // Step 3: Wait until all previous requests in queue have been processed.
1300        if should_continue {
1301            self.open_database(key.clone());
1302            self.maybe_remove_front_from_queue(&key);
1303        }
1304    }
1305
1306    fn get_database(
1307        &self,
1308        origin: ImmutableOrigin,
1309        db_name: String,
1310    ) -> Option<&IndexedDBEnvironment<SqliteEngine>> {
1311        let idb_description = IndexedDBDescription {
1312            origin,
1313            name: db_name,
1314        };
1315
1316        self.databases.get(&idb_description)
1317    }
1318
1319    fn get_database_mut(
1320        &mut self,
1321        origin: ImmutableOrigin,
1322        db_name: String,
1323    ) -> Option<&mut IndexedDBEnvironment<SqliteEngine>> {
1324        let idb_description = IndexedDBDescription {
1325            origin,
1326            name: db_name,
1327        };
1328
1329        self.databases.get_mut(&idb_description)
1330    }
1331
    /// <https://w3c.github.io/IndexedDB/#upgrade-a-database>
    /// To upgrade a database with connection (a connection),
    /// a new version, and a request, run these steps:
    ///
    /// Registers a new versionchange transaction, bumps the stored version,
    /// marks the head open request processed with a `pending_upgrade`, and
    /// notifies script via `ConnectionMsg::Upgrade`. The connection queue
    /// stays blocked until the upgrade transaction finishes.
    fn upgrade_database(&mut self, key: IndexedDBDescription, new_version: u64) {
        let Some(queue) = self.connection_queues.get_mut(&key) else {
            return debug_assert!(false, "A connection queue should exist.");
        };
        let Some(open_request) = queue.front_mut() else {
            return debug_assert!(false, "An open request should be in the queue.");
        };
        let OpenRequest::Open {
            sender,
            db_name,
            version: _,
            processed,
            id,
            pending_close: _,
            pending_versionchange: _,
            pending_upgrade,
        } = open_request
        else {
            return;
        };

        // Step 1: Let db be connection’s database.
        let db = self
            .databases
            .get_mut(&key)
            .expect("Db should have been opened.");

        // Step 2: Let transaction be a new upgrade transaction with connection used as connection.
        // Transaction ids come from the manager-wide monotonic counter.
        let transaction = self.serial_number_counter;
        self.serial_number_counter += 1;

        // Step 3: Set transaction’s scope to connection’s object store set.
        let scope = db
            .object_store_names()
            .expect("Fetching object store names should not fail.");

        // Step 4: Set db’s upgrade transaction to transaction.
        // Backend tracks the active upgrade transaction in `pending_upgrade` below.
        db.register_transaction(transaction, IndexedDBTxnMode::Versionchange, scope.clone());

        // Step 5: Set transaction’s state to inactive.
        // Step 6: Start transaction.
        // Backend transactions are started by the scheduler when requests are queued;
        // newly created upgrade transactions are therefore initially inactive.

        // Step 7: Let old version be db’s version.
        let old_version = db.version().expect("DB should have a version.");

        // Step 8: Set db’s version to version. This change is considered part of the
        // transaction, and so if the transaction is aborted, this change is reverted.
        db.set_version(new_version)
            .expect("Setting the version should not fail");

        // Step 9: Set request’s processed flag to true.
        *processed = true;
        // Record the upgrade on the head request so queue progression blocks
        // until the transaction commits or aborts.
        let _ = pending_upgrade.insert(VersionUpgrade {
            old: old_version,
            new: new_version,
            transaction,
        });

        // Step 10: Queue a database task to run these steps.
        if sender
            .send(ConnectionMsg::Upgrade {
                id: *id,
                name: db_name.clone(),
                version: new_version,
                old_version,
                transaction,
                object_store_names: scope.clone(),
            })
            .is_err()
        {
            error!("Couldn't queue task for indexeddb upgrade event.");
        }

        // Step 11: Wait for transaction to finish.
        // Queue progression remains blocked while `pending_upgrade` is set.
    }
1414
    /// <https://w3c.github.io/IndexedDB/#open-a-database-connection>
    ///
    /// Handles a connection's acknowledgement of a `versionchange` event for
    /// the head open request. Once every notified connection has responded,
    /// either fires `blocked` (if connections are still open) or proceeds to
    /// upgrade the database.
    fn handle_version_change_done(
        &mut self,
        name: String,
        from_id: Uuid,
        old_version: u64,
        origin: ImmutableOrigin,
    ) {
        let key = IndexedDBDescription {
            name: name.clone(),
            origin: origin.clone(),
        };
        let (can_upgrade, version) = {
            let Some(queue) = self.connection_queues.get_mut(&key) else {
                return debug_assert!(false, "A connection queue should exist.");
            };
            let Some(open_request) = queue.front_mut() else {
                return debug_assert!(false, "An open request should be in the queue.");
            };
            let OpenRequest::Open {
                sender,
                db_name: _,
                version,
                id,
                pending_upgrade: _,
                processed: _,
                pending_versionchange,
                pending_close,
            } = open_request
            else {
                return debug_assert!(
                    false,
                    "An request to open a connection should be in the queue."
                );
            };
            debug_assert!(
                pending_versionchange.contains(&from_id),
                "The open request should be pending on the versionchange event for the connection sending the message."
            );

            // Mark this connection's versionchange event as delivered.
            pending_versionchange.remove(&from_id);

            // Step 10.3: Wait for all of the events to be fired.
            if !pending_versionchange.is_empty() {
                return;
            }

            let Some(version) = *version else {
                return debug_assert!(
                    false,
                    "An upgrade version should have been determined by now."
                );
            };

            // Step 10.4: If any of the connections in openConnections are still not closed,
            // queue a database task to fire a version change event named blocked
            // at request with db’s version and version.
            if !pending_close.is_empty() &&
                sender
                    .send(ConnectionMsg::Blocked {
                        name,
                        id: *id,
                        version,
                        old_version,
                    })
                    .is_err()
            {
                return debug!("Script exit during indexeddb database open");
            }

            // Upgrade can proceed only once no connection close is pending.
            (pending_close.is_empty(), version)
        };

        // Step 10.5: Wait until all connections in openConnections are closed.
        // Note: if we still need to wait, the algorithm will continue in the handling of the close message.
        if can_upgrade {
            // Step 10.6: Run upgrade a database using connection, version and request.
            self.upgrade_database(key.clone(), version);

            // If the upgrade settled the head request immediately, keep the
            // queue moving.
            let was_pruned = self.maybe_remove_front_from_queue(&key);
            if was_pruned {
                self.advance_connection_queue(key);
            }
        }
    }
1500
1501    /// <https://w3c.github.io/IndexedDB/#open-a-database-connection>
1502    /// The part where the open request is ready for processing.
1503    fn open_database(&mut self, key: IndexedDBDescription) {
1504        let Some(queue) = self.connection_queues.get_mut(&key) else {
1505            return debug_assert!(false, "A connection queue should exist.");
1506        };
1507        let Some(open_request) = queue.front_mut() else {
1508            return debug_assert!(false, "An open request should be in the queue.");
1509        };
1510        let OpenRequest::Open {
1511            sender,
1512            db_name,
1513            version,
1514            id,
1515            processed,
1516            pending_upgrade: _pending_upgrade,
1517            pending_close,
1518            pending_versionchange,
1519        } = open_request
1520        else {
1521            return debug_assert!(
1522                false,
1523                "An request to open a connection should be in the queue."
1524            );
1525        };
1526
1527        let idb_base_dir = self.idb_base_dir.as_path();
1528        let requested_version = *version;
1529
1530        // Step 4: Let db be the database named name in origin, or null otherwise.
1531        let db_version = match self.databases.entry(key.clone()) {
1532            Entry::Vacant(e) => {
1533                // Step 5: If version is undefined, let version be 1 if db is null, or db’s version otherwise.
1534                // Note: done below with the zero as first tuple item.
1535
1536                // https://www.w3.org/TR/IndexedDB/#open-a-database-connection
1537                // Step 6: If db is null, let db be a new database
1538                // with name name, version 0 (zero), and with no object stores.
1539                // If this fails for any reason, return an appropriate error
1540                // (e.g. a "QuotaExceededError" or "UnknownError" DOMException).
1541                let engine = match SqliteEngine::new(idb_base_dir, &key, self.thread_pool.clone()) {
1542                    Ok(engine) => engine,
1543                    Err(err) => {
1544                        let error = backend_error_from_sqlite_error(err);
1545                        if let Err(e) = sender.send(ConnectionMsg::DatabaseError {
1546                            id: *id,
1547                            name: db_name.clone(),
1548                            error,
1549                        }) {
1550                            debug!("Script exit during indexeddb database open {:?}", e);
1551                        }
1552                        *processed = true;
1553                        return;
1554                    },
1555                };
1556                let created_db_path = engine.created_db_path();
1557                let db = IndexedDBEnvironment::new(engine, self.manager_sender.clone());
1558                let db_version = match db.version() {
1559                    Ok(version) => version,
1560                    Err(err) => {
1561                        let error = backend_error_from_sqlite_error(err);
1562                        if let Err(e) = sender.send(ConnectionMsg::DatabaseError {
1563                            id: *id,
1564                            name: db_name.clone(),
1565                            error,
1566                        }) {
1567                            debug!("Script exit during indexeddb database open {:?}", e);
1568                        }
1569                        *processed = true;
1570                        return;
1571                    },
1572                };
1573
1574                *version = if created_db_path {
1575                    Some(requested_version.unwrap_or(1))
1576                } else {
1577                    Some(requested_version.unwrap_or(db_version))
1578                };
1579
1580                e.insert(db);
1581                db_version
1582            },
1583            Entry::Occupied(db) => {
1584                let db_version = match db.get().version() {
1585                    Ok(version) => version,
1586                    Err(err) => {
1587                        let error = backend_error_from_sqlite_error(err);
1588                        if let Err(e) = sender.send(ConnectionMsg::DatabaseError {
1589                            id: *id,
1590                            name: db_name.clone(),
1591                            error,
1592                        }) {
1593                            debug!("Script exit during indexeddb database open {:?}", e);
1594                        }
1595                        *processed = true;
1596                        return;
1597                    },
1598                };
1599                // Step 5: If version is undefined, let version be 1 if db is null, or db’s version otherwise.
1600                *version = Some(requested_version.unwrap_or(db_version));
1601                db_version
1602            },
1603        };
1604
1605        let Some(version) = *version else {
1606            return debug_assert!(
1607                false,
1608                "An upgrade version should have been determined by now."
1609            );
1610        };
1611
1612        // Step 7: If db’s version is greater than version,
1613        // return a newly created "VersionError" DOMException
1614        // and abort these steps.
1615        if version < db_version {
1616            if sender
1617                .send(ConnectionMsg::VersionError {
1618                    name: db_name.clone(),
1619                    id: *id,
1620                })
1621                .is_err()
1622            {
1623                debug!("Script exit during indexeddb database open");
1624            }
1625            *processed = true;
1626            return;
1627        }
1628
1629        // Step 8: Let connection be a new connection to db.
1630        // Step 9: Set connection’s version to version.
1631        let connection = Connection {
1632            close_pending: false,
1633            sender: sender.clone(),
1634        };
1635        let entry = self.connections.entry(key.clone()).or_default();
1636        entry.insert(*id, connection);
1637
1638        // Step 10: If db’s version is less than version, then:
1639        if db_version < version {
1640            // Step 10.1: Let openConnections be the set of all connections,
1641            // except connection, associated with db.
1642            let open_connections = entry
1643                .iter_mut()
1644                .filter(|(other_id, conn)| !conn.close_pending && *other_id != id);
1645            for (id_to_close, conn) in open_connections {
1646                // Step 10.2: For each entry of openConnections
1647                // queue a database task to fire a version change event
1648                // named versionchange at entry with db’s version and version.
1649                if conn
1650                    .sender
1651                    .send(ConnectionMsg::VersionChange {
1652                        name: db_name.clone(),
1653                        id: *id_to_close,
1654                        version,
1655                        old_version: db_version,
1656                    })
1657                    .is_err()
1658                {
1659                    error!("Failed to send ConnectionMsg::Connection to script.");
1660                };
1661                pending_close.insert(*id_to_close);
1662                pending_versionchange.insert(*id_to_close);
1663            }
1664            if !pending_close.is_empty() {
1665                // Step 10.3: Wait for all of the events to be fired.
1666                return;
1667            }
1668
1669            // Step 10.6: Run upgrade a database using connection, version and request.
1670            self.upgrade_database(key, version);
1671            return;
1672        }
1673
1674        // Step 11:
1675        let object_store_names = self
1676            .databases
1677            .get(&key)
1678            .and_then(|db| db.object_store_names().ok())
1679            .unwrap_or_default();
1680        *processed = true;
1681        if sender
1682            .send(ConnectionMsg::Connection {
1683                name: db_name.clone(),
1684                id: *id,
1685                version: db_version,
1686                upgraded: false,
1687                object_store_names,
1688            })
1689            .is_err()
1690        {
1691            error!("Failed to send ConnectionMsg::Connection to script.");
1692        };
1693    }
1694
1695    /// <https://www.w3.org/TR/IndexedDB/#delete-a-database>
1696    /// The part adding the request to the connection queue.
1697    fn start_delete_database(
1698        &mut self,
1699        key: IndexedDBDescription,
1700        id: Uuid,
1701        sender: GenericCallback<BackendResult<u64>>,
1702    ) {
1703        let open_request = OpenRequest::Delete {
1704            sender,
1705            _origin: key.origin.clone(),
1706            _db_name: key.name.clone(),
1707            processed: false,
1708            id,
1709        };
1710
1711        let should_continue = {
1712            // Step 1: Let queue be the connection queue for storageKey and name.
1713            let queue = self.connection_queues.entry(key.clone()).or_default();
1714
1715            // Step 2: Add request to queue.
1716            queue.push_back(open_request);
1717            queue.len() == 1
1718        };
1719
1720        // Step 3: Wait until all previous requests in queue have been processed.
1721        if should_continue {
1722            self.delete_database(key.clone());
1723            self.maybe_remove_front_from_queue(&key);
1724        }
1725    }
1726
1727    /// <https://www.w3.org/TR/IndexedDB/#delete-a-database>
1728    fn delete_database(&mut self, key: IndexedDBDescription) {
1729        let Some(queue) = self.connection_queues.get_mut(&key) else {
1730            return debug_assert!(false, "A connection queue should exist.");
1731        };
1732        let Some(open_request) = queue.front_mut() else {
1733            return debug_assert!(false, "An open request should be in the queue.");
1734        };
1735        let OpenRequest::Delete {
1736            sender,
1737            _origin: _,
1738            _db_name: _,
1739            processed,
1740            id: _,
1741        } = open_request
1742        else {
1743            return debug_assert!(
1744                false,
1745                "An request to open a connection should be in the queue."
1746            );
1747        };
1748
1749        // Step4: Let db be the database named name in storageKey, if one exists. Otherwise, return 0 (zero).
1750        let version = if let Some(db) = self.databases.remove(&key) {
1751            // Step 5: Let openConnections be the set of all connections associated with db.
1752            // Step6: For each entry of openConnections that does not have its close pending flag set to true,
1753            // queue a database task to fire a version change event named versionchange
1754            // at entry with db’s version and null.
1755            // Step 7: Wait for all of the events to be fired.
1756            // Step 8: If any of the connections in openConnections are still not closed,
1757            // queue a database task to fire a version change event
1758            // named blocked at request with db’s version and null.
1759            // Step 9: Wait until all connections in openConnections are closed.
1760            // TODO: implement connections.
1761
1762            // Step 10: Let version be db’s version.
1763            let res = db.version();
1764            let Ok(version) = res else {
1765                *processed = true;
1766                if sender
1767                    .send(BackendResult::Err(BackendError::DbErr(
1768                        res.unwrap_err().to_string(),
1769                    )))
1770                    .is_err()
1771                {
1772                    debug!("Script went away during pending database delete.");
1773                }
1774                return;
1775            };
1776
1777            // Step 11: Delete db.
1778            // If this fails for any reason,
1779            // return an appropriate error (e.g. a QuotaExceededError, or an "UnknownError" DOMException).
1780            if let Err(err) = db.delete_database() {
1781                *processed = true;
1782                if sender
1783                    .send(BackendResult::Err(BackendError::DbErr(err.to_string())))
1784                    .is_err()
1785                {
1786                    debug!("Script went away during pending database delete.");
1787                }
1788                return;
1789            };
1790
1791            version
1792        } else {
1793            0
1794        };
1795
1796        // step 12: Return version.
1797        if sender.send(BackendResult::Ok(version)).is_err() {
1798            debug!("Script went away during pending database delete.");
1799        }
1800
1801        *processed = true;
1802    }
1803
1804    /// <https://w3c.github.io/IndexedDB/#closing-connection>
1805    fn close_database(&mut self, origin: ImmutableOrigin, id: Uuid, name: String) {
1806        // Step 1: Set connection’s close pending flag to true.
1807        // TODO: seems like a script only flag.
1808
1809        // Step 2: If the forced flag is true,
1810        // then for each transaction created using connection
1811        // run abort a transaction with transaction and newly created "AbortError" DOMException.
1812        // Step 3: Wait for all transactions created using connection to complete.
1813        // Once they are complete, connection is closed.
1814        // TODO: transaction lifecycle.
1815
1816        // Step 4: If the forced flag is true, then fire an event named close at connection.
1817        // TODO: implement, probably only on the script side of things.
1818
1819        // Note: below we are continuing
1820        // <https://w3c.github.io/IndexedDB/#open-a-database-connection>
1821        // in the case that an open request is waiting for connections to close.
1822        let key = IndexedDBDescription { origin, name };
1823        let (can_upgrade, version) = {
1824            self.remove_connection(&key, &id);
1825
1826            let Some(queue) = self.connection_queues.get_mut(&key) else {
1827                return;
1828            };
1829            let Some(open_request) = queue.front_mut() else {
1830                return;
1831            };
1832            if let OpenRequest::Open {
1833                sender: _,
1834                db_name: _,
1835                version,
1836                id: _,
1837                processed: _,
1838                pending_upgrade,
1839                pending_versionchange,
1840                pending_close,
1841            } = open_request
1842            {
1843                pending_close.remove(&id);
1844                (
1845                    // Note: need to exclude requests that have already started upgrading.
1846                    pending_close.is_empty() &&
1847                        pending_versionchange.is_empty() &&
1848                        !pending_upgrade.is_some(),
1849                    *version,
1850                )
1851            } else {
1852                (false, None)
1853            }
1854        };
1855
1856        // <https://w3c.github.io/IndexedDB/#open-a-database-connection>
1857        // Step 10.3: Wait for all of the events to be fired.
1858        // Step 10.5: Wait until all connections in openConnections are closed.
1859        // Note: both conditions must be checked here,
1860        // because that is the condition enabling the upgrade to proceed.
1861        if can_upgrade {
1862            // Step 10.6: Run upgrade a database using connection, version and request.
1863            let Some(version) = version else {
1864                return debug_assert!(
1865                    false,
1866                    "An upgrade version should have been determined by now."
1867                );
1868            };
1869            self.upgrade_database(key.clone(), version);
1870
1871            let was_pruned = self.maybe_remove_front_from_queue(&key);
1872            if was_pruned {
1873                self.advance_connection_queue(key);
1874            }
1875        }
1876    }
1877
    /// Dispatches a single synchronous IndexedDB backend operation.
    ///
    /// Each `SyncOperation` variant either replies to the script side through
    /// the sender/callback carried in the message, or mutates backend state
    /// (connections, databases, transactions) in place — several arms do both.
    /// Reply-send failures are logged rather than propagated, since a failed
    /// send means the script side has gone away.
    fn handle_sync_operation(&mut self, operation: SyncOperation) {
        match operation {
            SyncOperation::GetDatabases(sender, origin) => {
                // The in-parallel steps of https://www.w3.org/TR/IndexedDB/#dom-idbfactory-databases

                // Step 4.1 Let databases be the set of databases in storageKey.
                // If this cannot be determined for any reason,
                // then queue a database task to reject p with an appropriate error
                // (e.g. an "UnknownError" DOMException) and terminate these steps.
                // TODO: separate database and connection concepts.
                // For now using `self.databases`, which track connections.

                // Step 4.2: Let result be a new list.
                let info_list: Vec<DatabaseInfo> = self
                    .databases
                    .iter()
                    .filter_map(|(description, info)| {
                        // Step 4.3: For each db of databases:
                        // Entries whose version cannot be read are silently
                        // skipped rather than failing the whole listing.
                        if let Ok(version) = info.version() {
                            // Step 4.3.4: If db’s version is 0, then continue.
                            if version == 0 {
                                None
                            } else {
                                // Step 4.3.5: Let info be a new IDBDatabaseInfo dictionary.
                                // Step 4.3.6: Set info’s name dictionary member to db’s name.
                                // Step 4.3.7: Set info’s version dictionary member to db’s version.
                                // Step 4.3.8: Append info to result.
                                if description.origin == origin {
                                    Some(DatabaseInfo {
                                        name: description.name.clone(),
                                        version,
                                    })
                                } else {
                                    None
                                }
                            }
                        } else {
                            None
                        }
                    })
                    .collect();

                // IndexedDB `databases()` / "get a list of databases" returns the visible list;
                // filtering out non-user-visible entries must not turn success into an internal error.
                // https://w3c.github.io/IndexedDB/#dom-idbfactory-databases
                let result = Ok(info_list);

                // Step 4.4: Queue a database task to resolve p with result.
                if sender.send(result).is_err() {
                    debug!("Couldn't send SyncOperation::GetDatabases reply.");
                }
            },
            SyncOperation::CloseDatabase(origin, id, db_name) => {
                self.close_database(origin, id, db_name);
            },
            SyncOperation::OpenDatabase(sender, origin, db_name, version, id) => {
                self.open_a_database_connection(sender, origin, db_name, version, id);
            },
            SyncOperation::AbortPendingUpgrades {
                pending_upgrades,
                origin,
            } => {
                self.abort_pending_upgrades(pending_upgrades, origin);
            },
            SyncOperation::AbortPendingUpgrade { name, id, origin } => {
                self.abort_pending_upgrade(name, id, origin);
            },
            SyncOperation::DeleteDatabase(callback, origin, db_name, id) => {
                let idb_description = IndexedDBDescription {
                    origin,
                    name: db_name,
                };
                self.start_delete_database(idb_description, id, callback);
            },
            SyncOperation::GetObjectStore(sender, origin, db_name, store_name) => {
                // FIXME:(arihant2math) Should we error out more aggressively here?
                let result = self.get_database(origin, db_name).map(|db| {
                    let key_generator_current_number = db.key_generator_current_number(&store_name);
                    IndexedDBObjectStore {
                        key_path: db.key_path(&store_name),
                        has_key_generator: key_generator_current_number.is_some(),
                        key_generator_current_number,
                        indexes: db.indexes(&store_name).unwrap_or_default(),
                        name: store_name,
                    }
                });
                let _ = sender.send(result.ok_or(BackendError::DbNotFound));
            },
            SyncOperation::CreateIndex(
                origin,
                db_name,
                store_name,
                index_name,
                key_path,
                unique,
                multi_entry,
            ) => {
                // NOTE(review): the create_index result is discarded (`let _`),
                // so script receives no error signal here — confirm intended.
                if let Some(db) = self.get_database(origin, db_name) {
                    let _ = db.create_index(&store_name, index_name, key_path, unique, multi_entry);
                }
            },
            SyncOperation::DeleteIndex(origin, db_name, store_name, index_name) => {
                // NOTE(review): as with CreateIndex, failures are silently dropped.
                if let Some(db) = self.get_database(origin, db_name) {
                    let _ = db.delete_index(&store_name, index_name);
                }
            },
            SyncOperation::Commit(callback, origin, db_name, txn) => {
                // https://w3c.github.io/IndexedDB/#commit-a-transaction
                // TODO: implement the commit algorithm and only reply after the backend has
                // transitioned the transaction to committed/aborted (should be atomic).
                if let Some(db) = self.get_database_mut(origin.clone(), db_name.clone()) {
                    if db.can_commit_now(txn) {
                        if callback
                            .send(TxnCompleteMsg {
                                origin: origin.clone(),
                                db_name: db_name.clone(),
                                txn,
                                result: Ok(()),
                            })
                            .is_err()
                        {
                            error!(
                                "Failed to send immediate commit completion for db '{}' txn {}.",
                                db_name, txn
                            );
                        }
                    } else {
                        // Commit cannot run yet; defer the callback until the
                        // transaction becomes committable.
                        db.queue_pending_commit_callback(txn, callback);
                    }
                    db.schedule_transactions(origin.clone(), &db_name);
                } else if callback
                    .send(TxnCompleteMsg {
                        origin: origin.clone(),
                        db_name: db_name.clone(),
                        txn,
                        // If the database entry has already been removed, treat commit as a
                        // no-op success so script side completion does not spuriously abort.
                        result: Ok(()),
                    })
                    .is_err()
                {
                    error!(
                        "Failed to send commit completion for missing db '{}' txn {}.",
                        db_name, txn
                    );
                }
            },
            SyncOperation::Abort(abort_callback, origin, db_name, txn) => {
                // https://w3c.github.io/IndexedDB/#abort-a-transaction
                // “When a transaction is aborted the implementation must undo (roll back) any changes that were made to the database during that transaction.”
                // TODO: implement the abort algorithm and rollback for the engine.
                // Any commit callbacks already queued for this transaction must
                // also be completed with an Abort error below.
                let pending_commit_callbacks =
                    if let Some(db) = self.get_database_mut(origin.clone(), db_name.clone()) {
                        let callbacks = db.take_pending_commit_callbacks(txn);
                        db.abort_transaction(txn);
                        callbacks
                    } else {
                        Vec::new()
                    };
                if let Some(db) = self.get_database_mut(origin.clone(), db_name.clone()) {
                    db.schedule_transactions(origin.clone(), &db_name);
                }
                for callback in pending_commit_callbacks {
                    if callback
                        .send(storage_traits::indexeddb::TxnCompleteMsg {
                            origin: origin.clone(),
                            db_name: db_name.clone(),
                            txn,
                            result: Err(BackendError::Abort),
                        })
                        .is_err()
                    {
                        error!(
                            "Failed to send deferred abort completion for db '{}' txn {}.",
                            db_name, txn
                        );
                    }
                }
                if abort_callback
                    .send(storage_traits::indexeddb::TxnCompleteMsg {
                        origin: origin.clone(),
                        db_name: db_name.clone(),
                        txn,
                        result: Err(BackendError::Abort),
                    })
                    .is_err()
                {
                    error!(
                        "Failed to send abort completion for db '{}' txn {}.",
                        db_name, txn
                    );
                }
            },
            SyncOperation::UpgradeTransactionFinished {
                origin,
                db_name,
                txn,
                committed,
            } => {
                self.handle_upgrade_transaction_finished(db_name, origin, txn, committed);
            },
            SyncOperation::RequestHandled {
                origin,
                db_name,
                txn,
                request_id,
            } => {
                // https://w3c.github.io/IndexedDB/#transaction-lifecycl
                // The implementation must attempt to commit an inactive transaction
                // when all requests placed against the transaction have completed
                // and their returned results handled, no new requests have been
                // placed against the transaction, and the transaction has not been aborted

                let should_notify =
                    if let Some(db) = self.get_database_mut(origin.clone(), db_name.clone()) {
                        db.mark_request_handled(txn, request_id);
                        db.can_notify_txn_maybe_commit(txn)
                    } else {
                        false
                    };
                if should_notify {
                    // Re-enter the dispatcher so TxnMaybeCommit shares one code path.
                    self.handle_sync_operation(SyncOperation::TxnMaybeCommit {
                        origin,
                        db_name,
                        txn,
                    });
                }
            },
            SyncOperation::TxnMaybeCommit {
                origin,
                db_name,
                txn,
            } => {
                self.handle_txn_maybe_commit(origin, db_name, txn);
            },
            SyncOperation::TransactionFinished {
                origin,
                db_name,
                txn,
            } => {
                // Finishing a transaction may unblock queued transactions;
                // reschedule and then probe every new maybe-commit candidate.
                let maybe_commit_txns =
                    if let Some(db) = self.get_database_mut(origin.clone(), db_name.clone()) {
                        db.finish_transaction(txn);
                        db.schedule_transactions(origin.clone(), &db_name);
                        db.maybe_commit_candidates()
                    } else {
                        Vec::new()
                    };
                for candidate in maybe_commit_txns {
                    self.handle_txn_maybe_commit(origin.clone(), db_name.clone(), candidate);
                }
            },
            SyncOperation::CreateTransaction {
                sender,
                origin,
                db_name,
                mode,
                scope,
            } => {
                let key = IndexedDBDescription {
                    origin: origin.clone(),
                    name: db_name.clone(),
                };
                if let Some(db) = self.databases.get_mut(&key) {
                    // Allocate the next transaction id from the monotonically
                    // increasing serial counter.
                    let transaction_id = self.serial_number_counter;
                    self.serial_number_counter += 1;
                    db.register_transaction(transaction_id, mode, scope);
                    db.schedule_transactions(origin, &db_name);
                    let _ = sender.send(Ok(transaction_id));
                } else {
                    let _ = sender.send(Err(BackendError::DbNotFound));
                }
            },
            SyncOperation::UpgradeVersion(sender, origin, db_name, _txn, version) => {
                if let Some(db) = self.get_database_mut(origin, db_name) {
                    if version > db.version().unwrap_or(0) {
                        let _ = db.set_version(version);
                    }
                    // Reply with the database's current version rather than
                    // erroring when the upgrade did not apply; an error here
                    // would be racy and hard to reproduce.
                    let _ = sender.send(db.version().map_err(backend_error_from_sqlite_error));
                } else {
                    let _ = sender.send(Err(BackendError::DbNotFound));
                }
            },
            SyncOperation::CreateObjectStore(
                sender,
                origin,
                db_name,
                store_name,
                key_paths,
                auto_increment,
            ) => {
                if let Some(db) = self.get_database_mut(origin, db_name) {
                    let result = db.create_object_store(&store_name, key_paths, auto_increment);
                    let _ = sender.send(result.map_err(BackendError::from));
                } else {
                    let _ = sender.send(Err(BackendError::DbNotFound));
                }
            },
            SyncOperation::DeleteObjectStore(sender, origin, db_name, store_name) => {
                if let Some(db) = self.get_database_mut(origin, db_name) {
                    let result = db.delete_object_store(&store_name);
                    let _ = sender.send(result.map_err(BackendError::from));
                } else {
                    let _ = sender.send(Err(BackendError::DbNotFound));
                }
            },
            SyncOperation::Version(sender, origin, db_name) => {
                if let Some(db) = self.get_database(origin, db_name) {
                    let _ = sender.send(db.version().map_err(backend_error_from_sqlite_error));
                } else {
                    let _ = sender.send(Err(BackendError::DbNotFound));
                }
            },
            SyncOperation::NotifyEndOfVersionChange {
                name,
                id,
                old_version,
                origin,
            } => {
                self.handle_version_change_done(name, id, old_version, origin);
            },
            SyncOperation::Exit(_) => {
                // Exit is handled by the event loop itself before dispatch.
                unreachable!("We must've already broken out of event loop.");
            },
        }
    }
2205
2206    fn collect_memory_reports(&self) -> Vec<Report> {
2207        let mut reports = vec![];
2208        perform_memory_report(|ops| {
2209            reports.push(Report {
2210                path: path!["indexeddb"],
2211                kind: ReportKind::ExplicitJemallocHeapSize,
2212                size: self.connections.size_of(ops) +
2213                    self.databases.size_of(ops) +
2214                    self.connection_queues.size_of(ops),
2215            });
2216        });
2217        reports
2218    }
2219}