Regression analyzer sample code
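
# File: regression_analyser/agent.py (path inferred from the "Allow running agent.py directly"
# comment at the bottom of this module and the workflow's `python -m regression_analyser.agent` invocation)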
"""Main agent orchestrator for regression analysis."""
import logging
from typing import Optional
from regression_analyser.github_client import GitHubClient
from regression_analyser.code_analyzer import CodeAnalyzer
from regression_analyser.endpoint_mapper import EndpointMapper
from regression_analyser.ai_agent import RegressionAnalyzerAgent
from regression_analyser.comment_formatter import CommentFormatter
from regression_analyser.models import AnalysisResult
from regression_analyser.config import Config
logger = logging.getLogger(__name__)
class RegressionAnalyzer:
"""Main orchestrator for regression analysis."""
def __init__(self) -> None:
try:
Config.validate()
except ValueError as e:
logger.error(f"Configuration validation failed: {e}")
raise
try:
self.github_client = GitHubClient()
self.code_analyzer = CodeAnalyzer()
self.endpoint_mapper = EndpointMapper()
self.ai_agent = RegressionAnalyzerAgent()
self.comment_formatter = CommentFormatter()
logger.debug("RegressionAnalyzer initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize RegressionAnalyzer: {e}", exc_info=True)
raise
def analyze_pr(self, pr_number: int) -> Optional[AnalysisResult]:
"""Analyze a PR and return results.
This method enforces:
- A hard cap on number of files per PR (Config.MAX_FILES_PER_PR)
- Skipping docs / infra only PRs (no Python files)
"""
logger.debug(f"Starting analysis for PR #{pr_number}")
# Get PR information
logger.debug(f"Fetching PR info for #{pr_number}")
pr_info = self.github_client.get_pr_info(pr_number)
if not pr_info:
error_msg = f"Failed to get PR info for #{pr_number}. Check that the PR exists and your GitHub token has access."
logger.error(error_msg)
raise ValueError(error_msg)
# Re-use previous analysis when the head SHA has not changed
logger.debug("Checking for existing analysis comment...")
existing_comment = self.github_client.get_existing_analysis_comment(pr_number)
head_sha = pr_info.get("head_sha")
if existing_comment and head_sha:
marker = "regression-analyser:sha:"
            if marker in existing_comment.body:
                try:
                    stored_sha = existing_comment.body.split(marker, maxsplit=1)[1].split("-->", maxsplit=1)[0].strip()
                    if stored_sha == head_sha:
                        logger.debug(
                            f"Head SHA unchanged ({head_sha[:8]}...) and existing analysis comment found; "
                            "skipping re-analysis to save cost",
                        )
                        return None
                except Exception as e:
                    # If marker parsing fails, fall back to normal analysis
                    logger.debug(f"Failed to parse cached SHA marker from existing comment: {e}")

        # Get code changes (Python files only)
        logger.debug(f"Fetching code changes for PR #{pr_number}...")
        code_changes = self.github_client.get_pr_changes(pr_number)
        total_files = len(code_changes)
        logger.debug(f"Found {total_files} Python files changed in PR #{pr_number}")

        # Skip analysis if PR only modifies regression_analyser itself (meta-change)
        if total_files > 0:
            regression_analyser_files = [
                change for change in code_changes
                if change.file_path.startswith("regression_analyser/")
            ]
            if len(regression_analyser_files) == total_files:
                logger.debug(
                    "PR only modifies regression_analyser files; skipping analysis "
                    "(this is a meta-change to the analyser itself)"
                )
                summary = (
                    "This PR only modifies the regression analyser itself. "
                    "Analysis was skipped as this is a meta-change to the tool."
                )
                return AnalysisResult(
                    pr_number=pr_number,
                    pr_title=pr_info["title"],
                    affected_endpoints=[],
                    total_endpoints_analyzed=0,
                    high_impact_count=0,
                    medium_impact_count=0,
                    low_impact_count=0,
                    code_changes=code_changes,
                    summary=summary,
                    metadata={"skipped_reason": "regression_analyser_only"},
                )

        if total_files == 0:
            # Docs / infra only PR - nothing for regression analyser to do
            logger.debug("No Python files changed in PR; skipping regression analysis")
            summary = (
                "No Python backend files changed in this PR. "
                "Regression analysis was skipped to avoid unnecessary cost."
            )
            return AnalysisResult(
                pr_number=pr_number,
                pr_title=pr_info["title"],
                affected_endpoints=[],
                total_endpoints_analyzed=0,
                high_impact_count=0,
                medium_impact_count=0,
                low_impact_count=0,
                code_changes=[],
                summary=summary,
                metadata={"skipped_reason": "no_python_files", "head_sha": head_sha},
            )

        if total_files > Config.MAX_FILES_PER_PR:
            message = (
                f"PR #{pr_number} has {total_files} Python files changed which exceeds the configured cap "
                f"MAX_FILES_PER_PR={Config.MAX_FILES_PER_PR}. "
                "Please split this into smaller PRs (ideally <= 25 files) to keep reviews and "
                "regression analysis focused."
            )
            logger.error(message)
            raise ValueError(message)

        # Analyze code changes
        analyzed_changes = []
        for change in code_changes:
            analyzed_change = self.code_analyzer.analyze_diff(change.file_path, change.diff)
            analyzed_changes.append(analyzed_change)

        # Map to endpoints (rule-based)
        affected_endpoints = self.endpoint_mapper.map_changes_to_endpoints(analyzed_changes)

        # AI analysis
        analysis_result = self.ai_agent.analyze_regression(
            code_changes=analyzed_changes,
            pr_title=pr_info["title"],
            pr_description=pr_info.get("body", ""),
        )

        # Merge rule-based and AI results
        analysis_result.pr_number = pr_number
        analysis_result.pr_title = pr_info["title"]
        analysis_result.code_changes = analyzed_changes
        analysis_result.metadata["head_sha"] = head_sha

        # Enhance with AI certainty
        if analysis_result.affected_endpoints:
            analysis_result.affected_endpoints = self.ai_agent.enhance_with_certainty(
                analysis_result.affected_endpoints,
                analyzed_changes,
            )

        return analysis_result
    def post_analysis_to_pr(self, pr_number: int, result: AnalysisResult) -> bool:
        """Post analysis results as PR comment."""
        comment = self.comment_formatter.format_analysis_comment(result)

        # Check for existing comment
        existing_comment_id = self.github_client.find_existing_comment(pr_number)
        if existing_comment_id:
            return self.github_client.update_pr_comment(pr_number, existing_comment_id, comment)
        return self.github_client.post_pr_comment(pr_number, comment)

    def run_analysis(self, pr_number: int) -> bool:
        """Run complete analysis workflow."""
        logger.debug(f"Running analysis workflow for PR #{pr_number}")
        try:
            result = self.analyze_pr(pr_number)
            if not result:
                # No analysis needed (e.g., docs-only PR or cached result)
                logger.debug(
                    f"No regression analysis comment posted for PR #{pr_number} "
                    "(likely docs-only PR or cached result - no action needed)"
                )
                return True
            logger.debug(f"Analysis complete, posting comment to PR #{pr_number}...")
            success = self.post_analysis_to_pr(pr_number, result)
            if not success:
                logger.error(f"Failed to post analysis for PR #{pr_number}")
            return success
        except ValueError as e:
            # Re-raise validation errors (e.g., file cap exceeded) so they're not swallowed
            logger.error(f"Validation error: {e}")
            raise
        except Exception as e:
            logger.error(f"Analysis workflow failed: {e}", exc_info=True)
            raise  # Re-raise so caller can handle it


# Allow running agent.py directly for backwards compatibility
if __name__ == "__main__":
    from regression_analyser.__main__ import main

    main()
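

# File: regression_analyser/ai_agent.py (path inferred from the
# `from regression_analyser.ai_agent import RegressionAnalyzerAgent` import in agent.py above)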
"""AI Agent for regression analysis using LLM."""
import json
from typing import List, Dict, Any
import logging
from openai import OpenAI
from tenacity import retry, stop_after_attempt, wait_exponential
from regression_analyser.models import (
AnalysisResult,
AffectedEndpoint,
CodeChange,
ImpactLevel,
ChangeType,
)
from regression_analyser.config import Config
logger = logging.getLogger(__name__)
def _normalize_change_type(change_type_str: str) -> ChangeType:
"""Normalize change_type string to valid ChangeType enum.
Maps common variations and invalid values to valid ChangeType enum values.
"""
if not change_type_str:
return ChangeType.SERVICE_MODIFIED
change_type_lower = change_type_str.lower().strip()
# Direct mappings
type_mapping = {
"endpoint_modified": ChangeType.ENDPOINT_MODIFIED,
"service_modified": ChangeType.SERVICE_MODIFIED,
"service_logic_modified": ChangeType.SERVICE_MODIFIED, # Common AI variation
"service_changed": ChangeType.SERVICE_MODIFIED,
"service_updated": ChangeType.SERVICE_MODIFIED,
"model_modified": ChangeType.MODEL_MODIFIED,
"model_changed": ChangeType.MODEL_MODIFIED,
"schema_modified": ChangeType.SCHEMA_MODIFIED,
"schema_changed": ChangeType.SCHEMA_MODIFIED,
"dependency_modified": ChangeType.DEPENDENCY_MODIFIED,
"dependency_changed": ChangeType.DEPENDENCY_MODIFIED,
"config_modified": ChangeType.CONFIG_MODIFIED,
"config_changed": ChangeType.CONFIG_MODIFIED,
"configuration_modified": ChangeType.CONFIG_MODIFIED,
}
# Try direct mapping first
if change_type_lower in type_mapping:
return type_mapping[change_type_lower]
# Try to match enum values
try:
return ChangeType(change_type_lower)
except ValueError:
# If it contains keywords, try to infer
if "endpoint" in change_type_lower or "route" in change_type_lower or "api" in change_type_lower:
return ChangeType.ENDPOINT_MODIFIED
elif "service" in change_type_lower or "logic" in change_type_lower or "business" in change_type_lower:
return ChangeType.SERVICE_MODIFIED
elif "model" in change_type_lower or "database" in change_type_lower:
return ChangeType.MODEL_MODIFIED
elif "schema" in change_type_lower:
return ChangeType.SCHEMA_MODIFIED
elif "dependency" in change_type_lower or "import" in change_type_lower:
return ChangeType.DEPENDENCY_MODIFIED
elif "config" in change_type_lower or "setting" in change_type_lower:
return ChangeType.CONFIG_MODIFIED
else:
# Default fallback
logger.debug(f"Unknown change_type '{change_type_str}', defaulting to SERVICE_MODIFIED")
return ChangeType.SERVICE_MODIFIED
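

# Illustrative examples of the normalization above (not from the original gist):
#   _normalize_change_type("service_logic_modified")  -> ChangeType.SERVICE_MODIFIED   (direct mapping)
#   _normalize_change_type("API route updated")       -> ChangeType.ENDPOINT_MODIFIED  (keyword inference)
#   _normalize_change_type("")                        -> ChangeType.SERVICE_MODIFIED   (default)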

class RegressionAnalyzerAgent:
    """AI agent that analyzes code changes and identifies affected endpoints."""

    def __init__(self):
        self.client = OpenAI(api_key=Config.OPENAI_API_KEY)
        # Apply model alias if needed (e.g., gpt-4-turbo -> gpt-4o)
        self.model = Config.MODEL_ALIASES.get(Config.OPENAI_MODEL, Config.OPENAI_MODEL)
        if self.model != Config.OPENAI_MODEL:
            logger.debug(f"Using model alias: {Config.OPENAI_MODEL} -> {self.model}")

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
    def analyze_regression(
        self,
        code_changes: List[CodeChange],
        pr_title: str,
        pr_description: str = "",
    ) -> AnalysisResult:
        """Analyze code changes and identify affected endpoints."""
        logger.debug(f"Starting regression analysis for {len(code_changes)} file changes")

        # Prepare context for AI
        context = self._prepare_analysis_context(code_changes, pr_title, pr_description)

        # Call AI to analyze
        analysis_prompt = self._build_analysis_prompt(context)
        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {
                        "role": "system",
                        "content": self._get_system_prompt(),
                    },
                    {
                        "role": "user",
                        "content": analysis_prompt,
                    },
                ],
                temperature=Config.OPENAI_TEMPERATURE,
                max_tokens=Config.MAX_TOKENS,
                response_format={"type": "json_object"},
            )
            result_json = json.loads(response.choices[0].message.content)
            return self._parse_ai_response(result_json, code_changes)
        except Exception as e:
            error_msg = str(e)
            error_details = ""
            # Try to extract more details from the error
            if hasattr(e, 'response') and hasattr(e.response, 'json'):
                try:
                    error_data = e.response.json()
                    if 'error' in error_data:
                        error_details = f" - {error_data['error'].get('message', '')}"
                except Exception:
                    pass
            # Check for quota exceeded error
            if "quota" in error_msg.lower() or "insufficient_quota" in error_msg.lower() or "429" in error_msg:
                logger.error(
                    f"OpenAI API quota exceeded.{error_details}\n"
                    f"Status: System operating in fallback mode (rule-based analysis).\n"
                    f"Resolution: Add credits to OpenAI account or wait for quota reset.\n"
                    f"Impact: AI-powered reasoning temporarily unavailable, basic analysis continues."
                )
            # Check for model not found error
            elif "model" in error_msg.lower() and ("not exist" in error_msg.lower() or "not found" in error_msg.lower() or "does not exist" in error_msg.lower()):
                # Check if it's a deprecated model
                if self.model in Config.DEPRECATED_MODELS:
                    replacement = Config.DEPRECATED_MODELS[self.model]
                    logger.error(
                        f"OpenAI model '{self.model}' is deprecated and no longer available. "
                        f"Please update your .env file: OPENAI_MODEL={replacement}"
                    )
                else:
                    logger.error(
                        f"OpenAI model '{self.model}' not found or not accessible.{error_details}\n"
                        f"Your API key may not have access to this model, or the model name may be incorrect.\n"
                        f"Try updating your .env file with one of these models:\n"
                        f"  - OPENAI_MODEL=gpt-4o (recommended - latest model)\n"
                        f"  - OPENAI_MODEL=gpt-4 (alternative)\n"
                        f"  - OPENAI_MODEL=gpt-3.5-turbo (cheaper option)\n"
                        f"\nNote: 'gpt-4-turbo' is automatically mapped to 'gpt-4o' if available.\n"
                        f"To check available models, visit: https://platform.openai.com/docs/models"
                    )
            else:
                logger.error(f"AI analysis failed: {e}{error_details}")

            # Fallback to rule-based analysis
            logger.warning("Falling back to rule-based analysis")
            return self._fallback_analysis(code_changes, pr_title)

    def _prepare_analysis_context(
        self, code_changes: List[CodeChange], pr_title: str, pr_description: str
    ) -> Dict[str, Any]:
        """Prepare context for AI analysis."""

        def _summarize_diff(diff: str) -> str:
            """Return a compact, signal-focused summary of a unified diff.

            - Skips whitespace-only changes
            - Skips pure comment lines (starting with '#')
            - Ignores diff headers (--- / +++)
            - Caps the number of lines to keep token usage bounded
            """
            if not diff:
                return ""

            summary_lines: list[str] = []
            max_lines = 80
            for raw_line in diff.split("\n"):
                if len(summary_lines) >= max_lines:
                    break
                if not raw_line:
                    continue
                # Only consider added/removed/context lines
                prefix = raw_line[0]
                if prefix not in {"+", "-", " "}:
                    continue
                # Skip diff headers
                if raw_line.startswith("+++") or raw_line.startswith("---"):
                    continue
                # Strip the diff prefix for content inspection
                content = raw_line[1:]
                stripped = content.strip()
                # Skip whitespace-only lines
                if not stripped:
                    continue
                # Skip comment-only lines (Python style)
                if stripped.startswith("#"):
                    continue
                summary_lines.append(raw_line)

            if not summary_lines:
                return "No significant code changes (only comments / whitespace)."
            return "\n".join(summary_lines)
        return {
            "pr_title": pr_title,
            "pr_description": pr_description,
            "files_changed": len(code_changes),
            "changes": [
                {
                    "file": change.file_path,
                    "type": change.change_type,
                    "functions": change.functions_modified,
                    "classes": change.classes_modified,
                    "lines_added": change.lines_added,
                    "lines_removed": change.lines_removed,
                    "diff_summary": _summarize_diff(change.diff),
                }
                for change in code_changes[:Config.MAX_FILES_TO_ANALYZE]
            ],
        }

    def _get_system_prompt(self) -> str:
        """Get system prompt for AI agent."""
        return """You are an expert backend engineer analyzing code changes in a FastAPI application.
Your task is to identify which API endpoints could be affected by the changes.

Consider:
1. Direct changes to endpoint functions in views.py files
2. Changes to services/models/schemas that endpoints depend on
3. Changes to shared utilities or dependencies
4. Database model changes that affect endpoints
5. Configuration changes that affect behavior

For each affected endpoint, provide:
- HTTP method and path
- Confidence score (0.0-1.0)
- Clear reasoning
- Impact level (high/medium/low)
- Change type (MUST be one of: endpoint_modified, service_modified, model_modified, schema_modified, dependency_modified, config_modified)
- Test recommendations for QA

IMPORTANT: The change_type field MUST use one of these exact values:
- "endpoint_modified" - Direct changes to API endpoint functions
- "service_modified" - Changes to service layer logic (also use this for service_logic_modified)
- "model_modified" - Changes to database models
- "schema_modified" - Changes to request/response schemas
- "dependency_modified" - Changes to dependencies or imports
- "config_modified" - Changes to configuration

Return your analysis as JSON with this structure:
{
    "affected_endpoints": [
        {
            "method": "GET",
            "path": "/api/v1/customers",
            "function_name": "get_customers",
            "file_path": "src/customer/views.py",
            "confidence": 0.9,
            "reasoning": "The get_customers function was directly modified",
            "impact_level": "high",
            "change_type": "endpoint_modified",
            "test_recommendations": ["Test GET /api/v1/customers with various filters", "Verify pagination"]
        }
    ],
    "summary": "Brief summary of analysis"
}
"""
    def _build_analysis_prompt(self, context: Dict[str, Any]) -> str:
        """Build prompt for AI analysis."""
        prompt = f"""Analyze the following code changes and identify affected API endpoints.

PR Title: {context['pr_title']}
Files Changed: {context['files_changed']}

Changes:
{json.dumps(context['changes'], indent=2)}

Based on the codebase structure (FastAPI with views.py files containing endpoints),
identify all API endpoints that could be affected by these changes.

Consider:
- Direct modifications to endpoint functions
- Service layer changes that endpoints call
- Model/schema changes that affect request/response
- Database changes
- Configuration changes

Provide a comprehensive analysis with confidence scores and reasoning.
"""
        return prompt
    def _parse_ai_response(
        self, response_json: Dict[str, Any], code_changes: List[CodeChange]
    ) -> AnalysisResult:
        """Parse AI response into AnalysisResult."""
        affected_endpoints = []
        for endpoint_data in response_json.get("affected_endpoints", []):
            try:
                # Normalize change_type to handle invalid values from AI
                raw_change_type = endpoint_data.get("change_type", "service_modified")
                normalized_change_type = _normalize_change_type(raw_change_type)

                # Normalize impact_level as well
                raw_impact_level = endpoint_data.get("impact_level", "medium")
                try:
                    impact_level = ImpactLevel(raw_impact_level.lower())
                except ValueError:
                    # Map common variations
                    impact_level_map = {
                        "high": ImpactLevel.HIGH,
                        "medium": ImpactLevel.MEDIUM,
                        "low": ImpactLevel.LOW,
                        "none": ImpactLevel.NONE,
                    }
                    impact_level = impact_level_map.get(raw_impact_level.lower(), ImpactLevel.MEDIUM)
                    logger.debug(f"Normalized impact_level '{raw_impact_level}' to '{impact_level.value}'")

                endpoint = AffectedEndpoint(
                    method=endpoint_data["method"],
                    path=endpoint_data["path"],
                    function_name=endpoint_data["function_name"],
                    file_path=endpoint_data["file_path"],
                    confidence=float(endpoint_data.get("confidence", 0.5)),
                    reasoning=endpoint_data["reasoning"],
                    impact_level=impact_level,
                    change_type=normalized_change_type,
                    test_recommendations=endpoint_data.get("test_recommendations", []),
                )
                affected_endpoints.append(endpoint)
            except (ValueError, KeyError, TypeError) as e:
                logger.warning(
                    f"Failed to parse endpoint: {e}. "
                    f"Endpoint data: method={endpoint_data.get('method')}, "
                    f"path={endpoint_data.get('path')}, "
                    f"change_type={endpoint_data.get('change_type')}"
                )

        # Calculate impact counts
        high_count = sum(1 for e in affected_endpoints if e.impact_level == ImpactLevel.HIGH)
        medium_count = sum(1 for e in affected_endpoints if e.impact_level == ImpactLevel.MEDIUM)
        low_count = sum(1 for e in affected_endpoints if e.impact_level == ImpactLevel.LOW)

        return AnalysisResult(
            pr_number=0,  # Will be set by caller
            pr_title="",  # Will be set by caller
            affected_endpoints=affected_endpoints,
            total_endpoints_analyzed=len(affected_endpoints),
            high_impact_count=high_count,
            medium_impact_count=medium_count,
            low_impact_count=low_count,
            code_changes=code_changes,
            summary=response_json.get("summary", ""),
        )
    def _fallback_analysis(
        self, code_changes: List[CodeChange], pr_title: str
    ) -> AnalysisResult:
        """Fallback rule-based analysis if AI fails."""
        logger.debug("Using fallback rule-based analysis")
        affected_endpoints = []
        for change in code_changes:
            if "views.py" in change.file_path:
                # Simple heuristic: if views.py changed, endpoints are affected
                for func in change.functions_modified:
                    affected_endpoints.append(
                        AffectedEndpoint(
                            method="UNKNOWN",
                            path="UNKNOWN",
                            function_name=func,
                            file_path=change.file_path,
                            confidence=0.7,
                            reasoning=f"Function {func} was modified in views file",
                            impact_level=ImpactLevel.MEDIUM,
                            change_type=ChangeType.ENDPOINT_MODIFIED,
                            test_recommendations=[f"Test endpoint using {func} function"],
                        )
                    )
        return AnalysisResult(
            pr_number=0,
            pr_title=pr_title,
            affected_endpoints=affected_endpoints,
            total_endpoints_analyzed=len(affected_endpoints),
            code_changes=code_changes,
            summary="Fallback analysis - manual review recommended",
        )
    def enhance_with_certainty(
        self, endpoints: List[AffectedEndpoint], code_changes: List[CodeChange]
    ) -> List[AffectedEndpoint]:
        """Enhance endpoint analysis with certainty calculations."""
        # Use AI to refine confidence scores
        prompt = f"""Review these potentially affected endpoints and adjust confidence scores based on:
1. How directly the change affects the endpoint
2. Whether the endpoint actually uses the changed code
3. The type of change (breaking vs non-breaking)

Endpoints to review:
{json.dumps([e.model_dump() for e in endpoints], indent=2)}

Code changes:
{json.dumps([c.model_dump() for c in code_changes], indent=2)}

Return updated endpoints with refined confidence scores.
"""
        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": "You are an expert at code impact analysis."},
                    {"role": "user", "content": prompt},
                ],
                temperature=0.1,
                response_format={"type": "json_object"},
            )
            result = json.loads(response.choices[0].message.content)

            # Update confidence scores
            for endpoint in endpoints:
                for updated in result.get("endpoints", []):
                    if updated.get("function_name") == endpoint.function_name:
                        endpoint.confidence = float(updated.get("confidence", endpoint.confidence))
                        break
        except Exception as e:
            logger.debug(f"Failed to enhance certainty: {e}")

        return endpoints
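
The GitHub Actions workflow below belongs under .github/workflows/; the gist does not give its filename.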
name: Regression Analysis

on:
  pull_request:
    types: [opened, synchronize, reopened]
  issue_comment:
    types: [created]

jobs:
  analyze-pr:
    runs-on: ubuntu-latest
    if: |
      github.event.pull_request.head.repo.full_name == github.repository ||
      (github.event.issue.pull_request && (contains(github.event.comment.body, '/analyze') || contains(github.event.comment.body, '/regression')))
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
        with:
          fetch-depth: 0
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      - name: Install dependencies
        run: |
          pip install -r regression_analyser/requirements.txt
      - name: Run regression analysis
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
          REPO_OWNER: ${{ github.repository_owner }}
          REPO_NAME: ${{ github.event.repository.name }}
        run: |
          # Check if OPENAI_API_KEY is set, if not, skip with a message
          if [ -z "$OPENAI_API_KEY" ]; then
            echo "⚠️ OPENAI_API_KEY secret not configured. Skipping regression analysis."
            echo "To enable analysis, add OPENAI_API_KEY to repository secrets."
            exit 0
          fi
          python -m regression_analyser.agent analyze --pr-number ${{ github.event.pull_request.number || github.event.issue.number }}

  passive-setup:
    runs-on: ubuntu-latest
    if: github.event_name == 'pull_request' && github.event.action == 'opened'
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      - name: Install dependencies
        run: |
          pip install -r regression_analyser/requirements.txt
      - name: Store PR status
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          REPO_OWNER: ${{ github.repository_owner }}
          REPO_NAME: ${{ github.event.repository.name }}
        run: |
          python -m regression_analyser.agent store-status --pr-number ${{ github.event.pull_request.number }}
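
For reference, a minimal local-run sketch (an assumption, not part of the gist): it presumes Config reads GITHUB_TOKEN, OPENAI_API_KEY, REPO_OWNER and REPO_NAME from the environment, as the workflow's env blocks suggest, and uses a placeholder PR number.

import logging

from regression_analyser.agent import RegressionAnalyzer

logging.basicConfig(level=logging.DEBUG)

analyzer = RegressionAnalyzer()            # validates Config and builds the GitHub/OpenAI clients
ok = analyzer.run_analysis(pr_number=123)  # 123 is a placeholder; True means a comment was posted or analysis was cleanly skipped
print(f"Regression analysis workflow succeeded: {ok}")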