Skip to content
Snippets Groups Projects
Commit fdcad8ea authored by Richard van der Hoff's avatar Richard van der Hoff
Browse files

Move client receipt processing to federation sender worker.

This is mostly a prerequisite for #4730, but also fits with the general theme
of "move everything off the master that we possibly can".
parent eed7271b
No related branches found
No related tags found
No related merge requests found
......@@ -28,6 +28,7 @@ from synapse.config.logger import setup_logging
from synapse.federation import send_queue
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
......@@ -37,8 +38,10 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.replication.tcp.streams import ReceiptsStream
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.types import ReadReceipt
from synapse.util.async_helpers import Linearizer
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, run_in_background
......@@ -202,6 +205,7 @@ class FederationSenderHandler(object):
"""
def __init__(self, hs, replication_client):
self.store = hs.get_datastore()
self._is_mine_id = hs.is_mine_id
self.federation_sender = hs.get_federation_sender()
self.replication_client = replication_client
......@@ -234,6 +238,32 @@ class FederationSenderHandler(object):
elif stream_name == "events":
self.federation_sender.notify_new_events(token)
# ... and when new receipts happen
elif stream_name == ReceiptsStream.NAME:
run_as_background_process(
"process_receipts_for_federation", self._on_new_receipts, rows,
)
@defer.inlineCallbacks
def _on_new_receipts(self, rows):
"""
Args:
rows (iterable[synapse.replication.tcp.streams.ReceiptsStreamRow]):
new receipts to be processed
"""
for receipt in rows:
# we only want to send on receipts for our own users
if not self._is_mine_id(receipt.user_id):
continue
receipt_info = ReadReceipt(
receipt.room_id,
receipt.receipt_type,
receipt.user_id,
[receipt.event_id],
receipt.data,
)
yield self.federation_sender.send_read_receipt(receipt_info)
@defer.inlineCallbacks
def update_token(self, token):
try:
......
......@@ -183,6 +183,15 @@ class FederationRemoteSendQueue(object):
self.notifier.on_new_replication_data()
def send_read_receipt(self, receipt):
"""As per TransactionQueue
Args:
receipt (synapse.types.ReadReceipt):
"""
# nothing to do here: the replication listener will handle it.
pass
def send_presence(self, states):
"""As per TransactionQueue
......
......@@ -290,6 +290,41 @@ class TransactionQueue(object):
self._attempt_new_transaction(destination)
@defer.inlineCallbacks
def send_read_receipt(self, receipt):
"""Send a RR to any other servers in the room
Args:
receipt (synapse.types.ReadReceipt): receipt to be sent
"""
# Work out which remote servers should be poked and poke them.
domains = yield self.state.get_current_hosts_in_room(receipt.room_id)
domains = [d for d in domains if d != self.server_name]
if not domains:
return
logger.debug("Sending receipt to: %r", domains)
content = {
receipt.room_id: {
receipt.receipt_type: {
receipt.user_id: {
"event_ids": receipt.event_ids,
"data": receipt.data,
},
},
},
}
key = (receipt.room_id, receipt.receipt_type, receipt.user_id)
for domain in domains:
self.build_and_send_edu(
destination=domain,
edu_type="m.receipt",
content=content,
key=key,
)
@logcontext.preserve_fn # the caller should not yield on this
@defer.inlineCallbacks
def send_presence(self, states):
......
......@@ -16,9 +16,8 @@ import logging
from twisted.internet import defer
from synapse.types import ReadReceipt, get_domain_from_id
from ._base import BaseHandler
from synapse.handlers._base import BaseHandler
from synapse.types import ReadReceipt
logger = logging.getLogger(__name__)
......@@ -87,7 +86,7 @@ class ReceiptsHandler(BaseHandler):
# no new receipts
defer.returnValue(False)
affected_room_ids = list(set([r["room_id"] for r in receipts]))
affected_room_ids = list(set([r.room_id for r in receipts]))
self.notifier.on_new_event(
"receipt_key", max_batch_id, rooms=affected_room_ids
......@@ -119,35 +118,7 @@ class ReceiptsHandler(BaseHandler):
if not is_new:
return
# Work out which remote servers should be poked and poke them.
# TODO: optimise this to move some of the work to the workers.
data = receipt.data
# XXX why does this not use state.get_current_hosts_in_room() ?
users = yield self.state.get_current_user_in_room(room_id)
remotedomains = set(get_domain_from_id(u) for u in users)
remotedomains = remotedomains.copy()
remotedomains.discard(self.server_name)
logger.debug("Sending receipt to: %r", remotedomains)
for domain in remotedomains:
self.federation.build_and_send_edu(
destination=domain,
edu_type="m.receipt",
content={
room_id: {
receipt_type: {
user_id: {
"event_ids": [event_id],
"data": data,
}
}
},
},
key=(room_id, receipt_type, user_id),
)
self.federation.send_read_receipt(receipt)
@defer.inlineCallbacks
def get_receipts_for_room(self, room_id, to_key):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment