Tally-Multi-Vote Dataset Generation.
import os
import requests
import random
import logging
import re
import time
import json
import matplotlib
matplotlib.use('Agg')  # Set the backend to 'Agg' before importing pyplot
import matplotlib.pyplot as plt
from typing import Any, List, Dict, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed
from datasets import Dataset
from dataclasses import dataclass, field
# ==========================
# Configuration Parameters
# ==========================
@dataclass
class Config:
    # LiteLLM Proxy Configuration
    LITELLM_API_ENDPOINT: str = "https://litellm.***"
    LITELLM_API_KEY: str = os.getenv("LITELLM_API_KEY", "sk-***")  # Fallback if not set via env var
    # Model Configuration
    RESPONSE_MODELS: List[str] = field(default_factory=lambda: [
        "groq-mixtral-8x7b-32768",
        "gpt-4o",
        "brocav9",
        "wernickev3",
        "brocav3",
        "hyperionv3",
    ])
    EVALUATOR_MODELS: List[str] = field(default_factory=lambda: [
        "gpt-4o-mini",
        "gpt-3.5-turbo-16k",
        "groq-llama3-70b-8192",
    ])
    # How many prompts we want to process from the input file
    NUM_EXAMPLES: int = 100
    # Logging Configuration
    LOG_LEVEL: int = logging.INFO  # Change to logging.DEBUG for more detailed logs
    LOG_FILENAME: str = "leaderboard.log"
    # Saving / Output
    SAVE_INTERVAL: int = 5  # Save dataset + plot after every N prompts
    OUTPUT_PATH: str = "dpo_preferences_dataset"
    PLOT_PATH: str = "leaderboard_plots"  # Directory to save plots
    JSONL_OUTPUT_PATH: str = "final_output.jsonl"  # The JSONL file to store progress for each prompt
config = Config()
# Set logging to file + console
logging.basicConfig(
    level=config.LOG_LEVEL,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(config.LOG_FILENAME, mode='w', encoding='utf-8'),
        logging.StreamHandler()
    ]
)
HEADERS = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {config.LITELLM_API_KEY}",
}
# ==========================
# Helper Functions
# ==========================
def load_prompts_from_file(file_path: str) -> List[str]:
    """
    Reads instruction prompts from a file (e.g. instructions_input.jsonl2).
    Each line is assumed to be a string enclosed in quotes, like:
        "Describe the smell of the beach in spring."
    """
    prompts = []
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()
        for line in lines:
            line = line.strip()
            # Strip leading/trailing quotes if present
            if line.startswith('"') and line.endswith('"'):
                line = line[1:-1]
            if line:
                prompts.append(line)
        logging.info(f"Loaded {len(prompts)} prompts from '{file_path}'.")
    except Exception as e:
        logging.error(f"Could not load prompts from file '{file_path}': {e}")
    return prompts
def send_request(model: str, prompt: str, temperature: float = 0.7, max_tokens: int = 768) -> Tuple[str, str]:
    """
    Sends a request to the LiteLLM API endpoint using the given model and prompt.
    The system prompt asks the model to keep its answer under 512 tokens; the
    API-side cap is controlled separately via `max_tokens`.
    """
    payload = {
        "model": model,
        "messages": [
            {
                "role": "system",
                "content": (
                    "You are a helpful assistant. "
                    "Please answer the question to the best of your abilities. "
                    "You are limited to a response length of 512 tokens. "
                    "Do not exceed this hard limit."
                )
            },
            {"role": "user", "content": prompt},
        ],
        "temperature": temperature,
        "max_tokens": max_tokens,
    }
    try:
        response = requests.post(config.LITELLM_API_ENDPOINT, headers=HEADERS, json=payload, timeout=300)
        response.raise_for_status()
        data = response.json()
        return model, data['choices'][0]['message']['content'].strip()
    except Exception as e:
        logging.error(f"Request error for model '{model}': {e}")
        return model, f"Error: {e}"
def send_request_with_retries(
    model: str,
    prompt: str,
    temperature: float = 0.7,
    max_tokens: int = 768,
    retries: int = 5,
    backoff_factor: int = 2
) -> Tuple[str, str]:
    """
    Sends a request with multiple retries in case of transient errors.
    """
    for attempt in range(retries):
        model_name, response = send_request(model, prompt, temperature, max_tokens)
        if not response.startswith("Error:"):
            return model_name, response
        else:
            logging.error(f"Attempt {attempt + 1} failed for model '{model}': {response}")
            if attempt < retries - 1:
                sleep_time = backoff_factor ** attempt
                logging.info(f"Retrying in {sleep_time} seconds...")
                time.sleep(sleep_time)
            else:
                logging.error(f"All {retries} attempts failed for model '{model}'.")
                return model_name, response
    return model, "Error: Unknown"
def parse_rankings(raw_ratings: str) -> Dict[str, int]:
    """
    Parse lines of the format:
        model_name: score
    from the evaluator's response. Score must be an integer 0-100.
    """
    pattern = re.compile(r"([\w\-\./]+):\s*(\d+)")
    matches = pattern.findall(raw_ratings)
    rankings = {}
    for model, score_str in matches:
        if model in config.RESPONSE_MODELS:
            score_int = int(score_str)
            if 0 <= score_int <= 100:
                rankings[model] = score_int
    # Log warning if we missed any model
    parsed_models = set(rankings.keys())
    unparsed = set(config.RESPONSE_MODELS) - parsed_models
    if unparsed:
        logging.warning(f"These models were not parsed from rankings: {unparsed}")
    return rankings
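# Illustrative example (hypothetical evaluator reply): given the raw text
#   "gpt-4o: 87, brocav9: 42, groq-mixtral-8x7b-32768: 73"
# parse_rankings() returns {"gpt-4o": 87, "brocav9": 42, "groq-mixtral-8x7b-32768": 73};
# names not in config.RESPONSE_MODELS and scores outside 0-100 are silently dropped.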
def rank_responses(evaluator_model: str, prompt: str, responses: Dict[str, str]) -> Tuple[str, Dict[str, int]]:
    """
    Build an evaluation prompt with the user prompt and each model's response, and
    ask the evaluator_model to provide a 0-100 ranking for each.
    """
    evaluation_prompt = (
        "You are an evaluator tasked with ranking the following model responses to a user prompt "
        "on a scale from 0 (horrible) to 100 (perfect). "
        "Consider relevance, correctness, and fluency. **Please evaluate all the responses below without omitting any.**\n\n"
        f"**User Prompt:** {prompt}\n\n"
    )
    # Append each model response
    for idx, (model_name, response) in enumerate(responses.items(), 1):
        evaluation_prompt += f"**Response {idx} from {model_name}:**\n{response}\n\n"
    # Instruction for the evaluator to only provide the lines "model_name: score"
    evaluation_prompt += (
        "Please provide the scores in the exact following format without any additional text:\n"
        "**model_name: score, model_name: score, ...**\n"
        "Ensure that each model listed above is evaluated."
    )
    # Send the request
    model, raw_ratings = send_request_with_retries(
        evaluator_model, evaluation_prompt, temperature=0, max_tokens=4096
    )
    rankings = parse_rankings(raw_ratings)
    if rankings:
        logging.info(f"Evaluator '{evaluator_model}' returned rankings: {rankings}")
    else:
        logging.warning(
            f"No valid rankings parsed from evaluator '{evaluator_model}'. "
            f"Raw response:\n{raw_ratings}"
        )
    return model, rankings
def summarize_rankings(
    all_rankings: Dict[str, Dict[str, int]],
    responses: Dict[str, str]
) -> Tuple[str, str, float, str, float, str, float]:
    """
    Summarize the rankings from all evaluators to find:
      - The best model & score
      - The worst model & score
      - The chosen (best) response
      - The rejected (worst) response
      - The average score across ALL models for this instruction
    Returns:
        chosen, rejected, best_score, best_model, worst_score, worst_model, average_score_across_models
    """
    # Aggregate scores per model across all evaluators
    aggregated_scores = {}
    for evaluator, rankings in all_rankings.items():
        for model, score in rankings.items():
            aggregated_scores.setdefault(model, []).append(score)
    # Compute average scores for each model (over all evaluators)
    average_scores = {}
    for model, scores in aggregated_scores.items():
        average_scores[model] = sum(scores) / len(scores)
    if not average_scores:
        logging.error("No average scores available for summarization.")
        return "", "", 0.0, "", 0.0, "", 0.0
    # Identify best and worst
    best_model = max(average_scores, key=average_scores.get)
    worst_model = min(average_scores, key=average_scores.get)
    best_score = average_scores[best_model]
    worst_score = average_scores[worst_model]
    # The chosen (best) and rejected (worst) responses
    chosen = responses.get(best_model, "Error: no response found")
    rejected = responses.get(worst_model, "Error: no response found")
    # Average score across all models for this prompt
    global_average_for_prompt = sum(average_scores.values()) / len(average_scores)
    logging.info(f"Average Scores: {average_scores}")
    logging.info(f"Best: {best_model} -> {best_score}")
    logging.info(f"Worst: {worst_model} -> {worst_score}")
    logging.info(f"Average across all models for this prompt: {global_average_for_prompt}")
    return (
        chosen,
        rejected,
        best_score,
        best_model,
        worst_score,
        worst_model,
        global_average_for_prompt
    )
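# Worked example (hypothetical scores): with two evaluators returning
#   {"modelA": 80, "modelB": 40} and {"modelA": 90, "modelB": 60},
# the per-model averages are modelA = 85.0 and modelB = 50.0, so modelA's response
# becomes "chosen", modelB's becomes "rejected", and the prompt-level average is 67.5.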
def process_prompt(prompt: str, prompt_index: int, total_prompts: int) -> Dict[str, Any]:
    """
    For a single prompt:
      1) Get responses from each response model.
      2) Gather rankings from evaluator models.
      3) Summarize the results to find best/worst.
      4) Return a dictionary containing the final chosen and rejected responses, etc.
    """
    logging.info(f"Processing Prompt {prompt_index}/{total_prompts}: {prompt}")
    # Step 1: Generate responses concurrently
    responses = {}
    with ThreadPoolExecutor(max_workers=len(config.RESPONSE_MODELS)) as executor:
        future_to_model = {
            executor.submit(
                send_request_with_retries,
                model,
                prompt,
                max_tokens=768
            ): model
            for model in config.RESPONSE_MODELS
        }
        for future in as_completed(future_to_model):
            model = future_to_model[future]
            try:
                model_name, response = future.result()
                responses[model_name] = response
            except Exception as e:
                logging.error(f"Error retrieving response from model '{model}': {e}")
                responses[model] = f"Error: {e}"
    # Handle any missing models
    missing_models = set(config.RESPONSE_MODELS) - set(responses.keys())
    for m in missing_models:
        logging.warning(f"No response from model '{m}'. Assigning default error message.")
        responses[m] = "Error: No response"
    # Step 2: Collect rankings from evaluator models
    all_rankings = {}
    with ThreadPoolExecutor(max_workers=len(config.EVALUATOR_MODELS)) as executor:
        future_to_ranker = {
            executor.submit(rank_responses, ranker_model, prompt, responses): ranker_model
            for ranker_model in config.EVALUATOR_MODELS
        }
        for future in as_completed(future_to_ranker):
            ranker = future_to_ranker[future]
            try:
                evaluator_name, rankings = future.result()
                if rankings:
                    all_rankings[evaluator_name] = rankings
                else:
                    # If no valid rankings, assign 0 to each model
                    logging.warning(f"No rankings from evaluator '{evaluator_name}'. Assigning zeroes.")
                    all_rankings[evaluator_name] = {m: 0 for m in config.RESPONSE_MODELS}
            except Exception as e:
                logging.error(f"Error retrieving rankings from evaluator '{ranker}': {e}")
                all_rankings[ranker] = {m: 0 for m in config.RESPONSE_MODELS}
    # Step 3: If any model responded with an error, force its score to 0
    for model_name, response in responses.items():
        if response.startswith("Error:"):
            logging.info(f"Model '{model_name}' had an error response; forcing score = 0.")
            for e in all_rankings:
                all_rankings[e][model_name] = 0
    # Summarize to find best/worst
    chosen, rejected, best_score, best_model, worst_score, worst_model, avg_score_prompt = summarize_rankings(
        all_rankings, responses
    )
    return {
        "instruction": prompt,
        "chosen": chosen,
        "rejected": rejected,
        "best_score": best_score,
        "best_model": best_model,
        "worst_score": worst_score,
        "worst_model": worst_model,
        "average_score_for_this_instruction": avg_score_prompt,
        "all_rankings": all_rankings
    }
def plot_line_graph(
    model_score_history: Dict[str, List[float]],
    prompt_idx: int
):
    """
    Generate a line graph that tracks each model's *accumulated* score
    over the prompts so far.

    model_score_history: dictionary mapping model -> list of cumulative scores,
        e.g. {"modelA": [10, 20, 27], "modelB": [8, 16, 25], ...}
    prompt_idx: integer for how many prompts have been processed
    """
    os.makedirs(config.PLOT_PATH, exist_ok=True)
    plt.figure(figsize=(10, 6))
    # For each model, plot the line of cumulative scores
    for model_name, score_list in model_score_history.items():
        x_values = list(range(1, len(score_list) + 1))
        plt.plot(x_values, score_list, label=model_name, marker='o')
    plt.xlabel("Prompt Number")
    plt.ylabel("Total Accumulated Score")
    plt.title(f"Leaderboard after {prompt_idx} Prompts (Line Graph)")
    plt.legend()
    plt.grid(True)
    plot_filename = os.path.join(config.PLOT_PATH, f"leaderboard_linechart_{prompt_idx}.png")
    try:
        plt.savefig(plot_filename)
        logging.info(f"Line chart saved to '{plot_filename}'.")
        print(f"Line chart saved to '{plot_filename}'.\n")
    except Exception as e:
        logging.error(f"Failed to save line chart at Prompt {prompt_idx}: {e}")
    finally:
        plt.close()
# ==========================
# Main Execution Function
# ==========================
def main():
    # 1) Load prompts from file
    input_file = "instructions_input.jsonl2"
    prompts = load_prompts_from_file(input_file)
    if not prompts:
        logging.error("No prompts loaded. Exiting.")
        return
    # 2) Select random prompts if needed
    num_prompts = min(config.NUM_EXAMPLES, len(prompts))
    sampled_prompts = random.sample(prompts, num_prompts)
    logging.info(f"Selected {num_prompts} random prompts for processing.")
    # Trackers for building a global scoreboard across prompts (cumulative)
    total_scores = {m: 0.0 for m in config.RESPONSE_MODELS}
    # We'll keep track of the score progression: model -> [score after prompt1, prompt2, ...]
    model_score_history = {m: [] for m in config.RESPONSE_MODELS}
    # We'll also store the processed data for the final HF dataset
    processed_data = []
    # If the JSONL file exists and we want to start fresh, you might delete or rename it.
    # For safety, let's just proceed in append mode to not overwrite prior progress.
    # If you prefer to start from scratch every run, you can do:
    #   if os.path.exists(config.JSONL_OUTPUT_PATH):
    #       os.remove(config.JSONL_OUTPUT_PATH)
    for idx, prompt in enumerate(sampled_prompts, 1):
        logging.info(f"\n=== Processing Prompt {idx}/{num_prompts} ===")
        result = process_prompt(prompt, idx, num_prompts)
        processed_data.append(result)
        # Unpack the "all_rankings" to see how many points each model got for this prompt
        all_rankings = result["all_rankings"]
        # Combine the scores from all evaluators for each model
        prompt_combined_scores = {m: 0 for m in config.RESPONSE_MODELS}
        for evaluator, ranking_dict in all_rankings.items():
            for m, s in ranking_dict.items():
                prompt_combined_scores[m] += s
        # Update the total (cumulative) scoreboard
        for m in config.RESPONSE_MODELS:
            total_scores[m] += prompt_combined_scores[m]
            # Append the new total to the model's history
            model_score_history[m].append(total_scores[m])
        # Log these combined scores
        logging.info(f"Total Combined Scores for Prompt {idx}: {prompt_combined_scores}")
        print(f"\n=== Total Combined Scores for Prompt {idx} ===")
        for m, sc in prompt_combined_scores.items():
            print(f"{m}: {sc}")
        print("========================================\n")
        # Also update our JSONL file right here, after processing the prompt.
        # This ensures we don't lose progress if the script crashes.
        try:
            # We'll write a single line to the JSONL, containing the relevant fields
            jsonl_entry = {
                "instruction": result["instruction"],
                "chosen": result["chosen"],
                "rejected": result["rejected"],
                "best_score": result["best_score"],
                "worst_score": result["worst_score"],
                "best_model": result["best_model"],
                "worst_model": result["worst_model"],
                "average_score": result["average_score_for_this_instruction"],
            }
            with open(config.JSONL_OUTPUT_PATH, 'a', encoding='utf-8') as f:
                f.write(json.dumps(jsonl_entry, ensure_ascii=False) + "\n")
        except Exception as e:
            logging.error(f"Failed to append to JSONL file: {e}")
        # Print a quick textual leaderboard of total_scores so far
        sorted_leaderboard = sorted(total_scores.items(), key=lambda x: x[1], reverse=True)
        logging.info("Current Cumulative Leaderboard after this prompt:")
        print("=== Cumulative Leaderboard (Total Scores) ===")
        print(f"{'Rank':<5} {'Model':<40} {'Total Score':>12}")
        print("-" * 60)
        for rank, (model_name, score_val) in enumerate(sorted_leaderboard, 1):
            print(f"{rank:<5} {model_name:<40} {score_val:>12.2f}")
            logging.info(f"  {rank}. {model_name} -> {score_val:.2f}")
        print("=============================================\n")
        # Every SAVE_INTERVAL prompts, we save the HF dataset and generate a line plot
        if idx % config.SAVE_INTERVAL == 0:
            try:
                # Prepare Hugging Face dataset (remove "all_rankings" from each entry)
                dataset_list = []
                for entry in processed_data:
                    entry_copy = {
                        k: v for k, v in entry.items()
                        if k != "all_rankings"
                    }
                    dataset_list.append(entry_copy)
                hf_dataset = Dataset.from_list(dataset_list)
                hf_dataset.save_to_disk(config.OUTPUT_PATH)
                logging.info(f"Intermediate dataset saved to '{config.OUTPUT_PATH}' after prompt {idx}.")
                print(f"Dataset saved after processing {idx} prompts.\n")
                # Now plot the line chart of cumulative scores so far
                plot_line_graph(model_score_history, idx)
            except Exception as e:
                logging.error(f"Failed to save dataset or plot at prompt {idx}: {e}")
    # Final saving after all prompts
    try:
        dataset_list = []
        for entry in processed_data:
            entry_copy = {
                k: v for k, v in entry.items()
                if k != "all_rankings"
            }
            dataset_list.append(entry_copy)
        final_dataset = Dataset.from_list(dataset_list)
        final_dataset.save_to_disk(config.OUTPUT_PATH)
        logging.info(f"Final dataset saved to '{config.OUTPUT_PATH}'.")
        # If we didn't generate a line graph in the last iteration, do it now
        if num_prompts % config.SAVE_INTERVAL != 0:
            plot_line_graph(model_score_history, num_prompts)
    except Exception as e:
        logging.error(f"Failed to create or save the final output dataset or plot: {e}")
    logging.info("DPO dataset generation completed.")
if __name__ == "__main__":
    main()
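# Usage sketch (assumptions: the script is saved as tally_multi_vote.py, the LiteLLM
# proxy endpoint configured above is reachable, and instructions_input.jsonl2 exists
# in the working directory with one quoted prompt per line):
#   export LITELLM_API_KEY="sk-..."
#   python tally_multi_vote.py
# Outputs: final_output.jsonl (per-prompt progress), the dpo_preferences_dataset
# HF dataset directory, and cumulative-score line charts under leaderboard_plots/.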