Last active
August 15, 2024 18:37
-
-
Save lrhazi/66d7b6d1650cf88596c29f6ccf9c46cb to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pyshark | |
import json | |
from collections import defaultdict | |
def process_pcap(pcap_file, target_uri, ssl_keylog_file=None):
    """Collect completed HTTP request/response pairs for a URI from a capture.

    Args:
        pcap_file: Path of the capture file handed to ``pyshark.FileCapture``.
        target_uri: Substring that a request URI must contain to be reported.
        ssl_keylog_file: Optional path of a TLS key log file; when given it is
            passed to tshark through the ``ssl.keylog_file`` preference so
            encrypted HTTP traffic can be decrypted.

    Returns:
        A list of dicts, one per TCP stream that has both a request whose URI
        contains ``target_uri`` and a response. Each dict has a ``'request'``
        entry (``uri``, ``method``, ``src_ip``, ``dst_ip``, ``time`` and, when
        a TLS layer is present, ``tls_version``) and a ``'response'`` entry
        (``status_code``, ``content_type``, ``time`` and optionally
        ``tls_version``).
    """
    # Escape backslashes and double quotes so the URI cannot terminate the
    # quoted string literal inside the display filter.
    escaped_uri = target_uri.replace('\\', '\\\\').replace('"', '\\"')
    display_filter = f'http.request.uri contains "{escaped_uri}" or http.response'

    # Build the capture the same way in both cases; only the TLS key log
    # preference differs.
    capture_kwargs = {'display_filter': display_filter}
    if ssl_keylog_file:
        capture_kwargs['override_prefs'] = {'ssl.keylog_file': ssl_keylog_file}
    cap = pyshark.FileCapture(pcap_file, **capture_kwargs)

    # One slot per TCP stream. NOTE: a later request/response on the same
    # stream overwrites the earlier one, so only the last pair is kept.
    streams = defaultdict(lambda: {'request': None, 'response': None})
    try:
        for packet in cap:
            if 'TCP' not in packet or 'HTTP' not in packet:
                continue

            # tcp.stream is the same for both directions of a connection, so
            # a response lands in the same slot as its request. A key built
            # from src/dst would differ between the two directions.
            stream_id = packet.tcp.stream
            http = packet.http
            tls_version = (
                getattr(packet.tls, 'record_version', None)
                if 'TLS' in packet else None
            )

            if hasattr(http, 'request_uri'):
                # Addresses may live in an IPv4 or an IPv6 layer.
                ip_layer = next(
                    (packet[name] for name in ('ip', 'ipv6') if name in packet),
                    None,
                )
                request = {
                    'uri': http.request_uri,
                    'method': http.request_method,
                    'src_ip': ip_layer.src if ip_layer is not None else None,
                    'dst_ip': ip_layer.dst if ip_layer is not None else None,
                    'time': packet.sniff_time,
                }
                if 'TLS' in packet:
                    request['tls_version'] = tls_version
                streams[stream_id]['request'] = request
            elif hasattr(http, 'response_code'):
                response = {
                    'status_code': http.response_code,
                    'content_type': getattr(http, 'content_type', None),
                    'time': packet.sniff_time,
                }
                if 'TLS' in packet:
                    response['tls_version'] = tls_version
                streams[stream_id]['response'] = response
    finally:
        # Always release the capture, even if parsing fails midway.
        cap.close()

    # Keep only streams where both halves were seen and the request matches.
    # The display filter admits every response, so unrelated streams that
    # only have a response are dropped here.
    completed_transactions = [
        stream for stream in streams.values()
        if stream['request'] and stream['response']
        and target_uri in stream['request']['uri']
    ]
    return completed_transactions
def main():
    """Run the extractor with the sample settings and print the result as JSON."""
    settings = (
        'capture.pcap',
        '/specific/path',
        '/path/to/your/sslkeylog.txt',  # Set to None if not needed
    )
    found = process_pcap(*settings)
    # default=str stringifies values json cannot encode natively, such as the
    # packet timestamps stored under 'time'.
    report = json.dumps(found, indent=2, default=str)
    print(report)
# Run the demo only when this file is executed directly, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment