Skip to content

Instantly share code, notes, and snippets.

@AndrewMohawk
Created July 1, 2024 15:52
Show Gist options
  • Save AndrewMohawk/c99e4a569befa0b3e99e2b458b9dd3e9 to your computer and use it in GitHub Desktop.
Save AndrewMohawk/c99e4a569befa0b3e99e2b458b9dd3e9 to your computer and use it in GitHub Desktop.
Quick search from admin account to all other accounts to find SSH ports open and listening
import boto3
import subprocess
import os
def get_client(service, aws_access_key_id, aws_secret_access_key, aws_session_token, region_name='us-east-2'):
return boto3.client(service,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
aws_session_token=aws_session_token,
region_name=region_name)
def list_accounts(organizations_client):
accounts = []
paginator = organizations_client.get_paginator('list_accounts')
for page in paginator.paginate():
accounts.extend(page['Accounts'])
return accounts
def assume_role(sts_client, account_id, role_name):
role_arn = f"arn:aws:iam::{account_id}:role/{role_name}"
response = sts_client.assume_role(
RoleArn=role_arn,
RoleSessionName='CheckEC2Instances'
)
return response['Credentials']
def get_instances(ec2_client):
paginator = ec2_client.get_paginator('describe_instances')
instances = []
for page in paginator.paginate():
for reservation in page['Reservations']:
for instance in reservation['Instances']:
instances.append(instance)
return instances
def check_ssh_open(dns_name):
try:
result = subprocess.run(['nc', '-zv', dns_name, '22'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.returncode == 0:
return True
except Exception as e:
print(f"Error checking SSH for {dns_name}: {e}")
return False
def main():
aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')
aws_session_token = os.getenv('AWS_SESSION_TOKEN')
role_name = 'OrganizationAccountAccessRole' # Ensure this is the correct role name
organizations_client = get_client('organizations', aws_access_key_id, aws_secret_access_key, aws_session_token)
sts_client = get_client('sts', aws_access_key_id, aws_secret_access_key, aws_session_token)
accounts = list_accounts(organizations_client)
for account in accounts:
account_id = account['Id']
account_name = account['Name']
print(f"Checking account: {account_id} [ {account_name} ]")
try:
creds = assume_role(sts_client, account_id, role_name)
ec2_client = get_client('ec2', creds['AccessKeyId'], creds['SecretAccessKey'], creds['SessionToken'])
instances = get_instances(ec2_client)
print(f"Found {len(instances)} instances in account {account_id} [ {account_name} ]")
for instance in instances:
instance_id = instance['InstanceId']
instance_name = next((tag['Value'] for tag in instance.get('Tags', []) if tag['Key'] == 'Name'), 'Unnamed')
public_ip = instance.get('PublicIpAddress')
public_dns = instance.get('PublicDnsName')
if public_ip or public_dns:
ip_or_dns = public_ip if public_ip else public_dns
print(f"Checking SSH for Instance: {instance_id} ({instance_name}) with IP/DNS: {ip_or_dns}")
if check_ssh_open(ip_or_dns):
print(f"Instance: {instance_id} ({instance_name}) is accessible via SSH.")
else:
print(f"Instance: {instance_id} ({instance_name}) is not accessible via SSH.")
else:
print(f"Instance: {instance_id} ({instance_name}) has no public IP or DNS.")
except Exception as e:
print(f"Error processing account {account_id}: {e}")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment