Unverified Commit 624cd2ac authored by Paul Robertson

remove sqlite code

parent fb758f5f
6 merge requests: !483 drop sqlite support, !480 Misc, !479 sha256 media default, !475 Room Alias related, !476 Misc Admin related, !462 SQLighter 🔥
@@ -25,12 +25,6 @@
 #[derive(Error)]
 pub enum Error {
-    #[cfg(feature = "sqlite")]
-    #[error("There was a problem with the connection to the sqlite database: {source}")]
-    Sqlite {
-        #[from]
-        source: rusqlite::Error,
-    },
     #[cfg(feature = "rocksdb")]
     #[error("There was a problem with the connection to the rocksdb database: {source}")]
     RocksDb {
@@ -120,10 +114,6 @@ pub fn sanitized_error(&self) -> String {
         let db_error = String::from("Database or I/O error occurred.");
         match self {
-            #[cfg(feature = "sqlite")]
-            Self::Sqlite {
-                ..
-            } => db_error,
             #[cfg(feature = "rocksdb")]
             Self::RocksDb {
                 ..
......
@@ -274,13 +274,6 @@ pub async fn load_or_create(server: &Arc<Server>) -> Result<Self> {
 fn build(config: &Config) -> Result<Arc<dyn KeyValueDatabaseEngine>> {
     match &*config.database_backend {
-        "sqlite" => {
-            debug!("Got sqlite database backend");
-            #[cfg(not(feature = "sqlite"))]
-            return Err(Error::bad_config("Database backend not found."));
-            #[cfg(feature = "sqlite")]
-            Ok(Arc::new(Arc::<crate::sqlite::Engine>::open(config)?))
-        },
         "rocksdb" => {
             debug!("Got rocksdb database backend");
             #[cfg(not(feature = "rocksdb"))]
@@ -289,7 +282,7 @@ fn build(config: &Config) -> Result<Arc<dyn KeyValueDatabaseEngine>> {
             Ok(Arc::new(Arc::<crate::rocksdb::Engine>::open(config)?))
         },
         _ => Err(Error::bad_config(
-            "Database backend not found. sqlite (not recommended) and rocksdb are the only supported backends.",
+            "Database backend not found. rocksdb is the only supported backend.",
         )),
     }
 }
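Aside (not part of the diff): the arms removed above rely on cfg-gated statements inside a single match arm, so exactly one of the attribute-gated lines survives compilation, making the arm either the engine constructor or a configuration error. A minimal standalone sketch of that pattern, using a hypothetical "somedb" feature name and plain String errors rather than this crate's Error type:

fn open_backend(name: &str) -> Result<(), String> {
    match name {
        "somedb" => {
            // Compiled only when the hypothetical "somedb" feature is off:
            // the arm reduces to an early configuration error.
            #[cfg(not(feature = "somedb"))]
            return Err("backend not compiled in".to_owned());
            // Compiled only when the feature is on: the arm yields the opened
            // engine (stubbed here as Ok(())).
            #[cfg(feature = "somedb")]
            Ok(())
        },
        _ => Err("unknown backend".to_owned()),
    }
}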
@@ -297,19 +290,8 @@ fn build(config: &Config) -> Result<Arc<dyn KeyValueDatabaseEngine>> {
 fn check_db_setup(config: &Config) -> Result<()> {
     let path = Path::new(&config.database_path);
-    let sqlite_exists = path.join("conduit.db").exists();
     let rocksdb_exists = path.join("IDENTITY").exists();
-    if sqlite_exists && rocksdb_exists {
-        return Err(Error::bad_config("Multiple databases at database_path detected."));
-    }
-    if sqlite_exists && config.database_backend != "sqlite" {
-        return Err(Error::bad_config(
-            "Found sqlite at database_path, but is not specified in config.",
-        ));
-    }
     if rocksdb_exists && config.database_backend != "rocksdb" {
         return Err(Error::bad_config(
             "Found rocksdb at database_path, but is not specified in config.",
......
@@ -6,10 +6,7 @@
 #[cfg(feature = "rocksdb")]
 pub(crate) mod rocksdb;
-#[cfg(feature = "sqlite")]
-pub(crate) mod sqlite;
-#[cfg(any(feature = "sqlite", feature = "rocksdb"))]
+#[cfg(any(feature = "rocksdb"))]
 pub(crate) mod watchers;
 extern crate conduit_core as conduit;
......
use std::{
cell::RefCell,
future::Future,
path::{Path, PathBuf},
pin::Pin,
sync::Arc,
};
use conduit::{Config, Result};
use parking_lot::{Mutex, MutexGuard};
use rusqlite::{Connection, DatabaseName::Main, OptionalExtension};
use thread_local::ThreadLocal;
use tracing::debug;
use super::{watchers::Watchers, KeyValueDatabaseEngine, KvTree};
thread_local! {
static READ_CONNECTION: RefCell<Option<&'static Connection>> = const { RefCell::new(None) };
static READ_CONNECTION_ITERATOR: RefCell<Option<&'static Connection>> = const { RefCell::new(None) };
}
struct PreparedStatementIterator<'a> {
iterator: Box<dyn Iterator<Item = TupleOfBytes> + 'a>,
_statement_ref: AliasableBox<rusqlite::Statement<'a>>,
}
impl Iterator for PreparedStatementIterator<'_> {
type Item = TupleOfBytes;
fn next(&mut self) -> Option<Self::Item> { self.iterator.next() }
}
struct AliasableBox<T>(*mut T);
impl<T> Drop for AliasableBox<T> {
fn drop(&mut self) {
// SAFETY: This is cursed and relies on non-local reasoning.
//
// In order for this to be safe:
//
// * All aliased references to this value must have been dropped first, for
// example by coming after its referrers in struct fields, because struct
// fields are automatically dropped in order from top to bottom in the absence
// of an explicit Drop impl. Otherwise, the referrers may read into
// deallocated memory.
// * This type must not be copyable or cloneable. Otherwise, double-free can
// occur.
//
// These conditions are met, but again, note that changing safe code in
// this module can result in unsoundness if any of these constraints are
// violated.
unsafe { drop(Box::from_raw(self.0)) }
}
}
pub(crate) struct Engine {
writer: Mutex<Connection>,
read_conn_tls: ThreadLocal<Connection>,
read_iterator_conn_tls: ThreadLocal<Connection>,
path: PathBuf,
cache_size_per_thread: u32,
}
impl Engine {
fn prepare_conn(path: &Path, cache_size_kb: u32) -> Result<Connection> {
let conn = Connection::open(path)?;
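        // Connection tuning applied to every reader and the writer:
        // - page_size only takes effect before the database file has any
        //   tables (or after VACUUM), which is why it is set first.
        // - journal_mode=WAL lets the per-thread read connections run while
        //   the single writer holds its lock.
        // - synchronous=NORMAL only forces a sync at WAL checkpoints instead
        //   of on every transaction.
        // - a negative cache_size is interpreted by SQLite as an amount in
        //   KiB rather than a number of pages.
        // - wal_autocheckpoint=0 disables automatic checkpointing; the WAL is
        //   checkpointed explicitly in flush_wal()/cleanup().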
conn.pragma_update(Some(Main), "page_size", 2048)?;
conn.pragma_update(Some(Main), "journal_mode", "WAL")?;
conn.pragma_update(Some(Main), "synchronous", "NORMAL")?;
conn.pragma_update(Some(Main), "cache_size", -i64::from(cache_size_kb))?;
conn.pragma_update(Some(Main), "wal_autocheckpoint", 0)?;
Ok(conn)
}
fn write_lock(&self) -> MutexGuard<'_, Connection> { self.writer.lock() }
fn read_lock(&self) -> &Connection {
self.read_conn_tls
.get_or(|| Self::prepare_conn(&self.path, self.cache_size_per_thread).unwrap())
}
fn read_lock_iterator(&self) -> &Connection {
self.read_iterator_conn_tls
.get_or(|| Self::prepare_conn(&self.path, self.cache_size_per_thread).unwrap())
}
fn flush_wal(self: &Arc<Self>) -> Result<()> {
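        // A RESTART checkpoint copies the WAL back into the main database
        // file, waits for readers still on the old snapshot, and makes the
        // next writer begin overwriting the WAL from its start.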
self.write_lock()
.pragma_update(Some(Main), "wal_checkpoint", "RESTART")?;
Ok(())
}
}
impl KeyValueDatabaseEngine for Arc<Engine> {
fn open(config: &Config) -> Result<Self> {
let path = Path::new(&config.database_path).join("conduit.db");
// calculates cache-size per permanent connection
// 1. convert MB to KiB
// 2. divide by permanent connections + permanent iter connections + write
// connection
// 3. round down to nearest integer
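        // For example (hypothetical numbers): with db_cache_capacity_mb =
        // 300.0 and available_parallelism() == 4, this is
        // 300.0 * 1024.0 / (4.0 * 2.0 + 1.0) ≈ 34133 KiB for each of the
        // 9 connections (4 readers + 4 iterator readers + 1 writer).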
#[allow(
clippy::as_conversions,
clippy::cast_possible_truncation,
clippy::cast_precision_loss,
clippy::cast_sign_loss
)]
let cache_size_per_thread = ((config.db_cache_capacity_mb * 1024.0)
/ (conduit::utils::available_parallelism() as f64).mul_add(2.0, 1.0)) as u32;
let writer = Mutex::new(Engine::prepare_conn(&path, cache_size_per_thread)?);
let arc = Self::new(Engine {
writer,
read_conn_tls: ThreadLocal::new(),
read_iterator_conn_tls: ThreadLocal::new(),
path,
cache_size_per_thread,
});
Ok(arc)
}
fn open_tree(&self, name: &str) -> Result<Arc<dyn KvTree>> {
self.write_lock().execute(
&format!("CREATE TABLE IF NOT EXISTS {name} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )"),
[],
)?;
Ok(Arc::new(SqliteTable {
engine: Self::clone(self),
name: name.to_owned(),
watchers: Watchers::default(),
}))
}
fn flush(&self) -> Result<()> {
// we enabled PRAGMA synchronous=normal, so this should not be necessary
Ok(())
}
fn cleanup(&self) -> Result<()> { self.flush_wal() }
}
struct SqliteTable {
engine: Arc<Engine>,
name: String,
watchers: Watchers,
}
type TupleOfBytes = (Vec<u8>, Vec<u8>);
impl SqliteTable {
fn get_with_guard(&self, guard: &Connection, key: &[u8]) -> Result<Option<Vec<u8>>> {
Ok(guard
.prepare(format!("SELECT value FROM {} WHERE key = ?", self.name).as_str())?
.query_row([key], |row| row.get(0))
.optional()?)
}
fn insert_with_guard(&self, guard: &Connection, key: &[u8], value: &[u8]) -> Result<()> {
guard.execute(
format!("INSERT OR REPLACE INTO {} (key, value) VALUES (?, ?)", self.name).as_str(),
[key, value],
)?;
Ok(())
}
fn iter_with_guard<'a>(&'a self, guard: &'a Connection) -> Box<dyn Iterator<Item = TupleOfBytes> + 'a> {
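        // The prepared statement is deliberately leaked so that the row
        // iterator (which borrows it) can be stored alongside it in
        // PreparedStatementIterator; AliasableBox frees the leaked statement
        // again when the iterator is dropped (see the Drop impl above).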
let statement = Box::leak(Box::new(
guard
.prepare(&format!("SELECT key, value FROM {} ORDER BY key ASC", &self.name))
.unwrap(),
));
let statement_ref = AliasableBox(statement);
//let name = self.name.clone();
let iterator = Box::new(
statement
.query_map([], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
.unwrap()
.map(Result::unwrap),
);
Box::new(PreparedStatementIterator {
iterator,
_statement_ref: statement_ref,
})
}
}
impl KvTree for SqliteTable {
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> { self.get_with_guard(self.engine.read_lock(), key) }
fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
let guard = self.engine.write_lock();
self.insert_with_guard(&guard, key, value)?;
drop(guard);
self.watchers.wake(key);
Ok(())
}
fn insert_batch(&self, iter: &mut dyn Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<()> {
let guard = self.engine.write_lock();
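        // Group the whole batch into one explicit transaction so it commits
        // once, instead of paying for an implicit transaction per INSERT.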
guard.execute("BEGIN", [])?;
for (key, value) in iter {
self.insert_with_guard(&guard, &key, &value)?;
}
guard.execute("COMMIT", [])?;
drop(guard);
Ok(())
}
fn increment_batch(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()> {
let guard = self.engine.write_lock();
guard.execute("BEGIN", [])?;
for key in iter {
let old = self.get_with_guard(&guard, &key)?;
let new = conduit::utils::increment(old.as_deref());
self.insert_with_guard(&guard, &key, &new)?;
}
guard.execute("COMMIT", [])?;
drop(guard);
Ok(())
}
fn remove(&self, key: &[u8]) -> Result<()> {
let guard = self.engine.write_lock();
guard.execute(format!("DELETE FROM {} WHERE key = ?", self.name).as_str(), [key])?;
Ok(())
}
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = TupleOfBytes> + 'a> {
let guard = self.engine.read_lock_iterator();
self.iter_with_guard(guard)
}
fn iter_from<'a>(&'a self, from: &[u8], backwards: bool) -> Box<dyn Iterator<Item = TupleOfBytes> + 'a> {
let guard = self.engine.read_lock_iterator();
let from = from.to_vec(); // TODO change interface?
//let name = self.name.clone();
if backwards {
let statement = Box::leak(Box::new(
guard
.prepare(&format!(
"SELECT key, value FROM {} WHERE key <= ? ORDER BY key DESC",
&self.name
))
.unwrap(),
));
let statement_ref = AliasableBox(statement);
let iterator = Box::new(
statement
.query_map([from], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
.unwrap()
.map(Result::unwrap),
);
Box::new(PreparedStatementIterator {
iterator,
_statement_ref: statement_ref,
})
} else {
let statement = Box::leak(Box::new(
guard
.prepare(&format!(
"SELECT key, value FROM {} WHERE key >= ? ORDER BY key ASC",
&self.name
))
.unwrap(),
));
let statement_ref = AliasableBox(statement);
let iterator = Box::new(
statement
.query_map([from], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
.unwrap()
.map(Result::unwrap),
);
Box::new(PreparedStatementIterator {
iterator,
_statement_ref: statement_ref,
})
}
}
fn increment(&self, key: &[u8]) -> Result<Vec<u8>> {
let guard = self.engine.write_lock();
let old = self.get_with_guard(&guard, key)?;
let new = conduit::utils::increment(old.as_deref());
self.insert_with_guard(&guard, key, &new)?;
Ok(new)
}
fn scan_prefix<'a>(&'a self, prefix: Vec<u8>) -> Box<dyn Iterator<Item = TupleOfBytes> + 'a> {
Box::new(
self.iter_from(&prefix, false)
.take_while(move |(key, _)| key.starts_with(&prefix)),
)
}
fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
self.watchers.watch(prefix)
}
fn clear(&self) -> Result<()> {
debug!("clear: running");
self.engine
.write_lock()
.execute(format!("DELETE FROM {}", self.name).as_str(), [])?;
debug!("clear: ran");
Ok(())
}
}
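For context only, a minimal sketch (not part of the commit) of how the removed engine was exercised through the KeyValueDatabaseEngine and KvTree traits shown above, written as if inside this module so it reuses its imports; the tree name and key are hypothetical, and the Config value is assumed to come from the server's normal configuration loading:

fn sqlite_roundtrip(config: &Config) -> Result<()> {
    // Opens (or creates) conduit.db under config.database_path.
    let engine = Arc::<Engine>::open(config)?;
    // "example_tree" is a hypothetical table name.
    let tree = engine.open_tree("example_tree")?;
    tree.insert(b"key", b"value")?;
    assert_eq!(tree.get(b"key")?, Some(b"value".to_vec()));
    // Checkpoint the WAL back into the main database file.
    engine.cleanup()
}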