diff --git a/changelog.d/18075.bugfix b/changelog.d/18075.bugfix
new file mode 100644
index 0000000000000000000000000000000000000000..95b486bed11a5f7390be8afc8503642ce00e7c52
--- /dev/null
+++ b/changelog.d/18075.bugfix
@@ -0,0 +1 @@
+Fix join being denied after being invited over federation. Also fixes other out-of-band membership transitions.
diff --git a/synapse/event_auth.py b/synapse/event_auth.py
index c208b900c53e3b31ebc727c540db7f54fb96390a..3fe344ac9374d1e7f591103b8353b5476550be51 100644
--- a/synapse/event_auth.py
+++ b/synapse/event_auth.py
@@ -566,6 +566,7 @@ def _is_membership_change_allowed(
     logger.debug(
         "_is_membership_change_allowed: %s",
         {
+            "caller_membership": caller.membership if caller else None,
             "caller_in_room": caller_in_room,
             "caller_invited": caller_invited,
             "caller_knocked": caller_knocked,
@@ -677,7 +678,8 @@ def _is_membership_change_allowed(
                 and join_rule == JoinRules.KNOCK_RESTRICTED
             )
         ):
-            if not caller_in_room and not caller_invited:
+            # You can only join the room if you are invited or are already in the room.
+            if not (caller_in_room or caller_invited):
                 raise AuthError(403, "You are not invited to this room.")
         else:
             # TODO (erikj): may_join list
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index 2e56b671f06fc57fb87ef52180df2deea2c56c3a..8e9d27138cae8d8964852c0d14d6ef11b2ad0cfa 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -42,7 +42,7 @@ import attr
 from typing_extensions import Literal
 from unpaddedbase64 import encode_base64
 
-from synapse.api.constants import RelationTypes
+from synapse.api.constants import EventTypes, RelationTypes
 from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions
 from synapse.synapse_rust.events import EventInternalMetadata
 from synapse.types import JsonDict, StrCollection
@@ -325,12 +325,17 @@ class EventBase(metaclass=abc.ABCMeta):
     def __repr__(self) -> str:
         rejection = f"REJECTED={self.rejected_reason}, " if self.rejected_reason else ""
 
+        conditional_membership_string = ""
+        if self.get("type") == EventTypes.Member:
+            conditional_membership_string = f"membership={self.membership}, "
+
         return (
             f"<{self.__class__.__name__} "
             f"{rejection}"
             f"event_id={self.event_id}, "
             f"type={self.get('type')}, "
             f"state_key={self.get('state_key')}, "
+            f"{conditional_membership_string}"
             f"outlier={self.internal_metadata.is_outlier()}"
             ">"
         )
diff --git a/synapse/events/builder.py b/synapse/events/builder.py
index 10ef01131b48eee8c9b593f24c426aeee4870f02..76df083d6913758a56c295a17854e8a1a12e728a 100644
--- a/synapse/events/builder.py
+++ b/synapse/events/builder.py
@@ -24,7 +24,7 @@ from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
 import attr
 from signedjson.types import SigningKey
 
-from synapse.api.constants import MAX_DEPTH
+from synapse.api.constants import MAX_DEPTH, EventTypes
 from synapse.api.room_versions import (
     KNOWN_EVENT_FORMAT_VERSIONS,
     EventFormatVersions,
@@ -109,6 +109,19 @@ class EventBuilder:
     def is_state(self) -> bool:
         return self._state_key is not None
 
+    def is_mine_id(self, user_id: str) -> bool:
+        """Determines whether a user ID or room alias originates from this homeserver.
+
+        Returns:
+            `True` if the hostname part of the user ID or room alias matches this
+            homeserver.
+            `False` otherwise, or if the user ID or room alias is malformed.
+        """
+        localpart_hostname = user_id.split(":", 1)
+        if len(localpart_hostname) < 2:
+            return False
+        return localpart_hostname[1] == self._hostname
+
     async def build(
         self,
         prev_event_ids: List[str],
@@ -142,6 +155,46 @@ class EventBuilder:
                 self, state_ids
             )
 
+            # Check for out-of-band membership that may have been exposed on `/sync` but
+            # the events have not been de-outliered yet so they won't be part of the
+            # room state yet.
+            #
+            # This helps in situations where a remote homeserver invites a local user to
+            # a room that we're already participating in; and we've persisted the invite
+            # as an out-of-band membership (outlier), but it hasn't been pushed to us as
+            # part of a `/send` transaction yet and de-outliered. This also helps for
+            # any of the other out-of-band membership transitions.
+            #
+            # As an optimization, we could check if the room state already includes a
+            # non-`leave` membership event, then we can assume the membership event has
+            # been de-outliered and we don't need to check for an out-of-band
+            # membership. But we don't have the necessary information from a
+            # `StateMap[str]` and we'll just have to take the hit of this extra lookup
+            # for any membership event for now.
+            if self.type == EventTypes.Member and self.is_mine_id(self.state_key):
+                (
+                    _membership,
+                    member_event_id,
+                ) = await self._store.get_local_current_membership_for_user_in_room(
+                    user_id=self.state_key,
+                    room_id=self.room_id,
+                )
+                # There is no need to check if the membership is actually an
+                # out-of-band membership (`outlier`) as we would end up with the
+                # same result either way (adding the member event to the
+                # `auth_event_ids`).
+                if (
+                    member_event_id is not None
+                    # We only need to be careful about duplicating the event in the
+                    # `auth_event_ids` list (duplicate `type`/`state_key` is part of the
+                    # authorization rules)
+                    and member_event_id not in auth_event_ids
+                ):
+                    auth_event_ids.append(member_event_id)
+                    # Also make sure to point to the previous membership event that will
+                    # allow this one to happen so the computed state works out.
+                    prev_event_ids.append(member_event_id)
+
         format_version = self.room_version.event_format
         # The types of auth/prev events changes between event versions.
         prev_events: Union[StrCollection, List[Tuple[str, Dict[str, str]]]]
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index c85deaed56216737e068c1c09abfe15cdbc05bcd..1b535ea2cb7fa8a1a7314261af17f05d397c49eb 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -2272,8 +2272,9 @@ class FederationEventHandler:
                 event_and_contexts, backfilled=backfilled
             )
 
-            # After persistence we always need to notify replication there may
-            # be new data.
+            # After persistence, we never notify clients (wake up `/sync` streams) about
+            # backfilled events but it's important to let all the workers know about any
+            # new event (backfilled or not) because TODO
             self._notifier.notify_replication()
 
             if self._ephemeral_messages_enabled:
diff --git a/synapse/server.py b/synapse/server.py
index 462e15cc2ff62911761dd9389cfe4204d8843b6f..bd2faa61b948085129280c0b1e5638551f0b8619 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -391,7 +391,7 @@ class HomeServer(metaclass=abc.ABCMeta):
     def is_mine(self, domain_specific_string: DomainSpecificString) -> bool:
         return domain_specific_string.domain == self.hostname
 
-    def is_mine_id(self, string: str) -> bool:
+    def is_mine_id(self, user_id: str) -> bool:
         """Determines whether a user ID or room alias originates from this homeserver.
 
         Returns:
@@ -399,7 +399,7 @@ class HomeServer(metaclass=abc.ABCMeta):
             homeserver.
             `False` otherwise, or if the user ID or room alias is malformed.
         """
-        localpart_hostname = string.split(":", 1)
+        localpart_hostname = user_id.split(":", 1)
         if len(localpart_hostname) < 2:
             return False
         return localpart_hostname[1] == self.hostname
diff --git a/tests/federation/test_federation_devices.py b/tests/federation/test_federation_devices.py
new file mode 100644
index 0000000000000000000000000000000000000000..ba27e6947980151ad290a1d23d60b2645ce25025
--- /dev/null
+++ b/tests/federation/test_federation_devices.py
@@ -0,0 +1,161 @@
+#
+# This file is licensed under the Affero General Public License (AGPL) version 3.
+#
+# Copyright (C) 2024 New Vector, Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# See the GNU Affero General Public License for more details:
+# <https://www.gnu.org/licenses/agpl-3.0.html>.
+#
+# Originally licensed under the Apache License, Version 2.0:
+# <http://www.apache.org/licenses/LICENSE-2.0>.
+#
+# [This file includes modifications made by New Vector Limited]
+#
+#
+
+import logging
+from unittest.mock import AsyncMock, Mock
+
+from twisted.test.proto_helpers import MemoryReactor
+
+from synapse.handlers.device import DeviceListUpdater
+from synapse.server import HomeServer
+from synapse.types import JsonDict
+from synapse.util import Clock
+from synapse.util.retryutils import NotRetryingDestination
+
+from tests import unittest
+
+logger = logging.getLogger(__name__)
+
+
+class DeviceListResyncTestCase(unittest.HomeserverTestCase):
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.store = self.hs.get_datastores().main
+
+    def test_retry_device_list_resync(self) -> None:
+        """Tests that device lists are marked as stale if they couldn't be synced, and
+        that stale device lists are retried periodically.
+        """
+        remote_user_id = "@john:test_remote"
+        remote_origin = "test_remote"
+
+        # Track the number of attempts to resync the user's device list.
+        self.resync_attempts = 0
+
+        # When this function is called, increment the number of resync attempts (only if
+        # we're querying devices for the right user ID), then raise a
+        # NotRetryingDestination error to fail the resync gracefully.
+        def query_user_devices(
+            destination: str, user_id: str, timeout: int = 30000
+        ) -> JsonDict:
+            if user_id == remote_user_id:
+                self.resync_attempts += 1
+
+            raise NotRetryingDestination(0, 0, destination)
+
+        # Register the mock on the federation client.
+        federation_client = self.hs.get_federation_client()
+        federation_client.query_user_devices = Mock(side_effect=query_user_devices)  # type: ignore[method-assign]
+
+        # Register a mock on the store so that the incoming update doesn't fail because
+        # we don't share a room with the user.
+        self.store.get_rooms_for_user = AsyncMock(return_value=["!someroom:test"])
+
+        # Manually inject a fake device list update. We need this update to include at
+        # least one prev_id so that the user's device list will need to be retried.
+        device_list_updater = self.hs.get_device_handler().device_list_updater
+        assert isinstance(device_list_updater, DeviceListUpdater)
+        self.get_success(
+            device_list_updater.incoming_device_list_update(
+                origin=remote_origin,
+                edu_content={
+                    "deleted": False,
+                    "device_display_name": "Mobile",
+                    "device_id": "QBUAZIFURK",
+                    "prev_id": [5],
+                    "stream_id": 6,
+                    "user_id": remote_user_id,
+                },
+            )
+        )
+
+        # Check that there was one resync attempt.
+        self.assertEqual(self.resync_attempts, 1)
+
+        # Check that the resync attempt failed and caused the user's device list to be
+        # marked as stale.
+        need_resync = self.get_success(
+            self.store.get_user_ids_requiring_device_list_resync()
+        )
+        self.assertIn(remote_user_id, need_resync)
+
+        # Check that waiting for 30 seconds caused Synapse to retry resyncing the device
+        # list.
+        self.reactor.advance(30)
+        self.assertEqual(self.resync_attempts, 2)
+
+    def test_cross_signing_keys_retry(self) -> None:
+        """Tests that resyncing a device list correctly processes cross-signing keys from
+        the remote server.
+        """
+        remote_user_id = "@john:test_remote"
+        remote_master_key = "85T7JXPFBAySB/jwby4S3lBPTqY3+Zg53nYuGmu1ggY"
+        remote_self_signing_key = "QeIiFEjluPBtI7WQdG365QKZcFs9kqmHir6RBD0//nQ"
+
+        # Register mock device list retrieval on the federation client.
+        federation_client = self.hs.get_federation_client()
+        federation_client.query_user_devices = AsyncMock(  # type: ignore[method-assign]
+            return_value={
+                "user_id": remote_user_id,
+                "stream_id": 1,
+                "devices": [],
+                "master_key": {
+                    "user_id": remote_user_id,
+                    "usage": ["master"],
+                    "keys": {"ed25519:" + remote_master_key: remote_master_key},
+                },
+                "self_signing_key": {
+                    "user_id": remote_user_id,
+                    "usage": ["self_signing"],
+                    "keys": {
+                        "ed25519:" + remote_self_signing_key: remote_self_signing_key
+                    },
+                },
+            }
+        )
+
+        # Resync the device list.
+        device_handler = self.hs.get_device_handler()
+        self.get_success(
+            device_handler.device_list_updater.multi_user_device_resync(
+                [remote_user_id]
+            ),
+        )
+
+        # Retrieve the cross-signing keys for this user.
+        keys = self.get_success(
+            self.store.get_e2e_cross_signing_keys_bulk(user_ids=[remote_user_id]),
+        )
+        self.assertIn(remote_user_id, keys)
+        key = keys[remote_user_id]
+        assert key is not None
+
+        # Check that the master key is the one returned by the mock.
+        master_key = key["master"]
+        self.assertEqual(len(master_key["keys"]), 1)
+        self.assertTrue("ed25519:" + remote_master_key in master_key["keys"].keys())
+        self.assertTrue(remote_master_key in master_key["keys"].values())
+
+        # Check that the self-signing key is the one returned by the mock.
+        self_signing_key = key["self_signing"]
+        self.assertEqual(len(self_signing_key["keys"]), 1)
+        self.assertTrue(
+            "ed25519:" + remote_self_signing_key in self_signing_key["keys"].keys(),
+        )
+        self.assertTrue(remote_self_signing_key in self_signing_key["keys"].values())
diff --git a/tests/federation/test_federation_out_of_band_membership.py b/tests/federation/test_federation_out_of_band_membership.py
new file mode 100644
index 0000000000000000000000000000000000000000..a4a266cf06d009988458eb4c7746983ea4e0f33e
--- /dev/null
+++ b/tests/federation/test_federation_out_of_band_membership.py
@@ -0,0 +1,671 @@
+#
+# This file is licensed under the Affero General Public License (AGPL) version 3.
+#
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+# Copyright (C) 2023 New Vector, Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# See the GNU Affero General Public License for more details:
+# <https://www.gnu.org/licenses/agpl-3.0.html>.
+#
+# Originally licensed under the Apache License, Version 2.0:
+# <http://www.apache.org/licenses/LICENSE-2.0>.
+#
+# [This file includes modifications made by New Vector Limited]
+#
+#
+
+import logging
+import time
+import urllib.parse
+from http import HTTPStatus
+from typing import Any, Callable, Optional, Set, Tuple, TypeVar, Union
+from unittest.mock import Mock
+
+import attr
+from parameterized import parameterized
+
+from twisted.test.proto_helpers import MemoryReactor
+
+from synapse.api.constants import EventContentFields, EventTypes, Membership
+from synapse.api.room_versions import RoomVersion, RoomVersions
+from synapse.events import EventBase, make_event_from_dict
+from synapse.events.utils import strip_event
+from synapse.federation.federation_base import (
+    event_from_pdu_json,
+)
+from synapse.federation.transport.client import SendJoinResponse
+from synapse.http.matrixfederationclient import (
+    ByteParser,
+)
+from synapse.http.types import QueryParams
+from synapse.rest import admin
+from synapse.rest.client import login, room, sync
+from synapse.server import HomeServer
+from synapse.types import JsonDict, MutableStateMap, StateMap
+from synapse.types.handlers.sliding_sync import (
+    StateValues,
+)
+from synapse.util import Clock
+
+from tests import unittest
+from tests.utils import test_timeout
+
+logger = logging.getLogger(__name__)
+
+
+def required_state_json_to_state_map(required_state: Any) -> StateMap[EventBase]:
+    state_map: MutableStateMap[EventBase] = {}
+
+    # Scrutinize JSON values to ensure it's in the expected format
+    if isinstance(required_state, list):
+        for state_event_dict in required_state:
+            # Yell because we're in a test and this is unexpected
+            assert isinstance(
+                state_event_dict, dict
+            ), "`required_state` should be a list of event dicts"
+
+            event_type = state_event_dict["type"]
+            event_state_key = state_event_dict["state_key"]
+
+            # Yell because we're in a test and this is unexpected
+            assert isinstance(
+                event_type, str
+            ), "Each event in `required_state` should have a string `type`"
+            assert isinstance(
+                event_state_key, str
+            ), "Each event in `required_state` should have a string `state_key`"
+
+            state_map[(event_type, event_state_key)] = make_event_from_dict(
+                state_event_dict
+            )
+    else:
+        # Yell because we're in a test and this is unexpected
+        raise AssertionError("`required_state` should be a list of event dicts")
+
+    return state_map
+
+
+@attr.s(slots=True, auto_attribs=True)
+class RemoteRoomJoinResult:
+    remote_room_id: str
+    room_version: RoomVersion
+    remote_room_creator_user_id: str
+    local_user1_id: str
+    local_user1_tok: str
+    state_map: StateMap[EventBase]
+
+
+class OutOfBandMembershipTests(unittest.FederatingHomeserverTestCase):
+    """
+    Tests to make sure that interactions with out-of-band membership (outliers) works as
+    expected.
+
+     - invites received over federation, before we join the room
+     - *rejections* for said invites
+
+    See the "Out-of-band membership events" section in
+    `docs/development/room-dag-concepts.md` for more information.
+    """
+
+    servlets = [
+        admin.register_servlets,
+        room.register_servlets,
+        login.register_servlets,
+        sync.register_servlets,
+    ]
+
+    sync_endpoint = "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
+
+    def default_config(self) -> JsonDict:
+        conf = super().default_config()
+        # Federation sending is disabled by default in the test environment
+        # so we need to enable it like this.
+        conf["federation_sender_instances"] = ["master"]
+
+        return conf
+
+    def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
+        self.federation_http_client = Mock(
+            # The problem with using `spec=MatrixFederationHttpClient` here is that it
+            # requires everything to be mocked which is a lot of work that I don't want
+            # to do when the code only uses a few methods (`get_json` and `put_json`).
+        )
+        return self.setup_test_homeserver(
+            federation_http_client=self.federation_http_client
+        )
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        super().prepare(reactor, clock, hs)
+
+        self.store = self.hs.get_datastores().main
+        self.storage_controllers = hs.get_storage_controllers()
+
+    def do_sync(
+        self, sync_body: JsonDict, *, since: Optional[str] = None, tok: str
+    ) -> Tuple[JsonDict, str]:
+        """Do a sliding sync request with given body.
+
+        Asserts the request was successful.
+
+        Attributes:
+            sync_body: The full request body to use
+            since: Optional since token
+            tok: Access token to use
+
+        Returns:
+            A tuple of the response body and the `pos` field.
+        """
+
+        sync_path = self.sync_endpoint
+        if since:
+            sync_path += f"?pos={since}"
+
+        channel = self.make_request(
+            method="POST",
+            path=sync_path,
+            content=sync_body,
+            access_token=tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        return channel.json_body, channel.json_body["pos"]
+
+    def _invite_local_user_to_remote_room_and_join(self) -> RemoteRoomJoinResult:
+        """
+        Helper to reproduce this scenario:
+
+         1. The remote user invites our local user to a room on their remote server (which
+        creates an out-of-band invite membership for user1 on our local server).
+         2. The local user notices the invite from `/sync`.
+         3. The local user joins the room.
+         4. The local user can see that they are now joined to the room from `/sync`.
+        """
+
+        # Create a local user
+        local_user1_id = self.register_user("user1", "pass")
+        local_user1_tok = self.login(local_user1_id, "pass")
+
+        # Create a remote room
+        room_creator_user_id = f"@remote-user:{self.OTHER_SERVER_NAME}"
+        remote_room_id = f"!remote-room:{self.OTHER_SERVER_NAME}"
+        room_version = RoomVersions.V10
+
+        room_create_event = make_event_from_dict(
+            self.add_hashes_and_signatures_from_other_server(
+                {
+                    "room_id": remote_room_id,
+                    "sender": room_creator_user_id,
+                    "depth": 1,
+                    "origin_server_ts": 1,
+                    "type": EventTypes.Create,
+                    "state_key": "",
+                    "content": {
+                        # The `ROOM_CREATOR` field could be removed if we used a room
+                        # version > 10 (in favor of relying on `sender`)
+                        EventContentFields.ROOM_CREATOR: room_creator_user_id,
+                        EventContentFields.ROOM_VERSION: room_version.identifier,
+                    },
+                    "auth_events": [],
+                    "prev_events": [],
+                }
+            ),
+            room_version=room_version,
+        )
+
+        creator_membership_event = make_event_from_dict(
+            self.add_hashes_and_signatures_from_other_server(
+                {
+                    "room_id": remote_room_id,
+                    "sender": room_creator_user_id,
+                    "depth": 2,
+                    "origin_server_ts": 2,
+                    "type": EventTypes.Member,
+                    "state_key": room_creator_user_id,
+                    "content": {"membership": Membership.JOIN},
+                    "auth_events": [room_create_event.event_id],
+                    "prev_events": [room_create_event.event_id],
+                }
+            ),
+            room_version=room_version,
+        )
+
+        # From the remote homeserver, invite user1 on the local homserver
+        user1_invite_membership_event = make_event_from_dict(
+            self.add_hashes_and_signatures_from_other_server(
+                {
+                    "room_id": remote_room_id,
+                    "sender": room_creator_user_id,
+                    "depth": 3,
+                    "origin_server_ts": 3,
+                    "type": EventTypes.Member,
+                    "state_key": local_user1_id,
+                    "content": {"membership": Membership.INVITE},
+                    "auth_events": [
+                        room_create_event.event_id,
+                        creator_membership_event.event_id,
+                    ],
+                    "prev_events": [creator_membership_event.event_id],
+                }
+            ),
+            room_version=room_version,
+        )
+        channel = self.make_signed_federation_request(
+            "PUT",
+            f"/_matrix/federation/v2/invite/{remote_room_id}/{user1_invite_membership_event.event_id}",
+            content={
+                "event": user1_invite_membership_event.get_dict(),
+                "invite_room_state": [
+                    strip_event(room_create_event),
+                ],
+                "room_version": room_version.identifier,
+            },
+        )
+        self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body)
+
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [(EventTypes.Member, StateValues.WILDCARD)],
+                    "timeline_limit": 0,
+                }
+            }
+        }
+
+        # Sync until the local user1 can see the invite
+        with test_timeout(
+            3,
+            "Unable to find user1's invite event in the room",
+        ):
+            while True:
+                response_body, _ = self.do_sync(sync_body, tok=local_user1_tok)
+                if (
+                    remote_room_id in response_body["rooms"].keys()
+                    # If they have `invite_state` for the room, they are invited
+                    and len(
+                        response_body["rooms"][remote_room_id].get("invite_state", [])
+                    )
+                    > 0
+                ):
+                    break
+
+                # Prevent tight-looping to allow the `test_timeout` to work
+                time.sleep(0.1)
+
+        user1_join_membership_event_template = make_event_from_dict(
+            {
+                "room_id": remote_room_id,
+                "sender": local_user1_id,
+                "depth": 4,
+                "origin_server_ts": 4,
+                "type": EventTypes.Member,
+                "state_key": local_user1_id,
+                "content": {"membership": Membership.JOIN},
+                "auth_events": [
+                    room_create_event.event_id,
+                    user1_invite_membership_event.event_id,
+                ],
+                "prev_events": [user1_invite_membership_event.event_id],
+            },
+            room_version=room_version,
+        )
+
+        T = TypeVar("T")
+
+        # Mock the remote homeserver responding to our HTTP requests
+        #
+        # We're going to mock the following endpoints so that user1 can join the remote room:
+        # - GET /_matrix/federation/v1/make_join/{room_id}/{user_id}
+        # - PUT /_matrix/federation/v2/send_join/{room_id}/{user_id}
+        #
+        async def get_json(
+            destination: str,
+            path: str,
+            args: Optional[QueryParams] = None,
+            retry_on_dns_fail: bool = True,
+            timeout: Optional[int] = None,
+            ignore_backoff: bool = False,
+            try_trailing_slash_on_400: bool = False,
+            parser: Optional[ByteParser[T]] = None,
+        ) -> Union[JsonDict, T]:
+            if (
+                path
+                == f"/_matrix/federation/v1/make_join/{urllib.parse.quote_plus(remote_room_id)}/{urllib.parse.quote_plus(local_user1_id)}"
+            ):
+                return {
+                    "event": user1_join_membership_event_template.get_pdu_json(),
+                    "room_version": room_version.identifier,
+                }
+
+            raise NotImplementedError(
+                "We have not mocked a response for `get_json(...)` for the following endpoint yet: "
+                + f"{destination}{path}"
+            )
+
+        self.federation_http_client.get_json.side_effect = get_json
+
+        # PDU's that hs1 sent to hs2
+        collected_pdus_from_hs1_federation_send: Set[str] = set()
+
+        async def put_json(
+            destination: str,
+            path: str,
+            args: Optional[QueryParams] = None,
+            data: Optional[JsonDict] = None,
+            json_data_callback: Optional[Callable[[], JsonDict]] = None,
+            long_retries: bool = False,
+            timeout: Optional[int] = None,
+            ignore_backoff: bool = False,
+            backoff_on_404: bool = False,
+            try_trailing_slash_on_400: bool = False,
+            parser: Optional[ByteParser[T]] = None,
+            backoff_on_all_error_codes: bool = False,
+        ) -> Union[JsonDict, T, SendJoinResponse]:
+            if (
+                path.startswith(
+                    f"/_matrix/federation/v2/send_join/{urllib.parse.quote_plus(remote_room_id)}/"
+                )
+                and data is not None
+                and data.get("type") == EventTypes.Member
+                and data.get("state_key") == local_user1_id
+                # We're assuming this is a `ByteParser[SendJoinResponse]`
+                and parser is not None
+            ):
+                # As the remote server, we need to sign the event before sending it back
+                user1_join_membership_event_signed = make_event_from_dict(
+                    self.add_hashes_and_signatures_from_other_server(data),
+                    room_version=room_version,
+                )
+
+                # Since they passed in a `parser`, we need to return the type that
+                # they're expecting instead of just a `JsonDict`
+                return SendJoinResponse(
+                    auth_events=[
+                        room_create_event,
+                        user1_invite_membership_event,
+                    ],
+                    state=[
+                        room_create_event,
+                        creator_membership_event,
+                        user1_invite_membership_event,
+                    ],
+                    event_dict=user1_join_membership_event_signed.get_pdu_json(),
+                    event=user1_join_membership_event_signed,
+                    members_omitted=False,
+                    servers_in_room=[
+                        self.OTHER_SERVER_NAME,
+                    ],
+                )
+
+            if path.startswith("/_matrix/federation/v1/send/") and data is not None:
+                for pdu in data.get("pdus", []):
+                    event = event_from_pdu_json(pdu, room_version)
+                    collected_pdus_from_hs1_federation_send.add(event.event_id)
+
+                # Just acknowledge everything hs1 is trying to send hs2
+                return {
+                    event_from_pdu_json(pdu, room_version).event_id: {}
+                    for pdu in data.get("pdus", [])
+                }
+
+            raise NotImplementedError(
+                "We have not mocked a response for `put_json(...)` for the following endpoint yet: "
+                + f"{destination}{path} with the following body data: {data}"
+            )
+
+        self.federation_http_client.put_json.side_effect = put_json
+
+        # User1 joins the room
+        self.helper.join(remote_room_id, local_user1_id, tok=local_user1_tok)
+
+        # Reset the mocks now that user1 has joined the room
+        self.federation_http_client.get_json.side_effect = None
+        self.federation_http_client.put_json.side_effect = None
+
+        # Sync until the local user1 can see that they are now joined to the room
+        with test_timeout(
+            3,
+            "Unable to find user1's join event in the room",
+        ):
+            while True:
+                response_body, _ = self.do_sync(sync_body, tok=local_user1_tok)
+                if remote_room_id in response_body["rooms"].keys():
+                    required_state_map = required_state_json_to_state_map(
+                        response_body["rooms"][remote_room_id]["required_state"]
+                    )
+                    if (
+                        required_state_map.get((EventTypes.Member, local_user1_id))
+                        is not None
+                    ):
+                        break
+
+                # Prevent tight-looping to allow the `test_timeout` to work
+                time.sleep(0.1)
+
+        # Nothing needs to be sent from hs1 to hs2 since we already let the other
+        # homeserver know by doing the `/make_join` and `/send_join` dance.
+        self.assertIncludes(
+            collected_pdus_from_hs1_federation_send,
+            set(),
+            exact=True,
+            message="Didn't expect any events to be sent from hs1 over federation to hs2",
+        )
+
+        return RemoteRoomJoinResult(
+            remote_room_id=remote_room_id,
+            room_version=room_version,
+            remote_room_creator_user_id=room_creator_user_id,
+            local_user1_id=local_user1_id,
+            local_user1_tok=local_user1_tok,
+            state_map=self.get_success(
+                self.storage_controllers.state.get_current_state(remote_room_id)
+            ),
+        )
+
+    def test_can_join_from_out_of_band_invite(self) -> None:
+        """
+        Test to make sure that we can join a room that we were invited to over
+        federation; even if our server has never participated in the room before.
+        """
+        self._invite_local_user_to_remote_room_and_join()
+
+    @parameterized.expand(
+        [("accept invite", Membership.JOIN), ("reject invite", Membership.LEAVE)]
+    )
+    def test_can_x_from_out_of_band_invite_after_we_are_already_participating_in_the_room(
+        self, _test_description: str, membership_action: str
+    ) -> None:
+        """
+        Test to make sure that we can do either a) join the room (accept the invite) or
+        b) reject the invite after being invited to over federation; even if we are
+        already participating in the room.
+
+        This is a regression test to make sure we stress the scenario where even though
+        we are already participating in the room, local users can still react to invites
+        regardless of whether the remote server has told us about the invite event (via
+        a federation `/send` transaction) and we have de-outliered the invite event.
+        Previously, we would mistakenly throw an error saying the user wasn't in the
+        room when they tried to join or reject the invite.
+        """
+        remote_room_join_result = self._invite_local_user_to_remote_room_and_join()
+        remote_room_id = remote_room_join_result.remote_room_id
+        room_version = remote_room_join_result.room_version
+
+        # Create another local user
+        local_user2_id = self.register_user("user2", "pass")
+        local_user2_tok = self.login(local_user2_id, "pass")
+
+        T = TypeVar("T")
+
+        # PDU's that hs1 sent to hs2
+        collected_pdus_from_hs1_federation_send: Set[str] = set()
+
+        async def put_json(
+            destination: str,
+            path: str,
+            args: Optional[QueryParams] = None,
+            data: Optional[JsonDict] = None,
+            json_data_callback: Optional[Callable[[], JsonDict]] = None,
+            long_retries: bool = False,
+            timeout: Optional[int] = None,
+            ignore_backoff: bool = False,
+            backoff_on_404: bool = False,
+            try_trailing_slash_on_400: bool = False,
+            parser: Optional[ByteParser[T]] = None,
+            backoff_on_all_error_codes: bool = False,
+        ) -> Union[JsonDict, T]:
+            if path.startswith("/_matrix/federation/v1/send/") and data is not None:
+                for pdu in data.get("pdus", []):
+                    event = event_from_pdu_json(pdu, room_version)
+                    collected_pdus_from_hs1_federation_send.add(event.event_id)
+
+                # Just acknowledge everything hs1 is trying to send hs2
+                return {
+                    event_from_pdu_json(pdu, room_version).event_id: {}
+                    for pdu in data.get("pdus", [])
+                }
+
+            raise NotImplementedError(
+                "We have not mocked a response for `put_json(...)` for the following endpoint yet: "
+                + f"{destination}{path} with the following body data: {data}"
+            )
+
+        self.federation_http_client.put_json.side_effect = put_json
+
+        # From the remote homeserver, invite user2 on the local homserver
+        user2_invite_membership_event = make_event_from_dict(
+            self.add_hashes_and_signatures_from_other_server(
+                {
+                    "room_id": remote_room_id,
+                    "sender": remote_room_join_result.remote_room_creator_user_id,
+                    "depth": 5,
+                    "origin_server_ts": 5,
+                    "type": EventTypes.Member,
+                    "state_key": local_user2_id,
+                    "content": {"membership": Membership.INVITE},
+                    "auth_events": [
+                        remote_room_join_result.state_map[
+                            (EventTypes.Create, "")
+                        ].event_id,
+                        remote_room_join_result.state_map[
+                            (
+                                EventTypes.Member,
+                                remote_room_join_result.remote_room_creator_user_id,
+                            )
+                        ].event_id,
+                    ],
+                    "prev_events": [
+                        remote_room_join_result.state_map[
+                            (EventTypes.Member, remote_room_join_result.local_user1_id)
+                        ].event_id
+                    ],
+                }
+            ),
+            room_version=room_version,
+        )
+        channel = self.make_signed_federation_request(
+            "PUT",
+            f"/_matrix/federation/v2/invite/{remote_room_id}/{user2_invite_membership_event.event_id}",
+            content={
+                "event": user2_invite_membership_event.get_dict(),
+                "invite_room_state": [
+                    strip_event(
+                        remote_room_join_result.state_map[(EventTypes.Create, "")]
+                    ),
+                ],
+                "room_version": room_version.identifier,
+            },
+        )
+        self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body)
+
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [(EventTypes.Member, StateValues.WILDCARD)],
+                    "timeline_limit": 0,
+                }
+            }
+        }
+
+        # Sync until the local user2 can see the invite
+        with test_timeout(
+            3,
+            "Unable to find user2's invite event in the room",
+        ):
+            while True:
+                response_body, _ = self.do_sync(sync_body, tok=local_user2_tok)
+                if (
+                    remote_room_id in response_body["rooms"].keys()
+                    # If they have `invite_state` for the room, they are invited
+                    and len(
+                        response_body["rooms"][remote_room_id].get("invite_state", [])
+                    )
+                    > 0
+                ):
+                    break
+
+                # Prevent tight-looping to allow the `test_timeout` to work
+                time.sleep(0.1)
+
+        if membership_action == Membership.JOIN:
+            # User2 joins the room
+            join_event = self.helper.join(
+                remote_room_join_result.remote_room_id,
+                local_user2_id,
+                tok=local_user2_tok,
+            )
+            expected_pdu_event_id = join_event["event_id"]
+        elif membership_action == Membership.LEAVE:
+            # User2 rejects the invite
+            leave_event = self.helper.leave(
+                remote_room_join_result.remote_room_id,
+                local_user2_id,
+                tok=local_user2_tok,
+            )
+            expected_pdu_event_id = leave_event["event_id"]
+        else:
+            raise NotImplementedError(
+                "This test does not support this membership action yet"
+            )
+
+        # Sync until the local user2 can see their new membership in the room
+        with test_timeout(
+            3,
+            "Unable to find user2's new membership event in the room",
+        ):
+            while True:
+                response_body, _ = self.do_sync(sync_body, tok=local_user2_tok)
+                if membership_action == Membership.JOIN:
+                    if remote_room_id in response_body["rooms"].keys():
+                        required_state_map = required_state_json_to_state_map(
+                            response_body["rooms"][remote_room_id]["required_state"]
+                        )
+                        if (
+                            required_state_map.get((EventTypes.Member, local_user2_id))
+                            is not None
+                        ):
+                            break
+                elif membership_action == Membership.LEAVE:
+                    if remote_room_id not in response_body["rooms"].keys():
+                        break
+                else:
+                    raise NotImplementedError(
+                        "This test does not support this membership action yet"
+                    )
+
+                # Prevent tight-looping to allow the `test_timeout` to work
+                time.sleep(0.1)
+
+        # Make sure that we let hs2 know about the new membership event
+        self.assertIncludes(
+            collected_pdus_from_hs1_federation_send,
+            {expected_pdu_event_id},
+            exact=True,
+            message="Expected to find the event ID of the user2 membership to be sent from hs1 over federation to hs2",
+        )
diff --git a/tests/federation/test_federation_server.py b/tests/federation/test_federation_server.py
index 88261450b1fe273fe106058fcade2279caca56a9..42dc8447349648119b37772b687505a99faef17f 100644
--- a/tests/federation/test_federation_server.py
+++ b/tests/federation/test_federation_server.py
@@ -20,14 +20,21 @@
 #
 import logging
 from http import HTTPStatus
+from typing import Optional, Union
+from unittest.mock import Mock
 
 from parameterized import parameterized
 
 from twisted.test.proto_helpers import MemoryReactor
 
-from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
+from synapse.api.constants import EventTypes, Membership
+from synapse.api.errors import FederationError
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
 from synapse.config.server import DEFAULT_ROOM_VERSION
 from synapse.events import EventBase, make_event_from_dict
+from synapse.federation.federation_base import event_from_pdu_json
+from synapse.http.types import QueryParams
+from synapse.logging.context import LoggingContext
 from synapse.rest import admin
 from synapse.rest.client import login, room
 from synapse.server import HomeServer
@@ -85,6 +92,163 @@ class FederationServerTests(unittest.FederatingHomeserverTestCase):
         self.assertEqual(500, channel.code, channel.result)
 
 
+def _create_acl_event(content: JsonDict) -> EventBase:
+    return make_event_from_dict(
+        {
+            "room_id": "!a:b",
+            "event_id": "$a:b",
+            "type": "m.room.server_acls",
+            "sender": "@a:b",
+            "content": content,
+        }
+    )
+
+
+class MessageAcceptTests(unittest.FederatingHomeserverTestCase):
+    """
+    Tests to make sure that we don't accept flawed events from federation (incoming).
+    """
+
+    servlets = [
+        admin.register_servlets,
+        login.register_servlets,
+        room.register_servlets,
+    ]
+
+    def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
+        self.http_client = Mock()
+        return self.setup_test_homeserver(federation_http_client=self.http_client)
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        super().prepare(reactor, clock, hs)
+
+        self.store = self.hs.get_datastores().main
+        self.storage_controllers = hs.get_storage_controllers()
+        self.federation_event_handler = self.hs.get_federation_event_handler()
+
+        # Create a local room
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        self.room_id = self.helper.create_room_as(
+            user1_id, tok=user1_tok, is_public=True
+        )
+
+        state_map = self.get_success(
+            self.storage_controllers.state.get_current_state(self.room_id)
+        )
+
+        # Figure out what the forward extremities in the room are (the most recent
+        # events that aren't tied into the DAG)
+        forward_extremity_event_ids = self.get_success(
+            self.hs.get_datastores().main.get_latest_event_ids_in_room(self.room_id)
+        )
+
+        # Join a remote user to the room that will attempt to send bad events
+        self.remote_bad_user_id = f"@baduser:{self.OTHER_SERVER_NAME}"
+        self.remote_bad_user_join_event = make_event_from_dict(
+            self.add_hashes_and_signatures_from_other_server(
+                {
+                    "room_id": self.room_id,
+                    "sender": self.remote_bad_user_id,
+                    "state_key": self.remote_bad_user_id,
+                    "depth": 1000,
+                    "origin_server_ts": 1,
+                    "type": EventTypes.Member,
+                    "content": {"membership": Membership.JOIN},
+                    "auth_events": [
+                        state_map[(EventTypes.Create, "")].event_id,
+                        state_map[(EventTypes.JoinRules, "")].event_id,
+                    ],
+                    "prev_events": list(forward_extremity_event_ids),
+                }
+            ),
+            room_version=RoomVersions.V10,
+        )
+
+        # Send the join, it should return None (which is not an error)
+        self.assertEqual(
+            self.get_success(
+                self.federation_event_handler.on_receive_pdu(
+                    self.OTHER_SERVER_NAME, self.remote_bad_user_join_event
+                )
+            ),
+            None,
+        )
+
+        # Make sure we actually joined the room
+        self.assertEqual(
+            self.get_success(self.store.get_latest_event_ids_in_room(self.room_id)),
+            {self.remote_bad_user_join_event.event_id},
+        )
+
+    def test_cant_hide_direct_ancestors(self) -> None:
+        """
+        If you send a message, you must be able to provide the direct
+        prev_events that said event references.
+        """
+
+        async def post_json(
+            destination: str,
+            path: str,
+            data: Optional[JsonDict] = None,
+            long_retries: bool = False,
+            timeout: Optional[int] = None,
+            ignore_backoff: bool = False,
+            args: Optional[QueryParams] = None,
+        ) -> Union[JsonDict, list]:
+            # If it asks us for new missing events, give them NOTHING
+            if path.startswith("/_matrix/federation/v1/get_missing_events/"):
+                return {"events": []}
+            return {}
+
+        self.http_client.post_json = post_json
+
+        # Figure out what the forward extremities in the room are (the most recent
+        # events that aren't tied into the DAG)
+        forward_extremity_event_ids = self.get_success(
+            self.hs.get_datastores().main.get_latest_event_ids_in_room(self.room_id)
+        )
+
+        # Now lie about an event's prev_events
+        lying_event = make_event_from_dict(
+            self.add_hashes_and_signatures_from_other_server(
+                {
+                    "room_id": self.room_id,
+                    "sender": self.remote_bad_user_id,
+                    "depth": 1000,
+                    "origin_server_ts": 1,
+                    "type": "m.room.message",
+                    "content": {"body": "hewwo?"},
+                    "auth_events": [],
+                    "prev_events": ["$missing_prev_event"]
+                    + list(forward_extremity_event_ids),
+                }
+            ),
+            room_version=RoomVersions.V10,
+        )
+
+        with LoggingContext("test-context"):
+            failure = self.get_failure(
+                self.federation_event_handler.on_receive_pdu(
+                    self.OTHER_SERVER_NAME, lying_event
+                ),
+                FederationError,
+            )
+
+        # on_receive_pdu should throw an error
+        self.assertEqual(
+            failure.value.args[0],
+            (
+                "ERROR 403: Your server isn't divulging details about prev_events "
+                "referenced in this event."
+            ),
+        )
+
+        # Make sure the invalid event isn't there
+        extrem = self.get_success(self.store.get_latest_event_ids_in_room(self.room_id))
+        self.assertEqual(extrem, {self.remote_bad_user_join_event.event_id})
+
+
 class ServerACLsTestCase(unittest.TestCase):
     def test_blocked_server(self) -> None:
         e = _create_acl_event({"allow": ["*"], "deny": ["evil.com"]})
@@ -355,13 +519,76 @@ class SendJoinFederationTests(unittest.FederatingHomeserverTestCase):
     #   is probably sufficient to reassure that the bucket is updated.
 
 
-def _create_acl_event(content: JsonDict) -> EventBase:
-    return make_event_from_dict(
-        {
-            "room_id": "!a:b",
-            "event_id": "$a:b",
-            "type": "m.room.server_acls",
-            "sender": "@a:b",
-            "content": content,
+class StripUnsignedFromEventsTestCase(unittest.TestCase):
+    """
+    Test to make sure that we handle the raw JSON events from federation carefully and
+    strip anything that shouldn't be there.
+    """
+
+    def test_strip_unauthorized_unsigned_values(self) -> None:
+        event1 = {
+            "sender": "@baduser:test.serv",
+            "state_key": "@baduser:test.serv",
+            "event_id": "$event1:test.serv",
+            "depth": 1000,
+            "origin_server_ts": 1,
+            "type": "m.room.member",
+            "origin": "test.servx",
+            "content": {"membership": "join"},
+            "auth_events": [],
+            "unsigned": {"malicious garbage": "hackz", "more warez": "more hackz"},
         }
-    )
+        filtered_event = event_from_pdu_json(event1, RoomVersions.V1)
+        # Make sure unauthorized fields are stripped from unsigned
+        self.assertNotIn("more warez", filtered_event.unsigned)
+
+    def test_strip_event_maintains_allowed_fields(self) -> None:
+        event2 = {
+            "sender": "@baduser:test.serv",
+            "state_key": "@baduser:test.serv",
+            "event_id": "$event2:test.serv",
+            "depth": 1000,
+            "origin_server_ts": 1,
+            "type": "m.room.member",
+            "origin": "test.servx",
+            "auth_events": [],
+            "content": {"membership": "join"},
+            "unsigned": {
+                "malicious garbage": "hackz",
+                "more warez": "more hackz",
+                "age": 14,
+                "invite_room_state": [],
+            },
+        }
+
+        filtered_event2 = event_from_pdu_json(event2, RoomVersions.V1)
+        self.assertIn("age", filtered_event2.unsigned)
+        self.assertEqual(14, filtered_event2.unsigned["age"])
+        self.assertNotIn("more warez", filtered_event2.unsigned)
+        # Invite_room_state is allowed in events of type m.room.member
+        self.assertIn("invite_room_state", filtered_event2.unsigned)
+        self.assertEqual([], filtered_event2.unsigned["invite_room_state"])
+
+    def test_strip_event_removes_fields_based_on_event_type(self) -> None:
+        event3 = {
+            "sender": "@baduser:test.serv",
+            "state_key": "@baduser:test.serv",
+            "event_id": "$event3:test.serv",
+            "depth": 1000,
+            "origin_server_ts": 1,
+            "type": "m.room.power_levels",
+            "origin": "test.servx",
+            "content": {},
+            "auth_events": [],
+            "unsigned": {
+                "malicious garbage": "hackz",
+                "more warez": "more hackz",
+                "age": 14,
+                "invite_room_state": [],
+            },
+        }
+        filtered_event3 = event_from_pdu_json(event3, RoomVersions.V1)
+        self.assertIn("age", filtered_event3.unsigned)
+        # Invite_room_state field is only permitted in event type m.room.member
+        self.assertNotIn("invite_room_state", filtered_event3.unsigned)
+        self.assertNotIn("more warez", filtered_event3.unsigned)
diff --git a/tests/handlers/test_federation_event.py b/tests/handlers/test_federation_event.py
index 5db10fa74c2f616c880e72a9f3c9eae8dbc20c2a..61b0efb87e2dd78f189b834c5128653a2f2c7127 100644
--- a/tests/handlers/test_federation_event.py
+++ b/tests/handlers/test_federation_event.py
@@ -375,7 +375,7 @@ class FederationEventHandlerTests(unittest.FederatingHomeserverTestCase):
 
         In this test, we pretend we are processing a "pulled" event via
         backfill. The pulled event succesfully processes and the backward
-        extremeties are updated along with clearing out any failed pull attempts
+        extremities are updated along with clearing out any failed pull attempts
         for those old extremities.
 
         We check that we correctly cleared failed pull attempts of the
diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
index 4cf048f0dfb635bd92d04ae23a7c9936be6d66e6..6b7bf112c21585e4fcc887b1c7c9424896c1237f 100644
--- a/tests/handlers/test_presence.py
+++ b/tests/handlers/test_presence.py
@@ -23,14 +23,21 @@ from typing import Optional, cast
 from unittest.mock import Mock, call
 
 from parameterized import parameterized
-from signedjson.key import generate_signing_key
+from signedjson.key import (
+    encode_verify_key_base64,
+    generate_signing_key,
+    get_verify_key,
+)
 
 from twisted.test.proto_helpers import MemoryReactor
 
 from synapse.api.constants import EventTypes, Membership, PresenceState
 from synapse.api.presence import UserDevicePresenceState, UserPresenceState
-from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
-from synapse.events.builder import EventBuilder
+from synapse.api.room_versions import (
+    RoomVersion,
+)
+from synapse.crypto.event_signing import add_hashes_and_signatures
+from synapse.events import EventBase, make_event_from_dict
 from synapse.federation.sender import FederationSender
 from synapse.handlers.presence import (
     BUSY_ONLINE_TIMEOUT,
@@ -48,6 +55,7 @@ from synapse.rest import admin
 from synapse.rest.client import login, room, sync
 from synapse.server import HomeServer
 from synapse.storage.database import LoggingDatabaseConnection
+from synapse.storage.keys import FetchKeyResult
 from synapse.types import JsonDict, UserID, get_domain_from_id
 from synapse.util import Clock
 
@@ -1926,6 +1934,7 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase):
         # self.event_builder_for_2.hostname = "test2"
 
         self.store = hs.get_datastores().main
+        self.storage_controllers = hs.get_storage_controllers()
         self.state = hs.get_state_handler()
         self._event_auth_handler = hs.get_event_auth_handler()
 
@@ -2041,29 +2050,35 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase):
 
         hostname = get_domain_from_id(user_id)
 
-        room_version = self.get_success(self.store.get_room_version_id(room_id))
+        room_version = self.get_success(self.store.get_room_version(room_id))
 
-        builder = EventBuilder(
-            state=self.state,
-            event_auth_handler=self._event_auth_handler,
-            store=self.store,
-            clock=self.clock,
-            hostname=hostname,
-            signing_key=self.random_signing_key,
-            room_version=KNOWN_ROOM_VERSIONS[room_version],
-            room_id=room_id,
-            type=EventTypes.Member,
-            sender=user_id,
-            state_key=user_id,
-            content={"membership": Membership.JOIN},
+        state_map = self.get_success(
+            self.storage_controllers.state.get_current_state(room_id)
         )
 
-        prev_event_ids = self.get_success(
-            self.store.get_latest_event_ids_in_room(room_id)
+        # Figure out what the forward extremities in the room are (the most recent
+        # events that aren't tied into the DAG)
+        forward_extremity_event_ids = self.get_success(
+            self.hs.get_datastores().main.get_latest_event_ids_in_room(room_id)
         )
 
-        event = self.get_success(
-            builder.build(prev_event_ids=list(prev_event_ids), auth_event_ids=None)
+        event = self.create_fake_event_from_remote_server(
+            remote_server_name=hostname,
+            event_dict={
+                "room_id": room_id,
+                "sender": user_id,
+                "type": EventTypes.Member,
+                "state_key": user_id,
+                "depth": 1000,
+                "origin_server_ts": 1,
+                "content": {"membership": Membership.JOIN},
+                "auth_events": [
+                    state_map[(EventTypes.Create, "")].event_id,
+                    state_map[(EventTypes.JoinRules, "")].event_id,
+                ],
+                "prev_events": list(forward_extremity_event_ids),
+            },
+            room_version=room_version,
         )
 
         self.get_success(self.federation_event_handler.on_receive_pdu(hostname, event))
@@ -2071,3 +2086,50 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase):
         # Check that it was successfully persisted.
         self.get_success(self.store.get_event(event.event_id))
         self.get_success(self.store.get_event(event.event_id))
+
+    def create_fake_event_from_remote_server(
+        self, remote_server_name: str, event_dict: JsonDict, room_version: RoomVersion
+    ) -> EventBase:
+        """
+        This is similar to what `FederatingHomeserverTestCase` is doing but we don't
+        need all of the extra baggage and we want to be able to create an event from
+        many remote servers.
+        """
+
+        # poke the other server's signing key into the key store, so that we don't
+        # make requests for it
+        other_server_signature_key = generate_signing_key("test")
+        verify_key = get_verify_key(other_server_signature_key)
+        verify_key_id = "%s:%s" % (verify_key.alg, verify_key.version)
+
+        self.get_success(
+            self.hs.get_datastores().main.store_server_keys_response(
+                remote_server_name,
+                from_server=remote_server_name,
+                ts_added_ms=self.clock.time_msec(),
+                verify_keys={
+                    verify_key_id: FetchKeyResult(
+                        verify_key=verify_key,
+                        valid_until_ts=self.clock.time_msec() + 10000,
+                    ),
+                },
+                response_json={
+                    "verify_keys": {
+                        verify_key_id: {"key": encode_verify_key_base64(verify_key)}
+                    }
+                },
+            )
+        )
+
+        add_hashes_and_signatures(
+            room_version=room_version,
+            event_dict=event_dict,
+            signature_name=remote_server_name,
+            signing_key=other_server_signature_key,
+        )
+        event = make_event_from_dict(
+            event_dict,
+            room_version=room_version,
+        )
+
+        return event
diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py
index 9dd0e98971bf2581d3a573f137205fbc2cf8378d..6b202dfbd5367591fefbae8ee8a565a523f1b60e 100644
--- a/tests/handlers/test_sync.py
+++ b/tests/handlers/test_sync.py
@@ -17,6 +17,7 @@
 # [This file includes modifications made by New Vector Limited]
 #
 #
+from http import HTTPStatus
 from typing import Collection, ContextManager, List, Optional
 from unittest.mock import AsyncMock, Mock, patch
 
@@ -347,7 +348,15 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
         # the prev_events used when creating the join event, such that the ban does not
         # precede the join.
         with self._patch_get_latest_events([last_room_creation_event_id]):
-            self.helper.join(room_id, eve, tok=eve_token)
+            self.helper.join(
+                room_id,
+                eve,
+                tok=eve_token,
+                # Previously, this join would succeed but now we expect it to fail at
+                # this point. The rest of the test is for the case when this used to
+                # succeed.
+                expect_code=HTTPStatus.FORBIDDEN,
+            )
 
         # Eve makes a second, incremental sync.
         eve_incremental_sync_after_join: SyncResult = self.get_success(
diff --git a/tests/replication/test_federation_sender_shard.py b/tests/replication/test_federation_sender_shard.py
index 4429d0f4e29d2063b234504d5dff09f9a20c4eff..58a7a9dc721a01379b52b504adb0c58e59f08c3c 100644
--- a/tests/replication/test_federation_sender_shard.py
+++ b/tests/replication/test_federation_sender_shard.py
@@ -22,14 +22,26 @@ import logging
 from unittest.mock import AsyncMock, Mock
 
 from netaddr import IPSet
+from signedjson.key import (
+    encode_verify_key_base64,
+    generate_signing_key,
+    get_verify_key,
+)
+
+from twisted.test.proto_helpers import MemoryReactor
 
 from synapse.api.constants import EventTypes, Membership
-from synapse.events.builder import EventBuilderFactory
+from synapse.api.room_versions import RoomVersion
+from synapse.crypto.event_signing import add_hashes_and_signatures
+from synapse.events import EventBase, make_event_from_dict
 from synapse.handlers.typing import TypingWriterHandler
 from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
 from synapse.rest.admin import register_servlets_for_client_rest_resource
 from synapse.rest.client import login, room
-from synapse.types import UserID, create_requester
+from synapse.server import HomeServer
+from synapse.storage.keys import FetchKeyResult
+from synapse.types import JsonDict, UserID, create_requester
+from synapse.util import Clock
 
 from tests.replication._base import BaseMultiWorkerStreamTestCase
 from tests.server import get_clock
@@ -63,6 +75,9 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
             ip_blocklist=IPSet(),
         )
 
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.storage_controllers = hs.get_storage_controllers()
+
     def test_send_event_single_sender(self) -> None:
         """Test that using a single federation sender worker correctly sends a
         new event.
@@ -243,35 +258,92 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
         self.assertTrue(sent_on_1)
         self.assertTrue(sent_on_2)
 
+    def create_fake_event_from_remote_server(
+        self, remote_server_name: str, event_dict: JsonDict, room_version: RoomVersion
+    ) -> EventBase:
+        """
+        This is similar to what `FederatingHomeserverTestCase` is doing but we don't
+        need all of the extra baggage and we want to be able to create an event from
+        many remote servers.
+        """
+
+        # poke the other server's signing key into the key store, so that we don't
+        # make requests for it
+        other_server_signature_key = generate_signing_key("test")
+        verify_key = get_verify_key(other_server_signature_key)
+        verify_key_id = "%s:%s" % (verify_key.alg, verify_key.version)
+
+        self.get_success(
+            self.hs.get_datastores().main.store_server_keys_response(
+                remote_server_name,
+                from_server=remote_server_name,
+                ts_added_ms=self.clock.time_msec(),
+                verify_keys={
+                    verify_key_id: FetchKeyResult(
+                        verify_key=verify_key,
+                        valid_until_ts=self.clock.time_msec() + 10000,
+                    ),
+                },
+                response_json={
+                    "verify_keys": {
+                        verify_key_id: {"key": encode_verify_key_base64(verify_key)}
+                    }
+                },
+            )
+        )
+
+        add_hashes_and_signatures(
+            room_version=room_version,
+            event_dict=event_dict,
+            signature_name=remote_server_name,
+            signing_key=other_server_signature_key,
+        )
+        event = make_event_from_dict(
+            event_dict,
+            room_version=room_version,
+        )
+
+        return event
+
     def create_room_with_remote_server(
         self, user: str, token: str, remote_server: str = "other_server"
     ) -> str:
-        room = self.helper.create_room_as(user, tok=token)
+        room_id = self.helper.create_room_as(user, tok=token)
         store = self.hs.get_datastores().main
         federation = self.hs.get_federation_event_handler()
 
-        prev_event_ids = self.get_success(store.get_latest_event_ids_in_room(room))
-        room_version = self.get_success(store.get_room_version(room))
+        room_version = self.get_success(store.get_room_version(room_id))
 
-        factory = EventBuilderFactory(self.hs)
-        factory.hostname = remote_server
+        state_map = self.get_success(
+            self.storage_controllers.state.get_current_state(room_id)
+        )
+
+        # Figure out what the forward extremities in the room are (the most recent
+        # events that aren't tied into the DAG)
+        prev_event_ids = self.get_success(store.get_latest_event_ids_in_room(room_id))
 
         user_id = UserID("user", remote_server).to_string()
 
-        event_dict = {
-            "type": EventTypes.Member,
-            "state_key": user_id,
-            "content": {"membership": Membership.JOIN},
-            "sender": user_id,
-            "room_id": room,
-        }
-
-        builder = factory.for_room_version(room_version, event_dict)
-        join_event = self.get_success(
-            builder.build(prev_event_ids=list(prev_event_ids), auth_event_ids=None)
+        join_event = self.create_fake_event_from_remote_server(
+            remote_server_name=remote_server,
+            event_dict={
+                "room_id": room_id,
+                "sender": user_id,
+                "type": EventTypes.Member,
+                "state_key": user_id,
+                "depth": 1000,
+                "origin_server_ts": 1,
+                "content": {"membership": Membership.JOIN},
+                "auth_events": [
+                    state_map[(EventTypes.Create, "")].event_id,
+                    state_map[(EventTypes.JoinRules, "")].event_id,
+                ],
+                "prev_events": list(prev_event_ids),
+            },
+            room_version=room_version,
         )
 
         self.get_success(federation.on_send_membership_event(remote_server, join_event))
         self.replicate()
 
-        return room
+        return room_id
diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py
index 4cf1a3dc51908c409513b0e9eb0cbcac45938dcb..833bd6fff8c30d47a3edb59a81c15feec405c755 100644
--- a/tests/rest/client/test_rooms.py
+++ b/tests/rest/client/test_rooms.py
@@ -742,7 +742,7 @@ class RoomsCreateTestCase(RoomBase):
         self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
         self.assertTrue("room_id" in channel.json_body)
         assert channel.resource_usage is not None
-        self.assertEqual(33, channel.resource_usage.db_txn_count)
+        self.assertEqual(34, channel.resource_usage.db_txn_count)
 
     def test_post_room_initial_state(self) -> None:
         # POST with initial_state config key, expect new room id
@@ -755,7 +755,7 @@ class RoomsCreateTestCase(RoomBase):
         self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
         self.assertTrue("room_id" in channel.json_body)
         assert channel.resource_usage is not None
-        self.assertEqual(35, channel.resource_usage.db_txn_count)
+        self.assertEqual(36, channel.resource_usage.db_txn_count)
 
     def test_post_room_visibility_key(self) -> None:
         # POST with visibility config key, expect new room id
diff --git a/tests/test_federation.py b/tests/test_federation.py
deleted file mode 100644
index 94b0fa98565aa56be62b3926d5b629ae7c6da854..0000000000000000000000000000000000000000
--- a/tests/test_federation.py
+++ /dev/null
@@ -1,378 +0,0 @@
-#
-# This file is licensed under the Affero General Public License (AGPL) version 3.
-#
-# Copyright 2020 The Matrix.org Foundation C.I.C.
-# Copyright (C) 2023 New Vector, Ltd
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as
-# published by the Free Software Foundation, either version 3 of the
-# License, or (at your option) any later version.
-#
-# See the GNU Affero General Public License for more details:
-# <https://www.gnu.org/licenses/agpl-3.0.html>.
-#
-# Originally licensed under the Apache License, Version 2.0:
-# <http://www.apache.org/licenses/LICENSE-2.0>.
-#
-# [This file includes modifications made by New Vector Limited]
-#
-#
-
-from typing import Collection, List, Optional, Union
-from unittest.mock import AsyncMock, Mock
-
-from twisted.test.proto_helpers import MemoryReactor
-
-from synapse.api.errors import FederationError
-from synapse.api.room_versions import RoomVersion, RoomVersions
-from synapse.events import EventBase, make_event_from_dict
-from synapse.events.snapshot import EventContext
-from synapse.federation.federation_base import event_from_pdu_json
-from synapse.handlers.device import DeviceListUpdater
-from synapse.http.types import QueryParams
-from synapse.logging.context import LoggingContext
-from synapse.server import HomeServer
-from synapse.types import JsonDict, UserID, create_requester
-from synapse.util import Clock
-from synapse.util.retryutils import NotRetryingDestination
-
-from tests import unittest
-
-
-class MessageAcceptTests(unittest.HomeserverTestCase):
-    def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
-        self.http_client = Mock()
-        return self.setup_test_homeserver(federation_http_client=self.http_client)
-
-    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
-        user_id = UserID("us", "test")
-        our_user = create_requester(user_id)
-        room_creator = self.hs.get_room_creation_handler()
-        self.room_id = self.get_success(
-            room_creator.create_room(
-                our_user, room_creator._presets_dict["public_chat"], ratelimit=False
-            )
-        )[0]
-
-        self.store = self.hs.get_datastores().main
-
-        # Figure out what the most recent event is
-        most_recent = next(
-            iter(
-                self.get_success(
-                    self.hs.get_datastores().main.get_latest_event_ids_in_room(
-                        self.room_id
-                    )
-                )
-            )
-        )
-
-        join_event = make_event_from_dict(
-            {
-                "room_id": self.room_id,
-                "sender": "@baduser:test.serv",
-                "state_key": "@baduser:test.serv",
-                "event_id": "$join:test.serv",
-                "depth": 1000,
-                "origin_server_ts": 1,
-                "type": "m.room.member",
-                "origin": "test.servx",
-                "content": {"membership": "join"},
-                "auth_events": [],
-                "prev_state": [(most_recent, {})],
-                "prev_events": [(most_recent, {})],
-            }
-        )
-
-        self.handler = self.hs.get_federation_handler()
-        federation_event_handler = self.hs.get_federation_event_handler()
-
-        async def _check_event_auth(
-            origin: Optional[str], event: EventBase, context: EventContext
-        ) -> None:
-            pass
-
-        federation_event_handler._check_event_auth = _check_event_auth  # type: ignore[method-assign]
-        self.client = self.hs.get_federation_client()
-
-        async def _check_sigs_and_hash_for_pulled_events_and_fetch(
-            dest: str, pdus: Collection[EventBase], room_version: RoomVersion
-        ) -> List[EventBase]:
-            return list(pdus)
-
-        self.client._check_sigs_and_hash_for_pulled_events_and_fetch = (  # type: ignore[method-assign]
-            _check_sigs_and_hash_for_pulled_events_and_fetch  # type: ignore[assignment]
-        )
-
-        # Send the join, it should return None (which is not an error)
-        self.assertEqual(
-            self.get_success(
-                federation_event_handler.on_receive_pdu("test.serv", join_event)
-            ),
-            None,
-        )
-
-        # Make sure we actually joined the room
-        self.assertEqual(
-            self.get_success(self.store.get_latest_event_ids_in_room(self.room_id)),
-            {"$join:test.serv"},
-        )
-
-    def test_cant_hide_direct_ancestors(self) -> None:
-        """
-        If you send a message, you must be able to provide the direct
-        prev_events that said event references.
-        """
-
-        async def post_json(
-            destination: str,
-            path: str,
-            data: Optional[JsonDict] = None,
-            long_retries: bool = False,
-            timeout: Optional[int] = None,
-            ignore_backoff: bool = False,
-            args: Optional[QueryParams] = None,
-        ) -> Union[JsonDict, list]:
-            # If it asks us for new missing events, give them NOTHING
-            if path.startswith("/_matrix/federation/v1/get_missing_events/"):
-                return {"events": []}
-            return {}
-
-        self.http_client.post_json = post_json
-
-        # Figure out what the most recent event is
-        most_recent = next(
-            iter(
-                self.get_success(self.store.get_latest_event_ids_in_room(self.room_id))
-            )
-        )
-
-        # Now lie about an event
-        lying_event = make_event_from_dict(
-            {
-                "room_id": self.room_id,
-                "sender": "@baduser:test.serv",
-                "event_id": "one:test.serv",
-                "depth": 1000,
-                "origin_server_ts": 1,
-                "type": "m.room.message",
-                "origin": "test.serv",
-                "content": {"body": "hewwo?"},
-                "auth_events": [],
-                "prev_events": [("two:test.serv", {}), (most_recent, {})],
-            }
-        )
-
-        federation_event_handler = self.hs.get_federation_event_handler()
-        with LoggingContext("test-context"):
-            failure = self.get_failure(
-                federation_event_handler.on_receive_pdu("test.serv", lying_event),
-                FederationError,
-            )
-
-        # on_receive_pdu should throw an error
-        self.assertEqual(
-            failure.value.args[0],
-            (
-                "ERROR 403: Your server isn't divulging details about prev_events "
-                "referenced in this event."
-            ),
-        )
-
-        # Make sure the invalid event isn't there
-        extrem = self.get_success(self.store.get_latest_event_ids_in_room(self.room_id))
-        self.assertEqual(extrem, {"$join:test.serv"})
-
-    def test_retry_device_list_resync(self) -> None:
-        """Tests that device lists are marked as stale if they couldn't be synced, and
-        that stale device lists are retried periodically.
-        """
-        remote_user_id = "@john:test_remote"
-        remote_origin = "test_remote"
-
-        # Track the number of attempts to resync the user's device list.
-        self.resync_attempts = 0
-
-        # When this function is called, increment the number of resync attempts (only if
-        # we're querying devices for the right user ID), then raise a
-        # NotRetryingDestination error to fail the resync gracefully.
-        def query_user_devices(
-            destination: str, user_id: str, timeout: int = 30000
-        ) -> JsonDict:
-            if user_id == remote_user_id:
-                self.resync_attempts += 1
-
-            raise NotRetryingDestination(0, 0, destination)
-
-        # Register the mock on the federation client.
-        federation_client = self.hs.get_federation_client()
-        federation_client.query_user_devices = Mock(side_effect=query_user_devices)  # type: ignore[method-assign]
-
-        # Register a mock on the store so that the incoming update doesn't fail because
-        # we don't share a room with the user.
-        store = self.hs.get_datastores().main
-        store.get_rooms_for_user = AsyncMock(return_value=["!someroom:test"])
-
-        # Manually inject a fake device list update. We need this update to include at
-        # least one prev_id so that the user's device list will need to be retried.
-        device_list_updater = self.hs.get_device_handler().device_list_updater
-        assert isinstance(device_list_updater, DeviceListUpdater)
-        self.get_success(
-            device_list_updater.incoming_device_list_update(
-                origin=remote_origin,
-                edu_content={
-                    "deleted": False,
-                    "device_display_name": "Mobile",
-                    "device_id": "QBUAZIFURK",
-                    "prev_id": [5],
-                    "stream_id": 6,
-                    "user_id": remote_user_id,
-                },
-            )
-        )
-
-        # Check that there was one resync attempt.
-        self.assertEqual(self.resync_attempts, 1)
-
-        # Check that the resync attempt failed and caused the user's device list to be
-        # marked as stale.
-        need_resync = self.get_success(
-            store.get_user_ids_requiring_device_list_resync()
-        )
-        self.assertIn(remote_user_id, need_resync)
-
-        # Check that waiting for 30 seconds caused Synapse to retry resyncing the device
-        # list.
-        self.reactor.advance(30)
-        self.assertEqual(self.resync_attempts, 2)
-
-    def test_cross_signing_keys_retry(self) -> None:
-        """Tests that resyncing a device list correctly processes cross-signing keys from
-        the remote server.
-        """
-        remote_user_id = "@john:test_remote"
-        remote_master_key = "85T7JXPFBAySB/jwby4S3lBPTqY3+Zg53nYuGmu1ggY"
-        remote_self_signing_key = "QeIiFEjluPBtI7WQdG365QKZcFs9kqmHir6RBD0//nQ"
-
-        # Register mock device list retrieval on the federation client.
-        federation_client = self.hs.get_federation_client()
-        federation_client.query_user_devices = AsyncMock(  # type: ignore[method-assign]
-            return_value={
-                "user_id": remote_user_id,
-                "stream_id": 1,
-                "devices": [],
-                "master_key": {
-                    "user_id": remote_user_id,
-                    "usage": ["master"],
-                    "keys": {"ed25519:" + remote_master_key: remote_master_key},
-                },
-                "self_signing_key": {
-                    "user_id": remote_user_id,
-                    "usage": ["self_signing"],
-                    "keys": {
-                        "ed25519:" + remote_self_signing_key: remote_self_signing_key
-                    },
-                },
-            }
-        )
-
-        # Resync the device list.
-        device_handler = self.hs.get_device_handler()
-        self.get_success(
-            device_handler.device_list_updater.multi_user_device_resync(
-                [remote_user_id]
-            ),
-        )
-
-        # Retrieve the cross-signing keys for this user.
-        keys = self.get_success(
-            self.store.get_e2e_cross_signing_keys_bulk(user_ids=[remote_user_id]),
-        )
-        self.assertIn(remote_user_id, keys)
-        key = keys[remote_user_id]
-        assert key is not None
-
-        # Check that the master key is the one returned by the mock.
-        master_key = key["master"]
-        self.assertEqual(len(master_key["keys"]), 1)
-        self.assertTrue("ed25519:" + remote_master_key in master_key["keys"].keys())
-        self.assertTrue(remote_master_key in master_key["keys"].values())
-
-        # Check that the self-signing key is the one returned by the mock.
-        self_signing_key = key["self_signing"]
-        self.assertEqual(len(self_signing_key["keys"]), 1)
-        self.assertTrue(
-            "ed25519:" + remote_self_signing_key in self_signing_key["keys"].keys(),
-        )
-        self.assertTrue(remote_self_signing_key in self_signing_key["keys"].values())
-
-
-class StripUnsignedFromEventsTestCase(unittest.TestCase):
-    def test_strip_unauthorized_unsigned_values(self) -> None:
-        event1 = {
-            "sender": "@baduser:test.serv",
-            "state_key": "@baduser:test.serv",
-            "event_id": "$event1:test.serv",
-            "depth": 1000,
-            "origin_server_ts": 1,
-            "type": "m.room.member",
-            "origin": "test.servx",
-            "content": {"membership": "join"},
-            "auth_events": [],
-            "unsigned": {"malicious garbage": "hackz", "more warez": "more hackz"},
-        }
-        filtered_event = event_from_pdu_json(event1, RoomVersions.V1)
-        # Make sure unauthorized fields are stripped from unsigned
-        self.assertNotIn("more warez", filtered_event.unsigned)
-
-    def test_strip_event_maintains_allowed_fields(self) -> None:
-        event2 = {
-            "sender": "@baduser:test.serv",
-            "state_key": "@baduser:test.serv",
-            "event_id": "$event2:test.serv",
-            "depth": 1000,
-            "origin_server_ts": 1,
-            "type": "m.room.member",
-            "origin": "test.servx",
-            "auth_events": [],
-            "content": {"membership": "join"},
-            "unsigned": {
-                "malicious garbage": "hackz",
-                "more warez": "more hackz",
-                "age": 14,
-                "invite_room_state": [],
-            },
-        }
-
-        filtered_event2 = event_from_pdu_json(event2, RoomVersions.V1)
-        self.assertIn("age", filtered_event2.unsigned)
-        self.assertEqual(14, filtered_event2.unsigned["age"])
-        self.assertNotIn("more warez", filtered_event2.unsigned)
-        # Invite_room_state is allowed in events of type m.room.member
-        self.assertIn("invite_room_state", filtered_event2.unsigned)
-        self.assertEqual([], filtered_event2.unsigned["invite_room_state"])
-
-    def test_strip_event_removes_fields_based_on_event_type(self) -> None:
-        event3 = {
-            "sender": "@baduser:test.serv",
-            "state_key": "@baduser:test.serv",
-            "event_id": "$event3:test.serv",
-            "depth": 1000,
-            "origin_server_ts": 1,
-            "type": "m.room.power_levels",
-            "origin": "test.servx",
-            "content": {},
-            "auth_events": [],
-            "unsigned": {
-                "malicious garbage": "hackz",
-                "more warez": "more hackz",
-                "age": 14,
-                "invite_room_state": [],
-            },
-        }
-        filtered_event3 = event_from_pdu_json(event3, RoomVersions.V1)
-        self.assertIn("age", filtered_event3.unsigned)
-        # Invite_room_state field is only permitted in event type m.room.member
-        self.assertNotIn("invite_room_state", filtered_event3.unsigned)
-        self.assertNotIn("more warez", filtered_event3.unsigned)
diff --git a/tests/utils.py b/tests/utils.py
index 3f4a7bb5600eb5f2ae3196d62f2b7210612b7ef3..d4aebc306990f54ba4939595c901e2a40b60d98d 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -400,11 +400,24 @@ class TestTimeout(Exception):
 
 
 class test_timeout:
+    """
+    FIXME: This implementation is not robust against other code tight-looping and
+    preventing the signals propagating and timing out the test. You may need to add
+    `time.sleep(0.1)` to your code in order to allow this timeout to work correctly.
+
+    ```py
+    with test_timeout(3):
+        while True:
+            my_checking_func()
+            time.sleep(0.1)
+    ```
+    """
+
     def __init__(self, seconds: int, error_message: Optional[str] = None) -> None:
-        if error_message is None:
-            error_message = "test timed out after {}s.".format(seconds)
+        self.error_message = f"Test timed out after {seconds}s"
+        if error_message is not None:
+            self.error_message += f": {error_message}"
         self.seconds = seconds
-        self.error_message = error_message
 
     def handle_timeout(self, signum: int, frame: Optional[FrameType]) -> None:
         raise TestTimeout(self.error_message)