Skip to content
Snippets Groups Projects
Unverified Commit d5275fc5 authored by Erik Johnston's avatar Erik Johnston Committed by GitHub
Browse files

Propagate cache invalidates from workers to other workers. (#6748)

Currently if a worker invalidates a cache it will be streamed to master, which then didn't forward those to other workers.
parent f74d178b
No related branches found
No related tags found
No related merge requests found
Propagate cache invalidates from workers to other workers.
...@@ -459,7 +459,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): ...@@ -459,7 +459,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
await self.streamer.on_remove_pusher(cmd.app_id, cmd.push_key, cmd.user_id) await self.streamer.on_remove_pusher(cmd.app_id, cmd.push_key, cmd.user_id)
async def on_INVALIDATE_CACHE(self, cmd): async def on_INVALIDATE_CACHE(self, cmd):
self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys) await self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand): async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand):
self.streamer.on_remote_server_up(cmd.data) self.streamer.on_remote_server_up(cmd.data)
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
import logging import logging
import random import random
from typing import List from typing import Any, List
from six import itervalues from six import itervalues
...@@ -271,11 +271,14 @@ class ReplicationStreamer(object): ...@@ -271,11 +271,14 @@ class ReplicationStreamer(object):
self.notifier.on_new_replication_data() self.notifier.on_new_replication_data()
@measure_func("repl.on_invalidate_cache") @measure_func("repl.on_invalidate_cache")
def on_invalidate_cache(self, cache_func, keys): async def on_invalidate_cache(self, cache_func: str, keys: List[Any]):
"""The client has asked us to invalidate a cache """The client has asked us to invalidate a cache
""" """
invalidate_cache_counter.inc() invalidate_cache_counter.inc()
getattr(self.store, cache_func).invalidate(tuple(keys))
# We invalidate the cache locally, but then also stream that to other
# workers.
await self.store.invalidate_cache_and_stream(cache_func, tuple(keys))
@measure_func("repl.on_user_ip") @measure_func("repl.on_user_ip")
async def on_user_ip( async def on_user_ip(
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
import itertools import itertools
import logging import logging
from typing import Any, Iterable, Optional from typing import Any, Iterable, Optional, Tuple
from twisted.internet import defer from twisted.internet import defer
...@@ -33,6 +33,26 @@ CURRENT_STATE_CACHE_NAME = "cs_cache_fake" ...@@ -33,6 +33,26 @@ CURRENT_STATE_CACHE_NAME = "cs_cache_fake"
class CacheInvalidationStore(SQLBaseStore): class CacheInvalidationStore(SQLBaseStore):
async def invalidate_cache_and_stream(self, cache_name: str, keys: Tuple[Any, ...]):
"""Invalidates the cache and adds it to the cache stream so slaves
will know to invalidate their caches.
This should only be used to invalidate caches where slaves won't
otherwise know from other replication streams that the cache should
be invalidated.
"""
cache_func = getattr(self, cache_name, None)
if not cache_func:
return
cache_func.invalidate(keys)
await self.runInteraction(
"invalidate_cache_and_stream",
self._send_invalidation_to_replication,
cache_func.__name__,
keys,
)
def _invalidate_cache_and_stream(self, txn, cache_func, keys): def _invalidate_cache_and_stream(self, txn, cache_func, keys):
"""Invalidates the cache and adds it to the cache stream so slaves """Invalidates the cache and adds it to the cache stream so slaves
will know to invalidate their caches. will know to invalidate their caches.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment