Skip to content

Instantly share code, notes, and snippets.

@justinabrahms
Created October 16, 2025 21:10
Show Gist options
  • Save justinabrahms/aac07edb236fcee2a0e08089c7dd99c4 to your computer and use it in GitHub Desktop.
Save justinabrahms/aac07edb236fcee2a0e08089c7dd99c4 to your computer and use it in GitHub Desktop.
New Relic Alert Policy Team Checker - Detects PER_POLICY alert policies with conditions from multiple teams. This helps identify alert routing issues where team tags are used for incident management (e.g., Rootly) - a single incident would alert all teams simultaneously instead of just the owning team.
#!/usr/bin/env python3
"""
Check New Relic alert policies for multiple teams under PER_POLICY configuration.
This detects policies where conditions monitor entities with different team tags,
which can cause issues with alert routing when using PER_POLICY incident preference.
Usage:
./check-newrelic-policy-teams.py # Check all PER_POLICY policies
./check-newrelic-policy-teams.py 705052 # Check specific policy by ID
"""
import json
import sys
import argparse
import base64
import os
from collections import defaultdict
from pathlib import Path
import requests
# Load environment variables from .env.newrelic
env_file = Path(__file__).parent / '.env.newrelic'
if env_file.exists():
with open(env_file) as f:
for line in f:
line = line.strip()
if line and not line.startswith('#') and '=' in line:
key, value = line.split('=', 1)
os.environ[key] = value
API_KEY = os.environ.get('NEWRELIC_API_KEY')
ACCOUNT_ID = int(os.environ.get('NEWRELIC_ACCOUNT_ID', '0'))
if not API_KEY or not ACCOUNT_ID:
print("Error: NEWRELIC_API_KEY and NEWRELIC_ACCOUNT_ID must be set in .env.newrelic")
sys.exit(1)
GRAPHQL_URL = "https://api.newrelic.com/graphql"
REST_API_URL = "https://api.newrelic.com/v2"
def graphql_query(query):
"""Execute a GraphQL query against New Relic API."""
headers = {
"Api-Key": API_KEY,
"Content-Type": "application/json"
}
response = requests.post(
GRAPHQL_URL,
headers=headers,
json={"query": query}
)
response.raise_for_status()
return response.json()
def rest_api_get(endpoint):
"""Execute a REST API GET request."""
headers = {"Api-Key": API_KEY}
response = requests.get(f"{REST_API_URL}/{endpoint}", headers=headers)
response.raise_for_status()
return response.json()
def get_per_policy_policies():
"""Get all policies with PER_POLICY incident preference."""
query = """
{
actor {
account(id: %d) {
alerts {
policiesSearch {
policies {
id
name
incidentPreference
}
}
}
}
}
}
""" % ACCOUNT_ID
result = graphql_query(query)
policies = result['data']['actor']['account']['alerts']['policiesSearch']['policies']
return [p for p in policies if p['incidentPreference'] == 'PER_POLICY']
def construct_condition_entity_guid(condition_id):
"""Construct the entity GUID for an alert condition."""
# Format: {account_id}|AIOPS|CONDITION|{condition_id}
guid_str = f"{ACCOUNT_ID}|AIOPS|CONDITION|{condition_id}"
# New Relic strips the base64 padding
return base64.b64encode(guid_str.encode()).decode().rstrip('=')
def get_policy_conditions_rest(policy_id):
"""Get all conditions for a policy using REST API with pagination."""
conditions = []
# Get standard APM/Infrastructure conditions (paginated)
# These don't have entity_guid in REST API, so we construct it
page = 1
while True:
try:
result = rest_api_get(f"alerts_conditions.json?policy_id={policy_id}&page={page}")
page_conditions = result.get('conditions', [])
if not page_conditions:
break
conditions.extend([
{
'id': c['id'],
'name': c['name'],
'type': 'standard',
'entity_guid': construct_condition_entity_guid(c['id'])
}
for c in page_conditions
])
page += 1
except:
break
# Get NRQL conditions - these have entity_guid already (paginated)
page = 1
while True:
try:
result = rest_api_get(f"alerts_nrql_conditions.json?policy_id={policy_id}&page={page}")
nrql_conditions = result.get('nrql_conditions', [])
if not nrql_conditions:
break
conditions.extend([
{'id': c['id'], 'name': c['name'], 'type': 'nrql', 'entity_guid': c.get('entity_guid')}
for c in nrql_conditions
])
page += 1
except:
break
return conditions
def get_entities_team_tags_by_guid(entity_guids):
"""Get team tags for multiple entities by their GUIDs."""
if not entity_guids:
return {}
entity_teams = {}
# Query entities in batches to avoid URL length limits
batch_size = 20
for i in range(0, len(entity_guids), batch_size):
batch = entity_guids[i:i+batch_size]
for guid in batch:
query = """
{
actor {
entity(guid: "%s") {
guid
name
tags {
key
values
}
}
}
}
""" % guid
try:
result = graphql_query(query)
entity = result.get('data', {}).get('actor', {}).get('entity')
if entity:
tags = entity.get('tags', [])
for tag in tags:
if tag['key'] == 'team' and tag['values']:
entity_teams[guid] = tag['values'][0]
break
else:
# Entity not found - may not exist or may not have permission
pass
except Exception as e:
# Only print errors for debugging if needed
# print(f" Error fetching tags for entity {guid}: {e}")
continue
return entity_teams
def get_policy_by_id(policy_id):
"""Get a specific policy by ID."""
query = """
{
actor {
account(id: %d) {
alerts {
policiesSearch(searchCriteria: {ids: ["%s"]}) {
policies {
id
name
incidentPreference
}
}
}
}
}
}
""" % (ACCOUNT_ID, policy_id)
result = graphql_query(query)
policies = result['data']['actor']['account']['alerts']['policiesSearch']['policies']
return policies[0] if policies else None
def main():
parser = argparse.ArgumentParser(
description='Check New Relic alert policies for multiple teams under PER_POLICY configuration.'
)
parser.add_argument('policy_id', nargs='?', help='Specific policy ID to check (optional)')
args = parser.parse_args()
if args.policy_id:
print(f"Fetching policy {args.policy_id}...")
policy = get_policy_by_id(args.policy_id)
if not policy:
print(f"Error: Policy {args.policy_id} not found")
sys.exit(1)
print(f"Found policy: {policy['name']} (Incident Preference: {policy['incidentPreference']})\n")
per_policy_policies = [policy]
else:
print("Fetching PER_POLICY alert policies...")
per_policy_policies = get_per_policy_policies()
print(f"Found {len(per_policy_policies)} policies with PER_POLICY setting\n")
issues_found = []
for policy in per_policy_policies:
policy_id = policy['id']
policy_name = policy['name']
print(f"Checking policy: {policy_name} (ID: {policy_id})")
try:
conditions = get_policy_conditions_rest(policy_id)
if not conditions:
print(f" No conditions found\n")
continue
print(f" Found {len(conditions)} total conditions")
# Collect all entity GUIDs from conditions
entity_guids = []
condition_info = {} # Map guid to condition name for reporting
for condition in conditions:
guid = condition.get('entity_guid')
if guid:
entity_guids.append(guid)
condition_info[guid] = condition['name']
if not entity_guids:
print(f" No condition entities found\n")
continue
# Get team tags for all condition entities
print(f" Querying tags for {len(entity_guids)} condition entities...")
entity_teams = get_entities_team_tags_by_guid(entity_guids)
print(f" Found {len(entity_teams)} conditions with team tags")
if not entity_teams:
print(f" No team tags found on entities\n")
continue
# Check if multiple teams exist
unique_teams = set(entity_teams.values())
if len(unique_teams) > 1:
print(f" ⚠️ ISSUE FOUND: Multiple teams in same PER_POLICY policy!")
print(f" Teams found: {', '.join(sorted(unique_teams))}")
# Group conditions by team
teams_to_conditions = defaultdict(list)
for guid, team in entity_teams.items():
condition_name = condition_info.get(guid, guid)
teams_to_conditions[team].append(condition_name)
for team, condition_names in teams_to_conditions.items():
print(f" Team '{team}': {len(condition_names)} condition(s)")
for cond_name in condition_names[:3]: # Show first 3
print(f" - {cond_name}")
if len(condition_names) > 3:
print(f" ... and {len(condition_names) - 3} more")
issues_found.append({
'policy_id': policy_id,
'policy_name': policy_name,
'teams': sorted(unique_teams),
'team_condition_counts': {t: len(c) for t, c in teams_to_conditions.items()},
'team_conditions': {t: c for t, c in teams_to_conditions.items()}
})
else:
team_name = list(unique_teams)[0]
print(f" ✓ OK: Single team ({team_name}) with {len(entity_teams)} condition(s)")
print()
except Exception as e:
print(f" Error querying conditions: {e}\n")
import traceback
traceback.print_exc()
continue
# Summary
print("\n" + "="*80)
print("SUMMARY")
print("="*80)
if issues_found:
print(f"\n❌ Found {len(issues_found)} policies with multiple teams:\n")
for issue in issues_found:
print(f"Policy: {issue['policy_name']} (ID: {issue['policy_id']})")
print(f" Teams: {', '.join(issue['teams'])}")
print()
# Write detailed report to file
with open('newrelic-multi-team-policies.json', 'w') as f:
json.dump(issues_found, f, indent=2)
print(f"Detailed report written to: newrelic-multi-team-policies.json")
sys.exit(1)
else:
print("\n✅ No issues found! All PER_POLICY policies have single teams.")
sys.exit(0)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment