Skip to content
Snippets Groups Projects
Unverified Commit 3039d61b authored by Erik Johnston's avatar Erik Johnston Committed by GitHub
Browse files

Merge pull request #4991 from matrix-org/erikj/stagger_push_startup

Make starting pushers faster during start up
parents 66e78700 6f226eed
No related branches found
No related tags found
No related merge requests found
Reduce CPU usage starting pushers during start up.
......@@ -72,8 +72,15 @@ class EmailPusher(object):
self._is_processing = False
def on_started(self):
if self.mailer is not None:
def on_started(self, should_check_for_notifs):
"""Called when this pusher has been started.
Args:
should_check_for_notifs (bool): Whether we should immediately
check for push to send. Set to False only if it's known there
is nothing to send
"""
if should_check_for_notifs and self.mailer is not None:
self._start_processing()
def on_stop(self):
......
......@@ -112,8 +112,16 @@ class HttpPusher(object):
self.data_minus_url.update(self.data)
del self.data_minus_url['url']
def on_started(self):
self._start_processing()
def on_started(self, should_check_for_notifs):
"""Called when this pusher has been started.
Args:
should_check_for_notifs (bool): Whether we should immediately
check for push to send. Set to False only if it's known there
is nothing to send
"""
if should_check_for_notifs:
self._start_processing()
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering or 0)
......
......@@ -21,6 +21,7 @@ from twisted.internet import defer
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import PusherConfigException
from synapse.push.pusher import PusherFactory
from synapse.util.async_helpers import concurrently_execute
logger = logging.getLogger(__name__)
......@@ -197,7 +198,7 @@ class PusherPool:
p = r
if p:
self._start_pusher(p)
yield self._start_pusher(p)
@defer.inlineCallbacks
def _start_pushers(self):
......@@ -208,10 +209,14 @@ class PusherPool:
"""
pushers = yield self.store.get_all_pushers()
logger.info("Starting %d pushers", len(pushers))
for pusherdict in pushers:
self._start_pusher(pusherdict)
# Stagger starting up the pushers so we don't completely drown the
# process on start up.
yield concurrently_execute(self._start_pusher, pushers, 10)
logger.info("Started pushers")
@defer.inlineCallbacks
def _start_pusher(self, pusherdict):
"""Start the given pusher
......@@ -248,7 +253,22 @@ class PusherPool:
if appid_pushkey in byuser:
byuser[appid_pushkey].on_stop()
byuser[appid_pushkey] = p
p.on_started()
# Check if there *may* be push to process. We do this as this check is a
# lot cheaper to do than actually fetching the exact rows we need to
# push.
user_id = pusherdict["user_name"]
last_stream_ordering = pusherdict["last_stream_ordering"]
if last_stream_ordering:
have_notifs = yield self.store.get_if_maybe_push_in_range_for_user(
user_id, last_stream_ordering,
)
else:
# We always want to default to starting up the pusher rather than
# risk missing push.
have_notifs = True
p.on_started(have_notifs)
@defer.inlineCallbacks
def remove_pusher(self, app_id, pushkey, user_id):
......
......@@ -386,6 +386,36 @@ class EventPushActionsWorkerStore(SQLBaseStore):
# Now return the first `limit`
defer.returnValue(notifs[:limit])
def get_if_maybe_push_in_range_for_user(self, user_id, min_stream_ordering):
"""A fast check to see if there might be something to push for the
user since the given stream ordering. May return false positives.
Useful to know whether to bother starting a pusher on start up or not.
Args:
user_id (str)
min_stream_ordering (int)
Returns:
Deferred[bool]: True if there may be push to process, False if
there definitely isn't.
"""
def _get_if_maybe_push_in_range_for_user_txn(txn):
sql = """
SELECT 1 FROM event_push_actions
WHERE user_id = ? AND stream_ordering > ?
LIMIT 1
"""
txn.execute(sql, (user_id, min_stream_ordering,))
return bool(txn.fetchone())
return self.runInteraction(
"get_if_maybe_push_in_range_for_user",
_get_if_maybe_push_in_range_for_user_txn,
)
def add_push_actions_to_staging(self, event_id, user_id_actions):
"""Add the push actions for the event to the push action staging area.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment