Created
June 7, 2025 15:16
-
-
Save vporton/9a92e414ed963c7c4e755b971f513999 to your computer and use it in GitHub Desktop.
Protect from DoS attacks: Script to allow (by UFW firewall) your server HTTPS (port 443) to be accessed only from CloudFront
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import re | |
import subprocess | |
import json | |
import urllib.request | |
import shutil | |
import os | |
import sys | |
# Variables | |
PROTOCOL = "tcp" | |
PORT = "443" | |
IP_RANGES_FILE = "cloudfront_ips.json" | |
UFW_RULE_COMMENT = "CloudFront HTTPS" | |
def run_command(command, check=True): | |
"""Run a shell command and handle errors.""" | |
result = subprocess.run(command, shell=True, capture_output=True, text=True) | |
if check and result.returncode != 0: | |
print(f"Error: {result.stderr.strip()}") | |
return False | |
return result | |
def check_ufw_installed(): | |
"""Check if UFW is installed.""" | |
if not shutil.which("/usr/sbin/ufw"): | |
print("Error: UFW is not installed. Install it with 'sudo apt-get install ufw'.") | |
sys.exit(1) | |
def set_default_policy(): | |
"""Set default incoming policy to allow.""" | |
print("Setting default incoming policy...") | |
if run_command("sudo ufw default allow incoming"): | |
print("Default incoming policy set to allow.") | |
else: | |
print("Failed to set default incoming policy.") | |
sys.exit(1) | |
def get_current_ufw_rules(): | |
"""Get current UFW rules for the specified port and protocol.""" | |
result = run_command("sudo ufw status numbered") | |
if not result: | |
return [] | |
rules = [] | |
for line in result.stdout.splitlines(): | |
if f"{PORT}/{PROTOCOL}" in line and UFW_RULE_COMMENT in line and line.find(' (v6) ') == -1: | |
rule_number = line.split("]")[0].strip("[").strip() | |
ip_range = re.split(" +", re.sub(r"^\[.*?\] +", "", line))[3].strip() | |
#print('ip_range:', ip_range) | |
if ip_range.find("/") == -1: | |
ip_range = ip_range + "/32" | |
rules.append({"number": rule_number, "ip_range": ip_range}) | |
return rules | |
def fetch_cloudfront_ips(): | |
"""Fetch CloudFront IP ranges from AWS.""" | |
print("Fetching CloudFront IP ranges...") | |
try: | |
with urllib.request.urlopen("https://d7uri8nf7uskq.cloudfront.net/tools/list-cloudfront-ips") as response: | |
with open(IP_RANGES_FILE, "w") as f: | |
f.write(response.read().decode()) | |
with open(IP_RANGES_FILE) as f: | |
data = json.load(f) | |
return data.get("CLOUDFRONT_GLOBAL_IP_LIST", []) + data.get("CLOUDFRONT_REGIONAL_EDGE_IP_LIST", []) | |
except Exception as e: | |
print(f"Error: Could not retrieve CloudFront IP ranges: {e}") | |
return None | |
finally: | |
if os.path.exists(IP_RANGES_FILE): | |
os.remove(IP_RANGES_FILE) | |
def delete_obsolete_rules(current_rules, cloudfront_ips): | |
"""Delete UFW rules for IPs not in the CloudFront IP list.""" | |
print(f"Checking for obsolete UFW rules for {PORT}/{PROTOCOL}...") | |
for rule in current_rules: | |
if rule["ip_range"] not in cloudfront_ips: | |
print(f"Deleting rule #{rule['number']} for {rule['ip_range']}") | |
if not run_command(f"sudo ufw --force delete {rule['number']}"): | |
print(f"Failed to delete rule #{rule['number']}") | |
else: | |
print(f"Deleted rule #{rule['number']} for {rule['ip_range']}") | |
def add_missing_rules(current_rules, cloudfront_ips): | |
"""Add UFW rules for missing CloudFront IP ranges.""" | |
print(f"Adding missing UFW rules for {PORT}/{PROTOCOL}...") | |
current_ips = {rule["ip_range"] for rule in current_rules} | |
for ip_range in cloudfront_ips: | |
if ip_range not in current_ips: | |
print(f"Adding rule for {ip_range}") | |
command = f"sudo ufw allow from {ip_range} to any port {PORT} proto {PROTOCOL} comment '{UFW_RULE_COMMENT}'" | |
if run_command(command): | |
print(f"Added rule for {ip_range}") | |
else: | |
print(f"Failed to add rule for {ip_range}") | |
def deny_other_ips(): | |
"""Deny TCP port for all other IPs if not already denied.""" | |
result = run_command("sudo ufw status") | |
if f"{PORT}/{PROTOCOL}" not in result.stdout or "DENY" not in result.stdout: | |
print(f"Denying TCP port {PORT} for all other IPs...") | |
if run_command(f"sudo ufw deny {PORT}/{PROTOCOL}"): | |
print(f"Denied TCP port {PORT} for non-CloudFront IPs.") | |
else: | |
print(f"Failed to deny TCP port {PORT} for non-CloudFront IPs.") | |
def main(): | |
check_ufw_installed() | |
set_default_policy() | |
current_rules = get_current_ufw_rules() | |
current_rules.reverse() | |
cloudfront_ips = fetch_cloudfront_ips() | |
# if not cloudfront_ips: | |
# sys.exit(1) | |
delete_obsolete_rules(current_rules, cloudfront_ips) | |
add_missing_rules(current_rules, cloudfront_ips) | |
deny_other_ips() | |
print("Current UFW status:") | |
run_command("sudo ufw status", check=False) | |
print("UFW configuration completed.") | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment