diff --git a/changelog.d/17962.misc b/changelog.d/17962.misc
new file mode 100644
index 0000000000000000000000000000000000000000..adf634870799f1740d7f694149a4ca28f9154d4a
--- /dev/null
+++ b/changelog.d/17962.misc
@@ -0,0 +1 @@
+Fix new scheduled tasks jumping the queue.
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 7d51441e9145d49aea8b083dc3ede9cf018317de..6ab53566600d67f998afbc736ad776963efd539c 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -495,7 +495,7 @@ class LockReleasedCommand(Command):
 
 
 class NewActiveTaskCommand(_SimpleCommand):
-    """Sent to inform instance handling background tasks that a new active task is available to run.
+    """Sent to inform instance handling background tasks that a new task is ready to run.
 
     Format::
 
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 61012269380fb7fff7ab85e126e343c5b8acc6e8..1fafbb48c3e7b6226eb4a7ee696b27e0ee833916 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -727,7 +727,7 @@ class ReplicationCommandHandler:
     ) -> None:
         """Called when get a new NEW_ACTIVE_TASK command."""
         if self._task_scheduler:
-            self._task_scheduler.launch_task_by_id(cmd.data)
+            self._task_scheduler.on_new_task(cmd.data)
 
     def new_connection(self, connection: IReplicationConnection) -> None:
         """Called when we have a new connection."""
diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py
index 448960b297844e3e36c4cd3317b4baa7f1e6f545..3ed457bd30789460639f42503b341f24887e9ca4 100644
--- a/synapse/util/task_scheduler.py
+++ b/synapse/util/task_scheduler.py
@@ -174,9 +174,10 @@ class TaskScheduler:
             The id of the scheduled task
         """
         status = TaskStatus.SCHEDULED
+        start_now = False
         if timestamp is None or timestamp < self._clock.time_msec():
             timestamp = self._clock.time_msec()
-            status = TaskStatus.ACTIVE
+            start_now = True
 
         task = ScheduledTask(
             random_string(16),
@@ -190,9 +191,11 @@ class TaskScheduler:
         )
         await self._store.insert_scheduled_task(task)
 
-        if status == TaskStatus.ACTIVE:
+        # If the task is ready to run immediately, run the scheduling algorithm now
+        # rather than waiting
+        if start_now:
             if self._run_background_tasks:
-                await self._launch_task(task)
+                self._launch_scheduled_tasks()
             else:
                 self._hs.get_replication_command_handler().send_new_active_task(task.id)
 
@@ -300,23 +303,13 @@ class TaskScheduler:
             raise Exception(f"Task {id} is currently ACTIVE and can't be deleted")
         await self._store.delete_scheduled_task(id)
 
-    def launch_task_by_id(self, id: str) -> None:
-        """Try launching the task with the given ID."""
-        # Don't bother trying to launch new tasks if we're already at capacity.
-        if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS:
-            return
-
-        run_as_background_process("launch_task_by_id", self._launch_task_by_id, id)
-
-    async def _launch_task_by_id(self, id: str) -> None:
-        """Helper async function for `launch_task_by_id`."""
-        task = await self.get_task(id)
-        if task:
-            await self._launch_task(task)
+    def on_new_task(self, task_id: str) -> None:
+        """Handle a notification that a new ready-to-run task has been added to the queue"""
+        # Just run the scheduler
+        self._launch_scheduled_tasks()
 
-    @wrap_as_background_process("launch_scheduled_tasks")
-    async def _launch_scheduled_tasks(self) -> None:
-        """Retrieve and launch scheduled tasks that should be running at that time."""
+    def _launch_scheduled_tasks(self) -> None:
+        """Retrieve and launch scheduled tasks that should be running at this time."""
         # Don't bother trying to launch new tasks if we're already at capacity.
         if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS:
             return
@@ -326,20 +319,26 @@ class TaskScheduler:
 
         self._launching_new_tasks = True
 
-        try:
-            for task in await self.get_tasks(
-                statuses=[TaskStatus.ACTIVE], limit=self.MAX_CONCURRENT_RUNNING_TASKS
-            ):
-                await self._launch_task(task)
-            for task in await self.get_tasks(
-                statuses=[TaskStatus.SCHEDULED],
-                max_timestamp=self._clock.time_msec(),
-                limit=self.MAX_CONCURRENT_RUNNING_TASKS,
-            ):
-                await self._launch_task(task)
-
-        finally:
-            self._launching_new_tasks = False
+        async def inner() -> None:
+            try:
+                for task in await self.get_tasks(
+                    statuses=[TaskStatus.ACTIVE],
+                    limit=self.MAX_CONCURRENT_RUNNING_TASKS,
+                ):
+                    # _launch_task will ignore tasks that we're already running, and
+                    # will also do nothing if we're already at the maximum capacity.
+                    await self._launch_task(task)
+                for task in await self.get_tasks(
+                    statuses=[TaskStatus.SCHEDULED],
+                    max_timestamp=self._clock.time_msec(),
+                    limit=self.MAX_CONCURRENT_RUNNING_TASKS,
+                ):
+                    await self._launch_task(task)
+
+            finally:
+                self._launching_new_tasks = False
+
+        run_as_background_process("launch_scheduled_tasks", inner)
 
     @wrap_as_background_process("clean_scheduled_tasks")
     async def _clean_scheduled_tasks(self) -> None:
diff --git a/tests/util/test_task_scheduler.py b/tests/util/test_task_scheduler.py
index 30f0510c9f27dc0563aba0464d2ac159e81c99c0..9e403b948be0922cad21569d7cee95220fe2747f 100644
--- a/tests/util/test_task_scheduler.py
+++ b/tests/util/test_task_scheduler.py
@@ -18,8 +18,7 @@
 # [This file includes modifications made by New Vector Limited]
 #
 #
-
-from typing import Optional, Tuple
+from typing import List, Optional, Tuple
 
 from twisted.internet.task import deferLater
 from twisted.test.proto_helpers import MemoryReactor
@@ -104,33 +103,43 @@ class TestTaskScheduler(HomeserverTestCase):
                 )
             )
 
-        # This is to give the time to the active tasks to finish
+        def get_tasks_of_status(status: TaskStatus) -> List[ScheduledTask]:
+            tasks = (
+                self.get_success(self.task_scheduler.get_task(task_id))
+                for task_id in task_ids
+            )
+            return [t for t in tasks if t is not None and t.status == status]
+
+        # At this point, there should be MAX_CONCURRENT_RUNNING_TASKS active tasks and
+        # one scheduled task.
+        self.assertEquals(
+            len(get_tasks_of_status(TaskStatus.ACTIVE)),
+            TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS,
+        )
+        self.assertEquals(
+            len(get_tasks_of_status(TaskStatus.SCHEDULED)),
+            1,
+        )
+
+        # Give the time to the active tasks to finish
         self.reactor.advance(1)
 
-        # Check that only MAX_CONCURRENT_RUNNING_TASKS tasks has run and that one
+        # Check that MAX_CONCURRENT_RUNNING_TASKS tasks have run and that one
         # is still scheduled.
-        tasks = [
-            self.get_success(self.task_scheduler.get_task(task_id))
-            for task_id in task_ids
-        ]
-
         self.assertEquals(
-            len(
-                [t for t in tasks if t is not None and t.status == TaskStatus.COMPLETE]
-            ),
+            len(get_tasks_of_status(TaskStatus.COMPLETE)),
             TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS,
         )
-
-        scheduled_tasks = [
-            t for t in tasks if t is not None and t.status == TaskStatus.ACTIVE
-        ]
+        scheduled_tasks = get_tasks_of_status(TaskStatus.SCHEDULED)
         self.assertEquals(len(scheduled_tasks), 1)
 
-        # We need to wait for the next run of the scheduler loop
-        self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000))
-        self.reactor.advance(1)
+        # The scheduled task should start 0.1s after the first of the active tasks
+        # finishes
+        self.reactor.advance(0.1)
+        self.assertEquals(len(get_tasks_of_status(TaskStatus.ACTIVE)), 1)
 
-        # Check that the last task has been properly executed after the next scheduler loop run
+        # ... and should finally complete after another second
+        self.reactor.advance(1)
         prev_scheduled_task = self.get_success(
             self.task_scheduler.get_task(scheduled_tasks[0].id)
         )