Last active
April 30, 2025 20:26
-
-
Save renini/b09fc1a3d9a54b7a9caa598fd4561ffc to your computer and use it in GitHub Desktop.
winlogbeat_subscription_stats.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import xml.etree.ElementTree as ET | |
| from elasticsearch import Elasticsearch | |
| from elasticsearch.helpers import scan | |
| import re | |
| # Initialize Elasticsearch client | |
| #es = Elasticsearch("http://localhost:9200") | |
def parse_event_query(xml_str):
    """Parse every <Select> element of an Event Viewer subscription XML.

    Args:
        xml_str: The subscription XML (typically a ``<QueryList>`` document)
            as a string. Surrounding whitespace is ignored.

    Returns:
        A list with one dict per ``<Select>`` element, in document order:

        * ``log_name``  - value of the element's ``Path`` attribute, or
          ``None`` when the attribute is absent.
        * ``event_ids`` - list of ints, one per ``EventID=<n>`` equality
          test found in the XPath filter (empty when there are none).
        * ``provider``  - name from a ``Provider[@Name=...]`` predicate that
          holds a single name, or ``None`` when no such predicate exists.

    Raises:
        xml.etree.ElementTree.ParseError: If ``xml_str`` is not well-formed.
    """
    root = ET.fromstring(xml_str.strip())
    queries = []

    # "{*}" matches the tag in any (or no) XML namespace.
    for select in root.findall(".//{*}Select"):
        log_name = select.attrib.get("Path")

        # An empty element (<Select Path="..."/>) has text None; treat it as
        # an empty filter instead of crashing on None.strip().
        query = (select.text or "").strip()

        # Only equality tests are collected; whitespace around "=" is allowed.
        # Comparisons such as "EventID >= 100" are intentionally not matched.
        event_ids = [int(raw) for raw in re.findall(r"EventID\s*=\s*(\d+)", query)]

        # XPath string literals may use single or double quotes; the
        # back-reference makes sure the closing quote matches the opening one.
        provider_match = re.search(
            r"Provider\[\s*@Name\s*=\s*(['\"])(.*?)\1\s*\]", query
        )
        provider = provider_match.group(2) if provider_match else None

        queries.append({
            "log_name": log_name,
            "event_ids": event_ids,
            "provider": provider,
        })

    return queries
def build_elasticsearch_query(query_info):
    """Turn one parsed subscription entry into an Elasticsearch query body.

    Args:
        query_info: Dict with the keys ``log_name``, ``provider`` and
            ``event_ids`` (the shape produced by ``parse_event_query``).

    Returns:
        A dict of the form ``{"query": {"bool": {"must": [...]}}}``. A clause
        is added only for each truthy entry of ``query_info``:

        * ``log_name``  -> ``term`` on the ``log_name`` field
        * ``provider``  -> ``term`` on ``winlog.provider_name``
        * ``event_ids`` -> ``term`` on ``event_id`` for a single ID,
          ``terms`` for several
    """
    clauses = []

    channel = query_info["log_name"]
    if channel:
        clauses.append({"term": {"log_name": channel}})

    source = query_info["provider"]
    if source:
        clauses.append({"term": {"winlog.provider_name": source}})

    ids = query_info["event_ids"]
    if ids:
        # A lone ID becomes a plain "term"; several IDs need "terms".
        id_clause = (
            {"term": {"event_id": ids[0]}}
            if len(ids) == 1
            else {"terms": {"event_id": ids}}
        )
        clauses.append(id_clause)

    return {"query": {"bool": {"must": clauses}}}
def get_doc_stats(index, query, client=None):
    """Return the document count and summed ``_size`` for a query.

    Issues a single ``_search`` request with ``size: 0`` (no hits returned)
    and a ``sum`` aggregation over the ``_size`` field.

    Args:
        index: Index name or pattern to search (e.g. ``"winlogbeat-*"``).
        query: Search body containing at least the ``query`` section; it is
            not modified.
        client: Object exposing ``search(index=..., body=...)``. When omitted,
            the module-level ``es`` client is used.

    Returns:
        ``(count, size)`` - number of matching documents and the value of the
        ``total_size`` aggregation (0 when the response has no aggregations
        or the aggregation value is null).

    Raises:
        RuntimeError: If no ``client`` is passed and no module-level ``es``
            client has been defined.

    Note:
        NOTE(review): ``_size`` is only indexed when the mapper-size plugin is
        installed and enabled for the index; otherwise the sum will be 0 -
        confirm against the target cluster.
    """
    if client is None:
        try:
            client = es
        except NameError:
            raise RuntimeError(
                "No Elasticsearch client available: pass client=... or "
                "define a module-level 'es' client."
            ) from None

    es_query = {
        **query,
        "size": 0,
        # Without this, hits.total.value is a lower bound capped at 10,000.
        "track_total_hits": True,
        "aggs": {
            "total_size": {
                "sum": {
                    "field": "_size",
                },
            },
        },
    }
    res = client.search(index=index, body=es_query)

    # hits.total is an object ({"value": n, "relation": ...}) by default but
    # can be a bare integer when totals are requested as ints.
    total = res["hits"]["total"]
    count = total["value"] if isinstance(total, dict) else total

    size = res["aggregations"]["total_size"]["value"] if "aggregations" in res else 0
    # Guard against a null aggregation value so callers can safely int() it.
    return count, size or 0
def main():
    """Run a sample subscription through parse -> query -> stats and print it.

    For each <Select> in the embedded sample XML this prints the generated
    Elasticsearch query, then the log name, provider, event IDs, and the
    count/size returned by ``get_doc_stats`` for the ``winlogbeat-*`` indices.
    """
    xml_input = """
<QueryList>
<Query Id="0" Path="Security">
<Select Path="Security">*[System[(EventID=4624)]]</Select>
</Query>
<Query Id="1" Path="System">
<Select Path="System">*[System[(EventID=6005)]]</Select>
</Query>
</QueryList>
"""
    index = "winlogbeat-*"

    for entry in parse_event_query(xml_input):
        search_body = build_elasticsearch_query(entry)
        # Show the generated query before running it.
        print(search_body)

        doc_count, total_bytes = get_doc_stats(index, search_body)

        print(f"Log: {entry['log_name']}")
        print(f"Provider: {entry['provider']}")
        print(f"EventIDs: {entry['event_ids']}")
        print(f"→ Count: {doc_count}, Size: {int(total_bytes)} bytes\n")
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment