Code I used to warp-correct a MrBeast video
import cv2, math, os
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
import scipy.signal
import shutil, subprocess
from scipy.io import wavfile
# %%
class VideoWrapper:

    def __init__(self, video_file_path: Path, src_color_mode=cv2.COLOR_BGR2RGB):
        # cv2.VideoCapture wants a plain string path, not a pathlib.Path
        self.video_capture = cv2.VideoCapture(str(video_file_path))
        self.fps = self.video_capture.get(cv2.CAP_PROP_FPS)
        self.src_color_mode = src_color_mode

    def reset(self):
        self.video_capture.set(cv2.CAP_PROP_POS_FRAMES, 0)

    def videotime_to_frame(self, video_time):
        # accepts "mm:ss" strings, "mm:ss:ms" strings or plain seconds
        if isinstance(video_time, str):
            if video_time.count(":") == 1:
                minutes, seconds = [int(s) for s in video_time.split(":")]
                return self.seconds_to_frame(minutes * 60 + seconds)
            if video_time.count(":") == 2:
                minutes, seconds, milliseconds = [int(s) for s in video_time.split(":")]
                return self.seconds_to_frame(minutes * 60 + seconds + milliseconds / 1000)
        return self.seconds_to_frame(video_time)

    def seconds_to_frame(self, seconds: float):
        return math.floor(seconds * self.fps)

    def iter_images(self, start_seconds=None, end_seconds=None):
        start_index = 0
        end_index = math.inf
        if start_seconds is not None:
            start_index = self.videotime_to_frame(start_seconds)
        if end_seconds is not None:
            end_index = self.videotime_to_frame(end_seconds)
        # CAP_PROP_POS_FRAMES is 0-based, so seeking to start_index - 1 makes
        # the next read() return the frame this script labels start_index
        self.video_capture.set(cv2.CAP_PROP_POS_FRAMES, start_index - 1)
        success, image = self.video_capture.read()
        frame_index = start_index
        while success and frame_index <= end_index:
            yield frame_index, cv2.cvtColor(image, self.src_color_mode)
            success, image = self.video_capture.read()
            frame_index += 1

    def get_image_at(self, seconds, frame_mode=False):
        frame_index = self.videotime_to_frame(seconds) if not frame_mode else seconds
        self.video_capture.set(cv2.CAP_PROP_POS_FRAMES, frame_index - 1)
        raw_image = self.video_capture.read()[1]
        return cv2.cvtColor(raw_image, self.src_color_mode)
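
# %%
# Quick sanity check of the timestamp parsing above (illustrative; this opens
# the same video file that is loaded again further down). "mm:ss" and
# "mm:ss:ms" strings should land near the equivalent plain-seconds value.
_demo_wrapper = VideoWrapper(Path("videos") / "ages1-to-100.fullhd.mp4")
print(_demo_wrapper.videotime_to_frame("18:05"))        # minutes:seconds
print(_demo_wrapper.videotime_to_frame("18:05:500"))    # minutes:seconds:milliseconds
print(_demo_wrapper.videotime_to_frame(18 * 60 + 5.5))  # same instant as plain seconds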
class ProgressbarFinder:

    def __init__(self, y=0.98, thresh=20):
        self.progressbar_y = y
        self.thresh = thresh

    def measure(self, image, debug=False):
        # assume the progress bar has constant color
        y_index = math.floor(image.shape[0] * self.progressbar_y)
        pixel_row = image[y_index].astype(np.float32)
        first_color = pixel_row[0]
        if debug:
            distances = [np.linalg.norm(first_color - c) for c in pixel_row]
            plt.scatter(np.arange(len(distances)), distances, c=pixel_row / 255)
        for i, color in enumerate(pixel_row):
            distance = np.linalg.norm(color - first_color)
            if distance > self.thresh:
                return i / (image.shape[1] - 1)
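
# %%
# A minimal synthetic check of ProgressbarFinder.measure (hypothetical image):
# a 100px-wide frame whose sampled row is white for the first 25 pixels should
# measure roughly 0.25. Note that measure() implicitly returns None when the
# whole row matches the first pixel's color (an empty or completely full bar).
_test_image = np.zeros((100, 100, 3), dtype=np.uint8)
_test_image[:, :25] = 255  # "filled" first quarter of the bar
print(ProgressbarFinder(y=0.98, thresh=20).measure(_test_image))  # ~0.2525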
videos_directory = Path("videos")
video_file = videos_directory / "ages1-to-100.fullhd.mp4"
progress_bar_bounds = ("18:05:500", "18:48")  # (start, end) of the progress bar segment, mm:ss:ms / mm:ss
video_wrapper = VideoWrapper(video_file)
sample_image = video_wrapper.get_image_at(progress_bar_bounds[0])
progressbar_finder = ProgressbarFinder(y=0.97, thresh=50)
progressbar_finder.measure(sample_image, debug=True)
# %%
progressbar_finder = ProgressbarFinder(y=0.99, thresh=30)
progress = np.array([
    [frame_index, progressbar_finder.measure(img)]
    for frame_index, img in video_wrapper.iter_images(*progress_bar_bounds)
])
plt.xlabel("Frame Index")
plt.ylabel("Progress Bar Progress")
plt.plot(progress[:, 0], progress[:, 1])
progress[:, 1] = scipy.signal.savgol_filter(progress[:, 1], 31, 3)  # odd window length (even windows are rejected by older SciPy)
plt.bar(progress[:, 0], progress[:, 1], color="yellow")
# %%
progress_velocity = np.gradient(progress[:, 1], progress[:, 0])
plt.xlabel("Frame Index")
plt.ylabel("Progress Bar Speed")
plt.plot(progress[:, 0], progress_velocity, color="red")
# %%
def find_best_frame_index(progress_level):
    best_frame_index = None
    best_distance = math.inf
    for frame_index, curr_progress in progress:
        distance = abs(curr_progress - progress_level)
        if distance < best_distance:
            best_frame_index = frame_index
            best_distance = distance
    return round(best_frame_index)
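
# An equivalent vectorized lookup (a sketch, not used below; np.argmin picks
# the first index of the minimum, matching the loop's strict-< tie-breaking):
def find_best_frame_index_fast(progress_level):
    nearest = np.argmin(np.abs(progress[:, 1] - progress_level))
    return round(progress[nearest, 0])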
transformed_video_numframes = (
    video_wrapper.videotime_to_frame(progress_bar_bounds[1])
    - video_wrapper.videotime_to_frame(progress_bar_bounds[0])
)

def get_image_at(new_frame_index):
    # map the output frame's linear progress back to the source frame whose
    # measured progress-bar level is closest
    real_progress = new_frame_index / (transformed_video_numframes - 1)
    best_frame_index = find_best_frame_index(real_progress)
    return video_wrapper.get_image_at(best_frame_index, frame_mode=True)

temp_directory_path = Path("_temp_imagedir")
if not os.path.exists(temp_directory_path):
    os.mkdir(temp_directory_path)

for frame_index in range(transformed_video_numframes):
    image = get_image_at(frame_index)
    plt.imsave(temp_directory_path / f"{frame_index:06}.jpg", image)
    print(f"{frame_index}/{transformed_video_numframes} ", end="\r")
print(f"finished making {transformed_video_numframes} images")
# %%
video_output_path = Path("temp_video.mp4")
# sort the frame files: os.listdir returns them in arbitrary order
images = sorted(os.listdir(temp_directory_path))
frame = cv2.imread(os.path.join(temp_directory_path, images[0]))
height, width, layers = frame.shape
video = cv2.VideoWriter(str(video_output_path), 0, video_wrapper.fps, (width, height))
for i, image in enumerate(images):
    video.write(cv2.imread(os.path.join(temp_directory_path, image)))
    print(f"{i}/{len(images) - 1}", end="\r")
cv2.destroyAllWindows()
video.release()
shutil.rmtree(temp_directory_path)
print(f"Exported {video_output_path} successfully.")
audio_output_path = Path("temp_warped_audio.wav")
start_frame_index = video_wrapper.videotime_to_frame(progress_bar_bounds[0])
end_frame_index = video_wrapper.videotime_to_frame(progress_bar_bounds[1])
start_seconds = start_frame_index / video_wrapper.fps
end_seconds = end_frame_index / video_wrapper.fps
# --- Use ffmpeg to extract that exact segment's audio to a temp WAV ---
temp_wav_path = Path("temp_segment_audio.wav")
ffmpeg_cmd = [
    "ffmpeg",
    "-y",                     # overwrite if exists
    "-i", str(video_file),
    "-ss", str(start_seconds),
    "-to", str(end_seconds),
    "-vn",                    # no video
    "-acodec", "pcm_s16le",
    "-ar", "44100",           # sample rate (pick another if desired)
    "-ac", "2",               # 2-channel stereo
    str(temp_wav_path),
]
subprocess.run(ffmpeg_cmd, check=True)
sr, audio = wavfile.read(temp_wav_path)
if audio.ndim == 1:
    audio = audio[:, np.newaxis]  # ensure shape = (N, channels)
orig_frame_indices = progress[:, 0]
orig_frame_rel = orig_frame_indices - start_frame_index
t_orig_rel = orig_frame_rel / video_wrapper.fps
p_values = progress[:, 1]
# Sort by progress for interpolation
sort_idx = np.argsort(p_values)
p_sorted = p_values[sort_idx]
t_orig_rel_sorted = t_orig_rel[sort_idx]
# Number of samples in our extracted segment = audio.shape[0]
num_samples_new = audio.shape[0]
T = end_seconds - start_seconds
new_times = np.arange(num_samples_new) / sr
new_progress = new_times / T
t_orig_rel_new = np.interp(new_progress, p_sorted, t_orig_rel_sorted)
orig_sample_positions = t_orig_rel_new * sr
warped_audio = np.zeros_like(audio)
for ch in range(audio.shape[1]):
    warped_audio[:, ch] = np.interp(
        orig_sample_positions,
        np.arange(audio.shape[0]),
        audio[:, ch],
    )

# If mono, flatten shape
if warped_audio.shape[1] == 1:
    warped_audio = warped_audio[:, 0]
wavfile.write(audio_output_path, sr, warped_audio.astype(audio.dtype))
print(f"Warped audio written to: {audio_output_path.resolve()}")
# %%
final_output_path = Path("outputs") / Path("agevideo.mp4")
cmd = [
    "ffmpeg",
    "-y",                         # overwrite if the output file already exists
    "-i", str(video_output_path),
    "-i", str(audio_output_path),
    "-c:v", "copy",
    "-c:a", "aac",
    "-map", "0:v:0",
    "-map", "1:a:0",
    "-shortest",
    str(final_output_path),
]
subprocess.run(cmd, check=True)
os.remove(temp_wav_path)
os.remove(audio_output_path)
os.remove(video_output_path)
print(f"Combined video+audio written to: {final_output_path.resolve()}")
print("also deleted temp audio files")