From f1f8897b5156b97c17f8a49b791070120f66dfe5 Mon Sep 17 00:00:00 2001
From: Maureen Ononiwu
Date: Sat, 2 Sep 2023 09:31:58 +0100
Subject: [PATCH] neutrino + query: query.ErrChan, jobResults, Request.HandleResp refactor

- Added an Unfinished bool to jobResult to indicate successful jobs that
  still need to send another request to the peer to be considered
  complete.
- Made ErrChan a query option so that it is optional for different
  queries.
- Refactored HandleResp: the peer is now passed as a query.Peer instead
  of its address, and jobErr is passed as an argument to HandleResp.

This refactor enables the query package to be used for fetching block
headers. The peer is passed because it is needed for validation, and
jobErr lets us exit the feedback loop quickly when fetched headers do
not pass preliminary verification.

Signed-off-by: Maureen Ononiwu
---
 blockmanager.go           |  12 +-
 blockmanager_test.go      |  20 +--
 query.go                  |  14 +-
 query/interface.go        |  13 +-
 query/worker.go           |  35 +++--
 query/worker_test.go      | 193 ++++++++++++++++++-------
 query/workmanager.go      |  77 +++++++---
 query/workmanager_test.go | 286 +++++++++++++++++++++++++++++++++++++-
 query_test.go             |   6 +-
 9 files changed, 549 insertions(+), 107 deletions(-)

diff --git a/blockmanager.go b/blockmanager.go
index 35523fa1..f5f2d4da 100644
--- a/blockmanager.go
+++ b/blockmanager.go
@@ -850,8 +850,14 @@ func (c *checkpointedCFHeadersQuery) requests() []*query.Request {
 // handleResponse is the internal response handler used for requests for this
 // CFHeaders query.
-func (c *checkpointedCFHeadersQuery) handleResponse(req, resp wire.Message,
-	peerAddr string) query.Progress {
+func (c *checkpointedCFHeadersQuery) handleResponse(request query.ReqMessage,
+	resp wire.Message, peer query.Peer, _ *error) query.Progress {
+
+	peerAddr := ""
+	if peer != nil {
+		peerAddr = peer.Addr()
+	}
+	req := request.Message()
 
 	r, ok := resp.(*wire.MsgCFHeaders)
 	if !ok {
@@ -1106,7 +1112,7 @@ func (b *blockManager) getCheckpointedCFHeaders(checkpoints []*chainhash.Hash,
 	// Hand the queries to the work manager, and consume the verified
 	// responses as they come back.
 	errChan := b.cfg.QueryDispatcher.Query(
-		q.requests(), query.Cancel(b.quit), query.NoRetryMax(),
+		q.requests(), query.Cancel(b.quit), query.NoRetryMax(),
+		query.ErrChan(make(chan error, 1)),
 	)
 
 	// Keep waiting for more headers as long as we haven't received an
diff --git a/blockmanager_test.go b/blockmanager_test.go
index 97416cf1..3bce9475 100644
--- a/blockmanager_test.go
+++ b/blockmanager_test.go
@@ -214,14 +214,14 @@ func generateHeaders(genesisBlockHeader *wire.BlockHeader,
 
 // generateResponses generates the MsgCFHeaders messages from the given queries
 // and headers.
-func generateResponses(msgs []wire.Message,
+func generateResponses(msgs []query.ReqMessage,
 	headers *headers) ([]*wire.MsgCFHeaders, error) {
 
 	// Craft a response for each message.
 	var responses []*wire.MsgCFHeaders
 	for _, msg := range msgs {
 		// Only GetCFHeaders expected.
- q, ok := msg.(*wire.MsgGetCFHeaders) + q, ok := msg.Message().(*wire.MsgGetCFHeaders) if !ok { return nil, fmt.Errorf("got unexpected message %T", msg) @@ -350,9 +350,9 @@ func TestBlockManagerInitialInterval(t *testing.T) { requests []*query.Request, options ...query.QueryOption) chan error { - var msgs []wire.Message + var msgs []query.ReqMessage for _, q := range requests { - msgs = append(msgs, q.Req.Message()) + msgs = append(msgs, q.Req) } responses, err := generateResponses(msgs, headers) @@ -378,8 +378,9 @@ func TestBlockManagerInitialInterval(t *testing.T) { // Let the blockmanager handle the // message. + var jobErr error progress := requests[index].HandleResp( - msgs[index], &resp, "", + msgs[index], &resp, nil, &jobErr, ) if !progress.Finished { @@ -400,7 +401,7 @@ func TestBlockManagerInitialInterval(t *testing.T) { // Otherwise resend the response we // just sent. progress = requests[index].HandleResp( - msgs[index], &resp2, "", + msgs[index], &resp2, nil, &jobErr, ) if !progress.Finished { errChan <- fmt.Errorf("got "+ @@ -580,9 +581,9 @@ func TestBlockManagerInvalidInterval(t *testing.T) { requests []*query.Request, options ...query.QueryOption) chan error { - var msgs []wire.Message + var msgs []query.ReqMessage for _, q := range requests { - msgs = append(msgs, q.Req.Message()) + msgs = append(msgs, q.Req) } responses, err := generateResponses(msgs, headers) require.NoError(t, err) @@ -617,9 +618,10 @@ func TestBlockManagerInvalidInterval(t *testing.T) { go func() { // Check that the success of the callback match what we // expect. + var jobErr error for i := range responses { progress := requests[i].HandleResp( - msgs[i], responses[i], "", + msgs[i], responses[i], nil, &jobErr, ) if i == test.firstInvalid { if progress.Finished { diff --git a/query.go b/query.go index ad75bfca..c28bdabc 100644 --- a/query.go +++ b/query.go @@ -470,9 +470,10 @@ func (q *cfiltersQuery) request() *query.Request { // handleResponse validates that the cfilter response we get from a peer is // sane given the getcfilter query that we made. -func (q *cfiltersQuery) handleResponse(req, resp wire.Message, - _ string) query.Progress { +func (q *cfiltersQuery) handleResponse(r query.ReqMessage, resp wire.Message, + peer query.Peer, _ *error) query.Progress { + req := r.Message() // The request must have been a "getcfilters" msg. request, ok := req.(*wire.MsgGetCFilters) if !ok { @@ -784,6 +785,7 @@ func (s *ChainService) GetCFilter(blockHash chainhash.Hash, query.Cancel(s.quit), query.Encoding(qo.encoding), query.NumRetries(qo.numRetries), + query.ErrChan(make(chan error, 1)), } errChan := s.workManager.Query( @@ -868,7 +870,12 @@ func (s *ChainService) GetBlock(blockHash chainhash.Hash, // handleResp will be called for each message received from a peer. It // will be used to signal to the work manager whether progress has been // made or not. - handleResp := func(req, resp wire.Message, peer string) query.Progress { + handleResp := func(request query.ReqMessage, resp wire.Message, sp query.Peer, _ *error) query.Progress { + req := request.Message() + peer := "" + if sp != nil { + peer = sp.Addr() + } // The request must have been a "getdata" msg. _, ok := req.(*wire.MsgGetData) if !ok { @@ -968,6 +975,7 @@ func (s *ChainService) GetBlock(blockHash chainhash.Hash, query.Encoding(qo.encoding), query.NumRetries(qo.numRetries), query.Cancel(s.quit), + query.ErrChan(make(chan error, 1)), } // Send the request to the work manager and await a response. 
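For illustration, here is a minimal sketch of a response handler written
against the new HandleResp signature (not part of this patch; the
handleResp and checkHeaders names are hypothetical, while
query.ReqMessage, query.Peer, query.Progress and the jobErr argument
come from this change). Assigning to jobErr is what lets a handler abort
the worker's feedback loop early:

    package example

    import (
    	"errors"

    	"github.com/btcsuite/btcd/wire"
    	"github.com/lightninglabs/neutrino/query"
    )

    // checkHeaders stands in for whatever preliminary verification a
    // caller performs; it is an assumption of this sketch.
    func checkHeaders(msg *wire.MsgCFHeaders) error {
    	if len(msg.FilterHashes) == 0 {
    		return errors.New("empty cfheaders response")
    	}
    	return nil
    }

    // handleResp demonstrates the new four-argument handler shape: the
    // request arrives wrapped as a query.ReqMessage, the responding
    // peer is passed as a query.Peer, and writing to jobErr makes the
    // worker break out of its feedback loop immediately.
    func handleResp(req query.ReqMessage, resp wire.Message,
    	peer query.Peer, jobErr *error) query.Progress {

    	// Unwrap the underlying wire request, as the handlers above do.
    	if _, ok := req.Message().(*wire.MsgGetCFHeaders); !ok {
    		return query.Progress{}
    	}

    	r, ok := resp.(*wire.MsgCFHeaders)
    	if !ok {
    		// Not the response we are waiting for; no progress made.
    		return query.Progress{Finished: false, Progressed: false}
    	}

    	// The peer can be nil in tests, so guard before using it.
    	if peer != nil {
    		_ = peer.Addr() // available for validation and logging
    	}

    	if err := checkHeaders(r); err != nil {
    		// Fail fast: the worker sees a non-nil *jobErr and exits
    		// the feedback loop without waiting for the timeout.
    		*jobErr = err
    		return query.Progress{Finished: false, Progressed: false}
    	}

    	return query.Progress{Finished: true, Progressed: true}
    }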
diff --git a/query/interface.go b/query/interface.go
index cd6c56c8..52ab30dd 100644
--- a/query/interface.go
+++ b/query/interface.go
@@ -43,6 +43,9 @@ type queryOptions struct {
 	// that a query can be retried. If this is set then numRetries has no
 	// effect.
 	noRetryMax bool
+
+	// errChan is the error channel on which the workmanager sends errors.
+	errChan chan error
 }
 
 // QueryOption is a functional option argument to any of the network query
@@ -67,6 +70,14 @@ func (qo *queryOptions) applyQueryOptions(options ...QueryOption) {
 	}
 }
 
+// ErrChan is a query option that specifies the error channel to which
+// the workmanager sends any error.
+func ErrChan(err chan error) QueryOption {
	return func(qo *queryOptions) {
		qo.errChan = err
	}
+}
+
 // NumRetries is a query option that specifies the number of times a query
 // should be retried.
 func NumRetries(num uint8) QueryOption {
@@ -138,7 +149,7 @@ type Request struct {
 	// should validate the response and immediately return the progress.
 	// The response should be handed off to another goroutine for
 	// processing.
-	HandleResp func(req, resp wire.Message, peer string) Progress
+	HandleResp func(req ReqMessage, resp wire.Message, peer Peer, jobErr *error) Progress
 
 	// SendQuery handles sending request to the worker's peer. It returns an error,
 	// if one is encountered while sending the request.
diff --git a/query/worker.go b/query/worker.go
index 5e3d1644..f7c45772 100644
--- a/query/worker.go
+++ b/query/worker.go
@@ -17,6 +17,13 @@ var (
 	// ErrJobCanceled is returned if the job is canceled before the query
 	// has been answered.
 	ErrJobCanceled = errors.New("job canceled")
+
+	// ErrResponseExistForQuery is returned if a response already exists
+	// for the query.
+	ErrResponseExistForQuery = errors.New("response exists for query")
+
+	// ErrResponseErr is returned if we received a compatible response
+	// for the query, but it did not pass preliminary verification.
+	ErrResponseErr = errors.New("received response with error")
 )
 
 // queryJob is the internal struct that wraps the Query to work on, in
@@ -42,9 +49,10 @@ func (q *queryJob) Index() float64 {
 
 // jobResult is the final result of the worker's handling of the queryJob.
 type jobResult struct {
-	job  *queryJob
-	peer Peer
-	err  error
+	job        *queryJob
+	peer       Peer
+	err        error
+	unfinished bool
 }
 
 // worker is responsible for polling work from its work queue, and handing it
@@ -152,8 +160,9 @@ nexJobLoop:
 		// Wait for the correct response to be received from the peer,
 		// or an error happening.
 		var (
-			jobErr  error
-			timeout = time.NewTimer(job.timeout)
+			jobErr        error
+			jobUnfinished bool
+			timeout       = time.NewTimer(job.timeout)
 		)
 
 	feedbackLoop:
@@ -164,7 +173,7 @@ nexJobLoop:
 			// our request.
 			case resp := <-msgChan:
 				progress := job.HandleResp(
-					job.Req.Message(), resp, peer.Addr(),
+					job.Req, resp, peer, &jobErr,
 				)
 
 				log.Tracef("Worker %v handled msg %T while "+
@@ -176,6 +185,10 @@ nexJobLoop:
 				// If the response did not answer our query, we
 				// check whether it did progress it.
 				if !progress.Finished {
+					if jobErr != nil {
+						break feedbackLoop
+					}
+
 					// If it did make progress we reset the
 					// timeout. This ensures that the
 					// queries with multiple responses
@@ -192,6 +205,9 @@ nexJobLoop:
 					continue feedbackLoop
 				}
 
+				// The job finished without progressing, so it
+				// still needs another round; mark it as
+				// unfinished.
+				if !progress.Progressed {
+					jobUnfinished = true
+				}
 				// We did get a valid response, and can break
 				// the loop.
 				break feedbackLoop
@@ -260,9 +276,10 @@ nexJobLoop:
 			// getting a new job.
select { case results <- &jobResult{ - job: resultJob, - peer: peer, - err: jobErr, + job: resultJob, + peer: peer, + err: jobErr, + unfinished: jobUnfinished, }: case <-quit: return diff --git a/query/worker_test.go b/query/worker_test.go index 4fc83f8b..c036426b 100644 --- a/query/worker_test.go +++ b/query/worker_test.go @@ -35,6 +35,12 @@ var ( finalResp = &wire.MsgTx{ Version: 222, } + noProgressNoFinalResp = &wire.MsgTx{ + Version: 333, + } + finalRespWithErr = &wire.MsgTx{ + Version: 444, + } ) type mockPeer struct { @@ -69,7 +75,7 @@ func (m *mockPeer) Addr() string { func makeJob() *queryJob { q := &Request{ Req: req, - HandleResp: func(req, resp wire.Message, _ string) Progress { + HandleResp: func(req ReqMessage, resp wire.Message, peer Peer, err *error) Progress { if resp == finalResp { return Progress{ Finished: true, @@ -84,6 +90,20 @@ func makeJob() *queryJob { } } + if resp == noProgressNoFinalResp { + return Progress{ + Finished: true, + Progressed: false, + } + } + + if resp == finalRespWithErr { + *err = ErrResponseExistForQuery + return Progress{ + Finished: true, + Progressed: true, + } + } return Progress{ Finished: false, Progressed: false, @@ -233,6 +253,11 @@ func TestWorkerIgnoreMsgs(t *testing.T) { t.Fatalf("result's job index should not be different from task's") } + // Make sure job does not return as unfinished. + if result.unfinished { + t.Fatalf("got unfinished job") + } + // And the correct peer. if result.peer != ctx.peer { t.Fatalf("expected peer to be %v, was %v", @@ -300,6 +325,11 @@ func TestWorkerTimeout(t *testing.T) { ctx.peer.Addr(), result.peer) } + // Make sure job does not return as unfinished. + if result.unfinished { + t.Fatalf("got unfinished job") + } + // It will immediately attempt to fetch another task. select { case ctx.nextJob <- task: @@ -365,6 +395,11 @@ func TestWorkerDisconnect(t *testing.T) { ctx.peer.Addr(), result.peer) } + // Make sure job does not return as unfinished. + if result.unfinished { + t.Fatalf("got unfinished job") + } + // No more jobs should be accepted by the worker after it has exited. select { case ctx.nextJob <- task: @@ -393,70 +428,115 @@ func TestWorkerProgress(t *testing.T) { } // Create a task with a small timeout, and give it to the worker. - task := makeJob() - task.timeout = taskTimeout - - select { - case ctx.nextJob <- task: - case <-time.After(1 * time.Second): - t.Fatalf("did not pick up job") + type testResp struct { + name string + response *wire.MsgTx + err *error + unfinished bool } - // The request should be given to the peer. - select { - case <-ctx.peer.requests: - case <-time.After(time.Second): - t.Fatalf("request not sent") - } + testCases := []testResp{ - // Send a few other responses that indicates progress, but not success. - // We add a small delay between each time we send a response. In total - // the delay will be larger than the query timeout, but since we are - // making progress, the timeout won't trigger. - for i := 0; i < 5; i++ { - select { - case ctx.peer.responses <- progressResp: - case <-time.After(time.Second): - t.Fatalf("resp not received") - } + { + name: "final response", + response: finalResp, + }, - time.Sleep(taskTimeout / 2) - } + { + name: "final response, no progress", + response: noProgressNoFinalResp, + unfinished: true, + }, - // Finally send the final response. 
-	select {
-	case ctx.peer.responses <- finalResp:
-	case <-time.After(time.Second):
-		t.Fatalf("resp not received")
+		{
+			name:     "final response, with err",
+			response: finalRespWithErr,
+			err:      &ErrResponseExistForQuery,
+		},
 	}
 
-	// The worker should respond with a job finised.
-	var result *jobResult
-	select {
-	case result = <-ctx.jobResults:
-	case <-time.After(time.Second):
-		t.Fatalf("response not received")
-	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			task := makeJob()
+			task.timeout = taskTimeout
 
-	if result.err != nil {
-		t.Fatalf("expected no error, got: %v", result.err)
-	}
+			select {
+			case ctx.nextJob <- task:
+			case <-time.After(1 * time.Second):
+				t.Fatalf("did not pick up job")
+			}
 
-	// Make sure the QueryJob instance in the result is different from the initial one
-	// supplied to the worker
-	if result.job == task {
-		t.Fatalf("result's job should be different from the task's")
-	}
+			// The request should be given to the peer.
+			select {
+			case <-ctx.peer.requests:
+			case <-time.After(time.Second):
+				t.Fatalf("request not sent")
+			}
 
-	// Make sure we are receiving the corresponding result for the given task.
-	if result.job.Index() != task.Index() {
-		t.Fatalf("result's job index should not be different from task's")
-	}
+			// Send a few other responses that indicate progress,
+			// but not success. We add a small delay between each
+			// time we send a response. In total the delay will be
+			// larger than the query timeout, but since we are
+			// making progress, the timeout won't trigger.
+			for i := 0; i < 5; i++ {
+				select {
+				case ctx.peer.responses <- progressResp:
+				case <-time.After(time.Second):
+					t.Fatalf("resp not received")
+				}
 
-	// And the correct peer.
-	if result.peer != ctx.peer {
-		t.Fatalf("expected peer to be %v, was %v",
-			ctx.peer.Addr(), result.peer)
+				time.Sleep(taskTimeout / 2)
+			}
+
+			// Finally send the final response.
+			select {
+			case ctx.peer.responses <- tc.response:
+			case <-time.After(time.Second):
+				t.Fatalf("resp not received")
+			}
+
+			// The worker should respond with a job finished.
+			var result *jobResult
+			select {
+			case result = <-ctx.jobResults:
+			case <-time.After(time.Second):
+				t.Fatalf("response not received")
+			}
+
+			if tc.err == nil && result.err != nil {
+				t.Fatalf("expected no error, got: %v", result.err)
+			}
+
+			if tc.err != nil && result.err != *tc.err {
+				t.Fatalf("expected error %v, but got: %v", *tc.err,
+					result.err)
+			}
+
+			// Make sure the QueryJob instance in the result is
+			// different from the initial one supplied to the worker.
+			if result.job == task {
+				t.Fatalf("result's job should be different from task's")
+			}
+
+			// Make sure we are receiving the corresponding result for the given task.
+			if result.job.Index() != task.Index() {
+				t.Fatalf("result's job index should not be different from task's")
+			}
+
+			// And the correct peer.
+			if result.peer != ctx.peer {
+				t.Fatalf("expected peer to be %v, was %v",
+					ctx.peer.Addr(), result.peer)
+			}
+
+			// Make sure the result's unfinished flag matches the
+			// expectation for this case.
+			if tc.unfinished && !result.unfinished {
+				t.Fatalf("expected unfinished job but got finished job")
+			}
+
+			if !tc.unfinished && result.unfinished {
+				t.Fatalf("expected finished job but got unfinished job")
+			}
+		})
+	}
 }
@@ -532,6 +612,11 @@ func TestWorkerJobCanceled(t *testing.T) {
 		t.Fatalf("result's job index should not be different from task's")
 	}
 
+	// Make sure job does not return as unfinished.
+	if result.unfinished {
+		t.Fatalf("got unfinished job")
+	}
+
 	// And the correct peer.
 	if result.peer != ctx.peer {
 		t.Fatalf("expected peer to be %v, was %v",
diff --git a/query/workmanager.go b/query/workmanager.go
index 847f5311..2c95e1f5 100644
--- a/query/workmanager.go
+++ b/query/workmanager.go
@@ -26,7 +26,6 @@ var (
 type batch struct {
 	requests []*Request
 	options  *queryOptions
-	errChan  chan error
 }
 
 // Worker is the interface that must be satisfied by workers managed by the
@@ -94,6 +93,9 @@ type Config struct {
 	// Ranking is used to rank the connected peers when determining who to
 	// give work to.
 	Ranking PeerRanking
+
+	// IsEligibleWorkerFunc determines which peer is eligible to receive a job.
+	IsEligibleWorkerFunc func(r *activeWorker, next *queryJob) bool
 }
 
 // peerWorkManager is the main access point for outside callers, and satisfies
@@ -192,7 +194,9 @@ func (w *peerWorkManager) workDispatcher() {
 	// batches and send on their error channel.
 	defer func() {
 		for _, b := range currentBatches {
-			b.errChan <- ErrWorkManagerShuttingDown
+			if b.errChan != nil {
+				b.errChan <- ErrWorkManagerShuttingDown
+			}
 		}
 	}()
 
@@ -212,7 +216,7 @@ Loop:
 			next := work.Peek().(*queryJob)
 
 			// Find the peers with free work slots available.
-			var freeWorkers []string
+			var freeEligibleWorkers []string
 			for p, r := range workers {
 				// Only one active job at a time is currently
 				// supported.
@@ -220,15 +224,24 @@ Loop:
 					continue
 				}
 
-				freeWorkers = append(freeWorkers, p)
+				// If an eligibility function is specified, use
+				// it to determine which peers we can send jobs
+				// to.
+				if w.cfg.IsEligibleWorkerFunc != nil {
+					if !w.cfg.IsEligibleWorkerFunc(r, next) {
+						continue
+					}
+				}
+
+				freeEligibleWorkers = append(freeEligibleWorkers, p)
 			}
 
 			// Use the historical data to rank them.
-			w.cfg.Ranking.Order(freeWorkers)
+			w.cfg.Ranking.Order(freeEligibleWorkers)
 
 			// Give the job to the highest ranked peer with free
 			// slots available.
-			for _, p := range freeWorkers {
+			for _, p := range freeEligibleWorkers {
 				r := workers[p]
 
 				// The worker has free work slots, it should
@@ -322,13 +335,19 @@ Loop:
 				// batch's error channel. We do this since a
 				// cancellation applies to the whole batch.
 				if batch != nil {
-					batch.errChan <- result.err
+					if batch.errChan != nil {
+						batch.errChan <- result.err
+					}
 					delete(currentBatches, batchNum)
 
 					log.Debugf("Canceled batch %v", batchNum)
 					continue Loop
 				}
 
+			// If a response already exists for the query, continue the loop.
+			case result.err == ErrResponseExistForQuery:
+				log.Debugf("response already exists for query")
+				continue Loop
 
 			// If the query ended with any other error, put it back
 			// into the work queue if it has not reached the
@@ -356,7 +375,9 @@ Loop:
 
 					// Return the error and cancel the
 					// batch.
-					batch.errChan <- result.err
+					if batch.errChan != nil {
+						batch.errChan <- result.err
+					}
 					delete(currentBatches, batchNum)
 
 					log.Debugf("Canceled batch %v",
@@ -388,6 +409,19 @@ Loop:
 				// Reward the peer for the successful query.
 				w.cfg.Ranking.Reward(result.peer.Addr())
 
+				// If the result is unfinished, add 0.0005 to the job index
+				// to maintain its priority, then push it back onto the
+				// work queue.
+				if result.unfinished {
+					result.job.index = result.job.Index() + 0.0005
+					log.Debugf("job %v is unfinished, creating new index", result.job.Index())
+
+					heap.Push(work, result.job)
+					currentQueries[result.job.Index()] = batchNum
+
+					// Guard against a nil batch, which can
+					// happen if the batch was already
+					// canceled or timed out.
+					if batch != nil {
+						batch.rem++
+					}
+				} else {
+					log.Debugf("job %v is finished", result.job.Index())
+				}
+
 				// Decrement the number of queries remaining in
 				// the batch.
 				if batch != nil {
@@ -399,7 +433,9 @@ Loop:
 					// for this batch, we can notify that
 					// it finished, and delete it.
if batch.rem == 0 { - batch.errChan <- nil + if batch.errChan != nil { + batch.errChan <- nil + } delete(currentBatches, batchNum) log.Tracef("Batch %v done", @@ -414,7 +450,9 @@ Loop: if batch != nil { select { case <-batch.timeout: - batch.errChan <- ErrQueryTimeout + if batch.errChan != nil { + batch.errChan <- ErrQueryTimeout + } delete(currentBatches, batchNum) log.Warnf("Query(%d) failed with "+ @@ -463,7 +501,7 @@ Loop: maxRetries: batch.options.numRetries, timeout: time.After(batch.options.timeout), rem: len(batch.requests), - errChan: batch.errChan, + errChan: batch.options.errChan, } batchIndex++ @@ -482,18 +520,19 @@ func (w *peerWorkManager) Query(requests []*Request, qo := defaultQueryOptions() qo.applyQueryOptions(options...) - errChan := make(chan error, 1) + newBatch := &batch{ + requests: requests, + options: qo, + } // Add query messages to the queue of batches to handle. select { - case w.newBatches <- &batch{ - requests: requests, - options: qo, - errChan: errChan, - }: + case w.newBatches <- newBatch: case <-w.quit: - errChan <- ErrWorkManagerShuttingDown + if newBatch.options.errChan != nil { + newBatch.options.errChan <- ErrWorkManagerShuttingDown + } } - return errChan + return newBatch.options.errChan } diff --git a/query/workmanager_test.go b/query/workmanager_test.go index 940ea2d1..33116ac0 100644 --- a/query/workmanager_test.go +++ b/query/workmanager_test.go @@ -138,7 +138,7 @@ func TestWorkManagerWorkDispatcherSingleWorker(t *testing.T) { queries = append(queries, q) } - errChan := wm.Query(queries) + errChan := wm.Query(queries, ErrChan(make(chan error, 1))) wk := workers[0] @@ -226,7 +226,7 @@ func TestWorkManagerWorkDispatcherFailures(t *testing.T) { } // Send the batch, and Retrieve all jobs immediately. - errChan := wm.Query(queries[:]) + errChan := wm.Query(queries[:], ErrChan(make(chan error, 1))) var jobs [numQueries]sched for i := 0; i < numQueries; i++ { @@ -346,7 +346,7 @@ func TestWorkManagerErrQueryTimeout(t *testing.T) { } // Send the batch, and Retrieve all jobs immediately. - errChan := wm.Query(queries[:]) + errChan := wm.Query(queries[:], ErrChan(make(chan error, 1))) var iter int var s sched @@ -422,7 +422,7 @@ func TestWorkManagerCancelBatch(t *testing.T) { // Send the query, and include a channel to cancel the batch. cancelChan := make(chan struct{}) - errChan := wm.Query(queries, Cancel(cancelChan)) + errChan := wm.Query(queries, Cancel(cancelChan), ErrChan(make(chan error, 1))) // Respond with a result to half of the queries. for i := 0; i < numQueries/2; i++ { @@ -513,7 +513,7 @@ func TestWorkManagerWorkRankingScheduling(t *testing.T) { } // Send the batch, and Retrieve all jobs immediately. - errChan := wm.Query(queries) + errChan := wm.Query(queries, ErrChan(make(chan error, 1))) // The 4 first workers should get the job. var jobs []*queryJob @@ -576,7 +576,7 @@ func TestWorkManagerWorkRankingScheduling(t *testing.T) { } queries = append(queries, q) } - _ = wm.Query(queries) + _ = wm.Query(queries, ErrChan(make(chan error, 1))) // The new jobs should be scheduled on the even numbered workers. for i := 0; i < len(workers); i += 2 { @@ -645,7 +645,7 @@ func TestWorkManagerSchedulePriorityIndex(t *testing.T) { } // Send the batch, and Retrieve all jobs immediately. 
-	errChan := wm.Query(queries[:])
+	errChan := wm.Query(queries[:], ErrChan(make(chan error, 1)))
 
 	var jobs [numQueries]sched
 	for i := uint64(0); i < numQueries; i++ {
@@ -705,3 +705,275 @@ func TestWorkManagerSchedulePriorityIndex(t *testing.T) {
 		t.Fatalf("nothing received on errChan")
 	}
 }
+
+// TestPeerWorkManager_Stop tests the workmanager shutdown.
+func TestPeerWorkManager_Stop(t *testing.T) {
+	const numQueries = 5
+
+	wm, _ := startWorkManager(t, 0)
+
+	createRequest := func(numQuery int) []*Request {
+		var queries []*Request
+		for i := 0; i < numQuery; i++ {
+			q := &Request{
+				Req: &mockQueryEncoded{},
+			}
+			queries = append(queries, q)
+		}
+
+		return queries
+	}
+
+	// Send two batches: one with the ErrChan option and one without.
+	errChan := wm.Query(createRequest(numQueries), ErrChan(make(chan error, 1)))
+	errChan2 := wm.Query(createRequest(numQueries))
+
+	if errChan2 != nil {
+		t.Fatalf("expected Query call without ErrChan option to " +
+			"return nil errChan")
+	}
+
+	errChan3 := make(chan error, 1)
+	go func() {
+		err := wm.Stop()
+
+		errChan3 <- err
+	}()
+
+	select {
+	case <-errChan:
+	case <-time.After(time.Second):
+		t.Fatalf("expected ErrWorkManagerShuttingDown on errChan")
+	}
+
+	select {
+	case err := <-errChan3:
+		if err != nil {
+			t.Fatalf("unexpected error while stopping workmanager: %v", err)
+		}
+	case <-time.After(time.Second):
+		t.Fatalf("workmanager Stop did not return")
+	}
+}
+
+// TestWorkManagerErrResponseExistForQuery tests a scenario in which the
+// workmanager handles an ErrResponseExistForQuery.
+func TestWorkManagerErrResponseExistForQuery(t *testing.T) {
+	const numQueries = 5
+
+	// Start work manager with as many workers as queries. This is not very
+	// realistic, but makes the work manager able to schedule all queries
+	// concurrently.
+	wm, workers := startWorkManager(t, numQueries)
+
+	// When the jobs get scheduled, keep track of which worker was
+	// assigned the job.
+	type sched struct {
+		wk  *mockWorker
+		job *queryJob
+	}
+
+	// Schedule a batch of queries.
+	var (
+		queries       [numQueries]*Request
+		scheduledJobs [numQueries]chan sched
+	)
+	for i := 0; i < numQueries; i++ {
+		q := &Request{
+			Req: &mockQueryEncoded{},
+		}
+		queries[i] = q
+		scheduledJobs[i] = make(chan sched)
+	}
+
+	// For each worker, spin up a goroutine that will forward the job it
+	// got to our slice of scheduled jobs, such that we can handle them in
+	// order.
+	for i := 0; i < len(workers); i++ {
+		wk := workers[i]
+		go func() {
+			for {
+				job := <-wk.nextJob
+				scheduledJobs[int(job.index)] <- sched{
+					wk:  wk,
+					job: job,
+				}
+			}
+		}()
+	}
+
+	// Send the batch, and Retrieve all jobs immediately.
+	errChan := wm.Query(queries[:], ErrChan(make(chan error, 1)))
+	var jobs [numQueries]sched
+	for i := 0; i < numQueries; i++ {
+		var s sched
+		select {
+		case s = <-scheduledJobs[i]:
+			if s.job.index != float64(i) {
+				t.Fatalf("wrong index")
+			}
+
+		case <-errChan:
+			t.Fatalf("did not expect on errChan")
+		case <-time.After(time.Second):
+			t.Fatalf("next job not received")
+		}
+
+		jobs[int(s.job.index)] = s
+	}
+
+	// Go backwards, and make all of them return with an ErrResponseExistForQuery.
+	for i := numQueries - 1; i >= 0; i-- {
+		select {
+		case jobs[i].wk.results <- &jobResult{
+			job: jobs[i].job,
+			err: ErrResponseExistForQuery,
+		}:
+		case <-errChan:
+			t.Fatalf("did not expect on errChan")
+		case <-time.After(time.Second):
+			t.Fatalf("result not handled")
+		}
+	}
+
+	// Finally, make sure the failed jobs are not retried.
+	for i := 0; i < numQueries; i++ {
+		var s sched
+		select {
+		case s = <-scheduledJobs[i]:
+			t.Fatalf("did not expect any retried job, but "+
+				"job %v was retried", s.job.index)
+		case <-errChan:
+			t.Fatalf("did not expect on errChan")
+		case <-time.After(time.Second):
+		}
+	}
+}
+
+// TestWorkManagerResultUnfinished tests the workmanager handling a result
+// with its unfinished boolean set to true.
+func TestWorkManagerResultUnfinished(t *testing.T) {
+	const numQueries = 10
+
+	// Start work manager with as many workers as queries. This is not very
+	// realistic, but makes the work manager able to schedule all queries
+	// concurrently.
+	wm, workers := startWorkManager(t, numQueries)
+
+	// When the jobs get scheduled, keep track of which worker was
+	// assigned the job.
+	type sched struct {
+		wk  *mockWorker
+		job *queryJob
+	}
+
+	// Schedule a batch of queries.
+	var (
+		queries       [numQueries]*Request
+		scheduledJobs [numQueries]chan sched
+	)
+	for i := 0; i < numQueries; i++ {
+		q := &Request{
+			Req: &mockQueryEncoded{},
+		}
+		queries[i] = q
+		scheduledJobs[i] = make(chan sched)
+	}
+
+	// For each worker, spin up a goroutine that will forward the job it
+	// got to our slice of scheduled jobs, such that we can handle them in
+	// order.
+	for i := 0; i < len(workers); i++ {
+		wk := workers[i]
+		go func() {
+			for {
+				job := <-wk.nextJob
+				scheduledJobs[int(job.index)] <- sched{
+					wk:  wk,
+					job: job,
+				}
+			}
+		}()
+	}
+
+	// Send the batch, and Retrieve all jobs immediately.
+	errChan := wm.Query(queries[:], ErrChan(make(chan error, 1)))
+	var jobs [numQueries]sched
+	for i := 0; i < numQueries; i++ {
+		var s sched
+		select {
+		case s = <-scheduledJobs[i]:
+			if s.job.index != float64(i) {
+				t.Fatalf("wrong index")
+			}
+
+		case <-errChan:
+			t.Fatalf("did not expect on errChan")
+		case <-time.After(time.Second):
+			t.Fatalf("next job not received")
+		}
+
+		jobs[int(s.job.index)] = s
+	}
+
+	// Go backwards, and make half of them unfinished.
+	for i := numQueries - 1; i >= 0; i-- {
+		var (
+			unfinished bool
+		)
+		if i%2 == 0 {
+			unfinished = true
+		}
+
+		select {
+		case jobs[i].wk.results <- &jobResult{
+			job:        jobs[i].job,
+			unfinished: unfinished,
+		}:
+		case <-errChan:
+			t.Fatalf("did not expect on errChan")
+		case <-time.After(time.Second):
+			t.Fatalf("result not handled")
+		}
+	}
+
+	// Finally, make sure the unfinished jobs are being retried, in the
+	// same order as they were originally scheduled.
+	for i := 0; i < numQueries; i += 2 {
+		var s sched
+		select {
+		case s = <-scheduledJobs[i]:
+
+			// The new index the job should have.
+			idx := float64(i) + 0.0005
+			if idx != s.job.index {
+				t.Fatalf("expected index %v for job, "+
+					"but got %v", idx, s.job.index)
+			}
+		case <-errChan:
+			t.Fatalf("did not expect on errChan")
+		case <-time.After(time.Second):
+			t.Fatalf("next job not received")
+		}
+		select {
+		case s.wk.results <- &jobResult{
+			job: s.job,
+			err: nil,
+		}:
+		case <-errChan:
+			t.Fatalf("did not expect on errChan")
+		case <-time.After(time.Second):
+			t.Fatalf("result not handled")
+		}
+	}
+
+	// The query should ultimately succeed.
+ select { + case err := <-errChan: + if err != nil { + t.Fatalf("got error: %v", err) + } + case <-time.After(time.Second): + t.Fatalf("nothing received on errChan") + } +} diff --git a/query_test.go b/query_test.go index bb45deed..c01dba85 100644 --- a/query_test.go +++ b/query_test.go @@ -324,8 +324,10 @@ func TestBlockCache(t *testing.T) { Header: *header, Transactions: b.MsgBlock().Transactions, } - - progress := reqs[0].HandleResp(getData, resp, "") + var jobErr error + progress := reqs[0].HandleResp(&encodedQuery{ + message: getData, + }, resp, nil, &jobErr) require.True(t, progress.Progressed) require.True(t, progress.Finished)
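Usage note: since the error channel is now opt-in, Query only returns the
channel supplied through the ErrChan option and returns nil when the
option is omitted. A minimal caller-side sketch follows (runBatch, reqs
and quit are hypothetical names; query.Dispatcher, query.Cancel,
query.ErrChan and query.ErrWorkManagerShuttingDown are existing names in
this package):

    package example

    import "github.com/lightninglabs/neutrino/query"

    // runBatch sketches how a caller opts in to completion signalling
    // now that the error channel is a query option. The dispatcher,
    // reqs and quit parameters are assumptions of this example, not
    // part of the patch.
    func runBatch(dispatcher query.Dispatcher, reqs []*query.Request,
    	quit chan struct{}) error {

    	errChan := dispatcher.Query(
    		reqs, query.Cancel(quit),
    		query.ErrChan(make(chan error, 1)),
    	)

    	// Without the ErrChan option, Query returns a nil channel and
    	// a receive on it would block forever, so guard before waiting.
    	if errChan == nil {
    		return nil
    	}

    	select {
    	case err := <-errChan:
    		// A nil error means every request in the batch completed.
    		return err
    	case <-quit:
    		return query.ErrWorkManagerShuttingDown
    	}
    }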