Skip to content

Instantly share code, notes, and snippets.

@19h
Created February 16, 2026 00:20
Show Gist options
  • Select an option

  • Save 19h/09689f393a40c955f306f648f355c1dc to your computer and use it in GitHub Desktop.

Select an option

Save 19h/09689f393a40c955f306f648f355c1dc to your computer and use it in GitHub Desktop.
unicorn unpacker
#!/usr/bin/env python3
"""
Unicorn-based unpacker for test_packer ELF binary.
Strategy:
1. Parse the ELF, map all LOAD segments into Unicorn at a chosen base.
2. Apply R_X86_64_RELATIVE relocations (simulate ld-linux.so).
3. Set up a stack, stub out PLT imports (mprotect, open, read, etc.).
4. Execute starting at the init_array unpacker stub.
5. Track memory writes to the encrypted .text region.
6. Detect when the unpacker finishes (returns from init constructor)
and dump the now-decrypted .text to disk.
The binary layout (PIE, base 0):
LOAD R-- 0x00000 - 0x0dc08 (ELF headers, rodata, rela, etc.)
LOAD R-X 0x0ec10 - 0x1c810 (.text)
LOAD RW- 0x1d810 - 0x1f000 (.data.rel.ro, .dynamic, .got, .data, .bss)
LOAD RW- 0x1fc80 - 0x21740 (.data, .got.plt, .bss)
LOAD RWX 0x23000 - 0x308b0 (.ltext - unpacker code)
The unpacker entry is at init_array[0] = base + 0x2f390.
After the unpacker runs, .text (base+0xec10 to base+0x1c32d) should be decrypted.
"""
import struct
import sys
import os
from pathlib import Path
from unicorn import *
from unicorn.x86_const import *
from capstone import Cs, CS_ARCH_X86, CS_MODE_64
# ---------------------------------------------------------------------------
# ELF parsing helpers
# ---------------------------------------------------------------------------
class ElfSegment:
    """One ELF program header, stored as plain attributes.

    The constructor takes the header fields in the order the parser reads
    them and keeps them under the same names.  ``readable``, ``writable``
    and ``executable`` test bits 4, 2 and 1 of ``p_flags`` respectively.
    """

    def __init__(self, p_type, p_flags, p_offset, p_vaddr, p_filesz, p_memsz, p_align):
        (
            self.p_type,
            self.p_flags,
            self.p_offset,
            self.p_vaddr,
            self.p_filesz,
            self.p_memsz,
            self.p_align,
        ) = (p_type, p_flags, p_offset, p_vaddr, p_filesz, p_memsz, p_align)

    @property
    def readable(self):
        """True when bit 4 of ``p_flags`` is set."""
        return (self.p_flags & 4) != 0

    @property
    def writable(self):
        """True when bit 2 of ``p_flags`` is set."""
        return (self.p_flags & 2) != 0

    @property
    def executable(self):
        """True when bit 1 of ``p_flags`` is set."""
        return (self.p_flags & 1) != 0

    def __repr__(self):
        # The label is always "LOAD", whatever p_type holds.
        letters = (
            ("R", self.readable),
            ("W", self.writable),
            ("X", self.executable),
        )
        perms = "".join(letter if allowed else "-" for letter, allowed in letters)
        return "LOAD {} vaddr=0x{:08x} filesz=0x{:x} memsz=0x{:x} offset=0x{:x}".format(
            perms, self.p_vaddr, self.p_filesz, self.p_memsz, self.p_offset
        )
class ElfReloc:
    """A relocation reduced to its target offset and addend.

    The parser in this file creates these for R_X86_64_RELATIVE entries.
    """

    def __init__(self, r_offset, r_addend):
        self.r_offset, self.r_addend = r_offset, r_addend
class ElfInfo:
    """Parsed ELF64 little-endian information needed for emulation.

    Attributes:
        data: The raw file bytes passed to the constructor.
        e_type: ``e_type`` field of the ELF header.
        e_entry: ``e_entry`` field of the ELF header (not rebased).
        segments: ``ElfSegment`` objects for every PT_LOAD program header.
        all_phdrs: ``ElfSegment`` objects for every program header.
        relatives: ``ElfReloc`` objects for R_X86_64_RELATIVE entries.
        glob_dats: ``(r_offset, symbol index)`` for R_X86_64_GLOB_DAT entries.
        jump_slots: ``(r_offset, symbol index)`` for R_X86_64_JUMP_SLOT entries.
        init_array_entries: Addends of the RELATIVE relocations that target
            ``.init_array``, ordered by slot.
        dynsym_names: ``.dynsym`` index -> symbol name.
        plt_stubs: Computed ``.plt`` stub address -> symbol name.

    Raises:
        ValueError: If ``data`` is not a 64-bit little-endian ELF image.
        struct.error: If a header or table points outside ``data``.
    """

    _PT_LOAD = 1
    _R_X86_64_GLOB_DAT = 6
    _R_X86_64_JUMP_SLOT = 7
    _R_X86_64_RELATIVE = 8
    # Elf64_Phdr: p_type, p_flags, p_offset, p_vaddr, p_paddr, p_filesz,
    # p_memsz, p_align.
    _PHDR_ENTRY = struct.Struct("<IIQQQQQQ")
    # Elf64_Rela: r_offset, r_info, r_addend (signed).
    _RELA_ENTRY = struct.Struct("<QQq")
    _SYM_ENTRY_SIZE = 24
    _PLT_ENTRY_SIZE = 0x10

    def __init__(self, data: bytes):
        self.data = data
        # Explicit raises instead of ``assert`` so the checks survive ``-O``.
        if data[:4] != b"\x7fELF":
            raise ValueError("Not an ELF file")
        if len(data) < 64:
            raise ValueError("Not an ELF file: truncated ELF64 header")
        if data[4] != 2:
            raise ValueError("Not 64-bit")
        if data[5] != 1:
            raise ValueError("Not little-endian")

        self.e_type = struct.unpack_from("<H", data, 16)[0]
        self.e_entry = struct.unpack_from("<Q", data, 24)[0]
        e_phoff = struct.unpack_from("<Q", data, 32)[0]
        e_phentsize, e_phnum = struct.unpack_from("<HH", data, 54)

        # Parse program headers
        self.segments = []
        self.all_phdrs = []
        for index in range(e_phnum):
            (
                p_type,
                p_flags,
                p_offset,
                p_vaddr,
                _p_paddr,
                p_filesz,
                p_memsz,
                p_align,
            ) = self._PHDR_ENTRY.unpack_from(data, e_phoff + index * e_phentsize)
            seg = ElfSegment(
                p_type, p_flags, p_offset, p_vaddr, p_filesz, p_memsz, p_align
            )
            self.all_phdrs.append(seg)
            if p_type == self._PT_LOAD:
                self.segments.append(seg)

        # Parse relocations
        self.relatives = []
        self.glob_dats = []  # (offset, sym_index)
        self.jump_slots = []  # (offset, sym_index)
        self._parse_rela_sections()
        # The remaining parsers depend on the relocation lists above, and
        # _parse_plt additionally on dynsym_names, so the order matters.
        self.init_array_entries = self._parse_init_array()
        self.dynsym_names = self._parse_dynsym()
        self.plt_stubs = self._parse_plt()

    @staticmethod
    def _cstring(table, start):
        """Decode the NUL-terminated name at ``start`` in a string table.

        A name that runs to the end of the table without a terminator is
        returned whole; using ``find``'s -1 directly as a slice end would
        silently drop its last character.
        """
        end = table.find(b"\x00", start)
        if end == -1:
            end = len(table)
        return table[start:end].decode("ascii", errors="replace")

    def _find_section(self, name_target):
        """Find a section by name, returns (offset, size, addr) or None."""
        e_shoff = struct.unpack_from("<Q", self.data, 40)[0]
        e_shentsize, e_shnum, e_shstrndx = struct.unpack_from("<HHH", self.data, 58)
        if e_shnum == 0:
            return None

        # Section-name string table
        strtab_hdr = e_shoff + e_shstrndx * e_shentsize
        strtab_off, strtab_size = struct.unpack_from("<QQ", self.data, strtab_hdr + 24)
        shstrtab = self.data[strtab_off : strtab_off + strtab_size]

        for index in range(e_shnum):
            hdr = e_shoff + index * e_shentsize
            sh_name_idx = struct.unpack_from("<I", self.data, hdr)[0]
            if self._cstring(shstrtab, sh_name_idx) == name_target:
                sh_addr, sh_offset, sh_size = struct.unpack_from(
                    "<QQQ", self.data, hdr + 16
                )
                return (sh_offset, sh_size, sh_addr)
        return None

    def _parse_rela_sections(self):
        """Parse .rela.dyn and .rela.plt into the three relocation lists."""
        entry = self._RELA_ENTRY
        for sec_name in (".rela.dyn", ".rela.plt"):
            info = self._find_section(sec_name)
            if info is None:
                continue
            sec_off, sec_size, _ = info
            # Only whole entries: a trailing partial record is ignored
            # rather than read past the end of the section.
            for index in range(sec_size // entry.size):
                r_offset, r_info, r_addend = entry.unpack_from(
                    self.data, sec_off + index * entry.size
                )
                r_type = r_info & 0xFFFFFFFF
                r_sym = r_info >> 32
                if r_type == self._R_X86_64_RELATIVE:
                    self.relatives.append(ElfReloc(r_offset, r_addend))
                elif r_type == self._R_X86_64_GLOB_DAT:
                    self.glob_dats.append((r_offset, r_sym))
                elif r_type == self._R_X86_64_JUMP_SLOT:
                    self.jump_slots.append((r_offset, r_sym))

    def _parse_init_array(self):
        """Return constructor addresses (pre-relocation addends) in slot order.

        The entries are recovered from the RELATIVE relocations whose target
        lies inside ``.init_array``, not from the section contents.
        """
        info = self._find_section(".init_array")
        if info is None:
            return []
        _, size, addr = info
        entries = sorted(
            (rel.r_offset - addr, rel.r_addend)
            for rel in self.relatives
            if addr <= rel.r_offset < addr + size
        )
        return [addend for _, addend in entries]

    def _parse_dynsym(self):
        """Parse .dynsym + .dynstr to get symbol names by index."""
        dynsym_info = self._find_section(".dynsym")
        dynstr_info = self._find_section(".dynstr")
        if dynsym_info is None or dynstr_info is None:
            return {}
        dynsym_off, dynsym_size, _ = dynsym_info
        dynstr_off, dynstr_size, _ = dynstr_info
        dynstr = self.data[dynstr_off : dynstr_off + dynstr_size]

        names = {}
        for index in range(dynsym_size // self._SYM_ENTRY_SIZE):
            sym_off = dynsym_off + index * self._SYM_ENTRY_SIZE
            st_name = struct.unpack_from("<I", self.data, sym_off)[0]
            names[index] = self._cstring(dynstr, st_name)
        return names

    def _parse_plt(self):
        """Map computed .plt stub addresses to imported symbol names.

        Assumes PLT0 occupies the first 16 bytes of ``.plt`` and that the
        n-th JUMP_SLOT relocation corresponds to the n-th 16-byte stub after
        it.  NOTE(review): this layout is an assumption of the original
        code; confirm it against the target binary.
        """
        plt_info = self._find_section(".plt")
        if plt_info is None:
            return {}
        _, _, plt_addr = plt_info
        first_stub = plt_addr + self._PLT_ENTRY_SIZE

        stubs = {}
        for idx, (_, sym_idx) in enumerate(self.jump_slots):
            stub_addr = first_stub + idx * self._PLT_ENTRY_SIZE
            stubs[stub_addr] = self.dynsym_names.get(sym_idx, f"sym_{sym_idx}")
        return stubs
# ---------------------------------------------------------------------------
# Emulator
# ---------------------------------------------------------------------------
# Load bias added to every ELF virtual address of the PIE image.
BASE_ADDR = 0x40_0000
# Emulated stack: lowest mapped address and size in bytes (1 MiB).
STACK_ADDR = 0x7FFF_0000
STACK_SIZE = 1 << 20
# Sentinel return address pushed before the constructor runs; reaching it
# means the emulated function returned.
STUB_RET_ADDR = 0xDEAD_0000
# Region holding one tiny stub per imported symbol (GOT slots point here).
PLT_STUB_BASE = 0xBEEF_0000
PLT_STUB_SIZE = 1 << 16
# Granularity used for all mappings and page tracking.
PAGE_SIZE = 1 << 12
def align_down(addr, alignment):
    """Clear the low bits of ``addr`` selected by ``alignment - 1``.

    For a power-of-two ``alignment`` this rounds ``addr`` down to a multiple
    of it.
    """
    # -alignment == ~(alignment - 1) for Python integers.
    return addr & -alignment
def align_up(addr, alignment):
    """Add ``alignment - 1`` to ``addr`` and clear the same low bits.

    For a power-of-two ``alignment`` this rounds ``addr`` up to a multiple
    of it.
    """
    bumped = addr + alignment - 1
    # -alignment == ~(alignment - 1) for Python integers.
    return bumped & -alignment
class Unpacker:
    """Emulate a packed ELF's first init_array constructor and dump the result.

    Typical use::

        unpacker = Unpacker(path)
        unpacker.setup()
        unpacker.run_unpacker()
        unpacker.analyze_and_dump(out_dir)

    The image is mapped RWX at ``BASE_ADDR``, imports are replaced by
    one-byte ``RET`` stubs whose behaviour is simulated in Python, and
    ``.text`` is compared before and after emulation.
    """

    # Linux PROT_* bits as passed to mprotect(); these differ from the
    # ELF PF_* bits, where R and X are swapped.
    _PROT_BITS = ((0x1, "R"), (0x2, "W"), (0x4, "X"))
    _MAP_ANONYMOUS = 0x20
    # Bump-allocated arenas for emulated mmap() and operator new.
    _MMAP_ARENA_BASE = 0xC0000000
    _HEAP_ARENA_BASE = 0xD0000000
    _HEAP_CHUNK = 0x100000
    # memcpy/memmove/memset requests at or above this size are not performed.
    _BULK_LIMIT = 0x100000
    # Address of 'main' in the original (unpacked) binary, pre-relocation.
    _MAIN_VADDR = 0x115F0
    _U64_MASK = 0xFFFFFFFFFFFFFFFF

    def __init__(self, binary_path: str):
        """Read and parse ``binary_path`` and create the Unicorn engine.

        Nothing is mapped yet; call ``setup()`` before ``run_unpacker()``.
        """
        self.binary_path = binary_path
        with open(binary_path, "rb") as f:
            self.raw_data = f.read()
        self.elf = ElfInfo(self.raw_data)
        self.base = BASE_ADDR

        # Unicorn engine
        self.uc = Uc(UC_ARCH_X86, UC_MODE_64)

        # Tracking state
        self.written_pages = set()  # Pages that have been written to
        self.executed_pages = set()  # Pages that have been executed
        self.write_then_exec = set()  # Pages written and later executed
        self.instruction_count = 0
        self.max_instructions = 50_000_000  # Safety limit
        self.finished = False
        self.unpacked_regions = []  # List of (start, end) ranges that were unpacked

        # The .text section that we expect to be decrypted
        text_info = self.elf._find_section(".text")
        if text_info:
            _, text_size, text_addr = text_info
            self.text_vaddr = text_addr
            self.text_size = text_size
            self.text_start = self.base + text_addr
            self.text_end = self.base + text_addr + text_size
            print(
                f"[*] .text section: 0x{self.text_start:x} - 0x{self.text_end:x} (0x{text_size:x} bytes)"
            )
        else:
            print("[!] No .text section found")
            self.text_start = 0
            self.text_end = 0
            self.text_size = 0
            self.text_vaddr = 0

        # Encrypted .text snapshot (before unpacking)
        self.text_original = None
        # Capstone disassembler for debug output
        self.cs = Cs(CS_ARCH_X86, CS_MODE_64)
        # Import stubs: stub address -> symbol name
        self.plt_hooks = {}
        # (addr, size, prot) of every emulated mprotect() call
        self.mprotect_calls = []
        # File descriptor tracking for read() position
        self.fd_state = {}  # fd -> { 'path': str, 'pos': int }
        # Next free address for emulated mmap(), and the bump-allocated heap
        # backing operator new.
        self.mmap_cursor = self._MMAP_ARENA_BASE
        self.heap_cursor = self._HEAP_ARENA_BASE
        self.heap_mapped_end = self._HEAP_ARENA_BASE

    def setup(self):
        """Set up the emulation environment."""
        self._map_segments()
        self._apply_relocations()
        self._setup_stack()
        self._setup_plt_stubs()
        self._snapshot_text()
        self._install_hooks()

    # ------------------------------------------------------------------
    # Small pure helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _entropy(data):
        """Return the Shannon entropy of ``data`` in bits per byte."""
        import math
        from collections import Counter

        total = len(data)
        if not total:
            return 0.0
        # Negating each term (rather than the sum) avoids printing "-0.00"
        # for single-valued data.
        return sum(
            -(n / total) * math.log2(n / total) for n in Counter(data).values()
        )

    @classmethod
    def _format_prot(cls, prot):
        """Render Linux PROT_* bits as e.g. ``"RW"``; ``"NONE"`` when empty."""
        return "".join(ch for bit, ch in cls._PROT_BITS if prot & bit) or "NONE"

    @staticmethod
    def _read_cstring(uc, addr, limit=4096):
        """Read a NUL-terminated byte string of fewer than ``limit`` bytes.

        Memory is read one page at a time so a string ending just before an
        unmapped page is still readable.

        Raises:
            UcError: If the string runs into unmapped memory.
            ValueError: If no terminator is found within ``limit`` bytes.
        """
        collected = bytearray()
        while len(collected) < limit:
            cursor = addr + len(collected)
            chunk_len = min(limit - len(collected), PAGE_SIZE - cursor % PAGE_SIZE)
            chunk = bytes(uc.mem_read(cursor, chunk_len))
            nul = chunk.find(0)
            if nul != -1:
                collected += chunk[:nul]
                return bytes(collected)
            collected += chunk
        raise ValueError("unterminated string")

    # ------------------------------------------------------------------
    # Environment setup
    # ------------------------------------------------------------------
    def _map_segments(self):
        """Map all LOAD segments into Unicorn memory as one RWX region.

        Raises:
            ValueError: If the ELF has no PT_LOAD segments.
        """
        print("\n[*] Mapping ELF segments:")
        segments = self.elf.segments
        if not segments:
            raise ValueError("ELF has no PT_LOAD segments to map")

        # First pass: find the total address range we need
        min_addr = min(seg.p_vaddr for seg in segments)
        max_addr = max(seg.p_vaddr + seg.p_memsz for seg in segments)

        # Map the entire range as RWX for simplicity during unpacking:
        # the unpacker itself needs to write to .text which is nominally R-X.
        map_start = align_down(self.base + min_addr, PAGE_SIZE)
        map_end = align_up(self.base + max_addr, PAGE_SIZE)
        map_size = map_end - map_start
        print(
            f" Total mapping: 0x{map_start:x} - 0x{map_end:x} (0x{map_size:x} bytes)"
        )
        self.uc.mem_map(map_start, map_size, UC_PROT_ALL)
        self.map_start = map_start
        self.map_end = map_end

        # Load segment data
        for seg in segments:
            if seg.p_filesz > 0:
                addr = self.base + seg.p_vaddr
                file_data = self.raw_data[seg.p_offset : seg.p_offset + seg.p_filesz]
                self.uc.mem_write(addr, file_data)
                print(f" {seg} -> loaded at 0x{addr:x}")

    def _apply_relocations(self):
        """Apply R_X86_64_RELATIVE relocations (base + addend)."""
        count = 0
        skipped = 0
        for rel in self.elf.relatives:
            addr = self.base + rel.r_offset
            value = (self.base + rel.r_addend) & self._U64_MASK
            try:
                self.uc.mem_write(addr, struct.pack("<Q", value))
                count += 1
            except UcError:
                # Target lies outside the mapped image; skip it.
                skipped += 1
        print(f"[*] Applied {count} R_X86_64_RELATIVE relocations (skipped {skipped})")

    def _setup_stack(self):
        """Map the stack and the sentinel return page, and initialise RSP/RBP."""
        self.uc.mem_map(STACK_ADDR, STACK_SIZE, UC_PROT_ALL)
        # Stack grows down, start near the top, aligned to 16 bytes
        sp = (STACK_ADDR + STACK_SIZE - 0x1000) & ~0xF
        self.uc.reg_write(UC_X86_REG_RSP, sp)
        self.uc.reg_write(UC_X86_REG_RBP, 0)

        # Map the stub return address page
        self.uc.mem_map(align_down(STUB_RET_ADDR, PAGE_SIZE), PAGE_SIZE, UC_PROT_ALL)
        # Write a HLT instruction there so we stop if we somehow execute it
        self.uc.mem_write(STUB_RET_ADDR, b"\xf4")  # HLT
        print(f"[*] Stack at 0x{STACK_ADDR:x}, RSP = 0x{sp:x}")

    def _setup_plt_stubs(self):
        """Point every JUMP_SLOT and GLOB_DAT GOT entry at a private RET stub.

        Each stub is a single ``RET`` placed 16 bytes apart inside the stub
        region; ``plt_hooks`` records which symbol each stub stands for so
        the code hook can simulate the call.
        """
        self.uc.mem_map(PLT_STUB_BASE, PLT_STUB_SIZE, UC_PROT_ALL)

        slots = [*self.elf.jump_slots, *self.elf.glob_dats]
        for index, (got_offset, sym_idx) in enumerate(slots):
            name = self.elf.dynsym_names.get(sym_idx, f"sym_{sym_idx}")
            stub_addr = PLT_STUB_BASE + index * 0x10
            self.uc.mem_write(stub_addr, b"\xc3")  # RET
            # Patch the GOT entry to point to our stub
            self.uc.mem_write(self.base + got_offset, struct.pack("<Q", stub_addr))
            self.plt_hooks[stub_addr] = name
        print(f"[*] Set up {len(self.plt_hooks)} PLT/GOT stubs")

    def _snapshot_text(self):
        """Take a snapshot of .text before unpacking."""
        if self.text_start and self.text_end:
            self.text_original = bytes(
                self.uc.mem_read(self.text_start, self.text_size)
            )
            # Quick entropy check
            entropy = self._entropy(self.text_original)
            print(
                f"[*] .text snapshot taken ({len(self.text_original)} bytes, entropy={entropy:.2f})"
            )

    def _install_hooks(self):
        """Install Unicorn hooks for tracking execution and memory writes."""
        # Hook on memory writes to detect which pages get modified
        self.uc.hook_add(UC_HOOK_MEM_WRITE, self._hook_mem_write)
        # Hook on code execution to detect execution of previously-written pages
        self.uc.hook_add(UC_HOOK_CODE, self._hook_code)
        # Hook invalid memory access for debugging
        self.uc.hook_add(
            UC_HOOK_MEM_READ_UNMAPPED
            | UC_HOOK_MEM_WRITE_UNMAPPED
            | UC_HOOK_MEM_FETCH_UNMAPPED,
            self._hook_mem_invalid,
        )

    def _generate_proc_maps(self) -> bytes:
        """Generate a fake /proc/self/maps that matches our emulation layout."""
        lines = []
        for seg in self.elf.segments:
            start = self.base + seg.p_vaddr
            end = start + seg.p_memsz
            perms = (
                ("r" if seg.readable else "-")
                + ("w" if seg.writable else "-")
                + ("x" if seg.executable else "-")
                + "p"
            )
            lines.append(
                f"{start:012x}-{end:012x} {perms} {seg.p_offset:08x} "
                f"00:00 0 /tmp/test_packer\n"
            )
        # Add stack
        lines.append(
            f"{STACK_ADDR:012x}-{STACK_ADDR + STACK_SIZE:012x} rw-p 00000000 "
            f"00:00 0 [stack]\n"
        )
        return "".join(lines).encode()

    # ------------------------------------------------------------------
    # Hooks
    # ------------------------------------------------------------------
    def _hook_mem_write(self, uc, access, address, size, value, user_data):
        """Record every page touched by a memory write."""
        first = align_down(address, PAGE_SIZE)
        # A write may straddle a page boundary; mark the last page as well.
        last = align_down(address + max(size, 1) - 1, PAGE_SIZE)
        for page in range(first, last + 1, PAGE_SIZE):
            self.written_pages.add(page)

    def _hook_code(self, uc, address, size, user_data):
        """Per-instruction hook: limits, import stubs and write-then-exec tracking."""
        self.instruction_count += 1

        # Safety limit
        if self.instruction_count >= self.max_instructions:
            print(f"\n[!] Hit instruction limit ({self.max_instructions}), stopping")
            uc.emu_stop()
            return

        # Check if we hit the stub return address
        if address == STUB_RET_ADDR:
            print("[*] Execution reached STUB_RET_ADDR - init constructor returned!")
            self.finished = True
            uc.emu_stop()
            return

        # Executing an import stub: simulate the call, then let the RET run
        if address in self.plt_hooks:
            self._handle_plt_call(uc, address, self.plt_hooks[address])
            return

        # Track execution of previously-written pages
        page = align_down(address, PAGE_SIZE)
        if page in self.written_pages and page not in self.executed_pages:
            # First execution of a page that was written to.
            # NOTE(review): the source's indentation was lost; recording only
            # .text pages in write_then_exec is assumed here - confirm.
            if self.text_start <= address < self.text_end:
                print(
                    f"\n[!] UNPACKED CODE EXECUTION DETECTED at 0x{address:x} (page 0x{page:x})"
                )
                # Don't stop yet - let the unpacker finish its init constructor
                self.write_then_exec.add(page)
        self.executed_pages.add(page)

        # Periodic status
        if self.instruction_count % 1_000_000 == 0:
            rip = uc.reg_read(UC_X86_REG_RIP)
            print(
                f" [{self.instruction_count / 1_000_000:.0f}M insns] RIP=0x{rip:x}, "
                f"written_pages={len(self.written_pages)}, "
                f"write-then-exec={len(self.write_then_exec)}"
            )

    def _heap_alloc(self, size):
        """Bump-allocate ``size`` bytes (16-byte aligned) from the fake heap.

        The arena is mapped lazily in ``_HEAP_CHUNK``-sized steps and never
        freed.

        Raises:
            UcError: If the arena cannot be extended.
        """
        size = align_up(max(size, 1), 16)
        addr = self.heap_cursor
        end = addr + size
        if end > self.heap_mapped_end:
            new_end = align_up(end, self._HEAP_CHUNK)
            self.uc.mem_map(
                self.heap_mapped_end, new_end - self.heap_mapped_end, UC_PROT_ALL
            )
            self.heap_mapped_end = new_end
        self.heap_cursor = end
        return addr

    def _emulate_mmap(self, length, flags, fd, offset):
        """Map a fresh RWX region and return its address, or -1 (MAP_FAILED).

        Non-anonymous mappings of fd 3 or of any fd returned by the emulated
        open() are filled with the bytes of the binary starting at
        ``offset``.  Every call gets a distinct address; MAP_FIXED and the
        address hint are not honoured.
        """
        mmap_size = align_up(length, PAGE_SIZE) or PAGE_SIZE
        mmap_addr = self.mmap_cursor
        # Advance even on failure so one collision cannot wedge later calls.
        self.mmap_cursor = mmap_addr + mmap_size
        try:
            self.uc.mem_map(mmap_addr, mmap_size, UC_PROT_ALL)
        except UcError as exc:
            print(f" -> MAP_FAILED ({exc})")
            return -1

        file_backed = not (flags & self._MAP_ANONYMOUS) and (
            fd == 3 or fd in self.fd_state
        )
        if file_backed:
            end = min(offset + length, len(self.raw_data))
            if offset < len(self.raw_data):
                self.uc.mem_write(mmap_addr, self.raw_data[offset:end])
            print(
                f" -> 0x{mmap_addr:x} (mapped {max(end - offset, 0)} bytes from file offset 0x{offset:x})"
            )
        else:
            print(f" -> 0x{mmap_addr:x} (anonymous, {mmap_size} bytes)")
        return mmap_addr

    def _emulate_read(self, uc, fd, buf, count):
        """Serve read() from the fake /proc/self/maps or from the binary itself.

        Returns the number of bytes copied to ``buf``; 0 at EOF or for an fd
        that was never returned by the emulated open().
        """
        state = self.fd_state.get(fd)
        path = state["path"] if state else ""
        print(
            f" [PLT] read(fd={fd}, buf=0x{buf:x}, count=0x{count:x}) path={path}"
        )
        if state is None:
            return 0
        # Any path containing "maps" is treated as /proc/self/maps; every
        # other tracked fd reads the binary.
        source = self._generate_proc_maps() if "maps" in path else self.raw_data
        pos = state["pos"]
        chunk = source[pos : pos + count]
        if chunk:
            uc.mem_write(buf, chunk)
            state["pos"] = pos + len(chunk)
        return len(chunk)

    def _handle_plt_call(self, uc, address, name):
        """Simulate the imported function ``name`` and place its result in RAX.

        Arguments are read per the System V AMD64 calling convention
        (RDI, RSI, RDX, RCX, R8, R9).  Unknown imports return 0.
        """
        rdi = uc.reg_read(UC_X86_REG_RDI)
        rsi = uc.reg_read(UC_X86_REG_RSI)
        rdx = uc.reg_read(UC_X86_REG_RDX)
        # The 4th integer argument of a library call is RCX; R10 is only
        # used by the raw syscall instruction.
        rcx = uc.reg_read(UC_X86_REG_RCX)
        r8 = uc.reg_read(UC_X86_REG_R8)
        r9 = uc.reg_read(UC_X86_REG_R9)

        # Default: return 0 (success)
        ret_val = 0
        if name == "mprotect":
            prot = self._format_prot(rdx)
            print(f" [PLT] mprotect(0x{rdi:x}, 0x{rsi:x}, {prot}) -> 0")
            self.mprotect_calls.append((rdi, rsi, rdx))
        elif name == "getpagesize":
            ret_val = PAGE_SIZE
        elif name == "open":
            # The unpacker might try to open itself or /proc/self/maps
            fd = 3 + len(self.fd_state)
            try:
                path = self._read_cstring(uc, rdi, 256).decode(
                    "ascii", errors="replace"
                )
            except (UcError, ValueError):
                print(f" [PLT] open(0x{rdi:x}, 0x{rsi:x}) -> {fd} (fake)")
                path = ""
            else:
                print(f' [PLT] open("{path}", 0x{rsi:x}) -> {fd} (fake)')
            self.fd_state[fd] = {"path": path, "pos": 0}
            ret_val = fd
        elif name == "fstat":
            # rdi = fd, rsi = stat buffer
            print(f" [PLT] fstat({rdi}, 0x{rsi:x}) -> 0")
            # Write file size into st_size (offset 48 in struct stat on x86-64)
            uc.mem_write(rsi + 48, struct.pack("<Q", len(self.raw_data)))
        elif name == "mmap":
            # mmap(addr, length, prot, flags, fd, offset)
            print(
                f" [PLT] mmap(0x{rdi:x}, 0x{rsi:x}, prot=0x{rdx:x}, flags=0x{rcx:x}, fd={r8}, off=0x{r9:x})"
            )
            ret_val = self._emulate_mmap(length=rsi, flags=rcx, fd=r8, offset=r9)
        elif name == "read":
            # read(fd, buf, count)
            ret_val = self._emulate_read(uc, fd=rdi, buf=rsi, count=rdx)
        elif name == "close":
            ret_val = 0
        elif name == "munmap":
            print(f" [PLT] munmap(0x{rdi:x}, 0x{rsi:x}) -> 0")
        elif name == "syscall":
            # The binary calls syscall() - rdi = syscall number
            print(f" [PLT] syscall(nr={rdi}, 0x{rsi:x}, 0x{rdx:x}, ...)")
        elif name in ("memcpy", "memmove"):
            # Perform the copy; reading the whole source first gives
            # memmove semantics for overlapping ranges.
            dest, src, n = rdi, rsi, rdx
            if 0 < n < self._BULK_LIMIT:
                try:
                    uc.mem_write(dest, bytes(uc.mem_read(src, n)))
                except UcError:
                    pass  # best effort: leave the destination untouched
            elif n:
                print(f" [PLT] {name}: skipping oversized copy of 0x{n:x} bytes")
            ret_val = dest
        elif name == "memset":
            # memset(s, c, n) - actually perform the set
            s, c, n = rdi, rsi & 0xFF, rdx
            if 0 < n < self._BULK_LIMIT:
                try:
                    uc.mem_write(s, bytes([c]) * n)
                except UcError:
                    pass  # best effort
            elif n:
                print(f" [PLT] memset: skipping oversized fill of 0x{n:x} bytes")
            ret_val = s
        elif name == "bcmp":
            # bcmp(s1, s2, n): always report "equal" without comparing
            ret_val = 0
        elif name == "strlen":
            try:
                ret_val = len(self._read_cstring(uc, rdi))
            except (UcError, ValueError):
                ret_val = 0
        elif name in ("puts", "printf"):
            suffix = ", ..." if name == "printf" else ""
            try:
                text = self._read_cstring(uc, rdi).decode("ascii", errors="replace")
                print(f' [PLT] {name}("{text}"{suffix})')
            except (UcError, ValueError):
                print(f" [PLT] {name}(0x{rdi:x}{suffix})")
            ret_val = 1
        elif name == "snprintf":
            ret_val = 0
        elif name == "clock_gettime":
            # clock_gettime(clockid, timespec*): write a fixed fake time
            if rsi:
                uc.mem_write(rsi, struct.pack("<QQ", 1000000, 0))
        elif name == "__errno_location":
            # Return a pointer to a fake errno (reset to 0 on every call)
            errno_addr = STACK_ADDR + 0x100
            uc.mem_write(errno_addr, struct.pack("<I", 0))
            ret_val = errno_addr
        elif name in ("sigaddset", "sigemptyset"):
            ret_val = 0
        elif name in ("pthread_mutex_lock", "pthread_mutex_unlock"):
            ret_val = 0
        elif name == "toupper":
            c = rdi & 0xFF
            ret_val = c - 32 if ord("a") <= c <= ord("z") else c
        elif name in ("sqrtf", "sinf", "cosf"):
            # Float functions: only RAX is set here, xmm0 is left untouched
            ret_val = 0
        elif name == "_Znwm":
            # operator new(size_t): every call gets its own block
            try:
                ret_val = self._heap_alloc(rdi)
            except UcError as exc:
                print(f" [PLT] operator new(0x{rdi:x}) failed: {exc}")
                ret_val = 0
        elif name == "_ZdlPvm":
            # operator delete(void*, size_t): memory is never reclaimed
            ret_val = 0
        # Unknown function - just return 0

        # Set return value in RAX
        uc.reg_write(UC_X86_REG_RAX, ret_val & self._U64_MASK)

    def _hook_mem_invalid(self, uc, access, address, size, value, user_data):
        """Log an unmapped access and try to map the page so emulation continues.

        Returns True to resume execution, False if the page could not be
        mapped.
        """
        access_type = {
            UC_MEM_READ_UNMAPPED: "READ",
            UC_MEM_WRITE_UNMAPPED: "WRITE",
            UC_MEM_FETCH_UNMAPPED: "FETCH",
        }.get(access, f"type={access}")
        rip = uc.reg_read(UC_X86_REG_RIP)
        print(
            f"\n[!] Invalid memory {access_type} at 0x{address:x} (size={size}) from RIP=0x{rip:x}"
        )
        # Try to map the page and continue
        page = align_down(address, PAGE_SIZE)
        try:
            uc.mem_map(page, PAGE_SIZE, UC_PROT_ALL)
        except UcError:
            return False  # Stop execution
        print(f" Auto-mapped page 0x{page:x}")
        return True  # Continue execution

    # ------------------------------------------------------------------
    # Running and reporting
    # ------------------------------------------------------------------
    def run_unpacker(self):
        """Run the first init_array constructor until it returns.

        Returns:
            True if the constructor returned to the sentinel address, or if
            emulation stopped early (error or instruction limit) but .text
            was modified anyway; False otherwise.
        """
        if not self.elf.init_array_entries:
            print("[!] No init_array entries found!")
            return False

        # The first init_array entry is the unpacker
        entry_vaddr = self.elf.init_array_entries[0]
        unpacker_addr = self.base + entry_vaddr
        print(f"\n{'=' * 70}")
        print(f"[*] Starting unpacker at 0x{unpacker_addr:x}")
        print(f" (init_array[0] = base + 0x{entry_vaddr:x})")
        print(f"{'=' * 70}\n")

        # Push a return address on the stack so when the unpacker returns,
        # we detect it
        rsp = self.uc.reg_read(UC_X86_REG_RSP) - 8
        self.uc.mem_write(rsp, struct.pack("<Q", STUB_RET_ADDR))
        self.uc.reg_write(UC_X86_REG_RSP, rsp)

        try:
            self.uc.emu_start(
                unpacker_addr, STUB_RET_ADDR, timeout=0, count=self.max_instructions
            )
        except UcError as e:
            rip = self.uc.reg_read(UC_X86_REG_RIP)
            print(f"\n[!] Emulation error: {e} at RIP=0x{rip:x}")
            # Even if we error out, check if .text was modified
            if self._check_text_modified():
                print("[*] Despite the error, .text appears to have been modified!")
                return True
            return False

        print(f"\n[*] Emulation finished after {self.instruction_count} instructions")
        # Emulation can stop at the `until` address without the code hook
        # having fired for it, so also detect the return from RIP.
        if self.uc.reg_read(UC_X86_REG_RIP) == STUB_RET_ADDR:
            self.finished = True
        if self.finished:
            print("[*] Unpacker constructor returned normally")
            return True
        print("[!] Emulation stopped before the constructor returned")
        return self._check_text_modified()

    def _check_text_modified(self):
        """Check if .text has been modified from its original state."""
        if self.text_original is None:
            return False
        current = bytes(self.uc.mem_read(self.text_start, self.text_size))
        return current != self.text_original

    def analyze_and_dump(self, output_path: str = "."):
        """Print a summary of the run and write dumps into ``output_path``.

        Writes ``unpacked_text.bin`` and ``unpacked_elf`` when .text changed,
        and always attempts ``full_memory_dump.bin``.
        """
        output_dir = Path(output_path)
        output_dir.mkdir(parents=True, exist_ok=True)

        print(f"\n{'=' * 70}")
        print("[*] ANALYSIS RESULTS")
        print(f"{'=' * 70}")
        print(f" Instructions executed: {self.instruction_count}")
        print(f" Pages written to: {len(self.written_pages)}")
        print(f" Pages that were written then executed: {len(self.write_then_exec)}")
        print(f" mprotect calls: {len(self.mprotect_calls)}")
        for addr, size, prot in self.mprotect_calls:
            print(f" mprotect(0x{addr:x}, 0x{size:x}, {self._format_prot(prot)})")

        # Check if .text was modified
        if self.text_original is not None:
            current_text = bytes(self.uc.mem_read(self.text_start, self.text_size))
            if current_text != self.text_original:
                self._report_modified_text(output_dir, current_text)
            else:
                print("\n[!] .text was NOT modified - unpacker may not have run fully")

        # Dump all modified memory regions (not just .text)
        self._dump_all_modified(output_dir)

    def _report_modified_text(self, output_dir, current_text):
        """Print statistics for the changed .text and write the dumps."""
        modified = sum(1 for a, b in zip(self.text_original, current_text) if a != b)
        print(
            f"\n[*] .text MODIFIED: {modified}/{self.text_size} bytes changed "
            f"({modified * 100 / self.text_size:.1f}%)"
        )
        entropy = self._entropy(current_text)
        print(
            f" Decrypted .text entropy: {entropy:.2f} (was {self._original_entropy():.2f})"
        )

        # Find the boundaries of modified regions (only the first 10 are shown)
        regions = self._find_modified_regions(self.text_original, current_text)
        print(f" Modified regions: {len(regions)}")
        for start, end in regions[:10]:
            abs_start = self.text_start + start
            abs_end = self.text_start + end
            print(f" 0x{abs_start:x} - 0x{abs_end:x} ({end - start} bytes)")

        # Dump the decrypted .text
        text_path = output_dir / "unpacked_text.bin"
        text_path.write_bytes(current_text)
        print(
            f"\n[+] Dumped decrypted .text to: {text_path} ({len(current_text)} bytes)"
        )
        # Also dump as a patched ELF - replace encrypted .text with decrypted
        self._dump_patched_elf(output_dir, current_text)
        # Disassemble first few instructions of decrypted code
        self._disassemble_sample(current_text)

    def _original_entropy(self):
        """Return the Shannon entropy of the pre-unpack .text snapshot."""
        return self._entropy(self.text_original)

    def _find_modified_regions(self, original, current):
        """Find contiguous regions that differ between original and current.

        Returns a list of half-open ``(start, end)`` index ranges.  Only the
        common prefix is compared when the lengths differ.
        """
        regions = []
        run_start = None
        compared = 0
        for index, (old, new) in enumerate(zip(original, current)):
            compared = index + 1
            if old != new:
                if run_start is None:
                    run_start = index
            elif run_start is not None:
                regions.append((run_start, index))
                run_start = None
        if run_start is not None:
            regions.append((run_start, compared))
        return regions

    def _dump_patched_elf(self, output_dir: Path, decrypted_text: bytes):
        """Create a patched copy of the ELF with decrypted .text."""
        patched = bytearray(self.raw_data)
        # Find .text section file offset
        text_info = self.elf._find_section(".text")
        if text_info:
            file_off, size, _ = text_info
            patched[file_off : file_off + size] = decrypted_text[:size]
        elf_path = output_dir / "unpacked_elf"
        elf_path.write_bytes(bytes(patched))
        os.chmod(str(elf_path), 0o755)
        print(f"[+] Dumped patched ELF to: {elf_path}")

    def _dump_all_modified(self, output_dir: Path):
        """Dump the full memory image of the mapped ELF region."""
        try:
            full_mem = bytes(
                self.uc.mem_read(self.map_start, self.map_end - self.map_start)
            )
        except UcError as e:
            print(f"[!] Could not dump full memory: {e}")
            return
        mem_path = output_dir / "full_memory_dump.bin"
        mem_path.write_bytes(full_mem)
        print(
            f"[+] Full memory dump: {mem_path} "
            f"(0x{self.map_start:x}-0x{self.map_end:x}, {len(full_mem)} bytes)"
        )

    def _print_disassembly(self, code, addr, limit):
        """Print up to ``limit`` decoded instructions; return how many were shown."""
        shown = 0
        for insn in self.cs.disasm(code, addr):
            hex_bytes = " ".join(f"{b:02x}" for b in insn.bytes)
            print(
                f" 0x{insn.address:x}: {hex_bytes:30s} {insn.mnemonic} {insn.op_str}"
            )
            shown += 1
            if shown >= limit:
                break
        return shown

    def _disassemble_sample(self, code: bytes, num_insns: int = 30):
        """Disassemble the start of the unpacked code and of 'main'.

        ``code`` is the content of .text.  The first listing starts at the
        ELF entry point when it lies inside .text, otherwise at the start of
        .text.
        """
        entry_offset = self.elf.e_entry - self.text_vaddr
        if 0 <= entry_offset < len(code):
            sample = code[entry_offset : entry_offset + 256]
            addr = self.base + self.elf.e_entry
            label = "entry point"
        else:
            # Just show from the start
            sample = code[:256]
            addr = self.text_start
            label = ".text start"
        print(
            f"\n[*] Disassembly sample (first {num_insns} instructions from {label} @ 0x{addr:x}):"
        )
        if self._print_disassembly(sample, addr, num_insns) == 0:
            print(" (no valid instructions decoded)")

        # Also disassemble at the 'main' function, whose address is
        # hard-coded for this specific binary
        main_offset = self._MAIN_VADDR - self.text_vaddr
        if 0 <= main_offset < len(code):
            main_addr = self.base + self._MAIN_VADDR
            print(f"\n[*] Disassembly of 'main' (0x{main_addr:x}):")
            self._print_disassembly(
                code[main_offset : main_offset + 256], main_addr, num_insns
            )
def main():
    """Unpack a binary and write the dumps next to this script.

    The binary path is taken from the first command-line argument; when no
    argument is given the original hard-coded path is used.  Dumps go to an
    ``output`` directory beside this file and are written whether or not the
    unpacker reported success.  Exits with status 1 if the binary is missing.
    """
    default_binary = "/Users/int/Downloads/test_packer"
    binary_path = sys.argv[1] if len(sys.argv) > 1 else default_binary
    if not os.path.exists(binary_path):
        print(f"[!] Binary not found: {binary_path}")
        sys.exit(1)

    print(f"[*] Unicorn Unpacker for: {binary_path}")
    print(f"[*] Base address: 0x{BASE_ADDR:x}")
    output_dir = Path(__file__).parent / "output"

    unpacker = Unpacker(binary_path)
    unpacker.setup()
    if not unpacker.run_unpacker():
        print("\n[!] Unpacking did not complete successfully")
    # Dump whatever state we have, even after a failed run
    unpacker.analyze_and_dump(str(output_dir))
    print("\n[*] Done!")


if __name__ == "__main__":
    main()
from __future__ import annotations
import argparse
import errno
import hashlib
import json
import os
import struct
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from elftools.elf.elffile import ELFFile
from elftools.elf.relocation import RelocationSection
from unicorn import (
UC_ARCH_X86,
UC_HOOK_BLOCK,
UC_HOOK_INSN,
UC_HOOK_INTR,
UC_HOOK_MEM_INVALID,
UC_HOOK_MEM_WRITE,
UC_MEM_FETCH_PROT,
UC_MEM_FETCH_UNMAPPED,
UC_MEM_READ_PROT,
UC_MEM_READ_UNMAPPED,
UC_MEM_WRITE_PROT,
UC_MEM_WRITE_UNMAPPED,
UC_MODE_64,
UC_PROT_EXEC,
UC_PROT_READ,
UC_PROT_WRITE,
Uc,
UcError,
)
from unicorn.x86_const import (
UC_X86_INS_SYSCALL,
UC_X86_REG_FS_BASE,
UC_X86_REG_GS_BASE,
UC_X86_REG_R10,
UC_X86_REG_R8,
UC_X86_REG_R9,
UC_X86_REG_RAX,
UC_X86_REG_RBP,
UC_X86_REG_RCX,
UC_X86_REG_RDI,
UC_X86_REG_RDX,
UC_X86_REG_RIP,
UC_X86_REG_RSI,
UC_X86_REG_RSP,
)
# Emulated address-space layout.
PAGE_SIZE = 0x1000
STACK_TOP = 0x7FFF_FFFF_0000
STACK_SIZE = 8 * 1024 * 1024  # 8 MiB
DEFAULT_PIE_BASE = 0x5555_5555_4000
# Linux auxv constants (AT_* entry type ids).
AT_NULL = 0
AT_PHDR = 3
AT_PHENT = 4
AT_PHNUM = 5
AT_PAGESZ = 6
AT_BASE = 7
AT_ENTRY = 9
AT_PLATFORM = 15
AT_RANDOM = 25
# Linux syscall constants (x86_64).
# Listed in ascending numeric order except SYS_EXIT, which comes last.
SYS_READ = 0
SYS_WRITE = 1
SYS_OPEN = 2
SYS_CLOSE = 3
SYS_LSEEK = 8
SYS_MMAP = 9
SYS_MPROTECT = 10
SYS_MUNMAP = 11
SYS_BRK = 12
SYS_RT_SIGACTION = 13
SYS_RT_SIGPROCMASK = 14
SYS_IOCTL = 16
SYS_WRITEV = 20
SYS_ACCESS = 21
SYS_GETPID = 39
SYS_UNAME = 63
SYS_READLINK = 89
SYS_GETUID = 102
SYS_GETGID = 104
SYS_ARCH_PRCTL = 158
SYS_FUTEX = 202
SYS_SET_TID_ADDRESS = 218
SYS_CLOCK_GETTIME = 228
SYS_EXIT_GROUP = 231
SYS_OPENAT = 257
SYS_NEWFSTATAT = 262
SYS_PRLIMIT64 = 302
SYS_GETRANDOM = 318
SYS_EXIT = 60
# mmap flags (bit masks tested against the flags argument).
MAP_FIXED = 0x10
MAP_ANONYMOUS = 0x20
# arch_prctl codes (set/get the FS and GS base registers).
ARCH_SET_GS = 0x1001
ARCH_SET_FS = 0x1002
ARCH_GET_FS = 0x1003
ARCH_GET_GS = 0x1004
# x86_64 relocation types (the r_type half of Elf64_Rela.r_info).
R_X86_64_64 = 1
R_X86_64_GLOB_DAT = 6
R_X86_64_JUMP_SLOT = 7
R_X86_64_RELATIVE = 8
def align_down(value: int, align: int = PAGE_SIZE) -> int:
    """Round *value* down to a multiple of *align*.

    Implemented with a bit mask, so the result is only a true multiple when
    *align* is a power of two.
    """
    mask = align - 1
    return value & ~mask
def align_up(value: int, align: int = PAGE_SIZE) -> int:
    """Round *value* up to a multiple of *align*.

    Implemented with a bit mask, so the result is only a true multiple when
    *align* is a power of two.
    """
    mask = align - 1
    return (value + mask) & ~mask
def as_i64(value: int) -> int:
    """Reinterpret the low 64 bits of *value* as a signed two's-complement integer."""
    unsigned = value & 0xFFFF_FFFF_FFFF_FFFF
    # Bit 63 set means the 64-bit pattern encodes a negative number.
    return unsigned - (1 << 64) if unsigned >> 63 else unsigned
def as_u64(value: int) -> int:
    """Truncate *value* to its low 64 bits, yielding a non-negative integer."""
    # Python's modulo with a positive modulus is always non-negative, so this
    # matches masking with 2**64 - 1 for negative inputs as well.
    return value % (1 << 64)
def os_error_code(exc: OSError, fallback: int = errno.EIO) -> int:
    """Return the errno carried by *exc*, or *fallback* when it has none."""
    code = exc.errno
    return fallback if code is None else int(code)
def elf_flags_to_uc(flags: int) -> int:
    """Convert ELF segment flag bits (4/2/1) into Unicorn protection bits.

    When none of the three bits is set the result is UC_PROT_READ rather
    than 0.
    """
    translation = (
        (4, UC_PROT_READ),
        (2, UC_PROT_WRITE),
        (1, UC_PROT_EXEC),
    )
    perm = 0
    for elf_bit, uc_bit in translation:
        if flags & elf_bit:
            perm |= uc_bit
    return perm if perm != 0 else UC_PROT_READ
def linux_prot_to_uc(prot: int) -> int:
    """Convert mmap/mprotect protection bits (0x1/0x2/0x4) into Unicorn bits.

    When none of the three bits is set the result is UC_PROT_READ rather
    than 0.
    """
    translation = (
        (0x1, UC_PROT_READ),
        (0x2, UC_PROT_WRITE),
        (0x4, UC_PROT_EXEC),
    )
    perm = 0
    for linux_bit, uc_bit in translation:
        if prot & linux_bit:
            perm |= uc_bit
    return perm if perm != 0 else UC_PROT_READ
@dataclass
class DumpInfo:
    """Record describing one dump file written by the emulator."""

    # 1-based sequence number of the dump.
    index: int
    # Address of the executed block that triggered the dump.
    trigger_rip: int
    # First guest address contained in the dump.
    start: int
    # Number of bytes dumped.
    size: int
    # Hex SHA-256 digest of the dumped bytes.
    sha256: str
    # Path of the raw dump file.
    blob_path: str
    # Path of the JSON metadata file written next to the dump.
    meta_path: str
class UnpackEmulator:
    """Emulate an x86_64 ELF under Unicorn and dump self-modified code.

    The emulator maps the PT_LOAD segments, applies a small set of
    relocations, points imported symbols at tiny stubs that are intercepted
    from the block hook, services a subset of Linux syscalls, and writes a
    dump whenever a page that was written during emulation is executed.
    """

    def __init__(
        self,
        binary_path: Path,
        out_dir: Path,
        max_instructions: int,
        max_dump_bytes: int,
        base_addr: int | None,
        verbose: bool,
    ) -> None:
        """Store the configuration and create an empty Unicorn instance.

        Args:
            binary_path: ELF file to load.
            out_dir: Directory receiving dumps and the run report.
            max_instructions: Instruction budget passed to ``emu_start``.
            max_dump_bytes: Upper bound for the size of a single dump.
            base_addr: Forced load base, or ``None`` to pick one in
                ``load_elf``.
            verbose: Enable ``log`` output.
        """
        self.binary_path = binary_path
        self.out_dir = out_dir
        self.max_instructions = max_instructions
        self.max_dump_bytes = max_dump_bytes
        self.base_addr = base_addr
        self.verbose = verbose
        self.uc = Uc(UC_ARCH_X86, UC_MODE_64)
        # Page-granular bookkeeping, keyed by page start address.
        self.page_perms: dict[int, int] = {}
        self.dirty_pages: set[int] = set()
        self.exec_pages: set[int] = set()
        self.dumped_pages: set[int] = set()
        self.dumps: list[DumpInfo] = []
        # Import handling: symbol -> GOT slots, and stub address -> symbol.
        self.import_slots: dict[str, list[int]] = {}
        self.stub_symbols: dict[int, str] = {}
        self.main_return_stub = 0
        self.libc_after_init_stub = 0
        # State of the emulated __libc_start_main sequence.
        self.pending_start_main: tuple[int, int, int] | None = None
        self.pre_main_init_funcs: list[int] = []
        self.pending_init_queue: list[int] = []
        self.did_libc_start_shim = False
        self.entry = 0
        self.phdr = 0
        self.phent = 0
        self.phnum = 0
        self.heap_base = 0
        self.heap_end = 0
        self.mmap_cursor = 0x1_0000_0000
        # Guest fds 0-2 map straight to the host; others go through guest_fds.
        self.next_guest_fd = 3
        self.guest_fds: dict[int, int] = {}
        self.exited = False
        self.exit_code: int | None = None
        self.start_time = 0.0

    def log(self, msg: str) -> None:
        """Print *msg* with an ``[emu]`` prefix when verbose mode is on."""
        if self.verbose:
            print(f"[emu] {msg}")

    # ------------------------------------------------------------------
    # Memory bookkeeping
    # ------------------------------------------------------------------

    def _set_page_perm(self, page: int, perm: int) -> None:
        """Map *page* with *perm*, or re-protect it if it is already mapped."""
        if page in self.page_perms:
            self.uc.mem_protect(page, PAGE_SIZE, perm)
        else:
            self.uc.mem_map(page, PAGE_SIZE, perm)
        self.page_perms[page] = perm
        if perm & UC_PROT_EXEC:
            self.exec_pages.add(page)
        else:
            self.exec_pages.discard(page)

    def _map_range(self, start: int, size: int, perm: int) -> None:
        """Apply *perm* to every page touched by ``[start, start + size)``."""
        if size <= 0:
            return
        page = align_down(start)
        end = align_up(start + size)
        while page < end:
            self._set_page_perm(page, perm)
            page += PAGE_SIZE

    def _is_mapped(self, page: int) -> bool:
        """Return whether *page* (a page start address) is currently mapped."""
        return page in self.page_perms

    def _range_is_free(self, start: int, size: int) -> bool:
        """Return True when no page touched by the range is mapped."""
        page = align_down(start)
        end = align_up(start + size)
        while page < end:
            if page in self.page_perms:
                return False
            page += PAGE_SIZE
        return True

    def _unmap_range(self, start: int, size: int) -> None:
        """Unmap every mapped page in the range and forget its tracking state."""
        if size <= 0:
            return
        page = align_down(start)
        end = align_up(start + size)
        while page < end:
            if page in self.page_perms:
                self.uc.mem_unmap(page, PAGE_SIZE)
                self.page_perms.pop(page, None)
                self.exec_pages.discard(page)
                self.dirty_pages.discard(page)
                self.dumped_pages.discard(page)
            page += PAGE_SIZE

    # ------------------------------------------------------------------
    # Small guest-state helpers
    # ------------------------------------------------------------------

    def _set_ret(self, value: int) -> None:
        """Store *value* (truncated to 64 bits) in RAX."""
        self.uc.reg_write(UC_X86_REG_RAX, as_u64(value))

    def _read_c_string(self, addr: int, max_len: int = 4096) -> str:
        """Read a NUL-terminated string of at most *max_len* bytes from *addr*."""
        out = bytearray()
        # Byte-at-a-time so a string ending right before an unmapped page
        # never triggers a read past its terminator.
        for i in range(max_len):
            b = self.uc.mem_read(addr + i, 1)
            if b == b"\x00":
                break
            out += b
        return out.decode("utf-8", errors="replace")

    def _push_u64(self, rsp: int, value: int) -> int:
        """Write *value* just below *rsp* and return the decremented pointer.

        This only touches memory; the caller decides when to update RSP.
        """
        rsp -= 8
        self.uc.mem_write(rsp, struct.pack("<Q", as_u64(value)))
        return rsp

    def _push_return_address(self, ret_addr: int) -> None:
        """Push *ret_addr* so the next jump behaves like a fresh ``call``.

        The slot is chosen below the current RSP so that RSP ends up
        congruent to 8 modulo 16, which is the state the x86-64 SysV ABI
        expects at function entry. Nothing above the current stack pointer
        is overwritten, regardless of whether the emulator got here through
        a ``call`` (return address still on the stack) or a ``ret`` (slot
        already popped).
        """
        rsp = int(self.uc.reg_read(UC_X86_REG_RSP))
        rsp = (rsp & ~0xF) - 8
        self._write_u64_mem(rsp, ret_addr)
        self.uc.reg_write(UC_X86_REG_RSP, rsp)

    def _alloc_guest_fd(self, host_fd: int) -> int:
        """Register *host_fd* and return the guest descriptor naming it."""
        guest_fd = self.next_guest_fd
        self.next_guest_fd += 1
        self.guest_fds[guest_fd] = host_fd
        return guest_fd

    def _resolve_host_fd(self, guest_fd: int) -> int | None:
        """Translate a guest fd to a host fd; ``None`` when it is unknown."""
        if guest_fd in (0, 1, 2):
            return guest_fd
        return self.guest_fds.get(guest_fd)

    def _close_guest_fds(self) -> None:
        """Close every host fd still owned by the guest, ignoring errors."""
        for guest_fd, host_fd in list(self.guest_fds.items()):
            try:
                os.close(host_fd)
            except OSError:
                # Best-effort cleanup at shutdown; nothing useful to report.
                pass
            self.guest_fds.pop(guest_fd, None)

    def _write_u64_mem(self, addr: int, value: int) -> None:
        """Write *value* as a little-endian 64-bit word at *addr*."""
        self.uc.mem_write(addr, struct.pack("<Q", as_u64(value)))

    def _read_u64_mem(self, addr: int) -> int:
        """Read a little-endian 64-bit word from *addr*."""
        return struct.unpack("<Q", bytes(self.uc.mem_read(addr, 8)))[0]

    @staticmethod
    def _libc_result(ret: int) -> int:
        """Convert a kernel-style ``-errno`` result into libc's ``-1``.

        The import stubs emulate libc wrappers, whose failure value is -1
        (``MAP_FAILED`` for mmap). ``errno`` itself is not emulated.
        """
        return -1 if ret < 0 else ret

    @staticmethod
    def _translate_open_flags(flags: int) -> int:
        """Map Linux x86_64 ``open`` flag bits onto the host's ``os.O_*`` values.

        On Linux hosts the value is passed through unchanged. Elsewhere the
        access mode is kept and each known flag bit is replaced with the
        host constant of the same name; bits without a host equivalent are
        dropped.
        """
        import sys

        if sys.platform.startswith("linux"):
            return flags
        linux_bits = (
            (0o100, "O_CREAT"),
            (0o200, "O_EXCL"),
            (0o400, "O_NOCTTY"),
            (0o1000, "O_TRUNC"),
            (0o2000, "O_APPEND"),
            (0o4000, "O_NONBLOCK"),
            (0o10000, "O_DSYNC"),
            (0o200000, "O_DIRECTORY"),
            (0o400000, "O_NOFOLLOW"),
            (0o2000000, "O_CLOEXEC"),
            (0o4000000, "O_SYNC"),
        )
        host_flags = flags & 0x3  # O_RDONLY / O_WRONLY / O_RDWR
        for bit, name in linux_bits:
            if flags & bit:
                host_flags |= getattr(os, name, 0)
        return host_flags

    # ------------------------------------------------------------------
    # Loader pieces
    # ------------------------------------------------------------------

    def _apply_relocations(self, elf: "ELFFile") -> None:
        """Apply RELATIVE/GLOB_DAT/JUMP_SLOT/64 relocations to mapped memory.

        Slots that refer to undefined symbols are not written here; they are
        recorded in ``import_slots`` so ``_build_import_stubs`` can fill
        them. Relocations whose target is not mapped are logged and skipped.

        Raises:
            RuntimeError: If the load base has not been chosen yet.
        """
        if self.base_addr is None:
            raise RuntimeError("base address was not initialized")
        base = self.base_addr
        self.import_slots.clear()
        for section in elf.iter_sections():
            if not isinstance(section, RelocationSection):
                continue
            link_index = int(section["sh_link"])
            symtab = elf.get_section(link_index) if link_index != 0 else None
            for rel in section.iter_relocations():
                r_type = int(rel["r_info_type"])
                r_sym = int(rel["r_info_sym"])
                reloc_addr = base + int(rel["r_offset"])
                addend = int(rel["r_addend"]) if rel.is_RELA() else 0
                sym_name = ""
                sym_value = 0
                sym_undef = True
                if symtab is not None and r_sym != 0:
                    symbol = symtab.get_symbol(r_sym)
                    sym_name = symbol.name
                    sym_value = int(symbol["st_value"])
                    sym_undef = symbol.entry["st_shndx"] == "SHN_UNDEF"
                try:
                    if r_type == R_X86_64_RELATIVE:
                        self._write_u64_mem(reloc_addr, base + addend)
                    elif r_type in (R_X86_64_GLOB_DAT, R_X86_64_JUMP_SLOT):
                        if sym_name and sym_undef:
                            self.import_slots.setdefault(sym_name, []).append(
                                reloc_addr
                            )
                        else:
                            self._write_u64_mem(
                                reloc_addr, base + sym_value + addend
                            )
                    elif r_type == R_X86_64_64 and sym_name and not sym_undef:
                        self._write_u64_mem(reloc_addr, base + sym_value + addend)
                except UcError:
                    self.log(
                        f"skipped relocation type={r_type} at 0x{reloc_addr:x} (unmapped)"
                    )

    def _collect_startup_initializers(self, elf: "ELFFile") -> None:
        """Collect preinit/init/init_array function addresses from PT_DYNAMIC.

        Array entries are read from guest memory (so they reflect whatever
        was written there before this call), de-duplicated in order, and
        stored in ``pre_main_init_funcs``.

        Raises:
            RuntimeError: If the load base has not been chosen yet.
        """
        if self.base_addr is None:
            raise RuntimeError("base address was not initialized")
        base = self.base_addr
        dyn = {
            "DT_PREINIT_ARRAY": 0,
            "DT_PREINIT_ARRAYSZ": 0,
            "DT_INIT": 0,
            "DT_INIT_ARRAY": 0,
            "DT_INIT_ARRAYSZ": 0,
        }
        pointer_tags = ("DT_PREINIT_ARRAY", "DT_INIT", "DT_INIT_ARRAY")
        for segment in elf.iter_segments():
            if segment["p_type"] != "PT_DYNAMIC":
                continue
            for tag in segment.iter_tags():
                d_tag = tag.entry.d_tag
                if d_tag in dyn:
                    field = "d_ptr" if d_tag in pointer_tags else "d_val"
                    dyn[d_tag] = int(tag[field])

        def read_array(offset: int, byte_size: int) -> list[int]:
            """Return the non-zero, readable 8-byte entries of one array."""
            found: list[int] = []
            if offset == 0 or byte_size <= 0:
                return found
            for i in range(byte_size // 8):
                try:
                    fn = int(self._read_u64_mem(base + offset + i * 8))
                except UcError:
                    continue
                if fn != 0:
                    found.append(fn)
            return found

        funcs = read_array(dyn["DT_PREINIT_ARRAY"], dyn["DT_PREINIT_ARRAYSZ"])
        if dyn["DT_INIT"] != 0:
            funcs.append(base + dyn["DT_INIT"])
        funcs.extend(read_array(dyn["DT_INIT_ARRAY"], dyn["DT_INIT_ARRAYSZ"]))

        seen: set[int] = set()
        filtered: list[int] = []
        for fn in funcs:
            # 0 and all-ones are treated as "no function" placeholders.
            if fn in (0, 0xFFFF_FFFF_FFFF_FFFF) or fn in seen:
                continue
            seen.add(fn)
            filtered.append(fn)
        self.pre_main_init_funcs = filtered
        if filtered:
            joined = ", ".join(f"0x{addr:x}" for addr in filtered)
            self.log(f"startup init funcs: {joined}")

    def _alloc_stub_region(self, stub_count: int) -> int:
        """Map an RWX region with room for *stub_count* 16-byte stubs."""
        size = align_up(max(stub_count * 0x10, PAGE_SIZE))
        candidate = align_up(max(self.mmap_cursor + 0x400000, 0x6000_0000_0000))
        while not self._range_is_free(candidate, size):
            candidate += size
        self._map_range(candidate, size, UC_PROT_READ | UC_PROT_WRITE | UC_PROT_EXEC)
        return candidate

    def _build_import_stubs(self) -> None:
        """Create one stub per imported symbol and patch its GOT slots.

        Two extra internal stubs are appended: ``__main_return`` (where main
        returns to) and ``__libc_after_init`` (where startup initializers
        return to).
        """
        self.stub_symbols.clear()
        symbols = sorted(self.import_slots.keys())
        stub_base = self._alloc_stub_region(len(symbols) + 2)
        cursor = stub_base
        for sym in symbols:
            stub_addr = cursor
            cursor += 0x10
            self.stub_symbols[stub_addr] = sym
            self.uc.mem_write(stub_addr, b"\x31\xc0\xc3")  # xor eax,eax; ret
            for slot_addr in self.import_slots[sym]:
                self._write_u64_mem(slot_addr, stub_addr)
        self.main_return_stub = cursor
        self.stub_symbols[self.main_return_stub] = "__main_return"
        self.uc.mem_write(self.main_return_stub, b"\xc3")
        cursor += 0x10
        self.libc_after_init_stub = cursor
        self.stub_symbols[self.libc_after_init_stub] = "__libc_after_init"
        self.uc.mem_write(self.libc_after_init_stub, b"\xc3")
        if symbols:
            self.log(f"installed {len(symbols)} import stubs at 0x{stub_base:x}")

    # ------------------------------------------------------------------
    # Emulated __libc_start_main sequence
    # ------------------------------------------------------------------

    def _return_from_call(self, ret_value: int) -> None:
        """Emulate ``ret``: pop the return address into RIP and set RAX."""
        rsp = self.uc.reg_read(UC_X86_REG_RSP)
        ret_addr = self._read_u64_mem(rsp)
        self.uc.reg_write(UC_X86_REG_RSP, rsp + 8)
        self.uc.reg_write(UC_X86_REG_RIP, ret_addr)
        self.uc.reg_write(UC_X86_REG_RAX, as_u64(ret_value))

    def _jump_to_main(self, main_addr: int, argc: int, argv: int, reason: str) -> bool:
        """Redirect execution to ``main(argc, argv, envp)``.

        Returns False, without changing any state, when *main_addr* is not
        in a mapped page.
        """
        if align_down(main_addr) not in self.page_perms:
            return False
        envp = argv + (argc + 1) * 8
        self.uc.reg_write(UC_X86_REG_RDI, argc)
        self.uc.reg_write(UC_X86_REG_RSI, argv)
        self.uc.reg_write(UC_X86_REG_RDX, envp)
        if self.main_return_stub != 0:
            # Push rather than overwrite [rsp]: when we arrive here from the
            # after-init stub the previous return slot was already popped.
            self._push_return_address(self.main_return_stub)
        self.uc.reg_write(UC_X86_REG_RIP, main_addr)
        self.did_libc_start_shim = True
        self.log(
            f"applied __libc_start_main shim ({reason}) main=0x{main_addr:x} argc={argc}"
        )
        return True

    def _continue_pending_startup(self) -> bool:
        """Run the next queued initializer, or jump to main when none remain.

        Returns False when no startup sequence is pending, or when the final
        jump to main is refused.
        """
        if self.pending_start_main is None:
            return False
        main_addr, argc, argv = self.pending_start_main
        envp = argv + (argc + 1) * 8
        while self.pending_init_queue:
            init_addr = int(self.pending_init_queue.pop(0))
            if init_addr == 0 or align_down(init_addr) not in self.page_perms:
                continue
            if self.libc_after_init_stub != 0:
                # The initializer returns into the after-init stub, which
                # re-enters this method for the next queue entry.
                self._push_return_address(self.libc_after_init_stub)
            self.uc.reg_write(UC_X86_REG_RDI, argc)
            self.uc.reg_write(UC_X86_REG_RSI, argv)
            self.uc.reg_write(UC_X86_REG_RDX, envp)
            self.uc.reg_write(UC_X86_REG_RIP, init_addr)
            self.log(f"calling startup init 0x{init_addr:x}")
            return True
        self.pending_start_main = None
        return self._jump_to_main(main_addr, argc, argv, "startup-complete")

    def _start_main_flow(
        self, main_addr: int, argc: int, argv: int, init_addr: int, reason: str
    ) -> bool:
        """Begin the startup sequence: queued initializers first, then main.

        Returns False when *main_addr* is not in a mapped page.
        """
        if align_down(main_addr) not in self.page_perms:
            return False
        self.pending_start_main = (int(main_addr), int(argc), int(argv))
        queue: list[int] = []
        if init_addr != 0:
            queue.append(int(init_addr))
        # NOTE(review): if init_addr is a legacy __libc_csu_init that itself
        # walks .init_array, the entries below would run a second time.
        # Confirm against the target's toolchain.
        queue.extend(self.pre_main_init_funcs)
        seen: set[int] = set()
        self.pending_init_queue = []
        for fn in queue:
            if fn in seen:
                continue
            seen.add(fn)
            self.pending_init_queue.append(fn)
        self.did_libc_start_shim = True
        self.log(
            f"starting __libc_start_main flow ({reason}) main=0x{int(main_addr):x} init_count={len(self.pending_init_queue)}"
        )
        return self._continue_pending_startup()

    def _try_null_libc_start_shim(self) -> bool:
        """Treat a fetch from address 0 as a call to ``__libc_start_main``.

        Arguments are taken from RDI/RSI/RDX/RCX. Refused once the shim has
        already completed.
        """
        if self.did_libc_start_shim and self.pending_start_main is None:
            return False
        main_addr = int(self.uc.reg_read(UC_X86_REG_RDI))
        argc = int(self.uc.reg_read(UC_X86_REG_RSI))
        argv = int(self.uc.reg_read(UC_X86_REG_RDX))
        init_addr = int(self.uc.reg_read(UC_X86_REG_RCX))
        return self._start_main_flow(main_addr, argc, argv, init_addr, "null-fetch")

    # ------------------------------------------------------------------
    # Import stub dispatch
    # ------------------------------------------------------------------

    def _handle_import_stub(self, address: int) -> bool:
        """Emulate the imported function whose stub lives at *address*.

        Returns False when *address* is not a stub. Symbols without a
        dedicated handler return 0 to their caller. Wrappers around
        syscalls report failure as -1, like libc does.
        """
        symbol = self.stub_symbols.get(address)
        if symbol is None:
            return False
        base_symbol = symbol.split("@", 1)[0]
        rdi = self.uc.reg_read(UC_X86_REG_RDI)
        rsi = self.uc.reg_read(UC_X86_REG_RSI)
        rdx = self.uc.reg_read(UC_X86_REG_RDX)
        rcx = self.uc.reg_read(UC_X86_REG_RCX)
        r8 = self.uc.reg_read(UC_X86_REG_R8)
        r9 = self.uc.reg_read(UC_X86_REG_R9)

        if base_symbol == "__main_return":
            self.exit_code = self.uc.reg_read(UC_X86_REG_RAX) & 0xFF
            self.exited = True
            self.uc.emu_stop()
            return True
        if base_symbol == "__libc_after_init":
            if self._continue_pending_startup():
                return True
            self._return_from_call(-errno.ENOSYS)
            return True
        if base_symbol == "__libc_start_main":
            if self._start_main_flow(
                int(rdi), int(rsi), int(rdx), int(rcx), "import-stub"
            ):
                return True
            self._return_from_call(-errno.ENOSYS)
            return True
        if base_symbol in {"exit", "_exit", "abort", "__stack_chk_fail"}:
            code = int(rdi) & 0xFF
            if base_symbol in {"abort", "__stack_chk_fail"}:
                code = 134
            self.exit_code = code
            self.exited = True
            self.uc.emu_stop()
            return True

        if base_symbol == "mprotect":
            ret = self._sys_mprotect(int(rdi), int(rsi), int(rdx))
            self._return_from_call(self._libc_result(ret))
            return True
        if base_symbol in {"mmap", "mmap64"}:
            ret = self._sys_mmap(
                int(rdi), int(rsi), int(rdx), int(rcx), as_i64(r8), int(r9)
            )
            self._return_from_call(self._libc_result(ret))
            return True
        if base_symbol == "munmap":
            ret = self._sys_munmap(int(rdi), int(rsi))
            self._return_from_call(self._libc_result(ret))
            return True
        if base_symbol == "read":
            ret = self._sys_read(int(rdi), int(rsi), int(rdx))
            self._return_from_call(self._libc_result(ret))
            return True
        if base_symbol == "write":
            ret = self._sys_write(int(rdi), int(rsi), int(rdx))
            self._return_from_call(self._libc_result(ret))
            return True
        if base_symbol in {"open", "open64"}:
            path = self._read_c_string(int(rdi))
            ret = self._sys_open_common(path, int(rsi), int(rdx))
            self._return_from_call(self._libc_result(ret))
            return True
        if base_symbol == "close":
            ret = self._sys_close(int(rdi))
            self._return_from_call(self._libc_result(ret))
            return True
        if base_symbol in {"lseek", "lseek64"}:
            ret = self._sys_lseek(int(rdi), int(rsi), int(rdx))
            self._return_from_call(self._libc_result(ret))
            return True

        if base_symbol in {"memcpy", "memmove", "__memcpy_chk"}:
            dst = int(rdi)
            # Reading the whole source first gives memmove semantics.
            data = bytes(self.uc.mem_read(int(rsi), int(rdx)))
            self.uc.mem_write(dst, data)
            self._return_from_call(dst)
            return True
        if base_symbol == "memset":
            dst = int(rdi)
            self.uc.mem_write(dst, bytes([int(rsi) & 0xFF]) * int(rdx))
            self._return_from_call(dst)
            return True
        if base_symbol == "strlen":
            length = 0
            ptr = int(rdi)
            while length < (1 << 20):
                if self.uc.mem_read(ptr + length, 1) == b"\x00":
                    break
                length += 1
            self._return_from_call(length)
            return True
        if base_symbol == "strcmp":
            s1 = self._read_c_string(int(rdi))
            s2 = self._read_c_string(int(rsi))
            if s1 == s2:
                ret = 0
            elif s1 < s2:
                ret = -1
            else:
                ret = 1
            self._return_from_call(ret)
            return True

        # Unknown import: behave like a function that returns 0.
        self._return_from_call(0)
        return True

    # ------------------------------------------------------------------
    # Image and stack setup
    # ------------------------------------------------------------------

    def load_elf(self) -> None:
        """Map the ELF's PT_LOAD segments and prepare relocations and stubs.

        Raises:
            ValueError: If the file is not a 64-bit x86_64 ELF.
        """
        with self.binary_path.open("rb") as f:
            elf = ELFFile(f)
            if elf.elfclass != 64:
                raise ValueError("Only ELF64 is supported")
            if elf["e_machine"] != "EM_X86_64":
                raise ValueError("Only x86_64 ELF is supported")
            is_pie = elf["e_type"] == "ET_DYN"
            if self.base_addr is None:
                self.base_addr = DEFAULT_PIE_BASE if is_pie else 0
            self.entry = self.base_addr + int(elf["e_entry"])
            self.phdr = self.base_addr + int(elf["e_phoff"])
            self.phent = int(elf["e_phentsize"])
            self.phnum = int(elf["e_phnum"])
            max_addr = 0
            for idx, segment in enumerate(elf.iter_segments()):
                if segment["p_type"] != "PT_LOAD":
                    continue
                seg_vaddr = self.base_addr + int(segment["p_vaddr"])
                seg_memsz = int(segment["p_memsz"])
                seg_filesz = int(segment["p_filesz"])
                seg_perm = elf_flags_to_uc(int(segment["p_flags"]))
                if seg_memsz > 0:
                    page = align_down(seg_vaddr)
                    seg_end = align_up(seg_vaddr + seg_memsz)
                    while page < seg_end:
                        # A page shared with an earlier segment keeps that
                        # segment's permissions in addition to the new ones.
                        merged = self.page_perms.get(page, 0) | seg_perm
                        self._set_page_perm(page, merged)
                        page += PAGE_SIZE
                seg_data = segment.data()
                if seg_data:
                    self.uc.mem_write(seg_vaddr, seg_data)
                if seg_memsz > seg_filesz:
                    # Zero the part of the segment that has no file backing.
                    self.uc.mem_write(
                        seg_vaddr + seg_filesz, b"\x00" * (seg_memsz - seg_filesz)
                    )
                max_addr = max(max_addr, align_up(seg_vaddr + seg_memsz))
                self.log(
                    f"mapped PT_LOAD[{idx}] 0x{seg_vaddr:x}-0x{seg_vaddr + seg_memsz:x} perm=0x{seg_perm:x}"
                )
            self.heap_base = align_up(max_addr + PAGE_SIZE)
            self.heap_end = self.heap_base
            self.mmap_cursor = align_up(self.heap_base + 0x200000)
            self._apply_relocations(elf)
            self._collect_startup_initializers(elf)
            self._build_import_stubs()

    def setup_stack(self) -> None:
        """Map the stack and lay out argc, argv, envp and auxv for the entry point."""
        stack_base = STACK_TOP - STACK_SIZE
        self._map_range(stack_base, STACK_SIZE, UC_PROT_READ | UC_PROT_WRITE)
        rsp = STACK_TOP
        argv0 = str(self.binary_path).encode("utf-8") + b"\x00"
        platform = b"x86_64\x00"
        random_bytes = os.urandom(16)

        # String/data area at the very top of the stack.
        rsp -= len(argv0)
        self.uc.mem_write(rsp, argv0)
        argv0_ptr = rsp
        rsp -= len(platform)
        self.uc.mem_write(rsp, platform)
        platform_ptr = rsp
        rsp -= len(random_bytes)
        self.uc.mem_write(rsp, random_bytes)
        random_ptr = rsp
        rsp = align_down(rsp, 16)

        auxv = [
            (AT_PHDR, self.phdr),
            (AT_PHENT, self.phent),
            (AT_PHNUM, self.phnum),
            (AT_PAGESZ, PAGE_SIZE),
            (AT_BASE, 0),
            (AT_ENTRY, self.entry),
            (AT_PLATFORM, platform_ptr),
            (AT_RANDOM, random_ptr),
            (AT_NULL, 0),
        ]
        # Pushed in reverse so the vector reads forward in memory.
        for at_type, at_val in reversed(auxv):
            rsp = self._push_u64(rsp, at_val)
            rsp = self._push_u64(rsp, at_type)
        rsp = self._push_u64(rsp, 0)  # envp terminator
        rsp = self._push_u64(rsp, 0)  # argv terminator
        rsp = self._push_u64(rsp, argv0_ptr)
        rsp = self._push_u64(rsp, 1)  # argc
        self.uc.reg_write(UC_X86_REG_RSP, rsp)
        self.uc.reg_write(UC_X86_REG_RBP, 0)
        self.uc.reg_write(UC_X86_REG_RIP, self.entry)

    # ------------------------------------------------------------------
    # Syscall implementations (kernel convention: negative errno on error)
    # ------------------------------------------------------------------

    def _write_utsname(self, addr: int) -> int:
        """Write six 65-byte NUL-padded utsname fields at *addr*; returns 0."""
        fields = [
            b"Linux",
            b"unicorn-host",
            b"5.15.0",
            b"#1 SMP",
            b"x86_64",
            b"localdomain",
        ]
        blob = b"".join(field.ljust(65, b"\x00")[:65] for field in fields)
        self.uc.mem_write(addr, blob)
        return 0

    def _write_timespec(self, addr: int) -> None:
        """Write the host's current time as two signed 64-bit words (sec, nsec)."""
        ns = time.time_ns()
        sec, nsec = divmod(ns, 1_000_000_000)
        self.uc.mem_write(addr, struct.pack("<qq", sec, nsec))

    def _write_stat_placeholder(self, addr: int) -> None:
        """Zero-fill 144 bytes at *addr* as a stand-in for a stat result."""
        self.uc.mem_write(addr, b"\x00" * 144)

    def _sys_open_common(self, path: str, flags: int, mode: int) -> int:
        """Open *path* on the host; returns a new guest fd or ``-errno``.

        *flags* are Linux guest values and are translated for non-Linux hosts.
        """
        try:
            host_fd = os.open(path, self._translate_open_flags(flags), mode)
        except OSError as exc:
            return -os_error_code(exc)
        return self._alloc_guest_fd(host_fd)

    def _sys_close(self, guest_fd: int) -> int:
        """Close a guest fd. The standard fds 0-2 are left open on the host."""
        if guest_fd in (0, 1, 2):
            return 0
        host_fd = self.guest_fds.pop(guest_fd, None)
        if host_fd is None:
            return -errno.EBADF
        try:
            os.close(host_fd)
        except OSError as exc:
            return -os_error_code(exc)
        return 0

    def _sys_lseek(self, guest_fd: int, offset: int, whence: int) -> int:
        """Seek on a guest fd; *offset* is reinterpreted as a signed 64-bit value."""
        host_fd = self._resolve_host_fd(guest_fd)
        if host_fd is None:
            return -errno.EBADF
        try:
            return os.lseek(host_fd, as_i64(offset), whence)
        except OSError as exc:
            return -os_error_code(exc)

    def _sys_read(self, guest_fd: int, buf: int, count: int) -> int:
        """Read up to *count* bytes into guest memory at *buf*."""
        host_fd = self._resolve_host_fd(guest_fd)
        if host_fd is None:
            return -errno.EBADF
        try:
            data = os.read(host_fd, count)
            if data:
                self.uc.mem_write(buf, data)
            return len(data)
        except UcError:
            # Guest buffer is not writable/mapped; mirrors _sys_write.
            return -errno.EFAULT
        except OSError as exc:
            return -os_error_code(exc)

    def _sys_write(self, guest_fd: int, buf: int, count: int) -> int:
        """Write *count* bytes from guest memory at *buf* to the fd."""
        host_fd = self._resolve_host_fd(guest_fd)
        if host_fd is None:
            return -errno.EBADF
        try:
            data = bytes(self.uc.mem_read(buf, count))
            return os.write(host_fd, data)
        except UcError:
            return -errno.EFAULT
        except OSError as exc:
            return -os_error_code(exc)

    def _sys_writev(self, guest_fd: int, iov_addr: int, iovcnt: int) -> int:
        """Write each non-empty ``(base, length)`` iovec entry; returns the byte total."""
        host_fd = self._resolve_host_fd(guest_fd)
        if host_fd is None:
            return -errno.EBADF
        total = 0
        try:
            for i in range(iovcnt):
                ent = bytes(self.uc.mem_read(iov_addr + i * 16, 16))
                base, length = struct.unpack("<QQ", ent)
                if length == 0:
                    continue
                data = bytes(self.uc.mem_read(base, length))
                total += os.write(host_fd, data)
            return total
        except UcError:
            return -errno.EFAULT
        except OSError as exc:
            return -os_error_code(exc)

    def _sys_mmap(
        self, addr: int, length: int, prot: int, flags: int, fd: int, offset: int
    ) -> int:
        """Map memory for the guest; returns the address or ``-errno``.

        The address hint is honoured only together with MAP_FIXED. File
        mappings are emulated by copying the file contents once.
        """
        if length <= 0:
            return -errno.EINVAL
        size = align_up(length)
        req_addr = align_down(addr)
        perm = linux_prot_to_uc(prot)
        if (flags & MAP_FIXED) and req_addr != 0:
            target = req_addr
            # MAP_FIXED replaces whatever is currently mapped there.
            self._unmap_range(target, size)
        else:
            target = align_up(max(self.mmap_cursor, self.heap_end + PAGE_SIZE))
            while not self._range_is_free(target, size):
                target += PAGE_SIZE
        self._map_range(target, size, perm)
        if fd >= 0 and not (flags & MAP_ANONYMOUS):
            host_fd = self._resolve_host_fd(fd)
            if host_fd is None:
                self._unmap_range(target, size)
                return -errno.EBADF
            try:
                # Preserve the descriptor's file position across the copy.
                cur = os.lseek(host_fd, 0, os.SEEK_CUR)
                os.lseek(host_fd, offset, os.SEEK_SET)
                data = os.read(host_fd, length)
                os.lseek(host_fd, cur, os.SEEK_SET)
                if data:
                    self.uc.mem_write(target, data)
            except OSError as exc:
                self._unmap_range(target, size)
                return -os_error_code(exc)
        self.mmap_cursor = align_up(target + size + PAGE_SIZE)
        return target

    def _sys_mprotect(self, addr: int, length: int, prot: int) -> int:
        """Change permissions of a fully mapped range; ``-ENOMEM`` otherwise."""
        if length <= 0:
            return 0
        start = align_down(addr)
        end = align_up(addr + length)
        # Validate the whole range first so a failure changes nothing.
        page = start
        while page < end:
            if page not in self.page_perms:
                return -errno.ENOMEM
            page += PAGE_SIZE
        perm = linux_prot_to_uc(prot)
        page = start
        while page < end:
            self._set_page_perm(page, perm)
            page += PAGE_SIZE
        return 0

    def _sys_munmap(self, addr: int, length: int) -> int:
        """Unmap the pages covering the range; ``-EINVAL`` for a zero length."""
        if length <= 0:
            return -errno.EINVAL
        self._unmap_range(addr, length)
        return 0

    def _sys_brk(self, requested: int) -> int:
        """Grow the heap up to *requested* and return the current break.

        Requests of 0 or below the heap base leave the break unchanged, and
        the heap is never shrunk.
        """
        if requested == 0:
            return self.heap_end
        if requested < self.heap_base:
            return self.heap_end
        if requested > self.heap_end:
            grow = requested - self.heap_end
            self._map_range(self.heap_end, grow, UC_PROT_READ | UC_PROT_WRITE)
            self.heap_end = requested
        return self.heap_end

    # ------------------------------------------------------------------
    # Unicorn hooks
    # ------------------------------------------------------------------

    def hook_syscall(self, _uc: "Uc", _user_data: Any) -> None:
        """Service a ``syscall`` instruction and store the result in RAX.

        Unsupported syscalls return ``-ENOSYS``; Unicorn memory errors map
        to ``-EFAULT`` and host ``OSError`` to the matching ``-errno``.
        """
        nr = self.uc.reg_read(UC_X86_REG_RAX)
        a0 = self.uc.reg_read(UC_X86_REG_RDI)
        a1 = self.uc.reg_read(UC_X86_REG_RSI)
        a2 = self.uc.reg_read(UC_X86_REG_RDX)
        a3 = self.uc.reg_read(UC_X86_REG_R10)
        a4 = self.uc.reg_read(UC_X86_REG_R8)
        a5 = self.uc.reg_read(UC_X86_REG_R9)
        ret = -errno.ENOSYS
        try:
            if nr == SYS_READ:
                ret = self._sys_read(int(a0), int(a1), int(a2))
            elif nr == SYS_WRITE:
                ret = self._sys_write(int(a0), int(a1), int(a2))
            elif nr == SYS_WRITEV:
                ret = self._sys_writev(int(a0), int(a1), int(a2))
            elif nr == SYS_OPEN:
                path = self._read_c_string(int(a0))
                ret = self._sys_open_common(path, int(a1), int(a2))
            elif nr == SYS_OPENAT:
                # The dirfd argument (a0) is ignored.
                path = self._read_c_string(int(a1))
                ret = self._sys_open_common(path, int(a2), int(a3))
            elif nr == SYS_CLOSE:
                ret = self._sys_close(int(a0))
            elif nr == SYS_LSEEK:
                ret = self._sys_lseek(int(a0), int(a1), int(a2))
            elif nr == SYS_MMAP:
                ret = self._sys_mmap(
                    int(a0), int(a1), int(a2), int(a3), as_i64(a4), int(a5)
                )
            elif nr == SYS_MPROTECT:
                ret = self._sys_mprotect(int(a0), int(a1), int(a2))
            elif nr == SYS_MUNMAP:
                ret = self._sys_munmap(int(a0), int(a1))
            elif nr == SYS_BRK:
                ret = self._sys_brk(int(a0))
            elif nr in (SYS_RT_SIGACTION, SYS_RT_SIGPROCMASK):
                ret = 0
            elif nr == SYS_IOCTL:
                ret = -errno.ENOTTY
            elif nr == SYS_ACCESS:
                path = self._read_c_string(int(a0))
                ret = 0 if os.access(path, int(a1)) else -errno.ENOENT
            elif nr == SYS_GETPID:
                ret = 1337
            elif nr == SYS_GETUID:
                ret = os.getuid()
            elif nr == SYS_GETGID:
                ret = os.getgid()
            elif nr == SYS_UNAME:
                ret = self._write_utsname(int(a0))
            elif nr == SYS_ARCH_PRCTL:
                code = int(a0)
                val = int(a1)
                if code == ARCH_SET_FS:
                    self.uc.reg_write(UC_X86_REG_FS_BASE, val)
                    ret = 0
                elif code == ARCH_SET_GS:
                    self.uc.reg_write(UC_X86_REG_GS_BASE, val)
                    ret = 0
                elif code == ARCH_GET_FS:
                    fs = self.uc.reg_read(UC_X86_REG_FS_BASE)
                    self.uc.mem_write(val, struct.pack("<Q", fs))
                    ret = 0
                elif code == ARCH_GET_GS:
                    gs = self.uc.reg_read(UC_X86_REG_GS_BASE)
                    self.uc.mem_write(val, struct.pack("<Q", gs))
                    ret = 0
                else:
                    ret = -errno.EINVAL
            elif nr == SYS_FUTEX:
                ret = 0
            elif nr == SYS_SET_TID_ADDRESS:
                ret = 1337
            elif nr == SYS_CLOCK_GETTIME:
                self._write_timespec(int(a1))
                ret = 0
            elif nr == SYS_READLINK:
                path = self._read_c_string(int(a0))
                buf = int(a1)
                buf_size = int(a2)
                if path == "/proc/self/exe":
                    target = str(self.binary_path).encode("utf-8")
                else:
                    target = os.readlink(path).encode("utf-8")
                n = min(len(target), buf_size)
                if n:
                    self.uc.mem_write(buf, target[:n])
                ret = n
            elif nr == SYS_NEWFSTATAT:
                self._write_stat_placeholder(int(a2))
                ret = 0
            elif nr == SYS_PRLIMIT64:
                ret = 0
            elif nr == SYS_GETRANDOM:
                buf = int(a0)
                size = int(a1)
                self.uc.mem_write(buf, os.urandom(size))
                ret = size
            elif nr in (SYS_EXIT, SYS_EXIT_GROUP):
                self.exit_code = int(a0) & 0xFF
                self.exited = True
                ret = 0
                self.uc.emu_stop()
            else:
                self.log(f"unsupported syscall {nr}, returning -ENOSYS")
                ret = -errno.ENOSYS
        except UcError:
            ret = -errno.EFAULT
        except OSError as exc:
            ret = -os_error_code(exc)
        self._set_ret(ret)

    def hook_mem_write(
        self,
        _uc: "Uc",
        _access: int,
        address: int,
        size: int,
        _value: int,
        _user_data: Any,
    ) -> None:
        """Mark every page touched by a guest write as dirty."""
        if size <= 0:
            return
        page = align_down(address)
        end_page = align_down(address + size - 1)
        while page <= end_page:
            self.dirty_pages.add(page)
            page += PAGE_SIZE

    def _dump_for_page(self, page: int, trigger_rip: int) -> None:
        """Dump the run of dirty, executable, not-yet-dumped pages around *page*.

        The run is extended in both directions over adjacent candidate
        pages. If it exceeds ``max_dump_bytes`` the dump starts at *page*
        instead, never extends past the end of the run, and always covers at
        least one page so the trigger page is marked as dumped.
        """

        def candidate(p: int) -> bool:
            return (
                p in self.dirty_pages
                and p in self.exec_pages
                and p not in self.dumped_pages
            )

        if not candidate(page):
            return
        start = page
        end = page + PAGE_SIZE
        while candidate(start - PAGE_SIZE):
            start -= PAGE_SIZE
        while candidate(end):
            end += PAGE_SIZE
        limit = max(PAGE_SIZE, align_down(self.max_dump_bytes))
        if end - start > limit:
            start = page
            end = min(end, page + limit)
        size = end - start

        blob = bytes(self.uc.mem_read(start, size))
        digest = hashlib.sha256(blob).hexdigest()
        idx = len(self.dumps) + 1
        blob_name = (
            f"dump_{idx:04d}_rip_0x{trigger_rip:x}_base_0x{start:x}_size_0x{size:x}.bin"
        )
        meta_name = blob_name + ".json"
        blob_path = self.out_dir / blob_name
        meta_path = self.out_dir / meta_name
        blob_path.write_bytes(blob)
        meta = {
            "index": idx,
            "trigger_rip": trigger_rip,
            "start": start,
            "size": size,
            "sha256": digest,
            "timestamp": time.time(),
        }
        meta_path.write_text(json.dumps(meta, indent=2), encoding="utf-8")
        p = start
        while p < end:
            self.dumped_pages.add(p)
            p += PAGE_SIZE
        info = DumpInfo(
            index=idx,
            trigger_rip=trigger_rip,
            start=start,
            size=size,
            sha256=digest,
            blob_path=str(blob_path),
            meta_path=str(meta_path),
        )
        self.dumps.append(info)
        print(f"[+] dumped unpacked code #{idx}: 0x{start:x}-0x{end:x} ({size} bytes)")

    def hook_block(self, _uc: "Uc", address: int, _size: int, _user_data: Any) -> None:
        """Dispatch import stubs and dump written pages on first execution."""
        if address in self.stub_symbols:
            self._handle_import_stub(address)
            return
        page = align_down(address)
        if (
            page in self.dirty_pages
            and page in self.exec_pages
            and page not in self.dumped_pages
        ):
            self._dump_for_page(page, address)

    def hook_mem_invalid(
        self,
        _uc: "Uc",
        access: int,
        address: int,
        size: int,
        _value: int,
        _user_data: Any,
    ) -> bool:
        """Recover from permission faults; report and stop on anything else.

        Permission faults on mapped pages are resolved by adding the missing
        permission. A fetch from address 0 is offered to the
        ``__libc_start_main`` shim. Returning False tells Unicorn to stop.
        """
        page = align_down(address)
        if access == UC_MEM_FETCH_PROT and page in self.page_perms:
            new_perm = self.page_perms[page] | UC_PROT_EXEC
            self._set_page_perm(page, new_perm)
            self.log(f"promoted execute perm at 0x{page:x}")
            return True
        if access == UC_MEM_WRITE_PROT and page in self.page_perms:
            new_perm = self.page_perms[page] | UC_PROT_WRITE
            self._set_page_perm(page, new_perm)
            self.log(f"promoted write perm at 0x{page:x}")
            return True
        if access == UC_MEM_READ_PROT and page in self.page_perms:
            new_perm = self.page_perms[page] | UC_PROT_READ
            self._set_page_perm(page, new_perm)
            return True
        if access == UC_MEM_FETCH_UNMAPPED and address == 0:
            if self._try_null_libc_start_shim():
                return True
        access_name = {
            UC_MEM_READ_UNMAPPED: "READ_UNMAPPED",
            UC_MEM_WRITE_UNMAPPED: "WRITE_UNMAPPED",
            UC_MEM_FETCH_UNMAPPED: "FETCH_UNMAPPED",
        }.get(access, f"access={access}")
        print(
            f"[!] invalid memory {access_name} at 0x{address:x} (size={size}); stopping"
        )
        return False

    def hook_intr(self, _uc: "Uc", intno: int, _user_data: Any) -> None:
        """Report an interrupt and stop emulation."""
        print(f"[!] unhandled interrupt int 0x{intno:x}; stopping")
        self.uc.emu_stop()

    def install_hooks(self) -> None:
        """Register the memory, block, syscall and interrupt hooks."""
        self.uc.hook_add(UC_HOOK_MEM_WRITE, self.hook_mem_write)
        self.uc.hook_add(UC_HOOK_BLOCK, self.hook_block)
        self.uc.hook_add(UC_HOOK_MEM_INVALID, self.hook_mem_invalid)
        self.uc.hook_add(
            UC_HOOK_INSN, self.hook_syscall, None, 1, 0, UC_X86_INS_SYSCALL
        )
        self.uc.hook_add(UC_HOOK_INTR, self.hook_intr)

    # ------------------------------------------------------------------
    # Driver
    # ------------------------------------------------------------------

    def run(self) -> None:
        """Load the binary, emulate it, and write ``run_report.json``.

        Host descriptors opened on behalf of the guest are closed even when
        loading or emulation raises.
        """
        self.out_dir.mkdir(parents=True, exist_ok=True)
        try:
            self.load_elf()
            self.setup_stack()
            self.install_hooks()
            print(f"[*] emulating: {self.binary_path}")
            print(
                f"[*] entry=0x{self.entry:x} base=0x{self.base_addr:x} max_insn={self.max_instructions}"
            )
            self.start_time = time.time()
            stop_reason = "instruction limit reached"
            try:
                self.uc.emu_start(
                    self.entry, 0xFFFF_FFFF_FFFF_FFFF, 0, self.max_instructions
                )
                if self.exited:
                    stop_reason = f"guest exit({self.exit_code})"
            except UcError as exc:
                stop_reason = f"unicorn error: {exc}"
            elapsed = time.time() - self.start_time
            print(f"[*] stopped: {stop_reason} ({elapsed:.3f}s)")
            print(f"[*] dumps written: {len(self.dumps)}")
            report = {
                "binary": str(self.binary_path),
                "entry": self.entry,
                "base": self.base_addr,
                "max_instructions": self.max_instructions,
                "stop_reason": stop_reason,
                "elapsed_seconds": elapsed,
                "dumps": [
                    {
                        "index": d.index,
                        "trigger_rip": d.trigger_rip,
                        "start": d.start,
                        "size": d.size,
                        "sha256": d.sha256,
                        "blob_path": d.blob_path,
                        "meta_path": d.meta_path,
                    }
                    for d in self.dumps
                ],
            }
            (self.out_dir / "run_report.json").write_text(
                json.dumps(report, indent=2), encoding="utf-8"
            )
        finally:
            self._close_guest_fds()
def parse_int(value: str) -> int:
    """Parse an integer literal, inferring the base from its prefix.

    Base 0 lets ``int`` accept decimal as well as ``0x``, ``0o`` and ``0b``
    prefixed text.
    """
    auto_base = 0
    return int(value, auto_base)
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser used by ``main``."""
    description = (
        "Emulate an ELF x86_64 binary with Unicorn and dump runtime-written executable "
        "pages when they are first executed (useful for packed binaries)."
    )
    cli = argparse.ArgumentParser(description=description)
    cli.add_argument(
        "binary",
        nargs="?",
        default="/Users/int/Downloads/test_packer",
        help="Path to ELF64 x86_64 binary (default: /Users/int/Downloads/test_packer)",
    )
    cli.add_argument(
        "--out-dir",
        default="dumps",
        help="Directory for dumped machine code and metadata",
    )
    cli.add_argument(
        "--max-insn",
        type=int,
        default=5_000_000,
        help="Maximum number of instructions to emulate",
    )
    cli.add_argument(
        "--max-dump-bytes",
        type=parse_int,
        default=16 * 1024 * 1024,
        help="Maximum size of a single dump region (accepts decimal or 0x...)",
    )
    cli.add_argument(
        "--base",
        type=parse_int,
        default=None,
        help="Force load base address for PIE binaries (e.g. 0x555555554000)",
    )
    cli.add_argument(
        "--verbose", action="store_true", help="Enable verbose emulator logging"
    )
    return cli


def main() -> None:
    """Parse the command line and run the emulator on the chosen binary.

    Raises:
        FileNotFoundError: If the resolved binary path does not exist.
    """
    options = _build_arg_parser().parse_args()
    target = Path(options.binary).expanduser().resolve()
    if not target.exists():
        raise FileNotFoundError(f"Binary not found: {target}")
    UnpackEmulator(
        binary_path=target,
        out_dir=Path(options.out_dir),
        max_instructions=options.max_insn,
        max_dump_bytes=options.max_dump_bytes,
        base_addr=options.base,
        verbose=options.verbose,
    ).run()
# Script entry point: only run the CLI when executed directly, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment