Skip to content
Snippets Groups Projects
Unverified Commit 903f04c2 authored by Amber Brown, committed by GitHub
Browse files

Use the state event amount for userdir import batching, not room count (#4944)

parent 4a125be1
No related branches found
No related tags found
No related merge requests found
The user directory has been rewritten to make it faster, with less chance of falling behind on a large server.
...@@ -135,7 +135,12 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore): ...@@ -135,7 +135,12 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
@defer.inlineCallbacks @defer.inlineCallbacks
def _populate_user_directory_process_rooms(self, progress, batch_size): def _populate_user_directory_process_rooms(self, progress, batch_size):
"""
Args:
progress (dict)
batch_size (int): Maximum number of state events to process
per cycle.
"""
state = self.hs.get_state_handler() state = self.hs.get_state_handler()
# If we don't have progress filed, delete everything. # If we don't have progress filed, delete everything.
...@@ -143,13 +148,14 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore): ...@@ -143,13 +148,14 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
yield self.delete_all_from_user_dir() yield self.delete_all_from_user_dir()
def _get_next_batch(txn): def _get_next_batch(txn):
# Only fetch 250 rooms, so we don't fetch too many at once, even
# if those 250 rooms have less than batch_size state events.
sql = """ sql = """
SELECT room_id FROM %s SELECT room_id, events FROM %s
ORDER BY events DESC ORDER BY events DESC
LIMIT %s LIMIT 250
""" % ( """ % (
TEMP_TABLE + "_rooms", TEMP_TABLE + "_rooms",
str(batch_size),
) )
txn.execute(sql) txn.execute(sql)
rooms_to_work_on = txn.fetchall() rooms_to_work_on = txn.fetchall()
...@@ -157,8 +163,6 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore): ...@@ -157,8 +163,6 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
if not rooms_to_work_on: if not rooms_to_work_on:
return None return None
rooms_to_work_on = [x[0] for x in rooms_to_work_on]
# Get how many are left to process, so we can give status on how # Get how many are left to process, so we can give status on how
# far we are in processing # far we are in processing
txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms") txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
...@@ -180,7 +184,9 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore): ...@@ -180,7 +184,9 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
% (len(rooms_to_work_on), progress["remaining"]) % (len(rooms_to_work_on), progress["remaining"])
) )
for room_id in rooms_to_work_on: processed_event_count = 0
for room_id, event_count in rooms_to_work_on:
is_in_room = yield self.is_host_joined(room_id, self.server_name) is_in_room = yield self.is_host_joined(room_id, self.server_name)
if is_in_room: if is_in_room:
...@@ -247,7 +253,13 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore): ...@@ -247,7 +253,13 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
progress, progress,
) )
defer.returnValue(len(rooms_to_work_on)) processed_event_count += event_count
if processed_event_count > batch_size:
# Don't process any more rooms, we've hit our batch size.
defer.returnValue(processed_event_count)
defer.returnValue(processed_event_count)
@defer.inlineCallbacks @defer.inlineCallbacks
def _populate_user_directory_process_users(self, progress, batch_size): def _populate_user_directory_process_users(self, progress, batch_size):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment