Skip to content
Snippets Groups Projects
Unverified Commit 8ca79613 authored by Erik Johnston's avatar Erik Johnston Committed by GitHub
Browse files

Fix Redis reconnection logic (#7482)

Proactively send out `POSITION` commands (as if we had just received a `REPLICATE`) when we connect to Redis. This is important as other instances won't notice we've connected to issue a `REPLICATE` command (unlike for direct TCP connections). This is only currently an issue if master process reconnects without restarting (if it restarts then it won't have written anything and so other instances probably won't have missed anything). 
parent 51fb0fc2
No related branches found
No related tags found
No related merge requests found
Fix Redis reconnection logic that can result in missed updates over replication if master reconnects to Redis without restarting.
...@@ -151,6 +151,13 @@ class ReplicationCommandHandler: ...@@ -151,6 +151,13 @@ class ReplicationCommandHandler:
hs.get_reactor().connectTCP(host, port, self._factory) hs.get_reactor().connectTCP(host, port, self._factory)
async def on_REPLICATE(self, conn: AbstractConnection, cmd: ReplicateCommand): async def on_REPLICATE(self, conn: AbstractConnection, cmd: ReplicateCommand):
self.send_positions_to_connection(conn)
def send_positions_to_connection(self, conn: AbstractConnection):
"""Send current position of all streams this process is source of to
the connection.
"""
# We only want to announce positions by the writer of the streams. # We only want to announce positions by the writer of the streams.
# Currently this is just the master process. # Currently this is just the master process.
if not self._is_master: if not self._is_master:
...@@ -158,7 +165,7 @@ class ReplicationCommandHandler: ...@@ -158,7 +165,7 @@ class ReplicationCommandHandler:
for stream_name, stream in self._streams.items(): for stream_name, stream in self._streams.items():
current_token = stream.current_token(self._instance_name) current_token = stream.current_token(self._instance_name)
self.send_command( conn.send_command(
PositionCommand(stream_name, self._instance_name, current_token) PositionCommand(stream_name, self._instance_name, current_token)
) )
......
...@@ -70,7 +70,6 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection): ...@@ -70,7 +70,6 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
logger.info("Connected to redis") logger.info("Connected to redis")
super().connectionMade() super().connectionMade()
run_as_background_process("subscribe-replication", self._send_subscribe) run_as_background_process("subscribe-replication", self._send_subscribe)
self.handler.new_connection(self)
async def _send_subscribe(self): async def _send_subscribe(self):
# it's important to make sure that we only send the REPLICATE command once we # it's important to make sure that we only send the REPLICATE command once we
...@@ -81,9 +80,15 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection): ...@@ -81,9 +80,15 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
logger.info( logger.info(
"Successfully subscribed to redis stream, sending REPLICATE command" "Successfully subscribed to redis stream, sending REPLICATE command"
) )
self.handler.new_connection(self)
await self._async_send_command(ReplicateCommand()) await self._async_send_command(ReplicateCommand())
logger.info("REPLICATE successfully sent") logger.info("REPLICATE successfully sent")
# We send out our positions when there is a new connection in case the
# other side missed updates. We do this for Redis connections as the
# otherside won't know we've connected and so won't issue a REPLICATE.
self.handler.send_positions_to_connection(self)
def messageReceived(self, pattern: str, channel: str, message: str): def messageReceived(self, pattern: str, channel: str, message: str):
"""Received a message from redis. """Received a message from redis.
""" """
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment