Last active
December 5, 2023 11:36
-
-
Save rokibhasansagar/5ec201911892ea691e504fe4a283d586 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# Filename: zonefileInfuserZ.py | |
import argparse | |
import os | |
import json | |
import shutil | |
import time | |
import vapoursynth as vs | |
from vapoursynth import core | |
# VapourSynth core settings used for the whole run.
core.max_cache_size = 8 * 1024
core.num_threads = 4

# Metric implementations selectable through -m/--method.
VALID_METHODS = ['v2_zig', 'v2']

# Command-line interface
parser = argparse.ArgumentParser(
    prog='VapourSynth zonefile Infuser using SSIMULACRA2 score',
    description='Calculate SSIMULACRA2 score and Rebuild zonefile in scene json file.',
)
parser.add_argument(
    'source',
    help='Source video path. Can be relative to this script or a full path.',
)
parser.add_argument(
    'encoded',
    help='Encoded video path. Can be relative to this script or a full path.',
)
parser.add_argument(
    '-m', '--method',
    dest='method',
    choices=VALID_METHODS,
    default='v2_zig',
    help='SSIMULACRA method. Default: v2_zig.',
)
parser.add_argument(
    '-c', '--crf',
    dest='crf',
    type=int,
    default=30,
    help='CRF Value for 1st Encoded Video to use. Default: 30.',
)
parser.add_argument(
    '-w', '--width',
    dest='width',
    type=int,
    help='Video width to up/downscale to.',
)
# -h is taken by argparse's built-in help, hence -p for the height.
parser.add_argument(
    '-p', '--height',
    dest='height',
    type=int,
    help='Video height to up/downscale to.',
)
parser.add_argument(
    '-v', '--variability',
    dest='variability',
    type=int,
    default=4,
    help='variability from CRF Value for new zonefile. Default: 4.',
)
parser.add_argument(
    '-s', '--skip',
    dest='skip',
    type=int,
    default=1,
    help='Frames calculated every (n+1)th frame. Default: 1 (Every other frame is calculated). For example, setting this to 5 will calculate every 6th frame.',
)
args = parser.parse_args()

# Module-level copies of the parsed options.
# width and height have no default, so they are None when not given.
source: str = args.source
encoded: str = args.encoded
method: str = args.method
crf: int = args.crf
width: int = args.width
height: int = args.height
variability: int = args.variability
skip: int = args.skip
def getscenes():
    """Load ``sc.json`` from the current directory and return the parsed data.

    Every JSON ``null`` value is returned as ``0``. The substitution is done
    on the parsed structure rather than on the raw text, so strings and keys
    that merely contain the characters "null" are left untouched.

    Raises:
        OSError: if ``sc.json`` cannot be opened.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    def _nulls_to_zero(node):
        # Recursively swap None for 0 inside nested dicts and lists.
        if node is None:
            return 0
        if isinstance(node, dict):
            return {key: _nulls_to_zero(value) for key, value in node.items()}
        if isinstance(node, list):
            return [_nulls_to_zero(item) for item in node]
        return node

    with open("sc.json") as file:
        return _nulls_to_zero(json.load(file))
def ssimu2compute(sourcefile, distortedfile, skip = 1):
    """Score ``distortedfile`` against ``sourcefile`` with SSIMULACRA2.

    Reads the module-level ``method``, ``width`` and ``height`` settings.

    Args:
        sourcefile: path of the reference video.
        distortedfile: path of the encoded video to evaluate.
        skip: when greater than 0, only every ``skip + 1``-th frame is scored.

    Returns:
        A list of ``[frame_index, score]`` pairs. ``frame_index`` is the
        position among the scored frames multiplied by ``skip + 1``. Frames
        whose score is not greater than 0 are left out.
    """
    reference = core.lsmas.LWLibavSource(sourcefile, threads=0, cache=0)
    distorted = core.lsmas.LWLibavSource(distortedfile, threads=0, cache=0)

    step = skip + 1
    if skip > 0:
        reference = reference.std.SelectEvery(cycle=step, offsets=0)
        distorted = distorted.std.SelectEvery(cycle=step, offsets=0)

    if method not in ('v2', 'v2_zig'):
        # Invalid method
        print(f'Invalid method: {method}')
        exit(1)

    use_julek = method == 'v2'
    # 'v2' only works with RGB24, 'v2_zig' only works with RGBS.
    pixel_format = vs.RGB24 if use_julek else vs.RGBS
    reference = reference.resize.Bicubic(width=width, height=height, format=pixel_format, matrix_in_s='709')
    distorted = distorted.resize.Bicubic(width=width, height=height, format=pixel_format, matrix_in_s='709')

    if use_julek:
        measured = reference.julek.SSIMULACRA(distorted, feature=0)
    else:
        # Gamma to linear must go through fmtc because resize/zimg uses another formula.
        reference = reference.fmtc.transfer(transs="srgb", transd="linear", bits=32)
        distorted = distorted.fmtc.transfer(transs="srgb", transd="linear", bits=32)
        measured = reference.ssimulacra2.SSIMULACRA2(distorted)

    scored = []
    for position, frame in enumerate(measured.frames()):
        value = frame.props["_SSIMULACRA2"]
        if value > 0:
            scored.append([position * step, value])
    return scored
def statistics(intlist):
    """Summarise a non-empty sequence of numbers.

    Args:
        intlist: sized sequence of numeric values.

    Returns:
        A tuple ``(avg, deviation, median, 5th percentile, 95th percentile)``.
        ``deviation`` is the population standard deviation. The median and
        percentiles are elements picked from the sorted values at indices
        ``n//2``, ``n//20`` and ``19*n//20`` (no interpolation).

    Raises:
        ValueError: if ``intlist`` is empty.
    """
    count = len(intlist)
    if count == 0:
        raise ValueError("statistics() requires at least one value")
    avg = sum(intlist)/count
    # Two-pass variance: a sum of squares can never be negative, whereas
    # E[x^2] - avg^2 can dip below zero through rounding and then
    # ``** 0.5`` would produce a complex number.
    deviation = (sum((k - avg)**2 for k in intlist)/count)**0.5
    sortedlist = sorted(intlist)
    return (avg, deviation, sortedlist[count//2], sortedlist[count//20], sortedlist[19*count//20])
def cutssimu2byscene(scores, scenes):
    """Group SSIMULACRA2 scores by scene.

    Args:
        scores: list of ``[frame_index, score]`` entries in ascending frame order.
        scenes: scene dictionary whose ``"scenes"`` list holds items with
            ``"start_frame"`` and ``"end_frame"`` keys.

    Returns:
        One list of scores per scene, in scene order. A scene receives the
        scores whose frame index lies in ``[start_frame, end_frame)``. An
        entry that falls before the current scene's start is reported on
        stdout and dropped.
    """
    total = len(scores)
    print(f'len(scores) = {total}\n')
    grouped = []
    cursor = 0
    for scene in scenes["scenes"]:
        bucket = []
        grouped.append(bucket)
        # The cursor is shared across scenes: each entry is consumed once.
        while cursor < total:
            entry = scores[cursor]
            if not entry[0] < scene["end_frame"]:
                break
            if entry[0] >= scene["start_frame"]:
                bucket.append(entry[1])
            else:
                print("There is an error somewhere in scenes or in scores")
            cursor += 1
    return grouped
def generatezonefile(scores, oldcrf, variability = 8):
    """Derive a per-scene CRF from SSIMULACRA2 scores and store it.

    For every scene in ``sc.json`` the mean score is compared with the mean
    of all scores, expressed in units of the overall standard deviation, and
    normalised by the larger magnitude of the 1st/99th percentile of those
    per-scene deviations. The new CRF is::

        oldcrf + round(variability/4 + max(-variability, relative*variability))

    clamped to the range 0..63. One line per scene is appended to
    ``zonefile.txt`` and ``sc.json`` is rewritten with a ``zone_overrides``
    entry for every scene. When the module-level ``skip`` is 0 the per-scene
    scores are also appended to ``scorefile.txt``.

    Args:
        scores: list of ``[frame_index, score]`` entries.
        oldcrf: CRF value the scored encode was made with.
        variability: scale of the CRF adjustment around ``oldcrf``.

    Returns:
        None. Prints a message and returns early when ``sc.json`` is missing
        or ``scores`` is empty.
    """
    if not os.path.isfile("sc.json"):
        print("Error for generating zone file, no scene file")
        return
    if not scores:
        # Without scores the statistics below cannot be computed.
        print("Error for generating zone file, no SSIMULACRA2 scores")
        return

    scenes = getscenes()
    scenelist = scenes["scenes"]
    cutssimu2 = cutssimu2byscene(scores, scenes)
    # To see if scoring is working or not...
    print(f"\nMean SSIMU2 cut per-scene: {cutssimu2}")
    # Save the per-scene frame score in a file if every frame is measured
    if skip == 0:
        with open("scorefile.txt", "a") as scorefile:
            scorefile.write(f'{cutssimu2}\n')

    # (mean, standard deviation, median, 5th percentile, 95th percentile)
    oldstats = statistics([k[1] for k in scores])
    globalmean, globaldev = oldstats[0], oldstats[1]

    # A scene without any score falls back to the global mean.
    for sceneind, scenescores in enumerate(cutssimu2):
        if not scenescores:
            scenescores.append(globalmean)
            print(f"Warning: scene {sceneind}/{len(cutssimu2)} not having any ssimu2 value: ", scenelist[sceneind]["start_frame"], scenelist[sceneind]["end_frame"])

    means = [sum(scenescores)/len(scenescores) for scenescores in cutssimu2]
    # With zero overall deviation every scene sits exactly on the mean.
    devs = [(mean - globaldev*0 - globalmean)/globaldev if globaldev else 0.0 for mean in means]
    sdevs = sorted(devs)
    # Loop-invariant normaliser: larger magnitude of the 1st/99th percentile.
    spread = max(abs(sdevs[99*len(sdevs)//100]), abs(sdevs[len(sdevs)//100])) if sdevs else 0.0

    zonelines = []
    for scene, dev in zip(scenelist, devs):
        # A zero normaliser means no usable spread; leave the scene centred.
        relative = dev/spread if spread else 0.0
        offset = round(variability/4 + max(-variability, relative*variability))
        newcrf = str(min(63, max(0, oldcrf + offset)))
        zonelines.append(f'{scene["start_frame"]} {scene["end_frame"]} aom --cq-level={newcrf}\n')
        scene["zone_overrides"] = {"encoder":"aom","passes":2,"min_scene_len":42,"video_params":["--cq-level="+newcrf]}

    if zonelines:
        with open("zonefile.txt", "a") as zonefile:
            zonefile.writelines(zonelines)

    # Serialise before opening so a failure cannot truncate sc.json.
    res = json.dumps(scenes)
    with open("sc.json", "w") as file:
        file.write(res)
# Still unused | |
def resetzonefile():
    """Set every scene's ``zone_overrides`` in ``sc.json`` to JSON ``null``.

    The file is read through ``getscenes()`` and rewritten in place; all
    other fields are written back as ``getscenes()`` returned them.
    """
    scenes = getscenes()
    for scene in scenes["scenes"]:
        # None serialises to JSON null directly, so no text patching of the
        # output is needed and genuine "null" strings are never rewritten.
        scene["zone_overrides"] = None
    # Serialise before opening so a failure cannot truncate sc.json.
    res = json.dumps(scenes)
    with open("sc.json", "w") as file:
        file.write(res)
def operation(source = source, encoded = encoded, crf = crf, variability = "auto", skip = skip):
    """Score an encode against its source and generate per-scene zones.

    Args:
        source: path of the source video.
        encoded: path of the encoded video.
        crf: CRF value the encoded video was made with.
        variability: CRF adjustment scale handed to ``generatezonefile``, or
            ``"auto"`` to derive it as ``15*crf/30*deviation/8`` from the
            standard deviation of the scores.
        skip: frame skip handed to ``ssimu2compute``.

    Returns:
        None. Prints a message and returns early when either input file is
        missing or no score was produced. Otherwise the score statistics are
        appended to ``scoreresult.txt`` (one line per run) and
        ``generatezonefile`` is called.
    """
    # Both inputs are needed; check them before starting the computation.
    for path in (source, encoded):
        if not os.path.isfile(path):
            print("Couldnt find ", path)
            return

    scores1 = ssimu2compute(source, encoded, skip=skip)
    print("SSIMU2 computation done")
    if not scores1:
        # ssimu2compute drops non-positive scores, so the list can be empty.
        print("No usable SSIMULACRA2 scores were produced")
        return

    stats1 = statistics([k[1] for k in scores1])
    with open("scoreresult.txt", "a") as scoreresult:
        # The file is opened in append mode: terminate the line so that
        # results from successive runs do not run together.
        scoreresult.write(f'{stats1}\n')
    print(f"\nFirst scores: {stats1}\n")

    if variability == "auto":
        variability = 15*crf/30*stats1[1]/8
        print(f'variability is now {variability}\n')

    generatezonefile(scores1, crf, variability = variability)
    print("\nRun Second Pass Now")
# Script entry point: run the scoring and zone generation once, in place,
# with the values parsed from the command line.
# NOTE(review): variability is passed explicitly here (the -v option, an int
# with default 4), so operation()'s "auto" default is never used on this
# path - confirm this is intended.
operation(source = source, encoded = encoded, crf = crf, variability = variability, skip = skip)
# Finish
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment