Fill dummy records into tracking_video_pings_per_hour so that its records for old videos match table tracking_video_pings.
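The gist does not include any table definitions, so here is a sketch of the per-hour table's assumed shape, reconstructed from the INSERT and SELECT statements in the scripts below. The column names are real; the types and the key name are guesses.

# Hypothetical DDL for tracking_video_pings_per_hour, reconstructed from the
# queries in the scripts below; the real schema is not part of the gist, so
# the types and the key name are assumptions.
SCHEMA_SKETCH = """
CREATE TABLE `tracking_video_pings_per_hour` (
    `user_id`   INT          NOT NULL,
    `time`      DATETIME     NOT NULL,  -- truncated to the hour
    `course_id` VARCHAR(255) NOT NULL,
    `unit_id`   VARCHAR(255) NOT NULL,
    `pings`     INT          NOT NULL,
    -- the ON DUPLICATE KEY UPDATE upsert in the recalculation script implies
    -- a unique key over the first four columns, roughly:
    UNIQUE KEY `uniq_ping_hour` (`user_id`, `time`, `course_id`, `unit_id`)
);
"""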
; config.ini — MySQL connection settings (placeholder values)
[default]
user = user
password = password
host = localhost
database = edxapp_event
# Fill dummy rows into tracking_video_pings_per_hour so that, for old videos,
# its per-(user, course, unit) totals match table tracking_video_pings.
# Reads mismatch.csv, which is produced by the comparison script below.
import configparser
import csv

import mysql.connector as connector


def check_and_bulk_insert(bulk_count=5000):
    """Flush the buffer as one multi-row INSERT once it holds bulk_count rows."""
    global insert_data_buffer
    global global_count
    buffer_length = len(insert_data_buffer)
    if buffer_length >= bulk_count:
        # Build "(%s,%s,%s,%s,%s),(%s,%s,%s,%s,%s),..." with one placeholder
        # group per buffered row.
        query = ("INSERT INTO `tracking_video_pings_per_hour` "
                 "(`user_id`, `time`, `course_id`, `unit_id`, `pings`) "
                 "VALUES {};").format(','.join(["({})".format(','.join(['%s'] * 5))] * buffer_length))
        data = []
        for insert_data in insert_data_buffer:
            data.extend(insert_data)
        try:
            cursor.execute(query, data)
            cnx.commit()
        except Exception as e:
            print("Error")
            raise e
        del insert_data_buffer[:]
        global_count += buffer_length
        print("Inserted {}".format(global_count))


config = configparser.ConfigParser()
config.read("config.ini")
default_section = config.sections()[0]
config_root = config[default_section]
cnx = connector.connect(user=config_root.get('user'), password=config_root.get('password'),
                        host=config_root.get('host'), database=config_root.get('database'))
cursor = cnx.cursor()
global_count = 0
# Get records from the CSV written by the comparison script.
insert_data_buffer = []
# Dummy rows are stamped far in the past so the comparison script can exclude
# them later with `time` > '2001-01-01'.
fake_time = '2000-01-01 00:00:00'
with open("mismatch.csv", 'r') as f:
    reader = csv.reader(f)
    for user_id, course_id, unit_id, pings, new_pings, last_record_time in reader:
        pings = int(pings)
        new_pings = int(new_pings)  # parsed but unused here; kept to document the CSV layout
        insert_data_buffer.append([user_id, fake_time, course_id, unit_id, pings])
        check_and_bulk_insert()
# Insert leftovers (any buffer size >= 1 triggers a flush).
check_and_bulk_insert(1)
cursor.close()
cnx.close()
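As an aside, the VALUES clause above is assembled by repeating a five-placeholder group once per buffered row. A standalone sketch of what that produces for two rows:

# Illustration only (not part of the gist) of the placeholder string that
# check_and_bulk_insert() builds; buffer_length=2 shown for brevity.
buffer_length = 2
values_clause = ','.join(["({})".format(','.join(['%s'] * 5))] * buffer_length)
print(values_clause)  # -> (%s,%s,%s,%s,%s),(%s,%s,%s,%s,%s)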
# Connection skeleton: load config.ini, open a connection, and clean up.
# (Shared boilerplate for the scripts in this gist; it runs no queries itself.)
import configparser

import mysql.connector as connector

config = configparser.ConfigParser()
config.read("config.ini")
default_section = config.sections()[0]
config_root = config[default_section]
cnx = connector.connect(user=config_root.get('user'), password=config_root.get('password'),
                        host=config_root.get('host'), database=config_root.get('database'))
cursor = cnx.cursor()
cursor.close()
cnx.close()
# Compare totals: find (user, course, unit) records whose summed pings in
# tracking_video_pings_per_hour fall short of the totals in
# tracking_video_pings, and write them to mismatch.csv for the fill script.
import configparser
import csv
import re

import mysql.connector as connector


def create_unique_id(user_id, course_id, unit_id):
    return '::'.join(["{}"] * 3).format(user_id, course_id, unit_id)


config = configparser.ConfigParser()
config.read("config.ini")
default_section = config.sections()[0]
config_root = config[default_section]
cnx = connector.connect(user=config_root.get('user'), password=config_root.get('password'),
                        host=config_root.get('host'), database=config_root.get('database'))
cursor = cnx.cursor()
# Get per-(user, course, unit) totals from the new table. When with_fake is
# False, the dummy rows stamped 2000-01-01 by the fill script are excluded.
with_fake = True
if with_fake:
    cursor.execute(
        "SELECT `user_id`,`course_id`,`unit_id`, SUM(`pings`) AS `pings` "
        "FROM `tracking_video_pings_per_hour` GROUP BY `user_id`,`course_id`,`unit_id`;")
else:
    cursor.execute(
        "SELECT `user_id`,`course_id`,`unit_id`, SUM(`pings`) AS `pings` FROM `tracking_video_pings_per_hour` WHERE "
        "`time`>'2001-01-01' GROUP BY `user_id`,`course_id`,`unit_id`;")
new_buf = {}
for user_id, course_id, unit_id, pings in cursor:
    uniq_id = create_unique_id(user_id, course_id, unit_id)
    new_buf[uniq_id] = pings
print("{} records found in table tracking_video_pings_per_hour".format(len(new_buf)))
# Load the last record time per (user, course, unit). video_last.csv is exported manually with:
# SELECT max(`id`) AS `id`, max(`time`) AS `time`,`course_id`, `unit_id`, `user_id`
#   FROM `tracking_video` `t` JOIN `course_ids` `c` ON t.`course_id_index`=c.`course_id_index`
#   WHERE `event_type`='ping_video' GROUP BY c.`course_id_index`, `unit_id`, `user_id`;
last_dict = {}
with open("video_last.csv", 'r') as f:
    reader = csv.reader(f)
    next(reader)  # skip the header row
    for t_id, time, course_id, unit_id, user_id in reader:
        uniq_id = create_unique_id(user_id, course_id, unit_id)
        last_dict[uniq_id] = time
# Get records from the old table.
cursor.execute("SELECT `user_id`,`course_id`,`unit_id`, `pings` FROM `tracking_video_pings`;")
mismatch_buf = []
# Only keep modern "course-v1:org+course+run" ids; skip legacy-style ids.
course_id_rp = re.compile(r"course-v1:[^+]+\+[^+]+\+[^+]+$")
for user_id, course_id, unit_id, pings in cursor:
    if not course_id_rp.match(course_id):
        # print('Illegal cid: "{}"'.format(course_id))
        continue
    uniq_id = create_unique_id(user_id, course_id, unit_id)
    new_pings = new_buf.get(uniq_id)
    if new_pings is not None:  # "is not None" so a legitimate zero sum is not treated as missing
        del new_buf[uniq_id]
    else:
        # print("No new record found for {}".format(uniq_id))
        new_pings = 0
    if pings != new_pings:
        # print("{} != {}".format(pings, new_pings))
        if abs(pings - new_pings) <= 10:
            # print("Delta as small as {}, skip".format(pings - new_pings))
            continue
        if new_pings > pings:
            print("New data {} > {}, {}, skip".format(new_pings, pings, uniq_id))
            continue
        last_record_time = last_dict.get(uniq_id)
        mismatch_buf.append([user_id, course_id, unit_id, pings, new_pings, last_record_time])
if new_buf:
    print("{} new records have no old match".format(len(new_buf)))
with open("mismatch.csv", 'w') as f:
    writer = csv.writer(f)
    writer.writerows(mismatch_buf)
cursor.close()
cnx.close()
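For reference, the course-id filter above accepts only modern "course-v1:org+course+run" keys, and the composite key simply joins the three identifiers with "::". A standalone check, with made-up sample values:

import re

# Illustration only of the course-id filter and composite key used above;
# the org/course/run values here are hypothetical.
course_id_rp = re.compile(r"course-v1:[^+]+\+[^+]+\+[^+]+$")
print(bool(course_id_rp.match("course-v1:SampleOrg+CS101+2018_T1")))  # True
print(bool(course_id_rp.match("SampleOrg/CS101/2018_T1")))           # False: legacy-style id, skipped

def create_unique_id(user_id, course_id, unit_id):
    return '::'.join(["{}"] * 3).format(user_id, course_id, unit_id)

print(create_unique_id(42, "course-v1:SampleOrg+CS101+2018_T1", "unit_1"))
# -> 42::course-v1:SampleOrg+CS101+2018_T1::unit_1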
# Recalculate tracking_video_pings_per_hour from the raw tracking_video events:
# compare each (user, hour, course, unit) count against the table and upsert
# every row that differs.
import configparser
import csv

import mysql.connector as connector


def create_unique_id(user_id, hour, course_id, unit_id):
    return '::'.join(["{}"] * 4).format(user_id, hour, course_id, unit_id)


def check_and_bulk_insert(bulk_count=5000):
    """Flush the buffer as one multi-row upsert once it holds bulk_count rows."""
    global insert_data_buffer
    global global_count
    buffer_length = len(insert_data_buffer)
    if buffer_length >= bulk_count:
        query = (
            "INSERT INTO `tracking_video_pings_per_hour` (`user_id`, `time`, `course_id`, `unit_id`, `pings`) "
            "VALUES {} ON DUPLICATE KEY UPDATE `pings`=VALUES(`pings`);"
        ).format(','.join(["({})".format(','.join(['%s'] * 5))] * buffer_length))
        data = []
        for insert_data in insert_data_buffer:
            data.extend(insert_data)
        try:
            # print(query)
            # print(data)
            cursor.execute(query, data)
            cnx.commit()
        except Exception as e:
            print("Error")
            raise e
        del insert_data_buffer[:]
        global_count += buffer_length
        print("Inserted {}".format(global_count))


global_count = 0
insert_data_buffer = []
config = configparser.ConfigParser()
config.read("config.ini")
default_section = config.sections()[0]
config_root = config[default_section]
cnx = connector.connect(user=config_root.get('user'), password=config_root.get('password'),
                        host=config_root.get('host'), database=config_root.get('database'))
cursor = cnx.cursor()
# Get current records from the per-hour table, keyed by (user, hour, course, unit).
cursor.execute(
    "SELECT `user_id`,substr(`time`,1,13) AS `hour`,`course_id`,`unit_id`,`pings` FROM `tracking_video_pings_per_hour`;")
per_hour_buf = {}
for user_id, hour, course_id, unit_id, pings in cursor:
    uniq_id = create_unique_id(user_id, hour, course_id, unit_id)
    per_hour_buf[uniq_id] = pings
# recal.csv is generated by manually running:
# SELECT `user_id`,`hour`,`course_id`,`unit_id`,count(*) FROM (SELECT `user_id`, SUBSTR(`TIME`,1,13) AS `HOUR`, `course_id`,`unit_id` FROM `tracking_video` `t` JOIN `course_ids` `c` ON t.`course_id_index`=c.`course_id_index` WHERE `event_type`='ping_video') `s` GROUP BY `hour`,`course_id`,`unit_id`,`user_id`;
# We keep that query out of this script because it is really slow and we do
# not want it re-run if this script fails partway through.
with open("recal.csv", 'r') as f:
    reader = csv.reader(f)
    next(reader)  # skip the header row: "user_id","hour","course_id","unit_id","pings"
    for user_id, hour, course_id, unit_id, pings in reader:
        pings = int(pings)  # CSV fields are strings; compare as integers
        uniq_id = create_unique_id(user_id, hour, course_id, unit_id)
        old_pings = per_hour_buf.get(uniq_id)
        if old_pings == pings:
            continue
        # Not equal: queue an upsert with the recalculated count.
        insert_data_buffer.append([user_id, '{}:00:00'.format(hour), course_id, unit_id, pings])
        check_and_bulk_insert()
        if not old_pings:
            print("Missing old: {}".format(uniq_id))
            continue
        if old_pings < pings:
            print("Old smaller: {} < {}".format(old_pings, pings))
        elif old_pings > pings:
            print("Old greater: {} > {}".format(old_pings, pings))
# Leftovers.
check_and_bulk_insert(1)
cnx.commit()
cursor.close()
cnx.close()
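One detail worth spelling out: both sides of this comparison bucket timestamps by hour. SUBSTR(`time`,1,13) keeps the "YYYY-MM-DD HH" prefix, and the Python side appends ":00:00" when writing rows back. A standalone sketch with a made-up timestamp:

# Illustration only of the hour bucketing (the timestamp value is hypothetical).
ts = "2018-03-01 14:37:52"
hour = ts[:13]                   # "2018-03-01 14" -- mirrors SUBSTR(`time`,1,13)
print('{}:00:00'.format(hour))   # -> 2018-03-01 14:00:00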
# This last file appears to be a truncated copy of the comparison script above:
# the mismatch-detection loop between the connection setup and the CSV write is
# missing. mismatch_buf is initialized empty here so the fragment at least
# runs; see the full comparison script above for the actual logic.
import configparser
import csv

import mysql.connector as connector


def create_unique_id(user_id, course_id, unit_id):
    return '::'.join(["{}"] * 3).format(user_id, course_id, unit_id)


config = configparser.ConfigParser()
config.read("config.ini")
default_section = config.sections()[0]
config_root = config[default_section]
cnx = connector.connect(user=config_root.get('user'), password=config_root.get('password'),
                        host=config_root.get('host'), database=config_root.get('database'))
cursor = cnx.cursor()
mismatch_buf = []  # populated by the (missing) comparison loop in the full script
with open("mismatch.csv", 'w') as f:
    writer = csv.writer(f)
    writer.writerows(mismatch_buf)
cursor.close()
cnx.close()