Optimize to avoid eth_getBlockByNumber when keeping up with head
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
peterbroadhurst committed Jun 14, 2024
1 parent 072aa38 commit 72057bf
Showing 2 changed files with 158 additions and 43 deletions.
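
In rough terms, the optimization works like this (a minimal sketch with hypothetical names, not the real FireFly types): when a new-head notification arrives whose parent hash matches the listener's current in-memory head, the block can be attached directly with no eth_getBlockByNumber round trip; only a gap or a fork forces a fallback query by number.

package sketch

// Block is a hypothetical stand-in for apitypes.BlockInfo.
type Block struct {
	Number     uint64
	Hash       string
	ParentHash string
}

type listener struct {
	chain []Block // blocks buffered since the last confirmed checkpoint
}

// onNotify handles a new-head notification: if the block attaches
// cleanly to our in-memory head we append it without any RPC call;
// otherwise the caller must fall back to a get-by-number query.
func (l *listener) onNotify(b Block) (needsQuery bool) {
	n := len(l.chain)
	if n > 0 && b.Number == l.chain[n-1].Number+1 && b.ParentHash == l.chain[n-1].Hash {
		l.chain = append(l.chain, b) // fast path: no eth_getBlockByNumber
		return false
	}
	return true // empty buffer, gap, or fork: re-query the node
}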
133 changes: 98 additions & 35 deletions internal/confirmations/confirmed_block_listener.go
@@ -52,6 +52,7 @@ type confirmedBlockListener struct {
waitingForFromBlock bool
rollingCheckpoint *ffcapi.BlockListenerCheckpoint
blocksSinceCheckpoint []*apitypes.BlockInfo
newHeadToAdd []*apitypes.BlockInfo // used by the notification routine when new blocks add directly onto the end of the blocksSinceCheckpoint list
newBlockHashes chan *ffcapi.BlockHashEvent
dispatcherTap chan struct{}
eventStream chan<- *ffcapi.ListenerEvent
@@ -187,19 +188,47 @@ func (cbl *confirmedBlockListener) processBlockNotification(block *apitypes.Bloc
return
}

// Otherwise see if it's a conflicting fork to any of our existing blocks
for idx, existingBlock := range cbl.blocksSinceCheckpoint {
if existingBlock.BlockNumber == block.BlockNumber {
// Must discard up to this point
cbl.blocksSinceCheckpoint = cbl.blocksSinceCheckpoint[0:idx]
// This block fits; add it on the end.
if idx == 0 || block.ParentHash == cbl.blocksSinceCheckpoint[idx-1].BlockHash {
log.L(cbl.ctx).Debugf("Notification of block %d/%s after block %d/%s", block.BlockNumber, block.BlockHash, existingBlock.BlockNumber, existingBlock.BlockHash)
cbl.blocksSinceCheckpoint = append(cbl.blocksSinceCheckpoint[0:idx], block)
} else {
log.L(cbl.ctx).Debugf("Notification of block %d/%s conflicting with previous block %d/%s", block.BlockNumber, block.BlockHash, existingBlock.BlockNumber, existingBlock.BlockHash)
// If the block immediately adds onto the set of blocks being processed, then we just attach it there
// and notify the dispatcher to process it directly. No need for the other routine to query again.
// When we're in steady state listening to the stable head of the chain, this should be the most common case.
var dispatchHead *apitypes.BlockInfo
if len(cbl.newHeadToAdd) > 0 {
// multiple notifications have snuck in while the dispatcher is busy... don't grow this list indefinitely
if len(cbl.newHeadToAdd) > 10 /* not considered worth adding/explaining a tuning property for this */ {
log.L(cbl.ctx).Infof("Block listener fell behind head of chain")
cbl.newHeadToAdd = nil
} else {
dispatchHead = cbl.newHeadToAdd[len(cbl.newHeadToAdd)-1]
}
}
if dispatchHead == nil && len(cbl.blocksSinceCheckpoint) > 0 {
dispatchHead = cbl.blocksSinceCheckpoint[len(cbl.blocksSinceCheckpoint)-1]
}
switch {
case dispatchHead != nil && block.BlockNumber == dispatchHead.BlockNumber+1 && block.ParentHash == dispatchHead.BlockHash:
// Ok - we just need to pop it onto the list, and wake the thread
log.L(cbl.ctx).Debugf("Directly passing block %d/%s to dispatcher after block %d/%s", block.BlockNumber, block.BlockHash, dispatchHead.BlockNumber, dispatchHead.BlockHash)
cbl.newHeadToAdd = append(cbl.newHeadToAdd, block)
case dispatchHead == nil:
// The dispatcher will check these against the checkpoint block before pulling them to the blocksSinceCheckpoint list
log.L(cbl.ctx).Debugf("Directly passing block %d/%s to dispatcher as no blocks pending", block.BlockNumber, block.BlockHash)
cbl.newHeadToAdd = append(cbl.newHeadToAdd, block)
default:
// Otherwise see if it's a conflicting fork to any of our existing blocks
for idx, existingBlock := range cbl.blocksSinceCheckpoint {
if existingBlock.BlockNumber == block.BlockNumber {
// Must discard up to this point
cbl.blocksSinceCheckpoint = cbl.blocksSinceCheckpoint[0:idx]
cbl.newHeadToAdd = nil
// This block fits; slot it in at this point in the chain
if idx == 0 || block.ParentHash == cbl.blocksSinceCheckpoint[idx-1].BlockHash {
log.L(cbl.ctx).Debugf("Notification of re-org %d/%s replacing block %d/%s", block.BlockNumber, block.BlockHash, existingBlock.BlockNumber, existingBlock.BlockHash)
cbl.blocksSinceCheckpoint = append(cbl.blocksSinceCheckpoint[0:idx], block)
} else {
log.L(cbl.ctx).Debugf("Notification of block %d/%s conflicting with previous block %d/%s", block.BlockNumber, block.BlockHash, existingBlock.BlockNumber, existingBlock.BlockHash)
}
break
}
break
}
}
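
A minimal sketch of the bound applied above, reusing the hypothetical Block type from the earlier sketch (the real code inlines this rather than using a helper): notifications queue up while the dispatcher is busy, and past a small fixed limit the queue is discarded so the dispatcher catches up by number instead.

// appendBounded adds a directly-notified block to the pending queue,
// dropping the whole queue if it has grown past the limit (10 in the
// change above) - a sign the dispatcher has fallen behind the head.
func appendBounded(queue []Block, b Block, limit int) []Block {
	if len(queue) > limit {
		// Cheaper to re-poll by number than to buffer indefinitely.
		return nil
	}
	return append(queue, b)
}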

@@ -221,10 +250,9 @@ func (cbl *confirmedBlockListener) dispatcher() {

for {
if !cbl.waitingForFromBlock {
// tight spin getting blocks until it looks like we need to wait for a notification
for cbl.getNextBlock() {
// In all cases we ensure that we move our confirmation window forwards.
// The checkpoint block is always final, and we never move backwards
// spin getting blocks until it looks like we need to wait for a notification
lastFromNotification := false
for cbl.readNextBlock(&lastFromNotification) {
cbl.dispatchAllConfirmed()
}
}
@@ -239,44 +267,78 @@ }
}
}
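
The reworked dispatcher shape, sketched with hypothetical function fields so it stands alone (the real methods are readNextBlock and dispatchAllConfirmed): spin reading blocks and dispatching confirmations until a read comes back empty, then block until the notification routine taps it awake.

import "context"

type dispatcherLoop struct {
	readNext          func(lastFromNotification *bool) bool
	dispatchConfirmed func()
}

func (d *dispatcherLoop) run(ctx context.Context, tap <-chan struct{}) {
	for {
		last := false
		for d.readNext(&last) { // spin until there is nothing more to read
			d.dispatchConfirmed()
		}
		select {
		case <-tap: // woken by a new-block notification
		case <-ctx.Done():
			return
		}
	}
}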

func (cbl *confirmedBlockListener) getNextBlock() (more bool) {
// MUST be called under lock
func (cbl *confirmedBlockListener) popDispatchedIfAvailable(lastFromNotification *bool) (blockNumberToFetch uint64, found bool) {

if len(cbl.newHeadToAdd) > 0 {
// If we find one in the lock, it must be ready for us to append
nextBlock := cbl.newHeadToAdd[0]
cbl.newHeadToAdd = append([]*apitypes.BlockInfo{}, cbl.newHeadToAdd[1:]...)
cbl.blocksSinceCheckpoint = append(cbl.blocksSinceCheckpoint, nextBlock)

// We track that we've done this, so that if we run out going round the loop later,
// we know there's no point in doing a get-by-number
*lastFromNotification = true
return 0, true
}

blockNumberToFetch = cbl.fromBlock
if cbl.rollingCheckpoint != nil && cbl.rollingCheckpoint.Block >= cbl.fromBlock {
blockNumberToFetch = cbl.rollingCheckpoint.Block + 1
}
if len(cbl.blocksSinceCheckpoint) > 0 {
blockNumberToFetch = cbl.blocksSinceCheckpoint[len(cbl.blocksSinceCheckpoint)-1].BlockNumber.Uint64() + 1
}
return blockNumberToFetch, false
}
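
The fetch-position rule from popDispatchedIfAvailable, restated as a standalone sketch (hypothetical signature, reusing the Block type from above): start at the configured from-block, advance past the rolling checkpoint if there is one, and past anything already buffered since that checkpoint.

func nextBlockToFetch(fromBlock uint64, checkpoint *uint64, buffered []Block) uint64 {
	next := fromBlock
	if checkpoint != nil && *checkpoint >= fromBlock {
		next = *checkpoint + 1 // resume just past the confirmed checkpoint
	}
	if n := len(buffered); n > 0 {
		next = buffered[n-1].Number + 1 // or past what we've already buffered
	}
	return next
}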

func (cbl *confirmedBlockListener) readNextBlock(lastFromNotification *bool) (found bool) {

var nextBlock *apitypes.BlockInfo
var blockNumberToFetch uint64
var dispatchedPopped bool
err := cbl.retry.Do(cbl.ctx, "next block", func(_ int) (retry bool, err error) {
// Find the highest block in the lock
// If the notifier has lined up a block for us, grab it before querying the node
cbl.stateLock.Lock()
blockNumberToFetch := cbl.fromBlock
if cbl.rollingCheckpoint != nil && cbl.rollingCheckpoint.Block >= cbl.fromBlock {
blockNumberToFetch = cbl.rollingCheckpoint.Block + 1
}
if len(cbl.blocksSinceCheckpoint) > 0 {
blockNumberToFetch = cbl.blocksSinceCheckpoint[len(cbl.blocksSinceCheckpoint)-1].BlockNumber.Uint64() + 1
}
blockNumberToFetch, dispatchedPopped = cbl.popDispatchedIfAvailable(lastFromNotification)
cbl.stateLock.Unlock()
if dispatchedPopped || *lastFromNotification {
// We processed a dispatch this time, or last time.
// Either way we're tracking at the head and there's no point doing a query
// we expect to return nothing - as we should get another notification.
return false, nil
}

// Get the next block
nextBlock, err = cbl.bcm.getBlockByNumber(blockNumberToFetch, false, "")
return true, err
})
if nextBlock == nil || err != nil {
// We didn't get the next block, and maybe our context completed
return false
// We either got a block dispatched, or did not find a block ourselves.
return dispatchedPopped
}

// In the lock append it to our list, checking it's valid to append to what we have
cbl.stateLock.Lock()
defer cbl.stateLock.Unlock()

if len(cbl.blocksSinceCheckpoint) > 0 {
if cbl.blocksSinceCheckpoint[len(cbl.blocksSinceCheckpoint)-1].BlockHash != nextBlock.ParentHash {
// This doesn't attach to the end of our list. Trim it off and try again.
cbl.blocksSinceCheckpoint = cbl.blocksSinceCheckpoint[0 : len(cbl.blocksSinceCheckpoint)-1]
return true
// Because we unlocked while we queried, we have to check that we weren't
// beaten to the punch by a notification arriving in the meantime.
if _, dispatchedPopped = cbl.popDispatchedIfAvailable(lastFromNotification); !dispatchedPopped {

// It's possible that while we were off at the node querying this, a notification came in
// that affected our state. We need to check this still matches, or go round again
if len(cbl.blocksSinceCheckpoint) > 0 {
if cbl.blocksSinceCheckpoint[len(cbl.blocksSinceCheckpoint)-1].BlockHash != nextBlock.ParentHash {
// This doesn't attach to the end of our list. Trim it off and try again.
cbl.blocksSinceCheckpoint = cbl.blocksSinceCheckpoint[0 : len(cbl.blocksSinceCheckpoint)-1]
return true
}
}
}

// We successfully attached it
cbl.blocksSinceCheckpoint = append(cbl.blocksSinceCheckpoint, nextBlock)
// We successfully attached it
cbl.blocksSinceCheckpoint = append(cbl.blocksSinceCheckpoint, nextBlock)
}
return true

}
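
The re-check after the query, as a standalone sketch (hypothetical helper, same Block type as above): because the state lock is released during the RPC call, the fetched block may no longer attach to the head, in which case the head is trimmed and the read retried.

func attachFetched(chain []Block, fetched Block) (updated []Block, retry bool) {
	if n := len(chain); n > 0 && chain[n-1].Hash != fetched.ParentHash {
		// A notification changed our state while we were querying:
		// trim our head and go round again.
		return chain[:n-1], true
	}
	return append(chain, fetched), false
}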
@@ -309,6 +371,7 @@ func (cbl *confirmedBlockListener) dispatchAllConfirmed() {
if toDispatch == nil {
return
}
log.L(cbl.ctx).Infof("Dispatching block %d/%s", toDispatch.BlockEvent.BlockNumber.Uint64(), toDispatch.BlockEvent.BlockHash)
select {
case cbl.eventStream <- toDispatch:
case <-cbl.ctx.Done():
68 changes: 60 additions & 8 deletions internal/confirmations/confirmed_block_listener_test.go
@@ -98,20 +98,21 @@ func TestCBLListenFromCurrentBlock(t *testing.T) {

blocks := testBlockArray(15)

mbiNum := mca.On("BlockInfoByNumber", mock.Anything, mock.Anything)
mbiNum.Run(func(args mock.Arguments) { mockBlockNumberReturn(mbiNum, args, blocks) })

mbiHash := mca.On("BlockInfoByHash", mock.Anything, mock.Anything)
mbiHash.Run(func(args mock.Arguments) { mockBlockHashReturn(mbiHash, args, blocks) })

mca.On("BlockInfoByNumber", mock.Anything, mock.Anything).Return(nil, ffcapi.ErrorReasonNotFound, fmt.Errorf("not found")).Maybe()

bcm.requiredConfirmations = 5
cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, "", nil, esDispatch)
assert.NoError(t, err)

// Notify starting at block 5
bcm.propagateBlockHashToCBLs(&ffcapi.BlockHashEvent{
BlockHashes: []string{blocks[5].BlockHash},
})
for i := 5; i < len(blocks); i++ {
bcm.propagateBlockHashToCBLs(&ffcapi.BlockHashEvent{
BlockHashes: []string{blocks[i].BlockHash},
})
}

// Randomly notify below that too, which will be ignored
bcm.propagateBlockHashToCBLs(&ffcapi.BlockHashEvent{
@@ -120,6 +121,7 @@

for i := 5; i < len(blocks)-bcm.requiredConfirmations; i++ {
b := <-esDispatch
assert.Equal(t, b.BlockEvent.BlockNumber, blocks[i].BlockNumber)
assert.Equal(t, b.BlockEvent.BlockInfo, blocks[i].BlockInfo)
}

@@ -264,8 +266,8 @@ func testCBLHandleReorgInConfirmationWindow(t *testing.T, blockLenBeforeReorg, o
time.Sleep(1 * time.Millisecond)
assert.LessOrEqual(t, len(cbl.blocksSinceCheckpoint), bcm.requiredConfirmations)
select {
case <-esDispatch:
assert.Fail(t, "should not have received block in confirmation window")
case b := <-esDispatch:
assert.Fail(t, fmt.Sprintf("should not have received block in confirmation window: %d/%s", b.BlockEvent.BlockNumber.Int64(), b.BlockEvent.BlockHash))
default: // good - we should have the confirmations sat there, but no dispatch
}

@@ -324,6 +326,56 @@ func TestCBLHandleRandomConflictingBlockNotification(t *testing.T) {
mca.AssertExpectations(t)
}

func TestCBLDispatcherFallsBehindHead(t *testing.T) {
bcm, mca := newTestBlockConfirmationManager()

esDispatch := make(chan *ffcapi.ListenerEvent)

id := fftypes.NewUUID()

blocks := testBlockArray(30)

mbiHash := mca.On("BlockInfoByHash", mock.Anything, mock.Anything)
mbiHash.Run(func(args mock.Arguments) { mockBlockHashReturn(mbiHash, args, blocks) })

// We'll fall back to this because we don't keep up
mbiNum := mca.On("BlockInfoByNumber", mock.Anything, mock.Anything)
mbiNum.Run(func(args mock.Arguments) { mockBlockNumberReturn(mbiNum, args, blocks) })

bcm.requiredConfirmations = 5
cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, "", nil, esDispatch)
assert.NoError(t, err)

// Notify all the blocks before we process any
for i := 0; i < len(blocks); i++ {
bcm.propagateBlockHashToCBLs(&ffcapi.BlockHashEvent{
BlockHashes: []string{blocks[i].BlockHash},
})
}

for i := 0; i < len(blocks)-bcm.requiredConfirmations; i++ {
// The directly-dispatched blocks should have been queued, until the queue
// got too far ahead and was then set to nil.
for cbl.newHeadToAdd != nil {
time.Sleep(1 * time.Millisecond)
}
b := <-esDispatch
assert.Equal(t, b.BlockEvent.BlockNumber, blocks[i].BlockNumber)
assert.Equal(t, b.BlockEvent.BlockInfo, blocks[i].BlockInfo)
}

time.Sleep(1 * time.Millisecond)
assert.Len(t, cbl.blocksSinceCheckpoint, bcm.requiredConfirmations)
select {
case <-esDispatch:
assert.Fail(t, "should not have received block in confirmation window")
default: // good - we should have the confirmations sat there, but no dispatch
}

bcm.Stop()
mca.AssertExpectations(t)
}

func TestCBLStartBadFromBlock(t *testing.T) {
bcm, mca := newTestBlockConfirmationManager()

