Skip to content

Instantly share code, notes, and snippets.

@ymgve
Created February 3, 2025 19:06
Show Gist options
  • Save ymgve/77776476ac91814563ff56ae9c6c7ab0 to your computer and use it in GitHub Desktop.
Save ymgve/77776476ac91814563ff56ae9c6c7ab0 to your computer and use it in GitHub Desktop.
import io, struct
def decode_varint(bio):
    """Decode one base-128 varint from a binary stream.

    Bytes are consumed one at a time from ``bio``. The low seven bits of
    every byte are payload, least-significant group first; a byte whose
    high bit is clear terminates the value.

    Args:
        bio: Binary file-like object whose ``read(1)`` returns ``bytes``.

    Returns:
        The decoded non-negative integer.

    Raises:
        IndexError: If the stream ends before the terminating byte.
    """
    n = 0
    shift = 0
    while True:
        c = bio.read(1)
        if not c:
            # Indexing an empty read would raise a bare "index out of
            # range"; keep the exception type but say what went wrong.
            raise IndexError("truncated varint: unexpected end of data")
        byte = c[0]
        n |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return n
        shift += 7
def deserialize(b, packed=(), want_types=False):
    """Decode a protobuf-style wire message into a ``{tag: value}`` dict.

    Supported wire types are 0 (varint), 1 (8 bytes, little-endian
    unsigned), 2 (length-delimited bytes) and 5 (4 bytes, little-endian
    unsigned). Nested messages are not decoded; they are returned as raw
    ``bytes`` for the caller to deserialize again.

    Args:
        b: The encoded message as a bytes-like object.
        packed: Tags whose value must always come back as a list, even
            when the tag occurs only once in the message.
        want_types: If true, every value is returned as a
            ``(wire_type, value)`` tuple instead of the bare value.

    Returns:
        A dict keyed by tag number. A tag seen once maps to its value; a
        tag seen several times maps to a list of values in stream order.

    Raises:
        Exception: On an unknown wire type, or when a length-delimited
            field claims more bytes than remain in the message.
    """
    end = len(b)
    bio = io.BytesIO(b)
    res = {}

    while bio.tell() < end:
        n = decode_varint(bio)
        ftype = n & 0x7
        tag = n >> 3

        if ftype == 0:
            value = decode_varint(bio)
        elif ftype == 1:
            value = struct.unpack("<Q", bio.read(8))[0]
        elif ftype == 2:
            sz = decode_varint(bio)
            value = bio.read(sz)
            if len(value) != sz:
                # A short read means the message was cut off; returning
                # the partial payload would hide the corruption.
                raise Exception(
                    "truncated field", "tag", tag, "wanted", sz, "got", len(value)
                )
        elif ftype == 5:
            value = struct.unpack("<I", bio.read(4))[0]
        else:
            raise Exception("unknown ftype", ftype, "tag", tag)

        if want_types:
            value = (ftype, value)

        if tag in res:
            # Second and later occurrences turn the entry into a list.
            if not isinstance(res[tag], list):
                res[tag] = [res[tag]]
            res[tag].append(value)
        else:
            res[tag] = value

    # Normalise caller-designated tags so a single occurrence is a list too.
    for key in packed:
        if key in res and not isinstance(res[key], list):
            res[key] = [res[key]]

    return res
import base64, hashlib, io, struct, zipfile, zlib
from Crypto.Signature import pkcs1_15
from Crypto.Hash import SHA1
from Crypto.PublicKey import RSA
from steamlib import protobuf
# Hex-encoded public key blob; decoded and handed to RSA.import_key below.
# NOTE(review): presumably the key Steam signs depot manifests with — confirm the source.
public_key_hex = "30819D300D06092A864886F70D010101050003818B0030818702818100F0A35BF1883A651F14738FCB702E69795F310AE71F22F37B3912654A1F0E88CF4451E477A149D3EB3468606FD097D60FFC06C975B6F99EA5394D9790F7F513F7C99788E9EF65986EEDA40D624CAF963C7D4B368CBA4C58A620E2FF9B5BCDDD0F4C1C6BE13CB1F9A33B775A873ED34137A4CBDAA49E95644B8FDB4AF35BABAA37020111"
# Parsed key object; Steam3Manifest.read_version5 uses it for the PKCS#1 v1.5 signature check.
pubkey = RSA.import_key(bytes.fromhex(public_key_hex))
def readcstring(bio):
    """Read a NUL-terminated byte string from a binary stream.

    Args:
        bio: Binary file-like object whose ``read(1)`` returns ``bytes``.

    Returns:
        A ``bytearray`` with the bytes before the terminator. The
        terminating NUL is consumed but not included.

    Raises:
        Exception: If the stream ends before a NUL byte is found.
    """
    res = bytearray()
    while True:
        c = bio.read(1)
        if c == b"\x00":
            return res
        if not c:
            # At EOF read() keeps returning b""; without this check the
            # loop would spin forever on an unterminated string.
            raise Exception("not enough data")
        res += c
def read_with_sz(bio):
    """Read a length-prefixed blob from a binary stream.

    The prefix is a 4-byte little-endian unsigned integer giving the
    number of payload bytes that follow.

    Args:
        bio: Binary file-like object positioned at the length prefix.

    Returns:
        The payload as ``bytes``, exactly as long as the prefix states.

    Raises:
        Exception: If fewer payload bytes are available than announced.
    """
    (size,) = struct.unpack("<I", bio.read(4))
    payload = bio.read(size)
    if len(payload) == size:
        return payload
    raise Exception("not enough data")
class Steam3File:
    """Attribute bag describing one file entry of a manifest.

    Instances start out empty. Steam3Manifest assigns ``filename``,
    ``unpackedsize``, ``flags``, ``hashcontent``, ``hashfilename``,
    ``numchunks`` and ``chunks`` while parsing, plus ``linktarget`` when a
    version 5 entry carries that field.
    """

    def __init__(self):
        """Create an empty record; the parser fills in the attributes."""
class Steam3Chunk:
    """Attribute bag describing one chunk of a manifest file entry.

    Instances start out empty. Steam3Manifest assigns ``chunkid``,
    ``chunkcrc``, ``offset``, ``unpackedsize`` and ``packedsize`` while
    parsing.
    """

    def __init__(self):
        """Create an empty record; the parser fills in the attributes."""
class Steam3Manifest:
    """Parser for a depot manifest in binary layout version 4 or 5.

    The input may be the raw manifest or a zip archive holding exactly
    one member named ``z`` or ``zip``. After construction the instance
    exposes the header fields (``depotid``, ``manifestid``,
    ``createtime``, ``are_fnames_enc``, ``uncompressedsize``,
    ``compressedsize``, ``chunkcount``, ``encryptedcrc``,
    ``decryptedcrc``) and ``files``, a list of Steam3File objects each
    carrying a ``chunks`` list of Steam3Chunk objects.

    Args:
        manifdata_zipped: Manifest bytes, optionally zip-wrapped.
        key: Stored on the instance as ``self.key``; not used by the
            parsing code in this class.
        check_integrity: When true, CRCs, unknown protobuf tags and (for
            version 5) the RSA signature are verified.

    Raises:
        Exception: On any structural problem (bad magic, bad size, CRC
            mismatch, unexpected zip layout, trailing data, unknown tag).
        ValueError: From the signature verifier when a version 5
            signature does not match.
    """

    def __init__(self, manifdata_zipped, key=None, check_integrity=True):
        self.key = key
        self.check_integrity = check_integrity

        if manifdata_zipped[0:4] == b"PK\x03\x04":
            # Context managers release both objects even when the archive
            # is malformed or has an unexpected member list.
            with io.BytesIO(manifdata_zipped) as bio, zipfile.ZipFile(bio) as zf:
                zipnames = zf.namelist()
                if len(zipnames) != 1 or zipnames[0] not in ("z", "zip"):
                    raise Exception("NEW ZIP NAMES", zipnames)
                self.manifdata = zf.read(zipnames[0])
        else:
            self.manifdata = manifdata_zipped

        bio = io.BytesIO(self.manifdata)
        self.magic = bio.read(4)
        if self.magic == b"\x81\x97\x34\x16":
            self.version = 4
            self.read_version4(bio)
        elif self.magic == b"\xd0\x17\xf6\x71":
            self.version = 5
            self.read_version5(bio)
        else:
            raise Exception("BAD MAGIC", self.magic.hex())

    def _check_keys(self, proto, allowed):
        """Raise on a tag outside ``allowed`` when integrity checks are on."""
        if self.check_integrity:
            for key in proto:
                if key not in allowed:
                    raise Exception("unknown key", key)

    def read_version4(self, bio):
        """Parse a version 4 manifest; ``bio`` is positioned after the magic.

        Layout: a 64-byte fixed header, ``fileentrycount`` file records
        (NUL-terminated name, 56-byte record, then 40 bytes per chunk),
        and a closing copy of the 4-byte magic.
        """
        (
            self.version2,
            self.depotid,
            self.manifestid,
            self.createtime,
            self.are_fnames_enc,
            self.uncompressedsize,
            self.compressedsize,
            self.chunkcount,
            self.fileentrycount,
            self.filemappingsize,
            self.encryptedcrc,
            self.decryptedcrc,
            self.flags,
        ) = struct.unpack("<IIQIIQQIIIIII", bio.read(64))

        if self.version != self.version2:
            raise Exception("version mismatch", self.version, self.version2)

        if self.flags != 42:
            raise Exception("unexpected flags value", self.flags)

        # 72 = leading magic (4) + header (64) + trailing magic (4).
        if self.filemappingsize != len(self.manifdata) - 72:
            raise Exception("bad size")

        if self.check_integrity:
            # The CRC covers only the file mapping section between the
            # header and the trailing magic.
            if self.encryptedcrc != zlib.crc32(self.manifdata[68:-4]):
                raise Exception("CRC check failed")

        self.files = []
        for _ in range(self.fileentrycount):
            file = Steam3File()
            file.filename = readcstring(bio)
            (
                file.unpackedsize,
                file.flags,
                file.hashcontent,
                file.hashfilename,
                file.numchunks,
            ) = struct.unpack("<QI20s20sI", bio.read(56))

            file.chunks = []
            for _ in range(file.numchunks):
                chunk = Steam3Chunk()
                (
                    chunk.chunkid,
                    chunk.chunkcrc,
                    chunk.offset,
                    chunk.unpackedsize,
                    chunk.packedsize,
                ) = struct.unpack("<20sIQII", bio.read(40))
                file.chunks.append(chunk)

            self.files.append(file)

        self.magic2 = bio.read(4)
        if self.magic2 != b"\x81\x97\x34\x16":
            raise Exception("BAD MAGIC2")

        res = bio.read()
        if res != b"":
            raise Exception("DATA AT END OF MANIFES")

    def read_version5(self, bio):
        """Parse a version 5 manifest; ``bio`` is positioned after the magic.

        Layout: three length-prefixed protobuf sections (file data,
        metadata, signature), each followed or preceded by its own 4-byte
        magic, and a final end magic with nothing after it.
        """
        bin_filedata = read_with_sz(bio)
        if self.check_integrity:
            # CRC spans the size prefix and payload of the file data section.
            filedatacrc = zlib.crc32(self.manifdata[4:bio.tell()])

        self.magic2 = bio.read(4)
        if self.magic2 != b"\xbe\x12\x48\x1f":
            raise Exception("BAD proto MAGIC2")

        bin_metadata = read_with_sz(bio)
        if self.check_integrity:
            # Everything up to the end of the metadata section is signed.
            signeddata = self.manifdata[:bio.tell()]

        self.magic3 = bio.read(4)
        if self.magic3 != b"\x17\xb8\x81\x1b":
            raise Exception("BAD proto MAGIC3")

        bin_signature = read_with_sz(bio)

        self.magic4 = bio.read(4)
        if self.magic4 != b"\xab\x15\xc4\x32":
            raise Exception("BAD proto MAGIC4")

        res = bio.read()
        if res != b"":
            raise Exception("DATA AT END OF MANIFES")

        proto = protobuf.deserialize(bin_metadata)
        self._check_keys(proto, (1, 2, 3, 4, 5, 6, 7, 8, 9))

        self.version = 5
        self.depotid = proto[1]
        self.manifestid = proto[2]
        self.createtime = proto[3]
        self.are_fnames_enc = proto[4]
        self.uncompressedsize = proto[5]
        self.compressedsize = proto[6]
        self.chunkcount = proto[7]
        self.encryptedcrc = proto[8]
        self.decryptedcrc = proto[9]

        self.files = []

        proto = protobuf.deserialize(bin_filedata, [1])
        self._check_keys(proto, (1,))

        # A manifest without any file mapping has no tag 1 at all; treat
        # that as an empty file list instead of failing with KeyError.
        for bin_filemapping in proto.get(1, []):
            proto2 = protobuf.deserialize(bin_filemapping, [6])
            self._check_keys(proto2, (1, 2, 3, 4, 5, 6, 7))

            file = Steam3File()
            file.filename = proto2[1]
            file.unpackedsize = proto2[2]
            file.flags = proto2[3]
            file.hashfilename = proto2[4]
            file.hashcontent = proto2[5]

            # linux symlinks?
            if 7 in proto2:
                file.linktarget = proto2[7]

            file.chunks = []
            if 6 not in proto2:
                file.numchunks = 0
            else:
                file.numchunks = len(proto2[6])
                for bin_chunkdata in proto2[6]:
                    proto3 = protobuf.deserialize(bin_chunkdata)
                    self._check_keys(proto3, (1, 2, 3, 4, 5))

                    chunk = Steam3Chunk()
                    chunk.chunkid = proto3[1]
                    chunk.chunkcrc = struct.pack("<I", proto3[2])
                    chunk.offset = proto3[3]
                    chunk.unpackedsize = proto3[4]
                    chunk.packedsize = proto3[5]
                    file.chunks.append(chunk)

            self.files.append(file)

        if self.check_integrity:
            proto = protobuf.deserialize(bin_signature)
            self._check_keys(proto, (1,))
            signature = proto.get(1)

            if self.encryptedcrc != filedatacrc:
                raise Exception("CRC check failed")

            if signature is None:
                # Previously this fell through to a NameError at verify().
                raise Exception("missing signature")

            h = SHA1.new(signeddata)
            pkcs1_15.new(pubkey).verify(h, signature)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment