Skip to content

Instantly share code, notes, and snippets.

@odashi
Created June 17, 2025 13:45
Show Gist options
  • Save odashi/4305fbe791719b4469fa76849449ac7e to your computer and use it in GitHub Desktop.
Save odashi/4305fbe791719b4469fa76849449ac7e to your computer and use it in GitHub Desktop.
"""Example implementation of Cloud Storage proxy app."""
import logging
import os
import tempfile
from typing import Any, Protocol, TypeAlias
import fastapi
import fastapi.responses
import google.cloud.exceptions # type: ignore[import]
import google.cloud.logging # type: ignore[import]
import google.cloud.storage # type: ignore[import]
import uvicorn # type: ignore[import]
def _running_on_gcp() -> bool:
"""Checks if the runtime is on Google Cloud or not."""
return (
os.environ.get("K_SERVICE") is not None # Cloud Run
or os.environ.get("FUNCTION_TARGET") is not None # Cloud Functions
)
def setup_logging() -> None:
"""Set up common logging configs."""
if _running_on_gcp():
google.cloud.logging.Client().setup_logging()
logging.basicConfig(level=logging.INFO)
ASGIDict: TypeAlias = dict[str, Any]
class ASGIReceive(Protocol):
@staticmethod
async def __call__() -> ASGIDict:
...
class ASGISend(Protocol):
@staticmethod
async def __call__(event: ASGIDict) -> None:
...
class ASGIApplication(Protocol):
@staticmethod
async def __call__(scope: ASGIDict, receive: ASGIReceive, send: ASGISend) -> None:
...
ASGIApplicationLike: TypeAlias = ASGIApplication | fastapi.FastAPI
def create_app(*, add_probe: bool = True) -> fastapi.FastAPI:
"""Helper function to create a FastAPI server.
Args:
add_probe: Whether to add startup/liveness probe or not.
Returns:
A FastAPI object.
"""
app = fastapi.FastAPI(docs_url=None, openapi_url=None, redoc_url=None)
if add_probe:
# Inserts startup/liveness probe endpoint.
@app.get("/ready")
async def ready() -> fastapi.Response:
return fastapi.Response(b"", status_code=200, media_type="text/plain")
return app
def run_server(
app: ASGIApplicationLike,
port: int,
) -> None:
"""Runs Uvicorn server with the given application.
Args:
app: ASGI application to serve.
port: Port number to listen.
"""
setup_logging()
uvicorn.run(app, host="0.0.0.0", port=port, server_header=False)
_logger = logging.getLogger(__name__)
def _path_rewrite(filepath: str) -> str:
"""URL rewrite for the directory index.
Args:
filepath: Path to rewrite.
Returns:
Updated path.
"""
if not filepath or filepath[-1] == "/":
return filepath + "index.html"
return filepath
def _download_http404_content(
bucket: google.cloud.storage.Bucket,
path: str | None,
) -> tuple[str, str]:
"""Download 404 content to the local disk.
Args:
bucket: Cloud Storage bucket.
path: Path to the Cloud Storage object, or None if default page is used.
Returns:
Tuple of the following values:
- Path to the local file containing the 404 content.
- Content type of the 404 content.
"""
fd, filename = tempfile.mkstemp()
os.close(fd)
if path is not None:
blob = bucket.blob(path)
blob.download_to_filename(filename)
return filename, blob.content_type
else:
# Generates the default page.
with open(filename, "w") as fp:
fp.write("""{"detail":"Not Found"}""")
return filename, "application/json"
def create_app(bucket_name: str, http404_path: str | None = None) -> fastapi.FastAPI:
"""Creates a proxy server.
Args:
bucket_name: Cloud Storage bucket name.
http404_path: If not None, try to retrieve this content as the not found page.
Returns:
FastAPI object representing the server.
"""
app = create_app()
bucket = google.cloud.storage.Client().bucket(bucket_name)
http404_filename, http404_content_type = _download_http404_content(
bucket, http404_path
)
@app.get("/{filepath:path}")
def get_object(
filepath: str,
background: fastapi.BackgroundTasks,
) -> fastapi.responses.FileResponse:
"""Accepts every paths and locates the corresponding file.
Args:
path: Query path.
background: Background tasks.
Returns:
FileResponse with corresponding data.
Raises:
fastapi.HTTPException: File not found.
Exception: Something else went wrong.
"""
filepath = _path_rewrite(filepath)
fd, temp_filename = tempfile.mkstemp()
os.close(fd)
def remove_file() -> None:
os.remove(temp_filename)
try:
blob = bucket.blob(filepath)
blob.download_to_filename(temp_filename)
background.add_task(remove_file)
return fastapi.responses.FileResponse(
path=temp_filename,
media_type=blob.content_type,
)
except google.cloud.exceptions.NotFound:
remove_file()
_logger.warning(f"status=404 {filepath=}")
return fastapi.responses.FileResponse(
path=http404_filename,
status_code=404,
media_type=http404_content_type,
)
except Exception as e:
remove_file()
_logger.error(f"status=500 {filepath=} {e}")
raise fastapi.HTTPException(status_code=500)
return app
if __name__ == "__main__":
bucket = os.environ["BUCKET"]
http404_path = os.environ.get("HTTP404_PATH")
port = int(os.environ["PORT"])
run_server(create_app(bucket, http404_path), port)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment