Skip to content

Instantly share code, notes, and snippets.

@cyrenity
Created October 26, 2024 12:16
Show Gist options
  • Save cyrenity/3bcb84628db5fe5e501c06c1ec05305d to your computer and use it in GitHub Desktop.
Save cyrenity/3bcb84628db5fe5e501c06c1ec05305d to your computer and use it in GitHub Desktop.
Reference implementation of a backend WebSocket server for mod_audio_fork.
import asyncio
import soundfile as sf
import numpy as np
import json
import websockets
import os
import logging
import concurrent.futures
from websockets.exceptions import ConnectionClosedOK
from dotenv import load_dotenv
from deepgram import DeepgramClient, DeepgramClientOptions, LiveOptions, LiveTranscriptionEvents
# Load environment variables from .env file
load_dotenv()
# Constants
API_KEY = os.getenv("DG_API_KEY") # Deepgram API key from environment variable; None if unset — DeepgramClient will fail later in that case
# Initialize a ThreadPool for processing audio chunks
# NOTE(review): `pool` is never used anywhere in this file — presumably left over
# from an earlier version that off-loaded decoding; confirm before removing.
pool = concurrent.futures.ThreadPoolExecutor(max_workers=(os.cpu_count() or 1))
# Function to process incoming audio chunks
def process_chunk(message):
    """Reinterpret a raw audio frame as 16-bit signed PCM samples.

    The bytes are viewed in place (no copy) as little-endian int16
    values, matching the ``linear16`` encoding used elsewhere.

    Args:
        message: Raw audio bytes received over the WebSocket.

    Returns:
        A read-only NumPy array of ``int16`` samples.
    """
    return np.frombuffer(message, dtype=np.int16)
async def handler(websocket, path):
    """Handle one WebSocket connection from mod_audio_fork.

    Bridges raw 16-bit PCM audio from the client to a Deepgram live
    transcription session and relays transcript JSON back to the client.

    Args:
        websocket: The accepted client WebSocket connection.
        path: Request path (required by older ``websockets`` handler
            signatures; unused here).
    """
    print(f'Connection from {websocket.remote_address}')  # Log the peer address

    # Create a Deepgram client with the provided API key
    config = DeepgramClientOptions(verbose=logging.DEBUG)
    deepgram = DeepgramClient(API_KEY, config)

    # Create a websocket connection to Deepgram for live transcription
    dg_connection = deepgram.listen.asyncwebsocket.v("1")

    async def on_message(result, **kwargs):
        """Forward a transcription result (interim or final) to the client."""
        transcript = result.channel.alternatives[0].transcript
        response = {
            "type": "transcription",
            "data": transcript,
            "is_partial": not result.is_final,
        }
        # Only relay non-empty transcripts; Deepgram emits empty interim results
        if len(transcript) > 0:
            if result.is_final:
                print(transcript)  # Log the final transcript
                # A "playAudio" message with base64 raw audio could be sent
                # here to have mod_audio_fork play a response to the caller.
            else:
                print(f"\033[1m{transcript}\033[0m", end='\r')  # Log interim results
            # Send transcription response to the client
            await websocket.send(json.dumps(response))

    async def on_metadata(metadata, **kwargs):
        """Handle received metadata messages."""
        print("Metadata received: ", metadata)

    async def on_error(error, **kwargs):
        """Handle errors."""
        print("Error: ", error)

    async def on_open(open, **kwargs):
        """Handle new connections."""
        print(f"Connection opened: {open}")

    async def on_speech_started(speech_started, **kwargs):
        """Handle speech detection."""
        print(f"Speech started: {speech_started}")

    # Register event handlers for Deepgram connection
    dg_connection.on(LiveTranscriptionEvents.Transcript, on_message)
    dg_connection.on(LiveTranscriptionEvents.Metadata, on_metadata)
    dg_connection.on(LiveTranscriptionEvents.Error, on_error)
    dg_connection.on(LiveTranscriptionEvents.Open, on_open)
    # Uncomment if needed
    # dg_connection.on(LiveTranscriptionEvents.SpeechStarted, on_speech_started)

    # Configure options for live transcription
    options = LiveOptions(
        model="nova-2",          # Specify the model to use
        language="en-US",        # Set language for transcription
        sample_rate=16000,       # Audio sample rate
        smart_format=True,       # Enable smart formatting
        encoding="linear16",     # Raw 16-bit signed little-endian PCM
        punctuate=True,          # Enable punctuation
        channels=1,              # Number of audio channels
        interim_results=True,    # Allow interim results
        utterance_end_ms=1000,   # Wait for 1 second of silence for utterance end
        vad_events=True,         # Enable voice activity detection events
        endpointing=300          # Silence timeout before finalizing speech
    )

    # Start the connection to Deepgram
    addons = {
        "no_delay": "true"  # Prevent waiting for additional numbers
    }
    print("\n\nStart talking! Press Ctrl+C to stop...\n")
    if await dg_connection.start(options, addons=addons) is False:
        print("Failed to connect to Deepgram")
        return

    try:
        while True:
            # Receive one audio frame from the client and forward it as-is.
            # BUG FIX: the original accumulated every chunk into a float64
            # NumPy array and re-sent the ENTIRE buffer each iteration —
            # duplicating audio, growing memory without bound, and corrupting
            # the stream (float64 bytes sent as declared "linear16" int16).
            # Each chunk is already linear16 bytes, so send it directly.
            message = await websocket.recv()
            await dg_connection.send(message)
    except ConnectionClosedOK:
        print("Connection closed.")  # Log graceful client disconnect
    finally:
        await dg_connection.finish()  # Ensure Deepgram connection is finished
async def main():
    """Launch the WebSocket server and block until it is closed."""
    host, port = "localhost", 8000
    ws_server = await websockets.serve(handler, host, port)
    print(f"WebSocket server running on ws://{host}:{port}")
    # Block forever (until the server is shut down externally)
    await ws_server.wait_closed()


# Run the server using asyncio event loop
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment