diff --git a/changelog.d/8287.bugfix b/changelog.d/8287.bugfix
new file mode 100644
index 0000000000000000000000000000000000000000..839781aa07531c370b3ff25a1911b5aaebfeca8e
--- /dev/null
+++ b/changelog.d/8287.bugfix
@@ -0,0 +1 @@
+Fix edge case where push could get delayed for a user until a later event was pushed.
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 43f2986f8955efd3ee9aceb230f4c217270e63f4..74d7ac8a67f27cf3953acaaba4ed4803fcc1eec8 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -2970,7 +2970,7 @@ class FederationHandler(BaseHandler):
             event, event_stream_id, max_stream_id, extra_users=extra_users
         )
 
-        await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id)
+        await self.pusher_pool.on_new_notifications(max_stream_id)
 
     async def _clean_room_for_join(self, room_id: str) -> None:
         """Called to clean up any data in DB for a given room, ready for the
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 8a7b4916cd6ae91ee63c5aec59a15122d73d09d8..d1556659e3e53dea909186197cecfbf1baf5a36b 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1145,7 +1145,7 @@ class EventCreationHandler:
             # If there's an expiry timestamp on the event, schedule its expiry.
             self._message_handler.maybe_schedule_expiry(event)
 
-        await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id)
+        await self.pusher_pool.on_new_notifications(max_stream_id)
 
         def _notify():
             try:
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index b7ea4438e082a978ff5659a94c1064b8278eb670..28bd8ab7481e81ba84d6e40f21ee5614fa05d948 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -91,7 +91,7 @@ class EmailPusher:
                 pass
             self.timed_call = None
 
-    def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
+    def on_new_notifications(self, max_stream_ordering):
         if self.max_stream_ordering:
             self.max_stream_ordering = max(
                 max_stream_ordering, self.max_stream_ordering
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index f21fa9b65905292a135b9656956ae44252998d6e..26706bf3e1eec75f51123ae6acce20083a0e813a 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -114,7 +114,7 @@ class HttpPusher:
         if should_check_for_notifs:
             self._start_processing()
 
-    def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
+    def on_new_notifications(self, max_stream_ordering):
         self.max_stream_ordering = max(
             max_stream_ordering, self.max_stream_ordering or 0
         )
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 3c3262a88c53d7b1403912ce249d3b481dc36292..fa8473bf8d00fb1f4d3ae3f15e67be94110e9682 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -64,6 +64,12 @@ class PusherPool:
         self._pusher_shard_config = hs.config.push.pusher_shard_config
         self._instance_name = hs.get_instance_name()
 
+        # Record the last stream ID that we were poked about so we can get
+        # changes since then. We set this to the current max stream ID on
+        # startup as every individual pusher will have checked for changes on
+        # startup.
+        self._last_room_stream_id_seen = self.store.get_room_max_stream_ordering()
+
         # map from user id to app_id:pushkey to pusher
         self.pushers = {}  # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]]
 
@@ -178,20 +184,27 @@ class PusherPool:
                 )
                 await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
 
-    async def on_new_notifications(self, min_stream_id, max_stream_id):
+    async def on_new_notifications(self, max_stream_id):
         if not self.pushers:
             # nothing to do here.
             return
 
+        if max_stream_id < self._last_room_stream_id_seen:
+            # Nothing to do
+            return
+
+        prev_stream_id = self._last_room_stream_id_seen
+        self._last_room_stream_id_seen = max_stream_id
+
         try:
             users_affected = await self.store.get_push_action_users_in_range(
-                min_stream_id, max_stream_id
+                prev_stream_id, max_stream_id
             )
 
             for u in users_affected:
                 if u in self.pushers:
                     for p in self.pushers[u].values():
-                        p.on_new_notifications(min_stream_id, max_stream_id)
+                        p.on_new_notifications(max_stream_id)
 
         except Exception:
             logger.exception("Exception in pusher on_new_notifications")
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index d6ecf5b3270315338127b067bd865ec9b6f3a1e6..ccd3147dfdafcbfcc796513f9dfa536c6531f3ea 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -154,7 +154,8 @@ class ReplicationDataHandler:
                 max_token = self.store.get_room_max_stream_ordering()
                 self.notifier.on_new_room_event(event, token, max_token, extra_users)
 
-            await self.pusher_pool.on_new_notifications(token, token)
+            max_token = self.store.get_room_max_stream_ordering()
+            await self.pusher_pool.on_new_notifications(max_token)
 
         # Notify any waiting deferreds. The list is ordered by position so we
         # just iterate through the list until we reach a position that is
diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py
index ae6bc24f4cbb832803b2a4a194003bd2c83495d6..f306a09bfaa79ee66dbb426c3d67376a6a2926f2 100644
--- a/tests/handlers/test_typing.py
+++ b/tests/handlers/test_typing.py
@@ -80,6 +80,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
                 "get_user_directory_stream_pos",
                 "get_current_state_deltas",
                 "get_device_updates_by_remote",
+                "get_room_max_stream_ordering",
             ]
         )