Skip to content

Instantly share code, notes, and snippets.

@JSeam2
Created December 8, 2024 10:41
Show Gist options
  • Save JSeam2/e0867c832d7f13c7898c919bf69160a9 to your computer and use it in GitHub Desktop.
Save JSeam2/e0867c832d7f13c7898c919bf69160a9 to your computer and use it in GitHub Desktop.
Simple TWAP script with CCXT
import ccxt
import time
from datetime import datetime
import math
import logging
from typing import Dict, Optional, Literal
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
# Optionally add file handler to save logs
file_handler = logging.FileHandler(f'twap_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log')
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(file_handler)
def read_credentials():
"""Read API credentials from files"""
try:
with open('api_key', 'r') as f:
api_key = f.read().strip()
with open('api_secret', 'r') as f:
api_secret = f.read().strip()
return api_key, api_secret
except FileNotFoundError as e:
logger.error("Credential files not found. Please ensure 'api_key' and 'api_secret' files exist in the current directory.")
raise Exception("Credential files not found") from e
except Exception as e:
logger.error(f"Error reading credential files: {str(e)}")
raise
class TWAPExecutor:
def __init__(
self,
total_amount: float,
num_chunks: int,
interval_minutes: int,
side: Literal['buy', 'sell'] = 'sell',
symbol: str = 'STETH/USDT',
test_mode: bool = True
):
# Read credentials from files
api_key, api_secret = read_credentials()
# TODO: Initialize client, change client accordingly
self.exchange = ccxt.bybit({
'apiKey': api_key,
'secret': api_secret,
'enableRateLimit': True,
'options': {
'defaultType': 'spot',
}
})
# Set testnet if in test mode
if test_mode:
self.exchange.set_sandbox_mode(True)
self.symbol = symbol
self.total_amount = float(total_amount)
self.num_chunks = int(num_chunks)
self.interval_minutes = int(interval_minutes)
self.chunk_size = self.total_amount / self.num_chunks
self.side = side.lower()
if self.side not in ['buy', 'sell']:
raise ValueError("Side must be either 'buy' or 'sell'")
# Load markets at initialization
self.exchange.load_markets()
logger.info(f"Initialized {self.side} TWAP with: total_amount={self.total_amount}")
logger.info(f"Chunk size calculated as: {self.chunk_size}")
def get_market_info(self) -> Dict:
"""Get market information including minimum order size"""
market = self.exchange.market(self.symbol)
if not market:
logger.error(f"Could not get market info for {self.symbol}")
raise Exception(f"Could not get market info for {self.symbol}")
logger.debug(f"Full market info: {market}")
# Convert precision from scientific notation if needed
amount_precision = abs(int(math.log10(float(market['precision']['amount']))))
return {
'min_amount': market['limits']['amount']['min'],
'max_amount': market['limits']['amount']['max'],
'min_cost': market['limits']['cost']['min'],
'price_precision': int(abs(math.log10(float(market['precision']['price'])))),
'amount_precision': amount_precision
}
def get_current_price(self) -> Optional[float]:
"""Get current market price"""
try:
ticker = self.exchange.fetch_ticker(self.symbol)
if not ticker or 'last' not in ticker or ticker['last'] is None:
logger.error(f"Could not get valid ticker data: {ticker}")
return None
return float(ticker['last'])
except Exception as e:
logger.error(f"Error fetching current price: {str(e)}")
return None
def round_amount(self, amount: float, precision: int) -> float:
"""Round amount down to the specified precision"""
logger.debug(f"Rounding amount={amount} with precision={precision}")
amount_str = f"{{:.{precision}f}}".format(amount)
result = float(amount_str)
logger.debug(f"Rounded result: {result}")
return result
def check_balance(self, amount: float) -> bool:
"""Check if there's enough balance for the order"""
try:
balance = self.exchange.fetch_balance()
if not balance:
logger.error("Could not fetch balance")
return False
if self.side == 'buy':
currency = self.symbol.split('/')[1] # USDT
current_price = self.get_current_price()
if current_price is None:
logger.error("Could not get current price for balance check")
return False
available = float(balance.get(currency, {}).get('free', 0))
required = amount * current_price * 1.01 # Add 1% for fees
else:
currency = self.symbol.split('/')[0] # STETH
available = float(balance.get(currency, {}).get('free', 0))
required = amount
logger.info(f"Available {currency} balance: {available}")
logger.info(f"Required {currency}: {required}")
return available >= required
except Exception as e:
logger.error(f"Error checking balance: {str(e)}")
return False
def place_order(self, amount: float) -> Optional[Dict]:
"""Place a market order"""
try:
market_info = self.get_market_info()
logger.info(f"Market info for order: {market_info}")
# Round amount to correct precision
rounded_amount = self.round_amount(amount, market_info['amount_precision'])
logger.info(f"Amount before rounding: {amount}")
logger.info(f"Amount after rounding: {rounded_amount}")
# Check minimum amount
if rounded_amount < market_info['min_amount']:
logger.warning(f"Order amount {rounded_amount} is below minimum {market_info['min_amount']}")
return None
# Check maximum amount
if rounded_amount > market_info['max_amount']:
logger.warning(f"Order amount {rounded_amount} is above maximum {market_info['max_amount']}")
return None
# Check if we have enough balance
if not self.check_balance(rounded_amount):
logger.warning(f"Insufficient balance for {self.side} order")
return None
current_price = self.get_current_price()
if current_price is None:
logger.error("Could not get current price for order")
return None
logger.info(f"Placing {self.side} order with rounded amount: {rounded_amount} at approx. price: {current_price}")
order = self.exchange.create_order(
symbol=self.symbol,
type='market',
side=self.side,
amount=rounded_amount
)
if not order:
logger.error("Order creation returned None")
return None
return order
except Exception as e:
logger.error(f"Error placing order: {str(e)}")
logger.error(f"Error type: {type(e)}")
if hasattr(e, '__dict__'):
logger.error(f"Error details: {e.__dict__}")
return None
def execute_twap(self):
"""Execute the TWAP strategy"""
logger.info(f"Starting TWAP execution to {self.side} {self.total_amount} {self.symbol}")
logger.info(f"Will execute {self.num_chunks} orders of {self.chunk_size} every {self.interval_minutes} minutes")
orders_placed = 0
total_executed = 0
while orders_placed < self.num_chunks:
start_time = time.time()
try:
# Get current price first
current_price = self.get_current_price()
if current_price is None:
logger.error("Could not get current price, skipping this interval")
continue
logger.info(f"\nPlacing order {orders_placed + 1}/{self.num_chunks}")
logger.info(f"Current market price: {current_price}")
order = self.place_order(self.chunk_size)
if order:
try:
# Safely extract order details with defaults
executed_amount = 0
if 'amount' in order and order['amount'] is not None:
executed_amount = float(order['amount'])
elif 'filled' in order and order['filled'] is not None:
executed_amount = float(order['filled'])
else:
logger.warning("Could not get executed amount from order response. Using requested amount.")
executed_amount = self.chunk_size
orders_placed += 1
total_executed += executed_amount
# Safely get the execution price
execution_price = 'unknown'
if 'price' in order and order['price'] is not None:
execution_price = order['price']
elif 'average' in order and order['average'] is not None:
execution_price = order['average']
logger.info(f"Order executed: {executed_amount}")
logger.info(f"Execution price: {execution_price}")
logger.info(f"Total executed so far: {total_executed}")
logger.debug(f"Full order response: {order}")
except Exception as e:
logger.error(f"Error processing order response: {str(e)}")
logger.error(f"Order was placed but details could not be processed: {order}")
orders_placed += 1
total_executed += self.chunk_size
else:
logger.warning("Order failed, will retry in next interval")
except Exception as e:
logger.error(f"Error during execution: {str(e)}")
logger.error("Will retry in next interval")
# Wait for next interval
elapsed = time.time() - start_time
sleep_time = max(0, self.interval_minutes * 60 - elapsed)
if sleep_time > 0 and orders_placed < self.num_chunks:
logger.info(f"Waiting {sleep_time:.2f} seconds until next order...")
time.sleep(sleep_time)
logger.info("\nTWAP execution completed")
logger.info(f"Total executed: {total_executed}")
if __name__ == "__main__":
# Configuration
TOTAL_AMOUNT = 1.0 # Total STETH to buy or sell
NUM_CHUNKS = 10 # Number of orders to split into
INTERVAL_MINUTES = 1 # Time between orders
TEST_MODE = False # Set to False for real trading
SIDE = 'sell' # 'buy' or 'sell'
SYMBOL = 'STETH/USDT'
# Initialize and run TWAP
try:
twap = TWAPExecutor(
total_amount=TOTAL_AMOUNT,
num_chunks=NUM_CHUNKS,
interval_minutes=INTERVAL_MINUTES,
side=SIDE,
symbol=SYMBOL
test_mode=TEST_MODE
)
twap.execute_twap()
except Exception as e:
logger.error(f"Error: {str(e)}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment