Last active
August 5, 2022 03:32
-
-
Save ganbaaelmer/530a5cb1cfad0eb7fe046afbb0a0cd4e to your computer and use it in GitHub Desktop.
app.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os
import time

import pyaudio
from flask import Flask, render_template, Response, jsonify, request

from camera import VideoCamera
# Flask application object; the route decorators in this file register on it.
app = Flask(__name__)

############ Audio recording section ############
# Capture parameters passed to PyAudio when the input stream is opened.
FORMAT = pyaudio.paInt16
CHANNELS = 2
RATE = 44100
CHUNK = 1024  # frames per buffer
RECORD_SECONDS = 5  # NOTE(review): not referenced in the visible code

# PyAudio instance shared by the audio route; created once at import time.
audio1 = pyaudio.PyAudio()
def genHeader(sampleRate, bitsPerSample, channels, datasize=2000 * 10**6):
    """Build the 44-byte RIFF/WAVE header for an uncompressed PCM stream.

    Args:
        sampleRate: Samples per second, per channel.
        bitsPerSample: Bits in one sample of one channel (e.g. 16).
        channels: Number of interleaved channels.
        datasize: Byte count advertised for the ``data`` chunk. The default
            (2,000,000,000) is a large placeholder for a stream whose final
            length is not known when the header is sent.

    Returns:
        bytes: The header, with every integer field encoded little-endian.

    Raises:
        OverflowError: If a value does not fit its fixed-width field.
    """
    byte_rate = sampleRate * channels * bitsPerSample // 8
    block_align = channels * bitsPerSample // 8  # bytes per sample frame

    fields = (
        b"RIFF",                                # (4byte) Marks file as RIFF
        (datasize + 36).to_bytes(4, "little"),  # (4byte) File size excluding this field and the RIFF marker
        b"WAVE",                                # (4byte) File type
        b"fmt ",                                # (4byte) Format chunk marker
        (16).to_bytes(4, "little"),             # (4byte) Length of the format data that follows
        (1).to_bytes(2, "little"),              # (2byte) Format type (1 - PCM)
        channels.to_bytes(2, "little"),         # (2byte) Channel count
        sampleRate.to_bytes(4, "little"),       # (4byte) Sample rate
        byte_rate.to_bytes(4, "little"),        # (4byte) Bytes per second
        block_align.to_bytes(2, "little"),      # (2byte) Bytes per sample frame
        bitsPerSample.to_bytes(2, "little"),    # (2byte) Bits per sample
        b"data",                                # (4byte) Data chunk marker
        datasize.to_bytes(4, "little"),         # (4byte) Data size in bytes
    )
    # Join once instead of growing a bytes object with repeated `+=`.
    return b"".join(fields)
@app.route('/audio')
def audio():
    """Stream live audio from the input device as one open-ended WAV body.

    Returns:
        Response: A streaming response whose body is a WAV header followed by
        raw PCM chunks read from the input stream for as long as the client
        keeps reading.
    """
    def sound():
        # Must match the sample width of FORMAT (pyaudio.paInt16 -> 16 bits).
        bits_per_sample = 16
        # Describe the stream with the same RATE/CHANNELS it is opened with,
        # so the header cannot drift away from the actual capture settings.
        wav_header = genHeader(RATE, bits_per_sample, CHANNELS)

        # NOTE(review): input_device_index=1 is hard-coded; confirm it is the
        # intended capture device on the deployment machine.
        stream = audio1.open(format=FORMAT, channels=CHANNELS,
                             rate=RATE, input=True, input_device_index=1,
                             frames_per_buffer=CHUNK)
        print("recording...")
        try:
            # The first chunk carries the WAV header in front of the samples.
            yield wav_header + stream.read(CHUNK)
            while True:
                yield stream.read(CHUNK)
        finally:
            # Runs when the generator is closed (e.g. the client disconnects),
            # releasing the audio device instead of leaking the open stream.
            stream.stop_stream()
            stream.close()

    # Without an explicit mimetype Flask labels the body as text/html.
    return Response(sound(), mimetype="audio/x-wav")
############ Video recording section ############
# Shared VideoCamera instance; stays None until a route first needs it.
video_camera = None
# Last non-None frame returned by the camera, replayed when a read yields None.
global_frame = None
@app.route('/')
def index():
    """Serve the landing page rendered from the ``index.html`` template."""
    return render_template('index.html')
@app.route('/record_status', methods=['POST'])
def record_status():
    """Start or stop video recording according to the posted JSON body.

    The body must be a JSON object with a ``status`` key. The string
    ``"true"`` starts recording; any other value stops it.

    Returns:
        A JSON response ``{"result": "started"}`` or ``{"result": "stopped"}``,
        or a JSON error with HTTP status 400 when ``status`` is missing.
    """
    global video_camera

    payload = request.get_json()
    # Reject malformed bodies up front instead of failing with a
    # TypeError/KeyError (HTTP 500) on the lookup below.
    if not isinstance(payload, dict) or 'status' not in payload:
        return jsonify(error="JSON body with a 'status' field is required"), 400

    # Create the shared camera on first use.
    if video_camera is None:
        video_camera = VideoCamera()

    if payload['status'] == "true":
        video_camera.start_record()
        return jsonify(result="started")

    video_camera.stop_record()
    return jsonify(result="stopped")
def video_stream():
    """Yield camera frames as parts of a ``multipart/x-mixed-replace`` body.

    Each yielded value is one ``--frame`` part with an ``image/jpeg`` content
    type. When the camera returns no frame, the last good frame is sent again.

    Yields:
        bytes: One multipart part wrapping a single frame.
    """
    global video_camera
    global global_frame

    # Create the shared camera on first use.
    if video_camera is None:
        video_camera = VideoCamera()

    while True:
        frame = video_camera.get_frame()
        if frame is None:
            frame = global_frame
            if frame is None:
                # Nothing captured yet: there is no previous frame to replay,
                # and concatenating None into the part would raise TypeError.
                # Pause briefly so a camera that is still warming up does not
                # turn this loop into a busy spin.
                time.sleep(0.01)
                continue
        else:
            global_frame = frame

        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n\r\n')
@app.route('/video_viewer')
def video_viewer():
    """Stream the camera feed as a ``multipart/x-mixed-replace`` response.

    Returns:
        Response: A streaming response fed by ``video_stream()``, using
        ``frame`` as the multipart boundary.
    """
    return Response(video_stream(),
                    mimetype='multipart/x-mixed-replace; boundary=frame')
if __name__ == '__main__':
    # The Werkzeug interactive debugger lets anyone who can reach the server
    # execute arbitrary Python, so it must not be on by default while the app
    # listens on every interface (0.0.0.0). Opt in with FLASK_DEBUG=1 for
    # local development only.
    debug_enabled = os.environ.get('FLASK_DEBUG', '') == '1'
    app.run(host='0.0.0.0', debug=debug_enabled, threaded=True, port=9000)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment