    # Copyright 2018 New Vector Ltd
    #
    # Licensed under the Apache License, Version 2.0 (the 'License');
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an 'AS IS' BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    
    from typing import Tuple, Union
    
    
    import attr
    from parameterized import parameterized
    
    
    from synapse.api.room_versions import (
        KNOWN_ROOM_VERSIONS,
        EventFormatVersions,
        RoomVersion,
    )
    
    from synapse.events import _EventInternalMetadata
    
    from synapse.util import json_encoder
    
    import tests.unittest
    import tests.utils
    
    
    
    class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
        """Tests for the event-federation storage methods of the main datastore.

        Most tests write rows directly into the database tables rather than
        persisting real events, as that is much easier than constructing valid
        events.
        """

        # The silly auth graph we use to test the auth difference algorithm,
        # where the top are the most recent events. Each entry maps an event ID
        # to the IDs of its auth events.
        #
        #   A   B
        #    \ /
        #  D  E
        #  \  |
        #   ` F   C
        #     |  /|
        #     G ´ |
        #     | \ |
        #     H   I
        #     |   |
        #     K   J
        #
        # Iteration order matters: it determines the stream ordering given to
        # each event when the events are inserted. Treat as read-only.
        _AUTH_GRAPH = {
            "a": ["e"],
            "b": ["e"],
            "c": ["g", "i"],
            "d": ["f"],
            "e": ["f"],
            "f": ["g"],
            "g": ["h", "i"],
            "h": ["k"],
            "i": ["j"],
            "k": [],
            "j": [],
        }

        # The depth (also used as the topological ordering) of each event in
        # _AUTH_GRAPH. Treat as read-only.
        _DEPTH_MAP = {
            "a": 7,
            "b": 7,
            "c": 4,
            "d": 6,
            "e": 6,
            "f": 5,
            "g": 3,
            "h": 2,
            "i": 2,
            "k": 1,
            "j": 1,
        }

        def prepare(self, reactor, clock, hs):
            self.store = hs.get_datastores().main

        def test_get_prev_events_for_room(self):
            room_id = "@ROOM:local"

            # add a bunch of events and hashes to act as forward extremities
            def insert_event(txn, i):
                event_id = "$event_%i:local" % i

                txn.execute(
                    (
                        "INSERT INTO events ("
                        "   room_id, event_id, type, depth, topological_ordering,"
                        "   content, processed, outlier, stream_ordering) "
                        "VALUES (?, ?, 'm.test', ?, ?, 'test', ?, ?, ?)"
                    ),
                    (room_id, event_id, i, i, True, False, i),
                )

                txn.execute(
                    (
                        "INSERT INTO event_forward_extremities (room_id, event_id) "
                        "VALUES (?, ?)"
                    ),
                    (room_id, event_id),
                )

            # One interaction per event; event i gets depth/stream ordering i.
            for i in range(20):
                self.get_success(
                    self.store.db_pool.runInteraction("insert", insert_event, i)
                )

            # this should get the last ten, highest depth first
            r = self.get_success(self.store.get_prev_events_for_room(room_id))
            self.assertEqual(10, len(r))
            self.assertEqual(
                list(r), ["$event_%i:local" % i for i in range(19, 9, -1)]
            )

        def test_get_rooms_with_many_extremities(self):
            room1 = "#room1"
            room2 = "#room2"
            room3 = "#room3"

            def insert_event(txn, i, room_id):
                event_id = "$event_%i:local" % i
                txn.execute(
                    (
                        "INSERT INTO event_forward_extremities (room_id, event_id) "
                        "VALUES (?, ?)"
                    ),
                    (room_id, event_id),
                )

            # Give each room 20 forward extremities. Each interaction is wrapped
            # in get_success so it is driven to completion and any failure fails
            # the test.
            for i in range(20):
                for room_id in (room1, room2, room3):
                    self.get_success(
                        self.store.db_pool.runInteraction(
                            "insert", insert_event, i, room_id
                        )
                    )

            r = self.get_success(self.store.get_rooms_with_many_extremities(5, 5, []))
            self.assertEqual(len(r), 3)

            # Does filter work?
            r = self.get_success(
                self.store.get_rooms_with_many_extremities(5, 5, [room1])
            )
            self.assertIn(room2, r)
            self.assertIn(room3, r)
            self.assertEqual(len(r), 2)

            r = self.get_success(
                self.store.get_rooms_with_many_extremities(5, 5, [room1, room2])
            )
            self.assertEqual(r, [room3])

            # Does filter and limit work?
            r = self.get_success(
                self.store.get_rooms_with_many_extremities(5, 1, [room1])
            )
            self.assertIn(r, ([room2], [room3]))

        def _insert_room_txn(self, txn, room_id: str, has_auth_chain_index: bool) -> None:
            """Insert a `rooms` row for a room version "6" room.

            Args:
                txn: the database transaction to use.
                room_id: the room to create.
                has_auth_chain_index: value for the `has_auth_chain_index` column.
            """
            self.store.db_pool.simple_insert_txn(
                txn,
                "rooms",
                {
                    "room_id": room_id,
                    "creator": "room_creator_user_id",
                    "is_public": True,
                    "room_version": "6",
                    "has_auth_chain_index": has_auth_chain_index,
                },
            )

        def _set_has_auth_chain_index_txn(
            self, txn, room_id: str, has_auth_chain_index: bool
        ) -> None:
            """Overwrite the `has_auth_chain_index` column of an existing room."""
            self.store.db_pool.simple_update_txn(
                txn,
                table="rooms",
                keyvalues={"room_id": room_id},
                updatevalues={"has_auth_chain_index": has_auth_chain_index},
            )

        def _insert_auth_graph_events_txn(self, txn, room_id: str) -> None:
            """Insert a bare `events` row for every event in `_AUTH_GRAPH`.

            Stream orderings 1, 2, 3, ... are assigned in the iteration order of
            `_AUTH_GRAPH`. Depth and topological ordering both come from
            `_DEPTH_MAP`.
            """
            for stream_ordering, event_id in enumerate(self._AUTH_GRAPH, start=1):
                depth = self._DEPTH_MAP[event_id]
                self.store.db_pool.simple_insert_txn(
                    txn,
                    table="events",
                    values={
                        "event_id": event_id,
                        "room_id": room_id,
                        "depth": depth,
                        "topological_ordering": depth,
                        "type": "m.test",
                        "processed": True,
                        "outlier": False,
                        "stream_ordering": stream_ordering,
                    },
                )

        def _persist_fake_auth_chains_txn(self, txn, room_id: str, event_ids) -> None:
            """Persist auth chain data for the given events of `_AUTH_GRAPH`.

            Args:
                txn: the database transaction to use.
                room_id: the room the events belong to.
                event_ids: an iterable of keys of `_AUTH_GRAPH`. Each one is
                    passed on as a `FakeEvent` whose auth events come from
                    `_AUTH_GRAPH`.
            """
            self.hs.datastores.persist_events._persist_event_auth_chain_txn(
                txn,
                [
                    FakeEvent(event_id, room_id, self._AUTH_GRAPH[event_id])
                    for event_id in event_ids
                ],
            )

        def _setup_auth_chain(self, use_chain_cover_index: bool) -> str:
            """Create a room containing the events (and auth chains) of `_AUTH_GRAPH`.

            Args:
                use_chain_cover_index: stored as the room's `has_auth_chain_index`
                    column, i.e. whether the room is marked as maybe having a
                    chain cover index.

            Returns:
                The ID of the created room.
            """
            room_id = "@ROOM:local"

            # Mark the room as maybe having a cover index.
            self.get_success(
                self.store.db_pool.runInteraction(
                    "store_room",
                    self._insert_room_txn,
                    room_id,
                    use_chain_cover_index,
                )
            )

            # We rudely fiddle with the appropriate tables directly, as that's much
            # easier than constructing events properly.
            def insert_event(txn):
                self._insert_auth_graph_events_txn(txn, room_id)
                self._persist_fake_auth_chains_txn(txn, room_id, self._AUTH_GRAPH)

            self.get_success(self.store.db_pool.runInteraction("insert", insert_event))

            return room_id

        def _assert_auth_difference(self, room_id: str) -> None:
            """Check `get_auth_chain_difference` results for the `_AUTH_GRAPH` room."""
            cases = [
                ([{"a"}, {"b"}], {"a", "b"}),
                ([{"a"}, {"b"}, {"c"}], {"a", "b", "c", "e", "f"}),
                ([{"a", "c"}, {"b"}], {"a", "b", "c"}),
                ([{"a", "c"}, {"b", "c"}], {"a", "b"}),
                ([{"a"}, {"b"}, {"d"}], {"a", "b", "d", "e"}),
                ([{"a"}, {"b"}, {"c"}, {"d"}], {"a", "b", "c", "d", "e", "f"}),
                ([{"a"}, {"b"}, {"e"}], {"a", "b"}),
                ([{"a"}], set()),
            ]
            for state_sets, expected in cases:
                difference = self.get_success(
                    self.store.get_auth_chain_difference(room_id, state_sets)
                )
                self.assertSetEqual(
                    difference, expected, msg=f"state sets: {state_sets}"
                )

        @parameterized.expand([(True,), (False,)])
        def test_auth_chain_ids(self, use_chain_cover_index: bool):
            room_id = self._setup_auth_chain(use_chain_cover_index)

            def auth_chain(event_ids, **kwargs):
                return self.get_success(
                    self.store.get_auth_chain_ids(room_id, event_ids, **kwargs)
                )

            # a and b have the same auth chain.
            full_chain = ["e", "f", "g", "h", "i", "j", "k"]
            self.assertCountEqual(auth_chain(["a"]), full_chain)
            self.assertCountEqual(auth_chain(["b"]), full_chain)
            self.assertCountEqual(auth_chain(["a", "b"]), full_chain)

            self.assertCountEqual(auth_chain(["c"]), ["g", "h", "i", "j", "k"])

            # d and e have the same auth chain.
            self.assertCountEqual(auth_chain(["d"]), ["f", "g", "h", "i", "j", "k"])
            self.assertCountEqual(auth_chain(["e"]), ["f", "g", "h", "i", "j", "k"])

            self.assertCountEqual(auth_chain(["f"]), ["g", "h", "i", "j", "k"])

            self.assertCountEqual(auth_chain(["g"]), ["h", "i", "j", "k"])

            self.assertEqual(auth_chain(["h"]), {"k"})

            self.assertEqual(auth_chain(["i"]), {"j"})

            # j and k have no parents.
            self.assertEqual(auth_chain(["j"]), set())
            self.assertEqual(auth_chain(["k"]), set())

            # More complex input sequences.
            self.assertCountEqual(auth_chain(["b", "c", "d"]), full_chain)

            self.assertCountEqual(auth_chain(["h", "i"]), ["k", "j"])

            # e gets returned even though include_given is false, but it is in the
            # auth chain of b.
            self.assertCountEqual(auth_chain(["b", "e"]), full_chain)

            # Test include_given.
            self.assertCountEqual(auth_chain(["i"], include_given=True), ["i", "j"])

        @parameterized.expand([(True,), (False,)])
        def test_auth_difference(self, use_chain_cover_index: bool):
            room_id = self._setup_auth_chain(use_chain_cover_index)

            # Now actually test that various combinations give the right result:
            self._assert_auth_difference(room_id)

        def test_auth_difference_partial_cover(self):
            """Test that we correctly handle rooms where not all events have a chain
            cover calculated. This can happen in some obscure edge cases, including
            during the background update that calculates the chain cover for old
            rooms.
            """
            room_id = "@ROOM:local"

            # We rudely fiddle with the appropriate tables directly, as that's much
            # easier than constructing events properly.
            def insert_event(txn):
                # First insert the room and mark it as having a chain cover.
                self._insert_room_txn(txn, room_id, True)

                self._insert_auth_graph_events_txn(txn, room_id)

                # Insert all events apart from 'B'
                self._persist_fake_auth_chains_txn(
                    txn,
                    room_id,
                    [event_id for event_id in self._AUTH_GRAPH if event_id != "b"],
                )

                # Now we insert the event 'B' without a chain cover, by temporarily
                # pretending the room doesn't have a chain cover.
                self._set_has_auth_chain_index_txn(txn, room_id, False)
                self._persist_fake_auth_chains_txn(txn, room_id, ["b"])
                self._set_has_auth_chain_index_txn(txn, room_id, True)

            self.get_success(self.store.db_pool.runInteraction("insert", insert_event))

            # Now actually test that various combinations give the right result:
            self._assert_auth_difference(room_id)

        @parameterized.expand(
            [(room_version,) for room_version in KNOWN_ROOM_VERSIONS.values()]
        )
        def test_prune_inbound_federation_queue(self, room_version: RoomVersion):
            """Test that pruning of inbound federation queues work"""

            room_id = "some_room_id"

            def prev_event_format(prev_event_id: str) -> Union[Tuple[str, dict], str]:
                """Account for differences in prev_events format across room versions"""
                if room_version.event_format == EventFormatVersions.ROOM_V1_V2:
                    return prev_event_id, {}

                return prev_event_id

            # Insert a bunch of events that all reference the previous one.
            self.get_success(
                self.store.db_pool.simple_insert_many(
                    table="federation_inbound_events_staging",
                    keys=(
                        "origin",
                        "room_id",
                        "received_ts",
                        "event_id",
                        "event_json",
                        "internal_metadata",
                    ),
                    values=[
                        (
                            "some_origin",
                            room_id,
                            0,
                            f"$fake_event_id_{i + 1}",
                            json_encoder.encode(
                                {
                                    "prev_events": [
                                        prev_event_format(f"$fake_event_id_{i}")
                                    ]
                                }
                            ),
                            "{}",
                        )
                        for i in range(500)
                    ],
                    desc="test_prune_inbound_federation_queue",
                )
            )

            # Calling prune once should return True, i.e. a prune happen. The second
            # time it shouldn't.
            pruned = self.get_success(
                self.store.prune_staged_events_in_room(room_id, room_version)
            )
            self.assertTrue(pruned)

            pruned = self.get_success(
                self.store.prune_staged_events_in_room(room_id, room_version)
            )
            self.assertFalse(pruned)

            # Assert that we only have a single event left in the queue, and that it
            # is the last one.
            count = self.get_success(
                self.store.db_pool.simple_select_one_onecol(
                    table="federation_inbound_events_staging",
                    keyvalues={"room_id": room_id},
                    retcol="COUNT(*)",
                    desc="test_prune_inbound_federation_queue",
                )
            )
            self.assertEqual(count, 1)

            _, event_id = self.get_success(
                self.store.get_next_staged_event_id_for_room(room_id)
            )
            self.assertEqual(event_id, "$fake_event_id_500")
    
    
    
    @attr.s
    class FakeEvent:
        """Minimal stand-in for an event object, built by the tests above and
        handed to `_persist_event_auth_chain_txn`.

        Every instance reports itself as a state event of type "foo" with state
        key "foo". `internal_metadata` is a class attribute, so one
        `_EventInternalMetadata` object is shared by all instances.
        """

        # attrs fields, in constructor order:
        # FakeEvent(event_id, room_id, auth_events)
        event_id = attr.ib()
        room_id = attr.ib()
        auth_events = attr.ib()  # list of this event's auth event IDs

        # Plain class attributes (not attrs fields), identical on every instance.
        state_key = "foo"
        type = "foo"
        internal_metadata = _EventInternalMetadata({})

        def is_state(self):
            # Every fake event is treated as a state event.
            return True

        def auth_event_ids(self):
            # Returns the auth_events list passed to the constructor, unchanged.
            return self.auth_events