Skip to content
Snippets Groups Projects
Unverified Commit 38919b52 authored by Erik Johnston's avatar Erik Johnston Committed by GitHub
Browse files

Run replication streamers on workers (#7146)

Currently we never write to streams from workers, but that will change soon
parent 07337fe3
No related branches found
No related tags found
No related merge requests found
Run replication streamers on workers.
......@@ -960,17 +960,22 @@ def start(config_options):
synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
ss = GenericWorkerServer(
hs = GenericWorkerServer(
config.server_name,
config=config,
version_string="Synapse/" + get_version_string(synapse),
)
setup_logging(ss, config, use_worker_options=True)
setup_logging(hs, config, use_worker_options=True)
hs.setup()
# Ensure the replication streamer is always started in case we write to any
# streams. Will no-op if no streams can be written to by this worker.
hs.get_replication_streamer()
ss.setup()
reactor.addSystemEventTrigger(
"before", "startup", _base.start, ss, config.worker_listeners
"before", "startup", _base.start, hs, config.worker_listeners
)
_base.start_worker_reactor("synapse-generic-worker", config)
......
......@@ -17,9 +17,7 @@
import logging
import random
from typing import Dict
from six import itervalues
from typing import Dict, List
from prometheus_client import Counter
......@@ -71,29 +69,28 @@ class ReplicationStreamer(object):
def __init__(self, hs):
self.store = hs.get_datastore()
self.presence_handler = hs.get_presence_handler()
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
self._server_notices_sender = hs.get_server_notices_sender()
self._replication_torture_level = hs.config.replication_torture_level
# List of streams that clients can subscribe to.
# We only support federation stream if federation sending hase been
# disabled on the master.
self.streams = [
stream(hs)
for stream in itervalues(STREAMS_MAP)
if stream != FederationStream or not hs.config.send_federation
]
# Work out list of streams that this instance is the source of.
self.streams = [] # type: List[Stream]
if hs.config.worker_app is None:
for stream in STREAMS_MAP.values():
if stream == FederationStream and hs.config.send_federation:
# We only support federation stream if federation sending
# hase been disabled on the master.
continue
self.streams_by_name = {stream.NAME: stream for stream in self.streams}
self.streams.append(stream(hs))
self.federation_sender = None
if not hs.config.send_federation:
self.federation_sender = hs.get_federation_sender()
self.streams_by_name = {stream.NAME: stream for stream in self.streams}
self.notifier.add_replication_callback(self.on_notifier_poke)
# Only bother registering the notifier callback if we have streams to
# publish.
if self.streams:
self.notifier.add_replication_callback(self.on_notifier_poke)
# Keeps track of whether we are currently checking for updates
self.is_looping = False
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment