Skip to content

Instantly share code, notes, and snippets.

@26tajeen
Last active November 24, 2024 21:48
Show Gist options
  • Save 26tajeen/5b0f2d0431d6197b8d3911a580e600cb to your computer and use it in GitHub Desktop.
Save 26tajeen/5b0f2d0431d6197b8d3911a580e600cb to your computer and use it in GitHub Desktop.
Python script to sort, validate and categorise your Home Assistant automations.yaml file
import yaml
import re
from collections import defaultdict
from datetime import datetime
class IndentedDumper(yaml.Dumper):
def increase_indent(self, flow=False, indentless=False):
return super(IndentedDumper, self).increase_indent(flow, False)
class CustomDumper(yaml.Dumper):
def represent_scalar(self, tag, value, style=None):
"""Preserve scalar styles including folded (>) and literal (|)."""
if isinstance(value, str):
# If it starts with > or |, preserve that style
if value.lstrip().startswith('>'):
style = '>'
elif value.lstrip().startswith('|'):
style = '|'
# Otherwise, detect if this is a template string
elif '{{' in value or '{%' in value:
if '\n' in value:
style = '>' # Use folded style for multi-line templates
# For other strings containing special characters, use quotes
elif any(char in str(value) for char in [' ', ':', '-', '/', '\\', '[', ']', '{', '}', '#', ',', '|']):
style = '"' if "'" in str(value) else "'"
return super().represent_scalar(tag, value, style=style)
def write_line_break(self, data=None):
super().write_line_break(data)
if len(self.indents) == 1:
super().write_line_break()
class QuotedDumper(yaml.Dumper):
def represent_str(self, data):
# Only quote strings containing spaces, special characters, or starting with certain characters
if any(char in data for char in [' ', ':', '-', '{', '}', '[', ']', ',']) or data.startswith(('~', '&', '*', '!', '|', '>', '@', '%')):
if "'" in data: # If single quotes are present, use double quotes
return self.represent_scalar('tag:yaml.org,2002:str', data, style='"')
else: # Default to single quotes
return self.represent_scalar('tag:yaml.org,2002:str', data, style="'")
# Otherwise, no quotes
return super().represent_str(data)
yaml.add_representer(str, QuotedDumper.represent_str, Dumper=QuotedDumper)
class AutomationValidator:
def __init__(self):
self.issues = []
self.automation_names = set()
self.referenced_automations = set()
self.referenced_entities = set()
self.defined_entities = set()
# Define valid automation structure
self.valid_automation_fields = {
'id': str,
'alias': str,
'description': str,
'trigger': (list, dict),
'condition': (list, dict),
'action': (list, dict),
'mode': str,
'max_exceeded': str,
'variables': dict,
'initial_state': (str, bool) # Added initial_state which can be 'on'/'off' or True/False
}
def extract_automation_refs_from_template(self, template_str):
"""Extract only explicit automation references from a template string."""
refs = set()
# Updated pattern to catch the full automation name
# Matches until a non-word character or end of string
auto_matches = re.finditer(r'states\.automation\.([a-zA-Z0-9_]+[a-zA-Z0-9_]*)', template_str)
for match in auto_matches:
refs.add(f"automation.{match.group(1)}")
return refs
def normalize_entity_id(self, name):
"""Convert any string to valid entity_id format."""
if not name:
return name
# Remove 'automation.' prefix if present
if name.startswith('automation.'):
name = name[11:]
# First convert spaces to underscores, then handle other characters
normalized = re.sub(r'\s+', '_', name.lower())
normalized = re.sub(r'[^a-z0-9_]', '_', normalized)
normalized = re.sub(r'_+', '_', normalized.strip('_'))
return normalized
def build_automation_names(self, yaml_content):
"""Pre-process all automation names at startup."""
try:
automations = yaml.safe_load(yaml_content)
if not isinstance(automations, list):
return
for automation in automations:
if not isinstance(automation, dict) or 'alias' not in automation:
continue
alias = automation['alias']
if not alias:
continue
# print(f"\nProcessing automation with alias: {alias}")
# Add the automation name variations
automation_name = self.normalize_entity_id(alias)
self.automation_names.add(f"automation.{automation_name}")
# print(f"Added normalized name: automation.{automation_name}")
# Also add the exact alias
self.automation_names.add(f"automation.{alias}")
# print(f"Added exact alias: automation.{alias}")
# And add the ID if present
if 'id' in automation:
automation_id = self.normalize_entity_id(automation['id'])
self.automation_names.add(f"automation.{automation_id}")
# print(f"Added ID variation: automation.{automation_id}")
# print(f"\nFinal automation names list: {sorted(self.automation_names)}")
except Exception as e:
print(f"Error building automation names: {str(e)}")
def check_automation_reference(self, ref_name):
"""Enhanced check for automation references."""
if not ref_name:
return True
# Normalize the reference name
ref_normalized = self.normalize_entity_id(ref_name)
# Check against all known variations
exists = False
similar_names = []
for known_auto in self.automation_names:
known_normalized = self.normalize_entity_id(known_auto)
if known_normalized == ref_normalized:
exists = True
break
elif self.is_similar_name(ref_normalized, known_normalized):
similar_names.append(known_auto)
return exists, similar_names
def matches_automation_name(self, ref_name, automation_aliases):
"""Check if a reference matches any automation alias using various formats."""
# Remove 'automation.' prefix if present
if ref_name.startswith('automation.'):
ref_name = ref_name[11:]
# Normalize the reference name
normalized_ref = self.normalize_entity_id(ref_name)
for alias in automation_aliases:
# Remove 'automation.' prefix from stored alias
alias_name = alias[11:] if alias.startswith('automation.') else alias
# Compare various formats
if any([
normalized_ref == self.normalize_entity_id(alias_name),
ref_name == alias_name,
ref_name.replace('_', ' ') == alias_name.replace('_', ' ')
]):
return True
return False
def validate_template(self, template_str, alias, context=""):
"""Validate template content using strict format checking."""
if not template_str:
return
# print(f"\nDEBUG - Validating template in automation '{alias}':")
# print(f"DEBUG - Template content: {template_str}")
# Get current automation's normalized name
current_automation = f"automation.{self.normalize_entity_id(alias)}"
# print(f"DEBUG - Current automation: {current_automation}")
# print(f"DEBUG - Known automation names: {sorted(self.automation_names)}")
# First extract all entities
entities = self.extract_entities_from_template(template_str)
if entities:
# print(f"DEBUG - Found entities: {entities}")
for entity in entities:
if entity.startswith('automation.'):
# print(f"DEBUG - Checking automation reference: {entity}")
# print(f"DEBUG - Normalized form: automation.{self.normalize_entity_id(entity[11:])}")
self.referenced_automations.add(entity)
# Skip self-references
if self.normalize_entity_id(entity) == self.normalize_entity_id(current_automation):
continue
# Check if this automation exists
normalized_entity = f"automation.{self.normalize_entity_id(entity[11:])}"
if normalized_entity not in self.automation_names:
self.issues.append(
f"Critical - Automation '{alias}': References non-existent automation '{entity}'"
)
# print(f"DEBUG - Added validation error for {entity}")
# Update referenced entities
self.referenced_entities.update(entities)
def is_similar_name(self, name1, name2):
"""Compare names with fuzzy matching."""
# Remove automation. prefix if present
name1 = name1[11:] if name1.startswith('automation.') else name1
name2 = name2[11:] if name2.startswith('automation.') else name2
# Remove quotes and spaces for comparison
name1 = re.sub(r'[\'"\s]', '', name1)
name2 = re.sub(r'[\'"\s]', '', name2)
# Exact match after cleaning
if name1 == name2:
return True
# Levenshtein-like distance
if len(name1) == len(name2):
differences = sum(c1 != c2 for c1, c2 in zip(name1, name2))
if differences == 1: # Allow one character difference
return True
# Check for substring relationship
if name1 in name2 or name2 in name1:
return True
return False
def extract_entities_from_template(self, template_str):
print(f"DEBUG - Processing template: {template_str}")
entities = set()
patterns = [
# Updated pattern to better handle nested templates and variable positions
(r'states\.automation\.([a-zA-Z0-9_\-]+?)(?:\.attributes|\.|\s|}}|,|\)|$)', "Automation state reference"),
(r'automation\.([a-zA-Z0-9_\-]+?)(?:\.attributes|\.|\s|}}|,|\)|$)', "Direct automation reference"),
(r'states\.automation\[[\'"]([^\'"]+)', "Bracketed automation reference"),
(r'states\([\'"]automation\.([^\'"]+)', "states function call")
]
for pattern, desc in patterns:
matches = list(re.finditer(pattern, template_str))
if matches:
print(f"DEBUG - Found {desc} matches in: {template_str}")
for match in matches:
entity = match.group(1)
if not entity.startswith('automation.'):
entity = f"automation.{entity}"
print(f"DEBUG - Found: {entity} (from '{match.group(0)}')")
print(f"DEBUG - Known automations: {sorted(self.automation_names)}")
# Validate the automation reference immediately
normalized_entity = f"automation.{self.normalize_entity_id(entity[11:])}"
print(f"DEBUG - Checking normalized form: {normalized_entity}")
if normalized_entity not in self.automation_names:
print(f"DEBUG - INVALID REFERENCE: {entity} (normalized: {normalized_entity})")
self.issues.append(
f"Critical - Found reference to non-existent automation '{entity}'"
)
else:
print(f"DEBUG - Valid reference found")
entities.add(entity)
print(f"DEBUG - Final extracted entities: {entities}")
return entities
def validate_trigger(self, trigger, alias):
valid_platforms = {
'state': {
'required': ['platform', 'entity_id'],
'optional': ['to', 'from', 'for', 'condition', 'conditions']
},
'mqtt': {
'required': ['platform', 'topic'],
'optional': ['payload', 'encoding']
},
'event': {
'required': ['platform', 'event_type'],
'optional': ['event_data']
},
'numeric_state': {
'required': ['platform', 'entity_id'],
'optional': ['above', 'below', 'for', 'value_template']
},
'template': {
'required': ['platform', 'value_template'],
'optional': ['for']
},
'homeassistant': {
'required': ['platform', 'event']
},
'sun': {
'required': ['platform', 'event'],
'optional': ['offset']
},
'time_pattern': {
'required': ['platform'],
'optional': ['hours', 'minutes', 'seconds']
},
'zone': {
'required': ['platform', 'entity_id', 'zone']
},
'device': {
'required': ['platform', 'device_id', 'type']
},
'time': {
'required': ['platform', 'at'],
'optional': ['weekday']
}
}
if not isinstance(trigger, dict):
self.issues.append(f"Critical - Automation '{alias}': Invalid trigger format")
return False
platform = trigger.get('platform')
if not platform:
self.issues.append(f"Critical - Automation '{alias}': Missing platform in trigger")
return False
if platform not in valid_platforms:
self.issues.append(f"Critical - Automation '{alias}': Invalid platform '{platform}'")
return False
schema = valid_platforms[platform]
for field in schema['required']:
if field not in trigger:
self.issues.append(f"Critical - Automation '{alias}': Missing required field '{field}' for {platform} trigger")
allowed_fields = {'platform'} | set(schema['required']) | set(schema.get('optional', []))
unknown_fields = set(trigger.keys()) - allowed_fields
if unknown_fields:
self.issues.append(f"Warning - Automation '{alias}': Unknown fields in {platform} trigger: {', '.join(unknown_fields)}")
return True
def check_for_typos(self, field_name, valid_fields):
"""Check if a field name is a likely typo of a valid field."""
closest_match = None
highest_similarity = 0
for valid_field in valid_fields:
similarity = sum(c1 == c2 for c1, c2 in zip(field_name, valid_field)) / max(len(field_name), len(valid_field))
if similarity > 0.7: # 70% similarity threshold
highest_similarity = similarity
closest_match = valid_field
return closest_match
def validate_condition(self, condition, alias):
if isinstance(condition, dict):
# Check for direct references to automations in state conditions
if condition.get('condition') == 'state' and 'entity_id' in condition:
entity_id = condition['entity_id']
if entity_id.startswith('automation.'):
if not self.check_automation_reference(entity_id[11:]):
self.issues.append(
f"Critical - Automation '{alias}': References non-existent automation '{entity_id}'"
)
self.referenced_automations.add(entity_id)
valid_condition_fields = {
'value_template', 'condition', 'conditions', 'entity_id', 'state'
}
if isinstance(condition, dict):
for field in condition.keys():
if field not in valid_condition_fields:
suggested = self.check_for_typos(field, valid_condition_fields)
if suggested:
self.issues.append(
f"Critical - Automation '{alias}': Found '{field}', did you mean '{suggested}'?"
)
valid_conditions = {
'numeric_state': {
'required': ['condition', 'entity_id'],
'optional': ['above', 'below', 'for'],
'for_fields': ['hours', 'minutes', 'seconds']
},
'state': {
'required': ['condition', 'entity_id', 'state'],
'optional': ['for'],
'for_fields': ['hours', 'minutes', 'seconds']
},
'template': {
'required': ['condition', 'value_template']
},
'and': {
'required': ['condition'],
'optional': ['conditions']
},
'or': {
'required': ['condition'],
'optional': ['conditions']
},
'time': {
'required': ['condition'],
'optional': ['after', 'before', 'weekday', 'for'],
'for_fields': ['hours', 'minutes', 'seconds']
},
'zone': {
'required': ['condition', 'entity_id', 'zone']
},
'not': {
'required': ['condition']
}
}
# Handle string conditions as templates
if isinstance(condition, str):
return self.validate_condition({
'condition': 'template',
'value_template': condition
}, alias)
if not isinstance(condition, dict):
self.issues.append(f"Critical - Automation '{alias}': Invalid condition format")
return False
condition_type = condition.get('condition')
if not condition_type:
self.issues.append(f"Critical - Automation '{alias}': Missing condition type")
return False
if condition_type not in valid_conditions:
self.issues.append(f"Critical - Automation '{alias}': Invalid condition type '{condition_type}'")
return False
schema = valid_conditions[condition_type]
# Check required fields
for field in schema['required']:
if field not in condition:
self.issues.append(f"Critical - Automation '{alias}': Missing required field '{field}' for {condition_type} condition")
# Check for unknown fields
allowed_fields = {'condition'} | set(schema['required']) | set(schema.get('optional', []))
unknown_fields = set(condition.keys()) - allowed_fields
if unknown_fields:
for field in unknown_fields:
# Check for common typos
closest_match = None
highest_similarity = 0
for valid_field in allowed_fields:
# Simple similarity check based on character overlap
similarity = sum(c1 == c2 for c1, c2 in zip(field, valid_field)) / max(len(field), len(valid_field))
if similarity > highest_similarity and similarity > 0.7: # 70% similarity threshold
highest_similarity = similarity
closest_match = valid_field
if closest_match:
self.issues.append(
f"Critical - Automation '{alias}': Invalid field '{field}' in {condition_type} condition, did you mean '{closest_match}'?"
)
else:
self.issues.append(
f"Critical - Automation '{alias}': Unknown field '{field}' in {condition_type} condition"
)
# Validate 'for' field structure if present
if 'for' in condition:
for_value = condition['for']
if isinstance(for_value, dict):
valid_for_fields = schema.get('for_fields', ['hours', 'minutes', 'seconds'])
for field in for_value:
if field not in valid_for_fields:
closest_match = None
highest_similarity = 0
for valid_field in valid_for_fields:
similarity = sum(c1 == c2 for c1, c2 in zip(field, valid_field)) / max(len(field), len(valid_field))
if similarity > highest_similarity and similarity > 0.7:
highest_similarity = similarity
closest_match = valid_field
if closest_match:
self.issues.append(
f"Critical - Automation '{alias}': Invalid 'for' field '{field}', did you mean '{closest_match}'?"
)
else:
self.issues.append(
f"Critical - Automation '{alias}': Invalid 'for' field '{field}'"
)
elif not isinstance(for_value, (int, float, str)):
self.issues.append(f"Critical - Automation '{alias}': Invalid 'for' value format")
# Additional type-specific validations
if condition_type == 'state':
if 'state' in condition and not isinstance(condition['state'], (str, bool, int, float)):
self.issues.append(f"Critical - Automation '{alias}': Invalid state value type")
elif condition_type == 'numeric_state':
if 'above' in condition and 'below' in condition:
try:
above = float(condition['above'])
below = float(condition['below'])
if above >= below:
self.issues.append(
f"Critical - Automation '{alias}': Invalid numeric range (above >= below)"
)
except (ValueError, TypeError):
self.issues.append(f"Critical - Automation '{alias}': Invalid numeric values")
return True
def validate_action(self, action, alias):
valid_data_fields = {
'message', 'title', 'target', 'data', 'entity_id'
}
if isinstance(action, dict) and isinstance(action.get('data'), dict):
for field in action['data'].keys():
if field not in valid_data_fields:
suggested = self.check_for_typos(field, valid_data_fields)
if suggested:
self.issues.append(
f"Critical - Automation '{alias}': In action data found '{field}', did you mean '{suggested}'?"
)
if isinstance(action, dict):
# Check all possible template fields
template_fields = [
('data_template', None),
('service_template', None),
('data', 'message'),
('data', 'title'),
('data', 'payload')
]
for field, subfield in template_fields:
if field in action:
if subfield:
if isinstance(action[field], dict) and subfield in action[field]:
self.validate_template(str(action[field][subfield]), alias, f"Action {field}.{subfield} ")
else:
if isinstance(action[field], str):
self.validate_template(action[field], alias, f"Action {field} ")
elif isinstance(action[field], dict):
for key, value in action[field].items():
if isinstance(value, str):
self.validate_template(value, alias, f"Action {field}.{key} ")
valid_actions = {
'service': {
'required': ['service'],
'optional': [
'entity_id', 'data', 'data_template', 'target',
'response_variable'
]
},
'service_template': {
'required': ['service_template'],
'optional': ['data', 'data_template', 'target', 'entity_id']
},
'variables': {
'required': ['variables']
},
'delay': {'required': ['delay']},
'wait_template': {'required': ['wait_template'], 'optional': ['timeout']},
'wait_for_trigger': {'required': ['platform']},
'repeat': {'required': ['sequence'], 'optional': ['until', 'while']},
'choose': {'required': ['choose'], 'optional': ['default']},
'condition': {
'required': ['condition'],
'optional': [
'entity_id', 'state', 'value_template',
'after', 'before', 'below', 'above'
]
},
'event': {'required': ['event', 'event_type']},
'scene': {'required': ['scene']},
'device_id': {'required': ['device_id', 'domain', 'entity_id']},
}
# Handle string templates as service_template actions
if isinstance(action, str):
return self.validate_action({
'service_template': action
}, alias)
if not isinstance(action, dict):
self.issues.append(f"Critical - Automation '{alias}': Invalid action format")
return False
# Determine action type
action_type = None
for key in valid_actions.keys():
if key in action:
action_type = key
break
if action_type == 'repeat':
# Validate sequence within repeat
sequence = action.get('sequence', [])
if isinstance(sequence, dict):
sequence = [sequence]
for seq_action in sequence:
self.validate_action(seq_action, alias)
return True
if not action_type:
self.issues.append(f"Critical - Automation '{alias}': Unknown action type")
return False
# Validate action against schema
schema = valid_actions[action_type]
for field in schema['required']:
if field not in action:
self.issues.append(f"Critical - Automation '{alias}': Missing required field '{field}' for {action_type} action")
allowed_fields = set(schema['required']) | set(schema.get('optional', []))
unknown_fields = set(action.keys()) - allowed_fields
if unknown_fields:
self.issues.append(f"Warning - Automation '{alias}': Unknown fields in {action_type} action: {', '.join(unknown_fields)}")
return True
def _validate_service_action(self, action, alias):
service = action.get('service', '')
if not service or '.' not in service:
self.issues.append(f"Warning - Automation '{alias}': Invalid service format")
return
# Validate data/data_template structure
if 'data' in action and not isinstance(action['data'], dict):
self.issues.append(f"Critical - Automation '{alias}': Invalid data format")
if 'data_template' in action and not isinstance(action['data_template'], dict):
self.issues.append(f"Critical - Automation '{alias}': Invalid data_template format")
def validate_time_pattern(self, time_str):
"""Validate time pattern format."""
if not time_str:
return False
try:
# Handle "HH:MM" format
if len(time_str.split(':')) == 2:
time_str += ':00'
# Handle negative offsets
if time_str.startswith('-'):
time_str = time_str[1:]
datetime.strptime(time_str, '%H:%M:%S')
return True
except ValueError:
try:
# Handle special formats
if any(word in time_str.lower() for word in ['sunset', 'sunrise', 'noon', 'midnight']):
return True
return False
except:
return False
def validate_numeric_condition(self, condition):
"""Validate numeric state conditions for logical errors."""
if 'above' in condition and 'below' in condition:
above = float(condition['above'])
below = float(condition['below'])
if above >= below:
return False
return True
def validate_trigger_structure(self, trigger, alias):
"""Validate the structure of a trigger entry."""
if isinstance(trigger, dict) and 'entity_id' in trigger and 'platform' not in trigger:
self.issues.append(f"Critical - Automation '{alias}': Malformed trigger - 'platform' should be the root key")
return False
return True
def find_matching_toggle_trigger(self, trigger, triggers):
"""Find a matching toggle trigger for a state trigger."""
if trigger.get('platform') != 'state' or 'entity_id' not in trigger:
return False
entity_id = trigger.get('entity_id')
# Skip toggle pattern check for certain device types
if any(device_type in entity_id for device_type in [
'button',
'binary_sensor',
'device_tracker',
'input_button',
'scene',
'script',
'group',
'person',
'proximity',
'zone',
'sensor',
'counter',
'input_',
'alert',
'automation',
'camera',
'lock', # Locks often have one-way triggers
'alarm_control_panel', # Alarm panels have multiple valid states
'cover' # Covers often have one-way automations
]):
return True
# Check if this is a state that doesn't need a toggle
to_state = trigger.get('to', '').lower()
from_state = trigger.get('from', '').lower()
if any(state in (to_state, from_state) for state in [
'home', 'not_home', 'away', 'present', 'absent',
'locked', 'unlocked', # Lock states
'armed_away', 'armed_home', 'armed_night', 'disarmed', # Alarm states
'armed_vacation', 'armed_custom_bypass', # More alarm states
'open', 'closed', 'opening', 'closing' # Cover states
]):
return True
# For other devices, check for matching toggle
for other_trigger in triggers:
if (other_trigger != trigger and
other_trigger.get('platform') == 'state' and
other_trigger.get('entity_id') == entity_id and
other_trigger.get('to') == from_state and
other_trigger.get('from') == to_state):
return True
return False
def validate_automation(self, automation, index):
"""Main validation method using schema-based validation."""
if not isinstance(automation, dict):
self.issues.append(f"Automation #{index}: Not a valid automation dictionary")
return
# Store start position of issues list
start_len = len(self.issues)
# Get alias early for better error messages
alias = automation.get('alias')
if not alias:
self.issues.append(f"Critical - Automation #{index}: Missing alias")
alias = f"Unnamed automation #{index}"
# Validate structure against known valid fields
for field, value in automation.items():
if field not in self.valid_automation_fields:
self.issues.append(
f"Critical - Automation '{alias}': Unknown field '{field}'"
)
else:
expected_type = self.valid_automation_fields[field]
if not isinstance(value, expected_type):
if isinstance(expected_type, tuple):
if not isinstance(value, expected_type):
self.issues.append(
f"Critical - Automation '{alias}': Field '{field}' should be one of {expected_type}"
)
else:
self.issues.append(
f"Critical - Automation '{alias}': Field '{field}' should be {expected_type.__name__}"
)
# Check for required alias
if 'alias' not in automation:
self.issues.append(f"Critical - Automation #{index}: Missing required field 'alias'")
return
alias = automation.get('alias', '')
normalized_alias = self.normalize_entity_id(alias)
if alias:
# Store both formats in automation_names instead of automation_aliases
self.automation_names.add(f"automation.{normalized_alias}")
self.automation_names.add(f"automation.{alias.lower().replace(' ', '_')}")
# Also store original format
self.automation_names.add(f"automation.{alias}")
# Initialize validation flags
has_valid_trigger = False
has_valid_action = False
# Validate triggers
triggers = automation.get('trigger', [])
if not triggers:
self.issues.append(f"Critical - Automation '{alias}': No trigger defined")
return
if isinstance(triggers, dict):
triggers = [triggers]
# Process triggers
processed_toggles = set()
for trigger in triggers:
if self.validate_trigger(trigger, alias):
has_valid_trigger = True
# Process state-based toggle patterns
if (isinstance(trigger, dict) and
trigger.get('platform') == 'state' and
trigger.get('entity_id')):
entity_id = trigger['entity_id']
self.referenced_entities.add(entity_id)
# Check for toggle pattern
if (any(device_type in entity_id for device_type in [
'cover', 'garage_door', 'media_player', 'vacuum'
]) and
entity_id not in processed_toggles and
'to' in trigger and 'from' in trigger and
not self.find_matching_toggle_trigger(trigger, triggers)):
self.issues.append(
f"Info - Automation '{alias}': State trigger for {entity_id} might be missing its toggle counterpart"
)
processed_toggles.add(entity_id)
# Validate conditions
if 'condition' in automation:
conditions = automation['condition']
if isinstance(conditions, (str, dict)):
conditions = [conditions]
for condition in conditions:
self.validate_condition(condition, alias)
# Validate actions
actions = automation.get('action', [])
if not actions:
self.issues.append(f"Critical - Automation '{alias}': No action defined")
return
if isinstance(actions, dict):
actions = [actions]
for action in actions:
if self.validate_action(action, alias):
has_valid_action = True
# Final validity checks
if not has_valid_trigger:
self.issues.append(f"Critical - Automation '{alias}': No valid triggers found")
if not has_valid_action:
self.issues.append(f"Critical - Automation '{alias}': No valid actions found")
# Process entity_id references in triggers for toggle pattern validation
processed_toggles = set()
for trigger in triggers:
if not isinstance(trigger, dict):
continue
platform = trigger.get('platform')
if platform == 'state' and trigger.get('entity_id'):
entity_id = trigger['entity_id']
self.referenced_entities.add(entity_id)
if (any(device_type in entity_id for device_type in [
'cover', 'garage_door', 'media_player', 'vacuum'
]) and
entity_id not in processed_toggles and
'to' in trigger and 'from' in trigger and
not self.find_matching_toggle_trigger(trigger, triggers)):
self.issues.append(
f"Info - Automation '{alias}': State trigger for {entity_id} might be missing its toggle counterpart"
)
processed_toggles.add(entity_id)
elif platform == 'mqtt':
if not trigger.get('topic'):
self.issues.append(f"Critical - Automation '{alias}': MQTT trigger missing topic")
else:
has_valid_trigger = True
if 'payload' in trigger and not trigger.get('payload'):
self.issues.append(f"Warning - Automation '{alias}': MQTT trigger has empty payload")
elif platform == 'event':
if not trigger.get('event_type'):
self.issues.append(f"Critical - Automation '{alias}': Event trigger missing event_type")
else:
has_valid_trigger = True
elif platform == 'homeassistant':
event = trigger.get('event')
if not event:
self.issues.append(f"Critical - Automation '{alias}': Home Assistant trigger missing event")
elif event not in ['start', 'shutdown', 'restart']:
self.issues.append(f"Warning - Automation '{alias}': Unusual Home Assistant event type: {event}")
has_valid_trigger = True
elif platform == 'sun':
event = trigger.get('event')
if not event:
self.issues.append(f"Critical - Automation '{alias}': Sun trigger missing event")
elif event not in ['sunset', 'sunrise']:
self.issues.append(f"Critical - Automation '{alias}': Invalid sun event type: {event}")
else:
has_valid_trigger = True
if 'offset' in trigger:
offset = trigger['offset']
if not isinstance(offset, str) or not self.validate_time_pattern(offset.strip('+-')):
self.issues.append(f"Warning - Automation '{alias}': Invalid sun offset format: {offset}")
elif platform == 'time_pattern':
# time_pattern requires at least one of hours, minutes, or seconds
if not any(key in trigger for key in ['hours', 'minutes', 'seconds']):
self.issues.append(f"Critical - Automation '{alias}': Time pattern trigger missing time parameter")
else:
has_valid_trigger = True
# Validate pattern format (allowing for /X format)
for key in ['hours', 'minutes', 'seconds']:
if key in trigger:
value = str(trigger[key])
if not (value.isdigit() or
(value.startswith('/') and value[1:].isdigit()) or
value == '*'):
self.issues.append(
f"Warning - Automation '{alias}': Invalid {key} format in time pattern: {value}")
elif platform == 'time':
if 'at' in trigger:
time_str = trigger['at']
if isinstance(time_str, list):
# Handle list of times
for t in time_str:
if not self.validate_time_pattern(str(t)):
self.issues.append(f"Critical - Automation '{alias}': Invalid time format '{t}'")
has_valid_trigger = True
else:
if not self.validate_time_pattern(str(time_str)):
self.issues.append(f"Critical - Automation '{alias}': Invalid time format '{time_str}'")
else:
has_valid_trigger = True
else:
self.issues.append(f"Critical - Automation '{alias}': Time trigger missing 'at' parameter")
elif platform == 'numeric_state':
if not trigger.get('entity_id'):
self.issues.append(f"Critical - Automation '{alias}': Numeric state trigger missing entity_id")
else:
self.referenced_entities.add(trigger['entity_id'])
if 'above' in trigger and 'below' in trigger:
try:
above = float(trigger['above'])
below = float(trigger['below'])
if above >= below:
self.issues.append(
f"Critical - Automation '{alias}': Numeric trigger has impossible condition "
f"(above {above} >= below {below})"
)
except ValueError:
self.issues.append(f"Critical - Automation '{alias}': Invalid numeric values in trigger")
has_valid_trigger = True
elif platform == 'time':
if 'at' in trigger:
time_str = trigger['at']
if not self.validate_time_pattern(time_str):
self.issues.append(f"Critical - Automation '{alias}': Invalid time format '{time_str}'")
else:
has_valid_trigger = True
elif platform == 'template':
if not trigger.get('value_template'):
self.issues.append(f"Critical - Automation '{alias}': Template trigger missing value_template")
else:
template = trigger['value_template']
if '{{' not in template or '}}' not in template:
self.issues.append(f"Critical - Automation '{alias}': Template trigger missing Jinja2 delimiters")
else:
has_valid_trigger = True
self.referenced_entities.update(self.extract_entities_from_template(template))
if not has_valid_trigger:
self.issues.append(f"Critical - Automation '{alias}': No valid triggers found")
# Validate conditions
conditions = automation.get('condition', [])
if isinstance(conditions, dict):
conditions = [conditions]
for condition in conditions:
if not isinstance(condition, dict):
continue
condition_type = condition.get('condition')
if not condition_type:
self.issues.append(f"Warning - Automation '{alias}': Condition missing type")
continue
if condition_type == 'numeric_state':
if not self.validate_numeric_condition(condition):
self.issues.append(
f"Critical - Automation '{alias}': Numeric state condition has impossible range"
)
elif condition_type == 'template':
template = condition.get('value_template', '')
if template:
template_entities = self.extract_entities_from_template(template)
self.referenced_entities.update(template_entities)
# Check for self-references without defaults
current_auto = f"automation.{normalized_alias}"
if current_auto in template_entities:
if 'default' not in template:
self.issues.append(
f"Critical - Automation '{alias}': Self-reference in template "
f"without default value protection"
)
elif condition_type == 'time':
if 'after' in condition and 'before' in condition:
after_time = condition['after']
before_time = condition['before']
if not self.validate_time_pattern(after_time) or not self.validate_time_pattern(before_time):
self.issues.append(f"Critical - Automation '{alias}': Invalid time format in condition")
# Validate actions
actions = automation.get('action', [])
if isinstance(actions, dict):
actions = [actions]
has_valid_action = False
for action in actions:
if self.validate_action(action, alias):
has_valid_action = True
if not has_valid_action:
self.issues.append(f"Critical - Automation '{alias}': No valid actions found")
if len(self.issues) > start_len:
# Get just the new issues added by this automation
new_issues = self.issues[start_len:]
# Deduplicate them
deduped_issues = list(dict.fromkeys(new_issues))
# Replace the new issues with deduplicated version
self.issues[start_len:] = deduped_issues
class AutomationSorter:
def __init__(self):
# Initialize validator first
self.validator = AutomationValidator()
# Timer purpose mapping
self.timer_outcomes = {
'timer.house_empty': 'security',
'timer.heating_boost': 'heating',
'timer.media_timeout': 'media',
'timer.light_timeout': 'lighting',
'timer.presence_check': 'presence',
'timer.power_save': 'power',
}
# Domain to category mappings
self.domain_categories = {
'alarm_control_panel': ('security', 100),
'lock': ('security', 90),
'climate': ('heating', 90),
'thermostat': ('heating', 90),
'light': ('lighting', 80),
'switch': ('lighting', 70),
'media_player': ('media', 80),
'device_tracker': ('presence', 80),
'person': ('presence', 80),
'ups': ('power', 90),
'system': ('system', 100),
'notify': ('notification', 30),
'alert': ('notification', 30),
'tts': ('notification', 30),
'chime_tts': ('notification', 30),
'esphome': ('notification', 30),
'dfplayer': ('notification', 30)
}
# Pattern weights for category detection
self.category_patterns = {
'security': {
'pattern': r'alarm|alert|motion|door\.|window\.|lock\.|security|armed|disarmed',
'weight': 80
},
'heating': {
'pattern': r'climate\.|thermostat|heat_|heating|cooling|hvac|boost|temperature_(control|set|adjust)',
'weight': 80
},
'lighting': {
'pattern': r'light\.|switch\.|brightness|illuminance|dark|bright',
'weight': 70
},
'power': {
'pattern': r'power|ups|shutdown|battery|energy',
'weight': 90
},
'media': {
'pattern': r'media_player\.|speaker|volume|audio|stereo|sound|tv|movie',
'weight': 70
},
'presence': {
'pattern': r'presence|occupancy|person\.|device_tracker|home|away',
'weight': 80
},
'notification': {
'pattern': r'notify\.|message|alert|tts\.|announcement',
'weight': 30
},
'system': {
'pattern': r'hassio\.|system|cache|restart|update',
'weight': 100
}
}
def find_timer_start_outcome(self, automation, all_automations):
"""Find what category an automation that starts a timer should be."""
# print(f"Analyzing timer start in automation: {automation.get('alias', 'unnamed')}")
actions = automation.get('action', [])
if isinstance(actions, dict):
actions = [actions]
for action in actions:
if isinstance(action, dict):
service = action.get('service', '')
if service and service.startswith('timer.start'):
timer_entity = None
if 'entity_id' in action:
timer_entity = action['entity_id']
elif 'target' in action and isinstance(action['target'], dict):
target_entities = action['target'].get('entity_id')
if isinstance(target_entities, str):
timer_entity = target_entities
elif isinstance(target_entities, list) and target_entities:
timer_entity = target_entities[0]
if timer_entity:
print(f"Found timer start for {timer_entity}")
# Find what happens when this timer completes
timer_category = self.analyze_timer_outcome(timer_entity, all_automations)
if timer_category:
print(f"Timer {timer_entity} traced to category: {timer_category}")
return timer_category
return None
def find_timer_completion_category(self, automation):
"""Find what category an automation that handles timer completion should be."""
triggers = automation.get('trigger', [])
if isinstance(triggers, dict):
triggers = [triggers]
for trigger in triggers:
if (isinstance(trigger, dict) and
trigger.get('platform') == 'event' and
trigger.get('event_type') == 'timer.finished'):
# Analyze what this automation does
actions = automation.get('action', [])
if isinstance(actions, dict):
actions = [actions]
scores = defaultdict(int)
for action in actions:
if isinstance(action, dict):
service = action.get('service', '')
if service:
domain = service.split('.')[0]
if domain in self.domain_categories:
category, priority = self.domain_categories[domain]
scores[category] += priority
entities = []
if 'entity_id' in action:
entities.append(action['entity_id'])
if 'target' in action and isinstance(action['target'], dict):
target_entities = action['target'].get('entity_id', [])
if isinstance(target_entities, str):
entities.append(target_entities)
elif isinstance(target_entities, list):
entities.extend(target_entities)
for entity in entities:
if isinstance(entity, str):
domain = entity.split('.')[0]
if domain in self.domain_categories:
category, priority = self.domain_categories[domain]
scores[category] += priority
if scores:
return max(scores.items(), key=lambda x: x[1])[0]
return None
def analyze_timer_outcome(self, timer_entity, automations):
"""Analyze what a timer does by finding its completion handler."""
# If we got a single automation instead of a list, wrap it
print(f"Looking for completion handler for timer: {timer_entity}")
if isinstance(automations, dict):
automations = [automations]
# Search through all automations for the timer's completion handler
for automation in automations:
if not isinstance(automation, dict):
continue
triggers = automation.get('trigger', [])
if isinstance(triggers, dict):
triggers = [triggers]
for trigger in triggers:
if (isinstance(trigger, dict) and
trigger.get('platform') == 'event' and
trigger.get('event_type') == 'timer.finished' and
trigger.get('event_data', {}).get('entity_id') == timer_entity):
# Found the timer completion trigger, analyze its actions
actions = automation.get('action', [])
if isinstance(actions, dict):
actions = [actions]
scores = defaultdict(int)
for action in actions:
if isinstance(action, dict):
service = action.get('service', '')
if service:
domain = service.split('.')[0]
if domain in self.domain_categories:
category, priority = self.domain_categories[domain]
scores[category] += priority
entities = []
if 'entity_id' in action:
entities.append(action['entity_id'])
if 'target' in action and isinstance(action['target'], dict):
target_entities = action['target'].get('entity_id', [])
if isinstance(target_entities, str):
entities.append(target_entities)
elif isinstance(target_entities, list):
entities.extend(target_entities)
for entity in entities:
if isinstance(entity, str):
domain = entity.split('.')[0]
if domain in self.domain_categories:
category, priority = self.domain_categories[domain]
scores[category] += priority
if scores:
category = max(scores.items(), key=lambda x: x[1])[0]
print(f"Timer {timer_entity} completion maps to category: {category}")
return category
return None
def analyze_timer_purpose(self, automation):
"""Analyze what a timer is used for in the automation."""
if not isinstance(automation, dict):
return None
actions = automation.get('action', [])
if isinstance(actions, dict):
actions = [actions]
# Look for timer starts
for action in actions:
if isinstance(action, dict):
service = action.get('service', '')
if service.startswith('timer.start'):
timer_entity = action.get('target', {}).get('entity_id')
if timer_entity:
# Check predefined timer purposes
if timer_entity in self.timer_outcomes:
return self.timer_outcomes[timer_entity]
# If not predefined, look ahead in the automation for what the timer triggers
return self.analyze_timer_outcome(timer_entity, automation)
return None
def detect_category(self, automation, all_automations):
# First priority: Check if this automation starts a timer
timer_outcome = self.find_timer_start_outcome(automation, all_automations)
if timer_outcome:
return timer_outcome
# Second priority: Check if this automation handles a timer completion
timer_completion = self.find_timer_completion_category(automation)
if timer_completion:
return timer_completion
if not isinstance(automation, dict):
return 'other'
scores = defaultdict(int)
# Get all actions
actions = automation.get('action', [])
if isinstance(actions, dict):
actions = [actions]
# First check for timer relationships
for action in actions:
if isinstance(action, dict):
service = action.get('service', '')
if service and service.startswith('timer.start'):
timer_entity = None
if 'entity_id' in action:
timer_entity = action['entity_id']
elif 'target' in action and isinstance(action['target'], dict):
target_entities = action['target'].get('entity_id')
if isinstance(target_entities, str):
timer_entity = target_entities
elif isinstance(target_entities, list) and target_entities:
timer_entity = target_entities[0]
if timer_entity:
# Be sure all_automations is a list
if isinstance(all_automations, dict):
all_automations = [all_automations]
timer_category = self.analyze_timer_outcome(timer_entity, all_automations)
if timer_category:
return timer_category
# Count notification-related actions
notification_actions = 0
total_actions = 0
for action in actions:
if isinstance(action, dict):
total_actions += 1
service = action.get('service', '')
if service:
domain = service.split('.')[0]
if domain in ['esphome', 'dfplayer', 'notify', 'tts']:
notification_actions += 1
if domain in self.domain_categories:
category, priority = self.domain_categories[domain]
scores[category] += priority
# If most actions are notifications, prioritize that category
if notification_actions > 0 and notification_actions / total_actions >= 0.5:
return 'notification'
# Add pattern-based scores
automation_str = str(automation).lower()
for category, info in self.category_patterns.items():
matches = len(re.findall(info['pattern'], automation_str))
scores[category] += matches * info['weight']
if not scores:
return 'other'
return max(scores.items(), key=lambda x: x[1])[0]
def sort_automations(self, automations):
"""Sort automations with timer resolution."""
try:
if not isinstance(automations, list):
raise ValueError("Input must be a list of automations")
# Group by category
categorized = defaultdict(list)
for automation in automations:
category = self.detect_category(automation, automations)
categorized[category].append(automation)
# Build output with category headers
output_lines = []
for category in sorted(categorized.keys()):
output_lines.append(f"\n# {'='*20} {category.upper()} {'='*20}\n")
category_automations = sorted(
categorized[category],
key=lambda x: x.get('alias', '').lower() if isinstance(x, dict) else ''
)
for automation in category_automations:
yaml_str = yaml.dump(
[automation],
Dumper=CustomDumper,
sort_keys=False,
allow_unicode=True,
default_flow_style=False,
width=10000,
explicit_start=False,
explicit_end=False,
indent=2
)
indented = '\n'.join(' ' + line for line in yaml_str.splitlines())
output_lines.append(indented)
output_lines.append('# ' + '~'*50 + '\n')
return '\n'.join(output_lines)
except yaml.YAMLError as e:
raise ValueError(f"Invalid YAML format: {str(e)}")
def format_output(self, yaml_content):
try:
# First load the YAML content once
initial_automations = yaml.safe_load(yaml_content)
if not isinstance(initial_automations, list):
raise ValueError("Input YAML must contain a list of automations")
# Build automation names first
print("\nBuilding automation name index...")
self.validator.build_automation_names(yaml_content) # Pass the original yaml_content
# Then run validation
print("\nValidating automations...")
for i, automation in enumerate(initial_automations, 1):
print(f" Checking automation {i}/{len(initial_automations)}: {automation.get('alias', 'unnamed')}")
self.validator.validate_automation(automation, i)
# Sort and format automations
print("\nSorting automations into categories...")
sorted_content = self.sort_automations(initial_automations) # Pass the parsed list instead of yaml_content
# Count categories
category_counts = defaultdict(int)
for automation in initial_automations:
category = self.detect_category(automation, initial_automations)
category_counts[category] += 1
print(f" Categorized '{automation.get('alias', 'unnamed')}' as {category}")
print("\nCategory summary:")
for category, count in sorted(category_counts.items()):
print(f" {category}: {count} automations")
# Build output
output_parts = [
f"# Total automations: {len(initial_automations)}",
"",
sorted_content,
""
]
# Add category summary
output_parts.extend([
"# Category Summary:",
*[f"# {category}: {count} automations"
for category, count in sorted(category_counts.items())]
])
# Add validation results
if self.validator.issues:
critical = [i for i in self.validator.issues if "Critical" in i]
warnings = [i for i in self.validator.issues if "Warning" in i]
if critical:
output_parts.extend([
"",
f"# Critical Issues ({len(critical)}):",
*[f"# {issue}" for issue in critical]
])
if warnings:
output_parts.extend([
"",
f"# Warnings ({len(warnings)}):",
*[f"# {issue}" for issue in warnings]
])
# Add undefined automations
referenced = {e for e in self.validator.referenced_entities
if e.startswith('automation.')}
undefined = referenced - self.validator.automation_names
if undefined:
output_parts.extend([
"",
"# Referenced but undefined automations:",
*[f"# - {auto}" for auto in sorted(undefined)]
])
return '\n'.join(output_parts)
except Exception as e:
raise ValueError(f"Formatting error: {str(e)}")
class AutomationBeautifier:
def __init__(self):
self.standard_key_order = [
'id',
'alias',
'description',
'trigger',
'condition',
'mode',
'action'
]
def beautify_triggers(self, triggers):
"""Standardize trigger format."""
if not triggers:
return triggers
if isinstance(triggers, dict):
triggers = [triggers]
beautified = []
for trigger in triggers:
if not isinstance(trigger, dict):
continue
# Ensure standard key order within triggers
ordered_trigger = {}
standard_trigger_keys = ['platform', 'entity_id', 'to', 'from', 'for', 'at', 'event', 'offset']
# Add keys in standard order if they exist
for key in standard_trigger_keys:
if key in trigger:
ordered_trigger[key] = trigger[key]
# Add any remaining keys
for key, value in trigger.items():
if key not in ordered_trigger:
ordered_trigger[key] = value
beautified.append(ordered_trigger)
return beautified
def beautify_conditions(self, conditions):
"""Standardize condition format."""
if not conditions:
return conditions
if isinstance(conditions, dict):
conditions = [conditions]
beautified = []
for condition in conditions:
if not isinstance(condition, dict):
continue
ordered_condition = {}
standard_condition_keys = ['condition', 'entity_id', 'state', 'value_template']
for key in standard_condition_keys:
if key in condition:
ordered_condition[key] = condition[key]
for key, value in condition.items():
if key not in ordered_condition:
ordered_condition[key] = value
beautified.append(ordered_condition)
return beautified
def beautify_actions(self, actions):
if not actions:
return actions
beautified = []
for action in actions:
if not isinstance(action, dict):
continue
ordered_action = {}
for key, value in action.items():
# Ensure keys like 'topic' and 'payload' are quoted
if key in {'topic', 'payload'} and isinstance(value, str):
value = f'"{value}"' if '"' not in value else f"'{value}'"
ordered_action[key] = value
beautified.append(ordered_action)
return beautified
def beautify_automation(self, automation):
"""Beautify a single automation."""
if not isinstance(automation, dict):
return automation
beautified = {}
# Add keys in standard order
for key in self.standard_key_order:
if key in automation:
value = automation[key]
if key == 'trigger':
value = self.beautify_triggers(value)
elif key == 'condition':
value = self.beautify_conditions(value)
elif key == 'action':
value = self.beautify_actions(value)
beautified[key] = value
# Add any remaining keys not in standard order
for key, value in automation.items():
if key not in beautified:
beautified[key] = value
return beautified
def main():
try:
input_file = 'automations.yaml'
with open(input_file, 'r', encoding='utf-8') as file:
content = file.read()
sorter = AutomationSorter()
formatted_output = sorter.format_output(content)
# Print validation summary to console
for issue in sorter.validator.issues:
print(issue)
output_file = 'automations_sorted.yaml'
with open(output_file, 'w', encoding='utf-8') as file:
file.write(formatted_output)
except FileNotFoundError:
print(f"Error: Could not find {input_file}")
raise
except Exception as e:
print(f"Error: {str(e)}")
import traceback
print(traceback.format_exc())
raise
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment