Skip to content
Snippets Groups Projects
Commit c06c0019 authored by Erik Johnston's avatar Erik Johnston Committed by GitHub
Browse files

Merge pull request #2116 from matrix-org/erikj/dedupe_federation_repl2

Dedupe KeyedEdu and Devices federation repl traffic
parents bf906015 26ae5178
No related branches found
No related tags found
No related merge requests found
...@@ -267,9 +267,12 @@ class FederationRemoteSendQueue(object): ...@@ -267,9 +267,12 @@ class FederationRemoteSendQueue(object):
keys = self.keyed_edu_changed.keys() keys = self.keyed_edu_changed.keys()
i = keys.bisect_right(from_token) i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1 j = keys.bisect_right(to_token) + 1
keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:j]) # We purposefully clobber based on the key here, python dict comprehensions
# always use the last value, so this will correctly point to the last
# stream position.
keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]}
for (pos, (destination, edu_key)) in keyed_edus: for ((destination, edu_key), pos) in keyed_edus.iteritems():
rows.append((pos, KeyedEduRow( rows.append((pos, KeyedEduRow(
key=edu_key, key=edu_key,
edu=self.keyed_edu[(destination, edu_key)], edu=self.keyed_edu[(destination, edu_key)],
...@@ -279,7 +282,7 @@ class FederationRemoteSendQueue(object): ...@@ -279,7 +282,7 @@ class FederationRemoteSendQueue(object):
keys = self.edus.keys() keys = self.edus.keys()
i = keys.bisect_right(from_token) i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1 j = keys.bisect_right(to_token) + 1
edus = set((k, self.edus[k]) for k in keys[i:j]) edus = ((k, self.edus[k]) for k in keys[i:j])
for (pos, edu) in edus: for (pos, edu) in edus:
rows.append((pos, EduRow(edu))) rows.append((pos, EduRow(edu)))
...@@ -288,7 +291,7 @@ class FederationRemoteSendQueue(object): ...@@ -288,7 +291,7 @@ class FederationRemoteSendQueue(object):
keys = self.failures.keys() keys = self.failures.keys()
i = keys.bisect_right(from_token) i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1 j = keys.bisect_right(to_token) + 1
failures = set((k, self.failures[k]) for k in keys[i:j]) failures = ((k, self.failures[k]) for k in keys[i:j])
for (pos, (destination, failure)) in failures: for (pos, (destination, failure)) in failures:
rows.append((pos, FailureRow( rows.append((pos, FailureRow(
...@@ -300,9 +303,9 @@ class FederationRemoteSendQueue(object): ...@@ -300,9 +303,9 @@ class FederationRemoteSendQueue(object):
keys = self.device_messages.keys() keys = self.device_messages.keys()
i = keys.bisect_right(from_token) i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1 j = keys.bisect_right(to_token) + 1
device_messages = set((k, self.device_messages[k]) for k in keys[i:j]) device_messages = {self.device_messages[k]: k for k in keys[i:j]}
for (pos, destination) in device_messages: for (destination, pos) in device_messages.iteritems():
rows.append((pos, DeviceRow( rows.append((pos, DeviceRow(
destination=destination, destination=destination,
))) )))
...@@ -380,6 +383,10 @@ class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", ( ...@@ -380,6 +383,10 @@ class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", (
"key", # tuple(str) - the edu key passed to send_edu "key", # tuple(str) - the edu key passed to send_edu
"edu", # Edu "edu", # Edu
))): ))):
"""Streams EDUs that have an associated key that is ued to clobber. For example,
typing EDUs clobber based on room_id.
"""
TypeId = "k" TypeId = "k"
@staticmethod @staticmethod
...@@ -404,6 +411,8 @@ class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", ( ...@@ -404,6 +411,8 @@ class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", (
class EduRow(BaseFederationRow, namedtuple("EduRow", ( class EduRow(BaseFederationRow, namedtuple("EduRow", (
"edu", # Edu "edu", # Edu
))): ))):
"""Streams EDUs that don't have keys. See KeyedEduRow
"""
TypeId = "e" TypeId = "e"
@staticmethod @staticmethod
...@@ -421,6 +430,11 @@ class FailureRow(BaseFederationRow, namedtuple("FailureRow", ( ...@@ -421,6 +430,11 @@ class FailureRow(BaseFederationRow, namedtuple("FailureRow", (
"destination", # str "destination", # str
"failure", "failure",
))): ))):
"""Streams failures to a remote server. Failures are issued when there was
something wrong with a transaction the remote sent us, e.g. it included
an event that was invalid.
"""
TypeId = "f" TypeId = "f"
@staticmethod @staticmethod
...@@ -443,6 +457,10 @@ class FailureRow(BaseFederationRow, namedtuple("FailureRow", ( ...@@ -443,6 +457,10 @@ class FailureRow(BaseFederationRow, namedtuple("FailureRow", (
class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", ( class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", (
"destination", # str "destination", # str
))): ))):
"""Streams the fact that either a) there is pending to device messages for
users on the remote, or b) a local users device has changed and needs to
be sent to the remote.
"""
TypeId = "d" TypeId = "d"
@staticmethod @staticmethod
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment