Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 19 additions & 18 deletions gossip/c_block_callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,9 @@ var (
snapshotStorageReadTimer = metrics.GetOrRegisterTimer("chain/snapshot/storage/reads", nil)
snapshotCommitTimer = metrics.GetOrRegisterTimer("chain/snapshot/commits", nil)

blockInsertTimer = metrics.GetOrRegisterTimer("chain/inserts", nil)
blockValidationTimer = metrics.GetOrRegisterTimer("chain/validation", nil)
blockExecutionTimer = metrics.GetOrRegisterTimer("chain/execution", nil)
blockWriteTimer = metrics.GetOrRegisterTimer("chain/write", nil)
blockInsertTimer = metrics.GetOrRegisterTimer("chain/inserts", nil)
blockExecutionTimer = metrics.GetOrRegisterTimer("chain/execution", nil)
blockWriteTimer = metrics.GetOrRegisterTimer("chain/write", nil)

_ = metrics.GetOrRegisterMeter("chain/reorg/executes", nil)
_ = metrics.GetOrRegisterMeter("chain/reorg/add", nil)
Expand Down Expand Up @@ -258,7 +257,7 @@ func consensusCallbackBeginBlockFn(
}

evmProcessor := blockProc.EVMModule.Start(blockCtx, statedb, evmStateReader, onNewLogAll, es.Rules)
substart := time.Now()
startOfBlockExecution := time.Now()

// Execute pre-internal transactions
preInternalTxs := blockProc.PreTxTransactor.PopInternalTxs(blockCtx, bs, es, sealing, statedb)
Expand Down Expand Up @@ -311,6 +310,20 @@ func consensusCallbackBeginBlockFn(

_ = evmProcessor.Execute(txs, false)

// Update the metrics touched during block processing
accountReadTimer.Update(statedb.AccountReads)
storageReadTimer.Update(statedb.StorageReads)
accountUpdateTimer.Update(statedb.AccountUpdates)
storageUpdateTimer.Update(statedb.StorageUpdates)
snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads)
snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads)
triehash := statedb.AccountHashes + statedb.StorageHashes // save to not double count in validation
trieproc := statedb.SnapshotAccountReads + statedb.AccountReads + statedb.AccountUpdates
trieproc += statedb.SnapshotStorageReads + statedb.StorageReads + statedb.StorageUpdates
blockExecutionTimer.Update(time.Since(startOfBlockExecution) - trieproc - triehash)

startOfBlockWriting := time.Now()

evmBlock, skippedTxs, allReceipts := evmProcessor.Finalize()
block.SkippedTxs = skippedTxs
block.Root = hash.Hash(evmBlock.Root)
Expand Down Expand Up @@ -387,21 +400,9 @@ func consensusCallbackBeginBlockFn(
updateLowestBlockToFill(blockCtx.Idx, store)
updateLowestEpochToFill(es.Epoch, store)

// Update the metrics touched during block processing
accountReadTimer.Update(statedb.AccountReads)
storageReadTimer.Update(statedb.StorageReads)
accountUpdateTimer.Update(statedb.AccountUpdates)
storageUpdateTimer.Update(statedb.StorageUpdates)
snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads)
snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads)
triehash := statedb.AccountHashes + statedb.StorageHashes // save to not double count in validation
trieproc := statedb.SnapshotAccountReads + statedb.AccountReads + statedb.AccountUpdates
trieproc += statedb.SnapshotStorageReads + statedb.StorageReads + statedb.StorageUpdates
blockExecutionTimer.Update(time.Since(substart) - trieproc - triehash)
// Update the metrics touched during block validation
accountHashTimer.Update(statedb.AccountHashes)
storageHashTimer.Update(statedb.StorageHashes)
blockValidationTimer.Update(time.Since(substart) - (statedb.AccountHashes + statedb.StorageHashes - triehash))
// Update the metrics touched by new block
headBlockGauge.Update(int64(blockCtx.Idx))
headHeaderGauge.Update(int64(blockCtx.Idx))
Expand All @@ -424,7 +425,7 @@ func consensusCallbackBeginBlockFn(
accountCommitTimer.Update(statedb.AccountCommits)
storageCommitTimer.Update(statedb.StorageCommits)
snapshotCommitTimer.Update(statedb.SnapshotCommits)
blockWriteTimer.Update(time.Since(substart) - statedb.AccountCommits - statedb.StorageCommits - statedb.SnapshotCommits)
blockWriteTimer.Update(time.Since(startOfBlockWriting) - statedb.AccountCommits - statedb.StorageCommits - statedb.SnapshotCommits)
blockInsertTimer.UpdateSince(start)

now := time.Now()
Expand Down
27 changes: 22 additions & 5 deletions gossip/c_event_callbacks.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
package gossip

import (
"context"
"errors"
"math/big"
"sync/atomic"
"time"

"github.com/Fantom-foundation/lachesis-base/gossip/dagprocessor"
"github.com/Fantom-foundation/lachesis-base/hash"
"github.com/Fantom-foundation/lachesis-base/inter/dag"
"github.com/Fantom-foundation/lachesis-base/inter/idx"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/metrics"

"github.com/Fantom-foundation/go-opera/eventcheck"
"github.com/Fantom-foundation/go-opera/eventcheck/epochcheck"
Expand All @@ -28,6 +31,10 @@ var (
errDirtyEvmSnap = errors.New("EVM snapshot is dirty")
)

var (
blockValidationTimer = metrics.GetOrRegisterTimer("chain/validation", nil)
)

func (s *Service) buildEvent(e *inter.MutableEventPayload, onIndexed func()) error {
// set some unique ID
e.SetID(s.uniqueEventIDs.sample())
Expand Down Expand Up @@ -69,7 +76,7 @@ func (s *Service) buildEvent(e *inter.MutableEventPayload, onIndexed func()) err
}

// processSavedEvent performs processing which depends on event being saved in DB
func (s *Service) processSavedEvent(e *inter.EventPayload, es *iblockproc.EpochState) error {
func (s *Service) processSavedEvent(ctx context.Context, e *inter.EventPayload, es *iblockproc.EpochState) error {
err := s.dagIndexer.Add(e)
if err != nil {
return err
Expand All @@ -80,18 +87,26 @@ func (s *Service) processSavedEvent(e *inter.EventPayload, es *iblockproc.EpochS
return errWrongMedianTime
}

begin := ctx.Value("startOfValidation").(time.Time)
blockValidationTimer.Update(time.Since(begin))

// aBFT processing
return s.engine.Process(e)
err = s.engine.Process(e)
if err != nil {
return err
}

return nil
}

// saveAndProcessEvent deletes event in a case if it fails validation during event processing
func (s *Service) saveAndProcessEvent(e *inter.EventPayload, es *iblockproc.EpochState) error {
func (s *Service) saveAndProcessEvent(ctx context.Context, e *inter.EventPayload, es *iblockproc.EpochState) error {
fixEventTxHashes(e)
// indexing event
s.store.SetEvent(e)
defer s.dagIndexer.DropNotFlushed()

err := s.processSavedEvent(e, es)
err := s.processSavedEvent(ctx, e, es)
if err != nil {
s.store.DelEvent(e.ID())
return err
Expand Down Expand Up @@ -182,6 +197,8 @@ func (s *Service) processEvent(e *inter.EventPayload) error {
atomic.StoreUint32(&s.eventBusyFlag, 1)
defer atomic.StoreUint32(&s.eventBusyFlag, 0)

ctx := context.WithValue(context.Background(), "startOfValidation", time.Now())

// repeat the checks under the mutex which may depend on volatile data
if s.store.HasEvent(e.ID()) {
return eventcheck.ErrAlreadyConnectedEvent
Expand Down Expand Up @@ -211,7 +228,7 @@ func (s *Service) processEvent(e *inter.EventPayload) error {
return err
}

err = s.saveAndProcessEvent(e, &es)
err = s.saveAndProcessEvent(ctx, e, &es)
if err != nil {
return err
}
Expand Down