Created
March 10, 2026 21:44
-
-
Save 0xdeafbeef/6fd496c4316e98f67baf05a420812bd0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # prerequisites: | |
| # hardware.cpu.amd.ryzen-smu.enable = true; | |
| # environment.systemPackages = with pkgs; [ | |
| # ryzenadj | |
| # ]; | |
| import subprocess | |
| import re | |
| import csv | |
| import time | |
| import os | |
| import sys | |
# ==========================================
# CONFIGURATION
# ==========================================
# Curve Optimizer offsets to benchmark, tested in the order listed.
OFFSETS = [0, -10, -15, -20, -30]
# Number of benchmark runs performed for each offset.
RUNS_PER_OFFSET = 2
# Seconds to wait between runs so the heatsink can cool down.
COOLDOWN_SECONDS = 30
# Name of the CSV file the results are written to.
CSV_FILENAME = "co_benchmark_results.csv"

# Sysbench settings
# Value passed to sysbench as --cpu-max-prime.
PRIME_LIMIT = 10000
# Value passed to sysbench as --time (reported to the user in seconds).
SYSBENCH_TIME = 60
# Use every CPU thread; fall back to 8 when the count cannot be determined.
SYSBENCH_THREADS = os.cpu_count() or 8
# Fixed seed (--rand-seed) for run-to-run consistency.
SYSBENCH_SEED = 42
# ==========================================
def check_root():
    """Exit with status 1 unless the effective user ID is 0 (root).

    ryzenadj is invoked later by this script, and the error message tells the
    user to re-run with sudo.
    """
    if os.geteuid() == 0:
        return
    print("ERROR: This script must be run as root (sudo) to use ryzenadj.")
    sys.exit(1)
def set_co(offset):
    """Apply a Curve Optimizer offset to all cores via ryzenadj.

    Runs ``ryzenadj --set-coall=<offset>`` and then waits two seconds so the
    change can take effect before anything else runs.

    Args:
        offset: Value passed verbatim to ryzenadj's ``--set-coall`` option.

    Raises:
        SystemExit: With status 1 when ryzenadj cannot be started or exits
            with a non-zero status.
    """
    print(f"\n[{time.strftime('%H:%M:%S')}] Applying Curve Optimizer offset: {offset}")
    # NOTE(review): the RyzenAdj documentation describes negative Curve
    # Optimizer values as needing a 0x100000-based encoding -- confirm that
    # the installed version accepts a plain negative number here.
    # An argument list (no shell) avoids quoting issues and makes a missing
    # binary surface as an OSError instead of an opaque shell exit status.
    cmd = ["ryzenadj", f"--set-coall={offset}"]
    try:
        subprocess.run(
            cmd,
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            text=True,
        )
    except (OSError, subprocess.CalledProcessError) as err:
        print(f"FAILED to set CO offset {offset}. Ensure ryzenadj is installed and in your PATH.")
        # Show the real cause (ryzenadj's own stderr when it ran, otherwise
        # the exception text) so the user does not have to guess.
        reason = (getattr(err, "stderr", None) or "").strip() or str(err)
        print(f"Reason: {reason}")
        sys.exit(1)
    # Give the system a moment to apply the voltage change
    time.sleep(2)
def run_sysbench():
    """Run the sysbench CPU benchmark and return its standard output.

    The command line is built from the module-level PRIME_LIMIT,
    SYSBENCH_TIME, SYSBENCH_THREADS and SYSBENCH_SEED settings.

    Returns:
        The text sysbench wrote to stdout.  An empty string is returned when
        sysbench could not be started; in both failure cases a warning is
        printed to stderr so that missing metrics are explained.
    """
    print(f"[{time.strftime('%H:%M:%S')}] Running Sysbench (Limit: {PRIME_LIMIT}, Time: {SYSBENCH_TIME}s, Threads: {SYSBENCH_THREADS})...")
    # Argument list instead of a shell string: no shell is spawned and no
    # quoting is needed.
    cmd = [
        "sysbench",
        "cpu",
        f"--cpu-max-prime={PRIME_LIMIT}",
        f"--time={SYSBENCH_TIME}",
        f"--threads={SYSBENCH_THREADS}",
        f"--rand-seed={SYSBENCH_SEED}",
        "run",
    ]
    try:
        result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    except OSError as err:
        # Typically sysbench is not installed or not executable.
        print(f"WARNING: could not start sysbench: {err}", file=sys.stderr)
        return ""
    if result.returncode != 0:
        print(
            f"WARNING: sysbench exited with status {result.returncode}: {result.stderr.strip()}",
            file=sys.stderr,
        )
    return result.stdout
# A well-formed non-negative decimal number: digits with an optional fraction.
_SYSBENCH_DECIMAL = r"(\d+(?:\.\d+)?)"

# Metric name -> (compiled pattern, converter).  Compiled once at import time;
# the converter fixes each metric's type instead of guessing it from the text.
_SYSBENCH_METRICS = {
    "events_per_sec": (re.compile(r"events per second:\s+" + _SYSBENCH_DECIMAL), float),
    "total_events": (re.compile(r"total number of events:\s+(\d+)"), int),
    "min_latency_ms": (re.compile(r"min:\s+" + _SYSBENCH_DECIMAL), float),
    "avg_latency_ms": (re.compile(r"avg:\s+" + _SYSBENCH_DECIMAL), float),
    "max_latency_ms": (re.compile(r"max:\s+" + _SYSBENCH_DECIMAL), float),
    "p95_latency_ms": (re.compile(r"95th percentile:\s+" + _SYSBENCH_DECIMAL), float),
}


def parse_sysbench_output(output):
    """Extract the benchmark metrics from sysbench's text output.

    Args:
        output: The text printed by sysbench.  ``None`` or an empty string is
            treated as "no output".

    Returns:
        A dict with the keys ``events_per_sec``, ``total_events``,
        ``min_latency_ms``, ``avg_latency_ms``, ``max_latency_ms`` and
        ``p95_latency_ms``.  ``total_events`` is an ``int``, every other
        metric is a ``float``; a metric that is not found in the text (or is
        not a well-formed number) is ``None``.
    """
    text = output or ""
    parsed_data = {}
    for key, (pattern, convert) in _SYSBENCH_METRICS.items():
        match = pattern.search(text)
        parsed_data[key] = convert(match.group(1)) if match else None
    return parsed_data
def main():
    """Benchmark every configured Curve Optimizer offset and log the results.

    For each offset in OFFSETS, RUNS_PER_OFFSET times: apply the offset, run
    sysbench, parse its output and append one row to CSV_FILENAME.  Between
    runs the script sleeps COOLDOWN_SECONDS.  The offset is reset to -15 when
    the benchmark ends, including when it is interrupted or fails part-way.
    """
    check_root()
    fieldnames = [
        "co_offset", "run_number", "events_per_sec", "total_events",
        "min_latency_ms", "avg_latency_ms", "max_latency_ms", "p95_latency_ms",
    ]
    total_runs = len(OFFSETS) * RUNS_PER_OFFSET
    completed_runs = 0
    print("===============================================")
    print(" Starting Automated Curve Optimizer Benchmark...")
    print("===============================================")
    print(f"Results are saved to {CSV_FILENAME} after every run.")
    try:
        with open(CSV_FILENAME, mode="w", newline="") as file:
            writer = csv.DictWriter(file, fieldnames=fieldnames)
            writer.writeheader()
            for offset in OFFSETS:
                for run in range(1, RUNS_PER_OFFSET + 1):
                    print(f"\n--- Testing Offset: {offset} (Run {run}/{RUNS_PER_OFFSET}) ---")
                    # 1. Apply Offset
                    set_co(offset)
                    # 2. Benchmark
                    output = run_sysbench()
                    # 3. Parse Data
                    data = parse_sysbench_output(output)
                    # 4. Persist the row immediately: an unstable offset can
                    # hang or reboot the machine, and rows kept only in memory
                    # (or in the page cache) would be lost with it.
                    writer.writerow({"co_offset": offset, "run_number": run, **data})
                    file.flush()
                    os.fsync(file.fileno())
                    completed_runs += 1
                    print(f"-> Result: {data.get('events_per_sec')} events/sec | P95 Latency: {data.get('p95_latency_ms')} ms | Max Latency: {data.get('max_latency_ms')} ms")
                    # 5. Cooldown (skipped after the very last run).  Counting
                    # runs stays correct even if OFFSETS repeats a value.
                    if completed_runs < total_runs:
                        print(f"[{time.strftime('%H:%M:%S')}] Cooling down for {COOLDOWN_SECONDS} seconds to prevent thermal throttling...")
                        time.sleep(COOLDOWN_SECONDS)
        print(f"\nDone! All results written to {CSV_FILENAME}.")
    finally:
        # Always leave the CPU on the chosen stable profile instead of
        # whatever offset was being tested, even after Ctrl+C or an error.
        print("Resetting CO to stable (-15)...")
        set_co(-15)


# Run the benchmark only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment