Skip to content
Snippets Groups Projects
Commit 773cb3b6 authored by Kegan Dougal's avatar Kegan Dougal
Browse files

Add stub architecture for txn reliability.

parent e3190711
No related branches found
No related tags found
No related merge requests found
......@@ -18,7 +18,7 @@ This module controls the reliability for application service transactions.
The nominal flow through this module looks like:
___________
\O/ --- event -->| | +--------------+
| - event ---->| EventPool |<-- poll 1/s for events ---| EventSorter |
| - event ---->| event_pool|<-- poll 1/s for events ---| EventSorter |
/ \ ---- event ->|___________| +--------------+
USERS ____________________________|
| | |
......@@ -29,7 +29,7 @@ The nominal flow through this module looks like:
V
-````````- +------------+
|````````|<--StoreTxn-|Transaction |
|Database| | Maker |---> SEND TO AS
|Database| | Controller |---> SEND TO AS
`--------` +------------+
What happens on SEND TO AS depends on the state of the Application Service:
- If the AS is marked as DOWN, do nothing.
......@@ -49,20 +49,121 @@ UP & quit +---------- YES SUCCESS
| | |
NO <--- Have more txns? <------ Mark txn success & nuke -+
from db; incr AS pos.
This is all tied together by the AppServiceScheduler which DIs the required
components.
"""
class EventPool(object):
pass
class AppServiceScheduler(object):
""" Public facing API for this module. Does the required DI to tie the
components together. This also serves as the "event_pool", which in this
case is a simple array.
"""
def __init__(self, store, as_api, services):
self.app_services = services
self.event_pool = []
def create_recoverer(service):
return _Recoverer(store, as_api, service)
self.txn_ctrl = _TransactionController(store, as_api, create_recoverer)
self.event_sorter = _EventSorter(self, self.txn_ctrl, services)
def start(self):
self.event_sorter.start_polling()
def store_event(self, event): # event_pool
self.event_pool.append(event)
def get_events(self): # event_pool
return self.event_pool
class AppServiceTransaction(object):
"""Represents an application service transaction."""
def __init__(self, service, id, events):
self.service = service
self.id = id
self.events = events
def send(self, as_api):
# sends this transaction using this as_api
pass
def complete(self, store):
# increment txn id on AS and nuke txn contents from db
pass
class _EventSorter(object):
def __init__(self, event_pool, txn_ctrl, services):
self.event_pool = event_pool
self.txn_ctrl = txn_ctrl
self.services = services
def start_polling(self):
events = self.event_pool.get_events()
if events:
self._process(events)
# repoll later on
def _process(self, events):
# sort events
# f.e. (AS, events) => poke transaction controller
pass
class _TransactionController(object):
def __init__(self, store, as_api, recoverer_fn):
self.store = store
self.as_api = as_api
self.recoverer_fn = recoverer_fn
def on_receive_events(self, service, events):
txn = self._store_txn(service, events)
if txn.send(self.as_api):
txn.complete(self.store)
else:
self._start_recoverer(service)
def _start_recoverer(self, service):
recoverer = self.recoverer_fn(service)
recoverer.recover()
def _store_txn(self, service, events):
pass # returns AppServiceTransaction
class _Recoverer(object):
def __init__(self, store, as_api, service):
self.store = store
self.as_api = as_api
self.service = service
self.backoff_counter = 1
class EventSorter(object):
pass
def recover(self):
# TODO wait a bit
txn = self._get_oldest_txn()
if txn:
if txn.send(self.as_api):
txn.complete(self.store)
else:
self.backoff_counter += 1
self.recover(self.service)
return
else:
self._set_service_recovered(self.service)
def _set_service_recovered(self, service):
pass
class TransactionMaker(object):
pass
def _get_oldest_txn(self):
pass # returns AppServiceTransaction
class Recoverer(object):
pass
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment