Skip to content

Instantly share code, notes, and snippets.

@rollendxavier
Created March 2, 2026 10:27
Show Gist options
  • Select an option

  • Save rollendxavier/d7f826d322ff2502696669fb173ac346 to your computer and use it in GitHub Desktop.

Select an option

Save rollendxavier/d7f826d322ff2502696669fb173ac346 to your computer and use it in GitHub Desktop.
Crypto Dollar Cost Averaging (DCA) Bot - Code Samples from Tutorial
import requests
BASE_URL = "https://api.coingecko.com/api/v3"
HEADERS = {"accept": "application/json", "x-cg-demo-api-key": "YOUR_KEY"}
def get_coin_list():
return requests.get(f"{BASE_URL}/coins/list", headers=HEADERS, timeout=20).json()
def resolve_coin_ids(targets):
coin_list = get_coin_list()
id_set = {c["id"] for c in coin_list}
symbol_map = {}
for c in coin_list:
symbol_map.setdefault(c["symbol"].lower(), []).append(c["id"])
resolved = []
for t in targets:
t = t.lower()
if t in id_set:
resolved.append(t)
elif t in symbol_map:
resolved.append(symbol_map[t][0])
return resolved
def get_prices(coin_ids, vs_currency="usd"):
params = {
"ids": ",".join(coin_ids),
"vs_currencies": vs_currency,
"include_24hr_change": "true",
}
return requests.get(
f"{BASE_URL}/simple/price", headers=HEADERS, params=params, timeout=20
).json()
coin_ids = resolve_coin_ids(["btc", "eth"])
prices = get_prices(coin_ids)
print(prices)
def execute_dca_buy(coin_ids, investment_amount_usd, vs_currency="usd"):
prices = get_prices(coin_ids, vs_currency=vs_currency)
for coin_id in coin_ids:
price = prices.get(coin_id, {}).get(vs_currency)
if not price:
continue
units = investment_amount_usd / price
print(f"BUY {coin_id}: ${investment_amount_usd:.2f} at ${price:.4f} -> {units:.6f} units")
# Sanity check
if not prices:
raise SystemExit("No price data returned. Check your API key and coin IDs.")
btc_price = prices.get("bitcoin", {}).get("usd")
eth_price = prices.get("ethereum", {}).get("usd")
print("BTC/USD:", btc_price, "| ETH/USD:", eth_price)
def execute_dca_buy(coin_ids, investment_amount_usd, vs_currency="usd"):
prices = get_prices(coin_ids, vs_currency=vs_currency)
for coin_id in coin_ids:
price = prices.get(coin_id, {}).get(vs_currency)
if not price:
print(f"Skipping {coin_id}: no price available.")
continue
units = investment_amount_usd / price
print(f"BUY {coin_id}: ${investment_amount_usd:.2f} at ${price:.4f} -> {units:.6f} units")
# Paper trade only (no real orders)
# Log to SQLite in Step 4
# Swap this for an exchange client when you go live
# Keep it simple while testing and validating logic
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
scheduler = BlockingScheduler()
# Daily at 9:00 AM
scheduler.add_job(
execute_dca_buy,
CronTrigger(hour=9, minute=0),
args=[coin_ids, INVESTMENT_AMOUNT_USD, "usd"],
)
# Weekly on Monday at 9:00 AM
scheduler.add_job(
execute_dca_buy,
CronTrigger(day_of_week="mon", hour=9, minute=0),
args=[coin_ids, INVESTMENT_AMOUNT_USD, "usd"],
)
scheduler.start()
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
DB_PATH = Path("dca.db")
def init_db(db_path):
conn = sqlite3.connect(db_path)
with conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS transactions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
coin_id TEXT NOT NULL,
price REAL NOT NULL,
units REAL NOT NULL,
amount_usd REAL NOT NULL
)
"""
)
conn.close()
def log_transaction(db_path, coin_id, price, units, amount_usd):
timestamp = datetime.now(timezone.utc).isoformat()
conn = sqlite3.connect(db_path)
with conn:
conn.execute(
"""
INSERT INTO transactions (timestamp, coin_id, price, units, amount_usd)
VALUES (?, ?, ?, ?, ?)
""",
(timestamp, coin_id, price, units, amount_usd),
)
conn.close()
def get_portfolio_summary(db_path):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute(
"""
SELECT coin_id, SUM(amount_usd) AS total_invested, SUM(units) AS total_units
FROM transactions
GROUP BY coin_id
"""
)
rows = cursor.fetchall()
conn.close()
summary = {}
for coin_id, total_invested, total_units in rows:
avg_cost = total_invested / total_units if total_units else 0
summary[coin_id] = {
"total_invested": total_invested or 0,
"total_units": total_units or 0,
"avg_cost": avg_cost,
}
return summary
def print_unrealized_pnl(db_path):
summary = get_portfolio_summary(db_path)
coin_ids = list(summary.keys())
prices = get_prices(coin_ids)
for coin_id, stats in summary.items():
current_price = prices.get(coin_id, {}).get("usd", 0)
market_value = current_price * stats["total_units"]
unrealized_pnl = market_value - stats["total_invested"]
print(coin_id, "unrealized PnL:", unrealized_pnl)
def calculate_sma(prices, window=7):
if len(prices) < window:
return None
return sum(prices[-window:]) / window
def apply_smart_dca(coin_id, base_amount, current_price, dip_pct=0.05, boost=1.5):
chart = get_market_chart(coin_id, vs_currency="usd", days=30)
prices = [p[1] for p in chart.get("prices", [])]
sma = calculate_sma(prices, window=7)
if sma and current_price < sma * (1 - dip_pct):
return base_amount * boost
return base_amount
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment