# Baseline query this gist optimizes: an IN-list built from per-symbol
# placeholders, executed once for the whole ticker list.
#
#   SELECT symbol, close, date
#   FROM public.historical_data
#   WHERE symbol IN ({placeholders})
#     AND date BETWEEN %s::date AND %s::date
#     AND close IS NOT NULL
#   ORDER BY symbol, date
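#
# The versions below replace the IN-list with a single array parameter
# (symbol = ANY(...)), batch the ticker list, and add async and
# server-side-cursor variants for larger result sets.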
import asyncio
import datetime
from collections import defaultdict
from typing import List

import asyncpg
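
# The functions below rely on two module-level objects that are not defined
# in this gist: format_symbol(), which joins a bare symbol and an exchange
# suffix into the ticker format stored in public.historical_data, and
# conn_pool_fundamentals, a psycopg2 connection pool. A minimal sketch under
# those assumptions (the "SYMBOL.EXCHANGE" format and the pool settings are
# guesses, not taken from the original):

import psycopg2.pool


def format_symbol(symbol: str, exchange: str) -> str:
    # Assumed ticker format: "BHP" + "AX" -> "BHP.AX"
    return f"{symbol}.{exchange}"


conn_pool_fundamentals = psycopg2.pool.SimpleConnectionPool(
    minconn=1,
    maxconn=10,
    dsn="postgresql://user:pass@localhost:5432/fundamentals",  # placeholder DSN
)
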
# Synchronous optimized version
def get_historical_data_for_symbols(symbols_data: list, start_time: str, end_time: str):
    """
    Retrieve historical close prices for a list of tickers within a specified date range.
    Optimized for better PostgreSQL performance.

    Args:
        symbols_data: List of dictionaries with a 'symbol' key in
            'SYMBOL.EXCHANGE' form, e.g. [{'symbol': 'BHP.AX'}, {'symbol': 'CBA.AX'}]
        start_time: Start date in format 'YYYY-MM-DD'
        end_time: End date in format 'YYYY-MM-DD'

    Returns:
        Dictionary mapping tickers to their historical price rows (symbol, close, date)
    """
    conn = None
    try:
        # Format the symbols to include exchange information
        formatted_tickers = []
        for item in symbols_data:
            symbol_exchange = item.get('symbol')
            if not symbol_exchange:
                continue
            parts = symbol_exchange.split(".")
            if len(parts) >= 2:
                symbol = parts[0]
                exchange = parts[1]
                formatted_symbol = format_symbol(symbol, exchange)
                formatted_tickers.append(formatted_symbol)

        print(f"[>>] Fetching historical data for {len(formatted_tickers)} tickers from {start_time} to {end_time}")

        # Return empty dict if no valid tickers were provided
        if not formatted_tickers:
            print("No valid tickers provided")
            return {}

        # Get database connection from pool
        conn = conn_pool_fundamentals.getconn()
        cursor = conn.cursor()

        # OPTIMIZATION 1: Use a prepared statement for the repeated query.
        # Prepared statements are session-scoped, so the finally block
        # deallocates it before the connection goes back to the pool.
        prepare_stmt = """
            PREPARE historical_query (text[], date, date) AS
            SELECT symbol, close, date
            FROM public.historical_data
            WHERE symbol = ANY($1)
              AND date BETWEEN $2 AND $3
              AND close IS NOT NULL
            ORDER BY symbol, date
        """
        cursor.execute(prepare_stmt)

        # OPTIMIZATION 2: Process in batches to keep each array parameter small
        batch_size = 100
        all_results = []
        for i in range(0, len(formatted_tickers), batch_size):
            batch_tickers = formatted_tickers[i:i + batch_size]
            # Execute the prepared statement; psycopg2 adapts the Python
            # list to a PostgreSQL text[] array
            cursor.execute(
                "EXECUTE historical_query (%s, %s, %s)",
                (batch_tickers, start_time, end_time)
            )
            batch_results = cursor.fetchall()
            all_results.extend(batch_results)
            print(f"[<<] Batch {i//batch_size + 1}: {len(batch_results)} rows")

        print(f"[<<] Total query returned {len(all_results)} rows")
        cursor.close()

        # OPTIMIZATION 3: Use defaultdict for faster dictionary operations
        ticker_data = defaultdict(list)
        for row in all_results:
            ticker_data[row[0]].append(row)

        # Convert back to a regular dict
        ticker_data = dict(ticker_data)

        # If no data was found, print a warning
        if not ticker_data:
            print("WARNING: No data found for any of the specified tickers in the given date range")

        return ticker_data
    except Exception as e:
        print(f"Error fetching historical data: {e}")
        import traceback
        traceback.print_exc()
        return {}
    finally:
        if conn:
            # OPTIMIZATION 4: Deallocate the prepared statement so the pooled
            # session is clean for the next caller; this fails harmlessly if
            # the transaction was already aborted
            try:
                cursor = conn.cursor()
                cursor.execute("DEALLOCATE historical_query")
                cursor.close()
            except Exception:
                pass
            conn_pool_fundamentals.putconn(conn)
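
# Example call (tickers and dates are illustrative):
#
#   prices = get_historical_data_for_symbols(
#       [{'symbol': 'BHP.AX'}, {'symbol': 'CBA.AX'}],
#       start_time='2024-01-01',
#       end_time='2024-06-30',
#   )
#   for ticker, rows in prices.items():
#       print(ticker, len(rows), "rows")
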
# Async version for even better performance
async def get_historical_data_async(symbols_data: list, start_time: str, end_time: str,
                                    connection_string: str):
    """
    Async version with connection pooling for maximum performance.
    """
    # Format symbols
    formatted_tickers = []
    for item in symbols_data:
        symbol_exchange = item.get('symbol', '')
        if not symbol_exchange:
            continue
        parts = symbol_exchange.split(".")
        if len(parts) >= 2:
            symbol = parts[0]
            exchange = parts[1]
            formatted_symbol = format_symbol(symbol, exchange)
            formatted_tickers.append(formatted_symbol)

    if not formatted_tickers:
        return {}

    # Create connection pool
    pool = await asyncpg.create_pool(
        connection_string,
        min_size=5,
        max_size=20,
        command_timeout=60
    )
    try:
        # Process in parallel batches
        batch_size = 100
        tasks = []
        for i in range(0, len(formatted_tickers), batch_size):
            batch_tickers = formatted_tickers[i:i + batch_size]
            task = fetch_batch_async(pool, batch_tickers, start_time, end_time)
            tasks.append(task)

        # Execute all batches concurrently
        batch_results = await asyncio.gather(*tasks)

        # Combine results
        all_results = []
        for batch in batch_results:
            all_results.extend(batch)

        # Organize results keyed by ticker symbol
        ticker_data = defaultdict(list)
        for row in all_results:
            ticker_data[row['symbol']].append((row['symbol'], row['close'], row['date']))

        return dict(ticker_data)
    finally:
        await pool.close()
async def fetch_batch_async(pool, tickers: List[str], start_time: str, end_time: str):
    """Helper function to fetch a batch of tickers asynchronously."""
    query = """
        SELECT symbol, close, date
        FROM public.historical_data
        WHERE symbol = ANY($1::text[])
          AND date BETWEEN $2::date AND $3::date
          AND close IS NOT NULL
        ORDER BY symbol, date
    """
    # asyncpg binds date parameters as datetime.date objects, not strings,
    # so convert the 'YYYY-MM-DD' inputs before fetching
    start = datetime.date.fromisoformat(start_time)
    end = datetime.date.fromisoformat(end_time)
    async with pool.acquire() as conn:
        return await conn.fetch(query, tickers, start, end)
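
# Example call (the DSN is a placeholder, not part of the original):
#
#   prices = asyncio.run(get_historical_data_async(
#       [{'symbol': 'BHP.AX'}, {'symbol': 'CBA.AX'}],
#       '2024-01-01', '2024-06-30',
#       connection_string='postgresql://user:pass@localhost:5432/fundamentals',
#   ))
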
# Cursor-based pagination version for very large datasets
def get_historical_data_paginated(symbols_data: list, start_time: str, end_time: str,
                                  page_size: int = 10000):
    """
    Version with cursor-based pagination for handling very large result sets.
    """
    conn = None
    try:
        # Format symbols (same as before)
        formatted_tickers = []
        for item in symbols_data:
            symbol_exchange = item.get('symbol')
            if not symbol_exchange:
                continue
            parts = symbol_exchange.split(".")
            if len(parts) >= 2:
                symbol = parts[0]
                exchange = parts[1]
                formatted_symbol = format_symbol(symbol, exchange)
                formatted_tickers.append(formatted_symbol)

        if not formatted_tickers:
            return {}

        conn = conn_pool_fundamentals.getconn()
        cursor = conn.cursor()

        # Use a server-side cursor so rows stream to the client in pages
        # instead of materializing the whole result set in memory. DECLARE
        # needs an open transaction, which psycopg2 starts implicitly on
        # the first execute.
        cursor_name = "historical_data_cursor"
        cursor.execute(f"""
            DECLARE {cursor_name} CURSOR FOR
            SELECT symbol, close, date
            FROM public.historical_data
            WHERE symbol = ANY(%s::text[])
              AND date BETWEEN %s::date AND %s::date
              AND close IS NOT NULL
            ORDER BY symbol, date
        """, (formatted_tickers, start_time, end_time))

        ticker_data = defaultdict(list)
        total_rows = 0
        while True:
            cursor.execute(f"FETCH {page_size} FROM {cursor_name}")
            rows = cursor.fetchall()
            if not rows:
                break
            total_rows += len(rows)
            print(f"[<<] Fetched {len(rows)} rows (total: {total_rows})")
            for row in rows:
                ticker_data[row[0]].append(row)

        cursor.execute(f"CLOSE {cursor_name}")
        cursor.close()
        return dict(ticker_data)
    except Exception as e:
        print(f"Error fetching historical data: {e}")
        import traceback
        traceback.print_exc()
        return {}
    finally:
        if conn:
            conn_pool_fundamentals.putconn(conn)
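
# Example call; with page_size=10000 a one-million-row result arrives in
# roughly 100 FETCH round trips instead of a single giant fetchall():
#
#   prices = get_historical_data_paginated(
#       [{'symbol': 'BHP.AX'}, {'symbol': 'CBA.AX'}],
#       '2020-01-01', '2024-12-31',
#       page_size=10000,
#   )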