Last active
June 1, 2021 13:31
-
-
Save peolic/e9048803a905f14b51e8d14479f820bd to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
import hashlib | |
import re | |
import sys | |
import traceback | |
from datetime import datetime, timedelta, timezone | |
from pathlib import Path | |
from typing import cast, Optional | |
import requests | |
# ``LegacyVersion`` was removed in packaging 22.0, and the copy vendored by
# setuptools may not provide it either, so both imports can fail.
try:
    from packaging.version import LegacyVersion
except ImportError:
    try:
        from setuptools._vendor.packaging.version import LegacyVersion
    except ImportError:
        # Degrade gracefully instead of crashing at import time: calling
        # ``None`` raises inside main()'s ``try/except Exception`` block,
        # which then falls back to comparing file timestamps.
        LegacyVersion = None  # type: ignore
# Directory that contains this script; both executables live next to it.
here = Path(__file__).parent.resolve()
# The installed executable that gets version-checked and replaced.
STASH_EXE: Path = here.joinpath('stash-win.exe')
# Temporary download target, moved over STASH_EXE once it is ready.
STASH_EXE_NEW: Path = here.joinpath('stash-win-new.exe')
def get_asset(tag_name: str, timeout: float = 30):
    """Look up the Windows executable asset of a stash release.

    Queries the GitHub releases API for the release tagged *tag_name*.

    Args:
        tag_name: Release tag to look up (e.g. ``'latest_develop'``).
        timeout: Seconds to wait for the API before giving up, so a stalled
            connection cannot hang the script forever.

    Returns:
        A tuple ``(asset, checksums_url, version)`` where *asset* is the API
        entry named ``stash-win.exe``, *checksums_url* is the download URL of
        the ``CHECKSUMS_SHA1`` asset (``None`` if the release has none) and
        *version* is the release name. ``(None, None, None)`` is returned
        when the tag or the executable asset cannot be found.
    """
    resp = requests.get(
        f'https://api.github.com/repos/stashapp/stash/releases/tags/{tag_name}',
        timeout=timeout,
    )
    data = resp.json()

    # Error responses carry no 'assets' key; also guard against a payload
    # that is not a JSON object at all.
    if not isinstance(data, dict) or 'assets' not in data:
        print(f'tag {tag_name!r} not found')
        return None, None, None

    assets = data['assets']

    asset = next((a for a in assets if a['name'] == 'stash-win.exe'), None)
    if asset is None:
        print('correct asset not found')
        return None, None, None

    checksums_url = next(
        (a['browser_download_url'] for a in assets if a['name'] == 'CHECKSUMS_SHA1'),
        None,
    )
    return asset, checksums_url, data['name']
def download_asset(asset: dict, filename: str, timeout: float = 30):
    """Stream a release asset to *filename*, printing progress to stdout.

    Args:
        asset: GitHub API asset entry; ``browser_download_url`` is required
            and ``size`` is used to compute the percentage.
        filename: Path of the file to (over)write with the download.
        timeout: Seconds to wait for the server (connect / between reads).

    Raises:
        requests.RequestException: On connection problems, timeouts or a
            non-success HTTP status.
    """
    url = asset['browser_download_url']
    # A missing or zero size must not crash the progress display with a
    # ZeroDivisionError; fall back to showing the raw byte count.
    size = asset.get('size') or 0
    cur_size = 0

    print('downloading asset... ', end='')
    with requests.get(url, stream=True, timeout=timeout) as r:
        r.raise_for_status()
        with open(filename, 'wb') as f:
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)
                cur_size += len(chunk)
                if size:
                    done = int((cur_size / size) * 100)
                    progress = f'{done}%'
                else:
                    progress = f'{cur_size} bytes'
                # '\r' rewinds to the line start so the progress updates in place.
                sys.stdout.write(f'\rdownloading asset... {progress}')
                sys.stdout.flush()

    sys.stdout.write('\n')
    sys.stdout.flush()
def human_timedelta(td: timedelta) -> str:
    """Format *td* as ``[-][<D>d ]<H>h <MM>m <SS>s``.

    The day part is only included when it is non-zero. Negative durations
    (e.g. caused by clock skew) are rendered as the absolute duration with a
    leading ``-`` instead of Python's normalized "-1 day + 23h..." form.
    Sub-second precision is discarded.
    """
    sign = ''
    if td < timedelta(0):
        sign = '-'
        td = -td

    mm, ss = divmod(td.seconds, 60)
    hh, mm = divmod(mm, 60)
    s = f'{hh}h {mm:02d}m {ss:02d}s'
    if td.days:
        s = f'{td.days}d {s}'
    return sign + s
def get_current_version(file_path: Path) -> str:
    """Extract the embedded version string from the executable.

    The version is stored as a NUL-delimited string such as
    ``v0.3.0`` or ``v0.3.0-80-gabf0281``. It is usually located around 75%
    of the way into the file, so that tail is searched first; if nothing is
    found there the whole file is scanned as a fallback.

    Args:
        file_path: Path of the executable to inspect.

    Returns:
        The version string, including the leading ``v``.

    Raises:
        ValueError: If no version string is found anywhere in the file.
    """
    version_pattern = re.compile(rb"""
        \0(v
            # base version
            \d+\.\d+\.\d+
            # dev & git hash
            (?:-\d+-g[a-f\d]+)?
        )\0
    """, flags=re.VERBOSE)

    size = file_path.stat().st_size
    skip_to = int(size * 0.75)

    with file_path.open('rb') as fh:
        # Fast path: jump ahead to where the version normally lives.
        fh.seek(skip_to)
        match = version_pattern.search(fh.read())

        if not match and skip_to:
            # The offset is only a heuristic; rescan from the beginning
            # (this also catches a match straddling the 75% boundary).
            fh.seek(0)
            match = version_pattern.search(fh.read())

    if not match:
        raise ValueError('Current version not found in file!')

    return match.group(1).decode('ascii')
def patch_disable_freeones(file_path: Path) -> bool:
    """Blank out part of the built-in Freeones scraper definition in place.

    Within the scraper definition (from ``name: Freeones`` to its
    ``# Last updated ...`` line), everything from ``performerByName:`` up to
    the line break preceding ``xPathScrapers:`` is overwritten with spaces
    (line feeds are kept) and a ``# builtin_freeones_disabled`` marker is
    written at the start of that region. The file size never changes.

    Args:
        file_path: Executable to patch; it is rewritten on success.

    Returns:
        ``True`` if the file was patched or was already patched, ``False``
        if anything went wrong (the traceback is printed, nothing is raised).
    """
    marker_text = b'# builtin_freeones_disabled'
    marker = marker_text + b'\n'

    try:
        data = bytearray(file_path.read_bytes())
        # Same object as ``data``; the cast only satisfies the type checker.
        haystack = cast(bytes, data)

        # Find scraper definition start and end
        scraper_start_match = re.compile(rb'\nname: Freeones', re.I).search(haystack)
        if scraper_start_match is None:
            raise ValueError('Could not find scraper')
        scraper_start = scraper_start_match.start()

        scraper_end_match = re.compile(rb'# Last updated .+\n', re.I).search(haystack, scraper_start)
        if scraper_end_match is None:
            raise ValueError('Could not find scraper')
        scraper_end = scraper_end_match.end()

        # Find the section to blank out, restricted to this scraper only.
        start_match = re.compile(rb'performerByName:\n').search(haystack, scraper_start, scraper_end)
        end_match = re.compile(rb'\nxPathScrapers:\n').search(haystack, scraper_start, scraper_end)
        if start_match is None or end_match is None:
            # Is it already patched?
            if marker_text in data[scraper_start:scraper_end]:
                print('Already patched: disabled builtin_freeones')
                return True
            raise ValueError('Could not find section')

        start = start_match.start()
        # Stop before the line feed that precedes 'xPathScrapers:'.
        end = end_match.start()

        # The marker overwrites the beginning of the blanked region, so the
        # region must be at least as long as the marker to keep the size.
        if end - start < len(marker):
            raise ValueError('Section is too small to hold the patch marker')

        # Replace with spaces (keep LF)
        new_data = bytearray(
            c if c == 0x0A else 0x20
            for c in data[start:end]
        )

        # Insert a marker
        new_data[:len(marker)] = marker

        orig_size = len(data)
        data[start:end] = new_data
        new_size = len(data)

        # Final safety net: a patched binary must keep its exact size.
        if new_size != orig_size:
            raise ValueError(f'Sizes differ! {new_size=} != {orig_size=}')

        file_path.write_bytes(bytes(data))
        print('patched: disabled builtin_freeones')

    except Exception:
        traceback.print_exc()
        print('=== /\\ === Patch error === /\\ ===')
        return False

    return True
def get_checksum(checksums_url: Optional[str], asset_name: str, timeout: float = 30) -> Optional[str]:
    """Fetch the published checksum for *asset_name*.

    The checksums file is expected to contain one ``<hash> <file name>``
    entry per line. This is a best-effort lookup: any failure is printed and
    reported as "no checksum" rather than raised.

    Args:
        checksums_url: URL of the checksums file, or ``None`` / empty when
            the release does not provide one.
        asset_name: File name of the asset whose checksum is wanted.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The checksum string from the matching line, or ``None`` if there is
        no URL, the download failed, or no line matches.
    """
    if not checksums_url:
        return None

    try:
        resp = requests.get(checksums_url, timeout=timeout)
        # Do not try to parse an HTML error page as a checksum list.
        resp.raise_for_status()

        for line in resp.text.splitlines():
            parts = line.split()
            if len(parts) < 2:
                continue
            # Tolerate the '*' binary-mode prefix and a directory prefix, but
            # require the file name itself to match exactly so that e.g.
            # 'other-stash-win.exe' is not mistaken for 'stash-win.exe'.
            listed_name = parts[-1].lstrip('*').replace('\\', '/').rsplit('/', 1)[-1]
            if listed_name == asset_name:
                return parts[0]
    except Exception as error:
        print(error)

    return None
def sha1_hash(path: Path) -> str:
    """Return the hexadecimal SHA-1 digest of the file at *path*.

    The file is read unbuffered into a single reusable 128 KiB buffer, so
    memory use stays constant regardless of the file size.
    """
    digest = hashlib.sha1()
    buffer = bytearray(128 * 1024)
    view = memoryview(buffer)

    with path.open('rb', buffering=0) as stream:
        while True:
            count = stream.readinto(view)
            # A read of zero bytes signals end of file.
            if count == 0:
                break
            digest.update(view[:count])

    return digest.hexdigest()
def main(args: Optional[list] = None) -> Optional[bool]:
    """Check for a newer stash build and, if there is one, install it.

    Args:
        args: Command-line arguments; the first one, if present, is the
            release tag to fetch (default ``'latest_develop'``). When
            omitted, ``sys.argv[1:]`` is read at call time.

    Returns:
        ``True`` when the executable is up to date or was replaced,
        ``False`` when the download failed checksum validation, and ``None``
        when the release asset could not be determined.

    Raises:
        Exception: Whatever ``download_asset`` raised, after reporting it.
    """
    # Resolve the default here rather than in the signature, where it would
    # be frozen to whatever sys.argv held when the function was defined.
    if args is None:
        args = sys.argv[1:]

    tag_name = args[0] if args else 'latest_develop'

    print(f'getting asset `{tag_name}`')
    asset, checksums_url, version_str = get_asset(tag_name)
    if not asset:
        print('Failed getting asset data')
        return

    if STASH_EXE.is_file():
        date_fmt = '%Y-%m-%d %H:%M:%S'
        now = datetime.now().astimezone().replace(microsecond=0)
        parsed_date_current = datetime.fromtimestamp(STASH_EXE.stat().st_mtime, tz=now.tzinfo).replace(microsecond=0)
        parsed_date_new = datetime.fromisoformat(asset["updated_at"].rstrip('Z')).replace(tzinfo=timezone.utc).astimezone()

        # Baseline decision: compare the asset timestamp with the local
        # file's modification time.
        has_new_version = parsed_date_new > parsed_date_current

        # Preferred decision: compare version strings. Any failure here
        # keeps the timestamp-based result computed above.
        try:
            current_version = get_current_version(STASH_EXE)
            asset_version = version_str.partition(':')[0]  # "v0.3.0-80-gabf0281: Latest development build"
            has_new_version = LegacyVersion(asset_version) > LegacyVersion(current_version)  # type: ignore
        except Exception:
            current_version = None
            asset_version = None

        version_check = current_version and asset_version

        if version_check:
            print()
            print(f'Current version: {current_version}')
            print(f'Remote version: {asset_version}')
            print()

        print(f'Asset last updated {human_timedelta(now - parsed_date_new)} ago (at {parsed_date_new.strftime(date_fmt)})')

        if has_new_version:
            if version_check:
                print(f'New version {asset_version} available (current: {current_version})')
            else:
                print(f'New version available (current: {parsed_date_current.strftime(date_fmt)})')
        else:
            if version_check:
                print(f'Up-to-date (current: {current_version})')
            else:
                print(f'Up-to-date (current: {parsed_date_current.strftime(date_fmt)})')
            return True

    try:
        download_asset(asset, str(STASH_EXE_NEW))
    except Exception:
        print('Failed downloading asset')
        raise

    checksum = get_checksum(checksums_url, asset['name'])
    if checksum:
        actual_checksum = sha1_hash(STASH_EXE_NEW)
        # Hex digests are case-insensitive; hashlib emits lowercase.
        if actual_checksum.lower() != checksum.lower():
            print('Asset failed checksum validation, not replacing executable!')
            return False
        print('Asset passed checksum validation.')
    else:
        print('Cannot verify checksum')

    print('applying patches...')
    # Best effort: the executable is replaced even if the patch fails
    # (patch_disable_freeones reports its own errors).
    patch_disable_freeones(STASH_EXE_NEW)

    print('replacing executable...')
    STASH_EXE_NEW.replace(STASH_EXE)
    return True
if __name__ == '__main__':
    # 'patch' mode: only (re)apply the patch to the installed executable.
    if 'patch' in sys.argv[1:]:
        patched = patch_disable_freeones(STASH_EXE)
        # sys.exit is always available, unlike the site-provided ``exit``;
        # report a failed patch through the exit status.
        sys.exit(0 if patched else 1)

    result = None
    try:
        result = main()
    finally:
        if result:
            sys.exit(0)
        # Something went wrong (or main() raised): wait for the user so the
        # messages can be read before the window closes.
        input('Press ENTER to continue...')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment