Skip to content

Instantly share code, notes, and snippets.

@dewaldabrie
Last active June 13, 2021 09:57
Show Gist options
  • Save dewaldabrie/1f7fda36a3442315dbfc1079b8fdad7e to your computer and use it in GitHub Desktop.
Save dewaldabrie/1f7fda36a3442315dbfc1079b8fdad7e to your computer and use it in GitHub Desktop.
"""
Download the full Yahoo Finance history of ASX-listed ETFs and cache it in data.pkl.

The list of ASX ETF symbols can be found at asxetfs.com.
"""
import os
from coroflow import Pipeline, Node
import time
import logging
import pickle
import csv
import pandas as pd
import yfinance as yf
# Configure the root logger so that records at INFO level and above are emitted.
logging.basicConfig(level=logging.INFO)
class ReadSymbolsNode(Node):
    """Source node that emits one ticker string per data row of the symbol CSV.

    Each ticker is the first CSV column with ``.AX`` appended (presumably the
    Yahoo Finance suffix for ASX listings).
    """

    # CSV file listing the ETF symbols; its first two lines are headers.
    symbol_csv_path = '20200601-etfs.csv'

    def execute(self, inp):
        """Yield ``<symbol>.AX`` for every non-empty data row of the CSV.

        Args:
            inp: Unused by this node.

        Yields:
            str: The first column of the row, stripped of surrounding
            whitespace, with ``.AX`` appended.

        Raises:
            OSError: If ``symbol_csv_path`` cannot be opened.
        """
        # newline='' lets the csv module do its own newline handling, as its
        # documentation requires for files passed to csv.reader.
        with open(self.symbol_csv_path, 'r', newline='') as fh:
            reader = csv.reader(fh)
            # Skip both header lines. The default keeps a short file from
            # raising StopIteration inside this generator, which Python would
            # turn into a RuntimeError.
            next(reader, None)
            next(reader, None)
            for row in reader:
                # Blank lines parse to [] and would break row[0]; rows with an
                # empty first cell carry no symbol either.
                if not row:
                    continue
                symbol = row[0].strip()
                if not symbol:
                    continue
                yield symbol + '.AX'
class Sym2Data(Node):
    """Node that downloads the maximum available history for one ticker."""

    def execute(self, inp):
        """Fetch the history for the ticker symbol ``inp`` via yfinance.

        Args:
            inp: Ticker symbol passed to ``yf.Ticker``.

        Returns:
            ``(inp, hist)`` where ``hist`` is the result of
            ``Ticker.history(period="max")``, or ``None`` when that result is
            empty or the download raised.
        """
        try:
            hist = yf.Ticker(inp).history(period="max")
            if hist.empty:
                return None
            return inp, hist
        except Exception:
            # Deliberate best-effort: log and return None instead of
            # propagating. The symbol is included so a failure can be traced
            # to its ticker; logging.exception appends the traceback.
            logging.exception("Failed to fetch history for %s", inp)
            return None
class DataAgg(Node):
    """Sink node that merges per-symbol history frames into a pickle cache."""

    # Pickle file holding the {symbol: DataFrame} cache between runs.
    data_path = 'data.pkl'

    def __init__(self, *args, **kwargs):
        """Create an empty cache, then forward all arguments to ``Node``."""
        self.agg_data = {}
        super().__init__(*args, **kwargs)

    async def setup(self):
        """Load the cache from ``data_path`` if it exists.

        Returns:
            dict: ``{'agg_data': <cache dict>}``, used as the node context.
        """
        if os.path.isfile(self.data_path):
            # NOTE(security): unpickling runs arbitrary code from a crafted
            # file; only load a cache that this script wrote itself.
            with open(self.data_path, 'rb') as fh:
                self.agg_data = pickle.load(fh)
        return {'agg_data': self.agg_data}

    async def execute(self, inpt, context=None):
        """Merge one ``(symbol, frame)`` pair into the cache.

        Args:
            inpt: ``(symbol, DataFrame)`` tuple, or ``None``, which is ignored.
            context: Mapping holding the cache under ``'agg_data'``.
        """
        # The upstream fetch node returns None for empty or failed downloads;
        # whether the pipeline forwards None is not visible here, so guard it.
        if inpt is None:
            return
        agg_data = context['agg_data']
        sym, data = inpt
        if sym in agg_data:
            # De-duplicate on the index rather than on column values, and keep
            # the index. The previous value-based drop_duplicates() followed by
            # reset_index(drop=True) discarded the index and collapsed distinct
            # rows that happened to share identical values.
            # keep='last' lets the freshly downloaded row win.
            # NOTE(review): assumes the frames are indexed by date, as
            # yfinance's history() is expected to return - confirm. Caches
            # written by the old merge have lost that index and should be
            # regenerated.
            combined = pd.concat([agg_data[sym], data])
            agg_data[sym] = combined[~combined.index.duplicated(keep='last')]
        else:
            agg_data[sym] = data

    async def teardown(self, context=None):
        """Write the cache to ``data_path``.

        Args:
            context: Mapping holding the cache under ``'agg_data'``.
        """
        # Write to a sibling temp file and swap it in, so an interrupted dump
        # cannot leave a truncated cache behind.
        tmp_path = self.data_path + '.tmp'
        with open(tmp_path, 'wb') as fh:
            pickle.dump(context['agg_data'], fh)
        os.replace(tmp_path, self.data_path)
def main():
    """Build the read -> fetch -> aggregate pipeline, run it, and log its duration."""
    # Construct pipeline
    p = Pipeline()
    etf_sym_reader = ReadSymbolsNode('read_etf_syms', p)
    data_fetcher = Sym2Data('data_from_yahoo', p)
    data_agg = DataAgg('data_agg', p)
    etf_sym_reader.set_downstream(data_fetcher)
    data_fetcher.set_downstream(data_agg)

    # Light it up
    # perf_counter is monotonic, so the measured duration cannot be skewed by
    # wall-clock adjustments.
    start_time = time.perf_counter()
    p.run(show_queues=True)
    # Logged at INFO: the root logger is configured at INFO level, so a DEBUG
    # record would never be shown.
    logging.info("Asynchronous duration: %ss.", time.perf_counter() - start_time)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment