Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save weisisheng/1063091936d01f0e218e787f6d23087c to your computer and use it in GitHub Desktop.
Save weisisheng/1063091936d01f0e218e787f6d23087c to your computer and use it in GitHub Desktop.
Streamlining AWS Lambda with DuckDB for Dynamic Data Handling
import json
import os
import duckdb
import boto3
import datetime
from typing import Any, Dict
def construct_prepared_sql_and_params(sql_template, bind_params):
    """Convert a ``{name}``-style SQL template into positional ``?`` SQL.

    Each ``{key}`` occurrence whose key is present in *bind_params* is
    replaced with ``?`` (scalar value) or with a comma-separated run of
    ``?`` markers, one per element (list value). The returned parameter list
    is built in the order the placeholders appear in the template, so the
    n-th ``?`` always lines up with the n-th parameter, even when:

    * the template references keys in a different order than the dict,
    * the same key is referenced more than once, or
    * *bind_params* contains keys the template never references.

    ``{...}`` sequences whose name is not a key of *bind_params* are left
    untouched.

    Args:
        sql_template: SQL text containing ``{key}`` placeholders.
        bind_params: Mapping of placeholder name to a scalar or a list of
            values. May be empty or ``None``.

    Returns:
        A ``(sql, params)`` tuple: the SQL with positional ``?`` markers and
        the flat list of values to bind to them.
    """
    import re  # local import so the block does not rely on file-level imports

    if not bind_params:
        return sql_template, []

    # Keys are matched by their string form, exactly as f"{{{key}}}" would
    # render them inside the template.
    values_by_name = {str(key): value for key, value in bind_params.items()}
    placeholder = re.compile(
        r"\{(" + "|".join(re.escape(name) for name in values_by_name) + r")\}"
    )

    flat_params = []

    def _bind(match):
        # Called left-to-right by re.sub, which is what keeps flat_params in
        # the same order as the emitted "?" markers.
        value = values_by_name[match.group(1)]
        if isinstance(value, list):
            flat_params.extend(value)
            return ', '.join(['?'] * len(value))
        flat_params.append(value)
        return '?'

    return placeholder.sub(_bind, sql_template), flat_params
def serialize_dates(row):
    """Replace date-like values of *row* with ``DD-MM-YYYY`` strings, in place.

    Any value that is a ``datetime.date`` instance (which includes
    ``datetime.datetime``, a subclass) is formatted with ``'%d-%m-%Y'``;
    every other value is left as it is.

    Args:
        row: Dict mapping column names to values. It is mutated.

    Returns:
        The same ``row`` object that was passed in.
    """
    formatted = {
        column: cell.strftime('%d-%m-%Y')
        for column, cell in row.items()
        if isinstance(cell, datetime.date)
    }
    # Only existing keys are overwritten, so key order is unchanged.
    row.update(formatted)
    return row
def lambda_handler(event: Dict[str, Any], context):
    """Download a file from S3 and run a parameterised DuckDB query over it.

    Event keys read:
        file_name: S3 object key; the object is downloaded to
            ``/tmp/<file_name>``.
        sql: SQL text. It is first formatted with ``sql % local_path`` (so it
            is expected to contain one ``%s`` where the local file path
            belongs) and may contain ``{name}`` placeholders that are bound
            from ``bind_params``.
        bind_params: Optional mapping of placeholder name to a scalar or a
            list of values. Missing or ``None`` means no parameters.
        return_type: ``'Single'`` to return only the first row (``{}`` when
            there are none); any other value returns every row.
            Defaults to ``'List'``.

    The bucket name is read from the ``BUCKET_NAME`` environment variable.

    Args:
        event: Invocation payload as described above.
        context: Lambda context object (unused).

    Returns:
        A dict for ``return_type == 'Single'``, otherwise a list of dicts
        keyed by column name, with date values rendered by
        ``serialize_dates``.

    Raises:
        ValueError: If ``file_name``, ``sql`` or ``BUCKET_NAME`` is missing
            or empty, or if ``file_name`` resolves outside ``/tmp``.
    """
    print("Duckdb Lambda received event: " + json.dumps(event, indent=4))

    file_name = event.get('file_name')
    sql = event.get('sql')
    # `or {}` also covers an explicit `"bind_params": null` in the event.
    bind_params = event.get('bind_params') or {}
    return_type = event.get('return_type', 'List')
    bucket_name = os.getenv('BUCKET_NAME')

    # Fail fast with a clear message instead of an opaque downstream error.
    if not file_name:
        raise ValueError("event must include a non-empty 'file_name'")
    if not sql:
        raise ValueError("event must include a non-empty 'sql'")
    if not bucket_name:
        raise ValueError("BUCKET_NAME environment variable is not set")

    local_file_name = f"/tmp/{file_name}"
    # Reject keys such as "../etc/x" that would escape the scratch directory.
    if not os.path.normpath(local_file_name).startswith("/tmp/"):
        raise ValueError(f"file_name resolves outside /tmp: {file_name!r}")

    os.makedirs(os.path.dirname(local_file_name), exist_ok=True)
    s3 = boto3.client('s3')

    try:
        s3.download_file(bucket_name, file_name, local_file_name)

        # Format before opening the connection so a bad template cannot
        # leave a connection open.
        prepared_sql = sql % local_file_name

        conn = duckdb.connect(database=':memory:', read_only=False)
        try:
            prepared_sql, flat_params = construct_prepared_sql_and_params(prepared_sql, bind_params)
            print(prepared_sql)
            print(f"flat_params -> {flat_params}")

            cursor = conn.execute(prepared_sql, parameters=flat_params)
            columns = [description[0] for description in cursor.description]
            fetched_results = [
                serialize_dates(dict(zip(columns, row)))
                for row in cursor.fetchall()
            ]

            if return_type == 'Single':
                results = fetched_results[0] if fetched_results else {}
            else:
                results = fetched_results
        finally:
            conn.close()
    finally:
        # /tmp is limited and survives warm invocations; the file is
        # re-downloaded on every call, so drop it once the query is done.
        try:
            os.remove(local_file_name)
        except FileNotFoundError:
            pass  # the download never produced the file
        except OSError as exc:
            print(f"Could not remove temporary file {local_file_name}: {exc}")

    print("Duckdb Lambda processing completed")
    return results
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment