Last active
July 3, 2024 12:45
-
-
Save SolidStill/030becc0024c22d29f62e5a0022521aa to your computer and use it in GitHub Desktop.
EODHD_ETL_workflow_py Development
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Runtime configuration read from environment variables.

``load_dotenv()`` first copies any variables defined in a local ``.env`` file
into the process environment; each setting below is then read from the
environment and is ``None`` when the variable is not set.
"""
import os

from dotenv import load_dotenv

load_dotenv()

_read_env = os.environ.get

EODHD_API_KEY = _read_env("EODHD_API_KEY")  # key for the EODHD market-data API
SQL_USER = _read_env("SQL_USER")  # PostgreSQL user name
SQL_PASS = _read_env("SQL_PASS")  # PostgreSQL password
SQL_HOST = _read_env("SQL_HOST")  # PostgreSQL host

# During early notebook development these values came from Colab secrets
# (google.colab.userdata keys 'SQLPass', 'SQLUser' and 'host') instead.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import pandas as pd | |
# from config import EODHD_API_KEY | |
def get_eod_data(symbol, api_token, timeout=30):
    """Fetch the full End-of-Day (EOD) price history for ``symbol`` from EODHD.

    Args:
        symbol: EODHD ticker, e.g. ``"UK10Y.GBOND"``.
        api_token: EODHD API key.
        timeout: Seconds to wait for the HTTP response. ``requests`` applies
            no timeout by default, so without one a stalled server could
            block the whole ETL run indefinitely.

    Returns:
        DataFrame sorted newest-first, with ``date`` parsed to datetime and the
        ``volume`` column removed; other columns are kept as the API returned
        them. An empty API response yields an empty frame with ``date``,
        ``open``, ``high``, ``low``, ``close`` and ``adjusted_close`` columns.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        requests.Timeout: If no response arrives within ``timeout`` seconds.
    """
    base_url = "https://eodhd.com/api/eod"
    # Passing the query string via ``params`` lets requests URL-encode the values.
    response = requests.get(
        f"{base_url}/{symbol}",
        params={"api_token": api_token, "fmt": "json"},
        timeout=timeout,
    )
    response.raise_for_status()  # raise if the request failed
    data = response.json()

    if not data:
        # No rows: return a typed empty frame so callers can still filter on "date".
        return pd.DataFrame(
            {
                "date": pd.Series(dtype="datetime64[ns]"),
                **{
                    col: pd.Series(dtype="float64")
                    for col in ("open", "high", "low", "close", "adjusted_close")
                },
            }
        )

    df = pd.DataFrame(data)
    # Volume is not loaded into the database; errors="ignore" tolerates its absence.
    df = df.drop(columns="volume", errors="ignore")
    df["date"] = pd.to_datetime(df["date"])
    return df.sort_values("date", ascending=False)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Database-loading helpers for the EODHD ETL workflow.

Helper functions in this .py file:
    create_connection(),
    create_database_tables(symbol),
    load_data_into_database(df, symbol)
"""
# The docstring must be the first statement of the module to become __doc__;
# placed after an import it is just a discarded string expression.
import psycopg2
def create_connection(database="pagila", port=5432):
    """Open a new connection to the PostgreSQL database.

    The user, password and host come from the module-level ``SQL_USER``,
    ``SQL_PASS`` and ``SQL_HOST`` settings.

    Args:
        database: Name of the database to connect to (default ``"pagila"``).
        port: Server port (default ``5432``).

    Returns:
        A new psycopg2 connection. Using it as ``with conn:`` commits on
        success and rolls back on error but does NOT close the connection;
        callers must close it (e.g. via ``contextlib.closing``).

    Raises:
        psycopg2.Error: If the connection cannot be established (printed,
            then re-raised).
    """
    try:
        return psycopg2.connect(
            database=database,
            user=SQL_USER,
            host=SQL_HOST,
            password=SQL_PASS,
            port=port,
        )
    except psycopg2.Error as e:
        print(f"Error connecting to database: {e}")
        raise  # let higher-level callers decide how to handle the failure
def create_database_tables(symbol):
    """Create the price table for ``symbol`` if it does not already exist.

    The table is ``student.de10_cdw_<country>_<maturity_class>_gbond``, with
    both name parts taken from ``bond_symbols_dict[symbol]``; ``date`` is its
    primary key.

    Raises:
        KeyError: If ``symbol`` is not in ``bond_symbols_dict``.
        psycopg2.Error: If connecting or running the DDL fails (printed, then
            re-raised).
    """
    from contextlib import closing

    maturity_class, country = bond_symbols_dict[symbol]
    table_name = f"de10_cdw_{country}_{maturity_class}_gbond"
    # Identifiers cannot be sent as %s parameters; table_name is built only
    # from the values in bond_symbols_dict, so interpolating it is acceptable.
    create_table_sql = f"""
        CREATE TABLE IF NOT EXISTS student.{table_name} (
            date DATE PRIMARY KEY,
            open NUMERIC,
            high NUMERIC,
            low NUMERIC,
            close NUMERIC,
            adjusted_close NUMERIC
        )
    """
    try:
        # closing() releases the connection; a psycopg2 connection used as a
        # context manager only commits/rolls back and never closes itself.
        with closing(create_connection()) as conn:
            with conn, conn.cursor() as cur:  # commits on success, rolls back on error
                cur.execute(create_table_sql)
    except psycopg2.Error as e:
        print(f"Error creating table '{table_name}': {e}")
        raise  # let higher-level callers decide how to handle the failure
def load_data_into_database(df, symbol):
    """Insert the rows of ``df`` into the table for ``symbol``.

    The target table (``student.de10_cdw_<country>_<maturity_class>_gbond``)
    is created first if needed. Rows whose ``date`` already exists in the
    table are skipped (``ON CONFLICT (date) DO NOTHING``), so re-loading an
    overlapping batch cannot fail on the primary key.

    Args:
        df: DataFrame with ``date``, ``open``, ``high``, ``low``, ``close`` and
            ``adjusted_close`` columns (other columns are ignored).
        symbol: Key of ``bond_symbols_dict`` identifying the target table.

    Raises:
        KeyError: If ``symbol`` is not in ``bond_symbols_dict`` or a required
            column is missing from ``df``.
        psycopg2.Error: On any database failure (printed, then re-raised); the
            insert transaction is rolled back.
    """
    from contextlib import closing

    from psycopg2.extras import execute_values

    maturity_class, country = bond_symbols_dict[symbol]
    table_name = f"de10_cdw_{country}_{maturity_class}_gbond"
    columns = ["date", "open", "high", "low", "close", "adjusted_close"]
    # One plain tuple per row, in column order; values are still passed as
    # query parameters, so psycopg2 handles quoting and type conversion.
    rows = list(df[columns].itertuples(index=False, name=None))
    insert_sql = (
        f"INSERT INTO student.{table_name} ({', '.join(columns)}) "
        "VALUES %s ON CONFLICT (date) DO NOTHING"
    )
    try:
        create_database_tables(symbol)  # ensure the table exists before loading
        if not rows:
            return
        # closing() releases the connection; `with conn` commits or rolls back.
        with closing(create_connection()) as conn:
            with conn, conn.cursor() as cur:
                # execute_values batches many rows per statement instead of
                # one round-trip per row.
                execute_values(cur, insert_sql, rows)
    except psycopg2.Error as error:
        print(f"Database error: {error}")
        raise  # let higher-level callers decide how to handle the failure
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# from config import EODHD_API_KEY, SQL_USER, SQL_PASS, SQL_HOST


def main():
    """Run one update pass over every bond symbol in bond_symbols_dict."""
    # NOTE(review): update_data and bond_symbols_dict are not imported in this
    # file (the config import above is commented out) - confirm they are in
    # scope when this runs, e.g. when the files are combined into one notebook.
    update_data(bond_symbols_dict)


if __name__ == "__main__":
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time

import pandas
import pandas as pd
import psycopg2

#from config import api_token
# from extract_transform import get_eod_data, clean_and_transform_data
# from load import create_connection, load_data_into_database
# UK, US and German government bond symbols, each mapped to
# [maturity_class, country]. Both values are lower case because they are
# interpolated into table names (de10_cdw_<country>_<maturity_class>_gbond);
# upper-case values caused substantial issues during development.
_GBOND_MATURITIES = (
    ("uk", ("1y", "2y", "3y", "5y", "10y", "30y")),
    ("us", ("1y", "2y", "3y", "5y", "10y", "30y")),
    ("de", ("1y", "2y", "5y", "10y", "30y")),  # DE3Y is not included
)
bond_symbols_dict = {
    f"{country.upper()}{maturity.upper()}.GBOND": [maturity, country]
    for country, maturities in _GBOND_MATURITIES
    for maturity in maturities
}
def fetch_latest_ingested_date(symbol):
    """Return the most recent ``date`` stored for ``symbol``, or ``None``.

    update_data() uses this to work out which rows still need inserting.
    ``None`` is returned only when the table does not exist or holds no rows.
    Database errors are never converted to ``None``: they are printed and
    re-raised, so a transient failure cannot make the caller reload (and
    duplicate) the full history.

    Raises:
        KeyError: If ``symbol`` is not in ``bond_symbols_dict``.
        psycopg2.Error: On any database failure.
    """
    from contextlib import closing

    maturity_class, country = bond_symbols_dict[symbol]
    table_name = f"de10_cdw_{country}_{maturity_class}_gbond"
    print(f"Checking for table: {table_name}")  # Debug print
    try:
        # Read-only queries: closing() releases the connection afterwards.
        with closing(create_connection()) as conn, conn.cursor() as cur:
            cur.execute(
                "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'student' AND table_name = %s)",
                (table_name,),  # one-element tuple of query parameters
            )
            table_exists = cur.fetchone()[0]
            print(f"Table exists: {table_exists}")  # Debug print
            if not table_exists:
                print("Table does not exist.")  # Debug print
                return None
            # MAX() over an empty table yields NULL (None), so no separate
            # COUNT(*) scan is needed to detect emptiness.
            cur.execute(f"SELECT MAX(date) FROM student.{table_name};")
            latest_date = cur.fetchone()[0]
    except psycopg2.Error as e:
        print(f"Database error while fetching latest date: {e}")
        raise

    if latest_date is None:
        print("Table is empty.")  # Debug print
    else:
        print(f"Latest date found: {latest_date}")  # Debug print
    return latest_date
def _update_symbol(symbol):
    """Load every API row newer than the latest row already stored for ``symbol``."""
    latest_date_in_db = fetch_latest_ingested_date(symbol)
    # The whole history is fetched and filtered locally (only ~30KB per symbol).
    all_data_df = get_eod_data(symbol, EODHD_API_KEY)
    if latest_date_in_db is not None:
        cutoff = pd.Timestamp(latest_date_in_db)
        new_data_df = all_data_df[all_data_df["date"] > cutoff]
    else:
        # Table missing or empty: load the full history.
        new_data_df = all_data_df

    if new_data_df.empty:
        print(f"No new data found for {symbol}")
    else:
        load_data_into_database(new_data_df, symbol)
        print(f"Successfully updated data for {symbol}")


def update_data(symbol_dict, max_retries=3, retry_delay=2.0):
    """Fetch, filter and load the latest EOD data for every symbol in ``symbol_dict``.

    For each symbol the full history is downloaded and only rows newer than
    the latest date already in the database are loaded; if the table is
    missing or empty, the whole history is loaded.

    Args:
        symbol_dict: Mapping whose keys are the symbols to update (the values
            are not read here; the helpers look symbols up in
            ``bond_symbols_dict``).
        max_retries: Attempts per symbol before the last error is re-raised.
        retry_delay: Seconds to wait after the first failed attempt; the wait
            grows linearly with each further attempt.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        Exception: The last error for a symbol once all attempts fail.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    for symbol in symbol_dict:
        for attempt in range(1, max_retries + 1):
            try:
                _update_symbol(symbol)
            except Exception as e:
                print(f"Error updating data for {symbol}: {e}")
                if attempt == max_retries:
                    raise  # re-raise for higher-level handling
                # Pause before retrying so an intermittent failure has time to clear.
                time.sleep(retry_delay * attempt)
            else:
                break  # this symbol succeeded; move on to the next one
    print("***Update completed for all symbols***")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment