Created
July 15, 2025 08:17
-
-
Save davidwtbuxton/6d81561833c088af9f00f3781f8d52d6 to your computer and use it in GitHub Desktop.
Fetch the deployed source files for an App Engine standard-environment app.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Install: gcloud SDK, and configure application default credentials. | |
https://cloud.google.com/sdk/gcloud/reference/auth/application-default/login | |
Install: python -m pip install click google-api-python-client google-cloud-storage | |
Usage: | |
main.py versions --project=my-proj | |
main.py download --project=my-proj --version=my-version --service=default | |
N.B. The contents of deployed files cannot be downloaded after X days. But I | |
think the built containers are still available in the registry, not sure. | |
""" | |
import concurrent.futures | |
import itertools | |
import logging | |
import pathlib | |
import re | |
import click | |
from google.cloud import storage | |
from googleapiclient import discovery | |
from google.api_core import exceptions | |
def batched(iterable, n):
    """Yield successive tuples of at most *n* items from *iterable*.

    Example: batched('ABCDEFG', 3) -> ('A','B','C') ('D','E','F') ('G',)

    Raises:
        ValueError: if n is less than 1.
    """
    if n < 1:
        raise ValueError('n must be at least one')
    iterator = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(iterator, n))
        if not chunk:
            # Iterator exhausted; the final chunk may be shorter than n.
            return
        yield chunk
def parse_storage_url(url):
    """Split a public GCS URL into its (bucket, object-name) pair.

    Expects URLs of the form
    ``https://storage.googleapis.com/<bucket>/<object-name>``.
    """
    prefix = "https://storage.googleapis.com/"
    remainder = url.partition(prefix)[2]
    # First path segment is the bucket; the rest is the object name.
    bucket, _, name = remainder.partition("/")
    # Not sure if the name needs to be unquoted too.
    return bucket, name
def fetch_blob(client, url, dest):
    """Download a GCS object at *url* to the local path *dest*.

    Args:
        client: a google.cloud.storage.Client (handles all the auth stuff).
        url: public storage URL, https://storage.googleapis.com/<bucket>/<name>.
        dest: local filename (str or Path) to write the object to.

    A missing object (404) is logged as a warning rather than raised, so a
    partially-expired deployment can still be fetched best-effort.
    """
    bucket, name = parse_storage_url(url)
    blob = client.bucket(bucket).blob(name)

    def _download(dest):
        try:
            blob.download_to_filename(dest)
        except exceptions.NotFound as err:
            # Deployed file contents expire after a while; skip, don't crash.
            # Fixed: logging.warn is a deprecated alias of logging.warning.
            logging.warning("%s", err)

    try:
        _download(dest)
    except FileNotFoundError:
        # dest's parent directory doesn't exist yet: create it and retry.
        pathlib.Path(dest).parent.mkdir(parents=True, exist_ok=True)
        _download(dest)
def fetch_blob_batch(data):
    """Download a batch of GCS objects with a fresh storage client.

    Args:
        data: a (project, urls_dests) pair, where urls_dests is an iterable
            of (storage URL, local destination path) tuples.

    Designed as the worker function for parallel (process-pool) execution:
    each worker builds its own client, since clients aren't picklable.
    """
    project, urls_dests = data
    gcs_client = storage.Client(project=project)
    for url, dest in urls_dests:
        print(f"{url} -> {dest}")
        fetch_blob(gcs_client, url, dest)
def new_admin_client():
    """Build a discovery client for the App Engine Admin API (v1)."""
    service = discovery.build("appengine", "v1")
    return service
@click.group()
def main():
    """Root command group for the App Engine download CLI."""
@main.command()
@click.option("--project", required=True)
@click.option("--service", default="default")
def versions(project, service):
    """List deployed App Engine versions in a service.

    Prints one line per version: id, createTime, createdBy.
    """
    client = new_admin_client()
    resource = client.apps().services().versions()
    request = resource.list(appsId=project, servicesId=service)
    # Fixed: follow nextPageToken via list_next so more than one page of
    # versions is reported, and tolerate a response with no 'versions' key
    # (the API omits it when the service has none).
    while request is not None:
        response = request.execute()
        for v in response.get("versions", []):
            print(f"{v['id']} {v['createTime']} {v['createdBy']}")
        request = resource.list_next(request, response)
@main.command()
@click.option("--project", required=True)
@click.option("--service", default="default")
@click.option("--version", required=True)
@click.option('--out', default=".")
@click.option('--filter', 'filter_', default=r'.*')
def download(project, service, version, out, filter_):
    """Download the files for a deployed version.

    Files are written under --out, preserving the deployed paths. --filter
    is a regular expression matched (re.search) against each destination
    path; the default matches everything.
    """
    client = new_admin_client()
    out = pathlib.Path(out)
    kwargs = {
        "appsId": project,
        "servicesId": service,
        "versionsId": version,
        # FULL view includes the per-file sourceUrl download links.
        "view": "FULL",
    }
    request = client.apps().services().versions().get(**kwargs)
    response = request.execute()
    # Filter before batching so every batch is full-sized and no worker is
    # handed an empty batch (the original filtered after batching).
    pattern = re.compile(filter_)
    urls_dests = [
        (data["sourceUrl"], out / dest)
        for dest, data in response["deployment"]["files"].items()
        if pattern.search(str(out / dest))
    ]
    batch_size = 50
    process_data = [
        (project, batch) for batch in batched(urls_dests, batch_size)
    ]
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # Fixed: consume the map iterator so exceptions raised in worker
        # processes propagate instead of being silently discarded.
        for _ in executor.map(fetch_blob_batch, process_data):
            pass
# Script entry point: dispatch to the click command group.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment