Created
September 21, 2018 19:43
-
-
Save troyharvey/50dfab3b37ec61bdaa0225bcbd8ffc65 to your computer and use it in GitHub Desktop.
An Airflow operator that translates a Postgres table definition into a Redshift CREATE TABLE statement.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class CreateRedshiftTableFromSchemaOperator(BaseOperator):
    """
    Create a Redshift table from a Postgres table-definition file stored in S3.

    The definition file is a JSON list of column descriptors, each with at
    least ``column_name`` and ``data_type`` keys and optionally
    ``character_maximum_length`` (presumably the output of a query against
    Postgres ``information_schema.columns`` — confirm against the producer).

    :param redshift_conn_id: Airflow Redshift connection ID
    :type redshift_conn_id: string
    :param schema: Redshift schema
    :type schema: string
    :param table: Redshift table
    :type table: string
    :param s3_bucket: S3 bucket containing the table definition
    :type s3_bucket: string
    :param s3_key: S3 key with the table definition (templated)
    :type s3_key: string
    """

    template_fields = ["s3_key"]

    # Postgres data types that Redshift does not support, mapped to a
    # Redshift-compatible replacement:
    #   inet -> varchar(45)  https://stackoverflow.com/a/7477384
    #   uuid -> char(36)     https://stackoverflow.com/a/18989006
    #   array/json/jsonb/text have no Redshift equivalent; a generous
    #   varchar(16384) is used (Redshift's varchar limit is 65535).
    _DATA_TYPE_MAP = {
        "array": "varchar(16384)",
        "inet": "varchar(45)",
        "json": "varchar(16384)",
        "jsonb": "varchar(16384)",
        "text": "varchar(16384)",
        "uuid": "char(36)",
    }

    @apply_defaults
    def __init__(
        self,
        redshift_conn_id,
        schema,
        table,
        s3_bucket,
        s3_key,
        *args,
        **kwargs
    ) -> None:  # BUGFIX: was annotated -> bool; __init__ always returns None
        """Store connection and location parameters; no I/O happens here."""
        super().__init__(*args, **kwargs)
        self.redshift_conn_id = redshift_conn_id
        self.schema = schema
        self.table = table
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key

    def execute(self, context):
        """Drop and re-create the Redshift table from the S3 table definition.

        Returns True on success (kept for backward compatibility with the
        original implementation).
        """
        self.redshift_hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
        # Build the SQL before opening the connection so an S3/JSON failure
        # never leaves a dangling connection.
        sql = self._get_create_table_sql(self._get_postgres_table_definition())
        conn = self.redshift_hook.get_conn()
        try:
            cursor = conn.cursor()
            try:
                cursor.execute(sql)
            finally:
                cursor.close()
            conn.commit()
        finally:
            # BUGFIX: the connection was previously never closed (leak).
            conn.close()
        return True

    def _get_postgres_table_definition(self):
        """Read and parse the JSON column-definition list from S3."""
        s3_hook = S3Hook()
        return json.loads(s3_hook.read_key(key=self.s3_key, bucket_name=self.s3_bucket))

    def _get_create_table_sql(self, postgres_table_definition):
        """Build a DROP + CREATE TABLE statement from the column definitions."""
        column_sql = [
            "{column_name} {data_type} {nullable}".format(
                column_name=column["column_name"],
                data_type=self._postgres_to_redshift_data_type(column),
                # NOTE(review): NOT NULL is deliberately not emitted (the
                # original left it disabled) — confirm whether source data
                # can violate Postgres NOT NULL constraints before enabling.
                nullable="",
            )
            for column in postgres_table_definition
        ]
        return f"""
            DROP TABLE IF EXISTS {self.schema}.{self.table} CASCADE;
            CREATE TABLE IF NOT EXISTS {self.schema}.{self.table}
            (
                {", ".join(column_sql)}
            );
        """

    def _postgres_to_redshift_data_type(self, column):
        """
        Convert a Postgres data type into a Redshift data type.

        Types Redshift does not support are translated via _DATA_TYPE_MAP;
        character types keep their declared maximum length; anything else
        passes through unchanged.
        """
        data_type = column["data_type"].lower()
        if data_type in self._DATA_TYPE_MAP:
            return self._DATA_TYPE_MAP[data_type]
        if column.get("character_maximum_length"):
            return f"{column['data_type']}({column['character_maximum_length']})"
        return column["data_type"]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hey, could you explain in what format you have saved your Postgres table schema?