Skip to content

Instantly share code, notes, and snippets.

@avi-perl
Last active August 21, 2024 13:19
Show Gist options
  • Save avi-perl/b173fdc30219155eb9ee4bb3a21c4104 to your computer and use it in GitHub Desktop.
Save avi-perl/b173fdc30219155eb9ee4bb3a21c4104 to your computer and use it in GitHub Desktop.
Audit logging in Python
  • I'd like to make note of sensitive changes in my app.
  • I'd like to be able to query these changes from day one.
  • I'd like to leave architecting a long term solution for later.
  • I'd like these records to persist.
  • I'd like to create alarms around some changes that get sent to a business (not development) team.

💡 Idea!

  • Add a sub-logger to python's logging module.
  • Add a rule in my infrastructure that recognizes the sub-logger logs and directs them to a log group with different persistence rules.
  • Format the sub-logger logs to be JSON for easier querying and processing.

By making this a part of the logging infrastructure, the sub-logger is always handy and easily retrievable. AWS CloudWatch has support for queries, so we have a strong search ability out of the box. In theory, an update to the sub-logger could have the records be directed to a more appropriate storage option, such as DynamoDB.

import logging
from typing import cast
import ecs_logging
from starlette_context import _request_scope_context_storage
try:
_request_scope_context_storage.get()
from starlette_context import context
use_context = True
except LookupError:
use_context = False
class StarletteContextFilter(logging.Filter):
def filter(self, record):
# Add your custom data to the record
if use_context:
record.context = context
return True
class AuditLogger(logging.Logger):
"""A logger class with extra functions and customization for use in tracking sensitive changes in the app."""
def __init__(self, name, level=logging.INFO):
super().__init__(name, level)
handler = logging.StreamHandler()
handler.setFormatter(ecs_logging.StdlibFormatter())
self.addHandler(handler)
self.addFilter(StarletteContextFilter())
def record_updated(self, msg: str, record_type: str, before: dict, after: dict, *args, **kwargs):
"""A general use audit function for when a model has been changed."""
updated = {'record_type': record_type, 'before': {}, 'after': {}}
# Find keys with different values
for key in before.keys() & after.keys():
if before[key] != after[key]:
updated['before'][key] = before[key]
updated['after'][key] = after[key]
if 'extra' not in kwargs:
kwargs['extra'] = {}
kwargs['extra']['changes'] = updated
self.info(msg, *args, **kwargs)
class MyAppLogger(logging.Logger):
"""Custom app logger with a special sub-logger for auditing."""
def __init__(self, name, level=logging.NOTSET):
super().__init__(name, level)
self.audit = AuditLogger(f"{name}.audit", logging.INFO)
@classmethod
def get(cls, name: str):
"""Use in place of logging.getLogger() so typing systems recognize the logger.audit"""
return cast(cls, logging.getLogger(name))
import logging
from audit_logger import MyAppLogger
logging.setLoggerClass(MyAppLogger)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = MyAppLogger.get(__name__)
if __name__ == '__main__':
logger.info('This is an info log from the regular logger, nothing has changed about it\'s functionality')
logger.audit.info(
'Simple text audit logs can be made by using the regular level functions',
extra={"some_extra_info": "can be passed via extra"}
)
# App specific logging function.
logger.audit.record_updated(
"Edit made to MFA information",
record_type='user',
before={'id': 1, 'phone': '+1234567890'},
after={'id': 1, 'phone': '+19876543210'}
)
2024-08-21 09:19:11,830 - __main__ - INFO - This is an info log from the regular logger, nothing has changed about it's functionality
{"@timestamp":"2024-08-21T13:19:11.830Z","log.level":"info","message":"Simple text audit logs can be made by using the regular level functions","ecs.version":"1.6.0","log":{"logger":"__main__.audit","origin":{"file":{"line":11,"name":"example.py"},"function":"<module>"},"original":"Simple text audit logs can be made by using the regular level functions"},"process":{"name":"MainProcess","pid":27152,"thread":{"id":19204,"name":"MainThread"}},"some_extra_info":"can be passed via extra"}
{"@timestamp":"2024-08-21T13:19:11.830Z","log.level":"info","message":"Edit made to MFA information","changes":{"after":{"phone":"+19876543210"},"before":{"phone":"+1234567890"},"record_type":"user"},"ecs.version":"1.6.0","log":{"logger":"__main__.audit","origin":{"file":{"line":49,"name":"__init__.py"},"function":"record_updated"},"original":"Edit made to MFA information"},"process":{"name":"MainProcess","pid":27152,"thread":{"id":19204,"name":"MainThread"}}}
import logging
from fastapi import FastAPI, Depends, Request
from starlette_context import context
from starlette_context.middleware import RawContextMiddleware
from audit_logger import MyAppLogger
async def my_context_dependency(request: Request):
context.update({"request": request.headers})
yield
app = FastAPI(dependencies=[Depends(my_context_dependency)])
app.add_middleware(RawContextMiddleware)
logging.setLoggerClass(MyAppLogger)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = MyAppLogger.get(__name__)
@app.get("/")
async def show_logs():
client_id = "some_client"
logger.audit.info(f"Client secret retrieved for client '{client_id}'")
return {"client_id": client_id, "secret": "🤫"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=2000)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment