Created
May 3, 2025 14:16
-
-
Save codeFlane/307c311e725f124d7b5d069374eed40a to your computer and use it in GitHub Desktop.
Video streamer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from climage import convert | |
from os import get_terminal_size, listdir, system, name, makedirs, path | |
from PIL import Image | |
from time import sleep, time | |
import cv2 | |
import keyboard | |
import sounddevice as sd | |
import soundfile as sf | |
import pygame | |
import threading | |
import numpy as np | |
import json | |
from datetime import datetime | |
import argparse | |
pygame.mixer.init() | |
def parse_args(): | |
parser = argparse.ArgumentParser(description="Video stream application") | |
parser.add_argument('-f', '--full-screen', action='store_true', help="Run in full-screen mode") | |
return parser.parse_args() | |
args = parse_args() | |
def get_scaled_width(console_height: int, width: int, height: int) -> int: | |
return int(width * (console_height * 2 / height)) | |
def load_video(console_height: int, console_width: int, folder: str) -> tuple: | |
dots = [".", "..", "..."] | |
i = 0 | |
start = time() | |
images = [] | |
for img in listdir(folder): | |
if keyboard.is_pressed('v'): | |
system('cls' if name == 'nt' else 'clear') | |
folder = input("Enter video folder (default testvid): ") or 'testvid' | |
system('cls' if name == 'nt' else 'clear') | |
return 'video', folder, [], 30 | |
if keyboard.is_pressed('c'): | |
system('cls' if name == 'nt' else 'clear') | |
return 'camera', folder, [], 30 | |
if keyboard.is_pressed('r'): | |
system('cls' if name == 'nt' else 'clear') | |
record_folder = input("Enter recording folder (default vid2): ") or 'vid2' | |
system('cls' if name == 'nt' else 'clear') | |
return 'record', record_folder, [], 30 | |
if keyboard.is_pressed('q'): | |
raise KeyboardInterrupt | |
if time() - start > 0.3: | |
start = time() | |
i = (i + 1) % 3 | |
system('cls' if name == 'nt' else 'clear') | |
padding = " " * ((console_width - len("Loading" + dots[i])) // 2) | |
print("\n" * (console_height // 2) + padding + "Loading" + dots[i], flush=True) | |
if img.endswith(('.png', '.jpg', '.jpeg')): | |
img_path = f'{folder}/{img}' | |
img_size = Image.open(img_path).size | |
width = console_width if args.full_screen else min(console_width, get_scaled_width(console_height, *img_size)) | |
images.append(convert(img_path, is_truecolor=True, is_unicode=True, is_256color=False, width=width)) | |
fps = 30 | |
json_path = f'{folder}/metadata.json' | |
if path.exists(json_path): | |
with open(json_path, 'r') as f: | |
metadata = json.load(f) | |
fps = metadata.get('fps', 30) | |
return 'video', folder, images, fps | |
def overlay_text(img: str, text: str, console_width: int, console_height: int, start_time: float) -> str: | |
if time() - start_time > 2: return img | |
lines = img.split('\n') | |
if not lines: return img | |
text = f"\x1b[40m{text}\x1b[0m" | |
padding = " " * max(0, (console_width - len(text)) // 2) | |
line_idx = console_height // 2 | |
if line_idx < len(lines): | |
lines[line_idx] = padding + text + " " * console_width | |
return '\n'.join(lines) | |
def format_time(seconds: float) -> str: | |
minutes = int(seconds // 60) | |
seconds = int(seconds % 60) | |
return f"{minutes}:{seconds:02d}" | |
def play_video(images: list, console_height: int, console_width: int, folder: str, fps: float): | |
if not images: | |
print(f"Error: no images in {folder}") | |
sleep(1) | |
return 'camera', folder | |
mp3_file = next((f for f in listdir(folder) if f.endswith('.mp3')), None) | |
text = "Playback mode" + (" (no sound)" if not mp3_file else "") | |
start_time = time() | |
frame_start = time() | |
frame_count = 0 | |
paused = False | |
i = 0 | |
display_fps = fps | |
overlay_msg = "" | |
overlay_time = 0 | |
if mp3_file: | |
pygame.mixer.music.load(f'{folder}/{mp3_file}') | |
pygame.mixer.music.play(start=0) | |
while True: | |
if keyboard.is_pressed('v'): | |
pygame.mixer.music.stop() | |
system('cls' if name == 'nt' else 'clear') | |
folder = input("Enter video folder (default testvid): ") or 'testvid' | |
system('cls' if name == 'nt' else 'clear') | |
return 'video', folder | |
if keyboard.is_pressed('c'): | |
pygame.mixer.music.stop() | |
system('cls' if name == 'nt' else 'clear') | |
return 'camera', folder | |
if keyboard.is_pressed('r'): | |
pygame.mixer.music.stop() | |
system('cls' if name == 'nt' else 'clear') | |
record_folder = input("Enter recording folder (default vid2): ") or 'vid2' | |
system('cls' if name == 'nt' else 'clear') | |
return 'record', record_folder | |
if keyboard.is_pressed('q'): | |
pygame.mixer.music.stop() | |
raise KeyboardInterrupt | |
if keyboard.is_pressed('space'): | |
paused = not paused | |
overlay_msg = "Paused" if paused else "" | |
overlay_time = time() | |
if mp3_file: | |
if paused: | |
pygame.mixer.music.pause() | |
else: | |
pygame.mixer.music.unpause() | |
sleep(0.2) | |
if keyboard.is_pressed('left'): | |
i = max(0, i - 15) | |
overlay_msg = "Rewound 15 frames" | |
overlay_time = time() | |
if mp3_file: | |
pygame.mixer.music.stop() | |
pygame.mixer.music.play(start=i / fps) | |
sleep(0.2) | |
if keyboard.is_pressed('right'): | |
i = min(len(images) - 1, i + 15) | |
overlay_msg = "Forwarded 15 frames" | |
overlay_time = time() | |
if mp3_file: | |
pygame.mixer.music.stop() | |
pygame.mixer.music.play(start=i / fps) | |
sleep(0.2) | |
render_start = time() | |
if i >= len(images) - 1 and not paused: | |
paused = True | |
overlay_msg = "Video end" | |
overlay_time = time() | |
if mp3_file: | |
pygame.mixer.music.pause() | |
img = overlay_text(images[i], overlay_msg or text, console_width, console_height, overlay_time if overlay_msg else start_time) | |
elapsed = time() - frame_start | |
if elapsed > 1: | |
display_fps = frame_count / elapsed | |
frame_start = time() | |
frame_count = 0 | |
print(f"\x1b[H{img}\nFPS: {display_fps:.1f} Time: {format_time(i / fps)}\n" + "\n" * (console_height - len(img.split('\n'))), flush=True, end='') | |
if not paused: | |
i = (i + 1) % len(images) | |
frame_count += 1 | |
sleep(1 / fps) | |
def record_audio(record_folder: str, stop_event: threading.Event, duration: float): | |
sample_rate = 44100 | |
audio = [] | |
start_time = time() | |
with sd.InputStream(samplerate=sample_rate, channels=1) as stream: | |
while not stop_event.is_set() and (time() - start_time) < duration: | |
data, _ = stream.read(256) | |
audio.append(data) | |
audio = np.concatenate(audio, axis=0) | |
sf.write(f'{record_folder}/audio.mp3', audio, sample_rate) | |
def stream_camera(console_height: int, console_width: int, record: bool = False, record_folder: str = 'vid2'): | |
system('cls' if name == 'nt' else 'clear') | |
padding = " " * ((console_width - len("Loading...")) // 2) | |
print("\n" * (console_height // 2) + padding + "Loading...", flush=True) | |
cap = cv2.VideoCapture(0) | |
if not cap.isOpened(): | |
print("Error: could not open camera") | |
sleep(1) | |
return 'video', record_folder | |
pixel_width = int(console_width * 0.5 if not args.full_screen else console_width * 0.6) | |
pixel_height = console_height * 2 | |
cap.set(cv2.CAP_PROP_FRAME_WIDTH, pixel_width) | |
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, pixel_height) | |
start_time = time() | |
audio_thread = None | |
if record: | |
makedirs(record_folder, exist_ok=True) | |
stop_event = threading.Event() | |
record_start = time() | |
audio_thread = threading.Thread(target=record_audio, args=(record_folder, stop_event, float('inf')), daemon=True) | |
audio_thread.start() | |
frame_count = 0 | |
frame_start = time() | |
target_fps = 30 | |
render_times = [] | |
try: | |
while True: | |
if keyboard.is_pressed('v'): | |
system('cls' if name == 'nt' else 'clear') | |
folder = input("Enter video folder: ") | |
system('cls' if name == 'nt' else 'clear') | |
if record: | |
stop_event.set() | |
if audio_thread and audio_thread.is_alive(): | |
audio_thread.join() | |
duration = time() - record_start | |
audio_thread = threading.Thread(target=record_audio, args=(record_folder, stop_event, duration), daemon=True) | |
audio_thread.start() | |
audio_thread.join() | |
avg_fps = frame_count / duration if frame_count > 0 else 30 | |
metadata = { | |
'folder': record_folder, | |
'fps': avg_fps, | |
'duration': duration, | |
'created': datetime.now().strftime('%Y-%m-%d %H:%M:%S') | |
} | |
with open(f'{record_folder}/metadata.json', 'w') as f: | |
json.dump(metadata, f, indent=4) | |
return 'video', folder | |
if keyboard.is_pressed('c'): | |
system('cls' if name == 'nt' else 'clear') | |
if record: | |
stop_event.set() | |
if audio_thread and audio_thread.is_alive(): | |
audio_thread.join() | |
duration = time() - record_start | |
audio_thread = threading.Thread(target=record_audio, args=(record_folder, stop_event, duration), daemon=True) | |
audio_thread.start() | |
audio_thread.join() | |
avg_fps = frame_count / duration if frame_count > 0 else 30 | |
metadata = { | |
'folder': record_folder, | |
'fps': avg_fps, | |
'duration': duration, | |
'created': datetime.now().strftime('%Y-%m-%d %H:%M:%S') | |
} | |
with open(f'{record_folder}/metadata.json', 'w') as f: | |
json.dump(metadata, f, indent=4) | |
return 'camera', record_folder | |
if keyboard.is_pressed('r'): | |
system('cls' if name == 'nt' else 'clear') | |
record_folder_new = input("Enter recording : ") | |
system('cls' if name == 'nt' else 'clear') | |
if record: | |
stop_event.set() | |
if audio_thread and audio_thread.is_alive(): | |
audio_thread.join() | |
duration = time() - record_start | |
audio_thread = threading.Thread(target=record_audio, args=(record_folder, stop_event, duration), daemon=True) | |
audio_thread.start() | |
audio_thread.join() | |
avg_fps = frame_count / duration if frame_count > 0 else 30 | |
metadata = { | |
'folder': record_folder, | |
'fps': avg_fps, | |
'duration': duration, | |
'created': datetime.now().strftime('%Y-%m-%d %H:%M:%S') | |
} | |
with open(f'{record_folder}/metadata.json', 'w') as f: | |
json.dump(metadata, f, indent=4) | |
return 'record', record_folder_new | |
if keyboard.is_pressed('q'): | |
if record: | |
stop_event.set() | |
if audio_thread and audio_thread.is_alive(): | |
audio_thread.join() | |
duration = time() - record_start | |
audio_thread = threading.Thread(target=record_audio, args=(record_folder, stop_event, duration), daemon=True) | |
audio_thread.start() | |
audio_thread.join() | |
avg_fps = frame_count / duration if frame_count > 0 else 30 | |
metadata = { | |
'folder': record_folder, | |
'fps': avg_fps, | |
'duration': duration, | |
'created': datetime.now().strftime('%Y-%m-%d %H:%M:%S') | |
} | |
with open(f'{record_folder}/metadata.json', 'w') as f: | |
json.dump(metadata, f, indent=4) | |
raise KeyboardInterrupt | |
if keyboard.is_pressed('space') and record: | |
stop_event.set() | |
if audio_thread and audio_thread.is_alive(): | |
audio_thread.join() | |
duration = time() - record_start | |
audio_thread = threading.Thread(target=record_audio, args=(record_folder, stop_event, duration), daemon=True) | |
audio_thread.start() | |
audio_thread.join() | |
avg_fps = frame_count / duration if frame_count > 0 else 30 | |
metadata = { | |
'folder': record_folder, | |
'fps': avg_fps, | |
'duration': duration, | |
'created': datetime.now().strftime('%Y-%m-%d %H:%M:%S') | |
} | |
with open(f'{record_folder}/metadata.json', 'w') as f: | |
json.dump(metadata, f, indent=4) | |
system('cls' if name == 'nt' else 'clear') | |
return 'camera', record_folder | |
render_start = time() | |
ret, frame = cap.read() | |
if not ret: | |
print("Error: could not capture frame") | |
break | |
frame = cv2.flip(frame, 1) | |
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
new_width = console_width if args.full_screen else min(console_width, get_scaled_width(console_height, pixel_width, pixel_height)) | |
if record and frame_count % 2 == 0: | |
frame_resized = cv2.resize(frame, (1920, 1080), interpolation=cv2.INTER_AREA) | |
cv2.imwrite(f'{record_folder}/frame_{frame_count//2:04d}.png', cv2.cvtColor(frame_resized, cv2.COLOR_RGB2BGR)) | |
cv2.imwrite('temp.png', cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)) | |
img = convert('temp.png', is_truecolor=True, is_unicode=True, is_256color=False, width=new_width) | |
text = "Recording mode" if record else "Camera mode" | |
render_time = time() - render_start | |
render_times.append(render_time) | |
fps = 1 / render_time if render_time > 0 else target_fps | |
status = f"FPS: {fps:.1f} Time: {format_time(time() - record_start)}" if record else f"FPS: {fps:.1f}" | |
print(f"\x1b[H{img}\n{status}\n" + "\n" * (console_height - len(img.split('\n'))), flush=True, end='') | |
frame_count += 1 | |
sleep(1 / target_fps) | |
finally: | |
if record and audio_thread and audio_thread.is_alive(): | |
stop_event.set() | |
audio_thread.join() | |
cap.release() | |
return 'video', record_folder | |
console_height = get_terminal_size().lines | |
console_width = get_terminal_size().columns | |
video_folder = 'testvid' | |
images = [] | |
system('cls' if name == 'nt' else 'clear') | |
print("Select mode (V, C, or R):", flush=True) | |
try: | |
while True: | |
if keyboard.is_pressed('v'): | |
system('cls' if name == 'nt' else 'clear') | |
video_folder = input("Enter video folder: ") | |
system('cls' if name == 'nt' else 'clear') | |
images = [] | |
mode = 'video' | |
break | |
if keyboard.is_pressed('c'): | |
system('cls' if name == 'nt' else 'clear') | |
mode = 'camera' | |
break | |
if keyboard.is_pressed('r'): | |
system('cls' if name == 'nt' else 'clear') | |
video_folder = input("Enter recording folder: ") | |
system('cls' if name == 'nt' else 'clear') | |
mode = 'record' | |
break | |
if keyboard.is_pressed('q'): | |
raise KeyboardInterrupt | |
sleep(0.05) | |
while True: | |
system('cls' if name == 'nt' else 'clear') | |
if mode == 'video': | |
mode, video_folder, images, fps = load_video(console_height, console_width, video_folder) | |
if mode != 'video': | |
images = [] | |
mode = play_video(images, console_height, console_width, video_folder, fps)[0] if mode == 'video' else mode | |
elif mode == 'camera': | |
mode, video_folder = stream_camera(console_height, console_width) | |
elif mode == 'record': | |
mode, video_folder = stream_camera(console_height, console_width, True, video_folder) | |
except KeyboardInterrupt: | |
system('cls' if name == 'nt' else 'clear') | |
print("Stopped") | |
except Exception as e: | |
system('cls' if name == 'nt' else 'clear') | |
print(f"Error: {e}") | |
finally: | |
try: | |
pygame.mixer.quit() | |
except: | |
pass |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
video recorder in console
Libraries:
Modes:
Hotkeys:
Reading video:
Recording video:
Bottom data:
Flags:
Screenshots: