1use std::path::{Path, PathBuf};
5use std::sync::Arc;
6
7use ipc_channel::ipc::IpcSender;
8use log::{error, info};
9use net_traits::indexeddb_thread::{
10 AsyncOperation, AsyncReadOnlyOperation, AsyncReadWriteOperation, BackendError, BackendResult,
11 CreateObjectResult, IndexedDBKeyRange, IndexedDBKeyType, IndexedDBTxnMode, KeyPath,
12 PutItemResult,
13};
14use rusqlite::{Connection, Error, OptionalExtension, params};
15use sea_query::{Condition, Expr, ExprTrait, IntoCondition, SqliteQueryBuilder};
16use sea_query_rusqlite::RusqliteBinder;
17use serde::Serialize;
18use tokio::sync::oneshot;
19
20use crate::indexeddb::engines::{KvsEngine, KvsTransaction};
21use crate::indexeddb::idb_thread::IndexedDBDescription;
22use crate::resource_thread::CoreResourceThreadPool;
23
24mod create;
25mod database_model;
26mod object_data_model;
27mod object_store_index_model;
28mod object_store_model;
29
/// Pragmas executed once, right before the tables of a fresh database are created.
///
/// They are only run when the `database` table does not exist yet, and their
/// results are ignored by the caller.
const DB_INIT_PRAGMAS: [&'static str; 2] = [
    "PRAGMA journal_mode = WAL;",
    "PRAGMA encoding = 'UTF-16';",
];
32
/// Tuning pragmas executed on the engine's own connection every time the engine
/// is constructed; failures are ignored by the caller.
///
/// The `-- ...` suffixes are SQL comments that are part of the statement text
/// handed to SQLite.
const DB_PRAGMAS: [&'static str; 4] = [
    "PRAGMA synchronous = NORMAL;",
    "PRAGMA journal_size_limit = 67108864 -- 64 megabytes;",
    "PRAGMA mmap_size = 67108864 -- 64 megabytes;",
    "PRAGMA cache_size = 2000;",
];
40
41fn range_to_query(range: IndexedDBKeyRange) -> Condition {
42 if let Some(singleton) = range.as_singleton() {
44 let encoded = bincode::serialize(singleton).unwrap();
45 return Expr::column(object_data_model::Column::Key)
46 .eq(encoded)
47 .into_condition();
48 }
49 let mut parts = vec![];
50 if let Some(upper) = range.upper.as_ref() {
51 let upper_bytes = bincode::serialize(upper).unwrap();
52 let query = if range.upper_open {
53 Expr::column(object_data_model::Column::Key).lt(upper_bytes)
54 } else {
55 Expr::column(object_data_model::Column::Key).lte(upper_bytes)
56 };
57 parts.push(query);
58 }
59 if let Some(lower) = range.lower.as_ref() {
60 let lower_bytes = bincode::serialize(lower).unwrap();
61 let query = if range.upper_open {
62 Expr::column(object_data_model::Column::Key).gt(lower_bytes)
63 } else {
64 Expr::column(object_data_model::Column::Key).gte(lower_bytes)
65 };
66 parts.push(query);
67 }
68 let mut condition = Condition::all();
69 for part in parts {
70 condition = condition.add(part);
71 }
72 condition
73}
74
75pub struct SqliteEngine {
76 db_path: PathBuf,
77 connection: Connection,
78 read_pool: Arc<CoreResourceThreadPool>,
79 write_pool: Arc<CoreResourceThreadPool>,
80}
81
82impl SqliteEngine {
83 pub fn new(
85 base_dir: &Path,
86 db_info: &IndexedDBDescription,
87 pool: Arc<CoreResourceThreadPool>,
88 ) -> Result<Self, Error> {
89 let mut db_path = PathBuf::new();
90 db_path.push(base_dir);
91 db_path.push(db_info.as_path());
92 let db_parent = db_path.clone();
93 db_path.push("db.sqlite");
94
95 if !db_path.exists() {
96 std::fs::create_dir_all(db_parent).unwrap();
97 std::fs::File::create(&db_path).unwrap();
98 }
99 let connection = Self::init_db(&db_path, db_info)?;
100
101 for stmt in DB_PRAGMAS {
102 let _ = connection.execute(stmt, ());
104 }
105
106 Ok(Self {
107 connection,
108 db_path,
109 read_pool: pool.clone(),
110 write_pool: pool,
111 })
112 }
113
114 fn init_db(path: &Path, db_info: &IndexedDBDescription) -> Result<Connection, Error> {
115 let connection = Connection::open(path)?;
116 if connection.table_exists(None, "database")? {
117 return Ok(connection);
119 }
120 info!("Initializing indexeddb database at {:?}", path);
121 for stmt in DB_INIT_PRAGMAS {
122 let _ = connection.execute(stmt, ());
124 }
125 create::create_tables(&connection)?;
126 connection.execute(
129 "INSERT INTO database (name, origin, version) VALUES (?, ?, ?)",
130 params![
131 db_info.name.to_owned(),
132 db_info.origin.to_owned().ascii_serialization(),
133 i64::from_ne_bytes(0_u64.to_ne_bytes())
134 ],
135 )?;
136 Ok(connection)
137 }
138
139 fn get(
140 connection: &Connection,
141 store: object_store_model::Model,
142 key_range: IndexedDBKeyRange,
143 ) -> Result<Option<object_data_model::Model>, Error> {
144 let query = range_to_query(key_range);
145 let (sql, values) = sea_query::Query::select()
146 .from(object_data_model::Column::Table)
147 .columns(vec![
148 object_data_model::Column::ObjectStoreId,
149 object_data_model::Column::Key,
150 object_data_model::Column::Data,
151 ])
152 .and_where(query.and(Expr::col(object_data_model::Column::ObjectStoreId).is(store.id)))
153 .limit(1)
154 .build_rusqlite(SqliteQueryBuilder);
155 connection
156 .prepare(&sql)?
157 .query_one(&*values.as_params(), |row| {
158 object_data_model::Model::try_from(row)
159 })
160 .optional()
161 }
162
163 fn get_key(
164 connection: &Connection,
165 store: object_store_model::Model,
166 key_range: IndexedDBKeyRange,
167 ) -> Result<Option<Vec<u8>>, Error> {
168 Self::get(connection, store, key_range).map(|opt| opt.map(|model| model.key))
169 }
170
171 fn get_item(
172 connection: &Connection,
173 store: object_store_model::Model,
174 key_range: IndexedDBKeyRange,
175 ) -> Result<Option<Vec<u8>>, Error> {
176 Self::get(connection, store, key_range).map(|opt| opt.map(|model| model.data))
177 }
178
179 fn put_item(
180 connection: &Connection,
181 store: object_store_model::Model,
182 serialized_key: Vec<u8>,
183 value: Vec<u8>,
184 should_overwrite: bool,
185 ) -> Result<PutItemResult, Error> {
186 let existing_item = connection
187 .prepare("SELECT * FROM object_data WHERE key = ? AND object_store_id = ?")
188 .and_then(|mut stmt| {
189 stmt.query_row(params![serialized_key, store.id], |row| {
190 object_data_model::Model::try_from(row)
191 })
192 .optional()
193 })?;
194 if should_overwrite || existing_item.is_none() {
195 connection.execute(
196 "INSERT INTO object_data (object_store_id, key, data) VALUES (?, ?, ?)",
197 params![store.id, serialized_key, value],
198 )?;
199 Ok(PutItemResult::Success)
200 } else {
201 Ok(PutItemResult::CannotOverwrite)
202 }
203 }
204
205 fn delete_item(
206 connection: &Connection,
207 store: object_store_model::Model,
208 serialized_key: Vec<u8>,
209 ) -> Result<(), Error> {
210 connection.execute(
211 "DELETE FROM object_data WHERE key = ? AND object_store_id = ?",
212 params![serialized_key, store.id],
213 )?;
214 Ok(())
215 }
216
217 fn clear(connection: &Connection, store: object_store_model::Model) -> Result<(), Error> {
218 connection.execute(
219 "DELETE FROM object_data WHERE object_store_id = ?",
220 params![store.id],
221 )?;
222 Ok(())
223 }
224
225 fn count(
226 connection: &Connection,
227 store: object_store_model::Model,
228 key_range: IndexedDBKeyRange,
229 ) -> Result<usize, Error> {
230 let query = range_to_query(key_range);
231 let (sql, values) = sea_query::Query::select()
232 .expr(Expr::col(object_data_model::Column::Key).count())
233 .from(object_data_model::Column::Table)
234 .and_where(query.and(Expr::col(object_data_model::Column::ObjectStoreId).is(store.id)))
235 .build_rusqlite(SqliteQueryBuilder);
236 connection
237 .prepare(&sql)?
238 .query_row(&*values.as_params(), |row| row.get(0))
239 .map(|count: i64| count as usize)
240 }
241
242 fn generate_key(
243 connection: &Connection,
244 store: &object_store_model::Model,
245 ) -> Result<IndexedDBKeyType, Error> {
246 if store.auto_increment == 0 {
247 unreachable!("Should be caught in the script thread");
248 }
249 let new_key = store.auto_increment + 1;
251 connection.execute(
252 "UPDATE object_store SET auto_increment = ? WHERE id = ?",
253 params![new_key, store.id],
254 )?;
255 Ok(IndexedDBKeyType::Number(new_key as f64))
256 }
257}
258
259impl KvsEngine for SqliteEngine {
260 type Error = Error;
261
262 fn create_store(
263 &self,
264 store_name: &str,
265 key_path: Option<KeyPath>,
266 auto_increment: bool,
267 ) -> Result<CreateObjectResult, Self::Error> {
268 let mut stmt = self
269 .connection
270 .prepare("SELECT * FROM object_store WHERE name = ?")?;
271 if stmt.exists(params![store_name.to_string()])? {
272 return Ok(CreateObjectResult::AlreadyExists);
274 }
275 self.connection.execute(
276 "INSERT INTO object_store (name, key_path, auto_increment) VALUES (?, ?, ?)",
277 params![
278 store_name.to_string(),
279 key_path.map(|v| bincode::serialize(&v).unwrap()),
280 auto_increment as i32
281 ],
282 )?;
283
284 Ok(CreateObjectResult::Created)
285 }
286
287 fn delete_store(&self, store_name: &str) -> Result<(), Self::Error> {
288 let result = self.connection.execute(
289 "DELETE FROM object_store WHERE name = ?",
290 params![store_name.to_string()],
291 )?;
292 if result == 0 {
293 Err(Error::QueryReturnedNoRows)
294 } else if result > 1 {
295 Err(Error::QueryReturnedMoreThanOneRow)
296 } else {
297 Ok(())
298 }
299 }
300
301 fn close_store(&self, _store_name: &str) -> Result<(), Self::Error> {
302 Ok(())
304 }
305
306 fn delete_database(self) -> Result<(), Self::Error> {
307 let _ = self.connection.close();
309 if self.db_path.exists() {
310 if let Err(e) = std::fs::remove_dir_all(self.db_path.parent().unwrap()) {
311 error!("Failed to delete database: {:?}", e);
312 }
313 }
314 Ok(())
315 }
316
317 fn process_transaction(
318 &self,
319 transaction: KvsTransaction,
320 ) -> oneshot::Receiver<Option<Vec<u8>>> {
321 let (tx, rx) = oneshot::channel();
322
323 let spawning_pool = if transaction.mode == IndexedDBTxnMode::Readonly {
324 self.read_pool.clone()
325 } else {
326 self.write_pool.clone()
327 };
328 let path = self.db_path.clone();
329 spawning_pool.spawn(move || {
330 let connection = Connection::open(path).unwrap();
331 for request in transaction.requests {
332 let object_store = connection
333 .prepare("SELECT * FROM object_store WHERE name = ?")
334 .and_then(|mut stmt| {
335 stmt.query_row(params![request.store_name.to_string()], |row| {
336 object_store_model::Model::try_from(row)
337 })
338 .optional()
339 });
340 fn process_object_store<T>(
341 object_store: Result<Option<object_store_model::Model>, Error>,
342 sender: &IpcSender<BackendResult<T>>,
343 ) -> Result<object_store_model::Model, ()>
344 where
345 T: Serialize,
346 {
347 match object_store {
348 Ok(Some(store)) => Ok(store),
349 Ok(None) => {
350 let _ = sender.send(Err(BackendError::StoreNotFound));
351 Err(())
352 },
353 Err(e) => {
354 let _ = sender.send(Err(BackendError::DbErr(format!("{:?}", e))));
355 Err(())
356 },
357 }
358 }
359
360 match request.operation {
361 AsyncOperation::ReadWrite(AsyncReadWriteOperation::PutItem {
362 sender,
363 key,
364 value,
365 should_overwrite,
366 }) => {
367 let Ok(object_store) = process_object_store(object_store, &sender) else {
368 continue;
369 };
370 let key = match key
371 .map(Ok)
372 .unwrap_or_else(|| Self::generate_key(&connection, &object_store))
373 {
374 Ok(key) => key,
375 Err(e) => {
376 let _ = sender.send(Err(BackendError::DbErr(format!("{:?}", e))));
377 continue;
378 },
379 };
380 let serialized_key: Vec<u8> = bincode::serialize(&key).unwrap();
381 let _ = sender.send(
382 Self::put_item(
383 &connection,
384 object_store,
385 serialized_key,
386 value,
387 should_overwrite,
388 )
389 .map_err(|e| BackendError::DbErr(format!("{:?}", e))),
390 );
391 },
392 AsyncOperation::ReadOnly(AsyncReadOnlyOperation::GetItem {
393 sender,
394 key_range,
395 }) => {
396 let Ok(object_store) = process_object_store(object_store, &sender) else {
397 continue;
398 };
399 let _ = sender.send(
400 Self::get_item(&connection, object_store, key_range)
401 .map_err(|e| BackendError::DbErr(format!("{:?}", e))),
402 );
403 },
404 AsyncOperation::ReadWrite(AsyncReadWriteOperation::RemoveItem {
405 sender,
406 key,
407 }) => {
408 let Ok(object_store) = process_object_store(object_store, &sender) else {
409 continue;
410 };
411 let serialized_key: Vec<u8> = bincode::serialize(&key).unwrap();
412 let _ = sender.send(
413 Self::delete_item(&connection, object_store, serialized_key)
414 .map_err(|e| BackendError::DbErr(format!("{:?}", e))),
415 );
416 },
417 AsyncOperation::ReadOnly(AsyncReadOnlyOperation::Count {
418 sender,
419 key_range,
420 }) => {
421 let Ok(object_store) = process_object_store(object_store, &sender) else {
422 continue;
423 };
424 let _ = sender.send(
425 Self::count(&connection, object_store, key_range)
426 .map(|r| r as u64)
427 .map_err(|e| BackendError::DbErr(format!("{:?}", e))),
428 );
429 },
430 AsyncOperation::ReadWrite(AsyncReadWriteOperation::Clear(sender)) => {
431 let Ok(object_store) = process_object_store(object_store, &sender) else {
432 continue;
433 };
434 let _ = sender.send(
435 Self::clear(&connection, object_store)
436 .map_err(|e| BackendError::DbErr(format!("{:?}", e))),
437 );
438 },
439 AsyncOperation::ReadOnly(AsyncReadOnlyOperation::GetKey {
440 sender,
441 key_range,
442 }) => {
443 let Ok(object_store) = process_object_store(object_store, &sender) else {
444 continue;
445 };
446 let _ = sender.send(
447 Self::get_key(&connection, object_store, key_range)
448 .map(|key| key.map(|k| bincode::deserialize(&k).unwrap()))
449 .map_err(|e| BackendError::DbErr(format!("{:?}", e))),
450 );
451 },
452 }
453 }
454 let _ = tx.send(None);
455 });
456 rx
457 }
458
459 fn has_key_generator(&self, store_name: &str) -> bool {
461 self.connection
462 .prepare("SELECT * FROM object_store WHERE name = ?")
463 .and_then(|mut stmt| {
464 stmt.query_row(params![store_name.to_string()], |r| {
465 let object_store = object_store_model::Model::try_from(r).unwrap();
466 Ok(object_store.auto_increment)
467 })
468 })
469 .optional()
470 .unwrap()
471 .unwrap_or_default() !=
473 0
474 }
475
476 fn key_path(&self, store_name: &str) -> Option<KeyPath> {
477 self.connection
478 .prepare("SELECT * FROM object_store WHERE name = ?")
479 .and_then(|mut stmt| {
480 stmt.query_row(params![store_name.to_string()], |r| {
481 let object_store = object_store_model::Model::try_from(r).unwrap();
482 Ok(object_store
483 .key_path
484 .map(|key_path| bincode::deserialize(&key_path).unwrap()))
485 })
486 })
487 .optional()
488 .unwrap()
489 .unwrap_or_default()
491 }
492
493 fn create_index(
494 &self,
495 store_name: &str,
496 index_name: String,
497 key_path: KeyPath,
498 unique: bool,
499 multi_entry: bool,
500 ) -> Result<CreateObjectResult, Self::Error> {
501 let object_store = self.connection.query_row(
502 "SELECT * FROM object_store WHERE name = ?",
503 params![store_name.to_string()],
504 |row| object_store_model::Model::try_from(row),
505 )?;
506
507 let index_exists: bool = self.connection.query_row(
508 "SELECT EXISTS(SELECT * FROM object_store_index WHERE name = ? AND object_store_id = ?)",
509 params![index_name.to_string(), object_store.id],
510 |row| row.get(0),
511 )?;
512 if index_exists {
513 return Ok(CreateObjectResult::AlreadyExists);
514 }
515
516 self.connection.execute(
517 "INSERT INTO object_store_index (object_store_id, name, key_path, unique_index, multi_entry_index)\
518 VALUES (?, ?, ?, ?, ?)",
519 params![
520 object_store.id,
521 index_name.to_string(),
522 bincode::serialize(&key_path).unwrap(),
523 unique,
524 multi_entry,
525 ],
526 )?;
527 Ok(CreateObjectResult::Created)
528 }
529
530 fn delete_index(&self, store_name: &str, index_name: String) -> Result<(), Self::Error> {
531 let object_store = self.connection.query_row(
532 "SELECT * FROM object_store WHERE name = ?",
533 params![store_name.to_string()],
534 |r| Ok(object_store_model::Model::try_from(r).unwrap()),
535 )?;
536
537 let _ = self.connection.execute(
539 "DELETE FROM object_store_index WHERE name = ? AND object_store_id = ?",
540 params![index_name.to_string(), object_store.id],
541 )?;
542 Ok(())
543 }
544
545 fn version(&self) -> Result<u64, Self::Error> {
546 let version: i64 =
547 self.connection
548 .query_row("SELECT version FROM database LIMIT 1", [], |row| row.get(0))?;
549 Ok(u64::from_ne_bytes(version.to_ne_bytes()))
550 }
551
552 fn set_version(&self, version: u64) -> Result<(), Self::Error> {
553 let rows_affected = self.connection.execute(
554 "UPDATE database SET version = ?",
555 params![i64::from_ne_bytes(version.to_ne_bytes())],
556 )?;
557 if rows_affected == 0 {
558 return Err(Error::QueryReturnedNoRows);
559 }
560 Ok(())
561 }
562}