Skip to content
Snippets Groups Projects
Unverified Commit b1433bf2 authored by Erik Johnston, committed by GitHub
Browse files

Don't table scan events on worker startup (#8419)


* Fix table scan of events on worker startup.

This happened because we assumed "new" writers had an initial stream
position of 0, so the replication code tried to fetch all events written
by the instance between 0 and the current position.

Instead, set the initial position of new writers to the current
persisted up to position, on the assumption that new writers won't have
written anything before that point.

* Consider old writers coming back as "new".

Otherwise we'd try and fetch entries between the old stale token and the
current position, even though it won't have written any rows.

Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>

Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
parent 2649d545
No related branches found
No related tags found
No related merge requests found
Add experimental support for sharding event persister.
...@@ -273,6 +273,19 @@ class MultiWriterIdGenerator: ...@@ -273,6 +273,19 @@ class MultiWriterIdGenerator:
# Load the current positions of all writers for the stream. # Load the current positions of all writers for the stream.
if self._writers: if self._writers:
# We delete any stale entries in the positions table. This is
# important if we add back a writer after a long time; we want to
# consider that a "new" writer, rather than using the old stale
# entry here.
sql = """
DELETE FROM stream_positions
WHERE
stream_name = ?
AND instance_name != ALL(?)
"""
sql = self._db.engine.convert_param_style(sql)
cur.execute(sql, (self._stream_name, self._writers))
sql = """ sql = """
SELECT instance_name, stream_id FROM stream_positions SELECT instance_name, stream_id FROM stream_positions
WHERE stream_name = ? WHERE stream_name = ?
...@@ -453,11 +466,22 @@ class MultiWriterIdGenerator: ...@@ -453,11 +466,22 @@ class MultiWriterIdGenerator:
"""Returns the position of the given writer. """Returns the position of the given writer.
""" """
# If we don't have an entry for the given instance name, we assume it's a
# new writer.
#
# For new writers we assume their initial position to be the current
# persisted up to position. This stops Synapse from doing a full table
# scan when a new writer announces itself over replication.
with self._lock: with self._lock:
return self._return_factor * self._current_positions.get(instance_name, 0) return self._return_factor * self._current_positions.get(
instance_name, self._persisted_upto_position
)
def get_positions(self) -> Dict[str, int]: def get_positions(self) -> Dict[str, int]:
"""Get a copy of the current position map. """Get a copy of the current position map.
Note that this won't necessarily include all configured writers if some
writers haven't written anything yet.
""" """
with self._lock: with self._lock:
......
...@@ -390,17 +390,28 @@ class MultiWriterIdGeneratorTestCase(HomeserverTestCase): ...@@ -390,17 +390,28 @@ class MultiWriterIdGeneratorTestCase(HomeserverTestCase):
# Initial config has two writers # Initial config has two writers
id_gen = self._create_id_generator("first", writers=["first", "second"]) id_gen = self._create_id_generator("first", writers=["first", "second"])
self.assertEqual(id_gen.get_persisted_upto_position(), 3) self.assertEqual(id_gen.get_persisted_upto_position(), 3)
self.assertEqual(id_gen.get_current_token_for_writer("first"), 3)
self.assertEqual(id_gen.get_current_token_for_writer("second"), 5)
# New config removes one of the writers. Note that if the writer is # New config removes one of the writers. Note that if the writer is
# removed from config we assume that it has been shut down and has # removed from config we assume that it has been shut down and has
# finished persisting, hence why the persisted upto position is 5. # finished persisting, hence why the persisted upto position is 5.
id_gen_2 = self._create_id_generator("second", writers=["second"]) id_gen_2 = self._create_id_generator("second", writers=["second"])
self.assertEqual(id_gen_2.get_persisted_upto_position(), 5) self.assertEqual(id_gen_2.get_persisted_upto_position(), 5)
self.assertEqual(id_gen_2.get_current_token_for_writer("second"), 5)
# This config points to a single, previously unused writer. # This config points to a single, previously unused writer.
id_gen_3 = self._create_id_generator("third", writers=["third"]) id_gen_3 = self._create_id_generator("third", writers=["third"])
self.assertEqual(id_gen_3.get_persisted_upto_position(), 5) self.assertEqual(id_gen_3.get_persisted_upto_position(), 5)
# For new writers we assume their initial position to be the current
# persisted up to position. This stops Synapse from doing a full table
# scan when a new writer comes along.
self.assertEqual(id_gen_3.get_current_token_for_writer("third"), 5)
id_gen_4 = self._create_id_generator("fourth", writers=["third"])
self.assertEqual(id_gen_4.get_current_token_for_writer("third"), 5)
# Check that we get a sane next stream ID with this new config. # Check that we get a sane next stream ID with this new config.
async def _get_next_async(): async def _get_next_async():
...@@ -410,6 +421,13 @@ class MultiWriterIdGeneratorTestCase(HomeserverTestCase): ...@@ -410,6 +421,13 @@ class MultiWriterIdGeneratorTestCase(HomeserverTestCase):
self.get_success(_get_next_async()) self.get_success(_get_next_async())
self.assertEqual(id_gen_3.get_persisted_upto_position(), 6) self.assertEqual(id_gen_3.get_persisted_upto_position(), 6)
# If we add back the old "first" then we shouldn't see the persisted up
# to position revert back to 3.
id_gen_5 = self._create_id_generator("five", writers=["first", "third"])
self.assertEqual(id_gen_5.get_persisted_upto_position(), 6)
self.assertEqual(id_gen_5.get_current_token_for_writer("first"), 6)
self.assertEqual(id_gen_5.get_current_token_for_writer("third"), 6)
def test_sequence_consistency(self): def test_sequence_consistency(self):
"""Test that we error out if the table and sequence diverges. """Test that we error out if the table and sequence diverges.
""" """
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment