From 12b0efac8bc6b4045e2c6d6bfb648de95f2ec3f3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Timo=20K=C3=B6sters?= <timo@koesters.xyz>
Date: Sun, 18 Oct 2020 08:56:21 +0200
Subject: [PATCH] fix: random timeline reloads

---
 src/client_server/membership.rs | 13 +++++++++--
 src/client_server/sync.rs       | 10 ++++-----
 src/database/rooms.rs           | 38 +++++++++++++++++++++------------
 src/server_server.rs            | 16 +++++++++++---
 4 files changed, 53 insertions(+), 24 deletions(-)

diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs
index 9c1e7c6c1..06e5adfd2 100644
--- a/src/client_server/membership.rs
+++ b/src/client_server/membership.rs
@@ -22,7 +22,10 @@
     EventId, Raw, RoomId, RoomVersionId, ServerName, UserId,
 };
 use state_res::StateEvent;
-use std::{collections::BTreeMap, collections::HashMap, collections::HashSet, convert::TryFrom, iter, sync::Arc};
+use std::{
+    collections::BTreeMap, collections::HashMap, collections::HashSet, convert::TryFrom, iter,
+    sync::Arc,
+};
 
 #[cfg(feature = "conduit_bin")]
 use rocket::{get, post};
@@ -642,9 +645,15 @@ async fn join_room_by_id_helper(
                 .expect("Found event_id in sorted events that is not in resolved state");
 
             // We do not rebuild the PDU in this case only insert to DB
-            let pdu_id = db.rooms.append_pdu(
+            let count = db.globals.next_count()?;
+            let mut pdu_id = room_id.as_bytes().to_vec();
+            pdu_id.push(0xff);
+            pdu_id.extend_from_slice(&count.to_be_bytes());
+            db.rooms.append_pdu(
                 &PduEvent::from(&**pdu),
                 &serde_json::to_value(&**pdu).expect("PDU is valid value"),
+                count,
+                pdu_id.clone().into(),
                 &db.globals,
                 &db.account_data,
                 &db.sending,
diff --git a/src/client_server/sync.rs b/src/client_server/sync.rs
index 6f4116020..688d304b6 100644
--- a/src/client_server/sync.rs
+++ b/src/client_server/sync.rs
@@ -110,11 +110,11 @@ pub async fn sync_events_route(
         // since and the current room state, meaning there should be no updates.
         // The inner Option is None when there is an event, but there is no state hash associated
         // with it. This can happen for the RoomCreate event, so all updates should arrive.
-        let since_state_hash = db
-            .rooms
-            .pdus_after(sender_id, &room_id, since) // - 1 So we can get the event at since
-            .next()
-            .map(|pdu| db.rooms.pdu_state_hash(&pdu.ok()?.0).ok()?);
+        let first_pdu_after_since = db.rooms.pdus_after(sender_id, &room_id, since).next();
+
+        let since_state_hash = first_pdu_after_since
+            .as_ref()
+            .map(|pdu| db.rooms.pdu_state_hash(&pdu.as_ref().ok()?.0).ok()?);
 
         let since_members = since_state_hash.as_ref().map(|state_hash| {
             state_hash.as_ref().and_then(|state_hash| {
diff --git a/src/database/rooms.rs b/src/database/rooms.rs
index db473ffbd..35c3eac14 100644
--- a/src/database/rooms.rs
+++ b/src/database/rooms.rs
@@ -438,25 +438,18 @@ pub fn append_pdu(
         &self,
         pdu: &PduEvent,
         pdu_json: &serde_json::Value,
+        count: u64,
+        pdu_id: IVec,
         globals: &super::globals::Globals,
         account_data: &super::account_data::AccountData,
         sending: &super::sending::Sending,
-    ) -> Result<Vec<u8>> {
+    ) -> Result<()> {
         self.replace_pdu_leaves(&pdu.room_id, &pdu.event_id)?;
 
-        // Increment the last index and use that
-        // This is also the next_batch/since value
-        let index = globals.next_count()?;
-
         // Mark as read first so the sending client doesn't get a notification even if appending
         // fails
         self.edus
-            .private_read_set(&pdu.room_id, &pdu.sender, index, &globals)?;
-
-        let room_id = pdu.room_id.clone();
-        let mut pdu_id = room_id.as_bytes().to_vec();
-        pdu_id.push(0xff);
-        pdu_id.extend_from_slice(&index.to_be_bytes());
+            .private_read_set(&pdu.room_id, &pdu.sender, count, &globals)?;
 
         self.pduid_pdu.insert(&pdu_id, &*pdu_json.to_string())?;
 
@@ -537,7 +530,7 @@ pub fn append_pdu(
                                 },
                                 &UserId::try_from(format!("@conduit:{}", globals.server_name()))
                                     .expect("@conduit:server_name is valid"),
-                                &room_id,
+                                &pdu.room_id,
                                 &globals,
                                 &sending,
                                 &account_data,
@@ -549,7 +542,7 @@ pub fn append_pdu(
             _ => {}
         }
 
-        Ok(pdu_id)
+        Ok(())
     }
 
     /// Generates a new StateHash and associates it with the incoming event.
@@ -834,10 +827,27 @@ pub fn build_and_append_pdu(
             .expect("json is object")
             .insert("event_id".to_owned(), pdu.event_id.to_string().into());
 
-        let pdu_id = self.append_pdu(&pdu, &pdu_json, globals, account_data, sending)?;
+        // Increment the last index and use that
+        // This is also the next_batch/since value
+        let count = globals.next_count()?;
+        let mut pdu_id = room_id.as_bytes().to_vec();
+        pdu_id.push(0xff);
+        pdu_id.extend_from_slice(&count.to_be_bytes());
 
+        // We append to state before appending the pdu, so we don't have a moment in time with the
+        // pdu without it's state. This is okay because append_pdu can't fail.
         self.append_to_state(&pdu_id, &pdu)?;
 
+        self.append_pdu(
+            &pdu,
+            &pdu_json,
+            count,
+            pdu_id.clone().into(),
+            globals,
+            account_data,
+            sending,
+        )?;
+
         for server in self
             .room_servers(room_id)
             .filter_map(|r| r.ok())
diff --git a/src/server_server.rs b/src/server_server.rs
index b8b575ebe..3fefbd509 100644
--- a/src/server_server.rs
+++ b/src/server_server.rs
@@ -389,9 +389,19 @@ pub fn send_transaction_message_route<'a>(
         let pdu = serde_json::from_value::<PduEvent>(value.clone())
             .expect("all ruma pdus are conduit pdus");
         if db.rooms.exists(&pdu.room_id)? {
-            let pdu_id =
-                db.rooms
-                    .append_pdu(&pdu, &value, &db.globals, &db.account_data, &db.sending)?;
+            let count = db.globals.next_count()?;
+            let mut pdu_id = pdu.room_id.as_bytes().to_vec();
+            pdu_id.push(0xff);
+            pdu_id.extend_from_slice(&count.to_be_bytes());
+            db.rooms.append_pdu(
+                &pdu,
+                &value,
+                count,
+                pdu_id.clone().into(),
+                &db.globals,
+                &db.account_data,
+                &db.sending,
+            )?;
             db.rooms.append_to_state(&pdu_id, &pdu)?;
         }
     }
-- 
GitLab