Skip to content
Snippets Groups Projects
Commit b2fd03d0 authored by H. Shay's avatar H. Shay
Browse files

Merge branch 'master' into develop

parents 69553052 8c3fa748
No related branches found
No related tags found
No related merge requests found
Synapse 1.78.0 (2023-02-28)
===========================
Bugfixes
--------
- Fix a bug introduced in Synapse 1.76 where 5s delays would occasionally occur in deployments using workers. ([\#15150](https://github.com/matrix-org/synapse/issues/15150))
Synapse 1.78.0rc1 (2023-02-21) Synapse 1.78.0rc1 (2023-02-21)
============================== ==============================
......
matrix-synapse-py3 (1.78.0) stable; urgency=medium
* New Synapse release 1.78.0.
-- Synapse Packaging team <packages@matrix.org> Tue, 28 Feb 2023 08:56:03 -0800
matrix-synapse-py3 (1.78.0~rc1) stable; urgency=medium matrix-synapse-py3 (1.78.0~rc1) stable; urgency=medium
* Add `matrix-org-archive-keyring` package as recommended. * Add `matrix-org-archive-keyring` package as recommended.
......
...@@ -89,7 +89,7 @@ manifest-path = "rust/Cargo.toml" ...@@ -89,7 +89,7 @@ manifest-path = "rust/Cargo.toml"
[tool.poetry] [tool.poetry]
name = "matrix-synapse" name = "matrix-synapse"
version = "1.78.0rc1" version = "1.78.0"
description = "Homeserver for the Matrix decentralised comms protocol" description = "Homeserver for the Matrix decentralised comms protocol"
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"] authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
license = "Apache-2.0" license = "Apache-2.0"
......
...@@ -238,6 +238,24 @@ class ReplicationStreamer: ...@@ -238,6 +238,24 @@ class ReplicationStreamer:
except Exception: except Exception:
logger.exception("Failed to replicate") logger.exception("Failed to replicate")
# The last token we send may not match the current
# token, in which case we want to send out a `POSITION`
# to tell other workers the actual current position.
if updates[-1][0] < current_token:
logger.info(
"Sending position: %s -> %s",
stream.NAME,
current_token,
)
self.command_handler.send_command(
PositionCommand(
stream.NAME,
self._instance_name,
updates[-1][0],
current_token,
)
)
logger.debug("No more pending updates, breaking poke loop") logger.debug("No more pending updates, breaking poke loop")
finally: finally:
self.pending_updates = False self.pending_updates = False
......
...@@ -141,3 +141,64 @@ class ChannelsTestCase(BaseMultiWorkerStreamTestCase): ...@@ -141,3 +141,64 @@ class ChannelsTestCase(BaseMultiWorkerStreamTestCase):
self.get_success(ctx_worker1.__aexit__(None, None, None)) self.get_success(ctx_worker1.__aexit__(None, None, None))
self.assertTrue(d.called) self.assertTrue(d.called)
def test_wait_for_stream_position_rdata(self) -> None:
"""Check that wait for stream position correctly waits for an update
from the correct instance, when RDATA is sent.
"""
store = self.hs.get_datastores().main
cmd_handler = self.hs.get_replication_command_handler()
data_handler = self.hs.get_replication_data_handler()
worker1 = self.make_worker_hs(
"synapse.app.generic_worker",
extra_config={
"worker_name": "worker1",
"run_background_tasks_on": "worker1",
"redis": {"enabled": True},
},
)
cache_id_gen = worker1.get_datastores().main._cache_id_gen
assert cache_id_gen is not None
self.replicate()
# First, make sure the master knows that `worker1` exists.
initial_token = cache_id_gen.get_current_token()
cmd_handler.send_command(
PositionCommand("caches", "worker1", initial_token, initial_token)
)
self.replicate()
# `wait_for_stream_position` should only return once master receives a
# notification that `next_token2` has persisted.
ctx_worker1 = cache_id_gen.get_next_mult(2)
next_token1, next_token2 = self.get_success(ctx_worker1.__aenter__())
d = defer.ensureDeferred(
data_handler.wait_for_stream_position("worker1", "caches", next_token2)
)
self.assertFalse(d.called)
# Insert an entry into the cache stream with token `next_token1`, but
# not `next_token2`.
self.get_success(
store.db_pool.simple_insert(
table="cache_invalidation_stream_by_instance",
values={
"stream_id": next_token1,
"instance_name": "worker1",
"cache_func": "foo",
"keys": [],
"invalidation_ts": 0,
},
)
)
# Finish the context manager, triggering the data to be sent to master.
self.get_success(ctx_worker1.__aexit__(None, None, None))
# Master should get told about `next_token2`, so the deferred should
# resolve.
self.assertTrue(d.called)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment