Created
May 3, 2024 05:01
-
-
Save abbycantcode/4352002cd5370e6080879ae5fda3a179 to your computer and use it in GitHub Desktop.
Parses a Caido JSON project export, walks every request in it, and saves the GraphQL queries it finds.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import os | |
import base64 | |
import argparse | |
from graphql import parse | |
def parse_http_request(http_request):
    """Parse the HTTP request to extract the JSON body and Host header.

    Args:
        http_request: The raw request text, using CRLF line endings.

    Returns:
        A ``(json_body, host)`` tuple. ``json_body`` is the result of
        ``json.loads`` on the body; ``host`` is the Host header value, or
        ``None`` when the request has no Host header. ``(None, None)`` is
        returned when there is no blank line separating headers from body
        or when the body is not valid JSON.
    """
    # partition() cuts at the FIRST blank line only, so a body that itself
    # contains "\r\n\r\n" (e.g. pretty-printed JSON) is kept whole.
    head, separator, body = http_request.partition('\r\n\r\n')
    if not separator:
        return None, None

    host = None
    for line in head.split('\r\n'):
        field_name, colon, field_value = line.partition(':')
        # Header names are case-insensitive and the space after the colon
        # is optional, so compare the normalised name instead of a prefix.
        if colon and field_name.strip().lower() == 'host':
            host = field_value.strip()
            break

    try:
        return json.loads(body), host
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return None, None
def is_graphql_query(content):
    """Report whether *content* carries a GraphQL document that parses.

    Args:
        content: Either a JSON string or an already-decoded object whose
            ``'query'`` entry holds the GraphQL source.

    Returns:
        ``True`` when ``parse`` accepts the ``'query'`` entry; ``False`` on
        any failure (invalid JSON, missing key, parse error, ...).
    """
    try:
        payload = json.loads(content) if isinstance(content, str) else content
        parse(payload['query'])
    except Exception:
        # Any failure simply means "not a usable GraphQL request".
        return False
    return True
def extract_variables(query):
    """Collect the variable names declared in a GraphQL document.

    Args:
        query: GraphQL document source text.

    Returns:
        A list of variable names (without the ``$`` sigil) in document
        order; duplicates across definitions are kept. If parsing or
        walking the document fails, whatever was collected up to that
        point is returned (an empty list when parsing itself fails).
    """
    variables = []
    try:
        ast = parse(query)
        for definition in ast.definitions:
            # The attribute can exist yet be None -- NOTE(review):
            # graphql-core appears to do this for fragment definitions;
            # confirm against the installed version. Iterating None would
            # raise and silently drop the variables of every definition
            # from that point on, so treat None as "declares nothing".
            declared = getattr(definition, 'variable_definitions', None) or ()
            variables.extend(var.variable.name.value for var in declared)
    except Exception:
        # Deliberate best effort: an unparsable document yields no names.
        pass
    return variables
def create_query_file(directory, operation_name, query):
    """Write *query* to ``<directory>/<operation_name>.graphql``.

    Args:
        directory: Existing directory that receives the file.
        operation_name: Name used for the file; it originates from captured
            traffic, so it is sanitised before being used in a path.
        query: GraphQL source text to store. An existing file with the same
            name is overwritten.
    """
    # Keep only characters that are safe inside a single path component so
    # a name such as "../../x" cannot escape ``directory``. Names made of
    # letters, digits and underscores are left unchanged.
    cleaned = "".join(
        ch if ch.isalnum() or ch in "_.-" else "_"
        for ch in str(operation_name)
    ).lstrip(".")
    file_name = f"{cleaned or 'Unnamed_Operation'}.graphql"
    file_path = os.path.join(directory, file_name)
    # Explicit encoding: the query text may contain non-ASCII characters
    # and the platform default is not always able to encode them.
    with open(file_path, "w", encoding="utf-8") as file:
        file.write(query)
def save_operation_name(directory, operation_name):
    """Record *operation_name* in ``<directory>/operation_name.txt`` once.

    The file is created if it does not exist. The name is appended on its
    own line only when no existing line equals it exactly.

    Args:
        directory: Existing directory holding ``operation_name.txt``.
        operation_name: Operation name to record; an empty name is ignored.
    """
    file_path = os.path.join(directory, "operation_name.txt")
    # "a+" creates the file when missing, lets us read what is already
    # there, and appends every write at the end of the file.
    with open(file_path, "a+", encoding="utf-8") as file:
        file.seek(0)
        # Compare whole lines: a substring test would treat "getUser" as
        # already present once "getUsers" has been stored.
        known = set(file.read().splitlines())
        if operation_name and operation_name not in known:
            file.write(operation_name + '\n')
def save_variables(directory, variables):
    """Append previously unseen variable names to ``variables.txt``.

    The file ``<directory>/variables.txt`` is created if it does not exist.
    Each name is written on its own line, and only when no existing line
    (including ones added earlier in this same call) equals it exactly.

    Args:
        directory: Existing directory holding ``variables.txt``.
        variables: Iterable of variable names; empty names are ignored.
    """
    file_path = os.path.join(directory, "variables.txt")
    # "a+" creates the file when missing, lets us read what is already
    # there, and appends every write at the end of the file.
    with open(file_path, "a+", encoding="utf-8") as file:
        file.seek(0)
        # Read once and track additions in memory: re-reading the file for
        # each name misses buffered writes, and a substring test would
        # treat "id" as present once "userId" has been stored.
        known = set(file.read().splitlines())
        for variable in variables:
            if variable and variable not in known:
                file.write(variable + "\n")
                known.add(variable)
def process_graphql_query(content, output_directory, host):
    """Store one GraphQL request under ``<output_directory>/<host>/``.

    The query text is written through ``create_query_file`` into a
    ``queries`` sub-directory, and the operation name and declared
    variables are passed to ``save_operation_name`` and ``save_variables``
    for the host directory.

    Args:
        content: The request payload, either a JSON string or an
            already-decoded object with a ``'query'`` entry.
        output_directory: Root directory for all output.
        host: Host header value of the request; used as a directory name.

    Payloads that are not valid JSON, are not JSON objects, or have no
    string ``'query'`` entry are skipped silently.
    """
    try:
        data = json.loads(content) if isinstance(content, str) else content
    except json.JSONDecodeError:
        return

    if not isinstance(data, dict) or not isinstance(data.get('query'), str):
        return
    query = data['query']

    # Clients commonly send "operationName": null; dict.get's default only
    # applies to a MISSING key, so fall back explicitly for null/empty or
    # non-string values as well.
    operation_name = data.get('operationName')
    if not isinstance(operation_name, str) or not operation_name:
        operation_name = 'Unnamed_Operation'

    variables = extract_variables(query)

    # The Host header comes from captured traffic: neutralise path
    # separators and dot-only names so it stays a single directory level
    # below output_directory.
    safe_host = host.replace('/', '_').replace('\\', '_')
    if safe_host in ('', '.', '..'):
        safe_host = '_'

    host_directory = os.path.join(output_directory, safe_host)
    queries_directory = os.path.join(host_directory, "queries")
    os.makedirs(queries_directory, exist_ok=True)

    create_query_file(queries_directory, operation_name, query)
    save_operation_name(host_directory, operation_name)
    save_variables(host_directory, variables)
def parse_request(request, output_directory):
    """Decode one exported request and store it if it is a GraphQL POST.

    Args:
        request: Mapping describing a request. ``"method"`` and ``"raw"``
            (base64 of the raw HTTP request) are required; ``"path"`` and
            ``"query"`` are only echoed for diagnostics.
        output_directory: Root directory handed to
            ``process_graphql_query``.

    Non-POST requests are ignored. A request that cannot be decoded is
    reported on stdout and skipped.
    """
    try:
        if request["method"] != "POST":
            return
        raw_data = base64.b64decode(request["raw"]).decode("utf-8")
    except Exception as e:
        # Best effort: one malformed entry must not stop the whole run.
        print("Error decoding raw data:", e)
        return

    print("Parsed Request:")
    print("Method:", request["method"])
    # These two fields are informational only, so a missing key should not
    # abort processing of an otherwise usable request.
    print("Path:", request.get("path"))
    print("Query:", request.get("query"))
    print("Raw Data:")
    print(raw_data)
    print()

    json_body, host = parse_http_request(raw_data)
    if json_body and host and is_graphql_query(json_body):
        process_graphql_query(json_body, output_directory, host)
def main():
    """Command-line entry point: parse every request in the input file.

    Reads the JSON file named on the command line, creates the output
    directory (current working directory by default) and passes each
    element of the loaded JSON to ``parse_request``.
    """
    parser = argparse.ArgumentParser(description="Parse GraphQL requests from input JSON file")
    parser.add_argument("input_file", help="Path to the input JSON file")
    parser.add_argument("-o", "--output_directory", help="Output directory for parsed data", default=os.getcwd())
    args = parser.parse_args()

    # JSON text is UTF-8; do not depend on the platform default encoding.
    with open(args.input_file, "r", encoding="utf-8") as file:
        data = json.load(file)

    output_directory = args.output_directory
    os.makedirs(output_directory, exist_ok=True)

    for request in data:
        parse_request(request, output_directory)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment