Improve Cache Performance (#343)
* fix trie metrics

* add new prefetcher metrics

* use flush item pointer

* add warn if anything unexpected found in cache

* simplify trieproc tracking

* make DEBUG

* nits
patrick-ogrady authored Nov 3, 2022
1 parent edd41f3 commit 7f268e4
Showing 3 changed files with 47 additions and 26 deletions.
19 changes: 10 additions & 9 deletions core/blockchain.go
@@ -1194,15 +1194,12 @@ func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error {
 	// Update the metrics touched during block processing
 	accountReadTimer.Inc(statedb.AccountReads.Milliseconds())                 // Account reads are complete, we can mark them
 	storageReadTimer.Inc(statedb.StorageReads.Milliseconds())                 // Storage reads are complete, we can mark them
-	accountUpdateTimer.Inc(statedb.AccountUpdates.Milliseconds())             // Account updates are complete, we can mark them
-	storageUpdateTimer.Inc(statedb.StorageUpdates.Milliseconds())             // Storage updates are complete, we can mark them
 	snapshotAccountReadTimer.Inc(statedb.SnapshotAccountReads.Milliseconds()) // Account reads are complete, we can mark them
 	snapshotStorageReadTimer.Inc(statedb.SnapshotStorageReads.Milliseconds()) // Storage reads are complete, we can mark them
-	triehash := statedb.AccountHashes + statedb.StorageHashes // Save to not double count in validation
-	trieproc := statedb.SnapshotAccountReads + statedb.AccountReads + statedb.AccountUpdates
+	trieproc := statedb.AccountHashes + statedb.StorageHashes // Save to not double count in validation
+	trieproc += statedb.SnapshotAccountReads + statedb.AccountReads + statedb.AccountUpdates
 	trieproc += statedb.SnapshotStorageReads + statedb.StorageReads + statedb.StorageUpdates
-	blockExecutionTimer.Inc((time.Since(substart) - trieproc - triehash).Milliseconds())
-	blockTrieOpsTimer.Inc((trieproc + triehash).Milliseconds())
+	blockExecutionTimer.Inc((time.Since(substart) - trieproc).Milliseconds())

 	// Validate the state using the default validator
 	substart = time.Now()
@@ -1212,9 +1209,13 @@ func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error {
 	}

 	// Update the metrics touched during block validation
-	accountHashTimer.Inc(statedb.AccountHashes.Milliseconds()) // Account hashes are complete, we can mark them
-	storageHashTimer.Inc(statedb.StorageHashes.Milliseconds()) // Storage hashes are complete, we can mark them
-	blockStateValidationTimer.Inc((time.Since(substart) - (statedb.AccountHashes + statedb.StorageHashes - triehash)).Milliseconds())
+	accountUpdateTimer.Inc(statedb.AccountUpdates.Milliseconds()) // Account updates are complete, we can mark them
+	storageUpdateTimer.Inc(statedb.StorageUpdates.Milliseconds()) // Storage updates are complete, we can mark them
+	accountHashTimer.Inc(statedb.AccountHashes.Milliseconds())    // Account hashes are complete, we can mark them
+	storageHashTimer.Inc(statedb.StorageHashes.Milliseconds())    // Storage hashes are complete, we can mark them
+	additionalTrieProc := statedb.AccountHashes + statedb.StorageHashes + statedb.AccountUpdates + statedb.StorageUpdates - trieproc
+	blockStateValidationTimer.Inc((time.Since(substart) - additionalTrieProc).Milliseconds())
+	blockTrieOpsTimer.Inc((trieproc + additionalTrieProc).Milliseconds())

 	// If [writes] are disabled, skip [writeBlockWithState] so that we do not write the block
 	// or the state trie to disk.
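
The intent behind trieproc and additionalTrieProc is a snapshot-and-delta pattern: the statedb timers only ever grow across the two phases, so execution reports wall time minus all trie time seen so far, and validation subtracts only the growth since that snapshot, with blockTrieOpsTimer taking the combined trie time. A minimal, self-contained sketch of the pattern, with a toy cumulative counter standing in for the specific statedb fields the real code sums:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Cumulative counter, mirroring statedb.AccountReads and friends,
	// which keep growing from execution into validation.
	var trieTime time.Duration
	trieWork := func(d time.Duration) { time.Sleep(d); trieTime += d }

	execStart := time.Now()
	trieWork(30 * time.Millisecond)  // trie reads/updates during execution
	time.Sleep(5 * time.Millisecond) // pure EVM work
	trieproc := trieTime             // snapshot: trie time seen so far
	execution := time.Since(execStart) - trieproc

	valStart := time.Now()
	trieWork(10 * time.Millisecond) // hashing/updates during validation
	// Subtracting the snapshot leaves only validation-phase trie time,
	// so nothing is counted twice.
	additionalTrieProc := trieTime - trieproc
	validation := time.Since(valStart) - additionalTrieProc

	fmt.Println(execution, validation, trieproc+additionalTrieProc)
}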
21 changes: 15 additions & 6 deletions core/state/trie_prefetcher.go
@@ -50,7 +50,10 @@ type triePrefetcher struct {
 	fetches  map[string]Trie        // Partially or fully fetcher tries
 	fetchers map[string]*subfetcher // Subfetchers for each trie

-	deliveryMissMeter metrics.Meter
+	deliveryCopyMissMeter    metrics.Meter
+	deliveryRequestMissMeter metrics.Meter
+	deliveryWaitMissMeter    metrics.Meter
+
 	accountLoadMeter metrics.Meter
 	accountDupMeter  metrics.Meter
 	accountSkipMeter metrics.Meter
@@ -68,7 +71,10 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre
 		root:     root,
 		fetchers: make(map[string]*subfetcher), // Active prefetchers use the fetchers map

-		deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil),
+		deliveryCopyMissMeter:    metrics.GetOrRegisterMeter(prefix+"/deliverymiss/copy", nil),
+		deliveryRequestMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss/request", nil),
+		deliveryWaitMissMeter:    metrics.GetOrRegisterMeter(prefix+"/deliverymiss/wait", nil),
+
 		accountLoadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load", nil),
 		accountDupMeter:  metrics.GetOrRegisterMeter(prefix+"/account/dup", nil),
 		accountSkipMeter: metrics.GetOrRegisterMeter(prefix+"/account/skip", nil),
@@ -123,7 +129,10 @@ func (p *triePrefetcher) copy() *triePrefetcher {
 		root:    p.root,
 		fetches: make(map[string]Trie), // Active prefetchers use the fetches map

-		deliveryMissMeter: p.deliveryMissMeter,
+		deliveryCopyMissMeter:    p.deliveryCopyMissMeter,
+		deliveryRequestMissMeter: p.deliveryRequestMissMeter,
+		deliveryWaitMissMeter:    p.deliveryWaitMissMeter,
+
 		accountLoadMeter: p.accountLoadMeter,
 		accountDupMeter:  p.accountDupMeter,
 		accountSkipMeter: p.accountSkipMeter,
@@ -171,15 +180,15 @@ func (p *triePrefetcher) trie(owner common.Hash, root common.Hash) Trie {
 	if p.fetches != nil {
 		trie := p.fetches[id]
 		if trie == nil {
-			p.deliveryMissMeter.Mark(1)
+			p.deliveryCopyMissMeter.Mark(1)
 			return nil
 		}
 		return p.db.CopyTrie(trie)
 	}
 	// Otherwise the prefetcher is active, bail if no trie was prefetched for this root
 	fetcher := p.fetchers[id]
 	if fetcher == nil {
-		p.deliveryMissMeter.Mark(1)
+		p.deliveryRequestMissMeter.Mark(1)
 		return nil
 	}
 	// Interrupt the prefetcher if it's by any chance still running and return
@@ -188,7 +197,7 @@ func (p *triePrefetcher) trie(owner common.Hash, root common.Hash) Trie {

 	trie := fetcher.peek()
 	if trie == nil {
-		p.deliveryMissMeter.Mark(1)
+		p.deliveryWaitMissMeter.Mark(1)
 		return nil
 	}
 	return trie
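Splitting the single deliveryMissMeter three ways makes the failure mode visible on a dashboard: a copy miss means a copied (inactive) prefetcher was asked for a trie it never fetched, a request miss means the active prefetcher had no subfetcher for that root, and a wait miss means the subfetcher existed but had not delivered a trie by the time it was peeked. A short sketch of the same registration scheme against go-ethereum's metrics package (the "chain/prefetch" namespace is illustrative, and Meter.Count assumes the pre-1.13 metrics interface vendored at the time):

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/metrics"
)

func main() {
	prefix := "chain/prefetch" // illustrative namespace
	copyMiss := metrics.GetOrRegisterMeter(prefix+"/deliverymiss/copy", nil)
	requestMiss := metrics.GetOrRegisterMeter(prefix+"/deliverymiss/request", nil)
	waitMiss := metrics.GetOrRegisterMeter(prefix+"/deliverymiss/wait", nil)

	copyMiss.Mark(1)    // copied prefetcher had no fetched trie
	requestMiss.Mark(1) // no subfetcher was ever scheduled for this root
	waitMiss.Mark(1)    // subfetcher had not delivered a trie yet

	// The old aggregate "deliverymiss" series is still recoverable as a sum.
	fmt.Println(copyMiss.Count() + requestMiss.Count() + waitMiss.Count())
}
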
33 changes: 22 additions & 11 deletions trie/database.go
@@ -394,10 +394,21 @@ func (db *Database) EncodedNode(h common.Hash) node {
 func (db *Database) node(hash common.Hash) ([]byte, *cachedNode, error) {
 	// Retrieve the node from the clean cache if available
 	if db.cleans != nil {
-		if enc := db.cleans.Get(nil, hash[:]); enc != nil {
-			memcacheCleanHitMeter.Mark(1)
-			memcacheCleanReadMeter.Mark(int64(len(enc)))
-			return enc, nil, nil
+		k := hash[:]
+		enc, found := db.cleans.HasGet(nil, k)
+		if found {
+			if len(enc) > 0 {
+				memcacheCleanHitMeter.Mark(1)
+				memcacheCleanReadMeter.Mark(int64(len(enc)))
+				return enc, nil, nil
+			} else {
+				// Delete anything from cache that may have been added incorrectly
+				//
+				// This will prevent a panic as callers of this function assume the raw
+				// or cached node is populated.
+				log.Debug("removing empty value found in cleans cache", "k", k)
+				db.cleans.Del(k)
+			}
 		}
 	}
 	// Retrieve the node from the dirty cache if available
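
The switch from Get to HasGet is the heart of this hunk: fastcache's Get cannot distinguish "key absent" from "key present with an empty value" (both return an empty result), so the old enc != nil check could never spot a bad empty entry, and callers that assume a populated node could panic downstream. HasGet returns an explicit found flag, which lets the code evict such entries instead. A standalone sketch of the distinction, assuming the clean cache is VictoriaMetrics/fastcache as in upstream geth:

package main

import (
	"fmt"

	"github.com/VictoriaMetrics/fastcache"
)

func main() {
	cache := fastcache.New(32 * 1024 * 1024)
	cache.Set([]byte("empty"), nil) // an entry stored with no value

	// Get looks identical for a stored-empty entry and a missing key.
	fmt.Println(cache.Get(nil, []byte("empty")), cache.Get(nil, []byte("missing")))

	// HasGet reports existence separately, so a present-but-empty entry
	// can be detected and evicted rather than treated as a plain miss.
	if enc, found := cache.HasGet(nil, []byte("empty")); found && len(enc) == 0 {
		cache.Del([]byte("empty"))
	}
	_, found := cache.HasGet(nil, []byte("empty"))
	fmt.Println(found) // false after eviction
}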
@@ -414,7 +425,7 @@ func (db *Database) node(hash common.Hash) ([]byte, *cachedNode, error) {

 	// Content unavailable in memory, attempt to retrieve from disk
 	enc := rawdb.ReadTrieNode(db.diskdb, hash)
-	if len(enc) != 0 {
+	if len(enc) > 0 {
 		if db.cleans != nil {
 			db.cleans.Set(hash[:], enc)
 			memcacheCleanMissMeter.Mark(1)
@@ -561,7 +572,7 @@ type flushItem struct {
 // writeFlushItems writes all items in [toFlush] to disk in batches of
 // [ethdb.IdealBatchSize]. This function does not access any variables inside
 // of [Database] and does not need to be synchronized.
-func (db *Database) writeFlushItems(toFlush []flushItem) error {
+func (db *Database) writeFlushItems(toFlush []*flushItem) error {
 	batch := db.diskdb.NewBatch()
 	for _, item := range toFlush {
 		rlp := item.node.rlp()
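
The [ethdb.IdealBatchSize] batching the comment describes is the standard geth pattern: keep writing into one batch and flush whenever its pending value size crosses the threshold. A compact sketch of that loop, assuming go-ethereum's ethdb interfaces (the key scheme is a toy stand-in, not coreth's trie node keys):

package sketch

import (
	"encoding/binary"

	"github.com/ethereum/go-ethereum/ethdb"
)

// writeInBatches flushes blobs in IdealBatchSize chunks, mirroring the
// shape of writeFlushItems without its trie-specific bookkeeping.
func writeInBatches(db ethdb.Database, blobs [][]byte) error {
	batch := db.NewBatch()
	for i, blob := range blobs {
		key := make([]byte, 8)
		binary.BigEndian.PutUint64(key, uint64(i)) // toy key
		if err := batch.Put(key, blob); err != nil {
			return err
		}
		if batch.ValueSize() > ethdb.IdealBatchSize {
			if err := batch.Write(); err != nil {
				return err
			}
			batch.Reset()
		}
	}
	return batch.Write() // flush whatever remains
}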
@@ -618,12 +629,12 @@ func (db *Database) Cap(limit common.StorageSize) error {
 	}

 	// Keep removing nodes from the flush-list until we're below allowance
-	toFlush := make([]flushItem, 0, 128)
+	toFlush := make([]*flushItem, 0, 128)
 	oldest := db.oldest
 	for pendingSize > limit && oldest != (common.Hash{}) {
 		// Fetch the oldest referenced node and push into the batch
 		node := db.dirties[oldest]
-		toFlush = append(toFlush, flushItem{oldest, node, nil})
+		toFlush = append(toFlush, &flushItem{oldest, node, nil})

 		// Iterate to the next flush item, or abort if the size cap was achieved. Size
 		// is the total size, including the useful cached data (hash -> blob), the
@@ -689,7 +700,7 @@ func (db *Database) Commit(node common.Hash, report bool, callback func(common.H
 	db.lock.RLock()
 	lockStart := time.Now()
 	nodes, storage := len(db.dirties), db.dirtiesSize
-	toFlush, err := db.commit(node, make([]flushItem, 0, 128), callback)
+	toFlush, err := db.commit(node, make([]*flushItem, 0, 128), callback)
 	if err != nil {
 		log.Error("Failed to commit trie from trie database", "err", err)
 		return err
@@ -739,7 +750,7 @@ func (db *Database) Commit(node common.Hash, report bool, callback func(common.H
 //
 // [callback] will be invoked as soon as it is determined a trie node will be
 // flushed to disk (before it is actually written).
-func (db *Database) commit(hash common.Hash, toFlush []flushItem, callback func(common.Hash)) ([]flushItem, error) {
+func (db *Database) commit(hash common.Hash, toFlush []*flushItem, callback func(common.Hash)) ([]*flushItem, error) {
 	// If the node does not exist, it's a previously committed node
 	node, ok := db.dirties[hash]
 	if !ok {
@@ -757,7 +768,7 @@ func (db *Database) commit(hash common.Hash, toFlush []flushItem, callback func(
 	// By processing the children of each node before the node itself, we ensure
 	// that children are committed before their parents (an invariant of this
 	// package).
-	toFlush = append(toFlush, flushItem{hash, node, nil})
+	toFlush = append(toFlush, &flushItem{hash, node, nil})
 	if callback != nil {
 		callback(hash)
 	}
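
The flushItem changes across this file all serve the pointer switch flagged in the commit message ("use flush item pointer"): appending *flushItem stores one word instead of copying the whole struct, and a later write to an item (the real flushItem carries an rlp field that the flush path can fill in) is seen through every reference rather than lost on a copy. A toy illustration of the difference, with a simplified struct rather than coreth's actual flushItem:

package main

import "fmt"

type item struct {
	key string
	rlp []byte // filled in lazily in the real code
}

func main() {
	// Value slice: range yields copies, so the mutation is lost.
	vals := []item{{key: "a"}}
	for _, it := range vals {
		it.rlp = []byte{0x01} // writes to a copy
	}
	fmt.Println(vals[0].rlp == nil) // true: original untouched

	// Pointer slice: every reference shares one item, and append moves
	// a single pointer instead of the whole struct.
	ptrs := []*item{{key: "a"}}
	for _, it := range ptrs {
		it.rlp = []byte{0x01}
	}
	fmt.Println(ptrs[0].rlp != nil) // true: the write sticks
}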
