diff --git a/src/api/client_server/media.rs b/src/api/client_server/media.rs index 1e753a7a5572b683882ff64e8c3e8881e6948dfd..2c34a24ec0f59986d4394aac22f60c24e672be02 100644 --- a/src/api/client_server/media.rs +++ b/src/api/client_server/media.rs @@ -50,8 +50,10 @@ pub async fn create_content_route( ) .await?; + let content_uri = mxc.into(); + Ok(create_content::v3::Response { - content_uri: mxc.try_into()?, + content_uri, blurhash: None, }) } diff --git a/src/api/client_server/membership.rs b/src/api/client_server/membership.rs index e2a14c585997b1862ac8c7b238e0684f4afdcb86..82ed9a2f25ebebef53cadbfd0253e6d2201bdd8f 100644 --- a/src/api/client_server/membership.rs +++ b/src/api/client_server/membership.rs @@ -722,7 +722,8 @@ async fn join_room_by_id_helper( } info!("Running send_join auth check"); - if !state_res::event_auth::auth_check( + + let auth_check = state_res::event_auth::auth_check( &state_res::RoomVersion::new(&room_version_id).expect("room version is supported"), &parsed_join_pdu, None::<PduEvent>, // TODO: third party invite @@ -745,7 +746,9 @@ async fn join_room_by_id_helper( .map_err(|e| { warn!("Auth check failed: {e}"); Error::BadRequest(ErrorKind::InvalidParam, "Auth check failed") - })? { + })?; + + if !auth_check { return Err(Error::BadRequest( ErrorKind::InvalidParam, "Auth check failed", diff --git a/src/api/server_server.rs b/src/api/server_server.rs index a361c848b6db799233984d0d2ce2a705671462e9..1dffb4811fe5a25f354687e4d17395d7159f46b3 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -572,7 +572,7 @@ pub async fn get_server_version_route( Ok(get_server_version::v1::Response { server: Some(get_server_version::v1::Server { - name: Some("cowonduit".to_owned()), + name: Some(env!("CARGO_CRATE_NAME").to_owned()), version: Some(env!("CARGO_PKG_VERSION").to_owned()), }), }) diff --git a/src/config/mod.rs b/src/config/mod.rs index a26b471422a04053290709f5d1b9b5e7a29a75c7..758ad180c215b4f53bb21f747bbbf02c304768e7 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -122,6 +122,7 @@ pub struct TlsConfig { const DEPRECATED_KEYS: &[&str] = &["cache_capacity"]; impl Config { + /// Iterates over all the keys in the config file and warns if there is a deprecated key specified pub fn warn_deprecated(&self) { let mut was_deprecated = false; for key in self @@ -139,16 +140,17 @@ pub fn warn_deprecated(&self) { } /// Checks the presence of the `address` and `unix_socket_path` keys in the raw_config, exiting the process if both keys were detected. - pub fn error_dual_listening(&self, raw_config: Figment) -> Result<(), ()> { + pub fn is_dual_listening(&self, raw_config: Figment) -> bool { let check_address = raw_config.find_value("address"); let check_unix_socket = raw_config.find_value("unix_socket_path"); + // are the check_address and check_unix_socket keys both Ok (specified) at the same time? if check_address.is_ok() && check_unix_socket.is_ok() { error!("TOML keys \"address\" and \"unix_socket_path\" were both defined. Please specify only one option."); - return Err(()); + return true; } - Ok(()) + false } } diff --git a/src/database/abstraction/watchers.rs b/src/database/abstraction/watchers.rs index 55cb60b36443fa624d6600015fca44776823479b..07087c1f1e680a075f1c7c4f215bb8a1b79d3bf7 100644 --- a/src/database/abstraction/watchers.rs +++ b/src/database/abstraction/watchers.rs @@ -6,9 +6,11 @@ }; use tokio::sync::watch; +type Watcher = RwLock<HashMap<Vec<u8>, (watch::Sender<()>, watch::Receiver<()>)>>; + #[derive(Default)] pub(super) struct Watchers { - watchers: RwLock<HashMap<Vec<u8>, (watch::Sender<()>, watch::Receiver<()>)>>, + watchers: Watcher, } impl Watchers { diff --git a/src/database/key_value/account_data.rs b/src/database/key_value/account_data.rs index e1eef9663f324e045c07af3bef91ca2ce7625a6d..daa63901a00faeed9e8395aec3d61b581a8e4e46 100644 --- a/src/database/key_value/account_data.rs +++ b/src/database/key_value/account_data.rs @@ -6,6 +6,7 @@ serde::Raw, RoomId, UserId, }; +use tracing::warn; use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; @@ -123,13 +124,15 @@ fn changes_since( .take_while(move |(k, _)| k.starts_with(&prefix)) .map(|(k, v)| { Ok::<_, Error>(( - RoomAccountDataEventType::try_from( + RoomAccountDataEventType::from( utils::string_from_bytes(k.rsplit(|&b| b == 0xff).next().ok_or_else( || Error::bad_database("RoomUserData ID in db is invalid."), )?) - .map_err(|_| Error::bad_database("RoomUserData ID in db is invalid."))?, - ) - .map_err(|_| Error::bad_database("RoomUserData ID in db is invalid."))?, + .map_err(|e| { + warn!("RoomUserData ID in database is invalid: {}", e); + Error::bad_database("RoomUserData ID in db is invalid.") + })?, + ), serde_json::from_slice::<Raw<AnyEphemeralRoomEvent>>(&v).map_err(|_| { Error::bad_database("Database contains invalid account data.") })?, diff --git a/src/database/key_value/rooms/pdu_metadata.rs b/src/database/key_value/rooms/pdu_metadata.rs index 0641f9d8df3a86af9c742fce2279479141859440..ebfba814b13a59ca31683a438cbb64dbbf95c596 100644 --- a/src/database/key_value/rooms/pdu_metadata.rs +++ b/src/database/key_value/rooms/pdu_metadata.rs @@ -4,8 +4,11 @@ use crate::{ database::KeyValueDatabase, - service::{self, rooms::timeline::PduCount}, - services, utils, Error, PduEvent, Result, + service::{ + self, + rooms::timeline::{data::PduData, PduCount}, + }, + services, utils, Error, Result, }; impl service::rooms::pdu_metadata::Data for KeyValueDatabase { @@ -22,7 +25,7 @@ fn relations_until<'a>( shortroomid: u64, target: u64, until: PduCount, - ) -> Result<Box<dyn Iterator<Item = Result<(PduCount, PduEvent)>> + 'a>> { + ) -> PduData<'a> { let prefix = target.to_be_bytes().to_vec(); let mut current = prefix.clone(); diff --git a/src/database/key_value/rooms/search.rs b/src/database/key_value/rooms/search.rs index ad573f065caf51d03eec4f8578a65ae5bebf1263..9aceaa63cda700e00ccdc3a9453a5aabdb2d148e 100644 --- a/src/database/key_value/rooms/search.rs +++ b/src/database/key_value/rooms/search.rs @@ -2,6 +2,8 @@ use crate::{database::KeyValueDatabase, service, services, utils, Result}; +type SearchPdusResult<'a> = Result<Option<(Box<dyn Iterator<Item = Vec<u8>> + 'a>, Vec<String>)>>; + impl service::rooms::search::Data for KeyValueDatabase { fn index_pdu<'a>(&self, shortroomid: u64, pdu_id: &[u8], message_body: &str) -> Result<()> { let mut batch = message_body @@ -20,11 +22,7 @@ fn index_pdu<'a>(&self, shortroomid: u64, pdu_id: &[u8], message_body: &str) -> self.tokenids.insert_batch(&mut batch) } - fn search_pdus<'a>( - &'a self, - room_id: &RoomId, - search_string: &str, - ) -> Result<Option<(Box<dyn Iterator<Item = Vec<u8>> + 'a>, Vec<String>)>> { + fn search_pdus<'a>(&'a self, room_id: &RoomId, search_string: &str) -> SearchPdusResult<'a> { let prefix = services() .rooms .short diff --git a/src/database/key_value/rooms/short.rs b/src/database/key_value/rooms/short.rs index c02231706010e48523e22de3112a939739150571..502557a0d3106d41709b0455540d07ab9f9c6f34 100644 --- a/src/database/key_value/rooms/short.rs +++ b/src/database/key_value/rooms/short.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use ruma::{events::StateEventType, EventId, RoomId}; +use tracing::warn; use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; @@ -157,10 +158,10 @@ fn get_statekey_from_short(&self, shortstatekey: u64) -> Result<(StateEventType, .ok_or_else(|| Error::bad_database("Invalid statekey in shortstatekey_statekey."))?; let event_type = - StateEventType::try_from(utils::string_from_bytes(eventtype_bytes).map_err(|_| { - Error::bad_database("Event type in shortstatekey_statekey is invalid unicode.") - })?) - .map_err(|_| Error::bad_database("Event type in shortstatekey_statekey is invalid."))?; + StateEventType::from(utils::string_from_bytes(eventtype_bytes).map_err(|e| { + warn!("Event type in shortstatekey_statekey is invalid: {}", e); + Error::bad_database("Event type in shortstatekey_statekey is invalid.") + })?); let state_key = utils::string_from_bytes(statekey_bytes).map_err(|_| { Error::bad_database("Statekey in shortstatekey_statekey is invalid unicode.") diff --git a/src/database/key_value/rooms/state_cache.rs b/src/database/key_value/rooms/state_cache.rs index d0ea0c2cfaec624e55f5dc0c3be9dcf51d09a372..7894b0f3ef9c9d4f48624b2a53a2845cc03c3117 100644 --- a/src/database/key_value/rooms/state_cache.rs +++ b/src/database/key_value/rooms/state_cache.rs @@ -9,6 +9,12 @@ use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; +type StrippedStateEventIter<'a> = + Box<dyn Iterator<Item = Result<(OwnedRoomId, Vec<Raw<AnyStrippedStateEvent>>)>> + 'a>; + +type AnySyncStateEventIter<'a> = + Box<dyn Iterator<Item = Result<(OwnedRoomId, Vec<Raw<AnySyncStateEvent>>)>> + 'a>; + impl service::rooms::state_cache::Data for KeyValueDatabase { fn mark_as_once_joined(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> { let mut userroom_id = user_id.as_bytes().to_vec(); @@ -472,10 +478,7 @@ fn rooms_joined<'a>( /// Returns an iterator over all rooms a user was invited to. #[tracing::instrument(skip(self))] - fn rooms_invited<'a>( - &'a self, - user_id: &UserId, - ) -> Box<dyn Iterator<Item = Result<(OwnedRoomId, Vec<Raw<AnyStrippedStateEvent>>)>> + 'a> { + fn rooms_invited<'a>(&'a self, user_id: &UserId) -> StrippedStateEventIter<'a> { let mut prefix = user_id.as_bytes().to_vec(); prefix.push(0xff); @@ -550,10 +553,7 @@ fn left_state( /// Returns an iterator over all rooms a user left. #[tracing::instrument(skip(self))] - fn rooms_left<'a>( - &'a self, - user_id: &UserId, - ) -> Box<dyn Iterator<Item = Result<(OwnedRoomId, Vec<Raw<AnySyncStateEvent>>)>> + 'a> { + fn rooms_left<'a>(&'a self, user_id: &UserId) -> AnySyncStateEventIter<'a> { let mut prefix = user_id.as_bytes().to_vec(); prefix.push(0xff); diff --git a/src/database/key_value/rooms/threads.rs b/src/database/key_value/rooms/threads.rs index 5e3dc9707d4c787a18447a85e2dd9fdd2b588c89..02176feedbd862e09fd16ed5b14b2610db728cfb 100644 --- a/src/database/key_value/rooms/threads.rs +++ b/src/database/key_value/rooms/threads.rs @@ -4,6 +4,8 @@ use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result}; +type PduEventIterResult<'a> = Result<Box<dyn Iterator<Item = Result<(u64, PduEvent)>> + 'a>>; + impl service::rooms::threads::Data for KeyValueDatabase { fn threads_until<'a>( &'a self, @@ -11,7 +13,7 @@ fn threads_until<'a>( room_id: &'a RoomId, until: u64, _include: &'a IncludeThreads, - ) -> Result<Box<dyn Iterator<Item = Result<(u64, PduEvent)>> + 'a>> { + ) -> PduEventIterResult<'a> { let prefix = services() .rooms .short diff --git a/src/database/key_value/rooms/timeline.rs b/src/database/key_value/rooms/timeline.rs index f322d4304e8a5ce51874143f5dbfb492321ba3fe..d097aaf18e634a559c8e1b7e92a9ffeab790b70c 100644 --- a/src/database/key_value/rooms/timeline.rs +++ b/src/database/key_value/rooms/timeline.rs @@ -5,7 +5,11 @@ }; use tracing::error; -use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result}; +use crate::{ + database::KeyValueDatabase, + service::{self, rooms::timeline::data::PduData}, + services, utils, Error, PduEvent, Result, +}; use service::rooms::timeline::PduCount; @@ -228,7 +232,7 @@ fn pdus_until<'a>( user_id: &UserId, room_id: &RoomId, until: PduCount, - ) -> Result<Box<dyn Iterator<Item = Result<(PduCount, PduEvent)>> + 'a>> { + ) -> PduData<'a> { let (prefix, current) = count_to_id(room_id, until, 1, true)?; let user_id = user_id.to_owned(); @@ -250,12 +254,7 @@ fn pdus_until<'a>( )) } - fn pdus_after<'a>( - &'a self, - user_id: &UserId, - room_id: &RoomId, - from: PduCount, - ) -> Result<Box<dyn Iterator<Item = Result<(PduCount, PduEvent)>> + 'a>> { + fn pdus_after<'a>(&'a self, user_id: &UserId, room_id: &RoomId, from: PduCount) -> PduData<'a> { let (prefix, current) = count_to_id(room_id, from, 1, false)?; let user_id = user_id.to_owned(); diff --git a/src/database/key_value/users.rs b/src/database/key_value/users.rs index 2b09d684ae7f8e94e6f685c2c70553dc4ae071a9..76e41f1bc024e4b1531548f61a2a9d8fb8367fcf 100644 --- a/src/database/key_value/users.rs +++ b/src/database/key_value/users.rs @@ -146,10 +146,12 @@ fn avatar_url(&self, user_id: &UserId) -> Result<Option<OwnedMxcUri>> { self.userid_avatarurl .get(user_id.as_bytes())? .map(|bytes| { - let s = utils::string_from_bytes(&bytes) - .map_err(|_| Error::bad_database("Avatar URL in db is invalid."))?; - s.try_into() - .map_err(|_| Error::bad_database("Avatar URL in db is invalid.")) + let s_bytes = utils::string_from_bytes(&bytes).map_err(|e| { + warn!("Avatar URL in db is invalid: {}", e); + Error::bad_database("Avatar URL in db is invalid.") + })?; + let mxc_uri: OwnedMxcUri = s_bytes.into(); + Ok(mxc_uri) }) .transpose() } diff --git a/src/main.rs b/src/main.rs index 88acf002cafbb8b569698d737d2d07cc381b8304..6867429306bcf0f65d92045a87fe3713a6cbc715 100644 --- a/src/main.rs +++ b/src/main.rs @@ -145,7 +145,7 @@ async fn main() { maximize_fd_limit().expect("should be able to increase the soft limit to the hard limit"); config.warn_deprecated(); - if config.error_dual_listening(raw_config).is_err() { + if config.is_dual_listening(raw_config) { return; }; @@ -542,7 +542,7 @@ async fn initial_sync(_uri: Uri) -> impl IntoResponse { } async fn it_works() -> &'static str { - "hewwo from cowonduit woof!" + "hewwo from conduwuit woof!" } /* diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs index bb202c67f11cbe406ebc6f376e45cb80f48eff6b..d41e7e6a4b9c2b2a79dfcf1c9a7ea621d834fe49 100644 --- a/src/service/globals/mod.rs +++ b/src/service/globals/mod.rs @@ -109,13 +109,15 @@ fn default() -> Self { } } +type DnsOverrides = Box<dyn Fn(&str) -> Option<SocketAddr> + Send + Sync>; + pub struct Resolver { inner: GaiResolver, - overrides: Box<dyn Fn(&str) -> Option<SocketAddr> + Send + Sync>, + overrides: DnsOverrides, } impl Resolver { - pub fn new(overrides: Box<dyn Fn(&str) -> Option<SocketAddr> + Send + Sync>) -> Resolver { + pub fn new(overrides: DnsOverrides) -> Resolver { Resolver { inner: GaiResolver::new(), overrides, diff --git a/src/service/rooms/edus/read_receipt/data.rs b/src/service/rooms/edus/read_receipt/data.rs index a183d196f0de7e7bb262f3d160a4f212e87ec870..29b4a986228d8fde18a88231104bcf26fd29ae1a 100644 --- a/src/service/rooms/edus/read_receipt/data.rs +++ b/src/service/rooms/edus/read_receipt/data.rs @@ -1,5 +1,12 @@ use crate::Result; -use ruma::{events::receipt::ReceiptEvent, serde::Raw, OwnedUserId, RoomId, UserId}; +use ruma::{ + events::{receipt::ReceiptEvent, AnySyncEphemeralRoomEvent}, + serde::Raw, + OwnedUserId, RoomId, UserId, +}; + +type AnySyncEphemeralRoomEventIter<'a> = + Box<dyn Iterator<Item = Result<(OwnedUserId, u64, Raw<AnySyncEphemeralRoomEvent>)>> + 'a>; pub trait Data: Send + Sync { /// Replaces the previous read receipt. @@ -11,19 +18,8 @@ fn readreceipt_update( ) -> Result<()>; /// Returns an iterator over the most recent read_receipts in a room that happened after the event with id `since`. - fn readreceipts_since<'a>( - &'a self, - room_id: &RoomId, - since: u64, - ) -> Box< - dyn Iterator< - Item = Result<( - OwnedUserId, - u64, - Raw<ruma::events::AnySyncEphemeralRoomEvent>, - )>, - > + 'a, - >; + fn readreceipts_since(&self, room_id: &RoomId, since: u64) + -> AnySyncEphemeralRoomEventIter<'_>; /// Sets a private read marker at `count`. fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()>; diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs index 8d633529df184f6793a8363bc990246ae12a950a..dcadece63ed9b95146982000b7fe4908b4ebebcc 100644 --- a/src/service/rooms/event_handler/mod.rs +++ b/src/service/rooms/event_handler/mod.rs @@ -40,6 +40,11 @@ use super::state_compressor::CompressedStateEvent; +type AsyncRecursiveCanonicalJsonVec<'a> = + AsyncRecursiveType<'a, Vec<(Arc<PduEvent>, Option<BTreeMap<String, CanonicalJsonValue>>)>>; +type AsyncRecursiveCanonicalJsonResult<'a> = + AsyncRecursiveType<'a, Result<(Arc<PduEvent>, BTreeMap<String, CanonicalJsonValue>)>>; + pub struct Service; impl Service { @@ -287,7 +292,7 @@ fn handle_outlier_pdu<'a>( mut value: BTreeMap<String, CanonicalJsonValue>, auth_events_known: bool, pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, Base64>>>, - ) -> AsyncRecursiveType<'a, Result<(Arc<PduEvent>, BTreeMap<String, CanonicalJsonValue>)>> { + ) -> AsyncRecursiveCanonicalJsonResult<'a> { Box::pin(async move { // 1. Remove unsigned field value.remove("unsigned"); @@ -1022,8 +1027,7 @@ pub(crate) fn fetch_and_handle_outliers<'a>( room_id: &'a RoomId, room_version_id: &'a RoomVersionId, pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, Base64>>>, - ) -> AsyncRecursiveType<'a, Vec<(Arc<PduEvent>, Option<BTreeMap<String, CanonicalJsonValue>>)>> - { + ) -> AsyncRecursiveCanonicalJsonVec<'a> { Box::pin(async move { let back_off = |id| match services() .globals diff --git a/src/service/rooms/lazy_loading/mod.rs b/src/service/rooms/lazy_loading/mod.rs index e6e4f8963162eaabe19a7c8e9511544cec44fcfb..d466231edf8fa2ab0a766955471d45a3a83b81f0 100644 --- a/src/service/rooms/lazy_loading/mod.rs +++ b/src/service/rooms/lazy_loading/mod.rs @@ -11,11 +11,13 @@ use super::timeline::PduCount; +type LazyLoadWaitingMutex = + Mutex<HashMap<(OwnedUserId, OwnedDeviceId, OwnedRoomId, PduCount), HashSet<OwnedUserId>>>; + pub struct Service { pub db: &'static dyn Data, - pub lazy_load_waiting: - Mutex<HashMap<(OwnedUserId, OwnedDeviceId, OwnedRoomId, PduCount), HashSet<OwnedUserId>>>, + pub lazy_load_waiting: LazyLoadWaitingMutex, } impl Service { diff --git a/src/service/rooms/pdu_metadata/data.rs b/src/service/rooms/pdu_metadata/data.rs index 6c4cb3ced8f52231a9d62d8af49a014ea0f465e9..121d80dc78c354d105d36557fe909ad1be6e4b51 100644 --- a/src/service/rooms/pdu_metadata/data.rs +++ b/src/service/rooms/pdu_metadata/data.rs @@ -1,6 +1,9 @@ use std::sync::Arc; -use crate::{service::rooms::timeline::PduCount, PduEvent, Result}; +use crate::{ + service::rooms::timeline::{data::PduData, PduCount}, + Result, +}; use ruma::{EventId, RoomId, UserId}; pub trait Data: Send + Sync { @@ -11,7 +14,7 @@ fn relations_until<'a>( room_id: u64, target: u64, until: PduCount, - ) -> Result<Box<dyn Iterator<Item = Result<(PduCount, PduEvent)>> + 'a>>; + ) -> PduData<'a>; fn mark_as_referenced(&self, room_id: &RoomId, event_ids: &[Arc<EventId>]) -> Result<()>; fn is_event_referenced(&self, room_id: &RoomId, event_id: &EventId) -> Result<bool>; fn mark_event_soft_failed(&self, event_id: &EventId) -> Result<()>; diff --git a/src/service/rooms/search/data.rs b/src/service/rooms/search/data.rs index 6eef38fbe90bad602a0a98dbb79eb6f53ac31560..88fd88e58acb1a7730cc2c1943a74404f591867b 100644 --- a/src/service/rooms/search/data.rs +++ b/src/service/rooms/search/data.rs @@ -1,12 +1,10 @@ use crate::Result; use ruma::RoomId; +type SearchPdusResult<'a> = Result<Option<(Box<dyn Iterator<Item = Vec<u8>> + 'a>, Vec<String>)>>; + pub trait Data: Send + Sync { fn index_pdu(&self, shortroomid: u64, pdu_id: &[u8], message_body: &str) -> Result<()>; - fn search_pdus<'a>( - &'a self, - room_id: &RoomId, - search_string: &str, - ) -> Result<Option<(Box<dyn Iterator<Item = Vec<u8>> + 'a>, Vec<String>)>>; + fn search_pdus<'a>(&'a self, room_id: &RoomId, search_string: &str) -> SearchPdusResult<'a>; } diff --git a/src/service/rooms/state_cache/data.rs b/src/service/rooms/state_cache/data.rs index d8bb4a44f35916c7446f7572e01663c16fc26022..ed8685290b509eeb38a5bae341fa3f49ebbb1898 100644 --- a/src/service/rooms/state_cache/data.rs +++ b/src/service/rooms/state_cache/data.rs @@ -7,6 +7,12 @@ OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, ServerName, UserId, }; +type StrippedStateEventIter<'a> = + Box<dyn Iterator<Item = Result<(OwnedRoomId, Vec<Raw<AnyStrippedStateEvent>>)>> + 'a>; + +type AnySyncStateEventIter<'a> = + Box<dyn Iterator<Item = Result<(OwnedRoomId, Vec<Raw<AnySyncStateEvent>>)>> + 'a>; + pub trait Data: Send + Sync { fn mark_as_once_joined(&self, user_id: &UserId, room_id: &RoomId) -> Result<()>; fn mark_as_joined(&self, user_id: &UserId, room_id: &RoomId) -> Result<()>; @@ -78,10 +84,7 @@ fn rooms_joined<'a>( ) -> Box<dyn Iterator<Item = Result<OwnedRoomId>> + 'a>; /// Returns an iterator over all rooms a user was invited to. - fn rooms_invited<'a>( - &'a self, - user_id: &UserId, - ) -> Box<dyn Iterator<Item = Result<(OwnedRoomId, Vec<Raw<AnyStrippedStateEvent>>)>> + 'a>; + fn rooms_invited<'a>(&'a self, user_id: &UserId) -> StrippedStateEventIter<'a>; fn invite_state( &self, @@ -96,10 +99,7 @@ fn left_state( ) -> Result<Option<Vec<Raw<AnyStrippedStateEvent>>>>; /// Returns an iterator over all rooms a user left. - fn rooms_left<'a>( - &'a self, - user_id: &UserId, - ) -> Box<dyn Iterator<Item = Result<(OwnedRoomId, Vec<Raw<AnySyncStateEvent>>)>> + 'a>; + fn rooms_left<'a>(&'a self, user_id: &UserId) -> AnySyncStateEventIter<'a>; fn once_joined(&self, user_id: &UserId, room_id: &RoomId) -> Result<bool>; diff --git a/src/service/rooms/state_compressor/mod.rs b/src/service/rooms/state_compressor/mod.rs index b0ede3ee1ebdeb87352be0ff1444fccd31b8bbd9..338249f6477775940394c0fd44df2adb2c3a9efe 100644 --- a/src/service/rooms/state_compressor/mod.rs +++ b/src/service/rooms/state_compressor/mod.rs @@ -13,20 +13,44 @@ use self::data::StateDiff; +type StateInfoLruCache = Mutex< + LruCache< + u64, + Vec<( + u64, // sstatehash + Arc<HashSet<CompressedStateEvent>>, // full state + Arc<HashSet<CompressedStateEvent>>, // added + Arc<HashSet<CompressedStateEvent>>, // removed + )>, + >, +>; + +type ShortStateInfoResult = Result< + Vec<( + u64, // sstatehash + Arc<HashSet<CompressedStateEvent>>, // full state + Arc<HashSet<CompressedStateEvent>>, // added + Arc<HashSet<CompressedStateEvent>>, // removed + )>, +>; + +type ParentStatesVec = Vec<( + u64, // sstatehash + Arc<HashSet<CompressedStateEvent>>, // full state + Arc<HashSet<CompressedStateEvent>>, // added + Arc<HashSet<CompressedStateEvent>>, // removed +)>; + +type HashSetCompressStateEvent = Result<( + u64, + Arc<HashSet<CompressedStateEvent>>, + Arc<HashSet<CompressedStateEvent>>, +)>; + pub struct Service { pub db: &'static dyn Data, - pub stateinfo_cache: Mutex< - LruCache< - u64, - Vec<( - u64, // sstatehash - Arc<HashSet<CompressedStateEvent>>, // full state - Arc<HashSet<CompressedStateEvent>>, // added - Arc<HashSet<CompressedStateEvent>>, // removed - )>, - >, - >, + pub stateinfo_cache: StateInfoLruCache, } pub type CompressedStateEvent = [u8; 2 * size_of::<u64>()]; @@ -34,17 +58,7 @@ pub struct Service { impl Service { /// Returns a stack with info on shortstatehash, full state, added diff and removed diff for the selected shortstatehash and each parent layer. #[tracing::instrument(skip(self))] - pub fn load_shortstatehash_info( - &self, - shortstatehash: u64, - ) -> Result< - Vec<( - u64, // sstatehash - Arc<HashSet<CompressedStateEvent>>, // full state - Arc<HashSet<CompressedStateEvent>>, // added - Arc<HashSet<CompressedStateEvent>>, // removed - )>, - > { + pub fn load_shortstatehash_info(&self, shortstatehash: u64) -> ShortStateInfoResult { if let Some(r) = self .stateinfo_cache .lock() @@ -144,12 +158,7 @@ pub fn save_state_from_diff( statediffnew: Arc<HashSet<CompressedStateEvent>>, statediffremoved: Arc<HashSet<CompressedStateEvent>>, diff_to_sibling: usize, - mut parent_states: Vec<( - u64, // sstatehash - Arc<HashSet<CompressedStateEvent>>, // full state - Arc<HashSet<CompressedStateEvent>>, // added - Arc<HashSet<CompressedStateEvent>>, // removed - )>, + mut parent_states: ParentStatesVec, ) -> Result<()> { let diffsum = statediffnew.len() + statediffremoved.len(); @@ -257,11 +266,7 @@ pub fn save_state( &self, room_id: &RoomId, new_state_ids_compressed: Arc<HashSet<CompressedStateEvent>>, - ) -> Result<( - u64, - Arc<HashSet<CompressedStateEvent>>, - Arc<HashSet<CompressedStateEvent>>, - )> { + ) -> HashSetCompressStateEvent { let previous_shortstatehash = services().rooms.state.get_room_shortstatehash(room_id)?; let state_hash = utils::calculate_hash( diff --git a/src/service/rooms/threads/data.rs b/src/service/rooms/threads/data.rs index 9221e8e8998697a6f105f5b3d5cf0226031fa92b..2f062e2337ef8453c9d5e5966feb8ae9cd59a488 100644 --- a/src/service/rooms/threads/data.rs +++ b/src/service/rooms/threads/data.rs @@ -1,6 +1,8 @@ use crate::{PduEvent, Result}; use ruma::{api::client::threads::get_threads::v1::IncludeThreads, OwnedUserId, RoomId, UserId}; +type PduEventIterResult<'a> = Result<Box<dyn Iterator<Item = Result<(u64, PduEvent)>> + 'a>>; + pub trait Data: Send + Sync { fn threads_until<'a>( &'a self, @@ -8,7 +10,7 @@ fn threads_until<'a>( room_id: &'a RoomId, until: u64, include: &'a IncludeThreads, - ) -> Result<Box<dyn Iterator<Item = Result<(u64, PduEvent)>> + 'a>>; + ) -> PduEventIterResult<'a>; fn update_participants(&self, root_id: &[u8], participants: &[OwnedUserId]) -> Result<()>; fn get_participants(&self, root_id: &[u8]) -> Result<Option<Vec<OwnedUserId>>>; diff --git a/src/service/rooms/timeline/data.rs b/src/service/rooms/timeline/data.rs index afa2cfbf0e10d1127159db1299b7fd07cd25068e..df329fece83b8c281ac75a8b9574f4b933c0fe0c 100644 --- a/src/service/rooms/timeline/data.rs +++ b/src/service/rooms/timeline/data.rs @@ -6,6 +6,8 @@ use super::PduCount; +pub type PduData<'a> = Result<Box<dyn Iterator<Item = Result<(PduCount, PduEvent)>> + 'a>>; + pub trait Data: Send + Sync { fn last_timeline_count(&self, sender_user: &UserId, room_id: &RoomId) -> Result<PduCount>; @@ -66,21 +68,12 @@ fn replace_pdu( /// Returns an iterator over all events and their tokens in a room that happened before the /// event with id `until` in reverse-chronological order. - fn pdus_until<'a>( - &'a self, - user_id: &UserId, - room_id: &RoomId, - until: PduCount, - ) -> Result<Box<dyn Iterator<Item = Result<(PduCount, PduEvent)>> + 'a>>; + fn pdus_until<'a>(&'a self, user_id: &UserId, room_id: &RoomId, until: PduCount) + -> PduData<'a>; /// Returns an iterator over all events in a room that happened after the event with id `from` /// in chronological order. - fn pdus_after<'a>( - &'a self, - user_id: &UserId, - room_id: &RoomId, - from: PduCount, - ) -> Result<Box<dyn Iterator<Item = Result<(PduCount, PduEvent)>> + 'a>>; + fn pdus_after<'a>(&'a self, user_id: &UserId, room_id: &RoomId, from: PduCount) -> PduData<'a>; fn increment_notification_counts( &self, diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index 048ffe9a3b72e7c6a16272dd8e56ddc7e5317216..5133015a5d7ad443f3b97fd2244e02bba7297535 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -1,4 +1,4 @@ -mod data; +pub(crate) mod data; use std::{ cmp::Ordering, diff --git a/src/service/sending/data.rs b/src/service/sending/data.rs index 2e574e235d1539b69cc791aad71cc2aa3df44cb5..427ee9393ecb51e55923e2e2c63f538363435ed7 100644 --- a/src/service/sending/data.rs +++ b/src/service/sending/data.rs @@ -4,14 +4,13 @@ use super::{OutgoingKind, SendingEventType}; +type OutgoingSendingIter<'a> = + Box<dyn Iterator<Item = Result<(Vec<u8>, OutgoingKind, SendingEventType)>> + 'a>; +type SendingEventTypeIter<'a> = Box<dyn Iterator<Item = Result<(Vec<u8>, SendingEventType)>> + 'a>; + pub trait Data: Send + Sync { - fn active_requests<'a>( - &'a self, - ) -> Box<dyn Iterator<Item = Result<(Vec<u8>, OutgoingKind, SendingEventType)>> + 'a>; - fn active_requests_for<'a>( - &'a self, - outgoing_kind: &OutgoingKind, - ) -> Box<dyn Iterator<Item = Result<(Vec<u8>, SendingEventType)>> + 'a>; + fn active_requests(&self) -> OutgoingSendingIter<'_>; + fn active_requests_for(&self, outgoing_kind: &OutgoingKind) -> SendingEventTypeIter<'_>; fn delete_active_request(&self, key: Vec<u8>) -> Result<()>; fn delete_all_active_requests_for(&self, outgoing_kind: &OutgoingKind) -> Result<()>; fn delete_all_requests_for(&self, outgoing_kind: &OutgoingKind) -> Result<()>; diff --git a/src/service/users/mod.rs b/src/service/users/mod.rs index 6299b9ea01be89969a35fcd409d2d8bdbdc07ee0..10578679dd511392ef62f0ff06c259b66dde4c45 100644 --- a/src/service/users/mod.rs +++ b/src/service/users/mod.rs @@ -32,10 +32,12 @@ pub struct SlidingSyncCache { extensions: ExtensionsConfig, } +type DbConnections = + Mutex<BTreeMap<(OwnedUserId, OwnedDeviceId, String), Arc<Mutex<SlidingSyncCache>>>>; + pub struct Service { pub db: &'static dyn Data, - pub connections: - Mutex<BTreeMap<(OwnedUserId, OwnedDeviceId, String), Arc<Mutex<SlidingSyncCache>>>>, + pub connections: DbConnections, } impl Service {