Created
February 22, 2025 11:25
-
-
Save DinisCruz/4cb46c184fdf0b7e4d896911dd1299cf to your computer and use it in GitHub Desktop.
Working Lambda code for the CloudWatch-to-OpenObserve connector; see https://openobserve.ai/blog/monitor-cloudfront-access-logs-kinesis-streams-amazon-data-firehose-guide
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import base64 | |
import gzip | |
import io | |
import logging | |
from urllib.parse import unquote_plus | |
# Module-wide logger: the root logger, emitting INFO-level messages and above.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def safe_convert_int(value, default=0):
    """Convert ``value`` to an integer, falling back to ``default``.

    Args:
        value: Raw field value, typically a string taken from a log line.
        default: Value returned when ``value`` is missing or not convertible.

    Returns:
        ``int(value)`` when the conversion succeeds; ``default`` when
        ``value`` is ``None``, an empty string, the ``"-"`` placeholder,
        or cannot be converted to an integer.
    """
    # None, "" and "-" all mean "no value". A numeric 0 is a real value and
    # must not be mistaken for a missing one, so no truthiness test here.
    if value is None or value == "" or value == "-":
        return default
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # TypeError: unsupported type (e.g. a list); ValueError: non-numeric
        # text; OverflowError: float infinity.
        return default
def process_field(value):
    """Return ``value`` unchanged, except that the "-" placeholder becomes None."""
    if value == "-":
        return None
    return value
_EXPECTED_FIELDS = 10  # Number of fields in Real-Time Logs configuration


def _decode_payload(data):
    """Return the stripped text carried by one base64-encoded record payload."""
    payload = base64.b64decode(data)
    try:
        # Try direct string parsing first (more efficient).
        text = payload.decode('utf-8')
    except UnicodeDecodeError:
        # A gzip stream starts with bytes 0x1f 0x8b, which is never valid
        # UTF-8, so compressed payloads always end up on this path.
        text = gzip.decompress(payload).decode('utf-8')
    return text.strip()


def _transform_log_line(log_line):
    """Split one tab-separated log line into the output document (a dict)."""
    fields = log_line.split('\t')

    if len(fields) != _EXPECTED_FIELDS:
        logger.warning(
            "Expected %d fields but got %d fields", _EXPECTED_FIELDS, len(fields)
        )
        logger.warning("Log line: %s...", log_line[:200])

    # Pad short lines with the "-" placeholder so every position can be read
    # unconditionally; extra trailing fields are ignored.
    fields.extend(['-'] * (_EXPECTED_FIELDS - len(fields)))

    return {
        # The timestamp is passed through as-is (no "-" -> None mapping).
        "@timestamp": fields[0],
        "client_ip": process_field(fields[1]),
        "status_code": safe_convert_int(fields[2]),
        "http_method": process_field(fields[3]),
        "uri_stem": process_field(fields[4]),
        "edge_location": process_field(fields[5]),
        # A missing or empty user agent stays the literal "-" (not None).
        "user_agent": unquote_plus(process_field(fields[6]) or "-"),
        "referer": process_field(fields[7]),
        "edge_response_result_type": process_field(fields[8]),
        "edge_result_type": process_field(fields[9]),
    }


def lambda_handler(event, context):
    """Transform a batch of tab-separated log records into JSON documents.

    Each entry of ``event['records']`` is expected to carry a ``recordId``
    and a base64-encoded ``data`` payload holding one log line, either as
    plain UTF-8 text or gzip-compressed.

    Args:
        event: Invocation event; only ``event['records']`` is read.
        context: Lambda context object (unused).

    Returns:
        ``{'records': [...]}`` with exactly one entry per input record:

        * ``'Ok'`` - ``data`` is the base64-encoded JSON document.
        * ``'Dropped'`` - the line was a ``#`` header; ``data`` is the
          original payload.
        * ``'ProcessingFailed'`` - decoding or parsing raised; ``data`` is
          the original payload.
    """
    output = []

    for record in event['records']:
        try:
            log_line = _decode_payload(record['data'])

            if log_line.startswith('#'):
                # Header lines carry no log data. They must still be
                # answered: the Firehose transformation contract expects a
                # result for every recordId, so mark them 'Dropped' instead
                # of silently omitting them from the response.
                output.append({
                    'recordId': record['recordId'],
                    'result': 'Dropped',
                    'data': record['data'],
                })
                continue

            # Convert to JSON string and encode in base64
            json_str = json.dumps(_transform_log_line(log_line))
            encoded_data = base64.b64encode(json_str.encode('utf-8')).decode('utf-8')

            output.append({
                'recordId': record['recordId'],
                'result': 'Ok',
                'data': encoded_data,
            })
            logger.info("Processed record successfully")

        except Exception as e:
            # Per-record boundary: one bad record must not fail the batch.
            # logger.exception keeps the original message and adds the
            # traceback.
            logger.exception("Error processing record: %s", e)
            # .get(): the failure may itself be a missing 'data' key, and
            # indexing again here would abort the whole invocation.
            raw_data = record.get('data')
            logger.error("Raw record data: %s", raw_data)
            output.append({
                'recordId': record['recordId'],
                'result': 'ProcessingFailed',
                'data': raw_data,
            })

    return {'records': output}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment