Skip to content
Snippets Groups Projects
Commit 31a2b892 authored by Erik Johnston's avatar Erik Johnston
Browse files

Revert to putting it around the entire block

parent 9daa4e2a
No related branches found
No related tags found
No related merge requests found
...@@ -128,7 +128,8 @@ class Pusher(object): ...@@ -128,7 +128,8 @@ class Pusher(object):
try: try:
if wait > 0: if wait > 0:
yield synapse.util.async.sleep(wait) yield synapse.util.async.sleep(wait)
yield self.get_and_dispatch() with Measure(self.clock, "push"):
yield self.get_and_dispatch()
wait = 0 wait = 0
except: except:
if wait == 0: if wait == 0:
...@@ -150,27 +151,115 @@ class Pusher(object): ...@@ -150,27 +151,115 @@ class Pusher(object):
only_keys=("room", "receipt",), only_keys=("room", "receipt",),
) )
with Measure(self.clock, "push"): # limiting to 1 may get 1 event plus 1 presence event, so
# limiting to 1 may get 1 event plus 1 presence event, so # pick out the actual event
# pick out the actual event single_event = None
single_event = None read_receipt = None
read_receipt = None for c in chunk['chunk']:
for c in chunk['chunk']: if 'event_id' in c: # Hmmm...
if 'event_id' in c: # Hmmm... single_event = c
single_event = c elif c['type'] == 'm.receipt':
elif c['type'] == 'm.receipt': read_receipt = c
read_receipt = c
have_updated_badge = False
have_updated_badge = False if read_receipt:
if read_receipt: for receipt_part in read_receipt['content'].values():
for receipt_part in read_receipt['content'].values(): if 'm.read' in receipt_part:
if 'm.read' in receipt_part: if self.user_id in receipt_part['m.read'].keys():
if self.user_id in receipt_part['m.read'].keys(): have_updated_badge = True
have_updated_badge = True
if not single_event:
if not single_event: if have_updated_badge:
if have_updated_badge: yield self.update_badge()
yield self.update_badge() self.last_token = chunk['end']
yield self.store.update_pusher_last_token(
self.app_id,
self.pushkey,
self.user_id,
self.last_token
)
return
if not self.alive:
return
processed = False
rule_evaluator = yield \
push_rule_evaluator.evaluator_for_user_id_and_profile_tag(
self.user_id, self.profile_tag, single_event['room_id'], self.store
)
actions = yield rule_evaluator.actions_for_event(single_event)
tweaks = rule_evaluator.tweaks_for_actions(actions)
if 'notify' in actions:
self.badge = yield self._get_badge_count()
rejected = yield self.dispatch_push(single_event, tweaks, self.badge)
self.has_unread = True
if isinstance(rejected, list) or isinstance(rejected, tuple):
processed = True
for pk in rejected:
if pk != self.pushkey:
# for sanity, we only remove the pushkey if it
# was the one we actually sent...
logger.warn(
("Ignoring rejected pushkey %s because we"
" didn't send it"), pk
)
else:
logger.info(
"Pushkey %s was rejected: removing",
pk
)
yield self.hs.get_pusherpool().remove_pusher(
self.app_id, pk, self.user_id
)
else:
if have_updated_badge:
yield self.update_badge()
processed = True
if not self.alive:
return
if processed:
self.backoff_delay = Pusher.INITIAL_BACKOFF
self.last_token = chunk['end']
yield self.store.update_pusher_last_token_and_success(
self.app_id,
self.pushkey,
self.user_id,
self.last_token,
self.clock.time_msec()
)
if self.failing_since:
self.failing_since = None
yield self.store.update_pusher_failing_since(
self.app_id,
self.pushkey,
self.user_id,
self.failing_since)
else:
if not self.failing_since:
self.failing_since = self.clock.time_msec()
yield self.store.update_pusher_failing_since(
self.app_id,
self.pushkey,
self.user_id,
self.failing_since
)
if (self.failing_since and
self.failing_since <
self.clock.time_msec() - Pusher.GIVE_UP_AFTER):
# we really only give up so that if the URL gets
# fixed, we don't suddenly deliver a load
# of old notifications.
logger.warn("Giving up on a notification to user %s, "
"pushkey %s",
self.user_id, self.pushkey)
self.backoff_delay = Pusher.INITIAL_BACKOFF
self.last_token = chunk['end'] self.last_token = chunk['end']
yield self.store.update_pusher_last_token( yield self.store.update_pusher_last_token(
self.app_id, self.app_id,
...@@ -178,114 +267,25 @@ class Pusher(object): ...@@ -178,114 +267,25 @@ class Pusher(object):
self.user_id, self.user_id,
self.last_token self.last_token
) )
return
if not self.alive:
return
processed = False
rule_evaluator = yield \
push_rule_evaluator.evaluator_for_user_id_and_profile_tag(
self.user_id, self.profile_tag, single_event['room_id'], self.store
)
actions = yield rule_evaluator.actions_for_event(single_event) self.failing_since = None
tweaks = rule_evaluator.tweaks_for_actions(actions) yield self.store.update_pusher_failing_since(
if 'notify' in actions:
self.badge = yield self._get_badge_count()
rejected = yield self.dispatch_push(single_event, tweaks, self.badge)
self.has_unread = True
if isinstance(rejected, list) or isinstance(rejected, tuple):
processed = True
for pk in rejected:
if pk != self.pushkey:
# for sanity, we only remove the pushkey if it
# was the one we actually sent...
logger.warn(
("Ignoring rejected pushkey %s because we"
" didn't send it"), pk
)
else:
logger.info(
"Pushkey %s was rejected: removing",
pk
)
yield self.hs.get_pusherpool().remove_pusher(
self.app_id, pk, self.user_id
)
else:
if have_updated_badge:
yield self.update_badge()
processed = True
if not self.alive:
return
if processed:
self.backoff_delay = Pusher.INITIAL_BACKOFF
self.last_token = chunk['end']
yield self.store.update_pusher_last_token_and_success(
self.app_id, self.app_id,
self.pushkey, self.pushkey,
self.user_id, self.user_id,
self.last_token, self.failing_since
self.clock.time_msec()
) )
if self.failing_since:
self.failing_since = None
yield self.store.update_pusher_failing_since(
self.app_id,
self.pushkey,
self.user_id,
self.failing_since)
else: else:
if not self.failing_since: logger.warn("Failed to dispatch push for user %s "
self.failing_since = self.clock.time_msec() "(failing for %dms)."
yield self.store.update_pusher_failing_since( "Trying again in %dms",
self.app_id, self.user_id,
self.pushkey, self.clock.time_msec() - self.failing_since,
self.user_id, self.backoff_delay)
self.failing_since yield synapse.util.async.sleep(self.backoff_delay / 1000.0)
) self.backoff_delay *= 2
if self.backoff_delay > Pusher.MAX_BACKOFF:
if (self.failing_since and self.backoff_delay = Pusher.MAX_BACKOFF
self.failing_since <
self.clock.time_msec() - Pusher.GIVE_UP_AFTER):
# we really only give up so that if the URL gets
# fixed, we don't suddenly deliver a load
# of old notifications.
logger.warn("Giving up on a notification to user %s, "
"pushkey %s",
self.user_id, self.pushkey)
self.backoff_delay = Pusher.INITIAL_BACKOFF
self.last_token = chunk['end']
yield self.store.update_pusher_last_token(
self.app_id,
self.pushkey,
self.user_id,
self.last_token
)
self.failing_since = None
yield self.store.update_pusher_failing_since(
self.app_id,
self.pushkey,
self.user_id,
self.failing_since
)
else:
logger.warn("Failed to dispatch push for user %s "
"(failing for %dms)."
"Trying again in %dms",
self.user_id,
self.clock.time_msec() - self.failing_since,
self.backoff_delay)
yield synapse.util.async.sleep(self.backoff_delay / 1000.0)
self.backoff_delay *= 2
if self.backoff_delay > Pusher.MAX_BACKOFF:
self.backoff_delay = Pusher.MAX_BACKOFF
def stop(self): def stop(self):
self.alive = False self.alive = False
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment