
def get_task_def_exposed_port(self, task_definition_arn: str, container_name: str):
    # Build a local cache - there is no need to call the API every time, as we don't push every minute
    if self.task_definition_port_reference.get(task_definition_arn) is None:
        self.task_definition_port_reference[task_definition_arn] = {}
    if self.task_definition_port_reference[task_definition_arn].get(container_name) is None:
        logger.info(f'{task_definition_arn=} not referenced for {container_name=}, using describe_task_definition api')
        response = self.ecs_client.describe_task_definition(
            taskDefinition=task_definition_arn
        )['taskDefinition']
        for container_definition in response.get('containerDefinitions'):
            if container_definition.get('name') == container_name:
                # Assumed completion (the loop body is missing from the capture):
                # cache the first declared container port for this container
                port_mappings = container_definition.get('portMappings', [])
                if port_mappings:
                    self.task_definition_port_reference[task_definition_arn][container_name] = port_mappings[0].get('containerPort')
    return self.task_definition_port_reference[task_definition_arn].get(container_name)

def __init__(self, event_bus_name: str) -> None:
    self.event_bus_name = event_bus_name
    self.region = os.environ.get('REGION')
    self.stackname = os.environ.get('STACKNAME')
    self.project, self.env_type, self.env_num = self.stackname.split('-')
    self.env_name = f"{self.env_type}-{self.env_num}".upper()
    self.cluster_name = f'{self.stackname}-ECS-CLUSTER'
    # Assumed completion: the ECS client and the cache used by get_task_def_exposed_port
    self.ecs_client = boto3.client('ecs', region_name=self.region)
    self.task_definition_port_reference = {}
import boto3
import yaml
import json
import logging
import os
import sys
from glob import glob
from kubernetes import client, config, dynamic
from eks_token import get_token
from typing import List
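
# Minimal sketch (not from the original gist) of how these imports are typically
# wired together: fetch a bearer token for an EKS cluster with eks_token, then
# build a kubernetes API client against the cluster endpoint. The cluster name,
# endpoint, and CA file parameters are assumed placeholders.
def build_k8s_client(cluster_name: str, endpoint: str, ca_file: str) -> client.ApiClient:
    token = get_token(cluster_name=cluster_name)['status']['token']
    configuration = client.Configuration()
    configuration.host = endpoint
    configuration.ssl_ca_cert = ca_file
    configuration.api_key['authorization'] = token
    configuration.api_key_prefix['authorization'] = 'Bearer'
    return client.ApiClient(configuration)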
IdentifyDockerPorts(){
    FuncName  # presumably a helper that logs the current function name
    export PORT_LIST=""
    OLDIFS=${IFS}
    IFS=$'\n'
    for CONTAINER in $(docker ps -q); do
        if [[ ${PORT_LIST} == "" ]]; then
            PORT_LIST=$(docker inspect ${CONTAINER} | jq -r '.[].HostConfig.PortBindings| .[]| .[]|.HostPort')
        else
            PORT_LIST=$(echo ${PORT_LIST},$(docker inspect ${CONTAINER} | jq -r '.[].HostConfig.PortBindings| .[]| .[]|.HostPort'))
        fi
    done
    IFS=${OLDIFS}
}
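
# Example usage (hypothetical, not part of the original gist): collect the host
# ports of every running container, then reuse PORT_LIST later in the script.
IdentifyDockerPorts
echo "Exposed host ports: ${PORT_LIST}"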
import logging
import argparse
import os
import time
from glob import glob
from prometheus_client import start_http_server, Gauge
# Logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
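
# Minimal sketch of how the imports above are usually combined (the metric name,
# port, and scrape interval are illustrative assumptions, not taken from the
# original script): expose a Gauge over HTTP and refresh it in a loop.
example_gauge = Gauge('example_file_count', 'Number of files matching a glob pattern')

def serve_metrics(pattern: str, port: int = 8000, interval: int = 60):
    start_http_server(port)  # expose /metrics on the given port
    while True:
        example_gauge.set(len(glob(pattern)))
        time.sleep(interval)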
def get_main_disk(fs):
    # Resolve the block device that backs the given filesystem
    disk_to_extend = subprocess.check_output('lsblk -l | grep -B1 ' + fs + ' | head -1 | awk \'{print $1}\'', shell=True).strip().decode('ascii')
    return disk_to_extend

def extend_disk(fs):
    disk_to_extend = get_main_disk(fs)
    try:
        print(f'Extending disk {disk_to_extend}')
        disk_extend = subprocess.check_output(f'sudo growpart /dev/{disk_to_extend} 1', shell=True).strip().decode('ascii')
        print(disk_extend)
    except subprocess.CalledProcessError as e:
        # Assumed completion: the original except clause is missing from the capture
        print(f'growpart failed for /dev/{disk_to_extend}: {e}')

def get_instance_id():
    # IMDSv1 endpoint; .text returns a str, which boto3 and string formatting expect
    return requests.get('http://169.254.169.254/latest/meta-data/instance-id').text

def get_region():
    return requests.get('http://169.254.169.254/latest/meta-data/placement/region').text

def init_ec2_client():
    return boto3.client('ec2', region_name=get_region())

def identify_ebs_volume(client):
    # Body missing from the capture; presumably resolves the EBS volume attached to this instance
    ...

def check_usage(fs_to_check):
    print(f"Checking usage of partition {fs_to_check}")
    # Strip the '%' sign from the Use% column of df
    partition_usage = float(subprocess.check_output('df -h ' + fs_to_check + ' | tail -1 | awk \'{sub("%","");print $5}\'', shell=True).decode('ascii'))
    return partition_usage

def get_current_partition_size(fs_to_check):
    print(f"Getting current partition size {fs_to_check}")
    # Strip the 'G' suffix from the Size column of df (assumes the size is reported in gigabytes)
    partition_size = float(subprocess.check_output('df -h ' + fs_to_check + ' | tail -1 | awk \'{gsub("G","");print $2}\'', shell=True).decode('ascii'))
    return partition_size
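
# Hypothetical main loop (not part of the original capture) tying the helpers
# above together: watch a filesystem and grow its partition once usage crosses a
# threshold. The filesystem, 80% threshold, and interval are illustrative assumptions.
def main(fs_to_watch='/', threshold=80.0, interval=300):
    while True:
        if check_usage(fs_to_watch) >= threshold:
            extend_disk(fs_to_watch)
        time.sleep(interval)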
import subprocess
from math import ceil
import requests
import sys
import time
import boto3
#############################################
#                                           #
#             Author: GMariette             #
import logging
import os
import boto3
import json
def lambda_handler(event, context):
"""
Send a message to an Amazon SQS queue.
"""