Skip to content
Snippets Groups Projects
Commit f09d2b69 authored by Erik Johnston's avatar Erik Johnston
Browse files

Removed unused stuff

parent 4c3eb14d
No related branches found
No related tags found
No related merge requests found
...@@ -16,13 +16,12 @@ ...@@ -16,13 +16,12 @@
from ._base import SQLBaseStore from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cached from synapse.util.caches.descriptors import cached
from twisted.internet import defer, reactor from twisted.internet import defer
from canonicaljson import encode_canonical_json from canonicaljson import encode_canonical_json
from collections import namedtuple from collections import namedtuple
import itertools
import logging import logging
import ujson as json import ujson as json
...@@ -47,25 +46,6 @@ class TransactionStore(SQLBaseStore): ...@@ -47,25 +46,6 @@ class TransactionStore(SQLBaseStore):
"""A collection of queries for handling PDUs. """A collection of queries for handling PDUs.
""" """
def __init__(self, hs):
super(TransactionStore, self).__init__(hs)
# New transactions that are currently in flights
self.inflight_transactions = {}
# Newly delievered transactions that *weren't* persisted while in flight
self.new_delivered_transactions = {}
# Newly delivered transactions that *were* persisted while in flight
self.update_delivered_transactions = {}
self.last_transaction = {}
reactor.addSystemEventTrigger("before", "shutdown", self._persist_in_mem_txns)
self._clock.looping_call(self._persist_in_mem_txns, 10 * 1000)
self._clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000)
def get_received_txn_response(self, transaction_id, origin): def get_received_txn_response(self, transaction_id, origin):
"""For an incoming transaction from a given origin, check if we have """For an incoming transaction from a given origin, check if we have
already responded to it. If so, return the response code and response already responded to it. If so, return the response code and response
...@@ -148,46 +128,7 @@ class TransactionStore(SQLBaseStore): ...@@ -148,46 +128,7 @@ class TransactionStore(SQLBaseStore):
Returns: Returns:
list: A list of previous transaction ids. list: A list of previous transaction ids.
""" """
return defer.succeed([])
auto_id = self._transaction_id_gen.get_next()
txn_row = _TransactionRow(
id=auto_id,
transaction_id=transaction_id,
destination=destination,
ts=origin_server_ts,
response_code=0,
response_json=None,
)
self.inflight_transactions.setdefault(destination, {})[transaction_id] = txn_row
prev_txn = self.last_transaction.get(destination)
if prev_txn:
return defer.succeed(prev_txn)
else:
return self.runInteraction(
"_get_prevs_txn",
self._get_prevs_txn,
destination,
)
def _get_prevs_txn(self, txn, destination):
# First we find out what the prev_txns should be.
# Since we know that we are only sending one transaction at a time,
# we can simply take the last one.
query = (
"SELECT * FROM sent_transactions"
" WHERE destination = ?"
" ORDER BY id DESC LIMIT 1"
)
txn.execute(query, (destination,))
results = self.cursor_to_dict(txn)
prev_txns = [r["transaction_id"] for r in results]
return prev_txns
def delivered_txn(self, transaction_id, destination, code, response_dict): def delivered_txn(self, transaction_id, destination, code, response_dict):
"""Persists the response for an outgoing transaction. """Persists the response for an outgoing transaction.
...@@ -198,52 +139,7 @@ class TransactionStore(SQLBaseStore): ...@@ -198,52 +139,7 @@ class TransactionStore(SQLBaseStore):
code (int) code (int)
response_json (str) response_json (str)
""" """
pass
txn_row = self.inflight_transactions.get(
destination, {}
).pop(transaction_id, None)
self.last_transaction[destination] = transaction_id
if txn_row:
d = self.new_delivered_transactions.setdefault(destination, {})
d[transaction_id] = txn_row._replace(
response_code=code,
response_json=None, # For now, don't persist response
)
else:
d = self.update_delivered_transactions.setdefault(destination, {})
# For now, don't persist response
d[transaction_id] = _UpdateTransactionRow(code, None)
def get_transactions_after(self, transaction_id, destination):
"""Get all transactions after a given local transaction_id.
Args:
transaction_id (str)
destination (str)
Returns:
list: A list of dicts
"""
return self.runInteraction(
"get_transactions_after",
self._get_transactions_after, transaction_id, destination
)
def _get_transactions_after(self, txn, transaction_id, destination):
query = (
"SELECT * FROM sent_transactions"
" WHERE destination = ? AND id >"
" ("
" SELECT id FROM sent_transactions"
" WHERE transaction_id = ? AND destination = ?"
" )"
)
txn.execute(query, (destination, transaction_id, destination))
return self.cursor_to_dict(txn)
@cached(max_entries=10000) @cached(max_entries=10000)
def get_destination_retry_timings(self, destination): def get_destination_retry_timings(self, destination):
...@@ -338,59 +234,3 @@ class TransactionStore(SQLBaseStore): ...@@ -338,59 +234,3 @@ class TransactionStore(SQLBaseStore):
txn.execute(query, (self._clock.time_msec(),)) txn.execute(query, (self._clock.time_msec(),))
return self.cursor_to_dict(txn) return self.cursor_to_dict(txn)
@defer.inlineCallbacks
def _persist_in_mem_txns(self):
try:
inflight = self.inflight_transactions
new_delivered = self.new_delivered_transactions
update_delivered = self.update_delivered_transactions
self.inflight_transactions = {}
self.new_delivered_transactions = {}
self.update_delivered_transactions = {}
full_rows = [
row._asdict()
for txn_map in itertools.chain(inflight.values(), new_delivered.values())
for row in txn_map.values()
]
def f(txn):
if full_rows:
self._simple_insert_many_txn(
txn=txn,
table="sent_transactions",
values=full_rows
)
for dest, txn_map in update_delivered.items():
for txn_id, update_row in txn_map.items():
self._simple_update_one_txn(
txn,
table="sent_transactions",
keyvalues={
"transaction_id": txn_id,
"destination": dest,
},
updatevalues={
"response_code": update_row.response_code,
"response_json": None, # For now, don't persist response
}
)
if full_rows or update_delivered:
yield self.runInteraction("_persist_in_mem_txns", f)
except:
logger.exception("Failed to persist transactions!")
def _cleanup_transactions(self):
now = self._clock.time_msec()
month_ago = now - 30 * 24 * 60 * 60 * 1000
six_hours_ago = now - 6 * 60 * 60 * 1000
def _cleanup_transactions_txn(txn):
txn.execute("DELETE FROM received_transactions WHERE ts < ?", (month_ago,))
txn.execute("DELETE FROM sent_transactions WHERE ts < ?", (six_hours_ago,))
return self.runInteraction("_persist_in_mem_txns", _cleanup_transactions_txn)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment