// storage/indexeddb/mod.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5mod engines;
6
7use std::borrow::ToOwned;
8use std::collections::hash_map::Entry;
9use std::collections::{HashMap, HashSet, VecDeque};
10use std::sync::Arc;
11use std::thread;
12
13use log::{debug, error, warn};
14use malloc_size_of::MallocSizeOf;
15use malloc_size_of_derive::MallocSizeOf;
16use profile_traits::generic_callback::GenericCallback;
17use profile_traits::mem::{
18    ProcessReports, ProfilerChan as MemProfilerChan, Report, ReportKind, perform_memory_report,
19};
20use profile_traits::path;
21use rusqlite::Error as RusqliteError;
22use rustc_hash::{FxHashMap, FxHashSet};
23use servo_base::generic_channel::{self, GenericReceiver, GenericSender, ReceiveError};
24use servo_base::threadpool::ThreadPool;
25use servo_url::origin::ImmutableOrigin;
26use storage_traits::client_storage::StorageProxyMap;
27use storage_traits::indexeddb::{
28    AsyncOperation, BackendError, BackendResult, ConnectionMsg, CreateObjectResult, DatabaseInfo,
29    DbResult, IndexedDBIndex, IndexedDBObjectStore, IndexedDBThreadMsg, IndexedDBTxnMode, KeyPath,
30    SyncOperation, TxnCompleteMsg,
31};
32use uuid::Uuid;
33
34use crate::indexeddb::engines::{KvsEngine, KvsOperation, KvsTransaction, SqliteEngine};
35use crate::shared::is_sqlite_disk_full_error;
36
/// Factory for the channel used to talk to the IndexedDB manager thread.
/// The impl below spawns the backing `IndexedDBManager` thread as a side
/// effect of constructing the sender.
pub trait IndexedDBThreadFactory {
    fn new(mem_profiler_chan: MemProfilerChan, reporter_name: String) -> Self;
}
40
41impl IndexedDBThreadFactory for GenericSender<IndexedDBThreadMsg> {
42    fn new(
43        mem_profiler_chan: MemProfilerChan,
44        reporter_name: String,
45    ) -> GenericSender<IndexedDBThreadMsg> {
46        let (chan, port) = generic_channel::channel().unwrap();
47        let chan2 = chan.clone();
48
49        let manager_sender = chan.clone();
50
51        thread::Builder::new()
52            .name("IndexedDBManager".to_owned())
53            .spawn(move || {
54                mem_profiler_chan.run_with_memory_reporting(
55                    || IndexedDBManager::new(port, manager_sender).start(),
56                    reporter_name,
57                    chan2,
58                    IndexedDBThreadMsg::CollectMemoryReport,
59                );
60            })
61            .expect("Thread spawning failed");
62
63        chan
64    }
65}
66
/// A key used to track databases.
#[derive(Clone, Debug, Eq, Hash, MallocSizeOf, PartialEq)]
pub struct IndexedDBDescription {
    /// The origin the database belongs to.
    pub origin: ImmutableOrigin,
    /// The database name.
    pub name: String,
}
73
/// Scheduling metadata kept for each registered transaction.
#[derive(MallocSizeOf)]
struct TxnInfo {
    // Monotonic creation order; used to decide which of two overlapping
    // transactions came first (see `earlier_overlapping_live_exists`).
    created_seq: u64,
    // The transaction mode (readonly / readwrite / versionchange).
    mode: IndexedDBTxnMode,
    // Names of the object stores in this transaction's scope.
    scope: HashSet<String>,
    // While true, this transaction still participates in scheduling and can
    // block later overlapping transactions from starting.
    live: bool,
}
81
/// Per-database state: the storage engine plus transaction-scheduling
/// bookkeeping for that database.
#[derive(MallocSizeOf)]
struct IndexedDBEnvironment<E: KvsEngine> {
    // The backing key-value storage engine.
    engine: E,
    // Handle back to the manager thread; cloned into engine callbacks so
    // batch completion can be reported (see `start_transaction`).
    manager_sender: GenericSender<IndexedDBThreadMsg>,
    // Queued (not yet executed) requests, keyed by transaction serial number.
    transactions: FxHashMap<u64, KvsTransaction>,
    // Source of `TxnInfo::created_seq` values.
    next_created_seq: u64,
    // Scheduling metadata, keyed by transaction serial number.
    txn_info: FxHashMap<u64, TxnInfo>,
    // FIFO of readwrite/versionchange transactions waiting to start.
    queued_readwrite: VecDeque<u64>,
    // FIFO of readonly transactions waiting to start.
    queued_readonly: VecDeque<u64>,
    // Fast membership checks + de-dup for queued_*.
    queued_readwrite_set: FxHashSet<u64>,
    queued_readonly_set: FxHashSet<u64>,
    // The single readwrite transaction currently running, if any.
    running_readwrite: Option<u64>,
    // Readonly transactions currently running (several may run at once).
    running_readonly: HashSet<u64>,
    // Per-transaction watermark: the lowest request id not yet marked
    // handled (see `mark_request_handled`).
    handled_next_unhandled_request_id: FxHashMap<u64, u64>,
    // Request ids marked handled out of order, parked until the watermark
    // catches up to them.
    handled_pending: FxHashMap<u64, HashSet<u64>>,
    // Callbacks to invoke once a transaction is allowed to commit.
    pending_commit_callbacks: FxHashMap<u64, Vec<GenericCallback<TxnCompleteMsg>>>,
}
100
101impl<E: KvsEngine> IndexedDBEnvironment<E> {
    /// Creates a fresh environment wrapping `engine`, with empty scheduling
    /// state and no registered transactions.
    fn new(
        engine: E,
        manager_sender: GenericSender<IndexedDBThreadMsg>,
    ) -> IndexedDBEnvironment<E> {
        IndexedDBEnvironment {
            engine,
            manager_sender,
            transactions: FxHashMap::default(),
            next_created_seq: 0,
            txn_info: FxHashMap::default(),
            queued_readwrite: VecDeque::new(),
            queued_readonly: VecDeque::new(),
            queued_readwrite_set: FxHashSet::default(),
            queued_readonly_set: FxHashSet::default(),
            running_readwrite: None,
            running_readonly: HashSet::new(),
            handled_next_unhandled_request_id: FxHashMap::default(),
            handled_pending: FxHashMap::default(),
            pending_commit_callbacks: FxHashMap::default(),
        }
    }
123
124    fn register_transaction(&mut self, txn: u64, mode: IndexedDBTxnMode, scope: Vec<String>) {
125        if self.txn_info.contains_key(&txn) {
126            return;
127        }
128        let created_seq = self.next_created_seq;
129        self.next_created_seq += 1;
130        self.txn_info.insert(
131            txn,
132            TxnInfo {
133                created_seq,
134                mode: mode.clone(),
135                scope: scope.into_iter().collect(),
136                live: true,
137            },
138        );
139        self.transactions
140            .entry(txn)
141            .or_insert_with(|| KvsTransaction {
142                requests: VecDeque::new(),
143                mode,
144            });
145    }
146
147    fn scopes_overlap(a: &TxnInfo, b: &TxnInfo) -> bool {
148        a.scope.iter().any(|store| b.scope.contains(store))
149    }
150
151    fn earlier_overlapping_live_exists<F>(&self, txn: u64, predicate: F) -> bool
152    where
153        F: Fn(&TxnInfo) -> bool,
154    {
155        let Some(current) = self.txn_info.get(&txn) else {
156            return false;
157        };
158        self.txn_info.iter().any(|(other_txn, other)| {
159            *other_txn != txn &&
160                other.live &&
161                other.created_seq < current.created_seq &&
162                Self::scopes_overlap(current, other) &&
163                predicate(other)
164        })
165    }
166
167    fn can_start_by_spec(&self, txn: u64) -> bool {
168        let Some(info) = self.txn_info.get(&txn) else {
169            return false;
170        };
171        match info.mode {
172            IndexedDBTxnMode::Readonly => !self.earlier_overlapping_live_exists(txn, |other| {
173                other.mode != IndexedDBTxnMode::Readonly
174            }),
175            IndexedDBTxnMode::Readwrite | IndexedDBTxnMode::Versionchange => {
176                !self.earlier_overlapping_live_exists(txn, |_other| true)
177            },
178        }
179    }
180
181    fn enqueue_txn(&mut self, txn: u64, mode: &IndexedDBTxnMode) {
182        match mode {
183            IndexedDBTxnMode::Readonly => {
184                if self.queued_readonly_set.insert(txn) {
185                    self.queued_readonly.push_back(txn);
186                }
187            },
188            _ => {
189                if self.queued_readwrite_set.insert(txn) {
190                    self.queued_readwrite.push_back(txn);
191                }
192            },
193        }
194    }
195
196    fn queue_operation(
197        &mut self,
198        store_name: &str,
199        serial_number: u64,
200        mode: IndexedDBTxnMode,
201        operation: AsyncOperation,
202    ) {
203        let mut enqueue_mode = None;
204        match self.transactions.entry(serial_number) {
205            Entry::Occupied(mut entry) => {
206                let transaction = entry.get_mut();
207                let transaction_mode = transaction.mode.clone();
208                let was_empty = transaction.requests.is_empty();
209                transaction.requests.push_back(KvsOperation {
210                    operation,
211                    store_name: String::from(store_name),
212                });
213                if was_empty {
214                    // If the queue was empty and we just added work, we must enqueue the txn
215                    // even if it is currently running a prior batch. The next batch will run
216                    // after EngineTxnBatchComplete triggers scheduling again.
217                    enqueue_mode = Some(transaction_mode);
218                }
219            },
220            Entry::Vacant(entry) => {
221                entry
222                    .insert(KvsTransaction {
223                        requests: VecDeque::new(),
224                        mode: mode.clone(),
225                    })
226                    .requests
227                    .push_back(KvsOperation {
228                        operation,
229                        store_name: String::from(store_name),
230                    });
231                enqueue_mode = Some(mode);
232            },
233        };
234        if let Some(mode) = enqueue_mode {
235            self.enqueue_txn(serial_number, &mode);
236        }
237    }
238
    /// Attempts to start queued transactions, per
    /// <https://w3c.github.io/IndexedDB/#transaction-scheduling>.
    ///
    /// One pass over the readonly queue starts every eligible readonly
    /// transaction (several may run concurrently); then at most one
    /// readwrite/versionchange transaction is started if none is running.
    fn schedule_transactions(&mut self, origin: ImmutableOrigin, db_name: &str) {
        // Snapshot the length so entries we requeue below are not
        // reconsidered within this same pass.
        let readonly_len = self.queued_readonly.len();
        for _ in 0..readonly_len {
            let Some(txn) = self.queued_readonly.pop_front() else {
                break;
            };

            self.queued_readonly_set.remove(&txn);

            // The record may have been removed (aborted/finished) while queued.
            let Some(transaction) = self.transactions.get(&txn) else {
                continue;
            };
            if self.running_readonly.contains(&txn) {
                // Already running a batch; keep it queued only if more work
                // arrived in the meantime.
                if !transaction.requests.is_empty() && self.queued_readonly_set.insert(txn) {
                    self.queued_readonly.push_back(txn);
                }
                continue;
            }
            if transaction.requests.is_empty() {
                continue;
            }
            if !self.can_start_by_spec(txn) {
                // Blocked by an earlier overlapping transaction; retry later.
                if self.queued_readonly_set.insert(txn) {
                    self.queued_readonly.push_back(txn);
                }
                continue;
            }

            // Mark running and start async.
            // DO NOT clear here; clear on EngineTxnBatchComplete.
            self.running_readonly.insert(txn);
            self.start_transaction(origin.clone(), db_name.to_string(), txn, None);
        }

        // Start at most one readwrite txn.
        if self.running_readwrite.is_some() {
            return;
        }
        let readwrite_len = self.queued_readwrite.len();
        for _ in 0..readwrite_len {
            let Some(txn) = self.queued_readwrite.pop_front() else {
                break;
            };

            self.queued_readwrite_set.remove(&txn);

            let Some(transaction) = self.transactions.get(&txn) else {
                continue;
            };

            if self.running_readwrite == Some(txn) {
                if !transaction.requests.is_empty() && self.queued_readwrite_set.insert(txn) {
                    self.queued_readwrite.push_back(txn);
                }
                continue;
            }

            if transaction.requests.is_empty() {
                continue;
            }
            if !self.can_start_by_spec(txn) {
                if self.queued_readwrite_set.insert(txn) {
                    self.queued_readwrite.push_back(txn);
                }
                continue;
            }

            // Mark running and start async.
            // DO NOT clear here; clear on EngineTxnBatchComplete.
            self.running_readwrite = Some(txn);
            self.start_transaction(origin, db_name.to_string(), txn, None);
            return;
        }
    }
314
    /// Executes all currently-queued requests for `txn` on the engine
    /// (without committing). If `sender` is provided (manual start), it is
    /// sent `Ok(())` once the batch finishes — or immediately when there is
    /// nothing to do.
    fn start_transaction(
        &mut self,
        origin: ImmutableOrigin,
        db_name: String,
        txn: u64,
        sender: Option<GenericSender<BackendResult<()>>>,
    ) {
        // https://w3c.github.io/IndexedDB/#transaction-lifetime
        // Take the current queued batch of requests for this txn.
        // If more requests arrive while the engine is running, they’ll be queued into
        // transaction.requests and scheduled in a later batch once we receive EngineTxnBatchComplete.
        let (mode, requests) = match self.transactions.get_mut(&txn) {
            Some(transaction) => {
                let mode = transaction.mode.clone();
                // `take` leaves an empty queue behind for later arrivals.
                let requests = std::mem::take(&mut transaction.requests);
                (mode, requests)
            },
            None => {
                // If a manual starter is waiting, treat as "nothing to do".
                if let Some(sender) = sender {
                    let _ = sender.send(Ok(()));
                }
                return;
            },
        };

        if requests.is_empty() {
            if let Some(sender) = sender {
                let _ = sender.send(Ok(()));
            }
            // Important: if there was no work, do NOT send EngineTxnBatchComplete,
            // otherwise we can create a pointless reschedule loop.
            return;
        }

        let manager_sender = self.manager_sender.clone();
        self.engine.process_transaction(
            KvsTransaction { mode, requests },
            Box::new(move || {
                // Notify the manager thread when the engine finishes so it can:
                // - clear running_readonly / running_readwrite
                // - re-run scheduling (maybe start next queued txn or next batch)
                if let Err(err) = manager_sender.send(IndexedDBThreadMsg::EngineTxnBatchComplete {
                    origin,
                    db_name,
                    txn,
                }) {
                    error!(
                        "Failed to send IndexedDBThreadMsg::EngineTxnBatchComplete: {:?}",
                        err
                    );
                    // TODO: Prevent backend/manager shutdown while a transaction batch is
                    // running so completion notification cannot be dropped in normal flow.
                }

                // We have a sender if the transaction is started manually, and they
                // probably want to know when it is finished.
                if let Some(sender) = sender {
                    let _ = sender.send(Ok(()));
                }
            }),
        );
    }
379
380    fn mark_request_handled(&mut self, txn: u64, request_id: u64) {
381        let current = self
382            .handled_next_unhandled_request_id
383            .get(&txn)
384            .copied()
385            .unwrap_or(0);
386        if request_id == current {
387            let mut next = current + 1;
388            if let Some(pending) = self.handled_pending.get_mut(&txn) {
389                while pending.remove(&next) {
390                    next += 1;
391                }
392                if pending.is_empty() {
393                    self.handled_pending.remove(&txn);
394                }
395            }
396            self.handled_next_unhandled_request_id.insert(txn, next);
397        } else if request_id > current {
398            self.handled_pending
399                .entry(txn)
400                .or_default()
401                .insert(request_id);
402        }
403    }
404
405    fn can_notify_txn_maybe_commit(&self, txn: u64) -> bool {
406        if self.running_readwrite == Some(txn) || self.running_readonly.contains(&txn) {
407            return false;
408        }
409        // Avoid if the txn is still queued or has queued operations.
410        if self.queued_readonly_set.contains(&txn) || self.queued_readwrite_set.contains(&txn) {
411            return false;
412        }
413        match self.transactions.get(&txn) {
414            Some(t) => t.requests.is_empty(),
415            None => true,
416        }
417    }
418
    /// A transaction may commit once the spec's ordering constraints allow
    /// it to run and it has nothing queued or running.
    fn can_commit_now(&self, txn: u64) -> bool {
        self.can_start_by_spec(txn) && self.can_notify_txn_maybe_commit(txn)
    }
422
423    fn queue_pending_commit_callback(
424        &mut self,
425        txn: u64,
426        callback: GenericCallback<TxnCompleteMsg>,
427    ) {
428        self.pending_commit_callbacks
429            .entry(txn)
430            .or_default()
431            .push(callback);
432    }
433
434    fn take_pending_commit_callbacks(&mut self, txn: u64) -> Vec<GenericCallback<TxnCompleteMsg>> {
435        self.pending_commit_callbacks
436            .remove(&txn)
437            .unwrap_or_default()
438    }
439
440    fn maybe_commit_candidates(&self) -> Vec<u64> {
441        let mut candidates: Vec<(u64, u64)> = self
442            .txn_info
443            .iter()
444            .filter_map(|(txn, info)| {
445                if !info.live || !self.can_commit_now(*txn) {
446                    return None;
447                }
448                Some((*txn, info.created_seq))
449            })
450            .collect();
451        candidates.sort_by_key(|(_, created_seq)| *created_seq);
452        candidates.into_iter().map(|(txn, _)| txn).collect()
453    }
454
455    fn finish_transaction(&mut self, txn: u64) {
456        if let Some(info) = self.txn_info.get_mut(&txn) {
457            info.live = false;
458        }
459        self.txn_info.remove(&txn);
460        self.transactions.remove(&txn);
461        self.queued_readonly.retain(|queued| *queued != txn);
462        self.queued_readwrite.retain(|queued| *queued != txn);
463        self.queued_readonly_set.remove(&txn);
464        self.queued_readwrite_set.remove(&txn);
465        if self.running_readwrite == Some(txn) {
466            self.running_readwrite = None;
467        }
468        self.running_readonly.remove(&txn);
469        self.handled_next_unhandled_request_id.remove(&txn);
470        self.handled_pending.remove(&txn);
471        self.pending_commit_callbacks.remove(&txn);
472    }
473
    /// Clears all queued/running state for `txn` but deliberately keeps its
    /// `txn_info` entry, so the scheduler still accounts for the transaction
    /// until script reports that it has finished.
    fn abort_transaction(&mut self, txn: u64) {
        // Keep scheduling metadata until script reports TransactionFinished.
        // https://w3c.github.io/IndexedDB/#transaction-lifetime
        self.transactions.remove(&txn);
        self.queued_readonly.retain(|queued| *queued != txn);
        self.queued_readwrite.retain(|queued| *queued != txn);
        self.queued_readonly_set.remove(&txn);
        self.queued_readwrite_set.remove(&txn);
        if self.running_readwrite == Some(txn) {
            self.running_readwrite = None;
        }
        self.running_readonly.remove(&txn);
        self.handled_next_unhandled_request_id.remove(&txn);
        self.handled_pending.remove(&txn);
        self.pending_commit_callbacks.remove(&txn);
    }
490
    /// Current value of `store_name`'s key generator, if the engine has one.
    fn key_generator_current_number(&self, store_name: &str) -> Option<i32> {
        self.engine.key_generator_current_number(store_name)
    }
494
    /// The key path configured for `store_name`, if any.
    fn key_path(&self, store_name: &str) -> Option<KeyPath> {
        self.engine.key_path(store_name)
    }
498
    /// Names of all object stores in this database; engine errors are
    /// stringified via their `Debug` representation.
    fn object_store_names(&self) -> DbResult<Vec<String>> {
        self.engine
            .object_store_names()
            .map_err(|err| format!("{err:?}"))
    }
504
    /// All indexes defined on `store_name`; engine errors are stringified.
    fn indexes(&self, store_name: &str) -> DbResult<Vec<IndexedDBIndex>> {
        self.engine
            .indexes(store_name)
            .map_err(|err| format!("{err:?}"))
    }
510
    /// Creates an index named `index_name` on `store_name`; engine errors
    /// are stringified.
    fn create_index(
        &self,
        store_name: &str,
        index_name: String,
        key_path: KeyPath,
        unique: bool,
        multi_entry: bool,
    ) -> DbResult<CreateObjectResult> {
        self.engine
            .create_index(store_name, index_name, key_path, unique, multi_entry)
            .map_err(|err| format!("{err:?}"))
    }
523
    /// Removes the index named `index_name` from `store_name`.
    fn delete_index(&self, store_name: &str, index_name: String) -> DbResult<()> {
        self.engine
            .delete_index(store_name, index_name)
            .map_err(|err| format!("{err:?}"))
    }
529
    /// Creates an object store; engine errors are stringified.
    fn create_object_store(
        &mut self,
        store_name: &str,
        key_path: Option<KeyPath>,
        auto_increment: bool,
    ) -> DbResult<CreateObjectResult> {
        self.engine
            .create_store(store_name, key_path, auto_increment)
            .map_err(|err| format!("{err:?}"))
    }
540
541    fn delete_object_store(&mut self, store_name: &str) -> DbResult<()> {
542        // https://www.w3.org/TR/IndexedDB-3/#dom-idbdatabase-deleteobjectstore
543        // Step 7. Destroy store.
544        let indexes = self
545            .engine
546            .indexes(store_name)
547            .map_err(|err| format!("{err:?}"))?;
548        for index in indexes {
549            self.engine
550                .delete_index(store_name, index.name)
551                .map_err(|err| format!("{err:?}"))?;
552        }
553        self.engine
554            .delete_store(store_name)
555            .map_err(|err| format!("{err:?}"))
556    }
557
    /// The database version as reported by the engine.
    fn version(&self) -> Result<u64, E::Error> {
        self.engine.version()
    }
561
    /// Persists a new database version; engine errors are stringified.
    fn set_version(&mut self, version: u64) -> DbResult<()> {
        self.engine
            .set_version(version)
            .map_err(|err| format!("{err:?}"))
    }
567}
568
569fn backend_error_from_sqlite_error(err: RusqliteError) -> BackendError {
570    if is_sqlite_disk_full_error(&err) {
571        BackendError::QuotaExceeded
572    } else {
573        BackendError::DbErr(format!("{err:?}"))
574    }
575}
576
/// <https://w3c.github.io/IndexedDB/#request-open-request>
/// Used here to implement the
/// <https://w3c.github.io/IndexedDB/#connection-queue>
#[derive(MallocSizeOf)]
enum OpenRequest {
    Open {
        /// The callback used to send a result to script.
        sender: GenericCallback<ConnectionMsg>,

        /// The name of the database.
        db_name: String,

        /// Optionally, a requested db version.
        /// Note: when the open algorithm starts, this will be mutated and set to something as per the algo.
        version: Option<u64>,

        /// <https://w3c.github.io/IndexedDB/#request-processed-flag>
        processed: bool,

        /// Optionally, a version pending upgrade.
        pending_upgrade: Option<VersionUpgrade>,

        /// This request is pending on these connections to close.
        pending_close: HashSet<Uuid>,

        /// This request is pending on these connections to fire a versionchange event.
        /// Note: This starts as equal to `pending_close`, but when all events have fired,
        /// not all connections need to have closed, in which case the `blocked` event
        /// is fired on this request.
        pending_versionchange: HashSet<Uuid>,

        /// Unique id of this request (see `get_id`).
        id: Uuid,

        /// <https://storage.spec.whatwg.org/#storage-proxy-map>
        proxy_map: StorageProxyMap,
    },
    Delete {
        /// The callback used to send a result to script.
        sender: GenericCallback<BackendResult<u64>>,

        // Retained but currently unused; underscore-prefixed to silence the
        // dead-field warning.
        _origin: ImmutableOrigin,

        /// The name of the database.
        /// Note: will be used when the full spec is implemented.
        db_name: String,

        /// <https://w3c.github.io/IndexedDB/#request-processed-flag>
        processed: bool,

        /// Unique id of this request (see `get_id`).
        id: Uuid,

        /// <https://storage.spec.whatwg.org/#storage-proxy-map>
        proxy_map: StorageProxyMap,
    },
}
632
633impl OpenRequest {
634    fn get_id(&self) -> Uuid {
635        let id = match self {
636            OpenRequest::Open {
637                sender: _,
638                db_name: _,
639                version: _,
640                processed: _,
641                pending_upgrade: _,
642                pending_close: _,
643                pending_versionchange: _,
644                id,
645                proxy_map: _,
646            } => id,
647            OpenRequest::Delete {
648                sender: _,
649                _origin: _,
650                db_name: _,
651                processed: _,
652                proxy_map: _,
653                id,
654            } => id,
655        };
656        *id
657    }
658
659    fn is_open(&self) -> bool {
660        match self {
661            OpenRequest::Open {
662                sender: _,
663                db_name: _,
664                version: _,
665                processed: _,
666                pending_upgrade: _,
667                pending_close: _,
668                pending_versionchange: _,
669                id: _,
670                proxy_map: _,
671            } => true,
672            OpenRequest::Delete {
673                sender: _,
674                _origin: _,
675                db_name: _,
676                processed: _,
677                proxy_map: _,
678                id: _,
679            } => false,
680        }
681    }
682
683    /// An open request remains pending until it has been processed,
684    /// and while waiting on upgrade completion or other connections.
685    fn is_pending(&self) -> bool {
686        match self {
687            OpenRequest::Open {
688                sender: _,
689                db_name: _,
690                version: _,
691                processed,
692                pending_upgrade,
693                pending_close,
694                pending_versionchange,
695                id: _,
696                proxy_map: _,
697            } => {
698                !processed ||
699                    pending_upgrade.is_some() ||
700                    !pending_close.is_empty() ||
701                    !pending_versionchange.is_empty()
702            },
703            OpenRequest::Delete {
704                sender: _,
705                _origin: _,
706                db_name: _,
707                processed,
708                id: _,
709                proxy_map: _,
710            } => !processed,
711        }
712    }
713
714    /// Abort the open request,
715    /// optionally returning a version to revert to.
716    fn abort(&self) -> Option<u64> {
717        match self {
718            OpenRequest::Open {
719                sender,
720                db_name,
721                version: _,
722                processed: _,
723                pending_close: _,
724                pending_versionchange: _,
725                pending_upgrade,
726                id,
727                proxy_map: _,
728            } => {
729                if sender
730                    .send(ConnectionMsg::AbortError {
731                        name: db_name.clone(),
732                        id: *id,
733                    })
734                    .is_err()
735                {
736                    error!("Failed to send ConnectionMsg::Connection to script.");
737                };
738                pending_upgrade.as_ref().map(|upgrade| upgrade.old)
739            },
740            OpenRequest::Delete {
741                sender,
742                _origin: _,
743                db_name: _,
744                processed: _,
745                id: _,
746                proxy_map: _,
747            } => {
748                if sender.send(Err(BackendError::DbNotFound)).is_err() {
749                    error!("Failed to send result of database delete to script.");
750                };
751                None
752            },
753        }
754    }
755}
756
/// An in-flight database version change.
#[derive(MallocSizeOf)]
struct VersionUpgrade {
    // The version to revert to if the open request is aborted
    // (see `OpenRequest::abort`).
    old: u64,
    // NOTE(review): presumably the requested target version; not read in
    // this part of the file — confirm against the open algorithm.
    new: u64,
    // NOTE(review): presumably the upgrade transaction's serial number; not
    // read in this part of the file.
    transaction: u64,
}
763
/// <https://w3c.github.io/IndexedDB/#connection>
#[derive(MallocSizeOf)]
struct Connection {
    /// <https://w3c.github.io/IndexedDB/#connection-close-pending-flag>
    /// Connections with this flag set are skipped when broadcasting
    /// (see `dispatch_txn_maybe_commit`).
    close_pending: bool,

    /// The callback used to send a result to script.
    sender: GenericCallback<ConnectionMsg>,
}
773
/// The manager running on the IndexedDB thread: owns every open database
/// environment and serves the messages received in `start`.
struct IndexedDBManager {
    // Receiving end of the manager's message channel.
    port: GenericReceiver<IndexedDBThreadMsg>,
    // Sender side of this manager's own channel; cloned into engine
    // completion callbacks via `IndexedDBEnvironment`.
    manager_sender: GenericSender<IndexedDBThreadMsg>,
    // One environment (engine + scheduling state) per open database.
    databases: HashMap<IndexedDBDescription, IndexedDBEnvironment<SqliteEngine>>,
    // Shared worker thread pool.
    thread_pool: Arc<ThreadPool>,

    /// A global counter to produce unique transaction ids.
    /// TODO: remove once db connections lifecycle is managed.
    /// A global counter is only necessary because of how deleting a db currently
    /// does not wait for connection to close and transactions to finish.
    serial_number_counter: u64,

    /// <https://w3c.github.io/IndexedDB/#connection-queue>
    connection_queues: HashMap<IndexedDBDescription, VecDeque<OpenRequest>>,

    /// <https://w3c.github.io/IndexedDB/#connection>
    connections: HashMap<IndexedDBDescription, HashMap<Uuid, Connection>>,
}
792
impl IndexedDBManager {
    /// Creates a manager with no open databases; per-database state is
    /// populated lazily as messages arrive.
    fn new(
        port: GenericReceiver<IndexedDBThreadMsg>,
        manager_sender: GenericSender<IndexedDBThreadMsg>,
    ) -> IndexedDBManager {
        debug!("New indexedDBManager");

        IndexedDBManager {
            port,
            manager_sender,
            databases: HashMap::new(),
            thread_pool: ThreadPool::global(),
            serial_number_counter: 0,
            connection_queues: Default::default(),
            connections: Default::default(),
        }
    }
}
811
812impl IndexedDBManager {
    /// The manager's main loop: blocks on `port` and dispatches each message
    /// until the channel disconnects or an `Exit` request arrives.
    fn start(&mut self) {
        loop {
            // FIXME:(arihant2math) No message *most likely* means that
            // the ipc sender has been dropped, so we break the loop
            let message = match self.port.recv() {
                Ok(msg) => msg,
                Err(ReceiveError::Disconnected) => {
                    break;
                },
                Err(e) => {
                    warn!("Error in IndexedDB thread: {e:?}");
                    continue;
                },
            };
            match message {
                IndexedDBThreadMsg::Sync(SyncOperation::Exit(sender)) => {
                    // Acknowledge shutdown before leaving the loop.
                    let _ = sender.send(());
                    break;
                },
                IndexedDBThreadMsg::Sync(operation) => {
                    self.handle_sync_operation(operation);
                },
                IndexedDBThreadMsg::Async(
                    origin,
                    db_name,
                    store_name,
                    txn,
                    _request_id,
                    mode,
                    operation,
                ) => {
                    if let Some(db) = self.get_database_mut(origin.clone(), db_name.clone()) {
                        // Queues an operation for a transaction without starting it
                        db.queue_operation(&store_name, txn, mode, operation);
                        db.schedule_transactions(origin, &db_name);
                    }
                },
                IndexedDBThreadMsg::EngineTxnBatchComplete {
                    origin,
                    db_name,
                    txn,
                } => {
                    let should_notify =
                        if let Some(db) = self.get_database_mut(origin.clone(), db_name.clone()) {
                            // Decide which running flag to clear based on txn mode.
                            let mode = db.transactions.get(&txn).map(|t| t.mode.clone());

                            match mode {
                                Some(IndexedDBTxnMode::Readonly) => {
                                    db.running_readonly.remove(&txn);
                                },
                                Some(_) if db.running_readwrite == Some(txn) => {
                                    db.running_readwrite = None;
                                },
                                Some(_) => {},
                                None => {
                                    // txn might have been aborted/removed; nothing to clear
                                },
                            }

                            // If more requests were queued while this batch was running,
                            // schedule again now.
                            db.schedule_transactions(origin.clone(), &db_name);
                            db.can_notify_txn_maybe_commit(txn)
                        } else {
                            false
                        };
                    if should_notify {
                        self.handle_sync_operation(SyncOperation::TxnMaybeCommit {
                            origin,
                            db_name,
                            txn,
                        });
                    }
                },
                IndexedDBThreadMsg::CollectMemoryReport(sender) => {
                    let reports = self.collect_memory_reports();
                    sender.send(ProcessReports::new(reports));
                },
            }
        }
    }
895
896    fn dispatch_txn_maybe_commit(&self, origin: ImmutableOrigin, db_name: String, txn: u64) {
897        let key = IndexedDBDescription {
898            origin,
899            name: db_name.clone(),
900        };
901        let Some(connections) = self.connections.get(&key) else {
902            return;
903        };
904        for connection in connections.values() {
905            if connection.close_pending {
906                continue;
907            }
908            let _ = connection.sender.send(ConnectionMsg::TxnMaybeCommit {
909                db_name: db_name.clone(),
910                txn,
911            });
912        }
913    }
914
915    fn handle_txn_maybe_commit(&mut self, origin: ImmutableOrigin, db_name: String, txn: u64) {
916        let key = IndexedDBDescription {
917            origin: origin.clone(),
918            name: db_name.clone(),
919        };
920        let callbacks = {
921            let Some(db) = self.databases.get_mut(&key) else {
922                return;
923            };
924            if !db.can_commit_now(txn) {
925                return;
926            }
927            db.take_pending_commit_callbacks(txn)
928        };
929
930        if callbacks.is_empty() {
931            self.dispatch_txn_maybe_commit(origin, db_name, txn);
932            return;
933        }
934
935        for callback in callbacks {
936            if callback
937                .send(TxnCompleteMsg {
938                    origin: origin.clone(),
939                    db_name: db_name.clone(),
940                    txn,
941                    result: Ok(()),
942                })
943                .is_err()
944            {
945                error!(
946                    "Failed to send deferred commit completion for db '{}' txn {}.",
947                    db_name, txn
948                );
949            }
950        }
951    }
952
    /// Handle when an upgrade transaction finishes in script.
    ///
    /// If `committed` is true, the versionchange transaction committed: the
    /// open request at the head of the connection queue is completed by
    /// sending `ConnectionMsg::Connection` back to script, and the queue is
    /// advanced. If false, the upgrade was aborted in script and the pending
    /// upgrade is rolled back via `abort_pending_upgrade`.
    fn handle_upgrade_transaction_finished(
        &mut self,
        name: String,
        origin: ImmutableOrigin,
        txn: u64,
        committed: bool,
    ) {
        let key = IndexedDBDescription {
            name: name.clone(),
            origin: origin.clone(),
        };

        if committed {
            let Some(queue) = self.connection_queues.get_mut(&key) else {
                return debug_assert!(false, "A connection queue should exist.");
            };
            let Some(front) = queue.front() else {
                return debug_assert!(false, "A pending open request should exist.");
            };
            // Peek first: only act if the front request is an open request with
            // a pending upgrade whose transaction matches `txn`. A non-matching
            // or upgrade-less front is left untouched.
            let OpenRequest::Open {
                pending_upgrade: Some(pending_upgrade),
                ..
            } = front
            else {
                return;
            };
            if pending_upgrade.transaction != txn {
                return;
            }

            // The front request was verified above; now remove and consume it.
            let Some(open_request) = queue.pop_front() else {
                return;
            };
            let OpenRequest::Open {
                sender,
                db_name,
                version: _,
                processed: _,
                pending_upgrade: Some(pending_upgrade),
                pending_close: _,
                pending_versionchange: _,
                id,
                proxy_map: _,
            } = open_request
            else {
                return;
            };
            let VersionUpgrade { new, .. } = pending_upgrade;
            // The connection reports the post-upgrade object store set; an
            // empty set is used if the database lookup fails.
            let object_store_names = self
                .databases
                .get(&key)
                .and_then(|db| db.object_store_names().ok())
                .unwrap_or_default();
            if sender
                .send(ConnectionMsg::Connection {
                    id,
                    name: db_name,
                    version: new,
                    upgraded: true,
                    object_store_names,
                })
                .is_err()
            {
                error!("Failed to send ConnectionMsg::Connection to script.");
            };

            // The upgrade request is done; process the next queued request.
            self.advance_connection_queue(key);
            return;
        }

        // Aborted path: gather what abort_pending_upgrade needs while the
        // queue borrow is live, then release the borrow before calling it.
        let (request_id, proxy_map, db_name) = {
            let Some(queue) = self.connection_queues.get_mut(&key) else {
                return debug_assert!(false, "A connection queue should exist.");
            };
            let Some(front) = queue.front() else {
                return debug_assert!(false, "A pending open request should exist.");
            };
            let OpenRequest::Open {
                pending_upgrade: Some(pending_upgrade),
                id,
                proxy_map,
                db_name,
                ..
            } = front
            else {
                return;
            };
            // Ignore completion messages for a transaction other than the one
            // currently pending at the head of the queue.
            if pending_upgrade.transaction != txn {
                return;
            }
            (*id, (*proxy_map).clone(), db_name.clone())
        };

        self.abort_pending_upgrade(name, request_id, origin, db_name, &proxy_map);
    }
1049
1050    /// Run the next open request in the queue.
1051    fn advance_connection_queue(&mut self, key: IndexedDBDescription) {
1052        loop {
1053            let is_open = {
1054                let Some(queue) = self.connection_queues.get_mut(&key) else {
1055                    return;
1056                };
1057                if queue.is_empty() {
1058                    return;
1059                }
1060                queue.front().expect("Queue is not empty.").is_open()
1061            };
1062
1063            if is_open {
1064                self.open_database(key.clone());
1065            } else {
1066                self.delete_database(key.clone());
1067            }
1068
1069            let was_pruned = self.maybe_remove_front_from_queue(&key);
1070
1071            if !was_pruned {
1072                // Note: requests to delete a database are, at this point in the implementation,
1073                // done in one step; so we can continue on to the next request.
1074                // Request to open a connection consists of multiple async steps, so we must break if
1075                // it is still pending.
1076                break;
1077            }
1078        }
1079    }
1080
1081    /// Remove the record at the front if it is not pending.
1082    fn maybe_remove_front_from_queue(&mut self, key: &IndexedDBDescription) -> bool {
1083        let (is_empty, was_pruned) = {
1084            let Some(queue) = self.connection_queues.get_mut(key) else {
1085                debug_assert!(false, "A connection queue should exist.");
1086                return false;
1087            };
1088            let mut pruned = false;
1089            let front_is_pending = queue.front().map(|record| record.is_pending());
1090            if let Some(is_pending) = front_is_pending &&
1091                !is_pending
1092            {
1093                queue.pop_front().expect("Queue has a non-pending item.");
1094                pruned = true
1095            }
1096            (queue.is_empty(), pruned)
1097        };
1098        if is_empty {
1099            self.connection_queues.remove(key);
1100        }
1101        was_pruned
1102    }
1103
1104    fn remove_connection(&mut self, key: &IndexedDBDescription, id: &Uuid) {
1105        let is_empty = {
1106            let Some(connections) = self.connections.get_mut(key) else {
1107                return debug!("Connection already removed.");
1108            };
1109            connections.remove(id);
1110            connections.is_empty()
1111        };
1112
1113        if is_empty {
1114            self.connections.remove(key);
1115        }
1116    }
1117
1118    /// Revert the backing database state after aborting an upgrade transaction.
1119    ///
1120    /// <https://w3c.github.io/IndexedDB/#abort-an-upgrade-transaction>
1121    /// IndexedDB §5.8 step 3 restores the previous version, or `0` if the database
1122    /// was newly created. Step 4 restores the previous object store set, or the
1123    /// empty set if the database was newly created. Servo eagerly creates the
1124    /// backing database with version `0` and no stores during open, so aborting
1125    /// that first upgrade must roll back to the pre-creation state by deleting the
1126    /// placeholder backing store entirely.
1127    ///
1128    /// Related: <https://github.com/servo/servo/pull/42998>
1129    fn revert_aborted_upgrade(
1130        &mut self,
1131        key: &IndexedDBDescription,
1132        old_version: u64,
1133        db_name: String,
1134        proxy_map: &StorageProxyMap,
1135    ) {
1136        if old_version == 0 {
1137            if let Some(db) = self.databases.remove(key) {
1138                // Note: ensure db is dropped before deleting directory,
1139                // to get around windows file locks.
1140                drop(db);
1141                let response = proxy_map
1142                    .handle
1143                    .delete_database(proxy_map.bottle_id, db_name.clone())
1144                    .recv();
1145                if response.is_err() {
1146                    error!("Failed to communicate with client storage.");
1147                    return;
1148                }
1149                if response.unwrap().is_err() {
1150                    error!("Failed to delete database {:?}", db_name);
1151                }
1152            }
1153            return;
1154        }
1155
1156        let Some(db) = self.databases.get_mut(key) else {
1157            return debug_assert!(false, "Db should have been created");
1158        };
1159        let res = db.set_version(old_version);
1160        debug_assert!(res.is_ok(), "Setting a db version should not fail.");
1161    }
1162
1163    /// Aborting the current upgrade for an origin.
1164    // https://w3c.github.io/IndexedDB/#abort-an-upgrade-transaction
1165    /// Note: this only reverts the version at this point.
1166    fn abort_pending_upgrade(
1167        &mut self,
1168        name: String,
1169        id: Uuid,
1170        origin: ImmutableOrigin,
1171        db_name: String,
1172        proxy_map: &StorageProxyMap,
1173    ) {
1174        let key = IndexedDBDescription { name, origin };
1175        let old = {
1176            let Some(queue) = self.connection_queues.get_mut(&key) else {
1177                return debug_assert!(
1178                    false,
1179                    "There should be a connection queue for the aborted upgrade."
1180                );
1181            };
1182            let Some(open_request) = queue.pop_front() else {
1183                return debug_assert!(false, "There should be an open request to upgrade.");
1184            };
1185            if open_request.get_id() != id {
1186                return debug_assert!(
1187                    false,
1188                    "Open request to abort should be at the head of the queue."
1189                );
1190            }
1191            open_request.abort()
1192        };
1193        if let Some(old_version) = old {
1194            self.revert_aborted_upgrade(&key, old_version, db_name, proxy_map);
1195        }
1196
1197        self.remove_connection(&key, &id);
1198
1199        self.advance_connection_queue(key);
1200    }
1201
1202    /// Aborting all upgrades for an origin
1203    // https://w3c.github.io/IndexedDB/#abort-an-upgrade-transaction
1204    /// Note: this only reverts the version at this point.
1205    fn abort_pending_upgrades(
1206        &mut self,
1207        pending_upgrades: HashMap<String, HashSet<Uuid>>,
1208        origin: ImmutableOrigin,
1209        proxy_map: StorageProxyMap,
1210    ) {
1211        for (name, ids) in pending_upgrades.into_iter() {
1212            let mut version_to_revert: Option<u64> = None;
1213            let key = IndexedDBDescription {
1214                name: name.clone(),
1215                origin: origin.clone(),
1216            };
1217            for id in ids.iter() {
1218                self.remove_connection(&key, id);
1219            }
1220            {
1221                let is_empty = {
1222                    let Some(queue) = self.connection_queues.get_mut(&key) else {
1223                        continue;
1224                    };
1225                    queue.retain_mut(|open_request| {
1226                        if ids.contains(&open_request.get_id()) {
1227                            let old = open_request.abort();
1228                            if version_to_revert.is_none() &&
1229                                let Some(old) = old
1230                            {
1231                                version_to_revert = Some(old);
1232                            }
1233                            false
1234                        } else {
1235                            true
1236                        }
1237                    });
1238                    queue.is_empty()
1239                };
1240                if is_empty {
1241                    self.connection_queues.remove(&key);
1242                }
1243            }
1244            if let Some(version) = version_to_revert {
1245                self.revert_aborted_upgrade(&key, version, name, &proxy_map);
1246            }
1247        }
1248    }
1249
1250    /// <https://w3c.github.io/IndexedDB/#open-a-database-connection>
1251    fn open_a_database_connection(
1252        &mut self,
1253        sender: GenericCallback<ConnectionMsg>,
1254        origin: ImmutableOrigin,
1255        db_name: String,
1256        version: Option<u64>,
1257        id: Uuid,
1258        proxy_map: StorageProxyMap,
1259    ) {
1260        let key = IndexedDBDescription {
1261            name: db_name.clone(),
1262            origin,
1263        };
1264        let open_request = OpenRequest::Open {
1265            sender,
1266            db_name,
1267            version,
1268            processed: false,
1269            pending_close: Default::default(),
1270            pending_versionchange: Default::default(),
1271            pending_upgrade: None,
1272            id,
1273            proxy_map,
1274        };
1275        let should_continue = {
1276            // Step 1: Let queue be the connection queue for storageKey and name.
1277            let queue = self.connection_queues.entry(key.clone()).or_default();
1278
1279            // Step 2: Add request to queue.
1280            queue.push_back(open_request);
1281            queue.len() == 1
1282        };
1283
1284        // Step 3: Wait until all previous requests in queue have been processed.
1285        if should_continue {
1286            self.open_database(key.clone());
1287            self.maybe_remove_front_from_queue(&key);
1288        }
1289    }
1290
1291    fn get_database(
1292        &self,
1293        origin: ImmutableOrigin,
1294        db_name: String,
1295    ) -> Option<&IndexedDBEnvironment<SqliteEngine>> {
1296        let idb_description = IndexedDBDescription {
1297            origin,
1298            name: db_name,
1299        };
1300
1301        self.databases.get(&idb_description)
1302    }
1303
1304    fn get_database_mut(
1305        &mut self,
1306        origin: ImmutableOrigin,
1307        db_name: String,
1308    ) -> Option<&mut IndexedDBEnvironment<SqliteEngine>> {
1309        let idb_description = IndexedDBDescription {
1310            origin,
1311            name: db_name,
1312        };
1313
1314        self.databases.get_mut(&idb_description)
1315    }
1316
    /// <https://w3c.github.io/IndexedDB/#upgrade-a-database>
    /// To upgrade a database with connection (a connection),
    /// a new version, and a request, run these steps:
    ///
    /// Expects the open request driving the upgrade to be at the front of the
    /// connection queue for `key`; the request's `processed` flag and
    /// `pending_upgrade` record are updated in place.
    fn upgrade_database(&mut self, key: IndexedDBDescription, new_version: u64) {
        let Some(queue) = self.connection_queues.get_mut(&key) else {
            return debug_assert!(false, "A connection queue should exist.");
        };
        let Some(open_request) = queue.front_mut() else {
            return debug_assert!(false, "An open request should be in the queue.");
        };
        // Destructure in place: `processed` and `pending_upgrade` are mutated
        // through these bindings further down.
        let OpenRequest::Open {
            sender,
            db_name,
            version: _,
            processed,
            id,
            pending_close: _,
            pending_versionchange: _,
            pending_upgrade,
            proxy_map: _,
        } = open_request
        else {
            return;
        };

        // Step 1: Let db be connection’s database.
        // Note: `connection_queues` and `databases` are disjoint fields, so
        // both mutable borrows of `self` are allowed simultaneously.
        let db = self
            .databases
            .get_mut(&key)
            .expect("Db should have been opened.");

        // Step 2: Let transaction be a new upgrade transaction with connection used as connection.
        let transaction = self.serial_number_counter;
        self.serial_number_counter += 1;

        // Step 3: Set transaction’s scope to connection’s object store set.
        let scope = db
            .object_store_names()
            .expect("Fetching object store names should not fail.");

        // Step 4: Set db’s upgrade transaction to transaction.
        // Backend tracks the active upgrade transaction in `pending_upgrade` below.
        db.register_transaction(transaction, IndexedDBTxnMode::Versionchange, scope.clone());

        // Step 5: Set transaction’s state to inactive.
        // Step 6: Start transaction.
        // Backend transactions are started by the scheduler when requests are queued;
        // newly created upgrade transactions are therefore initially inactive.

        // Step 7: Let old version be db’s version.
        let old_version = db.version().expect("DB should have a version.");

        // Step 8: Set db’s version to version. This change is considered part of the
        // transaction, and so if the transaction is aborted, this change is reverted.
        db.set_version(new_version)
            .expect("Setting the version should not fail");

        // Step 9: Set request’s processed flag to true.
        *processed = true;
        // Record the upgrade (old/new versions and transaction id) on the
        // request; `Option::insert` overwrites any previous value.
        let _ = pending_upgrade.insert(VersionUpgrade {
            old: old_version,
            new: new_version,
            transaction,
        });

        // Step 10: Queue a database task to run these steps.
        if sender
            .send(ConnectionMsg::Upgrade {
                id: *id,
                name: db_name.clone(),
                version: new_version,
                old_version,
                transaction,
                object_store_names: scope,
            })
            .is_err()
        {
            error!("Couldn't queue task for indexeddb upgrade event.");
        }

        // Step 11: Wait for transaction to finish.
        // Queue progression remains blocked while `pending_upgrade` is set.
    }
1400
    /// <https://w3c.github.io/IndexedDB/#open-a-database-connection>
    ///
    /// Called when the connection `from_id` has finished handling its
    /// `versionchange` event. Once every outstanding connection has reported
    /// back (and, if any are still open, a `blocked` event has been fired),
    /// the pending upgrade at the head of the queue is started.
    fn handle_version_change_done(
        &mut self,
        name: String,
        from_id: Uuid,
        old_version: u64,
        origin: ImmutableOrigin,
    ) {
        let key = IndexedDBDescription {
            name: name.clone(),
            origin,
        };
        let (can_upgrade, version) = {
            let Some(queue) = self.connection_queues.get_mut(&key) else {
                return debug_assert!(false, "A connection queue should exist.");
            };
            let Some(open_request) = queue.front_mut() else {
                return debug_assert!(false, "An open request should be in the queue.");
            };
            let OpenRequest::Open {
                sender,
                db_name: _,
                version,
                id,
                pending_upgrade: _,
                processed: _,
                pending_versionchange,
                pending_close,
                proxy_map: _,
            } = open_request
            else {
                return debug_assert!(
                    false,
                    "An request to open a connection should be in the queue."
                );
            };
            debug_assert!(
                pending_versionchange.contains(&from_id),
                "The open request should be pending on the versionchange event for the connection sending the message."
            );

            // Mark this connection's versionchange event as handled.
            pending_versionchange.remove(&from_id);

            // Step 10.3: Wait for all of the events to be fired.
            if !pending_versionchange.is_empty() {
                return;
            }

            let Some(version) = *version else {
                return debug_assert!(
                    false,
                    "An upgrade version should have been determined by now."
                );
            };

            // Step 10.4: If any of the connections in openConnections are still not closed,
            // queue a database task to fire a version change event named blocked
            // at request with db’s version and version.
            // Note: the let-chain only attempts the send when connections are
            // still open; a failed send means script has gone away.
            if !pending_close.is_empty() &&
                sender
                    .send(ConnectionMsg::Blocked {
                        name,
                        id: *id,
                        version,
                        old_version,
                    })
                    .is_err()
            {
                return debug!("Script exit during indexeddb database open");
            }

            // The upgrade may proceed only once every connection has closed.
            (pending_close.is_empty(), version)
        };

        // Step 10.5: Wait until all connections in openConnections are closed.
        // Note: if we still need to wait, the algorithm will continue in the handling of the close message.
        if can_upgrade {
            // Step 10.6: Run upgrade a database using connection, version and request.
            self.upgrade_database(key.clone(), version);

            // If the upgrade completed synchronously (request no longer
            // pending), keep the queue moving.
            let was_pruned = self.maybe_remove_front_from_queue(&key);
            if was_pruned {
                self.advance_connection_queue(key);
            }
        }
    }
1487
1488    /// <https://w3c.github.io/IndexedDB/#open-a-database-connection>
1489    /// The part where the open request is ready for processing.
1490    fn open_database(&mut self, key: IndexedDBDescription) {
1491        let Some(queue) = self.connection_queues.get_mut(&key) else {
1492            return debug_assert!(false, "A connection queue should exist.");
1493        };
1494        let Some(open_request) = queue.front_mut() else {
1495            return debug_assert!(false, "An open request should be in the queue.");
1496        };
1497        let OpenRequest::Open {
1498            sender,
1499            db_name,
1500            version,
1501            id,
1502            processed,
1503            pending_upgrade: _pending_upgrade,
1504            pending_close,
1505            pending_versionchange,
1506            proxy_map,
1507        } = open_request
1508        else {
1509            return debug_assert!(
1510                false,
1511                "An request to open a connection should be in the queue."
1512            );
1513        };
1514
1515        let requested_version = *version;
1516
1517        // Step 4: Let db be the database named name in origin, or null otherwise.
1518        let db_version = match self.databases.entry(key.clone()) {
1519            Entry::Vacant(e) => {
1520                // Step 5: If version is undefined, let version be 1 if db is null, or db’s version otherwise.
1521                // Note: done below with the zero as first tuple item.
1522
1523                // https://www.w3.org/TR/IndexedDB/#open-a-database-connection
1524                // Step 6: If db is null, let db be a new database
1525                // with name name, version 0 (zero), and with no object stores.
1526                // If this fails for any reason, return an appropriate error
1527                // (e.g. a "QuotaExceededError" or "UnknownError" DOMException).
1528                let Ok(response) = proxy_map
1529                    .handle
1530                    .create_database(proxy_map.bottle_id, db_name.clone())
1531                    .recv()
1532                else {
1533                    if let Err(e) = sender.send(ConnectionMsg::DatabaseError {
1534                        id: *id,
1535                        name: db_name.clone(),
1536                        error: BackendError::DbErr(
1537                            "Failed to communicate with client storage.".to_string(),
1538                        ),
1539                    }) {
1540                        debug!("Script exit during indexeddb database open {:?}", e);
1541                    }
1542                    return;
1543                };
1544                let (path, created) = match response {
1545                    Ok((path, created)) => (path, created),
1546                    Err(err) => {
1547                        if let Err(e) = sender.send(ConnectionMsg::DatabaseError {
1548                            id: *id,
1549                            name: db_name.clone(),
1550                            error: BackendError::DbErr(format!("{err:?}")),
1551                        }) {
1552                            debug!("Script exit during indexeddb database open {:?}", e);
1553                        }
1554                        return;
1555                    },
1556                };
1557                let engine = match SqliteEngine::new(path, created, &key, self.thread_pool.clone())
1558                {
1559                    Ok(engine) => engine,
1560                    Err(err) => {
1561                        let error = backend_error_from_sqlite_error(err);
1562                        if let Err(e) = sender.send(ConnectionMsg::DatabaseError {
1563                            id: *id,
1564                            name: db_name.clone(),
1565                            error,
1566                        }) {
1567                            debug!("Script exit during indexeddb database open {:?}", e);
1568                        }
1569                        *processed = true;
1570                        return;
1571                    },
1572                };
1573                let created_db_path = engine.created_db_path();
1574                let db = IndexedDBEnvironment::new(engine, self.manager_sender.clone());
1575                let db_version = match db.version() {
1576                    Ok(version) => version,
1577                    Err(err) => {
1578                        let error = backend_error_from_sqlite_error(err);
1579                        if let Err(e) = sender.send(ConnectionMsg::DatabaseError {
1580                            id: *id,
1581                            name: db_name.clone(),
1582                            error,
1583                        }) {
1584                            debug!("Script exit during indexeddb database open {:?}", e);
1585                        }
1586                        *processed = true;
1587                        return;
1588                    },
1589                };
1590
1591                *version = if created_db_path {
1592                    Some(requested_version.unwrap_or(1))
1593                } else {
1594                    Some(requested_version.unwrap_or(db_version))
1595                };
1596
1597                e.insert(db);
1598                db_version
1599            },
1600            Entry::Occupied(db) => {
1601                let db_version = match db.get().version() {
1602                    Ok(version) => version,
1603                    Err(err) => {
1604                        let error = backend_error_from_sqlite_error(err);
1605                        if let Err(e) = sender.send(ConnectionMsg::DatabaseError {
1606                            id: *id,
1607                            name: db_name.clone(),
1608                            error,
1609                        }) {
1610                            debug!("Script exit during indexeddb database open {:?}", e);
1611                        }
1612                        *processed = true;
1613                        return;
1614                    },
1615                };
1616                // Step 5: If version is undefined, let version be 1 if db is null, or db’s version otherwise.
1617                *version = Some(requested_version.unwrap_or(db_version));
1618                db_version
1619            },
1620        };
1621
1622        let Some(version) = *version else {
1623            return debug_assert!(
1624                false,
1625                "An upgrade version should have been determined by now."
1626            );
1627        };
1628
1629        // Step 7: If db’s version is greater than version,
1630        // return a newly created "VersionError" DOMException
1631        // and abort these steps.
1632        if version < db_version {
1633            if sender
1634                .send(ConnectionMsg::VersionError {
1635                    name: db_name.clone(),
1636                    id: *id,
1637                })
1638                .is_err()
1639            {
1640                debug!("Script exit during indexeddb database open");
1641            }
1642            *processed = true;
1643            return;
1644        }
1645
1646        // Step 8: Let connection be a new connection to db.
1647        // Step 9: Set connection’s version to version.
1648        let connection = Connection {
1649            close_pending: false,
1650            sender: sender.clone(),
1651        };
1652        let entry = self.connections.entry(key.clone()).or_default();
1653        entry.insert(*id, connection);
1654
1655        // Step 10: If db’s version is less than version, then:
1656        if db_version < version {
1657            // Step 10.1: Let openConnections be the set of all connections,
1658            // except connection, associated with db.
1659            let open_connections = entry
1660                .iter_mut()
1661                .filter(|(other_id, conn)| !conn.close_pending && *other_id != id);
1662            for (id_to_close, conn) in open_connections {
1663                // Step 10.2: For each entry of openConnections
1664                // queue a database task to fire a version change event
1665                // named versionchange at entry with db’s version and version.
1666                if conn
1667                    .sender
1668                    .send(ConnectionMsg::VersionChange {
1669                        name: db_name.clone(),
1670                        id: *id_to_close,
1671                        version,
1672                        old_version: db_version,
1673                    })
1674                    .is_err()
1675                {
1676                    error!("Failed to send ConnectionMsg::Connection to script.");
1677                };
1678                pending_close.insert(*id_to_close);
1679                pending_versionchange.insert(*id_to_close);
1680            }
1681            if !pending_close.is_empty() {
1682                // Step 10.3: Wait for all of the events to be fired.
1683                return;
1684            }
1685
1686            // Step 10.6: Run upgrade a database using connection, version and request.
1687            self.upgrade_database(key, version);
1688            return;
1689        }
1690
1691        // Step 11:
1692        let object_store_names = self
1693            .databases
1694            .get(&key)
1695            .and_then(|db| db.object_store_names().ok())
1696            .unwrap_or_default();
1697        *processed = true;
1698        if sender
1699            .send(ConnectionMsg::Connection {
1700                name: db_name.clone(),
1701                id: *id,
1702                version: db_version,
1703                upgraded: false,
1704                object_store_names,
1705            })
1706            .is_err()
1707        {
1708            error!("Failed to send ConnectionMsg::Connection to script.");
1709        };
1710    }
1711
1712    /// <https://www.w3.org/TR/IndexedDB/#delete-a-database>
1713    /// The part adding the request to the connection queue.
1714    fn start_delete_database(
1715        &mut self,
1716        key: IndexedDBDescription,
1717        id: Uuid,
1718        proxy_map: StorageProxyMap,
1719        sender: GenericCallback<BackendResult<u64>>,
1720    ) {
1721        let open_request = OpenRequest::Delete {
1722            sender,
1723            _origin: key.origin.clone(),
1724            db_name: key.name.clone(),
1725            processed: false,
1726            proxy_map,
1727            id,
1728        };
1729
1730        let should_continue = {
1731            // Step 1: Let queue be the connection queue for storageKey and name.
1732            let queue = self.connection_queues.entry(key.clone()).or_default();
1733
1734            // Step 2: Add request to queue.
1735            queue.push_back(open_request);
1736            queue.len() == 1
1737        };
1738
1739        // Step 3: Wait until all previous requests in queue have been processed.
1740        if should_continue {
1741            self.delete_database(key.clone());
1742            self.maybe_remove_front_from_queue(&key);
1743        }
1744    }
1745
1746    /// <https://www.w3.org/TR/IndexedDB/#delete-a-database>
1747    fn delete_database(&mut self, key: IndexedDBDescription) {
1748        let Some(queue) = self.connection_queues.get_mut(&key) else {
1749            return debug_assert!(false, "A connection queue should exist.");
1750        };
1751        let Some(open_request) = queue.front_mut() else {
1752            return debug_assert!(false, "An open request should be in the queue.");
1753        };
1754        let OpenRequest::Delete {
1755            sender,
1756            _origin: _,
1757            db_name,
1758            processed,
1759            id: _,
1760            proxy_map,
1761        } = open_request
1762        else {
1763            return debug_assert!(
1764                false,
1765                "An request to open a connection should be in the queue."
1766            );
1767        };
1768
1769        // Step 4: Let db be the database named name in storageKey, if one exists. Otherwise, return 0 (zero).
1770        let version = if let Some(db) = self.databases.remove(&key) {
1771            // Step 5: Let openConnections be the set of all connections associated with db.
1772            // Step6: For each entry of openConnections that does not have its close pending flag set to true,
1773            // queue a database task to fire a version change event named versionchange
1774            // at entry with db’s version and null.
1775            // Step 7: Wait for all of the events to be fired.
1776            // Step 8: If any of the connections in openConnections are still not closed,
1777            // queue a database task to fire a version change event
1778            // named blocked at request with db’s version and null.
1779            // Step 9: Wait until all connections in openConnections are closed.
1780            // TODO: implement connections.
1781
1782            // Step 10: Let version be db’s version.
1783            let res = db.version();
1784            let Ok(version) = res else {
1785                *processed = true;
1786                if sender
1787                    .send(BackendResult::Err(BackendError::DbErr(
1788                        res.unwrap_err().to_string(),
1789                    )))
1790                    .is_err()
1791                {
1792                    debug!("Script went away during pending database delete.");
1793                }
1794                return;
1795            };
1796
1797            // Note: ensure db is dropped before deleting directory,
1798            // to get around windows file locks.
1799            drop(db);
1800
1801            // Step 11: Delete db.
1802            // If this fails for any reason,
1803            // return an appropriate error (e.g. a QuotaExceededError, or an "UnknownError" DOMException).
1804            let Ok(response) = proxy_map
1805                .handle
1806                .delete_database(proxy_map.bottle_id, db_name.clone())
1807                .recv()
1808            else {
1809                if sender
1810                    .send(BackendResult::Err(BackendError::DbErr(
1811                        "Failed to communicate with client storage.".to_string(),
1812                    )))
1813                    .is_err()
1814                {
1815                    debug!("Script went away during pending database delete.");
1816                }
1817                return;
1818            };
1819            if let Err(err) = response {
1820                if sender
1821                    .send(BackendResult::Err(BackendError::DbErr(format!(
1822                        "Client storage error: {err:?}"
1823                    ))))
1824                    .is_err()
1825                {
1826                    debug!("Script went away during pending database delete.");
1827                }
1828                return;
1829            }
1830            version
1831        } else {
1832            0
1833        };
1834
1835        // step 12: Return version.
1836        if sender.send(BackendResult::Ok(version)).is_err() {
1837            debug!("Script went away during pending database delete.");
1838        }
1839
1840        *processed = true;
1841    }
1842
1843    /// <https://w3c.github.io/IndexedDB/#closing-connection>
1844    fn close_database(&mut self, origin: ImmutableOrigin, id: Uuid, name: String) {
1845        // Step 1: Set connection’s close pending flag to true.
1846        // TODO: seems like a script only flag.
1847
1848        // Step 2: If the forced flag is true,
1849        // then for each transaction created using connection
1850        // run abort a transaction with transaction and newly created "AbortError" DOMException.
1851        // Step 3: Wait for all transactions created using connection to complete.
1852        // Once they are complete, connection is closed.
1853        // TODO: transaction lifecycle.
1854
1855        // Step 4: If the forced flag is true, then fire an event named close at connection.
1856        // TODO: implement, probably only on the script side of things.
1857
1858        // Note: below we are continuing
1859        // <https://w3c.github.io/IndexedDB/#open-a-database-connection>
1860        // in the case that an open request is waiting for connections to close.
1861        let key = IndexedDBDescription { origin, name };
1862        let (can_upgrade, version) = {
1863            self.remove_connection(&key, &id);
1864
1865            let Some(queue) = self.connection_queues.get_mut(&key) else {
1866                return;
1867            };
1868            let Some(open_request) = queue.front_mut() else {
1869                return;
1870            };
1871            if let OpenRequest::Open {
1872                sender: _,
1873                db_name: _,
1874                version,
1875                id: _,
1876                processed: _,
1877                pending_upgrade,
1878                pending_versionchange,
1879                pending_close,
1880                proxy_map: _,
1881            } = open_request
1882            {
1883                pending_close.remove(&id);
1884                (
1885                    // Note: need to exclude requests that have already started upgrading.
1886                    pending_close.is_empty() &&
1887                        pending_versionchange.is_empty() &&
1888                        !pending_upgrade.is_some(),
1889                    *version,
1890                )
1891            } else {
1892                (false, None)
1893            }
1894        };
1895
1896        // <https://w3c.github.io/IndexedDB/#open-a-database-connection>
1897        // Step 10.3: Wait for all of the events to be fired.
1898        // Step 10.5: Wait until all connections in openConnections are closed.
1899        // Note: both conditions must be checked here,
1900        // because that is the condition enabling the upgrade to proceed.
1901        if can_upgrade {
1902            // Step 10.6: Run upgrade a database using connection, version and request.
1903            let Some(version) = version else {
1904                return debug_assert!(
1905                    false,
1906                    "An upgrade version should have been determined by now."
1907                );
1908            };
1909            self.upgrade_database(key.clone(), version);
1910
1911            let was_pruned = self.maybe_remove_front_from_queue(&key);
1912            if was_pruned {
1913                self.advance_connection_queue(key);
1914            }
1915        }
1916    }
1917
    /// Dispatches one synchronous IndexedDB operation received from script.
    ///
    /// Each arm replies on the channel or callback carried inside the
    /// operation (when it has one); a failed send means the script side has
    /// already gone away, so it is logged rather than propagated.
    fn handle_sync_operation(&mut self, operation: SyncOperation) {
        match operation {
            SyncOperation::GetDatabases(sender, origin) => {
                // The in-parallel steps of https://www.w3.org/TR/IndexedDB/#dom-idbfactory-databases

                // Step 4.1 Let databases be the set of databases in storageKey.
                // If this cannot be determined for any reason,
                // then queue a database task to reject p with an appropriate error
                // (e.g. an "UnknownError" DOMException) and terminate these steps.
                // TODO: separate database and connection concepts.
                // For now using `self.databases`, which track connections.

                // Step 4.2: Let result be a new list.
                let info_list: Vec<DatabaseInfo> = self
                    .databases
                    .iter()
                    .filter_map(|(description, info)| {
                        // Step 4.3: For each db of databases:
                        // Note: databases whose version cannot be read are
                        // silently skipped rather than failing the whole list.
                        if let Ok(version) = info.version() {
                            // Step 4.3.4: If db’s version is 0, then continue.
                            if version == 0 {
                                None
                            } else {
                                // Step 4.3.5: Let info be a new IDBDatabaseInfo dictionary.
                                // Step 4.3.6: Set info’s name dictionary member to db’s name.
                                // Step 4.3.7: Set info’s version dictionary member to db’s version.
                                // Step 4.3.8: Append info to result.
                                if description.origin == origin {
                                    Some(DatabaseInfo {
                                        name: description.name.clone(),
                                        version,
                                    })
                                } else {
                                    None
                                }
                            }
                        } else {
                            None
                        }
                    })
                    .collect();

                // IndexedDB `databases()` / "get a list of databases" returns the visible list;
                // filtering out non-user-visible entries must not turn success into an internal error.
                // https://w3c.github.io/IndexedDB/#dom-idbfactory-databases
                let result = Ok(info_list);

                // Step 4.4: Queue a database task to resolve p with result.
                if sender.send(result).is_err() {
                    debug!("Couldn't send SyncOperation::GetDatabases reply.");
                }
            },
            SyncOperation::CloseDatabase(origin, id, db_name) => {
                // Delegated to the "closing a database connection" steps.
                self.close_database(origin, id, db_name);
            },
            SyncOperation::OpenDatabase(sender, origin, db_name, version, id, proxy_map) => {
                // Delegated to the "open a database connection" steps.
                self.open_a_database_connection(sender, origin, db_name, version, id, proxy_map);
            },
            SyncOperation::AbortPendingUpgrades {
                pending_upgrades,
                origin,
                proxy_map,
            } => {
                self.abort_pending_upgrades(pending_upgrades, origin, proxy_map);
            },
            SyncOperation::DeleteDatabase(callback, origin, db_name, proxy_map, id) => {
                // Enqueue the deletion on the per-(origin, name) connection queue.
                let idb_description = IndexedDBDescription {
                    origin,
                    name: db_name,
                };
                self.start_delete_database(idb_description, id, proxy_map, callback);
            },
            SyncOperation::GetObjectStore(sender, origin, db_name, store_name) => {
                // FIXME:(arihant2math) Should we error out more aggressively here?
                let result = self.get_database(origin, db_name).map(|db| {
                    let key_generator_current_number = db.key_generator_current_number(&store_name);
                    IndexedDBObjectStore {
                        key_path: db.key_path(&store_name),
                        // A store has a key generator exactly when it tracks a
                        // current generator number.
                        has_key_generator: key_generator_current_number.is_some(),
                        key_generator_current_number,
                        indexes: db.indexes(&store_name).unwrap_or_default(),
                        name: store_name,
                    }
                });
                let _ = sender.send(result.ok_or(BackendError::DbNotFound));
            },
            SyncOperation::CreateIndex(
                origin,
                db_name,
                store_name,
                index_name,
                key_path,
                unique,
                multi_entry,
            ) => {
                // Fire-and-forget: engine errors are discarded (no reply channel).
                if let Some(db) = self.get_database(origin, db_name) {
                    let _ = db.create_index(&store_name, index_name, key_path, unique, multi_entry);
                }
            },
            SyncOperation::DeleteIndex(origin, db_name, store_name, index_name) => {
                // Fire-and-forget: engine errors are discarded (no reply channel).
                if let Some(db) = self.get_database(origin, db_name) {
                    let _ = db.delete_index(&store_name, index_name);
                }
            },
            SyncOperation::Commit(callback, origin, db_name, txn) => {
                // https://w3c.github.io/IndexedDB/#commit-a-transaction
                // TODO: implement the commit algorithm and only reply after the backend has
                // transitioned the transaction to committed/aborted (should be atomic).
                if let Some(db) = self.get_database_mut(origin.clone(), db_name.clone()) {
                    if db.can_commit_now(txn) {
                        // Commit can complete immediately; reply right away.
                        if callback
                            .send(TxnCompleteMsg {
                                origin: origin.clone(),
                                db_name: db_name.clone(),
                                txn,
                                result: Ok(()),
                            })
                            .is_err()
                        {
                            error!(
                                "Failed to send immediate commit completion for db '{}' txn {}.",
                                db_name, txn
                            );
                        }
                    } else {
                        // Defer the reply until the transaction can commit.
                        db.queue_pending_commit_callback(txn, callback);
                    }
                    db.schedule_transactions(origin, &db_name);
                } else if callback
                    .send(TxnCompleteMsg {
                        origin,
                        db_name: db_name.clone(),
                        txn,
                        // If the database entry has already been removed, treat commit as a
                        // no-op success so script side completion does not spuriously abort.
                        result: Ok(()),
                    })
                    .is_err()
                {
                    error!(
                        "Failed to send commit completion for missing db '{}' txn {}.",
                        db_name, txn
                    );
                }
            },
            SyncOperation::Abort(abort_callback, origin, db_name, txn) => {
                // https://w3c.github.io/IndexedDB/#abort-a-transaction
                // “When a transaction is aborted the implementation must undo (roll back) any changes that were made to the database during that transaction.”
                // TODO: implement the abort algorithm and rollback for the engine.
                let pending_commit_callbacks =
                    if let Some(db) = self.get_database_mut(origin.clone(), db_name.clone()) {
                        let callbacks = db.take_pending_commit_callbacks(txn);
                        db.abort_transaction(txn);
                        callbacks
                    } else {
                        Vec::new()
                    };
                // Re-borrow the database to let other transactions proceed.
                if let Some(db) = self.get_database_mut(origin.clone(), db_name.clone()) {
                    db.schedule_transactions(origin.clone(), &db_name);
                }
                // Any commit that was waiting on this transaction is notified
                // that it was aborted instead.
                for callback in pending_commit_callbacks {
                    if callback
                        .send(storage_traits::indexeddb::TxnCompleteMsg {
                            origin: origin.clone(),
                            db_name: db_name.clone(),
                            txn,
                            result: Err(BackendError::Abort),
                        })
                        .is_err()
                    {
                        error!(
                            "Failed to send deferred abort completion for db '{}' txn {}.",
                            db_name, txn
                        );
                    }
                }
                if abort_callback
                    .send(storage_traits::indexeddb::TxnCompleteMsg {
                        origin,
                        db_name: db_name.clone(),
                        txn,
                        result: Err(BackendError::Abort),
                    })
                    .is_err()
                {
                    error!(
                        "Failed to send abort completion for db '{}' txn {}.",
                        db_name, txn
                    );
                }
            },
            SyncOperation::UpgradeTransactionFinished {
                origin,
                db_name,
                txn,
                committed,
            } => {
                self.handle_upgrade_transaction_finished(db_name, origin, txn, committed);
            },
            SyncOperation::RequestHandled {
                origin,
                db_name,
                txn,
                request_id,
            } => {
                // https://w3c.github.io/IndexedDB/#transaction-lifecycle
                // The implementation must attempt to commit an inactive transaction
                // when all requests placed against the transaction have completed
                // and their returned results handled, no new requests have been
                // placed against the transaction, and the transaction has not been aborted

                let should_notify =
                    if let Some(db) = self.get_database_mut(origin.clone(), db_name.clone()) {
                        db.mark_request_handled(txn, request_id);
                        db.can_notify_txn_maybe_commit(txn)
                    } else {
                        false
                    };
                if should_notify {
                    // Recurse into the TxnMaybeCommit arm below.
                    self.handle_sync_operation(SyncOperation::TxnMaybeCommit {
                        origin,
                        db_name,
                        txn,
                    });
                }
            },
            SyncOperation::TxnMaybeCommit {
                origin,
                db_name,
                txn,
            } => {
                self.handle_txn_maybe_commit(origin, db_name, txn);
            },
            SyncOperation::TransactionFinished {
                origin,
                db_name,
                txn,
            } => {
                // Finishing one transaction may unblock others; collect and
                // re-check every candidate afterwards.
                let maybe_commit_txns =
                    if let Some(db) = self.get_database_mut(origin.clone(), db_name.clone()) {
                        db.finish_transaction(txn);
                        db.schedule_transactions(origin.clone(), &db_name);
                        db.maybe_commit_candidates()
                    } else {
                        Vec::new()
                    };
                for candidate in maybe_commit_txns {
                    self.handle_txn_maybe_commit(origin.clone(), db_name.clone(), candidate);
                }
            },
            SyncOperation::CreateTransaction {
                sender,
                origin,
                db_name,
                mode,
                scope,
            } => {
                let key = IndexedDBDescription {
                    origin: origin.clone(),
                    name: db_name.clone(),
                };
                if let Some(db) = self.databases.get_mut(&key) {
                    // Transaction ids are allocated from a thread-wide counter.
                    let transaction_id = self.serial_number_counter;
                    self.serial_number_counter += 1;
                    db.register_transaction(transaction_id, mode, scope);
                    db.schedule_transactions(origin, &db_name);
                    let _ = sender.send(Ok(transaction_id));
                } else {
                    let _ = sender.send(Err(BackendError::DbNotFound));
                }
            },
            SyncOperation::UpgradeVersion(sender, origin, db_name, _txn, version) => {
                if let Some(db) = self.get_database_mut(origin, db_name) {
                    // Only ever move the version forwards.
                    if version > db.version().unwrap_or(0) {
                        let _ = db.set_version(version);
                    }
                    // NOTE(review): rather than erroring when the version was
                    // not bumped, reply with whatever version the database
                    // reports after the attempt.
                    let _ = sender.send(db.version().map_err(backend_error_from_sqlite_error));
                } else {
                    let _ = sender.send(Err(BackendError::DbNotFound));
                }
            },
            SyncOperation::CreateObjectStore(
                sender,
                origin,
                db_name,
                store_name,
                key_paths,
                auto_increment,
            ) => {
                if let Some(db) = self.get_database_mut(origin, db_name) {
                    let result = db.create_object_store(&store_name, key_paths, auto_increment);
                    let _ = sender.send(result.map_err(BackendError::from));
                } else {
                    let _ = sender.send(Err(BackendError::DbNotFound));
                }
            },
            SyncOperation::DeleteObjectStore(sender, origin, db_name, store_name) => {
                if let Some(db) = self.get_database_mut(origin, db_name) {
                    let result = db.delete_object_store(&store_name);
                    let _ = sender.send(result.map_err(BackendError::from));
                } else {
                    let _ = sender.send(Err(BackendError::DbNotFound));
                }
            },
            SyncOperation::Version(sender, origin, db_name) => {
                if let Some(db) = self.get_database(origin, db_name) {
                    let _ = sender.send(db.version().map_err(backend_error_from_sqlite_error));
                } else {
                    let _ = sender.send(Err(BackendError::DbNotFound));
                }
            },
            SyncOperation::NotifyEndOfVersionChange {
                name,
                id,
                old_version,
                origin,
            } => {
                self.handle_version_change_done(name, id, old_version, origin);
            },
            SyncOperation::Exit(_) => {
                // Exit is handled by the event loop itself before dispatching here.
                unreachable!("We must've already broken out of event loop.");
            },
        }
    }
2243
2244    fn collect_memory_reports(&self) -> Vec<Report> {
2245        let mut reports = vec![];
2246        perform_memory_report(|ops| {
2247            reports.push(Report {
2248                path: path!["indexeddb"],
2249                kind: ReportKind::ExplicitJemallocHeapSize,
2250                size: self.connections.size_of(ops) +
2251                    self.databases.size_of(ops) +
2252                    self.connection_queues.size_of(ops),
2253            });
2254        });
2255        reports
2256    }
2257}