aws-case-study-generator-main
"""
Case Study Generator - Automated tool for generating case studies from audio transcripts
using AWS services and AI.
"""
from __future__ import annotations
import json
import mimetypes
import os
import re
import time
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
import boto3
import requests
import structlog
from botocore.config import Config
from botocore.exceptions import ClientError
from dotenv import load_dotenv
from gtts import gTTS
from pydantic import Field, validator
from pydantic_settings import BaseSettings
from reportlab.lib.pagesizes import letter
from reportlab.lib.styles import ParagraphStyle, getSampleStyleSheet
from reportlab.platypus import Paragraph, SimpleDocTemplate, Spacer
from tenacity import retry, stop_after_attempt, wait_exponential
# Load environment variables from a local .env file
load_dotenv()

class Settings(BaseSettings):
    """Application settings with environment variable support."""
    aws_region: str = Field(default="us-east-1", env="AWS_REGION")
    bedrock_model: str = Field(default="anthropic.claude-v2:1", env="BEDROCK_MODEL")
    max_retries: int = Field(default=5, env="MAX_RETRIES")
    base_delay: int = Field(default=5, env="BASE_DELAY")
    s3_bucket: str = Field(..., env="S3_BUCKET")
    dynamodb_table: str = Field(..., env="DYNAMODB_TABLE")
    request_timeout: int = Field(default=30, env="REQUEST_TIMEOUT")
    max_file_size_mb: int = Field(default=100, env="MAX_FILE_SIZE_MB")

    @validator("max_retries")
    def validate_max_retries(cls, v):
        if v < 1:
            raise ValueError("max_retries must be at least 1")
        return v

    @validator("base_delay")
    def validate_base_delay(cls, v):
        if v < 1:
            raise ValueError("base_delay must be at least 1")
        return v

    class Config:
        env_file = ".env"
        case_sensitive = False

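# A minimal example .env for these settings (values are illustrative;
# S3_BUCKET and DYNAMODB_TABLE are the only fields without defaults):
#
#   AWS_REGION=us-east-1
#   S3_BUCKET=my-case-study-bucket
#   DYNAMODB_TABLE=case-studies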
settings = Settings()

# Initialize structured logging
logger = structlog.get_logger()

# Configure AWS clients with timeouts
aws_config = Config(
    region_name=settings.aws_region,
    retries={"max_attempts": settings.max_retries, "mode": "adaptive"},
    connect_timeout=settings.request_timeout,
    read_timeout=settings.request_timeout
)

# Initialize AWS clients
sts = boto3.client("sts", config=aws_config)
s3 = boto3.client("s3", config=aws_config)
transcribe = boto3.client("transcribe", config=aws_config)
bedrock_runtime = boto3.client(
    service_name='bedrock-runtime',
    config=aws_config
)

@dataclass
class S3Location:
    """Data class for S3 bucket and object information."""
    bucket_name: str
    object_key: str
    uri: str

    @classmethod
    def from_path(cls, bucket: str, file_path: str) -> S3Location:
        """Create S3Location from bucket and file path."""
        object_key = Path(file_path).name
        return cls(
            bucket_name=bucket,
            object_key=object_key,
            uri=f"s3://{bucket}/{object_key}"
        )
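# For example, S3Location.from_path("my-bucket", "/tmp/interview.mp3")
# (names illustrative) yields object_key "interview.mp3" and
# uri "s3://my-bucket/interview.mp3".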

def validate_audio_file(file_path: str) -> None:
    """Validate audio file type and size."""
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Audio file not found: {file_path}")
    mime_type, _ = mimetypes.guess_type(file_path)
    if not mime_type or not mime_type.startswith('audio/'):
        raise ValueError(f"Invalid file type: {mime_type}. Expected audio file.")
    file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
    if file_size_mb > settings.max_file_size_mb:
        raise ValueError(f"File size ({file_size_mb:.2f}MB) exceeds limit of {settings.max_file_size_mb}MB")

@retry(
    stop=stop_after_attempt(settings.max_retries),
    wait=wait_exponential(multiplier=settings.base_delay)
)
def get_or_create_s3_bucket() -> str:
    """Ensure an S3 bucket exists with the access policy Transcribe needs."""
    try:
        account_id = sts.get_caller_identity()["Account"]
        bucket_name = f"casestudy-{account_id}"
        try:
            s3.head_bucket(Bucket=bucket_name)
            logger.info("bucket.exists", bucket=bucket_name)
        except ClientError as e:
            error_code = e.response.get("Error", {}).get("Code", "")
            if error_code in ("404", "NoSuchBucket"):
                logger.info("bucket.creating", bucket=bucket_name)
                # us-east-1 rejects an explicit LocationConstraint;
                # every other region requires one.
                if settings.aws_region == "us-east-1":
                    s3.create_bucket(Bucket=bucket_name)
                else:
                    s3.create_bucket(
                        Bucket=bucket_name,
                        CreateBucketConfiguration={"LocationConstraint": settings.aws_region}
                    )
                bucket_policy = {
                    "Version": "2012-10-17",
                    "Statement": [{
                        "Sid": "AllowTranscribeAccess",
                        "Effect": "Allow",
                        "Principal": {"Service": "transcribe.amazonaws.com"},
                        "Action": ["s3:GetObject", "s3:ListBucket"],
                        "Resource": [
                            f"arn:aws:s3:::{bucket_name}",
                            f"arn:aws:s3:::{bucket_name}/*"
                        ]
                    }]
                }
                s3.put_bucket_policy(
                    Bucket=bucket_name,
                    Policy=json.dumps(bucket_policy)
                )
                logger.info("bucket.policy_updated", bucket=bucket_name)
            else:
                raise
        return bucket_name
    except Exception as e:
        logger.error("bucket.error", error=str(e))
        raise
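# The bucket name is derived from the caller's account, so a hypothetical
# account 123456789012 resolves to the bucket "casestudy-123456789012".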

def upload_audio_to_s3(file_path: str, bucket_name: str) -> S3Location:
    """Validate the audio file and upload it to S3."""
    try:
        validate_audio_file(file_path)
        location = S3Location.from_path(bucket_name, file_path)
        s3.upload_file(
            file_path,
            location.bucket_name,
            location.object_key
        )
        logger.info("audio.uploaded", location=location.uri)
        return location
    except Exception as e:
        logger.error("upload.failed", error=str(e))
        raise

def monitor_transcription_job(job_name: str) -> str:
    """
    Poll a transcription job until it completes and return the transcript.

    Args:
        job_name: Name of the transcription job

    Returns:
        str: The transcript text
    """
    max_retries = settings.max_retries
    base_delay = settings.base_delay
    for attempt in range(max_retries):
        try:
            response = transcribe.get_transcription_job(TranscriptionJobName=job_name)
            status = response['TranscriptionJob']['TranscriptionJobStatus']
            logger.info("transcription.status", job=job_name, status=status)
            if status == 'COMPLETED':
                transcript_uri = response['TranscriptionJob']['Transcript']['TranscriptFileUri']
                result = requests.get(transcript_uri, timeout=settings.request_timeout)
                if result.status_code != 200:
                    raise Exception(f"Failed to get transcript: {result.status_code}")
                try:
                    # Transcribe output is JSON shaped like:
                    # {"results": {"transcripts": [{"transcript": "..."}], ...}}
                    transcript_json = result.json()
                    logger.info("transcription.completed", job=job_name)
                    return transcript_json['results']['transcripts'][0]['transcript']
                except json.JSONDecodeError:
                    # Fall back to pulling the transcript field out of the raw text
                    match = re.search(r'"transcript"\s*:\s*"([^"]+)"', result.text)
                    if match:
                        return match.group(1)
                    raise Exception("Could not find transcript in response")
            elif status == 'FAILED':
                failure_reason = response['TranscriptionJob'].get('FailureReason', 'Unknown error')
                raise Exception(f"Transcription failed: {failure_reason}")
            # Job is still in progress; back off exponentially before polling again
            time.sleep(base_delay * (2 ** attempt))
        except Exception as e:
            if attempt == max_retries - 1:
                raise Exception(f"Failed to get transcription after {max_retries} attempts: {e}")
            time.sleep(base_delay * (2 ** attempt))
    raise Exception(f"Transcription timed out after {max_retries} attempts")

def start_transcription_job(audio_location: S3Location) -> str:
    """Start an Amazon Transcribe job and return its name."""
    try:
        # Generate a short job name using the first 8 hex chars of a UUID
        job_name = f"j{uuid.uuid4().hex[:8]}"
        # Delete any existing job with the same name (a no-op for fresh UUIDs)
        try:
            transcribe.delete_transcription_job(
                TranscriptionJobName=job_name
            )
        except ClientError as e:
            if "The requested job couldn't be found" not in str(e):
                raise
            logger.warning("transcription.delete_failed", error=str(e), job=job_name)
        # Start transcription job
        logger.info("transcription.started", job=job_name)
        transcribe.start_transcription_job(
            TranscriptionJobName=job_name,
            Media={'MediaFileUri': audio_location.uri},
            MediaFormat='mp3',
            LanguageCode='en-US'
        )
        return job_name
    except Exception as e:
        logger.error("transcription.error", error=str(e))
        raise

def generate_case_study(transcript: str) -> str:
    """Generate a case study using Claude 2.1 via Amazon Bedrock."""
    try:
        if not transcript.strip():
            raise ValueError("Empty transcript provided")
        prompt = f"""Human: You are a professional case study writer. Generate a detailed case study based on the following transcript. The case study should include:
1. Executive Summary
2. Background and Context
3. Key Challenges
4. Solution Implementation
5. Results and Impact
6. Lessons Learned
7. Recommendations
Here's the transcript:
{transcript}
Please format the case study in a clear, professional manner with proper sections and paragraphs.
Assistant: """
        request_body = {
            "prompt": prompt,
            "max_tokens_to_sample": 2048,
            "temperature": 0.7,
            "top_p": 0.9,
            "anthropic_version": "bedrock-2023-05-31"
        }
        response = bedrock_runtime.invoke_model(
            modelId=settings.bedrock_model,
            body=json.dumps(request_body),
            contentType="application/json",
            accept="application/json"
        )
        # The Claude v2 text-completions API returns the generated text
        # under the "completion" key
        response_body = json.loads(response.get("body").read())
        case_study_text = response_body.get("completion", "").strip()
        if not case_study_text:
            raise ValueError("No text generated by the model")
        logger.info(
            "case_study.generated",
            length=len(case_study_text),
            preview=case_study_text[:100]
        )
        return case_study_text
    except Exception as e:
        logger.error("case_study.error", error=str(e))
        raise

def save_as_pdf(text: str, output_file: str = "case_study.pdf") -> None:
    """Save the case study as a professionally formatted PDF."""
    try:
        logger.info("pdf.generating", text_length=len(text))
        doc = SimpleDocTemplate(
            output_file,
            pagesize=letter,
            rightMargin=72,
            leftMargin=72,
            topMargin=72,
            bottomMargin=72
        )
        styles = getSampleStyleSheet()
        title_style = ParagraphStyle(
            'CustomTitle',
            parent=styles['Heading1'],
            fontSize=24,
            spaceAfter=30
        )
        heading_style = ParagraphStyle(
            'CustomHeading',
            parent=styles['Heading2'],
            fontSize=14,
            spaceBefore=20,
            spaceAfter=10
        )
        body_style = ParagraphStyle(
            'CustomBody',
            parent=styles['Normal'],
            fontSize=11,
            leading=14
        )
        story = []
        # Add title
        story.append(Paragraph("Case Study", title_style))
        story.append(Spacer(1, 12))
        # Process text by sections
        sections = text.split('\n\n')
        for section in sections:
            if section.strip():
                if any(section.startswith(heading) for heading in
                       ['Executive Summary', 'Background', 'Key Challenges',
                        'Solution', 'Results', 'Lessons', 'Recommendations']):
                    story.append(Paragraph(section.split('\n')[0], heading_style))
                    content = '\n'.join(section.split('\n')[1:])
                    if content.strip():
                        story.append(Paragraph(content, body_style))
                else:
                    story.append(Paragraph(section, body_style))
                story.append(Spacer(1, 12))
        doc.build(story)
        logger.info("pdf.completed", output_file=output_file)
    except Exception as e:
        logger.error("pdf.error", error=str(e))
        raise

def generate_test_audio() -> str:
    """Generate a test audio file using gTTS."""
    transcript = """Zoom Call Transcript:
Interviewer (Director at CloudMed Solutions):
Welcome, and thank you for joining us today. We're excited to dive into our work with MedCore Innovations, a Fortune 100 leader in healthcare and medical device manufacturing. As you know, CloudMed Solutions is proud to help organizations like MedCore transition to AWS with the right mix of security, scalability, and innovation.
Respondent (Engagement Manager at CloudMed Solutions):
MedCore faced several significant challenges with their legacy infrastructure. Scalability was a huge issue—they couldn't handle the growing demand and fluctuating workloads. Their existing systems were costly to maintain, tying up resources that could've been reinvested in innovation. They also needed to maintain strict compliance with HIPAA.
We implemented several AWS services including Amazon EC2 & Auto Scaling for compute power, Amazon S3 & AWS Glue for data storage and ETL, Amazon RDS for HIPAA-compliant databases, AWS Lambda for serverless computing, and Amazon GuardDuty & AWS WAF for security.
The results were impressive:
- 40% reduction in infrastructure costs
- 99.99% uptime achievement
- Deployment times reduced from days to hours
- 30% increase in customer satisfaction
- Enhanced security and HIPAA compliance
- Successful integration of predictive analytics using Amazon SageMaker
This transformation has set the stage for MedCore's future growth. They now have the infrastructure and agility needed to expand globally, launch new products quickly, and respond to evolving healthcare needs."""
    try:
        output_file = "test_audio.mp3"
        gTTS(text=transcript, lang='en', slow=False).save(output_file)
        logger.info("test_audio.generated", file=output_file)
        return output_file
    except Exception as e:
        logger.error("test_audio.error", error=str(e))
        raise

def main(audio_file_path: Optional[str] = None) -> None:
    """Main workflow: process an audio file and generate a case study PDF."""
    try:
        if not audio_file_path:
            audio_file_path = generate_test_audio()
        logger.info("workflow.starting", audio_file=audio_file_path)
        bucket_name = get_or_create_s3_bucket()
        audio_location = upload_audio_to_s3(audio_file_path, bucket_name)
        job_name = start_transcription_job(audio_location)
        transcript = monitor_transcription_job(job_name)
        case_study = generate_case_study(transcript)
        save_as_pdf(case_study)
        logger.info("workflow.completed", output_file="case_study.pdf")
    except Exception as e:
        logger.error("workflow.failed", error=str(e))
        raise

if __name__ == "__main__":
    import sys
    audio_path = sys.argv[1] if len(sys.argv) > 1 else None
    main(audio_path)
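# Example invocations, assuming the script is saved as case_study_generator.py
# (script and file names illustrative):
#   python case_study_generator.py interview.mp3   # process a real recording
#   python case_study_generator.py                 # synthesize and use test_audio.mp3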