Skip to content

Commit

Permalink
Merge pull request lightninglabs#273 from ellemouton/useQueryDispatcher
Browse files Browse the repository at this point in the history
query+neutrino: use query dispatcher for GetBlock and GetCFilter
  • Loading branch information
Roasbeef authored Jul 10, 2023
2 parents 9fd0fc5 + 663cacc commit 5aac983
Show file tree
Hide file tree
Showing 10 changed files with 441 additions and 489 deletions.
8 changes: 7 additions & 1 deletion banman/reason.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ type Reason uint8
// We prevent using `iota` to ensure the order does not have the value since
// these are serialized within the database.
const (
// ExcedeedBanThreshold signals that a peer exceeded its ban threshold.
// ExceededBanThreshold signals that a peer exceeded its ban threshold.
ExceededBanThreshold Reason = 1

// NoCompactFilters signals that a peer was unable to serve us compact
Expand All @@ -20,6 +20,9 @@ const (
// InvalidFilterHeaderCheckpoint signals that a peer served us an
// invalid filter header checkpoint.
InvalidFilterHeaderCheckpoint Reason = 4

// InvalidBlock signals that a peer served us a bad block.
InvalidBlock Reason = 5
)

// String returns a human-readable description for the reason a peer was banned.
Expand All @@ -37,6 +40,9 @@ func (r Reason) String() string {
case InvalidFilterHeaderCheckpoint:
return "peer served invalid filter header checkpoint"

case InvalidBlock:
return "peer served an invalid block"

default:
return "unknown reason"
}
Expand Down
2 changes: 1 addition & 1 deletion blockmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1048,7 +1048,7 @@ func (b *blockManager) getCheckpointedCFHeaders(checkpoints []*chainhash.Hash,
// Hand the queries to the work manager, and consume the verified
// responses as they come back.
errChan := b.cfg.QueryDispatcher.Query(
q.requests(), query.Cancel(b.quit),
q.requests(), query.Cancel(b.quit), query.NoRetryMax(),
)

// Keep waiting for more headers as long as we haven't received an
Expand Down
2 changes: 2 additions & 0 deletions blockmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const (
// mockDispatcher implements the query.Dispatcher interface and allows us to
// set up a custom Query method during tests.
type mockDispatcher struct {
query.WorkManager

query func(requests []*query.Request,
options ...query.QueryOption) chan error
}
Expand Down
18 changes: 2 additions & 16 deletions neutrino.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,11 +643,6 @@ type ChainService struct { // nolint:maligned
FilterCache *lru.Cache[FilterCacheKey, *CacheableFilter]
BlockCache *lru.Cache[wire.InvVect, *CacheableBlock]

// queryPeers will be called to send messages to one or more peers,
// expecting a response.
queryPeers func(wire.Message, func(*ServerPeer, wire.Message,
chan<- struct{}), ...QueryOption)

chainParams chaincfg.Params
addrManager *addrmgr.AddrManager
connManager *connmgr.ConnManager
Expand All @@ -665,7 +660,7 @@ type ChainService struct { // nolint:maligned
utxoScanner *UtxoScanner
broadcaster *pushtx.Broadcaster
banStore banman.Store
workManager *query.WorkManager
workManager query.WorkManager

// peerSubscribers is a slice of active peer subscriptions, that we
// will notify each time a new peer is connected.
Expand Down Expand Up @@ -741,22 +736,13 @@ func NewChainService(cfg Config) (*ChainService, error) {
persistToDisk: cfg.PersistToDisk,
broadcastTimeout: cfg.BroadcastTimeout,
}
s.workManager = query.New(&query.Config{
s.workManager = query.NewWorkManager(&query.Config{
ConnectedPeers: s.ConnectedPeers,
NewWorker: query.NewWorker,
Ranking: query.NewPeerRanking(),
})

// We set the queryPeers method to point to queryChainServicePeers,
// passing a reference to the newly created ChainService.
s.queryPeers = func(msg wire.Message, f func(*ServerPeer,
wire.Message, chan<- struct{}), qo ...QueryOption) {

queryChainServicePeers(&s, msg, f, qo...)
}

var err error

s.FilterDB, err = filterdb.New(cfg.Database, cfg.ChainParams)
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 5aac983

Please sign in to comment.