Commit 2b1acb76 authored by Matthew Hodgson

squidge to 79 columns as per pep8

parent 8ada2d20
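This commit is a pure reformatting pass: lines longer than PEP 8's 79-column limit are wrapped, without changing behaviour. A minimal sketch of the two wrapping patterns applied throughout (implicit string-literal concatenation for log format strings, and breaking argument lists onto continuation lines); the logger and the values below are illustrative stand-ins, not code from the commit itself:

import logging

logger = logging.getLogger(__name__)

destination = "example.org"  # hypothetical values for illustration
pending_pdus, pending_edus = [], []

# Pattern 1: adjacent string literals are concatenated by the parser, so a
# long format string can be split across lines without changing the message.
logger.info("TX [%s] not ready for retry yet - "
            "dropping transaction for now", destination)

# Pattern 2: a long call is broken after some of its arguments and the rest
# are moved to continuation lines.
logger.debug("TX [%s] Attempting new transaction "
             "(pdus: %d, edus: %d)",
             destination, len(pending_pdus), len(pending_edus))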
@@ -778,21 +778,25 @@ class _TransactionQueue(object):
     def _attempt_new_transaction(self, destination):

         (retry_last_ts, retry_interval) = (0, 0)
-        retry_timings = yield self.store.get_destination_retry_timings(destination)
+        retry_timings = yield self.store.get_destination_retry_timings(
+            destination
+        )
         if retry_timings:
             (retry_last_ts, retry_interval) = (
                 retry_timings.retry_last_ts, retry_timings.retry_interval
             )
             if retry_last_ts + retry_interval > int(self._clock.time_msec()):
-                logger.info("TX [%s] not ready for retry yet - dropping transaction for now", destination)
+                logger.info("TX [%s] not ready for retry yet - "
+                            "dropping transaction for now", destination)
                 return
             else:
                 logger.info("TX [%s] is ready for retry", destination)

         if destination in self.pending_transactions:
-            # XXX: pending_transactions can get stuck on by a never-ending request
-            # at which point pending_pdus_by_dest just keeps growing.
-            # we need application-layer timeouts of some flavour of these requests
+            # XXX: pending_transactions can get stuck on by a never-ending
+            # request at which point pending_pdus_by_dest just keeps growing.
+            # we need application-layer timeouts of some flavour of these
+            # requests
             return

         # list of (pending_pdu, deferred, order)
@@ -803,8 +807,10 @@ class _TransactionQueue(object):
         if not pending_pdus and not pending_edus and not pending_failures:
             return

-        logger.debug("TX [%s] Attempting new transaction (pdus: %d, edus: %d, failures: %d)",
-            destination, len(pending_pdus), len(pending_edus), len(pending_failures))
+        logger.debug("TX [%s] Attempting new transaction "
+            "(pdus: %d, edus: %d, failures: %d)",
+            destination,
+            len(pending_pdus), len(pending_edus), len(pending_failures))

         # Sort based on the order field
         pending_pdus.sort(key=lambda t: t[2])
@@ -837,7 +843,8 @@ class _TransactionQueue(object):
         yield self.transaction_actions.prepare_to_send(transaction)

         logger.debug("TX [%s] Persisted transaction", destination)
-        logger.info("TX [%s] Sending transaction [%s]", destination, transaction.transaction_id)
+        logger.info("TX [%s] Sending transaction [%s]", destination,
+                    transaction.transaction_id)

         # Actually send the transaction
@@ -874,7 +881,9 @@ class _TransactionQueue(object):
             if code == 200:
                 if retry_last_ts:
                     # this host is alive! reset retry schedule
-                    yield self.store.set_destination_retry_timings(destination, 0, 0)
+                    yield self.store.set_destination_retry_timings(
+                        destination, 0, 0
+                    )
                 deferred.callback(None)
             else:
                 self.start_retrying(destination, retry_interval)
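The retry handling threaded through these hunks is a per-destination backoff gate: a transaction is skipped while retry_last_ts + retry_interval is still in the future, the timings are reset to zero when a request succeeds (the code == 200 branch), and start_retrying re-arms the backoff on failure. A minimal standalone sketch of that gate, assuming an in-memory timings map and a wall-clock helper in place of the store and clock objects above; the backoff growth in on_failure is a placeholder, not the policy implemented by start_retrying:

import time

# Hypothetical in-memory stand-in for the destination retry-timings store:
# destination -> (retry_last_ts, retry_interval), both in milliseconds.
destination_retry_timings = {}


def now_msec():
    return int(time.time() * 1000)


def ready_for_retry(destination):
    """Return True if we may attempt a transaction to this destination."""
    retry_last_ts, retry_interval = destination_retry_timings.get(
        destination, (0, 0)
    )
    # Skip the destination while the backoff window is still open.
    return retry_last_ts + retry_interval <= now_msec()


def on_success(destination):
    # Host is alive again: reset the retry schedule, as the 200 branch does.
    destination_retry_timings[destination] = (0, 0)


def on_failure(destination, min_interval=2000, max_interval=60 * 60 * 1000):
    # Placeholder exponential backoff; the real policy lives in start_retrying.
    _, retry_interval = destination_retry_timings.get(destination, (0, 0))
    retry_interval = min(max(retry_interval * 2, min_interval), max_interval)
    destination_retry_timings[destination] = (now_msec(), retry_interval)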
@@ -892,7 +901,8 @@ class _TransactionQueue(object):
         except Exception as e:
             # We capture this here as there as nothing actually listens
             # for this finishing functions deferred.
-            logger.exception("TX [%s] Problem in _attempt_transaction: %s", destination, e)
+            logger.exception("TX [%s] Problem in _attempt_transaction: %s",
+                             destination, e)

             self.start_retrying(destination, retry_interval)
@@ -90,7 +90,7 @@ class MatrixFederationHttpClient(object):
         )

         logger.info("Sending request to %s: %s %s",
-                destination, method, url_bytes)
+                    destination, method, url_bytes)

         logger.debug(
             "Types: %s",
@@ -135,7 +135,7 @@ class MatrixFederationHttpClient(object):
                 raise SynapseError(400, "Domain specified not found.")

             logger.exception("Sending request failed to %s: %s %s : %s",
-                    destination, method, url_bytes, e)
+                             destination, method, url_bytes, e)

             _print_ex(e)
             if retries_left:
@@ -145,7 +145,8 @@ class MatrixFederationHttpClient(object):
                 raise

         logger.info("Received response %d %s for %s: %s %s",
-                    response.code, response.phrase, destination, method, url_bytes)
+                    response.code, response.phrase,
+                    destination, method, url_bytes)

         if 200 <= response.code < 300:
             # We need to update the transactions table to say it was sent?
@@ -28,7 +28,8 @@ class TransactionStore(SQLBaseStore):
     """A collection of queries for handling PDUs.
     """

-    # a write-through cache of DestinationsTable.EntryType indexed by destination string
+    # a write-through cache of DestinationsTable.EntryType indexed by
+    # destination string
     destination_retry_cache = {}

     def get_received_txn_response(self, transaction_id, origin):
@@ -238,7 +239,8 @@ class TransactionStore(SQLBaseStore):
         else:
             return None

-    def set_destination_retry_timings(self, destination, retry_last_ts, retry_interval):
+    def set_destination_retry_timings(self, destination,
+                                      retry_last_ts, retry_interval):
         """Sets the current retry timings for a given destination.
         Both timings should be zero if retrying is no longer occuring.
@@ -249,15 +251,19 @@ class TransactionStore(SQLBaseStore):
         """

         self.destination_retry_cache[destination] = (
-            DestinationsTable.EntryType(destination, retry_last_ts, retry_interval)
+            DestinationsTable.EntryType(destination,
+                                        retry_last_ts, retry_interval)
        )

-        # xxx: we could chose to not bother persisting this if our cache things this is a NOOP
+        # XXX: we could chose to not bother persisting this if our cache thinks
+        # this is a NOOP
         return self.runInteraction(
             "set_destination_retry_timings",
-            self._set_destination_retry_timings, destination, retry_last_ts, retry_interval)
+            self._set_destination_retry_timings, destination,
+            retry_last_ts, retry_interval)

-    def _set_destination_retry_timings(cls, txn, destination, retry_last_ts, retry_interval):
+    def _set_destination_retry_timings(cls, txn, destination,
+                                       retry_last_ts, retry_interval):
         query = (
             "INSERT OR REPLACE INTO %s "