Created
December 13, 2024 16:15
-
-
Save ymgve/b377f56534dfe5c7657853ac3381526c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cStringIO, struct, zipfile, base64, time, binascii, hashlib, urllib, urllib2, urllib3, requests | |
from steamlib import crypto | |
from steamlib.vdfparser import * | |
from steamlib.protobuf import * | |
try: | |
import psyco; psyco.full() | |
except ImportError: | |
pass | |
def readcstring(sio): | |
s = "" | |
while True: | |
c = sio.read(1) | |
if len(c) == 0: | |
return None | |
if c == "\0": | |
return s | |
s += c | |
class CDNFile(object): | |
def __init__(self): | |
pass | |
class CDNChunk(object): | |
def __init__(self): | |
pass | |
class CDNManifest(object): | |
def __init__(self, compressed_manifest, key = None): | |
self.key = key | |
sio = cStringIO.StringIO(compressed_manifest) | |
zf = zipfile.ZipFile(sio, "r") | |
zipnames = zf.namelist() | |
if len(zipnames) != 1 or zipnames[0] not in ("z", "zip"): | |
print "NEW ZIP NAMES", repr(zipnames) | |
manifest = zf.read(zipnames[0]) | |
zf.close() | |
sio.close() | |
sio = cStringIO.StringIO(manifest) | |
self.magic = sio.read(4) | |
if self.magic == "\x81\x97\x34\x16": | |
self.read_version4(sio) | |
elif self.magic == "\xd0\x17\xf6\x71": | |
self.read_version5(sio) | |
else: | |
open("WTFmanifest.bin", "wb").write(compressed_manifest) | |
raise Exception("BAD MAGIC") | |
def read_version5(self, sio): | |
payloadsize = struct.unpack("<L", sio.read(4))[0] | |
bin_payload = sio.read(payloadsize) | |
self.magic2 = sio.read(4) | |
if self.magic2 != "\xbe\x12\x48\x1f": | |
open("WTFmanifest.bin", "wb").write(compressed_manifest) | |
raise Exception("BAD proto MAGIC2") | |
metadatasize = struct.unpack("<L", sio.read(4))[0] | |
bin_metadata = sio.read(metadatasize) | |
self.magic3 = sio.read(4) | |
if self.magic3 != "\x17\xb8\x81\x1b": | |
open("WTFmanifest.bin", "wb").write(compressed_manifest) | |
raise Exception("BAD proto MAGIC3") | |
signaturesize = struct.unpack("<L", sio.read(4))[0] | |
bin_signature = sio.read(signaturesize) | |
self.magic4 = sio.read(4) | |
if self.magic4 != "\xab\x15\xc4\x32": | |
open("WTFmanifest.bin", "wb").write(compressed_manifest) | |
raise Exception("BAD proto MAGIC4") | |
if len(sio.read()) != 0: | |
raise Exception("Extra data!!") | |
proto = ProtoDeserializer(bin_metadata).deserialize() | |
self.version = 5 | |
self.depotid = proto[1] | |
self.manifestid = proto[2] | |
self.createtime = proto[3] | |
self.are_fnames_enc = proto[4] | |
self.uncompressedsize = proto[5] | |
self.compressedsize = proto[6] | |
self.chunkcount = proto[7] | |
self.encryptedcrc = proto[8] | |
self.decryptedcrc = proto[9] | |
self.files = [] | |
proto = ProtoDeserializer(bin_payload).deserialize(False, [1]) | |
for bin_filemapping in proto[1]: | |
file = CDNFile() | |
proto2 = ProtoDeserializer(bin_filemapping).deserialize(False, [6]) | |
file.filename = proto2[1] | |
if self.key: | |
file.filename = crypto.aes_decrypt_stream(self.key, base64.b64decode(file.filename)) | |
file.unpackedsize = proto2[2] | |
file.flags = proto2[3] | |
file.hashfilename = proto2[4] | |
file.hashcontent = proto2[5] | |
file.chunks = [] | |
if 6 not in proto2: | |
file.numchunks = 0 | |
else: | |
file.numchunks = len(proto2[6]) | |
for bin_chunkdata in proto2[6]: | |
chunk = CDNChunk() | |
proto3 = ProtoDeserializer(bin_chunkdata).deserialize() | |
chunk.chunkid = binascii.b2a_hex(proto3[1]) | |
chunk.chunkcrc = struct.pack("<L", proto3[2]) | |
chunk.offset = proto3[3] | |
chunk.unpackedsize = proto3[4] | |
chunk.packedsize = proto3[5] | |
file.chunks.append(chunk) | |
self.files.append(file) | |
def read_version4(self, sio): | |
self.version = struct.unpack("<L", sio.read(4))[0] | |
self.depotid = struct.unpack("<L", sio.read(4))[0] | |
self.manifestid = struct.unpack("<Q", sio.read(8))[0] | |
self.createtime = struct.unpack("<L", sio.read(4))[0] | |
self.are_fnames_enc = struct.unpack("<L", sio.read(4))[0] | |
self.uncompressedsize = struct.unpack("<Q", sio.read(8))[0] | |
self.compressedsize = struct.unpack("<Q", sio.read(8))[0] | |
self.chunkcount = struct.unpack("<L", sio.read(4))[0] | |
self.fileentrycount = struct.unpack("<L", sio.read(4))[0] | |
self.filemappingsize = struct.unpack("<L", sio.read(4))[0] | |
self.encryptedcrc = struct.unpack("<L", sio.read(4))[0] | |
self.decryptedcrc = struct.unpack("<L", sio.read(4))[0] | |
self.flags = struct.unpack("<L", sio.read(4))[0] | |
self.files = [] | |
for i in xrange(self.fileentrycount): | |
file = CDNFile() | |
file.filename = readcstring(sio) | |
if self.key: | |
file.filename = crypto.aes_decrypt_stream(self.key, base64.b64decode(file.filename)) | |
file.unpackedsize = struct.unpack("<Q", sio.read(8))[0] | |
file.flags = struct.unpack("<L", sio.read(4))[0] | |
file.hashcontent = binascii.b2a_hex(sio.read(20)) | |
file.hashfilename = binascii.b2a_hex(sio.read(20)) | |
file.numchunks = struct.unpack("<L", sio.read(4))[0] | |
file.chunks = [] | |
for i in xrange(file.numchunks): | |
chunk = CDNChunk() | |
chunk.chunkid = binascii.b2a_hex(sio.read(20)) | |
chunk.chunkcrc = sio.read(4) | |
chunk.offset = struct.unpack("<Q", sio.read(8))[0] | |
chunk.unpackedsize = struct.unpack("<L", sio.read(4))[0] | |
chunk.packedsize = struct.unpack("<L", sio.read(4))[0] | |
file.chunks.append(chunk) | |
self.files.append(file) | |
self.magic2 = sio.read(4) | |
if self.magic2 != "\x81\x97\x34\x16": | |
raise Exception("BAD MAGIC2") | |
class CDNDownloader(object): | |
def __init__(self, server, appticket): | |
self.server = server | |
self.appticket = appticket | |
self.session = requests.Session() | |
self.token = "?token=exp=1416061768~acl=/depot/731/*~hmac=f0351af0b9a3f84ce94fc7489699781750cab1f9" | |
print "NEW CDN TEST" | |
def get_manifest(self, depotid, manifestid, version = 4): | |
absolutepath = "/depot/%d/manifest/%d/%d%s" % (depotid, manifestid & 0xffffffffffffffff, version, self.token) | |
print absolutepath | |
res = self.session.get("http://" + self.server + absolutepath) | |
manifest = res.content | |
return manifest | |
def get_chunk(self, depotid, chunkid): | |
absolutepath = "/depot/%d/chunk/%s%s" % (depotid, chunkid, self.token) | |
url = "http://" + self.server + absolutepath | |
res = self.session.get(url) | |
chunk = res.content | |
return chunk |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment