1mod engines;
6
7use std::borrow::ToOwned;
8use std::collections::hash_map::Entry;
9use std::collections::{HashMap, HashSet, VecDeque};
10use std::sync::Arc;
11use std::thread;
12
13use log::{debug, error, warn};
14use malloc_size_of::MallocSizeOf;
15use malloc_size_of_derive::MallocSizeOf;
16use profile_traits::generic_callback::GenericCallback;
17use profile_traits::mem::{
18 ProcessReports, ProfilerChan as MemProfilerChan, Report, ReportKind, perform_memory_report,
19};
20use profile_traits::path;
21use rusqlite::Error as RusqliteError;
22use rustc_hash::{FxHashMap, FxHashSet};
23use servo_base::generic_channel::{self, GenericReceiver, GenericSender, ReceiveError};
24use servo_base::threadpool::ThreadPool;
25use servo_url::origin::ImmutableOrigin;
26use storage_traits::client_storage::StorageProxyMap;
27use storage_traits::indexeddb::{
28 AsyncOperation, BackendError, BackendResult, ConnectionMsg, CreateObjectResult, DatabaseInfo,
29 DbResult, IndexedDBIndex, IndexedDBObjectStore, IndexedDBThreadMsg, IndexedDBTxnMode, KeyPath,
30 SyncOperation, TxnCompleteMsg,
31};
32use uuid::Uuid;
33
34use crate::indexeddb::engines::{KvsEngine, KvsOperation, KvsTransaction, SqliteEngine};
35use crate::shared::is_sqlite_disk_full_error;
36
/// Constructor abstraction for obtaining a handle to the IndexedDB backend.
///
/// The implementation in this file (for `GenericSender<IndexedDBThreadMsg>`) spawns the
/// manager thread and returns the sender connected to it.
pub trait IndexedDBThreadFactory {
    /// Builds the handle.
    ///
    /// `mem_profiler_chan` and `reporter_name` are the memory-profiler channel and the name
    /// under which the backend reports its memory usage.
    fn new(mem_profiler_chan: MemProfilerChan, reporter_name: String) -> Self;
}
40
41impl IndexedDBThreadFactory for GenericSender<IndexedDBThreadMsg> {
42 fn new(
43 mem_profiler_chan: MemProfilerChan,
44 reporter_name: String,
45 ) -> GenericSender<IndexedDBThreadMsg> {
46 let (chan, port) = generic_channel::channel().unwrap();
47 let chan2 = chan.clone();
48
49 let manager_sender = chan.clone();
50
51 thread::Builder::new()
52 .name("IndexedDBManager".to_owned())
53 .spawn(move || {
54 mem_profiler_chan.run_with_memory_reporting(
55 || IndexedDBManager::new(port, manager_sender).start(),
56 reporter_name,
57 chan2,
58 IndexedDBThreadMsg::CollectMemoryReport,
59 );
60 })
61 .expect("Thread spawning failed");
62
63 chan
64 }
65}
66
/// Identifies one database: the owning origin plus the database name.
///
/// Used as the key of the manager's per-database maps.
#[derive(Clone, Debug, Eq, Hash, MallocSizeOf, PartialEq)]
pub struct IndexedDBDescription {
    /// Origin the database belongs to.
    pub origin: ImmutableOrigin,
    /// Name of the database.
    pub name: String,
}
73
/// Scheduling metadata recorded for a transaction when it is registered.
#[derive(MallocSizeOf)]
struct TxnInfo {
    /// Registration order; taken from the environment's `next_created_seq` counter.
    created_seq: u64,
    /// Mode the transaction was registered with.
    mode: IndexedDBTxnMode,
    /// Names making up the transaction's scope; two transactions overlap when they share one.
    scope: HashSet<String>,
    /// Set to `true` at registration; only live transactions can block later ones.
    live: bool,
}
81
/// Per-database state: the storage engine plus all transaction scheduling bookkeeping.
#[derive(MallocSizeOf)]
struct IndexedDBEnvironment<E: KvsEngine> {
    /// Engine that storage operations are delegated to.
    engine: E,
    /// Sender back to the manager; used to report `EngineTxnBatchComplete`.
    manager_sender: GenericSender<IndexedDBThreadMsg>,
    /// Requests not yet handed to the engine, together with the mode, per transaction id.
    transactions: FxHashMap<u64, KvsTransaction>,
    /// Next value handed out as `TxnInfo::created_seq`.
    next_created_seq: u64,
    /// Scheduling metadata per registered transaction id.
    txn_info: FxHashMap<u64, TxnInfo>,
    /// FIFO of queued transaction ids whose mode is not `Readonly`.
    queued_readwrite: VecDeque<u64>,
    /// FIFO of queued `Readonly` transaction ids.
    queued_readonly: VecDeque<u64>,
    /// Membership mirror of `queued_readwrite`, used to avoid duplicate queue entries.
    queued_readwrite_set: FxHashSet<u64>,
    /// Membership mirror of `queued_readonly`, used to avoid duplicate queue entries.
    queued_readonly_set: FxHashSet<u64>,
    /// The single non-readonly transaction whose batch is currently with the engine, if any.
    running_readwrite: Option<u64>,
    /// Readonly transactions whose batch is currently with the engine.
    running_readonly: HashSet<u64>,
    /// Per transaction, the lowest request id not yet marked handled (absent means 0).
    handled_next_unhandled_request_id: FxHashMap<u64, u64>,
    /// Per transaction, request ids marked handled ahead of the next expected id.
    handled_pending: FxHashMap<u64, HashSet<u64>>,
    /// Callbacks to invoke once the transaction's commit can go ahead.
    pending_commit_callbacks: FxHashMap<u64, Vec<GenericCallback<TxnCompleteMsg>>>,
}
100
101impl<E: KvsEngine> IndexedDBEnvironment<E> {
102 fn new(
103 engine: E,
104 manager_sender: GenericSender<IndexedDBThreadMsg>,
105 ) -> IndexedDBEnvironment<E> {
106 IndexedDBEnvironment {
107 engine,
108 manager_sender,
109 transactions: FxHashMap::default(),
110 next_created_seq: 0,
111 txn_info: FxHashMap::default(),
112 queued_readwrite: VecDeque::new(),
113 queued_readonly: VecDeque::new(),
114 queued_readwrite_set: FxHashSet::default(),
115 queued_readonly_set: FxHashSet::default(),
116 running_readwrite: None,
117 running_readonly: HashSet::new(),
118 handled_next_unhandled_request_id: FxHashMap::default(),
119 handled_pending: FxHashMap::default(),
120 pending_commit_callbacks: FxHashMap::default(),
121 }
122 }
123
124 fn register_transaction(&mut self, txn: u64, mode: IndexedDBTxnMode, scope: Vec<String>) {
125 if self.txn_info.contains_key(&txn) {
126 return;
127 }
128 let created_seq = self.next_created_seq;
129 self.next_created_seq += 1;
130 self.txn_info.insert(
131 txn,
132 TxnInfo {
133 created_seq,
134 mode: mode.clone(),
135 scope: scope.into_iter().collect(),
136 live: true,
137 },
138 );
139 self.transactions
140 .entry(txn)
141 .or_insert_with(|| KvsTransaction {
142 requests: VecDeque::new(),
143 mode,
144 });
145 }
146
147 fn scopes_overlap(a: &TxnInfo, b: &TxnInfo) -> bool {
148 a.scope.iter().any(|store| b.scope.contains(store))
149 }
150
151 fn earlier_overlapping_live_exists<F>(&self, txn: u64, predicate: F) -> bool
152 where
153 F: Fn(&TxnInfo) -> bool,
154 {
155 let Some(current) = self.txn_info.get(&txn) else {
156 return false;
157 };
158 self.txn_info.iter().any(|(other_txn, other)| {
159 *other_txn != txn &&
160 other.live &&
161 other.created_seq < current.created_seq &&
162 Self::scopes_overlap(current, other) &&
163 predicate(other)
164 })
165 }
166
167 fn can_start_by_spec(&self, txn: u64) -> bool {
168 let Some(info) = self.txn_info.get(&txn) else {
169 return false;
170 };
171 match info.mode {
172 IndexedDBTxnMode::Readonly => !self.earlier_overlapping_live_exists(txn, |other| {
173 other.mode != IndexedDBTxnMode::Readonly
174 }),
175 IndexedDBTxnMode::Readwrite | IndexedDBTxnMode::Versionchange => {
176 !self.earlier_overlapping_live_exists(txn, |_other| true)
177 },
178 }
179 }
180
181 fn enqueue_txn(&mut self, txn: u64, mode: &IndexedDBTxnMode) {
182 match mode {
183 IndexedDBTxnMode::Readonly => {
184 if self.queued_readonly_set.insert(txn) {
185 self.queued_readonly.push_back(txn);
186 }
187 },
188 _ => {
189 if self.queued_readwrite_set.insert(txn) {
190 self.queued_readwrite.push_back(txn);
191 }
192 },
193 }
194 }
195
196 fn queue_operation(
197 &mut self,
198 store_name: &str,
199 serial_number: u64,
200 mode: IndexedDBTxnMode,
201 operation: AsyncOperation,
202 ) {
203 let mut enqueue_mode = None;
204 match self.transactions.entry(serial_number) {
205 Entry::Occupied(mut entry) => {
206 let transaction = entry.get_mut();
207 let transaction_mode = transaction.mode.clone();
208 let was_empty = transaction.requests.is_empty();
209 transaction.requests.push_back(KvsOperation {
210 operation,
211 store_name: String::from(store_name),
212 });
213 if was_empty {
214 enqueue_mode = Some(transaction_mode);
218 }
219 },
220 Entry::Vacant(entry) => {
221 entry
222 .insert(KvsTransaction {
223 requests: VecDeque::new(),
224 mode: mode.clone(),
225 })
226 .requests
227 .push_back(KvsOperation {
228 operation,
229 store_name: String::from(store_name),
230 });
231 enqueue_mode = Some(mode);
232 },
233 };
234 if let Some(mode) = enqueue_mode {
235 self.enqueue_txn(serial_number, &mode);
236 }
237 }
238
239 fn schedule_transactions(&mut self, origin: ImmutableOrigin, db_name: &str) {
241 let readonly_len = self.queued_readonly.len();
242 for _ in 0..readonly_len {
243 let Some(txn) = self.queued_readonly.pop_front() else {
244 break;
245 };
246
247 self.queued_readonly_set.remove(&txn);
248
249 let Some(transaction) = self.transactions.get(&txn) else {
250 continue;
251 };
252 if self.running_readonly.contains(&txn) {
253 if !transaction.requests.is_empty() && self.queued_readonly_set.insert(txn) {
254 self.queued_readonly.push_back(txn);
255 }
256 continue;
257 }
258 if transaction.requests.is_empty() {
259 continue;
260 }
261 if !self.can_start_by_spec(txn) {
262 if self.queued_readonly_set.insert(txn) {
263 self.queued_readonly.push_back(txn);
264 }
265 continue;
266 }
267
268 self.running_readonly.insert(txn);
271 self.start_transaction(origin.clone(), db_name.to_string(), txn, None);
272 }
273
274 if self.running_readwrite.is_some() {
276 return;
277 }
278 let readwrite_len = self.queued_readwrite.len();
279 for _ in 0..readwrite_len {
280 let Some(txn) = self.queued_readwrite.pop_front() else {
281 break;
282 };
283
284 self.queued_readwrite_set.remove(&txn);
285
286 let Some(transaction) = self.transactions.get(&txn) else {
287 continue;
288 };
289
290 if self.running_readwrite == Some(txn) {
291 if !transaction.requests.is_empty() && self.queued_readwrite_set.insert(txn) {
292 self.queued_readwrite.push_back(txn);
293 }
294 continue;
295 }
296
297 if transaction.requests.is_empty() {
298 continue;
299 }
300 if !self.can_start_by_spec(txn) {
301 if self.queued_readwrite_set.insert(txn) {
302 self.queued_readwrite.push_back(txn);
303 }
304 continue;
305 }
306
307 self.running_readwrite = Some(txn);
310 self.start_transaction(origin, db_name.to_string(), txn, None);
311 return;
312 }
313 }
314
315 fn start_transaction(
317 &mut self,
318 origin: ImmutableOrigin,
319 db_name: String,
320 txn: u64,
321 sender: Option<GenericSender<BackendResult<()>>>,
322 ) {
323 let (mode, requests) = match self.transactions.get_mut(&txn) {
328 Some(transaction) => {
329 let mode = transaction.mode.clone();
330 let requests = std::mem::take(&mut transaction.requests);
331 (mode, requests)
332 },
333 None => {
334 if let Some(sender) = sender {
336 let _ = sender.send(Ok(()));
337 }
338 return;
339 },
340 };
341
342 if requests.is_empty() {
343 if let Some(sender) = sender {
344 let _ = sender.send(Ok(()));
345 }
346 return;
349 }
350
351 let manager_sender = self.manager_sender.clone();
352 self.engine.process_transaction(
353 KvsTransaction { mode, requests },
354 Box::new(move || {
355 if let Err(err) = manager_sender.send(IndexedDBThreadMsg::EngineTxnBatchComplete {
359 origin,
360 db_name,
361 txn,
362 }) {
363 error!(
364 "Failed to send IndexedDBThreadMsg::EngineTxnBatchComplete: {:?}",
365 err
366 );
367 }
370
371 if let Some(sender) = sender {
374 let _ = sender.send(Ok(()));
375 }
376 }),
377 );
378 }
379
380 fn mark_request_handled(&mut self, txn: u64, request_id: u64) {
381 let current = self
382 .handled_next_unhandled_request_id
383 .get(&txn)
384 .copied()
385 .unwrap_or(0);
386 if request_id == current {
387 let mut next = current + 1;
388 if let Some(pending) = self.handled_pending.get_mut(&txn) {
389 while pending.remove(&next) {
390 next += 1;
391 }
392 if pending.is_empty() {
393 self.handled_pending.remove(&txn);
394 }
395 }
396 self.handled_next_unhandled_request_id.insert(txn, next);
397 } else if request_id > current {
398 self.handled_pending
399 .entry(txn)
400 .or_default()
401 .insert(request_id);
402 }
403 }
404
405 fn can_notify_txn_maybe_commit(&self, txn: u64) -> bool {
406 if self.running_readwrite == Some(txn) || self.running_readonly.contains(&txn) {
407 return false;
408 }
409 if self.queued_readonly_set.contains(&txn) || self.queued_readwrite_set.contains(&txn) {
411 return false;
412 }
413 match self.transactions.get(&txn) {
414 Some(t) => t.requests.is_empty(),
415 None => true,
416 }
417 }
418
    /// Returns `true` when `txn` both satisfies the ordering rules of `can_start_by_spec`
    /// and has no running, queued or pending work (`can_notify_txn_maybe_commit`).
    fn can_commit_now(&self, txn: u64) -> bool {
        self.can_start_by_spec(txn) && self.can_notify_txn_maybe_commit(txn)
    }
422
    /// Stores `callback` in the list of commit callbacks kept for `txn`, creating the list
    /// if needed. The list is later retrieved with `take_pending_commit_callbacks`.
    fn queue_pending_commit_callback(
        &mut self,
        txn: u64,
        callback: GenericCallback<TxnCompleteMsg>,
    ) {
        self.pending_commit_callbacks
            .entry(txn)
            .or_default()
            .push(callback);
    }
433
    /// Removes and returns the commit callbacks stored for `txn`; returns an empty vector
    /// when none are stored.
    fn take_pending_commit_callbacks(&mut self, txn: u64) -> Vec<GenericCallback<TxnCompleteMsg>> {
        self.pending_commit_callbacks
            .remove(&txn)
            .unwrap_or_default()
    }
439
440 fn maybe_commit_candidates(&self) -> Vec<u64> {
441 let mut candidates: Vec<(u64, u64)> = self
442 .txn_info
443 .iter()
444 .filter_map(|(txn, info)| {
445 if !info.live || !self.can_commit_now(*txn) {
446 return None;
447 }
448 Some((*txn, info.created_seq))
449 })
450 .collect();
451 candidates.sort_by_key(|(_, created_seq)| *created_seq);
452 candidates.into_iter().map(|(txn, _)| txn).collect()
453 }
454
455 fn finish_transaction(&mut self, txn: u64) {
456 if let Some(info) = self.txn_info.get_mut(&txn) {
457 info.live = false;
458 }
459 self.txn_info.remove(&txn);
460 self.transactions.remove(&txn);
461 self.queued_readonly.retain(|queued| *queued != txn);
462 self.queued_readwrite.retain(|queued| *queued != txn);
463 self.queued_readonly_set.remove(&txn);
464 self.queued_readwrite_set.remove(&txn);
465 if self.running_readwrite == Some(txn) {
466 self.running_readwrite = None;
467 }
468 self.running_readonly.remove(&txn);
469 self.handled_next_unhandled_request_id.remove(&txn);
470 self.handled_pending.remove(&txn);
471 self.pending_commit_callbacks.remove(&txn);
472 }
473
    /// Drops the pending requests, queue entries, running state, request-handled tracking
    /// and pending commit callbacks of `txn`.
    ///
    /// NOTE(review): unlike `finish_transaction`, the `txn_info` entry is left in place, so
    /// the transaction still counts as live for ordering purposes until it is finished.
    /// Presumably intentional; confirm with callers.
    fn abort_transaction(&mut self, txn: u64) {
        self.transactions.remove(&txn);
        self.queued_readonly.retain(|queued| *queued != txn);
        self.queued_readwrite.retain(|queued| *queued != txn);
        self.queued_readonly_set.remove(&txn);
        self.queued_readwrite_set.remove(&txn);
        if self.running_readwrite == Some(txn) {
            self.running_readwrite = None;
        }
        self.running_readonly.remove(&txn);
        self.handled_next_unhandled_request_id.remove(&txn);
        self.handled_pending.remove(&txn);
        self.pending_commit_callbacks.remove(&txn);
    }
490
    /// Forwards to the engine's `key_generator_current_number` for `store_name`.
    fn key_generator_current_number(&self, store_name: &str) -> Option<i32> {
        self.engine.key_generator_current_number(store_name)
    }
494
    /// Forwards to the engine's `key_path` for `store_name`.
    fn key_path(&self, store_name: &str) -> Option<KeyPath> {
        self.engine.key_path(store_name)
    }
498
    /// Returns the engine's object store names; an engine error is converted to its
    /// `Debug` string.
    fn object_store_names(&self) -> DbResult<Vec<String>> {
        self.engine
            .object_store_names()
            .map_err(|err| format!("{err:?}"))
    }
504
    /// Returns the indexes the engine reports for `store_name`; an engine error is
    /// converted to its `Debug` string.
    fn indexes(&self, store_name: &str) -> DbResult<Vec<IndexedDBIndex>> {
        self.engine
            .indexes(store_name)
            .map_err(|err| format!("{err:?}"))
    }
510
    /// Asks the engine to create index `index_name` on `store_name` with the given key
    /// path and `unique` / `multi_entry` flags; an engine error is converted to its
    /// `Debug` string.
    fn create_index(
        &self,
        store_name: &str,
        index_name: String,
        key_path: KeyPath,
        unique: bool,
        multi_entry: bool,
    ) -> DbResult<CreateObjectResult> {
        self.engine
            .create_index(store_name, index_name, key_path, unique, multi_entry)
            .map_err(|err| format!("{err:?}"))
    }
523
    /// Asks the engine to delete index `index_name` of `store_name`; an engine error is
    /// converted to its `Debug` string.
    fn delete_index(&self, store_name: &str, index_name: String) -> DbResult<()> {
        self.engine
            .delete_index(store_name, index_name)
            .map_err(|err| format!("{err:?}"))
    }
529
    /// Asks the engine to create object store `store_name` with the optional key path and
    /// `auto_increment` flag; an engine error is converted to its `Debug` string.
    fn create_object_store(
        &mut self,
        store_name: &str,
        key_path: Option<KeyPath>,
        auto_increment: bool,
    ) -> DbResult<CreateObjectResult> {
        self.engine
            .create_store(store_name, key_path, auto_increment)
            .map_err(|err| format!("{err:?}"))
    }
540
    /// Deletes object store `store_name` together with its indexes.
    ///
    /// Every index the engine reports for the store is deleted first, then the store
    /// itself. The first engine error aborts the sequence and is returned as its `Debug`
    /// string; indexes deleted before the failure stay deleted.
    fn delete_object_store(&mut self, store_name: &str) -> DbResult<()> {
        let indexes = self
            .engine
            .indexes(store_name)
            .map_err(|err| format!("{err:?}"))?;
        for index in indexes {
            self.engine
                .delete_index(store_name, index.name)
                .map_err(|err| format!("{err:?}"))?;
        }
        self.engine
            .delete_store(store_name)
            .map_err(|err| format!("{err:?}"))
    }
557
    /// Returns the version reported by the engine. Unlike the other delegating helpers,
    /// the engine's own error type is passed through unchanged.
    fn version(&self) -> Result<u64, E::Error> {
        self.engine.version()
    }
561
    /// Asks the engine to store `version`; an engine error is converted to its `Debug`
    /// string.
    fn set_version(&mut self, version: u64) -> DbResult<()> {
        self.engine
            .set_version(version)
            .map_err(|err| format!("{err:?}"))
    }
567}
568
569fn backend_error_from_sqlite_error(err: RusqliteError) -> BackendError {
570 if is_sqlite_disk_full_error(&err) {
571 BackendError::QuotaExceeded
572 } else {
573 BackendError::DbErr(format!("{err:?}"))
574 }
575}
576
/// An entry in a database's connection queue: either a request to open a connection or a
/// request to delete the database.
#[derive(MallocSizeOf)]
enum OpenRequest {
    /// Request to open a connection.
    Open {
        /// Callback used to send `ConnectionMsg`s back to the requester.
        sender: GenericCallback<ConnectionMsg>,

        /// Name of the database to open.
        db_name: String,

        /// Requested version, if one was given.
        version: Option<u64>,

        /// `false` until the request has been processed; an unprocessed request is pending.
        processed: bool,

        /// Details of the version upgrade in progress for this request, if any.
        pending_upgrade: Option<VersionUpgrade>,

        /// Ids this request is still waiting on before it can proceed
        /// (presumably connections that have yet to close; confirm against the code
        /// that fills this set).
        pending_close: HashSet<Uuid>,

        /// Ids of connections from which a "version change done" notification is still
        /// expected.
        pending_versionchange: HashSet<Uuid>,

        /// Id of this request.
        id: Uuid,

        /// Client-storage proxy data associated with the request.
        proxy_map: StorageProxyMap,
    },
    /// Request to delete a database.
    Delete {
        /// Callback that receives the result of the deletion.
        sender: GenericCallback<BackendResult<u64>>,

        /// Origin of the request (currently unused, hence the underscore).
        _origin: ImmutableOrigin,

        /// Name of the database to delete.
        db_name: String,

        /// `false` until the request has been processed; an unprocessed request is pending.
        processed: bool,

        /// Id of this request.
        id: Uuid,

        /// Client-storage proxy data associated with the request.
        proxy_map: StorageProxyMap,
    },
}
632
633impl OpenRequest {
634 fn get_id(&self) -> Uuid {
635 let id = match self {
636 OpenRequest::Open {
637 sender: _,
638 db_name: _,
639 version: _,
640 processed: _,
641 pending_upgrade: _,
642 pending_close: _,
643 pending_versionchange: _,
644 id,
645 proxy_map: _,
646 } => id,
647 OpenRequest::Delete {
648 sender: _,
649 _origin: _,
650 db_name: _,
651 processed: _,
652 proxy_map: _,
653 id,
654 } => id,
655 };
656 *id
657 }
658
659 fn is_open(&self) -> bool {
660 match self {
661 OpenRequest::Open {
662 sender: _,
663 db_name: _,
664 version: _,
665 processed: _,
666 pending_upgrade: _,
667 pending_close: _,
668 pending_versionchange: _,
669 id: _,
670 proxy_map: _,
671 } => true,
672 OpenRequest::Delete {
673 sender: _,
674 _origin: _,
675 db_name: _,
676 processed: _,
677 proxy_map: _,
678 id: _,
679 } => false,
680 }
681 }
682
683 fn is_pending(&self) -> bool {
686 match self {
687 OpenRequest::Open {
688 sender: _,
689 db_name: _,
690 version: _,
691 processed,
692 pending_upgrade,
693 pending_close,
694 pending_versionchange,
695 id: _,
696 proxy_map: _,
697 } => {
698 !processed ||
699 pending_upgrade.is_some() ||
700 !pending_close.is_empty() ||
701 !pending_versionchange.is_empty()
702 },
703 OpenRequest::Delete {
704 sender: _,
705 _origin: _,
706 db_name: _,
707 processed,
708 id: _,
709 proxy_map: _,
710 } => !processed,
711 }
712 }
713
714 fn abort(&self) -> Option<u64> {
717 match self {
718 OpenRequest::Open {
719 sender,
720 db_name,
721 version: _,
722 processed: _,
723 pending_close: _,
724 pending_versionchange: _,
725 pending_upgrade,
726 id,
727 proxy_map: _,
728 } => {
729 if sender
730 .send(ConnectionMsg::AbortError {
731 name: db_name.clone(),
732 id: *id,
733 })
734 .is_err()
735 {
736 error!("Failed to send ConnectionMsg::Connection to script.");
737 };
738 pending_upgrade.as_ref().map(|upgrade| upgrade.old)
739 },
740 OpenRequest::Delete {
741 sender,
742 _origin: _,
743 db_name: _,
744 processed: _,
745 id: _,
746 proxy_map: _,
747 } => {
748 if sender.send(Err(BackendError::DbNotFound)).is_err() {
749 error!("Failed to send result of database delete to script.");
750 };
751 None
752 },
753 }
754 }
755}
756
/// Bookkeeping for a version upgrade that is in progress for an open request.
#[derive(MallocSizeOf)]
struct VersionUpgrade {
    /// Version the database had before the upgrade started.
    old: u64,
    /// Version the database is being upgraded to.
    new: u64,
    /// Id of the upgrade transaction.
    transaction: u64,
}
763
/// State kept for one connection to a database.
#[derive(MallocSizeOf)]
struct Connection {
    /// When `true`, the connection is skipped when `TxnMaybeCommit` messages are dispatched.
    close_pending: bool,

    /// Callback used to send `ConnectionMsg`s to the connection.
    sender: GenericCallback<ConnectionMsg>,
}
773
/// Owns all open databases and processes the messages sent to the IndexedDB thread.
struct IndexedDBManager {
    /// Receiving end of the thread's message channel.
    port: GenericReceiver<IndexedDBThreadMsg>,
    /// Sender for the same channel; handed to database environments so they can report back.
    manager_sender: GenericSender<IndexedDBThreadMsg>,
    /// Open databases, keyed by origin and name.
    databases: HashMap<IndexedDBDescription, IndexedDBEnvironment<SqliteEngine>>,
    /// Thread pool, initialised from `ThreadPool::global()`.
    thread_pool: Arc<ThreadPool>,

    /// Counter from which ids for upgrade transactions are allocated.
    serial_number_counter: u64,

    /// Per-database queue of open and delete requests; the front entry is the one being
    /// processed.
    connection_queues: HashMap<IndexedDBDescription, VecDeque<OpenRequest>>,

    /// Per-database connections, keyed by connection id.
    connections: HashMap<IndexedDBDescription, HashMap<Uuid, Connection>>,
}
792
793impl IndexedDBManager {
794 fn new(
795 port: GenericReceiver<IndexedDBThreadMsg>,
796 manager_sender: GenericSender<IndexedDBThreadMsg>,
797 ) -> IndexedDBManager {
798 debug!("New indexedDBManager");
799
800 IndexedDBManager {
801 port,
802 manager_sender,
803 databases: HashMap::new(),
804 thread_pool: ThreadPool::global(),
805 serial_number_counter: 0,
806 connection_queues: Default::default(),
807 connections: Default::default(),
808 }
809 }
810}
811
812impl IndexedDBManager {
    /// Runs the manager's message loop until the channel disconnects or an
    /// `Exit` operation is received.
    ///
    /// * `Sync` operations are forwarded to `handle_sync_operation`.
    /// * `Async` operations are queued on their database and scheduling is re-run.
    /// * `EngineTxnBatchComplete` clears the transaction's running state, re-runs
    ///   scheduling and, if the transaction has no work left, triggers `TxnMaybeCommit`.
    /// * `CollectMemoryReport` answers with the manager's memory reports.
    fn start(&mut self) {
        loop {
            let message = match self.port.recv() {
                Ok(msg) => msg,
                Err(ReceiveError::Disconnected) => {
                    break;
                },
                // Any other receive error is logged and the loop keeps going.
                Err(e) => {
                    warn!("Error in IndexedDB thread: {e:?}");
                    continue;
                },
            };
            match message {
                IndexedDBThreadMsg::Sync(SyncOperation::Exit(sender)) => {
                    // Acknowledge before leaving the loop.
                    let _ = sender.send(());
                    break;
                },
                IndexedDBThreadMsg::Sync(operation) => {
                    self.handle_sync_operation(operation);
                },
                IndexedDBThreadMsg::Async(
                    origin,
                    db_name,
                    store_name,
                    txn,
                    _request_id,
                    mode,
                    operation,
                ) => {
                    // Operations for a database that is not open are dropped.
                    if let Some(db) = self.get_database_mut(origin.clone(), db_name.clone()) {
                        db.queue_operation(&store_name, txn, mode, operation);
                        db.schedule_transactions(origin, &db_name);
                    }
                },
                IndexedDBThreadMsg::EngineTxnBatchComplete {
                    origin,
                    db_name,
                    txn,
                } => {
                    let should_notify =
                        if let Some(db) = self.get_database_mut(origin.clone(), db_name.clone()) {
                            let mode = db.transactions.get(&txn).map(|t| t.mode.clone());

                            // Release the running slot that the finished batch occupied.
                            match mode {
                                Some(IndexedDBTxnMode::Readonly) => {
                                    db.running_readonly.remove(&txn);
                                },
                                Some(_) if db.running_readwrite == Some(txn) => {
                                    db.running_readwrite = None;
                                },
                                Some(_) => {},
                                // The transaction is no longer tracked; nothing to release.
                                None => {
                                },
                            }

                            // A freed slot may allow other queued transactions to start.
                            db.schedule_transactions(origin.clone(), &db_name);
                            db.can_notify_txn_maybe_commit(txn)
                        } else {
                            false
                        };
                    if should_notify {
                        self.handle_sync_operation(SyncOperation::TxnMaybeCommit {
                            origin,
                            db_name,
                            txn,
                        });
                    }
                },
                IndexedDBThreadMsg::CollectMemoryReport(sender) => {
                    let reports = self.collect_memory_reports();
                    sender.send(ProcessReports::new(reports));
                },
            }
        }
    }
895
896 fn dispatch_txn_maybe_commit(&self, origin: ImmutableOrigin, db_name: String, txn: u64) {
897 let key = IndexedDBDescription {
898 origin,
899 name: db_name.clone(),
900 };
901 let Some(connections) = self.connections.get(&key) else {
902 return;
903 };
904 for connection in connections.values() {
905 if connection.close_pending {
906 continue;
907 }
908 let _ = connection.sender.send(ConnectionMsg::TxnMaybeCommit {
909 db_name: db_name.clone(),
910 txn,
911 });
912 }
913 }
914
915 fn handle_txn_maybe_commit(&mut self, origin: ImmutableOrigin, db_name: String, txn: u64) {
916 let key = IndexedDBDescription {
917 origin: origin.clone(),
918 name: db_name.clone(),
919 };
920 let callbacks = {
921 let Some(db) = self.databases.get_mut(&key) else {
922 return;
923 };
924 if !db.can_commit_now(txn) {
925 return;
926 }
927 db.take_pending_commit_callbacks(txn)
928 };
929
930 if callbacks.is_empty() {
931 self.dispatch_txn_maybe_commit(origin, db_name, txn);
932 return;
933 }
934
935 for callback in callbacks {
936 if callback
937 .send(TxnCompleteMsg {
938 origin: origin.clone(),
939 db_name: db_name.clone(),
940 txn,
941 result: Ok(()),
942 })
943 .is_err()
944 {
945 error!(
946 "Failed to send deferred commit completion for db '{}' txn {}.",
947 db_name, txn
948 );
949 }
950 }
951 }
952
    /// Handles the end of upgrade transaction `txn` for the database `name` at `origin`.
    ///
    /// In both cases the front of the connection queue must be an `Open` request whose
    /// pending upgrade uses `txn`; otherwise the call is ignored.
    ///
    /// * `committed == true`: the request is removed from the queue, the requester receives
    ///   `ConnectionMsg::Connection` (with `upgraded: true` and the new version) and the
    ///   queue is advanced.
    /// * `committed == false`: the pending upgrade is aborted via `abort_pending_upgrade`.
    fn handle_upgrade_transaction_finished(
        &mut self,
        name: String,
        origin: ImmutableOrigin,
        txn: u64,
        committed: bool,
    ) {
        let key = IndexedDBDescription {
            name: name.clone(),
            origin: origin.clone(),
        };

        if committed {
            let Some(queue) = self.connection_queues.get_mut(&key) else {
                return debug_assert!(false, "A connection queue should exist.");
            };
            let Some(front) = queue.front() else {
                return debug_assert!(false, "A pending open request should exist.");
            };
            // Only act when the front request is the one waiting on this transaction.
            let OpenRequest::Open {
                pending_upgrade: Some(pending_upgrade),
                ..
            } = front
            else {
                return;
            };
            if pending_upgrade.transaction != txn {
                return;
            }

            let Some(open_request) = queue.pop_front() else {
                return;
            };
            let OpenRequest::Open {
                sender,
                db_name,
                version: _,
                processed: _,
                pending_upgrade: Some(pending_upgrade),
                pending_close: _,
                pending_versionchange: _,
                id,
                proxy_map: _,
            } = open_request
            else {
                return;
            };
            let VersionUpgrade { new, .. } = pending_upgrade;
            // A failure to list the stores results in an empty list being reported.
            let object_store_names = self
                .databases
                .get(&key)
                .and_then(|db| db.object_store_names().ok())
                .unwrap_or_default();
            if sender
                .send(ConnectionMsg::Connection {
                    id,
                    name: db_name,
                    version: new,
                    upgraded: true,
                    object_store_names,
                })
                .is_err()
            {
                error!("Failed to send ConnectionMsg::Connection to script.");
            };

            self.advance_connection_queue(key);
            return;
        }

        // Not committed: collect what is needed to abort, then release the queue borrow.
        let (request_id, proxy_map, db_name) = {
            let Some(queue) = self.connection_queues.get_mut(&key) else {
                return debug_assert!(false, "A connection queue should exist.");
            };
            let Some(front) = queue.front() else {
                return debug_assert!(false, "A pending open request should exist.");
            };
            let OpenRequest::Open {
                pending_upgrade: Some(pending_upgrade),
                id,
                proxy_map,
                db_name,
                ..
            } = front
            else {
                return;
            };
            if pending_upgrade.transaction != txn {
                return;
            }
            (*id, (*proxy_map).clone(), db_name.clone())
        };

        self.abort_pending_upgrade(name, request_id, origin, db_name, &proxy_map);
    }
1049
1050 fn advance_connection_queue(&mut self, key: IndexedDBDescription) {
1052 loop {
1053 let is_open = {
1054 let Some(queue) = self.connection_queues.get_mut(&key) else {
1055 return;
1056 };
1057 if queue.is_empty() {
1058 return;
1059 }
1060 queue.front().expect("Queue is not empty.").is_open()
1061 };
1062
1063 if is_open {
1064 self.open_database(key.clone());
1065 } else {
1066 self.delete_database(key.clone());
1067 }
1068
1069 let was_pruned = self.maybe_remove_front_from_queue(&key);
1070
1071 if !was_pruned {
1072 break;
1077 }
1078 }
1079 }
1080
1081 fn maybe_remove_front_from_queue(&mut self, key: &IndexedDBDescription) -> bool {
1083 let (is_empty, was_pruned) = {
1084 let Some(queue) = self.connection_queues.get_mut(key) else {
1085 debug_assert!(false, "A connection queue should exist.");
1086 return false;
1087 };
1088 let mut pruned = false;
1089 let front_is_pending = queue.front().map(|record| record.is_pending());
1090 if let Some(is_pending) = front_is_pending &&
1091 !is_pending
1092 {
1093 queue.pop_front().expect("Queue has a non-pending item.");
1094 pruned = true
1095 }
1096 (queue.is_empty(), pruned)
1097 };
1098 if is_empty {
1099 self.connection_queues.remove(key);
1100 }
1101 was_pruned
1102 }
1103
1104 fn remove_connection(&mut self, key: &IndexedDBDescription, id: &Uuid) {
1105 let is_empty = {
1106 let Some(connections) = self.connections.get_mut(key) else {
1107 return debug!("Connection already removed.");
1108 };
1109 connections.remove(id);
1110 connections.is_empty()
1111 };
1112
1113 if is_empty {
1114 self.connections.remove(key);
1115 }
1116 }
1117
1118 fn revert_aborted_upgrade(
1130 &mut self,
1131 key: &IndexedDBDescription,
1132 old_version: u64,
1133 db_name: String,
1134 proxy_map: &StorageProxyMap,
1135 ) {
1136 if old_version == 0 {
1137 if let Some(db) = self.databases.remove(key) {
1138 drop(db);
1141 let response = proxy_map
1142 .handle
1143 .delete_database(proxy_map.bottle_id, db_name.clone())
1144 .recv();
1145 if response.is_err() {
1146 error!("Failed to communicate with client storage.");
1147 return;
1148 }
1149 if response.unwrap().is_err() {
1150 error!("Failed to delete database {:?}", db_name);
1151 }
1152 }
1153 return;
1154 }
1155
1156 let Some(db) = self.databases.get_mut(key) else {
1157 return debug_assert!(false, "Db should have been created");
1158 };
1159 let res = db.set_version(old_version);
1160 debug_assert!(res.is_ok(), "Setting a db version should not fail.");
1161 }
1162
1163 fn abort_pending_upgrade(
1167 &mut self,
1168 name: String,
1169 id: Uuid,
1170 origin: ImmutableOrigin,
1171 db_name: String,
1172 proxy_map: &StorageProxyMap,
1173 ) {
1174 let key = IndexedDBDescription { name, origin };
1175 let old = {
1176 let Some(queue) = self.connection_queues.get_mut(&key) else {
1177 return debug_assert!(
1178 false,
1179 "There should be a connection queue for the aborted upgrade."
1180 );
1181 };
1182 let Some(open_request) = queue.pop_front() else {
1183 return debug_assert!(false, "There should be an open request to upgrade.");
1184 };
1185 if open_request.get_id() != id {
1186 return debug_assert!(
1187 false,
1188 "Open request to abort should be at the head of the queue."
1189 );
1190 }
1191 open_request.abort()
1192 };
1193 if let Some(old_version) = old {
1194 self.revert_aborted_upgrade(&key, old_version, db_name, proxy_map);
1195 }
1196
1197 self.remove_connection(&key, &id);
1198
1199 self.advance_connection_queue(key);
1200 }
1201
1202 fn abort_pending_upgrades(
1206 &mut self,
1207 pending_upgrades: HashMap<String, HashSet<Uuid>>,
1208 origin: ImmutableOrigin,
1209 proxy_map: StorageProxyMap,
1210 ) {
1211 for (name, ids) in pending_upgrades.into_iter() {
1212 let mut version_to_revert: Option<u64> = None;
1213 let key = IndexedDBDescription {
1214 name: name.clone(),
1215 origin: origin.clone(),
1216 };
1217 for id in ids.iter() {
1218 self.remove_connection(&key, id);
1219 }
1220 {
1221 let is_empty = {
1222 let Some(queue) = self.connection_queues.get_mut(&key) else {
1223 continue;
1224 };
1225 queue.retain_mut(|open_request| {
1226 if ids.contains(&open_request.get_id()) {
1227 let old = open_request.abort();
1228 if version_to_revert.is_none() &&
1229 let Some(old) = old
1230 {
1231 version_to_revert = Some(old);
1232 }
1233 false
1234 } else {
1235 true
1236 }
1237 });
1238 queue.is_empty()
1239 };
1240 if is_empty {
1241 self.connection_queues.remove(&key);
1242 }
1243 }
1244 if let Some(version) = version_to_revert {
1245 self.revert_aborted_upgrade(&key, version, name, &proxy_map);
1246 }
1247 }
1248 }
1249
1250 fn open_a_database_connection(
1252 &mut self,
1253 sender: GenericCallback<ConnectionMsg>,
1254 origin: ImmutableOrigin,
1255 db_name: String,
1256 version: Option<u64>,
1257 id: Uuid,
1258 proxy_map: StorageProxyMap,
1259 ) {
1260 let key = IndexedDBDescription {
1261 name: db_name.clone(),
1262 origin,
1263 };
1264 let open_request = OpenRequest::Open {
1265 sender,
1266 db_name,
1267 version,
1268 processed: false,
1269 pending_close: Default::default(),
1270 pending_versionchange: Default::default(),
1271 pending_upgrade: None,
1272 id,
1273 proxy_map,
1274 };
1275 let should_continue = {
1276 let queue = self.connection_queues.entry(key.clone()).or_default();
1278
1279 queue.push_back(open_request);
1281 queue.len() == 1
1282 };
1283
1284 if should_continue {
1286 self.open_database(key.clone());
1287 self.maybe_remove_front_from_queue(&key);
1288 }
1289 }
1290
1291 fn get_database(
1292 &self,
1293 origin: ImmutableOrigin,
1294 db_name: String,
1295 ) -> Option<&IndexedDBEnvironment<SqliteEngine>> {
1296 let idb_description = IndexedDBDescription {
1297 origin,
1298 name: db_name,
1299 };
1300
1301 self.databases.get(&idb_description)
1302 }
1303
1304 fn get_database_mut(
1305 &mut self,
1306 origin: ImmutableOrigin,
1307 db_name: String,
1308 ) -> Option<&mut IndexedDBEnvironment<SqliteEngine>> {
1309 let idb_description = IndexedDBDescription {
1310 origin,
1311 name: db_name,
1312 };
1313
1314 self.databases.get_mut(&idb_description)
1315 }
1316
    /// Starts a version upgrade to `new_version` for the open request at the front of the
    /// queue for `key`.
    ///
    /// Allocates an id for the upgrade transaction, registers it as a `Versionchange`
    /// transaction whose scope is every current object store, stores the new version,
    /// records the upgrade on the request (marking it processed) and sends
    /// `ConnectionMsg::Upgrade` to the requester.
    ///
    /// Does nothing if the front request is not an `Open` request.
    ///
    /// # Panics
    ///
    /// Panics if the database is not open, or if reading the object store names, reading
    /// the current version or setting the new version fails.
    fn upgrade_database(&mut self, key: IndexedDBDescription, new_version: u64) {
        let Some(queue) = self.connection_queues.get_mut(&key) else {
            return debug_assert!(false, "A connection queue should exist.");
        };
        let Some(open_request) = queue.front_mut() else {
            return debug_assert!(false, "An open request should be in the queue.");
        };
        let OpenRequest::Open {
            sender,
            db_name,
            version: _,
            processed,
            id,
            pending_close: _,
            pending_versionchange: _,
            pending_upgrade,
            proxy_map: _,
        } = open_request
        else {
            return;
        };

        let db = self
            .databases
            .get_mut(&key)
            .expect("Db should have been opened.");

        // Allocate the id of the upgrade transaction.
        let transaction = self.serial_number_counter;
        self.serial_number_counter += 1;

        // The upgrade transaction's scope is every object store that currently exists.
        let scope = db
            .object_store_names()
            .expect("Fetching object store names should not fail.");

        db.register_transaction(transaction, IndexedDBTxnMode::Versionchange, scope.clone());

        // Remember the current version so an aborted upgrade can be reverted.
        let old_version = db.version().expect("DB should have a version.");

        db.set_version(new_version)
            .expect("Setting the version should not fail");

        *processed = true;
        let _ = pending_upgrade.insert(VersionUpgrade {
            old: old_version,
            new: new_version,
            transaction,
        });

        if sender
            .send(ConnectionMsg::Upgrade {
                id: *id,
                name: db_name.clone(),
                version: new_version,
                old_version,
                transaction,
                object_store_names: scope,
            })
            .is_err()
        {
            error!("Couldn't queue task for indexeddb upgrade event.");
        }

    }
1400
1401 fn handle_version_change_done(
1403 &mut self,
1404 name: String,
1405 from_id: Uuid,
1406 old_version: u64,
1407 origin: ImmutableOrigin,
1408 ) {
1409 let key = IndexedDBDescription {
1410 name: name.clone(),
1411 origin,
1412 };
1413 let (can_upgrade, version) = {
1414 let Some(queue) = self.connection_queues.get_mut(&key) else {
1415 return debug_assert!(false, "A connection queue should exist.");
1416 };
1417 let Some(open_request) = queue.front_mut() else {
1418 return debug_assert!(false, "An open request should be in the queue.");
1419 };
1420 let OpenRequest::Open {
1421 sender,
1422 db_name: _,
1423 version,
1424 id,
1425 pending_upgrade: _,
1426 processed: _,
1427 pending_versionchange,
1428 pending_close,
1429 proxy_map: _,
1430 } = open_request
1431 else {
1432 return debug_assert!(
1433 false,
1434 "An request to open a connection should be in the queue."
1435 );
1436 };
1437 debug_assert!(
1438 pending_versionchange.contains(&from_id),
1439 "The open request should be pending on the versionchange event for the connection sending the message."
1440 );
1441
1442 pending_versionchange.remove(&from_id);
1443
1444 if !pending_versionchange.is_empty() {
1446 return;
1447 }
1448
1449 let Some(version) = *version else {
1450 return debug_assert!(
1451 false,
1452 "An upgrade version should have been determined by now."
1453 );
1454 };
1455
1456 if !pending_close.is_empty() &&
1460 sender
1461 .send(ConnectionMsg::Blocked {
1462 name,
1463 id: *id,
1464 version,
1465 old_version,
1466 })
1467 .is_err()
1468 {
1469 return debug!("Script exit during indexeddb database open");
1470 }
1471
1472 (pending_close.is_empty(), version)
1473 };
1474
1475 if can_upgrade {
1478 self.upgrade_database(key.clone(), version);
1480
1481 let was_pruned = self.maybe_remove_front_from_queue(&key);
1482 if was_pruned {
1483 self.advance_connection_queue(key);
1484 }
1485 }
1486 }
1487
1488 fn open_database(&mut self, key: IndexedDBDescription) {
1491 let Some(queue) = self.connection_queues.get_mut(&key) else {
1492 return debug_assert!(false, "A connection queue should exist.");
1493 };
1494 let Some(open_request) = queue.front_mut() else {
1495 return debug_assert!(false, "An open request should be in the queue.");
1496 };
1497 let OpenRequest::Open {
1498 sender,
1499 db_name,
1500 version,
1501 id,
1502 processed,
1503 pending_upgrade: _pending_upgrade,
1504 pending_close,
1505 pending_versionchange,
1506 proxy_map,
1507 } = open_request
1508 else {
1509 return debug_assert!(
1510 false,
1511 "An request to open a connection should be in the queue."
1512 );
1513 };
1514
1515 let requested_version = *version;
1516
1517 let db_version = match self.databases.entry(key.clone()) {
1519 Entry::Vacant(e) => {
1520 let Ok(response) = proxy_map
1529 .handle
1530 .create_database(proxy_map.bottle_id, db_name.clone())
1531 .recv()
1532 else {
1533 if let Err(e) = sender.send(ConnectionMsg::DatabaseError {
1534 id: *id,
1535 name: db_name.clone(),
1536 error: BackendError::DbErr(
1537 "Failed to communicate with client storage.".to_string(),
1538 ),
1539 }) {
1540 debug!("Script exit during indexeddb database open {:?}", e);
1541 }
1542 return;
1543 };
1544 let (path, created) = match response {
1545 Ok((path, created)) => (path, created),
1546 Err(err) => {
1547 if let Err(e) = sender.send(ConnectionMsg::DatabaseError {
1548 id: *id,
1549 name: db_name.clone(),
1550 error: BackendError::DbErr(format!("{err:?}")),
1551 }) {
1552 debug!("Script exit during indexeddb database open {:?}", e);
1553 }
1554 return;
1555 },
1556 };
1557 let engine = match SqliteEngine::new(path, created, &key, self.thread_pool.clone())
1558 {
1559 Ok(engine) => engine,
1560 Err(err) => {
1561 let error = backend_error_from_sqlite_error(err);
1562 if let Err(e) = sender.send(ConnectionMsg::DatabaseError {
1563 id: *id,
1564 name: db_name.clone(),
1565 error,
1566 }) {
1567 debug!("Script exit during indexeddb database open {:?}", e);
1568 }
1569 *processed = true;
1570 return;
1571 },
1572 };
1573 let created_db_path = engine.created_db_path();
1574 let db = IndexedDBEnvironment::new(engine, self.manager_sender.clone());
1575 let db_version = match db.version() {
1576 Ok(version) => version,
1577 Err(err) => {
1578 let error = backend_error_from_sqlite_error(err);
1579 if let Err(e) = sender.send(ConnectionMsg::DatabaseError {
1580 id: *id,
1581 name: db_name.clone(),
1582 error,
1583 }) {
1584 debug!("Script exit during indexeddb database open {:?}", e);
1585 }
1586 *processed = true;
1587 return;
1588 },
1589 };
1590
1591 *version = if created_db_path {
1592 Some(requested_version.unwrap_or(1))
1593 } else {
1594 Some(requested_version.unwrap_or(db_version))
1595 };
1596
1597 e.insert(db);
1598 db_version
1599 },
1600 Entry::Occupied(db) => {
1601 let db_version = match db.get().version() {
1602 Ok(version) => version,
1603 Err(err) => {
1604 let error = backend_error_from_sqlite_error(err);
1605 if let Err(e) = sender.send(ConnectionMsg::DatabaseError {
1606 id: *id,
1607 name: db_name.clone(),
1608 error,
1609 }) {
1610 debug!("Script exit during indexeddb database open {:?}", e);
1611 }
1612 *processed = true;
1613 return;
1614 },
1615 };
1616 *version = Some(requested_version.unwrap_or(db_version));
1618 db_version
1619 },
1620 };
1621
1622 let Some(version) = *version else {
1623 return debug_assert!(
1624 false,
1625 "An upgrade version should have been determined by now."
1626 );
1627 };
1628
1629 if version < db_version {
1633 if sender
1634 .send(ConnectionMsg::VersionError {
1635 name: db_name.clone(),
1636 id: *id,
1637 })
1638 .is_err()
1639 {
1640 debug!("Script exit during indexeddb database open");
1641 }
1642 *processed = true;
1643 return;
1644 }
1645
1646 let connection = Connection {
1649 close_pending: false,
1650 sender: sender.clone(),
1651 };
1652 let entry = self.connections.entry(key.clone()).or_default();
1653 entry.insert(*id, connection);
1654
1655 if db_version < version {
1657 let open_connections = entry
1660 .iter_mut()
1661 .filter(|(other_id, conn)| !conn.close_pending && *other_id != id);
1662 for (id_to_close, conn) in open_connections {
1663 if conn
1667 .sender
1668 .send(ConnectionMsg::VersionChange {
1669 name: db_name.clone(),
1670 id: *id_to_close,
1671 version,
1672 old_version: db_version,
1673 })
1674 .is_err()
1675 {
1676 error!("Failed to send ConnectionMsg::Connection to script.");
1677 };
1678 pending_close.insert(*id_to_close);
1679 pending_versionchange.insert(*id_to_close);
1680 }
1681 if !pending_close.is_empty() {
1682 return;
1684 }
1685
1686 self.upgrade_database(key, version);
1688 return;
1689 }
1690
1691 let object_store_names = self
1693 .databases
1694 .get(&key)
1695 .and_then(|db| db.object_store_names().ok())
1696 .unwrap_or_default();
1697 *processed = true;
1698 if sender
1699 .send(ConnectionMsg::Connection {
1700 name: db_name.clone(),
1701 id: *id,
1702 version: db_version,
1703 upgraded: false,
1704 object_store_names,
1705 })
1706 .is_err()
1707 {
1708 error!("Failed to send ConnectionMsg::Connection to script.");
1709 };
1710 }
1711
1712 fn start_delete_database(
1715 &mut self,
1716 key: IndexedDBDescription,
1717 id: Uuid,
1718 proxy_map: StorageProxyMap,
1719 sender: GenericCallback<BackendResult<u64>>,
1720 ) {
1721 let open_request = OpenRequest::Delete {
1722 sender,
1723 _origin: key.origin.clone(),
1724 db_name: key.name.clone(),
1725 processed: false,
1726 proxy_map,
1727 id,
1728 };
1729
1730 let should_continue = {
1731 let queue = self.connection_queues.entry(key.clone()).or_default();
1733
1734 queue.push_back(open_request);
1736 queue.len() == 1
1737 };
1738
1739 if should_continue {
1741 self.delete_database(key.clone());
1742 self.maybe_remove_front_from_queue(&key);
1743 }
1744 }
1745
1746 fn delete_database(&mut self, key: IndexedDBDescription) {
1748 let Some(queue) = self.connection_queues.get_mut(&key) else {
1749 return debug_assert!(false, "A connection queue should exist.");
1750 };
1751 let Some(open_request) = queue.front_mut() else {
1752 return debug_assert!(false, "An open request should be in the queue.");
1753 };
1754 let OpenRequest::Delete {
1755 sender,
1756 _origin: _,
1757 db_name,
1758 processed,
1759 id: _,
1760 proxy_map,
1761 } = open_request
1762 else {
1763 return debug_assert!(
1764 false,
1765 "An request to open a connection should be in the queue."
1766 );
1767 };
1768
1769 let version = if let Some(db) = self.databases.remove(&key) {
1771 let res = db.version();
1784 let Ok(version) = res else {
1785 *processed = true;
1786 if sender
1787 .send(BackendResult::Err(BackendError::DbErr(
1788 res.unwrap_err().to_string(),
1789 )))
1790 .is_err()
1791 {
1792 debug!("Script went away during pending database delete.");
1793 }
1794 return;
1795 };
1796
1797 drop(db);
1800
1801 let Ok(response) = proxy_map
1805 .handle
1806 .delete_database(proxy_map.bottle_id, db_name.clone())
1807 .recv()
1808 else {
1809 if sender
1810 .send(BackendResult::Err(BackendError::DbErr(
1811 "Failed to communicate with client storage.".to_string(),
1812 )))
1813 .is_err()
1814 {
1815 debug!("Script went away during pending database delete.");
1816 }
1817 return;
1818 };
1819 if let Err(err) = response {
1820 if sender
1821 .send(BackendResult::Err(BackendError::DbErr(format!(
1822 "Client storage error: {err:?}"
1823 ))))
1824 .is_err()
1825 {
1826 debug!("Script went away during pending database delete.");
1827 }
1828 return;
1829 }
1830 version
1831 } else {
1832 0
1833 };
1834
1835 if sender.send(BackendResult::Ok(version)).is_err() {
1837 debug!("Script went away during pending database delete.");
1838 }
1839
1840 *processed = true;
1841 }
1842
1843 fn close_database(&mut self, origin: ImmutableOrigin, id: Uuid, name: String) {
1845 let key = IndexedDBDescription { origin, name };
1862 let (can_upgrade, version) = {
1863 self.remove_connection(&key, &id);
1864
1865 let Some(queue) = self.connection_queues.get_mut(&key) else {
1866 return;
1867 };
1868 let Some(open_request) = queue.front_mut() else {
1869 return;
1870 };
1871 if let OpenRequest::Open {
1872 sender: _,
1873 db_name: _,
1874 version,
1875 id: _,
1876 processed: _,
1877 pending_upgrade,
1878 pending_versionchange,
1879 pending_close,
1880 proxy_map: _,
1881 } = open_request
1882 {
1883 pending_close.remove(&id);
1884 (
1885 pending_close.is_empty() &&
1887 pending_versionchange.is_empty() &&
1888 !pending_upgrade.is_some(),
1889 *version,
1890 )
1891 } else {
1892 (false, None)
1893 }
1894 };
1895
1896 if can_upgrade {
1902 let Some(version) = version else {
1904 return debug_assert!(
1905 false,
1906 "An upgrade version should have been determined by now."
1907 );
1908 };
1909 self.upgrade_database(key.clone(), version);
1910
1911 let was_pruned = self.maybe_remove_front_from_queue(&key);
1912 if was_pruned {
1913 self.advance_connection_queue(key);
1914 }
1915 }
1916 }
1917
1918 fn handle_sync_operation(&mut self, operation: SyncOperation) {
1919 match operation {
1920 SyncOperation::GetDatabases(sender, origin) => {
1921 let info_list: Vec<DatabaseInfo> = self
1932 .databases
1933 .iter()
1934 .filter_map(|(description, info)| {
1935 if let Ok(version) = info.version() {
1937 if version == 0 {
1939 None
1940 } else {
1941 if description.origin == origin {
1946 Some(DatabaseInfo {
1947 name: description.name.clone(),
1948 version,
1949 })
1950 } else {
1951 None
1952 }
1953 }
1954 } else {
1955 None
1956 }
1957 })
1958 .collect();
1959
1960 let result = Ok(info_list);
1964
1965 if sender.send(result).is_err() {
1967 debug!("Couldn't send SyncOperation::GetDatabases reply.");
1968 }
1969 },
1970 SyncOperation::CloseDatabase(origin, id, db_name) => {
1971 self.close_database(origin, id, db_name);
1972 },
1973 SyncOperation::OpenDatabase(sender, origin, db_name, version, id, proxy_map) => {
1974 self.open_a_database_connection(sender, origin, db_name, version, id, proxy_map);
1975 },
1976 SyncOperation::AbortPendingUpgrades {
1977 pending_upgrades,
1978 origin,
1979 proxy_map,
1980 } => {
1981 self.abort_pending_upgrades(pending_upgrades, origin, proxy_map);
1982 },
1983 SyncOperation::DeleteDatabase(callback, origin, db_name, proxy_map, id) => {
1984 let idb_description = IndexedDBDescription {
1985 origin,
1986 name: db_name,
1987 };
1988 self.start_delete_database(idb_description, id, proxy_map, callback);
1989 },
1990 SyncOperation::GetObjectStore(sender, origin, db_name, store_name) => {
1991 let result = self.get_database(origin, db_name).map(|db| {
1993 let key_generator_current_number = db.key_generator_current_number(&store_name);
1994 IndexedDBObjectStore {
1995 key_path: db.key_path(&store_name),
1996 has_key_generator: key_generator_current_number.is_some(),
1997 key_generator_current_number,
1998 indexes: db.indexes(&store_name).unwrap_or_default(),
1999 name: store_name,
2000 }
2001 });
2002 let _ = sender.send(result.ok_or(BackendError::DbNotFound));
2003 },
2004 SyncOperation::CreateIndex(
2005 origin,
2006 db_name,
2007 store_name,
2008 index_name,
2009 key_path,
2010 unique,
2011 multi_entry,
2012 ) => {
2013 if let Some(db) = self.get_database(origin, db_name) {
2014 let _ = db.create_index(&store_name, index_name, key_path, unique, multi_entry);
2015 }
2016 },
2017 SyncOperation::DeleteIndex(origin, db_name, store_name, index_name) => {
2018 if let Some(db) = self.get_database(origin, db_name) {
2019 let _ = db.delete_index(&store_name, index_name);
2020 }
2021 },
2022 SyncOperation::Commit(callback, origin, db_name, txn) => {
2023 if let Some(db) = self.get_database_mut(origin.clone(), db_name.clone()) {
2027 if db.can_commit_now(txn) {
2028 if callback
2029 .send(TxnCompleteMsg {
2030 origin: origin.clone(),
2031 db_name: db_name.clone(),
2032 txn,
2033 result: Ok(()),
2034 })
2035 .is_err()
2036 {
2037 error!(
2038 "Failed to send immediate commit completion for db '{}' txn {}.",
2039 db_name, txn
2040 );
2041 }
2042 } else {
2043 db.queue_pending_commit_callback(txn, callback);
2044 }
2045 db.schedule_transactions(origin, &db_name);
2046 } else if callback
2047 .send(TxnCompleteMsg {
2048 origin,
2049 db_name: db_name.clone(),
2050 txn,
2051 result: Ok(()),
2054 })
2055 .is_err()
2056 {
2057 error!(
2058 "Failed to send commit completion for missing db '{}' txn {}.",
2059 db_name, txn
2060 );
2061 }
2062 },
2063 SyncOperation::Abort(abort_callback, origin, db_name, txn) => {
2064 let pending_commit_callbacks =
2068 if let Some(db) = self.get_database_mut(origin.clone(), db_name.clone()) {
2069 let callbacks = db.take_pending_commit_callbacks(txn);
2070 db.abort_transaction(txn);
2071 callbacks
2072 } else {
2073 Vec::new()
2074 };
2075 if let Some(db) = self.get_database_mut(origin.clone(), db_name.clone()) {
2076 db.schedule_transactions(origin.clone(), &db_name);
2077 }
2078 for callback in pending_commit_callbacks {
2079 if callback
2080 .send(storage_traits::indexeddb::TxnCompleteMsg {
2081 origin: origin.clone(),
2082 db_name: db_name.clone(),
2083 txn,
2084 result: Err(BackendError::Abort),
2085 })
2086 .is_err()
2087 {
2088 error!(
2089 "Failed to send deferred abort completion for db '{}' txn {}.",
2090 db_name, txn
2091 );
2092 }
2093 }
2094 if abort_callback
2095 .send(storage_traits::indexeddb::TxnCompleteMsg {
2096 origin,
2097 db_name: db_name.clone(),
2098 txn,
2099 result: Err(BackendError::Abort),
2100 })
2101 .is_err()
2102 {
2103 error!(
2104 "Failed to send abort completion for db '{}' txn {}.",
2105 db_name, txn
2106 );
2107 }
2108 },
2109 SyncOperation::UpgradeTransactionFinished {
2110 origin,
2111 db_name,
2112 txn,
2113 committed,
2114 } => {
2115 self.handle_upgrade_transaction_finished(db_name, origin, txn, committed);
2116 },
2117 SyncOperation::RequestHandled {
2118 origin,
2119 db_name,
2120 txn,
2121 request_id,
2122 } => {
2123 let should_notify =
2130 if let Some(db) = self.get_database_mut(origin.clone(), db_name.clone()) {
2131 db.mark_request_handled(txn, request_id);
2132 db.can_notify_txn_maybe_commit(txn)
2133 } else {
2134 false
2135 };
2136 if should_notify {
2137 self.handle_sync_operation(SyncOperation::TxnMaybeCommit {
2138 origin,
2139 db_name,
2140 txn,
2141 });
2142 }
2143 },
2144 SyncOperation::TxnMaybeCommit {
2145 origin,
2146 db_name,
2147 txn,
2148 } => {
2149 self.handle_txn_maybe_commit(origin, db_name, txn);
2150 },
2151 SyncOperation::TransactionFinished {
2152 origin,
2153 db_name,
2154 txn,
2155 } => {
2156 let maybe_commit_txns =
2157 if let Some(db) = self.get_database_mut(origin.clone(), db_name.clone()) {
2158 db.finish_transaction(txn);
2159 db.schedule_transactions(origin.clone(), &db_name);
2160 db.maybe_commit_candidates()
2161 } else {
2162 Vec::new()
2163 };
2164 for candidate in maybe_commit_txns {
2165 self.handle_txn_maybe_commit(origin.clone(), db_name.clone(), candidate);
2166 }
2167 },
2168 SyncOperation::CreateTransaction {
2169 sender,
2170 origin,
2171 db_name,
2172 mode,
2173 scope,
2174 } => {
2175 let key = IndexedDBDescription {
2176 origin: origin.clone(),
2177 name: db_name.clone(),
2178 };
2179 if let Some(db) = self.databases.get_mut(&key) {
2180 let transaction_id = self.serial_number_counter;
2181 self.serial_number_counter += 1;
2182 db.register_transaction(transaction_id, mode, scope);
2183 db.schedule_transactions(origin, &db_name);
2184 let _ = sender.send(Ok(transaction_id));
2185 } else {
2186 let _ = sender.send(Err(BackendError::DbNotFound));
2187 }
2188 },
2189 SyncOperation::UpgradeVersion(sender, origin, db_name, _txn, version) => {
2190 if let Some(db) = self.get_database_mut(origin, db_name) {
2191 if version > db.version().unwrap_or(0) {
2192 let _ = db.set_version(version);
2193 }
2194 let _ = sender.send(db.version().map_err(backend_error_from_sqlite_error));
2196 } else {
2197 let _ = sender.send(Err(BackendError::DbNotFound));
2198 }
2199 },
2200 SyncOperation::CreateObjectStore(
2201 sender,
2202 origin,
2203 db_name,
2204 store_name,
2205 key_paths,
2206 auto_increment,
2207 ) => {
2208 if let Some(db) = self.get_database_mut(origin, db_name) {
2209 let result = db.create_object_store(&store_name, key_paths, auto_increment);
2210 let _ = sender.send(result.map_err(BackendError::from));
2211 } else {
2212 let _ = sender.send(Err(BackendError::DbNotFound));
2213 }
2214 },
2215 SyncOperation::DeleteObjectStore(sender, origin, db_name, store_name) => {
2216 if let Some(db) = self.get_database_mut(origin, db_name) {
2217 let result = db.delete_object_store(&store_name);
2218 let _ = sender.send(result.map_err(BackendError::from));
2219 } else {
2220 let _ = sender.send(Err(BackendError::DbNotFound));
2221 }
2222 },
2223 SyncOperation::Version(sender, origin, db_name) => {
2224 if let Some(db) = self.get_database(origin, db_name) {
2225 let _ = sender.send(db.version().map_err(backend_error_from_sqlite_error));
2226 } else {
2227 let _ = sender.send(Err(BackendError::DbNotFound));
2228 }
2229 },
2230 SyncOperation::NotifyEndOfVersionChange {
2231 name,
2232 id,
2233 old_version,
2234 origin,
2235 } => {
2236 self.handle_version_change_done(name, id, old_version, origin);
2237 },
2238 SyncOperation::Exit(_) => {
2239 unreachable!("We must've already broken out of event loop.");
2240 },
2241 }
2242 }
2243
2244 fn collect_memory_reports(&self) -> Vec<Report> {
2245 let mut reports = vec![];
2246 perform_memory_report(|ops| {
2247 reports.push(Report {
2248 path: path!["indexeddb"],
2249 kind: ReportKind::ExplicitJemallocHeapSize,
2250 size: self.connections.size_of(ops) +
2251 self.databases.size_of(ops) +
2252 self.connection_queues.size_of(ops),
2253 });
2254 });
2255 reports
2256 }
2257}