Skip to content
Snippets Groups Projects
Commit 0a7d3cd0 authored by Mark Haines's avatar Mark Haines
Browse files

Create separate methods for getting messages to push

for the email and http pushers rather than trying to make a single
method that will work with their conflicting requirements.

The http pusher needs to get the messages in ascending stream order, and
doesn't want to miss a message.

The email pusher needs to get the messages in descending timestamp order,
and doesn't mind if it misses messages.
parent 328ad690
Branches
Tags
No related merge requests found
...@@ -140,9 +140,8 @@ class EmailPusher(object): ...@@ -140,9 +140,8 @@ class EmailPusher(object):
being run. being run.
""" """
start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering
unprocessed = yield self.store.get_unread_push_actions_for_user_in_range( fn = self.store.get_unread_push_actions_for_user_in_range_for_email
self.user_id, start, self.max_stream_ordering unprocessed = yield fn(self.user_id, start, self.max_stream_ordering)
)
soonest_due_at = None soonest_due_at = None
......
...@@ -141,7 +141,8 @@ class HttpPusher(object): ...@@ -141,7 +141,8 @@ class HttpPusher(object):
run once per pusher. run once per pusher.
""" """
unprocessed = yield self.store.get_unread_push_actions_for_user_in_range( fn = self.store.get_unread_push_actions_for_user_in_range_for_http
unprocessed = yield fn(
self.user_id, self.last_stream_ordering, self.max_stream_ordering self.user_id, self.last_stream_ordering, self.max_stream_ordering
) )
......
...@@ -93,8 +93,11 @@ class SlavedEventStore(BaseSlavedStore): ...@@ -93,8 +93,11 @@ class SlavedEventStore(BaseSlavedStore):
StreamStore.__dict__["get_recent_event_ids_for_room"] StreamStore.__dict__["get_recent_event_ids_for_room"]
) )
get_unread_push_actions_for_user_in_range = ( get_unread_push_actions_for_user_in_range_for_http = (
DataStore.get_unread_push_actions_for_user_in_range.__func__ DataStore.get_unread_push_actions_for_user_in_range_for_http.__func__
)
get_unread_push_actions_for_user_in_range_for_email = (
DataStore.get_unread_push_actions_for_user_in_range_for_email.__func__
) )
get_push_action_users_in_range = ( get_push_action_users_in_range = (
DataStore.get_push_action_users_in_range.__func__ DataStore.get_push_action_users_in_range.__func__
......
...@@ -117,40 +117,149 @@ class EventPushActionsStore(SQLBaseStore): ...@@ -117,40 +117,149 @@ class EventPushActionsStore(SQLBaseStore):
defer.returnValue(ret) defer.returnValue(ret)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_unread_push_actions_for_user_in_range(self, user_id, def get_unread_push_actions_for_user_in_range_for_http(
min_stream_ordering, self, user_id, min_stream_ordering, max_stream_ordering, limit=20
max_stream_ordering, ):
limit=20):
"""Get a list of the most recent unread push actions for a given user, """Get a list of the most recent unread push actions for a given user,
within the given stream ordering range. within the given stream ordering range. Called by the httppusher.
Args: Args:
user_id (str) user_id (str): The user to fetch push actions for.
min_stream_ordering min_stream_ordering(int): The exclusive lower bound on the
max_stream_ordering stream ordering of event push actions to fetch.
limit (int) max_stream_ordering(int): The inclusive upper bound on the
stream ordering of event push actions to fetch.
limit (int): The maximum number of rows to return.
Returns:
A promise which resolves to a list of dicts with the keys "event_id",
"room_id", "stream_ordering", "actions".
The list will be ordered by ascending stream_ordering.
The list will have between 0~limit entries.
"""
# find rooms that have a read receipt in them and return the next
# push actions
def get_after_receipt(txn):
# find rooms that have a read receipt in them and return the next
# push actions
sql = (
"SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions"
" FROM ("
" SELECT room_id,"
" MAX(topological_ordering) as topological_ordering,"
" MAX(stream_ordering) as stream_ordering"
" FROM events"
" INNER JOIN receipts_linearized USING (room_id, event_id)"
" WHERE receipt_type = 'm.read' AND user_id = ?"
" GROUP BY room_id"
") AS rl,"
" event_push_actions AS ep"
" WHERE"
" ep.room_id = rl.room_id"
" AND ("
" ep.topological_ordering > rl.topological_ordering"
" OR ("
" ep.topological_ordering = rl.topological_ordering"
" AND ep.stream_ordering > rl.stream_ordering"
" )"
" )"
" AND ep.user_id = ?"
" AND ep.stream_ordering > ?"
" AND ep.stream_ordering <= ?"
" ORDER BY ep.stream_ordering ASC LIMIT ?"
)
args = [
user_id, user_id,
min_stream_ordering, max_stream_ordering, limit,
]
txn.execute(sql, args)
return txn.fetchall()
after_read_receipt = yield self.runInteraction(
"get_unread_push_actions_for_user_in_range_http_arr", get_after_receipt
)
# There are rooms with push actions in them but you don't have a read receipt in
# them e.g. rooms you've been invited to, so get push actions for rooms which do
# not have read receipts in them too.
def get_no_receipt(txn):
sql = (
"SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
" e.received_ts"
" FROM event_push_actions AS ep"
" INNER JOIN events AS e USING (room_id, event_id)"
" WHERE"
" ep.room_id NOT IN ("
" SELECT room_id FROM receipts_linearized"
" WHERE receipt_type = 'm.read' AND user_id = ?"
" GROUP BY room_id"
" )"
" AND ep.user_id = ?"
" AND ep.stream_ordering > ?"
" AND ep.stream_ordering <= ?"
" ORDER BY ep.stream_ordering ASC LIMIT ?"
)
args = [
user_id, user_id,
min_stream_ordering, max_stream_ordering, limit,
]
txn.execute(sql, args)
return txn.fetchall()
no_read_receipt = yield self.runInteraction(
"get_unread_push_actions_for_user_in_range_http_nrr", get_no_receipt
)
notifs = [
{
"event_id": row[0],
"room_id": row[1],
"stream_ordering": row[2],
"actions": json.loads(row[3]),
} for row in after_read_receipt + no_read_receipt
]
# Now sort it so it's ordered correctly, since currently it will
# contain results from the first query, correctly ordered, followed
# by results from the second query, but we want them all ordered
# by stream_ordering, oldest first.
notifs.sort(key=lambda r: r['stream_ordering'])
# Take only up to the limit. We have to stop at the limit because
# one of the subqueries may have hit the limit.
defer.returnValue(notifs[:limit])
@defer.inlineCallbacks
def get_unread_push_actions_for_user_in_range_for_email(
self, user_id, min_stream_ordering, max_stream_ordering, limit=20
):
"""Get a list of the most recent unread push actions for a given user,
within the given stream ordering range. Called by the emailpusher
Args:
user_id (str): The user to fetch push actions for.
min_stream_ordering(int): The exclusive lower bound on the
stream ordering of event push actions to fetch.
max_stream_ordering(int): The inclusive upper bound on the
stream ordering of event push actions to fetch.
limit (int): The maximum number of rows to return.
Returns: Returns:
A promise which resolves to a list of dicts with the keys "event_id", A promise which resolves to a list of dicts with the keys "event_id",
"room_id", "stream_ordering", "actions", "received_ts". "room_id", "stream_ordering", "actions", "received_ts".
The list will be ordered by descending received_ts.
The list will have between 0~limit entries. The list will have between 0~limit entries.
""" """
# find rooms that have a read receipt in them and return the most recent # find rooms that have a read receipt in them and return the most recent
# push actions # push actions
def get_after_receipt(txn): def get_after_receipt(txn):
# XXX: Do we really need to GROUP BY user_id on the inner SELECT?
# XXX: NATURAL JOIN obfuscates which columns are being joined on the
# inner SELECT (the room_id and event_id), can we
# INNER JOIN ... USING instead?
sql = ( sql = (
"SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, " "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
"e.received_ts " " e.received_ts"
"FROM (" " FROM ("
" SELECT room_id, user_id, " " SELECT room_id,"
" max(topological_ordering) as topological_ordering, " " MAX(topological_ordering) as topological_ordering,"
" max(stream_ordering) as stream_ordering " " MAX(stream_ordering) as stream_ordering"
" FROM events" " FROM events"
" NATURAL JOIN receipts_linearized WHERE receipt_type = 'm.read'" " INNER JOIN receipts_linearized USING (room_id, event_id)"
" GROUP BY room_id, user_id" " WHERE receipt_type = 'm.read' AND user_id = ?"
" GROUP BY room_id"
") AS rl," ") AS rl,"
" event_push_actions AS ep" " event_push_actions AS ep"
" INNER JOIN events AS e USING (room_id, event_id)" " INNER JOIN events AS e USING (room_id, event_id)"
...@@ -165,47 +274,47 @@ class EventPushActionsStore(SQLBaseStore): ...@@ -165,47 +274,47 @@ class EventPushActionsStore(SQLBaseStore):
" )" " )"
" AND ep.stream_ordering > ?" " AND ep.stream_ordering > ?"
" AND ep.user_id = ?" " AND ep.user_id = ?"
" AND ep.user_id = rl.user_id" " AND ep.stream_ordering <= ?"
" ORDER BY ep.stream_ordering DESC LIMIT ?"
) )
args = [min_stream_ordering, user_id] args = [
if max_stream_ordering is not None: user_id, user_id,
sql += " AND ep.stream_ordering <= ?" min_stream_ordering, max_stream_ordering, limit,
args.append(max_stream_ordering) ]
sql += " ORDER BY ep.stream_ordering DESC LIMIT ?"
args.append(limit)
txn.execute(sql, args) txn.execute(sql, args)
return txn.fetchall() return txn.fetchall()
after_read_receipt = yield self.runInteraction( after_read_receipt = yield self.runInteraction(
"get_unread_push_actions_for_user_in_range", get_after_receipt "get_unread_push_actions_for_user_in_range_email_arr", get_after_receipt
) )
# There are rooms with push actions in them but you don't have a read receipt in # There are rooms with push actions in them but you don't have a read receipt in
# them e.g. rooms you've been invited to, so get push actions for rooms which do # them e.g. rooms you've been invited to, so get push actions for rooms which do
# not have read receipts in them too. # not have read receipts in them too.
def get_no_receipt(txn): def get_no_receipt(txn):
# XXX: Does the inner SELECT really need to select from the events table?
# We're just extracting the room_id, so isn't receipts_linearized enough?
sql = ( sql = (
"SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
" e.received_ts" " e.received_ts"
" FROM event_push_actions AS ep" " FROM event_push_actions AS ep"
" JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id" " INNER JOIN events AS e USING (room_id, event_id)"
" WHERE ep.room_id not in (" " WHERE"
" SELECT room_id FROM events NATURAL JOIN receipts_linearized" " ep.room_id NOT IN ("
" WHERE receipt_type = 'm.read' AND user_id = ?" " SELECT room_id FROM receipts_linearized"
" GROUP BY room_id" " WHERE receipt_type = 'm.read' AND user_id = ?"
") AND ep.user_id = ? AND ep.stream_ordering > ?" " GROUP BY room_id"
" )"
" AND ep.user_id = ?"
" AND ep.stream_ordering > ?"
" AND ep.stream_ordering <= ?"
" ORDER BY ep.stream_ordering DESC LIMIT ?"
) )
args = [user_id, user_id, min_stream_ordering] args = [
if max_stream_ordering is not None: user_id, user_id,
sql += " AND ep.stream_ordering <= ?" min_stream_ordering, max_stream_ordering, limit,
args.append(max_stream_ordering) ]
sql += " ORDER BY ep.stream_ordering DESC LIMIT ?"
args.append(limit)
txn.execute(sql, args) txn.execute(sql, args)
return txn.fetchall() return txn.fetchall()
no_read_receipt = yield self.runInteraction( no_read_receipt = yield self.runInteraction(
"get_unread_push_actions_for_user_in_range", get_no_receipt "get_unread_push_actions_for_user_in_range_email_nrr", get_no_receipt
) )
# Make a list of dicts from the two sets of results. # Make a list of dicts from the two sets of results.
......
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
import tests.unittest
import tests.utils
USER_ID = "@user:example.com"
class EventPushActionsStoreTestCase(tests.unittest.TestCase):
@defer.inlineCallbacks
def setUp(self):
hs = yield tests.utils.setup_test_homeserver()
self.store = hs.get_datastore()
@defer.inlineCallbacks
def test_get_unread_push_actions_for_user_in_range_for_http(self):
yield self.store.get_unread_push_actions_for_user_in_range_for_http(
USER_ID, 0, 1000, 20
)
@defer.inlineCallbacks
def test_get_unread_push_actions_for_user_in_range_for_email(self):
yield self.store.get_unread_push_actions_for_user_in_range_for_email(
USER_ID, 0, 1000, 20
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment