Skip to content

Instantly share code, notes, and snippets.

@davidfischer-ch
Last active May 27, 2025 15:01
Show Gist options
  • Save davidfischer-ch/cdfede27ac053a8332b2127becc07608 to your computer and use it in GitHub Desktop.
Save davidfischer-ch/cdfede27ac053a8332b2127becc07608 to your computer and use it in GitHub Desktop.
sonar_sarif_to_generic.py
"""
Convert SARIF to Generic SonarQube issues import format.
Links:
* https://community.sonarsource.com/t/import-sarif-results-as-security-hotspots/83223
* https://docs.sonarqube.org/9.8/analyzing-source-code/importing-external-issues/generic-issue-import-format
* https://gist.github.com/davidfischer-ch/cdfede27ac053a8332b2127becc07608
Author: David Fischer <[email protected]>
"""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Final
import json
import os
import sys
# https://docs.oasis-open.org/sarif/sarif/v2.1.0/os/sarif-v2.1.0-os.html#_Toc34317648
# SonarQube severity can be one of BLOCKER, CRITICAL, MAJOR, MINOR, INFO
# Maps a SARIF result "level" to a SonarQube severity for the generic import format.
# NOTE(review): 'SERVERITY' is a typo for 'SEVERITY'; renaming it would also
# require updating the lookup in sarif_to_generic(), so it is kept as-is here.
LEVEL_TO_SERVERITY: Final[dict[str, str]] = {
    'warning': 'MAJOR',
    'error': 'CRITICAL',
    'note': 'MINOR',
    'none': 'INFO'
}
# Issue type used when the engine is not listed in REPORT_TYPE_BY_ENGINE below.
DEFAULT_REPORT_TYPE: Final[str] = 'CODE_SMELL'
# SonarQube issue type per analysis engine (keys are lower-cased driver names).
REPORT_TYPE_BY_ENGINE: Final[dict[str, str]] = {
    'ansible-lint': 'CODE_SMELL',
    'robocop': 'CODE_SMELL',
    'tflint': 'CODE_SMELL'
}
@dataclass
class Position(object):
    """A zero-indexed (line, column) position inside a text file."""

    line: int
    column: int

    def __bool__(self):
        # Falsy when either coordinate is zero — used to flag "no position".
        return self.line != 0 and self.column != 0

    def __str__(self):
        return '({0}, {1})'.format(self.line, self.column)
def main_ocsin(dotenv_path: Path = Path('sarif_to_generic.env')) -> None:
    """Scan $REPORTS_DIR, convert SARIF quality reports to SonarQube's
    generic format, then write a dotenv file listing the reports found.

    NOTE(review): relies on ``find_recursive``, which is not defined in this
    file — presumably provided elsewhere; confirm before running standalone.
    """
    root = Path(os.environ['REPORTS_DIR'])
    exists = root.is_dir()
    print('Reports directory', root, 'found' if exists else 'not found')
    for found in find_recursive(root, '*'):
        print('Found file', found)

    def scan(pattern: str) -> list[str]:
        # Sorted string paths matching pattern, or nothing if the directory is absent.
        if not exists:
            return []
        return sorted(str(path) for path in find_recursive(root, pattern))

    print('Convert SARIF Quality to Generic SonarQube reports inside reports directory')
    if exists:
        for source in find_recursive(root, '*/sonarqube-sarif-quality*-results.json'):
            print(f'Converting file {source}')
            target_name = source.name.replace(
                'sonarqube-sarif-quality', 'sonarqube-external-from-sarif')
            sarif_to_generic(source=source, target=source.parent / target_name)

    print('Find external reports for SonarQube inside reports directory')
    external_reports = scan('*/sonarqube-external*-results.json')
    print('\n'.join(external_reports) if external_reports else 'No reports found.')

    print('Find SARIF (SAST) reports for SonarQube inside reports directory')
    sarif_sast_reports = scan('*/sonarqube-sarif-sast*-results.json')
    print('\n'.join(sarif_sast_reports) if sarif_sast_reports else 'No reports found.')

    print('generate dotenv file for Sonar')
    dotenv_path.write_text(
        f"EXTERNAL_REPORTS={','.join(external_reports)}\n"
        f"SARIF_SAST_REPORTS={','.join(sarif_sast_reports)}",
        encoding='utf-8')
def sarif_to_generic(  # pylint:disable=too-many-locals
    source: Path | str,
    target: Path | str
) -> None:
    """Convert a SARIF report into SonarQube's Generic Issue Import format.

    Args:
        source: Path of the SARIF report to read.
        target: Path of the generic JSON report to write (must not exist yet).

    Raises:
        FileExistsError: If ``target`` already exists (subclass of ``IOError``,
            so callers catching ``IOError`` keep working).
        ValueError: If ``source`` does not look like a SARIF file.
        NotImplementedError: If a result carries more than one location.
    """
    source = Path(source).resolve()
    target = Path(target).resolve()
    if target.exists():
        raise FileExistsError(f'Target file "{target}" already exists.')
    sarif_data: dict = json.loads(source.read_text(encoding='utf-8'))
    if 'sarif' not in sarif_data['$schema']:
        raise ValueError('Source is (probably) not a valid sarif file.')
    issues: list[dict] = []
    for run_index, run_data in enumerate(sarif_data['runs'], 1):
        driver_data = run_data['tool']['driver']
        engine_id = driver_data['name']
        engine_key = engine_id.lower()
        # SARIF declares rules as a LIST of objects; index them by id.
        # (Previous default of {} also iterated to nothing, but [] is accurate.)
        rules: dict[str, dict] = {rule['id']: rule for rule in driver_data.get('rules', [])}
        for result_index, result_data in enumerate(run_data['results'], 1):
            # Code is not programmed to handle multiple locations (it's a WIP)
            if (num_locations := len(result_data['locations'])) != 1:
                raise NotImplementedError(
                    f'File {source} : run[{run_index}].results[{result_index}].locations[] '
                    f'size expected 1, actual {num_locations}')
            rule_id = result_data['ruleId']
            rule_data = rules[rule_id] if rules else {}  # Only if rules is not empty
            location_data = result_data['locations'][0]['physicalLocation']
            file_path = location_data['artifactLocation']['uri']
            message_lines = [
                result_data['message']['text'],
                '',  # blank separator line (a missing comma here previously
                     # concatenated '' into the next f-string, dropping it)
                f"Help: {rule_data.get('help', {}).get('text') or '<empty>'}",
                f"URL: {rule_data.get('helpUri') or '<empty>'}"
            ]
            if tags := rule_data.get('properties', {}).get('tags', []):
                message_lines.append(f"Tags: {', '.join(clean_tag(tag) for tag in tags)}")
            issue = {
                'engineId': engine_id,
                'primaryLocation': {
                    'filePath': file_path,
                    'message': '\n'.join(message_lines)
                },
                'ruleId': rule_id,
                # SARIF spec: a result's "level" defaults to "warning" when absent.
                'severity': LEVEL_TO_SERVERITY[result_data.get('level', 'warning')],
                'type': REPORT_TYPE_BY_ENGINE.get(engine_key, DEFAULT_REPORT_TYPE)
            }
            # Converting location data (SARIF is 1-indexed; Position is 0-indexed)
            start = Position(
                location_data['region']['startLine'] - 1,
                location_data['region'].get('startColumn', 1) - 1)
            end = Position(
                location_data['region'].get('endLine', start.line + 1) - 1,
                location_data['region'].get('endColumn', start.column + 1) - 1)
            # Fix location data for some tools (data is wrong or missing)
            if engine_key in {'ansible-lint', 'robocop'}:
                # Ensure the end position makes sense or fix it.
                # read_text() normalizes line endings to '\n', so splitlines()
                # is correct here — splitting on os.linesep broke on Windows.
                # NOTE(review): file_path is a (likely relative) URI; this read
                # assumes the CWD is the analyzed repository root — confirm.
                lines = Path(file_path).read_text(encoding='utf-8').splitlines()
                # If end is equal to start, drop end since it is not required
                if start == end:
                    print(
                        f"Wrong indexation (0-indexed) {file_path}: "
                        f"(start={start} end={end}), fix it by removing end")
                    end = Position(0, 0)
                # If end column is wrong (outside content), move it...
                elif end.column and end.column > len(lines[end.line]):
                    prev_start, prev_end = start, end
                    if end.line + 1 < len(lines):
                        # Move end position to next line at column 0
                        end = Position(end.line + 1, 0)
                    elif start.line == 0:
                        # Move end position to same line at last column
                        end = Position(end.line, len(lines[end.line]))
                    else:
                        # Move start to previous line at same column
                        # Move end position to same line at column 0
                        start = Position(start.line - 1, start.column)
                        end = Position(end.line, 0)
                    print(
                        f"Wrong indexation (0-indexed) {file_path}: "
                        f"(start={prev_start} end={prev_end}), "
                        f"fix it by setting start={start} end={end}")
                assert start.line >= 0, (result_index, result_data, start, end)
                # If start line is empty, then move start up until its not (or its the first line)
                # To fix java.lang.IllegalArgumentException: A 'startColumn' [line=41, lineOffset=0]
                # cannot be provided when the line is empty
                while start.line and not lines[start.line]:
                    # Move start one line before at column 0
                    start = Position(start.line - 1, 0)
            # Lines are 1-indexed both in SARIF and Sonar Generic
            # Columns are 1-indexed in SARIF, 0-indexed in Sonar Generic
            issue['primaryLocation']['textRange'] = {
                'startLine': start.line + 1,
                'startColumn': start.column
            }
            # End is optional (Position is falsy when either coordinate is 0)
            if end:
                issue['primaryLocation']['textRange'].update({
                    'endLine': end.line + 1,
                    'endColumn': end.column
                })
            issues.append(issue)
    target.write_text(json.dumps({'issues': issues}, indent=2), encoding='utf-8')
def clean_tag(value: str) -> str:
    """Wrap *value* in single quotes when it contains a space, else return it unchanged."""
    if ' ' in value:
        return "'" + value + "'"
    return value
if __name__ == '__main__':
    # Fix: the original called the undefined name `main` (NameError at runtime).
    # The two-positional-argument CLI contract matches sarif_to_generic(source, target).
    sarif_to_generic(sys.argv[1], sys.argv[2])
@davidfischer-ch
Copy link
Author

davidfischer-ch commented May 15, 2025

To fix : java.lang.IllegalArgumentException: A 'startColumn' [line=41, lineOffset=0] cannot be provided when the line is empty. Caused by https://github.com/SonarSource/sonarqube/blame/master/sonar-scanner-engine/src/main/java/org/sonar/scanner/externalissue/ExternalIssueImporter.java.

@davidfischer-ch
Copy link
Author

davidfischer-ch commented May 22, 2025

Note : robocop can now export to SonarQube format, but I don't know yet if there will be indexing errors. In that case, raise issues on robocop's project.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment