Skip to content

Instantly share code, notes, and snippets.

@LaurentiuGabriel
Created April 23, 2026 19:12
Show Gist options
  • Select an option

  • Save LaurentiuGabriel/18f132282ade21de2ecc4a5946605dcf to your computer and use it in GitHub Desktop.

Select an option

Save LaurentiuGabriel/18f132282ade21de2ecc4a5946605dcf to your computer and use it in GitHub Desktop.
ML XGBoost-enabled trading script
"""
ML-Enhanced Multi-Factor Crypto Trading System
===============================================
Extends the multi-factor institutional trading engine with two ML models:
LSTM (PyTorch) - Sequential pattern recognition on 20-day feature windows.
Captures temporal dependencies that rule-based indicators miss.
XGBoost - Gradient-boosted tree classifier on 30+ engineered features.
Excels at non-linear feature interactions and regime shifts.
Both models output a probability of next-day upward move. These are converted
into factor signals and blended with the 6 existing factors (technical,
microstructure, sentiment, volume, regime, anomaly) through the same
confidence-weighted aggregation engine.
Models are trained on historical daily bars, persisted to disk, and retrained
on a configurable schedule.
Usage:
python alpaca_ml_trader.py --dry-run --verbose # test run
python alpaca_ml_trader.py --continuous --interval 3600 # hourly loop
python alpaca_ml_trader.py --retrain # force retrain now
"""
from __future__ import annotations
import argparse
import json
import logging
import math
import pickle
import sys
import time as time_module
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from xgboost import XGBClassifier
from alpaca.data.historical import CryptoHistoricalDataClient
from alpaca.data.historical.news import NewsClient
from alpaca.data.requests import (
CryptoBarsRequest,
CryptoLatestOrderbookRequest,
CryptoLatestQuoteRequest,
CryptoSnapshotRequest,
CryptoTradesRequest,
NewsRequest,
)
from alpaca.data.timeframe import TimeFrame, TimeFrameUnit
from alpaca.trading.client import TradingClient
from alpaca.trading.enums import OrderSide, TimeInForce
from alpaca.trading.requests import LimitOrderRequest, MarketOrderRequest
# ═══════════════════════════════════════════════════════════════════════════
# LOGGING
# ═══════════════════════════════════════════════════════════════════════════
# Configure the root logger once, at import time: every record of level INFO
# or above is written both to stdout and, in append mode, to "ml_trader.log"
# (a relative path, so it resolves against the current working directory).
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler("ml_trader.log", mode="a"),
],
)
# Module-level logger shared by every function and class in this file.
log = logging.getLogger(__name__)
# Torch device used for LSTM training and inference: the GPU when CUDA is
# available, otherwise the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# ═══════════════════════════════════════════════════════════════════════════
# CONFIGURATION
# ═══════════════════════════════════════════════════════════════════════════
# Symbols handled by the script, written in Alpaca's "BASE/QUOTE" pair form.
SYMBOLS = ["BTC/USD", "ETH/USD"]
# Default number of days requested by DataFetcher.get_daily_bars.
LOOKBACK_DAYS = 200 # extra history for ML training
# Default number of hours requested by DataFetcher.get_hourly_bars.
INTRADAY_LOOKBACK_HOURS = 24
# Number of consecutive daily feature rows in one LSTM input sequence
# (used by MLModelManager.train_lstm and MLModelManager.predict_lstm).
SEQ_LEN = 20 # LSTM lookback window
# --- Technical indicator params ------------------------------------------
# Read by factor_technical. build_feature_matrix reads only the MACD_* and
# ATR_* values plus RSI_LOWER / RSI_UPPER; its EMA 19/60, SMA 15/32 and RSI 10
# periods are written as literals there, so changing the constants below does
# not change those features.
EMA_FAST, EMA_SLOW = 19, 60
MACD_FAST, MACD_SLOW, MACD_SIG = 18, 22, 12
SMA_FAST, SMA_SLOW = 15, 32
# RSI readings outside (RSI_LOWER, RSI_UPPER) disable the rule-based signal
# and map to a negative score in factor_technical.
RSI_PERIOD, RSI_LOWER, RSI_UPPER = 10, 20, 93
# The rule-based signal requires ATR <= rolling median ATR * ATR_MULTIPLIER.
ATR_PERIOD, ATR_MULTIPLIER = 14, 2.5
# --- Risk parameters -----------------------------------------------------
# NOTE(review): the code consuming these limits is outside the portion of the
# file reviewed here; units (fractions of equity?) are presumed from the
# names -- confirm against the sizing / execution code.
MAX_EXPOSURE_PER_ASSET = 0.05
MAX_PORTFOLIO_EXPOSURE = 0.10
DRAWDOWN_CIRCUIT_BREAKER = -0.03
KELLY_FRACTION_CAP = 0.25
MIN_SIGNAL_CONFIDENCE = 0.30
# --- Factor weights (sum to 1.0) -----------------------------------------
# Keys match the `name` given to the FactorSignal each factor produces
# ("ml_lstm" / "ml_xgboost" belong to the two ML factors).
FACTOR_WEIGHTS = {
"technical": 0.15,
"microstructure": 0.12,
"sentiment": 0.08,
"volume": 0.10,
"regime": 0.10,
"anomaly": 0.05,
"ml_lstm": 0.20,
"ml_xgboost": 0.20,
}
# --- ML training config --------------------------------------------------
ML_TRAIN_DAYS = 180 # days of data used for training
# MLModelManager.needs_retrain returns True once this many hours have passed
# since the stored last-train timestamp.
ML_RETRAIN_INTERVAL_HOURS = 24 # retrain every 24h
# train_lstm / train_xgboost skip training below this many valid samples.
ML_MIN_SAMPLES = 100 # minimum training samples required
# Directory (next to this file) where model checkpoints and train_meta.json
# are stored; created by MLModelManager.__init__.
MODEL_DIR = Path(__file__).parent / "models"
# --- Sentiment keywords ---------------------------------------------------
# _score_text tests each entry with a plain substring check against the
# lower-cased text, so entries must be lower-case; short entries such as
# "ath" or "ban" also match inside longer words ("path", "bank").
BULLISH_KEYWORDS = [
"surge", "rally", "breakout", "bullish", "upgrade", "adoption",
"approval", "etf approved", "institutional", "accumulation",
"all-time high", "ath", "partnership", "integration", "growth",
"inflow", "buying", "support", "recovery", "momentum",
]
BEARISH_KEYWORDS = [
"crash", "plunge", "bearish", "hack", "exploit", "ban",
"regulation", "crackdown", "sell-off", "selloff", "liquidation",
"outflow", "fraud", "investigation", "subpoena", "delisting",
"downgrade", "collapse", "default", "contagion", "bankruptcy",
]
# ═══════════════════════════════════════════════════════════════════════════
# DATA CLASSES
# ═══════════════════════════════════════════════════════════════════════════
@dataclass
class FactorSignal:
    """Result produced by one factor module.

    The factor functions in this file build instances whose ``score`` is
    clipped to [-1, 1] and whose ``confidence`` lies between 0 and 1.
    """

    # Factor identifier, e.g. "technical" or "sentiment".
    name: str
    # Signed signal strength.
    score: float
    # Reliability attached to the score by the producing factor.
    confidence: float
    # Free-form diagnostics; each instance gets its own fresh dict.
    details: dict = field(default_factory=dict)
@dataclass
class TradeDecision:
    """Aggregated trading decision for a single symbol.

    NOTE(review): the code that builds and consumes this record is outside
    the part of the file reviewed here, so the value sets of ``direction``
    and ``regime`` (beyond the "unknown" default) should be confirmed there.
    """

    # Trading pair the decision applies to.
    symbol: str
    # Trade direction label.
    direction: str
    # Strength of the aggregated signal.
    conviction: float
    # Desired position size expressed as a weight.
    target_weight: float
    # Factor signals that fed the decision; a new list per instance.
    factors: List[FactorSignal] = field(default_factory=list)
    # Descriptions of detected anomalies; a new list per instance.
    anomalies: List[str] = field(default_factory=list)
    # Market regime label.
    regime: str = "unknown"
# ═══════════════════════════════════════════════════════════════════════════
# CREDENTIAL LOADING
# ═══════════════════════════════════════════════════════════════════════════
def load_credentials(path: str = "alpaca_creds") -> dict:
    """Parse a ``key=value`` credential file into a dict.

    The file is looked up at ``path`` first and, failing that, next to this
    module. Blank lines, lines starting with ``#`` and lines without ``=``
    are ignored; each remaining line is split on its first ``=`` and both
    sides are stripped of surrounding whitespace.

    Raises:
        FileNotFoundError: if the file exists in neither location.
        ValueError: if ``alpaca_key``, ``secret`` or ``endpoint`` is absent.
    """
    cred_path = Path(path)
    if not cred_path.exists():
        # Fall back to a file sitting beside this script.
        cred_path = Path(__file__).parent / path
        if not cred_path.exists():
            raise FileNotFoundError(f"Credential file not found: {path}")

    creds = {}
    with open(cred_path, "r") as f:
        for raw_line in f:
            entry = raw_line.strip()
            is_payload = bool(entry) and not entry.startswith("#")
            if is_payload and "=" in entry:
                key, _, value = entry.partition("=")
                creds[key.strip()] = value.strip()

    # Report the first required key that is missing, in declaration order.
    absent = [r for r in ["alpaca_key", "secret", "endpoint"] if r not in creds]
    if absent:
        r = absent[0]
        raise ValueError(f"Missing '{r}' in credential file")
    return creds
# ═══════════════════════════════════════════════════════════════════════════
# TECHNICAL INDICATORS (pure pandas)
# ═══════════════════════════════════════════════════════════════════════════
def _ema(s: pd.Series, period: int) -> pd.Series:
return s.ewm(span=period, adjust=False).mean()
def _sma(s: pd.Series, period: int) -> pd.Series:
return s.rolling(window=period).mean()
def _rsi(s: pd.Series, period: int) -> pd.Series:
delta = s.diff()
gain = delta.clip(lower=0)
loss = -delta.clip(upper=0)
avg_gain = gain.ewm(alpha=1 / period, min_periods=period, adjust=False).mean()
avg_loss = loss.ewm(alpha=1 / period, min_periods=period, adjust=False).mean()
rs = avg_gain / avg_loss
return 100 - (100 / (1 + rs))
def _macd(s: pd.Series, fast: int, slow: int, sig: int):
    """Return the MACD line, its signal line and the histogram for ``s``.

    The MACD line is EMA(fast) - EMA(slow), the signal line is the EMA of
    the MACD line over ``sig`` periods, and the histogram is their
    difference. The three series are returned as a tuple in that order.
    """
    macd_line = _ema(s, fast) - _ema(s, slow)
    signal_line = _ema(macd_line, sig)
    histogram = macd_line - signal_line
    return macd_line, signal_line, histogram
def _atr(high: pd.Series, low: pd.Series, close: pd.Series, period: int) -> pd.Series:
prev = close.shift(1)
tr = pd.concat([high - low, (high - prev).abs(), (low - prev).abs()], axis=1).max(axis=1)
return tr.rolling(window=period).mean()
def _stochastic(high: pd.Series, low: pd.Series, close: pd.Series,
k_period: int = 14, d_period: int = 3) -> Tuple[pd.Series, pd.Series]:
lowest = low.rolling(k_period).min()
highest = high.rolling(k_period).max()
k = 100 * (close - lowest) / (highest - lowest + 1e-10)
d = k.rolling(d_period).mean()
return k, d
def _bollinger_position(close: pd.Series, period: int = 20) -> pd.Series:
    """Distance of ``close`` from its moving average, in rolling std units.

    A small epsilon in the denominator avoids division by zero when the
    window has no variation.
    """
    centre = _sma(close, period)
    spread = close.rolling(period).std()
    deviation = close - centre
    return deviation / (spread + 1e-10)
def _hurst_exponent(series: pd.Series, max_lag: int = 20) -> float:
ts = series.dropna().values
if len(ts) < max_lag * 2:
return 0.5
lags = range(2, max_lag + 1)
rs_values = []
for lag in lags:
rs_lag = []
for start in range(0, len(ts) - lag, lag):
chunk = ts[start : start + lag]
mean_chunk = np.mean(chunk)
deviate = np.cumsum(chunk - mean_chunk)
r = np.max(deviate) - np.min(deviate)
s = np.std(chunk, ddof=1)
if s > 0:
rs_lag.append(r / s)
if rs_lag:
rs_values.append((np.log(lag), np.log(np.mean(rs_lag))))
if len(rs_values) < 3:
return 0.5
x = np.array([v[0] for v in rs_values])
y = np.array([v[1] for v in rs_values])
slope, _ = np.polyfit(x, y, 1)
return float(np.clip(slope, 0.0, 1.0))
# ═══════════════════════════════════════════════════════════════════════════
# FEATURE ENGINEERING (30+ features for ML models)
# ═══════════════════════════════════════════════════════════════════════════
def build_feature_matrix(daily: pd.DataFrame) -> pd.DataFrame:
    """
    Build a feature DataFrame from daily OHLCV bars.

    ``daily`` must provide "close", "high", "low" and "volume" columns.
    Returns a DataFrame indexed like ``daily`` whose columns are the feature
    names, in a fixed order (the models consume the matrix positionally, so
    the order must not change). Warm-up NaNs are forward-filled and then set
    to 0; infinities are set to 0.

    Each indicator series is computed once and reused for every feature that
    depends on it.
    """
    close = daily["close"]
    high = daily["high"]
    low = daily["low"]
    volume = daily["volume"]
    returns = close.pct_change()

    # Indicator series shared by several features below.
    ema = {p: _ema(close, p) for p in (10, 19, 30, 60)}
    sma = {p: _sma(close, p) for p in (15, 32, 50)}
    roc = {p: close.pct_change(p) for p in (3, 5, 10, 20)}
    rsi = {p: _rsi(close, p) for p in (7, 10, 14, 21)}
    atr = _atr(high, low, close, ATR_PERIOD)

    feats = {}
    # --- Trend: EMA distances (normalized by price) ---
    for p, ema_val in ema.items():
        feats[f"ema_dist_{p}"] = (close - ema_val) / (close + 1e-10)
    # --- Trend: SMA distances ---
    for p, sma_val in sma.items():
        feats[f"sma_dist_{p}"] = (close - sma_val) / (close + 1e-10)
    # --- Momentum: Rate of change ---
    for p, roc_val in roc.items():
        feats[f"roc_{p}"] = roc_val
    # --- Momentum: RSI (normalized to -1..+1) ---
    for p, rsi_val in rsi.items():
        feats[f"rsi_norm_{p}"] = (rsi_val - 50) / 50
    # --- MACD histogram (normalized) ---
    macd_line, macd_sig_line, macd_hist = _macd(close, MACD_FAST, MACD_SLOW, MACD_SIG)
    feats["macd_hist_norm"] = macd_hist / (close + 1e-10)
    feats["macd_cross"] = (macd_line > macd_sig_line).astype(float)
    # --- Volatility: ATR percentage ---
    feats["atr_pct"] = atr / (close + 1e-10)
    # --- Volatility: 20-day realized vol ---
    feats["volatility_20"] = returns.rolling(20).std()
    # --- Bollinger band position ---
    feats["bb_position"] = _bollinger_position(close, 20)
    # --- Volume features ---
    feats["vol_ratio_5_20"] = _sma(volume, 5) / (_sma(volume, 20) + 1e-10)
    # --- Stochastic oscillator ---
    stoch_k, stoch_d = _stochastic(high, low, close, 14, 3)
    feats["stoch_k_norm"] = (stoch_k - 50) / 50
    feats["stoch_cross"] = (stoch_k > stoch_d).astype(float)
    # --- SMA trend binary ---
    feats["sma_trend_15_32"] = (sma[15] > sma[32]).astype(float)
    # --- EMA trend binary ---
    feats["ema_trend_19_60"] = (ema[19] > ema[60]).astype(float)
    # --- Rule-based signal (the proven Quantiacs baseline as a feature) ---
    # All trend filters agree, RSI-10 is inside its band and ATR is not
    # elevated relative to its rolling median.
    atr_median = atr.rolling(ATR_PERIOD * 2).median()
    rule = (
        (feats["ema_trend_19_60"] == 1) &
        (feats["sma_trend_15_32"] == 1) &
        (feats["macd_cross"] == 1) &
        (rsi[10] > RSI_LOWER) & (rsi[10] < RSI_UPPER) &
        (atr <= atr_median * ATR_MULTIPLIER)
    ).astype(float)
    feats["rule_signal"] = rule
    # --- On-balance volume trend ---
    # Sign of the daily return; the undefined first return counts as 0.
    sign = np.sign(returns).fillna(0)
    obv = (volume * sign).cumsum()
    feats["obv_ema_ratio"] = _ema(obv, 10) / (_ema(obv, 30).abs() + 1e-10)
    # --- Return-based features ---
    # ret_5d / ret_10d duplicate roc_5 / roc_10; they are kept so that the
    # feature count and order stay compatible with saved models.
    feats["ret_1d"] = returns
    feats["ret_5d"] = roc[5]
    feats["ret_10d"] = roc[10]
    # --- High-low range normalized ---
    feats["hl_range"] = (high - low) / (close + 1e-10)

    df = pd.DataFrame(feats, index=daily.index)
    df = df.ffill().fillna(0)
    df = df.replace([np.inf, -np.inf], 0)
    return df
def build_target(daily: pd.DataFrame) -> pd.Series:
    """Binary target: will price be higher tomorrow?

    Returns 1.0 where the next row's close is above the current close and
    0.0 where it is not. Rows whose outcome is unknown (the last row, or any
    row where either close is NaN) are NaN rather than 0.0, so that callers
    using ``target.dropna()`` do not train on a fabricated "down" label.
    """
    close = daily["close"]
    future = close.shift(-1)
    # A comparison against NaN is False, so unknown outcomes must be masked
    # explicitly instead of being left as 0.0.
    known = future.notna() & close.notna()
    return (future > close).astype(float).where(known)
# ═══════════════════════════════════════════════════════════════════════════
# LSTM MODEL
# ═══════════════════════════════════════════════════════════════════════════
class CryptoLSTM(nn.Module):
    """Stacked LSTM followed by a small dense head ending in a sigmoid.

    Input is a batch-first tensor (``batch_first=True``) whose last
    dimension has ``input_size`` features; output is one value in (0, 1) per
    batch element.
    """

    def __init__(self, input_size: int, hidden_size: int = 64,
                 num_layers: int = 2, dropout: float = 0.3):
        super().__init__()
        # Recurrent dropout is only requested when layers are stacked.
        recurrent_dropout = dropout if num_layers > 1 else 0
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=recurrent_dropout,
        )
        head_layers = [
            nn.Linear(hidden_size, 32),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(32, 1),
            nn.Sigmoid(),
        ]
        self.fc = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return one probability per sequence in the batch."""
        sequence_out = self.lstm(x)[0]
        # Only the output at the final time step feeds the dense head.
        final_step = sequence_out[:, -1, :]
        scores = self.fc(final_step)
        return scores.squeeze(-1)
# ═══════════════════════════════════════════════════════════════════════════
# ML MODEL MANAGER (train, predict, save, load)
# ═══════════════════════════════════════════════════════════════════════════
class MLModelManager:
    """Manages LSTM and XGBoost models: training, persistence, inference.

    Trained models are kept per symbol in ``lstm_models`` / ``xgb_models``
    and mirrored to ``MODEL_DIR`` as ``lstm_<tag>.pt`` and ``xgb_<tag>.pkl``,
    where ``<tag>`` is the symbol with "/" replaced by "_". The time of the
    last training run is stored in ``train_meta.json``.
    """

    def __init__(self):
        MODEL_DIR.mkdir(parents=True, exist_ok=True)
        self.lstm_models: Dict[str, dict] = {}  # symbol -> {model, feat_mean, feat_std}
        self.xgb_models: Dict[str, XGBClassifier] = {}  # symbol -> model
        self.last_train_time: Optional[datetime] = None
        self._load_train_timestamp()

    # --- Persistence helpers -----------------------------------------------
    def _meta_path(self) -> Path:
        """Return the path of the JSON file holding the last-train timestamp."""
        return MODEL_DIR / "train_meta.json"

    def _load_train_timestamp(self):
        """Restore ``last_train_time`` from disk; leave it None if unreadable."""
        mp = self._meta_path()
        if not mp.exists():
            return
        try:
            meta = json.loads(mp.read_text())
            stamp = datetime.fromisoformat(meta["last_train"])
        except (OSError, ValueError, KeyError, TypeError) as e:
            log.warning(" Could not read ML train timestamp (%s); will retrain.", e)
            self.last_train_time = None
            return
        # needs_retrain() subtracts from an aware "now"; a naive timestamp
        # (e.g. a hand-edited file) would raise TypeError there, so treat it
        # as UTC.
        if stamp.tzinfo is None:
            stamp = stamp.replace(tzinfo=timezone.utc)
        self.last_train_time = stamp
        log.info(" ML models last trained: %s", self.last_train_time.isoformat())

    def _save_train_timestamp(self):
        """Record the current UTC time as the last training time."""
        self.last_train_time = datetime.now(timezone.utc)
        self._meta_path().write_text(json.dumps({
            "last_train": self.last_train_time.isoformat()
        }))

    def needs_retrain(self) -> bool:
        """Return True if no training time is known or the interval elapsed."""
        if self.last_train_time is None:
            return True
        elapsed = (datetime.now(timezone.utc) - self.last_train_time).total_seconds() / 3600
        return elapsed >= ML_RETRAIN_INTERVAL_HOURS

    # --- LSTM training -----------------------------------------------------
    def train_lstm(self, symbol: str, features_df: pd.DataFrame, target: pd.Series):
        """Train LSTM on sequences of features.

        Each sample is the ``SEQ_LEN`` feature rows ending on (and including)
        row ``i``, labelled with ``target[i]``. This is the same window shape
        that ``predict_lstm`` feeds at inference time, where the window ends
        on the latest available row. Training is skipped, with a warning,
        when there is too little data.
        """
        tag = symbol.replace("/", "_")
        # Align features and target
        common = features_df.index.intersection(target.dropna().index)
        feat_arr = features_df.loc[common].values.astype(np.float32)
        tgt_arr = target.loc[common].values.astype(np.float32)
        # Build sequences
        n = len(feat_arr)
        if n <= SEQ_LEN + 10:
            log.warning(" LSTM: not enough data for %s (%d rows). Skipping.", symbol, n)
            return
        # The window for label i must include row i itself; ending it at
        # i - 1 would train the model one step further ahead than inference.
        X = np.array(
            [feat_arr[end - SEQ_LEN + 1 : end + 1] for end in range(SEQ_LEN - 1, n)],
            dtype=np.float32,
        )
        y = tgt_arr[SEQ_LEN - 1 :]
        # Remove NaN/inf
        mask = np.isfinite(X).all(axis=(1, 2)) & np.isfinite(y)
        X, y = X[mask], y[mask]
        if len(y) < ML_MIN_SAMPLES:
            log.warning(" LSTM: only %d valid samples for %s. Skipping.", len(y), symbol)
            return
        # Normalize per-feature (X is fully finite after the mask above).
        feat_mean = X.mean(axis=(0, 1)).astype(np.float32)
        feat_std = (X.std(axis=(0, 1)) + 1e-8).astype(np.float32)
        X = (X - feat_mean[None, None, :]) / feat_std[None, None, :]
        X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)
        # Train
        input_size = X.shape[2]
        model = CryptoLSTM(input_size=input_size, hidden_size=64,
                           num_layers=2, dropout=0.3).to(DEVICE)
        X_tensor = torch.tensor(X).to(DEVICE)
        y_tensor = torch.tensor(y).to(DEVICE)
        dataset = TensorDataset(X_tensor, y_tensor)
        loader = DataLoader(dataset, batch_size=256, shuffle=True)
        optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)
        criterion = nn.BCELoss()
        model.train()
        total_loss = 0.0
        for _ in range(30):
            total_loss = 0.0
            for xb, yb in loader:
                optimizer.zero_grad()
                pred = model(xb)
                loss = criterion(pred, yb)
                loss.backward()
                torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
                optimizer.step()
                total_loss += loss.item()
        model.eval()
        # Mean batch loss of the final epoch only.
        avg_loss = total_loss / max(len(loader), 1)
        log.info(" LSTM trained for %s: %d samples, final loss=%.4f", symbol, len(y), avg_loss)
        self.lstm_models[symbol] = {
            "model": model,
            "feat_mean": feat_mean,
            "feat_std": feat_std,
        }
        # Save to disk
        torch.save({
            "state_dict": model.state_dict(),
            "input_size": input_size,
            "feat_mean": feat_mean,
            "feat_std": feat_std,
        }, MODEL_DIR / f"lstm_{tag}.pt")

    # --- XGBoost training --------------------------------------------------
    def train_xgboost(self, symbol: str, features_df: pd.DataFrame, target: pd.Series):
        """Train XGBoost binary classifier.

        Training is skipped, with a warning, when fewer than
        ``ML_MIN_SAMPLES`` valid rows remain or either class has fewer than
        10 examples.
        """
        tag = symbol.replace("/", "_")
        common = features_df.index.intersection(target.dropna().index)
        X = features_df.loc[common].values.astype(np.float32)
        y = target.loc[common].values.astype(np.float32)
        mask = np.isfinite(X).all(axis=1) & np.isfinite(y)
        X, y = X[mask], y[mask]
        if len(y) < ML_MIN_SAMPLES or y.sum() < 10 or (len(y) - y.sum()) < 10:
            log.warning(" XGBoost: insufficient balanced data for %s (%d). Skipping.", symbol, len(y))
            return
        model = XGBClassifier(
            n_estimators=200,
            max_depth=4,
            learning_rate=0.05,
            subsample=0.8,
            colsample_bytree=0.8,
            min_child_weight=5,
            reg_alpha=0.1,
            reg_lambda=1.0,
            random_state=42,
            eval_metric="logloss",
            verbosity=0,
        )
        model.fit(X, y)
        log.info(" XGBoost trained for %s: %d samples", symbol, len(y))
        self.xgb_models[symbol] = model
        # Save to disk
        with open(MODEL_DIR / f"xgb_{tag}.pkl", "wb") as f:
            pickle.dump(model, f)
        # Log top features
        importances = model.feature_importances_
        cols = features_df.columns.tolist()
        top_idx = np.argsort(importances)[-5:][::-1]
        top_feats = [(cols[i], round(float(importances[i]), 4)) for i in top_idx]
        log.info(" XGBoost top features: %s", top_feats)

    # --- Load from disk ----------------------------------------------------
    def load_models(self, symbol: str, n_features: int):
        """Attempt to load saved models from disk.

        A model already held in memory for ``symbol`` is left untouched. When
        ``n_features`` is positive, a saved model built for a different
        number of features is ignored (with a warning) instead of being
        loaded and failing later at prediction time.
        """
        tag = symbol.replace("/", "_")
        # LSTM
        lstm_path = MODEL_DIR / f"lstm_{tag}.pt"
        if lstm_path.exists() and symbol not in self.lstm_models:
            try:
                # NOTE(security): weights_only=False unpickles arbitrary
                # objects (needed here for the stored numpy statistics).
                # Only load checkpoints written by this script.
                checkpoint = torch.load(lstm_path, map_location=DEVICE, weights_only=False)
                saved_width = checkpoint["input_size"]
                if n_features and saved_width != n_features:
                    log.warning(" LSTM checkpoint for %s expects %d features, got %d. Ignoring.",
                                symbol, saved_width, n_features)
                else:
                    model = CryptoLSTM(
                        input_size=saved_width,
                        hidden_size=64, num_layers=2, dropout=0.3,
                    ).to(DEVICE)
                    model.load_state_dict(checkpoint["state_dict"])
                    model.eval()
                    self.lstm_models[symbol] = {
                        "model": model,
                        "feat_mean": checkpoint["feat_mean"],
                        "feat_std": checkpoint["feat_std"],
                    }
                    log.info(" LSTM model loaded from disk for %s", symbol)
            except Exception as e:
                log.warning(" Failed to load LSTM for %s: %s", symbol, e)
        # XGBoost
        xgb_path = MODEL_DIR / f"xgb_{tag}.pkl"
        if xgb_path.exists() and symbol not in self.xgb_models:
            try:
                # NOTE(security): pickle.load executes code embedded in the
                # file. Only load model files written by this script.
                with open(xgb_path, "rb") as f:
                    model = pickle.load(f)
                saved_width = getattr(model, "n_features_in_", None)
                if n_features and saved_width is not None and saved_width != n_features:
                    log.warning(" XGBoost model for %s expects %d features, got %d. Ignoring.",
                                symbol, saved_width, n_features)
                else:
                    self.xgb_models[symbol] = model
                    log.info(" XGBoost model loaded from disk for %s", symbol)
            except Exception as e:
                log.warning(" Failed to load XGBoost for %s: %s", symbol, e)

    # --- LSTM inference ----------------------------------------------------
    def predict_lstm(self, symbol: str, features_df: pd.DataFrame) -> Optional[float]:
        """Return LSTM probability of upward move (0..1), or None if unavailable.

        None is returned when no model is loaded for ``symbol``, when fewer
        than ``SEQ_LEN`` rows are supplied, or when the feature count does
        not match the one the model was trained with.
        """
        if symbol not in self.lstm_models:
            return None
        info = self.lstm_models[symbol]
        model = info["model"]
        feat_mean = info["feat_mean"]
        feat_std = info["feat_std"]
        feat_arr = features_df.values.astype(np.float32)
        if len(feat_arr) < SEQ_LEN:
            return None
        if feat_arr.shape[1] != len(feat_mean):
            log.warning(" LSTM predict skipped for %s: %d features, model expects %d",
                        symbol, feat_arr.shape[1], len(feat_mean))
            return None
        # Take last SEQ_LEN rows as the input sequence
        seq = feat_arr[-SEQ_LEN:]
        seq = (seq - feat_mean[None, :]) / feat_std[None, :]
        seq = np.nan_to_num(seq, nan=0.0, posinf=0.0, neginf=0.0)
        X = torch.tensor(seq[None, :, :], dtype=torch.float32).to(DEVICE)
        model.eval()
        with torch.no_grad():
            prob = model(X).item()
        return float(prob)

    # --- XGBoost inference -------------------------------------------------
    def predict_xgboost(self, symbol: str, features_df: pd.DataFrame) -> Optional[float]:
        """Return XGBoost probability of upward move (0..1), or None.

        The prediction uses the last row of ``features_df``; None is returned
        when no model is loaded for ``symbol`` or the prediction fails.
        """
        if symbol not in self.xgb_models:
            return None
        model = self.xgb_models[symbol]
        row = features_df.iloc[-1:].values.astype(np.float32)
        row = np.nan_to_num(row, nan=0.0, posinf=0.0, neginf=0.0)
        try:
            prob = model.predict_proba(row)[0, 1]
            return float(prob)
        except Exception as e:
            log.warning(" XGBoost predict failed for %s: %s", symbol, e)
            return None
# ═══════════════════════════════════════════════════════════════════════════
# DATA FETCHING (same as multifactor trader)
# ═══════════════════════════════════════════════════════════════════════════
class DataFetcher:
    """Wrapper around the Alpaca crypto market-data, news and trading clients.

    Market-data helpers that are marked best-effort log a warning and return
    an empty value (None, an empty DataFrame or an empty list) on failure;
    the bar fetchers let exceptions propagate.
    """

    def __init__(self, api_key: str, api_secret: str, paper: bool = False):
        """Create the three Alpaca clients.

        ``paper`` is forwarded to ``TradingClient``. It defaults to False,
        which is what this class has always passed.
        NOTE(review): in alpaca-py ``paper=False`` selects the live trading
        endpoint -- confirm that is intended for every caller.
        """
        self.crypto_client = CryptoHistoricalDataClient(api_key, api_secret)
        self.news_client = NewsClient(api_key, api_secret)
        self.trading_client = TradingClient(api_key, api_secret, paper=paper)

    # --- Internal helpers ----------------------------------------------------
    @staticmethod
    def _select_symbol(df: pd.DataFrame, symbol: str) -> pd.DataFrame:
        """Drop the "symbol" index level when the frame is multi-indexed."""
        if isinstance(df.index, pd.MultiIndex):
            df = df.xs(symbol, level="symbol")
        return df

    def _fetch_bars(self, symbol: str, timeframe, start: datetime) -> pd.DataFrame:
        """Fetch bars of ``timeframe`` for ``symbol`` from ``start`` onward."""
        req = CryptoBarsRequest(symbol_or_symbols=symbol, timeframe=timeframe, start=start)
        bars = self.crypto_client.get_crypto_bars(req)
        return self._select_symbol(bars.df, symbol)

    @staticmethod
    def _to_article(record) -> dict:
        """Normalize a news record (DataFrame row or dict) to a plain dict."""
        return {
            "headline": str(record.get("headline", "")),
            "summary": str(record.get("summary", "")),
            "content": str(record.get("content", "")),
            "source": str(record.get("source", "")),
            "created_at": record.get("created_at", None),
        }

    # --- Bars ----------------------------------------------------------------
    def get_daily_bars(self, symbol: str, days: int = LOOKBACK_DAYS) -> pd.DataFrame:
        """Return daily bars covering the last ``days`` days."""
        start = datetime.now(timezone.utc) - timedelta(days=days)
        return self._fetch_bars(symbol, TimeFrame.Day, start)

    def get_hourly_bars(self, symbol: str, hours: int = INTRADAY_LOOKBACK_HOURS) -> pd.DataFrame:
        """Return hourly bars covering the last ``hours`` hours."""
        start = datetime.now(timezone.utc) - timedelta(hours=hours)
        return self._fetch_bars(symbol, TimeFrame.Hour, start)

    def get_5min_bars(self, symbol: str, hours: int = 24) -> pd.DataFrame:
        """Return 5-minute bars covering the last ``hours`` hours."""
        start = datetime.now(timezone.utc) - timedelta(hours=hours)
        tf = TimeFrame(amount=5, unit=TimeFrameUnit.Minute)
        return self._fetch_bars(symbol, tf, start)

    # --- Quotes / book / snapshot ---------------------------------------------
    def get_orderbook(self, symbol: str) -> Optional[dict]:
        """Return ``{"bids": [(price, size), ...], "asks": [...]}`` or None."""
        try:
            req = CryptoLatestOrderbookRequest(symbol_or_symbols=symbol)
            result = self.crypto_client.get_crypto_latest_orderbook(req)
            ob = result.get(symbol)
            if ob is None:
                return None
            return {
                "bids": [(float(q.price), float(q.size)) for q in ob.bids] if ob.bids else [],
                "asks": [(float(q.price), float(q.size)) for q in ob.asks] if ob.asks else [],
            }
        except Exception as e:
            log.warning(" Orderbook fetch failed for %s: %s", symbol, e)
            return None

    def get_latest_quote(self, symbol: str) -> Optional[dict]:
        """Return bid/ask prices and sizes plus derived spread and mid, or None."""
        try:
            req = CryptoLatestQuoteRequest(symbol_or_symbols=symbol)
            result = self.crypto_client.get_crypto_latest_quote(req)
            q = result.get(symbol)
            if q is None:
                return None
            bid, ask = float(q.bid_price), float(q.ask_price)
            return {
                "bid": bid, "ask": ask,
                "bid_size": float(q.bid_size), "ask_size": float(q.ask_size),
                "spread": ask - bid,
                "mid": (ask + bid) / 2,
            }
        except Exception as e:
            log.warning(" Quote fetch failed for %s: %s", symbol, e)
            return None

    def get_snapshot(self, symbol: str) -> Optional[dict]:
        """Return a flat dict of daily / previous-daily / minute bar fields.

        Only the bars present in the snapshot contribute keys; None is
        returned when the snapshot is missing or the request fails.
        """
        try:
            req = CryptoSnapshotRequest(symbol_or_symbols=symbol)
            result = self.crypto_client.get_crypto_snapshot(req)
            snap = result.get(symbol)
            if snap is None:
                return None
            info = {}
            if snap.daily_bar:
                info["daily_open"] = float(snap.daily_bar.open)
                info["daily_high"] = float(snap.daily_bar.high)
                info["daily_low"] = float(snap.daily_bar.low)
                info["daily_close"] = float(snap.daily_bar.close)
                info["daily_volume"] = float(snap.daily_bar.volume)
                info["daily_vwap"] = float(snap.daily_bar.vwap)
            if snap.previous_daily_bar:
                info["prev_close"] = float(snap.previous_daily_bar.close)
                info["prev_volume"] = float(snap.previous_daily_bar.volume)
                info["prev_vwap"] = float(snap.previous_daily_bar.vwap)
            if snap.minute_bar:
                info["minute_close"] = float(snap.minute_bar.close)
                info["minute_volume"] = float(snap.minute_bar.volume)
                info["minute_vwap"] = float(snap.minute_bar.vwap)
            return info
        except Exception as e:
            log.warning(" Snapshot fetch failed for %s: %s", symbol, e)
            return None

    # --- Trades / news ---------------------------------------------------------
    def get_recent_trades(self, symbol: str, minutes: int = 60) -> pd.DataFrame:
        """Return up to 5000 trades from the last ``minutes`` minutes.

        An empty DataFrame is returned when the request fails.
        """
        start = datetime.now(timezone.utc) - timedelta(minutes=minutes)
        try:
            req = CryptoTradesRequest(symbol_or_symbols=symbol, start=start, limit=5000)
            trades = self.crypto_client.get_crypto_trades(req)
            return self._select_symbol(trades.df, symbol)
        except Exception as e:
            log.warning(" Trade fetch failed for %s: %s", symbol, e)
            return pd.DataFrame()

    def get_news(self, symbol: str, days: int = 3, limit: int = 50) -> List[dict]:
        """Return recent news articles for the symbol's base ticker.

        Each article is a dict with "headline", "summary", "content",
        "source" and "created_at". The response's DataFrame view is tried
        first; if it yields nothing, the ``model_dump()`` view is used. An
        empty list is returned when the request fails.
        """
        start = datetime.now(timezone.utc) - timedelta(days=days)
        ticker = symbol.replace("/USD", "").replace("/", "")
        try:
            req = NewsRequest(symbols=ticker, start=start, limit=limit,
                              include_content=True, sort="DESC")
            news_set = self.news_client.get_news(req)
            try:
                df = news_set.df if hasattr(news_set, "df") else pd.DataFrame()
            except Exception as e:
                log.debug(" News DataFrame view unavailable for %s: %s", symbol, e)
                df = pd.DataFrame()
            articles = []
            if not df.empty:
                articles = [self._to_article(row) for _, row in df.iterrows()]
            if not articles and hasattr(news_set, "model_dump"):
                try:
                    dump = news_set.model_dump()
                    raw = dump.get("data", {})
                    items = raw.values() if isinstance(raw, dict) else raw
                    for item in items:
                        if isinstance(item, dict):
                            articles.append(self._to_article(item))
                except Exception as e:
                    # Best-effort fallback: keep whatever was collected.
                    log.debug(" News model_dump fallback failed for %s: %s", symbol, e)
            return articles
        except Exception as e:
            log.warning(" News fetch failed for %s: %s", symbol, e)
            return []

    # --- Account -----------------------------------------------------------------
    def get_account(self):
        """Return the trading account object from the trading client."""
        return self.trading_client.get_account()

    def get_position(self, symbol: str) -> float:
        """Return the open position quantity for ``symbol``, or 0.0.

        Any failure of the lookup (including "no open position") yields 0.0;
        the reason is logged at debug level so that API errors are not
        completely invisible.
        """
        try:
            pos = self.trading_client.get_open_position(symbol.replace("/", ""))
            return float(pos.qty)
        except Exception as e:
            log.debug(" No position reported for %s (%s); treating as 0.", symbol, e)
            return 0.0
# ═══════════════════════════════════════════════════════════════════════════
# FACTOR MODULES (6 original + 2 ML)
# ═══════════════════════════════════════════════════════════════════════════
# --- Factor 1: Technical Analysis -----------------------------------------
def factor_technical(daily: pd.DataFrame) -> FactorSignal:
    """Score trend, momentum and volatility indicators on daily bars.

    Five sub-signals in [-1, 1] (EMA spread, SMA spread, RSI position, MACD
    histogram, ATR versus its rolling median) are combined with fixed
    weights. Confidence is 0 when the sub-signals are evenly split between
    positive and non-positive and 1 when they all agree.

    A sub-signal that cannot be computed (NaN or infinite, e.g. because
    ``daily`` is shorter than an indicator's window) contributes 0 instead
    of turning the whole score into NaN.
    """
    close, high, low = daily["close"], daily["high"], daily["low"]
    signals = {}

    ema_fast = _ema(close, EMA_FAST).iloc[-1]
    ema_slow = _ema(close, EMA_SLOW).iloc[-1]
    signals["ema"] = np.clip((ema_fast - ema_slow) / ema_slow * 20, -1, 1)

    sma_fast = _sma(close, SMA_FAST).iloc[-1]
    sma_slow = _sma(close, SMA_SLOW).iloc[-1]
    signals["sma"] = np.clip((sma_fast - sma_slow) / sma_slow * 20, -1, 1)

    # Inside the allowed band the RSI maps linearly onto [-0.5, 0.5] around
    # the band's midpoint; outside the band the signal is negative. The
    # midpoint and half-width follow RSI_LOWER / RSI_UPPER (56.5 and 36.5
    # for the 20 / 93 band).
    rsi_val = _rsi(close, RSI_PERIOD).iloc[-1]
    rsi_mid = (RSI_LOWER + RSI_UPPER) / 2
    rsi_half_width = (RSI_UPPER - RSI_LOWER) / 2
    if rsi_val > RSI_UPPER:
        signals["rsi"] = -1.0
    elif rsi_val < RSI_LOWER:
        signals["rsi"] = -0.5
    else:
        signals["rsi"] = (rsi_val - rsi_mid) / rsi_half_width * 0.5

    _, _, hist = _macd(close, MACD_FAST, MACD_SLOW, MACD_SIG)
    signals["macd"] = np.clip(hist.iloc[-1] / close.iloc[-1] * 1000, -1, 1)

    # ATR at its median scores +1; the score falls as ATR rises above it.
    atr_s = _atr(high, low, close, ATR_PERIOD)
    atr_now = atr_s.iloc[-1]
    atr_median = atr_s.rolling(ATR_PERIOD * 2).median().iloc[-1]
    if atr_median > 0:
        signals["atr"] = np.clip(1.0 - (atr_now / (atr_median + 1e-10) - 1.0), -1, 1)
    else:
        signals["atr"] = 0.0

    # Neutralize anything that is not a finite number.
    signals = {k: (float(v) if np.isfinite(v) else 0.0) for k, v in signals.items()}

    weights = {"ema": 0.30, "sma": 0.20, "rsi": 0.15, "macd": 0.25, "atr": 0.10}
    score = sum(signals[k] * weights[k] for k in signals)
    agreement = sum(1 for v in signals.values() if v > 0) / len(signals)
    confidence = abs(2 * agreement - 1)
    return FactorSignal("technical", float(np.clip(score, -1, 1)), float(confidence),
                        {k: round(v, 4) for k, v in signals.items()})
# --- Factor 2: Microstructure --------------------------------------------
def factor_microstructure(orderbook, quote, recent_trades, snapshot) -> FactorSignal:
    """Score short-term order-flow conditions from live market data.

    Up to four sub-signals in [-1, 1] are averaged:
      * book_imbalance - notional bid depth versus ask depth,
      * spread_score   - tighter quoted spread scores higher,
      * trade_flow     - volume on upticks versus downticks,
      * vwap_dev       - daily close relative to daily VWAP.
    Confidence is the fraction of the four inputs that produced a signal.
    Returns a zero-confidence signal when no input is usable.

    Args:
        orderbook: dict with "bids" / "asks" lists of (price, size), or None.
        quote: dict with "spread" and "mid", or None.
        recent_trades: DataFrame with a "price" column and optionally "size".
        snapshot: dict with "daily_vwap" and "daily_close", or None.
    """
    signals = {}
    data_points = 0

    if orderbook and orderbook["bids"] and orderbook["asks"]:
        bid_d = sum(p * s for p, s in orderbook["bids"])
        ask_d = sum(p * s for p, s in orderbook["asks"])
        total = bid_d + ask_d
        if total > 0:
            signals["book_imbalance"] = float(np.clip((bid_d - ask_d) / total * 3, -1, 1))
            data_points += 1

    if quote and quote["mid"] > 0:
        spread_bps = (quote["spread"] / quote["mid"]) * 10000
        signals["spread_score"] = float(np.clip(1.0 - spread_bps / 20, -1, 1))
        data_points += 1

    if not recent_trades.empty and "price" in recent_trades.columns:
        prices = recent_trades["price"].astype(float)
        if "size" in recent_trades.columns:
            sizes = recent_trades["size"].astype(float)
        else:
            # Unit sizes must share the trades' index; a default RangeIndex
            # cannot be aligned with the boolean masks built from `tick`.
            sizes = pd.Series(1.0, index=prices.index)
        tick = prices.diff().fillna(0)
        buy_vol = sizes[tick > 0].sum()
        sell_vol = sizes[tick < 0].sum()
        total_v = buy_vol + sell_vol
        if total_v > 0:
            signals["trade_flow"] = float(np.clip((buy_vol - sell_vol) / total_v * 2, -1, 1))
            data_points += 1

    if snapshot and "daily_vwap" in snapshot and "daily_close" in snapshot:
        vwap = snapshot["daily_vwap"]
        if vwap > 0:
            signals["vwap_dev"] = float(np.clip((snapshot["daily_close"] - vwap) / vwap * 20, -1, 1))
            data_points += 1

    if not signals:
        return FactorSignal("microstructure", 0.0, 0.0, {"error": "no data"})
    score = np.mean(list(signals.values()))
    return FactorSignal("microstructure", float(np.clip(score, -1, 1)),
                        min(data_points / 4, 1.0),
                        {k: round(v, 4) for k, v in signals.items()})
# --- Factor 3: Sentiment -------------------------------------------------
def _score_text(text: str) -> float:
    """Keyword polarity of ``text`` in [-1, 1].

    Counts how many bullish and how many bearish keywords occur in the
    lower-cased text (each keyword counts once, however often it appears)
    and returns (bullish - bearish) / (bullish + bearish), or 0.0 when no
    keyword occurs.
    """
    lowered = text.lower()
    # NOTE(review): plain substring tests, so "ath" also hits "path" and
    # "ban" also hits "bank".
    bullish_hits = len([kw for kw in BULLISH_KEYWORDS if kw in lowered])
    bearish_hits = len([kw for kw in BEARISH_KEYWORDS if kw in lowered])
    hits = bullish_hits + bearish_hits
    if hits > 0:
        return (bullish_hits - bearish_hits) / hits
    return 0.0
def _time_decay(created_at, half_life_hours=24):
if created_at is None:
return 0.5
now = datetime.now(timezone.utc)
if hasattr(created_at, "tzinfo") and created_at.tzinfo is None:
created_at = created_at.replace(tzinfo=timezone.utc)
try:
age_h = (now - created_at).total_seconds() / 3600
except Exception:
return 0.5
return math.exp(-0.693 * age_h / half_life_hours)
def factor_sentiment(articles: List[dict]) -> FactorSignal:
    """Recency-weighted keyword sentiment over news articles.

    Each article's headline, summary and content are scored separately and
    blended 3:2:1; articles are then averaged using their time-decay
    weights. Confidence grows with the number of clearly polarized articles
    (full at 5) and with the number of articles overall (full at 10).
    Returns a zero-confidence signal when there are no articles or all
    weights are zero.
    """
    if not articles:
        return FactorSignal("sentiment", 0.0, 0.0, {"articles": 0})

    weighted_scores = []
    decay_weights = []
    for article in articles:
        headline_score = _score_text(article.get("headline", ""))
        summary_score = _score_text(article.get("summary", ""))
        content_score = _score_text(article.get("content", ""))
        blended = (headline_score * 3 + summary_score * 2 + content_score) / 6
        weight = _time_decay(article.get("created_at"))
        weighted_scores.append(blended * weight)
        decay_weights.append(weight)

    n_articles = len(articles)
    if sum(decay_weights) == 0:
        return FactorSignal("sentiment", 0.0, 0.0, {"articles": n_articles})

    score = sum(weighted_scores) / sum(decay_weights)
    # An article counts as polarized when its decayed score is clearly
    # away from zero.
    polarized = len([value for value in weighted_scores if abs(value) > 0.2])
    conf = min(polarized / 5, 1.0) * min(n_articles / 10, 1.0)
    return FactorSignal("sentiment", float(np.clip(score, -1, 1)), float(conf),
                        {"articles": n_articles, "avg_score": round(score, 4)})
# --- Factor 4: Volume ----------------------------------------------------
def factor_volume(daily: pd.DataFrame, bars_5min: pd.DataFrame, snapshot) -> FactorSignal:
    """Score volume behaviour on daily and 5-minute bars.

    Up to three sub-signals are averaged:
      * volume_trend  - 10-day versus 30-day average volume, signed by the
                        price change over the last four bars,
      * obv_trend     - fast versus slow EMA of on-balance volume,
      * intraday_vwap - last 5-minute close relative to the volume-weighted
                        average of the bars' VWAPs.
    Confidence is the fraction of the three that could be computed.
    ``snapshot`` is accepted but not read by this function.
    """
    close = daily["close"]
    volume = daily["volume"]
    signals = {}

    short_avg = _sma(volume, 10).iloc[-1]
    long_avg = _sma(volume, 30).iloc[-1]
    if long_avg > 0:
        expanding = short_avg / long_avg > 1.0
        direction = 1 if close.iloc[-1] > close.iloc[-5] else -1
        # Rising volume confirms the recent move; fading volume argues
        # (more weakly) against it.
        signals["volume_trend"] = 0.5 * direction if expanding else -0.3 * direction

    def _step_sign(change):
        """Return +1, -1 or 0 for a positive, negative or other change."""
        if change > 0:
            return 1
        if change < 0:
            return -1
        return 0

    obv = (volume * close.diff().apply(_step_sign)).cumsum()
    obv_fast = _ema(obv, 10).iloc[-1]
    obv_slow = _ema(obv, 30).iloc[-1]
    if obv_slow != 0:
        signals["obv_trend"] = float(np.clip((obv_fast - obv_slow) / abs(obv_slow) * 5, -1, 1))

    has_intraday = (
        not bars_5min.empty
        and "vwap" in bars_5min.columns
        and len(bars_5min) > 10
    )
    if has_intraday:
        intraday_volume = bars_5min["volume"]
        session_vwap = (bars_5min["vwap"] * intraday_volume).sum() / (intraday_volume.sum() + 1e-10)
        if session_vwap > 0:
            last_close = bars_5min["close"].iloc[-1]
            signals["intraday_vwap"] = float(
                np.clip((last_close - session_vwap) / session_vwap * 30, -1, 1)
            )

    if not signals:
        return FactorSignal("volume", 0.0, 0.0, {})
    score = np.mean(list(signals.values()))
    return FactorSignal("volume", float(np.clip(score, -1, 1)),
                        min(len(signals) / 3, 1.0),
                        {k: round(v, 4) for k, v in signals.items()})
# --- Factor 5: Regime Detection -------------------------------------------
def factor_regime(daily: pd.DataFrame) -> FactorSignal:
    """Classify the current volatility / trend regime from daily closes.

    Sub-signals (averaged into the final score):

    * ``vol_regime``       - where the latest annualised 14-bar volatility
      ranks among all rolling readings (>80% -> high, <30% -> low).
    * ``trend_regime``     - from ``_hurst_exponent(close, 20)``
      (>0.6 trending, <0.4 mean-reverting, otherwise random).
    * ``skewness`` / ``tail_risk`` - skew and kurtosis of the last 30 returns
      (needs at least 30 returns).
    * ``momentum_persist`` - lag-1 autocorrelation of returns (needs at least
      20 returns and a non-NaN result).

    Args:
        daily: Daily bars; only the ``close`` column is read.

    Returns:
        ``FactorSignal`` named ``"regime"``. Confidence is the number of
        sub-signals divided by 4, capped at 1. ``details`` carries the two
        regime labels, the rounded Hurst value and every rounded sub-signal.
    """
    prices = daily["close"]
    rets = prices.pct_change().dropna()
    components = {}

    # --- volatility regime: rank of the latest reading among all readings --
    annualised_vol = rets.rolling(14).std() * np.sqrt(365)
    latest_vol = annualised_vol.iloc[-1]
    vol_rank = (annualised_vol < latest_vol).mean()
    if vol_rank > 0.80:
        vol_label, vol_score = "high_vol", -0.5
    elif vol_rank < 0.30:
        vol_label, vol_score = "low_vol", 0.3
    else:
        vol_label, vol_score = "normal_vol", 0.0
    components["vol_regime"] = vol_score

    # --- trend regime from the Hurst exponent ------------------------------
    hurst = _hurst_exponent(prices, 20)
    if hurst > 0.6:
        trend_label, trend_score = "trending", 0.5
    elif hurst < 0.4:
        trend_label, trend_score = "mean_reverting", -0.3
    else:
        trend_label, trend_score = "random", 0.0
    components["trend_regime"] = trend_score

    # --- shape of the recent return distribution ---------------------------
    sample_size = len(rets)
    if sample_size >= 30:
        recent = rets.tail(30)
        components["skewness"] = float(np.clip(recent.skew() * 0.5, -1, 1))
        # The clip to [-1, 0] means this term can only lower the score.
        components["tail_risk"] = float(np.clip(-recent.kurtosis() / 10, -1, 0))

    if sample_size >= 20:
        lag1 = rets.autocorr(lag=1)
        if not np.isnan(lag1):
            components["momentum_persist"] = float(np.clip(lag1 * 3, -1, 1))

    mean_score = np.mean(list(components.values()))
    details = {
        "vol_regime_label": vol_label,
        "trend_regime_label": trend_label,
        "hurst": round(hurst, 4),
    }
    details.update({name: round(value, 4) for name, value in components.items()})
    return FactorSignal(
        "regime",
        float(np.clip(mean_score, -1, 1)),
        min(len(components) / 4, 1.0),
        details,
    )
# --- Factor 6: Anomaly Detection ------------------------------------------
def factor_anomaly(daily: pd.DataFrame, quote, snapshot) -> FactorSignal:
    """Detect unusual market conditions and express them as a caution score.

    Each triggered check appends a label to ``anomalies`` and subtracts a
    fixed penalty from ``caution``:

    * latest daily return more than 2.5 sigma from its 30-bar mean  (-0.3)
    * latest daily volume more than 3 sigma above its 30-bar mean   (-0.2)
    * quoted spread wider than 30 bps of the mid price              (-0.4)
    * open gapping more than 3% from the previous close             (-0.2)
    * close more than 15% away from ``_sma(close, 60)``             (-0.15)

    Args:
        daily: Daily bars; the ``close`` and ``volume`` columns are read.
        quote: Mapping with ``"mid"`` and ``"spread"`` keys, or a falsy value
            to skip the spread check.
        snapshot: Mapping that may hold ``"daily_open"`` and ``"prev_close"``,
            or a falsy value to skip the gap check.

    Returns:
        ``FactorSignal`` named ``"anomaly"`` whose score lies in [-1, 0].
        Confidence is ``min(n_anomalies / 3, 1)`` when anything fired and
        0.5 otherwise.
    """
    close, volume = daily["close"], daily["volume"]
    returns = close.pct_change().dropna()
    anomalies = []
    caution = 0.0

    # Return shock: z-score of the latest return against the 30-bar window.
    ret_window = returns.rolling(30)
    ret_std = ret_window.std().iloc[-1]
    if ret_std > 0:
        z = (returns.iloc[-1] - ret_window.mean().iloc[-1]) / ret_std
        if abs(z) > 2.5:
            anomalies.append(f"return_z={z:.2f}")
            caution -= 0.3

    # Volume spike: only unusually HIGH volume is treated as anomalous.
    vol_window = volume.rolling(30)
    vol_std = vol_window.std().iloc[-1]
    if vol_std > 0:
        vz = (volume.iloc[-1] - vol_window.mean().iloc[-1]) / vol_std
        if vz > 3.0:
            anomalies.append(f"vol_z={vz:.2f}")
            caution -= 0.2

    # Liquidity: spread expressed in basis points of the mid price.
    if quote and quote["mid"] > 0:
        sp = (quote["spread"] / quote["mid"]) * 10000
        if sp > 30:
            anomalies.append(f"wide_spread={sp:.0f}bps")
            caution -= 0.4

    # Opening gap. Guard the denominator like every other ratio here: a
    # missing or zero previous close would otherwise raise and abort the
    # whole symbol instead of just skipping this check.
    if snapshot and "daily_open" in snapshot and "prev_close" in snapshot:
        daily_open = snapshot["daily_open"]
        prev_close = snapshot["prev_close"]
        if daily_open is not None and prev_close and prev_close > 0:
            gap = (daily_open - prev_close) / prev_close
            if abs(gap) > 0.03:
                anomalies.append(f"gap={gap:.2%}")
                caution -= 0.2

    # Over-extension relative to the 60-bar average close.
    sma60 = _sma(close, 60).iloc[-1]
    if sma60 > 0:
        dist = (close.iloc[-1] - sma60) / sma60
        if abs(dist) > 0.15:
            anomalies.append(f"sma60_dist={dist:.2%}")
            caution -= 0.15

    score = float(np.clip(caution, -1, 0))
    conf = min(len(anomalies) / 3, 1.0) if anomalies else 0.5
    return FactorSignal("anomaly", score, float(conf),
                        {"anomalies": anomalies, "caution": round(caution, 4)})
# --- Factor 7: ML LSTM ---------------------------------------------------
def factor_ml_lstm(prob: Optional[float]) -> FactorSignal:
    """Convert the LSTM's probability of an upward move into a factor signal.

    Args:
        prob: Model probability, or ``None`` when no prediction is available.

    Returns:
        ``FactorSignal`` named ``"ml_lstm"``. The score maps the probability
        linearly onto [-1, 1] (0.5 -> 0); confidence grows with the distance
        from 0.5 and saturates at 1. A missing or non-finite probability
        yields a zero-score, zero-confidence signal.
    """
    if prob is None:
        return FactorSignal("ml_lstm", 0.0, 0.0, {"status": "no model"})
    prob = float(prob)
    if not math.isfinite(prob):
        # A NaN/inf prediction would turn score and confidence into NaN, and
        # the confidence-weighted sums in aggregate_signals would then be NaN
        # for every factor. Treat it as "no usable prediction" instead.
        return FactorSignal("ml_lstm", 0.0, 0.0, {"status": "invalid probability"})
    # prob=0.5 => score=0, prob=1.0 => score=+1, prob=0.0 => score=-1
    score = (prob - 0.5) * 2.0
    # Confidence: high when probability is far from 0.5
    confidence = min(abs(score) * 1.5, 1.0)
    return FactorSignal("ml_lstm", float(np.clip(score, -1, 1)), float(confidence),
                        {"probability": round(prob, 4)})
# --- Factor 8: ML XGBoost ------------------------------------------------
def factor_ml_xgboost(prob: Optional[float]) -> FactorSignal:
    """Convert the XGBoost probability of an upward move into a factor signal.

    Args:
        prob: Model probability, or ``None`` when no prediction is available.

    Returns:
        ``FactorSignal`` named ``"ml_xgboost"``. The score maps the
        probability linearly onto [-1, 1] (0.5 -> 0); confidence grows with
        the distance from 0.5 and saturates at 1. A missing or non-finite
        probability yields a zero-score, zero-confidence signal.
    """
    if prob is None:
        return FactorSignal("ml_xgboost", 0.0, 0.0, {"status": "no model"})
    prob = float(prob)
    if not math.isfinite(prob):
        # A NaN/inf prediction would turn score and confidence into NaN, and
        # the confidence-weighted sums in aggregate_signals would then be NaN
        # for every factor. Treat it as "no usable prediction" instead.
        return FactorSignal("ml_xgboost", 0.0, 0.0, {"status": "invalid probability"})
    # prob=0.5 => score=0, prob=1.0 => score=+1, prob=0.0 => score=-1
    score = (prob - 0.5) * 2.0
    # Confidence: high when probability is far from 0.5
    confidence = min(abs(score) * 1.5, 1.0)
    return FactorSignal("ml_xgboost", float(np.clip(score, -1, 1)), float(confidence),
                        {"probability": round(prob, 4)})
# ═══════════════════════════════════════════════════════════════════════════
# SIGNAL AGGREGATION
# ═══════════════════════════════════════════════════════════════════════════
def aggregate_signals(factors: List[FactorSignal]) -> Tuple[float, float]:
    """Blend factor signals into one (score, confidence) pair.

    The score is the average of factor scores weighted by
    ``FACTOR_WEIGHTS[name] * confidence``. The confidence is the
    weight-scaled sum of factor confidences multiplied by a directional
    agreement ratio.

    Agreement is computed only over factors with confidence above 0.2 that
    actually take a side. A score of exactly 0 is neutral and casts no vote;
    counting it as bearish would let neutral factors reinforce a bearish
    consensus while diluting a bullish one.

    Args:
        factors: Factor signals; ``name``, ``score`` and ``confidence`` are
            read. Names missing from ``FACTOR_WEIGHTS`` get weight 0.

    Returns:
        ``(score, confidence)`` with score clipped to [-1, 1] and confidence
        clipped to [0, 1]. An empty list yields ``(0.0, 0.0)``.
    """
    if not factors:
        return 0.0, 0.0

    weighted_score = 0.0
    weighted_conf = 0.0
    total_weight = 0.0
    for f in factors:
        w = FACTOR_WEIGHTS.get(f.name, 0.0)
        ew = w * f.confidence  # effective weight: static weight x confidence
        weighted_score += f.score * ew
        weighted_conf += f.confidence * w
        total_weight += ew
    score = weighted_score / total_weight if total_weight > 0 else 0.0

    # Directional vote among sufficiently confident factors; zeros abstain.
    votes = [f.score for f in factors if f.confidence > 0.2]
    bullish = sum(1 for s in votes if s > 0)
    bearish = sum(1 for s in votes if s < 0)
    decided = bullish + bearish
    agreement = max(bullish, bearish) / decided if decided else 0.0

    confidence = weighted_conf * agreement
    return float(np.clip(score, -1, 1)), float(np.clip(confidence, 0, 1))
# ═══════════════════════════════════════════════════════════════════════════
# RISK MANAGEMENT & POSITION SIZING
# ═══════════════════════════════════════════════════════════════════════════
class RiskManager:
    """Daily-loss circuit breaker plus Kelly- and volatility-aware sizing.

    All position sizes handled here are *weights*, i.e. fractions of account
    equity, not dollar amounts.
    """

    def __init__(self, equity: float, daily_pnl_pct: float):
        """Store account state and latch the circuit breaker.

        Args:
            equity: Current account equity.
            daily_pnl_pct: Today's P&L as a fraction (-0.05 means -5%).
        """
        self.equity = equity
        self.daily_pnl_pct = daily_pnl_pct
        # Decided once at construction; not re-evaluated afterwards.
        self.circuit_breaker_active = daily_pnl_pct < DRAWDOWN_CIRCUIT_BREAKER

    def check_circuit_breaker(self) -> bool:
        """Return whether the breaker is active, logging a warning if so."""
        if self.circuit_breaker_active:
            log.warning(" CIRCUIT BREAKER ACTIVE: daily P&L %.2f%% < %.2f%%",
                        self.daily_pnl_pct * 100, DRAWDOWN_CIRCUIT_BREAKER * 100)
        return self.circuit_breaker_active

    def kelly_fraction(self, win_rate, avg_win, avg_loss):
        """Return the Kelly fraction, floored at 0 and capped at KELLY_FRACTION_CAP.

        Uses ``f = (p*b - (1 - p)) / b`` with ``b = avg_win / avg_loss``.
        Returns 0.0 when ``avg_loss`` is 0 or ``win_rate`` is not positive.
        """
        if avg_loss == 0 or win_rate <= 0:
            return 0.0
        b = avg_win / avg_loss
        kelly = (win_rate * b - (1 - win_rate)) / b
        return min(max(kelly, 0.0), KELLY_FRACTION_CAP)

    def compute_position_weight(self, score, confidence, vol_regime, daily_returns):
        """Return the target weight (fraction of equity) for one asset.

        Args:
            score: Aggregate signal score; non-positive scores yield 0.
            confidence: Aggregate confidence; below MIN_SIGNAL_CONFIDENCE
                yields 0.
            vol_regime: Regime label; a label containing ``high_vol``,
                ``normal_vol`` or ``low_vol`` scales the weight by
                0.5 / 1.0 / 1.2 respectively.
            daily_returns: Series of daily returns used for the Kelly
                estimate (needs at least 30 non-NaN values with both wins
                and losses).

        Returns:
            Weight in ``[0, MAX_EXPOSURE_PER_ASSET]``.
        """
        if score <= 0 or confidence < MIN_SIGNAL_CONFIDENCE:
            return 0.0
        base = score * confidence * MAX_EXPOSURE_PER_ASSET

        rets = daily_returns.dropna()
        if len(rets) >= 30:
            wins = rets[rets > 0]
            losses = rets[rets < 0]
            if len(wins) > 0 and len(losses) > 0:
                kelly = self.kelly_fraction(len(wins) / len(rets), wins.mean(), abs(losses.mean()))
                # Both sides of the cap are fractions of equity. Comparing the
                # weight with a dollar amount (kelly * equity) would make the
                # Kelly limit effectively never bind.
                base = min(base, kelly) if kelly > 0 else base * 0.5

        mult = {"high_vol": 0.5, "normal_vol": 1.0, "low_vol": 1.2}
        for k, v in mult.items():
            if k in vol_regime:
                base *= v
                break
        return min(base, MAX_EXPOSURE_PER_ASSET)
# ═══════════════════════════════════════════════════════════════════════════
# EXECUTION ENGINE
# ═══════════════════════════════════════════════════════════════════════════
class ExecutionEngine:
    """Turns a target quantity into an order on the trading client.

    In dry-run mode the intended order is logged and nothing is submitted.
    """

    # Orders whose estimated dollar value is below this are not sent.
    MIN_ORDER_NOTIONAL = 1.00

    def __init__(self, trading_client: TradingClient, dry_run: bool = False):
        self.client = trading_client
        self.dry_run = dry_run

    def execute(self, symbol, target_qty, current_qty, quote, last_price=0.0, use_limit=True):
        """Move the position in ``symbol`` from ``current_qty`` to ``target_qty``.

        Args:
            symbol: Pair such as ``"BTC/USD"``; the slash is removed for the order.
            target_qty: Desired position size.
            current_qty: Position size currently held.
            quote: Mapping with ``mid`` / ``ask`` / ``bid``, or a falsy value.
            last_price: Fallback reference price when the quote has no usable mid.
            use_limit: Try a limit order first. If submission raises, the call
                is repeated once with a market order.
        """
        delta = target_qty - current_qty
        if abs(delta) < 1e-8:
            log.info(" %s: at target. No action.", symbol)
            return

        buying = delta > 0
        side = OrderSide.BUY if buying else OrderSide.SELL
        qty = abs(delta)

        # Reference price for sizing/logging: quote mid if usable, else last trade.
        if quote and quote.get("mid", 0) > 0:
            ref_price = quote["mid"]
        else:
            ref_price = last_price
        notional = qty * ref_price if ref_price > 0 else 0

        if notional < self.MIN_ORDER_NOTIONAL:
            full_exit = (not buying) and target_qty == 0.0 and current_qty > 0
            if not full_exit:
                log.info(" %s: notional $%.2f below $1 min. Skipping.", symbol, notional)
                return
            # Closing out: sell the whole holding if it clears the minimum.
            if ref_price > 0:
                min_qty = self.MIN_ORDER_NOTIONAL / ref_price
            else:
                min_qty = qty
            if current_qty >= min_qty:
                qty = current_qty
            else:
                log.info(" %s: position $%.2f below $1 min. Skipping.", symbol, current_qty * ref_price)
                return

        log.info(" %s: %s %.8f (~$%.2f)", symbol, side.name, qty, qty * ref_price)
        if self.dry_run:
            log.info(" [DRY RUN] Order not submitted.")
            return

        ticker = symbol.replace("/", "")
        if use_limit and quote and quote["mid"] > 0:
            # Price 0.1% through the touch, rounded to cents.
            if buying:
                limit_price = round(quote["ask"] * 1.001, 2)
            else:
                limit_price = round(quote["bid"] * 0.999, 2)
            request = LimitOrderRequest(
                symbol=ticker,
                qty=round(qty, 8),
                side=side,
                time_in_force=TimeInForce.GTC,
                limit_price=limit_price,
            )
        else:
            request = MarketOrderRequest(
                symbol=ticker,
                qty=round(qty, 8),
                side=side,
                time_in_force=TimeInForce.GTC,
            )

        try:
            placed = self.client.submit_order(request)
            log.info(" Order submitted: id=%s status=%s", placed.id, placed.status)
        except Exception as err:
            log.error(" Order failed: %s", err)
            if use_limit:
                # One retry as a market order; that call cannot recurse again.
                self.execute(symbol, target_qty, current_qty, quote, last_price, use_limit=False)
# ═══════════════════════════════════════════════════════════════════════════
# MAIN ORCHESTRATOR
# ═══════════════════════════════════════════════════════════════════════════
def run(args, ml_mgr: MLModelManager):
    """Run one analyse-and-trade pass over every symbol in ``SYMBOLS``.

    For each symbol the pass fetches market data, builds the ML feature
    matrix, retrains the models when required, evaluates the eight factors,
    aggregates them, sizes the position and hands the target quantity to the
    execution engine.

    Args:
        args: Parsed CLI namespace; ``dry_run``, ``creds``, ``retrain`` and
            ``verbose`` are read here.
        ml_mgr: Model manager used for training, loading and prediction.

    Side effects:
        Logs the whole decision trail, may submit orders (unless
        ``args.dry_run``), records the retrain timestamp after a retraining
        pass, and calls ``sys.exit(1)`` when account equity is not positive.
    """
    log.info("=" * 70)
    log.info("ML-ENHANCED MULTI-FACTOR CRYPTO TRADING SYSTEM")
    log.info("Time: %s", datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC"))
    log.info("Mode: %s | Device: %s", "DRY RUN" if args.dry_run else "LIVE", DEVICE)
    log.info("=" * 70)

    creds = load_credentials(args.creds)
    fetcher = DataFetcher(creds["alpaca_key"], creds["secret"])
    account = fetcher.get_account()
    equity = float(account.equity)
    last_equity = float(account.last_equity)
    daily_pnl_pct = (equity - last_equity) / last_equity if last_equity > 0 else 0.0
    log.info("Account equity: $%.2f | Daily P&L: %.2f%%", equity, daily_pnl_pct * 100)
    if equity <= 0:
        log.error("Zero equity. Exiting.")
        sys.exit(1)

    risk_mgr = RiskManager(equity, daily_pnl_pct)
    exec_engine = ExecutionEngine(fetcher.trading_client, dry_run=args.dry_run)
    breaker = risk_mgr.check_circuit_breaker()

    # --- Check if ML models need retraining --------------------------------
    need_retrain = args.retrain or ml_mgr.needs_retrain()
    if need_retrain:
        log.info("")
        log.info(">>> ML MODEL TRAINING <<<")

    total_exposure = 0.0
    for symbol in SYMBOLS:
        log.info("")
        log.info("-" * 70)
        log.info("ANALYZING: %s", symbol)
        log.info("-" * 70)

        # --- Fetch data ----------------------------------------------------
        daily = fetcher.get_daily_bars(symbol, LOOKBACK_DAYS)
        if len(daily) < EMA_SLOW + 20:
            log.warning(" Insufficient bars (%d). Skipping.", len(daily))
            continue
        bars_5min = fetcher.get_5min_bars(symbol)
        orderbook = fetcher.get_orderbook(symbol)
        quote = fetcher.get_latest_quote(symbol)
        snapshot = fetcher.get_snapshot(symbol)
        recent_trades = fetcher.get_recent_trades(symbol, 60)
        articles = fetcher.get_news(symbol, days=3)
        last_price = float(daily["close"].iloc[-1])
        log.info(" Last price: $%.2f | Bars: %d", last_price, len(daily))

        # --- Build ML features ---------------------------------------------
        features_df = build_feature_matrix(daily)
        n_features = features_df.shape[1]

        # --- Train if needed -----------------------------------------------
        if need_retrain:
            target = build_target(daily)
            # Use most recent ML_TRAIN_DAYS for training
            train_end = len(features_df) - 1  # exclude last row (no future target)
            train_start = max(0, train_end - ML_TRAIN_DAYS)
            train_feat = features_df.iloc[train_start:train_end]
            train_tgt = target.iloc[train_start:train_end]
            log.info(" Training ML models on %d rows...", len(train_feat))
            # Each model trains independently so one failure does not block
            # the other.
            try:
                ml_mgr.train_lstm(symbol, train_feat, train_tgt)
            except Exception as e:
                log.error(" LSTM training failed: %s", e)
            try:
                ml_mgr.train_xgboost(symbol, train_feat, train_tgt)
            except Exception as e:
                log.error(" XGBoost training failed: %s", e)

        # --- Load models if not in memory ----------------------------------
        ml_mgr.load_models(symbol, n_features)

        # --- ML predictions ------------------------------------------------
        lstm_prob = ml_mgr.predict_lstm(symbol, features_df)
        xgb_prob = ml_mgr.predict_xgboost(symbol, features_df)

        # --- Run all 8 factor modules --------------------------------------
        f_tech = factor_technical(daily)
        f_micro = factor_microstructure(orderbook, quote, recent_trades, snapshot)
        f_sent = factor_sentiment(articles)
        f_vol = factor_volume(daily, bars_5min, snapshot)
        f_regime = factor_regime(daily)
        f_anom = factor_anomaly(daily, quote, snapshot)
        f_lstm = factor_ml_lstm(lstm_prob)
        f_xgb = factor_ml_xgboost(xgb_prob)
        factors = [f_tech, f_micro, f_sent, f_vol, f_regime, f_anom, f_lstm, f_xgb]

        # --- Log factor scores ---------------------------------------------
        log.info("")
        log.info(" FACTOR SCORES:")
        for f in factors:
            log.info(" %-16s score=%+.3f conf=%.3f weight=%.2f",
                     f.name, f.score, f.confidence, FACTOR_WEIGHTS.get(f.name, 0))
            if args.verbose:
                for k, v in f.details.items():
                    log.info(" %s: %s", k, v)

        # --- Aggregate -----------------------------------------------------
        agg_score, agg_conf = aggregate_signals(factors)
        log.info("")
        log.info(" AGGREGATE: score=%+.4f confidence=%.4f", agg_score, agg_conf)

        # --- Position sizing -----------------------------------------------
        vol_regime = f_regime.details.get("vol_regime_label", "normal_vol")
        returns = daily["close"].pct_change()
        target_weight = risk_mgr.compute_position_weight(agg_score, agg_conf, vol_regime, returns)
        # Trim the weight so the portfolio-wide cap is never exceeded.
        if total_exposure + target_weight > MAX_PORTFOLIO_EXPOSURE:
            target_weight = max(0, MAX_PORTFOLIO_EXPOSURE - total_exposure)

        if breaker:
            direction, target_weight = "flat", 0.0
        elif agg_score > 0 and agg_conf >= MIN_SIGNAL_CONFIDENCE:
            direction = "long"
        else:
            direction, target_weight = "flat", 0.0

        # Book exposure only once the final weight is known. Adding it before
        # the circuit breaker zeroes the weight made the summary report
        # exposure that was never taken.
        total_exposure += target_weight

        decision = TradeDecision(
            symbol=symbol, direction=direction, conviction=agg_conf,
            target_weight=target_weight, factors=factors,
            anomalies=f_anom.details.get("anomalies", []),
            regime=f"{vol_regime}/{f_regime.details.get('trend_regime_label', 'unknown')}",
        )
        log.info("")
        log.info(" DECISION: %s dir=%s conviction=%.3f weight=%.4f regime=%s",
                 symbol, decision.direction, decision.conviction,
                 decision.target_weight, decision.regime)
        if decision.anomalies:
            log.info(" ANOMALIES: %s", ", ".join(decision.anomalies))

        # --- Execute -------------------------------------------------------
        current_qty = fetcher.get_position(symbol)
        target_qty = (equity * target_weight / last_price) if target_weight > 0 else 0.0
        log.info(" POSITION: current=%.8f target=%.8f (%.2f%% = $%.2f)",
                 current_qty, target_qty, target_weight * 100, target_weight * equity)
        # An execution failure for one symbol must not stop the others.
        try:
            exec_engine.execute(symbol, target_qty, current_qty, quote, last_price)
        except Exception as e:
            log.error(" Execution error: %s", e)

    # --- Save retrain timestamp -------------------------------------------
    if need_retrain:
        ml_mgr._save_train_timestamp()
        log.info(" ML models retrained and saved.")

    log.info("")
    log.info("=" * 70)
    log.info("SUMMARY: exposure=%.2f%% of $%.2f | circuit_breaker=%s",
             total_exposure * 100, equity, "ACTIVE" if breaker else "off")
    log.info("=" * 70)
def main():
    """Command-line entry point: parse flags, then run once or in a loop.

    ``--max-exposure`` overrides the module-level ``MAX_EXPOSURE_PER_ASSET``.
    With ``--continuous`` the trading pass repeats every ``--interval``
    seconds until interrupted, and an error in one iteration is logged
    without stopping the loop. In single-shot mode an unexpected error is
    logged and the process exits with status 1.
    """
    global MAX_EXPOSURE_PER_ASSET

    cli = argparse.ArgumentParser(description="ML-Enhanced Multi-Factor Crypto Trader")
    cli.add_argument("--dry-run", action="store_true", help="Signals only, no orders")
    cli.add_argument("--verbose", action="store_true", help="Detailed factor breakdowns")
    cli.add_argument("--creds", default="alpaca_creds", help="Credential file path")
    cli.add_argument("--max-exposure", type=float, default=None)
    cli.add_argument("--continuous", action="store_true", help="Run in a loop")
    cli.add_argument("--interval", type=int, default=3600, help="Loop interval in seconds")
    cli.add_argument("--retrain", action="store_true", help="Force ML model retrain this run")
    args = cli.parse_args()

    exposure_override = args.max_exposure
    if exposure_override is not None:
        MAX_EXPOSURE_PER_ASSET = exposure_override

    # One manager for the whole process, so it is shared by every iteration
    # in continuous mode.
    ml_mgr = MLModelManager()

    # ---- single-shot mode --------------------------------------------------
    if not args.continuous:
        try:
            run(args, ml_mgr)
        except KeyboardInterrupt:
            log.info("Interrupted.")
        except Exception as e:
            log.exception("Fatal error: %s", e)
            sys.exit(1)
        return

    # ---- continuous mode ---------------------------------------------------
    pause = args.interval
    log.info("Continuous mode (interval=%ds). Ctrl+C to stop.", pause)
    iteration = 0
    while True:
        iteration += 1
        try:
            log.info("\n### ITERATION %d ###", iteration)
            run(args, ml_mgr)
            # The forced retrain applies only until a pass completes.
            args.retrain = False
        except KeyboardInterrupt:
            log.info("Interrupted. Shutting down.")
            break
        except Exception as e:
            log.exception("Error in iteration %d: %s", iteration, e)

        # Ctrl+C while waiting gets its own message, hence the second try.
        try:
            wake_at = datetime.now(timezone.utc) + timedelta(seconds=pause)
            log.info("Next run at %s. Sleeping %ds...", wake_at.strftime("%H:%M:%S UTC"), pause)
            time_module.sleep(pause)
        except KeyboardInterrupt:
            log.info("Interrupted during sleep. Shutting down.")
            break
# Start the CLI only when this file is executed as a script, so importing the
# module has no side effects.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment