Created
January 24, 2019 07:54
-
-
Save mrmoneyc/03536a20ab749cc45f54f27bc87a74a0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import json | |
import argparse | |
import boto3 | |
# Parse command-line parameters.
#
# NOTE: parsing runs at import time, so this module is meant to be executed
# as a script, not imported.
#
# Both action options are validated (and upper-cased) up front. The script
# first deletes every rule and only then re-inserts them, so an invalid
# --action value that is rejected by the insert request would otherwise be
# discovered only after the rules have already been removed.
parser = argparse.ArgumentParser(description='Change WAF Rule Action')
parser.add_argument('--web-acl-id', dest='web_acl_id', type=str, required=True, help='WAF Web ACL ID')
parser.add_argument(
    '--action', dest='action_type', type=str.upper, default='COUNT',
    choices=['ALLOW', 'BLOCK', 'COUNT'],
    help='WAF Rule Action: ALLOW/BLOCK/COUNT, Default: COUNT')
parser.add_argument(
    '--web-acl-default-action', dest='default_action', type=str.upper, default='ALLOW',
    choices=['ALLOW', 'BLOCK'],
    help='Web ACL Default Action: ALLOW/BLOCK, Default: ALLOW')
# --regional / --global write to the same destination; the last one given wins.
parser.add_argument('--regional', dest='is_regional', action='store_true', help='Use regional')
parser.add_argument('--global', dest='is_regional', action='store_false', help='Use global')
parser.set_defaults(is_regional=False)
args = parser.parse_args()

print(args.is_regional)

# Build exactly one client: 'waf-regional' when --regional is set, otherwise
# the global 'waf' service.
client = boto3.client('waf-regional' if args.is_regional else 'waf')
def get_change_token():
    """Ask the module-level WAF client for a change token and return it.

    Returns:
        The value stored under 'ChangeToken' in the client's
        get_change_token() response.
    """
    return client.get_change_token()['ChangeToken']
def remove_web_acl_rule(current_rules, web_acl_id, action_type, default_action):
    """Send one update_web_acl request that deletes every rule in current_rules.

    Args:
        current_rules: Iterable of activated-rule dicts; each one is passed
            back unchanged as the "ActivatedRule" of a DELETE update and must
            contain a 'RuleId' key (used for the progress message).
        web_acl_id: ID of the Web ACL to update.
        action_type: Unused here; kept so the signature mirrors
            insert_web_acl_rule.
        default_action: Value sent as the Web ACL's DefaultAction type.

    Returns:
        The response of client.update_web_acl().
    """
    change_token = get_change_token()
    message = 'WebACL ID: {}, Rule ID: {}, ChangeToken: {}'

    deletions = []
    for activated_rule in current_rules:
        print(message.format(web_acl_id, activated_rule['RuleId'], change_token))
        deletions.append({"Action": "DELETE", "ActivatedRule": activated_rule})

    return client.update_web_acl(
        WebACLId=web_acl_id,
        ChangeToken=change_token,
        Updates=deletions,
        DefaultAction={'Type': default_action}
    )
def _build_activated_rule(rule, action_type):
    """Return the ActivatedRule payload used to re-insert rule with action_type.

    Rule groups (Type 'GROUP') do not take an "Action" in the WAF Classic
    API; they take an "OverrideAction" whose type is either COUNT or NONE.
    For those rules COUNT is mapped to an OverrideAction of COUNT, and any
    other action_type is mapped to NONE (the group's own rule actions apply).
    """
    # 'Type' is optional in an ActivatedRule; the API treats a missing value
    # as REGULAR.
    rule_type = rule.get('Type', 'REGULAR')
    activated = {
        "Priority": rule['Priority'],
        "RuleId": rule['RuleId'],
        "Type": rule_type,
    }
    if rule_type == 'GROUP':
        override = 'COUNT' if action_type == 'COUNT' else 'NONE'
        activated["OverrideAction"] = {"Type": override}
    else:
        activated["Action"] = {"Type": action_type}
    # Carry over any exclusions so that re-inserting a rule does not
    # silently re-enable rules that had been excluded.
    if rule.get('ExcludedRules'):
        activated["ExcludedRules"] = rule['ExcludedRules']
    return activated


def insert_web_acl_rule(current_rules, web_acl_id, action_type, default_action):
    """Send one update_web_acl request that inserts every rule in current_rules.

    Each rule keeps its priority, ID and type; its action is replaced by
    action_type (see _build_activated_rule for how rule groups are handled).

    Args:
        current_rules: Iterable of activated-rule dicts containing at least
            'Priority' and 'RuleId'.
        web_acl_id: ID of the Web ACL to update.
        action_type: Action to apply to the inserted rules
            (ALLOW, BLOCK or COUNT).
        default_action: Value sent as the Web ACL's DefaultAction type.

    Returns:
        The response of client.update_web_acl().
    """
    token = get_change_token()
    payload_updates = []
    for rule in current_rules:
        print('WebACL ID: {}, Rule ID: {}, ChangeToken: {}'.format(web_acl_id, rule['RuleId'], token))
        payload_updates.append({
            "Action": "INSERT",
            "ActivatedRule": _build_activated_rule(rule, action_type),
        })
    return client.update_web_acl(
        WebACLId=web_acl_id,
        ChangeToken=token,
        Updates=payload_updates,
        DefaultAction={'Type': default_action}
    )
def main():
    """Re-create every rule of the selected Web ACL with the requested action.

    Reads the Web ACL ID, rule action and default action from the parsed
    command-line arguments, prints the ACL's current rules, deletes them all
    and then inserts them again, printing the response of each update.
    """
    acl_id = args.web_acl_id
    rule_action = args.action_type
    acl_default_action = args.default_action

    acl_rules = client.get_web_acl(WebACLId=acl_id)['WebACL']['Rules']
    for acl_rule in acl_rules:
        print(acl_rule)

    # NOTE(review): delete and re-insert are two separate update requests;
    # if the second one fails the ACL is presumably left without these
    # rules - confirm whether that is acceptable.
    removal_response = remove_web_acl_rule(acl_rules, acl_id, rule_action, acl_default_action)
    print(removal_response)
    insertion_response = insert_web_acl_rule(acl_rules, acl_id, rule_action, acl_default_action)
    print(insertion_response)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment