Created
November 18, 2025 21:30
-
-
Save maksadbek/28f317ca085886dfd78f5db7119cc63e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import os | |
| import signal | |
| import subprocess | |
| import sys | |
| import time | |
| import ffmpeg | |
| from PyQt6.QtCore import QUrl, Qt | |
| from PyQt6.QtMultimedia import QAudioOutput, QMediaPlayer | |
| from PyQt6.QtMultimediaWidgets import QVideoWidget | |
| from PyQt6.QtWidgets import QApplication, QVBoxLayout, QWidget, QPushButton | |
# ---------- CONFIG ----------
# avfoundation device indices. List the devices available on this machine with:
#   ffmpeg -f avfoundation -list_devices true -i ""
# and copy the bracketed numbers from that listing.
SCREEN_INDEX = 4  # index of the "Capture screen N" entry to record
CAM_INDEX = 0  # index of the camera entry, e.g. [0] FaceTime HD Camera

# Intermediate recordings and the merged picture-in-picture result.
SCREEN_FILE = "/tmp/screen_raw.mp4"
CAM_FILE = "/tmp/cam_raw.mp4"
OUTPUT_FILE = "/tmp/final_pip.mp4"

# Scale factor applied to the camera frame before it is overlaid:
# merge_videos_py multiplies the camera's own width and height by it, so the
# result is relative to the camera frame, not to the screen.
# Resulting area as a fraction of the unscaled camera frame:
#   0.33 ≈ 1/9, 0.4 ≈ 1/6, 0.5 = 1/4
SIDE_FRACTION = 0.4

# ---------- Qt multimedia ----------
# Uncomment to disable hardware decoding (videotoolbox_vld etc.):
# os.environ["QT_FFMPEG_DECODING_HW_DEVICE_TYPES"] = ""
| # ---------- ffmpeg helpers (recording) ---------- | |
def spawn_ffmpeg(cmd: list[str]) -> subprocess.Popen:
    """
    Launch *cmd* as a silent child process in its own session.

    Starting a new session puts the child in its own process group, so it can
    be signalled and killed independently even if it ignores Ctrl+C.
    Standard output is discarded and standard error is folded into it.
    """
    popen_options = {
        "start_new_session": True,  # new session => new process group
        "stdout": subprocess.DEVNULL,
        "stderr": subprocess.STDOUT,
    }
    return subprocess.Popen(cmd, **popen_options)
| def stop_recording(proc: subprocess.Popen | None) -> None: | |
| """Nicely stop an ffmpeg process.""" | |
| if proc is None: | |
| return | |
| if proc.poll() is not None: | |
| return # already exited | |
| print("stopping ffmpeg {proc}") | |
| # try SIGINT first (like Ctrl+C) | |
| proc.send_signal(signal.SIGTERM) | |
| try: | |
| proc.wait(timeout=2) | |
| except subprocess.TimeoutExpired: | |
| proc.kill() | |
| print("stopped ffmpeg {proc}") | |
def _spawn_capture(device_index: int, out_path: str, **input_options) -> subprocess.Popen:
    """Start one avfoundation capture that encodes to *out_path* with libx264."""
    stream = (
        ffmpeg
        .input(
            str(device_index),
            f="avfoundation",
            thread_queue_size=512,
            framerate=30,
            pixel_format="nv12",
            **input_options,
        )
        .output(
            out_path,
            vcodec="libx264",
            preset="ultrafast",
            crf=23,
            pix_fmt="yuv420p",
        )
        # -y: overwrite the previous recording; -nostdin: this is a background
        # process, so ffmpeg must not try to read interactive commands.
        .global_args("-y", "-nostdin")
    )
    # Build the argv with ffmpeg-python but launch it through spawn_ffmpeg.
    # run_async() treats pipe_stdout/pipe_stderr as booleans, so passing
    # subprocess.DEVNULL / subprocess.STDOUT there creates pipes that nobody
    # reads; ffmpeg can then block once the stderr pipe buffer fills up.
    return spawn_ffmpeg(stream.compile())


def start_recording() -> tuple[subprocess.Popen, subprocess.Popen]:
    """Start the screen and camera captures as two separate ffmpeg processes.

    The screen is recorded from device SCREEN_INDEX into SCREEN_FILE (requested
    at 1920x1080) and the camera from device CAM_INDEX into CAM_FILE.

    Returns:
        ``(screen_proc, cam_proc)``, the two running ffmpeg processes.

    Raises:
        Whatever launching ffmpeg raises (e.g. ``OSError`` when the executable
        cannot be started). If the camera capture fails to start, the screen
        capture that is already running is stopped first.
    """
    print("Starting screen recording ffmpeg (ffmpeg-python)…")
    screen_proc = _spawn_capture(SCREEN_INDEX, SCREEN_FILE, video_size="1920x1080")
    print(f" -> {SCREEN_FILE} {screen_proc.pid}")

    # --- Camera recording (was: ffmpeg -f avfoundation ... -i CAM_INDEX ...) ---
    print("Starting camera recording ffmpeg (ffmpeg-python)…")
    try:
        cam_proc = _spawn_capture(CAM_INDEX, CAM_FILE)
    except Exception:
        # Do not leave a half-started recording running in the background.
        stop_recording(screen_proc)
        raise
    print(f" -> {CAM_FILE} {cam_proc.pid}")

    return screen_proc, cam_proc
def _require_nonempty(path: str, label: str) -> None:
    """Raise RuntimeError unless *path* exists and has a non-zero size."""
    if not (os.path.exists(path) and os.path.getsize(path) > 0):
        raise RuntimeError(f"{label} file '{path}' missing or empty")


def merge_videos_py(screen_path: str, cam_path: str, output_path: str = "final_pip.mp4") -> None:
    """Overlay the camera recording on the screen recording (picture-in-picture).

    The camera video is scaled by SIDE_FRACTION (relative to its own size),
    placed 20 px from the bottom-right corner of the screen video, and the
    composite is scaled to a width of 2048 and encoded with libx264.
    Only the video streams are used.

    Args:
        screen_path: Screen recording to use as the background.
        cam_path: Camera recording to overlay.
        output_path: Destination file; overwritten if it exists.

    Raises:
        RuntimeError: If either input file is missing or empty.
        ffmpeg.Error: If the ffmpeg run fails.
    """
    _require_nonempty(screen_path, "Screen")
    _require_nonempty(cam_path, "Camera")

    screen = ffmpeg.input(screen_path)
    cam = ffmpeg.input(cam_path)

    # Scale camera down by SIDE_FRACTION (relative to its own size)
    cam_scaled = cam.video.filter(
        "scale",
        f"iw*{SIDE_FRACTION}",
        f"ih*{SIDE_FRACTION}",
    )

    # W/H are the background's size, w/h the overlay's: bottom-right, 20 px margin.
    pip = ffmpeg.overlay(
        screen.video,
        cam_scaled,
        x="W-w-20",
        y="H-h-20",
    )

    # -2 (not -1) keeps the aspect ratio but rounds the height to an even
    # number; libx264 with yuv420p rejects odd dimensions.
    scaled = pip.filter("scale", 2048, -2)

    out = (
        ffmpeg
        .output(
            scaled,
            output_path,
            vcodec="libx264",
            preset="veryfast",
            crf=23,
            pix_fmt="yuv420p",
            # Emits the bare "-shortest" output flag.
            # NOTE(review): if the intent is to stop at the shorter recording,
            # the overlay filter's own "shortest" option may be what is needed;
            # confirm.
            shortest=None,
        )
        .overwrite_output()
    )
    out.run()
class VideoPlayerWidget(QWidget):
    """Preview window: a video surface with a "Replay" button underneath.

    Playback of the given file starts each time the window is shown.
    """

    def __init__(self, video_path: str, parent=None):
        super().__init__(parent)
        self.setWindowTitle("Recording preview")

        column = QVBoxLayout(self)
        self._add_video_surface(column)
        self._setup_player(video_path)
        self._add_replay_button(column)

    def _add_video_surface(self, column: QVBoxLayout) -> None:
        """Create the widget the video is rendered into and add it to *column*."""
        self.video_widget = QVideoWidget()
        column.addWidget(self.video_widget)

    def _setup_player(self, video_path: str) -> None:
        """Create the media player, wire its audio/video outputs, load the file."""
        self.player = QMediaPlayer()
        self.audio_output = QAudioOutput()
        self.player.setAudioOutput(self.audio_output)
        self.player.setVideoOutput(self.video_widget)
        self.player.setSource(QUrl.fromLocalFile(video_path))

    def _add_replay_button(self, column: QVBoxLayout) -> None:
        """Create the "Replay" button, add it to *column*, and hook it up."""
        button = QPushButton("Replay")
        button.setFixedHeight(40)
        button.setStyleSheet("font-size: 18px;")
        column.addWidget(button)
        button.clicked.connect(self.replay)
        self.replay_btn = button

    def replay(self):
        """Restart playback from the beginning."""
        player = self.player
        player.stop()
        player.setPosition(0)
        player.play()

    def showEvent(self, event):
        """Start playing as soon as the window becomes visible."""
        super().showEvent(event)
        self.player.play()
class RecorderWidget(QWidget):
    """Small always-on-top control window with a Record/Stop and a Play button.

    "Record" starts the screen and camera captures; pressing the same button
    again ("Stop") stops both and merges them into OUTPUT_FILE. "Play" opens a
    preview window for OUTPUT_FILE.
    """

    def __init__(self) -> None:
        super().__init__()
        # Running ffmpeg processes; both None while not recording.
        self.screen_proc: subprocess.Popen | None = None
        self.cam_proc: subprocess.Popen | None = None
        # Most recently opened preview window (set by play()).
        self.player_window: VideoPlayerWidget | None = None

        self.setWindowTitle("Recorder")
        self.setWindowFlags(Qt.WindowType.WindowStaysOnTopHint | Qt.WindowType.Tool)

        layout = QVBoxLayout(self)
        self.button = QPushButton("Record")
        self.button.clicked.connect(self.toggle_recording)
        play_button = QPushButton("Play")
        play_button.clicked.connect(self.play)
        layout.addWidget(self.button)
        layout.addWidget(play_button)
        self.setFixedSize(180, 100)

        # optional: move to top-right corner of main screen
        screen_geo = QApplication.primaryScreen().geometry()
        self.move(
            screen_geo.right() - self.width() - 20,
            screen_geo.top() + 40,
        )

    def toggle_recording(self) -> None:
        """Start recording if idle; otherwise stop both captures and merge them."""
        # Not recording -> start
        if self.screen_proc is None and self.cam_proc is None:
            try:
                self.screen_proc, self.cam_proc = start_recording()
            except Exception as e:
                # Report like a merge failure instead of letting the exception
                # escape the Qt slot; the button stays on "Record".
                print("Recording failed to start:", e, file=sys.stderr)
                return
            self.button.setText("Stop")
            return

        # Recording -> stop, merge
        print("Stopping recording…")
        stop_recording(self.screen_proc)
        stop_recording(self.cam_proc)
        self.screen_proc = None
        self.cam_proc = None
        self.button.setText("Record")
        try:
            # Runs synchronously, so the UI is blocked until ffmpeg finishes.
            merge_videos_py(SCREEN_FILE, CAM_FILE, OUTPUT_FILE)
        except Exception as e:
            print("Merge failed:", e, file=sys.stderr)

    def play(self) -> None:
        """Open a preview window playing OUTPUT_FILE, if it exists."""
        if not os.path.exists(OUTPUT_FILE):
            print(f"Nothing to play yet: '{OUTPUT_FILE}' does not exist", file=sys.stderr)
            return
        self.player_window = VideoPlayerWidget(OUTPUT_FILE)
        self.player_window.resize(960, 540)
        self.player_window.show()
        self.player_window.raise_()
        self.player_window.activateWindow()
def main() -> None:
    """Create the Qt application, show the recorder window, run the event loop.

    The process exits with the status returned by the event loop.
    """
    app = QApplication(sys.argv)
    recorder = RecorderWidget()
    recorder.show()
    exit_code = app.exec()
    sys.exit(exit_code)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment