Skip to content
Snippets Groups Projects
Commit 149fa411 authored by David Baker's avatar David Baker
Browse files

Only delete push actions after 30 days

parent 60ff2e79
No related branches found
No related tags found
No related merge requests found
...@@ -22,6 +22,8 @@ import ujson as json ...@@ -22,6 +22,8 @@ import ujson as json
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
KEEP_PUSH_ACTIONS_FOR_MS = 30 * 24 * 60 * 60 * 1000
class EventPushActionsStore(SQLBaseStore): class EventPushActionsStore(SQLBaseStore):
def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples): def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
...@@ -224,16 +226,46 @@ class EventPushActionsStore(SQLBaseStore): ...@@ -224,16 +226,46 @@ class EventPushActionsStore(SQLBaseStore):
(room_id, event_id) (room_id, event_id)
) )
def _remove_push_actions_before_txn(self, txn, room_id, user_id, def _remove_old_push_actions_before_txn(self, txn, room_id, user_id,
topological_ordering): topological_ordering):
"""
Purges old, stale push actions for a user and room before a given
topological_ordering
Args:
txn: The transcation
room_id: Room ID to delete from
user_id: user ID to delete for
topological_ordering: The lowest topological ordering which will
not be deleted.
Returns:
"""
txn.call_after( txn.call_after(
self.get_unread_event_push_actions_by_room_for_user.invalidate_many, self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
(room_id, user_id, ) (room_id, user_id, )
) )
threshold = self._clock.time_msec() - KEEP_PUSH_ACTIONS_FOR_MS
# We need to join on the events table to get the received_ts for
# event_push_actions and sqlite won't let us use a join in a delete so
# we can't just delete where received_ts < x. Furthermore we can
# only identify event_push_actions by a tuple of room_id, event_id
# we we can't use a subquery.
# Instead, we look up the stream ordering for the last event in that
# room received before the threshold time and delete event_push_actions
# in the room with a stream_odering before that.
txn.execute( txn.execute(
"DELETE FROM event_push_actions" "DELETE FROM event_push_actions "
" WHERE room_id = ? AND user_id = ? AND topological_ordering < ?", " WHERE user_id = ? AND room_id = ? AND "
(room_id, user_id, topological_ordering,) " topological_ordering < ? AND stream_ordering < ("
" SELECT stream_ordering FROM events"
" WHERE room_id = ? AND received_ts < ?"
" ORDER BY stream_ordering DESC"
" LIMIT 1"
")",
(user_id, room_id, topological_ordering, room_id, threshold)
) )
......
...@@ -297,7 +297,7 @@ class ReceiptsStore(SQLBaseStore): ...@@ -297,7 +297,7 @@ class ReceiptsStore(SQLBaseStore):
) )
if receipt_type == "m.read" and topological_ordering: if receipt_type == "m.read" and topological_ordering:
self._remove_push_actions_before_txn( self._remove_old_push_actions_before_txn(
txn, txn,
room_id=room_id, room_id=room_id,
user_id=user_id, user_id=user_id,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment