Skip to content
Snippets Groups Projects
Commit fd2eef49 authored by Erik Johnston's avatar Erik Johnston
Browse files

Reduce spurious calls to generate sync

parent 6a3c5d68
No related branches found
No related tags found
No related merge requests found
...@@ -73,6 +73,13 @@ class _NotifierUserStream(object): ...@@ -73,6 +73,13 @@ class _NotifierUserStream(object):
self.user_id = user_id self.user_id = user_id
self.rooms = set(rooms) self.rooms = set(rooms)
self.current_token = current_token self.current_token = current_token
# The last token for which we should wake up any streams that have a
# token that comes before it. This gets updated everytime we get poked.
# We start it at the current token since if we get any streams
# that have a token from before we have no idea whether they should be
# woken up or not, so lets just wake them up.
self.last_notified_token = current_token
self.last_notified_ms = time_now_ms self.last_notified_ms = time_now_ms
with PreserveLoggingContext(): with PreserveLoggingContext():
...@@ -89,6 +96,7 @@ class _NotifierUserStream(object): ...@@ -89,6 +96,7 @@ class _NotifierUserStream(object):
self.current_token = self.current_token.copy_and_advance( self.current_token = self.current_token.copy_and_advance(
stream_key, stream_id stream_key, stream_id
) )
self.last_notified_token = self.current_token
self.last_notified_ms = time_now_ms self.last_notified_ms = time_now_ms
noify_deferred = self.notify_deferred noify_deferred = self.notify_deferred
...@@ -113,8 +121,14 @@ class _NotifierUserStream(object): ...@@ -113,8 +121,14 @@ class _NotifierUserStream(object):
def new_listener(self, token): def new_listener(self, token):
"""Returns a deferred that is resolved when there is a new token """Returns a deferred that is resolved when there is a new token
greater than the given token. greater than the given token.
Args:
token: The token from which we are streaming from, i.e. we shouldn't
notify for things that happened before this.
""" """
if self.current_token.is_after(token): # Immediately wake up stream if something has already since happened
# since their last token.
if self.last_notified_token.is_after(token):
return _NotificationListener(defer.succeed(self.current_token)) return _NotificationListener(defer.succeed(self.current_token))
else: else:
return _NotificationListener(self.notify_deferred.observe()) return _NotificationListener(self.notify_deferred.observe())
...@@ -294,40 +308,44 @@ class Notifier(object): ...@@ -294,40 +308,44 @@ class Notifier(object):
self._register_with_keys(user_stream) self._register_with_keys(user_stream)
result = None result = None
prev_token = from_token
if timeout: if timeout:
end_time = self.clock.time_msec() + timeout end_time = self.clock.time_msec() + timeout
prev_token = from_token
while not result: while not result:
try: try:
current_token = user_stream.current_token
result = yield callback(prev_token, current_token)
if result:
break
now = self.clock.time_msec() now = self.clock.time_msec()
if end_time <= now: if end_time <= now:
break break
# Now we wait for the _NotifierUserStream to be told there # Now we wait for the _NotifierUserStream to be told there
# is a new token. # is a new token.
# We need to supply the token we supplied to callback so
# that we don't miss any current_token updates.
prev_token = current_token
listener = user_stream.new_listener(prev_token) listener = user_stream.new_listener(prev_token)
with PreserveLoggingContext(): with PreserveLoggingContext():
yield self.clock.time_bound_deferred( yield self.clock.time_bound_deferred(
listener.deferred, listener.deferred,
time_out=(end_time - now) / 1000. time_out=(end_time - now) / 1000.
) )
current_token = user_stream.current_token
result = yield callback(prev_token, current_token)
if result:
break
# Update the prev_token to the current_token since nothing
# has happened between the old prev_token and the current_token
prev_token = current_token
except DeferredTimedOutError: except DeferredTimedOutError:
break break
except defer.CancelledError: except defer.CancelledError:
break break
else:
if result is None:
# This happened if there was no timeout or if the timeout had
# already expired.
current_token = user_stream.current_token current_token = user_stream.current_token
result = yield callback(from_token, current_token) result = yield callback(prev_token, current_token)
defer.returnValue(result) defer.returnValue(result)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment