Skip to content

Instantly share code, notes, and snippets.

@blake41
Created June 26, 2025 17:17
Show Gist options
  • Save blake41/13c9ab510de3f12d17b03fe0054dba53 to your computer and use it in GitHub Desktop.
Save blake41/13c9ab510de3f12d17b03fe0054dba53 to your computer and use it in GitHub Desktop.
dspy.py
from pathlib import Path
from typing import Literal, Optional
import os, random, dspy
from datetime import datetime
from dspy.teleprompt import MIPROv2
from dspy.evaluate import Evaluate
import mlflow
from data_processor import QualificationDataProcessor, load_processed_dataset
# ────────────────────────────────────────────────────────────
# 1. CONFIG CONSTANTS
# ────────────────────────────────────────────────────────────
DATA_EXAMPLES = 50        # number of examples per split (train and dev each)
RANDOM_SEED = 42          # fixed seed so the shuffle/split is reproducible
LM_NAME = "gpt-4o-mini"   # model name routed through the LiteLLM proxy (see section 3)
LM_TEMPERATURE = 0        # deterministic outputs
NUM_THREADS = 4           # worker threads handed to the MIPROv2 optimizer
OPT_MINIBATCH = False     # flip to True once summaries <= 250 tokens
# ────────────────────────────────────────────────────────────
# 2. MLflow setup
# ────────────────────────────────────────────────────────────
# Point MLflow at a locally-running tracking server and tag the experiment
# so runs are easy to filter in the UI.
mlflow.set_tracking_uri("http://localhost:8080")
mlflow.set_experiment("call_qualifier")
mlflow.set_tag("model", LM_NAME)
mlflow.set_tag("dataset", "call_qualifier")
mlflow.set_tag("version", "1.0.0")
mlflow.set_tag("owner", "blake")
# Enable DSPy autologging: record compile runs, evaluations, and the
# LM traces produced during both.
mlflow.dspy.autolog(
    log_compiles=True,
    log_evals=True,
    log_traces_from_compile=True,
    log_traces=True,
    log_traces_from_eval=True
)
# ────────────────────────────────────────────────────────────
# 3. LiteLLM proxy note
# ────────────────────────────────────────────────────────────
# Make sure you run:
# litellm --model gpt-4o-mini --port 4000 &
# and then invoke this script with:
# OPENAI_API_BASE=http://127.0.0.1:4000 python call_qualifier.py
# ────────────────────────────────────────────────────────────
# 4. Configure DSPy’s LM
# ────────────────────────────────────────────────────────────
lm = dspy.LM(
    LM_NAME,
    num_retries=2,              # retry transient API failures twice before giving up
    temperature=LM_TEMPERATURE  # 0 => deterministic generations
)
# Enable LiteLLM's on-disk response cache so repeated identical calls are free.
# NOTE(review): these env vars are set AFTER dspy.LM() is constructed — confirm
# LiteLLM reads them lazily at request time rather than at client creation.
os.environ["LITELLM_CACHE"] = "True"
os.environ["LITELLM_CACHE_TYPE"] = "disk"
# Make `lm` the default LM for every dspy.Predict call in this process.
dspy.settings.configure(lm=lm, output_format="text", verbose=True)
# ────────────────────────────────────────────────────────────
# 5. DSPy Signatures
# ────────────────────────────────────────────────────────────
# Stage-1 signature: distill one raw sales-call transcript into BANT-style
# qualification fields consumed by CallQualification below.
# NOTE(review): intentionally documented with comments rather than a class
# docstring — DSPy uses a Signature's docstring as the prompt instruction,
# so adding one here would change the generated prompt (confirm in dspy docs).
class CallSummary(dspy.Signature):
    # Single input: the full transcript text.
    transcript: str = dspy.InputField(
        desc="Full transcript text of a single sales call."
    )
    # Six extracted facets of the call.
    need: str = dspy.OutputField(
        desc="Customer's specific business need or pain point mentioned in the call."
    )
    budget: str = dspy.OutputField(
        desc="Budget information, constraints, or financial discussions mentioned."
    )
    authority: str = dspy.OutputField(
        desc="Decision maker information - who has authority to make the purchase decision."
    )
    timeline: str = dspy.OutputField(
        desc="Timeline for implementation, urgency, or decision-making timeframe."
    )
    objections: str = dspy.OutputField(
        desc="Concerns, objections, or hesitations expressed by the prospect."
    )
    next_steps: str = dspy.OutputField(
        desc="Agreed upon next steps, follow-up actions, or commitments made."
    )
# Stage-2 signature: map the six summary fields produced by CallSummary to a
# binary qualification label. Input names mirror CallSummary's outputs exactly.
# NOTE(review): no class docstring on purpose — see comment on CallSummary.
class CallQualification(dspy.Signature):
    need: str = dspy.InputField(desc="Customer's specific business need or pain point")
    budget: str = dspy.InputField(desc="Budget information or constraints")
    authority: str = dspy.InputField(desc="Decision maker information")
    timeline: str = dspy.InputField(desc="Timeline for implementation")
    objections: str = dspy.InputField(desc="Concerns or objections expressed")
    next_steps: str = dspy.InputField(desc="Agreed upon next steps")
    # Literal constrains the LM output to exactly one of the two labels.
    qualification: Literal["qualified", "disqualified"] = dspy.OutputField(
        desc="Output exactly 'qualified' or 'disqualified'."
    )
# ────────────────────────────────────────────────────────────
# 6. Modules & Pipeline
# ────────────────────────────────────────────────────────────
class CallQualificationPipeline(dspy.Module):
    """Two-stage call qualifier: summarize a transcript, then classify it.

    Stage 1 (``summarize``) extracts BANT-style fields from the raw
    transcript; stage 2 (``classifier``) maps those fields to a
    'qualified' / 'disqualified' label, which is returned as a plain string.
    """

    # Field names shared by CallSummary outputs and CallQualification inputs.
    _BANT_FIELDS = ("need", "budget", "authority", "timeline", "objections", "next_steps")

    def __init__(self):
        super().__init__()
        self.summarize = dspy.Predict(CallSummary)
        self.classifier = dspy.Predict(CallQualification)

    def forward(self, transcript: str) -> str:
        summary = self.summarize(transcript=transcript)
        # Fan the summary fields out into the classifier by name.
        bant = {field: getattr(summary, field) for field in self._BANT_FIELDS}
        verdict = self.classifier(**bant)
        return verdict.qualification
# Uncompiled student program handed to MIPROv2 below.
pipeline = CallQualificationPipeline()
# ────────────────────────────────────────────────────────────
# 7. Load & split data (force_rebuild=True!)
# ────────────────────────────────────────────────────────────
def load_dataset(
    source_file: str = "./fresh_extraction_results.json",
    processed_file: str = "./processed_qualification_dataset.json",
    total_examples: Optional[int] = None,
    force_rebuild: bool = True
):
    """Load (or rebuild) the processed qualification dataset.

    Args:
        source_file: Raw extraction results to process on a rebuild.
        processed_file: Cached processed dataset location.
        total_examples: If given, cap the returned list at this many
            examples. (BUG FIX: this parameter was previously accepted —
            and passed by the caller — but silently ignored.)
        force_rebuild: When True (the default), always reprocess from
            ``source_file`` to ensure freshness; the full processed set is
            still saved to ``processed_file`` before any truncation.

    Returns:
        A list of processed examples, truncated to ``total_examples`` if set.
    """
    if not force_rebuild and Path(processed_file).exists():
        examples = load_processed_dataset(processed_file)[0]
    else:
        proc = QualificationDataProcessor()
        examples = proc.process_full_pipeline(source_file)
        # Persist the full set; truncation below affects only the return value.
        proc.save_processed_dataset(examples, processed_file)
    if total_examples is not None:
        examples = examples[:total_examples]
    return examples
# Load the raw examples, shuffle them deterministically, then carve out
# train/dev splits of DATA_EXAMPLES each.
print(f"\n📂 Loading and preparing dataset...")
needed = DATA_EXAMPLES * 2  # train + dev
print(f" - Total examples needed: {needed} ({DATA_EXAMPLES} train + {DATA_EXAMPLES} dev)")
print(f" - Force rebuild: True")
try:
    all_examples = load_dataset(total_examples=needed, force_rebuild=True)
    print(f"✅ Dataset loaded successfully")
    print(f" - Examples loaded: {len(all_examples)}")
except Exception as e:
    print(f"❌ Failed to load dataset: {e}")
    raise
# Dedicated Random instance keyed on RANDOM_SEED: reproducible shuffle without
# touching the global random state.
print(f"🔀 Shuffling dataset with seed {RANDOM_SEED}...")
random.Random(RANDOM_SEED).shuffle(all_examples)
if len(all_examples) < needed:
    print(f"❌ Insufficient data: need {needed}, found {len(all_examples)}")
    raise RuntimeError(f"Need {needed} examples, found {len(all_examples)}")
print(f"✂️ Splitting dataset...")
trainset = all_examples[:DATA_EXAMPLES]
devset = all_examples[DATA_EXAMPLES:needed]
# ────────────────────────────────────────────────────────────
# 8. MiPro v2 compile (auto=light, no valset passed)
# ────────────────────────────────────────────────────────────
def accuracy(ex, pred, _):
    """Case-insensitive exact-match metric for MIPROv2.

    ``ex`` carries the gold label in ``ex.qualification``; ``pred`` is the
    pipeline's string output; the third argument (the trace) is unused.
    Returns True when the labels match, ignoring case.
    """
    is_correct = ex.qualification.lower() == pred.lower()
    print(f"🎯 Accuracy check: expected='{ex.qualification}', predicted='{pred}', correct={is_correct}")
    return is_correct
# Build the MIPROv2 optimizer. auto="light" is the lean preset: it picks its
# own candidate counts and trial budgets, so none are passed explicitly.
try:
    optimizer = MIPROv2(
        metric=accuracy,
        auto="light",  # lean preset chooses its own budgets
        num_threads=NUM_THREADS,
        track_stats=True,
        verbose=True,
    )
    print(f"✅ MIPROv2 optimizer created successfully")
except Exception as e:
    print(f"❌ Failed to create MIPROv2 optimizer: {e}")
    raise
print(f"\n🚀 Starting MiPro v2 compilation...")
print(f" - Student: {type(pipeline).__name__}")
print(f" - Trainset size: {len(trainset)}")
print(f" - Minibatch enabled: {OPT_MINIBATCH}")
try:
    # Optimize the pipeline's prompts against the trainset.
    # NOTE(review): no valset is passed — presumably MIPROv2 carves one out
    # of the trainset internally; confirm against the dspy docs.
    compiled = optimizer.compile(
        student=pipeline,
        trainset=trainset,
        minibatch=OPT_MINIBATCH,
        requires_permission_to_run=False,  # skip the interactive cost confirmation
    )
    print(f"✅ MiPro v2 compilation completed successfully!")
    print(f" - Compiled program type: {type(compiled)}")
    print(f" - Has predictors: {hasattr(compiled, 'predictors')}")
    if hasattr(compiled, 'predictors'):
        print(f" - Number of predictors: {len(compiled.predictors())}")
except Exception as e:
    print(f"❌ MiPro v2 compilation failed: {e}")
    import traceback
    traceback.print_exc()
    raise
# ────────────────────────────────────────────────────────────
# 9. Skip evaluation for now - will run separately
# ────────────────────────────────────────────────────────────
print(f"\n⏭️ Skipping evaluation (will run separately after saving compiled model)")
print(f" - Devset size: {len(devset)} examples available for later evaluation")
print(f" - Compiled model will be saved and can be evaluated independently")
# ────────────────────────────────────────────────────────────
# 10. Save compiled program for reuse
# ────────────────────────────────────────────────────────────
print(f"\n💾 Saving compiled program...")
try:
    compiled.save("compiled_call_qualifier.json")
    print(f"✅ Compiled program saved as compiled_call_qualifier.json")
    # Verify the file was actually created. (Path is already imported at the
    # top of the file; the redundant local `from pathlib import Path`
    # re-import that used to live here has been removed.)
    saved_file = Path("compiled_call_qualifier.json")
    if saved_file.exists():
        print(f" - File size: {saved_file.stat().st_size} bytes")
        print(f" - File path: {saved_file.absolute()}")
    else:
        print(f"⚠️ Warning: Save appeared successful but file not found!")
except Exception as e:
    print(f"❌ Failed to save compiled program: {e}")
    import traceback
    traceback.print_exc()
    raise
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment