Skip to content
Snippets Groups Projects
test_linearizer.py 4.25 KiB
Newer Older
  • Learn to ignore specific revisions
  • Erik Johnston's avatar
    Erik Johnston committed
    # Copyright 2016 OpenMarket Ltd
    
    # Copyright 2018 New Vector Ltd
    
    Erik Johnston's avatar
    Erik Johnston committed
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    
    from twisted.internet import defer, reactor
    
    from twisted.internet.defer import CancelledError
    
    Erik Johnston's avatar
    Erik Johnston committed
    
    
    from synapse.logging.context import LoggingContext, current_context
    
    from synapse.util.async_helpers import Linearizer
    
    Amber Brown's avatar
    Amber Brown committed
    
    from tests import unittest
    
    Erik Johnston's avatar
    Erik Johnston committed
    
    
    class LinearizerTestCase(unittest.TestCase):
        @defer.inlineCallbacks
        def test_linearizer(self):
            linearizer = Linearizer()
    
            key = object()
    
            d1 = linearizer.queue(key)
            cm1 = yield d1
    
            d2 = linearizer.queue(key)
            self.assertFalse(d2.called)
    
            with cm1:
                self.assertFalse(d2.called)
    
            with (yield d2):
                pass
    
        @defer.inlineCallbacks
        def test_linearizer_is_queued(self):
            linearizer = Linearizer()
    
            key = object()
    
            d1 = linearizer.queue(key)
            cm1 = yield d1
    
            # Since d1 gets called immediately, "is_queued" should return false.
            self.assertFalse(linearizer.is_queued(key))
    
            d2 = linearizer.queue(key)
            self.assertFalse(d2.called)
    
            # Now d2 is queued up behind successful completion of cm1
            self.assertTrue(linearizer.is_queued(key))
    
            with cm1:
                self.assertFalse(d2.called)
    
                # cm1 still not done, so d2 still queued.
                self.assertTrue(linearizer.is_queued(key))
    
            # And now d2 is called and nothing is in the queue again
            self.assertFalse(linearizer.is_queued(key))
    
            with (yield d2):
                self.assertFalse(linearizer.is_queued(key))
    
            self.assertFalse(linearizer.is_queued(key))
    
    
        def test_lots_of_queued_things(self):
            # we have one slow thing, and lots of fast things queued up behind it.
            # it should *not* explode the stack.
            linearizer = Linearizer()
    
            @defer.inlineCallbacks
            def func(i, sleep=False):
    
                with LoggingContext("func(%s)" % i) as lc:
    
                    with (yield linearizer.queue("")):
    
                        self.assertEqual(current_context(), lc)
    
                            yield Clock(reactor).sleep(0)
    
                    self.assertEqual(current_context(), lc)
    
            for i in range(1, 100):
    
    
        @defer.inlineCallbacks
        def test_multiple_entries(self):
            limiter = Linearizer(max_count=3)
    
            key = object()
    
            d1 = limiter.queue(key)
            cm1 = yield d1
    
            d2 = limiter.queue(key)
            cm2 = yield d2
    
            d3 = limiter.queue(key)
            cm3 = yield d3
    
            d4 = limiter.queue(key)
            self.assertFalse(d4.called)
    
            d5 = limiter.queue(key)
            self.assertFalse(d5.called)
    
            with cm1:
                self.assertFalse(d4.called)
                self.assertFalse(d5.called)
    
            cm4 = yield d4
            self.assertFalse(d5.called)
    
            with cm3:
                self.assertFalse(d5.called)
    
            cm5 = yield d5
    
            with cm2:
                pass
    
            with cm4:
                pass
    
            with cm5:
                pass
    
            d6 = limiter.queue(key)
            with (yield d6):
                pass
    
    
        @defer.inlineCallbacks
        def test_cancellation(self):
            linearizer = Linearizer()
    
            key = object()
    
            d1 = linearizer.queue(key)
            cm1 = yield d1
    
            d2 = linearizer.queue(key)
            self.assertFalse(d2.called)
    
            d3 = linearizer.queue(key)
            self.assertFalse(d3.called)
    
            d2.cancel()
    
            with cm1:
                pass
    
            self.assertTrue(d2.called)
            try:
                yield d2
                self.fail("Expected d2 to raise CancelledError")
            except CancelledError:
                pass
    
            with (yield d3):
                pass