1mod engines;
6
7use std::borrow::ToOwned;
8use std::collections::hash_map::Entry;
9use std::collections::{HashMap, HashSet, VecDeque};
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::thread;
13
14use base::generic_channel::{self, GenericReceiver, GenericSender, ReceiveError};
15use base::threadpool::ThreadPool;
16use log::{debug, error, warn};
17use malloc_size_of::MallocSizeOf;
18use malloc_size_of_derive::MallocSizeOf;
19use profile_traits::generic_callback::GenericCallback;
20use profile_traits::mem::{
21 ProcessReports, ProfilerChan as MemProfilerChan, Report, ReportKind, perform_memory_report,
22};
23use profile_traits::path;
24use rusqlite::Error as RusqliteError;
25use rustc_hash::{FxHashMap, FxHashSet};
26use servo_config::pref;
27use servo_url::origin::ImmutableOrigin;
28use storage_traits::indexeddb::{
29 AsyncOperation, BackendError, BackendResult, ConnectionMsg, CreateObjectResult, DatabaseInfo,
30 DbResult, IndexedDBIndex, IndexedDBObjectStore, IndexedDBThreadMsg, IndexedDBTxnMode, KeyPath,
31 SyncOperation, TxnCompleteMsg,
32};
33use uuid::Uuid;
34
35use crate::indexeddb::engines::{KvsEngine, KvsOperation, KvsTransaction, SqliteEngine};
36use crate::shared::is_sqlite_disk_full_error;
37
/// Factory for handles that talk to the IndexedDB backend.
pub trait IndexedDBThreadFactory {
    /// Creates the handle.
    ///
    /// `config_dir` is an optional base directory and `mem_profiler_chan` is a channel to the
    /// memory profiler; what is done with them is up to the implementor.
    fn new(config_dir: Option<PathBuf>, mem_profiler_chan: MemProfilerChan) -> Self;
}
41
42impl IndexedDBThreadFactory for GenericSender<IndexedDBThreadMsg> {
43 fn new(
44 config_dir: Option<PathBuf>,
45 mem_profiler_chan: MemProfilerChan,
46 ) -> GenericSender<IndexedDBThreadMsg> {
47 let (chan, port) = generic_channel::channel().unwrap();
48 let chan2 = chan.clone();
49
50 let mut idb_base_dir = PathBuf::new();
51 if let Some(p) = config_dir {
52 idb_base_dir.push(p);
53 }
54 idb_base_dir.push("IndexedDB");
55
56 let manager_sender = chan.clone();
57
58 thread::Builder::new()
59 .name("IndexedDBManager".to_owned())
60 .spawn(move || {
61 mem_profiler_chan.run_with_memory_reporting(
62 || IndexedDBManager::new(port, manager_sender, idb_base_dir).start(),
63 String::from("indexedDB-reporter"),
64 chan2,
65 IndexedDBThreadMsg::CollectMemoryReport,
66 );
67 })
68 .expect("Thread spawning failed");
69
70 chan
71 }
72}
73
/// Identifies one database: the origin that owns it plus the database name.
///
/// Used as the key of the per-database maps kept by `IndexedDBManager`.
#[derive(Clone, Debug, Eq, Hash, MallocSizeOf, PartialEq)]
pub struct IndexedDBDescription {
    /// Origin the database belongs to.
    pub origin: ImmutableOrigin,
    /// Name of the database.
    pub name: String,
}
81
82impl IndexedDBDescription {
83 const NAMESPACE_SERVO_IDB: &uuid::Uuid = &Uuid::from_bytes([
85 0x37, 0x9e, 0x56, 0xb0, 0x1a, 0x76, 0x44, 0xc2, 0xa0, 0xdb, 0xe2, 0x18, 0xc5, 0xc8, 0xa3,
86 0x5d,
87 ]);
88 pub(super) fn as_path(&self) -> PathBuf {
91 let mut path = PathBuf::new();
92
93 let origin_uuid = Uuid::new_v5(
95 Self::NAMESPACE_SERVO_IDB,
96 self.origin.ascii_serialization().as_bytes(),
97 );
98 let db_name_uuid = Uuid::new_v5(Self::NAMESPACE_SERVO_IDB, self.name.as_bytes());
99 path.push(origin_uuid.to_string());
100 path.push(db_name_uuid.to_string());
101
102 path
103 }
104}
105
/// Scheduling metadata recorded for a transaction when it is registered.
#[derive(MallocSizeOf)]
struct TxnInfo {
    /// Creation order: taken from the environment's `next_created_seq` counter at registration.
    created_seq: u64,
    /// Mode the transaction was registered with.
    mode: IndexedDBTxnMode,
    /// Names in the transaction's scope; two transactions overlap when they share a name.
    scope: HashSet<String>,
    /// Set to `true` at registration; only live transactions block later overlapping ones.
    live: bool,
}
113
/// Per-database state: the storage engine plus the bookkeeping used to queue, order and
/// complete transactions against it.
#[derive(MallocSizeOf)]
struct IndexedDBEnvironment<E: KvsEngine> {
    /// Storage engine every operation is delegated to.
    engine: E,
    /// Sender used to post `EngineTxnBatchComplete` once the engine finishes a batch.
    manager_sender: GenericSender<IndexedDBThreadMsg>,
    /// Requests queued per transaction that have not been handed to the engine yet.
    transactions: FxHashMap<u64, KvsTransaction>,
    /// Value the next registered transaction receives as its `created_seq`.
    next_created_seq: u64,
    /// Scheduling metadata for registered transactions.
    txn_info: FxHashMap<u64, TxnInfo>,
    /// Transactions in any mode other than readonly that are waiting to be started, in order.
    queued_readwrite: VecDeque<u64>,
    /// Readonly transactions waiting to be started, in order.
    queued_readonly: VecDeque<u64>,
    /// Membership mirror of `queued_readwrite`, used to avoid duplicate queue entries.
    queued_readwrite_set: FxHashSet<u64>,
    /// Membership mirror of `queued_readonly`, used to avoid duplicate queue entries.
    queued_readonly_set: FxHashSet<u64>,
    /// The non-readonly transaction whose batch is currently with the engine, if any.
    running_readwrite: Option<u64>,
    /// Readonly transactions whose batch is currently with the engine.
    running_readonly: HashSet<u64>,
    /// Per transaction, the lowest request id that has not been marked handled yet.
    handled_next_unhandled_request_id: FxHashMap<u64, u64>,
    /// Per transaction, request ids that were marked handled ahead of the lowest unhandled id.
    handled_pending: FxHashMap<u64, HashSet<u64>>,
    /// Callbacks waiting to be told that the commit of a transaction completed.
    pending_commit_callbacks: FxHashMap<u64, Vec<GenericCallback<TxnCompleteMsg>>>,
}
132
133impl<E: KvsEngine> IndexedDBEnvironment<E> {
134 fn new(
135 engine: E,
136 manager_sender: GenericSender<IndexedDBThreadMsg>,
137 ) -> IndexedDBEnvironment<E> {
138 IndexedDBEnvironment {
139 engine,
140 manager_sender,
141 transactions: FxHashMap::default(),
142 next_created_seq: 0,
143 txn_info: FxHashMap::default(),
144 queued_readwrite: VecDeque::new(),
145 queued_readonly: VecDeque::new(),
146 queued_readwrite_set: FxHashSet::default(),
147 queued_readonly_set: FxHashSet::default(),
148 running_readwrite: None,
149 running_readonly: HashSet::new(),
150 handled_next_unhandled_request_id: FxHashMap::default(),
151 handled_pending: FxHashMap::default(),
152 pending_commit_callbacks: FxHashMap::default(),
153 }
154 }
155
156 fn register_transaction(&mut self, txn: u64, mode: IndexedDBTxnMode, scope: Vec<String>) {
157 if self.txn_info.contains_key(&txn) {
158 return;
159 }
160 let created_seq = self.next_created_seq;
161 self.next_created_seq += 1;
162 self.txn_info.insert(
163 txn,
164 TxnInfo {
165 created_seq,
166 mode: mode.clone(),
167 scope: scope.into_iter().collect(),
168 live: true,
169 },
170 );
171 self.transactions
172 .entry(txn)
173 .or_insert_with(|| KvsTransaction {
174 requests: VecDeque::new(),
175 mode,
176 });
177 }
178
179 fn scopes_overlap(a: &TxnInfo, b: &TxnInfo) -> bool {
180 a.scope.iter().any(|store| b.scope.contains(store))
181 }
182
183 fn earlier_overlapping_live_exists<F>(&self, txn: u64, predicate: F) -> bool
184 where
185 F: Fn(&TxnInfo) -> bool,
186 {
187 let Some(current) = self.txn_info.get(&txn) else {
188 return false;
189 };
190 self.txn_info.iter().any(|(other_txn, other)| {
191 *other_txn != txn &&
192 other.live &&
193 other.created_seq < current.created_seq &&
194 Self::scopes_overlap(current, other) &&
195 predicate(other)
196 })
197 }
198
199 fn can_start_by_spec(&self, txn: u64) -> bool {
200 let Some(info) = self.txn_info.get(&txn) else {
201 return false;
202 };
203 match info.mode {
204 IndexedDBTxnMode::Readonly => !self.earlier_overlapping_live_exists(txn, |other| {
205 other.mode != IndexedDBTxnMode::Readonly
206 }),
207 IndexedDBTxnMode::Readwrite | IndexedDBTxnMode::Versionchange => {
208 !self.earlier_overlapping_live_exists(txn, |_other| true)
209 },
210 }
211 }
212
213 fn enqueue_txn(&mut self, txn: u64, mode: &IndexedDBTxnMode) {
214 match mode {
215 IndexedDBTxnMode::Readonly => {
216 if self.queued_readonly_set.insert(txn) {
217 self.queued_readonly.push_back(txn);
218 }
219 },
220 _ => {
221 if self.queued_readwrite_set.insert(txn) {
222 self.queued_readwrite.push_back(txn);
223 }
224 },
225 }
226 }
227
228 fn queue_operation(
229 &mut self,
230 store_name: &str,
231 serial_number: u64,
232 mode: IndexedDBTxnMode,
233 operation: AsyncOperation,
234 ) {
235 let mut enqueue_mode = None;
236 match self.transactions.entry(serial_number) {
237 Entry::Occupied(mut entry) => {
238 let transaction = entry.get_mut();
239 let transaction_mode = transaction.mode.clone();
240 let was_empty = transaction.requests.is_empty();
241 transaction.requests.push_back(KvsOperation {
242 operation,
243 store_name: String::from(store_name),
244 });
245 if was_empty {
246 enqueue_mode = Some(transaction_mode);
250 }
251 },
252 Entry::Vacant(entry) => {
253 entry
254 .insert(KvsTransaction {
255 requests: VecDeque::new(),
256 mode: mode.clone(),
257 })
258 .requests
259 .push_back(KvsOperation {
260 operation,
261 store_name: String::from(store_name),
262 });
263 enqueue_mode = Some(mode);
264 },
265 };
266 if let Some(mode) = enqueue_mode {
267 self.enqueue_txn(serial_number, &mode);
268 }
269 }
270
    /// Starts every queued transaction that is currently allowed to run.
    ///
    /// The readonly queue is walked once: each entry is popped and either started, dropped
    /// (no request queue or nothing to run) or pushed back to the end of the queue (already
    /// running with more requests waiting, or not yet allowed to start). Afterwards, unless a
    /// readwrite transaction is already running, the readwrite queue is walked the same way
    /// and the walk stops as soon as one transaction has been started.
    ///
    /// `origin` and `db_name` are only forwarded to `start_transaction`.
    fn schedule_transactions(&mut self, origin: ImmutableOrigin, db_name: &str) {
        // Bound the walk by the initial length so re-queued entries are not visited twice.
        let readonly_len = self.queued_readonly.len();
        for _ in 0..readonly_len {
            let Some(txn) = self.queued_readonly.pop_front() else {
                break;
            };

            // Keep the membership set in sync with the queue.
            self.queued_readonly_set.remove(&txn);

            // The request queue is gone (transaction finished or aborted): drop the entry.
            let Some(transaction) = self.transactions.get(&txn) else {
                continue;
            };
            if self.running_readonly.contains(&txn) {
                // A batch is still in flight; keep the transaction queued if more work arrived.
                if !transaction.requests.is_empty() && self.queued_readonly_set.insert(txn) {
                    self.queued_readonly.push_back(txn);
                }
                continue;
            }
            if transaction.requests.is_empty() {
                continue;
            }
            if !self.can_start_by_spec(txn) {
                // Blocked by an earlier transaction: try again on a later scheduling pass.
                if self.queued_readonly_set.insert(txn) {
                    self.queued_readonly.push_back(txn);
                }
                continue;
            }

            self.running_readonly.insert(txn);
            self.start_transaction(origin.clone(), db_name.to_string(), txn, None);
        }

        // At most one readwrite batch is handed to the engine at a time.
        if self.running_readwrite.is_some() {
            return;
        }
        let readwrite_len = self.queued_readwrite.len();
        for _ in 0..readwrite_len {
            let Some(txn) = self.queued_readwrite.pop_front() else {
                break;
            };

            self.queued_readwrite_set.remove(&txn);

            let Some(transaction) = self.transactions.get(&txn) else {
                continue;
            };

            // NOTE(review): `running_readwrite` is `None` whenever this loop body runs (early
            // return above, and the loop returns right after setting it), so this branch looks
            // unreachable — confirm before relying on it.
            if self.running_readwrite == Some(txn) {
                if !transaction.requests.is_empty() && self.queued_readwrite_set.insert(txn) {
                    self.queued_readwrite.push_back(txn);
                }
                continue;
            }

            if transaction.requests.is_empty() {
                continue;
            }
            if !self.can_start_by_spec(txn) {
                if self.queued_readwrite_set.insert(txn) {
                    self.queued_readwrite.push_back(txn);
                }
                continue;
            }

            self.running_readwrite = Some(txn);
            self.start_transaction(origin, db_name.to_string(), txn, None);
            return;
        }
    }
346
347 fn start_transaction(
349 &mut self,
350 origin: ImmutableOrigin,
351 db_name: String,
352 txn: u64,
353 sender: Option<GenericSender<BackendResult<()>>>,
354 ) {
355 let (mode, requests) = match self.transactions.get_mut(&txn) {
360 Some(transaction) => {
361 let mode = transaction.mode.clone();
362 let requests = std::mem::take(&mut transaction.requests);
363 (mode, requests)
364 },
365 None => {
366 if let Some(sender) = sender {
368 let _ = sender.send(Ok(()));
369 }
370 return;
371 },
372 };
373
374 if requests.is_empty() {
375 if let Some(sender) = sender {
376 let _ = sender.send(Ok(()));
377 }
378 return;
381 }
382
383 let manager_sender = self.manager_sender.clone();
384 self.engine.process_transaction(
385 KvsTransaction { mode, requests },
386 Box::new(move || {
387 if let Err(err) = manager_sender.send(IndexedDBThreadMsg::EngineTxnBatchComplete {
391 origin,
392 db_name,
393 txn,
394 }) {
395 error!(
396 "Failed to send IndexedDBThreadMsg::EngineTxnBatchComplete: {:?}",
397 err
398 );
399 }
402
403 if let Some(sender) = sender {
406 let _ = sender.send(Ok(()));
407 }
408 }),
409 );
410 }
411
412 fn mark_request_handled(&mut self, txn: u64, request_id: u64) {
413 let current = self
414 .handled_next_unhandled_request_id
415 .get(&txn)
416 .copied()
417 .unwrap_or(0);
418 if request_id == current {
419 let mut next = current + 1;
420 if let Some(pending) = self.handled_pending.get_mut(&txn) {
421 while pending.remove(&next) {
422 next += 1;
423 }
424 if pending.is_empty() {
425 self.handled_pending.remove(&txn);
426 }
427 }
428 self.handled_next_unhandled_request_id.insert(txn, next);
429 } else if request_id > current {
430 self.handled_pending
431 .entry(txn)
432 .or_default()
433 .insert(request_id);
434 }
435 }
436
437 fn can_notify_txn_maybe_commit(&self, txn: u64) -> bool {
438 if self.running_readwrite == Some(txn) || self.running_readonly.contains(&txn) {
439 return false;
440 }
441 if self.queued_readonly_set.contains(&txn) || self.queued_readwrite_set.contains(&txn) {
443 return false;
444 }
445 match self.transactions.get(&txn) {
446 Some(t) => t.requests.is_empty(),
447 None => true,
448 }
449 }
450
451 fn can_commit_now(&self, txn: u64) -> bool {
452 self.can_start_by_spec(txn) && self.can_notify_txn_maybe_commit(txn)
453 }
454
455 fn queue_pending_commit_callback(
456 &mut self,
457 txn: u64,
458 callback: GenericCallback<TxnCompleteMsg>,
459 ) {
460 self.pending_commit_callbacks
461 .entry(txn)
462 .or_default()
463 .push(callback);
464 }
465
466 fn take_pending_commit_callbacks(&mut self, txn: u64) -> Vec<GenericCallback<TxnCompleteMsg>> {
467 self.pending_commit_callbacks
468 .remove(&txn)
469 .unwrap_or_default()
470 }
471
472 fn maybe_commit_candidates(&self) -> Vec<u64> {
473 let mut candidates: Vec<(u64, u64)> = self
474 .txn_info
475 .iter()
476 .filter_map(|(txn, info)| {
477 if !info.live || !self.can_commit_now(*txn) {
478 return None;
479 }
480 Some((*txn, info.created_seq))
481 })
482 .collect();
483 candidates.sort_by_key(|(_, created_seq)| *created_seq);
484 candidates.into_iter().map(|(txn, _)| txn).collect()
485 }
486
487 fn finish_transaction(&mut self, txn: u64) {
488 if let Some(info) = self.txn_info.get_mut(&txn) {
489 info.live = false;
490 }
491 self.txn_info.remove(&txn);
492 self.transactions.remove(&txn);
493 self.queued_readonly.retain(|queued| *queued != txn);
494 self.queued_readwrite.retain(|queued| *queued != txn);
495 self.queued_readonly_set.remove(&txn);
496 self.queued_readwrite_set.remove(&txn);
497 if self.running_readwrite == Some(txn) {
498 self.running_readwrite = None;
499 }
500 self.running_readonly.remove(&txn);
501 self.handled_next_unhandled_request_id.remove(&txn);
502 self.handled_pending.remove(&txn);
503 self.pending_commit_callbacks.remove(&txn);
504 }
505
506 fn abort_transaction(&mut self, txn: u64) {
507 self.transactions.remove(&txn);
510 self.queued_readonly.retain(|queued| *queued != txn);
511 self.queued_readwrite.retain(|queued| *queued != txn);
512 self.queued_readonly_set.remove(&txn);
513 self.queued_readwrite_set.remove(&txn);
514 if self.running_readwrite == Some(txn) {
515 self.running_readwrite = None;
516 }
517 self.running_readonly.remove(&txn);
518 self.handled_next_unhandled_request_id.remove(&txn);
519 self.handled_pending.remove(&txn);
520 self.pending_commit_callbacks.remove(&txn);
521 }
522
    /// Returns whatever the engine reports as the key generator's current number for
    /// `store_name` (`None` is passed through unchanged).
    fn key_generator_current_number(&self, store_name: &str) -> Option<i32> {
        self.engine.key_generator_current_number(store_name)
    }
526
    /// Returns the key path the engine reports for `store_name`, if any.
    fn key_path(&self, store_name: &str) -> Option<KeyPath> {
        self.engine.key_path(store_name)
    }
530
531 fn object_store_names(&self) -> DbResult<Vec<String>> {
532 self.engine
533 .object_store_names()
534 .map_err(|err| format!("{err:?}"))
535 }
536
537 fn indexes(&self, store_name: &str) -> DbResult<Vec<IndexedDBIndex>> {
538 self.engine
539 .indexes(store_name)
540 .map_err(|err| format!("{err:?}"))
541 }
542
543 fn create_index(
544 &self,
545 store_name: &str,
546 index_name: String,
547 key_path: KeyPath,
548 unique: bool,
549 multi_entry: bool,
550 ) -> DbResult<CreateObjectResult> {
551 self.engine
552 .create_index(store_name, index_name, key_path, unique, multi_entry)
553 .map_err(|err| format!("{err:?}"))
554 }
555
556 fn delete_index(&self, store_name: &str, index_name: String) -> DbResult<()> {
557 self.engine
558 .delete_index(store_name, index_name)
559 .map_err(|err| format!("{err:?}"))
560 }
561
562 fn create_object_store(
563 &mut self,
564 store_name: &str,
565 key_path: Option<KeyPath>,
566 auto_increment: bool,
567 ) -> DbResult<CreateObjectResult> {
568 self.engine
569 .create_store(store_name, key_path, auto_increment)
570 .map_err(|err| format!("{err:?}"))
571 }
572
573 fn delete_object_store(&mut self, store_name: &str) -> DbResult<()> {
574 let indexes = self
577 .engine
578 .indexes(store_name)
579 .map_err(|err| format!("{err:?}"))?;
580 for index in indexes {
581 self.engine
582 .delete_index(store_name, index.name)
583 .map_err(|err| format!("{err:?}"))?;
584 }
585 self.engine
586 .delete_store(store_name)
587 .map_err(|err| format!("{err:?}"))
588 }
589
590 fn delete_database(self) -> BackendResult<()> {
591 let result = self.engine.delete_database();
592 result
593 .map_err(|err| format!("{err:?}"))
594 .map_err(BackendError::from)
595 }
596
    /// Returns the database version reported by the engine, or the engine's error unchanged.
    fn version(&self) -> Result<u64, E::Error> {
        self.engine.version()
    }
600
601 fn set_version(&mut self, version: u64) -> DbResult<()> {
602 self.engine
603 .set_version(version)
604 .map_err(|err| format!("{err:?}"))
605 }
606}
607
608fn backend_error_from_sqlite_error(err: RusqliteError) -> BackendError {
609 if is_sqlite_disk_full_error(&err) {
610 BackendError::QuotaExceeded
611 } else {
612 BackendError::DbErr(format!("{err:?}"))
613 }
614}
615
/// A request waiting in a database's connection queue: either opening a connection or
/// deleting the database.
#[derive(MallocSizeOf)]
enum OpenRequest {
    /// Request to open a connection to a database.
    Open {
        /// Callback used to report progress and the outcome of the request.
        sender: GenericCallback<ConnectionMsg>,

        /// Name of the database to open.
        db_name: String,

        /// Requested version, if one was given.
        version: Option<u64>,

        /// Whether the request has been processed; an unprocessed request is still pending.
        processed: bool,

        /// Upgrade started for this request that has not finished yet.
        pending_upgrade: Option<VersionUpgrade>,

        /// Ids the request is still waiting on before it stops being pending.
        // NOTE(review): presumably connections that still have to close — confirm.
        pending_close: HashSet<Uuid>,

        /// Connections whose versionchange handling the request is still waiting for.
        pending_versionchange: HashSet<Uuid>,

        /// Id of the request.
        id: Uuid,
    },
    /// Request to delete a database.
    Delete {
        /// Callback used to report the outcome of the deletion.
        sender: GenericCallback<BackendResult<u64>>,

        /// Origin of the database to delete.
        _origin: ImmutableOrigin,

        /// Name of the database to delete.
        _db_name: String,

        /// Whether the request has been processed; an unprocessed request is still pending.
        processed: bool,

        /// Id of the request.
        id: Uuid,
    },
}
668
669impl OpenRequest {
670 fn get_id(&self) -> Uuid {
671 let id = match self {
672 OpenRequest::Open {
673 sender: _,
674 db_name: _,
675 version: _,
676 processed: _,
677 pending_upgrade: _,
678 pending_close: _,
679 pending_versionchange: _,
680 id,
681 } => id,
682 OpenRequest::Delete {
683 sender: _,
684 _origin: _,
685 _db_name: _,
686 processed: _,
687 id,
688 } => id,
689 };
690 *id
691 }
692
693 fn is_open(&self) -> bool {
694 match self {
695 OpenRequest::Open {
696 sender: _,
697 db_name: _,
698 version: _,
699 processed: _,
700 pending_upgrade: _,
701 pending_close: _,
702 pending_versionchange: _,
703 id: _,
704 } => true,
705 OpenRequest::Delete {
706 sender: _,
707 _origin: _,
708 _db_name: _,
709 processed: _,
710 id: _,
711 } => false,
712 }
713 }
714
715 fn is_pending(&self) -> bool {
718 match self {
719 OpenRequest::Open {
720 sender: _,
721 db_name: _,
722 version: _,
723 processed,
724 pending_upgrade,
725 pending_close,
726 pending_versionchange,
727 id: _,
728 } => {
729 !processed ||
730 pending_upgrade.is_some() ||
731 !pending_close.is_empty() ||
732 !pending_versionchange.is_empty()
733 },
734 OpenRequest::Delete {
735 sender: _,
736 _origin: _,
737 _db_name: _,
738 processed,
739 id: _,
740 } => !processed,
741 }
742 }
743
744 fn abort(&self) -> Option<u64> {
747 match self {
748 OpenRequest::Open {
749 sender,
750 db_name,
751 version: _,
752 processed: _,
753 pending_close: _,
754 pending_versionchange: _,
755 pending_upgrade,
756 id,
757 } => {
758 if sender
759 .send(ConnectionMsg::AbortError {
760 name: db_name.clone(),
761 id: *id,
762 })
763 .is_err()
764 {
765 error!("Failed to send ConnectionMsg::Connection to script.");
766 };
767 pending_upgrade.as_ref().map(|upgrade| upgrade.old)
768 },
769 OpenRequest::Delete {
770 sender,
771 _origin: _,
772 _db_name: _,
773 processed: _,
774 id: _,
775 } => {
776 if sender.send(Err(BackendError::DbNotFound)).is_err() {
777 error!("Failed to send result of database delete to script.");
778 };
779 None
780 },
781 }
782 }
783}
784
/// Describes an upgrade that was started for an open request and has not finished yet.
#[derive(MallocSizeOf)]
struct VersionUpgrade {
    /// Version the database had before the upgrade; restored when the upgrade is aborted.
    old: u64,
    /// Version the database is being upgraded to.
    new: u64,
    /// Serial number of the upgrade transaction.
    transaction: u64,
}
791
/// State kept by the manager for one connection to a database.
#[derive(MallocSizeOf)]
struct Connection {
    /// When set, the connection is skipped when `TxnMaybeCommit` notifications are dispatched.
    close_pending: bool,

    /// Callback used to send `ConnectionMsg`s to the connection.
    sender: GenericCallback<ConnectionMsg>,
}
801
/// State of the IndexedDB manager thread.
struct IndexedDBManager {
    /// Receiving end of the channel the manager reads its messages from.
    port: GenericReceiver<IndexedDBThreadMsg>,
    /// Sender for the manager's own message channel.
    // NOTE(review): presumably handed to each database environment so that engine callbacks
    // can post back to this thread — confirm where environments are created.
    manager_sender: GenericSender<IndexedDBThreadMsg>,
    /// Base directory for database storage.
    idb_base_dir: PathBuf,
    /// Environments of the databases currently known to the manager.
    databases: HashMap<IndexedDBDescription, IndexedDBEnvironment<SqliteEngine>>,
    /// Thread pool named "IndexedDB", created together with the manager.
    thread_pool: Arc<ThreadPool>,

    /// Next serial number handed out to an upgrade transaction.
    serial_number_counter: u64,

    /// Per database, the queue of open and delete requests; the front one is being processed.
    connection_queues: HashMap<IndexedDBDescription, VecDeque<OpenRequest>>,

    /// Per database, the connections tracked by the manager, keyed by id.
    connections: HashMap<IndexedDBDescription, HashMap<Uuid, Connection>>,
}
821
822impl IndexedDBManager {
823 fn new(
824 port: GenericReceiver<IndexedDBThreadMsg>,
825 manager_sender: GenericSender<IndexedDBThreadMsg>,
826 idb_base_dir: PathBuf,
827 ) -> IndexedDBManager {
828 debug!("New indexedDBManager");
829
830 let thread_count = thread::available_parallelism()
834 .map(|i| i.get())
835 .unwrap_or(pref!(threadpools_fallback_worker_num) as usize)
836 .min(pref!(threadpools_indexeddb_workers_max).max(1) as usize);
837
838 IndexedDBManager {
839 port,
840 manager_sender,
841 idb_base_dir,
842 databases: HashMap::new(),
843 thread_pool: Arc::new(ThreadPool::new(thread_count, "IndexedDB".to_string())),
844 serial_number_counter: 0,
845 connection_queues: Default::default(),
846 connections: Default::default(),
847 }
848 }
849}
850
851impl IndexedDBManager {
    /// Runs the manager's message loop until the channel disconnects or an `Exit` request
    /// arrives.
    ///
    /// Other receive errors are logged and the loop keeps going.
    fn start(&mut self) {
        loop {
            let message = match self.port.recv() {
                Ok(msg) => msg,
                Err(ReceiveError::Disconnected) => {
                    break;
                },
                Err(e) => {
                    warn!("Error in IndexedDB thread: {e:?}");
                    continue;
                },
            };
            match message {
                // Acknowledge the exit request, then stop the loop.
                IndexedDBThreadMsg::Sync(SyncOperation::Exit(sender)) => {
                    let _ = sender.send(());
                    break;
                },
                IndexedDBThreadMsg::Sync(operation) => {
                    self.handle_sync_operation(operation);
                },
                // Queue the operation on its transaction and see whether anything can start.
                // Operations for a database the manager does not know are dropped.
                IndexedDBThreadMsg::Async(
                    origin,
                    db_name,
                    store_name,
                    txn,
                    _request_id,
                    mode,
                    operation,
                ) => {
                    if let Some(db) = self.get_database_mut(origin.clone(), db_name.clone()) {
                        db.queue_operation(&store_name, txn, mode, operation);
                        db.schedule_transactions(origin, &db_name);
                    }
                },
                // The engine finished a batch: clear the running marker, schedule further
                // work, and if the transaction is now idle run the maybe-commit handling.
                IndexedDBThreadMsg::EngineTxnBatchComplete {
                    origin,
                    db_name,
                    txn,
                } => {
                    let should_notify =
                        if let Some(db) = self.get_database_mut(origin.clone(), db_name.clone()) {
                            let mode = db.transactions.get(&txn).map(|t| t.mode.clone());

                            match mode {
                                Some(IndexedDBTxnMode::Readonly) => {
                                    db.running_readonly.remove(&txn);
                                },
                                Some(_) => {
                                    if db.running_readwrite == Some(txn) {
                                        db.running_readwrite = None;
                                    }
                                },
                                None => {
                                    // The transaction has no request queue any more, so there
                                    // is no mode to pick a running marker by.
                                },
                            }

                            db.schedule_transactions(origin.clone(), &db_name);
                            db.can_notify_txn_maybe_commit(txn)
                        } else {
                            false
                        };
                    if should_notify {
                        self.handle_sync_operation(SyncOperation::TxnMaybeCommit {
                            origin,
                            db_name,
                            txn,
                        });
                    }
                },
                IndexedDBThreadMsg::CollectMemoryReport(sender) => {
                    let reports = self.collect_memory_reports();
                    sender.send(ProcessReports::new(reports));
                },
            }
        }
    }
935
936 fn dispatch_txn_maybe_commit(&self, origin: ImmutableOrigin, db_name: String, txn: u64) {
937 let key = IndexedDBDescription {
938 origin,
939 name: db_name.clone(),
940 };
941 let Some(connections) = self.connections.get(&key) else {
942 return;
943 };
944 for connection in connections.values() {
945 if connection.close_pending {
946 continue;
947 }
948 let _ = connection.sender.send(ConnectionMsg::TxnMaybeCommit {
949 db_name: db_name.clone(),
950 txn,
951 });
952 }
953 }
954
955 fn handle_txn_maybe_commit(&mut self, origin: ImmutableOrigin, db_name: String, txn: u64) {
956 let key = IndexedDBDescription {
957 origin: origin.clone(),
958 name: db_name.clone(),
959 };
960 let callbacks = {
961 let Some(db) = self.databases.get_mut(&key) else {
962 return;
963 };
964 if !db.can_commit_now(txn) {
965 return;
966 }
967 db.take_pending_commit_callbacks(txn)
968 };
969
970 if callbacks.is_empty() {
971 self.dispatch_txn_maybe_commit(origin, db_name, txn);
972 return;
973 }
974
975 for callback in callbacks {
976 if callback
977 .send(TxnCompleteMsg {
978 origin: origin.clone(),
979 db_name: db_name.clone(),
980 txn,
981 result: Ok(()),
982 })
983 .is_err()
984 {
985 error!(
986 "Failed to send deferred commit completion for db '{}' txn {}.",
987 db_name, txn
988 );
989 }
990 }
991 }
992
993 fn handle_upgrade_transaction_finished(
995 &mut self,
996 name: String,
997 origin: ImmutableOrigin,
998 txn: u64,
999 committed: bool,
1000 ) {
1001 let key = IndexedDBDescription {
1002 name: name.clone(),
1003 origin: origin.clone(),
1004 };
1005
1006 if committed {
1007 let Some(queue) = self.connection_queues.get_mut(&key) else {
1008 return debug_assert!(false, "A connection queue should exist.");
1009 };
1010 let Some(front) = queue.front() else {
1011 return debug_assert!(false, "A pending open request should exist.");
1012 };
1013 let OpenRequest::Open {
1014 pending_upgrade: Some(pending_upgrade),
1015 ..
1016 } = front
1017 else {
1018 return;
1019 };
1020 if pending_upgrade.transaction != txn {
1021 return;
1022 }
1023
1024 let Some(open_request) = queue.pop_front() else {
1025 return;
1026 };
1027 let OpenRequest::Open {
1028 sender,
1029 db_name,
1030 version: _,
1031 processed: _,
1032 pending_upgrade: Some(pending_upgrade),
1033 pending_close: _,
1034 pending_versionchange: _,
1035 id,
1036 } = open_request
1037 else {
1038 return;
1039 };
1040 let VersionUpgrade { new, .. } = pending_upgrade;
1041 let object_store_names = self
1042 .databases
1043 .get(&key)
1044 .and_then(|db| db.object_store_names().ok())
1045 .unwrap_or_default();
1046 if sender
1047 .send(ConnectionMsg::Connection {
1048 id,
1049 name: db_name,
1050 version: new,
1051 upgraded: true,
1052 object_store_names,
1053 })
1054 .is_err()
1055 {
1056 error!("Failed to send ConnectionMsg::Connection to script.");
1057 };
1058
1059 self.advance_connection_queue(key);
1060 return;
1061 }
1062
1063 let request_id = {
1064 let Some(queue) = self.connection_queues.get_mut(&key) else {
1065 return debug_assert!(false, "A connection queue should exist.");
1066 };
1067 let Some(front) = queue.front() else {
1068 return debug_assert!(false, "A pending open request should exist.");
1069 };
1070 let OpenRequest::Open {
1071 pending_upgrade: Some(pending_upgrade),
1072 id,
1073 ..
1074 } = front
1075 else {
1076 return;
1077 };
1078 if pending_upgrade.transaction != txn {
1079 return;
1080 }
1081 *id
1082 };
1083
1084 self.abort_pending_upgrade(name, request_id, origin);
1085 }
1086
1087 fn advance_connection_queue(&mut self, key: IndexedDBDescription) {
1089 loop {
1090 let is_open = {
1091 let Some(queue) = self.connection_queues.get_mut(&key) else {
1092 return;
1093 };
1094 if queue.is_empty() {
1095 return;
1096 }
1097 queue.front().expect("Queue is not empty.").is_open()
1098 };
1099
1100 if is_open {
1101 self.open_database(key.clone());
1102 } else {
1103 self.delete_database(key.clone());
1104 }
1105
1106 let was_pruned = self.maybe_remove_front_from_queue(&key);
1107
1108 if !was_pruned {
1109 break;
1114 }
1115 }
1116 }
1117
1118 fn maybe_remove_front_from_queue(&mut self, key: &IndexedDBDescription) -> bool {
1120 let (is_empty, was_pruned) = {
1121 let Some(queue) = self.connection_queues.get_mut(key) else {
1122 debug_assert!(false, "A connection queue should exist.");
1123 return false;
1124 };
1125 let mut pruned = false;
1126 let front_is_pending = queue.front().map(|record| record.is_pending());
1127 if let Some(is_pending) = front_is_pending {
1128 if !is_pending {
1129 queue.pop_front().expect("Queue has a non-pending item.");
1130 pruned = true
1131 }
1132 }
1133 (queue.is_empty(), pruned)
1134 };
1135 if is_empty {
1136 self.connection_queues.remove(key);
1137 }
1138 was_pruned
1139 }
1140
1141 fn remove_connection(&mut self, key: &IndexedDBDescription, id: &Uuid) {
1142 let is_empty = {
1143 let Some(connections) = self.connections.get_mut(key) else {
1144 return debug!("Connection already removed.");
1145 };
1146 connections.remove(id);
1147 connections.is_empty()
1148 };
1149
1150 if is_empty {
1151 self.connections.remove(key);
1152 }
1153 }
1154
1155 fn abort_pending_upgrade(&mut self, name: String, id: Uuid, origin: ImmutableOrigin) {
1159 let key = IndexedDBDescription {
1160 name,
1161 origin: origin.clone(),
1162 };
1163 let old = {
1164 let Some(queue) = self.connection_queues.get_mut(&key) else {
1165 return debug_assert!(
1166 false,
1167 "There should be a connection queue for the aborted upgrade."
1168 );
1169 };
1170 let Some(open_request) = queue.pop_front() else {
1171 return debug_assert!(false, "There should be an open request to upgrade.");
1172 };
1173 if open_request.get_id() != id {
1174 return debug_assert!(
1175 false,
1176 "Open request to abort should be at the head of the queue."
1177 );
1178 }
1179 open_request.abort()
1180 };
1181 if let Some(old_version) = old {
1182 if old_version == 0 {
1183 self.databases.remove(&key);
1189 } else {
1190 let Some(db) = self.databases.get_mut(&key) else {
1191 return debug_assert!(false, "Db should have been created");
1192 };
1193 let res = db.set_version(old_version);
1196 debug_assert!(res.is_ok(), "Setting a db version should not fail.");
1197 }
1198 }
1199
1200 self.remove_connection(&key, &id);
1201
1202 self.advance_connection_queue(key);
1203 }
1204
1205 fn abort_pending_upgrades(
1209 &mut self,
1210 pending_upgrades: HashMap<String, HashSet<Uuid>>,
1211 origin: ImmutableOrigin,
1212 ) {
1213 for (name, ids) in pending_upgrades.into_iter() {
1214 let mut version_to_revert: Option<u64> = None;
1215 let key = IndexedDBDescription {
1216 name,
1217 origin: origin.clone(),
1218 };
1219 for id in ids.iter() {
1220 self.remove_connection(&key, id);
1221 }
1222 {
1223 let is_empty = {
1224 let Some(queue) = self.connection_queues.get_mut(&key) else {
1225 continue;
1226 };
1227 queue.retain_mut(|open_request| {
1228 if ids.contains(&open_request.get_id()) {
1229 let old = open_request.abort();
1230 if version_to_revert.is_none() {
1231 if let Some(old) = old {
1232 version_to_revert = Some(old);
1233 }
1234 }
1235 false
1236 } else {
1237 true
1238 }
1239 });
1240 queue.is_empty()
1241 };
1242 if is_empty {
1243 self.connection_queues.remove(&key);
1244 }
1245 }
1246 if let Some(version) = version_to_revert {
1247 if version == 0 {
1248 self.databases.remove(&key);
1254 } else {
1255 let Some(db) = self.databases.get_mut(&key) else {
1256 return debug_assert!(false, "Db should have been created");
1257 };
1258 let res = db.set_version(version);
1261 debug_assert!(res.is_ok(), "Setting a db version should not fail.");
1262 }
1263 }
1264 }
1265 }
1266
1267 fn open_a_database_connection(
1269 &mut self,
1270 sender: GenericCallback<ConnectionMsg>,
1271 origin: ImmutableOrigin,
1272 db_name: String,
1273 version: Option<u64>,
1274 id: Uuid,
1275 ) {
1276 let key = IndexedDBDescription {
1277 name: db_name.clone(),
1278 origin: origin.clone(),
1279 };
1280 let open_request = OpenRequest::Open {
1281 sender,
1282 db_name,
1283 version,
1284 processed: false,
1285 pending_close: Default::default(),
1286 pending_versionchange: Default::default(),
1287 pending_upgrade: None,
1288 id,
1289 };
1290 let should_continue = {
1291 let queue = self.connection_queues.entry(key.clone()).or_default();
1293
1294 queue.push_back(open_request);
1296 queue.len() == 1
1297 };
1298
1299 if should_continue {
1301 self.open_database(key.clone());
1302 self.maybe_remove_front_from_queue(&key);
1303 }
1304 }
1305
1306 fn get_database(
1307 &self,
1308 origin: ImmutableOrigin,
1309 db_name: String,
1310 ) -> Option<&IndexedDBEnvironment<SqliteEngine>> {
1311 let idb_description = IndexedDBDescription {
1312 origin,
1313 name: db_name,
1314 };
1315
1316 self.databases.get(&idb_description)
1317 }
1318
1319 fn get_database_mut(
1320 &mut self,
1321 origin: ImmutableOrigin,
1322 db_name: String,
1323 ) -> Option<&mut IndexedDBEnvironment<SqliteEngine>> {
1324 let idb_description = IndexedDBDescription {
1325 origin,
1326 name: db_name,
1327 };
1328
1329 self.databases.get_mut(&idb_description)
1330 }
1331
    /// Starts upgrading database `key` to `new_version` on behalf of the open request at the
    /// head of its connection queue.
    ///
    /// A new transaction serial number is allocated and registered as a versionchange
    /// transaction whose scope is every existing object store. The new version is stored, the
    /// request is marked processed with a pending upgrade (remembering the old version), and
    /// `ConnectionMsg::Upgrade` is sent to the requester.
    ///
    /// Does nothing if the head of the queue is not an open request; a missing queue or an
    /// empty queue trips a debug assertion.
    ///
    /// # Panics
    ///
    /// Panics if the database has no environment, or if listing the object stores, reading
    /// the version or storing the new version fails.
    fn upgrade_database(&mut self, key: IndexedDBDescription, new_version: u64) {
        let Some(queue) = self.connection_queues.get_mut(&key) else {
            return debug_assert!(false, "A connection queue should exist.");
        };
        let Some(open_request) = queue.front_mut() else {
            return debug_assert!(false, "An open request should be in the queue.");
        };
        let OpenRequest::Open {
            sender,
            db_name,
            version: _,
            processed,
            id,
            pending_close: _,
            pending_versionchange: _,
            pending_upgrade,
        } = open_request
        else {
            return;
        };

        let db = self
            .databases
            .get_mut(&key)
            .expect("Db should have been opened.");

        // Allocate the serial number of the upgrade transaction.
        let transaction = self.serial_number_counter;
        self.serial_number_counter += 1;

        // The upgrade transaction's scope covers every object store that currently exists.
        let scope = db
            .object_store_names()
            .expect("Fetching object store names should not fail.");

        db.register_transaction(transaction, IndexedDBTxnMode::Versionchange, scope.clone());

        // Read the old version before overwriting it so that an abort can restore it.
        let old_version = db.version().expect("DB should have a version.");

        db.set_version(new_version)
            .expect("Setting the version should not fail");

        *processed = true;
        // The previous value (if any) returned by `insert` is not needed.
        let _ = pending_upgrade.insert(VersionUpgrade {
            old: old_version,
            new: new_version,
            transaction,
        });

        if sender
            .send(ConnectionMsg::Upgrade {
                id: *id,
                name: db_name.clone(),
                version: new_version,
                old_version,
                transaction,
                object_store_names: scope.clone(),
            })
            .is_err()
        {
            error!("Couldn't queue task for indexeddb upgrade event.");
        }

    }
1414
1415 fn handle_version_change_done(
1417 &mut self,
1418 name: String,
1419 from_id: Uuid,
1420 old_version: u64,
1421 origin: ImmutableOrigin,
1422 ) {
1423 let key = IndexedDBDescription {
1424 name: name.clone(),
1425 origin: origin.clone(),
1426 };
1427 let (can_upgrade, version) = {
1428 let Some(queue) = self.connection_queues.get_mut(&key) else {
1429 return debug_assert!(false, "A connection queue should exist.");
1430 };
1431 let Some(open_request) = queue.front_mut() else {
1432 return debug_assert!(false, "An open request should be in the queue.");
1433 };
1434 let OpenRequest::Open {
1435 sender,
1436 db_name: _,
1437 version,
1438 id,
1439 pending_upgrade: _,
1440 processed: _,
1441 pending_versionchange,
1442 pending_close,
1443 } = open_request
1444 else {
1445 return debug_assert!(
1446 false,
1447 "An request to open a connection should be in the queue."
1448 );
1449 };
1450 debug_assert!(
1451 pending_versionchange.contains(&from_id),
1452 "The open request should be pending on the versionchange event for the connection sending the message."
1453 );
1454
1455 pending_versionchange.remove(&from_id);
1456
1457 if !pending_versionchange.is_empty() {
1459 return;
1460 }
1461
1462 let Some(version) = *version else {
1463 return debug_assert!(
1464 false,
1465 "An upgrade version should have been determined by now."
1466 );
1467 };
1468
1469 if !pending_close.is_empty() &&
1473 sender
1474 .send(ConnectionMsg::Blocked {
1475 name,
1476 id: *id,
1477 version,
1478 old_version,
1479 })
1480 .is_err()
1481 {
1482 return debug!("Script exit during indexeddb database open");
1483 }
1484
1485 (pending_close.is_empty(), version)
1486 };
1487
1488 if can_upgrade {
1491 self.upgrade_database(key.clone(), version);
1493
1494 let was_pruned = self.maybe_remove_front_from_queue(&key);
1495 if was_pruned {
1496 self.advance_connection_queue(key);
1497 }
1498 }
1499 }
1500
1501 fn open_database(&mut self, key: IndexedDBDescription) {
1504 let Some(queue) = self.connection_queues.get_mut(&key) else {
1505 return debug_assert!(false, "A connection queue should exist.");
1506 };
1507 let Some(open_request) = queue.front_mut() else {
1508 return debug_assert!(false, "An open request should be in the queue.");
1509 };
1510 let OpenRequest::Open {
1511 sender,
1512 db_name,
1513 version,
1514 id,
1515 processed,
1516 pending_upgrade: _pending_upgrade,
1517 pending_close,
1518 pending_versionchange,
1519 } = open_request
1520 else {
1521 return debug_assert!(
1522 false,
1523 "An request to open a connection should be in the queue."
1524 );
1525 };
1526
1527 let idb_base_dir = self.idb_base_dir.as_path();
1528 let requested_version = *version;
1529
1530 let db_version = match self.databases.entry(key.clone()) {
1532 Entry::Vacant(e) => {
1533 let engine = match SqliteEngine::new(idb_base_dir, &key, self.thread_pool.clone()) {
1542 Ok(engine) => engine,
1543 Err(err) => {
1544 let error = backend_error_from_sqlite_error(err);
1545 if let Err(e) = sender.send(ConnectionMsg::DatabaseError {
1546 id: *id,
1547 name: db_name.clone(),
1548 error,
1549 }) {
1550 debug!("Script exit during indexeddb database open {:?}", e);
1551 }
1552 *processed = true;
1553 return;
1554 },
1555 };
1556 let created_db_path = engine.created_db_path();
1557 let db = IndexedDBEnvironment::new(engine, self.manager_sender.clone());
1558 let db_version = match db.version() {
1559 Ok(version) => version,
1560 Err(err) => {
1561 let error = backend_error_from_sqlite_error(err);
1562 if let Err(e) = sender.send(ConnectionMsg::DatabaseError {
1563 id: *id,
1564 name: db_name.clone(),
1565 error,
1566 }) {
1567 debug!("Script exit during indexeddb database open {:?}", e);
1568 }
1569 *processed = true;
1570 return;
1571 },
1572 };
1573
1574 *version = if created_db_path {
1575 Some(requested_version.unwrap_or(1))
1576 } else {
1577 Some(requested_version.unwrap_or(db_version))
1578 };
1579
1580 e.insert(db);
1581 db_version
1582 },
1583 Entry::Occupied(db) => {
1584 let db_version = match db.get().version() {
1585 Ok(version) => version,
1586 Err(err) => {
1587 let error = backend_error_from_sqlite_error(err);
1588 if let Err(e) = sender.send(ConnectionMsg::DatabaseError {
1589 id: *id,
1590 name: db_name.clone(),
1591 error,
1592 }) {
1593 debug!("Script exit during indexeddb database open {:?}", e);
1594 }
1595 *processed = true;
1596 return;
1597 },
1598 };
1599 *version = Some(requested_version.unwrap_or(db_version));
1601 db_version
1602 },
1603 };
1604
1605 let Some(version) = *version else {
1606 return debug_assert!(
1607 false,
1608 "An upgrade version should have been determined by now."
1609 );
1610 };
1611
1612 if version < db_version {
1616 if sender
1617 .send(ConnectionMsg::VersionError {
1618 name: db_name.clone(),
1619 id: *id,
1620 })
1621 .is_err()
1622 {
1623 debug!("Script exit during indexeddb database open");
1624 }
1625 *processed = true;
1626 return;
1627 }
1628
1629 let connection = Connection {
1632 close_pending: false,
1633 sender: sender.clone(),
1634 };
1635 let entry = self.connections.entry(key.clone()).or_default();
1636 entry.insert(*id, connection);
1637
1638 if db_version < version {
1640 let open_connections = entry
1643 .iter_mut()
1644 .filter(|(other_id, conn)| !conn.close_pending && *other_id != id);
1645 for (id_to_close, conn) in open_connections {
1646 if conn
1650 .sender
1651 .send(ConnectionMsg::VersionChange {
1652 name: db_name.clone(),
1653 id: *id_to_close,
1654 version,
1655 old_version: db_version,
1656 })
1657 .is_err()
1658 {
1659 error!("Failed to send ConnectionMsg::Connection to script.");
1660 };
1661 pending_close.insert(*id_to_close);
1662 pending_versionchange.insert(*id_to_close);
1663 }
1664 if !pending_close.is_empty() {
1665 return;
1667 }
1668
1669 self.upgrade_database(key, version);
1671 return;
1672 }
1673
1674 let object_store_names = self
1676 .databases
1677 .get(&key)
1678 .and_then(|db| db.object_store_names().ok())
1679 .unwrap_or_default();
1680 *processed = true;
1681 if sender
1682 .send(ConnectionMsg::Connection {
1683 name: db_name.clone(),
1684 id: *id,
1685 version: db_version,
1686 upgraded: false,
1687 object_store_names,
1688 })
1689 .is_err()
1690 {
1691 error!("Failed to send ConnectionMsg::Connection to script.");
1692 };
1693 }
1694
1695 fn start_delete_database(
1698 &mut self,
1699 key: IndexedDBDescription,
1700 id: Uuid,
1701 sender: GenericCallback<BackendResult<u64>>,
1702 ) {
1703 let open_request = OpenRequest::Delete {
1704 sender,
1705 _origin: key.origin.clone(),
1706 _db_name: key.name.clone(),
1707 processed: false,
1708 id,
1709 };
1710
1711 let should_continue = {
1712 let queue = self.connection_queues.entry(key.clone()).or_default();
1714
1715 queue.push_back(open_request);
1717 queue.len() == 1
1718 };
1719
1720 if should_continue {
1722 self.delete_database(key.clone());
1723 self.maybe_remove_front_from_queue(&key);
1724 }
1725 }
1726
1727 fn delete_database(&mut self, key: IndexedDBDescription) {
1729 let Some(queue) = self.connection_queues.get_mut(&key) else {
1730 return debug_assert!(false, "A connection queue should exist.");
1731 };
1732 let Some(open_request) = queue.front_mut() else {
1733 return debug_assert!(false, "An open request should be in the queue.");
1734 };
1735 let OpenRequest::Delete {
1736 sender,
1737 _origin: _,
1738 _db_name: _,
1739 processed,
1740 id: _,
1741 } = open_request
1742 else {
1743 return debug_assert!(
1744 false,
1745 "An request to open a connection should be in the queue."
1746 );
1747 };
1748
1749 let version = if let Some(db) = self.databases.remove(&key) {
1751 let res = db.version();
1764 let Ok(version) = res else {
1765 *processed = true;
1766 if sender
1767 .send(BackendResult::Err(BackendError::DbErr(
1768 res.unwrap_err().to_string(),
1769 )))
1770 .is_err()
1771 {
1772 debug!("Script went away during pending database delete.");
1773 }
1774 return;
1775 };
1776
1777 if let Err(err) = db.delete_database() {
1781 *processed = true;
1782 if sender
1783 .send(BackendResult::Err(BackendError::DbErr(err.to_string())))
1784 .is_err()
1785 {
1786 debug!("Script went away during pending database delete.");
1787 }
1788 return;
1789 };
1790
1791 version
1792 } else {
1793 0
1794 };
1795
1796 if sender.send(BackendResult::Ok(version)).is_err() {
1798 debug!("Script went away during pending database delete.");
1799 }
1800
1801 *processed = true;
1802 }
1803
1804 fn close_database(&mut self, origin: ImmutableOrigin, id: Uuid, name: String) {
1806 let key = IndexedDBDescription { origin, name };
1823 let (can_upgrade, version) = {
1824 self.remove_connection(&key, &id);
1825
1826 let Some(queue) = self.connection_queues.get_mut(&key) else {
1827 return;
1828 };
1829 let Some(open_request) = queue.front_mut() else {
1830 return;
1831 };
1832 if let OpenRequest::Open {
1833 sender: _,
1834 db_name: _,
1835 version,
1836 id: _,
1837 processed: _,
1838 pending_upgrade,
1839 pending_versionchange,
1840 pending_close,
1841 } = open_request
1842 {
1843 pending_close.remove(&id);
1844 (
1845 pending_close.is_empty() &&
1847 pending_versionchange.is_empty() &&
1848 !pending_upgrade.is_some(),
1849 *version,
1850 )
1851 } else {
1852 (false, None)
1853 }
1854 };
1855
1856 if can_upgrade {
1862 let Some(version) = version else {
1864 return debug_assert!(
1865 false,
1866 "An upgrade version should have been determined by now."
1867 );
1868 };
1869 self.upgrade_database(key.clone(), version);
1870
1871 let was_pruned = self.maybe_remove_front_from_queue(&key);
1872 if was_pruned {
1873 self.advance_connection_queue(key);
1874 }
1875 }
1876 }
1877
1878 fn handle_sync_operation(&mut self, operation: SyncOperation) {
1879 match operation {
1880 SyncOperation::GetDatabases(sender, origin) => {
1881 let info_list: Vec<DatabaseInfo> = self
1892 .databases
1893 .iter()
1894 .filter_map(|(description, info)| {
1895 if let Ok(version) = info.version() {
1897 if version == 0 {
1899 None
1900 } else {
1901 if description.origin == origin {
1906 Some(DatabaseInfo {
1907 name: description.name.clone(),
1908 version,
1909 })
1910 } else {
1911 None
1912 }
1913 }
1914 } else {
1915 None
1916 }
1917 })
1918 .collect();
1919
1920 let result = Ok(info_list);
1924
1925 if sender.send(result).is_err() {
1927 debug!("Couldn't send SyncOperation::GetDatabases reply.");
1928 }
1929 },
1930 SyncOperation::CloseDatabase(origin, id, db_name) => {
1931 self.close_database(origin, id, db_name);
1932 },
1933 SyncOperation::OpenDatabase(sender, origin, db_name, version, id) => {
1934 self.open_a_database_connection(sender, origin, db_name, version, id);
1935 },
1936 SyncOperation::AbortPendingUpgrades {
1937 pending_upgrades,
1938 origin,
1939 } => {
1940 self.abort_pending_upgrades(pending_upgrades, origin);
1941 },
1942 SyncOperation::AbortPendingUpgrade { name, id, origin } => {
1943 self.abort_pending_upgrade(name, id, origin);
1944 },
1945 SyncOperation::DeleteDatabase(callback, origin, db_name, id) => {
1946 let idb_description = IndexedDBDescription {
1947 origin,
1948 name: db_name,
1949 };
1950 self.start_delete_database(idb_description, id, callback);
1951 },
1952 SyncOperation::GetObjectStore(sender, origin, db_name, store_name) => {
1953 let result = self.get_database(origin, db_name).map(|db| {
1955 let key_generator_current_number = db.key_generator_current_number(&store_name);
1956 IndexedDBObjectStore {
1957 key_path: db.key_path(&store_name),
1958 has_key_generator: key_generator_current_number.is_some(),
1959 key_generator_current_number,
1960 indexes: db.indexes(&store_name).unwrap_or_default(),
1961 name: store_name,
1962 }
1963 });
1964 let _ = sender.send(result.ok_or(BackendError::DbNotFound));
1965 },
1966 SyncOperation::CreateIndex(
1967 origin,
1968 db_name,
1969 store_name,
1970 index_name,
1971 key_path,
1972 unique,
1973 multi_entry,
1974 ) => {
1975 if let Some(db) = self.get_database(origin, db_name) {
1976 let _ = db.create_index(&store_name, index_name, key_path, unique, multi_entry);
1977 }
1978 },
1979 SyncOperation::DeleteIndex(origin, db_name, store_name, index_name) => {
1980 if let Some(db) = self.get_database(origin, db_name) {
1981 let _ = db.delete_index(&store_name, index_name);
1982 }
1983 },
1984 SyncOperation::Commit(callback, origin, db_name, txn) => {
1985 if let Some(db) = self.get_database_mut(origin.clone(), db_name.clone()) {
1989 if db.can_commit_now(txn) {
1990 if callback
1991 .send(TxnCompleteMsg {
1992 origin: origin.clone(),
1993 db_name: db_name.clone(),
1994 txn,
1995 result: Ok(()),
1996 })
1997 .is_err()
1998 {
1999 error!(
2000 "Failed to send immediate commit completion for db '{}' txn {}.",
2001 db_name, txn
2002 );
2003 }
2004 } else {
2005 db.queue_pending_commit_callback(txn, callback);
2006 }
2007 db.schedule_transactions(origin.clone(), &db_name);
2008 } else if callback
2009 .send(TxnCompleteMsg {
2010 origin: origin.clone(),
2011 db_name: db_name.clone(),
2012 txn,
2013 result: Ok(()),
2016 })
2017 .is_err()
2018 {
2019 error!(
2020 "Failed to send commit completion for missing db '{}' txn {}.",
2021 db_name, txn
2022 );
2023 }
2024 },
2025 SyncOperation::Abort(abort_callback, origin, db_name, txn) => {
2026 let pending_commit_callbacks =
2030 if let Some(db) = self.get_database_mut(origin.clone(), db_name.clone()) {
2031 let callbacks = db.take_pending_commit_callbacks(txn);
2032 db.abort_transaction(txn);
2033 callbacks
2034 } else {
2035 Vec::new()
2036 };
2037 if let Some(db) = self.get_database_mut(origin.clone(), db_name.clone()) {
2038 db.schedule_transactions(origin.clone(), &db_name);
2039 }
2040 for callback in pending_commit_callbacks {
2041 if callback
2042 .send(storage_traits::indexeddb::TxnCompleteMsg {
2043 origin: origin.clone(),
2044 db_name: db_name.clone(),
2045 txn,
2046 result: Err(BackendError::Abort),
2047 })
2048 .is_err()
2049 {
2050 error!(
2051 "Failed to send deferred abort completion for db '{}' txn {}.",
2052 db_name, txn
2053 );
2054 }
2055 }
2056 if abort_callback
2057 .send(storage_traits::indexeddb::TxnCompleteMsg {
2058 origin: origin.clone(),
2059 db_name: db_name.clone(),
2060 txn,
2061 result: Err(BackendError::Abort),
2062 })
2063 .is_err()
2064 {
2065 error!(
2066 "Failed to send abort completion for db '{}' txn {}.",
2067 db_name, txn
2068 );
2069 }
2070 },
2071 SyncOperation::UpgradeTransactionFinished {
2072 origin,
2073 db_name,
2074 txn,
2075 committed,
2076 } => {
2077 self.handle_upgrade_transaction_finished(db_name, origin, txn, committed);
2078 },
2079 SyncOperation::RequestHandled {
2080 origin,
2081 db_name,
2082 txn,
2083 request_id,
2084 } => {
2085 let should_notify =
2092 if let Some(db) = self.get_database_mut(origin.clone(), db_name.clone()) {
2093 db.mark_request_handled(txn, request_id);
2094 db.can_notify_txn_maybe_commit(txn)
2095 } else {
2096 false
2097 };
2098 if should_notify {
2099 self.handle_sync_operation(SyncOperation::TxnMaybeCommit {
2100 origin,
2101 db_name,
2102 txn,
2103 });
2104 }
2105 },
2106 SyncOperation::TxnMaybeCommit {
2107 origin,
2108 db_name,
2109 txn,
2110 } => {
2111 self.handle_txn_maybe_commit(origin, db_name, txn);
2112 },
2113 SyncOperation::TransactionFinished {
2114 origin,
2115 db_name,
2116 txn,
2117 } => {
2118 let maybe_commit_txns =
2119 if let Some(db) = self.get_database_mut(origin.clone(), db_name.clone()) {
2120 db.finish_transaction(txn);
2121 db.schedule_transactions(origin.clone(), &db_name);
2122 db.maybe_commit_candidates()
2123 } else {
2124 Vec::new()
2125 };
2126 for candidate in maybe_commit_txns {
2127 self.handle_txn_maybe_commit(origin.clone(), db_name.clone(), candidate);
2128 }
2129 },
2130 SyncOperation::CreateTransaction {
2131 sender,
2132 origin,
2133 db_name,
2134 mode,
2135 scope,
2136 } => {
2137 let key = IndexedDBDescription {
2138 origin: origin.clone(),
2139 name: db_name.clone(),
2140 };
2141 if let Some(db) = self.databases.get_mut(&key) {
2142 let transaction_id = self.serial_number_counter;
2143 self.serial_number_counter += 1;
2144 db.register_transaction(transaction_id, mode, scope);
2145 db.schedule_transactions(origin, &db_name);
2146 let _ = sender.send(Ok(transaction_id));
2147 } else {
2148 let _ = sender.send(Err(BackendError::DbNotFound));
2149 }
2150 },
2151 SyncOperation::UpgradeVersion(sender, origin, db_name, _txn, version) => {
2152 if let Some(db) = self.get_database_mut(origin, db_name) {
2153 if version > db.version().unwrap_or(0) {
2154 let _ = db.set_version(version);
2155 }
2156 let _ = sender.send(db.version().map_err(backend_error_from_sqlite_error));
2158 } else {
2159 let _ = sender.send(Err(BackendError::DbNotFound));
2160 }
2161 },
2162 SyncOperation::CreateObjectStore(
2163 sender,
2164 origin,
2165 db_name,
2166 store_name,
2167 key_paths,
2168 auto_increment,
2169 ) => {
2170 if let Some(db) = self.get_database_mut(origin, db_name) {
2171 let result = db.create_object_store(&store_name, key_paths, auto_increment);
2172 let _ = sender.send(result.map_err(BackendError::from));
2173 } else {
2174 let _ = sender.send(Err(BackendError::DbNotFound));
2175 }
2176 },
2177 SyncOperation::DeleteObjectStore(sender, origin, db_name, store_name) => {
2178 if let Some(db) = self.get_database_mut(origin, db_name) {
2179 let result = db.delete_object_store(&store_name);
2180 let _ = sender.send(result.map_err(BackendError::from));
2181 } else {
2182 let _ = sender.send(Err(BackendError::DbNotFound));
2183 }
2184 },
2185 SyncOperation::Version(sender, origin, db_name) => {
2186 if let Some(db) = self.get_database(origin, db_name) {
2187 let _ = sender.send(db.version().map_err(backend_error_from_sqlite_error));
2188 } else {
2189 let _ = sender.send(Err(BackendError::DbNotFound));
2190 }
2191 },
2192 SyncOperation::NotifyEndOfVersionChange {
2193 name,
2194 id,
2195 old_version,
2196 origin,
2197 } => {
2198 self.handle_version_change_done(name, id, old_version, origin);
2199 },
2200 SyncOperation::Exit(_) => {
2201 unreachable!("We must've already broken out of event loop.");
2202 },
2203 }
2204 }
2205
2206 fn collect_memory_reports(&self) -> Vec<Report> {
2207 let mut reports = vec![];
2208 perform_memory_report(|ops| {
2209 reports.push(Report {
2210 path: path!["indexeddb"],
2211 kind: ReportKind::ExplicitJemallocHeapSize,
2212 size: self.connections.size_of(ops) +
2213 self.databases.size_of(ops) +
2214 self.connection_queues.size_of(ops),
2215 });
2216 });
2217 reports
2218 }
2219}