Skip to content

Instantly share code, notes, and snippets.

@pjaudiomv
Last active January 2, 2026 16:01
Show Gist options
  • Select an option

  • Save pjaudiomv/e84024b328137d947377002d0d3ef29e to your computer and use it in GitHub Desktop.

Select an option

Save pjaudiomv/e84024b328137d947377002d0d3ef29e to your computer and use it in GitHub Desktop.
Audit BMLT servers to find duplicate format keys in the same language
#!/usr/bin/env python3
"""
Audit BMLT servers to find duplicate format keys in the same language.
This script checks all servers and reports formats that have the same key_string
in the same language (e.g., two formats with key "B" in English).
Usage:
python3 audit_duplicate_format_keys.py # Audit all servers
python3 audit_duplicate_format_keys.py 102 # Audit only server with id=102
python3 audit_duplicate_format_keys.py 102,104,105 # Audit servers with ids 102, 104, and 105
"""
import json
import sys
from typing import List, Dict, Set, Optional
from urllib.request import urlopen, Request
from urllib.error import URLError, HTTPError
import time
from collections import defaultdict
def fetch_json(url: str, timeout: int = 30) -> List[Dict]:
    """Download *url* and return its JSON payload, provided the payload is a list.

    Args:
        url: Address to request.
        timeout: Value handed to ``urlopen`` as its timeout.

    Returns:
        The decoded JSON list. An empty list is returned instead when the
        request fails, the body is not valid JSON, any other exception occurs,
        or the decoded payload is not a list; each of those cases is reported
        on stderr.
    """
    request_headers = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:146.0) Gecko/20100101 Firefox/146.0',
    }
    try:
        request = Request(url, headers=request_headers)
        with urlopen(request, timeout=timeout) as response:
            raw_body = response.read()
        payload = json.loads(raw_body.decode('utf-8'))
        # Callers iterate over the result, so only a JSON array is acceptable.
        if isinstance(payload, list):
            return payload
        print(f" ⚠️ Unexpected response type: {type(payload)}", file=sys.stderr)
        return []
    # HTTPError is a subclass of URLError, so it has to be handled first.
    except HTTPError as http_err:
        print(f" ⚠️ HTTP Error {http_err.code}: {http_err.reason}", file=sys.stderr)
        return []
    except URLError as url_err:
        print(f" ⚠️ URL Error: {url_err.reason}", file=sys.stderr)
        return []
    except json.JSONDecodeError as decode_err:
        print(f" ⚠️ JSON decode error: {decode_err}", file=sys.stderr)
        return []
    except Exception as other_err:
        print(f" ⚠️ Error: {other_err}", file=sys.stderr)
        return []
def load_server_list(use_github: bool = True) -> List[Dict]:
    """Return the BMLT server list, preferring GitHub over the local file.

    Args:
        use_github: When true, the aggregator's published ``serverList.json``
            is requested first. The local file is read only if that request
            yields nothing, or when this flag is false.

    Returns:
        The parsed content of the server list.

    Raises:
        FileNotFoundError: The local ``serverList.json`` fallback is missing.
        json.JSONDecodeError: The local fallback does not contain valid JSON.
    """
    if use_github:
        github_url = "https://raw.githubusercontent.com/bmlt-enabled/aggregator/refs/heads/main/serverList.json"
        try:
            servers = fetch_json(github_url)
        except Exception as e:
            # Defensive only: fetch_json already reports its own failures and
            # returns an empty list for them.
            print(f"⚠️ Error fetching from GitHub: {e}, falling back to local file", file=sys.stderr)
        else:
            if servers:
                return servers
            print("⚠️ Failed to fetch from GitHub, falling back to local file", file=sys.stderr)
    # Fallback to the local file. JSON is read as UTF-8 explicitly so that
    # non-ASCII server names do not depend on the platform's locale encoding.
    with open("serverList.json", 'r', encoding='utf-8') as f:
        return json.load(f)
def get_formats(server_url: str) -> List[Dict]:
    """Fetch every format from a BMLT server.

    Args:
        server_url: Root URL of the server, with or without a trailing slash.

    Returns:
        Whatever ``fetch_json`` returns for the ``GetFormats`` request
        (sent with ``show_all=1``).
    """
    # Normalize the trailing slash so a root URL stored without one still
    # yields a valid endpoint instead of ".../main_serverclient_interface/...".
    base_url = server_url.rstrip('/') + '/'
    url = f"{base_url}client_interface/json/?switcher=GetFormats&show_all=1"
    return fetch_json(url)
def get_meetings(server_url: str) -> List[Dict]:
    """Fetch every meeting from a BMLT server.

    Args:
        server_url: Root URL of the server, with or without a trailing slash.

    Returns:
        Whatever ``fetch_json`` returns for the ``GetSearchResults`` request.
    """
    # Normalize the trailing slash so a root URL stored without one still
    # yields a valid endpoint instead of ".../main_serverclient_interface/...".
    base_url = server_url.rstrip('/') + '/'
    url = f"{base_url}client_interface/json/?switcher=GetSearchResults"
    return fetch_json(url)
def parse_format_ids(format_id_list: str) -> Set[str]:
    """Split a comma-separated ID string into a set of trimmed, non-empty IDs.

    A falsy or whitespace-only argument produces an empty set.
    """
    if not format_id_list:
        return set()
    # Blank pieces (from "a,,b", trailing commas or pure whitespace) are dropped.
    trimmed_pieces = (piece.strip() for piece in format_id_list.split(','))
    return {piece for piece in trimmed_pieces if piece}
def _find_duplicate_format_keys(formats: List[Dict]) -> List[Dict]:
    """Return one entry per (language, key_string) pair shared by several formats."""
    grouped = defaultdict(list)
    for fmt in formats:
        key = fmt.get('key_string', '')
        if not key:
            continue
        lang = fmt.get('lang', 'unknown')
        grouped[(lang, key)].append({
            # IDs are kept as strings so they can be joined for display and
            # compared with the string IDs parsed from each meeting.
            'id': str(fmt.get('id', 'unknown')),
            'key_string': key,
            'name_string': fmt.get('name_string', ''),
            'lang': lang,
        })
    return [
        {'language': lang, 'key': key, 'count': len(group), 'formats': group}
        for (lang, key), group in grouped.items()
        if len(group) > 1
    ]
def _collect_format_usage(meetings: List[Dict], tracked_ids: Set[str]) -> Dict[str, List]:
    """Map each tracked format ID to the IDs of the meetings that list it."""
    usage = {}
    for meeting in meetings:
        meeting_id = meeting.get('id_bigint', 'unknown')
        for fmt_id in parse_format_ids(meeting.get('format_shared_id_list', '')):
            if fmt_id in tracked_ids:
                usage.setdefault(fmt_id, []).append(meeting_id)
    return usage
def audit_server(server: Dict) -> Dict:
    """Audit a single BMLT server for duplicate format keys.

    Progress is printed to stdout as the audit runs. Meetings are fetched only
    when at least one duplicate key was found.

    Args:
        server: Server entry; ``name`` and ``url`` are required, ``id`` is
            optional and defaults to ``'unknown'``.

    Returns:
        A dict with ``server``, ``server_id``, ``url``, ``total_formats``,
        ``duplicates`` (one entry per duplicated language/key pair) and
        ``format_usage`` (format ID -> list of meeting IDs). When the formats
        cannot be fetched the dict also carries an ``error`` message and the
        remaining fields are empty.

    Raises:
        KeyError: ``server`` has no ``name`` or ``url`` entry.
    """
    server_id = server.get('id', 'unknown')
    server_name = server['name']
    server_url = server['url']
    print(f"\n{'='*80}")
    print(f"Auditing: {server_name} (ID: {server_id})")
    print(f"URL: {server_url}")
    print(f"{'='*80}")
    # Fetch formats
    print(" Fetching formats...", end=" ", flush=True)
    formats = get_formats(server_url)
    if not formats:
        print("❌ No formats found or error occurred")
        # Same keys as the success result so consumers of the saved JSON can
        # treat every entry uniformly.
        return {
            'server': server_name,
            'server_id': server_id,
            'url': server_url,
            'error': 'Failed to fetch formats',
            'total_formats': 0,
            'duplicates': [],
            'format_usage': {},
        }
    print(f"✓ Found {len(formats)} formats")
    # Check for duplicate keys per language
    print(" Checking for duplicate keys...", end=" ", flush=True)
    duplicates = _find_duplicate_format_keys(formats)
    duplicate_format_ids = {fmt['id'] for dup in duplicates for fmt in dup['formats']}
    if duplicates:
        print(f"⚠️ Found {len(duplicates)} duplicate key(s)")
        for dup in duplicates:
            format_ids = ', '.join(fmt['id'] for fmt in dup['formats'])
            print(f" {dup['language']}.{dup['key']}: {dup['count']} formats (IDs: {format_ids})")
    else:
        print("✓ No duplicate keys found")
    # Fetch meetings and track format usage for duplicates only
    format_usage = {}
    if duplicate_format_ids:
        print(" Fetching meetings...", end=" ", flush=True)
        meetings = get_meetings(server_url)
        if meetings:
            print(f"✓ Found {len(meetings)} meetings")
            print(" Analyzing format usage...", end=" ", flush=True)
            format_usage = _collect_format_usage(meetings, duplicate_format_ids)
            print("✓ Done")
        else:
            print("❌ No meetings found or error occurred")
    return {
        'server': server_name,
        'server_id': server_id,
        'url': server_url,
        'total_formats': len(formats),
        'duplicates': duplicates,
        'format_usage': format_usage,
    }
def _meeting_sort_key(meeting_id):
    """Order numeric meeting IDs by value, ahead of any non-numeric ones."""
    text = str(meeting_id)
    try:
        return (0, int(text), text)
    except ValueError:
        return (1, 0, text)
def print_report(results: List[Dict]):
    """Print a summary of the audit, followed by details for affected servers.

    Args:
        results: One result dict per audited server. Each must provide
            ``duplicates``; entries with duplicates must also provide
            ``server`` and ``url``, and may provide ``server_id`` and
            ``format_usage``.
    """
    print("\n" + "="*80)
    print("AUDIT SUMMARY")
    print("="*80)
    total_servers = len(results)
    servers_with_issues = sum(1 for r in results if r['duplicates'])
    total_duplicates = sum(len(r['duplicates']) for r in results)
    print(f"\nServers audited: {total_servers}")
    print(f"Servers with duplicate keys: {servers_with_issues}")
    print(f"Total duplicate key groups: {total_duplicates}")
    if total_duplicates == 0:
        return
    print("\n" + "="*80)
    print("DETAILED RESULTS")
    print("="*80)
    for result in results:
        if not result['duplicates']:
            continue
        server_id = result.get('server_id', 'unknown')
        format_usage = result.get('format_usage', {})
        print(f"\n{result['server']} (ID: {server_id})")
        print(f" URL: {result['url']}")
        print(f" {len(result['duplicates'])} duplicate key group(s):")
        for dup in result['duplicates']:
            print(f"\n Language: {dup['language']}, Key: {dup['key']}")
            print(f" {dup['count']} formats with this key:")
            for fmt in dup['formats']:
                fmt_id = fmt['id']
                print(f" - ID {fmt_id}: {fmt['name_string']}")
                # Show which meetings use this format
                if fmt_id in format_usage:
                    meeting_ids = format_usage[fmt_id]
                    # A plain sort would put "10" before "2" and would fail
                    # on a mix of int and str IDs.
                    ordered_ids = sorted(meeting_ids, key=_meeting_sort_key)
                    meeting_ids_str = ', '.join(str(mid) for mid in ordered_ids)
                    print(f" Used by {len(meeting_ids)} meeting(s): {meeting_ids_str}")
def save_results(results: List[Dict], filepath: str = "audit_duplicate_keys_results.json"):
    """Write *results* to *filepath* as JSON indented by two spaces.

    The destination path is echoed to stdout once the file has been written.
    """
    with open(filepath, mode='w') as output_file:
        json.dump(results, output_file, indent=2)
    confirmation = f"\n✓ Results saved to {filepath}"
    print(confirmation)
def main():
    """Run the audit from the command line.

    An optional first argument holds a comma-separated list of server IDs to
    restrict the audit to; without it every server in the list is audited.
    The report is printed, the results are saved as JSON, and the process
    exits with status 1 when any duplicate key was found (or when the server
    list or the requested IDs could not be resolved), otherwise 0.
    """
    print("BMLT Duplicate Format Keys Audit")
    print("="*80)
    # Load server list
    try:
        all_servers = load_server_list()
    except FileNotFoundError:
        print("❌ Error: serverList.json not found", file=sys.stderr)
        sys.exit(1)
    except json.JSONDecodeError as e:
        print(f"❌ Error parsing serverList.json: {e}", file=sys.stderr)
        sys.exit(1)
    # Check if specific server ID(s) were provided
    servers = all_servers
    if len(sys.argv) > 1:
        # Blank tokens (e.g. from a trailing comma) are not real IDs.
        server_ids = [sid.strip() for sid in sys.argv[1].split(',') if sid.strip()]
        requested_ids = set(server_ids)
        # IDs are compared as strings so the match works whether the server
        # list stores them as strings or as numbers.
        servers = [s for s in all_servers if str(s.get('id', '')) in requested_ids]
        if not servers:
            available_ids = ', '.join(str(s.get('id', '')) for s in all_servers)
            print(f"❌ Error: No servers found with ID(s) '{', '.join(server_ids)}'", file=sys.stderr)
            print(f"Available server IDs: {available_ids}", file=sys.stderr)
            sys.exit(1)
        # Check if any requested IDs were not found
        found_ids = {str(s['id']) for s in servers}
        missing_ids = sorted(requested_ids - found_ids)
        if missing_ids:
            print(f"⚠️ Warning: Server ID(s) not found: {', '.join(missing_ids)}", file=sys.stderr)
        if len(servers) == 1:
            print(f"Auditing single server: {servers[0]['name']} (ID: {servers[0]['id']})\n")
        else:
            print(f"Auditing {len(servers)} servers: {', '.join(str(s['id']) for s in servers)}\n")
    else:
        print(f"Loaded {len(servers)} servers from GitHub\n")
    # Audit each server
    results = []
    for i, server in enumerate(servers, 1):
        print(f"\n[{i}/{len(servers)}]", end=" ")
        results.append(audit_server(server))
        # Be respectful with rate limiting: pause between servers, not after the last
        if i < len(servers):
            time.sleep(1)
    # Print and save results
    print_report(results)
    save_results(results)
    # Exit with appropriate code
    sys.exit(1 if any(r['duplicates'] for r in results) else 0)
# Run the audit only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment