Skip to content

Instantly share code, notes, and snippets.

@CoRfr
Created May 19, 2021 19:54
Show Gist options
  • Save CoRfr/1523b5bc3d30920149ac98b348667450 to your computer and use it in GitHub Desktop.
Script to collect AWS EC2 instance prices and associated EKS pod limit
#!/usr/bin/env python3
import logging
from os import cpu_count
import boto3
import json
import requests
from pkg_resources import resource_filename
from multiprocessing import Pool
import tabulate
import time
import argparse
import sys
# Search product filter: JSON template for the Pricing API get_products() call.
# Doubled braces survive str.format(); the {o}/{t}/{r} placeholders are filled
# in by query_info() with operating system, instance type and region name.
FLT = (
'[{{"Field": "tenancy", "Value": "shared", "Type": "TERM_MATCH"}},'
'{{"Field": "operatingSystem", "Value": "{o}", "Type": "TERM_MATCH"}},'
'{{"Field": "preInstalledSw", "Value": "NA", "Type": "TERM_MATCH"}},'
'{{"Field": "instanceType", "Value": "{t}", "Type": "TERM_MATCH"}},'
'{{"Field": "location", "Value": "{r}", "Type": "TERM_MATCH"}},'
'{{"Field": "capacitystatus", "Value": "Used", "Type": "TERM_MATCH"}}]'
)
# Get current AWS price for an on-demand instance
def query_info(region, instance, os):
    """Query the AWS Pricing API for a single on-demand instance type.

    Args:
        region: Human-readable location name (e.g. "US East (N. Virginia)").
        instance: EC2 instance type (e.g. "m5.large").
        os: Operating system attribute for the filter (e.g. "Linux").

    Returns:
        dict with cpu_count (vCPUs), cpu_type (physical processor name),
        mem (GiB) and price_per_hour (USD).
    """
    f = FLT.format(r=region, t=instance, o=os)
    # The pricing API throttles aggressively when hit from a pool of
    # workers; retry with a short backoff until the call succeeds.
    while True:
        try:
            data = client.get_products(ServiceCode="AmazonEC2", Filters=json.loads(f))
            break
        except Exception as ex:
            logging.error(ex)
            time.sleep(5)
    price_list = json.loads(data["PriceList"][0])
    info = {}
    info["cpu_count"] = float(price_list["product"]["attributes"]["vcpu"])
    info["cpu_type"] = price_list["product"]["attributes"]["physicalProcessor"]
    # Memory is formatted like "8 GiB" or "3,904 GiB" (large instances);
    # strip both the unit and the thousands separator before converting —
    # the original only stripped "GiB" and float("3,904 ") raises.
    info["mem"] = float(
        price_list["product"]["attributes"]["memory"]
        .replace("GiB", "")
        .replace(",", "")
    )
    # OnDemand terms are keyed by opaque offer/dimension ids; take the
    # first (and in practice only) entry of each.
    od = price_list["terms"]["OnDemand"]
    id1 = list(od)[0]
    id2 = list(od[id1]["priceDimensions"])[0]
    info["price_per_hour"] = float(
        od[id1]["priceDimensions"][id2]["pricePerUnit"]["USD"]
    )
    return info
# Translate region code to region name
def get_region_name(region_code):
    """Resolve an AWS region code (e.g. "us-east-1") to the human-readable
    location name the Pricing API expects (e.g. "US East (N. Virginia)").

    Reads botocore's bundled endpoints.json; falls back to "EU (Ireland)"
    when the file cannot be read or the region code is unknown.
    """
    default_region = "EU (Ireland)"
    endpoint_file = resource_filename("botocore", "data/endpoints.json")
    try:
        with open(endpoint_file, "r") as f:
            data = json.load(f)
        # partitions[0] is the standard "aws" partition.
        return data["partitions"][0]["regions"][region_code]["description"]
    # Also catch KeyError/IndexError: an unknown region code used to crash
    # here instead of falling back to the default region.
    except (IOError, KeyError, IndexError):
        return default_region
parser = argparse.ArgumentParser(
    description="Script to collect instance info regarding to EKS pod limit"
)
# Ordered list of instance size suffixes, smallest to largest; the order
# is what makes the --min/--max window below meaningful.
node_families = ["nano", "micro", "small", "medium", "large", "xlarge"]
for i in range(2, 32):
    node_families += ["%ixlarge" % i]
node_families += ["metal"]
# Pricing API is not available everywhere
parser.add_argument("-r", "--region", default="us-east-1")
# ARM exclusion defaults to on. The original -x flag was a no-op
# (store_true with default=True can never become False); keep it for
# compatibility and add --arm-cpu to actually re-enable ARM instances.
parser.add_argument("-x", "--no-arm-cpu", default=True, action="store_true")
parser.add_argument("--arm-cpu", dest="no_arm_cpu", action="store_false")
# default=[] so the "node_type in args.highlight" test never sees None
parser.add_argument("-H", "--highlight", action="append", default=[])
parser.add_argument("-o", "--output")
parser.add_argument("-f", "--format", choices=["table", "json"], default="table")
parser.add_argument("-m", "--min", default="small")
parser.add_argument("-M", "--max", default="xlarge")
args = parser.parse_args()
region_code = args.region
region_name = get_region_name(region_code)
# Use AWS Pricing API
client = boto3.client("pricing", region_name=region_code)
# Node family filter, inclusive of both --min and --max. The original
# slice stopped at max_index and silently excluded the --max size itself
# (the default "xlarge" was never included).
min_index = node_families.index(args.min)
max_index = node_families.index(args.max)
node_types_filter = node_families[min_index:max_index + 1]
# Filter out ARM instances
cpu_types_filter = []
if args.no_arm_cpu:
    cpu_types_filter = ["Intel", "AMD"]
pod_limit_url = "https://raw.githubusercontent.com/awslabs/amazon-eks-ami/master/files/eni-max-pods.txt"
def get_node_type_info(item):
    """Pool.map worker: fetch pricing for one (node_type, pod_limit) pair
    and derive per-pod cost/CPU/memory metrics.

    Returns the enriched info dict, or None when the instance's CPU vendor
    does not match the active cpu_types_filter.
    """
    # Parameter renamed from `input`, which shadowed the builtin.
    node_type, pod_limit = item
    print(node_type)  # progress feedback while the pool works through types
    info = query_info(region_name, node_type, "Linux")
    # An empty filter list means "allow every CPU vendor". The original
    # for/else returned None for *every* instance when the list was empty,
    # which would have dropped all results if ARM filtering were disabled.
    if cpu_types_filter and not any(
        cpu_filter in info["cpu_type"] for cpu_filter in cpu_types_filter
    ):
        return None
    info["node_type"] = node_type
    info["pod_limit"] = pod_limit
    # Price per month: average hours per month = 24 * 365 / 12
    info["price_per_month"] = info["price_per_hour"] * 24 * (365 / 12)
    # Per pod calculations
    info["price_per_pod"] = info["price_per_month"] / pod_limit
    info["cpu_per_pod"] = info["cpu_count"] / pod_limit
    info["mem_per_pod"] = info["mem"] / pod_limit
    return info
if __name__ == "__main__":
    # Download the EKS AMI's "instance-type max-pods" table.
    list_node_types = []
    resp = requests.get(pod_limit_url)
    # Fail fast on a bad download instead of silently parsing an error page.
    resp.raise_for_status()
    for line in resp.iter_lines():
        content = line.decode("utf-8").split(" ")
        # Skip comment lines and anything not shaped "<type> <limit>"
        if content[0].startswith("#") or len(content) != 2:
            continue
        node_type = content[0]
        pod_limit = int(content[1])
        # Keep only sizes within the --min/--max window; matching on
        # "." + suffix avoids e.g. ".large" matching "m5.xlarge".
        for node_filter in node_types_filter:
            if ("." + node_filter) in node_type:
                break
        else:
            continue
        list_node_types += [(node_type, pod_limit)]
    # Query pricing for all candidates in parallel — the API is slow.
    with Pool(4) as pool:
        node_types = []
        for node_type in pool.map(get_node_type_info, list_node_types):
            # Filter out None results (CPU vendor was filtered out)
            if node_type:
                node_types += [node_type]
    # Internal key -> table column header
    headers_map = {
        "highlight": "-",
        "node_type": "Type",
        "pod_limit": "Pod limit",
        "cpu_count": "vCPU",
        "cpu_per_pod": "Per pod",
        "mem": "RAM",
        "mem_per_pod": "Per pod",
        "price_per_month": "Price",
        "price_per_pod": "Per pod",
        "cpu_type": "CPU",
    }
    # Sorted per pod price
    node_types_sorted = sorted(node_types, key=lambda row: row["price_per_pod"])
    if args.format == "table":
        # Map data from our internal db to a table
        table = []
        for info in node_types_sorted:
            row = []
            for key in headers_map.keys():
                if key == "highlight":
                    node_type = info["node_type"]
                    # Guard: args.highlight is None when -H was never
                    # passed — "x in None" raised TypeError here.
                    if args.highlight and node_type in args.highlight:
                        row += ["*"]
                    else:
                        row += [""]
                else:
                    row += [info[key]]
            table += [row]
        # Output as table
        doc = tabulate.tabulate(table, headers=headers_map.values())
        if args.output:
            with open(args.output, "w") as fd:
                fd.write(doc)
        else:
            print(doc)
    elif args.format == "json":
        if args.output:
            with open(args.output, "w") as fd:
                json.dump(node_types_sorted, fd)
        else:
            json.dump(node_types_sorted, sys.stdout)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment