Last active
June 10, 2024 11:18
-
-
Save jinnosux/63160c7cf9d929f7eb9ce0221917b345 to your computer and use it in GitHub Desktop.
entro.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Network Packets Entropy Analyzer | |
Author: Vahid Konicanin | |
Part of a Research Paper "Detecting Network Anomalies with Shannon Entropy: A Novel Approach to Cybersecurity" | |
International Balkan University, 2024 | |
""" | |
import math
import socket
import statistics
import sys
from collections import Counter

import dpkt
import hexdump
def print_colored(text, color):
    """Return *text* wrapped in the ANSI escape sequence for *color*.

    Despite the name, nothing is printed: the function only builds and
    returns the decorated string, which always ends with the reset code.

    Args:
        text: Value to decorate; it is formatted with its default format.
        color: Key into the escape-code table ('red' or 'reset').

    Returns:
        The escape code for *color*, then *text*, then the reset code.

    Raises:
        KeyError: If *color* is not a key of the escape-code table.
    """
    ansi_codes = {
        'red': '\033[91m',
        'reset': '\033[0m',
    }
    start_code = ansi_codes[color]
    end_code = ansi_codes['reset']
    return "{}{}{}".format(start_code, text, end_code)
def calculate_entropy(string):
    """Return the Shannon entropy of *string* in bits per symbol.

    The entropy is ``-sum(p * log2(p))`` over the relative frequency ``p``
    of every distinct symbol in *string*.

    Args:
        string: Sequence of hashable symbols (typically a ``str``).

    Returns:
        The entropy as a float. Empty input has no symbols and therefore
        yields 0.0 instead of raising ZeroDivisionError.
    """
    total = len(string)
    if total == 0:
        return 0.0

    # One counting pass instead of one str.count() scan per distinct symbol.
    symbol_counts = Counter(string)
    return sum(
        -(count / total) * math.log2(count / total)
        for count in symbol_counts.values()
    )
def calculate_normalized_entropy(string):
    """Return the Shannon entropy of *string* scaled into the range [0, 1].

    The raw entropy is divided by ``log2(k)``, the largest entropy that an
    input with ``k`` distinct symbols can reach, so a uniform distribution
    over the symbols present scores 1.0.

    Args:
        string: Sequence of hashable symbols (typically a ``str``).

    Returns:
        The normalized entropy as a float. Input with fewer than two
        distinct symbols (including empty input) carries no uncertainty
        and yields 0.0; without this guard the normalization would divide
        by ``log2(1) == 0``.
    """
    total = len(string)
    symbol_counts = Counter(string)
    distinct = len(symbol_counts)
    if distinct < 2:
        return 0.0

    entropy = -sum(
        (count / total) * math.log2(count / total)
        for count in symbol_counts.values()
    )
    # Maximum possible entropy for an alphabet of this size.
    max_entropy = math.log2(distinct)
    return entropy / max_entropy
def process_pcap(file_path, num_packets):
    """Print the average normalized entropy per source IP of a pcap capture.

    Reads at most *num_packets* packets from the capture, computes the
    normalized entropy of each IPv4 packet's hex dump with
    ``calculate_normalized_entropy`` and groups the values by source
    address. It then prints the global mean and standard deviation,
    followed by one line per source address. Addresses whose average
    exceeds ``mean + 1 standard deviation`` are rendered in red.

    Frames without an IPv4 payload are skipped: they have no IPv4 source
    address, so attributing them to any address would be wrong.

    Args:
        file_path: Path of the pcap file to analyze.
        num_packets: Maximum number of packets to read from the capture.

    Returns:
        None. All results are written to standard output.
    """
    entropy_info = {}    # source IP -> normalized entropies of its packets
    entropy_values = []  # normalized entropies of every analyzed packet

    with open(file_path, 'rb') as pcap_file:
        pcap = dpkt.pcap.Reader(pcap_file)
        for i, (_timestamp, packet) in enumerate(pcap):
            if i >= num_packets:
                break

            eth = dpkt.ethernet.Ethernet(packet)
            if not isinstance(eth.data, dpkt.ip.IP):
                continue
            ip_address = socket.inet_ntoa(eth.data.src)

            # Convert packet data to a string of hex digits.
            # NOTE(review): [1:] discards the first whitespace-separated
            # token of the dump. If hexdump.dump() emits no offset column,
            # this drops the first byte of every packet - confirm against
            # the hexdump documentation before changing it.
            packet_string = ''.join(hexdump.dump(packet).split()[1:])
            normalized_entropy = calculate_normalized_entropy(packet_string)

            entropy_info.setdefault(ip_address, []).append(normalized_entropy)
            entropy_values.append(normalized_entropy)

    if not entropy_values:
        # statistics.mean() raises on empty data; report it instead.
        print("No IPv4 packets found; nothing to analyze.")
        return

    # Global average and spread of the normalized entropy.
    global_avg_normalized_entropy = statistics.mean(entropy_values)
    # statistics.stdev() needs at least two samples.
    if len(entropy_values) > 1:
        global_std_dev = statistics.stdev(entropy_values)
    else:
        global_std_dev = 0.0

    # Sources averaging more than one standard deviation above the global
    # mean are reported as outliers.
    outlier_threshold = global_avg_normalized_entropy + global_std_dev

    print(f"Global Average Normalized Entropy: {global_avg_normalized_entropy}")
    print(f"Global Std Dev of Normalized Entropy: {global_std_dev}")

    for ip_address, normalized_entropies in entropy_info.items():
        avg_ip_normalized_entropy = sum(normalized_entropies) / len(normalized_entropies)
        avg_ip_normalized_entropy_str = f"{avg_ip_normalized_entropy:.4f}"
        ip_address_str = f"{ip_address}"
        if avg_ip_normalized_entropy > outlier_threshold:
            avg_ip_normalized_entropy_str = print_colored(avg_ip_normalized_entropy_str, 'red')
            ip_address_str = print_colored(ip_address_str, 'red')
        print(f"IP Address: {ip_address_str}, Average Normalized Entropy: {avg_ip_normalized_entropy_str}")
if __name__ == "__main__":
    # Script entry point. Both settings can be overridden on the command
    # line as `[PCAP_PATH [NUM_PACKETS]]`; without arguments the previous
    # hard-coded defaults are used.
    cli_args = sys.argv[1:]
    pcap_file_path = cli_args[0] if cli_args else "wireshark_Ethernet_2_00229.pcap"
    num_packets_to_process = int(cli_args[1]) if len(cli_args) > 1 else 10000
    process_pcap(pcap_file_path, num_packets_to_process)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment