Skip to content

Instantly share code, notes, and snippets.

@sourceperl
Created March 24, 2025 10:41
Show Gist options
  • Save sourceperl/6eb6ceac211bffe837b35152c838092e to your computer and use it in GitHub Desktop.
Save sourceperl/6eb6ceac211bffe837b35152c838092e to your computer and use it in GitHub Desktop.
An example of a Python Nginx access log parser with gzip support (logrotate compatible).
#!/usr/bin/env python3
"""
Decode nginx access logs (plain text or gzipped).

To analyze all available logs:
    $ this_script /var/log/nginx/access.log*
"""
import argparse
import gzip
import re
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from pprint import pprint
from typing import Iterator, List, Optional
# some const
# Regular expression for one access-log line. Each named group captures one
# token; the group names mirror the field names of the log entry record:
#   remote_addr, remote_user, [time_local], "request", status,
#   body_bytes_sent, "http_referer", "http_user_agent"
# Quoted and bracketed tokens are matched non-greedily up to their closing
# delimiter; status and body_bytes_sent must be runs of digits.
LOG_LINE_PAT = re.compile(
    r'(?P<remote_addr>\S+) - '
    r'(?P<remote_user>\S+) '
    r'\[(?P<time_local>.*?)\] '
    r'"(?P<request>.*?)" '
    r'(?P<status>\d+) '
    r'(?P<body_bytes_sent>\d+) '
    r'"(?P<http_referer>.*?)" '
    r'"(?P<http_user_agent>.*?)"'
)

# strptime() format of the time_local token, e.g. "24/Mar/2025:10:41:00 +0100"
# (day/abbreviated month/year:hour:minute:second, then the UTC offset).
LOG_DATE_FMT = r'%d/%b/%Y:%H:%M:%S %z'
# some class
@dataclass
class NginxLogEntry:
    """One decoded line of an nginx access log.

    Instances are produced by decode_line(); the three Optional fields hold
    None when the corresponding token in the log line is a lone "-".
    """

    # address token found at the start of the line
    remote_addr: str
    # authenticated user name, or None when logged as "-"
    remote_user: Optional[str]
    # timestamp of the request, parsed from the bracketed token
    time_local: datetime
    # raw request line exactly as logged (first quoted token)
    request: str
    # HTTP response status code
    status: int
    # value of the body_bytes_sent token, as an integer
    body_bytes_sent: int
    # Referer header value, or None when logged as "-"
    http_referer: Optional[str]
    # User-Agent header value, or None when logged as "-"
    http_user_agent: Optional[str]
def _dash_to_none(value: str) -> Optional[str]:
    """Return None when a token is the "-" placeholder, else the token itself."""
    return None if value == '-' else value


def decode_line(line: str) -> NginxLogEntry:
    """Parse one access-log line into an NginxLogEntry.

    Surrounding whitespace (including the trailing newline) is stripped
    before matching. The line only has to match LOG_LINE_PAT from its start;
    anything after the user-agent token is ignored.

    Args:
        line: a single raw line read from an access log.

    Returns:
        The decoded entry. remote_user, http_referer and http_user_agent are
        None when the log holds "-" for them.

    Raises:
        ValueError: if the line does not match LOG_LINE_PAT, or if a matched
            token cannot be converted (for example a timestamp that does not
            follow LOG_DATE_FMT). In the second case the original conversion
            error is attached as __cause__.
    """
    line = line.strip()
    match = LOG_LINE_PAT.match(line)
    if not match:
        raise ValueError(f'"{line}" don\'t match pattern')

    # fetch every named group once instead of calling match.group() repeatedly
    tokens = match.groupdict()
    try:
        return NginxLogEntry(
            remote_addr=tokens['remote_addr'],
            remote_user=_dash_to_none(tokens['remote_user']),
            time_local=datetime.strptime(tokens['time_local'], LOG_DATE_FMT),
            request=tokens['request'],
            status=int(tokens['status']),
            body_bytes_sent=int(tokens['body_bytes_sent']),
            http_referer=_dash_to_none(tokens['http_referer']),
            http_user_agent=_dash_to_none(tokens['http_user_agent']),
        )
    except ValueError as e:
        # chain explicitly so the traceback shows the conversion failure as
        # the direct cause, not as an error raised while handling another one
        raise ValueError(f'"{line}" unable to decode tokens: except {e}') from e
# Every gzip stream starts with these two identification bytes (RFC 1952).
GZIP_MAGIC = b'\x1f\x8b'


def iter_log_lines(log_file: Path) -> Iterator[str]:
    """Yield the text lines of a log file, decompressing it when gzipped.

    The format is decided up front by looking at the file's leading bytes,
    not by trying a gzip read and falling back to plain text on failure.
    That way a file is only ever read once, in one format, and genuine I/O
    or decompression errors propagate unchanged instead of triggering a
    second read of the same file.

    Args:
        log_file: path of a plain-text or gzip-compressed log file.

    Yields:
        Each line of the (decompressed) file, line ending included.

    Raises:
        OSError: if the file cannot be opened or read.
    """
    with open(log_file, 'rb') as raw:
        is_gzip = raw.read(len(GZIP_MAGIC)) == GZIP_MAGIC
    # gzip.open and open share the (path, mode) call shape used here
    opener = gzip.open if is_gzip else open
    with opener(log_file, 'rt') as log:
        yield from log


def main() -> None:
    """Decode the log files named on the command line and list the 404 entries.

    Every line of every file is decoded with decode_line(); a line that
    cannot be decoded aborts the run with the ValueError it raises.
    """
    # args parse
    parser = argparse.ArgumentParser()
    parser.add_argument('log_files', nargs='+', type=Path, help='log file(s)')
    args = parser.parse_args()

    # stores all nginx log entries, across every file given
    entries_l: List[NginxLogEntry] = []
    for log_file in args.log_files:
        entries_l.extend(decode_line(line) for line in iter_log_lines(log_file))

    # order by request time so entries from several files interleave correctly
    entries_l.sort(key=lambda entry: entry.time_local)

    # keep all HTTP 404 (not found) entries
    filtered_entries_l = [entry for entry in entries_l if entry.status == 404]

    # show result
    pprint(filtered_entries_l, width=120)
    print(f'process {len(entries_l)} lines, filtered {len(filtered_entries_l)} lines')


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment