Last active
November 23, 2024 20:54
-
-
Save ahrm/c402a1664c769ac3a113901355983922 to your computer and use it in GitHub Desktop.
update sioyek database
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import argparse | |
import sqlite3 | |
from typing import Tuple, Any | |
import uuid | |
def create_arg_parser(): | |
parser = argparse.ArgumentParser( | |
prog='sioyek-database-migrator', | |
) | |
parser.add_argument('--old-shared-db', type=str, help='Path to the old shared.db file') | |
parser.add_argument('--new-shared-db', type=str, help='Path to the new shared.db file') | |
return parser | |
def get_tables(db_conn) -> list[str]: | |
cursor = db_conn.cursor() | |
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';") | |
tables = cursor.fetchall() | |
return [table[0] for table in tables] | |
def get_table_columns_with_types(db_conn, table_name) -> list[Tuple[str, str]]: | |
cursor = db_conn.cursor() | |
cursor.execute(f"PRAGMA table_info({table_name});") | |
columns = cursor.fetchall() | |
return [(column[1], column[2]) for column in columns] | |
def get_column_default_value(column_name, column_type): | |
if column_name == 'uuid': | |
return "'{" + str(uuid.uuid4()) + "}'" | |
if column_type == 'timestamp': | |
return 'CURRENT_TIMESTAMP' | |
if column_type == 'REAL': | |
if column_name.startswith('color'): | |
return 0 | |
if column_name == 'zoom_level': | |
return 1 | |
if column_name == 'src_offset_x' or column_name == 'src_offset_end_x' or column_name == 'src_offset_end_y': | |
return 'NULL' | |
else: | |
return -1 | |
if column_type == 'INTEGER': | |
return -1 | |
if column_type == 'BOOLEAN': | |
return 0 | |
if column_type == 'TEXT': | |
return "''" | |
return "''" | |
def migrate_table(old_conn, new_conn, table_name): | |
# copy the values from the old table to the new table | |
# if the column exists in the new table just copy the value from the old table, otherwise use get_column_default_value to get the default value | |
if table_name.startswith('sqlite'): | |
return | |
print(f'Migrating table {table_name}') | |
old_columns = get_table_columns_with_types(old_conn, table_name) | |
new_columns = get_table_columns_with_types(new_conn, table_name) | |
old_values: list[dict[str, str]] = [] | |
cursor = old_conn.cursor() | |
cursor.execute(f"SELECT * FROM {table_name}") | |
rows = cursor.fetchall() | |
for row in rows: | |
old_values.append({old_columns[i][0]: row[i] for i in range(len(row))}) | |
# ignore the id column | |
old_columns = [column for column in old_columns if column[0] != 'id'] | |
new_columns = [column for column in new_columns if column[0] != 'id'] | |
# insert the values into the new table | |
cursor = new_conn.cursor() | |
for old_value in old_values: | |
columns = ', '.join([column[0] for column in new_columns]) | |
values = ', '.join([f"'{old_value[column[0]]}'" if column[0] in old_value else f'{get_column_default_value(column[0], column[1])}' for column in new_columns]) | |
cursor.execute(f"INSERT OR REPLACE INTO {table_name} ({columns}) VALUES ({values})") | |
try: | |
new_conn.commit() | |
print(f'Migrated {len(old_values)} rows from {table_name}') | |
except Exception as e: | |
print(f'Error migrating table {table_name}: {e}') | |
def migrate(old_conn, new_conn): | |
tables = get_tables(old_conn) | |
for table in tables: | |
migrate_table(old_conn, new_conn, table) | |
if __name__ == "__main__": | |
parser = create_arg_parser() | |
args = parser.parse_args(sys.argv[1:]) | |
old_shared_db = args.old_shared_db | |
new_shared_db = args.new_shared_db | |
old_conn = sqlite3.connect(old_shared_db) | |
new_conn = sqlite3.connect(new_shared_db) | |
migrate(old_conn, new_conn) | |
old_conn.close() | |
new_conn.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment