DB Migration Tool
*.ini
[default]
user = user
password = password
host = localhost
database = edxapp_event
import mysql.connector as connector
import mysql.connector.errors as errors
import re
import csv
from os import path
from datetime import datetime as dt
import configparser


def current_ts():
    return dt.strftime(dt.utcnow(), "%Y-%m-%d %H:%M:%S")


def _reload_course_id_dict():
    cursor.execute("SELECT `course_id_index`,`course_id` FROM `edxapp_event`.`course_ids`;")
    course_id_dict = {}
    for course_id_index, course_id in cursor:
        course_id_dict[course_id] = course_id_index
    return course_id_dict
# check / create course_ids table
def course_ids_table():
    global course_dict
    try:
        cursor.execute("DESCRIBE course_ids;")
        cursor.fetchall()
    except errors.ProgrammingError:
        print("Table `course_ids` doesn't exist, creating")
        # we create this table
        query = """CREATE TABLE `course_ids` (
            `course_id_index` smallint(5) unsigned NOT NULL AUTO_INCREMENT,
            `course_id` varchar(64) NOT NULL,
            `course_id_last_update` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
            PRIMARY KEY (`course_id_index`),
            UNIQUE KEY `course_id` (`course_id`)
        ) ENGINE=InnoDB CHARSET=utf8mb4"""
        cursor.execute(query)
    print("Good! Table `course_ids` exists, continuing")

    # we fill in the course_ids table first
    # get existing course_id values in course_ids table
    course_dict = _reload_course_id_dict()
    print("Found {} existing course ids in table `course_ids`\n".format(len(course_dict.keys())))

    # get course_id values
    cursor.execute("SELECT DISTINCT `course_id` FROM `edxapp_event`.`tracking`;")
    course_id_rp = re.compile(r"course-v1:[^+]+\+[^+]+\+[^+]+$")
    course_ids = []
    for course_id, in cursor:
        if not course_id_rp.match(course_id):
            # print('Illegal cid: "{}"'.format(course_id))
            continue
        course_id_index = course_dict.get(course_id)
        if not course_id_index:
            course_ids.append(course_id)
    print("\n{} new course ids to insert".format(len(course_ids)))

    # fill course_ids table
    if course_ids:
        query = "INSERT INTO course_ids (course_id) VALUES {};".format(
            ','.join(map(lambda x: "(%s)", course_ids)))
        cursor.execute(query, course_ids)
        cnx.commit()
        print("Insert to table `course_ids` complete")

    # reload course_id_dict
    course_dict = _reload_course_id_dict()
    print("Reloaded {} course ids from table `course_ids`\n".format(len(course_dict.keys())))
def _create_tmp_table(table_name, tmp_table):
    """
    :return: bool, True if the tmp table was just created
    """
    # create new table
    if table_name == "tracking":
        create_query = """CREATE TABLE `{}` (
            `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
            `user_id` int(10) unsigned NOT NULL,
            `course_id_index` smallint(5) unsigned DEFAULT NULL,
            `time` datetime NOT NULL,
            PRIMARY KEY (`id`),
            KEY `user_id` (`user_id`)
        ) ENGINE=MyISAM DEFAULT CHARSET=utf8;""".format(tmp_table)
    elif table_name == "tracking_video":
        create_query = """CREATE TABLE `{}` (
            `id` INT(11) UNSIGNED NOT NULL AUTO_INCREMENT,
            `user_id` INT(10) UNSIGNED NOT NULL,
            `event_type` enum('ping_video','play_video','pause_video','seek_video','stop_video','load_video','video_show_cc_menu','video_hide_cc_menu','speed_change_video','ping_video_dash') DEFAULT NULL,
            `time` DATETIME NOT NULL,
            `course_id_index` smallint(5) unsigned DEFAULT NULL,
            `unit_id` VARCHAR(128) NOT NULL,
            `current_time` MEDIUMINT(9) UNSIGNED NOT NULL,
            `old_time` MEDIUMINT(9) UNSIGNED NOT NULL,
            `new_time` MEDIUMINT(9) UNSIGNED NOT NULL,
            PRIMARY KEY (`id`),
            KEY `user_id` (`user_id`),
            KEY `unit_id` (`unit_id`),
            KEY `time` (`time`)
        ) ENGINE=MyISAM DEFAULT CHARSET=utf8;""".format(tmp_table)
    else:
        raise Exception("Wrong table name {}".format(table_name))
    try:
        cursor.execute("DESCRIBE {};".format(tmp_table))
        cursor.fetchall()
        return False
    except errors.ProgrammingError:
        print("Table `{}` doesn't exist, creating".format(tmp_table))
        cursor.execute(create_query)
        print("Table `{}` created".format(tmp_table))
        return True
def copy_table_to_tmp(dump_filename, tmp_table, table, old_columns, new_columns, read_bulk_count=100000,
                      write_bulk_count=50000):
    """
    Not only copies, but also replaces course_id with course_id_index
    :param old_columns: list, the first column must be course_id
    :param new_columns: list, the first column must be course_id_index. The length should match old_columns
    """
    column_length = len(new_columns)
    # prepare columns by quoting them
    old_columns = list(map(lambda x: "`{}`".format(x), old_columns))
    new_columns = list(map(lambda x: "`{}`".format(x), new_columns))
    # initiate with fallback value of 0
    last_old_id = 0
    if not _create_tmp_table(table, tmp_table):
        # table already exists, check existing records
        cursor.execute("SELECT {} FROM {} ORDER BY `id` DESC LIMIT 1".format(','.join(new_columns), tmp_table))
        record = cursor.fetchone()
        if record:
            # we only do the following if there is a record in the tmp table
            record = list(record)
            course_id_index = record[0]
            # reverse lookup: find the course_id mapped to this course_id_index
            for course_id, this_course_id_index in course_dict.items():
                if course_id_index == this_course_id_index:
                    break
            else:
                raise Exception("course_id_index {} not found in `course_ids`".format(course_id_index))
            # check where we left off and continue
            record[0] = course_id
            where_clause_list = []
            for column in old_columns:
                where_clause_list.append("{}=%s".format(column))
            query = "SELECT `id` FROM {} WHERE {} ORDER BY `id` LIMIT 1".format(table, ' AND '.join(where_clause_list))
            cursor.execute(query, record)
            try:
                last_old_id = cursor.fetchone()[0]
                print("Last copied record is `id`={} in table `{}`".format(last_old_id, table))
            except TypeError:
                print(
                    "WARNING: last matching record not found for table `{}`. Fine, starting from the beginning".format(table))
    if path.isfile(dump_filename):
        print("WARNING: {} file exists, overwriting".format(dump_filename))
    where_clause = " WHERE `id`>{}".format(last_old_id) if last_old_id else ""
    # process and dump tracking table data
    cursor.execute("SELECT {} FROM {}{};".format(','.join(old_columns), table, where_clause))
    with open(dump_filename, 'w') as f:
        writer = csv.writer(f)
        write_buf = []
        count = 0
        for row in cursor:
            row = list(row)  # tuple does not support item assignment
            cid_index = course_dict.get(row[0])
            if cid_index:
                row[0] = cid_index
                write_buf.append(row)
            else:
                # invalid course_id
                continue
            count += 1
            if count % read_bulk_count == 0:
                writer.writerows(write_buf)
                write_buf = []
                print("{} - Dumped {} rows for {}".format(current_ts(), count, table))
        if write_buf:
            writer.writerows(write_buf)
    print("{} - Finished dumping {} rows for {}".format(current_ts(), count, table))
    # fill tracking_tmp table with records
    insert_data_buf = []
    cursor.execute("LOCK TABLES `{}` WRITE;".format(tmp_table))
    with open(dump_filename, 'r') as f:
        reader = csv.reader(f)
        count = 0
        total_count = 0
        for row in reader:
            insert_data_buf.extend(row)
            count += 1
            total_count += 1
            if count >= write_bulk_count:
                # make sure nothing weird happened before inserting
                assert count == len(insert_data_buf) / column_length
                query = "INSERT INTO {} ({}) VALUES {};".format(tmp_table, ','.join(new_columns), ','.join(
                    ["({})".format(','.join(['%s'] * column_length))] * count))
                try:
                    cursor.execute(query, insert_data_buf)
                except errors.ProgrammingError as e:
                    print(query)
                    print(insert_data_buf)
                    raise e
                print("{} - Inserted {} rows to {}".format(current_ts(), total_count, tmp_table))
                count = 0
                insert_data_buf = []
        if count:
            # insert leftover rows
            # make sure nothing weird happened before inserting
            assert count == len(insert_data_buf) / column_length
            query = "INSERT INTO {} ({}) VALUES {};".format(tmp_table, ','.join(new_columns), ','.join(
                ["({})".format(','.join(['%s'] * column_length))] * count))
            cursor.execute(query, insert_data_buf)
    print("{} - Finished inserting {} rows to {}".format(current_ts(), total_count, tmp_table))
    cursor.execute("UNLOCK TABLES;")
    cnx.commit()
########################################################################################################################
################################################## RUN ################################################
########################################################################################################################
print("Script starts at {}".format(current_ts()))
config = configparser.ConfigParser()
config.read("config.ini")
default_section = config.sections()[0]
config_root = config[default_section]
cnx = connector.connect(user=config_root.get('user'), password=config_root.get('password'),
                        host=config_root.get('host'), database=config_root.get('database'))
cursor = cnx.cursor()
course_dict = {}
course_ids_table()

# tracking_table
copy_table_to_tmp(dump_filename="tracking.txt", tmp_table="tracking_tmp", table="tracking",
                  old_columns=['course_id', 'user_id', 'time'],
                  new_columns=['course_id_index', 'user_id', 'time'], read_bulk_count=1000000, write_bulk_count=50000)

# tracking_video_table
copy_table_to_tmp(dump_filename="tracking_video.txt", tmp_table="tracking_video_tmp", table="tracking_video",
                  old_columns=['course_id', 'user_id', 'event_type', 'time', 'unit_id', 'current_time', 'old_time',
                               'new_time'],
                  new_columns=['course_id_index', 'user_id', 'event_type', 'time', 'unit_id', 'current_time',
                               'old_time', 'new_time'], read_bulk_count=500000, write_bulk_count=20000)

cursor.close()
cnx.close()
print("Script finishes at {}".format(current_ts()))
import mysql.connector as connector
import configparser

########################################################################################################################
################################################## RUN ################################################
########################################################################################################################
print("Start")
config = configparser.ConfigParser()
config.read("config.ini")
default_section = config.sections()[0]
config_root = config[default_section]
cnx = connector.connect(user=config_root.get('user'), password=config_root.get('password'),
                        host=config_root.get('host'), database=config_root.get('database'))
cursor = cnx.cursor()
print("Renaming tracking")
cursor.execute("RENAME TABLE tracking TO tracking_backup;")
cursor.execute("RENAME TABLE tracking_tmp TO tracking;")
print("Renaming tracking_video")
cursor.execute("RENAME TABLE tracking_video TO tracking_video_backup;")
cursor.execute("RENAME TABLE tracking_video_tmp TO tracking_video;")
cursor.close()
cnx.close()
print("Done")
TODO: we should not hardcode the ENUM options for the tracking_video table, but generate them, because this has only been tested on Dogwood and might not be sufficient for Ginkgo onwards.
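One possible way to do that (a sketch, not part of the gist): derive the ENUM options from the distinct `event_type` values already present in the source `tracking_video` table and splice the generated fragment into the CREATE TABLE statement. The helper name below is hypothetical; it assumes the same `cursor` as the migration script.

def build_event_type_enum(cursor, source_table="tracking_video"):
    # collect the distinct event types actually present in the source data
    cursor.execute("SELECT DISTINCT `event_type` FROM `{}`;".format(source_table))
    event_types = sorted(row[0] for row in cursor if row[0])
    # render them as an enum(...) column definition fragment
    return "enum({})".format(','.join("'{}'".format(e) for e in event_types))

# usage (illustrative): event_type_enum = build_event_type_enum(cursor)
# then substitute it for the hardcoded enum(...) in _create_tmp_table's create_query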