Skip to content

Instantly share code, notes, and snippets.

@hervekhg
Created January 17, 2020 17:10
Show Gist options
  • Save hervekhg/bd0283689d5c33f8e094bd37d851980c to your computer and use it in GitHub Desktop.
Save hervekhg/bd0283689d5c33f8e094bd37d851980c to your computer and use it in GitHub Desktop.
###############################################
## Author : Hervekhg
## Description: This Lambda function enforces default KMS encryption on S3 buckets that have no encryption configuration
#####################################################
from boto3 import resource, client
from logging import getLogger, info, error, debug
from os import environ
from botocore.exceptions import ClientError
# Server-side encryption algorithm written into each bucket's default encryption rule.
SSEAlgorithm = "aws:kms"
# KMS key identifier taken from the process environment; if the variable is
# missing, this lookup raises KeyError when the module is imported.
KMSMasterKeyID = environ['KMSMasterKeyID']
class Enforce_EBS_Encryption(object):
    """Find S3 buckets without an encryption configuration and enable SSE-KMS.

    Despite the ``EBS`` in the class name, every call made here goes to the
    S3 client. The name is kept unchanged so existing callers keep working.

    Typical use is ``getlistofUnEncryptedBucket()`` followed by
    ``forceEncrytionOnUnEncryptedBucket()``.
    """

    def __init__(self):
        """Create the S3 client, configure logging and start with no buckets."""
        self.s3_client = client('s3')
        # getLogger() without a name returns the root logger, so the level set
        # here also applies to the module-level info()/error() helpers below.
        self.logger = getLogger()
        self.logger.setLevel("INFO")
        # Names of the buckets found without an encryption configuration.
        self.unencryptedbucket = []

    def getlistofUnEncryptedBucket(self):
        """Rebuild ``self.unencryptedbucket`` from the buckets S3 lists.

        A bucket is recorded when ``get_bucket_encryption`` fails with the
        ``ServerSideEncryptionConfigurationNotFoundError`` code. Any other
        ``ClientError`` is logged with its code and message and the bucket is
        skipped, so one inaccessible bucket does not stop the scan.

        NOTE(review): S3 may now report a default SSE-S3 configuration for
        every bucket, in which case this error code would rarely be returned.
        Confirm whether buckets on non-KMS encryption should be collected too.
        """
        # Start from a clean list so repeated scans do not duplicate entries.
        self.unencryptedbucket = []
        response = self.s3_client.list_buckets()
        for bucket in response['Buckets']:
            bucket_name = bucket['Name']
            try:
                resp_encryption = self.s3_client.get_bucket_encryption(
                    Bucket=bucket_name
                )
            except ClientError as e:
                error_detail = e.response['Error']
                if error_detail['Code'] == 'ServerSideEncryptionConfigurationNotFoundError':
                    info("{0} is not encrypted but will be, No Encrytion found".format(bucket_name))
                    self.unencryptedbucket.append(bucket_name)
                else:
                    # Keep the code and message so the failure can be diagnosed.
                    error("Unexpected error on Bucket: {0} ({1}: {2})".format(
                        bucket_name,
                        error_detail['Code'],
                        error_detail.get('Message', ''),
                    ))
                continue
            rules = resp_encryption['ServerSideEncryptionConfiguration']['Rules']
            info("{0} is already encrypted : Encryption : {1}".format(bucket_name, rules))

    def _putEncryptiononSingleBucket(self, bucket_name):
        """Set the default encryption rule of ``bucket_name`` to SSE-KMS.

        Uses the module-level ``SSEAlgorithm`` and ``KMSMasterKeyID`` values.
        Any ``ClientError`` raised by the S3 call propagates to the caller.
        """
        self.s3_client.put_bucket_encryption(
            Bucket=bucket_name,
            ServerSideEncryptionConfiguration={
                'Rules': [
                    {
                        'ApplyServerSideEncryptionByDefault': {
                            'SSEAlgorithm': SSEAlgorithm,
                            'KMSMasterKeyID': KMSMasterKeyID,
                        },
                    },
                ],
            },
        )

    def forceEncrytionOnUnEncryptedBucket(self):
        """Apply SSE-KMS to every bucket in ``self.unencryptedbucket``.

        Every bucket is attempted even when an earlier one fails. Failures are
        logged per bucket, and the first ``ClientError`` encountered is
        re-raised once the loop has finished so the caller still sees the
        failure.

        Raises:
            ClientError: if at least one bucket could not be updated.
        """
        first_failure = None
        for bucket in self.unencryptedbucket:
            try:
                self._putEncryptiononSingleBucket(bucket)
            except ClientError as e:
                error("Failed to encrypt Bucket : {0} : {1}".format(bucket, e))
                if first_failure is None:
                    first_failure = e
                continue
            info("The Bucket : {0} has been encrypted with KMS key".format(bucket))
        if first_failure is not None:
            raise first_failure
def lambda_handler(event, context):
    """Lambda entry point: scan the buckets, then encrypt the unencrypted ones.

    ``event`` and ``context`` are accepted to match the handler signature
    but are not read. Nothing is returned.
    """
    print("***** Start Processing ****")
    enforcer = Enforce_EBS_Encryption()
    # Collect the buckets first; the enforcement step works from that list.
    enforcer.getlistofUnEncryptedBucket()
    enforcer.forceEncrytionOnUnEncryptedBucket()
    print("***** End Processing ****")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment