From 3e9573b36c86f6418686f596e0c70cb5dc71db6a Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 5 May 2023 14:05:01 +0200 Subject: [PATCH 1/8] rescan+refactor: use a rescanState struct for rescan variables Introduce a new `rescanState` struct which holds the variables required for a rescan. We then also conver the existing `rescan` function into a method of `rescanState`. This refactor will assist in making future clean up of the `rescan` method easier. --- rescan.go | 178 +++++++++++++++++++++++++++++++++--------------------- 1 file changed, 109 insertions(+), 69 deletions(-) diff --git a/rescan.go b/rescan.go index 9d7d9831..6a47e8a5 100644 --- a/rescan.go +++ b/rescan.go @@ -289,9 +289,29 @@ func updateChan(update <-chan *updateOptions) RescanOption { } } -// rescan is a single-threaded function that uses headers from the database and -// functional options as arguments. -func rescan(chain ChainSource, options ...RescanOption) error { +// rescanState hold the state used throughout a rescan. +type rescanState struct { + // chain is the backend chain that the rescan has access to. + chain ChainSource + + // opts holds the various rescan configuration options. + opts *rescanOptions + + // curHeader is the block header of our current position in the chain. + curHeader wire.BlockHeader + + // curStamp is the block stamp of our current position in the chain. + curStamp headerfs.BlockStamp + + // scanning is true if the current block should be scanned for filter + // matches. + scanning bool +} + +// newRescanState constructs a new rescanState. +func newRescanState(chain ChainSource, options ...RescanOption) (*rescanState, + error) { + // First, we'll apply the set of default options, then serially apply // all the options that've been passed in. ro := defaultRescanOptions() @@ -303,12 +323,17 @@ func rescan(chain ChainSource, options ...RescanOption) error { option(ro) } + rs := &rescanState{ + chain: chain, + opts: ro, + } + // If we have something to watch, create a watch list. The watch list // can be composed of a set of scripts, outpoints, and txids. for _, addr := range ro.watchAddrs { script, err := txscript.PayToAddrScript(addr) if err != nil { - return err + return nil, err } ro.watchList = append(ro.watchList, script) @@ -354,57 +379,61 @@ func rescan(chain ChainSource, options ...RescanOption) error { // If we don't have a quit channel, and the end height is still // unspecified, then we'll exit out here. if ro.quit == nil && ro.endBlock.Height == 0 { - return fmt.Errorf("rescan request must specify a quit channel" + - " or valid end block") + return nil, fmt.Errorf("rescan request must specify a quit " + + "channel or valid end block") } - // Track our position in the chain. - var ( - curHeader wire.BlockHeader - curStamp headerfs.BlockStamp - ) - // If no start block is specified, start the scan from our current best // block. if ro.startBlock == nil { bs, err := chain.BestBlock() if err != nil { - return err + return nil, err } ro.startBlock = bs } - curStamp = *ro.startBlock + rs.curStamp = *ro.startBlock // To find our starting block, either the start hash should be set, or // the start height should be set. If neither is, then we'll be // starting from the genesis block. - if (curStamp.Hash != chainhash.Hash{}) { - header, height, err := chain.GetBlockHeader(&curStamp.Hash) + if (rs.curStamp.Hash != chainhash.Hash{}) { + header, height, err := chain.GetBlockHeader(&rs.curStamp.Hash) if err == nil { - curHeader = *header - curStamp.Height = int32(height) + rs.curHeader = *header + rs.curStamp.Height = int32(height) } else { - curStamp.Hash = chainhash.Hash{} + rs.curStamp.Hash = chainhash.Hash{} } } - if (curStamp.Hash == chainhash.Hash{}) { - if curStamp.Height == 0 { - curStamp.Hash = *chain.ChainParams().GenesisHash + if (rs.curStamp.Hash == chainhash.Hash{}) { + if rs.curStamp.Height == 0 { + rs.curStamp.Hash = *chain.ChainParams().GenesisHash } else { header, err := chain.GetBlockHeaderByHeight( - uint32(curStamp.Height), + uint32(rs.curStamp.Height), ) if err == nil { - curHeader = *header - curStamp.Hash = curHeader.BlockHash() + rs.curHeader = *header + rs.curStamp.Hash = rs.curHeader.BlockHash() } else { - curHeader = chain.ChainParams().GenesisBlock.Header - curStamp.Hash = *chain.ChainParams().GenesisHash - curStamp.Height = 0 + chainParams := chain.ChainParams() + rs.curHeader = chainParams.GenesisBlock.Header + rs.curStamp.Hash = *chainParams.GenesisHash + rs.curStamp.Height = 0 } } } + return rs, nil +} + +// rescan is a single-threaded function that uses headers from the database and +// functional options as arguments. +func (rs *rescanState) rescan() error { + chain := rs.chain + ro := rs.opts + // Now that we've determined the starting point of our rescan, we can // begin processing updates from the client. var updates []*updateOptions @@ -419,9 +448,9 @@ func rescan(chain ChainSource, options ...RescanOption) error { // If it hasn't, we'll subscribe for block notifications at tip and wait // until we receive a notification for a block with the rescan's // starting height. - if bestBlock.Height < curStamp.Height { + if bestBlock.Height < rs.curStamp.Height { log.Debugf("Waiting to catch up to the rescan start height=%d "+ - "from height=%d", curStamp.Height, bestBlock.Height) + "from height=%d", rs.curStamp.Height, bestBlock.Height) blockSubscription, err := chain.Subscribe( uint32(bestBlock.Height), @@ -452,7 +481,7 @@ func rescan(chain ChainSource, options ...RescanOption) error { if _, ok := ntfn.(*blockntfns.Connected); !ok { continue } - if ntfn.Height() < uint32(curStamp.Height) { + if ntfn.Height() < uint32(rs.curStamp.Height) { continue } @@ -470,7 +499,7 @@ func rescan(chain ChainSource, options ...RescanOption) error { // start height of the rescan, apply them now. for _, upd := range updates { _, err := ro.updateFilter( - chain, upd, &curStamp, &curHeader, + chain, upd, &rs.curStamp, &rs.curHeader, ) if err != nil { return err @@ -478,8 +507,8 @@ func rescan(chain ChainSource, options ...RescanOption) error { } } - log.Debugf("Starting rescan from known block %d (%s)", curStamp.Height, - curStamp.Hash) + log.Debugf("Starting rescan from known block %d (%s)", + rs.curStamp.Height, rs.curStamp.Hash) // Compare the start time to the start block. If the start time is // later, cycle through blocks until we find a block timestamp later @@ -487,7 +516,7 @@ func rescan(chain ChainSource, options ...RescanOption) error { // time is non-monotonic between blocks, we look for the first block to // trip the switch, and download filters from there, rather than // checking timestamps at each block. - scanning := ro.startTime.Before(curHeader.Timestamp) + rs.scanning = ro.startTime.Before(rs.curHeader.Timestamp) // Even though we'll have multiple subscriptions, they'll always be // referred to by the same variable, so we only need to defer its @@ -521,16 +550,16 @@ func rescan(chain ChainSource, options ...RescanOption) error { // mark ourselves as not current so we can walk down the chain // and notify the callers of blocks we may have missed. header := ntfn.Header() - if header.PrevBlock != curStamp.Hash { + if header.PrevBlock != rs.curStamp.Hash { return fmt.Errorf("out of order block %v: expected "+ "PrevBlock %v, got %v", header.BlockHash(), - curStamp.Hash, header.PrevBlock) + rs.curStamp.Hash, header.PrevBlock) } // Ensure the filter header still exists before attempting to // fetch the filter. This should usually succeed since // notifications are delivered once filter headers are synced. - nextBlockHeight := uint32(curStamp.Height + 1) + nextBlockHeight := uint32(rs.curStamp.Height + 1) _, err := chain.GetFilterHeaderByHeight(nextBlockHeight) if err != nil { return fmt.Errorf("unable to get filter header for "+ @@ -549,13 +578,13 @@ func rescan(chain ChainSource, options ...RescanOption) error { // We're only scanning if the header is beyond the horizon of // our start time. - if !scanning { - scanning = ro.startTime.Before(header.Timestamp) + if !rs.scanning { + rs.scanning = ro.startTime.Before(header.Timestamp) } // If we're not scanning or our watch list is empty, then we can // just notify the block without fetching any filters/blocks. - if !scanning || len(ro.watchList) == 0 { + if !rs.scanning || len(ro.watchList) == 0 { if ro.ntfn.OnFilteredBlockConnected != nil { ro.ntfn.OnFilteredBlockConnected( newStamp.Height, &header, nil, @@ -568,8 +597,8 @@ func rescan(chain ChainSource, options ...RescanOption) error { ) } - curHeader = header - curStamp = newStamp + rs.curHeader = header + rs.curStamp = newStamp return nil } @@ -586,7 +615,7 @@ func rescan(chain ChainSource, options ...RescanOption) error { // the peer(s) that we're trying to fetch from are in // the progress of a re-org. log.Errorf("unable to get filter for hash=%v, "+ - "retrying: %v", curStamp.Hash, err) + "retrying: %v", rs.curStamp.Hash, err) return errRetryBlock } @@ -599,8 +628,8 @@ func rescan(chain ChainSource, options ...RescanOption) error { // With the block successfully notified, we'll advance our state // to it. - curHeader = header - curStamp = newStamp + rs.curHeader = header + rs.curStamp = newStamp return nil } @@ -614,7 +643,7 @@ func rescan(chain ChainSource, options ...RescanOption) error { // Only deal with it if it's the current block we know about. // Otherwise, it's in the future. - if blockDisconnected.BlockHash() != curStamp.Hash { + if blockDisconnected.BlockHash() != rs.curStamp.Hash { return nil } @@ -622,19 +651,19 @@ func rescan(chain ChainSource, options ...RescanOption) error { // include deprecated calls as they're still used, for now. if ro.ntfn.OnFilteredBlockDisconnected != nil { ro.ntfn.OnFilteredBlockDisconnected( - curStamp.Height, &curHeader, + rs.curStamp.Height, &rs.curHeader, ) } if ro.ntfn.OnBlockDisconnected != nil { // nolint:staticcheck ro.ntfn.OnBlockDisconnected( // nolint:staticcheck - &curStamp.Hash, curStamp.Height, - curHeader.Timestamp, + &rs.curStamp.Hash, rs.curStamp.Height, + rs.curHeader.Timestamp, ) } - curHeader = ntfn.ChainTip() - curStamp.Hash = curHeader.BlockHash() - curStamp.Height-- + rs.curHeader = ntfn.ChainTip() + rs.curStamp.Hash = rs.curHeader.BlockHash() + rs.curStamp.Height-- return nil } @@ -651,9 +680,9 @@ rescanLoop: for { // If we've reached the ending height or hash for this rescan, // then we'll exit. - if curStamp.Hash == ro.endBlock.Hash || + if rs.curStamp.Hash == ro.endBlock.Hash || (ro.endBlock.Height > 0 && - curStamp.Height == ro.endBlock.Height) { + rs.curStamp.Height == ro.endBlock.Height) { return nil } @@ -677,7 +706,8 @@ rescanLoop: // requested client. case update := <-ro.update: rewound, err := ro.updateFilter( - chain, update, &curStamp, &curHeader, + chain, update, &rs.curStamp, + &rs.curHeader, ) if err != nil { return err @@ -691,7 +721,8 @@ rescanLoop: if rewound { log.Tracef("Rewound to block %d (%s), "+ "no longer current", - curStamp.Height, curStamp.Hash) + rs.curStamp.Height, + rs.curStamp.Hash) current = false blockSubscription.Cancel() @@ -825,8 +856,8 @@ rescanLoop: select { case update := <-ro.update: _, err := ro.updateFilter( - chain, update, &curStamp, - &curHeader, + chain, update, &rs.curStamp, + &rs.curHeader, ) if err != nil { return err @@ -846,7 +877,7 @@ rescanLoop: // the block. If the next height is above the best // height known to the chain service, then we mark // ourselves as current and follow notifications. - nextHeight := curStamp.Height + 1 + nextHeight := rs.curStamp.Height + 1 if nextHeight > bestBlock.Height { // Ensure we cancel the old subscription if // we're going back to scan for missed blocks. @@ -856,7 +887,7 @@ rescanLoop: // Subscribe to block notifications. blockSubscription, err = chain.Subscribe( - uint32(curStamp.Height), + uint32(rs.curStamp.Height), ) if err != nil { return fmt.Errorf("unable to register "+ @@ -865,7 +896,7 @@ rescanLoop: log.Debugf("Rescan became current at %d (%s), "+ "subscribing to block notifications", - curStamp.Height, curStamp.Hash) + rs.curStamp.Height, rs.curStamp.Hash) current = true blockRetryQueue.clear() @@ -884,14 +915,20 @@ rescanLoop: return err } - curHeader = *header - curStamp.Height++ - curStamp.Hash = header.BlockHash() + rs.curHeader = *header + rs.curStamp.Height++ + rs.curStamp.Hash = header.BlockHash() - if !scanning { - scanning = ro.startTime.Before(curHeader.Timestamp) + if !rs.scanning { + rs.scanning = ro.startTime.Before( + rs.curHeader.Timestamp, + ) } - err = notifyBlock(chain, ro, curHeader, curStamp, scanning) + + err = notifyBlock( + chain, ro, rs.curHeader, rs.curStamp, + rs.scanning, + ) if err != nil { return err } @@ -1323,7 +1360,10 @@ func (r *Rescan) Start() <-chan error { defer r.wg.Done() rescanArgs := append(r.options, updateChan(r.updateChan)) // nolint - err := rescan(r.chain, rescanArgs...) + rs, err := newRescanState(r.chain, rescanArgs...) + if err == nil { + err = rs.rescan() + } close(r.running) From 818428fa37749c2c2223b451cc36b80b2e0fb6ea Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 5 May 2023 14:13:53 +0200 Subject: [PATCH 2/8] rescan+refactor: convert notifyBlock to rescanState method --- rescan.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/rescan.go b/rescan.go index 6a47e8a5..c4eb58bb 100644 --- a/rescan.go +++ b/rescan.go @@ -925,10 +925,7 @@ rescanLoop: ) } - err = notifyBlock( - chain, ro, rs.curHeader, rs.curStamp, - rs.scanning, - ) + err = rs.notifyBlock() if err != nil { return err } @@ -937,19 +934,19 @@ rescanLoop: } // notifyBlock calls appropriate listeners based on the block filter. -func notifyBlock(chain ChainSource, ro *rescanOptions, - curHeader wire.BlockHeader, curStamp headerfs.BlockStamp, - scanning bool) error { +func (rs *rescanState) notifyBlock() error { + chain := rs.chain + ro := rs.opts // Find relevant transactions based on watch list. If scanning is // false, we can safely assume this block has no relevant transactions. var relevantTxs []*btcutil.Tx - if len(ro.watchList) != 0 && scanning { + if len(ro.watchList) != 0 && rs.scanning { // If we have a non-empty watch list, then we need to see if it // matches the rescan's filters, so we get the basic filter // from the DB or network. matched, filter, err := blockFilterMatches( - chain, ro, &curStamp.Hash, + chain, ro, &rs.curStamp.Hash, ) if err != nil { return err @@ -957,7 +954,7 @@ func notifyBlock(chain ChainSource, ro *rescanOptions, if matched { relevantTxs, err = extractBlockMatches( - chain, ro, &curStamp, filter, + chain, ro, &rs.curStamp, filter, ) if err != nil { return err @@ -966,13 +963,16 @@ func notifyBlock(chain ChainSource, ro *rescanOptions, } if ro.ntfn.OnFilteredBlockConnected != nil { - ro.ntfn.OnFilteredBlockConnected(curStamp.Height, &curHeader, - relevantTxs) + ro.ntfn.OnFilteredBlockConnected( + rs.curStamp.Height, &rs.curHeader, relevantTxs, + ) } if ro.ntfn.OnBlockConnected != nil { // nolint:staticcheck - ro.ntfn.OnBlockConnected(&curStamp.Hash, // nolint:staticcheck - curStamp.Height, curHeader.Timestamp) + ro.ntfn.OnBlockConnected( // nolint:staticcheck + &rs.curStamp.Hash, rs.curStamp.Height, + rs.curHeader.Timestamp, + ) } return nil From 2e0df032f796d879fa59320f8181daf1211f3359 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 5 May 2023 14:25:10 +0200 Subject: [PATCH 3/8] rescan: convert notifyBlockWithFilter to rescanState method --- rescan.go | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/rescan.go b/rescan.go index c4eb58bb..382d3d9f 100644 --- a/rescan.go +++ b/rescan.go @@ -619,9 +619,7 @@ func (rs *rescanState) rescan() error { return errRetryBlock } - err = notifyBlockWithFilter( - chain, ro, &header, &newStamp, blockFilter, - ) + err = rs.notifyBlockWithFilter(&header, &newStamp, blockFilter) if err != nil { return err } @@ -1061,9 +1059,11 @@ func extractBlockMatches(chain ChainSource, ro *rescanOptions, // notifyBlockWithFilter calls appropriate listeners based on the block filter. // This differs from notifyBlock in that is expects the caller to already have // obtained the target filter. -func notifyBlockWithFilter(chain ChainSource, ro *rescanOptions, - curHeader *wire.BlockHeader, curStamp *headerfs.BlockStamp, - filter *gcs.Filter) error { +func (rs *rescanState) notifyBlockWithFilter(header *wire.BlockHeader, + stamp *headerfs.BlockStamp, filter *gcs.Filter) error { + + chain := rs.chain + ro := rs.opts // Based on what we find within the block or the filter, we'll be // sending out a set of notifications with transactions that are @@ -1074,14 +1074,14 @@ func notifyBlockWithFilter(chain ChainSource, ro *rescanOptions, // match the items within the filter to ensure we create any relevant // notifications. if filter != nil { - matched, err := matchBlockFilter(ro, filter, &curStamp.Hash) + matched, err := matchBlockFilter(ro, filter, &stamp.Hash) if err != nil { return err } if matched { relevantTxs, err = extractBlockMatches( - chain, ro, curStamp, filter, + chain, ro, stamp, filter, ) if err != nil { return err @@ -1090,13 +1090,14 @@ func notifyBlockWithFilter(chain ChainSource, ro *rescanOptions, } if ro.ntfn.OnFilteredBlockConnected != nil { - ro.ntfn.OnFilteredBlockConnected(curStamp.Height, curHeader, + ro.ntfn.OnFilteredBlockConnected(stamp.Height, header, relevantTxs) } if ro.ntfn.OnBlockConnected != nil { // nolint:staticcheck - ro.ntfn.OnBlockConnected(&curStamp.Hash, // nolint:staticcheck - curStamp.Height, curHeader.Timestamp) + ro.ntfn.OnBlockConnected( // nolint:staticcheck + &stamp.Hash, stamp.Height, header.Timestamp, + ) } return nil From 47f84373b82d77c50edbcf5c6ab540ecb2c0103a Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 5 May 2023 14:28:03 +0200 Subject: [PATCH 4/8] rescan: refactor handleBlockDisconnected to be rescanState method --- rescan.go | 73 ++++++++++++++++++++++++++----------------------------- 1 file changed, 34 insertions(+), 39 deletions(-) diff --git a/rescan.go b/rescan.go index 382d3d9f..1142ddd8 100644 --- a/rescan.go +++ b/rescan.go @@ -632,40 +632,6 @@ func (rs *rescanState) rescan() error { return nil } - // handleBlockDisconnected is a helper closure that handles a new block - // disconnected notification. - handleBlockDisconnected := func(ntfn *blockntfns.Disconnected) error { // nolint:unparam - blockDisconnected := ntfn.Header() - log.Debugf("Rescan got disconnected block %d (%s)", - ntfn.Height(), blockDisconnected.BlockHash()) - - // Only deal with it if it's the current block we know about. - // Otherwise, it's in the future. - if blockDisconnected.BlockHash() != rs.curStamp.Hash { - return nil - } - - // Run through notifications. This is all single-threaded. We - // include deprecated calls as they're still used, for now. - if ro.ntfn.OnFilteredBlockDisconnected != nil { - ro.ntfn.OnFilteredBlockDisconnected( - rs.curStamp.Height, &rs.curHeader, - ) - } - if ro.ntfn.OnBlockDisconnected != nil { // nolint:staticcheck - ro.ntfn.OnBlockDisconnected( // nolint:staticcheck - &rs.curStamp.Hash, rs.curStamp.Height, - rs.curHeader.Timestamp, - ) - } - - rs.curHeader = ntfn.ChainTip() - rs.curStamp.Hash = rs.curHeader.BlockHash() - rs.curStamp.Height-- - - return nil - } - // We'll need to keep track of whether we are current with the chain in // order to properly recover from a re-org. We'll start by assuming that // we are not current in order to catch up from the starting point to @@ -780,11 +746,7 @@ rescanLoop: // as they are now considered stale. blockRetryQueue.remove(ntfn.Header()) - err := handleBlockDisconnected(ntfn) - if err != nil { - log.Errorf("Unable to process "+ - "%v: %v", ntfn, err) - } + rs.handleBlockDisconnected(ntfn) default: log.Warnf("Received unhandled block "+ @@ -1056,6 +1018,39 @@ func extractBlockMatches(chain ChainSource, ro *rescanOptions, return relevantTxs, nil } +// handleBlockDisconnected handles a new block disconnected notification. +func (rs *rescanState) handleBlockDisconnected(ntfn *blockntfns.Disconnected) { + ro := rs.opts + + blockDisconnected := ntfn.Header() + log.Debugf("Rescan got disconnected block %d (%s)", ntfn.Height(), + blockDisconnected.BlockHash()) + + // Only deal with it if it's the current block we know about. Otherwise, + // it's in the future. + if blockDisconnected.BlockHash() != rs.curStamp.Hash { + return + } + + // Run through notifications. This is all single-threaded. We include + // deprecated calls as they're still used, for now. + if ro.ntfn.OnFilteredBlockDisconnected != nil { + ro.ntfn.OnFilteredBlockDisconnected( + rs.curStamp.Height, &rs.curHeader, + ) + } + if ro.ntfn.OnBlockDisconnected != nil { // nolint:staticcheck + ro.ntfn.OnBlockDisconnected( // nolint:staticcheck + &rs.curStamp.Hash, rs.curStamp.Height, + rs.curHeader.Timestamp, + ) + } + + rs.curHeader = ntfn.ChainTip() + rs.curStamp.Hash = rs.curHeader.BlockHash() + rs.curStamp.Height-- +} + // notifyBlockWithFilter calls appropriate listeners based on the block filter. // This differs from notifyBlock in that is expects the caller to already have // obtained the target filter. From 97b74826c0643dd32edce08067f892d93f375de5 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 5 May 2023 14:31:05 +0200 Subject: [PATCH 5/8] rescan: refactor handleBlockConnected to be rescanState method --- rescan.go | 186 +++++++++++++++++++++++++++--------------------------- 1 file changed, 92 insertions(+), 94 deletions(-) diff --git a/rescan.go b/rescan.go index 1142ddd8..66aed969 100644 --- a/rescan.go +++ b/rescan.go @@ -540,98 +540,6 @@ func (rs *rescanState) rescan() error { blockRetryQueue = newBlockRetryQueue() ) - // handleBlockConnected is a closure that handles a new block connected - // notification. - // - // TODO(wilmer): refactor this and handleBlockDisconnected into their - // own methods. - handleBlockConnected := func(ntfn *blockntfns.Connected) error { - // If we've somehow missed a header in the range, then we'll - // mark ourselves as not current so we can walk down the chain - // and notify the callers of blocks we may have missed. - header := ntfn.Header() - if header.PrevBlock != rs.curStamp.Hash { - return fmt.Errorf("out of order block %v: expected "+ - "PrevBlock %v, got %v", header.BlockHash(), - rs.curStamp.Hash, header.PrevBlock) - } - - // Ensure the filter header still exists before attempting to - // fetch the filter. This should usually succeed since - // notifications are delivered once filter headers are synced. - nextBlockHeight := uint32(rs.curStamp.Height + 1) - _, err := chain.GetFilterHeaderByHeight(nextBlockHeight) - if err != nil { - return fmt.Errorf("unable to get filter header for "+ - "new block with height %v: %v", nextBlockHeight, - err) - } - - newStamp := headerfs.BlockStamp{ - Hash: header.BlockHash(), - Height: int32(nextBlockHeight), - Timestamp: header.Timestamp, - } - - log.Tracef("Rescan got block %d (%s)", newStamp.Height, - newStamp.Hash) - - // We're only scanning if the header is beyond the horizon of - // our start time. - if !rs.scanning { - rs.scanning = ro.startTime.Before(header.Timestamp) - } - - // If we're not scanning or our watch list is empty, then we can - // just notify the block without fetching any filters/blocks. - if !rs.scanning || len(ro.watchList) == 0 { - if ro.ntfn.OnFilteredBlockConnected != nil { - ro.ntfn.OnFilteredBlockConnected( - newStamp.Height, &header, nil, - ) - } - if ro.ntfn.OnBlockConnected != nil { // nolint:staticcheck - ro.ntfn.OnBlockConnected( // nolint:staticcheck - &newStamp.Hash, newStamp.Height, - header.Timestamp, - ) - } - - rs.curHeader = header - rs.curStamp = newStamp - - return nil - } - - // Otherwise, we'll attempt to fetch the filter to retrieve the - // relevant transactions and notify them. - queryOptions := NumRetries(0) - blockFilter, err := chain.GetCFilter( - newStamp.Hash, wire.GCSFilterRegular, queryOptions, - ) - if err != nil { - // If the query failed, then this either means that we - // don't have any peers to fetch this filter from, or - // the peer(s) that we're trying to fetch from are in - // the progress of a re-org. - log.Errorf("unable to get filter for hash=%v, "+ - "retrying: %v", rs.curStamp.Hash, err) - return errRetryBlock - } - - err = rs.notifyBlockWithFilter(&header, &newStamp, blockFilter) - if err != nil { - return err - } - - // With the block successfully notified, we'll advance our state - // to it. - rs.curHeader = header - rs.curStamp = newStamp - - return nil - } - // We'll need to keep track of whether we are current with the chain in // order to properly recover from a re-org. We'll start by assuming that // we are not current in order to catch up from the starting point to @@ -710,7 +618,7 @@ rescanLoop: continue rescanLoop } - err := handleBlockConnected(ntfn) + err := rs.handleBlockConnected(ntfn) switch err { case nil: @@ -767,7 +675,9 @@ rescanLoop: continue rescanLoop } - err := handleBlockConnected(retryBlock) + err := rs.handleBlockConnected( + retryBlock, + ) switch err { // We successfully notified the block // this time, so we can remove it from @@ -938,6 +848,94 @@ func (rs *rescanState) notifyBlock() error { return nil } +// handleBlockConnected handles a new block connected notification. +func (rs *rescanState) handleBlockConnected(ntfn *blockntfns.Connected) error { + chain := rs.chain + ro := rs.opts + + // If we've somehow missed a header in the range, then we'll mark + // ourselves as not current so we can walk down the chain and notify the + // callers of blocks we may have missed. + header := ntfn.Header() + if header.PrevBlock != rs.curStamp.Hash { + return fmt.Errorf("out of order block %v: expected PrevBlock "+ + "%v, got %v", header.BlockHash(), rs.curStamp.Hash, + header.PrevBlock) + } + + // Ensure the filter header still exists before attempting to fetch the + // filter. This should usually succeed since notifications are delivered + // once filter headers are synced. + nextBlockHeight := uint32(rs.curStamp.Height + 1) + _, err := chain.GetFilterHeaderByHeight(nextBlockHeight) + if err != nil { + return fmt.Errorf("unable to get filter header for new block "+ + "with height %v: %v", nextBlockHeight, err) + } + + newStamp := headerfs.BlockStamp{ + Hash: header.BlockHash(), + Height: int32(nextBlockHeight), + Timestamp: header.Timestamp, + } + + log.Tracef("Rescan got block %d (%s)", newStamp.Height, newStamp.Hash) + + // We're only scanning if the header is beyond the horizon of + // our start time. + if !rs.scanning { + rs.scanning = ro.startTime.Before(header.Timestamp) + } + + // If we're not scanning or our watch list is empty, then we can just + // notify the block without fetching any filters/blocks. + if !rs.scanning || len(ro.watchList) == 0 { + if ro.ntfn.OnFilteredBlockConnected != nil { + ro.ntfn.OnFilteredBlockConnected( + newStamp.Height, &header, nil, + ) + } + if ro.ntfn.OnBlockConnected != nil { // nolint:staticcheck + ro.ntfn.OnBlockConnected( // nolint:staticcheck + &newStamp.Hash, newStamp.Height, + header.Timestamp, + ) + } + + rs.curHeader = header + rs.curStamp = newStamp + + return nil + } + + // Otherwise, we'll attempt to fetch the filter to retrieve the relevant + // transactions and notify them. + queryOptions := NumRetries(0) + blockFilter, err := chain.GetCFilter( + newStamp.Hash, wire.GCSFilterRegular, queryOptions, + ) + if err != nil { + // If the query failed, then this either means that we don't + // have any peers to fetch this filter from, or the peer(s) that + // we're trying to fetch from are in the progress of a re-org. + log.Errorf("unable to get filter for hash=%v, retrying: %v", + rs.curStamp.Hash, err) + + return errRetryBlock + } + + err = rs.notifyBlockWithFilter(&header, &newStamp, blockFilter) + if err != nil { + return err + } + + // With the block successfully notified, we'll advance our state to it. + rs.curHeader = header + rs.curStamp = newStamp + + return nil +} + // extractBlockMatches fetches the target block from the network, and filters // out any relevant transactions found within the block. func extractBlockMatches(chain ChainSource, ro *rescanOptions, From e1a71d43c0c69880ec5475f7dbfa2a8512912212 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 5 May 2023 14:47:53 +0200 Subject: [PATCH 6/8] rescan: add a waitForBlocks helper method to rescanState Add a new `waitForBlocks` helper method to rescanState that will wait on block notifications until a given predicate returns true. Currently this method is only called with one predicate but an upcoming commit will make use of it using a differnet predicate. --- rescan.go | 148 +++++++++++++++++++++++++++++------------------------- 1 file changed, 80 insertions(+), 68 deletions(-) diff --git a/rescan.go b/rescan.go index 66aed969..185c5f96 100644 --- a/rescan.go +++ b/rescan.go @@ -434,79 +434,14 @@ func (rs *rescanState) rescan() error { chain := rs.chain ro := rs.opts - // Now that we've determined the starting point of our rescan, we can - // begin processing updates from the client. - var updates []*updateOptions - // We'll need to ensure that the backing chain has actually caught up to // the rescan's starting height. - bestBlock, err := chain.BestBlock() - if err != nil { + if err := rs.waitForBlocks(func(_ chainhash.Hash, height uint32) bool { + return height >= uint32(rs.curStamp.Height) + }); err != nil { return err } - // If it hasn't, we'll subscribe for block notifications at tip and wait - // until we receive a notification for a block with the rescan's - // starting height. - if bestBlock.Height < rs.curStamp.Height { - log.Debugf("Waiting to catch up to the rescan start height=%d "+ - "from height=%d", rs.curStamp.Height, bestBlock.Height) - - blockSubscription, err := chain.Subscribe( - uint32(bestBlock.Height), - ) - if err != nil { - return err - } - - waitUntilSynced: - for { - select { - // We'll make sure to process any updates while we're - // syncing to prevent blocking the client. - case update := <-ro.update: - updates = append(updates, update) - - // A new block notification for the tip of the chain has - // arrived. We'll determine we've caught up to the - // rescan's starting height by receiving a block - // connected notification for the same height. - case ntfn, ok := <-blockSubscription.Notifications: - if !ok { - return errors.New("rescan block " + - "subscription was canceled " + - "while waiting to catch up") - } - - if _, ok := ntfn.(*blockntfns.Connected); !ok { - continue - } - if ntfn.Height() < uint32(rs.curStamp.Height) { - continue - } - - break waitUntilSynced - - case <-ro.quit: - blockSubscription.Cancel() - return ErrRescanExit - } - } - - blockSubscription.Cancel() - - // If any updates were queued while waiting to catch up to the - // start height of the rescan, apply them now. - for _, upd := range updates { - _, err := ro.updateFilter( - chain, upd, &rs.curStamp, &rs.curHeader, - ) - if err != nil { - return err - } - } - } - log.Debugf("Starting rescan from known block %d (%s)", rs.curStamp.Height, rs.curStamp.Hash) @@ -803,6 +738,83 @@ rescanLoop: } } +// waitForBlocks is a helper function that can be used to wait on block +// notifications until the given predicate returns true. +func (rs *rescanState) waitForBlocks(predicate func(hash chainhash.Hash, + height uint32) bool) error { + + chain := rs.chain + ro := rs.opts + + bestBlock, err := chain.BestBlock() + if err != nil { + return err + } + + // Before subscribing to block notifications, first check if the + // predicate is not already satisfied by the current best block. + if predicate(bestBlock.Hash, uint32(bestBlock.Height)) { + return nil + } + + log.Debugf("Waiting to catch up to the rescan start height=%d "+ + "from height=%d", rs.curStamp.Height, bestBlock.Height) + + blockSubscription, err := chain.Subscribe(uint32(bestBlock.Height)) + if err != nil { + return err + } + defer blockSubscription.Cancel() + + var updates []*updateOptions + +waitUntilSynced: + for { + select { + // We'll make sure to process any updates while we're syncing to + // prevent blocking the client. + case update := <-ro.update: + updates = append(updates, update) + + // A new block notification for the tip of the chain has + // arrived. We'll determine we've caught up to the rescan's + // starting height by receiving a block connected notification + // for the same height. + case ntfn, ok := <-blockSubscription.Notifications: + if !ok { + return errors.New("rescan block subscription " + + "was canceled while waiting to catch " + + "up") + } + cNtfn, ok := ntfn.(*blockntfns.Connected) + if !ok { + continue + } + + header := cNtfn.Header() + if predicate(header.BlockHash(), cNtfn.Height()) { + break waitUntilSynced + } + + case <-ro.quit: + return ErrRescanExit + } + + // If any updates were queued while waiting to catch up to the + // start height of the rescan, apply them now. + for _, upd := range updates { + _, err := ro.updateFilter( + chain, upd, &rs.curStamp, &rs.curHeader, + ) + if err != nil { + return err + } + } + } + + return nil +} + // notifyBlock calls appropriate listeners based on the block filter. func (rs *rescanState) notifyBlock() error { chain := rs.chain From a5e64528146757ea2190a95f6e8a369134d83f92 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Wed, 31 May 2023 16:53:03 +0200 Subject: [PATCH 7/8] rescan: add IsCurrent to the ChainSource interface This is required for the follow up commit where access to the IsCurrent method is required in the rescanState.rescan method which only has access to the ChainSource interface. --- rescan.go | 4 ++++ rescan_test.go | 6 ++++++ 2 files changed, 10 insertions(+) diff --git a/rescan.go b/rescan.go index 185c5f96..ebf32f07 100644 --- a/rescan.go +++ b/rescan.go @@ -74,6 +74,10 @@ type ChainSource interface { // // TODO(wilmer): extend with best hash as well. Subscribe(bestHeight uint32) (*blockntfns.Subscription, error) + + // IsCurrent returns true if the backend chain thinks that its view of + // the network is current. + IsCurrent() bool } // ScanProgressHandler is used in rescanOptions to update the caller with the diff --git a/rescan_test.go b/rescan_test.go index e611eb90..8720c33c 100644 --- a/rescan_test.go +++ b/rescan_test.go @@ -272,6 +272,12 @@ func (c *mockChainSource) setFailGetFilter(b bool) { c.failGetFilter = b } +// IsCurrent returns true if the backend chain thinks that its view of +// the network is current. +func (c *mockChainSource) IsCurrent() bool { + return true +} + // GetCFilter returns the filter of the given type for the block with the given // hash. func (c *mockChainSource) GetCFilter(hash chainhash.Hash, From b38fa7f50176dca35b7fabbb475eceae7d6283f6 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 5 May 2023 14:57:07 +0200 Subject: [PATCH 8/8] rescan: wait till current or end height reached Let the rescan function wait until the filter headers have either caught up to the back end chain or until they have caught up to the specified rescan end block. This lets the rescan operation take advantage of doing batch filter fetching during rescan making the operation a lot faster since filters can be fetched in batches of 1000 instead of one at a time. --- rescan.go | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/rescan.go b/rescan.go index ebf32f07..d04a4a80 100644 --- a/rescan.go +++ b/rescan.go @@ -446,6 +446,36 @@ func (rs *rescanState) rescan() error { return err } + // To ensure that we batch as many filter queries as possible, we also + // wait for the header chain to either be current or for it to at least + // have caught up with the specified end block. + log.Debugf("Waiting for the chain source to be current or for the " + + "rescan end height to be reached.") + + if err := rs.waitForBlocks(func(hash chainhash.Hash, + height uint32) bool { + + // If the header chain is current, then there is no need to + // wait. + if chain.IsCurrent() { + return true + } + + // If an end height was specified then we wait until the + // notification corresponding to that block height. + if ro.endBlock.Height > 0 && + height >= uint32(ro.endBlock.Height) { + + return true + } + + // If a block hash was specified, check if the notification is + // for that block. + return hash == ro.endBlock.Hash + }); err != nil { + return err + } + log.Debugf("Starting rescan from known block %d (%s)", rs.curStamp.Height, rs.curStamp.Hash)