diff --git a/neutrino.go b/neutrino.go index 6473b82b..45a7eb38 100644 --- a/neutrino.go +++ b/neutrino.go @@ -25,6 +25,7 @@ import ( "github.com/lightninglabs/neutrino/banman" "github.com/lightninglabs/neutrino/blockntfns" "github.com/lightninglabs/neutrino/cache/lru" + "github.com/lightninglabs/neutrino/chanutils" "github.com/lightninglabs/neutrino/filterdb" "github.com/lightninglabs/neutrino/headerfs" "github.com/lightninglabs/neutrino/pushtx" @@ -662,6 +663,7 @@ type ChainService struct { // nolint:maligned banStore banman.Store workManager *query.WorkManager queryDispatcher query.Dispatcher + filterBatchWriter *chanutils.BatchWriter[*filterdb.FilterData] // peerSubscribers is a slice of active peer subscriptions, that we // will notify each time a new peer is connected. @@ -750,6 +752,22 @@ func NewChainService(cfg Config) (*ChainService, error) { return nil, err } + if s.persistToDisk { + cfg := &chanutils.BatchWriterConfig[*filterdb.FilterData]{ + QueueBufferSize: chanutils.DefaultQueueSize, + MaxBatch: 1000, + DBWritesTickerDuration: time.Millisecond * 500, + Logger: log, + PutItems: s.FilterDB.PutFilters, + } + + batchWriter := chanutils.NewBatchWriter[*filterdb.FilterData]( + cfg, + ) + + s.filterBatchWriter = batchWriter + } + filterCacheSize := DefaultFilterCacheSize if cfg.FilterCacheSize != 0 { filterCacheSize = cfg.FilterCacheSize @@ -1608,6 +1626,10 @@ func (s *ChainService) Start() error { err) } + if s.persistToDisk { + s.filterBatchWriter.Start() + } + go s.connManager.Start() // Start the peer handler which in turn starts the address and block @@ -1647,6 +1669,10 @@ func (s *ChainService) Stop() error { returnErr = err } + if s.persistToDisk { + s.filterBatchWriter.Stop() + } + // Signal the remaining goroutines to quit. close(s.quit) s.wg.Wait() diff --git a/query.go b/query.go index 3e3326ee..d2a9f682 100644 --- a/query.go +++ b/query.go @@ -551,16 +551,11 @@ func (q *cfiltersQuery) handleResponse(req, resp wire.Message, } if q.cs.persistToDisk { - filterData := &filterdb.FilterData{ + q.cs.filterBatchWriter.AddItem(&filterdb.FilterData{ Filter: filter, BlockHash: &response.BlockHash, Type: dbFilterType, - } - - err = q.cs.FilterDB.PutFilters(filterData) - if err != nil { - log.Warnf("Couldn't write filter to filterDB: %v", err) - } + }) } // We delete the entry for this filter from the headerIndex to indicate