1mod engines;
6
7use std::borrow::ToOwned;
8use std::collections::hash_map::Entry;
9use std::collections::{HashMap, VecDeque};
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::thread;
13
14use base::threadpool::ThreadPool;
15use ipc_channel::ipc::{self, IpcError, IpcReceiver, IpcSender};
16use log::{debug, warn};
17use rustc_hash::FxHashMap;
18use servo_config::pref;
19use servo_url::origin::ImmutableOrigin;
20use storage_traits::indexeddb::{
21 AsyncOperation, BackendError, BackendResult, CreateObjectResult, DbResult, IndexedDBThreadMsg,
22 IndexedDBTxnMode, KeyPath, SyncOperation,
23};
24use uuid::Uuid;
25
26use crate::indexeddb::engines::{KvsEngine, KvsOperation, KvsTransaction, SqliteEngine};
27
/// Constructor trait for spinning up the IndexedDB manager thread and
/// obtaining an IPC channel to communicate with it.
pub trait IndexedDBThreadFactory {
    /// Creates the IndexedDB thread; database files live under
    /// `config_dir` when provided (see the `IpcSender` impl below).
    fn new(config_dir: Option<PathBuf>) -> Self;
}
31
32impl IndexedDBThreadFactory for IpcSender<IndexedDBThreadMsg> {
33 fn new(config_dir: Option<PathBuf>) -> IpcSender<IndexedDBThreadMsg> {
34 let (chan, port) = ipc::channel().unwrap();
35
36 let mut idb_base_dir = PathBuf::new();
37 if let Some(p) = config_dir {
38 idb_base_dir.push(p);
39 }
40 idb_base_dir.push("IndexedDB");
41
42 thread::Builder::new()
43 .name("IndexedDBManager".to_owned())
44 .spawn(move || {
45 IndexedDBManager::new(port, idb_base_dir).start();
46 })
47 .expect("Thread spawning failed");
48
49 chan
50 }
51}
52
/// Uniquely identifies one IndexedDB database: the (origin, name) pair.
/// Used as the key of `IndexedDBManager::databases`.
#[derive(Clone, Eq, Hash, PartialEq)]
pub struct IndexedDBDescription {
    /// The origin that owns the database.
    pub origin: ImmutableOrigin,
    /// The database name, as supplied by script.
    pub name: String,
}
58
59impl IndexedDBDescription {
60 const NAMESPACE_SERVO_IDB: &uuid::Uuid = &Uuid::from_bytes([
62 0x37, 0x9e, 0x56, 0xb0, 0x1a, 0x76, 0x44, 0xc2, 0xa0, 0xdb, 0xe2, 0x18, 0xc5, 0xc8, 0xa3,
63 0x5d,
64 ]);
65 pub(super) fn as_path(&self) -> PathBuf {
68 let mut path = PathBuf::new();
69
70 let origin_uuid = Uuid::new_v5(
72 Self::NAMESPACE_SERVO_IDB,
73 self.origin.ascii_serialization().as_bytes(),
74 );
75 let db_name_uuid = Uuid::new_v5(Self::NAMESPACE_SERVO_IDB, self.name.as_bytes());
76 path.push(origin_uuid.to_string());
77 path.push(db_name_uuid.to_string());
78
79 path
80 }
81}
82
/// Per-database state: the storage engine plus transactions that have been
/// queued but not yet handed to the engine.
struct IndexedDBEnvironment<E: KvsEngine> {
    // Backend storage engine for this database.
    engine: E,
    // Queued operations keyed by transaction serial number, consumed by
    // `start_transaction`.
    transactions: FxHashMap<u64, KvsTransaction>,
    // Monotonic counter used to hand out transaction serial numbers
    // (see `RegisterNewTxn` handling).
    serial_number_counter: u64,
}
88
89impl<E: KvsEngine> IndexedDBEnvironment<E> {
90 fn new(engine: E) -> IndexedDBEnvironment<E> {
91 IndexedDBEnvironment {
92 engine,
93 transactions: FxHashMap::default(),
94 serial_number_counter: 0,
95 }
96 }
97
98 fn queue_operation(
99 &mut self,
100 store_name: &str,
101 serial_number: u64,
102 mode: IndexedDBTxnMode,
103 operation: AsyncOperation,
104 ) {
105 self.transactions
106 .entry(serial_number)
107 .or_insert_with(|| KvsTransaction {
108 requests: VecDeque::new(),
109 mode,
110 })
111 .requests
112 .push_back(KvsOperation {
113 operation,
114 store_name: String::from(store_name),
115 });
116 }
117
118 fn start_transaction(&mut self, txn: u64, sender: Option<IpcSender<BackendResult<()>>>) {
120 if let Some(txn) = self.transactions.remove(&txn) {
123 let _ = self.engine.process_transaction(txn).blocking_recv();
124 }
125
126 if let Some(sender) = sender {
129 if sender.send(Ok(())).is_err() {
130 warn!("IDBTransaction starter dropped its channel");
131 }
132 }
133 }
134
135 fn has_key_generator(&self, store_name: &str) -> bool {
136 self.engine.has_key_generator(store_name)
137 }
138
139 fn key_path(&self, store_name: &str) -> Option<KeyPath> {
140 self.engine.key_path(store_name)
141 }
142
143 fn create_index(
144 &self,
145 store_name: &str,
146 index_name: String,
147 key_path: KeyPath,
148 unique: bool,
149 multi_entry: bool,
150 ) -> DbResult<CreateObjectResult> {
151 self.engine
152 .create_index(store_name, index_name, key_path, unique, multi_entry)
153 .map_err(|err| format!("{err:?}"))
154 }
155
156 fn delete_index(&self, store_name: &str, index_name: String) -> DbResult<()> {
157 self.engine
158 .delete_index(store_name, index_name)
159 .map_err(|err| format!("{err:?}"))
160 }
161
162 fn create_object_store(
163 &mut self,
164 store_name: &str,
165 key_path: Option<KeyPath>,
166 auto_increment: bool,
167 ) -> DbResult<CreateObjectResult> {
168 self.engine
169 .create_store(store_name, key_path, auto_increment)
170 .map_err(|err| format!("{err:?}"))
171 }
172
173 fn delete_object_store(&mut self, store_name: &str) -> DbResult<()> {
174 let result = self.engine.delete_store(store_name);
175 result.map_err(|err| format!("{err:?}"))
176 }
177
178 fn delete_database(self, sender: IpcSender<BackendResult<()>>) {
179 let result = self.engine.delete_database();
180 let _ = sender.send(
181 result
182 .map_err(|err| format!("{err:?}"))
183 .map_err(BackendError::from),
184 );
185 }
186
187 fn version(&self) -> DbResult<u64> {
188 self.engine.version().map_err(|err| format!("{err:?}"))
189 }
190
191 fn set_version(&mut self, version: u64) -> DbResult<()> {
192 self.engine
193 .set_version(version)
194 .map_err(|err| format!("{err:?}"))
195 }
196}
197
/// State owned by the IndexedDB manager thread.
struct IndexedDBManager {
    // Incoming message stream from clients.
    port: IpcReceiver<IndexedDBThreadMsg>,
    // Root directory under which each database's files are stored.
    idb_base_dir: PathBuf,
    // Currently open databases, keyed by (origin, name).
    databases: HashMap<IndexedDBDescription, IndexedDBEnvironment<SqliteEngine>>,
    // Worker pool handed to each `SqliteEngine` on creation.
    thread_pool: Arc<ThreadPool>,
}
204
205impl IndexedDBManager {
206 fn new(port: IpcReceiver<IndexedDBThreadMsg>, idb_base_dir: PathBuf) -> IndexedDBManager {
207 debug!("New indexedDBManager");
208
209 let thread_count = thread::available_parallelism()
213 .map(|i| i.get())
214 .unwrap_or(pref!(threadpools_fallback_worker_num) as usize)
215 .min(pref!(threadpools_indexeddb_workers_max).max(1) as usize);
216
217 IndexedDBManager {
218 port,
219 idb_base_dir,
220 databases: HashMap::new(),
221 thread_pool: Arc::new(ThreadPool::new(thread_count, "IndexedDB".to_string())),
222 }
223 }
224}
225
impl IndexedDBManager {
    /// Event loop: drains the IPC port until the channel disconnects or an
    /// explicit `Exit` message arrives.
    fn start(&mut self) {
        loop {
            let message = match self.port.recv() {
                Ok(msg) => msg,
                Err(e) => match e {
                    // Peer gone: shut the thread down.
                    IpcError::Disconnected => {
                        break;
                    },
                    // Other IPC errors are logged and the loop continues.
                    other => {
                        warn!("Error in IndexedDB thread: {:?}", other);
                        continue;
                    },
                },
            };
            match message {
                // Exit is handled here (not in `handle_sync_operation`) so
                // the loop can break after acknowledging the sender.
                IndexedDBThreadMsg::Sync(SyncOperation::Exit(sender)) => {
                    let _ = sender.send(());
                    break;
                },
                IndexedDBThreadMsg::Sync(operation) => {
                    self.handle_sync_operation(operation);
                },
                IndexedDBThreadMsg::Async(origin, db_name, store_name, txn, mode, operation) => {
                    // Queue the operation on transaction `txn`, then process
                    // that transaction immediately. Messages for databases
                    // that are not open are silently dropped here.
                    if let Some(db) = self.get_database_mut(origin, db_name) {
                        db.queue_operation(&store_name, txn, mode, operation);
                        db.start_transaction(txn, None);
                    }
                },
            }
        }
    }

    /// Looks up an open database by (origin, name).
    fn get_database(
        &self,
        origin: ImmutableOrigin,
        db_name: String,
    ) -> Option<&IndexedDBEnvironment<SqliteEngine>> {
        let idb_description = IndexedDBDescription {
            origin,
            name: db_name,
        };

        self.databases.get(&idb_description)
    }

    /// Mutable variant of [`Self::get_database`].
    fn get_database_mut(
        &mut self,
        origin: ImmutableOrigin,
        db_name: String,
    ) -> Option<&mut IndexedDBEnvironment<SqliteEngine>> {
        let idb_description = IndexedDBDescription {
            origin,
            name: db_name,
        };

        self.databases.get_mut(&idb_description)
    }

    /// Dispatches a synchronous operation. Each arm replies on its sender;
    /// the known exception is `RegisterNewTxn` for a missing database (see
    /// the review note on that arm). `Exit` is unreachable because `start`
    /// intercepts it before calling this method.
    fn handle_sync_operation(&mut self, operation: SyncOperation) {
        match operation {
            SyncOperation::CloseDatabase(sender, origin, db_name) => {
                let idb_description = IndexedDBDescription {
                    origin,
                    name: db_name,
                };
                // Removing the environment drops it (and its engine).
                // Success is reported whether or not the database was open.
                if let Some(_db) = self.databases.remove(&idb_description) {
                }
                let _ = sender.send(Ok(()));
            },
            SyncOperation::OpenDatabase(sender, origin, db_name, version) => {
                let idb_description = IndexedDBDescription {
                    origin,
                    name: db_name,
                };

                let idb_base_dir = self.idb_base_dir.as_path();

                // Requested version defaults to 0 when the caller gave none.
                let version = version.unwrap_or(0);

                match self.databases.entry(idb_description.clone()) {
                    // First open: create the engine, reply with its stored
                    // version (or the requested one if unreadable), then
                    // cache the environment.
                    Entry::Vacant(e) => {
                        let db = IndexedDBEnvironment::new(
                            SqliteEngine::new(
                                idb_base_dir,
                                &idb_description,
                                self.thread_pool.clone(),
                            )
                            .expect("Failed to create sqlite engine"),
                        );
                        let _ = sender.send(db.version().unwrap_or(version));
                        e.insert(db);
                    },
                    // Already open: just report the current version.
                    Entry::Occupied(db) => {
                        let _ = sender.send(db.get().version().unwrap_or(version));
                    },
                }
            },
            SyncOperation::DeleteDatabase(sender, origin, db_name) => {
                let idb_description = IndexedDBDescription {
                    origin,
                    name: db_name,
                };
                // Deleting a database that is not open is treated as
                // success; otherwise the environment reports the outcome.
                if let Some(db) = self.databases.remove(&idb_description) {
                    db.delete_database(sender);
                } else {
                    let _ = sender.send(Ok(()));
                }
            },
            SyncOperation::HasKeyGenerator(sender, origin, db_name, store_name) => {
                let result = self
                    .get_database(origin, db_name)
                    .map(|db| db.has_key_generator(&store_name));
                let _ = sender.send(result.ok_or(BackendError::DbNotFound));
            },
            SyncOperation::KeyPath(sender, origin, db_name, store_name) => {
                let result = self
                    .get_database(origin, db_name)
                    .map(|db| db.key_path(&store_name));
                let _ = sender.send(result.ok_or(BackendError::DbNotFound));
            },
            SyncOperation::CreateIndex(
                sender,
                origin,
                db_name,
                store_name,
                index_name,
                key_path,
                unique,
                multi_entry,
            ) => {
                if let Some(db) = self.get_database(origin, db_name) {
                    let result =
                        db.create_index(&store_name, index_name, key_path, unique, multi_entry);
                    let _ = sender.send(result.map_err(BackendError::from));
                } else {
                    let _ = sender.send(Err(BackendError::DbNotFound));
                }
            },
            SyncOperation::DeleteIndex(sender, origin, db_name, store_name, index_name) => {
                if let Some(db) = self.get_database(origin, db_name) {
                    let result = db.delete_index(&store_name, index_name);
                    let _ = sender.send(result.map_err(BackendError::from));
                } else {
                    let _ = sender.send(Err(BackendError::DbNotFound));
                }
            },
            // Commit is acknowledged unconditionally; transactions are
            // processed eagerly in `start_transaction`, so there is nothing
            // left to do here.
            SyncOperation::Commit(sender, _origin, _db_name, _txn) => {
                let _ = sender.send(Ok(()));
            },
            SyncOperation::UpgradeVersion(sender, origin, db_name, _txn, version) => {
                if let Some(db) = self.get_database_mut(origin, db_name) {
                    // Only upgrade forward; downgrade requests are ignored
                    // and the current version is reported back.
                    if version > db.version().unwrap_or(0) {
                        let _ = db.set_version(version);
                    }
                    let _ = sender.send(db.version().map_err(BackendError::from));
                } else {
                    let _ = sender.send(Err(BackendError::DbNotFound));
                }
            },
            SyncOperation::CreateObjectStore(
                sender,
                origin,
                db_name,
                store_name,
                key_paths,
                auto_increment,
            ) => {
                if let Some(db) = self.get_database_mut(origin, db_name) {
                    let result = db.create_object_store(&store_name, key_paths, auto_increment);
                    let _ = sender.send(result.map_err(BackendError::from));
                } else {
                    let _ = sender.send(Err(BackendError::DbNotFound));
                }
            },
            SyncOperation::DeleteObjectStore(sender, origin, db_name, store_name) => {
                if let Some(db) = self.get_database_mut(origin, db_name) {
                    let result = db.delete_object_store(&store_name);
                    let _ = sender.send(result.map_err(BackendError::from));
                } else {
                    let _ = sender.send(Err(BackendError::DbNotFound));
                }
            },
            SyncOperation::StartTransaction(sender, origin, db_name, txn) => {
                if let Some(db) = self.get_database_mut(origin, db_name) {
                    db.start_transaction(txn, Some(sender));
                } else {
                    let _ = sender.send(Err(BackendError::DbNotFound));
                }
            },
            SyncOperation::Version(sender, origin, db_name) => {
                if let Some(db) = self.get_database(origin, db_name) {
                    let _ = sender.send(db.version().map_err(BackendError::from));
                } else {
                    let _ = sender.send(Err(BackendError::DbNotFound));
                }
            },
            SyncOperation::RegisterNewTxn(sender, origin, db_name) => {
                // NOTE(review): when the database is not open, no reply is
                // sent on `sender`, so the requester may block forever —
                // verify callers tolerate a dropped channel here.
                if let Some(db) = self.get_database_mut(origin, db_name) {
                    db.serial_number_counter += 1;
                    let _ = sender.send(db.serial_number_counter);
                }
            },
            SyncOperation::Exit(_) => {
                unreachable!("We must've already broken out of event loop.");
            },
        }
    }
}