Skip to content

Commit

Permalink
Reuse key buffers during hashing (#2902)
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenButtolph authored Apr 3, 2024
1 parent 5be62de commit d7452d3
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 55 deletions.
60 changes: 60 additions & 0 deletions x/merkledb/bytes_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package merkledb

import "sync"

// bytesPool is a combined counting semaphore and free list of byte slices:
// at most [cap(slots)] buffers may be checked out at any one time, and
// buffers handed back via Release are cached for reuse by later acquirers.
type bytesPool struct {
	// slots acts as a counting semaphore bounding concurrent acquisitions.
	slots chan struct{}
	// bytesLock guards [bytes].
	bytesLock sync.Mutex
	// bytes is the stack of released buffers available for reuse.
	bytes [][]byte
}

// newBytesPool returns a pool that allows at most [numSlots] buffers to be
// acquired concurrently.
func newBytesPool(numSlots int) *bytesPool {
	pool := &bytesPool{
		slots: make(chan struct{}, numSlots),
		bytes: make([][]byte, 0, numSlots),
	}
	return pool
}

// Acquire blocks until a slot is free and then returns a previously released
// buffer, or nil when no cached buffer is available.
func (p *bytesPool) Acquire() []byte {
	// Filling the buffered channel is the semaphore acquisition.
	p.slots <- struct{}{}
	return p.pop()
}

// TryAcquire is the non-blocking variant of Acquire. It reports false without
// a buffer when every slot is currently in use.
func (p *bytesPool) TryAcquire() ([]byte, bool) {
	select {
	case p.slots <- struct{}{}:
		// Slot obtained; hand out a cached buffer (possibly nil).
		return p.pop(), true
	default:
	}
	return nil, false
}

// pop removes and returns the most recently released buffer, or nil when the
// free list is empty. Callers must already hold a slot.
func (p *bytesPool) pop() []byte {
	p.bytesLock.Lock()
	defer p.bytesLock.Unlock()

	if len(p.bytes) == 0 {
		return nil
	}

	last := len(p.bytes) - 1
	buf := p.bytes[last]
	p.bytes = p.bytes[:last]
	return buf
}

// Release returns [b] to the pool and frees the slot held by the caller.
// Releasing without a matching Acquire/TryAcquire is a programming error and
// panics.
func (p *bytesPool) Release(b []byte) {
	// Cache the buffer first so that any waiter woken by the slot release
	// below can immediately reuse it.
	func() {
		p.bytesLock.Lock()
		defer p.bytesLock.Unlock()
		p.bytes = append(p.bytes, b)
	}()

	select {
	case <-p.slots:
	default:
		panic("release of unacquired semaphore")
	}
}
46 changes: 46 additions & 0 deletions x/merkledb/bytes_pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package merkledb

import "testing"

// Benchmark_BytesPool_Acquire measures acquiring from a pool that never runs
// out of slots.
func Benchmark_BytesPool_Acquire(b *testing.B) {
	pool := newBytesPool(b.N)

	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		pool.Acquire()
	}
}

// Benchmark_BytesPool_Release measures returning buffers to a fully acquired
// pool.
func Benchmark_BytesPool_Release(b *testing.B) {
	pool := newBytesPool(b.N)
	// Exhaust every slot so each Release below has a matching Acquire.
	for n := 0; n < b.N; n++ {
		pool.Acquire()
	}

	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		pool.Release(nil)
	}
}

// Benchmark_BytesPool_TryAcquire_Success measures the fast path where a slot
// is always available.
func Benchmark_BytesPool_TryAcquire_Success(b *testing.B) {
	pool := newBytesPool(b.N)

	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		pool.TryAcquire()
	}
}

// Benchmark_BytesPool_TryAcquire_Failure measures the path where the pool is
// exhausted and every TryAcquire fails.
func Benchmark_BytesPool_TryAcquire_Failure(b *testing.B) {
	pool := newBytesPool(1)
	pool.Acquire() // take the only slot so every TryAcquire below fails

	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		pool.TryAcquire()
	}
}
24 changes: 12 additions & 12 deletions x/merkledb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/attribute"
"golang.org/x/exp/maps"
"golang.org/x/sync/semaphore"

"github.com/ava-labs/avalanchego/database"
"github.com/ava-labs/avalanchego/ids"
Expand Down Expand Up @@ -216,9 +215,10 @@ type merkleDB struct {
// Valid children of this trie.
childViews []*view

// hashNodesSema controls the number of goroutines that are created inside
// [hashChangedNode] at any given time.
hashNodesSema *semaphore.Weighted
// hashNodesKeyPool controls the number of goroutines that are created
// inside [hashChangedNode] at any given time and provides slices for the
// keys needed while hashing.
hashNodesKeyPool *bytesPool

tokenSize int
}
Expand All @@ -242,9 +242,9 @@ func newDatabase(
return nil, err
}

rootGenConcurrency := uint(runtime.NumCPU())
rootGenConcurrency := runtime.NumCPU()
if config.RootGenConcurrency != 0 {
rootGenConcurrency = config.RootGenConcurrency
rootGenConcurrency = int(config.RootGenConcurrency)
}

// Share a sync.Pool of []byte between the intermediateNodeDB and valueNodeDB
Expand All @@ -270,12 +270,12 @@ func newDatabase(
bufferPool,
metrics,
int(config.ValueNodeCacheSize)),
history: newTrieHistory(int(config.HistoryLength)),
debugTracer: getTracerIfEnabled(config.TraceLevel, DebugTrace, config.Tracer),
infoTracer: getTracerIfEnabled(config.TraceLevel, InfoTrace, config.Tracer),
childViews: make([]*view, 0, defaultPreallocationSize),
hashNodesSema: semaphore.NewWeighted(int64(rootGenConcurrency)),
tokenSize: BranchFactorToTokenSize[config.BranchFactor],
history: newTrieHistory(int(config.HistoryLength)),
debugTracer: getTracerIfEnabled(config.TraceLevel, DebugTrace, config.Tracer),
infoTracer: getTracerIfEnabled(config.TraceLevel, InfoTrace, config.Tracer),
childViews: make([]*view, 0, defaultPreallocationSize),
hashNodesKeyPool: newBytesPool(rootGenConcurrency),
tokenSize: BranchFactorToTokenSize[config.BranchFactor],
}

if err := trieDB.initializeRoot(); err != nil {
Expand Down
131 changes: 88 additions & 43 deletions x/merkledb/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,31 +281,31 @@ func (v *view) hashChangedNodes(ctx context.Context) {
return
}

_ = v.db.hashNodesSema.Acquire(context.Background(), 1)
defer v.db.hashNodesSema.Release(1)
// If there are no children, we can avoid allocating [keyBuffer].
root := v.root.Value()
if len(root.children) == 0 {
v.changes.rootID = root.calculateID(v.db.metrics)
return
}

v.changes.rootID = v.hashChangedNode(v.root.Value())
// Allocate [keyBuffer] and populate it with the root node's key.
keyBuffer := v.db.hashNodesKeyPool.Acquire()
keyBuffer = v.setKeyBuffer(root, keyBuffer)
v.changes.rootID, keyBuffer = v.hashChangedNode(root, keyBuffer)
v.db.hashNodesKeyPool.Release(keyBuffer)
}

// Calculates the ID of all descendants of [n] which need to be recalculated,
// and then calculates the ID of [n] itself.
func (v *view) hashChangedNode(n *node) ids.ID {
// If there are no children, we can avoid allocating [keyBuffer].
if len(n.children) == 0 {
return n.calculateID(v.db.metrics)
}

// Calculate the size of the largest child key of this node. This allows
// only allocating a single slice for all of the keys.
var maxChildBitLength int
for _, childEntry := range n.children {
maxChildBitLength = max(maxChildBitLength, childEntry.compressedKey.length)
}

//
// Returns a potentially expanded [keyBuffer]. By returning this value this
// function is able to have a maximum total number of allocations shared across
// multiple invocations.
//
// Invariant: [keyBuffer] must be populated with [n]'s key and have sufficient
// length to contain any of [n]'s child keys.
func (v *view) hashChangedNode(n *node, keyBuffer []byte) (ids.ID, []byte) {
var (
maxBytesNeeded = bytesNeeded(n.key.length + v.tokenSize + maxChildBitLength)
// keyBuffer is allocated onto the heap because it is dynamically sized.
keyBuffer = make([]byte, maxBytesNeeded)
// childBuffer is allocated on the stack.
childBuffer = make([]byte, 1)
dualIndex = dualBitIndex(v.tokenSize)
Expand All @@ -318,14 +318,13 @@ func (v *view) hashChangedNode(n *node) ids.ID {
// We use [wg] to wait until all descendants of [n] have been updated.
wg waitGroup
)

if bytesForKey > 0 {
// We only need to copy this node's key once because it does not change
// as we iterate over the children.
copy(keyBuffer, n.key.value)
lastKeyByte = keyBuffer[bytesForKey-1]
}

// This loop is optimized to avoid allocations when calculating the
// [childKey] by reusing [keyBuffer] and leaving the first [bytesForKey-1]
// bytes unmodified.
for childIndex, childEntry := range n.children {
childBuffer[0] = childIndex << dualIndex
childIndexAsKey := Key{
Expand All @@ -336,17 +335,26 @@ func (v *view) hashChangedNode(n *node) ids.ID {
}

totalBitLength := n.key.length + v.tokenSize + childEntry.compressedKey.length
buffer := keyBuffer[:bytesNeeded(totalBitLength)]
// Make sure the last byte of the key is originally set correctly
// Because [keyBuffer] may have been modified in a prior iteration of
// this loop, it is not guaranteed that its length is at least
// [bytesNeeded(totalBitLength)]. However, that's fine. The below
// slicing would only panic if the buffer didn't have sufficient
// capacity.
keyBuffer = keyBuffer[:bytesNeeded(totalBitLength)]
// We don't need to copy this node's key. It's assumed to already be
// correct; except for the last byte. We must make sure the last byte of
// the key is set correctly because extendIntoBuffer may OR bits from
// the extension and overwrite the last byte. However, extendIntoBuffer
// does not modify the first [bytesForKey-1] bytes of [keyBuffer].
if bytesForKey > 0 {
buffer[bytesForKey-1] = lastKeyByte
keyBuffer[bytesForKey-1] = lastKeyByte
}
extendIntoBuffer(buffer, childIndexAsKey, n.key.length)
extendIntoBuffer(buffer, childEntry.compressedKey, n.key.length+v.tokenSize)
extendIntoBuffer(keyBuffer, childIndexAsKey, n.key.length)
extendIntoBuffer(keyBuffer, childEntry.compressedKey, n.key.length+v.tokenSize)
childKey := Key{
// It is safe to use byteSliceToString because [buffer] is not
// It is safe to use byteSliceToString because [keyBuffer] is not
// modified while [childKey] is in use.
value: byteSliceToString(buffer),
value: byteSliceToString(keyBuffer),
length: totalBitLength,
}

Expand All @@ -355,32 +363,69 @@ func (v *view) hashChangedNode(n *node) ids.ID {
// This child wasn't changed.
continue
}
childEntry.hasValue = childNodeChange.after.hasValue()

childNode := childNodeChange.after
childEntry.hasValue = childNode.hasValue()

// If there are no children of the childNode, we can avoid constructing
// the buffer for the child keys.
if len(childNode.children) == 0 {
childEntry.id = childNode.calculateID(v.db.metrics)
continue
}

// Try updating the child and its descendants in a goroutine.
if ok := v.db.hashNodesSema.TryAcquire(1); ok {
if childKeyBuffer, ok := v.db.hashNodesKeyPool.TryAcquire(); ok {
wg.Add(1)

// Passing variables explicitly through the function call rather
// than implicitly passing them through the scope of the function
// definition allows the passed variables to be allocated on the
// stack.
go func(wg *sync.WaitGroup, childEntry *child) {
childEntry.id = v.hashChangedNode(childNodeChange.after)
v.db.hashNodesSema.Release(1)
go func(wg *sync.WaitGroup, childEntry *child, childNode *node, childKeyBuffer []byte) {
childKeyBuffer = v.setKeyBuffer(childNode, childKeyBuffer)
childEntry.id, childKeyBuffer = v.hashChangedNode(childNode, childKeyBuffer)
v.db.hashNodesKeyPool.Release(childKeyBuffer)
wg.Done()
}(wg.wg, childEntry)
}(wg.wg, childEntry, childNode, childKeyBuffer)
} else {
// We're at the goroutine limit; do the work in this goroutine.
childEntry.id = v.hashChangedNode(childNodeChange.after)
//
// We can skip copying the key here because [keyBuffer] is already
// constructed to be childNode's key.
keyBuffer = v.setLengthForChildren(childNode, keyBuffer)
childEntry.id, keyBuffer = v.hashChangedNode(childNode, keyBuffer)
}
}

// Wait until all descendants of [n] have been updated.
wg.Wait()

// The IDs [n]'s descendants are up to date so we can calculate [n]'s ID.
return n.calculateID(v.db.metrics)
return n.calculateID(v.db.metrics), keyBuffer
}

// setKeyBuffer expands [keyBuffer] to have sufficient size for any of [n]'s
// child keys and populates [n]'s key into [keyBuffer]. If [keyBuffer] already
// has sufficient size, this function will not perform any memory allocations.
// setKeyBuffer expands [keyBuffer] to have sufficient size for any of [n]'s
// child keys and populates it with [n]'s key. If [keyBuffer] already has
// sufficient capacity, no allocation is performed.
func (v *view) setKeyBuffer(n *node, keyBuffer []byte) []byte {
	buf := v.setLengthForChildren(n, keyBuffer)
	copy(buf, n.key.value)
	return buf
}

// setLengthForChildren expands [keyBuffer] to have sufficient size for any of
// [n]'s child keys.
// setLengthForChildren resizes [keyBuffer] so it can hold the byte
// representation of any of [n]'s child keys.
func (v *view) setLengthForChildren(n *node, keyBuffer []byte) []byte {
	// Find the bit length of the longest compressed child key under [n].
	longestChild := 0
	for _, childEntry := range n.children {
		if childEntry.compressedKey.length > longestChild {
			longestChild = childEntry.compressedKey.length
		}
	}
	return setBytesLength(keyBuffer, bytesNeeded(n.key.length+v.tokenSize+longestChild))
}

// setBytesLength returns a slice of exactly [size] bytes. When [b] has enough
// capacity its storage is reused; otherwise the slice is grown, preserving
// the first cap(b) bytes.
func setBytesLength(b []byte, size int) []byte {
	if shortfall := size - cap(b); shortfall > 0 {
		return append(b[:cap(b)], make([]byte, shortfall)...)
	}
	return b[:size]
}

// GetProof returns a proof that [bytesPath] is in or not in trie [t].
Expand Down

0 comments on commit d7452d3

Please sign in to comment.