Created
July 19, 2024 02:01
-
-
Save ajikamaludin/dbe0a4515e64da03394cec564a3912a9 to your computer and use it in GitHub Desktop.
lambda-dog-function
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
from PIL import Image | |
from io import BytesIO | |
import boto3 | |
from chalice import Chalice, Response | |
from chalice.app import ConvertToMiddleware | |
from aws_lambda_powertools import Logger | |
from aws_lambda_powertools import Tracer | |
# Chalice application object; its name doubles as the service name used by
# the logger and tracer below.
app = Chalice(app_name='random-image-function')

# Destination bucket and the S3 client used to upload compressed images.
BUCKET_NAME = 'dog-image-bucket'
s3 = boto3.client('s3')

logger = Logger(service=app.app_name)
tracer = Tracer(service=app.app_name)

# Wrap the Lambda-handler decorators as Chalice middleware. They are
# registered in this order: logger context first, then the tracer.
for _lambda_decorator in (
    logger.inject_lambda_context,
    tracer.capture_lambda_handler(capture_response=True),
):
    app.register_middleware(ConvertToMiddleware(_lambda_decorator))
@app.middleware('http')
def inject_route_info(event, get_response):
    """HTTP middleware that adds the request path to the structured logs.

    Args:
        event: The incoming request object; its ``path`` attribute is
            appended to the logger's keys as ``request_path``.
        get_response: Callable that runs the rest of the chain for ``event``.

    Returns:
        Whatever ``get_response(event)`` returns, unchanged.
    """
    path = event.path
    logger.structure_logs(append=True, request_path=path)
    response = get_response(event)
    return response
@tracer.capture_method
def fetch_random_image_url():
    """Ask the dog.ceo API for a random dog image and return its URL.

    Returns:
        The value of the ``message`` field of the JSON response.

    Raises:
        Exception: If the API answers with a non-200 status or the response
            has no (or an empty) ``message`` field.
        requests.RequestException: If the request fails or times out.
    """
    # Without a timeout, requests waits indefinitely on a stalled upstream
    # and the function would hang until the Lambda itself is killed.
    response = requests.get(
        'https://dog.ceo/api/breeds/image/random',
        timeout=10,
    )
    if response.status_code != 200:
        raise Exception('Failed to fetch image URL')

    image_url = response.json().get('message')
    if not image_url:
        raise Exception('No image URL found in response')
    return image_url
@tracer.capture_method(capture_response=False)
def fetch_image(image_url):
    """Download the image at ``image_url`` and open it with Pillow.

    Args:
        image_url: URL of the image to download.

    Returns:
        The ``PIL.Image`` object opened from the downloaded bytes.

    Raises:
        Exception: If the server answers with a non-200 status.
        requests.RequestException: If the request fails or times out.
    """
    # Bound the download time so a slow or stalled host cannot hang the
    # function indefinitely (requests has no default timeout).
    image_response = requests.get(image_url, timeout=10)
    if image_response.status_code != 200:
        raise Exception('Failed to fetch image')
    return Image.open(BytesIO(image_response.content))
@tracer.capture_method(capture_response=False)
def compress_image(img):
    """Re-encode ``img`` as a low-quality JPEG held in memory.

    Args:
        img: A Pillow image object.

    Returns:
        A ``BytesIO`` containing the JPEG data (quality 20), rewound to the
        start so it can be read or uploaded directly.
    """
    # JPEG cannot store alpha or palette data; saving e.g. an RGBA or P mode
    # image raises OSError. Convert such images to RGB first and leave
    # modes the JPEG encoder accepts untouched.
    # NOTE(review): the accepted-mode list mirrors Pillow's JPEG writer;
    # confirm against the installed Pillow version.
    if img.mode not in ('1', 'L', 'RGB', 'RGBX', 'CMYK', 'YCbCr'):
        img = img.convert('RGB')

    buffer = BytesIO()
    img.save(buffer, format="JPEG", quality=20)
    buffer.seek(0)
    return buffer
@tracer.capture_method
def upload_to_s3(buffer, key):
    """Upload a file-like object to the configured S3 bucket.

    Args:
        buffer: Readable binary file-like object holding the data to upload.
        key: Object key to store the data under in ``BUCKET_NAME``.
    """
    s3.upload_fileobj(Fileobj=buffer, Bucket=BUCKET_NAME, Key=key)
@app.route('/')
def index():
    """Fetch a random dog image, compress it, and store it in S3.

    Returns:
        On success, a dict with the source image URL under ``message`` and
        ``status`` set to ``'success'``. On any failure, a ``Response`` with
        status code 500 whose body carries the error text under ``error``.
    """
    try:
        # Fetch the random image URL
        image_url = fetch_random_image_url()
        # Fetch the actual image
        img = fetch_image(image_url)
        # Compress the image
        buffer = compress_image(img)
        # Save to S3 (fixed key: each call overwrites the previous image)
        key = 'compressed-image.jpg'
        upload_to_s3(buffer, key)
        return {
            'message': image_url,
            'status': 'success'
        }
    except Exception as e:
        # Record the full traceback before converting the failure into an
        # HTTP response; otherwise the cause is lost from the logs.
        logger.exception('Failed to process random dog image')
        return Response(body={'error': str(e)},
                        status_code=500)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
requests
Pillow
boto3
aws-xray-sdk
aws_lambda_powertools
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment