Last active
April 1, 2025 03:51
-
-
Save sdg-1/ef97213329a701298c2b02fb8815316f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.empty import EmptyOperator
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryExecuteQueryOperator,
    BigQueryInsertJobOperator,
    BigQueryTableCheckOperator,
)
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from google.cloud import bigquery

# Object path (inside the GCS bucket) of the inpatient-claims CSV. The same
# value is handed to the load task as its source object and to the
# delete/update SQL through ``params``.
FILE_LOCATION = "csv_data/DE1_0_2008_to_2010_Inpatient_Claims_Sample_1.csv"

# Arguments supplied to the DAG as ``default_args``: a single retry after a
# five-minute delay, with no dependency on the previous run.
default_args = dict(
    owner="airflow",
    depends_on_past=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
)
# The DAG object every task below is attached to. It is scheduled daily from
# 2024-11-01; ``catchup=False`` disables backfilling of intervals that elapsed
# before the DAG was enabled.
dag = DAG(
    dag_id="patient_claims_plus",
    description="Creating a joined table between patients and claims",
    # Scheduling
    start_date=datetime(2024, 11, 1),
    schedule_interval="@daily",
    catchup=False,
    # Defaults applied to every task
    default_args=default_args,
)
# Task 1: Delete previous entries for this file
# Runs as a standard-SQL query job. BigQueryInsertJobOperator is used here
# (the same operator as the table-building task) because
# BigQueryExecuteQueryOperator is deprecated in the Google provider.
delete_raw_inpatient_claims = BigQueryInsertJobOperator(
    task_id="delete_raw_inpatient_claims",
    configuration={
        "query": {
            # ``configuration`` is a templated field, so the Jinja expression
            # is rendered from ``params`` when the task runs.
            "query": """
                DELETE FROM `TEMP.airflow_example.raw_inpatient_claims`
                WHERE file_name = '{{ params.FILE_LOCATION }}'
            """,
            "useLegacySql": False,
        }
    },
    params={"FILE_LOCATION": FILE_LOCATION},
    dag=dag,
)
# Task 2: Load CSV to raw table
# Copies the CSV object from the GCS bucket into the raw BigQuery table.
# NOTE(review): WRITE_TRUNCATE replaces the whole destination table on every
# load, which makes the per-file DELETE that runs before this task redundant.
# Confirm whether WRITE_APPEND was the intended disposition.
insert_raw_inpatient_claims = GCSToBigQueryOperator(
    task_id="insert_raw_inpatient_claims",
    dag=dag,
    # Source
    bucket="healthcare_data_01",
    source_objects=[FILE_LOCATION],
    source_format="CSV",
    skip_leading_rows=1,  # skip the first row of the file (presumably the header)
    # Destination
    destination_project_dataset_table="TEMP.airflow_example.raw_inpatient_claims",
    write_disposition="WRITE_TRUNCATE",
)
# Task 3: Update filename metadata
# Stamps rows whose file_name is still NULL with the source file path.
# Runs as a standard-SQL query job through BigQueryInsertJobOperator, since
# BigQueryExecuteQueryOperator is deprecated in the Google provider.
update_raw_inpatient_claims = BigQueryInsertJobOperator(
    task_id="update_raw_inpatient_claims",
    configuration={
        "query": {
            # ``configuration`` is a templated field, so the Jinja expression
            # is rendered from ``params`` when the task runs.
            "query": """
                UPDATE `TEMP.airflow_example.raw_inpatient_claims`
                SET file_name = '{{ params.FILE_LOCATION }}'
                WHERE file_name IS NULL;
            """,
            "useLegacySql": False,
        }
    },
    params={"FILE_LOCATION": FILE_LOCATION},
    dag=dag,
)
# Task 4: Create joined patient_claims_plus table
# Standard-SQL statement that rebuilds the output table: every raw inpatient
# claim row is LEFT JOINed to the beneficiary table on DESYNPUF_ID, columns
# are renamed, the two claim dates are parsed from YYYYMMDD values, and the
# sex code is mapped to a label.
# NOTE(review): if raw_beneficiary holds more than one row per DESYNPUF_ID the
# join multiplies claim rows -- verify against the beneficiary table.
_PATIENT_CLAIMS_PLUS_SQL = """
    CREATE OR REPLACE TABLE `TEMP.airflow_example.patient_claims_plus` AS
    SELECT
        a.DESYNPUF_ID AS patient_id,
        PARSE_DATE('%Y%m%d', CAST(a.CLM_FROM_DT AS STRING)) AS claim_from_date,
        PARSE_DATE('%Y%m%d', CAST(a.CLM_THRU_DT AS STRING)) AS claim_thru_date,
        a.CLM_ID AS claim_id,
        a.PRVD_NUM AS provider_number,
        a.CLM_PMT_AMT AS claim_payment_amount,
        a.ICD9_DGNS_CD_1 AS icd_diagnosis_code_1,
        a.ICD9_DGNS_CD_2 AS icd_diagnosis_code_2,
        a.ICD9_DGNS_CD_3 AS icd_diagnosis_code_3,
        a.ICD9_DGNS_CD_4 AS icd_diagnosis_code_4,
        a.ICD9_DGNS_CD_5 AS icd_diagnosis_code_5,
        a.ICD9_DGNS_CD_6 AS icd_diagnosis_code_6,
        a.ICD9_DGNS_CD_7 AS icd_diagnosis_code_7,
        a.ICD9_DGNS_CD_8 AS icd_diagnosis_code_8,
        a.ICD9_DGNS_CD_9 AS icd_diagnosis_code_9,
        b.BENE_HI_CVRAGE_TOT_MONS AS patient_hospital_insurance_total_months,
        b.BENE_SMI_CVRAGE_TOT_MONS AS patient_supplementary_medical_insurance_total_months,
        b.BENE_BIRTH_DT AS patient_birth_date,
        b.BENE_DEATH_DT AS patient_death_date,
        CASE
            WHEN b.BENE_SEX_IDENT_CD = 1 THEN "Male"
            WHEN b.BENE_SEX_IDENT_CD = 2 THEN "Female"
            ELSE "Unknown"
        END AS patient_sex
    FROM `TEMP.airflow_example.raw_inpatient_claims` a
    LEFT JOIN `TEMP.airflow_example.raw_beneficiary` b
        ON a.DESYNPUF_ID = b.DESYNPUF_ID
"""

create_patient_claims_plus = BigQueryInsertJobOperator(
    task_id="create_patient_claims_plus",
    dag=dag,
    configuration={
        "query": {
            "query": _PATIENT_CLAIMS_PLUS_SQL,
            "useLegacySql": False,
        }
    },
)
# Task 5: Data quality check
# Fails the run when the freshly built table is empty.
# ``use_legacy_sql`` is an operator-level argument; a check entry itself only
# carries ``check_statement`` (and optionally ``partition_clause``). Placing it
# inside the check left the operator on its legacy-SQL default, unlike every
# other query in this DAG.
# NOTE(review): the table name omits the `TEMP` project prefix used by the
# other tasks, so it resolves against the connection's default project --
# confirm that this is the same project.
dq_row_count_patient_claims_plus = BigQueryTableCheckOperator(
    task_id="dq_row_count_patient_claims_plus",
    table="airflow_example.patient_claims_plus",
    checks={
        "row_count_check": {
            "check_statement": "COUNT(*) > 0",
        },
    },
    use_legacy_sql=False,
    dag=dag,
)
# Task 6: No-op marker for success
# Does no work; it only gives downstream consumers a single task to wait on.
# EmptyOperator is the non-deprecated replacement for DummyOperator.
ready = EmptyOperator(task_id="ready", dag=dag)
# Set task dependencies
# The pipeline is strictly linear; each statement declares one edge:
# delete -> load -> stamp file name -> build joined table -> row-count check -> ready
delete_raw_inpatient_claims >> insert_raw_inpatient_claims
insert_raw_inpatient_claims >> update_raw_inpatient_claims
update_raw_inpatient_claims >> create_patient_claims_plus
create_patient_claims_plus >> dq_row_count_patient_claims_plus
dq_row_count_patient_claims_plus >> ready
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment