Created
July 24, 2023 21:21
-
-
Save jewelsjacobs/8dc644ea35f9cd7b3891fc47924506a1 to your computer and use it in GitHub Desktop.
LoadConsoleData Python Lambda App
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import boto3 | |
import coto | |
from botocore.exceptions import ClientError | |
import logging | |
from pythonjsonlogger import jsonlogger | |
from datetime import datetime | |
import json | |
class DateTimeEncoder(json.JSONEncoder):
    """JSON encoder that serializes ``datetime`` values as ISO 8601 strings."""

    def default(self, o):
        """Return ``o.isoformat()`` when *o* is a ``datetime``.

        Every other type is handed to the base encoder, which raises
        ``TypeError`` for objects it cannot serialize.
        """
        if not isinstance(o, datetime):
            return super().default(o)
        return o.isoformat()
# Module-level coto session built on top of a default boto3 session; it is
# the object get_contact_info() asks for its 'billing' client.
# coto: https://github.com/sentialabs/coto
session = coto.Session(boto3_session=boto3.Session())
def setup_logging(log_level):
    """Swap the root logger's single default handler for a JSON handler.

    Requires the ``pythonjsonlogger`` package (bundle it or provide a layer):
    https://github.com/madzak/python-json-logger

    Args:
        log_level: Level to set on the root logger (e.g. ``logging.INFO``).

    Raises:
        AssertionError: If the root logger does not have exactly one handler.
    """
    logger = logging.getLogger()
    # Testing showed lambda sets up one default handler. If the count differs,
    # something has changed and we want to fail so an operator can investigate.
    # Raised explicitly (instead of a bare ``assert``) so the check survives
    # ``python -O``; the exception type is unchanged.
    if len(logger.handlers) != 1:
        raise AssertionError(
            "expected exactly one root log handler, found {}".format(len(logger.handlers))
        )
    # Remember the default handler now: once the JSON handler is added,
    # index 0 is only the default handler as long as the list held one entry.
    default_handler = logger.handlers[0]
    logger.setLevel(log_level)
    json_handler = logging.StreamHandler()
    json_handler.setFormatter(
        jsonlogger.JsonFormatter(fmt='%(asctime)s %(levelname)s %(name)s %(message)s')
    )
    logger.addHandler(json_handler)
    logger.removeHandler(default_handler)
# Install the JSON handler once at import time, then bind the root logger
# that the functions below log through.
setup_logging(logging.INFO)
logger = logging.getLogger()
def create_session(account_number, role_name):
    """Assume *role_name* in *account_number* and return a boto3 session.

    The partition is read from the second field of the caller's own ARN so
    the role ARN is built for the partition this code is running in.

    Args:
        account_number: ID of the account that owns the role.
        role_name: Name of the IAM role to assume; it is also used as the
            role session name.

    Returns:
        A ``boto3.Session`` built from the temporary credentials, or
        ``False`` if any step fails (the error is logged with its traceback).
    """
    logger.info("Assuming role %s for within account %s.", role_name, account_number)
    try:
        sts_client = boto3.client('sts')
        # Get the current partition from the caller's ARN.
        partition = sts_client.get_caller_identity()['Arn'].split(":")[1]
        # Assume the role within the account.
        response = sts_client.assume_role(
            RoleArn='arn:{}:iam::{}:role/{}'.format(partition, account_number, role_name),
            RoleSessionName=role_name,
        )
        logger.info(
            "Granted as role %s within account %s, storing temporary credentials.",
            role_name,
            account_number,
        )
        # Wrap the temporary credentials in a plain boto3 session. The local
        # name deliberately differs from the module-level coto ``session``.
        credentials = response['Credentials']
        assumed_session = boto3.Session(
            aws_access_key_id=credentials['AccessKeyId'],
            aws_secret_access_key=credentials['SecretAccessKey'],
            aws_session_token=credentials['SessionToken'],
        )
        logger.info("Credentials for session in the account %s established.", account_number)
        return assumed_session
    except Exception as error:
        # One handler covers ClientError and everything else: both cases
        # were handled identically. logger.exception keeps the traceback.
        logger.exception(error)
        return False
def get_contact_info():
    """Fetch the alternate contacts and shape them as a DynamoDB map value.

    Uses the module-level coto ``session`` to call
    ``list_alternate_contacts()`` on its ``billing`` client.

    Returns:
        ``{'M': {'AlternateContacts': {'L': [...]}}}`` where each list entry
        is a DynamoDB map with one string attribute per contact field, or
        ``False`` when a ``ClientError`` is raised (the error is logged).
    """
    try:
        billing = session.client('billing')
        # One DynamoDB map per contact; contactId is the only field that is
        # converted with str() before being stored as a string attribute.
        contact_maps = [
            {'M': {
                "name": {"S": entry["name"]},
                "contactType": {"S": entry["contactType"]},
                "contactId": {"S": str(entry["contactId"])},
                "email": {"S": entry["email"]},
                "phoneNumber": {"S": entry["phoneNumber"]},
                "title": {"S": entry["title"]},
            }}
            for entry in billing.list_alternate_contacts()
        ]
        evidence = {'M': {'AlternateContacts': {'L': contact_maps}}}
        logger.info(evidence)
        return evidence
    except ClientError as failure:
        logger.error(failure)
        return False
def get_member_account_ids():
    """Return the ``Id`` of every account listed by AWS Organizations.

    Keeps calling ``list_accounts`` with the returned ``NextToken`` until a
    response arrives without one.

    Returns:
        A list with each account's ``Id`` value, or ``False`` when a
        ``ClientError`` is raised (the error is logged).
    """
    try:
        org_client = boto3.client('organizations')
        listed = []
        request = {}
        while True:
            page = org_client.list_accounts(**request)
            listed.extend(page["Accounts"])
            if "NextToken" not in page:
                break
            request = {"NextToken": page["NextToken"]}
        account_ids = [entry['Id'] for entry in listed]
        logger.info(account_ids)
        return account_ids
    except ClientError as failure:
        logger.error(failure)
        return False
def get_control_info(account_id, contacts_db, dynamo_db_session):
    """Look up the ``alternatecontacts`` control rows for one account.

    Queries the ``Account-Index`` index of *contacts_db* for rows whose
    ``accountid`` equals *account_id*, whose ``accountidcontrol`` begins with
    it, and whose ``evidencetype`` is ``alternatecontacts``.

    Args:
        account_id: Account to look up; converted with ``str()`` for the query.
        contacts_db: Name of the DynamoDB table.
        dynamo_db_session: Session object whose ``client('dynamodb')`` is used.

    Returns:
        A list of items carrying the projected ``ID`` and ``controlaccountid``
        attributes, gathered across all result pages, or ``False`` when a
        ``ClientError`` is raised (the error is logged).
    """
    try:
        client = dynamo_db_session.client('dynamodb')
        query_args = {
            'TableName': contacts_db,
            'IndexName': 'Account-Index',
            'FilterExpression': 'evidencetype = :evidencetype',
            'KeyConditionExpression': 'accountid = :accountid and begins_with (accountidcontrol, :accountid)',
            'ProjectionExpression': '#id, controlaccountid',
            'ExpressionAttributeNames': {"#id": "ID"},
            'ExpressionAttributeValues': {
                ':accountid': {'S': str(account_id)},
                ':evidencetype': {'S': "alternatecontacts"},
            },
        }
        items = []
        while True:
            response = client.query(**query_args)
            items.extend(response['Items'])
            # A response that stops before the result set is exhausted carries
            # LastEvaluatedKey; resume from it so no matching row is dropped.
            last_key = response.get('LastEvaluatedKey')
            if not last_key:
                return items
            query_args['ExclusiveStartKey'] = last_key
    except ClientError as error:
        logger.error(error)
        return False
def update(contact_info, control, contacts_db, dynamo_db_session):
    """Write the contact evidence onto one control row and mark it COMPLIANT.

    Args:
        contact_info: DynamoDB attribute value stored as ``evidence``. A falsy
            value (``get_contact_info`` returns ``False`` on error) is
            rejected without calling DynamoDB.
        control: Item providing the ``ID`` and ``controlaccountid`` key
            attributes of the row to update.
        contacts_db: Name of the DynamoDB table.
        dynamo_db_session: Session object whose ``client('dynamodb')`` is used.

    Returns:
        The ``update_item`` response (requested with
        ``ReturnValues="UPDATED_NEW"``), or ``False`` when there is no
        evidence to write or a ``ClientError`` is raised (both are logged).
    """
    if not contact_info:
        # Without this guard the False sentinel would be sent as an attribute
        # value, which fails request validation rather than raising ClientError.
        logger.error("No contact information available; skipping update of control %s.", control)
        return False
    try:
        client = dynamo_db_session.client('dynamodb')
        response = client.update_item(
            TableName=contacts_db,
            Key={
                'ID': control["ID"],
                'controlaccountid': control["controlaccountid"],
            },
            # The attribute name 'status' is passed through the '#ST' placeholder.
            UpdateExpression='set evidence = :evidence, configrule = :configrule, arn = :arn, #ST = :compliancestatus',
            ExpressionAttributeNames={'#ST': 'status'},
            ExpressionAttributeValues={
                ':evidence': contact_info,
                ':configrule': {'S': "NA"},
                ':arn': {'S': "NA"},
                ':compliancestatus': {'S': "COMPLIANT"},
            },
            ReturnValues="UPDATED_NEW",
        )
        return response
    except ClientError as error:
        logger.error(error)
        return False
def lambda_handler(event, context):
    """Lambda entry point: store contact evidence on every control row.

    Reads ``ContactsDB``, ``DynamoDBRole`` and ``SecurityAccountID`` from the
    environment, assumes the DynamoDB role in the security account, and for
    every account returned by ``get_member_account_ids()`` updates each
    control row returned by ``get_control_info()``.

    Args:
        event: Lambda event payload (not used).
        context: Lambda context object (not used).

    Returns:
        ``True`` when the run reaches the end of the account list; ``False``
        when the role cannot be assumed, the accounts cannot be listed, or a
        ``ClientError`` escapes one of the helpers.

    Raises:
        KeyError: If one of the required environment variables is missing.
    """
    contacts_db = os.environ['ContactsDB']
    dynamo_db_role = os.environ['DynamoDBRole']
    security_account_id = os.environ['SecurityAccountID']
    try:
        # The helpers signal failure by returning False; check each sentinel
        # rather than letting it surface as AttributeError/TypeError on a bool.
        dynamo_db_session = create_session(security_account_id, dynamo_db_role)
        if dynamo_db_session is False:
            logger.error("Could not create the DynamoDB session; aborting.")
            return False
        account_ids = get_member_account_ids()
        if account_ids is False:
            logger.error("Could not list member accounts; aborting.")
            return False
        for account_id in account_ids:
            controls = get_control_info(account_id, contacts_db, dynamo_db_session)
            if controls is False:
                logger.error("Control lookup failed for account %s; skipping it.", account_id)
                continue
            for control in controls:
                # NOTE(review): get_contact_info() takes no account argument,
                # so every account's controls receive the same contacts.
                # Confirm this is intended.
                contact_info = get_contact_info()
                logger.info(contact_info)
                if contact_info is False:
                    logger.error("Contact lookup failed; control %s left unchanged.", control)
                    continue
                item = update(contact_info, control, contacts_db, dynamo_db_session)
                logger.info(item)
        return True
    except ClientError as error:
        logger.error(error)
        return False
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment