Skip to content
Snippets Groups Projects
Unverified Commit 74dd9060 authored by Patrick Cloke's avatar Patrick Cloke Committed by GitHub
Browse files

Avoid raising the body exceeded error multiple times. (#9108)

Previously this code generated unreferenced `Deferred` instances
which caused "Unhandled Deferreds" errors to appear in error
situations.
parent 9ffac2be
No related branches found
No related tags found
No related merge requests found
Fix "Unhandled error in Deferred: BodyExceededMaxSize" errors when .well-known files that are too large.
...@@ -766,14 +766,24 @@ class _ReadBodyWithMaxSizeProtocol(protocol.Protocol): ...@@ -766,14 +766,24 @@ class _ReadBodyWithMaxSizeProtocol(protocol.Protocol):
self.max_size = max_size self.max_size = max_size
def dataReceived(self, data: bytes) -> None: def dataReceived(self, data: bytes) -> None:
# If the deferred was called, bail early.
if self.deferred.called:
return
self.stream.write(data) self.stream.write(data)
self.length += len(data) self.length += len(data)
# The first time the maximum size is exceeded, error and cancel the
# connection. dataReceived might be called again if data was received
# in the meantime.
if self.max_size is not None and self.length >= self.max_size: if self.max_size is not None and self.length >= self.max_size:
self.deferred.errback(BodyExceededMaxSize()) self.deferred.errback(BodyExceededMaxSize())
self.deferred = defer.Deferred()
self.transport.loseConnection() self.transport.loseConnection()
def connectionLost(self, reason: Failure) -> None: def connectionLost(self, reason: Failure) -> None:
# If the maximum size was already exceeded, there's nothing to do.
if self.deferred.called:
return
if reason.check(ResponseDone): if reason.check(ResponseDone):
self.deferred.callback(self.length) self.deferred.callback(self.length)
elif reason.check(PotentialDataLoss): elif reason.check(PotentialDataLoss):
......
...@@ -1095,7 +1095,7 @@ class MatrixFederationAgentTests(unittest.TestCase): ...@@ -1095,7 +1095,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
# Expire both caches and repeat the request # Expire both caches and repeat the request
self.reactor.pump((10000.0,)) self.reactor.pump((10000.0,))
# Repated the request, this time it should fail if the lookup fails. # Repeat the request, this time it should fail if the lookup fails.
fetch_d = defer.ensureDeferred( fetch_d = defer.ensureDeferred(
self.well_known_resolver.get_well_known(b"testserv") self.well_known_resolver.get_well_known(b"testserv")
) )
...@@ -1130,7 +1130,7 @@ class MatrixFederationAgentTests(unittest.TestCase): ...@@ -1130,7 +1130,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
content=b'{ "m.server": "' + (b"a" * WELL_KNOWN_MAX_SIZE) + b'" }', content=b'{ "m.server": "' + (b"a" * WELL_KNOWN_MAX_SIZE) + b'" }',
) )
# The result is sucessful, but disabled delegation. # The result is successful, but disabled delegation.
r = self.successResultOf(fetch_d) r = self.successResultOf(fetch_d)
self.assertIsNone(r.delegated_server) self.assertIsNone(r.delegated_server)
......
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from io import BytesIO
from mock import Mock
from twisted.python.failure import Failure
from twisted.web.client import ResponseDone
from synapse.http.client import BodyExceededMaxSize, read_body_with_max_size
from tests.unittest import TestCase
class ReadBodyWithMaxSizeTests(TestCase):
def setUp(self):
"""Start reading the body, returns the response, result and proto"""
self.response = Mock()
self.result = BytesIO()
self.deferred = read_body_with_max_size(self.response, self.result, 6)
# Fish the protocol out of the response.
self.protocol = self.response.deliverBody.call_args[0][0]
self.protocol.transport = Mock()
def _cleanup_error(self):
"""Ensure that the error in the Deferred is handled gracefully."""
called = [False]
def errback(f):
called[0] = True
self.deferred.addErrback(errback)
self.assertTrue(called[0])
def test_no_error(self):
"""A response that is NOT too large."""
# Start sending data.
self.protocol.dataReceived(b"12345")
# Close the connection.
self.protocol.connectionLost(Failure(ResponseDone()))
self.assertEqual(self.result.getvalue(), b"12345")
self.assertEqual(self.deferred.result, 5)
def test_too_large(self):
"""A response which is too large raises an exception."""
# Start sending data.
self.protocol.dataReceived(b"1234567890")
# Close the connection.
self.protocol.connectionLost(Failure(ResponseDone()))
self.assertEqual(self.result.getvalue(), b"1234567890")
self.assertIsInstance(self.deferred.result, Failure)
self.assertIsInstance(self.deferred.result.value, BodyExceededMaxSize)
self._cleanup_error()
def test_multiple_packets(self):
"""Data should be accummulated through mutliple packets."""
# Start sending data.
self.protocol.dataReceived(b"12")
self.protocol.dataReceived(b"34")
# Close the connection.
self.protocol.connectionLost(Failure(ResponseDone()))
self.assertEqual(self.result.getvalue(), b"1234")
self.assertEqual(self.deferred.result, 4)
def test_additional_data(self):
"""A connection can receive data after being closed."""
# Start sending data.
self.protocol.dataReceived(b"1234567890")
self.assertIsInstance(self.deferred.result, Failure)
self.assertIsInstance(self.deferred.result.value, BodyExceededMaxSize)
self.protocol.transport.loseConnection.assert_called_once()
# More data might have come in.
self.protocol.dataReceived(b"1234567890")
# Close the connection.
self.protocol.connectionLost(Failure(ResponseDone()))
self.assertEqual(self.result.getvalue(), b"1234567890")
self.assertIsInstance(self.deferred.result, Failure)
self.assertIsInstance(self.deferred.result.value, BodyExceededMaxSize)
self._cleanup_error()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment