From 644c68a66f6251ec2f7f4b7dff3f80b0db577d11 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Tue, 8 Oct 2024 11:47:59 +0530 Subject: [PATCH] fix: provide option to set onlinechecker --- waku/v2/api/filter/filter.go | 25 ++++++++++++++++--------- waku/v2/api/filter/filter_manager.go | 13 ++++++++----- waku/v2/api/filter/filter_test.go | 2 +- 3 files changed, 25 insertions(+), 15 deletions(-) diff --git a/waku/v2/api/filter/filter.go b/waku/v2/api/filter/filter.go index 020bb23f..659e1195 100644 --- a/waku/v2/api/filter/filter.go +++ b/waku/v2/api/filter/filter.go @@ -49,34 +49,41 @@ type Sub struct { errcnt int } -type subscribeParameters struct { +type FilterParameters struct { batchInterval time.Duration multiplexChannelBuffer int + checker *onlinechecker.DefaultOnlineChecker } -type SubscribeOptions func(*subscribeParameters) +type FilterOptions func(*FilterParameters) -func WithBatchInterval(t time.Duration) SubscribeOptions { - return func(params *subscribeParameters) { +func WithBatchInterval(t time.Duration) FilterOptions { + return func(params *FilterParameters) { params.batchInterval = t } } -func WithMultiplexChannelBuffer(value int) SubscribeOptions { - return func(params *subscribeParameters) { +func WithMultiplexChannelBuffer(value int) FilterOptions { + return func(params *FilterParameters) { params.multiplexChannelBuffer = value } } -func defaultOptions() []SubscribeOptions { - return []SubscribeOptions{ +func WithOnlineChecker(checker *onlinechecker.DefaultOnlineChecker) FilterOptions { + return func(params *FilterParameters) { + params.checker = checker + } +} + +func defaultOptions() []FilterOptions { + return []FilterOptions{ WithBatchInterval(5 * time.Second), WithMultiplexChannelBuffer(100), } } // Subscribe -func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, params *subscribeParameters) (*Sub, error) { +func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, params *FilterParameters) (*Sub, error) { sub := new(Sub) sub.id = uuid.NewString() sub.wf = wf diff --git a/waku/v2/api/filter/filter_manager.go b/waku/v2/api/filter/filter_manager.go index b4933a79..b0f4f907 100644 --- a/waku/v2/api/filter/filter_manager.go +++ b/waku/v2/api/filter/filter_manager.go @@ -32,7 +32,7 @@ type appFilterMap map[string]filterConfig type FilterManager struct { sync.Mutex ctx context.Context - params *subscribeParameters + params *FilterParameters minPeersPerFilter int onlineChecker *onlinechecker.DefaultOnlineChecker filterSubscriptions map[string]SubDetails // map of aggregated filters to apiSub details @@ -61,7 +61,7 @@ type EnevelopeProcessor interface { OnNewEnvelope(env *protocol.Envelope) error } -func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int, envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...SubscribeOptions) *FilterManager { +func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int, envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...FilterOptions) *FilterManager { // This fn is being mocked in test mgr := new(FilterManager) mgr.ctx = ctx @@ -70,18 +70,21 @@ func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter mgr.envProcessor = envProcessor mgr.filterSubscriptions = make(map[string]SubDetails) mgr.node = node - mgr.onlineChecker = onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker) - mgr.node.SetOnlineChecker(mgr.onlineChecker) mgr.incompleteFilterBatch = make(map[string]filterConfig) mgr.filterConfigs = make(appFilterMap) mgr.waitingToSubQueue = make(chan filterConfig, 100) //parsing the subscribe params only to read the batchInterval passed. - mgr.params = new(subscribeParameters) + mgr.params = new(FilterParameters) opts = append(defaultOptions(), opts...) for _, opt := range opts { opt(mgr.params) } + if mgr.params.checker == nil { + mgr.params.checker = onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker) + } + mgr.node.SetOnlineChecker(mgr.params.checker) + mgr.filterSubBatchDuration = mgr.params.batchInterval go mgr.startFilterSubLoop() return mgr diff --git a/waku/v2/api/filter/filter_test.go b/waku/v2/api/filter/filter_test.go index 8a5f2d40..d26fd858 100644 --- a/waku/v2/api/filter/filter_test.go +++ b/waku/v2/api/filter/filter_test.go @@ -54,7 +54,7 @@ func (s *FilterApiTestSuite) TestSubscribe() { s.Require().Equal(contentFilter.PubsubTopic, s.TestTopic) ctx, cancel := context.WithCancel(context.Background()) s.Log.Info("About to perform API Subscribe()") - params := subscribeParameters{300 * time.Second, 1024} + params := FilterParameters{300 * time.Second, 1024} apiSub, err := Subscribe(ctx, s.LightNode, contentFilter, apiConfig, s.Log, ¶ms) s.Require().NoError(err) s.Require().Equal(apiSub.ContentFilter, contentFilter)