-
-
Save snixon/059b0a0edf87e9a34d020bb2c9546874 to your computer and use it in GitHub Desktop.
| #!/usr/bin/env python | |
| import os | |
| import boto3 | |
| import argparse | |
| import json | |
| # `pip install -U PTable` will get you the right fork of PrettyTable | |
| from prettytable import PrettyTable | |
| from botocore.exceptions import ClientError | |
| # Tags in this list will be checked against any tags Security Groups may have on them | |
| # If a match is found, the SG will be excluded. Matches are case insensitive for both key and value | |
| exclusion_tags = [{"Key": "ephemeral", "Value": "true"}] | |
| try: | |
| parser = argparse.ArgumentParser(description="Find and delete unused Security Groups") | |
| parser.add_argument( | |
| "-r", "--region", type=str, default="us-east-1", help="The default region is us-east-1" | |
| ) | |
| parser.add_argument( | |
| "-p", | |
| "--profile", | |
| type=str, | |
| default="default", | |
| help="The AWS profile to use for the connection", | |
| ) | |
| parser.add_argument( | |
| "-d", "--delete", action="store_true", help="Try to delete the security groups we find" | |
| ) | |
| parser.add_argument("--dry-run", dest="dry_run", action="store_true", help="Simulate deletes") | |
| parser.add_argument( | |
| "--todos", dest="all_regions", action="store_true", help="Run on each region in turn" | |
| ) | |
| parser.add_argument( | |
| "--json", | |
| dest="json_output", | |
| action="store_true", | |
| help="Output JSON Doc of rules for each SG to be deleted", | |
| ) | |
| parser.add_argument( | |
| "-q", | |
| "--quiet", | |
| action="store_true", | |
| help="Don't show summaries for non-deletable resources", | |
| ) | |
| parser.add_argument( | |
| "-o", | |
| "--output", | |
| action="store", | |
| default="output", | |
| help="Optional directory prefix for the output json files if json_output is specified.", | |
| ) | |
| args = parser.parse_args() | |
| session = boto3.session.Session(profile_name=args.profile) | |
| regions = [] | |
| if args.all_regions: | |
| for region in session.get_available_regions("ec2"): | |
| regions.append(region) | |
| else: | |
| regions.append(args.region) | |
| for region in regions: | |
| ec2 = session.resource("ec2", region_name=region) | |
| client = session.client("ec2", region_name=region) | |
| acct_name = session.client("iam").list_account_aliases()["AccountAliases"][0] | |
| acct_id = session.client("sts").get_caller_identity().get("Account") | |
| all_groups = [] | |
| security_groups_in_use = [] | |
| rule_referenced_sgs = [] | |
| tag_excluded_sgs = [] | |
| def lookup_by_id(sgid): | |
| sg = ec2.get_all_security_groups(group_ids=sgid) | |
| return sg[0].name | |
| # Get ALL security groups names | |
| try: | |
| security_groups_dict = client.describe_security_groups() | |
| except ClientError as e: | |
| if e.response["Error"]["Code"] == "AuthFailure": | |
| if args.quiet: | |
| continue | |
| else: | |
| error_table = PrettyTable(["Error Message"]) | |
| error_table.add_row( | |
| ["Authentication Failure: You may not have access to this Region"] | |
| ) | |
| print( | |
| error_table.get_string( | |
| title="Account: {} ({}) - {}".format(acct_name, acct_id, region) | |
| ) | |
| ) | |
| continue | |
| security_groups = security_groups_dict["SecurityGroups"] | |
| for groupobj in security_groups: | |
| if ( | |
| groupobj["GroupName"] == "default" | |
| or groupobj["GroupName"].startswith("d-") | |
| or groupobj["GroupName"].startswith("AWS-OpsWorks-") | |
| ): | |
| security_groups_in_use.append(groupobj["GroupId"]) | |
| for ruleset in groupobj["IpPermissions"]: | |
| if len(ruleset["UserIdGroupPairs"]) > 0: | |
| for group in ruleset["UserIdGroupPairs"]: | |
| rule_referenced_sgs.append(group["GroupId"]) | |
| if len(exclusion_tags) > 0: | |
| if "Tags" in groupobj: | |
| for tag_group in exclusion_tags: | |
| for tags in groupobj["Tags"]: | |
| if str(tag_group).casefold() == str(tags).casefold(): | |
| tag_excluded_sgs.append(groupobj["GroupId"]) | |
| all_groups.append(groupobj["GroupId"]) | |
| total_groups = len(all_groups) | |
| # Prune the groups that are referenced by other groups | |
| for group in rule_referenced_sgs: | |
| if group in all_groups: | |
| all_groups.remove(group) | |
| security_groups_in_use.append(group) | |
| for group in tag_excluded_sgs: | |
| if group in all_groups: | |
| all_groups.remove(group) | |
| security_groups_in_use.append(group) | |
| # Get all security groups used by instances | |
| instances_dict = client.describe_instances() | |
| reservations = instances_dict["Reservations"] | |
| network_interface_count = 0 | |
| for i in reservations: | |
| for j in i["Instances"]: | |
| for k in j["SecurityGroups"]: | |
| if k["GroupId"] not in security_groups_in_use: | |
| security_groups_in_use.append(k["GroupId"]) | |
| # Security Groups in use by Network Interfaces | |
| eni_dict = client.describe_network_interfaces() | |
| for i in eni_dict["NetworkInterfaces"]: | |
| for j in i["Groups"]: | |
| if j["GroupId"] not in security_groups_in_use: | |
| security_groups_in_use.append(j["GroupId"]) | |
| # Security groups used by classic ELBs | |
| elb_client = session.client("elb", region_name=region) | |
| elb_dict = elb_client.describe_load_balancers() | |
| for i in elb_dict["LoadBalancerDescriptions"]: | |
| for j in i["SecurityGroups"]: | |
| if j not in security_groups_in_use: | |
| security_groups_in_use.append(j) | |
| # Security groups used by ALBs | |
| elb2_client = session.client("elbv2", region_name=region) | |
| elb2_dict = elb2_client.describe_load_balancers() | |
| for i in elb2_dict["LoadBalancers"]: | |
| if "SecurityGroups" in i.keys(): | |
| for j in i["SecurityGroups"]: | |
| if j not in security_groups_in_use: | |
| security_groups_in_use.append(j) | |
| # Security groups used by RDS | |
| rds_client = session.client("rds", region_name=region) | |
| rds_dict = rds_client.describe_db_instances() | |
| for i in rds_dict["DBInstances"]: | |
| for j in i["VpcSecurityGroups"]: | |
| if j["VpcSecurityGroupId"] not in security_groups_in_use: | |
| security_groups_in_use.append(j["VpcSecurityGroupId"]) | |
| delete_candidates = [] | |
| for group in all_groups: | |
| if group not in security_groups_in_use: | |
| delete_candidates.append(group) | |
| if args.json_output: | |
| region_dict = {} | |
| # Create json docs in directory structure account_id/vpc_id/sg_id.json | |
| path = "./{}/{}".format(args.output, acct_id) | |
| os.makedirs(path, exist_ok=True) | |
| for group in sorted(delete_candidates): | |
| security_group = ec2.SecurityGroup(group) | |
| sg_doc = { | |
| "id": security_group.id, | |
| "region": region, | |
| "name": security_group.group_name, | |
| "description": security_group.description, | |
| "owner_id": security_group.owner_id, | |
| "vpc_id": security_group.vpc_id, | |
| "tags": security_group.tags, | |
| "ingress_rules": security_group.ip_permissions, | |
| "egress_rules": security_group.ip_permissions_egress, | |
| } | |
| if region not in region_dict: | |
| region_dict[region] = [] | |
| region_dict[region].append({security_group.id: sg_doc}) | |
| for region_name in region_dict: | |
| filename = path + "/" + region_name + "_unused_sg.json" | |
| with open(filename, "w") as outfile: | |
| outfile.write(json.dumps(region_dict, indent=2)) | |
| if args.delete: | |
| print("We will now delete security groups identified to not be in use.") | |
| dry_run_deletes = 0 | |
| for group in delete_candidates: | |
| security_group = ec2.SecurityGroup(group) | |
| try: | |
| if args.dry_run: | |
| security_group.delete(DryRun=True) | |
| else: | |
| security_group.delete() | |
| except ClientError as e: | |
| if e.response["Error"]["Code"] == "DependencyViolation": | |
| print( | |
| "{0} requires manual remediation. DependencyViolation".format( | |
| security_group.group_name | |
| ) | |
| ) | |
| elif e.response["Error"]["Code"] == "DryRunOperation": | |
| dry_run_deletes += 1 | |
| else: | |
| print("{0} requires manual remediation.".format(security_group.group_name)) | |
| else: | |
| if args.quiet and len(delete_candidates) == 0: | |
| continue | |
| else: | |
| table = PrettyTable(["Region", "VPC ID", "SecurityGroup ID", "SecurityGroup Name"]) | |
| table.align["SecurityGroup ID"] = "l" | |
| table.align["VPC ID"] = "c" | |
| table.align["SecurityGroup Name"] = "l" | |
| table.sortby = "VPC ID" | |
| for group in sorted(delete_candidates): | |
| security_group = ec2.SecurityGroup(group) | |
| table.add_row([region, security_group.vpc_id or "None", group, security_group.group_name]) | |
| print( | |
| table.get_string( | |
| title="Account: {} ({}) - {}".format(acct_name, acct_id, region) | |
| ) | |
| ) | |
| if args.quiet: | |
| continue | |
| else: | |
| summary_table = PrettyTable(["Category Evaluated", "Count"]) | |
| summary_table.align["Category Evaluated"] = "l" | |
| summary_table.align["Count"] = "r" | |
| summary_table.add_row(["Total Security Groups", total_groups]) | |
| summary_table.add_row(["Total EC2 Instances", len(reservations)]) | |
| summary_table.add_row( | |
| [ | |
| "Total Load Balancers", | |
| len(elb_dict["LoadBalancerDescriptions"]) + len(elb2_dict["LoadBalancers"]), | |
| ] | |
| ) | |
| summary_table.add_row(["Total RDS Instances", len(rds_dict["DBInstances"])]) | |
| summary_table.add_row(["Total Network Interfaces", len(eni_dict["NetworkInterfaces"])]) | |
| summary_table.add_row(["In-Use Security Groups", len(set(security_groups_in_use))]) | |
| summary_table.add_row(["Security Groups Excluded by Tag", len(tag_excluded_sgs)]) | |
| summary_table.add_row(["---", "---"]) | |
| if args.dry_run: | |
| summary_table.add_row(["Unused SG to Delete (DRY-RUN)", dry_run_deletes]) | |
| elif args.delete: | |
| summary_table.add_row(["Unused SG Deleted", len(delete_candidates)]) | |
| else: | |
| summary_table.add_row(["Unused SG to Delete", len(delete_candidates)]) | |
| print(summary_table.get_string(title="Summary")) | |
| except KeyboardInterrupt: | |
| print("\nCtrl+C Caught, Terminating") |
I ran into this issue also. Change line 244 to:
table.add_row([region, security_group.vpc_id or "None", group, security_group.group_name])
@josharrington
Thanks for the notes, updated!
Hi, I'm beginner to this, i'm getting error while running this .py file.
Anyone can help?acct_name = session.client("iam").list_account_aliases()["AccountAliases"][0]IndexError: list index out of range
@yinghan1221 The script is expecting that you've set an account alias on your AWS account, like a friendly name, add one to it and it should work for you
It looks like the Dry Run option is broken? This is what I get when using --dry-run.
Traceback (most recent call last): File ".\security-group-cleanup.py", line 271, in <module> summary_table.add_row(["Unused SG to Delete (DRY-RUN)", dry_run_deletes]) NameError: name 'dry_run_deletes' is not defined
Honestly though I'm not sure why the dry-run code even exists...the default mode without specify the delete argument is a dry-run....
Hi, I'm beginner to this, i'm getting error while running this .py file.
Anyone can help?
IndexError: list index out of range