Created
April 20, 2017 20:44
-
-
Save alessandrobologna/d78f09c954f6126413c8e978fa8ceec9 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import hashlib
import json
import re
import time
from datetime import datetime, timedelta

import boto3
import botocore.exceptions
from botocore.vendored import requests
# Status values reported back through send().
SUCCESS = "SUCCESS"
FAILED = "FAILED"

# Seconds to pause between two polls (handed to time.sleep).
SLEEP = 10

# Remaining invocation time, in milliseconds, below which the polling loops
# stop (compared against context.get_remaining_time_in_millis()).
TIMEOUT_THRESHOLD = 20 * 1000
class TimeoutException(Exception):
    """Signal that the current invocation must stop waiting.

    Raised by the polling helpers when the remaining invocation time drops
    below TIMEOUT_THRESHOLD, and by wait_events to hand over to the
    monitoring phase. lambda_handler catches it and re-invokes the function
    with the (updated) event.
    """
class FailedException(Exception):
    """Signal that the requested operation did not finish within max_time.

    Raised by wait_deployments and wait_inactive; it reaches the generic
    handler in lambda_handler, which reports FAILED.
    """
def _encode_unknown(self, obj):
    """Fallback for objects json cannot serialize natively.

    datetime objects become their ISO-8601 string; anything else becomes
    None (rendered as JSON null).
    """
    if isinstance(obj, datetime):
        return obj.isoformat()
    return None


# Installed globally on the encoder class, so every json.dumps call in the
# process uses this fallback.
json.JSONEncoder.default = _encode_unknown
def _json_default(obj):
    """Serialize datetimes as ISO-8601 strings; any other unknown object as null."""
    if isinstance(obj, datetime):
        return obj.isoformat()
    return None


def dump(d):
    """Return `d` serialized as a JSON string.

    The fallback encoder is passed explicitly, so the result does not depend
    on json.JSONEncoder having been patched elsewhere in the module:
    datetime values are written as ISO-8601 strings and any other
    non-serializable object is written as null.
    """
    return json.dumps(d, default=_json_default)
def send(event, context, status, data, id, reason=None):
    """PUT the outcome of the request to the URL in event['ResponseURL'].

    The JSON body carries Status, Reason, PhysicalResourceId (`id`), Data and
    the StackId / RequestId / LogicalResourceId copied from `event`.
    Transport errors are printed, never raised; a missing key in `event`
    still raises KeyError. `context` is accepted but not used.
    """
    target = event['ResponseURL']
    payload = {
        'Status': status,
        'Reason': reason,
        'PhysicalResourceId': id,
        'StackId': event['StackId'],
        'RequestId': event['RequestId'],
        'LogicalResourceId': event['LogicalResourceId'],
        'Data': data,
    }
    serialized = dump(payload)
    print("Result:\n" + serialized)
    request_headers = {
        'content-type': '',
        'content-length': str(len(serialized)),
    }
    try:
        reply = requests.put(target, data=serialized, headers=request_headers)
        print("Status code: " + reply.reason)
    except Exception as e:
        # Best effort: a failed callback is logged and otherwise ignored.
        print("send failed: " + str(e))
def lower(s):
    """Return `s` with its first character lower-cased (boto3-style key).

    Falsy input ('' or None) is returned unchanged.
    """
    if not s:
        return s
    head, tail = s[:1], s[1:]
    return head.lower() + tail
def convert(k, s):
    """Return the pair (k, value) with `s` coerced to the type boto3 expects.

    Count/percentage/port keys are cast to int, 'waitForMonitors' becomes a
    bool (true for yes/true/sure/yup, case-insensitive); every other value is
    passed through untouched.
    """
    integer_keys = ('containerPort', 'desiredCount', 'maximumPercent', 'minimumHealthyPercent')
    affirmative = ('yes', 'true', 'sure', 'yup')
    if k in integer_keys:
        value = int(s)
    elif k == 'waitForMonitors':
        value = s.lower() in affirmative
    else:
        value = s
    return (k, value)
def transform(d):
    """Recursively rewrite CloudFormation-style properties for boto3.

    In every dict the keys get a lower-case first letter (lower) and the
    values are type-coerced per key (convert); lists are processed element
    by element; any other value is returned as is.
    """
    if type(d) is dict:
        translated = {}
        for key, value in d.items():
            new_key, new_value = convert(lower(key), transform(value))
            translated[new_key] = new_value
        return translated
    if type(d) is list:
        return [transform(item) for item in d]
    return d
def describe(client, cluster, service, contex):
    """Return the describe_services entry for `service`, or None.

    Args:
        client: object exposing describe_services(cluster=..., services=[...]).
        cluster: cluster name/ARN forwarded to describe_services.
        service: service name forwarded to describe_services.
        contex: invocation context; only get_remaining_time_in_millis() is
            used. The parameter keeps its historical spelling so existing
            callers are unaffected.

    Returns:
        The first element of the response's 'services' list, or None when the
        response has no usable first entry.

    Raises:
        TimeoutException: the call is being throttled and less than
            TIMEOUT_THRESHOLD milliseconds of invocation time remain.
        botocore.exceptions.ClientError: any client error other than
            throttling.
    """
    while True:
        try:
            return client.describe_services(cluster=cluster, services=[service])['services'][0]
        except (KeyError, TypeError, IndexError):
            return None
        except botocore.exceptions.ClientError as exception:
            if "ThrottlingException" not in str(exception):
                # Not a throttle: propagate with the original traceback.
                raise
            # The body previously read the undefined name `context` here.
            if contex.get_remaining_time_in_millis() < TIMEOUT_THRESHOLD:
                raise TimeoutException("Timeout while waiting to call describe_services again")
            time.sleep(SLEEP)
def get_monitor_state(client, name):
    """Return the 'StateValue' of the alarm called `name`, or None.

    None is returned when `name` is empty, when describe_alarms reports no
    'MetricAlarms' for it, or when the first alarm has no 'StateValue'.
    """
    if not name:
        return None
    alarms = client.describe_alarms(AlarmNames=[name]).get('MetricAlarms', None)
    if not alarms:
        return None
    return alarms[0].get('StateValue', None)
def wait_monitors(client, context, cluster, name, resource, data, event, max_time):
    """Watch the recorded alarms for the rest of this invocation.

    event['MonitorStates'] maps alarm names to the state they had before the
    deployment. Until fewer than TIMEOUT_THRESHOLD milliseconds remain, every
    alarm is polled each SLEEP seconds; the first alarm that changed state
    and is now 'ALARM' makes the request FAILED. If none does, SUCCESS is
    sent with the service name and its latest event message in `data`.

    Args:
        client: ECS client handed to describe().
        context: invocation context (get_remaining_time_in_millis()).
        cluster, name: cluster and service to describe at the end.
        resource: physical resource id reported through send().
        data: dict filled with 'Name' and 'Status' on success.
        event: request event; 'ServiceToken', 'RequestType' and optionally
            'MonitorStates' are read.
        max_time: unused here; kept for a signature parallel to the other
            wait_* helpers.

    Returns:
        Whatever send() returns.

    Raises:
        FailedException: the service can no longer be described.
    """
    region = event['ServiceToken'].split(':')[3]
    monitor_states = event.get('MonitorStates') or {}
    cw = boto3.client('cloudwatch', region)
    print("Waiting for monitors signaling")
    while context.get_remaining_time_in_millis() > TIMEOUT_THRESHOLD:
        # .items() instead of the Python 2-only .iteritems()
        for monitor, previous_state in monitor_states.items():
            current_state = get_monitor_state(cw, monitor)
            print("%s: %s -> %s" % (monitor, previous_state, current_state))
            if current_state != previous_state and current_state == 'ALARM':
                return send(event, context, FAILED, data, resource, "%s failed waiting for monitor %s with state %s" % (event['RequestType'], monitor, current_state))
        time.sleep(SLEEP)
    response = describe(client, cluster, name, context)
    if response is None:
        # Same FAILED outcome as before, but with a readable reason instead
        # of an AttributeError on None.
        raise FailedException("Service %s not found in cluster %s" % (name, cluster))
    events = response.get('events') or []
    data['Name'] = response.get('serviceName')
    # A service without any event no longer aborts the request.
    data['Status'] = events[0].get('message', None) if events else None
    return send(event, context, SUCCESS, data, resource, "%s successful" % event['RequestType'])
def wait_events(client, context, cluster, name, resource, data, event, max_time):
    """Wait until the service's newest event reports a steady state.

    Marks the event as Pending="stabilize" and polls describe() every SLEEP
    seconds until the message of the most recent service event contains
    'steady state.'. When event['MonitorStates'] is set, the event is then
    switched to Pending="monitoring" and control is handed over through a
    TimeoutException; otherwise SUCCESS is sent with the service name and
    the final event message in `data`.

    Args:
        client: ECS client handed to describe().
        context: invocation context (get_remaining_time_in_millis()).
        cluster, name: cluster and service being watched.
        resource: physical resource id reported through send().
        data: dict filled with 'Name' and 'Status' on success.
        event: request event; 'StartedTime', 'RequestType' and
            'MonitorStates' are read, 'Pending' is written.
        max_time: seconds allowed since event['StartedTime']; checked once,
            on entry.

    Returns:
        Whatever send() returns (FAILED when max_time is already exceeded).

    Raises:
        TimeoutException: invocation time is running out, or the monitoring
            phase has to take over.
        FailedException: the service can no longer be described.
    """
    def latest():
        # Return (service, [newest event] or []) from a fresh describe call.
        service = describe(client, cluster, name, context)
        if service is None:
            raise FailedException("Service %s not found in cluster %s" % (name, cluster))
        return service, (service.get('events') or [])[:1]

    started_time = event.get('StartedTime', None)
    if started_time and time.time() - started_time > max_time:
        # The message used to end with an unbalanced ')'.
        return send(event, context, FAILED, data, resource, "%s failed after %s seconds elapsed" % (event['RequestType'], max_time))
    event['Pending'] = "stabilize"
    service, events = latest()
    print("Events:")
    if events:
        print(dump(events))
    previous = events
    # An event without a message counts as "not steady yet" instead of
    # raising TypeError on `in None`.
    while not events or 'steady state.' not in (events[0].get('message') or ''):
        if context.get_remaining_time_in_millis() < TIMEOUT_THRESHOLD:
            raise TimeoutException("Timeout waiting for steady state")
        time.sleep(SLEEP)
        service, events = latest()
        if events and events != previous:
            print(dump(events))
            previous = events
    if event.get('MonitorStates', None):
        event['Pending'] = "monitoring"
        raise TimeoutException("Waiting for monitors signal")
    # The description fetched by the last poll is reused; no extra API call.
    data['Name'] = service['serviceName']
    data['Status'] = events[0].get('message', None)
    return send(event, context, SUCCESS, data, resource, "%s successful" % event['RequestType'])
def wait_deployments(client, context, cluster, name, resource, data, event, max_time):
    """Wait until the service reports exactly one deployment.

    Marks the event as Pending="deployment", polls describe() every SLEEP
    seconds and prints the deployment list whenever it changes. Once a
    single deployment is left, control passes to wait_events.

    Raises:
        TimeoutException: fewer than TIMEOUT_THRESHOLD milliseconds of
            invocation time remain.
        FailedException: more than `max_time` seconds have passed since
            event['StartedTime'].
    """
    def fetch_deployments():
        return describe(client, cluster, name, context).get('deployments', [])

    started_time = event.get('StartedTime', None)
    event['Pending'] = "deployment"
    deployments = fetch_deployments()
    last_logged = deployments
    print("Deployments:")
    print(dump(deployments))
    while len(deployments) != 1:
        if context.get_remaining_time_in_millis() < TIMEOUT_THRESHOLD:
            raise TimeoutException("Timeout waiting for completed deployment")
        if started_time and time.time() - started_time > max_time:
            raise FailedException("%s did not complete after %s seconds elapsed" % (event['RequestType'], max_time))
        time.sleep(SLEEP)
        deployments = fetch_deployments()
        if deployments != last_logged:
            print(dump(deployments))
            last_logged = deployments
    return wait_events(client, context, cluster, name, resource, data, event, max_time)
def wait_inactive(client, context, cluster, name, resource, data, event, max_time):
    """Wait until the deleted service is INACTIVE, then report SUCCESS.

    Marks the event as Pending="deletion" and polls describe() every SLEEP
    seconds, printing the status whenever it changes. A service that
    describe() no longer returns is treated as INACTIVE: it has already been
    removed, which is the goal of the deletion.

    Args:
        client: ECS client handed to describe().
        context: invocation context (get_remaining_time_in_millis()).
        cluster, name: cluster and service being deleted.
        resource: physical resource id reported through send().
        data: dict passed through to send().
        event: request event; 'StartedTime' and 'RequestType' are read,
            'Pending' is written.
        max_time: seconds allowed since event['StartedTime'].

    Returns:
        Whatever send() returns.

    Raises:
        TimeoutException: fewer than TIMEOUT_THRESHOLD milliseconds of
            invocation time remain.
        FailedException: more than `max_time` seconds have passed since
            event['StartedTime'].
    """
    def current_status():
        # describe() already retries throttled calls itself, so no extra
        # ClientError handling is needed around it.
        service = describe(client, cluster, name, context)
        if service is None:
            return "INACTIVE"
        return service.get('status', None)

    started_time = event.get('StartedTime', None)
    event['Pending'] = "deletion"
    status = current_status()
    previous = status
    print("%s status: %s" % (name, status))
    while status != "INACTIVE":
        if context.get_remaining_time_in_millis() < TIMEOUT_THRESHOLD:
            raise TimeoutException("Timeout waiting for service to become inactive")
        if started_time and time.time() - started_time > max_time:
            raise FailedException("%s did not complete after %s seconds elapsed" % (event['RequestType'], max_time))
        time.sleep(SLEEP)
        status = current_status()
        if status != previous:
            print("%s status: %s" % (name, status))
            previous = status
    return send(event, context, SUCCESS, data, resource, "%s successful" % event['RequestType'])
def _collect_monitor_states(region, monitors):
    """Return {alarm name: current state} for the comma-separated names in `monitors`."""
    cw = boto3.client('cloudwatch', region)
    states = {}
    for monitor in re.split(r',\s*', monitors):
        state = get_monitor_state(cw, monitor)
        if state:
            states[monitor] = state
        else:
            print("Monitor %s was not found" % monitor)
    return states


def lambda_handler(event, context):
    """Entry point: create, update or delete the ECS service described by the event.

    The resource properties are translated with transform() and then:
      * a continuation invocation (event['Pending'] set) resumes the
        matching wait_* helper;
      * Delete removes the service when the physical resource id matches the
        one CloudFormation holds for the logical resource, and is otherwise
        acknowledged as a cleanup request;
      * Create / Update call create_service / update_service and then wait
        for the deployment through wait_deployments, recording the current
        alarm states first when monitors are to be watched.

    A TimeoutException from the helpers re-invokes this function
    asynchronously with the (updated) event; any other exception is reported
    as FAILED through send().

    Args:
        event: request event; reads 'ResourceProperties', 'RequestType',
            'ServiceToken' and optionally 'PhysicalResourceId', 'StackId',
            'LogicalResourceId', 'Pending'.
        context: invocation context handed to the wait_* helpers.

    Returns:
        The result of the wait_*/send call that finished the request, or
        None after a hand-over or a reported failure.
    """
    resource = event.get('PhysicalResourceId', None)
    data = {}
    try:
        print("Event:\n" + dump(event))
        props = transform(event['ResourceProperties'])
        name = props.get('serviceName', None)
        # maxTime is a setting of this handler, not a boto3 parameter: take
        # it out of the properties (they are later passed as **kwargs) and
        # make it numeric so the elapsed-time comparisons are meaningful.
        max_time = int(props.pop('maxTime', 3600))
        cluster = props.get('cluster', None)
        taskdef = props.get('taskDefinition', None)
        if not name:
            # Checked before the name is used to build the resource id.
            raise Exception("Missing service or cluster name")
        # Explicit form of the check that indexing the split ARN used to do
        # implicitly (at least seven ':' / '/' separated fields).
        if not taskdef or len(re.split('[:/]', taskdef)) < 7:
            raise Exception("taskDefinition must be a full task definition ARN, got %r" % (taskdef,))
        arn_fields = taskdef.split(':')
        revision = arn_fields[-1]
        arn = ':'.join(arn_fields[:-2])
        resource = event.get('PhysicalResourceId', arn + ":service/" + name + ":" + revision)
        stack_id = event.get('StackId', None)
        logical_resource_id = event.get('LogicalResourceId', None)
        region = event['ServiceToken'].split(':')[3]
        monitors = props.pop("monitors", None)
        wait_for_monitors = props.pop("waitForMonitors", False)
        props.pop("serviceToken", None)
        try:
            if not cluster:
                raise Exception("Missing service or cluster name")
            client = boto3.client('ecs', region)

            # is this a continuation invocation for this lambda?
            resume = {
                "deployment": wait_deployments,
                "stabilize": wait_events,
                "deletion": wait_inactive,
                "monitoring": wait_monitors,
            }.get(event.get('Pending', None))
            if resume is not None:
                return resume(client, context, cluster, name, resource, data, event, max_time)

            request_type = event['RequestType']
            if request_type == "Delete":
                # need to distinguish between an actual delete and a cleanup
                if stack_id and logical_resource_id:
                    cf = boto3.client('cloudformation', region)
                    detail = cf.describe_stack_resource(StackName=stack_id, LogicalResourceId=logical_resource_id).get('StackResourceDetail', {})
                    if resource == detail.get('PhysicalResourceId', None):
                        # the resource id matches what is in cloudformation, it's an actual delete
                        service = describe(client, cluster, name, context)
                        if service and service['status'] == 'ACTIVE' and service['desiredCount'] > 0:
                            # shutdown tasks first
                            client.update_service(cluster=cluster, service=name, desiredCount=0)
                        response = client.delete_service(cluster=cluster, service=name)
                        print("Delete response:\n" + dump(response))
                        event['StartedTime'] = time.time()
                        return wait_inactive(client, context, cluster, name, resource, data, event, max_time)
                    print("Cleanup request for %s, will ignore it" % resource)
                return send(event, context, SUCCESS, data, resource, "%s successful" % event['RequestType'])

            if request_type == "Create":
                # md5 needs bytes on Python 3; str(props) is plain ASCII on Python 2.
                props['clientToken'] = hashlib.md5(str(props).encode('utf-8')).hexdigest()
                print("Creating:\n" + dump(props))
                print("Create response:\n" + dump(client.create_service(**props)['service']))
            if request_type == "Update":
                service = describe(client, cluster, name, context)
                # A missing service is rejected too, as the message says.
                if not service or service['status'] != 'ACTIVE':
                    raise Exception("Service not found or INACTIVE, cannot be updated")
                for dropped in ('loadBalancers', 'role', 'placementConstraints', 'placementStrategy'):
                    props.pop(dropped, None)
                props["service"] = props.pop("serviceName")
                print("Updating:\n" + dump(props))
                print("Update response:\n" + dump(client.update_service(**props)['service']))

            event['StartedTime'] = time.time()
            if monitors and wait_for_monitors:
                states = _collect_monitor_states(region, monitors)
                if states:
                    event['MonitorStates'] = states
            return wait_deployments(client, context, cluster, name, resource, data, event, max_time)
        except TimeoutException as e:
            # Out of time (or switching phase): continue in a fresh invocation.
            print("Passing the baton (%s)" % e)
            boto3.client('lambda', region).invoke(FunctionName=event['ServiceToken'], InvocationType='Event', Payload=dump(event))
    except Exception as e:
        send(event, context, FAILED, data, resource, ("%s failed: Exception: (%s)" % (event['RequestType'], e)))
    return
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment