Skip to content

Instantly share code, notes, and snippets.

@NerdPreferred1
Created May 24, 2025 02:34
Show Gist options
  • Save NerdPreferred1/eb2a53b0d630c2ffc9adc7f71b05407b to your computer and use it in GitHub Desktop.
Save NerdPreferred1/eb2a53b0d630c2ffc9adc7f71b05407b to your computer and use it in GitHub Desktop.
Mint Cycler Assistant
K_privateKey=Enter Private Key Here
K_publicKey=Enter Public Key Here
K_sendEmail=[email protected]
K_sendWallet=default_wallet_address

Kinesis API Trading Script

Setup

  1. Clone this repository: git clone <repository-url>
  2. Install dependencies: pip install -r requirements.txt
  3. Create a .env file with your API keys. (You can hard-code your keys into the program, but this is not recommended for security reasons.)
  4. Run the script: Mint Cycler Assistant.py

Notes

  • When the program launches, the price tracker will automatically begin charting the current price information for KAU (the midpoint between bid and ask).
  • Set Parameters to desired values.
  • Any update to the parameters takes effect in real time.
  • Dry Run mode will simulate trades.
  • Set to Live mode to place trades for real.
  • A log file location must be selected before you can press the start button.
  • Press Start to begin trading
  • The log information at the bottom can be hidden with the Hide Log button.
  • The cycler will copy your USD holdings minus $10 to your clipboard each time it cycles. This is so you can Ctrl+V your USD balance (-$10) whenever you want to enter the withdrawal amount to your mint account. (The minus $10 is to account for the $5 withdrawal fee to your mint account.)

Parameter Descriptions

  • Sell Amount (KAU): This value will control the amount of KAU that is sold each time the program loops.
  • Loop Timing (Seconds): This value controls the frequency for each trade iteration. (be kind to the servers, recommend no less than 3 seconds)
  • Bid-Ask Spread Threshold: This value sets the maximum allowed bid-ask spread. The program will not place a trade unless the spread between the bid and ask price is less than this value.
  • KAU Threshold: This is the minimum amount of KAU that must be in your account before the program will place a trade. Note that the API will not account for the 0.22% trade fee. If you have 100 KAU in your account the most you can place a trade for is 99.87 KAU (0.22% fee)
  • Max Trade Amount (USD): The program will stop trading once this value has been reached. (There is a bug that requires the program to be restarted once the max amount of USD total has been reached)
  • Sell Price (Bid Price Offset): This value will control the price you want to sell KAU at. Example: If the current bid price is 100.00 but you want to sell your KAU at 100.69, you would change the Sell Price parameter to 0.69.
  • Log File: A text log file will be created with all trade activity and saved to the location of your choosing.
import subprocess
import requests
import time
import pandas as pd
from datetime import timezone
import datetime
import hmac
import hashlib
import json
import logging
import math
from dotenv import load_dotenv
import os
import pyperclip
import tkinter as tk
from tkinter import ttk, scrolledtext, filedialog
import threading
import queue
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import matplotlib
import sys
matplotlib.use('TkAgg')
# Logging bootstrap. Console logging is configured here; the GUI queue handler
# and the file handler are attached later by the application.

# Folder offered as the starting point for log files (created if missing).
log_directory = r"C:\Users"
os.makedirs(log_directory, exist_ok=True)

# Placeholder log path stamped with the start-up time; the path actually used
# is chosen by the user through the GUI.
default_log_filename = os.path.join(
    log_directory,
    f"trading_log_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.txt",
)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger()  # root logger shared by the whole program

# Formatted log lines waiting to be shown in the GUI log pane.
log_queue = queue.Queue()

# No file handler exists until the user selects a log file.
file_handler = None
class QueueHandler(logging.Handler):
    """Logging handler that pushes each formatted record onto a queue.

    A consumer (here, the GUI) pulls the strings off the queue to display them.
    """

    def __init__(self, queue):
        logging.Handler.__init__(self)
        # Destination for formatted log lines; anything with a put() method.
        self.queue = queue

    def emit(self, record):
        """Format *record* and enqueue the resulting text."""
        try:
            self.queue.put(self.format(record))
        except Exception:
            # Delegate to logging's standard failure reporting.
            self.handleError(record)
# Read key/value pairs from a .env file into the process environment.
load_dotenv()

# --- Module-wide configuration -------------------------------------------
# API credentials; each is None when its environment variable is not set.
privateKey = os.getenv("K_privateKey")
publicKey = os.getenv("K_publicKey")

# Kinesis REST endpoint and the asset symbol used by this tool.
base_url = 'https://client-api.kinesis.money'
sendToken = 'KAU'

# Transfer targets, falling back to placeholder values when not configured.
sendEmail = os.getenv("K_sendEmail", "[email protected]")
sendWallet = os.getenv("K_sendWallet", "default_wallet_address")
# GUI Application
class KAUTradingApp:
def __init__(self, root):
    """
    Set up application state, build the GUI and start background updates.

    Args:
        root (tk.Tk): The root Tkinter window.
    """
    self.root = root
    self.root.title("Kinesis Mint Cycler")
    # self.root.iconbitmap(r"C:\directory location of .ico file if you want to add one")

    # Trading-loop state.
    self.running = False
    self.thread = None

    # Price-polling thread state; the poller runs for the life of the window.
    self.price_thread = None
    self.price_thread_running = True

    # Display/bookkeeping state.
    self.holdings_labels = []
    self.usd_total = 0.0
    self.mid_prices = []
    self.previous_mid_price = None
    self.log_visible = True
    self.trade_count = 0
    self.log_file_path = None
    self.status_var = tk.StringVar(value="Stopped")

    # Route log records into the shared queue so the GUI can display them.
    gui_handler = QueueHandler(log_queue)
    gui_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
    self.queue_handler = gui_handler
    logger.addHandler(gui_handler)

    # Styles must exist before the widgets that use them are created.
    self.configure_style()
    self.create_widgets()

    # Begin draining the log queue and polling prices.
    self.process_log_queue()
    self.start_price_updates()

    # Run our shutdown routine when the window is closed.
    self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
def configure_style(self):
    """Define the colour palette and register every ttk style the GUI uses."""
    style = ttk.Style()
    style.theme_use('clam')

    # --- Palette (stored on the instance; other methods read these) -------
    self.bg_color = "#212121"  # Dark gray background
    self.frame_bg = "#424242"  # Lighter gray for frames
    self.text_color = "#E0E0E0"  # Soft white for text
    self.accent_color = "#26A69A"  # Teal for highlights and buttons
    self.log_bg = "#303030"  # Slightly lighter gray for log area
    self.start_button_color = "#26A69A"  # Teal for Start button
    self.start_button_hover = "#4DB6AC"  # Lighter teal for hover
    self.stop_button_color = "#EF5350"  # Red for Stop button
    self.stop_button_hover = "#F44336"  # Darker red for hover
    self.button_color = "#616161"  # Neutral gray for other buttons
    self.button_hover = "#757575"  # Lighter gray for hover
    self.running_color = "#26A69A"  # Teal for Running status
    self.stopped_color = "#EF5350"  # Red for Stopped status
    self.graph_bg = "#424242"  # Same as frame for graph background
    self.graph_line_color = "#26A69A"  # Teal for graph line
    self.entry_bg = "#616161"  # Dark gray for entry fields
    self.entry_fg = "#E0E0E0"  # Soft white for entry text

    self.root.configure(bg=self.bg_color)

    text = self.text_color
    panel = self.frame_bg

    # --- Base widget classes ---------------------------------------------
    style.configure("TLabel", font=("Segoe UI", 11), foreground=text, background=panel)
    style.configure("TEntry", font=("Segoe UI", 10), foreground=self.entry_fg,
                    fieldbackground=self.entry_bg, padding=5)
    style.configure("TFrame", background=self.bg_color)
    style.configure("TLabelframe", background=panel, font=("Segoe UI", 12, "bold"))
    style.configure("TLabelframe.Label", background=panel, foreground=text)

    # --- Start / Stop buttons: solid colour, lighter/darker on hover ------
    primary_buttons = (
        ("Start.TButton", self.start_button_color, self.start_button_hover),
        ("Stop.TButton", self.stop_button_color, self.stop_button_hover),
    )
    for style_name, fill, hover in primary_buttons:
        style.configure(style_name, font=("Segoe UI", 11, "bold"), background=fill,
                        foreground=text, padding=8)
        style.map(style_name,
                  background=[('active', hover)],
                  foreground=[('active', text)])

    # --- Default buttons (Browse, Hide Log, ...) --------------------------
    style.configure("TButton", font=("Segoe UI", 10), background=self.button_color,
                    foreground=text, padding=5)
    style.map("TButton",
              background=[('active', self.button_hover)],
              foreground=[('active', text)])

    # --- Trading-mode toggle: colours come entirely from the state map ----
    mode_buttons = (
        ("DryRun.TButton", self.accent_color, self.start_button_hover),
        ("LiveTrading.TButton", self.stop_button_color, self.stop_button_hover),
    )
    for style_name, fill, hover in mode_buttons:
        style.configure(style_name, font=("Segoe UI", 10, "bold"), padding=5)
        style.map(style_name,
                  background=[('!active', fill), ('active', hover)],
                  foreground=[('!active', text), ('active', text)])

    # --- Informational labels share one look ------------------------------
    for style_name in ("Holdings.TLabel", "Counter.TLabel", "USDTotal.TLabel", "PriceInfo.TLabel"):
        style.configure(style_name, font=("Segoe UI", 10), background=panel, foreground=text)

    # --- Status badge (Running / Stopped) ---------------------------------
    status_badges = (
        ("Running.TLabel", self.running_color),
        ("Stopped.TLabel", self.stopped_color),
    )
    for style_name, fill in status_badges:
        style.configure(style_name,
                        font=("Segoe UI", 12, "bold"),
                        foreground=text,
                        background=fill,
                        padding=8,
                        anchor="center",
                        relief="flat")

    # Keyword options applied to the (non-ttk) scrolled log text widget.
    self.log_text_style = {
        "font": ("Segoe UI", 9),
        "bg": self.log_bg,
        "fg": text,
        "insertbackground": text,
    }
def validate_positive_float(self, value_if_allowed):
    """
    Validate that an input is a positive, finite float.

    Used as the key-validation callback for the numeric entry fields, so the
    argument is the text the entry would contain if the edit were accepted.

    Args:
        value_if_allowed (str): The value to validate.
    Returns:
        bool: True if the value parses as a finite float greater than zero,
        False otherwise (including non-numeric text, "inf" and "nan").
    """
    try:
        number = float(value_if_allowed)
    except (TypeError, ValueError):
        return False
    # float() happily parses "inf"/"1e999"; an infinite loop delay or order
    # amount is never a usable parameter, so reject non-finite values too.
    return number > 0 and math.isfinite(number)
def on_start_button_click(self, event):
    """
    Handle mouse clicks on the Start button while it is disabled.

    When the button is disabled a hint is logged (the user still has to pick a
    log file) and the event is swallowed; otherwise the click is left alone so
    the button's normal command runs.

    Args:
        event: The click event object.
    Returns:
        str | None: "break" to prevent further event processing if the button
        is disabled, otherwise None.
    """
    # Query the ttk state flags directly. Looking up ['state'] on a ttk widget
    # can return a Tcl object rather than a str, so comparing it with
    # 'disabled' is not reliable.
    if self.start_button.instate(['disabled']):
        logging.info("Start button clicked while disabled; user must select a log file location.")
        return "break"  # Prevent further event processing
    return None
def create_widgets(self):
    """Create and lay out every widget of the main window.

    Layout (rows of the main frame, top to bottom): parameter entries next to
    the trading-mode/counter/total panels, the control buttons, the price
    labels with the mid-price graph, the holdings panel, and the log pane.
    """
    main_frame = ttk.Frame(self.root, padding=15)
    main_frame.grid(row=0, column=0, sticky="nsew")
    self.root.columnconfigure(0, weight=1)
    self.root.rowconfigure(0, weight=1)
    main_frame.columnconfigure(0, weight=1)
    # Relative heights: parameters, control buttons, price info, holdings, log.
    for row, weight in enumerate((2, 1, 4, 2, 3)):
        main_frame.rowconfigure(row, weight=weight)

    # ---- Top frame: parameters (left) and mode/counter/total (right) -----
    top_frame = ttk.Frame(main_frame, padding=5)
    top_frame.grid(row=0, column=0, sticky="nsew", padx=10, pady=5)
    top_frame.columnconfigure(0, weight=3)
    top_frame.columnconfigure(1, weight=1)
    top_frame.rowconfigure(0, weight=1)

    input_frame = ttk.LabelFrame(top_frame, text="Parameters", padding=15)
    input_frame.grid(row=0, column=0, sticky="nsew", padx=(0, 10), pady=5)
    input_frame.columnconfigure(0, weight=1)
    input_frame.columnconfigure(1, weight=2)
    input_frame.columnconfigure(2, weight=0)
    for i in range(7):
        input_frame.rowconfigure(i, weight=1)

    # Every numeric entry is validated on each keystroke ('%P' is the text
    # the entry would hold if the edit were allowed).
    validate_cmd = (self.root.register(self.validate_positive_float), '%P')

    # (caption, attribute that receives the tk.DoubleVar, initial value)
    numeric_fields = (
        ("Sell Amount (KAU):", "sell_amount_var", 0.01),
        ("Loop Timing (seconds):", "loop_timing_var", 3),
        ("Bid-Ask Spread Threshold:", "spread_threshold_var", 0.12),
        ("KAU Threshold:", "kau_threshold_var", 0.100),
        ("Max Trade Amount (USD):", "max_usd_var", 1000.00),
        ("Sell Price (Bid Offset):", "sell_price_offset_var", 0.00),
    )
    for row, (caption, attr_name, initial) in enumerate(numeric_fields):
        ttk.Label(input_frame, text=caption).grid(row=row, column=0, sticky="w", padx=5, pady=5)
        variable = tk.DoubleVar(value=initial)
        setattr(self, attr_name, variable)
        ttk.Entry(input_frame, textvariable=variable, width=15, validate="key",
                  validatecommand=validate_cmd).grid(row=row, column=1, sticky="ew", padx=5, pady=5)

    # Log file selection: read-only display plus a Browse button.
    ttk.Label(input_frame, text="Log File:").grid(row=6, column=0, sticky="w", padx=5, pady=5)
    self.log_file_var = tk.StringVar(value="Select log file...")
    log_entry = ttk.Entry(input_frame, textvariable=self.log_file_var, width=15, state='readonly')
    log_entry.grid(row=6, column=1, sticky="ew", padx=5, pady=5)
    ttk.Button(input_frame, text="Browse", command=self.browse_log_file).grid(
        row=6, column=2, sticky="w", padx=5, pady=5)

    # Right-hand column: trading mode, trade counter, USD total.
    right_frame = ttk.Frame(top_frame, padding=5)
    right_frame.grid(row=0, column=1, sticky="nsew", padx=(10, 0), pady=5)
    right_frame.columnconfigure(0, weight=1)
    for row in range(3):
        right_frame.rowconfigure(row, weight=1)

    trading_mode_frame = ttk.LabelFrame(right_frame, text="Trading Mode", padding=15)
    trading_mode_frame.grid(row=0, column=0, sticky="nsew", padx=5, pady=5)
    trading_mode_frame.columnconfigure(0, weight=1)
    trading_mode_frame.rowconfigure(0, weight=1)
    self.dry_run_var = tk.BooleanVar(value=True)  # start in simulation mode
    self.dry_run_button = ttk.Button(trading_mode_frame, text="Dry Run Active", style="DryRun.TButton",
                                     command=self.toggle_dry_run)
    self.dry_run_button.grid(row=0, column=0, sticky="ew", padx=5, pady=5)

    trade_counter_frame = ttk.LabelFrame(right_frame, text="Trade Counter", padding=15)
    trade_counter_frame.grid(row=1, column=0, sticky="nsew", padx=5, pady=5)
    trade_counter_frame.columnconfigure(0, weight=1)
    trade_counter_frame.rowconfigure(0, weight=1)
    self.trade_counter_var = tk.StringVar(value="Successful Trades: 0")
    ttk.Label(trade_counter_frame, textvariable=self.trade_counter_var,
              style="Counter.TLabel").grid(row=0, column=0, sticky="ew")

    usd_total_frame = ttk.LabelFrame(right_frame, text="Total USD from KAU Sales", padding=15)
    usd_total_frame.grid(row=2, column=0, sticky="nsew", padx=5, pady=5)
    usd_total_frame.columnconfigure(0, weight=1)
    usd_total_frame.rowconfigure(0, weight=1)
    self.usd_total_var = tk.StringVar(value="USD Total: 0.00")
    ttk.Label(usd_total_frame, textvariable=self.usd_total_var,
              style="USDTotal.TLabel").grid(row=0, column=0, sticky="ew")

    # ---- Control buttons -------------------------------------------------
    button_frame = ttk.Frame(main_frame, padding=5)
    button_frame.grid(row=1, column=0, sticky="ew", padx=10, pady=5)
    for column, weight in enumerate((1, 1, 1, 2)):
        button_frame.columnconfigure(column, weight=weight)
    button_frame.rowconfigure(0, weight=1)

    # Start stays disabled until a log file has been chosen.
    self.start_button = ttk.Button(button_frame, text="Start", command=self.start_script,
                                   style="Start.TButton", state='disabled')
    self.start_button.grid(row=0, column=0, sticky="ew", padx=5, pady=5)
    self.start_button.bind("<Button-1>", self.on_start_button_click)
    ttk.Button(button_frame, text="Stop", command=self.stop_script, style="Stop.TButton").grid(
        row=0, column=1, sticky="ew", padx=5, pady=5)
    self.toggle_log_button = ttk.Button(button_frame, text="Hide Log", command=self.toggle_log)
    self.toggle_log_button.grid(row=0, column=2, sticky="ew", padx=5, pady=5)
    self.status_label = ttk.Label(button_frame, textvariable=self.status_var, style="Stopped.TLabel", width=12)
    self.status_label.grid(row=0, column=3, sticky="ew", padx=5, pady=5)

    # ---- Price information: text labels (left) and graph (right) ---------
    price_info_frame = ttk.LabelFrame(main_frame, text="KAU Price Information", padding=15)
    price_info_frame.grid(row=2, column=0, sticky="nsew", padx=10, pady=5)
    price_info_frame.columnconfigure(0, weight=1)
    price_info_frame.columnconfigure(1, weight=3)
    price_info_frame.rowconfigure(0, weight=1)

    labels_frame = ttk.Frame(price_info_frame)
    labels_frame.grid(row=0, column=0, sticky="nsw", padx=(0, 10))
    labels_frame.columnconfigure(0, minsize=150, weight=1)
    for i in range(4):
        labels_frame.rowconfigure(i, weight=1)

    self.bid_price_var = tk.StringVar(value="Bid Price: N/A")
    self.ask_price_var = tk.StringVar(value="Ask Price: N/A")
    self.spread_var = tk.StringVar(value="Spread: N/A")
    self.trend_var = tk.StringVar(value="Trend: N/A")
    price_vars = (self.bid_price_var, self.ask_price_var, self.spread_var, self.trend_var)
    for row, variable in enumerate(price_vars):
        ttk.Label(labels_frame, textvariable=variable, style="PriceInfo.TLabel").grid(
            row=row, column=0, sticky="ew", pady=5)

    graph_frame = ttk.Frame(price_info_frame)
    graph_frame.grid(row=0, column=1, sticky="nsew", padx=5)
    graph_frame.columnconfigure(0, weight=1)
    graph_frame.rowconfigure(0, weight=1)

    self.fig, self.ax = plt.subplots(figsize=(6, 3), dpi=80)
    self.ax.set_facecolor(self.graph_bg)
    self.fig.set_facecolor(self.graph_bg)
    self.ax.tick_params(axis='x', colors=self.text_color, labelsize=8)
    self.ax.tick_params(axis='y', colors=self.text_color, labelsize=8)
    for side in ('bottom', 'top', 'left', 'right'):
        self.ax.spines[side].set_color(self.text_color)
    self.line, = self.ax.plot([], [], color=self.graph_line_color, linewidth=1.5)
    self.ax.set_xlabel("Time", color=self.text_color, fontsize=8)
    self.ax.set_ylabel("Mid Price (USD)", color=self.text_color, fontsize=8)
    self.canvas = FigureCanvasTkAgg(self.fig, master=graph_frame)
    self.canvas.get_tk_widget().grid(row=0, column=0, sticky="nsew")
    # Lay out this figure explicitly instead of relying on pyplot's notion of
    # the "current" figure.
    self.fig.tight_layout()

    # ---- Holdings --------------------------------------------------------
    self.holdings_frame = ttk.LabelFrame(main_frame, text="Current Holdings", padding=15)
    self.holdings_frame.grid(row=3, column=0, sticky="nsew", padx=10, pady=5)
    self.holdings_frame.columnconfigure(0, weight=1)
    self.holdings_frame.rowconfigure(0, weight=1)
    # Keep the Label object itself: .grid() returns None, so chaining it into
    # the list (as before) stored [None] instead of the widget.
    placeholder = ttk.Label(self.holdings_frame, text="No holdings data", style="Holdings.TLabel")
    placeholder.grid(row=0, column=0, sticky="ew")
    self.holdings_labels = [placeholder]

    # ---- Log display -----------------------------------------------------
    self.log_text = scrolledtext.ScrolledText(main_frame, height=8, state='disabled', **self.log_text_style)
    self.log_text.grid(row=4, column=0, sticky="nsew", padx=10, pady=5)
def browse_log_file(self):
    """Let the user choose the log file and route log output to it.

    On a successful selection the previous file handler (if any) is detached
    and closed, a new one is attached to the root logger, and the Start button
    is enabled. Cancelling the dialog leaves an earlier valid selection in
    place; Start is only (re)disabled when no log file has ever been chosen.
    """
    global file_handler

    suggested_name = f"trading_log_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.txt"
    file_path = filedialog.asksaveasfilename(
        initialdir=log_directory,
        initialfile=suggested_name,
        title="Select Log File Location",
        filetypes=[("Text files", "*.txt"), ("All files", "*.*")],
        defaultextension=".txt"
    )

    if not file_path:
        # Dialog cancelled. An existing handler keeps writing to the earlier
        # file, so only reset the UI when there is no earlier selection.
        if self.log_file_path is None:
            self.log_file_var.set("Select log file...")
            self.start_button.configure(state='disabled')
        return

    folder = os.path.dirname(file_path)
    if folder:
        os.makedirs(folder, exist_ok=True)

    # Open the new file first so a failure leaves the current setup intact.
    new_handler = logging.FileHandler(file_path)
    new_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))

    if file_handler:
        logger.removeHandler(file_handler)
        file_handler.close()  # release the previous file instead of leaking it
    file_handler = new_handler
    logger.addHandler(file_handler)

    self.log_file_path = file_path
    self.log_file_var.set(os.path.basename(file_path))
    logging.info(f"Log file set to: {file_path}")
    self.start_button.configure(state='normal')
def toggle_dry_run(self):
    """Flip the trading mode and relabel/restyle the mode button to match."""
    dry_run_now = not self.dry_run_var.get()
    self.dry_run_var.set(dry_run_now)
    if not dry_run_now:
        self.dry_run_button.configure(text="Live Trading", style="LiveTrading.TButton")
        return
    self.dry_run_button.configure(text="Dry Run Active", style="DryRun.TButton")
def toggle_log(self):
    """Switch the log pane between shown and hidden and relabel the button."""
    if not self.log_visible:
        # Currently hidden: put the pane back in its grid cell.
        self.log_text.grid(row=4, column=0, sticky="nsew", padx=10, pady=5)
        self.toggle_log_button.configure(text="Hide Log")
        self.log_visible = True
        return
    # Currently shown: take it out of the layout.
    self.log_text.grid_remove()
    self.toggle_log_button.configure(text="Show Log")
    self.log_visible = False
def process_log_queue(self):
    """Drain pending log lines into the log pane, then reschedule itself.

    Lines dequeued while the pane is hidden are not written to it.
    """
    while True:
        try:
            line = log_queue.get_nowait()
        except queue.Empty:
            break
        if not self.log_visible:
            continue
        # The text widget is kept read-only; unlock it just for the insert.
        self.log_text.configure(state='normal')
        self.log_text.insert(tk.END, line + '\n')
        self.log_text.see(tk.END)
        self.log_text.configure(state='disabled')
    # Poll again in 100 ms.
    self.root.after(100, self.process_log_queue)
def update_holdings_display(self, holdings):
    """Rebuild the holdings panel from *holdings*.

    *holdings* is treated as a mapping of currency -> mapping with an
    'available' entry; only currencies whose available amount is greater
    than zero are listed. A falsy *holdings* shows a placeholder instead.
    """
    panel = self.holdings_frame
    for child in panel.winfo_children():
        child.destroy()
    self.holdings_labels = []

    def show_single_message(message):
        # One label in the first row, used for both placeholder texts.
        note = ttk.Label(panel, text=message, style="Holdings.TLabel")
        note.grid(row=0, column=0, sticky="ew")
        self.holdings_labels.append(note)

    if not holdings:
        show_single_message("No holdings data")
        return

    positive_balances = []
    for currency, data in holdings.items():
        if float(data.get('available', 0)) > 0:
            positive_balances.append((currency, float(data['available'])))

    if not positive_balances:
        show_single_message("No non-zero holdings")
        return

    for row, (currency, balance) in enumerate(positive_balances):
        panel.rowconfigure(row, weight=1)
        entry = ttk.Label(panel, text=f"{currency}: {balance:.5f}", style="Holdings.TLabel")
        entry.grid(row=row, column=0, sticky="ew", pady=2)
        self.holdings_labels.append(entry)
def update_price_display(self, bid_price, ask_price, spread):
    """Refresh the price labels, the trend indicator and the mid-price graph.

    Args:
        bid_price (float | None): Current bid, or None when unavailable.
        ask_price (float | None): Current ask, or None when unavailable.
        spread (float | None): ask - bid, or None when unavailable.

    When both prices are present their midpoint is appended to
    ``self.mid_prices`` and compared with the previous midpoint to derive the
    trend text. The graph is then redrawn from the full history.
    """
    self.bid_price_var.set(f"Bid Price: {bid_price:.2f}" if bid_price is not None else "Bid Price: N/A")
    self.ask_price_var.set(f"Ask Price: {ask_price:.2f}" if ask_price is not None else "Ask Price: N/A")
    self.spread_var.set(f"Spread: {spread:.2f}" if spread is not None else "Spread: N/A")

    if bid_price is not None and ask_price is not None:
        mid_price = (bid_price + ask_price) / 2
        self.mid_prices.append(mid_price)
        if self.previous_mid_price is None:
            trend = "N/A"
        elif mid_price > self.previous_mid_price:
            trend = "Up"
        elif mid_price < self.previous_mid_price:
            trend = "Down"
        else:
            trend = "Stable"
        self.trend_var.set(f"Trend: {trend}")
        self.previous_mid_price = mid_price
    else:
        self.trend_var.set("Trend: N/A")

    # Redraw from scratch. clear() resets the axes (styling and limits), so
    # everything below has to be re-applied after it.
    self.ax.clear()
    self.ax.set_facecolor(self.graph_bg)
    self.ax.plot(range(len(self.mid_prices)), self.mid_prices, color=self.graph_line_color, linewidth=1.5)
    self.ax.set_xlabel("Time", color=self.text_color, fontsize=8)
    self.ax.set_ylabel("Mid Price (USD)", color=self.text_color, fontsize=8)
    self.ax.tick_params(axis='x', colors=self.text_color, labelsize=8)
    self.ax.tick_params(axis='y', colors=self.text_color, labelsize=8)
    for side in ('bottom', 'top', 'left', 'right'):
        self.ax.spines[side].set_color(self.text_color)
    self.ax.grid(True, color=self.text_color, alpha=0.2)

    # Apply the y-range AFTER clear(); setting it beforehand (as the code
    # used to) was undone by the clear and the margins never took effect.
    if self.mid_prices:
        lowest = min(self.mid_prices)
        highest = max(self.mid_prices)
        # At least 0.1 of headroom so a flat series is still visible.
        margin = max(0.1, (highest - lowest) * 0.05)
        self.ax.set_ylim(lowest - margin, highest + margin)
    else:
        self.ax.set_ylim(0, 1)

    self.canvas.draw()
def update_trade_counter(self, count):
    """Record *count* as the number of successful trades and show it."""
    self.trade_count = count
    counter_text = f"Successful Trades: {count}"
    self.trade_counter_var.set(counter_text)
def update_usd_total(self, order_amount, limit_price):
    """Add the proceeds of one sale (amount x price) to the running USD total."""
    self.usd_total += order_amount * limit_price
    total_text = f"USD Total: {self.usd_total:.2f}"
    self.usd_total_var.set(total_text)
def start_price_updates(self):
    """Launch the daemon thread that keeps polling prices."""
    poller = threading.Thread(target=self.run_price_updates, daemon=True)
    self.price_thread = poller
    poller.start()
    logging.info("Started continuous price updates")
def run_price_updates(self):
    """Poll the KAU_USD price about once per second and push it to the display.

    Runs until ``self.price_thread_running`` becomes false. Any failure
    (bad response, zero bid/ask, exception) blanks the display with None
    values and is logged.
    """
    symbol = 'KAU_USD'
    while self.price_thread_running:
        try:
            reply = getPrice(symbol)
            if not (reply and reply.status_code == 200):
                logging.error("Failed to fetch price in price update")
                self.update_price_display(None, None, None)
            else:
                quote = reply.json()
                bid = float(quote.get("bid", 0))
                ask = float(quote.get("ask", 0))
                # A zero on either side means the quote is missing/unusable.
                if bid == 0 or ask == 0:
                    logging.error("Invalid bid or ask price received in price update")
                    self.update_price_display(None, None, None)
                else:
                    self.update_price_display(bid, ask, ask - bid)
        except Exception as e:
            logging.error(f"Error in price update: {e}")
            self.update_price_display(None, None, None)
        time.sleep(1)
def on_closing(self):
    """Shut down on window close: stop both workers, destroy the GUI, exit."""
    logging.info("Closing application...")

    # Ask the price poller to finish and give it up to a second to do so.
    self.price_thread_running = False
    poller = self.price_thread
    if poller and poller.is_alive():
        poller.join(timeout=1.0)

    # Stop trading (this also waits briefly for the trading thread).
    self.stop_script()

    self.root.destroy()
    logging.info("Application closed, exiting process.")
    # Terminate the interpreter explicitly with a success status.
    raise SystemExit(0)
def start_script(self):
    """Start the trading loop in a background thread if it is not running.

    Does nothing when trading is already active. Also refuses to start while
    a trading thread from a previous run is still alive (for example, still
    sleeping between iterations after Stop was pressed): that thread re-checks
    ``self.running`` when it wakes, so starting now would leave two loops
    placing orders at once.
    """
    if self.running:
        return

    previous = self.thread
    if previous is not None and previous.is_alive():
        logging.warning("Previous trading thread is still shutting down; please try again in a moment.")
        return

    self.running = True
    # Fresh run: reset the totals shown in the GUI.
    self.usd_total = 0.0
    self.usd_total_var.set("USD Total: 0.00")
    self.trade_count = 0
    self.update_trade_counter(0)
    self.status_var.set("Running")
    self.status_label.configure(style="Running.TLabel")

    self.thread = threading.Thread(target=self.run_trading_script)
    self.thread.daemon = True
    self.thread.start()
    logging.info("Trading script started")
    # Start checking for thread completion
    self.check_thread_status()


def stop_script(self):
    """Signal the trading loop to stop, wait briefly for it, and clean up.

    Does nothing when trading is not active. The thread handle is only
    dropped once the thread has really finished; if it is still alive after
    the one-second wait the handle is kept so ``start_script`` can detect it.
    """
    if not self.running:
        return

    self.running = False  # Signal the trading thread to stop
    worker = self.thread
    # Never join the current thread with itself.
    if worker and worker.is_alive() and threading.current_thread() != worker:
        worker.join(timeout=1.0)  # Wait for the trading thread to terminate
    if worker is None or not worker.is_alive():
        self.thread = None
    self.cleanup_after_stop()
def cleanup_after_stop(self):
    """Reset the counters and flip the status badge once trading has stopped."""
    logging.info("Trading script stopped")

    # Zero the per-run statistics.
    self.update_trade_counter(0)
    self.usd_total = 0.0
    self.usd_total_var.set("USD Total: 0.00")

    # Show the red "Stopped" badge.
    self.status_var.set("Stopped")
    self.status_label.configure(style="Stopped.TLabel")
def check_thread_status(self):
    """
    Detect a trading thread that ended by itself and tidy up after it.

    While trading is flagged as running, this re-arms itself every 500 ms.
    """
    worker = self.thread
    if worker and self.running and not worker.is_alive():
        # The loop exited on its own while we still thought it was running.
        self.running = False
        self.thread = None
        self.cleanup_after_stop()

    if not self.running:
        return
    self.root.after(500, self.check_thread_status)
def run_trading_script(self):
    """
    Execute the main trading loop, placing sell orders based on user parameters.

    Each iteration re-reads the GUI parameters, fetches holdings, copies the
    available USD balance minus 10 to the clipboard, and - when the KAU
    balance exceeds the threshold and the bid-ask spread is below the spread
    threshold - places (or, in dry-run mode, simulates) a limit sell order at
    bid + offset. The loop ends when the accumulated USD total reaches the
    configured maximum, when ``self.running`` is cleared, or when the
    application is shutting down.
    """
    pair_name = 'KAU_USD'
    price_precision = 2  # Decimal places for price calculations
    amount_precision = 5  # Decimal places for order amounts
    trade_count = 0
    # Pause used by the error handler if reading the parameters fails before
    # the loop timing has ever been read (otherwise the name would be unbound
    # there and the resulting NameError would kill the thread).
    CHECK_INTERVAL_SECONDS = 3.0
    while self.running and self.price_thread_running:  # Check if shutting down
        try:
            # Dynamically retrieve parameters on each iteration
            sendAmount = self.sell_amount_var.get()
            CHECK_INTERVAL_SECONDS = self.loop_timing_var.get()
            spread_threshold = self.spread_threshold_var.get()
            kau_threshold = self.kau_threshold_var.get()
            DRY_RUN = self.dry_run_var.get()
            max_usd = self.max_usd_var.get()
            sell_price_offset = self.sell_price_offset_var.get()

            logging.info("Fetching account holdings...")
            holdings_response = getHoldings()
            if holdings_response is None or holdings_response.status_code != 200:
                logging.error("Failed to fetch holdings")
                self.update_holdings_display({})
                time.sleep(CHECK_INTERVAL_SECONDS)
                continue
            holdings = holdings_response.json()
            logging.info(f"Holdings: {holdings}")
            self.update_holdings_display(holdings)

            kau_balance = 0.0
            if 'KAU' in holdings:
                kau_balance = float(holdings['KAU'].get('available', 0))
            logging.info(f"KAU available balance: {kau_balance}")

            usd_balance = 0.0
            if 'USD' in holdings:
                usd_balance = float(holdings['USD'].get('available', 0))
            usd_balance = round(usd_balance, 2) - 10
            logging.info(f"Available USD balance (after subtracting 10): {usd_balance:.2f} USD")
            # Copy a two-decimal amount; str() on the raw float can yield
            # artefacts such as 0.9900000000000002.
            pyperclip.copy(f"{usd_balance:.2f}")
            logging.info("USD balance copied to clipboard")

            if kau_balance > kau_threshold:
                logging.info(f"KAU balance ({kau_balance}) > {kau_threshold}. Checking bid-ask spread...")
                price_response = getPrice(pair_name)
                if price_response is None or price_response.status_code != 200:
                    logging.error(f"Failed to fetch price for {pair_name}")
                    time.sleep(CHECK_INTERVAL_SECONDS)
                    continue
                price_data = price_response.json()
                bid_price = float(price_data.get("bid", 0))
                ask_price = float(price_data.get("ask", 0))
                if bid_price == 0 or ask_price == 0:
                    logging.error("Invalid bid or ask price received")
                    time.sleep(CHECK_INTERVAL_SECONDS)
                    continue
                logging.info(f"Current bid price: {bid_price:.2f}")
                logging.info(f"Current ask price: {ask_price:.2f}")
                spread = ask_price - bid_price
                logging.info(f"Bid-ask spread: {spread:.2f}")

                if spread < spread_threshold:
                    logging.info(f"Spread ({spread:.2f}) < {spread_threshold}. Placing sell order.")
                    # Apply sell price offset to bid price
                    limit_price = round(bid_price + sell_price_offset, price_precision)
                    logging.info(f"Limit price (bid + offset {sell_price_offset:.2f}): {limit_price:.2f}")
                    order_amount = round(sendAmount, amount_precision)
                    if order_amount > kau_balance:
                        logging.error(f"Insufficient balance ({kau_balance} KAU) to sell {order_amount} KAU")
                        time.sleep(CHECK_INTERVAL_SECONDS)
                        continue
                    logging.info(f"Placing sell order for {order_amount} KAU at {limit_price:.2f} USD...")
                    if DRY_RUN:
                        logging.info(f"[DRY RUN] Would sell {order_amount} KAU at {limit_price:.2f} USD")
                        order_id = "dry-run-order-id"
                    else:
                        order_id = placeOrder(pair_name, 'sell', order_amount, 'limit', priceLimit=limit_price)
                    if order_id:
                        logging.info(f"Order placed. ID: {order_id}")
                        self.update_usd_total(order_amount, limit_price)
                        logging.info(f"Total USD traded: {self.usd_total:.2f}")
                        trade_count += 1
                        self.update_trade_counter(trade_count)
                        if self.usd_total >= max_usd:
                            logging.info(f"Reached max trade amount of {max_usd:.2f} USD. Stopping.")
                            # Just leave the loop. self.running is left set on
                            # purpose: check_thread_status only performs the
                            # stop/cleanup (status badge, counters) when it
                            # finds a finished thread while running is still
                            # true. Clearing it here left the GUI stuck on
                            # "Running".
                            break
                    else:
                        logging.error("Failed to place order")
                else:
                    logging.info(f"Spread ({spread:.2f}) >= {spread_threshold}. Skipping.")
            else:
                logging.info(f"KAU balance ({kau_balance}) <= {kau_threshold}. Skipping.")

            logging.info(f"Sleeping for {CHECK_INTERVAL_SECONDS} seconds...")
            time.sleep(CHECK_INTERVAL_SECONDS)
        except Exception as e:
            # Keep the loop alive on any per-iteration failure.
            logging.error(f"Error: {e}")
            self.update_holdings_display({})
            time.sleep(CHECK_INTERVAL_SECONDS)
            continue
# API Functions
def separator():
    """Write a blank line, a ``=====`` rule, and another blank line to stdout."""
    for row in ('', '=====', ''):
        print(row)
def getNonce():
    """Return the current UTC time as a string of whole milliseconds since the epoch."""
    now_utc = datetime.datetime.now(tz=timezone.utc)
    epoch_ms = round(now_utc.timestamp() * 1000)
    return str(epoch_ms)
def getAuthHeader(method, url, content=''):
    """
    Build the signed headers sent with a Kinesis API request.

    The signature is the upper-cased hex HMAC-SHA256 of
    nonce + method + url + content (concatenated with no separators),
    keyed with the module-level ``privateKey``.

    Args:
        method (str): HTTP method (e.g., 'GET', 'POST', 'DELETE').
        url (str): API endpoint path being requested.
        content (str): Serialized request body (default '').
    Returns:
        dict: ``x-nonce``, ``x-api-key`` and ``x-signature`` entries, plus
        ``Content-Type: application/json`` for every method except 'DELETE'.
    """
    nonce = getNonce()
    signing_input = "".join(map(str, (nonce, method, url, content))).encode('UTF-8')
    secret = bytes(privateKey, 'UTF-8')
    signature = hmac.new(secret, signing_input, hashlib.sha256).hexdigest().upper()
    headers = {"x-nonce": nonce, "x-api-key": publicKey, "x-signature": signature}
    # DELETE requests are sent without a Content-Type header.
    if method != 'DELETE':
        headers["Content-Type"] = "application/json"
    return headers
def cancelOrder(orderNo):
    """Issue a DELETE for the given order id.

    Request failures are logged, not raised.

    Returns:
        The ``requests`` response object, or None when the request raised
        before any response was obtained.
    """
    endpoint = "/v1/exchange/orders/" + orderNo
    auth_headers = getAuthHeader('DELETE', endpoint)
    result = None
    try:
        result = requests.delete(base_url + endpoint, headers=auth_headers)
        result.raise_for_status()
    except requests.exceptions.RequestException as exc:
        logging.error(f"Error canceling order {orderNo}: {exc}")
    else:
        logging.info(f"Order {orderNo} canceled successfully")
    return result
def placeOrder(symbol, orderDirection, orderAmount, OrderType, priceLimit=0):
    """Place a new order on the Kinesis exchange.

    Args:
        symbol: Value sent as ``currencyPairId``.
        orderDirection: Value sent as ``direction`` (the caller in this file passes 'sell').
        orderAmount: Order size; converted to ``float`` before serialization.
        OrderType: Value sent as ``orderType`` (the caller in this file passes 'limit').
        priceLimit: Value sent as ``limitPrice`` (default 0).

    Returns:
        The ``id`` field of the JSON response, or None when the request
        fails or the response carries no ``id``.
    """
    url = '/v1/exchange/orders'
    payload = {
        'currencyPairId': symbol,
        'direction': orderDirection,
        'amount': float(orderAmount),
        'orderType': OrderType,
        'limitPrice': priceLimit
    }
    # The exact same serialized string is both signed and sent as the body.
    payload = json.dumps(payload, separators=(',', ':'))
    logging.info(f"Placing order: {payload}")
    headersAuth = getAuthHeader('POST', url, payload)
    response = None
    try:
        response = requests.post(base_url + url, data=payload, headers=headersAuth)
        response.raise_for_status()
        arr = response.json()
    except requests.exceptions.RequestException as err:
        # A requests Response is falsy for 4xx/5xx statuses, so compare with
        # None explicitly; otherwise the server's error body is never logged.
        detail = response.text if response is not None else 'No response'
        logging.error(f"Error placing order: {err} - Response: {detail}")
        return None
    logging.info(f"Order response: {arr}")
    order_id = arr.get("id")
    if order_id:
        logging.info(f"Order placed successfully: {order_id}")
        return order_id
    logging.error("Order ID not found in response")
    return None
def getHoldings():
    """Request the account holdings endpoint.

    Request failures are logged, not raised.

    Returns:
        The ``requests`` response object, or None when the request raised
        before any response was obtained.
    """
    endpoint = '/v1/exchange/holdings'
    auth_headers = getAuthHeader('GET', endpoint)
    result = None
    try:
        result = requests.get(base_url + endpoint, headers=auth_headers)
        result.raise_for_status()
    except requests.exceptions.RequestException as exc:
        logging.error(f"Error fetching holdings: {exc}")
    return result
def getPairs():
    """Request the trading-pairs endpoint.

    Request failures are logged, not raised.

    Returns:
        The ``requests`` response object, or None when the request raised
        before any response was obtained.
    """
    endpoint = '/v1/exchange/pairs'
    auth_headers = getAuthHeader('GET', endpoint)
    result = None
    try:
        result = requests.get(base_url + endpoint, headers=auth_headers)
        result.raise_for_status()
    except requests.exceptions.RequestException as exc:
        logging.error(f"Error fetching pairs: {exc}")
    return result
def getDepth(pair):
    """Request the order-book depth endpoint for ``pair``.

    Request failures are logged, not raised.

    Returns:
        The ``requests`` response object, or None when the request raised
        before any response was obtained.
    """
    endpoint = '/v1/exchange/depth/' + pair
    auth_headers = getAuthHeader('GET', endpoint)
    result = None
    try:
        result = requests.get(base_url + endpoint, headers=auth_headers)
        result.raise_for_status()
    except requests.exceptions.RequestException as exc:
        logging.error(f"Error fetching depth for {pair}: {exc}")
    return result
def getOHLC(pair, fromDate='2022-01-01T00:00:00.000Z', toDate='2022-03-31T00:00:00.000Z', timeFrame='60'):
    """Request OHLC (Open, High, Low, Close) data for ``pair``.

    Args:
        pair: Pair identifier appended to the endpoint path.
        fromDate: Sent as the ``fromDate`` query parameter.
        toDate: Sent as the ``toDate`` query parameter.
        timeFrame: Sent as the ``timeFrame`` query parameter.

    Returns:
        The ``requests`` response object, or None when the request raised
        before any response was obtained. Failures are logged, not raised.
    """
    endpoint = '/v1/exchange/ohlc/' + pair
    # The signature is computed over the bare path; the query string is
    # attached by requests afterwards.
    auth_headers = getAuthHeader('GET', endpoint)
    query = {'timeFrame': timeFrame, 'fromDate': fromDate, 'toDate': toDate}
    result = None
    try:
        result = requests.get(base_url + endpoint, headers=auth_headers, params=query)
        result.raise_for_status()
    except requests.exceptions.RequestException as exc:
        logging.error(f"Error fetching OHLC for {pair}: {exc}")
    return result
def getPrice(pair):
    """Request the mid-price endpoint for ``pair``.

    Request failures are logged, not raised.

    Returns:
        The ``requests`` response object, or None when the request raised
        before any response was obtained.
    """
    endpoint = '/v1/exchange/mid-price/' + pair
    auth_headers = getAuthHeader('GET', endpoint)
    result = None
    try:
        result = requests.get(base_url + endpoint, headers=auth_headers)
        result.raise_for_status()
    except requests.exceptions.RequestException as exc:
        logging.error(f"Error fetching price for {pair}: {exc}")
    return result
def getStatement(pair=''):
    """Request the account balance statement, optionally for one ``pair``.

    When ``pair`` is empty the unscoped statement endpoint is used;
    otherwise ``pair`` is appended to the path. Failures are logged,
    not raised.

    Returns:
        The ``requests`` response object, or None when the request raised
        before any response was obtained.
    """
    statement_path = '/v1/exchange/reporting/account-balance-statement'
    endpoint = f'{statement_path}/{pair}' if pair else statement_path
    auth_headers = getAuthHeader('GET', endpoint)
    result = None
    try:
        result = requests.get(base_url + endpoint, headers=auth_headers)
        result.raise_for_status()
    except requests.exceptions.RequestException as exc:
        logging.error(f"Error fetching statement: {exc}")
    return result
def getOpenOrders():
    """Request the open-orders endpoint.

    Request failures are logged, not raised.

    Returns:
        The ``requests`` response object, or None when the request raised
        before any response was obtained.
    """
    endpoint = '/v1/exchange/orders/open'
    auth_headers = getAuthHeader('GET', endpoint)
    result = None
    try:
        result = requests.get(base_url + endpoint, headers=auth_headers)
        result.raise_for_status()
    except requests.exceptions.RequestException as exc:
        logging.error(f"Error fetching open orders: {exc}")
    return result
def sendToWallet(sendAmount, pair, wallet, memo='Send to wallet'):
    """POST a withdrawal-to-address request and return the raw response.

    Args:
        sendAmount: Sent as ``amount``.
        pair: Sent as ``currencyCode``.
        wallet: Sent as ``address``.
        memo: Sent as ``memo``.

    Returns:
        The ``requests`` response object. The status is not checked and
        request exceptions are not caught here.
    """
    endpoint = '/v1/exchange/withdrawals/address'
    body = json.dumps(
        {
            'amount': sendAmount,
            'currencyCode': pair,
            'address': wallet,
            'memo': memo,
        },
        separators=(',', ':'),
    )
    auth_headers = getAuthHeader('POST', endpoint, body)
    return requests.post(base_url + endpoint, data=body, headers=auth_headers)
def sendToEmail(sendAmount, pair, email, description='Send to email'):
    """POST a withdrawal-to-email request and return the raw response.

    Args:
        sendAmount: Sent as ``amount``.
        pair: Sent as ``currencyCode``.
        email: Sent as ``receiverEmail``.
        description: Sent as ``description``.

    Returns:
        The ``requests`` response object. The status is not checked and
        request exceptions are not caught here.
    """
    endpoint = '/v1/exchange/withdrawals/email'
    body = json.dumps(
        {
            'amount': sendAmount,
            'currencyCode': pair,
            'receiverEmail': email,
            'description': description,
        },
        separators=(',', ':'),
    )
    auth_headers = getAuthHeader('POST', endpoint, body)
    return requests.post(base_url + endpoint, data=body, headers=auth_headers)
def bidAskCheck(Symbol):
    """Log a warning when the bid price exceeds the ask price for a pair.

    Args:
        Symbol: Pair identifier passed through to ``getPrice``.

    Returns:
        None. When no usable price response is available the check is
        skipped and an error is logged.
    """
    response = getPrice(Symbol)
    # getPrice hands back None when the request produced no response, and a
    # non-OK response when the server answered with an error status; neither
    # carries the "bid"/"ask" fields read below.
    if response is None or not response.ok:
        logging.error(f"Price unavailable for {Symbol}; skipping bid/ask check")
        return
    price = response.json()
    ask_price = price["ask"]
    bid_price = price["bid"]
    if bid_price > ask_price:
        today = datetime.datetime.now(timezone.utc)
        logging.warning(f"{today} {Symbol} bid: {bid_price} ask: {ask_price}")
        # NOTE(review): the pause is assumed to apply only after a warning;
        # confirm against the original file's indentation.
        time.sleep(5)
if __name__ == "__main__":
    # Script entry point: create the Tk root window, attach the trading UI
    # to it, then block in the Tk event loop until the window is closed.
    root = tk.Tk()
    app = KAUTradingApp(root)  # keep a reference for the lifetime of the loop
    root.mainloop()
requests==2.31.0
pandas==2.2.2
python-dotenv==1.0.1
pyperclip==1.9.0
matplotlib==3.8.4
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment