diff --git a/gossip/c_block_callbacks.go b/gossip/c_block_callbacks.go index 103b66404..638e4abc2 100644 --- a/gossip/c_block_callbacks.go +++ b/gossip/c_block_callbacks.go @@ -50,10 +50,9 @@ var ( snapshotStorageReadTimer = metrics.GetOrRegisterTimer("chain/snapshot/storage/reads", nil) snapshotCommitTimer = metrics.GetOrRegisterTimer("chain/snapshot/commits", nil) - blockInsertTimer = metrics.GetOrRegisterTimer("chain/inserts", nil) - blockValidationTimer = metrics.GetOrRegisterTimer("chain/validation", nil) - blockExecutionTimer = metrics.GetOrRegisterTimer("chain/execution", nil) - blockWriteTimer = metrics.GetOrRegisterTimer("chain/write", nil) + blockInsertTimer = metrics.GetOrRegisterTimer("chain/inserts", nil) + blockExecutionTimer = metrics.GetOrRegisterTimer("chain/execution", nil) + blockWriteTimer = metrics.GetOrRegisterTimer("chain/write", nil) _ = metrics.GetOrRegisterMeter("chain/reorg/executes", nil) _ = metrics.GetOrRegisterMeter("chain/reorg/add", nil) @@ -258,7 +257,7 @@ func consensusCallbackBeginBlockFn( } evmProcessor := blockProc.EVMModule.Start(blockCtx, statedb, evmStateReader, onNewLogAll, es.Rules) - substart := time.Now() + startOfBlockExecution := time.Now() // Execute pre-internal transactions preInternalTxs := blockProc.PreTxTransactor.PopInternalTxs(blockCtx, bs, es, sealing, statedb) @@ -311,6 +310,20 @@ func consensusCallbackBeginBlockFn( _ = evmProcessor.Execute(txs, false) + // Update the metrics touched during block processing + accountReadTimer.Update(statedb.AccountReads) + storageReadTimer.Update(statedb.StorageReads) + accountUpdateTimer.Update(statedb.AccountUpdates) + storageUpdateTimer.Update(statedb.StorageUpdates) + snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads) + snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads) + triehash := statedb.AccountHashes + statedb.StorageHashes // save to not double count in validation + trieproc := statedb.SnapshotAccountReads + statedb.AccountReads + statedb.AccountUpdates + trieproc += statedb.SnapshotStorageReads + statedb.StorageReads + statedb.StorageUpdates + blockExecutionTimer.Update(time.Since(startOfBlockExecution) - trieproc - triehash) + + startOfBlockWriting := time.Now() + evmBlock, skippedTxs, allReceipts := evmProcessor.Finalize() block.SkippedTxs = skippedTxs block.Root = hash.Hash(evmBlock.Root) @@ -387,21 +400,9 @@ func consensusCallbackBeginBlockFn( updateLowestBlockToFill(blockCtx.Idx, store) updateLowestEpochToFill(es.Epoch, store) - // Update the metrics touched during block processing - accountReadTimer.Update(statedb.AccountReads) - storageReadTimer.Update(statedb.StorageReads) - accountUpdateTimer.Update(statedb.AccountUpdates) - storageUpdateTimer.Update(statedb.StorageUpdates) - snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads) - snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads) - triehash := statedb.AccountHashes + statedb.StorageHashes // save to not double count in validation - trieproc := statedb.SnapshotAccountReads + statedb.AccountReads + statedb.AccountUpdates - trieproc += statedb.SnapshotStorageReads + statedb.StorageReads + statedb.StorageUpdates - blockExecutionTimer.Update(time.Since(substart) - trieproc - triehash) // Update the metrics touched during block validation accountHashTimer.Update(statedb.AccountHashes) storageHashTimer.Update(statedb.StorageHashes) - blockValidationTimer.Update(time.Since(substart) - (statedb.AccountHashes + statedb.StorageHashes - triehash)) // Update the metrics touched by new block headBlockGauge.Update(int64(blockCtx.Idx)) headHeaderGauge.Update(int64(blockCtx.Idx)) @@ -424,7 +425,7 @@ func consensusCallbackBeginBlockFn( accountCommitTimer.Update(statedb.AccountCommits) storageCommitTimer.Update(statedb.StorageCommits) snapshotCommitTimer.Update(statedb.SnapshotCommits) - blockWriteTimer.Update(time.Since(substart) - statedb.AccountCommits - statedb.StorageCommits - statedb.SnapshotCommits) + blockWriteTimer.Update(time.Since(startOfBlockWriting) - statedb.AccountCommits - statedb.StorageCommits - statedb.SnapshotCommits) blockInsertTimer.UpdateSince(start) now := time.Now() diff --git a/gossip/c_event_callbacks.go b/gossip/c_event_callbacks.go index ba96b2058..76aaee4e1 100644 --- a/gossip/c_event_callbacks.go +++ b/gossip/c_event_callbacks.go @@ -1,15 +1,18 @@ package gossip import ( + "context" "errors" "math/big" "sync/atomic" + "time" "github.com/Fantom-foundation/lachesis-base/gossip/dagprocessor" "github.com/Fantom-foundation/lachesis-base/hash" "github.com/Fantom-foundation/lachesis-base/inter/dag" "github.com/Fantom-foundation/lachesis-base/inter/idx" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/metrics" "github.com/Fantom-foundation/go-opera/eventcheck" "github.com/Fantom-foundation/go-opera/eventcheck/epochcheck" @@ -28,6 +31,10 @@ var ( errDirtyEvmSnap = errors.New("EVM snapshot is dirty") ) +var ( + blockValidationTimer = metrics.GetOrRegisterTimer("chain/validation", nil) +) + func (s *Service) buildEvent(e *inter.MutableEventPayload, onIndexed func()) error { // set some unique ID e.SetID(s.uniqueEventIDs.sample()) @@ -69,7 +76,7 @@ func (s *Service) buildEvent(e *inter.MutableEventPayload, onIndexed func()) err } // processSavedEvent performs processing which depends on event being saved in DB -func (s *Service) processSavedEvent(e *inter.EventPayload, es *iblockproc.EpochState) error { +func (s *Service) processSavedEvent(ctx context.Context, e *inter.EventPayload, es *iblockproc.EpochState) error { err := s.dagIndexer.Add(e) if err != nil { return err @@ -80,18 +87,26 @@ func (s *Service) processSavedEvent(e *inter.EventPayload, es *iblockproc.EpochS return errWrongMedianTime } + begin := ctx.Value("startOfValidation").(time.Time) + blockValidationTimer.Update(time.Since(begin)) + // aBFT processing - return s.engine.Process(e) + err = s.engine.Process(e) + if err != nil { + return err + } + + return nil } // saveAndProcessEvent deletes event in a case if it fails validation during event processing -func (s *Service) saveAndProcessEvent(e *inter.EventPayload, es *iblockproc.EpochState) error { +func (s *Service) saveAndProcessEvent(ctx context.Context, e *inter.EventPayload, es *iblockproc.EpochState) error { fixEventTxHashes(e) // indexing event s.store.SetEvent(e) defer s.dagIndexer.DropNotFlushed() - err := s.processSavedEvent(e, es) + err := s.processSavedEvent(ctx, e, es) if err != nil { s.store.DelEvent(e.ID()) return err @@ -182,6 +197,8 @@ func (s *Service) processEvent(e *inter.EventPayload) error { atomic.StoreUint32(&s.eventBusyFlag, 1) defer atomic.StoreUint32(&s.eventBusyFlag, 0) + ctx := context.WithValue(context.Background(), "startOfValidation", time.Now()) + // repeat the checks under the mutex which may depend on volatile data if s.store.HasEvent(e.ID()) { return eventcheck.ErrAlreadyConnectedEvent @@ -211,7 +228,7 @@ func (s *Service) processEvent(e *inter.EventPayload) error { return err } - err = s.saveAndProcessEvent(e, &es) + err = s.saveAndProcessEvent(ctx, e, &es) if err != nil { return err }