Skip to content
Snippets Groups Projects
Unverified Commit 413482f5 authored by Richard van der Hoff's avatar Richard van der Hoff Committed by GitHub
Browse files

Merge pull request #3255 from matrix-org/rav/fix_transactions

Stop the transaction cache caching failures
parents 4aac8892 6e1cb54a
No related branches found
No related tags found
No related merge requests found
...@@ -19,6 +19,7 @@ import logging ...@@ -19,6 +19,7 @@ import logging
from synapse.api.auth import get_access_token_from_request from synapse.api.auth import get_access_token_from_request
from synapse.util.async import ObservableDeferred from synapse.util.async import ObservableDeferred
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -80,27 +81,26 @@ class HttpTransactionCache(object): ...@@ -80,27 +81,26 @@ class HttpTransactionCache(object):
Returns: Returns:
Deferred which resolves to a tuple of (response_code, response_dict). Deferred which resolves to a tuple of (response_code, response_dict).
""" """
try: if txn_key in self.transactions:
return self.transactions[txn_key][0].observe() observable = self.transactions[txn_key][0]
except (KeyError, IndexError): else:
pass # execute the function instead. # execute the function instead.
deferred = run_in_background(fn, *args, **kwargs)
deferred = fn(*args, **kwargs)
observable = ObservableDeferred(deferred)
# if the request fails with a Twisted failure, remove it self.transactions[txn_key] = (observable, self.clock.time_msec())
# from the transaction map. This is done to ensure that we don't
# cache transient errors like rate-limiting errors, etc. # if the request fails with an exception, remove it
def remove_from_map(err): # from the transaction map. This is done to ensure that we don't
self.transactions.pop(txn_key, None) # cache transient errors like rate-limiting errors, etc.
return err def remove_from_map(err):
deferred.addErrback(remove_from_map) self.transactions.pop(txn_key, None)
# we deliberately do not propagate the error any further, as we
# We don't add any other errbacks to the raw deferred, so we ask # expect the observers to have reported it.
# ObservableDeferred to swallow the error. This is fine as the error will
# still be reported to the observers. deferred.addErrback(remove_from_map)
observable = ObservableDeferred(deferred, consumeErrors=True)
self.transactions[txn_key] = (observable, self.clock.time_msec()) return make_deferred_yieldable(observable.observe())
return observable.observe()
def _cleanup(self): def _cleanup(self):
now = self.clock.time_msec() now = self.clock.time_msec()
......
...@@ -2,6 +2,9 @@ from synapse.rest.client.transactions import HttpTransactionCache ...@@ -2,6 +2,9 @@ from synapse.rest.client.transactions import HttpTransactionCache
from synapse.rest.client.transactions import CLEANUP_PERIOD_MS from synapse.rest.client.transactions import CLEANUP_PERIOD_MS
from twisted.internet import defer from twisted.internet import defer
from mock import Mock, call from mock import Mock, call
from synapse.util import async
from synapse.util.logcontext import LoggingContext
from tests import unittest from tests import unittest
from tests.utils import MockClock from tests.utils import MockClock
...@@ -39,6 +42,78 @@ class HttpTransactionCacheTestCase(unittest.TestCase): ...@@ -39,6 +42,78 @@ class HttpTransactionCacheTestCase(unittest.TestCase):
# expect only a single call to do the work # expect only a single call to do the work
cb.assert_called_once_with("some_arg", keyword="arg", changing_args=0) cb.assert_called_once_with("some_arg", keyword="arg", changing_args=0)
@defer.inlineCallbacks
def test_logcontexts_with_async_result(self):
@defer.inlineCallbacks
def cb():
yield async.sleep(0)
defer.returnValue("yay")
@defer.inlineCallbacks
def test():
with LoggingContext("c") as c1:
res = yield self.cache.fetch_or_execute(self.mock_key, cb)
self.assertIs(LoggingContext.current_context(), c1)
self.assertEqual(res, "yay")
# run the test twice in parallel
d = defer.gatherResults([test(), test()])
self.assertIs(LoggingContext.current_context(), LoggingContext.sentinel)
yield d
self.assertIs(LoggingContext.current_context(), LoggingContext.sentinel)
@defer.inlineCallbacks
def test_does_not_cache_exceptions(self):
"""Checks that, if the callback throws an exception, it is called again
for the next request.
"""
called = [False]
def cb():
if called[0]:
# return a valid result the second time
return defer.succeed(self.mock_http_response)
called[0] = True
raise Exception("boo")
with LoggingContext("test") as test_context:
try:
yield self.cache.fetch_or_execute(self.mock_key, cb)
except Exception as e:
self.assertEqual(e.message, "boo")
self.assertIs(LoggingContext.current_context(), test_context)
res = yield self.cache.fetch_or_execute(self.mock_key, cb)
self.assertEqual(res, self.mock_http_response)
self.assertIs(LoggingContext.current_context(), test_context)
@defer.inlineCallbacks
def test_does_not_cache_failures(self):
"""Checks that, if the callback returns a failure, it is called again
for the next request.
"""
called = [False]
def cb():
if called[0]:
# return a valid result the second time
return defer.succeed(self.mock_http_response)
called[0] = True
return defer.fail(Exception("boo"))
with LoggingContext("test") as test_context:
try:
yield self.cache.fetch_or_execute(self.mock_key, cb)
except Exception as e:
self.assertEqual(e.message, "boo")
self.assertIs(LoggingContext.current_context(), test_context)
res = yield self.cache.fetch_or_execute(self.mock_key, cb)
self.assertEqual(res, self.mock_http_response)
self.assertIs(LoggingContext.current_context(), test_context)
@defer.inlineCallbacks @defer.inlineCallbacks
def test_cleans_up(self): def test_cleans_up(self):
cb = Mock( cb = Mock(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment