From ec24813220f9d54108924dc04aecd24555277b99 Mon Sep 17 00:00:00 2001
From: Patrick Cloke <>
Date: Thu, 4 Aug 2022 15:24:44 -0400
Subject: [PATCH] Improve comments (& avoid a duplicate query) in push actions
 processing. (#13455)

* Adds docstrings and inline comments.
* Formats SQL queries using triple quoted strings.
* Minor formatting changes.
* Avoid fetching `event_push_summary_stream_ordering` multiple times
  in the same transactions.
 changelog.d/13455.misc                        |   1 +
 .../databases/main/      | 282 ++++++++++--------
 2 files changed, 159 insertions(+), 124 deletions(-)
 create mode 100644 changelog.d/13455.misc

diff --git a/changelog.d/13455.misc b/changelog.d/13455.misc
new file mode 100644
index 0000000000..17462c56f3
--- /dev/null
+++ b/changelog.d/13455.misc
@@ -0,0 +1 @@
+Add some comments about how event push actions are stored.
diff --git a/synapse/storage/databases/main/ b/synapse/storage/databases/main/
index dd2627037c..5ddddb1cf3 100644
--- a/synapse/storage/databases/main/
+++ b/synapse/storage/databases/main/
@@ -265,7 +265,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             counts.notify_count += row[1]
             counts.unread_count += row[2]
-        # Next we need to count highlights, which aren't summarized
+        # Next we need to count highlights, which aren't summarised
         sql = """
             SELECT COUNT(*) FROM event_push_actions
             WHERE user_id = ?
@@ -280,7 +280,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         # Finally we need to count push actions that aren't included in the
         # summary returned above, e.g. recent events that haven't been
-        # summarized yet, or the summary is empty due to a recent read receipt.
+        # summarised yet, or the summary is empty due to a recent read receipt.
         stream_ordering = max(stream_ordering, summary_stream_ordering)
         notify_count, unread_count = self._get_notif_unread_count_for_user_room(
             txn, room_id, user_id, stream_ordering
@@ -304,6 +304,17 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         Does not consult `event_push_summary` table, which may include push
         actions that have been deleted from `event_push_actions` table.
+        Args:
+            txn: The database transaction.
+            room_id: The room ID to get unread counts for.
+            user_id: The user ID to get unread counts for.
+            stream_ordering: The (exclusive) minimum stream ordering to consider.
+            max_stream_ordering: The (inclusive) maximum stream ordering to consider.
+                If this is not given, then no maximum is applied.
+        Return:
+            A tuple of the notif count and unread count in the given range.
         # If there have been no events in the room since the stream ordering,
@@ -383,27 +394,27 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         ) -> List[Tuple[str, str, int, str, bool]]:
             # find rooms that have a read receipt in them and return the next
             # push actions
-            sql = (
-                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
-                "   ep.highlight "
-                " FROM ("
-                "   SELECT room_id,"
-                "       MAX(stream_ordering) as stream_ordering"
-                "   FROM events"
-                "   INNER JOIN receipts_linearized USING (room_id, event_id)"
-                "   WHERE receipt_type = '' AND user_id = ?"
-                "   GROUP BY room_id"
-                ") AS rl,"
-                " event_push_actions AS ep"
-                " WHERE"
-                "   ep.room_id = rl.room_id"
-                "   AND ep.stream_ordering > rl.stream_ordering"
-                "   AND ep.user_id = ?"
-                "   AND ep.stream_ordering > ?"
-                "   AND ep.stream_ordering <= ?"
-                "   AND ep.notif = 1"
-                " ORDER BY ep.stream_ordering ASC LIMIT ?"
-            )
+            sql = """
+                SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
+                    ep.highlight
+                FROM (
+                    SELECT room_id,
+                        MAX(stream_ordering) as stream_ordering
+                    FROM events
+                    INNER JOIN receipts_linearized USING (room_id, event_id)
+                    WHERE receipt_type = '' AND user_id = ?
+                    GROUP BY room_id
+                ) AS rl,
+                    event_push_actions AS ep
+                WHERE
+                    ep.room_id = rl.room_id
+                    AND ep.stream_ordering > rl.stream_ordering
+                    AND ep.user_id = ?
+                    AND ep.stream_ordering > ?
+                    AND ep.stream_ordering <= ?
+                    AND ep.notif = 1
+                ORDER BY ep.stream_ordering ASC LIMIT ?
+            """
             args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
             txn.execute(sql, args)
             return cast(List[Tuple[str, str, int, str, bool]], txn.fetchall())
@@ -418,23 +429,23 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         def get_no_receipt(
             txn: LoggingTransaction,
         ) -> List[Tuple[str, str, int, str, bool]]:
-            sql = (
-                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
-                "   ep.highlight "
-                " FROM event_push_actions AS ep"
-                " INNER JOIN events AS e USING (room_id, event_id)"
-                " WHERE"
-                "   ep.room_id NOT IN ("
-                "     SELECT room_id FROM receipts_linearized"
-                "       WHERE receipt_type = '' AND user_id = ?"
-                "       GROUP BY room_id"
-                "   )"
-                "   AND ep.user_id = ?"
-                "   AND ep.stream_ordering > ?"
-                "   AND ep.stream_ordering <= ?"
-                "   AND ep.notif = 1"
-                " ORDER BY ep.stream_ordering ASC LIMIT ?"
-            )
+            sql = """
+                SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
+                    ep.highlight
+                FROM event_push_actions AS ep
+                INNER JOIN events AS e USING (room_id, event_id)
+                WHERE
+                    ep.room_id NOT IN (
+                        SELECT room_id FROM receipts_linearized
+                        WHERE receipt_type = '' AND user_id = ?
+                        GROUP BY room_id
+                    )
+                    AND ep.user_id = ?
+                    AND ep.stream_ordering > ?
+                    AND ep.stream_ordering <= ?
+                    AND ep.notif = 1
+                ORDER BY ep.stream_ordering ASC LIMIT ?
+            """
             args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
             txn.execute(sql, args)
             return cast(List[Tuple[str, str, int, str, bool]], txn.fetchall())
@@ -490,28 +501,28 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         def get_after_receipt(
             txn: LoggingTransaction,
         ) -> List[Tuple[str, str, int, str, bool, int]]:
-            sql = (
-                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
-                "  ep.highlight, e.received_ts"
-                " FROM ("
-                "   SELECT room_id,"
-                "       MAX(stream_ordering) as stream_ordering"
-                "   FROM events"
-                "   INNER JOIN receipts_linearized USING (room_id, event_id)"
-                "   WHERE receipt_type = '' AND user_id = ?"
-                "   GROUP BY room_id"
-                ") AS rl,"
-                " event_push_actions AS ep"
-                " INNER JOIN events AS e USING (room_id, event_id)"
-                " WHERE"
-                "   ep.room_id = rl.room_id"
-                "   AND ep.stream_ordering > rl.stream_ordering"
-                "   AND ep.user_id = ?"
-                "   AND ep.stream_ordering > ?"
-                "   AND ep.stream_ordering <= ?"
-                "   AND ep.notif = 1"
-                " ORDER BY ep.stream_ordering DESC LIMIT ?"
-            )
+            sql = """
+                SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
+                    ep.highlight, e.received_ts
+                FROM (
+                    SELECT room_id,
+                        MAX(stream_ordering) as stream_ordering
+                    FROM events
+                    INNER JOIN receipts_linearized USING (room_id, event_id)
+                    WHERE receipt_type = '' AND user_id = ?
+                    GROUP BY room_id
+                ) AS rl,
+                event_push_actions AS ep
+                INNER JOIN events AS e USING (room_id, event_id)
+                WHERE
+                    ep.room_id = rl.room_id
+                    AND ep.stream_ordering > rl.stream_ordering
+                    AND ep.user_id = ?
+                    AND ep.stream_ordering > ?
+                    AND ep.stream_ordering <= ?
+                    AND ep.notif = 1
+                ORDER BY ep.stream_ordering DESC LIMIT ?
+            """
             args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
             txn.execute(sql, args)
             return cast(List[Tuple[str, str, int, str, bool, int]], txn.fetchall())
@@ -526,23 +537,23 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         def get_no_receipt(
             txn: LoggingTransaction,
         ) -> List[Tuple[str, str, int, str, bool, int]]:
-            sql = (
-                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
-                "   ep.highlight, e.received_ts"
-                " FROM event_push_actions AS ep"
-                " INNER JOIN events AS e USING (room_id, event_id)"
-                " WHERE"
-                "   ep.room_id NOT IN ("
-                "     SELECT room_id FROM receipts_linearized"
-                "       WHERE receipt_type = '' AND user_id = ?"
-                "       GROUP BY room_id"
-                "   )"
-                "   AND ep.user_id = ?"
-                "   AND ep.stream_ordering > ?"
-                "   AND ep.stream_ordering <= ?"
-                "   AND ep.notif = 1"
-                " ORDER BY ep.stream_ordering DESC LIMIT ?"
-            )
+            sql = """
+                SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
+                    ep.highlight, e.received_ts
+                FROM event_push_actions AS ep
+                INNER JOIN events AS e USING (room_id, event_id)
+                WHERE
+                    ep.room_id NOT IN (
+                        SELECT room_id FROM receipts_linearized
+                        WHERE receipt_type = '' AND user_id = ?
+                        GROUP BY room_id
+                    )
+                    AND ep.user_id = ?
+                    AND ep.stream_ordering > ?
+                    AND ep.stream_ordering <= ?
+                    AND ep.notif = 1
+                ORDER BY ep.stream_ordering DESC LIMIT ?
+            """
             args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
             txn.execute(sql, args)
             return cast(List[Tuple[str, str, int, str, bool, int]], txn.fetchall())
@@ -769,12 +780,12 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         # [10, <none>, 20], we should treat this as being equivalent to
         # [10, 10, 20].
-        sql = (
-            "SELECT received_ts FROM events"
-            " WHERE stream_ordering <= ?"
-            " ORDER BY stream_ordering DESC"
-            " LIMIT 1"
-        )
+        sql = """
+            SELECT received_ts FROM events
+            WHERE stream_ordering <= ?
+            ORDER BY stream_ordering DESC
+            LIMIT 1
+        """
         while range_end - range_start > 0:
             middle = (range_end + range_start) // 2
@@ -802,14 +813,14 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         self, stream_ordering: int
     ) -> Optional[int]:
         def f(txn: LoggingTransaction) -> Optional[Tuple[int]]:
-            sql = (
-                "SELECT e.received_ts"
-                " FROM event_push_actions AS ep"
-                " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id"
-                " WHERE ep.stream_ordering > ? AND notif = 1"
-                " ORDER BY ep.stream_ordering ASC"
-                " LIMIT 1"
-            )
+            sql = """
+                SELECT e.received_ts
+                FROM event_push_actions AS ep
+                JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id
+                WHERE ep.stream_ordering > ? AND notif = 1
+                ORDER BY ep.stream_ordering ASC
+                LIMIT 1
+            """
             txn.execute(sql, (stream_ordering,))
             return cast(Optional[Tuple[int]], txn.fetchone())
@@ -858,10 +869,13 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         Any push actions which predate the user's most recent read receipt are
         now redundant, so we can remove them from `event_push_actions` and
         update `event_push_summary`.
+        Returns true if all new receipts have been processed.
         limit = 100
+        # The (inclusive) receipt stream ID that was previously processed..
         min_receipts_stream_id = self.db_pool.simple_select_one_onecol_txn(
@@ -871,6 +885,14 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         max_receipts_stream_id = self._receipts_id_gen.get_current_token()
+        # The (inclusive) event stream ordering that was previously summarised.
+        old_rotate_stream_ordering = self.db_pool.simple_select_one_onecol_txn(
+            txn,
+            table="event_push_summary_stream_ordering",
+            keyvalues={},
+            retcol="stream_ordering",
+        )
         sql = """
             SELECT r.stream_id, r.room_id, r.user_id, e.stream_ordering
             FROM receipts_linearized AS r
@@ -895,13 +917,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         rows = txn.fetchall()
-        old_rotate_stream_ordering = self.db_pool.simple_select_one_onecol_txn(
-            txn,
-            table="event_push_summary_stream_ordering",
-            keyvalues={},
-            retcol="stream_ordering",
-        )
         # For each new read receipt we delete push actions from before it and
         # recalculate the summary.
         for _, room_id, user_id, stream_ordering in rows:
@@ -920,10 +935,13 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
                 (room_id, user_id, stream_ordering),
+            # Fetch the notification counts between the stream ordering of the
+            # latest receipt and what was previously summarised.
             notif_count, unread_count = self._get_notif_unread_count_for_user_room(
                 txn, room_id, user_id, stream_ordering, old_rotate_stream_ordering
+            # Replace the previous summary with the new counts.
@@ -956,10 +974,12 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         return len(rows) < limit
     def _rotate_notifs_txn(self, txn: LoggingTransaction) -> bool:
-        """Archives older notifications into event_push_summary. Returns whether
-        the archiving process has caught up or not.
+        """Archives older notifications (from event_push_actions) into event_push_summary.
+        Returns whether the archiving process has caught up or not.
+        # The (inclusive) event stream ordering that was previously summarised.
         old_rotate_stream_ordering = self.db_pool.simple_select_one_onecol_txn(
@@ -974,7 +994,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             SELECT stream_ordering FROM event_push_actions
             WHERE stream_ordering > ?
             ORDER BY stream_ordering ASC LIMIT 1 OFFSET ?
-        """,
+            """,
             (old_rotate_stream_ordering, self._rotate_count),
         stream_row = txn.fetchone()
@@ -993,19 +1013,31 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
 "Rotating notifications up to: %s", rotate_to_stream_ordering)
-        self._rotate_notifs_before_txn(txn, rotate_to_stream_ordering)
+        self._rotate_notifs_before_txn(
+            txn, old_rotate_stream_ordering, rotate_to_stream_ordering
+        )
         return caught_up
     def _rotate_notifs_before_txn(
-        self, txn: LoggingTransaction, rotate_to_stream_ordering: int
+        self,
+        txn: LoggingTransaction,
+        old_rotate_stream_ordering: int,
+        rotate_to_stream_ordering: int,
     ) -> None:
-        old_rotate_stream_ordering = self.db_pool.simple_select_one_onecol_txn(
-            txn,
-            table="event_push_summary_stream_ordering",
-            keyvalues={},
-            retcol="stream_ordering",
-        )
+        """Archives older notifications (from event_push_actions) into event_push_summary.
+        Any event_push_actions between old_rotate_stream_ordering (exclusive) and
+        rotate_to_stream_ordering (inclusive) will be added to the event_push_summary
+        table.
+        Args:
+            txn: The database transaction.
+            old_rotate_stream_ordering: The previous maximum event stream ordering.
+            rotate_to_stream_ordering: The new maximum event stream ordering to summarise.
+        Returns whether the archiving process has caught up or not.
+        """
         # Calculate the new counts that should be upserted into event_push_summary
         sql = """
@@ -1093,9 +1125,9 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
     async def _remove_old_push_actions_that_have_rotated(
     ) -> None:
-        """Clear out old push actions that have been summarized."""
+        """Clear out old push actions that have been summarised."""
-        # We want to clear out anything that older than a day that *has* already
+        # We want to clear out anything that is older than a day that *has* already
         # been rotated.
         rotated_upto_stream_ordering = await self.db_pool.simple_select_one_onecol(
@@ -1119,7 +1151,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
                 SELECT stream_ordering FROM event_push_actions
                 WHERE stream_ordering <= ? AND highlight = 0
                 ORDER BY stream_ordering ASC LIMIT 1 OFFSET ?
-            """,
+                """,
@@ -1215,16 +1247,18 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
             # NB. This assumes event_ids are globally unique since
             # it makes the query easier to index
-            sql = (
-                "SELECT epa.event_id, epa.room_id,"
-                " epa.stream_ordering, epa.topological_ordering,"
-                " epa.actions, epa.highlight, epa.profile_tag, e.received_ts"
-                " FROM event_push_actions epa, events e"
-                " WHERE epa.event_id = e.event_id"
-                " AND epa.user_id = ? %s"
-                " AND epa.notif = 1"
-                " ORDER BY epa.stream_ordering DESC"
-                " LIMIT ?" % (before_clause,)
+            sql = """
+                SELECT epa.event_id, epa.room_id,
+                    epa.stream_ordering, epa.topological_ordering,
+                    epa.actions, epa.highlight, epa.profile_tag, e.received_ts
+                FROM event_push_actions epa, events e
+                WHERE epa.event_id = e.event_id
+                    AND epa.user_id = ? %s
+                    AND epa.notif = 1
+                ORDER BY epa.stream_ordering DESC
+                LIMIT ?
+            """ % (
+                before_clause,
             txn.execute(sql, args)
             return cast(