@natarajanc
Last active November 19, 2018 15:40
Template DAG function for ELT/ETL pipeline
from datetime import timedelta
from airflow import DAG
from dw_airflow.airflow_utils import etl_helpers # internal helper module

def create_dag(dag_id, dag_schedule, dag_start_date, data_source, notify_always=True):
    """
    Create an ELT DAG from this template based on the data source and environment.

    Args:
        dag_id: ELT DAG name
        dag_schedule: (string) cron expression describing the DAG schedule
        dag_start_date: datetime object for the DAG start date
        data_source: internal data source label (e.g. SALESFORCE, MARKETO) used as the ELT DAG identifier
        notify_always: boolean Slack notification preference on DAG success
    """
    DAG_DEFAULT_ARGS = {
        'owner': 'dw',
        'depends_on_past': False,
        'retries': 1,
        'max_active_runs': 1,
        'provide_context': True,
        'retry_delay': timedelta(minutes=2)
    }

    dag = DAG(dag_id,
              default_args=DAG_DEFAULT_ARGS,
              start_date=dag_start_date,
              max_active_runs=1,
              schedule_interval=dag_schedule)
    # internal kill-switch helper returning a ShortCircuitOperator to short-circuit ELT execution if necessary
    should_etl_run = etl_helpers.dw_should_datasource_run_operator(data_source=data_source, dag=dag)

    # returns a PostgresOperator that sets the ETL marker after the ELT run is complete
    set_last_sync = etl_helpers.dw_set_last_sync_for_datasource_operator(data_source=data_source, dag=dag)

    # returns a BranchPythonOperator that confirms success of the ETL DAG run and sends notifications as needed
    register_etl_success = etl_helpers.register_etl_success_operator(data_source=data_source,
                                                                     notify_always=notify_always,
                                                                     dag=dag)

    # returns a SlackOperator that posts the ETL success message
    slack_notify = etl_helpers.post_etl_success_to_slack(dag=dag)

    # returns a DummyOperator that does a no-op
    skip_slack_notify = etl_helpers.dw_dummy_etl_operator(task_id='skip_notify_slack', dag=dag)

    # returns a BashOperator that invokes a docker-compose service to run the ELT Fetch stage for the data source
    fetch = etl_helpers.dw_elt_operator(data_source=data_source, etl_stage='FETCH', dag=dag)

    # returns a BashOperator that invokes a docker-compose service to run the ELT Staging stage for the data source
    stage = etl_helpers.dw_elt_operator(data_source=data_source, etl_stage='STAGE', dag=dag)

    # returns a BashOperator that invokes a docker-compose service to run the ELT Transform stage for the data source
    transform = etl_helpers.dw_elt_operator(data_source=data_source, etl_stage='TRANSFORM', dag=dag)

    # returns a BashOperator that invokes a docker-compose service to execute data quality rules for the data source
    run_rules_engine = etl_helpers.dw_etl_rules_engine_operator(data_source=data_source, dag=dag)

    # returns a DummyOperator that does a no-op to mark the end of the ELT run
    finish_etl = etl_helpers.dw_finish_elt_operator(dag=dag)
    # DAG sequencing: kill-switch check -> fetch -> stage -> transform -> sync marker
    # -> rules engine -> success branch -> (slack_notify | skip_slack_notify) -> finish
    should_etl_run.set_downstream(fetch)
    fetch.set_downstream(stage)
    stage.set_downstream(transform)
    transform.set_downstream(set_last_sync)
    set_last_sync.set_downstream(run_rules_engine)
    run_rules_engine.set_downstream(register_etl_success)
    # register_etl_success branches to exactly one of slack_notify / skip_slack_notify;
    # both converge on finish_etl
    register_etl_success.set_downstream([slack_notify, skip_slack_notify])
    slack_notify.set_downstream(finish_etl)
    skip_slack_notify.set_downstream(finish_etl)

    return dag
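
For reference, a minimal sketch of how a per-source DAG file might call this template. Only create_dag itself comes from the gist above; the module path dw_airflow.dag_templates, the schedule, start date, and SALESFORCE label are illustrative assumptions. Airflow's DagBag only picks up the returned DAG if it is bound to a module-level name, hence the globals() assignment at the end.

# Example usage (sketch, hypothetical module path and arguments)
from datetime import datetime

from dw_airflow.dag_templates import create_dag  # assumed location of the template function above

salesforce_elt_dag = create_dag(
    dag_id='salesforce_elt',
    dag_schedule='0 6 * * *',              # daily at 06:00 UTC (illustrative)
    dag_start_date=datetime(2018, 11, 1),  # illustrative start date
    data_source='SALESFORCE',
    notify_always=False,                   # skip Slack notification on routine successes
)

# Expose the DAG at module level so the Airflow scheduler can discover it.
globals()[salesforce_elt_dag.dag_id] = salesforce_elt_dag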