From 5121f9210c989fcc909e78195133876dff3bc9b9 Mon Sep 17 00:00:00 2001
From: Devon Hudson <devon.dmytro@gmail.com>
Date: Tue, 25 Feb 2025 16:25:39 +0000
Subject: [PATCH] Add background job to clear unreferenced state groups
 (#18154)

Fixes #18150

### Pull Request Checklist

<!-- Please read
https://element-hq.github.io/synapse/latest/development/contributing_guide.html
before submitting your pull request -->

* [X] Pull request is based on the develop branch
* [x] Pull request includes a [changelog
file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog).
The entry should:
- Be a short description of your change which makes sense to users.
"Fixed a bug that prevented receiving messages from other servers."
instead of "Moved X method from `EventStore` to `EventWorkerStore`.".
  - Use markdown where necessary, mostly for `code blocks`.
  - End with either a period (.) or an exclamation mark (!).
  - Start with a capital letter.
- Feel free to credit yourself, by adding a sentence "Contributed by
@github_username." or "Contributed by [Your Name]." to the end of the
entry.
* [X] [Code
style](https://element-hq.github.io/synapse/latest/code_style.html) is
correct
(run the
[linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters))

---------

Co-authored-by: Erik Johnston <erikj@element.io>
---
 changelog.d/18154.feature                     |   1 +
 docs/development/database_schema.md           |   2 +-
 synapse/_scripts/synapse_port_db.py           |  30 +++
 synapse/storage/controllers/purge_events.py   | 246 +++++++++++++-----
 synapse/storage/databases/state/bg_updates.py |  10 +-
 synapse/storage/databases/state/deletion.py   |  42 ++-
 synapse/storage/schema/__init__.py            |   1 +
 .../02_delete_unreferenced_state_groups.sql   |  16 ++
 synapse/types/storage/__init__.py             |   4 +
 tests/storage/test_purge.py                   |  97 +++++++
 10 files changed, 375 insertions(+), 74 deletions(-)
 create mode 100644 changelog.d/18154.feature
 create mode 100644 synapse/storage/schema/state/delta/89/02_delete_unreferenced_state_groups.sql

diff --git a/changelog.d/18154.feature b/changelog.d/18154.feature
new file mode 100644
index 0000000000..62e1b79a15
--- /dev/null
+++ b/changelog.d/18154.feature
@@ -0,0 +1 @@
+Add background job to clear unreferenced state groups.
diff --git a/docs/development/database_schema.md b/docs/development/database_schema.md
index 37a06acc12..620d1c16b0 100644
--- a/docs/development/database_schema.md
+++ b/docs/development/database_schema.md
@@ -162,7 +162,7 @@ by a unique name, the current status (stored in JSON), and some dependency infor
 * Whether the update requires a previous update to be complete.
 * A rough ordering for which to complete updates.
 
-A new background updates needs to be added to the `background_updates` table:
+A new background update needs to be added to the `background_updates` table:
 
 ```sql
 INSERT INTO background_updates (ordering, update_name, depends_on, progress_json) VALUES
diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py
index 3f67a739a0..59065a0504 100755
--- a/synapse/_scripts/synapse_port_db.py
+++ b/synapse/_scripts/synapse_port_db.py
@@ -191,6 +191,11 @@ APPEND_ONLY_TABLES = [
 
 
 IGNORED_TABLES = {
+    # Porting the auto generated sequence in this table is non-trivial.
+    # None of the entries in this list are mandatory for Synapse to keep working.
+    # If state group disk space is an issue after the port, the
+    # `delete_unreferenced_state_groups_bg_update` background task can be run again.
+    "state_groups_pending_deletion",
     # We don't port these tables, as they're a faff and we can regenerate
     # them anyway.
     "user_directory",
@@ -216,6 +221,15 @@ IGNORED_TABLES = {
 }
 
 
+# These background updates will not be applied upon creation of the postgres database.
+IGNORED_BACKGROUND_UPDATES = {
+    # Reapplying this background update to the postgres database is unnecessary after
+    # already having waited for the SQLite database to complete all running background
+    # updates.
+    "delete_unreferenced_state_groups_bg_update",
+}
+
+
 # Error returned by the run function. Used at the top-level part of the script to
 # handle errors and return codes.
 end_error: Optional[str] = None
@@ -687,6 +701,20 @@ class Porter:
         # 0 means off. 1 means full. 2 means incremental.
         return autovacuum_setting != 0
 
+    async def remove_ignored_background_updates_from_database(self) -> None:
+        def _remove_delete_unreferenced_state_groups_bg_updates(
+            txn: LoggingTransaction,
+        ) -> None:
+            txn.execute(
+                "DELETE FROM background_updates WHERE update_name = ANY(?)",
+                (list(IGNORED_BACKGROUND_UPDATES),),
+            )
+
+        await self.postgres_store.db_pool.runInteraction(
+            "remove_delete_unreferenced_state_groups_bg_updates",
+            _remove_delete_unreferenced_state_groups_bg_updates,
+        )
+
     async def run(self) -> None:
         """Ports the SQLite database to a PostgreSQL database.
 
@@ -732,6 +760,8 @@ class Porter:
                 self.hs_config.database.get_single_database()
             )
 
+            await self.remove_ignored_background_updates_from_database()
+
             await self.run_background_updates_on_postgres()
 
             self.progress.set_state("Creating port tables")
diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py
index 47cec8c469..ef30bf2895 100644
--- a/synapse/storage/controllers/purge_events.py
+++ b/synapse/storage/controllers/purge_events.py
@@ -21,11 +21,18 @@
 
 import itertools
 import logging
-from typing import TYPE_CHECKING, Collection, Mapping, Set
+from typing import (
+    TYPE_CHECKING,
+    Collection,
+    Mapping,
+    Set,
+)
 
 from synapse.logging.context import nested_logging_context
 from synapse.metrics.background_process_metrics import wrap_as_background_process
+from synapse.storage.database import LoggingTransaction
 from synapse.storage.databases import Databases
+from synapse.types.storage import _BackgroundUpdates
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -44,6 +51,11 @@ class PurgeEventsStorageController:
                 self._delete_state_groups_loop, 60 * 1000
             )
 
+        self.stores.state.db_pool.updates.register_background_update_handler(
+            _BackgroundUpdates.DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE,
+            self._background_delete_unrefereneced_state_groups,
+        )
+
     async def purge_room(self, room_id: str) -> None:
         """Deletes all record of a room"""
 
@@ -80,68 +92,6 @@ class PurgeEventsStorageController:
                 sg_to_delete
             )
 
-    async def _find_unreferenced_groups(
-        self, state_groups: Collection[int]
-    ) -> Set[int]:
-        """Used when purging history to figure out which state groups can be
-        deleted.
-
-        Args:
-            state_groups: Set of state groups referenced by events
-                that are going to be deleted.
-
-        Returns:
-            The set of state groups that can be deleted.
-        """
-        # Set of events that we have found to be referenced by events
-        referenced_groups = set()
-
-        # Set of state groups we've already seen
-        state_groups_seen = set(state_groups)
-
-        # Set of state groups to handle next.
-        next_to_search = set(state_groups)
-        while next_to_search:
-            # We bound size of groups we're looking up at once, to stop the
-            # SQL query getting too big
-            if len(next_to_search) < 100:
-                current_search = next_to_search
-                next_to_search = set()
-            else:
-                current_search = set(itertools.islice(next_to_search, 100))
-                next_to_search -= current_search
-
-            referenced = await self.stores.main.get_referenced_state_groups(
-                current_search
-            )
-            referenced_groups |= referenced
-
-            # We don't continue iterating up the state group graphs for state
-            # groups that are referenced.
-            current_search -= referenced
-
-            edges = await self.stores.state.get_previous_state_groups(current_search)
-
-            prevs = set(edges.values())
-            # We don't bother re-handling groups we've already seen
-            prevs -= state_groups_seen
-            next_to_search |= prevs
-            state_groups_seen |= prevs
-
-            # We also check to see if anything referencing the state groups are
-            # also unreferenced. This helps ensure that we delete unreferenced
-            # state groups, if we don't then we will de-delta them when we
-            # delete the other state groups leading to increased DB usage.
-            next_edges = await self.stores.state.get_next_state_groups(current_search)
-            nexts = set(next_edges.keys())
-            nexts -= state_groups_seen
-            next_to_search |= nexts
-            state_groups_seen |= nexts
-
-        to_delete = state_groups_seen - referenced_groups
-
-        return to_delete
-
     @wrap_as_background_process("_delete_state_groups_loop")
     async def _delete_state_groups_loop(self) -> None:
         """Background task that deletes any state groups that may be pending
@@ -203,3 +153,173 @@ class PurgeEventsStorageController:
             room_id,
             groups_to_sequences,
         )
+
+    async def _background_delete_unrefereneced_state_groups(
+        self, progress: dict, batch_size: int
+    ) -> int:
+        """This background update will slowly delete any unreferenced state groups"""
+
+        last_checked_state_group = progress.get("last_checked_state_group")
+        max_state_group = progress.get("max_state_group")
+
+        if last_checked_state_group is None or max_state_group is None:
+            # This is the first run.
+            last_checked_state_group = 0
+
+            max_state_group = await self.stores.state.db_pool.simple_select_one_onecol(
+                table="state_groups",
+                keyvalues={},
+                retcol="MAX(id)",
+                allow_none=True,
+                desc="get_max_state_group",
+            )
+            if max_state_group is None:
+                # There are no state groups so the background process is finished.
+                await self.stores.state.db_pool.updates._end_background_update(
+                    _BackgroundUpdates.DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE
+                )
+                return batch_size
+
+        (
+            last_checked_state_group,
+            final_batch,
+        ) = await self._delete_unreferenced_state_groups_batch(
+            last_checked_state_group, batch_size, max_state_group
+        )
+
+        if not final_batch:
+            # There are more state groups to check.
+            progress = {
+                "last_checked_state_group": last_checked_state_group,
+                "max_state_group": max_state_group,
+            }
+            await self.stores.state.db_pool.updates._background_update_progress(
+                _BackgroundUpdates.DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE,
+                progress,
+            )
+        else:
+            # This background process is finished.
+            await self.stores.state.db_pool.updates._end_background_update(
+                _BackgroundUpdates.DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE
+            )
+
+        return batch_size
+
+    async def _delete_unreferenced_state_groups_batch(
+        self,
+        last_checked_state_group: int,
+        batch_size: int,
+        max_state_group: int,
+    ) -> tuple[int, bool]:
+        """Looks for unreferenced state groups starting from the last state group
+        checked, and any state groups which would become unreferenced if a state group
+        was deleted, and marks them for deletion.
+
+        Args:
+            last_checked_state_group: The last state group that was checked.
+            batch_size: How many state groups to process in this iteration.
+
+        Returns:
+            (last_checked_state_group, final_batch)
+        """
+
+        # Look for state groups that can be cleaned up.
+        def get_next_state_groups_txn(txn: LoggingTransaction) -> Set[int]:
+            state_group_sql = "SELECT id FROM state_groups WHERE ? < id AND id <= ? ORDER BY id LIMIT ?"
+            txn.execute(
+                state_group_sql, (last_checked_state_group, max_state_group, batch_size)
+            )
+
+            next_set = {row[0] for row in txn}
+
+            return next_set
+
+        next_set = await self.stores.state.db_pool.runInteraction(
+            "get_next_state_groups", get_next_state_groups_txn
+        )
+
+        final_batch = False
+        if len(next_set) < batch_size:
+            final_batch = True
+        else:
+            last_checked_state_group = max(next_set)
+
+        if len(next_set) == 0:
+            return last_checked_state_group, final_batch
+
+        # Find all state groups that can be deleted if the original set is deleted.
+        # This set includes the original set, as well as any state groups that would
+        # become unreferenced upon deleting the original set.
+        to_delete = await self._find_unreferenced_groups(next_set)
+
+        if len(to_delete) == 0:
+            return last_checked_state_group, final_batch
+
+        await self.stores.state_deletion.mark_state_groups_as_pending_deletion(
+            to_delete
+        )
+
+        return last_checked_state_group, final_batch
+
+    async def _find_unreferenced_groups(
+        self,
+        state_groups: Collection[int],
+    ) -> Set[int]:
+        """Used when purging history to figure out which state groups can be
+        deleted.
+
+        Args:
+            state_groups: Set of state groups referenced by events
+                that are going to be deleted.
+
+        Returns:
+            The set of state groups that can be deleted.
+        """
+        # Set of state groups that we have found to be referenced by events
+        referenced_groups = set()
+
+        # Set of state groups we've already seen
+        state_groups_seen = set(state_groups)
+
+        # Set of state groups to handle next.
+        next_to_search = set(state_groups)
+        while next_to_search:
+            # We bound size of groups we're looking up at once, to stop the
+            # SQL query getting too big
+            if len(next_to_search) < 100:
+                current_search = next_to_search
+                next_to_search = set()
+            else:
+                current_search = set(itertools.islice(next_to_search, 100))
+                next_to_search -= current_search
+
+            referenced = await self.stores.main.get_referenced_state_groups(
+                current_search
+            )
+            referenced_groups |= referenced
+
+            # We don't continue iterating up the state group graphs for state
+            # groups that are referenced.
+            current_search -= referenced
+
+            edges = await self.stores.state.get_previous_state_groups(current_search)
+
+            prevs = set(edges.values())
+            # We don't bother re-handling groups we've already seen
+            prevs -= state_groups_seen
+            next_to_search |= prevs
+            state_groups_seen |= prevs
+
+            # We also check whether anything referencing these state groups is
+            # itself unreferenced. This helps ensure that we delete unreferenced
+            # state groups; if we don't, we will de-delta them when we delete
+            # the other state groups, leading to increased DB usage.
+            next_edges = await self.stores.state.get_next_state_groups(current_search)
+            nexts = set(next_edges.keys())
+            nexts -= state_groups_seen
+            next_to_search |= nexts
+            state_groups_seen |= nexts
+
+        to_delete = state_groups_seen - referenced_groups
+
+        return to_delete
diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py
index f7824cba0f..95fd0ae73a 100644
--- a/synapse/storage/databases/state/bg_updates.py
+++ b/synapse/storage/databases/state/bg_updates.py
@@ -20,7 +20,15 @@
 #
 
 import logging
-from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Tuple, Union
+from typing import (
+    TYPE_CHECKING,
+    Dict,
+    List,
+    Mapping,
+    Optional,
+    Tuple,
+    Union,
+)
 
 from synapse.logging.opentracing import tag_args, trace
 from synapse.storage._base import SQLBaseStore
diff --git a/synapse/storage/databases/state/deletion.py b/synapse/storage/databases/state/deletion.py
index d4b1c20a45..f77c46f6ae 100644
--- a/synapse/storage/databases/state/deletion.py
+++ b/synapse/storage/databases/state/deletion.py
@@ -321,19 +321,43 @@ class StateDeletionDataStore:
     async def mark_state_groups_as_pending_deletion(
         self, state_groups: Collection[int]
     ) -> None:
-        """Mark the given state groups as pending deletion"""
+        """Mark the given state groups as pending deletion.
 
-        now = self._clock.time_msec()
+        If any of the state groups are already pending deletion, then those records are
+        left as is.
+        """
 
-        await self.db_pool.simple_upsert_many(
-            table="state_groups_pending_deletion",
-            key_names=("state_group",),
-            key_values=[(state_group,) for state_group in state_groups],
-            value_names=("insertion_ts",),
-            value_values=[(now,) for _ in state_groups],
-            desc="mark_state_groups_as_pending_deletion",
+        await self.db_pool.runInteraction(
+            "mark_state_groups_as_pending_deletion",
+            self._mark_state_groups_as_pending_deletion_txn,
+            state_groups,
         )
 
+    def _mark_state_groups_as_pending_deletion_txn(
+        self,
+        txn: LoggingTransaction,
+        state_groups: Collection[int],
+    ) -> None:
+        sql = """
+        INSERT INTO state_groups_pending_deletion (state_group, insertion_ts)
+        VALUES %s
+        ON CONFLICT (state_group)
+        DO NOTHING
+        """
+
+        now = self._clock.time_msec()
+        rows = [
+            (
+                state_group,
+                now,
+            )
+            for state_group in state_groups
+        ]
+        if isinstance(txn.database_engine, PostgresEngine):
+            txn.execute_values(sql % ("?",), rows, fetch=False)
+        else:
+            txn.execute_batch(sql % ("(?, ?)",), rows)
+
     async def mark_state_groups_as_used(self, state_groups: Collection[int]) -> None:
         """Mark the given state groups as now being referenced"""
 
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 49e648a92f..c90c2c6051 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -158,6 +158,7 @@ Changes in SCHEMA_VERSION = 88
 
 Changes in SCHEMA_VERSION = 89
     - Add `state_groups_pending_deletion` and `state_groups_persisting` tables.
+    - Add background update to delete unreferenced state groups.
 """
 
 
diff --git a/synapse/storage/schema/state/delta/89/02_delete_unreferenced_state_groups.sql b/synapse/storage/schema/state/delta/89/02_delete_unreferenced_state_groups.sql
new file mode 100644
index 0000000000..184dc8564c
--- /dev/null
+++ b/synapse/storage/schema/state/delta/89/02_delete_unreferenced_state_groups.sql
@@ -0,0 +1,16 @@
+--
+-- This file is licensed under the Affero General Public License (AGPL) version 3.
+--
+-- Copyright (C) 2025 New Vector, Ltd
+--
+-- This program is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU Affero General Public License as
+-- published by the Free Software Foundation, either version 3 of the
+-- License, or (at your option) any later version.
+--
+-- See the GNU Affero General Public License for more details:
+-- <https://www.gnu.org/licenses/agpl-3.0.html>.
+
+-- Add a background update to delete any unreferenced state groups
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+  (8902, 'delete_unreferenced_state_groups_bg_update', '{}');
diff --git a/synapse/types/storage/__init__.py b/synapse/types/storage/__init__.py
index b5fa20a41a..d0a85ef208 100644
--- a/synapse/types/storage/__init__.py
+++ b/synapse/types/storage/__init__.py
@@ -48,3 +48,7 @@ class _BackgroundUpdates:
     SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_FIX_FORGOTTEN_COLUMN_BG_UPDATE = (
         "sliding_sync_membership_snapshots_fix_forgotten_column_bg_update"
     )
+
+    DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE = (
+        "delete_unreferenced_state_groups_bg_update"
+    )
diff --git a/tests/storage/test_purge.py b/tests/storage/test_purge.py
index 916e42e731..ecdc893405 100644
--- a/tests/storage/test_purge.py
+++ b/tests/storage/test_purge.py
@@ -24,6 +24,7 @@ from synapse.api.errors import NotFoundError, SynapseError
 from synapse.rest.client import room
 from synapse.server import HomeServer
 from synapse.types.state import StateFilter
+from synapse.types.storage import _BackgroundUpdates
 from synapse.util import Clock
 
 from tests.unittest import HomeserverTestCase
@@ -303,3 +304,99 @@ class PurgeTests(HomeserverTestCase):
             )
         )
         self.assertEqual(len(state_groups), 1)
+
+    def test_clear_unreferenced_state_groups(self) -> None:
+        """Test that any unreferenced state groups are automatically cleaned up."""
+
+        self.helper.send(self.room_id, body="test1")
+        state1 = self.helper.send_state(
+            self.room_id, "org.matrix.test", body={"number": 2}
+        )
+        # Create enough state events to require multiple batches of
+        # delete_unreferenced_state_groups_bg_update to be run.
+        for i in range(200):
+            self.helper.send_state(self.room_id, "org.matrix.test", body={"number": i})
+        state2 = self.helper.send_state(
+            self.room_id, "org.matrix.test", body={"number": 3}
+        )
+        self.helper.send(self.room_id, body="test4")
+        last = self.helper.send(self.room_id, body="test5")
+
+        # Create an unreferenced state group that has a prev group of one of the
+        # to-be-purged events.
+        prev_group = self.get_success(
+            self.store._get_state_group_for_event(state1["event_id"])
+        )
+        unreferenced_state_group = self.get_success(
+            self.state_store.store_state_group(
+                event_id=last["event_id"],
+                room_id=self.room_id,
+                prev_group=prev_group,
+                delta_ids={("org.matrix.test", ""): state2["event_id"]},
+                current_state_ids=None,
+            )
+        )
+
+        another_unreferenced_state_group = self.get_success(
+            self.state_store.store_state_group(
+                event_id=last["event_id"],
+                room_id=self.room_id,
+                prev_group=unreferenced_state_group,
+                delta_ids={("org.matrix.test", ""): state2["event_id"]},
+                current_state_ids=None,
+            )
+        )
+
+        # Insert and run the background update.
+        self.get_success(
+            self.store.db_pool.simple_insert(
+                "background_updates",
+                {
+                    "update_name": _BackgroundUpdates.DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE,
+                    "progress_json": "{}",
+                },
+            )
+        )
+        self.store.db_pool.updates._all_done = False
+        self.wait_for_background_updates()
+
+        # Advance so that the background job to delete the state groups runs
+        self.reactor.advance(
+            1 + self.state_deletion_store.DELAY_BEFORE_DELETION_MS / 1000
+        )
+
+        # We expect that the unreferenced state group has been deleted.
+        row = self.get_success(
+            self.state_store.db_pool.simple_select_one_onecol(
+                table="state_groups",
+                keyvalues={"id": unreferenced_state_group},
+                retcol="id",
+                allow_none=True,
+                desc="test_purge_unreferenced_state_group",
+            )
+        )
+        self.assertIsNone(row)
+
+        # We expect that the other unreferenced state group has also been deleted.
+        row = self.get_success(
+            self.state_store.db_pool.simple_select_one_onecol(
+                table="state_groups",
+                keyvalues={"id": another_unreferenced_state_group},
+                retcol="id",
+                allow_none=True,
+                desc="test_purge_unreferenced_state_group",
+            )
+        )
+        self.assertIsNone(row)
+
+        # We expect there to now only be one state group for the room, which is
+        # the state group of the last event (as the only outlier).
+        state_groups = self.get_success(
+            self.state_store.db_pool.simple_select_onecol(
+                table="state_groups",
+                keyvalues={"room_id": self.room_id},
+                retcol="id",
+                desc="test_purge_unreferenced_state_group",
+            )
+        )
+        self.assertEqual(len(state_groups), 207)
-- 
GitLab