Skip to content
Snippets Groups Projects
synapse_port_db 41.1 KiB
Newer Older
  • Learn to ignore specific revisions
  • Matthew Hodgson's avatar
    Matthew Hodgson committed
    # Copyright 2015, 2016 OpenMarket Ltd
    
    # Copyright 2018 New Vector Ltd
    
    # Copyright 2019 The Matrix.org Foundation C.I.C.
    
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    import argparse
    
    from typing import Dict, Iterable, Optional, Set
    
    from synapse.config.database import DatabaseConnectionConfig
    
    from synapse.config.homeserver import HomeServerConfig
    
    from synapse.logging.context import (
        LoggingContext,
        make_deferred_yieldable,
        run_in_background,
    )
    
    from synapse.storage.database import DatabasePool, make_conn
    from synapse.storage.databases.main.client_ips import ClientIpBackgroundUpdateStore
    from synapse.storage.databases.main.deviceinbox import DeviceInboxBackgroundUpdateStore
    from synapse.storage.databases.main.devices import DeviceBackgroundUpdateStore
    
    from synapse.storage.databases.main.end_to_end_keys import EndToEndKeyBackgroundStore
    
    from synapse.storage.databases.main.events_bg_updates import (
    
    Andrew Morgan's avatar
    Andrew Morgan committed
        EventsBackgroundUpdatesStore,
    
    from synapse.storage.databases.main.media_repository import (
    
    Andrew Morgan's avatar
    Andrew Morgan committed
        MediaRepositoryBackgroundUpdateStore,
    
    from synapse.storage.databases.main.pusher import PusherWorkerStore
    
    from synapse.storage.databases.main.registration import (
    
    Andrew Morgan's avatar
    Andrew Morgan committed
        RegistrationBackgroundUpdateStore,
    
        find_max_generated_user_id_localpart,
    
    from synapse.storage.databases.main.room import RoomBackgroundUpdateStore
    from synapse.storage.databases.main.roommember import RoomMemberBackgroundUpdateStore
    from synapse.storage.databases.main.search import SearchBackgroundUpdateStore
    from synapse.storage.databases.main.state import MainStateBackgroundUpdateStore
    from synapse.storage.databases.main.stats import StatsStore
    from synapse.storage.databases.main.user_directory import (
    
    Andrew Morgan's avatar
    Andrew Morgan committed
        UserDirectoryBackgroundUpdateStore,
    
    from synapse.storage.databases.state.bg_updates import StateBackgroundUpdateStore
    
    Andrew Morgan's avatar
    Andrew Morgan committed
    from synapse.storage.engines import create_engine
    from synapse.storage.prepare_database import prepare_database
    
    from synapse.util import Clock
    
    from synapse.util.versionstring import get_version_string
    
    logger = logging.getLogger("synapse_port_db")
    
    BOOLEAN_COLUMNS = {
    
    Brendan Abolivier's avatar
    Brendan Abolivier committed
        "events": ["processed", "outlier", "contains_url"],
    
        "rooms": ["is_public", "has_auth_chain_index"],
    
        "event_edges": ["is_state"],
        "presence_list": ["accepted"],
    
        "presence_stream": ["currently_active"],
    
        "public_room_list_stream": ["visibility"],
    
        "devices": ["hidden"],
    
    Kevin Liu's avatar
    Kevin Liu committed
        "device_lists_outbound_pokes": ["sent"],
    
        "users_who_share_rooms": ["share_private"],
    
        "groups": ["is_public"],
    
    Erik Johnston's avatar
    Erik Johnston committed
        "group_rooms": ["is_public"],
        "group_users": ["is_public", "is_admin"],
    
        "group_summary_rooms": ["is_public"],
        "group_room_categories": ["is_public"],
        "group_summary_users": ["is_public"],
        "group_roles": ["is_public"],
        "local_group_membership": ["is_publicised", "is_admin"],
    
        "e2e_room_keys": ["is_verified"],
    
        "redactions": ["have_censored"],
        "room_stats_state": ["is_federatable"],
    
        "local_media_repository": ["safe_from_quarantine"],
    
        "e2e_fallback_keys_json": ["used"],
    
    APPEND_ONLY_TABLES = [
        "event_reference_hashes",
        "events",
        "event_json",
        "state_events",
        "room_memberships",
        "topics",
        "room_names",
        "rooms",
        "local_media_repository",
        "local_media_repository_thumbnails",
        "remote_media_cache",
        "remote_media_cache_thumbnails",
        "redactions",
        "event_edges",
        "event_auth",
        "received_transactions",
        "sent_transactions",
        "transaction_id_to_pdu",
        "users",
        "state_groups",
        "state_groups_state",
        "event_to_state_groups",
        "rejections",
    
        "presence_stream",
        "push_rules_stream",
        "ex_outlier_stream",
    
        "cache_invalidation_stream_by_instance",
    
        "public_room_list_stream",
        "state_group_edges",
        "stream_ordering_to_exterm",
    
        # We don't port these tables, as they're a faff and we can regenerate
        # them anyway.
    
        "user_directory",
        "user_directory_search",
    
        "user_directory_search_content",
        "user_directory_search_docsize",
        "user_directory_search_segdir",
        "user_directory_search_segments",
        "user_directory_search_stat",
        "user_directory_search_pos",
        "users_who_share_private_rooms",
        "users_in_public_room",
        # UI auth sessions have foreign keys so additional care needs to be taken,
        # the sessions are transient anyway, so ignore them.
    
        "ui_auth_sessions",
        "ui_auth_sessions_credentials",
    
    # Error returned by the run function. Used at the top-level part of the script to
    # handle errors and return codes.
    
    end_error = None  # type: Optional[str]
    
    # The exec_info for the error, if any. If error is defined but not exec_info the script
    # will show only the error message without the stacktrace, if exec_info is defined but
    # not the error then the script will show nothing outside of what's printed in the run
    # function. If both are defined, the script will print both the error and the stacktrace.
    
    class Store(
        ClientIpBackgroundUpdateStore,
        DeviceInboxBackgroundUpdateStore,
        DeviceBackgroundUpdateStore,
        EventsBackgroundUpdatesStore,
        MediaRepositoryBackgroundUpdateStore,
        RegistrationBackgroundUpdateStore,
    
        RoomBackgroundUpdateStore,
    
        RoomMemberBackgroundUpdateStore,
        SearchBackgroundUpdateStore,
        StateBackgroundUpdateStore,
    
        MainStateBackgroundUpdateStore,
    
        UserDirectoryBackgroundUpdateStore,
    
        def execute(self, f, *args, **kwargs):
            """Run ``f`` as an interaction on this store's database pool.

            The interaction is described by ``f.__name__``. ``f`` and any extra
            positional or keyword arguments are handed on to
            ``self.db_pool.runInteraction`` unchanged.

            Returns:
                Whatever ``self.db_pool.runInteraction`` returns.
            """
            run_interaction = self.db_pool.runInteraction
            return run_interaction(f.__name__, f, *args, **kwargs)
    
        def execute_sql(self, sql, *args):
            """Execute ``sql`` with ``args`` as its parameters and fetch every row.

            The work is done inside ``self.db_pool.runInteraction`` under the
            description "execute_sql"; that call's result is returned.
            """

            def fetch_everything(cursor):
                # ``args`` is the tuple collected from ``*args``; it is passed on
                # as the statement's parameters as-is.
                cursor.execute(sql, args)
                return cursor.fetchall()

            return self.db_pool.runInteraction("execute_sql", fetch_everything)
    
        def insert_many_txn(self, txn, table, headers, rows):
    
            sql = "INSERT INTO %s (%s) VALUES (%s)" % (
                table,
                ", ".join(k for k in headers),
    
            try:
                txn.executemany(sql, rows)
    
            except Exception:
                logger.exception("Failed to insert: %s", table)
    
        def set_room_is_public(self, room_id, is_public):
            """Refuse any attempt to change a room's visibility.

            Raises:
                Exception: unconditionally, whatever arguments are supplied.
            """
            message = "Attempt to set room_is_public during port_db: database not empty?"
            raise Exception(message)
    
    
        def __init__(self, config):
            """Record ``config`` and set up the attributes derived from it.

            Args:
                config: object exposing ``server_name``; kept as ``self.config``.
            """

            self.clock = Clock(reactor)
            self.config = config
            self.hostname = config.server_name

            # "Synapse/" followed by whatever get_version_string returns.
            self.version_string = "Synapse/" + get_version_string(synapse)
    
        def get_reactor(self):
            """Return the module-level ``reactor`` object."""
            return reactor
    
    
        def get_instance_name(self):
            """Return the fixed instance name ``"master"``."""
            return "master"
    
    
    class Porter(object):
        def __init__(self, **kwargs):
            """Store every keyword argument as an instance attribute of the same name."""
            self.__dict__.update(kwargs)
    
        async def setup_table(self, table):
    
                row = await self.postgres_store.db_pool.simple_select_one(
    
                    table="port_from_sqlite3",
                    keyvalues={"table_name": table},
    
                    retcols=("forward_rowid", "backward_rowid"),
    
                        (
                            forward_chunk,
                            already_ported,
                            total_to_port,
    
                        ) = await self._setup_sent_transactions()
    
                        await self.postgres_store.db_pool.simple_insert(
    
                            values={
                                "table_name": table,
                                "forward_rowid": 1,
                                "backward_rowid": 0,
    
                        forward_chunk = 1
                        backward_chunk = 0
    
                else:
                    forward_chunk = row["forward_rowid"]
                    backward_chunk = row["backward_rowid"]
    
                    already_ported, total_to_port = await self._get_total_count_to_port(
    
                        table, forward_chunk, backward_chunk
    
                        "DELETE FROM port_from_sqlite3 WHERE table_name = %s", (table,)
    
                await self.postgres_store.execute(delete_all)
    
                await self.postgres_store.db_pool.simple_insert(
    
                    table="port_from_sqlite3",
    
                    values={"table_name": table, "forward_rowid": 1, "backward_rowid": 0},
    
                forward_chunk = 1
                backward_chunk = 0
    
                already_ported, total_to_port = await self._get_total_count_to_port(
    
                    table, forward_chunk, backward_chunk
    
            return table, already_ported, total_to_port, forward_chunk, backward_chunk
    
        async def get_table_constraints(self) -> Dict[str, Set[str]]:
            """Look up the foreign key dependencies between the postgres tables.

            Returns:
                A dict keyed by the name of each table that has at least one
                foreign key constraint. Each value is the set of table names
                that the table's constraints refer to.
            """

            def _collect_dependencies(txn):
                # The postgres information schema lists every foreign key
                # constraint along with the table it points at.
                sql = """
                    SELECT DISTINCT
                        tc.table_name,
                        ccu.table_name AS foreign_table_name
                    FROM
                        information_schema.table_constraints AS tc
                        INNER JOIN information_schema.constraint_column_usage AS ccu
                        USING (table_schema, constraint_name)
                    WHERE tc.constraint_type = 'FOREIGN KEY';
                """
                txn.execute(sql)

                dependencies = {}
                for dependent, referenced in txn:
                    if dependent not in dependencies:
                        dependencies[dependent] = set()
                    dependencies[dependent].add(referenced)
                return dependencies

            pool = self.postgres_store.db_pool
            return await pool.runInteraction(
                "get_table_constraints", _collect_dependencies
            )
    
    
        async def handle_table(
    
            self, table, postgres_size, table_size, forward_chunk, backward_chunk
        ):
    
            logger.info(
                "Table %s: %i/%i (rows %i-%i) already ported",
    
                table,
                postgres_size,
                table_size,
                backward_chunk + 1,
                forward_chunk - 1,
    
            self.progress.add_table(table, postgres_size, table_size)
    
    Erik Johnston's avatar
    Erik Johnston committed
            if table == "event_search":
    
                await self.handle_search_table(
    
                    postgres_size, table_size, forward_chunk, backward_chunk
                )
    
    Erik Johnston's avatar
    Erik Johnston committed
                return
    
    
                self.progress.update(table, table_size)  # Mark table as done
                return
    
            if table == "user_directory_stream_pos":
    
    Erik Johnston's avatar
    Erik Johnston committed
                # We need to make sure there is a single row, `(X, null)`, as that is
                # what synapse expects to be there.
    
                await self.postgres_store.db_pool.simple_insert(
    
                )
                self.progress.update(table, table_size)  # Mark table as done
                return
    
    
                "SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?" % (table,)
    
                "SELECT rowid, * FROM %s WHERE rowid <= ? ORDER BY rowid LIMIT ?" % (table,)
    
                    forward_rows = []
                    backward_rows = []
                    if do_forward[0]:
    
                        txn.execute(forward_select, (forward_chunk, self.batch_size))
    
                        forward_rows = txn.fetchall()
                        if not forward_rows:
                            do_forward[0] = False
    
                    if do_backward[0]:
    
                        txn.execute(backward_select, (backward_chunk, self.batch_size))
    
                        backward_rows = txn.fetchall()
                        if not backward_rows:
                            do_backward[0] = False
    
                    if forward_rows or backward_rows:
                        headers = [column[0] for column in txn.description]
                    else:
                        headers = None
    
                    return headers, forward_rows, backward_rows
    
                headers, frows, brows = await self.sqlite_store.db_pool.runInteraction(
    
    Erik Johnston's avatar
    Erik Johnston committed
                    "select", r
                )
    
                if frows or brows:
                    if frows:
                        forward_chunk = max(row[0] for row in frows) + 1
                    if brows:
                        backward_chunk = min(row[0] for row in brows) - 1
    
                    rows = self._convert_rows(table, headers, rows)
    
    Erik Johnston's avatar
    Erik Johnston committed
                    def insert(txn):
    
                        self.postgres_store.insert_many_txn(txn, table, headers[1:], rows)
    
                        self.postgres_store.db_pool.simple_update_one_txn(
    
    Erik Johnston's avatar
    Erik Johnston committed
                            txn,
                            table="port_from_sqlite3",
                            keyvalues={"table_name": table},
    
                            updatevalues={
                                "forward_rowid": forward_chunk,
                                "backward_rowid": backward_chunk,
                            },
    
                    await self.postgres_store.execute(insert)
    
    Erik Johnston's avatar
    Erik Johnston committed
    
                    postgres_size += len(rows)
    
                    self.progress.update(table, postgres_size)
                else:
                    return
    
    
        async def handle_search_table(
    
            self, postgres_size, table_size, forward_chunk, backward_chunk
        ):
    
    Erik Johnston's avatar
    Erik Johnston committed
            select = (
                "SELECT es.rowid, es.*, e.origin_server_ts, e.stream_ordering"
                " FROM event_search as es"
                " INNER JOIN events AS e USING (event_id, room_id)"
                " WHERE es.rowid >= ?"
                " ORDER BY es.rowid LIMIT ?"
            )
    
    Erik Johnston's avatar
    Erik Johnston committed
            while True:
    
    Erik Johnston's avatar
    Erik Johnston committed
                def r(txn):
    
                    txn.execute(select, (forward_chunk, self.batch_size))
    
    Erik Johnston's avatar
    Erik Johnston committed
                    rows = txn.fetchall()
                    headers = [column[0] for column in txn.description]
    
                    return headers, rows
    
    
                headers, rows = await self.sqlite_store.db_pool.runInteraction("select", r)
    
    Erik Johnston's avatar
    Erik Johnston committed
    
                if rows:
    
                    forward_chunk = rows[-1][0] + 1
    
    Erik Johnston's avatar
    Erik Johnston committed
    
                    # We have to treat event_search differently since it has a
                    # different structure in the two different databases.
                    def insert(txn):
                        sql = (
                            "INSERT INTO event_search (event_id, room_id, key,"
                            " sender, vector, origin_server_ts, stream_ordering)"
                            " VALUES (?,?,?,?,to_tsvector('english', ?),?,?)"
                        )
    
    
                        rows_dict = []
                        for row in rows:
                            d = dict(zip(headers, row))
    
                            if "\0" in d["value"]:
                                logger.warning("dropping search row %s", d)
    
                            else:
                                rows_dict.append(d)
    
                        txn.executemany(
                            sql,
                            [
                                (
                                    row["event_id"],
                                    row["room_id"],
                                    row["key"],
                                    row["sender"],
                                    row["value"],
                                    row["origin_server_ts"],
                                    row["stream_ordering"],
                                )
                                for row in rows_dict
                            ],
                        )
    
                        self.postgres_store.db_pool.simple_update_one_txn(
    
    Erik Johnston's avatar
    Erik Johnston committed
                            txn,
                            table="port_from_sqlite3",
                            keyvalues={"table_name": "event_search"},
    
                            updatevalues={
                                "forward_rowid": forward_chunk,
                                "backward_rowid": backward_chunk,
                            },
    
    Erik Johnston's avatar
    Erik Johnston committed
                        )
    
                    await self.postgres_store.execute(insert)
    
    Erik Johnston's avatar
    Erik Johnston committed
                    self.progress.update("event_search", postgres_size)
    
    
        def build_db_store(
            self, db_config: DatabaseConnectionConfig, allow_outdated_version: bool = False,
        ):
    
            """Builds and returns a database store using the provided configuration.
    
                db_config: The database configuration
                allow_outdated_version: True to suppress errors about the database server
                    version being too old to run a complete synapse
    
            self.progress.set_state("Preparing %s" % db_config.config["name"])
    
            engine = create_engine(db_config.config)
    
            hs = MockHomeserver(self.hs_config)
    
            with make_conn(db_config, engine, "portdb") as db_conn:
    
                engine.check_database(
                    db_conn, allow_outdated_version=allow_outdated_version
                )
    
                prepare_database(db_conn, engine, config=self.hs_config)
    
                store = Store(DatabasePool(hs, db_config, engine), db_conn, hs)
    
        async def run_background_updates_on_postgres(self):
    
            # Manually apply all background updates on the PostgreSQL database.
    
    Erik Johnston's avatar
    Erik Johnston committed
            postgres_ready = (
    
                await self.postgres_store.db_pool.updates.has_completed_background_updates()
    
    Erik Johnston's avatar
    Erik Johnston committed
            )
    
    
            if not postgres_ready:
                # Only say that we're running background updates when there are background
                # updates to run.
                self.progress.set_state("Running background updates on PostgreSQL")
    
            while not postgres_ready:
    
                await self.postgres_store.db_pool.updates.do_next_background_update(100)
    
                postgres_ready = await (
    
                    self.postgres_store.db_pool.updates.has_completed_background_updates()
    
        async def run(self):
            """Ports the SQLite database to a PostgreSQL database.
    
            When a fatal error is met, its message is assigned to the global "end_error"
            variable. When this error comes with a stacktrace, its exec_info is assigned to
            the global "end_error_exec_info" variable.
            """
            global end_error
    
    
                # we allow people to port away from outdated versions of sqlite.
    
                self.sqlite_store = self.build_db_store(
    
                    DatabaseConnectionConfig("master-sqlite", self.sqlite_config),
                    allow_outdated_version=True,
    
    
                # Check if all background updates are done, abort if not.
    
                    await self.sqlite_store.db_pool.updates.has_completed_background_updates()
    
                        "Pending background updates exist in the SQLite3 database."
                        " Please start Synapse again and wait until every update has finished"
                        " before running this script.\n"
                    )
    
                self.postgres_store = self.build_db_store(
    
                    self.hs_config.get_single_database()
    
                await self.run_background_updates_on_postgres()
    
                self.progress.set_state("Creating port tables")
    
                        "CREATE TABLE IF NOT EXISTS port_from_sqlite3 ("
    
                        " forward_rowid bigint NOT NULL,"
                        " backward_rowid bigint NOT NULL"
    
                # The old port script created a table with just a "rowid" column.
                # We want people to be able to rerun this script from an old port
                # so that they can pick up any missing events that were not
                # ported across.
                def alter_table(txn):
                    txn.execute(
                        "ALTER TABLE IF EXISTS port_from_sqlite3"
                        " RENAME rowid TO forward_rowid"
                    )
                    txn.execute(
                        "ALTER TABLE IF EXISTS port_from_sqlite3"
                        " ADD backward_rowid bigint NOT NULL DEFAULT 0"
                    )
    
                try:
    
                    await self.postgres_store.db_pool.runInteraction(
                        "alter_table", alter_table
                    )
    
                except Exception:
                    # On Error Resume Next
    
                await self.postgres_store.db_pool.runInteraction(
    
                    "create_port_table", create_port_table
                )
    
                # Step 2. Set up sequences
                #
                # We do this before porting the tables so that even if we fail half
                # way through, the postgres DB always has sequences that are greater
                # than their respective tables. If we don't then creating the
                # `DataStore` object will fail due to the inconsistency.
                self.progress.set_state("Setting up sequence generators")
                await self._setup_state_group_id_seq()
                await self._setup_user_id_seq()
                await self._setup_events_stream_seqs()
    
                await self._setup_sequence(
                    "device_inbox_sequence", ("device_inbox", "device_federation_outbox")
                )
                await self._setup_sequence(
    
                    "account_data_sequence",
                    ("room_account_data", "room_tags_revisions", "account_data"),
                )
                await self._setup_sequence("receipts_sequence", ("receipts_linearized",))
                await self._setup_sequence("presence_stream_sequence", ("presence_stream",))
    
                await self._setup_auth_chain_sequence()
    
                self.progress.set_state("Fetching tables")
    
                sqlite_tables = await self.sqlite_store.db_pool.simple_select_onecol(
    
                    table="sqlite_master", keyvalues={"type": "table"}, retcol="name"
    
                postgres_tables = await self.postgres_store.db_pool.simple_select_onecol(
    
                    table="information_schema.tables",
                    keyvalues={},
                    retcol="distinct table_name",
                )
    
                tables = set(sqlite_tables) & set(postgres_tables)
                logger.info("Found %d tables", len(tables))
    
                # Step 4. Figure out what still needs copying
    
                self.progress.set_state("Checking on port progress")
    
                setup_res = await make_deferred_yieldable(
                    defer.gatherResults(
                        [
                            run_in_background(self.setup_table, table)
                            for table in tables
                            if table not in ["schema_version", "applied_schema_deltas"]
                            and not table.startswith("sqlite_")
                        ],
                        consumeErrors=True,
                    )
    
                # Map from table name to args passed to `handle_table`, i.e. a tuple
                # of: `postgres_size`, `table_size`, `forward_chunk`, `backward_chunk`.
                tables_to_port_info_map = {r[0]: r[1:] for r in setup_res}
    
                #
                # This is slightly convoluted as we need to ensure tables are ported
                # in the correct order due to foreign key constraints.
    
                self.progress.set_state("Copying to postgres")
    
    
                constraints = await self.get_table_constraints()
                tables_ported = set()  # type: Set[str]
    
                while tables_to_port_info_map:
                    # Pulls out all tables that are still to be ported and which
                    # only depend on tables that are already ported (if any).
                    tables_to_port = [
                        table
                        for table in tables_to_port_info_map
                        if not constraints.get(table, set()) - tables_ported
                    ]
    
                    await make_deferred_yieldable(
                        defer.gatherResults(
                            [
                                run_in_background(
                                    self.handle_table,
                                    table,
                                    *tables_to_port_info_map.pop(table),
                                )
                                for table in tables_to_port
                            ],
                            consumeErrors=True,
                        )
    
    
                    tables_ported.update(tables_to_port)
    
            except Exception as e:
    
                end_error_exec_info = sys.exc_info()
                logger.exception("")
            finally:
                reactor.stop()
    
        def _convert_rows(self, table, headers, rows):
            bool_col_names = BOOLEAN_COLUMNS.get(table, [])
    
    
            bool_cols = [i for i, h in enumerate(headers) if h in bool_col_names]
    
            class BadValueException(Exception):
                pass
    
    
            def conv(j, col):
                if j in bool_cols:
                    return bool(col)
    
                if isinstance(col, bytes):
                    return bytearray(col)
    
                        "DROPPING ROW: NUL value in table %s col %s: %r",
                        table,
                        headers[j],
                        col,
                    )
                    raise BadValueException()
    
                    outrows.append(
                        tuple(conv(j, col) for j, col in enumerate(row) if j > 0)
                    )
    
                except BadValueException:
                    pass
    
            return outrows
    
        async def _setup_sent_transactions(self):
    
            yesterday = int(time.time() * 1000) - 86400000
    
    
            # And save the max transaction id from each destination
            select = (
                "SELECT rowid, * FROM sent_transactions WHERE rowid IN ("
                "SELECT max(rowid) FROM sent_transactions"
                " GROUP BY destination"
                ")"
            )
    
            def r(txn):
                txn.execute(select)
                rows = txn.fetchall()
                headers = [column[0] for column in txn.description]
    
    
    
                return headers, [r for r in rows if r[ts_ind] < yesterday]
    
    
            headers, rows = await self.sqlite_store.db_pool.runInteraction("select", r)
    
            rows = self._convert_rows("sent_transactions", headers, rows)
    
            if inserted_rows:
                max_inserted_rowid = max(r[0] for r in rows)
    
                def insert(txn):
                    self.postgres_store.insert_many_txn(
                        txn, "sent_transactions", headers[1:], rows
                    )
    
                await self.postgres_store.execute(insert)
    
            else:
                max_inserted_rowid = 0
    
    
            def get_start_id(txn):
                txn.execute(
                    "SELECT rowid FROM sent_transactions WHERE ts >= ?"
                    " ORDER BY rowid ASC LIMIT 1",
    
            next_chunk = await self.sqlite_store.execute(get_start_id)
    
            next_chunk = max(max_inserted_rowid + 1, next_chunk)
    
    
            await self.postgres_store.db_pool.simple_insert(
    
                values={
                    "table_name": "sent_transactions",
                    "forward_rowid": next_chunk,
                    "backward_rowid": 0,
    
                    "SELECT count(*) FROM sent_transactions" " WHERE ts >= ?", (yesterday,)
    
            remaining_count = await self.sqlite_store.execute(get_sent_table_size)
    
            return next_chunk, inserted_rows, total_count
    
        async def _get_remaining_count_to_port(self, table, forward_chunk, backward_chunk):
            frows = await self.sqlite_store.execute_sql(
    
                "SELECT count(*) FROM %s WHERE rowid >= ?" % (table,), forward_chunk
    
            brows = await self.sqlite_store.execute_sql(
    
                "SELECT count(*) FROM %s WHERE rowid <= ?" % (table,), backward_chunk
    
            return frows[0][0] + brows[0][0]
    
        async def _get_already_ported_count(self, table):
            rows = await self.postgres_store.execute_sql(
    
        async def _get_total_count_to_port(self, table, forward_chunk, backward_chunk):
            """Work out how many rows of `table` are ported and how many exist in total.

            Both lookups (rows remaining and rows already ported) are started
            with `run_in_background` and gathered together before awaiting.

            Returns:
                A `(done, total)` tuple, where `total` is `done` plus the number
                of rows remaining.
            """
            lookups = [
                run_in_background(
                    self._get_remaining_count_to_port,
                    table,
                    forward_chunk,
                    backward_chunk,
                ),
                run_in_background(self._get_already_ported_count, table),
            ]
            to_port, ported = await make_deferred_yieldable(
                defer.gatherResults(lookups)
            )

            # Falsy results (e.g. None) are treated as zero rows.
            to_port = int(to_port or 0)
            ported = int(ported or 0)

            return ported, to_port + ported
    
        async def _setup_state_group_id_seq(self) -> None:
    
            curr_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
                table="state_groups", keyvalues={}, retcol="MAX(id)", allow_none=True
            )
    
            if not curr_id:
                return
    
    
                txn.execute("ALTER SEQUENCE state_group_id_seq RESTART WITH %s", (next_id,))
    
    
            await self.postgres_store.db_pool.runInteraction("setup_state_group_id_seq", r)
    
    
        async def _setup_user_id_seq(self) -> None:
            """Restart the PostgreSQL `user_id_seq` past the generated user IDs.

            Runs `find_max_generated_user_id_localpart` against the SQLite
            database and restarts the sequence at the returned value plus one.
            """
            curr_id = await self.sqlite_store.db_pool.runInteraction(
                "setup_user_id_seq", find_max_generated_user_id_localpart
            )

            def r(txn):
                # The next value handed out must be strictly above the current
                # maximum.
                next_id = curr_id + 1
                txn.execute("ALTER SEQUENCE user_id_seq RESTART WITH %s", (next_id,))

            await self.postgres_store.db_pool.runInteraction("setup_user_id_seq", r)
    
        async def _setup_events_stream_seqs(self) -> None:
    
            """Set the event stream sequences to the correct values.
            """
    
            # We get called before we've ported the events table, so we need to
            # fetch the current positions from the SQLite store.
            curr_forward_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
                table="events", keyvalues={}, retcol="MAX(stream_ordering)", allow_none=True
            )
    
            curr_backward_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
                table="events",
                keyvalues={},
                retcol="MAX(-MIN(stream_ordering), 1)",
                allow_none=True,
            )
    
            def _setup_events_stream_seqs_set_pos(txn):
                if curr_forward_id:
    
                        "ALTER SEQUENCE events_stream_seq RESTART WITH %s",
                        (curr_forward_id + 1,),
    
                if curr_backward_id:
                    txn.execute(
                        "ALTER SEQUENCE events_backfill_stream_seq RESTART WITH %s",
                        (curr_backward_id + 1,),
                    )
    
            await self.postgres_store.db_pool.runInteraction(
    
                "_setup_events_stream_seqs", _setup_events_stream_seqs_set_pos,
    
        async def _setup_sequence(self, sequence_name: str, stream_id_tables: Iterable[str]) -> None:
            """Set a sequence to the correct value.
    
            current_stream_ids = []
            for stream_id_table in stream_id_tables:
                max_stream_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
                    table=stream_id_table,
                    keyvalues={},
                    retcol="COALESCE(MAX(stream_id), 1)",
                    allow_none=True,
                )
                current_stream_ids.append(max_stream_id)
    
            next_id = max(current_stream_ids) + 1
    
            def r(txn):
                sql = "ALTER SEQUENCE %s RESTART WITH" % (sequence_name, )
                txn.execute(sql + " %s", (next_id, ))
    
            await self.postgres_store.db_pool.runInteraction("_setup_%s" % (sequence_name,), r)
    
        async def _setup_auth_chain_sequence(self) -> None:
            curr_chain_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
                table="event_auth_chains", keyvalues={}, retcol="MAX(chain_id)", allow_none=True
            )
    
                    "ALTER SEQUENCE event_auth_chain_id RESTART WITH %s",
                    (curr_chain_id,),
    
            if curr_chain_id is not None:
                await self.postgres_store.db_pool.runInteraction(
                    "_setup_event_auth_chain_id",
                    r,
                )
    
    ##############################################
    # Progress reporting
    ##############################################
    
    
    class Progress(object):
        """Used to report progress of the port

        This base class only records per-table progress; `done` is a no-op.
        """

        def __init__(self):
            # Maps table name -> dict with the keys "start", "num_done",
            # "total" and "perc".
            self.tables = {}

            # Whole seconds since the epoch at the time of construction.
            self.start_time = int(time.time())

        @staticmethod
        def _percentage(num_done, total):
            """Return `num_done` as a truncated integer percentage of `total`.

            A table with nothing to port (`total` of 0) is reported as 100%
            rather than raising ZeroDivisionError.
            """
            if not total:
                return 100
            return int(num_done * 100 / total)

        def add_table(self, table, cur, size):
            """Start tracking `table`.

            Args:
                table: Name of the table; used as the key in `self.tables`.
                cur: Number of rows already done when tracking starts.
                size: Total number of rows for the table.
            """
            self.tables[table] = {
                "start": cur,
                "num_done": cur,
                "total": size,
                "perc": self._percentage(cur, size),
            }

        def update(self, table, num_done):
            """Record that `num_done` rows of `table` are now done.

            Raises:
                KeyError: if `table` was never registered with `add_table`.
            """
            data = self.tables[table]
            data["num_done"] = num_done
            data["perc"] = self._percentage(num_done, data["total"])

        def done(self):
            """Hook called when reporting is finished; does nothing here."""
            pass
    
    
    class CursesProgress(Progress):
        """Reports progress to a curses window
        """
    
    Erik Johnston's avatar
    Erik Johnston committed
        def __init__(self, stdscr):