Speech Streaming with Python & GRPC

A browser page captures microphone audio with the Web Audio API, converts it to raw 16-bit PCM, and streams it over a WebSocket to a Python server, which relays the audio to the Google Cloud Speech API over gRPC and pushes interim transcripts back to the page. (Forked from cobookman/index.html.)

index.html:
<!DOCTYPE html>
<html>
<head>
</head>
<body>
<h2>Transcript</h2>
<div id="transcript"></div>
<script>
var app = {
  socket: null,
  mediaTrack: null,
  counter: 0,
  bufferSize: 4096,

  main: function() {
    this.socket = new WebSocket("ws://35.188.93.150:80");
    this.socket.addEventListener("open", this.onSocketOpen.bind(this));
    this.socket.addEventListener("message", this.onSocketMessage.bind(this));
  },

  onSocketOpen: function(event) {
    this.initRecorder();
    console.log("onSocketOpen", event);
  },

  onSocketMessage: function(event) {
    console.log("Message", event.data);
    document.getElementById("transcript").innerHTML += "<p>" + event.data + "</p>";
  },

  shimAudioContext: function() {
    try {
      // Shims for older browsers
      window.AudioContext = window.AudioContext || window.webkitAudioContext;
      navigator.getUserMedia = navigator.getUserMedia ||
          navigator.webkitGetUserMedia ||
          navigator.mozGetUserMedia ||
          navigator.msGetUserMedia;
    } catch (e) {
      alert("Your browser is not supported");
      return false;
    }
    // the recorder below uses the promise-based API, so also check
    // that navigator.mediaDevices is actually available
    if (!navigator.mediaDevices || !navigator.mediaDevices.getUserMedia ||
        !window.AudioContext) {
      alert("Your browser is not supported");
      return false;
    }
    return true;
  },

  initRecorder: function() {
    // shim audio context
    if (!this.shimAudioContext()) {
      return;
    }
    return navigator.mediaDevices.getUserMedia({"audio": true, "video": false}).then((stream) => {
      var context = new window.AudioContext();
      // send metadata on audio stream to backend
      this.socket.send(JSON.stringify({
        rate: context.sampleRate,
        language: "en-US",
        format: "LINEAR16"
      }));
      // Capture mic audio data into a stream
      var audioInput = context.createMediaStreamSource(stream);
      // only record mono audio, with a buffer of 4096 samples per callback
      var recorder = context.createScriptProcessor(this.bufferSize, 1, 1);
      // specify the processing function
      recorder.onaudioprocess = this.audioProcess.bind(this);
      // connect stream to our recorder
      audioInput.connect(recorder);
      // connect recorder to previous destination
      recorder.connect(context.destination);
      // store media track
      this.mediaTrack = stream.getTracks()[0];
    });
  },

  float32To16BitPCM: function(float32Arr) {
    var pcm16bit = new Int16Array(float32Arr.length);
    for (var i = 0; i < float32Arr.length; ++i) {
      // force number into [-1, 1]
      var s = Math.max(-1, Math.min(1, float32Arr[i]));
      /**
       * Convert 32-bit float to 16-bit int PCM audio:
       * scale negatives by 0x8000 (|int16 min| = 32768)
       * and positives by 0x7FFF (int16 max = 32767).
       */
      pcm16bit[i] = s < 0 ? s * 0x8000 : s * 0x7FFF;
    }
    return pcm16bit;
  },

  audioProcess: function(event) {
    // only 1 channel, as specified above
    var float32Audio = event.inputBuffer.getChannelData(0) || new Float32Array(this.bufferSize);
    var pcm16Audio = this.float32To16BitPCM(float32Audio);
    this.socket.send(pcm16Audio.buffer);
  }
};
app.main();
</script>
</body>
</html>
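The audioProcess handler above ships each buffer as raw 16-bit PCM. As a reference point, the same clamp-and-scale conversion can be sketched in Python (numpy assumed; the function name is ours, not part of the gist) to sanity-check the bytes arriving on the server side:

import numpy as np

def float32_to_16bit_pcm(samples):
    """Mirror of the browser-side conversion: clamp to [-1, 1], then
    scale negatives by 0x8000 and positives by 0x7FFF."""
    clamped = np.clip(samples, -1.0, 1.0)
    scaled = np.where(clamped < 0, clamped * 0x8000, clamped * 0x7FFF)
    return scaled.astype(np.int16).tobytes()

The Python WebSocket server that receives this stream: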
#!/usr/bin/python3
import asyncio
import websockets
import json
import io
import threading
import queue
from google.cloud import speech
from google.cloud.gapic.speech.v1 import speech_client
from google.cloud.proto.speech.v1 import cloud_speech_pb2
class StreamingRequest(object):
    """A streaming request iterable for the Speech API."""

    def __init__(self, audio_stream, config):
        """Initializes the streaming request obj.

        params:
            audio_stream: An AudioStream obj
            config: The protobuf configuration for the api call
        """
        self.audio_stream = audio_stream
        self.config = config
        self.is_first = True

    def __iter__(self):
        return self

    def __next__(self):
        return self.next()

    def next(self):
        """Generate the next gRPC streaming api request."""
        if self.audio_stream.closed:
            # End the request stream once the audio stream is closed.
            raise StopIteration
        if self.is_first:
            # The first request carries only the streaming config.
            self.is_first = False
            return cloud_speech_pb2.StreamingRecognizeRequest(
                streaming_config=self.config)
        # block until some data is read or the stream is closed
        data = self.audio_stream.read()
        while not self.audio_stream.closed and len(data) == 0:
            data = self.audio_stream.read()
        return cloud_speech_pb2.StreamingRecognizeRequest(
            audio_content=data)
def results_to_dict(results):
    """Converts a StreamingRecognizeResponse to a JSON-serializable list."""
    if results is None:
        return []
    output = []
    for result in results.results:
        r = {
            "stability": result.stability,
            "is_final": bool(result.is_final),
            "alternatives": [],
        }
        for alternative in result.alternatives:
            r["alternatives"].append({
                "transcript": alternative.transcript,
                "confidence": alternative.confidence,
            })
        output.append(r)
    return output
class AudioStream(io.BytesIO):
    """A BytesIO whose read() returns only the latest unread written data."""

    def read(self, n=None):
        """Reads up to `n` bytes past the last read position."""
        if not hasattr(self, "_position"):
            self._position = 0
        # Writes move the underlying cursor, so seek back to where the
        # previous read left off before reading new data.
        self.seek(self._position)
        data = super(AudioStream, self).read(n)
        self._position += len(data)
        return data
class Transcoder(object):
    """Streaming-transcodes chunks of audio to text."""

    def __init__(self, encoding, rate, language):
        self.encoding = encoding
        self.rate = rate
        self.language = language
        self.audio = AudioStream()
        self.results = queue.Queue()

    def start(self):
        """Start up the streaming speech call."""
        threading.Thread(target=self._process).start()

    def write(self, data):
        """Buffer a chunk of audio for the speech api."""
        self.audio.write(data)

    def get_result(self):
        """Gets a result from the streaming api, or None if none is ready."""
        try:
            return self.results.get(False)
        except queue.Empty:
            return None

    def _process(self):
        """Sets up a streaming speech api request and streams results into a queue."""
        self.client = speech_client.SpeechClient()
        self.config = cloud_speech_pb2.StreamingRecognitionConfig(
            config=cloud_speech_pb2.RecognitionConfig(
                encoding=self.encoding,
                sample_rate_hertz=self.rate,
                language_code=self.language),
            interim_results=True)
        requests = StreamingRequest(self.audio, self.config)
        streaming_resp = self.client.streaming_recognize(iter(requests))
        # This blocks until self.audio is closed, which in turn ends the
        # streaming_recognize request.
        for resp in streaming_resp:
            self.results.put(resp)
@asyncio.coroutine
def audioin(websocket, path):
    # First message should be the config
    config = yield from websocket.recv()
    if not isinstance(config, str):
        print("ERROR, no config")
        yield from websocket.send(
            json.dumps({"error": "configuration not received as first message"}))
        return
    config = json.loads(config)
    transcoder = Transcoder(
        encoding=config["format"],
        rate=config["rate"],
        language=config["language"],
    )
    # Start the transcoding
    transcoder.start()
    # Process incoming audio packets
    while True:
        data = yield from websocket.recv()
        transcoder.write(data)
        # Check for transcription results
        result = transcoder.get_result()
        result_dict = results_to_dict(result)
        result_json = json.dumps(result_dict)
        print(result_dict)
        yield from websocket.send(result_json)


start_server = websockets.serve(audioin, "0.0.0.0", 80)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
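To exercise the server end to end without the browser page, a minimal test client can speak the same protocol: a JSON config as the first (text) message, then binary PCM chunks. A sketch, assuming the websockets package and a server on localhost; the 16 kHz rate and the silent payload are stand-ins:

#!/usr/bin/python3
import asyncio
import json
import websockets

async def test_client():
    async with websockets.connect("ws://localhost:80") as ws:
        # First message: the JSON config the server expects.
        yield_config = json.dumps({
            "rate": 16000,
            "language": "en-US",
            "format": "LINEAR16",
        })
        await ws.send(yield_config)
        # Then raw 16-bit PCM chunks; here, 100 ms of silence
        # (16000 samples/s * 0.1 s * 2 bytes per sample).
        await ws.send(b"\x00\x00" * 1600)
        # The server replies with a JSON list of interim results.
        print(await ws.recv())

asyncio.get_event_loop().run_until_complete(test_client())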