Skip to content

Instantly share code, notes, and snippets.

@PachUp
Created December 4, 2024 03:54
Show Gist options
  • Save PachUp/85647ed014d1694fd0bcc81e4f170ef4 to your computer and use it in GitHub Desktop.
Save PachUp/85647ed014d1694fd0bcc81e4f170ef4 to your computer and use it in GitHub Desktop.
import asyncio
import aiohttp
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.prompt import Prompt
from rich.theme import Theme
# Initialize Rich console
console = Console(theme=Theme({"info": "dim cyan", "warning": "magenta", "danger": "bold red"}))
api_key = "eGhO1dlBx1un5IiF6ipQEI6aj5D6Fpdj"
ranking = {}
def get_user_input():
console.print(Panel.fit(
"[bold blue]Welcome to Stock Analysis Tool[/bold blue]\n"
"[dim]Enter stock tickers separated by commas (e.g., AAPL,MSFT,GOOGL)[/dim]"
))
tickets = Prompt.ask("[bold green]Enter stock tickers[/bold green]")
return tickets.split(",")
# Function to fetch data from the FMP API asynchronously
async def fetch_data(session, url):
async with session.get(url) as response:
return await response.json()
# Fetch quarterly income statement data for TTM calculation (last 4 quarters)
async def get_quarterly_income_statement(session, ticker, api_key):
url = f"https://financialmodelingprep.com/api/v3/income-statement/{ticker}?period=quarter&limit=4&apikey={api_key}"
res = await fetch_data(session, url)
if res == None or len(res) == 0: # check if the ticker exists
print("{0} ticker dosen't exist. you are autistic.".format(ticker))
quit()
else:
return res
# Fetch annual income statement data for 5-year average calculation
async def get_annual_income_statement(session, ticker, api_key):
url = f"https://financialmodelingprep.com/api/v3/income-statement/{ticker}?limit=5&apikey={api_key}"
return await fetch_data(session, url)
# Fetch quarterly cash flow statement data for TTM calculation
async def get_quarterly_cash_flow_statement(session, ticker, api_key):
url = f"https://financialmodelingprep.com/api/v3/cash-flow-statement/{ticker}?period=quarter&limit=4&apikey={api_key}"
return await fetch_data(session, url)
# Fetch annual cash flow statement data for 5-year average calculation
async def get_annual_cash_flow_statement(session, ticker, api_key):
url = f"https://financialmodelingprep.com/api/v3/cash-flow-statement/{ticker}?limit=5&apikey={api_key}"
return await fetch_data(session, url)
# Fetch quarterly balance sheet data for TTM calculation
async def get_quarterly_balance_sheet(session, ticker, api_key):
url = f"https://financialmodelingprep.com/api/v3/balance-sheet-statement/{ticker}?period=quarter&limit=1&apikey={api_key}"
return await fetch_data(session, url)
# Fetch annual balance sheet data for 5-year average calculation
async def get_annual_balance_sheet(session, ticker, api_key):
url = f"https://financialmodelingprep.com/api/v3/balance-sheet-statement/{ticker}?limit=5&apikey={api_key}"
return await fetch_data(session, url)
async def get_mkt_cap(session, ticker, api_key):
url = f"https://financialmodelingprep.com/api/v3/market-capitalization/{ticker}?apikey={api_key}"
return await fetch_data(session, url)
def calcualte_mkt_cap(data):
return data[0]["marketCap"]
# AU,FMV,SBSW
# Calculate 5-Year Average for a specific field (Annual)
def calculate_5_year_average(data, field):
return sum(item[field] for item in data) / len(data)
# Calculate TTM (Trailing Twelve Months) for a specific field (Quarterly)
def calculate_ttm(data, field):
return sum(item[field] for item in data)
# Calculate growth percentage
def calculate_growth(ttm_value, avg_value):
return (ttm_value / avg_value) - 1
# Function to calculate financial metrics for a stock ticker
async def calculate_stock_metrics(ticker, api_key):
async with aiohttp.ClientSession() as session:
# Fetching data concurrently
quarterly_income_data = await get_quarterly_income_statement(session, ticker, api_key)
annual_income_data = await get_annual_income_statement(session, ticker, api_key)
quarterly_cash_flow_data = await get_quarterly_cash_flow_statement(session, ticker, api_key)
annual_cash_flow_data = await get_annual_cash_flow_statement(session, ticker, api_key)
quarterly_balance_data = await get_quarterly_balance_sheet(session, ticker, api_key)
annual_balance_data = await get_annual_balance_sheet(session, ticker, api_key)
mkt_cap = await get_mkt_cap(session, ticker, api_key)
# Calculate metrics
revenue_ttm = calculate_ttm(quarterly_income_data, 'revenue')
cogs_ttm = calculate_ttm(quarterly_income_data, 'costOfRevenue')
gross_profit_ttm = calculate_ttm(quarterly_income_data, 'grossProfit')
revenue_avg = calculate_5_year_average(annual_income_data, 'revenue')
cogs_avg = calculate_5_year_average(annual_income_data, 'costOfRevenue')
gross_profit_avg = calculate_5_year_average(annual_income_data, 'grossProfit')
cfo_ttm = calculate_ttm(quarterly_cash_flow_data, 'netCashProvidedByOperatingActivities')
fcf_ttm = calculate_ttm(quarterly_cash_flow_data, 'freeCashFlow')
cfo_avg = calculate_5_year_average(annual_cash_flow_data, 'netCashProvidedByOperatingActivities')
fcf_avg = calculate_5_year_average(annual_cash_flow_data, 'freeCashFlow')
total_equity_ttm = calculate_ttm(quarterly_balance_data, 'totalStockholdersEquity')
total_debt_ttm = calculate_ttm(quarterly_balance_data, 'totalDebt')
total_assets_ttm = calculate_ttm(quarterly_balance_data, 'totalAssets')
total_liabilities_ttm = calculate_ttm(quarterly_balance_data, 'totalLiabilities')
total_equity_avg = calculate_5_year_average(annual_balance_data, 'totalStockholdersEquity')
total_debt_avg = calculate_5_year_average(annual_balance_data, 'totalDebt')
total_assets_avg = calculate_5_year_average(annual_balance_data, 'totalAssets')
total_liabilities_avg = calculate_5_year_average(annual_balance_data, 'totalLiabilities')
# Calculate growth for each sector
revenue_growth = calculate_growth(revenue_ttm, revenue_avg)
cogs_growth = calculate_growth(cogs_ttm, cogs_avg)
gross_profit_growth = calculate_growth(gross_profit_ttm, gross_profit_avg)
cfo_growth = calculate_growth(cfo_ttm, cfo_avg)
fcf_growth = calculate_growth(fcf_ttm, fcf_avg)
total_equity_growth = calculate_growth(total_equity_ttm, total_equity_avg)
total_debt_growth = calculate_growth(total_debt_ttm, total_debt_avg)
total_assets_growth = calculate_growth(total_assets_ttm, total_assets_avg)
total_liabilities_growth = calculate_growth(total_liabilities_ttm, total_liabilities_avg)
mkt_cap_eq = calcualte_mkt_cap(mkt_cap)
return {
'ticker': ticker,
'revenue_growth': revenue_growth,
'cogs_growth': cogs_growth,
'gross_profit_growth': gross_profit_growth,
'cfo_growth': cfo_growth,
'fcf_growth': fcf_growth,
'total_equity_growth': total_equity_growth,
'total_debt_growth': total_debt_growth,
'total_assets_growth': total_assets_growth,
'total_liabilities_growth': total_liabilities_growth,
'mkt_cap' : mkt_cap_eq
}
def print_metric_rankings(results, metric_name, reverse=True):
table = Table(title=f"\n{metric_name} Rankings", show_header=True, header_style="bold magenta")
table.add_column("Rank", style="dim", width=6)
table.add_column("Ticker", style="bold")
table.add_column("Value", justify="right")
sorted_results = sorted(results, key=lambda x: x[metric_name.lower().replace(' ', '_')], reverse=reverse)
for rank, result in enumerate(sorted_results, 1):
value = result[metric_name.lower().replace(' ', '_')] * 100
color = "green" if value > 0 else "red"
table.add_row(
str(rank),
result['ticker'],
f"[{color}]{value:.2f}%[/{color}]"
)
console.print(table)
# Main function to calculate and compare all stocks
async def compare_stocks(tickers, api_key):
with Progress() as progress:
task = progress.add_task("[cyan]Analyzing stocks...", total=len(tickers))
tasks = [calculate_stock_metrics(ticker, api_key) for ticker in tickers]
results = await asyncio.gather(*tasks)
progress.update(task, advance=len(tickers))
def print_growth_rankings(results, growth_metric, metric_name):
# Sort the results based on growth metric (ascending for COGS and descending for others)
if metric_name == "Total Liabilities Growth" or metric_name == "Total Debt Growth" or metric_name == "COGS Growth":
sorted_results = sorted(results, key=lambda x: x[growth_metric])
else:
sorted_results = sorted(results, key=lambda x: x[growth_metric], reverse=True)
table = Table(title=f"--- {metric_name} Rankings ---", show_header=True, header_style="bold magenta")
table.add_column("Rank", style="dim", width=6)
table.add_column("Ticker", style="bold")
table.add_column(metric_name, justify="right")
# Use exactly the same ranking calculation as original
for rank, result in enumerate(sorted_results, 1):
ranking[metric_name] = ranking.get(metric_name, {})
ranking[metric_name][result['ticker']] = ranking[metric_name].get(result['ticker'], 0) + rank
value = result[growth_metric] * 100
color = "green" if value > 0 else "red"
table.add_row(
str(rank),
result['ticker'],
f"[{color}]{value:.2f}%[/{color}]"
)
console.print(table)
console.print() # Add blank line
# Print rankings for each growth metric - exactly as in original
for metric_name in ['Revenue Growth', 'COGS Growth', 'Gross Profit Growth', 'CFO Growth', 'FCF Growth',
'Total Equity Growth', 'Total Debt Growth', 'Total Assets Growth', 'Total Liabilities Growth', 'mkt_cap']:
print_growth_rankings(results, f"{metric_name.lower().replace(' ', '_')}", metric_name)
# Calculate the total score for each company - exactly as in original
total_scores = {}
for metric_name in ['Revenue Growth', 'COGS Growth', 'Gross Profit Growth', 'CFO Growth', 'FCF Growth',
'Total Equity Growth', 'Total Debt Growth', 'Total Assets Growth', 'Total Liabilities Growth', 'mkt_cap']:
for result in results:
ticker = result['ticker']
if ticker not in total_scores:
total_scores[ticker] = 0
total_scores[ticker] += ranking[metric_name].get(ticker, 0)
# Print final rankings with the same logic but nicer formatting
console.print("\n[bold]--- Final Stock Rankings Based on Total Score ---[/bold]")
final_table = Table(show_header=True, header_style="bold blue")
final_table.add_column("Rank", style="dim", width=6)
final_table.add_column("Ticker", style="bold")
final_table.add_column("Total Score", justify="right")
sorted_scores = sorted(total_scores.items(), key=lambda x: x[1]) # Sort by total score (ascending)
for rank, (ticker, score) in enumerate(sorted_scores, 1):
final_table.add_row(str(rank), ticker, str(score))
console.print(final_table)
if __name__ == "__main__":
try:
ticket = get_user_input()
tickers = []
for i in ticket:
ranking[i] = 0
tickers.append(i)
console.print("\n[bold]Starting analysis...[/bold]")
asyncio.run(compare_stocks(tickers, api_key))
except KeyboardInterrupt:
console.print("\n[bold red]Analysis cancelled by user[/bold red]")
except Exception as e:
console.print(f"\n[bold red]An error occurred: {str(e)}[/bold red]")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment