Skip to content
Snippets Groups Projects
Unverified Commit 4734a7bb authored by Erik Johnston's avatar Erik Johnston Committed by GitHub
Browse files

Move EventStream handling into default ReplicationDataHandler (#7493)

This is so that the logic can happen on both master and workers when we move event persistence out.
parent 1de36407
No related branches found
No related tags found
No related merge requests found
Move EventStream handling into default ReplicationDataHandler.
......@@ -26,7 +26,6 @@ from twisted.web.resource import NoResource
import synapse
import synapse.events
from synapse.api.constants import EventTypes
from synapse.api.errors import HttpResponseException, SynapseError
from synapse.api.urls import (
CLIENT_API_PREFIX,
......@@ -81,11 +80,6 @@ from synapse.replication.tcp.streams import (
ToDeviceStream,
TypingStream,
)
from synapse.replication.tcp.streams.events import (
EventsStream,
EventsStreamEventRow,
EventsStreamRow,
)
from synapse.rest.admin import register_servlets_for_media_repo
from synapse.rest.client.v1 import events
from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
......@@ -633,7 +627,7 @@ class GenericWorkerServer(HomeServer):
class GenericWorkerReplicationHandler(ReplicationDataHandler):
def __init__(self, hs):
super(GenericWorkerReplicationHandler, self).__init__(hs.get_datastore())
super(GenericWorkerReplicationHandler, self).__init__(hs)
self.store = hs.get_datastore()
self.typing_handler = hs.get_typing_handler()
......@@ -659,30 +653,7 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
stream_name, token, rows
)
if stream_name == EventsStream.NAME:
# We shouldn't get multiple rows per token for events stream, so
# we don't need to optimise this for multiple rows.
for row in rows:
if row.type != EventsStreamEventRow.TypeId:
continue
assert isinstance(row, EventsStreamRow)
event = await self.store.get_event(
row.data.event_id, allow_rejected=True
)
if event.rejected_reason:
continue
extra_users = ()
if event.type == EventTypes.Member:
extra_users = (event.state_key,)
max_token = self.store.get_room_max_stream_ordering()
self.notifier.on_new_room_event(
event, token, max_token, extra_users
)
await self.pusher_pool.on_new_notifications(token, token)
elif stream_name == PushRulesStream.NAME:
if stream_name == PushRulesStream.NAME:
self.notifier.on_new_event(
"push_rules_key", token, users=[row.user_id for row in rows]
)
......
......@@ -16,12 +16,17 @@
"""
import logging
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Tuple
from twisted.internet.protocol import ReconnectingClientFactory
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.api.constants import EventTypes
from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
from synapse.replication.tcp.streams.events import (
EventsStream,
EventsStreamEventRow,
EventsStreamRow,
)
if TYPE_CHECKING:
from synapse.server import HomeServer
......@@ -83,8 +88,10 @@ class ReplicationDataHandler:
to handle updates in additional ways.
"""
def __init__(self, store: BaseSlavedStore):
self.store = store
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastore()
self.pusher_pool = hs.get_pusherpool()
self.notifier = hs.get_notifier()
async def on_rdata(
self, stream_name: str, instance_name: str, token: int, rows: list
......@@ -102,6 +109,28 @@ class ReplicationDataHandler:
"""
self.store.process_replication_rows(stream_name, instance_name, token, rows)
if stream_name == EventsStream.NAME:
# We shouldn't get multiple rows per token for events stream, so
# we don't need to optimise this for multiple rows.
for row in rows:
if row.type != EventsStreamEventRow.TypeId:
continue
assert isinstance(row, EventsStreamRow)
event = await self.store.get_event(
row.data.event_id, allow_rejected=True
)
if event.rejected_reason:
continue
extra_users = () # type: Tuple[str, ...]
if event.type == EventTypes.Member:
extra_users = (event.state_key,)
max_token = self.store.get_room_max_stream_ordering()
self.notifier.on_new_room_event(event, token, max_token, extra_users)
await self.pusher_pool.on_new_notifications(token, token)
async def on_position(self, stream_name: str, instance_name: str, token: int):
self.store.process_replication_rows(stream_name, instance_name, token, [])
......
......@@ -581,7 +581,7 @@ class HomeServer(object):
return ReplicationStreamer(self)
def build_replication_data_handler(self):
return ReplicationDataHandler(self.get_datastore())
return ReplicationDataHandler(self)
def remove_pusher(self, app_id, push_key, user_id):
return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
......
......@@ -19,6 +19,7 @@ import synapse.handlers.room_member
import synapse.handlers.set_password
import synapse.http.client
import synapse.notifier
import synapse.push.pusherpool
import synapse.replication.tcp.client
import synapse.replication.tcp.handler
import synapse.rest.media.v1.media_repository
......@@ -133,3 +134,5 @@ class HomeServer(object):
pass
def get_macaroon_generator(self) -> synapse.handlers.auth.MacaroonGenerator:
pass
def get_pusherpool(self) -> synapse.push.pusherpool.PusherPool:
pass
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment