Last active
March 14, 2025 15:42
-
-
Save leodido/c06cd39c80330a6ff24409afe1349d92 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env bash
#
# Installer for a small updater that allows GitHub webhook source IPs through
# a Hetzner Cloud Firewall. It writes a Python script, a private data
# directory with a config template, and a systemd service + timer.
#
# Privileged steps are run through sudo, so the script itself can be started
# by a regular user.

# -e: abort on unhandled command failure; -u: abort on unset variables;
# pipefail: a pipeline fails when any stage fails (the script pipes generated
# files into `sudo tee`).
set -euo pipefail

# Base name shared by the script, the systemd units, the data dir and the log.
INSTALL_DIR="/usr/local/bin"
NAME="update_hetzner_github_hooks_ips"

# Generated Python updater.
SCRIPT_NAME="${NAME}.py"
SCRIPT_PATH="${INSTALL_DIR}/${SCRIPT_NAME}"

# systemd unit files.
SERVICE_FILE="/etc/systemd/system/${NAME}.service"
TIMER_FILE="/etc/systemd/system/${NAME}.timer"

# Runtime state (config, virtualenv, IP snapshots) and log file.
DATA_DIR="/var/lib/${NAME}"
LOG_FILE="/var/log/${NAME}.log"
CONFIG_FILE="${DATA_DIR}/config.json"
# Private state directory: root-owned and inaccessible to other users, since
# the config file stored inside it holds the Hetzner API token.
printf '%s\n' "Creating data directory at $DATA_DIR..."
sudo mkdir -p "${DATA_DIR}"
sudo chown root:root "${DATA_DIR}"
sudo chmod 700 "${DATA_DIR}"

# Log file: make sure its parent directory exists, create the file if it is
# missing, then restrict it to root read/write.
printf '%s\n' "Creating log directory for $LOG_FILE..."
log_parent=$(dirname "${LOG_FILE}")
sudo mkdir -p "${log_parent}"
sudo touch "${LOG_FILE}"
sudo chown root:root "${LOG_FILE}"
sudo chmod 600 "${LOG_FILE}"
# Create the config file template only when no config exists yet.
#
# The existence check must run through sudo: the data directory is root-owned
# with mode 700, so an unprivileged `[[ -f ... ]]` cannot look inside it and
# always reports "missing" -- which would overwrite an already configured
# file (including the API token) every time the installer is re-run.
if ! sudo test -f "$CONFIG_FILE"; then
  echo "Creating configuration file template at $CONFIG_FILE..."
  # Quoted delimiter: the JSON template is literal, nothing should expand.
  sudo tee "$CONFIG_FILE" > /dev/null << 'EOF'
{
"HETZNER_API_TOKEN": "YOUR_HETZNER_API_TOKEN",
"HETZNER_FIREWALL_ID": "YOUR_HETZNER_FIREWALL_ID",
"PORTS": ["YOUR_PORT"]
}
EOF
  # Root read/write only, because the file will hold a secret.
  sudo chmod 600 "$CONFIG_FILE"
  echo "Please edit $CONFIG_FILE and add your Hetzner API token and firewall ID."
fi
# have CMD -- succeed when CMD is resolvable in the current shell.
have() {
  command -v "$1" &> /dev/null
}

# install_python3 -- install Python 3 with the first package manager found
# (apt-get, dnf, yum, zypper, pacman, in that order). Exits the installer with
# status 1 when none of them is available.
#
# NOTE(review): `python3-venv` is the Debian/Ubuntu package name; confirm it
# resolves on dnf/yum/zypper before relying on those branches.
install_python3() {
  if have apt-get; then
    # Debian/Ubuntu
    echo "Detected Debian/Ubuntu system. Installing required packages..."
    sudo apt-get update
    sudo apt-get install -y python3 python3-pip
    return
  fi

  if have dnf; then
    # Fedora/RHEL/CentOS 8+
    sudo dnf install -y python3 python3-venv
    return
  fi

  if have yum; then
    # CentOS/RHEL 7
    sudo yum install -y python3 python3-venv
    return
  fi

  if have zypper; then
    # openSUSE
    sudo zypper install -y python3 python3-venv
    return
  fi

  if have pacman; then
    # Arch Linux
    sudo pacman -S --noconfirm python python-virtualenv
    return
  fi

  echo "Unable to install Python 3 automatically. Please install Python 3 and venv manually."
  exit 1
}

# Only touch the package manager when python3 is not already on PATH.
if ! have python3; then
  echo "Python 3 is not installed. Installing Python 3..."
  install_python3
fi
# venv_usable -- succeed when python3 can create a working virtual
# environment. On Debian/Ubuntu the venv module can be importable while
# ensurepip (shipped in the pythonX.Y-venv package) is missing, so both
# modules are probed instead of relying on `python3 -m venv --help`.
venv_usable() {
  python3 -c 'import venv, ensurepip' &> /dev/null
}

# For Debian/Ubuntu systems, ensure we have the right venv package.
#
# Every install attempt is allowed to fail (`|| true`): under `set -e` an
# unguarded failure would abort the installer before the next fallback runs.
# Success is judged by venv_usable, not by apt-get's exit status.
if command -v apt-get &> /dev/null; then
  echo "Installing Python virtual environment package..."
  # First try the generic package.
  sudo apt-get install -y python3-venv || true

  # If that did not help, try the package matching the running interpreter.
  if ! venv_usable; then
    PY_VER=$(python3 -c 'import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}")')
    echo "Installing Python $PY_VER virtual environment package..."
    sudo apt-get install -y "python${PY_VER}-venv" || true
  fi

  # Still not working? Try some common versions, stopping at the first one
  # that makes venv usable.
  if ! venv_usable; then
    echo "Trying common Python venv packages..."
    for ver in 3.8 3.9 3.10 3.11 3.12; do
      echo "Trying python$ver-venv..."
      sudo apt-get install -y "python${ver}-venv" || true
      if venv_usable; then
        break
      fi
    done
  fi

  # Fail here with a clear message rather than later during venv creation.
  if ! venv_usable; then
    echo "Unable to install a working Python venv package. Please install it manually." >&2
    exit 1
  fi
fi
# Dedicated virtual environment for the updater, kept inside the data
# directory; the systemd unit runs the script with this interpreter.
VENV_DIR="${DATA_DIR}/venv"
printf '%s\n' "Creating Python virtual environment at $VENV_DIR..."
sudo python3 -m venv "${VENV_DIR}"

# The generated script imports `requests`; install it into the venv only.
printf '%s\n' "Installing required Python packages in virtual environment..."
sudo "${VENV_DIR}/bin/pip" install requests
# Generate the updater script at $SCRIPT_PATH.
#
# The heredoc delimiter is deliberately unquoted so that ${LOG_FILE} and
# ${DATA_DIR} are substituted at install time. Consequently the Python text
# below must not contain a dollar sign, a backtick or a backslash followed by
# one of those, or the shell would rewrite it.
sudo tee "$SCRIPT_PATH" > /dev/null << EOF
#!/usr/bin/env python3
import requests
import json
import logging
import sys
import os
from datetime import datetime

# Paths baked in by the installer.
LOG_FILE = "${LOG_FILE}"
DATA_DIR = "${DATA_DIR}"
CONFIG_FILE = f"{DATA_DIR}/config.json"

# Log to both the log file and stdout.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(LOG_FILE),
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger('hetzner_github_hooks_updater')
def load_config():
    """Read and validate the JSON configuration at CONFIG_FILE.

    Returns the parsed config dict with 'PORTS' normalised to a list that
    contains no placeholder entries.

    Every validation or read failure is logged and terminates the process
    with exit status 1.
    """
    # (key, template placeholder, message logged when unset or still default)
    required = (
        ('HETZNER_API_TOKEN', "YOUR_HETZNER_API_TOKEN",
         f"Missing or default API token in {CONFIG_FILE}. Please set your Hetzner API token."),
        ('HETZNER_FIREWALL_ID', "YOUR_HETZNER_FIREWALL_ID",
         f"Missing or default firewall ID in {CONFIG_FILE}. Please set your Hetzner Firewall ID."),
    )
    try:
        with open(CONFIG_FILE, 'r') as handle:
            settings = json.load(handle)

        for key, placeholder, complaint in required:
            value = settings.get(key)
            if not value or value == placeholder:
                logger.error(complaint)
                sys.exit(1)

        ports = settings.get('PORTS')
        if not ports:
            logger.error(f"Missing PORTS configuration in {CONFIG_FILE}. Please specify at least one port.")
            sys.exit(1)

        # A bare scalar is accepted and wrapped into a one-element list.
        if not isinstance(ports, list):
            logger.info("Converting single port to list format")
            ports = [ports]

        # Drop entries that are still the template placeholder or empty.
        ports = [p for p in ports if str(p) not in ("YOUR_PORT", "")]
        settings['PORTS'] = ports
        if not ports:
            logger.error(f"No valid ports specified in {CONFIG_FILE}. Please specify at least one port.")
            sys.exit(1)

        return settings
    except FileNotFoundError:
        logger.error(f"Configuration file not found at {CONFIG_FILE}")
        sys.exit(1)
    except json.JSONDecodeError:
        logger.error(f"Invalid JSON in configuration file {CONFIG_FILE}")
        sys.exit(1)
    except Exception as e:
        logger.error(f"Error loading configuration: {str(e)}")
        sys.exit(1)
def get_github_hook_ips():
    """Retrieve GitHub webhook IP ranges.

    Fetches https://api.github.com/meta and returns the value of its 'hooks'
    field (a list of CIDR strings).

    Any request or parse failure, and a response with a missing or empty
    'hooks' list, is logged and terminates the process with exit status 1.
    An empty list is treated as an error so the caller never pushes a
    firewall rule set that has no source IPs.
    """
    url = "https://api.github.com/meta"
    logger.info(f"Getting IPs from {url}")
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        meta_data = response.json()
        # Extract only the webhook IPs
        hook_ips = meta_data.get('hooks', [])
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to retrieve GitHub webhook IP ranges: {str(e)}")
        sys.exit(1)
    except json.JSONDecodeError:
        logger.error("Failed to parse GitHub API response as JSON")
        sys.exit(1)
    except Exception as e:
        logger.error(f"Unexpected error retrieving GitHub IPs: {str(e)}")
        sys.exit(1)

    if not hook_ips:
        logger.error("GitHub API response contained no webhook IP ranges; refusing to update the firewall")
        sys.exit(1)

    logger.info(f"Retrieved {len(hook_ips)} webhook IP ranges from GitHub")
    return hook_ips
def whitelist_ips_in_hetzner(ip_ranges, config):
    """Update Hetzner firewall rules with the provided IP ranges.

    Args:
        ip_ranges: list of CIDR strings used as source_ips of every rule.
        config: dict with HETZNER_API_TOKEN, HETZNER_FIREWALL_ID and PORTS
            (a list, or a single port; defaults to [8000] when absent).

    Builds one inbound TCP rule per port and POSTs them to the firewall's
    set_rules action, then stores the IP list via save_current_ips.

    NOTE(review): per the Hetzner Cloud API reference, set_rules replaces the
    complete rule set of the firewall, so rules managed elsewhere on the same
    firewall would be dropped -- confirm this firewall is dedicated.

    A failed request is logged (including the response body when one is
    available) and terminates the process with exit status 1.
    """
    headers = {
        'Authorization': f'Bearer {config["HETZNER_API_TOKEN"]}',
        'Content-Type': 'application/json',
    }
    # Get ports from config, defaulting to [8000] if not specified
    ports = config.get("PORTS", [8000])
    # Validate ports configuration
    if not isinstance(ports, list):
        logger.warning("PORTS configuration is not a list. Converting single port to list.")
        ports = [ports]
    if not ports:
        logger.error("No ports specified in configuration")
        sys.exit(1)

    # Create rules for each port (the payload carries ports as strings)
    rules = []
    for port in ports:
        port_str = str(port)
        rules.append({
            "direction": "in",
            "source_ips": ip_ranges,
            "port": port_str,
            "protocol": "tcp",
            "description": f"Allow GitHub webhook IPs to port {port_str}"
        })
    payload = {
        "rules": rules
    }

    # Log the normalised local list, not config['PORTS'], which may be
    # absent or a scalar.
    logger.info(f"Updating Hetzner Firewall {config['HETZNER_FIREWALL_ID']} with {len(ip_ranges)} IP ranges on ports {ports}")
    try:
        url = f"https://api.hetzner.cloud/v1/firewalls/{config['HETZNER_FIREWALL_ID']}/actions/set_rules"
        logger.debug(f"Sending request to: {url}")
        response = requests.post(
            url,
            headers=headers,
            data=json.dumps(payload),
            timeout=60
        )
        response.raise_for_status()
        ports_str = ', '.join(str(p) for p in ports)
        logger.info(f"GitHub webhook IPs whitelisted successfully in Hetzner Firewall for ports {ports_str}")
        # Save the current IPs for reference
        save_current_ips(ip_ranges)
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to whitelist IPs in Hetzner Firewall: {str(e)}")
        # Compare against None: a Response for a 4xx/5xx status is falsy, so
        # a plain truth test would hide the error body exactly when needed.
        error_response = getattr(e, 'response', None)
        if error_response is not None:
            logger.error(f"Response: {error_response.text}")
        sys.exit(1)
def save_current_ips(ip_ranges):
    """Save the current IP ranges to a file for reference.

    Writes one range per line to DATA_DIR/github_hooks_ips_<timestamp>.txt
    and repoints the github_hooks_ips_latest.txt symlink at that file.

    Best effort: any failure is logged and swallowed, so a problem here does
    not fail an otherwise successful firewall update.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"{DATA_DIR}/github_hooks_ips_{timestamp}.txt"
    try:
        with open(filename, 'w') as file:
            for ip in ip_ranges:
                file.write(f"{ip}\n")
        logger.info(f"Saved current IP ranges to {filename}")
        # Create a symlink to the latest file
        latest_link = f"{DATA_DIR}/github_hooks_ips_latest.txt"
        # lexists, not exists: exists() follows the link and reports False
        # for a dangling symlink, which would make os.symlink fail below.
        if os.path.lexists(latest_link):
            os.remove(latest_link)
        os.symlink(filename, latest_link)
    except Exception as e:
        logger.error(f"Failed to save current IP ranges: {str(e)}")
def cleanup_old_files():
    """Clean up old IP files, keeping only the 5 most recent.

    Considers the github_hooks_ips_<timestamp>.txt snapshots in DATA_DIR;
    the timestamp format makes a reverse name sort newest-first. The
    github_hooks_ips_latest.txt symlink matches the same prefix and suffix
    and would sort ahead of every timestamp, so it is excluded explicitly;
    otherwise it would occupy one of the five retained slots.

    Best effort: failures are logged and swallowed.
    """
    latest_name = 'github_hooks_ips_latest.txt'
    try:
        files = [
            f for f in os.listdir(DATA_DIR)
            if f.startswith('github_hooks_ips_') and f.endswith('.txt') and f != latest_name
        ]
        files.sort(reverse=True)
        # Keep only the 5 most recent files
        for old_file in files[5:]:
            os.remove(os.path.join(DATA_DIR, old_file))
            logger.info(f"Cleaned up old file: {old_file}")
    except Exception as e:
        logger.error(f"Error during cleanup: {str(e)}")
def main():
    """Run one update cycle: load config, fetch IPs, push rules, prune files.

    Any exception that reaches this level is logged and turned into exit
    status 1. The helpers called here exit on their own for failures they
    handle themselves.
    """
    logger.info("Starting GitHub webhook IPs updater for Hetzner Firewall")
    try:
        settings = load_config()
        hook_ranges = get_github_hook_ips()

        # Summary at INFO level, individual ranges only at DEBUG level.
        logger.info(f"Whitelisting {len(hook_ranges)} GitHub webhook IP ranges")
        for cidr in hook_ranges:
            logger.debug(f"IP range: {cidr}")

        whitelist_ips_in_hetzner(hook_ranges, settings)
        cleanup_old_files()
        logger.info("GitHub webhook IPs updater completed successfully")
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        sys.exit(1)


if __name__ == "__main__":
    main()
EOF

# Mark the generated updater as executable.
printf '%s\n' "Setting execute permissions for $SCRIPT_PATH..."
sudo chmod +x "${SCRIPT_PATH}"
# Write the oneshot service unit. The heredoc is unquoted on purpose:
# $VENV_DIR and $SCRIPT_PATH are expanded now, so ExecStart runs the updater
# with the virtualenv's interpreter.
printf '%s\n' "Writing systemd service file: $SERVICE_FILE..."
sudo tee "${SERVICE_FILE}" > /dev/null << EOF
[Unit]
Description=Update GitHub Webhook IPs in Hetzner Firewall
After=network-online.target
Wants=network-online.target
[Service]
Type=oneshot
ExecStart=$VENV_DIR/bin/python $SCRIPT_PATH
PrivateTmp=true
ProtectSystem=full
ProtectHome=true
NoNewPrivileges=true
[Install]
WantedBy=multi-user.target
EOF
# Write the timer unit: daily schedule with a randomized delay, and
# Persistent=true set. $NAME is expanded into the description at install time.
printf '%s\n' "Writing systemd timer file: $TIMER_FILE..."
sudo tee "${TIMER_FILE}" > /dev/null << EOF
[Unit]
Description=Run $NAME.service daily
After=network-online.target
[Timer]
OnCalendar=daily
RandomizedDelaySec=1hour
Persistent=true
[Install]
WantedBy=timers.target
EOF
# Make systemd pick up the unit files that were just written.
printf '%s\n' "Reloading systemd daemon..."
sudo systemctl daemon-reload

# Enable the timer so it is set up at boot...
printf '%s\n' "Enabling systemd timer..."
sudo systemctl enable "${NAME}.timer"

# ...and start it right away for the current boot.
printf '%s\n' "Starting the timer..."
sudo systemctl start "${NAME}.timer"

# Closing instructions for the operator.
cat << EOF
Installation complete. Please edit $CONFIG_FILE to add your Hetzner API token and firewall ID.
You can check the timer status with: systemctl status $NAME.timer
You can start the service manually with: sudo systemctl start $NAME.service
The timer has been started and will run the service automatically daily.
EOF
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment