Skip to content

Instantly share code, notes, and snippets.

@gmariette
Created March 26, 2023 00:49
Show Gist options
  • Save gmariette/327c550f5e6802b2e7744ffe241943f7 to your computer and use it in GitHub Desktop.
Save gmariette/327c550f5e6802b2e7744ffe241943f7 to your computer and use it in GitHub Desktop.
def get_task_def_exposed_port(self, task_definition_arn: str, container_name: str):
    """Return the ``hostPort`` of a container's first port mapping, with caching.

    Results are memoised in ``self.task_definition_port_reference``, a dict
    keyed by task definition ARN and then by container name, so that
    ``describe_task_definition`` is only called on a cache miss.

    Args:
        task_definition_arn: Value passed as ``taskDefinition`` to
            ``self.ecs_client.describe_task_definition``.
        container_name: ``name`` of the container definition to look up.

    Returns:
        The ``hostPort`` value of the first entry in the container's
        ``portMappings``, or ``None`` when the container is not part of the
        task definition, declares no port mappings, or its first mapping has
        no ``hostPort``. A ``None`` result is indistinguishable from a cache
        miss, so the next call queries the API again.
    """
    # Here we build our cache - there is no need to call the API every time
    # as we don't push every minute.
    container_ports = self.task_definition_port_reference.get(task_definition_arn)
    if container_ports is None:
        container_ports = {}
        self.task_definition_port_reference[task_definition_arn] = container_ports

    if container_ports.get(container_name) is None:
        logger.info(f'{task_definition_arn=} not referenced for {container_name=}, using describe_task_definition api')
        task_definition = self.ecs_client.describe_task_definition(
            taskDefinition=task_definition_arn
        )['taskDefinition']
        # `or []` guards against a missing or null containerDefinitions entry.
        for container_definition in task_definition.get('containerDefinitions') or []:
            if container_definition.get('name') != container_name:
                continue
            # A container may legitimately expose no port at all; indexing
            # [0] unconditionally would raise KeyError/IndexError in that case.
            port_mappings = container_definition.get('portMappings') or []
            if port_mappings:
                container_ports[container_name] = port_mappings[0].get('hostPort')
            # Only the first container with this name is considered.
            break

    return container_ports.get(container_name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment