diff --git a/changelog.d/12105.bugfix b/changelog.d/12105.bugfix
new file mode 100644
index 0000000000000000000000000000000000000000..f42e63e01fb72b7f8405ab877ede06d21dbe9cb0
--- /dev/null
+++ b/changelog.d/12105.bugfix
@@ -0,0 +1 @@
+Fix an extremely rare, long-standing bug in `ReadWriteLock` that would cause an error when a newly unblocked writer completes instantly.
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 81320b89721eb8bfa0bb6702582b6ab4f38b7b40..60c03a66fd16569fb29fbf16ddbd76ece64b815d 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -555,7 +555,10 @@ class ReadWriteLock:
             finally:
                 with PreserveLoggingContext():
                     new_defer.callback(None)
-                if self.key_to_current_writer[key] == new_defer:
+                # `self.key_to_current_writer[key]` may be missing if there was another
+                # writer waiting for us and it completed entirely within the
+                # `new_defer.callback()` call above.
+                if self.key_to_current_writer.get(key) == new_defer:
                     self.key_to_current_writer.pop(key)
 
         return _ctx_manager()
diff --git a/tests/util/test_rwlock.py b/tests/util/test_rwlock.py
index a10071c70fc91f9ca88b2f894d5ad1967da9df54..0774625b85519fe1d7a1b5ecd859e6485e4c9077 100644
--- a/tests/util/test_rwlock.py
+++ b/tests/util/test_rwlock.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 from twisted.internet import defer
+from twisted.internet.defer import Deferred
 
 from synapse.util.async_helpers import ReadWriteLock
 
@@ -83,3 +84,32 @@ class ReadWriteLockTestCase(unittest.TestCase):
         self.assertTrue(d.called)
         with d.result:
             pass
+
+    def test_lock_handoff_to_nonblocking_writer(self):
+        """Test a writer handing the lock to another writer that completes instantly."""
+        rwlock = ReadWriteLock()
+        key = "key"
+
+        unblock: "Deferred[None]" = Deferred()
+
+        async def blocking_write():
+            with await rwlock.write(key):
+                await unblock
+
+        async def nonblocking_write():
+            with await rwlock.write(key):
+                pass
+
+        d1 = defer.ensureDeferred(blocking_write())
+        d2 = defer.ensureDeferred(nonblocking_write())
+        self.assertFalse(d1.called)
+        self.assertFalse(d2.called)
+
+        # Unblock the first writer. The second writer will complete without blocking.
+        unblock.callback(None)
+        self.assertTrue(d1.called)
+        self.assertTrue(d2.called)
+
+        # The `ReadWriteLock` should operate as normal.
+        d3 = defer.ensureDeferred(nonblocking_write())
+        self.assertTrue(d3.called)