Skip to content

Instantly share code, notes, and snippets.

@lemassykoi
Created August 26, 2024 13:28
Show Gist options
  • Save lemassykoi/0bea8d7f2c38fc58508b4c0078b432a1 to your computer and use it in GitHub Desktop.
Save lemassykoi/0bea8d7f2c38fc58508b4c0078b432a1 to your computer and use it in GitHub Desktop.
XTB FOREX Scalp Trading
print('\nStart\n')
import time # noqa: E402
start_time = time.perf_counter()
import os # noqa: E402
import html # noqa: E402
import datetime # noqa: E402
import asyncio # noqa: E402
import websockets # noqa: E402
import json # noqa: E402
import logging # noqa: E402
import requests # noqa: E402
import pandas as pd # noqa: E402
import talib as ta # noqa: E402
import plotly.express as px # noqa: E402
from halo import Halo # noqa: E402
stop_time = time.perf_counter()
elapsed_time = stop_time - start_time
# --- Debug switches ---------------------------------------------------------
# Global debug flag: selects the DEBUG log level and enables DataFrame dumps.
is_DEBUG = False
# Handed to asyncio.run(debug=...) when the bot starts.
WS_DEBUG = False

# --- Account and instrument ---------------------------------------------------
DEMO = True
CURRENT_SYMBOL = 'EURUSD'
_XTB_ENVIRONMENT = "demo" if DEMO else "real"
API_ENDPOINT = f"wss://ws.xtb.com/{_XTB_ENVIRONMENT}"
API_ENDPOINT_STREAM = f"wss://ws.xtb.com/{_XTB_ENVIRONMENT}Stream"

# --- Runtime state, overwritten by the message handlers ----------------------
LAST_ASK_PRICE = LAST_BID_PRICE = None
LAST_HIGH_PRICE = LAST_LOW_PRICE = None
LAST_SPREAD = None
SYMBOL_DIGITS = 0
SYMBOL_SHORT_NAME = SYMBOL_FIAT_CURRENCY = None
MARGIN_TRADE = None
MINIMAL_VOLUME = 0
SYMBOL_MIN_STEPS = 0
MARGINFREE = 0

# --- Strategy parameters -------------------------------------------------------
CANDLE_PERIOD = 1
VOLUME_TO_BUY = 0.05
PIPS_TO_CLOSE = 0.2

# --- Request payloads ----------------------------------------------------------
# Credentials are intentionally blank placeholders; fill in the demo value
# (left of `else`) or the real-account value (right of `else`).
LOGIN_DATA = {
    "command": "login",
    "arguments": {
        "userId": "" if DEMO else "",
        "password": "" if DEMO else "",
    },
}
PAYLOAD_GET_TRADING_HOURS = {
    "command": "getTradingHours",
    "arguments": {"symbols": [CURRENT_SYMBOL]},
}
PAYLOAD_GET_SYMBOL = {
    "command": "getSymbol",
    "arguments": {"symbol": CURRENT_SYMBOL},
}
PAYLOAD_GET_MARGIN_LEVEL = {"command": "getMarginLevel"}
PAYLOAD_GET_MARGIN_TRADE = {
    "command": "getMarginTrade",
    "arguments": {
        "symbol": CURRENT_SYMBOL,
        "volume": VOLUME_TO_BUY,
    },
}

# --- Candle storage ------------------------------------------------------------
# Global DataFrame holding the candle history.
_CANDLE_COLUMNS = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
DF = pd.DataFrame(columns=_CANDLE_COLUMNS)
# Maximum number of rows kept in the DataFrame before old rows are pruned.
DF_MAX_LENGTH = 3600
## LOGGER LEVEL
class CustomFormatter(logging.Formatter):
    """Colourised formatter that pushes call site and level to the right edge.

    Each record is rendered as ``<date> - <message><padding>[func:line] - LEVEL``
    wrapped in an ANSI colour chosen from the record's level. The padding is
    derived from the current terminal width so the call-site column lines up.
    """

    grey = "\x1b[38;20m"
    yellow = "\x1b[33;20m"
    red = "\x1b[31;20m"
    bold_red = "\x1b[31;1m"
    reset = "\x1b[0m"
    F_LightGreen = "\x1b[92m"
    F_LightBlue = "\x1b[94m"
    underline = "\x1b[4m"
    base_format = "%(asctime)s - %(message)s"
    datefmt = '%d/%m/%Y | %H:%M:%S'
    # Width assumed when stdout is not attached to a terminal (pipe, file, CI).
    fallback_width = 80
    FORMATS = {
        logging.DEBUG: underline + grey + base_format + reset,
        logging.INFO: F_LightGreen + base_format + reset,
        logging.WARNING: yellow + base_format + reset,
        logging.ERROR: underline + red + base_format + reset,
        logging.CRITICAL: bold_red + base_format + reset
    }

    def format(self, record):
        """Return the coloured, right-aligned text for ``record``."""
        # os.get_terminal_size() raises OSError when stdout is redirected;
        # logging must never crash the program because of that.
        try:
            terminal_width = os.get_terminal_size().columns
        except (OSError, ValueError):
            terminal_width = self.fallback_width
        func_info = f"[{record.funcName}:{record.lineno}]"
        # 20 approximates the rendered date length; 16 is an alignment tweak.
        base_message_length = 20 + len(record.getMessage())
        padding = max(terminal_width - base_message_length - len(func_info) - 16, 0)
        # func_info is spliced into a %-style format string, so escape '%'.
        safe_func_info = func_info.replace('%', '%%')
        adjusted_format = f"%(asctime)s - %(message)s{' ' * padding}{safe_func_info} - %(levelname)s"
        # Levels without a colour entry (custom levels) fall back to plain text.
        log_fmt = self.FORMATS.get(record.levelno, self.base_format)
        log_fmt = log_fmt.replace(self.base_format, adjusted_format)
        return logging.Formatter(log_fmt, datefmt=self.datefmt).format(record)
# Root logger wiring: a single stream handler using the coloured formatter.
logger = logging.getLogger()
ch = logging.StreamHandler()
ch.setFormatter(CustomFormatter())
logger.addHandler(ch)
# The log level follows the global debug switch.
logger.setLevel(logging.DEBUG if is_DEBUG else logging.INFO)
## LOGGER END
# Start-up banner followed by the measured import duration.
logging.error('** FOREX MODE **')
logging.info(f"Module loading duration : {elapsed_time:.2f} seconds")
def send_telegram_message(message:str, bot_token:str="xxxxxxxxxx:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", chat_id:int=1234567890):
    """Send ``message`` to a Telegram chat through the Bot API.

    The call is best-effort: network failures are logged instead of raised so
    that a notification problem cannot interrupt the caller.

    Args:
        message: Text to send.
        bot_token: Telegram bot token (the default is a placeholder).
        chat_id: Identifier of the destination chat (placeholder default).

    Returns:
        True when Telegram answered with HTTP 200, False otherwise.
    """
    url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
    payload = {
        "chat_id": chat_id,
        "text": message
    }
    try:
        # Without a timeout a stalled connection would block the caller forever.
        response = requests.post(url, data=payload, timeout=10)
    except requests.RequestException as exc:
        # Log only the exception type: its text embeds the URL, hence the token.
        logger.warning(f"Telegram : Erreur lors de l'envoi du message : {type(exc).__name__}")
        return False
    if response.status_code == 200:
        logger.debug("Telegram : Message envoyé avec succès")
        return True
    logger.warning(f"Telegram : Erreur lors de l'envoi du message : {response.status_code} - {response.text}")
    return False
def get_timestamp_X_minutes_ago(minutes:int, period:int):
    """Return the epoch timestamp, in milliseconds, of a moment in the past.

    Args:
        minutes: Number of periods to go back.
        period: Length of one period in minutes.

    Returns:
        int: Milliseconds since the epoch for ``now - minutes * period`` minutes.
    """
    # Use an aware UTC datetime: subtracting from a naive local time is off by
    # an hour whenever a daylight-saving change falls inside the interval.
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    past = now_utc - datetime.timedelta(minutes=(minutes * period))
    return int(past.timestamp() * 1000)
def get_timestamp_X_minutes_after(minutes:int):
    """Return the epoch timestamp, in milliseconds, ``minutes`` from now.

    Args:
        minutes: Number of minutes to add to the current time.

    Returns:
        int: Milliseconds since the epoch for ``now + minutes``.
    """
    # Use an aware UTC datetime: adding to a naive local time is off by an
    # hour whenever a daylight-saving change falls inside the interval.
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    future = now_utc + datetime.timedelta(minutes=minutes)
    return int(future.timestamp() * 1000)
def get_last_candles(symbol=CURRENT_SYMBOL, period=CANDLE_PERIOD, start:int = 60):
    """Build the ``getChartLastRequest`` payload for recent candles.

    Args:
        symbol: Instrument to query.
        period: Candle period in minutes.
        start: Number of periods to look back from now.

    Returns:
        dict: Request payload ready to be JSON-encoded and sent.
    """
    logging.info(f'Get last candles for {start} periods of {period} minute(s)')
    return {
        "command": "getChartLastRequest",
        "arguments": {
            "info": {
                "symbol": symbol,
                "period": period,
                "start": get_timestamp_X_minutes_ago(start, period),
            }
        }
    }
def check_if_market_is_open(msg):
    """Record the trading sessions and wait until the market is open.

    Stores ``{day: (fromT, toT)}`` in the global ``MARKET_SESSIONS`` and then
    re-checks once a minute until the current UTC day/time falls inside one of
    the sessions.

    Args:
        msg: One ``getTradingHours`` entry; its ``'trading'`` list holds dicts
            with ``'day'``, ``'fromT'`` and ``'toT'`` keys.
    """
    global MARKET_SESSIONS
    logging.debug(f'Check if Market is Open for {CURRENT_SYMBOL}')
    trading_hours = msg['trading']
    MARKET_SESSIONS = {session['day']: (session['fromT'], session['toT']) for session in trading_hours}
    while True:
        current_day = datetime.datetime.now(datetime.timezone.utc).isoweekday()
        # Milliseconds elapsed since midnight UTC (the factor must be 1000).
        # NOTE(review): fromT/toT may be expressed in the broker's local time
        # zone rather than UTC - confirm against the API documentation.
        current_time_ms = int(time.time() * 1000) % 86400000
        market_open = any(
            session['day'] == current_day
            and session['fromT'] <= current_time_ms <= session['toT']
            for session in trading_hours
        )
        if market_open:
            logging.info(f"Le marché pour {CURRENT_SYMBOL} est ouvert.")
            return
        logging.error(f"Le marché pour {CURRENT_SYMBOL} est fermé. Retry dans 1mn")
        # This function is synchronous, so an un-awaited asyncio.sleep() would
        # return immediately and turn the loop into a busy spin.
        # NOTE(review): time.sleep() still blocks the event loop while waiting;
        # making this a coroutine requires changing the caller as well.
        time.sleep(60)
def process_balance(message):
    """Copy account figures from a balance message into module globals.

    Two shapes are accepted. A message containing ``'currency'`` is treated as
    the initial balance: it uses the ``margin_level`` / ``margin_free`` keys and
    is logged at INFO. Any other message is treated as a streamed balance: it
    uses ``marginLevel`` / ``marginFree`` and is logged at DEBUG.

    Example of a streamed message:
    {'balance': 100142.57, 'margin': 510.2, 'equityFX': 100121.82,
     'equity': 100121.82, 'marginLevel': 19624.03, 'marginFree': 99611.62,
     'credit': 0.0, 'stockValue': 0.0, 'stockLock': 0.0, 'cashStockValue': 0.0}
    """
    global BALANCE, MARGIN, EQUITY, MARGINLEVEL, MARGINFREE
    if 'currency' in message:
        emit, headline = logging.info, 'Initial Balance received'
        level_key, free_key = 'margin_level', 'margin_free'
    else:
        emit, headline = logging.debug, 'Stream Balance received'
        level_key, free_key = 'marginLevel', 'marginFree'
    emit(headline)
    BALANCE = message['balance']        # balance in account currency
    MARGIN = message['margin']          # margin requirements
    EQUITY = message['equity']          # balance plus all profits
    MARGINLEVEL = message[level_key]    # margin level percentage
    MARGINFREE = message[free_key]      # free margin
    emit(f"Balance Available : {MARGINFREE}")
def process_symbol_details(message):
    """Store the symbol's trading parameters in globals and log a summary.

    Reads ``currency``, ``currencyProfit``, ``precision``, ``lotMin``,
    ``lotStep``, ``pipsPrecision`` and ``categoryName`` from ``message``
    (a ``getSymbol`` result, e.g. ``{'symbol': 'EURUSD', 'currency': 'EUR',
    'categoryName': 'FX', 'currencyProfit': 'USD', 'pipsPrecision': 4,
    'precision': 5, 'lotMin': 0.01, 'lotStep': 0.01, ...}``).
    """
    global SYMBOL_SHORT_NAME, SYMBOL_FIAT_CURRENCY, SYMBOL_DIGITS, MINIMAL_VOLUME, SYMBOL_MIN_STEPS, CONTRACT_SIZE
    SYMBOL_SHORT_NAME = message['currency']
    SYMBOL_FIAT_CURRENCY = message['currencyProfit']
    SYMBOL_DIGITS = message['precision']
    MINIMAL_VOLUME = message['lotMin']
    SYMBOL_MIN_STEPS = message['lotStep']
    pips_precision = message['pipsPrecision']
    # Despite its name this is 10 ** -pipsPrecision (0.0001 for precision 4).
    CONTRACT_SIZE = 1 / ( 10 ** pips_precision)
    summary_lines = (
        f"Current Symbol : {CURRENT_SYMBOL} ({SYMBOL_SHORT_NAME}) in {SYMBOL_FIAT_CURRENCY}",
        f"Symbol Minimal Volume : {MINIMAL_VOLUME}",
        f"Vol must be multiple of : {SYMBOL_MIN_STEPS}",
        f"Symbol Digits Precision : {SYMBOL_DIGITS}",
        f"Contract Size : {CONTRACT_SIZE}",
        f"PIPS for TP : {PIPS_TO_CLOSE}",
    )
    for line in summary_lines:
        logging.info(line)
    # Blocking two-second pause after the summary has been logged.
    time.sleep(2)
    category = message['categoryName']
    known_markets = {'CRT': 'Market is : Crypto', 'FX': 'Market is : Forex'}
    if category in known_markets:
        logging.info(known_markets[category])
    else:
        logging.error(f"Market Type is : {category}")
def process_trade_status(DATA):
    """Log the outcome reported by a trade-status message.

    ``requestStatus`` 3 is logged as accepted (plus the attached message, if
    any), 0 as an error, 1 as pending and 4 as rejected. Any other status is
    ignored.
    """
    status = DATA['requestStatus']
    if status == 3:
        logging.info(f"Order n°{DATA['order']} Accepted @ {DATA['price']}")
        note = DATA['message']
        if note is not None:
            logging.error(note)
        return
    # status -> (logging function, message template)
    reports = {
        0: (logging.error, "Order n°{} : ERROR"),
        1: (logging.warning, "Order n°{} Pending..."),
        4: (logging.error, "Order n°{} REJECTED"),
    }
    if status in reports:
        emit, template = reports[status]
        emit(template.format(DATA['order']))
def process_trade(DATA):
    """Log a streamed trade update.

    Closed positions and deleted orders are logged as warnings; every other
    update is dumped in full at ERROR level.
    """
    if DATA['closed']:
        logging.warning(f"Position {DATA['position']} (Order {DATA['order']}) closed. Profit : {DATA['profit']}")
        return
    if DATA['state'] == 'Deleted':
        logging.warning(f"Order {DATA['order']} deleted")
        return
    logging.error(DATA)
async def process_news(DATA):
    """Forward a streamed news item to Telegram.

    Sends a short announcement, waits one second, then sends the HTML-escaped
    ``DATA['body']``.
    """
    send_telegram_message('XTB : News reçues !')
    await asyncio.sleep(1)
    escaped_body = html.escape(DATA['body'])
    send_telegram_message(escaped_body)
async def process_charts(candles):
    """Load the candles of a chart response into the global DataFrame.

    ``candles`` must provide ``'digits'`` and ``'rateInfos'``. When exactly two
    candles are returned only the last one is inserted; otherwise all of them
    are. A digits value different from ``SYMBOL_DIGITS`` is only logged.
    """
    logging.debug('Processing Charts')
    digits = candles['digits']
    if digits != SYMBOL_DIGITS:
        logging.error('DIGITS Mismatch')
    rate_infos = candles['rateInfos']
    candle_count = len(rate_infos)
    logging.info(f"Candle Lenght : {candle_count}")
    if candle_count == 2:
        logging.debug('Number of Candle > 1 and < 3 !')
        selected = [rate_infos[-1]]
    else:
        selected = rate_infos
    await insert_charts_to_df(selected, digits)
    if is_DEBUG:
        print(f"\n{DF.tail()}\n")
        print(f"\n{DF.count()}\n")
async def _send_trade_payload(trade_payload):
    """Send a payload built by generate_trade_payload, or log its refusal."""
    if trade_payload:
        logging.debug(trade_payload)
        await main_ws_client.send_message(trade_payload)
    else:
        logging.error('Generate Trade Payload returned False')


async def insert_candle_to_df(candle):
    """Append a streamed candle, refresh indicators and trade on a buy signal.

    Steps: append the candle to the global DataFrame, prune it to
    ``DF_MAX_LENGTH`` rows, request a fresh margin figure, recompute the
    indicators, and - when a buy signal is found and enough free margin is
    available - send a market BUY and a BUY LIMIT order, each with a
    take-profit ``PIPS_TO_CLOSE * CONTRACT_SIZE`` above its price.

    Args:
        candle: Streamed candle with ``ctm`` (epoch ms), ``open``, ``high``,
            ``low``, ``close`` and ``vol`` keys.
    """
    global DF
    new_row = pd.DataFrame([{
        'timestamp': pd.to_datetime(candle['ctm'], unit='ms'),
        'open': candle['open'],
        'high': candle['high'],
        'low': candle['low'],
        'close': candle['close'],
        'volume': candle['vol']
    }])
    # Concatenating onto an empty frame is pointless (and deprecated in
    # recent pandas), so start from the new row in that case.
    DF = new_row if DF.empty else pd.concat([DF, new_row], ignore_index=True)
    logging.info('Candle added to DF')

    # Prune the DataFrame when it grows too large.
    logging.debug(f'DF Lenght BEFORE pruning : {len(DF)}')
    if len(DF) > DF_MAX_LENGTH:
        n = len(DF) - DF_MAX_LENGTH
        DF.drop(DF.head(n).index, inplace=True)
        logging.info('DF Pruned !')
        logging.debug(f'DF Lenght AFTER pruning : {len(DF)}')

    # Ask for an up-to-date margin requirement.
    await main_ws_client.send_message(PAYLOAD_GET_MARGIN_TRADE)

    # Recompute the indicators on the updated DataFrame.
    calcul_and_insert_to_df()
    logging.debug(DF.tail())
    if not test_BUY_indicators():
        return

    # The values below arrive asynchronously; using them before the first
    # tick / margin / symbol response would raise TypeError or NameError.
    if LAST_ASK_PRICE is None or LAST_BID_PRICE is None:
        logging.error("No Tick received yet! Can't place order")
        return
    if MARGIN_TRADE is None:
        logging.error("Margin requirement not received yet! Can't place order")
        return
    pip_size = globals().get('CONTRACT_SIZE')
    if pip_size is None:
        logging.error("Symbol details not received yet! Can't place order")
        return

    # Check the margin requirement.
    if MARGINFREE <= MARGIN_TRADE:
        msg = (f"Insufficient Margin Free : Have {MARGINFREE} but need {MARGIN_TRADE}")
        logging.error(msg)
        send_telegram_message(msg)
        return
    msg = (f"Margin Required : {MARGIN_TRADE} | Margin Free : {MARGINFREE}")
    logging.info(msg)
    # NOTE(review): the margin check covers one order but two are sent below.

    # BUY now at market price, with a take-profit.
    await _send_trade_payload(generate_trade_payload(
        symbol_price = LAST_ASK_PRICE,
        buy_tp = True,
        volume = VOLUME_TO_BUY,
        tp = round((LAST_ASK_PRICE + (PIPS_TO_CLOSE * pip_size)), SYMBOL_DIGITS)
    ))
    # BUY LIMIT at the bid price, with a take-profit.
    await _send_trade_payload(generate_trade_payload(
        symbol_price = LAST_BID_PRICE,
        buy_limit = True,
        volume = VOLUME_TO_BUY,
        tp = round((LAST_BID_PRICE + (PIPS_TO_CLOSE * pip_size)), SYMBOL_DIGITS)
    ))
async def insert_charts_to_df(msg, digits):
    """Append historical candles to the global DataFrame and test for a buy.

    Chart candles carry the open price as an integer scaled by ``10**digits``
    and the close/high/low as offsets from that open; they are converted to
    absolute prices before being stored.

    Args:
        msg: Iterable of candle dicts with ``ctm``, ``open``, ``close``,
            ``high``, ``low`` and ``vol`` keys.
        digits: Number of price decimals used to scale the candle values.
    """
    global DF
    scale = 10 ** (-digits)
    rows = []
    for candle in msg:
        open_price = int(candle['open']) * scale
        rows.append({
            'timestamp': pd.to_datetime(candle['ctm'], unit='ms'),
            'open': open_price,
            'high': round(open_price + (candle['high'] * scale), digits),
            'low': round(open_price + (candle['low'] * scale), digits),
            'close': round(open_price + (candle['close'] * scale), digits),
            'volume': candle['vol']
        })
    if not rows:
        # Nothing to add: leave the DataFrame and the indicators untouched.
        logging.error('No candle received, DF left unchanged')
        return
    # Build the new rows once instead of concatenating inside the loop.
    new_rows = pd.DataFrame(rows)
    DF = new_rows if DF.empty else pd.concat([DF, new_rows], ignore_index=True)
    logging.info('Charts added to DF')

    # Recompute the indicators on the updated DataFrame.
    calcul_and_insert_to_df()
    if test_BUY_indicators():
        # generate_trade_payload requires symbol_price; omitting it raised
        # TypeError. It returns False by itself when no tick was received yet.
        # NOTE(review): no tp is passed, so this "BUY with TP" order is built
        # with the default tp of 0.0 - confirm whether that is intended.
        trade_payload = generate_trade_payload(
            symbol_price = LAST_ASK_PRICE,
            buy_tp = True,
            volume = VOLUME_TO_BUY
        )
        if trade_payload:
            await main_ws_client.send_message(trade_payload)
        else:
            logging.error('Generate Trade Payload returned False')
def calcul_and_insert_to_df():
    """Recompute the TA-Lib indicator columns of the global DataFrame.

    Adds or overwrites RSI(14), SMA(14), EMA(50), WMA(50), the Bollinger bands
    (T3 moving average), MACD(12, 26, 6) and the slow stochastic (14, 3, 3),
    computed from the ``close`` / ``high`` / ``low`` columns.
    """
    logging.debug('Calculate BUY indicators')
    close = DF['close']
    DF['RSI'] = ta.RSI(close, timeperiod=14)
    DF['SMA'] = ta.SMA(close, timeperiod=14)
    DF['EMA'] = ta.EMA(close, timeperiod=50)
    DF['WMA'] = ta.WMA(close, timeperiod=50)

    upper, middle, lower = ta.BBANDS(close, matype=ta.MA_Type.T3)
    DF['upper_band'] = upper
    DF['middle_band'] = middle
    DF['lower_band'] = lower

    macd, macd_signal, macd_hist = ta.MACD(close, fastperiod=12, slowperiod=26, signalperiod=6)
    DF['MACD'] = macd
    DF['MACD_signal'] = macd_signal
    DF['MACD_hist'] = macd_hist

    stoch_k, stoch_d = ta.STOCH(
        DF['high'], DF['low'], close,
        fastk_period=14,
        slowk_period=3, slowk_matype=0,
        slowd_period=3, slowd_matype=0,
    )
    DF['Stochastic_K'] = stoch_k
    DF['Stochastic_D'] = stoch_d
def test_BUY_indicators():
    """Return True when the latest DataFrame row triggers the buy rule.

    The rule requires all of: RSI below 50, SMA above EMA, and the close above
    the upper Bollinger band. The outcome is logged at WARNING level.
    """
    rsi_low_threshold = 50
    latest = DF.iloc[-1:]
    # Evaluate every condition up front so each column is always read.
    rsi_is_low = DF['RSI'].iloc[-1] < rsi_low_threshold
    sma_above_ema = DF['SMA'].iloc[-1] > DF['EMA'].iloc[-1]
    close_above_band = DF['close'].iloc[-1] > DF['upper_band'].iloc[-1]
    del latest
    if rsi_is_low and sma_above_ema and close_above_band:
        logging.warning("** Buy Signal Found ! **")
        return True
    logging.warning("No Buy Signal")
    return False
async def _process_return_data(DATA):
    """Route the ``returnData`` of a command response to its handler."""
    global MARGIN_TRADE
    logging.debug("ReturnData received!")
    logging.debug(f"ReturnData: {DATA}")
    if isinstance(DATA, dict):
        # `'a' and 'b' in DATA` only tests the last key, so every key is
        # checked explicitly.
        if all(key in DATA for key in ('symbol', 'currency', 'categoryName')):
            logging.debug("Symbol details received")
            process_symbol_details(DATA)
        elif 'balance' in DATA and 'currency' in DATA:
            process_balance(DATA)
        elif 'order' in DATA:
            logging.info(f"New Order opened found : {DATA['order']}")
        elif 'rateInfos' in DATA:
            logging.debug("Candles received")
            await process_charts(DATA)
        elif 'margin' in DATA:
            logging.debug("Margin Trade received")
            first_value = MARGIN_TRADE is None
            MARGIN_TRADE = DATA['margin']
            if first_value:
                # Only the first margin figure is announced at INFO level.
                logging.info(f"Marge engagée selon le volume : {MARGIN_TRADE}")
        else:
            logging.error('unknown returnData')
    elif isinstance(DATA, list) and DATA and isinstance(DATA[0], dict) and 'trading' in DATA[0]:
        # Trading hours arrive as a list with one entry per requested symbol.
        logging.debug("Trading Hours received")
        check_if_market_is_open(DATA[0])
    else:
        # Unknown shapes (including empty lists) used to raise on DATA[0].
        logging.error('unknown returnData')


async def _process_stream_command(message):
    """Route a streaming message according to its ``command`` name."""
    global LAST_BID_PRICE, LAST_ASK_PRICE, LAST_HIGH_PRICE, LAST_LOW_PRICE, LAST_SPREAD
    command = message['command']
    logging.debug("COMMAND received!")
    logging.debug(f"Command: {command}")
    DATA = message['data']
    if 'balance' in command:
        process_balance(DATA)
    elif 'news' in command:
        logging.debug(f"News : {DATA}")
        await process_news(DATA)
    elif 'keepAlive' in command:
        logging.debug(f"KeepAlive : {DATA['timestamp']}")
    elif 'tickPrices' in command:
        logging.debug(f"Tick Prices : {DATA}")
        LAST_ASK_PRICE = DATA['ask']
        LAST_BID_PRICE = DATA['bid']
        LAST_HIGH_PRICE = DATA['high']
        LAST_LOW_PRICE = DATA['low']
        LAST_SPREAD = DATA['spreadTable']
    elif 'tradeStatus' in command:
        # Must be tested before 'trade', which is a substring of 'tradeStatus'.
        # Example: {'order': 650474591, 'requestStatus': 3, 'message': None,
        #           'customComment': 'Auto BUY', 'price': 0.01}
        logging.debug('getTradeStatus')
        logging.debug(DATA)
        process_trade_status(DATA)
    elif 'trade' in command:
        # Example keys: closed, order, position, profit, state, symbol, tp, ...
        logging.debug('trade')
        process_trade(DATA)
    elif 'candle' in command:
        # Example: {'ctm': 1724539140000, 'open': 0.10198, 'close': 0.10199,
        #           'high': 0.10201, 'low': 0.10198, 'vol': 252.0, ...}
        logging.debug('candle')
        await insert_candle_to_df(DATA)
    else:
        logging.error(f"DATA: {message}")


async def process_message(message):
    """Dispatch one decoded websocket message.

    Messages with ``returnData`` are command responses, messages with
    ``command`` are streaming updates, and messages carrying only ``status``
    are acknowledgements or errors. Anything else is logged as unknown.

    Args:
        message: Decoded JSON object received from either websocket.
    """
    logging.debug('Process Message')
    if 'returnData' in message:
        await _process_return_data(message['returnData'])
    elif 'command' in message:
        await _process_stream_command(message)
    elif 'status' in message:
        if not message['status']:
            logging.debug('status False')
            logging.error(f"Error Code : {message.get('errorCode')}")
            logging.error(f"Error Message : {message.get('errorDescr')}")
        else:
            logging.error('status True')
    else:
        logging.error(f"Other message: {message}")
async def process_queue_messages(ws):
    """Consume queued streaming messages forever.

    Streaming messages are queued by the stream client; this coroutine takes
    them one at a time and hands them to ``process_message``.

    Args:
        ws: Object exposing an ``asyncio.Queue`` as ``message_queue``.
    """
    while True:
        message = await ws.message_queue.get()
        try:
            await process_message(message)
        except Exception:
            # One malformed message must not stop the consumer task for good.
            logging.exception("Error while processing queued message")
        finally:
            # Always balance get() so that queue.join() can complete.
            ws.message_queue.task_done()
def generate_trade_payload(symbol_price:float, buy:bool = False, buy_tp:bool = False, sell:bool = False, buy_limit:bool = False, volume:int = MINIMAL_VOLUME, tp:float = 0.0, expiration:int = 0) -> str:
    """Build a ``tradeTransaction`` payload for the current symbol.

    Exactly one of ``buy``, ``buy_tp``, ``buy_limit`` or ``sell`` selects the
    order kind; they are tested in that order. Buy orders are announced in the
    log and on Telegram.

    Args:
        symbol_price: Order price (replaced by the last ask for ``buy_tp``).
        buy: Market buy.
        buy_tp: Market buy at the last ask price.
        sell: Sell order.
        buy_limit: Buy-limit order expiring two minutes from now.
        volume: Order volume.
        tp: Take-profit price, 0.0 for none.
        expiration: Expiration timestamp (overwritten for ``buy_limit``).

    Returns:
        The payload dict, or False when no tick has been received yet or no
        order kind was selected (the ``str`` annotation is inaccurate).
    """
    def _announce(text):
        # Buy orders are reported both in the log and on Telegram.
        logging.error(text)
        send_telegram_message(text)

    if LAST_BID_PRICE is None or LAST_ASK_PRICE is None:
        logging.error("No Tick received yet! Can't place order")
        return False

    offset = 0
    if buy:
        _announce(f"Try to Place BUY Order @ {LAST_ASK_PRICE}")
        reason, cmd, order_type = "Auto BUY", 0, 0
    elif buy_tp:
        _announce(f"Try to Place BUY TP Order @ {LAST_ASK_PRICE} with TP @ {tp}")
        reason, cmd, order_type = "Auto BUY with TP", 0, 0
        symbol_price = LAST_ASK_PRICE
    elif buy_limit:
        cmd, order_type = 2, 0
        expiration = get_timestamp_X_minutes_after(2)
        if tp == 0.0:
            reason = "Auto BUY LIMIT without TP"
            _announce('Place BUY LIMIT Order without TP')
        else:
            reason = "Auto BUY LIMIT with TP"
            _announce(f"Place BUY LIMIT Order with TP @ {symbol_price} with TP @ {tp}")
    elif sell:
        logging.error('Place SELL Order')
        reason, cmd, order_type = "Auto SELL", 1, 2
    else:
        logging.error('Undefined Order Type to place!')
        return False

    trade_info = {
        "cmd": cmd,
        "customComment": reason,
        "expiration": expiration,
        "offset": offset,
        "order": 0,
        "price": float(symbol_price),
        "sl": 0.0,
        "symbol": CURRENT_SYMBOL,
        "tp": tp,
        "type": order_type,
        "volume": float(volume)
    }
    return {
        "command": "tradeTransaction",
        "arguments": {"tradeTransInfo": trade_info}
    }
# Classe pour la connexion principale
class MainWebSocketClient:
    """Client for the main (command) websocket connection.

    Handles login, sends command payloads and forwards every received message
    to ``process_message``.
    """

    def __init__(self, endpoint, login_data):
        """Store the connection settings; no network activity happens here.

        Args:
            endpoint: Websocket URL of the command API.
            login_data: Login payload sent right after connecting.
        """
        self.endpoint = endpoint
        self.login_data = login_data
        self.websocket = None
        self.streamSessionId = None
        # Strong reference to the listener task: the event loop only keeps
        # weak references, so an unreferenced task may be garbage-collected.
        self._listener_task = None

    async def connect(self):
        """Open the websocket, log in and start the background listener."""
        self.websocket = await websockets.connect(self.endpoint)
        await self.login()
        self._listener_task = asyncio.create_task(self.listen_messages())

    async def login(self):
        """Send the login payload and remember the returned stream session id."""
        await self.websocket.send(json.dumps(self.login_data))
        response = await self.websocket.recv()
        response_data = json.loads(response)
        if response_data.get("status"):
            logging.info("Login successful")
            self.streamSessionId = response_data.get("streamSessionId")
            logging.debug(f"streamSessionId : {self.streamSessionId}")
        else:
            logging.error("Login failed")

    async def listen_messages(self):
        """Process incoming messages until the connection is closed.

        A failure while handling one message is logged and the next message is
        processed. The coroutine returns once the socket is closed; retrying on
        a closed socket would only spin without ever receiving anything.
        """
        try:
            async for message in self.websocket:
                try:
                    await process_message(json.loads(message))
                except Exception as e:
                    logging.error(f"Error while receiving message: {e}")
        except websockets.exceptions.ConnectionClosed as e:
            logging.error(f"Connection closed: {e}")

    async def send_message(self, message):
        """JSON-encode and send ``message``, then pause 250 ms."""
        await self.websocket.send(json.dumps(message))
        await asyncio.sleep(0.25)

    async def close(self):
        """Close the websocket if one was opened."""
        logging.error('Closing WS')
        if self.websocket:
            await self.websocket.close()
# Classe pour la connexion de streaming
class StreamWebSocketClient:
    """Client for the streaming websocket connection.

    Subscribes to streaming commands and pushes every received message onto
    ``message_queue`` for a separate consumer.
    """

    def __init__(self, stream_endpoint, streamSessionId):
        """Store the connection settings; no network activity happens here.

        Args:
            stream_endpoint: Websocket URL of the streaming API.
            streamSessionId: Session id obtained from the main connection login.
        """
        self.stream_endpoint = stream_endpoint
        self.streamSessionId = streamSessionId
        self.websocket = None
        self.message_queue = asyncio.Queue()
        # Strong reference to the listener task: the event loop only keeps
        # weak references, so an unreferenced task may be garbage-collected.
        self._listener_task = None

    async def connect(self):
        """Open the websocket and start the background stream listener."""
        self.websocket = await websockets.connect(self.stream_endpoint)
        self._listener_task = asyncio.create_task(self.listen_stream())

    async def send_stream_message(self, message):
        """JSON-encode and send ``message``, then pause 250 ms."""
        await self.websocket.send(json.dumps(message))
        await asyncio.sleep(0.25)

    async def listen_stream(self):
        """Queue incoming stream messages until the connection is closed.

        The coroutine returns once the socket is closed; retrying on a closed
        socket would only spin without ever receiving anything.
        """
        try:
            async for message in self.websocket:
                decoded = json.loads(message)
                logging.debug(f"Stream message received: {decoded}")
                await self.message_queue.put(decoded)
        except websockets.exceptions.ConnectionClosed as e:
            logging.error(f"Stream connection closed: {e}")

    async def _subscribe(self, command, **extra):
        """Send a streaming ``command`` for this session with optional fields."""
        payload = {
            "command": command,
            "streamSessionId": self.streamSessionId,
            **extra
        }
        await self.send_stream_message(payload)

    async def subscribe_keepalive(self):
        """Subscribe to keep-alive messages."""
        logging.debug('Subscribe to keepAlive')
        await self._subscribe("getKeepAlive")

    async def subscribe_news(self):
        """Subscribe to news messages."""
        logging.debug('Subscribe to News')
        await self._subscribe("getNews")

    async def subscribe_balance(self):
        """Subscribe to balance updates."""
        logging.debug('Subscribe to Balance')
        await self._subscribe("getBalance")

    async def subscribe_trades(self):
        """Subscribe to trade updates."""
        logging.debug('Subscribe to Trades')
        await self._subscribe("getTrades")

    async def subscribe_trades_status(self):
        """Subscribe to trade-status updates."""
        logging.debug('Subscribe to Trades Status')
        await self._subscribe("getTradeStatus")

    async def subscribe_ticks(self):
        """Subscribe to tick prices of ``CURRENT_SYMBOL``."""
        logging.debug('Subscribe to Ticks')
        await self._subscribe(
            "getTickPrices",
            symbol=CURRENT_SYMBOL,
            minArrivalTime=1,
            maxLevel=0
        )

    async def get_candles(self):
        """Subscribe to streamed candles of ``CURRENT_SYMBOL``."""
        logging.debug('Subscribe to Candles')
        await self._subscribe("getCandles", symbol=CURRENT_SYMBOL)

    async def close(self):
        """Close the websocket if one was opened."""
        logging.error('Closing Stream WS')
        if self.websocket:
            await self.websocket.close()
async def _market_check_loop():
    """Once a minute, log whether the market is open and in the preferred window.

    All waiting uses ``asyncio.sleep`` so the websocket tasks keep running.
    """
    # Preferred window in minutes after midnight UTC: 12:30 to 15:00 UTC
    # (14:30 to 17:00 Paris summer time).
    optimal_open = 12 * 60 + 30
    optimal_close = 15 * 60
    day_ms = 86400000
    while True:
        # MARKET_SESSIONS only exists once the trading hours were processed.
        sessions = globals().get('MARKET_SESSIONS')
        if sessions is None:
            logging.debug('Trading hours not received yet')
            await asyncio.sleep(1)
            continue
        current_time = datetime.datetime.now(datetime.timezone.utc)
        # Milliseconds elapsed since midnight UTC (the factor must be 1000).
        current_time_ms = int(time.time() * 1000) % day_ms
        current_minute = current_time.hour * 60 + current_time.minute
        current_day = current_time.isoweekday()  # 1 = Monday, 7 = Sunday
        if current_day in sessions:
            session_open, session_close = sessions[current_day]
            if session_open <= current_time_ms <= session_close:
                logger.debug(f"Le marché pour {CURRENT_SYMBOL} est encore ouvert.")
                if 1 <= current_day <= 5:
                    # The preferred window uses its own minute-based bounds;
                    # the session bounds above are in milliseconds.
                    if optimal_open <= current_minute <= optimal_close:
                        logging.debug(f"Le marché pour {CURRENT_SYMBOL} est à une heure optimale.")
                    else:
                        if current_minute < optimal_open:
                            wait_minutes = optimal_open - current_minute
                        else:
                            # After the window: wait for tomorrow's opening.
                            wait_minutes = (24 * 60) - current_minute + optimal_open
                        logging.debug(f"Le marché pour {CURRENT_SYMBOL} n'est pas optimal. Attente de réouverture dans {wait_minutes} minutes.")
                        await asyncio.sleep(wait_minutes * 60)
                else:
                    logging.warning(f"Le marché pour {CURRENT_SYMBOL} est fermé pour le week-end. Vérification lundi prochain.")
        else:
            # No session today: wait until the next day's session start.
            logger.error(f"Le marché pour {CURRENT_SYMBOL} est fermé pour aujourd'hui. Vérification demain.")
            next_day = (current_day % 7) + 1
            next_open = sessions.get(next_day, (0, day_ms))[0]
            time_until_next_open = day_ms - current_time_ms + next_open
            await asyncio.sleep(time_until_next_open / 1000)
        # Re-check one minute later.
        await asyncio.sleep(60)


async def main():
    """Connect both websockets, subscribe to the streams and monitor the market.

    Creates the global ``main_ws_client`` and ``stream_ws_client``, sends the
    initial requests and subscriptions, then runs the market check loop. Both
    sockets are closed when the loop ends, including on cancellation.
    """
    global main_ws_client, stream_ws_client
    # Main connection: authentication and commands.
    main_ws_client = MainWebSocketClient(API_ENDPOINT, LOGIN_DATA)
    await main_ws_client.connect()
    # Streaming connection, tied to the login's streamSessionId.
    stream_ws_client = StreamWebSocketClient(API_ENDPOINT_STREAM, main_ws_client.streamSessionId)
    await stream_ws_client.connect()
    # Keep a reference so the consumer task cannot be garbage-collected.
    queue_task = asyncio.create_task(process_queue_messages(stream_ws_client))

    # Initial requests: trading hours, balance, symbol details, margin.
    await main_ws_client.send_message(PAYLOAD_GET_TRADING_HOURS)
    await main_ws_client.send_message(PAYLOAD_GET_MARGIN_LEVEL)
    await main_ws_client.send_message(PAYLOAD_GET_SYMBOL)
    await main_ws_client.send_message(PAYLOAD_GET_MARGIN_TRADE)
    await stream_ws_client.subscribe_keepalive()
    # First batch of historical candles, then the streamed ones.
    await main_ws_client.send_message(get_last_candles())
    await stream_ws_client.get_candles()
    await stream_ws_client.subscribe_news()
    await stream_ws_client.subscribe_balance()
    await stream_ws_client.subscribe_ticks()
    await stream_ws_client.subscribe_trades()
    await stream_ws_client.subscribe_trades_status()

    # Manual orders can be sent here for testing, for example:
    #   await main_ws_client.send_message(
    #       generate_trade_payload(symbol_price=0.1016, buy_limit=True, volume=1000))
    try:
        logging.info('Starting Market Check Loop')
        await _market_check_loop()
    except KeyboardInterrupt:
        logging.error('End of Market Check Loop')
    finally:
        # Runs on Ctrl-C cancellation too, so the sockets are always closed.
        logging.error('End of sleep')
        queue_task.cancel()
        await stream_ws_client.close()
        await asyncio.sleep(0.25)
        await main_ws_client.close()
        await asyncio.sleep(0.25)
    normal_exit()
def wait_until_second(target_second):
    """Block until the wall clock's seconds field equals ``target_second``.

    A spinner shows the remaining seconds; the clock is polled every 200 ms.

    Args:
        target_second: Second of the minute to wait for, 0 to 59.

    Raises:
        ValueError: If ``target_second`` is not a whole number from 0 to 59;
            such a value can never match and would loop forever.
    """
    if target_second not in range(60):
        raise ValueError(f"target_second must be an integer between 0 and 59, got {target_second!r}")
    logging.warning(f"Waiting for second {target_second}")
    start_spinner = Halo(text='Waiting...')
    start_spinner.start()
    while True:
        current_second = datetime.datetime.now().second
        remaining_seconds = (target_second - current_second) % 60
        start_spinner.text = f"Waiting... {remaining_seconds}s remaining"
        if current_second == target_second:
            start_spinner.succeed('Ok!')
            break
        time.sleep(0.2)  # poll again in 200 ms
def normal_exit() -> None:
    """Print the last DataFrame rows and display a chart of the close prices.

    After opening the Plotly figure the function pauses for two seconds.
    """
    print(DF.tail())
    chart_title = f"{SYMBOL_SHORT_NAME} Close Prices - Last {CANDLE_PERIOD}mn Period"
    close_chart = px.line(DF, x='timestamp', y='close', title=chart_title)
    close_chart.show()
    time.sleep(2)
def _run_bot():
    """Wait for second 55 of the minute, then run the bot until interrupted.

    On Ctrl-C the interruption is logged and the exit routine is executed.
    """
    wait_until_second(55)
    try:
        asyncio.run(main(), debug=WS_DEBUG)
    except KeyboardInterrupt:
        logging.error('User Interrupt')
        normal_exit()


if __name__ == "__main__":
    _run_bot()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment