diff --git a/changelog.d/18024.bugfix b/changelog.d/18024.bugfix
new file mode 100644
index 0000000000000000000000000000000000000000..956f43f0362f38c2730ec1ac7901b22ecfc1a1fd
--- /dev/null
+++ b/changelog.d/18024.bugfix
@@ -0,0 +1 @@
+Properly purge state groups tables when purging a room with the admin API.
diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py
index e794b370c25c992333ca382b27a8796ea36d3bdd..15c04ffef891fbb8e1e55dcdc0128731c9b7df89 100644
--- a/synapse/storage/controllers/purge_events.py
+++ b/synapse/storage/controllers/purge_events.py
@@ -42,8 +42,8 @@ class PurgeEventsStorageController:
         """Deletes all record of a room"""
 
         with nested_logging_context(room_id):
-            state_groups_to_delete = await self.stores.main.purge_room(room_id)
-            await self.stores.state.purge_room_state(room_id, state_groups_to_delete)
+            await self.stores.main.purge_room(room_id)
+            await self.stores.state.purge_room_state(room_id)
 
     async def purge_history(
         self, room_id: str, token: str, delete_local_events: bool
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index c195685af8338df80637d4853c86a5072cd3ba93..ebdeb8fbd701329bc032f358d7575dcb778ea7c7 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -20,7 +20,7 @@
 #
 
 import logging
-from typing import Any, List, Set, Tuple, cast
+from typing import Any, Set, Tuple, cast
 
 from synapse.api.errors import SynapseError
 from synapse.storage.database import LoggingTransaction
@@ -332,7 +332,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
 
         return referenced_state_groups
 
-    async def purge_room(self, room_id: str) -> List[int]:
+    async def purge_room(self, room_id: str) -> None:
         """Deletes all record of a room
 
         Args:
@@ -348,7 +348,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
         # purge any of those rows which were added during the first.
 
         logger.info("[purge] Starting initial main purge of [1/2]")
-        state_groups_to_delete = await self.db_pool.runInteraction(
+        await self.db_pool.runInteraction(
             "purge_room",
             self._purge_room_txn,
             room_id=room_id,
@@ -356,18 +356,15 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
         )
 
         logger.info("[purge] Starting secondary main purge of [2/2]")
-        state_groups_to_delete.extend(
-            await self.db_pool.runInteraction(
-                "purge_room",
-                self._purge_room_txn,
-                room_id=room_id,
-            ),
+        await self.db_pool.runInteraction(
+            "purge_room",
+            self._purge_room_txn,
+            room_id=room_id,
         )
-        logger.info("[purge] Done with main purge")
 
-        return state_groups_to_delete
+        logger.info("[purge] Done with main purge")
 
-    def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]:
+    def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> None:
         # This collides with event persistence so we cannot write new events and metadata into
         # a room while deleting it or this transaction will fail.
         if isinstance(self.database_engine, PostgresEngine):
@@ -381,19 +378,6 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
             # take a while!
             txn.execute("SET LOCAL statement_timeout = 0")
 
-        # First, fetch all the state groups that should be deleted, before
-        # we delete that information.
-        txn.execute(
-            """
-                SELECT DISTINCT state_group FROM events
-                INNER JOIN event_to_state_groups USING(event_id)
-                WHERE events.room_id = ?
-            """,
-            (room_id,),
-        )
-
-        state_groups = [row[0] for row in txn]
-
         # Get all the auth chains that are referenced by events that are to be
         # deleted.
         txn.execute(
@@ -513,5 +497,3 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
         #       periodically anyway (https://github.com/matrix-org/synapse/issues/5888)
 
         self._invalidate_caches_for_room_and_stream(txn, room_id)
-
-        return state_groups
diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py
index f7a59c8992db96abb8466d2df9c0cab70401f81e..9944f90015ced2e0735a47f5db1fd03e0abe5477 100644
--- a/synapse/storage/databases/state/store.py
+++ b/synapse/storage/databases/state/store.py
@@ -840,60 +840,42 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
 
         return dict(rows)
 
-    async def purge_room_state(
-        self, room_id: str, state_groups_to_delete: Collection[int]
-    ) -> None:
-        """Deletes all record of a room from state tables
-
-        Args:
-            room_id:
-            state_groups_to_delete: State groups to delete
-        """
-
-        logger.info("[purge] Starting state purge")
-        await self.db_pool.runInteraction(
+    async def purge_room_state(self, room_id: str) -> None:
+        return await self.db_pool.runInteraction(
             "purge_room_state",
             self._purge_room_state_txn,
             room_id,
-            state_groups_to_delete,
         )
-        logger.info("[purge] Done with state purge")
 
     def _purge_room_state_txn(
         self,
         txn: LoggingTransaction,
         room_id: str,
-        state_groups_to_delete: Collection[int],
     ) -> None:
-        # first we have to delete the state groups states
-        logger.info("[purge] removing %s from state_groups_state", room_id)
-
-        self.db_pool.simple_delete_many_txn(
-            txn,
-            table="state_groups_state",
-            column="state_group",
-            values=state_groups_to_delete,
-            keyvalues={},
-        )
-
-        # ... and the state group edges
+        # Delete all edges that reference a state group linked to room_id
         logger.info("[purge] removing %s from state_group_edges", room_id)
+        txn.execute(
+            """
+            DELETE FROM state_group_edges AS sge WHERE sge.state_group IN (
+                SELECT id FROM state_groups AS sg WHERE sg.room_id = ?
+            )""",
+            (room_id,),
+        )
 
-        self.db_pool.simple_delete_many_txn(
-            txn,
-            table="state_group_edges",
-            column="state_group",
-            values=state_groups_to_delete,
-            keyvalues={},
+        # state_groups_state table has a room_id column but no index on it, unlike state_groups,
+        # so we delete them by matching the room_id through the state_groups table.
+        logger.info("[purge] removing %s from state_groups_state", room_id)
+        txn.execute(
+            """
+            DELETE FROM state_groups_state AS sgs WHERE sgs.state_group IN (
+                SELECT id FROM state_groups AS sg WHERE sg.room_id = ?
+            )""",
+            (room_id,),
         )
 
-        # ... and the state groups
         logger.info("[purge] removing %s from state_groups", room_id)
-
-        self.db_pool.simple_delete_many_txn(
+        self.db_pool.simple_delete_txn(
             txn,
             table="state_groups",
-            column="id",
-            values=state_groups_to_delete,
-            keyvalues={},
+            keyvalues={"room_id": room_id},
         )
diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py
index 95ed73645107bd8c882bfad8205b77177fb2f18d..99df5915290789c2f0a15e03d665149c798691ec 100644
--- a/tests/rest/admin/test_room.py
+++ b/tests/rest/admin/test_room.py
@@ -3050,7 +3050,7 @@ PURGE_TABLES = [
     "pusher_throttle",
     "room_account_data",
     "room_tags",
-    # "state_groups",  # Current impl leaves orphaned state groups around.
+    "state_groups",
     "state_groups_state",
     "federation_inbound_events_staging",
 ]