Merging multiple videos into one grid video with OpenCV and NumPy
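In short: the ExtractImageFromVideo class in the script below yields frames from one video, create_image_grid tiles a batch of frames into a single image, and merge_videos drives both to write the grid video. The extractor can also be used on its own; a minimal sketch, assuming a hypothetical input.mp4 and output folder frames_out:

# assumes the ExtractImageFromVideo class defined in the script below
ex = ExtractImageFromVideo('input.mp4', frame_range=(0, 50), debug=True)
for frame in ex.extract(path='frames_out', text='demo'):
    pass  # each frame is written to frames_out/<index>.jpg and also yielded here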
import cv2
import os
import numpy as np


class ExtractImageFromVideo(object):
    """Iterate over the frames of a single video, optionally within a frame range."""

    def __init__(self, path, frame_range=None, debug=False):
        assert os.path.exists(path)
        self._p = path
        self._vc = cv2.VideoCapture(self._p)
        self.size = int(self._vc.get(cv2.CAP_PROP_FRAME_WIDTH)), int(self._vc.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.fps = int(self._vc.get(cv2.CAP_PROP_FPS))
        self.total_frames = int(self._vc.get(cv2.CAP_PROP_FRAME_COUNT))
        self._start = 0
        self._count = self.total_frames
        self._debug = debug
        if frame_range is not None:
            self.set_frames_range(frame_range)
        if self._debug:
            print(f"video size (W x H): {self.size[0]} x {self.size[1]}")

    def __del__(self):
        self.release()

    def set_frames_range(self, frame_range=None):
        if frame_range is None:
            self._start = 0
            self._count = self.total_frames
        else:
            assert isinstance(frame_range, (list, tuple, range))
            if isinstance(frame_range, (list, tuple)):
                assert len(frame_range) == 2
            start, end = frame_range[0], frame_range[-1]
            if end is None \
                    or end == -1 \
                    or end >= self.total_frames:
                end = self.total_frames
            assert end >= start
            self._start = start
            self._count = end - start
            assert self._count <= self.total_frames
        self._vc.set(cv2.CAP_PROP_POS_FRAMES, self._start)

    def extract(self, path=None, bgr2rgb=False, target_size=None, text=None, text_position=None):
        if path is not None and not os.path.exists(path):
            os.makedirs(path)
        for i in range(0, self._count):
            success, frame = self._vc.read()
            if not success:
                print(f"read failed at frame {self._start + i}, stopping early.")
                break
            if self._debug:
                print(f"frame {self._start + i}")
            if path is not None:
                cv2.imwrite(os.path.join(path, f"{self._start + i}.jpg"), frame)
            if bgr2rgb:
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            if target_size is not None:
                assert isinstance(target_size, (list, tuple))
                assert len(target_size) == 2
                frame = cv2.resize(frame, tuple(target_size))
            if text is not None:
                if text_position is not None:
                    w_scale, h_scale = text_position
                else:
                    w_scale, h_scale = 1 / 10, 1 / 10
                # position the text relative to the current (possibly resized) frame
                pos = int(frame.shape[1] * w_scale), int(frame.shape[0] * h_scale)
                frame = cv2.putText(frame, text, pos, cv2.FONT_HERSHEY_DUPLEX, 1, (0, 0, 255), thickness=2)
            yield frame

    def release(self):
        if self._vc is not None:
            self._vc.release()


def create_image_grid(images, grid_size=None):
    """Tile a stacked array of images with shape (N, H, W[, C]) into a single grid image."""
    assert images.ndim == 3 or images.ndim == 4
    num, img_h, img_w = images.shape[0], images.shape[1], images.shape[2]
    if grid_size is not None:
        grid_h, grid_w = tuple(grid_size)
    else:
        grid_w = max(int(np.ceil(np.sqrt(num))), 1)
        grid_h = max((num - 1) // grid_w + 1, 1)
    grid = np.zeros([grid_h * img_h, grid_w * img_w] + list(images.shape[3:]), dtype=images.dtype)
    for idx in range(num):
        x = (idx % grid_w) * img_w
        y = (idx // grid_w) * img_h
        grid[y : y + img_h, x : x + img_w, ...] = images[idx]
    return grid


def merge_videos(videos_in, video_out, grid_size=None, titles=None, title_position=(0.5, 0.5), max_frames: int = None):
    """
    Args:
        videos_in: List/Tuple
            Paths of the input videos, e.g.
            ('path/to/v1.mp4', 'path/to/v2.mp4', 'path/to/v3.mp4')
        video_out: String
            Path of the output video, e.g.
            'path/to/output.mp4'
        grid_size: List/Tuple
            Number of rows and columns of the grid, e.g.
            (1, 3)
        titles: List/Tuple
            Title displayed on each video in the grid; must have the same
            length as videos_in, e.g.
            ('v1', 'v2', 'v3')
        title_position: List/Tuple
            Position (width scale, height scale) at which the title is drawn;
            each value lies in (0, 1). E.g. to display the text at the center
            of each video, use
            (0.5, 0.5)
        max_frames: Int
            Maximum number of frames to merge from each input video, e.g.
            200
    Returns:
        None
    """
    texts = titles
    if texts is None:
        texts = [None] * len(videos_in)
    assert len(videos_in) == len(texts)

    dir_name = os.path.dirname(video_out)
    if dir_name and not os.path.exists(dir_name):
        os.makedirs(dir_name, exist_ok=True)

    video_handles = []
    for v, text in zip(videos_in, texts):
        assert os.path.exists(v), f'{v} does not exist!'
        video_handles.append((ExtractImageFromVideo(v), text))

    # the shortest input (or max_frames, if smaller) bounds the output length
    least_frames = min(e.total_frames for e, _t in video_handles)
    if max_frames is not None:
        assert max_frames > 0
        least_frames = min(least_frames, max_frames)
    # assumes all inputs share the same frame size (W, H); otherwise the smallest is used
    least_size = min(e.size for e, _t in video_handles)

    generators = [e.extract(text=t, text_position=title_position) for e, t in video_handles]

    # read and resize one frame from each video to determine the output grid size
    cur_frames = np.array([cv2.resize(next(g), least_size) for g in generators])
    frames_grid = create_image_grid(cur_frames, grid_size=grid_size)  # HWC

    fps = video_handles[0][0].fps  # use the fps of the first video
    out_size = frames_grid.shape[0:2][::-1]  # HWC -> (W, H), as VideoWriter expects
    video_writer = cv2.VideoWriter(video_out,
                                   cv2.VideoWriter_fourcc(*'mp4v'),  # 'mp4v' suits the .mp4 container
                                   fps,
                                   out_size)

    for n in range(least_frames):
        if n % 100 == 0:
            print(f'{n}: {len(cur_frames)} frames merged into a grid of shape {frames_grid.shape}')
        video_writer.write(frames_grid)
        if n + 1 < least_frames:
            cur_frames = np.array([cv2.resize(next(g), least_size) for g in generators])
            frames_grid = create_image_grid(cur_frames, grid_size=grid_size)

    video_writer.release()
    print(f'Output video saved to {video_out}')


if __name__ == '__main__':
    videos_to_merge = [
        '1.mp4',
        '2.mp4',
        '3.mp4',
    ]
    titles = ['v1', 'v2', 'v3']
    merge_videos(
        videos_to_merge,
        'merged.mp4',
        grid_size=(2, 2),  # 2 rows, 2 cols
        titles=titles,
        title_position=(0.1, 0.5),  # text position is (0.1 * w, 0.5 * h)
        max_frames=100)  # merge the first 100 frames of each video
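
The grid layout can be sanity-checked without any video files by feeding create_image_grid a stack of dummy frames; the frame size and gray values below are made up for illustration:

# assumes the definitions above (numpy imported as np, create_image_grid)
# three hypothetical 120x160 BGR frames with distinct gray levels
frames = np.stack([np.full((120, 160, 3), v, dtype=np.uint8) for v in (64, 128, 192)])
grid = create_image_grid(frames, grid_size=(2, 2))  # 2 rows, 2 cols
print(grid.shape)  # (240, 320, 3); the unused bottom-right cell stays black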