Skip to content

Instantly share code, notes, and snippets.

@sanmai
Created June 10, 2026 05:59
Show Gist options
  • Select an option

  • Save sanmai/fcb3eeaadd90e660f800aefdccd67a1d to your computer and use it in GitHub Desktop.

Select an option

Save sanmai/fcb3eeaadd90e660f800aefdccd67a1d to your computer and use it in GitHub Desktop.
KV-quant sweep
#!/usr/bin/env python3
"""Generate a Python source file of functions with incompressible random bodies.
Real code is compressible: a noisy KV-cache retrieval still lands the right token
because the model's language prior reconstructs it. That hides KV-quant error.
This corpus removes the prior — every body line is a random identifier bound to a
random string literal, so verbatim recall MUST come from the KV cache and nowhere
else. That is the regime where q8 vs q4 K/V should finally diverge.
Output is valid Python (parses with `ast`), so the existing .py extractor and the
whole bench pipeline consume it unchanged. Repeatable: same --seed + same knobs
produce a byte-identical file.
./gen-random-corpus.py # defaults -> fixtures/random-bodies.py
./gen-random-corpus.py --functions 300 --value-len 40 # deeper, higher per-line entropy
"""
from __future__ import annotations
import argparse
import keyword
import random
import string
from pathlib import Path
# Alphabet for every identifier character after the first (lowercase letters and digits).
IDENT_TAIL = string.ascii_lowercase + string.digits
# Alphabet for the generated string literals (ASCII letters and digits only).
VALUE_CHARS = string.ascii_letters + string.digits # no quotes/backslashes -> always valid
def rand_ident(rng: random.Random, n: int) -> str:
    """Return a random identifier that is not a Python keyword.

    The first character is drawn from the lowercase letters and the
    remaining ``n - 1`` characters from ``IDENT_TAIL``. A draw that happens
    to spell a keyword is discarded and a fresh one is taken.
    """
    candidate = None
    while candidate is None or keyword.iskeyword(candidate):
        # Head before tail: the draw order fixes the output for a given seed.
        head = rng.choice(string.ascii_lowercase)
        tail = [rng.choice(IDENT_TAIL) for _ in range(n - 1)]
        candidate = head + "".join(tail)
    return candidate
def rand_value(rng: random.Random, n: int) -> str:
    """Return ``n`` characters drawn one at a time from ``VALUE_CHARS``."""
    picked = [rng.choice(VALUE_CHARS) for _ in range(n)]
    return "".join(picked)
def gen_function(rng: random.Random, index: int, body_lines: int, value_len: int) -> str:
    """Return the source text of one generated zero-argument function.

    The function is named ``f_<index, zero-padded to 4>_<random suffix>`` and
    its body is ``body_lines`` assignments, each binding a distinct random
    identifier to a random string literal of ``value_len`` characters.
    """
    suffix = rand_ident(rng, 4)
    header = f"def f_{index:04d}_{suffix}():"  # unique by index, non-keyword by prefix
    seen: set[str] = set()
    body = []
    for _ in range(body_lines):
        # Redraw on collision so no two lines of this function bind the same name.
        candidate = rand_ident(rng, 8)
        while candidate in seen:
            candidate = rand_ident(rng, 8)
        seen.add(candidate)
        literal = rand_value(rng, value_len)
        body.append(f' {candidate} = "{literal}"')
    return "\n".join([header, *body])
def main() -> None:
    """Parse the command line, generate the corpus, and write it to ``--out``.

    Exits through ``argparse`` with a usage error when ``--body-lines`` is
    below 20. Parent directories of the output path are created as needed.
    A short summary of what was written is printed to stdout.
    """
    ap = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    ap.add_argument("--functions", type=int, default=200, help="number of functions (depth knob)")
    ap.add_argument("--body-lines", type=int, default=24, help="body statements per function (>=20 to be testable)")
    ap.add_argument("--value-len", type=int, default=24, help="length of each random string literal (V-cache entropy knob)")
    ap.add_argument("--seed", type=int, default=42, help="RNG seed (same seed -> identical file)")
    ap.add_argument("--out", default="fixtures/random-bodies.py", help="output path")
    args = ap.parse_args()
    if args.body_lines < 20:
        ap.error("--body-lines must be >= 20 (the bench needs 20 primary lines per function)")
    # One RNG drives everything, so the seed alone determines the output.
    rng = random.Random(args.seed)
    text = "\n\n\n".join(
        gen_function(rng, i, args.body_lines, args.value_len) for i in range(args.functions)
    ) + "\n"
    out = Path(args.out)
    out.parent.mkdir(parents=True, exist_ok=True)
    # Write bytes rather than text: text mode translates "\n" to the platform
    # line separator, which would break the byte-identical guarantee on Windows.
    out.write_bytes(text.encode("utf-8"))
    # The text always ends with a newline, so the newline count IS the line count.
    nlines = text.count("\n")
    print(
        f"wrote {out}: {args.functions} functions, {len(text):,} chars, {nlines:,} lines.\n"
        f" seed={args.seed} body_lines={args.body_lines} value_len={args.value_len}\n"
        f" note: random content tokenizes denser than prose (~1 token per 1-2 chars),\n"
        f" so the real prompt token count is well above a chars/4 prose estimate — read it\n"
        f" off the server log on the first run and tune --functions to your target depth."
    )


if __name__ == "__main__":
    main()
import json, glob, re, sys
from collections import defaultdict
# Corpus name comes from the first CLI argument; it selects which result files are read.
corpus = sys.argv[1] if len(sys.argv) > 1 else "proprietary"
QUANTS = ["bf16","q8_0","q5_1","q5_0","q4_1","q4_0"]
# (ctk, ctv) -> (passed, n, avg primary_matched, errored, avg hallucinated)
cells = {}
for f in glob.glob(f"results/{corpus}__qwen36-27b-gguf__*.json"):
    # The file name ends in "__<ctk>-<ctv>.json"; anything else is ignored.
    m = re.search(r"__([a-z0-9]+(?:_[0-9]+)?)-([a-z0-9]+(?:_[0-9]+)?)\.json$", f)
    if not m:
        continue
    ctk, ctv = m.group(1), m.group(2)
    # Context manager so the handle is closed promptly instead of leaking one per file.
    with open(f, encoding="utf-8") as fh:
        d = json.load(fh)
    res = d["results"]
    n = len(res)
    passed = sum(1 for r in res if r["passed"])
    errored = sum(1 for r in res if r.get("error"))
    matched = [r["primary_matched"] for r in res]
    # An empty result list averages to 0 rather than dividing by zero.
    avg = sum(matched)/n if n else 0
    halluc = sum(r["hallucinated"] for r in res)/n if n else 0
    cells[(ctk,ctv)] = (passed, n, avg, errored, halluc)
def grid(title, fn, fmt):
    """Print one table of a per-cell metric, rows = ctk and columns = ctv.

    ``fn`` picks the metric out of a ``cells`` entry and ``fmt`` renders it as
    a string. Combinations with no entry in ``cells`` are shown as a dash.
    """
    print(f"\n{title} (rows=ctk, cols=ctv)")
    header = " ".join(f"{q:>6}" for q in QUANTS)
    print("ctk\\ctv " + header)
    for k_quant in QUANTS:
        rendered = []
        for v_quant in QUANTS:
            cell = cells.get((k_quant, v_quant))
            if cell:
                rendered.append(fmt(fn(cell)))
            else:
                rendered.append(" - ")
        print(f"{k_quant:>7} " + " ".join(rendered))
# One table per metric; the lambda index selects the field of the cells tuple.
grid("PASS / 16", lambda c: c[0], lambda v: f"{v:>6}")
grid("AVG primary_matched /20", lambda c: c[2], lambda v: f"{v:6.2f}")
grid("ERRORED", lambda c: c[3], lambda v: f"{v:>6}")
grid("AVG hallucinated", lambda c: c[4], lambda v: f"{v:6.2f}")
# overall spread
import statistics
passes = [c[0] for c in cells.values()]
avgs = [c[2] for c in cells.values()]
print(f"\n--- summary over {len(cells)} combos ---")
if cells:
    print(f"pass/16: min={min(passes)} max={max(passes)} mean={statistics.mean(passes):.2f} stdev={statistics.pstdev(passes):.2f}")
    print(f"avg/20: min={min(avgs):.2f} max={max(avgs):.2f} mean={statistics.mean(avgs):.2f} stdev={statistics.pstdev(avgs):.2f}")
else:
    # min()/max()/statistics.mean() all raise on an empty sequence, so say
    # what happened instead of ending the report with a traceback.
    print("no result files matched - nothing to summarize")
import json, glob, re, sys
import plotly.graph_objects as go
# Corpus name comes from the first CLI argument; it selects which result files are read.
corpus = sys.argv[1] if len(sys.argv) > 1 else "proprietary"
QUANTS = ["bf16","q8_0","q5_1","q5_0","q4_1","q4_0"] # high -> low precision
# (ctk, ctv) -> average primary_matched over that run's results
cells = {}
for f in glob.glob(f"results/{corpus}__qwen36-27b-gguf__*.json"):
    # The file name ends in "__<ctk>-<ctv>.json"; anything else is ignored.
    m = re.search(r"__([a-z0-9]+(?:_[0-9]+)?)-([a-z0-9]+(?:_[0-9]+)?)\.json$", f)
    if not m:
        continue
    ctk, ctv = m.group(1), m.group(2)
    # Context manager so the handle is closed promptly instead of leaking one per file.
    with open(f, encoding="utf-8") as fh:
        d = json.load(fh)
    res = d["results"]
    if not res:
        # Nothing to average: leave the cell blank rather than dividing by zero.
        print(f"skipping {f}: empty results list", file=sys.stderr)
        continue
    cells[(ctk,ctv)] = sum(r["primary_matched"] for r in res)/len(res)
# Heatmap matrices are indexed [ctv][ctk]: ctk runs along x, ctv along y.
# z holds the raw averages (None where a combo has no result); text holds the
# matching cell labels, empty for the missing combos.
z = [[cells.get((ctk, ctv)) for ctk in QUANTS] for ctv in QUANTS]
text = [["" if v is None else f"{v:.2f}" for v in zrow] for zrow in z]
import os

vals = list(cells.values())
if not vals:
    # min()/max() below raise on an empty sequence; stop with a readable message.
    sys.exit(f"no results matched results/{corpus}__qwen36-27b-gguf__*.json")
# The colour scale is pinned to the observed range so small differences stay visible.
fig = go.Figure(go.Heatmap(
    z=z, x=QUANTS, y=QUANTS, text=text, texttemplate="%{text}",
    textfont={"size":14},
    colorscale="RdYlGn", zmin=min(vals), zmax=max(vals),
    colorbar={"title":"avg matched /20"},
))
fig.update_layout(
    title=f"{corpus}: avg primary lines matched /20 (range {min(vals):.2f}-{max(vals):.2f}, n=16 fns/cell)",
    xaxis_title="ctk (K-cache quant)", yaxis_title="ctv (V-cache quant)",
    xaxis={"side":"top"}, yaxis={"autorange":"reversed"},
    width=720, height=640, font={"family":"monospace"},
)
out_html = f"analysis/charts/{corpus}_matched_heatmap.html"
out_png = f"tmp/{corpus}_matched_heatmap.png"
# Create BOTH output directories: the PNG goes to tmp/, which may not exist yet.
for out_path in (out_html, out_png):
    os.makedirs(os.path.dirname(out_path), exist_ok=True)
fig.write_html(out_html)
fig.write_image(out_png, scale=2)
print("wrote", out_html, "and", out_png)
#!/bin/bash
# Sweep every -ctk x -ctv KV-cache quantization pairing for the qwen3.6-27b model
# and benchmark each one with codeneedle.
#
# Per combination: relaunch the llama.cpp server with that K/V cache quant (the
# runner uses a modified llama build that permits all pairings), wait for it to
# load, run a tagged round, then kill it before the next combo.
#
# Run from the codeneedle repo root. This is long: 36 combos (6 x 6 quants) for
# the selected CORPUS. It is resumable — a combo whose result file
# (results/<corpus>__<model>__<ctk>-<ctv>.json) already exists is skipped; set
# FORCE=1 to redo everything.
#
# ./sweep-kv-quants.sh
# FORCE=1 ./sweep-kv-quants.sh # ignore existing results, rerun all
# -u: unset variables are errors; pipefail: a pipeline fails if any stage fails.
# -e is not set, so a failing command does not abort the sweep.
set -uo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# Resolved before the cd below, so the default log path is relative to the caller's directory.
LOG="${SWEEP_LOG:-$PWD/sweep-kv-quants.log}" # in the dir you invoke from
cd "$SCRIPT_DIR" || exit 1
# Mirror everything to the log (appended, so a resumed run keeps history).
exec > >(tee -a "$LOG") 2>&1
echo "===== sweep started $(date) -> $LOG ====="
# RUNNER, CORPUS, READY_TIMEOUT and SWEEP_LOG can be overridden from the environment.
RUNNER="${RUNNER:-$SCRIPT_DIR/qwen3.6-27b-n4_0-mse.sh}"
MODEL="qwen36-27b-gguf" # configs/models/<MODEL>.toml (base_url :4000)
CORPUS="${CORPUS:-proprietary}" # one corpus per sweep; sets the run + resume sentinel
PORT=4000
READY_TIMEOUT="${READY_TIMEOUT:-300}" # seconds to wait for the server to load
# iq4_nl excluded: no flash-attention kernel, so it loads but wedges inference.
# (f16 would belong here too, but this build doesn't have it.)
QUANTS=(bf16 q8_0 q4_0 q4_1 q5_0 q5_1)
# Shuffle the quant list each launch so a fresh sweep explores different pairs
# first (the nested loop reuses this one list, reordering both axes). Resume is
# per-combo, so order never affects what gets skipped.
mapfile -t QUANTS < <(printf '%s\n' "${QUANTS[@]}" | shuf)
echo "corpus: ${CORPUS} model: ${MODEL}"
echo "quant order this run: ${QUANTS[*]}"
wait_ready() { # poll /health (503 while loading, 200 when ready), or die / time out
    # $1: PID of the background runner.
    # Returns 0 once /health answers successfully, 1 if the runner exits first
    # or READY_TIMEOUT seconds of wall-clock time pass.
    local pid="$1" start=$SECONDS
    # --max-time bounds each probe: without it a server that accepts the
    # connection but never answers would hang curl and bypass the timeout.
    until curl -fs --max-time 5 -o /dev/null "http://localhost:$PORT/health"; do
        if ! kill -0 "$pid" 2>/dev/null; then echo " runner exited while loading"; return 1; fi
        # Measure elapsed wall-clock time (bash's SECONDS), not the number of
        # sleeps, so time spent inside slow probes counts against the timeout.
        if [ $((SECONDS - start)) -ge "$READY_TIMEOUT" ]; then echo " timed out after ${READY_TIMEOUT}s"; return 1; fi
        sleep 1
    done
}
stop_server() {
    # Signal the runner ($1) and every llama-server process, then wait for
    # llama-server to disappear. After 30 one-second polls the survivors are
    # sent SIGKILL and the wait ends.
    local elapsed=0
    kill "$1" 2>/dev/null
    killall llama-server 2>/dev/null
    while pgrep -x llama-server >/dev/null; do
        if [ "$elapsed" -ge 30 ]; then
            killall -9 llama-server 2>/dev/null
            break
        fi
        sleep 1
        elapsed=$((elapsed + 1))
    done
}
echo "clearing any running llama-server..."
killall llama-server 2>/dev/null
# Bounded wait with SIGKILL escalation (same 30s budget stop_server uses), so a
# llama-server that ignores SIGTERM cannot stall the sweep before it starts.
clear_waited=0
while pgrep -x llama-server >/dev/null; do
    if [ "$clear_waited" -ge 30 ]; then
        killall -9 llama-server 2>/dev/null
        break
    fi
    sleep 1
    clear_waited=$((clear_waited + 1))
done
total=$(( ${#QUANTS[@]} * ${#QUANTS[@]} ))
n=0
# Every K-cache quant is paired with every V-cache quant; one server launch per pair.
for ctk in "${QUANTS[@]}"; do
    for ctv in "${QUANTS[@]}"; do
        n=$((n + 1))
        tag="${ctk}-${ctv}"
        echo
        echo "########## [$n/$total] ctk=$ctk ctv=$ctv (tag=$tag) ##########"
        # Resume sentinel: the result file for this combo.
        if [ -z "${FORCE:-}" ] && [ -f "results/${CORPUS}__${MODEL}__${tag}.json" ]; then
            echo " already done — skipping (FORCE=1 to redo)"
            continue
        fi
        # The runner hardcodes -ctk q8_0 -ctv q4_0; appended flags win (last occurrence).
        "$RUNNER" -ctk "$ctk" -ctv "$ctv" >/dev/null 2>&1 &
        pid=$!
        echo " launched runner pid=$pid; waiting for server (timeout ${READY_TIMEOUT}s)..."
        if wait_ready "$pid"; then
            echo " server up — benchmarking"
            # A failed round must not stop the sweep, but record it in the log
            # instead of swallowing it silently.
            ./run-round.py --model "$MODEL" --corpus "$CORPUS" --tag "$tag" \
                || echo " run-round.py exited with status $? — continuing with the next combo"
        else
            echo " server never became ready — skipping this combo"
        fi
        # Always tear the server down, ready or not, before the next launch.
        stop_server "$pid"
    done
done
echo
echo "sweep complete. results in results/<corpus>__${MODEL}__<ctk>-<ctv>.json"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment