Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

neutrino+query+rescan: improve rescan speed #236

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions neutrino.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,7 @@ type ChainService struct { // nolint:maligned
utxoScanner *UtxoScanner
broadcaster *pushtx.Broadcaster
banStore banman.Store
queryDispatcher query.Dispatcher
workManager *query.WorkManager

// peerSubscribers is a slice of active peer subscriptions, that we
Expand Down Expand Up @@ -679,6 +680,8 @@ func NewChainService(cfg Config) (*ChainService, error) {
Ranking: query.NewPeerRanking(),
})

s.queryDispatcher = s.workManager

// We set the queryPeers method to point to queryChainServicePeers,
// passing a reference to the newly created ChainService.
s.queryPeers = func(msg wire.Message, f func(*ServerPeer,
Expand Down
135 changes: 79 additions & 56 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sync"
"time"

"github.com/lightninglabs/neutrino/query"

"github.com/btcsuite/btcd/blockchain"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
Expand Down Expand Up @@ -947,75 +949,96 @@ func (s *ChainService) GetBlock(blockHash chainhash.Hash,
getData := wire.NewMsgGetData()
_ = getData.AddInvVect(inv)

// The block is only updated from the checkResponse function argument,
// which is always called single-threadedly. We don't check the block
// until after the query is finished, so we can just write to it
// naively.
var foundBlock *btcutil.Block
s.queryPeers(
// Send a wire.GetDataMsg
getData,

// Check responses and if we get one that matches, end the
// query early.
func(sp *ServerPeer, resp wire.Message,
quit chan<- struct{}) {
switch response := resp.(type) {
// We're only interested in "block" messages.
case *wire.MsgBlock:
// Only keep this going if we haven't already
// found a block, or we risk closing an already
// closed channel.
if foundBlock != nil {
return
request := &query.Request{
Req: getData,
HandleResp: func(req, resp wire.Message, peer string) query.Progress {
// The request must have been a "getdata" msg.
_, ok := req.(*wire.MsgGetData)
if !ok {
return query.Progress{
Finished: false,
Progressed: false,
}
}

// If this isn't our block, ignore it.
if response.BlockHash() != blockHash {
return
// We're only interested in "block" messages.
response, ok := resp.(*wire.MsgBlock)
if !ok {
return query.Progress{
Finished: false,
Progressed: false,
}
block := btcutil.NewBlock(response)
}

// Only set height if btcutil hasn't
// automagically put one in.
if block.Height() == btcutil.BlockHeightUnknown {
block.SetHeight(int32(height))
// If this isn't the block we asked for, ignore it.
if response.BlockHash() != blockHash {
return query.Progress{
Finished: false,
Progressed: false,
}
}

// If this claims our block but doesn't pass
// the sanity check, the peer is trying to
// bamboozle us. Disconnect it.
if err := blockchain.CheckBlockSanity(
block,
// We don't need to check PoW because
// by the time we get here, it's been
// checked during header
// synchronization
s.chainParams.PowLimit,
s.timeSource,
); err != nil {
log.Warnf("Invalid block for %s "+
"received from %s -- "+
"disconnecting peer", blockHash,
sp.Addr())
sp.Disconnect()
return
block := btcutil.NewBlock(response)

// Only set height if btcutil hasn't
// automagically put one in.
if block.Height() == btcutil.BlockHeightUnknown {
block.SetHeight(int32(height))
}

// If this claims our block but doesn't pass
// the sanity check, the peer is trying to
// bamboozle us.
if err := blockchain.CheckBlockSanity(
block,
// We don't need to check PoW because
// by the time we get here, it's been
// checked during header
// synchronization
s.chainParams.PowLimit,
s.timeSource,
); err != nil {
log.Warnf("Invalid block for %s "+
"received from %s -- ",
blockHash, peer)
fmt.Println(err)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it's worth it to include the error to the log.Warnf

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh haha yeah that was left over from debugging 🙈 will fix. thanks!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prior logic would disconnect here, but now we'll continue...

Seems worthy of a future spin off to propagate a "ban worthy" error back up to the main scheduler.


return query.Progress{
Finished: false,
Progressed: false,
}
}

// TODO(roasbeef): modify CheckBlockSanity to
// also check witness commitment
// TODO(roasbeef): modify CheckBlockSanity to
// also check witness commitment

// At this point, the block matches what we
// know about it and we declare it sane. We can
// kill the query and pass the response back to
// the caller.
foundBlock = block
close(quit)
default:
// At this point, the block matches what we
// know about it and we declare it sane. We can
// kill the query and pass the response back to
// the caller.
foundBlock = block
return query.Progress{
Finished: true,
Progressed: true,
}
},
options...,
}

errChan := s.queryDispatcher.Query(
[]*query.Request{request}, query.Encoding(qo.encoding),
query.Cancel(s.quit),
)

select {
case err := <-errChan:
if err != nil {
return nil, err
}
case <-s.quit:
return nil, ErrShuttingDown
}

if foundBlock == nil {
return nil, fmt.Errorf("couldn't retrieve block %s from "+
"network", blockHash)
Expand Down
41 changes: 26 additions & 15 deletions query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"testing"
"time"

"github.com/lightninglabs/neutrino/query"

"github.com/btcsuite/btcd/blockchain"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
Expand Down Expand Up @@ -280,19 +282,27 @@ func TestBlockCache(t *testing.T) {
chainParams: chaincfg.Params{
PowLimit: maxPowLimit,
},
timeSource: blockchain.NewMedianTime(),
timeSource: blockchain.NewMedianTime(),
queryDispatcher: &mockDispatcher{},
}

// We'll set up the queryPeers method to make sure we are only querying
// for blocks, and send the block hashes queried over the queries
// channel.
queries := make(chan chainhash.Hash, 1)
cs.queryPeers = func(msg wire.Message, f func(*ServerPeer,
wire.Message, chan<- struct{}), qo ...QueryOption) {
cs.queryDispatcher.(*mockDispatcher).query = func(requests []*query.Request,
options ...query.QueryOption) chan error {

errChan := make(chan error, 1)
defer close(errChan)

getData, ok := msg.(*wire.MsgGetData)
if len(requests) != 1 {
t.Fatalf("unexpected 1 request, got %d", len(requests))
}

getData, ok := requests[0].Req.(*wire.MsgGetData)
if !ok {
t.Fatalf("unexpected type: %T", msg)
t.Fatalf("unexpected type: %T", requests[0].Req)
}

if len(getData.InvList) != 1 {
Expand All @@ -308,30 +318,31 @@ func TestBlockCache(t *testing.T) {
// Serve the block that matches the requested block header.
for _, b := range blocks {
if *b.Hash() == inv.Hash {
header, _, err := headers.FetchHeader(b.Hash())
if err != nil {
t.Fatalf("")
}

// Execute the callback with the found block,
// and wait for the quit channel to be closed.
quit := make(chan struct{})
f(nil, b.MsgBlock(), quit)

select {
case <-quit:
case <-time.After(1 * time.Second):
t.Fatalf("channel not closed")
resp := &wire.MsgBlock{
Header: *header,
Transactions: b.MsgBlock().Transactions,
}

requests[0].HandleResp(requests[0].Req, resp, "")

// Notify the test about the query.
select {
case queries <- inv.Hash:
case <-time.After(1 * time.Second):
t.Fatalf("query was not handled")
}

return
return errChan
}
}

t.Fatalf("queried for unknown block: %v", inv.Hash)
return errChan
}

// fetchAndAssertPeersQueried calls GetBlock and makes sure the block
Expand Down