From 72057bfee5f96c1a17490ed3439d4ed189f7e7b9 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Fri, 14 Jun 2024 00:00:25 -0400 Subject: [PATCH] Optimize to avoid eth_getBlockByNumber when keeping up with head Signed-off-by: Peter Broadhurst --- .../confirmations/confirmed_block_listener.go | 133 +++++++++++++----- .../confirmed_block_listener_test.go | 68 +++++++-- 2 files changed, 158 insertions(+), 43 deletions(-) diff --git a/internal/confirmations/confirmed_block_listener.go b/internal/confirmations/confirmed_block_listener.go index 21ab7e19..0854423a 100644 --- a/internal/confirmations/confirmed_block_listener.go +++ b/internal/confirmations/confirmed_block_listener.go @@ -52,6 +52,7 @@ type confirmedBlockListener struct { waitingForFromBlock bool rollingCheckpoint *ffcapi.BlockListenerCheckpoint blocksSinceCheckpoint []*apitypes.BlockInfo + newHeadToAdd []*apitypes.BlockInfo // used by the notification routine when there are new blocks that add directly onto the end of the blocksSinceCheckpoint newBlockHashes chan *ffcapi.BlockHashEvent dispatcherTap chan struct{} eventStream chan<- *ffcapi.ListenerEvent @@ -187,19 +188,47 @@ func (cbl *confirmedBlockListener) processBlockNotification(block *apitypes.Bloc return } - // Otherwise see if it's a conflicting fork to any of our existing blocks - for idx, existingBlock := range cbl.blocksSinceCheckpoint { - if existingBlock.BlockNumber == block.BlockNumber { - // Must discard up to this point - cbl.blocksSinceCheckpoint = cbl.blocksSinceCheckpoint[0:idx] - // This block fits, and add this on the end. - if idx == 0 || block.ParentHash == cbl.blocksSinceCheckpoint[idx-1].BlockHash { - log.L(cbl.ctx).Debugf("Notification of block %d/%s after block %d/%s", block.BlockNumber, block.BlockHash, existingBlock.BlockNumber, existingBlock.BlockHash) - cbl.blocksSinceCheckpoint = append(cbl.blocksSinceCheckpoint[0:idx], block) - } else { - log.L(cbl.ctx).Debugf("Notification of block %d/%s conflicting with previous block %d/%s", block.BlockNumber, block.BlockHash, existingBlock.BlockNumber, existingBlock.BlockHash) + // If the block immediate adds onto the set of blocks being processed, then we just attach it there + // and notify the dispatcher to process it directly. No need for the other routine to query again. + // When we're in steady state listening to the stable head of the chain, this should be the most common case. + var dispatchHead *apitypes.BlockInfo + if len(cbl.newHeadToAdd) > 0 { + // we've snuck in multiple notifications while the dispatcher is busy... don't add indefinitely to this list + if len(cbl.newHeadToAdd) > 10 /* not considered worth adding/explaining a tuning property for this */ { + log.L(cbl.ctx).Infof("Block listener fell behind head of chain") + cbl.newHeadToAdd = nil + } else { + dispatchHead = cbl.newHeadToAdd[len(cbl.newHeadToAdd)-1] + } + } + if dispatchHead == nil && len(cbl.blocksSinceCheckpoint) > 0 { + dispatchHead = cbl.blocksSinceCheckpoint[len(cbl.blocksSinceCheckpoint)-1] + } + switch { + case dispatchHead != nil && block.BlockNumber == dispatchHead.BlockNumber+1 && block.ParentHash == dispatchHead.BlockHash: + // Ok - we just need to pop it onto the list, and wake the thread + log.L(cbl.ctx).Debugf("Directly passing block %d/%s to dispatcher after block %d/%s", block.BlockNumber, block.BlockHash, dispatchHead.BlockNumber, dispatchHead.BlockHash) + cbl.newHeadToAdd = append(cbl.newHeadToAdd, block) + case dispatchHead == nil: + // The dispatcher will check these against the checkpoint block before pulling them to the blocksSinceCheckpoint list + log.L(cbl.ctx).Debugf("Directly passing block %d/%s to dispatcher as no blocks pending", block.BlockNumber, block.BlockHash) + cbl.newHeadToAdd = append(cbl.newHeadToAdd, block) + default: + // Otherwise see if it's a conflicting fork to any of our existing blocks + for idx, existingBlock := range cbl.blocksSinceCheckpoint { + if existingBlock.BlockNumber == block.BlockNumber { + // Must discard up to this point + cbl.blocksSinceCheckpoint = cbl.blocksSinceCheckpoint[0:idx] + cbl.newHeadToAdd = nil + // This block fits, slot it into this point in the chain + if idx == 0 || block.ParentHash == cbl.blocksSinceCheckpoint[idx-1].BlockHash { + log.L(cbl.ctx).Debugf("Notification of re-org %d/%s replacing block %d/%s", block.BlockNumber, block.BlockHash, existingBlock.BlockNumber, existingBlock.BlockHash) + cbl.blocksSinceCheckpoint = append(cbl.blocksSinceCheckpoint[0:idx], block) + } else { + log.L(cbl.ctx).Debugf("Notification of block %d/%s conflicting with previous block %d/%s", block.BlockNumber, block.BlockHash, existingBlock.BlockNumber, existingBlock.BlockHash) + } + break } - break } } @@ -221,10 +250,9 @@ func (cbl *confirmedBlockListener) dispatcher() { for { if !cbl.waitingForFromBlock { - // tight spin getting blocks until we it looks like we need to wait for a notification - for cbl.getNextBlock() { - // In all cases we ensure that we move our confirmation window forwards. - // The checkpoint block is always final, and we never move backwards + // spin getting blocks until we it looks like we need to wait for a notification + lastFromNotification := false + for cbl.readNextBlock(&lastFromNotification) { cbl.dispatchAllConfirmed() } } @@ -239,44 +267,78 @@ func (cbl *confirmedBlockListener) dispatcher() { } } -func (cbl *confirmedBlockListener) getNextBlock() (more bool) { +// MUST be called under lock +func (cbl *confirmedBlockListener) popDispatchedIfAvailable(lastFromNotification *bool) (blockNumberToFetch uint64, found bool) { + + if len(cbl.newHeadToAdd) > 0 { + // If we find one in the lock, it must be ready for us to append + nextBlock := cbl.newHeadToAdd[0] + cbl.newHeadToAdd = append([]*apitypes.BlockInfo{}, cbl.newHeadToAdd[1:]...) + cbl.blocksSinceCheckpoint = append(cbl.blocksSinceCheckpoint, nextBlock) + + // We track that we've done this, so we know if we run out going round the loop later, + // there's no point in doing a get-by-number + *lastFromNotification = true + return 0, true + } + + blockNumberToFetch = cbl.fromBlock + if cbl.rollingCheckpoint != nil && cbl.rollingCheckpoint.Block >= cbl.fromBlock { + blockNumberToFetch = cbl.rollingCheckpoint.Block + 1 + } + if len(cbl.blocksSinceCheckpoint) > 0 { + blockNumberToFetch = cbl.blocksSinceCheckpoint[len(cbl.blocksSinceCheckpoint)-1].BlockNumber.Uint64() + 1 + } + return blockNumberToFetch, false +} + +func (cbl *confirmedBlockListener) readNextBlock(lastFromNotification *bool) (found bool) { var nextBlock *apitypes.BlockInfo + var blockNumberToFetch uint64 + var dispatchedPopped bool err := cbl.retry.Do(cbl.ctx, "next block", func(_ int) (retry bool, err error) { - // Find the highest block in the lock + // If the notifier has lined up a block for us grab it before cbl.stateLock.Lock() - blockNumberToFetch := cbl.fromBlock - if cbl.rollingCheckpoint != nil && cbl.rollingCheckpoint.Block >= cbl.fromBlock { - blockNumberToFetch = cbl.rollingCheckpoint.Block + 1 - } - if len(cbl.blocksSinceCheckpoint) > 0 { - blockNumberToFetch = cbl.blocksSinceCheckpoint[len(cbl.blocksSinceCheckpoint)-1].BlockNumber.Uint64() + 1 - } + blockNumberToFetch, dispatchedPopped = cbl.popDispatchedIfAvailable(lastFromNotification) cbl.stateLock.Unlock() + if dispatchedPopped || *lastFromNotification { + // We processed a dispatch this time, or last time. + // Either way we're tracking at the head and there's no point doing a query + // we expect to return nothing - as we should get another notification. + return false, nil + } // Get the next block nextBlock, err = cbl.bcm.getBlockByNumber(blockNumberToFetch, false, "") return true, err }) if nextBlock == nil || err != nil { - // We didn't get the next block, and maybe our context completed - return false + // We either got a block dispatched, or did not find a block ourselves. + return dispatchedPopped } // In the lock append it to our list, checking it's valid to append to what we have cbl.stateLock.Lock() defer cbl.stateLock.Unlock() - if len(cbl.blocksSinceCheckpoint) > 0 { - if cbl.blocksSinceCheckpoint[len(cbl.blocksSinceCheckpoint)-1].BlockHash != nextBlock.ParentHash { - // This doesn't attach to the end of our list. Trim it off and try again. - cbl.blocksSinceCheckpoint = cbl.blocksSinceCheckpoint[0 : len(cbl.blocksSinceCheckpoint)-1] - return true + // We have to check because we unlocked, that we weren't beaten to the punch while we queried + // by the dispatcher. + if _, dispatchedPopped = cbl.popDispatchedIfAvailable(lastFromNotification); !dispatchedPopped { + + // It's possible that while we were off at the node querying this, a notification came in + // that affected our state. We need to check this still matches, or go round again + if len(cbl.blocksSinceCheckpoint) > 0 { + if cbl.blocksSinceCheckpoint[len(cbl.blocksSinceCheckpoint)-1].BlockHash != nextBlock.ParentHash { + // This doesn't attach to the end of our list. Trim it off and try again. + cbl.blocksSinceCheckpoint = cbl.blocksSinceCheckpoint[0 : len(cbl.blocksSinceCheckpoint)-1] + return true + } } - } - // We successfully attached it - cbl.blocksSinceCheckpoint = append(cbl.blocksSinceCheckpoint, nextBlock) + // We successfully attached it + cbl.blocksSinceCheckpoint = append(cbl.blocksSinceCheckpoint, nextBlock) + } return true } @@ -309,6 +371,7 @@ func (cbl *confirmedBlockListener) dispatchAllConfirmed() { if toDispatch == nil { return } + log.L(cbl.ctx).Infof("Dispatching block %d/%s", toDispatch.BlockEvent.BlockNumber.Uint64(), toDispatch.BlockEvent.BlockHash) select { case cbl.eventStream <- toDispatch: case <-cbl.ctx.Done(): diff --git a/internal/confirmations/confirmed_block_listener_test.go b/internal/confirmations/confirmed_block_listener_test.go index 5991daed..14537b3a 100644 --- a/internal/confirmations/confirmed_block_listener_test.go +++ b/internal/confirmations/confirmed_block_listener_test.go @@ -98,20 +98,21 @@ func TestCBLListenFromCurrentBlock(t *testing.T) { blocks := testBlockArray(15) - mbiNum := mca.On("BlockInfoByNumber", mock.Anything, mock.Anything) - mbiNum.Run(func(args mock.Arguments) { mockBlockNumberReturn(mbiNum, args, blocks) }) - mbiHash := mca.On("BlockInfoByHash", mock.Anything, mock.Anything) mbiHash.Run(func(args mock.Arguments) { mockBlockHashReturn(mbiHash, args, blocks) }) + mca.On("BlockInfoByNumber", mock.Anything, mock.Anything).Return(nil, ffcapi.ErrorReasonNotFound, fmt.Errorf("not found")).Maybe() + bcm.requiredConfirmations = 5 cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, "", nil, esDispatch) assert.NoError(t, err) // Notify starting at block 5 - bcm.propagateBlockHashToCBLs(&ffcapi.BlockHashEvent{ - BlockHashes: []string{blocks[5].BlockHash}, - }) + for i := 5; i < len(blocks); i++ { + bcm.propagateBlockHashToCBLs(&ffcapi.BlockHashEvent{ + BlockHashes: []string{blocks[i].BlockHash}, + }) + } // Randomly notify below that too, which will be ignored bcm.propagateBlockHashToCBLs(&ffcapi.BlockHashEvent{ @@ -120,6 +121,7 @@ func TestCBLListenFromCurrentBlock(t *testing.T) { for i := 5; i < len(blocks)-bcm.requiredConfirmations; i++ { b := <-esDispatch + assert.Equal(t, b.BlockEvent.BlockNumber, blocks[i].BlockNumber) assert.Equal(t, b.BlockEvent.BlockInfo, blocks[i].BlockInfo) } @@ -264,8 +266,8 @@ func testCBLHandleReorgInConfirmationWindow(t *testing.T, blockLenBeforeReorg, o time.Sleep(1 * time.Millisecond) assert.LessOrEqual(t, len(cbl.blocksSinceCheckpoint), bcm.requiredConfirmations) select { - case <-esDispatch: - assert.Fail(t, "should not have received block in confirmation window") + case b := <-esDispatch: + assert.Fail(t, fmt.Sprintf("should not have received block in confirmation window: %d/%s", b.BlockEvent.BlockNumber.Int64(), b.BlockEvent.BlockHash)) default: // good - we should have the confirmations sat there, but no dispatch } @@ -324,6 +326,56 @@ func TestCBLHandleRandomConflictingBlockNotification(t *testing.T) { mca.AssertExpectations(t) } +func TestCBLDispatcherFallsBehindHead(t *testing.T) { + bcm, mca := newTestBlockConfirmationManager() + + esDispatch := make(chan *ffcapi.ListenerEvent) + + id := fftypes.NewUUID() + + blocks := testBlockArray(30) + + mbiHash := mca.On("BlockInfoByHash", mock.Anything, mock.Anything) + mbiHash.Run(func(args mock.Arguments) { mockBlockHashReturn(mbiHash, args, blocks) }) + + // We'll fall back to this because we don't keep up + mbiNum := mca.On("BlockInfoByNumber", mock.Anything, mock.Anything) + mbiNum.Run(func(args mock.Arguments) { mockBlockNumberReturn(mbiNum, args, blocks) }) + + bcm.requiredConfirmations = 5 + cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, "", nil, esDispatch) + assert.NoError(t, err) + + // Notify all the blocks before we process any + for i := 0; i < len(blocks); i++ { + bcm.propagateBlockHashToCBLs(&ffcapi.BlockHashEvent{ + BlockHashes: []string{blocks[i].BlockHash}, + }) + } + + for i := 0; i < len(blocks)-bcm.requiredConfirmations; i++ { + // The dispatches should have been added, until it got too far ahead + // and then set to nil. + for cbl.newHeadToAdd != nil { + time.Sleep(1 * time.Millisecond) + } + b := <-esDispatch + assert.Equal(t, b.BlockEvent.BlockNumber, blocks[i].BlockNumber) + assert.Equal(t, b.BlockEvent.BlockInfo, blocks[i].BlockInfo) + } + + time.Sleep(1 * time.Millisecond) + assert.Len(t, cbl.blocksSinceCheckpoint, bcm.requiredConfirmations) + select { + case <-esDispatch: + assert.Fail(t, "should not have received block in confirmation window") + default: // good - we should have the confirmations sat there, but no dispatch + } + + bcm.Stop() + mca.AssertExpectations(t) +} + func TestCBLStartBadFromBlock(t *testing.T) { bcm, mca := newTestBlockConfirmationManager()