Quick Python script to download CloudFront logs from an S3 bucket and submit them to Loki
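The script reads its configuration from CLI flags or environment variables, with optional .env file support via python-dotenv. A minimal .env sketch using the script's own variable names and defaults (swap in your own bucket, prefix, and domain):

    CF_S3_BUCKET=lmorchard-logs
    CF_S3_PREFIX=blog.lmorchard.com/
    CF_DOMAIN=blog.lmorchard.com
    CF_LOKI_URL=http://localhost:3100/loki/api/v1/push
    CF_STATUS_CODES=404
    CF_QUIET=true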
#!/usr/bin/env python3
"""
CloudFront to Loki Log Processor

This script downloads new CloudFront logs from S3 and pushes them to Loki.
It tracks processed files to avoid duplicates and can optionally filter by
status code (e.g. 404s).
"""

import boto3
import gzip
import json
import requests
import time
import os
import argparse
from datetime import datetime, timedelta, timezone

# Try to import python-dotenv, but don't fail if it's not available
try:
    from dotenv import load_dotenv
    DOTENV_AVAILABLE = True
except ImportError:
    DOTENV_AVAILABLE = False

def load_env_file(env_file=None):
    """Load environment variables from .env file if available"""
    if not DOTENV_AVAILABLE:
        return

    if env_file:
        if os.path.exists(env_file):
            load_dotenv(env_file)
        else:
            print(f"Warning: Specified .env file '{env_file}' not found")
    else:
        # Try common locations
        env_files = ['.env', '.env.local', os.path.expanduser('~/.cloudfront-loki.env')]
        for env_file in env_files:
            if os.path.exists(env_file):
                load_dotenv(env_file)
                break


def get_env_default(key, default=None):
    """Get environment variable with fallback to default"""
    return os.getenv(key, default)

class CloudFrontLogsProcessor:
    def __init__(self, config):
        self.config = config
        self.s3_client = boto3.client('s3')
        self.processed_files = self.load_processed_files()

    def load_processed_files(self):
        """Load the list of already processed files"""
        if os.path.exists(self.config.state_file):
            with open(self.config.state_file, 'r') as f:
                return set(line.strip() for line in f)
        return set()

    def cleanup_old_state_entries(self):
        """Remove state entries older than configured days"""
        if not os.path.exists(self.config.state_file):
            return

        days_to_keep = self.config.state_cleanup_days
        cutoff_date = datetime.now() - timedelta(days=days_to_keep)

        if self.config.verbose and not self.config.quiet:
            print(f"Cleaning up state file - keeping entries newer than {cutoff_date.strftime('%Y-%m-%d')}")

        # Read current entries
        with open(self.config.state_file, 'r') as f:
            lines = f.readlines()
        original_count = len(lines)

        # Filter entries - CloudFront filenames contain date info
        kept_lines = []
        for line in lines:
            filename = line.strip()
            if not filename:
                continue
            # Extract date from filename like "blog.lmorchard.com/E5YXU82LZHZCM.2025-06-04-05.d024d283.gz"
            try:
                # Handle both with and without prefix
                basename = filename.split('/')[-1]  # Get just the filename part
                date_part = basename.split('.')[1]  # "2025-06-04-05"
                file_date = datetime.strptime(date_part[:10], '%Y-%m-%d')
                if file_date >= cutoff_date:
                    kept_lines.append(line)
                elif self.config.debug and not self.config.quiet:
                    print(f"  Removing old state entry: {filename} (date: {file_date.strftime('%Y-%m-%d')})")
            except (IndexError, ValueError) as e:
                # Keep lines we can't parse (safer) - might be different filename format
                kept_lines.append(line)
                if self.config.debug and not self.config.quiet:
                    print(f"  Keeping unparseable state entry: {filename} ({e})")

        # Only rewrite if we actually removed something
        if len(kept_lines) < original_count:
            with open(self.config.state_file, 'w') as f:
                f.writelines(kept_lines)
            removed_count = original_count - len(kept_lines)
            if self.config.verbose and not self.config.quiet:
                print(f"Cleaned up state file: removed {removed_count} old entries, kept {len(kept_lines)}")
        elif self.config.verbose and not self.config.quiet:
            print(f"State file cleanup: no old entries to remove ({original_count} entries total)")

    def save_processed_file(self, filename):
        """Mark a file as processed"""
        self.processed_files.add(filename)
        with open(self.config.state_file, 'a') as f:
            f.write(f"{filename}\n")
    def get_new_log_files(self):
        """Get list of new log files from S3"""
        try:
            # Use a paginator so listings of more than 1000 objects aren't truncated
            paginator = self.s3_client.get_paginator('list_objects_v2')
            pages = paginator.paginate(
                Bucket=self.config.s3_bucket,
                Prefix=self.config.s3_prefix
            )

            new_files = []
            found_any = False
            for page in pages:
                for obj in page.get('Contents', []):
                    found_any = True
                    filename = obj['Key']
                    if filename.endswith('.gz') and filename not in self.processed_files:
                        new_files.append({
                            'key': filename,
                            'last_modified': obj['LastModified'],
                            'size': obj['Size']
                        })

            if not found_any:
                if not self.config.quiet:
                    print("No files found in S3 bucket")
                return []

            # Sort by last modified time
            new_files.sort(key=lambda x: x['last_modified'])
            return new_files
        except Exception as e:
            print(f"Error listing S3 objects: {e}")
            return []
    def download_and_parse_log(self, s3_key):
        """Download and parse a gzipped CloudFront log file"""
        try:
            if not self.config.quiet:
                print(f"Downloading {s3_key}...")

            # Download the file
            response = self.s3_client.get_object(Bucket=self.config.s3_bucket, Key=s3_key)

            # Decompress and parse JSON
            compressed_data = response['Body'].read()
            decompressed_data = gzip.decompress(compressed_data)

            # CloudFront real-time logs are newline-delimited JSON
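            # Each line is a single JSON object; an illustrative (not exhaustive)
            # example, using the field names this script reads:
            #   {"timestamp": "2025-06-04T05:38:49Z", "cs-method": "GET", "sc-status": 404, ...}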
            log_entries = []
            for line_num, line in enumerate(decompressed_data.decode('utf-8').strip().split('\n'), 1):
                if line:
                    try:
                        entry = json.loads(line)
                        log_entries.append(entry)
                        # Debug output - show each parsed entry
                        if self.config.debug and not self.config.quiet:
                            print(f"  Line {line_num}: {json.dumps(entry, separators=(',', ':'))}")
                    except json.JSONDecodeError as e:
                        print(f"Error parsing JSON line {line_num}: {e}")
                        if self.config.debug and not self.config.quiet:
                            print(f"  Raw line: {line}")
                        continue

            if not self.config.quiet:
                print(f"Parsed {len(log_entries)} log entries from {s3_key}")
            return log_entries
        except Exception as e:
            print(f"Error downloading/parsing {s3_key}: {e}")
            return []
    def filter_logs(self, log_entries):
        """Filter logs based on status code if configured"""
        if self.config.filter_status_codes is None:
            if self.config.debug and not self.config.quiet:
                print(f"  No status code filtering - keeping all {len(log_entries)} entries")
            return log_entries

        filtered = []
        for entry in log_entries:
            # CloudFront real-time logs use 'sc-status' for status code
            status_code = entry.get('sc-status', entry.get('status', 0))
            if status_code in self.config.filter_status_codes:
                filtered.append(entry)
                if self.config.debug and not self.config.quiet:
                    print(f"  Keeping entry with status {status_code}: {json.dumps(entry, separators=(',', ':'))}")
            elif self.config.debug and not self.config.quiet:
                print(f"  Filtering out entry with status {status_code}")
        return filtered

    def format_for_loki(self, log_entries, source_file):
        """Format log entries for Loki ingestion"""
        loki_streams = {}

        for entry in log_entries:
            # Extract useful fields for labels
            status_code = entry.get('sc-status', entry.get('status', 'unknown'))
            method = entry.get('cs-method', entry.get('method', 'unknown'))

            # Create label set
            labels = {
                'job': 'cloudfront',
                'domain': self.config.domain,
                'status_code': str(status_code),
                'method': str(method)
            }

            # Convert labels to Loki format
            label_string = '{' + ','.join([f'{k}="{v}"' for k, v in labels.items()]) + '}'
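            # e.g. {job="cloudfront",domain="blog.lmorchard.com",status_code="404",method="GET"}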
            # Get timestamp (CloudFront logs include timestamp)
            timestamp = entry.get('timestamp')
            timestamp_ns = None

            if timestamp:
                # Handle ISO format timestamp
                if isinstance(timestamp, str):
                    try:
                        # CloudFront timestamps are in UTC
                        if timestamp.endswith('Z'):
                            dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
                        elif '+' in timestamp or timestamp.endswith('UTC'):
                            dt = datetime.fromisoformat(timestamp.replace('UTC', '').strip())
                        else:
                            # Assume UTC if no timezone specified
                            dt = datetime.fromisoformat(timestamp)
                        if dt.tzinfo is None:
                            dt = dt.replace(tzinfo=timezone.utc)
                        timestamp_ns = int(dt.timestamp() * 1_000_000_000)
                        if self.config.debug and not self.config.quiet:
                            print(f"  Parsed timestamp: {timestamp} -> {dt} -> {timestamp_ns}")
                    except Exception as e:
                        if self.config.debug and not self.config.quiet:
                            print(f"  Error parsing timestamp '{timestamp}': {e}")
                else:
                    timestamp_ns = int(timestamp * 1_000_000_000)

            # Fallback to date + time fields if timestamp not available
            if timestamp_ns is None:
                date_str = entry.get('date')
                time_str = entry.get('time')
                if date_str and time_str:
                    try:
                        # Combine date and time: "2025-06-04" + "05:38:49"
                        # CloudFront date/time is in UTC
                        datetime_str = f"{date_str} {time_str}"
                        dt = datetime.strptime(datetime_str, '%Y-%m-%d %H:%M:%S')
                        # Explicitly set as UTC
                        dt = dt.replace(tzinfo=timezone.utc)
                        timestamp_ns = int(dt.timestamp() * 1_000_000_000)
                        if self.config.debug and not self.config.quiet:
                            print(f"  Using date+time fallback: {datetime_str} UTC -> {timestamp_ns}")
                    except Exception as e:
                        if self.config.debug and not self.config.quiet:
                            print(f"  Error parsing date+time: {e}")

            # Final fallback to current time
            if timestamp_ns is None:
                timestamp_ns = int(time.time() * 1_000_000_000)
                if self.config.debug and not self.config.quiet:
                    print("  Using current time fallback")

            # Create log line with structured data
            log_line = json.dumps(entry)

            # Group by label set
            if label_string not in loki_streams:
                loki_streams[label_string] = []
            loki_streams[label_string].append([str(timestamp_ns), log_line])

        # Convert to Loki format
        streams = []
        for labels, values in loki_streams.items():
            # Parse labels back to dict
            label_dict = {}
            label_pairs = labels[1:-1].split(',')  # Remove { and }
            for pair in label_pairs:
                k, v = pair.split('=', 1)
                label_dict[k] = v.strip('"')
            streams.append({
                'stream': label_dict,
                'values': values
            })

        return {'streams': streams}
    def send_to_loki(self, loki_data):
        """Send formatted data to Loki"""
        try:
            response = requests.post(
                self.config.loki_url,
                json=loki_data,
                headers={'Content-Type': 'application/json'},
                timeout=30
            )
            if response.status_code == 204:
                if not self.config.quiet:
                    print("Successfully sent logs to Loki")
                return True
            else:
                print(f"Loki responded with status {response.status_code}: {response.text}")
                return False
        except Exception as e:
            print(f"Error sending to Loki: {e}")
            return False

    def process_new_logs(self):
        """Main processing loop"""
        if not self.config.quiet:
            print(f"Checking for new CloudFront logs at {datetime.now()}")

        # Clean up old state entries first
        self.cleanup_old_state_entries()

        new_files = self.get_new_log_files()
        if not new_files:
            if not self.config.quiet:
                print("No new log files found")
            return

        if not self.config.quiet:
            print(f"Found {len(new_files)} new log files")

        for file_info in new_files:
            s3_key = file_info['key']
            if not self.config.quiet:
                print(f"\nProcessing {s3_key} (size: {file_info['size']} bytes)")

            # Download and parse
            log_entries = self.download_and_parse_log(s3_key)
            if not log_entries:
                if not self.config.quiet:
                    print(f"No log entries found in {s3_key}")
                self.save_processed_file(s3_key)
                continue

            # Filter if configured
            filtered_entries = self.filter_logs(log_entries)
            if self.config.filter_status_codes and not self.config.quiet:
                print(f"Filtered to {len(filtered_entries)} entries with status codes {self.config.filter_status_codes}")

            if not filtered_entries:
                if not self.config.quiet:
                    print("No entries match filter criteria")
                self.save_processed_file(s3_key)
                continue

            # Format for Loki
            loki_data = self.format_for_loki(filtered_entries, s3_key)

            # Send to Loki
            if self.config.dry_run:
                if not self.config.quiet:
                    print(f"DRY RUN: Would send {len(loki_data['streams'])} streams to Loki")
                self.save_processed_file(s3_key)
                if not self.config.quiet:
                    print(f"Successfully processed {s3_key} (dry run)")
            elif self.send_to_loki(loki_data):
                self.save_processed_file(s3_key)
                if not self.config.quiet:
                    print(f"Successfully processed {s3_key}")
            else:
                print(f"Failed to send {s3_key} to Loki - will retry next run")

def parse_args():
    """Parse command line arguments"""
    parser = argparse.ArgumentParser(
        description='Process CloudFront logs and send them to Loki',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Environment Variables (.env file support):
  CF_S3_BUCKET           S3 bucket name
  CF_S3_PREFIX           S3 prefix/path
  CF_DOMAIN              Domain name for Loki labels
  CF_LOKI_URL            Loki push URL
  CF_STATE_FILE          File to track processed logs
  CF_STATUS_CODES        Comma-separated status codes (e.g., "404,500")
  CF_ALL_STATUS_CODES    Set to "true" to process all status codes
  CF_VERBOSE             Set to "true" for verbose output
  CF_DEBUG               Set to "true" for debug output (shows every log entry)
  CF_DRY_RUN             Set to "true" for dry run mode
  CF_STATE_CLEANUP_DAYS  Days to keep in state file (default: 30)
  CF_QUIET               Set to "true" for quiet mode (only fatal errors)

Examples:
  # Basic usage with defaults
  %(prog)s

  # Use custom .env file
  %(prog)s --env-file /path/to/my.env

  # Custom configuration
  %(prog)s --bucket my-logs --prefix mysite.com/ --domain mysite.com

  # Process all logs (not just 404s)
  %(prog)s --all-status-codes

  # Multiple status codes
  %(prog)s --status-codes 404 500 503

  # Custom Loki URL (e.g., via Tailscale)
  %(prog)s --loki-url http://100.x.x.x:3100/loki/api/v1/push
        """
    )
    parser.add_argument(
        '--env-file',
        help='Path to .env file (default: looks for .env, .env.local, ~/.cloudfront-loki.env)'
    )
    parser.add_argument(
        '--bucket', '-b',
        default=get_env_default('CF_S3_BUCKET', 'lmorchard-logs'),
        help='S3 bucket name (default: lmorchard-logs, env: CF_S3_BUCKET)'
    )
    parser.add_argument(
        '--prefix', '-p',
        default=get_env_default('CF_S3_PREFIX', 'blog.lmorchard.com/'),
        help='S3 prefix/path (default: blog.lmorchard.com/, env: CF_S3_PREFIX)'
    )
    parser.add_argument(
        '--domain', '-d',
        default=get_env_default('CF_DOMAIN', 'blog.lmorchard.com'),
        help='Domain name for Loki labels (default: blog.lmorchard.com, env: CF_DOMAIN)'
    )
    parser.add_argument(
        '--loki-url', '-l',
        default=get_env_default('CF_LOKI_URL', 'http://localhost:3100/loki/api/v1/push'),
        help='Loki push URL (default: http://localhost:3100/loki/api/v1/push, env: CF_LOKI_URL)'
    )
    parser.add_argument(
        '--state-file', '-s',
        default=get_env_default('CF_STATE_FILE', '/tmp/cloudfront_processed.txt'),
        help='File to track processed logs (default: /tmp/cloudfront_processed.txt, env: CF_STATE_FILE)'
    )
    parser.add_argument(
        '--state-cleanup-days',
        type=int,
        default=int(get_env_default('CF_STATE_CLEANUP_DAYS', '30')),
        help='Days to keep entries in state file (default: 30, env: CF_STATE_CLEANUP_DAYS)'
    )

    # Handle status codes from environment
    env_status_codes = get_env_default('CF_STATUS_CODES')
    env_all_status = get_env_default('CF_ALL_STATUS_CODES', '').lower() in ('true', '1', 'yes', 'on')

    # Default behavior: if no status codes are specified anywhere, process all
    default_status_codes = [404]  # Only used if status codes are explicitly set
    if env_status_codes:
        try:
            default_status_codes = [int(x.strip()) for x in env_status_codes.split(',')]
        except ValueError:
            print(f"Warning: Invalid CF_STATUS_CODES format '{env_status_codes}', using all status codes")
            env_all_status = True
    elif not env_all_status:
        # If neither CF_STATUS_CODES nor CF_ALL_STATUS_CODES is set, default to all
        env_all_status = True

    parser.add_argument(
        '--status-codes',
        type=int,
        nargs='+',
        default=default_status_codes,
        help='HTTP status codes to filter for (env: CF_STATUS_CODES as comma-separated)'
    )

    # Handle boolean flags from environment
    parser.add_argument(
        '--all-status-codes',
        action='store_true',
        default=env_all_status,
        help='Process all status codes (overrides --status-codes, env: CF_ALL_STATUS_CODES)'
    )

    env_verbose = get_env_default('CF_VERBOSE', '').lower() in ('true', '1', 'yes', 'on')
    parser.add_argument(
        '--verbose', '-v',
        action='store_true',
        default=env_verbose,
        help='Enable verbose output (env: CF_VERBOSE)'
    )

    env_debug = get_env_default('CF_DEBUG', '').lower() in ('true', '1', 'yes', 'on')
    parser.add_argument(
        '--debug',
        action='store_true',
        default=env_debug,
        help='Enable debug output - shows every parsed log entry (env: CF_DEBUG)'
    )

    env_quiet = get_env_default('CF_QUIET', '').lower() in ('true', '1', 'yes', 'on')
    parser.add_argument(
        '--quiet', '-q',
        action='store_true',
        default=env_quiet,
        help='Quiet mode - only show fatal errors (env: CF_QUIET)'
    )

    env_dry_run = get_env_default('CF_DRY_RUN', '').lower() in ('true', '1', 'yes', 'on')
    parser.add_argument(
        '--dry-run',
        action='store_true',
        default=env_dry_run,
        help='Download and parse logs but do not send to Loki (env: CF_DRY_RUN)'
    )

    args = parser.parse_args()

    # Handle mutually exclusive options
    if args.quiet and (args.verbose or args.debug):
        parser.error("--quiet cannot be used with --verbose or --debug")

    # Handle --all-status-codes flag
    if args.all_status_codes:
        args.filter_status_codes = None
    else:
        args.filter_status_codes = args.status_codes

    # Add attribute aliases for the config object
    args.s3_bucket = args.bucket
    args.s3_prefix = args.prefix

    return args

def main():
    # Load environment variables first
    load_env_file()
    args = parse_args()

    # Load specific .env file if provided
    if args.env_file:
        load_env_file(args.env_file)
        # Re-parse so argument defaults pick up values from the specified .env file
        args = parse_args()

    if args.verbose:
        print("Configuration:")
        print(f"  S3 Bucket: {args.bucket}")
        print(f"  S3 Prefix: {args.prefix}")
        print(f"  Domain: {args.domain}")
        print(f"  Loki URL: {args.loki_url}")
        print(f"  State File: {args.state_file}")
        print(f"  State Cleanup Days: {args.state_cleanup_days}")
        print(f"  Status Filter: {args.filter_status_codes}")
        print(f"  Dry Run: {args.dry_run}")
        print(f"  Debug: {args.debug}")
        print(f"  Quiet: {args.quiet}")
        if DOTENV_AVAILABLE:
            print("  .env support: Available")
        else:
            print("  .env support: Not available (install python-dotenv)")
        print()

    processor = CloudFrontLogsProcessor(args)
    processor.process_new_logs()


if __name__ == "__main__":
    main()
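To run this on a schedule, a cron entry along these lines should work; the script path is hypothetical, and --quiet keeps output to fatal errors so cron won't mail on every run:

    */15 * * * * /usr/bin/python3 /path/to/cloudfront-to-loki.py --quiet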