Skip to content

Instantly share code, notes, and snippets.

@asaf400
Last active January 14, 2025 09:17
Show Gist options
  • Save asaf400/709caf609534f4596fee6a4d104c36b5 to your computer and use it in GitHub Desktop.
Save asaf400/709caf609534f4596fee6a4d104c36b5 to your computer and use it in GitHub Desktop.
This script is meant to be used as a backup, restore and convertion tool for AWS SSO (Identity Store & Identity Center)
#!/bin/env python
# AUTHOR: github.com/asaf400
# This script is meant to be used as a backup, restore and convertion tool for AWS SSO (Identity Store & Identity Center)
# Currently AWS does not provide any way to backup the following resources:
# Users & Groups in the Identity Store API,
# The Principal Assignment with an AWS Account and PermissionSet in the SSO Admin API
# The Permissions sets and their inline policies
# hench I created this script which allows for a backup in a relatively Human Readble JSON format
# it is able to backup to:
# python .pickle file
# .json file
# Terraform via the tf.json format (I didn't find any json2hcl V2 converter)
# and it is able to restore the entire 'combined store' of users, groups, permissions and assignment
# in case of an sso nuke being dropped, like the one this aws project drops: https://github.com/awslabs/ssosync
# (if implemented after manual groups and users have been created, the project might DROP the users or groups, depending on filters.. with no way to restore them!!!)
# the requirements for this script should be extrapolated from imports..
# The script was built over time with many other projects going around, so it might not be purly pythonic in nature, and may contains dragons..
# Script was tested on Python11 [on Windows + Python10 had a wierd access violation errors on random runs]
# SCRIPT IS DISTRIBUTED AS IS, NOT SUPPORT REQUESTS.
import os.path
import sys
import time
import boto3
import botocore.errorfactory
import json_tricks as json
import pickle
import tempfile
import argparse
import collections
import hashlib
import copy
parser = argparse.ArgumentParser()
parser.add_argument('--store-id', type=str)
parser.add_argument('--action', type=str, choices=['backup', 'restore', 'both', 'convert'])
parser.add_argument('--region', '--region-name', type=str)
parser.add_argument('--profile', type=str)
parser.add_argument('--pickle', default=False, action='store_true')
parser.add_argument('--json', default=False, action='store_true')
parser.add_argument('--secretsmanager', default=False, action='store_true')
parser.add_argument('--terraform', default=False, action='store_true')
parser.add_argument('--output-dir', type=str, default=tempfile.gettempdir())
arguments = vars(parser.parse_args(sys.argv[1:]))
arguments['output_dir']=os.path.normpath(os.path.expandvars(os.path.expanduser(arguments['output_dir'])))
session = boto3.session.Session(region_name=arguments['region'], profile_name=arguments['profile'])
identitystore = session.client('identitystore')
sso_admin = session.client('sso-admin')
organizations = session.client('organizations')
secretsmanager = session.client('secretsmanager')
store_id = arguments['store_id']
def sanitize_object(self: dict, keys=None):
bad_keys = ['GroupId', 'UserId', 'MembershipId', 'CreatedDate', 'PrincipalId', 'PermissionSetArn',
'IdentityStoreId']
if keys:
bad_keys.extend(keys)
def scrub(obj, bad_key="_this_is_bad"):
if isinstance(obj, dict):
# the call to `list` is useless for py2 but makes
# the code py2/py3 compatible
for key in list(obj.keys()):
if key == bad_key:
del obj[key]
else:
scrub(obj[key], bad_key)
elif isinstance(obj, list):
for i in reversed(range(len(obj))):
if obj[i] == bad_key:
del obj[i]
else:
scrub(obj[i], bad_key)
else:
# neither a dict nor a list, do nothing
pass
for key in bad_keys:
scrub(self, key)
def invoke_aws_pagination(boto3_client, paginator_call, results, **kwargs):
paginator = boto3_client.get_paginator(paginator_call)
response_iterator = paginator.paginate(
PaginationConfig={
'MaxItems': 200,
'PageSize': 10
},
**kwargs
)
responses = []
for response in response_iterator:
responses.append(response)
return [y for x in responses for y in x[results]]
def invoke_aws(boto3_client, paginator_call, results=None, **kwargs):
print(f"Invoking '{boto3_client.meta.service_model.service_name}.{paginator_call}'")
response = getattr(boto3_client, paginator_call)(**kwargs)
if results and results in response:
return response[results]
return response
def pprint(obj):
print(json.dumps(obj, indent=2, sort_keys=True, default=str))
def read_identity_store():
combined_store = {'accounts': {}}
groups = invoke_aws_pagination(identitystore, 'list_groups', 'Groups', IdentityStoreId=store_id)
all_permission_sets = {}
users = invoke_aws_pagination(identitystore, 'list_users', 'Users',
IdentityStoreId=store_id,
MaxResults=50
)
combined_store['groups'] = {x['DisplayName']: x for x in groups}
combined_store['users'] = {x['UserName']: x for x in users}
combined_store['permission_sets'] = all_permission_sets
for account in accounts:
combined_store['accounts'][account] = {**accounts[account]}
# accounts[account].update()
sets = invoke_aws(sso_admin, 'list_permission_sets_provisioned_to_account', 'PermissionSets',
InstanceArn=instance_arn,
AccountId=accounts[account]['id'],
MaxResults=100
)
for i, pset in enumerate(sets):
set_description = invoke_aws(sso_admin, 'describe_permission_set', 'PermissionSet',
InstanceArn=instance_arn,
PermissionSetArn=pset
)
sets[i] = set_description
account_assignments = invoke_aws_pagination(sso_admin, 'list_account_assignments', 'AccountAssignments',
InstanceArn=instance_arn,
AccountId=accounts[account]['id'],
PermissionSetArn=pset,
MaxResults=100
)
for i, account_assignment in enumerate(copy.deepcopy(account_assignments)):
if account_assignment['PrincipalType'] == 'GROUP':
detected_groups = [x for x in groups if
account_assignment['PrincipalType'] == 'GROUP' and account_assignment[
'PrincipalId'] == x['GroupId']]
if detected_groups:
account_assignments[i] = {**account_assignment, **detected_groups[0], 'AccountName': account}
# else:
# detected_users = [x for x in users if
# account_assignment['PrincipalType'] == 'USER' and x['UserId'] ==
# account_assignment['PrincipalId']]
# if detected_users:
# print("WTF")
# account_assignments[i] = {'AccountName': account, 'UserName': detected_users[0]['UserName'],
# **account_assignment
# }
set_description.update({"AccountAssignments": account_assignments})
if set_description['Name'] in all_permission_sets:
all_permission_sets[set_description['Name']]['AccountAssignments'].extend(account_assignments)
else:
all_permission_sets[set_description['Name']] = set_description
sets = {x['Name']: x for x in sets}
combined_store['accounts'][account].update({"PermissionSets": sets})
for name, permission_set in all_permission_sets.items():
managed_policies = sso_admin.list_managed_policies_in_permission_set(
InstanceArn=instance_arn,
PermissionSetArn=permission_set['PermissionSetArn'],
MaxResults=100
)['AttachedManagedPolicies']
managed_policies = {x['Name']: x['Arn'] for x in managed_policies}
inline_policy = invoke_aws(sso_admin, 'get_inline_policy_for_permission_set', 'InlinePolicy',
InstanceArn=instance_arn,
PermissionSetArn=permission_set['PermissionSetArn']
)
if inline_policy:
inline_policy = json.loads(inline_policy)
else:
inline_policy = {}
all_permission_sets[name] = {**permission_set, 'managed_policies': managed_policies,
'inline_policy': inline_policy}
for group in groups:
list_group_members = invoke_aws_pagination(identitystore, 'list_group_memberships', 'GroupMemberships',
IdentityStoreId=store_id,
GroupId=group['GroupId'],
MaxResults=50
)
group['GroupMemberships'] = []
for member in list_group_members:
user = [x for x in users if x['UserId'] == member['MemberId']['UserId']][0]
group['GroupMemberships'].append(user['UserName'])
return combined_store
def write_identity_store(loadpath):
# with open(loadpath + ".pickle", 'rb') as fp:
# combined_store = pickle.load(fp)
with open(loadpath + ".json", 'r') as fp:
combined_store = json.load(fp)
sanitize_object(combined_store)
permission_sets_list = invoke_aws_pagination(sso_admin, 'list_permission_sets', 'PermissionSets',
InstanceArn=instance_arn)
users = invoke_aws_pagination(identitystore, 'list_users', 'Users',
IdentityStoreId=store_id,
MaxResults=50
)
groups = invoke_aws_pagination(identitystore, 'list_groups', 'Groups', IdentityStoreId=store_id)
users = {x['UserName']: x for x in users}
groups = {x['DisplayName']: x for x in groups}
permission_sets = {}
for i, permission_set in enumerate(permission_sets_list):
pset = invoke_aws(sso_admin, 'describe_permission_set', 'PermissionSet',
InstanceArn=instance_arn,
PermissionSetArn=permission_set
)
permission_sets.update({pset['Name']: pset})
restore_resources(combined_store_resource=combined_store['permission_sets'], resources_from_api=permission_sets,
api_object_name='PermissionSet', api=sso_admin,
api_call='create_permission_set', pretty_name='Permission Set',
bad_keys=['AccountAssignments', 'managed_policies', 'inline_policy'],
exists_exception=sso_admin.exceptions.ConflictException, InstanceArn=instance_arn)
restore_resources(combined_store_resource=combined_store['users'], resources_from_api=users,
api_object_name='Users', api=identitystore, api_call='create_user', pretty_name='Users',
bad_keys=None,
exists_exception=identitystore.exceptions.ConflictException, IdentityStoreId=store_id)
restore_resources(combined_store_resource=combined_store['groups'], resources_from_api=groups,
api_object_name='Groups', api=identitystore, api_call='create_group', pretty_name='Groups',
bad_keys=['GroupMemberships'],
exists_exception=identitystore.exceptions.ConflictException, IdentityStoreId=store_id)
# Restore Group and User memberships.
for group in combined_store['groups']:
for user in combined_store['groups'][group]['GroupMemberships']:
try:
invoke_aws(identitystore, 'create_group_membership',
GroupId=combined_store['groups'][group]['GroupId'],
MemberId={'UserId': combined_store['users'][user]['UserId']},
IdentityStoreId=store_id)
except identitystore.exceptions.ConflictException as e:
print(f"Resource: GroupMembership for '{group}/{user}' already exists")
# enrich the combined_store with the existing data (populate the p_set id)
pass
for name, pset in combined_store['permission_sets'].items():
for account_assignment in pset['AccountAssignments']:
try:
if account_assignment['PrincipalType'] == 'GROUP':
principal_id = groups[account_assignment['DisplayName']]['GroupId']
display_name = account_assignment['DisplayName']
elif account_assignment['PrincipalType'] == 'USER':
continue
# principal_id = users[account_assignment['UserName']]['UserId']
# display_name=account_assignment['UserName']
response = invoke_aws(sso_admin, 'create_account_assignment', 'AccountAssignmentCreationStatus',
InstanceArn=instance_arn,
TargetId=account_assignment['AccountId'],
TargetType='AWS_ACCOUNT',
PermissionSetArn=permission_sets[name]['PermissionSetArn'],
PrincipalType=account_assignment['PrincipalType'],
PrincipalId=principal_id
)
count = 2
time.sleep(count)
status = invoke_aws(sso_admin, 'describe_account_assignment_creation_status',
'AccountAssignmentCreationStatus',
InstanceArn=instance_arn,
AccountAssignmentCreationRequestId=response['RequestId']
)
while status['Status'] == 'IN_PROGRESS':
count = count * 1.3
if count > 60:
count = 60
status = invoke_aws(sso_admin, 'describe_account_assignment_creation_status',
'AccountAssignmentCreationStatus',
InstanceArn=instance_arn,
AccountAssignmentCreationRequestId=response['RequestId']
)
if status['Status'] != 'SUCCEEDED':
print(status)
else:
print(
f"Resource: Account Assignment {account_assignment['AccountId']}-{account_assignment['AccountName']}/{name}/{display_name} Created or already existed")
except sso_admin.exceptions.ConflictException as e:
print(
f"Resource: Account Assignment {account_assignment['AccountId']}-{account_assignment['AccountName']}/{name}/{display_name} already exists")
# enrich the combined_store with the existing data (populate the p_set id)
combined_store['permission_sets'][name].update(permission_sets[name])
pass
pprint(combined_store)
def convert_store_to_tf(combined_store):
workdir=tempfile.mkdtemp()
print(workdir)
import terraformpy
combined_store=copy.deepcopy(combined_store)
sanitize_object(combined_store)
for name, pset in combined_store['permission_sets'].items():
groups = {x: [] for x in set([x['DisplayName'] for x in pset['AccountAssignments'] if 'DisplayName' in x])}
[groups[group].append(assignment['AccountId']) for group in groups for assignment in pset['AccountAssignments']
if 'DisplayName' in assignment and assignment['DisplayName'] == group]
pset['AccountAssignments']=[
{'principal_name':x,
'principal_type':'GROUP',
'permission_set':name,
'account_ids': groups[x]}
for x in groups]
module_object={name: {
'session_duration': pset['SessionDuration'],
'managed_policies': [x for x in pset['managed_policies'].values()]
}}
if 'Description' in pset:
module_object[name].update({'description': pset['Description']})
if 'inline_policy' in pset:
module_object[name].update({'inline_policy': json.dumps(pset['inline_policy'])})
module = terraformpy.Module(f'sso-{name}',
source='avlcloudtechnologies/sso/aws',
version='0.2.0',
permission_sets=module_object,
account_assignments=pset['AccountAssignments'])
with open(f'{workdir}/{name}.tf.json','w') as fp:
json.dump(module.compile(),fp,indent=2,sort_keys=True)
terraformpy.reset()
pass
def restore_resources(combined_store_resource, resources_from_api, api_object_name, api, api_call, pretty_name,
bad_keys,
exists_exception, **kwargs):
if isinstance(combined_store_resource, dict):
# TODO: can be merged into one method with users
for name, obj in combined_store_resource.items():
try:
obj = copy.deepcopy(obj)
sanitize_object(obj, bad_keys)
response = invoke_aws(api, api_call, api_object_name,
**obj,
**kwargs
)
combined_store_resource[name].update(response)
except exists_exception as e:
print(f"Resource: {pretty_name} '{name}' already exists")
# enrich the combined_store with the existing data (populate the p_set id)
combined_store_resource[name].update(resources_from_api[name])
pass
instance_arn = invoke_aws(sso_admin, 'list_instances', 'Instances', MaxResults=1)[0]['InstanceArn']
accounts = invoke_aws(organizations, 'list_accounts', 'Accounts', MaxResults=20)
accounts = {item['Name']: {'id': item['Id']} for item in accounts if item['Status'] == 'ACTIVE'}
if arguments['action'] in ['both', 'backup']:
combined_store = read_identity_store()
json_string = json.dumps(combined_store, indent=2, sort_keys=True, primitives=True)
if arguments['json']:
with open(arguments['output_dir'] + "/py_identity_store" + ".json", 'w') as fp:
fp.write(json_string)
if arguments['pickle']:
with open(arguments['output_dir'] + "/py_identity_store" + ".pickle", 'wb') as fp:
pickle.dump(combined_store, fp)
if arguments['secretsmanager']:
try:
sanitize_object(combined_store)
json_string = json.dumps(combined_store, primitives=True, sort_keys=True)
ClientRequestToken = hashlib.sha256(json_string.encode('utf-8')).hexdigest()
invoke_aws(secretsmanager, 'create_secret',
Name='identitystore',
ClientRequestToken=ClientRequestToken,
Description='Backup of aws iam identity center (sso) & identity store',
SecretString=json_string
)
except secretsmanager.exceptions.ResourceExistsException as e:
ClientRequestToken = hashlib.sha256(json_string.encode('utf-8')).hexdigest()
invoke_aws(secretsmanager, 'put_secret_value',
SecretId='identitystore',
ClientRequestToken=ClientRequestToken,
SecretString=json_string
)
pass
if arguments['terraform']:
if arguments['action'] == 'convert':
with open(arguments['output_dir'] + "/py_identity_store" + ".json", 'r') as fp:
combined_store = json.load(fp)
convert_store_to_tf(combined_store)
if arguments['action'] in ['both', 'restore']:
write_identity_store(arguments['output_dir'])
print("All done")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment