Skip to content
Snippets Groups Projects
Commit 458b6f47 authored by Erik Johnston's avatar Erik Johnston
Browse files

Only invalidate membership caches based on the cache stream

Before we completely invalidated get_users_in_room whenever we updated
any current_state_events table. This was way too aggressive.
parent ac001dab
No related branches found
No related tags found
No related merge requests found
...@@ -299,9 +299,6 @@ class ReplicationResource(Resource): ...@@ -299,9 +299,6 @@ class ReplicationResource(Resource):
"backward_ex_outliers", res.backward_ex_outliers, "backward_ex_outliers", res.backward_ex_outliers,
("position", "event_id", "state_group"), ("position", "event_id", "state_group"),
) )
writer.write_header_and_rows(
"state_resets", res.state_resets, ("position",),
)
@defer.inlineCallbacks @defer.inlineCallbacks
def presence(self, writer, current_token, request_streams): def presence(self, writer, current_token, request_streams):
......
...@@ -192,10 +192,6 @@ class SlavedEventStore(BaseSlavedStore): ...@@ -192,10 +192,6 @@ class SlavedEventStore(BaseSlavedStore):
return result return result
def process_replication(self, result): def process_replication(self, result):
state_resets = set(
r[0] for r in result.get("state_resets", {"rows": []})["rows"]
)
stream = result.get("events") stream = result.get("events")
if stream: if stream:
self._stream_id_gen.advance(int(stream["position"])) self._stream_id_gen.advance(int(stream["position"]))
...@@ -205,7 +201,7 @@ class SlavedEventStore(BaseSlavedStore): ...@@ -205,7 +201,7 @@ class SlavedEventStore(BaseSlavedStore):
for row in stream["rows"]: for row in stream["rows"]:
self._process_replication_row( self._process_replication_row(
row, backfilled=False, state_resets=state_resets row, backfilled=False,
) )
stream = result.get("backfill") stream = result.get("backfill")
...@@ -213,7 +209,7 @@ class SlavedEventStore(BaseSlavedStore): ...@@ -213,7 +209,7 @@ class SlavedEventStore(BaseSlavedStore):
self._backfill_id_gen.advance(-int(stream["position"])) self._backfill_id_gen.advance(-int(stream["position"]))
for row in stream["rows"]: for row in stream["rows"]:
self._process_replication_row( self._process_replication_row(
row, backfilled=True, state_resets=state_resets row, backfilled=True,
) )
stream = result.get("forward_ex_outliers") stream = result.get("forward_ex_outliers")
...@@ -232,20 +228,15 @@ class SlavedEventStore(BaseSlavedStore): ...@@ -232,20 +228,15 @@ class SlavedEventStore(BaseSlavedStore):
return super(SlavedEventStore, self).process_replication(result) return super(SlavedEventStore, self).process_replication(result)
def _process_replication_row(self, row, backfilled, state_resets): def _process_replication_row(self, row, backfilled):
position = row[0]
internal = json.loads(row[1]) internal = json.loads(row[1])
event_json = json.loads(row[2]) event_json = json.loads(row[2])
event = FrozenEvent(event_json, internal_metadata_dict=internal) event = FrozenEvent(event_json, internal_metadata_dict=internal)
self.invalidate_caches_for_event( self.invalidate_caches_for_event(
event, backfilled, reset_state=position in state_resets event, backfilled,
) )
def invalidate_caches_for_event(self, event, backfilled, reset_state): def invalidate_caches_for_event(self, event, backfilled):
if reset_state:
self.get_rooms_for_user.invalidate_all()
self.get_users_in_room.invalidate((event.room_id,))
self._invalidate_get_event_cache(event.event_id) self._invalidate_get_event_cache(event.event_id)
self.get_latest_event_ids_in_room.invalidate((event.room_id,)) self.get_latest_event_ids_in_room.invalidate((event.room_id,))
...@@ -267,8 +258,6 @@ class SlavedEventStore(BaseSlavedStore): ...@@ -267,8 +258,6 @@ class SlavedEventStore(BaseSlavedStore):
self._invalidate_get_event_cache(event.redacts) self._invalidate_get_event_cache(event.redacts)
if event.type == EventTypes.Member: if event.type == EventTypes.Member:
self.get_rooms_for_user.invalidate((event.state_key,))
self.get_users_in_room.invalidate((event.room_id,))
self._membership_stream_cache.entity_has_changed( self._membership_stream_cache.entity_has_changed(
event.state_key, event.internal_metadata.stream_ordering event.state_key, event.internal_metadata.stream_ordering
) )
......
...@@ -572,14 +572,6 @@ class EventsStore(SQLBaseStore): ...@@ -572,14 +572,6 @@ class EventsStore(SQLBaseStore):
txn, self.get_users_in_room, (room_id,) txn, self.get_users_in_room, (room_id,)
) )
# Add an entry to the current_state_resets table to record the point
# where we clobbered the current state
self._simple_insert_txn(
txn,
table="current_state_resets",
values={"event_stream_ordering": max_stream_order}
)
for room_id, new_extrem in new_forward_extremeties.items(): for room_id, new_extrem in new_forward_extremeties.items():
self._simple_delete_txn( self._simple_delete_txn(
txn, txn,
...@@ -1610,15 +1602,6 @@ class EventsStore(SQLBaseStore): ...@@ -1610,15 +1602,6 @@ class EventsStore(SQLBaseStore):
else: else:
upper_bound = current_forward_id upper_bound = current_forward_id
sql = (
"SELECT event_stream_ordering FROM current_state_resets"
" WHERE ? < event_stream_ordering"
" AND event_stream_ordering <= ?"
" ORDER BY event_stream_ordering ASC"
)
txn.execute(sql, (last_forward_id, upper_bound))
state_resets = txn.fetchall()
sql = ( sql = (
"SELECT event_stream_ordering, event_id, state_group" "SELECT event_stream_ordering, event_id, state_group"
" FROM ex_outlier_stream" " FROM ex_outlier_stream"
...@@ -1630,7 +1613,6 @@ class EventsStore(SQLBaseStore): ...@@ -1630,7 +1613,6 @@ class EventsStore(SQLBaseStore):
forward_ex_outliers = txn.fetchall() forward_ex_outliers = txn.fetchall()
else: else:
new_forward_events = [] new_forward_events = []
state_resets = []
forward_ex_outliers = [] forward_ex_outliers = []
sql = ( sql = (
...@@ -1670,7 +1652,6 @@ class EventsStore(SQLBaseStore): ...@@ -1670,7 +1652,6 @@ class EventsStore(SQLBaseStore):
return AllNewEventsResult( return AllNewEventsResult(
new_forward_events, new_backfill_events, new_forward_events, new_backfill_events,
forward_ex_outliers, backward_ex_outliers, forward_ex_outliers, backward_ex_outliers,
state_resets,
) )
return self.runInteraction("get_all_new_events", get_all_new_events_txn) return self.runInteraction("get_all_new_events", get_all_new_events_txn)
...@@ -1896,5 +1877,4 @@ class EventsStore(SQLBaseStore): ...@@ -1896,5 +1877,4 @@ class EventsStore(SQLBaseStore):
AllNewEventsResult = namedtuple("AllNewEventsResult", [ AllNewEventsResult = namedtuple("AllNewEventsResult", [
"new_forward_events", "new_backfill_events", "new_forward_events", "new_backfill_events",
"forward_ex_outliers", "backward_ex_outliers", "forward_ex_outliers", "backward_ex_outliers",
"state_resets"
]) ])
...@@ -66,8 +66,6 @@ class RoomMemberStore(SQLBaseStore): ...@@ -66,8 +66,6 @@ class RoomMemberStore(SQLBaseStore):
) )
for event in events: for event in events:
txn.call_after(self.get_rooms_for_user.invalidate, (event.state_key,))
txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
txn.call_after( txn.call_after(
self._membership_stream_cache.entity_has_changed, self._membership_stream_cache.entity_has_changed,
event.state_key, event.internal_metadata.stream_ordering event.state_key, event.internal_metadata.stream_ordering
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment