DynamoDB Autoscaling Manager
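A Lambda function that adjusts the provisioned read/write capacity of tagged DynamoDB tables and their global secondary indexes, based on consumed-capacity and throttling metrics from CloudWatch. Tables opt in via the autoscaling_manager_enabled tag, and tables already covered by a DynamoDB Application Auto Scaling policy are left alone.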
from __future__ import print_function, unicode_literals

import calendar
import datetime
from collections import defaultdict

import boto3

MANAGER_ENABLED_TAG = 'autoscaling_manager_enabled'

autoscaling_client = boto3.client('application-autoscaling', region_name='us-west-2')
dynamo_client = boto3.client('dynamodb', region_name='us-west-2')
resource_groups_tagging_client = boto3.client('resourcegroupstaggingapi', region_name='us-west-2')
cloudwatch_resource = boto3.resource('cloudwatch', region_name='us-west-2')

class DynamoDBAutoscalingManager:
    CONSUMED_WRITE_CAPACITY_METRIC = 'ConsumedWriteCapacityUnits'
    CONSUMED_READ_CAPACITY_METRIC = 'ConsumedReadCapacityUnits'
    WRITE_THROTTLE_EVENTS_METRIC = 'WriteThrottleEvents'
    READ_THROTTLE_EVENTS_METRIC = 'ReadThrottleEvents'
    ACTIVE_TABLE_STATUS = 'ACTIVE'
    LOOK_BACK_WINDOW_MINUTES = 5
    CAPACITY_METRICS = [
        CONSUMED_WRITE_CAPACITY_METRIC,
        CONSUMED_READ_CAPACITY_METRIC,
    ]
    THROTTLE_METRICS = [
        WRITE_THROTTLE_EVENTS_METRIC,
        READ_THROTTLE_EVENTS_METRIC,
    ]
    ALL_METRICS = CAPACITY_METRICS + THROTTLE_METRICS
    READ_CAPACITY = 'ReadCapacityUnits'
    WRITE_CAPACITY = 'WriteCapacityUnits'
    MAPPED_METRICS = {
        READ_CAPACITY: {
            'consumed': CONSUMED_READ_CAPACITY_METRIC,
            'throttling': READ_THROTTLE_EVENTS_METRIC
        },
        WRITE_CAPACITY: {
            'consumed': CONSUMED_WRITE_CAPACITY_METRIC,
            'throttling': WRITE_THROTTLE_EVENTS_METRIC
        }
    }
    MIN_SETTINGS = {
        READ_CAPACITY: 5,
        WRITE_CAPACITY: 5,
    }
    MAX_SETTINGS = {
        READ_CAPACITY: 100000,
        WRITE_CAPACITY: 80000,
    }
    # These settings are used to decide when to modify capacity
    THROTTLING_THRESHOLD = 50  # this much average throttling triggers the bigger increase
    HIGH_THROTTLING_CAPACITY_INCREASE = 4000
    LOW_THROTTLING_CAPACITY_INCREASE = 1000
    TARGET_UTILIZATION = 0.8  # ideal target utilization
    BUFFERED_TARGET_UTILIZATION = 0.6  # how far utilization may drop below target before we decrease capacity
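
    # How these knobs interact: throttling triggers flat, quantized bumps (1000 or
    # 4000 units) rather than proportional ones, so a short burst can't overshoot.
    # Utilization-driven changes aim for 80% usage, but decreases only fire below
    # 60%, leaving a hysteresis band that keeps capacity from flapping.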

    def __init__(self, table_name):
        self.table_name = table_name
        self.table_data = self.get_dynamo_table_data()
        self.gsi_data = {index_data['IndexName']: index_data
                         for index_data in self.table_data.get('GlobalSecondaryIndexes', [])}
        self.current_throughput_settings = {self.table_name: self.table_data['ProvisionedThroughput']}
        for index_name in self.gsi_data:
            self.current_throughput_settings[index_name] = self.gsi_data[index_name]['ProvisionedThroughput']

    def get_dynamo_table_data(self):
        """
        Get the current table status + provisioned capacity
        """
        return dynamo_client.describe_table(TableName=self.table_name)['Table']

    def get_throughput_settings_for_request(self, element_name, new_throughput_settings_by_element):
        """
        Get the new throughput settings we'll be sending in the request,
        falling back to the current values for any capacity that isn't changing
        """
        return {
            self.READ_CAPACITY: new_throughput_settings_by_element[element_name].get(
                self.READ_CAPACITY, self.current_throughput_settings[element_name][self.READ_CAPACITY]),
            self.WRITE_CAPACITY: new_throughput_settings_by_element[element_name].get(
                self.WRITE_CAPACITY, self.current_throughput_settings[element_name][self.WRITE_CAPACITY])
        }

    def modify_dynamo_capacity(self, new_throughput_settings_by_element):
        """
        Modify DynamoDB capacity based on the passed settings
        """
        update_parameters = {
            'TableName': self.table_name
        }
        if self.table_name in new_throughput_settings_by_element:
            update_parameters['ProvisionedThroughput'] = self.get_throughput_settings_for_request(
                self.table_name, new_throughput_settings_by_element)
        gsi_updates = []
        for index_name in self.gsi_data:
            if index_name in new_throughput_settings_by_element:
                gsi_updates.append(
                    {
                        'Update': {'IndexName': index_name,
                                   'ProvisionedThroughput': self.get_throughput_settings_for_request(
                                       index_name, new_throughput_settings_by_element)}
                    }
                )
        if gsi_updates:
            update_parameters['GlobalSecondaryIndexUpdates'] = gsi_updates
        try:
            dynamo_client.update_table(**update_parameters)
            print('Updated {} with the following settings: {}'.format(self.table_name, update_parameters))
        except Exception as e:
            print('There was an error while trying to update the table capacity of {}: {}'.format(self.table_name, e))
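
    # Note: a successful update_table call moves the table into UPDATING status,
    # so the ACTIVE-status guard in manage_table_autoscaling will skip this table
    # on subsequent runs until DynamoDB finishes applying the change.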

    def get_cloudwatch_metric(self, metric_name, index_name=None):
        """
        Retrieve the data points for a (metric, table) pair over the look-back window.
        This method assumes that the requested metrics have a resolution of at
        least 1 minute and fills CloudWatch's gaps with zeroes: CloudWatch omits
        periods with no data, and for these metrics no data means no usage.
        The capacity metrics are averaged over one minute.
        """
        end_date = datetime.datetime.utcnow().replace(second=0, microsecond=0)
        start_date = end_date - datetime.timedelta(minutes=self.LOOK_BACK_WINDOW_MINUTES)
        metric = cloudwatch_resource.Metric('AWS/DynamoDB', metric_name)
        dimensions = [{'Name': 'TableName', 'Value': self.table_name}]
        if index_name:
            dimensions.append({'Name': 'GlobalSecondaryIndexName', 'Value': index_name})
        response = metric.get_statistics(
            Dimensions=dimensions,
            StartTime=start_date,
            EndTime=end_date,
            Period=60,
            Statistics=['Sum'],
        )
        data_points = {calendar.timegm(data_point['Timestamp'].timetuple()): data_point
                       for data_point in response['Datapoints']}
        metric_report = []
        current_date = start_date
        while current_date < end_date:
            current_timestamp = calendar.timegm(current_date.timetuple())
            sum_value = data_points.get(current_timestamp, {}).get('Sum', 0)
            metric_report.append({
                'timestamp': current_timestamp,
                # Dividing a one-minute Sum by 60 turns consumed capacity into
                # average units per second, directly comparable to provisioned units
                'value': sum_value / 60 if metric_name in self.CAPACITY_METRICS else sum_value
            })
            current_date += datetime.timedelta(minutes=1)
        return metric_report

    def get_usage_metrics(self):
        """
        Retrieve all the needed metrics for the current table and its GSIs
        """
        table_metrics = defaultdict(dict)
        for metric in self.ALL_METRICS:
            table_metrics[self.table_name][metric] = self.get_cloudwatch_metric(metric)
        for index_name in self.gsi_data:
            for metric in self.ALL_METRICS:
                table_metrics[index_name][metric] = self.get_cloudwatch_metric(metric, index_name)
        return table_metrics

    def get_new_throughput_settings(self, element_name, element_usage_metrics):
        """
        Get the new throughput settings for the provided element metrics.
        This is where our autoscaling algorithm gets applied.
        It is used both for tables and GSIs.
        """
        new_throughput_settings = {}
        current_throughput_settings = self.current_throughput_settings[element_name]
        for capacity_setting in self.MAPPED_METRICS:
            currently_provisioned = current_throughput_settings[capacity_setting]
            new_capacity_settings = None
            throttling_metrics = element_usage_metrics[self.MAPPED_METRICS[capacity_setting]['throttling']]
            average_throttling = sum(record['value'] for record in throttling_metrics) / len(throttling_metrics)
            consumed_metrics = element_usage_metrics[self.MAPPED_METRICS[capacity_setting]['consumed']]
            average_consumed = sum(record['value'] for record in consumed_metrics) / len(consumed_metrics)
            utilization = average_consumed / currently_provisioned
            can_increase_capacity = currently_provisioned < self.MAX_SETTINGS[capacity_setting]
            target_provisioning = int(average_consumed * (1 / self.TARGET_UTILIZATION))
            min_provisioning = self.MIN_SETTINGS[capacity_setting]
            if average_throttling and can_increase_capacity:  # While throttling, we do quantized increases
                new_capacity_settings = currently_provisioned + (self.HIGH_THROTTLING_CAPACITY_INCREASE
                                                                 if average_throttling >= self.THROTTLING_THRESHOLD
                                                                 else self.LOW_THROTTLING_CAPACITY_INCREASE)
                print('{}-{} has {} average throttling'.format(
                    element_name, capacity_setting, average_throttling))
            elif currently_provisioned < min_provisioning or not average_consumed:
                new_capacity_settings = min_provisioning  # If we are below the minimum or nothing is using this table
                print('{}-{} is below the minimum or has no consumed capacity'.format(
                    element_name, capacity_setting))
            elif utilization > self.TARGET_UTILIZATION and can_increase_capacity:
                new_capacity_settings = target_provisioning  # Increase to match the target utilization
                print('{}-{} has high utilization ({})'.format(
                    element_name, capacity_setting, utilization))
            elif not average_throttling and utilization < self.BUFFERED_TARGET_UTILIZATION:
                new_capacity_settings = target_provisioning  # Decrease to match the target utilization, keeping some buffer
                print('{}-{} has low utilization ({})'.format(
                    element_name, capacity_setting, utilization))
            if new_capacity_settings and new_capacity_settings != currently_provisioned:
                print('Setting {}-{} capacity to {}'.format(element_name, capacity_setting, new_capacity_settings))
                new_throughput_settings[capacity_setting] = new_capacity_settings
            else:
                print('{}-{} stays at {}. Skipping'.format(element_name, capacity_setting, currently_provisioned))
        return new_throughput_settings
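
    # Worked example (illustrative numbers, not from a real table): with
    # average_consumed = 48 units/sec and TARGET_UTILIZATION = 0.8,
    # target_provisioning = int(48 * 1.25) = 60. A table provisioned at 50
    # (utilization 0.96 > 0.8) is raised to 60; one provisioned at 100
    # (utilization 0.48 < 0.6) is lowered to the same 60.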

    def manage_table_autoscaling(self):
        """
        Retrieve the current throughput settings of the table and the consumed capacity from CloudWatch.
        If we detect that the capacity settings need adjusting, we'll try to change them.
        """
        if self.table_data['TableStatus'] != self.ACTIVE_TABLE_STATUS:
            print('{} is not in {} status. Skipping as we will not be able to update it'.format(
                self.table_name, self.ACTIVE_TABLE_STATUS))
            return
        usage_metrics = self.get_usage_metrics()
        new_throughput_settings_by_element = {}
        for element_name in [self.table_name] + list(self.gsi_data.keys()):
            new_throughput_settings = self.get_new_throughput_settings(element_name, usage_metrics[element_name])
            if new_throughput_settings:
                new_throughput_settings_by_element[element_name] = new_throughput_settings
        if new_throughput_settings_by_element:
            self.modify_dynamo_capacity(new_throughput_settings_by_element)

def is_table_auto_scaling_enabled(table_name):
    """
    Detect whether the table already has an Application Auto Scaling policy,
    in which case we don't need to step in
    """
    response = autoscaling_client.describe_scaling_policies(ServiceNamespace='dynamodb',
                                                            ResourceId='table/{}'.format(table_name))
    return bool(response['ScalingPolicies'])

def get_enabled_tables():
    """
    Retrieve the tables that have the MANAGER_ENABLED_TAG set to true
    """
    response = resource_groups_tagging_client.get_resources(
        TagFilters=[{
            'Key': MANAGER_ENABLED_TAG, 'Values': ['true']
        }],
        ResourceTypeFilters=['dynamodb']
    )
    # Table ARNs look like arn:aws:dynamodb:<region>:<account>:table/<name>,
    # so the table name is the segment after the first '/'
    return [item['ResourceARN'].split('/')[1] for item in response['ResourceTagMappingList']]

def handler(event, context):
    """
    This Lambda function manages DynamoDB scaling for the tables that have the MANAGER_ENABLED_TAG tag set to true.
    If autoscaling is already enabled for a table, this won't perform any operations on it.
    """
    processed_tables = []
    enabled_tables = get_enabled_tables()
    print('Retrieved the following tables with the {} tag set to true: {}'.format(MANAGER_ENABLED_TAG, enabled_tables))
    for table_name in enabled_tables:
        if not is_table_auto_scaling_enabled(table_name):
            print('Will manage autoscaling for {}'.format(table_name))
            DynamoDBAutoscalingManager(table_name).manage_table_autoscaling()
            processed_tables.append(table_name)
        else:
            print('{} is already managed by DynamoDB autoscaling. Skipping'.format(table_name))
    return {
        'processed_tables': processed_tables,
    }
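
For a quick local smoke test, here is a minimal sketch. It assumes the gist is saved as dynamodb_autoscaling_manager.py next to the caller (a hypothetical file name, not part of the gist) and that AWS credentials for us-west-2 are available in the environment; the handler ignores both of its arguments outside Lambda.

    # Hypothetical local runner for the gist above; the module name is an
    # assumption made for this example.
    from dynamodb_autoscaling_manager import handler

    if __name__ == '__main__':
        result = handler(event=None, context=None)
        print('Processed tables: {}'.format(result['processed_tables']))

In production, a handler like this would typically be invoked on a schedule (for example, a CloudWatch Events rule firing every few minutes), though the gist itself doesn't include that wiring.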