Skip to content

Instantly share code, notes, and snippets.

@ja7ad
Last active January 22, 2025 07:19
Show Gist options
  • Save ja7ad/4de42c4fdb184692b9d76805f33d1258 to your computer and use it in GitHub Desktop.
Save ja7ad/4de42c4fdb184692b9d76805f33d1258 to your computer and use it in GitHub Desktop.
Pactus blockchain zmq subscriber with decode message: https://pips.pactus.org/PIPs/pip-36
package main
import (
"bytes"
"context"
"encoding/binary"
"encoding/hex"
"fmt"
"github.com/go-zeromq/zmq4"
"github.com/pactus-project/pactus/crypto/hash"
"github.com/pactus-project/pactus/types/block"
"github.com/pactus-project/pactus/types/tx"
"github.com/pactus-project/pactus/www/zmq"
"log"
"strconv"
"strings"
)
func main() {
sub := zmq4.NewSub(context.Background())
if err := sub.Dial("tcp://localhost:28332"); err != nil {
log.Fatal(err)
}
if err := sub.SetOption(zmq4.OptionSubscribe, string(zmq.TopicBlockInfo.Bytes())); err != nil {
log.Fatal(err)
}
if err := sub.SetOption(zmq4.OptionSubscribe, string(zmq.TopicTransactionInfo.Bytes())); err != nil {
log.Fatal(err)
}
if err := sub.SetOption(zmq4.OptionSubscribe, string(zmq.TopicRawBlock.Bytes())); err != nil {
log.Fatal(err)
}
if err := sub.SetOption(zmq4.OptionSubscribe, string(zmq.TopicRawTransaction.Bytes())); err != nil {
log.Fatal(err)
}
for {
msg, err := sub.Recv()
if err != nil {
log.Fatal(err)
}
if msg.Frames == nil {
log.Println("Received frame with nil frames")
continue
}
rawMsg := msg.Frames[0]
topic := zmq.TopicFromBytes(rawMsg[:2])
switch topic {
case zmq.TopicBlockInfo:
go decodeBlockInfo(rawMsg[2:])
case zmq.TopicTransactionInfo:
go decodeTransactionInfo(rawMsg[2:])
case zmq.TopicRawBlock:
go decodeRawBlock(rawMsg[2:])
case zmq.TopicRawTransaction:
go decodeRawTx(rawMsg[2:])
}
}
}
// decodeBlockInfo: <Proposer Address: 21 bytes> + <Block Time in Unix: 4 bytes> + <Total Txs: 2 bytes> + <Block Number: 4 bytes>
func decodeBlockInfo(msg []byte) {
str := strings.Builder{}
rawProposer := msg[:21]
blockTime := binary.BigEndian.Uint32(msg[21:25])
totalTx := binary.BigEndian.Uint16(msg[25:27])
blockHeight := binary.BigEndian.Uint32(msg[27:31])
seqNo := binary.BigEndian.Uint32(msg[31:35])
str.WriteString("Seq No: " + strconv.Itoa(int(seqNo)) + "\n")
str.WriteString("Topic: " + zmq.TopicBlockInfo.String() + "\n")
str.WriteString("Proposer: " + string(rawProposer) + "\n")
str.WriteString("Block Time: " + strconv.Itoa(int(blockTime)) + "\n")
str.WriteString("Total Tx: " + strconv.Itoa(int(totalTx)) + "\n")
str.WriteString("Block Height: " + strconv.Itoa(int(blockHeight)) + "\n")
str.WriteString("\n========================================\n")
fmt.Println(str.String())
}
// decodeTransactionInfo: <Transaction ID: 32 bytes> + <Block Number: 4 Bytes>
func decodeTransactionInfo(msg []byte) {
str := strings.Builder{}
txHash, err := hash.FromBytes(msg[:32])
if err != nil {
log.Fatal(err)
}
blockHeight := binary.BigEndian.Uint32(msg[32:36])
seqNo := binary.BigEndian.Uint32(msg[36:40])
str.WriteString("Seq No: " + strconv.Itoa(int(seqNo)) + "\n")
str.WriteString("Topic: " + zmq.TopicTransactionInfo.String() + "\n")
str.WriteString("Block Height: " + strconv.Itoa(int(blockHeight)) + "\n")
str.WriteString("Tx Hash: " + hex.EncodeToString(txHash[:]) + "\n")
str.WriteString("\n========================================\n")
fmt.Println(str.String())
}
// decodeRawBlock: <Raw Block Header: 138 bytes> + <Block Number: 4 Bytes>
func decodeRawBlock(msg []byte) {
str := strings.Builder{}
buf := bytes.NewBuffer(msg[:138])
header := new(block.Header)
if err := header.Decode(buf); err != nil {
log.Fatal(err)
}
blockHeight := binary.BigEndian.Uint32(msg[138:142])
seqNo := binary.BigEndian.Uint32(msg[142:])
str.WriteString("Seq No: " + strconv.Itoa(int(seqNo)) + "\n")
str.WriteString("Topic: " + zmq.TopicRawBlock.String() + "\n")
str.WriteString("Block Height: " + strconv.Itoa(int(blockHeight)) + "\n")
str.WriteString("Block Header Time: " + header.Time().String() + "\n")
str.WriteString("Block Header Proposer: " + header.ProposerAddress().String() + "\n")
str.WriteString("Block Header State Root: " + header.StateRoot().String() + "\n")
str.WriteString("Block Header Prev Block Hash: " + header.PrevBlockHash().String() + "\n")
str.WriteString("\n========================================\n")
fmt.Println(str.String())
}
// decodeRawTx: <Raw Transaction: Variable Length> + <Block Number: 4 Bytes>
func decodeRawTx(msg []byte) {
str := strings.Builder{}
rawTx := msg[:len(msg)-8]
blockNumberOffset := len(msg) - 8
height := binary.BigEndian.Uint32(msg[blockNumberOffset : blockNumberOffset+4])
seqNo := binary.BigEndian.Uint32(msg[len(msg)-4:])
txn, err := tx.FromBytes(rawTx)
if err != nil {
log.Fatal(err)
}
str.WriteString("Seq No: " + strconv.Itoa(int(seqNo)) + "\n")
str.WriteString("Topic: " + zmq.TopicRawTransaction.String() + "\n")
str.WriteString("Block Height: " + strconv.Itoa(int(height)) + "\n")
str.WriteString("Transaction: " + txn.String() + "\n")
str.WriteString("\n========================================\n")
fmt.Println(str.String())
}
// Output
/*
Seq No: 15058
Topic: block_info
Q5�ɔXU��ǂ}
Block Time: 1737530220
Total Tx: 2
Block Height: 2182744
========================================
Seq No: 28060
Topic: transaction_info
Block Height: 2182744
Tx Hash: fcfb49334237e3045cd9366d38db8e81ecc919b7fddfcbbe2cc10895c54066e3
========================================
Seq No: 28061
Topic: transaction_info
Block Height: 2182744
Tx Hash: 685a92fa19e581c528778d5037a5e3acc400174921a86ff6fa564a1c5c828906
========================================
Seq No: 28061
Topic: raw_transaction
Block Height: 2182744
Transaction: {⌘ 685A92FA19E5 - 2182742 🏵 {Sortition 🎯 pc1plr2skmq6}
========================================
Seq No: 28060
Topic: raw_transaction
Block Height: 2182744
Transaction: {⌘ FCFB49334237 - 2182744 🏵 {Send 💸 000000000000->pc1zdp9z737t 1 PAC}
========================================
Seq No: 15058
Topic: raw_block
Block Height: 2182744
Block Header Time: 2025-01-22 10:47:00 +0330 +0330
Block Header Proposer: pc1p0ugx5majmxss65f4alyegkz4nhdu0qnacu3cpa
Block Header State Root: 9a1365f6f757b3a17f66d8d9973fbc88ab22dd2b0dea50ac36ec7a408ff2ae7d
Block Header Prev Block Hash: 359df200375b8a29843bbffcab77298874a4c3149cbb495af88b3c4dcb6116ce
========================================
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment