Unverified Commit daa1ac89 authored by Richard van der Hoff, committed by GitHub

Fix device list update stream ids going backward (#7158)

Occasionally we could get a federation device list update transaction which
looked like:

```
[
    {'edu_type': 'm.device_list_update', 'content': {'user_id': '@user:test', 'device_id': 'D2', 'prev_id': [], 'stream_id': 12, 'deleted': True}},
    {'edu_type': 'm.device_list_update', 'content': {'user_id': '@user:test', 'device_id': 'D1', 'prev_id': [12], 'stream_id': 11, 'deleted': True}},
    {'edu_type': 'm.device_list_update', 'content': {'user_id': '@user:test', 'device_id': 'D3', 'prev_id': [11], 'stream_id': 13, 'deleted': True}}
]
```

Having `stream_ids` which are lower than `prev_ids` looks odd. It might work
(I'm not actually sure), but in any case it doesn't seem like a reasonable
thing to expect other implementations to support.
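In essence, the fix is to walk each user's pending devices in stream order, so the `stream_id`s in a transaction never decrease and each `prev_id` points at an earlier entry. A minimal standalone sketch of that ordering, using hypothetical names (the real change lives in `DeviceWorkerStore`):

```
# Sketch only: build_device_list_edus is a hypothetical helper, not Synapse's API.
# query_map maps (user_id, device_id) -> (stream_id, opentracing_context),
# mirroring the structure described in DeviceWorkerStore.

def build_device_list_edus(user_id, user_devices, query_map, prev_id=None):
    """Build m.device_list_update EDU contents in stream order."""
    edus = []

    # Go through the devices in stream order, so each EDU's stream_id is
    # never lower than the prev_id it references.
    device_ids = sorted(user_devices.keys(), key=lambda i: query_map[(user_id, i)][0])

    for device_id in device_ids:
        stream_id, _opentracing_context = query_map[(user_id, device_id)]
        edus.append(
            {
                "user_id": user_id,
                "device_id": device_id,
                "prev_id": [prev_id] if prev_id is not None else [],
                "stream_id": stream_id,
            }
        )
        prev_id = stream_id
    return edus


# With the stream ids from the example above (D1=11, D2=12, D3=13), the EDUs
# now come out as 11, 12, 13 rather than 12, 11, 13.
query_map = {
    ("@user:test", "D1"): (11, None),
    ("@user:test", "D2"): (12, None),
    ("@user:test", "D3"): (13, None),
}
user_devices = {"D2": {}, "D1": {}, "D3": {}}
for edu in build_device_list_edus("@user:test", user_devices, query_map):
    print(edu["device_id"], edu["prev_id"], edu["stream_id"])
```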
parent 61bb8343
Fix device list update stream ids going backward.
```
@@ -165,7 +165,6 @@ class DeviceWorkerStore(SQLBaseStore):
         # the max stream_id across each set of duplicate entries
         #
         # maps (user_id, device_id) -> (stream_id, opentracing_context)
-        # as long as their stream_id does not match that of the last row
         #
         # opentracing_context contains the opentracing metadata for the request
         # that created the poke
@@ -270,7 +269,14 @@ class DeviceWorkerStore(SQLBaseStore):
             prev_id = yield self._get_last_device_update_for_remote_user(
                 destination, user_id, from_stream_id
             )
-            for device_id, device in iteritems(user_devices):
+
+            # make sure we go through the devices in stream order
+            device_ids = sorted(
+                user_devices.keys(), key=lambda i: query_map[(user_id, i)][0],
+            )
+
+            for device_id in device_ids:
+                device = user_devices[device_id]
                 stream_id, opentracing_context = query_map[(user_id, device_id)]
                 result = {
                     "user_id": user_id,
```
```
@@ -297,6 +297,7 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):
             c = edu["content"]
             if stream_id is not None:
                 self.assertEqual(c["prev_id"], [stream_id])
+                self.assertGreaterEqual(c["stream_id"], stream_id)
             stream_id = c["stream_id"]
         devices = {edu["content"]["device_id"] for edu in self.edus}
         self.assertEqual({"D1", "D2"}, devices)
@@ -330,6 +331,7 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):
                 c.items(),
                 {"user_id": u1, "prev_id": [stream_id], "deleted": True}.items(),
             )
+            self.assertGreaterEqual(c["stream_id"], stream_id)
             stream_id = c["stream_id"]
         devices = {edu["content"]["device_id"] for edu in self.edus}
         self.assertEqual({"D1", "D2", "D3"}, devices)
@@ -366,6 +368,8 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):
             self.assertEqual(edu["edu_type"], "m.device_list_update")
             c = edu["content"]
             self.assertEqual(c["prev_id"], [stream_id] if stream_id is not None else [])
+            if stream_id is not None:
+                self.assertGreaterEqual(c["stream_id"], stream_id)
             stream_id = c["stream_id"]
         devices = {edu["content"]["device_id"] for edu in self.edus}
         self.assertEqual({"D1", "D2", "D3"}, devices)
@@ -482,6 +486,8 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):
         }
         self.assertLessEqual(expected.items(), content.items())
+        if prev_stream_id is not None:
+            self.assertGreaterEqual(content["stream_id"], prev_stream_id)
         return content["stream_id"]
     def check_signing_key_update_txn(self, txn: JsonDict,) -> None:
```