diff --git a/CHANGES.md b/CHANGES.md
index 532b30e2323f3e10f2e978b01f938be2457f7e5a..9a41607679fdf5768d65f7316cd92c383056f0a7 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,3 +1,56 @@
+Synapse 1.33.0rc1 (2021-04-28)
+==============================
+
+Features
+--------
+
+- Update experimental support for [MSC3083](https://github.com/matrix-org/matrix-doc/pull/3083): restricting room access via group membership. ([\#9800](https://github.com/matrix-org/synapse/issues/9800), [\#9814](https://github.com/matrix-org/synapse/issues/9814))
+- Add experimental support for handling presence on a worker. ([\#9819](https://github.com/matrix-org/synapse/issues/9819), [\#9820](https://github.com/matrix-org/synapse/issues/9820), [\#9828](https://github.com/matrix-org/synapse/issues/9828), [\#9850](https://github.com/matrix-org/synapse/issues/9850))
+- Return a new template when a user attempts to renew their account multiple times with the same token, stating that their account is set to expire. This replaces the invalid token template that would previously be shown in this case. This change concerns the optional account validity feature. ([\#9832](https://github.com/matrix-org/synapse/issues/9832))
+
+
+Bugfixes
+--------
+
+- Fix the OIDC SSO flow when using a `public_baseurl` value including a non-root URL path. ([\#9726](https://github.com/matrix-org/synapse/issues/9726))
+- Fix thumbnail generation for some sites with non-standard content types. Contributed by @rkfg. ([\#9788](https://github.com/matrix-org/synapse/issues/9788))
+- Add some sanity checks to identity server passed to 3PID bind/unbind endpoints. ([\#9802](https://github.com/matrix-org/synapse/issues/9802))
+- Limit the size of HTTP responses read over federation. ([\#9833](https://github.com/matrix-org/synapse/issues/9833))
+- Fix a bug which could cause Synapse to get stuck in a loop of resyncing device lists. ([\#9867](https://github.com/matrix-org/synapse/issues/9867))
+- Fix a long-standing bug where errors from federation did not propagate to the client. ([\#9868](https://github.com/matrix-org/synapse/issues/9868))
+
+
+Improved Documentation
+----------------------
+
+- Add a note to the docker docs mentioning that we mirror upstream's supported Docker platforms. ([\#9801](https://github.com/matrix-org/synapse/issues/9801))
+
+
+Internal Changes
+----------------
+
+- Add a dockerfile for running Synapse in worker-mode under Complement. ([\#9162](https://github.com/matrix-org/synapse/issues/9162))
+- Apply `pyupgrade` across the codebase. ([\#9786](https://github.com/matrix-org/synapse/issues/9786))
+- Move some replication processing out of `generic_worker`. ([\#9796](https://github.com/matrix-org/synapse/issues/9796))
+- Replace `HomeServer.get_config()` with inline references. ([\#9815](https://github.com/matrix-org/synapse/issues/9815))
+- Rename some handlers and config modules to not duplicate the top-level module. ([\#9816](https://github.com/matrix-org/synapse/issues/9816))
+- Fix a long-standing bug which caused `max_upload_size` to not be correctly enforced. ([\#9817](https://github.com/matrix-org/synapse/issues/9817))
+- Reduce CPU usage of the user directory by reusing existing calculated room membership. ([\#9821](https://github.com/matrix-org/synapse/issues/9821))
+- Small speed up for joining large remote rooms. ([\#9825](https://github.com/matrix-org/synapse/issues/9825))
+- Introduce flake8-bugbear to the test suite and fix some of its lint violations. ([\#9838](https://github.com/matrix-org/synapse/issues/9838))
+- Only store the raw data in the in-memory caches, rather than objects that include references to e.g. the data stores. ([\#9845](https://github.com/matrix-org/synapse/issues/9845))
+- Limit length of accepted email addresses. ([\#9855](https://github.com/matrix-org/synapse/issues/9855))
+- Remove redundant `synapse.types.Collection` type definition. ([\#9856](https://github.com/matrix-org/synapse/issues/9856))
+- Handle recently added rate limits correctly when using `--no-rate-limit` with the demo scripts. ([\#9858](https://github.com/matrix-org/synapse/issues/9858))
+- Disable invite rate-limiting by default when running the unit tests. ([\#9871](https://github.com/matrix-org/synapse/issues/9871))
+- Pass a reactor into `SynapseSite` to make testing easier. ([\#9874](https://github.com/matrix-org/synapse/issues/9874))
+- Make `DomainSpecificString` an `attrs` class. ([\#9875](https://github.com/matrix-org/synapse/issues/9875))
+- Add type hints to `synapse.api.auth` and `synapse.api.auth_blocking` modules. ([\#9876](https://github.com/matrix-org/synapse/issues/9876))
+- Remove redundant `_PushHTTPChannel` test class. ([\#9878](https://github.com/matrix-org/synapse/issues/9878))
+- Remove backwards-compatibility code for Python versions < 3.6. ([\#9879](https://github.com/matrix-org/synapse/issues/9879))
+- Small performance improvement around handling new local presence updates. ([\#9887](https://github.com/matrix-org/synapse/issues/9887))
+
+
 Synapse 1.32.2 (2021-04-22)
 ===========================
 
diff --git a/changelog.d/9162.misc b/changelog.d/9162.misc
deleted file mode 100644
index 1083da8a7af87aace08bc2dadabaab130035bc82..0000000000000000000000000000000000000000
--- a/changelog.d/9162.misc
+++ /dev/null
@@ -1 +0,0 @@
-Add a dockerfile for running Synapse in worker-mode under Complement.
\ No newline at end of file
diff --git a/changelog.d/9702.misc b/changelog.d/9702.misc
deleted file mode 100644
index c6e63450a97102ca20d624715e39a86c4341283e..0000000000000000000000000000000000000000
--- a/changelog.d/9702.misc
+++ /dev/null
@@ -1 +0,0 @@
-Speed up federation transmission by using fewer database calls. Contributed by @ShadowJonathan.
diff --git a/changelog.d/9726.bugfix b/changelog.d/9726.bugfix
deleted file mode 100644
index 4ba0b24327cbba8625de9897a0289c6f5f6402ba..0000000000000000000000000000000000000000
--- a/changelog.d/9726.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fixes the OIDC SSO flow when using a `public_baseurl` value including a non-root URL path.
\ No newline at end of file
diff --git a/changelog.d/9786.misc b/changelog.d/9786.misc
deleted file mode 100644
index cf265db749e2a3cbd4cd5d0cb090bdcb0d67d730..0000000000000000000000000000000000000000
--- a/changelog.d/9786.misc
+++ /dev/null
@@ -1 +0,0 @@
-Apply `pyupgrade` across the codebase.
\ No newline at end of file
diff --git a/changelog.d/9788.bugfix b/changelog.d/9788.bugfix
deleted file mode 100644
index edb58fbd5b37ce47232df5711e5e96fe7ee48a4f..0000000000000000000000000000000000000000
--- a/changelog.d/9788.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix thumbnail generation for some sites with non-standard content types. Contributed by @rkfg.
diff --git a/changelog.d/9796.misc b/changelog.d/9796.misc
deleted file mode 100644
index 59bb1813c32f068c3b7d6c487dd10c59372f839e..0000000000000000000000000000000000000000
--- a/changelog.d/9796.misc
+++ /dev/null
@@ -1 +0,0 @@
-Move some replication processing out of `generic_worker`.
diff --git a/changelog.d/9800.feature b/changelog.d/9800.feature
deleted file mode 100644
index 9404ad2fc047d7ca1fa6f4bdbf217763e66b4aa1..0000000000000000000000000000000000000000
--- a/changelog.d/9800.feature
+++ /dev/null
@@ -1 +0,0 @@
-Update experimental support for [MSC3083](https://github.com/matrix-org/matrix-doc/pull/3083): restricting room access via group membership.
diff --git a/changelog.d/9801.doc b/changelog.d/9801.doc
deleted file mode 100644
index 8b8b9d01d493844baf308aeeafcdb544ed15e77c..0000000000000000000000000000000000000000
--- a/changelog.d/9801.doc
+++ /dev/null
@@ -1 +0,0 @@
-Add a note to the docker docs mentioning that we mirror upstream's supported Docker platforms.
diff --git a/changelog.d/9802.bugfix b/changelog.d/9802.bugfix
deleted file mode 100644
index 0c72f7be473fe54d1e6d2c48aece42c51af0acea..0000000000000000000000000000000000000000
--- a/changelog.d/9802.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Add some sanity checks to identity server passed to 3PID bind/unbind endpoints.
diff --git a/changelog.d/9814.feature b/changelog.d/9814.feature
deleted file mode 100644
index 9404ad2fc047d7ca1fa6f4bdbf217763e66b4aa1..0000000000000000000000000000000000000000
--- a/changelog.d/9814.feature
+++ /dev/null
@@ -1 +0,0 @@
-Update experimental support for [MSC3083](https://github.com/matrix-org/matrix-doc/pull/3083): restricting room access via group membership.
diff --git a/changelog.d/9815.misc b/changelog.d/9815.misc
deleted file mode 100644
index e33d012d3d28b7ed5b3fdb00b74e0a54c94dddd2..0000000000000000000000000000000000000000
--- a/changelog.d/9815.misc
+++ /dev/null
@@ -1 +0,0 @@
-Replace `HomeServer.get_config()` with inline references.
diff --git a/changelog.d/9816.misc b/changelog.d/9816.misc
deleted file mode 100644
index d0981225006e63ed8abfd9b303e709087ea0cd40..0000000000000000000000000000000000000000
--- a/changelog.d/9816.misc
+++ /dev/null
@@ -1 +0,0 @@
-Rename some handlers and config modules to not duplicate the top-level module.
diff --git a/changelog.d/9817.misc b/changelog.d/9817.misc
deleted file mode 100644
index 8aa8895f05f0e63f003f627c2289bf47d1adda08..0000000000000000000000000000000000000000
--- a/changelog.d/9817.misc
+++ /dev/null
@@ -1 +0,0 @@
-Fix a long-standing bug which caused `max_upload_size` to not be correctly enforced.
diff --git a/changelog.d/9819.feature b/changelog.d/9819.feature
deleted file mode 100644
index f56b0bb3bdeb640a6f87bb821187173ab953fdc8..0000000000000000000000000000000000000000
--- a/changelog.d/9819.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add experimental support for handling presence on a worker.
diff --git a/changelog.d/9820.feature b/changelog.d/9820.feature
deleted file mode 100644
index f56b0bb3bdeb640a6f87bb821187173ab953fdc8..0000000000000000000000000000000000000000
--- a/changelog.d/9820.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add experimental support for handling presence on a worker.
diff --git a/changelog.d/9821.misc b/changelog.d/9821.misc
deleted file mode 100644
index 03b2d2ed4dbbb485dd3a471aaadcefd50affb7ab..0000000000000000000000000000000000000000
--- a/changelog.d/9821.misc
+++ /dev/null
@@ -1 +0,0 @@
-Reduce CPU usage of the user directory by reusing existing calculated room membership.
\ No newline at end of file
diff --git a/changelog.d/9825.misc b/changelog.d/9825.misc
deleted file mode 100644
index 42f3f15619367980571295c475394bec7d9b3dce..0000000000000000000000000000000000000000
--- a/changelog.d/9825.misc
+++ /dev/null
@@ -1 +0,0 @@
-Small speed up for joining large remote rooms.
diff --git a/changelog.d/9828.feature b/changelog.d/9828.feature
deleted file mode 100644
index f56b0bb3bdeb640a6f87bb821187173ab953fdc8..0000000000000000000000000000000000000000
--- a/changelog.d/9828.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add experimental support for handling presence on a worker.
diff --git a/changelog.d/9832.feature b/changelog.d/9832.feature
deleted file mode 100644
index e76395fbe886d4be376cfae0330d136f6be2e871..0000000000000000000000000000000000000000
--- a/changelog.d/9832.feature
+++ /dev/null
@@ -1 +0,0 @@
-Don't return an error when a user attempts to renew their account multiple times with the same token. Instead, state when their account is set to expire. This change concerns the optional account validity feature.
\ No newline at end of file
diff --git a/changelog.d/9833.bugfix b/changelog.d/9833.bugfix
deleted file mode 100644
index 56f9c9626b5c34b72612b57f8d57d515ad49ac86..0000000000000000000000000000000000000000
--- a/changelog.d/9833.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Limit the size of HTTP responses read over federation.
diff --git a/changelog.d/9838.misc b/changelog.d/9838.misc
deleted file mode 100644
index b98ce563093c68a46d3fc66f1952800c6dd5183d..0000000000000000000000000000000000000000
--- a/changelog.d/9838.misc
+++ /dev/null
@@ -1 +0,0 @@
-Introduce flake8-bugbear to the test suite and fix some of its lint violations.
\ No newline at end of file
diff --git a/changelog.d/9845.misc b/changelog.d/9845.misc
deleted file mode 100644
index 875dd6d13156be3fce05964931b178477c6e9038..0000000000000000000000000000000000000000
--- a/changelog.d/9845.misc
+++ /dev/null
@@ -1 +0,0 @@
-Only store the raw data in the in-memory caches, rather than objects that include references to e.g. the data stores.
diff --git a/changelog.d/9850.feature b/changelog.d/9850.feature
deleted file mode 100644
index f56b0bb3bdeb640a6f87bb821187173ab953fdc8..0000000000000000000000000000000000000000
--- a/changelog.d/9850.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add experimental support for handling presence on a worker.
diff --git a/changelog.d/9855.misc b/changelog.d/9855.misc
deleted file mode 100644
index 6a3d700fde71728195306ffa1cba968f99fe00fb..0000000000000000000000000000000000000000
--- a/changelog.d/9855.misc
+++ /dev/null
@@ -1 +0,0 @@
-Limit length of accepted email addresses.
diff --git a/changelog.d/9856.misc b/changelog.d/9856.misc
deleted file mode 100644
index d67e8c386a3e56f000321757cd56d6ddb80acd8a..0000000000000000000000000000000000000000
--- a/changelog.d/9856.misc
+++ /dev/null
@@ -1 +0,0 @@
-Remove redundant `synapse.types.Collection` type definition.
diff --git a/changelog.d/9858.misc b/changelog.d/9858.misc
deleted file mode 100644
index f7e286fa69bfae5b3b1d285f10648a09146b2ff6..0000000000000000000000000000000000000000
--- a/changelog.d/9858.misc
+++ /dev/null
@@ -1 +0,0 @@
-Handle recently added rate limits correctly when using `--no-rate-limit` with the demo scripts.
diff --git a/changelog.d/9867.bugfix b/changelog.d/9867.bugfix
deleted file mode 100644
index f236de247d8b556710647b9a3773374d468c44a1..0000000000000000000000000000000000000000
--- a/changelog.d/9867.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix a bug which could cause Synapse to get stuck in a loop of resyncing device lists.
diff --git a/changelog.d/9868.bugfix b/changelog.d/9868.bugfix
deleted file mode 100644
index e2b4f97ad51fac5b9962a73a9e4230a66c8ac0e7..0000000000000000000000000000000000000000
--- a/changelog.d/9868.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix a long-standing bug where errors from federation did not propagate to the client.
diff --git a/changelog.d/9871.misc b/changelog.d/9871.misc
deleted file mode 100644
index b19acfab6298bdb526036f4557fb0ffda9367ae9..0000000000000000000000000000000000000000
--- a/changelog.d/9871.misc
+++ /dev/null
@@ -1 +0,0 @@
-Disable invite rate-limiting by default when running the unit tests.
\ No newline at end of file
diff --git a/changelog.d/9874.misc b/changelog.d/9874.misc
deleted file mode 100644
index ba1097e65eb4eb310f90d5e4ffe5a09c23b96889..0000000000000000000000000000000000000000
--- a/changelog.d/9874.misc
+++ /dev/null
@@ -1 +0,0 @@
-Pass a reactor into `SynapseSite` to make testing easier.
diff --git a/changelog.d/9875.misc b/changelog.d/9875.misc
deleted file mode 100644
index 9345c0bf4530d2e00f16fdc39003f02ae983144e..0000000000000000000000000000000000000000
--- a/changelog.d/9875.misc
+++ /dev/null
@@ -1 +0,0 @@
-Make `DomainSpecificString` an `attrs` class.
diff --git a/changelog.d/9876.misc b/changelog.d/9876.misc
deleted file mode 100644
index 28390e32e67efe24aacf572ca20cee7e43ea6d47..0000000000000000000000000000000000000000
--- a/changelog.d/9876.misc
+++ /dev/null
@@ -1 +0,0 @@
-Add type hints to `synapse.api.auth` and `synapse.api.auth_blocking` modules.
diff --git a/changelog.d/9878.misc b/changelog.d/9878.misc
deleted file mode 100644
index 927876852db3f52df078d8feff51ac9eb92bf9b3..0000000000000000000000000000000000000000
--- a/changelog.d/9878.misc
+++ /dev/null
@@ -1 +0,0 @@
-Remove redundant `_PushHTTPChannel` test class.
diff --git a/changelog.d/9879.misc b/changelog.d/9879.misc
deleted file mode 100644
index c9ca37cf4835238350fa7a7f7ba29c691506c4ac..0000000000000000000000000000000000000000
--- a/changelog.d/9879.misc
+++ /dev/null
@@ -1 +0,0 @@
-Remove backwards-compatibility code for Python versions < 3.6.
\ No newline at end of file
diff --git a/changelog.d/9887.misc b/changelog.d/9887.misc
deleted file mode 100644
index 650ebf85e6ceac59f1a8c5453b1b65a4d60f9dcd..0000000000000000000000000000000000000000
--- a/changelog.d/9887.misc
+++ /dev/null
@@ -1 +0,0 @@
-Small performance improvement around handling new local presence updates.
diff --git a/contrib/experiments/test_messaging.py b/contrib/experiments/test_messaging.py
index 5dd172052b9ead198fc5640e410ab44c72c934c4..31b8a68225045c7207251b97d1e5e0457066b556 100644
--- a/contrib/experiments/test_messaging.py
+++ b/contrib/experiments/test_messaging.py
@@ -224,16 +224,14 @@ class HomeServer(ReplicationHandler):
         destinations = yield self.get_servers_for_context(room_name)
 
         try:
-            yield self.replication_layer.send_pdus(
-                [
-                    Pdu.create_new(
-                        context=room_name,
-                        pdu_type="sy.room.message",
-                        content={"sender": sender, "body": body},
-                        origin=self.server_name,
-                        destinations=destinations,
-                    )
-                ]
+            yield self.replication_layer.send_pdu(
+                Pdu.create_new(
+                    context=room_name,
+                    pdu_type="sy.room.message",
+                    content={"sender": sender, "body": body},
+                    origin=self.server_name,
+                    destinations=destinations,
+                )
             )
         except Exception as e:
             logger.exception(e)
@@ -255,7 +253,7 @@ class HomeServer(ReplicationHandler):
                 origin=self.server_name,
                 destinations=destinations,
             )
-            yield self.replication_layer.send_pdus([pdu])
+            yield self.replication_layer.send_pdu(pdu)
         except Exception as e:
             logger.exception(e)
 
@@ -267,18 +265,16 @@ class HomeServer(ReplicationHandler):
         destinations = yield self.get_servers_for_context(room_name)
 
         try:
-            yield self.replication_layer.send_pdus(
-                [
-                    Pdu.create_new(
-                        context=room_name,
-                        is_state=True,
-                        pdu_type="sy.room.member",
-                        state_key=invitee,
-                        content={"membership": "invite"},
-                        origin=self.server_name,
-                        destinations=destinations,
-                    )
-                ]
+            yield self.replication_layer.send_pdu(
+                Pdu.create_new(
+                    context=room_name,
+                    is_state=True,
+                    pdu_type="sy.room.member",
+                    state_key=invitee,
+                    content={"membership": "invite"},
+                    origin=self.server_name,
+                    destinations=destinations,
+                )
             )
         except Exception as e:
             logger.exception(e)
diff --git a/synapse/__init__.py b/synapse/__init__.py
index fbd49a93e137b251ef221d09cc7d673b69974044..5bbaa62de2b262f231fbee2e0a9d78ad85440354 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -47,7 +47,7 @@ try:
 except ImportError:
     pass
 
-__version__ = "1.32.2"
+__version__ = "1.33.0rc1"
 
 if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
     # We import here so that we don't have to install a bunch of deps when
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 022bbf7dad4440be80d6abf18a221c927c92ce0e..deb40f461096cac86cb94daf01bf38cbd19e9e6e 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -14,26 +14,19 @@
 
 import abc
 import logging
-from typing import (
-    TYPE_CHECKING,
-    Collection,
-    Dict,
-    Hashable,
-    Iterable,
-    List,
-    Optional,
-    Set,
-    Tuple,
-)
+from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set, Tuple
 
 from prometheus_client import Counter
 
+from twisted.internet import defer
+
 import synapse.metrics
 from synapse.api.presence import UserPresenceState
 from synapse.events import EventBase
 from synapse.federation.sender.per_destination_queue import PerDestinationQueue
 from synapse.federation.sender.transaction_manager import TransactionManager
 from synapse.federation.units import Edu
+from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.metrics import (
     LaterGauge,
     event_processing_loop_counter,
@@ -262,27 +255,15 @@ class FederationSender(AbstractFederationSender):
                 if not events and next_token >= self._last_poked_id:
                     break
 
-                async def get_destinations_for_event(
-                    event: EventBase,
-                ) -> Collection[str]:
-                    """Computes the destinations to which this event must be sent.
-
-                    This returns an empty tuple when there are no destinations to send to,
-                    or if this event is not from this homeserver and it is not sending
-                    it on behalf of another server.
-
-                    Will also filter out destinations which this sender is not responsible for,
-                    if multiple federation senders exist.
-                    """
-
+                async def handle_event(event: EventBase) -> None:
                     # Only send events for this server.
                     send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
                     is_mine = self.is_mine_id(event.sender)
                     if not is_mine and send_on_behalf_of is None:
-                        return ()
+                        return
 
                     if not event.internal_metadata.should_proactively_send():
-                        return ()
+                        return
 
                     destinations = None  # type: Optional[Set[str]]
                     if not event.prev_event_ids():
@@ -317,7 +298,7 @@ class FederationSender(AbstractFederationSender):
                                 "Failed to calculate hosts in room for event: %s",
                                 event.event_id,
                             )
-                            return ()
+                            return
 
                     destinations = {
                         d
@@ -327,15 +308,17 @@ class FederationSender(AbstractFederationSender):
                         )
                     }
 
-                    destinations.discard(self.server_name)
-
                     if send_on_behalf_of is not None:
                         # If we are sending the event on behalf of another server
                         # then it already has the event and there is no reason to
                         # send the event to it.
                         destinations.discard(send_on_behalf_of)
 
+                    logger.debug("Sending %s to %r", event, destinations)
+
                     if destinations:
+                        await self._send_pdu(event, destinations)
+
                         now = self.clock.time_msec()
                         ts = await self.store.get_received_ts(event.event_id)
 
@@ -343,29 +326,24 @@ class FederationSender(AbstractFederationSender):
                             "federation_sender"
                         ).observe((now - ts) / 1000)
 
-                        return destinations
-                    return ()
-
-                async def get_federatable_events_and_destinations(
-                    events: Iterable[EventBase],
-                ) -> List[Tuple[EventBase, Collection[str]]]:
-                    with Measure(self.clock, "get_destinations_for_events"):
-                        # Fetch federation destinations per event,
-                        # skip if get_destinations_for_event returns an empty collection,
-                        # return list of event->destinations pairs.
-                        return [
-                            (event, dests)
-                            for (event, dests) in [
-                                (event, await get_destinations_for_event(event))
-                                for event in events
-                            ]
-                            if dests
-                        ]
-
-                events_and_dests = await get_federatable_events_and_destinations(events)
-
-                # Send corresponding events to each destination queue
-                await self._distribute_events(events_and_dests)
+                async def handle_room_events(events: Iterable[EventBase]) -> None:
+                    with Measure(self.clock, "handle_room_events"):
+                        for event in events:
+                            await handle_event(event)
+
+                events_by_room = {}  # type: Dict[str, List[EventBase]]
+                for event in events:
+                    events_by_room.setdefault(event.room_id, []).append(event)
+
+                await make_deferred_yieldable(
+                    defer.gatherResults(
+                        [
+                            run_in_background(handle_room_events, evs)
+                            for evs in events_by_room.values()
+                        ],
+                        consumeErrors=True,
+                    )
+                )
 
                 await self.store.update_federation_out_pos("events", next_token)
 
@@ -383,7 +361,7 @@ class FederationSender(AbstractFederationSender):
                     events_processed_counter.inc(len(events))
 
                     event_processing_loop_room_count.labels("federation_sender").inc(
-                        len({event.room_id for event in events})
+                        len(events_by_room)
                     )
 
                 event_processing_loop_counter.labels("federation_sender").inc()
@@ -395,53 +373,34 @@ class FederationSender(AbstractFederationSender):
         finally:
             self._is_processing = False
 
-    async def _distribute_events(
-        self,
-        events_and_dests: Iterable[Tuple[EventBase, Collection[str]]],
-    ) -> None:
-        """Distribute events to the respective per_destination queues.
-
-        Also persists last-seen per-room stream_ordering to 'destination_rooms'.
-
-        Args:
-            events_and_dests: A list of tuples, which are (event: EventBase, destinations: Collection[str]).
-                              Every event is paired with its intended destinations (in federation).
-        """
-        # Tuples of room_id + destination to their max-seen stream_ordering
-        room_with_dest_stream_ordering = {}  # type: Dict[Tuple[str, str], int]
-
-        # List of events to send to each destination
-        events_by_dest = {}  # type: Dict[str, List[EventBase]]
+    async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
+        # We loop through all destinations to see whether we already have
+        # a transaction in progress. If we do, stick it in the pending_pdus
+        # table and we'll get back to it later.
 
-        # For each event-destinations pair...
-        for event, destinations in events_and_dests:
+        destinations = set(destinations)
+        destinations.discard(self.server_name)
+        logger.debug("Sending to: %s", str(destinations))
 
-            # (we got this from the database, it's filled)
-            assert event.internal_metadata.stream_ordering
-
-            sent_pdus_destination_dist_total.inc(len(destinations))
-            sent_pdus_destination_dist_count.inc()
+        if not destinations:
+            return
 
-            # ...iterate over those destinations..
-            for destination in destinations:
-                # ...update their stream-ordering...
-                room_with_dest_stream_ordering[(event.room_id, destination)] = max(
-                    event.internal_metadata.stream_ordering,
-                    room_with_dest_stream_ordering.get((event.room_id, destination), 0),
-                )
+        sent_pdus_destination_dist_total.inc(len(destinations))
+        sent_pdus_destination_dist_count.inc()
 
-                # ...and add the event to each destination queue.
-                events_by_dest.setdefault(destination, []).append(event)
+        assert pdu.internal_metadata.stream_ordering
 
-        # Bulk-store destination_rooms stream_ids
-        await self.store.bulk_store_destination_rooms_entries(
-            room_with_dest_stream_ordering
+        # track the fact that we have a PDU for these destinations,
+        # to allow us to perform catch-up later on if the remote is unreachable
+        # for a while.
+        await self.store.store_destination_rooms_entries(
+            destinations,
+            pdu.room_id,
+            pdu.internal_metadata.stream_ordering,
         )
 
-        for destination, pdus in events_by_dest.items():
-            logger.debug("Sending %d pdus to %s", len(pdus), destination)
-
-            self._get_per_destination_queue(destination).send_pdus(pdus)
+        for destination in destinations:
+            self._get_per_destination_queue(destination).send_pdu(pdu)
 
     async def send_read_receipt(self, receipt: ReadReceipt) -> None:
         """Send a RR to any other servers in the room
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 3bb66bce324eefa2e55df2209abb5deb203d942c..3b053ebcfb0882a0c812a0a9894d51d8689d1664 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -154,22 +154,19 @@ class PerDestinationQueue:
             + len(self._pending_edus_keyed)
         )
 
-    def send_pdus(self, pdus: Iterable[EventBase]) -> None:
-        """Add PDUs to the queue, and start the transmission loop if necessary
+    def send_pdu(self, pdu: EventBase) -> None:
+        """Add a PDU to the queue, and start the transmission loop if necessary
 
         Args:
-            pdus: pdus to send
+            pdu: pdu to send
         """
         if not self._catching_up or self._last_successful_stream_ordering is None:
             # only enqueue the PDU if we are not catching up (False) or do not
             # yet know if we have anything to catch up (None)
-            self._pending_pdus.extend(pdus)
+            self._pending_pdus.append(pdu)
         else:
-            self._catchup_last_skipped = max(
-                pdu.internal_metadata.stream_ordering
-                for pdu in pdus
-                if pdu.internal_metadata.stream_ordering is not None
-            )
+            assert pdu.internal_metadata.stream_ordering
+            self._catchup_last_skipped = pdu.internal_metadata.stream_ordering
 
         self.attempt_new_transaction()
 
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index b28ca61f8064cf9b12c1e28a7842bde27d55535a..82335e7a9dadae6032807e587874282021b8c8c3 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -14,7 +14,7 @@
 
 import logging
 from collections import namedtuple
-from typing import Dict, List, Optional, Tuple
+from typing import Iterable, List, Optional, Tuple
 
 from canonicaljson import encode_canonical_json
 
@@ -295,33 +295,37 @@ class TransactionStore(TransactionWorkerStore):
                 },
             )
 
-    async def bulk_store_destination_rooms_entries(
-        self, room_and_destination_to_ordering: Dict[Tuple[str, str], int]
-    ):
+    async def store_destination_rooms_entries(
+        self,
+        destinations: Iterable[str],
+        room_id: str,
+        stream_ordering: int,
+    ) -> None:
         """
-        Updates or creates `destination_rooms` entries for a number of events.
+        Updates or creates `destination_rooms` entries in batch for a single event.
 
         Args:
-            room_and_destination_to_ordering: A mapping of (room, destination) -> stream_id
+            destinations: list of destinations
+            room_id: the room_id of the event
+            stream_ordering: the stream_ordering of the event
         """
 
         await self.db_pool.simple_upsert_many(
             table="destinations",
             key_names=("destination",),
-            key_values={(d,) for _, d in room_and_destination_to_ordering.keys()},
+            key_values=[(d,) for d in destinations],
             value_names=[],
             value_values=[],
             desc="store_destination_rooms_entries_dests",
         )
 
+        rows = [(destination, room_id) for destination in destinations]
         await self.db_pool.simple_upsert_many(
             table="destination_rooms",
-            key_names=("room_id", "destination"),
-            key_values=list(room_and_destination_to_ordering.keys()),
+            key_names=("destination", "room_id"),
+            key_values=rows,
             value_names=["stream_ordering"],
-            value_values=[
-                (stream_id,) for stream_id in room_and_destination_to_ordering.values()
-            ],
+            value_values=[(stream_ordering,)] * len(rows),
             desc="store_destination_rooms_entries_rooms",
         )