-
-
Save thedoc31/82cd1ad04ad279700be166cc9f059845 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""fix_s3_owner_permissions""" | |
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
import argparse | |
# import sys | |
import logging | |
import botocore | |
import boto3 | |
DOCUMENTATION = ''' | |
--- | |
module: fix_s3_owner_permissions.py | |
short_description: Adds bucket-owner-full-control permissions to objects in S3 | |
description: | |
- Iterates through an S3 bucket to grant bucket-owner-full-control permissions on any object which is not | |
- owned by the current account | |
author: | |
- Adapted from an original script by Ami Mahloof (https://gist.github.com/innovia/218a8214a2a94286ff9e8dd690940960) | |
- J. Casalino | |
notes: [] | |
options: | |
bucket: | |
description: Name of the S3 bucket (without s3://) | |
required: true | |
default: null | |
prefix: | |
description: Prefix of the object(s) in s3 you want to check | |
required: true | |
default: null | |
debug: | |
description: Adds additional DEBUG logging from both the script and AWS responses | |
required: false | |
default: false | |
''' | |
EXAMPLES = ''' | |
''' | |
RETURN = r''' # ''' | |
# Shared S3 client, created once at import time; main(), check_acl() and
# set_acl() all issue their S3 calls through it.
CLIENT = boto3.client('s3')
# Keys of objects whose ACL could not be read or updated. check_acl() and
# set_acl() append to this list and main() logs it at the end of the run.
failures = []
def _call_with_backoff(operation, max_attempts=5, **kwargs):
    """Call a boto3 client operation, retrying with backoff when throttled.

    Args:
        operation: Bound boto3 client method to invoke.
        max_attempts: Total number of attempts before giving up.
        **kwargs: Keyword arguments forwarded to ``operation``.

    Returns:
        Whatever ``operation`` returns.

    Raises:
        botocore.exceptions.ClientError: For any non-throttling error, or
            when the call is still throttled after ``max_attempts`` tries.
    """
    import time  # local import so this block does not depend on file-level imports

    for attempt in range(1, max_attempts + 1):
        try:
            return operation(**kwargs)
        except botocore.exceptions.ClientError as error:
            throttled = error.response['Error']['Code'] == 'LimitExceededException'
            if not throttled or attempt == max_attempts:
                raise
            logging.warning('API call limit exceeded; backing off and retrying...')
            # Exponential backoff, capped so a single wait never exceeds 30s.
            time.sleep(min(2 ** attempt, 30))


def _iter_listing_pages(**kwargs):
    """Yield list_objects_v2 result pages for the given request parameters.

    The paginator is lazy: requests (and therefore parameter validation) only
    happen while iterating, so the error translation has to wrap the iteration
    itself rather than the ``paginate()`` call.

    Raises:
        ValueError: If boto3 rejects the request parameters.
    """
    paginator = CLIENT.get_paginator('list_objects_v2')
    try:
        yield from paginator.paginate(**kwargs)
    except botocore.exceptions.ParamValidationError as error:
        raise ValueError(
            'The parameters you provided are incorrect: {}'.format(error)
        ) from error


def main(args):
    """Grant the bucket owner access to objects it does not own.

    Walks every object under ``args.prefix`` in ``args.bucket``. Objects not
    owned by the bucket owner are checked via check_acl() and, when the owner
    is not among the grantees, fixed via set_acl(). Progress is written to
    ``./fix_s3_owner_permissions.log``.

    Args:
        args: Parsed command-line namespace providing ``bucket``, ``prefix``
            and ``debug``.

    Raises:
        ValueError: If boto3 rejects the listing parameters.
        botocore.exceptions.ClientError: If the bucket ACL cannot be read or
            the object listing fails.
    """
    # Set up logging. Both level names are fixed, so no validation is needed.
    log_level = logging.DEBUG if args.debug else logging.INFO
    logging.basicConfig(level=log_level,
                        filename='./fix_s3_owner_permissions.log',
                        format='%(asctime)s: %(name)s - %(levelname)s - %(message)s',
                        datefmt='%d-%b-%y %H:%M:%S')
    logging.info("==========")

    bucket = args.bucket
    items_checked = 0
    items_fixed = 0

    # Obtain the owner of the target bucket; every object is compared against
    # it, so there is no point continuing if this lookup fails.
    aws_owner_id = _call_with_backoff(
        CLIENT.get_bucket_acl, Bucket=bucket
    )['Owner']['ID']

    # Work through the listing one page at a time until all objects with the
    # specified prefix have been visited.
    pages = _iter_listing_pages(Bucket=bucket, Prefix=args.prefix, FetchOwner=True)
    for resp in pages:
        # 'Contents' is absent from the response when nothing matches.
        for obj in resp.get('Contents', []):
            key = obj['Key']
            logging.info(key)
            items_checked += 1

            if obj['Owner']['ID'] == aws_owner_id:
                logging.debug(" -- Skipped; bucket owner owns object")
                continue

            try:
                if check_acl(bucket=bucket, key=key, owner=aws_owner_id):
                    logging.debug(" -- OK, bucket owner already has access")
                    continue
                logging.info(" -- ACL does not grant access to current account, fixing.")
                set_acl(bucket=bucket, key=key)
            except botocore.exceptions.ClientError as error:
                # check_acl/set_acl record the key in `failures` before
                # raising; keep going so one bad object does not abort the
                # run and the failure summary below is actually produced.
                logging.error(" -- Failed to check or fix ACL: %s", error)
                continue
            items_fixed += 1

    # Log any failures
    logging.info("*** FAILURES: %s", '\n'.join(map(str, failures)))
    logging.info("*** %s objects total checked, %s total objects fixed",
                 str(items_checked), str(items_fixed))
    logging.info("========== RUN COMPLETE")


def check_acl(bucket, key, owner):
    """Report whether the bucket owner appears among an object's ACL grantees.

    Args:
        bucket: Name of the S3 bucket holding the object.
        key: Key of the object whose ACL is inspected.
        owner: Canonical ID of the bucket owner to look for.

    Returns:
        True if any grant on the object names ``owner`` as grantee, else False.
        NOTE(review): the granted permission is not inspected, so any grant to
        the owner counts as access — confirm that is the intended check.

    Raises:
        botocore.exceptions.ClientError: If the ACL cannot be read; the key is
            appended to ``failures`` before the error is re-raised.
    """
    import time  # local import so this block does not depend on file-level imports

    max_attempts = 5
    for attempt in range(1, max_attempts + 1):
        try:
            object_acl = CLIENT.get_object_acl(
                Bucket=bucket,
                Key=key
            )
            break
        except botocore.exceptions.ClientError as error:
            throttled = error.response['Error']['Code'] == 'LimitExceededException'
            if not throttled or attempt == max_attempts:
                failures.append(key)
                raise
            logging.warning('API call limit exceeded; backing off and retrying...')
            # Exponential backoff, capped so a single wait never exceeds 30s.
            time.sleep(min(2 ** attempt, 30))

    logging.debug("Object ACL: %s", object_acl)
    # Group grantees (e.g. AllUsers) carry a 'URI' rather than an 'ID', so the
    # ID must be looked up defensively.
    return any(
        grant['Grantee'].get('ID') == owner
        for grant in object_acl['Grants']
    )


def set_acl(bucket, key):
    """Apply the ``bucket-owner-full-control`` canned ACL to an object.

    Args:
        bucket: Name of the S3 bucket holding the object.
        key: Key of the object whose ACL is replaced.

    Raises:
        botocore.exceptions.ClientError: If the ACL cannot be applied; the key
            is appended to ``failures`` before the error is re-raised.
    """
    import time  # local import so this block does not depend on file-level imports

    max_attempts = 5
    for attempt in range(1, max_attempts + 1):
        try:
            CLIENT.put_object_acl(
                ACL='bucket-owner-full-control',
                Bucket=bucket,
                Key=key
            )
            return
        except botocore.exceptions.ClientError as error:
            throttled = error.response['Error']['Code'] == 'LimitExceededException'
            if not throttled or attempt == max_attempts:
                # Never return silently on failure: the caller counts a normal
                # return as a successful fix.
                failures.append(key)
                raise
            logging.warning('API call limit exceeded; backing off and retrying...')
            # Exponential backoff, capped so a single wait never exceeds 30s.
            time.sleep(min(2 ** attempt, 30))


# Script entry point: parse the command line and hand the namespace to main().
if __name__ == "__main__":
    PARSER = argparse.ArgumentParser(
        description="Set S3 ACL on bucket to current AWS account owner"
    )
    PARSER.add_argument(
        "--bucket",
        help="<required> S3 Bucket name.",
        required=True
    )
    PARSER.add_argument(
        "--prefix",
        help="<required> S3 prefix to set permissions recursively on.",
        required=True
    )
    # nargs='?' lets a bare `--debug` act as a flag (const=True) while still
    # accepting the older `--debug <value>` form; absent means debug is off.
    PARSER.add_argument(
        "--debug",
        help="<optional> Enable debug logging",
        required=False,
        nargs='?',
        const=True,
        default=False
    )
    main(PARSER.parse_args())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This gist is adapted from an original script by Ami Mahloof (https://gist.github.com/innovia/218a8214a2a94286ff9e8dd690940960). It has been heavily modified to better handle giant buckets with hundreds of millions of objects in them. It minimizes AWS operations by checking to see if the object is already owned by the bucket's owner or has the ACL applied.
It adds use of the python logging module, which enables a debug mode so you can easily see what boto3 is doing in each step.
It also adds some exception handling in case unexpected things occur.
The script assumes: