Created
May 10, 2024 17:52
-
-
Save DanaEpp/5b7fdb3a81f0273da0d07c6f75942155 to your computer and use it in GitHub Desktop.
Sensitive Data Detector see: https://danaepp.com/sensitive-data-detection-using-ai-for-api-hackers
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import sys | |
from typing import List | |
from dataclasses import dataclass | |
from presidio_analyzer import AnalyzerEngine, RecognizerResult | |
import argparse | |
from har_capture_reader import HarCaptureReader | |
# Shared Presidio engine, created once when the module is loaded and used by
# check_for_sensitive_data() for every analysis.
analyzer: AnalyzerEngine = AnalyzerEngine()

# Minimum confidence a finding needs before it is reported; adjust it here.
# Presidio scores findings on a scale of 0 to 1, and for most entities a
# confidence of around 0.75 is where the data is more likely to be sensitive.
# See https://github.com/microsoft/presidio/tree/main/presidio-analyzer/presidio_analyzer/predefined_recognizers
SCORE_THRESHOLD: float = 0.75
@dataclass
class SensitiveDataResult:
    """One finding of potentially sensitive data."""

    # Entity type reported for the finding (for example "EMAIL_ADDRESS").
    entity_type: str
    # Confidence score reported for the finding.
    score: float
    # The matched text itself.
    data: str
@dataclass
class SuspectResponse:
    """A captured response in which potentially sensitive data was found."""

    # HTTP method of the request that produced the response.
    method: str
    # HTTP status code of the response.
    status_code: int
    # URL that was requested.
    url: str
    # Response headers.
    headers: dict
    # Response body.
    body: str
    # Findings detected in the response.
    sensitive_data: List[SensitiveDataResult]
def check_for_sensitive_data(data: str, score_min: float) -> List[SensitiveDataResult]:
    """Run text through Microsoft Presidio and collect any sensitive data it detects.

    Args:
        data: The text to analyze (typically a response body).
        score_min: Minimum confidence, on Presidio's 0 to 1 scale, that a
            finding needs in order to be reported.

    Returns:
        One SensitiveDataResult per finding that scores at least
        ``score_min``. The list is empty when ``data`` is empty or when the
        analysis itself fails.
    """
    # Nothing to scan: skip the analyzer call entirely.
    if not data:
        return []

    try:
        results: List[RecognizerResult] = analyzer.analyze(
            text=data,
            entities=[
                # "IBAN_CODE" is Presidio's entity name for IBANs; the
                # previous "IBAN Generic" is a recognizer pattern name and
                # does not select any entity.
                "EMAIL_ADDRESS", "IBAN_CODE", "IP_ADDRESS",
                "PHONE_NUMBER", "LOCATION", "PERSON", "URL",
                "US_BANK_NUMBER", "US_DRIVER_LICENSE",
                "US_ITIN", "US_PASSPORT", "US_SSN"
            ],
            score_threshold=score_min,
            language='en')
    except Exception as e:
        # Best effort: a failed analysis is reported and treated as "no findings"
        # so that one bad input does not stop the caller.
        print(f"Exception while analyzing data with Presidio: {e}")
        return []

    # Filter on the caller's threshold (not the module-level default) so the
    # result always agrees with the score_min that was passed in.
    return [
        SensitiveDataResult(r.entity_type, r.score, data[r.start:r.end])
        for r in results
        if r.score >= score_min
    ]
def pretty_print(resp: SuspectResponse, show_details: bool = False) -> None:
    """Print a colorized report for one response that contains sensitive data.

    The URL is printed first, followed by one line per finding. When
    ``show_details`` is true, the method, status code, headers and body of the
    response are printed as well.
    """
    # URL in green.
    print(f"\033[32m{resp.url}")

    # Each finding: label in the default color, matched data in red.
    for finding in resp.sensitive_data:
        print(f"\033[0m{finding.entity_type} (Score={finding.score}) : \033[31m{finding.data}")

    if show_details:
        # Full response dump in cyan.
        print("\n\033[36m========\nRESPONSE\n========")
        print(f"Method: {resp.method}")
        print(f"Status Code: {resp.status_code}\n")
        for name, value in resp.headers.items():
            print(f"{name}: {str(value)}")
        print(f"\n{resp.body}")

    # Always restore the terminal's default colors before returning.
    print("\033[0m")
def main() -> None:
    """Scan an HTTP archive (HAR) capture file for responses with sensitive data.

    Command-line arguments:
        filename: Path to the HAR file to process.
        -d / --details: Also print the full response for every hit.

    Every response whose content type starts with "application/json" is
    analyzed; responses with findings are collected and printed once the
    whole capture has been read. Any exception is reported and the process
    exits with status 1.
    """
    parser = argparse.ArgumentParser(description="Search through HTTP archive for sensitive data")
    parser.add_argument("filename", help="The path to the HAR file to process")
    parser.add_argument('-d', '--details', action='store_true', help='Shows full detailed response')
    args = parser.parse_args()

    try:
        capture_reader = HarCaptureReader(args.filename)
        suspect_responses: List[SuspectResponse] = []

        for req in capture_reader.captured_requests():
            # NOTE(review): assumes the reader may return None/empty when a
            # response has no content type - confirm against HarCaptureReader.
            # Such responses are skipped instead of aborting the whole scan.
            content_type = req.get_response_content_type() or ""

            # Need to account for mixed JSON objects (ie:protobuf)
            if not content_type.lower().startswith("application/json"):
                continue

            # Fetch the body once and reuse it for both analysis and reporting.
            body = req.get_response_body()
            sensitive_data: List[SensitiveDataResult] = check_for_sensitive_data(body, SCORE_THRESHOLD)
            if sensitive_data:
                suspect_responses.append(
                    SuspectResponse(
                        req.get_method(), req.get_response_status_code(), req.get_url(),
                        req.get_response_headers(), body,
                        sensitive_data)
                )

        for resp in suspect_responses:
            pretty_print(resp, args.details)
    except Exception as e:
        # Top-level boundary of the script: report the failure and signal it
        # through the exit status.
        print(f"General Exception: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment