For processing SE data dumps
# Converts XML tables to JSON/parquet and stores them in TEMP_DIR
# Column names are modified to remove the "@" at the beginning for convenience
# Transforms the Tags column of posts into a list
# Uses pandas merge and groupby features to do one-to-one and one-to-many joins
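# Third-party dependencies (assumed install): xmltodict, simplejson, pandas,
# py7zr, tqdm, plus pyarrow or fastparquet for pandas parquet I/O.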
import os
import re
from pathlib import Path

import pandas as pd
import py7zr
import simplejson
import xmltodict
from tqdm import tqdm
EXCLUDE_SITES = ["ru.stackoverflow.com"]
INCLUDE_SITES = ["devops.stackexchange.com"]  # e.g. add "superuser.com", "askubuntu.com"
DUMP_SRC_DIR = "/Users/vanga/Downloads/stackexchange"  # directory where the downloaded zip files are present for all the sites
TEMP_DIR = "/Users/vanga/Downloads/stackexchange-temp"
OUTPUT_DIR = "/Users/vanga/Downloads/se-processed"
TABLES_TO_CONSIDER = ["Posts", "Comments", "Users"]
INTERMEDIATE_FORMAT = "parquet"  # parquet or json
INCLUDE_COLUMNS = {
    "Posts": ["Id", "PostTypeId", "OwnerUserId", "ParentId"],
    "Users": ["Id", "DisplayName"],
    "Comments": ["Id", "PostId", "Text", "UserId"]
}
# Note: some fields, like primary and foreign keys, are mandatory to include.
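# Directory layout produced by this script:
#   TEMP_DIR/xml/<site>/           - extracted XML tables
#   TEMP_DIR/<format>/<site>/      - converted parquet/json tables
#   OUTPUT_DIR/<site>/questions.*  - denormalized output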
def prepare_directories():
    os.makedirs(TEMP_DIR, exist_ok=True)
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    xml_temp_dir = os.path.join(TEMP_DIR, "xml")
    os.makedirs(xml_temp_dir, exist_ok=True)
    intermediate_temp_dir = os.path.join(TEMP_DIR, INTERMEDIATE_FORMAT)
    os.makedirs(intermediate_temp_dir, exist_ok=True)
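# e.g. "devops.stackexchange.com.7z" -> "devops.stackexchange.com"
#      "stackoverflow.com-Posts.7z"  -> "stackoverflow.com"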
def site_name_from_zipfile(zip_file):
    # downloaded zip files for stackoverflow.com are separate for each table/entity.
    if zip_file.startswith("stackoverflow.com-"):
        return "stackoverflow.com"
    else:
        return Path(zip_file).stem
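# A non-empty INCLUDE_SITES acts as an allowlist; EXCLUDE_SITES always takes precedence.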
def skip_site(site):
    return (site in EXCLUDE_SITES) or (len(INCLUDE_SITES) > 0 and site not in INCLUDE_SITES)
def extract_dump(zip_file, dest_dir):
    os.makedirs(dest_dir, exist_ok=True)
    with py7zr.SevenZipFile(zip_file, 'r') as archive:
        archive.extractall(path=dest_dir)
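# SE dump tables are XML files whose rows are attribute-only elements, e.g.
#   <row Id="1" PostTypeId="1" ... />
# xmltodict maps attributes to keys prefixed with "@" ("@Id", "@PostTypeId", ...),
# which is why the "@" is stripped from column names below.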
def convert_xml(xml_src_dir, dest_dir, to=INTERMEDIATE_FORMAT):
    os.makedirs(dest_dir, exist_ok=True)
    for file in os.listdir(xml_src_dir):
        file_type = Path(file).suffix
        if file_type.lower() != ".xml":
            continue
        file_wo_ext = Path(file).stem
        src_abs_path = os.path.join(xml_src_dir, file)
        target_file = os.path.join(dest_dir, file_wo_ext + "." + to)
        if os.path.exists(target_file):
            print(f"file: {file_wo_ext} already converted")
            continue
        with open(src_abs_path, 'r') as xml_file:
            data_dict = xmltodict.parse(xml_file.read())
        rows = data_dict[file_wo_ext.lower()]['row']
        if isinstance(rows, dict):  # xmltodict returns a single dict when a table has only one row
            rows = [rows]
        if to == "parquet":
            df = pd.DataFrame.from_dict(rows)
            df.rename(columns=lambda x: re.sub('@', '', x), inplace=True)
            if file_wo_ext == "Posts":
                transform_tags_column(df)
            df.to_parquet(target_file)
        elif to == "json":
            with open(target_file, 'w') as json_file_o:
                json_file_o.write(simplejson.dumps(rows, ignore_nan=True))
        else:
            raise ValueError(f"Unsupported target format type: {to}. supported values are 'parquet', 'json'")
        print(f"Finished converting {file_wo_ext} from xml to {to}")
def load_data_into_pandas(src_dir, tables, format="parquet"):
    dfs = {}
    print(f"Processing tables: {tables}")
    for table in tables:
        table_path = os.path.join(src_dir, table + "." + format)
        if format == "parquet":
            df = pd.read_parquet(table_path)
        elif format == "json":
            df = pd.read_json(table_path)
            df.rename(columns=lambda x: re.sub('@', '', x), inplace=True)
        else:
            raise ValueError(f"Unsupported format: {format}")
        dfs[table] = df
    return dfs
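# Tags are stored as a single string of angle-bracketed names; this converts it
# to a list, e.g. "<python><pandas>" -> ["python", "pandas"].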
def transform_tags_column(df):
    df['Tags'] = df['Tags'].str.replace('><', ',').str.replace('<', '').str.replace('>', '').str.split(',')
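# PostTypeId "1" = question, "2" = answer; the values stay strings because
# XML attributes are parsed as text.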
def get_questions_subset(df):
    questions_df = df[df['PostTypeId'] == "1"]
    return questions_df
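# Produces one row per question, with its comments as a list of dicts and its
# answers as a list of dicts (each answer carrying its own comments and user info).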
def denormalize_data(site, dfs):
    posts_columns = INCLUDE_COLUMNS["Posts"]
    comments_columns = INCLUDE_COLUMNS["Comments"]
    users_columns = INCLUDE_COLUMNS["Users"]
    posts_df = dfs['Posts'][posts_columns]
    posts_df = posts_df[(posts_df.PostTypeId == "1") | (posts_df.PostTypeId == "2")]
    comments_df = dfs['Comments'][comments_columns]
    users_df = dfs['Users'][users_columns]
    users_df = users_df.add_prefix("user_")
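    # The "user_" prefix avoids column-name collisions (both Posts and Users have
    # an "Id" column) when merging below. The .progress_apply(lambda x: x) calls
    # are no-op passes whose only purpose is to render a tqdm progress bar.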
    tqdm.pandas()
    # join user info with posts
    print("Adding user info to posts")
    posts_df = pd.merge(posts_df, users_df, left_on='OwnerUserId', right_on='user_Id', how='left').progress_apply(lambda x: x).drop('user_Id', axis=1)
    # join user info with comments
    print("Adding user info to comments")
    comments_df = pd.merge(comments_df, users_df, left_on='UserId', right_on='user_Id', how='left').progress_apply(lambda x: x).drop('user_Id', axis=1)
    # group comments by posts and populate a list of dictionaries
    print("Grouping comments")
    comments_grouped = comments_df.groupby('PostId').progress_apply(lambda x: x.to_dict('records')).to_frame("comments")
    # populate posts with comments
    print("Adding comments to posts")
    posts_df = pd.merge(posts_df, comments_grouped, left_on="Id", right_on="PostId", how='left').progress_apply(lambda x: x)
    questions_df = get_questions_subset(posts_df)
    print(f"Found {questions_df.shape[0]} questions")
    # group answers by question
    print("Grouping answers by question")
    answers_grouped = posts_df[posts_df.PostTypeId == "2"].groupby("ParentId").progress_apply(lambda x: x.to_dict('records')).to_frame("answers")
    # populate questions with answers
    print("Adding answers to questions")
    questions_df = pd.merge(questions_df, answers_grouped, left_on='Id', right_on='ParentId', how='left').drop('ParentId', axis=1).progress_apply(lambda x: x)
    output_site_dir = os.path.join(OUTPUT_DIR, site)
    os.makedirs(output_site_dir, exist_ok=True)
    if INTERMEDIATE_FORMAT == "parquet":
        output_file = os.path.join(output_site_dir, "questions.parquet")
        questions_df.to_parquet(output_file)
    else:
        output_file = os.path.join(output_site_dir, "questions.json")
        questions_df.to_json(output_file)
    print(f"Finished processing site: '{site}'")
def process_dumps():
    prepare_directories()
    for zip_file in os.listdir(DUMP_SRC_DIR):
        if not zip_file.endswith(".7z"):
            continue
        site = site_name_from_zipfile(zip_file)
        if skip_site(site):
            continue
        print(f"Processing site: {site}")
        src_abs_path = os.path.join(DUMP_SRC_DIR, zip_file)
        site_xml_temp_dir = os.path.join(TEMP_DIR, "xml", site)
        extract_dump(src_abs_path, site_xml_temp_dir)
        site_temp_dir = os.path.join(TEMP_DIR, INTERMEDIATE_FORMAT, site)
        convert_xml(site_xml_temp_dir, site_temp_dir)
        dfs = load_data_into_pandas(site_temp_dir, TABLES_TO_CONSIDER)
        denormalize_data(site, dfs)

if __name__ == "__main__":
    process_dumps()
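# Example of inspecting the output afterwards (path assumes the DevOps site above):
# df = pd.read_parquet(os.path.join(OUTPUT_DIR, "devops.stackexchange.com", "questions.parquet"))
# print(df[["Id", "user_DisplayName", "answers"]].head())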