Commit 638d35ef authored by Richard van der Hoff

Fix linearizer cancellation on twisted < 18.7

Turns out that cancellation of deferreds returned by inlineCallbacks didn't really
work properly until Twisted 18.7. This commit refactors Linearizer.queue to avoid
inlineCallbacks.
parent 67dbe4c8
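For context: on Twisted < 18.7, cancelling the deferred returned by an inlineCallbacks-decorated function did not propagate the CancelledError into the generator (https://twistedmatrix.com/trac/ticket/4632), so any clean-up in an "except CancelledError" block was silently skipped. Below is a minimal sketch of the two approaches; it is not taken from the patch and the function names are illustrative.

from twisted.internet import defer

@defer.inlineCallbacks
def wait_old_style(queued_defer):
    # Pre-18.7, cancelling the deferred returned by this function does not
    # deliver CancelledError into this generator, so the clean-up below is
    # skipped and the queue entry leaks.
    try:
        yield queued_defer
    except defer.CancelledError:
        # remove ourselves from the queue here
        raise

def wait_new_style(queued_defer):
    # The approach this commit takes: return the queued deferred itself with
    # an errback attached, so cancelling the returned deferred always runs
    # the clean-up, on any Twisted version.
    def on_error(failure):
        # remove ourselves from the queue here
        return failure
    queued_defer.addErrback(on_error)
    return queued_defer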
@@ -188,62 +188,30 @@ class Linearizer(object):
         # things blocked from executing.
         self.key_to_defer = {}
 
-    @defer.inlineCallbacks
     def queue(self, key):
+        # we avoid doing defer.inlineCallbacks here, so that cancellation works correctly.
+        # (https://twistedmatrix.com/trac/ticket/4632 meant that cancellations were not
+        # propagated inside inlineCallbacks until Twisted 18.7)
         entry = self.key_to_defer.setdefault(key, [0, collections.OrderedDict()])
 
         # If the number of things executing is greater than the maximum
         # then add a deferred to the list of blocked items
-        # When on of the things currently executing finishes it will callback
+        # When one of the things currently executing finishes it will callback
         # this item so that it can continue executing.
         if entry[0] >= self.max_count:
-            new_defer = defer.Deferred()
-            entry[1][new_defer] = 1
-
-            logger.info(
-                "Waiting to acquire linearizer lock %r for key %r", self.name, key,
-            )
-
-            try:
-                yield make_deferred_yieldable(new_defer)
-            except Exception as e:
-                if isinstance(e, CancelledError):
-                    logger.info(
-                        "Cancelling wait for linearizer lock %r for key %r",
-                        self.name, key,
-                    )
-                else:
-                    logger.warn(
-                        "Unexpected exception waiting for linearizer lock %r for key %r",
-                        self.name, key,
-                    )
-
-                # we just have to take ourselves back out of the queue.
-                del entry[1][new_defer]
-                raise
-
-            logger.info("Acquired linearizer lock %r for key %r", self.name, key)
-            entry[0] += 1
-
-            # if the code holding the lock completes synchronously, then it
-            # will recursively run the next claimant on the list. That can
-            # relatively rapidly lead to stack exhaustion. This is essentially
-            # the same problem as http://twistedmatrix.com/trac/ticket/9304.
-            #
-            # In order to break the cycle, we add a cheeky sleep(0) here to
-            # ensure that we fall back to the reactor between each iteration.
-            #
-            # (This needs to happen while we hold the lock, and the context manager's exit
-            # code must be synchronous, so this is the only sensible place.)
-            yield self._clock.sleep(0)
-
+            res = self._await_lock(key)
         else:
             logger.info(
                 "Acquired uncontended linearizer lock %r for key %r", self.name, key,
             )
             entry[0] += 1
+            res = defer.succeed(None)
 
         # once we successfully get the lock, we need to return a context manager which
         # will release the lock.
 
         @contextmanager
-        def _ctx_manager():
+        def _ctx_manager(_):
             try:
                 yield
             finally:
@@ -264,7 +232,64 @@ class Linearizer(object):
                 # map.
                 del self.key_to_defer[key]
 
-        defer.returnValue(_ctx_manager())
+        res.addCallback(_ctx_manager)
+        return res
+
+    def _await_lock(self, key):
+        """Helper for queue: adds a deferred to the queue
+
+        Assumes that we've already checked that we've reached the limit of the number
+        of lock-holders we allow. Creates a new deferred which is added to the list, and
+        adds some management around cancellations.
+
+        Returns the deferred, which will callback once we have secured the lock.
+        """
+        entry = self.key_to_defer[key]
+
+        logger.info(
+            "Waiting to acquire linearizer lock %r for key %r", self.name, key,
+        )
+
+        new_defer = make_deferred_yieldable(defer.Deferred())
+        entry[1][new_defer] = 1
+
+        def cb(_r):
+            logger.info("Acquired linearizer lock %r for key %r", self.name, key)
+            entry[0] += 1
+
+            # if the code holding the lock completes synchronously, then it
+            # will recursively run the next claimant on the list. That can
+            # relatively rapidly lead to stack exhaustion. This is essentially
+            # the same problem as http://twistedmatrix.com/trac/ticket/9304.
+            #
+            # In order to break the cycle, we add a cheeky sleep(0) here to
+            # ensure that we fall back to the reactor between each iteration.
+            #
+            # (This needs to happen while we hold the lock, and the context manager's exit
+            # code must be synchronous, so this is the only sensible place.)
+            return self._clock.sleep(0)
+
+        def eb(e):
+            logger.info("defer %r got err %r", new_defer, e)
+            if isinstance(e, CancelledError):
+                logger.info(
+                    "Cancelling wait for linearizer lock %r for key %r",
+                    self.name, key,
+                )
+            else:
+                logger.warn(
+                    "Unexpected exception waiting for linearizer lock %r for key %r",
+                    self.name, key,
+                )
+
+            # we just have to take ourselves back out of the queue.
+            del entry[1][new_defer]
+            return e
+
+        new_defer.addCallbacks(cb, eb)
+        return new_defer
+
 
 class ReadWriteLock(object):
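Usage note (not part of this change; the caller and names are illustrative): the deferred returned by queue() still resolves to a context manager which releases the lock, so a caller looks roughly like the sketch below. If the caller is cancelled while still waiting, the eb errback above removes its entry from the queue.

from twisted.internet import defer

@defer.inlineCallbacks
def do_limited_work(linearizer, key):
    # queue() returns a deferred that fires, once the lock for `key` is
    # available, with a context manager that releases the lock on exit.
    ctx = yield linearizer.queue(key)
    with ctx:
        # at most max_count callers run this block for `key` at a time
        pass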