Skip to content
Snippets Groups Projects
Commit 1d4deff2 authored by Mark Haines's avatar Mark Haines
Browse files

Separate generating the replication response...

from doing the http request parsing to make it easier
to write unit tests for replication.
parent bf14883a
No related branches found
No related tags found
No related merge requests found
......@@ -145,32 +145,43 @@ class ReplicationResource(Resource):
timeout = parse_integer(request, "timeout", 10 * 1000)
request.setHeader(b"Content-Type", b"application/json")
writer = _Writer(request)
@defer.inlineCallbacks
def replicate():
current_token = yield self.current_replication_token()
logger.info("Replicating up to %r", current_token)
yield self.account_data(writer, current_token, limit)
yield self.events(writer, current_token, limit)
yield self.presence(writer, current_token) # TODO: implement limit
yield self.typing(writer, current_token) # TODO: implement limit
yield self.receipts(writer, current_token, limit)
yield self.push_rules(writer, current_token, limit)
yield self.pushers(writer, current_token, limit)
yield self.state(writer, current_token, limit)
self.streams(writer, current_token)
request_streams = {
name: parse_integer(request, name)
for names in STREAM_NAMES for name in names
}
request_streams["streams"] = parse_string(request, "streams")
logger.info("Replicated %d rows", writer.total)
defer.returnValue(writer.total)
def replicate():
return self.replicate(request_streams, limit)
yield self.notifier.wait_for_replication(replicate, timeout)
result = yield self.notifier.wait_for_replication(replicate, timeout)
writer.finish()
request.write(json.dumps(result, ensure_ascii=False))
finish_request(request)
def streams(self, writer, current_token):
request_token = parse_string(writer.request, "streams")
@defer.inlineCallbacks
def replicate(self, request_streams, limit):
writer = _Writer()
current_token = yield self.current_replication_token()
logger.info("Replicating up to %r", current_token)
yield self.account_data(writer, current_token, limit, request_streams)
yield self.events(writer, current_token, limit, request_streams)
# TODO: implement limit
yield self.presence(writer, current_token, request_streams)
yield self.typing(writer, current_token, request_streams)
yield self.receipts(writer, current_token, limit, request_streams)
yield self.push_rules(writer, current_token, limit, request_streams)
yield self.pushers(writer, current_token, limit, request_streams)
yield self.state(writer, current_token, limit, request_streams)
self.streams(writer, current_token, request_streams)
logger.info("Replicated %d rows", writer.total)
defer.returnValue(writer.finish())
def streams(self, writer, current_token, request_streams):
request_token = request_streams.get("streams")
streams = []
......@@ -195,9 +206,9 @@ class ReplicationResource(Resource):
)
@defer.inlineCallbacks
def events(self, writer, current_token, limit):
request_events = parse_integer(writer.request, "events")
request_backfill = parse_integer(writer.request, "backfill")
def events(self, writer, current_token, limit, request_streams):
request_events = request_streams.get("events")
request_backfill = request_streams.get("backfill")
if request_events is not None or request_backfill is not None:
if request_events is None:
......@@ -228,10 +239,10 @@ class ReplicationResource(Resource):
)
@defer.inlineCallbacks
def presence(self, writer, current_token):
def presence(self, writer, current_token, request_streams):
current_position = current_token.presence
request_presence = parse_integer(writer.request, "presence")
request_presence = request_streams.get("presence")
if request_presence is not None:
presence_rows = yield self.presence_handler.get_all_presence_updates(
......@@ -244,10 +255,10 @@ class ReplicationResource(Resource):
))
@defer.inlineCallbacks
def typing(self, writer, current_token):
def typing(self, writer, current_token, request_streams):
current_position = current_token.presence
request_typing = parse_integer(writer.request, "typing")
request_typing = request_streams.get("typing")
if request_typing is not None:
typing_rows = yield self.typing_handler.get_all_typing_updates(
......@@ -258,10 +269,10 @@ class ReplicationResource(Resource):
))
@defer.inlineCallbacks
def receipts(self, writer, current_token, limit):
def receipts(self, writer, current_token, limit, request_streams):
current_position = current_token.receipts
request_receipts = parse_integer(writer.request, "receipts")
request_receipts = request_streams.get("receipts")
if request_receipts is not None:
receipts_rows = yield self.store.get_all_updated_receipts(
......@@ -272,12 +283,12 @@ class ReplicationResource(Resource):
))
@defer.inlineCallbacks
def account_data(self, writer, current_token, limit):
def account_data(self, writer, current_token, limit, request_streams):
current_position = current_token.account_data
user_account_data = parse_integer(writer.request, "user_account_data")
room_account_data = parse_integer(writer.request, "room_account_data")
tag_account_data = parse_integer(writer.request, "tag_account_data")
user_account_data = request_streams.get("user_account_data")
room_account_data = request_streams.get("room_account_data")
tag_account_data = request_streams.get("tag_account_data")
if user_account_data is not None or room_account_data is not None:
if user_account_data is None:
......@@ -303,10 +314,10 @@ class ReplicationResource(Resource):
))
@defer.inlineCallbacks
def push_rules(self, writer, current_token, limit):
def push_rules(self, writer, current_token, limit, request_streams):
current_position = current_token.push_rules
push_rules = parse_integer(writer.request, "push_rules")
push_rules = request_streams.get("push_rules")
if push_rules is not None:
rows = yield self.store.get_all_push_rule_updates(
......@@ -318,10 +329,11 @@ class ReplicationResource(Resource):
))
@defer.inlineCallbacks
def pushers(self, writer, current_token, limit):
def pushers(self, writer, current_token, limit, request_streams):
current_position = current_token.pushers
pushers = parse_integer(writer.request, "pushers")
pushers = request_streams.get("pushers")
if pushers is not None:
updated, deleted = yield self.store.get_all_updated_pushers(
pushers, current_position, limit
......@@ -336,10 +348,11 @@ class ReplicationResource(Resource):
))
@defer.inlineCallbacks
def state(self, writer, current_token, limit):
def state(self, writer, current_token, limit, request_streams):
current_position = current_token.state
state = parse_integer(writer.request, "state")
state = request_streams.get("state")
if state is not None:
state_groups, state_group_state = (
yield self.store.get_all_new_state_groups(
......@@ -356,9 +369,8 @@ class ReplicationResource(Resource):
class _Writer(object):
"""Writes the streams as a JSON object as the response to the request"""
def __init__(self, request):
def __init__(self):
self.streams = {}
self.request = request
self.total = 0
def write_header_and_rows(self, name, rows, fields, position=None):
......@@ -377,8 +389,7 @@ class _Writer(object):
self.total += len(rows)
def finish(self):
self.request.write(json.dumps(self.streams, ensure_ascii=False))
finish_request(self.request)
return self.streams
class _ReplicationToken(collections.namedtuple("_ReplicationToken", (
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment