From 710a6b5c6f488cadaafa9ff1b3862c6f70486993 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= <timo@koesters.xyz> Date: Fri, 22 Mar 2024 19:47:56 -0400 Subject: [PATCH] refactor: remove previous typing implementation and add sync wakeup for new one Signed-off-by: strawberry <strawberry@puppygock.gay> --- src/database/key_value/globals.rs | 4 +- src/database/key_value/rooms/edus/mod.rs | 1 - src/database/key_value/rooms/edus/typing.rs | 115 -------------------- src/database/mod.rs | 4 - src/service/mod.rs | 4 +- src/service/rooms/edus/mod.rs | 2 +- src/service/rooms/edus/typing/data.rs | 23 ---- src/service/rooms/edus/typing/mod.rs | 21 +++- 8 files changed, 22 insertions(+), 152 deletions(-) delete mode 100644 src/database/key_value/rooms/edus/typing.rs delete mode 100644 src/service/rooms/edus/typing/data.rs diff --git a/src/database/key_value/globals.rs b/src/database/key_value/globals.rs index 3b5a47306..fd0978d57 100644 --- a/src/database/key_value/globals.rs +++ b/src/database/key_value/globals.rs @@ -85,7 +85,9 @@ async fn watch(&self, user_id: &UserId, device_id: &DeviceId) -> Result<()> { futures.push(self.pduid_pdu.watch_prefix(&short_roomid)); // EDUs - futures.push(self.roomid_lasttypingupdate.watch_prefix(&roomid_bytes)); + futures.push(Box::pin(async move { + let _result = services().rooms.edus.typing.wait_for_update(&room_id).await; + })); futures.push(self.readreceiptid_readreceipt.watch_prefix(&roomid_prefix)); diff --git a/src/database/key_value/rooms/edus/mod.rs b/src/database/key_value/rooms/edus/mod.rs index 6c6529184..7abf946f1 100644 --- a/src/database/key_value/rooms/edus/mod.rs +++ b/src/database/key_value/rooms/edus/mod.rs @@ -1,6 +1,5 @@ mod presence; mod read_receipt; -mod typing; use crate::{database::KeyValueDatabase, service}; diff --git a/src/database/key_value/rooms/edus/typing.rs b/src/database/key_value/rooms/edus/typing.rs deleted file mode 100644 index e1724aa79..000000000 --- a/src/database/key_value/rooms/edus/typing.rs +++ /dev/null @@ -1,115 +0,0 @@ -use std::{collections::HashSet, mem}; - -use ruma::{OwnedUserId, RoomId, UserId}; - -use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; - -impl service::rooms::edus::typing::Data for KeyValueDatabase { - fn typing_add(&self, user_id: &UserId, room_id: &RoomId, timeout: u64) -> Result<()> { - let mut prefix = room_id.as_bytes().to_vec(); - prefix.push(0xFF); - - let count = services().globals.next_count()?.to_be_bytes(); - - let mut room_typing_id = prefix; - room_typing_id.extend_from_slice(&timeout.to_be_bytes()); - room_typing_id.push(0xFF); - room_typing_id.extend_from_slice(&count); - - self.typingid_userid.insert(&room_typing_id, user_id.as_bytes())?; - - self.roomid_lasttypingupdate.insert(room_id.as_bytes(), &count)?; - - Ok(()) - } - - fn typing_remove(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> { - let mut prefix = room_id.as_bytes().to_vec(); - prefix.push(0xFF); - - let user_id = user_id.to_string(); - - let mut found_outdated = false; - - // Maybe there are multiple ones from calling roomtyping_add multiple times - for outdated_edu in self.typingid_userid.scan_prefix(prefix).filter(|(_, v)| &**v == user_id.as_bytes()) { - self.typingid_userid.remove(&outdated_edu.0)?; - found_outdated = true; - } - - if found_outdated { - self.roomid_lasttypingupdate.insert(room_id.as_bytes(), &services().globals.next_count()?.to_be_bytes())?; - } - - Ok(()) - } - - fn typings_maintain(&self, room_id: &RoomId) -> Result<()> { - let mut prefix = room_id.as_bytes().to_vec(); - prefix.push(0xFF); - - let current_timestamp = utils::millis_since_unix_epoch(); - - let mut found_outdated = false; - - // Find all outdated edus before inserting a new one - for outdated_edu in self - .typingid_userid - .scan_prefix(prefix) - .map(|(key, _)| { - Ok::<_, Error>(( - key.clone(), - utils::u64_from_bytes( - &key.splitn(2, |&b| b == 0xFF) - .nth(1) - .ok_or_else(|| Error::bad_database("RoomTyping has invalid timestamp or delimiters."))?[0..mem::size_of::<u64>()], - ) - .map_err(|_| Error::bad_database("RoomTyping has invalid timestamp bytes."))?, - )) - }) - .filter_map(std::result::Result::ok) - .take_while(|&(_, timestamp)| timestamp < current_timestamp) - { - // This is an outdated edu (time > timestamp) - self.typingid_userid.remove(&outdated_edu.0)?; - found_outdated = true; - } - - if found_outdated { - self.roomid_lasttypingupdate.insert(room_id.as_bytes(), &services().globals.next_count()?.to_be_bytes())?; - } - - Ok(()) - } - - fn last_typing_update(&self, room_id: &RoomId) -> Result<u64> { - Ok(self - .roomid_lasttypingupdate - .get(room_id.as_bytes())? - .map(|bytes| { - utils::u64_from_bytes(&bytes) - .map_err(|_| Error::bad_database("Count in roomid_lastroomactiveupdate is invalid.")) - }) - .transpose()? - .unwrap_or(0)) - } - - fn typings_all(&self, room_id: &RoomId) -> Result<HashSet<OwnedUserId>> { - let mut prefix = room_id.as_bytes().to_vec(); - prefix.push(0xFF); - - let mut user_ids = HashSet::new(); - - for (_, user_id) in self.typingid_userid.scan_prefix(prefix) { - let user_id = UserId::parse( - utils::string_from_bytes(&user_id) - .map_err(|_| Error::bad_database("User ID in typingid_userid is invalid unicode."))?, - ) - .map_err(|_| Error::bad_database("User ID in typingid_userid is invalid."))?; - - user_ids.insert(user_id); - } - - Ok(user_ids) - } -} diff --git a/src/database/mod.rs b/src/database/mod.rs index 886419358..b9d8fb68b 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -77,8 +77,6 @@ pub struct KeyValueDatabase { pub(super) readreceiptid_readreceipt: Arc<dyn KvTree>, // ReadReceiptId = RoomId + Count + UserId pub(super) roomuserid_privateread: Arc<dyn KvTree>, // RoomUserId = Room + User, PrivateRead = Count pub(super) roomuserid_lastprivatereadupdate: Arc<dyn KvTree>, // LastPrivateReadUpdate = Count - pub(super) typingid_userid: Arc<dyn KvTree>, // TypingId = RoomId + TimeoutTime + Count - pub(super) roomid_lasttypingupdate: Arc<dyn KvTree>, // LastRoomTypingUpdate = Count pub(super) roomuserid_presence: Arc<dyn KvTree>, //pub rooms: rooms::Rooms, @@ -303,8 +301,6 @@ pub async fn load_or_create(config: Config) -> Result<()> { readreceiptid_readreceipt: builder.open_tree("readreceiptid_readreceipt")?, roomuserid_privateread: builder.open_tree("roomuserid_privateread")?, // "Private" read receipt roomuserid_lastprivatereadupdate: builder.open_tree("roomuserid_lastprivatereadupdate")?, - typingid_userid: builder.open_tree("typingid_userid")?, - roomid_lasttypingupdate: builder.open_tree("roomid_lasttypingupdate")?, roomuserid_presence: builder.open_tree("roomuserid_presence")?, pduid_pdu: builder.open_tree("pduid_pdu")?, eventid_pduid: builder.open_tree("eventid_pduid")?, diff --git a/src/service/mod.rs b/src/service/mod.rs index 8a6afe509..dad17f581 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -4,7 +4,7 @@ }; use lru_cache::LruCache; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::{broadcast, Mutex, RwLock}; use crate::{Config, Result}; @@ -77,9 +77,9 @@ pub fn build< db, }, typing: rooms::edus::typing::Service { - db, typing: RwLock::new(BTreeMap::new()), last_typing_update: RwLock::new(BTreeMap::new()), + typing_update_sender: broadcast::channel(100).0, }, }, event_handler: rooms::event_handler::Service, diff --git a/src/service/rooms/edus/mod.rs b/src/service/rooms/edus/mod.rs index 593265cb7..6bedd16d5 100644 --- a/src/service/rooms/edus/mod.rs +++ b/src/service/rooms/edus/mod.rs @@ -2,7 +2,7 @@ pub mod read_receipt; pub mod typing; -pub trait Data: presence::Data + read_receipt::Data + typing::Data + 'static {} +pub trait Data: presence::Data + read_receipt::Data + 'static {} pub struct Service { pub presence: presence::Service, diff --git a/src/service/rooms/edus/typing/data.rs b/src/service/rooms/edus/typing/data.rs deleted file mode 100644 index cadc30eee..000000000 --- a/src/service/rooms/edus/typing/data.rs +++ /dev/null @@ -1,23 +0,0 @@ -use std::collections::HashSet; - -use ruma::{OwnedUserId, RoomId, UserId}; - -use crate::Result; - -pub trait Data: Send + Sync { - /// Sets a user as typing until the timeout timestamp is reached or - /// roomtyping_remove is called. - fn typing_add(&self, user_id: &UserId, room_id: &RoomId, timeout: u64) -> Result<()>; - - /// Removes a user from typing before the timeout is reached. - fn typing_remove(&self, user_id: &UserId, room_id: &RoomId) -> Result<()>; - - /// Makes sure that typing events with old timestamps get removed. - fn typings_maintain(&self, room_id: &RoomId) -> Result<()>; - - /// Returns the count of the last typing update in this room. - fn last_typing_update(&self, room_id: &RoomId) -> Result<u64>; - - /// Returns all user ids currently typing. - fn typings_all(&self, room_id: &RoomId) -> Result<HashSet<OwnedUserId>>; -} diff --git a/src/service/rooms/edus/typing/mod.rs b/src/service/rooms/edus/typing/mod.rs index 88dd549c8..37d90bee0 100644 --- a/src/service/rooms/edus/typing/mod.rs +++ b/src/service/rooms/edus/typing/mod.rs @@ -1,18 +1,15 @@ -mod data; - use std::collections::BTreeMap; -pub use data::Data; use ruma::{events::SyncEphemeralRoomEvent, OwnedRoomId, OwnedUserId, RoomId, UserId}; -use tokio::sync::RwLock; +use tokio::sync::{broadcast, RwLock}; use crate::{services, utils, Result}; pub struct Service { - pub db: &'static dyn Data, pub typing: RwLock<BTreeMap<OwnedRoomId, BTreeMap<OwnedUserId, u64>>>, // u64 is unix timestamp of timeout pub last_typing_update: RwLock<BTreeMap<OwnedRoomId, u64>>, /* timestamp of the last change to typing * users */ + pub typing_update_sender: broadcast::Sender<OwnedRoomId>, } impl Service { @@ -21,6 +18,7 @@ impl Service { pub async fn typing_add(&self, user_id: &UserId, room_id: &RoomId, timeout: u64) -> Result<()> { self.typing.write().await.entry(room_id.to_owned()).or_default().insert(user_id.to_owned(), timeout); self.last_typing_update.write().await.insert(room_id.to_owned(), services().globals.next_count()?); + let _ = self.typing_update_sender.send(room_id.to_owned()); Ok(()) } @@ -28,6 +26,18 @@ pub async fn typing_add(&self, user_id: &UserId, room_id: &RoomId, timeout: u64) pub async fn typing_remove(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> { self.typing.write().await.entry(room_id.to_owned()).or_default().remove(user_id); self.last_typing_update.write().await.insert(room_id.to_owned(), services().globals.next_count()?); + let _ = self.typing_update_sender.send(room_id.to_owned()); + Ok(()) + } + + pub async fn wait_for_update(&self, room_id: &RoomId) -> Result<()> { + let mut receiver = self.typing_update_sender.subscribe(); + while let Ok(next) = receiver.recv().await { + if next == room_id { + break; + } + } + Ok(()) } @@ -55,6 +65,7 @@ async fn typings_maintain(&self, room_id: &RoomId) -> Result<()> { room.remove(&user); } self.last_typing_update.write().await.insert(room_id.to_owned(), services().globals.next_count()?); + let _ = self.typing_update_sender.send(room_id.to_owned()); } Ok(()) -- GitLab