Created
February 10, 2025 12:58
-
-
Save milo2012/94b6cb9ce98b75469137e4bd2d2112d0 to your computer and use it in GitHub Desktop.
This script is designed to analyze phishing URLs, particularly those that abuse Bing's tracking system (bing.com/ck). It extracts the real destination URL, tracks redirections, and captures screenshots at each step to help analyze how phishing campaigns redirect users to malicious sites.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import asyncio | |
from playwright.async_api import async_playwright | |
from typing import Optional, List | |
import time | |
import base64 | |
import urllib.parse | |
""" | |
Phishing URL Redirection Tracker | |
This script is designed to analyze phishing URLs, particularly those that abuse Bing's tracking system (bing.com/ck). | |
It extracts the real destination URL, tracks redirections, and captures screenshots at each step to help analyze | |
how phishing campaigns redirect users to malicious sites. | |
Reference: | |
https://www.trustwave.com/en-us/resources/blogs/spiderlabs-blog/trusted-domain-hidden-danger-deceptive-url-redirections-in-email-phishing-attacks/ | |
Sample: https://www.bing.com/ck/a?c201-6040-011a-74e8c34061bd&u=a1aHR0cHM6Ly93d3cuZHJldm9iYi5ldS8&ntb=1 | |
""" | |
class RedirectTracker:
    """Record the chain of URL changes seen while following a link.

    Each accepted change is stored as a dict with the keys ``'timestamp'``
    (wall-clock seconds, suitable for display), ``'from'``, ``'to'`` and
    ``'type'``. The tracker also remembers which URLs already have a
    screenshot and how long ago the most recent change happened.
    """

    def __init__(self):
        # Ordered list of redirect records (see class docstring for the keys).
        self.redirects: List[dict] = []
        # Destination of the most recently recorded redirect, if any.
        self.final_url: Optional[str] = None
        # Last destination seen; used to ignore repeated reports of one URL.
        self.last_url: Optional[str] = None
        # URLs for which a screenshot has already been saved.
        self.screenshots_taken = set()
        # Monotonic clock reading of the last change. A monotonic clock is
        # used because this value only ever feeds an elapsed-time calculation
        # and must not jump when the system clock is adjusted.
        self.last_redirect_time = time.monotonic()

    def add_redirect(self, from_url: str, to_url: str, redirect_type: str) -> bool:
        """Record a redirect unless *to_url* equals the last recorded target.

        Args:
            from_url: URL the browser was on before the change.
            to_url: URL the browser moved to.
            redirect_type: Free-form label describing how the change was seen.

        Returns:
            True if a new record was appended, False if *to_url* matched the
            previously recorded destination and was ignored.
        """
        if to_url == self.last_url:
            return False

        self.redirects.append({
            'timestamp': time.time(),  # wall clock: printed to the user later
            'from': from_url,
            'to': to_url,
            'type': redirect_type,
        })
        self.final_url = to_url
        self.last_url = to_url
        self.last_redirect_time = time.monotonic()
        return True

    def time_since_last_redirect(self) -> float:
        """Return the seconds elapsed since the last recorded redirect.

        Before any redirect is recorded this is the time since construction.
        """
        return time.monotonic() - self.last_redirect_time
async def extract_real_url_from_bing(bing_url: str) -> Optional[str]:
    """Decode the destination URL carried in a Bing tracking link.

    The destination is read from the ``u`` query parameter. An optional
    leading ``a1`` marker is stripped and the remainder is decoded as base64
    and then as UTF-8 text.

    Args:
        bing_url: The tracking URL, e.g. ``https://www.bing.com/ck/a?...&u=a1...``.

    Returns:
        The decoded destination, or None when the ``u`` parameter is missing
        or its value cannot be decoded.
    """
    try:
        query = urllib.parse.urlparse(bing_url).query
        values = urllib.parse.parse_qs(query).get('u')
    except ValueError:
        # urlparse rejects some malformed inputs (e.g. a broken IPv6 host).
        return None
    if not values:
        return None

    encoded_url = values[0]
    if encoded_url.startswith('a1'):
        encoded_url = encoded_url[2:]

    # parse_qs turns '+' into a space; a space is never valid base64, so any
    # space here must originally have been a '+'.
    encoded_url = encoded_url.replace(' ', '+')
    padding = '=' * (-len(encoded_url) % 4)

    try:
        # urlsafe_b64decode accepts both the URL-safe alphabet ('-', '_') and
        # the standard one ('+', '/'). The plain decoder silently drops '-'
        # and '_' and then fails or returns garbage.
        return base64.urlsafe_b64decode(encoded_url + padding).decode()
    except ValueError:
        # Covers binascii.Error (bad base64) and UnicodeDecodeError.
        return None
async def take_screenshot(page, base_path: str, url: str, tracker: RedirectTracker):
    """Save a full-page screenshot for *url* unless one was already taken.

    The file name is *base_path* without its extension, followed by
    ``_<n>.png`` where ``n`` is the number of screenshots taken so far.
    Failures are printed and otherwise ignored, so a screenshot problem never
    interrupts redirect tracking.

    Args:
        page: Object exposing an awaitable ``screenshot(path=..., full_page=...)``.
        base_path: Path template for the output files, e.g. ``"screenshot.png"``.
        url: URL the screenshot is attributed to; used as the de-duplication key.
        tracker: Tracker whose ``screenshots_taken`` set is consulted and updated.
    """
    # Local import keeps this function self-contained; the file's import
    # block does not bring in ``os``.
    import os.path

    if url in tracker.screenshots_taken:
        return

    try:
        # splitext only strips an extension from the final path component, so
        # dots in directory names ("./out/shot", "a.b/shot") are left alone.
        stem = os.path.splitext(base_path)[0]
        screenshot_name = f"{stem}_{len(tracker.screenshots_taken)}.png"
        await page.screenshot(path=screenshot_name, full_page=True)
        print(f"Screenshot saved for new URL: {screenshot_name}")
        tracker.screenshots_taken.add(url)
    except Exception as e:
        print(f"Screenshot error: {str(e)}")
async def check_redirect(url: str, screenshot_path: str = "screenshot.png", timeout: int = 30,
                         observation_time: int = 30, stable_time: int = 10):
    """Open *url* in Chromium, follow its redirects and screenshot each new URL.

    If *url* looks like a Bing tracking link (contains ``bing.com/ck``) the
    embedded destination is decoded first and navigated to directly. A
    background task polls the page URL once per second and records every
    change. The function finishes once the URL has been stable for
    *stable_time* seconds after navigation completed, or when
    *observation_time* seconds have passed since the start. The redirect
    chain and a summary are printed.

    Args:
        url: URL to analyse.
        screenshot_path: Path template handed to ``take_screenshot``.
        timeout: Per-attempt navigation timeout in seconds.
        observation_time: Hard cap, in seconds, on the whole observation.
        stable_time: Seconds without a URL change that count as "settled".
    """
    async with async_playwright() as p:
        # NOTE(review): the browser runs headed, without sandbox and with web
        # security disabled while visiting untrusted pages. Run this only in
        # an isolated analysis environment.
        browser = await p.chromium.launch(
            headless=False,
            args=[
                '--disable-web-security',
                '--disable-features=IsolateOrigins,site-per-process',
                '--no-sandbox',
            ],
        )
        context = await browser.new_context(
            viewport={'width': 1280, 'height': 800},
            user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            ignore_https_errors=True,
            bypass_csp=True,
            extra_http_headers={
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.5',
                'Accept-Encoding': 'gzip, deflate, br',
                'DNT': '1',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
            },
        )
        page = await context.new_page()
        tracker = RedirectTracker()
        # Monotonic clock: these values are only used for elapsed-time maths.
        start_time = time.monotonic()
        navigation_finished_at = start_time
        should_exit = asyncio.Event()
        navigation_done = asyncio.Event()
        monitor_task = None
        try:
            if 'bing.com/ck' in url:
                real_url = await extract_real_url_from_bing(url)
                if real_url:
                    print(f"Extracted URL from Bing redirect: {real_url}")
                    # Record the hop only. The page is still blank here, and a
                    # screenshot now would mark real_url as captured and
                    # suppress the screenshot of the loaded page.
                    tracker.add_redirect(url, real_url, "bing_extraction")
                    url = real_url

            async def handle_dialog(dialog):
                # Accept alert/confirm/prompt dialogs so they cannot stall the page.
                await dialog.accept()

            page.on('dialog', handle_dialog)

            async def handle_url_change():
                """Poll the page URL once per second until an exit condition holds."""
                current_url = page.url
                try:
                    while not should_exit.is_set():
                        await asyncio.sleep(1)
                        new_url = page.url
                        if new_url != current_url:
                            print(f"URL changed: {current_url} → {new_url}")
                            if tracker.add_redirect(current_url, new_url, "javascript"):
                                await take_screenshot(page, screenshot_path, new_url, tracker)
                            current_url = new_url

                        # Stability only counts once navigation has finished;
                        # otherwise a slow page load would end monitoring
                        # before it had begun.
                        if (navigation_done.is_set()
                                and time.monotonic() - navigation_finished_at > stable_time
                                and tracker.time_since_last_redirect() > stable_time):
                            print(f"\nNo redirects detected for {stable_time} seconds - finishing up...")
                            break

                        # Hard cap on the overall observation time.
                        if time.monotonic() - start_time > observation_time:
                            print("\nReached maximum observation time - finishing up...")
                            break
                finally:
                    # Always release the waiter, even if polling failed, so the
                    # caller cannot block forever on should_exit.
                    should_exit.set()

            monitor_task = asyncio.create_task(handle_url_change())

            print(f'Navigating to "{url}"...')
            try:
                response = await page.goto(url, timeout=timeout * 1000, wait_until='networkidle')
                if response:
                    tracker.add_redirect(url, response.url, "initial_navigation")
                    # take_screenshot de-duplicates by URL, so it is safe to
                    # call even when the URL was already recorded.
                    await take_screenshot(page, screenshot_path, response.url, tracker)
            except Exception as e:
                print(f"Initial navigation error: {str(e)}")
                # Fall back to a weaker load condition; pages that never go
                # network-idle can still be captured this way.
                try:
                    response = await page.goto(url, timeout=timeout * 1000, wait_until='domcontentloaded')
                    if response:
                        tracker.add_redirect(url, response.url, "retry_navigation")
                        await take_screenshot(page, screenshot_path, response.url, tracker)
                except Exception as retry_e:
                    print(f"Retry navigation also failed: {str(retry_e)}")

            navigation_finished_at = time.monotonic()
            navigation_done.set()

            print(f"\nMonitoring for redirects (will exit after {stable_time}s of stability)...")

            # Wait for either stability period or maximum observation time
            await should_exit.wait()

            # Surface any error raised inside the monitor.
            await monitor_task

            print("\nRedirect chain:")
            for redirect in tracker.redirects:
                print(f"[{time.strftime('%H:%M:%S', time.localtime(redirect['timestamp']))}] "
                      f"{redirect['type']}: {redirect['from']} → {redirect['to']}")

            print(f"\nFinal URL: {tracker.final_url or page.url}")
            print(f"Total unique URLs captured: {len(tracker.screenshots_taken)}")

        except Exception as e:
            print(f"General error: {str(e)}")

        finally:
            # Stop the monitor before closing the browser so it never polls a
            # closed page and is not left running on the error path.
            should_exit.set()
            if monitor_task is not None and not monitor_task.done():
                monitor_task.cancel()
                await asyncio.gather(monitor_task, return_exceptions=True)
            elapsed_time = time.monotonic() - start_time
            print(f"\nTotal execution time: {elapsed_time:.2f} seconds")
            await browser.close()
def main():
    """Read the command-line options and run the redirect check once."""
    cli = argparse.ArgumentParser(description="Check final redirected URL and save a screenshot.")

    # (flags, keyword settings) for every supported option, in display order.
    option_table = (
        (("-u", "--url"),
         {"required": True, "help": "The URL to check for redirection"}),
        (("-s", "--screenshot"),
         {"default": "screenshot.png", "help": "Path to save the screenshot"}),
        (("-t", "--timeout"),
         {"type": int, "default": 30, "help": "Navigation timeout in seconds (default: 30)"}),
        (("-o", "--observe"),
         {"type": int, "default": 60, "help": "Maximum observation time in seconds (default: 60)"}),
        (("-w", "--wait"),
         {"type": int, "default": 10, "help": "Stable time before exit in seconds (default: 10)"}),
    )
    for flags, settings in option_table:
        cli.add_argument(*flags, **settings)

    opts = cli.parse_args()
    job = check_redirect(opts.url, opts.screenshot, opts.timeout, opts.observe, opts.wait)
    asyncio.run(job)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment