Created
December 9, 2024 16:48
-
-
Save almet/de10e2b258df5a666c94fbb91be7e315 to your computer and use it in GitHub Desktop.
Get an attestation from a container registry with python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env -S uv run | |
# /// script | |
# requires-python = ">=3.12" | |
# dependencies = [ | |
# "requests>=2.31.0", | |
# ] | |
# /// | |
import requests | |
import json | |
import hashlib | |
SIGSTORE_BUNDLE = "application/vnd.dev.sigstore.bundle.v0.3+json" | |
DOCKER_MANIFEST_DISTRIBUTION = "application/vnd.docker.distribution.manifest.v2+json" | |
DOCKER_MANIFEST_INDEX = "application/vnd.oci.image.index.v1+json" | |
OCI_IMAGE_MANIFEST = "application/vnd.oci.image.manifest.v1+json" | |
DEFAULT_REGISTRY = "ghcr.io" | |
DEFAULT_ORG = "almet" | |
DEFAULT_IMAGE = "dangerzone/dangerzone" | |
class RegistryClient: | |
def __init__(self, registry=DEFAULT_REGISTRY, org=DEFAULT_ORG, image=DEFAULT_IMAGE): | |
"""Initialize the GHCR client for public images""" | |
self._registry = registry | |
self._org = org | |
self._image = image | |
self._auth_token = None | |
self._base_url = f"https://{registry}" | |
self._image_url = f"{self._base_url}/v2/{self._org}/{self._image}" | |
def get_auth_token(self): | |
if not self._auth_token: | |
auth_url = f"{self._base_url}/token" | |
response = requests.get( | |
auth_url, | |
params={ | |
"service": f"{self._registry}", | |
"scope": f"repository:{self._org}/{self._image}:pull", | |
}, | |
) | |
response.raise_for_status() | |
self._auth_token = response.json()["token"] | |
return self._auth_token | |
def get_manifest(self, tag, extra_headers=None): | |
"""Get manifest information for a specific tag""" | |
manifest_url = f"{self._image_url}/manifests/{tag}" | |
headers = { | |
"Accept": "application/vnd.docker.distribution.manifest.v2+json", | |
"Authorization": f"Bearer {self.get_auth_token()}", | |
} | |
if extra_headers: | |
headers.update(extra_headers) | |
response = requests.get(manifest_url, headers=headers) | |
response.raise_for_status() | |
return response | |
def list_manifest(self, tag): | |
return self.get_manifest( | |
tag, {"Accept": "application/vnd.oci.image.index.v1+json"} | |
) | |
def get_blob(self, hash): | |
url = f"{self._image_url}/blobs/{hash}" | |
response = requests.get( | |
url, | |
headers={ | |
"Authorization": f"Bearer {self.get_auth_token()}", | |
}, | |
) | |
response.raise_for_status() | |
return response | |
def get_attestation(self, tag): | |
"""Get an attestation for a given manifest.""" | |
print(f"Get the manifest from {tag}") | |
manifest = self.get_manifest(tag) | |
# The attestation is available on the same container registry, with a | |
# specific tag. The format of the tag is "sha256-{sha256(manifest)}" | |
manifest_hash = hashlib.sha256(manifest.content).hexdigest() | |
# This will get us a "list" of manifests... | |
print(f"Get the list at {manifest_hash}") | |
try: | |
manifest_list = self.list_manifest(f"sha256-{manifest_hash}") | |
except requests.exceptions.RequestException: | |
raise Exception( | |
"Unable to get the attestation from the registry. Does it exists?" | |
) | |
# ... from which we want the sigstore bundle | |
blob_manifest_digest = None | |
print(f"{manifest_list.json()}") | |
for manifest in manifest_list.json()["manifests"]: | |
if manifest.get("artifactType", "") == SIGSTORE_BUNDLE: | |
blob_manifest_digest = manifest.get("digest") | |
if not blob_manifest_digest: | |
raise Exception("Not Found !") | |
# From there, we will get the attestation in a blob. | |
# It will be the first layer listed at this manifest hash location | |
print(f"Blob manifest digest {blob_manifest_digest}") | |
extra_headers = {"Accept": OCI_IMAGE_MANIFEST} | |
layers = ( | |
self.get_manifest(blob_manifest_digest, extra_headers).json().get("layers") | |
) | |
blob_hash = None | |
for layer in layers: | |
if layer.get("mediaType") == SIGSTORE_BUNDLE: | |
blob_hash = layer.get("digest") | |
if blob_hash: | |
resp = self.get_blob(blob_hash) | |
print(resp) | |
def save_file(data: dict, filename: str): | |
with open(filename, "w") as f: | |
f.write(data) | |
def main(): | |
try: | |
client = RegistryClient("ghcr.io", "almet", "dangerzone/dangerzone") | |
client.get_attestation("latest") | |
except requests.exceptions.RequestException as e: | |
print(f"Error accessing GHCR: {e}") | |
except json.JSONDecodeError as e: | |
print(f"Error parsing response: {e}") | |
except Exception as e: | |
print(f"Unexpected error: {e}") | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment