Created
May 18, 2026 08:46
-
-
Save aaaddress1/ff2338c7167d2ea3bfbdedb72ede1ad3 to your computer and use it in GitHub Desktop.
OpenFile("\\.\C:") + ReadFile - Forensic based Read All Your FIles without Other Win32 APIs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| NTFS resident $DATA demo (v2 — fragmentation-aware + force flush) | |
| """ | |
| import ctypes, struct, os, uuid, time | |
| from ctypes import wintypes | |
| k32 = ctypes.WinDLL('kernel32', use_last_error=True) | |
| k32.CreateFileW.argtypes = [wintypes.LPCWSTR, wintypes.DWORD, wintypes.DWORD, | |
| wintypes.LPVOID, wintypes.DWORD, wintypes.DWORD, wintypes.HANDLE] | |
| k32.CreateFileW.restype = wintypes.HANDLE | |
| k32.SetFilePointerEx.argtypes = [wintypes.HANDLE, ctypes.c_longlong, | |
| ctypes.POINTER(ctypes.c_longlong), wintypes.DWORD] | |
| k32.SetFilePointerEx.restype = wintypes.BOOL | |
| k32.ReadFile.argtypes = [wintypes.HANDLE, wintypes.LPVOID, wintypes.DWORD, | |
| ctypes.POINTER(wintypes.DWORD), wintypes.LPVOID] | |
| k32.ReadFile.restype = wintypes.BOOL | |
| k32.FlushFileBuffers.argtypes = [wintypes.HANDLE] | |
| k32.FlushFileBuffers.restype = wintypes.BOOL | |
| k32.CloseHandle.argtypes = [wintypes.HANDLE] | |
| GENERIC_READ, GENERIC_WRITE = 0x80000000, 0x40000000 | |
| FILE_SHARE_RW, OPEN_EXISTING, FILE_BEGIN = 0x3, 3, 0 | |
| INVALID_HANDLE_VALUE = ctypes.c_void_p(-1).value | |
| def open_volume(path, write=False): | |
| access = GENERIC_READ | (GENERIC_WRITE if write else 0) | |
| h = k32.CreateFileW(path, access, FILE_SHARE_RW, None, OPEN_EXISTING, 0, None) | |
| if h == INVALID_HANDLE_VALUE: | |
| raise OSError(f"CreateFile failed: err={ctypes.get_last_error()}") | |
| return h | |
| def read_at(h, offset, size): | |
| pos = ctypes.c_longlong(0) | |
| k32.SetFilePointerEx(h, offset, ctypes.byref(pos), FILE_BEGIN) | |
| buf = (ctypes.c_ubyte * size)() | |
| n = wintypes.DWORD(0) | |
| k32.ReadFile(h, buf, size, ctypes.byref(n), None) | |
| return bytes(buf[:n.value]) | |
| # ─── NTFS structures ─── | |
| def parse_vbr(vbr): | |
| assert vbr[3:11] == b'NTFS ' | |
| bps = struct.unpack_from('<H', vbr, 0x0B)[0] | |
| spc = struct.unpack_from('<B', vbr, 0x0D)[0] | |
| mft_lcn = struct.unpack_from('<Q', vbr, 0x30)[0] | |
| cpr = struct.unpack_from('<b', vbr, 0x40)[0] | |
| cluster = bps * spc | |
| rec_size = cpr * cluster if cpr >= 0 else (1 << -cpr) | |
| return {'bps': bps, 'cluster': cluster, | |
| 'mft_off': mft_lcn * cluster, 'rec_size': rec_size} | |
| def apply_fixups(rec, sector_size): | |
| """USA fixup. 失敗回 None(壞 record 跳過而不是 abort)""" | |
| if len(rec) < 0x30 or rec[:4] != b'FILE': | |
| return None | |
| try: | |
| rec = bytearray(rec) | |
| usa_off = struct.unpack_from('<H', rec, 0x04)[0] | |
| usa_cnt = struct.unpack_from('<H', rec, 0x06)[0] | |
| if usa_off == 0 or usa_cnt < 2 or usa_off + usa_cnt * 2 > len(rec): | |
| return None | |
| usa = bytes(rec[usa_off:usa_off + usa_cnt * 2]) | |
| sig = usa[:2] | |
| for i in range(1, usa_cnt): | |
| end = i * sector_size | |
| if end > len(rec) or bytes(rec[end - 2:end]) != sig: | |
| return None | |
| rec[end - 2:end] = usa[i * 2:i * 2 + 2] | |
| return bytes(rec) | |
| except Exception: | |
| return None | |
| def walk_attrs(rec): | |
| try: | |
| pos = struct.unpack_from('<H', rec, 0x14)[0] | |
| while pos < len(rec) - 4: | |
| t = struct.unpack_from('<I', rec, pos)[0] | |
| if t == 0xFFFFFFFF: | |
| return | |
| length = struct.unpack_from('<I', rec, pos + 4)[0] | |
| if length == 0 or pos + length > len(rec): | |
| return | |
| yield {'type': t, 'length': length, | |
| 'resident': rec[pos + 8] == 0, | |
| 'data': bytes(rec[pos:pos + length])} | |
| pos += length | |
| except Exception: | |
| return | |
| def parse_data_runs(runs): | |
| out, pos, prev_lcn = [], 0, 0 | |
| while pos < len(runs) and runs[pos] != 0: | |
| hdr = runs[pos]; pos += 1 | |
| l_sz, o_sz = hdr & 0xF, (hdr >> 4) & 0xF | |
| if pos + l_sz + o_sz > len(runs): break | |
| length = int.from_bytes(runs[pos:pos + l_sz], 'little'); pos += l_sz | |
| if o_sz == 0: | |
| out.append({'lcn': None, 'length': length, 'sparse': True}) | |
| else: | |
| delta = int.from_bytes(runs[pos:pos + o_sz], 'little', signed=True) | |
| pos += o_sz; prev_lcn += delta | |
| out.append({'lcn': prev_lcn, 'length': length, 'sparse': False}) | |
| return out | |
| def get_filename(d): | |
| v_off = struct.unpack_from('<H', d, 0x14)[0] | |
| fn_len = d[v_off + 0x40] | |
| return d[v_off + 0x42 : v_off + 0x42 + fn_len * 2].decode('utf-16-le', 'replace') | |
| def get_resident_data(d): | |
| v_len = struct.unpack_from('<I', d, 0x10)[0] | |
| v_off = struct.unpack_from('<H', d, 0x14)[0] | |
| return bytes(d[v_off:v_off + v_len]) | |
| def iter_mft_records(h, vbr, mft_runs): | |
| """沿著 $MFT data runs 走訪整個 MFT(正確處理 fragmentation)""" | |
| rec_num = 0 | |
| for run in mft_runs: | |
| recs_per_run = (run['length'] * vbr['cluster']) // vbr['rec_size'] | |
| if run['sparse']: | |
| rec_num += recs_per_run | |
| continue | |
| disk_off = run['lcn'] * vbr['cluster'] | |
| byte_len = run['length'] * vbr['cluster'] | |
| CHUNK = 4 * 1024 * 1024 | |
| for off in range(0, byte_len, CHUNK): | |
| sz = min(CHUNK, byte_len - off) | |
| data = read_at(h, disk_off + off, sz) | |
| for i in range(0, len(data) - vbr['rec_size'] + 1, vbr['rec_size']): | |
| yield rec_num, data[i:i + vbr['rec_size']] | |
| rec_num += 1 | |
| # ─── Demo ─── | |
| def main(): | |
| # 1. 寫檔 | |
| os.makedirs(r'C:\tmp', exist_ok=True) | |
| path = r'C:\tmp\hello.txt' | |
| marker = f"hello from raw disk! id={uuid.uuid4()}" | |
| with open(path, 'w', encoding='utf-8') as f: | |
| f.write(marker) | |
| f.flush() | |
| os.fsync(f.fileno()) | |
| print(f"[Setup] wrote {len(marker)} bytes to {path}") | |
| print(f" marker = {marker!r}") | |
| # 2. 強制 volume flush(讓 MFT dirty page 寫回 disk) | |
| hv = open_volume(r'\\.\C:', write=True) | |
| ok = bool(k32.FlushFileBuffers(hv)) | |
| k32.CloseHandle(hv) | |
| print(f"\n[Flush] FlushFileBuffers on \\\\.\\C: = {ok}") | |
| time.sleep(1) | |
| # 3. 讀 volume | |
| h = open_volume(r'\\.\C:') | |
| try: | |
| vbr = parse_vbr(read_at(h, 0, 512)) | |
| print(f"\n[VBR] cluster={vbr['cluster']} " | |
| f"$MFT @ 0x{vbr['mft_off']:X} rec_size={vbr['rec_size']}") | |
| # 4. 讀 $MFT record 0,解出 $MFT 自己的 data runs | |
| rec0 = apply_fixups(read_at(h, vbr['mft_off'], vbr['rec_size']), vbr['bps']) | |
| assert rec0, "$MFT record 0 corrupt" | |
| mft_runs = None | |
| for a in walk_attrs(rec0): | |
| if a['type'] == 0x80 and not a['resident']: | |
| run_off = struct.unpack_from('<H', a['data'], 0x20)[0] | |
| mft_runs = parse_data_runs(a['data'][run_off:a['length']]) | |
| assert mft_runs, "no $DATA in $MFT record 0" | |
| total_recs = sum(r['length'] * vbr['cluster'] // vbr['rec_size'] for r in mft_runs) | |
| print(f"\n[MFT] {len(mft_runs)} extent(s), {total_recs:,} records total") | |
| for i, r in enumerate(mft_runs): | |
| if r['sparse']: | |
| print(f" [{i}] SPARSE × {r['length']} clusters") | |
| else: | |
| off = r['lcn'] * vbr['cluster'] | |
| sz_mb = r['length'] * vbr['cluster'] / (1 << 20) | |
| print(f" [{i}] LCN {r['lcn']:>12} × {r['length']:>8} " | |
| f"clusters @ 0x{off:X} ({sz_mb:.1f} MB)") | |
| # 5. 沿 data runs 掃整個 MFT | |
| print(f"\n[Scan] scanning all {total_recs:,} MFT records...") | |
| scanned, valid, found = 0, 0, None | |
| for rec_num, raw in iter_mft_records(h, vbr, mft_runs): | |
| scanned += 1 | |
| rec = apply_fixups(raw, vbr['bps']) | |
| if rec is None: | |
| continue | |
| if not (struct.unpack_from('<H', rec, 0x16)[0] & 1): | |
| continue | |
| valid += 1 | |
| fname, data_attr = None, None | |
| for a in walk_attrs(rec): | |
| if a['type'] == 0x30 and a['resident']: | |
| fname = get_filename(a['data']) | |
| elif a['type'] == 0x80: | |
| data_attr = a | |
| if fname == 'hello.txt' and data_attr: | |
| found = (rec_num, data_attr); break | |
| print(f" scanned {scanned:,} records ({valid:,} in-use)") | |
| if not found: | |
| print(" ❌ hello.txt not found"); return | |
| rec_num, data_attr = found | |
| print(f"\n[Hit] hello.txt @ MFT record #{rec_num} " | |
| f"({'resident' if data_attr['resident'] else 'NON-resident'} $DATA)") | |
| if data_attr['resident']: | |
| content = get_resident_data(data_attr['data']) | |
| print(f"\n[Extracted from raw disk]\n {content!r}") | |
| print(f"\n[Match] {'✅ identical' if content.decode() == marker else '❌ MISMATCH'}") | |
| else: | |
| print(" (non-resident — 解析 data runs 略)") | |
| finally: | |
| k32.CloseHandle(h) | |
| if __name__ == '__main__': | |
| main() |
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Execution Results