From d983c7349586f45a6cdb4cd671766cdc629af615 Mon Sep 17 00:00:00 2001
From: Elle Mouton
Date: Wed, 10 Nov 2021 13:16:13 +0200
Subject: [PATCH 1/4] neutrino+query: use work dispatcher for GetBlock

Use the work dispatcher interface from the query package to make
getdata requests instead of using the old queryPeers function.
---
 neutrino.go   |   3 ++
 query.go      | 135 +++++++++++++++++++++++++++++---------------------
 query_test.go |  41 +++++++++------
 3 files changed, 108 insertions(+), 71 deletions(-)

diff --git a/neutrino.go b/neutrino.go
index 2e2abf6a..8df13ff0 100644
--- a/neutrino.go
+++ b/neutrino.go
@@ -597,6 +597,7 @@ type ChainService struct { // nolint:maligned
 	utxoScanner     *UtxoScanner
 	broadcaster     *pushtx.Broadcaster
 	banStore        banman.Store
+	queryDispatcher query.Dispatcher
 	workManager     *query.WorkManager
 
 	// peerSubscribers is a slice of active peer subscriptions, that we
@@ -679,6 +680,8 @@ func NewChainService(cfg Config) (*ChainService, error) {
 		Ranking: query.NewPeerRanking(),
 	})
 
+	s.queryDispatcher = s.workManager
+
 	// We set the queryPeers method to point to queryChainServicePeers,
 	// passing a reference to the newly created ChainService.
 	s.queryPeers = func(msg wire.Message, f func(*ServerPeer,
diff --git a/query.go b/query.go
index c4990d65..6388543e 100644
--- a/query.go
+++ b/query.go
@@ -7,6 +7,8 @@ import (
 	"sync"
 	"time"
 
+	"github.com/lightninglabs/neutrino/query"
+
 	"github.com/btcsuite/btcd/blockchain"
 	"github.com/btcsuite/btcd/chaincfg/chainhash"
 	"github.com/btcsuite/btcd/wire"
@@ -947,75 +949,96 @@ func (s *ChainService) GetBlock(blockHash chainhash.Hash,
 	getData := wire.NewMsgGetData()
 	_ = getData.AddInvVect(inv)
 
-	// The block is only updated from the checkResponse function argument,
-	// which is always called single-threadedly. We don't check the block
-	// until after the query is finished, so we can just write to it
-	// naively.
 	var foundBlock *btcutil.Block
-	s.queryPeers(
-		// Send a wire.GetDataMsg
-		getData,
-
-		// Check responses and if we get one that matches, end the
-		// query early.
-		func(sp *ServerPeer, resp wire.Message,
-			quit chan<- struct{}) {
-			switch response := resp.(type) {
-			// We're only interested in "block" messages.
-			case *wire.MsgBlock:
-				// Only keep this going if we haven't already
-				// found a block, or we risk closing an already
-				// closed channel.
-				if foundBlock != nil {
-					return
-				}
-
-				// If this isn't our block, ignore it.
-				if response.BlockHash() != blockHash {
-					return
-				}
-				block := btcutil.NewBlock(response)
-
-				// Only set height if btcutil hasn't
-				// automagically put one in.
-				if block.Height() == btcutil.BlockHeightUnknown {
-					block.SetHeight(int32(height))
-				}
-
+	request := &query.Request{
+		Req: getData,
+		HandleResp: func(req, resp wire.Message, peer string) query.Progress {
+			// The request must have been a "getdata" msg.
+			_, ok := req.(*wire.MsgGetData)
+			if !ok {
+				return query.Progress{
+					Finished:   false,
+					Progressed: false,
+				}
+			}
+
+			// We're only interested in "block" messages.
+			response, ok := resp.(*wire.MsgBlock)
+			if !ok {
+				return query.Progress{
+					Finished:   false,
+					Progressed: false,
+				}
+			}
+
+			// If this isn't the block we asked for, ignore it.
+			if response.BlockHash() != blockHash {
+				return query.Progress{
+					Finished:   false,
+					Progressed: false,
+				}
+			}
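The handler above reports per-response progress back to the work manager instead of closing a quit channel. A minimal sketch of the query API this patch builds on, as it can be inferred from the diff (the authoritative definitions live in neutrino's query package):

```go
// Dispatcher distributes a batch of requests amongst connected peers.
type Dispatcher interface {
	// Query returns an error channel that is signalled once the whole
	// batch of requests has finished or failed.
	Query(reqs []*Request, options ...QueryOption) chan error
}

// Progress is returned by a Request's response handler to tell the
// work manager how to proceed with the request.
type Progress struct {
	// Finished means this request needs no more responses.
	Finished bool

	// Progressed means the response moved the request forward.
	Progressed bool
}
```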
-				// If this claims our block but doesn't pass
-				// the sanity check, the peer is trying to
-				// bamboozle us. Disconnect it.
-				if err := blockchain.CheckBlockSanity(
-					block,
-					// We don't need to check PoW because
-					// by the time we get here, it's been
-					// checked during header
-					// synchronization
-					s.chainParams.PowLimit,
-					s.timeSource,
-				); err != nil {
-					log.Warnf("Invalid block for %s "+
-						"received from %s -- "+
-						"disconnecting peer", blockHash,
-						sp.Addr())
-					sp.Disconnect()
-					return
-				}
-
-				// TODO(roasbeef): modify CheckBlockSanity to
-				// also check witness commitment
-
-				// At this point, the block matches what we
-				// know about it and we declare it sane. We can
-				// kill the query and pass the response back to
-				// the caller.
-				foundBlock = block
-				close(quit)
-			default:
-			}
-		},
-		options...,
-	)
+			block := btcutil.NewBlock(response)
+
+			// Only set height if btcutil hasn't
+			// automagically put one in.
+			if block.Height() == btcutil.BlockHeightUnknown {
+				block.SetHeight(int32(height))
+			}
+
+			// If this claims our block but doesn't pass
+			// the sanity check, the peer is trying to
+			// bamboozle us.
+			if err := blockchain.CheckBlockSanity(
+				block,
+				// We don't need to check PoW because
+				// by the time we get here, it's been
+				// checked during header
+				// synchronization
+				s.chainParams.PowLimit,
+				s.timeSource,
+			); err != nil {
+				log.Warnf("Invalid block for %s "+
+					"received from %s: %v",
+					blockHash, peer, err)
+
+				return query.Progress{
+					Finished:   false,
+					Progressed: false,
+				}
+			}
+
+			// TODO(roasbeef): modify CheckBlockSanity to
+			// also check witness commitment
+
+			// At this point, the block matches what we
+			// know about it and we declare it sane. We can
+			// kill the query and pass the response back to
+			// the caller.
+			foundBlock = block
+			return query.Progress{
+				Finished:   true,
+				Progressed: true,
+			}
+		},
+	}
+
+	errChan := s.queryDispatcher.Query(
+		[]*query.Request{request}, query.Encoding(qo.encoding),
+		query.Cancel(s.quit),
+	)
+
+	select {
+	case err := <-errChan:
+		if err != nil {
+			return nil, err
+		}
+	case <-s.quit:
+		return nil, ErrShuttingDown
+	}
+
 	if foundBlock == nil {
 		return nil, fmt.Errorf("couldn't retrieve block %s from "+
 			"network", blockHash)
diff --git a/query_test.go b/query_test.go
index 378c83b6..8c6e7ca4 100644
--- a/query_test.go
+++ b/query_test.go
@@ -12,6 +12,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/lightninglabs/neutrino/query"
+
 	"github.com/btcsuite/btcd/blockchain"
 	"github.com/btcsuite/btcd/chaincfg"
 	"github.com/btcsuite/btcd/chaincfg/chainhash"
@@ -280,19 +282,27 @@ func TestBlockCache(t *testing.T) {
 		chainParams: chaincfg.Params{
 			PowLimit: maxPowLimit,
 		},
-		timeSource: blockchain.NewMedianTime(),
+		timeSource:      blockchain.NewMedianTime(),
+		queryDispatcher: &mockDispatcher{},
 	}
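The test wires a mockDispatcher into the ChainService and, below, swaps in a bespoke query function per test case. The concrete type is defined elsewhere in the test suite; a plausible minimal shape, assuming it simply delegates to a settable function field:

```go
// mockDispatcher implements the query.Dispatcher interface by
// delegating Query calls to a function field that each test can
// replace at will.
type mockDispatcher struct {
	query func(requests []*query.Request,
		options ...query.QueryOption) chan error
}

func (m *mockDispatcher) Query(requests []*query.Request,
	options ...query.QueryOption) chan error {

	return m.query(requests, options...)
}
```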
-	// We'll set up the queryPeers method to make sure we are only querying
-	// for blocks, and send the block hashes queried over the queries
-	// channel.
+	// We'll set up the mock query dispatcher to make sure we are only
+	// querying for blocks, and send the block hashes queried over the
+	// queries channel.
 	queries := make(chan chainhash.Hash, 1)
-	cs.queryPeers = func(msg wire.Message, f func(*ServerPeer,
-		wire.Message, chan<- struct{}), qo ...QueryOption) {
+	cs.queryDispatcher.(*mockDispatcher).query = func(requests []*query.Request,
+		options ...query.QueryOption) chan error {
+
+		errChan := make(chan error, 1)
+		defer close(errChan)
 
-		getData, ok := msg.(*wire.MsgGetData)
+		if len(requests) != 1 {
+			t.Fatalf("expected 1 request, got %d", len(requests))
+		}
+
+		getData, ok := requests[0].Req.(*wire.MsgGetData)
 		if !ok {
-			t.Fatalf("unexpected type: %T", msg)
+			t.Fatalf("unexpected type: %T", requests[0].Req)
 		}
 
 		if len(getData.InvList) != 1 {
@@ -308,18 +318,18 @@ func TestBlockCache(t *testing.T) {
 		// Serve the block that matches the requested block header.
 		for _, b := range blocks {
 			if *b.Hash() == inv.Hash {
+				header, _, err := headers.FetchHeader(b.Hash())
+				if err != nil {
+					t.Fatalf("unable to fetch header: %v",
+						err)
+				}
 
-				// Execute the callback with the found block,
-				// and wait for the quit channel to be closed.
-				quit := make(chan struct{})
-				f(nil, b.MsgBlock(), quit)
-
-				select {
-				case <-quit:
-				case <-time.After(1 * time.Second):
-					t.Fatalf("channel not closed")
+				resp := &wire.MsgBlock{
+					Header:       *header,
+					Transactions: b.MsgBlock().Transactions,
 				}
+				requests[0].HandleResp(requests[0].Req, resp, "")
 
+				// Notify the test about the query.
 				select {
 				case queries <- inv.Hash:
@@ -327,11 +337,12 @@ func TestBlockCache(t *testing.T) {
 					t.Fatalf("query was not handled")
 				}
 
-				return
+				return errChan
 			}
 		}
 
 		t.Fatalf("queried for unknown block: %v", inv.Hash)
+		return errChan
 	}
 
 	// fetchAndAssertPeersQueried calls GetBlock and makes sure the block

From fd76d6feefd0c259da3824846a83f3df47fb730c Mon Sep 17 00:00:00 2001
From: Elle Mouton
Date: Wed, 10 Nov 2021 13:22:32 +0200
Subject: [PATCH 2/4] neutrino+query: use work dispatcher for GetCFilter

Use the work dispatcher query interface instead of the old queryPeers
method for making getcfilter requests. This ensures that the queries
are made to the most responsive peers. With this PR we can also remove
the queryPeers function.
---
 neutrino.go |  12 --
 query.go    | 563 ++++++++++++++++++++++------------------------------
 2 files changed, 241 insertions(+), 334 deletions(-)

diff --git a/neutrino.go b/neutrino.go
index 8df13ff0..a4a017ec 100644
--- a/neutrino.go
+++ b/neutrino.go
@@ -575,11 +575,6 @@ type ChainService struct { // nolint:maligned
 	FilterCache *lru.Cache
 	BlockCache  *lru.Cache
 
-	// queryPeers will be called to send messages to one or more peers,
-	// expecting a response.
-	queryPeers func(wire.Message, func(*ServerPeer, wire.Message,
-		chan<- struct{}), ...QueryOption)
-
 	chainParams chaincfg.Params
 	addrManager *addrmgr.AddrManager
 	connManager *connmgr.ConnManager
@@ -682,13 +677,6 @@ func NewChainService(cfg Config) (*ChainService, error) {
 
 	s.queryDispatcher = s.workManager
 
-	// We set the queryPeers method to point to queryChainServicePeers,
-	// passing a reference to the newly created ChainService.
-	s.queryPeers = func(msg wire.Message, f func(*ServerPeer,
-		wire.Message, chan<- struct{}), qo ...QueryOption) {
-		queryChainServicePeers(&s, msg, f, qo...)
-	}
-
 	var err error
 
 	s.FilterDB, err = filterdb.New(cfg.Database, cfg.ChainParams)
diff --git a/query.go b/query.go
index 6388543e..99d1b98b 100644
--- a/query.go
+++ b/query.go
@@ -335,168 +335,6 @@ checkResponses:
 	}
 }
 
-// queryChainServicePeers is a helper function that sends a query to one or
-// more peers of the given ChainService, and waits for an answer.
-// The timeout for queries is set by the QueryTimeout package-level
-// variable or the Timeout functional option.
-func queryChainServicePeers(
-	// s is the ChainService to use.
-	s *ChainService,
-
-	// queryMsg is the message to send to each peer selected by selectPeer.
-	queryMsg wire.Message,
-
-	// checkResponse is called for every message within the timeout period.
-	// The quit channel lets the query know to terminate because the
-	// required response has been found. This is done by closing the
-	// channel.
-	checkResponse func(sp *ServerPeer, resp wire.Message,
-		quit chan<- struct{}),
-
-	// options takes functional options for executing the query.
-	options ...QueryOption) {
-
-	// Starting with the set of default options, we'll apply any specified
-	// functional options to the query.
-	qo := defaultQueryOptions()
-	qo.applyQueryOptions(options...)
-
-	// We get an initial view of our peers, to be updated each time a peer
-	// query times out.
-	queryPeer := s.blockManager.SyncPeer()
-	peerTries := make(map[string]uint8)
-
-	// This will be state used by the peer query goroutine.
-	queryQuit := make(chan struct{})
-	subQuit := make(chan struct{})
-
-	// Increase this number to be able to handle more queries at once as
-	// each channel gets results for all queries, otherwise messages can
-	// get mixed and there's a vicious cycle of retries causing a bigger
-	// message flood, more of which get missed.
-	msgChan := make(chan spMsg)
-	subscription := spMsgSubscription{
-		msgChan:  msgChan,
-		quitChan: subQuit,
-	}
-
-	// Loop for any messages sent to us via our subscription channel and
-	// check them for whether they satisfy the query. Break the loop if
-	// it's time to quit.
-	peerTimeout := time.NewTimer(qo.timeout)
-	connectionTimeout := time.NewTimer(qo.peerConnectTimeout)
-	connectionTicker := connectionTimeout.C
-	if queryPeer != nil {
-		peerTries[queryPeer.Addr()]++
-		queryPeer.subscribeRecvMsg(subscription)
-		queryPeer.QueueMessageWithEncoding(queryMsg, nil, qo.encoding)
-	}
-checkResponses:
-	for {
-		select {
-		case <-connectionTicker:
-			// When we time out, we're done.
-			if queryPeer != nil {
-				queryPeer.unsubscribeRecvMsgs(subscription)
-			}
-			break checkResponses
-
-		case <-queryQuit:
-			// Same when we get a quit signal.
-			if queryPeer != nil {
-				queryPeer.unsubscribeRecvMsgs(subscription)
-			}
-			break checkResponses
-
-		case <-s.quit:
-			// Same when chain server's quit is signaled.
-			if queryPeer != nil {
-				queryPeer.unsubscribeRecvMsgs(subscription)
-			}
-			break checkResponses
-
-		// A message has arrived over the subscription channel, so we
-		// execute the checkResponses callback to see if this ends our
-		// query session.
-		case sm := <-msgChan:
-			// TODO: This will get stuck if checkResponse gets
-			// stuck. This is a caveat for callers that should be
-			// fixed before exposing this function for public use.
-			checkResponse(sm.sp, sm.msg, queryQuit)
-
-			// Each time we receive a response from the current
-			// peer, we'll reset the main peer timeout as they're
-			// being responsive.
-			if !peerTimeout.Stop() {
-				select {
-				case <-peerTimeout.C:
-				default:
-				}
-			}
-			peerTimeout.Reset(qo.timeout)
-
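The Stop-drain-Reset sequence in the removed code above is the standard idiom for safely reusing a time.Timer across loop iterations. A standalone sketch of the same idiom:

```go
// resetTimer safely re-arms t for duration d. Stop reports false if
// the timer already fired, in which case the channel must be drained
// before Reset, otherwise a stale tick can wake the loop immediately.
func resetTimer(t *time.Timer, d time.Duration) {
	if !t.Stop() {
		select {
		case <-t.C: // Drain the stale tick.
		default: // Nothing buffered; it was already received.
		}
	}
	t.Reset(d)
}
```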
-			// Also at this point, if the peerConnectTimeout is
-			// still active, then we can disable it, as we're
-			// receiving responses from the current peer.
-			if connectionTicker != nil && !connectionTimeout.Stop() {
-				select {
-				case <-connectionTimeout.C:
-				default:
-				}
-			}
-			connectionTicker = nil
-
-		// The current peer we're querying has failed to answer the
-		// query. Time to select a new peer and query it.
-		case <-peerTimeout.C:
-			if queryPeer != nil {
-				queryPeer.unsubscribeRecvMsgs(subscription)
-			}
-
-			queryPeer = nil
-			for _, peer := range s.Peers() {
-				// If the peer is no longer connected, we'll
-				// skip them.
-				if !peer.Connected() {
-					continue
-				}
-
-				// If we've yet to try this peer, we'll make
-				// sure to do so. If we've exceeded the number
-				// of tries we should retry this peer, then
-				// we'll skip them.
-				numTries, ok := peerTries[peer.Addr()]
-				if ok && numTries >= qo.numRetries {
-					continue
-				}
-
-				queryPeer = peer
-
-				// Found a peer we can query.
-				peerTries[queryPeer.Addr()]++
-				queryPeer.subscribeRecvMsg(subscription)
-				queryPeer.QueueMessageWithEncoding(
-					queryMsg, nil, qo.encoding,
-				)
-				break
-			}
-
-			// If at this point, we don't yet have a query peer,
-			// then we'll exit now as all the peers are exhausted.
-			if queryPeer == nil {
-				break checkResponses
-			}
-		}
-	}
-
-	// Close the subscription quit channel and the done channel, if any.
-	close(subQuit)
-	peerTimeout.Stop()
-	if qo.doneChan != nil {
-		close(qo.doneChan)
-	}
-}
-
 // getFilterFromCache returns a filter from ChainService's FilterCache if it
 // exists, returning nil and error if it doesn't.
 func (s *ChainService) getFilterFromCache(blockHash *chainhash.Hash,
@@ -529,29 +367,153 @@ func (s *ChainService) putFilterToCache(blockHash *chainhash.Hash,
 // cfiltersQuery is a struct that holds all the information necessary to
 // perform batch GetCFilters request, and handle the responses.
 type cfiltersQuery struct {
-	filterType    wire.FilterType
-	startHeight   int64
-	stopHeight    int64
-	stopHash      *chainhash.Hash
-	filterHeaders []chainhash.Hash
-	headerIndex   map[chainhash.Hash]int
-	targetHash    chainhash.Hash
-	filterChan    chan *gcs.Filter
-	options       []QueryOption
+	filterType       wire.FilterType
+	startHeight      int64
+	stopHeight       int64
+	stopHash         *chainhash.Hash
+	msg              wire.Message
+	filterHeaders    []chainhash.Hash
+	headerIndex      map[chainhash.Hash]int
+	filterChan       chan *filterResponse
+	targetHash       chainhash.Hash
+	targetFilterChan chan *gcs.Filter
 }
 
-// queryMsg returns the wire message to perform this query.
-func (q *cfiltersQuery) queryMsg() wire.Message {
-	return wire.NewMsgGetCFilters(
-		q.filterType, uint32(q.startHeight), q.stopHash,
+// request couples a query message with the handler to be used for the
+// response in a query.Request struct.
+func (q *cfiltersQuery) request() *query.Request {
+	return &query.Request{
+		Req:        q.msg,
+		HandleResp: q.handleResponse,
+	}
+}
+
+// handleResponse validates that the cfilter response we get from a peer is
+// sane given the getcfilter query that we made.
+func (q *cfiltersQuery) handleResponse(req, resp wire.Message,
+	_ string) query.Progress {
+
+	// The request must have been a "getcfilters" msg.
+	request, ok := req.(*wire.MsgGetCFilters)
+	if !ok {
+		return query.Progress{
+			Finished:   false,
+			Progressed: false,
+		}
+	}
+
+	// We're only interested in "cfilter" messages.
+	response, ok := resp.(*wire.MsgCFilter)
+	if !ok {
+		return query.Progress{
+			Finished:   false,
+			Progressed: false,
+		}
+	}
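Every early exit of handleResponse returns the same zero-value Progress literal. A tiny helper (hypothetical, not part of this patch) would keep those exits terse:

```go
// noProgress is the Progress value for a response that neither
// completes nor advances the query. (Hypothetical helper; the patch
// spells the literal out at each return site instead.)
func noProgress() query.Progress {
	return query.Progress{
		Finished:   false,
		Progressed: false,
	}
}
```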
+	// If the request filter type doesn't match the type we were expecting,
+	// ignore this message.
+	if q.filterType != request.FilterType {
+		return query.Progress{
+			Finished:   false,
+			Progressed: false,
+		}
+	}
+
+	// If the response filter type doesn't match what we were expecting,
+	// ignore this message.
+	if q.filterType != response.FilterType {
+		return query.Progress{
+			Finished:   false,
+			Progressed: false,
+		}
+	}
+
+	// If this filter is for a block not in our index, we can ignore it, as
+	// we either already got it, or it is out of our queried range.
+	i, ok := q.headerIndex[response.BlockHash]
+	if !ok {
+		return query.Progress{
+			Finished:   false,
+			Progressed: false,
+		}
+	}
+
+	filter, err := gcs.FromNBytes(
+		builder.DefaultP, builder.DefaultM, response.Data,
 	)
+	if err != nil {
+		// Malformed filter data. We can ignore this message.
+		return query.Progress{
+			Finished:   false,
+			Progressed: false,
+		}
+	}
+
+	// Now that we have a proper filter, ensure that re-calculating the
+	// filter header hash for the header _after_ the filter in the chain
+	// checks out. If not, we can ignore this response.
+	curHeader := q.filterHeaders[i]
+	prevHeader := q.filterHeaders[i-1]
+	filterHeader, err := builder.MakeHeaderForFilter(filter, prevHeader)
+	if err != nil {
+		return query.Progress{
+			Finished:   false,
+			Progressed: false,
+		}
+	}
+
+	if filterHeader != curHeader {
+		return query.Progress{
+			Finished:   false,
+			Progressed: false,
+		}
+	}
+
+	// At this point the filter matches what we know about it, and we
+	// declare it sane. We send it into a channel to be processed
+	// elsewhere.
+	q.filterChan <- &filterResponse{
+		blockHash: &response.BlockHash,
+		filter:    filter,
+	}
+
+	if response.BlockHash == q.targetHash {
+		q.targetFilterChan <- filter
+	}
+
+	// We delete the entry for this filter from the headerIndex to
+	// indicate that we have received it.
+	delete(q.headerIndex, response.BlockHash)
+
+	// If there are still entries left in the headerIndex then the query
+	// has made progress but has not yet completed.
+	if len(q.headerIndex) != 0 {
+		return query.Progress{
+			Finished:   false,
+			Progressed: true,
+		}
+	}
+
+	// The headerIndex is empty and so this query is complete.
+	close(q.filterChan)
+	close(q.targetFilterChan)
+	return query.Progress{
+		Finished:   true,
+		Progressed: true,
+	}
+}
+
+// filterResponse links a filter with its associated block hash.
+type filterResponse struct {
+	blockHash *chainhash.Hash
+	filter    *gcs.Filter
 }
 
 // prepareCFiltersQuery creates a cfiltersQuery that can be used to fetch a
 // CFilter for the given block hash.
 func (s *ChainService) prepareCFiltersQuery(blockHash chainhash.Hash,
-	filterType wire.FilterType, options ...QueryOption) (
-	*cfiltersQuery, error) {
+	filterType wire.FilterType, optimisticBatch optimisticBatchType,
+	maxBatchSize int64) (*cfiltersQuery, error) {
 
 	_, height, err := s.BlockHeaders.FetchHeader(&blockHash)
 	if err != nil {
@@ -565,9 +527,6 @@ func (s *ChainService) prepareCFiltersQuery(blockHash chainhash.Hash,
 	}
 	bestHeight := int64(bestBlock.Height)
 
-	qo := defaultQueryOptions()
-	qo.applyQueryOptions(options...)
-
 	// If the query specifies an optimistic batch we will attempt to fetch
 	// the maximum number of filters, which is defaulted to
 	// wire.MaxGetCFiltersReqRange, in anticipation of calls for the
 	// following filters.
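The header comparison in handleResponse above is the heart of filter verification: hashing the received filter on top of the previous committed header must reproduce the next one. The same logic, condensed into a standalone sketch using the gcs/builder APIs exactly as the diff does (not a drop-in from this patch):

```go
// checkFilter decodes a serialized filter and verifies it against the
// committed filter-header chain.
func checkFilter(data []byte, prevHeader,
	curHeader chainhash.Hash) (*gcs.Filter, bool) {

	filter, err := gcs.FromNBytes(
		builder.DefaultP, builder.DefaultM, data,
	)
	if err != nil {
		// Malformed filter data.
		return nil, false
	}

	header, err := builder.MakeHeaderForFilter(filter, prevHeader)
	if err != nil {
		return nil, false
	}

	// The filter is only valid if it hashes to the committed header.
	return filter, header == curHeader
}
```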
 	// If the query specifies a maximum batch size, we will limit the
 	// number of requested filters accordingly.
-	if qo.maxBatchSize > 0 && qo.maxBatchSize < wire.MaxGetCFiltersReqRange {
-		batchSize = qo.maxBatchSize
+	if maxBatchSize > 0 && maxBatchSize < wire.MaxGetCFiltersReqRange {
+		batchSize = maxBatchSize
 	}
 
-	switch qo.optimisticBatch {
+	switch optimisticBatch {
 	// No batching, the start and stop height will be the same.
 	case noBatch:
@@ -593,18 +552,10 @@ func (s *ChainService) prepareCFiltersQuery(blockHash chainhash.Hash,
 		startHeight = int64(height)
 		stopHeight = startHeight + batchSize - 1
 
-		// We need a longer timeout, since we are going to receive more
-		// than a single response.
-		options = append(options, Timeout(QueryBatchTimeout))
-
 	// Reverse batch, fetch as many of the preceding filters as possible.
 	case reverseBatch:
 		stopHeight = int64(height)
 		startHeight = stopHeight - batchSize + 1
-
-		// We need a longer timeout, since we are going to receive more
-		// than a single response.
-		options = append(options, Timeout(QueryBatchTimeout))
 	}
 
 	// Block 1 is the earliest one we can fetch.
@@ -624,6 +575,10 @@ func (s *ChainService) prepareCFiltersQuery(blockHash chainhash.Hash,
 			"stopHeight=%d: %v", stopHeight, err)
 	}
 
+	queryMsg := wire.NewMsgGetCFilters(
+		filterType, uint32(startHeight), stopHash,
+	)
+
 	// In order to verify the authenticity of the received filters, we'll
 	// fetch the block headers and filter headers in the range
 	// [startHeight-1, stopHeight]. We go one below our startHeight since
@@ -675,118 +630,19 @@ func (s *ChainService) prepareCFiltersQuery(blockHash chainhash.Hash,
 	filterChan := make(chan *gcs.Filter, 1)
 
 	return &cfiltersQuery{
-		filterType:    filterType,
-		startHeight:   startHeight,
-		stopHeight:    stopHeight,
-		stopHash:      stopHash,
-		filterHeaders: filterHeaders,
-		headerIndex:   headerIndex,
-		targetHash:    blockHash,
-		filterChan:    filterChan,
-		options:       options,
+		filterType:       filterType,
+		startHeight:      startHeight,
+		stopHeight:       stopHeight,
+		stopHash:         stopHash,
+		msg:              queryMsg,
+		filterHeaders:    filterHeaders,
+		targetHash:       blockHash,
+		headerIndex:      headerIndex,
+		filterChan:       make(chan *filterResponse, numFilters),
+		targetFilterChan: filterChan,
 	}, nil
 }
 
-// handleCFiltersResponse is called every time we receive a response for the
-// GetCFilters request.
-func (s *ChainService) handleCFiltersResponse(q *cfiltersQuery,
-	resp wire.Message, quit chan<- struct{}) {
-
-	// We're only interested in "cfilter" messages.
-	response, ok := resp.(*wire.MsgCFilter)
-	if !ok {
-		return
-	}
-
-	// If the response doesn't match our request, ignore this message.
-	if q.filterType != response.FilterType {
-		return
-	}
-
-	// If this filter is for a block not in our index, we can ignore it, as
-	// we either already got it, or it is out of our queried range.
-	i, ok := q.headerIndex[response.BlockHash]
-	if !ok {
-		return
-	}
-
-	gotFilter, err := gcs.FromNBytes(
-		builder.DefaultP, builder.DefaultM, response.Data,
-	)
-	if err != nil {
-		// Malformed filter data. We can ignore this message.
-		return
-	}
-
-	// Now that we have a proper filter, ensure that re-calculating the
-	// filter header hash for the header _after_ the filter in the chain
-	// checks out. If not, we can ignore this response.
-	curHeader := q.filterHeaders[i]
-	prevHeader := q.filterHeaders[i-1]
-	gotHeader, err := builder.MakeHeaderForFilter(
-		gotFilter, prevHeader,
-	)
-	if err != nil {
-		return
-	}
-
-	if gotHeader != curHeader {
-		return
-	}
-
-	// At this point, the filter matches what we know about it and we
-	// declare it sane.
-	// If this is the filter requested initially, send it to the caller
-	// immediately.
-	if response.BlockHash == q.targetHash {
-		q.filterChan <- gotFilter
-	}
-
-	// Put the filter in the cache and persistToDisk if the caller
-	// requested it.
-	// TODO(halseth): for an LRU we could take care to insert the next
-	// height filter last.
-	dbFilterType := filterdb.RegularFilter
-	evict, err := s.putFilterToCache(
-		&response.BlockHash, dbFilterType, gotFilter,
-	)
-	if err != nil {
-		log.Warnf("Couldn't write filter to cache: %v", err)
-	}
-
-	// TODO(halseth): dynamically increase/decrease the batch size to match
-	// our cache capacity.
-	numFilters := q.stopHeight - q.startHeight + 1
-	if evict && s.FilterCache.Len() < int(numFilters) {
-		log.Debugf("Items evicted from the cache with less "+
-			"than %d elements. Consider increasing the "+
-			"cache size...", numFilters)
-	}
-
-	qo := defaultQueryOptions()
-	qo.applyQueryOptions(q.options...)
-	if s.persistToDisk {
-		err = s.FilterDB.PutFilter(
-			&response.BlockHash, gotFilter, dbFilterType,
-		)
-		if err != nil {
-			log.Warnf("Couldn't write filter to filterDB: "+
-				"%v", err)
-		}
-
-		log.Tracef("Wrote filter for block %s, type %d",
-			&response.BlockHash, dbFilterType)
-	}
-
-	// Finally, we can delete it from the headerIndex.
-	delete(q.headerIndex, response.BlockHash)
-
-	// If the headerIndex is empty, we got everything we wanted, and can
-	// exit.
-	if len(q.headerIndex) == 0 {
-		close(quit)
-	}
-}
-
 // GetCFilter gets a cfilter from the database. Failing that, it requests the
 // cfilter from the network and writes it to the database. If extended is true,
 // an extended filter will be queried for. Otherwise, we'll fetch the regular
@@ -840,9 +696,14 @@ func (s *ChainService) GetCFilter(blockHash chainhash.Hash,
 		return nil, err
 	}
 
+	qo := defaultQueryOptions()
+	qo.applyQueryOptions(options...)
+
 	// We didn't get the filter from the DB, so we'll try to get it from
 	// the network.
-	query, err := s.prepareCFiltersQuery(blockHash, filterType, options...)
+	q, err := s.prepareCFiltersQuery(
+		blockHash, filterType, qo.optimisticBatch, qo.maxBatchSize,
+	)
 	if err != nil {
 		s.mtxCFilter.Unlock()
 		return nil, err
@@ -851,31 +712,89 @@ func (s *ChainService) GetCFilter(blockHash chainhash.Hash,
 	// With all the necessary items retrieved, we'll launch our concurrent
 	// query to the set of connected peers.
 	log.Debugf("Fetching filters for heights=[%v, %v], stophash=%v",
-		query.startHeight, query.stopHeight, query.stopHash)
+		q.startHeight, q.stopHeight, q.stopHash)
 
 	go func() {
 		defer s.mtxCFilter.Unlock()
-		defer close(query.filterChan)
-
-		s.queryPeers(
-			// Send a wire.MsgGetCFilters.
-			query.queryMsg(),
-
-			// Check responses and if we get one that matches, end
-			// the query early.
-			func(_ *ServerPeer, resp wire.Message, quit chan<- struct{}) {
-				s.handleCFiltersResponse(query, resp, quit)
-			},
-			query.options...,
-		)
+
+		// Hand the query to the work manager, and consume the verified
+		// responses as they come back.
+		errChan := s.queryDispatcher.Query(
+			[]*query.Request{q.request()}, query.Cancel(s.quit),
+			query.Encoding(qo.encoding),
+		)
+
+		var (
+			resp *filterResponse
+			ok   bool
+		)
 
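From the caller's perspective nothing changes with this refactor. A usage sketch, assuming an initialised *ChainService named cs and the OptimisticBatch query option exposed elsewhere in this package:

```go
// Fetch the regular filter for a block, hinting that filters for the
// following blocks will likely be wanted too so they can be batched.
filter, err := cs.GetCFilter(
	blockHash, wire.GCSFilterRegular, neutrino.OptimisticBatch(),
)
if err != nil {
	log.Errorf("unable to fetch cfilter for %s: %v", blockHash, err)
	return
}
```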
-		// If there are elements left to receive, the query failed.
-		if len(query.headerIndex) > 0 {
-			numFilters := query.stopHeight - query.startHeight + 1
-			numRecv := numFilters - int64(len(query.headerIndex))
-			log.Errorf("Query failed with %d out of %d filters "+
-				"received", numRecv, numFilters)
-			return
+		for {
+			select {
+			case resp, ok = <-q.filterChan:
+				if !ok {
+					// If filterChan is closed then the
+					// query has finished successfully.
+					return
+				}
+
+			case err := <-errChan:
+				switch {
+				case err == query.ErrWorkManagerShuttingDown:
+					return
+
+				case err != nil:
+					log.Errorf("Query finished with "+
+						"error before all responses "+
+						"received: %v", err)
+					return
+				}
+
+				// The query did finish successfully, but
+				// continue to allow picking up the last filter
+				// sent on the filterChan.
+				continue
+
+			case <-s.quit:
+				return
+			}
+
+			// Put the filter in the cache and persistToDisk if the
+			// caller requested it.
+			// TODO(halseth): for an LRU we could take care to
+			// insert the next height filter last.
+			dbFilterType := filterdb.RegularFilter
+			evict, err := s.putFilterToCache(
+				resp.blockHash, dbFilterType, resp.filter,
+			)
+			if err != nil {
+				log.Warnf("Couldn't write filter to cache: %v",
+					err)
+			}
+
+			// TODO(halseth): dynamically increase/decrease the
+			// batch size to match our cache capacity.
+			numFilters := q.stopHeight - q.startHeight + 1
+			if evict && s.FilterCache.Len() < int(numFilters) {
+				log.Debugf("Items evicted from the cache "+
+					"with less than %d elements. Consider "+
+					"increasing the cache size...",
+					numFilters)
+			}
+
+			if s.persistToDisk {
+				err = s.FilterDB.PutFilter(
+					resp.blockHash, resp.filter,
+					dbFilterType,
+				)
+				if err != nil {
+					log.Warnf("Couldn't write filter to "+
+						"filterDB: %v", err)
+				}
+
+				log.Tracef("Wrote filter for block %s, type %d",
+					resp.blockHash, dbFilterType)
+			}
 		}
 	}()
 
@@ -887,7 +806,7 @@ func (s *ChainService) GetCFilter(blockHash chainhash.Hash,
 
 	for {
 		select {
-		case filter, ok = <-query.filterChan:
+		case filter, ok = <-q.targetFilterChan:
 			if !ok {
 				// Query has finished, if we have a result we'll
 				// return it.

From 2b2c363e64c69c5d052c53589a666b19138a720c Mon Sep 17 00:00:00 2001
From: Elle Mouton
Date: Thu, 11 Nov 2021 10:09:00 +0200
Subject: [PATCH 3/4] query: separate goroutine for filter db writes

Let the GetCFilter function return, and hence unlock the mutex, as soon
as it is done writing filters to the cache, and then let the writing to
the DB happen in a separate goroutine. This greatly improves the speed
at which filters can be downloaded, since the bottleneck in this
operation is writing the filters to the DB.
---
 query.go | 34 +++++++++++++++++++++++++++++---
 1 file changed, 31 insertions(+), 3 deletions(-)

diff --git a/query.go b/query.go
index 99d1b98b..d598fdf2 100644
--- a/query.go
+++ b/query.go
@@ -714,7 +714,9 @@ func (s *ChainService) GetCFilter(blockHash chainhash.Hash,
 	log.Debugf("Fetching filters for heights=[%v, %v], stophash=%v",
 		q.startHeight, q.stopHeight, q.stopHash)
 
+	persistChan := make(chan *filterResponse, len(q.headerIndex))
 	go func() {
+		defer close(persistChan)
 		defer s.mtxCFilter.Unlock()
 
 		// Hand the query to the work manager, and consume the verified
@@ -782,7 +784,33 @@ func (s *ChainService) GetCFilter(blockHash chainhash.Hash,
 					numFilters)
 			}
 
-			if s.persistToDisk {
+			persistChan <- resp
+		}
+	}()
+
+	if s.persistToDisk {
+		// Persisting to disk is the bottleneck for fetching filters.
+		// So we run the persisting logic in a separate goroutine so
+		// that we can unlock the mtxCFilter mutex as soon as we are
+		// done with caching the filters in order to allow more
+		// GetCFilter calls from the caller sooner.
+		go func() {
+			var (
+				resp *filterResponse
+				ok   bool
+				err  error
+			)
+
+			// The caching loop above always handles regular
+			// filters, so that is also what we persist here.
+			dbFilterType := filterdb.RegularFilter
+
+			for {
+				select {
+				case resp, ok = <-persistChan:
+					if !ok {
+						return
+					}
+
+				case <-s.quit:
+					return
+				}
+
 				err = s.FilterDB.PutFilter(
 					resp.blockHash, resp.filter,
 					dbFilterType,
 				)
 				if err != nil {
 					log.Warnf("Couldn't write filter to "+
 						"filterDB: %v", err)
 				}
 
 				log.Tracef("Wrote filter for block %s, type %d",
 					resp.blockHash, dbFilterType)
 			}
-		}
-	}()
+		}()
+	}
 
 	var ok bool
 	var resultFilter *gcs.Filter

From 0e994313242d67aace15ecfd4f12e3eec2ac5070 Mon Sep 17 00:00:00 2001
From: Elle Mouton
Date: Thu, 11 Nov 2021 10:10:12 +0200
Subject: [PATCH 4/4] rescan: wait till current or end height reached

Let the rescan function wait until the filter headers have either
caught up to the backend chain or until they have caught up to the
specified rescan end block. This lets the rescan operation take
advantage of doing batch filter fetching during rescan, making the
operation a lot faster since filters can be fetched in batches of 1000
instead of one at a time.
---
 rescan.go | 97 ++++++++++++++++++++++++++++++++++++++++++------------
 1 file changed, 75 insertions(+), 22 deletions(-)

diff --git a/rescan.go b/rescan.go
index ba373d73..45463aca 100644
--- a/rescan.go
+++ b/rescan.go
@@ -332,7 +332,7 @@ func rescan(chain ChainSource, options ...RescanOption) error {
 			}
 		}
 
-		// If the ending hash it nil, then check to see if the target
+		// If the ending hash is nil, then check to see if the target
 		// height is non-nil. If not, then we'll use this to find the
 		// stopping hash.
 		if (ro.endBlock.Hash == chainhash.Hash{}) {
@@ -409,19 +409,24 @@ func rescan(chain ChainSource, options ...RescanOption) error {
 	// begin processing updates from the client.
 	var updates []*updateOptions
 
-	// We'll need to ensure that the backing chain has actually caught up to
-	// the rescan's starting height.
-	bestBlock, err := chain.BestBlock()
-	if err != nil {
-		return err
-	}
+	// waitFor is a helper closure that can be used to wait on block
+	// notifications until the given predicate returns true.
+	waitFor := func(predicate func(hash chainhash.Hash,
+		height uint32) bool) error {
+
+		bestBlock, err := chain.BestBlock()
+		if err != nil {
+			return err
+		}
+
+		// Before subscribing to block notifications, first check if
+		// the predicate is not already satisfied by the current best
+		// block.
+		if predicate(bestBlock.Hash, uint32(bestBlock.Height)) {
+			return nil
+		}
 
-	// If it hasn't, we'll subscribe for block notifications at tip and wait
-	// until we receive a notification for a block with the rescan's
-	// starting height.
-	if bestBlock.Height < curStamp.Height {
-		log.Debugf("Waiting to catch up to the rescan start height=%d "+
-			"from height=%d", curStamp.Height, bestBlock.Height)
+		log.Debugf("Waiting for blocks. "+
Starting from height=%d", + bestBlock.Height) blockSubscription, err := chain.Subscribe( uint32(bestBlock.Height), @@ -430,7 +435,9 @@ func rescan(chain ChainSource, options ...RescanOption) error { return err } - waitUntilSynced: + defer blockSubscription.Cancel() + + waitForBlocks: for { select { // We'll make sure to process any updates while we're @@ -449,23 +456,24 @@ func rescan(chain ChainSource, options ...RescanOption) error { "while waiting to catch up") } - if _, ok := ntfn.(*blockntfns.Connected); !ok { - continue - } - if ntfn.Height() < uint32(curStamp.Height) { + connectedNtfn, ok := ntfn.(*blockntfns.Connected) + if !ok { continue } - break waitUntilSynced + header := connectedNtfn.Header() + if predicate( + header.BlockHash(), + connectedNtfn.Height(), + ) { + break waitForBlocks + } case <-ro.quit: - blockSubscription.Cancel() return ErrRescanExit } } - blockSubscription.Cancel() - // If any updates were queued while waiting to catch up to the // start height of the rescan, apply them now. for _, upd := range updates { @@ -476,6 +484,51 @@ func rescan(chain ChainSource, options ...RescanOption) error { return err } } + + return nil + } + + // We'll need to ensure that the backing chain has actually caught up to + // the rescan's starting height. If it hasn't, we'll subscribe for block + // notifications at tip and wait until we receive a notification for a + // block with the rescan's starting height. + log.Debugf("Waiting to catch up to the rescan start height=%d", + curStamp.Height) + + if err := waitFor(func(_ chainhash.Hash, height uint32) bool { + return height >= uint32(curStamp.Height) + }); err != nil { + return err + } + + // To ensure that we batch as many filter queries as possible, we also + // wait for the header chain to either be current or for the . + r, ok := chain.(*RescanChainSource) + if ok { + log.Debugf("Waiting for the chain source to be current or " + + "for the rescan end height to be reached.") + + if err := waitFor(func(hash chainhash.Hash, height uint32) bool { + // If the header chain is current, then there + // is no need to wait. + if r.IsCurrent() { + return true + } + + // If an end height was specified then we wait until + // the notification corresponding to that block height. + if ro.endBlock.Height > 0 && + height >= uint32(ro.endBlock.Height) { + + return true + } + + // If a block hash was specified, check if the + // notification is for that block. + return hash == ro.endBlock.Hash + }); err != nil { + return err + } } log.Debugf("Starting rescan from known block %d (%s)", curStamp.Height,