Skip to content
Snippets Groups Projects
Commit 63677d1f authored by Erik Johnston's avatar Erik Johnston
Browse files

Change port script to work with postgres

parent 127fad17
No related branches found
No related tags found
No related merge requests found
...@@ -22,6 +22,7 @@ from synapse.storage.engines import create_engine ...@@ -22,6 +22,7 @@ from synapse.storage.engines import create_engine
import argparse import argparse
import itertools import itertools
import logging import logging
import types
import yaml import yaml
...@@ -51,6 +52,14 @@ UNICODE_COLUMNS = { ...@@ -51,6 +52,14 @@ UNICODE_COLUMNS = {
} }
BOOLEAN_COLUMNS = {
"events": ["processed", "outlier"],
"rooms": ["is_public"],
"event_edges": ["is_state"],
"presence_list": ["accepted"],
}
APPEND_ONLY_TABLES = [ APPEND_ONLY_TABLES = [
"event_content_hashes", "event_content_hashes",
"event_reference_hashes", "event_reference_hashes",
...@@ -126,24 +135,22 @@ class Store(object): ...@@ -126,24 +135,22 @@ class Store(object):
return self.db_pool.runWithConnection(r) return self.db_pool.runWithConnection(r)
def insert_many(self, table, headers, rows): def insert_many_txn(self, txn, table, headers, rows):
sql = "INSERT INTO %s (%s) VALUES (%s)" % ( sql = "INSERT INTO %s (%s) VALUES (%s)" % (
table, table,
", ".join(k for k in headers), ", ".join(k for k in headers),
", ".join("%s" for _ in headers) ", ".join("%s" for _ in headers)
) )
def t(txn): try:
try: txn.executemany(sql, rows)
txn.executemany(sql, rows) except:
except: logger.exception(
logger.exception( "Failed to insert: %s",
"Failed to insert: %s", table,
table, )
) raise
raise
return self.runInteraction("insert_many", t)
def chunks(n): def chunks(n):
...@@ -175,7 +182,7 @@ def handle_table(table, sqlite_store, mysql_store): ...@@ -175,7 +182,7 @@ def handle_table(table, sqlite_store, mysql_store):
"DELETE FROM port_from_sqlite3 WHERE table_name = %s", "DELETE FROM port_from_sqlite3 WHERE table_name = %s",
(table,) (table,)
) )
txn.execute("DELETE FROM %s" % (table,)) txn.execute("TRUNCATE %s CASCADE" % (table,))
mysql_store._simple_insert_txn( mysql_store._simple_insert_txn(
txn, txn,
table="port_from_sqlite3", table="port_from_sqlite3",
...@@ -188,14 +195,15 @@ def handle_table(table, sqlite_store, mysql_store): ...@@ -188,14 +195,15 @@ def handle_table(table, sqlite_store, mysql_store):
next_chunk = 0 next_chunk = 0
logger.info("next_chunk for %s: %d", table, next_chunk)
N = 5000 N = 5000
select = "SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?" % (table,) select = "SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?" % (table,)
uni_col_names = UNICODE_COLUMNS.get(table, []) uni_col_names = UNICODE_COLUMNS.get(table, [])
bool_col_names = BOOLEAN_COLUMNS.get(table, [])
def conv_uni(c): bin_col_names = BINARY_COLUMNS.get(table, [])
return sqlite_store.database_engine.load_unicode(c)
while True: while True:
def r(txn): def r(txn):
...@@ -211,24 +219,42 @@ def handle_table(table, sqlite_store, mysql_store): ...@@ -211,24 +219,42 @@ def handle_table(table, sqlite_store, mysql_store):
if rows: if rows:
uni_cols = [i for i, h in enumerate(headers) if h in uni_col_names] uni_cols = [i for i, h in enumerate(headers) if h in uni_col_names]
bool_cols = [i for i, h in enumerate(headers) if h in bool_col_names]
bin_cols = [i for i, h in enumerate(headers) if h in bin_col_names]
next_chunk = rows[-1][0] + 1 next_chunk = rows[-1][0] + 1
def conv(j, col):
if j in uni_cols:
col = sqlite_store.database_engine.load_unicode(col)
if j in bool_cols:
return bool(col)
if j in bin_cols:
if isinstance(col, types.UnicodeType):
col = buffer(col.encode("utf8"))
return col
for i, row in enumerate(rows): for i, row in enumerate(rows):
rows[i] = tuple( rows[i] = tuple(
mysql_store.database_engine.encode_parameter( mysql_store.database_engine.encode_parameter(
conv_uni(col) if j in uni_cols else col conv(j, col)
) )
for j, col in enumerate(row) for j, col in enumerate(row)
if j > 0 if j > 0
) )
yield mysql_store.insert_many(table, headers[1:], rows) def ins(txn):
mysql_store.insert_many_txn(txn, table, headers[1:], rows)
yield mysql_store._simple_update_one( mysql_store._simple_update_one_txn(
table="port_from_sqlite3", txn,
keyvalues={"table_name": table}, table="port_from_sqlite3",
updatevalues={"rowid": next_chunk}, keyvalues={"table_name": table},
) updatevalues={"rowid": next_chunk},
)
yield mysql_store.runInteraction("insert_many", ins)
else: else:
return return
...@@ -260,7 +286,7 @@ def main(sqlite_config, mysql_config): ...@@ -260,7 +286,7 @@ def main(sqlite_config, mysql_config):
) )
sqlite_engine = create_engine("sqlite3") sqlite_engine = create_engine("sqlite3")
mysql_engine = create_engine("mysql.connector") mysql_engine = create_engine("psycopg2")
sqlite_store = Store(sqlite_db_pool, sqlite_engine) sqlite_store = Store(sqlite_db_pool, sqlite_engine)
mysql_store = Store(mysql_db_pool, mysql_engine) mysql_store = Store(mysql_db_pool, mysql_engine)
...@@ -285,20 +311,19 @@ def main(sqlite_config, mysql_config): ...@@ -285,20 +311,19 @@ def main(sqlite_config, mysql_config):
logger.info("Found %d tables", len(tables)) logger.info("Found %d tables", len(tables))
def create_port_table(txn): def create_port_table(txn):
try: txn.execute(
txn.execute( "CREATE TABLE port_from_sqlite3 ("
"CREATE TABLE port_from_sqlite3 (" " table_name varchar(100) NOT NULL UNIQUE,"
" `table_name` varchar(100) NOT NULL UNIQUE," " rowid bigint NOT NULL"
" `rowid` bigint unsigned NOT NULL" ")"
")" )
)
except mysql_engine.module.DatabaseError as e:
if e.errno != mysql_engine.module.errorcode.ER_TABLE_EXISTS_ERROR:
raise
yield mysql_store.runInteraction( try:
"create_port_table", create_port_table yield mysql_store.runInteraction(
) "create_port_table", create_port_table
)
except Exception as e:
logger.info("Failed to create port table: %s", e)
# Process tables. # Process tables.
yield defer.gatherResults( yield defer.gatherResults(
...@@ -342,17 +367,12 @@ if __name__ == "__main__": ...@@ -342,17 +367,12 @@ if __name__ == "__main__":
} }
mysql_config = yaml.safe_load(args.mysql_config) mysql_config = yaml.safe_load(args.mysql_config)
mysql_config["args"].update({ # mysql_config["args"].update({
"sql_mode": "TRADITIONAL", # "sql_mode": "TRADITIONAL",
"charset": "utf8mb4", # "charset": "utf8mb4",
"use_unicode": True, # "use_unicode": True,
"collation": "utf8mb4_bin", # "collation": "utf8mb4_bin",
}) # })
import codecs
codecs.register(
lambda name: codecs.lookup('utf8') if name == "utf8mb4" else None
)
reactor.callWhenRunning( reactor.callWhenRunning(
main, main,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment