Skip to content
Snippets Groups Projects
Unverified Commit 4a31a61e authored by Richard van der Hoff's avatar Richard van der Hoff Committed by GitHub
Browse files

Merge pull request #2786 from matrix-org/rav/rdata_metrics

Metrics for number of RDATA commands received
parents 1159abbd 5c3c32f1
No related merge requests found
......@@ -517,25 +517,28 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
self.send_error("Wrong remote")
def on_RDATA(self, cmd):
stream_name = cmd.stream_name
inbound_rdata_count.inc(stream_name)
try:
row = STREAMS_MAP[cmd.stream_name].ROW_TYPE(*cmd.row)
row = STREAMS_MAP[stream_name].ROW_TYPE(*cmd.row)
except Exception:
logger.exception(
"[%s] Failed to parse RDATA: %r %r",
self.id(), cmd.stream_name, cmd.row
self.id(), stream_name, cmd.row
)
raise
if cmd.token is None:
# I.e. this is part of a batch of updates for this stream. Batch
# until we get an update for the stream with a non None token
self.pending_batches.setdefault(cmd.stream_name, []).append(row)
self.pending_batches.setdefault(stream_name, []).append(row)
else:
# Check if this is the last of a batch of updates
rows = self.pending_batches.pop(cmd.stream_name, [])
rows = self.pending_batches.pop(stream_name, [])
rows.append(row)
self.handler.on_rdata(cmd.stream_name, cmd.token, rows)
self.handler.on_rdata(stream_name, cmd.token, rows)
def on_POSITION(self, cmd):
self.handler.on_position(cmd.stream_name, cmd.token)
......@@ -644,3 +647,9 @@ metrics.register_callback(
},
labels=["command", "name", "conn_id"],
)
# number of updates received for each RDATA stream
inbound_rdata_count = metrics.register_counter(
"inbound_rdata_count",
labels=["stream_name"],
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment