Skip to content

Commit

Permalink
neutrino + query: query.ErrChan, jobResults, Request.HandleResp refactor
Browse files Browse the repository at this point in the history
- Added Unfinished bool to jobResult to indicate successful jobs that
still need to send another request to the peer to be considered
complete.

- Made ErrChan a query option in that way it is optional for different
queries.

- Refactored HandleResp, peer is now passed as query.Peer instead of
using its address, jobErr is also passed as an argument in the
HandleResp function. This refactor is useful to enable the query package
to be used for fetching block headers. We passed the peer as the peer
is needed for validation and jobErr, so as to quickly exit the feedback
loop when fetched headers do not pass preliminary verification.

Signed-off-by: Maureen Ononiwu <amaka013@gmail.com>
  • Loading branch information
Chinwendu20 committed Sep 2, 2023
1 parent 2748d37 commit f1f8897
Show file tree
Hide file tree
Showing 9 changed files with 549 additions and 107 deletions.
12 changes: 9 additions & 3 deletions blockmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -850,8 +850,14 @@ func (c *checkpointedCFHeadersQuery) requests() []*query.Request {

// handleResponse is the internal response handler used for requests for this
// CFHeaders query.
func (c *checkpointedCFHeadersQuery) handleResponse(req, resp wire.Message,
peerAddr string) query.Progress {
func (c *checkpointedCFHeadersQuery) handleResponse(request query.ReqMessage, resp wire.Message,
peer query.Peer, _ *error) query.Progress {

peerAddr := ""
if peer != nil {
peerAddr = peer.Addr()
}
req := request.Message()

r, ok := resp.(*wire.MsgCFHeaders)
if !ok {
Expand Down Expand Up @@ -1106,7 +1112,7 @@ func (b *blockManager) getCheckpointedCFHeaders(checkpoints []*chainhash.Hash,
// Hand the queries to the work manager, and consume the verified
// responses as they come back.
errChan := b.cfg.QueryDispatcher.Query(
q.requests(), query.Cancel(b.quit), query.NoRetryMax(),
q.requests(), query.Cancel(b.quit), query.NoRetryMax(), query.ErrChan(make(chan error, 1)),
)

// Keep waiting for more headers as long as we haven't received an
Expand Down
20 changes: 11 additions & 9 deletions blockmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,14 +214,14 @@ func generateHeaders(genesisBlockHeader *wire.BlockHeader,

// generateResponses generates the MsgCFHeaders messages from the given queries
// and headers.
func generateResponses(msgs []wire.Message,
func generateResponses(msgs []query.ReqMessage,
headers *headers) ([]*wire.MsgCFHeaders, error) {

// Craft a response for each message.
var responses []*wire.MsgCFHeaders
for _, msg := range msgs {
// Only GetCFHeaders expected.
q, ok := msg.(*wire.MsgGetCFHeaders)
q, ok := msg.Message().(*wire.MsgGetCFHeaders)
if !ok {
return nil, fmt.Errorf("got unexpected message %T",
msg)
Expand Down Expand Up @@ -350,9 +350,9 @@ func TestBlockManagerInitialInterval(t *testing.T) {
requests []*query.Request,
options ...query.QueryOption) chan error {

var msgs []wire.Message
var msgs []query.ReqMessage
for _, q := range requests {
msgs = append(msgs, q.Req.Message())
msgs = append(msgs, q.Req)
}

responses, err := generateResponses(msgs, headers)
Expand All @@ -378,8 +378,9 @@ func TestBlockManagerInitialInterval(t *testing.T) {

// Let the blockmanager handle the
// message.
var jobErr error
progress := requests[index].HandleResp(
msgs[index], &resp, "",
msgs[index], &resp, nil, &jobErr,
)

if !progress.Finished {
Expand All @@ -400,7 +401,7 @@ func TestBlockManagerInitialInterval(t *testing.T) {
// Otherwise resend the response we
// just sent.
progress = requests[index].HandleResp(
msgs[index], &resp2, "",
msgs[index], &resp2, nil, &jobErr,
)
if !progress.Finished {
errChan <- fmt.Errorf("got "+
Expand Down Expand Up @@ -580,9 +581,9 @@ func TestBlockManagerInvalidInterval(t *testing.T) {
requests []*query.Request,
options ...query.QueryOption) chan error {

var msgs []wire.Message
var msgs []query.ReqMessage
for _, q := range requests {
msgs = append(msgs, q.Req.Message())
msgs = append(msgs, q.Req)
}
responses, err := generateResponses(msgs, headers)
require.NoError(t, err)
Expand Down Expand Up @@ -617,9 +618,10 @@ func TestBlockManagerInvalidInterval(t *testing.T) {
go func() {
// Check that the success of the callback match what we
// expect.
var jobErr error
for i := range responses {
progress := requests[i].HandleResp(
msgs[i], responses[i], "",
msgs[i], responses[i], nil, &jobErr,
)
if i == test.firstInvalid {
if progress.Finished {
Expand Down
14 changes: 11 additions & 3 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,9 +470,10 @@ func (q *cfiltersQuery) request() *query.Request {

// handleResponse validates that the cfilter response we get from a peer is
// sane given the getcfilter query that we made.
func (q *cfiltersQuery) handleResponse(req, resp wire.Message,
_ string) query.Progress {
func (q *cfiltersQuery) handleResponse(r query.ReqMessage, resp wire.Message,
peer query.Peer, _ *error) query.Progress {

req := r.Message()
// The request must have been a "getcfilters" msg.
request, ok := req.(*wire.MsgGetCFilters)
if !ok {
Expand Down Expand Up @@ -784,6 +785,7 @@ func (s *ChainService) GetCFilter(blockHash chainhash.Hash,
query.Cancel(s.quit),
query.Encoding(qo.encoding),
query.NumRetries(qo.numRetries),
query.ErrChan(make(chan error, 1)),
}

errChan := s.workManager.Query(
Expand Down Expand Up @@ -868,7 +870,12 @@ func (s *ChainService) GetBlock(blockHash chainhash.Hash,
// handleResp will be called for each message received from a peer. It
// will be used to signal to the work manager whether progress has been
// made or not.
handleResp := func(req, resp wire.Message, peer string) query.Progress {
handleResp := func(request query.ReqMessage, resp wire.Message, sp query.Peer, _ *error) query.Progress {
req := request.Message()
peer := ""
if sp != nil {
peer = sp.Addr()
}
// The request must have been a "getdata" msg.
_, ok := req.(*wire.MsgGetData)
if !ok {
Expand Down Expand Up @@ -968,6 +975,7 @@ func (s *ChainService) GetBlock(blockHash chainhash.Hash,
query.Encoding(qo.encoding),
query.NumRetries(qo.numRetries),
query.Cancel(s.quit),
query.ErrChan(make(chan error, 1)),
}

// Send the request to the work manager and await a response.
Expand Down
13 changes: 12 additions & 1 deletion query/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type queryOptions struct {
// that a query can be retried. If this is set then numRetries has no
// effect.
noRetryMax bool

// errChan error channel with which the workmananger sends error.
errChan chan error
}

// QueryOption is a functional option argument to any of the network query
Expand All @@ -67,6 +70,14 @@ func (qo *queryOptions) applyQueryOptions(options ...QueryOption) {
}
}

// ErrChan is a query option that specifies the error channel which the workmanager
// sends any error to.
func ErrChan(err chan error) QueryOption {
return func(qo *queryOptions) {
qo.errChan = err
}
}

// NumRetries is a query option that specifies the number of times a query
// should be retried.
func NumRetries(num uint8) QueryOption {
Expand Down Expand Up @@ -138,7 +149,7 @@ type Request struct {
// should validate the response and immediately return the progress.
// The response should be handed off to another goroutine for
// processing.
HandleResp func(req, resp wire.Message, peer string) Progress
HandleResp func(req ReqMessage, resp wire.Message, peer Peer, jobErr *error) Progress

// SendQuery handles sending request to the worker's peer. It returns an error,
// if one is encountered while sending the request.
Expand Down
35 changes: 26 additions & 9 deletions query/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ var (
// ErrJobCanceled is returned if the job is canceled before the query
// has been answered.
ErrJobCanceled = errors.New("job canceled")

// ErrResponseExistForQuery is returned if response exists already for the query.
ErrResponseExistForQuery = errors.New("response Exists for query")

// ErrResponseErr is returned if we received a compatible response for the query but, it did not pass
// preliminary verification.
ErrResponseErr = errors.New("received response with error")
)

// queryJob is the internal struct that wraps the Query to work on, in
Expand All @@ -42,9 +49,10 @@ func (q *queryJob) Index() float64 {

// jobResult is the final result of the worker's handling of the queryJob.
type jobResult struct {
job *queryJob
peer Peer
err error
job *queryJob
peer Peer
err error
unfinished bool
}

// worker is responsible for polling work from its work queue, and handing it
Expand Down Expand Up @@ -152,8 +160,9 @@ nexJobLoop:
// Wait for the correct response to be received from the peer,
// or an error happening.
var (
jobErr error
timeout = time.NewTimer(job.timeout)
jobErr error
jobUnfinished bool
timeout = time.NewTimer(job.timeout)
)

feedbackLoop:
Expand All @@ -164,7 +173,7 @@ nexJobLoop:
// our request.
case resp := <-msgChan:
progress := job.HandleResp(
job.Req.Message(), resp, peer.Addr(),
job.Req, resp, peer, &jobErr,
)

log.Tracef("Worker %v handled msg %T while "+
Expand All @@ -176,6 +185,10 @@ nexJobLoop:
// If the response did not answer our query, we
// check whether it did progress it.
if !progress.Finished {
if jobErr != nil {
break feedbackLoop
}

// If it did make progress we reset the
// timeout. This ensures that the
// queries with multiple responses
Expand All @@ -192,6 +205,9 @@ nexJobLoop:
continue feedbackLoop
}

if !progress.Progressed {
jobUnfinished = true
}
// We did get a valid response, and can break
// the loop.
break feedbackLoop
Expand Down Expand Up @@ -260,9 +276,10 @@ nexJobLoop:
// getting a new job.
select {
case results <- &jobResult{
job: resultJob,
peer: peer,
err: jobErr,
job: resultJob,
peer: peer,
err: jobErr,
unfinished: jobUnfinished,
}:
case <-quit:
return
Expand Down
Loading

0 comments on commit f1f8897

Please sign in to comment.