Created
April 23, 2023 13:40
-
-
Save ericswpark/518ee8febcc7d114d46e46609d0aa506 to your computer and use it in GitHub Desktop.
Script to optimize movie files (4K or large file size)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import time | |
import subprocess | |
import argparse | |
# Set the path to the directory containing the movies | |
directory = 'Movies/' | |
# Name to append at the end of optimized files | |
ffmpeg_preset = "slow" | |
presets = [ | |
{ | |
"name": "h264", | |
"encoder_name": "libx264", | |
}, | |
{ | |
"name": "h265", | |
"encoder_name": "libx265", | |
} | |
] | |
selected_preset = 0 | |
reencoded_movies = 0 | |
processed_movies = 0 | |
# Set up argument parser | |
parser = argparse.ArgumentParser() | |
parser.add_argument("-d", "--dry-run", action="store_true", help="Perform a dry run without making any changes") | |
parser.add_argument("-l", "--limit", action="store", help="Number of movies to process before quitting") | |
parser.add_argument("-p", "--preset", action="store", help="Name or index of preset to use. Default is the first one") | |
args = parser.parse_args() | |
# Function to print elapsed time | |
def get_human_readable_elapsed_time(start, end): | |
elapsed = end - start | |
hour = int((end - start) / 60 / 60) | |
elapsed %= 60 * 60 | |
minute = int((end - start) / 60) | |
elapsed %= 60 | |
second = int(elapsed) | |
return f"{hour}h {minute}m {second}s" | |
# Function to check if movie is 4K | |
def is_movie_4k(filepath): | |
# Use ffprobe to extract the video's metadata | |
ffprobe_output = subprocess.check_output(['ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=width,height', '-of', 'csv=s=x:p=0', filepath]) | |
# Parse the output to extract the video's resolution | |
try: | |
items = ffprobe_output.strip().split(b'x') | |
width, height = map(int, [items[0], items[1]]) | |
except Exception as e: | |
print(f"Warning: got back {ffprobe_output.strip()} and I don't know what to do with this, bailing...") | |
raise e | |
# Check if the video is in 4K resolution and return results | |
return width >= 3840 and height >= 2160 | |
# Function to check if the movie is over 10 GB | |
def is_movie_over_10gb(filepath): | |
return os.path.getsize(filepath) >= 10 * 1024 * 1024 * 1024 | |
# Function to convert a movie to 1080p resolution in a universal format supported by most video players | |
def create_optimized_movie(filepath): | |
global reencoded_movies | |
reencoded_movies += 1 | |
# Split the file name and extension of the movie | |
basename = os.path.basename(filepath) | |
filename, ext = os.path.splitext(basename) | |
# Get the directory of the movie | |
directory = os.path.dirname(filepath) | |
# Create the optimized directory if it doesn't exist | |
optimized_directory = os.path.join(directory, 'Optimized') | |
# To create 1080p, add -vf scale=1920:1080 after {filepath} | |
command = ['ffmpeg', '-i', filepath, '-c:v', presets[selected_preset]["encoder_name"], '-preset', ffmpeg_preset, '-crf', '18', '-vf', 'format=yuv420p', '-c:a', 'aac', '-b:a', '160k', '-ac', '2', '-movflags', '+faststart', f"{optimized_directory}/{filename}-{presets[selected_preset]['name']}-{ffmpeg_preset}-Universal.mp4"] | |
# Check dry run parameter | |
if args.dry_run: | |
print(f"Dry run mode. Would've run the following command:\n\t{' '.join(command)}") | |
else: | |
# Create optimized directory if it doesn't exist | |
if not os.path.exists(optimized_directory): | |
os.mkdir(optimized_directory) | |
# Run command | |
try: | |
segment_start_time = time.time() | |
print(subprocess.check_output(command)) | |
segment_end_time = time.time() | |
print(f"Finished re-encoding movie {filename}. Elapsed time: {get_human_readable_elapsed_time(segment_start_time, segment_end_time)}") | |
except subprocess.CalledProcessError as err: | |
print(err) | |
print(f"An error occurred while re-encoding the movie {filename}") | |
exit(1) | |
# Function to recursively search for target movies in the specified directory | |
def search_for_target_movies(directory, recurse=True): | |
# Loop through all the files and directories in the directory | |
for index, name in enumerate(os.listdir(directory)): | |
# If limit is set, stop | |
if args.limit is not None and int(args.limit) - 1 < index: | |
print(f"Reached limit of {args.limit} movies, exiting...") | |
break | |
global processed_movies | |
processed_movies += 1 | |
# Skip directories named "Optimized/" | |
if name == 'Optimized': | |
continue | |
# Construct the full path to the file or directory | |
filepath = os.path.join(directory, name) | |
# Check if the file or directory is a 4K movie | |
if filepath.endswith('.mkv') or filepath.endswith('.mp4'): | |
# Get the directory of the movie | |
directory = os.path.dirname(filepath) | |
basename = os.path.basename(filepath) | |
# Get file name parts and extension | |
filename, ext = os.path.splitext(basename) | |
# Check if an optimized file already exists and skip it is | |
optimized_directory = os.path.join(directory, 'Optimized') | |
if os.path.exists(optimized_directory): | |
if os.path.exists(f"{optimized_directory}/{filename}-{presets[selected_preset]['name']}-{ffmpeg_preset}-Universal.mp4"): | |
print(f"The movie {name} is already optimized, skipping...") | |
continue | |
# Check if file meets standards and convert | |
if is_movie_4k(filepath) or is_movie_over_10gb(filepath): | |
print(f"Converting movie {name} as it meets the criteria...") | |
create_optimized_movie(filepath) | |
else: | |
print(f"Skipping movie {name} as it is not over 4K resolution or over 10 GB...") | |
else: | |
# If it's not a movie, and if we are currently recursing, check if it's a directory | |
if recurse and os.path.isdir(filepath): | |
# If it's a directory, search for movies in that directory, but do not recurse further | |
search_for_target_movies(filepath, recurse=False) | |
def main(): | |
global selected_preset | |
if args.preset is not None and int(args.preset) >= 0 and int(args.preset) <= len(presets) - 1: | |
selected_preset = int(args.preset) | |
# Search for target movies in the specified directory | |
total_start_time = time.time() | |
search_for_target_movies(directory, recurse=True) | |
total_end_time = time.time() | |
print(f"Finished! Total elapsed time: {get_human_readable_elapsed_time(total_start_time, total_end_time)}") | |
print(f"Re-encoded {reencoded_movies} out of a total of {processed_movies} movies.") | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment