Skip to content

Instantly share code, notes, and snippets.

@StarrFox
Last active June 20, 2025 22:05
Show Gist options
  • Save StarrFox/0bbefa5f0aa7db0db73592463d056244 to your computer and use it in GitHub Desktop.
import pymem
import pymem.memory
import pymem.process
import pymem.ressources.structure
import regex
import json
import time
import threading
from queue import Queue, Empty
from pathlib import Path
from typing import TypedDict, TypeAlias
from collections import defaultdict
from pymem.ressources.structure import MODULEINFO
# Directory containing this script; data.json is expected to live next to it.
ROOT: Path = Path(__file__).parent
class FuncInfo(TypedDict):
    """One entry of data.json: a named function signature to scan for."""

    # Human-readable function name; used as the key of the result mapping.
    name: str
    # Hex byte signature; "?" marks a single wildcard byte.
    signature: str
    # Mangled symbol name; present in the JSON but unused by this script.
    mangled_name: str
# Mapping of function name -> compiled-ready regex byte pattern.
PatternMap: TypeAlias = dict[str, bytes]
def _sig_to_bytes(signature: str) -> bytes:
    """Convert a hex signature with "?" wildcards into a regex byte pattern.

    Each "?" in *signature* stands for exactly one arbitrary byte: the hex
    chunks between wildcards are decoded and regex-escaped, and each wildcard
    becomes b"." (which matches any byte under the DOTALL flag the scanners
    pass to regex.finditer).

    Note: the original parameter was named ``str``, shadowing the builtin.
    """
    chunks = signature.split("?")
    return b".".join(regex.escape(bytes.fromhex(chunk)) for chunk in chunks)
def process_json(file_path: Path) -> PatternMap:
    """Load FuncInfo entries from a JSON file and build scan patterns.

    *file_path* must contain a JSON array of FuncInfo objects. Returns a
    mapping of function name -> regex byte pattern built by _sig_to_bytes.
    """
    # read_text + json.loads: the original json.load(file_path.open())
    # never closed the file handle it opened.
    entries: list[FuncInfo] = json.loads(file_path.read_text())
    return {entry["name"]: _sig_to_bytes(entry["signature"]) for entry in entries}
# Page protections worth scanning: every executable variant plus plain
# readable / read-write pages.
_PROTECTION = pymem.ressources.structure.MEMORY_PROTECTION
allowed_protections = [
    _PROTECTION.PAGE_EXECUTE,
    _PROTECTION.PAGE_EXECUTE_READ,
    _PROTECTION.PAGE_EXECUTE_READWRITE,
    _PROTECTION.PAGE_READWRITE,
    _PROTECTION.PAGE_READONLY,
]
def _threaded_1_worker(queue: Queue[tuple[bytes, bytes, int, str]], out_queue: Queue[dict[str, list]], kill_event: threading.Event):
    """Worker loop: consume (pattern, data, start, name) jobs until killed.

    For each job, every match offset of *pattern* in *data* (shifted by
    *start*) is collected and a result dict is placed on *out_queue* — the
    name key is present only when at least one match was found, matching
    how the consumer merges results.
    """
    while not kill_event.is_set():
        try:
            pattern, data, start, name = queue.get(timeout=0.1)
        except Empty:
            continue
        offsets = [start + found.start() for found in regex.finditer(pattern, data, flags=regex.DOTALL)]
        result: dict[str, list] = defaultdict(list)
        if offsets:
            result[name].extend(offsets)
        # put the result BEFORE task_done so the producer can rely on
        # every finished task having its output available
        out_queue.put(result)
        queue.task_done()
def scan_module_threaded_1(process: pymem.Pymem, module: MODULEINFO, patterns: PatternMap, *, worker_number: int = 4):
    """Scan *module*'s committed, readable memory regions for *patterns*.

    Spawns *worker_number* daemon threads running _threaded_1_worker, feeds
    them one job per (region, pattern) pair, and merges their results.
    Returns a dict of pattern name -> list of absolute match addresses.
    """
    region_start = module.lpBaseOfDll
    module_end = region_start + module.SizeOfImage
    queue: Queue[tuple[bytes, bytes, int, str]] = Queue()
    out_queue: Queue[dict[str, list]] = Queue()
    kill_event = threading.Event()
    threads: list[threading.Thread] = []
    for _ in range(worker_number):
        thread = threading.Thread(target=_threaded_1_worker, args=(queue, out_queue, kill_event), daemon=True)
        thread.start()
        threads.append(thread)
    while region_start < module_end:
        region_info = pymem.memory.virtual_query(process.process_handle, region_start)
        region_start = region_info.BaseAddress + region_info.RegionSize
        # 0x1000 == MEM_COMMIT; also skip regions we cannot safely read
        if region_info.State != 0x1000 or region_info.protect not in allowed_protections:
            continue
        region_data = process.read_bytes(region_info.BaseAddress, region_info.RegionSize)
        for name, pattern in patterns.items():
            queue.put((pattern, region_data, region_info.BaseAddress, name))
    # Block until every queued job is finished. Workers put their result on
    # out_queue *before* calling task_done, so once join() returns every
    # result is already available and a non-blocking drain cannot miss one.
    # (The original busy-waited on unfinished_tasks, which could deadlock in
    # a blocking out_queue.get() if it raced a worker between put and
    # task_done.)
    queue.join()
    matches: dict[str, list] = defaultdict(list)
    while True:
        try:
            processed_match = out_queue.get_nowait()
        except Empty:
            break
        # extend per key, not dict.update(): a pattern can match in several
        # regions and update() discarded all but the last region's hits
        for name, offsets in processed_match.items():
            matches[name].extend(offsets)
        out_queue.task_done()
    kill_event.set()
    for thread in threads:
        thread.join()
    return matches
class Timer:
    """Context manager that prints the wall-clock duration of its body.

    Usage: ``with Timer("label"): ...`` prints ``label: <seconds>s`` on exit.
    """

    # perf_counter value captured on __enter__; 0 until the block is entered
    start: float = 0

    def __init__(self, name: str):
        self.name = name

    def __enter__(self) -> "Timer":
        # __enter__ is invoked with no arguments by the with-statement;
        # the original's stray *_ varargs served no purpose
        self.start = time.perf_counter()
        return self

    def __exit__(self, *_) -> None:
        elapsed = time.perf_counter() - self.start
        print(f"{self.name}: {round(elapsed, 2)}s")
def main() -> None:
    """Attach to NMS.exe, load signatures from data.json, and time a scan."""
    process = pymem.Pymem("NMS.exe")
    module = pymem.process.module_from_name(process.process_handle, "NMS.exe")
    if module is None:
        # the original raised a bare RuntimeError with no hint of the cause
        raise RuntimeError("could not find module NMS.exe in the target process")
    patterns = process_json(ROOT / "data.json")
    with Timer("threaded_1"):
        result = scan_module_threaded_1(process, module, patterns)
    print(result)


if __name__ == "__main__":
    main()
import concurrent.futures
def _threaded_2_worker(pattern: bytes, data: bytes, start: int):
    """Return absolute offsets of every *pattern* match within *data*.

    *start* is the base address of *data* in the target process, so each
    returned value is ``start + match offset``.
    """
    return [start + found.start() for found in regex.finditer(pattern, data, flags=regex.DOTALL)]
def scan_module_threaded_2(process: pymem.Pymem, module: MODULEINFO, patterns: PatternMap, *, worker_number: int = 4):
    """Scan *module*'s committed, readable memory regions for *patterns*.

    Functionally equivalent to scan_module_threaded_1 but built on a
    concurrent.futures ThreadPoolExecutor instead of hand-managed worker
    threads and queues. Returns a dict of pattern name -> list of absolute
    match addresses.
    """
    region_start = module.lpBaseOfDll
    module_end = region_start + module.SizeOfImage
    matches: dict[str, list] = defaultdict(list)
    futures: dict[concurrent.futures.Future[list[int]], str] = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=worker_number) as executor:
        while region_start < module_end:
            region_info = pymem.memory.virtual_query(process.process_handle, region_start)
            region_start = region_info.BaseAddress + region_info.RegionSize
            # 0x1000 == MEM_COMMIT; also skip regions we cannot safely read
            if region_info.State != 0x1000 or region_info.protect not in allowed_protections:
                continue
            region_data = process.read_bytes(region_info.BaseAddress, region_info.RegionSize)
            for name, pattern in patterns.items():
                futures[executor.submit(_threaded_2_worker, pattern, region_data, region_info.BaseAddress)] = name
        # extend per name: one pattern may match in multiple regions
        for future in concurrent.futures.as_completed(futures):
            matches[futures[future]].extend(future.result())
    return matches
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment