mod engines;

use std::borrow::ToOwned;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet, VecDeque};
use std::path::PathBuf;
use std::sync::Arc;
use std::thread;

use base::generic_channel::{self, GenericReceiver, GenericSender, ReceiveError};
use base::threadpool::ThreadPool;
use log::{debug, error, warn};
use profile_traits::generic_callback::GenericCallback;
use rusqlite::Error as RusqliteError;
use rustc_hash::FxHashMap;
use servo_config::pref;
use servo_url::origin::ImmutableOrigin;
use storage_traits::indexeddb::{
    AsyncOperation, BackendError, BackendResult, CreateObjectResult, DatabaseInfo, DbResult,
    IndexedDBThreadMsg, IndexedDBTxnMode, KeyPath, OpenDatabaseResult, SyncOperation,
};
use uuid::Uuid;

use crate::indexeddb::engines::{KvsEngine, KvsOperation, KvsTransaction, SqliteEngine};
use crate::shared::is_sqlite_disk_full_error;
pub trait IndexedDBThreadFactory {
    fn new(config_dir: Option<PathBuf>) -> Self;
}

impl IndexedDBThreadFactory for GenericSender<IndexedDBThreadMsg> {
    fn new(config_dir: Option<PathBuf>) -> GenericSender<IndexedDBThreadMsg> {
        let (chan, port) = generic_channel::channel().unwrap();

        let mut idb_base_dir = PathBuf::new();
        if let Some(p) = config_dir {
            idb_base_dir.push(p);
        }
        idb_base_dir.push("IndexedDB");

        thread::Builder::new()
            .name("IndexedDBManager".to_owned())
            .spawn(move || {
                IndexedDBManager::new(port, idb_base_dir).start();
            })
            .expect("Thread spawning failed");

        chan
    }
}

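/// Identifies one IndexedDB database: the origin it belongs to plus its name.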
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct IndexedDBDescription {
    pub origin: ImmutableOrigin,
    pub name: String,
}

impl IndexedDBDescription {
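    /// Fixed namespace UUID used to derive stable UUIDv5 identifiers for origins and
    /// database names.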
    const NAMESPACE_SERVO_IDB: &uuid::Uuid = &Uuid::from_bytes([
        0x37, 0x9e, 0x56, 0xb0, 0x1a, 0x76, 0x44, 0xc2, 0xa0, 0xdb, 0xe2, 0x18, 0xc5, 0xc8, 0xa3,
        0x5d,
    ]);
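
    /// Returns the on-disk directory for this database, built from UUIDv5 hashes of the
    /// origin and the database name so that arbitrary names map to valid path components.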
    pub(super) fn as_path(&self) -> PathBuf {
        let mut path = PathBuf::new();

        let origin_uuid = Uuid::new_v5(
            Self::NAMESPACE_SERVO_IDB,
            self.origin.ascii_serialization().as_bytes(),
        );
        let db_name_uuid = Uuid::new_v5(Self::NAMESPACE_SERVO_IDB, self.name.as_bytes());
        path.push(origin_uuid.to_string());
        path.push(db_name_uuid.to_string());

        path
    }
}

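/// A single open database: the storage engine plus transactions that have been queued
/// but not yet handed to the engine.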
struct IndexedDBEnvironment<E: KvsEngine> {
    engine: E,
    transactions: FxHashMap<u64, KvsTransaction>,
}

impl<E: KvsEngine> IndexedDBEnvironment<E> {
    fn new(engine: E) -> IndexedDBEnvironment<E> {
        IndexedDBEnvironment {
            engine,
            transactions: FxHashMap::default(),
        }
    }

    fn queue_operation(
        &mut self,
        store_name: &str,
        serial_number: u64,
        mode: IndexedDBTxnMode,
        operation: AsyncOperation,
    ) {
        self.transactions
            .entry(serial_number)
            .or_insert_with(|| KvsTransaction {
                requests: VecDeque::new(),
                mode,
            })
            .requests
            .push_back(KvsOperation {
                operation,
                store_name: String::from(store_name),
            });
    }

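    /// Hands the operations queued for `txn` to the engine and blocks until the engine
    /// reports completion, then notifies the optional starter channel.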
    fn start_transaction(&mut self, txn: u64, sender: Option<GenericSender<BackendResult<()>>>) {
        if let Some(txn) = self.transactions.remove(&txn) {
            let _ = self.engine.process_transaction(txn).blocking_recv();
        }

        if let Some(sender) = sender {
            if sender.send(Ok(())).is_err() {
                warn!("IDBTransaction starter dropped its channel");
            }
        }
    }

    fn has_key_generator(&self, store_name: &str) -> bool {
        self.engine.has_key_generator(store_name)
    }

    fn key_path(&self, store_name: &str) -> Option<KeyPath> {
        self.engine.key_path(store_name)
    }

    fn create_index(
        &self,
        store_name: &str,
        index_name: String,
        key_path: KeyPath,
        unique: bool,
        multi_entry: bool,
    ) -> DbResult<CreateObjectResult> {
        self.engine
            .create_index(store_name, index_name, key_path, unique, multi_entry)
            .map_err(|err| format!("{err:?}"))
    }

    fn delete_index(&self, store_name: &str, index_name: String) -> DbResult<()> {
        self.engine
            .delete_index(store_name, index_name)
            .map_err(|err| format!("{err:?}"))
    }

    fn create_object_store(
        &mut self,
        store_name: &str,
        key_path: Option<KeyPath>,
        auto_increment: bool,
    ) -> DbResult<CreateObjectResult> {
        self.engine
            .create_store(store_name, key_path, auto_increment)
            .map_err(|err| format!("{err:?}"))
    }

    fn delete_object_store(&mut self, store_name: &str) -> DbResult<()> {
        let result = self.engine.delete_store(store_name);
        result.map_err(|err| format!("{err:?}"))
    }

    fn delete_database(self) -> BackendResult<()> {
        let result = self.engine.delete_database();
        result
            .map_err(|err| format!("{err:?}"))
            .map_err(BackendError::from)
    }

    fn version(&self) -> Result<u64, E::Error> {
        self.engine.version()
    }

    fn set_version(&mut self, version: u64) -> DbResult<()> {
        self.engine
            .set_version(version)
            .map_err(|err| format!("{err:?}"))
    }
}

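/// Converts a SQLite error into a [`BackendError`], mapping disk-full conditions to
/// [`BackendError::QuotaExceeded`].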
fn backend_error_from_sqlite_error(err: RusqliteError) -> BackendError {
    if is_sqlite_disk_full_error(&err) {
        BackendError::QuotaExceeded
    } else {
        BackendError::DbErr(format!("{err:?}"))
    }
}

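/// A queued request to open or delete a database. Requests for the same database are
/// serviced in order through the per-database connection queue.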
enum OpenRequest {
    Open {
        /// Channel used to answer the script that requested the open.
        sender: GenericCallback<BackendResult<OpenDatabaseResult>>,

        origin: ImmutableOrigin,

        db_name: String,

        /// The version requested by script, if any.
        version: Option<u64>,

        /// Set once an upgrade has been announced to script and is awaiting completion.
        pending_upgrade: Option<VersionUpgrade>,

        id: Uuid,
    },
    Delete {
        /// Channel used to answer the script that requested the deletion.
        sender: GenericCallback<BackendResult<u64>>,

        _origin: ImmutableOrigin,

        _db_name: String,

        /// Whether the deletion has already been carried out.
        processed: bool,

        id: Uuid,
    },
}

impl OpenRequest {
    fn get_id(&self) -> Uuid {
        let id = match self {
            OpenRequest::Open {
                sender: _,
                origin: _,
                db_name: _,
                version: _,
                pending_upgrade: _,
                id,
            } => id,
            OpenRequest::Delete {
                sender: _,
                _origin: _,
                _db_name: _,
                processed: _,
                id,
            } => id,
        };
        *id
    }

    fn is_open(&self) -> bool {
        match self {
            OpenRequest::Open {
                sender: _,
                origin: _,
                db_name: _,
                version: _,
                pending_upgrade: _,
                id: _,
            } => true,
            OpenRequest::Delete {
                sender: _,
                _origin: _,
                _db_name: _,
                processed: _,
                id: _,
            } => false,
        }
    }

    fn is_pending(&self) -> bool {
        match self {
            OpenRequest::Open {
                sender: _,
                origin: _,
                db_name: _,
                version: _,
                pending_upgrade,
                id: _,
            } => pending_upgrade.is_some(),
            OpenRequest::Delete {
                sender: _,
                _origin: _,
                _db_name: _,
                processed,
                id: _,
            } => !processed,
        }
    }

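    /// Answers this request with an abort or error response. For an open request with a
    /// pending upgrade, returns the version that should be rolled back to.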
    fn abort(&self) -> Option<u64> {
        match self {
            OpenRequest::Open {
                sender,
                origin: _,
                db_name: _,
                version: _,
                pending_upgrade,
                id: _,
            } => {
                if sender.send(Ok(OpenDatabaseResult::AbortError)).is_err() {
                    error!("Failed to send OpenDatabaseResult::AbortError to script.");
                };
                pending_upgrade.as_ref().map(|upgrade| upgrade.old)
            },
            OpenRequest::Delete {
                sender,
                _origin: _,
                _db_name: _,
                processed: _,
                id: _,
            } => {
                if sender.send(Err(BackendError::DbNotFound)).is_err() {
                    error!("Failed to send result of database delete to script.");
                };
                None
            },
        }
    }
}

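/// A version change that has been announced to script but has not yet completed.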
struct VersionUpgrade {
    old: u64,
    new: u64,
}

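/// State tracked for each connection that script holds to a database.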
#[derive(Clone, Eq, Hash, PartialEq)]
struct Connection {
    /// The database version this connection was opened against.
    version: u64,

    /// Whether the connection's close pending flag is set.
    close_pending: bool,
}

struct IndexedDBManager {
    port: GenericReceiver<IndexedDBThreadMsg>,
    idb_base_dir: PathBuf,
    databases: HashMap<IndexedDBDescription, IndexedDBEnvironment<SqliteEngine>>,
    thread_pool: Arc<ThreadPool>,

    /// Counter used to hand out unique transaction serial numbers.
    serial_number_counter: u64,

    /// Queued open and delete requests, processed in order per database.
    connection_queues: HashMap<IndexedDBDescription, VecDeque<OpenRequest>>,

    /// The connections script currently holds to each database.
    connections: HashMap<IndexedDBDescription, HashSet<Connection>>,
}

impl IndexedDBManager {
    fn new(port: GenericReceiver<IndexedDBThreadMsg>, idb_base_dir: PathBuf) -> IndexedDBManager {
        debug!("New indexedDBManager");

        let thread_count = thread::available_parallelism()
            .map(|i| i.get())
            .unwrap_or(pref!(threadpools_fallback_worker_num) as usize)
            .min(pref!(threadpools_indexeddb_workers_max).max(1) as usize);

        IndexedDBManager {
            port,
            idb_base_dir,
            databases: HashMap::new(),
            thread_pool: Arc::new(ThreadPool::new(thread_count, "IndexedDB".to_string())),
            serial_number_counter: 0,
            connection_queues: Default::default(),
            connections: Default::default(),
        }
    }
}

impl IndexedDBManager {
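    /// Runs the manager's event loop, handling sync and async messages until the channel
    /// disconnects or an `Exit` message arrives.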
    fn start(&mut self) {
        loop {
            let message = match self.port.recv() {
                Ok(msg) => msg,
                Err(ReceiveError::Disconnected) => {
                    break;
                },
                Err(e) => {
                    warn!("Error in IndexedDB thread: {e:?}");
                    continue;
                },
            };
            match message {
                IndexedDBThreadMsg::Sync(SyncOperation::Exit(sender)) => {
                    let _ = sender.send(());
                    break;
                },
                IndexedDBThreadMsg::Sync(operation) => {
                    self.handle_sync_operation(operation);
                },
                IndexedDBThreadMsg::Async(origin, db_name, store_name, txn, mode, operation) => {
                    if let Some(db) = self.get_database_mut(origin, db_name) {
                        db.queue_operation(&store_name, txn, mode, operation);
                        db.start_transaction(txn, None);
                    }
                },
                IndexedDBThreadMsg::OpenTransactionInactive { name, origin } => {
                    self.handle_open_transaction_inactive(name, origin);
                },
            }
        }
    }

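    /// Called when the upgrade transaction for the front open request becomes inactive:
    /// reports the upgraded connection back to script and advances the queue.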
    fn handle_open_transaction_inactive(&mut self, name: String, origin: ImmutableOrigin) {
        let key = IndexedDBDescription { name, origin };
        let Some(queue) = self.connection_queues.get_mut(&key) else {
            return debug_assert!(false, "A connection queue should exist.");
        };
        let Some(open_request) = queue.pop_front() else {
            return debug_assert!(false, "A pending open request should exist.");
        };
        let OpenRequest::Open {
            sender,
            origin: _,
            db_name: _,
            version: _,
            pending_upgrade,
            id: _,
        } = open_request
        else {
            return;
        };
        let Some(VersionUpgrade { old: _, new }) = pending_upgrade else {
            return debug_assert!(false, "A pending version upgrade should exist.");
        };
        if sender
            .send(Ok(OpenDatabaseResult::Connection {
                version: new,
                upgraded: true,
            }))
            .is_err()
        {
            error!("Failed to send OpenDatabaseResult::Connection to script.");
        };

        self.advance_connection_queue(key);
    }

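    /// Services queued open/delete requests for `key` until the queue is empty or the
    /// request at its front is still pending (for example, waiting on an upgrade).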
    fn advance_connection_queue(&mut self, key: IndexedDBDescription) {
        loop {
            let is_open = {
                let Some(queue) = self.connection_queues.get_mut(&key) else {
                    return;
                };
                if queue.is_empty() {
                    return;
                }
                queue.front().expect("Queue is not empty.").is_open()
            };

            if is_open {
                self.open_database(key.clone());
            } else {
                self.delete_database(key.clone());
            }

            let was_pruned = self.maybe_remove_front_from_queue(&key);

            if !was_pruned {
                break;
            }
        }
    }

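    /// Removes the front request from the queue for `key` if it is no longer pending,
    /// dropping the queue once it becomes empty. Returns whether a request was removed.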
    fn maybe_remove_front_from_queue(&mut self, key: &IndexedDBDescription) -> bool {
        let (is_empty, was_pruned) = {
            let Some(queue) = self.connection_queues.get_mut(key) else {
                debug_assert!(false, "A connection queue should exist.");
                return false;
            };
            let mut pruned = false;
            let front_is_pending = queue.front().map(|record| record.is_pending());
            if let Some(is_pending) = front_is_pending {
                if !is_pending {
                    queue.pop_front().expect("Queue has a non-pending item.");
                    pruned = true;
                }
            }
            (queue.is_empty(), pruned)
        };
        if is_empty {
            self.connection_queues.remove(key);
        }
        was_pruned
    }

    fn abort_pending_upgrade(&mut self, name: String, id: Uuid, origin: ImmutableOrigin) {
        let key = IndexedDBDescription {
            name,
            origin: origin.clone(),
        };
        let old = {
            let Some(queue) = self.connection_queues.get_mut(&key) else {
                return debug_assert!(
                    false,
                    "There should be a connection queue for the aborted upgrade."
                );
            };
            let Some(open_request) = queue.pop_front() else {
                return debug_assert!(false, "There should be an open request to upgrade.");
            };
            if open_request.get_id() != id {
                return debug_assert!(
                    false,
                    "Open request to abort should be at the head of the queue."
                );
            }
            open_request.abort()
        };
        if let Some(old_version) = old {
            let Some(db) = self.databases.get_mut(&key) else {
                return debug_assert!(false, "Db should have been created");
            };
            let res = db.set_version(old_version);
            debug_assert!(res.is_ok(), "Setting a db version should not fail.");
        }

        self.advance_connection_queue(key);
    }

    fn abort_pending_upgrades(
        &mut self,
        pending_upgrades: HashMap<String, HashSet<Uuid>>,
        origin: ImmutableOrigin,
    ) {
        for (name, ids) in pending_upgrades.into_iter() {
            let mut version_to_revert: Option<u64> = None;
            let key = IndexedDBDescription {
                name,
                origin: origin.clone(),
            };
            {
                let is_empty = {
                    let Some(queue) = self.connection_queues.get_mut(&key) else {
                        continue;
                    };
                    queue.retain_mut(|open_request| {
                        if ids.contains(&open_request.get_id()) {
                            let old = open_request.abort();
                            if version_to_revert.is_none() {
                                if let Some(old) = old {
                                    version_to_revert = Some(old);
                                }
                            }
                            false
                        } else {
                            true
                        }
                    });
                    queue.is_empty()
                };
                if is_empty {
                    self.connection_queues.remove(&key);
                }
            }
            if let Some(version) = version_to_revert {
                let Some(db) = self.databases.get_mut(&key) else {
                    return debug_assert!(false, "Db should have been created");
                };
                let res = db.set_version(version);
                debug_assert!(res.is_ok(), "Setting a db version should not fail.");
            }
        }
    }

    fn open_a_database_connection(
        &mut self,
        sender: GenericCallback<BackendResult<OpenDatabaseResult>>,
        origin: ImmutableOrigin,
        db_name: String,
        version: Option<u64>,
        id: Uuid,
    ) {
        let key = IndexedDBDescription {
            name: db_name.clone(),
            origin: origin.clone(),
        };
        let open_request = OpenRequest::Open {
            sender,
            origin,
            db_name,
            version,
            pending_upgrade: None,
            id,
        };
        let should_continue = {
            let queue = self.connection_queues.entry(key.clone()).or_default();

            queue.push_back(open_request);
            queue.len() == 1
        };

        if should_continue {
            self.open_database(key.clone());
            self.maybe_remove_front_from_queue(&key);
        }
    }

    fn get_database(
        &self,
        origin: ImmutableOrigin,
        db_name: String,
    ) -> Option<&IndexedDBEnvironment<SqliteEngine>> {
        let idb_description = IndexedDBDescription {
            origin,
            name: db_name,
        };

        self.databases.get(&idb_description)
    }

    fn get_database_mut(
        &mut self,
        origin: ImmutableOrigin,
        db_name: String,
    ) -> Option<&mut IndexedDBEnvironment<SqliteEngine>> {
        let idb_description = IndexedDBDescription {
            origin,
            name: db_name,
        };

        self.databases.get_mut(&idb_description)
    }

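    /// Starts a version change for the open request at the front of the queue: bumps the
    /// stored version, sends `OpenDatabaseResult::Upgrade` to script and records the
    /// pending upgrade on the request.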
    fn upgrade_database(
        &mut self,
        idb_description: IndexedDBDescription,
        key: IndexedDBDescription,
        new_version: u64,
    ) {
        let Some(queue) = self.connection_queues.get_mut(&key) else {
            return debug_assert!(false, "A connection queue should exist.");
        };
        let Some(open_request) = queue.front_mut() else {
            return debug_assert!(false, "An open request should be in the queue.");
        };
        let OpenRequest::Open {
            sender,
            origin: _,
            db_name: _,
            version: _,
            id: _,
            pending_upgrade,
        } = open_request
        else {
            return;
        };

        let db = self
            .databases
            .get_mut(&idb_description)
            .expect("Db should have been opened.");

        let transaction_id = self.serial_number_counter;
        self.serial_number_counter += 1;

        let old_version = db.version().expect("DB should have a version.");

        db.set_version(new_version)
            .expect("Setting the version should not fail");

        if sender
            .send(Ok(OpenDatabaseResult::Upgrade {
                version: new_version,
                old_version,
                transaction: transaction_id,
            }))
            .is_err()
        {
            error!("Couldn't queue task for indexeddb upgrade event.");
        }

        let _ = pending_upgrade.insert(VersionUpgrade {
            old: old_version,
            new: new_version,
        });
    }

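    /// Services the open request at the front of the queue for `key`, creating the backing
    /// SQLite database if needed and starting an upgrade when a higher version is requested.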
    fn open_database(&mut self, key: IndexedDBDescription) {
        let Some(queue) = self.connection_queues.get_mut(&key) else {
            return debug_assert!(false, "A connection queue should exist.");
        };
        let Some(open_request) = queue.front_mut() else {
            return debug_assert!(false, "An open request should be in the queue.");
        };
        let OpenRequest::Open {
            sender,
            origin,
            db_name,
            version,
            id: _,
            pending_upgrade: _,
        } = open_request
        else {
            return debug_assert!(
                false,
                "A request to open a connection should be in the queue."
            );
        };

        let idb_description = IndexedDBDescription {
            origin: origin.clone(),
            name: db_name.clone(),
        };

        let idb_base_dir = self.idb_base_dir.as_path();
        let requested_version = version;

        let (db_version, version) = match self.databases.entry(idb_description.clone()) {
            Entry::Vacant(e) => {
                let engine = match SqliteEngine::new(
                    idb_base_dir,
                    &idb_description,
                    self.thread_pool.clone(),
                ) {
                    Ok(engine) => engine,
                    Err(err) => {
                        if let Err(e) = sender.send(Err(backend_error_from_sqlite_error(err))) {
                            debug!("Script exit during indexeddb database open {:?}", e);
                        }
                        return;
                    },
                };
                let created_db_path = engine.created_db_path();
                let db = IndexedDBEnvironment::new(engine);
                let db_version = match db.version() {
                    Ok(version) => version,
                    Err(err) => {
                        if let Err(e) = sender.send(Err(backend_error_from_sqlite_error(err))) {
                            debug!("Script exit during indexeddb database open {:?}", e);
                        }

                        return;
                    },
                };

                let version = if created_db_path {
                    requested_version.unwrap_or(1)
                } else {
                    requested_version.unwrap_or(db_version)
                };

                e.insert(db);
                (db_version, version)
            },
            Entry::Occupied(db) => {
                let db_version = match db.get().version() {
                    Ok(version) => version,
                    Err(err) => {
                        if let Err(e) = sender.send(Err(backend_error_from_sqlite_error(err))) {
                            debug!("Script exit during indexeddb database open {:?}", e);
                        }
                        return;
                    },
                };
                (db_version, requested_version.unwrap_or(db_version))
            },
        };

        if version < db_version {
            if sender.send(Ok(OpenDatabaseResult::VersionError)).is_err() {
                debug!("Script exit during indexeddb database open");
            }
            return;
        }

        let connection = Connection {
            version,
            close_pending: false,
        };
        let entry = self.connections.entry(idb_description.clone()).or_default();
        let open_connections = entry.clone();
        entry.insert(connection);

        if db_version < version {
            for _conn in open_connections
                .into_iter()
                .filter(|conn| conn.close_pending)
            {
            }

            self.upgrade_database(idb_description, key, version);
            return;
        }

        if sender
            .send(Ok(OpenDatabaseResult::Connection {
                version: db_version,
                upgraded: false,
            }))
            .is_err()
        {
            error!("Failed to send OpenDatabaseResult::Connection to script.");
        };
    }

    fn start_delete_database(
        &mut self,
        key: IndexedDBDescription,
        id: Uuid,
        sender: GenericCallback<BackendResult<u64>>,
    ) {
        let open_request = OpenRequest::Delete {
            sender,
            _origin: key.origin.clone(),
            _db_name: key.name.clone(),
            processed: false,
            id,
        };

        let should_continue = {
            let queue = self.connection_queues.entry(key.clone()).or_default();

            queue.push_back(open_request);
            queue.len() == 1
        };

        if should_continue {
            self.delete_database(key.clone());
            self.maybe_remove_front_from_queue(&key);
        }
    }

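    /// Services the delete request at the front of the queue for `key`, removing the
    /// database and replying with the version it had (or 0 if it was not open).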
    fn delete_database(&mut self, key: IndexedDBDescription) {
        let Some(queue) = self.connection_queues.get_mut(&key) else {
            return debug_assert!(false, "A connection queue should exist.");
        };
        let Some(open_request) = queue.front_mut() else {
            return debug_assert!(false, "An open request should be in the queue.");
        };
        let OpenRequest::Delete {
            sender,
            _origin: _,
            _db_name: _,
            processed,
            id: _,
        } = open_request
        else {
            return debug_assert!(
                false,
                "A request to delete a database should be in the queue."
            );
        };

        let version = if let Some(db) = self.databases.remove(&key) {
            let res = db.version();
            let Ok(version) = res else {
                if sender
                    .send(BackendResult::Err(BackendError::DbErr(
                        res.unwrap_err().to_string(),
                    )))
                    .is_err()
                {
                    debug!("Script went away during pending database delete.");
                }
                return;
            };

            if let Err(err) = db.delete_database() {
                if sender
                    .send(BackendResult::Err(BackendError::DbErr(err.to_string())))
                    .is_err()
                {
                    debug!("Script went away during pending database delete.");
                }
                return;
            };

            version
        } else {
            0
        };

        if sender.send(BackendResult::Ok(version)).is_err() {
            debug!("Script went away during pending database delete.");
        }

        *processed = true;
    }

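    /// Dispatches a single `SyncOperation` received from script.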
    fn handle_sync_operation(&mut self, operation: SyncOperation) {
        match operation {
            SyncOperation::GetDatabases(sender, origin) => {
                let info_list: Vec<DatabaseInfo> = self
                    .databases
                    .iter()
                    .filter_map(|(description, info)| {
                        if let Ok(version) = info.version() {
                            if version == 0 {
                                None
                            } else if description.origin == origin {
                                Some(DatabaseInfo {
                                    name: description.name.clone(),
                                    version,
                                })
                            } else {
                                None
                            }
                        } else {
                            None
                        }
                    })
                    .collect();

                let result = if info_list.len() == self.databases.len() {
                    Ok(info_list)
                } else {
                    Err(BackendError::DbErr(
                        "Unknown error getting database info.".to_string(),
                    ))
                };

                if sender.send(result).is_err() {
                    debug!("Couldn't send SyncOperation::GetDatabases reply.");
                }
            },
            SyncOperation::CloseDatabase(sender, origin, db_name) => {
                let idb_description = IndexedDBDescription {
                    origin,
                    name: db_name,
                };
                if let Some(_db) = self.databases.remove(&idb_description) {
                    // The removed environment is dropped at the end of this block.
                }
                let _ = sender.send(Ok(()));
            },
            SyncOperation::OpenDatabase(sender, origin, db_name, version, id) => {
                self.open_a_database_connection(sender, origin, db_name, version, id);
            },
            SyncOperation::AbortPendingUpgrades {
                pending_upgrades,
                origin,
            } => {
                self.abort_pending_upgrades(pending_upgrades, origin);
            },
            SyncOperation::AbortPendingUpgrade { name, id, origin } => {
                self.abort_pending_upgrade(name, id, origin);
            },
            SyncOperation::DeleteDatabase(callback, origin, db_name, id) => {
                let idb_description = IndexedDBDescription {
                    origin,
                    name: db_name,
                };
                self.start_delete_database(idb_description, id, callback);
            },
            SyncOperation::HasKeyGenerator(sender, origin, db_name, store_name) => {
                let result = self
                    .get_database(origin, db_name)
                    .map(|db| db.has_key_generator(&store_name));
                let _ = sender.send(result.ok_or(BackendError::DbNotFound));
            },
            SyncOperation::KeyPath(sender, origin, db_name, store_name) => {
                let result = self
                    .get_database(origin, db_name)
                    .map(|db| db.key_path(&store_name));
                let _ = sender.send(result.ok_or(BackendError::DbNotFound));
            },
            SyncOperation::CreateIndex(
                sender,
                origin,
                db_name,
                store_name,
                index_name,
                key_path,
                unique,
                multi_entry,
            ) => {
                if let Some(db) = self.get_database(origin, db_name) {
                    let result =
                        db.create_index(&store_name, index_name, key_path, unique, multi_entry);
                    let _ = sender.send(result.map_err(BackendError::from));
                } else {
                    let _ = sender.send(Err(BackendError::DbNotFound));
                }
            },
            SyncOperation::DeleteIndex(sender, origin, db_name, store_name, index_name) => {
                if let Some(db) = self.get_database(origin, db_name) {
                    let result = db.delete_index(&store_name, index_name);
                    let _ = sender.send(result.map_err(BackendError::from));
                } else {
                    let _ = sender.send(Err(BackendError::DbNotFound));
                }
            },
            SyncOperation::Commit(sender, _origin, _db_name, _txn) => {
                let _ = sender.send(Ok(()));
            },
            SyncOperation::UpgradeVersion(sender, origin, db_name, _txn, version) => {
                if let Some(db) = self.get_database_mut(origin, db_name) {
                    if version > db.version().unwrap_or(0) {
                        let _ = db.set_version(version);
                    }
                    let _ = sender.send(db.version().map_err(backend_error_from_sqlite_error));
                } else {
                    let _ = sender.send(Err(BackendError::DbNotFound));
                }
            },
            SyncOperation::CreateObjectStore(
                sender,
                origin,
                db_name,
                store_name,
                key_paths,
                auto_increment,
            ) => {
                if let Some(db) = self.get_database_mut(origin, db_name) {
                    let result = db.create_object_store(&store_name, key_paths, auto_increment);
                    let _ = sender.send(result.map_err(BackendError::from));
                } else {
                    let _ = sender.send(Err(BackendError::DbNotFound));
                }
            },
            SyncOperation::DeleteObjectStore(sender, origin, db_name, store_name) => {
                if let Some(db) = self.get_database_mut(origin, db_name) {
                    let result = db.delete_object_store(&store_name);
                    let _ = sender.send(result.map_err(BackendError::from));
                } else {
                    let _ = sender.send(Err(BackendError::DbNotFound));
                }
            },
            SyncOperation::StartTransaction(sender, origin, db_name, txn) => {
                if let Some(db) = self.get_database_mut(origin, db_name) {
                    db.start_transaction(txn, Some(sender));
                } else {
                    let _ = sender.send(Err(BackendError::DbNotFound));
                }
            },
            SyncOperation::Version(sender, origin, db_name) => {
                if let Some(db) = self.get_database(origin, db_name) {
                    let _ = sender.send(db.version().map_err(backend_error_from_sqlite_error));
                } else {
                    let _ = sender.send(Err(BackendError::DbNotFound));
                }
            },
            SyncOperation::RegisterNewTxn(sender, _origin, _db_name) => {
                let transaction_id = self.serial_number_counter;
                self.serial_number_counter += 1;
                let _ = sender.send(transaction_id);
            },
            SyncOperation::Exit(_) => {
                unreachable!("We must've already broken out of the event loop.");
            },
        }
    }
}