Skip to content

Instantly share code, notes, and snippets.

@pfn
Last active April 1, 2026 15:37
Show Gist options
  • Select an option

  • Save pfn/6d3ed2dc05bcf40f288f9c90b1680176 to your computer and use it in GitHub Desktop.

Select an option

Save pfn/6d3ed2dc05bcf40f288f9c90b1680176 to your computer and use it in GitHub Desktop.
PP/TG counter for openwebui
"""
title: Real-time PP and TG metrics
author: pfn0
author_url: https://github.com/pfn
funding_url: https://github.com/pfn
version: 0.9
"""
from pydantic import BaseModel, Field
import traceback
import tiktoken
import logging
import time
# Floating "Generating..." chip markup injected next to the chat input.
# Flattened to a single line and single-quotes swapped for double-quotes so it
# can be embedded safely inside a single-quoted JS string literal ('...') in
# the insertAdjacentHTML calls below.
CHIP_HTML = """
<div id="token-metrics-pptg-chip" class="absolute -top-12 left-0 right-0 flex z-30 pointer-events-none">
<button class="text-xs px-3 bg-white border border-gray-100 dark:border-none dark:bg-white/20 p-1.5 rounded-full pointer-events-auto">
Generating...
</button>
</div>
""".replace("\n", "").replace("'", '"')
# Shared tokenizer for client-side token counting. cl100k_base is used as a
# cross-model approximation; the exact backend tokenizer is unknown here.
ENCODING = tiktoken.get_encoding("cl100k_base")
def num_tokens_from_string(text: str) -> int:
    """Count the number of tokens in a string (0 for empty/None input)."""
    # Uses the module-level cl100k_base encoding as a common default — no
    # heuristics to guess which tokenizer the backing model actually uses.
    # (Previous comment said r50k_base; the code has always used cl100k_base.)
    if not text:
        return 0
    return len(ENCODING.encode(text))
class Filter:
    """Real-time prompt-processing (PP) / token-generation (TG) metrics.

    OpenWebUI filter that, while a response streams, shows a floating
    "Generating N t/s" chip above the chat input (via injected JS) and, on
    completion, replaces it with a one-line summary status.
    """

    class Valves(BaseModel):
        # Standard OpenWebUI filter configuration.
        priority: int = Field(
            default=999, description="Filter execution order. Lower values run first."
        )

    def __init__(self):
        self.valves = self.Valves()
        self.logger = logging.getLogger("pptg_metrics")
        # Per-chat streaming state, keyed by chat_id; entries are created in
        # inlet() and removed in outlet().
        self.chat_stats = {}
        self.ewma_alpha = 0.6  # EWMA smoothing factor for the displayed t/s rate

    async def inlet(
        self,
        body: dict,
        __event_emitter__,
        __metadata__: dict = None,
    ) -> dict:
        """Record the start time for streaming chats and show an initial status.

        Non-streaming requests are passed through untouched; metrics only make
        sense when tokens arrive incrementally via stream().
        """
        # Guard __metadata__ too: it defaults to None and the original code
        # would raise TypeError subscripting it.
        if body.get("stream") and __metadata__ and "chat_id" in __metadata__:
            now = time.time()
            self.chat_stats[__metadata__["chat_id"]] = {
                "start_time": now,
                "last_update": now,
                "tg": 0,  # cumulative generated-token count
                "last_token_count": 0,  # cumulative tokens at last rate update
                "ewma_tg": None,  # EWMA token rate (None until first calculation)
                "ewma_initialized": False,  # whether the first EWMA sample was taken
                # NOTE: last_stream_time is set lazily when generation starts,
                # so "thinking" time does not pollute the rate.
            }
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {"description": "Processing prompt...", "done": False},
                }
            )
        return body

    async def stream(self, event: dict, __event_emitter__, __metadata__) -> dict:
        """Per-chunk hook: count tokens, maintain an EWMA t/s rate, update chip.

        Always returns the event unchanged; all work is side effects (stats +
        UI updates). Errors are logged, never raised, so streaming is never
        interrupted by a metrics failure.
        """
        try:
            if not isinstance(event, dict):
                return event
            now = time.time()
            chat_stats = self.chat_stats.get(__metadata__["chat_id"])
            if chat_stats is None:
                return event  # not a streaming chat we are tracking
            # Any first chunk marks time-to-first-token, even a content-free
            # one (e.g. a "thinking" delta): tokens are flowing either way.
            if "ttft" not in chat_stats:
                chat_stats["ttft"] = now - chat_stats["start_time"]
            content = None
            try:
                if "choices" in event and len(event["choices"]) > 0:
                    choice = event["choices"][0]
                    if "delta" in choice and "content" in choice["delta"]:
                        content = choice["delta"]["content"]
            except Exception as e:
                self.logger.error(f"Stream ex: {e}\n{traceback.format_exc()}")
                self.logger.error(f"Stream event: {event}")
            # Content-free chunks before any content are treated as thinking.
            if not content and "thinking_start" not in chat_stats:
                chat_stats["thinking_start"] = now
            if content:
                if "generation_start" not in chat_stats:
                    chat_stats["generation_start"] = now
                    # Initialize last_stream_time when generation starts (not
                    # in inlet) so the rate excludes prompt/thinking time.
                    chat_stats["last_stream_time"] = now
                chat_stats["tg"] += num_tokens_from_string(content)
            if "generation_start" not in chat_stats:
                return event
            gen_time = now - chat_stats["generation_start"]
            chat_stats["gen_time"] = gen_time
            if gen_time < 0.001:  # first content chunk: announce + create chip, once
                await __event_emitter__(
                    {
                        "type": "status",
                        "data": {
                            "description": f"Prompt processed in {chat_stats['ttft']:.1f}s, generating response...",
                            "done": False,
                        },
                    }
                )
                await __event_emitter__(
                    {
                        "type": "execute",
                        "data": {"code": f"""
(function() {{
    let chip = document.getElementById("token-metrics-pptg-chip");
    if (chip) {{
        chip.querySelector("button").textContent = "Generating...";
    }} else {{
        let chipbar = document.querySelector("div.w-full.font-primary > div > div > div.relative")
        chipbar.insertAdjacentHTML('beforeend', '{CHIP_HTML}');
    }}
}})();
"""},
                    }
                )
                return event
            # Refresh the EWMA token rate at most every 0.5s.
            if now - chat_stats["last_update"] > 0.50:
                chat_stats["last_update"] = now
                # Interval since the last stream() invocation that updated us.
                time_delta = now - chat_stats["last_stream_time"]
                chat_stats["last_stream_time"] = now
                # Tokens received during that interval.
                tokens_delta = chat_stats["tg"] - chat_stats["last_token_count"]
                chat_stats["last_token_count"] = chat_stats["tg"]
                # Instantaneous rate for this interval (guard zero interval).
                instantaneous_rate = tokens_delta / time_delta if time_delta > 0 else 0
                if not chat_stats["ewma_initialized"]:
                    # First sample: seed the EWMA with the raw rate.
                    chat_stats["ewma_tg"] = instantaneous_rate
                    chat_stats["ewma_initialized"] = True
                else:
                    # EWMA = alpha * new_value + (1 - alpha) * old_value
                    chat_stats["ewma_tg"] = (self.ewma_alpha * instantaneous_rate) + (
                        (1 - self.ewma_alpha) * chat_stats["ewma_tg"]
                    )
                # Update the chip with the smoothed rate.
                if gen_time > 0.001:
                    tg = chat_stats["ewma_tg"]
                    await __event_emitter__(
                        {
                            "type": "execute",
                            "data": {"code": f"""
(function() {{
    let chipContainer = document.getElementById("token-metrics-pptg-chip");
    if (!chipContainer) {{
        let chipbar = document.querySelector("div.w-full.font-primary > div > div > div.relative")
        chipbar.insertAdjacentHTML('beforeend', '{CHIP_HTML}');
    }}
    let chip = document.getElementById("token-metrics-pptg-chip").querySelector("button");
    chip.textContent = "Generating {tg:.1f} t/s";
}})();
"""},
                        }
                    )
        except Exception as e:
            self.logger.error(f"stream error: {e}\n{traceback.format_exc()}")
        return event

    async def outlet(self, body: dict, __event_emitter__, __metadata__) -> dict:
        """Final hook: remove the chip and emit a one-line summary status.

        Source priority for the summary: llama.cpp-style usage timings
        (predicted_n/ms/per_second) > OpenAI-style completion_tokens with our
        wall clock > our own token count and wall clock.
        """
        chat_id = __metadata__["chat_id"]
        try:
            # BUGFIX: non-streaming chats never register stats in inlet(); the
            # original raised KeyError here and again in the finally-del.
            chat_stats = self.chat_stats.get(chat_id)
            if chat_stats is None:
                return body
            tg = None
            result_str = "Response complete"
            if "ttft" in chat_stats:
                result_str = f"Prompt processed in {chat_stats['ttft']:.1f}s"
            gen_time = None
            if (
                "tg" in chat_stats
                and "gen_time" in chat_stats
                and chat_stats["gen_time"] > 0.001
            ):
                gen_time = chat_stats["gen_time"]
                tg = chat_stats["tg"] / gen_time
                result_str = f"Generated {chat_stats['tg']} tokens in {chat_stats['gen_time']:.1f}s at {tg:.1f} t/s"
            # usage is oddly nested in the last message in body, rather than
            # being top-level; guard an empty/missing message list.
            messages = body.get("messages") or []
            usage = messages[-1].get("usage") if messages else None
            if (
                usage
                and "predicted_n" in usage
                and "predicted_ms" in usage
                and "predicted_per_second" in usage
            ):
                result_str = f"Generated {usage['predicted_n']} tokens in {usage['predicted_ms'] / 1000:.1f}s at {usage['predicted_per_second']:.1f} t/s"
            elif usage and gen_time and "completion_tokens" in usage:
                if "thinking_start" in chat_stats:
                    # Server token counts include thinking tokens, so fold the
                    # thinking window back into the elapsed time.
                    gen_time += (
                        chat_stats["generation_start"] - chat_stats["thinking_start"]
                    )
                tokens = usage["completion_tokens"]
                gen_time = max(0.001, gen_time)
                result_str = f"Generated {tokens} tokens in {gen_time:.1f}s at {tokens / gen_time:.1f} t/s"
            await __event_emitter__(
                {
                    "type": "execute",
                    "data": {"code": f"""
(function() {{
    let chip = document.getElementById("token-metrics-pptg-chip");
    if (chip) chip.parentNode.removeChild(chip);
}})();
"""},
                }
            )
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "description": result_str,
                        "done": True,
                    },
                }
            )
        except Exception as e:
            # Consistent with stream(): never let metrics break the response.
            self.logger.error(f"outlet error: {e}\n{traceback.format_exc()}")
        finally:
            # Safe even if the key was never created or already removed.
            self.chat_stats.pop(chat_id, None)
        return body
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment