Skip to content
Snippets Groups Projects
Unverified Commit 507c1cb3 authored by Richard van der Hoff's avatar Richard van der Hoff Committed by GitHub
Browse files

Update the rejected state of events during resync (#13459)

Events can be un-rejected or newly-rejected during resync, so ensure we update
the database and caches when that happens.
parent 22814271
Branches
Tags
No related merge requests found
Faster joins: update the rejected state of events during de-partial-stating.
...@@ -2200,3 +2200,63 @@ class EventsWorkerStore(SQLBaseStore): ...@@ -2200,3 +2200,63 @@ class EventsWorkerStore(SQLBaseStore):
(room_id,), (room_id,),
) )
return [row[0] for row in txn] return [row[0] for row in txn]
def mark_event_rejected_txn(
self,
txn: LoggingTransaction,
event_id: str,
rejection_reason: Optional[str],
) -> None:
"""Mark an event that was previously accepted as rejected, or vice versa
This can happen, for example, when resyncing state during a faster join.
Args:
txn:
event_id: ID of event to update
rejection_reason: reason it has been rejected, or None if it is now accepted
"""
if rejection_reason is None:
logger.info(
"Marking previously-processed event %s as accepted",
event_id,
)
self.db_pool.simple_delete_txn(
txn,
"rejections",
keyvalues={"event_id": event_id},
)
else:
logger.info(
"Marking previously-processed event %s as rejected(%s)",
event_id,
rejection_reason,
)
self.db_pool.simple_upsert_txn(
txn,
table="rejections",
keyvalues={"event_id": event_id},
values={
"reason": rejection_reason,
"last_check": self._clock.time_msec(),
},
)
self.db_pool.simple_update_txn(
txn,
table="events",
keyvalues={"event_id": event_id},
updatevalues={"rejection_reason": rejection_reason},
)
self.invalidate_get_event_cache_after_txn(txn, event_id)
# TODO(faster_joins): invalidate the cache on workers. Ideally we'd just
# call '_send_invalidation_to_replication', but we actually need the other
# end to call _invalidate_local_get_event_cache() rather than (just)
# _get_event_cache.invalidate().
#
# One solution might be to (somehow) get the workers to call
# _invalidate_caches_for_event() (though that will invalidate more than
# strictly necessary).
#
# https://github.com/matrix-org/synapse/issues/12994
...@@ -430,6 +430,11 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): ...@@ -430,6 +430,11 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
updatevalues={"state_group": state_group}, updatevalues={"state_group": state_group},
) )
# the event may now be rejected where it was not before, or vice versa,
# in which case we need to update the rejected flags.
if bool(context.rejected) != (event.rejected_reason is not None):
self.mark_event_rejected_txn(txn, event.event_id, context.rejected)
self.db_pool.simple_delete_one_txn( self.db_pool.simple_delete_one_txn(
txn, txn,
table="partial_state_events", table="partial_state_events",
......
...@@ -539,15 +539,6 @@ class StateFilter: ...@@ -539,15 +539,6 @@ class StateFilter:
is_mine_id: a callable which confirms if a given state_key matches a mxid is_mine_id: a callable which confirms if a given state_key matches a mxid
of a local user of a local user
""" """
# TODO(faster_joins): it's not entirely clear that this is safe. In particular,
# there may be circumstances in which we return a piece of state that, once we
# resync the state, we discover is invalid. For example: if it turns out that
# the sender of a piece of state wasn't actually in the room, then clearly that
# state shouldn't have been returned.
# We should at least add some tests around this to see what happens.
# https://github.com/matrix-org/synapse/issues/13006
# if we haven't requested membership events, then it depends on the value of # if we haven't requested membership events, then it depends on the value of
# 'include_others' # 'include_others'
if EventTypes.Member not in self.types: if EventTypes.Member not in self.types:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment