Cleans up rows in Chronicle SIEM Data Tables with expired dates. #chronicle #gcp #functions #python #siem #management
"""
Google Chronicle Data Table Cleaner
This script provides a mechanism for granular row-level expiration in Google
Chronicle Data Tables. It connects to the Chronicle API, iterates through all
data tables in a specified instance, and removes rows based on a custom
expiration date field.

FEATURES
- Row-level TTL: Deletes individual rows from Data Tables when their
custom expiration date has passed.
- Archiving: Expired rows are archived to a designated Data Table before
being deleted, preserving a record of removed data.
- Configurable: All necessary parameters, such as project details, API
credentials, and table names, are configured via environment variables.

HOW IT WORKS
1. Loads configuration from environment variables.
2. Authenticates and initializes a client for the Chronicle API.
3. Fetches a list of all Data Tables in the configured instance.
4. For each table (excluding the archive table itself):
a. It checks for the expiration column (named "expiration").
b. It iterates through each row, comparing the date in the expiration
column with the current date.
c. If a row is expired, it is added to an archive queue and deleted
from the source table.
d. Rows with invalid date formats are also marked for deletion.
5. After processing a table, all collected expired rows are appended to the
specified archive table.
6. (Optional) Sends a run summary notification to a Slack webhook when the
run logs errors or critical failures.
This script is designed to be run as a scheduled GCP Cloud Function to
automate the cleanup process.
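For illustration only (hypothetical table and values), a Data Table row as
returned by the API looks roughly like the snippet below; the last value maps
to the expiration column, and because it is earlier than today the row would
be archived and then deleted:
```
{
  "name": "projects/.../dataTables/blocked_ips/dataTableRows/12345",
  "values": ["10.0.0.1", "blocked by SOC", "2025-01-31"]
}
```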

PREREQUISITES
- A Google Cloud Project with the Chronicle API enabled.
- A service account with appropriate permissions to read and modify
Chronicle Data Tables. The key for this service account must be provided.
- A designated "archive" Data Table to store expired rows. This table
must have columns for `source_table`, `payload`, and `expired_date`.
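For illustration (hypothetical source table with columns `ip`, `note`, and
`expiration`), the archive row written for an expired entry maps to those
three columns as:
```
["blocked_ips", "10.0.0.1|blocked by SOC", "2025-11-18"]
```
i.e. the source table's name, the original values (minus the expiration
column) joined with `|`, and the date the row was removed.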

SETUP
Deploy this script as a GCP Cloud Function and configure the following
environment variables:
- CHRONICLE_PROJECT: Your GCP Project ID.
- CHRONICLE_LOCATION: The location of your Chronicle instance (e.g., "us").
- CHRONICLE_INSTANCE: Your Chronicle instance ID.
- CHRONICLE_KEY: The JSON service account key for authentication.
- ARCHIVE_TABLE_NAME: The display name of the Data Table for archiving.
- SLACK_WEBHOOK_URL: Optional. If provided, a run summary is sent to this
Slack webhook whenever the run logs errors or critical failures. If empty
or not set, no notification is sent.
Place this script in the `main.py` file and set the function entry point to
`main`. The `requirements.txt` file must contain the following:
```
functions-framework==3.*
requests
google-api-python-client
google-auth
```
Note that depending on the number of Data Tables in the instance and the number
of rows in each, this script might take a while to run, which can cause the
Cloud Function to time out. If that happens, increase the function timeout or
allocate more resources (CPU and memory) to it.
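For reference only (a sketch with placeholder names; adjust region, runtime,
timeout, and resources to your environment), a 2nd-gen deployment could look
like:
```
gcloud functions deploy chronicle-table-cleaner \
  --gen2 --runtime=python312 --region=us-central1 \
  --source=. --entry-point=main --trigger-http \
  --timeout=540s --memory=512Mi \
  --set-env-vars=CHRONICLE_PROJECT=my-project,CHRONICLE_LOCATION=us,CHRONICLE_INSTANCE=my-instance,ARCHIVE_TABLE_NAME=expired_archive
```
Because CHRONICLE_KEY holds a JSON blob (commas and newlines), set it through
the console or an `--env-vars-file` instead of `--set-env-vars`.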
When everything is set, add a Cloud Scheduler job to trigger this function
periodically. It can call the Function URL (shown in the GCP console) with an
HTTP POST using the following arguments:
Header:
```
Content-Type: application/json
User-Agent: Google-Cloud-Scheduler
```
Body (parameters are passed as environment variables, not via POST):
```
{}
```
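As a sketch (placeholder job name, schedule, and service account; use the
actual Function URL shown in the console), the job could be created like:
```
gcloud scheduler jobs create http chronicle-table-cleaner-daily \
  --schedule="0 3 * * *" --time-zone="UTC" \
  --uri="FUNCTION_URL" \
  --http-method=POST \
  --headers="Content-Type=application/json,User-Agent=Google-Cloud-Scheduler" \
  --message-body="{}" \
  --oidc-service-account-email=scheduler-invoker@my-project.iam.gserviceaccount.com
```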

REFERENCES
- https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.dataTables.dataTableRows

AUTHOR: Joe Lopes <lopes.id>
DATE: 2025-11-18
LICENSE: MIT
"""
import json
import logging
from datetime import datetime
from os import environ
from typing import Any, Dict, List, Tuple
import functions_framework
import requests
from google.auth.transport.requests import AuthorizedSession
from google.oauth2 import service_account
from requests.exceptions import HTTPError
# --- ENTRY POINT ---
@functions_framework.http
def main(request):
"""Entry point for the GCP Cloud Function (Orchestrator)."""
# 0. Basic configuration
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
authorization_scopes = [
"https://www.googleapis.com/auth/chronicle-backstory",
"https://www.googleapis.com/auth/malachite-ingestion",
"https://www.googleapis.com/auth/cloud-platform",
]
slack_webhook_url = environ.get("SLACK_WEBHOOK_URL")
# Collects error messages throughout the script to send via Slack
run_logs: List[Dict[str, str]] = []
try:
# 1. Load configuration
config = _load_configuration()
chronicle_url = f"https://{config['CHRONICLE_LOCATION']}-chronicle.googleapis.com/v1alpha/projects/{config['CHRONICLE_PROJECT']}/locations/{config['CHRONICLE_LOCATION']}/instances/{config['CHRONICLE_INSTANCE']}"
# 2. Initialize Chronicle API Client
session, logs = init_chronicle_client(
config["CHRONICLE_KEY"], authorization_scopes
)
run_logs.extend(logs)
# 3. Get all tables and process them sequentially
all_tables, logs = get_all_tables(session, chronicle_url)
run_logs.extend(logs)
if config["ARCHIVE_TABLE_NAME"] not in all_tables:
raise ValueError(
f"Archive table not found: '{config['ARCHIVE_TABLE_NAME']}'."
)
# 4. Create run configuration object
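        # Keep only the archive table's resource ID (the last segment of its
        # full resource name) so it can be used in row-level API URLs below.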
run_config = {
"chronicle_url": chronicle_url,
"archive_table_name": all_tables[config["ARCHIVE_TABLE_NAME"]][
"name"
].split("/")[-1],
"today": datetime.today().date(),
"date_format": "%Y-%m-%d",
"expiration_column": "expiration",
}
logging.info(f"Starting cleanup. Archive table: {config['ARCHIVE_TABLE_NAME']}")
status = []
for display_name, table_data in all_tables.items():
if display_name == config["ARCHIVE_TABLE_NAME"]:
continue
try:
column_keys = [
c.get("originalColumn") for c in table_data.get("columnInfo", [])
]
rows_dict, logs = get_table_data(session, run_config, display_name)
run_logs.extend(logs)
if not rows_dict or not column_keys:
logging.info(
f"Skipping table {display_name}: No data or failed fetch."
)
status.append((display_name, 0))
continue
# Convert the rows dictionary to a list of dictionaries
rows = []
for row_id, r_values in rows_dict.items():
row_dict = dict(zip(column_keys, r_values))
row_dict["row_id"] = row_id
rows.append(row_dict)
expired, logs = process_table(
session,
display_name,
rows,
column_keys,
run_config,
)
run_logs.extend(logs)
status.append((display_name, expired))
except Exception as e:
msg = f"CRITICAL: Processing of table {display_name} failed. Run halted. Error: {e}"
logging.critical(msg)
run_logs.append({"level": "CRITICAL", "message": msg})
raise # Re-raise to signal function failure
# 5. Final summary
summary = f"Cleanup finished. {len(status)} tables processed. Status: {status}"
logging.info(summary)
return summary, 200
except Exception as e:
error_msg = f"FATAL CLOUD FUNCTION ERROR: {e}"
logging.error(error_msg)
run_logs.append({"level": "CRITICAL", "message": error_msg})
return f"Error processing tables: {e}", 500
finally:
if slack_webhook_url:
send_run_summary(slack_webhook_url, run_logs)
# --- FUNCTIONS ---
def _load_configuration() -> Dict[str, str]:
"""Loads and validates required environment variables."""
config_keys = [
"CHRONICLE_PROJECT",
"CHRONICLE_LOCATION",
"CHRONICLE_INSTANCE",
"CHRONICLE_KEY",
"ARCHIVE_TABLE_NAME",
]
config: Dict[str, str] = {}
missing_keys: List[str] = []
for key in config_keys:
value = environ.get(key)
if not value:
missing_keys.append(key)
else:
config[key] = value
if missing_keys:
error_msg = f"Missing required environment variables: {', '.join(missing_keys)}"
raise ValueError(error_msg)
return config
def init_chronicle_client(
credentials_json: str, scopes: List[str]
) -> Tuple[AuthorizedSession, List[Dict[str, str]]]:
"""Obtains an authorized session using service account credentials."""
logs = []
try:
credentials = service_account.Credentials.from_service_account_info(
json.loads(credentials_json), scopes=scopes
)
session = AuthorizedSession(credentials)
return session, logs
except Exception as e:
msg = f"Failed to initialize Chronicle API Client. Check CHRONICLE_KEY format. Error: {e}"
logging.critical(msg)
logs.append({"level": "CRITICAL", "message": msg})
# In this specific case, we raise an exception, so the caller won't receive the session.
# The message is added for completeness in case the exception is caught later.
raise RuntimeError("API Client Initialization Failed") from e
def get_all_tables(
session: AuthorizedSession,
chronicle_url: str,
) -> Tuple[Dict[str, Dict[str, Any]], List[Dict[str, str]]]:
"""
Fetches all Data Tables and returns them as a dictionary keyed by displayName.
"""
url = f"{chronicle_url}/dataTables"
tables_dict: Dict[str, Dict[str, Any]] = {}
logs: List[Dict[str, str]] = []
page_token = None
while True:
params = {"pageSize": 100}
if page_token:
params["pageToken"] = page_token
try:
res = session.get(url, params=params)
res.raise_for_status()
data = res.json()
for table in data.get("dataTables", []):
if "displayName" in table:
tables_dict[table["displayName"]] = table
page_token = data.get("nextPageToken")
if not page_token:
break
except Exception as e:
msg = f"Unknown error fetching tables: {e}"
logs.append({"level": "CRITICAL", "message": msg})
raise
logging.info(f"Successfully fetched {len(tables_dict)} tables.")
return tables_dict, logs
def get_table_data(
session: AuthorizedSession,
run_config: Dict[str, Any],
table_name: str,
) -> Tuple[Dict[str, List[Any]], List[Dict[str, str]]]:
"""Fetches all rows for a given table and returns them as a dictionary."""
url = f"{run_config['chronicle_url']}/dataTables/{table_name}/dataTableRows"
logs: List[Dict[str, str]] = []
rows_dict: Dict[str, List[Any]] = {}
    try:
        # Paginate through all rows (mirroring get_all_tables) so that large
        # tables are fetched completely instead of only the first page.
        page_token = None
        while True:
            params: Dict[str, Any] = {"pageSize": 100}
            if page_token:
                params["pageToken"] = page_token
            res = session.get(url, params=params)
            res.raise_for_status()
            data = res.json()
            for row in data.get("dataTableRows", []):
                row_id = row.get("name", "").split("/")[-1]
                if row_id:
                    rows_dict[row_id] = row.get("values", [])
            page_token = data.get("nextPageToken")
            if not page_token:
                break
        return rows_dict, logs
    except HTTPError as e:
        msg = f"Failed to fetch data for table '{table_name}'. Status: {e.response.status_code}"
        logs.append({"level": "ERROR", "message": msg})
        return {}, logs
def process_table(
session: AuthorizedSession,
table_name: str,
rows: List[Dict[str, Any]],
column_keys: List[str],
run_config: Dict[str, Any],
) -> Tuple[int, List[Dict[str, str]]]:
"""Handles the expiration logic for a single table."""
logs: List[Dict[str, str]] = []
expired_rows_to_archive = []
removed_rows_count = 0
expiration_column = run_config["expiration_column"]
datefmt = run_config["date_format"]
if expiration_column not in column_keys:
logging.info(f"Skipping {table_name}: Missing '{expiration_column}' column.")
return 0, logs
else:
logging.info(f"Processing {table_name}.")
for row in rows:
expiration_str = row.get(expiration_column)
if not expiration_str:
continue
try:
expiration_date = datetime.strptime(str(expiration_str), datefmt).date()
if run_config["today"] > expiration_date:
logging.info(f"Table {table_name}: Expired row found, archiving: {row}")
expired_rows_to_archive.append(row)
delete_table_row(session, run_config, table_name, row["row_id"])
removed_rows_count += 1
except ValueError:
msg = f"Table '{table_name}': Invalid expiration format. Expected '{datefmt}', got '{expiration_str}'. Row data: {row}. Skipping row."
logging.error(msg)
logs.append({"level": "ERROR", "message": msg})
delete_table_row(session, run_config, table_name, row["row_id"])
removed_rows_count += 1
if expired_rows_to_archive:
archive_logs = log_to_archive(
session,
table_name,
expired_rows_to_archive,
column_keys,
run_config,
)
logs.extend(archive_logs)
logging.info(
f"Processed {table_name}: {removed_rows_count} rows expired/removed."
)
else:
logging.info(f"Processed {table_name}: No updates required.")
return removed_rows_count, logs
def delete_table_row(
session: AuthorizedSession,
run_config: Dict[str, Any],
table_name: str,
row_id: str,
) -> Tuple[Dict[str, str], List[Dict[str, str]]]:
"""Deletes a single row from the informed Data Table."""
url = (
f"{run_config['chronicle_url']}/dataTables/{table_name}/dataTableRows/{row_id}"
)
logs: List[Dict[str, str]] = []
try:
res = session.delete(url)
res.raise_for_status()
return {"status": "success"}, logs
    except HTTPError:
        # The caller discards the returned logs, so record the failure here too.
        msg = f"Failed to delete row: {row_id} from {table_name}"
        logging.error(msg)
        logs.append({"level": "ERROR", "message": msg})
        return {"status": "error"}, logs
def log_to_archive(
session: AuthorizedSession,
source_table: str,
expired_rows: List[Dict[str, Any]],
column_keys: List[str],
run_config: Dict[str, Any],
) -> List[Dict[str, str]]:
"""Appends expired rows to the designated archive table one by one."""
logs: List[Dict[str, str]] = []
if not expired_rows:
return logs
url = f"{run_config['chronicle_url']}/dataTables/{run_config['archive_table_name']}/dataTableRows"
archive_rows = []
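    # Each archive row holds three values matching the archive table schema:
    # the source table name, the original values (minus the expiration column)
    # joined with '|', and the date the row was removed.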
for expired_row in expired_rows:
payload_parts = [
str(expired_row.get(key, ""))
for key in column_keys # Iterate over original columns to exclude 'row_id'
if key != run_config["expiration_column"]
]
payload = "|".join(payload_parts)
archive_row = [
source_table,
payload,
run_config["today"].strftime(run_config["date_format"]),
]
archive_rows.append(archive_row)
try:
for row in archive_rows:
res = session.post(url, json={"values": row})
res.raise_for_status()
logging.info(f"Archived {row} to {run_config['archive_table_name']}.")
except HTTPError as e:
msg = f"Table '{source_table}': Failed to archive {archive_rows}. Status: {e.response.status_code}. Error: {e.response.text}."
logs.append({"level": "CRITICAL", "message": msg})
raise
return logs
def send_run_summary(slack_webhook_url: str, logs: List[Dict[str, str]]):
"""
Formats and sends a summary to Slack, only if there are messages to report.
"""
if not logs:
logging.info("Run completed with no errors. No Slack notification needed.")
return
if not slack_webhook_url:
logging.warning("Slack webhook not configured. Summary notification lost.")
return
overall_status = "SUCCESS"
color = "#36A64F" # Green
if any(log["level"] == "CRITICAL" for log in logs):
overall_status = "FAILURE"
color = "#FF0000" # Red
elif any(log["level"] == "ERROR" for log in logs):
overall_status = "SUCCESS WITH ERRORS"
color = "#FFA500" # Orange
level_sort_order = {"CRITICAL": 0, "ERROR": 1}
logs.sort(key=lambda log: level_sort_order.get(log.get("level", ""), 2))
details = [f"*{log['level']}*: {log['message']}" for log in logs]
# Slack has a limit on attachment text size, truncate if necessary
details_text = "\n".join(details)
if len(details_text) > 3000:
truncation_marker = "\n... (message truncated)"
allowed_length = 3000 - len(truncation_marker)
details_text = details_text[:allowed_length] + truncation_marker
payload = {
"text": f"*Chronicle Table Cleanup Run: {overall_status}*",
"attachments": [
{
"color": color,
"fields": [
{"title": "Run Status", "value": overall_status, "short": True},
{
"title": "Timestamp",
"value": datetime.now().isoformat(),
"short": True,
},
{"title": "Summary", "value": details_text, "short": False},
],
}
],
}
try:
requests.post(slack_webhook_url, json=payload, timeout=10)
except requests.exceptions.RequestException as e:
logging.error(f"Failed to send final Slack summary notification. Error: {e}")