Last active
December 31, 2021 13:46
-
-
Save LiEnby/3c9c1053e4abe832865b2d23adaeccdd to your computer and use it in GitHub Desktop.
ps3 title update bruteforcer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/python3.9 | |
# PS3 Update Bruteforcer | |
# This script can find all updates on any given Bucket eg: (T2) and Bucket Secret eg: (f4f12ef10e3864dd) | |
# Created by SilicaAndPina | |
# Public Domain | |
import requests | |
import threading | |
import sys | |
import os | |
from lxml import etree | |
from threading import Thread | |
# Examples: | |
#http://b0.ww.np.dl.playstation.net/tppkg/np/NPUB31419/NPUB31419_T2/f4f12ef10e3864dd/UP4433-NPUB31419_00-MINECRAFTPS30295-A0102-V0100-PE.pkg | |
#http://b0.ww.np.dl.playstation.net/tppkg/np/BLES01976/BLES01976_T32/3619462fffd770bf/EP4433-BLES01976_00-MINECRAFTPS30363-A0115-V0100-PE.pkg | |
# ---------------------------------------------------------------------------
# Module-wide configuration and shared state.
#
# TID, BUCKET_ID, BUCKET_SECRET, CID, APP_VERSION, VERSION, FLAG, LATEST and
# LATEST_BUCKET are not defined here; read_url() creates them as module
# globals once a known-good URL has been parsed.
# ---------------------------------------------------------------------------

ENV = "np"  # np environment (np, sp-int, mgmt, etc)

UPDATECDN_DOMAIN = f"b0.ww.{ENV}.dl.playstation.net"  # Download CDN domain
UPDATEXMLCDN_DOMAIN = f"a0.ww.{ENV}.dl.playstation.net"  # Update-XML CDN domain

TPL_URL = f"https://{UPDATEXMLCDN_DOMAIN}/tpl/{ENV}/"  # tpL path
TPPKG_URL = f"http://{UPDATECDN_DOMAIN}/tppkg/{ENV}/"  # tpPKG path

LVER = "ver"  # titleid-ver.xml
EXT = ".pkg"  # PACKAGES File extension
EXTXML = ".xml"  # XML File extension

# Number of consecutive V#### values try_version() probes per app version.
MAX_TRY = 30

# Append-mode logs of every hit; closed by find_app_versions()/find_buckets().
VERSIONS_LOG = open("version.log", "a")
BUCKET_LOG = open("bucket.log", "a")

# Result lists filled in by the worker threads.
EXISTING_VERSIONS = []
EXISTING_BUCKETS = []

# One HTTP session shared by every request in the script.
s = requests.Session()

# Caps the number of worker threads in flight: a slot is acquired before each
# thread is started and released by the worker when it finishes.
sem = threading.BoundedSemaphore(1000)
# Read our known good URL and extract the various parts from it | |
def read_url(url):
    """Split a known-good title update URL into its parts.

    Nothing is returned; the pieces are stored in the module globals TID,
    BUCKET_ID, BUCKET_SECRET, CID, APP_VERSION, VERSION and FLAG. LATEST and
    LATEST_BUCKET are seeded with the app version and bucket id of the URL.

    The URL is split on "/" and the path components are read by index, so a
    URL with too few components raises IndexError.
    """
    global TID, BUCKET_ID, BUCKET_SECRET, CID
    global APP_VERSION, VERSION, FLAG
    global LATEST, LATEST_BUCKET

    parts = url.split("/")

    TID = parts[5]
    # Bucket folder looks like "<TID>_<bucket>"; keep the part after "_".
    BUCKET_ID = parts[6].split("_")[1]
    BUCKET_SECRET = parts[7]

    # File name layout: <36-char content id>-Axxxx-Vxxxx-<flag>.<ext>
    pkg_name = parts[8]
    CID = pkg_name[:36]
    APP_VERSION = pkg_name[37:42]
    VERSION = pkg_name[43:48]
    FLAG = pkg_name[49:].partition(".")[0]

    LATEST = APP_VERSION
    LATEST_BUCKET = BUCKET_ID
# Brute force all buckets by sending a request to all possibilities
# 403 = Folder exists | |
# 404 = Folder does not exist | |
def find_buckets():
    """Probe every bucket id from T0 up to LATEST_BUCKET, one thread each.

    A semaphore slot is taken before each worker starts; try_bucket() gives
    it back when it finishes. Once all workers have been joined the bucket
    log is closed.

    Returns the module-level EXISTING_BUCKETS list, which the workers fill
    with the URLs of the buckets that were found.
    """
    # LATEST_BUCKET looks like "T32"; strip the leading letter to get the number.
    highest_bucket = int(LATEST_BUCKET[1:])

    workers = [
        Thread(target=try_bucket, args=(number,))
        for number in range(highest_bucket + 1)
    ]

    for worker in workers:
        sem.acquire(True)
        worker.start()

    for worker in workers:
        worker.join()

    BUCKET_LOG.close()
    return EXISTING_BUCKETS
def try_bucket(bucket_id):
    """Worker: check whether bucket "T<bucket_id>" exists and record a hit.

    The request is retried until it completes without raising. Only the
    request is retried; recording the hit happens once, so a failing log
    write can no longer append the same URL over and over.

    bucket_id: bucket number (the digits after the "T").

    Always returns True. The semaphore slot taken by the caller before the
    thread was started is released on every exit path.
    """
    try:
        bucket_url = build_bucket_url("T" + str(bucket_id))

        while True:
            try:
                found = try_bucket_url(bucket_url)
                break
            except KeyboardInterrupt:
                # Hard exit; the original called os._quit, which does not exist.
                os._exit(1)
            except Exception:
                # Best effort: keep retrying the request until it goes through.
                continue

        if found:
            EXISTING_BUCKETS.append(bucket_url)
            BUCKET_LOG.write(bucket_url + "\r\n")
            BUCKET_LOG.flush()
        return True
    finally:
        sem.release()
def try_bucket_url(bucket_url):
    """Send a HEAD request for a bucket folder and report whether it exists.

    403 is treated as "folder exists" (True) and 404 as "folder does not
    exist" (False). Any other status is printed and also reported as False.
    """
    status = s.head(bucket_url).status_code
    if status not in (403, 404):
        print("Unknown status code: "+str(status))
    return status == 403
def try_pkg_url(pkg_url):
    """Send a HEAD request for a package URL and report whether it exists.

    200 means the package is there (True) and 404 means it is not (False).
    Any other status is printed and also reported as False.
    """
    status = s.head(pkg_url).status_code
    if status not in (200, 404):
        print("Unknown status code: "+str(status))
    return status == 200
def try_version(first_version, cur_app_version):
    """Worker: look for a package of app version "A<cur_app_version>".

    Probes MAX_TRY consecutive package versions, starting at
    "V<first_version>", and stops at the first one that exists. A hit is
    appended to EXISTING_VERSIONS and written to the versions log.

    first_version:   first V#### number to try.
    cur_app_version: A#### number this worker is responsible for.

    Returns True if a package was found, False otherwise. The semaphore slot
    taken by the caller before the thread was started is released exactly
    once, on every exit path.
    """
    try:
        app_tag = "A" + str(cur_app_version).zfill(4)

        for cur_version in range(first_version, first_version + MAX_TRY):
            try:
                # Named version_tag so the local no longer shadows this function.
                version_tag = "V" + str(cur_version).zfill(4)
                pkg_url = build_pkg_url(app_tag, version_tag)
                if try_pkg_url(pkg_url):
                    EXISTING_VERSIONS.append(pkg_url)
                    VERSIONS_LOG.write(pkg_url + "\r\n")
                    VERSIONS_LOG.flush()
                    return True
            except KeyboardInterrupt:
                # Hard exit; the original called os._quit, which does not exist.
                os._exit(1)
            except Exception:
                # Best effort: a failed probe is skipped, not retried.
                continue

        return False
    finally:
        sem.release()
# Bruteforce all app versions | |
# on any given bucket and bucket secret. | |
def find_app_versions():
    """Probe every app version from 100 up to LATEST, one thread each.

    A semaphore slot is taken before each worker starts; try_version() gives
    it back when it finishes. Once all workers have been joined the versions
    log is closed.

    LATEST must be an integer here (get_latest_version() provides one); the
    string that read_url() stores in it cannot be used as a range bound.

    Returns the module-level EXISTING_VERSIONS list, which the workers fill
    with the package URLs that were found.
    """
    first_version = 100  # Might be diff on other games

    workers = [
        Thread(target=try_version, args=(first_version, app_version))
        for app_version in range(first_version, LATEST + 1)
    ]

    for worker in workers:
        sem.acquire(True)
        worker.start()

    for worker in workers:
        worker.join()

    VERSIONS_LOG.close()
    return EXISTING_VERSIONS
def get_latest_version():
    """Download the title's update XML and read the newest version from it.

    Returns a tuple (latest_bucket, highest_version):
      latest_bucket   - the part after "_" in the "name" attribute of the
                        first /titlepatch/tag element.
      highest_version - the largest package "version" attribute with its
                        dots removed and parsed as an int, or 0 when no
                        package has a higher value.

    NOTE: the request is made with verify=False, so the server's TLS
    certificate is not checked.
    """
    response = s.get(build_updatexml_url(TID), verify=False)
    root = etree.fromstring(response.content)

    version_numbers = [
        int(node.attrib['version'].replace(".", ""))
        for node in root.xpath("/titlepatch/tag/package")
    ]
    # The leading 0 keeps the original floor of zero.
    newest = max([0, *version_numbers])

    first_tag = root.xpath("/titlepatch/tag")[0]
    return first_tag.attrib["name"].split("_")[1], newest
def build_updatexml_url(titleid):
    """Return the URL of the update XML ("<titleid>-ver.xml") for a title."""
    return "".join((TPL_URL, titleid, "/", titleid, "-", LVER, EXTXML))
def build_bucket_url(bucket_id):
    """Return the folder URL of bucket `bucket_id` (e.g. "T2") for title TID."""
    bucket_folder = TID + "_" + bucket_id
    return TPPKG_URL + TID + "/" + bucket_folder + "/"
def build_bucket_url_with_secret():
    """Return the URL of the known bucket's secret folder."""
    return "".join((build_bucket_url(BUCKET_ID), BUCKET_SECRET, "/"))
def build_pkg_url(app_version, version):
    """Return the package URL for the given "Axxxx" / "Vxxxx" tags.

    The file name is "<CID>-<app_version>-<version>-<FLAG><EXT>" inside the
    known bucket's secret folder.
    """
    stem = build_bucket_url_with_secret() + CID
    return "-".join((stem, app_version, version, FLAG)) + EXT
if __name__ == "__main__":
    # Script entry point: parse a known-good update URL, verify its bucket
    # secret, then enumerate app versions and buckets and print the results.
    #
    # Failures exit through sys.exit() rather than os._exit(): os._exit()
    # skips stdio flushing, so the message printed just before it could be
    # lost when stdout is not a terminal.

    # The URL comes from the first command-line argument, or from a prompt.
    if len(sys.argv) <= 1:
        known_good = input("Enter known Title Update URL: ")
    else:
        known_good = sys.argv[1]

    if not known_good.startswith(TPPKG_URL):
        print("Invalid URL.")
        sys.exit(1)

    # Silences urllib3 warnings (get_latest_version() requests with verify=False).
    requests.packages.urllib3.disable_warnings()

    print("Parsing URL...", end="", flush=True)
    try:
        read_url(known_good)
    except Exception:
        # Narrowed from a bare except so Ctrl-C / SystemExit are not swallowed.
        print("FAIL - Could not parse")
        sys.exit(1)
    print("OK")

    # A 403 on the secret folder is taken to mean the secret is valid.
    print("Testing Secret...", end="", flush=True)
    r = s.head(build_bucket_url_with_secret())
    if r.status_code == 403:
        print("OK")
    else:
        print("FAIL - Invalid secret")
        sys.exit(1)

    print("Env: "+ENV)
    print("Title ID: "+TID)
    print("Bucket ID: "+BUCKET_ID)
    print("Bucket Secret: "+BUCKET_SECRET)
    print("Content ID: "+CID)
    print("App Version: "+APP_VERSION)
    print("Version: "+VERSION)
    print("Flag: "+FLAG)

    # Replaces the values read_url() seeded: LATEST becomes an int here,
    # which find_app_versions() needs as its range bound.
    print("Finding latest version...", end="", flush=True)
    LATEST_BUCKET, LATEST = get_latest_version()
    print("OK (A"+str(LATEST).zfill(4)+", "+LATEST_BUCKET+")")

    print("Enumerating app versions...", end="", flush=True)
    versions = find_app_versions()
    print("OK")

    print("Enumerating buckets...", end="", flush=True)
    buckets = find_buckets()
    print("OK")

    print("-- RESULTS --")
    print("Found buckets: ")
    for bucket in buckets:
        print("\t"+bucket)
    print("Found versions: ")
    for version in versions:
        print("\t"+version)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment