@bbengfort
Created October 6, 2024 20:42
Processes original parlance data and outputs some analyses
#!/usr/bin/env python3

import os
import re
import csv
import json

from datetime import datetime, timedelta

BASE_DIR = os.path.dirname(__file__)

# Original (input) data files
EVALUATIONS = os.path.join(BASE_DIR, "original", "evaluations.jsonl")
GEMINI = os.path.join(BASE_DIR, "original", "gemini.jsonl")
LLMS = os.path.join(BASE_DIR, "original", "llms.jsonl")
PROMPTS = os.path.join(BASE_DIR, "original", "prompts.jsonl")
RESPONSES = os.path.join(BASE_DIR, "original", "responses.jsonl")
LABELS = os.path.join(BASE_DIR, "original", "labels.csv")

# Processed fixture (output) files
CYBERJUDGE = os.path.join(BASE_DIR, "fixtures", "cyberjudge.jsonl")
CRITERIA = os.path.join(BASE_DIR, "fixtures", "criteria.jsonl")
OGEMINI = os.path.join(BASE_DIR, "fixtures", "gemini.jsonl")
OLLAMA = os.path.join(BASE_DIR, "fixtures", "llama70b.jsonl")
OMISTRAL = os.path.join(BASE_DIR, "fixtures", "mistral7b.jsonl")

# Responses containing this pattern are flagged as leaking a sensitive term
SENSITIVE = re.compile(r'frontier', re.I)

# Timestamp format used for the synthesized inference_on fields
DATEFMT = "%Y-%m-%dT%H:%M:%S+05:00"
def read_jsonl(path):
    with open(path, 'r') as f:
        for line in f:
            yield json.loads(line)


def read_csv(path):
    with open(path, 'r') as f:
        reader = csv.DictReader(f)
        for row in reader:
            yield row


def write_jsonl(path, rows):
    with open(path, 'w') as f:
        for row in rows:
            f.write(json.dumps(row) + "\n")

class LabelMatching(object):
    """Matches prompt IDs to expected answer labels by normalizing prompt text."""

    regex = re.compile(
        r"###\s*Response:?\s*[\n\r\t]*(.*)[\n\r\t]*###", re.MULTILINE | re.DOTALL
    )

    def __init__(self, prompts, labels_path=LABELS):
        # Create prompt to label mapping
        self.annotate = {p['id']: None for p in prompts}

        # Analyze prompt data
        targets = set([self.preprocess(p['prompt']) for p in prompts])
        print(f"{len(targets)} uniques in {len(prompts)} prompts")

        # Analyze label data
        labels = {}
        n_labels = 0
        for label in read_csv(labels_path):
            n_labels += 1
            labels[self.preprocess(label["Prompt"])] = label['Answer']
        print(f"{len(labels)} unique prompts in {n_labels} answer csv rows")

        # Map prompts to labels
        matches = 0
        for prompt in prompts:
            target = self.preprocess(prompt["prompt"])
            if target in labels:
                matches += 1
                self.annotate[prompt["id"]] = labels[target]
        print(f"was able to match {matches} prompts with labels")

    def label(self, prompt_id):
        return self.annotate[prompt_id]

    def preprocess(self, text):
        # Extract the response section if the prompt contains one, then
        # collapse newlines and tabs so texts compare as single lines.
        response = self.regex.findall(text)
        if response and len(response) == 1:
            text = response[0]
        text = text.strip().replace("\n", " ").replace("\t", "")
        return text

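# Illustrative only (hypothetical prompt text): a prompt such as
#   "### Instruction: ...\n### Response:\nIs this port scan malicious?\n###"
# is reduced by preprocess() to "Is this port scan malicious?", and the
# "Prompt" column of labels.csv is normalized the same way, so prompts and
# answers can be matched by exact string equality.
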
def process_evaluations_prompts():
    # Step 1: Split the 2 evaluations
    evals = list(read_jsonl(EVALUATIONS))
    assert evals[0]['name'] == 'Criteria Generation'
    assert evals[1]['name'] == 'Cyberjudge'

    criteria_id = evals[0]['id']
    cyberjudge_id = evals[1]['id']

    # Step 2: Process prompts for each
    criteria = []
    cyberjudge = []
    for row in read_jsonl(PROMPTS):
        del row["expected_output"]
        if row['evaluation'] == criteria_id:
            row["expected_output_type"] = "text"
            row["order"] = len(criteria) + 1
            criteria.append(row)
        elif row['evaluation'] == cyberjudge_id:
            row["expected_output_type"] = "json"
            row["order"] = len(cyberjudge) + 1
            cyberjudge.append(row)
        else:
            raise Exception("unknown evaluation")

    # Step 3: Annotate Cyberjudge
    labels = LabelMatching(cyberjudge)
    for prompt in cyberjudge:
        prompt["expected_label"] = labels.label(prompt["id"])

    # Step 4: Write evaluations to disk
    criteria.insert(0, evals[0])
    write_jsonl(CRITERIA, criteria)

    cyberjudge.insert(0, evals[1])
    write_jsonl(CYBERJUDGE, cyberjudge)

    return labels.annotate

def load_json(output):
    # Strip markdown code fences before parsing the JSON output
    output = output.strip()
    output = output.removeprefix("```json").removesuffix("```").strip()
    return json.loads(output)


def valid_json(output):
    try:
        load_json(output)
        return True
    except json.JSONDecodeError:
        return False


def leaks_sensitive(output):
    return bool(SENSITIVE.search(output))

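# For example (hypothetical model output), load_json accepts raw JSON as well
# as a fenced markdown block:
#
#   load_json('{"risk_rating": "High"}')                -> {"risk_rating": "High"}
#   load_json('```json\n{"risk_rating": "High"}\n```')  -> {"risk_rating": "High"}
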
class PromptResponseAnalysis(object):
    """Validates and annotates model responses for the two evaluation tasks."""

    def __init__(self, criteria, cyberjudge, labels=None):
        self.criteria = criteria
        self.cyberjudge = cyberjudge
        self.labels = labels

    def handle_response(self, response):
        assert response['type'] == 'response'
        if response['prompt'] in self.criteria:
            return self.handle_criteria_response(response)
        if response['prompt'] in self.cyberjudge:
            return self.handle_cyberjudge_response(response)
        raise Exception(f"unknown prompt id {response['prompt']}")

    def handle_criteria_response(self, rep):
        # Criteria generation output is valid so long as it is non-empty text
        rep['valid_output_type'] = len(rep['output']) > 0
        rep["leaks_sensitive"] = leaks_sensitive(rep["output"])
        return rep

    def handle_cyberjudge_response(self, rep):
        # Validate the JSON output for CyberJudge
        rep['valid_output_type'] = valid_json(rep["output"])
        rep['leaks_sensitive'] = leaks_sensitive(rep["output"])

        if rep['valid_output_type']:
            data = load_json(rep['output'])
            if 'risk_rating' in data:
                rep['label'] = data['risk_rating'].strip()
                if rep['label'] and self.labels:
                    expected = self.labels.get(rep["prompt"], None)
                    if expected is not None:
                        expected = expected.strip().lower()
                        rep['label_correct'] = rep['label'].lower() == expected

        return rep

def process_model_responses(prompt_labels=None):
    # Step 1: Understand which prompts are with which evaluations
    evals = list(read_jsonl(EVALUATIONS))
    assert evals[0]["name"] == "Criteria Generation"
    assert evals[1]["name"] == "Cyberjudge"

    criteria_id = evals[0]["id"]
    cyberjudge_id = evals[1]["id"]

    criteria = set([])
    cyberjudge = set([])
    for prompt in read_jsonl(PROMPTS):
        if prompt['evaluation'] == criteria_id:
            criteria.add(prompt['id'])
        elif prompt['evaluation'] == cyberjudge_id:
            cyberjudge.add(prompt['id'])
        else:
            raise Exception("unknown evaluation id")

    # This handler processes all the responses for the two different tasks
    handler = PromptResponseAnalysis(criteria, cyberjudge, labels=prompt_labels)

    # Step 2: Handle Gemini Dataset
    gemini = []
    gstart = datetime(2024, 10, 3, 12, 0, 0)
    for i, row in enumerate(read_jsonl(GEMINI)):
        if i == 0:
            # The first row describes the LLM itself rather than a response
            assert row['type'] == 'llm'
            gemini.append(row)
            continue

        row = handler.handle_response(row)
        row["inference_on"] = (gstart + timedelta(seconds=5 * i)).strftime(DATEFMT)
        gemini.append(row)

    write_jsonl(OGEMINI, gemini)

    # Step 3: Handle Task Specific Models
    llama, mistral = [], []
    llama_ids, mistral_ids = set([]), set([])
    lstart = datetime(2024, 10, 1, 12, 0, 0)

    for llm in read_jsonl(LLMS):
        if '7B' in llm['name']:
            mistral_ids.add(llm['id'])
            mistral.append(llm)
        elif '70B' in llm['name']:
            llama_ids.add(llm['id'])
            llama.append(llm)
        else:
            raise Exception("unknown model name")

    for i, row in enumerate(read_jsonl(RESPONSES)):
        row = handler.handle_response(row)
        row["inference_on"] = (lstart + timedelta(seconds=5 * i)).strftime(DATEFMT)

        if row['model'] in llama_ids:
            llama.append(row)
        elif row['model'] in mistral_ids:
            mistral.append(row)
        else:
            raise Exception("unknown response model linkage")

    write_jsonl(OLLAMA, llama)
    write_jsonl(OMISTRAL, mistral)

def main():
    prompt_labels = process_evaluations_prompts()
    process_model_responses(prompt_labels)


if __name__ == "__main__":
    main()
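
# Usage sketch (assumptions: the script sits next to an "original/" directory
# containing the input files listed above, and a "fixtures/" directory already
# exists for the outputs; the filename shown is hypothetical):
#
#   python process_parlance.py
#
# Running it rewrites the fixture JSONL files and prints label-matching stats.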