Skip to content

Instantly share code, notes, and snippets.

@EvilFreelancer
Last active April 20, 2025 12:00
Show Gist options
  • Save EvilFreelancer/71382b29b58e5d23d38b22056661a2ce to your computer and use it in GitHub Desktop.
Save EvilFreelancer/71382b29b58e5d23d38b22056661a2ce to your computer and use it in GitHub Desktop.
LiteLLM usage (total, prompt, completion) Prometheus exporter, broken down by user and by LLM model
from prometheus_client import start_http_server, Counter
import os
import psycopg2
import time
from datetime import datetime, timedelta, UTC
import json
# Configuration sourced from the process environment.
# PostgreSQL connection settings; name, user and password have no default.
DB_NAME = os.environ.get("POSTGRES_DB")
DB_USER = os.environ.get("POSTGRES_USER")
DB_PASSWORD = os.environ.get("POSTGRES_PASSWORD")
DB_HOST = os.environ.get("POSTGRES_HOST", "localhost")
DB_PORT = os.environ.get("POSTGRES_PORT", 5432)
# Port passed to start_http_server() for the metrics endpoint.
EXPORTER_PORT = int(os.environ.get("EXPORTER_PORT", 9090))
# Files in which the exporter keeps its checkpoint and state between polls.
CHECKPOINT_FILE = os.environ.get("CHECKPOINT_FILE", ".checkpoint")
STATE_FILE = os.environ.get("STATE_FILE", ".state.json")
# Open the PostgreSQL connection at import time, together with the single
# cursor that collect_metrics() reuses for every poll.
conn = psycopg2.connect(
    host=DB_HOST,
    port=DB_PORT,
    dbname=DB_NAME,
    user=DB_USER,
    password=DB_PASSWORD,
)
cursor = conn.cursor()
# Prometheus counter with one series per (user_email, model, type) label set;
# collect_metrics() uses the type values "prompt", "completion" and "total".
tokens_by_user_counter = Counter(
    name='llm_tokens_by_user_total',
    documentation='Total tokens used per user_email and model (cumulative)',
    labelnames=['user_email', 'model', 'type'],
)
# State management
# Token values keyed by the "user_email:model:type" strings built by get_key().
state = {}


def load_state():
    """Populate the module-level ``state`` dict from ``STATE_FILE``.

    ``state`` is reset to an empty dict when the file does not exist, does
    not contain valid JSON, or contains JSON that is not an object.
    """
    global state
    try:
        # Open directly instead of checking os.path.exists() first: the file
        # could disappear between the check and the open.
        with open(STATE_FILE, "r") as f:
            loaded = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        loaded = {}
    # Valid JSON such as `[]` or `null` would otherwise be stored here and
    # break the `state.get(...)` lookups done while collecting metrics.
    state = loaded if isinstance(loaded, dict) else {}
def save_state():
    """Write the module-level ``state`` dict to ``STATE_FILE`` as JSON."""
    with open(STATE_FILE, "w") as state_file:
        state_file.write(json.dumps(state))
def get_key(user_email, model, token_type):
    """Build the ``state`` lookup key for one user/model/token-type series.

    The three parts are formatted as text and joined with ``:`` in argument
    order.
    """
    parts = (user_email, model, token_type)
    return ":".join(f"{part}" for part in parts)
# Load checkpoint from file
def load_checkpoint():
if os.path.exists(CHECKPOINT_FILE):
with open(CHECKPOINT_FILE, "r") as f:
return f.read().strip()
# fallback: last 5 minutes
fallback_time = datetime.now(UTC) - timedelta(minutes=5)
return fallback_time.strftime("%Y-%m-%d %H:%M:%S")
def save_checkpoint(ts: str):
    """Overwrite ``CHECKPOINT_FILE`` with the timestamp string ``ts``."""
    with open(CHECKPOINT_FILE, "w") as checkpoint_file:
        checkpoint_file.write(ts)
# Collect metrics from the database
def collect_metrics():
    """Add the token usage logged since the last checkpoint to the counter.

    Reads the spend-log rows whose ``endTime`` is later than the stored
    checkpoint, summed per user/API key/model, and increments
    ``tokens_by_user_counter`` by those sums. ``state`` accumulates the same
    amounts per series key. When rows were found, the newest ``endTime``
    becomes the new checkpoint and ``state`` is written to disk.

    Raises:
        Any error raised while querying the database or writing the
        checkpoint/state files is propagated; the database transaction is
        rolled back in that case.
    """
    last_ts = load_checkpoint()
    query = """
        SELECT
            COALESCE(u.user_email, SUBSTRING(sl.api_key, 1, 4) || '...' || RIGHT(sl.api_key, 4)) AS user_email,
            sl.model,
            COALESCE(SUM(sl.prompt_tokens), 0) AS prompt,
            COALESCE(SUM(sl.completion_tokens), 0) AS completion,
            COALESCE(SUM(sl.total_tokens), 0) AS total,
            MAX(sl."endTime") as max_ts
        FROM public."LiteLLM_SpendLogs" sl
        LEFT JOIN public."LiteLLM_VerificationToken" vt ON sl.api_key = vt.token
        LEFT JOIN public."LiteLLM_UserTable" u ON vt.user_id = u.user_id
        WHERE sl."endTime" > %s
        GROUP BY u.user_email, sl.api_key, sl.model
    """
    # `with conn` commits after the read (or rolls back on error) so the
    # connection is not left idle inside an open transaction between polls.
    with conn:
        cursor.execute(query, (last_ts,))
        rows = cursor.fetchall()

    new_checkpoint = None
    for user_email, model, prompt, completion, total, max_ts in rows:
        user_email = user_email or "unknown"
        model = model or "unknown"
        usage = (("prompt", prompt), ("completion", completion), ("total", total))
        for token_type, value in usage:
            # The query only covers rows newer than the checkpoint, so each
            # sum is already the new usage and is added as-is. Diffing it
            # against the previous poll's sum would drop any poll whose usage
            # is not larger than the one before it. Adding also keeps rows
            # that share a user/model (several API keys) from overwriting
            # each other.
            if value > 0:
                tokens_by_user_counter.labels(
                    user_email=user_email, model=model, type=token_type
                ).inc(value)
                key = get_key(user_email, model, token_type)
                state[key] = state.get(key, 0) + value
        if max_ts and (new_checkpoint is None or max_ts > new_checkpoint):
            new_checkpoint = max_ts

    if new_checkpoint:
        # Keep fractional seconds: a checkpoint truncated to whole seconds
        # sorts before the rows it came from, so they would be read again.
        save_checkpoint(new_checkpoint.strftime("%Y-%m-%d %H:%M:%S.%f"))
        save_state()
if __name__ == '__main__':
    # Start serving the metrics endpoint before the first collection pass.
    start_http_server(EXPORTER_PORT)
    startup_lines = (
        f"Exporter running on :{EXPORTER_PORT}",
        f"Checkpoint file: {CHECKPOINT_FILE}",
        f"State file: {STATE_FILE}",
    )
    for line in startup_lines:
        print(line)
    load_state()
    # Poll the database forever, pausing 30 seconds between passes.
    while True:
        collect_metrics()
        time.sleep(30)
# Container image for the exporter script, which is copied in as app.py.
FROM python:3.11-slim
# Python block-buffers stdout when it is not a terminal, so the script's
# startup print() lines could otherwise never show up in the container logs.
ENV PYTHONUNBUFFERED=1
WORKDIR /app
RUN pip install --no-cache-dir prometheus_client psycopg2-binary
COPY app.py .
# 9090 is the script's default EXPORTER_PORT.
EXPOSE 9090
CMD ["python", "app.py"]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment