Newer
Older
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer, reactor
from twisted.enterprise import adbapi
from synapse.storage._base import LoggingTransaction, SQLBaseStore
from synapse.storage.engines import create_engine
import argparse
Erik Johnston
committed
import curses
Erik Johnston
committed
import sys
import time
import traceback
logger = logging.getLogger("port_from_sqlite_to_postgres")
BOOLEAN_COLUMNS = {
"events": ["processed", "outlier"],
"rooms": ["is_public"],
"event_edges": ["is_state"],
"presence_list": ["accepted"],
}
APPEND_ONLY_TABLES = [
"event_content_hashes",
"event_reference_hashes",
"event_signatures",
"event_edge_hashes",
"events",
"event_json",
"state_events",
"room_memberships",
"feedback",
"topics",
"room_names",
"rooms",
"local_media_repository",
"local_media_repository_thumbnails",
"remote_media_cache",
"remote_media_cache_thumbnails",
"redactions",
"event_edges",
"event_auth",
"received_transactions",
"sent_transactions",
"transaction_id_to_pdu",
"users",
"state_groups",
"state_groups_state",
"event_to_state_groups",
"rejections",
]
Erik Johnston
committed
end_error_exec_info = None
Erik Johnston
committed
"""This object is used to pull out some of the convenience API from the
Storage layer.
*All* database interactions should go through this object.
"""
def __init__(self, db_pool, engine):
self.db_pool = db_pool
self.database_engine = engine
_simple_insert_txn = SQLBaseStore.__dict__["_simple_insert_txn"]
_simple_insert = SQLBaseStore.__dict__["_simple_insert"]
_simple_select_onecol_txn = SQLBaseStore.__dict__["_simple_select_onecol_txn"]
_simple_select_onecol = SQLBaseStore.__dict__["_simple_select_onecol"]
_simple_select_one_onecol = SQLBaseStore.__dict__["_simple_select_one_onecol"]
_simple_select_one_onecol_txn = SQLBaseStore.__dict__["_simple_select_one_onecol_txn"]
_simple_update_one = SQLBaseStore.__dict__["_simple_update_one"]
_simple_update_one_txn = SQLBaseStore.__dict__["_simple_update_one_txn"]
_execute_and_decode = SQLBaseStore.__dict__["_execute_and_decode"]
def runInteraction(self, desc, func, *args, **kwargs):
def r(conn):
try:
i = 0
N = 5
while True:
try:
txn = conn.cursor()
return func(
LoggingTransaction(txn, desc, self.database_engine),
*args, **kwargs
)
except self.database_engine.module.DatabaseError as e:
if self.database_engine.is_deadlock(e):
logger.warn("[TXN DEADLOCK] {%s} %d/%d", desc, i, N)
if i < N:
i += 1
conn.rollback()
continue
raise
except Exception as e:
Erik Johnston
committed
logger.debug("[TXN FAIL] {%s} %s", desc, e)
raise
return self.db_pool.runWithConnection(r)
def execute(self, f, *args, **kwargs):
return self.runInteraction(f.__name__, f, *args, **kwargs)
Erik Johnston
committed
def insert_many_txn(self, txn, table, headers, rows):
sql = "INSERT INTO %s (%s) VALUES (%s)" % (
table,
", ".join(k for k in headers),
", ".join("%s" for _ in headers)
)
try:
txn.executemany(sql, rows)
except:
logger.exception(
"Failed to insert: %s",
table,
)
raise
Erik Johnston
committed
class Progress(object):
"""Used to report progress of the port
"""
def __init__(self):
self.tables = {}
self.start_time = int(time.time())
def add_table(self, table, cur, size):
self.tables[table] = {
"start": cur,
"num_done": cur,
"total": size,
"perc": int(cur * 100 / size),
}
def update(self, table, num_done):
data = self.tables[table]
data["num_done"] = num_done
data["perc"] = int(num_done * 100 / data["total"])
def done(self):
pass
Erik Johnston
committed
class CursesProgress(Progress):
"""Reports progress to a curses window
"""
def __init__(self, stdscr):
self.stdscr = stdscr
Erik Johnston
committed
curses.use_default_colors()
curses.curs_set(0)
Erik Johnston
committed
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
curses.init_pair(1, curses.COLOR_RED, -1)
curses.init_pair(2, curses.COLOR_GREEN, -1)
self.last_update = 0
self.finished = False
super(CursesProgress, self).__init__()
def update(self, table, num_done):
super(CursesProgress, self).update(table, num_done)
self.render()
def render(self, force=False):
now = time.time()
if not force and now - self.last_update < 0.2:
# reactor.callLater(1, self.render)
return
self.stdscr.clear()
rows, cols = self.stdscr.getmaxyx()
duration = int(now) - int(self.start_time)
minutes, seconds = divmod(duration, 60)
duration_str = '%02dm %02ds' % (minutes, seconds,)
if self.finished:
status = "Time spent: %s (Done!)" % (duration_str,)
else:
min_perc = min(
(v["num_done"] - v["start"]) * 100. / (v["total"] - v["start"])
if v["total"] - v["start"] else 100
for v in self.tables.values()
)
if min_perc > 0:
est_remaining = (int(now) - self.start_time) * 100 / min_perc
est_remaining_str = '%02dm %02ds remaining' % divmod(est_remaining, 60)
else:
est_remaining_str = "Unknown"
status = (
"Time spent: %s (est. remaining: %s)"
% (duration_str, est_remaining_str,)
)
self.stdscr.addstr(
0, 0,
status,
curses.A_BOLD,
Erik Johnston
committed
max_len = max([len(t) for t in self.tables.keys()])
left_margin = 5
middle_space = 1
items = self.tables.items()
items.sort(
key=lambda i: (i[1]["perc"], i[0]),
)
for i, (table, data) in enumerate(items):
if i + 2 >= rows:
break
perc = data["perc"]
color = curses.color_pair(2) if perc == 100 else curses.color_pair(1)
self.stdscr.addstr(
i+2, left_margin + max_len - len(table),
table,
curses.A_BOLD | color,
Erik Johnston
committed
size = 20
progress = "[%s%s]" % (
"#" * int(perc*size/100),
" " * (size - int(perc*size/100)),
Erik Johnston
committed
self.stdscr.addstr(
i+2, left_margin + max_len + middle_space,
"%s %3d%% (%d/%d)" % (progress, perc, data["num_done"], data["total"]),
Erik Johnston
committed
if self.finished:
self.stdscr.addstr(
Erik Johnston
committed
"Press any key to exit...",
)
Erik Johnston
committed
self.stdscr.refresh()
self.last_update = time.time()
Erik Johnston
committed
def done(self):
self.finished = True
self.render(True)
self.stdscr.getch()
Erik Johnston
committed
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
def on_prepare_sqlite(self):
self.stdscr.clear()
self.stdscr.addstr(
0, 0,
"Preparing SQLite database...",
curses.A_BOLD,
)
self.stdscr.refresh()
def on_prepare_postgres(self):
self.stdscr.clear()
self.stdscr.addstr(
0, 0,
"Preparing PostgreSQL database...",
curses.A_BOLD,
)
self.stdscr.refresh()
def fetching_tables(self):
self.stdscr.clear()
self.stdscr.addstr(
0, 0,
"Fetching tables...",
curses.A_BOLD,
)
self.stdscr.refresh()
def preparing_tables(self):
self.stdscr.clear()
self.stdscr.addstr(
0, 0,
"Preparing tables...",
curses.A_BOLD,
)
self.stdscr.refresh()
Erik Johnston
committed
class TerminalProgress(Progress):
"""Just prints progress to the terminal
"""
def update(self, table, num_done):
super(TerminalProgress, self).update(table, num_done)
Erik Johnston
committed
data = self.tables[table]
Erik Johnston
committed
print "%s: %d%% (%d/%d)" % (
table, data["perc"],
data["num_done"], data["total"],
)
Erik Johnston
committed
def on_prepare_sqlite(self):
print "Preparing SQLite database..."
Erik Johnston
committed
def on_prepare_postgres(self):
print "Preparing PostgreSQL database..."
Erik Johnston
committed
def fetching_tables(self):
print "Fetching tables..."
Erik Johnston
committed
def preparing_tables(self):
print "Preparing tables..."
Erik Johnston
committed
class Porter(object):
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
def convert_rows(self, table, headers, rows):
bool_col_names = BOOLEAN_COLUMNS.get(table, [])
bool_cols = [
i for i, h in enumerate(headers) if h in bool_col_names
]
def conv(j, col):
if j in bool_cols:
return bool(col)
return col
for i, row in enumerate(rows):
rows[i] = tuple(
self.postgres_store.database_engine.encode_parameter(
conv(j, col)
)
for j, col in enumerate(row)
if j > 0
)
Erik Johnston
committed
@defer.inlineCallbacks
def handle_table(self, table):
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
def delete_all(txn):
txn.execute(
"DELETE FROM port_from_sqlite3 WHERE table_name = %s",
(table,)
)
txn.execute("TRUNCATE %s CASCADE" % (table,))
def get_table_size(txn):
txn.execute("SELECT count(*) FROM %s" % (table,))
size, = txn.fetchone()
return int(size)
if table == "sent_transactions":
# This is a big table, and we really only need some of the recent
# data
yield self.postgres_store.execute(delete_all)
# Only save things from the last day
yesterday = 1429114568820 #int(time.time()*1000) - 86400000
# And save the max transaction id from each destination
select = (
"SELECT rowid, * FROM sent_transactions WHERE rowid IN ("
"SELECT max(rowid) FROM sent_transactions"
" GROUP BY destination"
")"
)
def r(txn):
txn.execute(select)
rows = txn.fetchall()
headers = [column[0] for column in txn.description]
ts_ind = headers.index('ts')
return headers, [r for r in rows if r[ts_ind] < yesterday]
headers, rows = yield self.sqlite_store.runInteraction(
"select", r,
)
self.convert_rows(table, headers, rows)
inserted_rows = len(rows)
max_inserted_rowid = max(r[0] for r in rows)
def insert(txn):
self.postgres_store.insert_many_txn(
txn, table, headers[1:], rows
)
yield self.postgres_store.execute(insert)
def get_start_id(txn):
txn.execute(
"SELECT rowid FROM sent_transactions WHERE ts >= ?"
" ORDER BY rowid ASC LIMIT 1",
(yesterday,)
)
rows = txn.fetchall()
if rows:
return rows[0][0]
else:
return 1
next_chunk = yield self.sqlite_store.execute(get_start_id)
next_chunk = max(max_inserted_rowid + 1, next_chunk)
yield self.postgres_store._simple_insert(
table="port_from_sqlite3",
values={"table_name": table, "rowid": next_chunk}
)
def get_sent_table_size(txn):
txn.execute(
"SELECT count(*) FROM sent_transactions"
" WHERE ts >= ?",
(yesterday,)
)
size, = txn.fetchone()
return int(size)
table_size = yield self.sqlite_store.execute(
get_sent_table_size
)
table_size += inserted_rows
elif table in APPEND_ONLY_TABLES:
Erik Johnston
committed
# It's safe to just carry on inserting.
next_chunk = yield self.postgres_store._simple_select_one_onecol(
table="port_from_sqlite3",
keyvalues={"table_name": table},
retcol="rowid",
allow_none=True,
)
Erik Johnston
committed
if next_chunk is None:
yield self.postgres_store._simple_insert(
table="port_from_sqlite3",
values={"table_name": table, "rowid": 1}
)
Erik Johnston
committed
next_chunk = 1
table_size = yield self.sqlite_store.execute(get_table_size)
Erik Johnston
committed
else:
yield self.postgres_store.execute(delete_all)
self.postgres_store._simple_insert(
table="port_from_sqlite3",
values={"table_name": table, "rowid": 0}
)
table_size = yield self.sqlite_store.execute(get_table_size)
Erik Johnston
committed
next_chunk = 1
postgres_size = yield self.postgres_store.execute(get_table_size)
if not table_size:
Erik Johnston
committed
self.progress.add_table(table, postgres_size, table_size)
Erik Johnston
committed
select = (
"SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?"
% (table,)
)
Erik Johnston
committed
while True:
def r(txn):
txn.execute(select, (next_chunk, self.batch_size,))
rows = txn.fetchall()
headers = [column[0] for column in txn.description]
Erik Johnston
committed
return headers, rows
Erik Johnston
committed
headers, rows = yield self.sqlite_store.runInteraction("select", r)
Erik Johnston
committed
if rows:
next_chunk = rows[-1][0] + 1
self.convert_rows(table, headers, rows)
Erik Johnston
committed
def insert(txn):
self.postgres_store.insert_many_txn(
txn, table, headers[1:], rows
)
Erik Johnston
committed
self.postgres_store._simple_update_one_txn(
txn,
table="port_from_sqlite3",
keyvalues={"table_name": table},
updatevalues={"rowid": next_chunk},
)
Erik Johnston
committed
yield self.postgres_store.execute(insert)
Erik Johnston
committed
postgres_size += len(rows)
Erik Johnston
committed
self.progress.update(table, postgres_size)
else:
return
def setup_db(self, db_config, database_engine):
db_conn = database_engine.module.connect(
**{
k: v for k, v in db_config.get("args", {}).items()
if not k.startswith("cp_")
}
Erik Johnston
committed
database_engine.prepare_database(db_conn)
Erik Johnston
committed
db_conn.commit()
Erik Johnston
committed
@defer.inlineCallbacks
def run(self):
Erik Johnston
committed
sqlite_db_pool = adbapi.ConnectionPool(
self.sqlite_config["name"],
**self.sqlite_config["args"]
Erik Johnston
committed
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
postgres_db_pool = adbapi.ConnectionPool(
self.postgres_config["name"],
**self.postgres_config["args"]
)
sqlite_engine = create_engine("sqlite3")
postgres_engine = create_engine("psycopg2")
self.sqlite_store = Store(sqlite_db_pool, sqlite_engine)
self.postgres_store = Store(postgres_db_pool, postgres_engine)
# Step 1. Set up databases.
self.progress.on_prepare_sqlite()
self.setup_db(sqlite_config, sqlite_engine)
self.progress.on_prepare_postgres()
self.setup_db(postgres_config, postgres_engine)
# Step 2. Get tables.
self.progress.fetching_tables()
sqlite_tables = yield self.sqlite_store._simple_select_onecol(
table="sqlite_master",
keyvalues={
"type": "table",
},
retcol="name",
)
postgres_tables = yield self.postgres_store._simple_select_onecol(
table="information_schema.tables",
keyvalues={
"table_schema": "public",
},
retcol="distinct table_name",
)
tables = set(sqlite_tables) & set(postgres_tables)
self.progress.preparing_tables()
logger.info("Found %d tables", len(tables))
def create_port_table(txn):
txn.execute(
"CREATE TABLE port_from_sqlite3 ("
" table_name varchar(100) NOT NULL UNIQUE,"
" rowid bigint NOT NULL"
")"
)
try:
yield self.postgres_store.runInteraction(
"create_port_table", create_port_table
)
except Exception as e:
logger.info("Failed to create port table: %s", e)
# Process tables.
yield defer.gatherResults(
[
self.handle_table(table)
for table in tables
if table not in ["schema_version", "applied_schema_deltas"]
and not table.startswith("sqlite_")
],
consumeErrors=True,
)
self.progress.done()
except:
global end_error_exec_info
end_error_exec_info = sys.exc_info()
logger.exception("")
finally:
reactor.stop()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
Erik Johnston
committed
parser.add_argument("-v", action='store_true')
parser.add_argument("--curses", action='store_true')
parser.add_argument("--sqlite-database")
parser.add_argument(
"--postgres-config", type=argparse.FileType('r'),
Erik Johnston
committed
parser.add_argument("--batch-size", type=int, default=1000)
args = parser.parse_args()
Erik Johnston
committed
logging_config = {
"level": logging.DEBUG if args.v else logging.INFO,
"format": "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s"
}
if args.curses:
logging_config["filename"] = "port-synapse.log"
logging.basicConfig(**logging_config)
sqlite_config = {
"name": "sqlite3",
"args": {
"database": args.sqlite_database,
"cp_min": 1,
"cp_max": 1,
"check_same_thread": False,
},
}
postgres_config = yaml.safe_load(args.postgres_config)
Erik Johnston
committed
def start(stdscr=None):
if stdscr:
progress = CursesProgress(stdscr)
else:
progress = TerminalProgress()
porter = Porter(
sqlite_config=sqlite_config,
postgres_config=postgres_config,
progress=progress,
batch_size=args.batch_size,
)
reactor.callWhenRunning(porter.run)
reactor.run()
if args.curses:
curses.wrapper(start)
else:
start()
Erik Johnston
committed
if end_error_exec_info:
exc_type, exc_value, exc_traceback = end_error_exec_info
traceback.print_exception(exc_type, exc_value, exc_traceback)