From 7956fad0b13522008e145335c0befadd32b5c442 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Sat, 21 Sep 2024 06:36:31 +0530 Subject: [PATCH] fix: filter batch duration opt was not propagated correctly --- waku/v2/api/filter/filter.go | 15 ++++++--------- waku/v2/api/filter/filter_manager.go | 14 ++++++++++---- waku/v2/api/filter/filter_test.go | 3 ++- 3 files changed, 18 insertions(+), 14 deletions(-) diff --git a/waku/v2/api/filter/filter.go b/waku/v2/api/filter/filter.go index f8123704f..b8cf14550 100644 --- a/waku/v2/api/filter/filter.go +++ b/waku/v2/api/filter/filter.go @@ -27,6 +27,8 @@ func (fc FilterConfig) String() string { return string(jsonStr) } +const filterSubLoopInterval = 5 * time.Second + type Sub struct { ContentFilter protocol.ContentFilter DataCh chan *protocol.Envelope @@ -69,13 +71,7 @@ func defaultOptions() []SubscribeOptions { } // Subscribe -func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, opts ...SubscribeOptions) (*Sub, error) { - optList := append(defaultOptions(), opts...) - params := new(subscribeParameters) - for _, opt := range optList { - opt(params) - } - +func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, params *subscribeParameters) (*Sub, error) { sub := new(Sub) sub.id = uuid.NewString() sub.wf = wf @@ -95,8 +91,9 @@ func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilte sub.multiplex(subs) } } - - go sub.subscriptionLoop(params.batchInterval) + // filter subscription loop is to check if target subscriptions for a filter are active and if not + // trigger resubscribe. + go sub.subscriptionLoop(filterSubLoopInterval) return sub, nil } diff --git a/waku/v2/api/filter/filter_manager.go b/waku/v2/api/filter/filter_manager.go index e4b6e524d..84261882e 100644 --- a/waku/v2/api/filter/filter_manager.go +++ b/waku/v2/api/filter/filter_manager.go @@ -31,7 +31,7 @@ type appFilterMap map[string]filterConfig type FilterManager struct { sync.Mutex ctx context.Context - opts []SubscribeOptions + params *subscribeParameters minPeersPerFilter int onlineChecker *onlinechecker.DefaultOnlineChecker filterSubscriptions map[string]SubDetails // map of aggregated filters to apiSub details @@ -64,7 +64,6 @@ func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter // This fn is being mocked in test mgr := new(FilterManager) mgr.ctx = ctx - mgr.opts = opts mgr.logger = logger mgr.minPeersPerFilter = minPeersPerFilter mgr.envProcessor = envProcessor @@ -72,10 +71,17 @@ func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter mgr.node = node mgr.onlineChecker = onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker) mgr.node.SetOnlineChecker(mgr.onlineChecker) - mgr.filterSubBatchDuration = 5 * time.Second mgr.incompleteFilterBatch = make(map[string]filterConfig) mgr.filterConfigs = make(appFilterMap) mgr.waitingToSubQueue = make(chan filterConfig, 100) + + //parsing the subscribe params only to read the batchInterval passed. + mgr.params = new(subscribeParameters) + opts = append(defaultOptions(), opts...) + for _, opt := range opts { + opt(mgr.params) + } + mgr.filterSubBatchDuration = mgr.params.batchInterval go mgr.startFilterSubLoop() return mgr } @@ -153,7 +159,7 @@ func (mgr *FilterManager) SubscribeFilter(filterID string, cf protocol.ContentFi func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) { ctx, cancel := context.WithCancel(mgr.ctx) config := FilterConfig{MaxPeers: mgr.minPeersPerFilter} - sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.opts...) + sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.params) mgr.Lock() mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub} mgr.Unlock() diff --git a/waku/v2/api/filter/filter_test.go b/waku/v2/api/filter/filter_test.go index 140dedc66..8a5f2d408 100644 --- a/waku/v2/api/filter/filter_test.go +++ b/waku/v2/api/filter/filter_test.go @@ -54,7 +54,8 @@ func (s *FilterApiTestSuite) TestSubscribe() { s.Require().Equal(contentFilter.PubsubTopic, s.TestTopic) ctx, cancel := context.WithCancel(context.Background()) s.Log.Info("About to perform API Subscribe()") - apiSub, err := Subscribe(ctx, s.LightNode, contentFilter, apiConfig, s.Log) + params := subscribeParameters{300 * time.Second, 1024} + apiSub, err := Subscribe(ctx, s.LightNode, contentFilter, apiConfig, s.Log, ¶ms) s.Require().NoError(err) s.Require().Equal(apiSub.ContentFilter, contentFilter) s.Log.Info("Subscribed")