@mattatsnyk
Created July 21, 2025 19:45
An example script to get all Snyk issues for a given Snyk organization and repository.
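"""
Fetch every Snyk issue for all projects in an organization that match a repository name.

Example invocation (the filename here is illustrative; save the script under any name):
    export SNYK_API_TOKEN='your_token_here'
    python snyk_repo_issues.py --orgId <SNYK_ORG_ID> --repoName my-awesome-app
"""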
import os
import requests
import argparse
import logging
from collections import defaultdict
# --- Configuration ---
SNYK_API_HOST = "https://api.snyk.io"
SNYK_API_BASE_URL = f"{SNYK_API_HOST}/rest"
API_VERSION = "2024-05-24" # Using a recent, stable version
# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def get_snyk_api_token():
    """
    Retrieves the Snyk API token from an environment variable.
    """
    token = os.environ.get("SNYK_API_TOKEN")
    if not token:
        logging.error("SNYK_API_TOKEN environment variable not set.")
        logging.error("Please set it to your Snyk API token: export SNYK_API_TOKEN='your_token_here'")
        exit(1)
    return token

def get_auth_headers(token):
    """
    Constructs the authorization headers for Snyk API requests.
    """
    return {
        "Authorization": f"token {token}",
        "Accept": "application/vnd.api+json"
    }

def find_projects_by_repo_name(org_id, repo_name, headers):
    """
    Finds all Snyk projects that match the given repository name within an organization.

    Args:
        org_id (str): The Snyk Organization ID.
        repo_name (str): The name of the repository to find.
        headers (dict): The authorization headers.

    Returns:
        list: A list of project data dictionaries for all matching projects.
    """
    logging.info(f"Searching for all projects with repository name '{repo_name}' in organization '{org_id}'...")
    matching_projects = []
    projects_url = f"{SNYK_API_BASE_URL}/orgs/{org_id}/projects?version={API_VERSION}&limit=100"
    while projects_url:
        try:
            response = requests.get(projects_url, headers=headers)
            response.raise_for_status()  # Raise an exception for bad status codes (4xx or 5xx)
            response_data = response.json()
            projects = response_data.get('data', [])
            for project in projects:
                # The project name often contains the org/repo structure. This is a simple substring match.
                if repo_name in project.get('attributes', {}).get('name', ''):
                    logging.info(f"Found matching project: {project.get('attributes').get('name')} (ID: {project.get('id')})")
                    matching_projects.append(project)
            # Handle pagination to check the next page
            next_url = response_data.get('links', {}).get('next')
            if next_url:
                # The 'next' link is a relative path, so we prepend the API host.
                projects_url = f"{SNYK_API_HOST}{next_url}"
                logging.info("Checking next page of projects...")
            else:
                projects_url = None  # Exit loop if no more pages
        except requests.exceptions.RequestException as e:
            logging.error(f"An error occurred while fetching projects: {e}")
            # Compare against None: e.response is None if no response was received, and a
            # Response object with an error status code is falsy, so a bare truthiness check would skip it.
            if e.response is not None:
                logging.error(f"Response Body: {e.response.text}")
            return []  # Return empty list on error
    if not matching_projects:
        logging.warning(f"No projects found with the name '{repo_name}'. Please check the repository name and organization ID.")
    return matching_projects

def get_project_issues(org_id, project_id, headers):
    """
    Retrieves all issues (including ignored and all types) for a specific project.

    Args:
        org_id (str): The Snyk Organization ID.
        project_id (str): The ID of the project.
        headers (dict): The authorization headers.

    Returns:
        list: A list of all found issues for the project.
    """
    all_issues = []
    # We need to query for both ignored and not-ignored issues to get a complete list.
    for ignored_status in [True, False]:
        logging.info(f"Fetching issues for project ID: {project_id} (Ignored: {ignored_status})")
        # By not specifying the 'type' parameter, we get all issue types (code, license, package_vulnerability, etc.).
        # We explicitly query for ignored and not-ignored issues.
        issues_url = (
            f"{SNYK_API_BASE_URL}/orgs/{org_id}/issues"
            f"?version={API_VERSION}"
            f"&scan_item.id={project_id}"
            f"&scan_item.type=project"
            f"&ignored={str(ignored_status).lower()}"
            f"&limit=100"
        )
        while issues_url:
            try:
                response = requests.get(issues_url, headers=headers)
                response.raise_for_status()
                response_data = response.json()
                issues = response_data.get('data', [])
                all_issues.extend(issues)
                # Handle pagination
                next_url = response_data.get('links', {}).get('next')
                if next_url:
                    # The 'next' link is a relative path, so we prepend the API host.
                    issues_url = f"{SNYK_API_HOST}{next_url}"
                    logging.info(f"Fetching next page of issues... (Ignored: {ignored_status})")
                else:
                    issues_url = None  # Exit loop for this ignored status
            except requests.exceptions.RequestException as e:
                logging.error(f"An error occurred while fetching issues: {e}")
                # Compare against None: an error Response object is falsy.
                if e.response is not None:
                    logging.error(f"Response Body: {e.response.text}")
                break  # Exit inner loop on error
    return all_issues

def main():
    """
    Main function to orchestrate finding all projects for a repo and fetching their issues.
    """
    parser = argparse.ArgumentParser(description="Fetch Snyk issues for a given repository.")
    parser.add_argument("--orgId", required=True, help="Your Snyk Organization ID.")
    parser.add_argument("--repoName", required=True, help="The name of the repository (e.g., 'my-awesome-app').")
    args = parser.parse_args()

    snyk_token = get_snyk_api_token()
    auth_headers = get_auth_headers(snyk_token)

    # Step 1: Find all project IDs based on the repository name
    projects = find_projects_by_repo_name(args.orgId, args.repoName, auth_headers)
    if projects:
        total_issues_found = 0
        # Dictionary to hold summary counts for each project
        project_summary_counts = {}
        for project in projects:
            project_id = project.get('id')
            project_name = project.get('attributes', {}).get('name')
            # Initialize counters for the current project
            project_summary_counts[project_name] = defaultdict(int)
            print("\n" + "="*40)
            logging.info(f"Processing project: '{project_name}' (ID: {project_id})")
            # Step 2: Fetch issues using the found project ID
            issues = get_project_issues(args.orgId, project_id, auth_headers)
            if issues:
                total_issues_found += len(issues)
                logging.info(f"Found {len(issues)} issues for this project.")
                # Print some details for each issue
                for issue in issues:
                    attrs = issue.get('attributes', {})
                    severity = attrs.get('effective_severity_level', 'unknown')
                    status = attrs.get('status')
                    # Increment severity count
                    project_summary_counts[project_name][severity] += 1
                    # Increment ignored count if applicable; the issues API reports ignore state
                    # via the boolean 'ignored' attribute (status is also checked for safety).
                    if attrs.get('ignored', False) or status == 'ignored':
                        project_summary_counts[project_name]['ignored'] += 1
print("---")
print(f"Project: {project_name}")
print(f"Issue Title: {attrs.get('title')}")
print(f"Severity: {severity}")
print(f"Status: {status}")
print(f"Type: {attrs.get('type')}")
else:
logging.info(f"No issues found for project '{project_name}'.")
# Print the final summary
print("\n" + "="*50)
logging.info("AGGREGATE ISSUE COUNT PER PROJECT")
print("="*50)
for project_name, counts in project_summary_counts.items():
print(f"\nProject: {project_name}")
print(f" - Critical: {counts.get('critical', 0)}")
print(f" - High: {counts.get('high', 0)}")
print(f" - Medium: {counts.get('medium', 0)}")
print(f" - Low: {counts.get('low', 0)}")
print(f" - Ignored: {counts.get('ignored', 0)}")
print("\n" + "="*50)
logging.info(f"Finished. Found a total of {total_issues_found} issues across {len(projects)} projects for repo '{args.repoName}'.")
print("="*50)
if __name__ == "__main__":
main()
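
The functions above can also be reused programmatically rather than through the CLI. A minimal sketch, assuming the script is saved as snyk_repo_issues.py next to your own code (the filename, org ID, and repo name below are placeholders) and that SNYK_API_TOKEN is exported:

import os
from snyk_repo_issues import get_auth_headers, find_projects_by_repo_name, get_project_issues

headers = get_auth_headers(os.environ["SNYK_API_TOKEN"])
projects = find_projects_by_repo_name("your-org-id", "my-awesome-app", headers)
for project in projects:
    issues = get_project_issues("your-org-id", project.get("id"), headers)
    # Print the project name and its total issue count
    print(project.get("attributes", {}).get("name"), len(issues))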