diff --git a/src/database/kvdatabase.rs b/src/database/kvdatabase.rs index ddbb8e922c5b2553991f6af1e81130bd8ef7d503..906712dc3bbea5956478ccd61534b7d5f3bfad0f 100644 --- a/src/database/kvdatabase.rs +++ b/src/database/kvdatabase.rs @@ -1,5 +1,5 @@ use std::{ - collections::{BTreeMap, HashMap, HashSet}, + collections::{BTreeMap, HashMap}, path::Path, sync::{Arc, Mutex, RwLock}, }; @@ -150,7 +150,6 @@ pub struct KeyValueDatabase { pub senderkey_pusher: Arc<dyn KvTree>, pub auth_chain_cache: Mutex<LruCache<Vec<u64>, Arc<[u64]>>>, - pub our_real_users_cache: RwLock<HashMap<OwnedRoomId, Arc<HashSet<OwnedUserId>>>>, pub appservice_in_room_cache: RwLock<HashMap<OwnedRoomId, HashMap<String, bool>>>, pub lasttimelinecount_cache: Mutex<HashMap<OwnedRoomId, PduCount>>, } @@ -265,7 +264,6 @@ pub async fn load_or_create(server: &Arc<Server>) -> Result<KeyValueDatabase> { auth_chain_cache: Mutex::new(LruCache::new( (f64::from(config.auth_chain_cache_capacity) * config.conduit_cache_capacity_modifier) as usize, )), - our_real_users_cache: RwLock::new(HashMap::new()), appservice_in_room_cache: RwLock::new(HashMap::new()), lasttimelinecount_cache: Mutex::new(HashMap::new()), }) diff --git a/src/service/globals/data.rs b/src/service/globals/data.rs index 5b26598b80e137877562a4c66448cb17821b5373..354ff2f7b87129cb4bbc4b2656dceee44034570c 100644 --- a/src/service/globals/data.rs +++ b/src/service/globals/data.rs @@ -177,19 +177,16 @@ fn cork_and_flush(&self) -> Cork { Cork::new(&self.db, true, false) } fn memory_usage(&self) -> String { let auth_chain_cache = self.auth_chain_cache.lock().unwrap().len(); - let our_real_users_cache = self.our_real_users_cache.read().unwrap().len(); let appservice_in_room_cache = self.appservice_in_room_cache.read().unwrap().len(); let lasttimelinecount_cache = self.lasttimelinecount_cache.lock().unwrap().len(); let max_auth_chain_cache = self.auth_chain_cache.lock().unwrap().capacity(); - let max_our_real_users_cache = self.our_real_users_cache.read().unwrap().capacity(); let max_appservice_in_room_cache = self.appservice_in_room_cache.read().unwrap().capacity(); let max_lasttimelinecount_cache = self.lasttimelinecount_cache.lock().unwrap().capacity(); format!( "\ auth_chain_cache: {auth_chain_cache} / {max_auth_chain_cache} -our_real_users_cache: {our_real_users_cache} / {max_our_real_users_cache} appservice_in_room_cache: {appservice_in_room_cache} / {max_appservice_in_room_cache} lasttimelinecount_cache: {lasttimelinecount_cache} / {max_lasttimelinecount_cache}\n\n {}", @@ -203,14 +200,10 @@ fn clear_caches(&self, amount: u32) { *c = LruCache::new(c.capacity()); } if amount > 2 { - let c = &mut *self.our_real_users_cache.write().unwrap(); - *c = HashMap::new(); - } - if amount > 3 { let c = &mut *self.appservice_in_room_cache.write().unwrap(); *c = HashMap::new(); } - if amount > 4 { + if amount > 3 { let c = &mut *self.lasttimelinecount_cache.lock().unwrap(); *c = HashMap::new(); } diff --git a/src/service/rooms/state_cache/data.rs b/src/service/rooms/state_cache/data.rs index 08e93d894055c881fd99930c3d3824a7f3184372..5694b22cf6fe5267745f4d952c7362e69af0b08c 100644 --- a/src/service/rooms/state_cache/data.rs +++ b/src/service/rooms/state_cache/data.rs @@ -1,4 +1,4 @@ -use std::{collections::HashSet, sync::Arc}; +use std::collections::HashSet; use itertools::Itertools; use ruma::{ @@ -29,8 +29,6 @@ fn mark_as_invited( fn update_joined_count(&self, room_id: &RoomId) -> Result<()>; - fn get_our_real_users(&self, room_id: &RoomId) -> Result<Arc<HashSet<OwnedUserId>>>; - fn appservice_in_room(&self, room_id: &RoomId, appservice: &RegistrationInfo) -> Result<bool>; /// Makes a user forget a room. @@ -48,6 +46,10 @@ fn mark_as_invited( /// Returns an iterator over all joined members of a room. fn room_members<'a>(&'a self, room_id: &RoomId) -> Box<dyn Iterator<Item = Result<OwnedUserId>> + 'a>; + /// Returns a vec of all the users joined in a room who are active + /// (not guests, not deactivated users) + fn active_local_users_in_room(&self, room_id: &RoomId) -> Vec<OwnedUserId>; + fn room_joined_count(&self, room_id: &RoomId) -> Result<Option<u64>>; fn room_invited_count(&self, room_id: &RoomId) -> Result<Option<u64>>; @@ -225,13 +227,9 @@ fn update_joined_count(&self, room_id: &RoomId) -> Result<()> { let mut joinedcount = 0_u64; let mut invitedcount = 0_u64; let mut joined_servers = HashSet::new(); - let mut real_users = HashSet::new(); for joined in self.room_members(room_id).filter_map(Result::ok) { joined_servers.insert(joined.server_name().to_owned()); - if user_is_local(&joined) && !services().users.is_deactivated(&joined).unwrap_or(true) { - real_users.insert(joined); - } joinedcount = joinedcount.saturating_add(1); } @@ -245,11 +243,6 @@ fn update_joined_count(&self, room_id: &RoomId) -> Result<()> { self.roomid_invitedcount .insert(room_id.as_bytes(), &invitedcount.to_be_bytes())?; - self.our_real_users_cache - .write() - .unwrap() - .insert(room_id.to_owned(), Arc::new(real_users)); - for old_joined_server in self.room_servers(room_id).filter_map(Result::ok) { if !joined_servers.remove(&old_joined_server) { // Server not in room anymore @@ -288,28 +281,6 @@ fn update_joined_count(&self, room_id: &RoomId) -> Result<()> { Ok(()) } - #[tracing::instrument(skip(self, room_id))] - fn get_our_real_users(&self, room_id: &RoomId) -> Result<Arc<HashSet<OwnedUserId>>> { - let maybe = self - .our_real_users_cache - .read() - .unwrap() - .get(room_id) - .cloned(); - if let Some(users) = maybe { - Ok(users) - } else { - self.update_joined_count(room_id)?; - Ok(Arc::clone( - self.our_real_users_cache - .read() - .unwrap() - .get(room_id) - .unwrap(), - )) - } - } - #[tracing::instrument(skip(self, room_id, appservice))] fn appservice_in_room(&self, room_id: &RoomId, appservice: &RegistrationInfo) -> Result<bool> { let maybe = self @@ -429,6 +400,16 @@ fn room_members<'a>(&'a self, room_id: &RoomId) -> Box<dyn Iterator<Item = Resul })) } + /// Returns a vec of all our local users joined in a room who are active + /// (not guests / not deactivated users) + #[tracing::instrument(skip(self))] + fn active_local_users_in_room(&self, room_id: &RoomId) -> Vec<OwnedUserId> { + self.room_members(room_id) + .filter_map(Result::ok) + .filter(|user| user_is_local(user) && !services().users.is_deactivated(user).unwrap_or(true)) + .collect_vec() + } + /// Returns the number of users which are currently in a room #[tracing::instrument(skip(self))] fn room_joined_count(&self, room_id: &RoomId) -> Result<Option<u64>> { diff --git a/src/service/rooms/state_cache/mod.rs b/src/service/rooms/state_cache/mod.rs index c9ac278c391c48927988401bb0155ee5fabf2154..ab59c0ade0885cbf3bfc7f8ec7b0f6ae0471e163 100644 --- a/src/service/rooms/state_cache/mod.rs +++ b/src/service/rooms/state_cache/mod.rs @@ -1,4 +1,4 @@ -use std::{collections::HashSet, sync::Arc}; +use std::sync::Arc; use data::Data; use itertools::Itertools; @@ -212,11 +212,6 @@ pub fn update_membership( #[tracing::instrument(skip(self, room_id))] pub fn update_joined_count(&self, room_id: &RoomId) -> Result<()> { self.db.update_joined_count(room_id) } - #[tracing::instrument(skip(self, room_id))] - pub fn get_our_real_users(&self, room_id: &RoomId) -> Result<Arc<HashSet<OwnedUserId>>> { - self.db.get_our_real_users(room_id) - } - #[tracing::instrument(skip(self, room_id, appservice))] pub fn appservice_in_room(&self, room_id: &RoomId, appservice: &RegistrationInfo) -> Result<bool> { self.db.appservice_in_room(room_id, appservice) @@ -278,6 +273,13 @@ pub fn room_members<'a>(&'a self, room_id: &RoomId) -> impl Iterator<Item = Resu #[tracing::instrument(skip(self))] pub fn room_joined_count(&self, room_id: &RoomId) -> Result<Option<u64>> { self.db.room_joined_count(room_id) } + #[tracing::instrument(skip(self))] + /// Returns a vec of all the users joined in a room who are active + /// (not guests, not deactivated users) + pub fn active_local_users_in_room(&self, room_id: &RoomId) -> Vec<OwnedUserId> { + self.db.active_local_users_in_room(room_id) + } + #[tracing::instrument(skip(self))] pub fn room_invited_count(&self, room_id: &RoomId) -> Result<Option<u64>> { self.db.room_invited_count(room_id) } diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index b45b2a3be8a967a89f8c0bdb6a43dd11171234c7..a94828534b53abeae38da713c6a5df8b3eccf6d7 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -309,21 +309,19 @@ pub async fn append_pdu( let mut push_target = services() .rooms .state_cache - .get_our_real_users(&pdu.room_id)?; + .active_local_users_in_room(&pdu.room_id); if pdu.kind == TimelineEventType::RoomMember { if let Some(state_key) = &pdu.state_key { let target_user_id = UserId::parse(state_key.clone()).expect("This state_key was previously validated"); if !push_target.contains(&target_user_id) { - let mut target = push_target.as_ref().clone(); - target.insert(target_user_id); - push_target = Arc::new(target); + push_target.push(target_user_id); } } } - for user in push_target.iter() { + for user in &push_target { // Don't notify the user of their own events if user == &pdu.sender { continue; diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index c4de3e60d13828109fc925d386d414f0f33299bb..9192fdca4e8bc3bd5e7c334f33c66aa1ee485699 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -485,7 +485,7 @@ async fn send_events_dest_push( .ok_or_else(|| { ( dest.clone(), - Error::bad_database("[Push] Event in servernamevent_datas not found in db."), + Error::bad_database("[Push] Event in servernameevent_data not found in db."), ) })?, ); @@ -567,7 +567,7 @@ async fn send_events_dest_normal( ); ( dest.clone(), - Error::bad_database("[Normal] Event in servernamevent_datas not found in db."), + Error::bad_database("[Normal] Event in servernameevent_data not found in db."), ) })?, );