Skip to content
Snippets Groups Projects
test_event_federation.py 37.1 KiB
Newer Older
  • Learn to ignore specific revisions
  • # Copyright 2018 New Vector Ltd
    #
    # Licensed under the Apache License, Version 2.0 (the 'License');
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an 'AS IS' BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    
    import datetime
    from typing import Dict, List, Tuple, Union
    
    import attr
    from parameterized import parameterized
    
    
    from twisted.test.proto_helpers import MemoryReactor
    
    from synapse.api.constants import EventTypes
    
    from synapse.api.room_versions import (
        KNOWN_ROOM_VERSIONS,
        EventFormatVersions,
        RoomVersion,
    )
    
    from synapse.events import _EventInternalMetadata
    
    from synapse.server import HomeServer
    from synapse.storage.database import LoggingTransaction
    from synapse.types import JsonDict
    from synapse.util import Clock, json_encoder
    
    import tests.unittest
    import tests.utils
    
    
    
    @attr.s(auto_attribs=True, frozen=True, slots=True)
    class _BackfillSetupInfo:
        """Immutable record returned by the backfill room setup helper."""

        # ID of the room that was populated for the backfill tests.
        room_id: str
        # Maps each event ID in the test room to its depth.
        depth_map: Dict[str, int]
    
    
    
    class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
    
        def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
            """Keep a handle on the homeserver's main datastore for the tests."""
            datastores = hs.get_datastores()
            self.store = datastores.main
    
        def test_get_prev_events_for_room(self):
            """Check that `get_prev_events_for_room` returns ten of the room's
            forward extremities, ordered from the highest depth downwards.
            """
            room_id = "@ROOM:local"

            # add a bunch of events and hashes to act as forward extremities
            def insert_event(txn, i):
                event_id = "$event_%i:local" % i

                # `i` doubles as depth, topological ordering and stream ordering.
                txn.execute(
                    (
                        "INSERT INTO events ("
                        "   room_id, event_id, type, depth, topological_ordering,"
                        "   content, processed, outlier, stream_ordering) "
                        "VALUES (?, ?, 'm.test', ?, ?, 'test', ?, ?, ?)"
                    ),
                    (room_id, event_id, i, i, True, False, i),
                )

                txn.execute(
                    (
                        "INSERT INTO event_forward_extremities (room_id, event_id) "
                        "VALUES (?, ?)"
                    ),
                    (room_id, event_id),
                )

            # Insert events 0..19; the assertions below expect 19 down to 10.
            for i in range(0, 20):
                self.get_success(
                    self.store.db_pool.runInteraction("insert", insert_event, i)
                )

            # this should get the last ten
            r = self.get_success(self.store.get_prev_events_for_room(room_id))
            self.assertEqual(10, len(r))
            for i in range(0, 10):
                self.assertEqual("$event_%i:local" % (19 - i), r[i])
    
    
        def test_get_rooms_with_many_extremities(self):
            """Check `get_rooms_with_many_extremities`, including its room filter
            and its result limit.

            Three rooms are each given 20 forward extremities, then the store is
            queried with progressively larger filter lists and a smaller limit.
            """
            room1 = "#room1"
            room2 = "#room2"
            room3 = "#room3"

            def insert_event(txn, i, room_id):
                event_id = "$event_%i:local" % i
                txn.execute(
                    (
                        "INSERT INTO event_forward_extremities (room_id, event_id) "
                        "VALUES (?, ?)"
                    ),
                    (room_id, event_id),
                )

            # Every interaction must be driven to completion with `get_success`;
            # otherwise the rows are not guaranteed to exist when we query below.
            for i in range(0, 20):
                for room_id in (room1, room2, room3):
                    self.get_success(
                        self.store.db_pool.runInteraction(
                            "insert", insert_event, i, room_id
                        )
                    )

            # No filter: all three rooms come back.
            # NOTE(review): the first two arguments look like a minimum extremity
            # count and a result limit -- confirm against the store method.
            r = self.get_success(self.store.get_rooms_with_many_extremities(5, 5, []))
            self.assertEqual(len(r), 3)

            # Does filter work?
            r = self.get_success(self.store.get_rooms_with_many_extremities(5, 5, [room1]))
            self.assertIn(room2, r)
            self.assertIn(room3, r)
            self.assertEqual(len(r), 2)

            r = self.get_success(
                self.store.get_rooms_with_many_extremities(5, 5, [room1, room2])
            )
            self.assertEqual(r, [room3])

            # Does filter and limit work?
            r = self.get_success(self.store.get_rooms_with_many_extremities(5, 1, [room1]))
            # Either remaining room is acceptable; only one may be returned.
            self.assertIn(r, ([room2], [room3]))
    
        def _setup_auth_chain(self, use_chain_cover_index: bool) -> str:
            """Create a room holding a small, fixed auth graph for the auth chain tests.

            Args:
                use_chain_cover_index: stored as the room's `has_auth_chain_index`
                    flag.

            Returns:
                The ID of the room that was created.
            """
            room_id = "@ROOM:local"

            # The silly auth graph we use to test the auth difference algorithm,
            # where the top are the most recent events.
            #
            #   A   B
            #    \ /
            #  D  E
            #  \  |
            #   ` F   C
            #     |  /|
            #     G ´ |
            #     | \ |
            #     H   I
            #     |   |
            #     K   J

            # Event ID -> IDs of the events it lists as auth events.
            auth_graph = {
                "a": ["e"],
                "b": ["e"],
                "c": ["g", "i"],
                "d": ["f"],
                "e": ["f"],
                "f": ["g"],
                "g": ["h", "i"],
                "h": ["k"],
                "i": ["j"],
                "k": [],
                "j": [],
            }

            # Event ID -> depth of the event in the room.
            depth_map = {
                "a": 7,
                "b": 7,
                "c": 4,
                "d": 6,
                "e": 6,
                "f": 5,
                "g": 3,
                "h": 2,
                "i": 2,
                "k": 1,
                "j": 1,
            }

            # Mark the room as maybe having a cover index.
            room_row = {
                "room_id": room_id,
                "creator": "room_creator_user_id",
                "is_public": True,
                "room_version": "6",
                "has_auth_chain_index": use_chain_cover_index,
            }

            def store_room(txn):
                self.store.db_pool.simple_insert_txn(txn, "rooms", room_row)

            self.get_success(self.store.db_pool.runInteraction("store_room", store_room))

            # We rudely fiddle with the appropriate tables directly, as that's much
            # easier than constructing events properly.
            def insert_event(txn):
                # Stream orderings are handed out 1, 2, 3, ... in graph order.
                for stream_ordering, event_id in enumerate(auth_graph, start=1):
                    event_depth = depth_map[event_id]

                    self.store.db_pool.simple_insert_txn(
                        txn,
                        table="events",
                        values={
                            "event_id": event_id,
                            "room_id": room_id,
                            "depth": event_depth,
                            "topological_ordering": event_depth,
                            "type": "m.test",
                            "processed": True,
                            "outlier": False,
                            "stream_ordering": stream_ordering,
                        },
                    )

                fake_events = [
                    FakeEvent(event_id, room_id, auth_graph[event_id])
                    for event_id in auth_graph
                ]
                self.hs.datastores.persist_events._persist_event_auth_chain_txn(
                    txn, fake_events
                )

            self.get_success(self.store.db_pool.runInteraction("insert", insert_event))

            return room_id
    
        @parameterized.expand([(True,), (False,)])
        def test_auth_chain_ids(self, use_chain_cover_index: bool):
            """Check `get_auth_chain_ids` against the fixed auth graph built by
            `_setup_auth_chain`, for both values of the chain cover index flag.
            """
            room_id = self._setup_auth_chain(use_chain_cover_index)

            def fetch_chain(event_ids, **kwargs):
                # Resolve the auth chain IDs of `event_ids` in the test room.
                return self.get_success(
                    self.store.get_auth_chain_ids(room_id, event_ids, **kwargs)
                )

            below_a = ["e", "f", "g", "h", "i", "j", "k"]
            below_d = ["f", "g", "h", "i", "j", "k"]
            below_c = ["g", "h", "i", "j", "k"]

            # a and b have the same auth chain, whether asked for alone or together.
            for given in (["a"], ["b"], ["a", "b"]):
                self.assertCountEqual(fetch_chain(given), below_a)

            self.assertCountEqual(fetch_chain(["c"]), below_c)

            # d and e have the same auth chain.
            for given in (["d"], ["e"]):
                self.assertCountEqual(fetch_chain(given), below_d)

            self.assertCountEqual(fetch_chain(["f"]), below_c)
            self.assertCountEqual(fetch_chain(["g"]), ["h", "i", "j", "k"])

            self.assertEqual(fetch_chain(["h"]), {"k"})
            self.assertEqual(fetch_chain(["i"]), {"j"})

            # j and k have no parents.
            self.assertEqual(fetch_chain(["j"]), set())
            self.assertEqual(fetch_chain(["k"]), set())

            # More complex input sequences.
            self.assertCountEqual(fetch_chain(["b", "c", "d"]), below_a)
            self.assertCountEqual(fetch_chain(["h", "i"]), ["k", "j"])

            # e gets returned even though include_given is false, but it is in the
            # auth chain of b.
            self.assertCountEqual(fetch_chain(["b", "e"]), below_a)

            # Test include_given.
            self.assertCountEqual(fetch_chain(["i"], include_given=True), ["i", "j"])
    
        @parameterized.expand([(True,), (False,)])
        def test_auth_difference(self, use_chain_cover_index: bool):
            """Check `get_auth_chain_difference` for several combinations of state
            sets over the graph built by `_setup_auth_chain`.
            """
            room_id = self._setup_auth_chain(use_chain_cover_index)

            # Each entry pairs the state sets passed in with the expected difference.
            cases = [
                ([{"a"}, {"b"}], {"a", "b"}),
                ([{"a"}, {"b"}, {"c"}], {"a", "b", "c", "e", "f"}),
                ([{"a", "c"}, {"b"}], {"a", "b", "c"}),
                ([{"a", "c"}, {"b", "c"}], {"a", "b"}),
                ([{"a"}, {"b"}, {"d"}], {"a", "b", "d", "e"}),
                ([{"a"}, {"b"}, {"c"}, {"d"}], {"a", "b", "c", "d", "e", "f"}),
                ([{"a"}, {"b"}, {"e"}], {"a", "b"}),
                ([{"a"}], set()),
            ]

            # Now actually test that various combinations give the right result:
            for state_sets, expected in cases:
                difference = self.get_success(
                    self.store.get_auth_chain_difference(room_id, state_sets)
                )
                self.assertSetEqual(difference, expected)
    
        def test_auth_difference_partial_cover(self):
            """Test that we correctly handle rooms where not all events have a chain
            cover calculated. This can happen in some obscure edge cases, including
            during the background update that calculates the chain cover for old
            rooms.
            """

            room_id = "@ROOM:local"

            # The silly auth graph we use to test the auth difference algorithm,
            # where the top are the most recent events.
            #
            #   A   B
            #    \ /
            #  D  E
            #  \  |
            #   ` F   C
            #     |  /|
            #     G ´ |
            #     | \ |
            #     H   I
            #     |   |
            #     K   J

            # Event ID -> IDs of the events it lists as auth events.
            auth_graph = {
                "a": ["e"],
                "b": ["e"],
                "c": ["g", "i"],
                "d": ["f"],
                "e": ["f"],
                "f": ["g"],
                "g": ["h", "i"],
                "h": ["k"],
                "i": ["j"],
                "k": [],
                "j": [],
            }

            # Event ID -> depth of the event in the room.
            depth_map = {
                "a": 7,
                "b": 7,
                "c": 4,
                "d": 6,
                "e": 6,
                "f": 5,
                "g": 3,
                "h": 2,
                "i": 2,
                "k": 1,
                "j": 1,
            }

            # We rudely fiddle with the appropriate tables directly, as that's much
            # easier than constructing events properly.

            def insert_event(txn):
                # First insert the room and mark it as having a chain cover.
                self.store.db_pool.simple_insert_txn(
                    txn,
                    "rooms",
                    {
                        "room_id": room_id,
                        "creator": "room_creator_user_id",
                        "is_public": True,
                        "room_version": "6",
                        "has_auth_chain_index": True,
                    },
                )

                stream_ordering = 0

                for event_id in auth_graph:
                    stream_ordering += 1
                    depth = depth_map[event_id]

                    self.store.db_pool.simple_insert_txn(
                        txn,
                        table="events",
                        values={
                            "event_id": event_id,
                            "room_id": room_id,
                            "depth": depth,
                            "topological_ordering": depth,
                            "type": "m.test",
                            "processed": True,
                            "outlier": False,
                            "stream_ordering": stream_ordering,
                        },
                    )

                # Insert all events apart from 'B'
                self.hs.datastores.persist_events._persist_event_auth_chain_txn(
                    txn,
                    [
                        FakeEvent(event_id, room_id, auth_graph[event_id])
                        for event_id in auth_graph
                        if event_id != "b"
                    ],
                )

                # Now we insert the event 'B' without a chain cover, by temporarily
                # pretending the room doesn't have a chain cover.

                self.store.db_pool.simple_update_txn(
                    txn,
                    table="rooms",
                    keyvalues={"room_id": room_id},
                    updatevalues={"has_auth_chain_index": False},
                )

                self.hs.datastores.persist_events._persist_event_auth_chain_txn(
                    txn,
                    [FakeEvent("b", room_id, auth_graph["b"])],
                )

                # Flip the flag back so the room claims a chain cover again.
                self.store.db_pool.simple_update_txn(
                    txn,
                    table="rooms",
                    keyvalues={"room_id": room_id},
                    updatevalues={"has_auth_chain_index": True},
                )

            self.get_success(
                self.store.db_pool.runInteraction(
                    "insert",
                    insert_event,
                )
            )

            # Each entry pairs the state sets passed in with the expected difference.
            cases = [
                ([{"a"}, {"b"}], {"a", "b"}),
                ([{"a"}, {"b"}, {"c"}], {"a", "b", "c", "e", "f"}),
                ([{"a", "c"}, {"b"}], {"a", "b", "c"}),
                ([{"a", "c"}, {"b", "c"}], {"a", "b"}),
                ([{"a"}, {"b"}, {"d"}], {"a", "b", "d", "e"}),
                ([{"a"}, {"b"}, {"c"}, {"d"}], {"a", "b", "c", "d", "e", "f"}),
                ([{"a"}, {"b"}, {"e"}], {"a", "b"}),
                ([{"a"}], set()),
            ]

            # Now actually test that various combinations give the right result:
            for state_sets, expected in cases:
                difference = self.get_success(
                    self.store.get_auth_chain_difference(room_id, state_sets)
                )
                self.assertSetEqual(difference, expected)
    
        @parameterized.expand(
            [(room_version,) for room_version in KNOWN_ROOM_VERSIONS.values()]
        )
        def test_prune_inbound_federation_queue(self, room_version: RoomVersion):
            """Test that pruning of inbound federation queues work"""

            # NOTE(review): any room ID works here; it only has to be used
            # consistently for the inserts and the queries below.
            room_id = "some_room_id"

            def prev_event_format(prev_event_id: str) -> Union[Tuple[str, dict], str]:
                """Account for differences in prev_events format across room versions"""
                if room_version.event_format == EventFormatVersions.ROOM_V1_V2:
                    return prev_event_id, {}

                return prev_event_id

            # Insert a bunch of events that all reference the previous one.
            self.get_success(
                self.store.db_pool.simple_insert_many(
                    table="federation_inbound_events_staging",
                    keys=(
                        "origin",
                        "room_id",
                        "received_ts",
                        "event_id",
                        "event_json",
                        "internal_metadata",
                    ),
                    values=[
                        (
                            "some_origin",
                            room_id,
                            0,
                            f"$fake_event_id_{i + 1}",
                            json_encoder.encode(
                                {"prev_events": [prev_event_format(f"$fake_event_id_{i}")]}
                            ),
                            # Empty JSON object for the `internal_metadata` column.
                            "{}",
                        )
                        for i in range(500)
                    ],
                    desc="test_prune_inbound_federation_queue",
                )
            )

            # Calling prune once should return True, i.e. a prune happen. The second
            # time it shouldn't.
            pruned = self.get_success(
                self.store.prune_staged_events_in_room(room_id, room_version)
            )
            self.assertTrue(pruned)

            pruned = self.get_success(
                self.store.prune_staged_events_in_room(room_id, room_version)
            )
            self.assertFalse(pruned)

            # Assert that we only have a single event left in the queue, and that it
            # is the last one.
            count = self.get_success(
                self.store.db_pool.simple_select_one_onecol(
                    table="federation_inbound_events_staging",
                    keyvalues={"room_id": room_id},
                    retcol="COUNT(*)",
                    desc="test_prune_inbound_federation_queue",
                )
            )
            self.assertEqual(count, 1)

            next_staged_event_info = self.get_success(
                self.store.get_next_staged_event_id_for_room(room_id)
            )
            assert next_staged_event_info
            _, event_id = next_staged_event_info
            self.assertEqual(event_id, "$fake_event_id_500")
    
    
    594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000
        def _setup_room_for_backfill_tests(self) -> _BackfillSetupInfo:
            """
            Build a room containing a mix of persisted events and backward
            extremities for the backfill tests to query.

            Returns:
                A `_BackfillSetupInfo` carrying the `room_id` of the room and the
                `depth_map` of the events in it.
            """
            room_id = "!backfill-room-test:some-host"

            # The silly graph we use to test grabbing backward extremities,
            # where the top is the oldest events.
            #    1 (oldest)
            #    |
            #    2 ⹁
            #    |  \
            #    |   [b1, b2, b3]
            #    |   |
            #    |   A
            #    |  /
            #    3 {
            #    |  \
            #    |   [b4, b5, b6]
            #    |   |
            #    |   B
            #    |  /
            #    4 ´
            #    |
            #    5 (newest)

            # Event ID -> IDs of its prev events.
            event_graph: Dict[str, List[str]] = {
                "1": [],
                "2": ["1"],
                "3": ["2", "A"],
                "4": ["3", "B"],
                "5": ["4"],
                "A": ["b1", "b2", "b3"],
                "b1": ["2"],
                "b2": ["2"],
                "b3": ["2"],
                "B": ["b4", "b5", "b6"],
                "b4": ["3"],
                "b5": ["3"],
                "b6": ["3"],
            }

            depth_map: Dict[str, int] = {
                "1": 1,
                "2": 2,
                "b1": 3,
                "b2": 3,
                "b3": 3,
                "A": 4,
                "3": 5,
                "b4": 6,
                "b5": 6,
                "b6": 6,
                "B": 7,
                "4": 8,
                "5": 9,
            }

            # The events we have persisted on our server.
            # The rest are events in the room but not backfilled yet.
            our_server_events = {"5", "4", "B", "3", "A"}

            # The position of an event in `event_graph` (counting from zero) serves
            # as both its `origin_server_ts` and its `stream_ordering`.
            complete_event_dict_map: Dict[str, JsonDict] = {
                event_id: {
                    "event_id": event_id,
                    "type": "test_regular_type",
                    "room_id": room_id,
                    "sender": "@sender",
                    "prev_event_ids": prev_event_ids,
                    "auth_event_ids": [],
                    "origin_server_ts": position,
                    "depth": depth_map[event_id],
                    "stream_ordering": position,
                    "content": {"body": "event" + event_id},
                }
                for position, (event_id, prev_event_ids) in enumerate(
                    event_graph.items()
                )
            }

            def populate_db(txn: LoggingTransaction):
                db_pool = self.store.db_pool

                # Insert the room to satisfy the foreign key constraint of
                # `event_failed_pull_attempts`
                db_pool.simple_insert_txn(
                    txn,
                    "rooms",
                    {
                        "room_id": room_id,
                        "creator": "room_creator_user_id",
                        "is_public": True,
                        "room_version": "6",
                    },
                )

                # Insert our server events
                for event_id in our_server_events:
                    event_dict = complete_event_dict_map[event_id]
                    event_depth = event_dict["depth"]

                    db_pool.simple_insert_txn(
                        txn,
                        table="events",
                        values={
                            "event_id": event_dict["event_id"],
                            "type": event_dict["type"],
                            "room_id": event_dict["room_id"],
                            "depth": event_depth,
                            "topological_ordering": event_depth,
                            "stream_ordering": event_dict["stream_ordering"],
                            "processed": True,
                            "outlier": False,
                        },
                    )

                # Insert the event edges
                for event_id in our_server_events:
                    for prev_event_id in event_graph[event_id]:
                        edge_row = {
                            "event_id": event_id,
                            "prev_event_id": prev_event_id,
                            "room_id": room_id,
                        }
                        db_pool.simple_insert_txn(
                            txn, table="event_edges", values=edge_row
                        )

                # Insert the backward extremities: everything our events point at
                # that we have not persisted ourselves.
                prev_events_of_our_events = {
                    prev_event_id
                    for our_server_event in our_server_events
                    for prev_event_id in event_graph[our_server_event]
                }
                for backward_extremity in prev_events_of_our_events - our_server_events:
                    db_pool.simple_insert_txn(
                        txn,
                        table="event_backward_extremities",
                        values={
                            "event_id": backward_extremity,
                            "room_id": room_id,
                        },
                    )

            self.get_success(
                self.store.db_pool.runInteraction(
                    "_setup_room_for_backfill_tests_populate_db",
                    populate_db,
                )
            )

            return _BackfillSetupInfo(room_id=room_id, depth_map=depth_map)
    
        def test_get_backfill_points_in_room(self):
            """
            Check that the backfill test room yields the expected backfill
            points, in the expected order.
            """
            room_id = self._setup_room_for_backfill_tests().room_id

            backfill_points = self.get_success(
                self.store.get_backfill_points_in_room(room_id)
            )

            # Only the first element of each backfill point (the event ID) matters.
            backfill_event_ids = []
            for backfill_point in backfill_points:
                backfill_event_ids.append(backfill_point[0])

            expected_event_ids = ["b6", "b5", "b4", "2", "b3", "b2", "b1"]
            self.assertListEqual(backfill_event_ids, expected_event_ids)
    
        def test_get_backfill_points_in_room_excludes_events_we_have_attempted(
            self,
        ):
            """
            Events with a recorded failed pull attempt that is still within the
            backoff timeout must not be offered as backfill points again.
            """
            room_id = self._setup_room_for_backfill_tests().room_id

            # Record some attempts to backfill these events which will make
            # `get_backfill_points_in_room` exclude them because we
            # haven't passed the backoff interval.
            for attempted_event_id in ("b5", "b4", "b3", "b2"):
                self.get_success(
                    self.store.record_event_failed_pull_attempt(
                        room_id, attempted_event_id, "fake cause"
                    )
                )

            # No time has passed since we attempted to backfill ^

            backfill_points = self.get_success(
                self.store.get_backfill_points_in_room(room_id)
            )
            backfill_event_ids = []
            for backfill_point in backfill_points:
                backfill_event_ids.append(backfill_point[0])

            # Only the backfill points that we didn't record earlier exist here.
            self.assertListEqual(backfill_event_ids, ["b6", "2", "b1"])
    
        def test_get_backfill_points_in_room_attempted_event_retry_after_backoff_duration(
            self,
        ):
            """
            Check that an event with failed pull attempts becomes a backfill point
            again once its backoff period has elapsed.

            "b3" gets a single failed attempt and "b1" gets four, so "b3" should
            reappear after a short wait while "b1" only reappears after a much
            longer one.
            """
            room_id = self._setup_room_for_backfill_tests().room_id

            # One failed attempt for "b3"...
            self.get_success(
                self.store.record_event_failed_pull_attempt(room_id, "b3", "fake cause")
            )
            # ...and four failed attempts for "b1", which should give it a much
            # longer backoff.
            for _ in range(4):
                self.get_success(
                    self.store.record_event_failed_pull_attempt(
                        room_id, "b1", "fake cause"
                    )
                )

            # After 2 hours the single attempt on "b3" has backed off for long
            # enough (2^1 hours), so "b3" is visible again. "b1" has too many
            # attempts for that wait to be sufficient. Events we never recorded an
            # attempt for (e.g. "b2") are visible regardless.
            short_wait = datetime.timedelta(hours=2)
            self.reactor.advance(short_wait.total_seconds())

            points = self.get_success(self.store.get_backfill_points_in_room(room_id))
            event_ids_after_short_wait = [point[0] for point in points]
            # "b1" must still be missing at this stage.
            self.assertListEqual(
                event_ids_after_short_wait, ["b6", "b5", "b4", "2", "b3", "b2"]
            )

            # Advance a further 20 hours, which takes us past 2^4 hours (we made 4
            # attempts on "b1"), and check that it can be backfilled again.
            long_wait = datetime.timedelta(hours=20)
            self.reactor.advance(long_wait.total_seconds())

            # With enough time elapsed, "b1" should be back in the list.
            points = self.get_success(self.store.get_backfill_points_in_room(room_id))
            event_ids_after_long_wait = [point[0] for point in points]
            self.assertListEqual(
                event_ids_after_long_wait, ["b6", "b5", "b4", "2", "b3", "b2", "b1"]
            )
    
        def _setup_room_for_insertion_backfill_tests(self) -> _BackfillSetupInfo:
            """
            Creates a room containing a mix of regular events and insertion events,
            with every insertion event also registered in the
            `insertion_event_extremities` table, for backfill functions to be
            tested against.

            Returns:
                _BackfillSetupInfo carrying the `room_id` of the room and the
                `depth_map` (event ID -> depth) of the events inserted into it
            """
            room_id = "!backfill-room-test:some-host"
            insertion_prefix = "insertion_event"

            # Events are inserted in this order, which also determines their
            # `stream_ordering` (0, 1, 2, ...).
            depth_map: Dict[str, int] = {
                "1": 1,
                "2": 2,
                "insertion_eventA": 3,
                "3": 4,
                "insertion_eventB": 5,
                "4": 6,
                "5": 7,
            }

            def insert_rows(txn: LoggingTransaction) -> None:
                insert = self.store.db_pool.simple_insert_txn

                # The room row has to exist to satisfy the foreign key constraint
                # of `event_failed_pull_attempts`.
                insert(
                    txn,
                    table="rooms",
                    values={
                        "room_id": room_id,
                        "creator": "room_creator_user_id",
                        "is_public": True,
                        "room_version": "6",
                    },
                )

                # Add each event to the room, one stream position per event.
                for stream_ordering, (event_id, depth) in enumerate(depth_map.items()):
                    is_insertion = event_id.startswith(insertion_prefix)
                    if is_insertion:
                        event_type = EventTypes.MSC2716_INSERTION
                    else:
                        event_type = "test_regular_type"

                    insert(
                        txn,
                        table="events",
                        values={
                            "event_id": event_id,
                            "type": event_type,
                            "room_id": room_id,
                            "depth": depth,
                            "topological_ordering": depth,
                            "stream_ordering": stream_ordering,
                            "processed": True,
                            "outlier": False,
                        },
                    )

                    # Insertion events are additionally recorded as insertion
                    # event extremities.
                    if is_insertion:
                        insert(
                            txn,
                            table="insertion_event_extremities",
                            values={
                                "event_id": event_id,
                                "room_id": room_id,
                            },
                        )

            self.get_success(
                self.store.db_pool.runInteraction(
                    "_setup_room_for_insertion_backfill_tests_populate_db",
                    insert_rows,
                )
            )

            return _BackfillSetupInfo(room_id=room_id, depth_map=depth_map)
    
        def test_get_insertion_event_backward_extremities_in_room(self):
            """
            Check that `get_insertion_event_backward_extremities_in_room` reports
            both insertion events of the prepared test room, in the expected order.
            """
            room_id = self._setup_room_for_insertion_backfill_tests().room_id
            expected_event_ids = ["insertion_eventB", "insertion_eventA"]

            extremities = self.get_success(
                self.store.get_insertion_event_backward_extremities_in_room(room_id)
            )

            # Only the first element of each returned entry (the event ID) is
            # compared.
            returned_event_ids = [extremity[0] for extremity in extremities]
            self.assertListEqual(returned_event_ids, expected_event_ids)
    
        def test_get_insertion_event_backward_extremities_in_room_excludes_events_we_have_attempted(
            self,
        ):
            """
            Check that an insertion event with a recorded failed pull attempt is
            left out of the insertion event backward extremities while we are
            still inside its backoff period.
            """
            room_id = self._setup_room_for_insertion_backfill_tests().room_id
            attempted_event_id = "insertion_eventA"

            # Record a failed pull attempt for this event. Because the backoff
            # interval has not elapsed,
            # `get_insertion_event_backward_extremities_in_room` should skip it.
            self.get_success(
                self.store.record_event_failed_pull_attempt(
                    room_id, attempted_event_id, "fake cause"
                )
            )

            # The clock is deliberately left untouched: no time has passed since
            # the failed attempt above.
            extremities = self.get_success(
                self.store.get_insertion_event_backward_extremities_in_room(room_id)
            )
            remaining_event_ids = [extremity[0] for extremity in extremities]

            # Only the insertion event we did not record an attempt for is left.
            self.assertListEqual(remaining_event_ids, ["insertion_eventB"])
    
        def test_get_insertion_event_backward_extremities_in_room_attempted_event_retry_after_backoff_duration(
            self,
        ):
            """
            Test to make sure after we fake attempt to backfill event
            "insertion_eventA" many times, we can see retry and see the
            "insertion_eventA" again after the backoff timeout duration has
            exceeded.
            """
            setup_info = self._setup_room_for_insertion_backfill_tests()
            room_id = setup_info.room_id
    
            # Record some attempts to backfill these events which will make
            # `get_backfill_points_in_room` exclude them because we
            # haven't passed the backoff interval.
            self.get_success(
                self.store.record_event_failed_pull_attempt(
                    room_id, "insertion_eventB", "fake cause"