Skip to content

Instantly share code, notes, and snippets.

@AfroThundr3007730
Last active May 11, 2026 02:47
Show Gist options
  • Select an option

  • Save AfroThundr3007730/6b4d651ccb7b16148a8c7164a42fa41b to your computer and use it in GitHub Desktop.

Select an option

Save AfroThundr3007730/6b4d651ccb7b16148a8c7164a42fa41b to your computer and use it in GitHub Desktop.
A basic Flask server to receive network reports

Installation steps

  • Make the directory /opt/reports/logs

  • Add configuration to /opt/reports/logger.env

    FLASK_APP=report_logger
    FLASK_RUN_HOST=0.0.0.0
    FLASK_RUN_PORT=5001
    LOGDIR=logs
    PROXIES=1
    PYTHONPYCACHEPREFIX=/tmp
  • Place the script in /opt/reports/report_logger.py

  • Install flask and flask-limiter packages.

    apt install -y python3-flask python3-flask-limiter
    # Or via pip: pip install -q flask flask-limiter
  • Place the service file and enable the service.

    systemctl enable --now report_logger.service
  • Configure a web server to proxy connections to the logger.

  • A test report can be sent with the following:

    data='{
      "csp-report": {
          "document-uri": "https://example.com/foo",
          "referrer": "",
          "blocked-uri": "https://example.com/test.css",
          "violated-directive": "style-src 'none'",
          "original-policy": "default-src 'self'; style-src 'none'"
      }
    }'
    curl -X POST -H 'Content-Type: application/json' \
        -d "$data" http://localhost:5001/report?type=csp
  • Reports will be logged to /opt/reports/logs/YYYY-MM-DD.log

#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-3.0-or-later
# For issues or updated versions of this script, browse to the following URL:
# https://gist.github.com/AfroThundr3007730/6b4d651ccb7b16148a8c7164a42fa41b
"""A basic Flask server to receive network reports"""
__author__ = "AfroThundr"
__modified__ = "2026-05-10"
__version__ = "0.6.3"
from datetime import datetime, UTC
from http.server import BaseHTTPRequestHandler
from json import dumps
from os import getenv
from uuid import uuid4, UUID
from flask import Flask, Response, abort, jsonify, request
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from werkzeug.datastructures import Headers
from werkzeug.exceptions import HTTPException
from werkzeug.middleware.proxy_fix import ProxyFix
type JVal = str | int | float | bool | None
type JDoc = dict[str, JVal | JDoc] | list[JVal | JDoc]
type JData = JDoc | JVal
type ResponseTuple = Response | tuple[Response, int]
LOGDIR = getenv("LOGDIR") or "/var/log"
PROXIES = int(getenv("PROXIES") or 1)
PRUNE = [
"X-RateLimit-Limit",
"X-RateLimit-Remaining",
"X-RateLimit-Reset",
"Retry-After",
]
SUCCESS = {
200: {"name": "OK", "description": "Report was received."},
201: {"name": "Created", "description": "Report was created."},
202: {"name": "Accepted", "description": "Report was submitted."},
}
app = Flask(__name__)
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=PROXIES)
@app.after_request
def prune_headers(resp: Response) -> Response:
"""Prune ratelimit headers until needed"""
if not resp.status.split(" ")[0] == "429":
resp.headers = Headers(i for i in resp.headers if i[0] not in PRUNE)
return resp
@app.errorhandler(HTTPException)
def send_error(err: HTTPException) -> ResponseTuple:
"""Handle API error messages"""
return (
jsonify(code=err.code, name=err.name, description=err.description),
err.code or 500,
)
def send_success(code: int) -> ResponseTuple:
"""Handle success messages"""
return jsonify(code=code, **SUCCESS[code]), code
def get_type(data: JData) -> str:
"""Determine type of the submitted report"""
if not isinstance(data, dict):
abort(400)
if "type" in data.keys():
return str(data["type"])
if "csp-report" in data.keys():
return "csp-report"
return request.args.get("type") or "unknown"
def log_report(data: JData, rid: UUID, uid: bool = False) -> None:
"""Logs received network reports to a log file"""
file: str = f"{LOGDIR}/report-{datetime.now(UTC).strftime("%F")}.log"
obj: JDoc = {
"data": data,
"id": str(rid),
"type": get_type(data),
"timestamp": datetime.now(UTC).strftime("%FT%T.%fZ"),
**({"uuid": str(uuid4())} if uid else {}),
}
try:
with open(file, "a", encoding="utf-8") as log:
log.write(dumps(obj, ensure_ascii=False) + "\n")
# pylint: disable=broad-exception-caught
except Exception as err:
print(f"Error writing to log file: {type(err).__name__}")
abort(500)
@app.route("/report", methods=["POST"])
def get_report() -> ResponseTuple:
"""API endpoint to receive network reports"""
data: JData = request.get_json(cache=True, force=True)
rid: UUID = uuid4()
if isinstance(data, dict) and len(data) != 0:
log_report(data, rid)
return send_success(202)
if isinstance(data, list) and len(data) != 0:
_ = (log_report(d, rid, True) for d in data)
return send_success(202)
abort(400)
if __name__ == "__main__":
BaseHTTPRequestHandler.version_string = (
lambda self: f"{app.name}/{__version__}"
)
app.config["MAX_CONTENT_LENGTH"] = 4096
Limiter(
app=app,
default_limits=["60/minute", "10000/day"],
headers_enabled=True,
key_func=get_remote_address,
storage_uri="memory://",
)
app.run(debug=False)
# /etc/systemd/system/report_logger.service
[Unit]
Description=Service to log received network reports
After=network.target
RequiresMountsFor=/opt/reports
[Service]
Type=exec
WorkingDirectory=/opt/reports
EnvironmentFile=/opt/reports/logger.env
ExecStart=/usr/bin/flask run
PrivateTmp=true
User=www-data
Group=www-data
[Install]
WantedBy=multi-user.target
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment