Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
synapse
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Container Registry
Model registry
Monitor
Service Desk
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
Maunium
synapse
Commits
b687010f
Unverified
Commit
b687010f
authored
2 years ago
by
Nick Mills-Barrett
Committed by
GitHub
2 years ago
Browse files
Options
Downloads
Patches
Plain Diff
Rewrite get push actions queries (#13597)
parent
ba882c03
No related branches found
Branches containing commit
No related tags found
Tags containing commit
No related merge requests found
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
changelog.d/13597.misc
+1
-0
1 addition, 0 deletions
changelog.d/13597.misc
synapse/storage/databases/main/event_push_actions.py
+68
-160
68 additions, 160 deletions
synapse/storage/databases/main/event_push_actions.py
with
69 additions
and
160 deletions
changelog.d/13597.misc
0 → 100644
+
1
−
0
View file @
b687010f
Optimise push action fetching queries. Contributed by Nick @ Beeper (@fizzadar).
This diff is collapsed.
Click to expand it.
synapse/storage/databases/main/event_push_actions.py
+
68
−
160
View file @
b687010f
...
@@ -459,6 +459,32 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
...
@@ -459,6 +459,32 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
return
await
self
.
db_pool
.
runInteraction
(
"
get_push_action_users_in_range
"
,
f
)
return
await
self
.
db_pool
.
runInteraction
(
"
get_push_action_users_in_range
"
,
f
)
def
_get_receipts_by_room_txn
(
self
,
txn
:
LoggingTransaction
,
user_id
:
str
)
->
List
[
Tuple
[
str
,
int
]]:
receipt_types_clause
,
args
=
make_in_list_sql_clause
(
self
.
database_engine
,
"
receipt_type
"
,
(
ReceiptTypes
.
READ
,
ReceiptTypes
.
READ_PRIVATE
,
ReceiptTypes
.
UNSTABLE_READ_PRIVATE
,
),
)
sql
=
f
"""
SELECT room_id, MAX(stream_ordering)
FROM receipts_linearized
INNER JOIN events USING (room_id, event_id)
WHERE
{
receipt_types_clause
}
AND user_id = ?
GROUP BY room_id
"""
args
.
extend
((
user_id
,))
txn
.
execute
(
sql
,
args
)
return
cast
(
List
[
Tuple
[
str
,
int
]],
txn
.
fetchall
())
async
def
get_unread_push_actions_for_user_in_range_for_http
(
async
def
get_unread_push_actions_for_user_in_range_for_http
(
self
,
self
,
user_id
:
str
,
user_id
:
str
,
...
@@ -482,106 +508,45 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
...
@@ -482,106 +508,45 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
The list will have between 0~limit entries.
The list will have between 0~limit entries.
"""
"""
# find rooms that have a read receipt in them and return the next
receipts_by_room
=
dict
(
# push actions
await
self
.
db_pool
.
runInteraction
(
def
get_after_receipt
(
"
get_unread_push_actions_for_user_in_range_http_receipts
"
,
txn
:
LoggingTransaction
,
self
.
_get_receipts_by_room_txn
,
)
->
List
[
Tuple
[
str
,
str
,
int
,
str
,
bool
]]:
user_id
=
user_id
,
# find rooms that have a read receipt in them and return the next
),
# push actions
receipt_types_clause
,
args
=
make_in_list_sql_clause
(
self
.
database_engine
,
"
receipt_type
"
,
(
ReceiptTypes
.
READ
,
ReceiptTypes
.
READ_PRIVATE
,
ReceiptTypes
.
UNSTABLE_READ_PRIVATE
,
),
)
sql
=
f
"""
SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
ep.highlight
FROM (
SELECT room_id,
MAX(stream_ordering) as stream_ordering
FROM events
INNER JOIN receipts_linearized USING (room_id, event_id)
WHERE
{
receipt_types_clause
}
AND user_id = ?
GROUP BY room_id
) AS rl,
event_push_actions AS ep
WHERE
ep.room_id = rl.room_id
AND ep.stream_ordering > rl.stream_ordering
AND ep.user_id = ?
AND ep.stream_ordering > ?
AND ep.stream_ordering <= ?
AND ep.notif = 1
ORDER BY ep.stream_ordering ASC LIMIT ?
"""
args
.
extend
(
(
user_id
,
user_id
,
min_stream_ordering
,
max_stream_ordering
,
limit
)
)
txn
.
execute
(
sql
,
args
)
return
cast
(
List
[
Tuple
[
str
,
str
,
int
,
str
,
bool
]],
txn
.
fetchall
())
after_read_receipt
=
await
self
.
db_pool
.
runInteraction
(
"
get_unread_push_actions_for_user_in_range_http_arr
"
,
get_after_receipt
)
)
# There are rooms with push actions in them but you don't have a read receipt in
def
get_push_actions_txn
(
# them e.g. rooms you've been invited to, so get push actions for rooms which do
# not have read receipts in them too.
def
get_no_receipt
(
txn
:
LoggingTransaction
,
txn
:
LoggingTransaction
,
)
->
List
[
Tuple
[
str
,
str
,
int
,
str
,
bool
]]:
)
->
List
[
Tuple
[
str
,
str
,
int
,
str
,
bool
]]:
receipt_types_clause
,
args
=
make_in_list_sql_clause
(
sql
=
"""
self
.
database_engine
,
SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, ep.highlight
"
receipt_type
"
,
(
ReceiptTypes
.
READ
,
ReceiptTypes
.
READ_PRIVATE
,
ReceiptTypes
.
UNSTABLE_READ_PRIVATE
,
),
)
sql
=
f
"""
SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
ep.highlight
FROM event_push_actions AS ep
FROM event_push_actions AS ep
INNER JOIN events AS e USING (room_id, event_id)
WHERE
WHERE
ep.room_id NOT IN (
ep.user_id = ?
SELECT room_id FROM receipts_linearized
WHERE
{
receipt_types_clause
}
AND user_id = ?
GROUP BY room_id
)
AND ep.user_id = ?
AND ep.stream_ordering > ?
AND ep.stream_ordering > ?
AND ep.stream_ordering <= ?
AND ep.stream_ordering <= ?
AND ep.notif = 1
AND ep.notif = 1
ORDER BY ep.stream_ordering ASC LIMIT ?
ORDER BY ep.stream_ordering ASC LIMIT ?
"""
"""
args
.
extend
(
txn
.
execute
(
sql
,
(
user_id
,
min_stream_ordering
,
max_stream_ordering
,
limit
))
(
user_id
,
user_id
,
min_stream_ordering
,
max_stream_ordering
,
limit
)
)
txn
.
execute
(
sql
,
args
)
return
cast
(
List
[
Tuple
[
str
,
str
,
int
,
str
,
bool
]],
txn
.
fetchall
())
return
cast
(
List
[
Tuple
[
str
,
str
,
int
,
str
,
bool
]],
txn
.
fetchall
())
no_read_receipt
=
await
self
.
db_pool
.
runInteraction
(
push_actions
=
await
self
.
db_pool
.
runInteraction
(
"
get_unread_push_actions_for_user_in_range_http
_nrr
"
,
get_
no_receipt
"
get_unread_push_actions_for_user_in_range_http
"
,
get_
push_actions_txn
)
)
notifs
=
[
notifs
=
[
HttpPushAction
(
HttpPushAction
(
event_id
=
row
[
0
]
,
event_id
=
event_id
,
room_id
=
ro
w
[
1
]
,
room_id
=
ro
om_id
,
stream_ordering
=
row
[
2
]
,
stream_ordering
=
stream_ordering
,
actions
=
_deserialize_action
(
row
[
3
],
row
[
4
]
),
actions
=
_deserialize_action
(
actions
,
highlight
),
)
)
for
row
in
after_read_receipt
+
no_read_receipt
for
event_id
,
room_id
,
stream_ordering
,
actions
,
highlight
in
push_actions
# Only include push actions with a stream ordering after any receipt, or without any
# receipt present (invited to but never read rooms).
if
stream_ordering
>
receipts_by_room
.
get
(
room_id
,
0
)
]
]
# Now sort it so it's ordered correctly, since currently it will
# Now sort it so it's ordered correctly, since currently it will
...
@@ -617,106 +582,49 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
...
@@ -617,106 +582,49 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
The list will have between 0~limit entries.
The list will have between 0~limit entries.
"""
"""
# find rooms that have a read receipt in them and return the most recent
receipts_by_room
=
dict
(
# push actions
await
self
.
db_pool
.
runInteraction
(
def
get_after_receipt
(
"
get_unread_push_actions_for_user_in_range_email_receipts
"
,
txn
:
LoggingTransaction
,
self
.
_get_receipts_by_room_txn
,
)
->
List
[
Tuple
[
str
,
str
,
int
,
str
,
bool
,
int
]]:
user_id
=
user_id
,
receipt_types_clause
,
args
=
make_in_list_sql_clause
(
),
self
.
database_engine
,
"
receipt_type
"
,
(
ReceiptTypes
.
READ
,
ReceiptTypes
.
READ_PRIVATE
,
ReceiptTypes
.
UNSTABLE_READ_PRIVATE
,
),
)
sql
=
f
"""
SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
ep.highlight, e.received_ts
FROM (
SELECT room_id,
MAX(stream_ordering) as stream_ordering
FROM events
INNER JOIN receipts_linearized USING (room_id, event_id)
WHERE
{
receipt_types_clause
}
AND user_id = ?
GROUP BY room_id
) AS rl,
event_push_actions AS ep
INNER JOIN events AS e USING (room_id, event_id)
WHERE
ep.room_id = rl.room_id
AND ep.stream_ordering > rl.stream_ordering
AND ep.user_id = ?
AND ep.stream_ordering > ?
AND ep.stream_ordering <= ?
AND ep.notif = 1
ORDER BY ep.stream_ordering DESC LIMIT ?
"""
args
.
extend
(
(
user_id
,
user_id
,
min_stream_ordering
,
max_stream_ordering
,
limit
)
)
txn
.
execute
(
sql
,
args
)
return
cast
(
List
[
Tuple
[
str
,
str
,
int
,
str
,
bool
,
int
]],
txn
.
fetchall
())
after_read_receipt
=
await
self
.
db_pool
.
runInteraction
(
"
get_unread_push_actions_for_user_in_range_email_arr
"
,
get_after_receipt
)
)
# There are rooms with push actions in them but you don't have a read receipt in
def
get_push_actions_txn
(
# them e.g. rooms you've been invited to, so get push actions for rooms which do
# not have read receipts in them too.
def
get_no_receipt
(
txn
:
LoggingTransaction
,
txn
:
LoggingTransaction
,
)
->
List
[
Tuple
[
str
,
str
,
int
,
str
,
bool
,
int
]]:
)
->
List
[
Tuple
[
str
,
str
,
int
,
str
,
bool
,
int
]]:
receipt_types_clause
,
args
=
make_in_list_sql_clause
(
sql
=
"""
self
.
database_engine
,
"
receipt_type
"
,
(
ReceiptTypes
.
READ
,
ReceiptTypes
.
READ_PRIVATE
,
ReceiptTypes
.
UNSTABLE_READ_PRIVATE
,
),
)
sql
=
f
"""
SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
ep.highlight, e.received_ts
ep.highlight, e.received_ts
FROM event_push_actions AS ep
FROM event_push_actions AS ep
INNER JOIN events AS e USING (room_id, event_id)
INNER JOIN events AS e USING (room_id, event_id)
WHERE
WHERE
ep.room_id NOT IN (
ep.user_id = ?
SELECT room_id FROM receipts_linearized
WHERE
{
receipt_types_clause
}
AND user_id = ?
GROUP BY room_id
)
AND ep.user_id = ?
AND ep.stream_ordering > ?
AND ep.stream_ordering > ?
AND ep.stream_ordering <= ?
AND ep.stream_ordering <= ?
AND ep.notif = 1
AND ep.notif = 1
ORDER BY ep.stream_ordering DESC LIMIT ?
ORDER BY ep.stream_ordering DESC LIMIT ?
"""
"""
args
.
extend
(
txn
.
execute
(
sql
,
(
user_id
,
min_stream_ordering
,
max_stream_ordering
,
limit
))
(
user_id
,
user_id
,
min_stream_ordering
,
max_stream_ordering
,
limit
)
)
txn
.
execute
(
sql
,
args
)
return
cast
(
List
[
Tuple
[
str
,
str
,
int
,
str
,
bool
,
int
]],
txn
.
fetchall
())
return
cast
(
List
[
Tuple
[
str
,
str
,
int
,
str
,
bool
,
int
]],
txn
.
fetchall
())
no_read_receipt
=
await
self
.
db_pool
.
runInteraction
(
push_actions
=
await
self
.
db_pool
.
runInteraction
(
"
get_unread_push_actions_for_user_in_range_email
_nrr
"
,
get_
no_receipt
"
get_unread_push_actions_for_user_in_range_email
"
,
get_
push_actions_txn
)
)
# Make a list of dicts from the two sets of results.
# Make a list of dicts from the two sets of results.
notifs
=
[
notifs
=
[
EmailPushAction
(
EmailPushAction
(
event_id
=
row
[
0
]
,
event_id
=
event_id
,
room_id
=
ro
w
[
1
]
,
room_id
=
ro
om_id
,
stream_ordering
=
row
[
2
]
,
stream_ordering
=
stream_ordering
,
actions
=
_deserialize_action
(
row
[
3
],
row
[
4
]
),
actions
=
_deserialize_action
(
actions
,
highlight
),
received_ts
=
r
ow
[
5
]
,
received_ts
=
r
eceived_ts
,
)
)
for
row
in
after_read_receipt
+
no_read_receipt
for
event_id
,
room_id
,
stream_ordering
,
actions
,
highlight
,
received_ts
in
push_actions
# Only include push actions with a stream ordering after any receipt, or without any
# receipt present (invited to but never read rooms).
if
stream_ordering
>
receipts_by_room
.
get
(
room_id
,
0
)
]
]
# Now sort it so it's ordered correctly, since currently it will
# Now sort it so it's ordered correctly, since currently it will
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment