Created
May 1, 2025 18:29
-
-
Save Afeez1131/ea107ad63dec633faa94d09c05270793 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from django.db import models | |
from django.utils import timezone | |
from django.core.validators import MinValueValidator, MaxValueValidator | |
class ThreatCategory(models.Model): | |
name = models.CharField(max_length=255) | |
abbreviation = models.CharField(max_length=50, blank=True, null=True) | |
def __str__(self): | |
return self.name | |
class Threat(models.Model): | |
name = models.CharField(max_length=200) | |
abbreviation = models.CharField(max_length=50, blank=True, null=True) | |
cve_id = models.CharField(max_length=200) | |
category = models.ForeignKey(ThreatCategory, on_delete=models.CASCADE, related_name="threats", null=True, blank=True) | |
published_date = models.DateTimeField(null=True, blank=True) | |
publish_year = models.IntegerField( | |
validators=[ | |
MinValueValidator(1900), | |
MaxValueValidator(2050) | |
] | |
) | |
severity = models.DecimalField(decimal_places=2, max_digits=5, null=True, blank=True) | |
references = models.TextField(blank=True) | |
description = models.TextField() | |
created_at = models.DateTimeField(auto_now_add=True) | |
updated_at = models.DateTimeField(auto_now=True) | |
def __str__(self): | |
return self.name | |
class ISO27002Control(models.Model): | |
control_id = models.CharField(max_length=50) | |
name = models.CharField(max_length=255) | |
def __str__(self): | |
return f"{self.control_id} - {self.name}" | |
class ISAIECRequirement(models.Model): | |
requirement = models.CharField(max_length=255) | |
def __str__(self): | |
return self.requirement | |
class NISTCSF(models.Model): | |
description = models.TextField(null=True, blank=True) | |
def __str__(self): | |
return f"{self.description[:50]}" | |
class ControlMapping(models.Model): | |
threat = models.ForeignKey(Threat, on_delete=models.CASCADE, related_name='mappings') | |
iso_control = models.ForeignKey(ISO27002Control, on_delete=models.CASCADE, related_name='mappings', null=True, blank=True) | |
isa_iec = models.ForeignKey(ISAIECRequirement, on_delete=models.CASCADE, related_name='mappings', null=True, blank=True) | |
nist_csf = models.ForeignKey(NISTCSF, on_delete=models.CASCADE, related_name='mappings', null=True, blank=True) | |
class Meta: | |
unique_together = ('threat', 'iso_control', 'isa_iec', 'nist_csf') | |
def __str__(self): | |
return f"{self.threat} | {self.iso_control} | {self.isa_iec}" | |
class Activity(models.Model): | |
ACTIVITY_TYPES = ( | |
('add', 'Added'), | |
('update', 'Updated'), | |
('delete', 'Deleted'), | |
('create', 'Created'), | |
('fetch', 'Fetched Data'), | |
) | |
activity_type = models.CharField(max_length=20, choices=ACTIVITY_TYPES) | |
category = models.CharField(max_length=100, help_text="e.g., 'CMMC 2.0 Standard', 'Mastercard'") | |
description = models.TextField() | |
timestamp = models.DateTimeField(default=timezone.now) | |
source = models.CharField(max_length=100, blank=True, null=True, help_text="Source of data if applicable") | |
class Meta: | |
ordering = ['-timestamp'] | |
verbose_name_plural = "Activities" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import os | |
import re | |
import requests | |
from core.models import NISTCSF, Activity, ISAIECRequirement, ISO27002Control | |
import zipfile | |
from os.path import isfile, join | |
import pandas as pd | |
from core.models import Threat, ControlMapping | |
from django.db.models import Q | |
THREATS = [ | |
"Adware", "ARP Spoofing", "ATM Skimming", "Baiting", "Botnets", | |
"Brute Force Attack", "Business Email Compromise", "Cloud Data Leakage", | |
"Cloud Misconfiguration", "Command Injection", | |
"Compromised Hardware", "Container Escape", "Credential Stuffing", | |
"Cross-Site Request Forgery", "Cross-Site Scripting", "Denial of Service", | |
"Dictionary Attack", "Distributed Denial of Service", "DNS Spoofing Poisoning", | |
"Eavesdropping", | |
"Firmware Exploits", "Hyperjacking", "IoT Botnets", "Keyloggers", | |
"Malicious Insider", "Man-in-the-Middle", "Negligent Insider", "Phishing", | |
"Port Scanning", "Pretexting", | |
"Privilege Escalation", "Rainbow Table Attack", "Ransomware", | |
"Remote Code Execution", "Rootkits", "Side-Channel Attacks", | |
"Smishing (SMS Phishing)", "Software Dependency Attacks", "Spear Phishing", | |
"Spyware", | |
"SQL Injection", "Third-Party Risks", "Third-Party Software Exploits", | |
"Trojans", "Unsecured IoT Devices", "USB-Based Attacks", | |
"Vishing (Voice Phishing)", "Viruses", "Whaling", "Worms", | |
"Zero-Day Exploits" | |
] | |
def write_threats_to_json(): | |
""" | |
Fetches threat data from the NVD API and writes it to JSON files. | |
Each threat type will be saved in a separate JSON file. | |
""" | |
os.makedirs("db_threats", exist_ok=True) | |
for threat in THREATS: | |
filename = f"db_threats/{threat}.json" | |
if os.path.exists(filename): | |
print(f"Skipping {threat} — file already exists.") | |
continue | |
print(f"Searching for {threat}") | |
items = search_values(threat) | |
with open(filename, "w", encoding="utf-8") as f: | |
json.dump(items, f, indent=2, ensure_ascii=False) | |
print(f"Wrote {len(items)} items to {filename}") | |
def unzip_data(): | |
zip_dir = 'zip' | |
unzip_dir = 'json' | |
os.makedirs(unzip_dir, exist_ok=True) | |
for filename in os.listdir(zip_dir): | |
if filename.endswith('.zip'): | |
zip_path = os.path.join(zip_dir, filename) | |
print(f"Extracting: {filename}") | |
try: | |
with zipfile.ZipFile(zip_path, 'r') as zip_ref: | |
zip_ref.extractall(unzip_dir) | |
print(f"Extracted {filename} to {unzip_dir}/") | |
except zipfile.BadZipFile as e: | |
print(f"Failed to extract {filename}: {e}") | |
print("Extraction complete.") | |
def contains_word(s, w): | |
return ('' + w.lower() + '') in ('' + s.lower() + '') | |
def create_nvd_dict(year): | |
filename = join(f"json/nvdcve-1.1-{str(year)}.json") | |
if not isfile(filename): | |
print(f"File not found: {filename}") | |
return None | |
# print("Opening: " + filename) | |
with open(filename, encoding='utf8') as json_file: | |
return json.load(json_file) | |
def search_values(expression): | |
_list = os.listdir("json/") | |
number_files = len(_list) | |
matching_reports = [] | |
for year in range(2020, 2026): | |
cve_dict = create_nvd_dict(year) | |
if not cve_dict: | |
print(f"Skipping year {year} due to missing file.") | |
continue | |
CVE_Items = cve_dict['CVE_Items'] | |
for item in CVE_Items: | |
description_list = item['cve']['description']['description_data'] | |
if description_list: | |
description = description_list[0]['value'] | |
if contains_word(description, expression): | |
cvss_v2_score = item.get('impact', {}).get('baseMetricV2', {}).get('cvssV2', {}).get('baseScore', None) | |
if cvss_v2_score is None: | |
cvss_v2_score = item.get('impact', {}).get('baseMetricV3', {}).get('cvssV3', {}).get('baseScore', None) | |
result = { | |
'threat': expression, | |
'CVE_ID': item['cve']['CVE_data_meta']['ID'], | |
'description': description, | |
'published_date': item.get('publishedDate', ''), | |
'year': year, | |
'references': [ref['url'] for ref in item['cve']['references']['reference_data']], | |
'severity':cvss_v2_score | |
} | |
matching_reports.append(result) | |
return (matching_reports) | |
def get_nvd_data(): | |
base_url = 'https://nvd.nist.gov/feeds/json/cve/1.1/' | |
filenames = ['nvdcve-1.1-2025.json.zip', 'nvdcve-1.1-2024.json.zip', 'nvdcve-1.1-2023.json.zip', | |
'nvdcve-1.1-2022.json.zip'] | |
# filenames = re.findall(r"nvdcve-1.1-[0-9]+\.json\.zip", response.text) | |
headers = {'User-Agent': 'Mozilla/5.0'} | |
os.makedirs('zip', exist_ok=True) | |
for filename in filenames: | |
print(f"Downloading: {filename}") | |
zip_url = f"{base_url}{filename}" | |
try: | |
response = requests.get(zip_url, stream=True, headers=headers) | |
response.raise_for_status() | |
with open(os.path.join('zip', filename), 'wb') as f: | |
for chunk in response.iter_content(chunk_size=8192): | |
if chunk: | |
f.write(chunk) | |
except requests.RequestException as e: | |
print(f"Failed to download {filename}: {e}") | |
print("Download complete.") | |
def log_activity(activity_type, category, description, source=None): | |
""" | |
Utility function to log activities throughout the application | |
Args: | |
activity_type (str): Type of activity ('add', 'update', 'delete', 'create', 'fetch') | |
category (str): Category of the activity (e.g., 'CMMC 2.0 Standard') | |
description (str): Detailed description of the activity | |
source (str, optional): Source of the activity (e.g., API URL, user) | |
Returns: | |
Activity: The created Activity instance | |
""" | |
return Activity.objects.create( | |
activity_type=activity_type, | |
category=category, | |
description=description, | |
source=source | |
) | |
def process_control_mapping(file): | |
file_name = file.name.lower() | |
print(f"Processing file: {file_name}") | |
if not file_name.endswith('.xlsx'): | |
print("Unsupported file format.") | |
return | |
try: | |
df = pd.read_excel(file, engine='openpyxl') | |
print(f"Number of rows loaded: {len(df)}") | |
records = df.to_dict('records') | |
to_create = [] | |
print('records: ', records) | |
for row in records: | |
# Normalize field access | |
threat_name = row.get('Threat') or row.get('threat') or row.get('Threat Name') or row.get('Threat Name ') | |
threat_abbreviation = row.get('Threat Abbreviation') or row.get('threat abbreviation') | |
iso_27001_id = row.get('ISO 27001:2022 CONTROL ID') | |
iso_27001_control_name = row.get('ISO 27001:2022 Control Name') | |
isa_iec_62443_requirement = row.get('ISA/IEC 62443-3-3 Requirement') | |
nist_csf_value = row.get('NIST CSF') | |
print(f"Processing row with threat: {threat_name}") | |
if not threat_name: | |
print(f"Missing required fields in threat name: {row}") | |
continue | |
try: | |
# Find threat by name only | |
threat_name = threat_name.strip() | |
threat = Threat.objects.filter( | |
Q(name__iexact=threat_name) | |
).first() | |
# Update abbreviation if needed | |
if threat and threat_abbreviation and threat.abbreviation != threat_abbreviation: | |
threat.abbreviation = threat_abbreviation | |
threat.save() | |
print(f"Updated threat abbreviation: {threat_name} to {threat_abbreviation}") | |
# Create threat if it doesn't exist | |
if not threat: | |
print(f"Threat '{threat_name}' not found, creating new threat") | |
continue | |
# Convert ISO ID to string to avoid floating point issues | |
iso_id_str = str(iso_27001_id) if iso_27001_id and not pd.isna(iso_27001_id) else None | |
# Get or create ISO control | |
if iso_id_str and iso_27001_control_name: | |
iso2700, iso_created = ISO27002Control.objects.get_or_create( | |
control_id=iso_id_str, | |
defaults={"name": iso_27001_control_name} | |
) | |
if iso_created: | |
print(f"Created new ISO control: {iso_id_str}") | |
else: | |
iso2700 = None | |
# Get or create ISA/IEC requirement | |
if isa_iec_62443_requirement and not pd.isna(isa_iec_62443_requirement): | |
isa_iec_62443_requirement = str(isa_iec_62443_requirement) | |
else: | |
isa_iec_62443_requirement = None | |
if isa_iec_62443_requirement: | |
isa_iec, isa_created = ISAIECRequirement.objects.get_or_create( | |
requirement=isa_iec_62443_requirement | |
) | |
if isa_created: | |
print(f"Created new ISA/IEC requirement: {isa_iec_62443_requirement}") | |
else: | |
isa_iec = None | |
# Handle NIST CSF (which can be None) | |
nist_csf = None | |
if nist_csf_value and not pd.isna(nist_csf_value): | |
nist_csf, nist_created = NISTCSF.objects.get_or_create( | |
description=nist_csf_value | |
) | |
if nist_created: | |
print(f"Created new NIST CSF: {nist_csf_value}") | |
else: | |
nist_csf = None | |
print(f"Resolved objects: threat={threat}, iso={iso2700}, isa_iec={isa_iec}, nist_csf={nist_csf}") | |
# Verify required objects exist | |
# if not all([threat, iso2700, isa_iec]): | |
# print(f"Missing required objects for row: {row}") | |
# continue | |
print(f'Processing mapping for threat: {isa_iec}') | |
# Check for existing mapping | |
existing_mapping = ControlMapping.objects.filter( | |
threat=threat, | |
iso_control=iso2700, | |
isa_iec=isa_iec | |
).first() | |
if existing_mapping: | |
print(f"Mapping already exists for {threat_name} - {iso_27001_control_name}") | |
continue | |
# Add to bulk creation list | |
mapping = ControlMapping( | |
threat=threat, | |
iso_control=iso2700, | |
isa_iec=isa_iec, | |
nist_csf=nist_csf | |
) | |
to_create.append(mapping) | |
print(f"Prepared mapping for creation: {threat_name} - {iso_27001_control_name}") | |
except Exception as e: | |
print(f"Error processing row {row}: {str(e)}") | |
continue | |
# Bulk create mappings | |
if to_create: | |
print('in the to create: ', to_create) | |
ControlMapping.objects.bulk_create(to_create) | |
print(f"✅ Successfully created {len(to_create)} new control mappings.") | |
else: | |
print("No new mappings to create.") | |
except Exception as e: | |
print(f"Error processing file: {str(e)}") | |
def get_2700_data(threat): | |
qs = ControlMapping.objects.filter( | |
threat__name=threat.name, | |
iso_control__isnull=False | |
).values( | |
'iso_control__control_id', 'iso_control__name' | |
) | |
seen = set() | |
results = [] | |
for item in qs: | |
key = (item['iso_control__control_id'], item['iso_control__name']) | |
if key not in seen: | |
seen.add(key) | |
results.append({ | |
"control_id": item['iso_control__control_id'] or '', | |
"name": item['iso_control__name'], | |
}) | |
return results | |
def get_isaiec_data(threat): | |
qs = ControlMapping.objects.filter( | |
threat__name=threat.name, | |
isa_iec__isnull=False | |
).values( | |
'isa_iec__requirement' | |
) | |
seen = set() | |
results = [] | |
for item in qs: | |
requirement = item['isa_iec__requirement'] | |
if requirement not in seen: | |
seen.add(requirement) | |
results.append({"requirement": requirement}) | |
return results | |
def get_nist_data(threat): | |
qs = ControlMapping.objects.filter( | |
threat__name=threat.name, | |
nist_csf__isnull=False | |
).values( | |
'nist_csf__description' | |
) | |
seen = set() | |
results = [] | |
for item in qs: | |
description = item['nist_csf__description'] | |
if description not in seen: | |
seen.add(description) | |
results.append({"description": description}) | |
return results |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment