Skip to content

Instantly share code, notes, and snippets.

@diegogaulke
Created February 15, 2026 20:01
Show Gist options
  • Select an option

  • Save diegogaulke/8e5459ff86552bae5c28c55d8c1ac301 to your computer and use it in GitHub Desktop.

Select an option

Save diegogaulke/8e5459ff86552bae5c28c55d8c1ac301 to your computer and use it in GitHub Desktop.
HFT crypto bot - bybit exclusive - prototype - loose variables
import os
import time
import sys
import threading
import pandas as pd
import ta
import requests
import csv
import queue
import traceback
import numpy as np
from collections import deque
from decimal import Decimal, ROUND_DOWN, Context, InvalidOperation, localcontext
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
from pybit.unified_trading import HTTP, WebSocket
from dotenv import load_dotenv
try:
from tenacity import retry, stop_after_attempt, wait_fixed
except ImportError:
def retry(*args, **kwargs): return lambda f: f
def stop_after_attempt(*args, **kwargs): return None
def wait_fixed(*args, **kwargs): return None
# ==========================================
# 0. CONFIGURAÇÃO (V98.1 - THE OMNISCIENT HFT)
# ==========================================
load_dotenv()
decimal_ctx = Context(prec=28, rounding=ROUND_DOWN)
class Config:
API_KEY = os.getenv("BYBIT_API_KEY")
API_SECRET = os.getenv("BYBIT_API_SECRET")
TG_TOKEN = os.getenv("TG_TOKEN")
TG_CHAT_ID = os.getenv("TG_CHAT_ID")
TESTNET = False
USE_TELEGRAM = True
ENTRY_INTERVAL = 1
FILTER_INTERVAL = 5
SYMBOL_COOLDOWN = 60
MAX_POSITIONS = 3
MAX_SESSION_LOSS_PCT = 0.10
TOTAL_EQUITY = 0.0
BASE_RISK_PER_TRADE = 0.006
LEVERAGE = 5
MIN_SL_PCT = 0.018
TP_PCT = 0.025
MAX_FUNDING_RATE = 0.0004
EUPHORIA_SL_MULT = 1.30
EUPHORIA_RISK_MULT = 1.25
EUPHORIA_TRAIL_DELAY = 1.50
BTC_EUPHORIA_THRESHOLD = 0.5
TIME_STOP_SEC = 600
EXTENSION_TIME_SEC = 300
MIN_EFFICIENCY = 0.3
CRASH_SIGMA = 1.80
CRASH_R2 = 0.60
RELATIVE_SIGMA_THRESHOLD = -1.5
BTC_DUMP_THRESHOLD = -1.5
ATR_SAFE_MULTIPLIER = 4.0
MIN_MOVE_PCT = 0.002
MIN_VOLATILITY_PCT = 0.0015 # Reduzido para 0.15% por minuto
MAX_SPREAD_PCT = 0.0004
# [AJUSTES HFT CPU & MICROESTRUTURA]
MAX_WORKERS = 20 # Reduzido de 50 para evitar overhead de contexto
OFI_ENTRY_MULTIPLIER = 0.35 # Mais tolerância ao OFI contrário na entrada
OFI_EXIT_MULTIPLIER = 0.30 # Limite do OFI dinâmico para pânico no Trailing (30%)
OFI_TRAIL_TIGHTEN = 0.60 # Aperta o SL em 40% (distância * 0.6) se der pânico
CPU_DEBOUNCE = 0.2
ORDER_DEBOUNCE = 5.0
TRAILING_DEBOUNCE = 1.0
PENDING_ORDER_TTL = 20.0
VOLATILITY_FACTOR = 0.50
MAX_VOL_FACTOR = 5.0
SIDEWAYS_THRESHOLD = 0.012
MAJORS = [
'ETHUSDT', 'SOLUSDT', 'BNBUSDT', 'AVAXUSDT', 'LINKUSDT',
'NEARUSDT', 'MATICUSDT', 'AAVEUSDT', 'UNIUSDT', 'DOTUSDT',
'XRPUSDT', 'ADAUSDT', 'TRXUSDT', 'LTCUSDT', 'BCHUSDT',
'TONUSDT', 'ATOMUSDT'
]
MEMES = [
'1000PEPEUSDT', '1000BONKUSDT', '1000FLOKIUSDT', 'WIFUSDT', 'DOGEUSDT',
'JUPUSDT', 'PYTHUSDT', 'WLDUSDT', 'SUIUSDT', 'FETUSDT',
'SHIB1000USDT', 'ARBUSDT', 'OPUSDT', 'TIAUSDT', 'INJUSDT',
'RNDRUSDT', 'STXUSDT', 'BOMEUSDT', 'ORDIUSDT', 'SEIUSDT',
'APTUSDT', 'FTMUSDT'
]
ACTIVE_WHITELIST = MAJORS + MEMES
CURRENT_REGIME = "INIT"
STRATEGY = {
"RSI_PERIOD": 14,
"RSI_BUY": 40, # Afrouxado para dar mais gatilhos
"RSI_SELL": 60, # Afrouxado para dar mais gatilhos
"BB_PERIOD": 20,
"BB_STD": 2.0,
"EMA_FAST": 50,
"EMA_SLOW": 200
}
BREAKEVEN_TRIGGER = 0.0025 # Reduzido para 0.25% (Arma a defesa mais cedo)
TRAIL_DISTANCE = 0.0020
C_YELLOW = "\033[33m"; C_RED = "\033[31m"; C_GREEN = "\033[32m"; C_CYAN = "\033[36m"; C_RESET = "\033[0m"
def safe_float(val):
try: return float(val) if val else 0.0
except: return 0.0
def safe_decimal(val):
try: return Decimal(str(val))
except (InvalidOperation, TypeError): return Decimal("0")
class AsyncLogManager:
def __init__(self):
self.queue = queue.Queue()
self.running = True
self.thread = threading.Thread(target=self._worker, daemon=True)
self.thread.start()
def _worker(self):
while self.running:
try:
item = self.queue.get(timeout=2)
if item is None: break
fn, row = item
try:
with open(fn, 'a', newline='') as f: csv.writer(f).writerow(row)
except: pass
finally: self.queue.task_done()
except: pass
def log(self, fn, row):
if self.running: self.queue.put((fn, row))
def shutdown(self):
self.running = False; self.queue.put(None); self.thread.join(timeout=3)
log_manager = AsyncLogManager()
class AnalysisLogger:
FILENAME = ""
@classmethod
def init(cls):
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
cls.FILENAME = f"ghost_session_{timestamp}.csv"
if not os.path.exists(cls.FILENAME):
with open(cls.FILENAME, 'w', newline='') as f:
csv.writer(f).writerow(["TIMESTAMP", "EVENT", "SYMBOL", "SIDE", "PRICE", "SIZE_USD", "PNL", "EXIT_REASON", "METRICS", "FUNDING"])
@classmethod
def log_entry(cls, sym, side, px, size, rsi, metrics, funding=0.0):
log_manager.log(cls.FILENAME, [datetime.now(), "ENTRY", sym, side, px, size, 0.0, "", f"RSI:{rsi:.1f} | {metrics}", f"{funding:.6f}"])
@classmethod
def log_exit(cls, sym, pnl, reason):
log_manager.log(cls.FILENAME, [datetime.now(), "EXIT", sym, "", "", "", f"{pnl:.4f}", reason, "", ""])
class BotLogger:
@staticmethod
def log(msg, color=""):
print(f"\n{Config.C_YELLOW}[{datetime.now().strftime('%H:%M:%S')}]{Config.C_RESET} {color}{msg}{Config.C_RESET}")
sys.stdout.flush()
class TelegramBot:
_queue = queue.Queue()
_thread = None
@classmethod
def _worker(cls):
while True:
msg = cls._queue.get()
if msg is None: break
try:
requests.post(f"https://api.telegram.org/bot{Config.TG_TOKEN}/sendMessage", json={"chat_id": Config.TG_CHAT_ID, "text": msg, "parse_mode": "HTML"}, timeout=5)
except: pass
time.sleep(0.5)
@classmethod
def send(cls, msg):
if not Config.USE_TELEGRAM: return
if cls._thread is None:
cls._thread = threading.Thread(target=cls._worker, daemon=True)
cls._thread.start()
cls._queue.put(msg)
class PerformanceEngine:
def __init__(self):
self.total_trades = 0; self.wins = 0; self.losses = 0; self.gross_profit = 0.0; self.gross_loss = 0.0
def update_trade(self, pnl):
self.total_trades += 1
if pnl > 0: self.wins += 1; self.gross_profit += pnl
else: self.losses += 1; self.gross_loss += abs(pnl)
def get_stats_str(self):
win_rate = (self.wins / self.total_trades * 100) if self.total_trades > 0 else 0.0
profit_factor = (self.gross_profit / self.gross_loss) if self.gross_loss > 0 else 99.9
return f"WR: {win_rate:.1f}% | PF: {profit_factor:.2f} | Trades: {self.total_trades}"
perf_engine = PerformanceEngine()
class BotState:
def __init__(self):
self.positions = {}
self.pending_orders = {}
self.klines = {}
self.df_cache = {}
self.filters = {}
self.last_exits = {}
self.session_pnl = 0.0
self.processed_pnl_ids = deque(maxlen=1000)
self.last_entry_check = {}
self.last_symbol_entry = {}
self.active_analysis = set()
self.last_trailing_api = {}
self.trailing_guards = {}
self.predictor_memory = {}
self.exit_reasons = {}
self.btc_metrics = {'slope_std': 0.0, 'accel': 0.0}
self.lock = threading.RLock()
self.last_block_msg = {}
# [OFI HFT STATE]
self.ob_state = {}
self.ofi_history = {}
self.ofi_sum = {}
self.liquidity_proxy = {} # Proxy para Normalização
# [CPU OPTIMIZATION]
self.latest_ta = {}
def update_orderbook_and_calc_ofi(self, symbol, bid_price, bid_size, ask_price, ask_size):
with self.lock:
WINDOW_SIZE = 50
# Atualiza o proxy de liquidez (Média do tamanho no topo do book)
self.liquidity_proxy[symbol] = (bid_size + ask_size) / 2.0
if symbol not in self.ob_state:
self.ob_state[symbol] = {'bp': bid_price, 'bs': bid_size, 'ap': ask_price, 'as': ask_size}
self.ofi_history[symbol] = deque(maxlen=WINDOW_SIZE)
self.ofi_sum[symbol] = 0.0
return 0.0
prev = self.ob_state[symbol]
current_tick_ofi = 0.0
# Pressão de Compra
if bid_price > prev['bp']: current_tick_ofi += bid_size
elif bid_price == prev['bp']: current_tick_ofi += (bid_size - prev['bs'])
else: current_tick_ofi -= prev['bs']
# Pressão de Venda
if ask_price < prev['ap']: current_tick_ofi -= ask_size
elif ask_price == prev['ap']: current_tick_ofi -= (ask_size - prev['as'])
else: current_tick_ofi += prev['as']
self.ob_state[symbol] = {'bp': bid_price, 'bs': bid_size, 'ap': ask_price, 'as': ask_size}
# MATEMÁTICA HFT O(1): Subtrai o valor mais velho antes de adicionar o novo
if len(self.ofi_history[symbol]) == WINDOW_SIZE:
self.ofi_sum[symbol] -= self.ofi_history[symbol][0]
self.ofi_history[symbol].append(current_tick_ofi)
self.ofi_sum[symbol] += current_tick_ofi
return self.ofi_sum[symbol]
def get_slot_count(self):
with self.lock:
real_positions = {s for s, p in self.positions.items() if safe_float(p.get("size", 0)) > 0}
return len(real_positions) + len(self.pending_orders)
def reserve_slot(self, symbol):
with self.lock:
if self.get_slot_count() >= Config.MAX_POSITIONS: return False
if symbol in self.positions or symbol in self.pending_orders: return False
pos = self.positions.get(symbol)
if pos and safe_float(pos.get("size", 0)) > 0: return False
self.pending_orders[symbol] = time.time()
return True
def release_slot(self, symbol):
with self.lock: self.pending_orders.pop(symbol, None)
def force_release(self, symbol):
with self.lock:
if symbol in self.positions:
self.last_exits[symbol] = time.time()
self.positions.pop(symbol, None)
self.pending_orders.pop(symbol, None)
self.trailing_guards.pop(symbol, None)
self.predictor_memory.pop(symbol, None)
def cleanup_expired_slots(self):
now = time.time()
expired = []
with self.lock:
for symbol, ts in list(self.pending_orders.items()):
pos = self.positions.get(symbol)
if pos and safe_float(pos.get("size", 0)) > 0: continue
if now - ts > Config.PENDING_ORDER_TTL: expired.append(symbol)
for symbol in expired: self.pending_orders.pop(symbol, None)
if expired: exc.sync_initial_state()
def update_position(self, symbol, data):
if not symbol: return
with self.lock:
if symbol in self.positions:
current_data = self.positions[symbol]
for k, v in data.items():
current_data[k] = v
else:
current_data = data.copy()
size = safe_float(current_data.get('size', 0))
if size > 0:
self.positions[symbol] = current_data
self.pending_orders.pop(symbol, None)
else:
if symbol in self.positions: self.last_exits[symbol] = time.time()
self.positions.pop(symbol, None)
self.pending_orders.pop(symbol, None)
self.trailing_guards.pop(symbol, None)
def get_trailing_guard(self, symbol):
with self.lock: return self.trailing_guards.get(symbol, Decimal(0))
def update_trailing_guard(self, symbol, price):
with self.lock: self.trailing_guards[symbol] = price
def update_kline(self, symbol, candle, is_confirmed):
with self.lock:
if symbol not in self.klines: self.klines[symbol] = deque(maxlen=205)
dq = self.klines[symbol]
if len(dq) > 0 and dq[-1]['ts'] == candle['ts']: dq[-1] = candle
else: dq.append(candle)
def get_df_cached(self, symbol):
with self.lock:
if symbol not in self.klines or len(self.klines[symbol]) < 35: return pd.DataFrame()
last_ts = self.klines[symbol][-1]['ts']
last_close = self.klines[symbol][-1]['c']
cached = self.df_cache.get(symbol)
if cached and cached[0] == last_ts and cached[1] == last_close:
return cached[2].copy()
df = pd.DataFrame(list(self.klines[symbol]))
df.sort_values(by='ts', inplace=True)
self.df_cache[symbol] = (last_ts, last_close, df)
return df.copy()
def can_analyze(self, symbol):
now = time.time()
with self.lock:
if symbol in self.active_analysis:
if (now - self.last_entry_check.get(symbol, 0)) > 5.0: self.active_analysis.discard(symbol)
else: return False
if (now - self.last_exits.get(symbol, 0)) < Config.SYMBOL_COOLDOWN: return False
if (now - self.last_symbol_entry.get(symbol, 0)) < Config.ORDER_DEBOUNCE: return False
if (now - self.last_entry_check.get(symbol, 0)) < Config.CPU_DEBOUNCE: return False
self.active_analysis.add(symbol)
self.last_entry_check[symbol] = now
return True
def finish_analysis(self, symbol):
with self.lock: self.active_analysis.discard(symbol)
def can_update_trailing(self, symbol):
now = time.time()
with self.lock: return (now - self.last_trailing_api.get(symbol, 0)) >= Config.TRAILING_DEBOUNCE
def mark_trailing_update(self, symbol):
with self.lock: self.last_trailing_api[symbol] = time.time()
def update_closed_pnl(self, order_id, amount):
with self.lock:
if order_id in self.processed_pnl_ids: return False
self.processed_pnl_ids.append(order_id)
self.session_pnl += amount
return True
def get_session_pnl(self):
with self.lock: return self.session_pnl
def update_btc_metrics(self, slope_std, accel):
with self.lock: self.btc_metrics = {'slope_std': slope_std, 'accel': accel}
def get_btc_metrics(self):
with self.lock: return self.btc_metrics
def log_block_smart(self, symbol, regime, rsi, signal, reason_code, reason_msg):
now = time.time()
should_log = False
with self.lock:
if symbol not in self.last_block_msg:
self.last_block_msg[symbol] = {}
if now - self.last_block_msg[symbol].get(reason_code, 0) > 300:
self.last_block_msg[symbol][reason_code] = now
should_log = True
if should_log:
BotLogger.log(f"👀 [TRACER] {symbol} ({regime}|RSI:{rsi:.1f}) {signal} ➡️ ⛔ BLOQUEADO: {reason_msg}", Config.C_YELLOW)
state = BotState()
class Exchange:
def __init__(self):
self.session = HTTP(testnet=Config.TESTNET, api_key=Config.API_KEY, api_secret=Config.API_SECRET, timeout=5)
self._load_filters()
def _load_filters(self):
try:
cursor = ""
while True:
r = self.session.get_instruments_info(category="linear", limit=1000, cursor=cursor)
for i in r['result']['list']:
if i['symbol'].endswith('USDT'):
state.filters[i['symbol']] = {'qty_step': i['lotSizeFilter']['qtyStep'], 'tick': i['priceFilter']['tickSize'], 'min_qty': i['lotSizeFilter']['minOrderQty']}
cursor = r['result'].get('nextPageCursor', '')
if not cursor: break
time.sleep(0.1)
except: pass
def sync_initial_state(self):
try:
r = self.session.get_wallet_balance(accountType="UNIFIED", coin="USDT")
Config.TOTAL_EQUITY = safe_float(r['result']['list'][0]['totalEquity'])
r = self.session.get_positions(category="linear", settleCoin="USDT")
for p in r['result']['list']:
if safe_float(p['size']) > 0: state.update_position(p['symbol'], p)
except: pass
def get_ticker_info(self, symbol):
try:
r = self.session.get_tickers(category="linear", symbol=symbol)
info = r['result']['list'][0]
funding = safe_float(info.get('fundingRate'))
bid = safe_float(info.get('bid1Price'))
ask = safe_float(info.get('ask1Price'))
return funding, bid, ask
except Exception as e:
return 0.0, 0.0, 0.0
def check_closed_pnl(self, is_boot=False):
try:
r = self.session.get_closed_pnl(category="linear", limit=20)
if 'result' in r and 'list' in r['result']:
for item in r['result']['list']:
order_id = item['orderId']
pnl = safe_float(item['closedPnl'])
if is_boot:
state.processed_pnl_ids.append(order_id)
continue
if state.update_closed_pnl(order_id, pnl):
sym = item['symbol']
perf_engine.update_trade(pnl)
reason = "EXCHANGE_SL_TP"
with state.lock:
if sym in state.exit_reasons: reason = state.exit_reasons.pop(sym)
state.force_release(sym)
color = Config.C_GREEN if pnl >= 0 else Config.C_RED
BotLogger.log(f"💸 CLOSED {sym}: ${pnl:.2f} | Motivo: {reason}", color)
TelegramBot.send(f"{'🟢' if pnl >= 0 else '🔴'} <b>EXIT: {sym}</b>\nPnL: ${pnl:.2f}\nMotivo: {reason}")
AnalysisLogger.log_exit(sym, pnl, reason)
except: pass
def set_leverage(self):
lev = str(Config.LEVERAGE)
valid_coins = [s for s in set(Config.ACTIVE_WHITELIST) if s in state.filters]
for sym in valid_coins:
try: self.session.set_leverage(category="linear", symbol=sym, buy_leverage=lev, sell_leverage=lev)
except: pass
time.sleep(0.2)
def place_market_order(self, symbol, side, qty, sl, tp):
for attempt in range(3):
try:
r = self.session.place_order(category="linear", symbol=symbol, side=side, orderType="Market", qty=qty, stopLoss=sl, takeProfit=tp, positionIdx=0)
if r.get('retCode') == 0 or attempt == 2: return r
time.sleep(1)
except Exception as e:
if attempt == 2: return {'retCode': -1, 'retMsg': str(e)}
time.sleep(1)
def place_sniper_limit_order(self, symbol, side, qty, sl, tp, bid, ask):
"""
Execução HFT: Ordem Limit Agressiva com Time-In-Force = IOC.
Protege o capital de slippages infinitos durante quedas de liquidez.
"""
f = state.filters.get(symbol, {})
tick_size = safe_float(f.get('tick', '0.0001'))
# Cruza o spread de forma controlada (Margem de 2 ticks)
if side == "Buy":
limit_price = ask + (tick_size * 2)
else:
limit_price = bid - (tick_size * 2)
limit_price_str = self.calculate_precision(symbol, limit_price)
for attempt in range(3):
try:
r = self.session.place_order(
category="linear",
symbol=symbol,
side=side,
orderType="Limit",
qty=qty,
price=limit_price_str,
timeInForce="IOC",
stopLoss=sl,
takeProfit=tp,
positionIdx=0
)
if r.get('retCode') == 0 or attempt == 2:
return r
time.sleep(0.1) # Retries ultrarrápidos para o HFT Sniper
except Exception as e:
if attempt == 2:
return {'retCode': -1, 'retMsg': str(e)}
time.sleep(0.1)
def set_trading_stop_retry(self, symbol, side, sl_str):
for attempt in range(3):
try:
r = self.session.set_trading_stop(category="linear", symbol=symbol, side=side, stopLoss=sl_str, closeOnTrigger=False, positionIdx=0)
if r.get('retCode') == 0 or attempt == 2: return r
time.sleep(1)
except Exception as e:
if attempt == 2: return {'retCode': -1, 'retMsg': str(e)}
time.sleep(1)
def calculate_precision(self, symbol, val, is_qty=False):
f = state.filters.get(symbol)
if not f: return "{:f}".format(Decimal(str(val)))
step = Decimal(f['qty_step'] if is_qty else f['tick'])
try:
d_val = Decimal(str(val))
return "{:f}".format(d_val.quantize(step, rounding=ROUND_DOWN))
except: return "{:f}".format(Decimal(str(val)))
exc = Exchange()
class LogicEngine:
entry_executor = ThreadPoolExecutor(max_workers=Config.MAX_WORKERS)
@staticmethod
def calculate_robust_metrics(prices, symbol):
try:
full_y = np.array(prices); n = len(full_y)
if n < 15: return 0.0, 0.0, 0.0, 0.0
recent_mean = np.mean(full_y[-15:])
volatility = np.std(full_y[-15:]) / recent_mean if recent_mean > 0 else 0.0
window_size = 10 if volatility > 0.008 else 20
if n < window_size: window_size = n
y = full_y[-window_size:]; x = np.arange(len(y))
weights = np.linspace(1, 2, len(y))
slope, intercept = np.polyfit(x, y, 1, w=weights)
y_pred = slope * x + intercept
res = y - y_pred; ss_res = np.sum(res**2); ss_tot = np.sum((y - np.mean(y))**2)
r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0.0
if (np.std(y) / y[-1]) < 0.001: r_squared *= 0.5
if abs((y[-1] - y[0]) / y[0]) < Config.MIN_MOVE_PCT: r_squared = min(r_squared, 0.55)
std_dev = np.std(y); slope_std = slope / std_dev if std_dev > 0 else 0.0
half = len(y) // 2
s_new, _ = np.polyfit(x[half:], y[half:], 1); s_old, _ = np.polyfit(x[:half], y[:half], 1)
accel = (s_new - s_old) / y[-1]
eff = abs(y[-1] - y[0]) / np.sum(np.abs(np.diff(y))) if np.sum(np.abs(np.diff(y))) > 0 else 0.0
with state.lock:
mem = state.predictor_memory.get(symbol, {'slope': 0.0, 'accel': 0.0})
slope_std = (slope_std * 0.7) + (mem['slope'] * 0.3)
accel = (accel * 0.7) + (mem['accel'] * 0.3)
state.predictor_memory[symbol] = {'slope': slope_std, 'accel': accel}
return slope_std, r_squared, accel, eff
except: return 0.0, 0.0, 0.0, 0.0
@staticmethod
def process_tick(symbol, current_price, is_confirmed):
if symbol == "BTCUSDT":
now = time.time()
if now - state.last_entry_check.get("BTCUSDT", 0) > 5.0:
state.last_entry_check["BTCUSDT"] = now
df_btc = state.get_df_cached("BTCUSDT")
if not df_btc.empty and len(df_btc) > 20:
s, _, a, _ = LogicEngine.calculate_robust_metrics(df_btc['c'].tail(30).values, "BTCUSDT")
state.update_btc_metrics(slope_std=s, accel=a)
return
pos_data = None
with state.lock: pos_data = state.positions.get(symbol)
if pos_data:
LogicEngine._check_trailing_stop(symbol, current_price, pos_data, is_confirmed)
else:
if state.can_analyze(symbol):
LogicEngine.entry_executor.submit(LogicEngine._analyze_entry_wrapper, symbol)
@staticmethod
def _check_trailing_stop(symbol, cp, p, is_confirmed):
try:
pos = state.positions.get(symbol)
if not pos or safe_float(pos.get("size", 0)) <= 0: return
entry = safe_float(p.get('avgPrice'))
current_sl = safe_float(p.get('stopLoss', 0))
if entry == 0 or cp <= 0: return
side = p.get('side', '').capitalize()
side_f = 1 if side == "Buy" else -1
with localcontext(decimal_ctx):
d_cp = safe_decimal(cp)
d_entry = safe_decimal(entry)
d_csl = safe_decimal(current_sl)
roi = float((d_cp - d_entry) / d_entry) * side_f
df = state.get_df_cached(symbol)
atr_pct = Decimal("0.01")
slope_std = 0.0; r_sq = 0.0; accel = 0.0; eff = 0.0
if not df.empty and len(df) > 15:
slope_std, r_sq, accel, eff = LogicEngine.calculate_robust_metrics(df['c'].tail(30).values, symbol)
if side == 'Sell':
slope_std = -slope_std
accel = -accel
atr_val = ta.volatility.average_true_range(df['h'].tail(30), df['l'].tail(30), df['c'].tail(30), window=14).iloc[-1]
if not pd.isna(atr_val) and atr_val > 0:
atr_pct = Decimal(str(atr_val / float(cp)))
last_entry_time = state.last_symbol_entry.get(symbol, 0)
if last_entry_time > 0:
time_in_trade = time.time() - last_entry_time
# O Slope_std já foi invertido acima se for Sell.
# Se for > 0.1, significa que o momentum está ganhando força na direção do nosso trade.
momentum_a_favor = slope_std > 0.1
# Limite Absoluto (15 minutos = 900s)
if time_in_trade > (Config.TIME_STOP_SEC + Config.EXTENSION_TIME_SEC):
if roi < Config.BREAKEVEN_TRIGGER:
r = exc.place_market_order(symbol, "Sell" if side == "Buy" else "Buy", str(p['size']), None, None)
if r and r.get('retCode') == 0:
with state.lock: state.exit_reasons[symbol] = "TIME_LIMIT_HARD"
state.force_release(symbol)
BotLogger.log(f"⏰ LIMITE ABSOLUTO (15m) ESGOTADO EM {symbol}. Fechando.", Config.C_YELLOW)
return
# Time Stop Normal (10 minutos = 600s)
elif time_in_trade > Config.TIME_STOP_SEC:
if roi < Config.BREAKEVEN_TRIGGER:
if momentum_a_favor:
# O robô vê a regressão linear apontando pro lado certo e poupa a vida do trade!
pass
else:
r = exc.place_market_order(symbol, "Sell" if side == "Buy" else "Buy", str(p['size']), None, None)
if r and r.get('retCode') == 0:
with state.lock: state.exit_reasons[symbol] = "TIME_STOP"
state.force_release(symbol)
BotLogger.log(f"⏱️ TIME STOP (10m) EM {symbol}: Sem momentum para ajudar. Fechando.", Config.C_YELLOW)
return
btc_stats = state.get_btc_metrics()
is_euphoria = btc_stats['slope_std'] > Config.BTC_EUPHORIA_THRESHOLD
trigger = Config.BREAKEVEN_TRIGGER
if is_euphoria:
trigger *= Config.EUPHORIA_TRAIL_DELAY
if roi < trigger: return
d_guard = state.get_trailing_guard(symbol)
if side == 'Buy' and d_csl > d_guard: d_guard = d_csl
if side == 'Sell' and d_csl > 0 and (d_guard == 0 or d_csl < d_guard): d_guard = d_csl
base_trail = Decimal(str(Config.TRAIL_DISTANCE))
trail_dist = max(base_trail, atr_pct * Decimal("0.4"))
if is_euphoria:
trail_dist = max(trail_dist, Decimal("0.0035"))
# =================================================================
# [NOVO] Dynamic Exit (OFI Trailing Tightener)
# Aperta o SL preditivamente se o fluxo institucional for forte contra a posição
# =================================================================
smoothed_ofi = 0.0
liq_proxy = 0.0
with state.lock:
if hasattr(state, 'ofi_sum') and symbol in state.ofi_sum:
smoothed_ofi = state.ofi_sum[symbol]
liq_proxy = state.liquidity_proxy.get(symbol, 0.0)
if liq_proxy > 0:
ofi_exit_thresh = liq_proxy * Config.OFI_EXIT_MULTIPLIER
if side == 'Buy' and smoothed_ofi < -ofi_exit_thresh:
trail_dist *= Decimal(str(Config.OFI_TRAIL_TIGHTEN))
elif side == 'Sell' and smoothed_ofi > ofi_exit_thresh:
trail_dist *= Decimal(str(Config.OFI_TRAIL_TIGHTEN))
# =================================================================
min_lock = max(Decimal("0.0010"), atr_pct * Decimal("0.3"))
if side == 'Buy':
best_new_sl = d_cp * (1 - trail_dist)
min_breakeven = d_entry * (1 + min_lock)
if best_new_sl < min_breakeven:
best_new_sl = min_breakeven
else:
best_new_sl = d_cp * (1 + trail_dist)
min_breakeven = d_entry * (1 - min_lock)
if best_new_sl > min_breakeven:
best_new_sl = min_breakeven
if (side == 'Buy' and d_guard > 0 and best_new_sl <= d_guard) or (side == 'Sell' and d_guard > 0 and best_new_sl >= d_guard): return
safety_margin = d_cp * Decimal("0.0005")
limit_price = d_cp - safety_margin if side == 'Buy' else d_cp + safety_margin
if (side == 'Buy' and best_new_sl >= limit_price) or (side == 'Sell' and best_new_sl <= limit_price):
best_new_sl = limit_price
sl_str = exc.calculate_precision(symbol, best_new_sl)
d_new_sl = safe_decimal(sl_str)
tick_size = safe_decimal(state.filters.get(symbol, {}).get('tick', '0.0001'))
ref_sl = d_guard if d_guard > 0 else d_csl
if ref_sl != 0 and abs(d_new_sl - ref_sl) < (tick_size * Decimal('2')): return
if not state.can_update_trailing(symbol): return
BotLogger.log(f"🧲 TRAIL MAGNÉTICO {symbol}: Lucro {roi*100:.2f}% → Enviando Guard: {sl_str}", Config.C_CYAN)
r = exc.set_trading_stop_retry(symbol, side, sl_str)
if r and r.get('retCode') == 0:
state.mark_trailing_update(symbol)
state.update_trailing_guard(symbol, d_new_sl)
BotLogger.log(f"✅ TRAIL CONFIRMADO {symbol}: API Bybit aceitou o Stop em {sl_str}", Config.C_GREEN)
else:
msg_err = r.get('retMsg') if r else 'Falha Desconhecida'
BotLogger.log(f"⚠️ API REJEITOU Trail de {symbol}: {msg_err}", Config.C_RED)
except Exception as e:
BotLogger.log(f"💥 ERRO GRAVE NO TRAILING STOP ({symbol}): {e}", Config.C_RED)
@staticmethod
def _analyze_entry_wrapper(symbol):
try: LogicEngine._analyze_entry(symbol)
except Exception as e:
BotLogger.log(f"💥 ERRO CRÍTICO NO CÁLCULO ({symbol}): {e}", Config.C_RED)
traceback.print_exc()
finally: state.finish_analysis(symbol)
@staticmethod
def _analyze_entry(symbol):
if state.get_slot_count() >= Config.MAX_POSITIONS: return
df = state.get_df_cached(symbol)
if df.empty or len(df) < 205: return
df_tail = df.tail(200).copy()
current_price = df_tail['c'].iloc[-1]
rsi = ta.momentum.rsi(df_tail['c'], window=Config.STRATEGY["RSI_PERIOD"]).iloc[-1]
if pd.isna(rsi): return
adx_ind = ta.trend.ADXIndicator(high=df_tail['h'], low=df_tail['l'], close=df_tail['c'], window=14, fillna=False)
adx = adx_ind.adx().iloc[-1]
if pd.isna(adx): return
ema_fast = ta.trend.ema_indicator(df_tail['c'], window=Config.STRATEGY["EMA_FAST"]).iloc[-1]
ema_slow = ta.trend.ema_indicator(df_tail['c'], window=Config.STRATEGY["EMA_SLOW"]).iloc[-1]
regime = "SIDEWAYS"
if adx > 25:
if ema_fast > ema_slow: regime = "BULL"
elif ema_fast < ema_slow: regime = "BEAR"
# Salva a Análise Técnica no cache para o Radar imprimir sem gastar CPU
with state.lock:
state.latest_ta[symbol] = {'rsi': rsi, 'regime': regime}
is_interesting = False
if regime == "SIDEWAYS" and (rsi <= Config.STRATEGY["RSI_BUY"] or rsi >= Config.STRATEGY["RSI_SELL"]):
is_interesting = True
elif regime == "BULL" and rsi <= 40:
is_interesting = True
elif regime == "BEAR" and rsi >= 60:
is_interesting = True
if not is_interesting: return
bb = ta.volatility.BollingerBands(close=df_tail['c'], window=Config.STRATEGY["BB_PERIOD"], window_dev=Config.STRATEGY["BB_STD"])
bb_lower = bb.bollinger_lband().iloc[-1]
bb_upper = bb.bollinger_hband().iloc[-1]
bb_mid = bb.bollinger_mavg().iloc[-1]
atr_val = ta.volatility.average_true_range(df_tail['h'], df_tail['l'], df_tail['c'], window=14).iloc[-1]
if pd.isna(atr_val) or atr_val == 0:
atr_val = current_price * 0.01
signal = None
tp_price_calculated = None
if regime == "SIDEWAYS":
if adx < 15:
state.log_block_smart(symbol, regime, rsi, "N/A", "LOW_ADX", f"ADX muito baixo ({adx:.1f} - Pântano)")
return
if current_price < bb_lower and rsi <= Config.STRATEGY["RSI_BUY"]:
signal = "Buy"
tp_price_calculated = bb_mid
elif current_price > bb_upper and rsi >= Config.STRATEGY["RSI_SELL"]:
signal = "Sell"
tp_price_calculated = bb_mid
elif regime == "BULL":
if current_price < ema_fast and rsi <= 42:
signal = "Buy"
tp_price_calculated = current_price * (1 + Config.TP_PCT)
elif regime == "BEAR":
if current_price > ema_fast and rsi >= 58:
signal = "Sell"
tp_price_calculated = current_price * (1 - Config.TP_PCT)
if not signal: return
if regime == "BULL" and signal == "Sell":
state.log_block_smart(symbol, regime, rsi, signal, "CONTRA_BULL", "Proibido Short em Bull")
return
if regime == "BEAR" and signal == "Buy":
state.log_block_smart(symbol, regime, rsi, signal, "CONTRA_BEAR", "Proibido Long em Bear")
return
if regime == "SIDEWAYS":
roi_to_tp = abs(tp_price_calculated - current_price) / current_price
if roi_to_tp < 0.007:
tp_price_calculated = current_price * (1 + 0.007) if signal == "Buy" else current_price * (1 - 0.007)
if (atr_val / current_price) < Config.MIN_VOLATILITY_PCT:
state.log_block_smart(symbol, regime, rsi, signal, "DEAD_MARKET", f"Mercado Morto (ATR < {Config.MIN_VOLATILITY_PCT*100:.2f}%)")
return
if not state.reserve_slot(symbol): return
funding, bid, ask = exc.get_ticker_info(symbol)
if bid > 0 and ask > 0:
mid = (ask + bid) / 2
spread_pct = (ask - bid) / mid
if spread_pct > Config.MAX_SPREAD_PCT:
state.log_block_smart(symbol, regime, rsi, signal, "HIGH_SPREAD", f"Spread alto demais ({spread_pct*100:.3f}%)")
state.release_slot(symbol)
return
if funding > Config.MAX_FUNDING_RATE:
state.log_block_smart(symbol, regime, rsi, signal, "HIGH_FUNDING", f"Funding Positivo Alto ({funding:.6f})")
state.release_slot(symbol)
return
# =================================================================
# [NOVO] Filtro HFT de Microestrutura Dinâmico: Threshold Normalizado
# =================================================================
smoothed_ofi = 0.0
liq_proxy = 0.0
with state.lock:
if hasattr(state, 'ofi_sum') and symbol in state.ofi_sum:
smoothed_ofi = state.ofi_sum[symbol]
liq_proxy = state.liquidity_proxy.get(symbol, 0.0)
# Bloqueia apenas se o fluxo contra for relevante (ex: > 35% do book topo)
if liq_proxy > 0:
ofi_threshold = liq_proxy * Config.OFI_ENTRY_MULTIPLIER
if signal == "Buy" and smoothed_ofi < -ofi_threshold:
state.log_block_smart(symbol, regime, rsi, signal, "OFI_NEGATIVO", f"OFI Contra (OFI: {smoothed_ofi:.2f} < Thresh: {-ofi_threshold:.2f})")
state.release_slot(symbol)
return
if signal == "Sell" and smoothed_ofi > ofi_threshold:
state.log_block_smart(symbol, regime, rsi, signal, "OFI_POSITIVO", f"OFI Contra (OFI: {smoothed_ofi:.2f} > Thresh: {ofi_threshold:.2f})")
state.release_slot(symbol)
return
# =================================================================
btc_stats = state.get_btc_metrics()
is_euphoria = btc_stats['slope_std'] > Config.BTC_EUPHORIA_THRESHOLD
with localcontext(decimal_ctx):
sl_dynamic = (atr_val * 1.5) / current_price
sl_pct = max(Config.MIN_SL_PCT, sl_dynamic)
adj_risk = Config.BASE_RISK_PER_TRADE
if is_euphoria:
adj_risk *= Config.EUPHORIA_RISK_MULT
size = (Config.TOTAL_EQUITY * adj_risk) / sl_pct
size = min(size, Config.TOTAL_EQUITY * Config.LEVERAGE * 0.95)
raw_qty = safe_decimal(size) / safe_decimal(current_price)
qty = exc.calculate_precision(symbol, raw_qty, is_qty=True)
if qty == "0" or float(qty) * current_price < 5.0:
state.log_block_smart(symbol, regime, rsi, signal, "MIN_ORDER", f"Ordem Menor que mínimo Bybit (SL: {sl_pct*100:.2f}%)")
state.release_slot(symbol); return
sl_dist = safe_decimal(current_price) * safe_decimal(sl_pct)
raw_sl = safe_decimal(current_price) - sl_dist if signal == "Buy" else safe_decimal(current_price) + sl_dist
sl = exc.calculate_precision(symbol, raw_sl, is_qty=False)
tp = exc.calculate_precision(symbol, tp_price_calculated, is_qty=False)
r = exc.place_sniper_limit_order(symbol, signal, qty, sl, tp, bid, ask)
if r['retCode'] == 0:
state.last_symbol_entry[symbol] = time.time()
TelegramBot.send(f"🚀 <b>ENTRY: {symbol}</b> ({signal})\nRegime: {regime}\nFunding: {funding:.6f}")
BotLogger.log(f"✅ SUCESSO! ORDEM ABERTA: {symbol} | Regime: {regime} | Funding: {funding:.6f}", Config.C_GREEN)
AnalysisLogger.log_entry(symbol, signal, current_price, float(qty)*current_price, rsi, f"Reg:{regime}|ATR:{atr_val/current_price:.4f}", funding)
else:
BotLogger.log(f"❌ ERRO API BYBIT ({symbol}): {r.get('retMsg')}", Config.C_RED)
state.release_slot(symbol)
class WebSocketManager:
def __init__(self):
self.ws_public = WebSocket(testnet=Config.TESTNET, channel_type="linear")
self.ws_private = WebSocket(testnet=Config.TESTNET, channel_type="private", api_key=Config.API_KEY, api_secret=Config.API_SECRET)
self.msg_queue = queue.Queue(maxsize=5000); self.running = True; self.last_msg_time = time.time()
self.consumer_thread = threading.Thread(target=self._consumer_worker, daemon=True); self.consumer_thread.start()
def start(self):
self.ws_private.position_stream(callback=self.on_private_msg)
valid_coins = [s for s in set(Config.ACTIVE_WHITELIST + ["BTCUSDT"]) if s in state.filters]
for i in range(0, len(valid_coins), 10):
for s in valid_coins[i:i+10]:
try:
self.ws_public.kline_stream(interval=Config.ENTRY_INTERVAL, symbol=s, callback=self.on_kline_msg)
self.ws_public.orderbook_stream(depth=1, symbol=s, callback=self.on_orderbook_msg)
except: pass
time.sleep(0.1)
def on_private_msg(self, msg):
if msg.get('data'):
for p in msg['data']: state.update_position(p.get('symbol'), p)
def on_kline_msg(self, msg):
try:
self.msg_queue.put_nowait(msg)
self.last_msg_time = time.time()
except: pass
def on_orderbook_msg(self, msg):
try:
self.msg_queue.put_nowait(msg)
self.last_msg_time = time.time()
except: pass
def _consumer_worker(self):
while self.running:
try:
msg = self.msg_queue.get(timeout=1)
topic = msg.get('topic', '')
if 'kline' in topic:
sym = topic.split('.')[-1]; d = msg.get('data', [])[0]
c = {'ts': int(d['start']), 'c': float(d['close']), 'h': float(d['high']), 'l': float(d['low']), 'o': float(d['open'])}
is_conf = d.get('confirm', False)
state.update_kline(sym, c, is_conf)
LogicEngine.process_tick(sym, c['c'], is_conf)
elif 'orderbook' in topic:
sym = topic.split('.')[-1]
d = msg.get('data', {})
bids = d.get('b', [])
asks = d.get('a', [])
if bids and asks:
bp, bs = float(bids[0][0]), float(bids[0][1])
ap, ask_s = float(asks[0][0]), float(asks[0][1])
state.update_orderbook_and_calc_ofi(sym, bp, bs, ap, ask_s)
except: pass
def graceful_shutdown():
BotLogger.log("🛑 Shutdown...", Config.C_RED); log_manager.shutdown(); sys.exit(0)
def main():
AnalysisLogger.init(); exc.set_leverage(); exc.sync_initial_state()
TelegramBot.send(f"🦅 <b>GHOST V98.1 HFT THE OMNISCIENT - ONLINE</b>\n💰 Saldo Inicial: ${Config.TOTAL_EQUITY:.2f}")
BotLogger.log("🦅 GHOST V98.1 HFT THE OMNISCIENT - ONLINE", Config.C_GREEN)
time.sleep(1.0)
BotLogger.log("📥 Baixando histórico da Bybit (Pode demorar uns 20 seg)...", Config.C_CYAN)
load_list = [s for s in list(set(Config.ACTIVE_WHITELIST + ["BTCUSDT"])) if s in state.filters]
for sym in load_list:
sucesso = False
tentativas = 0
while not sucesso and tentativas < 3:
try:
r = exc.session.get_kline(category="linear", symbol=sym, interval=Config.ENTRY_INTERVAL, limit=200)
if r['retCode'] == 0:
for k in reversed(r['result']['list']):
state.update_kline(sym, {
'ts': int(k[0]),
'o': float(k[1]),
'h': float(k[2]),
'l': float(k[3]),
'c': float(k[4])
}, True)
sucesso = True
else:
time.sleep(1)
except:
time.sleep(1)
tentativas += 1
time.sleep(0.2)
df_teste = state.get_df_cached("BTCUSDT")
if df_teste.empty:
BotLogger.log("❌ ERRO FATAL: Falha no download do histórico.", Config.C_RED)
sys.exit(1)
else:
BotLogger.log(f"✅ Histórico Carregado. Motores Ativados.", Config.C_GREEN)
ws_man = WebSocketManager(); ws_man.start()
exc.check_closed_pnl(is_boot=True); state.session_pnl = 0.0
last_pnl_check = 0; last_cleanup = 0; radar_time = 0; radar_msg = ""
last_sync_check = time.time(); last_heartbeat = time.time()
while True:
try:
time.sleep(1); now = time.time()
btc_rsi_str = "N/A"
btc_regime = "INIT"
df_btc = state.get_df_cached("BTCUSDT")
if not df_btc.empty and len(df_btc) > 200:
btc_rsi_val = ta.momentum.rsi(df_btc['c'], window=Config.STRATEGY["RSI_PERIOD"]).iloc[-1]
btc_rsi_str = f"{btc_rsi_val:.1f}"
btc_adx_ind = ta.trend.ADXIndicator(high=df_btc['h'], low=df_btc['l'], close=df_btc['c'], window=14, fillna=False)
btc_adx = btc_adx_ind.adx().iloc[-1]
btc_ema50 = ta.trend.ema_indicator(df_btc['c'], window=50).iloc[-1]
btc_ema200 = ta.trend.ema_indicator(df_btc['c'], window=200).iloc[-1]
if btc_adx < 25: btc_regime = "SIDEWAYS"
elif btc_ema50 > btc_ema200: btc_regime = "BULL"
else: btc_regime = "BEAR"
if now - last_heartbeat > 30:
last_heartbeat = now
slots_open = state.get_slot_count()
pnl = state.get_session_pnl()
BotLogger.log(f"💓 HEARTBEAT | Pos: {slots_open}/{Config.MAX_POSITIONS} | PnL: ${pnl:.2f} | BTC: {btc_regime} (RSI: {btc_rsi_str})", Config.C_CYAN)
if now - last_cleanup > 10: last_cleanup = now; state.cleanup_expired_slots()
if now - ws_man.last_msg_time > 60: graceful_shutdown()
if now - last_sync_check > 300:
last_sync_check = now
exc.sync_initial_state()
if now - last_pnl_check > 30:
last_pnl_check = now; exc.check_closed_pnl()
curr_pnl = state.get_session_pnl(); max_l = Config.TOTAL_EQUITY * Config.MAX_SESSION_LOSS_PCT
if curr_pnl < -max_l: graceful_shutdown()
# [NOVO] Radar Otimizado: Só roda a cada 5 segundos para aliviar o I/O
if now - radar_time > 5:
radar_time = now; best_coin = ""; best_val = 50.0
active = [s for s in Config.ACTIVE_WHITELIST if s not in state.positions]
with state.lock:
for s in active:
ta_data = state.latest_ta.get(s)
if ta_data:
rsi = ta_data['rsi']
s_reg = ta_data['regime']
is_best_buy = (rsi < 40 and rsi < best_val)
is_best_sell = (rsi > 60 and rsi > best_val)
if is_best_buy or is_best_sell:
best_val = rsi
if is_best_buy:
best_coin = f"📉 {s}({s_reg}|RSI:{rsi:.0f})"
else:
best_coin = f"📈 {s}({s_reg}|RSI:{rsi:.0f})"
slots_agora = state.get_slot_count()
radar_msg = f"| Pos: {slots_agora}/{Config.MAX_POSITIONS} | BTC Reg: {btc_regime} | BTC RSI: {btc_rsi_str} | {best_coin if best_coin else 'Sondando...'}"
stats_str = perf_engine.get_stats_str()
status_line = f"\r💓 {stats_str} | PnL: ${state.get_session_pnl():.2f} {radar_msg}"
sys.stdout.write(status_line.ljust(115)); sys.stdout.flush()
except KeyboardInterrupt: graceful_shutdown()
if __name__ == "__main__": main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment