Last active
June 11, 2025 13:15
-
-
Save sam2332/e8b62fc4897e9881f7cca28211dbd2d1 to your computer and use it in GitHub Desktop.
Generate a server wallpaper with stats and suspicious ip addresses
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
IIS Wallpaper Updater | |
===================== | |
A monitoring tool that automatically generates and sets Windows desktop wallpaper | |
based on IIS server activity and statistics. | |
Author: Lily Rudloff | |
License: MIT | |
Version: 2.0 (Optimized) | |
""" | |
# ── Logging setup ─────────────────────────────────────────────────────────────
import logging
import os
from logging.handlers import RotatingFileHandler

# Silence matplotlib's chatty DEBUG/INFO output.
logging.getLogger('matplotlib').setLevel(logging.WARNING)

# Log file lives next to this script.
log_dir = os.path.dirname(os.path.abspath(__file__))
log_path = os.path.join(log_dir, "iis_wallpaper.log")

# Set up rotating log handler: 3 backups, each max 1MB
handler = RotatingFileHandler(log_path, maxBytes=1_000_000, backupCount=3)
formatter = logging.Formatter(
    fmt='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
handler.setFormatter(formatter)

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)

# Also log to console.
# BUG FIX: the original guarded this with `if not logger.hasHandlers()`,
# which was always False because the file handler had just been added above —
# so the console handler was never attached, despite the comment's intent.
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)

logger.info("IIS Wallpaper Updater starting")
import sys


def handle_uncaught_exception(exc_type, exc_value, exc_traceback):
    """Global excepthook: route uncaught exceptions into the module logger.

    KeyboardInterrupt is handed to the stock hook so Ctrl+C still
    terminates the process normally instead of being swallowed by logging.
    """
    if issubclass(exc_type, KeyboardInterrupt):
        # Defer to Python's default behavior for user-initiated interrupts.
        sys.__excepthook__(exc_type, exc_value, exc_traceback)
    else:
        logger.critical("Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback))


sys.excepthook = handle_uncaught_exception
import os, glob, time, argparse, subprocess, re, io, string, shutil, ctypes | |
from collections import deque, defaultdict | |
from datetime import datetime, timedelta | |
from functools import lru_cache | |
from PIL import Image, ImageDraw, ImageFont | |
import matplotlib.pyplot as plt | |
# ───────────────────────── Graph helper ───────────────────────────────────────── | |
def render_request_graph(requests_over_time):
    """Render a transparent PNG line chart of request counts per site.

    Parameters:
        requests_over_time: dict mapping site name -> list of
            (timestamp, count) points; points are sorted before plotting.

    Returns:
        A PIL Image backed by an in-memory PNG buffer.

    Raises:
        Re-raises any matplotlib/PIL failure after logging it.
    """
    logger.debug("Rendering request graph with %d sites", len(requests_over_time))
    try:
        fig, ax = plt.subplots()
        fig.set_size_inches(5, 5)
        for site_name, data_points in requests_over_time.items():
            if not data_points:
                logger.warning("No data points for site %s", site_name)
                continue
            logger.debug("Plotting %d data points for site %s", len(data_points), site_name)
            xs, ys = zip(*sorted(data_points))
            ax.plot(xs, ys, label=site_name)
        ax.legend(fontsize=8)
        ax.set_title("Requests Over Time")
        # White styling so the chart stays readable on the dark wallpaper.
        ax.tick_params(axis='x', labelrotation=45, colors='white')
        ax.tick_params(axis='y', colors='white')
        for spine in ax.spines.values():
            spine.set_color('white')
        for label_obj in (ax.yaxis.label, ax.xaxis.label, ax.title):
            label_obj.set_color('white')
        fig.patch.set_alpha(0)  # transparent figure background
        plt.tight_layout()
        png_buffer = io.BytesIO()
        plt.savefig(png_buffer, format='png', transparent=True)
        plt.close(fig)
        png_buffer.seek(0)
        logger.debug("Request graph rendered successfully")
        return Image.open(png_buffer)
    except Exception as e:
        logger.error("Error rendering request graph: %s", str(e), exc_info=True)
        raise
# ───────────────────────── Disk helper ───────────────────────────────────────── | |
def get_disk_usage():
    """Return per-drive disk usage for every mounted Windows drive letter.

    Returns:
        dict mapping drive root (e.g. 'C:\\\\') to a dict with keys
        'total', 'used', 'free' (bytes) and 'percent' (used percentage).
        Drives that fail to report are skipped; {} on total failure.
    """
    logger.debug("Getting disk usage information")
    try:
        # Probe A:..Z: and keep only the drive roots that actually exist.
        available = [f"{letter}:\\" for letter in string.ascii_uppercase
                     if os.path.exists(f"{letter}:\\")]
        logger.debug("Found %d drives: %s", len(available), available)
        usage = {}
        for drive in available:
            try:
                total, used, free = shutil.disk_usage(drive)
                percent = used / total * 100
                usage[drive] = {'total': total, 'used': used,
                                'free': free, 'percent': percent}
                logger.debug("Drive %s: %.1f%% used (%.1f GB free)",
                             drive, percent, free / 2**30)
            except Exception as e:
                logger.error("Error getting disk usage for drive %s: %s", drive, str(e))
        return usage
    except Exception as e:
        logger.error("Error in get_disk_usage: %s", str(e), exc_info=True)
        return {}
# ───────────────────────── IIS helpers ───────────────────────────────────────── | |
def get_site_mapping():
    """Query IIS via appcmd.exe and return a {site_id: site_name} mapping.

    The site name is taken from the hostname part of the first http binding
    (binding format: protocol/IP:port:hostname); sites with no usable http
    hostname fall back to 'site_<id>'.

    Returns:
        dict of int site id -> str site name; {} if appcmd fails entirely.
    """
    logger.debug("Getting IIS site mapping")
    mapping = {}
    try:
        # BUG FIX: the original used r"C:\\Windows\\System32\\..." — in a raw
        # string the doubled backslashes are literal, so the path contained
        # double separators. Windows tolerated it, but this is the real path.
        appcmd = r"C:\Windows\System32\inetsrv\appcmd.exe"
        logger.debug("Running appcmd to list sites: %s", appcmd)
        startupinfo = subprocess.STARTUPINFO()
        output = subprocess.check_output([appcmd, 'list', 'site'],
                                         startupinfo=startupinfo,
                                         encoding='utf-8')
        logger.debug("Processing output from appcmd")
        for line in output.splitlines():
            try:
                m = re.search(r'id:(\d+),bindings:(.*?),state:', line)
                if not m:
                    continue  # not a site line
                sid = int(m.group(1))
                bindings = m.group(2)
                # Hostname of the first http binding, if any.
                http_name = next((b.split(':')[-1] for b in bindings.split(',')
                                  if b.startswith('http/')), None)
                site_name = http_name or f'site_{sid}'
                mapping[sid] = site_name
                logger.debug("Mapped site ID %d to name %s", sid, site_name)
            except Exception as e:
                logger.error("Error processing site line: %s - %s", line, str(e))
        logger.info("Found %d IIS sites", len(mapping))
        return mapping
    except Exception as e:
        logger.error("Error getting site mapping: %s", str(e), exc_info=True)
        return {}
# ───────────────────────── Log parser ───────────────────────────────────────── | |
def compile_patterns(patterns):
    """Combine regex fragments into one case-insensitive alternation pattern.

    Each fragment is wrapped in a non-capturing group so alternation
    boundaries stay correct regardless of the fragment's own operators.
    """
    logger.debug("Compiling %d patterns into single regex", len(patterns))
    combined = '|'.join(f'(?:{p})' for p in patterns)
    return re.compile(combined, re.I)
# ── Pattern tables ────────────────────────────────────────────────────────────
# Requests matching these are static assets / health probes; they are excluded
# from the per-site "recent requests" list (but still counted in the totals).
logger.debug("Initializing filter patterns")
filter_pattern = compile_patterns([
    r'^/cdn-cgi/rum(?:\?.*)?$',
    r'^/(css|js|images|fonts|static|assets|_next|vendor)/.*$',
    r'^/favicon\.ico$',
    r'^/robots\.txt$',
    r'^/sitemap\.xml$',
    r'\.(css|js|ico|png|jpg|jpeg|gif|svg|map|woff2?|ttf|eot|webp|avif|mp4|webm|ogg|json|xml|txt)$',
    r'^/health(?:check)?$',
    r'^/metrics$',
])
# URL fragments that indicate active probing of sensitive files/endpoints.
logger.debug("Initializing critical patterns")
critical_patterns = compile_patterns([
    r'\.git/', r'\.svn/', r'\.env\b', r'\.htaccess\b', r'\.htpasswd\b',
    r'etc/passwd', r'/cgi-bin/', r'/cgi/', r'cmd=',
    # BUG FIX: the dot was unescaped (r'xmlrpc.php'), so it matched any
    # character in that position, e.g. "xmlrpcXphp".
    r'xmlrpc\.php',
])
# Lower-severity probes: admin panels, CMS paths, script extensions, backups.
logger.debug("Initializing suspicious patterns")
suspicious_patterns = compile_patterns([
    r'\badmin\b', r'\bwp-admin\b', r'wpadmin', r'\bcpanel\b', r'\bwebadmin\b',
    r'\badministrator\b', r'\badminpanel\b', r'\blogin\b', r'\bbackend\b',
    r'\bsetup\b', r'\binstall\b', r'\bupgrade\b', r'\bconfig\b', r'\bsettings\b',
    r'\bdrupal\b', r'\bjoomla\b', r'\bmagento\b', r'\bwordpress\b',
    r'\.php\b', r'\.asp\b', r'\.jsp\b', r'\.cgi\b',
    r'\.zip\b', r'\.tar\b', r'\.tar\.gz\b', r'\.bak\b', r'\.old\b', r'\.backup\b',
    r'passwd', r'wp-login',
])
def is_recent_file(path, days=2):
    """Return True if *path* was modified within the last *days* days.

    Any stat error (e.g. the file vanished) is logged and treated as
    "not recent" (False) rather than raised.
    """
    try:
        mtime = datetime.fromtimestamp(os.path.getmtime(path))
        cutoff = datetime.now() - timedelta(days=days)
        if mtime > cutoff:
            logger.debug("File %s is recent (modified %s)",
                         os.path.basename(path), mtime.strftime("%Y-%m-%d"))
            return True
        return False
    except Exception as e:
        logger.error("Error checking if file %s is recent: %s", path, str(e))
        return False
def parse_logs(log_dir, hours, recent_count=30):
    """Parse IIS W3C logs for every site and aggregate activity statistics.

    Parameters:
        log_dir:      root IIS log folder containing W3SVC<id> subfolders.
        hours:        reporting window — views/errors/IP totals/threats only
                      count entries newer than now - hours.
        recent_count: max (ip, url) pairs retained per site.

    Returns a 5-tuple:
        stats:       {site_name: (views, errors, [(ip, url), ...])}
        ip_totals:   {ip: request count within the window}
        threats:     {ip: sorted [('critical'|'suspicious', url), ...]}
        recent_404s: top-30 [(url, sorted [ips])] ranked by unique-IP count
        timeline:    {(site_name, hour): count} for the request graph —
                     NOT limited to `hours`; pruned to the last 30 days,
                     though only files modified in the last 2 days are read
                     (is_recent_file default), so its effective span is shorter.
    """
    logger.info("Parsing IIS logs from %s for the last %d hours", log_dir, hours)
    window_start = datetime.now() - timedelta(hours=hours)
    logger.debug("Window start time: %s", window_start.strftime("%Y-%m-%d %H:%M:%S"))
    site_map = get_site_mapping()
    # Accumulators shared across all sites.
    stats = {}
    ip_totals = defaultdict(int)
    threats = defaultdict(list)
    recent_404 = defaultdict(set)   # url -> set of IPs that hit it with 404
    timeline = defaultdict(int)     # (site, hour-truncated timestamp) -> hits
    processed_files = 0
    skipped_files = 0
    total_lines = 0
    for sid, name in site_map.items():
        logger.debug("Processing site: %s (ID: %d)", name, sid)
        # IIS convention: each site logs into a W3SVC<site-id> subfolder.
        folder = os.path.join(log_dir, f'W3SVC{sid}')
        views = errors = 0
        recent = deque(maxlen=recent_count)  # keeps only the newest entries
        log_files = glob.glob(os.path.join(folder, '*.log'))
        logger.debug("Found %d log files for site %s", len(log_files), name)
        site_processed_files = 0
        for file in log_files:
            # Skip files not modified in the last 2 days (is_recent_file default).
            if not is_recent_file(file):
                skipped_files += 1
                continue
            logger.debug("Processing log file: %s", os.path.basename(file))
            site_processed_files += 1
            processed_files += 1
            try:
                with open(file, 'r', encoding='utf-8', errors='ignore') as fh:
                    # Column layout comes from the file's own '#Fields:' header;
                    # data lines before a header are skipped (fields empty).
                    fields = {}
                    file_lines = 0
                    for line in fh:
                        file_lines += 1
                        total_lines += 1
                        if line.startswith('#Fields:'):
                            headers = line.strip().split()[1:]
                            fields = {k: i for i, k in enumerate(headers)}
                            logger.debug("Found headers: %s", fields.keys())
                            continue
                        if line.startswith('#') or not fields:
                            continue
                        cols = line.split()
                        try:
                            t = datetime.strptime(cols[fields['date']] + ' ' + cols[fields['time']], '%Y-%m-%d %H:%M:%S')
                        except Exception as e:
                            logger.warning("Failed to parse timestamp in line: %s", e)
                            continue
                        # Bucket into the hourly timeline BEFORE the window
                        # check — the graph intentionally spans more history
                        # than the stats window.
                        rounded = t.replace(minute=0, second=0, microsecond=0)
                        timeline[(name, rounded)] += 1
                        if t < window_start:
                            continue
                        views += 1
                        status = int(cols[fields['sc-status']])
                        if status >= 400:
                            errors += 1
                        ip = cols[fields['c-ip']]
                        url = cols[fields['cs-uri-stem']]
                        ip_totals[ip] += 1
                        if status == 404:
                            recent_404[url].add(ip)
                        # Static assets/health probes are counted above but
                        # excluded from the recent list and threat scan.
                        if filter_pattern.search(url):
                            continue
                        recent.append((ip, url))
                        if critical_patterns.search(url):
                            threats[ip].append(('critical', url))
                            logger.warning("Critical pattern match: IP %s requested %s", ip, url)
                        elif suspicious_patterns.search(url):
                            threats[ip].append(('suspicious', url))
                            logger.info("Suspicious pattern match: IP %s requested %s", ip, url)
                logger.debug("Processed %d lines in file %s", file_lines, os.path.basename(file))
            except Exception as e:
                # NOTE(review): a malformed data line (e.g. missing column)
                # raises here and aborts the REST of this file, not just the line.
                logger.error("Error processing file %s: %s", file, str(e), exc_info=True)
        logger.info("Site %s: processed %d files, %d views, %d errors",
                    name, site_processed_files, views, errors)
        stats[name] = (views, errors, list(recent))
    # Process results
    logger.info("Processed %d files, skipped %d old files, parsed %d lines total",
                processed_files, skipped_files, total_lines)
    # Filter timeline to only include entries from last 30 days
    timeline = {k: v for k, v in timeline.items() if k[1] > datetime.now() - timedelta(days=30)}
    logger.debug("Timeline contains %d data points", len(timeline))
    # Sort 404 errors by number of unique IPs requesting them (top 30 kept)
    recent_404s = sorted(((u, sorted(ips)) for u, ips in recent_404.items()), key=lambda x: -len(x[1]))[:30]
    logger.info("Found %d distinct URLs returning 404 status", len(recent_404))
    # Dedupe threat entries per IP; critical entries sort before suspicious.
    threats = {ip: sorted(set(v), key=lambda x: (x[0] != 'critical', x[1])) for ip, v in threats.items()}
    logger.info("Found %d IPs with suspicious or critical requests", len(threats))
    return stats, ip_totals, threats, recent_404s, timeline
# ───────────────────────── Wallpaper Renderer / Utility (unchanged) ───────────────────────── | |
# Place your existing render_wallpaper() and set_wallpaper() code here | |
def render_wallpaper(stats, recent_ip_totals, evil_ips, window_hours,
                     output_path, recent_404s, request_timeline):
    """Compose the 1920x1080 stats wallpaper and save it to *output_path*.

    Layout (left to right): site table, suspicious-IP list, recent 404s,
    disk info, with the request graph pasted bottom-right.

    Parameters:
        stats:            {site: (views, errors, recent_list)} from parse_logs.
        recent_ip_totals: per-IP request counts — accepted but not used by
                          this renderer.
        evil_ips:         {ip: [('critical'|'suspicious', url), ...]}.
        window_hours:     reporting window, shown in the title only.
        output_path:      where the PNG is written.
        recent_404s:      [(url, [ips])] list for the 404 column.
        request_timeline: {(site, timestamp): count} fed to the graph.

    Raises:
        Re-raises any failure while saving the image (after logging).
    """
    logger.info("Rendering wallpaper to %s", output_path)
    # Define dimensions and colors
    width, height = 1920, 1080
    bg = (30, 30, 30)           # dark background
    fg = (230, 230, 230)        # normal text
    fg_disabled = (120, 120, 120)  # sites with no recent activity
    errc = (200, 80, 80)        # error counts
    rec_c = (150, 150, 150)     # NOTE(review): defined but never used below
    evil_c = (255, 60, 60)      # suspicious-IP section
    nf_c = (255, 140, 0)        # 404 section
    logger.debug("Creating %dx%d wallpaper image", width, height)
    img = Image.new('RGB', (width, height), bg)
    draw = ImageDraw.Draw(img)
    # Load fonts
    try:
        logger.debug("Loading fonts")
        f_title = ImageFont.truetype('SegoeUI.ttf', 48)
        f_head = ImageFont.truetype('SegoeUI.ttf', 36)
        f_cell = ImageFont.truetype('SegoeUI.ttf', 32)
        f_small = ImageFont.truetype('SegoeUI.ttf', 24)
        logger.debug("Fonts loaded successfully")
    except IOError as e:
        logger.warning("Failed to load fonts: %s. Using default font instead", str(e))
        f_title = f_head = f_cell = f_small = ImageFont.load_default()
    # ─ title ─  (horizontally centered)
    y = 40
    title = f'IIS Stats (last {window_hours} h)'
    draw.text(((width - draw.textlength(title, f_title)) / 2, y),
              title, fill=fg, font=f_title)
    y += 60
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    draw.text(((width - draw.textlength(stamp, f_small)) / 2, y),
              stamp, fill=fg, font=f_small)
    # ─ site table ─
    # Column x-positions; the name column is sized to the widest site name.
    # (Numbering skips col4 — only col1/2/3/5/6/7 are laid out.)
    px, y = 100, 120
    max_site = max(draw.textlength(n, f_cell) for n in stats) if stats else 0
    col1, col2 = px, px + max_site + 40
    col3 = col2 + 260
    col5 = col3 + 260
    col6 = col5 + 320
    col7 = col6 + 520
    for txt, x in [('Site', col1), ('Total', col2), ('Errors', col3)]:
        draw.text((x, y), txt, fg, font=f_head)
    # getbbox('Hg')[3] approximates the line height (ascender + descender).
    y += f_head.getbbox('Hg')[3] + 20
    for site, (v, e, recs) in sorted(stats.items()):
        # Grey out sites that had no recent (non-filtered) requests.
        color = fg if recs else fg_disabled
        draw.text((col1, y), site, color, font=f_cell)
        draw.text((col2, y), str(v), color, font=f_cell)
        draw.text((col3, y), str(e), errc, font=f_cell)
        y += f_cell.getbbox('Hg')[3] + 5
    # ─ Suspicious IPs ─
    y = 120 + f_head.getbbox('Hg')[3] + 20
    draw.text((col5, y), 'Suspicious IPs:', evil_c, font=f_head)
    y += f_head.getbbox('Hg')[3] + 8
    def sev_key(item):
        # Sort: IPs with any 'critical' hit first, then by hit count desc, then IP.
        ip, ent = item
        crit = any(s == 'critical' for s, _ in ent)
        return (not crit, -len(ent), ip)
    for ip, entries in sorted(evil_ips.items(), key=sev_key):
        draw.text((col5, y), f'{ip} ({len(entries)})', evil_c, font=f_cell)
        y += f_cell.getbbox('Hg')[3] + 2
        for sev, url in entries:
            # Critical URLs in red, suspicious ones in orange.
            col = evil_c if sev == 'critical' else (255, 165, 0)
            draw.text((col5 + 40, y), f'-> {url}', col, font=f_small)
            y += f_small.getbbox('Hg')[3] + 2
        y += 6
    # ─ Recent 404s ─
    if recent_404s:
        y = 120
        draw.text((col6, y), 'Recent 404 Requests:', nf_c, font=f_head)
        y += f_head.getbbox('Hg')[3] + 8
        for url, ips in recent_404s:
            draw.text((col6, y), url, nf_c, font=f_small)
            y += f_small.getbbox('Hg')[3] + 2
            draw.text((col6 + 40, y),
                      f'ips ({len(ips)}): {",".join(ips)}',
                      nf_c, font=f_small)
            y += f_small.getbbox('Hg')[3] + 6
    # ─ Disk usage ─
    y = 20
    draw.text((col7, y), 'Disk Info:', fg, font=f_head)
    y += f_head.getbbox('Hg')[3] + 8
    for drv, u in get_disk_usage().items():
        txt = (f"{drv} {u['used']/2**30:.1f} GB used, "
               f"{u['free']/2**30:.1f} GB free ({u['percent']:.1f}% used)")
        draw.text((col7, y), txt, fg, font=f_cell)
        y += f_cell.getbbox('Hg')[3] + 4
    # ─ request graph ─
    # Pivot {(site, ts): count} into {site: [(ts, count), ...]} for plotting.
    series = defaultdict(list)
    for (site, ts), cnt in request_timeline.items():
        series[site].append((ts, cnt))
    gimg = render_request_graph(series).convert('RGBA')
    # Paste with itself as alpha mask so the transparent chart blends in.
    img.paste(gimg, (width - 500, 420), gimg)
    try:
        logger.debug("Saving wallpaper to %s", output_path)
        img.save(output_path)
        logger.info("Wallpaper saved successfully")
    except Exception as e:
        logger.error("Error saving wallpaper to %s: %s", output_path, str(e), exc_info=True)
        raise
# SPI constants for user32.SystemParametersInfoW (values from WinUser.h).
SPI_SETDESKWALLPAPER = 20   # action: set the desktop wallpaper
SPIF_UPDATEINIFILE = 0x01   # persist the change to the user profile
SPIF_SENDCHANGE = 0x02      # broadcast WM_SETTINGCHANGE so the shell refreshes
def set_wallpaper(path):
    """Set the Windows desktop wallpaper to the image at *path*.

    Raises:
        OSError if the Win32 call reports failure, or the underlying
        exception if the call itself cannot be made (e.g. non-Windows).
    """
    logger.info("Setting desktop wallpaper to %s", path)
    try:
        ok = ctypes.windll.user32.SystemParametersInfoW(
            SPI_SETDESKWALLPAPER, 0, path, SPIF_UPDATEINIFILE | SPIF_SENDCHANGE
        )
        # BUG FIX: SystemParametersInfoW reports failure via a zero BOOL
        # return, not an exception — the original ignored it, so API
        # failures were logged as success.
        if not ok:
            raise ctypes.WinError()
        logger.info("Wallpaper set successfully")
    except Exception as e:
        logger.error("Failed to set wallpaper: %s", str(e), exc_info=True)
        raise
# ───────────────────────── Main ────────────────────────────────────────────── | |
def main():
    """CLI entry point: parse args, then run parse -> render -> set-wallpaper.

    Any exception is logged at CRITICAL level and re-raised for the
    module-level handler.
    """
    logger.info("IIS Wallpaper Updater starting")
    try:
        # Parse command line arguments
        parser = argparse.ArgumentParser(description='Update wallpaper with IIS stats')
        parser.add_argument('--log-dir', default=r'C:\inetpub\logs\LogFiles')
        parser.add_argument('--hours', type=int, default=6)
        # Default output name is timestamped so each run writes a fresh file.
        parser.add_argument('--output', default=fr'e:\scripts\wallpaper_{int(time.time())}.png')
        args = parser.parse_args()
        logger.info("Arguments parsed: log_dir=%s, hours=%d, output=%s",
                    args.log_dir, args.hours, args.output)
        # Main workflow
        logger.info("Starting log parsing...")
        stats, ip_totals, threats, r404, timeline = parse_logs(args.log_dir, args.hours)
        logger.info("Rendering wallpaper...")
        render_wallpaper(stats, ip_totals, threats, args.hours, args.output, r404, timeline)
        logger.info("Setting wallpaper...")
        set_wallpaper(args.output)
        logger.info("IIS Wallpaper Updater completed successfully")
    except Exception as e:
        logger.critical("Unhandled exception in main: %s", str(e), exc_info=True)
        raise
if __name__ == '__main__':
    try:
        main()
    except Exception as e:
        # Last-resort handler: log, tell the user, exit non-zero.
        logger.critical("Fatal error: %s", str(e), exc_info=True)
        print(f"Fatal error: {str(e)}")
        # BUG FIX: use sys.exit() — the bare exit() builtin is injected by the
        # site module and is not guaranteed to exist in all run contexts.
        sys.exit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment