From 019a82850d7fbc0df7e273d728ddbd03523fb4ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= <timo@koesters.xyz> Date: Fri, 8 Mar 2024 10:05:48 -0500 Subject: [PATCH] improvement: do not save typing edus in db Signed-off-by: strawberry <strawberry@puppygock.gay> --- src/api/client_server/sync.rs | 4 +- src/api/client_server/typing.rs | 17 ++++--- src/api/server_server.rs | 13 +++--- src/service/mod.rs | 2 + src/service/rooms/edus/typing/mod.rs | 69 ++++++++++++++++++++++------ 5 files changed, 76 insertions(+), 29 deletions(-) diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs index 38e1cd3dc..c8bc61c03 100644 --- a/src/api/client_server/sync.rs +++ b/src/api/client_server/sync.rs @@ -861,10 +861,10 @@ async fn load_joined_room( .map(|(_, _, v)| v) .collect(); - if services().rooms.edus.typing.last_typing_update(room_id)? > since { + if services().rooms.edus.typing.last_typing_update(room_id).await? > since { edus.push( serde_json::from_str( - &serde_json::to_string(&services().rooms.edus.typing.typings_all(room_id)?) + &serde_json::to_string(&services().rooms.edus.typing.typings_all(room_id).await?) .expect("event is valid, we just created it"), ) .expect("event is valid, we just created it"), diff --git a/src/api/client_server/typing.rs b/src/api/client_server/typing.rs index a51c3b880..c8cbbd6a8 100644 --- a/src/api/client_server/typing.rs +++ b/src/api/client_server/typing.rs @@ -17,13 +17,18 @@ pub async fn create_typing_event_route( } if let Typing::Yes(duration) = body.state { - services().rooms.edus.typing.typing_add( - sender_user, - &body.room_id, - duration.as_millis() as u64 + utils::millis_since_unix_epoch(), - )?; + services() + .rooms + .edus + .typing + .typing_add( + sender_user, + &body.room_id, + duration.as_millis() as u64 + utils::millis_since_unix_epoch(), + ) + .await?; } else { - services().rooms.edus.typing.typing_remove(sender_user, &body.room_id)?; + services().rooms.edus.typing.typing_remove(sender_user, &body.room_id).await?; } Ok(create_typing_event::v3::Response {}) diff --git a/src/api/server_server.rs b/src/api/server_server.rs index 1413f55b9..508c5b775 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -854,13 +854,14 @@ pub async fn send_transaction_message_route( Edu::Typing(typing) => { if services().rooms.state_cache.is_joined(&typing.user_id, &typing.room_id)? { if typing.typing { - services().rooms.edus.typing.typing_add( - &typing.user_id, - &typing.room_id, - 3000 + utils::millis_since_unix_epoch(), - )?; + services() + .rooms + .edus + .typing + .typing_add(&typing.user_id, &typing.room_id, 3000 + utils::millis_since_unix_epoch()) + .await?; } else { - services().rooms.edus.typing.typing_remove(&typing.user_id, &typing.room_id)?; + services().rooms.edus.typing.typing_remove(&typing.user_id, &typing.room_id).await?; } } }, diff --git a/src/service/mod.rs b/src/service/mod.rs index 117af89ae..b4e7bb4e1 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -80,6 +80,8 @@ pub fn build< }, typing: rooms::edus::typing::Service { db, + typing: RwLock::new(BTreeMap::new()), + last_typing_update: RwLock::new(BTreeMap::new()), }, }, event_handler: rooms::event_handler::Service, diff --git a/src/service/rooms/edus/typing/mod.rs b/src/service/rooms/edus/typing/mod.rs index 59231eb42..88dd549c8 100644 --- a/src/service/rooms/edus/typing/mod.rs +++ b/src/service/rooms/edus/typing/mod.rs @@ -1,45 +1,84 @@ mod data; +use std::collections::BTreeMap; + pub use data::Data; -use ruma::{events::SyncEphemeralRoomEvent, RoomId, UserId}; +use ruma::{events::SyncEphemeralRoomEvent, OwnedRoomId, OwnedUserId, RoomId, UserId}; +use tokio::sync::RwLock; -use crate::Result; +use crate::{services, utils, Result}; pub struct Service { pub db: &'static dyn Data, + pub typing: RwLock<BTreeMap<OwnedRoomId, BTreeMap<OwnedUserId, u64>>>, // u64 is unix timestamp of timeout + pub last_typing_update: RwLock<BTreeMap<OwnedRoomId, u64>>, /* timestamp of the last change to typing + * users */ } impl Service { /// Sets a user as typing until the timeout timestamp is reached or /// roomtyping_remove is called. - pub fn typing_add(&self, user_id: &UserId, room_id: &RoomId, timeout: u64) -> Result<()> { - self.db.typing_add(user_id, room_id, timeout) + pub async fn typing_add(&self, user_id: &UserId, room_id: &RoomId, timeout: u64) -> Result<()> { + self.typing.write().await.entry(room_id.to_owned()).or_default().insert(user_id.to_owned(), timeout); + self.last_typing_update.write().await.insert(room_id.to_owned(), services().globals.next_count()?); + Ok(()) } /// Removes a user from typing before the timeout is reached. - pub fn typing_remove(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> { - self.db.typing_remove(user_id, room_id) + pub async fn typing_remove(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> { + self.typing.write().await.entry(room_id.to_owned()).or_default().remove(user_id); + self.last_typing_update.write().await.insert(room_id.to_owned(), services().globals.next_count()?); + Ok(()) } /// Makes sure that typing events with old timestamps get removed. - fn typings_maintain(&self, room_id: &RoomId) -> Result<()> { self.db.typings_maintain(room_id) } + async fn typings_maintain(&self, room_id: &RoomId) -> Result<()> { + let current_timestamp = utils::millis_since_unix_epoch(); + let mut removable = Vec::new(); + { + let typing = self.typing.read().await; + let Some(room) = typing.get(room_id) else { + return Ok(()); + }; + for (user, timeout) in room { + if *timeout < current_timestamp { + removable.push(user.clone()); + } + } + drop(typing) + }; - /// Returns the count of the last typing update in this room. - pub fn last_typing_update(&self, room_id: &RoomId) -> Result<u64> { - self.typings_maintain(room_id)?; + if !removable.is_empty() { + let typing = &mut self.typing.write().await; + let room = typing.entry(room_id.to_owned()).or_default(); + for user in removable { + room.remove(&user); + } + self.last_typing_update.write().await.insert(room_id.to_owned(), services().globals.next_count()?); + } + + Ok(()) + } - self.db.last_typing_update(room_id) + /// Returns the count of the last typing update in this room. + pub async fn last_typing_update(&self, room_id: &RoomId) -> Result<u64> { + self.typings_maintain(room_id).await?; + Ok(self.last_typing_update.read().await.get(room_id).copied().unwrap_or(0)) } /// Returns a new typing EDU. - pub fn typings_all( + pub async fn typings_all( &self, room_id: &RoomId, ) -> Result<SyncEphemeralRoomEvent<ruma::events::typing::TypingEventContent>> { - let user_ids = self.db.typings_all(room_id)?; - Ok(SyncEphemeralRoomEvent { content: ruma::events::typing::TypingEventContent { - user_ids: user_ids.into_iter().collect(), + user_ids: self + .typing + .read() + .await + .get(room_id) + .map(|m| m.keys().cloned().collect()) + .unwrap_or_default(), }, }) } -- GitLab