-
-
Save callum-p/63cff6654d41db3a3051c14081faf3ea to your computer and use it in GitHub Desktop.
Description: Deploys lambda functions to forward cloudwatch logs to logstash
Parameters:
  # Name of the networking stack that exports vpcId and the private subnet
  # IDs imported by the resources below.
  coreNetworkingStackName:
    Type: String
Resources:
lambdaRole: | |
Type: "AWS::IAM::Role" | |
Properties: | |
AssumeRolePolicyDocument: | |
Version: "2012-10-17" | |
Statement: | |
- Effect: Allow | |
Principal: | |
Service: "lambda.amazonaws.com" | |
Action: "sts:AssumeRole" | |
ManagedPolicyArns: | |
- arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole | |
lambdaPolicy: | |
Type: "AWS::IAM::Policy" | |
Properties: | |
PolicyDocument: | |
Version: "2012-10-17" | |
Statement: | |
- Effect: Allow | |
Action: | |
- "logs:PutRetentionPolicy" | |
- "logs:describeLogGroups" | |
- "logs:putSubscriptionFilter" | |
- "logs:deleteSubscriptionFilter" | |
Resource: "*" | |
PolicyName: !Ref "AWS::StackName" | |
Roles: | |
- !Ref lambdaRole | |
cloudwatchPermission: | |
Type: AWS::Lambda::Permission | |
Properties: | |
FunctionName: !Ref processLogGroupFunction | |
Action: lambda:InvokeFunction | |
Principal: logs.amazonaws.com | |
SourceArn: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/*" | |
SourceAccount: !Ref "AWS::AccountId" | |
  lambdaSecurityGroup:
    # Security group attached to the VPC-resident processor lambda. No
    # ingress rules are declared; only outbound traffic (to logstash) is used.
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: logstash cloudwatch log lambda processor
      VpcId:
        # vpcId is exported by the core networking stack named in Parameters.
        Fn::ImportValue: !Sub ${coreNetworkingStackName}:vpcId
processLogGroupFunction: | |
Type: AWS::Lambda::Function | |
Properties: | |
Handler: index.handler | |
Role: !GetAtt lambdaRole.Arn | |
Runtime: python3.6 | |
Timeout: '300' | |
VpcConfig: | |
SecurityGroupIds: | |
- !Ref lambdaSecurityGroup | |
SubnetIds: | |
- Fn::ImportValue: !Sub ${coreNetworkingStackName}:privateSubnet2A | |
- Fn::ImportValue: !Sub ${coreNetworkingStackName}:privateSubnet2B | |
- Fn::ImportValue: !Sub ${coreNetworkingStackName}:privateSubnet2C | |
Code: | |
ZipFile: !Sub | | |
#!/usr/bin/env python3 | |
import socket | |
import sys | |
import json | |
import gzip | |
import copy | |
import base64 | |
import re | |
def transform(data): | |
new_data = copy.deepcopy(data) | |
new_data['@metadata'] = { | |
"beat": "lambda", | |
"version": "0.0.1" | |
} | |
if 'service' in data: | |
if data['service'] == 'blah': | |
new_data['@metadata']['beat'] = 'ecs' | |
if 'timestamp' in data: | |
del new_data['timestamp'] | |
new_data['lambda_timestamp'] = data['timestamp'] | |
if 'port' in data: | |
del new_data['port'] | |
return new_data | |
def send_log(data): | |
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: | |
s.connect(('logstash.xxx.internal', 5000)) | |
s.sendall(str(json.dumps(data)).encode('utf-8')) | |
s.send('\n'.encode('utf-8')) | |
s.close() | |
def handler(event, context): | |
decompressed = gzip.decompress( | |
base64.b64decode(event['awslogs']['data'])).decode('utf-8') | |
try: | |
data = json.loads(decompressed) | |
except Exception as e: | |
return | |
for str_event in data['logEvents']: | |
try: | |
e = json.loads(str_event['message']) | |
except Exception as e: | |
return | |
if 'level' in e: | |
if e['level'] == 'debug': | |
return | |
send_log(transform(e)) | |
if __name__ == '__main__': | |
handler(None, None) | |
updateAllLogGroupsFunction: | |
Type: AWS::Lambda::Function | |
Properties: | |
Handler: index.handler | |
Role: !GetAtt lambdaRole.Arn | |
Runtime: python3.6 | |
Timeout: '30' | |
Environment: | |
Variables: | |
SUSBCRIPTION_FUNCTION_ARN: !GetAtt processLogGroupFunction.Arn | |
Code: | |
ZipFile: !Sub | | |
#!/usr/bin/env python3 | |
import boto3 | |
import os | |
c = boto3.client('logs') | |
bad_groups = [ | |
'core-networking', | |
's3av', | |
'healthcheck', | |
'updateLogGroupFunction', | |
'processLogGroupFunction' | |
] | |
good_groups = [ | |
'adamite', | |
'/aws/lambda' | |
] | |
def get_log_groups(): | |
groups = [] | |
params = {} | |
while True: | |
response = c.describe_log_groups(**params) | |
for group in response['logGroups']: | |
is_bad_group = False | |
for bg in bad_groups: | |
if bg in group['logGroupName']: | |
is_bad_group = True | |
if not is_bad_group: | |
for gg in good_groups: | |
if group['logGroupName'].startswith(gg): | |
groups.append(group['logGroupName']) | |
else: | |
delete_subscription(group['logGroupName']) | |
if 'nextToken' in response: | |
params['nextToken'] = response['nextToken'] | |
else: | |
break | |
return groups | |
def delete_subscription(group): | |
try: | |
c.delete_subscription_filter( | |
logGroupName=group, | |
filterName='logstash') | |
except Exception as e: | |
print(e) | |
def create_subscription(group): | |
sub_arn = os.getenv('SUSBCRIPTION_FUNCTION_ARN') | |
c.put_subscription_filter( | |
logGroupName=group, | |
filterName='logstash', | |
filterPattern='', | |
destinationArn=sub_arn | |
) | |
def handler(event, context): | |
groups = get_log_groups() | |
for g in groups: | |
create_subscription(g) | |
# delete_subscription(g) | |
if __name__ == '__main__': | |
handler(None, None) | |
updateLogGroupFunction: | |
Type: AWS::Lambda::Function | |
Properties: | |
Handler: index.handler | |
Role: !GetAtt lambdaRole.Arn | |
Runtime: python3.6 | |
Timeout: '30' | |
Environment: | |
Variables: | |
SUSBCRIPTION_FUNCTION_ARN: !GetAtt processLogGroupFunction.Arn | |
Code: | |
ZipFile: !Sub | | |
#!/usr/bin/env python | |
import boto3 | |
import json | |
import os | |
logs = boto3.client('logs') | |
bad_groups = [ | |
'core-networking', | |
's3av', | |
'healthcheck', | |
'updateLogGroupFunction', | |
'processLogGroupFunction' | |
] | |
good_groups = [ | |
'adamite', | |
'/aws/lambda' | |
] | |
def create_subscription(group): | |
for g in bad_groups: | |
if g in group: | |
return | |
sub_arn = os.getenv('SUSBCRIPTION_FUNCTION_ARN') | |
logs.put_subscription_filter( | |
logGroupName=group, | |
filterName='logstash', | |
filterPattern='', | |
destinationArn=sub_arn | |
) | |
def handler(event, context): | |
try: | |
log_group = event['detail']['requestParameters']['logGroupName'] | |
except Exception as e: | |
print(e) | |
print(json.dumps(event)) | |
if log_group.startswith('/aws/lambda'): | |
for gg in good_groups: | |
if log_group.startswith(gg): | |
create_subscription(log_group) | |
break | |
if __name__ == '__main__': | |
handler(None, None) | |
  logCreateEventPermission:
    # Allows the CloudWatch Events rule below to invoke the
    # updateLogGroupFunction lambda.
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref updateLogGroupFunction
      Action: lambda:InvokeFunction
      Principal: events.amazonaws.com
      # Scoped to exactly the logCreateEvent rule defined below.
      SourceArn: !GetAtt logCreateEvent.Arn
  logCreateEvent:
    # CloudWatch Events rule matching CloudTrail-recorded CreateLogGroup
    # calls; requires a CloudTrail trail to be active in the account
    # (presumably set up elsewhere -- TODO confirm).
    Type: AWS::Events::Rule
    Properties:
      Description: Triggers logstash subscription on new log groups
      State: ENABLED
      Targets:
        - Arn: !GetAtt updateLogGroupFunction.Arn
          Id: updateLogGroupFunction
      EventPattern: |
        {
          "source": [
            "aws.logs"
          ],
          "detail-type": [
            "AWS API Call via CloudTrail"
          ],
          "detail": {
            "eventSource": [
              "logs.amazonaws.com"
            ],
            "eventName": [
              "CreateLogGroup"
            ]
          }
        }
I don't understand your question sorry.
@callum-p, let me rephrase my question. In the code above you wrote, "Description: Deploys lambda functions to forward cloudwatch logs to logstash", so my question is: do I need to create a Lambda function using this code, or should I replace the logstash.yml file in my directory with it?
I don't understand where to place this code, and I would appreciate some clarification.
This is a cloudformation template. You would need to update https://gist.github.com/callum-p/63cff6654d41db3a3051c14081faf3ea#file-logstash-cloudwatch-yml-L104 to point it to your logstash server. You would also need to run the updateAllLogGroupsFunction function manually via the lambda console to update all your existing log groups (based on the whitelists defined in the code).
To be honest though, if you don't understand it you probably won't be able to get it working. I'm happy to help, but not walk you through step by step.
Thanks for letting me know what to change; only one thing is still unclear to me: where should this code live? Should I place it in my project directory, or should I create a Lambda function in AWS from it? Once I understand that, I will find a way to use it.
It's a cloudformation template. You deploy the file via AWS cloudformation.
Ok great, I will look into it. Thank you :)
@callum-p, I would really appreciate it if you could let me know — it would save me a lot of time.