From 5c5c7bdc55dab5061543e3bf1114ff83dcadb0f9 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 5 May 2023 13:45:40 +0200 Subject: [PATCH] neutrino+query: use BatchWriter for filter persistance --- neutrino.go | 26 ++++++++++++++++++++++++++ query.go | 9 ++------- 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/neutrino.go b/neutrino.go index 9e1e1e1a..47ef2a9b 100644 --- a/neutrino.go +++ b/neutrino.go @@ -25,6 +25,7 @@ import ( "github.com/lightninglabs/neutrino/banman" "github.com/lightninglabs/neutrino/blockntfns" "github.com/lightninglabs/neutrino/cache/lru" + "github.com/lightninglabs/neutrino/chanutils" "github.com/lightninglabs/neutrino/filterdb" "github.com/lightninglabs/neutrino/headerfs" "github.com/lightninglabs/neutrino/pushtx" @@ -661,6 +662,7 @@ type ChainService struct { // nolint:maligned broadcaster *pushtx.Broadcaster banStore banman.Store workManager query.WorkManager + filterBatchWriter *chanutils.BatchWriter[*filterdb.FilterData] // peerSubscribers is a slice of active peer subscriptions, that we // will notify each time a new peer is connected. @@ -748,6 +750,22 @@ func NewChainService(cfg Config) (*ChainService, error) { return nil, err } + if s.persistToDisk { + cfg := &chanutils.BatchWriterConfig[*filterdb.FilterData]{ + QueueBufferSize: chanutils.DefaultQueueSize, + MaxBatch: 1000, + DBWritesTickerDuration: time.Millisecond * 500, + Logger: log, + PutItems: s.FilterDB.PutFilters, + } + + batchWriter := chanutils.NewBatchWriter[*filterdb.FilterData]( + cfg, + ) + + s.filterBatchWriter = batchWriter + } + filterCacheSize := DefaultFilterCacheSize if cfg.FilterCacheSize != 0 { filterCacheSize = cfg.FilterCacheSize @@ -1606,6 +1624,10 @@ func (s *ChainService) Start() error { err) } + if s.persistToDisk { + s.filterBatchWriter.Start() + } + go s.connManager.Start() // Start the peer handler which in turn starts the address and block @@ -1645,6 +1667,10 @@ func (s *ChainService) Stop() error { returnErr = err } + if s.persistToDisk { + s.filterBatchWriter.Stop() + } + // Signal the remaining goroutines to quit. close(s.quit) s.wg.Wait() diff --git a/query.go b/query.go index 6913e407..fb5deb13 100644 --- a/query.go +++ b/query.go @@ -534,16 +534,11 @@ func (q *cfiltersQuery) handleResponse(req, resp wire.Message, } if q.cs.persistToDisk { - filterData := &filterdb.FilterData{ + q.cs.filterBatchWriter.AddItem(&filterdb.FilterData{ Filter: filter, BlockHash: &response.BlockHash, Type: dbFilterType, - } - - err = q.cs.FilterDB.PutFilters(filterData) - if err != nil { - log.Warnf("Couldn't write filter to filterDB: %v", err) - } + }) } // We delete the entry for this filter from the headerIndex to indicate