Skip to content
Snippets Groups Projects
Commit 8a1817f0 authored by Erik Johnston's avatar Erik Johnston
Browse files

Use errback pattern and catch async failures

parent 8164f6da
No related branches found
No related tags found
No related merge requests found
...@@ -28,6 +28,7 @@ from synapse.metrics import ( ...@@ -28,6 +28,7 @@ from synapse.metrics import (
event_processing_loop_room_count, event_processing_loop_room_count,
) )
from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util import make_log_failure_errback
from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
...@@ -112,7 +113,11 @@ class ApplicationServicesHandler(object): ...@@ -112,7 +113,11 @@ class ApplicationServicesHandler(object):
if not self.started_scheduler: if not self.started_scheduler:
def start_scheduler(): def start_scheduler():
return self.scheduler.start().addErrback(log_failure) return self.scheduler.start().addErrback(
make_log_failure_errback(
"Application Services Failure",
)
)
run_as_background_process("as_scheduler", start_scheduler) run_as_background_process("as_scheduler", start_scheduler)
self.started_scheduler = True self.started_scheduler = True
......
...@@ -13,7 +13,6 @@ ...@@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import functools
import logging import logging
from itertools import islice from itertools import islice
...@@ -67,9 +66,12 @@ class Clock(object): ...@@ -67,9 +66,12 @@ class Clock(object):
f(function): The function to call repeatedly. f(function): The function to call repeatedly.
msec(float): How long to wait between calls in milliseconds. msec(float): How long to wait between calls in milliseconds.
""" """
call = task.LoopingCall(_log_exception_wrapper(f)) call = task.LoopingCall(f)
call.clock = self._reactor call.clock = self._reactor
call.start(msec / 1000.0, now=False) d = call.start(msec / 1000.0, now=False)
d.addErrback(make_log_failure_errback(
"Looping call died", consumeErrors=False,
))
return call return call
def call_later(self, delay, callback, *args, **kwargs): def call_later(self, delay, callback, *args, **kwargs):
...@@ -112,17 +114,30 @@ def batch_iter(iterable, size): ...@@ -112,17 +114,30 @@ def batch_iter(iterable, size):
return iter(lambda: tuple(islice(sourceiter, size)), ()) return iter(lambda: tuple(islice(sourceiter, size)), ())
def _log_exception_wrapper(f): def make_log_failure_errback(msg, consumeErrors=True):
"""Used to wrap looping calls to log loudly if they get killed """Creates a function suitable for passing to `Deferred.addErrback` that
logs any failures that occur.
Args:
msg (str): Message to log
consumeErrors (bool): If true consumes the failure, otherwise passes
on down the callback chain
Returns:
func(Failure)
""" """
@functools.wraps(f) def log_failure(failure):
def wrap(*args, **kwargs): logger.error(
try: msg,
logger.info("Running looping call") exc_info=(
return f(*args, **kwargs) failure.type,
except: # noqa: E722, as we reraise the exception this is fine. failure.value,
logger.exception("Looping called died") failure.getTracebackObject()
raise )
)
if not consumeErrors:
return failure
return wrap return log_failure
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment