Skip to content
Snippets Groups Projects
Commit bd1236c0 authored by Erik Johnston's avatar Erik Johnston
Browse files

Consolidate duplicate code in notifier

parent ddf79795
No related branches found
No related tags found
No related merge requests found
......@@ -992,7 +992,7 @@ class PresenceHandler(BaseHandler):
room_ids([str]): List of room_ids to notify.
"""
with PreserveLoggingContext():
self.notifier.on_new_user_event(
self.notifier.on_new_event(
"presence_key",
self._user_cachemap_latest_serial,
users_to_push,
......
......@@ -88,7 +88,7 @@ class ReceiptsHandler(BaseHandler):
self._latest_serial = max(self._latest_serial, stream_id)
with PreserveLoggingContext():
self.notifier.on_new_user_event(
self.notifier.on_new_event(
"recei[t_key", self._latest_serial, rooms=[room_id]
)
......@@ -102,7 +102,7 @@ class ReceiptsHandler(BaseHandler):
receipt["remotedomains"] = remotedomains
self.notifier.on_new_user_event(
self.notifier.on_new_event(
"receipt_key", self._latest_room_serial, rooms=[room_id]
)
......
......@@ -218,7 +218,7 @@ class TypingNotificationHandler(BaseHandler):
self._room_serials[room_id] = self._latest_room_serial
with PreserveLoggingContext():
self.notifier.on_new_user_event(
self.notifier.on_new_event(
"typing_key", self._latest_room_serial, rooms=[room_id]
)
......
......@@ -221,16 +221,7 @@ class Notifier(object):
event
)
room_id = event.room_id
room_user_streams = self.room_to_user_streams.get(room_id, set())
user_streams = room_user_streams.copy()
for user in extra_users:
user_stream = self.user_to_user_stream.get(str(user))
if user_stream is not None:
user_streams.add(user_stream)
app_streams = set()
for appservice in self.appservice_to_user_streams:
# TODO (kegan): Redundant appservice listener checks?
......@@ -242,24 +233,20 @@ class Notifier(object):
app_user_streams = self.appservice_to_user_streams.get(
appservice, set()
)
user_streams |= app_user_streams
logger.debug("on_new_room_event listeners %s", user_streams)
app_streams |= app_user_streams
time_now_ms = self.clock.time_msec()
for user_stream in user_streams:
try:
user_stream.notify(
"room_key", "s%d" % (room_stream_id,), time_now_ms
)
except:
logger.exception("Failed to notify listener")
self.on_new_event(
"room_key", room_stream_id,
users=extra_users,
rooms=[event.room_id],
extra_streams=app_streams,
)
@defer.inlineCallbacks
@log_function
def on_new_user_event(self, stream_key, new_token, users=[], rooms=[]):
""" Used to inform listeners that something has happend
presence/user event wise.
def on_new_event(self, stream_key, new_token, users=[], rooms=[],
extra_streams=set()):
""" Used to inform listeners that something has happend event wise.
Will wake up all listeners for the given users and rooms.
"""
......
......@@ -66,8 +66,8 @@ class TypingNotificationsTestCase(unittest.TestCase):
self.mock_federation_resource = MockHttpResource()
mock_notifier = Mock(spec=["on_new_user_event"])
self.on_new_user_event = mock_notifier.on_new_user_event
mock_notifier = Mock(spec=["on_new_event"])
self.on_new_event = mock_notifier.on_new_event
self.auth = Mock(spec=[])
......@@ -182,7 +182,7 @@ class TypingNotificationsTestCase(unittest.TestCase):
timeout=20000,
)
self.on_new_user_event.assert_has_calls([
self.on_new_event.assert_has_calls([
call('typing_key', 1, rooms=[self.room_id]),
])
......@@ -245,7 +245,7 @@ class TypingNotificationsTestCase(unittest.TestCase):
)
)
self.on_new_user_event.assert_has_calls([
self.on_new_event.assert_has_calls([
call('typing_key', 1, rooms=[self.room_id]),
])
......@@ -299,7 +299,7 @@ class TypingNotificationsTestCase(unittest.TestCase):
room_id=self.room_id,
)
self.on_new_user_event.assert_has_calls([
self.on_new_event.assert_has_calls([
call('typing_key', 1, rooms=[self.room_id]),
])
......@@ -331,10 +331,10 @@ class TypingNotificationsTestCase(unittest.TestCase):
timeout=10000,
)
self.on_new_user_event.assert_has_calls([
self.on_new_event.assert_has_calls([
call('typing_key', 1, rooms=[self.room_id]),
])
self.on_new_user_event.reset_mock()
self.on_new_event.reset_mock()
self.assertEquals(self.event_source.get_current_key(), 1)
events = yield self.event_source.get_new_events_for_user(self.u_apple, 0, None)
......@@ -351,7 +351,7 @@ class TypingNotificationsTestCase(unittest.TestCase):
self.clock.advance_time(11)
self.on_new_user_event.assert_has_calls([
self.on_new_event.assert_has_calls([
call('typing_key', 2, rooms=[self.room_id]),
])
......@@ -377,10 +377,10 @@ class TypingNotificationsTestCase(unittest.TestCase):
timeout=10000,
)
self.on_new_user_event.assert_has_calls([
self.on_new_event.assert_has_calls([
call('typing_key', 3, rooms=[self.room_id]),
])
self.on_new_user_event.reset_mock()
self.on_new_event.reset_mock()
self.assertEquals(self.event_source.get_current_key(), 3)
events = yield self.event_source.get_new_events_for_user(self.u_apple, 0, None)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment