Skip to content

Commit

Permalink
feat(cmgr): caching all peers info on the network
Browse files Browse the repository at this point in the history
  • Loading branch information
kehiy committed Feb 16, 2024
1 parent f008314 commit 365d24f
Show file tree
Hide file tree
Showing 8 changed files with 301 additions and 331 deletions.
3 changes: 1 addition & 2 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ linters:
- unused
- godox
- gocritic
- gci
- containedctx
- gci
- contextcheck
- exhaustive
- wastedassign
Expand Down
46 changes: 23 additions & 23 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,33 +34,33 @@ func NewClient(endpoint string) (*Client, error) {
}, nil
}

func (c *Client) GetBlockchainInfo() (*pactus.GetBlockchainInfoResponse, error) {
blockchainInfo, err := c.blockchainClient.GetBlockchainInfo(context.Background(), &pactus.GetBlockchainInfoRequest{})
func (c *Client) GetBlockchainInfo(ctx context.Context) (*pactus.GetBlockchainInfoResponse, error) {
blockchainInfo, err := c.blockchainClient.GetBlockchainInfo(ctx, &pactus.GetBlockchainInfoRequest{})
if err != nil {
return nil, err
}
return blockchainInfo, nil
}

func (c *Client) GetBlockchainHeight() (uint32, error) {
blockchainInfo, err := c.blockchainClient.GetBlockchainInfo(context.Background(), &pactus.GetBlockchainInfoRequest{})
func (c *Client) GetBlockchainHeight(ctx context.Context) (uint32, error) {
blockchainInfo, err := c.blockchainClient.GetBlockchainInfo(ctx, &pactus.GetBlockchainInfoRequest{})
if err != nil {
return 0, err
}
return blockchainInfo.LastBlockHeight, nil
}

func (c *Client) GetNetworkInfo() (*pactus.GetNetworkInfoResponse, error) {
networkInfo, err := c.networkClient.GetNetworkInfo(context.Background(), &pactus.GetNetworkInfoRequest{})
func (c *Client) GetNetworkInfo(ctx context.Context) (*pactus.GetNetworkInfoResponse, error) {
networkInfo, err := c.networkClient.GetNetworkInfo(ctx, &pactus.GetNetworkInfoRequest{})
if err != nil {
return nil, err
}

return networkInfo, nil
}

func (c *Client) GetPeerInfo(address string) (*pactus.PeerInfo, error) {
networkInfo, _ := c.GetNetworkInfo()
func (c *Client) GetPeerInfo(ctx context.Context, address string) (*pactus.PeerInfo, error) {
networkInfo, _ := c.GetNetworkInfo(ctx)
if networkInfo != nil {
for _, p := range networkInfo.ConnectedPeers {
for _, addr := range p.ConsensusAddress {
Expand All @@ -75,8 +75,8 @@ func (c *Client) GetPeerInfo(address string) (*pactus.PeerInfo, error) {
return nil, errors.New("peer does not exist")
}

func (c *Client) GetValidatorInfo(address string) (*pactus.GetValidatorResponse, error) {
validator, err := c.blockchainClient.GetValidator(context.Background(),
func (c *Client) GetValidatorInfo(ctx context.Context, address string) (*pactus.GetValidatorResponse, error) {
validator, err := c.blockchainClient.GetValidator(ctx,
&pactus.GetValidatorRequest{Address: address})
if err != nil {
return nil, err
Expand All @@ -85,8 +85,8 @@ func (c *Client) GetValidatorInfo(address string) (*pactus.GetValidatorResponse,
return validator, nil
}

func (c *Client) GetValidatorInfoByNumber(num int32) (*pactus.GetValidatorResponse, error) {
validator, err := c.blockchainClient.GetValidatorByNumber(context.Background(),
func (c *Client) GetValidatorInfoByNumber(ctx context.Context, num int32) (*pactus.GetValidatorResponse, error) {
validator, err := c.blockchainClient.GetValidatorByNumber(ctx,
&pactus.GetValidatorByNumberRequest{Number: num})
if err != nil {
return nil, err
Expand All @@ -95,8 +95,8 @@ func (c *Client) GetValidatorInfoByNumber(num int32) (*pactus.GetValidatorRespon
return validator, nil
}

func (c *Client) TransactionData(hash string) (*pactus.TransactionInfo, error) {
data, err := c.transactionClient.GetTransaction(context.Background(),
func (c *Client) TransactionData(ctx context.Context, hash string) (*pactus.TransactionInfo, error) {
data, err := c.transactionClient.GetTransaction(ctx,
&pactus.GetTransactionRequest{
Id: []byte(hash),
Verbosity: pactus.TransactionVerbosity_TRANSACTION_DATA,
Expand All @@ -108,38 +108,38 @@ func (c *Client) TransactionData(hash string) (*pactus.TransactionInfo, error) {
return data.GetTransaction(), nil
}

func (c *Client) LastBlockTime() (uint32, uint32, error) {
info, err := c.blockchainClient.GetBlockchainInfo(context.Background(), &pactus.GetBlockchainInfoRequest{})
func (c *Client) LastBlockTime(ctx context.Context) (uint32, uint32, error) {
info, err := c.blockchainClient.GetBlockchainInfo(ctx, &pactus.GetBlockchainInfoRequest{})
if err != nil {
return 0, 0, err
}

lastBlockTime, err := c.blockchainClient.GetBlock(context.Background(), &pactus.GetBlockRequest{
lastBlockTime, err := c.blockchainClient.GetBlock(ctx, &pactus.GetBlockRequest{
Height: info.LastBlockHeight,
Verbosity: pactus.BlockVerbosity_BLOCK_INFO,
})

return lastBlockTime.BlockTime, info.LastBlockHeight, err
}

func (c *Client) GetNodeInfo() (*pactus.GetNodeInfoResponse, error) {
info, err := c.networkClient.GetNodeInfo(context.Background(), &pactus.GetNodeInfoRequest{})
func (c *Client) GetNodeInfo(ctx context.Context) (*pactus.GetNodeInfoResponse, error) {
info, err := c.networkClient.GetNodeInfo(ctx, &pactus.GetNodeInfoRequest{})
if err != nil {
return &pactus.GetNodeInfoResponse{}, err
}

return info, err
}

func (c *Client) GetTransactionData(txID string) (*pactus.GetTransactionResponse, error) {
return c.transactionClient.GetTransaction(context.Background(), &pactus.GetTransactionRequest{
func (c *Client) GetTransactionData(ctx context.Context, txID string) (*pactus.GetTransactionResponse, error) {
return c.transactionClient.GetTransaction(ctx, &pactus.GetTransactionRequest{
Id: []byte(txID),
Verbosity: pactus.TransactionVerbosity_TRANSACTION_DATA,
})
}

func (c *Client) GetBalance(address string) (int64, error) {
account, err := c.blockchainClient.GetAccount(context.Background(), &pactus.GetAccountRequest{
func (c *Client) GetBalance(ctx context.Context, address string) (int64, error) {
account, err := c.blockchainClient.GetAccount(ctx, &pactus.GetAccountRequest{
Address: address,
})
if err != nil {
Expand Down
138 changes: 99 additions & 39 deletions client/client_mgr.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,100 @@
package client

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/kehiy/RoboPac/log"
"github.com/pactus-project/pactus/util/logger"
pactus "github.com/pactus-project/pactus/www/grpc/gen/go"
)

type Mgr struct {
valMapLock sync.RWMutex
valMap map[string]*pactus.PeerInfo

ctx context.Context
cancel context.CancelFunc
clients []IClient
}

func NewClientMgr() *Mgr {
func NewClientMgr(ctx context.Context, cnl context.CancelFunc) *Mgr {
return &Mgr{
clients: make([]IClient, 0),
clients: make([]IClient, 0),
valMap: make(map[string]*pactus.PeerInfo),
valMapLock: sync.RWMutex{},
ctx: ctx,
cancel: cnl,
}
}

func (cm *Mgr) Start() {
ticker := time.NewTicker(5 * time.Minute)

go func() {
for {
select {
case <-cm.ctx.Done():
return

case <-ticker.C:
logger.Info("updating validator map started")
cm.updateValMap()
}
}
}()

cm.updateValMap()
}

func (cm *Mgr) Stop() {
for addr, c := range cm.clients {
if err := c.Close(); err != nil {
log.Error("could not close connection to RPC node", "err", err, "RPCAddr", addr)
}
}
}

func (cm *Mgr) updateValMap() {
freshValMap := make(map[string]*pactus.PeerInfo)

for _, c := range cm.clients {
networkInfo, err := c.GetNetworkInfo(cm.ctx)
if err != nil {
continue
}

if networkInfo == nil {
logger.Warn("network info is nil")
continue
}

for _, p := range networkInfo.ConnectedPeers {
for _, addr := range p.ConsensusAddress {
current := freshValMap[addr]
if current != nil {
if current.LastSent < p.LastSent {
freshValMap[addr] = p
}
} else {
freshValMap[addr] = p
}
}
}
}

cm.valMapLock.Lock()
clear(cm.valMap)
cm.valMap = freshValMap
cm.valMapLock.Unlock()

logger.Info("validator map updated successfully")
}

// AddClient should call before Start.
func (cm *Mgr) AddClient(c IClient) {
cm.clients = append(cm.clients, c)
}
Expand All @@ -37,7 +114,7 @@ func (cm *Mgr) GetRandomClient() IClient {

func (cm *Mgr) GetBlockchainInfo() (*pactus.GetBlockchainInfoResponse, error) {
localClient := cm.getLocalClient()
info, err := localClient.GetBlockchainInfo()
info, err := localClient.GetBlockchainInfo(cm.ctx)
if err != nil {
return nil, err
}
Expand All @@ -46,7 +123,7 @@ func (cm *Mgr) GetBlockchainInfo() (*pactus.GetBlockchainInfoResponse, error) {

func (cm *Mgr) GetBlockchainHeight() (uint32, error) {
localClient := cm.getLocalClient()
height, err := localClient.GetBlockchainHeight()
height, err := localClient.GetBlockchainHeight(cm.ctx)
if err != nil {
return 0, err
}
Expand All @@ -55,7 +132,7 @@ func (cm *Mgr) GetBlockchainHeight() (uint32, error) {

func (cm *Mgr) GetLastBlockTime() (uint32, uint32) {
localClient := cm.getLocalClient()
lastBlockTime, lastBlockHeight, err := localClient.LastBlockTime()
lastBlockTime, lastBlockHeight, err := localClient.LastBlockTime(cm.ctx)
if err != nil {
return 0, 0
}
Expand All @@ -65,7 +142,7 @@ func (cm *Mgr) GetLastBlockTime() (uint32, uint32) {

func (cm *Mgr) GetNetworkInfo() (*pactus.GetNetworkInfoResponse, error) {
for _, c := range cm.clients {
info, err := c.GetNetworkInfo()
info, err := c.GetNetworkInfo(cm.ctx)
if err != nil {
continue
}
Expand Down Expand Up @@ -94,29 +171,20 @@ func (cm *Mgr) FindPublicKey(address string, firstVal bool) (string, error) {
}

func (cm *Mgr) GetPeerInfo(address string) (*pactus.PeerInfo, error) {
for _, c := range cm.clients {
networkInfo, err := c.GetNetworkInfo()
if err != nil {
continue
}
cm.valMapLock.Lock()
defer cm.valMapLock.Unlock()

if networkInfo != nil {
for _, p := range networkInfo.ConnectedPeers {
for _, addr := range p.ConsensusAddress {
if addr == address {
return p, nil
}
}
}
}
peerInfo, ok := cm.valMap[address]
if !ok {
return nil, fmt.Errorf("peer does not exist with this address: %v", address)
}

return nil, fmt.Errorf("peer does not exist with this address: %v", address)
return peerInfo, nil
}

func (cm *Mgr) GetValidatorInfo(address string) (*pactus.GetValidatorResponse, error) {
localClient := cm.getLocalClient()
val, err := localClient.GetValidatorInfo(address)
val, err := localClient.GetValidatorInfo(cm.ctx, address)
if err != nil {
return nil, err
}
Expand All @@ -125,7 +193,7 @@ func (cm *Mgr) GetValidatorInfo(address string) (*pactus.GetValidatorResponse, e

func (cm *Mgr) GetValidatorInfoByNumber(num int32) (*pactus.GetValidatorResponse, error) {
localClient := cm.getLocalClient()
val, err := localClient.GetValidatorInfoByNumber(num)
val, err := localClient.GetValidatorInfoByNumber(cm.ctx, num)
if err != nil {
return nil, err
}
Expand All @@ -134,7 +202,7 @@ func (cm *Mgr) GetValidatorInfoByNumber(num int32) (*pactus.GetValidatorResponse

func (cm *Mgr) GetTransactionData(txID string) (*pactus.GetTransactionResponse, error) {
localClient := cm.getLocalClient()
txData, err := localClient.GetTransactionData(txID)
txData, err := localClient.GetTransactionData(cm.ctx, txID)
if err != nil {
return nil, err
}
Expand All @@ -144,7 +212,7 @@ func (cm *Mgr) GetTransactionData(txID string) (*pactus.GetTransactionResponse,
func (cm *Mgr) GetCirculatingSupply() (int64, error) {
localClient := cm.getLocalClient()

height, err := localClient.GetBlockchainInfo()
height, err := localClient.GetBlockchainInfo(cm.ctx)
if err != nil {
return 0, err
}
Expand All @@ -159,44 +227,36 @@ func (cm *Mgr) GetCirculatingSupply() (int64, error) {
var addr5Out int64 = 0 // warm wallet
var addr6Out int64 = 0 // warm wallet

balance1, err := localClient.GetBalance("pc1z2r0fmu8sg2ffa0tgrr08gnefcxl2kq7wvquf8z")
balance1, err := localClient.GetBalance(cm.ctx, "pc1z2r0fmu8sg2ffa0tgrr08gnefcxl2kq7wvquf8z")
if err == nil {
addr1Out = 8_400_000_000_000_000 - balance1
}

balance2, err := localClient.GetBalance("pc1zprhnvcsy3pthekdcu28cw8muw4f432hkwgfasv")
balance2, err := localClient.GetBalance(cm.ctx, "pc1zprhnvcsy3pthekdcu28cw8muw4f432hkwgfasv")
if err == nil {
addr2Out = 6_300_000_000_000_000 - balance2
}

balance3, err := localClient.GetBalance("pc1znn2qxsugfrt7j4608zvtnxf8dnz8skrxguyf45")
balance3, err := localClient.GetBalance(cm.ctx, "pc1znn2qxsugfrt7j4608zvtnxf8dnz8skrxguyf45")
if err == nil {
addr3Out = 4_200_000_000_000_000 - balance3
}

balance4, err := localClient.GetBalance("pc1zs64vdggjcshumjwzaskhfn0j9gfpkvche3kxd3")
balance4, err := localClient.GetBalance(cm.ctx, "pc1zs64vdggjcshumjwzaskhfn0j9gfpkvche3kxd3")
if err == nil {
addr4Out = 2_100_000_000_000_000 - balance4
}

balance5, err := localClient.GetBalance("pc1zuavu4sjcxcx9zsl8rlwwx0amnl94sp0el3u37g")
balance5, err := localClient.GetBalance(cm.ctx, "pc1zuavu4sjcxcx9zsl8rlwwx0amnl94sp0el3u37g")
if err == nil {
addr5Out = 420_000_000_000_000 - balance5
}

balance6, err := localClient.GetBalance("pc1zf0gyc4kxlfsvu64pheqzmk8r9eyzxqvxlk6s6t")
balance6, err := localClient.GetBalance(cm.ctx, "pc1zf0gyc4kxlfsvu64pheqzmk8r9eyzxqvxlk6s6t")
if err == nil {
addr6Out = 210_000_000_000_000 - balance6
}

circulating := (addr1Out + addr2Out + addr3Out + addr4Out + addr5Out + addr6Out + int64(minted)) - staked - warm
return circulating, nil
}

func (cm *Mgr) Stop() {
for addr, c := range cm.clients {
if err := c.Close(); err != nil {
log.Error("could not close connection to RPC node", "err", err, "RPCAddr", addr)
}
}
}
Loading

0 comments on commit 365d24f

Please sign in to comment.