Last active
March 19, 2025 15:25
-
-
Save mikeatlas/07e98cafb308d26ab6b012f11c000849 to your computer and use it in GitHub Desktop.
Finds all newly created API endpoints going back to a specific date across a list of git repositories containing NestJS web services and prints out a report. This script was generated with a few iterations of prompts to ChatGPT o1 model.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import re | |
import subprocess | |
import sys | |
# Matches a unified-diff hunk header such as "@@ -12,5 +12,6 @@".
# Capture groups: (1) old start line, (2) old line count (optional),
# (3) new start line, (4) new line count (optional). Group 3 is what the
# diff scanner uses to track line numbers in the post-commit file.
HUNK_HEADER_RE = re.compile(r'@@ -(\d+),?(\d+)? \+(\d+),?(\d+)? @@')
# Matches a NestJS/Swagger @ApiOperation(...) decorator on a single line
# and captures the raw argument text between the parentheses.
API_OPERATION_RE = re.compile(r'@ApiOperation\s*\(\s*(.*)\s*\)')
def switch_to_main_and_pull(repo_path, branch='main'):
    """
    Ensure the local repository is on the specified branch (e.g., 'main')
    and is up-to-date via 'git pull'.

    Returns:
        True if both 'git checkout' and 'git pull' succeeded, False if
        either failed (the git error is printed to stderr). The original
        version returned None and silently continued on failure, which
        left callers analyzing a stale or wrong branch with no way to
        tell; returning a bool is backward-compatible and lets callers
        opt in to checking.
    """
    # Run the two git commands in order; stop at the first failure so we
    # never 'git pull' a branch we failed to check out.
    for git_args in (['checkout', branch], ['pull']):
        try:
            subprocess.check_output(
                ['git', '-C', repo_path] + git_args,
                stderr=subprocess.STDOUT
            )
        except subprocess.CalledProcessError as e:
            print(f"[ERROR] Could not switch to branch '{branch}' or pull latest in {repo_path}. "
                  f"Git error:\n{e.output.decode('utf-8', errors='ignore')}\n",
                  file=sys.stderr)
            return False
    return True
def get_commits_for_controller_files(repo_path, since_date):
    """
    Return commit info for commits after 'since_date' that touch files
    ending in '.controller.ts'.

    Args:
        repo_path: path to a local git repository.
        since_date: date string accepted by 'git log --since' (e.g. "2024-08-01").

    Returns:
        A list of (commit_hash, author, commit_date, file_path) tuples.
        Empty list if 'git log' fails (the error is printed to stderr).
    """
    # One log line per commit: "<hash>|<author>|<iso date>", followed by
    # the names of the files that commit touched (--name-only).
    cmd = [
        'git', '-C', repo_path, 'log',
        f'--since={since_date}',
        '--pretty=format:%H|%an|%ci',
        '--name-only'
    ]
    try:
        output = subprocess.check_output(cmd, universal_newlines=True)
    except subprocess.CalledProcessError as e:
        print(f"Error retrieving commits from {repo_path}: {e}", file=sys.stderr)
        return []
    return _parse_commit_log(output)


def _parse_commit_log(output):
    """
    Parse 'git log --pretty=format:%H|%an|%ci --name-only' output into
    (commit_hash, author, commit_date, file_path) tuples, keeping only
    '*.controller.ts' file paths. Pure function, split out so the parsing
    logic is testable without a git repository.
    """
    commits = []
    current_commit = None
    current_author = None
    current_date = None
    for line in output.splitlines():
        if '|' in line:
            # Commit header line: "<commit_hash>|<author>|<commit_date>".
            # split('|', 2) tolerates '|' characters inside the date field
            # and leaves author names with no '|' intact.
            parts = line.strip().split('|', 2)
            if len(parts) == 3:
                current_commit, current_author, current_date = parts
        else:
            # Otherwise this is a file path (or a blank separator line,
            # which the endswith check filters out).
            file_path = line.strip()
            if file_path.endswith('.controller.ts'):
                # NOTE(review): a file line before any commit header would
                # pair with None values; git log never emits that shape,
                # so this matches the original behavior intentionally.
                commits.append((current_commit, current_author, current_date, file_path))
    return commits
def parse_diff_for_new_api_operations(repo_path, commit_hash, file_path):
    """
    Parse the diff of a specific commit and file to find newly added lines
    containing '@ApiOperation(...)'.

    Returns:
        A list of (line_number_in_new_file, extracted_endpoint_name)
        tuples. Empty list if 'git show' fails for this commit/file.
    """
    try:
        # --unified=0: no context lines, so every '+' line after a hunk
        # header is a genuine addition and line tracking stays simple.
        cmd = ['git', '-C', repo_path, 'show', '--unified=0', commit_hash, '--', file_path]
        diff_output = subprocess.check_output(cmd, universal_newlines=True)
    except subprocess.CalledProcessError:
        return []
    return _scan_added_lines(diff_output)


def _scan_added_lines(diff_output):
    """
    Scan unified-diff text for added lines containing '@ApiOperation(...)'
    and return [(new_file_line_number, endpoint_name), ...]. Pure function,
    split out so the diff-walking logic is testable without git.
    """
    results = []
    current_new_line = None
    for line in diff_output.splitlines():
        # Hunk header "@@ -old_start,old_len +new_start,new_len @@" resets
        # our position in the new file.
        hunk_match = HUNK_HEADER_RE.search(line)
        if hunk_match:
            new_start = int(hunk_match.group(3))
            current_new_line = new_start - 1  # incremented per added line
            continue
        # Keep only addition lines; '+++' is the diff file-header, not code.
        if not line.startswith('+') or line.startswith('+++'):
            continue
        if current_new_line is None:
            # Defensive: a '+' line before any hunk header (e.g. stray
            # output) would have crashed with TypeError on None += 1.
            continue
        current_new_line += 1
        added_code = line[1:].strip()  # drop the leading '+'
        api_match = API_OPERATION_RE.search(added_code)
        if api_match:
            contents = api_match.group(1).strip()
            endpoint_name = extract_api_operation_name(contents)
            results.append((current_new_line, endpoint_name))
    return results
def extract_api_operation_name(contents):
    """
    Derive a human-readable endpoint name from the text inside an
    @ApiOperation(...) decorator.

    Handles both decorator shapes seen in NestJS controllers:
      - @ApiOperation({ summary: 'My Endpoint', ... })  -> 'My Endpoint'
      - @ApiOperation('My Endpoint')                    -> 'My Endpoint'

    Falls back to returning *contents* unchanged when neither a 'summary'
    property nor a quoted string can be found.
    """
    # Try the more specific pattern first: a quoted 'summary' value beats
    # an arbitrary quoted string elsewhere in the decorator arguments.
    patterns = (
        r"summary\s*:\s*['\"](.+?)['\"]",  # { summary: '...' } object form
        r"['\"](.+?)['\"]",                # bare string-argument form
    )
    for pattern in patterns:
        found = re.search(pattern, contents)
        if found:
            return found.group(1)
    return contents
def main():
    """
    Scan one or more local NestJS repositories for @ApiOperation decorators
    added since a given date and print a report.

    Usage:
        python scan_api_operations.py <root_path> <start_date> <repo_name1> [<repo_name2> ...]
    Example:
        python scan_api_operations.py /path/to/root 2024-08-01 my-service
    """
    if len(sys.argv) < 4:
        print("Usage: python scan_api_operations.py <root_path> <start_date> <repo_name1> [<repo_name2> ...]")
        sys.exit(1)

    root_path, since_date = sys.argv[1], sys.argv[2]  # e.g. "2024-08-01"
    repo_names = sys.argv[3:]  # one or more repo directory names

    # Each report entry:
    # (repo_name, file_path, endpoint, line_number, author, commit_date)
    report_entries = []
    for repo_name in repo_names:
        repo_path = os.path.join(root_path, repo_name)
        # Skip directories that are not git repositories.
        if not os.path.isdir(os.path.join(repo_path, '.git')):
            print(f"[WARNING] {repo_path} is not a Git repository (no .git folder).")
            continue

        print(f"\n[INFO] Analyzing repository: {repo_name}")
        # Step 1: make sure we are on the main branch and up to date.
        switch_to_main_and_pull(repo_path)
        # Step 2: commits touching controller files since the cutoff date.
        # Step 3: scan each commit's diff for newly added @ApiOperation lines.
        for commit_hash, author, commit_date, file_path in \
                get_commits_for_controller_files(repo_path, since_date):
            for line_number, endpoint in \
                    parse_diff_for_new_api_operations(repo_path, commit_hash, file_path):
                report_entries.append(
                    (repo_name, file_path, endpoint, line_number, author, commit_date)
                )

    # Step 4: print the final report.
    print(f"\n=== New @ApiOperation endpoints since {since_date} ===\n")
    for repo_name, file_path, endpoint, line_number, author, commit_date in report_entries:
        print(f"Repo: {repo_name}\n"
              f"File Path: {file_path}\n"
              f"Line: {line_number}\n"
              f"Endpoint: {endpoint}\n"
              f"Author: {author}\n"
              f"Commit Date: {commit_date}\n")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment