Created
August 30, 2019 10:05
-
-
Save jatinchauhann/93f930cdaddce844b40847978cb9878b to your computer and use it in GitHub Desktop.
driver_function_hdfs_stats.py for HDFS Stats Tool
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
if __name__ == '__main__':
    # Driver: collect HDFS/Hive table stats for the databases listed in the
    # JSON config, build a pandas report, split it per database and e-mail it
    # to the franchise leads and then to the admins.
    #
    # NOTE(review): `args` is not defined in this section; presumably it is an
    # argparse namespace (with `jsonconfig` and `cssfile`) created earlier in
    # the file -- confirm.
    file_name = args.jsonconfig
    css_file = args.cssfile
    try:
        spark = (
            SparkSession.builder
            .enableHiveSupport()
            .appName("HDFS Stats Generator")
            .getOrCreate()
        )
        print(spark.version)
        # `log`, `stats`, `out_dict` and `style` are module-level globals
        # because this block runs at module scope. Most are never read again
        # here. Presumably helpers defined elsewhere in the file
        # (collect_all_stats, generate_mail, ...) read or populate them, so
        # keep these exact names -- confirm before renaming.
        log = spark.sparkContext._jvm.org.apache.log4j.LogManager.getLogger(__name__)
        stats, out_dict = {}, {}
        with open(css_file, 'r') as myfile:
            style = myfile.read()
        db_names_file_path_to_json = file_name
        db_names_hdfs = convert_json_to_list(read_from_json(db_names_file_path_to_json), "key")
        db_json = read_from_json(db_names_file_path_to_json)
        # Exclude the "admin" and "mail_format" config entries from the
        # database list. Filtering avoids the ValueError that list.remove()
        # raises when an entry is absent. Slice assignment updates the same
        # list object in place.
        db_names_hdfs[:] = [
            db_name for db_name in db_names_hdfs
            if db_name not in ("admin", "mail_format")
        ]
        print("Databases Used:")
        print("\n".join(db_names_hdfs))
        # Presumably populates the module-level stats/out_dict with info for
        # every database in db_names_hdfs (its return value is unused) --
        # confirm in collect_all_stats.
        print("Collecting all stats...")
        collect_all_stats(db_names_hdfs, spark)
        print("Creating the Dataframe...")
        # Build df_all_stats<DataFrame> with one row per out_dict entry.
        df_all_stats = pd.DataFrame.from_dict(
            out_dict,
            orient='index',
            columns=['DB Name', 'Table Name', 'Owner', 'Location', 'totalSize', 'InputFormat'],
        )
        # The index is intentionally left unnamed. rename_axis() returns a new
        # frame rather than modifying this one. Naming the index 'Table Name'
        # would also make that label ambiguous with the 'Table Name' column.
        # Convert the 'totalSize' column to numeric.
        df_all_stats = convert_clm_to_numeric(df_all_stats)
        # Replace NaN values with zero.
        df_all_stats = get_convert_nan_to_zero(df_all_stats)
        # Convert 'totalSize' to GB.
        # NOTE(review): Hive's totalSize table parameter is normally reported
        # in bytes, not KB -- confirm the divisor used by convert_kb_to_gb.
        df_all_stats = convert_kb_to_gb(df_all_stats, "totalSize")
        print("DL_ALL_STATS: " + str(df_all_stats.head(1)))
        print("Segregating dataframe database wise..")
        # Split df_all_stats into one DataFrame per database, keyed by name in
        # df_db_name_dict_seg_stats<dict>.
        df_db_name_dict_seg_stats = segregate_df_on_db(db_names_hdfs, df_all_stats)
        print("Sending mail to Franchise Leads...")
        # Send the reports to the franchise leads.
        generate_mail(db_json, db_names_hdfs, df_all_stats, df_db_name_dict_seg_stats, admin=False)
        print("Sending mail to Admin Leads...")
        # Send the reports to the admin.
        generate_mail(db_json, db_names_hdfs, df_all_stats, df_db_name_dict_seg_stats, admin=True)
        print("Emailed all the reports")
    except Exception as e:
        msg = "Module execution interrupted, following error occured : " + str(e)
        # Chain the cause so the original traceback is not lost.
        raise Exception(msg) from e
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment