Skip to content
Snippets Groups Projects
Commit f1b83d88 authored by Mark Haines's avatar Mark Haines
Browse files

Discard unused NotifierUserStreams

parent 9af43225
No related branches found
No related tags found
No related merge requests found
...@@ -71,14 +71,17 @@ class _NotifierUserStream(object): ...@@ -71,14 +71,17 @@ class _NotifierUserStream(object):
so that it can remove itself from the indexes in the Notifier class. so that it can remove itself from the indexes in the Notifier class.
""" """
def __init__(self, user, rooms, current_token, appservice=None): def __init__(self, user, rooms, current_token, time_now_ms,
appservice=None):
self.user = str(user) self.user = str(user)
self.appservice = appservice self.appservice = appservice
self.listeners = set() self.listeners = set()
self.rooms = set(rooms) self.rooms = set(rooms)
self.current_token = current_token self.current_token = current_token
self.last_notified_ms = time_now_ms
def notify(self, stream_key, stream_id): def notify(self, stream_key, stream_id, time_now_ms):
self.last_notified_ms = time_now_ms
self.current_token = self.current_token.copy_and_replace( self.current_token = self.current_token.copy_and_replace(
stream_key, stream_id stream_key, stream_id
) )
...@@ -96,7 +99,7 @@ class _NotifierUserStream(object): ...@@ -96,7 +99,7 @@ class _NotifierUserStream(object):
lst = notifier.room_to_user_streams.get(room, set()) lst = notifier.room_to_user_streams.get(room, set())
lst.discard(self) lst.discard(self)
notifier.user_to_user_streams.get(self.user, set()).discard(self) notifier.user_to_user_stream.pop(self.user)
if self.appservice: if self.appservice:
notifier.appservice_to_user_streams.get( notifier.appservice_to_user_streams.get(
...@@ -111,6 +114,8 @@ class Notifier(object): ...@@ -111,6 +114,8 @@ class Notifier(object):
Primarily used from the /events stream. Primarily used from the /events stream.
""" """
UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000
def __init__(self, hs): def __init__(self, hs):
self.hs = hs self.hs = hs
...@@ -128,6 +133,10 @@ class Notifier(object): ...@@ -128,6 +133,10 @@ class Notifier(object):
"user_joined_room", self._user_joined_room "user_joined_room", self._user_joined_room
) )
self.clock.looping_call(
self.remove_expired_streams, self.UNUSED_STREAM_EXPIRY_MS
)
# This is not a very cheap test to perform, but it's only executed # This is not a very cheap test to perform, but it's only executed
# when rendering the metrics page, which is likely once per minute at # when rendering the metrics page, which is likely once per minute at
# most when scraping it. # most when scraping it.
...@@ -221,9 +230,12 @@ class Notifier(object): ...@@ -221,9 +230,12 @@ class Notifier(object):
logger.debug("on_new_room_event listeners %s", user_streams) logger.debug("on_new_room_event listeners %s", user_streams)
time_now_ms = self.clock.time_msec()
for user_stream in user_streams: for user_stream in user_streams:
try: try:
user_stream.notify("room_key", "s%d" % (room_stream_id,)) user_stream.notify(
"room_key", "s%d" % (room_stream_id,), time_now_ms
)
except: except:
logger.exception("Failed to notify listener") logger.exception("Failed to notify listener")
...@@ -246,9 +258,10 @@ class Notifier(object): ...@@ -246,9 +258,10 @@ class Notifier(object):
for room in rooms: for room in rooms:
user_streams |= self.room_to_user_streams.get(room, set()) user_streams |= self.room_to_user_streams.get(room, set())
time_now_ms = self.clock.time_msec()
for user_stream in user_streams: for user_stream in user_streams:
try: try:
user_stream.notify(stream_key, new_token) user_stream.notify(stream_key, new_token, time_now_ms)
except: except:
logger.exception("Failed to notify listener") logger.exception("Failed to notify listener")
...@@ -260,6 +273,7 @@ class Notifier(object): ...@@ -260,6 +273,7 @@ class Notifier(object):
""" """
deferred = defer.Deferred() deferred = defer.Deferred()
time_now_ms = self.clock.time_msec()
user = str(user) user = str(user)
user_stream = self.user_to_user_stream.get(user) user_stream = self.user_to_user_stream.get(user)
...@@ -272,6 +286,7 @@ class Notifier(object): ...@@ -272,6 +286,7 @@ class Notifier(object):
rooms=rooms, rooms=rooms,
appservice=appservice, appservice=appservice,
current_token=current_token, current_token=current_token,
time_now_ms=time_now_ms,
) )
self._register_with_keys(user_stream) self._register_with_keys(user_stream)
else: else:
...@@ -365,6 +380,20 @@ class Notifier(object): ...@@ -365,6 +380,20 @@ class Notifier(object):
defer.returnValue(result) defer.returnValue(result)
@log_function
def remove_expired_streams(self):
time_now_ms = self.clock.time_msec()
expired_streams = []
expire_before_ts = time_now_ms - self.UNUSED_STREAM_EXPIRY_MS
for stream in self.user_to_user_stream.values():
if stream.listeners:
continue
if stream.last_notified_ms < expire_before_ts:
expired_streams.append(stream)
for expired_stream in expired_streams:
expired_stream.remove(self)
@log_function @log_function
def _register_with_keys(self, user_stream): def _register_with_keys(self, user_stream):
self.user_to_user_stream[user_stream.user] = user_stream self.user_to_user_stream[user_stream.user] = user_stream
...@@ -385,14 +414,3 @@ class Notifier(object): ...@@ -385,14 +414,3 @@ class Notifier(object):
room_streams = self.room_to_user_streams.setdefault(room_id, set()) room_streams = self.room_to_user_streams.setdefault(room_id, set())
room_streams.add(new_user_stream) room_streams.add(new_user_stream)
new_user_stream.rooms.add(room_id) new_user_stream.rooms.add(room_id)
def _discard_if_notified(listener_set):
"""Remove any 'stale' listeners from the given set.
"""
to_discard = set()
for l in listener_set:
if l.notified():
to_discard.add(l)
listener_set -= to_discard
...@@ -271,6 +271,7 @@ class PresenceEventStreamTestCase(unittest.TestCase): ...@@ -271,6 +271,7 @@ class PresenceEventStreamTestCase(unittest.TestCase):
"call_later", "call_later",
"cancel_call_later", "cancel_call_later",
"time_msec", "time_msec",
"looping_call",
]), ]),
) )
......
...@@ -197,6 +197,9 @@ class MockClock(object): ...@@ -197,6 +197,9 @@ class MockClock(object):
return t return t
def looping_call(self, function, interval):
pass
def cancel_call_later(self, timer): def cancel_call_later(self, timer):
if timer[2]: if timer[2]:
raise Exception("Cannot cancel an expired timer") raise Exception("Cannot cancel an expired timer")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment