Newer
Older
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import curses
import logging
import sys
import time
import traceback

import yaml
from twisted.enterprise import adbapi
from twisted.internet import defer, reactor

from synapse.storage._base import LoggingTransaction, SQLBaseStore
from synapse.storage.engines import create_engine
# Module-level logger shared by the store wrapper and the porter.
logger = logging.getLogger("port_from_sqlite_to_postgres")

# Maps a table name to the columns whose values are passed through bool()
# before being inserted into PostgreSQL (see Porter.handle_table).
BOOLEAN_COLUMNS = {
    "events": ["processed", "outlier"],
    "rooms": ["is_public"],
    "event_edges": ["is_state"],
    "presence_list": ["accepted"],
}

# Tables for which an interrupted port is resumed from the rowid recorded in
# the port_from_sqlite3 bookkeeping table. Every table NOT listed here is
# truncated in PostgreSQL and copied again from the start.
APPEND_ONLY_TABLES = [
    "event_content_hashes",
    "event_reference_hashes",
    "event_signatures",
    "event_edge_hashes",
    "events",
    "event_json",
    "state_events",
    "room_memberships",
    "feedback",
    "topics",
    "room_names",
    "rooms",
    "local_media_repository",
    "local_media_repository_thumbnails",
    "remote_media_cache",
    "remote_media_cache_thumbnails",
    "redactions",
    "event_edges",
    "event_auth",
    "received_transactions",
    "sent_transactions",
    "transaction_id_to_pdu",
    "users",
    "state_groups",
    "state_groups_state",
    "event_to_state_groups",
    "rejections",
]
Erik Johnston
committed
# Holds the sys.exc_info() triple captured by Porter.run when the port fails;
# the script entry point prints it once the reactor has stopped.
end_error_exec_info = None
class Store(object):
    """This object is used to pull out some of the convenience API from the
    Storage layer.

    *All* database interactions should go through this object.

    The ``_simple_*`` helpers are borrowed directly from ``SQLBaseStore`` (as
    plain functions taken out of its ``__dict__``) so that they run against
    this object's ``runInteraction`` and ``database_engine``.
    """

    def __init__(self, db_pool, engine):
        """
        Args:
            db_pool: connection pool providing ``runWithConnection``.
            engine: database engine exposing ``module`` and ``is_deadlock``.
        """
        self.db_pool = db_pool
        self.database_engine = engine

    _simple_insert_txn = SQLBaseStore.__dict__["_simple_insert_txn"]
    _simple_insert = SQLBaseStore.__dict__["_simple_insert"]

    _simple_select_onecol_txn = SQLBaseStore.__dict__["_simple_select_onecol_txn"]
    _simple_select_onecol = SQLBaseStore.__dict__["_simple_select_onecol"]
    _simple_select_one_onecol = SQLBaseStore.__dict__["_simple_select_one_onecol"]
    _simple_select_one_onecol_txn = SQLBaseStore.__dict__["_simple_select_one_onecol_txn"]

    _simple_update_one = SQLBaseStore.__dict__["_simple_update_one"]
    _simple_update_one_txn = SQLBaseStore.__dict__["_simple_update_one_txn"]

    _execute_and_decode = SQLBaseStore.__dict__["_execute_and_decode"]

    def runInteraction(self, desc, func, *args, **kwargs):
        """Run ``func`` against a pooled connection, retrying on deadlock.

        Args:
            desc: short description used in log lines.
            func: callable invoked as ``func(txn, *args, **kwargs)`` where
                ``txn`` is a ``LoggingTransaction`` wrapping a fresh cursor.

        Returns:
            Whatever ``db_pool.runWithConnection`` returns for the wrapped
            callable.
        """
        def r(conn):
            try:
                i = 0
                N = 5
                while True:
                    try:
                        txn = conn.cursor()
                        return func(
                            LoggingTransaction(txn, desc, self.database_engine),
                            *args, **kwargs
                        )
                    except self.database_engine.module.DatabaseError as e:
                        if self.database_engine.is_deadlock(e):
                            logger.warning(
                                "[TXN DEADLOCK] {%s} %d/%d", desc, i, N
                            )
                            if i < N:
                                # Roll back and try the whole function again.
                                i += 1
                                conn.rollback()
                                continue
                        # Not a deadlock, or out of retries.
                        raise
            except Exception as e:
                logger.debug("[TXN FAIL] {%s} %s", desc, e)
                raise

        return self.db_pool.runWithConnection(r)

    def execute(self, f):
        """Run ``f(txn)`` as an interaction described by ``f.__name__``."""
        return self.runInteraction(f.__name__, f)

    def insert_many_txn(self, txn, table, headers, rows):
        """Bulk-insert ``rows`` into ``table`` via ``txn.executemany``.

        Args:
            txn: transaction/cursor object to execute on.
            table: name of the destination table.
            headers: column names, in the same order as each row's values.
            rows: sequence of value tuples.

        Any failure is logged with the table name and re-raised.
        """
        sql = "INSERT INTO %s (%s) VALUES (%s)" % (
            table,
            ", ".join(headers),
            ", ".join("%s" for _ in headers)
        )

        try:
            txn.executemany(sql, rows)
        except Exception:
            logger.exception(
                "Failed to insert: %s",
                table,
            )
            raise
Erik Johnston
committed
class Progress(object):
    """Used to report progress of the port.

    Keeps one record per table in ``self.tables``::

        {"start": <rows present at start>, "num_done": <rows copied so far>,
         "total": <rows to copy>, "perc": <integer percentage>}

    The stage hooks and ``done`` are no-ops here so that the base class can
    be used directly as a silent reporter; subclasses override them.
    """

    def __init__(self):
        self.tables = {}
        self.start_time = int(time.time())

    @staticmethod
    def _percentage(num_done, total):
        """Return the integer percentage of ``num_done`` out of ``total``.

        An empty table (``total`` of 0) counts as fully done instead of
        raising ZeroDivisionError.
        """
        if not total:
            return 100
        return int(num_done * 100 / total)

    def add_table(self, table, cur, size):
        """Start tracking ``table``, which has ``cur`` of ``size`` rows done."""
        self.tables[table] = {
            "start": cur,
            "num_done": cur,
            "total": size,
            "perc": self._percentage(cur, size),
        }

    def update(self, table, num_done):
        """Record that ``num_done`` rows of ``table`` have now been copied."""
        data = self.tables[table]
        data["num_done"] = num_done
        data["perc"] = self._percentage(num_done, data["total"])

    def on_prepare_sqlite(self):
        """Hook: the SQLite database is about to be prepared."""
        pass

    def on_prepare_postgres(self):
        """Hook: the PostgreSQL database is about to be prepared."""
        pass

    def fetching_tables(self):
        """Hook: the table lists are about to be fetched."""
        pass

    def preparing_tables(self):
        """Hook: the tables are about to be prepared for copying."""
        pass

    def done(self):
        """Hook: every table has been processed."""
        pass
Erik Johnston
committed
class CursesProgress(Progress):
    """Reports progress to a curses window.

    Draws a status line followed by one progress bar per table, least
    complete table first.
    """

    def __init__(self, stdscr):
        """
        Args:
            stdscr: the curses window to draw on (as handed over by
                ``curses.wrapper``).
        """
        self.stdscr = stdscr

        curses.use_default_colors()
        curses.curs_set(0)

        # Pair 1 colours unfinished tables, pair 2 finished ones.
        curses.init_pair(1, curses.COLOR_RED, -1)
        curses.init_pair(2, curses.COLOR_GREEN, -1)

        self.last_update = 0

        self.finished = False

        super(CursesProgress, self).__init__()

    def update(self, table, num_done):
        """Record the new row count for ``table`` and redraw."""
        super(CursesProgress, self).update(table, num_done)

        self.render()

    def render(self, force=False):
        """Redraw the whole window.

        Args:
            force: redraw even if the previous redraw happened less than
                0.2 seconds ago.
        """
        now = time.time()

        # Rate-limit redraws so that frequent updates don't flood the screen.
        if not force and now - self.last_update < 0.2:
            return

        self.stdscr.clear()

        rows, _ = self.stdscr.getmaxyx()

        duration = int(now) - int(self.start_time)

        minutes, seconds = divmod(duration, 60)
        duration_str = '%02dm %02ds' % (minutes, seconds,)

        if self.finished:
            status = "Time spent: %s (Done!)" % (duration_str,)
        else:
            # Percentage of *this run's* work done, per table; a table with
            # nothing left to copy counts as 100.
            percentages = [
                (v["num_done"] - v["start"]) * 100. / (v["total"] - v["start"])
                if v["total"] - v["start"] else 100
                for v in self.tables.values()
            ]
            min_perc = min(percentages) if percentages else 0

            if min_perc > 0:
                # Estimated total is duration * 100 / min_perc; subtract the
                # time already spent to get what is actually remaining.
                est_remaining = duration * (100 - min_perc) / min_perc
                est_remaining_str = '%02dm %02ds remaining' % divmod(est_remaining, 60)
            else:
                est_remaining_str = "Unknown"

            status = (
                "Time spent: %s (est. remaining: %s)"
                % (duration_str, est_remaining_str,)
            )

        self.stdscr.addstr(
            0, 0,
            status,
            curses.A_BOLD,
        )

        max_len = max(len(t) for t in self.tables) if self.tables else 0

        left_margin = 5
        middle_space = 1

        items = sorted(
            self.tables.items(),
            key=lambda i: (i[1]["perc"], i[0]),
        )

        for i, (table, data) in enumerate(items):
            # Stop once the window has no rows left to draw on.
            if i + 2 >= rows:
                break

            perc = data["perc"]

            color = curses.color_pair(2) if perc == 100 else curses.color_pair(1)

            # Table names are right-aligned against the progress bars.
            self.stdscr.addstr(
                i+2, left_margin + max_len - len(table),
                table,
                curses.A_BOLD | color,
            )

            size = 20

            filled = int(perc*size/100)
            progress = "[%s%s]" % (
                "#" * filled,
                " " * (size - filled),
            )

            self.stdscr.addstr(
                i+2, left_margin + max_len + middle_space,
                "%s %3d%% (%d/%d)" % (progress, perc, data["num_done"], data["total"]),
            )

        if self.finished:
            self.stdscr.addstr(
                rows - 1, 0,
                "Press any key to exit...",
            )

        self.stdscr.refresh()
        self.last_update = time.time()

    def done(self):
        """Mark the port finished, redraw, and wait for a key press."""
        self.finished = True
        self.render(True)
        self.stdscr.getch()

    def _show_message(self, message):
        """Clear the window and show ``message`` in bold on the first line."""
        self.stdscr.clear()
        self.stdscr.addstr(
            0, 0,
            message,
            curses.A_BOLD,
        )
        self.stdscr.refresh()

    def on_prepare_sqlite(self):
        self._show_message("Preparing SQLite database...")

    def on_prepare_postgres(self):
        self._show_message("Preparing PostgreSQL database...")

    def fetching_tables(self):
        self._show_message("Fetching tables...")

    def preparing_tables(self):
        self._show_message("Preparing tables...")
Erik Johnston
committed
class TerminalProgress(Progress):
    """Just prints progress to the terminal.

    Every ``print`` below is called with a single parenthesised string, which
    produces identical output under Python 2 and Python 3.
    """

    def update(self, table, num_done):
        """Record the new row count for ``table`` and print one status line."""
        super(TerminalProgress, self).update(table, num_done)

        data = self.tables[table]

        print("%s: %d%% (%d/%d)" % (
            table, data["perc"],
            data["num_done"], data["total"],
        ))

    def on_prepare_sqlite(self):
        print("Preparing SQLite database...")

    def on_prepare_postgres(self):
        print("Preparing PostgreSQL database...")

    def fetching_tables(self):
        print("Fetching tables...")

    def preparing_tables(self):
        print("Preparing tables...")
Erik Johnston
committed
class Porter(object):
    """Copies the tables of a SQLite database into a PostgreSQL database.

    The constructor stores every keyword argument as an attribute. The
    methods below read ``sqlite_config`` and ``postgres_config`` (dicts with
    ``"name"`` and ``"args"`` entries), ``progress`` (a ``Progress``-like
    reporter) and ``batch_size`` (rows copied per transaction).
    """

    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)

    @defer.inlineCallbacks
    def handle_table(self, table):
        """Copy one table from SQLite to PostgreSQL in batches.

        The next rowid to copy is tracked per table in the
        ``port_from_sqlite3`` table on the PostgreSQL side. Tables listed in
        ``APPEND_ONLY_TABLES`` resume from that rowid; every other table is
        truncated and copied again from rowid 1.
        """
        if table in APPEND_ONLY_TABLES:
            # It's safe to just carry on inserting.
            next_chunk = yield self.postgres_store._simple_select_one_onecol(
                table="port_from_sqlite3",
                keyvalues={"table_name": table},
                retcol="rowid",
                allow_none=True,
            )

            if next_chunk is None:
                # First time this table is seen: start tracking it.
                yield self.postgres_store._simple_insert(
                    table="port_from_sqlite3",
                    values={"table_name": table, "rowid": 1}
                )

                next_chunk = 1
        else:
            def delete_all(txn):
                txn.execute(
                    "DELETE FROM port_from_sqlite3 WHERE table_name = %s",
                    (table,)
                )
                txn.execute("TRUNCATE %s CASCADE" % (table,))
                self.postgres_store._simple_insert_txn(
                    txn,
                    table="port_from_sqlite3",
                    values={"table_name": table, "rowid": 0}
                )

            yield self.postgres_store.execute(delete_all)

            next_chunk = 1

        def get_table_size(txn):
            txn.execute("SELECT count(*) FROM %s" % (table,))
            size, = txn.fetchone()
            return int(size)

        table_size = yield self.sqlite_store.execute(get_table_size)
        postgres_size = yield self.postgres_store.execute(get_table_size)

        if not table_size:
            # Nothing to copy from an empty source table.
            return

        self.progress.add_table(table, postgres_size, table_size)

        # Runs against SQLite, hence the "?" placeholders.
        select = (
            "SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?"
            % (table,)
        )

        bool_col_names = BOOLEAN_COLUMNS.get(table, [])

        while True:
            def r(txn):
                txn.execute(select, (next_chunk, self.batch_size,))
                rows = txn.fetchall()
                headers = [column[0] for column in txn.description]

                return headers, rows

            headers, rows = yield self.sqlite_store.runInteraction("select", r)

            if rows:
                # Positions (within the selected row) of boolean columns.
                bool_cols = [
                    i for i, h in enumerate(headers) if h in bool_col_names
                ]
                # Column 0 is the rowid; resume after the last row fetched.
                next_chunk = rows[-1][0] + 1

                def conv(j, col):
                    if j in bool_cols:
                        return bool(col)
                    return col

                for i, row in enumerate(rows):
                    # Drop the leading rowid column and encode the rest.
                    rows[i] = tuple(
                        self.postgres_store.database_engine.encode_parameter(
                            conv(j, col)
                        )
                        for j, col in enumerate(row)
                        if j > 0
                    )

                def insert(txn):
                    # Insert the batch and advance the bookmark in the same
                    # transaction.
                    self.postgres_store.insert_many_txn(
                        txn, table, headers[1:], rows
                    )

                    self.postgres_store._simple_update_one_txn(
                        txn,
                        table="port_from_sqlite3",
                        keyvalues={"table_name": table},
                        updatevalues={"rowid": next_chunk},
                    )

                yield self.postgres_store.execute(insert)

                postgres_size += len(rows)

                self.progress.update(table, postgres_size)
            else:
                return

    def setup_db(self, db_config, database_engine):
        """Connect using ``db_config["args"]`` and prepare the database.

        Keys starting with ``cp_`` are left out of the connect call, since
        they are connection-pool options rather than connection arguments.
        """
        db_conn = database_engine.module.connect(
            **{
                k: v for k, v in db_config.get("args", {}).items()
                if not k.startswith("cp_")
            }
        )

        database_engine.prepare_database(db_conn)

        db_conn.commit()

    @defer.inlineCallbacks
    def run(self):
        """Port every table present in both databases, then stop the reactor.

        Any failure is logged and stashed in the module-level
        ``end_error_exec_info`` so that it can be reported after the reactor
        has stopped.
        """
        try:
            sqlite_db_pool = adbapi.ConnectionPool(
                self.sqlite_config["name"],
                **self.sqlite_config["args"]
            )

            postgres_db_pool = adbapi.ConnectionPool(
                self.postgres_config["name"],
                **self.postgres_config["args"]
            )

            sqlite_engine = create_engine("sqlite3")
            postgres_engine = create_engine("psycopg2")

            self.sqlite_store = Store(sqlite_db_pool, sqlite_engine)
            self.postgres_store = Store(postgres_db_pool, postgres_engine)

            # Step 1. Set up databases.
            # Use the configs given to this instance, not module globals.
            self.progress.on_prepare_sqlite()
            self.setup_db(self.sqlite_config, sqlite_engine)

            self.progress.on_prepare_postgres()
            self.setup_db(self.postgres_config, postgres_engine)

            # Step 2. Get tables.
            self.progress.fetching_tables()
            sqlite_tables = yield self.sqlite_store._simple_select_onecol(
                table="sqlite_master",
                keyvalues={
                    "type": "table",
                },
                retcol="name",
            )

            postgres_tables = yield self.postgres_store._simple_select_onecol(
                table="information_schema.tables",
                keyvalues={
                    "table_schema": "public",
                },
                retcol="distinct table_name",
            )

            # Only tables that exist on both sides can be ported.
            tables = set(sqlite_tables) & set(postgres_tables)

            self.progress.preparing_tables()

            logger.info("Found %d tables", len(tables))

            def create_port_table(txn):
                txn.execute(
                    "CREATE TABLE port_from_sqlite3 ("
                    " table_name varchar(100) NOT NULL UNIQUE,"
                    " rowid bigint NOT NULL"
                    ")"
                )

            try:
                yield self.postgres_store.runInteraction(
                    "create_port_table", create_port_table
                )
            except Exception as e:
                # Best effort: creation fails if the table already exists
                # from an earlier run.
                logger.info("Failed to create port table: %s", e)

            # Process tables.
            yield defer.gatherResults(
                [
                    self.handle_table(table)
                    for table in tables
                    if table not in ["schema_version", "applied_schema_deltas"]
                    and not table.startswith("sqlite_")
                ],
                consumeErrors=True,
            )

            self.progress.done()
        except:  # noqa: E722
            # Deliberately catch everything: the error is reported by the
            # script entry point after the reactor has stopped.
            global end_error_exec_info
            end_error_exec_info = sys.exc_info()
            logger.exception("")
        finally:
            reactor.stop()
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Port a SQLite database to PostgreSQL.",
    )
    parser.add_argument(
        "-v", action='store_true',
        help="Log at DEBUG level instead of INFO.",
    )
    parser.add_argument(
        "--curses", action='store_true',
        help="Show progress in a curses UI; logs go to port-synapse.log.",
    )
    # Both databases are mandatory: fail with a usage error up front rather
    # than with an obscure error once the port has started.
    parser.add_argument(
        "--sqlite-database", required=True,
        help="Path of the SQLite database to read from.",
    )
    parser.add_argument(
        "--postgres-config", type=argparse.FileType('r'), required=True,
        help="YAML file with the 'name' and 'args' of the PostgreSQL"
             " connection pool.",
    )
    parser.add_argument(
        "--batch-size", type=int, default=1000,
        help="Number of rows copied per transaction.",
    )

    args = parser.parse_args()

    logging_config = {
        "level": logging.DEBUG if args.v else logging.INFO,
        "format": "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s"
    }

    if args.curses:
        # The terminal belongs to curses, so send log output to a file.
        logging_config["filename"] = "port-synapse.log"

    logging.basicConfig(**logging_config)

    sqlite_config = {
        "name": "sqlite3",
        "args": {
            "database": args.sqlite_database,
            "cp_min": 1,
            "cp_max": 1,
            "check_same_thread": False,
        },
    }

    postgres_config = yaml.safe_load(args.postgres_config)

    def start(stdscr=None):
        """Build the porter and run the reactor until the port completes."""
        if stdscr:
            progress = CursesProgress(stdscr)
        else:
            progress = TerminalProgress()

        porter = Porter(
            sqlite_config=sqlite_config,
            postgres_config=postgres_config,
            progress=progress,
            batch_size=args.batch_size,
        )

        reactor.callWhenRunning(porter.run)

        reactor.run()

    if args.curses:
        curses.wrapper(start)
    else:
        start()

    if end_error_exec_info:
        # Printed only now so that the traceback is not lost when curses
        # restores the terminal.
        exc_type, exc_value, exc_traceback = end_error_exec_info
        traceback.print_exception(exc_type, exc_value, exc_traceback)
        # Signal the failure to the calling shell.
        sys.exit(1)