Created
October 11, 2022 19:41
-
-
Save mrn-aglic/50174b840094d428ceff93f4cde74a5a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class TransferPsqlToMysql(BaseOperator):
    """Copy the result of a PostgreSQL query into a MySQL table.

    The query result is exported with ``COPY (...) TO STDOUT`` into a
    temporary tab-delimited CSV file, which is then handed to
    ``MySqlHook.bulk_load`` for the destination table.

    :param postgres_conn_id: connection id passed to ``PostgresHook``.
    :param mysql_conn_id: connection id passed to ``MySqlHook``.
    :param sql: query whose result set is exported. Declared as a template
        field, with ``.sql`` registered as a template extension.
    :param dst_table_name: name of the MySQL table that receives the rows.
    :param truncate_table: when truthy, ``TRUNCATE TABLE`` is run on the
        destination table before the data is loaded.
    """

    template_fields = ["sql"]
    template_ext = [".sql"]
    template_fields_renderers = {"sql": "sql"}

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        postgres_conn_id: str,
        mysql_conn_id: str,
        sql: str,
        dst_table_name: str,
        truncate_table: bool,
        **kwargs,
    ):
        """Store the transfer settings; remaining kwargs go to ``BaseOperator``."""
        super().__init__(**kwargs)
        self.postgres_conn_id = postgres_conn_id
        self.mysql_conn_id = mysql_conn_id
        self.sql = sql
        self.dst_table_name = dst_table_name
        self.truncate_table = truncate_table

    def execute(self, context: Context) -> Any:
        """Export the query result to a temporary file and load it into MySQL."""
        postgres_hook = PostgresHook(self.postgres_conn_id)
        mysql_hook = MySqlHook(self.mysql_conn_id)

        # The query is embedded inside "COPY ( ... )", where a trailing ";"
        # (common at the end of .sql files) would be a syntax error, so any
        # trailing semicolons and whitespace are removed first.
        query = str(self.sql).strip().rstrip("; \t\r\n")
        copy_statement = (
            f"COPY ({query}) TO STDOUT WITH (FORMAT csv, DELIMITER '\t')"
        )

        # The temporary file must stay open until the load has finished,
        # because it is deleted as soon as the ``with`` block exits.
        with NamedTemporaryFile("w+b") as file:
            self.log.info("Exporting data to csv file")
            postgres_hook.copy_expert(copy_statement, file.name)
            # NOTE(review): copy_expert is given the path, not this handle, so
            # flush/seek are presumably no-ops here; kept from the original.
            file.flush()
            file.seek(0)

            if self.truncate_table:
                mysql_hook.run(f"TRUNCATE TABLE {self.dst_table_name}")

            # NOTE(review): confirm that bulk_load parses CSV quoting and
            # empty-field NULLs the same way the COPY above emits them.
            mysql_hook.bulk_load(self.dst_table_name, file.name)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment