Skip to content

Instantly share code, notes, and snippets.

@sdg-1
Last active April 1, 2025 03:43
Show Gist options
  • Save sdg-1/2c8a00ce79b402f95be98322ad4a4ff4 to your computer and use it in GitHub Desktop.
Save sdg-1/2c8a00ce79b402f95be98322ad4a4ff4 to your computer and use it in GitHub Desktop.
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryExecuteQueryOperator,
)
from datetime import datetime
from google.cloud import storage, bigquery
import requests
import zipfile
import os
# DAG defaults
default_args = {
"owner": "airflow",
"start_date": datetime(2024, 8, 1),
"retries": 1,
}
dag = DAG(
"download_and_load_data_to_snowflake_with_gcs",
default_args=default_args,
description="Download a zip file to GCS, unzip it, and load data into Snowflake",
schedule_interval="@daily",
)
# Config
GCS_BUCKET_NAME = "healthcare_data_01"
ZIP_FOLDER = "zipped_data"
CSV_FOLDER = "csv_data"
BIGQUERY_DATASET = "airflow_example"
BIGQUERY_TABLE = "inpatient_claims"
JOINED_TABLE = "claims_beneficiary_plus"
# Task 1: Download ZIP and upload to GCS
def download_zip_to_gcs():
url = "https://www.cms.gov/research-statistics-data-and-systems/downloadable-public-use-files/synpufs/downloads/inpatient_claims.zip"
response = requests.get(url)
zip_path = "/tmp/inpatient_claims.zip"
with open(zip_path, "wb") as f:
f.write(response.content)
client = storage.Client()
bucket = client.bucket(GCS_BUCKET_NAME)
blob = bucket.blob(f"{ZIP_FOLDER}/inpatient_claims.zip")
blob.upload_from_filename(zip_path)
os.remove(zip_path)
# Task 2: Unzip and upload CSVs to GCS
def unzip_file_in_gcs():
client = storage.Client()
bucket = client.bucket(GCS_BUCKET_NAME)
zip_blob = bucket.blob(f"{ZIP_FOLDER}/inpatient_claims.zip")
zip_blob.download_to_filename("/tmp/inpatient_claims.zip")
with zipfile.ZipFile("/tmp/inpatient_claims.zip", "r") as zip_ref:
zip_ref.extractall("/tmp/inpatient_claims")
for filename in os.listdir("/tmp/inpatient_claims"):
local_path = os.path.join("/tmp/inpatient_claims", filename)
blob = bucket.blob(f"{CSV_FOLDER}/{filename}")
blob.upload_from_filename(local_path)
os.remove(local_path)
os.remove("/tmp/inpatient_claims.zip")
# Task 3: Check data quality in BigQuery
def check_data_quality(**kwargs):
client = bigquery.Client()
query = f"""
SELECT COUNT(*) as invalid_count
FROM `{BIGQUERY_DATASET}.{BIGQUERY_TABLE}`
WHERE CLM_ADMSN_DT IS NULL;
"""
query_job = client.query(query)
results = query_job.result()
invalid_count = next(results).invalid_count
if invalid_count > 0:
raise ValueError(
f"Data quality check failed: {invalid_count} invalid dates found"
)
# Define Operators
download_task = PythonOperator(
task_id="download_zip_to_gcs",
python_callable=download_zip_to_gcs,
dag=dag,
)
unzip_task = PythonOperator(
task_id="unzip_file_in_gcs",
python_callable=unzip_file_in_gcs,
dag=dag,
)
# DAG Task Order
download_task >> unzip_task
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment