Last active
January 10, 2026 07:47
-
-
Save rollendxavier/b3aaf73f1a8215ece6e6ea3d9bacaf79 to your computer and use it in GitHub Desktop.
Agentic Trading CG
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import json | |
| import time | |
| import asyncio | |
| import requests | |
| import pandas as pd | |
| from datetime import datetime, timedelta | |
| from dotenv import load_dotenv | |
| # Load environment variables | |
| load_dotenv() | |
# =============================================================================
# Configuration
# =============================================================================
# CoinGecko on-chain API roots. The Pro and Demo plans use different hosts
# AND different authentication header names, so both are selected together.
CG_BASE_URL = "https://pro-api.coingecko.com/api/v3/onchain"  # Pro API for paid features
CG_DEMO_URL = "https://api.coingecko.com/api/v3/onchain"  # Demo API for free endpoints

# Whichever key is configured, preferring the Pro key.
CG_API_KEY = os.getenv("CG_PRO_API_KEY") or os.getenv("CG_DEMO_API_KEY")

# Use Pro API if a Pro key is available, otherwise fall back to Demo.
if os.getenv("CG_PRO_API_KEY"):
    BASE_URL = CG_BASE_URL
    API_KEY = os.getenv("CG_PRO_API_KEY")
    API_KEY_HEADER = "x-cg-pro-api-key"
    print("✓ Using CoinGecko Pro API")
else:
    BASE_URL = CG_DEMO_URL
    API_KEY = os.getenv("CG_DEMO_API_KEY")
    # Demo keys are rejected when sent under the Pro header name.
    API_KEY_HEADER = "x-cg-demo-api-key"
    print("✓ Using CoinGecko Demo API")

# Blockscout API for transaction verification
BLOCKSCOUT_BASE_URL = "https://eth.blockscout.com/api/v2"

# Headers for CoinGecko API (the key header name matches the selected plan).
HEADERS = {
    "accept": "application/json",
    API_KEY_HEADER: API_KEY,
}
| # ============================================================================= | |
| # Helper Functions | |
| # ============================================================================= | |
def cg_request(endpoint: str, params: dict = None) -> dict:
    """Make a GET request to the CoinGecko API with retries and rate-limit backoff.

    Up to three attempts are made. An HTTP 429 response triggers an
    exponential wait (1s, 2s) before the next attempt; any other request
    failure waits one second before retrying.

    Args:
        endpoint: Path appended to ``BASE_URL``.
        params: Optional query-string parameters.

    Returns:
        The decoded JSON body, or an empty dict when every attempt was
        rate limited.

    Raises:
        requests.exceptions.RequestException: If the final attempt fails
            with a request error or a non-429 HTTP error status.
    """
    url = f"{BASE_URL}{endpoint}"
    max_attempts = 3
    for attempt in range(max_attempts):
        is_last_attempt = attempt == max_attempts - 1
        try:
            response = requests.get(url, headers=HEADERS, params=params, timeout=30)
            if response.status_code == 429:
                if is_last_attempt:
                    # No retry follows, so sleeping here would only delay
                    # the caller; report the give-up instead of failing silently.
                    print("✗ Rate limited on every attempt; returning empty result")
                    break
                print(f"⚠ Rate limited. Waiting {2 ** attempt} seconds...")
                time.sleep(2 ** attempt)
                continue
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            if is_last_attempt:
                print(f"✗ API request failed: {e}")
                raise
            time.sleep(1)
    return {}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def get_trending_pools(network: str = "eth", limit: int = 10) -> pd.DataFrame:
    """Return the trending pools of one network as a DataFrame.

    Args:
        network: Network ID (e.g. 'base', 'eth').
        limit: Maximum number of pools to keep from the response.

    Returns:
        DataFrame with one row per pool and the columns pool_address,
        pool_name, dex, price_usd, volume_24h, liquidity_usd and
        price_change_24h.
    """
    print(f"Fetching trending pools on {network}...")
    payload = cg_request(
        f"/networks/{network}/trending_pools",
        {"include": "base_token,quote_token"},
    )

    def as_row(entry: dict) -> dict:
        # Flatten one API pool record into a single table row; missing or
        # falsy numeric fields become 0.0.
        fields = entry.get("attributes", {})
        links = entry.get("relationships", {})
        return {
            "pool_address": fields.get("address", ""),
            "pool_name": fields.get("name", "Unknown"),
            "dex": links.get("dex", {}).get("data", {}).get("id", "unknown"),
            "price_usd": float(fields.get("base_token_price_usd", 0) or 0),
            "volume_24h": float(fields.get("volume_usd", {}).get("h24", 0) or 0),
            "liquidity_usd": float(fields.get("reserve_in_usd", 0) or 0),
            "price_change_24h": float(
                fields.get("price_change_percentage", {}).get("h24", 0) or 0
            ),
        }

    return pd.DataFrame([as_row(entry) for entry in payload.get("data", [])[:limit]])
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def get_pools_megafilter(
    network: str = "eth",
    min_volume_24h: float = 100000,
    max_liquidity: float = 1000000,
    min_transactions: int = 100,
    limit: int = 10
) -> pd.DataFrame:
    """Fetch pools matching volume, liquidity and activity filters.

    Args:
        network: Network ID to search.
        min_volume_24h: Minimum 24h volume in USD.
        max_liquidity: Maximum pool reserve (liquidity) in USD.
        min_transactions: Minimum 24h transaction count.
        limit: Maximum number of pools to return.

    Returns:
        DataFrame with one row per pool and the columns pool_address,
        pool_name, dex, price_usd, volume_24h, liquidity_usd and
        price_change_24h (empty when the API returns no pools).
    """
    endpoint = "/pools/megafilter"
    # NOTE(review): query parameter names are kept exactly as originally
    # written; confirm them against the current megafilter API reference.
    params = {
        "networks": network,
        "volume_24h_usd_min": min_volume_24h,
        "reserve_usd_max": max_liquidity,
        "tx_24h_count_min": min_transactions,
        "order": "volume_usd_h24_desc",
        "page": 1,
        "limit": limit
    }
    data = cg_request(endpoint, params)
    # Slice locally as well, in case the server does not honor "limit".
    pools = data.get("data", [])[:limit]

    pool_data = []
    for pool in pools:
        attr = pool.get("attributes") or {}
        relationships = pool.get("relationships") or {}
        # Nested objects may be null in the payload; treat null like missing.
        dex_ref = (relationships.get("dex") or {}).get("data") or {}
        volume = attr.get("volume_usd") or {}
        price_change = attr.get("price_change_percentage") or {}
        pool_data.append({
            "pool_address": attr.get("address", ""),
            "pool_name": attr.get("name", "Unknown"),
            "dex": dex_ref.get("id", "unknown"),
            "price_usd": float(attr.get("base_token_price_usd", 0) or 0),
            "volume_24h": float(volume.get("h24", 0) or 0),
            "liquidity_usd": float(attr.get("reserve_in_usd", 0) or 0),
            "price_change_24h": float(price_change.get("h24", 0) or 0),
        })
    return pd.DataFrame(pool_data)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def get_pool_trades(network: str, pool_address: str, limit: int = 300) -> pd.DataFrame:
    """Return recent trades of a single pool as a DataFrame.

    Args:
        network: Network ID.
        pool_address: Pool contract address.
        limit: Number of trades to request; values above 300 are capped at 300.

    Returns:
        DataFrame with one row per trade (tx hash, timestamp, sender,
        buy/sell kind, USD volume, token amounts and USD prices).
    """
    payload = cg_request(
        f"/networks/{network}/pools/{pool_address}/trades",
        {"limit": min(limit, 300)},
    )

    text_fields = ("tx_hash", "block_timestamp", "tx_from_address", "kind")
    # Output column -> API attribute holding the numeric value.
    numeric_fields = (
        ("volume_usd", "volume_in_usd"),
        ("from_token_amount", "from_token_amount"),
        ("to_token_amount", "to_token_amount"),
        ("price_from_in_usd", "price_from_in_usd"),
        ("price_to_in_usd", "price_to_in_usd"),
    )

    rows = []
    for entry in payload.get("data", []):
        fields = entry.get("attributes", {})
        row = {key: fields.get(key, "") for key in text_fields}
        for column, source in numeric_fields:
            # Missing or falsy values are recorded as 0.0.
            row[column] = float(fields.get(source, 0) or 0)
        rows.append(row)
    return pd.DataFrame(rows)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def analyze_wallet_profitability(trades_df: pd.DataFrame, min_trades: int = 3) -> pd.DataFrame:
    """Analyze wallet profitability based on trade history.

    Trades are grouped by ``tx_from_address``; for each wallet with at least
    ``min_trades`` trades, realized PnL is computed as total USD sold minus
    total USD bought.

    Args:
        trades_df: Trades with ``tx_from_address``, ``kind`` ('buy'/'sell')
            and ``volume_usd`` columns.
        min_trades: Minimum number of trades a wallet needs to be included.

    Returns:
        DataFrame sorted by ``realized_pnl_usd`` descending. When there is no
        input or no wallet qualifies, an empty DataFrame that still carries
        the result columns is returned.
    """
    columns = [
        "wallet_address",
        "total_trades",
        "buys",
        "sells",
        "total_bought_usd",
        "total_sold_usd",
        "realized_pnl_usd",
        "avg_trade_size_usd",
    ]
    if trades_df.empty:
        return pd.DataFrame(columns=columns)

    wallet_stats = []
    for wallet, group in trades_df.groupby("tx_from_address"):
        if len(group) < min_trades:
            continue
        buys = group[group["kind"] == "buy"]
        sells = group[group["kind"] == "sell"]
        total_bought = buys["volume_usd"].sum()
        total_sold = sells["volume_usd"].sum()
        wallet_stats.append({
            "wallet_address": wallet,
            "total_trades": len(group),
            "buys": len(buys),
            "sells": len(sells),
            "total_bought_usd": total_bought,
            "total_sold_usd": total_sold,
            # Simple realized PnL calculation
            "realized_pnl_usd": total_sold - total_bought,
            "avg_trade_size_usd": group["volume_usd"].mean(),
        })

    # Passing the columns explicitly keeps the sort key present even when no
    # wallet qualified; otherwise sort_values raises KeyError.
    df = pd.DataFrame(wallet_stats, columns=columns)
    return df.sort_values("realized_pnl_usd", ascending=False)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import websockets | |
async def subscribe_to_pool_trades(
    pool_address: str,
    network: str = "eth",
    callback=None,
    duration_seconds: int = 60
):
    """Subscribe to real-time trades for a pool over a WebSocket.

    Sends one subscribe message, then listens for ``duration_seconds``.
    Every message whose ``type`` is ``"trade"`` is printed and, when a
    callback is given, awaited with the decoded message.

    Args:
        pool_address: Pool address to monitor.
        network: Network ID.
        callback: Optional coroutine function called with each trade dict.
        duration_seconds: How long to listen.
    """
    # NOTE(review): endpoint URL and subscribe-message schema are kept as
    # originally written; confirm both against the provider's WebSocket docs.
    ws_url = "wss://ws-onchain.coingecko.com/onchain/v1/subscribe/trades"
    print("Connecting to CoinGecko WebSocket...")
    print(f"Monitoring pool: {pool_address[:10]}...")
    async with websockets.connect(ws_url) as websocket:
        # Subscribe message
        subscribe_msg = {
            "action": "subscribe",
            "params": {
                "pool_addresses": [pool_address],
                "network": network
            },
            "api_key": API_KEY
        }
        await websocket.send(json.dumps(subscribe_msg))
        print("Subscribed to trade stream")

        # A monotonic clock keeps the deadline immune to wall-clock changes.
        deadline = time.monotonic() + duration_seconds
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            try:
                # Cap the wait so the loop never overruns the requested duration.
                message = await asyncio.wait_for(
                    websocket.recv(),
                    timeout=min(5.0, remaining)
                )
            except asyncio.TimeoutError:
                continue  # No message, continue listening

            data = json.loads(message)
            if not isinstance(data, dict) or data.get("type") != "trade":
                continue
            print("TRADE DETECTED!")
            print(f" TX Hash: {data.get('tx_hash', 'N/A')}")
            print(f" Type: {data.get('kind', 'N/A')}")
            # The volume may arrive as a string or null; coerce before
            # applying the numeric format spec.
            print(f" Volume: ${float(data.get('volume_usd', 0) or 0):,.2f}")
            if callback:
                await callback(data)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
BLOCKSCOUT_BASE_URL = "https://eth.blockscout.com/api/v2"


def verify_transaction(tx_hash: str) -> dict:
    """Verify transaction details using the Blockscout API.

    Args:
        tx_hash: Hash of the transaction to look up.

    Returns:
        Dict with ``tx_hash``, ``from_address``, ``to_address``, ``value``
        and ``status``; an empty dict when the lookup fails or the response
        is not a JSON object.
    """
    url = f"{BLOCKSCOUT_BASE_URL}/transactions/{tx_hash}"
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        print(f"Error verifying transaction: {e}")
        return {}

    if not isinstance(data, dict):
        print("Error verifying transaction: unexpected response format")
        return {}

    # "from"/"to" can be null in the payload (presumably e.g. for contract
    # creation - confirm against the API docs); treat null like missing
    # instead of discarding the whole result.
    sender = data.get("from") or {}
    recipient = data.get("to") or {}
    return {
        "tx_hash": tx_hash,
        "from_address": sender.get("hash", ""),
        "to_address": recipient.get("hash", ""),
        "value": data.get("value", "0"),
        "status": data.get("status", ""),
    }
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class PaperTradingEngine:
    """
    Simulated trading engine for copy trading.
    Implements proportional allocation for position sizing.

    No real orders are placed: each copied trade is appended to
    ``self.trades`` as a record dict.
    """

    def __init__(self, initial_equity: float = 10000.0, max_position_pct: float = 0.1):
        """Create an engine.

        Args:
            initial_equity: Starting account equity in USD.
            max_position_pct: Maximum fraction of equity for one position.
        """
        self.equity = initial_equity
        self.max_position_pct = max_position_pct
        self.positions = {}
        self.trades = []
        self.target_wallets = set()

    def add_target_wallet(self, wallet_address: str):
        """Add a wallet to the copy trade target list (stored lowercased)."""
        self.target_wallets.add(wallet_address.lower())

    def calculate_position_size(self, leader_trade_usd: float) -> float:
        """
        Calculate proportional position size.
        Instead of mirroring exact amounts, take 10% of the leader's trade,
        capped at ``equity * max_position_pct``, rounded to cents.
        """
        max_size = self.equity * self.max_position_pct
        position_size = min(max_size, leader_trade_usd * 0.1)
        return round(position_size, 2)

    def check_slippage(
        self,
        current_price: float,
        entry_price: float,
        max_slippage_pct: float = 2.0
    ) -> bool:
        """
        Check if current price is within acceptable slippage.
        Prevents buying at peaks if the target already pumped the price.

        Returns False when ``entry_price`` is 0 (slippage is undefined).
        """
        if entry_price == 0:
            return False
        slippage = abs(current_price - entry_price) / entry_price * 100
        return slippage <= max_slippage_pct

    async def execute_copy_trade(self, trade_data: dict):
        """
        Execute a copy trade based on detected trade.

        The trade is ignored when it has no tx hash, when the transaction
        cannot be verified, or when its sender is not a target wallet.
        """
        tx_hash = trade_data.get("tx_hash", "")
        if not tx_hash:
            return
        # Verify transaction
        # NOTE(review): verify_transaction appears to be a synchronous call;
        # it would block the event loop while it runs - confirm.
        tx_info = verify_transaction(tx_hash)
        if not tx_info:
            return
        from_address = (tx_info.get("from_address") or "").lower()
        # Check if this is from a target wallet
        if from_address not in self.target_wallets:
            return  # Not a target wallet
        print("TARGET WALLET TRADE DETECTED!")
        # Calculate position size; null/missing numeric fields count as 0
        # rather than crashing float().
        leader_volume = float(trade_data.get("volume_usd", 0) or 0)
        position_size = self.calculate_position_size(leader_volume)
        trade_kind = trade_data.get("kind") or "unknown"
        token_symbol = trade_data.get("base_token_symbol") or "TOKEN"
        current_price = float(trade_data.get("price_usd", 0) or 0)
        # Execute paper trade
        trade_record = {
            "timestamp": datetime.now().isoformat(),
            "tx_hash": tx_hash,
            "copied_wallet": from_address,
            "action": trade_kind,
            "token": token_symbol,
            "amount_usd": position_size,
            "price": current_price,
            "status": "EXECUTED (PAPER)"
        }
        self.trades.append(trade_record)
        print("PAPER TRADE EXECUTED:")
        print(f" Action: {trade_kind.upper()}")
        print(f" Token: {token_symbol}")
        print(f" Amount: ${position_size:,.2f}")

    def get_summary(self) -> dict:
        """Print and return a summary of the paper-trading session.

        Returns:
            Dict with ``equity``, ``total_trades``, ``total_volume_usd``
            (sum of recorded ``amount_usd``) and sorted ``target_wallets``.
        """
        total_volume = sum(record["amount_usd"] for record in self.trades)
        summary = {
            "equity": self.equity,
            "total_trades": len(self.trades),
            "total_volume_usd": round(total_volume, 2),
            "target_wallets": sorted(self.target_wallets),
        }
        print("PAPER TRADING SUMMARY:")
        print(f" Equity: ${summary['equity']:,.2f}")
        print(f" Trades executed: {summary['total_trades']}")
        print(f" Total volume: ${summary['total_volume_usd']:,.2f}")
        print(f" Target wallets: {len(summary['target_wallets'])}")
        return summary
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
async def run_copy_trading_bot():
    """Run the copy-trading pipeline end to end.

    Picks the first trending pool on Ethereum, ranks the wallets trading in
    it by realized PnL, registers up to three wallets with at least $100 of
    realized PnL as copy targets, then monitors the pool for 300 seconds and
    paper-trades every trade made by a target wallet.
    """
    # Step 1: Find trending pools
    trending_pools = get_trending_pools(network="eth", limit=10)
    if trending_pools.empty:
        # iloc[0] would raise IndexError on an empty result.
        print("No trending pools found; nothing to monitor.")
        return
    target_pool = trending_pools.iloc[0]["pool_address"]

    # Step 2: Find profitable wallets
    trades_df = get_pool_trades("eth", target_pool)
    wallets_df = analyze_wallet_profitability(trades_df)
    if wallets_df.empty or "realized_pnl_usd" not in wallets_df.columns:
        profitable_wallets = []
    else:
        profitable_wallets = wallets_df[wallets_df["realized_pnl_usd"] >= 100]["wallet_address"].tolist()
    if not profitable_wallets:
        # Without targets no trade would ever be copied, so skip the
        # five-minute monitoring session.
        print("No profitable wallets found; nothing to copy.")
        return

    # Step 3: Initialize paper trading engine
    trader = PaperTradingEngine(initial_equity=10000.0, max_position_pct=0.05)
    for wallet in profitable_wallets[:3]:
        trader.add_target_wallet(wallet)

    # Step 4: Start real-time monitoring
    await subscribe_to_pool_trades(
        pool_address=target_pool,
        network="eth",
        callback=trader.execute_copy_trade,
        duration_seconds=300
    )

    # Print summary. Fall back to a minimal report when the engine does not
    # provide a get_summary method.
    summarize = getattr(trader, "get_summary", None)
    if callable(summarize):
        summarize()
    else:
        print(f"Paper trades executed: {len(trader.trades)}")
# Run the bot only when executed as a script, so that importing this module
# does not open network connections as a side effect.
if __name__ == "__main__":
    asyncio.run(run_copy_trading_bot())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment