Created
March 27, 2025 11:46
-
-
Save ion-storm/7b0f9781730ea56c310e4758a57b8e77 to your computer and use it in GitHub Desktop.
Splunk-to-Sigma converter: parses a Splunk SPL query and generates a Sigma rule stub.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import argparse | |
from typing import Dict, List, Set | |
class SplunkToSigmaParser:
    """Best-effort SPL parser that extracts fields, values, keywords, regexes
    and CIDR ranges from a Splunk query and renders them as a Sigma rule stub.

    Only a subset of SPL is understood: base searches (``index=``,
    ``sourcetype=``, ``datamodel=``, ``field=value``, ``field IN (...)``),
    ``tstats``/``stats`` with ``by`` clauses, subsearches, ``eval``, ``rex``
    and ``fields``. Everything else is treated as free-text keywords.
    """

    def __init__(self):
        # Base-search captures.
        self.index_pattern = r'index=(\S+)'
        self.sourcetype_pattern = r'sourcetype=(\S+)'
        self.datamodel_pattern = r'datamodel=(\S+)'  # datamodel named in the base search
        # field=value and field IN (...) — both accept a "datamodel.field" prefix.
        self.field_value_pattern = r'(\w+(?:\.\w+)?)=(?:"([^"]+)"|(\S+))'
        self.in_pattern = r'(\w+(?:\.\w+)?)\s+IN\s+\((.*?)\)'
        # Pipeline commands.
        self.tstats_pattern = r'\|?\s*tstats\s+(?:summariesonly=\S+\s+)?(\w+)\s+(?:from\s+datamodel=(\S+)\s+)?where\s+(.+?)(?:\s+by\s+(.+))?(?=\s*\|?|$)'
        self.subsearch_pattern = r'\[\s*search\s+(.+?)\s*\]'
        self.keyword_pattern = r'(?<!\w)(?:\"[^\"]+\"|\*\S+\*|\S+)(?!\w|\s*=)'
        self.stats_pattern = r'\|?\s*stats\s+(\w+)\s+(?:by\s+(.+))?(?=\s*\|?|$)'
        self.rex_pattern = r'\|?\s*rex\s+(?:field=\S+\s+)?\"(.+?)\"'
        self.eval_pattern = r'\|?\s*eval\s+(\w+)=(?:"([^"]+)"|(\S+?))(?:\s|$)'
        self.fields_pattern = r'fields\s+(.+?)(?:\s|$|\|)'
        # SPL command words and boolean operators that must never be reported
        # as free-text keywords ("in"/"or"/"and"/"not" added: SPL operators,
        # not search terms).
        self.excluded_keywords = {
            "where", "by", "count", "stats", "eval", "rex", "tstats",
            "fields", "datamodel", "in", "or", "and", "not",
        }

    def parse_query(self, query: str) -> Dict[str, List]:
        """Parse *query* and return a dict of extracted artifacts.

        Keys: ``fields``/``values`` (parallel, always equal-length lists of
        field/value pairs), ``keywords``, ``regex_fields``/``regex_patterns``,
        ``cidr_fields``/``cidr_values``, ``grouping_fields`` (from ``by``
        clauses), ``source`` (index name) and ``datamodel``.
        """
        result = {
            "fields": [],
            "values": [],
            "keywords": [],
            "regex_fields": [],
            "regex_patterns": [],
            "cidr_fields": [],
            "cidr_values": [],
            "grouping_fields": [],
            "source": None,
            "datamodel": None,
        }
        seen_fields: Set[str] = set()
        seen_values: Set[str] = set()
        query = " ".join(query.split())  # collapse runs of whitespace/newlines

        # Extract index.
        index_match = re.search(self.index_pattern, query)
        if index_match:
            result["source"] = index_match.group(1)

        # Extract sourcetype as an ordinary field/value pair.
        sourcetype_match = re.search(self.sourcetype_pattern, query)
        if sourcetype_match:
            self._add_field_value("sourcetype", sourcetype_match.group(1),
                                  result, seen_fields, seen_values)

        # Extract datamodel from the base search.
        datamodel_match = re.search(self.datamodel_pattern, query)
        if datamodel_match and not result["datamodel"]:
            result["datamodel"] = datamodel_match.group(1)

        # tstats queries are handled on their own: the where/by clauses carry
        # all the detection logic, so we return immediately afterwards.
        tstats_match = re.search(self.tstats_pattern, query, re.IGNORECASE)
        if tstats_match:
            if tstats_match.group(2):
                result["datamodel"] = tstats_match.group(2)
            self._parse_where_clause(tstats_match.group(3), result, seen_fields, seen_values)
            if tstats_match.group(4):
                self._parse_by_clause(tstats_match.group(4), result, seen_fields)
            return result

        # Handle subsearches: parse the inner search like a plain one.
        subsearch_match = re.search(self.subsearch_pattern, query, re.IGNORECASE)
        if subsearch_match:
            self._parse_simple_search(subsearch_match.group(1), result, seen_fields, seen_values)

        # Extract eval assignments as field/value pairs.
        for match in re.finditer(self.eval_pattern, query, re.IGNORECASE):
            value = match.group(2) if match.group(2) else match.group(3)
            self._add_field_value(match.group(1), value, result, seen_fields, seen_values)

        # Extract every rex command; prefer the named capture group when present.
        for rex_match in re.finditer(self.rex_pattern, query, re.IGNORECASE):
            regex = rex_match.group(1)
            field_match = re.search(r'\?\<(\w+)\>', regex)
            if field_match:
                field_name = field_match.group(1)
                pattern = self._named_group_body(regex, field_name)
            else:
                field_name = "regex_match"
                pattern = regex
            if field_name not in seen_fields:
                result["regex_fields"].append(field_name)
                result["regex_patterns"].append(pattern)
                seen_fields.add(field_name)

        # Handle stats: take grouping fields from the by-clause, then parse
        # whatever precedes the first pipe as a plain search.
        stats_match = re.search(self.stats_pattern, query, re.IGNORECASE)
        if stats_match:
            if stats_match.group(2):
                self._parse_by_clause(stats_match.group(2), result, seen_fields)
            self._parse_simple_search(query.split("|")[0].strip(), result, seen_fields, seen_values)
            return result

        self._parse_simple_search(query.split("|")[0].strip(), result, seen_fields, seen_values)
        return result

    def _add_field_value(self, field: str, value: str, result: Dict[str, List],
                         seen_fields: Set[str], seen_values: Set[str]):
        """Record one field/value pair, keeping the parallel lists aligned.

        Fix: the original appended fields and values under independent
        conditions, so ``fields`` and ``values`` could desynchronize and the
        zip in :meth:`to_sigma_stub` mispaired them. Here a pair is always
        appended (or dropped) atomically; exact duplicates are skipped.
        CIDR-looking values go to the ``cidr_*`` lists instead.
        """
        field = field.split(".")[-1]  # strip any datamodel prefix
        if re.match(r'^\d+\.\d+\.\d+\.\d+/\d+$', value):
            if (field, value) not in zip(result["cidr_fields"], result["cidr_values"]):
                result["cidr_fields"].append(field)
                result["cidr_values"].append(value)
            seen_fields.add(field)
            return
        if (field, value) in zip(result["fields"], result["values"]):
            return  # exact duplicate pair
        result["fields"].append(field)
        result["values"].append(value)
        seen_fields.add(field)
        seen_values.add(value)

    def _add_in_values(self, in_match, result: Dict[str, List],
                       seen_fields: Set[str], seen_values: Set[str]):
        """Expand ``field IN (v1, v2, ...)`` into aligned field/value pairs."""
        field = in_match.group(1)
        for raw in in_match.group(2).split(","):
            self._add_field_value(field, raw.strip().strip('"'),
                                  result, seen_fields, seen_values)

    def _named_group_body(self, regex: str, field_name: str) -> str:
        """Return the sub-pattern inside the named group ``(?<field_name>...)``.

        Fix: the original ``regex.split(f"?<{name}>")[1]`` kept the group's
        closing parenthesis in the pattern (``(?<user>\\w+)`` -> ``\\w+)``).
        Here we scan for the balancing ``)``, honoring backslash escapes.
        """
        marker = f"?<{field_name}>"
        start = regex.find(marker)
        if start == -1:
            return regex  # should not happen; caller found the group already
        i = start + len(marker)
        body_start = i
        depth = 1  # inside the '(' that opened the named group
        while i < len(regex):
            ch = regex[i]
            if ch == "\\":
                i += 2  # skip escaped character
                continue
            if ch == "(":
                depth += 1
            elif ch == ")":
                depth -= 1
                if depth == 0:
                    return regex[body_start:i]
            i += 1
        return regex[body_start:]  # unbalanced pattern: best effort

    def _parse_where_clause(self, clause: str, result: Dict[str, List],
                            seen_fields: Set[str], seen_values: Set[str]):
        """Extract field/value pairs from a tstats ``where`` clause."""
        in_match = re.search(self.in_pattern, clause, re.IGNORECASE)
        if in_match:
            self._add_in_values(in_match, result, seen_fields, seen_values)
            return
        for field, quoted_val, unquoted_val in re.findall(self.field_value_pattern, clause):
            self._add_field_value(field, quoted_val if quoted_val else unquoted_val,
                                  result, seen_fields, seen_values)

    def _parse_by_clause(self, clause: str, result: Dict[str, List], seen_fields: Set[str]):
        """Record the ``by`` clause fields as Sigma grouping fields."""
        for field in (f.strip().split(".")[-1] for f in clause.split(",")):  # strip datamodel prefix
            if field not in seen_fields:
                result["grouping_fields"].append(field)
                seen_fields.add(field)

    def _parse_simple_search(self, query: str, result: Dict[str, List],
                             seen_fields: Set[str], seen_values: Set[str]):
        """Extract pairs, ``fields`` projections and bare keywords from a plain search."""
        in_match = re.search(self.in_pattern, query, re.IGNORECASE)
        if in_match:
            self._add_in_values(in_match, result, seen_fields, seen_values)
        for field, quoted_val, unquoted_val in re.findall(self.field_value_pattern, query):
            self._add_field_value(field, quoted_val if quoted_val else unquoted_val,
                                  result, seen_fields, seen_values)
        # "fields a, b" projection: field names only, no values (appended after
        # all pairs, so the fields/values zip in to_sigma_stub ignores them).
        fields_match = re.search(self.fields_pattern, query, re.IGNORECASE)
        if fields_match:
            for field in (f.strip().split(".")[-1] for f in fields_match.group(1).split(",")):
                if field not in seen_fields:
                    result["fields"].append(field)
                    seen_fields.add(field)
        # Remaining bare tokens become keywords. Fix: the original lookahead
        # failed to stop greedy \S+ from swallowing "field=value" tokens, so
        # things like "index=main" leaked into keywords; filter them here.
        for kw in re.findall(self.keyword_pattern, query):
            if (kw not in seen_values
                    and "=" not in kw  # skip field=value tokens
                    and not any(kw in f for f in result["fields"])
                    and kw not in result["keywords"]
                    and kw.lower() not in self.excluded_keywords):
                result["keywords"].append(kw)

    def to_sigma_stub(self, query: str) -> str:
        """Render *query* as a Sigma rule stub (YAML text)."""
        parsed = self.parse_query(query)
        lines = ["title: Autogenerated Sigma Rule",
                 "logsource:",
                 "  product: splunk"]
        if parsed["source"]:
            lines.append(f"  service: {parsed['source']}")
        if parsed["datamodel"]:
            lines.append(f"  category: {parsed['datamodel']}")
        lines += ["detection:", "  selection:"]
        # Group repeated fields (e.g. from IN clauses) into YAML value lists
        # instead of emitting duplicate mapping keys.
        grouped: Dict[str, List[str]] = {}
        for field, value in zip(parsed["fields"], parsed["values"]):
            grouped.setdefault(field, []).append(value)
        for field, values in grouped.items():
            if len(values) == 1:
                lines.append(f'    {field}: "{values[0]}"')
            else:
                lines.append(f'    {field}:')
                lines.extend(f'      - "{v}"' for v in values)
        for field, value in zip(parsed["cidr_fields"], parsed["cidr_values"]):
            lines.append(f'    {field}|cidr: "{value}"')
        if parsed["keywords"]:
            lines.append("  keywords:")
            lines.extend(f'    - "{kw}"' for kw in parsed["keywords"])
        for field, pattern in zip(parsed["regex_fields"], parsed["regex_patterns"]):
            lines.append(f'    {field}|re: "{pattern}"')
        lines.append("  condition: selection")
        return "\n".join(lines) + "\n"
def main():
    """Command-line entry point.

    Accepts a Splunk query either inline (-q) or from a text file (-f) and
    prints the generated Sigma rule stub.
    """
    arg_parser = argparse.ArgumentParser(description="Convert Splunk queries to Sigma rules.")
    arg_parser.add_argument('-q', '--query', type=str, help='Splunk query in quotes (e.g., "index=main error")')
    arg_parser.add_argument('-f', '--file', type=str, help='Path to a text file containing a Splunk query')
    args = arg_parser.parse_args()

    # Require at least one input source before doing any work.
    if not (args.query or args.file):
        print("Error: Please provide a query (-q) or a file path (-f).")
        print("Usage: python splunk_to_sigma.py -q 'index=main error' OR -f query.txt")
        return

    converter = SplunkToSigmaParser()
    if args.query:
        # Remove surrounding quotes the shell may have left in place.
        print("Sigma Rule:\n", converter.to_sigma_stub(args.query.strip("'\"")))
    elif args.file:
        try:
            with open(args.file, 'r') as handle:
                file_query = handle.read().strip()
            print("Sigma Rule:\n", converter.to_sigma_stub(file_query))
        except FileNotFoundError:
            print(f"Error: File '{args.file}' not found.")
        except Exception as e:
            print(f"Error reading file: {e}")
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment