Skip to content

Instantly share code, notes, and snippets.

@gmariette
Created May 26, 2022 22:38
Show Gist options
  • Save gmariette/b7135090f3d08424f2f8169ba78e8209 to your computer and use it in GitHub Desktop.
Save gmariette/b7135090f3d08424f2f8169ba78e8209 to your computer and use it in GitHub Desktop.
def get_instance_id():
return requests.get('http://169.254.169.254/latest/meta-data/instance-id').content
def get_region():
return requests.get('http://169.254.169.254/latest/meta-data/placement/region').content
def init_ec2_client():
return boto3.client('ec2', region_name=get_region())
def identify_ebs_volume(client):
response = client.describe_instances(
InstanceIds=[get_instance_id()]
)['Reservations'][0]['Instances'][0].get('BlockDeviceMappings')
if len(response) > 1:
print(f'We have identified {len(response)} EBS drive, please check manually!')
sys.exit(1)
else:
return response[0]['Ebs'].get('VolumeId')
def get_volume_size(client, volume_id):
response = client.describe_volumes(
VolumeIds=[volume_id]
)['Volumes'][0]['Size']
return response
def extend_volume(client, volume_id, new_size):
print(f'Going to extend volume {volume_id} to {new_size}G')
try:
response = client.modify_volume(
VolumeId=volume_id,
Size=new_size
)
return True
except Exception as e:
print(f'Unable to extend volume, exception: {e}')
return False
def wait_volume_modified(client, volume_id):
available_states = ["optimizing", "completed"]
state = ""
while state not in available_states:
state = client.describe_volumes_modifications(VolumeIds=volume_id)['VolumesModifications'][0].get('ModificationState')
print(f'Volume {volume_id} still not available, waiting...')
time.sleep(5)
return True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment