diff --git a/changelog.d/18141.bugfix b/changelog.d/18141.bugfix
new file mode 100644
index 0000000000000000000000000000000000000000..7826a519fab65d54c7cef9557bb3e359b18e08f7
--- /dev/null
+++ b/changelog.d/18141.bugfix
@@ -0,0 +1 @@
+Fix regression in performance of sending events due to superfluous reads and locks. Introduced in v1.124.0rc1.
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index 5b746f2037a1e953afa6044b6ce45ce0d6c449a6..9e48e09270fa018ce1a8b3022cc8b6124be71c4f 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -359,6 +359,28 @@ class StateHandler:
                 await_full_state=False,
             )
 
+            # Ensure we still have the state groups we're relying on, and bump
+            # their usage time to avoid them being deleted from under us.
+            if entry.state_group:
+                missing_state_group = await self._state_deletion_store.check_state_groups_and_bump_deletion(
+                    {entry.state_group}
+                )
+                if missing_state_group:
+                    raise Exception(f"Missing state group: {entry.state_group}")
+            elif entry.prev_group:
+                # We only rely on the prev group when persisting the event if we
+                # don't have an `entry.state_group`.
+                missing_state_group = await self._state_deletion_store.check_state_groups_and_bump_deletion(
+                    {entry.prev_group}
+                )
+
+                if missing_state_group:
+                    # If we're missing the prev group then we can just clear the
+                    # entries, and rely on `entry._state` (which must exist if
+                    # `entry.state_group` is None)
+                    entry.prev_group = None
+                    entry.delta_ids = None
+
             state_group_before_event_prev_group = entry.prev_group
             deltas_to_state_group_before_event = entry.delta_ids
             state_ids_before_event = None
@@ -519,16 +541,6 @@ class StateHandler:
                 state_group_id
             )
 
-            if prev_group:
-                # Ensure that we still have the prev group, and ensure we don't
-                # delete it while we're persisting the event.
-                missing_state_group = await self._state_deletion_store.check_state_groups_and_bump_deletion(
-                    {prev_group}
-                )
-                if missing_state_group:
-                    prev_group = None
-                    delta_ids = None
-
             return _StateCacheEntry(
                 state=None,
                 state_group=state_group_id,
diff --git a/synapse/storage/databases/state/deletion.py b/synapse/storage/databases/state/deletion.py
index d0949261f2baccaf9adbfb7fdf8fa7084f88c5fb..d4b1c20a455fb3953b23a5bc3e1a57c5e0e1b6da 100644
--- a/synapse/storage/databases/state/deletion.py
+++ b/synapse/storage/databases/state/deletion.py
@@ -123,12 +123,28 @@ class StateDeletionDataStore:
             "check_state_groups_and_bump_deletion",
             self._check_state_groups_and_bump_deletion_txn,
             state_groups,
+            # We don't need to lock if we're just doing a quick check, as the
+            # lock doesn't prevent any races here.
+            lock=False,
         )
 
     def _check_state_groups_and_bump_deletion_txn(
-        self, txn: LoggingTransaction, state_groups: AbstractSet[int]
+        self, txn: LoggingTransaction, state_groups: AbstractSet[int], lock: bool = True
     ) -> Collection[int]:
-        existing_state_groups = self._get_existing_groups_with_lock(txn, state_groups)
+        """Checks to make sure that the state groups haven't been deleted, and
+        if they're pending deletion we delay it (allowing time for any event
+        that will use them to finish persisting).
+
+    The `lock` flag controls whether we should lock the `state_group` rows
+    we're checking, which we should do when storing new groups.
+
+        Returns:
+            The state groups that are missing, if any.
+        """
+
+        existing_state_groups = self._get_existing_groups_with_lock(
+            txn, state_groups, lock=lock
+        )
 
         self._bump_deletion_txn(txn, existing_state_groups)
 
@@ -188,18 +204,18 @@ class StateDeletionDataStore:
             )
 
     def _get_existing_groups_with_lock(
-        self, txn: LoggingTransaction, state_groups: Collection[int]
+        self, txn: LoggingTransaction, state_groups: Collection[int], lock: bool = True
     ) -> AbstractSet[int]:
         """Return which of the given state groups are in the database, and locks
         those rows with `KEY SHARE` to ensure they don't get concurrently
-        deleted."""
+        deleted (if `lock` is true)."""
         clause, args = make_in_list_sql_clause(self.db_pool.engine, "id", state_groups)
 
         sql = f"""
             SELECT id FROM state_groups
             WHERE {clause}
         """
-        if isinstance(self.db_pool.engine, PostgresEngine):
+        if lock and isinstance(self.db_pool.engine, PostgresEngine):
             # On postgres we add a row level lock to the rows to ensure that we
             # conflict with any concurrent DELETEs. `FOR KEY SHARE` lock will
             # not conflict with other read
diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py
index 3bb539bf87bd27a718c269ba180abeb6709d943c..5473a8e76983970df38040b6dbac9774da3db193 100644
--- a/tests/rest/client/test_rooms.py
+++ b/tests/rest/client/test_rooms.py
@@ -742,7 +742,7 @@ class RoomsCreateTestCase(RoomBase):
         self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
         self.assertTrue("room_id" in channel.json_body)
         assert channel.resource_usage is not None
-        self.assertEqual(36, channel.resource_usage.db_txn_count)
+        self.assertEqual(35, channel.resource_usage.db_txn_count)
 
     def test_post_room_initial_state(self) -> None:
         # POST with initial_state config key, expect new room id
@@ -755,7 +755,7 @@ class RoomsCreateTestCase(RoomBase):
         self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
         self.assertTrue("room_id" in channel.json_body)
         assert channel.resource_usage is not None
-        self.assertEqual(38, channel.resource_usage.db_txn_count)
+        self.assertEqual(37, channel.resource_usage.db_txn_count)
 
     def test_post_room_visibility_key(self) -> None:
         # POST with visibility config key, expect new room id