Skip to content

Instantly share code, notes, and snippets.

@ja7ad
Created December 9, 2024 06:13
Show Gist options
  • Save ja7ad/526b6c492f9353bd47d10244d242cbdc to your computer and use it in GitHub Desktop.
Save ja7ad/526b6c492f9353bd47d10244d242cbdc to your computer and use it in GitHub Desktop.
Pactus client swarm
package pactus
import (
"context"
"fmt"
"sync"
"time"
pactus "github.com/pactus-project/pactus/www/grpc/gen/go"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"net"
)
// _defaultClientStateThreshold is the threshold NewConnManager applies when
// ConnManagerOptions.Threshold is left at zero. The value 6 was chosen as
// 6 * discover interval (5 sec) = 30 sec, or about 3 blocks.
const _defaultClientStateThreshold = 6
// ConnManager hands out the client that callers should currently use.
type ConnManager interface {
// Client returns the currently selected client. The connManager implementation
// in this file returns nil when no client is stored under the active address.
Client() *Client
}
// Client bundles the gRPC service clients created for a single node address.
type Client struct {
// Addr is the RPC address this client was dialed with.
Addr string
// Blockchain, Tx and Network are the generated service clients; connect
// builds all three on the same underlying connection.
Blockchain pactus.BlockchainClient
Tx pactus.TransactionClient
Network pactus.NetworkClient
// closeFunc closes the underlying connection (connect sets it to conn.Close).
closeFunc func() error
}
// connManager is the ConnManager implementation returned by NewConnManager.
type connManager struct {
// mu is held whenever activeAddr is read or written.
mu sync.Mutex
// clients maps an RPC address to the client connected to it; it is filled
// by NewConnManager.
clients map[string]*Client
// activeAddr is the key in clients of the client returned by Client().
activeAddr string
// state is the stall tracker consulted by checkAndSwitchClient.
state *clientState
}
// ConnManagerOptions carries the optional settings accepted by NewConnManager.
type ConnManagerOptions struct {
// Threshold is forwarded to newClientState as the number of consecutive
// checks without block-height progress that are tolerated. Zero selects
// _defaultClientStateThreshold.
Threshold uint8
}
// NewConnManager dials every address in rpcs and returns a ConnManager backed
// by the nodes that passed the initial health check.
//
// The first healthy address, in rpcs order, becomes the active client. When
// more than one node is reachable a background goroutine (discover) keeps
// re-evaluating the active client until ctx is cancelled, at which point it
// closes all connections. A zero options.Threshold is replaced by
// _defaultClientStateThreshold.
//
// An error is returned when rpcs is empty or when none of the addresses yields
// a healthy client; in both cases the returned ConnManager is nil.
func NewConnManager(ctx context.Context, rpcs []string, options ConnManagerOptions) (ConnManager, error) {
	if len(rpcs) == 0 {
		return nil, fmt.Errorf("no RPCs specified")
	}

	if options.Threshold == 0 {
		options.Threshold = _defaultClientStateThreshold
	}

	manager := &connManager{
		clients: make(map[string]*Client, len(rpcs)),
		state:   newClientState(options.Threshold),
	}

	for _, rpc := range rpcs {
		// A repeated address would overwrite the existing map entry and leave
		// the first connection open with nobody left to close it.
		if _, seen := manager.clients[rpc]; seen {
			continue
		}

		client := connect(ctx, rpc)
		if client == nil {
			continue
		}

		manager.clients[rpc] = client
		if manager.activeAddr == "" {
			manager.activeAddr = rpc
		}
	}

	// Without at least one usable client the manager could only ever hand out
	// nil from Client(); report that to the caller instead.
	if len(manager.clients) == 0 {
		return nil, fmt.Errorf("no healthy RPC among %d candidates", len(rpcs))
	}

	// Failover only makes sense when there is something to fail over to.
	if len(manager.clients) > 1 {
		go manager.discover(ctx)
	}

	return manager, nil
}
// Client returns the client stored under the active address, or nil when the
// map holds no entry for it. The lookup is performed while holding c.mu.
func (c *connManager) Client() *Client {
	c.mu.Lock()
	// A missing key yields the zero value of *Client, i.e. nil.
	active := c.clients[c.activeAddr]
	c.mu.Unlock()

	return active
}
// discover runs the periodic health check loop. Every five seconds it calls
// checkAndSwitchClient; once ctx is done it closes all clients and returns.
func (c *connManager) discover(ctx context.Context) {
	const interval = 5 * time.Second

	tick := time.NewTicker(interval)
	defer tick.Stop()

	done := ctx.Done()
	for {
		select {
		case <-tick.C:
			c.checkAndSwitchClient(ctx)
		case <-done:
			c.closeAllClients()
			return
		}
	}
}
// checkAndSwitchClient verifies that the active client is still making
// progress and, if it is not, promotes the first other client that answers.
//
// The active client is kept when its GetBlockchainInfo call succeeds and
// c.state.shouldContinueWithClient accepts the reported height. Otherwise all
// clients are probed in map order (which is unspecified) and the first one that
// answers and is accepted by the state tracker becomes active. If none
// qualifies the active address is left unchanged.
func (c *connManager) checkAndSwitchClient(ctx context.Context) {
	// probeTimeout bounds a single GetBlockchainInfo call. Without a deadline a
	// node that accepts the connection but never answers would block this
	// loop indefinitely and prevent any failover.
	const probeTimeout = 3 * time.Second

	probeHeight := func(cli *Client) (uint32, error) {
		probeCtx, cancel := context.WithTimeout(ctx, probeTimeout)
		defer cancel()

		info, err := cli.Blockchain.GetBlockchainInfo(probeCtx, &pactus.GetBlockchainInfoRequest{})
		if err != nil {
			return 0, err
		}
		return info.LastBlockHeight, nil
	}

	c.mu.Lock()
	currentAddr := c.activeAddr
	c.mu.Unlock()

	// Guard the lookup: dereferencing a missing entry would panic.
	if active, ok := c.clients[currentAddr]; ok {
		if height, err := probeHeight(active); err == nil && c.state.shouldContinueWithClient(currentAddr, height) {
			return // Active client is healthy, no switch needed
		}
	}

	for rpc, cli := range c.clients {
		height, err := probeHeight(cli)
		if err != nil || !c.state.shouldContinueWithClient(rpc, height) {
			continue
		}

		c.mu.Lock()
		c.activeAddr = rpc
		c.mu.Unlock()
		return
	}
}
// closeAllClients invokes closeFunc on every stored client while holding c.mu.
// Close errors are discarded; the clients stay in the map afterwards.
func (c *connManager) closeAllClients() {
	c.mu.Lock()
	defer c.mu.Unlock()

	for addr := range c.clients {
		// Nothing is done with a close error here, so drop it explicitly.
		_ = c.clients[addr].closeFunc()
	}
}
// connect creates a gRPC connection to rpc without transport security and
// wraps it in a Client.
//
// It returns nil when the connection cannot be created or when the node does
// not pass isHealthy; in the latter case the connection is closed before
// returning. TCP dials are limited to one second each.
func connect(ctx context.Context, rpc string) *Client {
	// Use a Dialer instead of net.DialTimeout so the context gRPC hands to the
	// dial callback is honoured and an in-flight dial can be cancelled.
	dialer := &net.Dialer{Timeout: time.Second}

	conn, err := grpc.Dial(
		rpc,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithContextDialer(func(dialCtx context.Context, address string) (net.Conn, error) {
			return dialer.DialContext(dialCtx, "tcp", address)
		}),
	)
	if err != nil {
		return nil
	}

	blockchainClient := pactus.NewBlockchainClient(conn)

	// Check client health before returning
	if !isHealthy(ctx, blockchainClient) {
		// The health-check failure is what gets reported (as nil); a close
		// error on an unusable connection adds nothing.
		_ = conn.Close()
		return nil
	}

	return &Client{
		Addr:       rpc,
		Blockchain: blockchainClient,
		Tx:         pactus.NewTransactionClient(conn),
		Network:    pactus.NewNetworkClient(conn),
		closeFunc:  conn.Close,
	}
}
// isHealthy reports whether a GetBlockchainInfo call on bc completes without
// an error. The response itself is ignored.
func isHealthy(ctx context.Context, bc pactus.BlockchainClient) bool {
	if _, err := bc.GetBlockchainInfo(ctx, &pactus.GetBlockchainInfoRequest{}); err != nil {
		return false
	}
	return true
}
package pactus
// clientState remembers the last RPC that was evaluated, the highest block
// height seen from it, and how many consecutive evaluations passed without
// that height increasing.
type clientState struct {
	threshold       uint8  // number of stalled evaluations tolerated
	failureCount    uint8  // stalled evaluations since the last reset
	lastBlockHeight uint32 // height recorded at the last reset
	currentRPC      string // RPC the counters above refer to
}

// newClientState returns an empty state that tolerates threshold stalled
// evaluations.
func newClientState(threshold uint8) *clientState {
	state := new(clientState)
	state.threshold = threshold
	return state
}

// shouldContinueWithClient reports whether rpc may be used given that it
// currently reports the block height height.
//
// A different RPC than the tracked one, or a height above the recorded one,
// resets the tracker and is accepted. Otherwise the call counts as a stall:
// it is accepted (and counted) while fewer than threshold stalls have been
// recorded, and rejected without further counting once the threshold is hit.
func (s *clientState) shouldContinueWithClient(rpc string, height uint32) bool {
	switch {
	case rpc != s.currentRPC, height > s.lastBlockHeight:
		// New RPC or forward progress: start tracking from scratch.
		s.resetState(rpc, height)
		return true
	case s.failureCount < s.threshold:
		// Stalled, but still within the tolerated budget.
		s.failureCount++
		return true
	default:
		return false
	}
}

// resetState points the tracker at rpc with the given height and clears the
// stall counter; the threshold is kept.
func (s *clientState) resetState(rpc string, height uint32) {
	*s = clientState{
		threshold:       s.threshold,
		lastBlockHeight: height,
		currentRPC:      rpc,
	}
}
package pactus
import (
"github.com/stretchr/testify/assert"
"testing"
)
// TestClientState drives shouldContinueWithClient from a set of hand-built
// starting states and checks both the returned decision and the resulting
// tracker fields.
func TestClientState(t *testing.T) {
	// outcome is what a case expects after a single call.
	type outcome struct {
		ok       bool
		rpc      string
		height   uint32
		failures uint8
	}

	cases := []struct {
		name      string
		threshold uint8
		start     clientState
		rpc       string
		height    uint32
		want      outcome
	}{
		{
			name:      "New height on same RPC, reset state",
			threshold: 3,
			start:     clientState{currentRPC: "rpc1", lastBlockHeight: 10},
			rpc:       "rpc1",
			height:    12,
			want:      outcome{ok: true, rpc: "rpc1", height: 12, failures: 0},
		},
		{
			name:      "New RPC, reset state",
			threshold: 3,
			start:     clientState{currentRPC: "rpc1", lastBlockHeight: 10},
			rpc:       "rpc2",
			height:    8,
			want:      outcome{ok: true, rpc: "rpc2", height: 8, failures: 0},
		},
		{
			name:      "No progress on same RPC, increment failure count",
			threshold: 3,
			start:     clientState{currentRPC: "rpc1", lastBlockHeight: 10},
			rpc:       "rpc1",
			height:    10,
			want:      outcome{ok: true, rpc: "rpc1", height: 10, failures: 1},
		},
		{
			name:      "Failure count exceeds threshold",
			threshold: 3,
			start:     clientState{currentRPC: "rpc1", lastBlockHeight: 10, failureCount: 3},
			rpc:       "rpc1",
			height:    10,
			want:      outcome{ok: false, rpc: "rpc1", height: 10, failures: 3},
		},
		{
			name:      "Height regresses but same RPC, increment failure count",
			threshold: 3,
			start:     clientState{currentRPC: "rpc1", lastBlockHeight: 10},
			rpc:       "rpc1",
			height:    9,
			want:      outcome{ok: true, rpc: "rpc1", height: 10, failures: 1},
		},
		{
			name:      "New RPC after empty state",
			threshold: 3,
			start:     clientState{},
			rpc:       "rpc1",
			height:    15,
			want:      outcome{ok: true, rpc: "rpc1", height: 15, failures: 0},
		},
	}

	for i := range cases {
		tc := cases[i]
		t.Run(tc.name, func(t *testing.T) {
			// Arrange: work on a private copy carrying the case's threshold.
			state := tc.start
			state.threshold = tc.threshold

			// Act
			got := state.shouldContinueWithClient(tc.rpc, tc.height)

			// Assert
			assert.Equal(t, tc.want.ok, got, "Result mismatch")
			assert.Equal(t, tc.want.rpc, state.currentRPC, "currentRPC mismatch")
			assert.Equal(t, tc.want.height, state.lastBlockHeight, "lastBlockHeight mismatch")
			assert.Equal(t, tc.want.failures, state.failureCount, "failureCount mismatch")
		})
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment