Last active
April 7, 2022 17:31
-
-
Save vrutik2809/25cb32d2fd760bf8905a8e44ac2f6a17 to your computer and use it in GitHub Desktop.
Function prototypes for exporting tables from a database
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv
import io

from sqlalchemy import create_engine
from sqlalchemy import inspect, Table, MetaData
from xlsxwriter import Workbook
# TODO: replace the placeholders below with appropriate values before running.
# Every placeholder must be a name that is defined at this point. The last one
# is spelled `database_name` because `database-name` inside an f-string is
# parsed as the subtraction `database - name`, not as a single placeholder.
DB_URL = f"postgresql://{user}:{password}@{hostname}:{port}/{database_name}"
engine = create_engine(DB_URL)
def reflect_table(name, schema, engine, metadata=None, connection_to_use=None):
    """Reflect the relation `name` in `schema` into a SQLAlchemy ``Table``.

    Args:
        name: Name of the table (or view) to reflect.
        schema: Schema that contains the relation.
        engine: Engine used to build a bound ``MetaData`` when `metadata` is
            not supplied, and used for reflection when no connection is given.
        metadata: Optional existing ``MetaData`` to register the table in.
        connection_to_use: Optional open connection to reflect over instead
            of `engine`.

    Returns:
        The reflected ``Table`` object.
    """
    target_metadata = MetaData(bind=engine) if metadata is None else metadata
    reflect_source = connection_to_use if connection_to_use is not None else engine
    return Table(
        name,
        target_metadata,
        schema=schema,
        autoload_with=reflect_source,
        extend_existing=True,
    )
def export_table_to_dict(table_name, schema, engine):
    """Export a table's identity, column descriptions and rows as a dict.

    Args:
        table_name: Name of the table to export.
        schema: Schema that contains the table.
        engine: SQLAlchemy engine used to open the connection.

    Returns:
        A dict with the keys ``table_oid``, ``table_name``, ``table_schema``,
        ``columns`` (one dict per column with ``name``, ``type``,
        ``nullable``, ``primary_key``, ``autoincrement`` and ``default``) and
        ``records`` (one dict per row, keyed by column name).

    Raises:
        Exception: If `table_name` is not among the tables of `schema`.
    """
    with engine.connect() as connection:
        inspector = inspect(connection)
        if table_name not in inspector.get_table_names(schema=schema):
            # Plain interpolation so that building the message cannot itself
            # fail (e.g. with a TypeError when `schema` is None).
            raise Exception(f'Table "{schema}.{table_name}" not found')

        table_oid = inspector.get_table_oid(table_name, schema=schema)
        table = reflect_table(table_name, schema, engine, connection_to_use=connection)

        # NOTE(review): `col.default` is exported as-is; reflected columns
        # may carry their default in `server_default` instead -- confirm
        # which attribute is intended.
        columns = [
            {
                'name': col.name,
                'type': col.type.__class__.__name__,
                'nullable': col.nullable,
                'primary_key': col.primary_key,
                'autoincrement': col.autoincrement,
                'default': col.default,
            }
            for col in table.columns
        ]

        # Run the SELECT on the connection that is already open rather than
        # through implicit execution on the bound metadata.
        records = [dict(row._mapping) for row in connection.execute(table.select())]

        return {
            'table_oid': table_oid,
            'table_name': table_name,
            'table_schema': schema,
            'columns': columns,
            'records': records,
        }
def export_view_to_dict(view_name, schema, engine):
    """Export a view's identity, column descriptions and rows as a dict.

    Args:
        view_name: Name of the view to export.
        schema: Schema that contains the view.
        engine: SQLAlchemy engine used to open the connection.

    Returns:
        A dict with the keys ``view_name``, ``view_schema``, ``columns`` (one
        dict per column with ``name``, ``type``, ``nullable``,
        ``primary_key``, ``autoincrement`` and ``default``) and ``records``
        (one dict per row, keyed by column name).

    Raises:
        Exception: If `view_name` is not among the views of `schema`.
    """
    with engine.connect() as connection:
        inspector = inspect(connection)
        if view_name not in inspector.get_view_names(schema=schema):
            # Plain interpolation so that building the message cannot itself
            # fail (e.g. with a TypeError when `schema` is None).
            raise Exception(f'View "{schema}.{view_name}" not found')

        # The view is reflected through the same helper used for tables.
        view = reflect_table(view_name, schema, engine, connection_to_use=connection)

        columns = [
            {
                'name': col.name,
                'type': col.type.__class__.__name__,
                'nullable': col.nullable,
                'primary_key': col.primary_key,
                'autoincrement': col.autoincrement,
                'default': col.default,
            }
            for col in view.columns
        ]

        # Run the SELECT on the connection that is already open rather than
        # through implicit execution on the bound metadata.
        records = [dict(row._mapping) for row in connection.execute(view.select())]

        return {
            'view_name': view_name,
            'view_schema': schema,
            'columns': columns,
            'records': records,
        }
def export_schema_to_dict(schema, engine):
    """Export every table of `schema` into a single dict.

    Args:
        schema: Name of the schema to export.
        engine: SQLAlchemy engine used to open the connection.

    Returns:
        A dict with ``schema_name`` and ``tables``, where ``tables`` holds
        the result of ``export_table_to_dict`` for each table in the schema.

    Raises:
        Exception: If `schema` is not among the database's schema names.
    """
    with engine.connect() as conn:
        db_inspector = inspect(conn)
        if schema not in db_inspector.get_schema_names():
            raise Exception(f'Schema "{schema}" not found')

        exported_tables = [
            export_table_to_dict(name, schema, engine)
            for name in db_inspector.get_table_names(schema=schema)
        ]
        return {'schema_name': schema, 'tables': exported_tables}
def _export_table_delimited(table_name, schema, engine, delimiter):
    """Render a table as `delimiter`-separated text with a header line."""
    with engine.connect() as connection:
        inspector = inspect(connection)
        if table_name not in inspector.get_table_names(schema=schema):
            # Plain interpolation so that building the message cannot itself
            # fail (e.g. with a TypeError when `schema` is None).
            raise Exception(f'Table "{schema}.{table_name}" not found')

        table = reflect_table(table_name, schema, engine, connection_to_use=connection)

        buffer = io.StringIO()
        # The csv module quotes any field containing the delimiter, a quote
        # character or a line break, so such values no longer corrupt the
        # row layout. '\n' keeps the line ending the output has always used.
        writer = csv.writer(buffer, delimiter=delimiter, lineterminator='\n')
        writer.writerow([col.name for col in table.columns])

        # Rows are read positionally on the open connection; the SELECT
        # yields values in the same order as `table.columns`.
        for record in connection.execute(table.select()):
            # str() keeps the existing text form of every value
            # (e.g. None is still rendered as "None").
            writer.writerow([str(value) for value in record])

        return buffer.getvalue()


def export_table_to_csv(table_name, schema, engine):
    """Export a table as CSV text.

    The first line holds the column names and each following line holds one
    row. Fields that contain a comma, a double quote or a line break are
    quoted.

    Args:
        table_name: Name of the table to export.
        schema: Schema that contains the table.
        engine: SQLAlchemy engine used to open the connection.

    Returns:
        The CSV document as a single string with '\\n' line endings.

    Raises:
        Exception: If `table_name` is not among the tables of `schema`.
    """
    return _export_table_delimited(table_name, schema, engine, ',')


def export_table_to_tsv(table_name, schema, engine):
    """Export a table as tab-separated text.

    The output is written with a tab delimiter directly, so commas that
    occur inside values are left untouched.

    Args:
        table_name: Name of the table to export.
        schema: Schema that contains the table.
        engine: SQLAlchemy engine used to open the connection.

    Returns:
        The TSV document as a single string with '\\n' line endings.

    Raises:
        Exception: If `table_name` is not among the tables of `schema`.
    """
    return _export_table_delimited(table_name, schema, engine, '\t')
def write_table_data_to_xlsx_worksheet(table_name, schema, engine, worksheet: Workbook.worksheet_class):
    """Write a table's header and rows into an xlsxwriter worksheet.

    Column names go into row 0; each table row is written to the next
    worksheet row, starting at row 1.

    Args:
        table_name: Name of the table to write.
        schema: Schema that contains the table.
        engine: SQLAlchemy engine used to open the connection.
        worksheet: Worksheet that receives the cells.

    Raises:
        Exception: If `table_name` is not among the tables of `schema`.
    """
    with engine.connect() as connection:
        inspector = inspect(connection)
        if table_name not in inspector.get_table_names(schema=schema):
            # Plain interpolation so that building the message cannot itself
            # fail (e.g. with a TypeError when `schema` is None).
            raise Exception(f'Table "{schema}.{table_name}" not found')

        table = reflect_table(table_name, schema, engine, connection_to_use=connection)

        for col_idx, column in enumerate(table.columns):
            worksheet.write(0, col_idx, column.name)

        # Execute on the open connection and read each row positionally; the
        # SELECT yields values in the same order as `table.columns`.
        for row_idx, row in enumerate(connection.execute(table.select()), start=1):
            for col_idx, value in enumerate(row):
                worksheet.write(row_idx, col_idx, value)
def export_table_to_xlsx(table_name, schema, engine, excel_file_name='test.xlsx'):
    """Export one table to an xlsx file containing a single worksheet.

    The worksheet is named after the table and is filled by
    ``write_table_data_to_xlsx_worksheet``.

    Args:
        table_name: Name of the table to export.
        schema: Schema that contains the table.
        engine: SQLAlchemy engine used to open the connection.
        excel_file_name: Path of the workbook to create.

    Raises:
        Exception: If `table_name` is not among the tables of `schema`.
    """
    with engine.connect() as conn:
        known_tables = inspect(conn).get_table_names(schema=schema)
        if table_name not in known_tables:
            raise Exception(f'Table "{schema + "." + table_name}" not found')

        book = Workbook(excel_file_name)
        sheet = book.add_worksheet(table_name)
        write_table_data_to_xlsx_worksheet(table_name, schema, engine, sheet)
        book.close()
def export_schema_to_xlsx(schema, engine, excel_file_name='test.xlsx'):
    """Export every table of `schema` to an xlsx file, one worksheet each.

    Each worksheet is named after its table and is filled by
    ``write_table_data_to_xlsx_worksheet``.

    Args:
        schema: Name of the schema to export.
        engine: SQLAlchemy engine used to open the connection.
        excel_file_name: Path of the workbook to create.

    Raises:
        Exception: If `schema` is not among the database's schema names.
    """
    with engine.connect() as conn:
        db_inspector = inspect(conn)
        if schema not in db_inspector.get_schema_names():
            raise Exception(f'Schema "{schema}" not found')

        book = Workbook(excel_file_name)
        for name in db_inspector.get_table_names(schema=schema):
            sheet = book.add_worksheet(name)
            write_table_data_to_xlsx_worksheet(name, schema, engine, sheet)
        book.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment