Skip to content

Instantly share code, notes, and snippets.

@justinabrahms
Last active October 7, 2025 04:05
Show Gist options
  • Save justinabrahms/3cc71036cb5d151b34736280ca9192da to your computer and use it in GitHub Desktop.
Save justinabrahms/3cc71036cb5d151b34736280ca9192da to your computer and use it in GitHub Desktop.

New Relic Alert Condition Team Tagger

A utility for bulk-tagging New Relic alert conditions by team.

Features

  • Queries New Relic for all alert conditions without a "team" tag
  • Presents conditions one by one for interactive tagging
  • Smart team name matching:
    • Type full team name for exact match
    • Type first two letters to autocomplete from existing teams
    • Create new team names on the fly
  • Automatically tags resources in New Relic
  • Shows progress and allows skipping conditions

Setup

Set environment variables:

export NEW_RELIC_API_KEY="your-api-key-here"
export NEW_RELIC_ACCOUNT_ID="your-account-id-here"

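To get your API key, create a User key on the API keys page of your New Relic account (https://one.newrelic.com/api-keys).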

Usage

Run the script with uv:

uv run alert-tagger

Or directly:

./alert_tagger.py

For each untagged alert condition, you'll see:

  • Alert name
  • Alert ID
  • Policy ID
  • Enabled status

Then you can:

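  • Type a team name (full or first 2 letters for autocomplete)
  • Type o to open the condition's policy in your browser
  • Type s to skip this condition (skipped conditions are remembered in /tmp/alert-tagger-skipped.json and hidden on later runs)
  • Type q to quit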

Examples

Enter team name (or 'o' to open, 's' to skip, 'q' to quit): platform
Using team: platform
Tagging condition 12345 with team 'platform'...
✓ Successfully tagged

Enter team name (or 'o' to open, 's' to skip, 'q' to quit): fr
Using team: frontend
Tagging condition 12346 with team 'frontend'...
✓ Successfully tagged

Enter team name (or 'o' to open, 's' to skip, 'q' to quit): s
[Skipped]

How It Works

  1. Queries the New Relic NerdGraph API (entity search) for all alert condition entities in the account
  2. Filters out conditions that already have a "team" tag
  3. Builds an index of existing teams for autocomplete
  4. Presents each untagged condition interactively
  5. Updates the condition's entity tags via the NerdGraph tagging mutation

Requirements

  • Python 3.8+
  • uv (recommended) or pip
  • New Relic User API key
  • Account ID for the New Relic account
#!/usr/bin/env python3
"""
New Relic Alert Condition Team Tagger
This utility helps bulk-tag New Relic alert conditions with team information.
It queries for alert conditions without team tags and provides an interactive
interface to assign teams.
"""
import os
import sys
import json
import requests
from typing import List, Dict, Optional, Set
from collections import defaultdict
from pathlib import Path
class NewRelicClient:
    """Client for the New Relic NerdGraph (GraphQL) API.

    Wraps the queries and the tagging mutation used by the tagger: listing
    alert-condition entities together with their tags, resolving policy
    names, and adding a ``team`` tag to an entity. The condition list and
    policy names are cached on the instance.
    """

    def __init__(self, api_key: str, account_id: str, timeout: float = 30.0):
        """Store credentials and prepare the request headers.

        Args:
            api_key: API key sent in the ``API-Key`` header.
            account_id: Account whose alert conditions are queried.
            timeout: Seconds to wait on each HTTP request, so a stalled
                connection cannot hang the interactive session forever.
        """
        self.api_key = api_key
        self.account_id = account_id
        self.timeout = timeout
        self.endpoint = "https://api.newrelic.com/graphql"
        self.headers = {
            "Content-Type": "application/json",
            "API-Key": api_key,
        }
        self._conditions_cache = None
        self._policy_cache = {}

    @staticmethod
    def _dig(data, *keys):
        """Walk nested dicts by key, returning None if any level is missing.

        GraphQL responses use explicit ``null`` for fields that failed to
        resolve, so ``dict.get(key, {})`` chains are not safe: the default is
        only used when the key is absent, not when its value is ``None``.
        """
        for key in keys:
            if not isinstance(data, dict):
                return None
            data = data.get(key)
        return data

    @staticmethod
    def _first_tag_value(tags: Dict[str, List], *keys: str):
        """Return the first non-empty value found under any of ``keys``."""
        for key in keys:
            values = tags.get(key)
            if values and values[0]:
                return values[0]
        return None

    def query(self, query: str, variables: Optional[Dict] = None) -> Dict:
        """Execute a GraphQL query against NerdGraph.

        Args:
            query: GraphQL document (query or mutation).
            variables: Optional GraphQL variables.

        Returns:
            The decoded JSON response body.

        Raises:
            requests.HTTPError: If the HTTP status indicates failure
                (via ``raise_for_status``).
        """
        payload = {"query": query}
        if variables:
            payload["variables"] = variables
        response = requests.post(
            self.endpoint,
            json=payload,
            headers=self.headers,
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()

    def get_alert_conditions(self) -> List[Dict]:
        """Return all alert conditions for the account, fetching them once."""
        if self._conditions_cache is None:
            self._conditions_cache = self._get_alert_conditions_direct()
        return self._conditions_cache

    def _get_alert_conditions_direct(self) -> List[Dict]:
        """Fetch alert conditions (with their tags) via entity search."""
        all_conditions = []
        print("Fetching all alert conditions via entity search...")
        entity_conditions = self._fetch_conditions_via_entity_search()
        all_conditions.extend(entity_conditions)
        print(f" Found {len(entity_conditions)} conditions via entity search")
        print(f"\nFetched {len(all_conditions)} total alert conditions")
        # Entity search already returned the tags, so no second lookup is needed.
        return all_conditions

    def _fetch_conditions_via_entity_search(self) -> List[Dict]:
        """Page through entity search for every CONDITION entity.

        Returns:
            One dict per condition with keys ``guid``, ``name``, ``type``,
            ``id``, ``policyId``, ``enabled`` and ``tags``. ``id`` and
            ``policyId`` are ``None`` when no matching tag is present.
            If a page returns GraphQL errors, pagination stops and the
            conditions collected so far are returned.
        """
        query = """
        query($query: String!, $cursor: String) {
          actor {
            entitySearch(query: $query) {
              results(cursor: $cursor) {
                entities {
                  guid
                  name
                  type
                  tags {
                    key
                    values
                  }
                }
                nextCursor
              }
              count
            }
          }
        }
        """
        search_query = f"type = 'CONDITION' AND tags.accountId = '{self.account_id}'"
        conditions = []
        cursor = None
        page = 1
        while True:
            variables = {"query": search_query}
            if cursor:
                variables["cursor"] = cursor
            result = self.query(query, variables)
            if result.get("errors"):
                print(f" GraphQL errors: {result['errors']}")
                break
            search_result = self._dig(result, "data", "actor", "entitySearch") or {}
            results = search_result.get("results") or {}
            batch = results.get("entities") or []
            next_cursor = results.get("nextCursor")
            total_count = search_result.get("count")
            if page == 1 and total_count:
                print(f" Total conditions available: {total_count}")
            print(f" Page {page}: fetched {len(batch)} conditions (total so far: {len(conditions) + len(batch)})")
            for entity in batch:
                entity_tags = entity.get("tags") or []
                tags_dict = {tag["key"]: tag["values"] for tag in entity_tags}
                # A tag can be present with an empty value list; fall back to
                # the default instead of indexing into it.
                enabled_values = tags_dict.get("enabled") or ["true"]
                conditions.append({
                    "guid": entity["guid"],
                    "name": entity["name"],
                    "type": entity["type"],
                    "id": self._first_tag_value(tags_dict, "id", "nr.alerts.conditionId"),
                    "policyId": self._first_tag_value(tags_dict, "policyId", "nr.alerts.policyId"),
                    "enabled": enabled_values[0] == "true",
                    "tags": entity_tags,
                })
            if not next_cursor:
                print(f" Pagination complete")
                break
            cursor = next_cursor
            page += 1
        return conditions

    def _fetch_nrql_conditions(self) -> List[Dict]:
        """Fetch all NRQL conditions with pagination.

        Not called by the entity-search flow in this class; kept as an
        alternative fetch path.
        """
        query = """
        query($accountId: Int!, $cursor: String) {
          actor {
            account(id: $accountId) {
              alerts {
                nrqlConditionsSearch(cursor: $cursor) {
                  nrqlConditions {
                    id
                    name
                    policyId
                    enabled
                    entityGuid
                  }
                  nextCursor
                  totalCount
                }
              }
            }
          }
        }
        """
        conditions = []
        cursor = None
        page = 1
        while True:
            variables = {"accountId": int(self.account_id)}
            if cursor:
                variables["cursor"] = cursor
            result = self.query(query, variables)
            if result.get("errors"):
                print(f" GraphQL errors: {result['errors']}")
                break
            search_result = self._dig(
                result, "data", "actor", "account", "alerts", "nrqlConditionsSearch"
            ) or {}
            batch = search_result.get("nrqlConditions") or []
            next_cursor = search_result.get("nextCursor")
            total_count = search_result.get("totalCount")
            if page == 1 and total_count:
                print(f" Total NRQL conditions available: {total_count}")
            print(f" Page {page}: fetched {len(batch)} conditions (total so far: {len(conditions) + len(batch)})")
            conditions.extend(batch)
            if not next_cursor:
                print(f" Pagination complete")
                break
            cursor = next_cursor
            page += 1
        return conditions

    def _fetch_infrastructure_conditions(self) -> List[Dict]:
        """Fetch infrastructure/external-service/APM conditions per policy.

        Not called by the entity-search flow in this class. Policies whose
        condition query returns GraphQL errors are skipped.
        """
        policies_query = """
        query($accountId: Int!) {
          actor {
            account(id: $accountId) {
              alerts {
                policiesSearch {
                  policies {
                    id
                    name
                  }
                }
              }
            }
          }
        }
        """
        conditions_query = """
        query($accountId: Int!, $policyId: ID!) {
          actor {
            account(id: $accountId) {
              alerts {
                policy(id: $policyId) {
                  conditions {
                    ... on AlertsInfrastructureCondition {
                      id
                      name
                      enabled
                      policyId
                    }
                    ... on AlertsExternalServiceCondition {
                      id
                      name
                      enabled
                      policyId
                    }
                    ... on AlertsAPMCondition {
                      id
                      name
                      enabled
                      policyId
                    }
                  }
                }
              }
            }
          }
        }
        """
        account_id = int(self.account_id)
        result = self.query(policies_query, {"accountId": account_id})
        if result.get("errors"):
            print(f" Error fetching policies: {result['errors']}")
            return []
        policies = self._dig(
            result, "data", "actor", "account", "alerts", "policiesSearch", "policies"
        ) or []
        if not policies:
            return []
        print(f" Checking {len(policies)} policies for infrastructure conditions...")
        all_conditions = []
        for i, policy in enumerate(policies):
            if i % 50 == 0 and i > 0:
                print(f" Processed {i}/{len(policies)} policies...")
            result = self.query(conditions_query, {
                "accountId": account_id,
                "policyId": str(policy['id']),
            })
            if result.get("errors"):
                continue
            conditions = self._dig(
                result, "data", "actor", "account", "alerts", "policy", "conditions"
            ) or []
            for condition in conditions:
                # Entries that matched none of the fragments come back
                # empty/None and are dropped.
                if condition:
                    condition['policyId'] = policy['id']
                    all_conditions.append(condition)
        return all_conditions

    def _batch_get_entity_tags(self, guids: List[str]) -> Dict[str, List[Dict]]:
        """Fetch tags for many entities, 25 GUIDs per request.

        Returns:
            Mapping of GUID to its tag list. GUIDs in a batch that returned
            GraphQL errors, or that resolved to no entity, are omitted.
        """
        guid_to_tags = {}
        batch_size = 25
        total_batches = (len(guids) - 1) // batch_size + 1
        for start in range(0, len(guids), batch_size):
            batch = guids[start:start + batch_size]
            batch_number = start // batch_size
            if batch_number % 5 == 0:
                print(f" Batch {batch_number + 1}/{total_batches}...")
            # One aliased `entity(...)` field per GUID; json.dumps produces a
            # correctly quoted and escaped string literal for the GUID.
            entity_queries = [
                f"""
                entity{idx}: entity(guid: {json.dumps(guid)}) {{
                  guid
                  tags {{
                    key
                    values
                  }}
                }}
                """
                for idx, guid in enumerate(batch)
            ]
            query = f"""
            query {{
              actor {{
                {' '.join(entity_queries)}
              }}
            }}
            """
            result = self.query(query)
            if result.get("errors"):
                print(f"Warning: Error fetching batch: {result['errors']}")
                continue
            actor_data = self._dig(result, "data", "actor") or {}
            for idx, guid in enumerate(batch):
                entity_data = actor_data.get(f"entity{idx}")
                if entity_data:
                    guid_to_tags[guid] = entity_data.get("tags") or []
        return guid_to_tags

    def _search_condition_entity(self, condition_id: str) -> Optional[Dict]:
        """Find a condition's entity (GUID and tags) by its condition ID.

        Returns:
            The first matching entity dict, or None on errors / no match.
        """
        query = """
        query($query: String!) {
          actor {
            entitySearch(query: $query) {
              results {
                entities {
                  guid
                  tags {
                    key
                    values
                  }
                }
              }
            }
          }
        }
        """
        search_query = f"id = '{condition_id}' OR tags.`nr.alerts.conditionId` = '{condition_id}'"
        result = self.query(query, {"query": search_query})
        if result.get("errors"):
            return None
        entities = self._dig(
            result, "data", "actor", "entitySearch", "results", "entities"
        ) or []
        return entities[0] if entities else None

    def _get_entity_tags(self, guid: str) -> List[Dict]:
        """Return the tag list for one entity, or [] on error / not found."""
        query = """
        query($guid: EntityGuid!) {
          actor {
            entity(guid: $guid) {
              tags {
                key
                values
              }
            }
          }
        }
        """
        result = self.query(query, {"guid": guid})
        if result.get("errors"):
            print(f"DEBUG - Error fetching tags for {guid}: {result['errors']}")
            return []
        entity = self._dig(result, "data", "actor", "entity")
        if entity is None:
            print(f"DEBUG - No entity found for GUID: {guid}")
            return []
        tags = entity.get("tags") or []
        if not tags:
            print(f"DEBUG - No tags found for {guid}")
        return tags

    def get_untagged_conditions(self) -> List[Dict]:
        """Return the alert conditions that have no ``team`` tag."""
        all_conditions = self.get_alert_conditions()
        print(f"Found {len(all_conditions)} total alert conditions")
        untagged = []
        for position, condition in enumerate(all_conditions):
            tags = {tag["key"]: tag["values"] for tag in condition.get("tags", [])}
            # Debug output for the first few conditions only; keying this on
            # the position (not on how many untagged were found) keeps it
            # from flooding the console when most conditions are tagged.
            if position < 3:
                print(f"DEBUG - Condition: {condition['name']}")
                print(f" Tags: {list(tags.keys())}")
                print(f" Has 'team' tag: {'team' in tags}")
            if "team" not in tags:
                untagged.append(condition)
        print(f"Found {len(untagged)} conditions without 'team' tag")
        return untagged

    def get_existing_teams(self) -> set:
        """Collect every value of the ``team`` tag across all conditions."""
        teams = set()
        for condition in self.get_alert_conditions():
            for tag in condition.get("tags", []):
                if tag["key"] == "team":
                    teams.update(tag["values"])
        return teams

    def tag_condition(self, guid: str, team: str) -> bool:
        """Add a ``team`` tag to the entity identified by ``guid``.

        Returns:
            True when the response reports no errors; False when either the
            GraphQL response or the mutation payload carries errors.
        """
        mutation = """
        mutation($guid: EntityGuid!, $tags: [TaggingTagInput!]!) {
          taggingAddTagsToEntity(guid: $guid, tags: $tags) {
            errors {
              message
            }
          }
        }
        """
        variables = {
            "guid": guid,
            "tags": [{"key": "team", "values": [team]}],
        }
        result = self.query(mutation, variables)
        # Request-level failures arrive as top-level `errors`, typically with
        # `data: null`; mutation-level failures arrive inside the payload.
        errors = result.get("errors") or self._dig(
            result, "data", "taggingAddTagsToEntity", "errors"
        )
        if errors:
            print(f"Error tagging condition: {errors}")
            return False
        return True

    def get_policy_name(self, policy_id: str) -> str:
        """Return the policy's name, cached per ID.

        Falls back to ``"Policy <id>"`` when the policy cannot be resolved
        (missing, null, or without a name).
        """
        if policy_id in self._policy_cache:
            return self._policy_cache[policy_id]
        query = """
        query($accountId: Int!, $policyId: ID!) {
          actor {
            account(id: $accountId) {
              alerts {
                policy(id: $policyId) {
                  id
                  name
                }
              }
            }
          }
        }
        """
        result = self.query(query, {
            "accountId": int(self.account_id),
            "policyId": policy_id,
        })
        policy = self._dig(result, "data", "actor", "account", "alerts", "policy") or {}
        policy_name = policy.get("name") or f"Policy {policy_id}"
        self._policy_cache[policy_id] = policy_name
        return policy_name
class InteractiveTagger:
    """Interactive console interface for tagging alert conditions by team.

    Shows each untagged condition, lets the user pick or create a team, and
    applies the tag through the client. Conditions the user skips are
    remembered in ``SKIPPED_FILE`` and hidden on later runs.
    """

    # JSON file of the form {"skipped": [guid, ...]}.
    SKIPPED_FILE = Path("/tmp/alert-tagger-skipped.json")

    def __init__(self, client: "NewRelicClient"):
        """Load existing team names from ``client`` and prior skips from disk."""
        self.client = client
        self.existing_teams = sorted(client.get_existing_teams())
        self.team_index = self._build_team_index()
        self.skipped_guids = self._load_skipped()

    def _load_skipped(self) -> Set[str]:
        """Load the set of skipped GUIDs; return an empty set if unavailable."""
        if not self.SKIPPED_FILE.exists():
            return set()
        try:
            with open(self.SKIPPED_FILE, 'r', encoding='utf-8') as f:
                data = json.load(f)
            return set(data.get("skipped", []))
        except (OSError, ValueError, AttributeError, TypeError) as e:
            # Unreadable, malformed, or unexpectedly shaped file: start fresh
            # rather than abort the session.
            print(f"Warning: Could not load skipped file: {e}")
            return set()

    def _save_skipped(self):
        """Write the skipped GUIDs to disk; failures only produce a warning."""
        try:
            with open(self.SKIPPED_FILE, 'w', encoding='utf-8') as f:
                json.dump({"skipped": sorted(self.skipped_guids)}, f, indent=2)
        except (OSError, TypeError) as e:
            print(f"Warning: Could not save skipped file: {e}")

    def _build_team_index(self) -> Dict[str, List[str]]:
        """Index existing teams by their lower-cased first two letters.

        Team names shorter than two characters are not indexed.
        """
        index = defaultdict(list)
        for team in self.existing_teams:
            if len(team) >= 2:
                index[team[:2].lower()].append(team)
        return dict(index)

    def _match_team(self, input_text: str) -> Optional[str]:
        """Resolve user input to an existing team name.

        An exact match wins. Otherwise, input of two or more characters is
        treated as a case-insensitive prefix: one matching team is returned
        directly, several prompt the user to choose by number.

        The whole input must be a prefix of the team name. Comparing only
        the first two letters would silently turn a new name such as
        "plumbing" into the existing "platform".

        Returns:
            The matched team name, or None when nothing matches or the
            numbered choice is invalid.
        """
        if input_text in self.existing_teams:
            return input_text
        if len(input_text) < 2:
            return None
        needle = input_text.lower()
        matches = [
            team for team in self.team_index.get(needle[:2], [])
            if team.lower().startswith(needle)
        ]
        if len(matches) == 1:
            return matches[0]
        if len(matches) > 1:
            print(f"\nMultiple teams match '{input_text}':")
            for i, team in enumerate(matches, 1):
                print(f" {i}. {team}")
            choice = input("Select team number: ").strip()
            try:
                idx = int(choice) - 1
            except ValueError:
                return None
            if 0 <= idx < len(matches):
                return matches[idx]
        return None

    def _print_condition(self, position: int, total: int, condition: Dict) -> None:
        """Print the details of one condition before prompting."""
        print(f"\n[{position}/{total}] Alert Condition:")
        print(f" Name: {condition['name']}")
        print(f" GUID: {condition['guid']}")
        if condition.get('id') is not None:
            print(f" ID: {condition['id']}")
        # The key can be present with a None value when no policy tag was
        # found; only look up the name for a real ID.
        policy_id = condition.get('policyId')
        if policy_id:
            policy_name = self.client.get_policy_name(str(policy_id))
            print(f" Policy: {policy_name} (ID: {policy_id})")
        if 'enabled' in condition:
            print(f" Enabled: {condition['enabled']}")
        existing_tags = {tag["key"]: tag["values"] for tag in condition.get("tags", [])}
        if existing_tags:
            print(f" Existing tags: {', '.join(existing_tags.keys())}")

    def _open_in_browser(self, condition: Dict) -> None:
        """Open the condition's alert policy page in the default browser.

        The account ID is taken from the first ``|``-separated field of the
        base64-decoded GUID; the code expects at least four fields.
        """
        import base64
        import webbrowser

        policy_id = condition.get('policyId')
        guid = condition['guid']
        # Restore any '=' padding that was stripped from the GUID.
        remainder = len(guid) % 4
        if remainder:
            guid += '=' * (4 - remainder)
        try:
            decoded_guid = base64.b64decode(guid).decode('utf-8')
        except ValueError as e:
            # Covers both invalid base64 and invalid UTF-8.
            print(f"Error decoding GUID: {e}")
            return
        parts = decoded_guid.split('|')
        if len(parts) >= 4 and policy_id:
            account_id = parts[0]
            url = f"https://alerts.newrelic.com/accounts/{account_id}/policies/{policy_id}"
            print(f"Opening: {url}")
            webbrowser.open(url)
        else:
            print(f"Cannot construct URL from decoded GUID: {decoded_guid}")

    def run(self):
        """Run the interactive tagging session until done or the user quits."""
        print("Fetching untagged alert conditions...\n")
        all_conditions = self.client.get_untagged_conditions()
        conditions = [c for c in all_conditions if c['guid'] not in self.skipped_guids]
        if self.skipped_guids:
            print(f"Loaded {len(self.skipped_guids)} previously skipped alerts")
            print(f"Filtered {len(all_conditions) - len(conditions)} skipped alerts\n")
        if not conditions:
            print("No untagged alert conditions found!")
            return
        print(f"Found {len(conditions)} untagged alert conditions to review\n")
        if self.existing_teams:
            print(f"Existing teams ({len(self.existing_teams)}):")
            for team in self.existing_teams:
                print(f" - {team}")
            print()
        tagged_count = 0
        skipped_count = 0
        for i, condition in enumerate(conditions, 1):
            self._print_condition(i, len(conditions), condition)
            while True:
                team_input = input("\nEnter team name (or 'o' to open, 's' to skip, 'q' to quit): ").strip()
                command = team_input.lower()
                if command == 'q':
                    print(f"\nTagged {tagged_count} conditions, skipped {skipped_count}")
                    return
                if command == 'o':
                    self._open_in_browser(condition)
                    continue
                if command == 's':
                    # Persist immediately so the skip survives an abrupt exit.
                    self.skipped_guids.add(condition['guid'])
                    self._save_skipped()
                    skipped_count += 1
                    break
                if not team_input:
                    print("Please enter a team name")
                    continue
                matched_team = self._match_team(team_input)
                if matched_team:
                    team = matched_team
                    print(f"Using team: {team}")
                else:
                    team = team_input
                    confirm = input(f"'{team}' is a new team. Confirm? (y/n): ").strip().lower()
                    if confirm != 'y':
                        continue
                    # Register the new name so it autocompletes from now on.
                    self.existing_teams.append(team)
                    self.existing_teams.sort()
                    self.team_index = self._build_team_index()
                print(f"Tagging condition {condition['guid']} with team '{team}'...")
                if self.client.tag_condition(condition['guid'], team):
                    print("✓ Successfully tagged")
                    tagged_count += 1
                else:
                    print("✗ Failed to tag condition")
                    retry = input("Retry? (y/n): ").strip().lower()
                    if retry == 'y':
                        # Back to the team prompt for the same condition.
                        continue
                break
        print(f"\n\nComplete! Tagged {tagged_count} conditions, skipped {skipped_count}")
def main():
    """Command-line entry point.

    Reads ``NEW_RELIC_API_KEY`` and ``NEW_RELIC_ACCOUNT_ID`` from the
    environment, then runs the interactive tagger. Exits with status 1 on a
    configuration or runtime error (message on stderr) and 0 when the user
    interrupts with Ctrl-C.
    """
    api_key = os.environ.get("NEW_RELIC_API_KEY")
    account_id = os.environ.get("NEW_RELIC_ACCOUNT_ID")
    if not api_key:
        print("Error: NEW_RELIC_API_KEY environment variable not set", file=sys.stderr)
        sys.exit(1)
    if not account_id:
        print("Error: NEW_RELIC_ACCOUNT_ID environment variable not set", file=sys.stderr)
        sys.exit(1)
    # The client converts the account ID with int() for some queries; check
    # it up front so a bad value gives a clear message instead of failing
    # midway through the session.
    account_id = account_id.strip()
    if not (account_id.isascii() and account_id.isdigit()):
        print("Error: NEW_RELIC_ACCOUNT_ID must be a numeric account ID", file=sys.stderr)
        sys.exit(1)
    try:
        client = NewRelicClient(api_key, account_id)
        tagger = InteractiveTagger(client)
        tagger.run()
    except KeyboardInterrupt:
        print("\n\nInterrupted by user")
        sys.exit(0)
    except Exception as e:
        # Top-level boundary: report any failure and exit non-zero.
        print(f"\nError: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()
[project]
name = "alert-tagger"
version = "0.1.0"
description = "New Relic Alert Condition Team Tagger"
requires-python = ">=3.8"
dependencies = [
"requests>=2.31.0",
]
[project.scripts]
alert-tagger = "alert_tagger:main"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
requests>=2.31.0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment