Last active
July 25, 2024 04:06
-
-
Save adamcousins/fc04fe9fb5e1e4749cc49222a0f7170b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/python | |
import boto3 | |
import botocore | |
from botocore.config import Config | |
def list_waf_acls_v2_regional(wafv2client):
    """Fetch, print and return the REGIONAL-scope WAFv2 web ACL summaries.

    Issues a single ``list_web_acls(Scope='REGIONAL')`` call on
    ``wafv2client`` and returns the ``'WebACLs'`` entry of the response.
    No follow-up request is made for further pages.
    """
    response = wafv2client.list_web_acls(Scope='REGIONAL')
    regional_acls = response['WebACLs']
    print('The following V2 REGIONAL WebACLs have been discovered.....')
    print(str(regional_acls))
    return regional_acls
def list_waf_acls_v2_cloudfront(wafv2client):
    """Fetch, print and return the CLOUDFRONT-scope WAFv2 web ACL summaries.

    Issues a single ``list_web_acls(Scope='CLOUDFRONT')`` call on
    ``wafv2client`` and returns the ``'WebACLs'`` entry of the response.
    No follow-up request is made for further pages.
    """
    response = wafv2client.list_web_acls(Scope='CLOUDFRONT')
    cloudfront_acls = response['WebACLs']
    print('The following V2 CLOUDFRONT WebACLs have been discovered.....')
    print(str(cloudfront_acls))
    return cloudfront_acls
def list_waf_acls_v1(wafclient):
    """Fetch, print and return the web ACL summaries visible to ``wafclient``.

    Calls ``list_web_acls()`` once with no arguments and returns the
    ``'WebACLs'`` entry of the response. The callers in this file pass
    either a ``waf`` or a ``waf-regional`` client.
    """
    response = wafclient.list_web_acls()
    classic_acls = response['WebACLs']
    print("")
    print('The following V1 WebACLs have been discovered.....')
    print(str(classic_acls))
    return classic_acls
def get_waf_acl_v2_cloudfront(webACL, wafv2client):
    """Return the ARN of a CLOUDFRONT-scope WAFv2 web ACL.

    ``webACL`` must provide ``'Name'`` and ``'Id'`` keys; both are passed to
    ``get_web_acl`` together with ``Scope='CLOUDFRONT'``. Unlike the
    regional variant, this function does not print the ARN.
    """
    response = wafv2client.get_web_acl(
        Name=webACL['Name'],
        Id=webACL['Id'],
        Scope='CLOUDFRONT',
    )
    return response['WebACL']['ARN']
def get_waf_acl_v2_regional(webACL, wafv2client):
    """Look up, print and return the ARN of a REGIONAL-scope WAFv2 web ACL.

    ``webACL`` must provide ``'Name'`` and ``'Id'`` keys; both are passed to
    ``get_web_acl`` together with ``Scope='REGIONAL'``.
    """
    response = wafv2client.get_web_acl(
        Name=webACL['Name'],
        Id=webACL['Id'],
        Scope='REGIONAL',
    )
    acl_arn = response['WebACL']['ARN']
    print(acl_arn)
    return acl_arn
def get_waf_acl_v1(webACL, wafclient):
    """Look up, print and return the ARN of a web ACL via ``wafclient``.

    ``webACL`` must provide a ``'WebACLId'`` key, which is passed to
    ``get_web_acl``; the ARN is read from ``['WebACL']['WebACLArn']`` of the
    response.
    """
    acl_details = wafclient.get_web_acl(WebACLId=webACL['WebACLId'])
    acl_arn = acl_details['WebACL']['WebACLArn']
    print(acl_arn)
    return acl_arn
def get_waf_acl_v2_logging_config(webACLArn, wafv2client):
    """Fetch, print and return the WAFv2 logging configuration of a web ACL.

    The configuration is requested with ``LogType='WAF_LOGS'`` and
    ``LogScope='CUSTOMER'``. If the request raises a ``ClientError`` the
    error is reported on stdout and a placeholder configuration with an
    empty ``'LogDestinationConfigs'`` list is returned instead.
    """
    # Placeholder used whenever the lookup below fails.
    current_config = {"ResourceArn": webACLArn, "LogDestinationConfigs": []}
    try:
        response = wafv2client.get_logging_configuration(
            ResourceArn=webACLArn,
            LogType='WAF_LOGS',
            LogScope='CUSTOMER',
        )
        current_config = response['LoggingConfiguration']
    except botocore.exceptions.ClientError as err:
        error_code = err.response['Error']['Code']
        if error_code != 'WAFNonexistentItemException':
            print("Unexpected error: %s" % err)
        else:
            # The ACL has no logging configuration at all.
            print("Logging Disabled")
    print("")
    print('The current logging config for web ACL ' + str(webACLArn) + ' is ....')
    print(str(current_config))
    return current_config
def get_waf_acl_v1_logging_config(webACLArn, wafclient):
    """Fetch, print and return the logging configuration of a web ACL.

    Args:
        webACLArn: ARN of the web ACL, passed as ``ResourceArn``.
        wafclient: client exposing ``get_logging_configuration``; the
            callers in this file pass a ``waf`` or ``waf-regional`` client.

    Returns:
        The ``'LoggingConfiguration'`` dict from the response. If the request
        raises a ``ClientError`` the error is reported on stdout and a
        placeholder ``{"ResourceArn": webACLArn, "LogDestinationConfigs": []}``
        is returned, the same fallback ``get_waf_acl_v2_logging_config`` uses.
    """
    # Bind the fallback up front so every error path has a value to print and
    # return; previously an unexpected ClientError left the name unbound and
    # the function died with UnboundLocalError.
    webACLLoggingConfig = {"ResourceArn": webACLArn, "LogDestinationConfigs": []}
    try:
        webACLLoggingConfig = wafclient.get_logging_configuration(
            ResourceArn=webACLArn)['LoggingConfiguration']
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == 'WAFNonexistentItemException':
            # The ACL has no logging configuration at all.
            print("Logging Disabled")
        else:
            print("Unexpected error: %s" % e)
    print("")
    print('The current logging config for web ACL ' + str(webACLArn) + ' is ....')
    print(str(webACLLoggingConfig))
    return webACLLoggingConfig
def put_waf_acl_v2_logging_config(webACLArn, loggingConfig, fireHoseARN, wafv2client):
    """Write ``loggingConfig`` back with ``fireHoseARN`` as its only destination.

    ``loggingConfig`` is modified in place: its ``'LogDestinationConfigs'``
    entry is replaced by ``[fireHoseARN]`` before it is sent through
    ``put_logging_configuration``. ``webACLArn`` is only used in the printed
    message. Returns the response of the put call.
    """
    loggingConfig['LogDestinationConfigs'] = [fireHoseARN]
    put_response = wafv2client.put_logging_configuration(
        LoggingConfiguration=loggingConfig,
    )
    print("")
    print('The updated logging config for web ACL ' + str(webACLArn) + ' is ....')
    print(str(put_response))
    return put_response
def put_waf_acl_v1_logging_config(webACLArn, loggingConfig, fireHoseARN, wafclient):
    """Write ``loggingConfig`` back with ``fireHoseARN`` as its only destination.

    ``loggingConfig`` is modified in place: its ``'LogDestinationConfigs'``
    entry is replaced by ``[fireHoseARN]`` before it is sent through
    ``put_logging_configuration``. ``webACLArn`` is only used in the printed
    message. Returns the response of the put call.
    """
    loggingConfig['LogDestinationConfigs'] = [fireHoseARN]
    put_response = wafclient.put_logging_configuration(
        LoggingConfiguration=loggingConfig,
    )
    print("")
    print('The updated logging config for web ACL ' + str(webACLArn) + ' is ....')
    print(str(put_response))
    return put_response
#### Actions ### | |
def put_v2_regional_logging_config(region, firehoseArn):
    """Point every REGIONAL WAFv2 web ACL in ``region`` at ``firehoseArn``.

    For each web ACL returned by ``list_waf_acls_v2_regional`` the current
    logging configuration is fetched. When ``firehoseArn`` is not already
    one of its log destinations, the configuration is rewritten to use it.
    The configuration is then re-read so the final state is printed.

    Args:
        region: AWS region name used to create the ``wafv2`` client.
        firehoseArn: ARN of the delivery stream that should receive the logs.
    """
    wafv2client = boto3.client('wafv2', region_name=region)
    for item in list_waf_acls_v2_regional(wafv2client):
        print("")
        print('The following WebACL is being modified')
        print(str(item))
        webACLArn = get_waf_acl_v2_regional(item, wafv2client)
        loggingConfig = get_waf_acl_v2_logging_config(webACLArn, wafv2client)
        logDest = loggingConfig.get('LogDestinationConfigs') or []
        # Test membership on the list itself. An ACL with logging disabled has
        # an empty destination list, so indexing logDest[0] would raise
        # IndexError for exactly the ACLs that need the update.
        if firehoseArn not in logDest:
            print('updating ' + str(item))
            # Pass the ARN (not the summary dict) so the printed message
            # names the ACL the way the helper's signature expects.
            put_waf_acl_v2_logging_config(webACLArn, loggingConfig, firehoseArn, wafv2client)
        else:
            print("")
            print('skipped updating ' + str(item) + ' as it already is set')
        # Print the configuration as it stands after this pass.
        get_waf_acl_v2_logging_config(webACLArn, wafv2client)
    print('done')
def put_v2_cloudfront_logging_config(region, firehoseArn):
    """Point every CLOUDFRONT WAFv2 web ACL at ``firehoseArn``.

    For each web ACL returned by ``list_waf_acls_v2_cloudfront`` the current
    logging configuration is fetched. When ``firehoseArn`` is not already
    one of its log destinations, the configuration is rewritten to use it.
    The configuration is then re-read so the final state is printed.

    Args:
        region: AWS region name used to create the ``wafv2`` client.
        firehoseArn: ARN of the delivery stream that should receive the logs.
    """
    wafv2client = boto3.client('wafv2', region_name=region)
    for item in list_waf_acls_v2_cloudfront(wafv2client):
        print("")
        print('The following WebACL is being modified')
        print(str(item))
        webACLArn = get_waf_acl_v2_cloudfront(item, wafv2client)
        loggingConfig = get_waf_acl_v2_logging_config(webACLArn, wafv2client)
        logDest = loggingConfig.get('LogDestinationConfigs') or []
        # Test membership on the list itself. An ACL with logging disabled has
        # an empty destination list, so indexing logDest[0] would raise
        # IndexError for exactly the ACLs that need the update.
        if firehoseArn not in logDest:
            print('updating ' + str(item))
            # Pass the ARN (not the summary dict) so the printed message
            # names the ACL the way the helper's signature expects.
            put_waf_acl_v2_logging_config(webACLArn, loggingConfig, firehoseArn, wafv2client)
        else:
            print("")
            print('skipped updating ' + str(item) + ' as it already is set')
        # Print the configuration as it stands after this pass.
        get_waf_acl_v2_logging_config(webACLArn, wafv2client)
    print('done')
def put_v1_cf_logging_config(region, firehoseArn):
    """Point every web ACL seen by the ``waf`` client at ``firehoseArn``.

    For each web ACL returned by ``list_waf_acls_v1`` the current logging
    configuration is fetched. When ``firehoseArn`` is not already one of its
    log destinations, the configuration is rewritten to use it. The
    configuration is then re-read so the final state is printed.

    Args:
        region: AWS region name used to create the ``waf`` client.
        firehoseArn: ARN of the delivery stream that should receive the logs.
    """
    wafclient = boto3.client('waf', region_name=region)
    for item in list_waf_acls_v1(wafclient):
        print("")
        print('The following WebACL is being modified')
        print(str(item))
        webACLArn = get_waf_acl_v1(item, wafclient)
        loggingConfig = get_waf_acl_v1_logging_config(webACLArn, wafclient)
        logDest = loggingConfig.get('LogDestinationConfigs') or []
        # Test membership on the list itself. An ACL with logging disabled has
        # an empty destination list, so indexing logDest[0] would raise
        # IndexError for exactly the ACLs that need the update.
        if firehoseArn not in logDest:
            print('updating ' + str(item))
            # Pass the ARN (not the summary dict) so the printed message
            # names the ACL the way the helper's signature expects.
            put_waf_acl_v1_logging_config(webACLArn, loggingConfig, firehoseArn, wafclient)
        else:
            print("")
            print('skipped updating ' + str(item) + ' as it already is set')
        # Print the configuration as it stands after this pass.
        get_waf_acl_v1_logging_config(webACLArn, wafclient)
    print('done')
def put_v1_regional_logging_config(region, firehoseArn):
    """Point every web ACL seen by the ``waf-regional`` client at ``firehoseArn``.

    For each web ACL returned by ``list_waf_acls_v1`` the current logging
    configuration is fetched. When ``firehoseArn`` is not already one of its
    log destinations, the configuration is rewritten to use it. The
    configuration is then re-read so the final state is printed.

    Args:
        region: AWS region name used to create the ``waf-regional`` client.
        firehoseArn: ARN of the delivery stream that should receive the logs.
    """
    wafclient = boto3.client('waf-regional', region_name=region)
    for item in list_waf_acls_v1(wafclient):
        print("")
        print('The following WebACL is being modified')
        print(str(item))
        webACLArn = get_waf_acl_v1(item, wafclient)
        loggingConfig = get_waf_acl_v1_logging_config(webACLArn, wafclient)
        logDest = loggingConfig.get('LogDestinationConfigs') or []
        # Test membership on the list itself. An ACL with logging disabled has
        # an empty destination list, so indexing logDest[0] would raise
        # IndexError for exactly the ACLs that need the update.
        if firehoseArn not in logDest:
            print("")
            print('updating ' + str(item))
            # Pass the ARN (not the summary dict) so the printed message
            # names the ACL the way the helper's signature expects.
            put_waf_acl_v1_logging_config(webACLArn, loggingConfig, firehoseArn, wafclient)
        else:
            print("")
            print('skipped updating ' + str(item) + ' as it already is set')
        # Print the configuration as it stands after this pass.
        get_waf_acl_v1_logging_config(webACLArn, wafclient)
    print('done')
def get_acc_id():
    """Return the ``'Account'`` field of the STS caller identity."""
    caller_identity = boto3.client('sts').get_caller_identity()
    return caller_identity['Account']
def main():
    """Apply the account's Firehose log destinations to all web ACLs.

    Builds two delivery-stream ARNs from the caller's account id, one in
    ap-southeast-2 and one in us-east-1. It then runs the four update
    routines in a fixed order: v2 regional, v2 CloudFront, v1 CloudFront,
    v1 regional.
    """
    account_id = get_acc_id()
    regional_firehose = ''.join([
        'arn:aws:firehose:ap-southeast-2:', account_id,
        ':deliverystream/aws-waf-logs-org-waf-access-logs-', account_id,
        '-ap-southeast-2',
    ])
    global_firehose = ''.join([
        'arn:aws:firehose:us-east-1:', account_id,
        ':deliverystream/aws-waf-logs-org-waf-access-logs-', account_id,
        '-us-east-1',
    ])
    # (routine, region, destination) in the order they must run.
    plan = (
        (put_v2_regional_logging_config, 'ap-southeast-2', regional_firehose),
        (put_v2_cloudfront_logging_config, 'us-east-1', global_firehose),
        (put_v1_cf_logging_config, 'us-east-1', global_firehose),
        (put_v1_regional_logging_config, 'ap-southeast-2', regional_firehose),
    )
    for apply_logging, target_region, destination_arn in plan:
        apply_logging(target_region, destination_arn)
# Run the updates only when executed as a script, so importing this module
# (e.g. to reuse a helper) does not modify any web ACL.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment