-
-
Save vvanglro/1b1c60b552329a97bfd9569ea67a9b90 to your computer and use it in GitHub Desktop.
Log context propagation in Python ASGI apps. Details here: https://rednafi.com/python/log_context_propagation/
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from svc import log # noqa |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# log.py | |
import contextvars | |
import json | |
import logging | |
import time | |
# Context variable holding the tags attached to every log line. The default
# is what gets logged when no context has been set for the current execution
# context.
default_context = {"user_id": "unknown", "platform": "unknown"}
log_context_var = contextvars.ContextVar("log_context", default=default_context.copy())


# Custom log formatter
class ContextAwareJsonFormatter(logging.Formatter):
    """Render a log record as a JSON object tagged with the current log context.

    The emitted object always contains:

    * ``message``   - the fully interpolated log message,
    * ``timestamp`` - the record's creation time in epoch milliseconds,
    * ``tags``      - the current value of ``log_context_var``.

    An ``exception`` key holding the formatted traceback is added only when
    the record carries exception information, so ordinary records keep
    exactly the three keys above.
    """

    def format(self, record: logging.LogRecord) -> str:
        """Return *record* serialized as a JSON string."""
        log_data = {
            "message": record.getMessage(),
            # Use the moment the record was created rather than the moment it
            # is formatted, so the timestamp is stable across handlers and is
            # not skewed by delayed formatting.
            "timestamp": int(record.created * 1000),
            # Read the tags from the context variable, which is isolated per
            # execution context.
            "tags": log_context_var.get(),
        }
        if record.exc_info:
            # Keep the traceback instead of silently dropping it.
            log_data["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_data)
# Set up the logger | |
# Build the stream handler with the JSON formatter first, then attach it to
# the root logger and let INFO-and-above records through.
formatter = ContextAwareJsonFormatter()
handler = logging.StreamHandler()
handler.setFormatter(formatter)

logger = logging.getLogger()
logger.addHandler(handler)
logger.setLevel(logging.INFO)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# main.py | |
import uvicorn | |
from starlette.applications import Starlette | |
from starlette.middleware import Middleware | |
from starlette.routing import Route | |
from svc.middleware import LogContextMiddleware | |
from svc.view import view | |
# Every request passes through LogContextMiddleware before reaching a route.
middlewares = [
    Middleware(LogContextMiddleware),
]

# Single-route application: "/" is served by ``view``.
app = Starlette(
    routes=[Route("/", view)],
    middleware=middlewares,
)

if __name__ == "__main__":
    # When executed as a script, serve the app on all interfaces, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# middleware.py | |
import logging | |
from collections.abc import Awaitable, Callable | |
from starlette.middleware.base import BaseHTTPMiddleware | |
from starlette.requests import Request | |
from starlette.responses import Response | |
from svc.log import default_context, log_context_var | |
# Middleware for setting log context | |
class LogContextMiddleware(BaseHTTPMiddleware):
    """Populate the log context from request headers for one request.

    The ``Svc-User-Id`` and ``Svc-Platform`` headers, when present and
    non-empty, override the ``user_id`` and ``platform`` entries copied from
    ``default_context``. The resulting dict is stored in ``log_context_var``
    while the request is handled and the previous value is restored
    afterwards.
    """

    async def dispatch(
        self, request: Request, call_next: Callable[[Request], Awaitable[Response]]
    ) -> Response:
        """Set the log context, run the downstream handler, then restore it."""
        # Start from a private copy so the shared defaults are never mutated.
        tags = dict(default_context)
        for tag_name, header_name in (
            ("user_id", "Svc-User-Id"),
            ("platform", "Svc-Platform"),
        ):
            header_value = request.headers.get(header_name)
            if header_value:
                tags[tag_name] = header_value

        reset_token = log_context_var.set(tags)
        try:
            logging.info("From middleware: request started")
            result = await call_next(request)
            logging.info("From middleware: request ended")
            return result
        finally:
            # Restore the previous context whether or not the handler raised.
            log_context_var.reset(reset_token)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# view.py | |
import asyncio | |
import logging | |
from starlette.requests import Request | |
from starlette.responses import JSONResponse | |
async def view(request: Request) -> JSONResponse:
    """Handle a request: run ``work()``, log completion, return a JSON body.

    ``request`` is not read by this handler.
    """
    await work()
    logging.info("From view function: work finished")
    # Plain string literal: the original used an f-string with no placeholders.
    return JSONResponse({"message": "Work work work!!!"})
async def work() -> None:
    """Log a start message, then asynchronously sleep for one second."""
    delay_seconds = 1
    logging.info("From work function: work started")
    await asyncio.sleep(delay_seconds)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment