|
#! /usr/bin/env python |
|
import json
import shlex
import subprocess
import sys
|
|
|
|
|
def aws_run(aws_cmd):
    """
    Run an AWS CLI command and return its standard output as text.

    ``aws_cmd`` is everything that follows ``aws`` on the command line
    (for example ``"lambda list-layers"``); ``--output json`` is appended.
    The full command line is echoed to stdout before it is executed.

    The command is tokenised with ``shlex.split`` and executed without a
    shell, so shell metacharacters in ``aws_cmd`` are never interpreted.

    Raises:
        RuntimeError: if the process cannot be started, or if it exits with
            a non-zero status (the CLI's stderr is included in the message).
    """
    command = f"aws {aws_cmd} --output json"
    print(f"running: {command}")
    try:
        completed = subprocess.run(shlex.split(command), capture_output=True)
    except Exception as exc:
        raise RuntimeError(f"Cannot exec {aws_cmd}") from exc
    if completed.returncode != 0:
        # Surface CLI failures here instead of handing back empty output
        # that only fails later as an opaque JSON decoding error.
        details = completed.stderr.decode(errors="replace").strip()
        raise RuntimeError(
            f"Cannot exec {aws_cmd}: "
            f"exit status {completed.returncode}: {details}"
        )
    return completed.stdout.decode()
|
|
|
|
|
def get_aws_out(aws_cmd):
    """Run ``aws_cmd`` through ``aws_run`` and return its output parsed as JSON."""
    raw_output = aws_run(aws_cmd)
    return json.loads(raw_output)
|
|
|
|
|
def get_layers():
    """Return the ``Layers`` entry of the ``lambda list-layers`` response."""
    response = get_aws_out("lambda list-layers")
    return response["Layers"]
|
|
|
|
|
def get_layer_versions(layer):
    """
    Return the ``LayerVersions`` entry of the ``lambda list-layer-versions``
    response for the layer named ``layer``.
    """
    aws_cmd = f"lambda list-layer-versions --layer-name {layer}"
    response = get_aws_out(aws_cmd)
    return response["LayerVersions"]
|
|
|
|
|
def get_functions():
    """Return the ``Functions`` entry of the ``lambda list-functions`` response."""
    response = get_aws_out("lambda list-functions")
    return response["Functions"]
|
|
|
|
|
def get_functions_layers(functions):
    """
    Yield every entry of each function's ``Layers`` list, in order.

    Functions that have no ``Layers`` key contribute nothing.
    """
    for function_config in functions:
        yield from function_config.get("Layers", [])
|
|
|
|
|
def get_layers_versions(layers):
    """
    Yield every version returned by ``get_layer_versions`` for each layer.

    Each element of ``layers`` must provide a ``LayerName`` key; versions
    are looked up lazily, one layer at a time, as the generator is consumed.
    """
    for layer_info in layers:
        yield from get_layer_versions(layer_info["LayerName"])
|
|
|
|
|
def get_previous_versions(layers, n=10):
    """
    Yield each layer-version ARN in ``layers`` plus its neighbouring versions.

    For every ARN (whose last ``:``-separated component is the integer
    version number) the ARN itself is yielded first, followed by the ARNs
    whose version differs from it by 1 .. ``n - 1`` in both directions.
    The caller treats all of these as "possibly in use" so that versions
    close to one that is in use are kept.

    Only the trailing version component is rewritten. Replacing the version
    digits anywhere in the string would also corrupt the region or account
    id whenever they happen to contain the same digits.

    Raises:
        ValueError: if the last component of an ARN is not an integer.
    """
    for arn in layers:
        version_text = arn.split(":")[-1]
        version = int(version_text)
        # Everything up to and including the final ':' (empty if none).
        prefix = arn[: len(arn) - len(version_text)]
        yield arn
        for offset in range(1, n):
            yield f"{prefix}{version - offset}"
            yield f"{prefix}{version + offset}"
|
|
|
|
|
if __name__ == "__main__":
    # Deletion only happens when this exact confirmation token is passed as
    # the first command-line argument; every other invocation is a dry run
    # that just prints what would be inspected.
    really_delete = (
        len(sys.argv) > 1 and sys.argv[1] == "FLÜGGÅƎNKƋ€ČHIŒßØLĮÊN"
    )

    layers_in_use = get_functions_layers(get_functions())
    layers_versions = get_layers_versions(get_layers())

    # Every existing layer version, and every version referenced by a function.
    version_arns = {version["LayerVersionArn"] for version in layers_versions}
    in_use_arns = {layer["Arn"] for layer in layers_in_use}
    # In-use versions widened by their neighbouring version numbers.
    possible_in_use_arns = set(get_previous_versions(in_use_arns))

    # Candidates for deletion, highest version number first.
    not_in_use_arns = sorted(
        version_arns - possible_in_use_arns,
        key=lambda arn: int(arn.split(":")[-1]),
        reverse=True,
    )

    len_not_in_use = len(not_in_use_arns)
    len_versions = len(version_arns)
    len_possible_in_use = len(possible_in_use_arns)
    print("not in use = versions - possible in use")
    print(f"{len_not_in_use} = {len_versions} - {len_possible_in_use}")

    for arn in not_in_use_arns:
        arn_parts = arn.split(":")
        name = arn_parts[-2]
        version = arn_parts[-1]
        if really_delete:
            # Explicit check rather than `assert`, which is stripped when
            # Python runs with -O and would silently disable this guard.
            if arn in possible_in_use_arns:
                raise RuntimeError(f"{arn} in use")
            cmd = " ".join([
                "lambda delete-layer-version",
                f"--layer-name {name}",
                f"--version-number {version}",
            ])
            print(aws_run(cmd))
        else:
            cmd = " ".join([
                "lambda get-layer-version",
                f"--layer-name {name}",
                f"--version-number {version}",
            ])
            print("NOT running", cmd)