Skip to content
Snippets Groups Projects
Commit 12159156 authored by Erik Johnston's avatar Erik Johnston
Browse files

Send events to ASes concurrently

parent ce72d590
No related branches found
No related tags found
No related merge requests found
......@@ -18,7 +18,9 @@ from twisted.internet import defer
import synapse
from synapse.api.constants import EventTypes
from synapse.util.metrics import Measure
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.logcontext import (
make_deferred_yieldable, preserve_fn, run_in_background,
)
import logging
......@@ -84,11 +86,16 @@ class ApplicationServicesHandler(object):
if not events:
break
events_by_room = {}
for event in events:
events_by_room.setdefault(event.room_id, []).append(event)
@defer.inlineCallbacks
def handle_event(event):
# Gather interested services
services = yield self._get_services_for_event(event)
if len(services) == 0:
continue # no services need notifying
return # no services need notifying
# Do we know this user exists? If not, poke the user
# query API for all services which match that user regex.
......@@ -108,6 +115,16 @@ class ApplicationServicesHandler(object):
service, event
)
@defer.inlineCallbacks
def handle_room_events(events):
for event in events:
yield handle_event(event)
yield make_deferred_yieldable(defer.gatherResults([
run_in_background(handle_room_events, evs)
for evs in events_by_room.itervalues()
], consumeErrors=True))
events_processed_counter.inc_by(len(events))
yield self.store.set_appservice_last_pos(upper_bound)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment