Skip to content

Instantly share code, notes, and snippets.

@gmariette
Last active December 11, 2022 21:57
Show Gist options
  • Save gmariette/d4c552e8013f95a2366b3d5e4669a2e5 to your computer and use it in GitHub Desktop.
import boto3
import yaml
import json
import logging
import os
import sys
from glob import glob
from kubernetes import client, config, dynamic
from eks_token import get_token
from typing import List
logger = logging.getLogger()
logger.setLevel(logging.INFO)
class KubeRegistration:
    '''
    Register/unregister EC2 ASG instances as Kubernetes service-discovery objects
    (Endpoints, Service, ServiceMonitor) on an EKS cluster, driven by ASG lifecycle events.

    Generated yaml manifests are staged under a per-request /tmp directory and then
    applied through the kubernetes dynamic client.
    '''
    def __init__(self, _event):
        '''
        DOCSTRING: Stage per-invocation temp directories keyed by the event RequestId
        INPUT: _event:dict, the ASG lifecycle event received by the lambda
        OUTPUT: None
        '''
        self.event = _event
        logger.info(self.event)
        # The RequestId scopes the temp directories so concurrent invocations don't collide
        request_id = self.event.get('detail').get('RequestId')
        self.kube_sd_path = f"/tmp/charts-{request_id}"
        logger.info(f"About to create directory {self.kube_sd_path=}")
        # Create the config file path
        os.makedirs(self.kube_sd_path, exist_ok=True)
        self.kube_config_path = f"/tmp/kube-config-{request_id}"
        logger.info(f"About to create directory {self.kube_config_path=}")
        os.makedirs(self.kube_config_path, exist_ok=True)
        self.namespace = 'observability-abp'

    def api_endpoints_mapping(self) -> List[dict]:
        '''
        DOCSTRING: Return endpoints / apiversion because we use the kubernetes dynamic client
        INPUT: None
        OUTPUT: List of dictionaries; an optional 'namespace' key overrides the caller's
                namespace when purging (ServiceMonitors live in 'operators')
        '''
        return [
            { 'kind': 'Endpoints', 'apiVersion': 'v1' },
            { 'kind': 'Service', 'apiVersion': 'v1' },
            { 'kind': 'ServiceMonitor', 'apiVersion': 'monitoring.coreos.com/v1', 'namespace': 'operators' }
        ]

    def generate_endpoints_config_file(self, name:str, namespace:str, ip_list:List[str], port_list:List[int]) -> None:
        '''
        DOCSTRING: Create an endpoints.yaml that will register all of our hosts, per ip
        INPUT: name:str (expected shape: servertype-project-env-envnum), namespace:str,
               ip_list:List[str], port_list:List[int]
        OUTPUT: None, write config f'{kube_sd_path}/endpoints.yaml'
        '''
        # The name encodes 4 dash-separated fields; raises ValueError on any other shape
        server_type, project, env, env_num = name.split('-')
        address_config = {
            'addresses': [
                {'ip': ip, 'targetRef': {'namespace': namespace}} for ip in ip_list
            ],
            'ports': [
                {'name': 'metrics', 'port': port, 'protocol': 'TCP'} for port in port_list
            ],
        }
        file_content = {
            'apiVersion': 'v1',
            'kind': 'Endpoints',
            'metadata': {
                'name': name,
                'namespace': namespace,
                'labels': {
                    'app': name,
                    'server_type': server_type,
                    'env': env,
                    'env_num': env_num,
                    'project': project,
                }
            },
            'subsets': [address_config]
        }
        documents = yaml.dump(file_content, default_flow_style=False)
        with open(f'{self.kube_sd_path}/endpoints.yaml', 'w') as file:
            file.write(documents)

    def generate_service_config_file(self, name:str, namespace:str, port_list:List[str]) -> None:
        '''
        DOCSTRING: Create one service for k8s-app identified by it's metadata:name
        INPUT: name:str, namespace:str, port_list:List[str]
        OUTPUT: None, write config f'{kube_sd_path}/service_{name}.yaml'
        '''
        logger.info(f'Generating Service file for k8s-app {name}, {namespace=}')
        server_type, project, env, env_num = name.split('-')
        tmp_port_list = [
            {
                'name': f'metrics-{port}',
                'port': port,
                'protocol': 'TCP',
                'targetPort': port,
            }
            for port in port_list
        ]
        file_content = {
            'apiVersion': 'v1',
            'kind': 'Service',
            'metadata': {
                'name': name,
                'namespace': namespace,
                'labels': {
                    'server_type': server_type,
                    'env': env,
                    'env_num': env_num,
                    'project': project,
                    'app': name
                }
            },
            'spec': {
                # ClusterIP with manually-managed Endpoints (no selector): classic
                # pattern to point a Service at external (EC2) backends
                'type': 'ClusterIP',
                'ports': tmp_port_list
            }
        }
        documents = yaml.dump(file_content, default_flow_style=False)
        with open(f'{self.kube_sd_path}/service_{name}.yaml', 'w') as file:
            file.write(documents)

    def generate_service_monitor_file(self, name:str, namespace:str) -> None:
        '''
        DOCSTRING: Create one service_monitor for labels:k8s-app
        INPUT: name:str, namespace:str
        OUTPUT: None, write config f'{kube_sd_path}/service_monitor.yaml'
        '''
        logger.info(f'Generating ServiceMonitor file for k8s-app {name}, {namespace=}')
        server_type, project, env, env_num = name.split('-')
        file_content = {
            'apiVersion': 'monitoring.coreos.com/v1',
            'kind': 'ServiceMonitor',
            'metadata': {
                'name': name,
                # ServiceMonitors must live where the prometheus operator watches
                'namespace': 'operators',
                'labels': {
                    'server_type': server_type,
                    'env': env,
                    'env_num': env_num,
                    'project': project,
                    'k8s-app': name
                }
            },
            'spec': {
                'endpoints': [
                    {
                        'honorLabels': True,
                        'interval': '1m',
                        'path': '/metrics',
                        'port': 'metrics',
                        'scheme': 'http',
                        'scrapeTimeout': '30s'
                    }
                ],
                'jobLabel': name,
                # Select the Service generated above by namespace + app label
                'namespaceSelector': {
                    'matchNames': [ namespace ]
                },
                'selector': {
                    'matchLabels': {
                        'app': name
                    }
                }
            }
        }
        documents = yaml.dump(file_content, default_flow_style=False)
        with open(f'{self.kube_sd_path}/service_monitor.yaml', 'w') as file:
            file.write(documents)

    def identify_instances_by_asg(self, source_account:int, asg:str) -> list:
        '''
        DOCSTRING: Identify running instances for an ASG
        INPUT: source_account:int, asg:str
        OUTPUT: list of instance ids, an empty list if no instance is running
        '''
        logger.info(f"Going to identify the instances for ASG {asg}")
        ACCESS_KEY, SECRET_KEY, SESSION_TOKEN = self.my_assume_role(source_account, os.environ.get('obs_iam_role'))
        client = self.init_aws_client_with_session('autoscaling', os.environ.get('region'), ACCESS_KEY, SECRET_KEY, SESSION_TOKEN)
        try:
            response = client.describe_auto_scaling_groups(AutoScalingGroupNames=[asg]).get('AutoScalingGroups')[0]
        except IndexError:
            # describe_auto_scaling_groups returns an empty list when the ASG
            # does not exist (no exception), hence the IndexError on [0]
            logger.error(f'ASG {asg} not found!')
            sys.exit(1)
        instances = [ x.get('InstanceId') for x in response.get('Instances') ]
        logger.info(f"Found {len(instances)} instances for ASG {asg}")
        if not instances:
            logger.error(f'No instance found for asg {asg}')
        return instances

    def get_lifecycle_hook_metadata(self, source_account:int, asg:str, lch:str) -> tuple:
        '''
        DOCSTRING: Get the lifecycle hook meta data from an ASG
        INPUT: source_account:int, asg:str, lch:str
        OUTPUT: tuple(name of the asg (to map on the k8s cluster), exporter ports exposed, rules)
        '''
        logger.info(f"Going to grab metadata from LCH {lch}")
        ACCESS_KEY, SECRET_KEY, SESSION_TOKEN = self.my_assume_role(source_account, os.environ.get('obs_iam_role'))
        client = self.init_aws_client_with_session('autoscaling', os.environ.get('region'), ACCESS_KEY, SECRET_KEY, SESSION_TOKEN)
        try:
            response = client.describe_lifecycle_hooks(
                AutoScalingGroupName=asg,
                LifecycleHookNames=[lch]
            ).get('LifecycleHooks')[0]
        except IndexError:
            logger.error(f'No lifecycle hook {lch} found for ASG {asg}!')
            sys.exit(1)
        # NotificationMetadata is a JSON document attached to the hook at creation time;
        # expected keys: name, ports, rules — TODO confirm against the hook provisioning code
        notification_metadata = json.loads(response.get('NotificationMetadata'))
        name = notification_metadata.get('name')
        ports = notification_metadata.get('ports')
        rules = notification_metadata.get('rules')
        return (name, ports, rules)

    def get_instances_ip(self, source_account:int, instance_list:List[str]) -> list:
        '''
        DOCSTRING: Get the instances ip based on instances id
        INPUT: source_account:int, instance_list:List[str]
        OUTPUT: ip list (either with IPv4 addresses or empty)
        '''
        logger.info(f"Going to identify the ip addresses for instances {' - '.join(instance_list)}")
        ACCESS_KEY, SECRET_KEY, SESSION_TOKEN = self.my_assume_role(source_account, os.environ.get('obs_iam_role'))
        client = self.init_aws_client_with_session('ec2', os.environ.get('region'), ACCESS_KEY, SECRET_KEY, SESSION_TOKEN)
        ip_list = []
        response = client.describe_instances(InstanceIds=instance_list).get('Reservations')
        # A reservation may hold several instances (co-launched ones are grouped):
        # iterate them all instead of only Instances[0]
        for item in response:
            for instance in item['Instances']:
                ip_list.append(instance.get('PrivateIpAddress'))
        logger.info(f'Found ips {" - ".join(ip_list)}')
        return ip_list

    def my_assume_role(self, id:int, role:str) -> tuple:
        '''
        DOCSTRING: Assume an aws role with fixed endpoint url (legacy from sts endpoint)
        INPUT: id:int, role:str
        OUTPUT: tuple(ACCESS_KEY, SECRET_KEY, SESSION_TOKEN)
        '''
        # Sts needs to have the endpoint url configured
        stsClient = boto3.client('sts', region_name=os.environ.get('region'), endpoint_url=f'https://sts.{os.environ.get("region")}.amazonaws.com')
        assumedRoleObject = stsClient.assume_role(
            RoleArn=f"arn:aws:iam::{id}:role/{role}",
            RoleSessionName="LambdaRole"
        )
        credentials = assumedRoleObject['Credentials']
        ACCESS_KEY = credentials.get("AccessKeyId")
        SECRET_KEY = credentials.get("SecretAccessKey")
        SESSION_TOKEN = credentials.get("SessionToken")
        return (ACCESS_KEY, SECRET_KEY, SESSION_TOKEN)

    def init_aws_client_with_session(self, client_type:str, region:str, access_key:str, secret_key:str, session_token:str) -> boto3.Session:
        '''
        DOCSTRING: Create a boto3 client bound to a session built from assumed-role credentials
        INPUT: client_type:str, region:str, access_key:str, secret_key:str, session_token:str
        OUTPUT: a botocore service client (note: the annotation says Session for historical
                reasons, but .client() is called on the session before returning)
        '''
        return boto3.Session(
            region_name=region,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            aws_session_token=session_token
        ).client(client_type)

    def create_kubeconfig(self) -> None:
        '''
        DOCSTRING: Create a kubeconfig to access an EKS cluster
        INPUT: None
        OUTPUT: None, dumps f'{kube_config_path}/kubeconfig.yaml'
        '''
        logger.info('Creating kubeconfig file')
        # build the cluster config hash
        cluster_config = {
            "apiVersion": "v1",
            "kind": "Config",
            "clusters": [
                {
                    "cluster": {
                        "server": os.environ.get('cluster_api_endpoint'),
                        "certificate-authority-data": os.environ.get('cluster_certificate_autority_data')
                    },
                    "name": "kubernetes"
                }
            ],
            "contexts": [
                {
                    "context": {
                        "cluster": "kubernetes",
                        "user": "aws"
                    },
                    "name": "aws"
                }
            ],
            "current-context": "aws",
            "preferences": {},
            "users": [
                {
                    # Short-lived bearer token generated from the lambda's IAM identity
                    "user": {'token': get_token(cluster_name=os.environ.get('cluster_name'))['status']['token']},
                    "name": "aws"
                }
            ]
        }
        config_text = yaml.dump(cluster_config, default_flow_style=False)
        with open(f'{self.kube_config_path}/kubeconfig.yaml', 'w') as f:
            f.write(config_text)
        logger.info('Kubeconfig file created')

    def load_kubeconfig(self) -> None:
        '''
        DOCSTRING: Load kubeconfig
        INPUT: None
        OUTPUT: None
        '''
        # load_kube_config expects a path, not an open file object
        config.load_kube_config(config_file=f'{self.kube_config_path}/kubeconfig.yaml')

    def list_sd_files(self) -> List[str]:
        '''
        DOCSTRING: List all yaml files to be executed on our kubernetes cluster
        INPUT: None
        OUTPUT: List of yaml files
        '''
        # glob already returns a list; no need to copy it through a comprehension
        file_list = glob(f"{self.kube_sd_path}/*.yaml")
        logger.info(f'Found those files to apply: {file_list}')
        return file_list

    def apply_simple_item(self, dynamic_client, manifest: dict) -> None:
        '''
        DOCSTRING: Apply a loaded yaml file to our Kubernetes cluster. Try a patch first
                   (resource exists), otherwise create it
        INPUT: dynamic_client, manifest: dict
        OUTPUT: None
        '''
        api_version = manifest.get("apiVersion")
        kind = manifest.get("kind")
        resource_name = manifest.get("metadata").get("name")
        namespace = manifest.get("metadata").get("namespace")
        crd_api = dynamic_client.resources.get(api_version=api_version, kind=kind)
        try:
            # A failing get (resource absent) raises and routes us to create
            crd_api.get(namespace=namespace, name=resource_name)
            crd_api.patch(body=manifest, content_type="application/merge-patch+json")
            logger.info(f"{namespace}/{resource_name} patched")
        except Exception:
            crd_api.create(body=manifest, namespace=namespace)
            logger.info(f"{namespace}/{resource_name} created")

    def apply_simple_item_from_yaml(self, dynamic_client, filepath:str) -> None:
        '''
        DOCSTRING: Load a yaml file and call the method apply_simple_item with the file loaded
        INPUT: dynamic_client, filepath
        OUTPUT: None
        '''
        with open(filepath, 'r') as f:
            manifest = yaml.safe_load(f)
            self.apply_simple_item(dynamic_client=dynamic_client, manifest=manifest)

    def apply_yaml_to_kube(self) -> None:
        '''
        DOCSTRING: Main method to apply a yaml file to kube. Create a k8s client, and call the apply_simple_item_from_yaml
                   method to apply all of the found yaml config files
        INPUT: None
        OUTPUT: None
        '''
        k8s_client = dynamic.DynamicClient(client.ApiClient())
        sd_files = self.list_sd_files()
        logger.info(f"Applying all files from dir {self.kube_sd_path}")
        for sd_file in sd_files:
            logger.info(f"Applying the sd file {sd_file}")
            try:
                self.apply_simple_item_from_yaml(k8s_client, sd_file)
            except Exception as e:
                # Best effort: log the failing manifest and keep applying the rest
                logger.exception(e)
                logger.error(f"Unable to apply sd file {sd_file}")
                with open(sd_file, 'r') as f:
                    logger.error(yaml.safe_load(f))
                continue

    def delete_item(self, crd_api, namespace:str, name:str) -> None:
        '''
        DOCSTRING: Delete one named resource through the given dynamic api, logging failures
        INPUT: crd_api, namespace:str, name:str
        OUTPUT: None
        '''
        try:
            crd_api.delete(namespace=namespace, name=name)
            logger.info(f'Successfully deleted {name=}, {namespace=}')
        except Exception as e:
            logger.exception(e)
            logger.error(f'Unable to delete {name=}, {namespace=}')

    def purge_custom_resource(self, name:str, namespace:str) -> None:
        '''
        DOCSTRING: Purge a resource for a kubernetes cluster by invoking the delete method
        INPUT: name:str, namespace:str
        OUTPUT: None
        '''
        k8s_client = dynamic.DynamicClient(client.ApiClient())
        for item in self.api_endpoints_mapping():
            kind = item.get('kind')
            api_version = item.get('apiVersion')
            # Some kinds (ServiceMonitor) override the namespace in the mapping
            ns = item.get('namespace', namespace)
            logger.info(f'About to delete {kind=}, {api_version=} for {name}, {namespace=}')
            crd_api = k8s_client.resources.get(api_version=api_version, kind=kind)
            self.delete_item(crd_api, ns, name)
def handler(event:dict, context:dict):
    '''
    DOCSTRING: Main AWS lambda program — entry point for ASG lifecycle events
    INPUT: event:dict, context:dict
    OUTPUT: None
    '''
    # First we identify all the variables we need
    detail = event.get('detail')
    asg_name = detail.get('AutoScalingGroupName')
    event_type = event.get('detail-type')
    # The hook name mirrors the ASG name by convention
    lch_name = asg_name.replace('AutoScalingGroup', 'LifecycleHook')
    account_id = event.get('account')
    target_namespace = os.environ.get('namespace')
    registration = KubeRegistration(event)
    name, ports, rules = registration.get_lifecycle_hook_metadata(account_id, asg_name, lch_name)
    logger.info(f"Going to analyse {name=} {ports=}, {asg_name=}")
    # Then we identify if our ASG has some running instances; an empty ip list
    # later drives the purge of the configs on the k8s side
    instance_ids = registration.identify_instances_by_asg(account_id, asg_name)
    ip_list = registration.get_instances_ip(account_id, instance_ids) if instance_ids else []
    # Kubernetes config
    registration.create_kubeconfig()
    registration.load_kubeconfig()
    if not ip_list:
        # We assume that the ASG does not contain any instance, so we delete all
        # the existing objects (endpoints, service, servicemonitor, rules)
        logger.info(f'No ip found for ASG {asg_name}')
        registration.purge_custom_resource(name, target_namespace)
        return
    # The ip_list is not empty: (re)generate the objects. We assume that if the
    # instance is being terminated, it has already been configured.
    registration.generate_endpoints_config_file(name, target_namespace, ip_list, ports)
    if event_type == "EC2 Instance Launch Successful":
        registration.generate_service_config_file(name, target_namespace, ports)
        registration.generate_service_monitor_file(name, target_namespace)
    registration.apply_yaml_to_kube()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment