Skip to content
Snippets Groups Projects
Unverified Commit 05060e02 authored by Richard van der Hoff's avatar Richard van der Hoff Committed by GitHub
Browse files

Track command processing as a background process (#7879)

I'm going to be doing more stuff synchronously, and I don't want to lose the
CPU metrics down the sofa.
parent 15997618
Branches
Tags
No related merge requests found
Report CPU metrics to prometheus for time spent processing replication commands.
...@@ -22,6 +22,7 @@ class RedisProtocol: ...@@ -22,6 +22,7 @@ class RedisProtocol:
def publish(self, channel: str, message: bytes): ... def publish(self, channel: str, message: bytes): ...
class SubscriberProtocol: class SubscriberProtocol:
def __init__(self, *args, **kwargs): ...
password: Optional[str] password: Optional[str]
def subscribe(self, channels: Union[str, List[str]]): ... def subscribe(self, channels: Union[str, List[str]]): ...
def connectionMade(self): ... def connectionMade(self): ...
......
...@@ -57,8 +57,12 @@ from prometheus_client import Counter ...@@ -57,8 +57,12 @@ from prometheus_client import Counter
from twisted.protocols.basic import LineOnlyReceiver from twisted.protocols.basic import LineOnlyReceiver
from twisted.python.failure import Failure from twisted.python.failure import Failure
from synapse.logging.context import PreserveLoggingContext
from synapse.metrics import LaterGauge from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.background_process_metrics import (
BackgroundProcessLoggingContext,
run_as_background_process,
)
from synapse.replication.tcp.commands import ( from synapse.replication.tcp.commands import (
VALID_CLIENT_COMMANDS, VALID_CLIENT_COMMANDS,
VALID_SERVER_COMMANDS, VALID_SERVER_COMMANDS,
...@@ -160,6 +164,12 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): ...@@ -160,6 +164,12 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
# The LoopingCall for sending pings. # The LoopingCall for sending pings.
self._send_ping_loop = None self._send_ping_loop = None
# a logcontext which we use for processing incoming commands. We declare it as a
# background process so that the CPU stats get reported to prometheus.
self._logging_context = BackgroundProcessLoggingContext(
"replication_command_handler-%s" % self.conn_id
)
def connectionMade(self): def connectionMade(self):
logger.info("[%s] Connection established", self.id()) logger.info("[%s] Connection established", self.id())
...@@ -210,6 +220,10 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): ...@@ -210,6 +220,10 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
def lineReceived(self, line: bytes): def lineReceived(self, line: bytes):
"""Called when we've received a line """Called when we've received a line
""" """
with PreserveLoggingContext(self._logging_context):
self._parse_and_dispatch_line(line)
def _parse_and_dispatch_line(self, line: bytes):
if line.strip() == "": if line.strip() == "":
# Ignore blank lines # Ignore blank lines
return return
...@@ -397,6 +411,9 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): ...@@ -397,6 +411,9 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
if self.transport: if self.transport:
self.transport.unregisterProducer() self.transport.unregisterProducer()
# mark the logging context as finished
self._logging_context.__exit__(None, None, None)
def __str__(self): def __str__(self):
addr = None addr = None
if self.transport: if self.transport:
......
...@@ -18,8 +18,11 @@ from typing import TYPE_CHECKING ...@@ -18,8 +18,11 @@ from typing import TYPE_CHECKING
import txredisapi import txredisapi
from synapse.logging.context import make_deferred_yieldable from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.background_process_metrics import (
BackgroundProcessLoggingContext,
run_as_background_process,
)
from synapse.replication.tcp.commands import ( from synapse.replication.tcp.commands import (
Command, Command,
ReplicateCommand, ReplicateCommand,
...@@ -66,6 +69,15 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection): ...@@ -66,6 +69,15 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
stream_name = None # type: str stream_name = None # type: str
outbound_redis_connection = None # type: txredisapi.RedisProtocol outbound_redis_connection = None # type: txredisapi.RedisProtocol
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# a logcontext which we use for processing incoming commands. We declare it as a
# background process so that the CPU stats get reported to prometheus.
self._logging_context = BackgroundProcessLoggingContext(
"replication_command_handler"
)
def connectionMade(self): def connectionMade(self):
logger.info("Connected to redis") logger.info("Connected to redis")
super().connectionMade() super().connectionMade()
...@@ -92,7 +104,10 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection): ...@@ -92,7 +104,10 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
def messageReceived(self, pattern: str, channel: str, message: str): def messageReceived(self, pattern: str, channel: str, message: str):
"""Received a message from redis. """Received a message from redis.
""" """
with PreserveLoggingContext(self._logging_context):
self._parse_and_dispatch_message(message)
def _parse_and_dispatch_message(self, message: str):
if message.strip() == "": if message.strip() == "":
# Ignore blank lines # Ignore blank lines
return return
...@@ -145,6 +160,9 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection): ...@@ -145,6 +160,9 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
super().connectionLost(reason) super().connectionLost(reason)
self.handler.lost_connection(self) self.handler.lost_connection(self)
# mark the logging context as finished
self._logging_context.__exit__(None, None, None)
def send_command(self, cmd: Command): def send_command(self, cmd: Command):
"""Send a command if connection has been established. """Send a command if connection has been established.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment