Skip to content

Instantly share code, notes, and snippets.

@Chen-Han
Created December 29, 2024 21:33
Show Gist options
  • Save Chen-Han/2318e464b2c9a509d0c26567973e9470 to your computer and use it in GitHub Desktop.
Save Chen-Han/2318e464b2c9a509d0c26567973e9470 to your computer and use it in GitHub Desktop.
partial program for playing video programmatically to virtual output
"""
Play a video's frames and its audio track to their respective virtual outputs:
frames are sent to a virtual camera, audio to a sound output device.
"""
import time
import typing
import cv2
import collections
import moviepy.editor as mp
import queue
import sys
import threading
import pyvirtualcam
import log
import sounddevice as sd
import numpy as np
from rich import progress
from args import args
from scene import Scene
# Module-level logger, created as a child of the project's base logger.
logger = log.base_logger.getChild("video_player")
# Module-level event. It is not referenced anywhere else in the visible part of
# this file -- NOTE(review): presumably used by importers; confirm before removing.
event = threading.Event()
# NOTE(review): commented-out sketch of an alternative scene container whose
# get() would return the current scene; kept for reference only.
# class StreamSceneQueue:
#     def __init__(self):
#         self.current_scene = None
#         pass
#     def get(self):
#         return self.current_scene
# Seconds to wait on a queue before re-checking whether the stream was stopped.
TIMEOUT_SCENE = 2


class VideoStream:
    """Decode scenes with OpenCV and play their frames on a virtual camera.

    Two daemon threads cooperate through a small frame buffer:

    * the consumer thread (``_consume_scenes``) pulls scenes from
      ``scene_queue``, decodes them frame by frame and fills the buffer;
    * the player thread (``_play_from_buffer``) sends one frame per camera
      tick, repeating the previous frame whenever the buffer is empty.
    """

    def __init__(self, scene_queue):
        """Create an idle stream; call ``start()`` to launch the threads.

        Args:
            scene_queue: Queue-like object whose ``get(timeout=...)`` returns
                the next scene (an object exposing ``filename`` and ``name``)
                and raises ``queue.Empty`` when nothing arrives in time.
        """
        self.scene_queue = scene_queue
        self.stop_event = threading.Event()
        self.video_near_end_callback: typing.Optional[typing.Callable] = None
        self.width = 1620
        self.height = 1080
        self.fps = 30
        # Tiny buffer so the decoder never runs far ahead of the camera.
        self._frame_buffer = queue.Queue(maxsize=2)
        # Each item is one pending "abort the current scene" request.
        self._abort_queue = queue.Queue()

    def _wait_for_first_frame(self):
        """Block until a frame is buffered; return None if stopped first.

        The wait is sliced into ``TIMEOUT_SCENE`` second polls so that
        ``stop()`` is honoured even when no scene is ever queued.
        """
        while not self.stop_event.is_set():
            try:
                return self._frame_buffer.get(timeout=TIMEOUT_SCENE)
            except queue.Empty:
                continue
        return None

    def _abort_requested(self):
        """Consume one pending abort request; return whether there was one."""
        try:
            self._abort_queue.get_nowait()
        except queue.Empty:
            return False
        return True

    def _enqueue_frame(self, frame):
        """Put ``frame`` into the buffer; return False if stopped meanwhile.

        The put is retried in 1 second slices: once the player thread has
        exited nothing drains the buffer any more, and a single timed put
        would end this thread with an unhandled ``queue.Full``.
        """
        while not self.stop_event.is_set():
            try:
                # normally blocks for at most the duration of one frame
                self._frame_buffer.put(frame, timeout=1)
                return True
            except queue.Full:
                continue
        return False

    def _play_from_buffer(self):
        """Player thread body: send buffered frames to the virtual camera."""
        with pyvirtualcam.Camera(self.width,
                                 self.height, self.fps,
                                 fmt=pyvirtualcam.PixelFormat.BGR,
                                 print_fps=args.print_fps) as cam:
            last_frame = self._wait_for_first_frame()
            if last_frame is None:
                return  # stopped before any frame was decoded
            cam.send(last_frame)  # send first frame manually
            while not self.stop_event.is_set():
                try:
                    last_frame = self._frame_buffer.get_nowait()
                except queue.Empty:
                    pass  # buffer underrun: repeat the previous frame
                cam.sleep_until_next_frame()
                cam.send(last_frame)

    def start(self):
        """Launch the player and consumer daemon threads."""
        self._play_thread = threading.Thread(
            target=self._play_from_buffer, daemon=True)
        self._play_thread.start()
        self._consume_thread = threading.Thread(
            target=self._consume_scenes, daemon=True)
        self._consume_thread.start()

    def stop(self):
        """Ask both threads to finish; does not wait for them to exit."""
        self.stop_event.set()

    def abort_current(self):
        """Request that decoding of the current scene stops.

        Frames that are already in the buffer are still played.
        """
        self._abort_queue.put(True)

    def on_video_near_end(self, callback):
        """Register a zero-argument callable invoked by the consumer thread
        once every frame of a scene has been pushed to the buffer."""
        self.video_near_end_callback = callback

    def _consume_scenes(self):
        """Consumer thread body: decode queued scenes into the frame buffer.

        Raises:
            RuntimeError: if a scene's file cannot be opened by OpenCV.
        """
        with progress.Progress(
            progress.SpinnerColumn(),
            *progress.Progress.get_default_columns(),
            progress.TimeElapsedColumn(),
            transient=True,
        ) as progress_bar:
            current_scene_task = None
            while not self.stop_event.is_set():
                logger.debug("vs getting scene")
                try:
                    scn: Scene = self.scene_queue.get(timeout=TIMEOUT_SCENE)
                except queue.Empty:
                    # No scene yet: keep polling instead of letting the
                    # timeout exception end the thread.
                    continue
                logger.debug("vs done getting scene")
                logger.debug("scene %s", scn)
                video = cv2.VideoCapture(scn.filename)
                try:
                    # Validate the capture before querying or seeking it.
                    if not video.isOpened():
                        raise RuntimeError("error opening video")
                    length = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
                    if current_scene_task is not None:
                        progress_bar.remove_task(current_scene_task)
                    current_scene_task = progress_bar.add_task(
                        f"[green]{scn.name}", total=length)
                    video.set(cv2.CAP_PROP_POS_FRAMES, 0)
                    logger.debug("opened video %s length %d", video, length)
                    frame_not_found = False
                    # -1 keeps the check below well defined when the
                    # reported frame count is zero and the loop never runs.
                    index = -1
                    for index in range(length):
                        if self.stop_event.is_set():
                            return
                        # we might not play the last frame but thats ok
                        if self._abort_requested():
                            break
                        _, frame = video.read()
                        if frame is None:
                            frame_not_found = True
                            logger.debug("frame index(0-based) %d length %d, cannot find frame", index, length)
                            break
                        if not self._enqueue_frame(frame):
                            return
                        progress_bar.update(current_scene_task, advance=1)
                    # consider frame not found as finished
                    if frame_not_found or index >= length - 1:
                        # finished putting all frames to buffer
                        if self.video_near_end_callback:
                            self.video_near_end_callback()
                finally:
                    # Always free the capture, also on stop or on errors.
                    video.release()
NCHANNELS = 1  # number of output channels requested from the audio stream
NFRAMES = 1500  # audio frames per block (used as the stream's blocksize)


def _get_audio_callback_handler(q_audio, stop_event: threading.Event):
    """Build the output-stream callback that plays blocks taken from ``q_audio``.

    Args:
        q_audio: Queue of audio blocks; each block is an array with at most
            ``NFRAMES`` rows.
        stop_event: Once set, the next callback invocation aborts the stream.

    Returns:
        A callback with the ``(outdata, frames, time, status)`` signature.
        It fills ``outdata`` with the next queued block, zero-padding a short
        block and writing pure silence when the queue is empty. It raises
        ``sd.CallbackAbort`` when ``stop_event`` is set or when ``status``
        reports an output underflow.
    """

    def _audio_callback(outdata, frames, time, status):
        if stop_event.is_set():
            raise sd.CallbackAbort
        assert frames == NFRAMES
        if status.output_underflow:
            # Logger methods do not accept print()'s ``file`` keyword; passing
            # it raised TypeError before the abort below could be reached.
            logger.error('Output underflow: increase blocksize?')
            raise sd.CallbackAbort
        assert not status
        try:
            data = q_audio.get_nowait()
        except queue.Empty:
            # Nothing queued: play silence rather than stale buffer content.
            outdata[:] = 0
            return
        if len(data) < len(outdata):
            # Short (typically final) block: pad the remainder with silence.
            outdata[:len(data)] = data
            outdata[len(data):] = 0
        else:
            outdata[:] = data

    return _audio_callback
class AudioStream:
    """Play the audio track of each queued scene through a sounddevice stream.

    A daemon thread (``_play``) reads audio in ``NFRAMES``-sized blocks and
    hands them to the stream callback through a small queue.
    """

    def __init__(self, scene_queue: queue.Queue):
        """Create the output stream; call ``start()`` to begin playback.

        Args:
            scene_queue: Queue of scenes (objects exposing ``filename`` and
                ``name``) whose audio should be played.
        """
        self.stop_event = threading.Event()
        # Hand-off buffer between the reader thread and the audio callback.
        # One block holds NFRAMES samples, i.e. NFRAMES / samplerate seconds
        # of audio (1500 / 44100 ~= 34 ms at 44.1 kHz, roughly one video frame
        # at 30 fps). The queue is kept this small on purpose: testing showed
        # that a larger buffer (14 blocks) makes the audio lag behind the video.
        self._q_audio = queue.Queue(maxsize=3)
        self.scene_queue = scene_queue
        self._audio_callback = _get_audio_callback_handler(
            self._q_audio, self.stop_event)
        self._stream = sd.OutputStream(
            samplerate=args.samplerate, blocksize=NFRAMES,
            device=args.audio_out, channels=NCHANNELS, dtype='float32',
            callback=self._audio_callback)
        self.near_end_callback = None
        # Each item is one pending "abort the current audio" request.
        self._abort_queue = queue.Queue()

    def on_audio_near_end(self, callback):
        """Register a zero-argument callable invoked by the reader thread
        when a scene's audio data has been read to its end."""
        self.near_end_callback = callback

    def _abort_requested(self):
        """Consume one pending abort request; return whether there was one."""
        try:
            self._abort_queue.get_nowait()
        except queue.Empty:
            return False
        return True

    def _enqueue_block(self, data):
        """Queue one audio block; return False if stopped meanwhile.

        The put blocks until the audio callback frees a slot. It is retried
        in ``TIMEOUT_SCENE`` second slices because the callback stops draining
        the queue once the stream is stopped, and a single timed put would
        then end this thread with an unhandled ``queue.Full``.
        """
        while not self.stop_event.is_set():
            try:
                self._q_audio.put(data, timeout=TIMEOUT_SCENE)
                return True
            except queue.Full:
                continue
        return False

    def _play(self):
        """Reader thread body: feed the audio of queued scenes to the stream."""
        with self._stream:
            while not self.stop_event.is_set():
                logger.debug("aus getting scene")
                try:
                    sc: Scene = self.scene_queue.get(timeout=TIMEOUT_SCENE)
                except queue.Empty:
                    continue
                logger.debug("aus done getting scene")
                video_file_clip = mp.VideoFileClip(sc.filename, audio_nbytes=2)
                try:
                    audio_file = video_file_clip.audio
                    if not audio_file:
                        # no audio track; the clip is still closed below
                        continue
                    logger.info("Playing audio %s", sc.name)
                    if self._abort_requested():
                        logger.debug("ignore current abort signal right after new audio started")
                    audio_file.reader.seek(0)
                    while not self.stop_event.is_set():
                        if self._abort_requested():
                            logger.debug("aborting current audio")
                            break
                        data = audio_file.reader.read_chunk(NFRAMES)
                        if len(data) == 0:
                            logger.debug("audio data empty, stop putting it to queue")
                            if self.near_end_callback:
                                self.near_end_callback()
                            break
                        # enforce mono: keep only the first (left) channel
                        if not self._enqueue_block(data[:, 0:1]):
                            break
                finally:
                    # close the entire video clip to release resource
                    # otherwise there might be some sound quality degradation
                    # reason unknown
                    video_file_clip.close()
        logger.info("audio stream stopped playing")

    def start(self):
        """Launch the reader daemon thread."""
        self._thread = threading.Thread(target=self._play, daemon=True)
        self._thread.start()

    def abort_current(self):
        """Request that playback of the current scene's audio stops."""
        self._abort_queue.put(1)

    def stop(self):
        """Ask the reader thread and the stream callback to finish."""
        self.stop_event.set()
# Record type with a ``vfile`` and a ``sec`` field. It is not referenced in the
# visible part of this file -- NOTE(review): presumably a video file plus a
# position in seconds; confirm against callers.
VideoState = collections.namedtuple("VideoState", ["vfile", "sec"])
class VideoPlayer:
    """Facade that drives one VideoStream and one AudioStream together."""

    def __init__(self, scene_queue: queue.Queue, on_near_end: typing.Callable):
        """Build both streams on top of the same ``scene_queue``.

        Args:
            scene_queue: Source of scenes, handed unchanged to both streams.
            on_near_end: Callable registered as the video stream's near-end
                callback; the audio stream gets no such callback.

        NOTE(review): both streams call ``get()`` on the same object. With a
        plain ``queue.Queue`` each scene would reach only one of them --
        confirm that ``scene_queue`` delivers every scene to both consumers.
        """
        video_stream = VideoStream(scene_queue)
        audio_stream = AudioStream(scene_queue)
        self.v_stream = video_stream
        self.a_stream = audio_stream
        self.scene_queue = scene_queue
        # only have near end call on video
        video_stream.on_video_near_end(on_near_end)

    def start(self):
        """Start the audio stream first, then the video stream."""
        for stream in (self.a_stream, self.v_stream):
            stream.start()

    def stop(self):
        """Stop the video stream first, then the audio stream."""
        for stream in (self.v_stream, self.a_stream):
            stream.stop()

    def abort_current(self):
        """Abort the current scene on the audio stream, then on the video stream."""
        for stream in (self.a_stream, self.v_stream):
            stream.abort_current()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment