Skip to content

Instantly share code, notes, and snippets.

@Saren-Arterius
Last active September 4, 2025 02:37
Show Gist options
  • Save Saren-Arterius/7996592b9a69a427b8d1c1405e3e182e to your computer and use it in GitHub Desktop.
Save Saren-Arterius/7996592b9a69a427b8d1c1405e3e182e to your computer and use it in GitHub Desktop.
import os
import glob
import json
import time
import sqlite3
import subprocess
import re
from openai import OpenAI
import ipaddress
import logging
from collections import defaultdict, deque
import fnmatch
# --- Dependencies ---
# This script requires the inotify_simple library.
# Install it using: pip install inotify-simple
try:
    from inotify_simple import INotify, flags
except ImportError:
    logging.critical("inotify_simple is not installed. Please run: pip install inotify-simple")
    # The bare `exit` helper is injected by the `site` module and is missing
    # under `python -S` or in frozen builds; raising SystemExit always works.
    raise SystemExit(1) from None
# --- Configuration ---
# NOTE: Adjust these paths and settings to match your environment.
# Glob selecting the nginx log files to monitor; the directory part of this
# pattern is also the directory watched with inotify for new log files.
LOG_DIR_GLOB = "/var/log/nginx/saren/wtako.net/*.log"
# JSON file holding, per log file, its server name, read cursor (byte offset)
# and LLM context string. A relative path resolves against the working directory.
CONFIG_FILE = "log_monitor_config.json"
# SQLite database containing the `blacklist` table (relative path, see above).
DB_FILE = "blacklist.db"
# Destination of the generated nginx `geo $is_denied { ... }` block.
NGINX_DENY_LIST = "/etc/nginx/conf.d/blacklist.conf"
# Drop-box file: write one IP per line to have it removed from the blacklist.
# The file is emptied after each processing pass.
UNBAN_FILE = "/tmp/nginx-unban-ips.txt"
# IMPORTANT: Set your API key as an environment variable for security.
# export OPENROUTER_API_KEY="your_key_here"
API_KEY = os.getenv("OPENROUTER_API_KEY", "")
API_BASE_URL = "https://openrouter.ai/api/v1"
LLM_MODEL = "moonshotai/kimi-k2:free"
# Batching parameters
# Maximum number of log lines sent in one LLM query; reaching this many queued
# lines also triggers a query immediately.
MAX_QUEUE_SIZE = 100
# A non-empty queue is flushed once this many seconds have passed since the
# previous LLM batch was processed.
MAX_WAIT_SECONDS = 60
# Interval for inotify timeout; acts as a fallback check interval.
POLLING_INTERVAL_SECONDS = 5
# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# --- IP Filtering ---
# Private IP ranges to be excluded from logs sent to LLM.
# is_private_ip() consults this list both when queueing log lines and when
# validating IPs returned by the LLM.
PRIVATE_IP_NETWORKS = [
    ipaddress.ip_network('10.0.0.0/8'), # Private
    ipaddress.ip_network('172.16.0.0/12'), # Private
    ipaddress.ip_network('192.168.0.0/16'), # Private
    ipaddress.ip_network('100.64.0.0/10'), # Carrier-Grade NAT
    ipaddress.ip_network('127.0.0.0/8'), # Loopback
]
# Global variable to store the dynamic public IP (if applicable).
# Stays None until update_current_public_ip() detects an address.
CURRENT_PUBLIC_IP = None
def get_external_ip_from_ifconfig():
"""Tries to get the external IP address by parsing ifconfig output.
Assumes 'ext1' is the interface name.
"""
try:
result = subprocess.run(['ifconfig', 'ext1'], capture_output=True, text=True, check=True)
# Regex to find IPv4 address (e.g., inet 123.45.67.89)
match = re.search(r'inet (\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', result.stdout)
if match:
logging.info(f"Detected external IP from ifconfig ext1: {match.group(1)}")
return match.group(1)
except FileNotFoundError:
logging.warning("ifconfig command not found. Cannot determine external IP via ifconfig.")
except subprocess.CalledProcessError as e:
logging.warning(f"Error running ifconfig ext1: {e.stderr.strip()}")
except Exception as e:
logging.warning(f"Unexpected error when getting external IP from ifconfig: {e}")
return None
def update_current_public_ip():
    """Refresh the module-level CURRENT_PUBLIC_IP from the detected address.

    The global is left untouched when detection yields nothing or the
    detected address equals the stored one.
    """
    global CURRENT_PUBLIC_IP
    detected = get_external_ip_from_ifconfig() # Or requests.get("https://api.ipify.org").text
    if not detected or detected == CURRENT_PUBLIC_IP:
        return
    logging.info(f"Public IP updated from {CURRENT_PUBLIC_IP} to {detected}")
    CURRENT_PUBLIC_IP = detected
def is_private_ip(ip_str: str) -> bool:
    """Return True when an address must be ignored by the monitor.

    An address is ignored when it is unparsable, private/reserved (IPv4 or
    IPv6, including unique-local fc00::/7), multicast, unspecified, loopback,
    link-local, inside one of PRIVATE_IP_NETWORKS, or equal to
    CURRENT_PUBLIC_IP. IPv4-mapped IPv6 addresses (``::ffff:a.b.c.d``) are
    judged by the IPv4 address they embed.

    Args:
        ip_str: Textual IPv4 or IPv6 address.

    Returns:
        True if the address should be excluded, False if it is a public
        address eligible for analysis/blacklisting.
    """
    try:
        ip = ipaddress.ip_address(ip_str)
        # Dual-stack listeners log IPv4 clients as ::ffff:a.b.c.d; unwrap so
        # the IPv4 rules below apply to them.
        mapped = getattr(ip, "ipv4_mapped", None)
        if mapped is not None:
            ip = mapped
        if (
            ip.is_private
            or ip.is_multicast
            or ip.is_unspecified
            or ip.is_loopback
            or ip.is_link_local
        ):
            return True
        # Ranges not covered by is_private (e.g. carrier-grade NAT).
        if any(ip in net for net in PRIVATE_IP_NETWORKS):
            return True
        # Check against the current dynamic public IP
        if CURRENT_PUBLIC_IP and ip == ipaddress.ip_address(CURRENT_PUBLIC_IP):
            return True
    except ValueError:
        return True  # Treat invalid IP strings as private to ignore them
    return False
# --- Initialization Functions ---
def initialize_config(config_path: str, log_glob_path: str) -> dict:
    """Load the monitor configuration, creating it on first run.

    When ``config_path`` exists it is loaded as JSON and returned (a missing
    "files" section is added as an empty dict so callers can index it).
    Otherwise every file matching ``log_glob_path`` whose basename does not
    contain ".error" is registered with its cursor at the current end of
    file, so pre-existing log content is skipped, and the result is written
    to ``config_path``.

    Args:
        config_path: Location of the JSON configuration file.
        log_glob_path: Glob pattern selecting the log files to monitor.

    Returns:
        The configuration dict: ``{"files": {path: {"server_name", "cursor",
        "system_prompt_context"}}}``.
    """
    if os.path.exists(config_path):
        logging.info(f"Loading existing configuration from {config_path}")
        with open(config_path, 'r') as f:
            config = json.load(f)
        # Hand-edited configs may lack the section the rest of the script indexes.
        config.setdefault("files", {})
        return config

    logging.info(f"Creating new configuration file at {config_path}")
    config = {"files": {}}
    log_files = sorted(glob.glob(log_glob_path))
    if not log_files:
        logging.warning(f"No log files found matching pattern: {log_glob_path}")
    for log_file in log_files:
        basename = os.path.basename(log_file)
        if ".error" in basename:
            logging.info(f"Ignoring error log file: {log_file}")
            continue
        # Strip only the trailing extension; str.replace would also remove
        # ".log" occurring in the middle of the name.
        server_name = basename.removesuffix('.log')
        try:
            file_size = os.path.getsize(log_file)
        except OSError as e:
            logging.error(f"Could not access {log_file}: {e}")
            continue
        config["files"][log_file] = {
            "server_name": server_name,
            "cursor": file_size,
            "system_prompt_context": f"This server '{server_name}' hosts a public service. Please add specific context here."
        }
        logging.info(f"Initializing {log_file} with cursor at {file_size} bytes.")
    with open(config_path, 'w') as f:
        json.dump(config, f, indent=4)
    return config
def initialize_db(db_path: str) -> sqlite3.Connection:
    """Open the SQLite database and make sure the blacklist schema is current.

    Creates the ``blacklist`` table when absent, then attempts to add the
    ``logs`` and ``sections`` columns so databases created with an older
    schema are upgraded in place. Returns the open connection.
    """
    conn = sqlite3.connect(db_path)
    db = conn.cursor()
    db.execute('''
CREATE TABLE IF NOT EXISTS blacklist (
ip TEXT PRIMARY KEY,
reason_tldr TEXT NOT NULL,
confidence REAL NOT NULL,
timestamp INTEGER NOT NULL,
logs TEXT,
sections TEXT
)
''')
    # Add new columns if they don't exist; a "duplicate column name" error
    # simply means the column is already there.
    for column in ("logs", "sections"):
        try:
            db.execute(f"ALTER TABLE blacklist ADD COLUMN {column} TEXT")
        except sqlite3.OperationalError as e:
            if "duplicate column name" not in str(e):
                logging.warning(f"Could not add '{column}' column: {e}")
        else:
            logging.info(f"Added '{column}' column to blacklist table.")
    conn.commit()
    logging.info(f"Database initialized at {db_path}")
    return conn
def initialize_unban_file(unban_filepath: str):
    """Ensure the unban drop-box file exists, creating it empty if needed.

    An existing file is left untouched. Creation failures are logged, not raised.
    """
    if os.path.exists(unban_filepath):
        return
    try:
        # Opening for write and closing immediately yields an empty file.
        open(unban_filepath, 'w').close()
    except OSError as e:
        logging.error(f"Could not create unban file at {unban_filepath}: {e}")
    else:
        logging.info(f"Created unban file at {unban_filepath}")
def add_new_file_to_config(config: dict, filepath: str, config_path: str):
    """Register a newly discovered log file in the running config and persist it.

    The file starts at cursor 0 so its entire content is processed. Files
    that are already tracked, or whose basename contains ".error", are
    ignored. A failure to write ``config_path`` is logged; the in-memory
    config keeps the new entry either way.

    Args:
        config: The live configuration dict; mutated in place.
        filepath: Path of the discovered log file.
        config_path: JSON file the updated configuration is written to.
    """
    # setdefault keeps this usable with a config that lacks the "files" section.
    files = config.setdefault("files", {})
    if filepath in files:
        return  # Already tracking
    basename = os.path.basename(filepath)
    if ".error" in basename:
        logging.info(f"Ignoring newly discovered error log file: {filepath}")
        return
    logging.info(f"Discovered new log file via inotify: {filepath}")
    # Strip only the trailing extension; str.replace would also remove
    # ".log" occurring in the middle of the name.
    server_name = basename.removesuffix('.log')
    # We start at cursor 0 to process the entire content of the new file.
    files[filepath] = {
        "server_name": server_name,
        "cursor": 0,
        "system_prompt_context": f"This server '{server_name}' hosts a public service. Please add specific context here."
    }
    try:
        with open(config_path, 'w') as f:
            json.dump(config, f, indent=4)
    except (OSError, TypeError, ValueError) as e:
        logging.error(f"Failed to add {filepath} to config: {e}")
# --- Core Logic Functions ---
def read_new_log_lines(filepath: str, last_cursor: int) -> tuple[list[str], int]:
    """Read the complete lines appended to a log file since ``last_cursor``.

    The cursor is a byte offset, so the file is read in binary mode and
    decoded afterwards. Only data up to the last newline is consumed: a
    trailing partial line (the writer is mid-write) is left for the next
    call instead of being returned in two pieces. The read is bounded by the
    size measured up front so the returned cursor always matches exactly the
    bytes that were returned.

    Args:
        filepath: Log file to read.
        last_cursor: Byte offset up to which the file was already processed.

    Returns:
        ``(lines, new_cursor)``. ``lines`` holds the non-empty new lines.
        ``new_cursor`` is 0 when the file is missing or shrank (rotation or
        truncation), and ``last_cursor`` when nothing complete was appended
        or the file could not be read.
    """
    try:
        current_size = os.path.getsize(filepath)
        if current_size < last_cursor:
            logging.warning(f"Log file is smaller than cursor: {filepath}. It might have been truncated.")
            # Restart from the beginning of the truncated/replaced file.
            return [], 0
        if current_size == last_cursor:
            return [], last_cursor
        with open(filepath, 'rb') as f:
            f.seek(last_cursor)
            chunk = f.read(current_size - last_cursor)
    except FileNotFoundError:
        logging.warning(f"Log file not found: {filepath}. It might have been rotated.")
        # If file is gone, we reset its cursor to 0 for the potential new file
        return [], 0
    except (OSError, ValueError) as e:
        logging.error(f"Error reading {filepath}: {e}")
        return [], last_cursor

    last_newline = chunk.rfind(b'\n')
    if last_newline == -1:
        # Nothing but a partial line so far; wait for the writer to finish it.
        return [], last_cursor
    complete = chunk[:last_newline + 1]
    text = complete.decode('utf-8', errors='ignore')
    # Mirror text-mode universal newlines, which binary reads do not apply.
    text = text.replace('\r\n', '\n').replace('\r', '\n')
    # Filter out empty lines that might result from splitting
    lines = [line for line in text.strip().split('\n') if line]
    return lines, last_cursor + len(complete)
def format_logs_for_llm(log_batch: list[tuple[str, str]], config_files: dict) -> str:
    """Render a batch of (file path, log line) pairs as the LLM user message.

    Lines are grouped per server in first-seen order. Each group is emitted
    as a ``[server_name]`` header, the server's context string from the
    config, then its log lines. Files missing from ``config_files`` fall
    under "unknown_server".
    """
    grouped: dict[str, list[str]] = {}
    for path, entry in log_batch:
        name = config_files.get(path, {}).get("server_name", "unknown_server")
        grouped.setdefault(name, []).append(entry)

    sections: list[str] = []
    for name, entries in grouped.items():
        # First config entry carrying this server name supplies the context.
        context = next(
            (
                conf['system_prompt_context']
                for conf in config_files.values()
                if conf['server_name'] == name
            ),
            "No context provided.",
        )
        sections += [f"[{name}]", context, *entries]
    return "\n".join(sections)
def _extract_json_payload(text: str) -> str:
    """Return the JSON portion of an LLM reply, dropping fences and chatter."""
    text = text.strip()
    # Prefer the content of a Markdown code fence (```json ... ``` or ``` ... ```).
    fenced = re.search(r"```(?:json)?\s*(.*?)\s*```", text, re.DOTALL | re.IGNORECASE)
    if fenced:
        return fenced.group(1)
    # Otherwise take the outermost [...] span, ignoring any surrounding prose.
    start, end = text.find("["), text.rfind("]")
    if start != -1 and end > start:
        return text[start:end + 1]
    return text


def query_llm(log_content: str) -> list | None:
    """Send formatted log content to the LLM and parse its JSON verdict.

    Args:
        log_content: The user message, as produced by format_logs_for_llm.

    Returns:
        The parsed list of blacklist candidates (possibly empty) on success;
        an empty list when the reply is valid JSON but not a list; None when
        the API key is missing, the request fails, or the reply is empty or
        not parsable as JSON. Callers treat None as "retry later".
    """
    if not API_KEY or "<" in API_KEY:
        logging.error("OpenRouter API key is not set. Please set the OPENROUTER_API_KEY environment variable.")
        return None
    client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY)
    system_prompt = (
"""
You are a server network security expert. You will be provided snippet of nginx HTTP log for a public server that accessible by internet, including bots.
Give the list of IPs that should be blacklisted for a very long time for blatant vulnerability scanning/triggering in attempt to hack. We have zero tolerance for such activities. However, well-known crawlers are not enemies, be nice to them.
Output the JSON array in format of:
[{
"ip": string,
"reason_tldr": string,
"confidence": 0-1
}, ...]
If no suspicious IPs, return empty array [], and no need extra reasoning.
"""
    )
    response_content = None
    try:
        logging.info("Querying LLM with new log batch of %d characters.", len(log_content))
        # NOTE: `response_format` is not passed; OpenRouter may or may not
        # forward it to the underlying model, so the prompt plus tolerant
        # parsing below is what enforces the JSON shape.
        completion = client.chat.completions.create(
            model=LLM_MODEL,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": log_content}
            ],
        )
        response_content = completion.choices[0].message.content
        if not response_content:
            logging.error("LLM returned an empty response.")
            return None
        # str.strip('json') strips a *character set*, not a prefix, so the
        # reply is unwrapped with an explicit fence/array extraction instead.
        data = json.loads(_extract_json_payload(response_content))
        if isinstance(data, list):
            logging.info(f"LLM identified {len(data)} IPs to blacklist.")
            return data
        logging.warning(f"LLM response was valid JSON but not a list as expected: {type(data)}")
        return []
    except json.JSONDecodeError as e:
        logging.error(f"Failed to decode JSON from LLM response: {e}\nResponse text: {response_content}")
    except Exception as e:
        # Boundary with a network service: log and let the caller retry.
        logging.error(f"An unexpected error occurred while querying LLM: {e}")
    return None
def update_database(conn: sqlite3.Connection, items_to_blacklist: list, logs_by_ip: dict) -> bool:
    """Insert or replace LLM-flagged IPs in the blacklist table.

    Each item must be a dict with "ip", "reason_tldr" and a numeric
    "confidence". Malformed items, items whose confidence is not numeric,
    and IPs rejected by is_private_ip (invalid or private) are skipped with
    a log entry instead of aborting the batch.

    Args:
        conn: Open connection to the blacklist database.
        items_to_blacklist: Parsed LLM response (untrusted).
        logs_by_ip: Maps an IP to ``{'logs': [...], 'sections': set(...)}``
            collected from the batch that was sent to the LLM.

    Returns:
        True if at least one row was written (and committed), else False.
    """
    if not items_to_blacklist:
        return False
    cursor = conn.cursor()
    changes_made = False
    timestamp = int(time.time())
    for item in items_to_blacklist:
        if not isinstance(item, dict) or not all(k in item for k in ("ip", "reason_tldr", "confidence")):
            logging.warning(f"Skipping malformed item from LLM: {item}")
            continue
        # Convert once, up front: float(None) raises TypeError and a string
        # confidence breaks the ':.2f' log format after the row was written.
        try:
            confidence = float(item["confidence"])
        except (TypeError, ValueError):
            logging.warning(f"Skipping item with non-numeric confidence from LLM: {item}")
            continue
        ip = item["ip"]
        if not ip or not isinstance(ip, str) or is_private_ip(ip):
            logging.info(f"Skipping invalid or private IP from LLM response: {ip}")
            continue
        reason = str(item["reason_tldr"])
        # Get associated logs and sections for the IP from the current batch
        ip_evidence = logs_by_ip.get(ip)
        if ip_evidence:
            related_logs = "\n".join(ip_evidence['logs'])
            related_sections = ",".join(sorted(ip_evidence['sections']))
        else:
            # This can happen if the LLM hallucinates an IP not in the log batch
            related_logs = None
            related_sections = None
            logging.warning(f"IP {ip} returned by LLM was not found in the processed log batch. Storing without logs/sections.")
        try:
            # INSERT OR REPLACE handles new IPs and updates existing ones with the latest data
            cursor.execute(
                "INSERT OR REPLACE INTO blacklist (ip, reason_tldr, confidence, timestamp, logs, sections) VALUES (?, ?, ?, ?, ?, ?)",
                (ip, reason, confidence, timestamp, related_logs, related_sections)
            )
        except sqlite3.Error as e:
            logging.error(f"DB/Type error for IP {ip}: {e}")
            continue
        if cursor.rowcount > 0:
            changes_made = True
        logging.info(f"ADD/UPDATE IP: {ip} | Reason: {reason} | Confidence: {confidence:.2f}")
    if changes_made:
        conn.commit()
    return changes_made
def process_unban_requests(unban_filepath: str, conn: sqlite3.Connection, nginx_deny_path: str):
    """Apply the unban drop-box file: delete listed IPs from the blacklist.

    Reads one IP per line from ``unban_filepath``. Entries that are not
    valid IP addresses are skipped. When at least one row was deleted the
    change is committed, the nginx deny list is regenerated at
    ``nginx_deny_path`` and nginx is reloaded. The file is emptied after a
    non-empty request list was handled. All errors are logged, none raised.
    """
    try:
        with open(unban_filepath, 'r') as handle:
            requested = [entry for entry in map(str.strip, handle) if entry]
        if not requested:
            return  # Nothing to do
        logging.info(f"Processing unban request for IPs: {', '.join(requested)}")

        db = conn.cursor()
        removed = 0
        for candidate in requested:
            try:
                # Validate IP format before attempting DB operation
                ipaddress.ip_address(candidate)
                db.execute("DELETE FROM blacklist WHERE ip = ?", (candidate,))
            except ValueError:
                logging.warning(f"Skipping invalid IP address in unban file: {candidate}")
                continue
            except sqlite3.Error as e:
                logging.error(f"DB error while unbanning IP {candidate}: {e}")
                continue
            if db.rowcount > 0:
                removed += 1
                logging.info(f"Removed IP {candidate} from the blacklist.")
            else:
                logging.warning(f"IP {candidate} from unban file was not found in the blacklist.")

        if removed == 0:
            logging.info("No IPs from the unban file were found in the database. No changes made.")
        else:
            conn.commit()
            logging.info(f"Successfully unbanned {removed} IP(s).")
            # Regenerate the deny list and reload Nginx
            export_nginx_deny_list(conn, nginx_deny_path)
            reload_nginx()

        # Clear the file after processing
        open(unban_filepath, 'w').close()
        logging.info(f"Cleared unban file: {unban_filepath}")
    except FileNotFoundError:
        logging.warning(f"Unban file {unban_filepath} not found for processing.")
    except Exception as e:
        logging.error(f"An error occurred while processing unban requests: {e}")
def export_nginx_deny_list(conn: sqlite3.Connection, output_path: str):
    """Write every blacklisted IP to an nginx ``geo $is_denied`` block.

    Each entry maps the IP to 1 and carries a trailing comment with the time
    it was added and the reason. The reason originates from LLM output and is
    therefore untrusted: it is flattened to a single line of printable
    characters so it can never break out of the comment and inject nginx
    directives. Failures are logged, not raised.

    Args:
        conn: Open connection to the blacklist database.
        output_path: File to (over)write with the generated block.
    """
    try:
        cursor = conn.cursor()
        # Fetch IP, reason, and timestamp for better context in the deny file
        cursor.execute("SELECT ip, reason_tldr, timestamp FROM blacklist ORDER BY timestamp DESC")
        ips_with_details = cursor.fetchall()
        with open(output_path, 'w') as f:
            f.write(f"# Auto-generated by LLM Log Monitor on {time.ctime()}\n")
            f.write(f"# Total IPs: {len(ips_with_details)}\n\n")
            f.write("geo $is_denied {\n")
            f.write(" default 0;\n")
            for ip, reason_tldr, timestamp in ips_with_details:
                # Convert timestamp back to human-readable format for comments
                timestamp_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timestamp))
                # Replace newlines/control characters, then collapse whitespace,
                # so the reason stays inside the '#' comment on this line.
                printable = "".join(ch if ch.isprintable() else " " for ch in str(reason_tldr))
                safe_reason = " ".join(printable.split())
                f.write(f" {ip} 1; # Added {timestamp_str}, Reason: {safe_reason}\n")
            f.write("}\n")
        logging.info(f"Exported {len(ips_with_details)} IPs to {output_path} in geo format.")
    except Exception as e:
        # Best-effort export: the monitor loop must survive DB or I/O failures.
        logging.error(f"Failed to export Nginx deny list in geo format: {e}")
def reload_nginx():
    """Run ``sudo -n nginx -s reload``. IMPORTANT: Requires passwordless sudo.

    ``-n`` makes sudo fail immediately instead of prompting for a password,
    and the call is bounded by a timeout, so a misconfigured sudoers entry
    cannot block the monitor loop. All failures are logged, not raised.
    """
    command = ["sudo", "-n", "nginx", "-s", "reload"]
    logging.info(f"Running command: {' '.join(command)}")
    try:
        # Using check=True will raise CalledProcessError on non-zero exit codes
        result = subprocess.run(command, check=True, capture_output=True, text=True, timeout=30)
    except FileNotFoundError:
        logging.error("`sudo` command not found. Is it in the system's PATH?")
    except subprocess.TimeoutExpired:
        logging.error("Timed out while reloading Nginx.")
    except subprocess.CalledProcessError as e:
        logging.error(f"Failed to reload Nginx. Return code: {e.returncode}")
        logging.error(f"Stderr: {(e.stderr or '').strip()}")
        logging.error("Ensure the user running this script has passwordless sudo rights for 'nginx -s reload'.")
        logging.error("Example sudoers entry: 'your_user ALL=(root) NOPASSWD: /usr/sbin/nginx -s reload'")
    else:
        logging.info("Nginx reloaded successfully.")
        if result.stderr:  # Nginx often prints to stderr on success
            logging.info(f"Nginx output (stderr): {result.stderr.strip()}")
def main():
    """The main entry point and monitoring loop using inotify.

    Sets up config, database, unban file and deny list, then loops forever:
    waits for inotify events (or the polling timeout), handles unban
    requests and newly created log files, reads appended log lines into a
    queue, and flushes the queue to the LLM when it is full or the wait time
    has elapsed. Read cursors are persisted only after a successful LLM
    call; on failure the batch is put back at the front of the queue.
    """
    # Initial update of the public IP
    update_current_public_ip()
    config = initialize_config(CONFIG_FILE, LOG_DIR_GLOB)
    config.setdefault("files", {})
    db_conn = initialize_db(DB_FILE)
    initialize_unban_file(UNBAN_FILE)
    export_nginx_deny_list(db_conn, NGINX_DENY_LIST)

    log_dir = os.path.dirname(LOG_DIR_GLOB)
    unban_dir = os.path.dirname(UNBAN_FILE)
    inotify = None
    try:
        inotify = INotify()
        watch_flags = flags.CREATE | flags.MOVED_TO | flags.MODIFY
        # Keep the watch descriptors: events only carry a bare file name, so
        # the descriptor is the only way to tell which directory it came from.
        log_wd = inotify.add_watch(log_dir, watch_flags)
        logging.info(f"Started inotify watch on directory: {log_dir}")
        unban_wd = inotify.add_watch(unban_dir, watch_flags)
        logging.info(f"Started inotify watch on directory: {unban_dir}")
    except Exception as e:
        logging.critical(f"Failed to initialize inotify: {e}. The monitor cannot run. Exiting.")
        if inotify:
            inotify.close()
        db_conn.close()
        return

    log_queue = deque()
    # Holds {filepath: new_cursor_pos} that will be committed after successful processing
    pending_cursor_updates = {}
    last_llm_check_time = time.time()
    ip_regex = re.compile(r'^(\S+)')
    unban_file_basename = os.path.basename(UNBAN_FILE)

    try:
        while True:
            # Replaces time.sleep() with a blocking read that has a timeout.
            # It wakes up on file events or after POLLING_INTERVAL_SECONDS.
            events = inotify.read(timeout=POLLING_INTERVAL_SECONDS * 1000)

            # Check for file events (unban requests, new log files)
            for event in events:
                # --- Unban Request Handling ---
                if (
                    event.wd == unban_wd
                    and event.name == unban_file_basename
                    and event.mask & (flags.MODIFY | flags.CREATE | flags.MOVED_TO)
                ):
                    logging.info(f"Detected event for unban file: {UNBAN_FILE}")
                    process_unban_requests(UNBAN_FILE, db_conn, NGINX_DENY_LIST)
                    # Don't process this event as a log file
                    continue
                # --- New Log File Discovery ---
                # Only events from the log directory qualify; otherwise a
                # "*.log" created next to the unban file would be registered
                # under the log directory.
                if event.wd == log_wd and event.mask & (flags.CREATE | flags.MOVED_TO):
                    filepath = os.path.join(log_dir, event.name)
                    if fnmatch.fnmatch(filepath, LOG_DIR_GLOB):
                        add_new_file_to_config(config, filepath, CONFIG_FILE)

            had_new_logs = False
            # Check all configured files for new content. This is robust against missed events.
            for filepath, file_conf in list(config.get("files", {}).items()):
                # Exclude error logs from active monitoring based on their name
                if ".error" in os.path.basename(filepath):
                    continue
                # Use cursor from pending if it exists, otherwise from main config
                current_cursor = pending_cursor_updates.get(filepath, file_conf["cursor"])
                new_lines, new_cursor = read_new_log_lines(filepath, current_cursor)
                if new_cursor > current_cursor:
                    had_new_logs = True
                    for line in new_lines:
                        match = ip_regex.match(line)
                        if match and not is_private_ip(match.group(1)):
                            log_queue.append((filepath, line))
                    pending_cursor_updates[filepath] = new_cursor
                    logging.info(f"Found {len(new_lines)} new lines in {filepath}. Current queue size: {len(log_queue)}")
                elif new_cursor < current_cursor:  # File was truncated or rotated
                    logging.info(f"Log file {filepath} was truncated. Resetting cursor.")
                    pending_cursor_updates[filepath] = new_cursor

            time_since_last_check = time.time() - last_llm_check_time
            queue_is_full = len(log_queue) >= MAX_QUEUE_SIZE
            # Only trigger timeout if there are logs in the queue or new logs were just added
            timeout_reached = (log_queue or had_new_logs) and time_since_last_check >= MAX_WAIT_SECONDS

            if log_queue and (queue_is_full or timeout_reached):
                logging.info(f"Processing batch. Reason: {'Queue full' if queue_is_full else 'Timeout'}. Queue size: {len(log_queue)}")
                batch_size = min(len(log_queue), MAX_QUEUE_SIZE)
                batch_to_process = [log_queue.popleft() for _ in range(batch_size)]

                # Create a map of IP -> {logs, sections} for the current batch
                logs_by_ip = defaultdict(lambda: {'logs': [], 'sections': set()})
                for filepath, line in batch_to_process:
                    match = ip_regex.match(line)
                    if match:
                        ip = match.group(1)
                        server_name = config["files"].get(filepath, {}).get("server_name", "unknown_server")
                        logs_by_ip[ip]['logs'].append(line)
                        logs_by_ip[ip]['sections'].add(server_name)

                llm_prompt_content = format_logs_for_llm(batch_to_process, config["files"])
                # Full prompt goes to the debug log rather than stdout.
                logging.debug("LLM prompt for this batch:\n%s", llm_prompt_content)
                blacklisted_items = query_llm(llm_prompt_content)
                update_current_public_ip()

                if blacklisted_items is not None:  # Indicates a successful API call (even if list is empty)
                    db_updated = update_database(db_conn, blacklisted_items, logs_by_ip)
                    if db_updated:
                        export_nginx_deny_list(db_conn, NGINX_DENY_LIST)
                        reload_nginx()
                    # On success, commit pending cursor updates to the main config object
                    for path, new_pos in pending_cursor_updates.items():
                        if path in config["files"]:
                            config["files"][path]["cursor"] = new_pos
                    with open(CONFIG_FILE, 'w') as f:
                        json.dump(config, f, indent=4)
                    pending_cursor_updates.clear()
                    logging.info("Successfully processed batch and updated cursors.")
                else:
                    # LLM query failed, put items back at the front of the queue to retry
                    logging.warning("LLM query failed. Re-queuing batch for next cycle.")
                    log_queue.extendleft(reversed(batch_to_process))
                last_llm_check_time = time.time()
            # No time.sleep() needed, as inotify.read() with a timeout handles the wait.
    except KeyboardInterrupt:
        logging.info("Shutdown requested by user.")
    finally:
        if inotify:
            inotify.close()
            logging.info("inotify watch closed.")
        if db_conn:
            db_conn.close()
        logging.info("Monitor stopped.")


if __name__ == "__main__":
    main()
"""
Create a python llm nginx logs monitor for network security:
1. for init, scan /var/log/nginx/saren/wtako.net/*.log and create json config. Mark the file size as last read position (cursor) (we discard previous log entries). Also make the config to have a string field to inject into LLM system prompt for more context.
2. Watch for every appended lines/bytes, and after sending the log lines into LLM and reply received, move the cursor and update json config file.
3. The appended log lines will be in a queue that waits for at most 60 seconds or until 100 lines are reached. The LLM query should consist of up to 100 nginx lines. In other words, if there are no log lines after 60 seconds, do nothing. The log lines should be filtered to exclude all private IPv4 ranges (especially 192.168.0.0/16, 100.64.0.0/10, 172.16.0.0/12) before appending into the queue. Otherwise use the openai api (host = openrouter) to query the LLM.
```
from openai import OpenAI
client = OpenAI(
base_url="https://openrouter.ai/api/v1",
api_key="<OPENROUTER_API_KEY>",
)
completion = client.chat.completions.create(
model="openai/gpt-4o",
messages=[
{
"role": "system",
"content": "......."
},
{
"role": "user",
"content": "LIST OF NGINX LOGS"
}
]
)
print(completion.choices[0].message.content)
```
4. The LLM will reply a list of IPs (empty array if none) to be blacklisted in json format of
```
[{
"ip": string,
"reason_tldr": string,
"confidence": 0-1
}, ...]
```
5. Use sqlite to save the info above. IP should be primary key. Should also filter private IP ranges from LLM response in case of LLM error. Expect many duplication in IP. The field would have a unix timestamp in integer.
6. If there is any insert into the sqlite database, export an nginx deny list (format: list of "deny <ip>;" separated by \n) to a path (/tmp/deny.conf), and run "sudo nginx -s reload"
7. LLM query example ([drop] and [www] have new logs, and 60 seconds have passed. Notice that servers that do not have new nginx logs won't get sections):
```
[drop]
public file upload service
52.187.77.78 - - [31/Aug/2025:04:55:12 +0800] "GET /bak.php?p= HTTP/1.1" 404 9 "-" "-"
52.187.77.78 - - [31/Aug/2025:04:55:12 +0800] "GET /style2.php HTTP/1.1" 404 9 "-" "-"
52.187.77.78 - - [31/Aug/2025:04:55:12 +0800] "GET /end.php HTTP/1.1" 404 9 "-" "-"
52.187.77.78 - - [31/Aug/2025:04:55:12 +0800] "GET /fwe.php HTTP/1.1" 404 9 "-" "-"
52.187.77.78 - - [31/Aug/2025:04:55:12 +0800] "GET /uplozyu.php HTTP/1.1" 404 9 "-" "-"
52.187.77.78 - - [31/Aug/2025:04:55:12 +0800] "GET /icon.php HTTP/1.1" 404 9 "-" "-"
52.187.77.78 - - [31/Aug/2025:04:55:12 +0800] "GET /searchl.php HTTP/1.1" 404 9 "-" "-"
52.187.77.78 - - [31/Aug/2025:04:55:12 +0800] "GET /504.php HTTP/1.1" 404 9 "-" "-"
94.23.188.221 - - [31/Aug/2025:05:28:24 +0800] "GET /robots.txt HTTP/1.1" 404 9 "-" "Mozilla/5.0 (compatible; AhrefsBot/7.0; +http://ahrefs.com/robot/)"
51.75.236.134 - - [31/Aug/2025:05:28:33 +0800] "GET /file/966a6e118fdd6a8d5d161ee2f44c381954f50735.gif HTTP/1.1" 200 8198692 "-" "Mozilla/5.0 (compatible; AhrefsBot/7.0; +http://ahrefs.com/robot/)"
[www]
front page
74.241.246.208 - - [31/Aug/2025:05:36:17 +0800] "GET /wp-setup.php HTTP/2.0" 404 1087 "-" "-"
74.241.246.208 - - [31/Aug/2025:05:36:17 +0800] "GET /wp-signin.php HTTP/2.0" 404 1087 "-" "-"
74.241.246.208 - - [31/Aug/2025:05:36:18 +0800] "GET /wp-wso.php HTTP/2.0" 404 1087 "-" "-"
74.241.246.208 - - [31/Aug/2025:05:36:20 +0800] "GET /wp.php HTTP/2.0" 404 1087 "-" "-"
74.241.246.208 - - [31/Aug/2025:05:36:20 +0800] "GET /wp_wrong_datlib.php HTTP/2.0" 404 1087 "-" "-"
74.241.246.208 - - [31/Aug/2025:05:36:21 +0800] "GET /wsa.php HTTP/2.0" 404 1087 "-" "-"
74.241.246.208 - - [31/Aug/2025:05:36:21 +0800] "GET /wso.php HTTP/2.0" 404 1087 "-" "-"
74.241.246.208 - - [31/Aug/2025:05:36:22 +0800] "GET /y.php HTTP/2.0" 404 1087 "-" "-"
74.241.246.208 - - [31/Aug/2025:05:36:22 +0800] "GET /zwso.php HTTP/2.0" 404 1087 "-" "-"
192.168.0.166 - - [31/Aug/2025:05:36:39 +0800] "GET /wpad.dat HTTP/2.0" 404 3993 "-" "WinHttp-Autoproxy-Service/5.1"
...
```
"""
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment