Skip to content

Instantly share code, notes, and snippets.

@EdsonAlcala
Created April 8, 2025 17:08
Show Gist options
  • Save EdsonAlcala/a1af32ad7e7622eee6b35421467e76d5 to your computer and use it in GitHub Desktop.
Save EdsonAlcala/a1af32ad7e7622eee6b35421467e76d5 to your computer and use it in GitHub Desktop.
import hashlib
from base58 import b58encode
from cdp_agentkit_core.actions.near.segwit_addr import bech32_encode, convertbits
from eth_hash.auto import keccak
def get_evm_address(public_key: bytes) -> str:
    """
    Computes the Ethereum address from a secp256k1 public key.

    The address is the last 20 bytes of the Keccak-256 hash of the 64-byte
    X || Y coordinate pair. A 65-byte uncompressed key (leading 0x04) has its
    prefix dropped first; a bare 64-byte coordinate pair is hashed as is.

    :param public_key: Uncompressed public key (65 bytes with a leading 0x04)
        or the raw 64-byte X || Y coordinates.
    :return: Ethereum address as a lowercase hex string (prefixed with "0x").
    :raises ValueError: If the key is neither of the two accepted encodings
        (for example empty input or a 33-byte compressed key).
    """
    # Decide on the prefix by length, not by the first byte alone: a raw
    # 64-byte key whose X coordinate happens to start with 0x04 must not be
    # truncated.
    if len(public_key) == 65 and public_key[0] == 0x04:
        coordinates = public_key[1:]
    elif len(public_key) == 64:
        coordinates = public_key
    else:
        raise ValueError(
            "Public key must be 65 bytes with a leading 0x04 or 64 raw bytes, "
            f"got {len(public_key)} bytes"
        )
    # Compute the Keccak hash directly over the coordinate bytes.
    digest = keccak(coordinates)
    return "0x" + digest[-20:].hex()
def get_btc_legacy_address(public_key: bytes, network: str = 'bitcoin') -> str:
    """
    Computes the Bitcoin legacy (P2PKH) address from a public key.

    Steps:
    1. Compute SHA256, then RIPEMD160 of the public key.
    2. Prepend the version byte: 0x00 for Bitcoin mainnet, 0x6f for testnet.
    3. Compute the checksum: first 4 bytes of double SHA256 of the payload.
    4. Concatenate payload and checksum, then encode using Base58.

    :param public_key: Public key bytes (can be uncompressed or compressed)
    :param network: 'bitcoin' (mainnet) or 'testnet'
    :return: Base58Check encoded Bitcoin legacy address.
    :raises ValueError: If ``network`` is not one of the supported names.
    """
    version_bytes = {'bitcoin': b'\x00', 'testnet': b'\x6f'}
    # Fail loudly on an unknown network instead of silently falling back to
    # testnet: a typo such as 'mainnet' must never yield a testnet address.
    if network not in version_bytes:
        raise ValueError(
            f"Unsupported Bitcoin network: {network}. Expected 'bitcoin' or 'testnet'."
        )

    # HASH160 = RIPEMD160(SHA256(public_key))
    sha256_hash = hashlib.sha256(public_key).digest()
    ripemd160_hash = hashlib.new('ripemd160', sha256_hash).digest()
    payload = version_bytes[network] + ripemd160_hash

    # Base58Check checksum: first 4 bytes of double SHA256 of the payload.
    checksum = hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4]
    return b58encode(payload + checksum).decode()
def get_btc_segwit_address(public_key: bytes, network: str = 'bitcoin') -> str:
    """
    Computes the Bitcoin SegWit (P2WPKH) address from a public key.

    Steps:
    1. Compress the public key if it is given in uncompressed form.
    2. Compute SHA256, then RIPEMD160 of the compressed public key.
    3. Convert the resulting 20-byte hash (witness program) from 8-bit to 5-bit groups.
    4. Prepend the witness version (0 for P2WPKH).
    5. Encode using Bech32 with HRP: "bc" for mainnet, "tb" for testnet.

    :param public_key: Public key bytes, either compressed (33 bytes, leading
        0x02/0x03) or uncompressed (65 bytes, leading 0x04).
    :param network: 'bitcoin' (mainnet) or 'testnet'
    :return: SegWit address in Bech32 format.
    :raises ValueError: If the network or the public key encoding is not
        supported, or if the bit conversion fails.
    """
    hrps = {'bitcoin': 'bc', 'testnet': 'tb'}
    # Fail loudly on an unknown network instead of silently using "tb".
    if network not in hrps:
        raise ValueError(
            f"Unsupported Bitcoin network: {network}. Expected 'bitcoin' or 'testnet'."
        )

    # Version 0 witness programs must commit to a *compressed* key (BIP143);
    # an address built from the uncompressed encoding is not spendable under
    # standard relay policy. Compress 65-byte keys before hashing.
    if len(public_key) == 65 and public_key[0] == 0x04:
        parity_prefix = b'\x03' if public_key[64] & 1 else b'\x02'
        public_key = parity_prefix + public_key[1:33]
    elif not (len(public_key) == 33 and public_key[0] in (0x02, 0x03)):
        raise ValueError(
            "Public key must be 33 bytes (compressed) or 65 bytes (uncompressed), "
            f"got {len(public_key)} bytes"
        )

    # HASH160 = RIPEMD160(SHA256(public_key))
    sha256_hash = hashlib.sha256(public_key).digest()
    ripemd160_hash = hashlib.new('ripemd160', sha256_hash).digest()
    witness_version = 0

    # Convert the 20-byte hash to 5-bit groups.
    converted = convertbits(list(ripemd160_hash), 8, 5, True)
    if converted is None:
        raise ValueError("Error converting hash to 5-bit groups for Bech32 encoding")

    return bech32_encode(hrps[network], [witness_version] + converted, "bech32")
# ------------------ Pytest Tests ------------------
def test_get_evm_address():
    """The EVM address of a known uncompressed key matches the expected value."""
    expected_evm_addr = "0xd8d25820c9b9e2aa9cce55504355e500efcce715"
    # 65-byte uncompressed key: 0x04 || X || Y, split over two literals.
    uncompressed_key = bytes.fromhex(
        "04e612e7650febebc50b448bf790f6bdd70a8a6ce3b111a1d7e72c87afe84be7"
        "76e36226e3f89de1ba3cbb62c0f3fc05bffae672c9c59d5fa8a4737b6547c64eb7"
    )
    evm_addr = get_evm_address(uncompressed_key)
    assert evm_addr == expected_evm_addr, f"Expected {expected_evm_addr}, got {evm_addr}"
def test_get_btc_legacy_address():
    """A testnet legacy address derived from a known key has a testnet prefix."""
    uncompressed_key = bytes.fromhex(
        "04e612e7650febebc50b448bf790f6bdd70a8a6ce3b111a1d7e72c87afe84be7"
        "76e36226e3f89de1ba3cbb62c0f3fc05bffae672c9c59d5fa8a4737b6547c64eb7"
    )
    legacy_addr = get_btc_legacy_address(uncompressed_key, network='testnet')
    # Only the leading character is checked here ("m" or "n" for testnet
    # P2PKH); the full address is not pinned by this test.
    testnet_prefixes = ('m', 'n')
    assert legacy_addr[0] in testnet_prefixes, f"Unexpected testnet legacy address: {legacy_addr}"
def test_get_btc_segwit_address():
    """A testnet SegWit address derived from a known key starts with "tb1"."""
    uncompressed_key = bytes.fromhex(
        "04e612e7650febebc50b448bf790f6bdd70a8a6ce3b111a1d7e72c87afe84be7"
        "76e36226e3f89de1ba3cbb62c0f3fc05bffae672c9c59d5fa8a4737b6547c64eb7"
    )
    segwit_addr = get_btc_segwit_address(uncompressed_key, network='testnet')
    # "tb" is the testnet HRP and "1" the Bech32 separator.
    has_testnet_prefix = segwit_addr.startswith("tb1")
    assert has_testnet_prefix, f"Unexpected testnet segwit address: {segwit_addr}"
# Derivation path used by the actions when the caller does not supply one.
DEFAULT_PATH = "account-1"
# NOTE(review): not referenced by any code visible in this file; presumably
# the key version passed to the MPC signer contract — confirm before relying on it.
DEFAULT_KEY_VERSION = 0
# https://docs.near.org/build/chain-abstraction/chain-signatures/#1-deriving-the-foreign-address
# NOTE(review): presumably the account ids of the MPC signer contract on each
# NEAR network (see the link above); not referenced by code visible here.
MPC_SIGNER_TESTNET = "v1.signer-prod.testnet"
MPC_SIGNER_MAINNET = "v1.signer"
# Root public keys in the form "secp256k1:<Base58 of the 64-byte X || Y>";
# get_root_public_key selects one of them by network id.
ROOT_PUBLIC_KEY_TESTNET = "secp256k1:4NfTiv3UsGahebgTaHyD9vF8KYKMBnfd6kh94mK6xv8fGBiJB8TBtFMP5WWXz6B89Ac1fbpzPwAvoyQebemHFwx3"
ROOT_PUBLIC_KEY_MAINNET = "secp256k1:3tFRbMqmoa6AAALMrEFAYCEoHcqKxeW38YptwowBVBtXK1vo36HDbUWuR6EZmoK4JcH6HDkNMGGqP1ouV7VZUWya"
# https://github.com/Omni-rs/omni-box/blob/main/src/utils/address.rs#L47
# Prefix placed before "<account_id>,<path>" when derive_epsilon builds the
# string it hashes.
EPSILON_DERIVATION_PREFIX = "near-mpc-recovery v0.1.0 epsilon derivation:"
# NEAR network identifiers; these are the strings get_root_public_key accepts.
NEAR_TESTNET_NETWORK_ID = "near-testnet"
NEAR_MAINNET_NETWORK_ID = "near-mainnet"
from collections.abc import Callable
from cdp import Wallet
from pydantic import BaseModel, Field, field_validator
from cdp_agentkit_core.actions import CdpAction
from cdp_agentkit_core.actions.near.kdf import get_derived_public_key
from cdp_agentkit_core.actions.near.constants import DEFAULT_PATH
from cdp_agentkit_core.actions.near.supported_address_types import SUPPORTED_ADDRESS_TYPES
from cdp_agentkit_core.actions.near.address import get_evm_address, get_btc_legacy_address, get_btc_segwit_address
# Tool description used as GetCrossChainAddressAction.description.
# NOTE(review): the text promises a "hex string", but only the `evm` address
# type is hex; the Bitcoin types are Base58Check / Bech32 strings. Consider
# rewording the prompt.
GET_CROSS_CHAIN_ADDRESS_PROMPT = """
This tool computes a cross chain address of a particular type using the derivation path, network, NEAR account id and the type of address, returning the result in hex string format.
The derived address is compatible with ECDSA and can be used to interact with contracts or perform transactions on the specified chain.
"""
class GetCrossChainAddressInput(BaseModel):
    """Input argument schema for get cross chain address action."""

    # All three optional fields default to None; get_cross_chain_address
    # substitutes the wallet / default values when they are missing.
    account_id: str | None = Field(
        None,
        description="The NEAR account id. If not provided, uses the wallet's default address.",
    )
    network: str | None = Field(
        None,
        description="The NEAR network. If not provided, uses the wallet's network id, which defaults to `near-mainnet`.",
    )
    path: str | None = Field(
        None,
        description="The derivation path to compute the public key, e.g. `Ethereum-1`. If not provided, uses the default derivation path `account-1`.",
    )
    # Required: there is no sensible default target chain.
    address_type: str = Field(
        ...,
        description="The address type based on the target chain and type of address for networks like Bitcoin and Ethereum (e.g., `evm` or `bitcoin-mainnet-legacy`).",
    )

    @field_validator("address_type")
    @classmethod
    def validate_chain(cls, value: str) -> str:
        """Validate the address type against the supported address types.

        :param value: The address type supplied by the caller.
        :return: The unchanged value when it is supported.
        :raises ValueError: If the value is not in SUPPORTED_ADDRESS_TYPES.
        """
        # Ensure the address type is supported
        if value not in SUPPORTED_ADDRESS_TYPES:
            raise ValueError(
                f"Unsupported address type: {value}. Supported address types are: {', '.join(SUPPORTED_ADDRESS_TYPES)}."
            )
        return value
def get_cross_chain_address(wallet: Wallet, account_id: str | None, network: str | None, path: str | None, address_type: str) -> str:
"""Computes an address for a specific chain using the account id, network, derivation path, and chain identifier.
"""
check_account_id = account_id if account_id is not None else wallet.default_address.address_id
check_path = path if path is not None else DEFAULT_PATH
check_network = network if network is not None else wallet.network_id
public_key = None
address = None
try:
public_key = get_derived_public_key(check_account_id, check_path, check_network)
match address_type:
case "evm":
address = get_evm_address(public_key)
case "bitcoin-mainnet-legacy":
address = get_btc_legacy_address(public_key, "bitcoin")
case "bitcoin-mainnet-segwit":
address = get_btc_segwit_address(public_key, "bitcoin")
case "bitcoin-testnet-legacy":
address = get_btc_legacy_address(public_key, "testnet")
case "bitcoin-testnet-segwit":
address = get_btc_segwit_address(public_key, "testnet")
except Exception as e:
return f"Error generating the address: {e!s}"
return f"Generated cross chain address of type {address_type} for account id {account_id}, network {network} and derivation path {path} is {address}."
class GetCrossChainAddressAction(CdpAction):
    """Get Cross Chain Address action."""

    # Identifier of the action.
    name: str = "get_cross_chain_address"
    # Description text taken from the module-level prompt.
    description: str = GET_CROSS_CHAIN_ADDRESS_PROMPT
    # Pydantic model describing the accepted arguments.
    args_schema: type[BaseModel] | None = GetCrossChainAddressInput
    # Callable that performs the action and returns a message string.
    func: Callable[..., str] = get_cross_chain_address
from collections.abc import Callable
from cdp import Wallet
from pydantic import BaseModel, Field, field_validator
from cdp_agentkit_core.actions import CdpAction
from cdp_agentkit_core.actions.near.kdf import get_derived_public_key
from cdp_agentkit_core.actions.near.constants import DEFAULT_PATH
# Tool description used as GetCrossChainPublicKeyAction.description.
GET_CROSS_CHAIN_PUBLIC_KEY_PROMPT = """
This tool computes a public key using the chain signature key derivation function, a given derivation path, network and a NEAR account id, returning the result in hex string format.
The resulted public key is the key the user can sign for via chain signatures and can be further converted into a valid ECDSA address for any supported chain.
"""
class GetCrossChainPublicKeyInput(BaseModel):
    """Input argument schema for get cross chain public key action."""

    # All fields are optional; get_cross_chain_public_key substitutes the
    # wallet / default values when they are missing.
    account_id: str | None = Field(
        None,
        description="The NEAR account id. If not provided, uses the wallet's default address.",
    )
    network: str | None = Field(
        None,
        description="The NEAR network. If not provided, uses the wallet's network id, which defaults to `near-mainnet`.",
    )
    path: str | None = Field(
        None,
        description="The derivation path to compute the public key, e.g. `Ethereum-1`. If not provided, uses the default derivation path `account-1`.",
    )

    @field_validator("network")
    @classmethod
    def validate_chain(cls, value: str | None) -> str | None:
        """Validate the NEAR network identifier.

        :param value: The network id supplied by the caller, or None.
        :return: The unchanged value when it is None or a supported network.
        :raises ValueError: If a non-None value is not a supported network.
        """
        # An explicit None means "use the wallet's network" and must be
        # accepted just like an omitted field.
        if value is None:
            return value

        supported_networks = {
            "near-mainnet",
            "near-testnet",
        }
        # ensure the network is supported
        if value not in supported_networks:
            # Sort so the message is deterministic (set order is not).
            raise ValueError(
                f"Unsupported network: {value}. Supported NEAR networks are: {', '.join(sorted(supported_networks))}."
            )
        return value
def get_cross_chain_public_key(
    wallet: Wallet,
    account_id: str | None,
    network: str | None,
    path: str | None,
) -> str:
    """Computes a public key using the chain signature key derivation function and a given derivation path, returning the result in hex string format.

    Missing arguments fall back to the wallet's default address id, the
    wallet's network id and DEFAULT_PATH respectively.

    :param wallet: Wallet that supplies the fallback account id and network id.
    :param account_id: The NEAR account id, or None to use the wallet default.
    :param network: The NEAR network id, or None to use the wallet's network id.
    :param path: The derivation path, or None to use DEFAULT_PATH.
    :return: A human-readable message with the hex-encoded public key, or a
        message starting with "Error computing the cross chain public key"
        on failure.
    """
    resolved_account_id = account_id if account_id is not None else wallet.default_address.address_id
    resolved_path = path if path is not None else DEFAULT_PATH
    resolved_network = network if network is not None else wallet.network_id

    try:
        public_key = get_derived_public_key(resolved_account_id, resolved_path, resolved_network)
    except Exception as e:
        # This is the tool boundary: failures are reported as text, not raised.
        return f"Error computing the cross chain public key {e!s}"

    # Report the values actually used for the derivation, not the raw
    # (possibly None) arguments.
    return f"Computed public key for account id {resolved_account_id}, network {resolved_network} and derivation path {resolved_path} is {public_key.hex()}."
class GetCrossChainPublicKeyAction(CdpAction):
    """Get Cross Chain Public Key action."""

    # Identifier of the action.
    name: str = "get_cross_chain_public_key"
    # Description text taken from the module-level prompt.
    description: str = GET_CROSS_CHAIN_PUBLIC_KEY_PROMPT
    # Pydantic model describing the accepted arguments.
    args_schema: type[BaseModel] | None = GetCrossChainPublicKeyInput
    # Callable that performs the action and returns a message string.
    func: Callable[..., str] = get_cross_chain_public_key
from hashlib import sha3_256
from base58 import b58decode
from ecdsa import SECP256k1
from ecdsa.ellipticcurve import Point
from cdp_agentkit_core.actions.near.constants import EPSILON_DERIVATION_PREFIX, ROOT_PUBLIC_KEY_MAINNET, ROOT_PUBLIC_KEY_TESTNET
def get_root_public_key(network: str = "near-mainnet") -> str:
"""
Returns the root public key according to the network.
:param network: "near-mainnet" or "near-testnet" (case insensitive)
:return: The root public key string.
"""
net = network.lower()
if net == "near-testnet":
return ROOT_PUBLIC_KEY_TESTNET
elif net == "near-mainnet":
return ROOT_PUBLIC_KEY_MAINNET
else:
raise ValueError(f"Unsupported network: {network}")
def derive_epsilon(account_id: str, path: str) -> int:
    """
    Deterministically derive an epsilon value from a NEAR account identifier and a derivation path.

    The string "<EPSILON_DERIVATION_PREFIX><account_id>,<path>" is UTF-8
    encoded and hashed with SHA3-256; the 32-byte digest is interpreted as a
    big-endian unsigned integer.

    :param account_id: The NEAR account identifier (e.g., "chainsignature.near")
    :param path: The derivation path (e.g., "bitcoin-1" or "ethereum-1") as a string.
    :return: The epsilon value as an integer.
    """
    preimage = f"{EPSILON_DERIVATION_PREFIX}{account_id},{path}".encode("utf-8")
    return int.from_bytes(sha3_256(preimage).digest(), byteorder="big")
def derive_public_key(root_public_key_str: str, epsilon: int) -> bytes:
    """
    Derives a new public key from a root public key and an epsilon value.

    The root key is given as "secp256k1:<Base58 encoded 64 bytes>", where the
    64 bytes are the X and Y coordinates. The derived point is
    ``root_point + (epsilon * G)`` with G the secp256k1 generator, returned in
    uncompressed form (0x04 || X || Y).

    :param root_public_key_str: The root public key in the format "secp256k1:<Base58 encoded 64 bytes>"
    :param epsilon: The epsilon value derived by `derive_epsilon`
    :return: The derived public key as bytes (uncompressed, 65 bytes).
    :raises ValueError: If the prefix is missing or the decoded key is not 64 bytes.
    """
    scheme = "secp256k1:"
    if not root_public_key_str.startswith(scheme):
        raise ValueError("Invalid root public key format. Must start with 'secp256k1:'.")

    # Everything after the scheme prefix is the Base58-encoded X || Y pair.
    raw_coordinates = b58decode(root_public_key_str[len(scheme):])
    if len(raw_coordinates) != 64:
        raise ValueError("Decoded root public key must be 64 bytes long.")

    # First half is X, second half is Y (both big-endian).
    root_x = int.from_bytes(raw_coordinates[:32], byteorder="big")
    root_y = int.from_bytes(raw_coordinates[32:], byteorder="big")
    root_point = Point(SECP256k1.curve, root_x, root_y)

    # derived_point = root_point + (epsilon * G)
    tweak_point = epsilon * SECP256k1.generator
    derived_point = root_point + tweak_point

    # Serialise as 0x04 || X || Y with 32-byte big-endian coordinates.
    coordinates = (derived_point.x(), derived_point.y())
    return b'\x04' + b"".join(value.to_bytes(32, "big") for value in coordinates)
def get_derived_public_key(account_id: str, path: str, network: str) -> bytes:
    """
    Calculates the derived public key given a NEAR account, a derivation path and the network.

    The root public key is looked up with `get_root_public_key`, the epsilon
    comes from `derive_epsilon`, and both are combined by `derive_public_key`.

    :param account_id: The NEAR account identifier.
    :param path: The derivation path.
    :param network: "near-mainnet" or "near-testnet", as accepted by `get_root_public_key`.
    :return: The derived public key as bytes (uncompressed, 65 bytes).
    """
    # Arguments are evaluated left to right, so an unsupported network is
    # rejected before the epsilon is computed.
    return derive_public_key(
        get_root_public_key(network),
        derive_epsilon(account_id, path),
    )
# ------------------ Pytest Tests ------------------
def test_derive_epsilon():
    """derive_epsilon returns a positive integer for a sample account and path."""
    epsilon = derive_epsilon("omnitester.testnet", "bitcoin-1")
    assert isinstance(epsilon, int)
    assert epsilon > 0
def test_derive_public_key_bitcoin():
    """The key derived for "bitcoin-1" on NEAR testnet matches the known vector."""
    account_id = "omnitester.testnet"
    path = "bitcoin-1"
    # get_root_public_key only accepts the "near-" prefixed network ids; a
    # bare "testnet" raises ValueError before any derivation happens.
    derived_pub_bytes = get_derived_public_key(account_id, path, "near-testnet")
    derived_pub_hex = derived_pub_bytes.hex()
    expected_hex = (
        "0471f75dc56b971fbe52dd3e80d2f8532eb8905157556df39cb7338a67c804126"
        "40c869f717217ba5b916db6d7dc7d6a84220f8251e626adad62cac9c7d6f8e032"
    )
    assert derived_pub_hex == expected_hex
def test_derive_public_key_ethereum():
    """The key derived for "ethereum-1" on NEAR testnet matches the known vector."""
    account_id = "omnitester.testnet"
    path = "ethereum-1"
    # get_root_public_key only accepts the "near-" prefixed network ids; a
    # bare "testnet" raises ValueError before any derivation happens.
    derived_pub_bytes = get_derived_public_key(account_id, path, "near-testnet")
    derived_pub_hex = derived_pub_bytes.hex()
    expected_hex = (
        "04e612e7650febebc50b448bf790f6bdd70a8a6ce3b111a1d7e72c87afe84be7"
        "76e36226e3f89de1ba3cbb62c0f3fc05bffae672c9c59d5fa8a4737b6547c64eb7"
    )
    assert derived_pub_hex == expected_hex
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment