Created
April 18, 2018 14:00
-
-
Save laughingman7743/514efbffae88a0d45df897209d2ab34e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Unless explicitly stated otherwise all files in this repository are licensed
# under the Apache License Version 2.0.
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2017 Datadog, Inc.
from __future__ import print_function | |
import base64 | |
import gzip | |
import json | |
import logging | |
import os | |
import socket | |
import ssl | |
import time | |
import boto3 | |
# Module-level logger. Its level is read from the LOGGING_LEVEL environment
# variable (parsed with int()) and falls back to logging.INFO.
_logger = logging.getLogger(__name__)
_logger.setLevel(int(os.environ.get('LOGGING_LEVEL', logging.INFO)))

# Region for the SSM client; AWS_DEFAULT_REGION must be set or import fails
# with KeyError.
REGION = os.environ['AWS_DEFAULT_REGION']
SSM_CLIENT = boto3.client('ssm', region_name=REGION)

# The DD_API_KEY environment variable holds the *name* of an SSM parameter;
# the decrypted parameter value is the key that prefixes every line sent.
DD_API_KEY = SSM_CLIENT.get_parameter(
    Name=os.environ['DD_API_KEY'],
    WithDecryption=True,
)['Parameter']['Value']

# Intake endpoint, overridable through DD_HOST / DD_SSL_PORT.
DD_HOST = os.environ.get('DD_HOST', 'intake.logs.datadoghq.com')
DD_SSL_PORT = int(os.environ.get('DD_SSL_PORT', 10516))

# Keys under which kinesis_handler stores the source, the source category
# and the comma-separated tag string of each structured log.
DD_SOURCE = 'ddsource'
DD_SOURCE_CATEGORY = 'ddsourcecategory'
DD_CUSTOM_TAGS = 'ddtags'
def lambda_handler(event, context):
    """Lambda entry point: forward the logs contained in *event* to Datadog.

    A TLS connection is opened with ``connect_to_datadog``; every log produced
    by ``generate_logs`` is then submitted through ``safe_submit_log``, which
    may hand back a replacement socket after a reconnect. Any exception raised
    while generating or submitting logs is logged and swallowed, and the
    current socket is always closed before returning.

    Args:
        event: The invocation event passed by the Lambda runtime.
        context: The Lambda context object passed by the runtime.

    Returns:
        None.
    """
    # Attach Datadog's Socket
    socket_ = connect_to_datadog(DD_HOST, DD_SSL_PORT)
    try:
        for log in generate_logs(event, context):
            # Keep the returned socket: a retry may have replaced it.
            socket_ = safe_submit_log(socket_, log)
    except Exception as e:
        # Must be the module-level `_logger`; no name `logger` exists here.
        _logger.exception('Unexpected exception: {0} for event {1}'.format(str(e), event))
    finally:
        socket_.close()
def connect_to_datadog(host, port):
    """Open a TLS-wrapped TCP connection to ``host:port`` and return it.

    The socket is wrapped with a default ``ssl.SSLContext`` so the server
    certificate and hostname are verified. ``ssl.wrap_socket`` is not used
    because it performs no verification and was removed in Python 3.12.

    Args:
        host: Hostname of the log intake endpoint.
        port: TCP port of the TLS intake endpoint.

    Returns:
        The connected ``ssl.SSLSocket``.

    Raises:
        OSError: If the TCP connection or the TLS handshake fails
            (``ssl.SSLError`` is a subclass of ``OSError``).
    """
    tls_context = ssl.create_default_context()
    raw_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    socket_ = tls_context.wrap_socket(raw_socket, server_hostname=host)
    try:
        socket_.connect((host, port))
    except Exception:
        # Do not leak the descriptor when the connection cannot be made.
        socket_.close()
        raise
    return socket_
def generate_logs(event, context):
    """Turn *event* into a list of log entries ready to be submitted.

    The event is delegated to ``kinesis_handler``. If parsing raises, the
    failure is logged and a one-element list holding the error message is
    returned instead, so the error itself is what gets forwarded.

    Args:
        event: The Lambda invocation event.
        context: The Lambda context object.

    Returns:
        The list built by ``kinesis_handler``, or ``[error_message]`` when
        parsing failed.
    """
    try:
        # Route to the corresponding parser
        return kinesis_handler(event, context)
    except Exception as error:
        # Report the parsing error through the same channel as regular logs.
        failure_text = 'Error parsing the object. Exception: {0} for event {1}'.format(str(error), event)
        _logger.exception(failure_text)
        return [failure_text]
def safe_submit_log(socket_, log):
    """Send *log* over *socket_*, reconnecting and retrying once on failure.

    When the first attempt raises, the failure is logged, the failed socket
    is closed, the function sleeps for 30 seconds, opens a new connection
    with ``connect_to_datadog`` and sends the entry again. An exception from
    the reconnect or the second attempt propagates to the caller.

    Args:
        socket_: Connected socket to send the entry on.
        log: The entry to send; see ``send_entry`` for accepted types.

    Returns:
        The socket to use for subsequent sends: the original one, or the
        replacement opened for the retry.
    """
    try:
        send_entry(socket_, log)
    except Exception:
        # retry once
        _logger.exception('Sending to datadog failed. Retry once.')
        # Release the failed connection before replacing it; otherwise its
        # descriptor is leaked once the name is rebound below.
        try:
            socket_.close()
        except OSError:
            _logger.debug('Closing the failed socket raised an error.', exc_info=True)
        time.sleep(30)
        socket_ = connect_to_datadog(DD_HOST, DD_SSL_PORT)
        send_entry(socket_, log)
    return socket_
def kinesis_handler(event, context):
    """Build one structured log dict per log event found in a Kinesis event.

    Each entry of ``event['Records']`` carries, under ``['kinesis']['data']``,
    a base64-encoded, gzip-compressed, UTF-8 JSON document. Every item of that
    document's ``logEvents`` list becomes one output dict combining the log
    event fields, the document metadata, the Kinesis record metadata and
    attributes of the Lambda *context*.

    Args:
        event: Invocation event containing a ``Records`` list.
        context: Lambda context; ``function_name``, ``function_version`` and
            ``invoked_function_arn`` are read from it.

    Returns:
        A list of dicts, in record order and then log-event order.

    Raises:
        KeyError: If an expected key is missing from the event or payload.
    """
    entries = []
    for record in event['Records']:
        stream_meta = record['kinesis']
        # Decode the payload: base64 -> gzip -> UTF-8 JSON.
        compressed = base64.b64decode(stream_meta['data'])
        payload = json.loads(gzip.decompress(compressed).decode('utf-8'))

        for log_event in payload['logEvents']:
            tags = {
                'function_name': context.function_name,
                'log_group': payload['logGroup'],
                'log_stream': payload['logStream'],
                'aws_region': record['awsRegion'],
                'event_source': record['eventSource'],
                # Last path component of the source ARN.
                'event_source_name': record['eventSourceARN'].split('/')[-1],
                'event_name': record['eventName'],
                'event_version': record['eventVersion'],
            }
            entry = {
                'id': log_event['id'],
                'timestamp': log_event['timestamp'],
                'message': log_event['message'],
                'message_type': payload['messageType'],
                'owner': payload['owner'],
                'log_group': payload['logGroup'],
                'log_stream': payload['logStream'],
                'subscription_filters': payload['subscriptionFilters'],
                'kinesis': {
                    'kinesis_schema_version': stream_meta['kinesisSchemaVersion'],
                    'partition_key': stream_meta['partitionKey'],
                    'sequence_number': stream_meta['sequenceNumber'],
                    'approximate_arrival_timestamp': stream_meta['approximateArrivalTimestamp'],
                },
                'aws': {
                    'event_source': record['eventSource'],
                    'event_name': record['eventName'],
                    'event_version': record['eventVersion'],
                    'event_id': record['eventID'],
                    'invoke_identity_arn': record['invokeIdentityArn'],
                    'aws_region': record['awsRegion'],
                    'event_source_arn': record['eventSourceARN'],
                    'function_version': context.function_version,
                    'invoked_function_arn': context.invoked_function_arn,
                },
            }
            entry[DD_SOURCE] = 'kinesis'
            entry[DD_SOURCE_CATEGORY] = 'aws'
            # Add custom tags here by adding new value with the following format "key1:value1, key2:value2"
            entry[DD_CUSTOM_TAGS] = ','.join(
                '{0}:{1}'.format(tag_key, tag_value) for tag_key, tag_value in tags.items()
            )
            entries.append(entry)
    return entries
def send_entry(socket_, log_entry):
    """Serialize *log_entry* and write it to *socket_* as one line.

    A string entry is wrapped as ``{'message': <string>}``. The line sent is
    the API key, a space, the JSON document and a trailing newline, encoded
    as UTF-8.

    Args:
        socket_: Connected socket to write to.
        log_entry: A ``str`` or a ``dict`` to send.

    Returns:
        The number of bytes written, which is the full length of the line.

    Raises:
        Exception: If *log_entry* is neither a ``str`` nor a ``dict``.
        OSError: If writing to the socket fails.
    """
    # The log_entry can only be a string or a dict
    if isinstance(log_entry, str):
        log_entry = {'message': log_entry}
    elif not isinstance(log_entry, dict):
        raise Exception(
            'Cannot send the entry as it must be either a string or a dict. Provided entry: ' + str(log_entry))

    # Send to Datadog
    payload = '{0} {1}\n'.format(DD_API_KEY, json.dumps(log_entry)).encode('UTF-8')
    # sendall() keeps writing until the whole buffer is transmitted; a plain
    # send() may stop early and leave a truncated line on the wire.
    socket_.sendall(payload)
    return len(payload)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment