Skip to content

Instantly share code, notes, and snippets.

@craftsman-expert
Last active January 23, 2025 09:54
Show Gist options
  • Save craftsman-expert/6e17febf333d99a3d5bc92e39a72b350 to your computer and use it in GitHub Desktop.
Save craftsman-expert/6e17febf333d99a3d5bc92e39a72b350 to your computer and use it in GitHub Desktop.
import time
import re
import subprocess
from collections import defaultdict
import requests
from datetime import datetime, timedelta
NGINX_LOG = "/var/log/nginx/access.log"
BLOCK_FILE = "/etc/nginx/vhosts-includes/bots-block.conf"
LIMIT = 500
BLOCK_DURATION = 1800
TELEGRAM_BOT_TOKEN = "8190209959:AAEWat2nyd2PZwNNdxF5JNOTbuZ-caWEq8g"
TELEGRAM_CHAT_ID = "-1002234113662"
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
WHITELIST_USER_AGENTS = [
"Googlebot",
"YandexBot",
"YandexMetrika",
"Mail.RU_Bot",
"Bingbot",
"DuckDuckBot",
"Yahoo! Slurp",
"SputnikBot",
"RamblerBot"
]
blocked_ips_cache = set()
def log_message(message, level="INFO"):
colors = {
"INFO": "\033[94m", # Blue
"WARNING": "\033[93m", # Yellow
"ERROR": "\033[91m", # Red
"ENDC": "\033[0m" # Reset
}
color = colors.get(level, "\033[0m")
print(f"{color}[{level}] {message}{colors['ENDC']}")
def send_telegram_message(message):
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
payload = {"chat_id": TELEGRAM_CHAT_ID, "text": message}
try:
response = requests.post(url, json=payload)
if response.status_code != 200:
log_message(f"Ошибка при отправке сообщения в Телеграм: {response.text}", "ERROR")
except Exception as e:
log_message(f"Ошибка при соединении с Телеграм: {e}", "ERROR")
def block_ip(ip, user_agent):
if ip in blocked_ips_cache:
log_message(f"IP {ip} уже заблокирован.", "WARNING")
return
try:
timestamp = datetime.now().strftime(DATE_FORMAT)
with open(BLOCK_FILE, "a") as f:
f.write(f"deny {ip}; # Заблокирован: {timestamp}\n")
subprocess.run(["sudo", "nginx", "-t"], check=True)
subprocess.run(["sudo", "nginx", "-s", "reload"], check=True)
blocked_ips_cache.add(ip)
message = f"IP {ip} заблокирован за превышение лимита запросов на 30 минут.\nUser-Agent: {user_agent}"
send_telegram_message(message)
log_message(f"IP {ip} был заблокирован. User-Agent: {user_agent}", "INFO")
except Exception as e:
log_message(f"Ошибка при блокировке IP {ip}: {e}", "ERROR")
def remove_expired_blocks():
try:
now = datetime.now()
updated_lines = []
removed_ips = []
with open(BLOCK_FILE, "r") as f:
for line in f:
if "# Заблокирован:" in line:
match = re.search(r"# Заблокирован: (.+)$", line)
if match:
block_time = datetime.strptime(match.group(1).strip(), DATE_FORMAT)
if now - block_time > timedelta(seconds=BLOCK_DURATION):
ip_match = re.match(r"deny (\d+\.\d+\.\d+\.\d+);", line)
if ip_match:
removed_ips.append(ip_match.group(1))
blocked_ips_cache.discard(ip_match.group(1))
continue
updated_lines.append(line)
if updated_lines:
with open(BLOCK_FILE, "w") as f:
f.writelines(updated_lines)
subprocess.run(["sudo", "nginx", "-t"], check=True)
subprocess.run(["sudo", "nginx", "-s", "reload"], check=True)
if removed_ips:
send_telegram_message(f"Разблокированы IP-адреса: {', '.join(removed_ips)}")
log_message(f"Разблокированы IP-адреса: {', '.join(removed_ips)}", "INFO")
except Exception as e:
log_message(f"Ошибка при удалении блокировок: {e}", "ERROR")
def send_block_summary():
try:
blocked_ips = []
with open(BLOCK_FILE, "r") as f:
for line in f:
if "deny" in line and "# Заблокирован:" in line:
ip_match = re.match(r"deny (\d+\.\d+\.\d+\.\d+);", line)
if ip_match:
blocked_ips.append(ip_match.group(1))
total_blocked = len(blocked_ips)
if blocked_ips:
if total_blocked > 10:
displayed_ips = blocked_ips[:10]
message = (f"Текущие заблокированные IP-адреса (показаны 10 из {total_blocked}):\n"
f"{', '.join(displayed_ips)}")
else:
message = f"Текущие заблокированные IP-адреса:\n{', '.join(blocked_ips)}"
send_telegram_message(message)
log_message(f"Отправлена сводка: {message}", "INFO")
else:
send_telegram_message("Нет заблокированных IP-адресов.")
log_message("Нет заблокированных IP-адресов.", "INFO")
except Exception as e:
log_message(f"Ошибка при отправке сводки блокировок: {e}", "ERROR")
def monitor_logs():
ip_counts = defaultdict(lambda: {"count": 0, "user_agent": ""})
last_check_time = time.time()
with open(NGINX_LOG, "r") as log_file:
log_file.seek(0, 2)
log_message("Мониторинг логов Nginx запущен.", "INFO")
while True:
line = log_file.readline()
if not line:
time.sleep(0.1)
continue
match = re.match(r'^(\d+\.\d+\.\d+\.\d+).+\"[A-Z]+ [^\"]+\" \d+ \d+ \"[^\"]*\" \"([^\"]+)\"', line)
if match:
ip = match.group(1)
user_agent = match.group(2)
if any(bot in user_agent for bot in WHITELIST_USER_AGENTS):
log_message(f"Игнорируется User-Agent из белого списка: {user_agent}", "INFO")
continue
ip_counts[ip]["count"] += 1
ip_counts[ip]["user_agent"] = user_agent
current_time = time.time()
if current_time - last_check_time >= 60:
log_message("Проверка лимитов запросов.", "INFO")
for ip, data in list(ip_counts.items()):
if data["count"] > LIMIT:
block_ip(ip, data["user_agent"])
del ip_counts[ip]
last_check_time = current_time
remove_expired_blocks()
if __name__ == "__main__":
try:
remove_expired_blocks()
send_telegram_message("Скрипт мониторинга логов Nginx запущен.")
send_block_summary()
monitor_logs()
except KeyboardInterrupt:
log_message("Мониторинг остановлен.", "WARNING")
pass
@craftsman-expert
Copy link
Author

Скрипт используется для автоматической защиты веб-сервера Nginx от DoS-атак и подозрительной активности. Он выявляет IP-адреса, которые отправляют слишком много запросов за короткое время, и временно блокирует их. Это позволяет снизить нагрузку на сервер и защитить его от перегрузок.

  1. Защита Nginx от DoS-атак:

    • Автоматически блокирует IP-адреса, превышающие заданный лимит запросов (500 запросов в минуту).
  2. Управление блокировкой:

    • Добавляет IP-адреса в конфигурационный файл блокировок Nginx.
    • Применяет изменения с помощью мягкой перезагрузки Nginx, без остановки сервера.
  3. Удаление устаревших блокировок:

    • Автоматически снимает блокировку через 30 минут, чтобы освободить место для новых записей.
  4. Интеграция с Telegram:

    • Уведомляет администратора о заблокированных и разблокированных IP.
    • Отправляет сводку заблокированных адресов при запуске скрипта.
  5. Белый список:

    • Исключает поисковых роботов (Googlebot, YandexBot и др.) из обработки, чтобы избежать их блокировки.
  6. Отслеживание логов Nginx:

    • Постоянно мониторит файл логов /var/log/nginx/access.log для выявления подозрительных IP.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment