Last active
June 1, 2020 13:06
-
-
Save gannino/231e9ad52571fdb095b8300eee292ba6 to your computer and use it in GitHub Desktop.
Python script to tag AMI, Snapshot and Volumes the one not used by instances or AMI will get the Name tag with a Prefixed UNUSED at first run, when run again if the TAG start with UNUSED will delete the resource
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# most credit to the original: https://gist.github.com/brandond/6b4d22eaefbd66895f230f68f27ee586 & https://gist.github.com/danpritts/1089d878a76b14393478a7476476f97b | |
# Python script to tag AMI, Snapshot and Volumes the one not used by instances or AMI | |
# will get the Name tag with a Prefixed UNUSED at first run | |
# when run again if the TAG start with UNUSED will delete the resource | |
# Export the below variable to point Profile and region | |
# export AWS_PROFILE=aws-profile | |
# export AWS_DEFAULT_REGION=eu-west-3 | |
# export AWS_DEFAULT_REGION=eu-west-2 | |
# export AWS_DEFAULT_REGION=eu-central-1 | |
# export AWS_DEFAULT_REGION=eu-west-1 | |
# python3 tag-ami-snap-vol-and-clean-not-used-v2.py | |
import copy | |
import logging | |
import os | |
import boto3 | |
import time | |
from ratelimiter import RateLimiter | |
from datetime import datetime | |
import datetime | |
# import dateparser? | |
# dateparser.parse('12-12-12') | |
#https://dateparser.readthedocs.io/en/latest/ | |
logging.basicConfig(level=os.environ.get('LOG_LEVEL', 'INFO')) | |
ec2 = boto3.client('ec2') | |
logger = logging.getLogger(__name__) | |
def tag_ami(): | |
image = {} | |
for image in ec2.describe_images(Owners=['self'])['Images']: | |
tags = extract_tags(image.get('Tags', [])) | |
cur_tags = extract_tags(image.get('Tags', [])) | |
new_tags = copy.deepcopy(cur_tags) | |
new_tags.update(tags) | |
new_tags['Name'] = 'AMI:' + image['Name'] | |
if new_tags != cur_tags: | |
logger.info('{0}: Tags changed to {1}'.format(image['ImageId'], new_tags)) | |
ec2.create_tags(Resources=[image['ImageId']], Tags=load_tags(new_tags)) | |
# imagecreation = image.get('CreationDate', []) | |
# today = datetime.datetime.now().strftime("%Y-%m-%d") | |
# print(today) | |
# print(imagecreation) | |
def tag_snapshots_and_clean_not_used(): | |
snapshots = {} | |
for response in ec2.get_paginator('describe_snapshots').paginate(OwnerIds=['self']): | |
snapshots.update([(snapshot['SnapshotId'], snapshot) for snapshot in response['Snapshots']]) | |
for image in ec2.describe_images(Owners=['self'])['Images']: | |
tags = extract_tags(image.get('Tags', [])) | |
devnum = 0 | |
numberofdevs=len(image['BlockDeviceMappings']) | |
try: | |
for device in image['BlockDeviceMappings']: | |
if 'SnapshotId' in device['Ebs']: | |
devnum += 1 | |
snapshot = snapshots[device['Ebs']['SnapshotId']] | |
snapshot['Used'] = True | |
cur_tags = extract_tags(snapshot.get('Tags', [])) | |
new_tags = copy.deepcopy(cur_tags) | |
new_tags.update(tags) | |
new_tags['ImageId'] = image['ImageId'] | |
# here's where to change formatting | |
new_tags['Name'] = 'AMI:' + image['Name'] + ' ' + device['DeviceName'] + ' (' + str(devnum) + '/' + str(numberofdevs) + ')' | |
if new_tags != cur_tags: | |
logger.info('{0}: Tags changed to {1}'.format(snapshot['SnapshotId'], new_tags)) | |
ec2.create_tags(Resources=[snapshot['SnapshotId']], Tags=load_tags(new_tags)) | |
except Exception as error: | |
logger.info('Not an EBS Device! Skipping {0} for Image: {1} as {2} - Python Error: {3}'.format(device['DeviceName'],image['ImageId'],device['VirtualName'],error)) | |
for snapshot in snapshots.values(): | |
if 'Used' not in snapshot: | |
cur_tags = extract_tags(snapshot.get('Tags', [])) | |
name = cur_tags.get('Name', snapshot['SnapshotId']) | |
if name.startswith('UNUSED'): | |
logger.warning('Deleting not used snapshot: {0}'.format(snapshot['SnapshotId'])) | |
ec2.delete_snapshot(SnapshotId=snapshot['SnapshotId'] ) | |
if not name.startswith('UNUSED'): | |
logger.warning('{0} Unused! Updating tag'.format(snapshot['SnapshotId'])) | |
cur_tags['Name'] = 'UNUSED ' + name | |
ec2.create_tags(Resources=[snapshot['SnapshotId']], Tags=load_tags(cur_tags)) | |
def tag_volumes_and_clean_not_used(): | |
volumes = {} | |
for response in ec2.get_paginator('describe_volumes').paginate(): | |
volumes.update([(volume['VolumeId'], volume) for volume in response['Volumes']]) | |
for response in ec2.get_paginator('describe_instances').paginate(): | |
for reservation in response['Reservations']: | |
for instance in reservation['Instances']: | |
tags = extract_tags(instance.get('Tags', [])) | |
# to tag the number volumes on an instance, e.g., (1/3) (2/3) (3/3) | |
devnum = 0 | |
numberofdevs=len(instance['BlockDeviceMappings']) | |
for device in instance['BlockDeviceMappings']: | |
devnum += 1 | |
volume = volumes[device['Ebs']['VolumeId']] | |
volume['Used'] = True | |
cur_tags = extract_tags(volume.get('Tags', [])) | |
new_tags = copy.deepcopy(cur_tags) | |
new_tags.update(tags) | |
# here's where to change formatting | |
new_tags['Name'] = tags['Name'] + ' ' + device['DeviceName'] + ' (' + str(devnum) + '/' + str(numberofdevs) + ')' | |
if new_tags != cur_tags: | |
logger.info('{0} Tags changed to {1}'.format(volume['VolumeId'], new_tags)) | |
ec2.create_tags(Resources=[volume['VolumeId']], Tags=load_tags(new_tags)) | |
for volume in volumes.values(): | |
if 'Used' not in volume: | |
cur_tags = extract_tags(volume.get('Tags', [])) | |
name = cur_tags.get('Name', volume['VolumeId']) | |
if name.startswith('UNUSED'): | |
logger.warning('Deleting not used volume: {0}'.format(volume['VolumeId'])) | |
ec2.delete_volume(VolumeId=volume['VolumeId'] ) | |
if not name.startswith('UNUSED'): | |
logger.warning('{0} Unused! Updating tag'.format(volume['VolumeId'])) | |
cur_tags['Name'] = 'UNUSED ' + name | |
ec2.create_tags(Resources=[volume['VolumeId']], Tags=load_tags(cur_tags)) | |
@RateLimiter(max_calls=5, period=1) | |
def tag_everything(): | |
tag_ami() | |
tag_snapshots_and_clean_not_used() | |
tag_volumes_and_clean_not_used() | |
def extract_tags(tags): | |
return {tag["Key"]: tag["Value"] for tag in tags if not tag["Key"].startswith("aws:")} | |
def load_tags(tags): | |
return [{"Key": k, "Value": v} for k, v in tags.items() if not k.startswith("aws:")] | |
def handler(event, context): | |
tag_everything() | |
if __name__ == '__main__': | |
tag_everything() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment