Created
June 21, 2025 17:04
-
-
Save Udinic/f94bd7c828e205ce936443ce48d94f4c to your computer and use it in GitHub Desktop.
MKV video repackaging tool that keeps only the English audio and subtitle tracks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import subprocess | |
import re | |
import sys | |
import os | |
import argparse | |
from pathlib import Path | |
from dataclasses import dataclass | |
from typing import List, Optional | |
@dataclass
class Track:
    """A single track of an MKV file, as reported by ``mkvinfo``.

    ``track_type`` holds the value of mkvinfo's "Track type:" line (the
    helpers below compare it against "audio", "subtitles" and "video").
    ``language``, ``name`` and ``channels`` stay ``None`` when no value is
    supplied for them.
    """
    number: int
    track_id: int  # mkvmerge/mkvextract ID
    track_type: str
    codec: str
    language: Optional[str] = None
    name: Optional[str] = None
    channels: Optional[int] = None

    def is_audio(self) -> bool:
        """Return True for audio tracks."""
        return self.track_type == "audio"

    def is_subtitle(self) -> bool:
        """Return True for subtitle tracks."""
        return self.track_type == "subtitles"

    def is_video(self) -> bool:
        """Return True for video tracks."""
        return self.track_type == "video"

    def is_english(self) -> bool:
        """Return True when the language tag or the name marks the track as English.

        The language matches on "en", "eng" or a region-qualified "en-XX"
        tag, case-insensitively. A name containing "English" (any case)
        also matches, regardless of the language tag.
        """
        language = (self.language or "").lower()
        if language in ("en", "eng") or language.startswith("en-"):
            return True

        name = (self.name or "").lower()
        if "english" in name:
            return True

        # "SDH" says nothing about the language by itself, so it only counts
        # when the language tag is absent or undetermined; otherwise a track
        # tagged e.g. "spa" and named "Spanish SDH" would be kept as English.
        return "sdh" in name and language in ("", "und")

    def is_original_or_commentary(self) -> bool:
        """Return True when the track name is exactly "Original" or "Commentary"."""
        return self.name in ("Original", "Commentary")

    def is_unlabeled(self) -> bool:
        """Return True when the track has neither a language nor a name (likely English)."""
        return not self.language and not self.name

    def __str__(self):
        parts = [f"Track {self.number} (ID {self.track_id}): {self.track_type}"]
        if self.language:
            parts.append(f"lang={self.language}")
        if self.name:
            parts.append(f"name='{self.name}'")
        if self.channels:
            parts.append(f"channels={self.channels}")
        return " ".join(parts)
class MKVFile:
    """An MKV file together with the tracks ``mkvinfo`` reports for it.

    Constructing an instance runs ``mkvinfo`` on the file and fills
    ``self.tracks``. The remaining methods decide which audio/subtitle
    tracks to keep and drive ``mkvmerge`` to write a ``*_repack`` copy.
    """

    # e.g. "Track number: 2 (track ID for mkvmerge & mkvextract: 1)"
    _TRACK_NUMBER_RE = re.compile(
        r'Track number: (\d+) \(track ID for mkvmerge & mkvextract: (\d+)\)'
    )

    def __init__(self, filepath: str):
        self.filepath = Path(filepath)
        self.tracks: List["Track"] = []
        self._parse_mkvinfo()

    def _parse_mkvinfo(self):
        """Run mkvinfo on the file and parse its output into Track objects.

        Prints an error and exits the process with status 1 when mkvinfo
        fails or is not installed.
        """
        try:
            result = subprocess.run(['mkvinfo', str(self.filepath)],
                                    capture_output=True, text=True, check=True)
        except subprocess.CalledProcessError as e:
            print(f"β Error running mkvinfo on {self.filepath}: {e}")
            sys.exit(1)
        except FileNotFoundError:
            print("β Error: mkvinfo not found. Please install mkvtoolnix.")
            sys.exit(1)
        self._parse_tracks(result.stdout)

    def _get_indent_level(self, line: str) -> int:
        """Return the number of characters between the leading '|' and the first '+'.

        Returns -1 for lines that do not start with '|' or contain no '+'.
        """
        if not line.startswith('|'):
            return -1
        plus_pos = line.find('+')
        if plus_pos == -1:
            return -1
        return plus_pos - 1

    def _parse_tracks(self, mkvinfo_output: str):
        """Parse mkvinfo output into Track objects using indentation levels.

        Level 0 lines are section headers, level 1 "Track" lines start a new
        track, level 2 lines carry its properties and level 3 lines carry
        sub-properties such as the audio channel count. Parsed tracks are
        appended to ``self.tracks``.
        """
        # Track currently being filled in. None both before the first track
        # and between a "Track" header and its "Track number" line, so
        # nothing but real Track objects can ever reach self.tracks.
        current = None
        in_tracks_section = False

        for line in mkvinfo_output.split('\n'):
            level = self._get_indent_level(line)
            if level == -1:
                continue  # not an element line this parser understands

            if level == 0:
                if "Tracks" in line:
                    in_tracks_section = True
                    continue
                if in_tracks_section:
                    break  # next section header: the Tracks section is over
                continue

            if not in_tracks_section:
                continue

            if level == 1 and line.strip().endswith("Track"):
                if current is not None:
                    self.tracks.append(current)
                current = None  # properties are ignored until the number line
                continue

            if level == 2:
                number_match = self._TRACK_NUMBER_RE.search(line)
                if number_match:
                    if current is not None:
                        self.tracks.append(current)
                    current = Track(
                        number=int(number_match.group(1)),
                        track_id=int(number_match.group(2)),
                        track_type="unknown",
                        codec="unknown",
                    )
                    continue
                if current is None:
                    continue

                if "Track type:" in line:
                    current.track_type = line.split("Track type:")[-1].strip()
                elif "Codec ID:" in line:
                    current.codec = line.split("Codec ID:")[-1].strip()
                elif "Language:" in line and "Original language" not in line:
                    lang_match = re.search(r'Language[^:]*:\s*(\w+)', line)
                    if lang_match:
                        current.language = lang_match.group(1)
                elif "Name:" in line:
                    name_match = re.search(r'Name:\s*(.+)', line)
                    if name_match:
                        name = name_match.group(1).strip()
                        if name and name != "|":
                            current.name = name

            elif level == 3 and current is not None:
                if "Channels:" in line:
                    channels_match = re.search(r'Channels:\s*(\d+)', line)
                    if channels_match:
                        current.channels = int(channels_match.group(1))

        # Flush the last track whether the loop ended on a section header or
        # simply ran out of input while still inside the Tracks section.
        if current is not None:
            self.tracks.append(current)

    def get_audio_tracks(self) -> List["Track"]:
        """Get all audio tracks"""
        return [t for t in self.tracks if t.is_audio()]

    def get_subtitle_tracks(self) -> List["Track"]:
        """Get all subtitle tracks"""
        return [t for t in self.tracks if t.is_subtitle()]

    def get_desired_audio_tracks(self) -> List["Track"]:
        """Get the audio tracks to keep, in original order.

        Priority: tracks named Original/Commentary, then English tracks.
        When neither exists every audio track is kept, so a file is never
        stripped of all of its audio.
        """
        audio_tracks = self.get_audio_tracks()

        original_commentary = [t for t in audio_tracks if t.is_original_or_commentary()]
        if original_commentary:
            return original_commentary

        english_tracks = [t for t in audio_tracks if t.is_english()]
        if english_tracks:
            return english_tracks

        # Nothing matched. mkvmerge keeps all audio when --audio-tracks is
        # omitted, so report exactly that; returning [] here would make
        # needs_repackaging() request a remux that changes nothing.
        return audio_tracks

    def get_desired_subtitle_tracks(self) -> List["Track"]:
        """Get the subtitle tracks to keep (English or unlabeled), in original order."""
        return [t for t in self.get_subtitle_tracks()
                if t.is_english() or t.is_unlabeled()]

    def needs_repackaging(self) -> bool:
        """Return True when the desired track IDs differ from the existing ones."""
        audio_ids_all = [t.track_id for t in self.get_audio_tracks()]
        audio_ids_desired = [t.track_id for t in self.get_desired_audio_tracks()]
        sub_ids_all = [t.track_id for t in self.get_subtitle_tracks()]
        sub_ids_desired = [t.track_id for t in self.get_desired_subtitle_tracks()]
        return audio_ids_all != audio_ids_desired or sub_ids_all != sub_ids_desired

    def print_analysis(self):
        """Print which tracks were found and which ones will be kept."""
        print(f"π Inspecting: {self.filepath}", flush=True)
        audio_tracks = self.get_audio_tracks()
        desired_audio = self.get_desired_audio_tracks()
        subtitle_tracks = self.get_subtitle_tracks()
        desired_subs = self.get_desired_subtitle_tracks()

        # Show what we found and why
        for track in desired_audio:
            if track.is_english():
                print(f"π» Found English audio track: {track.track_id}", flush=True)
            elif track.is_original_or_commentary():
                print(f"π» Found {track.name} audio track: {track.track_id}", flush=True)

        # Special case: single audio track fallback
        if len(audio_tracks) == 1 and not any(
                t.is_english() or t.is_original_or_commentary() for t in audio_tracks):
            print(f"π» Only one audio track found - keeping it: {audio_tracks[0].track_id}", flush=True)

        # Special case: unlabeled audio tracks fallback
        if (len(desired_audio) > 1 and
                not any(t.is_english() or t.is_original_or_commentary() for t in desired_audio) and
                all(t.is_unlabeled() for t in desired_audio)):
            print(f"π» No labeled English tracks - keeping all unlabeled audio tracks: {' '.join(str(t.track_id) for t in desired_audio)}", flush=True)

        # Show unlabeled subtitle info (but avoid duplication)
        unlabeled_subs = [t for t in desired_subs if t.is_unlabeled()]
        if unlabeled_subs and not any(t.is_english() for t in desired_subs):
            print(f"β No English subtitles - keeping unlabeled subtitle tracks: {' '.join(str(t.track_id) for t in unlabeled_subs)}", flush=True)

        print(f"π§ All audio IDs: {' '.join(str(t.track_id) for t in audio_tracks)}", flush=True)
        print(f"π§ Desired audio IDs: {' '.join(str(t.track_id) for t in desired_audio)}", flush=True)
        print(f"π€ All subtitle IDs: {' '.join(str(t.track_id) for t in subtitle_tracks)}", flush=True)
        print(f"π€ Desired subtitle IDs: {' '.join(str(t.track_id) for t in desired_subs)}", flush=True)

    def repackage(self, dry_run: bool = False) -> bool:
        """Write a ``<stem>_repack<suffix>`` copy containing only the desired tracks.

        Args:
            dry_run: When True, only report the output path; nothing is written.

        Returns:
            True when a file was (or, in a dry run, would be) written;
            False when no repackaging is needed or mkvmerge failed.
        """
        if not self.needs_repackaging():
            print("β No repackaging needed.", flush=True)
            return False

        desired_audio = self.get_desired_audio_tracks()
        desired_subs = self.get_desired_subtitle_tracks()
        output_path = self.filepath.with_name(f"{self.filepath.stem}_repack{self.filepath.suffix}")

        if dry_run:
            print(f"π [DRY RUN] Would repackage as: {output_path}", flush=True)
            return True

        print(f"π Repackaging as: {output_path}", flush=True)

        # Build mkvmerge command; per-file options must precede the input file.
        cmd = ["mkvmerge", "-o", str(output_path)]
        if desired_audio:
            cmd.extend(["--audio-tracks", ",".join(str(t.track_id) for t in desired_audio)])
        if desired_subs:
            cmd.extend(["--subtitle-tracks", ",".join(str(t.track_id) for t in desired_subs)])
        elif self.get_subtitle_tracks():
            # Omitting --subtitle-tracks copies every subtitle track, so
            # dropping all of them has to be requested explicitly.
            cmd.append("--no-subtitles")
        cmd.append(str(self.filepath))

        result = subprocess.run(cmd, capture_output=True, text=True)

        # mkvmerge exit codes: 0 = success, 1 = finished with warnings (the
        # output file is still valid), 2 = error.
        if result.returncode in (0, 1):
            if result.returncode == 1:
                print(f"Warning: mkvmerge reported warnings:\n{result.stdout}", flush=True)
            print(f"β Successfully created: {output_path}", flush=True)
            return True

        print(f"β Failed to create: {output_path}", flush=True)
        # NOTE(review): assumes mkvmerge reports its messages on stdout, so
        # that stream is used whenever stderr is empty - confirm.
        print(f"Error: {result.stderr or result.stdout}", flush=True)
        # Remove a partial output file so that a later finalize step cannot
        # swap a truncated *_repack file over the original.
        try:
            if output_path.exists():
                output_path.unlink()
        except OSError:
            pass
        return False
def _join_unique(labels):
    """Join labels with ", ", dropping repeats while keeping first-seen order."""
    return ", ".join(dict.fromkeys(labels))


def _audio_label(track, changed, single):
    """Return the report label for a kept audio track."""
    if track.is_original_or_commentary():
        return track.name
    if track.is_english():
        return "English"
    if changed:
        return "Unlabeled/Undefined"
    if single:
        return "Single track (fallback)"
    if track.is_unlabeled():
        return "Unlabeled/Undefined (fallback)"
    return "Unknown"


def _subtitle_label(track, changed):
    """Return the report label for a kept subtitle track."""
    if track.is_english():
        return "English"
    if track.is_unlabeled():
        return "Unlabeled/Undefined" if changed else "Unlabeled/Undefined (fallback)"
    return "Other"


def scan_directory(directory: Path):
    """Report which MKV files under ``directory`` would change if repackaged.

    Searches recursively, skips ``*_repack.mkv`` files, and prints a per-file
    audio/subtitle summary only for files that need repackaging, followed by
    a total. Nothing is modified.
    """
    if not directory.is_dir():
        print(f"β Error: Directory '{directory}' does not exist", flush=True)
        return

    print(f"π Scanning directory: {directory}", flush=True)

    # Sorted so that the report order does not depend on the filesystem.
    mkv_files = sorted(f for f in directory.rglob("*.mkv")
                       if not f.name.endswith("_repack.mkv"))
    if not mkv_files:
        print(f"βΉοΈ No MKV files found in: {directory}", flush=True)
        print("---------------------------------------------", flush=True)
        return

    repack_candidates = 0
    for mkv_file in mkv_files:
        try:
            mkv = MKVFile(mkv_file)
            # Only show files that need repackaging
            if not mkv.needs_repackaging():
                continue
            repack_candidates += 1

            all_audio = mkv.get_audio_tracks()
            desired_audio = mkv.get_desired_audio_tracks()
            all_subs = mkv.get_subtitle_tracks()
            desired_subs = mkv.get_desired_subtitle_tracks()

            print(f"π {mkv_file.relative_to(directory)}", flush=True)

            # Audio analysis
            audio_changed = len(all_audio) != len(desired_audio)
            kept_desc = _join_unique(
                _audio_label(t, audio_changed, len(all_audio) == 1)
                for t in desired_audio
            )
            if audio_changed:
                print(f" π§ Audio: {len(all_audio)} tracks β {len(desired_audio)} tracks (keeping: {kept_desc or 'none'})", flush=True)
                print(f" Will remove: {len(all_audio) - len(desired_audio)} audio tracks", flush=True)
            elif desired_audio:
                print(f" β Audio: All {len(all_audio)} tracks will be kept ({kept_desc})", flush=True)
            else:
                print(f" β Audio: All {len(all_audio)} tracks will be kept", flush=True)

            # Subtitle analysis
            subs_changed = len(all_subs) != len(desired_subs)
            kept_desc = _join_unique(
                _subtitle_label(t, subs_changed) for t in desired_subs
            )
            if subs_changed:
                print(f" π€ Subtitles: {len(all_subs)} tracks β {len(desired_subs)} tracks (keeping: {kept_desc or 'none'})", flush=True)
                print(f" Will remove: {len(all_subs) - len(desired_subs)} subtitle tracks", flush=True)
            elif desired_subs:
                print(f" β Subtitles: All {len(all_subs)} tracks will be kept ({kept_desc})", flush=True)
            else:
                print(f" β Subtitles: All {len(all_subs)} tracks will be kept", flush=True)
            print(flush=True)
        except Exception as e:
            # Per-file boundary: report the problem and keep scanning.
            print(f"β Error scanning {mkv_file}: {e}", flush=True)

    if repack_candidates == 0:
        print("β No files need repackaging - all files are already optimized!", flush=True)
    else:
        print(f"π Found {repack_candidates} files that can be repackaged", flush=True)
    print("---------------------------------------------", flush=True)
def process_directory(directory: Path, dry_run: bool = False):
    """Analyse and repackage every MKV file found under ``directory``.

    Searches recursively and skips ``*_repack.mkv`` files. For each file the
    track analysis is printed and ``repackage`` is called.

    Args:
        directory: Directory to search.
        dry_run: Passed through to ``MKVFile.repackage``.
    """
    if not directory.is_dir():
        print(f"β Error: Directory '{directory}' does not exist", flush=True)
        return

    print(f"ποΈ Processing directory: {directory}", flush=True)

    # Sorted so that the processing order does not depend on the filesystem.
    mkv_files = sorted(f for f in directory.rglob("*.mkv")
                       if not f.name.endswith("_repack.mkv"))
    if not mkv_files:
        print(f"βΉοΈ No MKV files found in: {directory}", flush=True)
        print("---------------------------------------------", flush=True)
        return

    for mkv_file in mkv_files:
        try:
            mkv = MKVFile(mkv_file)
            mkv.print_analysis()
            mkv.repackage(dry_run=dry_run)
        except Exception as e:
            # Per-file boundary: report the problem and move on to the next file.
            print(f"β Error processing {mkv_file}: {e}", flush=True)
        print("---------------------------------------------", flush=True)
def finalize_files(directory: Path):
    """Replace original files with their ``_repack`` versions.

    Every ``*_repack.mkv`` file found recursively under ``directory`` is
    moved to the same name without the ``_repack`` part, overwriting the
    original when one exists.
    """
    if not directory.is_dir():
        print(f"β Error: Directory '{directory}' does not exist", flush=True)
        return

    print(f"π Finalizing _repack.mkv files in: {directory}", flush=True)

    repack_suffix = "_repack.mkv"
    repack_files = list(directory.rglob("*" + repack_suffix))
    if not repack_files:
        print(f"βΉοΈ No _repack.mkv files found in: {directory}", flush=True)
        return

    finalized_count = 0
    for repack_file in repack_files:
        try:
            # Strip only the trailing "_repack.mkv"; an identical sequence
            # earlier in the file name must be left alone.
            original_name = repack_file.name[:-len(repack_suffix)] + ".mkv"
            original_file = repack_file.parent / original_name

            if original_file.exists():
                print(f"π Replacing: {original_file.name} with {repack_file.name}", flush=True)
                # Path.replace overwrites the target in one step, so the
                # original is never deleted without the repack in its place.
                repack_file.replace(original_file)
                print(f"β Finalized: {original_file}", flush=True)
            else:
                # No original file found - just rename to remove _repack suffix
                print(f"βΉοΈ No original found for: {repack_file.name}", flush=True)
                print(f"π Renaming to remove _repack suffix", flush=True)
                repack_file.replace(original_file)
                print(f"β Renamed: {original_file}", flush=True)
            finalized_count += 1
        except Exception as e:
            print(f"β Failed to finalize {repack_file}: {e}", flush=True)

    print(f"β Finalized {finalized_count} files", flush=True)
def clean_repack_files(directory: Path):
    """Delete every ``*_repack.mkv`` file found recursively under ``directory``."""
    if not directory.is_dir():
        print(f"β Error: Directory '{directory}' does not exist", flush=True)
        return

    print(f"π§Ή Cleaning _repack.mkv files in: {directory}", flush=True)

    repack_files = list(directory.rglob("*_repack.mkv"))
    if not repack_files:
        print(f"βΉοΈ No _repack.mkv files found in: {directory}", flush=True)
        return

    # Count successes separately so the summary does not claim files that
    # could not be deleted.
    deleted_count = 0
    for repack_file in repack_files:
        try:
            repack_file.unlink()
        except Exception as e:
            print(f"β Failed to delete {repack_file}: {e}", flush=True)
        else:
            deleted_count += 1
            print(f"ποΈ Deleted: {repack_file}", flush=True)

    print(f"β Cleaned {deleted_count} _repack.mkv files", flush=True)
def main():
    """Parse the command line and run the selected mode on every given path.

    Modes are checked in the order --scan, --finalize, --clean; without any
    of them the directories are processed (optionally as a --dry run).
    Exits with status 2 when no path is given.
    """
    parser = argparse.ArgumentParser(
        description="Repackage MKV files to keep only desired English audio and subtitle tracks",
        epilog="""
Examples:
  %(prog)s --scan .                      # Scan for repackaging opportunities
  %(prog)s --dry .                       # Preview changes for current directory
  %(prog)s "Season 17"                   # Process Season 17 directory
  %(prog)s --dry "Season 17" "Movies"    # Preview multiple directories
  %(prog)s --clean .                     # Remove all _repack.mkv files
  %(prog)s --finalize .                  # Replace originals with _repack versions
""",
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument("paths", nargs="*",
                        help="Directories to process (searches recursively for .mkv files)")
    parser.add_argument("--scan", action="store_true",
                        help="Scan for repackaging opportunities - show only files that need changes")
    parser.add_argument("--dry", action="store_true",
                        help="Dry run mode - show what would be done without making changes")
    parser.add_argument("--clean", action="store_true",
                        help="Remove all *_repack.mkv files from specified directories")
    parser.add_argument("--finalize", action="store_true",
                        help="Replace original files with _repack versions")
    args = parser.parse_args()

    # Show help when no arguments provided
    if not args.paths:
        parser.print_help()
        print("\nβ Error: At least one directory path is required", flush=True)
        # A usage error must not look like success to the calling shell.
        sys.exit(2)

    directories = [Path(path_str) for path_str in args.paths]
    separator = "---------------------------------------------"

    if args.scan:
        print("π Scan mode - looking for repackaging opportunities", flush=True)
        print(flush=True)
        for directory in directories:
            scan_directory(directory)
        print("π Scan complete!", flush=True)
        return

    if args.finalize:
        print("π Finalize mode - replacing originals with _repack versions", flush=True)
        for directory in directories:
            finalize_files(directory)
            print(separator, flush=True)
        print("π Finalization complete!", flush=True)
        return

    if args.clean:
        print("π§Ή Clean mode - removing _repack.mkv files", flush=True)
        for directory in directories:
            clean_repack_files(directory)
            print(separator, flush=True)
        print("π Cleaning complete!", flush=True)
        return

    if args.dry:
        print("π Running in DRY RUN mode", flush=True)
        print(flush=True)
    for directory in directories:
        process_directory(directory, dry_run=args.dry)
    print("π Processing complete!", flush=True)
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment