Skip to content

Instantly share code, notes, and snippets.

@zerolagtime
Created May 13, 2025 02:37
Show Gist options
  • Save zerolagtime/6a7b29be2d3a58181d55925276cd46aa to your computer and use it in GitHub Desktop.
Save zerolagtime/6a7b29be2d3a58181d55925276cd46aa to your computer and use it in GitHub Desktop.
Mapping fail2ban
"""Read the fail2ban-client information and make a GeoJSON to plot all blocked IPs on a map"""
import subprocess
import json
import requests
import logging
import time
from collections import defaultdict
# Module-wide logging: configure the root logger at INFO and create the
# logger used by the functions in this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Request budget for the geolocation rate limiter created in main(): at most
# MAX_REQUESTS calls per TIME_WINDOW. TIME_WINDOW is also the back-off used
# when the API answers HTTP 429.
TIME_WINDOW = 60  # seconds
MAX_REQUESTS = 45
class RateLimiter:
    """Sliding-window limiter that blocks once a call budget is used up.

    The limiter remembers when recent calls happened. When ``max_requests``
    calls are already recorded inside the last ``time_window`` seconds,
    :meth:`wait` sleeps until the oldest of them leaves the window.

    Attributes:
        max_requests: Maximum number of calls allowed per window (>= 1).
        time_window: Length of the window in seconds.
        name: Label used as a prefix in every log message.
        request_counter: Total number of ``wait()`` calls made so far.
        call_times: ``time.monotonic()`` readings of the calls that are
            still inside the window, oldest first.
    """

    def __init__(self, max_requests, time_window, name):
        """Create a limiter allowing ``max_requests`` per ``time_window`` s.

        Raises:
            ValueError: If ``max_requests`` is smaller than 1 (such a budget
                can never be satisfied and would fail later inside
                ``wait()``).
        """
        # Assign every attribute before validating so that __del__, which
        # still runs on a rejected instance, finds the fields it reads.
        self.max_requests = max_requests
        self.time_window = time_window
        self.name = name
        self.request_counter: int = 0
        self.call_times = []
        if max_requests < 1:
            raise ValueError(f"max_requests must be at least 1, got {max_requests!r}")

    def __del__(self):
        """Log the total number of calls when the limiter is destroyed."""
        logger.info('[%s] Made %d API calls', self.name, self.request_counter)

    def wait(self):
        """Record one call, sleeping first if the window budget is exhausted."""
        # A monotonic clock is immune to wall-clock adjustments (NTP, DST),
        # which would otherwise distort the measured intervals.
        now = time.monotonic()
        self.request_counter += 1
        # Forget calls that have already left the window.
        self.call_times = [t for t in self.call_times if now - t < self.time_window]
        if len(self.call_times) >= self.max_requests:
            # The pruning above guarantees the oldest entry is younger than
            # the window, so this duration is always positive.
            sleep_time = self.time_window - (now - self.call_times[0])
            logger.warning("[%s] Pausing to avoid exceeding the API rate limit (%d calls so far)",
                           self.name, self.request_counter)
            time.sleep(sleep_time)
            logger.info("[%s] Continuing", self.name)
            # The oldest call has now expired; drop it to free its slot.
            del self.call_times[0]
        # Stamp the call with the time it actually proceeds (after any sleep).
        self.call_times.append(time.monotonic())
def get_active_jails():
    """Retrieve a list of active jails from Fail2ban.

    Runs ``fail2ban-client status`` and parses the ``Jail list:`` line of
    its output, which holds the jail names separated by commas.

    Returns:
        list[str]: Jail names in the order reported. Empty when the line is
        missing or lists no jails.

    Raises:
        FileNotFoundError: Propagated from ``subprocess.run`` when the
            ``fail2ban-client`` executable cannot be found.
    """
    result = subprocess.run(['fail2ban-client', 'status'], capture_output=True, text=True)
    if result.returncode != 0:
        # Surface the failure instead of silently reporting "no jails".
        logger.warning('fail2ban-client status exited with code %d: %s',
                       result.returncode, result.stderr.strip())
    jails_line = next((line for line in result.stdout.splitlines() if 'Jail list:' in line), None)
    if jails_line is None:
        return []
    listing = jails_line.split(':', 1)[1]
    # Split on bare commas and strip each name so spacing variations are
    # tolerated; dropping blanks keeps an empty list from yielding [''].
    return [name.strip() for name in listing.split(',') if name.strip()]
def get_banned_ips(jail):
    """Retrieve a list of banned IPs for a specific jail.

    Runs ``fail2ban-client status <jail>`` and parses the first line that
    contains ``Banned IP list:``; the addresses after the colon are
    whitespace-separated.

    Args:
        jail: Name of the jail to query.

    Returns:
        list[str]: Banned addresses as reported. Empty when the line is
        missing or lists no addresses.

    Raises:
        FileNotFoundError: Propagated from ``subprocess.run`` when the
            ``fail2ban-client`` executable cannot be found.
    """
    result = subprocess.run(['fail2ban-client', 'status', jail], capture_output=True, text=True)
    if result.returncode != 0:
        # Without this, an unknown jail or a permission problem would be
        # indistinguishable from a jail with no bans.
        logger.warning('fail2ban-client status %s exited with code %d: %s',
                       jail, result.returncode, result.stderr.strip())
    for line in result.stdout.splitlines():
        if 'Banned IP list:' in line:
            # Split only on the first colon so IPv6 addresses stay intact.
            return line.split(':', 1)[1].split()
    return []
def get_geolocation(ip, rate_limiter):
    """Fetch geolocation data for a given IP using ip-api.com.

    Args:
        ip: IP address to look up; it is placed in the request URL as-is.
        rate_limiter: Object whose ``wait()`` method is called before the
            request is sent (a ``RateLimiter`` in this file).

    Returns:
        dict | None: A dict with the keys ``ip``, ``country``, ``region``,
        ``city``, ``lat`` and ``lon`` when the API reports
        ``status == 'success'``; ``None`` when the request fails, the body
        is not valid JSON, or the API reports anything else.
    """
    rate_limiter.wait()
    url = f'http://ip-api.com/json/{ip}'
    try:
        response = requests.get(url, timeout=5)
        if response.status_code == 429:
            # Too Many Requests: wait out a full window and retry once
            logger.warning('ip-api answered 429 for %s; retrying in %d seconds', ip, TIME_WINDOW)
            time.sleep(TIME_WINDOW)
            response = requests.get(url, timeout=5)
        data = response.json()
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers a non-JSON body (e.g. an HTML error page).
        logger.warning('Geolocation lookup for %s failed: %s', ip, exc)
        return None
    # Use .get() so a payload without a 'status' field counts as a failed
    # lookup instead of raising KeyError.
    if not isinstance(data, dict) or data.get('status') != 'success':
        return None
    return {
        'ip': ip,
        'country': data.get('country'),
        'region': data.get('regionName'),
        'city': data.get('city'),
        'lat': data.get('lat'),
        'lon': data.get('lon'),
    }
def main():
    """Geolocate every IP banned by Fail2ban and write the result as GeoJSON.

    Collects the banned IPs of all active jails, looks each distinct IP up
    once (rate-limited), and writes a GeoJSON ``FeatureCollection`` with one
    ``Point`` feature per located IP to ``banned_ips.geojson`` in the current
    working directory. IPs that cannot be located are logged and skipped.
    """
    rate_limiter_getip = RateLimiter(MAX_REQUESTS, TIME_WINDOW, "ip-api")
    ip_jail_map = defaultdict(list)
    # Collect all banned IPs across jails; an IP banned by several jails is
    # looked up only once and carries the list of all its jails.
    for jail in get_active_jails():
        for ip in get_banned_ips(jail):
            ip_jail_map[ip].append(jail)

    geo_features = []
    for ip, ip_jails in ip_jail_map.items():
        logger.debug('Looking up %s', ip)
        geo = get_geolocation(ip, rate_limiter_getip)
        # Compare with None rather than testing truthiness: 0 is a valid
        # latitude (equator) and longitude (prime meridian).
        if not geo or geo['lat'] is None or geo['lon'] is None:
            logger.warning('Could not geolocate ip %s', ip)
            continue
        geo_features.append({
            "type": "Feature",
            "properties": {
                "ip": geo['ip'],
                "country": geo['country'],
                "region": geo['region'],
                "city": geo['city'],
                "jails": ip_jails,
            },
            "geometry": {
                "type": "Point",
                # GeoJSON positions are ordered longitude, latitude.
                "coordinates": [geo['lon'], geo['lat']],
            },
        })

    geojson = {
        "type": "FeatureCollection",
        "features": geo_features,
    }
    local_file = 'banned_ips.geojson'
    logger.debug('Writing %d features in GeoJSON to %s', len(geo_features), local_file)
    with open(local_file, 'w', encoding='utf-8') as f:
        json.dump(geojson, f, indent=2)
    logger.info("GeoJSON data has been written to %s", local_file)


# Run the export only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment