Skip to content

Instantly share code, notes, and snippets.

@vitorcalvi
Last active April 7, 2025 07:27
Show Gist options
  • Save vitorcalvi/7bd9c37411c0a5ae5464cc2bd2cfd289 to your computer and use it in GitHub Desktop.
Save vitorcalvi/7bd9c37411c0a5ae5464cc2bd2cfd289 to your computer and use it in GitHub Desktop.
EVO-X1 34GB Machine Learning Device Setup

sudo apt update && sudo apt install -y
build-essential
dkms

wget https://repo.radeon.com/amdgpu-install/6.3.3/ubuntu/noble/amdgpu-install_6.3.60303-1_all.deb sudo apt install -y ./amdgpu-install_6.3.60303-1_all.deb

Install with gfx1150 support

sudo amdgpu-install -y --usecase=rocm,mlsdk
--no-dkms
--opencl=rocr
--allow-unauthenticated

sudo apt install -y docker.io sudo systemctl enable --now docker sudo systemctl restart docker sudo curl -L "https://github.com/docker/compose/releases/latest/download/docker-compose-$(uname -s)-$(uname -m)"
-o /usr/local/bin/docker-compose

sudo chmod +x /usr/local/bin/docker-compose docker-compose --version

echo "📦 Installing dependencies..." sudo apt update && sudo apt install -y
cpufrequtils
rocm-smi
lm-sensors
cpuset
amdgpu-dkms

echo "🚀 Starting Ryzen AI Ultimate Optimization (v1.9)" echo "⚠️ Monitor with: watch -n1 "sensors amdgpu-* && rocm-smi -c && sudo cpupower monitor"" sleep 2

--- CPU Optimization ---

echo "⚙️ Configuring CPU..." echo "active" | sudo tee /sys/devices/system/cpu/amd_pstate/status >/dev/null sudo cpupower frequency-set --governor performance

echo " Isolating performance cores..." sudo cset shield --cpu=0-7,16-23 --kthread=on # Updated for Zen5 hybrid

--- Memory Tuning ---

echo "🧠 Memory Optimization:" sudo sysctl -w vm.swappiness=1 sudo sysctl -w vm.dirty_ratio=60 sudo sysctl -w vm.dirty_background_ratio=3 sudo sysctl -w vm.nr_hugepages=8192 # Aligned with system setup

--- GPU Configuration ---

echo "🎮 GPU Optimization:" GPU_SYSFS="/sys/class/drm/card0/device" echo " Using GPU path: $GPU_SYSFS"

echo " Setting GPU control to manual..." echo "manual" | sudo tee $GPU_SYSFS/power_dpm_force_performance_level

if [ -f $GPU_SYSFS/pp_od_clk_voltage ]; then echo " Applying GPU OC Profile..." sudo bash -c "echo 'r' > $GPU_SYSFS/pp_od_clk_voltage" # Reset sudo bash -c "echo 's 7 3000' > $GPU_SYSFS/pp_od_clk_voltage" sudo bash -c "echo 'm 3 2250' > $GPU_SYSFS/pp_od_clk_voltage" sudo bash -c "echo 'c' > $GPU_SYSFS/pp_od_clk_voltage" fi

HWMON_PATH=$(find $GPU_SYSFS/hwmon/ -name "hwmon*" -type d | head -1) echo " Adjusting power limits to 135W @ $HWMON_PATH..." sudo bash -c "echo 135000 > $HWMON_PATH/power1_cap"

--- System Services ---

echo "🔋 Power Management:" sudo systemctl stop power-profiles-daemon.service 2>/dev/null sudo systemctl mask power-profiles-daemon.service 2>/dev/null

--- Kernel Tuning ---

echo "🐧 Kernel Optimization:" sudo sysctl -w kernel.watchdog=0 sudo sysctl -w kernel.numa_balancing=0

echo "✅ Optimization Complete! Reboot recommended."

sudo docker run -itd --name pytorch-rocm
--device=/dev/kfd
--device=/dev/dri
--group-add video
--ipc=host
--shm-size 32G
-p 8888:8888
-v $(pwd):/workspace
-e HSA_OVERRIDE_GFX_VERSION=11.0.0
-e HSA_ENABLE_SDMA=1
rocm/pytorch:rocm6.3.3_ubuntu24.04_py3.12_pytorch_release_2.4.0

sudo docker run -itd --name tensorflow-rocm
--device=/dev/kfd
--device=/dev/dri
--group-add video
--ipc=host
--shm-size 32G
-e HSA_OVERRIDE_GFX_VERSION=11.0.0
-e TF_ROCM_FUSION_ENABLE=1
rocm/tensorflow:rocm6.3.3-py3.12-tf2.17-dev

@vitorcalvi
Copy link
Author

GEMINI

#!/usr/bin/env python3
import json
import subprocess
import sys
import time
from itertools import product
from pathlib import Path
from typing import Dict, List, Any # Added Any
from tqdm import tqdm
from colorama import Fore, Style, init

# --- Configuration ---
CONFIG = {
    "llama_bench_path": "./bin/llama-bench",
    "model_path": "/home/vi/models/DeepSeek-R1-Distill-Qwen-14B-IQ4_NL.gguf",
    "output_json": "/home/vi/llama_bench_results_deepseek.json",
    "max_retries": 1,
    "timeout": 300,
    "numa_strategy": "isolate",
    "cpu_mask": "0x55555555",
    "main_gpu": 0,
    "ubatch_size": 512,
    "mmap": 1,
    "repetitions": 5,
    "poll": 50,
    "combination_top_n": 2,
    "deep_search_params": {
        "step_percentage": 0.25,
        "steps_around": 2,
        "min_values": 3
    },
    "temperature": {
        "cpu_sensor": "k10temp",
        "gpu_hwmon_pattern": "/sys/class/drm/card0/device/hwmon/hwmon*/temp1_input", # Note: This pattern isn't used in the current get_gpu_temperature logic
        "max_temp_samples": 5,
        "sample_interval": 0.2
    },
    "parameters": {
        "threads": [4, 8, 12, 16, 24],
        "batch_size": [512, 1024, 2048, 4096],
        "gpu_layers": [99, 80, 64, 32],
        "flash_attn": [0, 1],
        "split_mode": ["layer", "row"],
        "cache_type": ["f16", "q4_0"],
        "tensor_split": ["0", "0/0"]
    },
    "optimization_order": [
        "threads",
        "batch_size",
        "gpu_layers",
        "flash_attn",
        "split_mode",
        "cache_type",
        "tensor_split"
    ]
}

init(autoreset=True)

class BenchmarkRunner:
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.results: List[Dict[str, Any]] = []
        self.best_params: Dict[str, Any] = {"tg_speed": 0.0}
        self.param_results: Dict[str, List[tuple[Any, float]]] = {param: [] for param in config['parameters']}
        self.validate_paths()
        self.load_existing_results()

    def validate_paths(self):
        if not Path(self.config['llama_bench_path']).is_file():
            self.error_exit(f"llama-bench not found at {self.config['llama_bench_path']}")
        if not Path(self.config['model_path']).is_file():
            self.error_exit(f"Model file not found at {self.config['model_path']}")
        Path(self.config['output_json']).parent.mkdir(parents=True, exist_ok=True)

    def error_exit(self, message: str):
        print(f"{Fore.RED}ERROR: {message}{Style.RESET_ALL}")
        sys.exit(1)

    def load_existing_results(self):
        output_path = Path(self.config['output_json'])
        if output_path.exists() and output_path.stat().st_size > 0: # Check if file exists and is not empty
            try:
                with open(output_path, 'r') as f:
                    self.results = json.load(f)
                    if isinstance(self.results, list) and self.results: # Ensure it's a non-empty list
                        # Filter out entries that might lack 'tg_speed' before finding max
                        valid_results = [r for r in self.results if isinstance(r, dict) and 'tg_speed' in r and isinstance(r['tg_speed'], (int, float))]
                        if valid_results:
                            self.best_params = max(valid_results, key=lambda x: x.get('tg_speed', 0.0))
                        else:
                            print(f"{Fore.YELLOW}Warning: Existing results file contains no valid entries with 'tg_speed'. Starting fresh search.{Style.RESET_ALL}")
                            self._initialize_best_params()
                    else:
                        # Handle cases where the file might contain non-list JSON or was empty after loading
                        print(f"{Fore.YELLOW}Warning: Existing results file is empty or not a list. Starting fresh search.{Style.RESET_ALL}")
                        self.results = []
                        self._initialize_best_params()

            except json.JSONDecodeError as e:
                print(f"{Fore.YELLOW}Warning: Failed to decode JSON from {self.config['output_json']}. File might be corrupted. Starting fresh search. Error: {e}{Style.RESET_ALL}")
                self.results = []
                self._initialize_best_params()
            except Exception as e:
                 # Catch other potential file reading errors
                print(f"{Fore.YELLOW}Warning: Failed to load or process results from {self.config['output_json']}. Starting fresh search. Error: {e}{Style.RESET_ALL}")
                self.results = []
                self._initialize_best_params()
        else:
            # File doesn't exist or is empty
            self._initialize_best_params()

    def _initialize_best_params(self):
         """Sets initial best_params based on the first value of each parameter."""
         self.best_params = {
             param: self.config['parameters'][param][0]
             for param in self.config['optimization_order']
         }
         self.best_params['tg_speed'] = 0.0 # Explicitly set initial speed


    def get_cpu_temperature(self) -> float:
        try:
            # Try reading directly from sysfs first (more reliable if available)
            cpu_sensor_name = self.config['temperature']['cpu_sensor']
            for hwmon in Path('/sys/class/hwmon').glob('hwmon*'):
                 try:
                     name_path = hwmon / 'name'
                     if name_path.exists() and name_path.read_text().strip() == cpu_sensor_name:
                         temp_input_path = hwmon / 'temp1_input' # Adjust if needed (e.g., temp2_input)
                         if temp_input_path.exists():
                             return int(temp_input_path.read_text().strip()) / 1000.0
                 except (IOError, ValueError):
                     continue # Ignore errors reading from specific hwmon entries

            # Fallback to 'sensors' command
            result = subprocess.run(['sensors', cpu_sensor_name], capture_output=True, text=True, check=False)
            if result.returncode == 0:
                for line in result.stdout.split('\n'):
                     # Look for common patterns like Tctl, Tdie, Core 0, Package id 0
                     if any(idfr in line for idfr in ['Tctl:', 'Tdie:', 'Core 0:', 'Package id 0:']):
                         parts = line.split()
                         for part in parts:
                             if part.startswith('+') and part.endswith('°C'):
                                 return float(part.strip('+°C'))
            print(f"{Fore.YELLOW}Could not determine CPU temperature using sysfs or 'sensors' for {cpu_sensor_name}.{Style.RESET_ALL}")
            return 0.0 # Return 0.0 if sensor not found or unreadable

        except FileNotFoundError:
             print(f"{Fore.YELLOW}'sensors' command not found. Cannot get CPU temperature.{Style.RESET_ALL}")
             return 0.0
        except Exception as e:
            print(f"{Fore.YELLOW}CPU temp error: {e}{Style.RESET_ALL}")
            return 0.0

    def get_gpu_temperature(self) -> float:
        try:
            # Try direct hwmon read (often card0/device/hwmon/hwmonX/temp1_input for AMD/Nvidia)
            hwmon_paths = list(Path('/sys/class/drm/card0/device/hwmon').glob('hwmon*/temp1_input'))
            if not hwmon_paths: # Try alternative common path for some GPUs
                 hwmon_paths = list(Path('/sys/class/hwmon').glob('hwmon*/temp1_input'))

            for temp_path in hwmon_paths:
                 try:
                     # Check associated name file if exists, might help identify correct sensor
                     name_path = temp_path.parent / 'name'
                     # Add specific checks if needed, e.g., name_path.read_text().strip() == 'amdgpu'
                     if temp_path.exists():
                         return int(temp_path.read_text().strip()) / 1000.0
                 except (IOError, ValueError):
                     continue # Ignore errors with specific hwmon paths

            # Fallback to nvidia-smi if hwmon fails
            try:
                result_nvidia = subprocess.run(['nvidia-smi', '--query-gpu=temperature.gpu', '--format=csv,noheader'], capture_output=True, text=True, check=True)
                return float(result_nvidia.stdout.strip())
            except (FileNotFoundError, subprocess.CalledProcessError, ValueError):
                pass # nvidia-smi not found or failed, proceed to amdgpu attempt

             # Fallback to sensors command for amdgpu if others fail
            try:
                result_amd = subprocess.run(['sensors', 'amdgpu-pci*'], capture_output=True, text=True, check=False) # Adjust pattern if needed
                if result_amd.returncode == 0:
                    for line in result_amd.stdout.split('\n'):
                        if 'edge:' in line.lower(): # Common identifier for AMD GPU temp
                             parts = line.split()
                             for part in parts:
                                 if part.startswith('+') and part.endswith('°C'):
                                     return float(part.strip('+°C'))
            except FileNotFoundError:
                 pass # sensors command not found
            except Exception as e:
                 print(f"{Fore.YELLOW}Error during 'sensors amdgpu' call: {e}{Style.RESET_ALL}")


            print(f"{Fore.YELLOW}Could not determine GPU temperature via sysfs, nvidia-smi, or sensors.{Style.RESET_ALL}")
        except Exception as e:
            print(f"{Fore.YELLOW}GPU temp error: {e}{Style.RESET_ALL}")
        return 0.0

    def monitor_temperatures(self) -> Dict[str, List[float]]:
        temps: Dict[str, List[float]] = {'cpu': []}
        gpu_available = False
        try:
            # Warmup read and check availability
            self.get_cpu_temperature() # Initial read for CPU
            gpu_temp = self.get_gpu_temperature()
            if gpu_temp > 0.0: # Use > 0 as indicator that sensor works
                temps['gpu'] = []
                gpu_available = True
            else:
                print(f"{Fore.YELLOW}Skipping GPU temperature monitoring (sensor unavailable or read 0.0).{Style.RESET_ALL}")

            # Actual monitoring loop
            for _ in range(self.config['temperature']['max_temp_samples']):
                temps['cpu'].append(self.get_cpu_temperature())
                if gpu_available:
                    temps['gpu'].append(self.get_gpu_temperature())
                time.sleep(self.config['temperature']['sample_interval'])
        except Exception as e:
            print(f"{Fore.YELLOW}Temperature monitoring failed during loop: {e}{Style.RESET_ALL}")

        return temps


    def run_benchmark(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Runs llama-bench and collects performance and temperature metrics."""
        pre_temps = self.monitor_temperatures()
        bench_result = self._run_bench_command(params)
        post_temps = self.monitor_temperatures()

        # Calculate max temps safely
        max_cpu_temp = 0.0
        if pre_temps.get('cpu') or post_temps.get('cpu'):
             all_cpu_temps = pre_temps.get('cpu', []) + post_temps.get('cpu', [])
             if all_cpu_temps:
                 max_cpu_temp = max(all_cpu_temps)

        max_gpu_temp = 0.0
        if pre_temps.get('gpu') or post_temps.get('gpu'):
            all_gpu_temps = pre_temps.get('gpu', []) + post_temps.get('gpu', [])
            if all_gpu_temps:
                 max_gpu_temp = max(all_gpu_temps)

        # Combine benchmark results with temperature readings
        final_result = {
            **bench_result, # Include tg_speed, memory_usage, latency, raw_output or error
            'cpu_temp_pre_max': max(pre_temps['cpu']) if pre_temps.get('cpu') else 0.0,
            'gpu_temp_pre_max': max(pre_temps['gpu']) if pre_temps.get('gpu') else 0.0,
            'cpu_temp_post_max': max(post_temps['cpu']) if post_temps.get('cpu') else 0.0,
            'gpu_temp_post_max': max(post_temps['gpu']) if post_temps.get('gpu') else 0.0,
            'cpu_temp_run_max': max_cpu_temp,
            'gpu_temp_run_max': max_gpu_temp,
        }
        return final_result

    def _run_bench_command(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Executes the llama-bench command and returns parsed JSON or error."""
        command = [
            self.config['llama_bench_path'],
            '-m', self.config['model_path'],
            '-t', str(params['threads']),
            '-b', str(params['batch_size']),
            '-ub', str(self.config['ubatch_size']),
            '-ngl', str(params['gpu_layers']),
            '-sm', str(params['split_mode']),
            '-fa', str(params['flash_attn']),
            '-mmp', str(self.config['mmap']),
            '-ctk', str(params['cache_type']),
            '-ctv', str(params['cache_type']),
            '-r', str(self.config['repetitions']),
            '--poll', str(self.config['poll']),
            '-ts', str(params['tensor_split']),
            '--numa', self.config['numa_strategy'],
            '-C', self.config['cpu_mask'],
            '-mg', str(self.config['main_gpu']),
            '-o', 'json' # Request JSON output
        ]

        for attempt in range(self.config['max_retries'] + 1):
            try:
                print(f"{Style.DIM}Running command: {' '.join(command)}{Style.RESET_ALL}") # Log command being run
                result = subprocess.run(
                    command,
                    check=True, # Raises CalledProcessError on non-zero exit
                    timeout=self.config['timeout'],
                    capture_output=True,
                    text=True,
                    encoding='utf-8' # Explicitly set encoding
                )
                # Check if stdout is empty before parsing
                if not result.stdout.strip():
                    print(f"{Fore.YELLOW}Warning: llama-bench produced empty output for params: {params}{Style.RESET_ALL}")
                    # You might want to treat this as an error or a specific result
                    return {"error": "Empty output from llama-bench", "tg_speed": 0.0, "memory_usage": 0, "latency": 0.0}

                # Parse the JSON output - parse_json_output is now correctly a method
                return self.parse_json_output(result.stdout)

            except subprocess.CalledProcessError as e:
                print(f"{Fore.RED}Command failed: {' '.join(command)}{Style.RESET_ALL}")
                print(f"{Fore.RED}Return code: {e.returncode}{Style.RESET_ALL}")
                print(f"{Fore.RED}Stderr: {e.stderr}{Style.RESET_ALL}")
                if attempt == self.config['max_retries']:
                    return {"error": f"Command failed after retries: {e}", "stderr": e.stderr}
                print(f"{Fore.YELLOW}Retrying ({attempt + 1}/{self.config['max_retries']})...{Style.RESET_ALL}")
                time.sleep(2 ** attempt) # Exponential backoff

            except subprocess.TimeoutExpired:
                print(f"{Fore.YELLOW}Timeout expired ({self.config['timeout']}s) for command: {' '.join(command)}{Style.RESET_ALL}")
                return {"error": "Timeout"}

            except FileNotFoundError:
                 self.error_exit(f"llama-bench command not found at '{self.config['llama_bench_path']}'. Please check the path.")

            except Exception as e: # Catch other potential errors during subprocess run
                print(f"{Fore.RED}An unexpected error occurred running the benchmark: {e}{Style.RESET_ALL}")
                if attempt == self.config['max_retries']:
                    return {"error": f"Unexpected error after retries: {e}"}
                time.sleep(2 ** attempt)

        return {"error": "Max retries exceeded without success"} # Should only be reached if all retries fail

    # *** CORRECTED INDENTATION ***
    def parse_json_output(self, output: str) -> Dict[str, Any]:
        """
        Parses benchmark output JSON with error handling for different formats.
        Handles both list and dictionary structures, extracts performance metrics.
        """
        try:
            # Clean potential non-JSON prefixes/suffixes if necessary (though '-o json' should prevent this)
            # Sometimes logs might interfere, find the first '{' or '['
            json_start = -1
            first_brace = output.find('{')
            first_bracket = output.find('[')

            if first_brace != -1 and (first_bracket == -1 or first_brace < first_bracket):
                json_start = first_brace
            elif first_bracket != -1:
                json_start = first_bracket

            if json_start == -1:
                 print(f"{Fore.YELLOW}Could not find start of JSON ('{{' or '[') in output:\n---\n{output}\n---{Style.RESET_ALL}")
                 return {'error': 'No JSON object/array found in output', 'tg_speed': 0.0, 'memory_usage': 0, 'latency': 0.0}

            cleaned_output = output[json_start:]
            data = json.loads(cleaned_output)

            tg_speed = 0.0
            memory_usage = 0
            latency = 0.0

            # Handle list output format (e.g., multiple runs summarized)
            # Take the first entry assuming it's representative or the main result
            if isinstance(data, list):
                if len(data) > 0 and isinstance(data[0], dict):
                    first_item = data[0]
                    tg_speed = first_item.get('tokens_per_second', first_item.get('tg_speed', 0.0)) # Check both common keys
                    memory_usage = first_item.get('memory_usage', 0) # Assuming this key exists
                    latency = first_item.get('latency', 0.0) # Assuming this key exists
                else:
                     print(f"{Fore.YELLOW}Warning: JSON output is a list, but first item is not a dictionary or list is empty.{Style.RESET_ALL}")

            # Handle dictionary output format (single run result)
            elif isinstance(data, dict):
                tg_speed = data.get('tokens_per_second', data.get('tg_speed', 0.0)) # Check both common keys
                memory_usage = data.get('memory_usage', 0)
                latency = data.get('latency', 0.0)
            else:
                 print(f"{Fore.YELLOW}Warning: Parsed JSON is neither a list nor a dictionary: {type(data)}{Style.RESET_ALL}")


            # Ensure types are correct before returning
            try:
                tg_speed = float(tg_speed) if tg_speed is not None else 0.0
            except (ValueError, TypeError):
                print(f"{Fore.YELLOW}Warning: Could not convert tg_speed '{tg_speed}' to float. Using 0.0.{Style.RESET_ALL}")
                tg_speed = 0.0
            try:
                memory_usage = int(memory_usage) if memory_usage is not None else 0
            except (ValueError, TypeError):
                 print(f"{Fore.YELLOW}Warning: Could not convert memory_usage '{memory_usage}' to int. Using 0.{Style.RESET_ALL}")
                 memory_usage = 0
            try:
                latency = float(latency) if latency is not None else 0.0
            except (ValueError, TypeError):
                print(f"{Fore.YELLOW}Warning: Could not convert latency '{latency}' to float. Using 0.0.{Style.RESET_ALL}")
                latency = 0.0


            return {
                'tg_speed': tg_speed,
                'memory_usage': memory_usage,
                'latency': latency,
                'raw_output': data # Preserve original parsed data
            }

        except json.JSONDecodeError as json_err:
            print(f"{Fore.YELLOW}JSON decode error: {json_err} for output:\n---\n{output}\n---{Style.RESET_ALL}")
            # Return error but also default numeric values to avoid breaking calculations later
            return {'error': f'JSON decode error: {str(json_err)}', 'tg_speed': 0.0, 'memory_usage': 0, 'latency': 0.0}

        except Exception as e:
            # Catch-all for unexpected errors during parsing
            print(f"{Fore.YELLOW}Unexpected parsing error: {e} for output:\n---\n{output}\n---{Style.RESET_ALL}")
            return {'error': f'Unexpected parsing error: {str(e)}', 'tg_speed': 0.0, 'memory_usage': 0, 'latency': 0.0}


    def save_results(self):
        """Saves the current results list to the JSON file."""
        try:
            with open(self.config['output_json'], 'w') as f:
                json.dump(self.results, f, indent=2)
        except IOError as e:
            print(f"{Fore.RED}Save failed: Could not write to {self.config['output_json']}. Error: {e}{Style.RESET_ALL}")
        except Exception as e: # Catch other potential errors like serialization issues
            print(f"{Fore.RED}Save failed: An unexpected error occurred. Error: {e}{Style.RESET_ALL}")


    def print_summary(self):
        """Prints a summary of the best found configuration."""
        print(f"\n{Fore.GREEN}=== Benchmark Complete ===")
        if not self.best_params or self.best_params.get('tg_speed', 0.0) == 0.0:
             print(f"{Fore.YELLOW}No successful runs recorded or best speed is 0. Cannot show best configuration.{Style.RESET_ALL}")
             # Optionally print the last few results or errors if needed
             # print("Last few results:", self.results[-5:])
             return

        print(f"{Fore.CYAN}Best Configuration Found:")
        print("---------------------------")
        # Prioritize printing core performance metrics first
        print(f"{'Token Speed (tg_speed)':<25}: {Fore.GREEN}{self.best_params.get('tg_speed', 0.0):.2f} t/s{Style.RESET_ALL}")
        if 'latency' in self.best_params:
             print(f"{'Latency':<25}: {Fore.BLUE}{self.best_params['latency']:.2f} ms{Style.RESET_ALL}") # Assuming ms
        if 'memory_usage' in self.best_params:
             # Add formatting for memory if it's in bytes
             mem_usage = self.best_params['memory_usage']
             if isinstance(mem_usage, (int, float)) and mem_usage > 0:
                  mem_gb = mem_usage / (1024**3)
                  print(f"{'Memory Usage':<25}: {Fore.BLUE}{mem_gb:.2f} GB ({mem_usage} bytes){Style.RESET_ALL}")
             else:
                  print(f"{'Memory Usage':<25}: {Fore.BLUE}{mem_usage}{Style.RESET_ALL}")


        # Print the parameters that achieved this performance
        print("\nParameters:")
        for key, value in self.best_params.items():
            # Skip internal/performance keys already printed or raw data
            if key in ['tg_speed', 'memory_usage', 'latency', 'raw_output', 'error'] or key.startswith(('cpu_temp', 'gpu_temp')):
                continue
            print(f"  {key:<23}: {Fore.BLUE}{value}{Style.RESET_ALL}")

        # Print temperature information
        print("\nTemperatures During Best Run:")
        temp_keys = [k for k in self.best_params if k.startswith(('cpu_temp', 'gpu_temp'))]
        if temp_keys:
            for key in sorted(temp_keys):
                 temp_value = self.best_params[key]
                 # Format label nicely
                 label = key.replace('_', ' ').replace(' temp ', ' Temp ').replace('pre max', 'Pre (Max)').replace('post max', 'Post (Max)').replace('run max', 'Run (Max)').title()
                 print(f"  {label:<23}: {Fore.BLUE}{temp_value:.1f}°C{Style.RESET_ALL}")
        else:
             print(f"  {Fore.YELLOW}Temperature data not available for the best run.{Style.RESET_ALL}")

        print("---------------------------")
        print(f"Full results saved to: {self.config['output_json']}")


    def run(self):
        """Executes the full benchmarking process: stepwise, combination, and deep search."""
        
        # --- Phase 1: Stepwise Optimization ---
        print(f"{Fore.GREEN}=== Phase 1: Stepwise Optimization ===")
        # Calculate total steps accurately based on parameters list
        total_stepwise_steps = sum(len(self.config['parameters'][param]) for param in self.config['optimization_order'])
        
        current_best_for_stepwise = self.best_params.copy() # Start with loaded best/initial

        with tqdm(total=total_stepwise_steps, desc="Stepwise", unit="run") as pbar:
            for param_name in self.config['optimization_order']:
                param_values = self.config['parameters'][param_name]
                # Store the best speed found *within this parameter's loop*
                best_speed_for_this_param = -1.0 
                best_value_for_this_param = current_best_for_stepwise[param_name]

                for value in param_values:
                    # Create candidate based on the best *so far* from previous params
                    candidate_params = current_best_for_stepwise.copy()
                    candidate_params[param_name] = value

                    # Check if this exact combination (only checking optimized params) exists
                    lookup_key = {p: candidate_params[p] for p in self.config['optimization_order'] if p in candidate_params}
                    
                    existing_result = next((entry for entry in self.results if all(entry.get(k) == v for k, v in lookup_key.items() if k in entry)), None)

                    result_data = {}
                    if existing_result and 'error' not in existing_result:
                         print(f"{Style.DIM}Using cached result for {param_name}={value}{Style.RESET_ALL}")
                         result_data = existing_result
                    elif existing_result and 'error' in existing_result:
                         print(f"{Style.DIM}Cached result for {param_name}={value} has error, rerunning...{Style.RESET_ALL}")
                         # Optionally remove the errored entry?
                         # self.results.remove(existing_result)
                         result_data = self.run_benchmark(candidate_params)
                         entry_to_save = {**candidate_params, **result_data}
                         self.results.append(entry_to_save)
                         self.save_results()
                    else:
                        print(f"Running test for {param_name}={value}")
                        result_data = self.run_benchmark(candidate_params)
                        entry_to_save = {**candidate_params, **result_data}
                        self.results.append(entry_to_save)
                        self.save_results() # Save after each run

                    current_speed = result_data.get('tg_speed', 0.0)

                    # Update overall best if this run is better
                    if 'error' not in result_data and current_speed > self.best_params.get('tg_speed', 0.0):
                        self.best_params = {**candidate_params, **result_data} # Update global best
                        print(f"\n{Fore.GREEN}New Overall Best: {param_name}={value}, Speed: {current_speed:.2f} t/s{Style.RESET_ALL}")
                        # Also update the baseline for the *next* parameter optimization step
                        current_best_for_stepwise = self.best_params.copy()
                        best_speed_for_this_param = current_speed # Update best for this param loop
                        best_value_for_this_param = value

                    # Track results per parameter value for combination phase
                    if 'error' not in result_data:
                         self.param_results[param_name].append((value, current_speed))
                    else:
                         # Log error or handle as needed, maybe append with 0 speed
                         self.param_results[param_name].append((value, 0.0))
                         print(f"{Fore.YELLOW}Error during run for {param_name}={value}: {result_data.get('error')}{Style.RESET_ALL}")


                    pbar.update(1)
                
                # After testing all values for a parameter, ensure the stepwise base uses the best value found for *that* parameter run
                current_best_for_stepwise[param_name] = best_value_for_this_param


        # --- Phase 2: Combination Testing ---
        print(f"\n{Fore.GREEN}=== Phase 2: Combination Testing (Top {self.config['combination_top_n']}) ===")
        combination_candidates = []
        valid_param_results_count = 0
        for param_name in self.config['optimization_order']:
             # Sort by speed (descending), filter out errors/zero speed if desired
             # Here, we keep all results to ensure 'combination_top_n' candidates exist if possible
             sorted_values = sorted(self.param_results[param_name], key=lambda item: item[1], reverse=True)
             # Take the top N *values* (not the tuples)
             top_n_values = [item[0] for item in sorted_values[:self.config['combination_top_n']]]
             if top_n_values: # Only append if there are results for this param
                 combination_candidates.append(top_n_values)
                 valid_param_results_count += 1
             else:
                  # Handle case where a parameter had no successful runs
                  print(f"{Fore.YELLOW}Warning: No successful results found for parameter '{param_name}' during Stepwise phase. Using its best known value for combinations.{Style.RESET_ALL}")
                  combination_candidates.append([self.best_params.get(param_name, self.config['parameters'][param_name][0])])


        # Check if we have enough params with results to combine
        if valid_param_results_count < 2:
             print(f"{Fore.YELLOW}Skipping Combination phase: Not enough parameters ({valid_param_results_count}) yielded successful results in Phase 1.{Style.RESET_ALL}")
        else:
            # Generate all combinations of the top N values for each parameter
            all_combinations = list(product(*combination_candidates))
            print(f"Testing {len(all_combinations)} combinations...")

            with tqdm(total=len(all_combinations), desc="Combinations", unit="run") as pbar:
                for i, combo_values in enumerate(all_combinations):
                    # Create the parameter dictionary for this combination
                    candidate_params = {param_name: combo_values[i] for i, param_name in enumerate(self.config['optimization_order'])}

                    # Check cache first
                    lookup_key = candidate_params # In combination, we check all params
                    existing_result = next((entry for entry in self.results if all(entry.get(k) == v for k, v in lookup_key.items() if k in entry)), None)

                    result_data = {}
                    if existing_result and 'error' not in existing_result:
                        print(f"{Style.DIM}Using cached result for combination {i+1}{Style.RESET_ALL}")
                        result_data = existing_result
                    elif existing_result and 'error' in existing_result:
                        print(f"{Style.DIM}Cached combination {i+1} has error, rerunning...{Style.RESET_ALL}")
                        result_data = self.run_benchmark(candidate_params)
                        entry_to_save = {**candidate_params, **result_data}
                        self.results.append(entry_to_save)
                        self.save_results()
                    else:
                        print(f"Running combination {i+1}/{len(all_combinations)}: {candidate_params}")
                        result_data = self.run_benchmark(candidate_params)
                        entry_to_save = {**candidate_params, **result_data}
                        self.results.append(entry_to_save)
                        self.save_results()

                    # Update overall best if this combination is better
                    current_speed = result_data.get('tg_speed', 0.0)
                    if 'error' not in result_data and current_speed > self.best_params.get('tg_speed', 0.0):
                        self.best_params = {**candidate_params, **result_data} # Update global best
                        print(f"\n{Fore.GREEN}New Overall Best (Combination): Speed: {current_speed:.2f} t/s{Style.RESET_ALL}")
                        # Display the new best combo
                        print(f"{Fore.GREEN} -> Params: {candidate_params}{Style.RESET_ALL}")

                    pbar.update(1)


        # --- Phase 3: Deep Parameter Search ---
        print(f"\n{Fore.GREEN}=== Phase 3: Deep Parameter Search (Around Best) ===")
        deep_search_candidates_map = {}
        params_for_deep_search = ['threads', 'batch_size', 'gpu_layers'] # Only search numerical params

        for param_name in self.config['optimization_order']:
            best_value = self.best_params.get(param_name) # Get current best value

            # Only perform deep search on specified numerical parameters that have a valid best value
            if param_name in params_for_deep_search and isinstance(best_value, (int, float)):
                original_values = self.config['parameters'][param_name]
                min_original_val = min(original_values) if original_values else 0
                max_original_val = max(original_values) if original_values else best_value # Fallback if list empty

                step_config = self.config['deep_search_params']
                # Calculate step size, ensure it's at least 1 for integers
                step = best_value * step_config['step_percentage']
                if isinstance(best_value, int):
                     step = max(1, int(step)) # Ensure integer step is at least 1

                search_values = {best_value} # Start with the current best

                # Generate values below the best
                current = best_value
                for _ in range(step_config['steps_around']):
                    current -= step
                    # Ensure value stays within reasonable bounds (>= min original or 1) and type
                    val_to_add = int(round(current)) if isinstance(best_value, int) else float(current)
                    if val_to_add >= max(1, min_original_val): # Lower bound check
                         search_values.add(val_to_add)
                    else:
                         break # Stop going lower if bounds exceeded

                # Generate values above the best
                current = best_value
                for _ in range(step_config['steps_around']):
                    current += step
                     # Ensure value stays within reasonable bounds (<= max original) and type
                    val_to_add = int(round(current)) if isinstance(best_value, int) else float(current)
                    if val_to_add <= max_original_val: # Upper bound check
                        search_values.add(val_to_add)
                    else:
                        break # Stop going higher if bounds exceeded

                # Sort and limit the number of values if needed (optional)
                # sorted_values = sorted(list(search_values))
                # deep_search_candidates_map[param_name] = sorted_values[:step_config['min_values']] # 'min_values' is maybe misnamed? Should be max_values?

                # Use all generated valid values
                deep_search_candidates_map[param_name] = sorted(list(search_values))
                print(f"Deep search values for {param_name}: {deep_search_candidates_map[param_name]}")

            else:
                # For non-numeric or non-searched params, just use the best known value
                 deep_search_candidates_map[param_name] = [best_value] if best_value is not None else [self.config['parameters'][param_name][0]]


        # Generate combinations for deep search
        deep_search_combinations = list(product(*deep_search_candidates_map.values()))
        print(f"Testing {len(deep_search_combinations)} deep search combinations...")

        with tqdm(total=len(deep_search_combinations), desc="Deep Search", unit="run") as pbar:
            for i, combo_values in enumerate(deep_search_combinations):
                # Create param dict, ensuring correct types if needed (esp. for threads/batch/ngl)
                candidate_params = {}
                param_order = list(deep_search_candidates_map.keys()) # Get order matching the product
                for idx, param_name in enumerate(param_order):
                     val = combo_values[idx]
                     # Ensure integer types for specific params
                     if param_name in ['threads', 'batch_size', 'gpu_layers', 'flash_attn', 'mmap', 'repetitions', 'poll', 'main_gpu']:
                          candidate_params[param_name] = int(val) if val is not None else 0
                     else:
                          candidate_params[param_name] = val


                # Check cache
                lookup_key = candidate_params
                existing_result = next((entry for entry in self.results if all(entry.get(k) == v for k, v in lookup_key.items() if k in entry)), None)

                result_data = {}
                if existing_result and 'error' not in existing_result:
                     print(f"{Style.DIM}Using cached result for deep search combination {i+1}{Style.RESET_ALL}")
                     result_data = existing_result
                elif existing_result and 'error' in existing_result:
                     print(f"{Style.DIM}Cached deep search combination {i+1} has error, rerunning...{Style.RESET_ALL}")
                     result_data = self.run_benchmark(candidate_params)
                     entry_to_save = {**candidate_params, **result_data}
                     self.results.append(entry_to_save)
                     self.save_results()
                else:
                    print(f"Running deep search {i+1}/{len(deep_search_combinations)}: {candidate_params}")
                    result_data = self.run_benchmark(candidate_params)
                    entry_to_save = {**candidate_params, **result_data}
                    self.results.append(entry_to_save)
                    self.save_results()

                # Update overall best
                current_speed = result_data.get('tg_speed', 0.0)
                if 'error' not in result_data and current_speed > self.best_params.get('tg_speed', 0.0):
                    self.best_params = {**candidate_params, **result_data}
                    print(f"\n{Fore.GREEN}New Overall Best (Deep Search): Speed: {current_speed:.2f} t/s{Style.RESET_ALL}")
                    print(f"{Fore.GREEN} -> Params: {candidate_params}{Style.RESET_ALL}")

                pbar.update(1)

        # --- Final Summary ---
        self.print_summary()

# --- Main Execution ---
if __name__ == "__main__":
    try:
        runner = BenchmarkRunner(CONFIG)
        runner.run()
    except KeyboardInterrupt:
        print("\nBenchmark interrupted by user.")
        # Optionally save results even on interrupt
        # if 'runner' in locals() and runner.results:
        #     print("Saving partial results...")
        #     runner.save_results()
        sys.exit(1)
    except Exception as main_e:
         print(f"{Fore.RED}An unexpected error occurred in the main execution: {main_e}{Style.RESET_ALL}")
         import traceback
         traceback.print_exc()
         sys.exit(1)
         
         

@vitorcalvi
Copy link
Author

Best Model for Financial Decision-Making: mlx-community/Qwen2.5-Coder-14B-Instruct-4bit

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment