Skip to content

Instantly share code, notes, and snippets.

@starius
Created March 30, 2026 03:06
Show Gist options
  • Select an option

  • Save starius/9b0b87dc040f4d21a58fa5eda3d87e34 to your computer and use it in GitHub Desktop.

Select an option

Save starius/9b0b87dc040f4d21a58fa5eda3d87e34 to your computer and use it in GitHub Desktop.
Extract api_id and api_hash from Telegram Desktop binary
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import re
import struct
import sys
from collections import Counter
from dataclasses import asdict, dataclass
from pathlib import Path
# 32 lowercase hex digits with no hex digit on either side — the shape of a
# Telegram api_hash literal embedded in .rodata.
HEX32_RE = re.compile(rb"(?<![0-9a-f])[0-9a-f]{32}(?![0-9a-f])")
# Path of a nix "wrapped" binary (/nix/store/<hash>-pkg/bin/.<name>-wrapped)
# as it appears inside a wrapper executable.
NIX_WRAPPED_RE = re.compile(
    rb"/nix/store/[0-9a-z]{32}-[^\x00\s\"']+/bin/\.[^\x00/\s\"']+-wrapped"
)
@dataclass
class Section:
    """One ELF section header: name plus load address and file extent."""

    name: str    # section name resolved from .shstrtab (e.g. ".rodata")
    addr: int    # sh_addr: virtual address at load time
    offset: int  # sh_offset: byte offset of the section contents in the file
    size: int    # sh_size: section size in bytes
@dataclass
class ApiIdEvidence:
    """A plausible api_id immediate found near a hash cross-reference."""

    api_id: int       # the immediate value decoded from a mov instruction
    api_id_va: int    # virtual address of that mov instruction
    instruction: str  # human-readable description of the mov encoding
@dataclass
class HashReference:
    """A code reference (lea) to a hash string, with optional api_id evidence."""

    hash_xref_va: int                # VA of the lea that loads the hash address
    api_id: int | None = None        # nearby api_id immediate, if one was found
    api_id_va: int | None = None     # VA of the mov carrying that immediate
    instruction: str | None = None   # encoding description of that mov
@dataclass
class Candidate:
    """A 32-hex string in .rodata together with all code references to it."""

    api_hash: str  # the candidate hash text
    hash_va: int   # virtual address of the string in .rodata
    references: list[HashReference]  # every lea xref to it found in .text
@dataclass
class Extraction:
    """Final extraction result for one input binary."""

    input_path: str             # path exactly as given on the command line
    analyzed_path: str          # path actually analyzed (may be the wrap target)
    wrapped_target: str | None  # nix-wrapped target binary, when one was followed
    api_hash: str               # winning api_hash string
    api_id: int                 # most common api_id among the winner's xrefs
    hash_va: int                # VA of the winning hash string
    references: list[HashReference]  # xrefs supporting the winner
    candidates: list[Candidate]      # all scored candidates, best first
class Elf64Little:
    """Minimal read-only parser for 64-bit little-endian ELF files.

    Loads the whole file into memory and exposes section lookup by name.
    Raises ValueError when the file is not a 64-bit LE ELF.
    """

    def __init__(self, path: Path) -> None:
        self.path = path
        self.data = path.read_bytes()
        # Magic, then EI_CLASS == ELFCLASS64 and EI_DATA == ELFDATA2LSB.
        if self.data[:4] != b"\x7fELF":
            raise ValueError(f"{path} is not an ELF file")
        if self.data[4] != 2 or self.data[5] != 1:
            raise ValueError(f"{path} is not a 64-bit little-endian ELF")
        self._sections = self._read_sections()

    def section(self, name: str) -> Section:
        """Return the first section named *name*; raise ValueError if absent."""
        found = next((s for s in self._sections if s.name == name), None)
        if found is None:
            raise ValueError(f"{self.path} has no {name} section")
        return found

    def section_bytes(self, section: Section) -> bytes:
        """Return the raw file bytes backing *section*."""
        begin = section.offset
        return self.data[begin:begin + section.size]

    def _read_sections(self) -> list[Section]:
        """Parse the section header table into Section records."""
        # e_shoff / e_shentsize / e_shnum / e_shstrndx fields of the ELF header.
        table_offset = struct.unpack_from("<Q", self.data, 0x28)[0]
        entry_size = struct.unpack_from("<H", self.data, 0x3A)[0]
        entry_count = struct.unpack_from("<H", self.data, 0x3C)[0]
        strtab_index = struct.unpack_from("<H", self.data, 0x3E)[0]
        headers = [
            struct.unpack_from("<IIQQQQIIQQ", self.data, table_offset + i * entry_size)
            for i in range(entry_count)
        ]
        # The section-name string table is itself one of the sections.
        strtab = headers[strtab_index]
        name_blob = self.data[strtab[4]:strtab[4] + strtab[5]]
        parsed = []
        for header in headers:
            begin = header[0]
            terminator = name_blob.find(b"\x00", begin)
            parsed.append(Section(
                name=name_blob[begin:terminator].decode(),
                addr=header[3],
                offset=header[4],
                size=header[5],
            ))
        return parsed
def is_elf(path: Path) -> bool:
    """True when the file at *path* begins with the ELF magic; False on I/O errors."""
    try:
        with path.open("rb") as stream:
            magic = stream.read(4)
    except OSError:
        return False
    return magic == b"\x7fELF"
def find_nix_wrapped_target(path: Path) -> Path | None:
    """Resolve a nix wrapper executable to its '.<name>-wrapped' target.

    Scans the raw bytes of *path* for an embedded /nix/store wrapped-binary
    path and returns it as a Path when that target exists on disk; returns
    None on read errors, no match, or a missing target.
    """
    wrapped_pattern = re.compile(
        rb"/nix/store/[0-9a-z]{32}-[^\x00\s\"']+/bin/\.[^\x00/\s\"']+-wrapped"
    )
    try:
        blob = path.read_bytes()
    except OSError:
        return None
    hit = wrapped_pattern.search(blob)
    if hit is None:
        return None
    candidate = Path(hit.group().decode())
    if candidate.exists():
        return candidate
    return None
def scan_hashes(rodata: Section, rodata_bytes: bytes) -> list[tuple[int, str]]:
    """Find candidate 32-hex-digit api_hash strings in a .rodata blob.

    Returns (virtual_address, hash_text) pairs; each match offset is rebased
    onto the section's load address.
    """
    hex32 = re.compile(rb"(?<![0-9a-f])[0-9a-f]{32}(?![0-9a-f])")
    return [
        (rodata.addr + found.start(), found.group().decode())
        for found in hex32.finditer(rodata_bytes)
    ]
def iter_rip_relative_lea_targets(
    text: Section,
    text_bytes: bytes,
    candidate_addresses: set[int],
) -> list[tuple[int, int]]:
    """Scan .text for 7-byte `lea r64, [rip+disp32]` hitting candidate VAs.

    Returns (instruction_va, target_va) pairs for every REX-prefixed lea
    whose RIP-relative target lands in *candidate_addresses*.
    """
    hits = []
    last_start = len(text_bytes) - 7
    position = 0
    while position < last_start:
        prefix = text_bytes[position]
        # REX.W prefix (0x48-0x4F), opcode 0x8D (lea), ModRM with
        # mod == 00 and rm == 101 -> RIP-relative addressing.
        if (
            0x48 <= prefix <= 0x4F
            and text_bytes[position + 1] == 0x8D
            and (text_bytes[position + 2] & 0xC7) == 0x05
        ):
            disp = struct.unpack_from("<i", text_bytes, position + 3)[0]
            insn_va = text.addr + position
            # RIP has advanced past the 7-byte instruction when applied.
            destination = insn_va + 7 + disp
            if destination in candidate_addresses:
                hits.append((insn_va, destination))
        position += 1
    return hits
def c7_instruction_length(blob: bytes, offset: int) -> int | None:
    """Length in bytes of a `C7 /0` (mov r/m32, imm32) instruction at *offset*.

    Decodes the ModRM byte (plus SIB and displacement where present) and adds
    the trailing 4-byte immediate.  Returns None when the ModRM reg field is
    not /0 or the buffer is too short for the full instruction.
    """
    if len(blob) < offset + 2:
        return None
    modrm = blob[offset + 1]
    # Only the /0 form of opcode C7 is mov with imm32.
    if (modrm >> 3) & 0x07:
        return None
    mod = modrm >> 6
    rm = modrm & 0x07
    position = offset + 2
    # A SIB byte follows whenever rm == 4 in a memory-addressing mode.
    if rm == 4 and mod != 3:
        if position >= len(blob):
            return None
        sib = blob[position]
        position += 1
        # base == 5 with mod == 0 means a disp32 follows the SIB byte.
        if mod == 0 and (sib & 0x07) == 5:
            position += 4
    if mod == 0:
        if rm == 5:  # RIP-relative: disp32
            position += 4
    elif mod == 1:   # disp8
        position += 1
    elif mod == 2:   # disp32
        position += 4
    if len(blob) < position + 4:
        return None
    return position + 4 - offset
def plausible_api_id(value: int) -> bool:
    """Heuristic range check for a Telegram api_id immediate."""
    if value < 1000:
        return False
    return value <= 100_000_000
def first_api_id_after_xref(
    text: Section,
    text_bytes: bytes,
    xref_va: int,
    window: int = 0x120,
) -> ApiIdEvidence | None:
    """Scan forward from *xref_va* for the first mov of a plausible api_id.

    Walks up to *window* bytes of .text after the hash cross-reference,
    decoding `mov r32, imm32` (B8+r), `mov r/m32, imm32` (C7 /0) and their
    REX-prefixed forms, and returns the first immediate accepted by
    plausible_api_id().  Unrecognized bytes are skipped one at a time, so
    this is a heuristic scan, not a full disassembly.

    Fix: REX.W (0x48-0x4F) followed by B8+r is a 10-byte
    `movabs r64, imm64`, not a 6-byte imm32 mov; the old code misread the
    immediate and desynchronized the scan on such instructions.
    """
    start = xref_va - text.addr
    end = min(len(text_bytes), start + window)
    cursor = start
    while cursor < end:
        opcode = text_bytes[cursor]
        # mov r32, imm32 (B8+r): 1 opcode byte + 4 immediate bytes.
        if 0xB8 <= opcode <= 0xBF and cursor + 5 <= end:
            value = struct.unpack_from("<I", text_bytes, cursor + 1)[0]
            if plausible_api_id(value):
                return ApiIdEvidence(
                    api_id=value,
                    api_id_va=text.addr + cursor,
                    instruction="mov r32, imm32",
                )
            cursor += 5
            continue
        # REX prefix (40-4F) followed by B8+r.
        if (
            0x40 <= opcode <= 0x4F
            and cursor + 1 < end
            and 0xB8 <= text_bytes[cursor + 1] <= 0xBF
        ):
            if opcode & 0x08 and cursor + 10 <= end:
                # REX.W set: movabs r64, imm64 (10 bytes total).
                value = struct.unpack_from("<Q", text_bytes, cursor + 2)[0]
                if plausible_api_id(value):
                    return ApiIdEvidence(
                        api_id=value,
                        api_id_va=text.addr + cursor,
                        instruction="mov r64, imm64 (rex.w)",
                    )
                cursor += 10
                continue
            if not opcode & 0x08 and cursor + 6 <= end:
                # REX without W: still a 32-bit immediate.
                value = struct.unpack_from("<I", text_bytes, cursor + 2)[0]
                if plausible_api_id(value):
                    return ApiIdEvidence(
                        api_id=value,
                        api_id_va=text.addr + cursor,
                        instruction="mov r32, imm32 (rex)",
                    )
                cursor += 6
                continue
        # mov r/m32, imm32 (C7 /0), length depends on ModRM/SIB/disp.
        if opcode == 0xC7 and cursor + 6 <= end:
            length = c7_instruction_length(text_bytes, cursor)
            if length and cursor + length <= end:
                value = struct.unpack_from("<I", text_bytes, cursor + length - 4)[0]
                if plausible_api_id(value):
                    return ApiIdEvidence(
                        api_id=value,
                        api_id_va=text.addr + cursor,
                        instruction="mov r/m32, imm32",
                    )
                cursor += length
                continue
        # REX-prefixed C7 /0 (REX.W keeps the immediate at 32 bits).
        if (
            0x40 <= opcode <= 0x4F
            and cursor + 7 <= end
            and text_bytes[cursor + 1] == 0xC7
        ):
            length = c7_instruction_length(text_bytes, cursor + 1)
            if length and cursor + 1 + length <= end:
                value = struct.unpack_from(
                    "<I",
                    text_bytes,
                    cursor + 1 + length - 4,
                )[0]
                if plausible_api_id(value):
                    return ApiIdEvidence(
                        api_id=value,
                        api_id_va=text.addr + cursor,
                        instruction="mov r/m32, imm32 (rex)",
                    )
                cursor += 1 + length
                continue
        # Not a recognized mov encoding: resynchronize byte by byte.
        cursor += 1
    return None
def analyze_elf(path: Path) -> list[Candidate]:
    """Collect api-hash candidates from one ELF binary.

    A candidate is any 32-hex-digit string in .rodata that at least one
    RIP-relative lea in .text points at; each reference is annotated with
    the first plausible api_id immediate found after it.
    """
    elf = Elf64Little(path)
    rodata = elf.section(".rodata")
    text = elf.section(".text")
    rodata_bytes = elf.section_bytes(rodata)
    text_bytes = elf.section_bytes(text)
    found_hashes = scan_hashes(rodata, rodata_bytes)
    wanted = {va for va, _ in found_hashes}
    table = {
        va: Candidate(api_hash=digest, hash_va=va, references=[])
        for va, digest in found_hashes
    }
    for xref_va, hash_va in iter_rip_relative_lea_targets(text, text_bytes, wanted):
        # The api_id search starts just past the 7-byte lea instruction.
        evidence = first_api_id_after_xref(text, text_bytes, xref_va + 7)
        reference = HashReference(hash_xref_va=xref_va)
        if evidence is not None:
            reference.api_id = evidence.api_id
            reference.api_id_va = evidence.api_id_va
            reference.instruction = evidence.instruction
        table[hash_va].references.append(reference)
    return [entry for entry in table.values() if entry.references]
def candidate_score(candidate: Candidate) -> tuple[int, int, int]:
    """Rank a candidate: more xrefs, stronger api_id agreement, larger id sum."""
    observed_ids = [r.api_id for r in candidate.references if r.api_id is not None]
    tally = Counter(observed_ids)
    if tally:
        best_support = tally.most_common(1)[0][1]
    else:
        best_support = 0
    # sum([]) == 0, so the empty case needs no special handling.
    return (len(candidate.references), best_support, sum(observed_ids))
def pick_best_candidate(candidates: list[Candidate]) -> Candidate:
    """Return the highest-scoring candidate; raise ValueError when empty."""
    if candidates:
        # max() keeps the first of equally-scored candidates, like the
        # original linear comparison would.
        return max(candidates, key=candidate_score)
    raise ValueError("no API hash candidates with code references were found")
def best_api_id(candidate: Candidate) -> int:
    """Return the api_id seen most often among the candidate's references."""
    tally = Counter(
        reference.api_id
        for reference in candidate.references
        if reference.api_id is not None
    )
    if not tally:
        raise ValueError("no plausible api_id was found near the hash references")
    return tally.most_common(1)[0][0]
def extract_from_path(path: Path) -> Extraction:
    """Analyze *path* (following a nix wrapper if needed) and pick the best hash.

    Fix: the original called analyze_elf() unconditionally, so a wrapper that
    is not itself a parsable ELF (e.g. a shell wrapper script) raised before
    the nix-wrapped-target fallback could ever run.  The parse error is now
    deferred: the wrapped target is tried first, and the original error is
    re-raised only when no usable target exists.
    """
    analyzed = path.resolve()
    parse_error = None
    try:
        candidates = analyze_elf(analyzed)
    except ValueError as error:
        candidates = []
        parse_error = error
    wrapped_target = None
    if not candidates:
        target = find_nix_wrapped_target(analyzed)
        if target and is_elf(target):
            wrapped_target = str(target)
            analyzed = target
            candidates = analyze_elf(analyzed)
        elif parse_error is not None:
            # No usable wrapper target: surface the original parse failure.
            raise parse_error
    best = pick_best_candidate(candidates)
    return Extraction(
        input_path=str(path),
        analyzed_path=str(analyzed),
        wrapped_target=wrapped_target,
        api_hash=best.api_hash,
        api_id=best_api_id(best),
        hash_va=best.hash_va,
        references=best.references,
        candidates=sorted(candidates, key=candidate_score, reverse=True),
    )
def json_ready(extraction: Extraction, all_candidates: bool) -> dict:
    """Convert an Extraction to a JSON-serializable dict.

    The (potentially long) candidates list is dropped unless
    *all_candidates* is set.
    """
    payload = asdict(extraction)
    if all_candidates:
        return payload
    payload.pop("candidates", None)
    return payload
def print_human(extraction: Extraction, all_candidates: bool) -> None:
    """Pretty-print one extraction result for terminal consumption."""
    print(f"input_path: {extraction.input_path}")
    print(f"analyzed_path: {extraction.analyzed_path}")
    if extraction.wrapped_target:
        print(f"wrapped_target: {extraction.wrapped_target}")
    print(f"api_hash: {extraction.api_hash}")
    print(f"api_id: {extraction.api_id}")
    print(f"hash_va: 0x{extraction.hash_va:x}")
    print("references:")
    for ref in extraction.references:
        parts = [f" - hash_xref_va=0x{ref.hash_xref_va:x}"]
        # api_id details are appended only when evidence was found.
        if ref.api_id is not None and ref.api_id_va is not None:
            parts.append(f", api_id={ref.api_id},")
            parts.append(f" api_id_va=0x{ref.api_id_va:x},")
            parts.append(f" via={ref.instruction}")
        print("".join(parts))
    if not all_candidates:
        return
    print("candidates:")
    for entry in extraction.candidates:
        seen_ids = [r.api_id for r in entry.references if r.api_id is not None]
        summary = Counter(seen_ids).most_common()
        print(
            f" - api_hash={entry.api_hash},"
            f" hash_va=0x{entry.hash_va:x},"
            f" xrefs={len(entry.references)},"
            f" ids={summary}"
        )
def parse_args() -> argparse.Namespace:
    """Parse CLI arguments: one or more paths plus output-format flags."""
    cli = argparse.ArgumentParser()
    cli.add_argument("paths", nargs="+")
    # --json selects machine-readable output (stored as json_output).
    cli.add_argument("--json", action="store_true", dest="json_output")
    cli.add_argument("--all-candidates", action="store_true")
    return cli.parse_args()
def main() -> int:
    """CLI entry point: process each path, report results, return exit status."""
    args = parse_args()
    results = []
    had_error = False
    for raw_path in args.paths:
        path = Path(raw_path)
        try:
            results.append(extract_from_path(path))
        except Exception as error:
            had_error = True
            if args.json_output:
                # Keep JSON output machine-readable even on failure.
                results.append({
                    "input_path": str(path),
                    "error": str(error),
                })
            else:
                print(f"{path}: {error}", file=sys.stderr)
    if args.json_output:
        payload = [
            json_ready(item, args.all_candidates)
            if isinstance(item, Extraction)
            else item
            for item in results
        ]
        # A single input prints as one object, not a one-element array.
        document = payload[0] if len(payload) == 1 else payload
        print(json.dumps(document, indent=2))
    else:
        for index, item in enumerate(results):
            if not isinstance(item, Extraction):
                continue
            if index:
                print()
            print_human(item, args.all_candidates)
    return 1 if had_error else 0
# Script entry point: exit with main()'s status code.
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment