1mod engines;
6
7use std::borrow::ToOwned;
8use std::collections::hash_map::Entry;
9use std::collections::{HashMap, HashSet, VecDeque};
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::thread;
13
14use base::generic_channel::{self, GenericReceiver, GenericSender, ReceiveError};
15use base::threadpool::ThreadPool;
16use log::{debug, error, warn};
17use profile_traits::generic_callback::GenericCallback;
18use rusqlite::Error as RusqliteError;
19use rustc_hash::FxHashMap;
20use servo_config::pref;
21use servo_url::origin::ImmutableOrigin;
22use storage_traits::indexeddb::{
23 AsyncOperation, BackendError, BackendResult, ConnectionMsg, CreateObjectResult, DatabaseInfo,
24 DbResult, IndexedDBIndex, IndexedDBObjectStore, IndexedDBThreadMsg, IndexedDBTxnMode, KeyPath,
25 SyncOperation,
26};
27use uuid::Uuid;
28
29use crate::indexeddb::engines::{KvsEngine, KvsOperation, KvsTransaction, SqliteEngine};
30use crate::shared::is_sqlite_disk_full_error;
31
32pub trait IndexedDBThreadFactory {
33 fn new(config_dir: Option<PathBuf>) -> Self;
34}
35
36impl IndexedDBThreadFactory for GenericSender<IndexedDBThreadMsg> {
37 fn new(config_dir: Option<PathBuf>) -> GenericSender<IndexedDBThreadMsg> {
38 let (chan, port) = generic_channel::channel().unwrap();
39
40 let mut idb_base_dir = PathBuf::new();
41 if let Some(p) = config_dir {
42 idb_base_dir.push(p);
43 }
44 idb_base_dir.push("IndexedDB");
45
46 thread::Builder::new()
47 .name("IndexedDBManager".to_owned())
48 .spawn(move || {
49 IndexedDBManager::new(port, idb_base_dir).start();
50 })
51 .expect("Thread spawning failed");
52
53 chan
54 }
55}
56
57#[derive(Clone, Debug, Eq, Hash, PartialEq)]
60pub struct IndexedDBDescription {
61 pub origin: ImmutableOrigin,
62 pub name: String,
63}
64
65impl IndexedDBDescription {
66 const NAMESPACE_SERVO_IDB: &uuid::Uuid = &Uuid::from_bytes([
68 0x37, 0x9e, 0x56, 0xb0, 0x1a, 0x76, 0x44, 0xc2, 0xa0, 0xdb, 0xe2, 0x18, 0xc5, 0xc8, 0xa3,
69 0x5d,
70 ]);
71 pub(super) fn as_path(&self) -> PathBuf {
74 let mut path = PathBuf::new();
75
76 let origin_uuid = Uuid::new_v5(
78 Self::NAMESPACE_SERVO_IDB,
79 self.origin.ascii_serialization().as_bytes(),
80 );
81 let db_name_uuid = Uuid::new_v5(Self::NAMESPACE_SERVO_IDB, self.name.as_bytes());
82 path.push(origin_uuid.to_string());
83 path.push(db_name_uuid.to_string());
84
85 path
86 }
87}
88
89struct IndexedDBEnvironment<E: KvsEngine> {
90 engine: E,
91 transactions: FxHashMap<u64, KvsTransaction>,
92}
93
94impl<E: KvsEngine> IndexedDBEnvironment<E> {
95 fn new(engine: E) -> IndexedDBEnvironment<E> {
96 IndexedDBEnvironment {
97 engine,
98 transactions: FxHashMap::default(),
99 }
100 }
101
102 fn queue_operation(
103 &mut self,
104 store_name: &str,
105 serial_number: u64,
106 mode: IndexedDBTxnMode,
107 operation: AsyncOperation,
108 ) {
109 self.transactions
110 .entry(serial_number)
111 .or_insert_with(|| KvsTransaction {
112 requests: VecDeque::new(),
113 mode,
114 })
115 .requests
116 .push_back(KvsOperation {
117 operation,
118 store_name: String::from(store_name),
119 });
120 }
121
122 fn start_transaction(&mut self, txn: u64, sender: Option<GenericSender<BackendResult<()>>>) {
124 if let Some(txn) = self.transactions.remove(&txn) {
127 let _ = self.engine.process_transaction(txn).blocking_recv();
128 }
129
130 if let Some(sender) = sender {
133 if sender.send(Ok(())).is_err() {
134 warn!("IDBTransaction starter dropped its channel");
135 }
136 }
137 }
138
139 fn has_key_generator(&self, store_name: &str) -> bool {
140 self.engine.has_key_generator(store_name)
141 }
142
143 fn key_path(&self, store_name: &str) -> Option<KeyPath> {
144 self.engine.key_path(store_name)
145 }
146
147 fn indexes(&self, store_name: &str) -> DbResult<Vec<IndexedDBIndex>> {
148 self.engine
149 .indexes(store_name)
150 .map_err(|err| format!("{err:?}"))
151 }
152
153 fn create_index(
154 &self,
155 store_name: &str,
156 index_name: String,
157 key_path: KeyPath,
158 unique: bool,
159 multi_entry: bool,
160 ) -> DbResult<CreateObjectResult> {
161 self.engine
162 .create_index(store_name, index_name, key_path, unique, multi_entry)
163 .map_err(|err| format!("{err:?}"))
164 }
165
166 fn delete_index(&self, store_name: &str, index_name: String) -> DbResult<()> {
167 self.engine
168 .delete_index(store_name, index_name)
169 .map_err(|err| format!("{err:?}"))
170 }
171
172 fn create_object_store(
173 &mut self,
174 store_name: &str,
175 key_path: Option<KeyPath>,
176 auto_increment: bool,
177 ) -> DbResult<CreateObjectResult> {
178 self.engine
179 .create_store(store_name, key_path, auto_increment)
180 .map_err(|err| format!("{err:?}"))
181 }
182
183 fn delete_object_store(&mut self, store_name: &str) -> DbResult<()> {
184 let result = self.engine.delete_store(store_name);
185 result.map_err(|err| format!("{err:?}"))
186 }
187
188 fn delete_database(self) -> BackendResult<()> {
189 let result = self.engine.delete_database();
190 result
191 .map_err(|err| format!("{err:?}"))
192 .map_err(BackendError::from)
193 }
194
195 fn version(&self) -> Result<u64, E::Error> {
196 self.engine.version()
197 }
198
199 fn set_version(&mut self, version: u64) -> DbResult<()> {
200 self.engine
201 .set_version(version)
202 .map_err(|err| format!("{err:?}"))
203 }
204}
205
206fn backend_error_from_sqlite_error(err: RusqliteError) -> BackendError {
207 if is_sqlite_disk_full_error(&err) {
208 BackendError::QuotaExceeded
209 } else {
210 BackendError::DbErr(format!("{err:?}"))
211 }
212}
213
214enum OpenRequest {
218 Open {
219 sender: GenericCallback<ConnectionMsg>,
221
222 db_name: String,
224
225 version: Option<u64>,
228
229 pending_upgrade: Option<VersionUpgrade>,
232
233 pending_close: HashSet<Uuid>,
235
236 pending_versionchange: HashSet<Uuid>,
241
242 id: Uuid,
243 },
244 Delete {
245 sender: GenericCallback<BackendResult<u64>>,
247
248 _origin: ImmutableOrigin,
252
253 _db_name: String,
256
257 processed: bool,
259
260 id: Uuid,
261 },
262}
263
264impl OpenRequest {
265 fn get_id(&self) -> Uuid {
266 let id = match self {
267 OpenRequest::Open {
268 sender: _,
269 db_name: _,
270 version: _,
271 pending_upgrade: _,
272 pending_close: _,
273 pending_versionchange: _,
274 id,
275 } => id,
276 OpenRequest::Delete {
277 sender: _,
278 _origin: _,
279 _db_name: _,
280 processed: _,
281 id,
282 } => id,
283 };
284 *id
285 }
286
287 fn is_open(&self) -> bool {
288 match self {
289 OpenRequest::Open {
290 sender: _,
291 db_name: _,
292 version: _,
293 pending_upgrade: _,
294 pending_close: _,
295 pending_versionchange: _,
296 id: _,
297 } => true,
298 OpenRequest::Delete {
299 sender: _,
300 _origin: _,
301 _db_name: _,
302 processed: _,
303 id: _,
304 } => false,
305 }
306 }
307
308 fn is_pending(&self) -> bool {
311 match self {
312 OpenRequest::Open {
313 sender: _,
314 db_name: _,
315 version: _,
316 pending_upgrade,
317 pending_close,
318 pending_versionchange,
319 id: _,
320 } => {
321 pending_upgrade.is_some() ||
322 !pending_close.is_empty() ||
323 !pending_versionchange.is_empty()
324 },
325 OpenRequest::Delete {
326 sender: _,
327 _origin: _,
328 _db_name: _,
329 processed,
330 id: _,
331 } => !processed,
332 }
333 }
334
335 fn abort(&self) -> Option<u64> {
338 match self {
339 OpenRequest::Open {
340 sender,
341 db_name,
342 version: _,
343 pending_close: _,
344 pending_versionchange: _,
345 pending_upgrade,
346 id,
347 } => {
348 if sender
349 .send(ConnectionMsg::AbortError {
350 name: db_name.clone(),
351 id: *id,
352 })
353 .is_err()
354 {
355 error!("Failed to send ConnectionMsg::Connection to script.");
356 };
357 pending_upgrade.as_ref().map(|upgrade| upgrade.old)
358 },
359 OpenRequest::Delete {
360 sender,
361 _origin: _,
362 _db_name: _,
363 processed: _,
364 id: _,
365 } => {
366 if sender.send(Err(BackendError::DbNotFound)).is_err() {
367 error!("Failed to send result of database delete to script.");
368 };
369 None
370 },
371 }
372 }
373}
374
375struct VersionUpgrade {
376 old: u64,
377 new: u64,
378}
379
380struct Connection {
382 close_pending: bool,
384
385 sender: GenericCallback<ConnectionMsg>,
387}
388
389struct IndexedDBManager {
390 port: GenericReceiver<IndexedDBThreadMsg>,
391 idb_base_dir: PathBuf,
392 databases: HashMap<IndexedDBDescription, IndexedDBEnvironment<SqliteEngine>>,
393 thread_pool: Arc<ThreadPool>,
394
395 serial_number_counter: u64,
400
401 connection_queues: HashMap<IndexedDBDescription, VecDeque<OpenRequest>>,
403
404 connections: HashMap<IndexedDBDescription, HashMap<Uuid, Connection>>,
406}
407
408impl IndexedDBManager {
409 fn new(port: GenericReceiver<IndexedDBThreadMsg>, idb_base_dir: PathBuf) -> IndexedDBManager {
410 debug!("New indexedDBManager");
411
412 let thread_count = thread::available_parallelism()
416 .map(|i| i.get())
417 .unwrap_or(pref!(threadpools_fallback_worker_num) as usize)
418 .min(pref!(threadpools_indexeddb_workers_max).max(1) as usize);
419
420 IndexedDBManager {
421 port,
422 idb_base_dir,
423 databases: HashMap::new(),
424 thread_pool: Arc::new(ThreadPool::new(thread_count, "IndexedDB".to_string())),
425 serial_number_counter: 0,
426 connection_queues: Default::default(),
427 connections: Default::default(),
428 }
429 }
430}
431
432impl IndexedDBManager {
433 fn start(&mut self) {
434 loop {
435 let message = match self.port.recv() {
438 Ok(msg) => msg,
439 Err(ReceiveError::Disconnected) => {
440 break;
441 },
442 Err(e) => {
443 warn!("Error in IndexedDB thread: {e:?}");
444 continue;
445 },
446 };
447 match message {
448 IndexedDBThreadMsg::Sync(SyncOperation::Exit(sender)) => {
449 let _ = sender.send(());
450 break;
451 },
452 IndexedDBThreadMsg::Sync(operation) => {
453 self.handle_sync_operation(operation);
454 },
455 IndexedDBThreadMsg::Async(origin, db_name, store_name, txn, mode, operation) => {
456 if let Some(db) = self.get_database_mut(origin, db_name) {
457 db.queue_operation(&store_name, txn, mode, operation);
459 db.start_transaction(txn, None);
464 }
465 },
466 IndexedDBThreadMsg::OpenTransactionInactive { name, origin } => {
467 self.handle_open_transaction_inactive(name, origin);
468 },
469 }
470 }
471 }
472
473 fn handle_open_transaction_inactive(&mut self, name: String, origin: ImmutableOrigin) {
475 let key = IndexedDBDescription { name, origin };
476 let Some(queue) = self.connection_queues.get_mut(&key) else {
477 return debug_assert!(false, "A connection queue should exist.");
478 };
479 let Some(open_request) = queue.pop_front() else {
480 return debug_assert!(false, "A pending open request should exist.");
481 };
482 let OpenRequest::Open {
483 sender,
484 db_name,
485 version: _,
486 pending_upgrade,
487 pending_close: _,
488 pending_versionchange: _,
489 id,
490 } = open_request
491 else {
492 return;
493 };
494 let Some(VersionUpgrade { old: _, new }) = pending_upgrade else {
495 return debug_assert!(false, "A pending version upgrade should exist.");
496 };
497 if sender
498 .send(ConnectionMsg::Connection {
499 id,
500 name: db_name,
501 version: new,
502 upgraded: true,
503 })
504 .is_err()
505 {
506 error!("Failed to send ConnectionMsg::Connection to script.");
507 };
508
509 self.advance_connection_queue(key);
510 }
511
512 fn advance_connection_queue(&mut self, key: IndexedDBDescription) {
514 loop {
515 let is_open = {
516 let Some(queue) = self.connection_queues.get_mut(&key) else {
517 return;
518 };
519 if queue.is_empty() {
520 return;
521 }
522 queue.front().expect("Queue is not empty.").is_open()
523 };
524
525 if is_open {
526 self.open_database(key.clone());
527 } else {
528 self.delete_database(key.clone());
529 }
530
531 let was_pruned = self.maybe_remove_front_from_queue(&key);
532
533 if !was_pruned {
534 break;
539 }
540 }
541 }
542
543 fn maybe_remove_front_from_queue(&mut self, key: &IndexedDBDescription) -> bool {
545 let (is_empty, was_pruned) = {
546 let Some(queue) = self.connection_queues.get_mut(key) else {
547 debug_assert!(false, "A connection queue should exist.");
548 return false;
549 };
550 let mut pruned = false;
551 let front_is_pending = queue.front().map(|record| record.is_pending());
552 if let Some(is_pending) = front_is_pending {
553 if !is_pending {
554 queue.pop_front().expect("Queue has a non-pending item.");
555 pruned = true
556 }
557 }
558 (queue.is_empty(), pruned)
559 };
560 if is_empty {
561 self.connection_queues.remove(key);
562 }
563 was_pruned
564 }
565
566 fn remove_connection(&mut self, key: &IndexedDBDescription, id: &Uuid) {
567 let is_empty = {
568 let Some(connections) = self.connections.get_mut(key) else {
569 return debug!("Connection already removed.");
570 };
571 connections.remove(id);
572 connections.is_empty()
573 };
574
575 if is_empty {
576 self.connections.remove(key);
577 }
578 }
579
580 fn abort_pending_upgrade(&mut self, name: String, id: Uuid, origin: ImmutableOrigin) {
584 let key = IndexedDBDescription {
585 name,
586 origin: origin.clone(),
587 };
588 let old = {
589 let Some(queue) = self.connection_queues.get_mut(&key) else {
590 return debug_assert!(
591 false,
592 "There should be a connection queue for the aborted upgrade."
593 );
594 };
595 let Some(open_request) = queue.pop_front() else {
596 return debug_assert!(false, "There should be an open request to upgrade.");
597 };
598 if open_request.get_id() != id {
599 return debug_assert!(
600 false,
601 "Open request to abort should be at the head of the queue."
602 );
603 }
604 open_request.abort()
605 };
606 if let Some(old_version) = old {
607 let Some(db) = self.databases.get_mut(&key) else {
608 return debug_assert!(false, "Db should have been created");
609 };
610 let res = db.set_version(old_version);
613 debug_assert!(res.is_ok(), "Setting a db version should not fail.");
614 }
615
616 self.remove_connection(&key, &id);
617
618 self.advance_connection_queue(key);
619 }
620
621 fn abort_pending_upgrades(
625 &mut self,
626 pending_upgrades: HashMap<String, HashSet<Uuid>>,
627 origin: ImmutableOrigin,
628 ) {
629 for (name, ids) in pending_upgrades.into_iter() {
630 let mut version_to_revert: Option<u64> = None;
631 let key = IndexedDBDescription {
632 name,
633 origin: origin.clone(),
634 };
635 for id in ids.iter() {
636 self.remove_connection(&key, id);
637 }
638 {
639 let is_empty = {
640 let Some(queue) = self.connection_queues.get_mut(&key) else {
641 continue;
642 };
643 queue.retain_mut(|open_request| {
644 if ids.contains(&open_request.get_id()) {
645 let old = open_request.abort();
646 if version_to_revert.is_none() {
647 if let Some(old) = old {
648 version_to_revert = Some(old);
649 }
650 }
651 false
652 } else {
653 true
654 }
655 });
656 queue.is_empty()
657 };
658 if is_empty {
659 self.connection_queues.remove(&key);
660 }
661 }
662 if let Some(version) = version_to_revert {
663 let Some(db) = self.databases.get_mut(&key) else {
664 return debug_assert!(false, "Db should have been created");
665 };
666 let res = db.set_version(version);
669 debug_assert!(res.is_ok(), "Setting a db version should not fail.");
670 }
671 }
672 }
673
674 fn open_a_database_connection(
676 &mut self,
677 sender: GenericCallback<ConnectionMsg>,
678 origin: ImmutableOrigin,
679 db_name: String,
680 version: Option<u64>,
681 id: Uuid,
682 ) {
683 let key = IndexedDBDescription {
684 name: db_name.clone(),
685 origin: origin.clone(),
686 };
687 let open_request = OpenRequest::Open {
688 sender,
689 db_name,
690 version,
691 pending_close: Default::default(),
692 pending_versionchange: Default::default(),
693 pending_upgrade: None,
694 id,
695 };
696 let should_continue = {
697 let queue = self.connection_queues.entry(key.clone()).or_default();
699
700 queue.push_back(open_request);
702 queue.len() == 1
703 };
704
705 if should_continue {
707 self.open_database(key.clone());
708 self.maybe_remove_front_from_queue(&key);
709 }
710 }
711
712 fn get_database(
713 &self,
714 origin: ImmutableOrigin,
715 db_name: String,
716 ) -> Option<&IndexedDBEnvironment<SqliteEngine>> {
717 let idb_description = IndexedDBDescription {
718 origin,
719 name: db_name,
720 };
721
722 self.databases.get(&idb_description)
723 }
724
725 fn get_database_mut(
726 &mut self,
727 origin: ImmutableOrigin,
728 db_name: String,
729 ) -> Option<&mut IndexedDBEnvironment<SqliteEngine>> {
730 let idb_description = IndexedDBDescription {
731 origin,
732 name: db_name,
733 };
734
735 self.databases.get_mut(&idb_description)
736 }
737
738 fn upgrade_database(&mut self, key: IndexedDBDescription, new_version: u64) {
743 let Some(queue) = self.connection_queues.get_mut(&key) else {
744 return debug_assert!(false, "A connection queue should exist.");
745 };
746 let Some(open_request) = queue.front_mut() else {
747 return debug_assert!(false, "An open request should be in the queue.");
748 };
749 let OpenRequest::Open {
750 sender,
751 db_name,
752 version: _,
753 id,
754 pending_close: _,
755 pending_versionchange: _,
756 pending_upgrade,
757 } = open_request
758 else {
759 return;
760 };
761
762 let db = self
765 .databases
766 .get_mut(&key)
767 .expect("Db should have been opened.");
768
769 let transaction_id = self.serial_number_counter;
771 self.serial_number_counter += 1;
772
773 let old_version = db.version().expect("DB should have a version.");
781
782 db.set_version(new_version)
787 .expect("Setting the version should not fail");
788
789 if sender
794 .send(ConnectionMsg::Upgrade {
795 id: *id,
796 name: db_name.clone(),
797 version: new_version,
798 old_version,
799 transaction: transaction_id,
800 })
801 .is_err()
802 {
803 error!("Couldn't queue task for indexeddb upgrade event.");
804 }
805
806 let _ = pending_upgrade.insert(VersionUpgrade {
808 old: old_version,
809 new: new_version,
810 });
811 }
812
813 fn handle_version_change_done(
815 &mut self,
816 name: String,
817 from_id: Uuid,
818 old_version: u64,
819 origin: ImmutableOrigin,
820 ) {
821 let key = IndexedDBDescription {
822 name: name.clone(),
823 origin: origin.clone(),
824 };
825 let (can_upgrade, version) = {
826 let Some(queue) = self.connection_queues.get_mut(&key) else {
827 return debug_assert!(false, "A connection queue should exist.");
828 };
829 let Some(open_request) = queue.front_mut() else {
830 return debug_assert!(false, "An open request should be in the queue.");
831 };
832 let OpenRequest::Open {
833 sender,
834 db_name: _,
835 version,
836 id,
837 pending_upgrade: _,
838 pending_versionchange,
839 pending_close,
840 } = open_request
841 else {
842 return debug_assert!(
843 false,
844 "An request to open a connection should be in the queue."
845 );
846 };
847 debug_assert!(
848 pending_versionchange.contains(&from_id),
849 "The open request should be pending on the versionchange event for the connection sending the message."
850 );
851
852 pending_versionchange.remove(&from_id);
853
854 if !pending_versionchange.is_empty() {
856 return;
857 }
858
859 let Some(version) = *version else {
860 return debug_assert!(
861 false,
862 "An upgrade version should have been determined by now."
863 );
864 };
865
866 if !pending_close.is_empty() &&
870 sender
871 .send(ConnectionMsg::Blocked {
872 name,
873 id: *id,
874 version,
875 old_version,
876 })
877 .is_err()
878 {
879 return debug!("Script exit during indexeddb database open");
880 }
881
882 (pending_close.is_empty(), version)
883 };
884
885 if can_upgrade {
888 self.upgrade_database(key.clone(), version);
890
891 let was_pruned = self.maybe_remove_front_from_queue(&key);
892 if was_pruned {
893 self.advance_connection_queue(key);
894 }
895 }
896 }
897
898 fn open_database(&mut self, key: IndexedDBDescription) {
901 let Some(queue) = self.connection_queues.get_mut(&key) else {
902 return debug_assert!(false, "A connection queue should exist.");
903 };
904 let Some(open_request) = queue.front_mut() else {
905 return debug_assert!(false, "An open request should be in the queue.");
906 };
907 let OpenRequest::Open {
908 sender,
909 db_name,
910 version,
911 id,
912 pending_upgrade: _,
913 pending_close,
914 pending_versionchange,
915 } = open_request
916 else {
917 return debug_assert!(
918 false,
919 "An request to open a connection should be in the queue."
920 );
921 };
922
923 let idb_base_dir = self.idb_base_dir.as_path();
924 let requested_version = *version;
925
926 let db_version = match self.databases.entry(key.clone()) {
928 Entry::Vacant(e) => {
929 let engine = match SqliteEngine::new(idb_base_dir, &key, self.thread_pool.clone()) {
938 Ok(engine) => engine,
939 Err(err) => {
940 let error = backend_error_from_sqlite_error(err);
941 if let Err(e) = sender.send(ConnectionMsg::DatabaseError {
942 id: *id,
943 name: db_name.clone(),
944 error,
945 }) {
946 debug!("Script exit during indexeddb database open {:?}", e);
947 }
948 return;
949 },
950 };
951 let created_db_path = engine.created_db_path();
952 let db = IndexedDBEnvironment::new(engine);
953 let db_version = match db.version() {
954 Ok(version) => version,
955 Err(err) => {
956 let error = backend_error_from_sqlite_error(err);
957 if let Err(e) = sender.send(ConnectionMsg::DatabaseError {
958 id: *id,
959 name: db_name.clone(),
960 error,
961 }) {
962 debug!("Script exit during indexeddb database open {:?}", e);
963 }
964 return;
965 },
966 };
967
968 *version = if created_db_path {
969 Some(requested_version.unwrap_or(1))
970 } else {
971 Some(requested_version.unwrap_or(db_version))
972 };
973
974 e.insert(db);
975 db_version
976 },
977 Entry::Occupied(db) => {
978 let db_version = match db.get().version() {
979 Ok(version) => version,
980 Err(err) => {
981 let error = backend_error_from_sqlite_error(err);
982 if let Err(e) = sender.send(ConnectionMsg::DatabaseError {
983 id: *id,
984 name: db_name.clone(),
985 error,
986 }) {
987 debug!("Script exit during indexeddb database open {:?}", e);
988 }
989 return;
990 },
991 };
992 *version = Some(requested_version.unwrap_or(db_version));
994 db_version
995 },
996 };
997
998 let Some(version) = *version else {
999 return debug_assert!(
1000 false,
1001 "An upgrade version should have been determined by now."
1002 );
1003 };
1004
1005 if version < db_version {
1009 if sender
1010 .send(ConnectionMsg::VersionError {
1011 name: db_name.clone(),
1012 id: *id,
1013 })
1014 .is_err()
1015 {
1016 debug!("Script exit during indexeddb database open");
1017 }
1018 return;
1019 }
1020
1021 let connection = Connection {
1024 close_pending: false,
1025 sender: sender.clone(),
1026 };
1027 let entry = self.connections.entry(key.clone()).or_default();
1028 entry.insert(*id, connection);
1029
1030 if db_version < version {
1032 let open_connections = entry
1035 .iter_mut()
1036 .filter(|(other_id, conn)| !conn.close_pending && *other_id != id);
1037 for (id_to_close, conn) in open_connections {
1038 if conn
1042 .sender
1043 .send(ConnectionMsg::VersionChange {
1044 name: db_name.clone(),
1045 id: *id_to_close,
1046 version,
1047 old_version: db_version,
1048 })
1049 .is_err()
1050 {
1051 error!("Failed to send ConnectionMsg::Connection to script.");
1052 };
1053 pending_close.insert(*id_to_close);
1054 pending_versionchange.insert(*id_to_close);
1055 }
1056 if !pending_close.is_empty() {
1057 return;
1059 }
1060
1061 self.upgrade_database(key, version);
1063 return;
1064 }
1065
1066 if sender
1068 .send(ConnectionMsg::Connection {
1069 name: db_name.clone(),
1070 id: *id,
1071 version: db_version,
1072 upgraded: false,
1073 })
1074 .is_err()
1075 {
1076 error!("Failed to send ConnectionMsg::Connection to script.");
1077 };
1078 }
1079
1080 fn start_delete_database(
1083 &mut self,
1084 key: IndexedDBDescription,
1085 id: Uuid,
1086 sender: GenericCallback<BackendResult<u64>>,
1087 ) {
1088 let open_request = OpenRequest::Delete {
1089 sender,
1090 _origin: key.origin.clone(),
1091 _db_name: key.name.clone(),
1092 processed: false,
1093 id,
1094 };
1095
1096 let should_continue = {
1097 let queue = self.connection_queues.entry(key.clone()).or_default();
1099
1100 queue.push_back(open_request);
1102 queue.len() == 1
1103 };
1104
1105 if should_continue {
1107 self.delete_database(key.clone());
1108 self.maybe_remove_front_from_queue(&key);
1109 }
1110 }
1111
1112 fn delete_database(&mut self, key: IndexedDBDescription) {
1114 let Some(queue) = self.connection_queues.get_mut(&key) else {
1115 return debug_assert!(false, "A connection queue should exist.");
1116 };
1117 let Some(open_request) = queue.front_mut() else {
1118 return debug_assert!(false, "An open request should be in the queue.");
1119 };
1120 let OpenRequest::Delete {
1121 sender,
1122 _origin: _,
1123 _db_name: _,
1124 processed,
1125 id: _,
1126 } = open_request
1127 else {
1128 return debug_assert!(
1129 false,
1130 "An request to open a connection should be in the queue."
1131 );
1132 };
1133
1134 let version = if let Some(db) = self.databases.remove(&key) {
1136 let res = db.version();
1149 let Ok(version) = res else {
1150 if sender
1151 .send(BackendResult::Err(BackendError::DbErr(
1152 res.unwrap_err().to_string(),
1153 )))
1154 .is_err()
1155 {
1156 debug!("Script went away during pending database delete.");
1157 }
1158 return;
1159 };
1160
1161 if let Err(err) = db.delete_database() {
1165 if sender
1166 .send(BackendResult::Err(BackendError::DbErr(err.to_string())))
1167 .is_err()
1168 {
1169 debug!("Script went away during pending database delete.");
1170 }
1171 return;
1172 };
1173
1174 version
1175 } else {
1176 0
1177 };
1178
1179 if sender.send(BackendResult::Ok(version)).is_err() {
1181 debug!("Script went away during pending database delete.");
1182 }
1183
1184 *processed = true;
1185 }
1186
1187 fn close_database(&mut self, origin: ImmutableOrigin, id: Uuid, name: String) {
1189 let key = IndexedDBDescription { origin, name };
1206 let (can_upgrade, version) = {
1207 self.remove_connection(&key, &id);
1208
1209 let Some(queue) = self.connection_queues.get_mut(&key) else {
1210 return;
1211 };
1212 let Some(open_request) = queue.front_mut() else {
1213 return;
1214 };
1215 if let OpenRequest::Open {
1216 sender: _,
1217 db_name: _,
1218 version,
1219 id: _,
1220 pending_upgrade,
1221 pending_versionchange,
1222 pending_close,
1223 } = open_request
1224 {
1225 pending_close.remove(&id);
1226 (
1227 pending_close.is_empty() &&
1229 pending_versionchange.is_empty() &&
1230 !pending_upgrade.is_some(),
1231 *version,
1232 )
1233 } else {
1234 (false, None)
1235 }
1236 };
1237
1238 if can_upgrade {
1244 let Some(version) = version else {
1246 return debug_assert!(
1247 false,
1248 "An upgrade version should have been determined by now."
1249 );
1250 };
1251 self.upgrade_database(key.clone(), version);
1252
1253 let was_pruned = self.maybe_remove_front_from_queue(&key);
1254 if was_pruned {
1255 self.advance_connection_queue(key);
1256 }
1257 }
1258 }
1259
1260 fn handle_sync_operation(&mut self, operation: SyncOperation) {
1261 match operation {
1262 SyncOperation::GetDatabases(sender, origin) => {
1263 let info_list: Vec<DatabaseInfo> = self
1274 .databases
1275 .iter()
1276 .filter_map(|(description, info)| {
1277 if let Ok(version) = info.version() {
1279 if version == 0 {
1281 None
1282 } else {
1283 if description.origin == origin {
1288 Some(DatabaseInfo {
1289 name: description.name.clone(),
1290 version,
1291 })
1292 } else {
1293 None
1294 }
1295 }
1296 } else {
1297 None
1298 }
1299 })
1300 .collect();
1301
1302 let result = if info_list.len() == self.databases.len() {
1304 Ok(info_list)
1305 } else {
1306 Err(BackendError::DbErr(
1307 "Unknown error getting database info.".to_string(),
1308 ))
1309 };
1310
1311 if sender.send(result).is_err() {
1313 debug!("Couldn't send SyncOperation::GetDatabases reply.");
1314 }
1315 },
1316 SyncOperation::CloseDatabase(origin, id, db_name) => {
1317 self.close_database(origin, id, db_name);
1318 },
1319 SyncOperation::OpenDatabase(sender, origin, db_name, version, id) => {
1320 self.open_a_database_connection(sender, origin, db_name, version, id);
1321 },
1322 SyncOperation::AbortPendingUpgrades {
1323 pending_upgrades,
1324 origin,
1325 } => {
1326 self.abort_pending_upgrades(pending_upgrades, origin);
1327 },
1328 SyncOperation::AbortPendingUpgrade { name, id, origin } => {
1329 self.abort_pending_upgrade(name, id, origin);
1330 },
1331 SyncOperation::DeleteDatabase(callback, origin, db_name, id) => {
1332 let idb_description = IndexedDBDescription {
1333 origin,
1334 name: db_name,
1335 };
1336 self.start_delete_database(idb_description, id, callback);
1337 },
1338 SyncOperation::GetObjectStore(sender, origin, db_name, store_name) => {
1339 let result = self
1341 .get_database(origin, db_name)
1342 .map(|db| IndexedDBObjectStore {
1343 key_path: db.key_path(&store_name),
1344 has_key_generator: db.has_key_generator(&store_name),
1345 indexes: db.indexes(&store_name).unwrap_or_default(),
1346 name: store_name,
1347 });
1348 let _ = sender.send(result.ok_or(BackendError::DbNotFound));
1349 },
1350 SyncOperation::CreateIndex(
1351 origin,
1352 db_name,
1353 store_name,
1354 index_name,
1355 key_path,
1356 unique,
1357 multi_entry,
1358 ) => {
1359 if let Some(db) = self.get_database(origin, db_name) {
1360 let _ = db.create_index(&store_name, index_name, key_path, unique, multi_entry);
1361 }
1362 },
1363 SyncOperation::DeleteIndex(origin, db_name, store_name, index_name) => {
1364 if let Some(db) = self.get_database(origin, db_name) {
1365 let _ = db.delete_index(&store_name, index_name);
1366 }
1367 },
1368 SyncOperation::Commit(sender, _origin, _db_name, _txn) => {
1369 let _ = sender.send(Ok(()));
1371 },
1372 SyncOperation::UpgradeVersion(sender, origin, db_name, _txn, version) => {
1373 if let Some(db) = self.get_database_mut(origin, db_name) {
1374 if version > db.version().unwrap_or(0) {
1375 let _ = db.set_version(version);
1376 }
1377 let _ = sender.send(db.version().map_err(backend_error_from_sqlite_error));
1379 } else {
1380 let _ = sender.send(Err(BackendError::DbNotFound));
1381 }
1382 },
1383 SyncOperation::CreateObjectStore(
1384 sender,
1385 origin,
1386 db_name,
1387 store_name,
1388 key_paths,
1389 auto_increment,
1390 ) => {
1391 if let Some(db) = self.get_database_mut(origin, db_name) {
1392 let result = db.create_object_store(&store_name, key_paths, auto_increment);
1393 let _ = sender.send(result.map_err(BackendError::from));
1394 } else {
1395 let _ = sender.send(Err(BackendError::DbNotFound));
1396 }
1397 },
1398 SyncOperation::DeleteObjectStore(sender, origin, db_name, store_name) => {
1399 if let Some(db) = self.get_database_mut(origin, db_name) {
1400 let result = db.delete_object_store(&store_name);
1401 let _ = sender.send(result.map_err(BackendError::from));
1402 } else {
1403 let _ = sender.send(Err(BackendError::DbNotFound));
1404 }
1405 },
1406 SyncOperation::StartTransaction(sender, origin, db_name, txn) => {
1407 if let Some(db) = self.get_database_mut(origin, db_name) {
1408 db.start_transaction(txn, Some(sender));
1409 } else {
1410 let _ = sender.send(Err(BackendError::DbNotFound));
1411 }
1412 },
1413 SyncOperation::Version(sender, origin, db_name) => {
1414 if let Some(db) = self.get_database(origin, db_name) {
1415 let _ = sender.send(db.version().map_err(backend_error_from_sqlite_error));
1416 } else {
1417 let _ = sender.send(Err(BackendError::DbNotFound));
1418 }
1419 },
1420 SyncOperation::RegisterNewTxn(sender, _origin, _db_name) => {
1421 let transaction_id = self.serial_number_counter;
1425 self.serial_number_counter += 1;
1426 let _ = sender.send(transaction_id);
1427 },
1428 SyncOperation::NotifyEndOfVersionChange {
1429 name,
1430 id,
1431 old_version,
1432 origin,
1433 } => {
1434 self.handle_version_change_done(name, id, old_version, origin);
1435 },
1436 SyncOperation::Exit(_) => {
1437 unreachable!("We must've already broken out of event loop.");
1438 },
1439 }
1440 }
1441}