diff --git a/Cargo.lock b/Cargo.lock index 347f235b5c752742fba70921f769a7c42030c582..485aeab16b33d4baca2b56083e49c29b85abe69a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -944,7 +944,7 @@ dependencies = [ [[package]] name = "heed" version = "0.10.6" -source = "git+https://github.com/Kerollmops/heed.git?rev=b235e9c3e9984737c967b5de1014b48f125dc28b#b235e9c3e9984737c967b5de1014b48f125dc28b" +source = "git+https://github.com/timokoesters/heed.git?rev=c6b149fd5621999b0d5ef0c28e199015cfc60fa1#c6b149fd5621999b0d5ef0c28e199015cfc60fa1" dependencies = [ "bytemuck", "byteorder", @@ -962,12 +962,12 @@ dependencies = [ [[package]] name = "heed-traits" version = "0.7.0" -source = "git+https://github.com/Kerollmops/heed.git?rev=b235e9c3e9984737c967b5de1014b48f125dc28b#b235e9c3e9984737c967b5de1014b48f125dc28b" +source = "git+https://github.com/timokoesters/heed.git?rev=c6b149fd5621999b0d5ef0c28e199015cfc60fa1#c6b149fd5621999b0d5ef0c28e199015cfc60fa1" [[package]] name = "heed-types" version = "0.7.2" -source = "git+https://github.com/Kerollmops/heed.git?rev=b235e9c3e9984737c967b5de1014b48f125dc28b#b235e9c3e9984737c967b5de1014b48f125dc28b" +source = "git+https://github.com/timokoesters/heed.git?rev=c6b149fd5621999b0d5ef0c28e199015cfc60fa1#c6b149fd5621999b0d5ef0c28e199015cfc60fa1" dependencies = [ "bincode", "bytemuck", @@ -2586,9 +2586,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.65" +version = "1.0.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28c5e91e4240b46c4c19219d6cc84784444326131a4210f496f948d5cc827a29" +checksum = "336b10da19a12ad094b59d870ebde26a45402e5b470add4b5fd03c5048a32127" dependencies = [ "itoa", "ryu", diff --git a/Cargo.toml b/Cargo.toml index 1d774dd369e070d9f6f97752dfd880829fb8db61..19ce6b101612958842a9921837567e0b5305a487 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,10 +79,10 @@ parking_lot = { version = "0.11.1", optional = true } crossbeam = { version = "0.8.1", optional = true } num_cpus = "1.13.0" threadpool = "1.8.1" -heed = { git = "https://github.com/Kerollmops/heed.git", rev = "b235e9c3e9984737c967b5de1014b48f125dc28b", optional = true } +heed = { git = "https://github.com/timokoesters/heed.git", rev = "f6f825da7fb2c758867e05ad973ef800a6fe1d5d", optional = true } [features] -default = ["conduit_bin", "backend_heed"] +default = ["conduit_bin", "backend_sqlite"] backend_sled = ["sled"] backend_rocksdb = ["rocksdb"] backend_sqlite = ["sqlite"] diff --git a/src/client_server/room.rs b/src/client_server/room.rs index 49a6052a5872bd6b6ef0b082ed9d191c122f1ec0..d5188e8b394977e6644cb68d20e5ad694e3b007f 100644 --- a/src/client_server/room.rs +++ b/src/client_server/room.rs @@ -15,7 +15,7 @@ RoomAliasId, RoomId, RoomVersionId, }; use std::{cmp::max, collections::BTreeMap, convert::TryFrom, sync::Arc}; -use tracing::info; +use tracing::{info, warn}; #[cfg(feature = "conduit_bin")] use rocket::{get, post}; @@ -233,7 +233,8 @@ pub async fn create_room_route( // 5. Events listed in initial_state for event in &body.initial_state { - let pdu_builder = PduBuilder::from(event.deserialize().map_err(|_| { + let pdu_builder = PduBuilder::from(event.deserialize().map_err(|e| { + warn!("Invalid initial state event: {:?}", e); Error::BadRequest(ErrorKind::InvalidParam, "Invalid initial state event.") })?); diff --git a/src/database.rs b/src/database.rs index 65a60f084850f898d4afb1bcb53c1b379550d6bb..db0eae81391955ff610d634805ef76b746de3fbc 100644 --- a/src/database.rs +++ b/src/database.rs @@ -189,22 +189,26 @@ pub fn try_remove(server_name: &str) -> Result<()> { } fn check_sled_or_sqlite_db(config: &Config) -> Result<()> { - let path = Path::new(&config.database_path); - - let sled_exists = path.join("db").exists(); - let sqlite_exists = path.join("conduit.db").exists(); - // TODO: heed - if sled_exists { - if sqlite_exists { - // most likely an in-place directory, only warn - warn!("Both sled and sqlite databases are detected in database directory"); - warn!("Currently running from the sqlite database, but consider removing sled database files to free up space") - } else { - error!("Sled database detected, conduit now uses sqlite for database operations"); - error!("This database must be converted to sqlite, go to https://github.com/ShadowJonathan/conduit_toolbox#conduit_sled_to_sqlite"); - return Err(Error::bad_config( - "sled database detected, migrate to sqlite", - )); + #[cfg(feature = "backend_sqlite")] + { + let path = Path::new(&config.database_path); + + let sled_exists = path.join("db").exists(); + let sqlite_exists = path.join("conduit.db").exists(); + if sled_exists { + if sqlite_exists { + // most likely an in-place directory, only warn + warn!("Both sled and sqlite databases are detected in database directory"); + warn!("Currently running from the sqlite database, but consider removing sled database files to free up space") + } else { + error!( + "Sled database detected, conduit now uses sqlite for database operations" + ); + error!("This database must be converted to sqlite, go to https://github.com/ShadowJonathan/conduit_toolbox#conduit_sled_to_sqlite"); + return Err(Error::bad_config( + "sled database detected, migrate to sqlite", + )); + } } } @@ -298,6 +302,7 @@ pub async fn load_or_create(config: &Config) -> Result<Arc<TokioRwLock<Self>>> { }, account_data: account_data::AccountData { roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?, + roomusertype_roomuserdataid: builder.open_tree("roomusertype_roomuserdataid")?, }, media: media::Media { mediaid_file: builder.open_tree("mediaid_file")?, @@ -420,6 +425,30 @@ pub async fn load_or_create(config: &Config) -> Result<Arc<TokioRwLock<Self>>> { println!("Migration: 3 -> 4 finished"); } + + if db.globals.database_version()? < 5 { + // Upgrade user data store + for (roomuserdataid, _) in db.account_data.roomuserdataid_accountdata.iter() { + let mut parts = roomuserdataid.split(|&b| b == 0xff); + let user_id = parts.next().unwrap(); + let room_id = parts.next().unwrap(); + let event_type = roomuserdataid.rsplit(|&b| b == 0xff).next().unwrap(); + + let mut key = room_id.to_vec(); + key.push(0xff); + key.extend_from_slice(user_id); + key.push(0xff); + key.extend_from_slice(event_type); + + db.account_data + .roomusertype_roomuserdataid + .insert(&key, &roomuserdataid)?; + } + + db.globals.bump_database_version(5)?; + + println!("Migration: 4 -> 5 finished"); + } } let guard = db.read().await; @@ -516,7 +545,7 @@ pub async fn watch(&self, user_id: &UserId, device_id: &DeviceId) { futures.push( self.account_data - .roomuserdataid_accountdata + .roomusertype_roomuserdataid .watch_prefix(&roomuser_prefix), ); } @@ -526,7 +555,7 @@ pub async fn watch(&self, user_id: &UserId, device_id: &DeviceId) { futures.push( self.account_data - .roomuserdataid_accountdata + .roomusertype_roomuserdataid .watch_prefix(&globaluserdata_prefix), ); diff --git a/src/database/abstraction/heed.rs b/src/database/abstraction/heed.rs index 61e7927be23f449ce924f0f58c725c499003cb67..0421b14001604f30660db77c73560c860b26b9df 100644 --- a/src/database/abstraction/heed.rs +++ b/src/database/abstraction/heed.rs @@ -27,7 +27,6 @@ pub struct EngineTree { } fn convert_error(error: heed::Error) -> Error { - panic!(error.to_string()); Error::HeedError { error: error.to_string(), } @@ -40,8 +39,8 @@ fn open(config: &Config) -> Result<Arc<Self>> { env_builder.max_readers(126); env_builder.max_dbs(128); unsafe { - env_builder.flag(heed::flags::Flags::MdbNoSync); - env_builder.flag(heed::flags::Flags::MdbNoMetaSync); + env_builder.flag(heed::flags::Flags::MdbWriteMap); + env_builder.flag(heed::flags::Flags::MdbMapAsync); } Ok(Arc::new(Engine { @@ -79,7 +78,7 @@ fn iter_from_thread( from: Vec<u8>, backwards: bool, ) -> Box<dyn Iterator<Item = TupleOfBytes> + Send + Sync> { - let (s, r) = bounded::<TupleOfBytes>(5); + let (s, r) = bounded::<TupleOfBytes>(100); let engine = Arc::clone(&self.engine); let lock = self.engine.iter_pool.lock().unwrap(); diff --git a/src/database/account_data.rs b/src/database/account_data.rs index 8a8d2c22b2ff4bcaa17d68e5a7fc34dd02fd6aea..e1d4c62065c74006fc9043910d4886fdc4dc5d49 100644 --- a/src/database/account_data.rs +++ b/src/database/account_data.rs @@ -12,6 +12,7 @@ pub struct AccountData { pub(super) roomuserdataid_accountdata: Arc<dyn Tree>, // RoomUserDataId = Room + User + Count + Type + pub(super) roomusertype_roomuserdataid: Arc<dyn Tree>, // RoomUserType = Room + User + Type } impl AccountData { @@ -34,15 +35,13 @@ pub fn update<T: Serialize>( prefix.extend_from_slice(&user_id.as_bytes()); prefix.push(0xff); - // Remove old entry - if let Some((old_key, _)) = self.find_event(room_id, user_id, &event_type)? { - self.roomuserdataid_accountdata.remove(&old_key)?; - } + let mut roomuserdataid = prefix.clone(); + roomuserdataid.extend_from_slice(&globals.next_count()?.to_be_bytes()); + roomuserdataid.push(0xff); + roomuserdataid.extend_from_slice(&event_type.as_bytes()); - let mut key = prefix; - key.extend_from_slice(&globals.next_count()?.to_be_bytes()); - key.push(0xff); - key.extend_from_slice(event_type.as_ref().as_bytes()); + let mut key = prefix.clone(); + key.extend_from_slice(event_type.as_bytes()); let json = serde_json::to_value(data).expect("all types here can be serialized"); // TODO: maybe add error handling if json.get("type").is_none() || json.get("content").is_none() { @@ -53,10 +52,20 @@ pub fn update<T: Serialize>( } self.roomuserdataid_accountdata.insert( - &key, + &roomuserdataid, &serde_json::to_vec(&json).expect("to_vec always works on json values"), )?; + let prev = self.roomusertype_roomuserdataid.get(&key)?; + + self.roomusertype_roomuserdataid + .insert(&key, &roomuserdataid)?; + + // Remove old entry + if let Some(prev) = prev { + self.roomuserdataid_accountdata.remove(&prev)?; + } + Ok(()) } @@ -68,9 +77,27 @@ pub fn get<T: DeserializeOwned>( user_id: &UserId, kind: EventType, ) -> Result<Option<T>> { - self.find_event(room_id, user_id, &kind)? - .map(|(_, v)| { - serde_json::from_slice(&v).map_err(|_| Error::bad_database("could not deserialize")) + let mut key = room_id + .map(|r| r.to_string()) + .unwrap_or_default() + .as_bytes() + .to_vec(); + key.push(0xff); + key.extend_from_slice(&user_id.as_bytes()); + key.push(0xff); + key.extend_from_slice(kind.as_ref().as_bytes()); + + self.roomusertype_roomuserdataid + .get(&key)? + .and_then(|roomuserdataid| { + self.roomuserdataid_accountdata + .get(&roomuserdataid) + .transpose() + }) + .transpose()? + .map(|data| { + serde_json::from_slice(&data) + .map_err(|_| Error::bad_database("could not deserialize")) }) .transpose() } @@ -123,37 +150,4 @@ pub fn changes_since( Ok(userdata) } - - #[tracing::instrument(skip(self, room_id, user_id, kind))] - fn find_event( - &self, - room_id: Option<&RoomId>, - user_id: &UserId, - kind: &EventType, - ) -> Result<Option<(Vec<u8>, Vec<u8>)>> { - let mut prefix = room_id - .map(|r| r.to_string()) - .unwrap_or_default() - .as_bytes() - .to_vec(); - prefix.push(0xff); - prefix.extend_from_slice(&user_id.as_bytes()); - prefix.push(0xff); - - let mut last_possible_key = prefix.clone(); - last_possible_key.extend_from_slice(&u64::MAX.to_be_bytes()); - - let kind = kind.clone(); - - Ok(self - .roomuserdataid_accountdata - .iter_from(&last_possible_key, true) - .take_while(move |(k, _)| k.starts_with(&prefix)) - .find(move |(k, _)| { - k.rsplit(|&b| b == 0xff) - .next() - .map(|current_event_type| current_event_type == kind.as_ref().as_bytes()) - .unwrap_or(false) - })) - } } diff --git a/src/database/pusher.rs b/src/database/pusher.rs index 9e81dd16973158d1e44fe1d6ac42e483eda55af3..3df9ed4f8944d39ffe18e77221161764a05a82d1 100644 --- a/src/database/pusher.rs +++ b/src/database/pusher.rs @@ -9,10 +9,11 @@ }, IncomingResponse, OutgoingRequest, SendAccessToken, }, - events::{room::power_levels::PowerLevelsEventContent, EventType}, + events::{room::power_levels::PowerLevelsEventContent, AnySyncRoomEvent, EventType}, identifiers::RoomName, push::{Action, PushConditionRoomCtx, PushFormat, Ruleset, Tweak}, - uint, UInt, UserId, + serde::Raw, + uint, RoomId, UInt, UserId, }; use tracing::{error, info, warn}; @@ -172,7 +173,24 @@ pub async fn send_push_notice( let mut notify = None; let mut tweaks = Vec::new(); - for action in get_actions(user, &ruleset, pdu, db)? { + let power_levels: PowerLevelsEventContent = db + .rooms + .room_state_get(&pdu.room_id, &EventType::RoomPowerLevels, "")? + .map(|ev| { + serde_json::from_value(ev.content.clone()) + .map_err(|_| Error::bad_database("invalid m.room.power_levels event")) + }) + .transpose()? + .unwrap_or_default(); + + for action in get_actions( + user, + &ruleset, + &power_levels, + &pdu.to_sync_room_event(), + &pdu.room_id, + db, + )? { let n = match action { Action::DontNotify => false, // TODO: Implement proper support for coalesce @@ -204,32 +222,24 @@ pub async fn send_push_notice( pub fn get_actions<'a>( user: &UserId, ruleset: &'a Ruleset, - pdu: &PduEvent, + power_levels: &PowerLevelsEventContent, + pdu: &Raw<AnySyncRoomEvent>, + room_id: &RoomId, db: &Database, ) -> Result<&'a [Action]> { - let power_levels: PowerLevelsEventContent = db - .rooms - .room_state_get(&pdu.room_id, &EventType::RoomPowerLevels, "")? - .map(|ev| { - serde_json::from_value(ev.content.clone()) - .map_err(|_| Error::bad_database("invalid m.room.power_levels event")) - }) - .transpose()? - .unwrap_or_default(); - let ctx = PushConditionRoomCtx { - room_id: pdu.room_id.clone(), + room_id: room_id.clone(), member_count: 10_u32.into(), // TODO: get member count efficiently user_display_name: db .users .displayname(&user)? .unwrap_or_else(|| user.localpart().to_owned()), - users_power_levels: power_levels.users, + users_power_levels: power_levels.users.clone(), default_power_level: power_levels.users_default, - notification_power_levels: power_levels.notifications, + notification_power_levels: power_levels.notifications.clone(), }; - Ok(ruleset.get_actions(&pdu.to_sync_room_event(), &ctx)) + Ok(ruleset.get_actions(pdu, &ctx)) } #[tracing::instrument(skip(unread, pusher, tweaks, event, db))] diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 8ada87f9c703510be84e426ebda964d55ca4f679..79bb059d1c9afa38c1083e9f0d7d26e49cd01355 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -12,7 +12,9 @@ api::{client::error::ErrorKind, federation}, events::{ ignored_user_list, push_rules, - room::{create::CreateEventContent, member, message}, + room::{ + create::CreateEventContent, member, message, power_levels::PowerLevelsEventContent, + }, AnyStrippedStateEvent, AnySyncStateEvent, EventType, }, push::{self, Action, Tweak}, @@ -760,6 +762,18 @@ pub fn append_pdu( .insert(pdu.event_id.as_bytes(), &pdu_id)?; // See if the event matches any known pushers + let power_levels: PowerLevelsEventContent = db + .rooms + .room_state_get(&pdu.room_id, &EventType::RoomPowerLevels, "")? + .map(|ev| { + serde_json::from_value(ev.content.clone()) + .map_err(|_| Error::bad_database("invalid m.room.power_levels event")) + }) + .transpose()? + .unwrap_or_default(); + + let sync_pdu = pdu.to_sync_room_event(); + for user in db .rooms .room_members(&pdu.room_id) @@ -781,7 +795,14 @@ pub fn append_pdu( let mut highlight = false; let mut notify = false; - for action in pusher::get_actions(&user, &rules_for_user, pdu, db)? { + for action in pusher::get_actions( + &user, + &rules_for_user, + &power_levels, + &sync_pdu, + &pdu.room_id, + db, + )? { match action { Action::DontNotify => notify = false, // TODO: Implement proper support for coalesce