Created
April 19, 2025 03:31
-
-
Save jalakoo/9716e16d61160d46a8ab1378031bd92b to your computer and use it in GitHub Desktop.
Simple FastAPI server that takes a list of URLs for audio files, extracts entities using AssemblyAI, and uploads them to Neo4j.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import os

import assemblyai as aai
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from neo4j import GraphDatabase
from pydantic import BaseModel
# Read variables from a .env file into the process environment.
load_dotenv()

# Configure the AssemblyAI SDK with the API key taken from the environment.
aai.settings.api_key = os.environ.get("ASSEMBLYAI_API_KEY")

# Application object that the route below is registered on.
app = FastAPI()
class AudioFilesRequest(BaseModel):
    """Request body for the ``/upload_entities`` endpoint."""

    # Audio sources to transcribe; each item is passed as-is to the transcriber.
    audio_files: list[str]
def _transcribe_and_upload(transcriber, config, audio_file):
    """Transcribe one audio file and store its detected entities in Neo4j.

    This helper is synchronous: both the transcription call and the Neo4j
    upload block, so the async endpoint runs it in a worker thread.

    Args:
        transcriber: The ``aai.Transcriber`` used to run the transcription.
        config: The ``aai.TranscriptionConfig`` applied to the transcription.
        audio_file: The audio source, also used as the source name in Neo4j.

    Raises:
        RuntimeError: If the transcript comes back with an error status.
    """
    transcript = transcriber.transcribe(audio_file, config)

    # A failed transcription would otherwise surface later as an opaque
    # ``len(None)`` TypeError, so report the service's own error message.
    if transcript.status == aai.TranscriptStatus.error:
        raise RuntimeError(
            f"Transcription failed for {audio_file}: {transcript.error}"
        )

    # ``entities`` may be missing; hand the uploader an empty list instead.
    upload_entities_to_neo4j(audio_file, transcript.entities or [])


@app.post("/upload_entities")
async def upload_entities(request: AudioFilesRequest):
    """Transcribe each submitted audio file and upload its entities to Neo4j.

    Files are processed one after another, in request order. Processing stops
    at the first failure; files handled before it remain uploaded.

    Args:
        request: Body carrying the list of audio files to process.

    Returns:
        A dict with a ``message`` key once every file has been processed.

    Raises:
        HTTPException: With status 500 and the underlying error text as the
            detail if any step fails.
    """
    try:
        print(f"Processing {len(request.audio_files)} audio files")

        # The config and transcriber do not depend on the file; build them once.
        config = aai.TranscriptionConfig(entity_detection=True)
        transcriber = aai.Transcriber()

        for audio_file in request.audio_files:
            # Run the blocking work off the event loop so the server can keep
            # serving other requests in the meantime.
            await asyncio.to_thread(
                _transcribe_and_upload, transcriber, config, audio_file
            )
    except Exception as e:
        # Endpoint boundary: convert any failure into a 500 response and keep
        # the original exception chained for debugging.
        raise HTTPException(status_code=500, detail=str(e)) from e

    return {"message": "Files processed successfully"}
def upload_entities_to_neo4j(source, entities):
    """Write the entities detected in one audio source to Neo4j.

    The query builds this graph, merging nodes by their ``name`` property:

    * a ``_source`` node named after ``source``;
    * one node per entity, labelled with the entity type and ``_entity``,
      linked from the source with ``MENTIONS``;
    * ``_timestamp`` nodes for each entity's start and end values, linked
      from the entity with ``STARTS_AT`` / ``ENDS_AT`` and from the source
      with ``AT``.

    Connection settings come from the ``NEO4J_URI``, ``NEO4J_USER`` and
    ``NEO4J_PASSWORD`` environment variables.

    Args:
        source: Name stored on the ``_source`` node.
        entities: Iterable of objects exposing ``text``, ``entity_type``,
            ``start`` and ``end``. ``None`` is treated as an empty list, in
            which case only the ``_source`` node is merged.
    """
    # Guard against a missing entity list so ``len`` below cannot fail.
    if entities is None:
        entities = []

    print(f"Uploading {len(entities)} entities from {source}")

    # Load Neo4j credentials from environment variables.
    neo4j_uri = os.getenv("NEO4J_URI", "bolt://localhost:7687")
    neo4j_user = os.getenv("NEO4J_USER", "neo4j")
    neo4j_password = os.getenv("NEO4J_PASSWORD")

    # Flatten the entities into plain dicts that can be sent as a parameter.
    entity_data = [
        {
            "text": entity.text,
            # ``entity_type`` may be an enum member; send its plain value so
            # the label is an ordinary string.
            "labels": [
                getattr(entity.entity_type, "value", entity.entity_type),
                "_entity",
            ],
            "start": entity.start,
            "end": entity.end,
        }
        for entity in entities
    ]

    # NOTE(review): ``$(entity.labels)`` is dynamic-label syntax and needs a
    # Neo4j version that supports it -- confirm against the target server.
    #
    # The end timestamp is merged as a node first and only then linked.
    # Merging ``(n)-[:ENDS_AT]->(e:_timestamp {...})`` as one pattern creates
    # a new ``_timestamp`` node whenever the relationship does not exist yet,
    # which duplicates timestamps that are already in the graph.
    query = """
        MERGE (d:_source {name: $source})
        WITH d, $entities AS entities
        UNWIND entities AS entity
        MERGE (n:$(entity.labels) {name: entity.text})
        MERGE (d)-[:MENTIONS]->(n)
        MERGE (s:_timestamp {name: entity.start})
        MERGE (n)-[:STARTS_AT]->(s)
        MERGE (e:_timestamp {name: entity.end})
        MERGE (n)-[:ENDS_AT]->(e)
        MERGE (s)<-[:AT]-(d)
        MERGE (e)<-[:AT]-(d)
        """

    # The context manager closes the driver even if the query raises.
    with GraphDatabase.driver(
        neo4j_uri, auth=(neo4j_user, neo4j_password)
    ) as driver:
        driver.execute_query(
            query,
            source=source,
            entities=entity_data,
        )
if __name__ == "__main__":
    # Imported here because it is only needed when run as a script.
    import uvicorn

    # Serve the app on all interfaces, port 8000.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment