Skip to content

Instantly share code, notes, and snippets.

@sdg-1
Last active March 28, 2025 17:21
Show Gist options
  • Save sdg-1/02035262f5e32a5e88e918aa857ae627 to your computer and use it in GitHub Desktop.
Save sdg-1/02035262f5e32a5e88e918aa857ae627 to your computer and use it in GitHub Desktop.
import psycopg2
import os
def delete_old_records(conn, start_date, end_date):
    """Delete staged earthquake rows whose ``dt`` falls inside a date window.

    The window runs from the start of ``start_date`` through the end of
    ``end_date`` (both days inclusive). That is the same whole-day window
    ``transform_earthquake`` selects with its ``::date BETWEEN`` filter, so a
    delete followed by a reload of the same window does not leave duplicates.

    Args:
        conn: Open database connection (``main`` creates it with
            ``psycopg2.connect``). It must provide ``cursor()``, ``commit()``
            and ``rollback()``.
        start_date: First day of the window, e.g. ``"2023-04-07"``; any value
            PostgreSQL can cast to ``date``.
        end_date: Last day of the window (inclusive), same format.

    Raises:
        Exception: Any error raised by the driver is re-raised after the
            transaction has been rolled back.
    """
    # A plain "dt BETWEEN start AND end" compares against midnight at the
    # START of end_date, so rows later on that day would survive the delete.
    # The half-open range below covers the whole final day while leaving the
    # dt column itself unwrapped.
    # NOTE(review): assumes dt is a timestamp (or date) column -- confirm
    # against the table definition.
    delete_query = """
        DELETE FROM raw.public.stage_earthquake
        WHERE dt >= %s::date
          AND dt < %s::date + INTERVAL '1 day'
    """
    try:
        # The cursor context manager closes the cursor even when execute()
        # raises.
        with conn.cursor() as cur:
            cur.execute(delete_query, (start_date, end_date))
        conn.commit()
    except Exception:
        # Leave the connection usable for the caller instead of stuck in an
        # aborted transaction.
        conn.rollback()
        raise
def transform_earthquake(conn, start_date, end_date):
    """Copy earthquake rows for a date window into the staging table.

    Reads ``raw.public.earthquake``, converts the epoch value in ``ts`` to a
    timestamp, reduces ``place`` to the text that follows the first ``" of "``
    and inserts the result into ``raw.public.stage_earthquake``.

    Args:
        conn: Open database connection (``main`` creates it with
            ``psycopg2.connect``). It must provide ``cursor()``, ``commit()``
            and ``rollback()``.
        start_date: First day (inclusive) of the window, e.g. ``"2023-04-07"``.
        end_date: Last day (inclusive) of the window, same format.

    Raises:
        Exception: Any error raised by the driver is re-raised after the
            transaction has been rolled back.
    """
    # PostgreSQL has no TIMESTAMP(<number>) function and cannot cast a bigint
    # straight to date; to_timestamp() is the epoch-seconds conversion.
    # NOTE(review): ts is presumably epoch milliseconds (it is integer-divided
    # by 1000 first) -- confirm. to_timestamp() returns timestamptz, so the
    # ::date cast follows the session time zone.
    #
    # POSITION() returns 0 when ' of ' is absent; without the CASE guard the
    # SUBSTRING would start at character 3 and silently drop the first two
    # characters of such place names.
    #
    # NOTE(review): the INSERT relies on the staging table's column order
    # matching this SELECT list -- confirm before changing either.
    sql = """
        INSERT INTO raw.public.stage_earthquake
        SELECT
            to_timestamp(CAST(ts AS bigint) / 1000) AS dt,
            st,
            CASE
                WHEN POSITION(' of ' IN place) > 0
                    THEN TRIM(SUBSTRING(place FROM POSITION(' of ' IN place) + 3))
                ELSE TRIM(place)
            END AS place,
            magnitude,
            latitude,
            longitude
        FROM raw.public.earthquake
        WHERE to_timestamp(CAST(ts AS bigint) / 1000)::date BETWEEN %s AND %s
    """
    try:
        # The cursor context manager closes the cursor even when execute()
        # raises.
        with conn.cursor() as cur:
            cur.execute(sql, (start_date, end_date))
        conn.commit()
    except Exception:
        # Do not leave the connection in an aborted transaction.
        conn.rollback()
        raise
def main(start_date="2023-04-07", end_date="2023-04-15"):
    """Reload the staged earthquake rows for one date window.

    Connects to PostgreSQL using the ``DB_NAME``, ``DB_USER``, ``DB_PASS`` and
    ``DB_HOST`` environment variables (plus the optional ``DB_PORT``, which
    defaults to ``"5432"``), deletes the rows already staged for the window
    and then loads that window again from the raw table.

    Args:
        start_date: First day (inclusive) of the window to reload.
        end_date: Last day (inclusive) of the window to reload.

    Raises:
        Exception: Connection or query errors from the driver propagate to
            the caller; the connection is closed first.
    """
    # Database connection parameters
    db_params = {
        "dbname": os.getenv("DB_NAME"),
        "user": os.getenv("DB_USER"),
        "password": os.getenv("DB_PASS"),
        "host": os.getenv("DB_HOST"),
        "port": os.getenv("DB_PORT", "5432"),
    }
    # Connect to the PostgreSQL database
    conn = psycopg2.connect(**db_params)
    try:
        # Delete old records
        # NOTE(review): the delete and the load each commit on their own, so
        # a failed load leaves the window empty until the job is re-run.
        delete_old_records(conn, start_date, end_date)
        # Load data into the table
        transform_earthquake(conn, start_date, end_date)
        print("Data loaded successfully")
    finally:
        # Release the connection even when one of the steps raises.
        conn.close()
# Run the load only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment