Quick+dirty asynchronous URLScan subtasking with aiohttp and asyncio (phishing example)
#!/usr/bin/env python3
import asyncio
import json
import aiohttp
import os
import re

api_key = os.environ.get("URLSCAN_API_KEY")
urlscan_base_url = "https://urlscan.io/api/v1/"
headers = {"Content-Type": "application/json", "API-Key": api_key}
domain_regex = re.compile(r'https?:\/\/(?P<domain>.+?)\/')


def extract_domain(url: str) -> str:
    """Return the domain portion of a URL (first match of domain_regex)."""
    groups = re.findall(domain_regex, url)
    return groups[0]


class APISearch:
    def __init__(self, search: str) -> None:
        self.search = search
        self.session = None
        self.search_results = []
        self.subtask_queue = asyncio.Queue()
        self.task_results = []

    async def return_results(self) -> None:
        """Run the search query and put each result (a task for triage) in the subtask_queue."""
        search_params = {"q": self.search}
        async with self.session.get(
            urlscan_base_url + "search", params=search_params, headers=headers
        ) as response:
            print("Status:", response.status)
            if response.ok:
                j = await response.json()
                for result in j["results"]:
                    await self.subtask_queue.put(result)
            else:
                raise Exception(str(response.status) + " from " + str(response.url))

    async def subtask(self) -> None:
        """Pick up a task from the queue (a result from the search query) and triage it
        for true positives by examining its full result page."""
        while True:
            if self.subtask_queue.empty():
                break
            try:
                task = await self.subtask_queue.get()
                task_url = task["task"]["url"]
                print("processing ", task_url, task["result"])
                async with self.session.get(
                    task["result"], headers=headers
                ) as response:
                    if response.ok:
                        j = await response.json()
                        # Find and confirm the IOC by checking that the second request
                        # in the request chain is a short script loaded from line 0 of
                        # the page.
                        request = j["data"]["requests"][1]
                        try:
                            script_line_number = request["request"]["initiator"][
                                "stack"
                            ]["callFrames"][0]["lineNumber"]
                        except (KeyError, TypeError, IndexError):
                            script_line_number = -1
                        if (
                            request["response"]["dataLength"] < 150
                            and request["response"]["type"] == "Script"
                            and script_line_number == 0
                        ):
                            script_url = request["request"]["request"]["url"]
                            self.task_results.append(
                                {
                                    "phish_url": task_url,
                                    "phish_domain": extract_domain(task_url),
                                    "script_url": script_url,
                                    "script_domain": extract_domain(script_url),
                                }
                            )
                            print("true positive")
                        else:
                            print("false positive")
                    else:
                        raise Exception(
                            str(response.status) + " from " + str(response.url)
                        )
                self.subtask_queue.task_done()
            except asyncio.exceptions.CancelledError:
                break

    async def start(self) -> None:
        """Entrypoint - sets up an aiohttp session and proceeds until the initial task
        (the search) and each subtask is completed. Prints the results of triage."""
        async with aiohttp.ClientSession(trust_env=True) as session:
            self.session = session
            await self.return_results()
            workers = [asyncio.create_task(self.subtask()) for _ in range(5)]
            await asyncio.gather(*workers)
            print(json.dumps(self.task_results, indent=2))


async def main():
    search = APISearch(
        'page.domain:"en.wikipedia.org" AND NOT task.domain:"wikipedia.org" AND NOT task.domain:whatiscrowdstrike.com NOT task.domain:t.co'
    )
    await search.start()


if __name__ == "__main__":
    asyncio.run(main())
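
The query hardcoded in main() targets one specific campaign (pages cloning en.wikipedia.org). A minimal sketch of reusing the class for a different hunt, assuming the file above is saved as urlscan_search.py (a hypothetical name) and URLSCAN_API_KEY is exported in the environment:

# Hypothetical usage: run the same triage workflow against a different URLScan query.
# The module name urlscan_search and the query below are placeholders, not part of the gist.
import asyncio

from urlscan_search import APISearch


async def run():
    hunt = APISearch('page.domain:"example.com"')
    await hunt.start()


if __name__ == "__main__":
    asyncio.run(run())

Note that the triage conditions in subtask() (response size, response type, and script line number of the second request) are tuned to this particular campaign, so a different query would generally need a different true-positive check.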