Last active
February 22, 2025 15:37
-
-
Save ericsmalling/f1fc9030c5832dc4c7b61e95ca41ca75 to your computer and use it in GitHub Desktop.
finds layers in OCI images that contain operations on paths matching a substring
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# For a given image URL, platform and search string, fid any layers that add, remove or change a file with a name that matches the search string | |
import sys | |
import os | |
import subprocess | |
import json | |
import re | |
import datetime | |
# default platform is linux/arm64 | |
platform = "linux/arm64" | |
def get_image_manifest_json(image_url, platform): | |
try: | |
manifest = subprocess.check_output(["crane", "manifest", "--platform", platform, image_url]) | |
manifest_json = json.loads(manifest) | |
except subprocess.CalledProcessError as e: | |
print(f"Error: {e}") | |
sys.exit(1) | |
except json.JSONDecodeError as e: | |
print(f"Error: {e}") | |
return manifest_json | |
def get_image_config_json(image_url, platform): | |
try: | |
config = subprocess.check_output(["crane", "config", "--platform", platform, image_url]) | |
config_json = json.loads(config) | |
except subprocess.CalledProcessError as e: | |
print(f"Error: {e}") | |
sys.exit(1) | |
except json.JSONDecodeError as e: | |
print(f"Error: {e}") | |
return config_json | |
def merge_manifest_into_history(manifest_json, config_json): | |
# for every history entry that doesn't contain "empty_layer": true, add the digest of the layer to the history entry | |
history = config_json["history"] | |
for history_entry in history: | |
if "empty_layer" not in history_entry: | |
# pop the 0th elelement off the manifest_json["layers"] list and add the digest to the history entry | |
layer = manifest_json["layers"].pop(0) | |
history_entry["digest"] = layer["digest"] | |
history_entry["size"] = layer["size"] | |
return history | |
def get_creation_since(created): | |
created = re.sub(r"\.\d+Z$", "Z", created) # Strip any decimal value off the seconds | |
created_time = datetime.datetime.strptime(created, "%Y-%m-%dT%H:%M:%SZ") | |
time_since_created = datetime.datetime.now() - created_time | |
days, seconds = time_since_created.days, time_since_created.seconds | |
if days > 0: | |
created = f"{days} days ago" | |
elif seconds >= 3600: | |
hours = seconds // 3600 | |
created = f"{hours} hours ago" | |
else: | |
minutes = (seconds % 3600) // 60 | |
created = f"{minutes} minutes ago" | |
return created | |
def formated_size(bytes): | |
for unit in ['B', 'KB', 'MB', 'GB', 'TB']: | |
if bytes < 1024: | |
break | |
bytes /= 1024 | |
bytes = f"{bytes:.1f}{unit}" | |
return bytes | |
def search_layer_for_file(image_url, history, search): | |
print("CREATED CREATED BY SIZE COMMENT FILES") | |
# using crane blob $IMG@$blob | tar -tz to list the files in the layer, search for any files containg the search string and add a list of files found to the history entry | |
for history_entry in reversed(history): | |
history_entry["files"] = [] | |
if "empty_layer" in history_entry: | |
continue | |
digest = history_entry["digest"] | |
# get the list of files in the layer | |
try: | |
files = subprocess.check_output(f"crane blob {image_url}@{digest} | tar -tz", shell=True).decode('utf-8').splitlines() | |
except subprocess.CalledProcessError as e: | |
print(f"Error: {e}") | |
sys.exit(1) | |
except json.JSONDecodeError as e: | |
print(f"Error: {e}") | |
# search for the search string in the list of files | |
for file in files: | |
if re.search(search, file): | |
history_entry["files"] = history_entry.get("files", []) | |
history_entry["files"].append(file) | |
created = get_creation_since(history_entry["created"]) | |
created_by = history_entry["created_by"] | |
size = formated_size(history_entry["size"]) | |
comment = history_entry.get("comment", "") | |
files = history_entry.get("files", []) | |
if files: | |
for file in files: | |
highlighted_file = re.sub(f"({search})", r"\033[91m\1\033[0m", file) | |
print(f"{created:<13.13} {created_by:<47.47} {str(size):<9.9} {comment:<15.15} {highlighted_file}") | |
created = '' # Clear created to only print it once | |
created_by = '' # Clear created_by to only print it once | |
size = '' # Clear size to only print it once | |
comment = '' # Clear comment to only print it once | |
else: | |
print(f"\033[90m{created:<13.13} {created_by:<47.47} {str(size):<9.9} {comment:<15.15}\033[0m") | |
if __name__ == "__main__": | |
if len(sys.argv) < 3: | |
print("Usage: layergrep.py <image_url> <search> [<platform>]") | |
sys.exit(1) | |
if len(sys.argv) == 4: | |
platform = sys.argv[3] | |
manifest = get_image_manifest_json(sys.argv[1], platform) | |
config = get_image_config_json(sys.argv[1], platform) | |
history = merge_manifest_into_history(manifest, config) | |
# search for a file in the layers | |
search_layer_for_file(sys.argv[1], history, sys.argv[2]) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment