Skip to content

Commit

Permalink
rescan: add a waitForBlocks helper method to rescanState
Browse files Browse the repository at this point in the history
Add a new `waitForBlocks` helper method to rescanState that will wait on
block notifications until a given predicate returns true. Currently this
method is only called with one predicate but an upcoming commit will
make use of it using a differnet predicate.
  • Loading branch information
ellemouton committed Jul 25, 2023
1 parent 97b7482 commit e1a71d4
Showing 1 changed file with 80 additions and 68 deletions.
148 changes: 80 additions & 68 deletions rescan.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,79 +434,14 @@ func (rs *rescanState) rescan() error {
chain := rs.chain
ro := rs.opts

// Now that we've determined the starting point of our rescan, we can
// begin processing updates from the client.
var updates []*updateOptions

// We'll need to ensure that the backing chain has actually caught up to
// the rescan's starting height.
bestBlock, err := chain.BestBlock()
if err != nil {
if err := rs.waitForBlocks(func(_ chainhash.Hash, height uint32) bool {
return height >= uint32(rs.curStamp.Height)
}); err != nil {
return err
}

// If it hasn't, we'll subscribe for block notifications at tip and wait
// until we receive a notification for a block with the rescan's
// starting height.
if bestBlock.Height < rs.curStamp.Height {
log.Debugf("Waiting to catch up to the rescan start height=%d "+
"from height=%d", rs.curStamp.Height, bestBlock.Height)

blockSubscription, err := chain.Subscribe(
uint32(bestBlock.Height),
)
if err != nil {
return err
}

waitUntilSynced:
for {
select {
// We'll make sure to process any updates while we're
// syncing to prevent blocking the client.
case update := <-ro.update:
updates = append(updates, update)

// A new block notification for the tip of the chain has
// arrived. We'll determine we've caught up to the
// rescan's starting height by receiving a block
// connected notification for the same height.
case ntfn, ok := <-blockSubscription.Notifications:
if !ok {
return errors.New("rescan block " +
"subscription was canceled " +
"while waiting to catch up")
}

if _, ok := ntfn.(*blockntfns.Connected); !ok {
continue
}
if ntfn.Height() < uint32(rs.curStamp.Height) {
continue
}

break waitUntilSynced

case <-ro.quit:
blockSubscription.Cancel()
return ErrRescanExit
}
}

blockSubscription.Cancel()

// If any updates were queued while waiting to catch up to the
// start height of the rescan, apply them now.
for _, upd := range updates {
_, err := ro.updateFilter(
chain, upd, &rs.curStamp, &rs.curHeader,
)
if err != nil {
return err
}
}
}

log.Debugf("Starting rescan from known block %d (%s)",
rs.curStamp.Height, rs.curStamp.Hash)

Expand Down Expand Up @@ -803,6 +738,83 @@ rescanLoop:
}
}

// waitForBlocks is a helper function that can be used to wait on block
// notifications until the given predicate returns true.
func (rs *rescanState) waitForBlocks(predicate func(hash chainhash.Hash,
height uint32) bool) error {

chain := rs.chain
ro := rs.opts

bestBlock, err := chain.BestBlock()
if err != nil {
return err
}

// Before subscribing to block notifications, first check if the
// predicate is not already satisfied by the current best block.
if predicate(bestBlock.Hash, uint32(bestBlock.Height)) {
return nil
}

log.Debugf("Waiting to catch up to the rescan start height=%d "+
"from height=%d", rs.curStamp.Height, bestBlock.Height)

blockSubscription, err := chain.Subscribe(uint32(bestBlock.Height))
if err != nil {
return err
}
defer blockSubscription.Cancel()

var updates []*updateOptions

waitUntilSynced:
for {
select {
// We'll make sure to process any updates while we're syncing to
// prevent blocking the client.
case update := <-ro.update:
updates = append(updates, update)

// A new block notification for the tip of the chain has
// arrived. We'll determine we've caught up to the rescan's
// starting height by receiving a block connected notification
// for the same height.
case ntfn, ok := <-blockSubscription.Notifications:
if !ok {
return errors.New("rescan block subscription " +
"was canceled while waiting to catch " +
"up")
}
cNtfn, ok := ntfn.(*blockntfns.Connected)
if !ok {
continue
}

header := cNtfn.Header()
if predicate(header.BlockHash(), cNtfn.Height()) {
break waitUntilSynced
}

case <-ro.quit:
return ErrRescanExit
}

// If any updates were queued while waiting to catch up to the
// start height of the rescan, apply them now.
for _, upd := range updates {
_, err := ro.updateFilter(
chain, upd, &rs.curStamp, &rs.curHeader,
)
if err != nil {
return err
}
}
}

return nil
}

// notifyBlock calls appropriate listeners based on the block filter.
func (rs *rescanState) notifyBlock() error {
chain := rs.chain
Expand Down

0 comments on commit e1a71d4

Please sign in to comment.