diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 90235ff098bce0e844fe40fd5ae804cf3264cc60..c802dd67a3e6af30cf38a01b9108233ccff57ea8 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -99,7 +99,12 @@ class TransactionQueue(object): # destination -> list of tuple(failure, deferred) self.pending_failures_by_dest = {} + # destination -> stream_id of last successfully sent to-device message. + # NB: may be a long or an int. self.last_device_stream_id_by_dest = {} + + # destination -> stream_id of last successfully sent device list + # update. self.last_device_list_stream_id_by_dest = {} # HACK to get unique tx id diff --git a/synapse/replication/slave/storage/_slaved_id_tracker.py b/synapse/replication/slave/storage/_slaved_id_tracker.py index 24b5c79d4ae6ae0d7c8392edaf3750a4d51cfe90..9d1d173b2f64c977c65f7dc5913a27e938976683 100644 --- a/synapse/replication/slave/storage/_slaved_id_tracker.py +++ b/synapse/replication/slave/storage/_slaved_id_tracker.py @@ -27,4 +27,9 @@ class SlavedIdTracker(object): self._current = (max if self.step > 0 else min)(self._current, new_id) def get_current_token(self): + """ + + Returns: + int + """ return self._current diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 5c7db5e5f62eeacca631d0dc1a8dd5e81c17f549..7925cb5f1b4d5a31294e049ccf0c7991cdd6e65a 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -357,12 +357,12 @@ class DeviceInboxStore(BackgroundUpdateStore): """ Args: destination(str): The name of the remote server. - last_stream_id(int): The last position of the device message stream + last_stream_id(int|long): The last position of the device message stream that the server sent up to. - current_stream_id(int): The current position of the device + current_stream_id(int|long): The current position of the device message stream. Returns: - Deferred ([dict], int): List of messages for the device and where + Deferred ([dict], int|long): List of messages for the device and where in the stream the messages got to. """ diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 563071b7a988f928eb3d8f9b285de347f18c6a0f..e545b62e39c98118ac3a6fbf51cbb4f583023ddf 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -308,7 +308,7 @@ class DeviceStore(SQLBaseStore): """Get stream of updates to send to remote servers Returns: - (now_stream_id, [ { updates }, .. ]) + (int, list[dict]): current stream id and list of updates """ now_stream_id = self._device_list_id_gen.get_current_token() diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 46cf93ff873f5c8792fd02b0725b58d8053ced5b..95031dc9ec70849fd333cbf00abd8f54d82b17f6 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -30,6 +30,17 @@ class IdGenerator(object): def _load_current_id(db_conn, table, column, step=1): + """ + + Args: + db_conn (object): + table (str): + column (str): + step (int): + + Returns: + int + """ cur = db_conn.cursor() if step == 1: cur.execute("SELECT MAX(%s) FROM %s" % (column, table,)) @@ -131,6 +142,9 @@ class StreamIdGenerator(object): def get_current_token(self): """Returns the maximum stream id such that all stream ids less than or equal to it have been successfully persisted. + + Returns: + int """ with self._lock: if self._unfinished_ids: diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index b72bb0ff02cb4194cf78b596a5355f85ccbf2ed3..70fe00ce0b31279664a70024454c96394a7b1e46 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -50,7 +50,7 @@ class StreamChangeCache(object): def has_entity_changed(self, entity, stream_pos): """Returns True if the entity may have been updated since stream_pos """ - assert type(stream_pos) is int + assert type(stream_pos) is int or type(stream_pos) is long if stream_pos < self._earliest_known_stream_pos: self.metrics.inc_misses()