net/indexeddb/engines/
sqlite.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6
7use ipc_channel::ipc::IpcSender;
8use log::{error, info};
9use net_traits::indexeddb_thread::{
10    AsyncOperation, AsyncReadOnlyOperation, AsyncReadWriteOperation, BackendError, BackendResult,
11    CreateObjectResult, IndexedDBKeyRange, IndexedDBKeyType, IndexedDBTxnMode, KeyPath,
12    PutItemResult,
13};
14use rusqlite::{Connection, Error, OptionalExtension, params};
15use sea_query::{Condition, Expr, ExprTrait, IntoCondition, SqliteQueryBuilder};
16use sea_query_rusqlite::RusqliteBinder;
17use serde::Serialize;
18use tokio::sync::oneshot;
19
20use crate::indexeddb::engines::{KvsEngine, KvsTransaction};
21use crate::indexeddb::idb_thread::IndexedDBDescription;
22use crate::resource_thread::CoreResourceThreadPool;
23
24mod create;
25mod database_model;
26mod object_data_model;
27mod object_store_index_model;
28mod object_store_model;
29
30// These pragmas need to be set once
31const DB_INIT_PRAGMAS: [&str; 2] = ["PRAGMA journal_mode = WAL;", "PRAGMA encoding = 'UTF-16';"];
32
33// These pragmas need to be run once per connection.
34const DB_PRAGMAS: [&str; 4] = [
35    "PRAGMA synchronous = NORMAL;",
36    "PRAGMA journal_size_limit = 67108864 -- 64 megabytes;",
37    "PRAGMA mmap_size = 67108864 -- 64 megabytes;",
38    "PRAGMA cache_size = 2000;",
39];
40
41fn range_to_query(range: IndexedDBKeyRange) -> Condition {
42    // Special case for optimization
43    if let Some(singleton) = range.as_singleton() {
44        let encoded = bincode::serialize(singleton).unwrap();
45        return Expr::column(object_data_model::Column::Key)
46            .eq(encoded)
47            .into_condition();
48    }
49    let mut parts = vec![];
50    if let Some(upper) = range.upper.as_ref() {
51        let upper_bytes = bincode::serialize(upper).unwrap();
52        let query = if range.upper_open {
53            Expr::column(object_data_model::Column::Key).lt(upper_bytes)
54        } else {
55            Expr::column(object_data_model::Column::Key).lte(upper_bytes)
56        };
57        parts.push(query);
58    }
59    if let Some(lower) = range.lower.as_ref() {
60        let lower_bytes = bincode::serialize(lower).unwrap();
61        let query = if range.upper_open {
62            Expr::column(object_data_model::Column::Key).gt(lower_bytes)
63        } else {
64            Expr::column(object_data_model::Column::Key).gte(lower_bytes)
65        };
66        parts.push(query);
67    }
68    let mut condition = Condition::all();
69    for part in parts {
70        condition = condition.add(part);
71    }
72    condition
73}
74
75pub struct SqliteEngine {
76    db_path: PathBuf,
77    connection: Connection,
78    read_pool: Arc<CoreResourceThreadPool>,
79    write_pool: Arc<CoreResourceThreadPool>,
80}
81
82impl SqliteEngine {
83    // TODO: intake dual pools
84    pub fn new(
85        base_dir: &Path,
86        db_info: &IndexedDBDescription,
87        pool: Arc<CoreResourceThreadPool>,
88    ) -> Result<Self, Error> {
89        let mut db_path = PathBuf::new();
90        db_path.push(base_dir);
91        db_path.push(db_info.as_path());
92        let db_parent = db_path.clone();
93        db_path.push("db.sqlite");
94
95        if !db_path.exists() {
96            std::fs::create_dir_all(db_parent).unwrap();
97            std::fs::File::create(&db_path).unwrap();
98        }
99        let connection = Self::init_db(&db_path, db_info)?;
100
101        for stmt in DB_PRAGMAS {
102            // TODO: Handle errors properly
103            let _ = connection.execute(stmt, ());
104        }
105
106        Ok(Self {
107            connection,
108            db_path,
109            read_pool: pool.clone(),
110            write_pool: pool,
111        })
112    }
113
114    fn init_db(path: &Path, db_info: &IndexedDBDescription) -> Result<Connection, Error> {
115        let connection = Connection::open(path)?;
116        if connection.table_exists(None, "database")? {
117            // Database already exists, no need to initialize
118            return Ok(connection);
119        }
120        info!("Initializing indexeddb database at {:?}", path);
121        for stmt in DB_INIT_PRAGMAS {
122            // FIXME(arihant2math): this fails occasionally
123            let _ = connection.execute(stmt, ());
124        }
125        create::create_tables(&connection)?;
126        // From https://w3c.github.io/IndexedDB/#database-version:
127        // "When a database is first created, its version is 0 (zero)."
128        connection.execute(
129            "INSERT INTO database (name, origin, version) VALUES (?, ?, ?)",
130            params![
131                db_info.name.to_owned(),
132                db_info.origin.to_owned().ascii_serialization(),
133                i64::from_ne_bytes(0_u64.to_ne_bytes())
134            ],
135        )?;
136        Ok(connection)
137    }
138
139    fn get(
140        connection: &Connection,
141        store: object_store_model::Model,
142        key_range: IndexedDBKeyRange,
143    ) -> Result<Option<object_data_model::Model>, Error> {
144        let query = range_to_query(key_range);
145        let (sql, values) = sea_query::Query::select()
146            .from(object_data_model::Column::Table)
147            .columns(vec![
148                object_data_model::Column::ObjectStoreId,
149                object_data_model::Column::Key,
150                object_data_model::Column::Data,
151            ])
152            .and_where(query.and(Expr::col(object_data_model::Column::ObjectStoreId).is(store.id)))
153            .limit(1)
154            .build_rusqlite(SqliteQueryBuilder);
155        connection
156            .prepare(&sql)?
157            .query_one(&*values.as_params(), |row| {
158                object_data_model::Model::try_from(row)
159            })
160            .optional()
161    }
162
163    fn get_key(
164        connection: &Connection,
165        store: object_store_model::Model,
166        key_range: IndexedDBKeyRange,
167    ) -> Result<Option<Vec<u8>>, Error> {
168        Self::get(connection, store, key_range).map(|opt| opt.map(|model| model.key))
169    }
170
171    fn get_item(
172        connection: &Connection,
173        store: object_store_model::Model,
174        key_range: IndexedDBKeyRange,
175    ) -> Result<Option<Vec<u8>>, Error> {
176        Self::get(connection, store, key_range).map(|opt| opt.map(|model| model.data))
177    }
178
179    fn put_item(
180        connection: &Connection,
181        store: object_store_model::Model,
182        serialized_key: Vec<u8>,
183        value: Vec<u8>,
184        should_overwrite: bool,
185    ) -> Result<PutItemResult, Error> {
186        let existing_item = connection
187            .prepare("SELECT * FROM object_data WHERE key = ? AND object_store_id = ?")
188            .and_then(|mut stmt| {
189                stmt.query_row(params![serialized_key, store.id], |row| {
190                    object_data_model::Model::try_from(row)
191                })
192                .optional()
193            })?;
194        if should_overwrite || existing_item.is_none() {
195            connection.execute(
196                "INSERT INTO object_data (object_store_id, key, data) VALUES (?, ?, ?)",
197                params![store.id, serialized_key, value],
198            )?;
199            Ok(PutItemResult::Success)
200        } else {
201            Ok(PutItemResult::CannotOverwrite)
202        }
203    }
204
205    fn delete_item(
206        connection: &Connection,
207        store: object_store_model::Model,
208        serialized_key: Vec<u8>,
209    ) -> Result<(), Error> {
210        connection.execute(
211            "DELETE FROM object_data WHERE key = ? AND object_store_id = ?",
212            params![serialized_key, store.id],
213        )?;
214        Ok(())
215    }
216
217    fn clear(connection: &Connection, store: object_store_model::Model) -> Result<(), Error> {
218        connection.execute(
219            "DELETE FROM object_data WHERE object_store_id = ?",
220            params![store.id],
221        )?;
222        Ok(())
223    }
224
225    fn count(
226        connection: &Connection,
227        store: object_store_model::Model,
228        key_range: IndexedDBKeyRange,
229    ) -> Result<usize, Error> {
230        let query = range_to_query(key_range);
231        let (sql, values) = sea_query::Query::select()
232            .expr(Expr::col(object_data_model::Column::Key).count())
233            .from(object_data_model::Column::Table)
234            .and_where(query.and(Expr::col(object_data_model::Column::ObjectStoreId).is(store.id)))
235            .build_rusqlite(SqliteQueryBuilder);
236        connection
237            .prepare(&sql)?
238            .query_row(&*values.as_params(), |row| row.get(0))
239            .map(|count: i64| count as usize)
240    }
241
242    fn generate_key(
243        connection: &Connection,
244        store: &object_store_model::Model,
245    ) -> Result<IndexedDBKeyType, Error> {
246        if store.auto_increment == 0 {
247            unreachable!("Should be caught in the script thread");
248        }
249        // TODO: handle overflows, this also needs to be able to handle 2^53 as per spec
250        let new_key = store.auto_increment + 1;
251        connection.execute(
252            "UPDATE object_store SET auto_increment = ? WHERE id = ?",
253            params![new_key, store.id],
254        )?;
255        Ok(IndexedDBKeyType::Number(new_key as f64))
256    }
257}
258
259impl KvsEngine for SqliteEngine {
260    type Error = Error;
261
262    fn create_store(
263        &self,
264        store_name: &str,
265        key_path: Option<KeyPath>,
266        auto_increment: bool,
267    ) -> Result<CreateObjectResult, Self::Error> {
268        let mut stmt = self
269            .connection
270            .prepare("SELECT * FROM object_store WHERE name = ?")?;
271        if stmt.exists(params![store_name.to_string()])? {
272            // Store already exists
273            return Ok(CreateObjectResult::AlreadyExists);
274        }
275        self.connection.execute(
276            "INSERT INTO object_store (name, key_path, auto_increment) VALUES (?, ?, ?)",
277            params![
278                store_name.to_string(),
279                key_path.map(|v| bincode::serialize(&v).unwrap()),
280                auto_increment as i32
281            ],
282        )?;
283
284        Ok(CreateObjectResult::Created)
285    }
286
287    fn delete_store(&self, store_name: &str) -> Result<(), Self::Error> {
288        let result = self.connection.execute(
289            "DELETE FROM object_store WHERE name = ?",
290            params![store_name.to_string()],
291        )?;
292        if result == 0 {
293            Err(Error::QueryReturnedNoRows)
294        } else if result > 1 {
295            Err(Error::QueryReturnedMoreThanOneRow)
296        } else {
297            Ok(())
298        }
299    }
300
301    fn close_store(&self, _store_name: &str) -> Result<(), Self::Error> {
302        // TODO: do something
303        Ok(())
304    }
305
306    fn delete_database(self) -> Result<(), Self::Error> {
307        // attempt to close the connection first
308        let _ = self.connection.close();
309        if self.db_path.exists() {
310            if let Err(e) = std::fs::remove_dir_all(self.db_path.parent().unwrap()) {
311                error!("Failed to delete database: {:?}", e);
312            }
313        }
314        Ok(())
315    }
316
317    fn process_transaction(
318        &self,
319        transaction: KvsTransaction,
320    ) -> oneshot::Receiver<Option<Vec<u8>>> {
321        let (tx, rx) = oneshot::channel();
322
323        let spawning_pool = if transaction.mode == IndexedDBTxnMode::Readonly {
324            self.read_pool.clone()
325        } else {
326            self.write_pool.clone()
327        };
328        let path = self.db_path.clone();
329        spawning_pool.spawn(move || {
330            let connection = Connection::open(path).unwrap();
331            for request in transaction.requests {
332                let object_store = connection
333                    .prepare("SELECT * FROM object_store WHERE name = ?")
334                    .and_then(|mut stmt| {
335                        stmt.query_row(params![request.store_name.to_string()], |row| {
336                            object_store_model::Model::try_from(row)
337                        })
338                        .optional()
339                    });
340                fn process_object_store<T>(
341                    object_store: Result<Option<object_store_model::Model>, Error>,
342                    sender: &IpcSender<BackendResult<T>>,
343                ) -> Result<object_store_model::Model, ()>
344                where
345                    T: Serialize,
346                {
347                    match object_store {
348                        Ok(Some(store)) => Ok(store),
349                        Ok(None) => {
350                            let _ = sender.send(Err(BackendError::StoreNotFound));
351                            Err(())
352                        },
353                        Err(e) => {
354                            let _ = sender.send(Err(BackendError::DbErr(format!("{:?}", e))));
355                            Err(())
356                        },
357                    }
358                }
359
360                match request.operation {
361                    AsyncOperation::ReadWrite(AsyncReadWriteOperation::PutItem {
362                        sender,
363                        key,
364                        value,
365                        should_overwrite,
366                    }) => {
367                        let Ok(object_store) = process_object_store(object_store, &sender) else {
368                            continue;
369                        };
370                        let key = match key
371                            .map(Ok)
372                            .unwrap_or_else(|| Self::generate_key(&connection, &object_store))
373                        {
374                            Ok(key) => key,
375                            Err(e) => {
376                                let _ = sender.send(Err(BackendError::DbErr(format!("{:?}", e))));
377                                continue;
378                            },
379                        };
380                        let serialized_key: Vec<u8> = bincode::serialize(&key).unwrap();
381                        let _ = sender.send(
382                            Self::put_item(
383                                &connection,
384                                object_store,
385                                serialized_key,
386                                value,
387                                should_overwrite,
388                            )
389                            .map_err(|e| BackendError::DbErr(format!("{:?}", e))),
390                        );
391                    },
392                    AsyncOperation::ReadOnly(AsyncReadOnlyOperation::GetItem {
393                        sender,
394                        key_range,
395                    }) => {
396                        let Ok(object_store) = process_object_store(object_store, &sender) else {
397                            continue;
398                        };
399                        let _ = sender.send(
400                            Self::get_item(&connection, object_store, key_range)
401                                .map_err(|e| BackendError::DbErr(format!("{:?}", e))),
402                        );
403                    },
404                    AsyncOperation::ReadWrite(AsyncReadWriteOperation::RemoveItem {
405                        sender,
406                        key,
407                    }) => {
408                        let Ok(object_store) = process_object_store(object_store, &sender) else {
409                            continue;
410                        };
411                        let serialized_key: Vec<u8> = bincode::serialize(&key).unwrap();
412                        let _ = sender.send(
413                            Self::delete_item(&connection, object_store, serialized_key)
414                                .map_err(|e| BackendError::DbErr(format!("{:?}", e))),
415                        );
416                    },
417                    AsyncOperation::ReadOnly(AsyncReadOnlyOperation::Count {
418                        sender,
419                        key_range,
420                    }) => {
421                        let Ok(object_store) = process_object_store(object_store, &sender) else {
422                            continue;
423                        };
424                        let _ = sender.send(
425                            Self::count(&connection, object_store, key_range)
426                                .map(|r| r as u64)
427                                .map_err(|e| BackendError::DbErr(format!("{:?}", e))),
428                        );
429                    },
430                    AsyncOperation::ReadWrite(AsyncReadWriteOperation::Clear(sender)) => {
431                        let Ok(object_store) = process_object_store(object_store, &sender) else {
432                            continue;
433                        };
434                        let _ = sender.send(
435                            Self::clear(&connection, object_store)
436                                .map_err(|e| BackendError::DbErr(format!("{:?}", e))),
437                        );
438                    },
439                    AsyncOperation::ReadOnly(AsyncReadOnlyOperation::GetKey {
440                        sender,
441                        key_range,
442                    }) => {
443                        let Ok(object_store) = process_object_store(object_store, &sender) else {
444                            continue;
445                        };
446                        let _ = sender.send(
447                            Self::get_key(&connection, object_store, key_range)
448                                .map(|key| key.map(|k| bincode::deserialize(&k).unwrap()))
449                                .map_err(|e| BackendError::DbErr(format!("{:?}", e))),
450                        );
451                    },
452                }
453            }
454            let _ = tx.send(None);
455        });
456        rx
457    }
458
459    // TODO: we should be able to error out here, maybe change the trait definition?
460    fn has_key_generator(&self, store_name: &str) -> bool {
461        self.connection
462            .prepare("SELECT * FROM object_store WHERE name = ?")
463            .and_then(|mut stmt| {
464                stmt.query_row(params![store_name.to_string()], |r| {
465                    let object_store = object_store_model::Model::try_from(r).unwrap();
466                    Ok(object_store.auto_increment)
467                })
468            })
469            .optional()
470            .unwrap()
471            // TODO: Wrong (change trait definition for this function)
472            .unwrap_or_default() !=
473            0
474    }
475
476    fn key_path(&self, store_name: &str) -> Option<KeyPath> {
477        self.connection
478            .prepare("SELECT * FROM object_store WHERE name = ?")
479            .and_then(|mut stmt| {
480                stmt.query_row(params![store_name.to_string()], |r| {
481                    let object_store = object_store_model::Model::try_from(r).unwrap();
482                    Ok(object_store
483                        .key_path
484                        .map(|key_path| bincode::deserialize(&key_path).unwrap()))
485                })
486            })
487            .optional()
488            .unwrap()
489            // TODO: Wrong, same issues as has_key_generator
490            .unwrap_or_default()
491    }
492
493    fn create_index(
494        &self,
495        store_name: &str,
496        index_name: String,
497        key_path: KeyPath,
498        unique: bool,
499        multi_entry: bool,
500    ) -> Result<CreateObjectResult, Self::Error> {
501        let object_store = self.connection.query_row(
502            "SELECT * FROM object_store WHERE name = ?",
503            params![store_name.to_string()],
504            |row| object_store_model::Model::try_from(row),
505        )?;
506
507        let index_exists: bool = self.connection.query_row(
508            "SELECT EXISTS(SELECT * FROM object_store_index WHERE name = ? AND object_store_id = ?)",
509            params![index_name.to_string(), object_store.id],
510            |row| row.get(0),
511        )?;
512        if index_exists {
513            return Ok(CreateObjectResult::AlreadyExists);
514        }
515
516        self.connection.execute(
517            "INSERT INTO object_store_index (object_store_id, name, key_path, unique_index, multi_entry_index)\
518            VALUES (?, ?, ?, ?, ?)",
519            params![
520                object_store.id,
521                index_name.to_string(),
522                bincode::serialize(&key_path).unwrap(),
523                unique,
524                multi_entry,
525            ],
526        )?;
527        Ok(CreateObjectResult::Created)
528    }
529
530    fn delete_index(&self, store_name: &str, index_name: String) -> Result<(), Self::Error> {
531        let object_store = self.connection.query_row(
532            "SELECT * FROM object_store WHERE name = ?",
533            params![store_name.to_string()],
534            |r| Ok(object_store_model::Model::try_from(r).unwrap()),
535        )?;
536
537        // Delete the index if it exists
538        let _ = self.connection.execute(
539            "DELETE FROM object_store_index WHERE name = ? AND object_store_id = ?",
540            params![index_name.to_string(), object_store.id],
541        )?;
542        Ok(())
543    }
544
545    fn version(&self) -> Result<u64, Self::Error> {
546        let version: i64 =
547            self.connection
548                .query_row("SELECT version FROM database LIMIT 1", [], |row| row.get(0))?;
549        Ok(u64::from_ne_bytes(version.to_ne_bytes()))
550    }
551
552    fn set_version(&self, version: u64) -> Result<(), Self::Error> {
553        let rows_affected = self.connection.execute(
554            "UPDATE database SET version = ?",
555            params![i64::from_ne_bytes(version.to_ne_bytes())],
556        )?;
557        if rows_affected == 0 {
558            return Err(Error::QueryReturnedNoRows);
559        }
560        Ok(())
561    }
562}