Last active
June 2, 2020 12:22
-
-
Save davidrosenstark/3d1e87638556f043c3d0dbeb1e21f690 to your computer and use it in GitHub Desktop.
error_log_retriever
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import logging | |
import os | |
from datetime import timedelta | |
import boto3 | |
from dateutil.parser import parse | |
environment = os.environ.get("ENV") | |
logger = logging.getLogger(__name__) | |
client = boto3.client('logs') | |
sns_client = boto3.client('sns') | |
#How many events to send to notification so the message is not too large | |
MAX_NUM_EVENTS = 3 | |
sns_arn = os.environ.get("NOTIFY_ARN") | |
def get_log_group_name(message): | |
if message['Trigger']['Namespace'] == 'AWS/Lambda': | |
# genericize for dimension FunctionName | |
func_name = list(filter(lambda x: x['name'] == 'FunctionName', message['Trigger']['Dimensions']))[0]['value'] | |
return "/aws/lambda/" + func_name | |
else: | |
raise Exception("not an alarm related to log filters") | |
def send_filter_log_events_request(log_group_name, alarm_time, nextToken=None): | |
args = {'logGroupName': log_group_name, 'endTime': int((alarm_time.timestamp()) * 1000), | |
'startTime': int((alarm_time - timedelta(minutes=10)).timestamp() * 1000), | |
'filterPattern': "?ERROR ?\"Task timed out\"", | |
'interleaved': True | |
} | |
if nextToken is not None: | |
args['nextToken'] = nextToken | |
return client.filter_log_events(**args) | |
def get_logs(log_group_name, alarm_time): | |
response = send_filter_log_events_request(log_group_name, alarm_time) | |
# remove garbage from the logs | |
events = [e['message'].replace("\u00a0"," ") for e in response['events']] | |
while 'nextToken' in response: | |
response = send_filter_log_events_request(log_group_name, alarm_time, response['nextToken']) | |
events += [e['message'].replace("\u00a0"," ") for e in response['events']] | |
return events | |
def lambda_handler(event, _): | |
logger.debug(f"incoming message: {event['Records']}") | |
for record in event['Records']: | |
try: | |
sns_message = record['Sns'] | |
alarm_time = parse(sns_message['Timestamp']) | |
message = json.loads(sns_message['Message']) | |
log_group_name = get_log_group_name(message) | |
logs = get_logs(log_group_name, alarm_time) | |
if len(logs) > 0: | |
alert_message = {'LambdaWithErrors': log_group_name[log_group_name.rfind("/")+1:], | |
'num_errors': len(logs), | |
'error_logs': logs[:5]} | |
logger.debug(f"sending alert message: {alert_message}") | |
sns_client.publish(TopicArn=sns_arn, Message=json.dumps(alert_message)) | |
else: | |
logger.warning("Failed to retrieve log records for error") | |
except Exception as ex: | |
logger.info(f"exception: {ex}") | |
logger.info(f"not processing: {record}") | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment