diff --git a/rescan.go b/rescan.go index 9d7d9831..d04a4a80 100644 --- a/rescan.go +++ b/rescan.go @@ -74,6 +74,10 @@ type ChainSource interface { // // TODO(wilmer): extend with best hash as well. Subscribe(bestHeight uint32) (*blockntfns.Subscription, error) + + // IsCurrent returns true if the backend chain thinks that its view of + // the network is current. + IsCurrent() bool } // ScanProgressHandler is used in rescanOptions to update the caller with the @@ -289,9 +293,29 @@ func updateChan(update <-chan *updateOptions) RescanOption { } } -// rescan is a single-threaded function that uses headers from the database and -// functional options as arguments. -func rescan(chain ChainSource, options ...RescanOption) error { +// rescanState hold the state used throughout a rescan. +type rescanState struct { + // chain is the backend chain that the rescan has access to. + chain ChainSource + + // opts holds the various rescan configuration options. + opts *rescanOptions + + // curHeader is the block header of our current position in the chain. + curHeader wire.BlockHeader + + // curStamp is the block stamp of our current position in the chain. + curStamp headerfs.BlockStamp + + // scanning is true if the current block should be scanned for filter + // matches. + scanning bool +} + +// newRescanState constructs a new rescanState. +func newRescanState(chain ChainSource, options ...RescanOption) (*rescanState, + error) { + // First, we'll apply the set of default options, then serially apply // all the options that've been passed in. ro := defaultRescanOptions() @@ -303,12 +327,17 @@ func rescan(chain ChainSource, options ...RescanOption) error { option(ro) } + rs := &rescanState{ + chain: chain, + opts: ro, + } + // If we have something to watch, create a watch list. The watch list // can be composed of a set of scripts, outpoints, and txids. for _, addr := range ro.watchAddrs { script, err := txscript.PayToAddrScript(addr) if err != nil { - return err + return nil, err } ro.watchList = append(ro.watchList, script) @@ -354,132 +383,101 @@ func rescan(chain ChainSource, options ...RescanOption) error { // If we don't have a quit channel, and the end height is still // unspecified, then we'll exit out here. if ro.quit == nil && ro.endBlock.Height == 0 { - return fmt.Errorf("rescan request must specify a quit channel" + - " or valid end block") + return nil, fmt.Errorf("rescan request must specify a quit " + + "channel or valid end block") } - // Track our position in the chain. - var ( - curHeader wire.BlockHeader - curStamp headerfs.BlockStamp - ) - // If no start block is specified, start the scan from our current best // block. if ro.startBlock == nil { bs, err := chain.BestBlock() if err != nil { - return err + return nil, err } ro.startBlock = bs } - curStamp = *ro.startBlock + rs.curStamp = *ro.startBlock // To find our starting block, either the start hash should be set, or // the start height should be set. If neither is, then we'll be // starting from the genesis block. - if (curStamp.Hash != chainhash.Hash{}) { - header, height, err := chain.GetBlockHeader(&curStamp.Hash) + if (rs.curStamp.Hash != chainhash.Hash{}) { + header, height, err := chain.GetBlockHeader(&rs.curStamp.Hash) if err == nil { - curHeader = *header - curStamp.Height = int32(height) + rs.curHeader = *header + rs.curStamp.Height = int32(height) } else { - curStamp.Hash = chainhash.Hash{} + rs.curStamp.Hash = chainhash.Hash{} } } - if (curStamp.Hash == chainhash.Hash{}) { - if curStamp.Height == 0 { - curStamp.Hash = *chain.ChainParams().GenesisHash + if (rs.curStamp.Hash == chainhash.Hash{}) { + if rs.curStamp.Height == 0 { + rs.curStamp.Hash = *chain.ChainParams().GenesisHash } else { header, err := chain.GetBlockHeaderByHeight( - uint32(curStamp.Height), + uint32(rs.curStamp.Height), ) if err == nil { - curHeader = *header - curStamp.Hash = curHeader.BlockHash() + rs.curHeader = *header + rs.curStamp.Hash = rs.curHeader.BlockHash() } else { - curHeader = chain.ChainParams().GenesisBlock.Header - curStamp.Hash = *chain.ChainParams().GenesisHash - curStamp.Height = 0 + chainParams := chain.ChainParams() + rs.curHeader = chainParams.GenesisBlock.Header + rs.curStamp.Hash = *chainParams.GenesisHash + rs.curStamp.Height = 0 } } } - // Now that we've determined the starting point of our rescan, we can - // begin processing updates from the client. - var updates []*updateOptions + return rs, nil +} + +// rescan is a single-threaded function that uses headers from the database and +// functional options as arguments. +func (rs *rescanState) rescan() error { + chain := rs.chain + ro := rs.opts // We'll need to ensure that the backing chain has actually caught up to // the rescan's starting height. - bestBlock, err := chain.BestBlock() - if err != nil { + if err := rs.waitForBlocks(func(_ chainhash.Hash, height uint32) bool { + return height >= uint32(rs.curStamp.Height) + }); err != nil { return err } - // If it hasn't, we'll subscribe for block notifications at tip and wait - // until we receive a notification for a block with the rescan's - // starting height. - if bestBlock.Height < curStamp.Height { - log.Debugf("Waiting to catch up to the rescan start height=%d "+ - "from height=%d", curStamp.Height, bestBlock.Height) - - blockSubscription, err := chain.Subscribe( - uint32(bestBlock.Height), - ) - if err != nil { - return err - } - - waitUntilSynced: - for { - select { - // We'll make sure to process any updates while we're - // syncing to prevent blocking the client. - case update := <-ro.update: - updates = append(updates, update) - - // A new block notification for the tip of the chain has - // arrived. We'll determine we've caught up to the - // rescan's starting height by receiving a block - // connected notification for the same height. - case ntfn, ok := <-blockSubscription.Notifications: - if !ok { - return errors.New("rescan block " + - "subscription was canceled " + - "while waiting to catch up") - } - - if _, ok := ntfn.(*blockntfns.Connected); !ok { - continue - } - if ntfn.Height() < uint32(curStamp.Height) { - continue - } + // To ensure that we batch as many filter queries as possible, we also + // wait for the header chain to either be current or for it to at least + // have caught up with the specified end block. + log.Debugf("Waiting for the chain source to be current or for the " + + "rescan end height to be reached.") - break waitUntilSynced + if err := rs.waitForBlocks(func(hash chainhash.Hash, + height uint32) bool { - case <-ro.quit: - blockSubscription.Cancel() - return ErrRescanExit - } + // If the header chain is current, then there is no need to + // wait. + if chain.IsCurrent() { + return true } - blockSubscription.Cancel() + // If an end height was specified then we wait until the + // notification corresponding to that block height. + if ro.endBlock.Height > 0 && + height >= uint32(ro.endBlock.Height) { - // If any updates were queued while waiting to catch up to the - // start height of the rescan, apply them now. - for _, upd := range updates { - _, err := ro.updateFilter( - chain, upd, &curStamp, &curHeader, - ) - if err != nil { - return err - } + return true } + + // If a block hash was specified, check if the notification is + // for that block. + return hash == ro.endBlock.Hash + }); err != nil { + return err } - log.Debugf("Starting rescan from known block %d (%s)", curStamp.Height, - curStamp.Hash) + log.Debugf("Starting rescan from known block %d (%s)", + rs.curStamp.Height, rs.curStamp.Hash) // Compare the start time to the start block. If the start time is // later, cycle through blocks until we find a block timestamp later @@ -487,7 +485,7 @@ func rescan(chain ChainSource, options ...RescanOption) error { // time is non-monotonic between blocks, we look for the first block to // trip the switch, and download filters from there, rather than // checking timestamps at each block. - scanning := ro.startTime.Before(curHeader.Timestamp) + rs.scanning = ro.startTime.Before(rs.curHeader.Timestamp) // Even though we'll have multiple subscriptions, they'll always be // referred to by the same variable, so we only need to defer its @@ -511,134 +509,6 @@ func rescan(chain ChainSource, options ...RescanOption) error { blockRetryQueue = newBlockRetryQueue() ) - // handleBlockConnected is a closure that handles a new block connected - // notification. - // - // TODO(wilmer): refactor this and handleBlockDisconnected into their - // own methods. - handleBlockConnected := func(ntfn *blockntfns.Connected) error { - // If we've somehow missed a header in the range, then we'll - // mark ourselves as not current so we can walk down the chain - // and notify the callers of blocks we may have missed. - header := ntfn.Header() - if header.PrevBlock != curStamp.Hash { - return fmt.Errorf("out of order block %v: expected "+ - "PrevBlock %v, got %v", header.BlockHash(), - curStamp.Hash, header.PrevBlock) - } - - // Ensure the filter header still exists before attempting to - // fetch the filter. This should usually succeed since - // notifications are delivered once filter headers are synced. - nextBlockHeight := uint32(curStamp.Height + 1) - _, err := chain.GetFilterHeaderByHeight(nextBlockHeight) - if err != nil { - return fmt.Errorf("unable to get filter header for "+ - "new block with height %v: %v", nextBlockHeight, - err) - } - - newStamp := headerfs.BlockStamp{ - Hash: header.BlockHash(), - Height: int32(nextBlockHeight), - Timestamp: header.Timestamp, - } - - log.Tracef("Rescan got block %d (%s)", newStamp.Height, - newStamp.Hash) - - // We're only scanning if the header is beyond the horizon of - // our start time. - if !scanning { - scanning = ro.startTime.Before(header.Timestamp) - } - - // If we're not scanning or our watch list is empty, then we can - // just notify the block without fetching any filters/blocks. - if !scanning || len(ro.watchList) == 0 { - if ro.ntfn.OnFilteredBlockConnected != nil { - ro.ntfn.OnFilteredBlockConnected( - newStamp.Height, &header, nil, - ) - } - if ro.ntfn.OnBlockConnected != nil { // nolint:staticcheck - ro.ntfn.OnBlockConnected( // nolint:staticcheck - &newStamp.Hash, newStamp.Height, - header.Timestamp, - ) - } - - curHeader = header - curStamp = newStamp - - return nil - } - - // Otherwise, we'll attempt to fetch the filter to retrieve the - // relevant transactions and notify them. - queryOptions := NumRetries(0) - blockFilter, err := chain.GetCFilter( - newStamp.Hash, wire.GCSFilterRegular, queryOptions, - ) - if err != nil { - // If the query failed, then this either means that we - // don't have any peers to fetch this filter from, or - // the peer(s) that we're trying to fetch from are in - // the progress of a re-org. - log.Errorf("unable to get filter for hash=%v, "+ - "retrying: %v", curStamp.Hash, err) - return errRetryBlock - } - - err = notifyBlockWithFilter( - chain, ro, &header, &newStamp, blockFilter, - ) - if err != nil { - return err - } - - // With the block successfully notified, we'll advance our state - // to it. - curHeader = header - curStamp = newStamp - - return nil - } - - // handleBlockDisconnected is a helper closure that handles a new block - // disconnected notification. - handleBlockDisconnected := func(ntfn *blockntfns.Disconnected) error { // nolint:unparam - blockDisconnected := ntfn.Header() - log.Debugf("Rescan got disconnected block %d (%s)", - ntfn.Height(), blockDisconnected.BlockHash()) - - // Only deal with it if it's the current block we know about. - // Otherwise, it's in the future. - if blockDisconnected.BlockHash() != curStamp.Hash { - return nil - } - - // Run through notifications. This is all single-threaded. We - // include deprecated calls as they're still used, for now. - if ro.ntfn.OnFilteredBlockDisconnected != nil { - ro.ntfn.OnFilteredBlockDisconnected( - curStamp.Height, &curHeader, - ) - } - if ro.ntfn.OnBlockDisconnected != nil { // nolint:staticcheck - ro.ntfn.OnBlockDisconnected( // nolint:staticcheck - &curStamp.Hash, curStamp.Height, - curHeader.Timestamp, - ) - } - - curHeader = ntfn.ChainTip() - curStamp.Hash = curHeader.BlockHash() - curStamp.Height-- - - return nil - } - // We'll need to keep track of whether we are current with the chain in // order to properly recover from a re-org. We'll start by assuming that // we are not current in order to catch up from the starting point to @@ -651,9 +521,9 @@ rescanLoop: for { // If we've reached the ending height or hash for this rescan, // then we'll exit. - if curStamp.Hash == ro.endBlock.Hash || + if rs.curStamp.Hash == ro.endBlock.Hash || (ro.endBlock.Height > 0 && - curStamp.Height == ro.endBlock.Height) { + rs.curStamp.Height == ro.endBlock.Height) { return nil } @@ -677,7 +547,8 @@ rescanLoop: // requested client. case update := <-ro.update: rewound, err := ro.updateFilter( - chain, update, &curStamp, &curHeader, + chain, update, &rs.curStamp, + &rs.curHeader, ) if err != nil { return err @@ -691,7 +562,8 @@ rescanLoop: if rewound { log.Tracef("Rewound to block %d (%s), "+ "no longer current", - curStamp.Height, curStamp.Hash) + rs.curStamp.Height, + rs.curStamp.Hash) current = false blockSubscription.Cancel() @@ -715,7 +587,7 @@ rescanLoop: continue rescanLoop } - err := handleBlockConnected(ntfn) + err := rs.handleBlockConnected(ntfn) switch err { case nil: @@ -751,11 +623,7 @@ rescanLoop: // as they are now considered stale. blockRetryQueue.remove(ntfn.Header()) - err := handleBlockDisconnected(ntfn) - if err != nil { - log.Errorf("Unable to process "+ - "%v: %v", ntfn, err) - } + rs.handleBlockDisconnected(ntfn) default: log.Warnf("Received unhandled block "+ @@ -776,7 +644,9 @@ rescanLoop: continue rescanLoop } - err := handleBlockConnected(retryBlock) + err := rs.handleBlockConnected( + retryBlock, + ) switch err { // We successfully notified the block // this time, so we can remove it from @@ -825,8 +695,8 @@ rescanLoop: select { case update := <-ro.update: _, err := ro.updateFilter( - chain, update, &curStamp, - &curHeader, + chain, update, &rs.curStamp, + &rs.curHeader, ) if err != nil { return err @@ -846,7 +716,7 @@ rescanLoop: // the block. If the next height is above the best // height known to the chain service, then we mark // ourselves as current and follow notifications. - nextHeight := curStamp.Height + 1 + nextHeight := rs.curStamp.Height + 1 if nextHeight > bestBlock.Height { // Ensure we cancel the old subscription if // we're going back to scan for missed blocks. @@ -856,7 +726,7 @@ rescanLoop: // Subscribe to block notifications. blockSubscription, err = chain.Subscribe( - uint32(curStamp.Height), + uint32(rs.curStamp.Height), ) if err != nil { return fmt.Errorf("unable to register "+ @@ -865,7 +735,7 @@ rescanLoop: log.Debugf("Rescan became current at %d (%s), "+ "subscribing to block notifications", - curStamp.Height, curStamp.Hash) + rs.curStamp.Height, rs.curStamp.Hash) current = true blockRetryQueue.clear() @@ -884,35 +754,115 @@ rescanLoop: return err } - curHeader = *header - curStamp.Height++ - curStamp.Hash = header.BlockHash() + rs.curHeader = *header + rs.curStamp.Height++ + rs.curStamp.Hash = header.BlockHash() + + if !rs.scanning { + rs.scanning = ro.startTime.Before( + rs.curHeader.Timestamp, + ) + } - if !scanning { - scanning = ro.startTime.Before(curHeader.Timestamp) + err = rs.notifyBlock() + if err != nil { + return err } - err = notifyBlock(chain, ro, curHeader, curStamp, scanning) + } + } +} + +// waitForBlocks is a helper function that can be used to wait on block +// notifications until the given predicate returns true. +func (rs *rescanState) waitForBlocks(predicate func(hash chainhash.Hash, + height uint32) bool) error { + + chain := rs.chain + ro := rs.opts + + bestBlock, err := chain.BestBlock() + if err != nil { + return err + } + + // Before subscribing to block notifications, first check if the + // predicate is not already satisfied by the current best block. + if predicate(bestBlock.Hash, uint32(bestBlock.Height)) { + return nil + } + + log.Debugf("Waiting to catch up to the rescan start height=%d "+ + "from height=%d", rs.curStamp.Height, bestBlock.Height) + + blockSubscription, err := chain.Subscribe(uint32(bestBlock.Height)) + if err != nil { + return err + } + defer blockSubscription.Cancel() + + var updates []*updateOptions + +waitUntilSynced: + for { + select { + // We'll make sure to process any updates while we're syncing to + // prevent blocking the client. + case update := <-ro.update: + updates = append(updates, update) + + // A new block notification for the tip of the chain has + // arrived. We'll determine we've caught up to the rescan's + // starting height by receiving a block connected notification + // for the same height. + case ntfn, ok := <-blockSubscription.Notifications: + if !ok { + return errors.New("rescan block subscription " + + "was canceled while waiting to catch " + + "up") + } + cNtfn, ok := ntfn.(*blockntfns.Connected) + if !ok { + continue + } + + header := cNtfn.Header() + if predicate(header.BlockHash(), cNtfn.Height()) { + break waitUntilSynced + } + + case <-ro.quit: + return ErrRescanExit + } + + // If any updates were queued while waiting to catch up to the + // start height of the rescan, apply them now. + for _, upd := range updates { + _, err := ro.updateFilter( + chain, upd, &rs.curStamp, &rs.curHeader, + ) if err != nil { return err } } } + + return nil } // notifyBlock calls appropriate listeners based on the block filter. -func notifyBlock(chain ChainSource, ro *rescanOptions, - curHeader wire.BlockHeader, curStamp headerfs.BlockStamp, - scanning bool) error { +func (rs *rescanState) notifyBlock() error { + chain := rs.chain + ro := rs.opts // Find relevant transactions based on watch list. If scanning is // false, we can safely assume this block has no relevant transactions. var relevantTxs []*btcutil.Tx - if len(ro.watchList) != 0 && scanning { + if len(ro.watchList) != 0 && rs.scanning { // If we have a non-empty watch list, then we need to see if it // matches the rescan's filters, so we get the basic filter // from the DB or network. matched, filter, err := blockFilterMatches( - chain, ro, &curStamp.Hash, + chain, ro, &rs.curStamp.Hash, ) if err != nil { return err @@ -920,7 +870,7 @@ func notifyBlock(chain ChainSource, ro *rescanOptions, if matched { relevantTxs, err = extractBlockMatches( - chain, ro, &curStamp, filter, + chain, ro, &rs.curStamp, filter, ) if err != nil { return err @@ -929,18 +879,109 @@ func notifyBlock(chain ChainSource, ro *rescanOptions, } if ro.ntfn.OnFilteredBlockConnected != nil { - ro.ntfn.OnFilteredBlockConnected(curStamp.Height, &curHeader, - relevantTxs) + ro.ntfn.OnFilteredBlockConnected( + rs.curStamp.Height, &rs.curHeader, relevantTxs, + ) } if ro.ntfn.OnBlockConnected != nil { // nolint:staticcheck - ro.ntfn.OnBlockConnected(&curStamp.Hash, // nolint:staticcheck - curStamp.Height, curHeader.Timestamp) + ro.ntfn.OnBlockConnected( // nolint:staticcheck + &rs.curStamp.Hash, rs.curStamp.Height, + rs.curHeader.Timestamp, + ) } return nil } +// handleBlockConnected handles a new block connected notification. +func (rs *rescanState) handleBlockConnected(ntfn *blockntfns.Connected) error { + chain := rs.chain + ro := rs.opts + + // If we've somehow missed a header in the range, then we'll mark + // ourselves as not current so we can walk down the chain and notify the + // callers of blocks we may have missed. + header := ntfn.Header() + if header.PrevBlock != rs.curStamp.Hash { + return fmt.Errorf("out of order block %v: expected PrevBlock "+ + "%v, got %v", header.BlockHash(), rs.curStamp.Hash, + header.PrevBlock) + } + + // Ensure the filter header still exists before attempting to fetch the + // filter. This should usually succeed since notifications are delivered + // once filter headers are synced. + nextBlockHeight := uint32(rs.curStamp.Height + 1) + _, err := chain.GetFilterHeaderByHeight(nextBlockHeight) + if err != nil { + return fmt.Errorf("unable to get filter header for new block "+ + "with height %v: %v", nextBlockHeight, err) + } + + newStamp := headerfs.BlockStamp{ + Hash: header.BlockHash(), + Height: int32(nextBlockHeight), + Timestamp: header.Timestamp, + } + + log.Tracef("Rescan got block %d (%s)", newStamp.Height, newStamp.Hash) + + // We're only scanning if the header is beyond the horizon of + // our start time. + if !rs.scanning { + rs.scanning = ro.startTime.Before(header.Timestamp) + } + + // If we're not scanning or our watch list is empty, then we can just + // notify the block without fetching any filters/blocks. + if !rs.scanning || len(ro.watchList) == 0 { + if ro.ntfn.OnFilteredBlockConnected != nil { + ro.ntfn.OnFilteredBlockConnected( + newStamp.Height, &header, nil, + ) + } + if ro.ntfn.OnBlockConnected != nil { // nolint:staticcheck + ro.ntfn.OnBlockConnected( // nolint:staticcheck + &newStamp.Hash, newStamp.Height, + header.Timestamp, + ) + } + + rs.curHeader = header + rs.curStamp = newStamp + + return nil + } + + // Otherwise, we'll attempt to fetch the filter to retrieve the relevant + // transactions and notify them. + queryOptions := NumRetries(0) + blockFilter, err := chain.GetCFilter( + newStamp.Hash, wire.GCSFilterRegular, queryOptions, + ) + if err != nil { + // If the query failed, then this either means that we don't + // have any peers to fetch this filter from, or the peer(s) that + // we're trying to fetch from are in the progress of a re-org. + log.Errorf("unable to get filter for hash=%v, retrying: %v", + rs.curStamp.Hash, err) + + return errRetryBlock + } + + err = rs.notifyBlockWithFilter(&header, &newStamp, blockFilter) + if err != nil { + return err + } + + // With the block successfully notified, we'll advance our state to it. + rs.curHeader = header + rs.curStamp = newStamp + + return nil +} + // extractBlockMatches fetches the target block from the network, and filters // out any relevant transactions found within the block. func extractBlockMatches(chain ChainSource, ro *rescanOptions, @@ -1021,12 +1062,47 @@ func extractBlockMatches(chain ChainSource, ro *rescanOptions, return relevantTxs, nil } +// handleBlockDisconnected handles a new block disconnected notification. +func (rs *rescanState) handleBlockDisconnected(ntfn *blockntfns.Disconnected) { + ro := rs.opts + + blockDisconnected := ntfn.Header() + log.Debugf("Rescan got disconnected block %d (%s)", ntfn.Height(), + blockDisconnected.BlockHash()) + + // Only deal with it if it's the current block we know about. Otherwise, + // it's in the future. + if blockDisconnected.BlockHash() != rs.curStamp.Hash { + return + } + + // Run through notifications. This is all single-threaded. We include + // deprecated calls as they're still used, for now. + if ro.ntfn.OnFilteredBlockDisconnected != nil { + ro.ntfn.OnFilteredBlockDisconnected( + rs.curStamp.Height, &rs.curHeader, + ) + } + if ro.ntfn.OnBlockDisconnected != nil { // nolint:staticcheck + ro.ntfn.OnBlockDisconnected( // nolint:staticcheck + &rs.curStamp.Hash, rs.curStamp.Height, + rs.curHeader.Timestamp, + ) + } + + rs.curHeader = ntfn.ChainTip() + rs.curStamp.Hash = rs.curHeader.BlockHash() + rs.curStamp.Height-- +} + // notifyBlockWithFilter calls appropriate listeners based on the block filter. // This differs from notifyBlock in that is expects the caller to already have // obtained the target filter. -func notifyBlockWithFilter(chain ChainSource, ro *rescanOptions, - curHeader *wire.BlockHeader, curStamp *headerfs.BlockStamp, - filter *gcs.Filter) error { +func (rs *rescanState) notifyBlockWithFilter(header *wire.BlockHeader, + stamp *headerfs.BlockStamp, filter *gcs.Filter) error { + + chain := rs.chain + ro := rs.opts // Based on what we find within the block or the filter, we'll be // sending out a set of notifications with transactions that are @@ -1037,14 +1113,14 @@ func notifyBlockWithFilter(chain ChainSource, ro *rescanOptions, // match the items within the filter to ensure we create any relevant // notifications. if filter != nil { - matched, err := matchBlockFilter(ro, filter, &curStamp.Hash) + matched, err := matchBlockFilter(ro, filter, &stamp.Hash) if err != nil { return err } if matched { relevantTxs, err = extractBlockMatches( - chain, ro, curStamp, filter, + chain, ro, stamp, filter, ) if err != nil { return err @@ -1053,13 +1129,14 @@ func notifyBlockWithFilter(chain ChainSource, ro *rescanOptions, } if ro.ntfn.OnFilteredBlockConnected != nil { - ro.ntfn.OnFilteredBlockConnected(curStamp.Height, curHeader, + ro.ntfn.OnFilteredBlockConnected(stamp.Height, header, relevantTxs) } if ro.ntfn.OnBlockConnected != nil { // nolint:staticcheck - ro.ntfn.OnBlockConnected(&curStamp.Hash, // nolint:staticcheck - curStamp.Height, curHeader.Timestamp) + ro.ntfn.OnBlockConnected( // nolint:staticcheck + &stamp.Hash, stamp.Height, header.Timestamp, + ) } return nil @@ -1323,7 +1400,10 @@ func (r *Rescan) Start() <-chan error { defer r.wg.Done() rescanArgs := append(r.options, updateChan(r.updateChan)) // nolint - err := rescan(r.chain, rescanArgs...) + rs, err := newRescanState(r.chain, rescanArgs...) + if err == nil { + err = rs.rescan() + } close(r.running) diff --git a/rescan_test.go b/rescan_test.go index e611eb90..8720c33c 100644 --- a/rescan_test.go +++ b/rescan_test.go @@ -272,6 +272,12 @@ func (c *mockChainSource) setFailGetFilter(b bool) { c.failGetFilter = b } +// IsCurrent returns true if the backend chain thinks that its view of +// the network is current. +func (c *mockChainSource) IsCurrent() bool { + return true +} + // GetCFilter returns the filter of the given type for the block with the given // hash. func (c *mockChainSource) GetCFilter(hash chainhash.Hash,