Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelize header download across peers to fetch headers within a chain's checkpointed region. #282

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
662 changes: 598 additions & 64 deletions blockmanager.go

Large diffs are not rendered by default.

945 changes: 906 additions & 39 deletions blockmanager_test.go

Large diffs are not rendered by default.

52 changes: 43 additions & 9 deletions neutrino.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,31 @@ func NewServerPeer(s *ChainService, isPersistent bool) *ServerPeer {
}
}

// IsSyncCandidate returns whether or not the peer is a candidate to consider
// syncing from.
func (sp *ServerPeer) IsSyncCandidate() bool {
// The peer is not a candidate for sync if it's not a full node.
return sp.Services()&wire.SFNodeNetwork == wire.SFNodeNetwork
}

// IsPeerBehindStartHeight returns a boolean indicating if the peer's last block height
// is behind the start height of the request. If the peer is not behind the request start
// height false is returned, otherwise, true is.
func (sp *ServerPeer) IsPeerBehindStartHeight(req query.ReqMessage) bool {
queryGetHeaders, ok := req.(*headerQuery)

if !ok {
log.Debugf("request is not type headerQuery")

return true
}

if sp.LastBlock() < queryGetHeaders.startHeight {
return true
}
return false
}
Comment on lines +208 to +221
Copy link

@ProofOfKeags ProofOfKeags Oct 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just pass in the start height, this function should take two arguments: the server peer, and the start height. This is also doing a downcasting operation and that clutters the logic, do that at the call site. You are duplicating the downcasting logic in a lot of places and it isn't necessary. Also returning true in the case that the downcast fails clashes strongly with the name here. This function could just be

func (sp *ServerPeeer) IsPeerBehindStartHeight(h int32) bool {
    return sp.LastBlock() < h
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


// newestBlock returns the current best block hash and height using the format
// required by the configuration for the peer package.
func (sp *ServerPeer) newestBlock() (*chainhash.Hash, int32, error) {
Expand Down Expand Up @@ -800,15 +825,21 @@ func NewChainService(cfg Config) (*ChainService, error) {
}

bm, err := newBlockManager(&blockManagerCfg{
ChainParams: s.chainParams,
BlockHeaders: s.BlockHeaders,
RegFilterHeaders: s.RegFilterHeaders,
TimeSource: s.timeSource,
QueryDispatcher: s.workManager,
BanPeer: s.BanPeer,
GetBlock: s.GetBlock,
firstPeerSignal: s.firstPeerConnect,
queryAllPeers: s.queryAllPeers,
ChainParams: s.chainParams,
BlockHeaders: s.BlockHeaders,
RegFilterHeaders: s.RegFilterHeaders,
TimeSource: s.timeSource,
cfHeaderQueryDispatcher: s.workManager,
BanPeer: s.BanPeer,
GetBlock: s.GetBlock,
firstPeerSignal: s.firstPeerConnect,
queryAllPeers: s.queryAllPeers,
blkHdrCheckptQueryDispatcher: query.NewWorkManager(&query.Config{
ConnectedPeers: s.ConnectedPeers,
NewWorker: query.NewWorker,
Ranking: query.NewPeerRanking(),
IsEligibleWorkerFunc: query.IsWorkerEligibleForBlkHdrFetch,
}),
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -1610,6 +1641,9 @@ func (s *ChainService) Start() error {
s.addrManager.Start()
s.blockManager.Start()
s.blockSubscriptionMgr.Start()
if err := s.blockManager.cfg.blkHdrCheckptQueryDispatcher.Start(); err != nil {
return fmt.Errorf("unable to start block header work manager: %v", err)
}
if err := s.workManager.Start(); err != nil {
return fmt.Errorf("unable to start work manager: %v", err)
}
Expand Down
89 changes: 66 additions & 23 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,7 @@ var (

// noProgress will be used to indicate to a query.WorkManager that a
// response makes no progress towards the completion of the query.
noProgress = query.Progress{
Finished: false,
Progressed: false,
}
noProgress = query.NoResponse
)

// queries are a set of options that can be modified per-query, unlike global
Expand Down Expand Up @@ -430,26 +427,50 @@ type cfiltersQuery struct {
headerIndex map[chainhash.Hash]int
targetHash chainhash.Hash
targetFilter *gcs.Filter
mtx sync.Mutex
}

// request couples a query message with the handler to be used for the response
// in a query.Request struct.
func (q *cfiltersQuery) request() *query.Request {
msg := wire.NewMsgGetCFilters(
q.filterType, uint32(q.startHeight), q.stopHash,
)
msg := &encodedQuery{
message: wire.NewMsgGetCFilters(
q.filterType, uint32(q.startHeight), q.stopHash,
),
encoding: wire.WitnessEncoding,
}

return &query.Request{
Req: msg,
HandleResp: q.handleResponse,
SendQuery: sendQueryMessageWithEncoding,
CloneReq: func(req query.ReqMessage) query.ReqMessage {
oldReq, ok := req.(*encodedQuery)
if !ok {
log.Errorf("request not of type *encodedQuery")
}
oldReqMessage, ok := oldReq.message.(*wire.MsgGetCFilters)
if !ok {
log.Errorf("request not of type *wire.MsgGetCFilters")
}
newReq := &encodedQuery{
message: wire.NewMsgGetCFilters(
oldReqMessage.FilterType, oldReqMessage.StartHeight, &oldReqMessage.StopHash,
),
encoding: oldReq.encoding,
priorityIndex: oldReq.priorityIndex,
}
return newReq
},
}
}

// handleResponse validates that the cfilter response we get from a peer is
// sane given the getcfilter query that we made.
func (q *cfiltersQuery) handleResponse(req, resp wire.Message,
_ string) query.Progress {
func (q *cfiltersQuery) handleResponse(r query.ReqMessage, resp wire.Message,
peer query.Peer) query.Progress {

req := r.Message()
// The request must have been a "getcfilters" msg.
request, ok := req.(*wire.MsgGetCFilters)
if !ok {
Expand All @@ -476,6 +497,8 @@ func (q *cfiltersQuery) handleResponse(req, resp wire.Message,

// If this filter is for a block not in our index, we can ignore it, as
// we either already got it, or it is out of our queried range.
q.mtx.Lock()
defer q.mtx.Unlock()
i, ok := q.headerIndex[response.BlockHash]
if !ok {
return noProgress
Expand Down Expand Up @@ -548,17 +571,11 @@ func (q *cfiltersQuery) handleResponse(req, resp wire.Message,
// If there are still entries left in the headerIndex then the query
// has made progress but has not yet completed.
if len(q.headerIndex) != 0 {
return query.Progress{
Finished: false,
Progressed: true,
}
return query.Progressed
}

// The headerIndex is empty and so this query is complete.
return query.Progress{
Finished: true,
Progressed: true,
}
return query.Finished
}

// prepareCFiltersQuery creates a cfiltersQuery that can be used to fetch a
Expand Down Expand Up @@ -759,6 +776,7 @@ func (s *ChainService) GetCFilter(blockHash chainhash.Hash,
query.Cancel(s.quit),
query.Encoding(qo.encoding),
query.NumRetries(qo.numRetries),
query.ErrChan(make(chan error, 1)),
}

errChan := s.workManager.Query(
Expand Down Expand Up @@ -833,13 +851,22 @@ func (s *ChainService) GetBlock(blockHash chainhash.Hash,
// Construct the appropriate getdata message to fetch the target block.
getData := wire.NewMsgGetData()
_ = getData.AddInvVect(inv)
msg := &encodedQuery{
message: getData,
encoding: wire.WitnessEncoding,
}

var foundBlock *btcutil.Block

// handleResp will be called for each message received from a peer. It
// will be used to signal to the work manager whether progress has been
// made or not.
handleResp := func(req, resp wire.Message, peer string) query.Progress {
handleResp := func(request query.ReqMessage, resp wire.Message, sp query.Peer) query.Progress {
req := request.Message()
peer := ""
if sp != nil {
peer = sp.Addr()
}
// The request must have been a "getdata" msg.
_, ok := req.(*wire.MsgGetData)
if !ok {
Expand Down Expand Up @@ -904,23 +931,39 @@ func (s *ChainService) GetBlock(blockHash chainhash.Hash,
// we declare it sane. We can kill the query and pass the
// response back to the caller.
foundBlock = block
return query.Progress{
Finished: true,
Progressed: true,
}
return query.Finished
}

// Prepare the query request.
request := &query.Request{
Req: getData,
Req: msg,
HandleResp: handleResp,
SendQuery: sendQueryMessageWithEncoding,
CloneReq: func(req query.ReqMessage) query.ReqMessage {
newMsg := wire.NewMsgGetData()
_ = newMsg.AddInvVect(inv)

oldReq, ok := req.(*encodedQuery)
if !ok {
log.Errorf("request not of type *encodedQuery")
}

newReq := &encodedQuery{
message: newMsg,
encoding: oldReq.encoding,
priorityIndex: oldReq.priorityIndex,
}

return newReq
},
}

// Prepare the query options.
queryOpts := []query.QueryOption{
query.Encoding(qo.encoding),
query.NumRetries(qo.numRetries),
query.Cancel(s.quit),
query.ErrChan(make(chan error, 1)),
}

// Send the request to the work manager and await a response.
Expand Down
89 changes: 68 additions & 21 deletions query/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type queryOptions struct {
// that a query can be retried. If this is set then numRetries has no
// effect.
noRetryMax bool

// errChan error channel with which the workmananger sends error.
errChan chan error
}

// QueryOption is a functional option argument to any of the network query
Expand All @@ -67,6 +70,14 @@ func (qo *queryOptions) applyQueryOptions(options ...QueryOption) {
}
}

// ErrChan is a query option that specifies the error channel which the workmanager
// sends any error to.
func ErrChan(err chan error) QueryOption {
return func(qo *queryOptions) {
qo.errChan = err
}
}

// NumRetries is a query option that specifies the number of times a query
// should be retried.
func NumRetries(num uint8) QueryOption {
Expand Down Expand Up @@ -107,25 +118,40 @@ func Cancel(cancel chan struct{}) QueryOption {
}
}

// Progress encloses the result of handling a response for a given Request,
// determining whether the response did progress the query.
type Progress struct {
// Finished is true if the query was finished as a result of the
// received response.
Finished bool

// Progressed is true if the query made progress towards fully
// answering the request as a result of the received response. This is
// used for the requests types where more than one response is
// expected.
Progressed bool
}
// Progress encloses the result of handling a response for a given Request.
type Progress string

var (

// Finished indicates we have received the complete, valid response for this request,
// and so we are done with it.
Finished Progress = "Received complete and valid response for request."

// Progressed indicates that we have received a valid response, but we are expecting more.
Progressed Progress = "Received valid response, expecting more response for query."

// UnFinishedRequest indicates that we have received some response, but we need to rescheule the job
// to completely fetch all the response required for this request.
UnFinishedRequest Progress = "Received valid response, reschedule to complete request"

// ResponseErr indicates we obtained a valid response but response fails checks and needs to
// be rescheduled.
ResponseErr Progress = "Received valid response but fails checks "

// IgnoreRequest indicates that we have received a valid response but the workmanager need take
// no action on the result of this job.
IgnoreRequest Progress = "Received response but ignoring"

// NoResponse indicates that we have received an invalid response for this request, and we need
// to wait for a valid one.
NoResponse Progress = "Received invalid response"
)
ProofOfKeags marked this conversation as resolved.
Show resolved Hide resolved

// Request is the main struct that defines a bitcoin network query to be sent to
// connected peers.
type Request struct {
// Req is the message request to send.
Req wire.Message
// Req contains the message request to send.
Req ReqMessage

// HandleResp is a response handler that will be called for every
// message received from the peer that the request was made to. It
Expand All @@ -138,7 +164,26 @@ type Request struct {
// should validate the response and immediately return the progress.
// The response should be handed off to another goroutine for
// processing.
HandleResp func(req, resp wire.Message, peer string) Progress
HandleResp func(req ReqMessage, resp wire.Message, peer Peer) Progress

// SendQuery handles sending request to the worker's peer. It returns an error,
// if one is encountered while sending the request.
SendQuery func(peer Peer, request ReqMessage) error

// CloneReq clones the message.
CloneReq func(message ReqMessage) ReqMessage
Comment on lines +173 to +174

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But why do we need that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explained it in the code:

https://github.com/Chinwendu20/neutrino/blob/64b278771ff75da0b30136af7d4ab0ace06d897e/query/worker.go#L261-L267

Also reading the commit message that effected this change would help:
dd02e22

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you should be modifying the request. It seems you're using a common structure for what ought to be two different data types. One represents the request itself, the top level job. The other represents the remaining work on that job. The latter is conceptually an in-progress job. I think you should label it as such and then rather than talking about it as a "clone", it's really a construction of a "InProgressRequest" or something to that effect.

}

// ReqMessage is an interface which all structs containing information
// required to process a message request must implement.
type ReqMessage interface {

// Message returns the message request.
Message() wire.Message

// PriorityIndex returns the priority the caller prefers the request
// would take.
PriorityIndex() float64
}

// WorkManager defines an API for a manager that dispatches queries to bitcoin
Expand Down Expand Up @@ -167,11 +212,6 @@ type Dispatcher interface {
// Peer is the interface that defines the methods needed by the query package
// to be able to make requests and receive responses from a network peer.
type Peer interface {
// QueueMessageWithEncoding adds the passed bitcoin message to the peer
// send queue.
QueueMessageWithEncoding(msg wire.Message, doneChan chan<- struct{},
encoding wire.MessageEncoding)

// SubscribeRecvMsg adds a OnRead subscription to the peer. All bitcoin
// messages received from this peer will be sent on the returned
// channel. A closure is also returned, that should be called to cancel
Expand All @@ -184,4 +224,11 @@ type Peer interface {
// OnDisconnect returns a channel that will be closed when this peer is
// disconnected.
OnDisconnect() <-chan struct{}

// IsPeerBehindStartHeight returns a boolean indicating if the peer's known last height is behind
// the request's start Height which it receives as an argument.
IsPeerBehindStartHeight(req ReqMessage) bool

// IsSyncCandidate returns true if the peer is a sync candidate.
IsSyncCandidate() bool
}
Loading
Loading