Skip to content
Snippets Groups Projects
Commit 11ea1677 authored by Quentin Dufour's avatar Quentin Dufour Committed by Richard van der Hoff
Browse files

Limit the number of EDUs in transactions to 100 as expected by receiver (#5138)

Fixes #3951.
parent d216a36b
No related branches found
No related tags found
No related merge requests found
Limit the number of EDUs in transactions to 100 as expected by synapse. Thanks to @superboum for this work!
...@@ -33,6 +33,9 @@ from synapse.metrics.background_process_metrics import run_as_background_process ...@@ -33,6 +33,9 @@ from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage import UserPresenceState from synapse.storage import UserPresenceState
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
# This is defined in the Matrix spec and enforced by the receiver.
MAX_EDUS_PER_TRANSACTION = 100
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -197,7 +200,8 @@ class PerDestinationQueue(object): ...@@ -197,7 +200,8 @@ class PerDestinationQueue(object):
pending_pdus = [] pending_pdus = []
while True: while True:
device_message_edus, device_stream_id, dev_list_id = ( device_message_edus, device_stream_id, dev_list_id = (
yield self._get_new_device_messages() # We have to keep 2 free slots for presence and rr_edus
yield self._get_new_device_messages(MAX_EDUS_PER_TRANSACTION - 2)
) )
# BEGIN CRITICAL SECTION # BEGIN CRITICAL SECTION
...@@ -216,19 +220,9 @@ class PerDestinationQueue(object): ...@@ -216,19 +220,9 @@ class PerDestinationQueue(object):
pending_edus = [] pending_edus = []
pending_edus.extend(self._get_rr_edus(force_flush=False))
# We can only include at most 100 EDUs per transactions # We can only include at most 100 EDUs per transactions
pending_edus.extend(self._pop_pending_edus(100 - len(pending_edus))) # rr_edus and pending_presence take at most one slot each
pending_edus.extend(self._get_rr_edus(force_flush=False))
pending_edus.extend(
self._pending_edus_keyed.values()
)
self._pending_edus_keyed = {}
pending_edus.extend(device_message_edus)
pending_presence = self._pending_presence pending_presence = self._pending_presence
self._pending_presence = {} self._pending_presence = {}
if pending_presence: if pending_presence:
...@@ -248,6 +242,12 @@ class PerDestinationQueue(object): ...@@ -248,6 +242,12 @@ class PerDestinationQueue(object):
) )
) )
pending_edus.extend(device_message_edus)
pending_edus.extend(self._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus)))
while len(pending_edus) < MAX_EDUS_PER_TRANSACTION and self._pending_edus_keyed:
_, val = self._pending_edus_keyed.popitem()
pending_edus.append(val)
if pending_pdus: if pending_pdus:
logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d", logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
self._destination, len(pending_pdus)) self._destination, len(pending_pdus))
...@@ -259,7 +259,7 @@ class PerDestinationQueue(object): ...@@ -259,7 +259,7 @@ class PerDestinationQueue(object):
# if we've decided to send a transaction anyway, and we have room, we # if we've decided to send a transaction anyway, and we have room, we
# may as well send any pending RRs # may as well send any pending RRs
if len(pending_edus) < 100: if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
pending_edus.extend(self._get_rr_edus(force_flush=True)) pending_edus.extend(self._get_rr_edus(force_flush=True))
# END CRITICAL SECTION # END CRITICAL SECTION
...@@ -346,33 +346,37 @@ class PerDestinationQueue(object): ...@@ -346,33 +346,37 @@ class PerDestinationQueue(object):
return pending_edus return pending_edus
@defer.inlineCallbacks @defer.inlineCallbacks
def _get_new_device_messages(self): def _get_new_device_messages(self, limit):
last_device_stream_id = self._last_device_stream_id last_device_list = self._last_device_list_stream_id
to_device_stream_id = self._store.get_to_device_stream_token() # Will return at most 20 entries
contents, stream_id = yield self._store.get_new_device_msgs_for_remote( now_stream_id, results = yield self._store.get_devices_by_remote(
self._destination, last_device_stream_id, to_device_stream_id self._destination, last_device_list
) )
edus = [ edus = [
Edu( Edu(
origin=self._server_name, origin=self._server_name,
destination=self._destination, destination=self._destination,
edu_type="m.direct_to_device", edu_type="m.device_list_update",
content=content, content=content,
) )
for content in contents for content in results
] ]
last_device_list = self._last_device_list_stream_id assert len(edus) <= limit, "get_devices_by_remote returned too many EDUs"
now_stream_id, results = yield self._store.get_devices_by_remote(
self._destination, last_device_list last_device_stream_id = self._last_device_stream_id
to_device_stream_id = self._store.get_to_device_stream_token()
contents, stream_id = yield self._store.get_new_device_msgs_for_remote(
self._destination, last_device_stream_id, to_device_stream_id, limit - len(edus)
) )
edus.extend( edus.extend(
Edu( Edu(
origin=self._server_name, origin=self._server_name,
destination=self._destination, destination=self._destination,
edu_type="m.device_list_update", edu_type="m.direct_to_device",
content=content, content=content,
) )
for content in results for content in contents
) )
defer.returnValue((edus, stream_id, now_stream_id)) defer.returnValue((edus, stream_id, now_stream_id))
...@@ -118,7 +118,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): ...@@ -118,7 +118,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
defer.returnValue(count) defer.returnValue(count)
def get_new_device_msgs_for_remote( def get_new_device_msgs_for_remote(
self, destination, last_stream_id, current_stream_id, limit=100 self, destination, last_stream_id, current_stream_id, limit
): ):
""" """
Args: Args:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment