Skip to content
Snippets Groups Projects
Unverified Commit 2aba1f54 authored by Richard van der Hoff's avatar Richard van der Hoff Committed by GitHub
Browse files

Merge pull request #3533 from matrix-org/rav/fix_federation_ratelimite_queue

Make FederationRateLimiter queue requests properly
parents ba22b6a4 08546be4
No related branches found
No related tags found
No related merge requests found
Fix queued federation requests being processed in the wrong order
...@@ -92,13 +92,22 @@ class _PerHostRatelimiter(object): ...@@ -92,13 +92,22 @@ class _PerHostRatelimiter(object):
self.window_size = window_size self.window_size = window_size
self.sleep_limit = sleep_limit self.sleep_limit = sleep_limit
self.sleep_msec = sleep_msec self.sleep_sec = sleep_msec / 1000.0
self.reject_limit = reject_limit self.reject_limit = reject_limit
self.concurrent_requests = concurrent_requests self.concurrent_requests = concurrent_requests
# request_id objects for requests which have been slept
self.sleeping_requests = set() self.sleeping_requests = set()
# map from request_id object to Deferred for requests which are ready
# for processing but have been queued
self.ready_request_queue = collections.OrderedDict() self.ready_request_queue = collections.OrderedDict()
# request id objects for requests which are in progress
self.current_processing = set() self.current_processing = set()
# times at which we have recently (within the last window_size ms)
# received requests.
self.request_times = [] self.request_times = []
@contextlib.contextmanager @contextlib.contextmanager
...@@ -117,11 +126,15 @@ class _PerHostRatelimiter(object): ...@@ -117,11 +126,15 @@ class _PerHostRatelimiter(object):
def _on_enter(self, request_id): def _on_enter(self, request_id):
time_now = self.clock.time_msec() time_now = self.clock.time_msec()
# remove any entries from request_times which aren't within the window
self.request_times[:] = [ self.request_times[:] = [
r for r in self.request_times r for r in self.request_times
if time_now - r < self.window_size if time_now - r < self.window_size
] ]
# reject the request if we already have too many queued up (either
# sleeping or in the ready queue).
queue_size = len(self.ready_request_queue) + len(self.sleeping_requests) queue_size = len(self.ready_request_queue) + len(self.sleeping_requests)
if queue_size > self.reject_limit: if queue_size > self.reject_limit:
raise LimitExceededError( raise LimitExceededError(
...@@ -134,9 +147,13 @@ class _PerHostRatelimiter(object): ...@@ -134,9 +147,13 @@ class _PerHostRatelimiter(object):
def queue_request(): def queue_request():
if len(self.current_processing) > self.concurrent_requests: if len(self.current_processing) > self.concurrent_requests:
logger.debug("Ratelimit [%s]: Queue req", id(request_id))
queue_defer = defer.Deferred() queue_defer = defer.Deferred()
self.ready_request_queue[request_id] = queue_defer self.ready_request_queue[request_id] = queue_defer
logger.info(
"Ratelimiter: queueing request (queue now %i items)",
len(self.ready_request_queue),
)
return queue_defer return queue_defer
else: else:
return defer.succeed(None) return defer.succeed(None)
...@@ -148,10 +165,9 @@ class _PerHostRatelimiter(object): ...@@ -148,10 +165,9 @@ class _PerHostRatelimiter(object):
if len(self.request_times) > self.sleep_limit: if len(self.request_times) > self.sleep_limit:
logger.debug( logger.debug(
"Ratelimit [%s]: sleeping req", "Ratelimiter: sleeping request for %f sec", self.sleep_sec,
id(request_id),
) )
ret_defer = run_in_background(self.clock.sleep, self.sleep_msec / 1000.0) ret_defer = run_in_background(self.clock.sleep, self.sleep_sec)
self.sleeping_requests.add(request_id) self.sleeping_requests.add(request_id)
...@@ -200,11 +216,8 @@ class _PerHostRatelimiter(object): ...@@ -200,11 +216,8 @@ class _PerHostRatelimiter(object):
) )
self.current_processing.discard(request_id) self.current_processing.discard(request_id)
try: try:
request_id, deferred = self.ready_request_queue.popitem() # start processing the next item on the queue.
_, deferred = self.ready_request_queue.popitem(last=False)
# XXX: why do we do the following? the on_start callback above will
# do it for us.
self.current_processing.add(request_id)
with PreserveLoggingContext(): with PreserveLoggingContext():
deferred.callback(None) deferred.callback(None)
......
...@@ -65,6 +65,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, reactor=None ...@@ -65,6 +65,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, reactor=None
config.federation_domain_whitelist = None config.federation_domain_whitelist = None
config.federation_rc_reject_limit = 10 config.federation_rc_reject_limit = 10
config.federation_rc_sleep_limit = 10 config.federation_rc_sleep_limit = 10
config.federation_rc_sleep_delay = 100
config.federation_rc_concurrent = 10 config.federation_rc_concurrent = 10
config.filter_timeline_limit = 5000 config.filter_timeline_limit = 5000
config.user_directory_search_all_users = False config.user_directory_search_all_users = False
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment