Skip to content
Snippets Groups Projects
Unverified Commit cb20b885 authored by David Robertson's avatar David Robertson Committed by GitHub
Browse files

Always close _all_ `ijson` coroutines, even if doing so raises Exceptions (#14065)

parent 44741aa8
No related branches found
No related tags found
No related merge requests found
Fix a bug introduced in Synapse 1.35.0 where errors parsing a `/send_join` or `/state` response would produce excessive, low-quality Sentry events.
...@@ -45,6 +45,7 @@ from synapse.federation.units import Transaction ...@@ -45,6 +45,7 @@ from synapse.federation.units import Transaction
from synapse.http.matrixfederationclient import ByteParser from synapse.http.matrixfederationclient import ByteParser
from synapse.http.types import QueryParams from synapse.http.types import QueryParams
from synapse.types import JsonDict from synapse.types import JsonDict
from synapse.util import ExceptionBundle
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -926,8 +927,7 @@ class SendJoinParser(ByteParser[SendJoinResponse]): ...@@ -926,8 +927,7 @@ class SendJoinParser(ByteParser[SendJoinResponse]):
return len(data) return len(data)
def finish(self) -> SendJoinResponse: def finish(self) -> SendJoinResponse:
for c in self._coros: _close_coros(self._coros)
c.close()
if self._response.event_dict: if self._response.event_dict:
self._response.event = make_event_from_dict( self._response.event = make_event_from_dict(
...@@ -970,6 +970,27 @@ class _StateParser(ByteParser[StateRequestResponse]): ...@@ -970,6 +970,27 @@ class _StateParser(ByteParser[StateRequestResponse]):
return len(data) return len(data)
def finish(self) -> StateRequestResponse: def finish(self) -> StateRequestResponse:
for c in self._coros: _close_coros(self._coros)
c.close()
return self._response return self._response
def _close_coros(coros: Iterable[Generator[None, bytes, None]]) -> None:
"""Close each of the given coroutines.
Always calls .close() on each coroutine, even if doing so raises an exception.
Any exceptions raised are aggregated into an ExceptionBundle.
:raises ExceptionBundle: if at least one coroutine fails to close.
"""
exceptions = []
for c in coros:
try:
c.close()
except Exception as e:
exceptions.append(e)
if exceptions:
# raise from the first exception so that the traceback has slightly more context
raise ExceptionBundle(
f"There were {len(exceptions)} errors closing coroutines", exceptions
) from exceptions[0]
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
import json import json
import logging import logging
import typing import typing
from typing import Any, Callable, Dict, Generator, Optional from typing import Any, Callable, Dict, Generator, Optional, Sequence
import attr import attr
from frozendict import frozendict from frozendict import frozendict
...@@ -193,3 +193,15 @@ def log_failure( ...@@ -193,3 +193,15 @@ def log_failure(
# Version string with git info. Computed here once so that we don't invoke git multiple # Version string with git info. Computed here once so that we don't invoke git multiple
# times. # times.
SYNAPSE_VERSION = get_distribution_version_string("matrix-synapse", __file__) SYNAPSE_VERSION = get_distribution_version_string("matrix-synapse", __file__)
class ExceptionBundle(Exception):
# A poor stand-in for something like Python 3.11's ExceptionGroup.
# (A backport called `exceptiongroup` exists but seems overkill: we just want a
# container type here.)
def __init__(self, message: str, exceptions: Sequence[Exception]):
parts = [message]
for e in exceptions:
parts.append(str(e))
super().__init__("\n - ".join(parts))
self.exceptions = exceptions
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
# limitations under the License. # limitations under the License.
import json import json
from unittest.mock import Mock
from synapse.api.room_versions import RoomVersions from synapse.api.room_versions import RoomVersions
from synapse.federation.transport.client import SendJoinParser from synapse.federation.transport.client import SendJoinParser
...@@ -94,3 +95,39 @@ class SendJoinParserTestCase(TestCase): ...@@ -94,3 +95,39 @@ class SendJoinParserTestCase(TestCase):
# Retrieve and check the parsed SendJoinResponse # Retrieve and check the parsed SendJoinResponse
parsed_response = parser.finish() parsed_response = parser.finish()
self.assertEqual(parsed_response.servers_in_room, ["hs1", "hs2"]) self.assertEqual(parsed_response.servers_in_room, ["hs1", "hs2"])
def test_errors_closing_coroutines(self) -> None:
"""Check we close all coroutines, even if closing the first raises an Exception.
We also check that an Exception of some kind is raised, but we don't make any
assertions about its attributes or type.
"""
parser = SendJoinParser(RoomVersions.V1, False)
response = {"org.matrix.msc3706.servers_in_room": ["hs1", "hs2"]}
serialisation = json.dumps(response).encode()
# Mock the coroutines managed by this parser.
# The first one will error when we try to close it.
coro_1 = Mock()
coro_1.close = Mock(side_effect=RuntimeError("Couldn't close coro 1"))
coro_2 = Mock()
coro_3 = Mock()
coro_3.close = Mock(side_effect=RuntimeError("Couldn't close coro 3"))
parser._coros = [coro_1, coro_2, coro_3]
# Send half of the data to the parser
parser.write(serialisation[: len(serialisation) // 2])
# Close the parser. There should be _some_ kind of exception, but it need not
# be that RuntimeError directly. E.g. we might want to raise a wrapper
# encompassing multiple errors from multiple coroutines.
with self.assertRaises(Exception):
parser.finish()
# In any case, we should have tried to close both coros.
coro_1.close.assert_called()
coro_2.close.assert_called()
coro_3.close.assert_called()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment