|
#!/usr/bin/env python3 |
|
from configparser import ConfigParser |
|
import json |
|
import os |
|
import subprocess |
|
import sys |
|
|
|
# Locations inside the current user's ~/.aws directory used by this script:
#   CREDENTIALS_PATH - credentials file that is read and rewritten,
#   CONFIG_PATH      - config file parsed to discover SSO profiles,
#   CACHE_DIR        - directory scanned for cached credential *.json files.
_AWS_DIR = os.path.join(os.path.expanduser("~"), '.aws')
CREDENTIALS_PATH = os.path.join(_AWS_DIR, 'credentials')
CONFIG_PATH = os.path.join(_AWS_DIR, 'config')
CACHE_DIR = os.path.join(_AWS_DIR, 'cli', 'cache')
|
|
|
|
|
def read_credentials_file():
    """Return the text of the file at CREDENTIALS_PATH.

    Returns:
        The file's full contents as a string, or an empty string when the
        file does not exist (so a first run can create it from scratch
        instead of failing).
    """
    try:
        with open(CREDENTIALS_PATH, 'r') as f:
            return f.read()
    except FileNotFoundError:
        # No credentials file yet: treat it as empty so the caller can
        # append the generated section and write a brand-new file.
        return ''
|
|
|
|
|
def update_content(content, sso_creds):
    """Return ``content`` with the generated ``[default]`` section refreshed.

    Everything from the marker line ``### GENERATED, DO NOT EDIT BELOW``
    onward is dropped and replaced by a freshly generated section; text
    before the marker is kept untouched. When the marker is absent, a newline
    is appended to ``content`` and the generated section is added after it.

    Args:
        content: Current text of the credentials file.
        sso_creds: Mapping providing the "AccessKeyId", "SecretAccessKey"
            and "SessionToken" values to write.

    Returns:
        The new credentials file text.

    Raises:
        KeyError: If ``sso_creds`` lacks one of the three keys above.
    """
    magic_header = '### GENERATED, DO NOT EDIT BELOW\n'
    default_section = (
        f'{magic_header}\n[default]\n'
        f'aws_access_key_id={sso_creds["AccessKeyId"]}\n'
        f'aws_secret_access_key={sso_creds["SecretAccessKey"]}\n'
        f'aws_session_token={sso_creds["SessionToken"]}\n'
    )

    # Operate on the text that was passed in. The earlier version re-read the
    # credentials file at this point, silently discarding the argument.
    marker_at = content.find(magic_header)
    if marker_at != -1:
        preserved = content[:marker_at]
    else:
        preserved = content + '\n'
    return preserved + default_section
|
|
|
|
|
def write_credentials_file(content):
    """Overwrite the file at CREDENTIALS_PATH with ``content``.

    The file holds secret keys, so when it has to be created it is created
    with mode 0o600 (further restricted by the process umask). The
    permissions of an already existing file are left as they are.

    Args:
        content: Full text to store in the credentials file.
    """
    def _owner_only_opener(path, flags):
        # ``open`` supplies the flags for mode 'w'; only the creation mode
        # is overridden here.
        return os.open(path, flags, 0o600)

    with open(CREDENTIALS_PATH, 'w', opener=_owner_only_opener) as f:
        f.write(content)
|
|
|
|
|
def update_aws_credentials_file(sso_creds):
    """Rewrite the credentials file so it carries ``sso_creds``.

    Reads the current file text, passes it together with ``sso_creds`` to
    ``update_content``, and writes the result back.
    """
    write_credentials_file(
        update_content(read_credentials_file(), sso_creds)
    )
|
|
|
|
|
def list_credentials_cache_files():
    """Return the paths of the ``.json`` entries found in CACHE_DIR.

    Returns:
        A list of paths (CACHE_DIR joined with each entry name) whose path
        ends in ``.json``; an empty list when CACHE_DIR does not exist.
    """
    if os.path.exists(CACHE_DIR):
        candidates = (
            os.path.join(CACHE_DIR, entry) for entry in os.listdir(CACHE_DIR)
        )
        return [path for path in candidates if path.endswith('.json')]
    return []
|
|
|
|
|
def clear_credentials_cache_files():
    """Delete every file reported by ``list_credentials_cache_files``."""
    stale_paths = list_credentials_cache_files()
    for stale_path in stale_paths:
        os.remove(stale_path)
|
|
|
|
|
def read_credentials_from_file(filename):
    """Return the ``"Credentials"`` entry of the JSON document in ``filename``.

    Args:
        filename: Path of a JSON file whose top-level object has a
            "Credentials" key.

    Raises:
        KeyError: If the parsed document has no "Credentials" key.
    """
    with open(filename, 'r') as cache_file:
        return json.load(cache_file)['Credentials']
|
|
|
|
|
def already_logged_in(profile):
    """Report whether ``aws sts get-caller-identity`` succeeds for ``profile``.

    The command is run from an argument list rather than through a shell, so
    a profile name containing spaces or shell metacharacters is passed to
    ``aws`` verbatim instead of being re-parsed as shell syntax.

    Args:
        profile: Name handed to ``aws`` via ``--profile``.

    Returns:
        True when the command exits with status 0; False for any other exit
        status or when the ``aws`` executable cannot be started.
    """
    command = ['aws', 'sts', 'get-caller-identity', '--profile', str(profile)]
    try:
        # Only the exit status matters, so the identity document printed on
        # stdout is discarded. stderr is left attached so error text from
        # ``aws`` stays visible to the user.
        exit_code = subprocess.call(command, stdout=subprocess.DEVNULL)
    except OSError:
        # The shell used to turn an unlaunchable command into a non-zero
        # status; keep that "not logged in" outcome.
        return False
    return exit_code == 0
|
|
|
|
|
def list_sso_profiles():
    """Return the names of the SSO-enabled profiles found in CONFIG_PATH.

    A section counts as an SSO profile when it sets ``sso_start_url``
    directly or refers to a shared session through ``sso_session``.
    ``[sso-session ...]`` sections also carry ``sso_start_url`` but are not
    profiles, so they are skipped.

    Returns:
        A list of profile names in file order, with a leading ``profile ``
        removed from the section name.
    """
    profile_prefix = 'profile '
    session_prefix = 'sso-session '

    config = ConfigParser()
    config.read(CONFIG_PATH)

    profiles = []
    for section in config.sections():
        if section.startswith(session_prefix):
            continue
        uses_sso = (
            config.has_option(section, 'sso_start_url')
            or config.has_option(section, 'sso_session')
        )
        if not uses_sso:
            continue
        # Strip only a leading "profile "; str.replace would also delete the
        # text from the middle of a name such as "team profile x".
        if section.startswith(profile_prefix):
            section = section[len(profile_prefix):]
        profiles.append(section)
    return profiles
|
|
|
|
|
def login(profile):
    """Make sure ``profile`` has a working session, running SSO login if not.

    When ``already_logged_in(profile)`` is false, ``aws sso login`` is run
    for the profile and the check is repeated. If the profile is still not
    logged in, 'Something went wrong' is printed and the process exits with
    status 1.

    Args:
        profile: Name handed to ``aws`` via ``--profile``.
    """
    if already_logged_in(profile):
        return

    try:
        # Argument list instead of a shell string: the profile name reaches
        # ``aws`` as a single argument whatever characters it contains.
        subprocess.call(['aws', 'sso', 'login', '--profile', str(profile)])
    except OSError as err:
        # Report why the command could not be started; the re-check below
        # then fails and ends the run as before.
        print(f'Could not run aws: {err}', file=sys.stderr)

    if not already_logged_in(profile):
        print('Something went wrong')
        sys.exit(1)
|
|
|
|
|
def main():
    """Log in to the SSO profile named on the command line and export it.

    Expects exactly one argument that is one of ``list_sso_profiles()``;
    otherwise the available profiles are printed and the process exits with
    status 1. After a successful login the credentials found in the newest
    cache file are written to the credentials file via
    ``update_aws_credentials_file``.
    """
    sso_profiles = list_sso_profiles()
    if len(sys.argv) != 2 or sys.argv[1] not in sso_profiles:
        print(f'Available profiles: {sso_profiles}')
        sys.exit(1)
    profile = sys.argv[1]

    # Empty the cache before logging in; presumably so that whatever is found
    # afterwards belongs to this login rather than to an earlier one.
    clear_credentials_cache_files()
    login(profile)

    cache_files = list_credentials_cache_files()
    if not cache_files:
        # Indexing an empty list would end the run with a bare IndexError.
        print('No cached credentials were found after login', file=sys.stderr)
        sys.exit(1)

    # os.listdir order is arbitrary; pick the most recently modified file
    # so the choice is deterministic when several are present.
    newest_cache_file = max(cache_files, key=os.path.getmtime)
    creds = read_credentials_from_file(newest_cache_file)
    update_aws_credentials_file(creds)
    print(f'Logged in to profile: {profile}')


if __name__ == '__main__':
    main()