Created
July 11, 2025 01:20
-
-
Save wongni/ff07f37f72af9241c93f26469f63887c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import statistics | |
import requests | |
import json | |
import inspect | |
from dotenv import load_dotenv | |
from datetime import datetime | |
import pandas as pd | |
import matplotlib.pyplot as plt | |
import matplotlib.ticker as tick | |
import streamlit as st | |
from bs4 import BeautifulSoup | |
import yfinance as yf | |
from ollama import chat | |
from ollama import ChatResponse | |
import numpy as np | |
# Debug switch. NOTE(review): not referenced anywhere in the visible code — confirm usage.
debug = False
# Inputs to get_discount_rate(): risk_free_rate + beta * average_market_risk_premium.
# Expressed in percent: calculate_intrinsic_value() divides the discount rate by 100.
risk_free_rate = 2.958
average_market_risk_premium = 3.010
# Growth-rate floor (percent) used by generate_growth_rates().
minimum_growth_rate = 4
# NOTE(review): in the visible code these two are only mentioned in commented-out
# scenario definitions — confirm whether they are still needed.
optimistic_5_10_yrs_growth_rate_discount = 0.75
pessimistic_5_10_yrs_growth_rate_discount = 0.5
# RGBA face colours add_subplot() applies when a metric passes / fails its cut line.
pass_color = (0.5, 1.0, 0.5, 0.3)
fail_color = (1.0, 0.5, 0.5, 0.3)
# Load variables from a .env file, then read the data-API settings used by fetch_data().
load_dotenv()
api_key = os.getenv("DCF_API_KEY")
base_url = os.getenv("DCF_BASE_URL")
@st.cache_data
def get_income_statements(ticker, period="annual"):
    """Return at most the first ten income-statement rows for ``ticker``."""
    statements = get_financial_statement(ticker, "income-statement", period)
    return statements[:10]
@st.cache_data
def get_balance_sheets(ticker, period="quarterly"):
    """Return at most the first ten balance-sheet rows for ``ticker``.

    Note that, unlike the other statement getters, the default period here
    is ``"quarterly"``.
    """
    sheets = get_financial_statement(ticker, "balance-sheet-statement", period)
    return sheets[:10]
@st.cache_data
def get_cash_flow_statements(ticker, period="annual"):
    """Return at most the first ten cash-flow-statement rows for ``ticker``."""
    statements = get_financial_statement(ticker, "cash-flow-statement", period)
    return statements[:10]
@st.cache_data
def get_stock_prices(ticker, period="daily", limit=1250):
    """Return the first ``limit`` rows of the "prices" dataset for ``ticker``."""
    prices = get_financial_statement(ticker, "prices", period)
    return prices[:limit]
@st.cache_data
def get_financial_ratios(ticker, period="annual"):
    """Return the "ratios" dataset for ``ticker`` plus three derived columns.

    The derived columns are computed from the statements already stored in
    ``st.session_state`` (``income_statements``, ``balance_sheets`` and
    ``cash_flow_statements``), so those must be populated before this is
    called. At most the first ten rows are returned.

    NOTE(review): the result is cached on (ticker, period) only, while the
    derived columns depend on session state — confirm this is intended.
    """
    ratios = get_financial_statement(ticker, "ratios", period)

    income = st.session_state.income_statements
    balance = st.session_state.balance_sheets

    ratios["debtToEBITDARatio"] = balance["totalDebt"] / income["ebitda"]

    net_interest = income["interestExpense"] - income["interestIncome"]
    operating_cash_flow = st.session_state.cash_flow_statements["operatingCashFlow"]
    ratios["debtServicingRatio"] = net_interest / operating_cash_flow

    ratios["revenueToAccountsReceivablesRatio"] = (
        income["revenue"] / balance["netReceivables"]
    )
    return ratios[:10]
@st.cache_data
def get_financial_statement(ticker, data_type, period):
    """Load one dataset for ``ticker`` as a DataFrame, using a per-day file cache.

    The payload is read from today's cache file when it exists; otherwise it
    is fetched with ``fetch_data`` and written to that file.

    Args:
        ticker: Ticker symbol passed to the data API.
        data_type: API dataset name (e.g. ``"income-statement"``).
        period: Reporting period passed to the data API.

    Returns:
        A DataFrame built from the payload's ``"report"`` entry; a single
        (non-list) report becomes a one-row frame.

    Raises:
        KeyError: If the payload has no ``"report"`` entry.
    """
    file_name = get_file_name(ticker, data_type, period, get_today_date())
    if file_exists(file_name):
        json_object = json.loads(read_data(file_name))
        report = json_object["report"]
    else:
        json_object = fetch_data(ticker, data_type, period)
        # Look up the report *before* caching so that an error payload
        # (no "report" key) is not persisted and replayed for the rest of the day.
        report = json_object["report"]
        write_data(file_name, json.dumps(json_object))

    records = report if isinstance(report, list) else [report]
    return pd.DataFrame.from_records(records)
@st.cache_data
def get_growth_rates(ticker):
    """Collect positive growth-rate estimates for ``ticker`` from each source.

    Today's cache file is used when present. Otherwise every source is
    queried in a fixed order, only rates greater than zero are kept, and the
    resulting records are written to the cache file.

    Returns:
        A DataFrame with ``source`` and ``growthRate`` columns (at most ten rows).
    """
    cache_file = get_file_name(ticker, "growth_rates", "annual", get_today_date())
    if file_exists(cache_file):
        cached_records = json.loads(read_data(cache_file))
        df = pd.DataFrame.from_records(cached_records)
    else:
        # (label stored in the "source" column, function that fetches the rate)
        sources = (
            ("ZK", get_growth_rate_from_zacks),
            ("FV", get_growth_rate_from_finviz),
            ("YF", get_growth_rate_from_yf),
            ("GF", get_growth_rate_from_gurufocus),
            ("SA", get_growth_rate_from_seekingalpha),
        )
        records = []
        for label, fetch_rate in sources:
            rate = fetch_rate(ticker)
            if rate > 0:
                records.append({"source": label, "growthRate": rate})
        df = pd.DataFrame(records)
        write_data(cache_file, json.dumps(records))
    return df[:10]
@st.cache_data
def read_moat_analysis_prompt_file():
    """Return the contents of ``moat_analysis_prompt.txt`` from the working directory.

    The file is decoded as UTF-8, matching the other file helpers in this
    module, instead of relying on the platform's default encoding.
    """
    with open("moat_analysis_prompt.txt", "r", encoding="utf-8") as prompt_file:
        return prompt_file.read()
@st.cache_data
def get_moat_analysis(ticker, company_profile):
    """Return the moat-analysis text for ``ticker``, generating it if needed.

    Today's cache file is returned as-is when it exists. Otherwise the prompt
    file contents followed by ``company_profile`` are sent as a single user
    message to the ``llama3.2`` model via ``ollama.chat``, and the reply text
    is cached and returned.
    """
    cache_file = get_file_name(ticker, "moat_analysis", "", get_today_date())
    if file_exists(cache_file):
        return read_data(cache_file)

    full_prompt = read_moat_analysis_prompt_file() + company_profile
    reply: ChatResponse = chat(
        model="llama3.2",
        messages=[{"role": "user", "content": full_prompt}],
    )
    analysis = reply.message.content
    write_data(cache_file, analysis)
    return analysis
@st.cache_data
def get_html(ticker, source):
    """Return the parsed quote page for ``ticker`` from ``source``.

    Today's cached copy is parsed when it exists; otherwise the page is
    downloaded with ``fetch_html`` and cached.

    Args:
        ticker: Ticker symbol substituted into the source's URL.
        source: One of ``"zacks"``, ``"finviz"``, ``"gurufocus"``,
            ``"seekingalpha"``.

    Returns:
        A ``BeautifulSoup`` document, or ``None`` when the download failed.

    Raises:
        Exception: If ``source`` is not one of the known sources (and no
            cached file exists for it).
    """
    url_templates = {
        "zacks": "https://www.zacks.com/stock/quote/{}",
        "finviz": "https://finviz.com/quote.ashx?t={}",
        "gurufocus": "https://www.gurufocus.com/stock/{}/summary",
        "seekingalpha": "https://seekingalpha.com/symbol/{}/growth",
    }
    file_name = get_file_name(ticker, source, "", get_today_date())
    if file_exists(file_name):
        # read_data() closes the file; the original handed an unclosed handle
        # to BeautifulSoup.
        return BeautifulSoup(read_data(file_name), "html.parser")

    if source not in url_templates:
        raise Exception(f"Unknown source: {source}")

    html = fetch_html(url_templates[source].format(ticker))
    # A failed download returns None; do not cache it, otherwise the literal
    # text "None" would be served from the cache for the rest of the day.
    if html is not None:
        write_data(file_name, str(html))
    return html
@st.cache_data
def get_growth_rate_from_zacks(ticker):
    """Return the growth rate scraped from the Zacks page, or 0.0 on failure.

    The value is taken from the first ``<p class="float_right">`` element with
    any ``%`` removed. A missing element or unparsable text is reported on
    stdout and yields 0.0.
    """
    page = get_html(ticker, "zacks")
    try:
        raw_text = page.find("p", class_="float_right").get_text(strip=True)
        growth_rate = float(raw_text.replace("%", ""))
    except ValueError as e:
        print("Error converting {} to float: {}".format(raw_text, e))
        growth_rate = 0.0
    except AttributeError as e:
        print("Error finding a tag", e)
        growth_rate = 0.0
    return growth_rate
@st.cache_data
def fetch_html(url):
    """Download ``url`` with browser-like headers and return it parsed.

    Returns:
        A ``BeautifulSoup`` document, or ``None`` if the request or the
        parsing fails (the error is printed).
    """
    browser_headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.5",
        "Connection": "keep-alive",
    }
    try:
        reply = requests.get(url, headers=browser_headers, timeout=10)
        reply.raise_for_status()
        document = BeautifulSoup(reply.text, "html.parser")
    except requests.RequestException as e:
        print(f"Error making request: {e}")
        return None
    except Exception as e:
        print(f"Error parsing data: {e}")
        return None
    return document
@st.cache_data
def get_growth_rate_from_finviz(ticker):
    """Return the "EPS next 5Y" value scraped from Finviz, or 0.0 on failure.

    The value is the text of the first ``<td>`` following the
    ``"EPS next 5Y"`` label, with any ``%`` removed.
    """
    page = get_html(ticker, "finviz")
    try:
        label = page.find(string="EPS next 5Y")
        raw_text = label.find_next("td").text.strip()
        growth_rate = float(raw_text.replace("%", ""))
    except ValueError as e:
        print("Error converting {} to float: {}".format(raw_text, e))
        growth_rate = 0.0
    except AttributeError as e:
        print("Error finding a tag", e)
        growth_rate = 0.0
    return growth_rate
@st.cache_data
def get_growth_rate_from_yf(ticker):
    """Return the last ``growth`` entry of yfinance's earnings estimate times 100.

    Returns 0.0 when the lookup raises ``TypeError``, ``IndexError`` or
    ``KeyError``.
    """
    try:
        growth_column = yf.Ticker(ticker).earnings_estimate["growth"]
        return growth_column.iloc[-1] * 100
    except (TypeError, IndexError, KeyError):
        return 0.0
@st.cache_data
def get_growth_rate_from_gurufocus(ticker):
    """Return GuruFocus' future 3-5Y EPS growth estimate, or 0.0 on failure.

    The label cell is located by its CSS classes and text; the value is read
    from the ``<span class="p-l-sm">`` inside the next ``t-caption`` cell,
    with any ``%`` removed.
    """

    def _is_estimate_label(text):
        return text and "Future 3-5Y EPS without NRI Growth Rate Estimate" in text

    page = get_html(ticker, "gurufocus")
    try:
        label_cell = page.find(
            "td",
            class_="t-caption p-v-sm semi-bold",
            string=_is_estimate_label,
        )
        value_cell = label_cell.find_next("td", class_="t-caption")
        raw_text = value_cell.find("span", class_="p-l-sm").get_text(strip=True)
        growth_rate = float(raw_text.replace("%", ""))
    except ValueError as e:
        print("Error converting {} to float: {}".format(raw_text, e))
        growth_rate = 0.0
    except AttributeError as e:
        print("Error finding a tag", e)
        growth_rate = 0.0
    return growth_rate
@st.cache_data
def get_growth_rate_from_seekingalpha(ticker):
    """Return Seeking Alpha's long-term EPS growth figure, or 0.0 on failure.

    The value is the text of the second ``<td>`` after the
    ``"EPS FWD Long Term Growth (3-5Y CAGR)"`` label, with any ``%`` removed.
    """
    page = get_html(ticker, "seekingalpha")
    try:
        label = page.find(string="EPS FWD Long Term Growth (3-5Y CAGR)")
        first_cell = label.find_next("td")
        raw_text = first_cell.find_next("td").text.strip()
        growth_rate = float(raw_text.replace("%", ""))
    except ValueError as e:
        print("Error converting {} to float: {}".format(raw_text, e))
        growth_rate = 0.0
    except AttributeError as e:
        print("Error finding a tag", e)
        growth_rate = 0.0
    return growth_rate
@st.cache_data
def get_beta(ticker):
    """Return the beta scraped from the Zacks page, defaulting to 1.0.

    The value is the text of the first ``<dd>`` following the ``"Beta"``
    label. A missing tag or unparsable text is printed and yields 1.0.
    """
    page = get_html(ticker, "zacks")
    try:
        beta = page.find(string="Beta").find_next("dd").text.strip()
        value = float(beta)
    except ValueError as e:
        print("Error converting {} to float: {}".format(beta, e))
        value = 1.0
    except AttributeError as e:
        print("Error finding a tag", e)
        value = 1.0
    return value
@st.cache_data
def get_discount_rate(beta):
    """Return ``risk_free_rate + beta * average_market_risk_premium``.

    Both inputs are the module-level constants of the same names.
    """
    risk_premium = beta * average_market_risk_premium
    return risk_free_rate + risk_premium
@st.cache_data
def get_outstanding_shares(ticker):
    """Return the larger of yfinance's implied and reported share counts.

    A missing (or ``None``) entry counts as 0, so 0 is returned when neither
    figure is available.
    """
    # Read ``info`` once; the original rebuilt the Ticker and re-read ``info``
    # for every membership test and lookup.
    info = yf.Ticker(ticker).info
    implied_shares_outstanding = info.get("impliedSharesOutstanding") or 0
    shares_outstanding = info.get("sharesOutstanding") or 0
    return max(implied_shares_outstanding, shares_outstanding)
@st.cache_data
def get_company_profile(ticker):
    """Return yfinance's ``longBusinessSummary`` for ``ticker``.

    Falls back to the ticker symbol itself when the summary is not present.
    """
    # Single ``info`` read instead of one for the membership test and another
    # for the lookup.
    return yf.Ticker(ticker).info.get("longBusinessSummary", ticker)
def human_to_number(num_str):
    """Convert a human-readable number string to a float.

    A trailing ``k``, ``m``, ``b`` or ``t`` (case-insensitive) scales the
    value by a thousand, million, billion or trillion. Surrounding whitespace
    and thousands separators (``,``) are ignored.

    Args:
        num_str: Text such as ``"1.5K"``, ``" 2M "`` or ``"1,250"``.

    Returns:
        The numeric value as a float.

    Raises:
        ValueError: If the text is empty or not a valid number.
    """
    multipliers = {
        "k": 1000,
        "m": 1000000,
        "b": 1000000000,
        "t": 1000000000000,
    }
    text = num_str.strip().lower().replace(",", "")
    if not text:
        # The original indexed text[-1] unconditionally and raised IndexError.
        raise ValueError("cannot convert an empty string to a number")
    suffix = text[-1]
    if suffix in multipliers:
        return float(text[:-1]) * multipliers[suffix]
    return float(text)
def calculate_wacc(ticker):
    """Estimate the weighted average cost of capital (percent) for ``ticker``.

    Uses the first rows of the income statements and balance sheets stored
    in ``st.session_state``, the scraped beta, the ``^TNX`` previous close as
    the risk-free rate, a fixed market return of 10, and yfinance's market
    cap. Intermediate values are printed for diagnostics.

    Returns:
        The WACC: after-tax cost of debt and cost of equity, weighted by
        total debt and market cap respectively.
    """
    latest_income = st.session_state.income_statements.iloc[0]
    interest_expense = latest_income["interestExpense"]
    total_debt = st.session_state.balance_sheets.iloc[0]["totalDebt"]
    # A debt-free company has no cost of debt; avoid dividing by zero.
    cost_of_debt = interest_expense / total_debt if total_debt else 0.0

    income_tax_expense = latest_income["incomeTaxExpense"]
    income_before_tax = latest_income["incomeBeforeTax"]
    # Zero pre-tax income gives no meaningful tax rate; treat it as 0.
    effective_tax_rate = (
        income_tax_expense / income_before_tax if income_before_tax else 0.0
    )
    cost_of_debt_after_tax = cost_of_debt * (1 - effective_tax_rate) * 100

    beta = get_beta(ticker)
    market_return = 10
    # Local value; intentionally independent of the module-level risk_free_rate.
    risk_free_rate = yf.Ticker("^TNX").info["previousClose"]
    cost_of_equity = risk_free_rate + beta * (market_return - risk_free_rate)

    # Read ``info`` once and reuse it for both the dump and the market cap.
    info = yf.Ticker(ticker).info
    print(info)
    market_cap = info["marketCap"]

    total = market_cap + total_debt
    weight_of_debt = total_debt / total
    weight_of_equity = market_cap / total
    wacc = cost_of_debt_after_tax * weight_of_debt + cost_of_equity * weight_of_equity

    print(f"cost of debt: {cost_of_debt:.2f}")
    print(cost_of_equity)
    print(total_debt, weight_of_debt, market_cap, weight_of_equity)
    print(wacc)
    return wacc
def get_simplywallst_ticker_url(ticker):
    """Return the simplywall.st page URL of the first search hit for ``ticker``.

    The ticker is posted as the query to the Algolia company index, and the
    first hit's ``url`` is appended to ``https://simplywall.st``.

    Returns:
        The URL string, or ``None`` if the request fails or the response
        cannot be interpreted (the error is printed).
    """
    # TODO: Figure out how to get the growth data from the URL. By default, the returned HTML does not contain the data.
    search_endpoint = "https://17iqhzwxzw-dsn.algolia.net/1/indexes/companies/query?x-algolia-agent=Algolia%20for%20JavaScript%20(4.4.0)%3B%20Browser%20(lite)&x-algolia-api-key=be7c37718f927d0137a88a11b69ae419&x-algolia-application-id=17IQHZWXZW"
    # Headers to mimic a browser request
    browser_headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.5",
        "Connection": "keep-alive",
    }
    payload = json.dumps({"query": "{}".format(ticker)})
    try:
        reply = requests.post(
            search_endpoint,
            payload,
            headers=browser_headers,
            timeout=10,
        )
        reply.raise_for_status()
        first_hit = reply.json()["hits"][0]
        return "https://simplywall.st{}".format(first_hit["url"])
    except requests.RequestException as e:
        print(f"Error making request: {e}")
        return None
    except Exception as e:
        print(f"Error parsing data: {e}")
        return None
def fetch_data(ticker, data_type, period):
    """Fetch one dataset for ``ticker`` from the data API and return its JSON.

    The request URL is built from the module-level ``base_url`` and
    ``api_key``.

    Args:
        ticker: Ticker symbol.
        data_type: API dataset name, used as the URL path segment.
        period: Reporting period query parameter.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.RequestException: On connection failure or timeout.
    """
    function_name = inspect.currentframe().f_code.co_name
    url_template = "{}/{}/?ticker={}&period={}&key={}"
    url = url_template.format(base_url, data_type, ticker, period, api_key)
    # Log the URL with the key masked so the secret does not end up in logs.
    loggable_url = url_template.format(base_url, data_type, ticker, period, "***")
    print("+{}: fetching {}".format(function_name, loggable_url))
    # Without a timeout a stalled server would block the app indefinitely.
    response = requests.get(url, timeout=30)
    return response.json()
def get_file_name(ticker, data_type, period, date):
    """Build the cache file name ``<ticker>_<data_type>_<period>_<date>.json``."""
    return f"{ticker}_{data_type}_{period}_{date}.json"
def file_exists(file_name):
    """Return True if the lower-cased ``file_name`` exists on disk."""
    normalized_path = file_name.lower()
    return os.path.exists(normalized_path)
def write_data(file_name, data):
    """Write ``data`` as UTF-8 text to the lower-cased ``file_name``, replacing it."""
    target_path = file_name.lower()
    with open(target_path, "w", encoding="utf-8") as out_file:
        out_file.write(data)
def read_data(file_name):
    """Return the UTF-8 text stored in the lower-cased ``file_name``."""
    source_path = file_name.lower()
    with open(source_path, "r", encoding="utf-8") as in_file:
        contents = in_file.read()
    return contents
def get_today_date():
    """Return today's local date formatted as ``YYYY-MM-DD``."""
    return format(datetime.today(), "%Y-%m-%d")
def draw_range(ax, range_list, total_data_points, start_value, color="blue", alpha=0.1):
    """Draw a sequence of horizontal bands that interpolate between value ranges.

    Each entry of ``range_list`` is ``[low, high, x_start, x_end]``, where the
    x values are passed to ``axhspan`` as ``xmin``/``xmax``. Within an
    entry's x span the band's lower/upper edges move in equal steps from the
    previous entry's ``(low, high)`` (initially ``start_value`` for both) to
    this entry's ``(low, high)``.

    Args:
        ax: Object providing ``axhspan(ymin, ymax, xmin=, xmax=, color=, alpha=)``.
        range_list: Sequence of ``[low, high, x_start, x_end]`` entries.
        total_data_points: Number of points spanning the whole x axis; an
            entry gets ``int(total_data_points * (x_end - x_start))`` bands.
        start_value: Starting value for both edges of the first entry.
        color: Band colour.
        alpha: Band opacity.
    """
    start_values = [start_value, start_value]
    for r in range_list:
        low, high, x_start, x_end = r[0], r[1], r[2], r[3]
        num_data_points = int(total_data_points * (x_end - x_start))
        # With a single data point the original divided by zero; use one step.
        steps = max(num_data_points - 1, 1)
        for j in range(num_data_points):
            ax.axhspan(
                start_values[0] + j * (low - start_values[0]) / steps,
                start_values[1] + j * (high - start_values[1]) / steps,
                xmin=x_start + j * (x_end - x_start) / steps,
                xmax=x_start + (j + 1) * (x_end - x_start) / steps,
                color=color,
                alpha=alpha,
            )
        start_values[0] = low
        start_values[1] = high
def number_formatter(x, _):
    """Format a tick value compactly (matplotlib ``FuncFormatter`` signature).

    Non-zero magnitudes below 1 get two decimals; values of at least a
    thousand, million or billion are scaled and suffixed with ``K``, ``M`` or
    ``B``; everything else gets one decimal. The second argument is unused.
    """
    magnitude = abs(x)
    if 0 < magnitude < 1:
        return f"{x:.2f}"
    for threshold, suffix in ((1_000_000_000, "B"), (1_000_000, "M"), (1_000, "K")):
        if magnitude >= threshold:
            return f"{x / threshold:.1f}{suffix}"
    return f"{x:.1f}"
def add_subplot(
    df,
    y_axis,
    axis,
    x_axis="date",
    cut_line_value=None,
    title=None,
    pass_if_over_cut_line=True,
):
    """Draw one metric as a bar chart, optionally coloured by a pass/fail cut line.

    Args:
        df: DataFrame holding the ``x_axis`` and ``y_axis`` columns.
        y_axis: Column plotted as bar heights; also the default title.
        axis: Matplotlib axes to draw on.
        x_axis: Column used for tick labels; only the part before the first
            ``-`` of each value is shown.
        cut_line_value: When given, a horizontal line is drawn at this value
            and the background is set to ``pass_color`` or ``fail_color``
            depending on how the first row's value compares with it.
        title: Title text; defaults to ``y_axis``.
        pass_if_over_cut_line: If true, a first-row value at or above the cut
            line passes (green line); otherwise it fails (red line).
    """

    def _tick_label(x, _):
        return df.iloc[int(x)][x_axis].split("-")[0]

    ax = df.plot(x=x_axis, y=y_axis, kind="bar", ax=axis)
    heading = y_axis if title is None else title
    ax.set_title(heading, y=0.9, bbox=dict(facecolor="lightblue"))
    ax.invert_xaxis()
    ax.legend().remove()
    for side in ("right", "top"):
        ax.spines[side].set_visible(False)
    ax.yaxis.set_major_formatter(plt.FuncFormatter(number_formatter))
    ax.xaxis.set_major_formatter(tick.FuncFormatter(_tick_label))
    ax.set_xlabel("")

    if cut_line_value is None:
        return

    cut_line_color = "g" if pass_if_over_cut_line else "r"
    ax.axhline(y=cut_line_value, color=cut_line_color, linestyle="solid")
    is_over = bool(df[y_axis].iloc[0] >= cut_line_value)
    # Passing means "over the line" when over is good, "under" otherwise.
    passed = is_over == bool(pass_if_over_cut_line)
    ax.set_facecolor(pass_color if passed else fail_color)
def generate_growth_rates(expected_growth_rate, growth_rates_discount_after_5yrs=0.5):
    """Return a 20-entry list of yearly growth rates.

    Years 1-5 use ``expected_growth_rate``. Years 6-10 use the average of
    the expected rate and ``minimum_growth_rate``, capped at 15.0 and floored
    at ``minimum_growth_rate``. Years 11-20 use ``minimum_growth_rate``.

    ``growth_rates_discount_after_5yrs`` is accepted but not used by the
    current formula.
    """
    blended_rate = (expected_growth_rate + minimum_growth_rate) / 2
    mid_term_rate = max(min(15.0, blended_rate), minimum_growth_rate)
    return (
        [expected_growth_rate] * 5
        + [mid_term_rate] * 5
        + [minimum_growth_rate] * 10
    )
def calculate_intrinsic_value(
    current_value, growth_rates, num_shares, discount_rate, total_debt, total_cash
):
    """Return a per-share value from discounted, compounding yearly cash flows.

    Starting from ``current_value``, each entry of ``growth_rates`` (percent)
    grows the value for one more year; each year's value is discounted by
    ``discount_rate`` (percent) compounded over the elapsed years and summed.
    Debt is subtracted, cash added, and the result divided by ``num_shares``.
    """
    discounted_total = 0
    yearly_value = current_value
    discount_factor = 1
    for rate in growth_rates:
        yearly_value = yearly_value + (yearly_value * rate / 100)
        discount_factor = discount_factor / (1 + discount_rate / 100)
        discounted_total += yearly_value * discount_factor
    equity_value = discounted_total - total_debt + total_cash
    return equity_value / num_shares
def init_session_state():
    """Create every session-state entry the app uses, if it is not set yet.

    ``messages`` starts as an empty list, the data slots start as ``None``
    and every ``*_passed`` flag starts as ``False``. Existing entries are
    left untouched.
    """
    if "messages" not in st.session_state:
        st.session_state.messages = []

    data_slots = (
        "company_profile",
        "moat_analysis",
        "income_statements",
        "balance_sheets",
        "cash_flow_statements",
        "financial_ratios",
        "growth_rates",
        "daily_prices",
        "annual_prices",
        "ticker",
    )
    pass_flags = (
        "price_uptrend_passed",
        "revenue_passed",
        "income_passed",
        "cash_from_operations_passed",
        "gross_margin_passed",
        "net_margin_passed",
        "growth_rate_passed",
        "current_ratio_passed",
        "debt_to_ebitda_ratio_passed",
        "return_on_equity_passed",
        "return_on_invested_capital_passed",
        "wide_moat_passed",
        "revenue_to_accounts_receivables_ratio_passed",
        "cash_conversion_cycle_passed",
    )
    for key in data_slots:
        if key not in st.session_state:
            st.session_state[key] = None
    for key in pass_flags:
        if key not in st.session_state:
            st.session_state[key] = False
def reset_application():
    """Reset every session-state entry to its initial value.

    ``messages`` becomes an empty list, the data slots become ``None`` and
    every ``*_passed`` flag becomes ``False`` — the same values
    ``init_session_state`` assigns.
    """
    st.session_state.messages = []
    st.session_state.company_profile = None
    st.session_state.moat_analysis = None
    st.session_state.income_statements = None
    st.session_state.balance_sheets = None
    st.session_state.cash_flow_statements = None
    st.session_state.financial_ratios = None
    st.session_state.growth_rates = None
    st.session_state.daily_prices = None
    st.session_state.annual_prices = None
    st.session_state.ticker = None
    # Was reset to None, unlike init_session_state() and every other flag.
    st.session_state.price_uptrend_passed = False
    st.session_state.revenue_passed = False
    st.session_state.income_passed = False
    st.session_state.cash_from_operations_passed = False
    st.session_state.gross_margin_passed = False
    st.session_state.net_margin_passed = False
    st.session_state.growth_rate_passed = False
    st.session_state.current_ratio_passed = False
    st.session_state.debt_to_ebitda_ratio_passed = False
    st.session_state.return_on_equity_passed = False
    st.session_state.return_on_invested_capital_passed = False
    st.session_state.wide_moat_passed = False
    st.session_state.revenue_to_accounts_receivables_ratio_passed = False
    st.session_state.cash_conversion_cycle_passed = False
# def remove_outliers(data): | |
# data = data[data > 0] | |
# # Calculate Q1 (25th percentile) and Q3 (75th percentile) | |
# Q1 = np.percentile(data, 25) | |
# Q3 = np.percentile(data, 75) | |
# # Calculate the interquartile range (IQR) | |
# IQR = Q3 - Q1 | |
# # Define the lower and upper bounds for outliers | |
# lower_bound = Q1 - 1.5 * IQR | |
# upper_bound = Q3 + 1.5 * IQR | |
# # Remove outliers | |
# filtered_data = [x for x in data if lower_bound <= x <= upper_bound] | |
# filtered_data.sort() | |
# return filtered_data | |
def remove_outliers(data):
    """Return the positive values within one standard deviation of their mean.

    ``data`` must support boolean-mask indexing (e.g. a NumPy array or a
    pandas Series). Non-positive values are dropped first; mean and standard
    deviation are then computed over what remains. The result is a sorted
    list.
    """
    positives = data[data > 0]
    spread = np.std(positives)
    center = np.mean(positives)
    lower_bound = center - 1 * spread
    upper_bound = center + 1 * spread
    return sorted(x for x in positives if lower_bound <= x <= upper_bound)
def get_period_months(year, period):
    """Return the ``(first, last)`` ``YYYY-MM`` months of a fiscal period.

    ``period`` is one of ``"Q1"``-``"Q4"`` or ``"FY"``; any other value
    raises ``KeyError``.
    """
    month_spans = {
        "Q1": ("01", "03"),
        "Q2": ("04", "06"),
        "Q3": ("07", "09"),
        "Q4": ("10", "12"),
        "FY": ("01", "12"),
    }
    first_month, last_month = month_spans[period]
    return (f"{year}-{first_month}", f"{year}-{last_month}")
def get_exchange_rate(fromCurrency, toCurrency="USD"):
    """
    Fetch the current exchange rate between two currencies.

    :param fromCurrency: Currency code the rate converts from (used as the API base).
    :param toCurrency: Currency code the rate converts to (default "USD").
    :return: Units of ``toCurrency`` per one unit of ``fromCurrency``, or
        None if the request fails or the response lacks the rate.
    """
    try:
        # Free API endpoint (note: may require API key for more frequent requests)
        url = f"https://open.exchangerate-api.com/v6/latest/{fromCurrency}"
        # Bound the wait so a stalled server cannot hang the app.
        response = requests.get(url, timeout=10)
        # Treat HTTP error statuses as failures instead of parsing an error body.
        response.raise_for_status()
        data = response.json()
        return data["rates"][toCurrency]
    except Exception as e:
        print(f"Error fetching exchange rate: {e}")
        return None
def calculate_intrinsic_values(
    scenarios,
    years_ago=0,
    valuation_method=None,
):
    """Compute one intrinsic value per scenario from the stored statements.

    Args:
        scenarios: Sequence of tuples whose first item is the expected growth
            rate and whose second item is the after-5-years discount passed
            to ``generate_growth_rates``.
        years_ago: Row offset into the session-state statements.
        valuation_method: ``"dcfo"`` (operating cash flow), ``"dfcf"`` (free
            cash flow) or ``"dni"`` (net income). Any other value selects a
            method automatically: operating cash flow when it lies strictly
            between net income and 1.5x net income, otherwise free cash flow
            when it exceeds net income, otherwise net income.

    Returns:
        ``(valuation_method, intrinsic_values)`` with the method actually
        used and the values sorted in descending order.

    NOTE(review): ``get_exchange_rate`` can return None, which makes the
    multiplication below raise TypeError; the rate is also applied only to
    the base value, not to debt or cash — confirm both are intended.
    """
    cash_flow_row = st.session_state.cash_flow_statements.iloc[years_ago]
    currency = cash_flow_row["reportedCurrency"]
    exchange_rate = 1.0 if currency == "USD" else get_exchange_rate(currency)

    operating_cash_flow = cash_flow_row["operatingCashFlow"]
    net_income = st.session_state.income_statements.iloc[years_ago]["netIncome"]
    free_cash_flow = cash_flow_row["freeCashFlow"]

    if valuation_method == "dcfo":
        current_value = operating_cash_flow
    elif valuation_method == "dfcf":
        current_value = free_cash_flow
    elif valuation_method == "dni":
        current_value = net_income
    elif net_income < operating_cash_flow < net_income * 1.5:
        current_value, valuation_method = operating_cash_flow, "dcfo"
    elif free_cash_flow > net_income:
        current_value, valuation_method = free_cash_flow, "dfcf"
    else:
        current_value, valuation_method = net_income, "dni"

    outstanding_shares = get_outstanding_shares(st.session_state.ticker)
    discount_rate = get_discount_rate(get_beta(st.session_state.ticker))
    balance_row = st.session_state.balance_sheets.iloc[years_ago]
    total_debt = balance_row["totalDebt"]
    total_cash = balance_row["cashAndShortTermInvestments"]

    intrinsic_values = [
        calculate_intrinsic_value(
            # Negative base values are clamped to zero.
            max(current_value * exchange_rate, 0),
            generate_growth_rates(
                scenario[0], growth_rates_discount_after_5yrs=scenario[1]
            ),
            outstanding_shares,
            discount_rate,
            total_debt,
            total_cash,
        )
        for scenario in scenarios
    ]
    intrinsic_values.sort()
    intrinsic_values.reverse()
    return valuation_method, intrinsic_values
def get_year_month_from_date_string(x, y):
    """Tick formatter: map a row position in the daily prices to ``YYYY-MM``.

    Args:
        x: Tick position; truncated to an integer row index into
            ``st.session_state.daily_prices``.
        y: Unused (required by matplotlib's ``FuncFormatter`` signature).

    Returns:
        The year and month of that row's ``date``, or ``""`` when ``x`` is
        outside the available rows.
    """
    daily_prices = st.session_state.daily_prices
    # ``>=``: a position equal to the row count is already out of range
    # (the original ``>`` let it through and ``iloc`` raised IndexError).
    if x < 0 or x >= len(daily_prices):
        return ""
    date_parts = daily_prices.iloc[int(x)]["date"].split("-")
    return f"{date_parts[0]}-{date_parts[1]}"
def display_intrinsic_values(valuation_method, scenarios, ax):
    """Plot daily closing prices with an intrinsic-value band and describe the latest values.

    Intrinsic values are computed for up to the five most recent rows of the
    session-state cash-flow statements, drawn on ``ax`` as a green band over
    the daily price line, and the most recent row's values are listed in the
    returned message.

    Args:
        valuation_method: Method passed to ``calculate_intrinsic_values``;
            when it selects a method automatically, the choice made for the
            first row is reused for the following rows and in the title.
        scenarios: Scenario tuples; item 2 of each is used as its label in
            the message.
        ax: Matplotlib axes to draw on.

    Returns:
        A markdown-style message listing one intrinsic value per scenario.
    """
    iv_values_list = []
    wide_range_list = []
    # NOTE(review): these two lists are only filled by the commented-out code below.
    narrow_range_list = []
    very_narrow_range_list = []
    num_years = 5
    for i in range(min(len(st.session_state.cash_flow_statements), num_years)):
        # Rebinding valuation_method carries the resolved method into later iterations.
        valuation_method, iv_values = calculate_intrinsic_values(
            scenarios,
            years_ago=i,
            valuation_method=valuation_method,
        )
        iv_values_list.append(iv_values)
        # [highest value, lowest value, x start, x end]: row i occupies the
        # i-th fifth of the axis counted from the right-hand side.
        wide_range_list.append(
            [
                iv_values[0],
                iv_values[-1],
                (num_years - i - 1) * (1 / num_years),
                (num_years - i) * (1 / num_years),
            ]
        )
        # narrow_range_list.append(
        #     [
        #         iv_values[1],
        #         iv_values[-2],
        #         (num_years - i - 1) * (1 / num_years),
        #         (num_years - i) * (1 / num_years),
        #     ]
        # )
        # very_narrow_range_list.append(
        #     [
        #         iv_values[2],
        #         iv_values[-3],
        #         (num_years - i - 1) * (1 / num_years),
        #         (num_years - i) * (1 / num_years),
        #     ]
        # )
    # After reversing, the last element belongs to the most recent row (i == 0).
    wide_range_list.reverse()
    iv_values_list.reverse()
    df = st.session_state.daily_prices
    df.plot(x="date", y="close", kind="line", ax=ax)
    ax.invert_xaxis()
    ax.legend().remove()
    ax.spines["right"].set_visible(False)
    ax.spines["top"].set_visible(False)
    # Reference line at the mean of two of the most recent row's values.
    # NOTE(review): indices 2 and -3 require at least three values per row,
    # i.e. at least three scenarios — confirm against callers.
    ax.axhline(
        y=statistics.mean([iv_values_list[-1][2], iv_values_list[-1][-3]]),
        color="g",
        linestyle="solid",
    )
    ax.xaxis.set_major_formatter(tick.FuncFormatter(get_year_month_from_date_string))
    ax.set_xlabel("")
    ax.margins(0.00, tight=True)
    ax.set_title(
        f"Price & IV Range ({valuation_method.upper()})",
        y=0.9,
        bbox=dict(facecolor="lightblue"),
    )
    # The band starts from the close of the last row of the daily prices.
    draw_range(
        ax,
        wide_range_list,
        len(df.index),
        df.iloc[-1]["close"],
        color="green",
        alpha=0.1,
    )
    # draw_range(
    #     ax,
    #     narrow_range_list,
    #     len(df.index),
    #     df.iloc[-1]["close"],
    #     color="green",
    #     alpha=0.04,
    # )
    # draw_range(
    #     ax,
    #     very_narrow_range_list,
    #     len(df.index),
    #     df.iloc[-1]["close"],
    #     color="red",
    #     alpha=0.06,
    # )
    bot_message = f"Intrinsic Value Range ({valuation_method.upper()}) is \n"
    # Values are sorted in descending order, so scenario i is paired with the
    # i-th largest value. presumably scenarios are ordered from most to least
    # optimistic — verify against create_scenarios().
    for i, scenario in enumerate(scenarios):
        bot_message += f"* {scenario[2]}: ${iv_values_list[-1][i]:.2f}\n"
    return bot_message
def load_financial_data(ticker):
    """Load every dataset for ``ticker`` into ``st.session_state``.

    Populates ``balance_sheets``, ``income_statements``,
    ``cash_flow_statements``, ``financial_ratios``, ``growth_rates``,
    ``daily_prices`` and ``annual_prices``. For income statements, cash-flow
    statements and ratios, the LTM row is placed in front of the annual rows
    when it differs from the latest annual row.

    This function is deliberately NOT decorated with ``st.cache_data``: its
    only effect is writing session state, and a cache hit would skip the
    body, leaving the state unpopulated after ``reset_application()`` or in
    a new session. The getters it calls are cached individually.
    """
    st.session_state.balance_sheets = get_balance_sheets(ticker)

    income_statements_annual = get_income_statements(ticker)
    income_statement_ltm = get_income_statements(ticker, period="ltm")
    income_statement_ltm.at[0, "date"] = "LTM-{}-{}".format(
        income_statement_ltm.at[0, "calendarYear"],
        income_statement_ltm.at[0, "period"],
    )
    if (
        income_statement_ltm.at[0, "fillingDate"]
        != income_statements_annual.at[0, "fillingDate"]
    ):
        st.session_state.income_statements = pd.concat(
            [income_statement_ltm, income_statements_annual], ignore_index=True
        )
    else:
        st.session_state.income_statements = income_statements_annual

    cash_flow_statements_annual = get_cash_flow_statements(ticker)
    cash_flow_statement_ltm = get_cash_flow_statements(ticker, period="ltm")
    # Default to the annual rows; an empty LTM result must not be indexed.
    cash_flow_statements = cash_flow_statements_annual
    if len(cash_flow_statement_ltm) > 0:
        cash_flow_statement_ltm.at[0, "date"] = "LTM-{}-{}".format(
            cash_flow_statement_ltm.at[0, "calendarYear"],
            cash_flow_statement_ltm.at[0, "period"],
        )
        if (
            cash_flow_statement_ltm.at[0, "fillingDate"]
            != cash_flow_statements_annual.at[0, "fillingDate"]
        ):
            cash_flow_statements = pd.concat(
                [cash_flow_statement_ltm, cash_flow_statements_annual],
                ignore_index=True,
            )
    st.session_state.cash_flow_statements = cash_flow_statements

    financial_ratios_annual = get_financial_ratios(ticker)
    financial_ratio_quarterly = get_financial_ratios(ticker, period="quarterly")
    financial_ratio_ltm = get_financial_ratios(ticker, period="ltm")
    # The LTM label takes its year/period from the latest quarterly row.
    financial_ratio_ltm.at[0, "date"] = "LTM-{}-{}".format(
        financial_ratio_quarterly.at[0, "calendarYear"],
        financial_ratio_quarterly.at[0, "period"],
    )
    print(financial_ratio_ltm)
    print(financial_ratios_annual)
    if (
        financial_ratio_ltm.at[0, "currentRatio"]
        != financial_ratios_annual.at[0, "currentRatio"]
    ):
        st.session_state.financial_ratios = pd.concat(
            [financial_ratio_ltm, financial_ratios_annual], ignore_index=True
        )
    else:
        st.session_state.financial_ratios = financial_ratios_annual

    st.session_state.growth_rates = get_growth_rates(ticker)
    st.session_state.daily_prices = get_stock_prices(ticker)
    st.session_state.annual_prices = get_stock_prices(ticker, period="annual", limit=20)
    # Drop the row labelled 0 from the annual prices.
    st.session_state.annual_prices.drop(0, inplace=True)
def create_scenarios(growth_rates_df):
    """Build the Optimistic / Neutral / Pessimistic growth scenarios.

    Outliers are removed from ``growth_rates_df`` first. The scenarios use
    the largest, mean and smallest remaining rate; if the largest and
    smallest differ by less than 10% of the largest, they are replaced by the
    mean plus and minus 5%.

    Returns:
        Three ``(growth_rate, discount, label)`` tuples, where ``discount``
        is 1.0 reduced by 0.005 for every point the rate exceeds 12.
    """
    discount_factor = 0.005

    def _scenario(rate, label):
        return (rate, 1.0 - (max(12, rate) - 12) * discount_factor, label)

    growth_rates = remove_outliers(growth_rates_df)
    min_growth_rate, max_growth_rate = growth_rates[0], growth_rates[-1]
    avg_growth_rate = statistics.mean(growth_rates)
    relative_spread = (max_growth_rate - min_growth_rate) / max_growth_rate
    if relative_spread < 0.1:
        max_growth_rate = avg_growth_rate * 1.05
        min_growth_rate = avg_growth_rate * 0.95

    return [
        _scenario(max_growth_rate, "Optimistic"),
        _scenario(avg_growth_rate, "Neutral"),
        _scenario(min_growth_rate, "Pessimistic"),
    ]
def print_bot_message(bot_message, bot_figure):
    """Render an assistant chat message and record it in the message history.

    The message is shown as markdown, followed by ``bot_figure`` when one is
    given; the pair is then appended to ``st.session_state.messages``.
    """
    history_entry = {"role": "assistant", "content": (bot_message, bot_figure)}
    with st.chat_message("assistant"):
        st.markdown(bot_message)
        if bot_figure is not None:
            st.pyplot(bot_figure)
    st.session_state.messages.append(history_entry)
def display_growth_rates_and_iv_value(
    valuation_method=None, user_defined_growth_rate=None
):
    """Plot growth-rate inputs, scenario curves and intrinsic values.

    The figure has three panels: a bar chart of the source growth rates
    (top-left), one growth-rate line per scenario (top-right) and the
    intrinsic-value chart drawn by ``display_intrinsic_values`` (bottom).

    Args:
        valuation_method: Forwarded unchanged to ``display_intrinsic_values``.
        user_defined_growth_rate: When given, it is used as the single
            "Custom" growth rate instead of
            ``st.session_state.growth_rates``.

    Returns:
        ``(message, figure)``, where ``message`` is whatever
        ``display_intrinsic_values`` returns. If no custom rate is given and
        the session holds no growth rates, returns ``(error_text, None)``.
    """
    has_custom_rate = user_defined_growth_rate is not None

    # Validate before creating the figure so the early return does not leave
    # an unused (and never closed) Matplotlib figure behind.
    if not has_custom_rate and len(st.session_state.growth_rates) == 0:
        return (
            "No growth rates is provided. Please provide valuation method:growth rate. (e.g. DNI:0.15)",
            None,
        )

    if has_custom_rate:
        growth_rates_df = pd.DataFrame(
            [{"source": "Custom", "growthRate": user_defined_growth_rate}]
        )
    else:
        growth_rates_df = st.session_state.growth_rates

    fig = plt.figure(figsize=(10, 15))
    gs = fig.add_gridspec(2, 2, width_ratios=[1, 1], height_ratios=[1, 2])
    ax1 = fig.add_subplot(gs[0, 0])
    ax2 = fig.add_subplot(gs[0, 1])
    ax3 = fig.add_subplot(gs[1, :])

    # Panel 1: growth rate per source.
    growth_rates_df.plot(x="source", y="growthRate", kind="bar", ax=ax1)
    ax1.invert_xaxis()
    ax1.legend().remove()
    ax1.spines["right"].set_visible(False)
    ax1.spines["top"].set_visible(False)
    ax1.set_xlabel("")
    ax1.set_title(
        "Est. Growth Rates (3-5 Years)", y=0.9, bbox=dict(facecolor="lightblue")
    )

    # The return value is unused; presumably this stores the WACC for the
    # valuation step - verify against calculate_wacc.
    calculate_wacc(st.session_state.ticker)
    scenarios = create_scenarios(growth_rates_df["growthRate"])

    # Panel 2: one growth-rate curve per scenario.
    legend_labels = []
    all_growth_rates = []
    for initial_rate, later_discount, label in scenarios:
        grow_rates = generate_growth_rates(
            initial_rate,
            growth_rates_discount_after_5yrs=later_discount,
        )
        all_growth_rates.extend(grow_rates)
        curve = pd.DataFrame(
            {
                "year": list(range(1, len(grow_rates) + 1)),
                "growthRate": grow_rates,
            }
        )
        curve.plot(
            x="year", y="growthRate", kind="line", linestyle="solid", marker=".", ax=ax2
        )
        legend_labels.append(label)
    ax2.legend(legend_labels, fontsize=9, loc="lower left")
    ax2.set_xlim((0, 12))
    ax2.set_ylim((-10, max(all_growth_rates) + 5))
    ax2.set_xlabel("")
    ax2.set_title("Growth Rates", y=0.9, bbox=dict(facecolor="lightblue"))

    # Panel 3: intrinsic values for every scenario.
    bot_message = display_intrinsic_values(valuation_method, scenarios, ax3)

    # Return the figure built here rather than plt.gcf(), which would be a
    # different figure if any helper changed pyplot's current figure.
    return bot_message, fig
# ---------------------------------------------------------------------------
# Page script: ticker form, company overview, fundamentals charts and the
# default / custom intrinsic value calculations.
# ---------------------------------------------------------------------------


def _new_chart_grid():
    """Create the 2x2 figure used by every fundamentals section."""
    return plt.subplots(nrows=2, ncols=2, constrained_layout=False, figsize=(10, 10))


def _show_figure(figure):
    """Render a figure on the page and close it so reruns do not pile up figures."""
    if figure is None:
        # Nothing to draw (e.g. no growth rates were available).
        return
    st.write(figure)
    plt.close(figure)


st.title("Intrinsic Value Calculator")
st.error("This calculator may not be working after 2025-02-05 due to API discontinuation.")
init_session_state()

with st.form("Intrinsic Value Calculator"):
    st.session_state.ticker = st.text_input("Enter ticker").strip()
    st.form_submit_button("Submit")

if st.session_state.ticker:
    ticker = st.session_state.ticker

    company_profile = get_company_profile(ticker)
    st.subheader("Company Profile")
    st.write(company_profile)

    moat_analysis = get_moat_analysis(ticker, company_profile)
    st.subheader("Moat Analysis")
    st.write(moat_analysis)

    load_financial_data(ticker)

    # --- Profitability ------------------------------------------------------
    fig, axes = _new_chart_grid()
    add_subplot(
        st.session_state.annual_prices, "close", axes[0, 0], title="Annual Price"
    )
    add_subplot(
        st.session_state.income_statements, "revenue", axes[0, 1], title="Revenue"
    )
    add_subplot(
        st.session_state.income_statements,
        "netIncome",
        axes[1, 0],
        title="Net Income",
    )
    add_subplot(
        st.session_state.income_statements,
        "operatingIncome",
        axes[1, 1],
        title="Operating Income",
    )
    st.write("#### Profitability")
    _show_figure(fig)

    # --- Cash flows & margins -----------------------------------------------
    fig, axes = _new_chart_grid()
    add_subplot(
        st.session_state.cash_flow_statements,
        "operatingCashFlow",
        axes[0, 0],
        title="Cash Flow from Operation",
    )
    add_subplot(
        st.session_state.cash_flow_statements,
        "freeCashFlow",
        axes[0, 1],
        title="Free Cash Flow",
    )
    add_subplot(
        st.session_state.financial_ratios,
        "grossProfitMargin",
        axes[1, 0],
        title="Gross Margin",
    )
    add_subplot(
        st.session_state.financial_ratios,
        "netProfitMargin",
        axes[1, 1],
        title="Profit Margin",
    )
    st.write("#### Cash Flows & Margins")
    _show_figure(fig)

    # --- Management efficiency ----------------------------------------------
    fig, axes = _new_chart_grid()
    add_subplot(
        st.session_state.financial_ratios,
        "returnOnEquity",
        axes[0, 0],
        title="RoE",
        cut_line_value=0.12,
        # The pass direction is flipped when the latest RoE is negative
        # although the latest net income is positive.
        pass_if_over_cut_line=not (
            st.session_state.financial_ratios.iloc[0]["returnOnEquity"] < 0
            and st.session_state.income_statements.iloc[0]["netIncome"] > 0
        ),
    )
    add_subplot(
        st.session_state.financial_ratios,
        "returnOnInvestedCapital",
        axes[0, 1],
        title="RoIC",
        cut_line_value=0.12,
    )
    add_subplot(
        st.session_state.financial_ratios,
        "revenueToAccountsReceivablesRatio",
        axes[1, 0],
        title="Rev/AR",
    )
    add_subplot(
        st.session_state.financial_ratios,
        "cashConversionCycle",
        axes[1, 1],
        title="CCC",
    )
    st.write("#### Management Efficiency")
    _show_figure(fig)

    # --- Debt structure -----------------------------------------------------
    fig, axes = _new_chart_grid()
    add_subplot(
        st.session_state.financial_ratios,
        "currentRatio",
        axes[0, 0],
        title="Current Ratio",
        cut_line_value=1.0,
    )
    add_subplot(
        st.session_state.financial_ratios,
        "debtToEBITDARatio",
        axes[0, 1],
        title="D/EBITDA",
        cut_line_value=3.0,
        pass_if_over_cut_line=False,
    )
    add_subplot(
        st.session_state.financial_ratios,
        "debtServicingRatio",
        axes[1, 0],
        title="DSR",
        cut_line_value=0.3,
        pass_if_over_cut_line=False,
    )
    add_subplot(
        st.session_state.balance_sheets, "totalDebt", axes[1, 1], title="Total Debt"
    )
    st.write("#### Debt Structure")
    _show_figure(fig)

    # --- Default intrinsic value (uses the fetched growth rates) ------------
    message, fig = display_growth_rates_and_iv_value()
    st.write("#### Default Intrinsic Value Calculation")
    st.write(message)
    _show_figure(fig)

    # --- Custom intrinsic value (user picks method and growth rate) ---------
    with st.form("Custom Intrinsic Value Calculation"):
        st.write("#### Custom Intrinsic Value Calculation")
        valuation_method = st.selectbox("Valuation Method", ["DCFO", "DFCF", "DNI"])
        next_5_years_growth_rate = st.slider(
            "Enter next 5 years growth rate (%)", 0.0, 50.0, 10.0, step=0.1
        )
        if st.form_submit_button("Submit"):
            message, fig = display_growth_rates_and_iv_value(
                valuation_method=valuation_method.lower(),
                user_defined_growth_rate=next_5_years_growth_rate,
            )
            st.write(message)
            _show_figure(fig)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment