Skip to content

Commit

Permalink
feat:bs performance
Browse files Browse the repository at this point in the history
  • Loading branch information
constwz committed Jul 10, 2023
1 parent 76e677c commit 4937922
Show file tree
Hide file tree
Showing 8 changed files with 244 additions and 71 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ replace (
github.com/cometbft/cometbft => github.com/bnb-chain/greenfield-cometbft v0.0.1
github.com/confio/ics23/go => github.com/cosmos/cosmos-sdk/ics23/go v0.8.0
github.com/cosmos/cosmos-sdk => github.com/bnb-chain/greenfield-cosmos-sdk v0.2.3-alpha.1
github.com/forbole/juno/v4 => github.com/bnb-chain/juno/v4 v4.0.0-20230608053739-30bdfe1a1244
github.com/forbole/juno/v4 => github.com/bnb-chain/juno/v4 v4.0.0-20230710010441-a7c14d885a5c
github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1
github.com/syndtr/goleveldb => github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ github.com/bnb-chain/greenfield-cosmos-sdk/api v0.0.0-20230425074444-eb5869b05fe
github.com/bnb-chain/greenfield-cosmos-sdk/api v0.0.0-20230425074444-eb5869b05fe9/go.mod h1:rbc4o84RSEvhf09o2+4Qiazsv0snRJLiEZdk17HeIDw=
github.com/bnb-chain/greenfield-cosmos-sdk/math v0.0.0-20230425074444-eb5869b05fe9 h1:1ZdK+iR1Up02bOa2YTZCml7PBpP//kcdamOcK6aWO/s=
github.com/bnb-chain/greenfield-cosmos-sdk/math v0.0.0-20230425074444-eb5869b05fe9/go.mod h1:Ygz4wBHrgc7g0N+8+MrnTfS9LLn9aaTGa9hKopuym5k=
github.com/bnb-chain/juno/v4 v4.0.0-20230608053739-30bdfe1a1244 h1:IKda5sFglTzOZN5e2rOTNNLmtrvIPqZvcWNltoQMviE=
github.com/bnb-chain/juno/v4 v4.0.0-20230608053739-30bdfe1a1244/go.mod h1:EyZFqV+s8JckllSsMKQRLNBGwWterYYVBfJQl3PssaA=
github.com/bnb-chain/juno/v4 v4.0.0-20230710010441-a7c14d885a5c h1:Ws+XfElGdSv1tOquIVQkHWisamx5KOMEKT+ulzIGXTg=
github.com/bnb-chain/juno/v4 v4.0.0-20230710010441-a7c14d885a5c/go.mod h1:gMvGTQDAkXdIRyOSO1pBwzvjOq02Db82e/eFvLjuoVA=
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
github.com/bradfitz/gomemcache v0.0.0-20170208213004-1952afaa557d/go.mod h1:PmM6Mmwb0LSuEubjR8N7PtNe1KxZLtOUHtbeikc5h60=
Expand Down
1 change: 1 addition & 0 deletions modular/blocksyncer/blocksyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var (
DsnBlockSyncerSwitched = "BLOCK_SYNCER_DSN_SWITCHED"
ErrDSNNotSet = errors.New("dsn config is not set in environment")
ErrBlockNotFound = errors.New("failed to get block from map need retry")
ErrHandleEvent = errors.New("failed to handle event")
)

const (
Expand Down
178 changes: 127 additions & 51 deletions modular/blocksyncer/blocksyncer_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,16 @@ import (
"context"
"encoding/json"
"fmt"

Check failure on line 6 in modular/blocksyncer/blocksyncer_indexer.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.20.x, ubuntu-latest)

File is not `goimports`-ed (goimports)

Check failure on line 6 in modular/blocksyncer/blocksyncer_indexer.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.20.x, ubuntu-latest)

File is not `goimports`-ed (goimports)
"github.com/bnb-chain/greenfield-storage-provider/modular/blocksyncer/modules/prefixtree"
"github.com/forbole/juno/v4/modules/bucket"
"github.com/forbole/juno/v4/modules/group"
"github.com/forbole/juno/v4/modules/object"
"github.com/forbole/juno/v4/modules/payment"
"github.com/forbole/juno/v4/modules/permission"
storageprovider "github.com/forbole/juno/v4/modules/storage_provider"
"sync"
"sync/atomic"
"time"

abci "github.com/cometbft/cometbft/abci/types"
coretypes "github.com/cometbft/cometbft/rpc/core/types"
Expand Down Expand Up @@ -41,16 +50,16 @@ type Impl struct {
DB database.Database

LatestBlockHeight atomic.Value
CatchUpFlag atomic.Value
ProcessedHeight uint64

ServiceName string
}

// ExportBlock accepts a finalized block and persists then inside the database.
// An error is returned if write fails.
func (i *Impl) ExportBlock(block *coretypes.ResultBlock, events *coretypes.ResultBlockResults, txs []*types.Tx, vals *coretypes.ResultValidators) error {
return nil
func (i *Impl) ExportBlock(block *coretypes.ResultBlock, events *coretypes.ResultBlockResults, txs []*types.Tx, getTmcValidators modules.GetTmcValidators) error {
//TODO implement me
panic("implement me")
}

// HandleEvent accepts the transaction and handles events contained inside the transaction.
Expand All @@ -75,49 +84,29 @@ func (i *Impl) Process(height uint64) error {
var events *coretypes.ResultBlockResults
var txs []*types.Tx
var err error
flagAny := i.GetCatchUpFlag().Load()
flag := flagAny.(int64)
heightKey := fmt.Sprintf("%s-%d", i.GetServiceName(), height)
if flag == -1 || flag >= int64(height) {
blockAny, okb := blockMap.Load(heightKey)
eventsAny, oke := eventMap.Load(heightKey)
txsAny, okt := txMap.Load(heightKey)
block, _ = blockAny.(*coretypes.ResultBlock)
events, _ = eventsAny.(*coretypes.ResultBlockResults)
txs, _ = txsAny.([]*types.Tx)
if !okb || !oke || !okt {
log.Warnf("failed to get map data height: %d", height)
return ErrBlockNotFound
}
} else {
// get block info
block, err = i.Node.Block(int64(height))
if err != nil {
log.Errorf("failed to get block from node: %s", err)
return err
}

// get txs
txs, err = i.Node.Txs(block)
if err != nil {
log.Errorf("failed to get transactions for block: %s", err)
return err
}

// get block results
events, err = i.Node.BlockResults(int64(height))
if err != nil {
log.Errorf("failed to get block results from node: %s", err)
return err
}
blockAny, okb := blockMap.Load(heightKey)
eventsAny, oke := eventMap.Load(heightKey)
txsAny, okt := txMap.Load(heightKey)
block, _ = blockAny.(*coretypes.ResultBlock)
events, _ = eventsAny.(*coretypes.ResultBlockResults)
txs, _ = txsAny.([]*types.Tx)
if !okb || !oke || !okt {
log.Warnf("failed to get map data height: %d", height)
return ErrBlockNotFound
}

startTime := time.Now().UnixMilli()

beginBlockEvents := events.BeginBlockEvents
endBlockEvents := events.EndBlockEvents
txCount := len(txs)
eventCount := 0

// 1. handle events in startBlock

if len(beginBlockEvents) > 0 {
eventCount += len(beginBlockEvents)
err = i.ExportEventsWithoutTx(context.Background(), block, beginBlockEvents)
if err != nil {
log.Errorf("failed to export events without tx: %s", err)
Expand All @@ -134,6 +123,7 @@ func (i *Impl) Process(height uint64) error {

// 3. handle events in endBlock
if len(endBlockEvents) > 0 {
eventCount += len(endBlockEvents)
err = i.ExportEventsWithoutTx(context.Background(), block, endBlockEvents)
if err != nil {
log.Errorf("failed to export events without tx: %s", err)
Expand All @@ -147,11 +137,20 @@ func (i *Impl) Process(height uint64) error {
return err
}

log.Infof("handle&write data cost: %d", time.Now().UnixMilli()-startTime)
log.Infof("height :%d tx count:%d event count:%c", height, txCount, eventCount)

blockMap.Delete(heightKey)
eventMap.Delete(heightKey)
txMap.Delete(heightKey)
i.ProcessedHeight = height

cost := time.Now().UnixMilli() - startTime
log.Infof("total cost: %d", cost)
metrics.ProcessBlockTime.WithLabelValues("process_block_time").Observe(float64(cost))
metrics.ProcessBlockTime.WithLabelValues("event_avg_time").Observe(float64(cost) / float64(eventCount))
metrics.ProcessBlockTime.WithLabelValues("tx_avg_time").Observe(float64(cost) / float64(txCount))

return nil
}

Expand Down Expand Up @@ -188,7 +187,7 @@ func (i *Impl) ExportValidators(block *coretypes.ResultBlock, vals *coretypes.Re

// ExportCommit accepts ResultValidators and persists validator commit signatures inside the database.
// An error is returned if write fails.
func (i *Impl) ExportCommit(block *coretypes.ResultBlock, vals *coretypes.ResultValidators) error {
func (i *Impl) ExportCommit(block *coretypes.ResultBlock, getTmcValidators modules.GetTmcValidators) error {
return nil
}

Expand All @@ -213,29 +212,110 @@ func (i *Impl) ExportEvents(ctx context.Context, block *coretypes.ResultBlock, e
return nil
}

type TxHashEvent struct {
Event sdk.Event
TxHash common.Hash
}

// ExportEventsInTxs accepts a slice of events in tx in order to save in database.
func (i *Impl) ExportEventsInTxs(ctx context.Context, block *coretypes.ResultBlock, txs []*types.Tx) error {
bucketEvent := make([]TxHashEvent, 0)
groupEvent := make([]TxHashEvent, 0)
objectEvent := make([]TxHashEvent, 0)
paymentEvent := make([]TxHashEvent, 0)
permissionEvent := make([]TxHashEvent, 0)
spEvent := make([]TxHashEvent, 0)
prefixEvent := make([]TxHashEvent, 0)

for _, tx := range txs {
txHash := common.HexToHash(tx.TxHash)
for _, event := range tx.Events {
if err := i.HandleEvent(ctx, block, txHash, sdk.Event(event)); err != nil {
return err
e := TxHashEvent{Event: sdk.Event(event), TxHash: txHash}
if bucket.BucketEvents[event.Type] {
bucketEvent = append(bucketEvent, e)
} else if group.GroupEvents[event.Type] {
groupEvent = append(groupEvent, e)
} else if object.ObjectEvents[event.Type] {
objectEvent = append(objectEvent, e)
} else if payment.PaymentEvents[event.Type] {
paymentEvent = append(paymentEvent, e)
} else if permission.PolicyEvents[event.Type] {
permissionEvent = append(permissionEvent, e)
} else if storageprovider.StorageProviderEvents[event.Type] {
spEvent = append(spEvent, e)
} else if prefixtree.BuildPrefixTreeEvents[event.Type] {
prefixEvent = append(prefixEvent, e)
}
}
}
return nil
allEvents := make([][]TxHashEvent, 0)
allEvents = append(allEvents, bucketEvent)
allEvents = append(allEvents, groupEvent)
allEvents = append(allEvents, objectEvent)
allEvents = append(allEvents, paymentEvent)
allEvents = append(allEvents, permissionEvent)
allEvents = append(allEvents, spEvent)
allEvents = append(allEvents, prefixEvent)
return i.concurrenceHandleEvent(ctx, block, allEvents)
}

func (i *Impl) concurrenceHandleEvent(ctx context.Context, block *coretypes.ResultBlock, allEvents [][]TxHashEvent) error {
wg := &sync.WaitGroup{}
wg.Add(len(allEvents))
var handleErr error
for _, events := range allEvents {
go func(event []TxHashEvent) {
defer wg.Done()
for _, e := range event {
if err := i.HandleEvent(ctx, block, e.TxHash, e.Event); err != nil {
log.Errorw("failed to HandleEvent err:%v", err)
handleErr = err
return
}
}
}(events)
}
return handleErr
}

// ExportEventsWithoutTx accepts a slice of events not in tx in order to save in database.
// events here don't have txHash
func (i *Impl) ExportEventsWithoutTx(ctx context.Context, block *coretypes.ResultBlock, events []abci.Event) error {
// call the event handlers
bucketEvent := make([]TxHashEvent, 0)
groupEvent := make([]TxHashEvent, 0)
objectEvent := make([]TxHashEvent, 0)
paymentEvent := make([]TxHashEvent, 0)
permissionEvent := make([]TxHashEvent, 0)
spEvent := make([]TxHashEvent, 0)
prefixEvent := make([]TxHashEvent, 0)

for _, event := range events {
if err := i.HandleEvent(ctx, block, common.Hash{}, sdk.Event(event)); err != nil {
return err
e := TxHashEvent{Event: sdk.Event(event)}
if bucket.BucketEvents[event.Type] {
bucketEvent = append(bucketEvent, e)
} else if group.GroupEvents[event.Type] {
groupEvent = append(groupEvent, e)
} else if object.ObjectEvents[event.Type] {
objectEvent = append(objectEvent, e)
} else if payment.PaymentEvents[event.Type] {
paymentEvent = append(paymentEvent, e)
} else if permission.PolicyEvents[event.Type] {
permissionEvent = append(permissionEvent, e)
} else if storageprovider.StorageProviderEvents[event.Type] {
spEvent = append(spEvent, e)
} else if prefixtree.BuildPrefixTreeEvents[event.Type] {
prefixEvent = append(prefixEvent, e)
}
}
return nil
allEvents := make([][]TxHashEvent, 0)
allEvents = append(allEvents, bucketEvent)
allEvents = append(allEvents, groupEvent)
allEvents = append(allEvents, objectEvent)
allEvents = append(allEvents, paymentEvent)
allEvents = append(allEvents, permissionEvent)
allEvents = append(allEvents, spEvent)
allEvents = append(allEvents, prefixEvent)
return i.concurrenceHandleEvent(ctx, block, allEvents)
}

// HandleGenesis accepts a GenesisDoc and calls all the registered genesis handlers in the order in which they have been registered.
Expand All @@ -244,10 +324,10 @@ func (i *Impl) HandleGenesis(genesisDoc *tmtypes.GenesisDoc, appState map[string
}

// HandleBlock accepts block and calls the block handlers.
func (i *Impl) HandleBlock(block *coretypes.ResultBlock, events *coretypes.ResultBlockResults, txs []*types.Tx, vals *coretypes.ResultValidators) {
func (i *Impl) HandleBlock(block *coretypes.ResultBlock, events *coretypes.ResultBlockResults, txs []*types.Tx, getTmcValidators modules.GetTmcValidators) {
for _, module := range i.Modules {
if blockModule, ok := module.(modules.BlockModule); ok {
err := blockModule.HandleBlock(block, events, txs, vals)
err := blockModule.HandleBlock(block, events, txs, getTmcValidators)
if err != nil {
log.Errorw("failed to handle event", "module", module.Name(), "height", block.Block.Height, "error", err)
}
Expand Down Expand Up @@ -304,10 +384,6 @@ func (i *Impl) GetLatestBlockHeight() *atomic.Value {

}

func (i *Impl) GetCatchUpFlag() *atomic.Value {
return &(i.CatchUpFlag)
}

func (i *Impl) CreateMasterTable() error {

return nil
Expand Down
39 changes: 24 additions & 15 deletions modular/blocksyncer/blocksyncer_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ func (b *BlockSyncerModular) serve(ctx context.Context) {

latestBlockHeight := mustGetLatestHeight(b.parserCtx)
Cast(b.parserCtx.Indexer).GetLatestBlockHeight().Store(int64(latestBlockHeight))
Cast(b.parserCtx.Indexer).GetCatchUpFlag().Store(int64(-1))
go b.getLatestBlockHeight(ctx)

lastDbBlockHeight := uint64(0)
Expand Down Expand Up @@ -233,34 +232,44 @@ func (b *BlockSyncerModular) getLatestBlockHeight(ctx context.Context) {
func (b *BlockSyncerModular) quickFetchBlockData(startHeight uint64) {
count := uint64(b.config.Parser.Workers)
cycle := uint64(0)
startBlock := startHeight

Check failure on line 235 in modular/blocksyncer/blocksyncer_options.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.20.x, ubuntu-latest)

ineffectual assignment to startBlock (ineffassign)

Check failure on line 235 in modular/blocksyncer/blocksyncer_options.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.20.x, ubuntu-latest)

ineffectual assignment to startBlock (ineffassign)
endBlock := uint64(0)
for {
latestBlockHeightAny := Cast(b.parserCtx.Indexer).GetLatestBlockHeight().Load()
latestBlockHeight := latestBlockHeightAny.(int64)
//if latestBlockHeight < int64(count*(cycle+1)+startHeight-1) {
// log.Infof("quick fetch ended latestBlockHeight: %d", latestBlockHeight)
// Cast(b.parserCtx.Indexer).GetCatchUpFlag().Store(int64(count*cycle + startHeight - 1))
// break
//}
if latestBlockHeight == int64(endBlock) {
continue
}
if latestBlockHeight < int64(count*(cycle+1)+startHeight-1) {
log.Infof("quick fetch ended latestBlockHeight: %d", latestBlockHeight)
Cast(b.parserCtx.Indexer).GetCatchUpFlag().Store(int64(count*cycle + startHeight - 1))
break
startBlock = count*cycle + startHeight
endBlock = count*(cycle+1) + startHeight - 1
} else {
startBlock = endBlock + 1
endBlock = uint64(latestBlockHeight)
}

processedHeight := Cast(b.parserCtx.Indexer).ProcessedHeight
if processedHeight != 0 && count*cycle+startHeight-processedHeight > MaxHeightGapFactor*count {
if processedHeight != 0 && startBlock-processedHeight > MaxHeightGapFactor*count {
time.Sleep(time.Second)
continue
}
b.fetchData(count, cycle, startHeight, latestBlockHeight)
b.fetchData(startBlock, endBlock)
cycle++
}
}

func (b *BlockSyncerModular) fetchData(count, cycle, startHeight uint64, latestBlockHeight int64) {
func (b *BlockSyncerModular) fetchData(start, end uint64) {
wg := &sync.WaitGroup{}
wg.Add(int(count))
for i := uint64(0); i < count; i++ {
go func(idx, c uint64) {
wg.Add(int(end - start + 1))
for i := start; i <= end; i++ {
go func(height uint64) {
defer wg.Done()
height := idx + count*c + startHeight
if height > uint64(latestBlockHeight) {
return
}

for {
block, err := b.parserCtx.Node.Block(int64(height))
if err != nil {
Expand All @@ -284,7 +293,7 @@ func (b *BlockSyncerModular) fetchData(count, cycle, startHeight uint64, latestB
txMap.Store(heightKey, txs)
break
}
}(i, cycle)
}(i)
}
wg.Wait()
}
Expand Down
Loading

0 comments on commit 4937922

Please sign in to comment.