Created
October 13, 2023 11:51
-
-
Save heck-gd/b59b9c48c0bfe12229e363e7f5653e56 to your computer and use it in GitHub Desktop.
CobaltStrike Volatility Config Extractor
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
import re | |
from itertools import cycle | |
MAX_SETTINGS = 128 | |
def load_mapping(filename: str) -> dict[int, int]: | |
"""Processes textual Volatility memmap output into a page mapping.""" | |
page_mapping = {} | |
with open(filename, 'r') as fp: | |
for line in fp.readlines(): | |
if not line.startswith("0x"): | |
continue | |
cols = line.split("\t") | |
virt_addr = int(cols[0], 0) | |
size = int(cols[2], 0) | |
offset = int(cols[3], 0) | |
page_mapping[virt_addr] = offset | |
if size > 0x1000: | |
for i in range(virt_addr + 0x1000, virt_addr + size, 0x1000): | |
page_mapping[i] = offset + i - virt_addr | |
return page_mapping | |
def get_unpacked_config_address(cs_filename: str) -> int: | |
"""Gets unpacked config heap address from beacon dump.""" | |
with open(cs_filename, 'rb') as fp: | |
data = fp.read() | |
match = re.search(rb"\x48\x8B\x0D(.{4})\x48\x0F\xBF\xFF\x8B\xD3\x0F\xB7\xF0\x48\xC1\xE7\x04\x66\x89\x1C\x0F", data) # noqa:E501 | |
if not match: | |
raise Exception("Code pattern for config reference not found") | |
# Compute RIP-relative pointer to data section. | |
data_addr = int.from_bytes(match.group(1), 'little') + match.start() + 7 | |
# Read qword in data section to get pointer to heap. | |
heap_addr = int.from_bytes(data[data_addr:data_addr+8], 'little') | |
return heap_addr | |
def decrypt(data: bytes, key: bytes) -> bytes: | |
"""Unmasks XOR with cyclic key.""" | |
return bytes([b ^ k for b, k in zip(data, cycle(key))]) | |
def _page(addr: int) -> int: | |
return addr & 0xFFFFFFFF_FFFFF000 | |
def read_and_decrypt_heap(pages: dict[int, int], heap_addr: int, size: int, | |
dmp_filename: str, key: bytes) -> bytes: | |
"""Reads and unmasks heap data from the dump.""" | |
page_addr = _page(heap_addr) | |
if page_addr not in pages: | |
raise Exception(f"Unable to find the requested page: {hex(page_addr)}") | |
with open(dmp_filename, 'rb') as dmp: | |
dmp.seek(pages[page_addr] + (heap_addr & 0xfff)) | |
crypted_data = dmp.read(size) | |
return decrypt(crypted_data, key) | |
def retrieve_config(pages: dict[int, int], heap_addr: int, dmp_filename: str, | |
key: bytes) -> dict[int, tuple[int, bytes]]: | |
config_decrypted = read_and_decrypt_heap(pages, heap_addr, MAX_SETTINGS * 16, dmp_filename, key) | |
for i in range(10): | |
kind = int.from_bytes(config_decrypted[i:i+2], 'little') | |
if kind not in (0, 1, 2, 3): | |
print(config_decrypted.hex()) | |
raise Exception(f"Config sanity check failed, got kind {hex(kind)} at index {i}") | |
config = {} | |
for i in range(MAX_SETTINGS): | |
offset = i * 16 | |
kind = int.from_bytes(config_decrypted[offset:offset+2], 'little') | |
if kind == 0: | |
# Empty slot. | |
continue | |
data = int.from_bytes(config_decrypted[offset+8:offset+16], 'little') | |
if kind == 1: # WORD | |
data_bin = (data & 0xFFFF).to_bytes(2, 'big') | |
elif kind == 2: # DWORD | |
data_bin = (data & 0xFFFFFFFF).to_bytes(4, 'big') | |
elif kind == 3: | |
data_bin = read_and_decrypt_heap(pages, data, 512, dmp_filename, key) | |
# Consider value terminated after at least 10 zero chars. | |
zero_pos = data_bin.find(b"\x00" * 10) | |
if zero_pos > -1: | |
data_bin = data_bin[:zero_pos] | |
else: | |
print(f"[!!!] Encountered unknown kind {kind} at {i}") | |
continue | |
config[i] = (kind, data_bin) | |
return config | |
if __name__ == '__main__': | |
path_mapping = "/path/to/memmap.txt" | |
path_dmp = "/path/to/memmap.dmp" | |
path_cobalt = "/path/to/unmasked_beacon.dat" | |
key = bytes.fromhex("<your key as hex>") | |
pages = load_mapping(path_mapping) | |
heap_addr = get_unpacked_config_address(path_cobalt) | |
print(f"Found heap address {hex(heap_addr)}") | |
config = retrieve_config(pages, heap_addr, path_dmp, key) | |
print(config) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment