diff --git a/changelog.d/17693.misc b/changelog.d/17693.misc
new file mode 100644
index 0000000000000000000000000000000000000000..0d20c8091641d83ecf9ae132d22fbfbbf28e835e
--- /dev/null
+++ b/changelog.d/17693.misc
@@ -0,0 +1 @@
+Use Sliding Sync tables as a bulk shortcut for getting the max `event_stream_ordering` of rooms.
diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py
index 165b15c60f5ac5c1ce3b744159e0b9a0f8b8c3ae..652d05dbe98f5e7b111392cab5f14639c135c681 100644
--- a/synapse/handlers/sliding_sync/room_lists.py
+++ b/synapse/handlers/sliding_sync/room_lists.py
@@ -27,6 +27,7 @@ from typing import (
     Set,
     Tuple,
     Union,
+    cast,
 )
 
 import attr
@@ -355,11 +356,18 @@ class SlidingSyncRoomLists:
                     if list_config.ranges:
                         if list_config.ranges == [(0, len(filtered_sync_room_map) - 1)]:
                             # If we are asking for the full range, we don't need to sort the list.
-                            sorted_room_info = list(filtered_sync_room_map.values())
+                            sorted_room_info: List[RoomsForUserType] = list(
+                                filtered_sync_room_map.values()
+                            )
                         else:
                             # Sort the list
-                            sorted_room_info = await self.sort_rooms_using_tables(
-                                filtered_sync_room_map, to_token
+                            sorted_room_info = await self.sort_rooms(
+                                # Cast is safe because `RoomsForUserSlidingSync` is a
+                                # member of the `RoomsForUserType` union; mypy can't infer this here.
+                                cast(
+                                    Dict[str, RoomsForUserType], filtered_sync_room_map
+                                ),
+                                to_token,
                             )
 
                         for range in list_config.ranges:
@@ -1762,63 +1770,6 @@ class SlidingSyncRoomLists:
         # Assemble a new sync room map but only with the `filtered_room_id_set`
         return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set}
 
-    @trace
-    async def sort_rooms_using_tables(
-        self,
-        sync_room_map: Mapping[str, RoomsForUserSlidingSync],
-        to_token: StreamToken,
-    ) -> List[RoomsForUserSlidingSync]:
-        """
-        Sort by `stream_ordering` of the last event that the user should see in the
-        room. `stream_ordering` is unique so we get a stable sort.
-
-        Args:
-            sync_room_map: Dictionary of room IDs to sort along with membership
-                information in the room at the time of `to_token`.
-            to_token: We sort based on the events in the room at this token (<= `to_token`)
-
-        Returns:
-            A sorted list of room IDs by `stream_ordering` along with membership information.
-        """
-
-        # Assemble a map of room ID to the `stream_ordering` of the last activity that the
-        # user should see in the room (<= `to_token`)
-        last_activity_in_room_map: Dict[str, int] = {}
-
-        for room_id, room_for_user in sync_room_map.items():
-            if room_for_user.membership != Membership.JOIN:
-                # If the user has left/been invited/knocked/been banned from a
-                # room, they shouldn't see anything past that point.
-                #
-                # FIXME: It's possible that people should see beyond this point
-                # in invited/knocked cases if for example the room has
-                # `invite`/`world_readable` history visibility, see
-                # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
-                last_activity_in_room_map[room_id] = room_for_user.event_pos.stream
-
-        # For fully-joined rooms, we find the latest activity at/before the
-        # `to_token`.
-        joined_room_positions = (
-            await self.store.bulk_get_last_event_pos_in_room_before_stream_ordering(
-                [
-                    room_id
-                    for room_id, room_for_user in sync_room_map.items()
-                    if room_for_user.membership == Membership.JOIN
-                ],
-                to_token.room_key,
-            )
-        )
-
-        last_activity_in_room_map.update(joined_room_positions)
-
-        return sorted(
-            sync_room_map.values(),
-            # Sort by the last activity (stream_ordering) in the room
-            key=lambda room_info: last_activity_in_room_map[room_info.room_id],
-            # We want descending order
-            reverse=True,
-        )
-
     @trace
     async def sort_rooms(
         self,
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index 743200471b60da7bd86a7b9bd0d65e5e8a6d573c..a8723f94bc869c5dc77108be7d7f6673c0157723 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -52,6 +52,7 @@ from synapse.storage.types import Cursor
 from synapse.types import JsonDict, RoomStreamToken, StateMap, StrCollection
 from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
 from synapse.types.state import StateFilter
+from synapse.types.storage import _BackgroundUpdates
 from synapse.util import json_encoder
 from synapse.util.iterutils import batch_iter
 
@@ -76,34 +77,6 @@ _REPLACE_STREAM_ORDERING_SQL_COMMANDS = (
 )
 
 
-class _BackgroundUpdates:
-    EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
-    EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
-    DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities"
-    POPULATE_STREAM_ORDERING2 = "populate_stream_ordering2"
-    INDEX_STREAM_ORDERING2 = "index_stream_ordering2"
-    INDEX_STREAM_ORDERING2_CONTAINS_URL = "index_stream_ordering2_contains_url"
-    INDEX_STREAM_ORDERING2_ROOM_ORDER = "index_stream_ordering2_room_order"
-    INDEX_STREAM_ORDERING2_ROOM_STREAM = "index_stream_ordering2_room_stream"
-    INDEX_STREAM_ORDERING2_TS = "index_stream_ordering2_ts"
-    REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column"
-
-    EVENT_EDGES_DROP_INVALID_ROWS = "event_edges_drop_invalid_rows"
-    EVENT_EDGES_REPLACE_INDEX = "event_edges_replace_index"
-
-    EVENTS_POPULATE_STATE_KEY_REJECTIONS = "events_populate_state_key_rejections"
-
-    EVENTS_JUMP_TO_DATE_INDEX = "events_jump_to_date_index"
-
-    SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE = (
-        "sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update"
-    )
-    SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE = "sliding_sync_joined_rooms_bg_update"
-    SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE = (
-        "sliding_sync_membership_snapshots_bg_update"
-    )
-
-
 @attr.s(slots=True, frozen=True, auto_attribs=True)
 class _CalculateChainCover:
     """Return value for _calculate_chain_cover_txn."""
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index b188f32927d8640f0cd1a02518c6f0f1a2973148..029f4bd87de093709012a68099ef16bf4840bef9 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -83,6 +83,7 @@ from synapse.storage.util.id_generators import (
 from synapse.storage.util.sequence import build_sequence_generator
 from synapse.types import JsonDict, get_domain_from_id
 from synapse.types.state import StateFilter
+from synapse.types.storage import _BackgroundUpdates
 from synapse.util import unwrapFirstError
 from synapse.util.async_helpers import ObservableDeferred, delay_cancellation
 from synapse.util.caches.descriptors import cached, cachedList
@@ -2465,3 +2466,14 @@ class EventsWorkerStore(SQLBaseStore):
         )
 
         self.invalidate_get_event_cache_after_txn(txn, event_id)
+
+    async def have_finished_sliding_sync_background_jobs(self) -> bool:
+        """Return whether the sliding sync background updates have finished, i.e. the sliding sync tables are safe to use."""
+
+        return await self.db_pool.updates.have_completed_background_updates(
+            (
+                _BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE,
+                _BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE,
+                _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE,
+            )
+        )
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 1fc2d7ba1e8a165a5435f49d9f057703665ebbd2..8bfa6254f3df7446c2aef629f282fc1ff1631bef 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -51,7 +51,6 @@ from synapse.storage.database import (
     LoggingTransaction,
 )
 from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
-from synapse.storage.databases.main.events_bg_updates import _BackgroundUpdates
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.engines import Sqlite3Engine
 from synapse.storage.roommember import (
@@ -1449,17 +1448,6 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
             get_sliding_sync_rooms_for_user_txn,
         )
 
-    async def have_finished_sliding_sync_background_jobs(self) -> bool:
-        """Return if it's safe to use the sliding sync membership tables."""
-
-        return await self.db_pool.updates.have_completed_background_updates(
-            (
-                _BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE,
-                _BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE,
-                _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE,
-            )
-        )
-
 
 class RoomMemberBackgroundUpdateStore(SQLBaseStore):
     def __init__(
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 459436e304e7ab17012c6d6858ca17e131edea78..94a7efee7355c18c9ed8639fc04bb24063304472 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -1524,7 +1524,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         # majority of rooms will have a latest token from before the min stream
         # pos.
 
-        def bulk_get_max_event_pos_txn(
+        def bulk_get_max_event_pos_fallback_txn(
             txn: LoggingTransaction, batched_room_ids: StrCollection
         ) -> Dict[str, int]:
             clause, args = make_in_list_sql_clause(
@@ -1547,11 +1547,37 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             txn.execute(sql, [max_pos] + args)
             return {row[0]: row[1] for row in txn}
 
+        # Fast path: `sliding_sync_joined_rooms` already stores the max
+        # `event_stream_ordering` per room, avoiding the fallback's joins and sub-queries.
+        def bulk_get_max_event_pos_from_sliding_sync_tables_txn(
+            txn: LoggingTransaction, batched_room_ids: StrCollection
+        ) -> Dict[str, int]:
+            clause, args = make_in_list_sql_clause(
+                self.database_engine, "room_id", batched_room_ids
+            )
+            sql = f"""
+                SELECT room_id, event_stream_ordering
+                FROM sliding_sync_joined_rooms
+                WHERE {clause}
+                ORDER BY event_stream_ordering DESC
+            """
+            txn.execute(sql, args)
+            return {row[0]: row[1] for row in txn}
+
         recheck_rooms: Set[str] = set()
         for batched in batch_iter(room_ids, 1000):
-            batch_results = await self.db_pool.runInteraction(
-                "_bulk_get_max_event_pos", bulk_get_max_event_pos_txn, batched
-            )
+            if await self.have_finished_sliding_sync_background_jobs():
+                batch_results = await self.db_pool.runInteraction(
+                    "bulk_get_max_event_pos_from_sliding_sync_tables_txn",
+                    bulk_get_max_event_pos_from_sliding_sync_tables_txn,
+                    batched,
+                )
+            else:
+                batch_results = await self.db_pool.runInteraction(
+                    "bulk_get_max_event_pos_fallback_txn",
+                    bulk_get_max_event_pos_fallback_txn,
+                    batched,
+                )
             for room_id, stream_ordering in batch_results.items():
                 if stream_ordering <= now_token.stream:
                     results.update(batch_results)
diff --git a/synapse/types/storage/__init__.py b/synapse/types/storage/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..fae5449bcc35de3e74d789f506ad37f99063efc7
--- /dev/null
+++ b/synapse/types/storage/__init__.py
@@ -0,0 +1,47 @@
+#
+# This file is licensed under the Affero General Public License (AGPL) version 3.
+#
+# Copyright (C) 2024 New Vector, Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# See the GNU Affero General Public License for more details:
+# <https://www.gnu.org/licenses/agpl-3.0.html>.
+#
+# Originally licensed under the Apache License, Version 2.0:
+# <http://www.apache.org/licenses/LICENSE-2.0>.
+#
+# [This file includes modifications made by New Vector Limited]
+#
+#
+
+
+class _BackgroundUpdates:
+    EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
+    EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
+    DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities"
+    POPULATE_STREAM_ORDERING2 = "populate_stream_ordering2"
+    INDEX_STREAM_ORDERING2 = "index_stream_ordering2"
+    INDEX_STREAM_ORDERING2_CONTAINS_URL = "index_stream_ordering2_contains_url"
+    INDEX_STREAM_ORDERING2_ROOM_ORDER = "index_stream_ordering2_room_order"
+    INDEX_STREAM_ORDERING2_ROOM_STREAM = "index_stream_ordering2_room_stream"
+    INDEX_STREAM_ORDERING2_TS = "index_stream_ordering2_ts"
+    REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column"
+
+    EVENT_EDGES_DROP_INVALID_ROWS = "event_edges_drop_invalid_rows"
+    EVENT_EDGES_REPLACE_INDEX = "event_edges_replace_index"
+
+    EVENTS_POPULATE_STATE_KEY_REJECTIONS = "events_populate_state_key_rejections"
+
+    EVENTS_JUMP_TO_DATE_INDEX = "events_jump_to_date_index"
+
+    SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE = (
+        "sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update"
+    )
+    SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE = "sliding_sync_joined_rooms_bg_update"
+    SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE = (
+        "sliding_sync_membership_snapshots_bg_update"
+    )
diff --git a/tests/storage/test_sliding_sync_tables.py b/tests/storage/test_sliding_sync_tables.py
index 61dccc8077dff57c41b3e54867a016234dd652ad..35917505a429b562703d86f62f986aba9f41214d 100644
--- a/tests/storage/test_sliding_sync_tables.py
+++ b/tests/storage/test_sliding_sync_tables.py
@@ -34,11 +34,11 @@ from synapse.rest.client import login, room
 from synapse.server import HomeServer
 from synapse.storage.databases.main.events import DeltaState
 from synapse.storage.databases.main.events_bg_updates import (
-    _BackgroundUpdates,
     _resolve_stale_data_in_sliding_sync_joined_rooms_table,
     _resolve_stale_data_in_sliding_sync_membership_snapshots_table,
 )
 from synapse.types import create_requester
+from synapse.types.storage import _BackgroundUpdates
 from synapse.util import Clock
 
 from tests.test_utils.event_injection import create_event