Created
April 16, 2025 10:30
-
-
Save dstreefkerk/5557d0c6a7f84fed3a37a6d30a169fa2 to your computer and use it in GitHub Desktop.
POC code only. Do not use.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Sigma to Microsoft Sentinel KQL Converter | |
----------------------------------------- | |
This script converts Sigma detection rules to Microsoft Sentinel KQL (Kusto Query Language) format | |
using the SOC Prime translation service. | |
SOC Prime, Inc (https://socprime.com) provides the translation API (uncoder.io) that powers this converter. | |
This script is a client for that service and is not affiliated with or endorsed by SOC Prime, Inc. | |
The converter supports two types of output: | |
1. KQL Queries - For use in Log Analytics and Microsoft Sentinel | |
2. KQL Analytics Rules - For creating detection rules in Microsoft Sentinel | |
Features: | |
- Converts Sigma rules to Microsoft Sentinel KQL format | |
- Supports both KQL queries and KQL analytics rules | |
- Handles network issues with retry logic | |
- Provides detailed logging for troubleshooting | |
- Includes file I/O utilities for reading/writing rules | |
- Offers a command-line interface for easy usage | |
Usage examples: | |
1. As a module in Python code: | |
```python | |
from sigma_converter import convert_sigma_to_sentinel_kql | |
# Convert a sigma rule to KQL | |
success, kql = convert_sigma_to_sentinel_kql(sigma_rule_content) | |
if success: | |
print(kql) | |
``` | |
2. From the command line: | |
```bash | |
# Convert a Sigma rule file to a KQL query | |
python sigma_converter.py -i rule.yaml -o query.kql | |
# Convert a Sigma rule file to a KQL analytics rule | |
python sigma_converter.py -i rule.yaml -o rule.kql -t rule | |
# Enable debug output | |
python sigma_converter.py -i rule.yaml -o query.kql -d | |
# Run the example with a built-in Sigma rule | |
python sigma_converter.py --example | |
``` | |
Dependencies: | |
- requests | |
- json | |
- logging | |
- argparse | |
Note: This script relies on external API service from SOC Prime, Inc which may require | |
proper attribution, have usage limits, or be subject to terms of service. Always check | |
the service provider's terms before using this script in production environments. | |
""" | |
import requests | |
import json | |
import time | |
import logging | |
from typing import Optional, Dict, Any, Tuple, Union, List | |
def setup_logger(): | |
"""Set up a basic logger for the module.""" | |
logger = logging.getLogger('sigma_converter') | |
if not logger.handlers: | |
handler = logging.StreamHandler() | |
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
handler.setFormatter(formatter) | |
logger.addHandler(handler) | |
logger.setLevel(logging.INFO) | |
return logger | |
def convert_sigma_to_sentinel_kql( | |
sigma_rule: str, | |
target_type: str = "query", | |
max_retries: int = 3, | |
retry_delay: float = 1.0, | |
timeout: float = 10.0, | |
debug: bool = False | |
) -> Tuple[bool, Union[str, Dict[str, Any]]]: | |
""" | |
Convert a Sigma rule to Microsoft Sentinel KQL (Kusto Query Language). | |
Args: | |
sigma_rule (str): The Sigma rule content as a string. | |
target_type (str): The target type - either "query" or "rule". Defaults to "query". | |
max_retries (int): Maximum number of retries on failure. Defaults to 3. | |
retry_delay (float): Delay between retries in seconds. Defaults to 1.0. | |
timeout (float): Request timeout in seconds. Defaults to 10.0. | |
Returns: | |
Tuple[bool, Union[str, Dict[str, Any]]]: A tuple containing: | |
- bool: Success indicator (True if successful, False otherwise) | |
- Union[str, Dict[str, Any]]: Either the KQL query string if successful, | |
or an error dictionary if failed | |
""" | |
# Set up logging | |
logger = setup_logger() | |
if debug: | |
logger.setLevel(logging.DEBUG) | |
# Validate input parameters | |
if not sigma_rule or not isinstance(sigma_rule, str): | |
logger.error("Invalid sigma rule provided") | |
return False, {"error": "Invalid sigma rule. Must be a non-empty string."} | |
# Determine the target platform ID based on the specified target type | |
if target_type.lower() == "query": | |
target_platform_id = "sentinel-kql-query" | |
logger.debug(f"Using target platform: sentinel-kql-query") | |
elif target_type.lower() == "rule": | |
target_platform_id = "sentinel-kql-rule" | |
logger.debug(f"Using target platform: sentinel-kql-rule") | |
else: | |
logger.error(f"Invalid target_type provided: {target_type}") | |
return False, {"error": f"Invalid target_type: {target_type}. Must be 'query' or 'rule'."} | |
# API endpoint for the translation service | |
url = "https://siemc.socprime.com/translate" | |
# Prepare the request payload | |
payload = { | |
"text": sigma_rule, | |
"source_platform_id": "sigma", | |
"target_platform_id": target_platform_id | |
} | |
# Prepare headers | |
headers = { | |
"accept": "*/*", | |
"content-type": "application/json", | |
"sec-fetch-dest": "empty", | |
"sec-fetch-mode": "cors", | |
"sec-fetch-site": "cross-site" | |
} | |
# Attempt the request with retries | |
for attempt in range(max_retries): | |
try: | |
logger.debug(f"Attempt {attempt+1}/{max_retries}: Sending request to {url}") | |
# Send the request | |
response = requests.post( | |
url=url, | |
headers=headers, | |
json=payload, | |
timeout=timeout | |
) | |
# Check if the request was successful | |
if response.status_code == 200: | |
logger.debug("Request successful with status code 200") | |
try: | |
# Try to parse the response as JSON | |
result = response.json() | |
# Debug log the structure of the response | |
if debug: | |
logger.debug(f"Response structure: {json.dumps(result, indent=2)[:500]}...") | |
# Extract the KQL content based on response structure | |
# The structure needs to be adjusted based on actual API response | |
if isinstance(result, dict): | |
# Check various possible structures | |
if "data" in result and "content" in result["data"]: | |
kql_content = result["data"]["content"] | |
elif "data" in result and isinstance(result["data"], str): | |
kql_content = result["data"] | |
elif "content" in result: | |
kql_content = result["content"] | |
elif "translation" in result: | |
kql_content = result["translation"] | |
elif "result" in result: | |
kql_content = result["result"] | |
else: | |
# If we can't find a specific field, return the whole result | |
logger.warning("Unknown response structure, returning entire response") | |
return True, result | |
# Clean up the KQL (remove unnecessary blank lines) | |
if isinstance(kql_content, str): | |
# Remove blank lines that only contain whitespace | |
cleaned_kql = "\n".join([line for line in kql_content.splitlines() | |
if line.strip()]) | |
logger.debug("Successfully extracted and cleaned KQL content") | |
return True, cleaned_kql | |
else: | |
logger.debug("Extracted non-string KQL content") | |
return True, kql_content | |
else: | |
# If result is not a dict, return as is | |
logger.debug("Response is not a dictionary") | |
return True, result | |
except json.JSONDecodeError as json_err: | |
# If not JSON, log the issue and return the raw text | |
logger.warning(f"Response is not valid JSON: {str(json_err)}") | |
return True, response.text | |
else: | |
error_msg = f"Request failed with status code: {response.status_code}" | |
logger.error(error_msg) | |
# Try to get more error details | |
try: | |
error_detail = response.json() | |
error_msg = f"{error_msg}, Details: {error_detail}" | |
except: | |
if response.text: | |
error_msg = f"{error_msg}, Response: {response.text[:200]}..." | |
# If this isn't the last attempt, retry | |
if attempt < max_retries - 1: | |
retry_time = retry_delay * (attempt + 1) # Exponential backoff | |
logger.info(f"Retrying in {retry_time} seconds...") | |
time.sleep(retry_time) | |
continue | |
else: | |
logger.error("Maximum retries exceeded") | |
return False, {"error": error_msg, "status_code": response.status_code} | |
except requests.RequestException as e: | |
# Handle request exceptions (timeout, connection issues, etc.) | |
logger.error(f"Request exception: {str(e)}") | |
if attempt < max_retries - 1: | |
retry_time = retry_delay * (attempt + 1) # Exponential backoff | |
logger.info(f"Retrying in {retry_time} seconds...") | |
time.sleep(retry_time) | |
continue | |
else: | |
logger.error("Maximum retries exceeded") | |
return False, {"error": f"Request exception: {str(e)}"} | |
# If we reached here, all retries failed | |
return False, {"error": "Maximum retries exceeded"} | |
def read_sigma_from_file(file_path: str) -> Optional[str]: | |
""" | |
Read a Sigma rule from a file. | |
Args: | |
file_path (str): Path to the Sigma rule file | |
Returns: | |
Optional[str]: The content of the Sigma rule file, or None if an error occurred | |
""" | |
try: | |
with open(file_path, 'r', encoding='utf-8') as file: | |
return file.read() | |
except Exception as e: | |
print(f"Error reading Sigma rule file: {str(e)}") | |
return None | |
def save_kql_to_file(kql_content: str, output_path: str) -> bool: | |
""" | |
Save the KQL query to a file. | |
Args: | |
kql_content (str): The KQL query content | |
output_path (str): Path to save the KQL file | |
Returns: | |
bool: True if successful, False otherwise | |
""" | |
try: | |
with open(output_path, 'w', encoding='utf-8') as file: | |
file.write(kql_content) | |
print(f"KQL successfully saved to {output_path}") | |
return True | |
except Exception as e: | |
print(f"Error saving KQL to file: {str(e)}") | |
return False | |
# Example usage | |
def example(): | |
# Setup logging for the example | |
logger = setup_logger() | |
logger.setLevel(logging.INFO) | |
print("\n===== Sigma to Sentinel KQL Converter Example =====\n") | |
# Example Sigma rule | |
sigma_rule = """ | |
title: Office Macro File Download | |
status: test | |
description: Detects the creation of a new office macro files on the systems via an application (browser, mail client). | |
references: | |
- https://github.com/redcanaryco/atomic-red-team/blob/f339e7da7d05f6057fdfcdd3742bfcf365fee2a9/atomics/T1566.001/T1566.001.md | |
author: Nasreddine Bencherchali (Nextron Systems) | |
logsource: | |
category: file_event | |
product: windows | |
detection: | |
selection_processes: | |
Image|endswith: | |
# Email clients | |
- '\\RuntimeBroker.exe' | |
- '\\outlook.exe' | |
# Browsers | |
- '\\chrome.exe' | |
- '\\firefox.exe' | |
- '\\msedge.exe' | |
selection_ext: | |
- TargetFilename|endswith: | |
- '.docm' | |
- '.xlsm' | |
- TargetFilename|contains: | |
- '.docm:Zone' | |
- '.xlsm:Zone' | |
condition: all of selection_* | |
falsepositives: | |
- Legitimate macro files downloaded from the internet | |
level: medium | |
""" | |
print("1. Converting Sigma rule to KQL Query") | |
# Convert to KQL query | |
success, result = convert_sigma_to_sentinel_kql( | |
sigma_rule=sigma_rule, | |
target_type="query", | |
debug=True | |
) | |
if success: | |
print("\nConversion to KQL Query successful!") | |
print("\nKQL Query:") | |
print("-" * 40) | |
print(result) | |
print("-" * 40) | |
# Optionally save to a file | |
# save_kql_to_file(result, "sigma_rule_query.kql") | |
else: | |
print("\nConversion to KQL Query failed!") | |
print("Error:", result) | |
print("\n2. Converting Sigma rule to KQL Analytics Rule") | |
# Convert to KQL analytics rule | |
success, rule_result = convert_sigma_to_sentinel_kql( | |
sigma_rule=sigma_rule, | |
target_type="rule", | |
debug=True | |
) | |
if success: | |
print("\nConversion to KQL Analytics Rule successful!") | |
print("\nKQL Analytics Rule:") | |
print("-" * 40) | |
print(rule_result) | |
print("-" * 40) | |
# Optionally save to a file | |
# save_kql_to_file(rule_result, "sigma_rule_analytics.kql") | |
else: | |
print("\nConversion to KQL Analytics Rule failed!") | |
print("Error:", rule_result) | |
# Example of using with a file | |
def file_example(input_path, output_path=None, target_type="query"): | |
"""Process a Sigma rule file and convert it to KQL""" | |
# Read the Sigma rule from file | |
sigma_content = read_sigma_from_file(input_path) | |
if not sigma_content: | |
return | |
# Convert to KQL | |
success, result = convert_sigma_to_sentinel_kql( | |
sigma_rule=sigma_content, | |
target_type=target_type | |
) | |
if success: | |
print(f"Successfully converted {input_path} to KQL {target_type}") | |
# Save to file if output path is provided | |
if output_path: | |
save_kql_to_file(result, output_path) | |
else: | |
print("\nKQL Result:") | |
print("-" * 40) | |
print(result) | |
print("-" * 40) | |
else: | |
print(f"Failed to convert {input_path} to KQL") | |
print("Error:", result) | |
def main(): | |
"""Command-line interface for the Sigma to KQL converter.""" | |
import argparse | |
# Set up the argument parser | |
parser = argparse.ArgumentParser( | |
description="Convert Sigma rules to Microsoft Sentinel KQL", | |
formatter_class=argparse.RawDescriptionHelpFormatter, | |
epilog=""" | |
Examples: | |
# Convert a Sigma rule file to a KQL query | |
python sigma_converter.py -i rule.yaml -o query.kql | |
# Convert a Sigma rule file to a KQL analytics rule | |
python sigma_converter.py -i rule.yaml -o rule.kql -t rule | |
# Enable debug output | |
python sigma_converter.py -i rule.yaml -o query.kql -d | |
# Run the example with a built-in Sigma rule | |
python sigma_converter.py --example | |
""" | |
) | |
# Add arguments | |
parser.add_argument('-i', '--input', help='Input Sigma rule file') | |
parser.add_argument('-o', '--output', help='Output KQL file') | |
parser.add_argument('-t', '--type', choices=['query', 'rule'], default='query', | |
help='KQL output type (default: query)') | |
parser.add_argument('-d', '--debug', action='store_true', help='Enable debug logging') | |
parser.add_argument('-e', '--example', action='store_true', help='Run the example with built-in Sigma rule') | |
# Parse arguments | |
args = parser.parse_args() | |
# Set up logging | |
logger = setup_logger() | |
if args.debug: | |
logger.setLevel(logging.DEBUG) | |
else: | |
logger.setLevel(logging.INFO) | |
# Process the command | |
if args.example: | |
# Run the example | |
example() | |
elif args.input: | |
# Process the input file | |
if not args.output: | |
print("Warning: No output file specified. KQL will be printed to console.") | |
file_example( | |
input_path=args.input, | |
output_path=args.output, | |
target_type=args.type | |
) | |
else: | |
# No valid command provided | |
parser.print_help() | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment