Created
September 16, 2025 10:31
-
-
Save cdot65/3d01e33c566c6a4e0ee2afe355d48378 to your computer and use it in GitHub Desktop.
Panorama Firewall Hit Count Script
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Script to query Panorama for connected firewall serial numbers | |
| and then iterate over them to collect rule hit counts from each firewall. | |
| """ | |
import json
import os
import sys
import time
import xml.etree.ElementTree as ET
from typing import List, Dict, Optional

import requests
# Configuration
# Both values can be supplied through the environment so that a real API key
# never has to be written into the script; the literals are placeholder
# fallbacks used when the corresponding variable is not set.
PANORAMA_HOST = os.environ.get("PANORAMA_HOST", "https://panorama.example.io")
API_KEY = os.environ.get("PANORAMA_API_KEY", "panorama-api-key-was-here")  # Or replace with your actual API key

# API endpoints
# Each value is a path plus query string that gets appended to PANORAMA_HOST.
# Operational command listing the devices connected to Panorama.
DEVICES_ENDPOINT = "/api?type=op&cmd=<show><devices><connected></connected></devices></show>"
# Operational command requesting hit counts for all rules of the 'security'
# rule base across all vsys.
HIT_COUNT_ENDPOINT = (
    "/api?type=op&cmd=<show><rule-hit-count><vsys><all><rule-base>"
    "<entry name='security'><rules><all/></rules></entry>"
    "</rule-base></all></vsys></rule-hit-count></show>"
)
def get_connected_devices(panorama_host: str, api_key: str) -> Optional[str]:
    """
    Ask Panorama for its list of connected devices.

    Args:
        panorama_host: Base URL of Panorama; DEVICES_ENDPOINT is appended to it
        api_key: API key, sent in the X-PAN-KEY request header

    Returns:
        The raw response body, or None if the request failed
    """
    print("Fetching connected devices from Panorama...")
    try:
        reply = requests.post(
            f"{panorama_host}{DEVICES_ENDPOINT}",
            headers={'X-PAN-KEY': api_key},
            verify=True,
            timeout=30,
        )
        # Turn HTTP 4xx/5xx answers into an exception handled below.
        reply.raise_for_status()
        return reply.text
    except requests.exceptions.RequestException as err:
        print(f"Error fetching connected devices: {err}")
        return None
def parse_serial_numbers(xml_response: str) -> List[Dict[str, str]]:
    """
    Parse XML response to extract serial numbers and device information.

    Only entries that report ``connected`` as ``yes`` and carry a non-empty
    serial number are returned. Element text is stripped of surrounding
    whitespace before it is stored or compared, so values padded with
    spaces or newlines are still recognised.

    Args:
        xml_response: XML string response from Panorama

    Returns:
        List of dictionaries containing device information, each with the
        keys 'name', 'serial', 'hostname', 'ip_address', 'model' and
        'connected'. The list is empty when the XML cannot be parsed, the
        response status is not 'success', or the expected elements are
        missing.
    """
    # (output key, child tag) pairs copied from every <entry>.
    field_tags = (
        ('serial', 'serial'),
        ('hostname', 'hostname'),
        ('ip_address', 'ip-address'),
        ('model', 'model'),
        ('connected', 'connected'),
    )
    devices: List[Dict[str, str]] = []

    try:
        root = ET.fromstring(xml_response)
    except ET.ParseError as e:
        print(f"Error parsing XML response: {e}")
        return devices

    # Check if the response was successful
    status = root.get('status')
    if status != 'success':
        print(f"API response status: {status}")
        return devices

    # Navigate to devices entries
    # Path: response -> result -> devices -> entry
    result = root.find('result')
    if result is None:
        print("No result element found in response")
        return devices
    devices_elem = result.find('devices')
    if devices_elem is None:
        print("No devices element found in result")
        return devices

    for entry in devices_elem.findall('entry'):
        device_info = {'name': entry.get('name', 'Unknown')}
        for key, tag in field_tags:
            # findtext yields None for a missing child; normalise to ''.
            device_info[key] = (entry.findtext(tag) or '').strip()

        # Only add devices that are connected and have serial numbers
        if device_info['serial'] and device_info['connected'] == 'yes':
            devices.append(device_info)
            print(f"Found device: {device_info['hostname']} (Serial: {device_info['serial']}, IP: {device_info['ip_address']})")

    return devices
def get_rule_hit_count(panorama_host: str, api_key: str, serial: str, hostname: str) -> Optional[str]:
    """
    Query firewall for rule hit counts via Panorama redirect.

    Args:
        panorama_host: The Panorama hostname/URL
        api_key: The API key for authentication
        serial: The target firewall serial number
        hostname: The hostname of the firewall (for logging)

    Returns:
        XML response string or None if error
    """
    url = f"{panorama_host}{HIT_COUNT_ENDPOINT}"
    headers = {
        'X-PAN-KEY': api_key
    }
    # The PAN-OS XML API takes the redirect destination as the "target"
    # request parameter, not as an HTTP header. requests appends these
    # parameters to the query string already present in the URL.
    params = {
        'target': serial
    }
    try:
        print(f" Fetching rule hit counts for {hostname} (Serial: {serial})...")
        response = requests.get(url, headers=headers, params=params, verify=True, timeout=60)
        response.raise_for_status()
        return response.text
    except requests.exceptions.RequestException as e:
        print(f" Error fetching rule hit counts for {hostname}: {e}")
        return None
def save_hit_count_data(serial: str, hostname: str, xml_response: str, output_dir: str = "hit_counts"):
    """
    Save the hit count data to a file.

    The file is written as UTF-8 to ``<output_dir>/<hostname>_<serial>_hit_counts.xml``.
    Any character in the hostname or serial other than a word character,
    '.' or '-' is replaced with '_' so that neither value can introduce a
    path separator. Failures to create the directory or write the file are
    reported on stdout and do not raise.

    Args:
        serial: The firewall serial number
        hostname: The hostname of the firewall
        xml_response: The XML response containing hit count data
        output_dir: Directory to save output files
    """
    import os
    import re

    def _safe(part: str) -> str:
        # Keep the value inside a single path component.
        return re.sub(r"[^\w.-]", "_", part)

    filename = os.path.join(output_dir, f"{_safe(hostname)}_{_safe(serial)}_hit_counts.xml")
    try:
        # Create output directory if it doesn't exist; done inside the try so
        # a failure here is reported like any other write error.
        os.makedirs(output_dir, exist_ok=True)
        # Explicit encoding: the platform default may be unable to represent
        # every character in the response.
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(xml_response)
        print(f" Saved hit count data to {filename}")
    except OSError as e:
        print(f" Error saving hit count data: {e}")
def main():
    """
    Run the collection workflow from start to finish.

    Retrieves the connected-device list from Panorama, requests the rule hit
    counts of each device in turn, saves every response that comes back and
    prints a summary. Exits with status 1 when the device list cannot be
    retrieved or contains no usable devices.
    """
    banner = "=" * 60
    print(banner)
    print("Panorama Firewall Hit Count Collection Script")
    print(banner)

    # Step 1: fetch the device inventory from Panorama
    inventory_xml = get_connected_devices(PANORAMA_HOST, API_KEY)
    if not inventory_xml:
        print("Failed to retrieve connected devices. Exiting.")
        sys.exit(1)

    # Step 2: reduce the inventory to connected devices with serial numbers
    firewalls = parse_serial_numbers(inventory_xml)
    if not firewalls:
        print("No connected devices found with serial numbers. Exiting.")
        sys.exit(1)

    total = len(firewalls)
    print(f"\nFound {total} connected device(s)")
    print(banner)

    # Step 3: query each device and keep a tally of the outcomes
    tally = {'ok': 0, 'failed': 0}
    for position, firewall in enumerate(firewalls, start=1):
        print(f"\n[{position}/{total}] Processing {firewall['hostname']}...")
        payload = get_rule_hit_count(
            PANORAMA_HOST,
            API_KEY,
            firewall['serial'],
            firewall['hostname'],
        )
        if not payload:
            tally['failed'] += 1
        else:
            save_hit_count_data(firewall['serial'], firewall['hostname'], payload)
            tally['ok'] += 1

        # Pause between devices (not after the last one) to go easy on the API
        if position < total:
            time.sleep(1)

    # Summary
    print("\n" + banner)
    print("Summary:")
    print(f" Total devices processed: {total}")
    print(f" Successful queries: {tally['ok']}")
    print(f" Failed queries: {tally['failed']}")
    print(banner)
# Run the workflow only when this file is executed as a script, so that
# importing it as a module does not trigger any API calls.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment