Skip to content

Instantly share code, notes, and snippets.

@afa7789
Last active October 22, 2024 17:32
Show Gist options
  • Save afa7789/a4bf884caa013d5026b803abc76cc4fd to your computer and use it in GitHub Desktop.
Save afa7789/a4bf884caa013d5026b803abc76cc4fd to your computer and use it in GitHub Desktop.
With an accounts.json file I can simulate transactions; cast_rpc is the checker that watches for the condition where the txpool's queued value differs from its empty state.
# Input: accounts.json, a list shaped like
#   [{account: "0xaddress_public_key", private_key: "0xprivate_key"}, ...]
# Output: formatted_accounts.json, the per-address balance entries to put on
# cdk-erigon so the accounts used for the simulation have a balance.
import json

# Every account gets this same fixed balance.
fixed_balance = "0x52B7D2DCC80CD2E4000000"

# Read the list of accounts.
with open('accounts.json', 'r') as f:
    data = json.load(f)

# Map each account address to an entry holding the fixed balance.
formatted_data = {
    account["account"]: {"balance": fixed_balance}
    for account in data
}

# Write the mapping out as indented JSON.
with open('formatted_accounts.json', 'w') as f:
    json.dump(formatted_data, f, indent=2)

print("Formatted data has been written to formatted_accounts.json.")
#!/bin/bash
# Poll the node's transaction pool and report every poll on which queued
# transactions are present. Stops after the condition has been seen 3 times.

# Define the RPC URL variable (replace with your actual RPC URL)
rpc_url="http://0.0.0.0:8545"

# Number of polls on which the condition was met
count=0

while true; do
    # A failed RPC call yields an empty status, which would otherwise be
    # mistaken for "queued is not 0x0"; skip this round instead.
    if ! txpool_status=$(cast rpc --rpc-url "$rpc_url" txpool_status); then
        echo "txpool_status call failed, retrying." >&2
        sleep 5
        continue
    fi

    # Fetch txpool_content once so the queued object and the count derived
    # from it describe the same snapshot of the pool.
    if ! txpool_content=$(cast rpc --rpc-url "$rpc_url" txpool_content); then
        echo "txpool_content call failed, retrying." >&2
        sleep 5
        continue
    fi
    queued_value=$(jq -c '.queued' <<<"$txpool_content")
    queued_count=$(jq '.queued | map(values | length) | add' <<<"$txpool_content")

    # Hit when the status does not report "queued":"0x0" or the queued object is not "{}"
    if [[ "$txpool_status" != *'"queued":"0x0"'* ]] || [[ "$queued_value" != "{}" ]]; then
        echo "FOUND ONE CASE"
        echo "$txpool_status"
        echo "Number of queued transactions: $queued_count"
        # printf writes the value followed by a blank line; the previous
        # unquoted `\n` reached echo as a literal "n".
        # The file name spelling is kept as-is so existing output files keep being appended to.
        printf '%s\n\n' "$queued_value" >> queeud_values.txt
        echo ":)"
        count=$((count + 1))

        # Stop once the condition has been met 3 times
        if [[ $count -ge 3 ]]; then
            echo "Condition met 3 times, stopping."
            break
        fi
    fi

    # Wait 5 seconds before the next check
    sleep 5
done
#!/bin/bash
# Send a legacy contract-creation transaction carrying BYTECODE to the node at
# RPC_URL, signed with PRVK, without waiting for the receipt (--async).
RPC_URL=http://0.0.0.0:8545
# NOTE(review): GAS and ADDR are not referenced by the cast command below.
GAS=29999999
# add your private key here
PRVK=
ADDR=0xe859276098f208D003ca6904C6cC26629Ee364Ce
BYTECODE=608060405234801561001057600080fd5b5060005b60f781101561008657600067ffffffffffffffff8111156100385761003761008c565b5b6040519080825280601f01601f19166020018201604052801561006a5781602001600182028036833780820191505090505b508051906020012050808061007e906100f4565b915050610014565b5061013c565b7f4e487b7100000000000000000000000000000000000000000000000000000000600052604160045260246000fd5b7f4e487b7100000000000000000000000000000000000000000000000000000000600052601160045260246000fd5b6000819050919050565b60006100ff826100ea565b91507fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff8203610131576101306100bb565b5b600182019050919050565b603f8061014a6000396000f3fe6080604052600080fdfea264697066735822122014d30d6957a21314f1866f1e18d298efac8c5979efbf6d2f4a1e188e874a0b4a64736f6c63430008120033

# With an empty, unquoted PRVK the --private-key flag would take "--create"
# as its value; fail early with a clear message instead.
if [[ -z "$PRVK" ]]; then
    echo "PRVK is empty: set your private key in this script before running it." >&2
    exit 1
fi

cast send --legacy --rpc-url "$RPC_URL" --async --private-key "$PRVK" --create "$BYTECODE"
import json
import random
import logging
import time
from web3 import Web3
from concurrent.futures import ThreadPoolExecutor, as_completed
# Configure logging once for the script: INFO level, with each record
# rendered as "<time> - <level> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Load accounts data from JSON
def load_accounts(file_path):
    """Load the accounts stored in a JSON file.

    Args:
        file_path: Path of the JSON file to read.

    Returns:
        The decoded JSON content. The rest of this script indexes each entry
        with the "account" and "private_key" keys.

    Raises:
        SystemExit: With status 1 when the file cannot be read, is not valid
            JSON, or decodes to a value that has no length.
    """
    try:
        with open(file_path, 'r') as f:
            accounts = json.load(f)
        logging.info(f"Loaded {len(accounts)} accounts from {file_path}.")
    except (OSError, ValueError, TypeError) as e:
        # OSError: unreadable file; ValueError: invalid JSON or bad encoding;
        # TypeError: len() on a JSON scalar.
        logging.error(f"Failed to load accounts: {e}")
        # A bare exit() reports status 0 (success) and relies on the `site`
        # builtin; raise SystemExit with a failing status instead.
        raise SystemExit(1) from e
    return accounts
# Initialize Web3 connection to a node over HTTP
def init_web3(rpc_url='http://0.0.0.0:8545'):
    """Create a Web3 client and verify that the node is reachable.

    Args:
        rpc_url: HTTP endpoint of the node. Defaults to the address this
            script has always used.

    Returns:
        The connected Web3 instance.

    Raises:
        SystemExit: With status 1 when the connection check fails.
    """
    w3 = Web3(Web3.HTTPProvider(rpc_url))
    if not w3.is_connected():
        logging.error("Failed to connect to the Ethereum node!")
        # A bare exit() would report status 0 (success) on this failure path.
        raise SystemExit(1)
    logging.info("Connected to the Ethereum node.")
    chain_id = w3.eth.chain_id
    logging.info(f"Chain ID: {chain_id}")
    return w3
# Function to send a transaction from a randomly selected sender to a randomly selected recipient
def send_transaction(accounts, nonces_used):
    """Send one transfer between two randomly chosen accounts.

    Args:
        accounts: Sequence of dicts with "account" and "private_key" keys.
        nonces_used: Dict mapping a sender address to the next nonce to use;
            updated in place after a successful send.

    Any exception raised while creating or sending the transaction is passed
    to handle_transaction_error, which logs it and retries once.
    """
    # Pick the sender
    sender_account = random.choice(accounts)
    sender = sender_account["account"]
    private_key = sender_account["private_key"]

    # Ask the node for the nonce only when none is tracked yet. Passing the
    # RPC call as the default of dict.get() would execute it on every call,
    # even when the tracked value is the one that ends up being used.
    if sender in nonces_used:
        nonce = nonces_used[sender]
    else:
        nonce = w3.eth.get_transaction_count(sender)

    # Select a recipient account ensuring it's not the same as the sender
    recipient_account = select_recipient(accounts, sender)

    # Create and send the transaction, handling any errors
    try:
        txn_hash = create_and_send_transaction(sender, recipient_account, private_key, nonce)
        nonces_used[sender] = nonce + 1  # Increment nonce after successful send
        logging.info(f"Transaction sent from {sender} to {recipient_account}. Tx Hash: {txn_hash.hex()}")
    except Exception as e:
        handle_transaction_error(e, sender, recipient_account, private_key, nonce, nonces_used)
def select_recipient(accounts, sender):
    """Return the address of a random account other than the sender."""
    # Collect every address that differs from the sender, then draw one.
    candidates = []
    for entry in accounts:
        address = entry["account"]
        if address != sender:
            candidates.append(address)
    return random.choice(candidates)
def create_and_send_transaction(sender, recipient_account, private_key, nonce):
    """Build, sign and submit a 0.01 ETH transfer; return the send result."""
    amount = w3.to_wei(0.01, 'ether')  # Sending 0.01 ETH
    gas_price = w3.to_wei('50', 'gwei')
    chain_id = w3.eth.chain_id  # Use actual chain ID

    transaction = {
        'to': recipient_account,
        'value': amount,
        'gas': 21000,
        'gasPrice': gas_price,
        'nonce': nonce,
        'chainId': chain_id,
    }

    # Sign locally with the sender's key, then submit the raw bytes.
    signed_txn = w3.eth.account.sign_transaction(transaction, private_key)
    raw_bytes = signed_txn.raw_transaction
    return w3.eth.send_raw_transaction(raw_bytes)
def handle_transaction_error(e, sender, recipient_account, private_key, nonce, nonces_used):
    """Log a failed send and retry it once with the next nonce.

    On a successful retry the sender's entry in nonces_used is set to the
    nonce after the one used for the retry; a failed retry is only logged.
    """
    logging.error(f"Error sending transaction from {sender} to {recipient_account}: {e}")
    try:
        retry_nonce = nonce + 1  # Retry with the following nonce
        txn_hash = create_and_send_transaction(sender, recipient_account, private_key, retry_nonce)
        logging.info(f"Retry successful: Transaction sent from {sender} to {recipient_account}. Tx Hash: {txn_hash.hex()}")
        nonces_used[sender] = retry_nonce + 1  # Record the next nonce after the successful retry
    except Exception as retry_e:
        logging.error(f"Retry failed for transaction from {sender} to {recipient_account}: {retry_e}")
# Main function to run transactions in parallel
def main():
    """Load the accounts, connect to the node, and send transactions forever.

    Each round submits 5 send_transaction tasks to a 5-worker thread pool,
    waits for all of them, logs any exception they raised, then sleeps 0.1
    seconds before the next round. The loop has no exit condition.
    """
    # send_transaction and create_and_send_transaction read w3 as a
    # module-level name, so it has to be assigned globally here.
    global w3
    batch_size = 5

    accounts = load_accounts('accounts.json')  # Load accounts
    w3 = init_web3()  # Initialize Web3 connection
    nonces_used = {}  # Dictionary to track nonces for each account

    # One pool serves every round; recreating it on each iteration would
    # start and tear down the worker threads every 0.1 seconds.
    with ThreadPoolExecutor(max_workers=batch_size) as executor:
        while True:
            # Submit one transaction task for each worker
            futures = [
                executor.submit(send_transaction, accounts, nonces_used)
                for _ in range(batch_size)
            ]

            # Wait for all futures to complete and handle their results
            for future in as_completed(futures):
                try:
                    future.result()  # This will re-raise any exception that occurred in the thread
                except Exception as e:
                    logging.error(f"Transaction simulation failed: {e}")

            # Sleep before sending more transactions
            time.sleep(0.1)  # Adjust the delay as needed
# Run the simulation only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment