""" | |
Exemple of script to copy tables from Cloud SQL database to Bigquery (using SQL proxy) with airflow | |
Note: part of the code inside function get_proxy_connection_engine is from airflow.contrib.operators | |
Author: 4sushi | |
Date: 2021-06-30 | |
""" | |
import pandas as pd | |
from sqlalchemy import inspect | |
from airflow.hooks.base_hook import BaseHook | |
from contextlib import contextmanager | |
from airflow.contrib.operators.gcp_sql_operator import CloudSqlHook, CloudSqlDatabaseHook | |
import os | |


@contextmanager
def get_proxy_connection_engine(db_user, db_password, db_name, gcp_project_id, gcp_location, gcp_instance,
                                db_type='mysql', db_host='127.0.0.1', db_port=3306):
    """Download the Cloud SQL proxy, run it, and yield a SQLAlchemy engine.

    Args:
        db_user(str): database user
        db_password(str): database password
        db_name(str): database name (schema)
        gcp_project_id(str): Google Cloud Platform project id
        gcp_location(str): Google Cloud Platform project location
        gcp_instance(str): Google Cloud Platform instance name
        db_type(str): database type (mysql, postgres)
        db_host(str): database host name / IP
        db_port(int): database port

    Yields:
        sqlalchemy.engine.base.Engine: SQLAlchemy engine

    Examples:
        with get_proxy_connection_engine(db_user='user', db_password='', db_name='test',
                                         gcp_project_id='project-test', gcp_location='europe-west1',
                                         gcp_instance='instance-test', db_type='mysql', db_host='127.0.0.1',
                                         db_port=3306) as engine:
            connection = engine.connect()
            request = 'SELECT * FROM users'
            # Load table data into a dataframe
            df = pd.read_sql(request, con=connection)
            connection.close()
    """
    def set_conf(db_user, db_password, db_name, gcp_project_id, gcp_location, gcp_instance,
                 db_type, db_host, db_port):
        # Register the Cloud SQL connection as the default Airflow connection,
        # encoded as a gcpcloudsql:// URI in an environment variable
        os.environ['AIRFLOW_CONN_GOOGLE_CLOUD_SQL_DEFAULT'] = (
            "gcpcloudsql://{user}:{password}@{ip}:{port}/{database}?"
            "database_type={type}&"
            "project_id={project_id}&"
            "location={location}&"
            "instance={instance}&"
            "use_proxy=True&"
            "sql_proxy_use_tcp=True".format(
                user=db_user, password=db_password, ip=db_host, port=db_port, type=db_type,
                database=db_name, project_id=gcp_project_id, location=gcp_location, instance=gcp_instance
            )
        )
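
    # Illustrative only: with the example values from the docstring above, the
    # rendered URI would look like
    #   gcpcloudsql://user:@127.0.0.1:3306/test?database_type=mysql
    #       &project_id=project-test&location=europe-west1&instance=instance-test
    #       &use_proxy=True&sql_proxy_use_tcp=True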

    set_conf(db_user, db_password, db_name, gcp_project_id, gcp_location, gcp_instance,
             db_type, db_host, db_port)
    cloudsql_db_hook = CloudSqlDatabaseHook()
    cloudsql_db_hook.validate_ssl_certs()
    cloudsql_db_hook.create_connection()
    cloud_sql_proxy_runner = None
    try:
        cloudsql_db_hook.validate_socket_path_length()
        database_hook = cloudsql_db_hook.get_database_hook()
        try:
            try:
                if cloudsql_db_hook.use_proxy:
                    cloud_sql_proxy_runner = cloudsql_db_hook.get_sqlproxy_runner()
                    cloudsql_db_hook.free_reserved_port()
                    # There is a very, very slim chance that the socket will
                    # be taken over here by another bind(0).
                    # It's quite unlikely to happen though!
                    cloud_sql_proxy_runner.start_proxy()
                yield database_hook.get_sqlalchemy_engine()
            finally:
                if cloud_sql_proxy_runner:
                    cloud_sql_proxy_runner.stop_proxy()
                    cloud_sql_proxy_runner = None
        finally:
            cloudsql_db_hook.cleanup_database_hook()
    finally:
        cloudsql_db_hook.delete_connection()
        cloudsql_db_hook = None


def export_mysql_tables_to_bq():
    """Export all the tables of the MySQL database to BigQuery."""
    c = BaseHook.get_connection("ext_mysql_db")
    with get_proxy_connection_engine(db_user=c.login, db_password=c.password, db_name=c.schema,
                                     gcp_project_id='project', gcp_location='europe-west1',
                                     gcp_instance='instance', db_type='mysql', db_host=c.host,
                                     db_port=c.port) as engine:
        connection = engine.connect()
        inspector = inspect(engine)
        errors = []
        # Tables on the database
        tables_db = inspector.get_table_names(schema=c.schema)
        for table_name in tables_db:
            export_mysql_table_to_bq(connection, table_name, 'test', errors)
        connection.close()
    # Raise at the end so that a single failing table does not interrupt the export of the others
    if len(errors) > 0:
        raise Exception(errors)


def export_mysql_table_to_bq(connection, table_name, bq_dataset, errors):
    """Export a MySQL table to BigQuery. If the table already exists in BigQuery, drop it and create a new one.

    Args:
        connection(sqlalchemy.engine.base.Connection): SQLAlchemy connection to the MySQL database
        table_name(str): MySQL and BigQuery table name (we keep the same name)
        bq_dataset(str): BigQuery dataset name
        errors(list[Exception]): list of exceptions, used to continue processing and export all the tables
    """
    try:
        request = 'SELECT * FROM {}'.format(table_name)
        # Load table data into a dataframe
        df = pd.read_sql(request, con=connection)
        table_id = '{}.{}'.format(bq_dataset, table_name)
        # Export the dataframe to BigQuery (if the table exists, drop it and create a new one)
        df.to_gbq(table_id, chunksize=1000, progress_bar=False, if_exists='replace')
    except Exception as e:
        # Capture the error to continue processing the other tables
        errors.append(e)
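

# Not in the original gist: for tables too large to fit in memory, a chunked
# variant (a sketch with the same arguments as export_mysql_table_to_bq, plus
# an assumed chunksize parameter) could stream the rows instead of loading the
# whole table into a single dataframe.
def export_large_mysql_table_to_bq(connection, table_name, bq_dataset, errors, chunksize=10000):
    try:
        request = 'SELECT * FROM {}'.format(table_name)
        table_id = '{}.{}'.format(bq_dataset, table_name)
        # pd.read_sql with chunksize yields dataframes of at most `chunksize` rows
        for i, chunk in enumerate(pd.read_sql(request, con=connection, chunksize=chunksize)):
            # Replace the BigQuery table with the first chunk, append the following ones
            chunk.to_gbq(table_id, progress_bar=False, if_exists='replace' if i == 0 else 'append')
    except Exception as e:
        # Capture the error to continue processing the other tables
        errors.append(e)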


if __name__ == '__main__':
    export_mysql_tables_to_bq()
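

# Usage sketch (not part of the original script): wiring the export into an
# Airflow DAG with a PythonOperator, matching the Airflow 1.x contrib imports
# used above. The dag_id, schedule_interval and start_date are illustrative
# assumptions.
#
# from datetime import datetime
# from airflow import DAG
# from airflow.operators.python_operator import PythonOperator
#
# with DAG(dag_id='export_mysql_to_bq', schedule_interval='@daily',
#          start_date=datetime(2021, 6, 30), catchup=False) as dag:
#     export_task = PythonOperator(task_id='export_mysql_tables_to_bq',
#                                  python_callable=export_mysql_tables_to_bq)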