Last active
March 23, 2026 07:36
-
-
Save geohot/76b52c51fba5fc5028ce0eeb089f879d to your computer and use it in GitHub Desktop.
ext4
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """Read-only ext4 filesystem. Opens device in 'rb' mode ONLY, never writes.""" | |
| import struct | |
| import stat | |
| from dataclasses import dataclass | |
| from types import TracebackType | |
| from typing import IO | |
# Magic number the reader expects at byte offset 0x38 of the superblock.
EXT4_SUPER_MAGIC = 0xEF53
# Inode number from which absolute path resolution starts (the root directory).
EXT4_ROOT_INO = 2
# Magic number the reader expects at the start of every extent-tree node.
EXT4_EXT_MAGIC = 0xF30A

# Bits tested in the superblock's incompatible-feature mask.
INCOMPAT_FILETYPE = 1 << 1  # 0x0002: directory entries carry a file-type byte
INCOMPAT_64BIT = 1 << 7  # 0x0080: block numbers have a high 32-bit half

# File-type codes reported in directory entries.
FT_UNKNOWN, FT_REG_FILE, FT_DIR = range(3)
| def _u32(data: bytes | bytearray, offset: int) -> int: | |
| result: int = struct.unpack_from("<I", data, offset)[0] | |
| return result | |
| def _u16(data: bytes | bytearray, offset: int) -> int: | |
| result: int = struct.unpack_from("<H", data, offset)[0] | |
| return result | |
@dataclass
class Superblock:
    """The superblock values the reader needs, already decoded to integers."""

    blocks_count: int = 0  # filesystem size in blocks
    block_size: int = 0  # bytes per block
    blocks_per_group: int = 0
    inodes_per_group: int = 0
    inode_size: int = 0  # bytes per on-disk inode record
    desc_size: int = 32  # bytes per block-group descriptor
    feature_incompat: int = 0  # incompatible-feature bit mask

    @property
    def is_64bit(self) -> bool:
        """Whether the INCOMPAT_64BIT bit is set in ``feature_incompat``."""
        return (self.feature_incompat & INCOMPAT_64BIT) != 0

    @property
    def has_filetype(self) -> bool:
        """Whether the INCOMPAT_FILETYPE bit is set in ``feature_incompat``."""
        return (self.feature_incompat & INCOMPAT_FILETYPE) != 0
@dataclass
class Inode:
    """The inode fields the reader needs: mode, byte size and raw block map."""

    mode: int = 0  # file type and permission bits
    size: int = 0  # file length in bytes
    i_block: bytes = b""  # raw block-map area copied out of the inode record

    @property
    def is_dir(self) -> bool:
        """Whether the file-type bits of ``mode`` denote a directory."""
        return stat.S_IFMT(self.mode) == stat.S_IFDIR
@dataclass
class DirEntry:
    """One name found in a directory, with the inode it refers to."""

    inode: int  # inode number the name points at
    name: str  # entry name, decoded as UTF-8 by the reader
    file_type: int  # one of the FT_* codes
@dataclass
class ExtentLeaf:
    """A contiguous run of blocks mapping file offsets to disk blocks."""

    logical_block: int  # first block of the run, counted within the file
    length: int  # number of blocks in the run
    physical_start: int  # first block of the run, counted on the device
class Ext4Reader:
    """Minimal read-only reader for an ext4 image or block device.

    The device is opened in binary read mode and is never written to. Only
    inodes whose block map is an extent tree can be read; anything else fails
    the extent-magic check with ``ValueError``. Symbolic links are not
    followed during path resolution.

    Instances are context managers; leaving the ``with`` block closes the
    underlying file.
    """

    # An on-disk extent length above this value marks the extent as
    # uninitialized; the real length is the stored value minus this constant.
    _EXT_INIT_MAX_LEN = 32768
    # Fixed header of a directory entry: inode(4) + rec_len(2) + name_len(1)
    # + file_type(1).
    _DIRENT_HEADER_SIZE = 8

    def __init__(self, device_path: str) -> None:
        """Open *device_path* read-only and parse its superblock.

        Raises:
            OSError: if the device cannot be opened.
            ValueError: if the superblock is truncated or has a bad magic.
        """
        self._f: IO[bytes] = open(device_path, "rb")  # READ ONLY - never write
        try:
            self.sb: Superblock = self._read_superblock()
        except BaseException:
            # Without this the handle would leak whenever the file turns out
            # not to be a valid ext4 image.
            self._f.close()
            raise

    def close(self) -> None:
        """Close the underlying device file."""
        self._f.close()

    def __enter__(self) -> "Ext4Reader":
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.close()

    def _read_at(self, offset: int, length: int) -> bytes:
        """Return up to *length* bytes starting at absolute byte *offset*.

        Fewer bytes are returned if the device ends early.
        """
        self._f.seek(offset)
        return self._f.read(length)

    def _read_block(self, block_num: int) -> bytes:
        """Return the contents of filesystem block *block_num*."""
        block_size = self.sb.block_size
        return self._read_at(block_num * block_size, block_size)

    def _read_superblock(self) -> Superblock:
        """Parse the 1024-byte superblock stored at byte offset 1024.

        Raises:
            ValueError: if fewer than 1024 bytes could be read or the magic
                number is wrong.
        """
        data = self._read_at(1024, 1024)
        if len(data) < 1024:
            # Report a short read the same way as any other "not ext4" case
            # instead of letting struct.error escape from the field decoders.
            raise ValueError(
                f"Truncated ext4 superblock: got {len(data)} of 1024 bytes"
            )
        magic = _u16(data, 0x38)
        if magic != EXT4_SUPER_MAGIC:
            raise ValueError(f"Bad ext4 magic: 0x{magic:04x}")

        sb = Superblock(
            block_size=1024 << _u32(data, 24),  # stored as a shift of 1024
            blocks_per_group=_u32(data, 32),
            inodes_per_group=_u32(data, 40),
            inode_size=_u16(data, 88),
            feature_incompat=_u32(data, 96),
        )
        blocks_lo = _u32(data, 4)
        if sb.is_64bit:
            # 64-bit filesystems record their descriptor size and keep the
            # high half of the block count in a separate field.
            sb.desc_size = max(_u16(data, 254), 32)
            sb.blocks_count = blocks_lo | (_u32(data, 336) << 32)
        else:
            sb.desc_size = 32
            sb.blocks_count = blocks_lo
        return sb

    def _read_bgd_inode_table(self, block_group: int) -> int:
        """Read the inode table block number for a given block group."""
        sb = self.sb
        # The descriptor table starts at block 2 for 1 KiB blocks and at
        # block 1 for every other block size.
        first_desc_block = 2 if sb.block_size == 1024 else 1
        bgdt_start = first_desc_block * sb.block_size
        data = self._read_at(bgdt_start + block_group * sb.desc_size, sb.desc_size)
        inode_table = _u32(data, 8)
        if sb.is_64bit and sb.desc_size >= 64:
            # The high 32 bits exist only in the extended (>= 64 byte) layout.
            inode_table |= _u32(data, 40) << 32
        return inode_table

    def _read_inode(self, ino: int) -> Inode:
        """Load inode number *ino* (1-based) from its group's inode table."""
        sb = self.sb
        block_group, index = divmod(ino - 1, sb.inodes_per_group)
        table_block = self._read_bgd_inode_table(block_group)
        offset = table_block * sb.block_size + index * sb.inode_size
        data = self._read_at(offset, sb.inode_size)
        size = _u32(data, 4) | (_u32(data, 108) << 32)  # low | high half
        return Inode(mode=_u16(data, 0), size=size, i_block=data[40:100])

    def _walk_extent_tree(
        self,
        data: bytes | bytearray,
        offset: int = 0,
        *,
        skip_unwritten: bool = False,
    ) -> list[ExtentLeaf]:
        """Collect the leaf extents of the extent tree node at *offset*.

        Args:
            data: Buffer holding the node (an inode's ``i_block`` or a block).
            offset: Byte offset of the node header inside *data*.
            skip_unwritten: When true, extents flagged as uninitialized are
                left out of the result. When false (the default) they are
                returned with the flag removed from their length.

        Returns:
            Leaf extents sorted by ``logical_block``.

        Raises:
            ValueError: if a node does not start with the extent magic.
        """
        magic = _u16(data, offset)
        if magic != EXT4_EXT_MAGIC:
            raise ValueError(f"Bad extent magic: 0x{magic:04x}")
        num_entries = _u16(data, offset + 2)
        depth = _u16(data, offset + 6)

        extents: list[ExtentLeaf] = []
        for i in range(num_entries):
            # A 12-byte header is followed by 12-byte entries.
            pos = offset + 12 + i * 12
            if depth == 0:
                length = _u16(data, pos + 4)
                if length > self._EXT_INIT_MAX_LEN:
                    if skip_unwritten:
                        continue
                    length -= self._EXT_INIT_MAX_LEN  # uninitialized-extent flag
                physical = _u32(data, pos + 8) | (_u16(data, pos + 6) << 32)
                extents.append(ExtentLeaf(_u32(data, pos), length, physical))
            else:
                # Index entry: descend into the child node's block.
                child = _u32(data, pos + 4) | (_u16(data, pos + 8) << 32)
                extents.extend(
                    self._walk_extent_tree(
                        self._read_block(child), skip_unwritten=skip_unwritten
                    )
                )
        extents.sort(key=lambda e: e.logical_block)
        return extents

    def _read_data(self, inode: Inode) -> bytes:
        """Return the full contents of *inode* as exactly ``inode.size`` bytes.

        Each extent is placed at the byte offset given by its logical block.
        Ranges that no initialized extent covers (holes in sparse files,
        uninitialized extents, or data missing because the device ended
        early) are returned as zero bytes.

        Note: the whole file is materialized in memory.
        """
        size = inode.size
        if size == 0:
            return b""
        block_size = self.sb.block_size
        # Start from zeros so every gap between extents is already filled.
        result = bytearray(size)
        for extent in self._walk_extent_tree(inode.i_block, skip_unwritten=True):
            start = extent.logical_block * block_size
            if start >= size:
                break  # extents are sorted, so the rest lie past end-of-file
            wanted = min(extent.length * block_size, size - start)
            # The blocks of an extent are contiguous on disk: one read suffices.
            chunk = self._read_at(extent.physical_start * block_size, wanted)
            result[start:start + len(chunk)] = chunk
        return bytes(result)

    def _resolve_path(self, path: str) -> Inode:
        """Walk absolute *path* from the root directory and return its inode.

        Raises:
            ValueError: if *path* is not absolute.
            FileNotFoundError: if a component is missing or a non-final
                component is not a directory.
        """
        path = path.strip()
        if not path.startswith("/"):
            raise ValueError(f"Path must be absolute: {path}")
        current = self._read_inode(EXT4_ROOT_INO)
        for part in (p for p in path.split("/") if p):
            if not current.is_dir:
                raise FileNotFoundError(f"Not a directory in path: {path}")
            found = next(
                (e for e in self._list_dir(current) if e.name == part), None
            )
            if found is None:
                raise FileNotFoundError(
                    f"No such file or directory: {path} (missing '{part}')"
                )
            current = self._read_inode(found.inode)
        return current

    def _list_dir(self, inode: Inode) -> list[DirEntry]:
        """Parse the linear directory entries stored in directory *inode*.

        Entries with inode number 0 or an empty name are skipped. Parsing
        stops at the first record whose length is smaller than an entry
        header, which covers zero-filled space and corrupt records.

        Raises:
            NotADirectoryError: if *inode* is not a directory.
        """
        if not inode.is_dir:
            raise NotADirectoryError("Not a directory")
        data = self._read_data(inode)
        header = self._DIRENT_HEADER_SIZE
        has_filetype = self.sb.has_filetype
        entries: list[DirEntry] = []
        offset = 0
        while offset + header <= len(data):
            d_inode = _u32(data, offset)
            rec_len = _u16(data, offset + 4)
            if rec_len < header:
                # A record shorter than its own header cannot be valid;
                # advancing by it would parse overlapping garbage.
                break
            name_len = data[offset + 6]
            file_type = data[offset + 7] if has_filetype else FT_UNKNOWN
            if d_inode and name_len:
                raw_name = data[offset + header:offset + header + name_len]
                name = raw_name.decode("utf-8", errors="replace")
                entries.append(DirEntry(d_inode, name, file_type))
            offset += rec_len
        return entries

    def list_dir(self, path: str) -> list[DirEntry]:
        """Return the entries of the directory at absolute *path*.

        Raises:
            ValueError: if *path* is not absolute.
            FileNotFoundError: if *path* does not exist.
            NotADirectoryError: if *path* is not a directory.
        """
        inode = self._resolve_path(path)
        if not inode.is_dir:
            raise NotADirectoryError(f"Not a directory: {path}")
        return self._list_dir(inode)

    def read_file(self, path: str) -> bytes:
        """Return the contents of the file at absolute *path*.

        Raises:
            ValueError: if *path* is not absolute.
            FileNotFoundError: if *path* does not exist.
            IsADirectoryError: if *path* is a directory.
        """
        inode = self._resolve_path(path)
        if inode.is_dir:
            raise IsADirectoryError(f"Is a directory: {path}")
        return self._read_data(inode)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment