Created
December 12, 2018 18:59
-
-
Save juanesech/c7b48802083acb31f8b49ccec579c6c0 to your computer and use it in GitHub Desktop.
Create an IP-set-based rule and web ACL on AWS WAF using boto3
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
import sys | |
import fileinput | |
# boto3 client for the 'waf' service; the helper functions below call it.
client = boto3.client('waf')

# Positional command-line arguments, in order: the application name (used as
# the prefix of every resource name), the web ACL default action, and the
# action attached to the rule inside the ACL.
app_name = sys.argv[1]
acl_action = sys.argv[2]
rule_action = sys.argv[3]

# Empty-string placeholders; each is overwritten once the matching resource
# has been found or created.
ip_set = rule = acl = ''
def get_token():
    """Request a change token from the WAF client and return its value.

    Returns:
        The 'ChangeToken' entry of the ``get_change_token`` response.
    """
    response = client.get_change_token()
    return response['ChangeToken']
def create_ip_set(app):
    """Create the IP set belonging to *app*.

    Args:
        app: Application name; the set is named ``<app>-WAF-WitheList-IPs``.

    Returns:
        The response of the client's ``create_ip_set`` call.
    """
    # The 'WitheList' spelling is part of the resource name the script
    # searches for later, so it must stay exactly as written.
    set_name = app + '-WAF-WitheList-IPs'
    token = get_token()
    return client.create_ip_set(Name=set_name, ChangeToken=token)
def update_ip_set(ip_set_id, cidr='200.46.145.2/32', ip_type='IPV4'):
    """Insert one address range into an existing IP set.

    Args:
        ip_set_id: ID of the IP set to modify.
        cidr: Address range to insert, in CIDR notation. Defaults to the
            address this function previously hard-coded, so one-argument
            calls behave exactly as before.
        ip_type: Value sent as the descriptor 'Type'. Defaults to 'IPV4',
            the value previously hard-coded.

    Returns:
        The response of the client's ``update_ip_set`` call.
    """
    insert_descriptor = {
        'Action': 'INSERT',
        'IPSetDescriptor': {
            'Type': ip_type,
            'Value': cidr,
        },
    }
    return client.update_ip_set(
        IPSetId=ip_set_id,
        ChangeToken=get_token(),
        Updates=[insert_descriptor],
    )
def create_rule(app):
    """Create the rule belonging to *app*.

    Args:
        app: Application name; used as the prefix of both the rule name
            (``<app>-WAF-WitheList-Rule``) and its metric name
            (``<app>WAFWitheListRuleMetric``).

    Returns:
        The response of the client's ``create_rule`` call.
    """
    rule_name = app + '-WAF-WitheList-Rule'
    # NOTE(review): WAF appears to restrict MetricName to alphanumeric
    # characters -- confirm that app names never contain anything else.
    metric_name = app + 'WAFWitheListRuleMetric'
    token = get_token()
    return client.create_rule(
        Name=rule_name,
        MetricName=metric_name,
        ChangeToken=token,
    )
def update_rule(rule_id, ip_set_id):
    """Insert an IP-match predicate that points a rule at an IP set.

    Args:
        rule_id: ID of the rule to modify.
        ip_set_id: ID of the IP set, sent as the predicate's 'DataId'.

    Returns:
        The response of the client's ``update_rule`` call.
    """
    ip_match_predicate = {
        'Negated': False,
        'Type': 'IPMatch',
        'DataId': ip_set_id,
    }
    insert_predicate = {'Action': 'INSERT', 'Predicate': ip_match_predicate}
    return client.update_rule(
        RuleId=rule_id,
        ChangeToken=get_token(),
        Updates=[insert_predicate],
    )
def create_acl(app, action):
    """Create the web ACL belonging to *app*.

    Args:
        app: Application name; used as the prefix of both the ACL name
            (``<app>-WAF-WitheList-ACL``) and its metric name
            (``<app>WAFWitheListACLMetric``).
        action: Value sent as the 'Type' of the ACL's default action.

    Returns:
        The response of the client's ``create_web_acl`` call.
    """
    acl_name = app + '-WAF-WitheList-ACL'
    metric_name = app + 'WAFWitheListACLMetric'
    default_action = {'Type': action}
    return client.create_web_acl(
        Name=acl_name,
        MetricName=metric_name,
        DefaultAction=default_action,
        ChangeToken=get_token(),
    )
def update_acl(rule_id, acl_id, acl_act, rule_act):
    """Insert a rule into a web ACL and set the ACL's default action.

    Args:
        rule_id: ID of the rule to activate in the ACL.
        acl_id: ID of the web ACL to modify.
        acl_act: Value sent as the 'Type' of the ACL's default action.
        rule_act: Value sent as the 'Type' of the inserted rule's action.

    Returns:
        The response of the client's ``update_web_acl`` call.
    """
    activated_rule = {
        'Priority': 1,
        'RuleId': rule_id,
        'Action': {'Type': rule_act},
    }
    insert_rule = {'Action': 'INSERT', 'ActivatedRule': activated_rule}
    return client.update_web_acl(
        WebACLId=acl_id,
        ChangeToken=get_token(),
        Updates=[insert_rule],
        DefaultAction={'Type': acl_act},
    )
def replace_acl_id(acl_id, path='parameters.json'):
    """Replace every ``ACL_ID_REPLACE`` placeholder in a file with *acl_id*.

    The file is rewritten in place, line by line.

    Args:
        acl_id: Text substituted for each occurrence of the placeholder.
        path: File to rewrite. Defaults to 'parameters.json', the path this
            function previously hard-coded, so one-argument calls behave
            exactly as before.
    """
    with fileinput.FileInput(path, inplace=True) as lines:
        for line in lines:
            # With inplace=True, fileinput redirects standard output into the
            # file being rewritten, so print() is what writes each line back.
            # end='' keeps the line's own newline from being doubled.
            print(line.replace('ACL_ID_REPLACE', acl_id), end='')
# Script body: find (or create) the IP set, rule and web ACL named after
# app_name, link them together, then write the ACL id into parameters.json.
#
# NOTE: each list call asks for at most 100 entries and no further pages are
# requested, so a resource that is not in that first response is treated as
# missing and a new one is created.

# --- IP set ----------------------------------------------------------------
ip_sets_list = client.list_ip_sets(Limit=100)
for entry in ip_sets_list['IPSets']:
    if entry['Name'] == app_name + '-WAF-WitheList-IPs':
        ip_set = entry
if ip_set:
    print('Updating IPSet')
else:
    print('IPSet Not found, creating')
    ip_set = create_ip_set(app_name)['IPSet']
# The address is inserted whether the set already existed or was just created.
update_ip_set(ip_set['IPSetId'])

# --- Rule ------------------------------------------------------------------
rules_list = client.list_rules(Limit=100)
for entry in rules_list['Rules']:
    if entry['Name'] == app_name + '-WAF-WitheList-Rule':
        rule = entry
if rule:
    print('Updating Rule')
else:
    print('Rule Not found, creating')
    rule = create_rule(app_name)['Rule']
update_rule(rule['RuleId'], ip_set['IPSetId'])

# --- Web ACL ---------------------------------------------------------------
acls_list = client.list_web_acls(Limit=100)
for entry in acls_list['WebACLs']:
    if entry['Name'] == app_name + '-WAF-WitheList-ACL':
        acl = entry
if acl:
    print('Updating WebACL')
else:
    print('WebACL Not found, creating')
    # A new ACL starts with ALLOW; update_acl below then sets acl_action as
    # the default action.
    acl = create_acl(app_name, 'ALLOW')['WebACL']
print('ACL ID: ' + acl['WebACLId'])
# update_acl takes exactly (rule_id, acl_id, acl_act, rule_act). The IP set id
# is not one of its arguments; passing it as well raises TypeError.
update_acl(rule['RuleId'], acl['WebACLId'], acl_action, rule_action)

replace_acl_id(acl['WebACLId'])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment