From b3ba501c526bf15240bb8cc635858d05cd702c2c Mon Sep 17 00:00:00 2001
From: Mathieu Velten <matmaul@gmail.com>
Date: Mon, 6 Jan 2025 16:32:18 +0100
Subject: [PATCH] Properly purge state groups tables when purging a room
 (#18024)

Currently purging a complex room can lead to a lot of orphaned rows left
behind in the state groups tables.
It seems it is because we are loosing track of state groups sometimes.

This change uses the `room_id` indexed column of `state_groups` table to
decide what to delete instead of doing an indirection through
`event_to_state_groups`.

Related to https://github.com/element-hq/synapse/issues/3364.

### Pull Request Checklist

* [x] Pull request is based on the develop branch
* [x] Pull request includes a [changelog
file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog).
* [x] [Code
style](https://element-hq.github.io/synapse/latest/code_style.html) is
correct
(run the
[linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters))

---------

Co-authored-by: Erik Johnston <erikj@jki.re>
---
 changelog.d/18024.bugfix                      |  1 +
 synapse/storage/controllers/purge_events.py   |  4 +-
 .../storage/databases/main/purge_events.py    | 36 +++--------
 synapse/storage/databases/state/store.py      | 60 +++++++------------
 tests/rest/admin/test_room.py                 |  2 +-
 5 files changed, 34 insertions(+), 69 deletions(-)
 create mode 100644 changelog.d/18024.bugfix

diff --git a/changelog.d/18024.bugfix b/changelog.d/18024.bugfix
new file mode 100644
index 0000000000..956f43f036
--- /dev/null
+++ b/changelog.d/18024.bugfix
@@ -0,0 +1 @@
+Properly purge state groups tables when purging a room with the admin API.
diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py
index e794b370c2..15c04ffef8 100644
--- a/synapse/storage/controllers/purge_events.py
+++ b/synapse/storage/controllers/purge_events.py
@@ -42,8 +42,8 @@ class PurgeEventsStorageController:
         """Deletes all record of a room"""
 
         with nested_logging_context(room_id):
-            state_groups_to_delete = await self.stores.main.purge_room(room_id)
-            await self.stores.state.purge_room_state(room_id, state_groups_to_delete)
+            await self.stores.main.purge_room(room_id)
+            await self.stores.state.purge_room_state(room_id)
 
     async def purge_history(
         self, room_id: str, token: str, delete_local_events: bool
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index c195685af8..ebdeb8fbd7 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -20,7 +20,7 @@
 #
 
 import logging
-from typing import Any, List, Set, Tuple, cast
+from typing import Any, Set, Tuple, cast
 
 from synapse.api.errors import SynapseError
 from synapse.storage.database import LoggingTransaction
@@ -332,7 +332,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
 
         return referenced_state_groups
 
-    async def purge_room(self, room_id: str) -> List[int]:
+    async def purge_room(self, room_id: str) -> None:
         """Deletes all record of a room
 
         Args:
@@ -348,7 +348,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
         # purge any of those rows which were added during the first.
 
         logger.info("[purge] Starting initial main purge of [1/2]")
-        state_groups_to_delete = await self.db_pool.runInteraction(
+        await self.db_pool.runInteraction(
             "purge_room",
             self._purge_room_txn,
             room_id=room_id,
@@ -356,18 +356,15 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
         )
 
         logger.info("[purge] Starting secondary main purge of [2/2]")
-        state_groups_to_delete.extend(
-            await self.db_pool.runInteraction(
-                "purge_room",
-                self._purge_room_txn,
-                room_id=room_id,
-            ),
+        await self.db_pool.runInteraction(
+            "purge_room",
+            self._purge_room_txn,
+            room_id=room_id,
         )
-        logger.info("[purge] Done with main purge")
 
-        return state_groups_to_delete
+        logger.info("[purge] Done with main purge")
 
-    def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]:
+    def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> None:
         # This collides with event persistence so we cannot write new events and metadata into
         # a room while deleting it or this transaction will fail.
         if isinstance(self.database_engine, PostgresEngine):
@@ -381,19 +378,6 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
             # take a while!
             txn.execute("SET LOCAL statement_timeout = 0")
 
-        # First, fetch all the state groups that should be deleted, before
-        # we delete that information.
-        txn.execute(
-            """
-                SELECT DISTINCT state_group FROM events
-                INNER JOIN event_to_state_groups USING(event_id)
-                WHERE events.room_id = ?
-            """,
-            (room_id,),
-        )
-
-        state_groups = [row[0] for row in txn]
-
         # Get all the auth chains that are referenced by events that are to be
         # deleted.
         txn.execute(
@@ -513,5 +497,3 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
         #       periodically anyway (https://github.com/matrix-org/synapse/issues/5888)
 
         self._invalidate_caches_for_room_and_stream(txn, room_id)
-
-        return state_groups
diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py
index f7a59c8992..9944f90015 100644
--- a/synapse/storage/databases/state/store.py
+++ b/synapse/storage/databases/state/store.py
@@ -840,60 +840,42 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
 
         return dict(rows)
 
-    async def purge_room_state(
-        self, room_id: str, state_groups_to_delete: Collection[int]
-    ) -> None:
-        """Deletes all record of a room from state tables
-
-        Args:
-            room_id:
-            state_groups_to_delete: State groups to delete
-        """
-
-        logger.info("[purge] Starting state purge")
-        await self.db_pool.runInteraction(
+    async def purge_room_state(self, room_id: str) -> None:
+        return await self.db_pool.runInteraction(
             "purge_room_state",
             self._purge_room_state_txn,
             room_id,
-            state_groups_to_delete,
         )
-        logger.info("[purge] Done with state purge")
 
     def _purge_room_state_txn(
         self,
         txn: LoggingTransaction,
         room_id: str,
-        state_groups_to_delete: Collection[int],
     ) -> None:
-        # first we have to delete the state groups states
-        logger.info("[purge] removing %s from state_groups_state", room_id)
-
-        self.db_pool.simple_delete_many_txn(
-            txn,
-            table="state_groups_state",
-            column="state_group",
-            values=state_groups_to_delete,
-            keyvalues={},
-        )
-
-        # ... and the state group edges
+        # Delete all edges that reference a state group linked to room_id
         logger.info("[purge] removing %s from state_group_edges", room_id)
+        txn.execute(
+            """
+            DELETE FROM state_group_edges AS sge WHERE sge.state_group IN (
+                SELECT id FROM state_groups AS sg WHERE sg.room_id = ?
+            )""",
+            (room_id,),
+        )
 
-        self.db_pool.simple_delete_many_txn(
-            txn,
-            table="state_group_edges",
-            column="state_group",
-            values=state_groups_to_delete,
-            keyvalues={},
+        # state_groups_state table has a room_id column but no index on it, unlike state_groups,
+        # so we delete them by matching the room_id through the state_groups table.
+        logger.info("[purge] removing %s from state_groups_state", room_id)
+        txn.execute(
+            """
+            DELETE FROM state_groups_state AS sgs WHERE sgs.state_group IN (
+                SELECT id FROM state_groups AS sg WHERE sg.room_id = ?
+            )""",
+            (room_id,),
         )
 
-        # ... and the state groups
         logger.info("[purge] removing %s from state_groups", room_id)
-
-        self.db_pool.simple_delete_many_txn(
+        self.db_pool.simple_delete_txn(
             txn,
             table="state_groups",
-            column="id",
-            values=state_groups_to_delete,
-            keyvalues={},
+            keyvalues={"room_id": room_id},
         )
diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py
index 95ed736451..99df591529 100644
--- a/tests/rest/admin/test_room.py
+++ b/tests/rest/admin/test_room.py
@@ -3050,7 +3050,7 @@ PURGE_TABLES = [
     "pusher_throttle",
     "room_account_data",
     "room_tags",
-    # "state_groups",  # Current impl leaves orphaned state groups around.
+    "state_groups",
     "state_groups_state",
     "federation_inbound_events_staging",
 ]
-- 
GitLab