Skip to content
Snippets Groups Projects
Commit d04fa1f7 authored by Kegan Dougal's avatar Kegan Dougal
Browse files

Implement ServiceQueuer with tests.

parent 6279285b
No related branches found
No related tags found
No related merge requests found
...@@ -83,7 +83,6 @@ class AppServiceScheduler(object): ...@@ -83,7 +83,6 @@ class AppServiceScheduler(object):
self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered
) )
self.txn_ctrl.add_recoverers(recoverers) self.txn_ctrl.add_recoverers(recoverers)
self.txn_ctrl.start_polling()
def submit_event_for_as(self, service, event): def submit_event_for_as(self, service, event):
self.queuer.enqueue(service, event) self.queuer.enqueue(service, event)
...@@ -96,24 +95,37 @@ class _ServiceQueuer(object): ...@@ -96,24 +95,37 @@ class _ServiceQueuer(object):
""" """
def __init__(self, txn_ctrl): def __init__(self, txn_ctrl):
self.groups = {} # dict of {service: [events]} self.queued_events = {} # dict of {service_id: [events]}
self.pending_requests = {} # dict of {service_id: Deferred}
self.txn_ctrl = txn_ctrl self.txn_ctrl = txn_ctrl
def enqueue(self, service, event): def enqueue(self, service, event):
# if nothing in queue for this service, send event immediately and add # if this service isn't being sent something
# callbacks. if not self.pending_requests.get(service.id):
self.txn_ctrl.send(service, [event]) self._send_request(service, [event])
else:
# else add to queue for this service # add to queue for this service
if service.id not in self.queued_events:
if service not in self.groups: self.queued_events[service.id] = []
self.groups[service] = [] self.queued_events[service.id].append(event)
self.groups[service].append(event)
def _send_request(self, service, events):
def drain_groups(self): # send request and add callbacks
groups = self.groups d = self.txn_ctrl.send(service, events)
self.groups = {} d.addCallback(self._on_request_finish)
return groups d.addErrback(self._on_request_fail)
self.pending_requests[service.id] = d
def _on_request_finish(self, service):
self.pending_requests[service.id] = None
# if there are queued events, then send them.
if (service.id in self.queued_events
and len(self.queued_events[service.id]) > 0):
self._send_request(service, self.queued_events[service.id])
self.queued_events[service.id] = []
def _on_request_fail(self, err):
logger.error("AS request failed: %s", err)
class _TransactionController(object): class _TransactionController(object):
...@@ -142,6 +154,8 @@ class _TransactionController(object): ...@@ -142,6 +154,8 @@ class _TransactionController(object):
self._start_recoverer(service) self._start_recoverer(service)
except Exception as e: except Exception as e:
logger.exception(e) logger.exception(e)
# request has finished
defer.returnValue(service)
@defer.inlineCallbacks @defer.inlineCallbacks
def on_recovered(self, recoverer): def on_recovered(self, recoverer):
......
...@@ -197,16 +197,56 @@ class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase): ...@@ -197,16 +197,56 @@ class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase):
def test_send_single_event_no_queue(self): def test_send_single_event_no_queue(self):
# Expect the event to be sent immediately. # Expect the event to be sent immediately.
pass service = Mock(id=4)
event = Mock()
self.queuer.enqueue(service, event)
self.txn_ctrl.send.assert_called_once_with(service, [event])
def test_send_single_event_with_queue(self): def test_send_single_event_with_queue(self):
# - Send an event and don't resolve it just yet. d = defer.Deferred()
# - Send another event: expect send() to NOT be called. self.txn_ctrl.send = Mock(return_value=d)
# - Resolve the send event service = Mock(id=4)
# - Expect queued event to be sent event = Mock(event_id="first")
pass event2 = Mock(event_id="second")
event3 = Mock(event_id="third")
# Send an event and don't resolve it just yet.
self.queuer.enqueue(service, event)
# Send more events: expect send() to NOT be called multiple times.
self.queuer.enqueue(service, event2)
self.queuer.enqueue(service, event3)
self.txn_ctrl.send.assert_called_with(service, [event])
self.assertEquals(1, self.txn_ctrl.send.call_count)
# Resolve the send event: expect the queued events to be sent
d.callback(service)
self.txn_ctrl.send.assert_called_with(service, [event2, event3])
self.assertEquals(2, self.txn_ctrl.send.call_count)
def test_multiple_service_queues(self): def test_multiple_service_queues(self):
# Tests that each service has its own queue, and that they don't block # Tests that each service has its own queue, and that they don't block
# on each other. # on each other.
pass srv1 = Mock(id=4)
srv_1_defer = defer.Deferred()
srv_1_event = Mock(event_id="srv1a")
srv_1_event2 = Mock(event_id="srv1b")
srv2 = Mock(id=6)
srv_2_defer = defer.Deferred()
srv_2_event = Mock(event_id="srv2a")
srv_2_event2 = Mock(event_id="srv2b")
send_return_list = [srv_1_defer, srv_2_defer]
self.txn_ctrl.send = Mock(side_effect=lambda x,y: send_return_list.pop(0))
# send events for different ASes and make sure they are sent
self.queuer.enqueue(srv1, srv_1_event)
self.queuer.enqueue(srv1, srv_1_event2)
self.txn_ctrl.send.assert_called_with(srv1, [srv_1_event])
self.queuer.enqueue(srv2, srv_2_event)
self.queuer.enqueue(srv2, srv_2_event2)
self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event])
# make sure callbacks for a service only send queued events for THAT
# service
srv_2_defer.callback(srv2)
self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event2])
self.assertEquals(3, self.txn_ctrl.send.call_count)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment